Merge branch 'hotfix/24.02.3'
myrho committed Mar 15, 2024
2 parents 5e00b8d + 42a3382 commit 497283c
Showing 4 changed files with 104 additions and 67 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,15 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [24.02.3] - 2024-03-15

### Fixed
- Listing txs between addresses (list_address_txs)

## [24.02.2] - 2024-03-14

### Fixed
- Listing txs between addresses (list_address_txs)

## [24.02.1] - 2024-03-06

### Fixed
5 changes: 4 additions & 1 deletion Makefile
@@ -7,7 +7,7 @@ test-all-env:
tox -- graphsense

lint:
-	flake8 gsrest
+	flake8 ./gsrest --count --max-line-length=80 --statistics --exclude plugins

format:
autopep8 --in-place --recursive gsrest
@@ -16,6 +16,9 @@ format:
serve:
python -m aiohttp.web -H localhost -P 9000 openapi_server:main

dev:
adev runserver -p 9000 --root . --app-factory main openapi_server/__init__.py

build-docker:
docker build -t graphsense-rest .

154 changes: 89 additions & 65 deletions gsrest/db/cassandra.py
@@ -38,6 +38,7 @@


class NetworkParameters(UserDict):

def __getitem__(self, network):
if network not in self:
raise NetworkNotFoundException(network)
@@ -88,13 +89,15 @@ def identity1(x):
def identity2(y, x):
return x


def fmt(v):
if isinstance(v, str):
return "'" + v + "'"
if isinstance(v, bytes):
return "0x" + v.hex()
return str(v)
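
For illustration, here is a minimal, self-contained sketch (not part of this commit) of how `fmt()` is used by the debug branch of `execute_async_lowlevel` further down in this diff: named `%(key)s` placeholders are substituted with CQL literals so the logged query can be pasted straight into cqlsh. The table and parameter names are made up.

```python
def fmt(v):
    if isinstance(v, str):
        return "'" + v + "'"
    if isinstance(v, bytes):
        return "0x" + v.hex()
    return str(v)

def format_query_for_log(query: str, params: dict) -> str:
    # substitute each named placeholder with a CQL literal:
    # strings get quoted, bytes become 0x-hex blobs
    formatted = query
    for k, v in params.items():
        formatted = formatted.replace("%(" + k + ")s", fmt(v))
    return formatted

print(format_query_for_log(
    "SELECT * FROM address WHERE address_prefix = %(prefix)s",
    {"prefix": bytes.fromhex("cafe")}))
# -> SELECT * FROM address WHERE address_prefix = 0xcafe
```
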


def replaceFrom(keyspace, query):
r = re.compile(r'\s+FROM\s+', re.IGNORECASE)
return r.sub(f' FROM {keyspace}.', query)
@@ -130,7 +133,9 @@ def build_token_tx(token_currency, tx, token_tx, log):
"value": value["value"]
}


class BytesPrettyPrinter(PrettyPrinter):

def format(self, object, context, maxlevels, level):
if isinstance(object, bytes):
x = object.hex()
@@ -139,6 +144,7 @@ def format(self, object, context, maxlevels, level):


class Result:

def __init__(self, current_rows, params, paging_state):
self.current_rows = current_rows
self.params = params
@@ -172,20 +178,19 @@ def wc(cl, cond):


def merge_address_txs_subquery_results(
-        result_sets: Sequence[Result],
-        ascending: bool,
-        fetch_size: int,
-        tx_id_keys: str = "tx_id",
-        merge_all: bool = False,
-        fetched_limit: Optional[int] = None) -> Tuple[Sequence[dict], Optional[int]]:
+        result_sets: Sequence[Result],
+        ascending: bool,
+        fetch_size: int,
+        tx_id_keys: str = "tx_id",
+        fetched_limit: Optional[int] = None
+) -> Tuple[Sequence[dict], Optional[int]]:
"""Merges sub results of the address txs queries per asset and direction
Args:
result_sets (Sequence[Result]): List of result sets,
one per parameter tuple
        fetch_size (int): number of items to return at most
tx_id_keys (str): name of the tx_id column
-        merge_all (bool): Just merge the sets without detecting page
fetched_limit (int): The limit that was used to fetch the result sets
Returns:
@@ -196,9 +201,11 @@

# find the least common tx_id where we then cut the result sets
border_tx_id = None
total_results_len = 0
for results in result_sets:
if not results:
continue
total_results_len += len(results)
if fetched_limit and len(results) < fetched_limit:
continue
if border_tx_id is None:
@@ -212,16 +219,10 @@
# filtered out rows could be overlapping with yet not retrieved result sets
candidates = [
v for results in result_sets for v in results
-        if border_tx_id is None
-        or merge_all
-        or ascending and v[tx_id_keys] <= border_tx_id
-        or not ascending and v[tx_id_keys] >= border_tx_id
+        if border_tx_id is None or ascending and v[tx_id_keys] <= border_tx_id
+        or not ascending and v[tx_id_keys] >= border_tx_id
]

-    results = heapq.nlargest(fetch_size,
-                             candidates,
-                             key=partial(transaction_ordering_key, tx_id_keys))

# Merge overlapping result sets by given sort order (uses a priority
# queue; heapq)
# fetch_sized items or less are returned
@@ -231,7 +232,8 @@
key=partial(transaction_ordering_key, tx_id_keys))

# use the last tx_id as page handle
-    border_tx_id = results[-1][tx_id_keys] if results else None
+    border_tx_id = results[-1][tx_id_keys] \
+        if results and total_results_len > fetch_size else None
return results, border_tx_id
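
This hunk drops a duplicated `heapq.nlargest` call and, central to the hotfix, only emits a page handle when more rows were fetched than fit into one page. A toy, self-contained sketch of the resulting paging behaviour (simplified: it skips the `border_tx_id` cutting of partially fetched result sets and assumes descending `tx_id` order, matching `ascending=False`):

```python
import heapq
from functools import partial

def toy_ordering_key(tx_id_key, tx):
    # stand-in for transaction_ordering_key: order by tx_id only
    return tx[tx_id_key]

def toy_merge(result_sets, fetch_size):
    candidates = [v for rows in result_sets for v in rows]
    total_results_len = sum(len(rows) for rows in result_sets)
    results = heapq.nlargest(fetch_size, candidates,
                             key=partial(toy_ordering_key, 'tx_id'))
    # after the fix: only hand out a next-page handle when more rows were
    # fetched than one page returns, otherwise paging stops here
    border_tx_id = results[-1]['tx_id'] \
        if results and total_results_len > fetch_size else None
    return results, border_tx_id

incoming = [{'tx_id': 9}, {'tx_id': 4}]
outgoing = [{'tx_id': 7}]
print(toy_merge([incoming, outgoing], fetch_size=5))
# -> ([{'tx_id': 9}, {'tx_id': 7}, {'tx_id': 4}], None)
# Before the fix, border_tx_id would have been 4 here, causing the caller
# to request a spurious extra (empty) page.
```
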


Expand Down Expand Up @@ -284,7 +286,9 @@ def build_select_address_txs_statement(network: str, node_type: NodeType,


class Cassandra:

def eth(func):

def check(*args, **kwargs):
self = args[0]
currency = args[1]
@@ -303,6 +307,7 @@ def check(*args, **kwargs):
return check

def new(func):

def check(*args, **kwargs):
self = args[0]
currency = args[1]
@@ -574,9 +579,6 @@ def execute_async_lowlevel(self,
loop = asyncio.get_event_loop()
future = loop.create_future()

-        # h = hash(q + str(params))
-        # self.logger.debug(f'{h} {q} {params}')

def on_done(result):
if future.cancelled():
loop.call_soon_threadsafe(future.set_result, None)
@@ -587,14 +589,16 @@ def on_done(result):
if self.logger.level == logging.DEBUG:
if named_params:
formatted = query
-                for k,v in params.items():
-                    formatted = formatted.replace("%(" + k + ")s", fmt(v))
+                for k, v in params.items():
+                    formatted = formatted.replace(
+                        "%(" + k + ")s", fmt(v))
else:
formatted = query % tuple([fmt(v) for v in params])
self.logger.debug(formatted)
-            pp = BytesPrettyPrinter()
+            # pp = BytesPrettyPrinter()
            # self.logger.debug(pp.pformat(result.current_rows))
-            self.logger.debug(f'result size {len(result.current_rows)}')
+            self.logger.debug(
+                f'result size {len(result.current_rows)}')
loop.call_soon_threadsafe(future.set_result, result)

def on_err(result):
@@ -1119,7 +1123,6 @@ async def list_links(self,
first_value = 'output_value'
second_value = 'input_value'


if is_eth_like(currency):
token_config = self.get_token_configuration(currency)
include_assets = list(token_config.keys())
@@ -1142,9 +1145,14 @@
page=page,
fetch_size=fs_it)

self.logger.debug(f'results1 {len(results1)} {new_page}')

tx_id = 'transaction_id' if is_eth_like(currency) else 'tx_id'

-        first_tx_ids = [row[tx_id] for row in results1]
+        first_tx_ids = \
+            [(row[tx_id], row['tx_reference']) for row in results1] \
+            if is_eth_like(currency) else \
+            [(row[tx_id], None) for row in results1]

assets = set([currency.upper()])
if is_eth_like(currency):
@@ -1164,13 +1172,13 @@
tx_ids=first_tx_ids, # limit second set by tx ids of first set
page=page,
fetch_size=fs_it)
-        self.logger.debug(f'results2 {page} {len(results2)}')
+        self.logger.debug(f'results2 {len(results2)} {page}')

-        results1 = {row[tx_id]: row for row in results1}
        if is_eth_like(currency):
-            results2 = await self.normalize_address_transactions(currency,
-                                                                 results2)
+            results2 = await self.normalize_address_transactions(
+                currency, results2)
        else:
+            results1 = {row[tx_id]: row for row in results1}
tx_ids = [row[tx_id] for row in results2]
txs = await self.list_txs_by_ids(currency, tx_ids)

@@ -1183,27 +1191,33 @@
row[second_value] = row['value']
row[first_value] = results1[row[tx_id]]['value']


for k, v in tx.items():
row[k] = v

if is_eth_like(currency):
+            # TODO probably this check is no longer necessary
+            # since we filtered on tx_ref level already
+            # in list_address_txs_ordered
            if node_type == NodeType.CLUSTER:
                neighbor = dst_node['root_address']
                id = src_node['root_address']
-            # Token/Trace transactions might not be between the requested nodes
-            # so only keep the relevant ones
+            # Token/Trace transactions might not be between the requested
+            # nodes so only keep the relevant ones
+            before = len(results2)
            results2 = [
-                tx for tx in results2
-                if tx["to_address"] == neighbor
+                tx for tx in results2 if tx["to_address"] == neighbor
                and tx["from_address"] == id
            ]
+            self.logger.debug(f'pruned {before - len(results2)}')
final_results.extend(results2)

page = new_page
self.logger.debug(f'next page {page}')
if page is None:
break

self.logger.debug(f'final_results len {len(final_results)}')

return final_results, str(page) if page is not None else None
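
To make the join in `list_links` above concrete, here is a toy sketch (not from the commit, UTXO branch only, with made-up values): the source node's txs (`results1`) are keyed by tx id, and each matching tx of the target node (`results2`) is enriched with both sides' values to form a link row. Which of `input_value`/`output_value` gets which side depends on direction handling above the shown hunk, so the assignment below is illustrative.

```python
# results1: txs of the source node, keyed by tx_id
results1 = {r['tx_id']: r for r in [{'tx_id': 1, 'value': 50}]}
# results2: txs of the target node (already limited to results1's tx ids
# in the real code; the guard below is just defensive)
results2 = [{'tx_id': 1, 'value': 30}, {'tx_id': 2, 'value': 10}]

links = []
for row in results2:
    if row['tx_id'] not in results1:
        continue  # tx does not touch the source node
    row['input_value'] = row['value']                      # target side
    row['output_value'] = results1[row['tx_id']]['value']  # source side
    links.append(row)
print(links)
# -> [{'tx_id': 1, 'value': 30, 'input_value': 30, 'output_value': 50}]
```
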

async def list_matching_addresses(self, currency, expression, limit=10):
@@ -1891,7 +1905,8 @@ async def add_balance_eth(self, currency, row):
"and currency=%s"

results = {
-            c: one(await self.execute_async(
+            c:
+            one(await self.execute_async(
currency, 'transformed', query,
[row['address_id'], row['address_id_group'], c]))
for c in balance_currencies
@@ -2027,20 +2042,20 @@ async def get_id_secondary_group_eth(self, currency, table, id_group):
return 0 if result is None else \
result['max_secondary_id']

-    async def list_address_txs_ordered(self,
-                                       network: str,
-                                       node_type: NodeType,
-                                       id,
-                                       tx_id_lower_bound: Optional[int],
-                                       tx_id_upper_bound: Optional[int],
-                                       is_outgoing: Optional[bool],
-                                       include_assets: Sequence[Tuple[str,
-                                                                      bool]],
-                                       page: Optional[int],
-                                       fetch_size: int,
-                                       cols: Optional[Sequence[str]] = None,
-                                       tx_ids: Optional[Sequence[int]] = None,
-                                       ascending: bool = False) -> Tuple[Sequence[dict], Optional[int]]:
+    async def list_address_txs_ordered(
+            self,
+            network: str,
+            node_type: NodeType,
+            id,
+            tx_id_lower_bound: Optional[int],
+            tx_id_upper_bound: Optional[int],
+            is_outgoing: Optional[bool],
+            include_assets: Sequence[Tuple[str, bool]],
+            page: Optional[int],
+            fetch_size: int,
+            cols: Optional[Sequence[str]] = None,
+            tx_ids: Optional[Sequence[Tuple[int, Optional[dict]]]] = None,
+            ascending: bool = False) -> Tuple[Sequence[dict], Optional[int]]:
        """Loads address transactions in execution order.
        It allows fetching only outgoing or incoming transactions, or only
        transactions of a certain asset (token), for a given address id
@@ -2138,34 +2153,43 @@
            "s_d_group": s_d_group,
            "currency": asset,
            "is_outgoing": is_outgoing,
-            "tx_id": tx_id
-        } for is_outgoing, asset, s_d_group, tx_id in product(
-            directions, include_assets, item_id_secondary_group,
-            [0] if tx_ids is None else tx_ids)]
-
-        # run one query per direction and asset
-        aws = [
-            self.execute_async(network,
-                               'transformed',
-                               cql_stmt,
-                               p) for p in params_junks
-        ]
+            "tx_id": tx_id,
+            "tx_ref": tx_ref
+        } for is_outgoing, asset, s_d_group, (tx_id, tx_ref) in product(
+            directions, include_assets, item_id_secondary_group, [(
+                0, None)] if tx_ids is None else tx_ids)]
+
+        def tx_ref_match(a, b):
+            return a[0] == b[0] and \
+                a[1] == b[1]
+
+        async def fetch(stmt, params):
+            res = await self.execute_async(network, 'transformed',
+                                           cql_stmt, params)
+            if params['tx_ref'] is None:
+                return res
+
+            res.current_rows = [
+                r for r in res
+                if tx_ref_match(r['tx_reference'], params['tx_ref'])
+            ]
+            return res
+
+        # run one query per direction, asset and secondary group id
+        aws = [fetch(cql_stmt, p) for p in params_junks]

# collect and merge results
more_results, page = merge_address_txs_subquery_results(
[r.current_rows for r in await asyncio.gather(*aws)],
ascending,
fs_it,
'transaction_id' if is_eth_like(network) else 'tx_id',
-            merge_all=tx_ids is not None,
fetched_limit=fs_junk)

self.logger.debug(f'list tx ordered page {page}')
self.logger.debug(f'more_results len {len(more_results)}')

results.extend(more_results)
if tx_ids is not None:
# don't page if querying specific tx_ids
break
if page is None:
# no more data expected end loop
break
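
The core of this hunk is the new per-query post-filter: on eth-like networks one `transaction_id` can carry several token or trace transfers, each identified by a `tx_reference`, so when the caller passes specific `(tx_id, tx_ref)` pairs (as `list_links` now does), rows whose reference does not match are dropped before merging. A hedged, self-contained sketch of that idea (the row shapes and the `(trace_index, log_index)`-style reference tuple are assumptions for illustration, not the exact schema):

```python
from typing import Optional, Sequence, Tuple

def filter_by_tx_ref(rows: Sequence[dict],
                     tx_ref: Optional[Tuple[int, int]]) -> Sequence[dict]:
    if tx_ref is None:
        return list(rows)  # no reference requested: keep everything
    return [r for r in rows
            # keep only rows whose reference tuple matches component-wise
            if r['tx_reference'][0] == tx_ref[0]
            and r['tx_reference'][1] == tx_ref[1]]

rows = [{'transaction_id': 10, 'tx_reference': (0, 3)},
        {'transaction_id': 10, 'tx_reference': (0, 7)}]
print(filter_by_tx_ref(rows, (0, 3)))
# -> [{'transaction_id': 10, 'tx_reference': (0, 3)}]
# Without this filter, both transfers of tx 10 would be treated as links
# between the two addresses, which is the list_address_txs bug this
# hotfix addresses.
```
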
3 changes: 2 additions & 1 deletion test-requirements.txt
@@ -2,4 +2,5 @@ pytest>=6.2.5
pytest-cov>=3.0.0
pytest-randomly>=3.11
pytest-aiohttp>=1.0.3
-pytest-icdiff >= 0.6
+pytest-icdiff >= 0.6
+aiohttp-devtools==1.1.2
