From 4495f0dbfbb2eeac539e7502e9c7417cb7428491 Mon Sep 17 00:00:00 2001
From: Matthias
Date: Fri, 15 Mar 2024 16:42:59 +0100
Subject: [PATCH 1/3] fix listing txs between nodes (list_address_links), fix #100, fix #101, fix #102

---
 gsrest/db/cassandra.py | 154 ++++++++++++++++++++++++-----------------
 1 file changed, 89 insertions(+), 65 deletions(-)

diff --git a/gsrest/db/cassandra.py b/gsrest/db/cassandra.py
index 787cb187..a548c63a 100644
--- a/gsrest/db/cassandra.py
+++ b/gsrest/db/cassandra.py
@@ -38,6 +38,7 @@
 
 
 class NetworkParameters(UserDict):
+
     def __getitem__(self, network):
         if network not in self:
             raise NetworkNotFoundException(network)
@@ -88,6 +89,7 @@ def identity1(x):
 def identity2(y, x):
     return x
 
+
 def fmt(v):
     if isinstance(v, str):
         return "'" + v + "'"
@@ -95,6 +97,7 @@ def fmt(v):
         return "0x" + v.hex()
     return str(v)
 
+
 def replaceFrom(keyspace, query):
     r = re.compile(r'\s+FROM\s+', re.IGNORECASE)
     return r.sub(f' FROM {keyspace}.', query)
@@ -130,7 +133,9 @@ def build_token_tx(token_currency, tx, token_tx, log):
         "value": value["value"]
     }
 
+
 class BytesPrettyPrinter(PrettyPrinter):
+
     def format(self, object, context, maxlevels, level):
         if isinstance(object, bytes):
             x = object.hex()
@@ -139,6 +144,7 @@ def format(self, object, context, maxlevels, level):
 
 
 class Result:
+
     def __init__(self, current_rows, params, paging_state):
         self.current_rows = current_rows
         self.params = params
@@ -172,12 +178,12 @@ def wc(cl, cond):
 
 def merge_address_txs_subquery_results(
-        result_sets: Sequence[Result],
-        ascending: bool,
-        fetch_size: int,
-        tx_id_keys: str = "tx_id",
-        merge_all: bool = False,
-        fetched_limit: Optional[int] = None) -> Tuple[Sequence[dict], Optional[int]]:
+    result_sets: Sequence[Result],
+    ascending: bool,
+    fetch_size: int,
+    tx_id_keys: str = "tx_id",
+    fetched_limit: Optional[int] = None
+) -> Tuple[Sequence[dict], Optional[int]]:
     """Merges sub results of the address txs queries per asset and direction
 
     Args:
         result_sets (Sequence[Result]): list of result sets
@@ -185,7 +191,6 @@ def merge_address_txs_subquery_results(
             one per parameter tuple
         fetch_size (int): number of items return at most
         tx_id_keys (str): name of the tx_id column
-        merge_all (bool): Just merge the sets without detecting page
         fetched_limit (int): The limit that was used to fetch the result sets
 
     Returns:
@@ -196,9 +201,11 @@ def merge_address_txs_subquery_results(
 
     # find the least common tx_id where we then cut the result sets
     border_tx_id = None
+    total_results_len = 0
     for results in result_sets:
         if not results:
             continue
+        total_results_len += len(results)
         if fetched_limit and len(results) < fetched_limit:
             continue
         if border_tx_id is None:
@@ -212,16 +219,10 @@ def merge_address_txs_subquery_results(
     # filtered out rows could be overlapping with yet not retrieved result sets
     candidates = [
         v for results in result_sets for v in results
-        if border_tx_id is None
-        or merge_all
-        or ascending and v[tx_id_keys] <= border_tx_id
-        or not ascending and v[tx_id_keys] >= border_tx_id
+        if border_tx_id is None or ascending and v[tx_id_keys] <= border_tx_id
+        or not ascending and v[tx_id_keys] >= border_tx_id
     ]
 
-    results = heapq.nlargest(fetch_size,
-                             candidates,
-                             key=partial(transaction_ordering_key, tx_id_keys))
-
     # Merge overlapping result sets by given sort order (uses a priority
     # queue; heapq)
     # fetch_sized items or less are returned
@@ -231,7 +232,8 @@ def merge_address_txs_subquery_results(
                              key=partial(transaction_ordering_key, tx_id_keys))
 
     # use the last tx_id as page handle
-    border_tx_id = results[-1][tx_id_keys] if results else None
+    border_tx_id = results[-1][tx_id_keys] \
+        if results and total_results_len > fetch_size else None
 
     return results, border_tx_id
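The reworked merge above is easier to follow outside the diff. Below is a minimal sketch of the same strategy under simplifying assumptions: result sets are plain lists of row dicts already sorted by tx_id, and ordering uses the tx id alone, whereas the real code orders via transaction_ordering_key and operates on Result objects:

    import heapq

    def merge_sketch(result_sets, fetch_size, fetched_limit=None,
                     ascending=False):
        # Find the border tx_id up to which every non-exhausted result
        # set is known to be complete. Sets shorter than fetched_limit
        # are exhausted and place no constraint on the border.
        border = None
        total = 0
        for rows in result_sets:
            if not rows:
                continue
            total += len(rows)
            if fetched_limit and len(rows) < fetched_limit:
                continue
            last = rows[-1]["tx_id"]
            if border is None:
                border = last
            else:
                border = min(border, last) if ascending else max(border, last)

        # Rows beyond the border may overlap with rows that other
        # sub-queries have not returned yet, so they are cut off here
        # and re-fetched on the next page.
        candidates = [
            row for rows in result_sets for row in rows
            if border is None
            or (ascending and row["tx_id"] <= border)
            or (not ascending and row["tx_id"] >= border)
        ]

        # Merge by sort order using a priority queue (heapq).
        key = (lambda r: -r["tx_id"]) if ascending else (lambda r: r["tx_id"])
        merged = heapq.nlargest(fetch_size, candidates, key=key)

        # Use the last tx_id as the page handle, but only when more rows
        # than one page were fetched overall; otherwise signal the end.
        next_page = (merged[-1]["tx_id"]
                     if merged and total > fetch_size else None)
        return merged, next_page

For example, merging the descending sets [9, 7] and [8, 6] (tx ids) with fetch_size=3 and fetched_limit=2 cuts at border 7 and returns the rows 9, 8, 7 plus the page handle 7; the row with tx id 6 sits beyond the border and is picked up on the next page.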
@@ -284,7 +286,9 @@ def build_select_address_txs_statement(network: str, node_type: NodeType,
 
 
 class Cassandra:
+
     def eth(func):
+
         def check(*args, **kwargs):
             self = args[0]
             currency = args[1]
@@ -303,6 +307,7 @@ def check(*args, **kwargs):
         return check
 
     def new(func):
+
         def check(*args, **kwargs):
             self = args[0]
             currency = args[1]
@@ -574,9 +579,6 @@ def execute_async_lowlevel(self,
         loop = asyncio.get_event_loop()
         future = loop.create_future()
 
-        # h = hash(q + str(params))
-        # self.logger.debug(f'{h} {q} {params}')
-
        def on_done(result):
             if future.cancelled():
                 loop.call_soon_threadsafe(future.set_result, None)
@@ -587,14 +589,16 @@ def on_done(result):
             if self.logger.level == logging.DEBUG:
                 if named_params:
                     formatted = query
-                    for k,v in params.items():
-                        formatted = formatted.replace("%(" + k + ")s", fmt(v))
+                    for k, v in params.items():
+                        formatted = formatted.replace(
+                            "%(" + k + ")s", fmt(v))
                 else:
                     formatted = query % tuple([fmt(v) for v in params])
                 self.logger.debug(formatted)
-                pp = BytesPrettyPrinter()
+                # pp = BytesPrettyPrinter()
                 # self.logger.debug(pp.pformat(result.current_rows))
-                self.logger.debug(f'result size {len(result.current_rows)}')
+                self.logger.debug(
+                    f'result size {len(result.current_rows)}')
             loop.call_soon_threadsafe(future.set_result, result)
 
         def on_err(result):
@@ -1119,7 +1123,6 @@ async def list_links(self,
             first_value = 'output_value'
             second_value = 'input_value'
 
-
         if is_eth_like(currency):
             token_config = self.get_token_configuration(currency)
             include_assets = list(token_config.keys())
@@ -1142,9 +1145,14 @@ async def list_links(self,
                 page=page,
                 fetch_size=fs_it)
 
+            self.logger.debug(f'results1 {len(results1)} {new_page}')
+
             tx_id = 'transaction_id' if is_eth_like(currency) else 'tx_id'
-            first_tx_ids = [row[tx_id] for row in results1]
+            first_tx_ids = \
+                [(row[tx_id], row['tx_reference']) for row in results1] \
+                if is_eth_like(currency) else \
+                [(row[tx_id], None) for row in results1]
 
             assets = set([currency.upper()])
             if is_eth_like(currency):
@@ -1164,13 +1172,13 @@ async def list_links(self,
                 tx_ids=first_tx_ids,  # limit second set by tx ids of first set
                 page=page,
                 fetch_size=fs_it)
-            self.logger.debug(f'results2 {page} {len(results2)}')
+            self.logger.debug(f'results2 {len(results2)} {page}')
 
-            results1 = {row[tx_id]: row for row in results1}
             if is_eth_like(currency):
-                results2 = await self.normalize_address_transactions(currency,
-                                                                     results2)
+                results2 = await self.normalize_address_transactions(
+                    currency, results2)
             else:
+                results1 = {row[tx_id]: row for row in results1}
                 tx_ids = [row[tx_id] for row in results2]
                 txs = await self.list_txs_by_ids(currency, tx_ids)
@@ -1183,27 +1191,33 @@ async def list_links(self,
                     row[second_value] = row['value']
                     row[first_value] = results1[row[tx_id]]['value']
-
                     for k, v in tx.items():
                         row[k] = v
 
             if is_eth_like(currency):
+                # TODO probably this check is no longer necessary
+                # since we filtered on tx_ref level already
+                # in list_address_txs_ordered
                 if node_type == NodeType.CLUSTER:
                     neighbor = dst_node['root_address']
                     id = src_node['root_address']
-                # Token/Trace transactions might not be between the requested nodes
-                # so only keep the relevant ones
+                # Token/Trace transactions might not be between the requested
+                # nodes so only keep the relevant ones
+                before = len(results2)
                 results2 = [
-                    tx for tx in results2
-                    if tx["to_address"] == neighbor
+                    tx for tx in results2 if tx["to_address"] == neighbor
                     and tx["from_address"] == id
                 ]
+                self.logger.debug(f'pruned {before - len(results2)}')
 
             final_results.extend(results2)
 
             page = new_page
+            self.logger.debug(f'next page {page}')
             if page is None:
                 break
 
+        self.logger.debug(f'final_results len {len(final_results)}')
+
         return final_results, str(page) if page is not None else None
 
     async def list_matching_addresses(self, currency, expression, limit=10):
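In the UTXO branch above, results1 holds the outgoing side and results2 the incoming side of the same transactions, joined via the shared tx id. A toy illustration with hypothetical rows and values (column names as in the hunk):

    # outputs of the source address, keyed by tx id (results1)
    results1 = {
        1: {"tx_id": 1, "value": 50},
        2: {"tx_id": 2, "value": 75},
    }
    # inputs of the destination address (results2), already restricted
    # to the tx ids collected in first_tx_ids
    results2 = [{"tx_id": 1, "value": 20}]

    for row in results2:
        row["input_value"] = row["value"]  # second_value
        row["output_value"] = results1[row["tx_id"]]["value"]  # first_value

    # -> tx 1 links the two addresses: 50 left the source address,
    #    20 of it arrived at the destination address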
tx["from_address"] == id ] + self.logger.debug(f'pruned {before - len(results2)}') final_results.extend(results2) page = new_page + self.logger.debug(f'next page {page}') if page is None: break + self.logger.debug(f'final_results len {len(final_results)}') + return final_results, str(page) if page is not None else None async def list_matching_addresses(self, currency, expression, limit=10): @@ -1891,7 +1905,8 @@ async def add_balance_eth(self, currency, row): "and currency=%s" results = { - c: one(await self.execute_async( + c: + one(await self.execute_async( currency, 'transformed', query, [row['address_id'], row['address_id_group'], c])) for c in balance_currencies @@ -2027,20 +2042,20 @@ async def get_id_secondary_group_eth(self, currency, table, id_group): return 0 if result is None else \ result['max_secondary_id'] - async def list_address_txs_ordered(self, - network: str, - node_type: NodeType, - id, - tx_id_lower_bound: Optional[int], - tx_id_upper_bound: Optional[int], - is_outgoing: Optional[bool], - include_assets: Sequence[Tuple[str, - bool]], - page: Optional[int], - fetch_size: int, - cols: Optional[Sequence[str]] = None, - tx_ids: Optional[Sequence[int]] = None, - ascending: bool = False) -> Tuple[Sequence[dict], Optional[int]]: + async def list_address_txs_ordered( + self, + network: str, + node_type: NodeType, + id, + tx_id_lower_bound: Optional[int], + tx_id_upper_bound: Optional[int], + is_outgoing: Optional[bool], + include_assets: Sequence[Tuple[str, bool]], + page: Optional[int], + fetch_size: int, + cols: Optional[Sequence[str]] = None, + tx_ids: Optional[Sequence[Tuple[int, Optional[dict]]]] = None, + ascending: bool = False) -> Tuple[Sequence[dict], Optional[int]]: """Loads a address transactions in execution order it allows to only get out- or incoming transaction or only transactions of a certain asset (token), for a given address id @@ -2138,18 +2153,30 @@ async def list_address_txs_ordered(self, "s_d_group": s_d_group, "currency": asset, "is_outgoing": is_outgoing, - "tx_id": tx_id - } for is_outgoing, asset, s_d_group, tx_id in product( - directions, include_assets, item_id_secondary_group, - [0] if tx_ids is None else tx_ids)] - - # run one query per direction and asset - aws = [ - self.execute_async(network, - 'transformed', - cql_stmt, - p) for p in params_junks - ] + "tx_id": tx_id, + "tx_ref": tx_ref + } for is_outgoing, asset, s_d_group, (tx_id, tx_ref) in product( + directions, include_assets, item_id_secondary_group, [( + 0, None)] if tx_ids is None else tx_ids)] + + def tx_ref_match(a, b): + return a[0] == b[0] and \ + a[1] == b[1] + + async def fetch(stmt, params): + res = await self.execute_async(network, 'transformed', + cql_stmt, params) + if params['tx_ref'] is None: + return res + + res.current_rows = [ + r for r in res + if tx_ref_match(r['tx_reference'], params['tx_ref']) + ] + return res + + # run one query per direction, asset and secondary group id + aws = [fetch(cql_stmt, p) for p in params_junks] # collect and merge results more_results, page = merge_address_txs_subquery_results( @@ -2157,15 +2184,12 @@ async def list_address_txs_ordered(self, ascending, fs_it, 'transaction_id' if is_eth_like(network) else 'tx_id', - merge_all=tx_ids is not None, fetched_limit=fs_junk) self.logger.debug(f'list tx ordered page {page}') + self.logger.debug(f'more_results len {len(more_results)}') results.extend(more_results) - if tx_ids is not None: - # don't page if querying specific tx_ids - break if page is None: # no more data expected end loop break 
From df8b91cebb9291ae3a52a91a45f11201c7471edb Mon Sep 17 00:00:00 2001
From: Matthias
Date: Fri, 15 Mar 2024 16:51:31 +0100
Subject: [PATCH 2/3] adapt changelog

---
 CHANGELOG.md | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7bb67f52..be0c76d0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,16 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
+## [24.02.3] - 2024-03-15
+
+### Fixed
+- Listing txs between addresses (list_address_links)
+
+## [24.02.2] - 2024-03-14
+
+### Fixed
+- Listing txs between addresses (list_address_links)
+
 ## [24.02.1] - 2024-03-06
 
 ### Fixed

From 42a3382b6e78fe7aa6a9044cabe38436d7c4bbcc Mon Sep 17 00:00:00 2001
From: Matthias
Date: Fri, 15 Mar 2024 16:52:01 +0100
Subject: [PATCH 3/3] fix make lint, add adev

---
 Makefile              | 5 ++++-
 test-requirements.txt | 3 ++-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/Makefile b/Makefile
index c635ae50..7a54227b 100644
--- a/Makefile
+++ b/Makefile
@@ -7,7 +7,7 @@ test-all-env:
 	tox -- graphsense
 
 lint:
-	flake8 gsrest
+	flake8 ./gsrest --count --max-line-length=80 --statistics --exclude plugins
 
 format:
 	autopep8 --in-place --recursive gsrest
@@ -16,6 +16,9 @@ format:
 serve:
 	python -m aiohttp.web -H localhost -P 9000 openapi_server:main
 
+dev:
+	adev runserver -p 9000 --root . --app-factory main openapi_server/__init__.py
+
 build-docker:
 	docker build -t graphsense-rest .
 
diff --git a/test-requirements.txt b/test-requirements.txt
index ddf446ea..cca2aa2a 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -2,4 +2,5 @@ pytest>=6.2.5
 pytest-cov>=3.0.0
 pytest-randomly>=3.11
 pytest-aiohttp>=1.0.3
-pytest-icdiff >= 0.6
\ No newline at end of file
+pytest-icdiff >= 0.6
+aiohttp-devtools==1.1.2
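With the last patch applied, a hot-reloading development server can be started through the new target (assuming the pinned aiohttp-devtools from test-requirements.txt is installed):

    pip install -r test-requirements.txt
    make dev

adev runserver watches the source tree and restarts the aiohttp application whenever a file changes. The tightened lint target fits the same picture: flake8 now enforces an 80-column limit and prints per-error-code counts plus a total (--statistics, --count), which matches the many pure line-length reflows in patch 1.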