Generic statistics #510

Draft · wants to merge 4 commits into base: main
206 changes: 196 additions & 10 deletions mikeio/generic.py
@@ -1,5 +1,5 @@
import os
from typing import Iterable, List, Union
from typing import Iterable, List, Union, Callable
import math
import numpy as np
import pandas as pd
@@ -519,7 +519,7 @@ def extract(
"""
dfs_i = DfsFileFactory.DfsGenericOpenEdit(str(infilename))

is_layered_dfsu = dfs_i.ItemInfo[0].Name == "Z coordinate"
is_layered_dfsu = _is_layered_dfsu(dfs_i.ItemInfo[0])

file_start_new, start_step, start_sec, end_step, end_sec = _parse_start_end(
dfs_i, start, end
@@ -571,6 +571,12 @@ def extract(
dfs_o.Close()


def _is_layered_dfsu(iteminfo0):
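    """Layered (3d) dfsu files store the dynamic Z coordinate as the first item."""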
    return (iteminfo0.Name == "Z coordinate") and (
        EUMType(iteminfo0.Quantity.Item) == EUMType.ItemGeometry3D
    )

def _parse_start_end(dfs_i, start, end):
"""Helper function for parsing start and end arguments"""
n_time_steps = dfs_i.FileInfo.TimeAxis.NumberOfTimeSteps
@@ -725,6 +731,187 @@ def avg_time(infilename: str, outfilename: str, skipna=True):
dfs_o.Close()


def statistics(infilename: str, outfilename: str, items=None):
"""Calculate time statistics: mean, min, and max

Parameters
----------
infilename : str
input filename
outfilename : str
output filename
    items : List[str] or List[int], optional
        Process only selected items, by number (0-based) or name, by default: all
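
    Examples
    --------
    >>> statistics("in.dfsu", "stats.dfsu")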
"""
dfs_i = DfsFileFactory.DfsGenericOpen(infilename)

is_layered_dfsu = _is_layered_dfsu(dfs_i.ItemInfo[0])

item_numbers = _valid_item_numbers(dfs_i.ItemInfo, items, ignore_first=is_layered_dfsu)
if is_layered_dfsu:
item_numbers = [it + 1 for it in item_numbers]

n_items_in = len(item_numbers)

n_time_steps = dfs_i.FileInfo.TimeAxis.NumberOfTimeSteps

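    # the output file gets three items per selected input item, named with the
    # statistic as prefix, e.g. "Mean, Surface elevation"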
functxt = ["Mean", "Minimum", "Maximum"]
core_items = [dfs_i.ItemInfo[i] for i in item_numbers]
items = _get_repeated_items(core_items, prefixes=functxt)

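    # for layered (3d) files, keep the Z-coordinate item as the first output item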
if is_layered_dfsu:
items.insert(0, dfs_i.ItemInfo[0])

dfs_o = _clone(infilename, outfilename, items=items)

sumlist = []
minlist = []
maxlist = []
steplist = []

# step 0
for item in item_numbers:
d = _read_item(dfs_i, item, 0)
sumlist.append(d)
minlist.append(d.copy())
maxlist.append(d.copy())

step0 = np.zeros_like(d, dtype=np.int32)
step0[~np.isnan(d)] = 1
steplist.append(step0)

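    # remaining steps: update the running min/max and accumulate sum and count of
    # valid values; comparisons against NaN are False, so NaN never overwrites
    # an existing min/max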
for timestep in trange(1, n_time_steps, disable=not show_progress):
for item in range(n_items_in):
d = _read_item(dfs_i, item_numbers[item], timestep)

smaller = d < minlist[item]
minlist[item][smaller] = d[smaller]
larger = d > maxlist[item]
maxlist[item][larger] = d[larger]

sumlist[item][~np.isnan(d)] += d[~np.isnan(d)]
steplist[item][~np.isnan(d)] += 1

if is_layered_dfsu:
znitemdata = dfs_i.ReadItemTimeStep(1, 0)
# TODO should this be static Z coordinates instead?
dfs_o.WriteItemTimeStepNext(0.0, znitemdata.Data)

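    # write a single time step per input item: mean (= sum/count), minimum and maximum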
for item in range(n_items_in):
n = steplist[item]
mean = sumlist[item]
mean[n>0] = mean[n>0]/n[n>0]
dfs_o.WriteItemTimeStepNext(0.0, mean.astype(np.float32))
dfs_o.WriteItemTimeStepNext(0.0, minlist[item].astype(np.float32))
dfs_o.WriteItemTimeStepNext(0.0, maxlist[item].astype(np.float32))

dfs_o.Close()


def aggregate(
    infilename: str, outfilename: str, func: Union[Callable, List[Callable]], name: str = None, *, items=None, buffer_size=1.0e9, **kwargs
) -> None:
"""Create temporal aggregation of items in dfs file

Parameters
----------
infilename : str
input filename
outfilename : str
output filename
    func : function or list of functions
        Aggregation function(s), e.g. np.nanmean
    name : str, optional
        Name used as prefix for the output items, by default the function name
items : List[str] or List[int], optional
Process only selected items, by number (0-based) or name, by default: all
    buffer_size : float, optional
        for huge files the aggregation needs to be calculated for chunks of
        elements. buffer_size gives the maximum amount of memory available
        for the computation in bytes, by default 1e9 (=1GB)

Examples
--------
    >>> aggregate("in.dfsu", "time_min_max.dfsu", func=[np.nanmin, np.nanmax])
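    >>> aggregate("in.dfsu", "time_mean.dfsu", func=np.nanmean, name="Mean")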
"""
dfs_i = DfsFileFactory.DfsGenericOpen(infilename)

is_layered_dfsu = _is_layered_dfsu(dfs_i.ItemInfo[0])

item_numbers = _valid_item_numbers(dfs_i.ItemInfo, items, ignore_first=is_layered_dfsu)
if is_layered_dfsu:
item_numbers = [it + 1 for it in item_numbers]

n_items_in = len(item_numbers)

n_time_steps = dfs_i.FileInfo.TimeAxis.NumberOfTimeSteps

ci = _ChunkInfo.from_dfs(dfs_i, item_numbers, buffer_size)

fvec = [func] if callable(func) else func
if name is None:
ftxt = [getattr(f, '__name__', 'Aggregate') for f in fvec]
else:
ftxt = [name] # TODO check if list... etc
core_items = [dfs_i.ItemInfo[i] for i in item_numbers]
items = _get_repeated_items(core_items, prefixes=ftxt)

if is_layered_dfsu:
items.insert(0, dfs_i.ItemInfo[0])

dfs_o = _clone(infilename, outfilename, items=items)

n_items_out = len(items)
if is_layered_dfsu:
n_items_out = n_items_out - 1

datalist = []
outdatalist = []

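    # allocate one output array per (item, function) pair and a chunk buffer of
    # shape (n_time_steps, chunk_size) per item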
for item in item_numbers:
indata = _read_item(dfs_i, item, 0)
for _ in fvec:
outdatalist.append(np.zeros_like(indata))
datalist.append(np.zeros((n_time_steps, ci.chunk_size)))

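    # process the elements chunk by chunk to keep memory use within buffer_size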
e1 = 0
for _ in range(ci.n_chunks):
e2 = ci.stop(e1)
# the last chunk may be smaller than the rest:
chunk_end = ci.chunk_end(e1)

# read all data for this chunk
for timestep in range(n_time_steps):
item_out = 0
for item_no in item_numbers:
itemdata = _read_item(dfs_i, item_no, timestep)
data_chunk = itemdata[e1:e2]
datalist[item_out][timestep, 0:chunk_end] = data_chunk
item_out += 1

# calculate aggregates (for this chunk)
item_out = 0
for item in range(n_items_in):
for f in fvec:
data = f(datalist[item][:, 0:chunk_end], axis=0, keepdims=False, **kwargs)
outdatalist[item_out][e1:e2] = data
item_out += 1

e1 = e2

if is_layered_dfsu:
znitemdata = dfs_i.ReadItemTimeStep(1, 0)
# TODO should this be static Z coordinates instead?
dfs_o.WriteItemTimeStepNext(0.0, znitemdata.Data)

for item in range(n_items_out):
darray = outdatalist[item].astype(np.float32)
dfs_o.WriteItemTimeStepNext(0.0, darray)

dfs_o.Close()


def quantile(
infilename: str, outfilename: str, q, *, items=None, skipna=True, buffer_size=1.0e9
):
@@ -760,12 +947,11 @@ def quantile(

dfs_i = DfsFileFactory.DfsGenericOpen(infilename)

is_dfsu_3d = dfs_i.ItemInfo[0].Name == "Z coordinate"

item_numbers = _valid_item_numbers(dfs_i.ItemInfo, items)
is_layered_dfsu = _is_layered_dfsu(dfs_i.ItemInfo[0])

if is_dfsu_3d and 0 in item_numbers:
item_numbers.remove(0) # Remove Zn item for special treatment
item_numbers = _valid_item_numbers(dfs_i.ItemInfo, items, ignore_first=is_layered_dfsu)
if is_layered_dfsu:
item_numbers = [it + 1 for it in item_numbers]

n_items_in = len(item_numbers)

@@ -780,13 +966,13 @@ def quantile(
core_items = [dfs_i.ItemInfo[i] for i in item_numbers]
items = _get_repeated_items(core_items, prefixes=qtxt)

if is_dfsu_3d:
if is_layered_dfsu:
items.insert(0, dfs_i.ItemInfo[0])

dfs_o = _clone(infilename, outfilename, items=items)

n_items_out = len(items)
if is_dfsu_3d:
if is_layered_dfsu:
n_items_out = n_items_out - 1

datalist = []
@@ -824,7 +1010,7 @@

e1 = e2

if is_dfsu_3d:
if is_layered_dfsu:
znitemdata = dfs_i.ReadItemTimeStep(1, 0)
# TODO should this be static Z coordinates instead?
dfs_o.WriteItemTimeStepNext(0.0, znitemdata.Data)
90 changes: 89 additions & 1 deletion tests/test_generic.py
@@ -3,7 +3,7 @@
import numpy as np
import pandas as pd
import mikeio
from mikeio import Dfsu
#from mikeio import Dfsu
from mikeio import generic
from mikeio.generic import scale, diff, sum, extract, avg_time, fill_corrupt
import pytest
@@ -518,6 +518,94 @@ def test_time_average_deletevalues(tmpdir):
assert np.allclose(org[0].to_numpy()[~nan1], averaged[0].to_numpy()[~nan2])


def test_statistics(tmpdir):

infilename = "tests/testdata/NorthSea_HD_and_windspeed.dfsu"
outfilename = os.path.join(tmpdir.dirname, "NorthSea_HD_and_windspeed_stats.dfsu")
generic.statistics(infilename, outfilename)

dso = mikeio.read(infilename)
dsa = mikeio.read(outfilename)

assert dso.time[0] == dsa.time[0]
assert dso.shape[1] == dsa.shape[1]
assert dsa.shape[0] == 1
assert np.allclose(dso[0].mean().values, dsa[0].values)
assert np.allclose(dso[0].min().values, dsa[1].values)
assert np.allclose(dso[0].max().values, dsa[2].values)

# select items
generic.statistics(infilename, outfilename, items=0)
dsa2 = mikeio.read(outfilename)
assert np.allclose(dsa2[0].values, dsa[0].values)
assert dsa2.items[0].name == "Mean, Surface elevation"
assert dsa2.n_items == 3


def test_statistics_dfsu_3d(tmpdir):
infilename = "tests/testdata/oresund_sigma_z.dfsu"
outfilename = os.path.join(tmpdir, "oresund_sigma_z_stats.dfsu")
generic.statistics(infilename, outfilename)

dso = mikeio.read(infilename)
dsa = mikeio.read(outfilename)

assert dsa.n_timesteps == 1
assert dso.start_time == dsa.start_time
assert dso.n_items*3 == dsa.n_items
assert np.allclose(dso.Temperature.min().values, dsa.Minimum_Temperature.values)

# select items
generic.statistics(infilename, outfilename, items=0)
dsa2 = mikeio.read(outfilename)
assert dsa2.items[0].name == "Mean, Temperature"
assert np.allclose(dsa2.Mean_Temperature.values, dsa.Mean_Temperature.values)


def test_aggregate(tmpdir):

infilename = "tests/testdata/NorthSea_HD_and_windspeed.dfsu"
outfilename = os.path.join(tmpdir.dirname, "NorthSea_HD_and_windspeed_stats.dfsu")
generic.aggregate(infilename, outfilename, func=[np.mean, np.min, np.max])

dso = mikeio.read(infilename)
dsa = mikeio.read(outfilename)

assert dso.time[0] == dsa.time[0]
assert dso.shape[1] == dsa.shape[1]
assert dsa.shape[0] == 1
assert np.allclose(dso[0].mean().values, dsa[0].values)
assert np.allclose(dso[0].min().values, dsa[1].values)
assert np.allclose(dso[0].max().values, dsa[2].values)

# select items
generic.aggregate(infilename, outfilename, func=np.mean, name="Mean", items=0)
dsa2 = mikeio.read(outfilename)
assert np.allclose(dsa2[0].values, dsa[0].values)
assert dsa2.items[0].name == "Mean, Surface elevation"
assert dsa2.n_items == 1


def test_aggregate_dfsu_3d(tmpdir):
infilename = "tests/testdata/oresund_sigma_z.dfsu"
outfilename = os.path.join(tmpdir, "oresund_sigma_z_stats.dfsu")
generic.aggregate(infilename, outfilename, func=[np.mean, np.min, np.max])

dso = mikeio.read(infilename)
dsa = mikeio.read(outfilename)

assert dsa.n_timesteps == 1
assert dso.start_time == dsa.start_time
assert dso.n_items*3 == dsa.n_items
assert np.allclose(dso.Temperature.min().values, dsa.amin_Temperature.values)

# select items
generic.aggregate(infilename, outfilename, func=np.mean, name="mymean", items=0)
dsa2 = mikeio.read(outfilename)
assert dsa2.items[0].name == "mymean, Temperature"
assert np.allclose(dsa2.mymean_Temperature.values, dsa.mean_Temperature.values)


def test_quantile_dfsu(tmpdir):

infilename = "tests/testdata/oresundHD_run1.dfsu"