Rename un/merge, add unexplode #84

Merged: 2 commits, Nov 13, 2024

Changes from all commits
8 changes: 4 additions & 4 deletions docs/demo/akimbo-demo.ipynb
@@ -945,7 +945,7 @@
"id": "5c253d82-c1bf-4535-9e61-fa4b8bde799c",
"metadata": {},
"source": [
"We can convert back to an awkward Record array with merge:"
"We can convert back to an awkward Record array with pack:"
]
},
{
@@ -955,7 +955,7 @@
"metadata": {},
"outputs": [],
"source": [
"merged = akimbo.merge(df[[\"run\", \"luminosityBlock\"]])"
"merged = akimbo.pack(df[[\"run\", \"luminosityBlock\"]])"
]
},
{
@@ -1024,7 +1024,7 @@
}
],
"source": [
"ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")"
"ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")"
]
},
{
@@ -1051,7 +1051,7 @@
}
],
"source": [
"ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")"
"ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")"
]
},
{
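For orientation, a minimal sketch of the renamed call used in this notebook, assuming the pandas accessor has been registered (e.g. via `import akimbo.pandas`): `akimbo.pack` (formerly `akimbo.merge`) packs the selected columns into a single series of records.

```python
import pandas as pd

import akimbo
import akimbo.pandas  # noqa: F401, registers the .ak accessor

df = pd.DataFrame({"run": [1, 1, 2], "luminosityBlock": [10, 11, 12]})
packed = akimbo.pack(df[["run", "luminosityBlock"]])  # one series of records
print(packed.tolist())  # [{'run': 1, 'luminosityBlock': 10}, ...]
```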
6 changes: 3 additions & 3 deletions docs/quickstart.ipynb
@@ -351,7 +351,7 @@
"id": "d60887e8-582b-474f-a79f-173bc62c4bd1",
"metadata": {},
"source": [
"Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unmerge``."
"Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unpack``."
]
},
{
@@ -361,7 +361,7 @@
"metadata": {},
"outputs": [],
"source": [
"df = data.ak.unmerge()"
"df = data.ak.unpack()"
]
},
{
@@ -591,7 +591,7 @@
"metadata": {},
"outputs": [],
"source": [
"s = df.ak.merge()"
"s = df.ak.pack()"
]
},
{
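The quickstart rename amounts to a round trip like this minimal sketch (pandas backend assumed): `unpack` splits the top-level record fields into columns, and `pack` reassembles them.

```python
import pandas as pd

import akimbo.pandas  # noqa: F401, registers the .ak accessor

df = pd.DataFrame({"a": [1, 2], "b": [[1, 2], []]})
s = df.ak.pack()      # single series of records with fields "a" and "b"
df2 = s.ak.unpack()   # back to one column per record field
assert list(df2.columns) == ["a", "b"]
```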
2 changes: 1 addition & 1 deletion example/cudf-ak.ipynb
@@ -273,7 +273,7 @@
" 'typestr',\n",
" 'typetracer',\n",
" 'unflatten',\n",
" 'unmerge',\n",
" 'unpack',\n",
" 'unzip',\n",
" 'validity_error',\n",
" 'values_astype',\n",
5 changes: 5 additions & 0 deletions src/akimbo/cudf.py
@@ -103,6 +103,11 @@ class CudfAwkwardAccessor(Accessor):
    series_type = Series
    dataframe_type = DataFrame

+    @classmethod
+    def _arrow_to_series(cls, data):
+        # this implies CPU->GPU copy
+        return Series(data)
+
    @classmethod
    def _to_output(cls, arr):
        if isinstance(arr, ak.Array):
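With the refactor in this PR, each backend only supplies `_arrow_to_series`; the shared `_to_output` (see mixin.py below) converts awkward to arrow first. For cuDF that wrap is a host-to-device copy. A hypothetical sketch, assuming a CUDA-capable environment with cudf installed:

```python
import pyarrow as pa

from akimbo.cudf import CudfAwkwardAccessor

host_data = pa.array([[1, 2], [3]])  # arrow buffers live in CPU memory
gpu_series = CudfAwkwardAccessor._arrow_to_series(host_data)  # copies to the GPU
```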
2 changes: 1 addition & 1 deletion src/akimbo/io.py
@@ -26,7 +26,7 @@ def ak_to_series(ds, backend="pandas", extract=True):
    else:
        raise ValueError("Backend must be in {'pandas', 'polars', 'dask'}")
    if extract and ds.fields:
-        return s.ak.unmerge()
+        return s.ak.unpack()
    return s


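After the rename, `ak_to_series` with `extract=True` returns an unpacked dataframe whenever the awkward data is record-typed; a minimal sketch with the pandas backend:

```python
import awkward as ak

from akimbo.io import ak_to_series

arr = ak.Array([{"x": 1, "y": [1, 2]}, {"x": 2, "y": []}])
df = ak_to_series(arr, backend="pandas", extract=True)   # record fields become columns
s = ak_to_series(arr, backend="pandas", extract=False)   # one packed series
```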
58 changes: 47 additions & 11 deletions src/akimbo/mixin.py
@@ -8,14 +8,14 @@
import pyarrow.compute as pc

from akimbo.apply_tree import dec, match_any, numeric, run_with_transform
-from akimbo.utils import to_ak_layout
+from akimbo.utils import rec_list_swap, to_ak_layout

methods = [
    _ for _ in (dir(ak)) if not _.startswith(("_", "ak_")) and not _[0].isupper()
] + ["apply", "array", "explode", "dt", "str"]

-df_methods = sorted(methods + ["merge"])
-series_methods = sorted(methods + ["unmerge"])
+df_methods = sorted(methods + ["pack"])
+series_methods = sorted(methods + ["unpack"])


def radd(left, right):
@@ -165,11 +165,19 @@ def is_dataframe(cls, data):
        return isinstance(data, cls.dataframe_type)

    @classmethod
-    def _to_output(cls, data):
-        # TODO: clarify protocol here; can data be in arrow already?
+    def _arrow_to_series(cls, data):
+        """How to make a series from arrow data"""
        raise NotImplementedError

+    @classmethod
+    def _to_output(cls, data):
+        """How to make a series from ak or arrow"""
+        if isinstance(data, ak.Array):
+            data = ak.to_arrow(data, extensionarray=False)
+        return cls._arrow_to_series(data)
+
    def to_output(self, data=None):
+        """Data returned as a series"""
        data = data if data is not None else self.array
        if not isinstance(data, Iterable):
            return data
@@ -313,26 +313,54 @@ def rename(self, where, to):
            parent.fields[this] = to
        return self.to_output(ak.Array(lay))

-    def merge(self):
+    def pack(self):
        """Make a single complex series out of the columns of a dataframe"""
        if not self.is_dataframe(self._obj):
-            raise ValueError("Can only merge on a dataframe")
+            raise ValueError("Can only pack on a dataframe")
        out = {}
        for k in self._obj.columns:
-            # TODO: partial merge when column names are like "record.field"
+            # TODO: partial pack when column names are like "record.field"
            out[k] = self._obj[k].ak.array
        arr = ak.Array(out)
        return self.to_output(arr)

-    def unmerge(self):
+    def unpack(self):
        """Make dataframe out of a series of record type"""
+        # TODO: what to do when passed a dataframe, partial unpack of record fields?
        arr = self.array
        if not arr.fields:
            raise ValueError("Not array-of-records")
-        # TODO: partial unmerge when (some) fields are records
        out = {k: self.to_output(arr[k]) for k in arr.fields}
        return self.dataframe_type(out)
+    def unexplode(self, *cols, outname="grouped"):
+        """Repack "exploded" dataframes into lists of structs
+
+        This is the inverse of the regular dataframe explode() process.
+        """
+        # TODO: this does not work on cuDF, as here we use arrow directly
+        # TODO: pandas indexes are pre-grouped cat-like structures
+        cols = list(cols)
+        arr = self.arrow
+        if set(cols) - set(arr.column_names):
+            raise ValueError(
+                f"One or more grouping columns ({cols}) not in "
+                f"available columns {arr.column_names}"
+            )
+        outcols = [(_, "list") for _ in arr.column_names if _ not in cols]
+        if not outcols:
+            raise ValueError("Cannot group on all available columns")
+        outcols2 = [f"{_[0]}_list" for _ in outcols]
+        grouped = arr.group_by(cols).aggregate(outcols)
+        akarr = ak.from_arrow(grouped)
+        akarr2 = akarr[outcols2]
+        akarr2.layout._fields = [_[0] for _ in outcols]
+        struct = rec_list_swap(akarr2)
+        final = ak.with_field(akarr[cols], struct, outname)
+
+        return self._to_output(final).ak.unpack()
+
    def join(
        self,
        other,
@@ -394,7 +430,7 @@ def f(self, *args, **kw):
                **kw,
            )
            if isinstance(self._obj, self.dataframe_type):
-                return out.ak.unmerge()
+                return out.ak.unpack()
            return out

        return f
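In use, `unexplode` inverts `DataFrame.explode`: it groups by the given columns via arrow's `group_by`/`aggregate`, then `rec_list_swap` turns the resulting record-of-lists into one list-of-structs column. A minimal sketch with pandas (group order assumed to follow first appearance, as in the tests below):

```python
import pandas as pd

import akimbo.pandas  # noqa: F401, registers the .ak accessor

df = pd.DataFrame({"x": [1, 1, 2], "y": [10, 11, 12]})
out = df.ak.unexplode("x")
# one row per distinct "x"; the remaining columns become lists of structs
print(out["grouped"].tolist())  # [[{'y': 10}, {'y': 11}], [{'y': 12}]]
```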
6 changes: 2 additions & 4 deletions src/akimbo/pandas.py
@@ -35,10 +35,8 @@ def to_arrow(cls, data):
        return pa.table(data)

    @classmethod
-    def _to_output(cls, data):
-        return pd.Series(
-            pd.arrays.ArrowExtensionArray(ak.to_arrow(data, extensionarray=False))
-        )
+    def _arrow_to_series(cls, data):
+        return pd.Series(pd.arrays.ArrowExtensionArray(data))

    def to_output(self, data=None):
        # override to apply index
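The pandas hook now just wraps incoming arrow data in an `ArrowExtensionArray`, so nested types survive as arrow-backed columns; a small illustration of that wrapping:

```python
import pandas as pd
import pyarrow as pa

list_data = pa.array([[1, 2], [3]])  # a list-typed arrow array
s = pd.Series(pd.arrays.ArrowExtensionArray(list_data))
print(s.dtype)  # list<item: int64>[pyarrow]
```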
9 changes: 6 additions & 3 deletions src/akimbo/polars.py
@@ -1,4 +1,3 @@
-import awkward as ak
import polars as pl

from akimbo.mixin import Accessor
@@ -13,9 +12,13 @@ class PolarsAwkwardAccessor(Accessor):
    dataframe_type = pl.DataFrame

    @classmethod
-    def _to_output(cls, arr):
-        return pl.from_arrow(ak.to_arrow(arr, extensionarray=False))
+    def _arrow_to_series(cls, arr):
+        return pl.from_arrow(arr)

    @classmethod
    def to_arrow(cls, data):
        return data.to_arrow()
+
+    def pack(self):
+        # polars already implements this directly
+        return self._obj.to_struct()
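polars can build the packed struct column natively, hence the direct override of `pack` here; it is equivalent to calling `to_struct` yourself:

```python
import polars as pl

df = pl.DataFrame({"a": [1, 2], "b": ["x", "y"]})
packed = df.to_struct()  # what df.ak.pack() produces on the polars backend
print(packed.to_list())  # [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]
```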
9 changes: 9 additions & 0 deletions src/akimbo/utils.py
@@ -40,3 +40,12 @@ def to_ak_layout(ar):

def match_string(*layout):
    return layout[0].is_list and layout[0].parameter("__array__") == "string"


+def rec_list_swap(arr: ak.Array, field: str | None = None) -> ak.Array:
+    """Make a record-of-lists into a list-of-records, assuming the lists have the same lengths"""
+    record_of_lists = arr[field] if field else arr
+    list_of_records = ak.zip(
+        dict(zip(ak.fields(record_of_lists), ak.unzip(record_of_lists))), depth_limit=2
+    )
+    return ak.with_field(arr, list_of_records, field) if field else list_of_records
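The `depth_limit=2` in `ak.zip` is what pushes the record level inside the list level; a small example of the swap (values arbitrary):

```python
import awkward as ak

from akimbo.utils import rec_list_swap

arr = ak.Array([{"a": [1, 2], "b": [3, 4]}])  # a record of equal-length lists
assert rec_list_swap(arr).to_list() == [[{"a": 1, "b": 3}, {"a": 2, "b": 4}]]
```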
39 changes: 39 additions & 0 deletions tests/test_pandas.py
@@ -122,3 +122,42 @@ def test_rename():

    s2 = s.ak.rename(("a", "b", "c"), "d")
    assert s2.tolist() == [{"a": [{"b": {"d": 0}}] * 2}] * 3


+def test_unexplode():
+    df = pd.DataFrame(
+        {
+            "x": [1, 1, 1, 2, 1, 3, 3, 1],
+            "y": [1, 1, 1, 2, 1, 3, 3, 1],
+            "z": [1, 1, 1, 2, 1, 3, 3, 2],
+        }
+    )
+    out = df.ak.unexplode("x")
+    compact = out["grouped"].tolist()
+    expected = [
+        [
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 2},
+        ],
+        [{"y": 2, "z": 2}],
+        [{"y": 3, "z": 3}, {"y": 3, "z": 3}],
+    ]
+    assert compact == expected
+
+    out = df.ak.unexplode("x", "y")
+    compact = out["grouped"].tolist()
+    expected = [
+        [{"z": 1}, {"z": 1}, {"z": 1}, {"z": 1}, {"z": 2}],
+        [{"z": 2}],
+        [{"z": 3}, {"z": 3}],
+    ]
+    assert compact == expected
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("x", "y", "z")
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("unknown")
45 changes: 42 additions & 3 deletions tests/test_polars.py
@@ -28,16 +28,16 @@ def test_apply_where():
    assert s2[0] == {"a": [-1, -2, -3], "b": [1, 2, 3]}


-def test_merge_unmerge():
+def test_pack_unpack():
    data = [
        {"a": [1, 2, 3], "b": [1, 2, 3]},
        {"a": [1, 2, 3], "b": [1, 2, 3]},
        {"a": [1, 2, 3], "b": [1, 2, 3]},
    ]
    s = pl.Series(data)
-    df = s.ak.unmerge()
+    df = s.ak.unpack()
    assert df["a"].to_list() == [[1, 2, 3]] * 3
-    s2 = df.ak.merge()
+    s2 = df.ak.pack()
    assert s.to_list() == s2.to_list()


@@ -58,3 +58,42 @@ def test_ufunc():
    df = pl.DataFrame({"a": s})
    df2 = df.ak + 1
    assert df2["a"].to_list() == [[2, 3, 4], [], [5, 6]]


+def test_unexplode():
+    df = pl.DataFrame(
+        {
+            "x": [1, 1, 1, 2, 1, 3, 3, 1],
+            "y": [1, 1, 1, 2, 1, 3, 3, 1],
+            "z": [1, 1, 1, 2, 1, 3, 3, 2],
+        }
+    )
+    out = df.ak.unexplode("x")
+    compact = out["grouped"].to_list()
+    expected = [
+        [
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 1},
+            {"y": 1, "z": 2},
+        ],
+        [{"y": 2, "z": 2}],
+        [{"y": 3, "z": 3}, {"y": 3, "z": 3}],
+    ]
+    assert compact == expected
+
+    out = df.ak.unexplode("x", "y")
+    compact = out["grouped"].to_list()
+    expected = [
+        [{"z": 1}, {"z": 1}, {"z": 1}, {"z": 1}, {"z": 2}],
+        [{"z": 2}],
+        [{"z": 3}, {"z": 3}],
+    ]
+    assert compact == expected
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("x", "y", "z")
+
+    with pytest.raises(ValueError):
+        df.ak.unexplode("unknown")