This repository has been archived by the owner on May 12, 2022. It is now read-only.

migration speedup #12

Open · wants to merge 2 commits into main
32 changes: 32 additions & 0 deletions block_storage.py
@@ -5,6 +5,7 @@
import google.protobuf.json_format
import binascii
import hashlib
import multiprocessing

from schema_version import *

@@ -37,6 +38,37 @@ def iterate(self):
            yield block
            height += 1

    def iterate_with_callable(self, func):
        'WARNING: func is called from multiple worker processes in parallel and must be safe for that!'
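        # shared, lock-protected counter that hands each worker the next
        # height to load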
        current_height = multiprocessing.Value('i')
        current_height.value = 0

        def load_and_feed_next_block():
            def load_block(height=current_height):
                with height.get_lock():
                    height_local = height.value
                    height.value += 1
                return self.load_at_height(height_local)

            # iter() with a sentinel: stop once load_at_height returns None,
            # i.e. we have run past the last stored block
            for block in iter(load_block, None):
                func(block)

        pool = list()

        # one loader process per CPU core
        for _ in range(multiprocessing.cpu_count()):
            process = multiprocessing.Process(target=load_and_feed_next_block)
            process.start()
            pool.append(process)

        for process in pool:
            process.join()

        for process in pool:
            if process.exitcode != 0:
                raise Exception(
                    f'BlockStorageFiles: child process {process.name} exited with code {process.exitcode}'
                )

    def _get_block_file_path_at_height(self, height: int) -> str:
        return os.path.join(self._path, '{0:0>16}'.format(height))

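For context, the new method can be driven roughly like this; a minimal sketch, assuming BlockStorageFiles is importable from block_storage and is constructed from a directory path (the path and the count_transactions callback are hypothetical):

import multiprocessing

from block_storage import BlockStorageFiles

storage = BlockStorageFiles('/path/to/blocks')  # hypothetical path
tx_count = multiprocessing.Value('i', 0)

def count_transactions(block):
    # safe across worker processes: only a lock-protected counter is touched
    with tx_count.get_lock():
        tx_count.value += len(block.block_v1.payload.transactions)

storage.iterate_with_callable(count_transactions)
print(f'total transactions: {tx_count.value}')
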
68 changes: 40 additions & 28 deletions migration_data/release_1_2_0.py
@@ -1,5 +1,6 @@
import itertools
import json
import multiprocessing


def migrate_1_1_3_to_1_2_0(cursor, block_storage):
@@ -34,24 +35,33 @@ def migrate_1_1_3_to_1_2_0(cursor, block_storage):
    cursor.execute(
        'create table blockstorage_data (h bigint, i bigint, t bigint)')

    def get_blocks_data(blocks):
        for block in blocks:
            height = block.block_v1.payload.height
            for index, tx in enumerate(block.block_v1.payload.transactions):
                yield {
                    'h': height,
                    'i': index,
                    't': tx.payload.reduced_payload.created_time
                }

    BULK_SZ = 1000
    it = iter(get_blocks_data(block_storage.iterate()))
    while True:
        chunk = tuple(itertools.islice(it, BULK_SZ))
        if len(chunk) == 0: break
        cursor.execute(
            'insert into blockstorage_data (select * from json_populate_recordset(null::blockstorage_data, %s))',
            (json.dumps(chunk), ))
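    # a Manager queue lets the loader worker processes feed one writer process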
    multiprocessing_manager = multiprocessing.Manager()
    block_data_to_submit = multiprocessing_manager.Queue()

    # called from the loader worker processes started by iterate_with_callable
    def get_block_data(block):
        height = block.block_v1.payload.height
        for index, tx in enumerate(block.block_v1.payload.transactions):
            block_data_to_submit.put(
                (height, index, tx.payload.reduced_payload.created_time))

    def submit_block_data(block_data_to_submit):
        BULK_SZ = 1000
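        # keep consuming from the queue until the 'stop' sentinel arrives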
        data = iter(block_data_to_submit.get, 'stop')
        while True:
            chunk = tuple(itertools.islice(data, BULK_SZ))
            if len(chunk) == 0: break
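            # chunk holds tuples of plain integers, so str() renders valid
            # SQL value lists like '(42, 0, 1650000000000)'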
            cursor.execute(
                'insert into blockstorage_data (h, i, t) values {}'.format(
                    ', '.join(map(str, chunk))))

    block_data_submitter = multiprocessing.Process(
        target=submit_block_data, args=(block_data_to_submit, ))
    block_data_submitter.start()

    block_storage.iterate_with_callable(get_block_data)

    block_data_to_submit.put('stop')
    block_data_submitter.join()

    # now merge the tables and block storage data
    cursor.execute('''
@@ -63,16 +73,6 @@ def get_blocks_data(blocks):
            height bigint,
            index bigint
        );
        CREATE INDEX IF NOT EXISTS tx_positions_hash_index
            ON tx_positions
            USING hash
            (hash);
        CREATE INDEX IF NOT EXISTS tx_positions_creator_id_asset_index
            ON tx_positions
            (creator_id, asset_id);
        CREATE INDEX IF NOT EXISTS tx_positions_ts_height_index_index
            ON tx_positions
            (ts);

        -- aux constraint to avoid duplicate rows
        alter table tx_positions
@@ -126,6 +126,18 @@ def get_blocks_data(blocks):
        -- drop aux stuff
        drop table blockstorage_data;
        alter table tx_positions drop constraint aux_tx_positions_unique_constraint cascade;

        -- create indices
        CREATE INDEX IF NOT EXISTS tx_positions_hash_index
            ON tx_positions
            USING hash
            (hash);
        CREATE INDEX IF NOT EXISTS tx_positions_creator_id_asset_index
            ON tx_positions
            (creator_id, asset_id);
        CREATE INDEX IF NOT EXISTS tx_positions_ts_height_index_index
            ON tx_positions
            (ts);
    ''')

    # burrow stuff
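
Taken together, the two diffs implement a multi-producer, single-consumer pipeline: iterate_with_callable fans block loading out across one process per CPU core, each worker pushes (height, index, created_time) tuples onto the Manager queue, and a single submitter process drains the queue in chunks of 1000 until the 'stop' sentinel arrives. A stripped-down, self-contained sketch of that pattern (all names here are illustrative, not taken from the PR):

import itertools
import multiprocessing


def producer(queue, items):
    for item in items:
        queue.put(item)


def consumer(queue):
    BULK_SZ = 3
    # read items until the 'stop' sentinel arrives, flushing in small batches
    data = iter(queue.get, 'stop')
    while True:
        chunk = tuple(itertools.islice(data, BULK_SZ))
        if len(chunk) == 0:
            break
        print('flushing', chunk)


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    queue = manager.Queue()

    submitter = multiprocessing.Process(target=consumer, args=(queue, ))
    submitter.start()

    producers = [
        multiprocessing.Process(target=producer, args=(queue, range(i, 10, 2)))
        for i in range(2)
    ]
    for p in producers:
        p.start()
    for p in producers:
        p.join()

    # all producers are done; tell the consumer to flush and exit
    queue.put('stop')
    submitter.join()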