diff --git a/osism/commands/reconciler.py b/osism/commands/reconciler.py index 826edb70..e4923152 100644 --- a/osism/commands/reconciler.py +++ b/osism/commands/reconciler.py @@ -1,11 +1,13 @@ # SPDX-License-Identifier: Apache-2.0 import subprocess +import time from cliff.command import Command from loguru import logger from osism.tasks import reconciler +from osism.utils import redis class Run(Command): @@ -30,12 +32,51 @@ def get_parser(self, prog_name): help="Do not wait until the sync has been completed", action="store_true", ) + parser.add_argument( + "--task-timeout", + default=3600, + type=int, + help="Timeout for a scheduled task that has not been executed yet", + ) return parser def take_action(self, parsed_args): wait = not parsed_args.no_wait + task_timeout = parsed_args.task_timeout - task = reconciler.run.delay() + t = reconciler.run.delay(publish=wait) if wait: - logger.info(f"Task {task.task_id} is running. Wait. No more output.") - task.wait(timeout=None, interval=0.5) + logger.info( + f"Task {t.task_id} is running in background. Output coming soon." + ) + rc = 0 + stoptime = time.time() + task_timeout + last_id = 0 + while time.time() < stoptime: + data = redis.xread( + {str(t.task_id): last_id}, count=1, block=(300 * 1000) + ) + if data: + stoptime = time.time() + task_timeout + messages = data[0] + for message_id, message in messages[1]: + last_id = message_id.decode() + message_type = message[b"type"].decode() + message_content = message[b"content"].decode() + + logger.debug( + f"Processing message {last_id} of type {message_type}" + ) + redis.xdel(str(t.task_id), last_id) + + if message_type == "stdout": + print(message_content, end="") + elif message_type == "rc": + rc = int(message_content) + elif message_type == "action" and message_content == "quit": + redis.close() + return rc + else: + logger.info( + f"Task {t.task_id} is running in background. No more output." + ) diff --git a/osism/tasks/reconciler.py b/osism/tasks/reconciler.py index 38a917d1..10aaaba1 100644 --- a/osism/tasks/reconciler.py +++ b/osism/tasks/reconciler.py @@ -50,7 +50,7 @@ def setup_periodic_tasks(sender, **kwargs): @app.task(bind=True, name="osism.tasks.reconciler.run") -def run(self): +def run(self, publish=True): lock = Redlock( key="lock_osism_tasks_reconciler_run", masters={redis}, auto_release_time=60 ) @@ -59,7 +59,16 @@ def run(self): p = subprocess.Popen( "/run.sh", shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) - p.wait() + + for line in io.TextIOWrapper(p.stdout, encoding="utf-8"): + if publish: + redis.xadd(self.request.id, {"type": "stdout", "content": line}) + + rc = p.wait(timeout=60) + + if publish: + redis.xadd(self.request.id, {"type": "rc", "content": rc}) + redis.xadd(self.request.id, {"type": "action", "content": "quit"}) lock.release() diff --git a/releasenotes/notes/reconciler-logs-a1e260ae5d637709.yaml b/releasenotes/notes/reconciler-logs-a1e260ae5d637709.yaml new file mode 100644 index 00000000..886d3670 --- /dev/null +++ b/releasenotes/notes/reconciler-logs-a1e260ae5d637709.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Logs from the reconciler are now returned. This makes it easier + to identify errors in the inventory from a configuration repository.