Skip to content

Commit

Permalink
Synchronise with dask-expr, newer Dask and newer deltalake (#69)
Browse files Browse the repository at this point in the history
Co-authored-by: Patrick Hoefler <[email protected]>
  • Loading branch information
fjetter and phofl authored Jul 17, 2024
1 parent 4533f53 commit 7706c22
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
strategy:
matrix:
os: ["windows-latest", "ubuntu-latest", "macos-latest"]
python-version: ["3.9", "3.10", "3.11"]
python-version: ["3.9", "3.10", "3.11", "3.12"]

steps:
- name: Checkout source
Expand Down
9 changes: 9 additions & 0 deletions continous_integeration/environment-3.12.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: test-environment
channels:
- conda-forge
dependencies:
- python=3.12
- dask
- pyarrow
- pytest
- pytest-cov
7 changes: 3 additions & 4 deletions dask_deltatable/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def _read_from_filesystem(
storage_options: dict[str, str] | None = None,
delta_storage_options: dict[str, str] | None = None,
**kwargs: dict[str, Any],
) -> dd.core.DataFrame:
) -> dd.DataFrame:
"""
Reads the list of parquet files in parallel
"""
Expand Down Expand Up @@ -123,6 +123,7 @@ def _read_from_filesystem(
if not dd._dask_expr_enabled():
# Setting token not supported in dask-expr
kwargs["token"] = tokenize(path, fs_token, **kwargs) # type: ignore

return dd.from_map(
_read_delta_partition,
pq_files,
Expand Down Expand Up @@ -151,9 +152,7 @@ def _get_type_mapper(
)


def _read_from_catalog(
database_name: str, table_name: str, **kwargs
) -> dd.core.DataFrame:
def _read_from_catalog(database_name: str, table_name: str, **kwargs) -> dd.DataFrame:
if ("AWS_ACCESS_KEY_ID" not in os.environ) and (
"AWS_SECRET_ACCESS_KEY" not in os.environ
):
Expand Down
53 changes: 28 additions & 25 deletions dask_deltatable/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@
from pathlib import Path
from typing import Any, Literal

import dask
import dask.dataframe as dd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.fs as pa_fs
from dask.core import flatten
from dask.dataframe.core import Scalar
from dask.highlevelgraph import HighLevelGraph
from deltalake import DeltaTable

try:
Expand All @@ -31,6 +30,7 @@
DeltaStorageHandler,
__enforce_append_only,
get_file_stats_from_metadata,
get_num_idx_cols_and_stats_columns,
get_partitions_from_path,
try_get_table_and_table_uri,
write_deltalake_pyarrow,
Expand Down Expand Up @@ -197,7 +197,6 @@ def to_deltalake(
if mode == "overwrite":
# FIXME: There are a couple of checks that are not migrated yet
raise NotImplementedError("mode='overwrite' is not implemented")

written = df.map_partitions(
_write_partition,
schema=schema,
Expand All @@ -211,27 +210,24 @@ def to_deltalake(
filesystem=filesystem,
max_partitions=max_partitions,
meta=(None, object),
table=table,
configuration=configuration,
)
final_name = "delta-commit"
dsk = {
(final_name, 0): (
_commit,
table,
written.__dask_keys__(),
table_uri,
schema,
mode,
partition_by,
name,
description,
configuration,
storage_options,
partition_filters,
custom_metadata,
)
}
graph = HighLevelGraph.from_collections(final_name, dsk, dependencies=(written,)) # type: ignore
result = Scalar(graph, final_name, "")
result = dask.delayed(_commit, name="deltatable-commit")(
table,
written,
table_uri,
schema,
mode,
partition_by,
name,
description,
configuration,
storage_options,
partition_filters,
custom_metadata,
)

if compute:
result = result.compute()
return result
Expand All @@ -258,7 +254,7 @@ def _commit(
if schema:
schemas.append(schema)

# TODO: This is applying a potentially stricted schema control than what
# TODO: This is applying a potentially stricter schema control than what
# Delta requires but if this passes, it should be good to go
schema = validate_compatible(schemas)
assert schema
Expand Down Expand Up @@ -300,6 +296,8 @@ def _write_partition(
max_rows_per_group,
filesystem,
max_partitions,
table,
configuration,
) -> tuple[pa.Schema, list[AddAction]]:
if schema is None:
#
Expand All @@ -309,8 +307,13 @@ def _write_partition(
add_actions: list[AddAction] = []

def visitor(written_file: Any) -> None:
num_indexed_cols, stats_cols = get_num_idx_cols_and_stats_columns(
table._table if table is not None else None, configuration
)
path, partition_values = get_partitions_from_path(written_file.path)
stats = get_file_stats_from_metadata(written_file.metadata)
stats = get_file_stats_from_metadata(
written_file.metadata, num_indexed_cols, stats_cols
)

# PyArrow added support for written_file.size in 9.0.0
if PYARROW_MAJOR_VERSION >= 9:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ profile = "black"
add_imports = ["from __future__ import annotations"]

[tool.black]
target-version = ['py38']
target-version = ['py310']
include = '\.pyi?$'
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
dask[dataframe]
deltalake>=0.16
deltalake>=0.18
fsspec
pyarrow

0 comments on commit 7706c22

Please sign in to comment.