INTPYTHON-165 Refactor nested data handling #245

Merged · 25 commits · Nov 1, 2024
7 changes: 4 additions & 3 deletions .github/workflows/benchmark.yml
@@ -51,7 +51,7 @@ jobs:
tox -e benchmark -- --set-commit-hash $(git rev-parse HEAD)
}

- pip install asv
+ pip install asv virtualenv
asv machine --yes
git fetch origin main:main
git update-ref refs/bm/pr HEAD
@@ -66,7 +66,8 @@ jobs:

- name: Compare benchmarks
run: |
- asv compare refs/bm/merge-target refs/bm/pr --
+ asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr --

- name: Fail if any benchmarks have slowed down too much
run: |
- ! asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr | grep -q "got worse"
+ ! asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr 2> /dev/null | grep -q "got worse"
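
For reference, the gate above is roughly equivalent to the following Python sketch (a hypothetical helper, assuming `asv` is on PATH and both refs already have recorded results):

```python
import subprocess
import sys

# Mirror of the workflow step: compare the two refs, split improvements
# from regressions, and flag anything slower by more than a 1.2x factor.
result = subprocess.run(
    ["asv", "compare", "--factor", "1.2", "--split",
     "refs/bm/merge-target", "refs/bm/pr"],
    capture_output=True,
    text=True,
)
# stderr is discarded (the `2> /dev/null` above) so asv warnings cannot
# trip the "got worse" grep.
if "got worse" in result.stdout:
    sys.exit("Benchmarks slowed down by more than the allowed factor")
```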
2 changes: 1 addition & 1 deletion .github/workflows/release-python.yml
@@ -123,7 +123,7 @@ jobs:
export LIBBSON_INSTALL_DIR="$(pwd)/libbson"
python -m pip install dist/*.gz
cd ..
python -c "from pymongoarrow.lib import process_bson_stream"
python -c "from pymongoarrow.lib import libbson_version"

- uses: actions/upload-artifact@v4
with:
6 changes: 6 additions & 0 deletions .pre-commit-config.yaml
@@ -94,6 +94,12 @@ repos:
stages: [manual]
args: ["--no-strict-imports"]

+ - repo: https://github.com/MarcoGorelli/cython-lint
+ rev: v0.16.2
+ hooks:
+ - id: cython-lint
+ args: ["--no-pycodestyle"]

- repo: https://github.com/codespell-project/codespell
rev: "v2.2.6"
hooks:
3 changes: 2 additions & 1 deletion bindings/python/asv.conf.json
@@ -10,5 +10,6 @@
"N_DOCS": ["20000", "1000"]
}
},
"environment_type": "virtualenv"
"environment_type": "virtualenv",
"plugins": ["virtualenv"]
}
43 changes: 16 additions & 27 deletions bindings/python/benchmarks/benchmarks.py
@@ -114,11 +114,6 @@ class Read(ABC):
def setup(self):
raise NotImplementedError

- # We need this because the naive methods don't always convert nested objects.
- @staticmethod # noqa: B027
- def exercise_table(table):
- pass

def time_conventional_ndarray(self):
collection = db.benchmark
cursor = collection.find(projection={"_id": 0})
@@ -147,13 +142,11 @@ def time_to_pandas(self):
def time_conventional_arrow(self):
c = db.benchmark
f = list(c.find({}, projection={"_id": 0}))
- table = pa.Table.from_pylist(f)
- self.exercise_table(table)
+ pa.Table.from_pylist(f)

def time_to_arrow(self):
c = db.benchmark
- table = find_arrow_all(c, {}, schema=self.schema, projection={"_id": 0})
- self.exercise_table(table)
+ find_arrow_all(c, {}, schema=self.schema, projection={"_id": 0})

def time_conventional_polars(self):
collection = db.benchmark
@@ -211,27 +204,25 @@ def setup(self):
% (N_DOCS, len(BSON.encode(base_dict)) // 1024, len(base_dict))
)

- # We need this because the naive methods don't always convert nested objects.
- @staticmethod
- def exercise_table(table):
- [
- [[n for n in i.values] if isinstance(i, pa.ListScalar) else i for i in column]
- for column in table.columns
- ]

- # All of the following tests are being skipped because NumPy/Pandas do not work with nested arrays.
+ # All of the following tests are being skipped because NumPy/Pandas/Polars do not work with nested arrays.
def time_to_numpy(self):
pass

def time_to_pandas(self):
pass

+ def time_to_polars(self):
+ pass

def time_conventional_ndarray(self):
pass

def time_conventional_pandas(self):
pass

+ def time_conventional_polars(self):
+ pass


class ProfileReadDocument(Read):
schema = Schema(
@@ -260,27 +251,25 @@ def setup(self):
% (N_DOCS, len(BSON.encode(base_dict)) // 1024, len(base_dict))
)

- # We need this because the naive methods don't always convert nested objects.
- @staticmethod
- def exercise_table(table):
- [
- [[n for n in i.values()] if isinstance(i, pa.StructScalar) else i for i in column]
- for column in table.columns
- ]

- # All of the following tests are being skipped because NumPy/Pandas do not work with nested documents.
+ # All of the following tests are being skipped because NumPy/Pandas/Polars do not work with nested documents.
def time_to_numpy(self):
pass

def time_to_pandas(self):
pass

+ def time_to_polars(self):
+ pass

def time_conventional_ndarray(self):
pass

def time_conventional_pandas(self):
pass

+ def time_conventional_polars(self):
+ pass


class ProfileReadSmall(Read):
schema = Schema({"x": pa.int64(), "y": pa.float64()})
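
The removed `exercise_table` hooks existed to force conversion of nested Arrow scalars, which the naive conversion paths otherwise leave untouched; with nested handling built in, the benchmarks can drop them. A standalone sketch of what that traversal did (illustrative only, not part of the suite):

```python
import pyarrow as pa

# Nested columns yield ListScalar/StructScalar values; touching
# .values / .values() forces the nested data to be materialized.
table = pa.Table.from_pylist([{"x": [1, 2], "d": {"y": 1.0}},
                              {"x": [3], "d": {"y": 2.0}}])
for column in table.columns:
    for scalar in column:
        if isinstance(scalar, pa.ListScalar):
            [n.as_py() for n in scalar.values]
        elif isinstance(scalar, pa.StructScalar):
            [v.as_py() for v in scalar.values()]
```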
13 changes: 4 additions & 9 deletions bindings/python/pymongoarrow/api.py
@@ -38,11 +38,6 @@
from pymongoarrow.schema import Schema
from pymongoarrow.types import _validate_schema, get_numpy_type

- try: # noqa: SIM105
- from pymongoarrow.lib import process_bson_stream
- except ImportError:
- pass

__all__ = [
"aggregate_arrow_all",
"find_arrow_all",
@@ -93,7 +88,7 @@ def find_arrow_all(collection, query, *, schema=None, **kwargs):
:Returns:
An instance of class:`pyarrow.Table`.
"""
- context = PyMongoArrowContext.from_schema(schema, codec_options=collection.codec_options)
+ context = PyMongoArrowContext(schema, codec_options=collection.codec_options)

for opt in ("cursor_type",):
if kwargs.pop(opt, None):
@@ -108,7 +103,7 @@ def find_arrow_all(collection, query, *, schema=None, **kwargs):

raw_batch_cursor = collection.find_raw_batches(query, **kwargs)
for batch in raw_batch_cursor:
- process_bson_stream(batch, context)
+ context.process_bson_stream(batch)

return context.finish()

@@ -131,7 +126,7 @@ def aggregate_arrow_all(collection, pipeline, *, schema=None, **kwargs):
:Returns:
An instance of class:`pyarrow.Table`.
"""
- context = PyMongoArrowContext.from_schema(schema, codec_options=collection.codec_options)
+ context = PyMongoArrowContext(schema, codec_options=collection.codec_options)

if pipeline and ("$out" in pipeline[-1] or "$merge" in pipeline[-1]):
msg = (
@@ -152,7 +147,7 @@ def aggregate_arrow_all(collection, pipeline, *, schema=None, **kwargs):

raw_batch_cursor = collection.aggregate_raw_batches(pipeline, **kwargs)
for batch in raw_batch_cursor:
- process_bson_stream(batch, context)
+ context.process_bson_stream(batch)

return context.finish()

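
After this change, `process_bson_stream` is a method on the context rather than a module-level import, but the public API is unchanged. A usage sketch for nested fields (hedged: assumes a local `mongod` and the illustrative collection below):

```python
import pyarrow as pa
from pymongo import MongoClient
from pymongoarrow.api import find_arrow_all
from pymongoarrow.schema import Schema

coll = MongoClient().test.example  # hypothetical collection
coll.insert_one({"x": 1, "doc": {"y": 2.0}, "arr": [1, 2, 3]})

schema = Schema({
    "x": pa.int64(),
    "doc": pa.struct({"y": pa.float64()}),
    "arr": pa.list_(pa.int64()),
})
table = find_arrow_all(coll, {}, schema=schema, projection={"_id": 0})
print(table)  # struct and list columns are materialized directly
```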
150 changes: 64 additions & 86 deletions bindings/python/pymongoarrow/context.py
@@ -11,55 +11,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- from bson.codec_options import DEFAULT_CODEC_OPTIONS
- from pyarrow import Table, timestamp
+ from pyarrow import ListArray, StructArray, Table

from pymongoarrow.types import _BsonArrowTypes, _get_internal_typemap

- try:
- from pymongoarrow.lib import (
- BinaryBuilder,
- BoolBuilder,
- CodeBuilder,
- Date32Builder,
- Date64Builder,
- DatetimeBuilder,
- Decimal128Builder,
- DocumentBuilder,
- DoubleBuilder,
- Int32Builder,
- Int64Builder,
- ListBuilder,
- NullBuilder,
- ObjectIdBuilder,
- StringBuilder,
- )

- _TYPE_TO_BUILDER_CLS = {
- _BsonArrowTypes.int32: Int32Builder,
- _BsonArrowTypes.int64: Int64Builder,
- _BsonArrowTypes.double: DoubleBuilder,
- _BsonArrowTypes.datetime: DatetimeBuilder,
- _BsonArrowTypes.objectid: ObjectIdBuilder,
- _BsonArrowTypes.decimal128: Decimal128Builder,
- _BsonArrowTypes.string: StringBuilder,
- _BsonArrowTypes.bool: BoolBuilder,
- _BsonArrowTypes.document: DocumentBuilder,
- _BsonArrowTypes.array: ListBuilder,
- _BsonArrowTypes.binary: BinaryBuilder,
- _BsonArrowTypes.code: CodeBuilder,
- _BsonArrowTypes.date32: Date32Builder,
- _BsonArrowTypes.date64: Date64Builder,
- _BsonArrowTypes.null: NullBuilder,
- }
- except ImportError:
- pass


class PyMongoArrowContext:
"""A context for converting BSON-formatted data to an Arrow Table."""

- def __init__(self, schema, builder_map, codec_options=None):
+ def __init__(self, schema, codec_options=None):
"""Initialize the context.

:Parameters:
@@ -68,57 +28,75 @@ def __init__(self, schema, builder_map, codec_options=None):
:class:`~pymongoarrow.builders._BuilderBase` instances.
"""
self.schema = schema
- self.builder_map = builder_map
if self.schema is None and codec_options is not None:
self.tzinfo = codec_options.tzinfo
else:
self.tzinfo = None
+ schema_map = {}
+ if self.schema is not None:
+ str_type_map = _get_internal_typemap(schema.typemap)
+ _parse_types(str_type_map, schema_map, self.tzinfo)

- @classmethod
- def from_schema(cls, schema, codec_options=DEFAULT_CODEC_OPTIONS):
- """Initialize the context from a :class:`~pymongoarrow.schema.Schema`
- instance.
+ # Delayed import to prevent import errors for unbuilt library.
+ from pymongoarrow.lib import BuilderManager

- :Parameters:
- - `schema`: Instance of :class:`~pymongoarrow.schema.Schema`.
- - `codec_options` (optional): An instance of
- :class:`~bson.codec_options.CodecOptions`.
- """
- if schema is None:
- return cls(schema, {}, codec_options)

- builder_map = {}
- tzinfo = codec_options.tzinfo
- str_type_map = _get_internal_typemap(schema.typemap)
- for fname, ftype in str_type_map.items():
- builder_cls = _TYPE_TO_BUILDER_CLS[ftype]
- encoded_fname = fname.encode("utf-8")

- # special-case initializing builders for parameterized types
- if builder_cls == DatetimeBuilder:
- arrow_type = schema.typemap[fname]
- if tzinfo is not None and arrow_type.tz is None:
- arrow_type = timestamp(arrow_type.unit, tz=tzinfo)
- builder_map[encoded_fname] = DatetimeBuilder(dtype=arrow_type)
- elif builder_cls == DocumentBuilder:
- arrow_type = schema.typemap[fname]
- builder_map[encoded_fname] = DocumentBuilder(arrow_type, tzinfo)
- elif builder_cls == ListBuilder:
- arrow_type = schema.typemap[fname]
- builder_map[encoded_fname] = ListBuilder(arrow_type, tzinfo)
- elif builder_cls == BinaryBuilder:
- subtype = schema.typemap[fname].subtype
- builder_map[encoded_fname] = BinaryBuilder(subtype)
- else:
- builder_map[encoded_fname] = builder_cls()
- return cls(schema, builder_map)
+ self.manager = BuilderManager(schema_map, self.schema is not None, self.tzinfo)

+ def process_bson_stream(self, stream):
+ self.manager.process_bson_stream(stream, len(stream))

def finish(self):
- arrays = []
- names = []
- for fname, builder in self.builder_map.items():
- arrays.append(builder.finish())
- names.append(fname.decode("utf-8"))
+ array_map = _parse_builder_map(self.manager.finish())
+ arrays = list(array_map.values())
if self.schema is not None:
return Table.from_arrays(arrays=arrays, schema=self.schema.to_arrow())
- return Table.from_arrays(arrays=arrays, names=names)
+ return Table.from_arrays(arrays=arrays, names=list(array_map.keys()))
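
The refactored flow, end to end: the context builds a `BuilderManager`, feeds it raw BSON batches, and assembles the table on `finish()`. A minimal driver sketch (assumes the Cython extension is built and a reachable `mongod`):

```python
from pymongo import MongoClient
from pymongoarrow.context import PyMongoArrowContext

coll = MongoClient().test.example  # hypothetical collection
context = PyMongoArrowContext(schema=None, codec_options=coll.codec_options)
for batch in coll.find_raw_batches({}):
    context.process_bson_stream(batch)
table = context.finish()  # types are inferred when no schema is given
```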


+ def _parse_builder_map(builder_map):
+ # Handle nested builders.
+ to_remove = []
+ # Traverse the builder map right to left.
+ for key, value in reversed(builder_map.items()):
+ if value.type_marker == _BsonArrowTypes.document.value:
+ names = value.finish()
+ full_names = [f"{key}.{name}" for name in names]
+ arrs = [builder_map[c] for c in full_names]
+ builder_map[key] = StructArray.from_arrays(arrs, names=names)
+ to_remove.extend(full_names)
+ elif value.type_marker == _BsonArrowTypes.array.value:
+ child_name = key + "[]"
+ to_remove.append(child_name)
+ child = builder_map[child_name]
+ builder_map[key] = ListArray.from_arrays(value.finish(), child)
+ else:
+ builder_map[key] = value.finish()

+ for key in to_remove:
+ if key in builder_map:
+ del builder_map[key]

+ return builder_map
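
To see what the reassembly above produces, here is a self-contained pyarrow sketch of the same two cases (field names are illustrative):

```python
import pyarrow as pa

# Children finish as flat arrays keyed "parent.child" / "parent[]".
doc_y = pa.array([2.0, 3.0])                        # child of "doc"
arr_offsets = pa.array([0, 2, 3], type=pa.int32())  # the list builder's finish()
arr_values = pa.array([1, 2, 3])                    # child stored at "arr[]"

# Parents are rebuilt from their children, as in the loop above.
doc = pa.StructArray.from_arrays([doc_y], names=["y"])
arr = pa.ListArray.from_arrays(arr_offsets, arr_values)
print(doc.to_pylist())  # [{'y': 2.0}, {'y': 3.0}]
print(arr.to_pylist())  # [[1, 2], [3]]
```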


+ def _parse_types(str_type_map, schema_map, tzinfo):
+ for fname, (ftype, arrow_type) in str_type_map.items():
+ schema_map[fname] = ftype, arrow_type

+ # special-case nested builders
+ if ftype == _BsonArrowTypes.document.value:
+ # construct a sub type map here
+ sub_type_map = {}
+ for i in range(arrow_type.num_fields):
+ field = arrow_type[i]
+ sub_name = f"{fname}.{field.name}"
+ sub_type_map[sub_name] = field.type
+ sub_type_map = _get_internal_typemap(sub_type_map)
+ _parse_types(sub_type_map, schema_map, tzinfo)
+ elif ftype == _BsonArrowTypes.array.value:
+ sub_type_map = {}
+ sub_name = f"{fname}[]"
+ sub_value_type = arrow_type.value_type
+ sub_type_map[sub_name] = sub_value_type
+ sub_type_map = _get_internal_typemap(sub_type_map)
+ _parse_types(sub_type_map, schema_map, tzinfo)
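
The recursion flattens nested types into one builder entry per path, using `.` for document children and `[]` for list elements. An illustrative re-implementation of just the key scheme (not the library's code; the real map stores `(type marker, arrow type)` pairs):

```python
import pyarrow as pa

def flatten_keys(typemap, out=None):
    # Mirror of the naming scheme in _parse_types: one entry per builder.
    out = {} if out is None else out
    for name, arrow_type in typemap.items():
        out[name] = arrow_type
        if pa.types.is_struct(arrow_type):
            for i in range(arrow_type.num_fields):
                field = arrow_type.field(i)
                flatten_keys({f"{name}.{field.name}": field.type}, out)
        elif pa.types.is_list(arrow_type):
            flatten_keys({f"{name}[]": arrow_type.value_type}, out)
    return out

print(list(flatten_keys({
    "doc": pa.struct({"y": pa.float64()}),
    "arr": pa.list_(pa.int64()),
})))
# ['doc', 'doc.y', 'arr', 'arr[]']
```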