diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 61db9d6..94b31b9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,7 +16,7 @@ jobs: fail-fast: false matrix: os: [macos-latest, ubuntu-latest, windows-latest] - python-version: [3.7, 3.8, 3.9] + python-version: ["3.9"] if: "!contains(github.event.head_commit.message, 'ci skip')" diff --git a/mlchain/__init__.py b/mlchain/__init__.py index 9a8835b..83be48f 100644 --- a/mlchain/__init__.py +++ b/mlchain/__init__.py @@ -1,3 +1,23 @@ +from os import environ + +if "DISABLE_GEVENT_FIX" not in environ: + # Fix gevent + try: + from gevent import monkey + monkey.patch_all(thread=False, socket=False) + except ImportError: + pass + + import ssl + try: + _create_unverified_https_context = ssl._create_unverified_context + except AttributeError: + # Legacy Python that doesn't verify HTTPS certificates by default + pass + else: + # Handle target environment that doesn't support HTTPS verification + ssl._create_default_https_context = _create_unverified_https_context + # Mlchain Context from contextvars import ContextVar from typing import Any, Dict @@ -7,26 +27,17 @@ ) # Parameters of MLchain -__version__ = "0.3.0" +__version__ = "0.3.1" HOST = "https://www.api.mlchain.ml" WEB_HOST = HOST API_ADDRESS = HOST MODEL_ID = None -import ssl - -try: - _create_unverified_https_context = ssl._create_unverified_context -except AttributeError: - # Legacy Python that doesn't verify HTTPS certificates by default - pass -else: - # Handle target environment that doesn't support HTTPS verification - ssl._create_default_https_context = _create_unverified_https_context from os import environ environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES' + from mlchain.base.log import logger from .context import mlchain_context diff --git a/mlchain/cli/mlconfig.yaml b/mlchain/cli/mlconfig.yaml index fe42806..fa366f1 100644 --- a/mlchain/cli/mlconfig.yaml +++ b/mlchain/cli/mlconfig.yaml @@ -26,7 +26,6 @@ gunicorn: threads: 1 # Number of simultaneous threads in workers worker_class: gthread # The base worker_class, can use gevent (For better IO) or uvicorn.workers.UvicornWorker (starlette - For Async) max_requests_jitter: 50 # Restart worker different time - accesslog: mlchain-server.log # Log file for gunicorn bind: - 'unix:/tmp/gunicorn.sock' # Using sock to make gunicorn faster diff --git a/mlchain/config.py b/mlchain/config.py index c686019..af5a5bc 100644 --- a/mlchain/config.py +++ b/mlchain/config.py @@ -102,6 +102,11 @@ def load_config(self, path, mode=None): for k, v in data['mode']['env'][mode].items(): if k in environ: data['mode']['env'][mode][k] = environ[k] + if isinstance(v, dict): + for sub_k, sub_v in v.items(): + env_key = f"{k}___{sub_k}" + if env_key in environ: + data['mode']['env'][mode][k][sub_k] = environ[env_key] self.update(data['mode']['env'][mode]) def get_client_config(self, name): @@ -152,6 +157,11 @@ def load_config(data): for k, v in data['mode']['env'][mode].items(): if k in environ: data['mode']['env'][mode][k] = environ[k] + if isinstance(v, dict): + for sub_k, sub_v in v.items(): + env_key = f"{k}___{sub_k}" + if env_key in environ: + data['mode']['env'][mode][k][sub_k] = environ[env_key] mlconfig.update(data['mode']['env'][mode]) if (mlconfig.MLCHAIN_SENTRY_DSN is not None and mlconfig.MLCHAIN_SENTRY_DSN != 'None') and data.get('wrapper', None) != 'gunicorn': diff --git a/mlchain/server/base.py b/mlchain/server/base.py index 62417af..3a91f3f 100644 --- a/mlchain/server/base.py +++ b/mlchain/server/base.py @@ -85,6 +85,7 @@ def __init__(self, model: ServeModel, name=None, version=None, api_format=None, self.converter = Converter() self.initialize_app() + self._swaggered = False def _check_status(self): """ @@ -113,7 +114,9 @@ def register_swagger(self): """ Add Swagger to URL """ - return self._register_swagger() + if not self._swaggered: + self._register_swagger() + self._swaggered = True def add_endpoint(self, endpoint=None, endpoint_name=None, handler=None, methods=['GET', 'POST']): @@ -178,7 +181,7 @@ def initialize_endpoint(self): self._register_home() try: - self._register_swagger() + self.register_swagger() except Exception as ex: logger.error("Can't register swagger with error {0}".format(ex)) diff --git a/mlchain/workflows/__init__.py b/mlchain/workflows/__init__.py index 46127b3..7d7da88 100644 --- a/mlchain/workflows/__init__.py +++ b/mlchain/workflows/__init__.py @@ -1,5 +1,2 @@ -from .background import Background -from .parallel import Parallel -from .task import Task, SyncTask -from .pipeline import Step, Pipeline -from .limiter import RateLimiter, AsyncRateLimiter \ No newline at end of file +from mlchain.workflows_sync import Task as SyncTask +from mlchain.workflows_sync import Background, Parallel, Task, RateLimiter, AsyncRateLimiter \ No newline at end of file diff --git a/mlchain/workflows/background.py b/mlchain/workflows/background.py deleted file mode 100644 index 9e27b98..0000000 --- a/mlchain/workflows/background.py +++ /dev/null @@ -1,130 +0,0 @@ -import inspect -import time -from threading import Thread, Event -import trio -from .task import Task -from datetime import timedelta -from concurrent.futures import ThreadPoolExecutor -import logging -import traceback - -class BackgroundTask(Thread): - def __init__(self, interval, task, max_repeat, callback=None, max_thread:int=1, pass_fail_job:bool=False): - assert callable(task) - - Thread.__init__(self) - self.stopped = Event() - self.is_done = False - self.interval = interval - self.task = task - self.max_repeat = max_repeat - self.callback = callback - self.output = None - self.pool_limit = ThreadPoolExecutor(max_workers=max_thread) - self.pass_fail_job = pass_fail_job - - if callback is not None: - self.pool_limit_callback = ThreadPoolExecutor(max_workers=1) - - def stop(self): - self.stopped.set() - self.join() - - def get_output(self, task, *args, **kwargs): - try: - self.output = task(*args, **kwargs) - except Exception as ex: - self.output = ("MLCHAIN_BACKGROUND_ERROR", traceback.format_exc()) - self.call_the_callback() - - def call_the_callback(self): - if self.callback: - self.pool_limit_callback.submit(self.callback) - - if isinstance(self.output, tuple) and len(self.output) == 2 and self.output[0] == "MLCHAIN_BACKGROUND_ERROR": - if self.pass_fail_job: - logging.error("BACKGROUND CALL ERROR: {0}".format(self.output[1])) - else: - raise Exception("BACKGROUND CALL ERROR: {0}".format(self.output[1])) - - def run(self): - if self.interval is not None: - count_repeat = 0 - while (self.max_repeat < 0 or count_repeat < self.max_repeat) \ - and (not self.stopped.wait(self.interval.total_seconds())): - - if isinstance(type(self.task), Task) \ - or issubclass(type(self.task), Task): - if inspect.iscoroutinefunction(self.task.func_): - self.pool_limit.submit(self.get_output, trio.run, self.task) - else: - self.pool_limit.submit(self.get_output, self.task.func_, *self.task.args, **self.task.kwargs) - else: - if inspect.iscoroutinefunction(self.task): - self.pool_limit.submit(self.get_output, trio.run, self.task) - else: - self.pool_limit.submit(self.get_output, self.task) - count_repeat += 1 - else: - if isinstance(type(self.task), Task) \ - or issubclass(type(self.task), Task): - if inspect.iscoroutinefunction(self.task.func_): - self.pool_limit.submit(self.get_output, trio.run, self.task) - else: - self.pool_limit.submit(self.get_output, self.task.func_, *self.task.args, **self.task.kwargs) - else: - if inspect.iscoroutinefunction(self.task): - self.pool_limit.submit(self.get_output, trio.run, self.task) - else: - self.pool_limit.submit(self.get_output, self.task) - - self.pool_limit.shutdown(wait=True) - self.is_done = True - - if isinstance(self.output, tuple) and len(self.output) == 2 and self.output[0] == "MLCHAIN_BACKGROUND_ERROR": - if self.pass_fail_job: - logging.error("BACKGROUND CALL ERROR: {0}".format(self.output[1])) - else: - raise Exception("BACKGROUND CALL ERROR: {0}".format(self.output[1])) - - if self.callback is not None: - self.pool_limit_callback.shutdown(wait=True) - self.is_done = True - - def wait(self, interval: float = 0.1): - while not self.is_done: - time.sleep(interval) - return self.output - - def wait(self, interval: float = 0.1): - while not self.is_done: - time.sleep(interval) - return self.output - -class Background: - """ - Run a task in background using Threading.Event - :task: [Task, function] item - :interval: timedelta or float seconds - """ - - def __init__(self, task, interval:float=None, max_repeat:int=-1, callback=None): - assert callable(task), 'You have to transfer a callable instance or an mlchain.Task' - assert (max_repeat > 0 and interval is not None and interval > 0) or max_repeat == -1, "interval need to be set when max_repeat > 0" - assert callback is None or callable(callback), "callback need to be callable" - - if interval is not None: - if isinstance(interval, int) or isinstance(interval, float): - interval = timedelta(seconds = interval) - - self.task = task - self.interval = interval - self.max_repeat = max_repeat - self.callback = callback - - def run(self, max_thread:int=1, pass_fail_job:bool=False): - task = BackgroundTask(interval=self.interval, task=self.task, - max_repeat=self.max_repeat, callback=self.callback, max_thread=max_thread, pass_fail_job=pass_fail_job) - task.start() - - return task \ No newline at end of file diff --git a/mlchain/workflows/parallel.py b/mlchain/workflows/parallel.py deleted file mode 100644 index 0e6b7e0..0000000 --- a/mlchain/workflows/parallel.py +++ /dev/null @@ -1,283 +0,0 @@ -import os -import inspect -from multiprocessing.pool import ThreadPool -import trio -from mlchain.base.log import format_exc, except_handler, logger - - -class TrioProgress(trio.abc.Instrument): - def __init__(self, total, notebook_mode=False, **kwargs): - if notebook_mode: # pragma: no cover - from tqdm.notebook import tqdm - else: - from tqdm import tqdm - - self.tqdm = tqdm(total=total, **kwargs) - - def task_processed(self): - self.tqdm.update(1) - - -class Parallel: - """ - Build a collection of tasks to be executed in parallel - :tasks: List of [Task, function] items - :max_threads: Maximum Threads for this Parallel - :max_retries: Maximum retry time when a task fail - :pass_fail_job: Pass or Raise error when a task run fail - :verbose: Print error or not - """ - - def __init__( - self, - tasks: [], - max_threads: int = 10, - max_retries: int = 0, - pass_fail_job: bool = False, - verbose: bool = True, - threading: bool = True, - ): - """ - :tasks: [Task, function] items - :max_threads: Maximum threads to Parallel, max_threads=0 means no limitation - :max_retries: How many time retry when job fail - :pass_fail_job: No exeption when a job fail - :verbose: Verbose or not - """ - - assert isinstance(tasks, list) and all( - callable(task) for task in tasks - ), "You have to transfer a list of callable instances or mlchain.Task" - self.tasks = tasks - if max_threads == -1: - max_threads = 100 - elif max_threads == 0: - max_threads = os.cpu_count() - self.max_threads = max(0, max_threads) - self.threading = threading - if self.max_threads > 0: - self.limiter = trio.CapacityLimiter(self.max_threads) - else: - self.limiter = None - - self.max_retries = max(max_retries + 1, 1) - self.pass_fail_job = pass_fail_job - self.verbose = verbose - self.show_progress_bar = False - self.progress_bar = None - - def update_progress_bar(self): - if self.show_progress_bar: - self.progress_bar.task_processed() - - async def __call_sync( - self, task, outputs, idx, limiter, max_retries=1, pass_fail_job=False - ): - if limiter is not None: - async with limiter: - for retry_idx in range(max_retries): - try: - outputs[idx] = task() - self.update_progress_bar() - return None - except Exception: - if retry_idx == max_retries - 1 and not pass_fail_job: - with except_handler(): - raise AssertionError( - "ERROR in {}th task.\n {1}".format(idx, format_exc(name="mlchain.workflows.parallel")) - ) - if retry_idx < max_retries - 1 or not self.verbose: - logger.error( - "PARALLEL ERROR in {0}th task and retry task," - " run times = {1}".format(idx, retry_idx + 1) - ) - else: - logger.debug( - "PASSED PARALLEL ERROR in {}th task:".format(idx, format_exc(name="mlchain.workflows.parallel")) - ) - else: - for retry_idx in range(max_retries): - try: - outputs[idx] = task() - self.update_progress_bar() - return None - except Exception: - if retry_idx == max_retries - 1 and not pass_fail_job: - with except_handler(): - raise AssertionError( - "ERROR in {}th task\n".format(idx, format_exc(name="mlchain.workflows.parallel")) - ) - if retry_idx < max_retries - 1 or not self.verbose: - logger.error( - "PARALLEL ERROR in {0}th task and retry task," - " run times = {1}".format(idx, retry_idx + 1) - ) - else: - logger.debug( - "PASSED PARALLEL ERROR: {0}".format(format_exc(name="mlchain.workflows.parallel")) - ) - self.update_progress_bar() - - async def __call_async( - self, task, outputs, idx, limiter, max_retries=1, pass_fail_job=False - ): - if limiter is not None: - async with limiter: - for retry_idx in range(max_retries): - try: - outputs[idx] = await task() - self.update_progress_bar() - return None - except Exception: - if retry_idx == max_retries - 1 and not pass_fail_job: - with except_handler(): - raise AssertionError( - "ERROR in {0}th task\n {1}".format( - idx, - format_exc(name="mlchain.workflows.parallel"), - ) - ) - - if retry_idx < max_retries - 1 or not self.verbose: - logger.error( - "PARALLEL ERROR in {0}th task and retry task, " - "run times = {1}".format(idx, retry_idx + 1) - ) - else: - logger.debug( - "PASSED PARALLEL ERROR in {0}th task: {1}".format( - idx, format_exc(name="mlchain.workflows.parallel") - ) - ) - else: - for retry_idx in range(max_retries): - try: - outputs[idx] = await task() - self.update_progress_bar() - return None - except Exception as ex: - if retry_idx == max_retries - 1 and not pass_fail_job: - with except_handler(): - raise AssertionError( - "ERROR in {0}th task\n {1}".format( - idx, format_exc(name="mlchain.workflows.parallel") - ) - ) - - if retry_idx < max_retries - 1 or not self.verbose: - logger.error( - "PARALLEL ERROR in {0}th task and retry task, " - "run times = {1}".format(idx, retry_idx + 1) - ) - else: - logger.debug( - "PASSED PARALLEL ERROR in {}th task: {1}".format( - idx, format_exc(name="mlchain.workflows.parallel") - ) - ) - - self.update_progress_bar() - - async def dispatch(self): - """ - When you run parallel inside another parallel, please use this function - """ - if len(self.tasks) == 0: - return None - - outputs = [None] * len(self.tasks) - - async with trio.open_nursery() as nursery: - for idx, task in enumerate(self.tasks): - if hasattr(task, "to_async") and callable(task.to_async): - nursery.start_soon( - self.__call_async, - task.to_async(), - outputs, - idx, - self.limiter, - self.max_retries, - self.pass_fail_job, - ) - elif inspect.iscoroutinefunction(task) or ( - not inspect.isfunction(task) - and hasattr(task, "__call__") - and inspect.iscoroutinefunction(task.__call__) - ): - nursery.start_soon( - self.__call_async, - task, - outputs, - idx, - self.limiter, - self.max_retries, - self.pass_fail_job, - ) - else: - nursery.start_soon( - self.__call_sync, - task, - outputs, - idx, - self.limiter, - self.max_retries, - self.pass_fail_job, - ) - - return outputs - - def exec_task(self, task, idx=None): - for retry_idx in range(self.max_retries): - try: - output = task.exec() - self.update_progress_bar() - return output - except Exception as ex: - if retry_idx == self.max_retries - 1 and not self.pass_fail_job: - return ex - if retry_idx < self.max_retries - 1 or not self.verbose: - logger.error( - "PARALLEL ERROR in {0}th task and retry task, " - "run times = {1}".format(idx, retry_idx + 1) - ) - else: - logger.debug( - "PASSED PARALLEL ERROR in {}th task:".format(idx, format_exc(name="mlchain.workflows.parallel")) - ) - return None - - def run(self, progress_bar: bool = False, notebook_mode: bool = False): - """ - When you run parallel in root, please use this function - :progress_bar: Use tqdm to show the progress of calling Parallel - :notebook_mode: Put it to true if run mlchain inside notebook - """ - if self.threading: - pool = ThreadPool(max(1, self.max_threads)) - if progress_bar: - self.show_progress_bar = True - self.progress_bar = TrioProgress( - total=len(self.tasks), notebook_mode=notebook_mode - ) - - async_result = [ - pool.apply_async(self.exec_task, args=[task, idx]) - for idx, task in enumerate(self.tasks) - ] - - results = [] - for result in async_result: - output = result.get() - if isinstance(output, Exception): - pool.terminate() - pool.close() - raise output - results.append(output) - pool.close() - return results - if progress_bar: - self.show_progress_bar = True - self.progress_bar = TrioProgress( - total=len(self.tasks), notebook_mode=notebook_mode - ) - return trio.run(self.dispatch) diff --git a/mlchain/workflows/pipeline.py b/mlchain/workflows/pipeline.py deleted file mode 100644 index f308540..0000000 --- a/mlchain/workflows/pipeline.py +++ /dev/null @@ -1,190 +0,0 @@ - -import time -from .background import Background -from .task import Task, SyncTask -from collections import deque -import threading -import logging - -class Step: - """ - Step of a Pipeline - """ - def __init__(self, func, max_thread:int=1, max_calls:int=None, interval:float=1.0): - """ - Initialize step with func and max_threads - :max_calls: Maximum call step in interval, fps = max_calls / interval - """ - self.func = func - self.max_thread = max_thread - self.max_calls = max_calls - self.interval = interval - - if max_calls is not None: - self.interval_time = self.period / self.max_calls - - self.accept_next_call = time.time() - - def __call__(self, input): - if self.max_calls is not None: - self.accept_next_call = time.time() + self.interval_time - - return self.func(input) - - @property - def can_call(self): - return time.time() > self.accept_next_call - -class StepOutput: - def __init__(self, steps, step_max_thread_dict, pass_fail_job:bool=False): - self.steps = steps - self.current_step = 0 - self.is_available = True - self.max_step_index = len(steps) - self.step_max_thread_dict = step_max_thread_dict - self.pass_fail_job = pass_fail_job - self.is_fail = False - - self.output = [] - - def increase_current_step(self, callback=None): - self.step_max_thread_dict[self.current_step] += 1 - self.current_step += 1 - self.is_available = True - - if isinstance(self.output[-1].output, tuple) and len(self.output[-1].output) == 2 and self.output[-1].output[0] == "MLCHAIN_BACKGROUND_ERROR": - self.is_fail = True - self.current_step = self.max_step_index - self.is_done = True - - if not self.pass_fail_job: - raise Exception("Mlchain Pipeline error, stop here!") - - if callback: - t = threading.Thread(target=callback) - t.daemon = True - t.start() - - # Call next step - if self.need_call: - self.call_next_step() - - def call_first_step(self, input, callback=None): - self.is_available = False - self.step_max_thread_dict[self.current_step] -= 1 - - if not self.is_done: - self.output.append(Background(Task(self.steps[self.current_step], input), callback=SyncTask(self.increase_current_step, callback)).run(pass_fail_job=self.pass_fail_job)) - - def call_next_step(self, callback=None): - self.is_available = False - self.step_max_thread_dict[self.current_step] -= 1 - - self.output.append(Background(Task(self.steps[self.current_step], self.output[-1].output), callback=SyncTask(self.increase_current_step, callback)).run(pass_fail_job=self.pass_fail_job)) - - @property - def is_done(self): - return self.current_step >= self.max_step_index - - @property - def need_call(self): - if self.is_done: - return False - return self.is_available and self.step_max_thread_dict[self.current_step] > 0 - -class Pipeline(object): - def __init__(self, *steps: Step): - """ - Pipeline multiple steps - """ - if len(steps) == 0: - raise ValueError('Input Pipeline should have at least one Step!') - - self.steps = steps - self.max_step_index = len(self.steps) - - # When the pipeline is running loop forever, this is the way to stop it - self.running = False - - def stop(self): - """ Stop a Pipeline """ - self.running = False - - def run(self, inputs, max_processing_queue:int=1000, return_output:bool=True, loop_forever:bool=False, pass_fail_job:bool=False): - """ - :inputs: A list or an iterator - :max_processing_queue: Max of queue for processing task - :return_output: Return output or not - :loop_forever: Loop forever or not, for infinite inputs - :pass_fail_job: Only logging the failure job, not stop pipeline - """ - inputs = iter(inputs) - - self.running = True - self.processing_queue = deque() - self.output_queue = [] - self.step_max_thread_dict = {idx:step.max_thread for idx, step in enumerate(self.steps)} - - while True and self.running: - check_having_update = False - # Pop done processing and add into self.output_queue - while len(self.processing_queue) > 0 and self.processing_queue[0].is_done: - the_output = self.processing_queue.popleft() - if the_output.is_fail and not pass_fail_job: - self.stop() - raise Exception("Pipeline error, stop now!") - - check_having_update = True - - if return_output: - if not the_output.is_fail: - self.output_queue.append(the_output) - else: - del the_output - - # Processing more inputs - while True and self.step_max_thread_dict[0] > 0 and len(self.processing_queue) < max_processing_queue: - try: - input = next(inputs) - except StopIteration: - logging.info("Processing pipeline waiting because there's no input left") - if not loop_forever: - logging.info("Processing pipeline stop because there's no input left, if you want to loop forever, use loop_forever") - self.stop() - break - - check_having_update = True - the_step_output = StepOutput(steps = self.steps, step_max_thread_dict=self.step_max_thread_dict, pass_fail_job=pass_fail_job) - the_step_output.call_first_step(input) - - self.processing_queue.append(the_step_output) - - # Processing processing steps to next step - for processing_step in self.processing_queue: - if processing_step.is_fail and not pass_fail_job: - self.stop() - raise Exception("Pipeline error, stop now!") - - if not check_having_update: - logging.debug("There's no done task update") - time.sleep(0.01) - - # Check already run all then return - while len(self.processing_queue) > 0: - if self.processing_queue[0].is_done: - the_output = self.processing_queue.popleft() - - if the_output.is_fail and not pass_fail_job: - self.stop() - raise Exception("Pipeline error, stop now!") - - if return_output and len(the_output.output) == self.max_step_index: - if not the_output.is_fail: - self.output_queue.append(the_output) - else: - del the_output - else: - time.sleep(0.01) - - logging.info("Stopped Pipeline") - return self.output_queue \ No newline at end of file diff --git a/mlchain/workflows/task.py b/mlchain/workflows/task.py deleted file mode 100644 index 3e8c307..0000000 --- a/mlchain/workflows/task.py +++ /dev/null @@ -1,94 +0,0 @@ -import inspect -import trio -from mlchain import mlchain_context -from sentry_sdk import Hub - -class Task: - """ - This class wrap a function to a Task - :func_: Function - """ - - def __init__(self, func_, *args, **kwargs): - assert callable(func_), 'You have to transfer a callable instance and its params' - self.func_ = func_ - self.args = args - self.kwargs = kwargs - self.span = None - self.context = mlchain_context.copy() - - def exec(self): - if inspect.iscoroutinefunction(self.func_) \ - or (not inspect.isfunction(self.func_) - and hasattr(self.func_, '__call__') - and inspect.iscoroutinefunction(self.func_.__call__)): - return trio.run(self.__call__) - with self: - return self.func_(*self.args, **self.kwargs) - - async def exec_async(self): - return self.__call__() - - async def __call__(self): - """ - Task's process code - """ - async def _call_func(): - if inspect.iscoroutinefunction(self.func_) \ - or (not inspect.isfunction(self.func_) - and hasattr(self.func_, '__call__') - and inspect.iscoroutinefunction(self.func_.__call__)): - async with self: - return await self.func_(*self.args, **self.kwargs) - with self: - return self.func_(*self.args, **self.kwargs) - - transaction = Hub.current.scope.transaction - - if transaction is not None: - with transaction.start_child(op="task", description="{0}".format(self.func_.__name__)) as span: - return await _call_func() - else: - return await _call_func() - - async def __aenter__(self): - return self.__enter__() - - async def __aexit__(self, exc_type, exc_val, exc_tb): - return self.__exit__(exc_type, exc_val, exc_tb) - - def __enter__(self): - mlchain_context.update(self.context) - - def __exit__(self, exc_type, exc_val, exc_tb): - pass - - -class SyncTask: - """ - This class wrap a function to a Task - :func_: Function - """ - - def __init__(self, func_, *args, **kwargs): - assert callable(func_), 'You have to transfer a callable instance and its params' - self.func_ = func_ - self.args = args - self.kwargs = kwargs - self.span = None - self.context = mlchain_context.copy() - - def exec(self): - return self.func_(*self.args, **self.kwargs) - - def __call__(self): - """ - Task's process code - """ - return self.func_(*self.args, **self.kwargs) - - def __enter__(self): - mlchain_context.update(self.context) - - def __exit__(self, exc_type, exc_val, exc_tb): - pass diff --git a/mlchain/workflows_sync/__init__.py b/mlchain/workflows_sync/__init__.py index 0211dd8..28bba38 100644 --- a/mlchain/workflows_sync/__init__.py +++ b/mlchain/workflows_sync/__init__.py @@ -1,3 +1,5 @@ from .background import Background from .parallel import Parallel -from .task import Task \ No newline at end of file +from .task import Task +from .limiter import RateLimiter, AsyncRateLimiter +from .pipeline import Pipeline, Step \ No newline at end of file diff --git a/mlchain/workflows/limiter.py b/mlchain/workflows_sync/limiter.py similarity index 97% rename from mlchain/workflows/limiter.py rename to mlchain/workflows_sync/limiter.py index bd4d370..a6874f3 100644 --- a/mlchain/workflows/limiter.py +++ b/mlchain/workflows_sync/limiter.py @@ -88,7 +88,7 @@ async def __aenter__(self): expect_call_time = self.last_time + self.interval_time - self.average_call_time if expect_call_time >= time.time(): if self.callback: - asyncio.ensure_future(self.callback(until)) + asyncio.ensure_future(self.callback(expect_call_time)) sleeptime = expect_call_time - time.time() if sleeptime > 0: diff --git a/mlchain/workflows_sync/parallel.py b/mlchain/workflows_sync/parallel.py index cd99e2d..839723a 100644 --- a/mlchain/workflows_sync/parallel.py +++ b/mlchain/workflows_sync/parallel.py @@ -37,6 +37,7 @@ def __init__( max_retries: int = 0, pass_fail_job: bool = False, verbose: bool = True, + threading: bool = True, ): """ :tasks: [Task, function] items @@ -55,7 +56,7 @@ def __init__( elif max_threads == 0: max_threads = os.cpu_count() self.max_threads = max(0, max_threads) - + self.threading = threading self.max_retries = max(max_retries + 1, 1) self.pass_fail_job = pass_fail_job self.verbose = verbose diff --git a/mlchain/workflows_sync/pipeline.py b/mlchain/workflows_sync/pipeline.py new file mode 100644 index 0000000..4436981 --- /dev/null +++ b/mlchain/workflows_sync/pipeline.py @@ -0,0 +1,201 @@ + +import time +from multiprocessing.pool import ThreadPool +from .task import Task +from collections import deque +import logging +from typing import List +from mlchain.base.log import format_exc + +logger = logging.getLogger(__name__) + +class Step: + """ + Step of a Pipeline + """ + def __init__(self, func, max_thread:int=1, max_calls:int=None, interval:float=1.0): + """ + Initialize step with func and max_threads + :max_thread: Maximum thread parallel of a Step + :max_calls: Maximum call step in interval, fps = max_calls / interval + """ + self.func = func + self.max_thread = max_thread + self.max_calls = max_calls + self.interval = interval + self.index = None + self.thread_pool = ThreadPool(max(1, self.max_thread)) + + if max_calls is not None: + self.interval_time = self.period / self.max_calls + + self.accept_next_call = time.time() + + def __call__(self, input): + if self.max_calls is not None: + if time.time() < self.accept_next_call: + time.sleep(self.interval_time) + self.accept_next_call = time.time() + self.interval_time + + return self.func(input) + +class StepOutput: + """ + Output of Step Task + """ + def __init__(self, input, steps: List, fail_step_list:List, pass_fail_job:bool=False): + """ + :input: Input of the first step + :steps: list of Step + :fail_step_list: The list of failed Step + :pass_fail_job: Ignore fail step or not + """ + self.last_step_output = input + self.steps = steps + self.n_steps = len(steps) + self.current_step = -1 + self.pass_fail_job = pass_fail_job + self.fail_step_list = fail_step_list + + self.is_success = False + self.exception = None + + self.output = [] + + self.next_step() + + def exec_task(self, task: Task): + try: + output = task.exec() + except Exception as ex: + self.exception = ex + self.fail_step_list.append(self) + + if not self.pass_fail_job: + logger.error( + f"PIPELINE ERROR in {self.current_step}th step: {format_exc()}" + ) + + if self.exception is None: + self.output.append(output) + self.last_step_output = output + self.next_step() + + def next_step(self): + self.current_step += 1 + if self.current_step >= self.n_steps: + self.is_success = True + return self.output + + current_step = self.steps[self.current_step] + task = Task(current_step, self.last_step_output) + current_step.thread_pool.apply_async(self.exec_task, args=[task]) + + @property + def is_done(self): + return self.current_step >= self.n_steps and self.is_success and self.exception is None + + @property + def is_fail(self): + return self.exception is not None + +class Pipeline(object): + def __init__(self, *steps: Step): + """ + Pipeline multiple steps + """ + if len(steps) == 0: + raise ValueError('Input Pipeline should have at least one Step!') + + self.steps = steps + self.n_steps = len(self.steps) + + # When the pipeline is running loop forever, this is the way to stop it + self.running = False + + def stop(self): + """ Stop a Pipeline """ + self.running = False + + def run(self, inputs, max_processing_queue:int=1000, return_output:bool=True, loop_forever:bool=False, pass_fail_job:bool=False, progress_bar=True): + def _process_first_task_done(): + the_output = self.processing_queue.popleft() + check_having_update = True + + if return_output: + self.output_queue.append(the_output) + else: + del the_output + return check_having_update + + """ + :inputs: A list or an iterator + :max_processing_queue: Max of queue for processing task + :return_output: Return output or not + :loop_forever: Loop forever or not, for infinite inputs + :pass_fail_job: Only logging the failure job, not stop Pipeline + :progress_bar: Use tqdm to show the progress of calling Pipeline + """ + inputs = iter(inputs) + + self.running = True + self.processing_queue = deque() + self.output_queue = [] + self.progress_bar = progress_bar + self.fail_step_list = [] + + for idx, step in enumerate(self.steps): + step.index = idx + + while self.running: + check_having_update = False + # Pop done processing and add into self.output_queue + while len(self.processing_queue) > 0 and self.processing_queue[0].is_done: + check_having_update = _process_first_task_done() + + # Processing more inputs + while self.running and len(self.processing_queue) < max_processing_queue: + try: + input = next(inputs) + except StopIteration: + logger.info("Processing pipeline waiting because there's no input left") + if not loop_forever: + logger.info("Processing pipeline stop because there's no input left, if you want to loop forever, use loop_forever") + self.stop() + break + + check_having_update = True + the_step_output = StepOutput(input=input, steps = self.steps, fail_step_list=self.fail_step_list, pass_fail_job=pass_fail_job) + self.processing_queue.append(the_step_output) + + # Check is any step fail + if len(self.fail_step_list) > 0: + if not pass_fail_job: + self.stop() + logger.error("Pipeline error, stop now!") + raise self.fail_step_list[0].exception + + if not check_having_update: + logger.debug("There's no done task update") + time.sleep(0.01) + + # Check already run all then return + while len(self.processing_queue) > 0: + if self.processing_queue[0].is_done: + check_having_update = _process_first_task_done() + elif self.processing_queue[0].is_fail: + if not pass_fail_job: + self.stop() + logger.error("Pipeline error, stop now!") + raise self.fail_step_list[0].exception + else: + self.processing_queue.popleft() + else: + time.sleep(0.01) + + for step in self.steps: + step.thread_pool.close() + + logger.info("Stopped Pipeline") + return self.output_queue + \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 44b69ea..bdcdbad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,31 +1,31 @@ attrs>=20.3.0 blosc==1.10.6; sys_platform != 'win32' -h11==0.12.0 -Flask==2.1.2 +h11==0.14.0 +Flask==2.2.2 Flask-Cors==3.0.10 gunicorn==20.1.0 -gevent==21.12.0 +gevent==22.10.2 msgpack==1.0.4 numpy<1.20; python_version == '3.6' numpy<=1.21.6; python_version <= '3.7' numpy<=1.22.4; python_version > '3.7' opencv-python>=4.5.1 Pillow>=8.0.1 -starlette[full]==0.20.2 +starlette[full]==0.22.0 requests>=2.25.1 six>=1.13.0 toml>=0.10.0 -trio==0.21.0 urllib3>=1.26.2 -uvicorn[standard]==0.17.6 +uvicorn[standard]==0.20.0 uvloop==0.14.0; sys_platform != 'win32' and python_version == '3.6' -uvloop>=0.16.0; sys_platform != 'win32' and python_version >= '3.7' -httpx==0.23.0 +uvloop==0.16.0; sys_platform != 'win32' and python_version >= '3.7' +httpx==0.23.1 boto3>=1.16.43 pyyaml>=5.3.1 sentry-sdk[flask]>=1.5.12 -thefuzz -GPUtil>=1.4.0 +thefuzz==0.19.0 +python-Levenshtein==0.20.8 +GPUtil==1.4.0 tqdm -pyngrok>=5.0.1 -pynvml \ No newline at end of file +pyngrok>=5.1.0 +pynvml==11.4.1 \ No newline at end of file diff --git a/setup.py b/setup.py index 409f763..f4a002d 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ import os from setuptools import setup, find_packages -__version__ = "0.3.0" +__version__ = "0.3.1" project = "mlchain" diff --git a/tests/test_workflow.py b/tests/test_workflow.py index 58fe267..2d7cc53 100644 --- a/tests/test_workflow.py +++ b/tests/test_workflow.py @@ -4,7 +4,8 @@ import os import time -from mlchain.workflows import Parallel, Task, Background, Pipeline, Step +from mlchain.workflows import Parallel, Task, Background +from mlchain.workflows_sync import Pipeline, Step logger = logging.getLogger() @@ -117,25 +118,25 @@ async def dummy_task(n): return n+1 task = Task(dummy_task, 5) - # def test_mlchain_pipeline(self): - # def step_1(i): - # time.sleep(0.001) - # return i * 2 - - # def step_2(i): - # time.sleep(0.001) - # return i * 2 - - # def step_3(i): - # time.sleep(0.001) - # return i + 1 - - # pipeline = Pipeline( - # Step(step_1, max_thread = 1), - # Step(step_2, max_thread = 1), - # Step(step_3, max_thread = 1) - # ) - # inputs = range(20) - # results = pipeline.run(inputs) - # logger.info("Done pipeline execution. Checking the results") - # assert [x.output[-1].output for x in results] == [1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45, 49, 53, 57, 61, 65, 69, 73, 77] \ No newline at end of file + def test_mlchain_pipeline(self): + def step_1(i): + time.sleep(0.001) + return i * 2 + + def step_2(i): + time.sleep(0.001) + return i * 2 + + def step_3(i): + time.sleep(0.001) + return i + 1 + + pipeline = Pipeline( + Step(step_1, max_thread = 1), + Step(step_2, max_thread = 1), + Step(step_3, max_thread = 1) + ) + inputs = range(20) + results = pipeline.run(inputs) + logger.info("Done pipeline execution. Checking the results") + assert [x.output[-1] for x in results] == [step_3(step_2(step_1(i))) for i in range(20)] \ No newline at end of file diff --git a/tests/utils.py b/tests/utils.py index e009e69..7d60947 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,5 +1,4 @@ import logging -import multiprocessing import os from multiprocessing import Process, Queue from os import getpid, kill @@ -8,9 +7,6 @@ from time import sleep import time -if os.name != 'nt': - multiprocessing.set_start_method("fork") - logger = logging.getLogger()