diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index 4dc33409..19fa573f 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -1,8 +1,10 @@ +import itertools +from memory_profiler import profile from time import time -from typing import Callable, Dict, List, Any, Tuple, Union, Sequence, Mapping +from typing import Callable, Dict, Generator, List, Any, Tuple, Union, Sequence, Mapping from tqdm.auto import tqdm -from cadCAD.utils import flatten +from cadCAD.utils import flatten, lazy_flatten from cadCAD.utils.execution import print_exec_info from cadCAD.configuration import Configuration, Processor from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs, configs_as_dicts @@ -80,6 +82,7 @@ def __init__(self, self.configs = configs self.empty_return = empty_return + @profile def execute(self) -> Tuple[object, object, Dict[str, object]]: if self.empty_return is True: return [], [], [] @@ -142,21 +145,44 @@ def get_final_dist_results(simulations: List[StateHistory], psu, ep) for psu, ep in list(zip(psus, eps))] return simulations, tensor_fields, sessions + def get_final_results_lazy(simulations: Generator, + psus: List[StateUpdateBlocks], + eps, + sessions: List[SessionDict], + remote_threshold: int): + is_generator: bool = isinstance(simulations, Generator) + if is_generator == False: + raise ValueError( + 'Invalid simulation results (Executor output is not a Generator required for lazy execution)') + + tensor_fields = [] + # NOTE here we change the result type to iterable + tensor_fields = itertools.chain.from_iterable( + map(create_tensor_field, zip(psus, eps))) + + flat_simulations = map( + lazy_flatten, map(lazy_flatten, simulations)) + + # NOTE here we change the result type, which is now an iterable + iterable_flat_simulations = itertools.chain.from_iterable( + flat_simulations) + + return iterable_flat_simulations, tensor_fields, sessions + def get_final_results(simulations: List[StateHistory], psus: List[StateUpdateBlocks], eps, sessions: List[SessionDict], remote_threshold: int): - + # if list of lists of lists of dicts: do flatten # if list of dicts: do not flatetn # else raise error - init: bool = isinstance(simulations, Sequence) failed_1 = False failed_2 = False - + try: init: bool = isinstance(simulations, Sequence) dont_flatten = init & isinstance(simulations[0], Mapping) @@ -174,8 +200,8 @@ def get_final_results(simulations: List[StateHistory], do_flatten = False if failed_1 and failed_2: - raise ValueError('Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])') - + raise ValueError( + 'Invalid simulation results (Executor output is not list[dict] or list[list[list[dict]]])') flat_timesteps, tensor_fields = [], [] for sim_result, psu, ep in tqdm(list(zip(simulations, psus, eps)), @@ -184,7 +210,7 @@ def get_final_results(simulations: List[StateHistory], if do_flatten: flat_timesteps.append(flatten(sim_result)) tensor_fields.append(create_tensor_field(psu, ep)) - + if do_flatten: flat_simulations = flatten(flat_timesteps) else: @@ -209,15 +235,19 @@ def get_final_results(simulations: List[StateHistory], else: raise ValueError("Invalid execution mode specified") - print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs, ExpIDs, SubsetIDs, SubsetWindows, original_N, self.additional_objs ) - final_result = get_final_results( - simulations_results, partial_state_updates, eps, sessions, remote_threshold) + if (self.additional_objs is not None and self.additional_objs.get('lazy_eval', False)): + final_result = get_final_results_lazy( + simulations_results, partial_state_updates, eps, sessions, remote_threshold) + else: + final_result = get_final_results( + simulations_results, partial_state_updates, eps, sessions, remote_threshold) + elif self.exec_context == ExecutionMode.distributed: print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( @@ -228,6 +258,6 @@ def get_final_results(simulations: List[StateHistory], simulations_results, partial_state_updates, eps, sessions) t2 = time() - print(f"Total execution time: {t2 - t1 :.2f}s") + print(f"Total execution time: {t2 - t1:.2f}s") return final_result diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py index 97a5fa87..abd396a4 100644 --- a/cadCAD/engine/execution.py +++ b/cadCAD/engine/execution.py @@ -1,8 +1,14 @@ -from typing import Callable, Dict, List, Any, Tuple, Sequence -from pathos.multiprocessing import ProcessPool # type: ignore +import os +from typing import Callable, Dict, Generator, List, Any, Tuple, Sequence +from pathos.multiprocessing import ProcessPool # type: ignore from collections import Counter from cadCAD.types import * -from cadCAD.utils import flatten +from cadCAD.utils import flatten, lazy_flatten +import tempfile +import pickle +import sys +from memory_profiler import profile +import dill VarDictType = Dict[str, List[object]] StatesListsType = List[dict[str, object]] @@ -25,15 +31,14 @@ def single_proc_exec( configured_n: Sequence[N_Runs], additional_objs=None ) -> List: - - + if not isinstance(var_dict_list, Sequence): var_dict_list = list([var_dict_list]) raw_params = ( simulation_execs, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows, var_dict_list) - + results: List = [] print(f'Execution Mode: single_threaded') for raw_param in zip(*raw_params): @@ -44,6 +49,54 @@ def single_proc_exec( results.append(flatten(result)) return flatten(results) + +def process_executor(params): + simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params + + result = [simulation_exec( + var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n + )] + return result + + +def process_executor_disk(params): + simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params + + result = [simulation_exec( + var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n + )] + temp_file = tempfile.NamedTemporaryFile(delete=False) + with open(temp_file.name, 'wb') as f: # Note 'wb' for binary writing mode + dill.dump(result, f) + return temp_file.name + + +@profile +def file_handler_inc(filenames: List[str]) -> Generator[List, None, None]: + # combined_results = [] + for file_name in filenames: + with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode + result = dill.load(f) + yield result # Yield the loaded result for immediate processing + + f.close() + os.remove(file_name) # Clean up temporary file + + +@profile +def file_handler(filenames: List[str]) -> Generator[List, None, None]: + combined_results = [] + for file_name in filenames: + with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode + result = dill.load(f) + combined_results.append(result) + result = None + f.close() + os.remove(file_name) # Clean up temporary file + return combined_results + + +@profile def parallelize_simulations( simulation_execs: List[ExecutorFunction], var_dict_list: List[Parameters], @@ -61,50 +114,28 @@ def parallelize_simulations( ): print(f'Execution Mode: parallelized') - params = list( - zip( - simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, - Ts, SimIDs, Ns, SubsetIDs, SubsetWindows - ) - ) - - len_configs_structs = len(configs_structs) - - unique_runs = Counter(SimIDs) - sim_count = max(unique_runs.values()) - highest_divisor = int(len_configs_structs / sim_count) - - new_configs_structs, new_params = [], [] - for count in range(len(params)): - if count == 0: - new_params.append( - params[count: highest_divisor] - ) - new_configs_structs.append( - configs_structs[count: highest_divisor] - ) - elif count > 0: - new_params.append( - params[count * highest_divisor: (count + 1) * highest_divisor] - ) - new_configs_structs.append( - configs_structs[count * highest_divisor: (count + 1) * highest_divisor] - ) - - def process_executor(params): - if len_configs_structs > 1: - with ProcessPool(processes=len_configs_structs) as pp: - results = pp.map( - lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n), params - ) - else: - t = params[0] - results = t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n) - return results - - results = flatten(list(map(lambda params: process_executor(params), new_params))) - - return results + lazy_eval = False + if (additional_objs): + lazy_eval = additional_objs.get('lazy_eval', False) + + params = [ + (sim_exec, var_dict, states_list, config, env_processes, + T, sim_id, N, subset_id, subset_window, configured_n) + for sim_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window in + zip(simulation_execs, var_dict_list, states_lists, configs_structs, + env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows) + ] + + if (lazy_eval): + with ProcessPool(maxtasksperchild=1) as pool: + temp_files = pool.map(process_executor_disk, params) + generator = file_handler_inc(temp_files) + return lazy_flatten(generator) + + with ProcessPool(maxtasksperchild=1) as pool: + results = pool.map(process_executor, params) + + return flatten(results) def local_simulations( @@ -121,15 +152,15 @@ def local_simulations( SubsetWindows: List[SubsetWindow], configured_n: List[N_Runs], additional_objs=None - ): +): config_amt = len(configs_structs) - if config_amt == 1: # and configured_n != 1 + if config_amt == 1: # and configured_n != 1 return single_proc_exec( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs ) - elif config_amt > 1: # and configured_n != 1 + elif config_amt > 1: # and configured_n != 1 return parallelize_simulations( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs diff --git a/cadCAD/tools/execution/easy_run.py b/cadCAD/tools/execution/easy_run.py index 5ea737e1..c1fda147 100644 --- a/cadCAD/tools/execution/easy_run.py +++ b/cadCAD/tools/execution/easy_run.py @@ -42,6 +42,7 @@ def easy_run( drop_substeps=True, exec_mode='local', deepcopy_off=False, + lazy_eval=False ) -> pd.DataFrame: """ Run cadCAD simulations without headaches. @@ -66,7 +67,10 @@ def easy_run( _exec_mode = ExecutionMode().local_mode elif exec_mode == 'single': _exec_mode = ExecutionMode().single_mode - exec_context = ExecutionContext(_exec_mode, additional_objs={'deepcopy_off': deepcopy_off}) + exec_context = ExecutionContext(_exec_mode, additional_objs={ + 'deepcopy_off': deepcopy_off, + 'lazy_eval': lazy_eval + }) executor = Executor(exec_context=exec_context, configs=configs) # Execute the cadCAD experiment diff --git a/cadCAD/utils/__init__.py b/cadCAD/utils/__init__.py index 7b944a35..87e42aab 100644 --- a/cadCAD/utils/__init__.py +++ b/cadCAD/utils/__init__.py @@ -2,14 +2,14 @@ from collections import defaultdict from itertools import product import warnings -from typing import Union +from typing import Any, Generator, Union from cadCAD.types import * from typing import List, Dict, Union import functools import operator -from pandas import DataFrame # type: ignore +from pandas import DataFrame # type: ignore class SilentDF(DataFrame): @@ -33,7 +33,8 @@ def arrange_cols(df: DataFrame, reverse=False) -> DataFrame: """ session_metrics = ['session_id', 'user_id', 'simulation_id', 'run_id'] sys_metrics = ['run', 'timestep', 'substep'] - result_cols = list(set(df.columns) - set(session_metrics) - set(sys_metrics)) + result_cols = list(set(df.columns) - + set(session_metrics) - set(sys_metrics)) result_cols.sort(reverse=reverse) return df[session_metrics + sys_metrics + result_cols] @@ -75,6 +76,7 @@ def tupalize(k: object, vs: Union[list, dict]): l.append((k, vs)) return l + def flattenDict(l: dict) -> list: """ >>> flattenDict({1: [1, 2, 3], 4: 5}) @@ -92,6 +94,32 @@ def flatten(l: Union[list, dict]): return flattenDict(l) +# Incremental version of flatten with type hints +def lazy_tupalize(k: Any, vs: Union[Iterable[Any], Any]) -> Generator[tuple, None, None]: + if isinstance(vs, Iterable) and not isinstance(vs, str): + for v in vs: + yield (k, v) + else: + yield (k, vs) + + +def lazy_flattenDict(d: Dict[Any, Any]) -> Generator[Dict[Any, Any], None, None]: + flat_list = (lazy_tupalize(k, vs) for k, vs in d.items()) + for items in product(*flat_list): + yield dict(items) + + +def lazy_flatten(l: Union[Iterable[Any], Dict[Any, Any]]) -> Generator[Any, None, None]: + if isinstance(l, Iterable) and not isinstance(l, (str, dict)): + for item in l: + if isinstance(item, Iterable) and not isinstance(item, (str, dict)): + yield from lazy_flatten(item) + else: + yield item + elif isinstance(l, dict): + yield from lazy_flattenDict(l) + + def flatMap(f, collection): return flatten(list(map(f, collection))) diff --git a/documentation/examples/headless_tools.py b/documentation/examples/headless_tools.py new file mode 100644 index 00000000..9464dae6 --- /dev/null +++ b/documentation/examples/headless_tools.py @@ -0,0 +1,72 @@ +# isort:skip_file +# fmt: off +import sys +sys.path.append(".") +from cadCAD.tools import easy_run +import plotly.express as px +import numpy as np +import sys +import seaborn as sns +from tqdm.auto import tqdm +# fmt: on + + +TIMESTEPS = 3650 +SAMPLES = 2 +GARBAGE_SIZE = 100000 +# STATE_SIZE = 1 +STATE_SIZE = 100000 + +initial_conditions = { + 'big_state': "a" * STATE_SIZE +} + +params = { + 'big_param': ["p" * GARBAGE_SIZE] +} + + +def p_big_policy(params, step, sL, s): + y = 'big_state' + x = s['big_state'] + "a" + return {'big_state': x} + + +def s_big_state(params, step, sL, s, _input): + y = 'big_state' + x = s['big_state'] + "b" + return (y, x) + + +partial_state_update_blocks = [ + { + 'label': 'Memory Consumer', + 'policies': { + 'big_policy': p_big_policy, + }, + 'variables': { + 'big_state': s_big_state + } + }, + { + 'label': 'Do Nothing', + 'policies': { + + }, + 'variables': { + + } + } +] + + +df = easy_run(initial_conditions, + params, + partial_state_update_blocks, + TIMESTEPS, + SAMPLES, + deepcopy_off=True, + lazy_eval=True, + assign_params=True, + drop_substeps=False) +print(df) diff --git a/requirements.txt b/requirements.txt index 763c639c..a215fc19 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,10 +7,11 @@ wheel>=0.38.1 pandas>=1.1.5 funcy>=1.16 dill>=0.3.4 +memory-profiler==0.61.0 pathos>=0.2.8 numpy>=1.22.0 pytz>=2021.1 setuptools>=69.0.2 graphviz>=0.20.1 tqdm>=4.65.0 -pytest>=7.4.3 \ No newline at end of file +pytest>=7.4.3