diff --git a/.github/workflows/build-auth-check.yml b/.github/workflows/build-experiment.yml similarity index 95% rename from .github/workflows/build-auth-check.yml rename to .github/workflows/build-experiment.yml index 4c5a32d..17968d8 100644 --- a/.github/workflows/build-auth-check.yml +++ b/.github/workflows/build-experiment.yml @@ -1,7 +1,7 @@ -name: Build on push to auth-check +name: Build on push to experiment on: push: - branches: [ auth-check ] + branches: [ experiment ] jobs: build_commit: diff --git a/requirements.txt b/requirements.txt index 171b439..adbcaad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,5 @@ flask==2.0.3 gunicorn==20.1.0 requests==2.27.0 pandas==1.3.5 +retrying==1.3.4 +Werkzeug==2.0.3 \ No newline at end of file diff --git a/src/app.py b/src/app.py index 8e3139a..1d44a9c 100644 --- a/src/app.py +++ b/src/app.py @@ -1,11 +1,14 @@ +from datetime import datetime from flask import Flask, jsonify, request import os import pandas as pd import csv +import logging # from data_processing import * from data_loading import * + # ---------------------------------------------------------------------------- # DATA PARAMETERS # ---------------------------------------------------------------------------- @@ -118,6 +121,25 @@ app = Flask(__name__) app.debug = True +gunicorn_logger = logging.getLogger('gunicorn.error') +app.logger = logging.getLogger("datastore_app") +app.logger.handlers = gunicorn_logger.handlers +app.logger.setLevel(logging.DEBUG) + +logger = logging.getLogger('werkzeug') +logger.addHandler = gunicorn_logger.handlers +logger.setLevel(logging.DEBUG) + +@app.before_request +def before_request_log(): + app.logger.debug(f"{request.remote_addr} \"{request.method} {request.url}\"") + +@app.after_request +def after_request_log(response): + app.logger.debug(f"{request.remote_addr} \"{request.method} {request.url}\" {response.status_code}") + return response + + # APIS: try to load new data, if doesn't work, get most recent @app.route("/api/apis") @@ -130,15 +152,17 @@ def api_imaging(): global api_data_index global api_data_cache try: + tapis_token = get_tapis_token(request) if not api_data_index['imaging'] or not check_data_current(datetime.strptime(api_data_index['imaging'], datetime_format)): api_date = datetime.now().strftime(datetime_format) - imaging_data = get_api_imaging_data(request) + imaging_data = get_api_imaging_data(tapis_token) if imaging_data: + app.logger.info(f"Caching imaging report data. Date: {api_date}") api_data_cache['imaging'] = imaging_data api_data_index['imaging'] = api_date return jsonify({'date': api_data_index['imaging'], 'data': api_data_cache['imaging']}) except Exception as e: - traceback.print_exc() + app.logger.error(("Error in imaging API request: {0}").format(str(e))) return jsonify('error: {}'.format(e)) @app.route("/api/consort") @@ -147,10 +171,12 @@ def api_consort(): global api_data_index global api_data_cache # try: + tapis_token = get_tapis_token(request) if not api_data_index['consort'] or not check_data_current(datetime.strptime(api_data_index['consort'], datetime_format)): api_date = datetime.now().strftime(datetime_format) - consort_data_json = get_api_consort_data(request) + consort_data_json = get_api_consort_data(tapis_token) if consort_data_json: + app.logger.info(f"Caching consort report data. Date: {api_date}") api_data_cache['consort'] = consort_data_json api_data_index['consort'] = api_date return jsonify({'date': api_data_index['consort'], 'data': api_data_cache['consort']}) @@ -165,10 +191,12 @@ def api_blood(): global api_data_index global api_data_cache try: + tapis_token = get_tapis_token(request) if not api_data_index['blood'] or not check_data_current(datetime.strptime(api_data_index['blood'], datetime_format)): api_date = datetime.now().strftime(datetime_format) - blood_data, blood_data_request_status = get_api_blood_data(request) + blood_data, blood_data_request_status = get_api_blood_data(tapis_token) if blood_data: + app.logger.info(f"Caching blood api response data. Date: {api_date}") api_data_index['blood'] = api_date api_data_cache['blood'] = blood_data @@ -180,7 +208,7 @@ def api_blood(): return jsonify({'date': api_data_index['blood'], 'data': api_data_cache['blood']}) except Exception as e: - traceback.print_exc() + app.logger.error(("Error in blood API request: {0}").format(str(e))) return jsonify('error: {}'.format(e)) @@ -192,20 +220,22 @@ def api_subjects(): global subjects_raw_cols_for_reports try: + tapis_token = get_tapis_token(request) if not api_data_index['subjects'] or not check_data_current(datetime.strptime(api_data_index['subjects'], datetime_format)): api_date = datetime.now().strftime(datetime_format) - latest_subjects_json = get_api_subjects_json(request) + latest_subjects_json = get_api_subjects_json(tapis_token) if latest_subjects_json: # latest_data = create_clean_subjects(latest_subjects_json, screening_sites, display_terms_dict, display_terms_dict_multi) latest_data = process_subjects_data(latest_subjects_json,subjects_raw_cols_for_reports,screening_sites, display_terms_dict, display_terms_dict_multi) - + app.logger.info(f"Caching subjects api response data. Date: {api_date}") api_data_cache['subjects'] = latest_data api_data_index['subjects'] = api_date return jsonify({'date': api_data_index['subjects'], 'data': api_data_cache['subjects']}) except Exception as e: - traceback.print_exc() + app.logger.error(("Error in subjects API request: {0}").format(str(e))) return jsonify('error: {}'.format(e)) + def api_tester(): global local_subjects_data @@ -236,6 +266,5 @@ def api_simple(): else: return jsonify('not found') - if __name__ == "__main__": app.run(host='0.0.0.0') diff --git a/src/data_loading.py b/src/data_loading.py index 235e0ae..3ec6b8c 100644 --- a/src/data_loading.py +++ b/src/data_loading.py @@ -11,11 +11,13 @@ import datetime from datetime import datetime +from retrying import retry import logging -logger = logging.getLogger(__name__) files_api_root = os.environ.get('FILES_API_ROOT') portal_api_root = os.environ.get('PORTAL_API_ROOT') +logger = logging.getLogger("datastore_app") + # ---------------------------------------------------------------------------- # Updating data checks @@ -198,12 +200,11 @@ def get_local_subjects_raw(data_directory): # LOAD DATA FROM API # ---------------------------------------------------------------------------- -def get_api_consort_data(api_request, +def get_api_consort_data(tapis_token, report='consort', report_suffix = 'consort-data-[mcc]-latest.csv'): '''Load data for a specified consort file. Handle 500 server errors''' try: - tapis_token = get_tapis_token(api_request) if tapis_token: cosort_columns = ['source','target','value', 'mcc'] @@ -219,7 +220,7 @@ def get_api_consort_data(api_request, for mcc in mcc_list: filename = report_suffix.replace('[mcc]',str(mcc)) csv_url = '/'.join([files_api_root, report, filename]) - csv_request = requests.get(csv_url, headers={'X-Tapis-Token': tapis_token}) + csv_request = make_report_data_request(csv_url, tapis_token) csv_content = csv_request.content try: csv_df = pd.read_csv(io.StringIO(csv_content.decode('utf-8')), usecols=[0,1,2], header=None) @@ -239,7 +240,7 @@ def get_api_consort_data(api_request, return consort_data_json else: - logger.warning("Unauthorized attempt to access Consort data") + logger.exception("Unauthorized attempt to access Consort data") return None except Exception as e: @@ -250,11 +251,9 @@ def get_api_consort_data(api_request, ## Function to rebuild dataset from apis -def get_api_imaging_data(api_request): +def get_api_imaging_data(tapis_token): ''' Load data from imaging api. Return bad status notice if hits Tapis API''' - try: - tapis_token = get_tapis_token(api_request) - + try: if tapis_token: api_dict = { 'subjects':{'subjects1': 'subjects-1-latest.json','subjects2': 'subjects-2-latest.json'}, @@ -264,7 +263,7 @@ def get_api_imaging_data(api_request): # IMAGING imaging_filepath = '/'.join([files_api_root,'imaging',api_dict['imaging']['imaging']]) - imaging_request = requests.get(imaging_filepath, headers={'X-Tapis-Token': tapis_token}) + imaging_request = make_report_data_request(imaging_filepath, tapis_token) if imaging_request.status_code == 200: imaging = pd.read_csv(io.StringIO(imaging_request.content.decode('utf-8'))) else: @@ -272,7 +271,7 @@ def get_api_imaging_data(api_request): qc_filepath = '/'.join([files_api_root,'imaging',api_dict['imaging']['qc']]) - qc_request = requests.get(qc_filepath, headers={'X-Tapis-Token': tapis_token}) + qc_request = make_report_data_request(qc_filepath, tapis_token) if qc_request.status_code == 200: qc = pd.read_csv(io.StringIO(qc_request.content.decode('utf-8'))) else: @@ -286,7 +285,7 @@ def get_api_imaging_data(api_request): return imaging_data_json else: - logger.warning("Unauthorized attempt to access Imaging data") + logger.exception("Unauthorized attempt to access Imaging data") return None except Exception as e: @@ -295,12 +294,10 @@ def get_api_imaging_data(api_request): ## Function to rebuild dataset from apis -def get_api_blood_data(api_request): +def get_api_blood_data(tapis_token): ''' Load blood data from api''' try: current_datetime = datetime.now() - tapis_token = get_tapis_token(api_request) - if tapis_token: api_dict = { 'subjects':{'subjects1': 'subjects-1-latest.json','subjects2': 'subjects-2-latest.json'}, @@ -310,10 +307,10 @@ def get_api_blood_data(api_request): # BLOOD blood1_filepath = '/'.join([files_api_root,'blood',api_dict['blood']['blood1']]) - blood1_request = requests.get(blood1_filepath, headers={'X-Tapis-Token': tapis_token}) + blood1_request = make_report_data_request(blood1_filepath, tapis_token) blood2_filepath = '/'.join([files_api_root,'blood',api_dict['blood']['blood2']]) - blood2_request = requests.get(blood2_filepath, headers={'X-Tapis-Token': tapis_token}) + blood2_request = make_report_data_request(blood2_filepath, tapis_token) if blood1_request.status_code == 200: blood1 = blood1_request.json() @@ -345,7 +342,7 @@ def get_api_blood_data(api_request): return blood_data_json, request_status else: - logger.warning("Unauthorized attempt to access Blood data") + logger.exception("Unauthorized attempt to access Blood data") return None except Exception as e: @@ -354,15 +351,13 @@ def get_api_blood_data(api_request): -def get_api_subjects_json(api_request): +def get_api_subjects_json(tapis_token): ''' Load subjects data from api. Note data needs to be cleaned, etc. to create properly formatted data product''' - try: - tapis_token = get_tapis_token(api_request) - + try: if tapis_token: # Load Json Data subjects1_filepath = '/'.join([files_api_root,'subjects','subjects-1-latest.json']) - subjects1_request = requests.get(subjects1_filepath, headers={'X-Tapis-Token': tapis_token}) + subjects1_request = make_report_data_request(subjects1_filepath, tapis_token) if subjects1_request.status_code == 200: subjects1 = subjects1_request.json() else: @@ -370,7 +365,7 @@ def get_api_subjects_json(api_request): # return {'status':'500', 'source': api_dict['subjects']['subjects1']} subjects2_filepath = '/'.join([files_api_root,'subjects','subjects-2-latest.json']) - subjects2_request = requests.get(subjects2_filepath, headers={'X-Tapis-Token': tapis_token}) + subjects2_request = make_report_data_request(subjects2_filepath, tapis_token) if subjects2_request.status_code == 200: subjects2 = subjects2_request.json() else: @@ -382,26 +377,38 @@ def get_api_subjects_json(api_request): return subjects_json else: - logger.warning("Unauthorized attempt to access Subjects data") + logger.exception("Unauthorized attempt to access Subjects data") return None except Exception as e: traceback.print_exc() return None +# Retry handler for requests +@retry(wait_exponential_multiplier=500, wait_exponential_max=5000, stop_max_attempt_number=3) +def make_request_with_retry(url, cookies): + '''Use exponential retry with requests.''' + return requests.get(url, cookies=cookies) + +# Get Tapis token if authorized to access data files def get_tapis_token(api_request): - try: - response = requests.get(portal_api_root + '/auth/tapis/', cookies=api_request.cookies) - #headers={'cookie':'coresessionid=' + api_request.cookies.get('coresessionid')}) - if response: - tapis_token = response.json()['token'] - return tapis_token - else: - logger.warning("Unauthorized to access tapis token") - raise Exception - except Exception as e: - logger.warning('portal api error: {}'.format(e)) - return False + '''Get tapis token using the session cookie. If the session is not authenticated, this will fail.''' + session_id = api_request.cookies.get("coresessionid") + if session_id is None: + raise Exception("Missing session id") + cookies = {'coresessionid':session_id} + response = make_request_with_retry(portal_api_root + '/auth/tapis/', cookies) + + response.raise_for_status() + tapis_token = response.json()['token'] + logger.info("Received tapis token.") + return tapis_token + +def make_report_data_request(url, tapis_token): + logger.info(f"Sending request to {url}") + response = requests.get(url, headers={'X-Tapis-Token': tapis_token}) + logger.info(f'Response status code: {response.status_code}') + return response # ---------------------------------------------------------------------------- # PROCESS SUBJECTS DATA