This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

Refresh streams schema #194

Open · wants to merge 7 commits into master
1 change: 0 additions & 1 deletion LICENSE
@@ -617,4 +617,3 @@ Program, unless a warranty or assumption of liability accompanies a
 copy of the Program in return for a fee.
 
 END OF TERMS AND CONDITIONS
-
4 changes: 2 additions & 2 deletions tap_postgres/__init__.py
@@ -90,8 +90,8 @@ def do_sync_incremental(conn_config, stream, state, desired_columns, md_map):
 
 def sync_method_for_streams(streams, state, default_replication_method):
     """
-Determines the replication method of each stream
-"""
+    Determines the replication method of each stream
+    """
     lookup = {}
     traditional_steams = []
     logical_streams = []
52 changes: 31 additions & 21 deletions tap_postgres/stream_utils.py
@@ -4,7 +4,7 @@
 import singer
 
 from typing import List, Dict
-from singer import metadata
+from singer import metadata as metadata_util
 
 from tap_postgres.db import open_connection
 from tap_postgres.discovery_utils import discover_db
@@ -29,7 +29,7 @@ def is_selected_via_metadata(stream: Dict) -> bool:
 
     Returns: True if selected, False otherwise.
     """
-    table_md = metadata.to_map(stream['metadata']).get((), {})
+    table_md = metadata_util.to_map(stream['metadata']).get((), {})
     return table_md.get('selected', False)
 
 
@@ -72,37 +72,47 @@ def refresh_streams_schema(conn_config: Dict, streams: List[Dict]):
         for stream in discover_db(conn, conn_config.get('filter_schemas'), [st['table_name'] for st in streams])
     }
 
     LOGGER.debug('New discovery schemas %s', new_discovery)
 
-    # For every stream dictionary, update the schema and metadata from the new discovery
+    # For every stream, update the schema and metadata from the corresponding discovered stream
     for idx, stream in enumerate(streams):
-        # update schema
-        streams[idx]['schema'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['schema'])
+        discovered_stream = new_discovery[stream['tap_stream_id']]
 
-        # Update metadata
-        #
-        # 1st step: new discovery doesn't contain non-discoverable metadata: e.g replication method & key, selected
-        # so let's copy those from the original stream object
-        md_map = metadata.to_map(stream['metadata'])
-        meta = md_map.get(())
+        if not stream.get('schema', {}).get('properties'):
+            discovered_schema = copy.deepcopy(discovered_stream['schema'])
+            LOGGER.info('Overriding schema for %s with %s', stream['tap_stream_id'], discovered_schema)
+            streams[idx]['schema'] = discovered_schema
 
-        for idx_met, metadatum in enumerate(new_discovery[stream['tap_stream_id']]['metadata']):
-            if not metadatum['breadcrumb']:
-                meta.update(new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'])
-                new_discovery[stream['tap_stream_id']]['metadata'][idx_met]['metadata'] = meta
-
-        # 2nd step: now copy all the metadata from the updated new discovery to the original stream
-        streams[idx]['metadata'] = copy.deepcopy(new_discovery[stream['tap_stream_id']]['metadata'])
+        streams[idx]['metadata'] = _merge_stream_metadata(stream, discovered_stream)
 
     LOGGER.debug('Updated streams schemas %s', streams)
 
 
+def _merge_stream_metadata(stream, discovered_stream):
+    """
+    Discovered metadata for a stream doesn't contain non-discoverable
+    keys/values such as replication method, key, selected, or any other
+    arbitrary overridden metadata from the catalog file. Merges the discovered
+    metadata into the metadata from the catalog file.
+    """
+    stream_md = metadata_util.to_map(stream['metadata'])
+    discovery_md = metadata_util.to_map(discovered_stream['metadata'])
+
+    for breadcrumb, metadata in discovery_md.items():
+        if breadcrumb in stream_md:
+            stream_md[breadcrumb].update(metadata)
+        else:
+            stream_md[breadcrumb] = metadata
+
+    return copy.deepcopy(metadata_util.to_list(stream_md))
+
+
 def any_logical_streams(streams, default_replication_method):
     """
     Checks if streams list contains any stream with log_based method
     """
     for stream in streams:
-        stream_metadata = metadata.to_map(stream['metadata'])
+        stream_metadata = metadata_util.to_map(stream['metadata'])
         replication_method = stream_metadata.get((), {}).get('replication-method', default_replication_method)
         if replication_method == 'LOG_BASED':
            return True
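Reviewer's note, not part of the diff: a minimal sketch of what the new _merge_stream_metadata does, using singer-python's metadata.to_map/to_list (the same helpers used throughout this diff). Discovered values refresh each breadcrumb, while catalog-only keys such as replication-method and arbitrary field metadata survive. The sample values below are hypothetical.

from singer import metadata as metadata_util

# Metadata as it sits in the catalog file: carries non-discoverable keys.
catalog_md = [
    {'breadcrumb': [], 'metadata': {'replication-method': 'LOG_BASED', 'row-count': 1000}},
    {'breadcrumb': ['properties', 'char_name'], 'metadata': {'arbitrary_field_metadata': 'should be preserved'}},
]

# Metadata as rediscovered from the database: fresh values, plus a new column.
discovered_md = [
    {'breadcrumb': [], 'metadata': {'row-count': 0, 'schema-name': 'public'}},
    {'breadcrumb': ['properties', 'char_name'], 'metadata': {'sql-datatype': 'character'}},
    {'breadcrumb': ['properties', 'json_name'], 'metadata': {'sql-datatype': 'jsonb'}},
]

stream_md = metadata_util.to_map(catalog_md)
discovery_md = metadata_util.to_map(discovered_md)

# The same merge loop as _merge_stream_metadata: discovered keys win per
# breadcrumb, unknown breadcrumbs are added, catalog-only keys remain.
for breadcrumb, md in discovery_md.items():
    if breadcrumb in stream_md:
        stream_md[breadcrumb].update(md)
    else:
        stream_md[breadcrumb] = md

# () now holds replication-method (kept) next to row-count 0 and schema-name
# (refreshed); char_name keeps arbitrary_field_metadata alongside sql-datatype.
print(metadata_util.to_list(stream_md))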
170 changes: 91 additions & 79 deletions tests/test_streams_utils.py
@@ -8,92 +8,104 @@
 from tap_postgres import stream_utils
 
 try:
-    from tests.utils import get_test_connection, ensure_test_table, get_test_connection_config
+    from tests.utils import get_test_connection, ensure_test_table, get_test_connection_config
 except ImportError:
-    from utils import get_test_connection, ensure_test_table, get_test_connection_config
+    from utils import get_test_connection, ensure_test_table, get_test_connection_config
 
 
 def do_not_dump_catalog(catalog):
-    pass
+    pass
 
 
 tap_postgres.dump_catalog = do_not_dump_catalog
 
 
 class TestInit(unittest.TestCase):
-    maxDiff = None
-    table_name = 'CHICKEN TIMES'
-
-    def setUp(self):
-        table_spec = {"columns": [{"name": "id", "type": "integer", "primary_key": True, "serial": True},
-                                  {"name": '"character-varying_name"', "type": "character varying"},
-                                  {"name": '"varchar-name"', "type": "varchar(28)"},
-                                  {"name": 'char_name', "type": "char(10)"},
-                                  {"name": '"text-name"', "type": "text"}],
-                      "name": self.table_name}
-
-        ensure_test_table(table_spec)
-
-    def test_refresh_streams_schema(self):
-        conn_config = get_test_connection_config()
-
-        streams = [
-            {
-                'table_name': self.table_name,
-                'stream': self.table_name,
-                'tap_stream_id': f'public-{self.table_name}',
-                'schema': [],
-                'metadata': [
-                    {
-                        'breadcrumb': [],
-                        'metadata': {
-                            'replication-method': 'LOG_BASED',
-                            'table-key-properties': ['some_id'],
-                            'row-count': 1000,
-                        }
-                    }
-                ]
-            }
-        ]
-
-        stream_utils.refresh_streams_schema(conn_config, streams)
-
-        self.assertEqual(len(streams), 1)
-        self.assertEqual(self.table_name, streams[0].get('table_name'))
-        self.assertEqual(self.table_name, streams[0].get('stream'))
-
-        streams[0]['metadata'].sort(key=lambda md: md['breadcrumb'])
-
-        self.assertEqual(metadata.to_map(streams[0]['metadata']), {
-            (): {'table-key-properties': ['id'],
-                 'database-name': 'postgres',
-                 'schema-name': 'public',
-                 'is-view': False,
-                 'row-count': 0,
-                 'replication-method': 'LOG_BASED'
-                 },
-            ('properties', 'character-varying_name'): {'inclusion': 'available',
-                                                       'sql-datatype': 'character varying',
-                                                       'selected-by-default': True},
-            ('properties', 'id'): {'inclusion': 'automatic',
-                                   'sql-datatype': 'integer',
-                                   'selected-by-default': True},
-            ('properties', 'varchar-name'): {'inclusion': 'available',
-                                             'sql-datatype': 'character varying',
-                                             'selected-by-default': True},
-            ('properties', 'text-name'): {'inclusion': 'available',
-                                          'sql-datatype': 'text',
-                                          'selected-by-default': True},
-            ('properties', 'char_name'): {'selected-by-default': True,
-                                          'inclusion': 'available',
-                                          'sql-datatype': 'character'}})
-
-        self.assertEqual({'properties': {'id': {'type': ['integer'],
-                                                'maximum': 2147483647,
-                                                'minimum': -2147483648},
-                                         'character-varying_name': {'type': ['null', 'string']},
-                                         'varchar-name': {'type': ['null', 'string'], 'maxLength': 28},
-                                         'char_name': {'type': ['null', 'string'], 'maxLength': 10},
-                                         'text-name': {'type': ['null', 'string']}},
-                          'type': 'object',
-                          'definitions': BASE_RECURSIVE_SCHEMAS}, streams[0].get('schema'))
+    maxDiff = None
+    table_name = 'CHICKEN TIMES'
+
+    def setUp(self):
+        table_spec = {"columns": [{"name": "id", "type": "integer", "primary_key": True, "serial": True},
+                                  {"name": '"character-varying_name"', "type": "character varying"},
+                                  {"name": '"varchar-name"', "type": "varchar(28)"},
+                                  {"name": 'char_name', "type": "char(10)"},
+                                  {"name": '"text-name"', "type": "text"},
+                                  {"name": "json_name", "type": "jsonb"}],
+                      "name": self.table_name}
+
+        ensure_test_table(table_spec)
+
+    def test_refresh_streams_schema(self):
+        conn_config = get_test_connection_config()
+
+        streams = [
+            {
+                'table_name': self.table_name,
+                'stream': self.table_name,
+                'tap_stream_id': f'public-{self.table_name}',
+                'schema': {'properties': {'json_name': {'type': ['null', 'string']}}},
+                'metadata': [
+                    {
+                        'breadcrumb': [],
+                        'metadata': {
+                            'replication-method': 'LOG_BASED',
+                            'table-key-properties': ['some_id'],
+                            'row-count': 1000,
+                        }
+                    },
+                    {
+                        'breadcrumb': ['properties', 'char_name'],
+                        'metadata': {
+                            'arbitrary_field_metadata': 'should be preserved'
+                        }
+                    }
+                ]
+            }
+        ]
+
+        stream_utils.refresh_streams_schema(conn_config, streams)
+
+        self.assertEqual(len(streams), 1)
+        self.assertEqual(self.table_name, streams[0].get('table_name'))
+        self.assertEqual(self.table_name, streams[0].get('stream'))
+
+        streams[0]['metadata'].sort(key=lambda md: md['breadcrumb'])
+
+        self.assertEqual(metadata.to_map(streams[0]['metadata']), {
+            (): {'table-key-properties': ['id'],
+                 'database-name': 'postgres',
+                 'schema-name': 'public',
+                 'is-view': False,
+                 'row-count': 0,
+                 'replication-method': 'LOG_BASED'
+                 },
+            ('properties', 'character-varying_name'): {'inclusion': 'available',
+                                                       'sql-datatype': 'character varying',
+                                                       'selected-by-default': True},
+            ('properties', 'id'): {'inclusion': 'automatic',
+                                   'sql-datatype': 'integer',
+                                   'selected-by-default': True},
+            ('properties', 'varchar-name'): {'inclusion': 'available',
+                                             'sql-datatype': 'character varying',
+                                             'selected-by-default': True},
+            ('properties', 'text-name'): {'inclusion': 'available',
+                                          'sql-datatype': 'text',
+                                          'selected-by-default': True},
+            ('properties', 'char_name'): {'selected-by-default': True,
+                                          'inclusion': 'available',
+                                          'sql-datatype': 'character',
+                                          'arbitrary_field_metadata': 'should be preserved'},
+            ('properties', 'json_name'): {'selected-by-default': True,
+                                          'inclusion': 'available',
+                                          'sql-datatype': 'jsonb'}})
+
+        self.assertEqual({'properties': {'id': {'type': ['integer'],
+                                                'maximum': 2147483647,
+                                                'minimum': -2147483648},
+                                         'character-varying_name': {'type': ['null', 'string']},
+                                         'varchar-name': {'type': ['null', 'string'], 'maxLength': 28},
+                                         'char_name': {'type': ['null', 'string'], 'maxLength': 10},
+                                         'text-name': {'type': ['null', 'string']},
+                                         'json_name': {'type': ['null', 'string']}},
+                          'type': 'object',
+                          'definitions': BASE_RECURSIVE_SCHEMAS}, streams[0].get('schema'))
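Reviewer's note on the expected schema above: json_name stays ['null', 'string'] rather than picking up the discovered jsonb schema because the catalog entry already defines schema['properties'], so the override branch in refresh_streams_schema is skipped. A minimal sketch of that guard (should_override_schema is a hypothetical helper name, not part of the tap):

def should_override_schema(stream):
    # Mirrors the condition in refresh_streams_schema: only replace the
    # catalog schema when it defines no explicit column properties.
    return not stream.get('schema', {}).get('properties')

assert should_override_schema({})                              # no schema key at all
assert should_override_schema({'schema': {}})                  # empty schema
assert should_override_schema({'schema': {'properties': {}}})  # no columns listed
assert not should_override_schema(
    {'schema': {'properties': {'json_name': {'type': ['null', 'string']}}}}
)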