diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 952c1d0..2d9c112 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -54,4 +54,4 @@ repos: - id: mypy types: [ python ] exclude: ^.*example.*$ - additional_dependencies: [ pandas-stubs, types-requests, types-PyYAML, pydantic ] + additional_dependencies: [ pandas-stubs, types-requests, types-PyYAML, pydantic, types-tabulate ] diff --git a/core/morph/api/service.py b/core/morph/api/service.py index 6b10bca..07c2152 100644 --- a/core/morph/api/service.py +++ b/core/morph/api/service.py @@ -1,4 +1,3 @@ -import ast import asyncio import io import json @@ -25,13 +24,14 @@ ) from morph.api.utils import convert_file_output, convert_variables_values from morph.cli.flags import Flags -from morph.config.project import load_project +from morph.config.project import default_output_paths, load_project from morph.task.resource import PrintResourceTask from morph.task.run import RunTask from morph.task.utils.morph import find_project_root_dir from morph.task.utils.run_backend.errors import MorphFunctionLoadError +from morph.task.utils.run_backend.execution import execution_cache from morph.task.utils.run_backend.state import MorphGlobalContext -from morph.task.utils.sqlite import SqliteDBManager +from morph.task.utils.run_backend.types import RunStatus logger = logging.getLogger("uvicorn") @@ -61,9 +61,6 @@ def run_file_with_type_service( f"Alias not found {input.name}. Check the console for more detailed error information.", ) - db_manager = SqliteDBManager(project_root) - db_manager.initialize_database() - with click.Context(click.Command(name="")) as ctx: ctx.params["FILENAME"] = input.name ctx.params["RUN_ID"] = f"{int(time.time() * 1000)}" @@ -80,20 +77,10 @@ def run_file_with_type_service( str(e), ) - run_results = db_manager.get_run_records_by_run_id(task.run_id) - run_result = next( - (result for result in run_results if result["cell_alias"] == input.name), None - ) - if run_result is None: - raise WarningError( - ErrorCode.ExecutionError, - ErrorMessage.ExecutionErrorMessage["executionFailed"], - "run result not found", - ) - elif run_result["status"] == "failed": - if run_result["error"] is not None: + if task.final_state != RunStatus.DONE.value: + if task.error is not None: try: - error = json.loads(run_result["error"])["details"] + error = json.loads(task.error) except Exception: # noqa raise WarningError( ErrorCode.ExecutionError, @@ -110,14 +97,19 @@ def run_file_with_type_service( ErrorCode.ExecutionError, ErrorMessage.ExecutionErrorMessage["executionFailed"], ) - try: - output_paths = ast.literal_eval(run_result["outputs"]) - except Exception: # noqa + output_paths = task.output_paths + if not output_paths or len(output_paths) == 0: raise WarningError( ErrorCode.ExecutionError, ErrorMessage.ExecutionErrorMessage["executionFailed"], "output not found", ) + + # ------------------------------------------------------------------ + # After execution, update the global cache + # ------------------------------------------------------------------ + execution_cache.update_cache(input.name, output_paths) + output_path = output_paths[0] if input.type == "image" or input.type == "html": if len(output_paths) == 2: @@ -185,9 +177,6 @@ def run_file_service(input: RunFileService) -> SuccessResponse: f"Alias not found {input.name}. 
Check the console for more detailed error information.", ) - db_manager = SqliteDBManager(project_root) - db_manager.initialize_database() - with click.Context(click.Command(name="")) as ctx: run_id = input.run_id if input.run_id else f"{int(time.time() * 1000)}" ctx.params["FILENAME"] = input.name @@ -205,20 +194,10 @@ def run_file_service(input: RunFileService) -> SuccessResponse: str(e), ) - run_results = db_manager.get_run_records_by_run_id(task.run_id) - run_result = next( - (result for result in run_results if result["cell_alias"] == input.name), None - ) - if run_result is None: - raise WarningError( - ErrorCode.ExecutionError, - ErrorMessage.ExecutionErrorMessage["executionFailed"], - "run result not found", - ) - elif run_result["status"] == "failed": - if run_result["error"] is not None: + if task.final_state != RunStatus.DONE.value: + if task.error is not None: try: - error = json.loads(run_result["error"])["details"] + error = json.loads(task.error) except Exception: # noqa raise WarningError( ErrorCode.ExecutionError, @@ -354,16 +333,7 @@ async def file_upload_service(input: UploadFileService) -> Any: ) # Retrieve the result of the file_upload function - project_root = find_project_root_dir() - db_manager = SqliteDBManager(project_root) - - run_results = db_manager.get_run_records_by_run_id(run_id) - run_result = next( - (result for result in run_results if result["run_id"] == run_id), None - ) - - # Retrieve the saved file path from the output - output_file = ast.literal_eval(run_result["outputs"])[0] if run_result else None + output_file = default_output_paths()[0] with open(output_file, "r") as f: saved_filepath = f.read() diff --git a/core/morph/config/project.py b/core/morph/config/project.py index 19818d8..5cc487c 100644 --- a/core/morph/config/project.py +++ b/core/morph/config/project.py @@ -4,12 +4,12 @@ import yaml from pydantic import BaseModel, Field -from morph.constants import MorphConstant from morph.task.utils.connection import ( CONNECTION_TYPE, MORPH_DUCKDB_CONNECTION_SLUG, MorphConnection, ) +from morph.task.utils.morph import find_project_root_dir class Schedule(BaseModel): @@ -27,11 +27,6 @@ class MorphProject(BaseModel): profile: Optional[str] = "default" source_paths: List[str] = Field(default_factory=lambda: ["src"]) default_connection: Optional[str] = MORPH_DUCKDB_CONNECTION_SLUG - output_paths: List[str] = Field( - default_factory=lambda: [ - f"{MorphConstant.TMP_MORPH_DIR}/{{name}}/{{run_id}}{{ext()}}" - ] - ) scheduled_jobs: Optional[Dict[str, ScheduledJob]] = Field(default=None) result_cache_ttl: Optional[int] = Field(default=0) project_id: Optional[str] = Field(default=None) @@ -40,6 +35,11 @@ class Config: arbitrary_types_allowed = True +def default_output_paths() -> List[str]: + project_root = find_project_root_dir() + return [f"{project_root}/.morph/cache/{{name}}{{ext()}}"] + + def default_initial_project() -> MorphProject: return MorphProject() diff --git a/core/morph/constants.py b/core/morph/constants.py index 7b9305e..0a53086 100644 --- a/core/morph/constants.py +++ b/core/morph/constants.py @@ -7,7 +7,6 @@ class MorphConstant: TMP_MORPH_DIR = "/tmp/morph" # Files MORPH_CRED_PATH = os.path.expanduser("~/.morph/credentials") - MORPH_PROJECT_DB = "morph_project.sqlite3" MORPH_CONNECTION_PATH = os.path.expanduser("~/.morph/connections.yml") # Others EXECUTABLE_EXTENSIONS = [".sql", ".py"] diff --git a/core/morph/include/starter_template/.gitignore b/core/morph/include/starter_template/.gitignore index d97921a..0c446a8 100644 --- 
a/core/morph/include/starter_template/.gitignore +++ b/core/morph/include/starter_template/.gitignore @@ -1,5 +1,4 @@ # Morph config files and directories .gitconifg .morph -morph_project.sqlite3 node_modules diff --git a/core/morph/include/starter_template/morph_project.yml b/core/morph/include/starter_template/morph_project.yml index 846e4f5..6e71e8f 100644 --- a/core/morph/include/starter_template/morph_project.yml +++ b/core/morph/include/starter_template/morph_project.yml @@ -1,6 +1,4 @@ default_connection: DUCKDB -output_paths: -- /tmp/morph/{name}/{run_id}{ext()} result_cache_ttl: 0 scheduled_jobs: null source_paths: diff --git a/core/morph/task/clean.py b/core/morph/task/clean.py index 5946f89..73a150b 100644 --- a/core/morph/task/clean.py +++ b/core/morph/task/clean.py @@ -23,32 +23,24 @@ def run(self): raise e clean_dir = Path(project_root).joinpath(".morph") - clean_files = ["meta.json", "knowledge.json", "template.json"] - clean_directories = ["frontend"] - - for _f in clean_files: - clean_file = clean_dir.joinpath(_f) - if clean_file.exists(): - if verbose: - click.echo(click.style(f"Removing {clean_file}", fg="yellow")) - clean_file.unlink() - else: - if verbose: - click.echo(click.style(f"File {clean_file} not found", fg="yellow")) - - for _d in clean_directories: - clean_directory = clean_dir.joinpath(_d) - if clean_directory.exists(): - if verbose: - click.echo(click.style(f"Removing {clean_directory}", fg="yellow")) - shutil.rmtree(clean_directory) - else: - if verbose: - click.echo( - click.style( - f"Directory {clean_directory} not found", fg="yellow" - ) - ) + + if clean_dir.exists(): + # Delete the entire .morph directory + if verbose: + click.echo( + click.style(f"Removing directory {clean_dir}...", fg="yellow") + ) + shutil.rmtree(clean_dir) + + # Recreate the empty .morph directory + clean_dir.mkdir(parents=True, exist_ok=True) + if verbose: + click.echo( + click.style(f"Recreated empty directory {clean_dir}", fg="yellow") + ) + else: + if verbose: + click.echo(click.style(f"Directory {clean_dir} not found", fg="yellow")) click.echo( click.style( diff --git a/core/morph/task/new.py b/core/morph/task/new.py index 007448d..265ab43 100644 --- a/core/morph/task/new.py +++ b/core/morph/task/new.py @@ -11,7 +11,6 @@ from morph.task.base import BaseTask from morph.task.utils.connection import ConnectionYaml from morph.task.utils.run_backend.state import MorphGlobalContext -from morph.task.utils.sqlite import SqliteDBManager class NewTask(BaseTask): @@ -51,15 +50,6 @@ def run(self): dest_file = os.path.join(target_path, template_file) shutil.copy2(src_file, dest_file) - db_path = f"{self.project_root}/morph_project.sqlite3" - if not os.path.exists(db_path): - with open(db_path, "w") as f: - f.write("") - - # Initialize the project database - db_manager = SqliteDBManager(self.project_root) - db_manager.initialize_database() - # Execute the post-setup tasks original_working_dir = os.getcwd() os.chdir(self.project_root) diff --git a/core/morph/task/resource.py b/core/morph/task/resource.py index aa7f70b..e520b7c 100644 --- a/core/morph/task/resource.py +++ b/core/morph/task/resource.py @@ -13,7 +13,6 @@ from morph.task.utils.morph import Resource, find_project_root_dir from morph.task.utils.run_backend.inspection import get_checksum from morph.task.utils.run_backend.state import MorphGlobalContext, load_cache -from morph.task.utils.sqlite import SqliteDBManager class PrintResourceTask(BaseTask): @@ -45,10 +44,6 @@ def __init__(self, args: Flags): 
click.echo(click.style(str(e), fg="red")) raise e - # Initialize SQLite database - self.db_manager = SqliteDBManager(self.project_root) - self.db_manager.initialize_database() - def run(self): try: cache = load_cache(self.project_root) diff --git a/core/morph/task/run.py b/core/morph/task/run.py index 11728dc..64d5f95 100644 --- a/core/morph/task/run.py +++ b/core/morph/task/run.py @@ -4,11 +4,13 @@ import sys import time from pathlib import Path -from typing import Any, Dict, Literal, Optional +from typing import Any, Dict, List, Literal, Optional import click +import pandas as pd import pydantic from dotenv import dotenv_values, load_dotenv +from tabulate import tabulate from morph.cli.flags import Flags from morph.config.project import ( @@ -26,11 +28,7 @@ MorphFunctionLoadError, logging_file_error_exception, ) -from morph.task.utils.run_backend.execution import ( - RunDagArgs, - generate_variables_hash, - run_cell, -) +from morph.task.utils.run_backend.execution import RunDagArgs, run_cell from morph.task.utils.run_backend.output import ( finalize_run, is_async_generator, @@ -45,7 +43,7 @@ MorphGlobalContext, load_cache, ) -from morph.task.utils.sqlite import CliError, RunStatus, SqliteDBManager +from morph.task.utils.run_backend.types import CliError, RunStatus from morph.task.utils.timezone import TimezoneManager @@ -53,6 +51,11 @@ class RunTask(BaseTask): def __init__(self, args: Flags, mode: Optional[Literal["cli", "api"]] = "cli"): super().__init__(args) + # class state + self.final_state: Optional[str] = None + self.error: Optional[str] = None + self.output_paths: Optional[List[str]] = None + # parse arguments filename_or_alias: str = os.path.normpath(args.FILENAME) self.run_id: str = self.args.RUN_ID or f"{int(time.time() * 1000)}" @@ -99,10 +102,6 @@ def __init__(self, args: Flags, mode: Optional[Literal["cli", "api"]] = "cli"): if self.project.project_id is not None: os.environ["MORPH_PROJECT_ID"] = self.project.project_id - # Initialize database - self.db_manager = SqliteDBManager(self.project_root) - self.db_manager.initialize_database() - context = MorphGlobalContext.get_instance() try: errors = context.partial_load( @@ -178,22 +177,7 @@ def __init__(self, args: Flags, mode: Optional[Literal["cli", "api"]] = "cli"): self.resource = resource self.ext = os.path.splitext(os.path.basename(self.filename))[1] self.cell_alias = str(self.resource.name) - - # Set up run directory - self.runs_dir = os.path.normpath( - os.path.join( - self.project_root, - ".morph/runs", - self.run_id, - ) - ) - if not os.path.exists(self.runs_dir): - os.makedirs(self.runs_dir) - - # Set up logger - log_filename = f"{os.path.splitext(os.path.basename(self.cell_alias))[0]}.log" - self.log_path = os.path.join(self.runs_dir, log_filename) - self.logger = get_morph_logger(self.log_path) + self.logger = get_morph_logger() # load .env in project root and set timezone dotenv_path = os.path.join(self.project_root, ".env") @@ -213,37 +197,21 @@ def __init__(self, args: Flags, mode: Optional[Literal["cli", "api"]] = "cli"): tz_manager.set_timezone(desired_tz) def run(self) -> Any: - cached_cell = ( - self.meta_obj_cache.find_by_name(self.cell_alias) - if self.meta_obj_cache - else None - ) - - self.db_manager.insert_run_record( - self.run_id, - self.cell_alias, - self.is_dag, - self.log_path, - cached_cell.checksum if cached_cell else None, - generate_variables_hash(self.vars), - self.vars, - ) - if self.ext != ".sql" and self.ext != ".py": - text = "Invalid file type. Please specify a .sql or .py file." 
- self.logger.error(text) - finalize_run( + self.error = "Invalid file type. Please specify a .sql or .py file." + self.logger.error(self.error) + self.final_state = RunStatus.FAILED.value + self.output_paths = finalize_run( self.project, - self.db_manager, self.resource, self.cell_alias, - RunStatus.FAILED.value, + self.final_state, None, self.logger, self.run_id, CliError( type="general", - details=text, + details=self.error, ), ) return @@ -259,15 +227,10 @@ def run(self) -> Any: ) try: - dag = ( - RunDagArgs(run_id=self.run_id, runs_dir=self.runs_dir) - if self.is_dag - else None - ) + dag = RunDagArgs(run_id=self.run_id) if self.is_dag else None output = run_cell( self.project, self.resource, - self.db_manager, self.vars, self.logger, dag, @@ -283,14 +246,15 @@ def run(self) -> Any: else str(e) ) text = f"An error occurred while running the file 💥: {error_txt}" - self.logger.error(text) - click.echo(click.style(text, fg="red")) - finalize_run( + self.error = text + self.logger.error(self.error) + click.echo(click.style(self.error, fg="red")) + self.final_state = RunStatus.FAILED.value + self.output_paths = finalize_run( self.project, - self.db_manager, self.resource, cell, - RunStatus.FAILED.value, + self.final_state, None, self.logger, self.run_id, @@ -303,6 +267,16 @@ def run(self) -> Any: raise Exception(text) return + # print preview of the DataFrame + if isinstance(output.result, pd.DataFrame): + preview = tabulate( + output.result.head().values.tolist(), + headers=output.result.columns.tolist(), + tablefmt="grid", + showindex=True, + ) + self.logger.info("DataFrame preview:\n" + preview) + if ( is_stream(output.result) or is_async_generator(output.result) @@ -311,7 +285,6 @@ def run(self) -> Any: if self.mode == "api": return stream_and_write_and_response( self.project, - self.db_manager, self.resource, cell, RunStatus.DONE.value, @@ -323,7 +296,6 @@ def run(self) -> Any: else: stream_and_write( self.project, - self.db_manager, self.resource, cell, RunStatus.DONE.value, @@ -333,12 +305,12 @@ def run(self) -> Any: None, ) else: - finalize_run( + self.final_state = RunStatus.DONE.value + self.output_paths = finalize_run( self.project, - self.db_manager, self.resource, cell, - RunStatus.DONE.value, + self.final_state, transform_output(self.resource, output.result), self.logger, self.run_id, diff --git a/core/morph/task/utils/logging.py b/core/morph/task/utils/logging.py index 05d8bb6..417cacc 100644 --- a/core/morph/task/utils/logging.py +++ b/core/morph/task/utils/logging.py @@ -40,18 +40,13 @@ async def redirect_stdout_to_logger_async(logger, level=logging.INFO): yield -def get_morph_logger(log_file: str) -> logging.Logger: - logger = logging.getLogger(log_file) +def get_morph_logger() -> logging.Logger: + logger = logging.getLogger("morph_logger") if not logger.hasHandlers(): logger.setLevel(logging.DEBUG) - file_handler = logging.FileHandler(log_file) - file_handler.setFormatter( - logging.Formatter("%(asctime)s [%(levelname)s] %(message)s") - ) - logger.addHandler(file_handler) - + # Console handler with color formatting console_handler = colorlog.StreamHandler() console_handler.setLevel(logging.DEBUG) console_formatter = colorlog.ColoredFormatter( diff --git a/core/morph/task/utils/run_backend/cache.py b/core/morph/task/utils/run_backend/cache.py new file mode 100644 index 0000000..ae3ca0e --- /dev/null +++ b/core/morph/task/utils/run_backend/cache.py @@ -0,0 +1,92 @@ +from datetime import datetime, timedelta +from typing import List, Optional + + +class ExecutionCache: + """ 
+ A class to maintain TTL-based caching of run results. + If expiration_seconds == 0, caching is disabled (no cache entries are stored or retrieved). + """ + + def __init__(self, expiration_seconds: int = 0): + """ + Initialize the execution cache. + + :param expiration_seconds: The number of seconds after which a cache entry expires. + If set to 0, caching is disabled. + """ + self.cache: dict[str, dict[str, object]] = {} + self.expiration_seconds = expiration_seconds + + def update_cache(self, function_name: str, cache_paths: List[str]) -> None: + """ + Update or add an entry to the cache. Replaces existing cache paths with the provided ones. + If expiration_seconds == 0, do nothing (caching is disabled). + """ + if self.expiration_seconds == 0: + return # Skip storing anything + + current_time = datetime.now().isoformat() + cache_entry = self.cache.get(function_name) + + # Check if an existing cache entry is still valid + if cache_entry: + last_executed_at = cache_entry.get("last_executed_at") + if isinstance(last_executed_at, str): + last_executed_time = datetime.fromisoformat(last_executed_at) + if datetime.now() - last_executed_time <= timedelta( + seconds=self.expiration_seconds + ): + # Cache is still valid, only update the cache paths + cache_entry["cache_paths"] = cache_paths + return + + # Either no cache entry exists, or the cache has expired + self.cache[function_name] = { + "last_executed_at": current_time, + "cache_paths": cache_paths, + } + + def get_cache(self, function_name: str) -> Optional[dict[str, object]]: + """ + Retrieve cache data for a specific function. Automatically invalidates expired cache. + If expiration_seconds == 0, always return None (no caching). + """ + if self.expiration_seconds == 0: + return None # Caching is disabled + + cache_entry = self.cache.get(function_name) + if not cache_entry: + return None + + last_executed_at = cache_entry["last_executed_at"] + if not isinstance(last_executed_at, str): + raise ValueError("Invalid cache entry: last_executed_at is not a string") + + last_executed_time = datetime.fromisoformat(last_executed_at) + if datetime.now() - last_executed_time > timedelta( + seconds=self.expiration_seconds + ): + self.clear_cache(function_name) + return None + + return cache_entry + + def clear_cache(self, function_name: str) -> None: + """ + Remove the cache entry for a specific function. + """ + if function_name in self.cache: + del self.cache[function_name] + + def clear_all_cache(self) -> None: + """ + Clear all cache entries. + """ + self.cache.clear() + + def list_all_cache(self) -> List[str]: + """ + List all function names currently cached. 
+ """ + return list(self.cache.keys()) diff --git a/core/morph/task/utils/run_backend/decorators.py b/core/morph/task/utils/run_backend/decorators.py index 3e7a76b..c7d7284 100644 --- a/core/morph/task/utils/run_backend/decorators.py +++ b/core/morph/task/utils/run_backend/decorators.py @@ -1,20 +1,17 @@ from __future__ import annotations import inspect -import os from functools import wraps from typing import Any, Callable, List, Literal, Optional, TypeVar from typing_extensions import ParamSpec -from morph.config.project import load_project -from morph.constants import MorphConstant +from morph.config.project import default_output_paths from morph.task.utils.knowledge.inspection import ( MorphKnowledgeMetaObjectGlossaryTerm, MorphKnowledgeMetaObjectSchema, ) from morph.task.utils.morph import find_project_root_dir - from .state import MorphFunctionMetaObject, MorphGlobalContext Param = ParamSpec("Param") @@ -39,7 +36,6 @@ def func( title: str | None = None, schemas: list[MorphKnowledgeMetaObjectSchema] | None = None, terms: list[MorphKnowledgeMetaObjectGlossaryTerm] | None = None, - output_paths: list[str] | None = None, output_type: Optional[ Literal["dataframe", "csv", "visualization", "markdown", "json"] ] = None, @@ -50,7 +46,6 @@ def func( name = alias or name context = MorphGlobalContext.get_instance() - project = load_project(find_project_root_dir()) def decorator(func: Callable[Param, RetType]) -> Callable[Param, RetType]: fid = _get_morph_function_id(func) @@ -66,25 +61,6 @@ def decorator(func: Callable[Param, RetType]) -> Callable[Param, RetType]: if not isinstance(connection, (str, type(None))): connection = None - output_paths_ = output_paths - if project and project.output_paths and len(project.output_paths) > 0: - project_output_paths: List[str] = [] - for project_output_path in project.output_paths: - if ( - os.path.isdir(project_output_path) - and "ext()" not in project_output_path - ): - project_output_paths.append( - f"{project_output_path}/{{name}}/{{run_id}}{{ext()}}" - ) - output_paths_ = ( - project_output_paths if len(project_output_paths) > 0 else output_paths_ - ) - if output_paths_ is None: - output_paths_ = [ - f"{MorphConstant.TMP_MORPH_DIR}/{{name}}/{{run_id}}{{ext()}}" - ] - meta_obj = MorphFunctionMetaObject( id=fid, name=name or func.__name__, @@ -95,7 +71,7 @@ def decorator(func: Callable[Param, RetType]) -> Callable[Param, RetType]: terms=terms, variables=variables, data_requirements=data_requirements, - output_paths=output_paths_, + output_paths=default_output_paths(), output_type=output_type, connection=connection, result_cache_ttl=result_cache_ttl, diff --git a/core/morph/task/utils/run_backend/execution.py b/core/morph/task/utils/run_backend/execution.py index b5ea610..b16d45f 100644 --- a/core/morph/task/utils/run_backend/execution.py +++ b/core/morph/task/utils/run_backend/execution.py @@ -1,6 +1,5 @@ from __future__ import annotations -import ast import asyncio import base64 import hashlib @@ -8,9 +7,7 @@ import json import logging import os -import time -from datetime import datetime -from typing import Any, Callable, List, Optional, Union, cast +from typing import Any, Callable, List, Optional, Union import pandas as pd from jinja2 import BaseLoader, Environment @@ -18,7 +15,7 @@ from morph_lib.types import HtmlImageResponse, MarkdownResponse, MorphChatStreamChunk from pydantic import BaseModel -from morph.config.project import MorphProject +from morph.config.project import MorphProject, default_output_paths from morph.task.utils.connection 
import Connection, ConnectionYaml, DatabaseConnection from morph.task.utils.connections.connector import Connector from morph.task.utils.logging import ( @@ -36,18 +33,23 @@ stream_and_write, transform_output, ) -from morph.task.utils.sqlite import CliError, RunStatus, SqliteDBManager +from morph.task.utils.run_backend.types import CliError, RunStatus +from .cache import ExecutionCache from .state import ( MorphFunctionMetaObject, MorphFunctionMetaObjectCache, MorphGlobalContext, ) +# ----------------------------------------------------- +# Global cache instance used throughout the module +# ----------------------------------------------------- +execution_cache = ExecutionCache() + class RunDagArgs(BaseModel): run_id: str - runs_dir: str class RunCellResult(BaseModel): @@ -58,51 +60,49 @@ def run_cell( project: Optional[MorphProject], cell: str | MorphFunctionMetaObject, - db_manager: SqliteDBManager, vars: dict[str, Any] = {}, logger: logging.Logger | None = None, dag: Optional[RunDagArgs] = None, meta_obj_cache: Optional[MorphFunctionMetaObjectCache] = None, ) -> RunCellResult: - # retrieve resource context = MorphGlobalContext.get_instance() + + # Resolve the meta object if isinstance(cell, str): meta_obj = context.search_meta_object_by_name(cell) if meta_obj is None: - raise ValueError("not registered as a morph function.") + raise ValueError("Not registered as a Morph function.") else: meta_obj = cell if meta_obj.id is None: raise ValueError(f"Invalid metadata: {meta_obj}") - # get cached cell + # Attempt to get cached cell from meta_obj_cache cached_cell = meta_obj_cache.find_by_name(meta_obj.name) if meta_obj_cache else None - # whether the cache is valid or not (invalid if the parent files are updated) is_cache_valid = True - # register parents to meta_obj if the file is SQL + # If SQL, register data requirements ext = meta_obj.id.split(".")[-1] if ext == "sql": _regist_sql_data_requirements(meta_obj) meta_obj = context.search_meta_object_by_name(meta_obj.name or "") if meta_obj is None: - raise ValueError("not registered as a morph function.") + raise ValueError("Not registered as a Morph function.") - # retrieve data from parents if exists + # Handle dependencies required_data = meta_obj.data_requirements or [] for data_name in required_data: required_meta_obj = context.search_meta_object_by_name(data_name) if required_meta_obj is None: raise ValueError( - f"required data '{data_name}' is not registered as a morph function." + f"Required data '{data_name}' is not registered as a Morph function." ) - # save the data to the intermediate output in case of DAG + if dag: required_data_result = _run_cell_with_dag( project, required_meta_obj, - db_manager, vars, dag, meta_obj_cache, @@ -111,7 +111,6 @@ required_data_result = run_cell( project, required_meta_obj, - db_manager, vars, logger, None, @@ -186,122 +185,149 @@ ): raise RequestError(f"Variable '{var_name}' is required.") - # get cached result if exists + # ------------------------------------------------------------------------- + # Use the global execution_cache. If project.result_cache_ttl is set, apply it. + # project.result_cache_ttl is in SECONDS, so we directly assign it to expiration_seconds. 
+ # ------------------------------------------------------------------------- + if project and project.result_cache_ttl and project.result_cache_ttl > 0: + execution_cache.expiration_seconds = project.result_cache_ttl + + # Check cache + cache_entry = execution_cache.get_cache(meta_obj.name) + if cache_entry: + # If valid cache entry, try to load from disk + if logger: + logger.info(f"Running {meta_obj.name} using cached result.") + + cache_paths_obj = cache_entry.get("cache_paths", []) + if not isinstance(cache_paths_obj, list): + raise ValueError("Invalid cache entry: cache_paths is not a list.") + + for path in cache_paths_obj: + if not os.path.exists(path): + continue + ext_ = path.split(".")[-1] + if ext_ in {"parquet", "csv", "json", "md", "txt", "html", "png"}: + cached_result = None + if ext_ == "parquet": + cached_result = RunCellResult(result=pd.read_parquet(path)) + elif ext_ == "csv": + cached_result = RunCellResult(result=pd.read_csv(path)) + elif ext_ == "json": + json_dict = json.loads(open(path, "r").read()) + if not MorphChatStreamChunk.is_chat_stream_chunk_json(json_dict): + cached_result = RunCellResult( + result=pd.read_json(path, orient="records") + ) + elif ext_ in {"md", "txt"}: + cached_result = RunCellResult( + result=MarkdownResponse(open(path, "r").read()) + ) + elif ext_ == "html": + cached_result = RunCellResult( + result=HtmlImageResponse(html=open(path, "r").read()) + ) + elif ext_ == "png": + cached_result = RunCellResult( + result=HtmlImageResponse(image=convert_image_base64(path)) + ) + if cached_result: + return cached_result + + # ------------------------------------------------------------------ + # Legacy file-based cache logic + # ------------------------------------------------------------------ cache_ttl = ( - meta_obj.result_cache_ttl - or (project.result_cache_ttl if project else None) - or 0 + meta_obj.result_cache_ttl or (project.result_cache_ttl if project else 0) or 0 ) if project and cache_ttl > 0 and cached_cell and is_cache_valid: - if len(vars.items()) == 0: - cache, _ = db_manager.get_run_records( - None, - meta_obj.name, - RunStatus.DONE.value, - "started_at", - "DESC", - 1, - None, - cached_cell.checksum, - None, - datetime.fromtimestamp(int(time.time()) - cache_ttl), - ) - else: - cache, _ = db_manager.get_run_records( - None, - meta_obj.name, - RunStatus.DONE.value, - "started_at", - "DESC", - 1, - None, - cached_cell.checksum, - generate_variables_hash(vars), - datetime.fromtimestamp(int(time.time()) - cache_ttl), - ) - if len(cache) > 0: - cache_outputs = ast.literal_eval(cache[0]["outputs"]) - if len(cache_outputs) > 1: - html_path = next( - (x for x in cache_outputs if x.split(".")[-1] == "html"), None - ) - image_path = next( - (x for x in cache_outputs if x.split(".")[-1] == "png"), None - ) - if html_path and image_path: - if logger: - logger.info(f"{meta_obj.name} using cached result.") - return RunCellResult( - result=HtmlImageResponse( - html=open(html_path, "r").read(), - image=convert_image_base64(image_path), - ) + cache_outputs = default_output_paths() + if len(cache_outputs) > 1: + html_path = next((x for x in cache_outputs if x.endswith(".html")), None) + image_path = next((x for x in cache_outputs if x.endswith(".png")), None) + if ( + html_path + and image_path + and os.path.exists(html_path) + and os.path.exists(image_path) + ): + if logger: + logger.info( + f"Running {meta_obj.name} using existing file-based cache (legacy)." 
) - if len(cache_outputs) > 0: - cache_path = cast(str, cache_outputs[0]) - cache_path_ext = cache_path.split(".")[-1] - if cache_path_ext in { - "parquet", - "csv", - "json", - "md", - "txt", - "html", - "png", - } and os.path.exists(cache_path): - cached_result = None - if cache_path_ext == "parquet": - cached_result = RunCellResult( - result=pd.read_parquet(cache_path) - ) - elif cache_path_ext == "csv": - cached_result = RunCellResult(result=pd.read_csv(cache_path)) - elif cache_path_ext == "json": - json_dict = json.loads(open(cache_path, "r").read()) - if not MorphChatStreamChunk.is_chat_stream_chunk_json( - json_dict - ): - cached_result = RunCellResult( - result=pd.read_json(cache_path, orient="records") - ) - elif cache_path_ext == "md" or cache_path_ext == "txt": - cached_result = RunCellResult( - result=MarkdownResponse(open(cache_path, "r").read()) - ) - elif cache_path_ext == "html": + return RunCellResult( + result=HtmlImageResponse( + html=open(html_path, "r").read(), + image=convert_image_base64(image_path), + ) + ) + if len(cache_outputs) > 0: + cache_path = cache_outputs[0] + cache_path_ext = cache_path.split(".")[-1] + if cache_path_ext in { + "parquet", + "csv", + "json", + "md", + "txt", + "html", + "png", + } and os.path.exists(cache_path): + cached_result = None + if cache_path_ext == "parquet": + cached_result = RunCellResult(result=pd.read_parquet(cache_path)) + elif cache_path_ext == "csv": + cached_result = RunCellResult(result=pd.read_csv(cache_path)) + elif cache_path_ext == "json": + json_dict = json.loads(open(cache_path, "r").read()) + if not MorphChatStreamChunk.is_chat_stream_chunk_json(json_dict): cached_result = RunCellResult( - result=HtmlImageResponse(html=open(cache_path, "r").read()) + result=pd.read_json(cache_path, orient="records") ) - elif cache_path_ext == "png": - cached_result = RunCellResult( - result=HtmlImageResponse( - image=convert_image_base64(cache_path) - ) + elif cache_path_ext == "md" or cache_path_ext == "txt": + cached_result = RunCellResult( + result=MarkdownResponse(open(cache_path, "r").read()) + ) + elif cache_path_ext == "html": + cached_result = RunCellResult( + result=HtmlImageResponse(html=open(cache_path, "r").read()) + ) + elif cache_path_ext == "png": + cached_result = RunCellResult( + result=HtmlImageResponse(image=convert_image_base64(cache_path)) + ) + if cached_result: + if logger: + logger.info( + f"{meta_obj.name} using existing file-based cache (legacy)." 
) - if cached_result: - if logger: - logger.info(f"{meta_obj.name} using cached result.") - return cached_result + return cached_result - # execute the cell + # ------------------------------------------------------------------ + # Actual execution + # ------------------------------------------------------------------ if ext == "sql": if logger: - logger.info(f"Formatting SQL file: {meta_obj.id} variables: {vars}") - sql = _fill_sql(meta_obj, vars) - return RunCellResult( - result=_run_sql(project, meta_obj, sql, logger), - is_cache_valid=False, - ) + logger.info(f"Formatting SQL file: {meta_obj.id} with variables: {vars}") + sql_text = _fill_sql(meta_obj, vars) + result_df = _run_sql(project, meta_obj, sql_text, logger) + run_cell_result = RunCellResult(result=result_df, is_cache_valid=False) else: if not meta_obj.function: raise ValueError(f"Invalid metadata: {meta_obj}") - return RunCellResult( - result=convert_run_result(execute_with_logger(meta_obj, context, logger)), + run_result = execute_with_logger(meta_obj, context, logger) + run_cell_result = RunCellResult( + result=convert_run_result(run_result), is_cache_valid=False, ) + return run_cell_result + def execute_with_logger(meta_obj, context, logger): + """ + Runs a Python function (sync or async) with logging. + """ try: if is_coroutine_function(meta_obj.function): @@ -313,9 +339,8 @@ async def run_async(): else: with redirect_stdout_to_logger(logger, logging.INFO): result = meta_obj.function(context) - except Exception: # noqa - raise - + except Exception as e: + raise e return result @@ -323,10 +348,10 @@ def is_coroutine_function(func: Callable) -> bool: return asyncio.iscoroutinefunction(func) -def _fill_sql( - resource: MorphFunctionMetaObject, - vars: dict[str, Any] = {}, -) -> str: +def _fill_sql(resource: MorphFunctionMetaObject, vars: dict[str, Any] = {}) -> str: + """ + Reads a SQL file from disk and applies Jinja-based templating using the provided vars. + """ if not resource.id or not resource.name: raise ValueError("resource id or name is not set.") @@ -353,14 +378,17 @@ def _load_data(v: Optional[str] = None) -> str: env.globals["connection"] = _connection env.globals["load_data"] = _load_data - sql = open(filepath, "r").read() - template = env.from_string(sql) - sql = template.render(vars) + sql_original = open(filepath, "r").read() + template = env.from_string(sql_original) + rendered_sql = template.render(vars) - return str(sql) + return str(rendered_sql) def _regist_sql_data_requirements(resource: MorphFunctionMetaObject) -> List[str]: + """ + Parses a SQL file to identify 'load_data()' references and sets data requirements accordingly. + """ if not resource.id or not resource.name: raise ValueError("resource id or name is not set.") @@ -389,8 +417,8 @@ def _load_data(v: Optional[str] = None) -> str: env.globals["connection"] = _connection env.globals["load_data"] = _load_data - sql = open(filepath, "r").read() - template = env.from_string(sql) + sql_original = open(filepath, "r").read() + template = env.from_string(sql_original) template.render() if len(load_data) > 0: meta = MorphFunctionMetaObject( @@ -419,16 +447,20 @@ def _run_sql( sql: str, logger: Optional[logging.Logger], ) -> pd.DataFrame: + """ + Execute SQL via DuckDB (if data_requirements exist) or via a configured connection. + """ load_data = resource.data_requirements or [] connection = resource.connection + # If data dependencies exist, load them into DuckDB. 
if load_data: from duckdb import connect context = MorphGlobalContext.get_instance() con = connect() - for df_name, df in context.data.items(): - con.register(df_name, df) + for df_name, df_value in context.data.items(): + con.register(df_name, df_value) return con.sql(sql).to_df() # type: ignore database_connection: Optional[Union[Connection, DatabaseConnection]] = None @@ -445,9 +477,7 @@ def _run_sql( if project is None: raise ValueError("Could not find project.") elif project.default_connection is None: - raise ValueError( - "Default connection is not set in morph_project.yml. Please set default_connection." - ) + raise ValueError("Default connection is not set in morph_project.yml.") default_connection = project.default_connection connection_yaml = ConnectionYaml.load_yaml() database_connection = ConnectionYaml.find_connection( @@ -463,44 +493,29 @@ def _run_sql( logger.info("Connecting to database...") df = connector.execute_sql(sql) if logger: - logger.info("Obtained results from database...") + logger.info("Obtained results from database.") return df def _run_cell_with_dag( project: Optional[MorphProject], cell: MorphFunctionMetaObject, - db_manager: SqliteDBManager, vars: dict[str, Any] = {}, dag: Optional[RunDagArgs] = None, meta_obj_cache: Optional[MorphFunctionMetaObjectCache] = None, ) -> RunCellResult: if dag is None: - raise ValueError("dag is not set.") - - log_path = os.path.join(dag.runs_dir, f"{cell.name}.log") - logger = get_morph_logger(log_path) - - cached_cell = meta_obj_cache.find_by_name(cell.name) if meta_obj_cache else None - - db_manager.insert_run_record( - dag.run_id, - cell.name, - True, - log_path, - cached_cell.checksum if cached_cell else None, - generate_variables_hash(vars), - vars, - ) + raise ValueError("No DAG settings provided.") + logger = get_morph_logger() filepath = cell.id.split(":")[0] ext = os.path.splitext(os.path.basename(filepath))[1] + try: - logger.info(f"Running load_data file: {filepath}, variables: {vars}") + logger.info(f"Running load_data file: {filepath}, with variables: {vars}") output = run_cell( project, cell, - db_manager, vars, logger, dag, @@ -514,17 +529,13 @@ def _run_cell_with_dag( logger.error(text) finalize_run( project, - db_manager, cell, cell.name, RunStatus.FAILED.value, None, logger, dag.run_id, - CliError( - type="general", - details=text, - ), + CliError(type="general", details=text), ) raise Exception(text) @@ -535,7 +546,6 @@ def _run_cell_with_dag( ): stream_and_write( project, - db_manager, cell, cell.name, RunStatus.DONE.value, @@ -547,7 +557,6 @@ def _run_cell_with_dag( else: finalize_run( project, - db_manager, cell, cell.name, RunStatus.DONE.value, @@ -556,8 +565,7 @@ def _run_cell_with_dag( dag.run_id, None, ) - logger.info(f"Successfully ran file: {filepath}") - + logger.info(f"Successfully executed file: {filepath}") return output diff --git a/core/morph/task/utils/run_backend/inspection.py b/core/morph/task/utils/run_backend/inspection.py index 13c7826..e3a4b71 100644 --- a/core/morph/task/utils/run_backend/inspection.py +++ b/core/morph/task/utils/run_backend/inspection.py @@ -8,8 +8,7 @@ from jinja2 import Environment, nodes from pydantic import BaseModel -from morph.config.project import MorphProject -from morph.constants import MorphConstant +from morph.config.project import MorphProject, default_output_paths from .errors import MorphFunctionLoadError, MorphFunctionLoadErrorCategory @@ -149,27 +148,7 @@ def _import_sql_file( if name is None: name = file.stem if output_paths is None: - if project and 
project.output_paths and len(project.output_paths) > 0: - project_output_paths: List[str] = [] - for project_output_path in project.output_paths: - if ( - Path(project_output_path).is_dir() - and "ext()" not in project_output_path - ): - project_output_paths.append( - f"{project_output_path}/{{name}}/{{run_id}}{{ext()}}" - ) - else: - project_output_paths.append(project_output_path) - output_paths = ( - project_output_paths - if len(project_output_paths) > 0 - else output_paths - ) - else: - output_paths = [ - f"{MorphConstant.TMP_MORPH_DIR}/{{name}}/{{run_id}}{{ext()}}" - ] + output_paths = default_output_paths() if output_type is None: output_type = "dataframe" if result_cache_ttl is None: diff --git a/core/morph/task/utils/run_backend/output.py b/core/morph/task/utils/run_backend/output.py index 1aea161..8cfc383 100644 --- a/core/morph/task/utils/run_backend/output.py +++ b/core/morph/task/utils/run_backend/output.py @@ -6,7 +6,6 @@ import os import threading import traceback -from pathlib import Path from typing import Any, Dict, Generator, List, Literal, Optional, Union, cast import click @@ -22,7 +21,7 @@ ) from pydantic import BaseModel -from morph.config.project import MorphProject +from morph.config.project import MorphProject, default_output_paths from morph.constants import MorphConstant from morph.task.utils.logging import ( redirect_stdout_to_logger, @@ -30,7 +29,7 @@ ) from morph.task.utils.morph import Resource from morph.task.utils.run_backend.state import MorphFunctionMetaObject -from morph.task.utils.sqlite import CliError, RunStatus, SqliteDBManager +from morph.task.utils.run_backend.types import CliError, RunStatus class StreamChatResponse(BaseModel): @@ -51,7 +50,6 @@ def to_model(cls, data: List[Dict[str, Any]]) -> "StreamChatResponse": def finalize_run( project: Optional[MorphProject], - db_manager: SqliteDBManager, resource: MorphFunctionMetaObject, cell_alias: str, final_state: str, @@ -59,29 +57,13 @@ def finalize_run( logger: logging.Logger, run_id: str, error: Optional[CliError], -) -> None: - output_paths: Optional[List[str]] = ( - _save_output_to_file( - project, - resource, - output, - logger, - run_id, - ) - if final_state != RunStatus.FAILED.value - else None - ) - abs_output_paths: List[str] = [] - for output_path in output_paths if output_paths is not None else []: - abs_path = Path(output_path).resolve() - abs_output_paths.append(str(abs_path)) - - db_manager.update_run_record( +) -> Optional[List[str]]: + return _save_output_to_file( + project, + resource, + output, + logger, run_id, - cell_alias, - final_state, - error, - None if len(abs_output_paths) < 1 else abs_output_paths, ) @@ -166,7 +148,6 @@ def is_generator(output: Any) -> bool: def stream_and_write_and_response( project: Optional[MorphProject], - db_manager: SqliteDBManager, resource: MorphFunctionMetaObject, cell_alias: str, final_state: str, @@ -204,7 +185,6 @@ async def process_async_output(): finally: finalize_run( project, - db_manager, resource, cell_alias, final_state_, @@ -246,7 +226,6 @@ async def process_async_output(): finally: finalize_run( project, - db_manager, resource, cell_alias, final_state, @@ -264,7 +243,6 @@ async def process_async_output(): def stream_and_write( project: Optional[MorphProject], - db_manager: SqliteDBManager, resource: MorphFunctionMetaObject, cell_alias: str, final_state: str, @@ -300,7 +278,6 @@ async def process_async_output(): finally: finalize_run( project, - db_manager, resource, cell_alias, final_state_, @@ -334,7 +311,6 @@ async def 
process_async_output(): finally: finalize_run( project, - db_manager, resource, cell_alias, final_state, @@ -394,22 +370,7 @@ def _infer_output_type(output: Any) -> Optional[str]: def _get_output_paths( project: Optional[MorphProject], resource: MorphFunctionMetaObject ) -> List[str]: - output_paths: List[str] = [] - if project and project.output_paths and len(project.output_paths) > 0: - project_output_paths: List[str] = [] - for project_output_path in project.output_paths: - if ( - os.path.isdir(project_output_path) - and "ext()" not in project_output_path - ): - project_output_paths.append( - f"{project_output_path}/{{name}}/{{run_id}}{{ext()}}" - ) - else: - project_output_paths.append(project_output_path) - output_paths = ( - project_output_paths if len(project_output_paths) > 0 else output_paths - ) + output_paths = default_output_paths() if resource.output_paths and len(resource.output_paths) > 0: output_paths = cast(list, resource.output_paths) output_type = resource.output_type if resource.output_type else None diff --git a/core/morph/task/utils/run_backend/types.py b/core/morph/task/utils/run_backend/types.py new file mode 100644 index 0000000..6d38a86 --- /dev/null +++ b/core/morph/task/utils/run_backend/types.py @@ -0,0 +1,34 @@ +from enum import Enum +from typing import List, Literal, Optional, Union + +from pydantic import BaseModel + + +class RunStatus(str, Enum): + DONE = "done" + TIMEOUT = "timeout" + IN_PROGRESS = "inProgress" + FAILED = "failed" + + +class StackTraceFrame(BaseModel): + filename: str + lineno: Optional[int] = None + name: str + line: Optional[str] = None + + +class PythonError(BaseModel): + type: str + message: str + code: str + stacktrace: str + structured_stacktrace: List[StackTraceFrame] + + +GeneralError = str + + +class CliError(BaseModel): + type: Literal["python", "general"] + details: Union[PythonError, GeneralError] diff --git a/core/morph/task/utils/sqlite.py b/core/morph/task/utils/sqlite.py deleted file mode 100644 index 0b3376f..0000000 --- a/core/morph/task/utils/sqlite.py +++ /dev/null @@ -1,256 +0,0 @@ -import json -import os -import sqlite3 -from datetime import datetime -from enum import Enum -from typing import List, Literal, Optional, Tuple, Union - -from pydantic import BaseModel - -from morph.constants import MorphConstant - - -class RunStatus(str, Enum): - DONE = "done" - TIMEOUT = "timeout" - IN_PROGRESS = "inProgress" - FAILED = "failed" - - -class StackTraceFrame(BaseModel): - filename: str - lineno: Optional[int] = None - name: str - line: Optional[str] = None - - -class PythonError(BaseModel): - type: str - message: str - code: str - stacktrace: str - structured_stacktrace: List[StackTraceFrame] - - -GeneralError = str - - -class CliError(BaseModel): - type: Literal["python", "general"] - details: Union[PythonError, GeneralError] - - -class SqliteDBManager: - def __init__(self, project_root: str): - self.project_root = project_root - self.db_path = os.path.join(self.project_root, MorphConstant.MORPH_PROJECT_DB) - - def initialize_database(self): - # Connect to the SQLite database - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - # Create "runs" table - cursor.execute( - """ - CREATE TABLE IF NOT EXISTS runs ( - run_id TEXT, - canvas TEXT, - cell_alias TEXT, - is_dag BOOLEAN, - status TEXT, - error TEXT, - started_at TEXT, - ended_at TEXT, - log TEXT, - outputs TEXT, - PRIMARY KEY (run_id, canvas, cell_alias) - ) - """ - ) - conn.commit() - - # Check if 'variables_hash' column exists, and add it if it doesn't - 
cursor.execute("PRAGMA table_info(runs)") - columns = [column[1] for column in cursor.fetchall()] - if "variables_hash" not in columns: - cursor.execute("ALTER TABLE runs ADD COLUMN variables_hash TEXT") - if "variables" not in columns: - cursor.execute("ALTER TABLE runs ADD COLUMN variables TEXT") - if "file_hash" not in columns: - cursor.execute("ALTER TABLE runs ADD COLUMN file_hash TEXT") - - # Create indexes for "runs" table - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS idx_runs_cell_alias ON runs(cell_alias) - """ - ) - - # Commit changes and close the connection - conn.commit() - conn.close() - - def insert_run_record( - self, - run_id: str, - cell_alias: str, - is_dag: bool, - log_path: str, - file_hash: Optional[str], - variables_hash: Optional[str], - variables: Optional[dict], - ) -> None: - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - try: - cursor.execute("BEGIN TRANSACTION") - cursor.execute( - """ - INSERT INTO runs (run_id, canvas, cell_alias, is_dag, status, started_at, ended_at, log, outputs, variables_hash, variables, file_hash) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, - ( - run_id, - None, - cell_alias, - is_dag, - RunStatus.IN_PROGRESS.value, - datetime.now().isoformat(), - None, - log_path, - None, - variables_hash, - json.dumps(variables) if variables else None, - file_hash, - ), - ) - conn.commit() - except sqlite3.Error as e: - conn.rollback() - raise e - finally: - conn.close() - - def update_run_record( - self, - run_id: str, - cell_alias: str, - new_status: str, - error: Optional[CliError], - outputs: Optional[Union[str, dict, List[str]]] = None, - ) -> None: - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - ended_at = datetime.now().isoformat() - - # Ensure error and outputs are JSON serializable strings - error_str: Optional[str] = None - if error: - error_str = error.model_dump_json() - if outputs and not isinstance(outputs, str): - outputs = json.dumps(outputs) - - try: - cursor.execute("BEGIN TRANSACTION") - cursor.execute( - """ - UPDATE runs - SET status = ?, error = ?, ended_at = ?, outputs = ? - WHERE run_id = ? AND cell_alias = ? - """, - (new_status, error_str, ended_at, outputs, run_id, cell_alias), - ) - conn.commit() - except sqlite3.Error as e: - conn.rollback() - raise e - finally: - conn.close() - - def get_run_records( - self, - canvas: Optional[str], - cell_alias: str, - status: Optional[str], - sort_by: Optional[str], - order: Optional[str], - limit: Optional[int], - skip: Optional[int], - file_hash: Optional[str] = None, - variables_hash: Optional[str] = None, - greather_than_ended_at: Optional[datetime] = None, - run_id: Optional[str] = None, - ) -> Tuple[List[dict], int]: - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - try: - params = [cell_alias] - base_query = "FROM runs WHERE cell_alias = ?" - - if canvas: - base_query += " AND canvas = ?" - params.append(canvas) - if status: - base_query += " AND status = ?" - params.append(status) - if file_hash: - base_query += " AND file_hash = ?" - params.append(file_hash) - if variables_hash: - base_query += " AND variables_hash = ?" - params.append(variables_hash) - if greather_than_ended_at: - base_query += " AND ended_at >= ?" - params.append(greather_than_ended_at.isoformat()) - if run_id: - base_query += " AND run_id = ?" 
- params.append(run_id) - - count_query = f"SELECT COUNT(*) {base_query}" - cursor.execute(count_query, params) - count = cursor.fetchone()[0] - - query = f"SELECT * {base_query}" - if sort_by: - if not order: - order = "DESC" - query += f" ORDER BY {sort_by} {order}" - if limit: - query += f" LIMIT {limit}" - if skip: - query += f" OFFSET {skip}" - - cursor.execute(query, params) - - column_names = [description[0] for description in cursor.description] - records = cursor.fetchall() - result = [dict(zip(column_names, row)) for row in records] - except sqlite3.Error as e: - raise e - finally: - conn.close() - - return result, count - - def get_run_records_by_run_id(self, run_id: str) -> List[dict]: - conn = sqlite3.connect(self.db_path) - cursor = conn.cursor() - - try: - cursor.execute( - "SELECT * FROM runs WHERE run_id = ? ORDER BY started_at ASC", (run_id,) - ) - - column_names = [description[0] for description in cursor.description] - records = cursor.fetchall() - result = [dict(zip(column_names, row)) for row in records] - except sqlite3.Error as e: - raise e - finally: - conn.close() - - return result diff --git a/poetry.lock b/poetry.lock index 964027c..191f33d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand. [[package]] name = "aiomysql" @@ -251,32 +251,32 @@ lxml = ["lxml"] [[package]] name = "boto3" -version = "1.35.93" +version = "1.36.1" description = "The AWS SDK for Python" optional = false python-versions = ">=3.8" files = [ - {file = "boto3-1.35.93-py3-none-any.whl", hash = "sha256:7de2c44c960e486f3c57e5203ea6393c6c4f0914c5f81c789ceb8b5d2ba5d1c5"}, - {file = "boto3-1.35.93.tar.gz", hash = "sha256:2446e819cf4e295833474cdcf2c92bc82718ce537e9ee1f17f7e3d237f60e69b"}, + {file = "boto3-1.36.1-py3-none-any.whl", hash = "sha256:eb21380d73fec6645439c0d802210f72a0cdb3295b02953f246ff53f512faa8f"}, + {file = "boto3-1.36.1.tar.gz", hash = "sha256:258ab77225a81d3cf3029c9afe9920cd9dec317689dfadec6f6f0a23130bb60a"}, ] [package.dependencies] -botocore = ">=1.35.93,<1.36.0" +botocore = ">=1.36.1,<1.37.0" jmespath = ">=0.7.1,<2.0.0" -s3transfer = ">=0.10.0,<0.11.0" +s3transfer = ">=0.11.0,<0.12.0" [package.extras] crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.35.93" +version = "1.36.1" description = "Low-level, data-driven core of boto 3." 
optional = false python-versions = ">=3.8" files = [ - {file = "botocore-1.35.93-py3-none-any.whl", hash = "sha256:47f7161000af6036f806449e3de12acdd3ec11aac7f5578e43e96241413a0f8f"}, - {file = "botocore-1.35.93.tar.gz", hash = "sha256:b8d245a01e7d64c41edcf75a42be158df57b9518a83a3dbf5c7e4b8c2bc540cc"}, + {file = "botocore-1.36.1-py3-none-any.whl", hash = "sha256:dec513b4eb8a847d79bbefdcdd07040ed9d44c20b0001136f0890a03d595705a"}, + {file = "botocore-1.36.1.tar.gz", hash = "sha256:f789a6f272b5b3d8f8756495019785e33868e5e00dd9662a3ee7959ac939bb12"}, ] [package.dependencies] @@ -288,7 +288,7 @@ urllib3 = [ ] [package.extras] -crt = ["awscrt (==0.22.0)"] +crt = ["awscrt (==0.23.4)"] [[package]] name = "cachetools" @@ -955,13 +955,13 @@ grpcio-gcp = ["grpcio-gcp (>=0.2.2,<1.0.dev0)"] [[package]] name = "google-api-python-client" -version = "2.157.0" +version = "2.159.0" description = "Google API Client Library for Python" optional = false python-versions = ">=3.7" files = [ - {file = "google_api_python_client-2.157.0-py2.py3-none-any.whl", hash = "sha256:0b0231db106324c659bf8b85f390391c00da57a60ebc4271e33def7aac198c75"}, - {file = "google_api_python_client-2.157.0.tar.gz", hash = "sha256:2ee342d0967ad1cedec43ccd7699671d94bff151e1f06833ea81303f9a6d86fd"}, + {file = "google_api_python_client-2.159.0-py2.py3-none-any.whl", hash = "sha256:baef0bb631a60a0bd7c0bf12a5499e3a40cd4388484de7ee55c1950bf820a0cf"}, + {file = "google_api_python_client-2.159.0.tar.gz", hash = "sha256:55197f430f25c907394b44fa078545ffef89d33fd4dca501b7db9f0d8e224bd6"}, ] [package.dependencies] @@ -2108,13 +2108,13 @@ signedtoken = ["cryptography (>=3.0.0)", "pyjwt (>=2.0.0,<3)"] [[package]] name = "openai" -version = "1.59.3" +version = "1.59.7" description = "The official Python library for the openai API" optional = false python-versions = ">=3.8" files = [ - {file = "openai-1.59.3-py3-none-any.whl", hash = "sha256:b041887a0d8f3e70d1fc6ffbb2bf7661c3b9a2f3e806c04bf42f572b9ac7bc37"}, - {file = "openai-1.59.3.tar.gz", hash = "sha256:7f7fff9d8729968588edf1524e73266e8593bb6cab09298340efb755755bb66f"}, + {file = "openai-1.59.7-py3-none-any.whl", hash = "sha256:cfa806556226fa96df7380ab2e29814181d56fea44738c2b0e581b462c268692"}, + {file = "openai-1.59.7.tar.gz", hash = "sha256:043603def78c00befb857df9f0a16ee76a3af5984ba40cb7ee5e2f40db4646bf"}, ] [package.dependencies] @@ -2431,22 +2431,22 @@ testing = ["google-api-core (>=1.31.5)"] [[package]] name = "protobuf" -version = "5.29.2" +version = "5.29.3" description = "" optional = false python-versions = ">=3.8" files = [ - {file = "protobuf-5.29.2-cp310-abi3-win32.whl", hash = "sha256:c12ba8249f5624300cf51c3d0bfe5be71a60c63e4dcf51ffe9a68771d958c851"}, - {file = "protobuf-5.29.2-cp310-abi3-win_amd64.whl", hash = "sha256:842de6d9241134a973aab719ab42b008a18a90f9f07f06ba480df268f86432f9"}, - {file = "protobuf-5.29.2-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:a0c53d78383c851bfa97eb42e3703aefdc96d2036a41482ffd55dc5f529466eb"}, - {file = "protobuf-5.29.2-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:494229ecd8c9009dd71eda5fd57528395d1eacdf307dbece6c12ad0dd09e912e"}, - {file = "protobuf-5.29.2-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:b6b0d416bbbb9d4fbf9d0561dbfc4e324fd522f61f7af0fe0f282ab67b22477e"}, - {file = "protobuf-5.29.2-cp38-cp38-win32.whl", hash = "sha256:e621a98c0201a7c8afe89d9646859859be97cb22b8bf1d8eacfd90d5bda2eb19"}, - {file = "protobuf-5.29.2-cp38-cp38-win_amd64.whl", hash = "sha256:13d6d617a2a9e0e82a88113d7191a1baa1e42c2cc6f5f1398d3b054c8e7e714a"}, - 
{file = "protobuf-5.29.2-cp39-cp39-win32.whl", hash = "sha256:36000f97ea1e76e8398a3f02936aac2a5d2b111aae9920ec1b769fc4a222c4d9"}, - {file = "protobuf-5.29.2-cp39-cp39-win_amd64.whl", hash = "sha256:2d2e674c58a06311c8e99e74be43e7f3a8d1e2b2fdf845eaa347fbd866f23355"}, - {file = "protobuf-5.29.2-py3-none-any.whl", hash = "sha256:fde4554c0e578a5a0bcc9a276339594848d1e89f9ea47b4427c80e5d72f90181"}, - {file = "protobuf-5.29.2.tar.gz", hash = "sha256:b2cc8e8bb7c9326996f0e160137b0861f1a82162502658df2951209d0cb0309e"}, + {file = "protobuf-5.29.3-cp310-abi3-win32.whl", hash = "sha256:3ea51771449e1035f26069c4c7fd51fba990d07bc55ba80701c78f886bf9c888"}, + {file = "protobuf-5.29.3-cp310-abi3-win_amd64.whl", hash = "sha256:a4fa6f80816a9a0678429e84973f2f98cbc218cca434abe8db2ad0bffc98503a"}, + {file = "protobuf-5.29.3-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:a8434404bbf139aa9e1300dbf989667a83d42ddda9153d8ab76e0d5dcaca484e"}, + {file = "protobuf-5.29.3-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:daaf63f70f25e8689c072cfad4334ca0ac1d1e05a92fc15c54eb9cf23c3efd84"}, + {file = "protobuf-5.29.3-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:c027e08a08be10b67c06bf2370b99c811c466398c357e615ca88c91c07f0910f"}, + {file = "protobuf-5.29.3-cp38-cp38-win32.whl", hash = "sha256:84a57163a0ccef3f96e4b6a20516cedcf5bb3a95a657131c5c3ac62200d23252"}, + {file = "protobuf-5.29.3-cp38-cp38-win_amd64.whl", hash = "sha256:b89c115d877892a512f79a8114564fb435943b59067615894c3b13cd3e1fa107"}, + {file = "protobuf-5.29.3-cp39-cp39-win32.whl", hash = "sha256:0eb32bfa5219fc8d4111803e9a690658aa2e6366384fd0851064b963b6d1f2a7"}, + {file = "protobuf-5.29.3-cp39-cp39-win_amd64.whl", hash = "sha256:6ce8cc3389a20693bfde6c6562e03474c40851b44975c9b2bf6df7d8c4f864da"}, + {file = "protobuf-5.29.3-py3-none-any.whl", hash = "sha256:0a18ed4a24198528f2333802eb075e59dea9d679ab7a6c5efb017a59004d849f"}, + {file = "protobuf-5.29.3.tar.gz", hash = "sha256:5da0f41edaf117bde316404bad1a486cb4ededf8e4a54891296f648e8e076620"}, ] [[package]] @@ -2618,6 +2618,7 @@ description = "Pure-Python implementation of ASN.1 types and DER/BER/CER codecs optional = false python-versions = ">=3.8" files = [ + {file = "pyasn1-0.6.1-py3-none-any.whl", hash = "sha256:0d632f46f2ba09143da3a8afe9e33fb6f92fa2320ab7e886e2d0f7672af84629"}, {file = "pyasn1-0.6.1.tar.gz", hash = "sha256:6f580d2bdd84365380830acf45550f2511469f673cb4a5ae3857a3170128b034"}, ] @@ -2628,6 +2629,7 @@ description = "A collection of ASN.1-based protocols modules" optional = false python-versions = ">=3.8" files = [ + {file = "pyasn1_modules-0.4.1-py3-none-any.whl", hash = "sha256:49bfa96b45a292b711e986f222502c1c9a5e1f4e568fc30e2574a6c7d07838fd"}, {file = "pyasn1_modules-0.4.1.tar.gz", hash = "sha256:c28e2dbf9c06ad61c71a075c7e0f9fd0f1b0bb2d2ad4377f240d33ac2ab60a7c"}, ] @@ -2658,13 +2660,13 @@ files = [ [[package]] name = "pydantic" -version = "2.10.4" +version = "2.10.5" description = "Data validation using Python type hints" optional = false python-versions = ">=3.8" files = [ - {file = "pydantic-2.10.4-py3-none-any.whl", hash = "sha256:597e135ea68be3a37552fb524bc7d0d66dcf93d395acd93a00682f1efcb8ee3d"}, - {file = "pydantic-2.10.4.tar.gz", hash = "sha256:82f12e9723da6de4fe2ba888b5971157b3be7ad914267dea8f05f82b28254f06"}, + {file = "pydantic-2.10.5-py3-none-any.whl", hash = "sha256:4dd4e322dbe55472cb7ca7e73f4b63574eecccf2835ffa2af9021ce113c83c53"}, + {file = "pydantic-2.10.5.tar.gz", hash = "sha256:278b38dbbaec562011d659ee05f63346951b3a248a6f3642e1bc68894ea2b4ff"}, ] [package.dependencies] @@ 
-3161,13 +3163,13 @@ rsa = ["oauthlib[signedtoken] (>=3.0.0)"] [[package]] name = "responses" -version = "0.25.3" +version = "0.25.6" description = "A utility library for mocking out the `requests` Python library." optional = false python-versions = ">=3.8" files = [ - {file = "responses-0.25.3-py3-none-any.whl", hash = "sha256:521efcbc82081ab8daa588e08f7e8a64ce79b91c39f6e62199b19159bea7dbcb"}, - {file = "responses-0.25.3.tar.gz", hash = "sha256:617b9247abd9ae28313d57a75880422d55ec63c29d33d629697590a034358dba"}, + {file = "responses-0.25.6-py3-none-any.whl", hash = "sha256:9cac8f21e1193bb150ec557875377e41ed56248aed94e4567ed644db564bacf1"}, + {file = "responses-0.25.6.tar.gz", hash = "sha256:eae7ce61a9603004e76c05691e7c389e59652d91e94b419623c12bbfb8e331d8"}, ] [package.dependencies] @@ -3194,20 +3196,20 @@ pyasn1 = ">=0.1.3" [[package]] name = "s3transfer" -version = "0.10.4" +version = "0.11.1" description = "An Amazon S3 Transfer Manager" optional = false python-versions = ">=3.8" files = [ - {file = "s3transfer-0.10.4-py3-none-any.whl", hash = "sha256:244a76a24355363a68164241438de1b72f8781664920260c48465896b712a41e"}, - {file = "s3transfer-0.10.4.tar.gz", hash = "sha256:29edc09801743c21eb5ecbc617a152df41d3c287f67b615f73e5f750583666a7"}, + {file = "s3transfer-0.11.1-py3-none-any.whl", hash = "sha256:8fa0aa48177be1f3425176dfe1ab85dcd3d962df603c3dbfc585e6bf857ef0ff"}, + {file = "s3transfer-0.11.1.tar.gz", hash = "sha256:3f25c900a367c8b7f7d8f9c34edc87e300bde424f779dc9f0a8ae4f9df9264f6"}, ] [package.dependencies] -botocore = ">=1.33.2,<2.0a.0" +botocore = ">=1.36.0,<2.0a.0" [package.extras] -crt = ["botocore[crt] (>=1.33.2,<2.0a.0)"] +crt = ["botocore[crt] (>=1.36.0,<2.0a.0)"] [[package]] name = "seaborn" @@ -3803,6 +3805,17 @@ files = [ [package.dependencies] types-urllib3 = "*" +[[package]] +name = "types-tabulate" +version = "0.9.0.20241207" +description = "Typing stubs for tabulate" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types_tabulate-0.9.0.20241207-py3-none-any.whl", hash = "sha256:b8dad1343c2a8ba5861c5441370c3e35908edd234ff036d4298708a1d4cf8a85"}, + {file = "types_tabulate-0.9.0.20241207.tar.gz", hash = "sha256:ac1ac174750c0a385dfd248edc6279fa328aaf4ea317915ab879a2ec47833230"}, +] + [[package]] name = "types-urllib3" version = "1.26.25.14" @@ -3884,13 +3897,13 @@ standard = ["colorama (>=0.4)", "httptools (>=0.6.3)", "python-dotenv (>=0.13)", [[package]] name = "virtualenv" -version = "20.28.1" +version = "20.29.0" description = "Virtual Python Environment builder" optional = false python-versions = ">=3.8" files = [ - {file = "virtualenv-20.28.1-py3-none-any.whl", hash = "sha256:412773c85d4dab0409b83ec36f7a6499e72eaf08c80e81e9576bca61831c71cb"}, - {file = "virtualenv-20.28.1.tar.gz", hash = "sha256:5d34ab240fdb5d21549b76f9e8ff3af28252f5499fb6d6f031adac4e5a8c5329"}, + {file = "virtualenv-20.29.0-py3-none-any.whl", hash = "sha256:c12311863497992dc4b8644f8ea82d3b35bb7ef8ee82e6630d76d0197c39baf9"}, + {file = "virtualenv-20.29.0.tar.gz", hash = "sha256:6345e1ff19d4b1296954cee076baaf58ff2a12a84a338c62b02eda39f20aa982"}, ] [package.dependencies] @@ -4018,4 +4031,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "905cec002c7dbadf7a9c67ae31816027287ef8bb0ae511cc7be34d6bf01b6ebe" +content-hash = "78da45cea1f77e9d07499bf051586ccdc1bb65b09f7104b20c8b1687c68c76fe" diff --git a/pyproject.toml b/pyproject.toml index fc64763..1b93467 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -108,6 +108,7 @@ pydantic = {extras = 
["mypy"], version = "^2.8.2"} pandas-stubs = "^2.2.2.240603" types-pyyaml = "^6.0.12.20240311" types-paramiko = "^3.4.0.20240423" +types-tabulate = "^0.9.0.20241207" [tool.poetry.scripts] morph = "morph.cli.main:cli"