diff --git a/examples/replay/config-0.25-degree.yaml b/examples/replay/config-0.25-degree.yaml
index dd18edc..bf2af5b 100644
--- a/examples/replay/config-0.25-degree.yaml
+++ b/examples/replay/config-0.25-degree.yaml
@@ -7,6 +7,16 @@ FV3Dataset:
   coords_path_out: "gcs://noaa-ufs-gefsv13replay/ufs-hr1/0.25-degree/coordinates/zarr/"
   forecast_hours : [0, 3]
 
+  cycles:
+    start : "1994-01-01T00"
+    end : "2023-10-13T06"
+    freq : "6h"
+
+  time:
+    start : "1993-12-31T18"
+    end : "2023-10-13T03"
+    freq : "3h"
+
   chunks_out:
     time : 1
     pfull : 1
diff --git a/examples/replay/config-1.00-degree.yaml b/examples/replay/config-1.00-degree.yaml
index 75814e0..7ea7e0c 100644
--- a/examples/replay/config-1.00-degree.yaml
+++ b/examples/replay/config-1.00-degree.yaml
@@ -7,6 +7,16 @@ FV3Dataset:
   coords_path_out: "gcs://noaa-ufs-gefsv13replay/ufs-hr1/1.00-degree/coordinates/zarr/"
   forecast_hours : [0, 3]
 
+  cycles:
+    start : "1994-01-01T00"
+    end : "1999-06-13T06"
+    freq : "6h"
+
+  time:
+    start : "1993-12-31T18"
+    end : "1999-06-13T03"
+    freq : "3h"
+
   chunks_in:
     # estimated 37MB per chunk (the full 3D field)
     time : 1
diff --git a/examples/replay/move_one_degree.py b/examples/replay/move_one_degree.py
index 5f18e0f..4bf977d 100644
--- a/examples/replay/move_one_degree.py
+++ b/examples/replay/move_one_degree.py
@@ -20,6 +20,7 @@ def submit_slurm_mover(job_id, mover):
         f"    config_filename='{mover.config_filename}',\n"+\
         f"    storage_options={mover.storage_options},\n"+\
         f"    main_cache_path='{mover.main_cache_path}',\n"+\
+        f"    component='{mover.component}',\n"+\
         f")\n"+\
         f"mover.run({job_id})"
 
@@ -62,6 +63,7 @@ def submit_slurm_mover(job_id, mover):
         config_filename="config-1.00-degree.yaml",
         storage_options={"token": "/contrib/Tim.Smith/.gcs/replay-service-account.json"},
         main_cache_path="/lustre/Tim.Smith/tmp-replay/1.00-degree",
+        component="fv3",
     )
 
     localtime.start("Make and Store Container Dataset")
diff --git a/examples/replay/move_quarter_degree.py b/examples/replay/move_quarter_degree.py
index 62e1e27..813996b 100644
--- a/examples/replay/move_quarter_degree.py
+++ b/examples/replay/move_quarter_degree.py
@@ -20,6 +20,7 @@ def submit_slurm_mover(job_id, mover):
         f"    config_filename='{mover.config_filename}',\n"+\
         f"    storage_options={mover.storage_options},\n"+\
         f"    main_cache_path='{mover.main_cache_path}',\n"+\
+        f"    component='{mover.component}',\n"+\
         f")\n"+\
         f"mover.run({job_id})"
 
@@ -62,6 +63,7 @@ def submit_slurm_mover(job_id, mover):
         config_filename="config-0.25-degree.yaml",
         storage_options={"token": "/contrib/Tim.Smith/.gcs/replay-service-account.json"},
         main_cache_path="/lustre/Tim.Smith/tmp-replay/0.25-degree",
+        component="fv3",
     )
 
     localtime.start("Make and Store Container Dataset")
diff --git a/examples/replay/replay_mover.py b/examples/replay/replay_mover.py
index 0de9411..68e04e6 100644
--- a/examples/replay/replay_mover.py
+++ b/examples/replay/replay_mover.py
@@ -12,19 +12,9 @@ import dask.array as darray
 from zarr import NestedDirectoryStore
 
-from ufs2arco import FV3Dataset, Timer
+from ufs2arco import FV3Dataset, MOM6Dataset, CICE6Dataset, Timer
 
 class ReplayMover1Degree():
-    """
-
-    Note:
-        Currently this makes the unnecessary but easy-to-implement assumption that we want forecast_hours 0 & 3.
-        This assumption is key to the hard coded end date and timedelta used to make :attr:`xtime`.
-        It should also be confirmed that this assumption does not impact the "ftime" variable, that is right now
-        being created in the container dataset.
-        The values should be overwritten when the data are actually
-        generated, but who knows.
-    """
-
     n_jobs = None
 
 
@@ -33,22 +23,18 @@ class ReplayMover1Degree():
 
     @property
     def xcycles(self):
-        """These are the DA cycle timestamps, which are every 6 hours. There is one s3 directory per cycle for replay."""
-        cycles = pd.date_range(start="1994-01-01", end="1999-06-13T06:00:00", freq="6h")
-        return xr.DataArray(cycles, coords={"cycles": cycles}, dims="cycles")
+        return xr.DataArray(self.cycles, coords={"cycles": self.cycles}, dims="cycles")
 
 
     @property
     def xtime(self):
-        """These are the time stamps of the resulting dataset, assuming we are grabbing fhr00 and fhr03.
+        """These are the time stamps of the resulting dataset.
 
         This was created by processing a few DA cycles with the desired forecast hours,
         and figuring out what operations were needed to map from a list of all DA cycles to
         the resulting ``"time"`` array in the final dataset.
         """
-        time = pd.date_range(start="1994-01-01", end="1999-06-13T09:00:00", freq="3h")
-        iau_time = time - timedelta(hours=6)
-        return xr.DataArray(iau_time, coords={"time": iau_time}, dims="time", attrs={"long_name": "time", "axis": "T"})
+        return xr.DataArray(self.time, coords={"time": self.time}, dims="time", attrs={"long_name": "time", "axis": "T"})
 
 
     @property
@@ -111,6 +97,7 @@ def __init__(
     ):
         self.n_jobs = n_jobs
         self.config_filename = config_filename
+        self.component = component
         self.storage_options = storage_options if storage_options is not None else dict()
         self.main_cache_path = main_cache_path
 
@@ -120,8 +107,30 @@ def __init__(
         name = f"{component.upper()}Dataset" # i.e., FV3Dataset ... maybe an unnecessary generalization at this stage
         self.forecast_hours = config[name]["forecast_hours"]
         self.file_prefixes = config[name]["file_prefixes"]
-
-        assert tuple(self.forecast_hours) == (0, 3)
+        self.cycles = pd.date_range(**config[name]["cycles"])
+        self.time = pd.date_range(**config[name]["time"])
+
+        if component.lower() == "fv3":
+            self.Dataset = FV3Dataset
+            self.cached_path = self.fv3_path
+        elif component.lower() == "mom6":
+            self.Dataset = MOM6Dataset
+            self.cached_path = self.mom6_path
+        elif component.lower() == "cice6":
+            self.Dataset = CICE6Dataset
+            self.cached_path = self.cice6_path
+
+        # for move_single_dataset, we have to figure out how many resulting timestamps we have
+        # within a single DA cycle
+        try:
+            assert "freq" in config[name]["cycles"].keys()
+            assert "freq" in config[name]["time"].keys()
+
+        except AssertionError:
+            raise KeyError("ReplayMover.__init__: we need 'freq' inside the config 'cycles' and 'time' sections")
+        delta_t_cycle = pd.Timedelta(config[name]["cycles"]["freq"])
+        delta_t_time = pd.Timedelta(config[name]["time"]["freq"])
+        self.n_steps_per_cycle = delta_t_cycle // delta_t_time
 
 
     def run(self, job_id):
@@ -137,6 +146,7 @@ def run(self, job_id):
 
         walltime = Timer()
         localtime = Timer()
+        walltime.start()
 
         store_coords = False # we don't need a separate coords dataset for FV3
         for cycle in self.my_cycles(job_id):
 
@@ -156,17 +166,21 @@ def run(self, job_id):
     def move_single_dataset(self, job_id, cycle):
         """Store a single cycle to zarr"""
 
-        replay = FV3Dataset(path_in=self.cached_path, config_filename=self.config_filename)
+        replay = self.Dataset(path_in=self.cached_path, config_filename=self.config_filename)
         xds = replay.open_dataset(cycle, **self.ods_kwargs(job_id))
 
         index = list(self.xtime.values).index(xds.time.values[0])
-        tslice = slice(index, index+2)
+        tslice = slice(index, index+self.n_steps_per_cycle)
+
+        # subset the variables here in order to remove extraneous dimensions
+        if len(replay.data_vars) > 0:
+            data_vars = [x for x in replay.data_vars if x in xds]
+            xds = xds[data_vars]
 
+        spatial_region = {k: slice(None, None) for k in xds.dims if k != "time"}
         region = {
             "time": tslice,
-            "pfull": slice(None, None),
-            "grid_yt": slice(None, None),
-            "grid_xt": slice(None, None),
+            **spatial_region,
         }
         region = {k : v for k,v in region.items() if k in xds.dims}
 
@@ -187,7 +201,7 @@ def store_container(self):
 
         localtime = Timer()
 
-        replay = FV3Dataset(path_in=self.cached_path, config_filename=self.config_filename)
+        replay = self.Dataset(path_in=self.cached_path, config_filename=self.config_filename)
 
         localtime.start("Reading Single Dataset")
         cycle = self.my_cycles(0)[0]
 
@@ -241,7 +255,7 @@ def store_container(self):
 
     @staticmethod
-    def cached_path(dates, forecast_hours, file_prefixes):
+    def fv3_path(dates, forecast_hours, file_prefixes):
         """This is passed to :class:`FV3Dataset`, and it generates the paths to read from for the given inputs
 
         Note:
@@ -286,6 +300,46 @@ def cached_path(dates, forecast_hours, file_prefixes):
                     files.append(this_file)
         return [join(upper, this_file) for this_file in files]
+
+    @staticmethod
+    def mom6_path(dates, forecast_hours, file_prefixes):
+        """This is passed to :class:`MOM6Dataset`, and it generates the paths to read from for the given inputs
+
+        Note:
+            With simplecache it's not clear where the cached files go, and they
+            do not clear until the process is done running (maybe?) which can fill up a filesystem easily.
+            So we use filecache instead.
+
+        Args:
+            dates (Iterable[datetime]): with the DA cycles to read from
+            forecast_hours (List[int]): with the forecast hours to grab ... note here we assume [0, 3] ... but don't enforce it here
+            file_prefixes (List[str]): e.g. ["sfg_", "bfg_"]
+
+        Returns:
+            list_of_paths (List[str]): see example
+
+        Example:
+            >>> mover = ReplayMover( ... )
+            >>> mover.cached_path(
+                    dates=[datetime(1994,1,1,0), datetime(1994,1,1,6)],
+                    forecast_hours=[0],
+                    file_prefixes=["ocn_"],
+                )
+            ['filecache::s3://noaa-ufs-gefsv13replay-pds/1deg/1994/01/1994010100/ocn_1994_01_01_00.nc',
+             'filecache::s3://noaa-ufs-gefsv13replay-pds/1deg/1994/01/1994010106/ocn_1994_01_01_06.nc']
+        """
+        upper = "filecache::s3://noaa-ufs-gefsv13replay-pds/1deg"
+        dates = [dates] if not isinstance(dates, Iterable) else dates
+
+        files = []
+        for date in dates:
+            this_dir = f"{date.year:04d}/{date.month:02d}/{date.year:04d}{date.month:02d}{date.day:02d}{date.hour:02d}"
+            for fp in file_prefixes:
+                for fhr in forecast_hours:
+                    this_date = date+timedelta(hours=fhr)
+                    this_file = f"{this_dir}/{fp}{this_date.year:04d}_{this_date.month:02d}_{this_date.day:02d}_{this_date.hour:02d}.nc"
+                    files.append(this_file)
+        return [join(upper, this_file) for this_file in files]
 
     @staticmethod
     def npdate2datetime(npdate):
@@ -320,18 +374,17 @@ def remove_time(xds):
                 del single[key]
         return single
 
-    @staticmethod
-    def add_time_coords(xds, time2cftime):
+    def add_time_coords(self, xds, time2cftime):
         """add ftime and cftime to container
 
         This is a bit dirty, passing a static method from another class
         as a function arg... so it goes.
""" + + iau_offset = -6 if self.component == "fv3" else 0 + repeater = tuple(np.timedelta64(timedelta(hours=fhr-iau_offset)) for fhr in self.forecast_hours) ftime = np.array( - [ - (np.timedelta64(timedelta(hours=-6)), np.timedelta64(timedelta(hours=-3))) - for _ in range(len(xds["time"])//2) - ] - ).flatten() + [repeater for _ in range(len(xds["time"])//len(repeater))] + ).flatten() xds["ftime"] = xr.DataArray( ftime, @@ -359,20 +412,6 @@ def add_time_coords(xds, time2cftime): class ReplayMoverQuarterDegree(ReplayMover1Degree): - @property - def xcycles(self): - """These are the DA cycle timestamps, which are every 6 hours. There is one s3 directory per cycle for replay.""" - cycles = pd.date_range(start="1994-01-01", end="2023-10-13T06:00:00", freq="6h") - return xr.DataArray(cycles, coords={"cycles": cycles}, dims="cycles") - - - @property - def xtime(self): - """These are the time stamps of the resulting dataset, assuming we are grabbing fhr00 and fhr03""" - time = pd.date_range(start="1994-01-01", end="2023-10-13T09:00:00", freq="3h") - iau_time = time - timedelta(hours=6) - return xr.DataArray(iau_time, coords={"time": iau_time}, dims="time", attrs={"long_name": "time", "axis": "T"}) - def __init__( self, n_jobs, @@ -390,7 +429,7 @@ def __init__( ) @staticmethod - def cached_path(dates, forecast_hours, file_prefixes): + def fv3_path(dates, forecast_hours, file_prefixes): upper = "filecache::s3://noaa-ufs-gefsv13replay-pds" dates = [dates] if not isinstance(dates, Iterable) else dates @@ -402,3 +441,18 @@ def cached_path(dates, forecast_hours, file_prefixes): this_file = join(this_dir, f"{fp}{date.year:04d}{date.month:02d}{date.day:02d}{date.hour:02d}_fhr{fhr:02d}_control") files.append(this_file) return [join(upper, this_file) for this_file in files] + + @staticmethod + def mom6_path(dates, forecast_hours, file_prefixes): + upper = "filecache::s3://noaa-ufs-gefsv13replay-pds" + dates = [dates] if not isinstance(dates, Iterable) else dates + + files = [] + for date in dates: + this_dir = f"{date.year:04d}/{date.month:02d}/{date.year:04d}{date.month:02d}{date.day:02d}{date.hour:02d}" + for fp in file_prefixes: + for fhr in forecast_hours: + this_date = cycle+timedelta(hours=fhr) + this_file = f"{this_dir}/{fp}{this_date.year:04d}_{this_date.month:02d}_{this_date.day:02d}_{this_date.hour:02d}.nc" + files.append(this_file) + return [join(upper, this_file) for this_file in files]