diff --git a/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py b/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py index 415a6fb2e..eeaafbaf5 100755 --- a/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py +++ b/jwql/instrument_monitors/common_monitors/bad_pixel_monitor.py @@ -781,16 +781,18 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun badpix_types_from_darks = ['HOT', 'RC', 'OTHER_BAD_PIXEL', 'TELEGRAPH'] illuminated_obstimes = [] if illuminated_raw_files: + logging.info("Found {} uncalibrated flat fields".format(len(illuminated_raw_files))) badpix_types.extend(badpix_types_from_flats) out_exts = defaultdict(lambda: ['jump', '0_ramp_fit']) in_files = [] for uncal_file, rate_file in zip(illuminated_raw_files, illuminated_slope_files): + logging.info("\tChecking illuminated raw file {} with rate file {}".format(uncal_file, rate_file)) self.get_metadata(uncal_file) if rate_file == 'None': short_name = os.path.basename(uncal_file).replace('_uncal.fits', '') local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file)) - logging.info('Calling pipeline for {}'.format(uncal_file)) - logging.info("Copying raw file to {}".format(self.data_dir)) + logging.info('\t\tCalling pipeline for {}'.format(uncal_file)) + logging.info("\t\tCopying uncal file to {}".format(self.data_dir)) copy_files([uncal_file], self.data_dir) if hasattr(self, 'nints') and self.nints > 1: out_exts[short_name] = ['jump', '1_ramp_fit'] @@ -801,30 +803,44 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun if needs_calibration: in_files.append(local_uncal_file) else: - logging.info("Calibrated files already exist for {}".format(short_name)) - outputs = run_parallel_pipeline(in_files, "uncal", out_exts, self.instrument, jump_pipe=True) + logging.info("\t\tCalibrated files already exist for {}".format(short_name)) + else: + logging.info("\tRate file found for {}".format(uncal_file)) + + outputs = {} + if len(in_files) > 0: + logging.info("Running pipeline for {} files".format(len(in_files))) + outputs = run_parallel_pipeline(in_files, "uncal", out_exts, self.instrument, jump_pipe=True) + index = 0 + logging.info("Checking files post-calibration") for uncal_file, rate_file in zip(illuminated_raw_files, illuminated_slope_files): + logging.info("\tChecking files {}, {}".format(os.path.basename(uncal_file), os.path.basename(rate_file))) local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file)) if local_uncal_file in outputs: + logging.info("\t\tAdding calibrated file.") illuminated_slope_files[index] = deepcopy(outputs[local_uncal_file][1]) else: + logging.info("\t\tCalibration was skipped for file") self.get_metadata(illuminated_raw_files[index]) - local_ramp_file = local_uncal_file.replace("uncal", "0_ramp_fit") - if hasattr(self, 'nints') and self.nints > 1: - local_ramp_file = local_ramp_file.replace("0_ramp_fit", "1_ramp_fit") - if os.path.isfile(local_ramp_file): - illuminated_slope_files[index] = local_ramp_file - else: - illuminated_slope_files[index] = None + if not os.path.isfile(illuminated_slope_files[index]): + logging.info("\t\tLooking for local rate file") + local_rate_file = local_uncal_file.replace("uncal", "rateint") + if os.path.isfile(local_rate_file): + illuminated_slope_files[index] = local_rate_file + else: + logging.info("\tCould not find local rate file {}".format(local_rate_file)) + illuminated_slope_files[index] = None index += 1 # Get observation time for all files illuminated_obstimes.append(instrument_properties.get_obstime(uncal_file)) + logging.info("Trimming unfound files.") index = 0 while index < len(illuminated_raw_files): if illuminated_slope_files[index] is None or illuminated_slope_files[index] == 'None': + logging.info("\tRemoving {}".format(illuminated_raw_files[index])) del illuminated_raw_files[index] del illuminated_slope_files[index] del illuminated_obstimes[index] @@ -842,6 +858,7 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun dark_fitopt_files = [] dark_obstimes = [] if dark_raw_files: + logging.info("Found {} uncalibrated darks".format(len(dark_raw_files))) index = 0 badpix_types.extend(badpix_types_from_darks) # In this case we need to run the pipeline on all input files, @@ -850,17 +867,19 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun in_files = [] out_exts = defaultdict(lambda: ['jump', 'fitopt', '0_ramp_fit']) for uncal_file, rate_file in zip(dark_raw_files, dark_slope_files): + logging.info("Checking dark file {} with rate file {}".format(uncal_file, rate_file)) self.get_metadata(uncal_file) - logging.info('Calling pipeline for {} {}'.format(uncal_file, rate_file)) - logging.info("Copying raw file to {}".format(self.data_dir)) - copy_files([uncal_file], self.data_dir) - local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file)) short_name = os.path.basename(uncal_file).replace('_uncal.fits', '') + local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file)) + if not os.path.isfile(local_uncal_file): + logging.info("\tCopying raw file to {}".format(self.data_dir)) + copy_files([uncal_file], self.data_dir) if hasattr(self, 'nints') and self.nints > 1: out_exts[short_name] = ['jump', 'fitopt', '1_ramp_fit'] local_processed_files = [local_uncal_file.replace("uncal", x) for x in out_exts[short_name]] calibrated_data = [os.path.isfile(x) for x in local_processed_files] if not all(calibrated_data): + logging.info('\tCalling pipeline for {} {}'.format(uncal_file, rate_file)) in_files.append(local_uncal_file) dark_jump_files.append(None) dark_fitopt_files.append(None) @@ -872,16 +891,25 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun dark_slope_files[index] = deepcopy(local_processed_files[2]) dark_obstimes.append(instrument_properties.get_obstime(uncal_file)) index += 1 - outputs = run_parallel_pipeline(in_files, "uncal", out_exts, self.instrument, jump_pipe=True) + + outputs = {} + if len(in_files) > 0: + logging.info("Running pipeline for {} files".format(len(in_files))) + outputs = run_parallel_pipeline(in_files, "uncal", out_exts, self.instrument, jump_pipe=True) + index = 0 + logging.info("Checking files post-calibration") for uncal_file, rate_file in zip(dark_raw_files, dark_slope_files): + logging.info("\tChecking files {}, {}".format(uncal_file, rate_file)) local_uncal_file = os.path.join(self.data_dir, os.path.basename(uncal_file)) short_name = os.path.basename(uncal_file).replace('_uncal.fits', '') if local_uncal_file in outputs: + logging.info("\t\tAdding calibrated files") dark_jump_files[index] = outputs[local_uncal_file][0] dark_fitopt_files[index] = outputs[local_uncal_file][1] dark_slope_files[index] = deepcopy(outputs[local_uncal_file][2]) else: + logging.info("\t\tCalibration skipped for file") self.get_metadata(local_uncal_file) local_ramp_file = local_uncal_file.replace("uncal", "0_ramp_fit") if hasattr(self, 'nints') and self.nints > 1: @@ -893,10 +921,12 @@ def process(self, illuminated_raw_files, illuminated_slope_files, flat_file_coun if not os.path.isfile(local_ramp_file): dark_slope_files[index] = None index += 1 - + index = 0 + logging.info("Trimming unfound files.") while index < len(dark_raw_files): if dark_jump_files[index] is None or dark_fitopt_files[index] is None or dark_slope_files[index] is None: + logging.info("\tRemoving {}".format(dark_raw_files[index])) del dark_raw_files[index] del dark_jump_files[index] del dark_fitopt_files[index] diff --git a/jwql/instrument_monitors/common_monitors/readnoise_monitor.py b/jwql/instrument_monitors/common_monitors/readnoise_monitor.py index de8855c4c..33e3d5a77 100755 --- a/jwql/instrument_monitors/common_monitors/readnoise_monitor.py +++ b/jwql/instrument_monitors/common_monitors/readnoise_monitor.py @@ -415,7 +415,11 @@ def process(self, file_list): files_to_calibrate.append(file) # Run the files through the necessary pipeline steps - outputs = run_parallel_pipeline(files_to_calibrate, "uncal", "refpix", self.instrument) + # NOTE: Because the readnoise monitor only needs to get as far as the "refpix" + # pipeline stage, it doesn't have the incredibly increased run times that are + # otherwise associated with files with a lot of groups, so the readnoise + # monitor can pretty much calibrate anything as far as it goes. + outputs = run_parallel_pipeline(files_to_calibrate, "uncal", "refpix", self.instrument, max_groups=20000) for filename in file_list: logging.info('\tWorking on file: {}'.format(filename)) @@ -623,9 +627,12 @@ def run(self): # Skip processing if the file doesnt have enough groups/ints to calculate the readnoise. # MIRI needs extra since they omit the first five and last group before calculating the readnoise. if total_cds_frames >= 10: - shutil.copy(uncal_filename, self.data_dir) - logging.info('\tCopied {} to {}'.format(uncal_filename, output_filename)) - set_permissions(output_filename) + if os.path.isfile(os.path.join(self.data_dir, os.path.basename(uncal_filename))): + logging.info("\tRaw file already exists locally.") + else: + shutil.copy(uncal_filename, self.data_dir) + logging.info('\tCopied {} to {}'.format(uncal_filename, output_filename)) + set_permissions(output_filename) new_files.append(output_filename) else: logging.info('\tNot enough groups/ints to calculate readnoise in {}'.format(uncal_filename)) diff --git a/jwql/shared_tasks/shared_tasks.py b/jwql/shared_tasks/shared_tasks.py index dd40c3e21..1ad8e7263 100644 --- a/jwql/shared_tasks/shared_tasks.py +++ b/jwql/shared_tasks/shared_tasks.py @@ -215,7 +215,7 @@ def collect_after_task(**kwargs): @celery_app.task(name='jwql.shared_tasks.shared_tasks.run_calwebb_detector1') -def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, step_args={}): +def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, step_args={}, max_groups=1000): """Run the steps of ``calwebb_detector1`` on the input file, saving the result of each step as a separate output file, then return the name-and-path of the file as reduced in the reduction directory. Once all requested extensions have been produced, the @@ -250,6 +250,9 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, msg = "*****CELERY: Starting {} calibration task for {}" logging.info(msg.format(instrument, input_file_name)) config = get_config() + + if isinstance(ext_or_exts, str): + ext_or_exts = [ext_or_exts] input_dir = os.path.join(config['transfer_dir'], "incoming") cal_dir = os.path.join(config['outputs'], "calibrated_data") @@ -268,7 +271,6 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, set_permissions(uncal_file) # Check for exposures with too many groups - max_groups = config.get("max_groups", 1000) with fits.open(uncal_file) as inf: total_groups = inf[0].header["NINTS"] * inf[0].header["NGROUPS"] if total_groups > max_groups: @@ -333,8 +335,11 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, # subsequent pipeline steps) done = True for ext in ext_or_exts: - if not os.path.isfile("{}_{}.fits".format(short_name, ext)): + logging.info("*****CELERY: Checking for {} output".format(ext)) + check_file = output_file.replace(step_name, ext) + if not os.path.isfile(check_file): done = False + logging.info("*****CELERY: {} not found. Continuing.".format(check_file)) if done: print("*****CELERY: Created all files in {}. Finished.".format(ext_or_exts)) break @@ -350,7 +355,7 @@ def run_calwebb_detector1(input_file_name, short_name, ext_or_exts, instrument, @celery_app.task(name='jwql.shared_tasks.shared_tasks.calwebb_detector1_save_jump') -def calwebb_detector1_save_jump(input_file_name, ramp_fit=True, save_fitopt=True): +def calwebb_detector1_save_jump(input_file_name, ramp_fit=True, save_fitopt=True, max_groups=1000): """Call ``calwebb_detector1`` on the provided file, running all steps up to the ``ramp_fit`` step, and save the result. Optionally run the ``ramp_fit`` step and save the resulting slope file as well. @@ -403,7 +408,6 @@ def calwebb_detector1_save_jump(input_file_name, ramp_fit=True, save_fitopt=True set_permissions(uncal_file) # Check for exposures with too many groups - max_groups = config.get("max_groups", 1000) with fits.open(uncal_file) as inf: total_groups = inf[0].header["NINTS"] * inf[0].header["NGROUPS"] if total_groups > max_groups: @@ -566,7 +570,7 @@ def prep_file(input_file, in_ext): return short_name, cal_lock, os.path.join(send_path, uncal_name) -def start_pipeline(input_file, short_name, ext_or_exts, instrument, jump_pipe=False): +def start_pipeline(input_file, short_name, ext_or_exts, instrument, jump_pipe=False, max_groups=1000): """Starts the standard or save_jump pipeline for the provided file. .. warning:: @@ -618,9 +622,9 @@ def start_pipeline(input_file, short_name, ext_or_exts, instrument, jump_pipe=Fa ramp_fit = True elif "fitopt" in ext: save_fitopt = True - result = calwebb_detector1_save_jump.delay(input_file, ramp_fit=ramp_fit, save_fitopt=save_fitopt) + result = calwebb_detector1_save_jump.delay(input_file, ramp_fit=ramp_fit, save_fitopt=save_fitopt, max_groups=max_groups) else: - result = run_calwebb_detector1.delay(input_file, short_name, ext_or_exts, instrument) + result = run_calwebb_detector1.delay(input_file, short_name, ext_or_exts, instrument, max_groups=max_groups) return result @@ -672,7 +676,7 @@ def retrieve_files(short_name, ext_or_exts, dest_dir): return output_file_or_files -def run_pipeline(input_file, in_ext, ext_or_exts, instrument, jump_pipe=False): +def run_pipeline(input_file, in_ext, ext_or_exts, instrument, jump_pipe=False, max_groups=1000): """Convenience function for using the ``run_calwebb_detector1`` function on a data file, including the following steps: @@ -712,7 +716,7 @@ def run_pipeline(input_file, in_ext, ext_or_exts, instrument, jump_pipe=False): retrieve_dir = os.path.dirname(input_file) short_name, cal_lock, uncal_file = prep_file(input_file, in_ext) uncal_name = os.path.basename(uncal_file) - result = start_pipeline(uncal_name, short_name, ext_or_exts, instrument, jump_pipe=jump_pipe) + result = start_pipeline(uncal_name, short_name, ext_or_exts, instrument, jump_pipe=jump_pipe, max_groups=max_groups) logging.info("\t\tStarting with ID {}".format(result.id)) processed_path = result.get() logging.info("\t\tPipeline Complete") @@ -728,7 +732,7 @@ def run_pipeline(input_file, in_ext, ext_or_exts, instrument, jump_pipe=False): return output -def run_parallel_pipeline(input_files, in_ext, ext_or_exts, instrument, jump_pipe=False): +def run_parallel_pipeline(input_files, in_ext, ext_or_exts, instrument, jump_pipe=False, max_groups=1000): """Convenience function for using the ``run_calwebb_detector1`` function on a list of data files, breaking them into parallel celery calls, collecting the results together, and returning the results as another list. In particular, this function will do the @@ -794,7 +798,7 @@ def run_parallel_pipeline(input_files, in_ext, ext_or_exts, instrument, jump_pip output_dirs[short_name] = retrieve_dir input_file_paths[short_name] = input_file locks[short_name] = cal_lock - results[short_name] = start_pipeline(uncal_name, short_name, ext_or_exts, instrument, jump_pipe=jump_pipe) + results[short_name] = start_pipeline(uncal_name, short_name, ext_or_exts, instrument, jump_pipe=jump_pipe, max_groups=max_groups) logging.info("\tStarting {} with ID {}".format(short_name, results[short_name].id)) logging.info("Celery tasks submitted.") logging.info("Waiting for task results") diff --git a/jwql/website/apps/jwql/monitor_pages/monitor_bad_pixel_bokeh.py b/jwql/website/apps/jwql/monitor_pages/monitor_bad_pixel_bokeh.py index 0be5d85a0..d798e3b89 100755 --- a/jwql/website/apps/jwql/monitor_pages/monitor_bad_pixel_bokeh.py +++ b/jwql/website/apps/jwql/monitor_pages/monitor_bad_pixel_bokeh.py @@ -172,10 +172,12 @@ def most_recent_coords(self, bad_pixel_type): def pre_init(self): # Start with default values for instrument and aperture because # BokehTemplate's __init__ method does not allow input arguments + got_init_aperture = True try: dummy_instrument = self._instrument dummy_aperture = self._aperture except AttributeError: + got_init_aperture = False self._instrument = 'NIRCam' self._aperture = 'NRCA1_FULL' @@ -186,6 +188,10 @@ def pre_init(self): self.detector = '{}LONG'.format(self._aperture[0:4]) else: self.detector = self._aperture.split('_')[0] + + instrument = self._instrument + aperture = self._aperture + detector = self.detector # App design self.format_string = None diff --git a/jwql/website/apps/jwql/monitor_pages/monitor_cosmic_rays_bokeh.py b/jwql/website/apps/jwql/monitor_pages/monitor_cosmic_rays_bokeh.py index 4160e9b87..33d22640c 100644 --- a/jwql/website/apps/jwql/monitor_pages/monitor_cosmic_rays_bokeh.py +++ b/jwql/website/apps/jwql/monitor_pages/monitor_cosmic_rays_bokeh.py @@ -69,25 +69,46 @@ def get_histogram_data(self): """Get data required to create cosmic ray histogram from the database query. """ - - self.mags = [row.magnitude for row in self.cosmic_ray_table] - + + self.mags_hist = [row.magnitude for row in self.cosmic_ray_table] + self.mags_outliers = [row.outliers for row in self.cosmic_ray_table] + + self.mags = np.zeros((65536*2+1), dtype=np.int64) + for mag_histogram in self.mags_hist: + self.mags += mag_histogram + # If there are no data, then create something reasonable - if len(self.mags) == 0: - self.mags = [[0]] - - last_hist_index = -1 - # We'll never see CRs with magnitudes above 65535. - # Let's fix the bins for now, and see some data to check - # if they are reasonable - bins = np.arange(-65000, 66000, 5000) - hist = plt.hist(self.mags[last_hist_index], bins=bins) - - self.bin_left = np.array([bar.get_x() for bar in hist[2]]) - self.amplitude = [bar.get_height() for bar in hist[2]] - self.bottom = [bar.get_y() for bar in hist[2]] - deltas = self.bin_left[1:] - self.bin_left[0: -1] - self.bin_width = np.append(deltas[0], deltas) + if len(self.mags_hist) == 0: + self.mags[65527] = 1 + self.mags[65528] = 2 + self.mags[65529] = 3 + self.mags[65530] = 4 + self.mags[65531] = 5 + self.mags[65532] = 6 + self.mags[65533] = 7 + self.mags[65534] = 8 + self.mags[65535] = 9 + self.mags[65536] = 10 + self.mags[65537] = 9 + self.mags[65538] = 8 + self.mags[65539] = 7 + self.mags[65540] = 6 + self.mags[65541] = 5 + self.mags[65542] = 4 + self.mags[65543] = 3 + self.mags[65544] = 2 + self.mags[65545] = 1 + + # For the moment, we're using a single bin per cosmic ray magnitude, between + # -65536 and +65536. + # Previous comment: + # We'll never see CRs with magnitudes above 65535. + # Note by BQ on 2022-11-11: Yes, we will. They're not physical, but we'll sure + # see them. + self.bin_left = np.arange(65536*2+1, dtype=np.int32) - 65536 + self.amplitude = self.mags + self.bottomn = np.zeros((65536*2+1,), dtype=np.int32) + self.bin_width = np.ones((65536*2+1,), dtype=np.int32) def get_history_data(self): """Extract data on the history of cosmic ray numbers from the @@ -95,6 +116,7 @@ def get_history_data(self): """ self.times = [row.obs_end_time for row in self.cosmic_ray_table] self.rate = [row.jump_rate for row in self.cosmic_ray_table] + self.files = [row.source_file.replace("_uncal.fits", "") for row in self.cosmic_ray_table] def histogram_plot(self): """Create the histogram figure of CR magnitudes. @@ -104,7 +126,7 @@ def histogram_plot(self): title = f'Magnitudes: {self._instrument}, {self._aperture}' fig = figure(title=title, tools='zoom_in, zoom_out, box_zoom, pan, reset, save', background_fill_color="#fafafa") fig.quad(top=self.amplitude, bottom=0, left=self.bin_left, right=self.bin_left + self.bin_width, - fill_color="navy", line_color="white", alpha=0.5) + fill_color="navy", line_color="black", alpha=0.5) fig.y_range.start = 0 fig.xaxis.formatter.use_scientific = False @@ -128,8 +150,9 @@ def history_plot(self): if len(self.times) == 0: self.times = [datetime(2021, 12, 25), datetime(2021, 12, 26)] self.rate = [0, 0] + self.files = ["dummy", "dummy"] - source = ColumnDataSource(data={'x': self.times, 'y': self.rate}) + source = ColumnDataSource(data={'x': self.times, 'y': self.rate, 'filename': self.files}) # Create a useful plot title title = f'CR Rates: {self._instrument}, {self._aperture}' @@ -158,7 +181,8 @@ def history_plot(self): fig.yaxis[0].formatter = BasicTickFormatter(use_scientific=True, precision=2) hover_tool = HoverTool(tooltips=[('Value', '@y'), - ('Date', '@x{%d %b %Y %H:%M:%S}') + ('Date', '@x{%d %b %Y %H:%M:%S}'), + ('File', '@filename') ], mode='mouse', renderers=[data]) hover_tool.formatters = {'@x': 'datetime'} fig.tools.append(hover_tool) diff --git a/jwql/website/apps/jwql/monitor_pages/monitor_dark_bokeh.py b/jwql/website/apps/jwql/monitor_pages/monitor_dark_bokeh.py index 941eed901..09b8319fc 100755 --- a/jwql/website/apps/jwql/monitor_pages/monitor_dark_bokeh.py +++ b/jwql/website/apps/jwql/monitor_pages/monitor_dark_bokeh.py @@ -25,6 +25,7 @@ from astropy.time import Time from bokeh.models.tickers import LogTicker from datetime import datetime +from glob import glob import numpy as np from jwql.database.database_interface import session @@ -63,6 +64,11 @@ def _dark_mean_image(self): mean_dark_image_file = self.pixel_table[-1].mean_dark_image_file mean_slope_dir = os.path.join(OUTPUTS_DIR, 'dark_monitor', 'mean_slope_images') mean_dark_image_path = os.path.join(mean_slope_dir, mean_dark_image_file) + if not os.path.isfile(mean_dark_image_path): + possible_files = glob(os.path.join(mean_slope_dir, "_".join(mean_dark_image_file.split("_")[:3])+"*")) + if len(possible_files) > 0: + # Pick the bottom file because it's likely the most recent + mean_dark_image_path = possible_files[-1] with fits.open(mean_dark_image_path) as hdulist: data = hdulist[1].data else: @@ -130,7 +136,7 @@ def pre_init(self): self.full_dark_amplitude = [0., 1., 0.] else: self.dark_current = [row.mean for row in self.dark_table] - self.full_dark_bin_center = np.array([row.hist_dark_values for + self.full_dark_bin_center = np.array([np.array(row.hist_dark_values) for row in self.dark_table])[last_hist_index] self.full_dark_amplitude = [row.hist_amplitudes for row in self.dark_table][last_hist_index]