diff --git a/README.md b/README.md
index 2de0c00..8c486b1 100644
--- a/README.md
+++ b/README.md
@@ -5,6 +5,7 @@ to generate _hybrid millidecade band spectra_ for soundscape data.
 
 **Status**: Functional version, including support for S3-based cloud based processing.
 
+- [x] JSON generation of timekeeping
 - [x] Timekeeping based on given JSON indicating start and duration of every available (recognized) sound file
 - [x] Audio file processing
 - [x] Frequency and psd array output
@@ -38,7 +39,8 @@ TODO more details
   All parameters passed via environment variables, see source file.
 
 - `src/plot.py` - Plotting program: `python src/plot.py --help`.
-
+
+- `src/main_json_generator.py` - Main CLI program to generate JSONs with audio metadata; run `python src/main_json_generator.py --help` for usage.
 
 ## Refs
diff --git a/requirements.txt b/requirements.txt
index 7bda771..5eb4d9d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,8 +5,10 @@ google-cloud-storage==2.14.0
 dataclasses-json==0.6.4
 python-dateutil==2.8.2
 pyyaml==6.0.1
+progressbar2==3.53.1
 marshmallow==3.20.2
 soundfile==0.12.1
+pyarrow==15.0.0
 
 # quickly tried it but got: AttributeError: module 'xarray_extras' has no attribute 'csv'
 # xarray-extras==0.5.0
diff --git a/src/json_generator/__init__.py b/src/json_generator/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/json_generator/corrector.py b/src/json_generator/corrector.py
new file mode 100644
index 0000000..8035881
--- /dev/null
+++ b/src/json_generator/corrector.py
@@ -0,0 +1,239 @@
+# pypam-based-processing, Apache License 2.0
+# Filename: src/json_generator/corrector.py
+# Description: Corrects metadata for wav files and saves the results to a JSON file.
+
+from datetime import datetime
+from datetime import timedelta
+
+import numpy as np
+import pandas as pd
+from pathlib import Path
+import shutil
+import tempfile
+import json
+
+from src import PbpLogger
+
+
+class MetadataCorrector:
+
+    def __init__(
+            self,
+            logger: PbpLogger,
+            correct_df: pd.DataFrame,
+            json_path_out: str,
+            day: datetime,
+            variable_duration: bool = False,
+            seconds_per_file: float = -1):
+        """
+        Correct the metadata for a day and save it to a JSON file
+        :param logger:
+            The logger to use
+        :param correct_df:
+            The dataframe containing the metadata to correct
+        :param json_path_out:
+            The path to save the corrected metadata JSON file
+        :param day:
+            The day to correct
+        :param variable_duration:
+            True if the files vary in duration
+        :param seconds_per_file:
+            The number of seconds in each file; not used for SoundTrap files
+        """
+        self.correct_df = correct_df
+        self.json_base_dir = json_path_out
+        self.day = day
+        self.variable_duration = variable_duration
+        self.seconds_per_file = seconds_per_file
+        self.log = logger
+
+    def run(self):
+        """Run the corrector"""
+
+        try:
+
+            if self.variable_duration:
+                files_per_day = None
+                # Filter the metadata to the day, starting 6 hours before the day starts to capture overlap
+                df = self.correct_df[(self.correct_df['start'] >= self.day - timedelta(hours=6)) & (self.correct_df['start'] < self.day + timedelta(days=1))]
+            else:  # ICListen/NRS file durations are fixed, but files may be missing or incomplete if the system was down
+                files_per_day = int(86400 / self.seconds_per_file)
+                minutes_per_file = int(1.1 * self.seconds_per_file / 60)
+                # Filter the metadata to the day, starting 1 file before the day starts to capture overlap
+                df = self.correct_df[(self.correct_df['start'] >= self.day - timedelta(minutes=minutes_per_file)) & (self.correct_df['start'] < self.day + 
timedelta(days=1))] + + self.log.debug(f'Creating metadata for day {self.day}') + + if len(df) == 0: + self.log.warn(f'No metadata found for day {self.day}') + return + + # convert the start and end times to datetime + df = df.copy() + + df['start'] = pd.to_datetime(df['start']) + df['end'] = pd.to_datetime(df['end']) + + # get the file list that covers the requested day + self.log.info(f'Found {len(df)} files from day {self.day}, starting {df.iloc[0]["start"]} ending {df.iloc[-1]["end"]}') + + # if there are no files, then return + if len(df) == 0: + self.log.warn(f'No files found for {self.day}') + return + + day_process = df + + if self.variable_duration: + self.log.info(f'Files for {self.day} are variable. Skipping duration check') + for index, row in day_process.iterrows(): + self.log.debug(f'File {row["uri"]} duration {row["duration_secs"]} ') + else: + for index, row in day_process.iterrows(): + # if the duration_secs is not seconds per file, then the file is not complete + if row['duration_secs'] != self.seconds_per_file: + self.log.warn(f'File {row["duration_secs"]} != {self.seconds_per_file}. File is not complete') + continue + + # check whether there is a discrepancy between the number of seconds in the file and the number + # of seconds in the metadata. If there is a discrepancy, then correct the metadata + # This is only reliable for full days of data contained in complete files for IcListen data + day_process['jitter_secs'] = 0 + + if self.variable_duration or \ + (len(day_process) == files_per_day + 1 + and len(day_process['duration_secs'].unique()) == 1 + and day_process.iloc[0]['duration_secs'] == self.seconds_per_file): + + # check whether the differences are all the same + if len(day_process['start'].diff().unique()) == 1 or self.variable_duration: + self.log.warn(f'No drift for {self.day}') + else: + self.log.info(f'Correcting drift for {self.day}') + + # correct the metadata + jitter = 0 + start = day_process.iloc[0]['start'] + end = start + timedelta(seconds=self.seconds_per_file) + + for index, row in day_process.iterrows(): + # jitter is the difference between the expected start time and the actual start time + # jitter is 0 for the first file + if row.start == start: + # round the jitter to the nearest second + jitter = start.to_datetime64() - row.start.to_datetime64() + jitter = int(jitter / np.timedelta64(1, 's')) + + # correct the start and end times + day_process.loc[index, 'start'] = start + day_process.loc[index, 'end'] = end + day_process.loc[index, 'jitter_secs'] = jitter + + if self.variable_duration: + end = row.end + else: + end = start + timedelta(seconds=self.seconds_per_file) + # round the end time to the nearest second as the timestamp is only accurate to the second + end = end.replace(microsecond=0) + # set the times for the next files + start = end + else: + day_process = self.no_jitter(self.day, day_process) + + # drop any rows with duplicate uri times, keeping the first + # duplicates can be caused by the jitter correction + if 'uri' in day_process.columns: + day_process = day_process.drop_duplicates(subset=['uri'], keep='first') + if 'url' in day_process.columns: + day_process = day_process.drop_duplicates(subset=['url'], keep='first') + + # save explicitly as UTC by setting the timezone in the start and end times + day_process['start'] = day_process['start'].dt.tz_localize('UTC') + day_process['end'] = day_process['end'].dt.tz_localize('UTC') + + self.save_day(self.day, day_process) + + except Exception as e: + self.log.exception(f'Error 
correcting metadata for {self.day}. {e}') + finally: + self.log.debug(f'Done correcting metadata for {self.day}. Saved to {self.json_base_dir}') + + def no_jitter( + self, + day: datetime, + day_process: pd.DataFrame) -> pd.DataFrame: + """ + Set the jitter to 0 and calculate the end time from the start time and the duration + :param day: + The day being processed + :param day_process: + The dataframe to correct + :return: + The corrected dataframe + """ + self.log.warn(f'Cannot correct {self.day}. Using file start times as is, setting jitter to 0 and using ' + f'calculated end times.') + # calculate the difference between each row start time and save as diff in a copy of the dataframe + day_process = day_process.copy() + day_process['diff'] = day_process['start'].diff() + day_process['jitter_secs'] = 0 + # calculate the end time which is the start time plus the number of seconds in the file + day_process['end'] = day_process['start'] + pd.to_timedelta(day_process['duration_secs'], unit='s') + return day_process + + def save_day( + self, + day: datetime, + day_process: pd.DataFrame, + prefix: str = None): + """ + Save the day's metadata to a single json file either locally or to s3 + :param day: + The day to save + :param day_process: + The dataframe containing the metadata for the day + :param prefix: + An optional prefix for the filename + :return: + """ + # if the exception column is empty, then drop it + if day_process['exception'].isnull().all(): + day_process.drop(columns=['exception'], inplace=True) + else: + # replace the NaN with an empty string + day_process['exception'].fillna('', inplace=True) + + # drop the pcm, fs, subtype, etc. columns + day_process.drop(columns=['fs', 'subtype', 'jitter_secs'], inplace=True) + + # if there is a diff column, then drop it + if 'diff' in day_process.columns: + day_process.drop(columns=['diff'], inplace=True) + + # Save with second accuracy to a temporary file formatted with ISO date format + df_final = day_process.sort_values(by=['start']) + + with tempfile.TemporaryDirectory() as tmpdir: + + tmp_path = Path(tmpdir) + if prefix: + temp_metadata = tmp_path / f'{prefix}_{day:%Y%m%d}.json' + else: + temp_metadata = tmp_path / f'{day:%Y%m%d}.json' + + df_final.to_json(temp_metadata.as_posix(), orient='records', date_format='iso', date_unit='s') + self.log.debug(f'Wrote {temp_metadata.as_posix()}') + + # read the file back in using records format with json + with open(temp_metadata.as_posix(), 'r') as f: + dict_records = json.load(f) + + # write the file back out with indenting + with open(temp_metadata.as_posix(), 'w', encoding='utf-8') as f: + json.dump(dict_records, f, ensure_ascii=True, indent=4) + + # copy the file to a local metadata directory with year subdirectory + output_path = Path(self.json_base_dir, str(day.year)) + output_path.mkdir(parents=True, exist_ok=True) + shutil.copy2(temp_metadata.as_posix(), output_path) + self.log.info(f'Wrote {output_path}/{temp_metadata.name}') diff --git a/src/json_generator/gen_abstract.py b/src/json_generator/gen_abstract.py new file mode 100644 index 0000000..abdca2b --- /dev/null +++ b/src/json_generator/gen_abstract.py @@ -0,0 +1,64 @@ +# pypam-based-processing +# Filename: metadata/generator/gen_abstract.py +# Description: Abstract class that captures sound wav metadata +from datetime import datetime +import pandas as pd +from src.logging_helper import PbpLogger + + +class MetadataGeneratorAbstract(object): + def __init__(self, + logger: PbpLogger, + audio_loc: str, + json_base_dir: str, + prefix: 
[str], + start: datetime, + end: datetime, + seconds_per_file: float = 0., + **kwargs): + """ + Abstract class for capturing sound wav metadata + :param logger: + The logger + :param audio_loc: + The local directory or cloud bucket that contains the wav files + :param json_base_dir: + The local directory to write the json files to + :param prefix: + The search pattern to match the wav files, e.g. 'MARS' + :param start: + The start date to search for wav files + :param end: + The end date to search for wav files + :param seconds_per_file: + The number of seconds per file expected in a wav file to check for missing data. If missing, then no check is done. + :return: + """ + try: + self.audio_loc = audio_loc + self.json_base_dir = json_base_dir + self.df = pd.DataFrame() + self.start = start + self.end = end + self.prefix = prefix + self._seconds_per_file = None if seconds_per_file == 0 else seconds_per_file + self.logger = logger + except Exception as e: + raise e + + @property + def log(self): + return self.logger + + @property + def seconds_per_file(self): + return self._seconds_per_file + + @property + def correct_df(self): + return self.df + + # abstract run method + def run(self): + pass + diff --git a/src/json_generator/gen_iclisten.py b/src/json_generator/gen_iclisten.py new file mode 100644 index 0000000..27ddf76 --- /dev/null +++ b/src/json_generator/gen_iclisten.py @@ -0,0 +1,200 @@ +# pypam-based-processing, Apache License 2.0 +# Filename: metadata/generator/gen_iclisten.py +# Description: Captures ICListen wav metadata in a pandas dataframe from either a local directory or S3 bucket. + +import re +from datetime import timedelta +from datetime import datetime +import boto3 + +import pandas as pd +from pathlib import Path +from progressbar import progressbar +import src.json_generator.utils as utils +from src.json_generator.corrector import MetadataCorrector +from src.json_generator.metadata_extractor import IcListenWavFile +from src import PbpLogger +from src.json_generator.gen_abstract import MetadataGeneratorAbstract + + +class IcListenMetadataGenerator(MetadataGeneratorAbstract): + log_prefix = None + + def __init__( + self, + pbp_logger: PbpLogger, + uri: str, + json_base_dir: str, + start: datetime, + end: datetime, + prefix: [str], + seconds_per_file: float = 300.): + """ + Captures ICListen wav metadata in a pandas dataframe from either a local directory or S3 bucket. + :param pbp_logger: + The logger + :param uri: + The local directory or S3 bucket that contains the wav files + :param json_base_dir: + The local directory to store the metadata + :param start: + The start date to search for wav files + :param end: + The end date to search for wav files + :param prefix: + The search pattern to match the wav files, e.g. 'MARS' for MARS_YYYYMMDD_HHMMSS.wav + :param seconds_per_file: + The number of seconds per file expected in a wav file to check for missing data. If 0, then no check is done. 
+ :return: + """ + super().__init__(pbp_logger, uri, json_base_dir, prefix, start, end, seconds_per_file) + self.log_prefix = f'{self.__class__.__name__} {start:%Y%m%d}' + + def run(self): + self.log.info(f'Generating metadata for {self.start} to {self.end}...') + + bucket_name, prefix, scheme = utils.parse_s3_or_gcp_url(self.audio_loc) + + # gs is not supported for icListen + if scheme == 'gs': + self.log.error(f'{self.log_prefix} GS is not supported for icListen audio files') + return + + # Run for each day in the range + for day in pd.date_range(self.start, self.end, freq='D'): + try: + self.df = None + self.log.info( + f'{self.log_prefix} Searching in {self.audio_loc}/*.wav for wav files that match the search pattern {self.prefix}* ...') + + wav_files = [] + + def check_file(f: str, + f_start_dt: datetime, + f_end_dt: datetime): + """ + Check if the file matches the search pattern and is within the start and end dates + :param f: + The path to the file + :param f_start_dt: + The start date to check + :param f_end_dt: + The end date to check + :return: + """ + + f_path = Path(f) + f_wav_dt = None + + for s in self.prefix: + # see if the file is a regexp match to search + rc = re.search(s, f_path.stem) + + if rc and rc.group(0): + try: + # MARS file date is in the filename MARS_YYYYMMDD_HHMMSS.wav + f_path_dt = datetime.strptime(f_path.stem, f'{s}_%Y%m%d_%H%M%S') + + if f_start_dt <= f_path_dt <= f_end_dt: + self.log.info(f'{self.log_prefix} Found {f_path.name} to process') + wav_files.append(IcListenWavFile(f, f_path_dt)) + f_wav_dt = f_path_dt + except ValueError: + self.log.error(f'{self.log_prefix} Could not parse {f_path.name}') + return None + + return f_wav_dt + + # Set the start and end dates to 30 minutes before and after the start and end dates + start_dt = day - timedelta(hours=1) + end_dt = day + timedelta(days=1) + + # set the window to 3x the expected duration of the wav file to account for any missing data + minutes_window = int(self.seconds_per_file * 3 / 60) + start_dt_hour = start_dt - timedelta(minutes=minutes_window) + end_dt_hour = end_dt + timedelta(minutes=minutes_window) + + if scheme == 'file': + wav_path = Path(self.audio_loc) + for filename in progressbar(sorted(wav_path.rglob('*.wav')), prefix='Searching : '): + check_file(filename.as_posix(), start_dt, end_dt) + if scheme == 's3': + client = boto3.client('s3') + for day_hour in pd.date_range(start=start_dt, end=end_dt, freq='h'): + + bucket = f'{bucket_name}-{day_hour.year:04d}' + prefix = f'{day_hour.month:02d}/MARS_{day_hour.year:04d}{day_hour.month:02d}{day_hour.day:02d}_{day_hour.hour:02d}' + paginator = client.get_paginator('list_objects') + + operation_parameters = {'Bucket': bucket, 'Prefix': prefix} + page_iterator = paginator.paginate(**operation_parameters) + self.log.info(f'{self.log_prefix} Searching in bucket: {bucket} prefix: {prefix}') + # list the objects in the bucket + # loop through the objects and check if they match the search pattern + num_found = 0 + for page in page_iterator: + if 'Contents' not in page: + self.log.info(f'{self.log_prefix} No data found in {bucket}') + break + + for obj in page['Contents']: + key = obj['Key'] + wav_dt = check_file(f's3://{bucket}/{key}', start_dt, end_dt) + if wav_dt is None: + continue + if wav_dt > end_dt_hour: + break + if wav_dt < start_dt_hour: + break + + self.log.info(f'{self.log_prefix} Found {len(wav_files)} files to process that cover the period {start_dt} - {end_dt}') + + # sort the files by start time + wav_files.sort(key=lambda x: x.start) 
+ + # create a dataframe from the wav files + self.log.info( + f'{self.log_prefix} Creating dataframe from {len(wav_files)} files spanning {wav_files[0].start} to {wav_files[-1].start}...') + for wc in wav_files: + df_wav = wc.to_df() + + # concatenate the metadata to the dataframe + self.df = pd.concat([self.df, df_wav], axis=0) + + self.log.debug(f'{self.log_prefix} Running metadata corrector for {day}') + corrector = MetadataCorrector(self.log, self.df, self.json_base_dir, day, False, 600.) + corrector.run() + + except Exception as ex: + self.log.exception(str(ex)) + + +if __name__ == '__main__': + import logging + from src.logging_helper import PbpLogger, create_logger + + log_dir = Path('tests/log') + json_dir = Path('tests/json/mars') + log_dir.mkdir(exist_ok=True, parents=True) + json_dir.mkdir(exist_ok=True, parents=True) + + logger = create_logger( + log_filename_and_level=( + f"{log_dir}/test_iclisten_metadata_generator.log", + logging.INFO, + ), + console_level=logging.INFO, + ) + + start = datetime(2023, 7, 18, 0, 0, 0) + end = datetime(2023, 7, 18, 0, 0, 0) + + # If only running one day, use a single generator + generator = IcListenMetadataGenerator(pbp_logger=logger, + uri='s3://pacific-sound-256khz', + json_base_dir=json_dir.as_posix(), + prefix=['MARS'], + start=start, + end=end, + seconds_per_file=300) + generator.run() \ No newline at end of file diff --git a/src/json_generator/gen_nrs.py b/src/json_generator/gen_nrs.py new file mode 100644 index 0000000..2a764ac --- /dev/null +++ b/src/json_generator/gen_nrs.py @@ -0,0 +1,191 @@ +# pypam-based-processing, Apache License 2.0 +# Filename: metadata/generator/gen_nrs.py +# Description: Captures NRS flac metadata in a pandas dataframe from either a local directory or gs bucket. + +import re +from datetime import timedelta, datetime +import time +from datetime import datetime +from google.cloud import storage + +import pandas as pd +from pathlib import Path +from progressbar import progressbar +from src.json_generator.corrector import MetadataCorrector +from src.json_generator.metadata_extractor import FlacFile +from src import PbpLogger +from src.json_generator.gen_abstract import MetadataGeneratorAbstract +from src.json_generator.utils import parse_s3_or_gcp_url + + +class NRSMetadataGenerator(MetadataGeneratorAbstract): + + def __init__( + self, + pbp_logger: PbpLogger, + uri: str, + json_base_dir: str, + start: datetime, + end: datetime, + prefix: [str], + seconds_per_file: float = 14400.0): + """ + Captures NRS audio metadata in a pandas dataframe from either a local directory or GS bucket. + :param pbp_logger: + The logger + :param uri: + The local directory or GCP bucket that contains the audio files + :param json_base_dir: + The local directory to store the metadata + :param start: + The start date to search for flac files + :param end: + The end date to search for flac files + :param prefix: + The search pattern to match the flac files, e.g. 'MARS' for MARS_YYYYMMDD_HHMMSS.flac + :param seconds_per_file: + The number of seconds per file expected in a flac file to check for missing data. If 0, then no check is done. 
+ :return: + """ + super().__init__(pbp_logger, uri, json_base_dir, prefix, start, end, seconds_per_file) + + def run(self): + self.log.info(f'Generating metadata for {self.start} to {self.end}...') + + bucket, prefix, scheme = parse_s3_or_gcp_url(self.audio_loc) + + # S3 is not supported for NRS + if scheme == 's3': + self.log.error(f'S3 is not supported for NRS audio files') + return + + def parse_filename(f: str) -> datetime: + """ + Check if the file matches the search pattern and is within the start and end dates + :param f: + The path to the file + :return: The beginning recording time of the file + """ + f_path = Path(f) + f_flac_dt = None + + for s in self.prefix: + # see if the file is a regexp match to search + rc = re.search(s, f_path.stem) + + if rc and rc.group(0): + try: + # files are in the format NRS11_20191231_230836.flac' + # extract the timestamp from the file name into the format YYYYMMDDHHMMSS + f_parts = f_path.stem.split('_') + # If the last two digits of the timestamp are 60, subtract 1 second + if f_parts[2][-2:] == '60': + f_parts = f_parts[1] + f_parts[2] + # Make the last two digits 59 + f_parts = f_parts[:-2] + '59' + else: + f_parts = f_parts[1] + f_parts[2] + + f_path_dt = datetime.strptime(f_parts, '%Y%m%d%H%M%S') + return f_path_dt + except ValueError: + self.log.error(f'Could not parse {f_path.name}') + return None + + return f_flac_dt + + flac_files = [] + self.df = None + self.log.info( + f'Searching in {self.audio_loc}/ for files that match the search pattern {self.prefix}* ...') + + # set the window to 1 flac file to account for any missing data + minutes_window = int(self.seconds_per_file / 60) + + # set the start and end dates to 1 hour before and after the start and end dates + start_dt = self.start - timedelta(minutes=minutes_window) - timedelta(minutes=minutes_window) + end_dt = self.end + timedelta(days=1) + + if scheme == 'file' or scheme == '': + flac_path = Path(f'/{bucket}/{prefix}') + for filename in progressbar(sorted(flac_path.rglob('*.flac')), prefix='Searching : '): + flac_dt = parse_filename(filename) + if start_dt <= flac_dt <= end_dt: + self.log.info(f'Found file {filename} with timestamp {flac_dt}') + flac_files.append(FlacFile(filename, flac_dt)) + if scheme == 'gs': + client = storage.Client.create_anonymous_client() + bucket_obj = client.get_bucket(bucket) + + # get list of files - this is a generator + # data is organized in a flat filesystem, so there are no optimizations here for querying + blobs = bucket_obj.list_blobs(prefix=prefix) + for i, blob in enumerate(blobs): + self.log.info(f'Processing {blob.name}') + f_path = f'gs://{bucket}/{blob.name}' + flac_dt = parse_filename(f_path) + if start_dt <= flac_dt <= end_dt: + self.log.info(f'Found file {blob.name} with timestamp {flac_dt}') + flac_files.append(FlacFile(f_path, flac_dt)) + # delay to avoid 400 error + if i % 100 == 0: + self.log.info(f'{i} files processed') + time.sleep(1) + if flac_dt > end_dt: + break + + self.log.info(f'Found {len(flac_files)} files to process that cover the period {start_dt} - {end_dt}') + + if len(flac_files) == 0: + return + + # sort the files by start time + flac_files.sort(key=lambda x: x.start) + + # correct each day in the range + for day in pd.date_range(self.start, self.end, freq='D'): + try: + # create a dataframe from the flac files + self.log.info(f'Creating dataframe from {len(flac_files)} ' + f'files spanning {flac_files[0].start} to {flac_files[-1].start} in self.json_base_dir...') + for wc in flac_files: + df_flac = wc.to_df() + + 
# concatenate the metadata to the dataframe + self.df = pd.concat([self.df, df_flac], axis=0) + + self.log.debug(f' Running metadata corrector for {day}') + corrector = MetadataCorrector(self.log, self.df, self.json_base_dir, day, False, self.seconds_per_file) + corrector.run() + + except Exception as ex: + self.log.exception(str(ex)) + + +if __name__ == '__main__': + import logging + from src.logging_helper import PbpLogger, create_logger + + log_dir = Path('tests/log') + json_dir = Path('tests/json/nrs') + log_dir.mkdir(exist_ok=True, parents=True) + json_dir.mkdir(exist_ok=True, parents=True) + + logger = create_logger( + log_filename_and_level=( + f"{log_dir}/test_nrs_metadata_generator.log", + logging.INFO, + ), + console_level=logging.INFO, + ) + + start = datetime(2019, 10, 24, 0, 0, 0) + end = datetime(2019, 11, 1, 0, 0, 0) + + generator = NRSMetadataGenerator(pbp_logger=logger, + uri='gs://noaa-passive-bioacoustic/nrs/audio/11/nrs_11_2019-2021/audio', + json_base_dir=json_dir.as_posix(), + prefix=['NRS11'], + start=start, + end=end) + generator.run() \ No newline at end of file diff --git a/src/json_generator/gen_soundtrap.py b/src/json_generator/gen_soundtrap.py new file mode 100644 index 0000000..cbdff2a --- /dev/null +++ b/src/json_generator/gen_soundtrap.py @@ -0,0 +1,197 @@ +# pypam-based-processing +# Filename: json_generator/gen_soundtrap.py +# Description: Captures SoundTrap metadata either from a local directory of S3 bucket +import logging + +import boto3 +import datetime +import pandas as pd +import re +import pytz + +from datetime import timedelta, datetime +from pathlib import Path +from progressbar import progressbar + +from src import PbpLogger +from src.json_generator.gen_abstract import MetadataGeneratorAbstract +from src.json_generator.metadata_extractor import SoundTrapWavFile +from src.json_generator.corrector import MetadataCorrector +from src.json_generator.utils import parse_s3_or_gcp_url + + +class SoundTrapMetadataGenerator(MetadataGeneratorAbstract): + """ + Captures SoundTrap wav file metadata either from a local directory or S3 bucket. + """ + + # Set the start and end dates to the current time in UTC + start = datetime.now(pytz.utc) + end = datetime.now(pytz.utc) + + def __init__( + self, + pbp_logger: PbpLogger, + uri: str, + json_base_dir: str, + prefix: [str], + start: datetime, + end: datetime): + """ + :param pbp_logger: + The logger + :param uri: + The local directory or S3 bucket that contains the wav files + :param json_base_dir: + The local directory to write the json files to + :param prefix: + The search pattern to match the wav files, e.g. 'MARS' + :param start: + The start date to search for wav files + :param end: + The end date to search for wav files + :param seconds_per_file: + The number of seconds per file expected in a wav file to check for missing data. If missing, then no check is done. + :return: + """ + super().__init__(pbp_logger, uri, json_base_dir, prefix, start, end, 0.) 
+ + def run(self): + try: + xml_cache_path = Path(self.json_base_dir) / 'xml_cache' + xml_cache_path.mkdir(exist_ok=True, parents=True) + wav_files = [] + + self.log.info( + f'Searching in {self.audio_loc}/*.wav for wav files that match the prefix {self.prefix}* ...') + + bucket, prefix, scheme = parse_s3_or_gcp_url(self.audio_loc) + # This does not work for GCS + if scheme == 'gs': + self.log.error(f'GS not supported for SoundTrap') + return + + def get_file_date(xml_file: str) -> datetime | None: + """ + Check if the xml file is in the search pattern and is within the start and end dates + :param xml_file: + The xml file with the metadata + :return: + Record starting datetime if the file is within the start and end dates; otherwise, return None + """ + xml_file = Path(xml_file) + # see if the file is a regexp match to self.prefix + for s in self.prefix: + rc = re.search(s, xml_file.stem) + + if rc and rc.group(0): + try: + # If a SoundTrap file, then the date is in the filename XXXX.YYYYMMDDHHMMSS.xml + f_path_dt = datetime.strptime(xml_file.stem.split('.')[1], '%y%m%d%H%M%S') + if self.start <= f_path_dt <= self.end: + return f_path_dt + except ValueError: + self.log.error(f'Could not parse {xml_file.name}') + return None + + if scheme == 'file': + wav_path = Path(self.audio_loc) + for filename in progressbar(sorted(wav_path.rglob('*.xml')), prefix='Searching : '): + wav_path = filename.parent / f'{filename.stem}.wav' + start_dt = get_file_date(filename) + if start_dt: + wav_files.append(SoundTrapWavFile(wav_path.as_posix(), filename, start_dt)) + else: + # if the audio_loc is a s3 url, then we need to list the files in buckets that cover the start and end + # dates + self.log.info(f'Searching between {self.start} and {self.end}') + + client = boto3.client('s3') + paginator = client.get_paginator('list_objects') + + operation_parameters = {'Bucket': bucket} + page_iterator = paginator.paginate(**operation_parameters) + self.log.info( + f'Searching in bucket: {bucket} for .wav and .xml files between {self.start} and {self.end} ') + # list the objects in the bucket + # loop through the objects and check if they match the search pattern + for page in page_iterator: + for obj in page['Contents']: + key = obj['Key'] + + if '.xml' in key and get_file_date(key): + xml_path = xml_cache_path / key + wav_uri = f's3://{bucket}/{key}'.replace('log.xml', 'wav') + + # Check if the xml file is in the cache directory + if not xml_path.exists(): + # Download the xml file to a temporary directory + self.log.info(f'Downloading {key} ...') + client.download_file(bucket, key, xml_path) + + start_dt = get_file_date(wav_uri) + if start_dt: + wav_files.append(SoundTrapWavFile(wav_uri, xml_path, start_dt)) + + self.log.info(f'Found {len(wav_files)} files to process that cover the period {self.start} - {self.end}') + + if len(wav_files) == 0: + return + + # sort the files by start time + wav_files.sort(key=lambda x: x.start) + + # create a dataframe from the wav files + self.log.info( + f'Creating dataframe from {len(wav_files)} files spanning {wav_files[0].start} to {wav_files[-1].start}...') + for wc in wav_files: + df_wav = wc.to_df() + + # concatenate the metadata to the dataframe + self.df = pd.concat([self.df, df_wav], axis=0) + + # drop any rows with duplicate uris, keeping the first + self.df = self.df.drop_duplicates(subset=['uri'], keep='first') + + except Exception as ex: + self.log.exception(str(ex)) + finally: + days = (self.end - self.start).days + 1 + + if len(self.df) == 0: + self.log.info(f'No 
data found between {self.start} and {self.end}') + return + + # Correct the metadata for each day + for day in range(days): + day_start = self.start + timedelta(days=day) + self.log.debug(f'Running metadata corrector for {day_start}') + variable_duration = True + corrector = MetadataCorrector(self.log, self.df, self.json_base_dir, day_start, variable_duration, 0) + corrector.run() + + +if __name__ == '__main__': + from src.logging_helper import PbpLogger, create_logger + + log_dir = Path('tests/log') + json_dir = Path('tests/json/soundtrap') + log_dir.mkdir(exist_ok=True, parents=True) + json_dir.mkdir(exist_ok=True, parents=True) + + logger = create_logger( + log_filename_and_level=( + f"{log_dir}/test_soundtrap_metadata_generator.log", + logging.INFO, + ), + console_level=logging.INFO, + ) + + start = datetime(2023, 7, 18) + end = datetime(2023, 7, 19) + gen = SoundTrapMetadataGenerator(logger, + 's3://pacific-sound-ch01', + json_dir.as_posix(), + ["7000"], + start, end) + gen.run() diff --git a/src/json_generator/metadata_extractor.py b/src/json_generator/metadata_extractor.py new file mode 100755 index 0000000..ca91ee4 --- /dev/null +++ b/src/json_generator/metadata_extractor.py @@ -0,0 +1,233 @@ +# pypam-based-processing, Apache License 2.0 +# Filename: json_generator/wavfile.py +# Description: wav file metadata reader. Supports SoundTrap and icListen wav files + +from logging import exception, warning, debug +from pathlib import Path + +import numpy as np +from six.moves.urllib.request import urlopen +import io +import re +import soundfile as sf +import pandas as pd +from datetime import datetime, timedelta +import xml.etree.ElementTree as ET +from src.json_generator.utils import parse_s3_or_gcp_url + + +class AudioFile: + + def __init__( + self, + path_or_url: str, + start: datetime): + """ + Abstract class for reading wav file metadata + :param path_or_url: + The path or url to the wav file + :param start: + The start time of the wav file + """ + self.start = start + self.path_or_url = path_or_url + self.end = start + self.duration_secs = -1 + self.fs = -1 + self.frames = -1 + self.channels = -1 + self.subtype = '' + self.exception = np.NAN + + def has_exception(self): + return True if len(self.exception) > 0 else False + + def to_df(self): + # if the self.path_or_url is a url, then add to the data frame with the appropriate prefix + if 's3://' in self.path_or_url or 'gs://' in self.path_or_url: + df = pd.DataFrame({'uri': self.path_or_url, 'start': self.start, 'end': self.end, 'fs': self.fs, + 'duration_secs': self.duration_secs, 'channels': self.channels, + 'subtype': self.subtype, 'exception': self.exception}, + index=[self.start]) + else: + df = pd.DataFrame({'url': 'file://' + self.path_or_url, 'start': self.start, 'end': self.end, 'fs': self.fs, + 'duration_secs': self.duration_secs, 'channels': self.channels, + 'subtype': self.subtype, 'exception': self.exception}, + index=[self.start]) + return df + + def get_max_freq(self): + return self.fs / 2 + + +class SoundTrapWavFile(AudioFile): + + def __init__(self, path_or_url: str, xml_file: str, start: datetime): + """ + SoundTrapWavFile uses the metadata from the xml files, not the wav file itself + :param path_or_url: + The path or uri of the wav file + :param xml_file: + The uri of the xml file that contains the metadata + :param path_or_url: + + :param start: + """ + super().__init__(path_or_url, start) + tree = ET.parse(xml_file) + root = tree.getroot() + wav_start_dt = None + wav_stop_dt = None + sample_count = None + + # 
Iterate over the XML elements grabbing the needed metadata values + for element in root.iter('WavFileHandler'): + # Get the value of the id attribute + value = element.get('SamplingStartTimeUTC') + if value: + wav_start_dt = datetime.strptime(value, '%Y-%m-%dT%H:%M:%S') + + value = element.get('SamplingStopTimeUTC') + if value: + wav_stop_dt = datetime.strptime(value, '%Y-%m-%dT%H:%M:%S') + + value = element.get('SampleCount') + if value: + sample_count = int(value) + + # Error checking + if not wav_start_dt or not wav_stop_dt or not sample_count: + raise ValueError(f'Error reading {xml_file}. Missing metadata') + + self.path_or_url = path_or_url + self.start = wav_start_dt + self.end = wav_stop_dt + self.duration_secs = sample_count / 48000 + self.fs = 48000 + self.frames = sample_count + self.channels = 1 + self.subtype = 'SoundTrap' + self.exception = np.NAN # no exceptions for SoundTrap files + + +class IcListenWavFile(AudioFile): + """IcListenWavFile uses the metadata from the wav file itself, + but only grabs the needed metadata from the header in S3""" + + def __init__(self, path_or_url: str, start: datetime): + super().__init__(path_or_url, start) + self.path_or_url = path_or_url + self.start = start + self.duration_secs = -1 + self.fs = -1 + self.frames = -1 + self.channels = -1 + self.subtype = '' + self.exception = np.NAN + self.path_or_url = path_or_url + bytes_per_sec = 3 * 256e3 # 3 bytes per sample at 24-bit resolution and 256 kHz sampling rate + + try: + # if the in_file is a s3 url, then read the metadata from the s3 url + if re.match(r'^s3://', path_or_url): + p = Path(path_or_url) + bucket, key = p.parts[1], '/'.join(p.parts[2:]) + url = f'http://{bucket}.s3.amazonaws.com/{key}' + + # read the first 20,000 bytes of the file to get the metadata + info = sf.info(io.BytesIO(urlopen(url).read(20_000)), verbose=True) + # get the duration from the extra_info data field which stores the duration in total bytes + fields = info.extra_info.split() + idx = fields.index('data') + self.duration_secs = float(fields[idx + 2]) / bytes_per_sec + # get the size in bytes of the data+RIFF header + idx = fields.index('RIFF') + riff_size = int(fields[idx + 2]) + 8 + # get the content length from the http header + content_length = int(urlopen(url).info()['Content-Length']) + # if the content length is less than the size of the data+RIFF header, then the file is truncated but + # still may be usable + if content_length < riff_size: + self.exception = f'Truncated file {path_or_url}. Content length {content_length} < RIFF size {riff_size}' + # calculate the duration which is the size of the content length minus the size of the RIFF + # header which is 44 bytes. Round the duration to the nearest second since the recording is + # always in 1 second increments + self.duration_secs = round(content_length - 44) / bytes_per_sec + warning(self.exception) + else: + info = sf.info(path_or_url) + self.duration_secs = info.duration + + self.end = self.start + timedelta(microseconds=int(info.frames * 1e6 / info.samplerate)) + self.fs = info.samplerate + self.frames = info.frames + self.channels = info.channels + self.subtype = info.subtype if info.subtype else '' + except Exception as ex: + exception(f'Corrupt file {path_or_url}. 
{ex}') + + +class FlacFile(AudioFile): + """FlacFile uses the metadata from the flac file itself, + but only grabs the needed metadata from the header in gs or local file system.""" + + def __init__(self, path_or_url: str, start: datetime): + super().__init__(path_or_url, start) + self.path_or_url = path_or_url + self.start = start + self.end = start + self.duration_secs = -1 + self.fs = -1 + self.frames = -1 + self.channels = -1 + self.subtype = '' + self.exception = np.NAN + self.path_or_url = path_or_url + + try: + # if the in_file is a gs url, then read the metadata + bucket, prefix, scheme = parse_s3_or_gcp_url(path_or_url) + if scheme == 'gs': + url = f'http://storage.googleapis.com/{bucket}/{prefix}' + + info = sf.info(io.BytesIO(urlopen(url).read(20_000)), verbose=True) + + # get the duration from the extra_info data field which stores the duration in total bytes + fields = info.extra_info.split(':') + debug('\n'.join(fields)) + sample_rate = int(fields[3].split('\n')[0]) + channels = int(fields[2].split('\n')[0]) + length_microseconds = int(info.frames * 1e6 / info.samplerate) + # get the file name from the url + file_name = url.split('/')[-1] + + # files are in the format NRS11_20191231_230836.flac' + # extract the timestamp from the file name + f = Path(file_name).stem.split('_') + # If the last two digits of the timestamp are 60, subtract 1 seconds + if f[2][-2:] == '60': + f = f[1] + f[2] + # Make the last two digits 59 + f = f[:-2] + '59' + else: + f = f[1] + f[2] + # convert the timestamp to a datetime object + timestamp = datetime.strptime(f, '%Y%m%d%H%M%S') + self.start = timestamp + self.end = self.start + timedelta(microseconds=length_microseconds) + self.duration_secs = int(length_microseconds / 1e6) + self.channels = channels + self.subtype = 'flac' + self.fs = sample_rate + self.frames = info.frames if info.frames else 0 + if scheme == 'file' or scheme == '': + info = sf.info(path_or_url) + length_microseconds = int(info.frames * 1e6 / info.samplerate) + self.duration_secs = int(length_microseconds / 1e6) + self.end = self.start + timedelta(microseconds=length_microseconds) + self.fs = info.samplerate + self.frames = info.frames + self.channels = info.channels + self.subtype = info.subtype if info.subtype else '' + except Exception as ex: + exception(f'Corrupt file {path_or_url}. {ex}') diff --git a/src/json_generator/utils.py b/src/json_generator/utils.py new file mode 100644 index 0000000..8924c0e --- /dev/null +++ b/src/json_generator/utils.py @@ -0,0 +1,15 @@ +import re + +from urllib.parse import urlparse + + +def parse_s3_or_gcp_url(url) -> (str, str, str): + """ + Parse the S3, GS of local file url + :param url: + :return: + """ + parsed_url = urlparse(url) + bucket = parsed_url.netloc + prefix = parsed_url.path.lstrip('/') + return bucket, prefix, parsed_url.scheme diff --git a/src/logging_helper.py b/src/logging_helper.py index b4954c3..6b178d1 100644 --- a/src/logging_helper.py +++ b/src/logging_helper.py @@ -70,6 +70,8 @@ def warn(self, s: str): def error(self, s: str): self.logger.error(s) + def exception(self, s: str): + self.logger.exception(s) def create_logger( log_filename_and_level: Optional[Tuple[str, int]] = None, diff --git a/tests/test_json_generator.py b/tests/test_json_generator.py new file mode 100644 index 0000000..55f6dbe --- /dev/null +++ b/tests/test_json_generator.py @@ -0,0 +1,177 @@ +# pypam-based-processing +# Filename: tests/test_json_generator.py +# Description: Test fixtures for the json generator classes. 
+# Tests the ability to generate metadata for SoundTrap, ICListen, and NRS recording files.
+
+import json
+
+import boto3
+import botocore
+import pytest
+from botocore.exceptions import ClientError
+from datetime import datetime
+
+import logging
+
+from pathlib import Path
+
+from src.json_generator.gen_nrs import NRSMetadataGenerator
+from src.logging_helper import create_logger
+from src.json_generator.gen_soundtrap import SoundTrapMetadataGenerator
+from src.json_generator.gen_iclisten import IcListenMetadataGenerator
+
+
+def get_aws_account() -> str:
+    """
+    Get the account number associated with this user
+    :return:
+    """
+    try:
+        account_number = boto3.client('sts').get_caller_identity()['Account']
+        print(f'Found account {account_number}')
+        return account_number
+    except ClientError as e:
+        print(e)
+        msg = 'Could not get account number from AWS. AWS credentials are not configured or are invalid.'
+        print(msg)
+        return None
+    except botocore.exceptions.NoCredentialsError as e:
+        print(e)
+        return None
+
+
+# Check whether an AWS account is configured by fetching the caller identity with the default credentials
+AWS_AVAILABLE = False
+if get_aws_account():
+    AWS_AVAILABLE = True
+
+
+@pytest.mark.skipif(not AWS_AVAILABLE,
+                    reason="This test is excluded because it requires a valid AWS account")
+def test_soundtrap_json_generator():
+    """
+    Test for SoundTrapMetadataGenerator.
+    Tests the SoundTrapMetadataGenerator class's ability to generate metadata for SoundTrap recording files.
+    Two files should be generated in the json directory for the dates specified.
+    :return:
+    """
+    log_dir = Path('tests/log')
+    json_dir = Path('tests/json/soundtrap')
+    log_dir.mkdir(exist_ok=True, parents=True)
+    json_dir.mkdir(exist_ok=True, parents=True)
+
+    logger = create_logger(
+        log_filename_and_level=(
+            f"{log_dir}/test_soundtrap_metadata_generator.log",
+            logging.INFO,
+        ),
+        console_level=logging.INFO,
+    )
+
+    start = datetime(2023, 7, 18)
+    end = datetime(2023, 7, 19)
+    gen = SoundTrapMetadataGenerator(pbp_logger=logger,
+                                     uri='s3://pacific-sound-ch01',
+                                     json_base_dir=json_dir.as_posix(),
+                                     prefix=["7000"],
+                                     start=start,
+                                     end=end)
+    gen.run()
+
+    # There should be two files in the json directory named 20230718.json and 20230719.json
+    json_files = list(Path('tests/json/soundtrap').rglob('*.json'))
+    assert len(json_files) == 2
+    assert Path('tests/json/soundtrap/2023/20230718.json').exists()
+    assert Path('tests/json/soundtrap/2023/20230719.json').exists()
+
+
+@pytest.mark.skipif(not AWS_AVAILABLE,
+                    reason="This test is excluded because it requires a valid AWS account")
+def test_iclisten_json_generator():
+    """
+    Test for IcListenMetadataGenerator.
+    Tests the IcListenMetadataGenerator class's ability to generate metadata for ICListen recording files.
+    One file should be generated in the json directory for the date specified. Note this currently
+    only works for MBARI MARS ICListen data.
+    :return:
+    """
+
+    log_dir = Path('tests/log')
+    json_dir = Path('tests/json/mars')
+    log_dir.mkdir(exist_ok=True, parents=True)
+    json_dir.mkdir(exist_ok=True, parents=True)
+
+    logger = create_logger(
+        log_filename_and_level=(
+            f"{log_dir}/test_mars_metadata_generator.log",
+            logging.INFO,
+        ),
+        console_level=logging.INFO,
+    )
+
+    start = datetime(2023, 7, 18, 0, 0, 0)
+    end = datetime(2023, 7, 18, 0, 0, 0)
+
+    # If only running one day, use a single generator
+    generator = IcListenMetadataGenerator(pbp_logger=logger,
+                                          uri='s3://pacific-sound-256khz',
+                                          json_base_dir=json_dir.as_posix(),
+                                          prefix=['MARS'],
+                                          start=start,
+                                          end=end,
+                                          seconds_per_file=300)
+    generator.run()
+
+    # There should be one file in the json directory named 20230718.json, and it should have 145 json objects
+    json_files = list(Path('tests/json/mars/').rglob('*.json'))
+    assert len(json_files) == 1
+    assert Path('tests/json/mars/2023/20230718.json').exists()
+
+    # Read the file and check the number of json objects
+    with open('tests/json/mars/2023/20230718.json') as f:
+        json_objects = json.load(f)
+        assert len(json_objects) == 145
+
+
+def test_nrs_json_generator():
+    """
+    Test for NRSMetadataGenerator.
+    Tests the NRSMetadataGenerator class's ability to generate metadata for NRS recording files.
+    One file should be generated in the json directory for the date specified.
+    :return:
+    """
+    log_dir = Path('tests/log')
+    json_dir = Path('tests/json/nrs')
+    log_dir.mkdir(exist_ok=True, parents=True)
+    json_dir.mkdir(exist_ok=True, parents=True)
+
+    logger = create_logger(
+        log_filename_and_level=(
+            f"{log_dir}/test_nrs_metadata_generator.log",
+            logging.INFO,
+        ),
+        console_level=logging.INFO,
+    )
+
+    start = datetime(2019, 10, 24, 0, 0, 0)
+    end = datetime(2019, 10, 24, 0, 0, 0)
+
+    generator = NRSMetadataGenerator(pbp_logger=logger,
+                                     uri='gs://noaa-passive-bioacoustic/nrs/audio/11/nrs_11_2019-2021/audio',
+                                     json_base_dir=json_dir.as_posix(),
+                                     prefix=['NRS11'],
+                                     start=start,
+                                     end=end,
+                                     seconds_per_file=14400.0)
+    generator.run()
+
+    # There should be one file in the json directory named 20191024.json, and it should have 7 json objects
+    json_files = list(Path('tests/json/nrs/').rglob('*.json'))
+    assert len(json_files) == 1
+    assert Path('tests/json/nrs/2019/20191024.json').exists()
+
+    # Read the file and check the number of json objects
+    with open('tests/json/nrs/2019/20191024.json') as f:
+        json_objects = json.load(f)
+        assert len(json_objects) == 7
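
For reference, the `file`/`s3`/`gs` branching in each generator is driven by the new `src/json_generator/utils.py` helper `parse_s3_or_gcp_url`. Below is a minimal sketch of its expected behavior; the paths are illustrative only, and the results simply follow standard `urllib.parse.urlparse` semantics (bucket = netloc, prefix = path without the leading slash, plus the scheme):

```python
from src.json_generator.utils import parse_s3_or_gcp_url

# S3 URL: bucket is the netloc, prefix is the path without the leading slash
assert parse_s3_or_gcp_url('s3://pacific-sound-256khz/07/MARS_20230718_000000.wav') == \
    ('pacific-sound-256khz', '07/MARS_20230718_000000.wav', 's3')

# GS URL: same shape, with scheme 'gs'
assert parse_s3_or_gcp_url('gs://noaa-passive-bioacoustic/nrs/audio') == \
    ('noaa-passive-bioacoustic', 'nrs/audio', 'gs')

# Plain local path: empty bucket and empty scheme
# (the NRS generator and FlacFile treat an empty scheme like 'file')
assert parse_s3_or_gcp_url('/data/audio') == ('', 'data/audio', '')
```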