
Commit

Merge pull request #42 from fact-project/noise
Noise
maxnoe authored Oct 18, 2017
2 parents 6b3cb7b + 796732e commit 93005b4
Showing 9 changed files with 104 additions and 40 deletions.
47 changes: 41 additions & 6 deletions erna/__init__.py
@@ -14,12 +14,37 @@


def build_path(row, path_to_data, extension):
"""
Build the path to a FACT data file from the night, extension and filename.
"""
night = str(row.NIGHT)
year = night[0:4]
month = night[4:6]
day = night[6:8]
return os.path.join(path_to_data, year, month, day, row.filename + extension)

res = os.path.join(path_to_data, year, month, day, row.filename + extension)
return res

def test_drs_path(df, key):
"""
Test whether the drs paths in the given key exist.
"""
mask = df[key].apply(os.path.exists)
df['drs_file_exists'] = mask

return df


def test_data_path(df, key):
"""
Test whether the data paths in the given key exist. Both possible
file extensions [.fz, .gz] are tried and the path is corrected if necessary.
"""
mask = df[key].apply(os.path.exists)
df['data_file_exists'] = mask
df.loc[~mask, key] = df.loc[~mask, key].str.replace('.fz', '.gz')
df.loc[~mask, 'data_file_exists'] = df.loc[~mask, key].apply(os.path.exists)

return df

def build_filename(night, run_id):
return night.astype(str) + '_' + run_id.map('{:03d}'.format)
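A minimal usage sketch of the new helpers (illustration only; the run table and the paths are hypothetical), showing the .fz -> .gz fallback of test_data_path:

import pandas as pd
from erna import build_filename, test_data_path

# hypothetical run table as it would come from the RunDB
runs = pd.DataFrame({'NIGHT': [20171018, 20171018], 'RUNID': [42, 43]})
runs['filename'] = build_filename(runs.NIGHT, runs.RUNID)                    # e.g. '20171018_042'
runs['data_path'] = '/fact/raw/2017/10/18/' + runs['filename'] + '.fits.fz'  # placeholder base path

runs = test_data_path(runs, 'data_path')    # retries with '.gz' where the '.fz' file is missing
available = runs[runs['data_file_exists']]
missing = runs[~runs['data_file_exists']]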
@@ -65,6 +90,10 @@ def collect_output(job_outputs, output_path, df_started_runs=None, **kwargs):
df_returned_data = pd.concat(frames, ignore_index=True)
logger.info("There are a total of {} events in the result".format(len(df_returned_data)))

if len(df_returned_data) == 0:
logger.info("No events were returned in the result; something must have gone wrong.")
return

if df_started_runs is not None:
df_merged = pd.merge(df_started_runs, df_returned_data, on=['NIGHT','RUNID'], how='inner')
total_on_time_in_seconds = df_merged.on_time.sum()
@@ -165,16 +194,22 @@ def load(
data["filename"] = build_filename(data.NIGHT, data.RUNID)
drs_data["filename"] = build_filename(drs_data.NIGHT, drs_data.RUNID)

# write path TODO: file ending? is everything in fz?
# write path
data["path"] = data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.fits.fz')
drs_data["path"] = drs_data.apply(build_path, axis=1, path_to_data=path_to_data, extension='.drs.fits.gz')

# remove all non-existing drs files
drs_data = test_drs_path(drs_data, "path")
drs_data = drs_data[drs_data['drs_file_exists']]

# reindex the drs table using the index of the data table.
# There are always more data runs than drs runs in the db,
# hence missing rows have to be filled either forward or backward.
earlier_drs_entries = drs_data.reindex(data.index, method="ffill")
earlier_drs_entries = earlier_drs_entries.fillna(axis="index", method="ffill")
later_drs_entries = drs_data.reindex(data.index, method="backfill")

later_drs_entries = later_drs_entries.fillna(axis="index", method="ffill")

# when backfilling the drs observations the last rows might be invalid and contain nans.
# We cannot drop them because the tables have to have the same length.
# in that case simply fill them up.
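As a toy illustration of this forward/backward matching (made-up run indices, not real tables): each data run is paired with the closest earlier and the closest later drs run, and the edge NaNs are what the fillna calls above patch up.

import pandas as pd

data = pd.DataFrame({'RUNID': [1, 2, 3, 5, 7]}, index=[1, 2, 3, 5, 7])
drs = pd.DataFrame({'drs_path': ['a.drs', 'b.drs']}, index=[2, 6])

earlier = drs.reindex(data.index, method='ffill')     # 1 -> NaN, 2, 3, 5 -> a.drs, 7 -> b.drs
later = drs.reindex(data.index, method='backfill')    # 1, 2 -> a.drs, 3, 5 -> b.drs, 7 -> NaN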
@@ -193,7 +228,7 @@ def load(
closest_drs_entries.deltaT,
data.fOnTime, data.fEffectiveOn,
data.NIGHT,
data.RUNID
data.RUNID,
], axis=1, keys=[
"filename",
"drs_path",
@@ -204,7 +239,7 @@
"NIGHT",
"RUNID",
])

mapping = mapping.dropna(how='any')

logger.info("Fetched {} data runs and approx {} drs entries from database where time delta is less than {} minutes".format(len(mapping), mapping['drs_path'].nunique(), timedelta_in_minutes))
23 changes: 15 additions & 8 deletions erna/scripts/process_fact_data.py
@@ -15,14 +15,14 @@
logger = logging.getLogger(__name__)


def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, vmem, num_runs_per_bunch, walltime):
def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine, queue, vmem, num_runs_per_bunch, walltime):
jobs = []
# create job objects
df_mapping["bunch_index"]= np.arange(len(df_mapping)) // num_runs_per_bunch
for num, df in df_mapping.groupby("bunch_index"):
df=df.copy()
df["bunch_index"] = num
job = Job(stream_runner.run, [jar, xml, df, db_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
job = Job(stream_runner.run, [jar, xml, df, aux_source_path], queue=queue, walltime=walltime, engine=engine, mem_free='{}mb'.format(vmem))
jobs.append(job)

return jobs
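For reference, the integer division above groups runs into bunches like this (sizes chosen arbitrarily):

import numpy as np

num_runs_per_bunch = 4
bunch_index = np.arange(10) // num_runs_per_bunch
# -> array([0, 0, 0, 0, 1, 1, 1, 1, 2, 2]): ten runs become three jobs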
@@ -34,7 +34,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
@click.argument('data_dir', type=click.Path(exists=True, dir_okay=True, file_okay=False, readable=True) )
@click.argument('jar', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
@click.argument('xml', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
@click.argument('db', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
@click.argument('aux_source', type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True) )
@click.argument('out', type=click.Path(exists=False, dir_okay=False, file_okay=True, readable=True) )
@click.option('--queue', help='Name of the queue you want to send jobs to.', default='short')
@click.option('--walltime', help='Estimated maximum walltime of your job in format hh:mm:ss.', default='02:00:00')
@@ -44,11 +44,11 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue, v
@click.option('--log_level', type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
@click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
@click.option('--source', help='Name of the source to analyze. e.g Crab', default='Crab')
@click.option('--conditions', help='Name of the data conditions as given in datacheck_conditions.py e.g std', default='std')
@click.option('--conditions', help='Name of the data conditions as given in datacheck_conditions.py e.g standard', default='standard')
@click.option('--max_delta_t', default=30, help='Maximum time difference (minutes) allowed between drs and data files.', type=click.INT)
@click.option('--local', default=False, is_flag=True, help='Flag indicating whether jobs should be executed locally.')
@click.password_option(help='password to read from the always awesome RunDB')
def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, password):
def main(earliest_night, latest_night, data_dir, jar, xml, aux_source, out, queue, walltime, engine, num_runs, vmem, log_level, port, source, conditions, max_delta_t, local, password):

level=logging.INFO
if log_level == 'DEBUG':
@@ -65,19 +65,26 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, wallt
xmlpath = os.path.abspath(xml)
outpath = os.path.abspath(out)
erna.ensure_output(out)
db_path = os.path.abspath(db)
aux_source_path = os.path.abspath(aux_source)
output_directory = os.path.dirname(outpath)
#create dir if it doesn't exist
# create dir if it doesn't exist
os.makedirs(output_directory, exist_ok=True)
logger.info("Writing output data to {}".format(out))
factdb = sqlalchemy.create_engine("mysql+pymysql://factread:{}@129.194.168.95/factdata".format(password))
data_conditions=dcc.conditions[conditions]
df_runs = erna.load(earliest_night, latest_night, data_dir, source_name=source, timedelta_in_minutes=max_delta_t, factdb=factdb, data_conditions=data_conditions)

# check for missing data and fix possible wrong file extension (.fz->.gz)
df = erna.test_data_path(df_runs, "data_path")
df_runs = df[df['data_file_exists']]
df_runs_missing = df[~df['data_file_exists']]

logger.warn("Missing {} data runs due to missing data files".format(len(df_runs_missing)))

logger.info("Would process {} jobs with {} runs per job".format(len(df_runs)//num_runs, num_runs))
click.confirm('Do you want to continue processing and start jobs?', abort=True)

job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime)
job_list = make_jobs(jarpath, xmlpath, aux_source_path, output_directory, df_runs, engine, queue, vmem, num_runs, walltime)
job_outputs = gridmap.process_jobs(job_list, max_processes=len(job_list), local=local)
erna.collect_output(job_outputs, out, df_runs)

8 changes: 4 additions & 4 deletions erna/scripts/process_fact_data_qsub.py
@@ -54,7 +54,7 @@ def read_outputs_to_list(job_output_paths):
@click.argument('data_dir', type=click.Path(exists=True, dir_okay=True, file_okay=False, readable=True))
@click.argument('jar', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True))
@click.argument('xml', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True))
@click.argument('db', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True))
@click.argument('aux_source', type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True))
@click.argument('out', type=click.Path(exists=False, dir_okay=False, file_okay=True, readable=True))
@click.option('--queue', help='Name of the queue you want to send jobs to.', default='short')
@click.option('--mail', help='qsub mail settings.', default='a')
@@ -70,7 +70,7 @@ def read_outputs_to_list(job_output_paths):
@click.option('--max_delta_t', default=30, help='Maximum time difference (minutes) allowed between drs and data files.', type=click.INT)
@click.option('--local', default=False, is_flag=True, help='Flag indicating whether jobs should be executed locally.')
@click.password_option(help='password to read from the always awesome RunDB')
def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, mail,
def main(earliest_night, latest_night, data_dir, jar, xml, aux_source, out, queue, mail,
walltime, engine, num_runs, qjobs, vmem, log_level, port, source, conditions,
max_delta_t, local, password):

@@ -91,7 +91,7 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, mail,
erna.ensure_output(out)
logger.info("Output data will be written to {}".format(out))

db_path = os.path.abspath(db)
aux_source_path = os.path.abspath(aux_source)
output_directory = os.path.dirname(outpath)
# create dir if it doesn't exist
os.makedirs(output_directory, exist_ok=True)
@@ -128,7 +128,7 @@ def main(earliest_night, latest_night, data_dir, jar, xml, db, out, queue, mail,
if ( n_toqueue > 0 ) and ( len(df_runs) > 0):
df_to_submit = df_runs.head(n_toqueue*num_runs).copy()
processing_identifier = "{}_{}".format(source, time.strftime('%Y%m%d%H%M'))
df_submitted_last = q.submit_qsub_jobs(processing_identifier, jarpath, xmlpath, db_path, df_to_submit, engine, queue, vmem, num_runs, walltime, db, mail)
df_submitted_last = q.submit_qsub_jobs(processing_identifier, jarpath, xmlpath, aux_source_path, df_to_submit, engine, queue, vmem, num_runs, walltime, aux_source, mail)
df_submitted = df_submitted.append(df_submitted_last)


38 changes: 30 additions & 8 deletions erna/scripts/process_fact_mc.py
@@ -15,9 +15,22 @@

logger = logging.getLogger(__name__)


import re

def create_filename_from_format(filename_format, basename, num):
"""
Given a format string, build a filename from the basename and a given number.
Two named placeholders can be used: 'basename', which inserts the basename,
and 'num', which is mandatory.
"""
m = re.search(r'\{num', filename_format)
if not m:
raise ValueError("Missing named placeholder 'num' in format string")
return filename_format.format(basename=basename, num=num)
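Two hypothetical calls illustrating the intended behaviour of the format string (assuming the keyword-argument form above):

create_filename_from_format('{basename}_{num}.json', 'crab_mc', 3)   # -> 'crab_mc_3.json'
create_filename_from_format('{basename}.json', 'crab_mc', 3)         # raises ValueError, no 'num' placeholder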


def make_jobs(jar, xml, data_paths, drs_paths,
engine, queue, vmem, num_jobs, walltime, output_path=None):
engine, queue, vmem, num_jobs, walltime, output_path=None, filename_format="{basename}_{num}.json"):
jobs = []

data_partitions = np.array_split(data_paths, num_jobs)
@@ -30,8 +43,9 @@ def make_jobs(jar, xml, data_paths, drs_paths,
for num, (data, drs) in enumerate(zip(data_partitions, drs_partitions)):
df = pd.DataFrame({'data_path': data, 'drs_path': drs})
if output_path:
# create the filenames for each single local run
file_name, _ = path.splitext(path.basename(output_path))
file_name += "_{}.json".format(num)
file_name = create_filename_from_format(filename_format, file_name, num)
out_path = path.dirname(output_path)
run = [jar, xml, df, path.join(out_path, file_name)]
stream_runner = stream_runner_local
@@ -68,14 +82,19 @@ def make_jobs(jar, xml, data_paths, drs_paths,
@click.option("--log_level", type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
@click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
@click.option('--local', default=False, is_flag=True, help='Flag indicating whether jobs should be executed locally.', show_default=True)
@click.option('--local_output', default=False,is_flag=True,
@click.option('--local_output', default=False, is_flag=True,
help='Flag indicating whether jobs write their output locally '
+ 'to disk without gathering everything in the mother '
+ 'process. In this case the output file only contains a '
+ 'summary of the processed jobs. The data output will be '
+ 'inseparate files',
+ 'in separate files',
show_default=True)
def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_level, port, local, local_output):
@click.option('--mcdrs', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True))
@click.option('--mcwildcard', help="Gives the wildcard for searching the folder for files.", type=click.STRING, default='**/*_Events.fit*')
@click.option('--local_output_format', default="{basename}_{num}.json", help="Format string for the local output functionality. "
+ "'{basename}' is replaced by the output file's basename and '{num}' by the job number. "
+ "Default is: '{basename}_{num}.json'. Only works with option --local_output.")
def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_level, port, local, local_output, mcdrs, mcwildcard, local_output_format):
'''
Script to execute fact-tools on MonteCarlo files. Use the MC_PATH argument to specify the folders containing the MC files.
'''
@@ -99,13 +118,16 @@ def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_l
jarpath = path.abspath(jar)
xmlpath = path.abspath(xml)
drspath = erna.mc_drs_file()
if mcdrs:
drspath = mcdrs

logger.info('Using drs file at {}'.format(drspath))

#get data files
files=[]
for folder in tqdm(mc_path):
# print("Entering folder {}".format(folder))
pattern = path.join(folder, '**/*_Events.fit*')
pattern = path.join(folder, mcwildcard)
f = glob.glob(pattern, recursive=True)
files = files + f
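A rough sketch of what this collection step does for a single, made-up MC folder with the default --mcwildcard:

import glob
from os import path

folder = '/fact/sim/ceres'                        # placeholder
pattern = path.join(folder, '**/*_Events.fit*')   # default value of --mcwildcard
files = glob.glob(pattern, recursive=True)        # matches *_Events.fits and *_Events.fits.gz in all subfolders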

@@ -127,7 +149,7 @@ def main( jar, xml, out, mc_path, queue, walltime, engine, num_jobs, vmem, log_l
job_list = make_jobs(
jarpath, xmlpath, mc_paths_array,
drs_paths_array, engine, queue,
vmem, num_jobs, walltime, output_path=local_output_dir
vmem, num_jobs, walltime, output_path=local_output_dir, filename_format=local_output_format
)
else:
job_list = make_jobs(
12 changes: 6 additions & 6 deletions erna/scripts/process_fact_run_list.py
@@ -13,15 +13,15 @@
logger = logging.getLogger(__name__)


def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue,
def make_jobs(jar, xml, aux_source_path, output_directory, df_mapping, engine, queue,
vmem, num_jobs, walltime):
jobs = []
# create job objects
split_indices = np.array_split(np.arange(len(df_mapping)), num_jobs)
for num, indices in enumerate(split_indices):
df = df_mapping[indices.min(): indices.max()]

job = Job(stream_runner.run, [jar, xml, df, db_path],
job = Job(stream_runner.run, [jar, xml, df, aux_source_path],
queue=queue, walltime=walltime, engine=engine,
mem_free='{}mb'.format(vmem))
jobs.append(job)
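For orientation, np.array_split spreads the run indices over the requested number of jobs like this (made-up sizes):

import numpy as np

split_indices = np.array_split(np.arange(10), 3)
# -> [array([0, 1, 2, 3]), array([4, 5, 6]), array([7, 8, 9])]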
@@ -35,7 +35,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue,
@click.argument('file_list', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
@click.argument('jar', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
@click.argument('xml', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
@click.argument('db', type=click.Path(exists=True, dir_okay=False, file_okay=True, readable=True) )
@click.argument('aux_source', type=click.Path(exists=True, dir_okay=True, file_okay=True, readable=True) )
@click.argument('out', type=click.Path(exists=False, dir_okay=False, file_okay=True, readable=True) )
@click.option('--queue', help='Name of the queue you want to send jobs to.', default='short')
@click.option('--walltime', help='Estimated maximum walltime of your job in format hh:mm:ss.', default='02:00:00')
@@ -45,7 +45,7 @@ def make_jobs(jar, xml, db_path, output_directory, df_mapping, engine, queue,
@click.option("--log_level", type=click.Choice(['INFO', 'DEBUG', 'WARN']), help='increase output verbosity', default='INFO')
@click.option('--port', help='The port through which to communicate with the JobMonitor', default=12856, type=int)
@click.option('--local', default=False, is_flag=True, help='Flag indicating whether jobs should be executed locally.')
def main(file_list, jar, xml, db, out, queue, walltime, engine, num_jobs, vmem, log_level, port, local):
def main(file_list, jar, xml, aux_source, out, queue, walltime, engine, num_jobs, vmem, log_level, port, local):
'''
Specify the path to a .json file as created by the fetch_runs.py script via the FILE_LIST argument.
num_jobs will be created and executed on the cluster.
@@ -67,15 +67,15 @@ def main(file_list, jar, xml, db, out, queue, walltime, engine, num_jobs, vmem,
#get data files
jarpath = path.abspath(jar)
xmlpath = path.abspath(xml)
db_path = path.abspath(db)
aux_source_path = path.abspath(aux_source)
outpath = path.abspath(out)
output_directory = path.dirname(outpath)
#create dir if it doesn't exist
os.makedirs(output_directory, exist_ok=True)
logger.info("Writing output and temporary data to {}".format(output_directory))


job_list = make_jobs(jarpath, xmlpath, db_path, output_directory, df, engine, queue, vmem, num_jobs, walltime)
job_list = make_jobs(jarpath, xmlpath, aux_source_path, output_directory, df, engine, queue, vmem, num_jobs, walltime)
job_outputs = gridmap.process_jobs(job_list, max_processes=num_jobs, local=local)
erna.collect_output(job_outputs, out, df)

4 changes: 2 additions & 2 deletions erna/stream_runner.py
@@ -11,7 +11,7 @@
)


def run(jar, xml, input_files_df, db_path=None):
def run(jar, xml, input_files_df, aux_source_path=None):
'''
This is what will be executed on the cluster
'''
@@ -23,7 +23,7 @@ def run(jar, xml, input_files_df, db_path=None):
output_path = os.path.join(output_directory, "output.json")

input_files_df.to_json(input_path, orient='records', date_format='epoch')
call = assemble_facttools_call(jar, xml, input_path, output_path, db_path)
call = assemble_facttools_call(jar, xml, input_path, output_path, aux_source_path)

check_environment_on_node()
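A hypothetical local invocation of the runner (the jar, xml and paths below are placeholders), showing where the aux_source_path ends up:

import pandas as pd
from erna.stream_runner import run

df = pd.DataFrame({
    'data_path': ['/fact/raw/2017/10/18/20171018_042.fits.fz'],
    'drs_path': ['/fact/raw/2017/10/18/20171018_041.drs.fits.gz'],
})
run('/opt/fact-tools.jar', 'analysis.xml', df, aux_source_path='/fact/aux')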

