diff --git a/dcpy/connectors/socrata/publish.py b/dcpy/connectors/socrata/publish.py
index cbf768464..ed909b368 100644
--- a/dcpy/connectors/socrata/publish.py
+++ b/dcpy/connectors/socrata/publish.py
@@ -233,27 +233,21 @@ class RevisionDataSource:
                 "The field names in the uploaded data do not match our metadata"
             )
 
-    uploaded_columns: list[Any]
-    column_update_endpoint: str
     _raw_socrata_source: dict[str, Any] | None = None
 
     @classmethod
     def from_socrata_source(cls, socrata_source):
-        return RevisionDataSource(
-            _raw_socrata_source=socrata_source,
-            # AR Note: It's not clear to me why/when you would have multiple schemas/output
-            # schemas for any one datasource. In any case, I haven't yet encountered it,
-            # and for our use case (single dataset upload per product) I don't expect we will
-            uploaded_columns=socrata_source.attributes["schemas"][0]["output_schemas"][
-                0
-            ]["output_columns"],
-            column_update_endpoint=f"https://{SOCRATA_DOMAIN}"
-            + socrata_source.input_schemas[0].links["transform"],
-        )
+        return RevisionDataSource(_raw_socrata_source=socrata_source)
+
+    def get_latest_output_schema(self):
+        return self._raw_socrata_source.get_latest_input_schema().get_latest_output_schema()
 
     @property
     def column_names(self) -> list[str]:
-        return [c["field_name"] for c in self.uploaded_columns]
+        return [
+            c["field_name"]
+            for c in self.get_latest_output_schema().attributes["output_columns"]
+        ]
 
     def calculate_pushed_col_metadata(self, our_columns: list[md.DatasetColumn]):
         # TODO: using c.id or c.name?
@@ -281,39 +275,46 @@ def calculate_pushed_col_metadata(self, our_columns: list[md.DatasetColumn]):
                 }
             )
 
 
-        new_uploaded_cols = []
-        for uploaded_col in self.uploaded_columns:
+        output_schema = self.get_latest_output_schema()
+        print(output_schema)
+        for col in output_schema.attributes["output_columns"]:
             # Take the Socrata metadata for columns that have been uploaded,
             # modify them to match our metadata.
-            # Input columns need to be matched to what's been uploaded,
-            # via the `initial_output_column_id`. Otherwise update
-            # requests are ignored.
-            new_col = copy.deepcopy(uploaded_col)
-
-            our_col = our_cols_by_field_name[uploaded_col["field_name"]]
-            our_col_index = list(our_api_names).index(uploaded_col["field_name"])
+            field_name = col["field_name"]
+            our_col = our_cols_by_field_name[field_name]
+            our_col_index = list(our_api_names).index(field_name)
 
-            new_col["position"] = our_col_index + 1
-            new_col["initial_output_column_id"] = new_col["id"]
+            output_schema.change_column_metadata(field_name, "position").to(
+                our_col_index + 1
+            )
+            # .change_column_metadata(col, "initial_output_column_id").to(col.id)
 
-            new_col["is_primary_key"] = (
+            output_schema.change_column_metadata(field_name, "is_primary_key").to(
                 True if (our_col.checks and our_col.checks.is_primary_key) else False
             )
 
-            new_col["display_name"] = our_col.name
-            new_col["description"] = our_col.description
-            new_uploaded_cols.append(new_col)
+            output_schema.change_column_metadata(field_name, "display_name").to(
+                our_col.name
+            )
+            output_schema.change_column_metadata(field_name, "description").to(
+                our_col.description
+            )
+            if our_col.custom and our_col.custom.get("api_name"):
+                default_transform = col["transform"]["transform_expr"]
+                assert field_name in default_transform
+                logger.info(f"Mapping column {our_col.id} to {field_name}")
+                output_schema.change_column_transform(field_name).to(
+                    default_transform.replace(
+                        f"`{field_name}`", f"`{our_col.custom['api_name']}`"
+                    )
+                )
 
-        return new_uploaded_cols
+        return output_schema
 
     def push_socrata_column_metadata(self, our_cols: list[md.DatasetColumn]):
-        cols = self.calculate_pushed_col_metadata(our_cols)
-        return _request(
-            self.column_update_endpoint,
-            "POST",
-            json={"output_columns": cols},
-        )
+        new_schema = self.calculate_pushed_col_metadata(our_cols)
+        return new_schema.run()
 
 
 @dataclass(frozen=True)
@@ -594,6 +595,7 @@ def push_dataset(
         for attachment_id in socrata_dest.attachment_ids
     ]
 
+    dataset.discard_open_revisions()
     rev = dataset.create_replace_revision()
 
     attachments_metadata = [
diff --git a/dcpy/lifecycle/package/assemble.py b/dcpy/lifecycle/package/assemble.py
index b686d488e..12ba0b2a5 100644
--- a/dcpy/lifecycle/package/assemble.py
+++ b/dcpy/lifecycle/package/assemble.py
@@ -140,7 +140,9 @@ def pull_destination_files(
     )
     make_package_folder(local_package_path)
     product_metadata.write_to_yaml(local_package_path / "metadata.yml")
+    import ssl
+    ssl._create_default_https_context = ssl._create_unverified_context
 
     package_ids = {p.id for p in product_metadata.assembly}
     for f in dest.files:
         paths_and_dests = ids_to_paths_and_dests[f.id]
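
For reference, a minimal sketch of how the revised column-metadata flow hangs together end to end, built only from the calls this patch introduces (discard_open_revisions, create_replace_revision, from_socrata_source, get_latest_output_schema, the change_column_metadata(...).to(...) builder, and run()); the dataset handle, the upload_csv_to helper, the field name, and the metadata values are hypothetical stand-ins, not part of the patch:

    # Hypothetical wiring; mirrors the calls added in publish.py above.
    dataset.discard_open_revisions()  # drop any stale open revisions first
    rev = dataset.create_replace_revision()
    source = upload_csv_to(rev)  # placeholder for the existing upload step

    ds = RevisionDataSource.from_socrata_source(source)
    output_schema = ds.get_latest_output_schema()

    # Metadata changes queue up on the output schema via the fluent builder
    # and are applied to Socrata in a single run(), replacing the old
    # hand-rolled POST against the transform endpoint.
    output_schema.change_column_metadata("boro_code", "display_name").to("Borough Code")
    output_schema.change_column_metadata("boro_code", "description").to(
        "NYC borough code"
    )
    output_schema.run()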