Skip to content

Commit

Permalink
Merge pull request #13 from mraspaud/feature-selector
Browse files Browse the repository at this point in the history
Add selector feature
  • Loading branch information
mraspaud authored May 17, 2024
2 parents f15af05 + ba3b20b commit 0a4c946
Show file tree
Hide file tree
Showing 10 changed files with 522 additions and 8 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
sudo apt install -y redis-server
python -m pip install --upgrade pip
python -m pip install ruff pytest pytest-cov freezegun responses
python -m pip install git+https://github.com/gorakhargosh/watchdog
python -m pip install -e .[local,minio,publishing,ssh,dataspace,datastore,dhus]
python -m pip install -e .[local,minio,publishing,ssh,dataspace,datastore,dhus,selector]
- name: Lint with ruff
run: |
ruff check .
Expand Down
10 changes: 8 additions & 2 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration

extensions = ["sphinx.ext.napoleon", "sphinx.ext.autodoc"]
extensions = ["sphinx.ext.napoleon", "sphinx.ext.autodoc", "sphinx.ext.intersphinx"]
autodoc_mock_imports = ["watchdog", "minio", "posttroll", "pytest", "trollsift", "universal_path",
"freezegun", "responses", "oauthlib", "requests_oauthlib", "defusedxml"]
"freezegun", "responses", "oauthlib", "requests_oauthlib", "defusedxml", "redis"]

templates_path = ["_templates"]
exclude_patterns = []
Expand All @@ -29,3 +29,9 @@

html_theme = "alabaster"
html_static_path = ["_static"]

# intersphinx
intersphinx_mapping = {
"posttroll": ("https://posttroll.readthedocs.io/en/latest/", None),
"redis": ("https://redis.readthedocs.io/en/latest/", None),
}
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Welcome to pytroll-watchers's documentation!
published
backends
other_api
selector

Pytroll-watcher is a library and command-line tool to detect changes on a local or remote file system.

Expand Down
9 changes: 9 additions & 0 deletions docs/source/selector.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
Selector
--------

.. warning::
For this module and script to work properly, redis-server must be installed! It is available for most linux
distributions, or in conda-forge.

.. automodule:: pytroll_watchers.selector
:members:
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ classifiers = [

[project.scripts]
pytroll-watcher = "pytroll_watchers.main_interface:cli"
pytroll-selector = "pytroll_watchers.selector:cli"

[project.entry-points."pytroll_watchers.backends"]
local = "pytroll_watchers.local_watcher"
Expand All @@ -39,6 +40,7 @@ ssh = ["paramiko"]
dataspace = ["oauthlib", "requests_oauthlib", "s3fs"]
datastore = ["oauthlib", "requests_oauthlib"]
dhus = ["defusedxml"]
selector = ["redis"]

[build-system]
requires = ["hatchling", "hatch-vcs"]
Expand Down
5 changes: 4 additions & 1 deletion src/pytroll_watchers/local_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
from urllib.parse import urlunparse

from upath import UPath
from upath._flavour import WrappedFileSystemFlavour

from pytroll_watchers.backends.local import listen_to_local_events
from pytroll_watchers.publisher import SecurityError, file_publisher_from_generator, parse_metadata

# This is a workaround for a bug in universal_pathlib, see
WrappedFileSystemFlavour.protocol_config["netloc_is_anchor"].add("ssh")
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -72,7 +75,7 @@ def file_generator(directory, observer_type="os", file_pattern=None, protocol=No
except ValueError:
continue
if protocol is not None:
uri = urlunparse((protocol, None, path, None, None, None))
uri = urlunparse((protocol, None, str(path), None, None, None))
yield UPath(uri, **storage_options), file_metadata
else:
yield Path(path), file_metadata
238 changes: 238 additions & 0 deletions src/pytroll_watchers/selector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
"""Functions and classes for performing message selection.
Selection in this context means making sure only one message referring to some file will be published further.
This is useful when multiple sources for the same data are sending messages (eg two reception servers for eumetcast) but
only one of each file is needed for further processing.
To check if two messages refer to the same data, the *uid* metadata of the messages is used.
At the moment, this module makes use of redis as a refined dictionary for keeping track of the received files. Hence a
redis server instance will be started along with some of the functions here, and with the cli.
A command-line script is also made available by this module. It is called ``pytroll-selector``::
usage: pytroll-selector [-h] [-l LOG_CONFIG] config
Selects unique messages (based on uid) from multiple sources.
positional arguments:
config The yaml config file.
options:
-h, --help show this help message and exit
-l LOG_CONFIG, --log-config LOG_CONFIG
The yaml config file for logging.
Thanks for using pytroll-selector!
An example config file to use with this script is the following::
selector_config:
ttl: 30
publisher_config:
name: hrit_selector
subscriber_config:
addresses:
- tcp://eumetcast_reception_1:9999
- tcp://eumetcast_reception_2:9999
nameserver: false
topics:
- /1b/hrit-segment/0deg
The different sections are passed straight on to :py:func:`run_selector`, so check it to have more information about
what to pass to it.
"""

import argparse
import logging
import time
from contextlib import closing, contextmanager
from functools import cache
from pathlib import Path
from subprocess import Popen

import redis
import yaml
from posttroll.publisher import create_publisher_from_dict_config
from posttroll.subscriber import create_subscriber_from_dict_config

from pytroll_watchers.main_interface import configure_logging

logger = logging.getLogger(__name__)

@cache
def _connect_to_redis(**kwargs):
    """Create a redis client from *kwargs*.

    The function is memoized with :py:func:`functools.cache`, so calling it again with the same keyword
    arguments returns the client created the first time. As a consequence, all argument values have to be hashable.
    """
    client = redis.Redis(**kwargs)
    return client


class TTLDict:
    """A minimal mapping-like store, backed by redis, whose items expire after a time-to-live.

    Only item access, item assignment and membership tests are supported. Assigning to a key that already holds a
    (truthy) value is a no-op: the stored value is kept and its expiry is not refreshed.

    Not thread-safe.

    Args:
        ttl: the time to live of the stored items in integer seconds or as a timedelta instance. Cannot be less
            than 1 second.
        redis_params: the keyword arguments to pass to the underlying :py:class:`~redis.Redis` instance.
    """

    def __init__(self, ttl=300, **redis_params):
        """Store the time-to-live and get hold of the redis client.

        Args:
            ttl: the time to live of the stored items in integer seconds or as a timedelta instance. Cannot be less
                than 1 second.
            redis_params: the keyword arguments to pass to the underlying :py:class:`~redis.Redis` instance.
        """
        self._ttl = ttl
        self._redis = _connect_to_redis(**redis_params)

    def __getitem__(self, key):
        """Return the value stored under *key*, delegating the lookup to the redis client."""
        value = self._redis[key]
        return value

    def __setitem__(self, key, value):
        """Store *value* under *key* with the configured expiry, unless *key* already holds a value."""
        if self._redis.get(key):
            # First writer wins: keep the existing value and leave its expiry untouched.
            return
        self._redis.set(key, value, ex=self._ttl)

    def __contains__(self, key):
        """Tell whether *key* is currently stored, ie whether looking it up does not raise a KeyError."""
        try:
            self[key]
        except KeyError:
            return False
        return True


def running_selector(selector_config, subscriber_config):
    """Generate the messages that have not been seen before.

    Every incoming message is identified through :py:func:`unique_key`. The first message carrying a given key is
    remembered (for the duration of the ttl) and yielded; later messages with the same key are logged at debug level
    and dropped. This way, messages referring to the same file (even if stored in different locations) are only
    passed on once.

    Args:
        selector_config: a dictionary of arguments to pass to the underlying redis instance, see
            :py:class:`~redis.Redis`. You can also provide a ttl as an int
            (seconds) or timedelta instance, otherwise it defaults to 300 seconds (5 minutes).
        subscriber_config: a dictionary of arguments to pass to
            :py:func:`~posttroll.subscriber.create_subscriber_from_dict_config`.

    Yields:
        The string form (``str(msg)``) of each selected posttroll message.
    """
    seen = TTLDict(**selector_config)

    for message in _data_messages(subscriber_config):
        uid = unique_key(message)
        msg_string = str(message)

        if uid in seen:
            logger.debug(f"Discarded {msg_string}")
            continue

        seen[uid] = msg_string
        logger.info(f"New content {msg_string}")
        yield msg_string


def _data_messages(subscriber_config):
    """Yield the received messages whose type is "file".

    A subscriber is created from *subscriber_config* and is closed when the generator is exhausted or closed.
    Messages of any other type are silently skipped.
    """
    with closing(create_subscriber_from_dict_config(subscriber_config)) as subscriber:
        for message in subscriber.recv():
            if message.type == "file":
                yield message


def unique_key(msg):
    """Return the key identifying the content of *msg*, namely the ``uid`` item of its data."""
    payload = msg.data
    return payload["uid"]


def _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config):
    """Publish every message produced by :py:func:`running_selector`.

    A publisher is created from *publisher_config* and started, then each selected message is sent through it.
    The publisher is closed when the selection stops or fails. Starting the backing server is not done here.
    """
    pub = create_publisher_from_dict_config(publisher_config)
    pub.start()
    with closing(pub):
        selected = running_selector(selector_config, subscriber_config)
        for message in selected:
            pub.send(message)


def run_selector(selector_config, subscriber_config, publisher_config):
    """Run the selector.

    The aim of the selector is to skip messages that are duplicates to already published messages.
    Duplicate in this context means messages referring to the same file (even if stored in different locations).
    Messages that refer to new files will be published.

    A redis server is started for the duration of the call and stopped afterwards.

    Args:
        selector_config: a dictionary of arguments to pass to the underlying redis instance, see
            :py:class:`~redis.Redis`. You can also provide a *ttl* for the
            selector as an int (seconds) or timedelta instance, so that incoming messages are forgotten after that
            time. If not provided, the ttl defaults to 300 seconds (5 minutes).
            Also, you can provide a *directory* for the underlying datastructure to store the data in.
            The dictionary passed in is left unmodified.
        subscriber_config: a dictionary of arguments to pass to
            :py:func:`~posttroll.subscriber.create_subscriber_from_dict_config`. The subscription is used as a source
            for messages to process.
        publisher_config: a dictionary of arguments to pass to
            :py:func:`~posttroll.publisher.create_publisher_from_dict_config`. This publisher will send the selected
            messages.
    """
    # Work on a shallow copy: *directory* is only meant for the server and has to be removed before the rest is
    # handed over to the redis client, but the caller's dictionary should not lose it as a side effect.
    selector_config = dict(selector_config)
    directory = selector_config.pop("directory", None)
    # *port* is deliberately left in the config as both the server and the client need it.
    with _started_redis_server(port=selector_config.get("port"), directory=directory):
        _run_selector_with_managed_dict_server(selector_config, subscriber_config, publisher_config)


@contextmanager
def _started_redis_server(port=None, directory=None):
command = ["redis-server"]
if port:
port = str(int(port)) # using int first here prevents arbitrary strings to be passed to Popen
command += ["--port", port]
if directory:
directory = Path(directory)
directory.mkdir(parents=True, exist_ok=True)
command += ["--dir", directory]
proc = Popen(command) # noqa:S603 port is validated and directory is a Path
time.sleep(.25)
try:
yield
finally:
proc.terminate()
proc.wait(3)


def cli(args=None):
    """Run the selector from the command line.

    The yaml config file given on the command line is read, and its ``selector_config``, ``subscriber_config`` and
    ``publisher_config`` sections (each defaulting to an empty dictionary) are handed over to
    :py:func:`run_selector`. Logging is configured first, from the optional logging config file.

    Args:
        args: the list of command-line arguments to parse. If None, argparse falls back to ``sys.argv``.

    Returns:
        Whatever :py:func:`run_selector` returns.
    """
    parser = argparse.ArgumentParser(
        prog="pytroll-selector",
        description="Selects unique messages (based on uid) from multiple sources.",
        epilog="Thanks for using pytroll-selector!",
    )
    parser.add_argument("config", type=str, help="The yaml config file.")
    parser.add_argument("-l", "--log-config", type=str, help="The yaml config file for logging.", default=None)
    options = parser.parse_args(args)

    configure_logging(options.log_config)

    with open(options.config) as config_stream:
        config = yaml.safe_load(config_stream.read())

    section_names = ("selector_config", "subscriber_config", "publisher_config")
    sections = [config.get(name, {}) for name in section_names]
    return run_selector(*sections)
8 changes: 4 additions & 4 deletions tests/test_local_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ def test_publish_paths(tmp_path, patched_local_events, caplog): # noqa
with patched_local_events([filename]):
with patched_publisher() as messages:
local_watcher.file_publisher(fs_config=local_settings,
publisher_config=publisher_settings,
message_config=message_settings)
publisher_config=publisher_settings,
message_config=message_settings)

assert "uri" not in message_settings["data"]
assert len(messages) == 1
Expand All @@ -115,5 +115,5 @@ def test_publish_paths_forbids_passing_password(tmp_path, patched_local_events,
with patched_publisher():
with pytest.raises(SecurityError):
local_watcher.file_publisher(fs_config=local_settings,
publisher_config=publisher_settings,
message_config=message_settings)
publisher_config=publisher_settings,
message_config=message_settings)
Loading

0 comments on commit 0a4c946

Please sign in to comment.