Added log pass-through and modified logic for time correction
danellecline committed Jul 3, 2024
1 parent dfd63c0 commit d37d888
Showing 14 changed files with 251 additions and 1,376 deletions.
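The common thread across the files below is a logger pass-through: instead of importing a module-level loguru logger, each class now receives a logger in its constructor and logs through self.log. A minimal sketch of the pattern, assuming loguru; the class and method names here are for illustration only:

from loguru import logger

class ExampleGenerator:
    """Hypothetical class illustrating the injected-logger pattern."""

    def __init__(self, log):
        # keep the injected logger instead of a module-level one
        self.log = log

    def run(self):
        self.log.info("running with the injected logger")

ExampleGenerator(logger).run()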
128 changes: 65 additions & 63 deletions pbp/json_generator/corrector.py
@@ -4,22 +4,25 @@

import datetime
from datetime import timedelta
from loguru import logger as log
import numpy as np
import pandas as pd
from pathlib import Path
import shutil
import tempfile
import json

from pbp.json_generator.utils import InstrumentType


class MetadataCorrector:
def __init__(
self,
log, # : loguru.Logger,
correct_df: pd.DataFrame,
json_path_out: str,
day: datetime,
variable_duration: bool = False,
instrument_type: InstrumentType,
time_correct: bool = False,
seconds_per_file: float = -1,
):
"""
@@ -30,97 +33,100 @@ def __init__(
The path to save the corrected metadata json file
:param day:
The day to correct
:param variable_duration:
True if the files vary in duration
:param instrument_type:
The type of instrument the metadata is coming from: NRS, ICLISTEN, SOUNDTRAP
:param time_correct:
True if the timestamps need to be adjusted; only supported for ICLISTEN
:param seconds_per_file:
The number of seconds in each file; not used for sound trap files
(optional) number of seconds in each file
"""
self.instrument_type = instrument_type
self.correct_df = correct_df
self.json_base_dir = json_path_out
self.day = day
self.variable_duration = variable_duration
self.log = log
self.time_correct = time_correct
self.seconds_per_file = seconds_per_file
self.files_per_day = None
# Seconds per file is required for ICLISTEN so the drift-correction check can run
if self.instrument_type == InstrumentType.ICLISTEN:
if self.seconds_per_file == -1:
self.log.exception("No seconds per file provided for ICLISTEN")
return
self.files_per_day = int(86400 / self.seconds_per_file)
self.log.debug(
f"Metadata corrector for {self.instrument_type} with {self.seconds_per_file} seconds per file"
)

def run(self):
"""Run the corrector"""

try:
if self.variable_duration:
files_per_day = None
# Filter the metadata to the day, starting 6 hours before the day starts to capture overlap
df = self.correct_df[
(self.correct_df["start"] >= self.day - timedelta(hours=6))
& (self.correct_df["start"] < self.day + timedelta(days=1))
]
else: # files are fixed, but may be missing or incomplete if the system was down
files_per_day = int(86400 / self.seconds_per_file)
# Filter the metadata to the day, starting 1 file before the day starts to capture overlap
df = self.correct_df[
(
(self.correct_df["start"] >= self.day)
& (self.correct_df["start"] < self.day + timedelta(days=1))
)
| (
(self.correct_df["end"] >= self.day)
& (self.correct_df["start"] < self.day)
)
]

log.debug(f"Creating metadata for day {self.day}")
# Filter the metadata to the day capturing the files both immediately
# before and after the day
df = self.correct_df[
(
(self.correct_df["start"] >= self.day)
& (self.correct_df["end"] < self.day + timedelta(days=1))
)
| (
(self.correct_df["end"] > self.day)
& (self.correct_df["start"] <= self.day)
)
]

self.log.debug(f"Creating metadata for day {self.day} from {len(df)} files...")

if len(df) == 0:
log.warning(f"No metadata found for day {self.day}")
self.log.warning(f"No metadata found for day {self.day}")
return

# convert the start and end times to datetime
self.log.info(f'{df.iloc[0]["start"]}')
df = df.copy()
self.log.info(f'{df.iloc[0]["start"]}')

df["start"] = pd.to_datetime(df["start"])
df["end"] = pd.to_datetime(df["end"])

self.log.info(f'====> {len(df)}')
# get the file list that covers the requested day
log.info(
f'Found {len(df)} files from day {self.day}, starting {df.iloc[0]["start"]} ending {df.iloc[-1]["end"]}'
)
# self.log.info(
# f'Found {len(df)} files from day {self.day}, starting {df.iloc[0]["start"]} ending {df.iloc[-1]["end"]}'
# )

# if there are no files, then return
if len(df) == 0:
log.warning(f"No files found for {self.day}")
self.log.warning(f"No files found for {self.day}")
return

day_process = df

if self.variable_duration:
log.info(f"Files for {self.day} are variable. Skipping duration check")
for index, row in day_process.iterrows():
log.debug(f'File {row["uri"]} duration {row["duration_secs"]} ')
else:
for index, row in day_process.iterrows():
# if the duration_secs is not seconds per file, then the file is not complete
if row["duration_secs"] != self.seconds_per_file:
log.warning(
f'File {row["duration_secs"]} != {self.seconds_per_file}. File is not complete'
)
continue
for index, row in day_process.iterrows():
self.log.debug(f'File {row["uri"]} duration {row["duration_secs"]} ')
if (
self.seconds_per_file > 0
and row["duration_secs"] != self.seconds_per_file
):
self.log.warning(
f'File {row["duration_secs"]} != {self.seconds_per_file}. File is not complete'
)

# check whether there is a discrepancy between the number of seconds in the file and the number
# of seconds in the metadata. If there is a discrepancy, then correct the metadata
# This is only reliable for full days of data contained in complete files for IcListen data
day_process["jitter_secs"] = 0

if self.variable_duration or (
len(day_process) == files_per_day + 1
if self.instrument_type == InstrumentType.ICLISTEN and (
len(day_process) == self.files_per_day + 1
and len(day_process["duration_secs"].unique()) == 1
and day_process.iloc[0]["duration_secs"] == self.seconds_per_file
):
# check whether the differences are all the same
if (
len(day_process["start"].diff().unique()) == 1
or self.variable_duration
):
log.warning(f"No drift for {self.day}")
if len(day_process["start"].diff().unique()) == 1 or self.time_correct:
self.log.warning(f"No drift for {self.day}")
else:
log.info(f"Correcting drift for {self.day}")
self.log.info(f"Correcting drift for {self.day}")

# correct the metadata
jitter = 0
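For context, the ICLISTEN-only condition that gates the drift correction above can be read as a small standalone sketch; it assumes 600-second IcListen files (so int(86400 / 600) = 144 complete files per day) and the dataframe columns used in run(), with the time_correct flag left out for brevity:

import pandas as pd

seconds_per_file = 600.0  # assumed IcListen file length
files_per_day = int(86400 / seconds_per_file)  # 144 complete files in a full day

def needs_drift_correction(day_process: pd.DataFrame) -> bool:
    # Only a full day of complete, equal-length files is considered reliable:
    # files_per_day + 1 because one extra file overlaps into the next day.
    full_day = (
        len(day_process) == files_per_day + 1
        and len(day_process["duration_secs"].unique()) == 1
        and day_process.iloc[0]["duration_secs"] == seconds_per_file
    )
    # If every start-to-start gap is identical there is no drift to correct.
    evenly_spaced = len(day_process["start"].diff().unique()) == 1
    return full_day and not evenly_spaced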
@@ -140,7 +146,7 @@ def run(self):
day_process.loc[index, "end"] = end
day_process.loc[index, "jitter_secs"] = jitter

if self.variable_duration:
if self.time_correct:
end = row.end
else:
end = start + timedelta(seconds=self.seconds_per_file)
@@ -160,20 +166,16 @@

# save explicitly as UTC by setting the timezone in the start and end times
day_process["start"] = day_process["start"].dt.tz_localize("UTC")
# Note: day_process["end"] coming from upstream seems to become incorrect
# (except for the first entry in the JSON), with `end` equal to `start`,
# so it is assigned here directly from day_process["start"]:
day_process["end"] = day_process["start"] + timedelta(
seconds=self.seconds_per_file
)
# TODO(Danelle): review/confirm the above.

self.save_day(self.day, day_process)

except Exception as e:
log.exception(f"Error correcting metadata for {self.day}. {e}")
self.log.exception(f"Error correcting metadata for {self.day}. {e}")
finally:
log.debug(
self.log.debug(
f"Done correcting metadata for {self.day}. Saved to {self.json_base_dir}"
)
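A minimal, self-contained illustration of the UTC step above, with synthetic start times and an assumed 600-second file length:

import pandas as pd
from datetime import timedelta

seconds_per_file = 600.0  # assumed file length
df = pd.DataFrame(
    {"start": pd.to_datetime(["2024-07-03 00:00:00", "2024-07-03 00:10:00"])}
)

# mark the naive start times explicitly as UTC, then derive end from start
df["start"] = df["start"].dt.tz_localize("UTC")
df["end"] = df["start"] + timedelta(seconds=seconds_per_file)
print(df)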

@@ -187,7 +189,7 @@ def no_jitter(self, day: datetime, day_process: pd.DataFrame) -> pd.DataFrame:
:return:
The corrected dataframe
"""
log.warning(
self.log.warning(
f"Cannot correct {self.day}. Using file start times as is, setting jitter to 0 and using "
f"calculated end times."
)
@@ -242,7 +244,7 @@ def save_day(self, day: datetime, day_process: pd.DataFrame, prefix: str = None)
date_format="iso",
date_unit="s",
)
log.debug(f"Wrote {temp_metadata.as_posix()}")
self.log.debug(f"Wrote {temp_metadata.as_posix()}")

# read the file back in using records format with json
with open(temp_metadata.as_posix(), "r") as f:
@@ -256,4 +258,4 @@ def save_day(self, day: datetime, day_process: pd.DataFrame, prefix: str = None)
output_path = Path(self.json_base_dir, str(day.year))
output_path.mkdir(parents=True, exist_ok=True)
shutil.copy2(temp_metadata.as_posix(), output_path)
log.info(f"Wrote {output_path}/{temp_metadata.name}")
self.log.info(f"Wrote {output_path}/{temp_metadata.name}")
6 changes: 4 additions & 2 deletions pbp/json_generator/gen_abstract.py
@@ -10,6 +10,7 @@
class MetadataGeneratorAbstract(object):
def __init__(
self,
log, # : loguru.Logger,
audio_loc: str,
json_base_dir: str,
prefix: List[str],
@@ -41,6 +42,7 @@ def __init__(
self.start = start
self.end = end
self.prefix = prefix
self._log = log
self._seconds_per_file = None if seconds_per_file == 0 else seconds_per_file
except Exception as e:
raise e
@@ -50,8 +52,8 @@ def seconds_per_file(self):
return self._seconds_per_file

@property
def correct_df(self):
return self.df
def log(self):
return self._log

# abstract run method
def run(self):
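The new log property means concrete generators reach the injected logger as self.log, mirroring how IcListenMetadataGenerator below forwards its log argument to super().__init__. A hedged sketch; the subclass is hypothetical:

from pbp.json_generator.gen_abstract import MetadataGeneratorAbstract

class ExampleMetadataGenerator(MetadataGeneratorAbstract):
    """Hypothetical subclass used only to illustrate the injected-logger property."""

    def run(self):
        # self.log resolves, via the new property, to the logger handed to __init__
        self.log.info(f"Generating metadata for {self.start} to {self.end}...")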
40 changes: 24 additions & 16 deletions pbp/json_generator/gen_iclisten.py
@@ -23,12 +23,13 @@ class IcListenMetadataGenerator(MetadataGeneratorAbstract):

def __init__(
self,
log, # : loguru.Logger,
uri: str,
json_base_dir: str,
start: datetime,
end: datetime,
prefix: List[str],
seconds_per_file: float = 300.0,
seconds_per_file: float = 600.0,
):
"""
Captures ICListen wav metadata in a pandas dataframe from either a local directory or S3 bucket.
@@ -46,24 +47,24 @@ def __init__(
The number of seconds per file expected in a wav file to check for missing data. If 0, then no check is done.
:return:
"""
super().__init__(uri, json_base_dir, prefix, start, end, seconds_per_file)
super().__init__(log, uri, json_base_dir, prefix, start, end, seconds_per_file)
self.log_prefix = f"{self.__class__.__name__} {start:%Y%m%d}"

def run(self):
log.info(f"Generating metadata for {self.start} to {self.end}...")
self.log.info(f"Generating metadata for {self.start} to {self.end}...")

bucket_name, prefix, scheme = utils.parse_s3_or_gcp_url(self.audio_loc)

# gs is not supported for icListen
if scheme == "gs":
log.error(f"{self.log_prefix} GS is not supported for icListen audio files")
self.log.error(f"{self.log_prefix} GS is not supported for icListen audio files")
return

# Run for each day in the range
for day in pd.date_range(self.start, self.end, freq="D"):
try:
self.df = None
log.info(
self.log.info(
f"{self.log_prefix} Searching in {self.audio_loc}/*.wav for wav files that match the search pattern {self.prefix}* ..."
)

@@ -96,20 +97,20 @@ def check_file(f: str, f_start_dt: datetime, f_end_dt: datetime):
)

if f_start_dt <= f_path_dt <= f_end_dt:
log.info(
self.log.info(
f"{self.log_prefix} Found {f_path.name} to process"
)
wav_files.append(IcListenWavFile(f, f_path_dt))
wav_files.append(IcListenWavFile(self.log, f, f_path_dt))
f_wav_dt = f_path_dt
except ValueError:
log.error(
self.log.error(
f"{self.log_prefix} Could not parse {f_path.name}"
)
return None

return f_wav_dt

# Set the start and end dates to 30 minutes before and after the start and end dates
# Set the start and end dates to 1 hour before and after the start and end dates
start_dt = day - timedelta(hours=1)
end_dt = day + timedelta(days=1)
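The filename filter above reduces to a simple window test; a minimal sketch, with the filename-to-datetime parsing elided since the exact name format is not shown in this hunk:

from datetime import datetime, timedelta

def in_window(f_path_dt: datetime, f_start_dt: datetime, f_end_dt: datetime) -> bool:
    # keep a file only when its filename-derived timestamp falls inside the search window
    return f_start_dt <= f_path_dt <= f_end_dt

day = datetime(2024, 7, 3)
start_dt = day - timedelta(hours=1)  # window opens one hour before the day
end_dt = day + timedelta(days=1)     # and closes at the end of the day
print(in_window(datetime(2024, 7, 2, 23, 30), start_dt, end_dt))  # True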

@@ -133,14 +134,14 @@

operation_parameters = {"Bucket": bucket, "Prefix": prefix}
page_iterator = paginator.paginate(**operation_parameters)
log.info(
self.log.info(
f"{self.log_prefix} Searching in bucket: {bucket} prefix: {prefix}"
)
# list the objects in the bucket
# loop through the objects and check if they match the search pattern
for page in page_iterator:
if "Contents" not in page:
log.info(f"{self.log_prefix} No data found in {bucket}")
self.log.info(f"{self.log_prefix} No data found in {bucket}")
break

for obj in page["Contents"]:
@@ -155,15 +156,15 @@ def check_file(f: str, f_start_dt: datetime, f_end_dt: datetime):
if wav_dt < start_dt_hour:
break

log.info(
self.log.info(
f"{self.log_prefix} Found {len(wav_files)} files to process that cover the period {start_dt} - {end_dt}"
)

# sort the files by start time
wav_files.sort(key=lambda x: x.start)

# create a dataframe from the wav files
log.info(
self.log.info(
f"{self.log_prefix} Creating dataframe from {len(wav_files)} files spanning {wav_files[0].start} to {wav_files[-1].start}..."
)
for wc in wav_files:
@@ -172,14 +173,20 @@ def check_file(f: str, f_start_dt: datetime, f_end_dt: datetime):
# concatenate the metadata to the dataframe
self.df = pd.concat([self.df, df_wav], axis=0)

log.debug(f"{self.log_prefix} Running metadata corrector for {day}")
self.log.debug(f"{self.log_prefix} Running metadata corrector for {day}")
corrector = MetadataCorrector(
self.df, self.json_base_dir, day, False, 600.0
self.log,
self.df,
self.json_base_dir,
day,
utils.InstrumentType.NRS,
True,
self.seconds_per_file,
)
corrector.run()

except Exception as ex:
log.exception(str(ex))
self.log.exception(str(ex))


if __name__ == "__main__":
@@ -203,6 +210,7 @@ def check_file(f: str, f_start_dt: datetime, f_end_dt: datetime):

# If only running one day, use a single generator
generator = IcListenMetadataGenerator(
log,
uri="s3://pacific-sound-256khz",
json_base_dir=json_dir.as_posix(),
prefix=["MARS"],
