Monitor updates 2022 11 11 #1097

Open: york-stsci wants to merge 19 commits into develop from monitor_updates_2022-11-11

Commits (19)
bb8af30  Fixing an error with array types in dark monitor output, and making C…  (york-stsci, Nov 11, 2022)
0df0013  Merge branch 'spacetelescope:develop' into monitor_updates_2022-11-11  (york-stsci, Nov 11, 2022)
302d148  Merge branch 'spacetelescope:develop' into monitor_updates_2022-11-11  (york-stsci, Nov 11, 2022)
86a67e4  Adding more logging to bad pixel monitor  (york-stsci, Nov 14, 2022)
a78bb9d  Hopefully speeding up and including more data  (york-stsci, Nov 14, 2022)
c5c357f  Merge branch 'monitor_updates_2022-11-11' of https://github.com/york-…  (york-stsci, Nov 14, 2022)
2a6be15  changed line colour to hopefully stand out a bit better  (york-stsci, Nov 14, 2022)
47ca5b7  Actually look for if all exts are done before doing a step, in case i…  (york-stsci, Nov 15, 2022)
8d36b76  Added file name to hover tooltips  (york-stsci, Nov 15, 2022)
ef3c1d6  More monitoring around quitting when all the outputs are there  (york-stsci, Nov 16, 2022)
118908f  Trying again on finding output files  (york-stsci, Nov 16, 2022)
605fedf  More logging  (york-stsci, Nov 17, 2022)
8f41d9e  Don't assume files are not None  (york-stsci, Nov 17, 2022)
cccdda0  Merge branch 'develop' into monitor_updates_2022-11-11  (mfixstsci, Nov 17, 2022)
c7c19ea  Value debugging  (york-stsci, Nov 17, 2022)
01aa442  Merge branch 'monitor_updates_2022-11-11' of https://github.com/york-…  (york-stsci, Nov 17, 2022)
9638ecf  Fixing typo  (york-stsci, Nov 17, 2022)
3d31dc2  Added readnoise check for existing files, and custom maximum group limit  (york-stsci, Nov 18, 2022)
133521b  typo  (york-stsci, Nov 18, 2022)
64 changes: 47 additions & 17 deletions jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py
@@ -781,16 +781,18 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
badpix_types_from_darks = ['HOT', 'RC', 'OTHER_BAD_PIXEL', 'TELEGRAPH']
illuminated_obstimes = []
if illuminated_raw_files:
logging.info("Found {} uncalibrated flat fields".format(len(illuminated_raw_files)))
badpix_types.extend(badpix_types_from_flats)
out_exts = defaultdict(lambda: ['jump', '0_ramp_fit'])
in_files = []
for uncal_file, rate_file in zip(illuminated_raw_files, illuminated_slope_files):
logging.info("\tChecking illuminated raw file {} with rate file {}".format(uncal_file, rate_file))
self.get_metadata(uncal_file)
if rate_file == 'None':
short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file))
logging.info('Calling pipeline for {}'.format(uncal_file))
logging.info("Copying raw file to {}".format(self.data_dir))
logging.info('\t\tCalling pipeline for {}'.format(uncal_file))
logging.info("\t\tCopying uncal file to {}".format(self.data_dir))
copy_files([uncal_file], self.data_dir)
if hasattr(self, 'nints') and self.nints > 1:
out_exts[short_name] = ['jump', '1_ramp_fit']
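For reference, a minimal sketch of the defaultdict pattern used here: every exposure defaults to the single-integration output list, and only multi-integration exposures (nints > 1) get an explicit override. The exposure names below are hypothetical.

```python
from collections import defaultdict

# Every short_name defaults to the standard single-integration outputs;
# the lambda is evaluated per missing key, so each entry gets its own list.
out_exts = defaultdict(lambda: ['jump', '0_ramp_fit'])

# Hypothetical multi-integration exposure: override to use the
# per-integration ramp-fit product instead.
out_exts['jw00001001001_01101_00001_nrca1'] = ['jump', '1_ramp_fit']

print(out_exts['some_other_exposure'])              # ['jump', '0_ramp_fit']
print(out_exts['jw00001001001_01101_00001_nrca1'])  # ['jump', '1_ramp_fit']
```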
@@ -801,30 +803,44 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
if needs_calibration:
in_files.append(local_uncal_file)
else:
logging.info("Calibrated files already exist for {}".format(short_name))
outputs = run_parallel_pipeline(in_files, "uncal", out_exts, self.instrument, jump_pipe=True)
logging.info("\t\tCalibrated files already exist for {}".format(short_name))
else:
logging.info("\tRate file found for {}".format(uncal_file))

outputs = {}
if len(in_files) > 0:
logging.info("Running pipeline for {} files".format(len(in_files)))
outputs = run_parallel_pipeline(in_files, "uncal", out_exts, self.instrument, jump_pipe=True)

index = 0
logging.info("Checking files post-calibration")
for uncal_file, rate_file in zip(illuminated_raw_files, illuminated_slope_files):
logging.info("\tChecking files {}, {}".format(os.path.basename(uncal_file), os.path.basename(rate_file)))
local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file))
if local_uncal_file in outputs:
logging.info("\t\tAdding calibrated file.")
illuminated_slope_files[index] = deepcopy(outputs[local_uncal_file][1])
else:
logging.info("\t\tCalibration was skipped for file")
self.get_metadata(illuminated_raw_files[index])
local_ramp_file = local_uncal_file.replace("uncal", "0_ramp_fit")
if hasattr(self, 'nints') and self.nints > 1:
local_ramp_file = local_ramp_file.replace("0_ramp_fit", "1_ramp_fit")
if os.path.isfile(local_ramp_file):
illuminated_slope_files[index] = local_ramp_file
else:
illuminated_slope_files[index] = None
if not os.path.isfile(illuminated_slope_files[index]):
logging.info("\t\tLooking for local rate file")
local_rate_file = local_uncal_file.replace("uncal", "rateint")
if os.path.isfile(local_rate_file):
illuminated_slope_files[index] = local_rate_file
else:
logging.info("\tCould not find local rate file {}".format(local_rate_file))
illuminated_slope_files[index] = None
index += 1

# Get observation time for all files
illuminated_obstimes.append(instrument_properties.get_obstime(uncal_file))

logging.info("Trimming unfound files.")
index = 0
while index < len(illuminated_raw_files):
if illuminated_slope_files[index] is None or illuminated_slope_files[index] == 'None':
logging.info("\tRemoving {}".format(illuminated_raw_files[index]))
del illuminated_raw_files[index]
del illuminated_slope_files[index]
del illuminated_obstimes[index]
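The trimming loop above removes entries from three parallel lists in lock-step; a while loop is used rather than a for loop because each deletion shifts the remaining indices. A standalone sketch with dummy data:

```python
raw_files = ['a_uncal.fits', 'b_uncal.fits', 'c_uncal.fits']
slope_files = ['a_rate.fits', None, 'c_rate.fits']
obstimes = [59800.1, 59800.2, 59800.3]

index = 0
while index < len(raw_files):
    if slope_files[index] is None or slope_files[index] == 'None':
        # Remove this entry from all three parallel lists; do NOT
        # advance index, because the next entry has shifted into it.
        del raw_files[index]
        del slope_files[index]
        del obstimes[index]
    else:
        index += 1

print(raw_files)  # ['a_uncal.fits', 'c_uncal.fits']
```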
@@ -842,6 +858,7 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
dark_fitopt_files = []
dark_obstimes = []
if dark_raw_files:
logging.info("Found {} uncalibrated darks".format(len(dark_raw_files)))
index = 0
badpix_types.extend(badpix_types_from_darks)
# In this case we need to run the pipeline on all input files,
@@ -850,17 +867,19 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
in_files = []
out_exts = defaultdict(lambda: ['jump', 'fitopt', '0_ramp_fit'])
for uncal_file, rate_file in zip(dark_raw_files, dark_slope_files):
logging.info("Checking dark file {} with rate file {}".format(uncal_file, rate_file))
self.get_metadata(uncal_file)
logging.info('Calling pipeline for {} {}'.format(uncal_file, rate_file))
logging.info("Copying raw file to {}".format(self.data_dir))
copy_files([uncal_file], self.data_dir)
local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file))
short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file))
if not os.path.isfile(local_uncal_file):
logging.info("\tCopying raw file to {}".format(self.data_dir))
copy_files([uncal_file], self.data_dir)
if hasattr(self, 'nints') and self.nints > 1:
out_exts[short_name] = ['jump', 'fitopt', '1_ramp_fit']
local_processed_files = [local_uncal_file.replace("uncal", x) for x in out_exts[short_name]]
calibrated_data = [os.path.isfile(x) for x in local_processed_files]
if not all(calibrated_data):
logging.info('\tCalling pipeline for {} {}'.format(uncal_file, rate_file))
in_files.append(local_uncal_file)
dark_jump_files.append(None)
dark_fitopt_files.append(None)
@@ -872,16 +891,25 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
dark_slope_files[index] = deepcopy(local_processed_files[2])
dark_obstimes.append(instrument_properties.get_obstime(uncal_file))
index += 1
outputs = run_parallel_pipeline(in_files, "uncal", out_exts, self.instrument, jump_pipe=True)

outputs = {}
if len(in_files) > 0:
logging.info("Running pipeline for {} files".format(len(in_files)))
outputs = run_parallel_pipeline(in_files, "uncal", out_exts, self.instrument, jump_pipe=True)

index = 0
logging.info("Checking files post-calibration")
for uncal_file, rate_file in zip(dark_raw_files, dark_slope_files):
logging.info("\tChecking files {}, {}".format(uncal_file, rate_file))
local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file))
short_name = os.path.basename(uncal_file).replace('_uncal.fits', '')
if local_uncal_file in outputs:
logging.info("\t\tAdding calibrated files")
dark_jump_files[index] = outputs[local_uncal_file][0]
dark_fitopt_files[index] = outputs[local_uncal_file][1]
dark_slope_files[index] = deepcopy(outputs[local_uncal_file][2])
else:
logging.info("\t\tCalibration skipped for file")
self.get_metadata(local_uncal_file)
local_ramp_file = local_uncal_file.replace("uncal", "0_ramp_fit")
if hasattr(self, 'nints') and self.nints > 1:
Expand All @@ -893,10 +921,12 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun
if not os.path.isfile(local_ramp_file):
dark_slope_files[index] = None
index += 1

index = 0
logging.info("Trimming unfound files.")
while index < len(dark_raw_files):
if dark_jump_files[index] is None or dark_fitopt_files[index] is None or dark_slope_files[index] is None:
logging.info("\tRemoving {}".format(dark_raw_files[index]))
del dark_raw_files[index]
del dark_jump_files[index]
del dark_fitopt_files[index]
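Taken together, these bad-pixel-monitor changes make the pipeline calls idempotent: before queueing a file, the monitor derives the expected calibrated product names and only calibrates when something is missing. A simplified sketch of that guard; the paths and extensions are illustrative, and needs_calibration is a hypothetical helper, not a function from this PR:

```python
import os

def needs_calibration(local_uncal_file, exts):
    """Return True if any expected calibrated product is missing."""
    expected = [local_uncal_file.replace('uncal', ext) for ext in exts]
    return not all(os.path.isfile(f) for f in expected)

# Hypothetical usage: only files with missing products get queued.
in_files = []
for f in ['/data/jw0001_uncal.fits', '/data/jw0002_uncal.fits']:
    if needs_calibration(f, ['jump', '0_ramp_fit']):
        in_files.append(f)

# The (possibly empty) list is then handed to the pipeline in a single
# call, mirroring the "if len(in_files) > 0" guard added in the diff.
```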
15 changes: 11 additions & 4 deletions jwql/instrument_monitors/common_monitors/readnoise_monitor.py
@@ -415,7 +415,11 @@ def process(self, file_list)
files_to_calibrate.append(file)

# Run the files through the necessary pipeline steps
outputs = run_parallel_pipeline(files_to_calibrate, "uncal", "refpix", self.instrument)
# NOTE: Because the readnoise monitor only needs to run as far as the "refpix"
# pipeline stage, it avoids the greatly increased run times otherwise
# associated with exposures that have many groups, so it can afford to
# calibrate essentially any file it is given.
outputs = run_parallel_pipeline(files_to_calibrate, "uncal", "refpix", self.instrument, max_groups=20000)

for filename in file_list:
logging.info('\tWorking on file: {}'.format(filename))
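To see the effect of the new max_groups keyword, compare a default call with the readnoise monitor's override. This usage sketch assumes the signatures shown later in this diff and a running celery worker; the file name and instrument value are illustrative:

```python
from jwql.shared_tasks.shared_tasks import run_parallel_pipeline

files = ['jw00001001001_01101_00001_nrca1_uncal.fits']  # hypothetical

# Full-ramp monitors keep the default 1000-group cap ...
outputs = run_parallel_pipeline(files, "uncal", "rate", "nircam")

# ... while the readnoise monitor only needs the cheap "refpix" stage,
# so it effectively disables the cap.
outputs = run_parallel_pipeline(files, "uncal", "refpix", "nircam",
                                max_groups=20000)
```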
@@ -623,9 +627,12 @@ def run(self):
# Skip processing if the file doesn't have enough groups/ints to calculate the readnoise.
# MIRI needs extra since it omits the first five and the last group before calculating the readnoise.
if total_cds_frames >= 10:
shutil.copy(uncal_filename, self.data_dir)
logging.info('\tCopied {} to {}'.format(uncal_filename, output_filename))
set_permissions(output_filename)
if os.path.isfile(os.path.join(self.data_dir, os.path.basename(uncal_filename))):
logging.info("\tRaw file already exists locally.")
else:
shutil.copy(uncal_filename, self.data_dir)
logging.info('\tCopied {} to {}'.format(uncal_filename, output_filename))
set_permissions(output_filename)
new_files.append(output_filename)
else:
logging.info('\tNot enough groups/ints to calculate readnoise in {}'.format(uncal_filename))
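The run() change above avoids re-copying raw files that already sit in the working directory. A standalone sketch of that guard, wrapped in a hypothetical helper:

```python
import os
import shutil

def copy_if_missing(uncal_filename, data_dir):
    """Copy the raw file into data_dir unless a local copy already exists."""
    destination = os.path.join(data_dir, os.path.basename(uncal_filename))
    if os.path.isfile(destination):
        print("Raw file already exists locally.")
    else:
        shutil.copy(uncal_filename, data_dir)
        print("Copied {} to {}".format(uncal_filename, destination))
    return destination
```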
28 changes: 16 additions & 12 deletions jwql/shared_tasks/shared_tasks.py
@@ -215,7 +215,7 @@ def collect_after_task(**kwargs):


@celery_app.task(name='jwql.shared_tasks.shared_tasks.run_calwebb_detector1')
def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, step_args={}):
def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, step_args={}, max_groups=1000):
"""Run the steps of ``calwebb_detector1`` on the input file, saving the result of each
step as a separate output file, then return the name-and-path of the file as reduced
in the reduction directory. Once all requested extensions have been produced, the
@@ -250,6 +250,9 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,
msg = "*****CELERY: Starting {} calibration task for {}"
logging.info(msg.format(instrument, input_file_name))
config = get_config()

if isinstance(ext_or_exts, str):
ext_or_exts = [ext_or_exts]

input_dir = os.path.join(config['transfer_dir'], "incoming")
cal_dir = os.path.join(config['outputs'], "calibrated_data")
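The isinstance check added above lets callers pass either a single extension string or a list, so downstream code can always assume a list. A trivial sketch of the idiom:

```python
def normalize_exts(ext_or_exts):
    # Accept 'refpix' or ['jump', '0_ramp_fit'] interchangeably.
    if isinstance(ext_or_exts, str):
        ext_or_exts = [ext_or_exts]
    return ext_or_exts

assert normalize_exts('refpix') == ['refpix']
assert normalize_exts(['jump', '0_ramp_fit']) == ['jump', '0_ramp_fit']
```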
@@ -268,7 +271,6 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,
set_permissions(uncal_file)

# Check for exposures with too many groups
max_groups = config.get("max_groups", 1000)
with fits.open(uncal_file) as inf:
total_groups = inf[0].header["NINTS"] * inf[0].header["NGROUPS"]
if total_groups > max_groups:
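The cap itself is computed from the primary FITS header as NINTS * NGROUPS. A self-contained version of that check; exceeds_group_cap is a hypothetical wrapper around the logic shown in the diff, and it needs a real uncal FITS file with those keywords:

```python
from astropy.io import fits

def exceeds_group_cap(uncal_file, max_groups=1000):
    """Return True if the exposure has more total groups than the cap."""
    with fits.open(uncal_file) as inf:
        total_groups = inf[0].header["NINTS"] * inf[0].header["NGROUPS"]
    return total_groups > max_groups
```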
@@ -333,8 +335,11 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,
# subsequent pipeline steps)
done = True
for ext in ext_or_exts:
if not os.path.isfile("{}_{}.fits".format(short_name, ext)):
logging.info("*****CELERY: Checking for {} output".format(ext))
check_file = output_file.replace(step_name, ext)
if not os.path.isfile(check_file):
done = False
logging.info("*****CELERY: {} not found. Continuing.".format(check_file))
if done:
print("*****CELERY: Created all files in {}. Finished.".format(ext_or_exts))
break
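The completion check above now derives each expected product from the path of the file the current step just wrote, rather than from the bare short_name (which carried no directory). A sketch of the equivalent logic as a hypothetical helper:

```python
import os

def all_outputs_done(output_file, step_name, exts):
    """Check whether every requested product exists, deriving each expected
    name from the path of the file the current step just wrote."""
    for ext in exts:
        check_file = output_file.replace(step_name, ext)
        if not os.path.isfile(check_file):
            print("{} not found. Continuing.".format(check_file))
            return False
    return True

# Hypothetical: after the 'jump' step wrote this file, see whether the
# ramp-fit product exists alongside it.
done = all_outputs_done('/cal/jw0001_jump.fits', 'jump',
                        ['jump', '0_ramp_fit'])
```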
@@ -350,7 +355,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument,


@celery_app.task(name='jwql.shared_tasks.shared_tasks.calwebb_detector1_save_jump')
def calwebb_detector1_save_jump(input_file_name, ramp_fit=True, save_fitopt=True):
def calwebb_detector1_save_jump(input_file_name, ramp_fit=True, save_fitopt=True, max_groups=1000):
"""Call ``calwebb_detector1`` on the provided file, running all
steps up to the ``ramp_fit`` step, and save the result. Optionally
run the ``ramp_fit`` step and save the resulting slope file as well.
@@ -403,7 +408,6 @@ def calwebb_detector1_save_jump(input_file_name, ramp_fit=True, save_fitopt=True
set_permissions(uncal_file)

# Check for exposures with too many groups
max_groups = config.get("max_groups", 1000)
with fits.open(uncal_file) as inf:
total_groups = inf[0].header["NINTS"] * inf[0].header["NGROUPS"]
if total_groups > max_groups:
@@ -566,7 +570,7 @@ def prep_file(input_file, in_ext):
return short_name, cal_lock, os.path.join(send_path, uncal_name)


def start_pipeline(input_file, short_name, ext_or_exts, instrument, jump_pipe=False):
def start_pipeline(input_file, short_name, ext_or_exts, instrument, jump_pipe=False, max_groups=1000):
"""Starts the standard or save_jump pipeline for the provided file.

.. warning::
Expand Down Expand Up @@ -618,9 +622,9 @@ def start_pipeline(input_file, short_name, ext_or_exts, instrument, jump_pipe=Fa
ramp_fit = True
elif "fitopt" in ext:
save_fitopt = True
result = calwebb_detector1_save_jump.delay(input_file, ramp_fit=ramp_fit, save_fitopt=save_fitopt)
result = calwebb_detector1_save_jump.delay(input_file, ramp_fit=ramp_fit, save_fitopt=save_fitopt, max_groups=max_groups)
else:
result = run_calwebb_detector1.delay(input_file, short_name, ext_or_exts, instrument)
result = run_calwebb_detector1.delay(input_file, short_name, ext_or_exts, instrument, max_groups=max_groups)
return result


Expand Down Expand Up @@ -672,7 +676,7 @@ def retrieve_files(short_name, ext_or_exts, dest_dir):
return output_file_or_files


def run_pipeline(input_file, in_ext, ext_or_exts, instrument, jump_pipe=False):
def run_pipeline(input_file, in_ext, ext_or_exts, instrument, jump_pipe=False, max_groups=1000):
"""Convenience function for using the ``run_calwebb_detector1`` function on a data
file, including the following steps:

Expand Down Expand Up @@ -712,7 +716,7 @@ def run_pipeline(input_file, in_ext, ext_or_exts, instrument, jump_pipe=False):
retrieve_dir = os.path.dirname(input_file)
short_name, cal_lock, uncal_file = prep_file(input_file, in_ext)
uncal_name = os.path.basename(uncal_file)
result = start_pipeline(uncal_name, short_name, ext_or_exts, instrument, jump_pipe=jump_pipe)
result = start_pipeline(uncal_name, short_name, ext_or_exts, instrument, jump_pipe=jump_pipe, max_groups=max_groups)
logging.info("\t\tStarting with ID {}".format(result.id))
processed_path = result.get()
logging.info("\t\tPipeline Complete")
@@ -728,7 +732,7 @@ def run_pipeline(input_file, in_ext, ext_or_exts, instrument, jump_pipe=False):
return output


def run_parallel_pipeline(input_files, in_ext, ext_or_exts, instrument, jump_pipe=False):
def run_parallel_pipeline(input_files, in_ext, ext_or_exts, instrument, jump_pipe=False, max_groups=1000):
"""Convenience function for using the ``run_calwebb_detector1`` function on a list of
data files, breaking them into parallel celery calls, collecting the results together,
and returning the results as another list. In particular, this function will do the
Expand Down Expand Up @@ -794,7 +798,7 @@ def run_parallel_pipeline(input_files, in_ext, ext_or_exts, instrument, jump_pip
output_dirs[short_name] = retrieve_dir
input_file_paths[short_name] = input_file
locks[short_name] = cal_lock
results[short_name] = start_pipeline(uncal_name, short_name, ext_or_exts, instrument, jump_pipe=jump_pipe)
results[short_name] = start_pipeline(uncal_name, short_name, ext_or_exts, instrument, jump_pipe=jump_pipe, max_groups=max_groups)
logging.info("\tStarting {} with ID {}".format(short_name, results[short_name].id))
logging.info("Celery tasks submitted.")
logging.info("Waiting for task results")
Expand Down
@@ -172,10 +172,12 @@ def most_recent_coords(self, bad_pixel_type):
def pre_init(self):
# Start with default values for instrument and aperture because
# BokehTemplate's __init__ method does not allow input arguments
got_init_aperture = True
try:
dummy_instrument = self._instrument
dummy_aperture = self._aperture
except AttributeError:
got_init_aperture = False
self._instrument = 'NIRCam'
self._aperture = 'NRCA1_FULL'

@@ -186,6 +188,10 @@ def pre_init(self):
self.detector = '{}LONG'.format(self._aperture[0:4])
else:
self.detector = self._aperture.split('_')[0]

instrument = self._instrument
aperture = self._aperture
detector = self.detector

# App design
self.format_string = None
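The pre_init change records whether the instrument and aperture were supplied before BokehTemplate's __init__ ran, instead of silently falling back to defaults. A standalone sketch of that probe-and-default pattern; MonitorStub is hypothetical:

```python
class MonitorStub:
    def pre_init(self):
        # Probe for attributes that may not have been set yet; fall back
        # to defaults and remember whether real values were present.
        got_init_aperture = True
        try:
            dummy_instrument = self._instrument
            dummy_aperture = self._aperture
        except AttributeError:
            got_init_aperture = False
            self._instrument = 'NIRCam'
            self._aperture = 'NRCA1_FULL'
        return got_init_aperture

m = MonitorStub()
print(m.pre_init())  # False: defaults were applied
m._instrument, m._aperture = 'MIRI', 'MIRIM_FULL'
print(m.pre_init())  # True: values were already present
```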