diff --git a/mikeio/generic.py b/mikeio/generic.py
index c805b1424..c18c74296 100644
--- a/mikeio/generic.py
+++ b/mikeio/generic.py
@@ -1,5 +1,5 @@
 import os
-from typing import Iterable, List, Union
+from typing import Iterable, List, Union, Callable
 import math
 import numpy as np
 import pandas as pd
@@ -519,7 +519,7 @@ def extract(
     """
     dfs_i = DfsFileFactory.DfsGenericOpenEdit(str(infilename))
 
-    is_layered_dfsu = dfs_i.ItemInfo[0].Name == "Z coordinate"
+    is_layered_dfsu = _is_layered_dfsu(dfs_i.ItemInfo[0])
 
     file_start_new, start_step, start_sec, end_step, end_sec = _parse_start_end(
         dfs_i, start, end
@@ -571,6 +571,12 @@ def extract(
     dfs_o.Close()
 
 
+def _is_layered_dfsu(iteminfo0) -> bool:
+    is_zn_item = iteminfo0.Name == "Z coordinate"
+    is_3d_geometry = EUMType(iteminfo0.Quantity.Item) == EUMType.ItemGeometry3D
+    return is_zn_item and is_3d_geometry
+
+
 def _parse_start_end(dfs_i, start, end):
     """Helper function for parsing start and end arguments"""
     n_time_steps = dfs_i.FileInfo.TimeAxis.NumberOfTimeSteps
@@ -725,6 +731,187 @@ def avg_time(infilename: str, outfilename: str, skipna=True):
 
     dfs_o.Close()
 
 
+def statistics(infilename: str, outfilename: str, items=None):
+    """Calculate time statistics: mean, min, and max
+
+    Parameters
+    ----------
+    infilename : str
+        input filename
+    outfilename : str
+        output filename
+    items : List[str] or List[int], optional
+        Process only selected items, by number (0-based)
+        or name, by default: all
+    """
+    dfs_i = DfsFileFactory.DfsGenericOpen(infilename)
+
+    is_layered_dfsu = _is_layered_dfsu(dfs_i.ItemInfo[0])
+
+    item_numbers = _valid_item_numbers(dfs_i.ItemInfo, items, ignore_first=is_layered_dfsu)
+    if is_layered_dfsu:
+        item_numbers = [it + 1 for it in item_numbers]
+
+    n_items_in = len(item_numbers)
+
+    n_time_steps = dfs_i.FileInfo.TimeAxis.NumberOfTimeSteps
+
+    functxt = ["Mean", "Minimum", "Maximum"]
+    core_items = [dfs_i.ItemInfo[i] for i in item_numbers]
+    items = _get_repeated_items(core_items, prefixes=functxt)
+
+    if is_layered_dfsu:
+        items.insert(0, dfs_i.ItemInfo[0])
+
+    dfs_o = _clone(infilename, outfilename, items=items)
+
+    sumlist = []
+    minlist = []
+    maxlist = []
+    steplist = []
+
+    # step 0
+    for item in item_numbers:
+        d = _read_item(dfs_i, item, 0)
+        sumlist.append(d)
+        minlist.append(d.copy())
+        maxlist.append(d.copy())
+
+        step0 = np.zeros_like(d, dtype=np.int32)
+        step0[~np.isnan(d)] = 1
+        steplist.append(step0)
+
+    for timestep in trange(1, n_time_steps, disable=not show_progress):
+        for item in range(n_items_in):
+            d = _read_item(dfs_i, item_numbers[item], timestep)
+
+            smaller = d < minlist[item]
+            minlist[item][smaller] = d[smaller]
+            larger = d > maxlist[item]
+            maxlist[item][larger] = d[larger]
+
+            sumlist[item][~np.isnan(d)] += d[~np.isnan(d)]
+            steplist[item][~np.isnan(d)] += 1
+
+    if is_layered_dfsu:
+        znitemdata = dfs_i.ReadItemTimeStep(1, 0)
+        # TODO should this be static Z coordinates instead?
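+        # write the Zn data of the first input timestep to the single output timestep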
+        dfs_o.WriteItemTimeStepNext(0.0, znitemdata.Data)
+
+    for item in range(n_items_in):
+        n = steplist[item]
+        mean = sumlist[item]
+        mean[n > 0] = mean[n > 0] / n[n > 0]
+        dfs_o.WriteItemTimeStepNext(0.0, mean.astype(np.float32))
+        dfs_o.WriteItemTimeStepNext(0.0, minlist[item].astype(np.float32))
+        dfs_o.WriteItemTimeStepNext(0.0, maxlist[item].astype(np.float32))
+
+    dfs_o.Close()
+
+
+def aggregate(
+    infilename: str, outfilename: str, func: Callable, name: str = None, *, items=None, buffer_size=1.0e9, **kwargs
+) -> None:
+    """Create temporal aggregation of items in dfs file
+
+    Parameters
+    ----------
+    infilename : str
+        input filename
+    outfilename : str
+        output filename
+    func : function or list of functions
+        Aggregation function(s), e.g. np.nanmean
+    name : str, optional
+        name used as item name prefix, by default the function's __name__
+    items : List[str] or List[int], optional
+        Process only selected items, by number (0-based) or name, by default: all
+    buffer_size : float, optional
+        for huge files the aggregation needs to be calculated for chunks of
+        elements. buffer_size gives the maximum amount of memory available
+        for the computation in bytes, by default 1e9 (=1GB)
+
+    Examples
+    --------
+    >>> aggregate("in.dfsu", "time_min_max.dfsu", [np.argmin, np.argmax])
+    """
+    dfs_i = DfsFileFactory.DfsGenericOpen(infilename)
+
+    is_layered_dfsu = _is_layered_dfsu(dfs_i.ItemInfo[0])
+
+    item_numbers = _valid_item_numbers(dfs_i.ItemInfo, items, ignore_first=is_layered_dfsu)
+    if is_layered_dfsu:
+        item_numbers = [it + 1 for it in item_numbers]
+
+    n_items_in = len(item_numbers)
+
+    n_time_steps = dfs_i.FileInfo.TimeAxis.NumberOfTimeSteps
+
+    ci = _ChunkInfo.from_dfs(dfs_i, item_numbers, buffer_size)
+
+    fvec = [func] if callable(func) else func
+    if name is None:
+        ftxt = [getattr(f, "__name__", "Aggregate") for f in fvec]
+    else:
+        ftxt = [name]  # TODO check if list... etc
+    core_items = [dfs_i.ItemInfo[i] for i in item_numbers]
+    items = _get_repeated_items(core_items, prefixes=ftxt)
+
+    if is_layered_dfsu:
+        items.insert(0, dfs_i.ItemInfo[0])
+
+    dfs_o = _clone(infilename, outfilename, items=items)
+
+    n_items_out = len(items)
+    if is_layered_dfsu:
+        n_items_out = n_items_out - 1
+
+    datalist = []
+    outdatalist = []
+
+    for item in item_numbers:
+        indata = _read_item(dfs_i, item, 0)
+        for _ in fvec:
+            outdatalist.append(np.zeros_like(indata))
+        datalist.append(np.zeros((n_time_steps, ci.chunk_size)))
+
+    e1 = 0
+    for _ in range(ci.n_chunks):
+        e2 = ci.stop(e1)
+        # the last chunk may be smaller than the rest:
+        chunk_end = ci.chunk_end(e1)
+
+        # read all data for this chunk
+        for timestep in range(n_time_steps):
+            item_out = 0
+            for item_no in item_numbers:
+                itemdata = _read_item(dfs_i, item_no, timestep)
+                data_chunk = itemdata[e1:e2]
+                datalist[item_out][timestep, 0:chunk_end] = data_chunk
+                item_out += 1
+
+        # calculate aggregates (for this chunk)
+        item_out = 0
+        for item in range(n_items_in):
+            for f in fvec:
+                data = f(datalist[item][:, 0:chunk_end], axis=0, keepdims=False, **kwargs)
+                outdatalist[item_out][e1:e2] = data
+                item_out += 1
+
+        e1 = e2
+
+    if is_layered_dfsu:
+        znitemdata = dfs_i.ReadItemTimeStep(1, 0)
+        # TODO should this be static Z coordinates instead?
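+        # reuse the Zn data of timestep 0 for the single output timestep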
+        dfs_o.WriteItemTimeStepNext(0.0, znitemdata.Data)
+
+    for item in range(n_items_out):
+        darray = outdatalist[item].astype(np.float32)
+        dfs_o.WriteItemTimeStepNext(0.0, darray)
+
+    dfs_o.Close()
+
+
 def quantile(
     infilename: str, outfilename: str, q, *, items=None, skipna=True, buffer_size=1.0e9
 ):
@@ -760,12 +947,11 @@ def quantile(
 
     dfs_i = DfsFileFactory.DfsGenericOpen(infilename)
 
-    is_dfsu_3d = dfs_i.ItemInfo[0].Name == "Z coordinate"
-
-    item_numbers = _valid_item_numbers(dfs_i.ItemInfo, items)
+    is_layered_dfsu = _is_layered_dfsu(dfs_i.ItemInfo[0])
 
-    if is_dfsu_3d and 0 in item_numbers:
-        item_numbers.remove(0)  # Remove Zn item for special treatment
+    item_numbers = _valid_item_numbers(dfs_i.ItemInfo, items, ignore_first=is_layered_dfsu)
+    if is_layered_dfsu:
+        item_numbers = [it + 1 for it in item_numbers]
 
     n_items_in = len(item_numbers)
 
@@ -780,13 +966,13 @@ def quantile(
     core_items = [dfs_i.ItemInfo[i] for i in item_numbers]
     items = _get_repeated_items(core_items, prefixes=qtxt)
 
-    if is_dfsu_3d:
+    if is_layered_dfsu:
         items.insert(0, dfs_i.ItemInfo[0])
 
     dfs_o = _clone(infilename, outfilename, items=items)
 
     n_items_out = len(items)
-    if is_dfsu_3d:
+    if is_layered_dfsu:
         n_items_out = n_items_out - 1
 
     datalist = []
@@ -824,7 +1010,7 @@ def quantile(
 
         e1 = e2
 
-    if is_dfsu_3d:
+    if is_layered_dfsu:
         znitemdata = dfs_i.ReadItemTimeStep(1, 0)
         # TODO should this be static Z coordinates instead?
         dfs_o.WriteItemTimeStepNext(0.0, znitemdata.Data)
diff --git a/tests/test_generic.py b/tests/test_generic.py
index 626ae7713..cbafc828a 100644
--- a/tests/test_generic.py
+++ b/tests/test_generic.py
@@ -3,7 +3,7 @@ import numpy as np
 import pandas as pd
 
 import mikeio
-from mikeio import Dfsu
+#from mikeio import Dfsu
 from mikeio import generic
 from mikeio.generic import scale, diff, sum, extract, avg_time, fill_corrupt
 import pytest
@@ -518,6 +518,94 @@ def test_time_average_deletevalues(tmpdir):
     assert np.allclose(org[0].to_numpy()[~nan1], averaged[0].to_numpy()[~nan2])
 
 
+def test_statistics(tmpdir):
+
+    infilename = "tests/testdata/NorthSea_HD_and_windspeed.dfsu"
+    outfilename = os.path.join(tmpdir.dirname, "NorthSea_HD_and_windspeed_stats.dfsu")
+    generic.statistics(infilename, outfilename)
+
+    dso = mikeio.read(infilename)
+    dsa = mikeio.read(outfilename)
+
+    assert dso.time[0] == dsa.time[0]
+    assert dso.shape[1] == dsa.shape[1]
+    assert dsa.shape[0] == 1
+    assert np.allclose(dso[0].mean().values, dsa[0].values)
+    assert np.allclose(dso[0].min().values, dsa[1].values)
+    assert np.allclose(dso[0].max().values, dsa[2].values)
+
+    # select items
+    generic.statistics(infilename, outfilename, items=0)
+    dsa2 = mikeio.read(outfilename)
+    assert np.allclose(dsa2[0].values, dsa[0].values)
+    assert dsa2.items[0].name == "Mean, Surface elevation"
+    assert dsa2.n_items == 3
+
+
+def test_statistics_dfsu_3d(tmpdir):
+    infilename = "tests/testdata/oresund_sigma_z.dfsu"
+    outfilename = os.path.join(tmpdir, "oresund_sigma_z_stats.dfsu")
+    generic.statistics(infilename, outfilename)
+
+    dso = mikeio.read(infilename)
+    dsa = mikeio.read(outfilename)
+
+    assert dsa.n_timesteps == 1
+    assert dso.start_time == dsa.start_time
+    assert dso.n_items * 3 == dsa.n_items
+    assert np.allclose(dso.Temperature.min().values, dsa.Minimum_Temperature.values)
+
+    # select items
+    generic.statistics(infilename, outfilename, items=0)
+    dsa2 = mikeio.read(outfilename)
+    assert dsa2.items[0].name == "Mean, Temperature"
+    assert np.allclose(dsa2.Mean_Temperature.values, dsa.Mean_Temperature.values)
+
+
+def test_aggregate(tmpdir):
+
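+    # three aggregation functions -> one output timestep with three items per input item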
+    infilename = "tests/testdata/NorthSea_HD_and_windspeed.dfsu"
+    outfilename = os.path.join(tmpdir.dirname, "NorthSea_HD_and_windspeed_stats.dfsu")
+    generic.aggregate(infilename, outfilename, func=[np.mean, np.min, np.max])
+
+    dso = mikeio.read(infilename)
+    dsa = mikeio.read(outfilename)
+
+    assert dso.time[0] == dsa.time[0]
+    assert dso.shape[1] == dsa.shape[1]
+    assert dsa.shape[0] == 1
+    assert np.allclose(dso[0].mean().values, dsa[0].values)
+    assert np.allclose(dso[0].min().values, dsa[1].values)
+    assert np.allclose(dso[0].max().values, dsa[2].values)
+
+    # select items
+    generic.aggregate(infilename, outfilename, func=np.mean, name="Mean", items=0)
+    dsa2 = mikeio.read(outfilename)
+    assert np.allclose(dsa2[0].values, dsa[0].values)
+    assert dsa2.items[0].name == "Mean, Surface elevation"
+    assert dsa2.n_items == 1
+
+
+def test_aggregate_dfsu_3d(tmpdir):
+    infilename = "tests/testdata/oresund_sigma_z.dfsu"
+    outfilename = os.path.join(tmpdir, "oresund_sigma_z_stats.dfsu")
+    generic.aggregate(infilename, outfilename, func=[np.mean, np.min, np.max])
+
+    dso = mikeio.read(infilename)
+    dsa = mikeio.read(outfilename)
+
+    assert dsa.n_timesteps == 1
+    assert dso.start_time == dsa.start_time
+    assert dso.n_items * 3 == dsa.n_items
+    assert np.allclose(dso.Temperature.min().values, dsa.amin_Temperature.values)
+
+    # select items
+    generic.aggregate(infilename, outfilename, func=np.mean, name="mymean", items=0)
+    dsa2 = mikeio.read(outfilename)
+    assert dsa2.items[0].name == "mymean, Temperature"
+    assert np.allclose(dsa2.mymean_Temperature.values, dsa.mean_Temperature.values)
+
+
 def test_quantile_dfsu(tmpdir):
 
     infilename = "tests/testdata/oresundHD_run1.dfsu"