From a65d25dfab2193a305c33f92da9dcbcd8b500d7f Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Fri, 23 Jun 2017 10:15:33 +0200
Subject: [PATCH 01/26] changes to allow single output files per input file

---
 erna/scripts/process_fact_data.py | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index 070dbb8..dcad5bb 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -10,19 +10,24 @@ import erna
 from erna import stream_runner
+from erna import stream_runner_local_output
+
 import erna.datacheck_conditions as dcc
 
 logger = logging.getLogger(__name__)
 
-def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, vmem, num_runs_per_bunch, walltime):
+def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, vmem, num_runs_per_bunch, walltime, local=False):
     jobs = []
     # create job objects
     df_mapping["bunch_index"]= np.arange(len(df_mapping)) // num_runs_per_bunch
     for num, df in df_mapping.groupby("bunch_index"):
         df=df.copy()
         df["bunch_index"] = num
-        job = Job(stream_runner.run, [jar, xml, df, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
+        if local:
+            job = Job(stream_runner_local_output.run, [jar, xml, df, output_directory, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
+        else:
+            job = Job(stream_runner.run, [jar, xml, df, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
         jobs.append(job)
 
     return jobs
@@ -47,8 +52,15 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
 @click.option('--conditions', help='Name of the data conditions as given in datacheck_conditions.py e.g. std', default='std')
 @click.option('--max_delta_t', default=30, help='Maximum time difference (minutes) allowed between drs and data files.', type=click.INT)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed locally.')
+@click.option('--local_output', default=False,is_flag=True,
+              help='Flag indicating whether jobs write their output locally '
+              + 'to disk without gathering everything in the mother '
+              + 'process. In this case the output file only contains a '
+              + 'summary of the processed jobs. The data output will be '
+              + 'in separate files',
+              show_default=True)
 @click.password_option(help='password to read from the always awesome RunDB')
-def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, password):
+def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, local_output, password):
 
     level=logging.INFO
     if log_level is 'DEBUG':
@@ -77,7 +89,7 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs))
     click.confirm('Do you want to continue processing and start jobs?', abort=True)
 
-    job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime)
+    job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime, local_output)
 
     job_outputs = gridmap.process_jobs(job_list, max_processes=len(job_list), local=local)
     erna.collect_output(job_outputs, out, df_runs)
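An aside for readers of the series, not part of the patch: the bunching arithmetic above works by integer division of a running index, so consecutive runs land in fixed-size bunches. A minimal, self-contained sketch with illustrative names:

    import numpy as np
    import pandas as pd

    df_mapping = pd.DataFrame({"run_id": range(7)})
    num_runs_per_bunch = 3
    # consecutive runs share a bunch index: 0 0 0 1 1 1 2
    df_mapping["bunch_index"] = np.arange(len(df_mapping)) // num_runs_per_bunch
    for num, df in df_mapping.groupby("bunch_index"):
        print(num, df.run_id.tolist())  # -> 0 [0, 1, 2] / 1 [3, 4, 5] / 2 [6]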
From 2e796bd9f61d4019bd5c9e544670f0fe648eeb03 Mon Sep 17 00:00:00 2001
From: tarrox
Date: Tue, 27 Jun 2017 02:52:48 +0200
Subject: [PATCH 02/26] some small fixes

---
 erna/scripts/process_fact_data.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index dcad5bb..fa3cd55 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -24,8 +24,9 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
     for num, df in df_mapping.groupby("bunch_index"):
         df=df.copy()
         df["bunch_index"] = num
+        file = output_directory+"/output"+str(num)+".bin"
         if local:
-            job = Job(stream_runner_local_output.run, [jar, xml, df, output_directory, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
+            job = Job(stream_runner_local_output.run, [jar, xml, df, file, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
         else:
             job = Job(stream_runner.run, [jar, xml, df, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
         jobs.append(job)
@@ -49,7 +50,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
 @click.option('--log_level', type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
 @click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
 @click.option('--source', help='Name of the source to analyze. e.g. Crab', default='Crab')
-@click.option('--conditions', help='Name of the data conditions as given in datacheck_conditions.py e.g. std', default='std')
+@click.option('--conditions', help='Name of the data conditions as given in datacheck_conditions.py e.g. std', default='standard')
 @click.option('--max_delta_t', default=30, help='Maximum time difference (minutes) allowed between drs and data files.', type=click.INT)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed locally.')
 @click.option('--local_output', default=False,is_flag=True,

From 4b2cd7a1b8abf305add66afdf3f5a38f2d77e7c5 Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Tue, 27 Jun 2017 03:19:25 +0200
Subject: [PATCH 03/26] more changes for local output working

---
 erna/scripts/process_fact_data.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index fa3cd55..78c9c7e 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -60,7 +60,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
               + 'summary of the processed jobs. The data output will be '
               + 'in separate files',
               show_default=True)
-@click.password_option(help='password to read from the always awesome RunDB')
+@click.password_option(help='password to read from the always awesome RunDB', confirmation_prompt=False)
 def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, local_output, password):
 
     level=logging.INFO
@@ -80,7 +80,7 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     erna.ensure_output(out)
     db_path = os.path.abspath(db)
     output_directory = os.path.dirname(outpath)
-    #create dir if it doesn't exist
+    #create dir if it doesn't exist  #TODO check: should already be done by erna.ensure_output
     os.makedirs(output_directory, exist_ok=True)
     logger.info("Writing output data to {}".format(out))
     factdb = sqlalchemy.create_engine("mysql+pymysql://factread:{}@129.194.168.95/factdata".format(password))
     data_conditions=dcc.conditions[conditions]
     df_runs = erna.load(earliest_night, latest_night, data_dir, source_name=source, timedelta_in_minutes=max_delta_t, factdb=factdb, data_conditions=data_conditions)
 
     logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs))
     click.confirm('Do you want to continue processing and start jobs?', abort=True)
 
     job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime, local_output)
 
     job_outputs = gridmap.process_jobs(job_list, max_processes=len(job_list), local=local)
-    erna.collect_output(job_outputs, out, df_runs)
+    if not local_output:
+        erna.collect_output(job_outputs, out, df_runs)
 
 if __name__ == "__main__":
     main()
row.filename + "fits.gz") + if not os.path.exists(res_path): + raise FileNotFoundError("The given datafile was not fount") + return res_path + def build_filename(night, run_id): return night.astype(str) + '_' + run_id.map('{:03d}'.format) @@ -165,8 +177,8 @@ def load( data["filename"] = build_filename(data.NIGHT, data.RUNID) drs_data["filename"] = build_filename(drs_data.NIGHT, drs_data.RUNID) - # write path TODO: file ending? is everything in fz? - data["path"] = data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.fits.fz') + # write path TODO: file ending? is everything in fz? #TODO in work mbulinski + data["path"] = data.apply(build_path_data, axis=1, path_to_data=path_to_data) drs_data["path"] = drs_data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.drs.fits.gz') # reindex the drs table using the index of the data table. From aed1c682daf0983c7630329a52da6cb2ad20b03a Mon Sep 17 00:00:00 2001 From: Michael Bulinski Date: Tue, 27 Jun 2017 03:41:54 +0200 Subject: [PATCH 05/26] more error debug infos --- erna/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erna/__init__.py b/erna/__init__.py index 2cdfee9..23b1471 100644 --- a/erna/__init__.py +++ b/erna/__init__.py @@ -30,7 +30,7 @@ def build_path_data(row, path_to_data): if not os.path.exists(res_path): res_path = os.path.join(path_to_data, year, month, day, row.filename + "fits.gz") if not os.path.exists(res_path): - raise FileNotFoundError("The given datafile was not fount") + raise FileNotFoundError("The given datafile was not found: "+res_path) return res_path def build_filename(night, run_id): From 86d1e2149745acc005c6040c819e359b9d2d70ae Mon Sep 17 00:00:00 2001 From: tarrox Date: Fri, 30 Jun 2017 08:37:17 +0200 Subject: [PATCH 06/26] added missing dots to filename --- erna/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/erna/__init__.py b/erna/__init__.py index 23b1471..b44f179 100644 --- a/erna/__init__.py +++ b/erna/__init__.py @@ -26,9 +26,9 @@ def build_path_data(row, path_to_data): year = night[0:4] month = night[4:6] day = night[6:8] - res_path = os.path.join(path_to_data, year, month, day, row.filename + "fits.fz") + res_path = os.path.join(path_to_data, year, month, day, row.filename + ".fits.fz") if not os.path.exists(res_path): - res_path = os.path.join(path_to_data, year, month, day, row.filename + "fits.gz") + res_path = os.path.join(path_to_data, year, month, day, row.filename + ".fits.gz") if not os.path.exists(res_path): raise FileNotFoundError("The given datafile was not found: "+res_path) return res_path From 07a2609efdf38d97021b1410cd15c02ee138e3f6 Mon Sep 17 00:00:00 2001 From: Michael Bulinski Date: Fri, 30 Jun 2017 11:05:51 +0200 Subject: [PATCH 07/26] added fix for non existing drs files --- erna/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/erna/__init__.py b/erna/__init__.py index 23b1471..9037c00 100644 --- a/erna/__init__.py +++ b/erna/__init__.py @@ -18,7 +18,10 @@ def build_path(row, path_to_data, extension): year = night[0:4] month = night[4:6] day = night[6:8] - return os.path.join(path_to_data, year, month, day, row.filename + extension) + res = os.path.join(path_to_data, year, month, day, row.filename + extension) + if not os.path.exists(res_path): + return np.nan + return res def build_path_data(row, path_to_data): From bd0c17a534e4ff6197df58f75ea01c17bb2c40a2 Mon Sep 17 00:00:00 2001 From: Michael Bulinski Date: Fri, 30 Jun 2017 11:13:29 +0200 Subject: [PATCH 
From aed1c682daf0983c7630329a52da6cb2ad20b03a Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Tue, 27 Jun 2017 03:41:54 +0200
Subject: [PATCH 05/26] more error debug infos

---
 erna/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index 2cdfee9..23b1471 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -30,7 +30,7 @@ def build_path_data(row, path_to_data):
     if not os.path.exists(res_path):
         res_path = os.path.join(path_to_data, year, month, day, row.filename + "fits.gz")
     if not os.path.exists(res_path):
-        raise FileNotFoundError("The given datafile was not fount")
+        raise FileNotFoundError("The given datafile was not found: "+res_path)
     return res_path

From 86d1e2149745acc005c6040c819e359b9d2d70ae Mon Sep 17 00:00:00 2001
From: tarrox
Date: Fri, 30 Jun 2017 08:37:17 +0200
Subject: [PATCH 06/26] added missing dots to filename

---
 erna/__init__.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index 23b1471..b44f179 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -26,9 +26,9 @@ def build_path_data(row, path_to_data):
     year = night[0:4]
     month = night[4:6]
     day = night[6:8]
-    res_path = os.path.join(path_to_data, year, month, day, row.filename + "fits.fz")
+    res_path = os.path.join(path_to_data, year, month, day, row.filename + ".fits.fz")
     if not os.path.exists(res_path):
-        res_path = os.path.join(path_to_data, year, month, day, row.filename + "fits.gz")
+        res_path = os.path.join(path_to_data, year, month, day, row.filename + ".fits.gz")
     if not os.path.exists(res_path):
         raise FileNotFoundError("The given datafile was not found: "+res_path)
     return res_path

From 07a2609efdf38d97021b1410cd15c02ee138e3f6 Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Fri, 30 Jun 2017 11:05:51 +0200
Subject: [PATCH 07/26] added fix for non existing drs files

---
 erna/__init__.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index 23b1471..9037c00 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -18,7 +18,10 @@ def build_path(row, path_to_data, extension):
     year = night[0:4]
     month = night[4:6]
     day = night[6:8]
-    return os.path.join(path_to_data, year, month, day, row.filename + extension)
+    res = os.path.join(path_to_data, year, month, day, row.filename + extension)
+    if not os.path.exists(res_path):
+        return np.nan
+    return res
 
 def build_path_data(row, path_to_data):

From bd0c17a534e4ff6197df58f75ea01c17bb2c40a2 Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Fri, 30 Jun 2017 11:13:29 +0200
Subject: [PATCH 08/26] non existing files are dropped

---
 erna/__init__.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index 4b859e5..b69b06a 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -33,7 +33,8 @@ def build_path_data(row, path_to_data):
     if not os.path.exists(res_path):
         res_path = os.path.join(path_to_data, year, month, day, row.filename + ".fits.gz")
     if not os.path.exists(res_path):
-        raise FileNotFoundError("The given datafile was not found: "+res_path)
+        return numpy.nan
+        #raise FileNotFoundError("The given datafile was not found: "+res_path)
     return res_path
 
 def build_filename(night, run_id):

From 661dde25f99603c95de0bbaf3e69df3bac79ad8e Mon Sep 17 00:00:00 2001
From: tarrox
Date: Tue, 4 Jul 2017 12:17:59 +0200
Subject: [PATCH 09/26] replaced fileerror with nan

---
 erna/__init__.py                  | 4 ++--
 erna/scripts/process_fact_data.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index b69b06a..3622c78 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -19,7 +19,7 @@ def build_path(row, path_to_data, extension):
     month = night[4:6]
     day = night[6:8]
     res = os.path.join(path_to_data, year, month, day, row.filename + extension)
-    if not os.path.exists(res_path):
+    if not os.path.exists(res):
         return np.nan
     return res
@@ -33,7 +33,7 @@ def build_path_data(row, path_to_data):
     if not os.path.exists(res_path):
         res_path = os.path.join(path_to_data, year, month, day, row.filename + ".fits.gz")
     if not os.path.exists(res_path):
-        return numpy.nan
+        return np.nan
         #raise FileNotFoundError("The given datafile was not found: "+res_path)
     return res_path

diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index 78c9c7e..5b0fba3 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -40,7 +40,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
 @click.argument('data_dir', type=click.Path(exists=True, dir_okay=True, file_okay=False, readable=True) )
 @click.argument('jar', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
 @click.argument('xml', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
-@click.argument('db', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
+@click.argument('db', type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True) )
 @click.argument('out', type=click.Path(exists=False, dir_okay=False, file_okay=True, readable=True) )
 @click.option('--queue', help='Name of the queue you want to send jobs to.', default='short')
 @click.option('--walltime', help='Estimated maximum walltime of your job in format hh:mm:ss.', default='02:00:00')
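Patches 07 through 09 converge on one pattern: a missing file makes the path builder return NaN instead of raising, so affected runs can be dropped in bulk later. A hedged sketch of that pattern, runnable on its own with invented paths:

    import os
    import numpy as np
    import pandas as pd

    def checked_path(path):
        # NaN marks a missing file so the row can be dropped afterwards
        return path if os.path.exists(path) else np.nan

    df = pd.DataFrame({"path": ["/fact/raw/a.fits.fz", "/fact/raw/b.fits.fz"]})
    df["path"] = df["path"].apply(checked_path)
    df = df.dropna(subset=["path"])  # keep only runs whose files are present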
result".format(len(df_returned_data))) + if len(df_returned_data)==0: + logger.info("No events in the result were returned, something must have gone bad, better go fix it.") + return + if df_started_runs is not None: df_merged = pd.merge(df_started_runs, df_returned_data, on=['NIGHT','RUNID'], how='inner') total_on_time_in_seconds = df_merged.on_time.sum() @@ -185,12 +189,18 @@ def load( data["path"] = data.apply(build_path_data, axis=1, path_to_data=path_to_data) drs_data["path"] = drs_data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.drs.fits.gz') + drs_data = drs_data.dropna(subset=["path"]) + #data.to_csv("data.csv") + #drs_data.to_csv("drs.csv") + # reindex the drs table using the index of the data table. # There are always more data runs than drs run in the db. # hence missing rows have to be filled either forward or backwards earlier_drs_entries = drs_data.reindex(data.index, method="ffill") + earlier_drs_entries = earlier_drs_entries.fillna(axis="index", method="ffill") later_drs_entries = drs_data.reindex(data.index, method="backfill") - + later_drs_entries = later_drs_entries.fillna(axis="index", method="ffill") + # when backfilling the drs obeservations the last rows might be invalid and contain nans. # We cannot drop them becasue the tables have to have the same length. # in that case simply fill them up. @@ -209,7 +219,8 @@ def load( closest_drs_entries.deltaT, data.fOnTime, data.fEffectiveOn, data.NIGHT, - data.RUNID + data.RUNID, + data.filename ], axis=1, keys=[ "filename", "drs_path", @@ -219,8 +230,9 @@ def load( "effective_on", "NIGHT", "RUNID", + "filenameData" ]) - + mapping = mapping.dropna(how='any') logger.info("Fetched {} data runs and approx {} drs entries from database where time delta is less than {} minutes".format(len(mapping), mapping['drs_path'].nunique(), timedelta_in_minutes)) diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py index 5b0fba3..3ab605c 100755 --- a/erna/scripts/process_fact_data.py +++ b/erna/scripts/process_fact_data.py @@ -24,7 +24,8 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v for num, df in df_mapping.groupby("bunch_index"): df=df.copy() df["bunch_index"] = num - file = output_directory+"/output"+str(num)+".bin" + file = output_directory+"/"+df.filenameData[0]+".fits" + #print(file, num) if local: job = Job(stream_runner_local_output.run, [jar, xml, df, file, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem)) else: @@ -87,10 +88,11 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt data_conditions=dcc.conditions[conditions] df_runs = erna.load(earliest_night, latest_night, data_dir, source_name=source, timedelta_in_minutes=max_delta_t, factdb=factdb, data_conditions=data_conditions) - logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs)) - click.confirm('Do you want to continue processing and start jobs?', abort=True) job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime, local_output) + logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs)) + click.confirm('Do you want to continue processing and start jobs?', abort=True) + job_outputs = gridmap.process_jobs(job_list, max_processes=len(job_list), local=local) if not local_output: erna.collect_output(job_outputs, out, df_runs) diff --git a/erna/scripts/process_fact_mc.py 
diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index 5b0fba3..3ab605c 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -24,7 +24,8 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
     for num, df in df_mapping.groupby("bunch_index"):
         df=df.copy()
         df["bunch_index"] = num
-        file = output_directory+"/output"+str(num)+".bin"
+        file = output_directory+"/"+df.filenameData[0]+".fits"
+        #print(file, num)
         if local:
             job = Job(stream_runner_local_output.run, [jar, xml, df, file, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
@@ -87,10 +88,11 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     data_conditions=dcc.conditions[conditions]
     df_runs = erna.load(earliest_night, latest_night, data_dir, source_name=source, timedelta_in_minutes=max_delta_t, factdb=factdb, data_conditions=data_conditions)
 
-    logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs))
-    click.confirm('Do you want to continue processing and start jobs?', abort=True)
 
     job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime, local_output)
+    logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs))
+    click.confirm('Do you want to continue processing and start jobs?', abort=True)
+
     job_outputs = gridmap.process_jobs(job_list, max_processes=len(job_list), local=local)
     if not local_output:
         erna.collect_output(job_outputs, out, df_runs)

diff --git a/erna/scripts/process_fact_mc.py b/erna/scripts/process_fact_mc.py
index df4d86d..a2767cc 100644
--- a/erna/scripts/process_fact_mc.py
+++ b/erna/scripts/process_fact_mc.py
@@ -15,7 +15,22 @@
 
 logger = logging.getLogger(__name__)
 
-
+import re
+# given a string
+def createFilenameFromFormat(format, basename, num):
+    m = re.search('\%[0-9]?n', '%b_%4n.json')
+    if not m:
+        raise "Missing number placement in format string"
+    numstr = ""
+    if len(m.group(0))==3:
+        numstr = ("%0"+m.group(0)[1]+"i") % num
+    else:
+        numstr = "%i" % num
+    stemp = format.replace(m.group(0), numstr)
+    stemp = format.replace("%b", basename)
+    return stemp
+
+
 def make_jobs(jar, xml, data_paths, drs_paths,
               engine, queue, vmem, num_jobs, walltime, output_path=None):
     jobs = []
     data_partitions = np.array_split(data_paths, num_jobs)
@@ -31,6 +46,7 @@ def make_jobs(jar, xml, data_paths, drs_paths,
         df = pd.DataFrame({'data_path': data, 'drs_path': drs})
         if output_path:
             file_name, _ = path.splitext(path.basename(output_path))
+            #file_name = createFilenameFromFormat("%b_%n.json", file_name, num)
             file_name += "_{}.json".format(num)
             out_path = path.dirname(output_path)
             run = [jar, xml, df, path.join(out_path, file_name)]
             stream_runner = stream_runner_local
@@ -68,14 +84,16 @@ def make_jobs(jar, xml, data_paths, drs_paths,
 @click.option("--log_level", type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
 @click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed locally.',show_default=True)
-@click.option('--local_output', default=False,is_flag=True,
+@click.option('--local_output', default=False, is_flag=True,
               help='Flag indicating whether jobs write their output locally '
               + 'to disk without gathering everything in the mother '
               + 'process. In this case the output file only contains a '
               + 'summary of the processed jobs. The data output will be '
-              + 'inseparate files',
+              + 'in separate files',
               show_default=True)
-def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_level, port, local, local_output):
+@click.option('--mcdrs', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True))
+@click.option('--mcwildcard', help="Gives the wildward for searching the folder for files.", type=click.STRING, default='**/*_Events.fit*')
+def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_level, port, local, local_output, mcdrs, mcwildcard):
     '''
     Script to execute fact-tools on MonteCarlo files. Use the MC_PATH argument to specify the folders containing the MC
     '''
@@ -99,13 +117,17 @@ def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_l
     jarpath = path.abspath(jar)
     xmlpath = path.abspath(xml)
     drspath = erna.mc_drs_file()
+    if mcdrs:
+        drspath = mcdrs
+
+    #drspath = "/home/mbulinski/mc2.drs.fits.gz"
     logger.info('Using drs file at {}'.format(drspath))
 
     #get data files
     files=[]
     for folder in tqdm(mc_path):
         # print("Entering folder {}".format(folder))
-        pattern = path.join(folder, '**/*_Events.fit*')
+        pattern = path.join(folder, mcwildcard)
         f = glob.glob(pattern, recursive=True)
         files = files + f
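A sketch of what the '%[0-9]?n' placeholder in createFilenameFromFormat is meant to do (an aside, not part of the patch; note the '%b' substitution and the bare raise in this version are fixed by later patches in the series):

    import re

    filename_format, num = "%b_%4n.json", 7
    m = re.search(r"%[0-9]?n", filename_format)
    # '%4n' asks for zero-padding to width 4 ('0007'); a plain '%n' gives '7'
    if len(m.group(0)) == 3:
        numstr = ("%0" + m.group(0)[1] + "i") % num
    else:
        numstr = "%i" % num
    print(filename_format.replace(m.group(0), numstr).replace("%b", "out"))  # out_0007.json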
From bedd99ec8b7d32819e050f6047d1e3ef84c3a9b6 Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Tue, 10 Oct 2017 14:41:42 +0200
Subject: [PATCH 11/26] added the ability to give a local mc output process a different name

---
 erna/scripts/process_fact_mc.py | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/erna/scripts/process_fact_mc.py b/erna/scripts/process_fact_mc.py
index a2767cc..a65f18d 100644
--- a/erna/scripts/process_fact_mc.py
+++ b/erna/scripts/process_fact_mc.py
@@ -16,9 +16,10 @@
 logger = logging.getLogger(__name__)
 
 import re
-# given a string
+
+# given a string create a filename
 def createFilenameFromFormat(format, basename, num):
-    m = re.search('\%[0-9]?n', '%b_%4n.json')
+    m = re.search('\%[0-9]?n', format)
     if not m:
         raise "Missing number placement in format string"
     numstr = ""
@@ -32,7 +33,7 @@ def createFilenameFromFormat(format, basename, num):
 
 def make_jobs(jar, xml, data_paths, drs_paths,
-              engine, queue, vmem, num_jobs, walltime, output_path=None):
+              engine, queue, vmem, num_jobs, walltime, output_path=None, format=None):
     jobs = []
 
     data_partitions = np.array_split(data_paths, num_jobs)
@@ -45,9 +46,12 @@ def make_jobs(jar, xml, data_paths, drs_paths,
     for num, (data, drs) in enumerate(zip(data_partitions, drs_partitions)):
         df = pd.DataFrame({'data_path': data, 'drs_path': drs})
         if output_path:
+            # create the filenames for each single local run
             file_name, _ = path.splitext(path.basename(output_path))
-            #file_name = createFilenameFromFormat("%b_%n.json", file_name, num)
-            file_name += "_{}.json".format(num)
+            if format:
+                file_name = createFilenameFromFormat(format, file_name, num)
+            else:
+                file_name = createFilenameFromFormat("%b_%n.json", file_name, num)
             out_path = path.dirname(output_path)
             run = [jar, xml, df, path.join(out_path, file_name)]
             stream_runner = stream_runner_local
@@ -93,7 +97,10 @@ def make_jobs(jar, xml, data_paths, drs_paths,
         show_default=True)
 @click.option('--mcdrs', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True))
 @click.option('--mcwildcard', help="Gives the wildward for searching the folder for files.", type=click.STRING, default='**/*_Events.fit*')
-def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_level, port, local, local_output, mcdrs, mcwildcard):
+@click.option('--local_output_format', default=None, help="Give the file format for the local output functionality. "
+              + "%b will replace the out filename and %[1-9]n the given local number. "
+              + "Default is: '%b_%n.json'. Only works with option --local_output.")
+def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_level, port, local, local_output, mcdrs, mcwildcard, local_output_format):

From deae60720e9abb087d355354b8868abb1c76d314 Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Tue, 10 Oct 2017 15:07:15 +0200
Subject: [PATCH 12/26] removed stuff from process data not needed any more

---
 erna/__init__.py                  |  4 +---
 erna/scripts/process_fact_data.py | 36 +++++++++----------------------
 erna/scripts/process_fact_mc.py   |  1 -
 3 files changed, 11 insertions(+), 30 deletions(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index decb192..80f13da 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -185,13 +185,11 @@ def load(
     data["filename"] = build_filename(data.NIGHT, data.RUNID)
     drs_data["filename"] = build_filename(drs_data.NIGHT, drs_data.RUNID)
 
-    # write path TODO: file ending? is everything in fz? #TODO in work mbulinski
+    # write path TODO automaticly figures the out weather it is a gz or fz file
     data["path"] = data.apply(build_path_data, axis=1, path_to_data=path_to_data)
     drs_data["path"] = drs_data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.drs.fits.gz')
 
     drs_data = drs_data.dropna(subset=["path"])
-    #data.to_csv("data.csv")
-    #drs_data.to_csv("drs.csv")
 
     # reindex the drs table using the index of the data table.
     # There are always more data runs than drs run in the db.

diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index 3ab605c..070dbb8 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -10,26 +10,19 @@ import erna
 from erna import stream_runner
-from erna import stream_runner_local_output
-
 import erna.datacheck_conditions as dcc
 
 logger = logging.getLogger(__name__)
 
-def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, vmem, num_runs_per_bunch, walltime, local=False):
+def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, vmem, num_runs_per_bunch, walltime):
     jobs = []
     # create job objects
     df_mapping["bunch_index"]= np.arange(len(df_mapping)) // num_runs_per_bunch
     for num, df in df_mapping.groupby("bunch_index"):
         df=df.copy()
         df["bunch_index"] = num
-        file = output_directory+"/"+df.filenameData[0]+".fits"
-        #print(file, num)
-        if local:
-            job = Job(stream_runner_local_output.run, [jar, xml, df, file, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
-        else:
-            job = Job(stream_runner.run, [jar, xml, df, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
+        job = Job(stream_runner.run, [jar, xml, df, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
         jobs.append(job)
 
     return jobs
@@ -34,7 +34,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
 @click.argument('data_dir', type=click.Path(exists=True, dir_okay=True, file_okay=False, readable=True) )
 @click.argument('jar', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
 @click.argument('xml', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
-@click.argument('db', type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True) )
+@click.argument('db', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
 @click.argument('out', type=click.Path(exists=False, dir_okay=False, file_okay=True, readable=True) )
 @click.option('--queue', help='Name of the queue you want to send jobs to.', default='short')
 @click.option('--walltime', help='Estimated maximum walltime of your job in format hh:mm:ss.', default='02:00:00')
 @click.option('--log_level', type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
 @click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
 @click.option('--source', help='Name of the source to analyze. e.g. Crab', default='Crab')
-@click.option('--conditions', help='Name of the data conditions as given in datacheck_conditions.py e.g. std', default='standard')
+@click.option('--conditions', help='Name of the data conditions as given in datacheck_conditions.py e.g. std', default='std')
 @click.option('--max_delta_t', default=30, help='Maximum time difference (minutes) allowed between drs and data files.', type=click.INT)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed locally.')
-@click.option('--local_output', default=False,is_flag=True,
-              help='Flag indicating whether jobs write their output locally '
-              + 'to disk without gathering everything in the mother '
-              + 'process. In this case the output file only contains a '
-              + 'summary of the processed jobs. The data output will be '
-              + 'in separate files',
-              show_default=True)
-@click.password_option(help='password to read from the always awesome RunDB', confirmation_prompt=False)
-def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, local_output, password):
+@click.password_option(help='password to read from the always awesome RunDB')
+def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, password):
 
     level=logging.INFO
     if log_level is 'DEBUG':
@@ -67,19 +60,17 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     erna.ensure_output(out)
     db_path = os.path.abspath(db)
     output_directory = os.path.dirname(outpath)
-    #create dir if it doesn't exist  #TODO check: should already be done by erna.ensure_output
+    #create dir if it doesn't exist
     os.makedirs(output_directory, exist_ok=True)
     logger.info("Writing output data to {}".format(out))
     factdb = sqlalchemy.create_engine("mysql+pymysql://factread:{}@129.194.168.95/factdata".format(password))
     data_conditions=dcc.conditions[conditions]
     df_runs = erna.load(earliest_night, latest_night, data_dir, source_name=source, timedelta_in_minutes=max_delta_t, factdb=factdb, data_conditions=data_conditions)
-
-    job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime, local_output)
     logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs))
     click.confirm('Do you want to continue processing and start jobs?', abort=True)
-
+    job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime)
     job_outputs = gridmap.process_jobs(job_list, max_processes=len(job_list), local=local)
-    if not local_output:
-        erna.collect_output(job_outputs, out, df_runs)
+    erna.collect_output(job_outputs, out, df_runs)
 
 if __name__ == "__main__":
     main()

diff --git a/erna/scripts/process_fact_mc.py b/erna/scripts/process_fact_mc.py
index a65f18d..2b05e3e 100644
--- a/erna/scripts/process_fact_mc.py
+++ b/erna/scripts/process_fact_mc.py
@@ -127,7 +127,6 @@ def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_l
     if mcdrs:
         drspath = mcdrs
 
-    #drspath = "/home/mbulinski/mc2.drs.fits.gz"
     logger.info('Using drs file at {}'.format(drspath))
 
     #get data files

From a1110e8127f25ef19cc78174806ba3a306b570be Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Fri, 13 Oct 2017 11:59:44 +0200
Subject: [PATCH 13/26] removed unnecessary stuff and refactored the way missing files are checked

---
 erna/__init__.py                  | 44 ++++++++++++++++++-------------
 erna/scripts/process_fact_data.py |  9 ++++++-
 erna/scripts/process_fact_mc.py   |  9 +++----
 3 files changed, 36 insertions(+), 26 deletions(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index 80f13da..b70a48a 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -19,23 +19,31 @@ def build_path(row, path_to_data, extension):
     month = night[4:6]
     day = night[6:8]
     res = os.path.join(path_to_data, year, month, day, row.filename + extension)
-    if not os.path.exists(res):
-        return np.nan
     return res
+
+def test_drs_path(df, key):
+    numRows = len(df)
+    mask = np.ones(numRows);
+    for i in range(numRows):
+        drspath = fd[key][i]
+        if not os.path.exists(drspath):
+            mask[i] = 0
+    return df.groupby(mask)
+
+def test_path_data(df, key):
+    numRows = len(df)
+    mask = np.ones(numRows);
+    for i in range(numRows):
+        datapath = fd[key][i]
+        if not os.path.exists(datapath):
+            #check if maybe the datafile is a gz file and not fz
+            datapath = datapath[:-3]+".gz"
+            if not os.path.exists(datapath):
+                mask[i] = 0
+            else: #fix the datapath
+                fd[key][i] = datapath
+
+    return df.groupby(mask)
 
 def build_filename(night, run_id):
     return night.astype(str) + '_' + run_id.map('{:03d}'.format)
@@ -194,10 +202,10 @@ def load(
     # write path TODO automaticly figures the out weather it is a gz or fz file
-    data["path"] = data.apply(build_path_data, axis=1, path_to_data=path_to_data)
+    data["path"] = data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.fits.fz')
     drs_data["path"] = drs_data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.drs.fits.gz')
 
-    drs_data = drs_data.dropna(subset=["path"])
+    drs_data = test_drs_path(drs_data, "path").groups[1]
 
     # reindex the drs table using the index of the data table.
     # There are always more data runs than drs run in the db.
@@ -218,7 +226,6 @@ def load(
             closest_drs_entries.deltaT,
             data.fOnTime, data.fEffectiveOn,
             data.NIGHT,
             data.RUNID,
-            data.filename
         ], axis=1, keys=[
             "filename",
             "drs_path",
@@ -228,7 +235,6 @@ def load(
             "effective_on",
             "NIGHT",
             "RUNID",
-            "filenameData"
         ])
 
     mapping = mapping.dropna(how='any')

diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index 070dbb8..3d5ed86 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -67,12 +67,19 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     erna.ensure_output(out)
     db_path = os.path.abspath(db)
     output_directory = os.path.dirname(outpath)
-    #create dir if it doesn't exist
+    # create dir if it doesn't exist
     os.makedirs(output_directory, exist_ok=True)
     logger.info("Writing output data to {}".format(out))
     factdb = sqlalchemy.create_engine("mysql+pymysql://factread:{}@129.194.168.95/factdata".format(password))
     data_conditions=dcc.conditions[conditions]
     df_runs = erna.load(earliest_night, latest_night, data_dir, source_name=source, timedelta_in_minutes=max_delta_t, factdb=factdb, data_conditions=data_conditions)
+
+    # check for missing data and fix possible wrong file extension (.fz->.gz)
+    groups = erna.test_data_path(df_runs, "data_path").groups
+    df_runs = groups[1]
+    missing_data_df = groups[0]
+
+    logger.warn("Missing {} dataruns due to missing datafiles".format(len(missing_data_df)))
 
     logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs))
     click.confirm('Do you want to continue processing and start jobs?', abort=True)

diff --git a/erna/scripts/process_fact_mc.py b/erna/scripts/process_fact_mc.py
index 2b05e3e..d2fe35c 100644
--- a/erna/scripts/process_fact_mc.py
+++ b/erna/scripts/process_fact_mc.py
@@ -33,7 +33,7 @@ def createFilenameFromFormat(format, basename, num):
 
 def make_jobs(jar, xml, data_paths, drs_paths,
-              engine, queue, vmem, num_jobs, walltime, output_path=None, format=None):
+              engine, queue, vmem, num_jobs, walltime, output_path=None, format="%b_%n.json"):
     jobs = []
 
     data_partitions = np.array_split(data_paths, num_jobs)
@@ -48,10 +48,7 @@ def make_jobs(jar, xml, data_paths, drs_paths,
         if output_path:
             # create the filenames for each single local run
             file_name, _ = path.splitext(path.basename(output_path))
-            if format:
-                file_name = createFilenameFromFormat(format, file_name, num)
-            else:
-                file_name = createFilenameFromFormat("%b_%n.json", file_name, num)
+            file_name = createFilenameFromFormat(format, file_name, num)
             out_path = path.dirname(output_path)
             run = [jar, xml, df, path.join(out_path, file_name)]
             stream_runner = stream_runner_local
@@ -97,7 +94,7 @@ def make_jobs(jar, xml, data_paths, drs_paths,
-@click.option('--local_output_format', default=None, help="Give the file format for the local output functionality. "
-              + "%b will replace the out filename and %[1-9]n the given local number. "
-              + "Default is: '%b_%n.json'. Only works with option --local_output.")
+@click.option('--local_output_format', default="%b_%n.json", help="Give the file format for the local output functionality. "
+              + "%b will replace the out filename and %[1-9]n the given local number. "
+              + "Default is: '%b_%n.json'. Only works with option --local_output.")
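The two test helpers above lean on a pandas detail worth spelling out (an aside, not part of the patch): grouping a frame by an external 0/1 mask and then pulling the "exists" group back out. A minimal sketch -- and one reason later patches in the series switch to boolean columns, since an empty group raises KeyError:

    import pandas as pd

    df = pd.DataFrame({"path": ["a.fits", "b.fits", "c.fits"]})
    mask = [1.0, 0.0, 1.0]  # 1 = file exists, 0 = missing
    grouped = df.groupby(mask)
    existing = grouped.get_group(1.0)  # rows whose files exist
    # grouped.get_group(0.0) would raise KeyError if nothing were missing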
") def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_level, port, local, local_output, mcdrs, mcwildcard, local_output_format): From bab0ef4d4f1707b8ae849bf8bca55095f56e1418 Mon Sep 17 00:00:00 2001 From: Michael Bulinski Date: Fri, 13 Oct 2017 12:08:10 +0200 Subject: [PATCH 14/26] fixed error in createFilenameFromFormat --- erna/scripts/process_fact_mc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erna/scripts/process_fact_mc.py b/erna/scripts/process_fact_mc.py index d2fe35c..4785240 100644 --- a/erna/scripts/process_fact_mc.py +++ b/erna/scripts/process_fact_mc.py @@ -28,7 +28,7 @@ def createFilenameFromFormat(format, basename, num): else: numstr = "%i" % num stemp = format.replace(m.group(0), numstr) - stemp = format.replace("%b", basename) + stemp = stemp.replace("%b", basename) return stemp From 8683455d4791eb6c5cb581ce737ca8680c1abd2f Mon Sep 17 00:00:00 2001 From: Michael Bulinski Date: Fri, 13 Oct 2017 12:26:39 +0200 Subject: [PATCH 15/26] changed db argument to dir_okey --- erna/scripts/process_fact_data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py index 3d5ed86..8fa5038 100755 --- a/erna/scripts/process_fact_data.py +++ b/erna/scripts/process_fact_data.py @@ -34,7 +34,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v @click.argument('data_dir', type=click.Path(exists=True, dir_okay=True, file_okay=False, readable=True) ) @click.argument('jar', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) ) @click.argument('xml', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) ) -@click.argument('db', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) ) +@click.argument('db', type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True) ) @click.argument('out', type=click.Path(exists=False, dir_okay=False, file_okay=True, readable=True) ) @click.option('--queue', help='Name of the queue you want to send jobs to.', default='short') @click.option('--walltime', help='Estimated maximum walltime of your job in format hh:mm:ss.', default='02:00:00') From ec3c9489104c8570b55a550a3c882faa63953f20 Mon Sep 17 00:00:00 2001 From: Michael Bulinski Date: Fri, 13 Oct 2017 12:31:01 +0200 Subject: [PATCH 16/26] changed default condition to standard and fixed spelling error in variable --- erna/__init__.py | 6 +++--- erna/scripts/process_fact_data.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/erna/__init__.py b/erna/__init__.py index b70a48a..9ca25f0 100644 --- a/erna/__init__.py +++ b/erna/__init__.py @@ -25,7 +25,7 @@ def test_drs_path(df, key): numRows = len(df) mask = np.ones(numRows); for i in range(numRows): - drspath = fd[key][i] + drspath = df[key][i] if not os.path.exists(drspath): mask[i] = 0 return df.groupby(mask) @@ -34,14 +34,14 @@ def test_path_data(df, key): numRows = len(df) mask = np.ones(numRows); for i in range(numRows): - datapath = fd[key][i] + datapath = df[key][i] if not os.path.exists(datapath): #check if maybe the datafile is a gz file and not fz datapath = datapath[:-3]+".gz" if not os.path.exists(datapath): mask[i] = 0 else: #fix the datapath - fd[key][i] = datapath + df[key][i] = datapath return df.groupby(mask) diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py index 8fa5038..865a06f 100755 --- a/erna/scripts/process_fact_data.py +++ 
+++ b/erna/scripts/process_fact_data.py
@@ -44,7 +44,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
 @click.option('--log_level', type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
 @click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
 @click.option('--source', help='Name of the source to analyze. e.g. Crab', default='Crab')
-@click.option('--conditions', help='Name of the data conditions as given in datacheck_conditions.py e.g. std', default='std')
+@click.option('--conditions', help='Name of the data conditions as given in datacheck_conditions.py e.g. standard', default='standard')
 @click.option('--max_delta_t', default=30, help='Maximum time difference (minutes) allowed between drs and data files.', type=click.INT)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed locally.')
 @click.password_option(help='password to read from the always awesome RunDB')

From f4b9004622f94f80f1ba980df5fc2e11e9ed2262 Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Fri, 13 Oct 2017 12:35:21 +0200
Subject: [PATCH 17/26] fixed error with usage of groupby

---
 erna/__init__.py                  | 2 +-
 erna/scripts/process_fact_data.py | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index 9ca25f0..0cff492 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -197,7 +197,7 @@ def load(
     data["path"] = data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.fits.fz')
     drs_data["path"] = drs_data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.drs.fits.gz')
 
-    drs_data = test_drs_path(drs_data, "path").groups[1]
+    drs_data = test_drs_path(drs_data, "path").get_group(1)
 
     # reindex the drs table using the index of the data table.
     # There are always more data runs than drs run in the db.
diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index 865a06f..72a0d2d 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -75,9 +75,9 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     df_runs = erna.load(earliest_night, latest_night, data_dir, source_name=source, timedelta_in_minutes=max_delta_t, factdb=factdb, data_conditions=data_conditions)
 
     # check for missing data and fix possible wrong file extension (.fz->.gz)
-    groups = erna.test_data_path(df_runs, "data_path").groups
-    df_runs = groups[1]
-    missing_data_df = groups[0]
+    df = erna.test_data_path(df_runs, "data_path")
+    df_runs = df.get_group(1)
+    missing_data_df = df.get_group(0)
 
     logger.warn("Missing {} dataruns due to missing datafiles".format(len(missing_data_df)))

From f8885cc5ba9da543c54de9ecde5d35e50f3edb90 Mon Sep 17 00:00:00 2001
From: tarrox
Date: Fri, 13 Oct 2017 12:52:07 +0200
Subject: [PATCH 18/26] fixed error with setting data in dataframe

---
 erna/__init__.py                  | 2 +-
 erna/scripts/process_fact_data.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index 0cff492..c380461 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -41,7 +41,7 @@ def test_path_data(df, key):
             if not os.path.exists(datapath):
                 mask[i] = 0
             else: #fix the datapath
-                df[key][i] = datapath
+                df.iloc[i, df.columns.get_loc(key)] = datapath
 
     return df.groupby(mask)

diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index 72a0d2d..b44d94c 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -75,7 +75,7 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     df_runs = erna.load(earliest_night, latest_night, data_dir, source_name=source, timedelta_in_minutes=max_delta_t, factdb=factdb, data_conditions=data_conditions)
 
     # check for missing data and fix possible wrong file extension (.fz->.gz)
-    df = erna.test_data_path(df_runs, "data_path")
+    df = erna.test_path_data(df_runs, "data_path")
     df_runs = df.get_group(1)
     missing_data_df = df.get_group(0)

From 9085ad734049f48a31a3bced76eef7debe58fc5f Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Fri, 13 Oct 2017 13:41:10 +0200
Subject: [PATCH 19/26] Fixed all the stuff for the pull request

---
 erna/__init__.py                  | 47 ++++++++++++++++---------------
 erna/scripts/process_fact_data.py |  6 ++--
 erna/scripts/process_fact_mc.py   | 39 ++++++++++++++++------------
 3 files changed, 51 insertions(+), 41 deletions(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index c380461..fd5c205 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -14,6 +14,9 @@
 
 def build_path(row, path_to_data, extension):
+    """
+    builds a path to the fact data given the night, extension and filename
+    """
     night = str(row.NIGHT)
     year = night[0:4]
     month = night[4:6]
@@ -22,28 +25,26 @@ def build_path(row, path_to_data, extension):
     return res
 
 def test_drs_path(df, key):
-    numRows = len(df)
-    mask = np.ones(numRows);
-    for i in range(numRows):
-        drspath = df[key][i]
-        if not os.path.exists(drspath):
-            mask[i] = 0
-    return df.groupby(mask)
+    """
+    Test if the given drs paths in the key are present
+    """
+    mask = df[key].apply(os.path.exists)
+    df['drs_file_exists'] = mask
 
-def test_path_data(df, key):
-    numRows = len(df)
-    mask = np.ones(numRows);
-    for i in range(numRows):
-        datapath = df[key][i]
-        if not os.path.exists(datapath):
-            #check if maybe the datafile is a gz file and not fz
datapath = datapath[:-3]+".gz" - if not os.path.exists(datapath): - mask[i] = 0 - else: #fix the datapath - df.iloc[i, df.columns.get_loc(key)] = datapath - - return df.groupby(mask) + return df + + +def test_data_path(df, key): + """ + Test the given data paths in key if they exists. It tests for + both possible fileextensions [.fz, .gz] and corrects if necessary. + """ + mask = df[key].apply(os.path.exists) + df['data_file_exists'] = mask + df.loc[mask, key] = df.loc[mask, key].str.replace('.fz', '.gz') + df.loc[mask, 'file_exists'] = df.loc[mask, key].apply(os.path.exists) + + return df def build_filename(night, run_id): return night.astype(str) + '_' + run_id.map('{:03d}'.format) @@ -197,7 +198,9 @@ def load( data["path"] = data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.fits.fz') drs_data["path"] = drs_data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.drs.fits.gz') - drs_data = test_drs_path(drs_data, "path").get_group(1) + #remove all none existing drs files + drs_data = test_drs_path(drs_data, "path") + drs_data = drs_data[drs_data['drs_data_exists']] # reindex the drs table using the index of the data table. # There are always more data runs than drs run in the db. diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py index b44d94c..21ae7a7 100755 --- a/erna/scripts/process_fact_data.py +++ b/erna/scripts/process_fact_data.py @@ -76,10 +76,10 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt # check for missing data and fix possible wrong file extension (.fz->.gz) df = erna.test_path_data(df_runs, "data_path") - df_runs = df.get_group(1) - missing_data_df = df.get_group(0) + df_runs = df[df['data_runs_exists']] + df_runs_missing = df[~df['data_runs_exists']] - logger.warn("Missing {} dataruns due to missing datafiles".format(len(missing_data_df))) + logger.warn("Missing {} dataruns due to missing datafiles".format(len(df_runs_missing))) logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs)) click.confirm('Do you want to continue processing and start jobs?', abort=True) diff --git a/erna/scripts/process_fact_mc.py b/erna/scripts/process_fact_mc.py index 4785240..ee29ea3 100644 --- a/erna/scripts/process_fact_mc.py +++ b/erna/scripts/process_fact_mc.py @@ -18,22 +18,29 @@ import re # given a string create a filename -def createFilenameFromFormat(format, basename, num): - m = re.search('\%[0-9]?n', format) - if not m: - raise "Missing number placement in format string" - numstr = "" - if len(m.group(0))==3: - numstr = ("%0"+m.group(0)[1]+"i") % num - else: - numstr = "%i" % num - stemp = format.replace(m.group(0), numstr) - stemp = stemp.replace("%b", basename) - return stemp - +def create_filename_from_format(filename_format, basename, num): + """ + Given a special format string, create a filename_format with the basename and a given number. 
+    %b is replaced with the basename and is optional
+    %[0-9]?n is replaced with the number and must be supplied; the optional digit pads the given number with zeros
+
+    Exp: create_filename_from_format("%b_%4n.fits", "test", 45)->"test_0045.fits"
+    """
+    m = re.search('\%[0-9]?n', filename_format)
+    if not m:
+        raise "Missing number placement in format string"
+    numstr = ""
+    if len(m.group(0))==3:
+        numstr = ("%0"+m.group(0)[1]+"i") % num
+    else:
+        numstr = "%i" % num
+    stemp = filename_format.replace(m.group(0), numstr)
+    stemp = stemp.replace("%b", basename)
+    return stemp
 
 def make_jobs(jar, xml, data_paths, drs_paths,
-              engine, queue, vmem, num_jobs, walltime, output_path=None, format="%b_%n.json"):
+              engine, queue, vmem, num_jobs, walltime, output_path=None, filename_format="%b_%n.json"):
     jobs = []
 
     data_partitions = np.array_split(data_paths, num_jobs)
@@ -48,7 +55,7 @@ def make_jobs(jar, xml, data_paths, drs_paths,
         if output_path:
             # create the filenames for each single local run
             file_name, _ = path.splitext(path.basename(output_path))
-            file_name = createFilenameFromFormat(format, file_name, num)
+            file_name = createFilenameFromFormat(filename_format, file_name, num)
             out_path = path.dirname(output_path)
             run = [jar, xml, df, path.join(out_path, file_name)]
             stream_runner = stream_runner_local
@@ -94,7 +101,7 @@ def make_jobs(jar, xml, data_paths, drs_paths,
 job_list = make_jobs(
     jarpath, xmlpath, mc_paths_array, drs_paths_array, engine, queue,
-    vmem, num_jobs, walltime, output_path=local_output_dir, format=local_output_format
+    vmem, num_jobs, walltime, output_path=local_output_dir, filename_format=local_output_format
 )

From 17ed393224bb24abb5a16e9a5d52cdb08b35e8d5 Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Fri, 13 Oct 2017 13:43:24 +0200
Subject: [PATCH 20/26] fixed wrong key names

---
 erna/__init__.py                  | 2 +-
 erna/scripts/process_fact_data.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index fd5c205..54a154a 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -200,7 +200,7 @@ def load(
     #remove all none existing drs files
     drs_data = test_drs_path(drs_data, "path")
-    drs_data = drs_data[drs_data['drs_data_exists']]
+    drs_data = drs_data[drs_data['drs_file_exists']]
 
     # reindex the drs table using the index of the data table.
     # There are always more data runs than drs run in the db.
diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index 21ae7a7..feb31b5 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -76,8 +76,8 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     # check for missing data and fix possible wrong file extension (.fz->.gz)
     df = erna.test_path_data(df_runs, "data_path")
-    df_runs = df[df['data_runs_exists']]
-    df_runs_missing = df[~df['data_runs_exists']]
+    df_runs = df[df['data_file_exists']]
+    df_runs_missing = df[~df['data_file_exists']]
 
     logger.warn("Missing {} dataruns due to missing datafiles".format(len(df_runs_missing)))

From b3fad26211824880fccf6219e398f21f9b0cbf94 Mon Sep 17 00:00:00 2001
From: tarrox
Date: Fri, 13 Oct 2017 13:48:18 +0200
Subject: [PATCH 21/26] fixed error with the test_data_path method not inverting the mask

---
 erna/__init__.py                  | 4 ++--
 erna/scripts/process_fact_data.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index 54a154a..a4afa25 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -41,8 +41,8 @@ def test_data_path(df, key):
     """
     mask = df[key].apply(os.path.exists)
     df['data_file_exists'] = mask
-    df.loc[mask, key] = df.loc[mask, key].str.replace('.fz', '.gz')
-    df.loc[mask, 'file_exists'] = df.loc[mask, key].apply(os.path.exists)
+    df.loc[~mask, key] = df.loc[~mask, key].str.replace('.fz', '.gz')
+    df.loc[~mask, 'data_file_exists'] = df.loc[~mask, key].apply(os.path.exists)
 
     return df

diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index feb31b5..dc84c29 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -75,7 +75,7 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     df_runs = erna.load(earliest_night, latest_night, data_dir, source_name=source, timedelta_in_minutes=max_delta_t, factdb=factdb, data_conditions=data_conditions)
 
     # check for missing data and fix possible wrong file extension (.fz->.gz)
-    df = erna.test_path_data(df_runs, "data_path")
+    df = erna.test_data_path(df_runs, "data_path")
     df_runs = df[df['data_file_exists']]
     df_runs_missing = df[~df['data_file_exists']]
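The corrected test_data_path logic is compact; here is an expanded sketch of the same .fz -> .gz fallback that runs on its own (an aside with invented paths, not part of the patch):

    import os
    import pandas as pd

    df = pd.DataFrame({"data_path": ["/fact/raw/a.fits.fz", "/fact/raw/b.fits.fz"]})
    mask = df["data_path"].apply(os.path.exists)
    df["data_file_exists"] = mask
    # for files missing as .fz, retry with the .gz spelling and re-check
    # (pandas str.replace treats '.fz' as a regex here, which is fine for these suffixes)
    df.loc[~mask, "data_path"] = df.loc[~mask, "data_path"].str.replace(".fz", ".gz")
    df.loc[~mask, "data_file_exists"] = df.loc[~mask, "data_path"].apply(os.path.exists)
    df_ok, df_missing = df[df["data_file_exists"]], df[~df["data_file_exists"]]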
From 1a4f21ec81607051855eb8464f695fc9ee5e11fc Mon Sep 17 00:00:00 2001
From: Michael Bulinski
Date: Fri, 13 Oct 2017 16:41:40 +0200
Subject: [PATCH 22/26] fixed errors

---
 erna/__init__.py                |  2 +-
 erna/scripts/process_fact_mc.py | 30 ++++++++++--------------------
 2 files changed, 11 insertions(+), 21 deletions(-)

diff --git a/erna/__init__.py b/erna/__init__.py
index 54a154a..a7b69ad 100644
--- a/erna/__init__.py
+++ b/erna/__init__.py
@@ -194,7 +194,7 @@ def load(
     data["filename"] = build_filename(data.NIGHT, data.RUNID)
     drs_data["filename"] = build_filename(drs_data.NIGHT, drs_data.RUNID)
 
-    # write path TODO automaticly figures the out weather it is a gz or fz file
+    # write path
     data["path"] = data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.fits.fz')
     drs_data["path"] = drs_data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.drs.fits.gz')
 
diff --git a/erna/scripts/process_fact_mc.py b/erna/scripts/process_fact_mc.py
index ee29ea3..097a31b 100644
--- a/erna/scripts/process_fact_mc.py
+++ b/erna/scripts/process_fact_mc.py
@@ -17,30 +17,20 @@
 import re
 
 
-# given a string create a filename
 def create_filename_from_format(filename_format, basename, num):
     """
     Given a special format string, create a filename with the basename and a given
     number.
-    %b is replaced with the basename and is optional
-    %[0-9]?n is replaced with the number and must be supplied; an optional digit pads the number with zeros to that width
-
-    Example: create_filename_from_format("%b_%4n.fits", "test", 45) -> "test_0045.fits"
+    Two named placeholders are available: {basename}, which inserts the basename and is
+    optional, and {num}, which is mandatory and is replaced with the given number.
     """
-    m = re.search('\%[0-9]?n', filename_format)
+    m = re.search(r'\{num', filename_format)
     if not m:
-        raise "Missing number placement in format string"
-    numstr = ""
-    if len(m.group(0))==3:
-        numstr = ("%0"+m.group(0)[1]+"i") % num
-    else:
-        numstr = "%i" % num
-    stemp = filename_format.replace(m.group(0), numstr)
-    stemp = stemp.replace("%b", basename)
-    return stemp
+        raise ValueError("Missing named placeholder 'num' in format string")
+    return filename_format.format(basename=basename, num=num)
 
 
 def make_jobs(jar, xml, data_paths, drs_paths,
-              engine, queue, vmem, num_jobs, walltime, output_path=None, filename_format="%b_%n.json"):
+              engine, queue, vmem, num_jobs, walltime, output_path=None, filename_format="{basename}_{num}.json"):
     jobs = []
 
     data_partitions = np.array_split(data_paths, num_jobs)
@@ -55,7 +45,7 @@ def make_jobs(jar, xml, data_paths, drs_paths,
         if output_path:
             # create the filenames for each single local run
             file_name, _ = path.splitext(path.basename(output_path))
-            file_name = createFilenameFromFormat(filename_format, file_name, num)
+            file_name = create_filename_from_format(filename_format, file_name, num)
             out_path = path.dirname(output_path)
             run = [jar, xml, df, path.join(out_path, file_name)]
             stream_runner = stream_runner_local
@@ -100,10 +90,10 @@ def make_jobs(jar, xml, data_paths, drs_paths,
                    + 'in separate files',
               show_default=True)
 @click.option('--mcdrs', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True))
-@click.option('--mcwildcard', help="Gives the wildward for searching the folder for files.", type=click.STRING, default='**/*_Events.fit*')
-@click.option('--local_output_format', default="%b_%n.json", help="Give the file format for the local output funktionality."
-              + "%b will replace the out filename and %[1-9]n the given local number."
-              + "Default is: '%b_%n.json'.Only works with option --local_output. ")
+@click.option('--mcwildcard', help="Gives the wildcard for searching the folder for files.", type=click.STRING, default='**/*_Events.fit*')
+@click.option('--local_output_format', default="{basename}_{num}.json", help="Give the file format for the local output functionality. "
+              + "{basename} will be replaced with the output file's basename and {num} with the job number. "
+              + "Default is: '{basename}_{num}.json'. Only works with option --local_output.")
 def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_level, port, local, local_output, mcdrs, mcwildcard, local_output_format):
     ''' Script to execute fact-tools on MonteCarlo files.
     Use the MC_PATH argument to specify the folders containing the MC
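With the rework above, zero-padding no longer needs a custom mini-language: str.format handles format specs, and the regex only checks that '{num' occurs somewhere, so a spec such as {num:04d} passes through unchanged. Usage sketch (assuming the .format call is made with keyword arguments, as written above):

    create_filename_from_format("{basename}_{num}.json", "crab", 7)
    # -> 'crab_7.json'
    create_filename_from_format("{basename}_{num:04d}.fits", "test", 45)
    # -> 'test_0045.fits'  (padding via an ordinary format spec)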
From 012bc8af6b295f2c7f87deb002fcde4001f541 Mon Sep 17 00:00:00 2001
From: Jens Buss
Date: Tue, 17 Oct 2017 13:02:20 +0200
Subject: [PATCH 23/26] rename all occurrences of db to aux_source_path

---
 erna/scripts/process_fact_data.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/erna/scripts/process_fact_data.py b/erna/scripts/process_fact_data.py
index dc84c29..e78cd78 100755
--- a/erna/scripts/process_fact_data.py
+++ b/erna/scripts/process_fact_data.py
@@ -15,14 +15,14 @@
 logger = logging.getLogger(__name__)
 
 
-def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, vmem, num_runs_per_bunch, walltime):
+def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine, queue, vmem, num_runs_per_bunch, walltime):
     jobs = []
     # create job objects
     df_mapping["bunch_index"]= np.arange(len(df_mapping)) // num_runs_per_bunch
     for num, df in df_mapping.groupby("bunch_index"):
         df=df.copy()
         df["bunch_index"] = num
-        job = Job(stream_runner.run, [jar, xml, df, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
+        job = Job(stream_runner.run, [jar, xml, df, aux_source_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
         jobs.append(job)
 
     return jobs
@@ -34,7 +34,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
 @click.argument('data_dir', type=click.Path(exists=True, dir_okay=True, file_okay=False, readable=True) )
 @click.argument('jar', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
 @click.argument('xml', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
-@click.argument('db', type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True) )
+@click.argument('aux_source', type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True) )
 @click.argument('out', type=click.Path(exists=False, dir_okay=False, file_okay=True, readable=True) )
 @click.option('--queue', help='Name of the queue you want to send jobs to.', default='short')
 @click.option('--walltime', help='Estimated maximum walltime of your job in format hh:mm:ss.', default='02:00:00')
@@ -48,7 +48,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
 @click.option('--max_delta_t', default=30, help='Maximum time difference (minutes) allowed between drs and data files.', type=click.INT)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed locally.')
 @click.password_option(help='password to read from the always awesome RunDB')
-def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, password):
+def main(earliest_night, latest_night, data_dir, jar, xml, aux_source, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, password):
 
     level=logging.INFO
     if log_level is 'DEBUG':
@@ -65,7 +65,7 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     xmlpath = os.path.abspath(xml)
     outpath = os.path.abspath(out)
     erna.ensure_output(out)
-    db_path = os.path.abspath(db)
+    aux_source_path = os.path.abspath(aux_source)
     output_directory = os.path.dirname(outpath)
     # create dir if it doesn't exist
     os.makedirs(output_directory, exist_ok=True)
@@ -84,7 +84,7 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
     logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs))
     click.confirm('Do you want to continue processing and start jobs?', abort=True)
 
-    job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime)
+    job_list = make_jobs(jarpath, xmlpath, aux_source_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime)
 
     job_outputs = gridmap.process_jobs(job_list, max_processes=len(job_list), local=local)
     erna.collect_output(job_outputs, out, df_runs)

From 976bd2327d6c63c7d04f8f676c39d43b0c932007 Mon Sep 17 00:00:00 2001
From: Jens Buss
Date: Tue, 17 Oct 2017 13:18:28 +0200
Subject: [PATCH 24/26] replace key "db" by "aux-source"

---
 erna/scripts/process_fact_data_qsub.py |  8 ++++----
 erna/scripts/process_fact_run_list.py  | 12 ++++++------
 erna/stream_runner.py                  |  4 ++--
 erna/stream_runner_local_output.py     |  4 ++--
 4 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/erna/scripts/process_fact_data_qsub.py b/erna/scripts/process_fact_data_qsub.py
index 0df9917..c04b34f 100644
--- a/erna/scripts/process_fact_data_qsub.py
+++ b/erna/scripts/process_fact_data_qsub.py
@@ -54,7 +54,7 @@ def read_outputs_to_list(job_output_paths):
 @click.argument('data_dir', type=click.Path(exists=True, dir_okay=True, file_okay=False, readable=True))
 @click.argument('jar', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True))
 @click.argument('xml', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True))
-@click.argument('db', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True))
+@click.argument('aux_source', type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True))
 @click.argument('out', type=click.Path(exists=False, dir_okay=False, file_okay=True, readable=True))
 @click.option('--queue', help='Name of the queue you want to send jobs to.', default='short')
 @click.option('--mail', help='qsub mail settings.', default='a')
@@ -70,7 +70,7 @@ def read_outputs_to_list(job_output_paths):
 @click.option('--max_delta_t', default=30, help='Maximum time difference (minutes) allowed between drs and data files.', type=click.INT)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed locally.')
 @click.password_option(help='password to read from the always awesome RunDB')
-def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, mail,
+def main(earliest_night, latest_night, data_dir, jar, xml, aux_source, out, queue, mail,
          walltime, engine, num_runs, qjobs, vmem, log_level, port, source,
          conditions, max_delta_t, local, password):
 
@@ -91,7 +91,7 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, mail,
     erna.ensure_output(out)
     logger.info("Output data will be written to {}".format(out))
 
-    db_path = os.path.abspath(db)
+    aux_source_path = os.path.abspath(aux_source)
     output_directory = os.path.dirname(outpath)
     # create dir if it doesn't exist
     os.makedirs(output_directory, exist_ok=True)
@@ -128,7 +128,7 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, mail,
         if ( n_toqueue > 0 ) and ( len(df_runs) > 0):
             df_to_submit = df_runs.head(n_toqueue*num_runs).copy()
             processing_identifier = "{}_{}".format(source, time.strftime('%Y%m%d%H%M'))
-            df_submitted_last = q.submit_qsub_jobs(processing_identifier, jarpath, xmlpath, db_path, df_to_submit, engine, queue, vmem, num_runs, walltime, db, mail)
+            df_submitted_last = q.submit_qsub_jobs(processing_identifier, jarpath, xmlpath, aux_source_path, df_to_submit, engine, queue, vmem, num_runs, walltime, aux_source, mail)
 
             df_submitted = df_submitted.append(df_submitted_last)
 
diff --git a/erna/scripts/process_fact_run_list.py b/erna/scripts/process_fact_run_list.py
index 00a3a4e..c83c80e 100644
--- a/erna/scripts/process_fact_run_list.py
+++ b/erna/scripts/process_fact_run_list.py
@@ -13,7 +13,7 @@
 logger = logging.getLogger(__name__)
 
 
-def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue,
+def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine, queue,
               vmem, num_jobs, walltime):
     jobs = []
     # create job objects
@@ -21,7 +21,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue,
     for num, indices in enumerate(split_indices):
         df = df_mapping[indices.min(): indices.max()]
 
-        job = Job(stream_runner.run, [jar, xml, df, db_path],
+        job = Job(stream_runner.run, [jar, xml, df, aux_source_path],
                   queue=queue, walltime=walltime, engine=engine,
                   mem_free='{}mb'.format(vmem))
         jobs.append(job)
@@ -35,7 +35,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue,
 @click.argument('file_list', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
 @click.argument('jar', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
 @click.argument('xml', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
-@click.argument('db', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
+@click.argument('aux_source', type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True) )
 @click.argument('out', type=click.Path(exists=False, dir_okay=False, file_okay=True, readable=True) )
 @click.option('--queue', help='Name of the queue you want to send jobs to.', default='short')
 @click.option('--walltime', help='Estimated maximum walltime of your job in format hh:mm:ss.', default='02:00:00')
@@ -45,7 +45,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue,
 @click.option("--log_level", type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
 @click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
 @click.option('--local', default=False,is_flag=True, help='Flag indicating whether jobs should be executed locally.')
-def main(file_list, jar, xml, db, out, queue, walltime, engine, num_jobs, vmem, log_level, port, local):
+def main(file_list, jar, xml, aux_source, out, queue, walltime, engine, num_jobs, vmem, log_level, port, local):
     ''' Specify the path to a .json file as created by the fetch_runs.py script via the
     FILE_LIST argument. num_jobs will be created and executed on the cluster.
@@ -67,7 +67,7 @@ def main(file_list, jar, xml, db, out, queue, walltime, engine, num_jobs, vmem,
     # get data files
     jarpath = path.abspath(jar)
     xmlpath = path.abspath(xml)
-    db_path = path.abspath(db)
+    aux_source_path = path.abspath(aux_source)
     outpath = path.abspath(out)
     output_directory = path.dirname(outpath)
     # create dir if it doesn't exist
@@ -75,7 +75,7 @@ def main(file_list, jar, xml, db, out, queue, walltime, engine, num_jobs, vmem,
 
     logger.info("Writing output and temporary data to {}".format(output_directory))
 
-    job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df, engine, queue, vmem, num_jobs, walltime)
+    job_list = make_jobs(jarpath, xmlpath, aux_source_path, output_directory, df, engine, queue, vmem, num_jobs, walltime)
 
     job_outputs = gridmap.process_jobs(job_list, max_processes=num_jobs, local=local)
     erna.collect_output(job_outputs, out, df)
diff --git a/erna/stream_runner.py b/erna/stream_runner.py
index 5279574..0a6c8c0 100644
--- a/erna/stream_runner.py
+++ b/erna/stream_runner.py
@@ -11,7 +11,7 @@
 )
 
 
-def run(jar, xml, input_files_df, db_path=None):
+def run(jar, xml, input_files_df, aux_source_path=None):
     '''
     This is what will be executed on the cluster
     '''
@@ -23,7 +23,7 @@ def run(jar, xml, input_files_df, db_path=None):
         output_path = os.path.join(output_directory, "output.json")
 
         input_files_df.to_json(input_path, orient='records', date_format='epoch')
-        call = assemble_facttools_call(jar, xml, input_path, output_path, db_path)
+        call = assemble_facttools_call(jar, xml, input_path, output_path, aux_source_path)
 
         check_environment_on_node()
 
diff --git a/erna/stream_runner_local_output.py b/erna/stream_runner_local_output.py
index 204e7ae..747d842 100644
--- a/erna/stream_runner_local_output.py
+++ b/erna/stream_runner_local_output.py
@@ -12,7 +12,7 @@
 )
 
 
-def run(jar, xml, input_files_df, output_path, db_path=None):
+def run(jar, xml, input_files_df, output_path, aux_source_path=None):
     '''
     This is a version of erna's stream runner that will be executed on the
     cluster, but writes its results directly to disk without sending them
@@ -26,7 +26,7 @@ def run(jar, xml, input_files_df, output_path, db_path=None):
         tmp_output_path = os.path.join(output_directory, "output.json")
 
         input_files_df.to_json(input_path, orient='records', date_format='epoch')
-        call = assemble_facttools_call(jar, xml, input_path, tmp_output_path, db_path)
+        call = assemble_facttools_call(jar, xml, input_path, tmp_output_path, aux_source_path)
 
         check_environment_on_node()
 
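After this rename, both runners take aux_source_path as an optional trailing argument; the local-output variant additionally needs the explicit output path. A sketch of how a caller picks between them when building a job (build_job is illustrative, not part of erna; the Job keywords are the ones used throughout these scripts):

    from gridmap import Job
    from erna import stream_runner, stream_runner_local_output

    def build_job(jar, xml, df, aux_source_path, engine, vmem,
                  output_path=None, queue='short', walltime='02:00:00'):
        # choose the runner: with an output_path the results stay on the
        # node, otherwise they are collected by the mother process
        if output_path is not None:
            func = stream_runner_local_output.run
            args = [jar, xml, df, output_path, aux_source_path]
        else:
            func = stream_runner.run
            args = [jar, xml, df, aux_source_path]
        return Job(func, args, queue=queue, walltime=walltime,
                   engine=engine, mem_free='{}mb'.format(vmem))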
From 0e24d8503d4008c6700fd4793bf27bb204558798 Mon Sep 17 00:00:00 2001
From: Jens Buss
Date: Tue, 17 Oct 2017 13:22:02 +0200
Subject: [PATCH 25/26] replace key "db" by "aux-source"

---
 erna/utils.py | 4 ++--
 example.xml   | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/erna/utils.py b/erna/utils.py
index 0cbec9e..9a4cad8 100644
--- a/erna/utils.py
+++ b/erna/utils.py
@@ -65,7 +65,7 @@ def date_to_night_int(night):
     return 10000 * night.year + 100 * night.month + night.day
 
 
-def assemble_facttools_call(jar, xml, input_path, output_path, db_path=None):
+def assemble_facttools_call(jar, xml, input_path, output_path, aux_source_path=None):
     ''' Assemble the call for fact-tools with the given combinations
     of jar, xml, input_path and output_path.
     The db_path is optional for the case where a db_file is needed
@@ -83,7 +83,7 @@ def assemble_facttools_call(jar, xml, input_path, output_path, db_path=None):
         xml,
         '-Dinput=file:{}'.format(input_path),
         '-Doutput=file:{}'.format(output_path),
-        '-Ddb=file:{}'.format(db_path),
+        '-Daux_source=file:{}'.format(aux_source_path),
     ]
     return call
 
diff --git a/example.xml b/example.xml
index abba8fd..afee3be 100644
--- a/example.xml
+++ b/example.xml
@@ -2,13 +2,13 @@
 
 
-	
+	
 
 
-	
+	
 
 

From 796732e2407eb07ef995a94cd2244436c14bae3b Mon Sep 17 00:00:00 2001
From: Jens Buss
Date: Wed, 18 Oct 2017 12:02:02 +0200
Subject: [PATCH 26/26] bump version number

---
 setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.py b/setup.py
index 095cbc2..71f9691 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 
 setup(
     name='erna',
-    version='0.4.2',
+    version='0.4.3',
     description='Easy RuN Access. Tools that help to do batch processing of FACT data',
     url='https://github.com/fact-project/erna',
     author='Kai Brügge, Jens Buss, Maximilian Nöthe',
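For reference, after this series the fact-tools call assembled by erna.utils.assemble_facttools_call carries the auxiliary source as -Daux_source rather than -Ddb. A quick check with made-up paths (only the tail of the call list is shown in the diff above, so the leading java arguments are not spelled out here):

    from erna.utils import assemble_facttools_call

    call = assemble_facttools_call(
        '/opt/fact-tools.jar',
        '/home/user/analysis.xml',
        '/tmp/input.json',
        '/tmp/output.json',
        aux_source_path='/fact/aux',
    )
    assert '-Daux_source=file:/fact/aux' in call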