feat: Add logging for study definition stats and timings
rebkwok committed Apr 13, 2022
1 parent 127e3b5 commit a9b3567
Showing 4 changed files with 131 additions and 53 deletions.
116 changes: 68 additions & 48 deletions cohortextractor/cohortextractor.py
@@ -32,6 +32,8 @@
import cohortextractor
from cohortextractor.exceptions import DummyDataValidationError

from .log_utils import log_execution_time, log_stats

logger = structlog.get_logger()

notebook_tag = "opencorona-research"
@@ -148,16 +150,19 @@ def generate_cohort(
msg = "You can only provide dummy data for a single study definition"
raise DummyDataValidationError(msg)
for study_name, suffix in study_definitions:
_generate_cohort(
output_dir,
study_name,
suffix,
expectations_population,
dummy_data_file,
index_date_range=index_date_range,
skip_existing=skip_existing,
output_format=output_format,
)
with log_execution_time(
logger, f"generate_cohort for {study_name} (all index dates)"
):
_generate_cohort(
output_dir,
study_name,
suffix,
expectations_population,
dummy_data_file,
index_date_range=index_date_range,
skip_existing=skip_existing,
output_format=output_format,
)


def _generate_cohort(
@@ -186,25 +191,37 @@ def _generate_cohort(
study = load_study_definition(study_name)

os.makedirs(output_dir, exist_ok=True)
for index_date in _generate_date_range(index_date_range):

index_dates = _generate_date_range(index_date_range)
log_stats(logger, index_date_count=len(index_dates) if index_date_range else 0)
if index_date_range:
log_stats(logger, min_index_date=index_dates[-1], max_index_date=index_dates[0])

for index_date in index_dates:
log_event = f"generate_cohort for {study_name}"
if index_date is not None:
logger.info(f"Setting index_date to {index_date}")
study.set_index_date(index_date)
date_suffix = f"_{index_date}"
else:
date_suffix = ""
# If this is changed then the regex in `_generate_measures()`
# must be updated
output_file = f"{output_dir}/input{suffix}{date_suffix}.{output_format}"
if skip_existing and os.path.exists(output_file):
logger.info(f"Not regenerating pre-existing file at {output_file}")
else:
study.to_file(
output_file,
expectations_population=expectations_population,
dummy_data_file=dummy_data_file,
)
logger.info(f"Successfully created cohort and covariates at {output_file}")
log_event += f" at {index_date}"
with log_execution_time(logger, log_event):
if index_date is not None:
logger.info(f"Setting index_date to {index_date}")
study.set_index_date(index_date)
date_suffix = f"_{index_date}"
else:
date_suffix = ""
# If this is changed then the regex in `_generate_measures()`
# must be updated
output_file = f"{output_dir}/input{suffix}{date_suffix}.{output_format}"
if skip_existing and os.path.exists(output_file):
logger.info(f"Not regenerating pre-existing file at {output_file}")
else:
study.to_file(
output_file,
expectations_population=expectations_population,
dummy_data_file=dummy_data_file,
)
logger.info(
f"Successfully created cohort and covariates at {output_file}"
)


def _generate_date_range(date_range_str):
@@ -302,6 +319,8 @@ def _generate_measures(
measures = load_study_definition(study_name, value="measures")
measure_outputs = defaultdict(list)
filename_re = re.compile(rf"^input{re.escape(suffix)}.+\.({EXTENSION_REGEX})$")

log_stats(logger, measures_count=len(measures))
for file in os.listdir(output_dir):
if not filename_re.match(file):
continue
@@ -310,26 +329,27 @@
continue
filepath = os.path.join(output_dir, file)
logger.info(f"Calculating measures for {filepath}")
patient_df = None
for measure in measures:
logger.info(f"Calculating {measure.id}")
output_file = f"{output_dir}/measure_{measure.id}_{date}.csv"
measure_outputs[measure.id].append(output_file)
if skip_existing and os.path.exists(output_file):
logger.info(f"Not generating pre-existing file {output_file}")
continue
# We do this lazily so that if all corresponding output files
# already exist we can avoid loading the patient data entirely
if patient_df is None:
logger.info(f"Loading patient data from {filepath}")
patient_df = _load_dataframe_for_measures(filepath, measures)
logger.info(patient_df.memory_usage())

measure_df = measure.calculate(patient_df, _report)
logger.info(f"Data size for measure {measure.id}:")
logger.info(measure_df.memory_usage())
measure_df.to_csv(output_file, index=False)
logger.info(f"Created measure output at {output_file}")
with log_execution_time(logger, f"generate_measures for {filepath}"):
patient_df = None
for measure in measures:
logger.info(f"Calculating {measure.id}")
output_file = f"{output_dir}/measure_{measure.id}_{date}.csv"
measure_outputs[measure.id].append(output_file)
if skip_existing and os.path.exists(output_file):
logger.info(f"Not generating pre-existing file {output_file}")
continue
# We do this lazily so that if all corresponding output files
# already exist we can avoid loading the patient data entirely
if patient_df is None:
logger.info(f"Loading patient data from {filepath}")
patient_df = _load_dataframe_for_measures(filepath, measures)
logger.info(patient_df.memory_usage())

measure_df = measure.calculate(patient_df, _report)
logger.info(f"Data size for measure {measure.id}:")
logger.info(measure_df.memory_usage())
measure_df.to_csv(output_file, index=False)
logger.info(f"Created measure output at {output_file}")
if not measure_outputs:
logger.warn(
"No matching output files found. You may need to first run:\n"
17 changes: 17 additions & 0 deletions cohortextractor/log_utils.py
@@ -1,5 +1,7 @@
import datetime
import logging.config
import os
from contextlib import contextmanager

import structlog

@@ -60,3 +62,18 @@ def init_logging():
},
}
)


def log_stats(logger, **kwargs):
logger.info("cohortextractor-stats", **kwargs)


@contextmanager
def log_execution_time(logger, name):
start = datetime.datetime.utcnow()
try:
yield
finally:
log_stats(
logger, target=name, execution_time=str(datetime.datetime.utcnow() - start)
)
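
A minimal usage sketch of the two new helpers, assuming `cohortextractor.log_utils` is importable in the current environment; the event name and the `time.sleep` call are placeholders for real work:

import time

import structlog

from cohortextractor.log_utils import log_execution_time, log_stats

logger = structlog.get_logger()

# Emit a one-off "cohortextractor-stats" event; keyword arguments become
# structured log fields.
log_stats(logger, index_date_count=3)

# Time an arbitrary block of work. The elapsed time is logged even if the
# block raises, because the context manager wraps the yield in try/finally.
with log_execution_time(logger, "example_task"):
    time.sleep(0.1)  # stand-in for real work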
21 changes: 21 additions & 0 deletions cohortextractor/study_definition.py
@@ -17,6 +17,7 @@
)
from .exceptions import DummyDataValidationError
from .expectation_generators import generate
from .log_utils import log_stats
from .pandas_utils import dataframe_from_rows, dataframe_to_file, dataframe_to_rows
from .process_covariate_definitions import process_covariate_definitions
from .validate_dummy_data import validate_dummy_data
@@ -46,6 +47,25 @@ def __init__(
self.validate_study_definition()
self.backend = None

self.log_initial_stats(self._original_covariates)

def log_initial_stats(self, covariate_definitions):
"""
Log some initial stats about the study definition
"""
log_stats(logger, variable_count=len(covariate_definitions))
codelist_counts = {
variable_name: len(def_item["codelist"])
for variable_name, definition in covariate_definitions.items()
for def_item in definition
if isinstance(def_item, dict) and "codelist" in def_item
}
log_stats(logger, variables_using_codelist_count=len(codelist_counts))
for variable, codelist_count in codelist_counts.items():
log_stats(
logger, variable_using_codelist=variable, codelist_size=codelist_count
)

def set_index_date(self, index_date):
"""
Re-evaluate all date expressions in the covariate definitions and the
@@ -64,6 +84,7 @@ def set_index_date(self, index_date):
)
)
if self.backend:
log_stats(logger, resetting_backend_index_date=index_date)
self.recreate_backend()

def to_file(
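
For context, the comprehension in `log_initial_stats` only counts definition entries that are dicts carrying a "codelist" key. A standalone sketch of the same logic follows; the covariate structure used here is a simplified assumption for illustration, not the real internal shape built by `StudyDefinition`:

def count_codelist_sizes(covariate_definitions):
    # Mirrors the comprehension in log_initial_stats: for each variable,
    # look through its definition items and record the length of any
    # codelist found in a dict-valued item.
    return {
        variable_name: len(def_item["codelist"])
        for variable_name, definition in covariate_definitions.items()
        for def_item in definition
        if isinstance(def_item, dict) and "codelist" in def_item
    }


# Illustrative input only (assumed shape)
example_definitions = {
    "age": ("age_as_of", {"reference_date": "2020-01-01"}),
    "asthma": ("with_these_clinical_events", {"codelist": ["A1", "A2", "A3"]}),
}
print(count_codelist_sizes(example_definitions))  # {'asthma': 3}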
30 changes: 25 additions & 5 deletions cohortextractor/tpp_backend.py
@@ -10,6 +10,7 @@
from .csv_utils import is_csv_filename, write_rows_to_csv
from .date_expressions import MSSQLDateFormatter
from .expressions import format_expression
from .log_utils import log_execution_time, log_stats
from .mssql_utils import (
mssql_connection_params_from_url,
mssql_dbapi_connection_from_url,
@@ -99,10 +100,14 @@ def check_ids_and_log(results):
# Special handling for CSV as we can stream this directly to disk
# without building a dataframe in memory
if is_csv_filename(filename):
write_rows_to_csv(results, filename)
with log_execution_time(logger, f"write_rows_to_csv {filename}"):
write_rows_to_csv(results, filename)
else:
df = dataframe_from_rows(self.covariate_definitions, results)
dataframe_to_file(df, filename)
with log_execution_time(
logger, f"Create df and write dataframe_to_file {filename}"
):
df = dataframe_from_rows(self.covariate_definitions, results)
dataframe_to_file(df, filename)

self.execute_queries(
[f"-- Deleting '{output_table}'\nDROP TABLE {output_table}"]
@@ -368,6 +373,14 @@ def get_queries(self, covariate_definitions):
for sql_list in table_queries.values():
all_queries.extend(sql_list)
all_queries.append(joined_output_query)

log_stats(
logger,
output_column_count=len(output_columns),
table_count=len(table_queries),
table_joins_count=len(joins),
)

return all_queries

def get_column_expression(self, column_type, source, returning, date_format=None):
@@ -532,8 +545,15 @@ def execute_queries(self, queries):
for query in queries:
comment_match = re.match(r"^\s*\-\-\s*(.+)\n", query)
if comment_match:
logger.info(f"Running: {comment_match.group(1)}")
cursor.execute(query)
event_name = comment_match.group(1)
logger.info(f"Running: {event_name}")
else:
# Log either the initial comment line or the first 50
# characters of the query
event_name = f"{query[:50] if len(query) > 50 else query}..."
with log_execution_time(logger, event_name):
cursor.execute(query)

return cursor

def get_queries_for_column(
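
The event-naming logic added to `execute_queries` prefers a leading SQL comment and otherwise falls back to a truncated query string. A self-contained sketch of that behaviour; the function name `query_event_name` and the `max_length` parameter are introduced here purely for illustration:

import re


def query_event_name(query, max_length=50):
    # Prefer a leading "-- comment" line as the event name; otherwise fall
    # back to the query text, truncated to max_length characters.
    comment_match = re.match(r"^\s*\-\-\s*(.+)\n", query)
    if comment_match:
        return comment_match.group(1)
    return f"{query[:max_length] if len(query) > max_length else query}..."


print(query_event_name("-- Deleting 'output'\nDROP TABLE output"))
# -> Deleting 'output'
print(query_event_name("SELECT patient_id FROM output"))
# -> SELECT patient_id FROM output...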
