From 954bf8efac3be9384a4a67275cc6893e983e0206 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Mon, 11 Nov 2024 15:41:54 -0500 Subject: [PATCH 1/2] Rename un/merge, allow groupby listify --- docs/demo/akimbo-demo.ipynb | 8 +++---- docs/quickstart.ipynb | 6 ++--- example/cudf-ak.ipynb | 2 +- src/akimbo/cudf.py | 5 +++++ src/akimbo/io.py | 2 +- src/akimbo/mixin.py | 44 +++++++++++++++++++++++++++---------- src/akimbo/pandas.py | 6 ++--- src/akimbo/polars.py | 5 ++--- src/akimbo/utils.py | 9 ++++++++ tests/test_polars.py | 6 ++--- 10 files changed, 63 insertions(+), 30 deletions(-) diff --git a/docs/demo/akimbo-demo.ipynb b/docs/demo/akimbo-demo.ipynb index f06ae2f..7b992c1 100644 --- a/docs/demo/akimbo-demo.ipynb +++ b/docs/demo/akimbo-demo.ipynb @@ -945,7 +945,7 @@ "id": "5c253d82-c1bf-4535-9e61-fa4b8bde799c", "metadata": {}, "source": [ - "We can convert back to an awkward Record array with merge:" + "We can convert back to an awkward Record array with pack:" ] }, { @@ -955,7 +955,7 @@ "metadata": {}, "outputs": [], "source": [ - "merged = akimbo.merge(df[[\"run\", \"luminosityBlock\"]])" + "merged = akimbo.pack(df[[\"run\", \"luminosityBlock\"]])" ] }, { @@ -1024,7 +1024,7 @@ } ], "source": [ - "ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")" + "ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")" ] }, { @@ -1051,7 +1051,7 @@ } ], "source": [ - "ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")" + "ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")" ] }, { diff --git a/docs/quickstart.ipynb b/docs/quickstart.ipynb index 48a5983..2046468 100644 --- a/docs/quickstart.ipynb +++ b/docs/quickstart.ipynb @@ -351,7 +351,7 @@ "id": "d60887e8-582b-474f-a79f-173bc62c4bd1", "metadata": {}, "source": [ - "Of these, two are \"normal\" fields - they can be 
made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unmerge``." + "Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unpack``." ] }, { @@ -361,7 +361,7 @@ "metadata": {}, "outputs": [], "source": [ - "df = data.ak.unmerge()" + "df = data.ak.unpack()" ] }, { @@ -591,7 +591,7 @@ "metadata": {}, "outputs": [], "source": [ - "s = df.ak.merge()" + "s = df.ak.pack()" ] }, { diff --git a/example/cudf-ak.ipynb b/example/cudf-ak.ipynb index db5008d..8d82eba 100644 --- a/example/cudf-ak.ipynb +++ b/example/cudf-ak.ipynb @@ -273,7 +273,7 @@ " 'typestr',\n", " 'typetracer',\n", " 'unflatten',\n", - " 'unmerge',\n", + " 'unpack',\n", " 'unzip',\n", " 'validity_error',\n", " 'values_astype',\n", diff --git a/src/akimbo/cudf.py b/src/akimbo/cudf.py index 6ef07a0..c9df0b0 100644 --- a/src/akimbo/cudf.py +++ b/src/akimbo/cudf.py @@ -103,6 +103,11 @@ class CudfAwkwardAccessor(Accessor): series_type = Series dataframe_type = DataFrame + @classmethod + def _arrow_to_series(cls, data): + # this implies CPU->GPU copy + return Series(data) + @classmethod def _to_output(cls, arr): if isinstance(arr, ak.Array): diff --git a/src/akimbo/io.py b/src/akimbo/io.py index 9c75230..afaf496 100644 --- a/src/akimbo/io.py +++ b/src/akimbo/io.py @@ -26,7 +26,7 @@ def ak_to_series(ds, backend="pandas", extract=True): else: raise ValueError("Backend must be in {'pandas', 'polars', 'dask'}") if extract and ds.fields: - return s.ak.unmerge() + return s.ak.unpack() return s diff --git a/src/akimbo/mixin.py b/src/akimbo/mixin.py index c123ab6..184281f 100644 --- a/src/akimbo/mixin.py +++ b/src/akimbo/mixin.py @@ -8,14 +8,14 @@ import pyarrow.compute as pc from akimbo.apply_tree import dec, match_any, numeric, run_with_transform -from akimbo.utils import to_ak_layout +from akimbo.utils import rec_list_swap, to_ak_layout methods = 
[ _ for _ in (dir(ak)) if not _.startswith(("_", "ak_")) and not _[0].isupper() ] + ["apply", "array", "explode", "dt", "str"] -df_methods = sorted(methods + ["merge"]) -series_methods = sorted(methods + ["unmerge"]) +df_methods = sorted(methods + ["pack"]) +series_methods = sorted(methods + ["unpack"]) def radd(left, right): @@ -165,11 +165,19 @@ def is_dataframe(cls, data): return isinstance(data, cls.dataframe_type) @classmethod - def _to_output(cls, data): - # TODO: clarify protocol here; can data be in arrow already? + def _arrow_to_series(cls, data): + """How to make a series from arrow data""" raise NotImplementedError + @classmethod + def _to_output(cls, data): + """How to make a series from ak or arrow""" + if isinstance(data, ak.Array): + data = ak.to_arrow(data, extensionarray=False) + return cls._arrow_to_series(data) + def to_output(self, data=None): + """Data returned as a series""" data = data if data is not None else self.array if not isinstance(data, Iterable): return data @@ -313,26 +321,40 @@ def rename(self, where, to): parent.fields[this] = to return self.to_output(ak.Array(lay)) - def merge(self): + def pack(self): """Make a single complex series out of the columns of a dataframe""" if not self.is_dataframe(self._obj): - raise ValueError("Can only merge on a dataframe") + raise ValueError("Can only pack on a dataframe") out = {} for k in self._obj.columns: - # TODO: partial merge when column names are like "record.field" + # TODO: partial pack when column names are like "record.field" out[k] = self._obj[k].ak.array arr = ak.Array(out) return self.to_output(arr) - def unmerge(self): + def unpack(self): """Make dataframe out of a series of record type""" arr = self.array if not arr.fields: raise ValueError("Not array-of-records") - # TODO: partial unmerge when (some) fields are records + # TODO: partial unpack when (some) fields are records out = {k: self.to_output(arr[k]) for k in arr.fields} return self.dataframe_type(out) + def 
group_lists(self, *cols, outname="grouped"): + # TODO: this does not work on cuDF as here we use arrow directly + cols = list(cols) + outcols = [(_, "list") for _ in self.arrow.column_names if _ not in cols] + outcols2 = [f"{_[0]}_list" for _ in outcols] + grouped = self.arrow.group_by(cols).aggregate(outcols) + akarr = ak.from_arrow(grouped) + akarr2 = akarr[outcols2] + akarr2.layout._fields = [_[0] for _ in outcols] + struct = rec_list_swap(akarr2) + final = ak.with_field(akarr[cols], struct, outname) + + return self._to_output(final).ak.unpack() + def join( self, other, @@ -394,7 +416,7 @@ def f(self, *args, **kw): **kw, ) if isinstance(self._obj, self.dataframe_type): - return out.ak.unmerge() + return out.ak.unpack() return out return f diff --git a/src/akimbo/pandas.py b/src/akimbo/pandas.py index 7946375..996e6ae 100644 --- a/src/akimbo/pandas.py +++ b/src/akimbo/pandas.py @@ -35,10 +35,8 @@ def to_arrow(cls, data): return pa.table(data) @classmethod - def _to_output(cls, data): - return pd.Series( - pd.arrays.ArrowExtensionArray(ak.to_arrow(data, extensionarray=False)) - ) + def _arrow_to_series(cls, data): + return pd.Series(pd.arrays.ArrowExtensionArray(data)) def to_output(self, data=None): # override to apply index diff --git a/src/akimbo/polars.py b/src/akimbo/polars.py index ffc3811..b7a7013 100644 --- a/src/akimbo/polars.py +++ b/src/akimbo/polars.py @@ -1,4 +1,3 @@ -import awkward as ak import polars as pl from akimbo.mixin import Accessor @@ -13,8 +12,8 @@ class PolarsAwkwardAccessor(Accessor): dataframe_type = pl.DataFrame @classmethod - def _to_output(cls, arr): - return pl.from_arrow(ak.to_arrow(arr, extensionarray=False)) + def _arrow_to_series(cls, arr): + return pl.from_arrow(arr) @classmethod def to_arrow(cls, data): diff --git a/src/akimbo/utils.py b/src/akimbo/utils.py index a5f1b89..de0dfce 100644 --- a/src/akimbo/utils.py +++ b/src/akimbo/utils.py @@ -40,3 +40,12 @@ def to_ak_layout(ar): def match_string(*layout): return 
layout[0].is_list and layout[0].parameter("__array__") == "string" + + +def rec_list_swap(arr: ak.Array, field: str | None = None) -> ak.Array: + """Make a record-of-lists into a list-of-records, assuming the lists have the same lengths""" + record_of_lists = arr[field] if field else arr + list_of_records = ak.zip( + dict(zip(ak.fields(record_of_lists), ak.unzip(record_of_lists))), depth_limit=2 + ) + return ak.with_field(arr, list_of_records, field) if field else list_of_records diff --git a/tests/test_polars.py b/tests/test_polars.py index e4180fc..9a80bf4 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -28,16 +28,16 @@ def test_apply_where(): assert s2[0] == {"a": [-1, -2, -3], "b": [1, 2, 3]} -def test_merge_unmerge(): +def test_pack_unpack(): data = [ {"a": [1, 2, 3], "b": [1, 2, 3]}, {"a": [1, 2, 3], "b": [1, 2, 3]}, {"a": [1, 2, 3], "b": [1, 2, 3]}, ] s = pl.Series(data) - df = s.ak.unmerge() + df = s.ak.unpack() assert df["a"].to_list() == [[1, 2, 3]] * 3 - s2 = df.ak.merge() + s2 = df.ak.pack() assert s.to_list() == s2.to_list() From f80f4437dd55fa308e5d0ca3b907023cfe17a02f Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 12 Nov 2024 10:39:22 -0500 Subject: [PATCH 2/2] Name "unexplode" and add tests --- src/akimbo/mixin.py | 22 ++++++++++++++++++---- src/akimbo/polars.py | 4 ++++ tests/test_pandas.py | 39 +++++++++++++++++++++++++++++++++++++++ tests/test_polars.py | 39 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 4 deletions(-) diff --git a/src/akimbo/mixin.py b/src/akimbo/mixin.py index 184281f..227a52b 100644 --- a/src/akimbo/mixin.py +++ b/src/akimbo/mixin.py @@ -334,19 +334,33 @@ def pack(self): def unpack(self): """Make dataframe out of a series of record type""" + # TODO: what to do when passed a dataframe, partial unpack of record fields? 
arr = self.array if not arr.fields: raise ValueError("Not array-of-records") - # TODO: partial unpack when (some) fields are records out = {k: self.to_output(arr[k]) for k in arr.fields} return self.dataframe_type(out) - def group_lists(self, *cols, outname="grouped"): + def unexplode(self, *cols, outname="grouped"): + """Repack "exploded" form dataframes into lists of structs + + This is the inverse of the regular dataframe explode() process. + """ # TODO: this does not work on cuDF as here we use arrow directly + # TODO: pandas indexes are pre-grouped cat-like structures cols = list(cols) - outcols = [(_, "list") for _ in self.arrow.column_names if _ not in cols] + arr = self.arrow + if set(cols) - set(arr.column_names): + raise ValueError( + "One or more grouping column (%s) not in available columns %s", + cols, + arr.column_names, + ) + outcols = [(_, "list") for _ in arr.column_names if _ not in cols] + if not outcols: + raise ValueError("Cannot group on all available columns") outcols2 = [f"{_[0]}_list" for _ in outcols] - grouped = self.arrow.group_by(cols).aggregate(outcols) + grouped = arr.group_by(cols).aggregate(outcols) akarr = ak.from_arrow(grouped) akarr2 = akarr[outcols2] akarr2.layout._fields = [_[0] for _ in outcols] diff --git a/src/akimbo/polars.py b/src/akimbo/polars.py index b7a7013..db20ab9 100644 --- a/src/akimbo/polars.py +++ b/src/akimbo/polars.py @@ -18,3 +18,7 @@ def _arrow_to_series(cls, arr): @classmethod def to_arrow(cls, data): return data.to_arrow() + + def pack(self): + # polars already implements this directly + return self._obj.to_struct() diff --git a/tests/test_pandas.py b/tests/test_pandas.py index 2580e59..4c3a22a 100644 --- a/tests/test_pandas.py +++ b/tests/test_pandas.py @@ -122,3 +122,42 @@ def test_rename(): s2 = s.ak.rename(("a", "b", "c"), "d") assert s2.tolist() == [{"a": [{"b": {"d": 0}}] * 2}] * 3 + + +def test_unexplode(): + df = pd.DataFrame( + { + "x": [1, 1, 1, 2, 1, 3, 3, 1], + "y": [1, 1, 1, 2, 1, 3, 3, 1], + 
"z": [1, 1, 1, 2, 1, 3, 3, 2], + } + ) + out = df.ak.unexplode("x") + compact = out["grouped"].tolist() + expected = [ + [ + {"y": 1, "z": 1}, + {"y": 1, "z": 1}, + {"y": 1, "z": 1}, + {"y": 1, "z": 1}, + {"y": 1, "z": 2}, + ], + [{"y": 2, "z": 2}], + [{"y": 3, "z": 3}, {"y": 3, "z": 3}], + ] + assert compact == expected + + out = df.ak.unexplode("x", "y") + compact = out["grouped"].tolist() + expected = [ + [{"z": 1}, {"z": 1}, {"z": 1}, {"z": 1}, {"z": 2}], + [{"z": 2}], + [{"z": 3}, {"z": 3}], + ] + assert compact == expected + + with pytest.raises(ValueError): + df.ak.unexplode("x", "y", "z") + + with pytest.raises(ValueError): + df.ak.unexplode("unknown") diff --git a/tests/test_polars.py b/tests/test_polars.py index 9a80bf4..d2d0f23 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -58,3 +58,42 @@ def test_ufunc(): df = pl.DataFrame({"a": s}) df2 = df.ak + 1 assert df2["a"].to_list() == [[2, 3, 4], [], [5, 6]] + + +def test_unexplode(): + df = pl.DataFrame( + { + "x": [1, 1, 1, 2, 1, 3, 3, 1], + "y": [1, 1, 1, 2, 1, 3, 3, 1], + "z": [1, 1, 1, 2, 1, 3, 3, 2], + } + ) + out = df.ak.unexplode("x") + compact = out["grouped"].to_list() + expected = [ + [ + {"y": 1, "z": 1}, + {"y": 1, "z": 1}, + {"y": 1, "z": 1}, + {"y": 1, "z": 1}, + {"y": 1, "z": 2}, + ], + [{"y": 2, "z": 2}], + [{"y": 3, "z": 3}, {"y": 3, "z": 3}], + ] + assert compact == expected + + out = df.ak.unexplode("x", "y") + compact = out["grouped"].to_list() + expected = [ + [{"z": 1}, {"z": 1}, {"z": 1}, {"z": 1}, {"z": 2}], + [{"z": 2}], + [{"z": 3}, {"z": 3}], + ] + assert compact == expected + + with pytest.raises(ValueError): + df.ak.unexplode("x", "y", "z") + + with pytest.raises(ValueError): + df.ak.unexplode("unknown")