From 1c9304cc536c115d575c1073d45886b8a71a0739 Mon Sep 17 00:00:00 2001 From: danellecline Date: Tue, 14 Nov 2023 17:41:39 -0800 Subject: [PATCH 01/10] added exceptions which are used in the json gen code --- src/logging_helper.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/logging_helper.py b/src/logging_helper.py index b88450e..614e13f 100644 --- a/src/logging_helper.py +++ b/src/logging_helper.py @@ -66,6 +66,8 @@ def warn(self, s: str): def error(self, s: str): self.logger.error(s) + def exception(self, s: str): + self.logger.exception(s) def create_logger( log_filename_and_level: Optional[Tuple[str, int]] = None, From f2ff9163e8ffb83bb75d12faa2bd9d8b5a006929 Mon Sep 17 00:00:00 2001 From: danellecline Date: Tue, 14 Nov 2023 17:43:40 -0800 Subject: [PATCH 02/10] initial check-in of json generator code. not functional yet, just some refactoring for styling, removing some MBARI specific code, and replacing loggins with the PbpLogger class --- src/metadata/generator/__init__.py | 2 + src/metadata/generator/gen_abstract.py | 95 +++++++++ src/metadata/generator/gen_iclisten.py | 154 ++++++++++++++ src/metadata/generator/gen_soundtrap.py | 167 +++++++++++++++ src/metadata/generator/utils.py | 15 ++ src/metadata/utils/__init__.py | 2 + src/metadata/utils/corrector.py | 263 ++++++++++++++++++++++++ src/metadata/utils/wavfile.py | 141 +++++++++++++ 8 files changed, 839 insertions(+) create mode 100644 src/metadata/generator/__init__.py create mode 100644 src/metadata/generator/gen_abstract.py create mode 100644 src/metadata/generator/gen_iclisten.py create mode 100644 src/metadata/generator/gen_soundtrap.py create mode 100644 src/metadata/generator/utils.py create mode 100644 src/metadata/utils/__init__.py create mode 100644 src/metadata/utils/corrector.py create mode 100755 src/metadata/utils/wavfile.py diff --git a/src/metadata/generator/__init__.py b/src/metadata/generator/__init__.py new file mode 100644 index 0000000..ecdaa7b --- /dev/null +++ b/src/metadata/generator/__init__.py @@ -0,0 +1,2 @@ +from .gen_soundtrap import SoundTrapMetadataGenerator +from .gen_iclisten import IcListenMetadataGenerator diff --git a/src/metadata/generator/gen_abstract.py b/src/metadata/generator/gen_abstract.py new file mode 100644 index 0000000..844a103 --- /dev/null +++ b/src/metadata/generator/gen_abstract.py @@ -0,0 +1,95 @@ +# pypam-based-processing +# Filename: metadata/generator/gen_abstract.py +# Description: Abstract class that captures sound wav metadata + +import pathlib +from datetime import datetime +from pathlib import Path + +import pandas as pd +import logger +import utils as utils +from src.logging_helper import PbpLogger + + +class MetadataGeneratorAbstract(object): + def __init__(self, + pbp_logger: PbpLogger, + wav_loc: str, + metadata_loc: str, + search: [str], + start: datetime, + end: datetime, + seconds_per_file: float = 0.): + """ + Abstract class for capturing sound wav metadata + :param pbp_logger: + The logger + :param wav_loc: + The local directory or S3 bucket that contains the wav files + :param metadata_loc: + The local directory or S3 bucket to store the metadata + :param search: + The search pattern to match the wav files, e.g. 'MARS' + :param start: + The start date to search for wav files + :param end: + The end date to search for wav files + :param seconds_per_file: + The number of seconds per file expected in a wav file to check for missing data. If missing, then no check is done. 
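+            (A value of 0 is stored internally as None, which likewise disables the check.)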
+ :return: + """ + try: + self.wav_loc = wav_loc + self.metadata_path = metadata_loc + self.df = pd.DataFrame() + self.start = start + self.end = end + self.search = search + self.seconds_per_file = None if seconds_per_file == 0 else seconds_per_file + self._log = pbp_logger + self.cache_path = Path(log_dir) / 's3cache' / f'{self.__class__.__name__}' + self.cache_path.mkdir(parents=True, exist_ok=True) + except Exception as e: + self._log.err(f'Could not initialize {self.__class__.__name__} for {start:%Y%m%d}') + raise e + + + + def search(self): + self.log.info( + f'{self.log_prefix} Searching in {self.wav_loc}/*.wav for wav files that match the search pattern {self.search}* ...') + + is_s3 = re.match(r'^s3://', self.wav_loc) + # the bucket name will optionally have a * at the end + # keep only the bucket name before the * + bucket_core = re.sub(r'\*$', '', self.wav_loc) + bucket_core = re.sub(r'^s3://', '', bucket_core) + return bucket_core, is_s3, wav_files + + + + @staticmethod + def raw(path_or_url: str): + w = utils.IcListenWavFile(path_or_url) + + if w.has_exception(): + return None # skip this file + + return w + + @property + def log(self): + return self._log + + @property + def seconds_per_file(self): + return self.seconds_per_file + + @property + def correct_df(self): + return self.df + + # abstract run method + def run(self): + pass diff --git a/src/metadata/generator/gen_iclisten.py b/src/metadata/generator/gen_iclisten.py new file mode 100644 index 0000000..5b60880 --- /dev/null +++ b/src/metadata/generator/gen_iclisten.py @@ -0,0 +1,154 @@ +# pypam-based-processing, Apache License 2.0 +# Filename: metadata/generator/gen_iclisten.py +# Description: Captures ICListen wav metadata in a pandas dataframe from either a local directory or S3 bucket. + +import re +from datetime import timedelta +from datetime import datetime +import boto3 +import numpy as np + +import pandas as pd +from pathlib import Path +from progressbar import progressbar +import metadata.utils as utils +from .gen_abstract import MetadataGeneratorAbstract + + +class IcListenMetadataGenerator(MetadataGeneratorAbstract): + + def __int__( + self, + pbp_logger: PbpLogger, + wav_loc: str, + metadata_loc: str, + search: [str], + start: datetime, + end: datetime, + seconds_per_file: float = 0.): + """ + Captures ICListen wav metadata in a pandas dataframe from either a local directory or S3 bucket. + :param pbp_logger: + The logger + :param wav_loc: + The local directory or S3 bucket that contains the wav files + :param metadata_loc: + The local directory or S3 bucket to store the metadata + :param search: + The search pattern to match the wav files, e.g. 'MARS' + :param start: + The start date to search for wav files + :param end: + The end date to search for wav files + :param seconds_per_file: + The number of seconds per file expected in a wav file to check for missing data. If 0, then no check is done. 
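+            Matching files are expected to be named <search>_YYYYMMDD_HHMMSS.wav, e.g. MARS_20230718_000000.wav.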
+ :return: + """ + super().__init__(pbp_logger, wav_loc, metadata_loc, search, start, end, seconds_per_file) + self.log_prefix = f'{self.__class__.__name__} {self.start:%Y%m%d}' + + def run(self): + print(f'Generating metadata for {self.start} to {self.end}...') + + # Run for each day in the range + for day in pd.date_range(self.start, self.end, freq='D'): + try: + self.df = None + self.log.info(f'{self.log_prefix} Searching in {self.wav_loc}/*.wav for wav files that match the search pattern {self.search}* ...') + + wav_files = [] + is_s3 = re.match(r'^s3://', self.wav_loc) + # the bucket name will optionally have a * at the end + # keep only the bucket name before the * + bucket_core = re.sub(r'\*$', '', self.wav_loc) + bucket_core = re.sub(r'^s3://', '', bucket_core) + + def check_file(f: str, f_start_dt: datetime, f_end_dt: datetime): + + f_path = Path(f) + wav_dt = None + + for s in self.search: + # see if the file is a regexp match to search + rc = re.search(s, f_path.stem) + + if rc and rc.group(0): + try: + # MARS file date is in the filename MARS_YYYYMMDD_HHMMSS.wav + f_path_dt = datetime.strptime(f_path.stem, f'{s}_%Y%m%d_%H%M%S') + + if f_start_dt <= f_path_dt <= f_end_dt: + wc = utils.IcListenWavFile(f, f_path_dt) + wav_files.append(wc) + wav_dt = f_path_dt + except ValueError: + self.log.error(f'{self.log_prefix} Could not parse {f_path.name}') + return None + + return wav_dt + + if not is_s3: + wav_path = Path(self.wav_loc) + for filename in progressbar(sorted(wav_path.rglob('*.wav')), prefix='Searching : '): + check_file(filename, start_dt, end_dt) + else: + # if the wav_loc is a s3 url, then we need to list the files in buckets that cover the start and end + # dates + client = boto3.client('s3') + + # Set the start and end dates to an hour before and after the start and end dates + start_dt = day - timedelta(hours=1) + end_dt = day + timedelta(days=1) + start_dt_hour = start_dt - timedelta(minutes=30) + end_dt_hour = end_dt + timedelta(minutes=30) + + for day_hour in pd.date_range(start=start_dt, end=end_dt, freq='H'): + + bucket = f'{bucket_core}-{day_hour.year:04d}' + prefix = f'{day_hour.month:02d}/MARS_{day_hour.year:04d}{day_hour.month:02d}{day_hour.day:02d}_{day_hour.hour:02d}' + paginator = client.get_paginator('list_objects') + + operation_parameters = {'Bucket': bucket, 'Prefix': prefix} + page_iterator = paginator.paginate(**operation_parameters) + self.log.info(f'{self.log_prefix} Searching in bucket: {bucket} prefix: {prefix}') + # list the objects in the bucket + # loop through the objects and check if they match the search pattern + for page in page_iterator: + if 'Contents' not in page: + self.log.info(f'{self.log_prefix} No data found in {bucket}') + break + + for obj in page['Contents']: + key = obj['Key'] + wav_dt = check_file(f's3://{bucket}/{key}', start_dt, end_dt) + if wav_dt is None: + continue + if wav_dt > end_dt_hour: + break + if wav_dt < start_dt_hour: + break + self.log.debug(f'{self.log_prefix} Found {wav_dt}') + # num_found += 1 + # if num_found > 100: + # break + + self.log.info(f'{self.log_prefix} Found {len(wav_files)} files to process that cover the period {start_dt} - {end_dt}') + + # sort the files by start time + wav_files.sort(key=lambda x: x.start) + + # create a dataframe from the wav files + self.log.info( + f'{self.log_prefix} Creating dataframe from {len(wav_files)} files spanning {wav_files[0].start} to {wav_files[-1].start}...') + for wc in wav_files: + df_wav = wc.to_df() + + # concatenate the metadata to the dataframe + self.df 
= pd.concat([self.df, df_wav], axis=0) + + self.log.debug(f'{self.log_prefix} Running metadata corrector for {day}') + corrector = utils.MetadataCorrector(self.log, self.df, self.metadata_path, day, False, 600.) + corrector.run() + + except Exception as ex: + self.log.exception(str(ex)) diff --git a/src/metadata/generator/gen_soundtrap.py b/src/metadata/generator/gen_soundtrap.py new file mode 100644 index 0000000..24ddcd7 --- /dev/null +++ b/src/metadata/generator/gen_soundtrap.py @@ -0,0 +1,167 @@ +# pypam-based-processing +# Filename: metadata/generator/gen_soundtrap.py +# Description: Captures SoundTrap metadata either from a local directory of S3 bucket + +import datetime +import shutil +from datetime import timedelta, datetime +import pandas as pd +from pathlib import Path +import boto3 +import tempfile +import re +from progressbar import progressbar +import utils +from .gen_abstract import MetadataGeneratorAbstract + + +class SoundTrapMetadataGenerator(MetadataGeneratorAbstract): + """ + Captures SoundTrap wav file metadata either from a local directory or S3 bucket. + """ + start = datetime.utcnow() + end = datetime.utcnow() + + def __init__( + self, + log_dir: str, + wav_loc: str, + metadata_loc: str, + search: [str], + start: datetime, + end: datetime): + """ + Captures SoundTrap wav file metadata either from a local directory or S3 bucket. + + :param pbp_logger: + The logger + :param wav_loc: + The local directory or S3 bucket that contains the wav files + :param metadata_loc: + The local directory or S3 bucket to store the metadata + :param search: + The search pattern to match the wav files, e.g. 'MARS' + :param start: + The start date to search for wav files + :param end: + The end date to search for wav files + :param seconds_per_file: + The number of seconds per file expected in a wav file to check for missing data. If missing, then no check is done. + :return: + """ + super().__init__(log_dir, wav_loc, metadata_loc, search, start, end, 0.) 
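+        # seconds_per_file is fixed at 0. because SoundTrap recordings vary in length;
+        # the base class stores 0 as None, which disables the missing-data check.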
+ self.start = start + self.end = end + # Add a prefix to the log messages to differentiate between the different metadata generators running by date + # This is useful when running multiple metadata generators in parallel + self.log_prefix = f'{self.__class__.__name__} {self.start:%Y%m%d}' # SoundTrapMetadataGenerator 20210801 + + def run(self): + + try: + self.search() + + def add_file(xml_file: str, wav_file: str): + """ + Check if the xml file is in the cache directory + :param xml_file: + The xml file with the metadata + :param wav_file: + The wav file + :return: + None + """ + + f_path = Path(xml_file) + # see if the file is a regexp match to self.search + for s in self.search: + rc = re.search(s, f_path.stem) + + if rc and rc.group(0): + try: + # If a SoundTrap file, then the date is in the filename XXXX.YYYYMMDDHHMMSS.xml + f_path_dt = datetime.strptime(f_path.stem.split('.')[1], '%y%m%d%H%M%S') + if self.start <= f_path_dt <= self.end: + wav_files.append(utils.SoundTrapWavFile(wav_file, xml_file)) + except ValueError: + self.log.error(f'{self.log_prefix} Could not parse {f_path.name}') + + if not is_s3: + wav_path = Path(self.wav_loc) + for filename in progressbar(sorted(wav_path.rglob('*.xml')), prefix='Searching : '): + wav_path = filename.parent / f'{filename.stem}.wav' + add_file(filename, wav_path) + else: + # if the wav_loc is a s3 url, then we need to list the files in buckets that cover the start and end + # dates + self.log.info(f'{self.log_prefix} Searching between {self.start} and {self.end}') + + client = boto3.client('s3') + + bucket = f'{bucket_core}' + paginator = client.get_paginator('list_objects') + + operation_parameters = {'Bucket': bucket} + page_iterator = paginator.paginate(**operation_parameters) + self.log.info(f'Searching in bucket: {bucket} for .wav and .xml files between {self.start} and {self.end} ') + # list the objects in the bucket + # loop through the objects and check if they match the search pattern + with tempfile.TemporaryDirectory() as tmpdir: + for page in page_iterator: + for obj in page['Contents']: + key = obj['Key'] + + if '.xml' in key: + output_xml = f'{tmpdir}/{key}' + output_wav = f's3://{bucket}/{key}'.replace('log.xml', 'wav') + + # Check if the xml file is in the cache directory + xml_path = Path(self.cache_path, key) + if xml_path.exists(): + shutil.copy(xml_path, output_xml) + else: + # Download the xml file to a temporary directory + self.log.info(f'{self.log_prefix} Downloading {key} ...') + client.download_file(bucket, key, output_xml) + # Save the xml file to the cache directory + self.log.info(f'{self.log_prefix} Saving {key} to {self.cache_path} ...') + shutil.copy(output_xml, self.cache_path) + add_file(xml_path, output_wav) + + self.log.info(f'{self.log_prefix} Found {len(wav_files)} files to process that cover the period {self.start} - {self.end}') + + if len(wav_files) == 0: + return + + # sort the files by start time + wav_files.sort(key=lambda x: x.start) + + # create a dataframe from the wav files + self.log.info(f'{self.log_prefix} Creating dataframe from {len(wav_files)} files spanning {wav_files[0].start} to {wav_files[-1].start}...') + for wc in wav_files: + df_wav = wc.to_df() + + # concatenate the metadata to the dataframe + self.df = pd.concat([self.df, df_wav], axis=0) + + # drop any rows with duplicate uris, keeping the first + self.df = self.df.drop_duplicates(subset=['uri'], keep='first') + + except Exception as ex: + self.log.exception(str(ex)) + finally: + days = (self.end - self.start).days + 1 + + if 
len(self.df) == 0: + self.log.info(f'{self.log_prefix} No data found between {self.start} and {self.end}') + return + + # Correct the metadata for each day + for day in range(days): + day_start = self.start + timedelta(days=day) + self.log.debug(f'{self.log_prefix} Running metadata corrector for {day_start}') + soundtrap = True + corrector = utils.MetadataCorrector(self.log, self.df, self.metadata_path, day_start, soundtrap, 0) + corrector.run() + + diff --git a/src/metadata/generator/utils.py b/src/metadata/generator/utils.py new file mode 100644 index 0000000..04f0982 --- /dev/null +++ b/src/metadata/generator/utils.py @@ -0,0 +1,15 @@ +def is_s3(wav_loc: str) -> (bool, str): + """ + Check if the wav_loc is an s3 bucket + :param wav_loc: + The wav_loc to check + :return: + A tuple of (is_s3, bucket_core) + """ + + is_s3_match = re.match(r'^s3://', wav_loc) + # the bucket name will optionally have a * at the end + # keep only the bucket name before the * + bucket_core = re.sub(r'\*$', '', wav_loc) + bucket_core = re.sub(r'^s3://', '', bucket_core) + return is_s3_match, bucket_core \ No newline at end of file diff --git a/src/metadata/utils/__init__.py b/src/metadata/utils/__init__.py new file mode 100644 index 0000000..bc65bab --- /dev/null +++ b/src/metadata/utils/__init__.py @@ -0,0 +1,2 @@ +from .corrector import MetadataCorrector +from .wavfile import IcListenWavFile, SoundTrapWavFile \ No newline at end of file diff --git a/src/metadata/utils/corrector.py b/src/metadata/utils/corrector.py new file mode 100644 index 0000000..821e913 --- /dev/null +++ b/src/metadata/utils/corrector.py @@ -0,0 +1,263 @@ +# pypam-based-processing, Apache License 2.0 +# Filename: metadata/utils/corrector.py +# Description: Correct metadata for wav files and saves the results to a json file. Results are optionally uploaded to S3. 
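+#
+# Typical usage (illustrative sketch using this module's API):
+#
+#   corrector = MetadataCorrector(logger, df, 's3://bucket/metadata', day,
+#                                 sound_trap=False, seconds_per_file=600.)
+#   corrector.run()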
+ +import datetime +from datetime import timedelta + +import logger +import numpy as np +import pandas as pd +from pathlib import Path +import shutil +import boto3 +import tempfile +import time +import re +import json +from urllib.parse import urlparse + + +class MetadataCorrector: + + def __init__( + self, + logger: PbpLogger, + correct_df: pd.DataFrame, + json_path_out: str, + day: datetime, + sound_trap: bool, + seconds_per_file: float): + """ + Correct the metadata for a day and save to a json file + :param logger: + The logger to use + :param correct_df: + The dataframe containing the metadata to correct + :param json_path_out: + The path to save the corrected metadata json file + :param day: + The day to correct + :param sound_trap: + True if the files are from a sound trap + :param seconds_per_file: + The number of seconds in each file; not used for sound trap files + """ + self.correct_df = correct_df + self.metadata_path = json_path_out + self.day = day + self.sound_trap = sound_trap + self.seconds_per_file = seconds_per_file + self.log = logger + + def run(self): + """Run the corrector""" + + is_s3 = False + if re.match(r'^s3://', self.metadata_path): + is_s3 = True + + try: + + # Soundtrap files can be variable + if self.sound_trap: + files_per_day = None + # Filter the metadata to the day, starting 6 hours before the day starts to capture overlap + df = self.correct_df[(self.correct_df['start'] >= day - timedelta(hours=6)) & (self.correct_df['start'] < day + timedelta(days=1))] + else: # ICListen files fixed, but may be missing or incomplete if the system was down + files_per_day = int(86400 / self.seconds_per_file) + # Filter the metadata to the day, starting 10 minutes before the day starts to capture overlap + df = self.correct_df[(self.correct_df['start'] >= day - timedelta(minutes=10)) & (self.correct_df['start'] < day + timedelta(days=1))] + + self.log.debug(f'Creating metadata for day {day}') + + if len(df) == 0: + self.log.warn(f'No metadata found for day {day}') + return + + # convert the start and end times to datetime + df = df.copy() + + df['start'] = pd.to_datetime(df['start']) + df['end'] = pd.to_datetime(df['end']) + + # get the file list that covers the requested day + self.log.info(f'Found {len(df)} files from day {day}, starting {df.iloc[0]["start"]} ending {df.iloc[-1]["end"]}') + + # if there are no files, then return + if len(df) == 0: + self.log.warn(f'No files found for {day}') + return + + day_process = df + + if self.sound_trap: + self.log.info(f'Soundtrap files for {day} are variable. Skipping duration check') + for index, row in day_process.iterrows(): + self.log.debug(f'File {row["uri"]} duration {row["duration_secs"]} ') + else: + for index, row in day_process.iterrows(): + # if the duration_secs is not seconds per file, then the file is not complete + if row['duration_secs'] != self.seconds_per_file: + self.log.warn(f'File {row["duration_secs"]} != {self.seconds_per_file}. File is not complete') + continue + + # check whether there is a discrepancy between the number of seconds in the file and the number + # of seconds in the metadata. 
If there is a discrepancy, then correct the metadata + # This is only reliable for full days of data contained in complete files + day_process['jitter_secs'] = 0 + + if self.sound_trap or \ + (len(day_process) == files_per_day + 1 \ + and len(day_process['duration_secs'].unique()) == 1 \ + and day_process.iloc[0]['duration_secs'] == self.seconds_per_file): + + self.log.info(f'{len(day_process)} files available for {day}') + + # check whether the differences are all the same + if len(day_process['start'].diff().unique()) == 1 or self.sound_trap: + self.log.warn(f'No drift for {day}') + else: + self.log.info(f'Correcting drift for {day}') + + # correct the metadata + jitter = 0 + start = day_process.iloc[0]['start'] + end = start + timedelta(seconds=self.seconds_per_file) + + for index, row in day_process.iterrows(): + # jitter is the difference between the expected start time and the actual start time + # jitter is 0 for the first file + if row.start == start: + # round the jitter to the nearest second + jitter = start.to_datetime64() - row.start.to_datetime64() + jitter = int(jitter / np.timedelta64(1, 's')) + + # correct the start and end times + day_process.loc[index, 'start'] = start + day_process.loc[index, 'end'] = end + day_process.loc[index, 'jitter_secs'] = jitter + + if self.sound_trap: + end = row.end + else: + end = start + timedelta(seconds=self.seconds_per_file) + # round the end time to the nearest second as the timestamp is only accurate to the second + end = end.replace(microsecond=0) + # set the times for the next files + start = end + else: + day_process = self.no_jitter(day, day_process) + + # drop any rows with duplicate uri times, keeping the first + # duplicates can be caused by the jitter correction + day_process = day_process.drop_duplicates(subset=['uri'], keep='first') + + # save explicitly as UTC by setting the timezone in the start and end times + day_process['start'] = day_process['start'].dt.tz_localize('UTC') + day_process['end'] = day_process['end'].dt.tz_localize('UTC') + + self.save_day(day, day_process, is_s3) + + except Exception as e: + self.log.exception(f'Error correcting metadata for {day}. {e}') + finally: + self.log.debug(f'Done correcting metadata for {day}') + + def no_jitter( + self, + day: datetime, + day_process: pd.DataFrame) -> pd.DataFrame: + """ + Set the jitter to 0 and calculate the end time from the start time and the duration + :param day: + The day being processed + :param day_process: + The dataframe to correct + :return: + The corrected dataframe + """ + self.log.warn(f'Cannot correct {day}. 
Using file start times as is, setting jitter to 0 and using ' + f'calculated end times.') + # calculate the difference between each row start time and save as diff in a copy of the dataframe + day_process = day_process.copy() + day_process['diff'] = day_process['start'].diff() + day_process['jitter_secs'] = 0 + # calculate the end time which is the start time plus the number of seconds in the file + day_process['end'] = day_process['start'] + pd.to_timedelta(day_process['duration_secs'], unit='s') + return day_process + + def save_day( + self, + day: datetime, + day_process: pd.DataFrame, + is_s3: bool, + prefix: str = None): + """ + Save the day's metadata to a single json file either locally or to s3 + :param day: + The day to save + :param day_process: + The dataframe containing the metadata for the day + :param prefix: + An optional prefix for the filename + :param is_s3: + True if saving to s3 + :return: + """ + # if the exception column is empty, then drop it + if day_process['exception'].isnull().all(): + day_process.drop(columns=['exception'], inplace=True) + else: + # replace the NaN with an empty string + day_process['exception'].fillna('', inplace=True) + + # drop the pcm, fs, subtype, etc. columns + day_process.drop(columns=['fs', 'subtype', 'jitter_secs'], inplace=True) + + # if there is a diff column, then drop it + if 'diff' in day_process.columns: + day_process.drop(columns=['diff'], inplace=True) + + # Save with second accuracy to a temporary file formatted with ISO date format + df_final = day_process.sort_values(by=['start']) + + with tempfile.TemporaryDirectory() as tmpdir: + + tmp_path = Path(tmpdir) + if prefix: + temp_metadata = tmp_path / f'{prefix}_{day:%Y%m%d}.json' + else: + temp_metadata = tmp_path / f'{day:%Y%m%d}.json' + + df_final.to_json(temp_metadata.as_posix(), orient='records', date_format='iso', date_unit='s') + self.log.debug(f'Wrote {temp_metadata.as_posix()}') + + # read the file back in using records format with json + with open(temp_metadata.as_posix(), 'r') as f: + dict_records = json.load(f) + + # write the file back out with indenting + with open(temp_metadata.as_posix(), 'w', encoding='utf-8') as f: + json.dump(dict_records, f, ensure_ascii=True, indent=4) + + # if a s3 url then upload the file and retry if it fails + if is_s3: + client = boto3.client('s3') + for retry in range(10): + try: + with open(temp_metadata.as_posix(), 'rb') as data: + p = urlparse(self.metadata_path.rstrip('/')) + self.log.info(f"Uploading to s3://{p.netloc}/{p.path.lstrip('/')}") + if prefix: + client.upload_fileobj(data, p.netloc, + f"{p.path.lstrip('/')}/{prefix}_{day:%Y%m%d}.json") + else: + client.upload_fileobj(data, p.netloc, f"{p.path.lstrip('/')}/{day:%Y/%Y%m%d}.json") + break + except Exception as e: + self.log.exception(f'Exception {e} on retry {retry}') + time.sleep(60) + else: + # copy the file to a local metadata directory + shutil.copy2(temp_metadata.as_posix(), self.metadata_path.as_posix()) diff --git a/src/metadata/utils/wavfile.py b/src/metadata/utils/wavfile.py new file mode 100755 index 0000000..2d5d468 --- /dev/null +++ b/src/metadata/utils/wavfile.py @@ -0,0 +1,141 @@ +# pypam-based-processing, Apache License 2.0 +# Filename: metadata/utils/wavfile.py +# Description: wav file metadata reader. 
Supports SoundTrap and icListen wav files + +from logging import exception, warning +from pathlib import Path + +import numpy as np +from six.moves.urllib.request import urlopen +import io +import re +import soundfile as sf +import pandas as pd +from datetime import datetime, timedelta +import xml.etree.ElementTree as ET + + +class WavFile: + + # Abstract class for reading wav file metadata + def __init__( + self, + path_or_url: str, + start: datetime): + self.start = start + self.path_or_url = path_or_url + + def has_exception(self): + return True if len(self.exception) > 0 else False + + def to_df(self): + # if the self.path_or_url is a url, then add to the data frame with the appropriate prefix + if 's3://' in self.path_or_url: + df = pd.DataFrame({'uri': self.path_or_url, 'start': self.start, 'end': self.end, 'fs': self.fs, + 'duration_secs': self.duration_secs, 'channels': self.channels, + 'subtype': self.subtype, 'exception': self.exception}, + index=[self.start]) + else: + df = pd.DataFrame({'url': 'file://' + self.path_or_url, 'start': self.start, 'end': self.end, 'fs': self.fs, + 'duration_secs': self.duration_secs, 'channels': self.channels, + 'subtype': self.subtype, 'exception': self.exception}, + index=[self.start]) + return df + + def get_max_freq(self): + return self.fs / 2 + + +class SoundTrapWavFile(WavFile): + """SoundTrapWavFile uses the metadata from the xml files, not the wav file itself """ + + def __init__( + self, + uri: str, + xml_file: str): + tree = ET.parse(xml_file) + root = tree.getroot() + + # Iterate over the XML elements grabbing the needed metadata values + for element in root.iter('WavFileHandler'): + # Get the value of the id attribute + value = element.get('SamplingStartTimeUTC') + if value: + wav_start_dt = datetime.strptime(value, '%Y-%m-%dT%H:%M:%S') + + value = element.get('SamplingStopTimeUTC') + if value: + wav_stop_dt = datetime.strptime(value, '%Y-%m-%dT%H:%M:%S') + + value = element.get('SampleCount') + if value: + sample_count = int(value) + + self.path_or_url = uri + self.start = wav_start_dt + self.end = wav_stop_dt + self.duration_secs = sample_count / 48000 + self.fs = 48000 + self.frames = sample_count + self.channels = 1 + self.subtype = 'SoundTrap' + self.exception = np.NAN # no exceptions for SoundTrap files + + +class IcListenWavFile(WavFile): + """IcListenWavFile uses the metadata from the wav file itself, + but only grabs the needed metadata from the header in S3""" + + def __init__( + self, + path_or_url: str, + start: datetime): + self.path_or_url = path_or_url + self.start = start + self.duration_secs = -1 + self.fs = -1 + self.frames = -1 + self.channels = -1 + self.subtype = '' + self.exception = np.NAN + self.path_or_url = path_or_url + bytes_per_sec = 3 * 256e3 # 3 bytes per sample at 24-bit resolution and 256 kHz sampling rate + + try: + # if the in_file is a s3 url, then read the metadata from the s3 url + if re.match(r'^s3://', path_or_url): + p = Path(path_or_url) + bucket, key = p.parts[1], '/'.join(p.parts[2:]) + url = f'http://{bucket}.s3.amazonaws.com/{key}' + + # read the first 20,000 bytes of the file to get the metadata + info = sf.info(io.BytesIO(urlopen(url).read(20_000)), verbose=True) + # get the duration from the extra_info data field which stores the duration in total bytes + fields = info.extra_info.split() + idx = fields.index('data') + self.duration_secs = float(fields[idx + 2]) / bytes_per_sec + # get the size in bytes of the data+RIFF header + idx = fields.index('RIFF') + riff_size = int(fields[idx + 2]) 
+ 8 + # get the content length from the http header + content_length = int(urlopen(url).info()['Content-Length']) + # if the content length is less than the size of the data+RIFF header, then the file is truncated but + # still may be usable + if content_length < riff_size: + self.exception = f'Truncated file {path_or_url}. Content length {content_length} < RIFF size {riff_size}' + # calculate the duration which is the size of the content length minus the size of the RIFF + # header which is 44 bytes. Round the duration to the nearest second since the recording is + # always in 1 second increments + self.duration_secs = round(content_length - 44) / bytes_per_sec + warning(self.exception) + else: + info = sf.info(path_or_url) + self.duration_secs = info.duration + + self.end = self.start + timedelta(microseconds=int(info.frames * 1e6 / info.samplerate)) + self.fs = info.samplerate + self.frames = info.frames + self.channels = info.channels + self.subtype = info.subtype if info.subtype else '' + except Exception as ex: + self.log.exception(f'Corrupt file {path_or_url}. {ex}') From c365fa3ce2e34a764b24eb8ab777f2678192f3a8 Mon Sep 17 00:00:00 2001 From: danellecline Date: Mon, 20 Nov 2023 15:16:23 -0800 Subject: [PATCH 03/10] initial check-in of json generator code. not functional yet, just some styling abd remove MBARI/MARS specific code --- src/metadata/generator/gen_iclisten.py | 41 ++++++++++++++++---------- src/metadata/generator/utils.py | 8 ++--- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/src/metadata/generator/gen_iclisten.py b/src/metadata/generator/gen_iclisten.py index 5b60880..7968dc3 100644 --- a/src/metadata/generator/gen_iclisten.py +++ b/src/metadata/generator/gen_iclisten.py @@ -50,6 +50,8 @@ def __int__( def run(self): print(f'Generating metadata for {self.start} to {self.end}...') + is_s3, bucket_name = utils.is_s3(self.wav_loc) + # Run for each day in the range for day in pd.date_range(self.start, self.end, freq='D'): try: @@ -57,16 +59,23 @@ def run(self): self.log.info(f'{self.log_prefix} Searching in {self.wav_loc}/*.wav for wav files that match the search pattern {self.search}* ...') wav_files = [] - is_s3 = re.match(r'^s3://', self.wav_loc) - # the bucket name will optionally have a * at the end - # keep only the bucket name before the * - bucket_core = re.sub(r'\*$', '', self.wav_loc) - bucket_core = re.sub(r'^s3://', '', bucket_core) - def check_file(f: str, f_start_dt: datetime, f_end_dt: datetime): + def check_file(f: str, + f_start_dt: datetime, + f_end_dt: datetime): + """ + Check if the file matches the search pattern and is within the start and end dates + :param f: + The path to the file + :param f_start_dt: + The start date to check + :param f_end_dt: + The end date to check + :return: + """ f_path = Path(f) - wav_dt = None + f_wav_dt = None for s in self.search: # see if the file is a regexp match to search @@ -78,14 +87,13 @@ def check_file(f: str, f_start_dt: datetime, f_end_dt: datetime): f_path_dt = datetime.strptime(f_path.stem, f'{s}_%Y%m%d_%H%M%S') if f_start_dt <= f_path_dt <= f_end_dt: - wc = utils.IcListenWavFile(f, f_path_dt) - wav_files.append(wc) - wav_dt = f_path_dt + wav_files.append(utils.IcListenWavFile(f, f_path_dt)) + f_wav_dt = f_path_dt except ValueError: self.log.error(f'{self.log_prefix} Could not parse {f_path.name}') return None - return wav_dt + return f_wav_dt if not is_s3: wav_path = Path(self.wav_loc) @@ -96,15 +104,18 @@ def check_file(f: str, f_start_dt: datetime, f_end_dt: datetime): # dates client = 
boto3.client('s3') - # Set the start and end dates to an hour before and after the start and end dates + # Set the start and end dates to 30 minutes before and after the start and end dates start_dt = day - timedelta(hours=1) end_dt = day + timedelta(days=1) - start_dt_hour = start_dt - timedelta(minutes=30) - end_dt_hour = end_dt + timedelta(minutes=30) + + # set the window to 3x the expected duration of the wav file to account for any missing data + minutes_window = int(self.seconds_per_file * 3 / 60) + start_dt_hour = start_dt - timedelta(minutes=minutes_window) + end_dt_hour = end_dt + timedelta(minutes=minutes_window) for day_hour in pd.date_range(start=start_dt, end=end_dt, freq='H'): - bucket = f'{bucket_core}-{day_hour.year:04d}' + bucket = f'{bucket_name}-{day_hour.year:04d}' prefix = f'{day_hour.month:02d}/MARS_{day_hour.year:04d}{day_hour.month:02d}{day_hour.day:02d}_{day_hour.hour:02d}' paginator = client.get_paginator('list_objects') diff --git a/src/metadata/generator/utils.py b/src/metadata/generator/utils.py index 04f0982..069d78a 100644 --- a/src/metadata/generator/utils.py +++ b/src/metadata/generator/utils.py @@ -1,15 +1,15 @@ def is_s3(wav_loc: str) -> (bool, str): """ - Check if the wav_loc is an s3 bucket + Check if the wav_loc is a s3 bucket, and return the bucket name :param wav_loc: The wav_loc to check :return: - A tuple of (is_s3, bucket_core) + A tuple of (is_s3, bucket_name) """ is_s3_match = re.match(r'^s3://', wav_loc) # the bucket name will optionally have a * at the end # keep only the bucket name before the * bucket_core = re.sub(r'\*$', '', wav_loc) - bucket_core = re.sub(r'^s3://', '', bucket_core) - return is_s3_match, bucket_core \ No newline at end of file + bucket_name = re.sub(r'^s3://', '', bucket_core) + return is_s3_match, bucket_name \ No newline at end of file From 1569c141a31a721c7b113600055be9c0dfc4d18d Mon Sep 17 00:00:00 2001 From: danellecline Date: Mon, 12 Feb 2024 19:14:22 -0800 Subject: [PATCH 04/10] working soundtrap and iclisten pytests --- requirements.txt | 1 + src/json_generator/__init__.py | 0 .../utils => json_generator}/corrector.py | 66 +++------ .../gen_abstract.py | 50 +++---- .../gen_iclisten.py | 75 +++++++--- .../gen_soundtrap.py | 138 +++++++++++------- .../generator => json_generator}/utils.py | 3 + .../utils => json_generator}/wavfile.py | 2 +- src/metadata/generator/__init__.py | 2 - src/metadata/utils/__init__.py | 2 - tests/test_json_generator.py | 126 ++++++++++++++++ 11 files changed, 314 insertions(+), 151 deletions(-) create mode 100644 src/json_generator/__init__.py rename src/{metadata/utils => json_generator}/corrector.py (78%) rename src/{metadata/generator => json_generator}/gen_abstract.py (69%) rename src/{metadata/generator => json_generator}/gen_iclisten.py (75%) rename src/{metadata/generator => json_generator}/gen_soundtrap.py (56%) rename src/{metadata/generator => json_generator}/utils.py (97%) rename src/{metadata/utils => json_generator}/wavfile.py (99%) delete mode 100644 src/metadata/generator/__init__.py delete mode 100644 src/metadata/utils/__init__.py create mode 100644 tests/test_json_generator.py diff --git a/requirements.txt b/requirements.txt index 6f8ce53..c19ef65 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,6 +6,7 @@ pyyaml==6.0.1 marshmallow==3.20.2 # lifewatch-pypam # when published soundfile==0.12.1 +Pyarrow==15.0.0 # quickly tried it but got: AttributeError: module 'xarray_extras' has no attribute 'csv' # xarray-extras==0.5.0 diff --git a/src/json_generator/__init__.py 
b/src/json_generator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/metadata/utils/corrector.py b/src/json_generator/corrector.py similarity index 78% rename from src/metadata/utils/corrector.py rename to src/json_generator/corrector.py index 821e913..f4e1e9e 100644 --- a/src/metadata/utils/corrector.py +++ b/src/json_generator/corrector.py @@ -5,7 +5,6 @@ import datetime from datetime import timedelta -import logger import numpy as np import pandas as pd from pathlib import Path @@ -17,6 +16,8 @@ import json from urllib.parse import urlparse +from src import PbpLogger + class MetadataCorrector: @@ -44,7 +45,7 @@ def __init__( The number of seconds in each file; not used for sound trap files """ self.correct_df = correct_df - self.metadata_path = json_path_out + self.json_base_dir = json_path_out self.day = day self.sound_trap = sound_trap self.seconds_per_file = seconds_per_file @@ -53,26 +54,22 @@ def __init__( def run(self): """Run the corrector""" - is_s3 = False - if re.match(r'^s3://', self.metadata_path): - is_s3 = True - try: # Soundtrap files can be variable if self.sound_trap: files_per_day = None # Filter the metadata to the day, starting 6 hours before the day starts to capture overlap - df = self.correct_df[(self.correct_df['start'] >= day - timedelta(hours=6)) & (self.correct_df['start'] < day + timedelta(days=1))] + df = self.correct_df[(self.correct_df['start'] >= self.day - timedelta(hours=6)) & (self.correct_df['start'] < self.day + timedelta(days=1))] else: # ICListen files fixed, but may be missing or incomplete if the system was down files_per_day = int(86400 / self.seconds_per_file) # Filter the metadata to the day, starting 10 minutes before the day starts to capture overlap - df = self.correct_df[(self.correct_df['start'] >= day - timedelta(minutes=10)) & (self.correct_df['start'] < day + timedelta(days=1))] + df = self.correct_df[(self.correct_df['start'] >= self.day - timedelta(minutes=10)) & (self.correct_df['start'] < self.day + timedelta(days=1))] - self.log.debug(f'Creating metadata for day {day}') + self.log.debug(f'Creating metadata for day {self.day}') if len(df) == 0: - self.log.warn(f'No metadata found for day {day}') + self.log.warn(f'No metadata found for day {self.day}') return # convert the start and end times to datetime @@ -82,17 +79,17 @@ def run(self): df['end'] = pd.to_datetime(df['end']) # get the file list that covers the requested day - self.log.info(f'Found {len(df)} files from day {day}, starting {df.iloc[0]["start"]} ending {df.iloc[-1]["end"]}') + self.log.info(f'Found {len(df)} files from day {self.day}, starting {df.iloc[0]["start"]} ending {df.iloc[-1]["end"]}') # if there are no files, then return if len(df) == 0: - self.log.warn(f'No files found for {day}') + self.log.warn(f'No files found for {self.day}') return day_process = df if self.sound_trap: - self.log.info(f'Soundtrap files for {day} are variable. Skipping duration check') + self.log.info(f'Soundtrap files for {self.day} are variable. 
Skipping duration check') for index, row in day_process.iterrows(): self.log.debug(f'File {row["uri"]} duration {row["duration_secs"]} ') else: @@ -112,13 +109,13 @@ def run(self): and len(day_process['duration_secs'].unique()) == 1 \ and day_process.iloc[0]['duration_secs'] == self.seconds_per_file): - self.log.info(f'{len(day_process)} files available for {day}') + self.log.info(f'{len(day_process)} files available for {self.day}') # check whether the differences are all the same if len(day_process['start'].diff().unique()) == 1 or self.sound_trap: - self.log.warn(f'No drift for {day}') + self.log.warn(f'No drift for {self.day}') else: - self.log.info(f'Correcting drift for {day}') + self.log.info(f'Correcting drift for {self.day}') # correct the metadata jitter = 0 @@ -147,7 +144,7 @@ def run(self): # set the times for the next files start = end else: - day_process = self.no_jitter(day, day_process) + day_process = self.no_jitter(self.day, day_process) # drop any rows with duplicate uri times, keeping the first # duplicates can be caused by the jitter correction @@ -157,12 +154,12 @@ def run(self): day_process['start'] = day_process['start'].dt.tz_localize('UTC') day_process['end'] = day_process['end'].dt.tz_localize('UTC') - self.save_day(day, day_process, is_s3) + self.save_day(self.day, day_process) except Exception as e: - self.log.exception(f'Error correcting metadata for {day}. {e}') + self.log.exception(f'Error correcting metadata for {self.day}. {e}') finally: - self.log.debug(f'Done correcting metadata for {day}') + self.log.debug(f'Done correcting metadata for {self.day}') def no_jitter( self, @@ -177,7 +174,7 @@ def no_jitter( :return: The corrected dataframe """ - self.log.warn(f'Cannot correct {day}. Using file start times as is, setting jitter to 0 and using ' + self.log.warn(f'Cannot correct {self.day}. 
Using file start times as is, setting jitter to 0 and using ' f'calculated end times.') # calculate the difference between each row start time and save as diff in a copy of the dataframe day_process = day_process.copy() @@ -191,7 +188,6 @@ def save_day( self, day: datetime, day_process: pd.DataFrame, - is_s3: bool, prefix: str = None): """ Save the day's metadata to a single json file either locally or to s3 @@ -201,8 +197,6 @@ def save_day( The dataframe containing the metadata for the day :param prefix: An optional prefix for the filename - :param is_s3: - True if saving to s3 :return: """ # if the exception column is empty, then drop it @@ -241,23 +235,7 @@ def save_day( with open(temp_metadata.as_posix(), 'w', encoding='utf-8') as f: json.dump(dict_records, f, ensure_ascii=True, indent=4) - # if a s3 url then upload the file and retry if it fails - if is_s3: - client = boto3.client('s3') - for retry in range(10): - try: - with open(temp_metadata.as_posix(), 'rb') as data: - p = urlparse(self.metadata_path.rstrip('/')) - self.log.info(f"Uploading to s3://{p.netloc}/{p.path.lstrip('/')}") - if prefix: - client.upload_fileobj(data, p.netloc, - f"{p.path.lstrip('/')}/{prefix}_{day:%Y%m%d}.json") - else: - client.upload_fileobj(data, p.netloc, f"{p.path.lstrip('/')}/{day:%Y/%Y%m%d}.json") - break - except Exception as e: - self.log.exception(f'Exception {e} on retry {retry}') - time.sleep(60) - else: - # copy the file to a local metadata directory - shutil.copy2(temp_metadata.as_posix(), self.metadata_path.as_posix()) + # copy the file to a local metadata directory with year subdirectory + output_path = Path(self.json_base_dir, str(day.year)) + output_path.mkdir(parents=True, exist_ok=True) + shutil.copy2(temp_metadata.as_posix(), output_path) diff --git a/src/metadata/generator/gen_abstract.py b/src/json_generator/gen_abstract.py similarity index 69% rename from src/metadata/generator/gen_abstract.py rename to src/json_generator/gen_abstract.py index 844a103..9c27650 100644 --- a/src/metadata/generator/gen_abstract.py +++ b/src/json_generator/gen_abstract.py @@ -1,34 +1,36 @@ # pypam-based-processing # Filename: metadata/generator/gen_abstract.py # Description: Abstract class that captures sound wav metadata +import logging + +import re -import pathlib from datetime import datetime -from pathlib import Path import pandas as pd -import logger -import utils as utils -from src.logging_helper import PbpLogger + +from src.json_generator import utils +from src.logging_helper import PbpLogger, create_logger class MetadataGeneratorAbstract(object): def __init__(self, - pbp_logger: PbpLogger, + logger: PbpLogger, wav_loc: str, - metadata_loc: str, + json_base_dir: str, search: [str], start: datetime, end: datetime, - seconds_per_file: float = 0.): + seconds_per_file: float = 0., + **kwargs): """ Abstract class for capturing sound wav metadata - :param pbp_logger: + :param logger: The logger :param wav_loc: The local directory or S3 bucket that contains the wav files - :param metadata_loc: - The local directory or S3 bucket to store the metadata + :param json_base_dir: + The local directory to write the json files to :param search: The search pattern to match the wav files, e.g. 
'MARS' :param start: @@ -41,22 +43,21 @@ def __init__(self, """ try: self.wav_loc = wav_loc - self.metadata_path = metadata_loc + self.json_base_dir = json_base_dir self.df = pd.DataFrame() self.start = start self.end = end self.search = search - self.seconds_per_file = None if seconds_per_file == 0 else seconds_per_file - self._log = pbp_logger - self.cache_path = Path(log_dir) / 's3cache' / f'{self.__class__.__name__}' - self.cache_path.mkdir(parents=True, exist_ok=True) + self._seconds_per_file = None if seconds_per_file == 0 else seconds_per_file + self.logger = logger except Exception as e: - self._log.err(f'Could not initialize {self.__class__.__name__} for {start:%Y%m%d}') raise e - - - def search(self): + def setup(self): + """ + Setup by first getting the bucket name and checking if it is an S3 bucket + :return: + """ self.log.info( f'{self.log_prefix} Searching in {self.wav_loc}/*.wav for wav files that match the search pattern {self.search}* ...') @@ -65,9 +66,7 @@ def search(self): # keep only the bucket name before the * bucket_core = re.sub(r'\*$', '', self.wav_loc) bucket_core = re.sub(r'^s3://', '', bucket_core) - return bucket_core, is_s3, wav_files - - + return bucket_core, is_s3 @staticmethod def raw(path_or_url: str): @@ -80,11 +79,11 @@ def raw(path_or_url: str): @property def log(self): - return self._log + return self.logger @property def seconds_per_file(self): - return self.seconds_per_file + return self._seconds_per_file @property def correct_df(self): @@ -93,3 +92,4 @@ def correct_df(self): # abstract run method def run(self): pass + diff --git a/src/metadata/generator/gen_iclisten.py b/src/json_generator/gen_iclisten.py similarity index 75% rename from src/metadata/generator/gen_iclisten.py rename to src/json_generator/gen_iclisten.py index 7968dc3..a830aa9 100644 --- a/src/metadata/generator/gen_iclisten.py +++ b/src/json_generator/gen_iclisten.py @@ -6,49 +6,52 @@ from datetime import timedelta from datetime import datetime import boto3 -import numpy as np import pandas as pd from pathlib import Path from progressbar import progressbar -import metadata.utils as utils -from .gen_abstract import MetadataGeneratorAbstract - +import json_generator.utils as utils +from json_generator.corrector import MetadataCorrector +from json_generator.wavfile import IcListenWavFile +from src import PbpLogger +from src.json_generator.gen_abstract import MetadataGeneratorAbstract class IcListenMetadataGenerator(MetadataGeneratorAbstract): + log_prefix = None + def __int__( self, pbp_logger: PbpLogger, wav_loc: str, - metadata_loc: str, - search: [str], + json_base_dir: str, start: datetime, end: datetime, - seconds_per_file: float = 0.): + search: [str], + seconds_per_file: float): """ Captures ICListen wav metadata in a pandas dataframe from either a local directory or S3 bucket. :param pbp_logger: The logger :param wav_loc: The local directory or S3 bucket that contains the wav files - :param metadata_loc: - The local directory or S3 bucket to store the metadata - :param search: - The search pattern to match the wav files, e.g. 'MARS' + :param json_base_dir: + The local directory to store the metadata :param start: The start date to search for wav files :param end: The end date to search for wav files + :param search: + The search pattern to match the wav files, e.g. 'MARS' for MARS_YYYYMMDD_HHMMSS.wav :param seconds_per_file: The number of seconds per file expected in a wav file to check for missing data. If 0, then no check is done. 
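+            (Note: run() below currently passes a hard-coded 600. to MetadataCorrector
+            rather than deriving it from this value.)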
:return: """ - super().__init__(pbp_logger, wav_loc, metadata_loc, search, start, end, seconds_per_file) - self.log_prefix = f'{self.__class__.__name__} {self.start:%Y%m%d}' + super().__init__(pbp_logger, wav_loc, json_base_dir, search, start, end, seconds_per_file) + self.log_prefix = f'{self.__class__.__name__} {start:%Y%m%d}' def run(self): - print(f'Generating metadata for {self.start} to {self.end}...') + self.log.info(f'Generating metadata for {self.start} to {self.end}...') is_s3, bucket_name = utils.is_s3(self.wav_loc) @@ -87,7 +90,7 @@ def check_file(f: str, f_path_dt = datetime.strptime(f_path.stem, f'{s}_%Y%m%d_%H%M%S') if f_start_dt <= f_path_dt <= f_end_dt: - wav_files.append(utils.IcListenWavFile(f, f_path_dt)) + wav_files.append(IcListenWavFile(f, f_path_dt)) f_wav_dt = f_path_dt except ValueError: self.log.error(f'{self.log_prefix} Could not parse {f_path.name}') @@ -113,7 +116,7 @@ def check_file(f: str, start_dt_hour = start_dt - timedelta(minutes=minutes_window) end_dt_hour = end_dt + timedelta(minutes=minutes_window) - for day_hour in pd.date_range(start=start_dt, end=end_dt, freq='H'): + for day_hour in pd.date_range(start=start_dt, end=end_dt, freq='h'): bucket = f'{bucket_name}-{day_hour.year:04d}' prefix = f'{day_hour.month:02d}/MARS_{day_hour.year:04d}{day_hour.month:02d}{day_hour.day:02d}_{day_hour.hour:02d}' @@ -124,6 +127,7 @@ def check_file(f: str, self.log.info(f'{self.log_prefix} Searching in bucket: {bucket} prefix: {prefix}') # list the objects in the bucket # loop through the objects and check if they match the search pattern + num_found = 0 for page in page_iterator: if 'Contents' not in page: self.log.info(f'{self.log_prefix} No data found in {bucket}') @@ -138,10 +142,6 @@ def check_file(f: str, break if wav_dt < start_dt_hour: break - self.log.debug(f'{self.log_prefix} Found {wav_dt}') - # num_found += 1 - # if num_found > 100: - # break self.log.info(f'{self.log_prefix} Found {len(wav_files)} files to process that cover the period {start_dt} - {end_dt}') @@ -158,8 +158,41 @@ def check_file(f: str, self.df = pd.concat([self.df, df_wav], axis=0) self.log.debug(f'{self.log_prefix} Running metadata corrector for {day}') - corrector = utils.MetadataCorrector(self.log, self.df, self.metadata_path, day, False, 600.) + corrector = MetadataCorrector(self.log, self.df, self.json_base_dir, day, False, 600.) 
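+                # The corrector filters self.df down to this day (plus a small overlap
+                # window), corrects clock drift when a full day of fixed-length files
+                # is present, and writes <json_base_dir>/<year>/<YYYYMMDD>.json.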
corrector.run() except Exception as ex: self.log.exception(str(ex)) + + +if __name__ == '__main__': + import logging + from src.logging_helper import PbpLogger, create_logger + from src.json_generator.gen_iclisten import IcListenMetadataGenerator + + log_dir = Path('tests/log') + json_dir = Path('tests/json/mars') + log_dir.mkdir(exist_ok=True, parents=True) + json_dir.mkdir(exist_ok=True, parents=True) + + logger = create_logger( + log_filename_and_level=( + f"{log_dir}/test_soundtrap_metadata_generator.log", + logging.INFO, + ), + console_level=logging.INFO, + ) + + + start = datetime(2023, 7, 18, 0, 0, 0) + end = datetime(2023, 7, 18, 0, 0, 0) + + # If only running one day, use a single generator + generator = IcListenMetadataGenerator(logger=logger, + wav_loc='s3://pacific-sound-256khz', + json_base_dir=json_dir.as_posix(), + search=['MARS'], + start=start, + end=end, + seconds_per_file=300) + generator.run() \ No newline at end of file diff --git a/src/metadata/generator/gen_soundtrap.py b/src/json_generator/gen_soundtrap.py similarity index 56% rename from src/metadata/generator/gen_soundtrap.py rename to src/json_generator/gen_soundtrap.py index 24ddcd7..bbe176d 100644 --- a/src/metadata/generator/gen_soundtrap.py +++ b/src/json_generator/gen_soundtrap.py @@ -1,44 +1,50 @@ # pypam-based-processing -# Filename: metadata/generator/gen_soundtrap.py +# Filename: json_generator/gen_soundtrap.py # Description: Captures SoundTrap metadata either from a local directory of S3 bucket +import logging +import boto3 import datetime -import shutil -from datetime import timedelta, datetime import pandas as pd -from pathlib import Path -import boto3 -import tempfile import re +import pytz + +from datetime import timedelta, datetime +from pathlib import Path from progressbar import progressbar -import utils -from .gen_abstract import MetadataGeneratorAbstract + +from src import PbpLogger +from src.json_generator.gen_abstract import MetadataGeneratorAbstract +from src.json_generator.wavfile import SoundTrapWavFile +from src.json_generator.corrector import MetadataCorrector class SoundTrapMetadataGenerator(MetadataGeneratorAbstract): """ Captures SoundTrap wav file metadata either from a local directory or S3 bucket. """ - start = datetime.utcnow() - end = datetime.utcnow() + + # Set the start and end dates to the current time in UTC + start = datetime.now(pytz.utc) + end = datetime.now(pytz.utc) + + log_prefix = None def __init__( self, - log_dir: str, + logger: PbpLogger, wav_loc: str, - metadata_loc: str, + json_base_dir: str, search: [str], start: datetime, end: datetime): """ - Captures SoundTrap wav file metadata either from a local directory or S3 bucket. - - :param pbp_logger: + :param logger: The logger :param wav_loc: The local directory or S3 bucket that contains the wav files - :param metadata_loc: - The local directory or S3 bucket to store the metadata + :param json_base_dir: + The local directory to write the json files to :param search: The search pattern to match the wav files, e.g. 'MARS' :param start: @@ -49,29 +55,28 @@ def __init__( The number of seconds per file expected in a wav file to check for missing data. If missing, then no check is done. :return: """ - super().__init__(log_dir, wav_loc, metadata_loc, search, start, end, 0.) - self.start = start - self.end = end + super().__init__(logger, wav_loc, json_base_dir, search, start, end, 0.) 
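+        # The base initializer stores start/end on the instance, overriding the
+        # class-level defaults above that were set to datetime.now(pytz.utc).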
+ # Add a prefix to the log messages to differentiate between the different metadata generators running by date # This is useful when running multiple metadata generators in parallel - self.log_prefix = f'{self.__class__.__name__} {self.start:%Y%m%d}' # SoundTrapMetadataGenerator 20210801 + self.log_prefix = f'{self.__class__.__name__} {self.start:%Y%m%d}' def run(self): - try: - self.search() + xml_cache_path = Path(self.json_base_dir) / 'xml_cache' + xml_cache_path.mkdir(exist_ok=True, parents=True) + wav_files = [] + bucket_core, is_s3 = self.setup() - def add_file(xml_file: str, wav_file: str): + def check_file(xml_file: str) -> bool: """ Check if the xml file is in the cache directory :param xml_file: The xml file with the metadata - :param wav_file: - The wav file :return: - None + True if the file is within the start and end dates """ - + wav_files = [] f_path = Path(xml_file) # see if the file is a regexp match to self.search for s in self.search: @@ -82,7 +87,7 @@ def add_file(xml_file: str, wav_file: str): # If a SoundTrap file, then the date is in the filename XXXX.YYYYMMDDHHMMSS.xml f_path_dt = datetime.strptime(f_path.stem.split('.')[1], '%y%m%d%H%M%S') if self.start <= f_path_dt <= self.end: - wav_files.append(utils.SoundTrapWavFile(wav_file, xml_file)) + return True except ValueError: self.log.error(f'{self.log_prefix} Could not parse {f_path.name}') @@ -90,7 +95,8 @@ def add_file(xml_file: str, wav_file: str): wav_path = Path(self.wav_loc) for filename in progressbar(sorted(wav_path.rglob('*.xml')), prefix='Searching : '): wav_path = filename.parent / f'{filename.stem}.wav' - add_file(filename, wav_path) + if check_file(filename): + wav_files.append(SoundTrapWavFile(wav_path, filename)) else: # if the wav_loc is a s3 url, then we need to list the files in buckets that cover the start and end # dates @@ -103,32 +109,27 @@ def add_file(xml_file: str, wav_file: str): operation_parameters = {'Bucket': bucket} page_iterator = paginator.paginate(**operation_parameters) - self.log.info(f'Searching in bucket: {bucket} for .wav and .xml files between {self.start} and {self.end} ') + self.log.info( + f'Searching in bucket: {bucket} for .wav and .xml files between {self.start} and {self.end} ') # list the objects in the bucket # loop through the objects and check if they match the search pattern - with tempfile.TemporaryDirectory() as tmpdir: - for page in page_iterator: - for obj in page['Contents']: - key = obj['Key'] - - if '.xml' in key: - output_xml = f'{tmpdir}/{key}' - output_wav = f's3://{bucket}/{key}'.replace('log.xml', 'wav') - - # Check if the xml file is in the cache directory - xml_path = Path(self.cache_path, key) - if xml_path.exists(): - shutil.copy(xml_path, output_xml) - else: - # Download the xml file to a temporary directory - self.log.info(f'{self.log_prefix} Downloading {key} ...') - client.download_file(bucket, key, output_xml) - # Save the xml file to the cache directory - self.log.info(f'{self.log_prefix} Saving {key} to {self.cache_path} ...') - shutil.copy(output_xml, self.cache_path) - add_file(xml_path, output_wav) - - self.log.info(f'{self.log_prefix} Found {len(wav_files)} files to process that cover the period {self.start} - {self.end}') + for page in page_iterator: + for obj in page['Contents']: + key = obj['Key'] + + if '.xml' in key and check_file(key): + xml_path = xml_cache_path / key + wav_uri = f's3://{bucket}/{key}'.replace('log.xml', 'wav') + + # Check if the xml file is in the cache directory + if not xml_path.exists(): + # Download the xml 
file to a temporary directory + self.log.info(f'{self.log_prefix} Downloading {key} ...') + client.download_file(bucket, key, xml_path) + wav_files.append(SoundTrapWavFile(wav_uri, xml_path)) + + self.log.info( + f'{self.log_prefix} Found {len(wav_files)} files to process that cover the period {self.start} - {self.end}') if len(wav_files) == 0: return @@ -137,7 +138,8 @@ def add_file(xml_file: str, wav_file: str): wav_files.sort(key=lambda x: x.start) # create a dataframe from the wav files - self.log.info(f'{self.log_prefix} Creating dataframe from {len(wav_files)} files spanning {wav_files[0].start} to {wav_files[-1].start}...') + self.log.info( + f'{self.log_prefix} Creating dataframe from {len(wav_files)} files spanning {wav_files[0].start} to {wav_files[-1].start}...') for wc in wav_files: df_wav = wc.to_df() @@ -161,7 +163,31 @@ def add_file(xml_file: str, wav_file: str): day_start = self.start + timedelta(days=day) self.log.debug(f'{self.log_prefix} Running metadata corrector for {day_start}') soundtrap = True - corrector = utils.MetadataCorrector(self.log, self.df, self.metadata_path, day_start, soundtrap, 0) + corrector = MetadataCorrector(self.log, self.df, self.json_base_dir, day_start, soundtrap, 0) corrector.run() +if __name__ == '__main__': + from src.logging_helper import PbpLogger, create_logger + from generator import SoundTrapMetadataGenerator + log_dir = Path('tests/log') + json_dir = Path('tests/json/soundtrap') + log_dir.mkdir(exist_ok=True, parents=True) + json_dir.mkdir(exist_ok=True, parents=True) + + logger = create_logger( + log_filename_and_level=( + f"{log_dir}/test_soundtrap_metadata_generator.log", + logging.INFO, + ), + console_level=logging.INFO, + ) + + start = datetime(2023, 7, 18) + end = datetime(2023, 7, 19) + gen = SoundTrapMetadataGenerator(logger, + 's3://pacific-sound-ch01', + json_dir.as_posix(), + ["7000"], + start, end) + gen.run() diff --git a/src/metadata/generator/utils.py b/src/json_generator/utils.py similarity index 97% rename from src/metadata/generator/utils.py rename to src/json_generator/utils.py index 069d78a..7fada7c 100644 --- a/src/metadata/generator/utils.py +++ b/src/json_generator/utils.py @@ -1,3 +1,6 @@ +import re + + def is_s3(wav_loc: str) -> (bool, str): """ Check if the wav_loc is a s3 bucket, and return the bucket name diff --git a/src/metadata/utils/wavfile.py b/src/json_generator/wavfile.py similarity index 99% rename from src/metadata/utils/wavfile.py rename to src/json_generator/wavfile.py index 2d5d468..f12c227 100755 --- a/src/metadata/utils/wavfile.py +++ b/src/json_generator/wavfile.py @@ -1,5 +1,5 @@ # pypam-based-processing, Apache License 2.0 -# Filename: metadata/utils/wavfile.py +# Filename: json_generator/wavfile.py # Description: wav file metadata reader. 
Supports SoundTrap and icListen wav files from logging import exception, warning diff --git a/src/metadata/generator/__init__.py b/src/metadata/generator/__init__.py deleted file mode 100644 index ecdaa7b..0000000 --- a/src/metadata/generator/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .gen_soundtrap import SoundTrapMetadataGenerator -from .gen_iclisten import IcListenMetadataGenerator diff --git a/src/metadata/utils/__init__.py b/src/metadata/utils/__init__.py deleted file mode 100644 index bc65bab..0000000 --- a/src/metadata/utils/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .corrector import MetadataCorrector -from .wavfile import IcListenWavFile, SoundTrapWavFile \ No newline at end of file diff --git a/tests/test_json_generator.py b/tests/test_json_generator.py new file mode 100644 index 0000000..f33fb83 --- /dev/null +++ b/tests/test_json_generator.py @@ -0,0 +1,126 @@ +import json + +import boto3 +import botocore +import pytest +from botocore.exceptions import ClientError +from datetime import datetime + +import logging + +from pathlib import Path + +from json_generator.gen_iclisten import IcListenMetadataGenerator +from src.logging_helper import create_logger +from src.json_generator.gen_soundtrap import SoundTrapMetadataGenerator +from src.json_generator.gen_iclisten import IcListenMetadataGenerator + + +def get_account() -> str: + """ + Get the account number associated with this user + :return: + """ + try: + account_number = boto3.client('sts').get_caller_identity()['Account'] + print(f'Found account {account_number}') + return account_number + except ClientError as e: + print(e) + msg = f'Could not get account number from AWS. Check your config.ini file. ' \ + f'Account number is not set in the config.ini file and AWS credentials are not configured.' + print(msg) + return None + except botocore.exceptions.NoCredentialsError as e: + print(e) + return None + +# Check if an AWS account is configured by checking if it can access the model with the default credentials +AWS_AVAILABLE = False +if get_account(): + AWS_AVAILABLE = True + +@pytest.mark.skipif(not AWS_AVAILABLE, + reason="This test is excluded because it requires a valid AWS account") +def test_soundtrap_json_generator(): + """ + Test fixture for SoundTrapMetadataGenerator. + Tests the SoundTrapMetadataGenerator class ability to generate metadata for soundtrap recording files. + Two files should be generated in the json directory for the dates specified. 
+ :return: + """ + log_dir = Path('tests/log') + json_dir = Path('tests/json/soundtrap') + log_dir.mkdir(exist_ok=True, parents=True) + json_dir.mkdir(exist_ok=True, parents=True) + + logger = create_logger( + log_filename_and_level=( + f"{log_dir}/test_soundtrap_metadata_generator.log", + logging.INFO, + ), + console_level=logging.INFO, + ) + + start = datetime(2023, 7, 18) + end = datetime(2023, 7, 19) + gen = SoundTrapMetadataGenerator(logger=logger, + wav_loc='s3://pacific-sound-ch01', + json_base_dir=json_dir.as_posix(), + search=["7000"], + start=start, + end=end) + gen.run() + + # There should be two files in the json directory named 20230718.json and 20230719.json + json_files = list(Path('tests/json/soundtrap').rglob('*.json')) + assert len(json_files) == 2 + assert Path('tests/json/soundtrap/2023/20230718.json').exists() + assert Path('tests/json/soundtrap/2023/20230719.json').exists() + +@pytest.mark.skipif(not AWS_AVAILABLE, + reason="This test is excluded because it requires a valid AWS account") +def test_iclisten_json_generator(): + """ + Test fixture for IcListenMetadataGenerator. + Tests the IcListenMetadataGenerator class ability to generate metadata for soundtrap recording files. + One files should be generated in the json directory for the date specified. Note this currently + only works for MBARI MARS data + :return: + """ + + log_dir = Path('tests/log') + json_dir = Path('tests/json/mars') + log_dir.mkdir(exist_ok=True, parents=True) + json_dir.mkdir(exist_ok=True, parents=True) + + logger = create_logger( + log_filename_and_level=( + f"{log_dir}/test_soundtrap_metadata_generator.log", + logging.INFO, + ), + console_level=logging.INFO, + ) + + start = datetime(2023, 7, 18, 0, 0, 0) + end = datetime(2023, 7, 18, 0, 0, 0) + + # If only running one day, use a single generator + generator = IcListenMetadataGenerator(logger=logger, + wav_loc='s3://pacific-sound-256khz', + json_base_dir=json_dir.as_posix(), + search=['MARS'], + start=start, + end=end, + seconds_per_file=300) + generator.run() + # There should be one files in the json directory named 20230718.json and it should have 145 json objects + json_files = list(Path('tests/json/mars/').rglob('*.json')) + assert len(json_files) == 1 + assert Path('tests/json/mars/2023/20230718.json').exists() + + # Read the file and check the number of json objects + with open('tests/json/mars/2023/20230718.json') as f: + json_objcts = json.load(f) + if len(json_objcts) != 145: + assert False \ No newline at end of file From 72bd8130e66c7daeadb4e1c467f23a3075d79de0 Mon Sep 17 00:00:00 2001 From: danellecline Date: Wed, 28 Feb 2024 10:27:58 -0800 Subject: [PATCH 05/10] some refactoring and addition of nrs data. 
working nrs pytests --- requirements.txt | 1 + src/json_generator/corrector.py | 45 ++--- src/json_generator/gen_abstract.py | 41 +--- src/json_generator/gen_iclisten.py | 70 +++---- src/json_generator/gen_nrs.py | 191 ++++++++++++++++++ src/json_generator/gen_soundtrap.py | 86 ++++---- .../{wavfile.py => metadata_extractor.py} | 118 +++++++++-- src/json_generator/utils.py | 21 +- tests/test_json_generator.py | 73 ++++++- 9 files changed, 472 insertions(+), 174 deletions(-) create mode 100644 src/json_generator/gen_nrs.py rename src/json_generator/{wavfile.py => metadata_extractor.py} (54%) diff --git a/requirements.txt b/requirements.txt index c19ef65..65439c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ google-cloud-storage==2.14.0 dataclasses-json==0.6.3 python-dateutil==2.8.2 pyyaml==6.0.1 +progressbar2==3.53.1 marshmallow==3.20.2 # lifewatch-pypam # when published soundfile==0.12.1 diff --git a/src/json_generator/corrector.py b/src/json_generator/corrector.py index f4e1e9e..ad9f45b 100644 --- a/src/json_generator/corrector.py +++ b/src/json_generator/corrector.py @@ -1,6 +1,6 @@ # pypam-based-processing, Apache License 2.0 # Filename: metadata/utils/corrector.py -# Description: Correct metadata for wav files and saves the results to a json file. Results are optionally uploaded to S3. +# Description: Correct metadata for wav files and saves the results to a json file. import datetime from datetime import timedelta @@ -9,12 +9,8 @@ import pandas as pd from pathlib import Path import shutil -import boto3 import tempfile -import time -import re import json -from urllib.parse import urlparse from src import PbpLogger @@ -27,8 +23,8 @@ def __init__( correct_df: pd.DataFrame, json_path_out: str, day: datetime, - sound_trap: bool, - seconds_per_file: float): + variable_duration: bool = False, + seconds_per_file: float = -1): """ Correct the metadata for a day and save to a json file :param logger: @@ -39,15 +35,15 @@ def __init__( The path to save the corrected metadata json file :param day: The day to correct - :param sound_trap: - True if the files are from a sound trap + :param variable_duration: + True if the files vary in duration :param seconds_per_file: The number of seconds in each file; not used for sound trap files """ self.correct_df = correct_df self.json_base_dir = json_path_out self.day = day - self.sound_trap = sound_trap + self.variable_duration = variable_duration self.seconds_per_file = seconds_per_file self.log = logger @@ -56,15 +52,15 @@ def run(self): try: - # Soundtrap files can be variable - if self.sound_trap: + if self.variable_duration: files_per_day = None # Filter the metadata to the day, starting 6 hours before the day starts to capture overlap df = self.correct_df[(self.correct_df['start'] >= self.day - timedelta(hours=6)) & (self.correct_df['start'] < self.day + timedelta(days=1))] - else: # ICListen files fixed, but may be missing or incomplete if the system was down + else: # ICListen/NRS files fixed, but may be missing or incomplete if the system was down files_per_day = int(86400 / self.seconds_per_file) - # Filter the metadata to the day, starting 10 minutes before the day starts to capture overlap - df = self.correct_df[(self.correct_df['start'] >= self.day - timedelta(minutes=10)) & (self.correct_df['start'] < self.day + timedelta(days=1))] + minutes_per_file = int(1.1*self.seconds_per_file / 60) + # Filter the metadata to the day, starting 1 file before the day starts to capture overlap + df = 
self.correct_df[(self.correct_df['start'] >= self.day - timedelta(minutes=minutes_per_file)) & (self.correct_df['start'] < self.day + timedelta(days=1))] self.log.debug(f'Creating metadata for day {self.day}') @@ -88,7 +84,7 @@ def run(self): day_process = df - if self.sound_trap: + if self.variable_duration: self.log.info(f'Soundtrap files for {self.day} are variable. Skipping duration check') for index, row in day_process.iterrows(): self.log.debug(f'File {row["uri"]} duration {row["duration_secs"]} ') @@ -104,15 +100,15 @@ def run(self): # This is only reliable for full days of data contained in complete files day_process['jitter_secs'] = 0 - if self.sound_trap or \ - (len(day_process) == files_per_day + 1 \ - and len(day_process['duration_secs'].unique()) == 1 \ + if self.variable_duration or \ + (len(day_process) == files_per_day + 1 + and len(day_process['duration_secs'].unique()) == 1 and day_process.iloc[0]['duration_secs'] == self.seconds_per_file): self.log.info(f'{len(day_process)} files available for {self.day}') # check whether the differences are all the same - if len(day_process['start'].diff().unique()) == 1 or self.sound_trap: + if len(day_process['start'].diff().unique()) == 1 or self.variable_duration: self.log.warn(f'No drift for {self.day}') else: self.log.info(f'Correcting drift for {self.day}') @@ -135,7 +131,7 @@ def run(self): day_process.loc[index, 'end'] = end day_process.loc[index, 'jitter_secs'] = jitter - if self.sound_trap: + if self.variable_duration: end = row.end else: end = start + timedelta(seconds=self.seconds_per_file) @@ -148,7 +144,10 @@ def run(self): # drop any rows with duplicate uri times, keeping the first # duplicates can be caused by the jitter correction - day_process = day_process.drop_duplicates(subset=['uri'], keep='first') + if 'uri' in day_process.columns: + day_process = day_process.drop_duplicates(subset=['uri'], keep='first') + if 'url' in day_process.columns: + day_process = day_process.drop_duplicates(subset=['url'], keep='first') # save explicitly as UTC by setting the timezone in the start and end times day_process['start'] = day_process['start'].dt.tz_localize('UTC') @@ -159,7 +158,7 @@ def run(self): except Exception as e: self.log.exception(f'Error correcting metadata for {self.day}. {e}') finally: - self.log.debug(f'Done correcting metadata for {self.day}') + self.log.debug(f'Done correcting metadata for {self.day}. 
Saved to {self.json_base_dir}') def no_jitter( self, diff --git a/src/json_generator/gen_abstract.py b/src/json_generator/gen_abstract.py index 9c27650..d836e62 100644 --- a/src/json_generator/gen_abstract.py +++ b/src/json_generator/gen_abstract.py @@ -1,22 +1,15 @@ # pypam-based-processing # Filename: metadata/generator/gen_abstract.py # Description: Abstract class that captures sound wav metadata -import logging - -import re - from datetime import datetime - import pandas as pd - -from src.json_generator import utils -from src.logging_helper import PbpLogger, create_logger +from src.logging_helper import PbpLogger class MetadataGeneratorAbstract(object): def __init__(self, logger: PbpLogger, - wav_loc: str, + audio_loc: str, json_base_dir: str, search: [str], start: datetime, @@ -27,8 +20,8 @@ def __init__(self, Abstract class for capturing sound wav metadata :param logger: The logger - :param wav_loc: - The local directory or S3 bucket that contains the wav files + :param audio_loc: + The local directory or cloud bucket that contains the wav files :param json_base_dir: The local directory to write the json files to :param search: @@ -42,7 +35,7 @@ def __init__(self, :return: """ try: - self.wav_loc = wav_loc + self.audio_loc = audio_loc self.json_base_dir = json_base_dir self.df = pd.DataFrame() self.start = start @@ -53,30 +46,6 @@ def __init__(self, except Exception as e: raise e - def setup(self): - """ - Setup by first getting the bucket name and checking if it is an S3 bucket - :return: - """ - self.log.info( - f'{self.log_prefix} Searching in {self.wav_loc}/*.wav for wav files that match the search pattern {self.search}* ...') - - is_s3 = re.match(r'^s3://', self.wav_loc) - # the bucket name will optionally have a * at the end - # keep only the bucket name before the * - bucket_core = re.sub(r'\*$', '', self.wav_loc) - bucket_core = re.sub(r'^s3://', '', bucket_core) - return bucket_core, is_s3 - - @staticmethod - def raw(path_or_url: str): - w = utils.IcListenWavFile(path_or_url) - - if w.has_exception(): - return None # skip this file - - return w - @property def log(self): return self.logger diff --git a/src/json_generator/gen_iclisten.py b/src/json_generator/gen_iclisten.py index a830aa9..b415d25 100644 --- a/src/json_generator/gen_iclisten.py +++ b/src/json_generator/gen_iclisten.py @@ -12,18 +12,18 @@ from progressbar import progressbar import json_generator.utils as utils from json_generator.corrector import MetadataCorrector -from json_generator.wavfile import IcListenWavFile +from json_generator.metadata_extractor import IcListenWavFile from src import PbpLogger from src.json_generator.gen_abstract import MetadataGeneratorAbstract -class IcListenMetadataGenerator(MetadataGeneratorAbstract): +class IcListenMetadataGenerator(MetadataGeneratorAbstract): log_prefix = None - def __int__( + def __init__( self, pbp_logger: PbpLogger, - wav_loc: str, + audio_loc: str, json_base_dir: str, start: datetime, end: datetime, @@ -33,7 +33,7 @@ def __int__( Captures ICListen wav metadata in a pandas dataframe from either a local directory or S3 bucket. :param pbp_logger: The logger - :param wav_loc: + :param audio_loc: The local directory or S3 bucket that contains the wav files :param json_base_dir: The local directory to store the metadata @@ -47,19 +47,25 @@ def __int__( The number of seconds per file expected in a wav file to check for missing data. If 0, then no check is done. 
:return: """ - super().__init__(pbp_logger, wav_loc, json_base_dir, search, start, end, seconds_per_file) + super().__init__(pbp_logger, audio_loc, json_base_dir, search, start, end, seconds_per_file) self.log_prefix = f'{self.__class__.__name__} {start:%Y%m%d}' def run(self): self.log.info(f'Generating metadata for {self.start} to {self.end}...') - is_s3, bucket_name = utils.is_s3(self.wav_loc) + bucket_name, prefix, scheme = utils.parse_s3_or_gcp_url(self.audio_loc) + + # gs is not supported for icListen + if scheme == 'gs': + self.log.error(f'{self.log_prefix} GS is not supported for icListen audio files') + return # Run for each day in the range for day in pd.date_range(self.start, self.end, freq='D'): try: self.df = None - self.log.info(f'{self.log_prefix} Searching in {self.wav_loc}/*.wav for wav files that match the search pattern {self.search}* ...') + self.log.info( + f'{self.log_prefix} Searching in {self.audio_loc}/*.wav for wav files that match the search pattern {self.search}* ...') wav_files = [] @@ -90,6 +96,7 @@ def check_file(f: str, f_path_dt = datetime.strptime(f_path.stem, f'{s}_%Y%m%d_%H%M%S') if f_start_dt <= f_path_dt <= f_end_dt: + self.log.info(f'{self.log_prefix} Found {f_path.name} to process') wav_files.append(IcListenWavFile(f, f_path_dt)) f_wav_dt = f_path_dt except ValueError: @@ -98,24 +105,21 @@ def check_file(f: str, return f_wav_dt - if not is_s3: - wav_path = Path(self.wav_loc) - for filename in progressbar(sorted(wav_path.rglob('*.wav')), prefix='Searching : '): - check_file(filename, start_dt, end_dt) - else: - # if the wav_loc is a s3 url, then we need to list the files in buckets that cover the start and end - # dates - client = boto3.client('s3') - - # Set the start and end dates to 30 minutes before and after the start and end dates - start_dt = day - timedelta(hours=1) - end_dt = day + timedelta(days=1) + # Set the start and end dates to 30 minutes before and after the start and end dates + start_dt = day - timedelta(hours=1) + end_dt = day + timedelta(days=1) - # set the window to 3x the expected duration of the wav file to account for any missing data - minutes_window = int(self.seconds_per_file * 3 / 60) - start_dt_hour = start_dt - timedelta(minutes=minutes_window) - end_dt_hour = end_dt + timedelta(minutes=minutes_window) + # set the window to 3x the expected duration of the wav file to account for any missing data + minutes_window = int(self.seconds_per_file * 3 / 60) + start_dt_hour = start_dt - timedelta(minutes=minutes_window) + end_dt_hour = end_dt + timedelta(minutes=minutes_window) + if scheme == 'file': + wav_path = Path(self.audio_loc) + for filename in progressbar(sorted(wav_path.rglob('*.wav')), prefix='Searching : '): + check_file(filename.as_posix(), start_dt, end_dt) + if scheme == 's3': + client = boto3.client('s3') for day_hour in pd.date_range(start=start_dt, end=end_dt, freq='h'): bucket = f'{bucket_name}-{day_hour.year:04d}' @@ -168,7 +172,6 @@ def check_file(f: str, if __name__ == '__main__': import logging from src.logging_helper import PbpLogger, create_logger - from src.json_generator.gen_iclisten import IcListenMetadataGenerator log_dir = Path('tests/log') json_dir = Path('tests/json/mars') @@ -177,22 +180,21 @@ def check_file(f: str, logger = create_logger( log_filename_and_level=( - f"{log_dir}/test_soundtrap_metadata_generator.log", + f"{log_dir}/test_iclisten_metadata_generator.log", logging.INFO, ), console_level=logging.INFO, ) - start = datetime(2023, 7, 18, 0, 0, 0) end = datetime(2023, 7, 18, 0, 0, 0) # If 
only running one day, use a single generator - generator = IcListenMetadataGenerator(logger=logger, - wav_loc='s3://pacific-sound-256khz', - json_base_dir=json_dir.as_posix(), - search=['MARS'], - start=start, - end=end, - seconds_per_file=300) + generator = IcListenMetadataGenerator(pbp_logger=logger, + audio_loc='s3://pacific-sound-256khz', + json_base_dir=json_dir.as_posix(), + search=['MARS'], + start=start, + end=end, + seconds_per_file=300) generator.run() \ No newline at end of file diff --git a/src/json_generator/gen_nrs.py b/src/json_generator/gen_nrs.py new file mode 100644 index 0000000..0b94866 --- /dev/null +++ b/src/json_generator/gen_nrs.py @@ -0,0 +1,191 @@ +# pypam-based-processing, Apache License 2.0 +# Filename: metadata/generator/gen_nrs.py +# Description: Captures NRS flac metadata in a pandas dataframe from either a local directory or gs bucket. + +import re +from datetime import timedelta, datetime +import time +from datetime import datetime +from google.cloud import storage + +import pandas as pd +from pathlib import Path +from progressbar import progressbar +from json_generator.corrector import MetadataCorrector +from json_generator.metadata_extractor import FlacFile +from src import PbpLogger +from src.json_generator.gen_abstract import MetadataGeneratorAbstract +from src.json_generator.utils import parse_s3_or_gcp_url + + +class NRSMetadataGenerator(MetadataGeneratorAbstract): + + def __init__( + self, + pbp_logger: PbpLogger, + sound_loc: str, + json_base_dir: str, + start: datetime, + end: datetime, + search: [str], + seconds_per_file: float = 14400.0): + """ + Captures NRS audio metadata in a pandas dataframe from either a local directory or GS bucket. + :param pbp_logger: + The logger + :param sound_loc: + The local directory or GCP bucket that contains the audio files + :param json_base_dir: + The local directory to store the metadata + :param start: + The start date to search for flac files + :param end: + The end date to search for flac files + :param search: + The search pattern to match the flac files, e.g. 'MARS' for MARS_YYYYMMDD_HHMMSS.flac + :param seconds_per_file: + The number of seconds per file expected in a flac file to check for missing data. If 0, then no check is done. 
+ :return: + """ + super().__init__(pbp_logger, sound_loc, json_base_dir, search, start, end, seconds_per_file) + + def run(self): + self.log.info(f'Generating metadata for {self.start} to {self.end}...') + + bucket, prefix, scheme = parse_s3_or_gcp_url(self.audio_loc) + + # S3 is not supported for NRS + if scheme == 's3': + self.log.error(f'S3 is not supported for NRS audio files') + return + + def parse_filename(f: str) -> datetime | None: + """ + Check if the file matches the search pattern and is within the start and end dates + :param f: + The path to the file + :return: The beginning recording time of the file + """ + f_path = Path(f) + f_flac_dt = None + + for s in self.search: + # see if the file is a regexp match to search + rc = re.search(s, f_path.stem) + + if rc and rc.group(0): + try: + # files are in the format NRS11_20191231_230836.flac' + # extract the timestamp from the file name into the format YYYYMMDDHHMMSS + f_parts = f_path.stem.split('_') + # If the last two digits of the timestamp are 60, subtract 1 second + if f_parts[2][-2:] == '60': + f_parts = f_parts[1] + f_parts[2] + # Make the last two digits 59 + f_parts = f_parts[:-2] + '59' + else: + f_parts = f_parts[1] + f_parts[2] + + f_path_dt = datetime.strptime(f_parts, '%Y%m%d%H%M%S') + return f_path_dt + except ValueError: + self.log.error(f'Could not parse {f_path.name}') + return None + + return f_flac_dt + + flac_files = [] + self.df = None + self.log.info( + f'Searching in {self.audio_loc}/ for files that match the search pattern {self.search}* ...') + + # set the window to 1 flac file to account for any missing data + minutes_window = int(self.seconds_per_file / 60) + + # set the start and end dates to 1 hour before and after the start and end dates + start_dt = self.start - timedelta(minutes=minutes_window) - timedelta(minutes=minutes_window) + end_dt = self.end + timedelta(days=1) + + if scheme == 'file' or scheme == '': + flac_path = Path(f'/{bucket}/{prefix}') + for filename in progressbar(sorted(flac_path.rglob('*.flac')), prefix='Searching : '): + flac_dt = parse_filename(filename) + if start_dt <= flac_dt <= end_dt: + self.log.info(f'Found file {filename} with timestamp {flac_dt}') + flac_files.append(FlacFile(filename, flac_dt)) + if scheme == 'gs': + client = storage.Client.create_anonymous_client() + bucket_obj = client.get_bucket(bucket) + + # get list of files - this is a generator + # data is organized in a flat filesystem, so there are no optimizations here for querying + blobs = bucket_obj.list_blobs(prefix=prefix) + for i, blob in enumerate(blobs): + self.log.info(f'Processing {blob.name}') + f_path = f'gs://{bucket}/{blob.name}' + flac_dt = parse_filename(f_path) + if start_dt <= flac_dt <= end_dt: + self.log.info(f'Found file {blob.name} with timestamp {flac_dt}') + flac_files.append(FlacFile(f_path, flac_dt)) + # delay to avoid 400 error + if i % 100 == 0: + self.log.info(f'{i} files processed') + time.sleep(1) + if flac_dt is None or flac_dt > start_dt or flac_dt < end_dt: + break + + self.log.info(f'Found {len(flac_files)} files to process that cover the period {start_dt} - {end_dt}') + + if len(flac_files) == 0: + return + + # sort the files by start time + flac_files.sort(key=lambda x: x.start) + + # correct each day in the range + for day in pd.date_range(self.start, self.end, freq='D'): + try: + # create a dataframe from the flac files + self.log.info(f'Creating dataframe from {len(flac_files)} ' + f'files spanning {flac_files[0].start} to {flac_files[-1].start} in 
self.json_base_dir...') + for wc in flac_files: + df_flac = wc.to_df() + + # concatenate the metadata to the dataframe + self.df = pd.concat([self.df, df_flac], axis=0) + + self.log.debug(f' Running metadata corrector for {day}') + corrector = MetadataCorrector(self.log, self.df, self.json_base_dir, day, False, self.seconds_per_file) + corrector.run() + + except Exception as ex: + self.log.exception(str(ex)) + + +if __name__ == '__main__': + import logging + from src.logging_helper import PbpLogger, create_logger + + log_dir = Path('tests/log') + json_dir = Path('tests/json/nrs') + log_dir.mkdir(exist_ok=True, parents=True) + json_dir.mkdir(exist_ok=True, parents=True) + + logger = create_logger( + log_filename_and_level=( + f"{log_dir}/test_nrs_metadata_generator.log", + logging.INFO, + ), + console_level=logging.INFO, + ) + + start = datetime(2019, 10, 24, 0, 0, 0) + end = datetime(2019, 10, 24, 0, 0, 0) + + generator = NRSMetadataGenerator(pbp_logger=logger, + sound_loc='gs://noaa-passive-bioacoustic/nrs/audio/11/nrs_11_2019-2021/audio', + json_base_dir=json_dir.as_posix(), + search=['NRS11'], + start=start, + end=end) + generator.run() \ No newline at end of file diff --git a/src/json_generator/gen_soundtrap.py b/src/json_generator/gen_soundtrap.py index bbe176d..9f78e3c 100644 --- a/src/json_generator/gen_soundtrap.py +++ b/src/json_generator/gen_soundtrap.py @@ -15,8 +15,9 @@ from src import PbpLogger from src.json_generator.gen_abstract import MetadataGeneratorAbstract -from src.json_generator.wavfile import SoundTrapWavFile +from src.json_generator.metadata_extractor import SoundTrapWavFile from src.json_generator.corrector import MetadataCorrector +from src.json_generator.utils import parse_s3_or_gcp_url class SoundTrapMetadataGenerator(MetadataGeneratorAbstract): @@ -28,20 +29,18 @@ class SoundTrapMetadataGenerator(MetadataGeneratorAbstract): start = datetime.now(pytz.utc) end = datetime.now(pytz.utc) - log_prefix = None - def __init__( self, - logger: PbpLogger, - wav_loc: str, + pbp_logger: PbpLogger, + audio_loc: str, json_base_dir: str, search: [str], start: datetime, end: datetime): """ - :param logger: + :param pbp_logger: The logger - :param wav_loc: + :param audio_loc: The local directory or S3 bucket that contains the wav files :param json_base_dir: The local directory to write the json files to @@ -55,56 +54,59 @@ def __init__( The number of seconds per file expected in a wav file to check for missing data. If missing, then no check is done. :return: """ - super().__init__(logger, wav_loc, json_base_dir, search, start, end, 0.) - - # Add a prefix to the log messages to differentiate between the different metadata generators running by date - # This is useful when running multiple metadata generators in parallel - self.log_prefix = f'{self.__class__.__name__} {self.start:%Y%m%d}' + super().__init__(pbp_logger, audio_loc, json_base_dir, search, start, end, 0.) 
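Patch 05 replaces the earlier is_s3() helper with parse_s3_or_gcp_url() (full definition in the src/json_generator/utils.py hunk further down). It is a thin wrapper over urllib.parse.urlparse returning a (bucket, prefix, scheme) triple, and the generators dispatch on the scheme ('s3', 'gs', 'file', or '' for a bare local path). A small sketch of the behavior, with illustrative inputs:

    from urllib.parse import urlparse

    def parse_s3_or_gcp_url(url) -> (str, str, str):
        # Split an s3://, gs://, file:// or plain local path into its parts.
        parsed_url = urlparse(url)
        return parsed_url.netloc, parsed_url.path.lstrip('/'), parsed_url.scheme

    assert parse_s3_or_gcp_url('s3://pacific-sound-ch01') == ('pacific-sound-ch01', '', 's3')
    assert parse_s3_or_gcp_url('gs://noaa-passive-bioacoustic/nrs/audio') == \
        ('noaa-passive-bioacoustic', 'nrs/audio', 'gs')
    assert parse_s3_or_gcp_url('/data/wav') == ('', 'data/wav', '')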
def run(self): try: xml_cache_path = Path(self.json_base_dir) / 'xml_cache' xml_cache_path.mkdir(exist_ok=True, parents=True) wav_files = [] - bucket_core, is_s3 = self.setup() - def check_file(xml_file: str) -> bool: + self.log.info( + f'Searching in {self.audio_loc}/*.wav for wav files that match the search pattern {self.search}* ...') + + bucket, prefix, scheme = parse_s3_or_gcp_url(self.audio_loc) + # This does not work for GCS + if scheme == 'gs': + self.log.error(f'GS not supported for SoundTrap') + return + + def get_file_date(xml_file: str) -> datetime | None: """ - Check if the xml file is in the cache directory + Check if the xml file is in the search pattern and is within the start and end dates :param xml_file: The xml file with the metadata :return: - True if the file is within the start and end dates + Record starting datetime if the file is within the start and end dates; otherwise, return None """ - wav_files = [] - f_path = Path(xml_file) + xml_file = Path(xml_file) # see if the file is a regexp match to self.search for s in self.search: - rc = re.search(s, f_path.stem) + rc = re.search(s, xml_file.stem) if rc and rc.group(0): try: # If a SoundTrap file, then the date is in the filename XXXX.YYYYMMDDHHMMSS.xml - f_path_dt = datetime.strptime(f_path.stem.split('.')[1], '%y%m%d%H%M%S') + f_path_dt = datetime.strptime(xml_file.stem.split('.')[1], '%y%m%d%H%M%S') if self.start <= f_path_dt <= self.end: - return True + return f_path_dt except ValueError: - self.log.error(f'{self.log_prefix} Could not parse {f_path.name}') + self.log.error(f'Could not parse {xml_file.name}') + return None - if not is_s3: - wav_path = Path(self.wav_loc) + if scheme == 'file': + wav_path = Path(self.audio_loc) for filename in progressbar(sorted(wav_path.rglob('*.xml')), prefix='Searching : '): wav_path = filename.parent / f'{filename.stem}.wav' - if check_file(filename): - wav_files.append(SoundTrapWavFile(wav_path, filename)) + start_dt = get_file_date(filename) + if start_dt: + wav_files.append(SoundTrapWavFile(wav_path.as_posix(), filename, start_dt)) else: - # if the wav_loc is a s3 url, then we need to list the files in buckets that cover the start and end + # if the audio_loc is a s3 url, then we need to list the files in buckets that cover the start and end # dates - self.log.info(f'{self.log_prefix} Searching between {self.start} and {self.end}') + self.log.info(f'Searching between {self.start} and {self.end}') client = boto3.client('s3') - - bucket = f'{bucket_core}' paginator = client.get_paginator('list_objects') operation_parameters = {'Bucket': bucket} @@ -117,19 +119,21 @@ def check_file(xml_file: str) -> bool: for obj in page['Contents']: key = obj['Key'] - if '.xml' in key and check_file(key): + if '.xml' in key and get_file_date(key): xml_path = xml_cache_path / key wav_uri = f's3://{bucket}/{key}'.replace('log.xml', 'wav') # Check if the xml file is in the cache directory if not xml_path.exists(): # Download the xml file to a temporary directory - self.log.info(f'{self.log_prefix} Downloading {key} ...') + self.log.info(f'Downloading {key} ...') client.download_file(bucket, key, xml_path) - wav_files.append(SoundTrapWavFile(wav_uri, xml_path)) - self.log.info( - f'{self.log_prefix} Found {len(wav_files)} files to process that cover the period {self.start} - {self.end}') + start_dt = get_file_date(wav_uri) + if start_dt: + wav_files.append(SoundTrapWavFile(wav_uri, xml_path, start_dt)) + + self.log.info(f'Found {len(wav_files)} files to process that cover the period {self.start} - 
{self.end}') if len(wav_files) == 0: return @@ -139,7 +143,7 @@ def check_file(xml_file: str) -> bool: # create a dataframe from the wav files self.log.info( - f'{self.log_prefix} Creating dataframe from {len(wav_files)} files spanning {wav_files[0].start} to {wav_files[-1].start}...') + f'Creating dataframe from {len(wav_files)} files spanning {wav_files[0].start} to {wav_files[-1].start}...') for wc in wav_files: df_wav = wc.to_df() @@ -155,21 +159,21 @@ def check_file(xml_file: str) -> bool: days = (self.end - self.start).days + 1 if len(self.df) == 0: - self.log.info(f'{self.log_prefix} No data found between {self.start} and {self.end}') + self.log.info(f'No data found between {self.start} and {self.end}') return # Correct the metadata for each day for day in range(days): day_start = self.start + timedelta(days=day) - self.log.debug(f'{self.log_prefix} Running metadata corrector for {day_start}') - soundtrap = True - corrector = MetadataCorrector(self.log, self.df, self.json_base_dir, day_start, soundtrap, 0) + self.log.debug(f'Running metadata corrector for {day_start}') + variable_duration = True + corrector = MetadataCorrector(self.log, self.df, self.json_base_dir, day_start, variable_duration, 0) corrector.run() if __name__ == '__main__': from src.logging_helper import PbpLogger, create_logger - from generator import SoundTrapMetadataGenerator + log_dir = Path('tests/log') json_dir = Path('tests/json/soundtrap') log_dir.mkdir(exist_ok=True, parents=True) diff --git a/src/json_generator/wavfile.py b/src/json_generator/metadata_extractor.py similarity index 54% rename from src/json_generator/wavfile.py rename to src/json_generator/metadata_extractor.py index f12c227..09f9458 100755 --- a/src/json_generator/wavfile.py +++ b/src/json_generator/metadata_extractor.py @@ -2,7 +2,7 @@ # Filename: json_generator/wavfile.py # Description: wav file metadata reader. 
Supports SoundTrap and icListen wav files -from logging import exception, warning +from logging import exception, warning, debug from pathlib import Path import numpy as np @@ -13,15 +13,22 @@ import pandas as pd from datetime import datetime, timedelta import xml.etree.ElementTree as ET +from src.json_generator.utils import parse_s3_or_gcp_url -class WavFile: +class AudioFile: - # Abstract class for reading wav file metadata def __init__( self, path_or_url: str, start: datetime): + """ + Abstract class for reading wav file metadata + :param path_or_url: + The path or url to the wav file + :param start: + The start time of the wav file + """ self.start = start self.path_or_url = path_or_url @@ -30,7 +37,7 @@ def has_exception(self): def to_df(self): # if the self.path_or_url is a url, then add to the data frame with the appropriate prefix - if 's3://' in self.path_or_url: + if 's3://' in self.path_or_url or 'gs://' in self.path_or_url: df = pd.DataFrame({'uri': self.path_or_url, 'start': self.start, 'end': self.end, 'fs': self.fs, 'duration_secs': self.duration_secs, 'channels': self.channels, 'subtype': self.subtype, 'exception': self.exception}, @@ -46,15 +53,25 @@ def get_max_freq(self): return self.fs / 2 -class SoundTrapWavFile(WavFile): - """SoundTrapWavFile uses the metadata from the xml files, not the wav file itself """ +class SoundTrapWavFile(AudioFile): - def __init__( - self, - uri: str, - xml_file: str): + def __init__(self, path_or_url: str, xml_file: str, start: datetime): + """ + SoundTrapWavFile uses the metadata from the xml files, not the wav file itself + :param path_or_url: + The path or uri of the wav file + :param xml_file: + The uri of the xml file that contains the metadata + :param path_or_url: + + :param start: + """ + super().__init__(path_or_url, start) tree = ET.parse(xml_file) root = tree.getroot() + wav_start_dt = None + wav_stop_dt = None + sample_count = None # Iterate over the XML elements grabbing the needed metadata values for element in root.iter('WavFileHandler'): @@ -71,7 +88,11 @@ def __init__( if value: sample_count = int(value) - self.path_or_url = uri + # Error checking + if not wav_start_dt or not wav_stop_dt or not sample_count: + raise ValueError(f'Error reading {xml_file}. Missing metadata') + + self.path_or_url = path_or_url self.start = wav_start_dt self.end = wav_stop_dt self.duration_secs = sample_count / 48000 @@ -82,14 +103,12 @@ def __init__( self.exception = np.NAN # no exceptions for SoundTrap files -class IcListenWavFile(WavFile): +class IcListenWavFile(AudioFile): """IcListenWavFile uses the metadata from the wav file itself, but only grabs the needed metadata from the header in S3""" - def __init__( - self, - path_or_url: str, - start: datetime): + def __init__(self, path_or_url: str, start: datetime): + super().__init__(path_or_url, start) self.path_or_url = path_or_url self.start = start self.duration_secs = -1 @@ -138,4 +157,69 @@ def __init__( self.channels = info.channels self.subtype = info.subtype if info.subtype else '' except Exception as ex: - self.log.exception(f'Corrupt file {path_or_url}. {ex}') + exception(f'Corrupt file {path_or_url}. 
{ex}') + + +class FlacFile(AudioFile): + """FlacFile uses the metadata from the flac file itself, + but only grabs the needed metadata from the header in gs or local file system.""" + + def __init__(self, path_or_url: str, start: datetime): + super().__init__(path_or_url, start) + self.path_or_url = path_or_url + self.start = start + self.end = start + self.duration_secs = -1 + self.fs = -1 + self.frames = -1 + self.channels = -1 + self.subtype = '' + self.exception = np.NAN + self.path_or_url = path_or_url + + try: + # if the in_file is a gs url, then read the metadata + bucket, prefix, scheme = parse_s3_or_gcp_url(path_or_url) + if scheme == 'gs': + url = f'http://storage.googleapis.com/{bucket}/{prefix}' + + info = sf.info(io.BytesIO(urlopen(url).read(20_000)), verbose=True) + + # get the duration from the extra_info data field which stores the duration in total bytes + fields = info.extra_info.split(':') + debug('\n'.join(fields)) + sample_rate = int(fields[3].split('\n')[0]) + channels = int(fields[2].split('\n')[0]) + length_microseconds = int(info.frames * 1e6 / info.samplerate) + # get the file name from the url + file_name = url.split('/')[-1] + + # files are in the format NRS11_20191231_230836.flac' + # extract the timestamp from the file name + f = Path(file_name).stem.split('_') + # If the last two digits of the timestamp are 60, subtract 1 seconds + if f[2][-2:] == '60': + f = f[1] + f[2] + # Make the last two digits 59 + f = f[:-2] + '59' + else: + f = f[1] + f[2] + # convert the timestamp to a datetime object + timestamp = datetime.strptime(f, '%Y%m%d%H%M%S') + self.start = timestamp + self.end = self.start + timedelta(microseconds=length_microseconds) + self.duration_secs = int(length_microseconds / 1e6) + self.channels = channels + self.subtype = 'flac' + self.fs = sample_rate + self.frames = info.frames if info.frames else 0 + if scheme == 'file' or scheme == '': + info = sf.info(path_or_url) + self.duration_secs = int(length_microseconds / 1e6) + self.end = self.start + timedelta(microseconds=length_microseconds) + self.fs = info.samplerate + self.frames = info.frames + self.channels = info.channels + self.subtype = info.subtype if info.subtype else '' + except Exception as ex: + exception(f'Corrupt file {path_or_url}. 
{ex}') diff --git a/src/json_generator/utils.py b/src/json_generator/utils.py index 7fada7c..8924c0e 100644 --- a/src/json_generator/utils.py +++ b/src/json_generator/utils.py @@ -1,18 +1,15 @@ import re +from urllib.parse import urlparse -def is_s3(wav_loc: str) -> (bool, str): + +def parse_s3_or_gcp_url(url) -> (str, str, str): """ - Check if the wav_loc is a s3 bucket, and return the bucket name - :param wav_loc: - The wav_loc to check + Parse the S3, GS of local file url + :param url: :return: - A tuple of (is_s3, bucket_name) """ - - is_s3_match = re.match(r'^s3://', wav_loc) - # the bucket name will optionally have a * at the end - # keep only the bucket name before the * - bucket_core = re.sub(r'\*$', '', wav_loc) - bucket_name = re.sub(r'^s3://', '', bucket_core) - return is_s3_match, bucket_name \ No newline at end of file + parsed_url = urlparse(url) + bucket = parsed_url.netloc + prefix = parsed_url.path.lstrip('/') + return bucket, prefix, parsed_url.scheme diff --git a/tests/test_json_generator.py b/tests/test_json_generator.py index f33fb83..02ba336 100644 --- a/tests/test_json_generator.py +++ b/tests/test_json_generator.py @@ -1,3 +1,8 @@ +# pypam-based-processing +# Filename: tests/test_json_generator.py +# Description: Test fixtures for the json generator classes. +# Tests the ability to generate metadata for soundtrap, iclisten, and nrs recording files. + import json import boto3 @@ -10,13 +15,13 @@ from pathlib import Path -from json_generator.gen_iclisten import IcListenMetadataGenerator +from json_generator.gen_nrs import NRSMetadataGenerator from src.logging_helper import create_logger from src.json_generator.gen_soundtrap import SoundTrapMetadataGenerator from src.json_generator.gen_iclisten import IcListenMetadataGenerator -def get_account() -> str: +def get_aws_account() -> str | None: """ Get the account number associated with this user :return: @@ -35,11 +40,13 @@ def get_account() -> str: print(e) return None + # Check if an AWS account is configured by checking if it can access the model with the default credentials AWS_AVAILABLE = False -if get_account(): +if get_aws_account(): AWS_AVAILABLE = True + @pytest.mark.skipif(not AWS_AVAILABLE, reason="This test is excluded because it requires a valid AWS account") def test_soundtrap_json_generator(): @@ -64,8 +71,8 @@ def test_soundtrap_json_generator(): start = datetime(2023, 7, 18) end = datetime(2023, 7, 19) - gen = SoundTrapMetadataGenerator(logger=logger, - wav_loc='s3://pacific-sound-ch01', + gen = SoundTrapMetadataGenerator(pbp_logger=logger, + audio_loc='s3://pacific-sound-ch01', json_base_dir=json_dir.as_posix(), search=["7000"], start=start, @@ -78,14 +85,15 @@ def test_soundtrap_json_generator(): assert Path('tests/json/soundtrap/2023/20230718.json').exists() assert Path('tests/json/soundtrap/2023/20230719.json').exists() + @pytest.mark.skipif(not AWS_AVAILABLE, reason="This test is excluded because it requires a valid AWS account") def test_iclisten_json_generator(): """ Test fixture for IcListenMetadataGenerator. Tests the IcListenMetadataGenerator class ability to generate metadata for soundtrap recording files. - One files should be generated in the json directory for the date specified. Note this currently - only works for MBARI MARS data + One file should be generated in the json directory for the date specified. 
Note this currently + only works for MBARI MARS ICListen data :return: """ @@ -96,7 +104,7 @@ def test_iclisten_json_generator(): logger = create_logger( log_filename_and_level=( - f"{log_dir}/test_soundtrap_metadata_generator.log", + f"{log_dir}/test_mars_metadata_generator.log", logging.INFO, ), console_level=logging.INFO, @@ -106,8 +114,8 @@ def test_iclisten_json_generator(): end = datetime(2023, 7, 18, 0, 0, 0) # If only running one day, use a single generator - generator = IcListenMetadataGenerator(logger=logger, - wav_loc='s3://pacific-sound-256khz', + generator = IcListenMetadataGenerator(pbp_logger=logger, + audio_loc='s3://pacific-sound-256khz', json_base_dir=json_dir.as_posix(), search=['MARS'], start=start, @@ -123,4 +131,47 @@ def test_iclisten_json_generator(): with open('tests/json/mars/2023/20230718.json') as f: json_objcts = json.load(f) if len(json_objcts) != 145: - assert False \ No newline at end of file + assert False + + +def test_nrs_json_generator(): + """ + Test fixture for NRSMetadataGenerator. + Tests the NRSMetadataGenerator class ability to generate metadata for NRS recording files. + One files should be generated in the json directory for the date specified. + :return: + """ + log_dir = Path('tests/log') + json_dir = Path('tests/json/nrs') + log_dir.mkdir(exist_ok=True, parents=True) + json_dir.mkdir(exist_ok=True, parents=True) + + logger = create_logger( + log_filename_and_level=( + f"{log_dir}/test_nrs_metadata_generator.log", + logging.INFO, + ), + console_level=logging.INFO, + ) + + start = datetime(2019, 10, 24, 0, 0, 0) + end = datetime(2019, 10, 24, 0, 0, 0) + + generator = NRSMetadataGenerator(pbp_logger=logger, + sound_loc='gs://noaa-passive-bioacoustic/nrs/audio/11/nrs_11_2019-2021/audio', + json_base_dir=json_dir.as_posix(), + search=['NRS11'], + start=start, + end=end, + seconds_per_file=14400.0) + generator.run() + # There should be one files in the json directory named 20230718.json, and it should have 1 json objects + json_files = list(Path('tests/json/nrs/').rglob('*.json')) + assert len(json_files) == 1 + assert Path('tests/json/nrs/2019/20191024.json').exists() + + # Read the file and check the number of json objects + with open('tests/json/nrs/2019/20191024.json') as f: + json_objcts = json.load(f) + if len(json_objcts) != 1: + assert False From 4065ca78a1f5b2643f217b31a896f29e36b7a09c Mon Sep 17 00:00:00 2001 From: danellecline Date: Wed, 28 Feb 2024 12:51:43 -0800 Subject: [PATCH 06/10] fixed nrs exit logic --- src/json_generator/gen_nrs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/json_generator/gen_nrs.py b/src/json_generator/gen_nrs.py index 0b94866..0633d36 100644 --- a/src/json_generator/gen_nrs.py +++ b/src/json_generator/gen_nrs.py @@ -131,7 +131,7 @@ def parse_filename(f: str) -> datetime | None: if i % 100 == 0: self.log.info(f'{i} files processed') time.sleep(1) - if flac_dt is None or flac_dt > start_dt or flac_dt < end_dt: + if flac_dt > end_dt: break self.log.info(f'Found {len(flac_files)} files to process that cover the period {start_dt} - {end_dt}') @@ -180,7 +180,7 @@ def parse_filename(f: str) -> datetime | None: ) start = datetime(2019, 10, 24, 0, 0, 0) - end = datetime(2019, 10, 24, 0, 0, 0) + end = datetime(2019, 11, 1, 0, 0, 0) generator = NRSMetadataGenerator(pbp_logger=logger, sound_loc='gs://noaa-passive-bioacoustic/nrs/audio/11/nrs_11_2019-2021/audio', From ade245e6411f44d4aa5925f772c8b478b105b381 Mon Sep 17 00:00:00 2001 From: danellecline Date: Wed, 28 Feb 2024 17:20:27 -0800 
Subject: [PATCH 07/10] added main entrypoint for JSON generation --- README.md | 4 +++- src/json_generator/corrector.py | 7 +++---- src/json_generator/gen_abstract.py | 6 +++--- src/json_generator/gen_iclisten.py | 20 ++++++++++---------- src/json_generator/gen_nrs.py | 18 +++++++++--------- src/json_generator/gen_soundtrap.py | 16 ++++++++-------- src/json_generator/metadata_extractor.py | 8 ++++++++ tests/test_json_generator.py | 12 ++++++------ 8 files changed, 50 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index 84395f0..34079b3 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,7 @@ to generate _hybrid millidecade band spectra_ for soundscape data. **Status**: Functional version, including support for S3-based cloud based processing. +- [x] JSON generation of timekeeping - [x] Timekeeping based on given JSON indicating start and duration of every available (recognized) sound file - [x] Audio file processing - [x] Frequency and psd array output @@ -39,7 +40,8 @@ TODO more details All parameters passed via environment variables, see source file. - `src/plot.py` - Plotting program: `python src/plot.py --help`. - + +- `src/main_json_generator.py` - Main CLI program to generate JSONS with audio metadata, run `python src/main_json_gen.py --help` for usage. ## Refs diff --git a/src/json_generator/corrector.py b/src/json_generator/corrector.py index ad9f45b..8035881 100644 --- a/src/json_generator/corrector.py +++ b/src/json_generator/corrector.py @@ -85,7 +85,7 @@ def run(self): day_process = df if self.variable_duration: - self.log.info(f'Soundtrap files for {self.day} are variable. Skipping duration check') + self.log.info(f'Files for {self.day} are variable. Skipping duration check') for index, row in day_process.iterrows(): self.log.debug(f'File {row["uri"]} duration {row["duration_secs"]} ') else: @@ -97,7 +97,7 @@ def run(self): # check whether there is a discrepancy between the number of seconds in the file and the number # of seconds in the metadata. 
If there is a discrepancy, then correct the metadata - # This is only reliable for full days of data contained in complete files + # This is only reliable for full days of data contained in complete files for IcListen data day_process['jitter_secs'] = 0 if self.variable_duration or \ @@ -105,8 +105,6 @@ def run(self): and len(day_process['duration_secs'].unique()) == 1 and day_process.iloc[0]['duration_secs'] == self.seconds_per_file): - self.log.info(f'{len(day_process)} files available for {self.day}') - # check whether the differences are all the same if len(day_process['start'].diff().unique()) == 1 or self.variable_duration: self.log.warn(f'No drift for {self.day}') @@ -238,3 +236,4 @@ def save_day( output_path = Path(self.json_base_dir, str(day.year)) output_path.mkdir(parents=True, exist_ok=True) shutil.copy2(temp_metadata.as_posix(), output_path) + self.log.info(f'Wrote {output_path}/{temp_metadata.name}') diff --git a/src/json_generator/gen_abstract.py b/src/json_generator/gen_abstract.py index d836e62..abdca2b 100644 --- a/src/json_generator/gen_abstract.py +++ b/src/json_generator/gen_abstract.py @@ -11,7 +11,7 @@ def __init__(self, logger: PbpLogger, audio_loc: str, json_base_dir: str, - search: [str], + prefix: [str], start: datetime, end: datetime, seconds_per_file: float = 0., @@ -24,7 +24,7 @@ def __init__(self, The local directory or cloud bucket that contains the wav files :param json_base_dir: The local directory to write the json files to - :param search: + :param prefix: The search pattern to match the wav files, e.g. 'MARS' :param start: The start date to search for wav files @@ -40,7 +40,7 @@ def __init__(self, self.df = pd.DataFrame() self.start = start self.end = end - self.search = search + self.prefix = prefix self._seconds_per_file = None if seconds_per_file == 0 else seconds_per_file self.logger = logger except Exception as e: diff --git a/src/json_generator/gen_iclisten.py b/src/json_generator/gen_iclisten.py index b415d25..59b90cc 100644 --- a/src/json_generator/gen_iclisten.py +++ b/src/json_generator/gen_iclisten.py @@ -23,17 +23,17 @@ class IcListenMetadataGenerator(MetadataGeneratorAbstract): def __init__( self, pbp_logger: PbpLogger, - audio_loc: str, + uri: str, json_base_dir: str, start: datetime, end: datetime, - search: [str], - seconds_per_file: float): + prefix: [str], + seconds_per_file: float = 300.): """ Captures ICListen wav metadata in a pandas dataframe from either a local directory or S3 bucket. :param pbp_logger: The logger - :param audio_loc: + :param uri: The local directory or S3 bucket that contains the wav files :param json_base_dir: The local directory to store the metadata @@ -41,13 +41,13 @@ def __init__( The start date to search for wav files :param end: The end date to search for wav files - :param search: + :param prefix: The search pattern to match the wav files, e.g. 'MARS' for MARS_YYYYMMDD_HHMMSS.wav :param seconds_per_file: The number of seconds per file expected in a wav file to check for missing data. If 0, then no check is done. 
:return: """ - super().__init__(pbp_logger, audio_loc, json_base_dir, search, start, end, seconds_per_file) + super().__init__(pbp_logger, uri, json_base_dir, prefix, start, end, seconds_per_file) self.log_prefix = f'{self.__class__.__name__} {start:%Y%m%d}' def run(self): @@ -65,7 +65,7 @@ def run(self): try: self.df = None self.log.info( - f'{self.log_prefix} Searching in {self.audio_loc}/*.wav for wav files that match the search pattern {self.search}* ...') + f'{self.log_prefix} Searching in {self.audio_loc}/*.wav for wav files that match the search pattern {self.prefix}* ...') wav_files = [] @@ -86,7 +86,7 @@ def check_file(f: str, f_path = Path(f) f_wav_dt = None - for s in self.search: + for s in self.prefix: # see if the file is a regexp match to search rc = re.search(s, f_path.stem) @@ -191,9 +191,9 @@ def check_file(f: str, # If only running one day, use a single generator generator = IcListenMetadataGenerator(pbp_logger=logger, - audio_loc='s3://pacific-sound-256khz', + uri='s3://pacific-sound-256khz', json_base_dir=json_dir.as_posix(), - search=['MARS'], + prefix=['MARS'], start=start, end=end, seconds_per_file=300) diff --git a/src/json_generator/gen_nrs.py b/src/json_generator/gen_nrs.py index 0633d36..1aa2bf7 100644 --- a/src/json_generator/gen_nrs.py +++ b/src/json_generator/gen_nrs.py @@ -23,17 +23,17 @@ class NRSMetadataGenerator(MetadataGeneratorAbstract): def __init__( self, pbp_logger: PbpLogger, - sound_loc: str, + uri: str, json_base_dir: str, start: datetime, end: datetime, - search: [str], + prefix: [str], seconds_per_file: float = 14400.0): """ Captures NRS audio metadata in a pandas dataframe from either a local directory or GS bucket. :param pbp_logger: The logger - :param sound_loc: + :param uri: The local directory or GCP bucket that contains the audio files :param json_base_dir: The local directory to store the metadata @@ -41,13 +41,13 @@ def __init__( The start date to search for flac files :param end: The end date to search for flac files - :param search: + :param prefix: The search pattern to match the flac files, e.g. 'MARS' for MARS_YYYYMMDD_HHMMSS.flac :param seconds_per_file: The number of seconds per file expected in a flac file to check for missing data. If 0, then no check is done. 
:return: """ - super().__init__(pbp_logger, sound_loc, json_base_dir, search, start, end, seconds_per_file) + super().__init__(pbp_logger, uri, json_base_dir, prefix, start, end, seconds_per_file) def run(self): self.log.info(f'Generating metadata for {self.start} to {self.end}...') @@ -69,7 +69,7 @@ def parse_filename(f: str) -> datetime | None: f_path = Path(f) f_flac_dt = None - for s in self.search: + for s in self.prefix: # see if the file is a regexp match to search rc = re.search(s, f_path.stem) @@ -97,7 +97,7 @@ def parse_filename(f: str) -> datetime | None: flac_files = [] self.df = None self.log.info( - f'Searching in {self.audio_loc}/ for files that match the search pattern {self.search}* ...') + f'Searching in {self.audio_loc}/ for files that match the search pattern {self.prefix}* ...') # set the window to 1 flac file to account for any missing data minutes_window = int(self.seconds_per_file / 60) @@ -183,9 +183,9 @@ def parse_filename(f: str) -> datetime | None: end = datetime(2019, 11, 1, 0, 0, 0) generator = NRSMetadataGenerator(pbp_logger=logger, - sound_loc='gs://noaa-passive-bioacoustic/nrs/audio/11/nrs_11_2019-2021/audio', + uri='gs://noaa-passive-bioacoustic/nrs/audio/11/nrs_11_2019-2021/audio', json_base_dir=json_dir.as_posix(), - search=['NRS11'], + prefix=['NRS11'], start=start, end=end) generator.run() \ No newline at end of file diff --git a/src/json_generator/gen_soundtrap.py b/src/json_generator/gen_soundtrap.py index 9f78e3c..cbdff2a 100644 --- a/src/json_generator/gen_soundtrap.py +++ b/src/json_generator/gen_soundtrap.py @@ -32,19 +32,19 @@ class SoundTrapMetadataGenerator(MetadataGeneratorAbstract): def __init__( self, pbp_logger: PbpLogger, - audio_loc: str, + uri: str, json_base_dir: str, - search: [str], + prefix: [str], start: datetime, end: datetime): """ :param pbp_logger: The logger - :param audio_loc: + :param uri: The local directory or S3 bucket that contains the wav files :param json_base_dir: The local directory to write the json files to - :param search: + :param prefix: The search pattern to match the wav files, e.g. 'MARS' :param start: The start date to search for wav files @@ -54,7 +54,7 @@ def __init__( The number of seconds per file expected in a wav file to check for missing data. If missing, then no check is done. :return: """ - super().__init__(pbp_logger, audio_loc, json_base_dir, search, start, end, 0.) + super().__init__(pbp_logger, uri, json_base_dir, prefix, start, end, 0.) 
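For context on the corrector hand-off at the end of run() below: SoundTrap recordings vary in length, so this generator passes variable_duration=True and MetadataCorrector skips the per-file duration check, while fixed-duration instruments (icListen, NRS) get the expected-file arithmetic from corrector.py. A rough sketch of that arithmetic, with 300 s chosen to match the icListen examples elsewhere in this series:

    from datetime import datetime, timedelta

    seconds_per_file = 300.0                             # illustrative fixed file length
    files_per_day = int(86400 / seconds_per_file)        # 288 complete files expected
    minutes_per_file = int(1.1 * seconds_per_file / 60)  # one-file window plus 10% margin

    day = datetime(2023, 7, 18)
    window_start = day - timedelta(minutes=minutes_per_file)  # catch files overlapping midnight
    window_end = day + timedelta(days=1)
    # Files whose 'start' falls in [window_start, window_end) are kept for this day.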
diff --git a/src/json_generator/gen_soundtrap.py b/src/json_generator/gen_soundtrap.py
index 9f78e3c..cbdff2a 100644
--- a/src/json_generator/gen_soundtrap.py
+++ b/src/json_generator/gen_soundtrap.py
@@ -32,19 +32,19 @@ class SoundTrapMetadataGenerator(MetadataGeneratorAbstract):
     def __init__(
             self,
             pbp_logger: PbpLogger,
-            audio_loc: str,
+            uri: str,
             json_base_dir: str,
-            search: [str],
+            prefix: [str],
             start: datetime,
             end: datetime):
         """
         :param pbp_logger:
             The logger
-        :param audio_loc:
+        :param uri:
             The local directory or S3 bucket that contains the wav files
         :param json_base_dir:
             The local directory to write the json files to
-        :param search:
+        :param prefix:
             The search pattern to match the wav files, e.g. 'MARS'
         :param start:
             The start date to search for wav files
@@ -54,7 +54,7 @@ def __init__(
             The number of seconds per file expected in a wav file to check for missing data. If missing, then no check is done.
         :return:
         """
-        super().__init__(pbp_logger, audio_loc, json_base_dir, search, start, end, 0.)
+        super().__init__(pbp_logger, uri, json_base_dir, prefix, start, end, 0.)
 
     def run(self):
         try:
@@ -63,7 +63,7 @@
             wav_files = []
             self.log.info(
-                f'Searching in {self.audio_loc}/*.wav for wav files that match the search pattern {self.search}* ...')
+                f'Searching in {self.audio_loc}/*.wav for wav files that match the prefix {self.prefix}* ...')
 
             bucket, prefix, scheme = parse_s3_or_gcp_url(self.audio_loc)
             # This does not work for GCS
@@ -80,8 +80,8 @@ def get_file_date(xml_file: str) -> datetime | None:
             Record starting datetime if the file is within the start and end dates; otherwise, return None
             """
             xml_file = Path(xml_file)
-            # see if the file is a regexp match to self.search
-            for s in self.search:
+            # see if the file is a regexp match to self.prefix
+            for s in self.prefix:
                 rc = re.search(s, xml_file.stem)
 
                 if rc and rc.group(0):
diff --git a/src/json_generator/metadata_extractor.py b/src/json_generator/metadata_extractor.py
index 09f9458..ca91ee4 100755
--- a/src/json_generator/metadata_extractor.py
+++ b/src/json_generator/metadata_extractor.py
@@ -31,6 +31,13 @@ def __init__(
         """
         self.start = start
         self.path_or_url = path_or_url
+        self.end = start
+        self.duration_secs = -1
+        self.fs = -1
+        self.frames = -1
+        self.channels = -1
+        self.subtype = ''
+        self.exception = ''  # empty string means no exception; has_exception() below calls len() on it
 
     def has_exception(self):
         return True if len(self.exception) > 0 else False
@@ -215,6 +222,7 @@ def __init__(self, path_or_url: str, start: datetime):
             self.frames = info.frames if info.frames else 0
         if scheme == 'file' or scheme == '':
             info = sf.info(path_or_url)
+            length_microseconds = int(info.frames * 1e6 / info.samplerate)
             self.duration_secs = int(length_microseconds / 1e6)
             self.end = self.start + timedelta(microseconds=length_microseconds)
             self.fs = info.samplerate
diff --git a/tests/test_json_generator.py b/tests/test_json_generator.py
index 02ba336..d03230a 100644
--- a/tests/test_json_generator.py
+++ b/tests/test_json_generator.py
@@ -72,9 +72,9 @@ def test_soundtrap_json_generator():
     start = datetime(2023, 7, 18)
     end = datetime(2023, 7, 19)
     gen = SoundTrapMetadataGenerator(pbp_logger=logger,
-                                     audio_loc='s3://pacific-sound-ch01',
+                                     uri='s3://pacific-sound-ch01',
                                      json_base_dir=json_dir.as_posix(),
-                                     search=["7000"],
+                                     prefix=["7000"],
                                      start=start,
                                      end=end)
     gen.run()
@@ -115,9 +115,9 @@ def test_iclisten_json_generator():
     # If only running one day, use a single generator
     generator = IcListenMetadataGenerator(pbp_logger=logger,
-                                          audio_loc='s3://pacific-sound-256khz',
+                                          uri='s3://pacific-sound-256khz',
                                           json_base_dir=json_dir.as_posix(),
-                                          search=['MARS'],
+                                          prefix=['MARS'],
                                           start=start,
                                           end=end,
                                           seconds_per_file=300)
@@ -158,9 +158,9 @@ def test_nrs_json_generator():
     end = datetime(2019, 10, 24, 0, 0, 0)
 
     generator = NRSMetadataGenerator(pbp_logger=logger,
-                                     sound_loc='gs://noaa-passive-bioacoustic/nrs/audio/11/nrs_11_2019-2021/audio',
+                                     uri='gs://noaa-passive-bioacoustic/nrs/audio/11/nrs_11_2019-2021/audio',
                                      json_base_dir=json_dir.as_posix(),
-                                     search=['NRS11'],
+                                     prefix=['NRS11'],
                                      start=start,
                                      end=end,
                                      seconds_per_file=14400.0)
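The one-line metadata_extractor fix above restores the length_microseconds definition that duration_secs and end depend on. A minimal standalone sketch of the same computation with soundfile, for a local path (the file name is invented for illustration):

    import soundfile as sf
    from datetime import datetime, timedelta

    info = sf.info('MARS_20230718_000000.wav')  # hypothetical local wav file
    # duration in integer microseconds, derived from frame count and sample rate,
    # mirroring the line added in the patch above
    length_microseconds = int(info.frames * 1e6 / info.samplerate)
    start = datetime(2023, 7, 18)
    duration_secs = int(length_microseconds / 1e6)
    end = start + timedelta(microseconds=length_microseconds)
    print(duration_secs, end)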
From 50d52402162bc1c22fe52be6c7e460c0b96ec35d Mon Sep 17 00:00:00 2001
From: danellecline
Date: Wed, 28 Feb 2024 17:37:43 -0800
Subject: [PATCH 08/10] fixed import paths and nrs pytest

---
 src/json_generator/gen_iclisten.py | 6 +++---
 src/json_generator/gen_nrs.py | 4 ++--
 tests/test_json_generator.py | 6 +++---
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/json_generator/gen_iclisten.py b/src/json_generator/gen_iclisten.py
index 59b90cc..27ddf76 100644
--- a/src/json_generator/gen_iclisten.py
+++ b/src/json_generator/gen_iclisten.py
@@ -10,9 +10,9 @@
 import pandas as pd
 from pathlib import Path
 from progressbar import progressbar
-import json_generator.utils as utils
-from json_generator.corrector import MetadataCorrector
-from json_generator.metadata_extractor import IcListenWavFile
+import src.json_generator.utils as utils
+from src.json_generator.corrector import MetadataCorrector
+from src.json_generator.metadata_extractor import IcListenWavFile
 from src import PbpLogger
 from src.json_generator.gen_abstract import MetadataGeneratorAbstract
diff --git a/src/json_generator/gen_nrs.py b/src/json_generator/gen_nrs.py
index 1aa2bf7..31baa02 100644
--- a/src/json_generator/gen_nrs.py
+++ b/src/json_generator/gen_nrs.py
@@ -11,8 +11,8 @@
 import pandas as pd
 from pathlib import Path
 from progressbar import progressbar
-from json_generator.corrector import MetadataCorrector
-from json_generator.metadata_extractor import FlacFile
+from src.json_generator.corrector import MetadataCorrector
+from src.json_generator.metadata_extractor import FlacFile
 from src import PbpLogger
 from src.json_generator.gen_abstract import MetadataGeneratorAbstract
 from src.json_generator.utils import parse_s3_or_gcp_url
diff --git a/tests/test_json_generator.py b/tests/test_json_generator.py
index d03230a..75f0354 100644
--- a/tests/test_json_generator.py
+++ b/tests/test_json_generator.py
@@ -15,7 +15,7 @@
 
 from pathlib import Path
 
-from json_generator.gen_nrs import NRSMetadataGenerator
+from src.json_generator.gen_nrs import NRSMetadataGenerator
 from src.logging_helper import create_logger
 from src.json_generator.gen_soundtrap import SoundTrapMetadataGenerator
 from src.json_generator.gen_iclisten import IcListenMetadataGenerator
@@ -165,7 +165,7 @@ def test_nrs_json_generator():
                                      end=end,
                                      seconds_per_file=14400.0)
     generator.run()
-    # There should be one files in the json directory named 20230718.json, and it should have 1 json objects
+    # There should be one file in the json directory named 20191024.json, and it should have 7 json objects
     json_files = list(Path('tests/json/nrs/').rglob('*.json'))
     assert len(json_files) == 1
     assert Path('tests/json/nrs/2019/20191024.json').exists()
@@ -173,5 +173,5 @@
     # Read the file and check the number of json objects
     with open('tests/json/nrs/2019/20191024.json') as f:
         json_objcts = json.load(f)
-        if len(json_objcts) != 1:
+        if len(json_objcts) != 7:
             assert False

From 69e08e0e3924f169e43fae31de57650c77e17667 Mon Sep 17 00:00:00 2001
From: danellecline
Date: Wed, 28 Feb 2024 17:40:10 -0800
Subject: [PATCH 09/10] fixed TypeError

---
 tests/test_json_generator.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_json_generator.py b/tests/test_json_generator.py
index 75f0354..55f6dbe 100644
--- a/tests/test_json_generator.py
+++ b/tests/test_json_generator.py
@@ -21,7 +21,7 @@
 from src.json_generator.gen_iclisten import IcListenMetadataGenerator
 
 
-def get_aws_account() -> str | None:
+def get_aws_account() -> str:
     """
     Get the account number associated with this user
     :return:
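For context on the get_aws_account return-type fix above: the usual way to resolve the caller's account number is an STS get_caller_identity call. A minimal sketch of a plausible body, assuming boto3 credentials are already configured (the project's actual implementation is not shown in this hunk):

    import boto3

    def get_aws_account() -> str:
        # STS returns the 12-digit account id for the active credentials;
        # raises botocore.exceptions.ClientError if credentials are missing or invalid
        return boto3.client('sts').get_caller_identity()['Account']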
From 92141d522af0780a67e957f60ac1eaf59bfee687 Mon Sep 17 00:00:00 2001
From: danellecline
Date: Wed, 28 Feb 2024 17:42:21 -0800
Subject: [PATCH 10/10] fixed TypeError

---
 src/json_generator/gen_nrs.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/json_generator/gen_nrs.py b/src/json_generator/gen_nrs.py
index 31baa02..2a764ac 100644
--- a/src/json_generator/gen_nrs.py
+++ b/src/json_generator/gen_nrs.py
@@ -59,7 +59,7 @@ def run(self):
             self.log.error(f'S3 is not supported for NRS audio files')
             return
 
-        def parse_filename(f: str) -> datetime | None:
+        def parse_filename(f: str) -> datetime:
             """
             Check if the file matches the search pattern and is within the start and end dates
             :param f: