Skip to content

Commit

Permalink
Add a command-line script
Browse files Browse the repository at this point in the history
  • Loading branch information
mraspaud committed May 14, 2024
1 parent 286cb9f commit 31bb7d6
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 19 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ classifiers = [

[project.scripts]
pytroll-watcher = "pytroll_watchers.main_interface:cli"
pytroll-selector = "pytroll_watchers.selector:cli"

[project.entry-points."pytroll_watchers.backends"]
local = "pytroll_watchers.local_watcher"
Expand Down
98 changes: 87 additions & 11 deletions src/pytroll_watchers/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,61 @@
This is useful when multiple source for the same data are sending messages (eg two reception servers for eumetcast) but
only one of each file is needed for further processing.
At the moment, this module makes use of redis as a refined dictionary for keeping track of the received files.
At the moment, this module makes use of redis as a refined dictionary for keeping track of the received files. Hence a
redis server instance will be started along with some of the functions here, and with the cli.
A command-line script is also made available by this module. It is called ``pytroll-selector``::
usage: pytroll-selector [-h] [-l LOG_CONFIG] config
Selects unique messages from multiple sources.
positional arguments:
config The yaml config file.
options:
-h, --help show this help message and exit
-l LOG_CONFIG, --log-config LOG_CONFIG
The yaml config file for logging.
Thanks for using pytroll-selector!
An example config file to use with this script is the following::
selector_config:
ttl: 30
publisher_config:
name: hrit_selector
subscriber_config:
addresses:
- tcp://eumetcast_reception_1:9999
- tcp://eumetcast_reception_2:9999
nameserver: false
topics:
- /1b/hrit-segment/0deg
The different sections are passed straight on to :py:func:`run_selector`, so check it to have more information about
what to pass to it.
"""

import argparse
import logging
import time
from contextlib import closing, contextmanager
from functools import cache
from pathlib import Path
from subprocess import Popen

import redis
from posttroll.message import Message
import yaml
from posttroll.publisher import create_publisher_from_dict_config
from posttroll.subscriber import create_subscriber_from_dict_config

from pytroll_watchers.main_interface import configure_logging

logger = logging.getLogger(__name__)

@cache
def _connect_to_redis(**kwargs):
Expand All @@ -30,6 +70,12 @@ class TTLDict:
"""A simple dictionary-like object that discards items older than a time-to-live.
Not thread-safe.
Args:
ttl: the time to live of the stored items in integer seconds or as a timedelta instance. Cannot be less
than 1 second.
redis_params: the keyword arguments to pass to the underlying :py:class:`~redis.Redis` instance.
"""

def __init__(self, ttl=300, **redis_params):
Expand Down Expand Up @@ -62,7 +108,7 @@ def running_selector(selector_config, subscriber_config):
Args:
selector_config: a dictionary of arguments to pass to the underlying redis instance, see
https://redis.readthedocs.io/en/stable/connections.html#redis.Redis. You can also provide a ttl as an int
:py:class:`~redis.Redis`. You can also provide a ttl as an int
(seconds) or timedelta instance.
subscriber_config: a dictionary of arguments to pass to
:py:func:`~posttroll.subscriber.create_subscriber_from_dict_config`.
Expand All @@ -75,12 +121,15 @@ def running_selector(selector_config, subscriber_config):
with closing(subscriber):
sel = TTLDict(**selector_config)
for msg in subscriber.recv():
key = Message.decode(msg).data["uid"]
key = msg.data["uid"]
try:
_ = sel[key]
logger.info(f"Discarded {str(msg)}")
except KeyError:
sel[key] = msg
yield msg
msg_string = str(msg)
sel[key] = msg_string
logger.info(f"New content {str(msg)}")
yield msg_string


def _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config):
Expand All @@ -95,17 +144,16 @@ def _run_selector_with_managed_dict_server(selector_config, subscriber_config, p
def run_selector(selector_config, subscriber_config, publisher_config):
"""Run the selector.
The aim of the selector is to skip messages that refer to already processed files. For example
The aim of the selector is to skip messages that are duplicates to already published messages.
Duplicate in this context means messages referring to the same file (even if stored in different locations).
Messages that refer to new files will be published.
Args:
selector_config: a dictionary of arguments to pass to the underlying redis instance, see
https://redis.readthedocs.io/en/stable/connections.html#redis.Redis. You can also provide a ttl as an int
(seconds) or timedelta instance.
:py:class:`~redis.Redis`. You can also provide a *ttl* for the
selector as an int (seconds) or timedelta instance, so that incoming messages are forgotten after that time.
Also, you can provide a *directory* for the underlying datastructure to store the data in.
subscriber_config: a dictionary of arguments to pass to
:py:func:`~posttroll.subscriber.create_subscriber_from_dict_config`. The subscribtion is used as a source for
messages to process.
Expand All @@ -114,7 +162,7 @@ def run_selector(selector_config, subscriber_config, publisher_config):
messages.
"""
with _running_redis_server(port=selector_config.get("port")):
with _running_redis_server(port=selector_config.get("port"), directory=selector_config.pop("directory", None)):
_run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config)


Expand All @@ -135,3 +183,31 @@ def _running_redis_server(port=None, directory=None):
finally:
proc.terminate()
proc.wait(3)


def cli(args=None):
"""Command line interface."""
parser = argparse.ArgumentParser(
prog="pytroll-selector",
description="Selects unique messages from multiple sources.",
epilog="Thanks for using pytroll-selector!")

parser.add_argument("config", type=str, help="The yaml config file.")
parser.add_argument("-l", "--log-config", type=str, help="The yaml config file for logging.", default=None)

parsed = parser.parse_args(args)


log_config_filename = parsed.log_config
configure_logging(log_config_filename)

config_file = parsed.config

with open(config_file) as fd:
config_dict = yaml.safe_load(fd.read())

selector_config = config_dict.get("selector_config", {})
subscriber_config = config_dict.get("subscriber_config", {})
publisher_config = config_dict.get("publisher_config", {})

return run_selector(selector_config, subscriber_config, publisher_config)
8 changes: 4 additions & 4 deletions tests/test_local_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ def test_publish_paths(tmp_path, patched_local_events, caplog): # noqa
with patched_local_events([filename]):
with patched_publisher() as messages:
local_watcher.file_publisher(fs_config=local_settings,
publisher_config=publisher_settings,
message_config=message_settings)
publisher_config=publisher_settings,
message_config=message_settings)

assert "uri" not in message_settings["data"]
assert len(messages) == 1
Expand All @@ -115,5 +115,5 @@ def test_publish_paths_forbids_passing_password(tmp_path, patched_local_events,
with patched_publisher():
with pytest.raises(SecurityError):
local_watcher.file_publisher(fs_config=local_settings,
publisher_config=publisher_settings,
message_config=message_settings)
publisher_config=publisher_settings,
message_config=message_settings)
52 changes: 48 additions & 4 deletions tests/test_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
import time

import pytest
import yaml
from posttroll.message import Message
from posttroll.testing import patched_publisher, patched_subscriber_recv
from pytroll_watchers.selector import (
TTLDict,
_run_selector_with_managed_dict_server,
_running_redis_server,
cli,
run_selector,
)

Expand Down Expand Up @@ -48,7 +51,7 @@ def test_run_selector_that_starts_redis_on_given_port(tmp_path):
f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", '
'"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}')

messages = [msg1]
messages = [Message.decode(msg1)]

pipe_in_address = "ipc://" + str(tmp_path / "in.ipc")
pipe_out_address = "ipc://" + str(tmp_path / "out.ipc")
Expand All @@ -67,7 +70,6 @@ def test_run_selector_that_starts_redis_on_given_port(tmp_path):
assert len(published_messages) == 1



@pytest.fixture(scope="module")
def _redis_server():
"""Start a redis server."""
Expand All @@ -84,7 +86,7 @@ def create_data_file(path):


@pytest.mark.usefixtures("_redis_server")
def test_run_selector_on_single_file_messages(tmp_path):
def test_run_selector_on_single_file_messages(tmp_path, caplog):
"""Test running the selector on single file messages."""
uid = "IVCDB_j02_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5"
sdr_file = tmp_path / "sdr" / uid
Expand All @@ -110,7 +112,7 @@ def test_run_selector_on_single_file_messages(tmp_path):
f'"uid": "{uid2}", "uri": "ssh://someplace.pytroll.org:/{str(sdr_file2)}", "path": "{str(sdr_file2)}", '
'"filesystem": {"cls": "fsspec.implementations.ssh.SFTPFileSystem", "protocol": "ssh", "args": []}}')

messages = [msg1, msg2, msg3]
messages = [Message.decode(msg1), Message.decode(msg2), Message.decode(msg3)]

pipe_in_address = "ipc://" + str(tmp_path / "in.ipc")
pipe_out_address = "ipc://" + str(tmp_path / "out.ipc")
Expand All @@ -123,12 +125,17 @@ def test_run_selector_on_single_file_messages(tmp_path):

selector_config = dict(ttl=1, host="localhost", port=6379)

caplog.set_level("INFO")
with patched_subscriber_recv(messages):
with patched_publisher() as published_messages:
_run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config)
assert len(published_messages) == 2
assert published_messages[0] == msg1
assert published_messages[1] == msg3
assert "New content " + str(msg1) in caplog.text
assert "Discarded " + str(msg2) in caplog.text
assert "New content " + str(msg3) in caplog.text



@pytest.mark.usefixtures("_redis_server")
Expand All @@ -148,3 +155,40 @@ def test_ttldict():
time.sleep(ttl+1)
sel[key] = other_value
assert sel[key] == other_value


def test_cli(tmp_path):
"""Test the command-line interface."""
uid = "IVCDB_j03_d20240419_t1114110_e1115356_b07465_c20240419113435035578_cspp_dev.h5"
sdr_file = tmp_path / "sdr" / uid
create_data_file(sdr_file)

msg1 = ('pytroll://segment/viirs/l1b/ info [email protected] 2024-04-19T11:35:00.487388 v1.01 '
'application/json {"sensor": "viirs", '
f'"uid": "{uid}", "uri": "file://{str(sdr_file)}", "path": "{str(sdr_file)}", '
'"filesystem": {"cls": "fsspec.implementations.local.LocalFileSystem", "protocol": "file", "args": []}}')

messages = [Message.decode(msg1)]

pipe_in_address = "ipc://" + str(tmp_path / "in.ipc")
pipe_out_address = "ipc://" + str(tmp_path / "out.ipc")
subscriber_config = dict(addresses=[pipe_in_address],
nameserver=False,
port=3000)

publisher_config = dict(address=pipe_out_address,
nameservers=False)

redis_dir = tmp_path / "redis_dir"

selector_config = dict(ttl=1, port=6389, directory=str(redis_dir))
config = dict(publisher_config=publisher_config,
subscriber_config=subscriber_config,
selector_config=selector_config)
config_file = tmp_path / "selector_config"
with open(config_file, "w") as fd:
fd.write(yaml.dump(config))
with patched_subscriber_recv(messages):
with patched_publisher() as published_messages:
cli([str(config_file)])
assert len(published_messages) == 1

0 comments on commit 31bb7d6

Please sign in to comment.