Fvk socrata tweaks #1344

Draft · wants to merge 6 commits into main
74 changes: 38 additions & 36 deletions dcpy/connectors/socrata/publish.py
@@ -233,27 +233,21 @@ class RevisionDataSource:
                 "The field names in the uploaded data do not match our metadata"
             )
 
-    uploaded_columns: list[Any]
-    column_update_endpoint: str
     _raw_socrata_source: dict[str, Any] | None = None
 
     @classmethod
     def from_socrata_source(cls, socrata_source):
-        return RevisionDataSource(
-            _raw_socrata_source=socrata_source,
-            # AR Note: It's not clear to me why/when you would have multiple schemas/output
-            # schemas for any one datasource. In any case, I haven't yet encountered it,
-            # and for our use case (single dataset upload per product) I don't expect we will
-            uploaded_columns=socrata_source.attributes["schemas"][0]["output_schemas"][
-                0
-            ]["output_columns"],
-            column_update_endpoint=f"https://{SOCRATA_DOMAIN}"
-            + socrata_source.input_schemas[0].links["transform"],
-        )
+        return RevisionDataSource(_raw_socrata_source=socrata_source)
+
+    def get_latest_output_schema(self):
+        return self._raw_socrata_source.get_latest_input_schema().get_latest_output_schema()
 
     @property
     def column_names(self) -> list[str]:
-        return [c["field_name"] for c in self.uploaded_columns]
+        return [
+            c["field_name"]
+            for c in self.get_latest_output_schema().attributes["output_columns"]
+        ]
 
     def calculate_pushed_col_metadata(self, our_columns: list[md.DatasetColumn]):
         # TODO: using c.id or c.name?
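For context on the lazy lookup above: socrata-py models an upload as a source that owns input schemas, each of which owns output schemas, and `get_latest_output_schema` just walks to the most recent of each. A minimal sketch of that chain, assuming `source` is a socrata-py source returned from an upload (see the fuller revision-lifecycle sketch further down):

```python
# `source` is assumed to be the socrata-py object returned after uploading
# a CSV to a revision. Walk source -> latest input schema -> latest output
# schema, the same chain RevisionDataSource.get_latest_output_schema wraps.
output_schema = source.get_latest_input_schema().get_latest_output_schema()

# The column_names property then reads field names off the schema's columns.
print([c["field_name"] for c in output_schema.attributes["output_columns"]])
```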
@@ -281,39 +275,46 @@ def calculate_pushed_col_metadata(self, our_columns: list[md.DatasetColumn]):
             }
         )
 
-        new_uploaded_cols = []
-        for uploaded_col in self.uploaded_columns:
+        output_schema = self.get_latest_output_schema()
+        print(output_schema)
+        for col in output_schema.attributes["output_columns"]:
             # Take the Socrata metadata for columns that have been uploaded,
             # modify them to match our metadata.
 
             # Input columns need to be matched to what's been uploaded,
             # via the `initial_output_column_id`. Otherwise update
             # requests are ignored.
-            new_col = copy.deepcopy(uploaded_col)
-
-            our_col = our_cols_by_field_name[uploaded_col["field_name"]]
-            our_col_index = list(our_api_names).index(uploaded_col["field_name"])
+            field_name = col["field_name"]
+            our_col = our_cols_by_field_name[field_name]
+            our_col_index = list(our_api_names).index(field_name)
 
-            new_col["position"] = our_col_index + 1
-            new_col["initial_output_column_id"] = new_col["id"]
+            output_schema.change_column_metadata(field_name, "position").to(
+                our_col_index + 1
+            )
+            # .change_column_metadata(col, "initial_output_column_id").to(col.id)
@fvankrieken (Contributor, Author) commented on Dec 23, 2024:
Left this as a comment, but it looks like this is handled under the hood by the socrata-py methods.
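For reference, socrata-py expresses column updates as chained `change_column_metadata(field_name, attribute).to(value)` calls (and `change_column_transform(field_name).to(expr)` for transforms) on an output schema; nothing is sent to Socrata until `.run()`, which applies the queued changes and returns a new output schema. A minimal sketch following the socrata-py README pattern, with placeholder field names `a` and `b`:

```python
# Queue several column edits on an output schema, then apply them in one
# call. `output_schema` is assumed; field names "a" and "b" are placeholders.
new_output_schema = (
    output_schema.change_column_metadata("a", "display_name")
    .to("Column A")
    .change_column_metadata("a", "description")
    .to("the first column")
    .change_column_transform("b")
    .to("to_number(`b`)")
    .run()  # queued changes are only applied here
)
```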


new_col["is_primary_key"] = (
output_schema.change_column_metadata(field_name, "is_primary_key").to(
True if (our_col.checks and our_col.checks.is_primary_key) else False
)

new_col["display_name"] = our_col.name
new_col["description"] = our_col.description
new_uploaded_cols.append(new_col)
output_schema.change_column_metadata(field_name, "display_name").to(
our_col.name
)
output_schema.change_column_metadata(field_name, "description").to(
our_col.description
)
if our_col.custom and our_col.custom.get("api_name"):
default_transform = col["transform"]["transform_expr"]
assert field_name in default_transform
logger.info(f"Mapping column {our_col.id} to {field_name}")
output_schema.change_column_transform(field_name).to(
default_transform.replace(
f"`{field_name}`", f"`{our_col.custom['api_name']}`"
)
)

return new_uploaded_cols
return output_schema

def push_socrata_column_metadata(self, our_cols: list[md.DatasetColumn]):
cols = self.calculate_pushed_col_metadata(our_cols)
return _request(
self.column_update_endpoint,
"POST",
json={"output_columns": cols},
)
new_schema = self.calculate_pushed_col_metadata(our_cols)
return new_schema.run()


@dataclass(frozen=True)
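One consequence of returning the schema builder instead of a raw payload is that the caller controls when the queued changes are applied. A hedged usage sketch, assuming `ds` is a `RevisionDataSource` and `our_cols` is our list of `md.DatasetColumn`; `wait_for_finish` is the socrata-py call that blocks until the schema finishes processing:

```python
# Apply our column metadata, then block until Socrata finishes validating
# the resulting output schema. `ds` and `our_cols` are assumed to exist.
new_schema = ds.push_socrata_column_metadata(our_cols)  # calls .run() internally
new_schema = new_schema.wait_for_finish()  # poll until transforms complete
print([c["field_name"] for c in new_schema.attributes["output_columns"]])
```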
@@ -594,6 +595,7 @@ def push_dataset(
         for attachment_id in socrata_dest.attachment_ids
     ]
 
+    dataset.discard_open_revisions()
@fvankrieken (Contributor, Author) commented:
I should remember to take this out 🙂

     rev = dataset.create_replace_revision()
 
     attachments_metadata = [
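For reference, the socrata-py revision lifecycle that `push_dataset` drives looks roughly like the sketch below (assuming a socrata-py `view`; `data.csv` is a placeholder, and `discard_open_revisions` above is presumably a dcpy helper rather than a socrata-py method):

```python
# Sketch of the replace-revision lifecycle, per the socrata-py README.
revision = view.revisions.create_replace_revision()
upload = revision.create_upload("data.csv")
with open("data.csv", "rb") as f:
    source = upload.csv(f)  # upload and parse the file into a source

output_schema = source.get_latest_input_schema().get_latest_output_schema()
job = revision.apply(output_schema=output_schema)  # publish the revision
job.wait_for_finish()

# A revision can instead be abandoned, which is presumably what the
# temporary discard_open_revisions() call above does for stale revisions:
# revision.discard()
```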
2 changes: 2 additions & 0 deletions dcpy/lifecycle/package/assemble.py
@@ -140,7 +140,9 @@ def pull_destination_files(
     )
     make_package_folder(local_package_path)
     product_metadata.write_to_yaml(local_package_path / "metadata.yml")
+    import ssl
+    ssl._create_default_https_context = ssl._create_unverified_context
     package_ids = {p.id for p in product_metadata.assembly}
     for f in dest.files:
         paths_and_dests = ids_to_paths_and_dests[f.id]
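Worth flagging: assigning `ssl._create_default_https_context = ssl._create_unverified_context` reaches into a private stdlib hook and disables certificate verification for every HTTPS connection in the process. If the workaround has to stay, a narrower variant would build one unverified context and pass it only to the download that needs it; a sketch, with `url` as a placeholder:

```python
import ssl
import urllib.request

url = "https://example.com/file.csv"  # placeholder

# Disable verification for this one request instead of patching the
# process-wide default. Still unsafe, but the blast radius is smaller.
ctx = ssl.create_default_context()
ctx.check_hostname = False  # must be turned off before verify_mode
ctx.verify_mode = ssl.CERT_NONE

with urllib.request.urlopen(url, context=ctx) as resp:
    data = resp.read()
```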