✨ Add first version of mapply for DataFrame and Series
ddelange committed Oct 27, 2020
1 parent 85cd2d2 commit 05c1adb
Showing 12 changed files with 367 additions and 42 deletions.
6 changes: 2 additions & 4 deletions .github/workflows/CI.yml
@@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [2.7, 3.4, 3.5, 3.6, 3.7, 3.8, 3.9]
python-version: [3.6, 3.7, 3.8, 3.9]

steps:
- uses: actions/checkout@v2
@@ -52,17 +52,15 @@ jobs:
pip install codecov
- name: Lint
if: matrix.python-version != 2.7
run: make lint

- name: Install (self)
run: pip install -e .
run: pip install .

- name: Test
run: make test

- name: Docs
if: matrix.python-version != 2.7
run: SPHINXOPTS=-W make builddocs

- name: Codecov
67 changes: 67 additions & 0 deletions .secrets.baseline
@@ -0,0 +1,67 @@
{
"custom_plugin_paths": [],
"exclude": {
"files": null,
"lines": null
},
"generated_at": "2020-10-26T21:50:19Z",
"plugins_used": [
{
"name": "AWSKeyDetector"
},
{
"name": "ArtifactoryDetector"
},
{
"base64_limit": 4.5,
"name": "Base64HighEntropyString"
},
{
"name": "BasicAuthDetector"
},
{
"name": "CloudantDetector"
},
{
"hex_limit": 3,
"name": "HexHighEntropyString"
},
{
"name": "IbmCloudIamDetector"
},
{
"name": "IbmCosHmacDetector"
},
{
"name": "JwtTokenDetector"
},
{
"keyword_exclude": null,
"name": "KeywordDetector"
},
{
"name": "MailchimpDetector"
},
{
"name": "PrivateKeyDetector"
},
{
"name": "SlackDetector"
},
{
"name": "SoftlayerDetector"
},
{
"name": "StripeDetector"
},
{
"name": "TwilioKeyDetector"
}
],
"results": {},
"version": "0.14.3",
"word_list": {
"file": null,
"hash": null
}
}
49 changes: 20 additions & 29 deletions README.md
@@ -1,42 +1,19 @@
# mapply

[![build](https://img.shields.io/github/workflow/status/ddelange/mapply/GH/master?logo=github&cacheSeconds=86400)](https://github.com/ddelange/mapply/actions?query=branch%3Amaster)
[![build](https://img.shields.io/github/workflow/status/ddelange/mapply/CI/master?logo=github&cacheSeconds=86400)](https://github.com/ddelange/mapply/actions?query=branch%3Amaster)
[![readthedocs](https://readthedocs.org/projects/mapply/badge/?version=latest)](https://mapply.readthedocs.io/en/latest/?badge=latest)
[![codecov](https://img.shields.io/codecov/c/github/ddelange/mapply/master?logo=codecov&logoColor=white)](https://codecov.io/gh/ddelange/mapply)
[![pypi Version](https://img.shields.io/pypi/v/mapply.svg?logo=pypi&logoColor=white)](https://pypi.org/project/mapply/)
[![python](https://img.shields.io/pypi/pyversions/mapply.svg?logo=python&logoColor=white)](https://pypi.org/project/mapply/)
[![downloads](https://pepy.tech/badge/mapply)](https://pypistats.org/packages/mapply)
[![black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/python/black)

[`mapply`](https://github.com/ddelange/mapply) provides sensible multi-core apply/map/applymap functions for Pandas.

## Initial setup of this repo
### mapply vs. pandarallel vs. swifter

Add this repo to Github:
Where [`pandarallel`](https://github.com/nalepae/pandarallel) only requires [`dill`](https://github.com/uqfoundation/dill) (and therefore has to rely on in-house multiprocessing and progressbars), [`swifter`](https://github.com/jmcarpenter2/swifter) relies on the heavy [`dask`](https://github.com/dask/dask) framework, converting to Dask DataFrames and back. In an attempt to find the golden mean, `mapply` is highly customizable and remains lightweight, leveraging the powerful [`pathos`](https://github.com/uqfoundation/pathos) framework, which shadows Python's built-in multiprocessing module using `dill` for universal pickling.

- [Create a new repository](https://github.com/new) on GitHub. Only fill in `mapply` and an optional description and click `Create repository`. Do not initialize the new repository with README, license, or gitignore files.

- Now push this repo to Github (`__version__` is populated based on tags, so tag the initial commit):

```sh
cd mapply
git init .
git add .
git commit -m ':tada: Initial commit'
git tag -a "0.1.0-rc.1" -m 'Initial release candidate. Bump version on GitHub and it will be reflected on the next `git pull; pip install -e .`'
git remote add origin https://github.com/ddelange/mapply.git
git push --set-upstream origin master
```

- This repo contains GitHub Actions to to run `linting`, `tests`, `codecov`, and `PyPi` deploys for all GitHub releases.

- This requires `$PYPI_USER` and `$PYPI_PASSWORD` and `$CODECOV_TOKEN` (found under `Repository Upload Token` at https://codecov.io/gh/ddelange/mapply/settings)

- Add these variables to the repo's secrets here: https://github.com/ddelange/mapply/settings/secrets

- It is also recommended to make `master` a protected branch. The first two ticks should be enough (`Require branches to be up to date before merging` is also nice, and `Include administrators` will avoid accidental pushes to `master`): https://github.com/ddelange/mapply/settings/branch_protection_rules/new

- If you'd like, add a LICENSE.md file manually or via GitHub GUI (don't forget to pull afterwards), and add an appropriate keyword to [`setup()`](setup.py), e.g. `license="MIT"`, and the appropriate [classifier](https://pypi.org/classifiers/), e.g. `"License :: OSI Approved :: MIT License"`.

- You can remove this (now unnecessary) section.

## Installation

@@ -48,8 +25,22 @@ $ pip install mapply

## Usage

For documentation, see [mapply.readthedocs.io](https://mapply.readthedocs.io/en/latest).

```py
# TODO
import pandas as pd
import mapply

mapply.init(
n_workers=-1,
chunk_size=100,
max_chunks_per_worker=10,
progressbar=False
)

df = pd.DataFrame({"a": list(range(100))})

df["squared"] = df.mapply(lambda x: x ** 2)
```

## Development
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -34,7 +34,7 @@

project = "mapply"
author = "ddelange"
copyright = "ddelange"
copyright = "2020, ddelange" # noqa:A001

# The full version, including alpha/beta/rc tags
release = __version__
2 changes: 1 addition & 1 deletion requirements/ci.txt
@@ -3,7 +3,7 @@ mypy~=0.782
pre-commit~=2.6.0
pytest-cov~=2.10.1
pytest-env~=0.6.2
pytest-randomly~=3.4.1
pytest-sugar~=0.9.4
pytest~=4.6
testfixtures~=6.14.1
pandas
3 changes: 3 additions & 0 deletions requirements/prod.txt
@@ -0,0 +1,3 @@
pathos
psutil
tqdm>=4.27 # from tqdm.auto import tqdm
11 changes: 6 additions & 5 deletions setup.py
@@ -1,5 +1,6 @@
from os import path
from setuptools import setup

from setuptools import find_packages, setup

here = path.abspath(path.dirname(__file__))

@@ -31,19 +32,18 @@ def read_readme(path):
install_requires=read_requirements(requirements_path),
use_scm_version={"write_to": "src/mapply/_version.py"},
package_dir={"": "src"},
packages=find_packages(where="src"),
author="ddelange",
author_email="[email protected]",
url="https://github.com/ddelange/mapply",
python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*",
python_requires=">=3.6",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Operating System :: OS Independent",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
Expand All @@ -52,4 +52,5 @@ def read_readme(path):
"Topic :: Utilities",
],
keywords="pandas parallel apply map applymap multicore multiprocessing",
license="MIT",
)
43 changes: 41 additions & 2 deletions src/mapply/__init__.py
@@ -1,2 +1,41 @@
# flake8: noqa:F401
from mapply._version import version as __version__
from functools import partialmethod

from mapply._version import version as __version__ # noqa:F401
from mapply.mapply import mapply as _mapply


def init(
*,
n_workers: int = -1,
chunk_size: int = 100,
max_chunks_per_worker: int = 20,
progressbar: bool = True,
apply_name: str = "mapply",
map_name: str = "mmap",
applymap_name: str = "mapplymap",
):
"""Initialize and patch PandasObject.
Args:
n_workers: Amount of workers (processes) to spawn.
chunk_size: Minimum amount of items per chunk. Determines upper limit for n_chunks.
max_chunks_per_worker: Upper limit on amount of chunks per worker. Will lower
n_chunks determined by chunk_size if necessary. Set to 0 to skip this check.
progressbar: Whether to wrap the chunks in a tqdm.auto.tqdm.
apply_name: Attribute name for the patched apply function.
map_name: Attribute name for the patched map function.
applymap_name: Attribute name for the patched applymap function.
"""
from pandas.core.base import PandasObject

setattr(
PandasObject,
apply_name,
partialmethod(
_mapply,
n_workers=n_workers,
chunk_size=chunk_size,
max_chunks_per_worker=max_chunks_per_worker,
progressbar=progressbar,
),
)
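The `init` above works by freezing keyword arguments onto `_mapply` and attaching the result to `PandasObject`. A minimal stdlib sketch of the same `partialmethod` patching technique on a toy class (all names here are illustrative, not part of mapply):

```python
from functools import partialmethod


class Greeter:
    """Toy stand-in for PandasObject."""


def greet(self, name, *, excited=False):
    # 'self' is bound automatically: partialmethod behaves like a
    # regular method when accessed on an instance.
    suffix = "!" if excited else "."
    return f"Hello, {name}{suffix}"


# Attach a pre-configured variant under a chosen attribute name,
# mirroring how init() attaches _mapply with frozen keyword defaults.
setattr(Greeter, "shout", partialmethod(greet, excited=True))

print(Greeter().shout("world"))  # -> Hello, world!
```

Because the attribute name is a plain string passed to `setattr`, the caller can pick any name, which is how `apply_name`/`map_name`/`applymap_name` avoid clobbering pandas' built-in methods.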
97 changes: 97 additions & 0 deletions src/mapply/mapply.py
@@ -0,0 +1,97 @@
from functools import partial
from typing import Any, Callable, Union

from mapply.parallel import N_CORES, multiprocessing_imap


def _choose_n_chunks(
df_or_series: Any,
n_workers: int,
chunk_size: int,
max_chunks_per_worker: int,
):
"""Choose final amount of chunks to be sent to the ProcessingPool."""
# no sense running parallel if data is too small
n_chunks = int(len(df_or_series) / chunk_size)

if max_chunks_per_worker:
# no sense making too many chunks
n_chunks = min(n_chunks, max_chunks_per_worker * N_CORES)
if n_chunks < 1 or n_workers == 1 or N_CORES == 1:
# no sense running parallel
n_chunks = 1

return n_chunks
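The arithmetic above can be checked with a standalone copy (a sketch mirroring `_choose_n_chunks`, with the core count passed explicitly for illustration instead of reading the module-level `N_CORES`):

```python
def choose_n_chunks(n_items, n_workers, chunk_size, max_chunks_per_worker, n_cores):
    # Floor division: at least chunk_size items end up in each chunk.
    n_chunks = n_items // chunk_size
    if max_chunks_per_worker:
        # Cap the total chunk count relative to available cores.
        n_chunks = min(n_chunks, max_chunks_per_worker * n_cores)
    if n_chunks < 1 or n_workers == 1 or n_cores == 1:
        # Too little data or a single worker: stay sequential.
        n_chunks = 1
    return n_chunks


# 10_000 rows with the defaults on a 4-core machine:
# 10_000 // 100 = 100 chunks, capped at 20 * 4 = 80.
print(choose_n_chunks(10_000, -1, 100, 20, 4))  # -> 80
```

With fewer than `chunk_size` rows the division yields 0, which the final guard bumps to a single chunk, so small frames never pay the multiprocessing overhead.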


def mapply(
df_or_series: Any,
function: Callable,
axis: Union[int, str] = 0,
*,
n_workers: int = -1,
chunk_size: int = 100,
max_chunks_per_worker: int = 20,
progressbar: bool = True,
args=(),
**kwargs
) -> Any:
"""Run apply on n_workers. Split in chunks, gather results, and concat them.
Args:
df_or_series: Argument reserved to the class instance, a.k.a. 'self'.
function: Function to apply to each column or row.
axis: Axis along which the function is applied.
n_workers: Amount of workers (processes) to spawn.
chunk_size: Minimum amount of items per chunk. Determines upper limit for n_chunks.
max_chunks_per_worker: Upper limit on amount of chunks per worker. Will lower
n_chunks determined by chunk_size if necessary. Set to 0 to skip this check.
progressbar: Whether to wrap the chunks in a tqdm.auto.tqdm.
args: Additional positional arguments to pass to function.
kwargs: Additional keyword arguments to pass to function.
Returns:
Series or DataFrame resulting from applying function along given axis.
"""
from numpy import array_split
from pandas import Series, concat

n_chunks = _choose_n_chunks(
df_or_series,
n_workers,
chunk_size,
max_chunks_per_worker,
)

if isinstance(axis, str):
axis = ["index", "columns"].index(axis)

if axis == 1:
# axis argument pre-processing
df_or_series = df_or_series.T

dfs = array_split(df_or_series, n_chunks, axis=axis)

def run_apply(function, df, args=(), **kwargs):
# axis argument is handled such that always axis=0 here
return df.apply(function, args=args, **kwargs) # pragma: no cover

results = multiprocessing_imap(
partial(run_apply, function, args=args, **kwargs),
dfs,
n_workers=n_workers,
progressbar=progressbar,
)

if (
len(results) > 1
and isinstance(results[0], Series)
and results[0].index.equals(results[1].index)
):
# one more aggregation needed for final df, e.g. df.parallel_apply(sum)
return concat(results, axis=1).apply(function, axis=1, args=args, **kwargs)

if axis == 1:
# axis argument pre-processing
results = (df.T for df in results) # type: ignore
return concat(results)
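The overall pipeline — split into chunks, apply per chunk, concatenate — can be sketched with the stdlib alone (serial here; the real code fans the chunks out to worker processes via `multiprocessing_imap`, and the chunk boundaries below match what `numpy.array_split` produces):

```python
def split_apply_concat(items, function, n_chunks):
    # Same nearly-equal chunk boundaries numpy.array_split would use:
    # the first m chunks get k + 1 items, the rest get k.
    k, m = divmod(len(items), n_chunks)
    chunks = [
        items[i * k + min(i, m):(i + 1) * k + min(i + 1, m)]
        for i in range(n_chunks)
    ]
    # Apply per chunk (the parallel step in mapply), then flatten,
    # which plays the role of pandas.concat here.
    results = [[function(x) for x in chunk] for chunk in chunks]
    return [y for chunk in results for y in chunk]


print(split_apply_concat(list(range(7)), lambda x: x * x, n_chunks=3))
# -> [0, 1, 4, 9, 16, 25, 36]
```

The special case in `mapply` where each chunk returns a Series with identical indices corresponds to a reduction (e.g. `df.mapply(sum)`): the per-chunk partials must be combined by one more application of the function, not just concatenated.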