Experiment with polars #472

Draft: wants to merge 28 commits into base `main`

Commits (28)
1b64e3e
Experiment with polars
ecomodeller Nov 29, 2024
0615fc3
Use polars in `skill.py`
ecomodeller Dec 1, 2024
309ab91
Fix score
ecomodeller Dec 2, 2024
332812f
103 failed, 465 passed
ecomodeller Dec 2, 2024
fb8ea17
97 failed, 471 passed
ecomodeller Dec 2, 2024
9c79144
Temporarily disable the tests
ecomodeller Dec 2, 2024
a5aceca
Fix gridded_skill
ecomodeller Dec 8, 2024
5aa3b45
n_min
ecomodeller Dec 8, 2024
bafe3e9
Collection gridded skill
ecomodeller Dec 8, 2024
b8b1840
71 failed, 497 passed
ecomodeller Dec 9, 2024
b9a7707
64 failed, 504 passed
ecomodeller Dec 9, 2024
a40a6a9
51 failed, 517 passed
ecomodeller Dec 9, 2024
7c9daff
28 failed, 537 passed
ecomodeller Dec 9, 2024
12effff
Taylor etc.
ecomodeller Dec 10, 2024
96c0626
Group by freq
ecomodeller Dec 10, 2024
856a329
More freq
ecomodeller Dec 10, 2024
524ff1a
15 tests left
ecomodeller Dec 11, 2024
d3d4402
Some custom function (except peak_ratio)
ecomodeller Dec 11, 2024
6b303e0
Disable mypy for now
ecomodeller Dec 11, 2024
90078c3
Min Python 3.10
ecomodeller Dec 11, 2024
cda42ef
Tests passing
ecomodeller Dec 11, 2024
2ab4915
Floating point precision
ecomodeller Dec 11, 2024
387e957
melt -> unpivot
ecomodeller Dec 11, 2024
a597864
Remove deprecated stuff; this PR will not be merged any time soon.
ecomodeller Dec 11, 2024
ddbe55c
Merge branch 'main' into polars
ecomodeller Dec 12, 2024
4ee497a
Incoming updates
ecomodeller Dec 12, 2024
4c8568d
Merge branch 'main' into polars
ecomodeller Dec 12, 2024
4631a56
Remove unused code
ecomodeller Dec 12, 2024
2 changes: 2 additions & 0 deletions .github/workflows/notebooks_test.yml
@@ -8,6 +8,8 @@ on:

jobs:
build:
# disable this job for now
if: false

runs-on: ubuntu-latest

4 changes: 3 additions & 1 deletion .gitignore
@@ -145,4 +145,6 @@ notebooks/sandbox.ipynb
streamlit_app.py
notes.md

notebooks/Untitled.ipynb
notebooks/Untitled.ipynb

uv.lock
159 changes: 105 additions & 54 deletions modelskill/comparison/_collection.py
@@ -1,4 +1,5 @@
from __future__ import annotations
from collections import defaultdict
from copy import deepcopy
import os
from pathlib import Path
@@ -21,9 +22,9 @@
import zipfile
import numpy as np
import pandas as pd
import polars as pl


from .. import metrics as mtr

from ._collection_plotter import ComparerCollectionPlotter
from ..skill import SkillTable
@@ -449,36 +450,50 @@ def skill(
cc = self

pmetrics = _parse_metric(metrics)
# TODO don't use hardcoded metrics
# pmetrics = ["n", "bias", "rmse", "urmse", "mae"]

agg_cols = _parse_groupby(by, n_mod=cc.n_models, n_qnt=cc.n_quantities)
agg_cols, attrs_keys = self._attrs_keys_in_by(agg_cols)

df = cc._to_long_dataframe(attrs_keys=attrs_keys, observed=observed)
assert "model" in df.columns

res = _groupby_df(df, by=agg_cols, metrics=pmetrics)
mtr_cols = [m.__name__ for m in pmetrics] # type: ignore
res = res.dropna(subset=mtr_cols, how="all") # TODO: ok to remove empty?
res = self._append_xy_to_res(res, cc)
res = cc._add_as_col_if_not_in_index(df, skilldf=res) # type: ignore

# TODO this should not be necessary
if "model" not in res.columns:
res = res.with_columns(pl.lit(cc.mod_names[0]).alias("model"))
return SkillTable(res)
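
For reference, a minimal sketch of the long-format group-by pattern that `_groupby_df` (not shown in this diff) presumably applies; the data and the `mod_val`/`obs_val` column names are made up for illustration:

```python
import polars as pl

# Hypothetical long-format data: one row per (model, observation, time) sample.
df = pl.DataFrame(
    {
        "model": ["m1", "m1", "m2", "m2"],
        "observation": ["obs_a", "obs_a", "obs_a", "obs_a"],
        "mod_val": [1.1, 2.0, 0.9, 2.2],
        "obs_val": [1.0, 2.1, 1.0, 2.1],
    }
)

# One polars expression per metric, evaluated per group.
res = df.group_by(["model", "observation"]).agg(
    pl.len().alias("n"),
    (pl.col("mod_val") - pl.col("obs_val")).mean().alias("bias"),
    ((pl.col("mod_val") - pl.col("obs_val")) ** 2).mean().sqrt().alias("rmse"),
)
```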

def _to_long_dataframe(
self, attrs_keys: Iterable[str] | None = None, observed: bool = False
) -> pd.DataFrame:
) -> pl.DataFrame:
"""Return a copy of the data as a long-format pandas DataFrame (for groupby operations)"""
frames = []
for cmp in self:
frame = cmp._to_long_dataframe(attrs_keys=attrs_keys)

if self.n_quantities > 1:
frame["quantity"] = cmp.quantity.name
frame = frame.with_columns(pl.lit(cmp.quantity.name).alias("quantity"))
frames.append(frame)
res = pd.concat(frames)

cat_cols = res.select_dtypes(include=["object"]).columns
res[cat_cols] = res[cat_cols].astype("category")
# convert all ints and floats to f64
import polars.selectors as cs

# frames = [
# df.with_columns([pl.col(pl.Float32, pl.Int32, pl.Int64).cast(pl.Float64)])
# for df in frames
# ]
frames = [df.with_columns(cs.numeric().cast(pl.Float64)) for df in frames]

# TODO why don't all frames have the same columns?
res = pl.concat(frames, how="diagonal")
if observed and attrs_keys is not None:
# res = res.loc[~(res == False).any(axis=1)] # noqa
res = res.filter(pl.col(attrs_keys).is_not_null())

if observed:
res = res.loc[~(res == False).any(axis=1)] # noqa
return res
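
A side note on the `how="diagonal"` concat used above: it takes the union of the frames' columns and null-fills the gaps, which is why frames with differing attrs/quantity columns can still be stacked. A tiny sketch with made-up frames:

```python
import polars as pl

a = pl.DataFrame({"model": ["m1"], "val": [1.0]})
b = pl.DataFrame({"model": ["m2"], "val": [2.0], "quantity": ["wl"]})

# Union of columns; a's missing "quantity" is filled with null.
out = pl.concat([a, b], how="diagonal")
print(out.shape)  # (2, 3)
```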

@staticmethod
@@ -495,21 +510,24 @@ def _attrs_keys_in_by(by: List[str | pd.Grouper]) -> Tuple[List[str], List[str]]
return agg_cols, attrs_keys

@staticmethod
def _append_xy_to_res(res: pd.DataFrame, cc: ComparerCollection) -> pd.DataFrame:
def _append_xy_to_res(res: pl.DataFrame, cc: ComparerCollection) -> pl.DataFrame:
"""skill() helper: Append x and y to res if possible"""
res["x"] = np.nan
res["y"] = np.nan

# for MultiIndex in res find "observation" level and
# insert x, y if gtype=point for that observation
if "observation" in res.index.names:
idx_names = res.index.names
res = res.reset_index()
for cmp in cc:
if cmp.gtype == "point":
res.loc[res.observation == cmp.name, "x"] = cmp.x
res.loc[res.observation == cmp.name, "y"] = cmp.y
res = res.set_index(idx_names)

if "observation" not in res.columns:
return res
xs = defaultdict(lambda: np.nan)
ys = defaultdict(lambda: np.nan)
for cmp in cc:
if cmp.gtype == "point":
xs[cmp.name] = float(cmp.x)
ys[cmp.name] = float(cmp.y)

# add x and y to res based on observation name based on the xs and ys dicts
res = res.with_columns(
pl.col("observation").map_elements(lambda name: xs.get(name)).alias("x"),
pl.col("observation").map_elements(lambda name: ys.get(name)).alias("y"),
)

return res
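
Two observations on the lookup above: `dict.get` on a `defaultdict` bypasses the default factory, so missing names yield `None` rather than `NaN`; and `map_elements` invokes a Python lambda per row. A join-based sketch (made-up frames) that avoids both:

```python
import numpy as np
import polars as pl

res = pl.DataFrame({"observation": ["obs_a", "obs_b"], "rmse": [0.2, 0.3]})
coords = pl.DataFrame({"observation": ["obs_a"], "x": [12.5], "y": [55.7]})

# Left join vectorizes the name -> (x, y) lookup; observations without
# point coordinates get null, filled with NaN to mirror the old behaviour.
res = res.join(coords, on="observation", how="left").with_columns(
    pl.col("x").fill_null(np.nan),
    pl.col("y").fill_null(np.nan),
)
```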

def _add_as_col_if_not_in_index(
@@ -602,6 +620,9 @@ def gridded_skill(

metrics = _parse_metric(metrics)

# TODO avoid hardcoded metrics
# metrics = ["n", "bias", "rmse", "mae"]

df = cmp._to_long_dataframe()
df = _add_spatial_grid_to_df(df=df, bins=bins, binsize=binsize)

@@ -611,13 +632,17 @@
if "y" not in agg_cols:
agg_cols.insert(0, "y")

df = df.drop(columns=["x", "y"]).rename(columns=dict(xBin="x", yBin="y"))
res = _groupby_df(df, by=agg_cols, metrics=metrics, n_min=n_min)
ds = res.to_xarray().squeeze()
df = df.drop(["x", "y"]).rename(dict(xBin="x", yBin="y"))
res = _groupby_df(df, by=agg_cols, metrics=metrics, n_min=n_min) # type: ignore

potential_cols = ["x", "y", "model", "observation"]
cols = [c for c in potential_cols if c in res.columns]

ds = res.to_pandas().set_index(cols).to_xarray().squeeze()

# change categorial index to coordinates
for dim in ("x", "y"):
ds[dim] = ds[dim].astype(float)
# for dim in ("x", "y"):
# ds[dim] = ds[dim].astype(float)
return SkillGrid(ds)
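
polars has no direct xarray export, hence the detour through pandas above; a minimal sketch of the round trip with made-up grid data:

```python
import polars as pl

res = pl.DataFrame({"x": [0.0, 1.0], "y": [0.0, 0.0], "rmse": [0.1, 0.2]})

# Set the grid columns as a pandas index, then pivot to an xarray Dataset.
ds = res.to_pandas().set_index(["x", "y"]).to_xarray()
```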

def mean_skill(
@@ -675,39 +700,66 @@ def mean_skill(
cc = self

df = cc._to_long_dataframe() # TODO: remove
assert "model" in df.columns
mod_names = cc.mod_names
# obs_names = cmp.obs_names # df.observation.unique()
qnt_names = cc.quantity_names

# skill assessment
pmetrics = _parse_metric(metrics)

# TODO avoid hardcoded metrics
# pmetrics = ["n", "bias", "rmse", "mae"]
sk = cc.skill(metrics=pmetrics)
if sk is None:
return None
skilldf = sk.to_dataframe()
assert "model" in skilldf.columns

# weights
weights = cc._parse_weights(weights, sk.obs_names)
skilldf["weights"] = (
skilldf.n if weights is None else np.tile(weights, len(mod_names)) # type: ignore
)
# skilldf["weights"] = (
# skilldf.n if weights is None else np.tile(weights, len(mod_names)) # type: ignore
# )
if weights is None:
skilldf = skilldf.with_columns(pl.col("n").alias("weights"))
else:
wdict = {o: w for o, w in zip(sk.obs_names, weights)}
skilldf = skilldf.with_columns(
pl.col("observation")
.map_elements(lambda name: wdict.get(name))
.alias("weights")
)

def weighted_mean(x: Any) -> Any:
return np.average(x, weights=skilldf.loc[x.index, "weights"])
# def weighted_mean(x: Any) -> Any:
# return np.average(x, weights=skilldf.loc[x.index, "weights"])
# weighted_mean = pl.

# group by
by = cc._mean_skill_by(skilldf, mod_names, qnt_names) # type: ignore
agg = {"n": "sum"}
for metric in pmetrics: # type: ignore
agg[metric.__name__] = weighted_mean # type: ignore
res = skilldf.groupby(by, observed=False).agg(agg)
# agg = {"n": "sum"}
# for metric in pmetrics: # type: ignore
# agg[metric.__name__] = weighted_mean # type: ignore
# (pl.col("values") * pl.col("weights")).sum() / pl.col("weights").sum()).alias("weighted_mean")
exprs = [
(pl.col(metric) * pl.col("weights")).sum() / pl.col("weights").sum()
for metric in pmetrics
]
# res = skilldf.groupby(by, observed=False).agg(agg)

# Numpy ufuncs are supported, e.g.
# df.select(np.log(pl.all()).name.suffix("_log"))
res = skilldf.group_by(by).agg(exprs)

# res = skilldf.group_by(by).agg(np.average(pl.all(), weights=pl.col("weights")))

# TODO is this correct?
res.index.name = "model"
# res.index.name = "model"

# output
res = cc._add_as_col_if_not_in_index(df, res, fields=["model", "quantity"]) # type: ignore
return SkillTable(res.astype({"n": int}))
# res = cc._add_as_col_if_not_in_index(df, res, fields=["model", "quantity"]) # type: ignore
return SkillTable(res)
# return SkillTable(res.astype({"n": int}))
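
The list comprehension above builds one weighted-mean expression per metric; note that `pl.col` expects a column *name*, so if `pmetrics` holds callables (as the pandas version's `metric.__name__` suggests), the expression would need the name plus an `.alias(...)`. A self-contained sketch with made-up numbers:

```python
import polars as pl

skilldf = pl.DataFrame(
    {
        "model": ["m1", "m1", "m2", "m2"],
        "rmse": [0.2, 0.4, 0.3, 0.5],
        "weights": [3.0, 1.0, 3.0, 1.0],
    }
)

# Weighted mean per model: sum(metric * weight) / sum(weight).
res = skilldf.group_by("model").agg(
    ((pl.col("rmse") * pl.col("weights")).sum() / pl.col("weights").sum()).alias("rmse")
)
# e.g. m1: (0.2*3 + 0.4*1) / 4 = 0.25
```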

# def mean_skill_points(
# self,
@@ -760,7 +812,8 @@ def weighted_mean(x: Any) -> Any:
# return cmp.skill(metrics=metrics) # NOT CORRECT - SEE ABOVE

def _mean_skill_by(self, skilldf, mod_names, qnt_names): # type: ignore
by = []
# TODO clean up this mess
by = ["model"]
if len(mod_names) > 1:
by.append("model")
if len(qnt_names) > 1:
Expand All @@ -771,9 +824,11 @@ def _mean_skill_by(self, skilldf, mod_names, qnt_names): # type: ignore
elif "model" in skilldf:
by.append("model")
else:
by = [mod_names[0]] * len(skilldf)
return by
# by = [mod_names[0]] * len(skilldf)
by = None
return list(set(by))

# TODO add useful type hints
def _parse_weights(self, weights: Any, observations: Any) -> Any:
if observations is None:
observations = self.obs_names
@@ -819,7 +874,7 @@ def _parse_weights(self, weights: Any, observations: Any) -> Any:

def score(
self,
metric: str | Callable = mtr.rmse,
metric: str | Callable = "rmse",
**kwargs: Any,
) -> Dict[str, float]:
"""Weighted mean score of model(s) over all observations
@@ -878,21 +933,15 @@ def score(
if not (callable(metric) or isinstance(metric, str)):
raise ValueError("metric must be a string or a function")

assert kwargs == {}, f"Unknown keyword arguments: {kwargs}"

cmp = self

if cmp.n_points == 0:
raise ValueError("Dataset is empty, no data to compare.")

sk = cmp.mean_skill(weights=weights, metrics=[metric])
df = sk.to_dataframe()
assert isinstance(metric, str)

metric_name = metric if isinstance(metric, str) else metric.__name__
ser = df[metric_name]
score = {str(col): float(value) for col, value in ser.items()}

return score
return {k: v for k, v in sk.sort("model").select(["model", metric]).rows()}
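
The returned comprehension above is just `dict` over two-column rows; a small usage sketch with made-up scores:

```python
import polars as pl

sk = pl.DataFrame({"model": ["m2", "m1"], "rmse": [0.3, 0.2]})

# .rows() yields plain tuples, so a two-column selection maps straight
# onto a {model: score} dict; sorting first gives a deterministic order.
score = dict(sk.sort("model").select(["model", "rmse"]).rows())
print(score)  # {'m1': 0.2, 'm2': 0.3}
```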

def save(self, filename: Union[str, Path]) -> None:
"""Save the ComparerCollection to a zip file.
@@ -963,3 +1012,5 @@ def _load_comparer(folder: str, f: str) -> Comparer:
cmp = Comparer.load(f)
os.remove(f)
return cmp

