Generalize ReplayMover (#17)
* generalize ReplayMover

* bug fixes
timothyas authored Apr 30, 2024
1 parent 47e9364 commit 977c733
Showing 5 changed files with 127 additions and 49 deletions.
10 changes: 10 additions & 0 deletions examples/replay/config-0.25-degree.yaml
@@ -7,6 +7,16 @@ FV3Dataset:
coords_path_out: "gcs://noaa-ufs-gefsv13replay/ufs-hr1/0.25-degree/coordinates/zarr/"
forecast_hours : [0, 3]

cycles:
  start : "1994-01-01T00"
  end : "2023-10-13T06"
  freq : "6h"

time:
  start : "1993-12-31T18"
  end : "2023-10-13T03"
  freq : "3h"

chunks_out:
  time : 1
  pfull : 1
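The new cycles and time sections replace the date ranges that were previously hard coded into the mover's xcycles and xtime properties. A rough sketch of how they are consumed (mirroring ReplayMover.__init__ in replay_mover.py below, with a hypothetical dict standing in for the parsed YAML):

    import pandas as pd

    # hypothetical dict mirroring the "cycles" and "time" YAML sections above
    config = {
        "cycles": {"start": "1994-01-01T00", "end": "1994-01-02T00", "freq": "6h"},
        "time": {"start": "1993-12-31T18", "end": "1994-01-01T21", "freq": "3h"},
    }

    cycles = pd.date_range(**config["cycles"])  # one DA cycle directory every 6 hours
    time = pd.date_range(**config["time"])      # output timestamps, shifted back by the IAU offset

    # number of output timestamps per DA cycle, via Timedelta floor division
    n_steps_per_cycle = pd.Timedelta(config["cycles"]["freq"]) // pd.Timedelta(config["time"]["freq"])
    assert n_steps_per_cycle == 2
    assert len(time) == n_steps_per_cycle * len(cycles)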
10 changes: 10 additions & 0 deletions examples/replay/config-1.00-degree.yaml
@@ -7,6 +7,16 @@ FV3Dataset:
coords_path_out: "gcs://noaa-ufs-gefsv13replay/ufs-hr1/1.00-degree/coordinates/zarr/"
forecast_hours : [0, 3]

cycles:
  start : "1994-01-01T00"
  end : "1999-06-13T06"
  freq : "6h"

time:
  start : "1993-12-31T18"
  end : "1999-06-13T03"
  freq : "3h"

chunks_in:
  # estimated 37MB per chunk (the full 3D field)
  time : 1
2 changes: 2 additions & 0 deletions examples/replay/move_one_degree.py
@@ -20,6 +20,7 @@ def submit_slurm_mover(job_id, mover):
f" config_filename='{mover.config_filename}',\n"+\
f" storage_options={mover.storage_options},\n"+\
f" main_cache_path='{mover.main_cache_path}',\n"+\
f" component='{mover.component}',\n"+\
f")\n"+\
f"mover.run({job_id})"

@@ -62,6 +63,7 @@ def submit_slurm_mover(job_id, mover):
config_filename="config-1.00-degree.yaml",
storage_options={"token": "/contrib/Tim.Smith/.gcs/replay-service-account.json"},
main_cache_path="/lustre/Tim.Smith/tmp-replay/1.00-degree",
component="fv3",
)

localtime.start("Make and Store Container Dataset")
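For context, submit_slurm_mover writes out a small Python script that re-creates the mover inside each SLURM job, and the new component keyword is now threaded through that script. A hypothetical rendering of the generated snippet for job_id=0 (only the keyword-argument lines are taken from the diff; the import, class name, and n_jobs value are assumptions):

    from replay_mover import ReplayMover1Degree

    mover = ReplayMover1Degree(
        n_jobs=1,  # assumed, not shown in the diff
        config_filename='config-1.00-degree.yaml',
        storage_options={'token': '/contrib/Tim.Smith/.gcs/replay-service-account.json'},
        main_cache_path='/lustre/Tim.Smith/tmp-replay/1.00-degree',
        component='fv3',
    )
    mover.run(0)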
2 changes: 2 additions & 0 deletions examples/replay/move_quarter_degree.py
@@ -20,6 +20,7 @@ def submit_slurm_mover(job_id, mover):
f" config_filename='{mover.config_filename}',\n"+\
f" storage_options={mover.storage_options},\n"+\
f" main_cache_path='{mover.main_cache_path}',\n"+\
f" component='{mover.component}',\n"+\
f")\n"+\
f"mover.run({job_id})"

@@ -62,6 +63,7 @@ def submit_slurm_mover(job_id, mover):
config_filename="config-0.25-degree.yaml",
storage_options={"token": "/contrib/Tim.Smith/.gcs/replay-service-account.json"},
main_cache_path="/lustre/Tim.Smith/tmp-replay/0.25-degree",
component="fv3",
)

localtime.start("Make and Store Container Dataset")
152 changes: 103 additions & 49 deletions examples/replay/replay_mover.py
@@ -12,19 +12,9 @@
import dask.array as darray
from zarr import NestedDirectoryStore

from ufs2arco import FV3Dataset, Timer
from ufs2arco import FV3Dataset, MOM6Dataset, CICE6Dataset, Timer

class ReplayMover1Degree():
"""
Note:
Currently this makes the unnecessary but easy-to-implement assumption that we want forecast_hours 0 & 3.
This assumption is key to the hard coded end date and timedelta used to make :attr:`xtime`.
It should also be confirmed that this assumption does not impact the "ftime" variable, which is currently
being created in the container dataset. The values should be overwritten when the data are actually
generated, but this has not been verified.
"""


n_jobs = None

@@ -33,22 +23,18 @@ class ReplayMover1Degree():

@property
def xcycles(self):
"""These are the DA cycle timestamps, which are every 6 hours. There is one s3 directory per cycle for replay."""
cycles = pd.date_range(start="1994-01-01", end="1999-06-13T06:00:00", freq="6h")
return xr.DataArray(cycles, coords={"cycles": cycles}, dims="cycles")
return xr.DataArray(self.cycles, coords={"cycles": self.cycles}, dims="cycles")


@property
def xtime(self):
"""These are the time stamps of the resulting dataset, assuming we are grabbing fhr00 and fhr03.
"""These are the time stamps of the resulting dataset.
This was created by processing a few DA cycles with the desired forecast hours, and figuring out
what operations were needed to map from a list of all DA cycles to the resulting ``"time"``
array in the final dataset.
"""
time = pd.date_range(start="1994-01-01", end="1999-06-13T09:00:00", freq="3h")
iau_time = time - timedelta(hours=6)
return xr.DataArray(iau_time, coords={"time": iau_time}, dims="time", attrs={"long_name": "time", "axis": "T"})
return xr.DataArray(self.time, coords={"time": self.time}, dims="time", attrs={"long_name": "time", "axis": "T"})


@property
@@ -111,6 +97,7 @@ def __init__(
):
self.n_jobs = n_jobs
self.config_filename = config_filename
self.component = component
self.storage_options = storage_options if storage_options is not None else dict()
self.main_cache_path = main_cache_path

@@ -120,8 +107,30 @@ def __init__(
name = f"{component.upper()}Dataset" # e.g., FV3Dataset, MOM6Dataset, CICE6Dataset
self.forecast_hours = config[name]["forecast_hours"]
self.file_prefixes = config[name]["file_prefixes"]

assert tuple(self.forecast_hours) == (0, 3)
self.cycles = pd.date_range(**config[name]["cycles"])
self.time = pd.date_range(**config[name]["time"])

if component.lower() == "fv3":
self.Dataset = FV3Dataset
self.cached_path = self.fv3_path
elif component.lower() == "mom6":
self.Dataset = MOM6Dataset
self.cached_path = self.mom6_path
elif component.lower() == "cice6":
self.Dataset = CICE6Dataset
self.cached_path = self.cice6_path

# for move_single_dataset, we have to figure out how many resulting timestamps we have
# within a single DA cycle
try:
assert "freq" in config[name]["cycles"].keys()
assert "freq" in config[name]["time"].keys()

except AssertionError:
raise KeyError("ReplayMover.__init__: we need 'freq' inside the config 'cycles' and 'time' sections")
delta_t_cycle = pd.Timedelta(config[name]["cycles"]["freq"])
delta_t_time = pd.Timedelta(config[name]["time"]["freq"])
self.n_steps_per_cycle = delta_t_cycle // delta_t_time


def run(self, job_id):
@@ -137,6 +146,7 @@ def run(self, job_id):

walltime = Timer()
localtime = Timer()
walltime.start()

store_coords = False # we don't need a separate coords dataset for FV3
for cycle in self.my_cycles(job_id):
@@ -156,17 +166,21 @@ def move_single_dataset(self, job_id, cycle):
def move_single_dataset(self, job_id, cycle):
"""Store a single cycle to zarr"""

replay = FV3Dataset(path_in=self.cached_path, config_filename=self.config_filename)
replay = self.Dataset(path_in=self.cached_path, config_filename=self.config_filename)
xds = replay.open_dataset(cycle, **self.ods_kwargs(job_id))

index = list(self.xtime.values).index(xds.time.values[0])
tslice = slice(index, index+2)
tslice = slice(index, index+self.n_steps_per_cycle)

# subset the variables here in order to remove extraneous dimensions
if len(replay.data_vars)>0:
data_vars = [x for x in replay.data_vars if x in xds]
xds = xds[data_vars]

spatial_region = {k: slice(None, None) for k in xds.dims if k != "time"}
region = {
"time": tslice,
"pfull": slice(None, None),
"grid_yt": slice(None, None),
"grid_xt": slice(None, None),
**spatial_region,
}
region = {k : v for k,v in region.items() if k in xds.dims}

@@ -187,7 +201,7 @@ def store_container(self):

localtime = Timer()

replay = FV3Dataset(path_in=self.cached_path, config_filename=self.config_filename)
replay = self.Dataset(path_in=self.cached_path, config_filename=self.config_filename)

localtime.start("Reading Single Dataset")
cycle = self.my_cycles(0)[0]
@@ -241,7 +255,7 @@ def store_container(self):


@staticmethod
def cached_path(dates, forecast_hours, file_prefixes):
def fv3_path(dates, forecast_hours, file_prefixes):
"""This is passed to :class:`FV3Dataset`, and it generates the paths to read from for the given inputs
Note:
@@ -286,6 +300,46 @@ def cached_path(dates, forecast_hours, file_prefixes):
files.append(this_file)
return [join(upper, this_file) for this_file in files]

@staticmethod
def mom6_path(dates, forecast_hours, file_prefixes):
"""This is passed to :class:`MOM6Dataset`, and it generates the paths to read from for the given inputs
Note:
With simplecache it's not clear where the cached files go, and they
do not clear until the process is done running (maybe?), which can fill up a filesystem easily.
So we use filecache instead.
Args:
dates (Iterable[datetime]): with the DA cycles to read from
forecast_hours (List[int]): with the forecast hours to grab ... note here we assume [0, 3] but don't enforce it
file_prefixes (List[str]): e.g. ["ocn_"]
Returns:
list_of_paths (List[str]): see example
Example:
>>> mover = ReplayMover( ... )
>>> mover.cached_path(
dates=[datetime(1994,1,1,0), datetime(1994,1,1,6)],
forecast_hours=[0],
file_prefixes=["ocn_"],
)
['filecache::s3://noaa-ufs-gefsv13replay-pds/1deg/1994/01/1994010100/ocn_1994_01_01_00.nc',
'filecache::s3://noaa-ufs-gefsv13replay-pds/1deg/1994/01/1994010106/ocn_1994_01_01_06.nc']
"""
upper = "filecache::s3://noaa-ufs-gefsv13replay-pds/1deg"
dates = [dates] if not isinstance(dates, Iterable) else dates

files = []
for date in dates:
this_dir = f"{date.year:04d}/{date.month:02d}/{date.year:04d}{date.month:02d}{date.day:02d}{date.hour:02d}"
for fp in file_prefixes:
for fhr in forecast_hours:
this_date = date + timedelta(hours=fhr)
this_file = f"{this_dir}/{fp}{this_date.year:04d}_{this_date.month:02d}_{this_date.day:02d}_{this_date.hour:02d}.nc"
files.append(this_file)
return [join(upper, this_file) for this_file in files]


@staticmethod
def npdate2datetime(npdate):
@@ -320,18 +374,17 @@ def remove_time(xds):
del single[key]
return single

@staticmethod
def add_time_coords(xds, time2cftime):
def add_time_coords(self, xds, time2cftime):
"""add ftime and cftime to container
This is a bit dirty, passing a static method from another class as a function arg... so it goes.
"""

# fv3 output lags each DA cycle by the 6 hour IAU offset, so fhr=0 maps to -6h and fhr=3 to -3h
iau_offset = 6 if self.component == "fv3" else 0
repeater = tuple(np.timedelta64(timedelta(hours=fhr - iau_offset)) for fhr in self.forecast_hours)
ftime = np.array(
[
(np.timedelta64(timedelta(hours=-6)), np.timedelta64(timedelta(hours=-3)))
for _ in range(len(xds["time"])//2)
]
).flatten()
[repeater for _ in range(len(xds["time"])//len(repeater))]
).flatten()

xds["ftime"] = xr.DataArray(
ftime,
@@ -359,20 +412,6 @@

class ReplayMoverQuarterDegree(ReplayMover1Degree):

@property
def xcycles(self):
"""These are the DA cycle timestamps, which are every 6 hours. There is one s3 directory per cycle for replay."""
cycles = pd.date_range(start="1994-01-01", end="2023-10-13T06:00:00", freq="6h")
return xr.DataArray(cycles, coords={"cycles": cycles}, dims="cycles")


@property
def xtime(self):
"""These are the time stamps of the resulting dataset, assuming we are grabbing fhr00 and fhr03"""
time = pd.date_range(start="1994-01-01", end="2023-10-13T09:00:00", freq="3h")
iau_time = time - timedelta(hours=6)
return xr.DataArray(iau_time, coords={"time": iau_time}, dims="time", attrs={"long_name": "time", "axis": "T"})

def __init__(
self,
n_jobs,
@@ -390,7 +429,7 @@ def __init__(
)

@staticmethod
def cached_path(dates, forecast_hours, file_prefixes):
def fv3_path(dates, forecast_hours, file_prefixes):
upper = "filecache::s3://noaa-ufs-gefsv13replay-pds"
dates = [dates] if not isinstance(dates, Iterable) else dates

@@ -402,3 +441,18 @@
this_file = join(this_dir, f"{fp}{date.year:04d}{date.month:02d}{date.day:02d}{date.hour:02d}_fhr{fhr:02d}_control")
files.append(this_file)
return [join(upper, this_file) for this_file in files]

@staticmethod
def mom6_path(dates, forecast_hours, file_prefixes):
upper = "filecache::s3://noaa-ufs-gefsv13replay-pds"
dates = [dates] if not isinstance(dates, Iterable) else dates

files = []
for date in dates:
this_dir = f"{date.year:04d}/{date.month:02d}/{date.year:04d}{date.month:02d}{date.day:02d}{date.hour:02d}"
for fp in file_prefixes:
for fhr in forecast_hours:
this_date = date + timedelta(hours=fhr)
this_file = f"{this_dir}/{fp}{this_date.year:04d}_{this_date.month:02d}_{this_date.day:02d}_{this_date.hour:02d}.nc"
files.append(this_file)
return [join(upper, this_file) for this_file in files]

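As a quick check on the generalized bookkeeping, a small standalone sketch (not part of the commit): for two DA cycles with forecast_hours = [0, 3] and the 6 hour IAU offset, the repeater reproduces the ftime values the old code hard coded.

    import numpy as np
    import pandas as pd
    from datetime import timedelta

    cycles = pd.date_range(start="1994-01-01T00", end="1994-01-01T06", freq="6h")
    time = pd.date_range(start="1993-12-31T18", end="1994-01-01T03", freq="3h")
    assert len(time) == 2 * len(cycles)  # two 3-hourly stamps per 6-hourly cycle

    forecast_hours = [0, 3]
    iau_offset = 6  # fv3 output lags each cycle timestamp by the IAU window
    repeater = tuple(np.timedelta64(timedelta(hours=fhr - iau_offset)) for fhr in forecast_hours)
    ftime = np.array([repeater for _ in range(len(time) // len(repeater))]).flatten()
    # ftime is [-6h, -3h, -6h, -3h], matching the tuple the old code hard coded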
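The region logic in move_single_dataset now builds full-extent slices for every non-time dimension, so the same write path works whether the spatial dims are (pfull, grid_yt, grid_xt) or an ocean grid. A minimal standalone sketch of that pattern against a local zarr store (dimension names, sizes, and the store path here are made up):

    import numpy as np
    import xarray as xr

    # container with the full time axis pre-allocated, as store_container does
    xds = xr.Dataset(
        {"t2m": (("time", "lat", "lon"), np.zeros((4, 2, 2)))},
        coords={"time": np.arange(4)},
    )
    xds.chunk({"time": 2}).to_zarr("container.zarr", compute=False, mode="w")

    # one cycle's worth of data: timestamps 2 and 3
    piece = (xds + 1.0).isel(time=slice(2, 4))

    tslice = slice(2, 4)
    spatial_region = {k: slice(None, None) for k in piece.dims if k != "time"}
    region = {"time": tslice, **spatial_region}
    region = {k: v for k, v in region.items() if k in piece.dims}
    piece.to_zarr("container.zarr", region=region)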