diff --git a/src/tlo/util.py b/src/tlo/util.py
index 77924e4fa3..168b1d41a1 100644
--- a/src/tlo/util.py
+++ b/src/tlo/util.py
@@ -1,11 +1,12 @@
 """This file contains helpful utility functions."""
 import hashlib
 from collections import defaultdict
-from typing import Dict, List, Optional, Set, Union
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Set, Union
 
 import numpy as np
 import pandas as pd
-from pandas import DateOffset
+from pandas import DataFrame, DateOffset
 
 from tlo import Population, Property, Types
 
@@ -436,3 +437,76 @@ def get_person_id_to_inherit_from(child_id, mother_id, population_dataframe, rng
         return abs(mother_id)
     elif mother_id >= 0:
         return mother_id
+
+
+def convert_excel_files_to_csv(
+    folder: Path, files: Optional[List[str]] = None, *, delete_excel_files: bool = False
+) -> None:
+    """ convert Excel files to csv files.
+
+    For each Excel file a directory named after the file (its path without the suffix) is created next to it and
+    every worksheet is written into that directory as `{sheet name}.csv`.
+
+    :param folder: Folder containing Excel files.
+    :param files: List of Excel file names to convert to csv files. When `None`, all Excel files in the folder and
+        subsequent folders within this folder will be converted to csv files with Excel file name becoming
+        folder name and sheet names becoming csv file names.
+    :param delete_excel_files: When true, the Excel file we are generating csv files from will get deleted.
+    """
+    # get path to Excel files. If none is given or found the loop below has nothing to do
+    if files is None:
+        excel_file_paths = sorted(folder.rglob("*.xlsx"))
+    else:
+        excel_file_paths = [folder / file for file in files]
+
+    for excel_file_path in excel_file_paths:
+        sheet_dataframes: Dict[Any, DataFrame] = pd.read_excel(excel_file_path, sheet_name=None)
+        excel_file_directory: Path = excel_file_path.with_suffix("")
+        # Create a container directory for per sheet CSVs
+        if excel_file_directory.exists():
+            print(f"Directory {excel_file_directory} already exists")
+        else:
+            excel_file_directory.mkdir()
+        # Write a CSV for each worksheet
+        for sheet_name, dataframe in sheet_dataframes.items():
+            dataframe.to_csv(excel_file_directory / f"{sheet_name}.csv", index=False)
+
+        if delete_excel_files:
+            # Remove no longer needed Excel file. `excel_file_path` already includes `folder`, so joining it to
+            # `folder` again would point at a non-existent file whenever `folder` is a relative path.
+            excel_file_path.unlink()
+
+
+def read_csv_files(folder: Path, files: Optional[List[str]] = None) -> Union[DataFrame, Dict[str, DataFrame]]:
+    """
+    A function to read CSV files in a similar way pandas reads Excel files (:py:func:`pandas.read_excel`).
+
+    NB: Converting Excel files to csv files caused all columns that had no relevant data to simulation (i.e.
+    parameter descriptions or data references) to be given names starting with `Unnamed` in the csv files. These
+    columns are silently dropped from every dataframe that is returned.
+
+    :param folder: Path to folder containing CSV files to read.
+    :param files: preferred csv file name(s), with or without the `.csv` suffix. This is the same as sheet
+        names in Excel file. Note that if None (no files selected) then all csv files in the containing folder
+        and its sub-folders will be loaded, keyed by file name without suffix.
+    :return: a dataframe if exactly one file was loaded, otherwise a dictionary of dataframes keyed by the names
+        given in `files` (or by file name without suffix when `files` is None).
+    """
+    if files is None:
+        # sort so that the loading order does not depend on the file system
+        csv_file_paths = {csv_file_path.stem: csv_file_path for csv_file_path in sorted(folder.rglob("*.csv"))}
+    else:
+        # append the suffix rather than use `Path.with_suffix`, which would cut a name such as `a.b` down to `a`
+        csv_file_paths = {
+            f_name: folder / (f_name if f_name.endswith(".csv") else f"{f_name}.csv") for f_name in files
+        }
+
+    all_data: Dict[str, DataFrame] = {}  # dataframes dictionary
+    for f_name, csv_file_path in csv_file_paths.items():
+        dataframe = pd.read_csv(csv_file_path)
+        # silently drop all columns that have no relevant data to simulation (name starting with `Unnamed`)
+        unnamed_columns = [column for column in dataframe.columns if str(column).startswith("Unnamed")]
+        all_data[f_name] = dataframe.drop(columns=unnamed_columns)
+
+    # If only one file loaded return dataframe directly rather than dict
+    return next(iter(all_data.values())) if len(all_data) == 1 else all_data
diff --git a/tests/resources/ResourceFile_test_convert_to_csv/ResourceFile_test_convert_to_csv.xlsx b/tests/resources/ResourceFile_test_convert_to_csv/ResourceFile_test_convert_to_csv.xlsx
new file mode 100644
index 0000000000..84edbd2636
--- /dev/null
+++ b/tests/resources/ResourceFile_test_convert_to_csv/ResourceFile_test_convert_to_csv.xlsx
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:af1a6a6aa24a7de385efdf1564da3e3abfbba9fe467d92212b5c87b127e899f6
+size 10714
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 02ae63b7ba..1022c95010 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -1,6 +1,7 @@
 """Unit tests for utility functions."""
 import os
 import pickle
+import shutil
 import string
 import types
 from pathlib import Path
@@ -14,7 +15,7 @@
 from tlo import Date, Simulation
 from tlo.analysis.utils import parse_log_file
 from tlo.methods import demography
-from tlo.util import DEFAULT_MOTHER_ID
+from tlo.util import DEFAULT_MOTHER_ID, convert_excel_files_to_csv, read_csv_files
 
 path_to_files = Path(os.path.dirname(__file__))
 
@@ -317,3 +318,164 @@ def check_hash_is_valid(dfh):
             # check hash differs for different dataframes
             if not dataframes[i].equals(dataframes[j]):
                 assert df_hash != tlo.util.hash_dataframe(dataframes[j])
+
+
+def copy_files_to_temporal_directory_and_return_path(tmpdir):
+    """ copy resource files in tests/resources to a temporary directory and return its path
+
+    :param tmpdir: path to a temporary directory
+
+    """
+    resource_filepath = path_to_files / 'resources'
+    tmpdir_resource_filepath = Path(tmpdir / 'resources')
+    shutil.copytree(resource_filepath, tmpdir_resource_filepath)
+    return tmpdir_resource_filepath
+
+
+def test_read_csv_method_with_no_file(tmpdir):
+    """ read csv method when no file name is supplied
+        i) should return dictionary.
+        ii) dictionary keys should match csv file names in resource folder
+        iii) all dictionary values should be dataframes
+
+    :param tmpdir: path to a temporary directory
+
+    """
+    tmpdir_resource_filepath = copy_files_to_temporal_directory_and_return_path(tmpdir)
+    file_names = [csv_file_path.stem for csv_file_path in tmpdir_resource_filepath.rglob("*.csv")]
+    df_no_files = read_csv_files(tmpdir_resource_filepath)
+    assert isinstance(df_no_files, dict)
+    assert set(df_no_files.keys()) == set(file_names)
+    assert all(isinstance(value, pd.DataFrame) for value in df_no_files.values())
+
+
+def test_read_csv_method_with_one_file(tmpdir):
+    """ test read csv method when one file name is supplied. should return a dataframe
+
+    :param tmpdir: path to a temporary directory
+
+    """
+    tmpdir_resource_filepath = copy_files_to_temporal_directory_and_return_path(tmpdir)
+    df = read_csv_files(tmpdir_resource_filepath, files=['df_at_healthcareseeking'])
+    assert isinstance(df, pd.DataFrame)
+
+
+def test_read_csv_method_with_multiple_files(tmpdir):
+    """ read csv method when multiple file names are supplied.
+        i) should return dictionary.
+        ii) dictionary keys should match supplied file names
+        iii) all dictionary values should be dataframes
+
+    :param tmpdir: path to a temporary directory
+
+    """
+    tmpdir_resource_filepath = copy_files_to_temporal_directory_and_return_path(tmpdir)
+    file_names = ['df_at_healthcareseeking', 'df_at_init_of_lifestyle']
+    df_dict = read_csv_files(tmpdir_resource_filepath, files=file_names)
+    assert isinstance(df_dict, dict)
+    assert set(df_dict.keys()) == set(file_names)
+    assert all(isinstance(dataframe, pd.DataFrame) for dataframe in df_dict.values())
+
+
+def test_read_csv_method_output_matches_previously_used_read_excel(tmpdir):
+    """ check read from csv method produces same output as the read Excel file
+
+    :param tmpdir: path to a temporary directory
+
+    """
+    tmpdir_resource_filepath = copy_files_to_temporal_directory_and_return_path(tmpdir)
+    excel_file_path = (tmpdir_resource_filepath
+                       / 'ResourceFile_test_convert_to_csv/ResourceFile_test_convert_to_csv.xlsx')
+    # read all sheets from the Excel file, closing the workbook once done so it is not left open
+    with pd.ExcelFile(excel_file_path) as xls:
+        sheet_names = xls.sheet_names
+        df_excel = pd.read_excel(xls, sheet_name=sheet_names)
+
+    # convert the above Excel file into csv equivalent. we will use the newly converted files to determine if
+    # loading parameters from Excel file will be equal to loading parameters from the converted csv files
+    convert_excel_files_to_csv(folder=excel_file_path.parent, files=[excel_file_path.name])
+
+    # read newly converted csv files using read_csv_files method. `with_suffix` is used to get the folder holding
+    # the csv files as splitting the path on `.` breaks if any directory name in the path contains a dot
+    df_csv = read_csv_files(excel_file_path.with_suffix(""), files=sheet_names)
+
+    # dictionary keys from both dataframe dictionaries should match
+    assert isinstance(df_excel, dict) and isinstance(df_csv, dict)
+    assert df_excel.keys() == df_csv.keys()
+    for key in df_excel:
+        assert df_excel[key].astype(str).equals(df_csv[key].astype(str))
+
+
+def test_convert_excel_files_method(tmpdir):
+    """ Test converting Excel files to csv equivalent is done as expected
+
+    1) Excel file name should become the name of the folder containing the newly converted csv files
+    2) Excel file sheet names should become csv file names
+    3) if files are given, the function should convert to csv only those given files in a folder
+    4) if no files are given, all Excel files in the parent folder and subsequent folders within the parent folder
+       should get converted to csv files
+
+    :param tmpdir: path to a temporary directory
+
+    """
+
+    def check_logic_of_converting_excel_files_to_csv_files(folder: Path, files: list) -> None:
+        """ check converting Excel files to csv files is done as expected
+        1) check that a new directory to hold the newly created csv files has been created
+        2) check that this new directory name matches the Excel file name it has been created from
+        3) check csv files are created and that the csv names should match sheet names of an Excel file they
+           have been created from
+        """
+        for excel_file_path in [folder / file for file in files]:
+            # read the sheet names and close the workbook straight away
+            with pd.ExcelFile(excel_file_path) as xl:
+                sheet_names = xl.sheet_names
+            path_to_new_directory = excel_file_path.with_suffix("")
+            # new folder should be created
+            assert path_to_new_directory.exists() and path_to_new_directory.is_dir()
+            # the new folder name should be the same as the Excel file name
+            assert excel_file_path.stem == path_to_new_directory.name
+            for sheet_name in sheet_names:
+                path_to_new_file = path_to_new_directory / f'{sheet_name}.csv'
+                # new csv file(s) should be created with name(s) resembling sheet name(s) in excel file
+                assert path_to_new_file.exists() and path_to_new_file.is_file()
+                assert sheet_name == path_to_new_file.stem
+
+    # copy resource files to a temporary directory
+    tmpdir_resourcefilepath = copy_files_to_temporal_directory_and_return_path(tmpdir)
+
+    # check convert to csv logic when a list of file name(s) is given
+    excel_file = ['ResourceFile_load-parameters.xlsx']
+    convert_excel_files_to_csv(tmpdir_resourcefilepath, files=excel_file)
+    # check new folder containing csv file is created. The folder name and csv file name should resemble the supplied
+    # Excel file name and sheet name respectively
+    check_logic_of_converting_excel_files_to_csv_files(tmpdir_resourcefilepath, files=excel_file)
+
+    # check convert to csv logic when no list of file name(s) is given
+    excel_files = sorted(tmpdir_resourcefilepath.rglob("*.xlsx"))
+    # guard against the checks below silently passing because no Excel file was found
+    assert excel_files
+
+    convert_excel_files_to_csv(tmpdir_resourcefilepath)
+    # check behaviours are as expected. New folders containing csv files should be created with names resembling the
+    # Excel file they were created from
+    check_logic_of_converting_excel_files_to_csv_files(tmpdir_resourcefilepath, excel_files)
+
+
+def test_convert_excel_files_method_can_delete_excel_files(tmpdir):
+    """ Test an Excel file is deleted after being converted when `delete_excel_files` is true, including when the
+    folder is given as a path relative to the working directory
+
+    :param tmpdir: path to a temporary directory
+
+    """
+    tmpdir_resource_filepath = copy_files_to_temporal_directory_and_return_path(tmpdir)
+    excel_file_name = 'ResourceFile_load-parameters.xlsx'
+    # run from inside the temporary directory so that the folder can be given as a relative path
+    with tmpdir.as_cwd():
+        convert_excel_files_to_csv(Path('resources'), files=[excel_file_name], delete_excel_files=True)
+
+    excel_file_path = tmpdir_resource_filepath / excel_file_name
+    # the Excel file should be gone and the folder holding its csv files should be there
+    assert not excel_file_path.exists()
+    assert excel_file_path.with_suffix("").is_dir()