Improved DFP documentation, logging and fix MonitorStage #2106

Merged
1 change: 1 addition & 0 deletions ci/release/update-version.sh
@@ -82,6 +82,7 @@ sed_runner "s|branch-${CURRENT_SHORT_TAG}|branch-${NEXT_SHORT_TAG}|g" manifest.y
 
 # Dependencies file
 sed_runner "s/mrc=${CURRENT_SHORT_TAG}/mrc=${NEXT_SHORT_TAG}/g" dependencies.yaml
+sed_runner "s/morpheus-dfp=${CURRENT_SHORT_TAG}/morpheus-dfp=${NEXT_SHORT_TAG}/g" dependencies.yaml
 
 # Generate the environment files based upon the updated dependencies.yaml
 rapids-dependency-file-generator
2 changes: 1 addition & 1 deletion dependencies.yaml
@@ -458,7 +458,7 @@ dependencies:
   common:
     - output_types: [conda]
       packages:
-        - morpheus-dfp=24.10
+        - morpheus-dfp=25.02
         - tini=0.19
         - pip:
          - --extra-index-url https://download.pytorch.org/whl/cu124
9 changes: 9 additions & 0 deletions examples/digital_fingerprinting/fetch_example_data.py
@@ -543,6 +543,8 @@ def fetch_dataset(dataset)
     fs_hndl = s3fs.S3FileSystem(anon=True)
     s3_base_path = os.path.join(S3_BASE_PATH, dataset)
 
+    download_count = 0
+
     train_dir = f"{EXAMPLE_DATA_DIR}/dfp/{dataset}-training-data/"
     if not os.path.exists(train_dir):
         os.makedirs(train_dir)
@@ -552,6 +554,7 @@ def fetch_dataset(dataset)
         if not exists(train_dir + f):
             print(f"Downloading {f}")
             fs_hndl.get_file(os.path.join(s3_base_path, f), train_dir + f)
+            download_count += 1
 
     infer_dir = f"{EXAMPLE_DATA_DIR}/dfp/{dataset}-inference-data/"
     if not exists(infer_dir):
@@ -562,6 +565,12 @@ def fetch_dataset(dataset)
         if not os.path.exists(infer_dir + f):
             print(f"Downloading {f}")
             fs_hndl.get_file(os.path.join(s3_base_path, f), infer_dir + f)
+            download_count += 1
+
+    if download_count == 0:
+        print(f"No new files to download for {dataset} dataset")
+    else:
+        print(f"Download complete for {dataset} dataset")
 
 
 def parse_args():
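For context on the change above: the script already skips files that exist locally, so the new `download_count` bookkeeping lets it tell a no-op re-run apart from a fresh download. A minimal standalone sketch of the same pattern, assuming a hypothetical bucket path and file list (the real script derives these from its dataset argument):

```python
import os

import s3fs


def fetch_files(s3_base_path: str, files: list[str], dest_dir: str) -> int:
    """Download missing files from a public S3 path; return how many were fetched."""
    fs_hndl = s3fs.S3FileSystem(anon=True)  # anonymous access, as in the example script
    os.makedirs(dest_dir, exist_ok=True)

    download_count = 0
    for fname in files:
        local_path = os.path.join(dest_dir, fname)
        if not os.path.exists(local_path):  # skip files fetched on a previous run
            print(f"Downloading {fname}")
            fs_hndl.get_file(os.path.join(s3_base_path, fname), local_path)
            download_count += 1
    return download_count


# Hypothetical usage; bucket, file name and destination are placeholders.
count = fetch_files("some-bucket/dfp/duo-training-data", ["duo_2022-08-01.json"], "./data")
print("No new files to download" if count == 0 else "Download complete")
```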
6 changes: 3 additions & 3 deletions examples/digital_fingerprinting/production/Dockerfile
@@ -16,7 +16,7 @@
 ARG BASE_IMG=nvcr.io/nvidia/cuda
 ARG BASE_IMG_TAG=12.5.1-base-ubuntu22.04
 
-FROM ${BASE_IMG}:${BASE_IMG_TAG} as base
+FROM ${BASE_IMG}:${BASE_IMG_TAG} AS base
 
 # Install necessary dependencies using apt-get
 RUN apt-get update && apt-get install -y \
@@ -55,13 +55,13 @@ ENTRYPOINT [ "/opt/conda/envs/morpheus-dfp/bin/tini", "--", "/workspace/examples
 SHELL ["/bin/bash", "-c"]
 
 # ===== Setup for running unattended =====
-FROM base as runtime
+FROM base AS runtime
 
 # Launch morpheus
 CMD ["./launch.sh"]
 
 # ===== Setup for running Jupyter =====
-FROM base as jupyter
+FROM base AS jupyter
 
 # Install the jupyter specific requirements
 RUN source activate morpheus-dfp &&\
63 changes: 26 additions & 37 deletions examples/digital_fingerprinting/production/README.md
@@ -44,7 +44,32 @@ docker compose build
 >
 > This is most likely due to using an older version of the `docker-compose` command, instead re-run the build with `docker compose`. Refer to [Migrate to Compose V2](https://docs.docker.com/compose/migrate/) for more information.
 
+### Fetch Example Data
+The `examples/digital_fingerprinting/fetch_example_data.py` script can be used to fetch the Duo and Azure logs needed to run the example pipelines.
+
+Download the data needed to run the DFP pipeline on Azure / Duo logs:
+```bash
+docker compose run fetch_data
+```
+
+
+### Running the services
+
+The Morpheus DFP pipeline can be run from either a Jupyter Notebook using the `jupyter` service or from the command line using the `morpheus_pipeline` service. The `mlflow` service is also started in the background to provide a tracking URI for the Morpheus pipeline.
+
+#### Optional MLflow Service
+Starting either the `morpheus_pipeline` or the `jupyter` service will start the `mlflow` service in the background. For debugging purposes it can be helpful to view the logs of the running MLflow service.
+
+From the `examples/digital_fingerprinting/production` dir run:
+```bash
+docker compose up mlflow
+```
+
+By default, an MLflow dashboard will be available at:
+```bash
+http://localhost:5000
+```
 
 #### Jupyter Server
 From the `examples/digital_fingerprinting/production` dir run:
 ```bash
@@ -70,7 +95,7 @@ Copy and paste the URL into a web browser. There are six notebooks included with
 
 > **Note:** The token in the URL is a one-time use token, and a new one is generated with each invocation.
 
-#### Morpheus Pipeline
+#### Morpheus Pipeline Service
 By default the `morpheus_pipeline` service will run the training pipeline for Duo data. From the `examples/digital_fingerprinting/production` dir run:
 ```bash
 docker compose up morpheus_pipeline
@@ -105,29 +130,6 @@ Both scripts are capable of running either a training or inference pipeline for
 | `--help` | | Show this message and exit. |
 
 ##### Steps to Run Example Pipeline
-The `examples/digital_fingerprinting/fetch_example_data.py` script can be used to fetch the Duo and Azure logs to run the example pipelines.
-
-```bash
-export DFP_HOME=examples/digital_fingerprinting
-```
-
-Usage of the script is as follows:
-```bash
-python $DFP_HOME/fetch_example_data.py --help
-
-usage: Fetches training and inference data for DFP examples [-h] [{azure,duo,all} [{azure,duo,all} ...]]
-
-positional arguments:
-  {azure,duo,all}  Data set to fetch
-
-optional arguments:
-  -h, --help       show this help message and exit
-```
-
-Download the data needed to run a pipeline on Azure / Duo logs:
-```bash
-python $DFP_HOME/fetch_example_data.py all
-```
 
 Run Duo Training Pipeline:
 ```bash
@@ -156,19 +158,6 @@ The commands in the previous section run stage-based example DFP pipelines. The
 
 Commands to run equivalent module-based DFP pipelines can be found [here](../../../docs/source/developer_guide/guides/10_modular_pipeline_digital_fingerprinting.md#running-example-modular-dfp-pipelines).
 
-#### Optional MLflow Service
-Starting either the `morpheus_pipeline` or the `jupyter` service, will start the `mlflow` service in the background. For debugging purposes it can be helpful to view the logs of the running MLflow service.
-
-From the `examples/digital_fingerprinting/production` dir run:
-```bash
-docker compose up mlflow
-```
-
-By default, a MLflow dashboard will be available at:
-```bash
-http://localhost:5000
-```
-
 ## Kubernetes deployment
 
 The Morpheus project also maintains Helm charts and container images for Kubernetes deployment of Morpheus and MLflow (both for serving and for the Triton plugin). These are located in the NVIDIA GPU Cloud (NGC) [public catalog](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/morpheus/collections/morpheus_).
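As a quick sanity check to accompany the relocated MLflow section: once `docker compose up mlflow` is running, the tracking server should be reachable from the host. A minimal sketch using the standard MLflow client, assuming the default port mapping shown above and an MLflow 2.x client:

```python
import mlflow
from mlflow.tracking import MlflowClient

# Dashboard URL from the README; assumes the compose file's default 5000 port mapping.
mlflow.set_tracking_uri("http://localhost:5000")

# List whatever experiments the DFP training runs have registered so far.
client = MlflowClient()
for experiment in client.search_experiments():
    print(experiment.experiment_id, experiment.name)
```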
@@ -11,7 +11,7 @@ channels:
 dependencies:
   - boto3=1.35
   - kfp
-  - morpheus-dfp=24.10
+  - morpheus-dfp=25.02
   - nodejs=18.*
   - papermill=2.4.0
   - pip
@@ -112,7 +112,7 @@ def _file_type_name_to_enum(file_type: str) -> FileTypes:
               help="The location to cache data such as S3 downloads and pre-processed data",
               )
 @click.option("--log_level",
-              default=logging.getLevelName(Config().log_level),
+              default="INFO",
               type=click.Choice(get_log_levels(), case_sensitive=False),
               callback=parse_log_level,
               help="Specify the logging level to use.")
@@ -141,9 +141,7 @@ def _file_type_name_to_enum(file_type: str) -> FileTypes:
               type=click.Choice(["AUTO", "JSON", "CSV", "PARQUET"], case_sensitive=False),
               default="JSON",
               help="Override the detected file type. Values can be 'AUTO', 'JSON', 'CSV', or 'PARQUET'.",
-              callback=lambda _,
-              __,
-              value: None if value is None else _file_type_name_to_enum(value))
+              callback=lambda _, __, value: None if value is None else _file_type_name_to_enum(value))
 @click.option('--watch_inputs',
               type=bool,
               is_flag=True,
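The collapsed lambda above is purely cosmetic: a `click` option callback is always invoked as `callback(ctx, param, value)`, and the `_`/`__` names simply discard the first two arguments. A self-contained sketch of the same pattern (the `--file_type` option and `_to_upper` helper here are illustrative, not the example's actual code):

```python
import click


def _to_upper(value: str) -> str:
    return value.upper()


@click.command()
# A click callback receives (ctx, param, value); `_` and `__` discard the
# context and parameter objects, exactly as in the diff above.
@click.option("--file_type",
              default="json",
              callback=lambda _, __, value: None if value is None else _to_upper(value))
def main(file_type: str):
    click.echo(f"file_type={file_type}")


if __name__ == "__main__":
    main()
```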
@@ -113,7 +113,7 @@ def _file_type_name_to_enum(file_type: str) -> FileTypes:
               help="The location to cache data such as S3 downloads and pre-processed data",
               )
 @click.option("--log_level",
-              default=logging.getLevelName(Config().log_level),
+              default="INFO",
               type=click.Choice(get_log_levels(), case_sensitive=False),
               callback=parse_log_level,
               help="Specify the logging level to use.")
@@ -142,9 +142,7 @@ def _file_type_name_to_enum(file_type: str) -> FileTypes:
               type=click.Choice(["AUTO", "JSON", "CSV", "PARQUET"], case_sensitive=False),
               default="JSON",
               help="Override the detected file type. Values can be 'AUTO', 'JSON', 'CSV', or 'PARQUET'.",
-              callback=lambda _,
-              __,
-              value: None if value is None else _file_type_name_to_enum(value))
+              callback=lambda _, __, value: None if value is None else _file_type_name_to_enum(value))
 @click.option('--watch_inputs',
               type=bool,
               is_flag=True,
@@ -81,7 +81,7 @@
               help="The location to cache data such as S3 downloads and pre-processed data",
               )
 @click.option("--log_level",
-              default=logging.getLevelName(Config().log_level),
+              default="INFO",
               type=click.Choice(get_log_levels(), case_sensitive=False),
               callback=parse_log_level,
               help="Specify the logging level to use.")
@@ -82,7 +82,7 @@
               help="The location to cache data such as S3 downloads and pre-processed data",
               )
 @click.option("--log_level",
-              default=logging.getLevelName(Config().log_level),
+              default="INFO",
               type=click.Choice(get_log_levels(), case_sensitive=False),
               callback=parse_log_level,
               help="Specify the logging level to use.")
18 changes: 18 additions & 0 deletions examples/digital_fingerprinting/production/docker-compose.yml
@@ -127,6 +127,24 @@ services:
     cap_add:
       - sys_nice
 
+  fetch_data:
+    build:
+      context: ./
+      dockerfile: ./Dockerfile
+      target: runtime
+    image: dfp_morpheus
+    networks:
+      - frontend
+    environment:
+      # Colorize the terminal in the container if possible
+      TERM: "${TERM:-}"
+    working_dir: /workspace/examples/digital_fingerprinting
+    command: ./fetch_example_data.py all
+    volumes:
+      - ../../..:/workspace
+    cap_add:
+      - sys_nice
+
   grafana:
     image: grafana/grafana:10.0.0
     environment:
6 changes: 2 additions & 4 deletions examples/digital_fingerprinting/production/grafana/run.py
@@ -114,7 +114,7 @@ def _file_type_name_to_enum(file_type: str) -> FileTypes:
               help="The location to cache data such as S3 downloads and pre-processed data",
               )
 @click.option("--log_level",
-              default=logging.getLevelName(Config().log_level),
+              default="INFO",
               type=click.Choice(get_log_levels(), case_sensitive=False),
               callback=parse_log_level,
               help="Specify the logging level to use.")
@@ -143,9 +143,7 @@ def _file_type_name_to_enum(file_type: str) -> FileTypes:
               type=click.Choice(["AUTO", "JSON", "CSV", "PARQUET"], case_sensitive=False),
               default="JSON",
               help="Override the detected file type. Values can be 'AUTO', 'JSON', 'CSV', or 'PARQUET'.",
-              callback=lambda _,
-              __,
-              value: None if value is None else _file_type_name_to_enum(value))
+              callback=lambda _, __, value: None if value is None else _file_type_name_to_enum(value))
 @click.option('--watch_inputs',
               type=bool,
               is_flag=True,
13 changes: 7 additions & 6 deletions python/morpheus/morpheus/stages/general/monitor_stage.py
@@ -26,6 +26,7 @@
 from morpheus.config import Config
 from morpheus.controllers.monitor_controller import MonitorController
 from morpheus.messages import ControlMessage
+from morpheus.messages import MessageMeta
 from morpheus.pipeline.execution_mode_mixins import GpuAndCpuMixin
 from morpheus.pipeline.pass_thru_type_mixin import PassThruTypeMixin
 from morpheus.pipeline.single_port_stage import SinglePortStage
@@ -128,7 +129,10 @@ async def join(self):
         self._mc.progress.close()
 
     def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
-        if self._build_cpp_node():
+        if not self._mc.is_enabled():
+            return input_node
+
+        if self._build_cpp_node() and self._schema.input_type in (ControlMessage, MessageMeta):
             if self._schema.input_type == ControlMessage:
                 node = _stages.MonitorControlMessageStage(builder,
                                                           self.unique_name,
@@ -137,7 +141,6 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) ->
                                                           self._mc._text_color,
                                                           self._mc._font_style,
                                                           self._mc._determine_count_fn)
-                node.launch_options.pe_count = self._config.num_threads
             else:
                 node = _stages.MonitorMessageMetaStage(builder,
                                                        self.unique_name,
@@ -146,12 +149,10 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) ->
                                                        self._mc._text_color,
                                                        self._mc._font_style,
                                                        self._mc._determine_count_fn)
-                node.launch_options.pe_count = self._config.num_threads
 
-        else:
-            if not self._mc.is_enabled():
-                return input_node
+            node.launch_options.pe_count = self._config.num_threads
 
+        else:
             # Use a component so we track progress using the upstream progress engine. This will provide more accurate
             # results
             node = builder.make_node_component(self.unique_name,
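Net effect of the `MonitorStage` fix above: the disabled check now short-circuits before any node is built (previously it only applied on the non-C++ path), the C++ monitor node is only used for the two input types it actually supports (`ControlMessage` and `MessageMeta`), and the duplicated `pe_count` assignment is hoisted out of both branches. A minimal sketch of how the stage is typically attached to a pipeline, with an illustrative in-memory source; stage names follow the documented Morpheus API:

```python
import cudf

from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.general.monitor_stage import MonitorStage
from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage

config = Config()

pipeline = LinearPipeline(config)
pipeline.set_source(InMemorySourceStage(config, [cudf.DataFrame({"v": range(10)})]))

# MonitorStage is pass-through; with this PR it returns its input node unchanged
# whenever its controller is disabled, instead of building an idle monitor node.
pipeline.add_stage(MonitorStage(config, description="Messages"))

pipeline.run()
```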