def iterate_with_callable(self, func):
    """Feed every stored block to ``func`` using a pool of worker processes.

    One worker process is started per CPU reported by
    ``multiprocessing.cpu_count()``.  The workers share a height counter
    that starts at 0; each worker repeatedly claims the next height under
    the counter's lock, loads that block with ``self.load_at_height`` and
    passes it to ``func``.  A worker stops as soon as ``load_at_height``
    returns ``None``.

    WARNING: ``func`` must be safe to call concurrently.  It is executed
    inside the worker processes (not in the caller's process), and no
    ordering of blocks across workers is guaranteed.

    Args:
        func: callable accepting a single loaded block.

    Raises:
        ValueError: if the ``fork`` start method is not available on this
            platform (raised by ``multiprocessing.get_context``).
        Exception: if any worker process exits with a non-zero exit code.
    """
    # The worker target below is a closure (and ``func`` frequently is one
    # as well), so it cannot be pickled for the ``spawn`` / ``forkserver``
    # start methods.  Request ``fork`` explicitly instead of depending on
    # the platform default.
    context = multiprocessing.get_context('fork')

    # 'q' is a signed 64-bit integer, so the counter cannot overflow for
    # any realistic number of blocks.
    next_height = context.Value('q', 0)

    def claim_next_block():
        # Only the counter update is serialized; the block itself is loaded
        # outside the lock so that workers can read in parallel.
        with next_height.get_lock():
            height = next_height.value
            next_height.value += 1
        return self.load_at_height(height)

    def worker():
        while True:
            block = claim_next_block()
            if block is None:
                break
            func(block)

    workers = [
        context.Process(target=worker)
        for _ in range(multiprocessing.cpu_count())
    ]

    try:
        for process in workers:
            process.start()
        for process in workers:
            process.join()
    finally:
        # If starting or joining was interrupted, do not leave workers
        # running behind the caller's back.
        for process in workers:
            if process.is_alive():
                process.terminate()
                process.join()

    for process in workers:
        if process.exitcode != 0:
            raise Exception(
                f'BlockStorageFiles: child process {process.name} exited with code {process.exitcode}'
            )
iter(get_blocks_data(block_storage.iterate())) - while True: - chunk = tuple(itertools.islice(it, BULK_SZ)) - if len(chunk) == 0: break - cursor.execute( - 'insert into blockstorage_data (select * from json_populate_recordset(null::blockstorage_data, %s))', - (json.dumps(chunk), )) + multiprocessing_manager = multiprocessing.Manager() + block_data_to_submit = multiprocessing_manager.Queue() + + def get_block_data(block): + height = block.block_v1.payload.height + for index, tx in enumerate(block.block_v1.payload.transactions): + block_data_to_submit.put( + (height, index, tx.payload.reduced_payload.created_time)) + + def submit_block_data(block_data_to_submit): + BULK_SZ = 1000 + data = iter(block_data_to_submit.get, 'stop') + while True: + chunk = tuple(itertools.islice(data, BULK_SZ)) + if len(chunk) == 0: break + cursor.execute( + 'insert into blockstorage_data (h, i, t) values {}'.format( + ', '.join(map(str, chunk)))) + + block_data_submitter = multiprocessing.Process( + target=submit_block_data, args=(block_data_to_submit, )) + block_data_submitter.start() + + block_storage.iterate_with_callable(get_block_data) + + block_data_to_submit.put('stop') + block_data_submitter.join() # now merge the tables and block storage data cursor.execute(''' @@ -63,16 +73,6 @@ def get_blocks_data(blocks): height bigint, index bigint ); - CREATE INDEX IF NOT EXISTS tx_positions_hash_index - ON tx_positions - USING hash - (hash); - CREATE INDEX IF NOT EXISTS tx_positions_creator_id_asset_index - ON tx_positions - (creator_id, asset_id); - CREATE INDEX IF NOT EXISTS tx_positions_ts_height_index_index - ON tx_positions - (ts); -- aux constraint to avoid duplicate rows alter table tx_positions @@ -126,6 +126,18 @@ def get_blocks_data(blocks): -- drop aux stuff drop table blockstorage_data; alter table tx_positions drop constraint aux_tx_positions_unique_constraint cascade; + + -- create indices + CREATE INDEX IF NOT EXISTS tx_positions_hash_index + ON tx_positions + USING hash + 
(hash); + CREATE INDEX IF NOT EXISTS tx_positions_creator_id_asset_index + ON tx_positions + (creator_id, asset_id); + CREATE INDEX IF NOT EXISTS tx_positions_ts_height_index_index + ON tx_positions + (ts); ''') # burrow stuff