Skip to content

Commit

Permalink
clean up graph_components.py
Browse files Browse the repository at this point in the history
  • Loading branch information
stevebachmeier committed Jan 7, 2025
1 parent 3e488af commit eb85ea1
Showing 1 changed file with 62 additions and 75 deletions.
137 changes: 62 additions & 75 deletions src/easylink/graph_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,60 +26,60 @@
class InputSlot:
"""An abstraction representing a single input slot to a specific :class:`~easylink.step.Step`.
In order to pass data between :class:`Steps<easylink.step.Step>`, an InputSlot
of one Step can be connected to an :class:`OutputSlot` of another Step via an :class:`EdgeParams`
In order to pass data between ``Steps``, an ``InputSlot`` of one ``Step`` can
be connected to an :class:`OutputSlot` of another ``Step`` via an :class:`EdgeParams`
instance.
"""

name: str
"""The name of the input slot."""
"""The name of the ``InputSlot``."""
env_var: str | None
"""The environment variable that this input slot will use to pass a list of data filepaths
to an Implementation."""
"""The environment variable that this ``InputSlot`` will use to pass a list
of data filepaths to an :class:`~easylink.implementation.Implementation`."""
validator: Callable[[str], None]
"""A callable that validates the input data being passed into the pipeline
via this input slot. If the data is invalid, the callable should raise an exception
"""A callable that validates the input data being passed into the pipeline via
this ``InputSlot``. If the data is invalid, the callable should raise an exception
with a descriptive error message which will then be reported to the user."""


@dataclass(frozen=True)
class OutputSlot:
"""An abstraction representing a single output slot from a specific :class:`~easylink.step.Step`.
In order to pass data between :class:`Steps<easylink.step.Step>`, an OutputSlot
of one Step can be connected to an :class:`InputSlot` of another Step via an :class:`EdgeParams`
In order to pass data between ``Steps``, an ``OutputSlot`` of one ``Step`` can
be connected to an :class:`InputSlot` of another ``Step`` via an :class:`EdgeParams`
instance.
Notes
-----
Input data is validated via the :class:`InputSlot's<InputSlot>` required
:attr:`~InputSlot.validator` attribute. In order to prevent multiple
validations of the same files (since outputs of one :class:`~easylink.step.Step`
can be inputs to another), no such validator is stored here on the OutputSlot.
Input data is validated via the ``InputSlot``'s required :attr:`~InputSlot.validator`
attribute. In order to prevent multiple validations of the same files (since
outputs of one ``Step`` can be inputs to another), no such validator is stored
here on the ``OutputSlot``.
"""

name: str
"""The name of the output slot."""
"""The name of the ``OutputSlot``."""


@dataclass(frozen=True)
class EdgeParams:
"""A representation of an edge between two nodes (:class:`Steps<easylink.step.Step>`) in a graph.
EdgeParams connect the :class:`OutputSlot` of a source Step to the :class:`InputSlot`
of a target Step.
``EdgeParams`` connect the :class:`OutputSlot` of a source ``Step`` to the
:class:`InputSlot` of a target ``Step``.
"""

source_node: str
"""The name of the source node."""
"""The name of the source node/``Step``."""
target_node: str
"""The name of the target node."""
"""The name of the target node/``Step``."""
output_slot: str
"""The name of the :class:`OutputSlot` of the source node."""
"""The name of the ``OutputSlot`` of the source node/``Step``."""
input_slot: str
"""The name of the :class:`InputSlot` of the target node."""
"""The name of the ``InputSlot`` of the target node/``Step``."""
filepaths: tuple[str] | None = None
"""The filepaths that are passed from the source node to the target node."""
"""The filepaths that are passed from the source node/``Step`` to the target node/``Step``."""

@classmethod
def from_graph_edge(
Expand All @@ -88,16 +88,16 @@ def from_graph_edge(
sink: str,
edge_attrs: dict[str, OutputSlot | InputSlot | str | None],
) -> EdgeParams:
"""A convenience method to create an EdgeParams instance.
"""A convenience method to create an ``EdgeParams`` instance.
Parameters
----------
source
The name of the source node.
The name of the source node/``Step``.
sink
The name of the target node.
The name of the target node/``Step``.
edge_attrs
The attributes of the edge connecting the source and target nodes.
The attributes of the edge connecting the source and target nodes/``Steps``.
'output_slot' and 'input_slot' are required keys and 'filepaths' is optional.
"""
return cls(
Expand All @@ -110,53 +110,47 @@ def from_graph_edge(


class StepGraph(nx.MultiDiGraph):
"""A DAG of :class:`Steps<easylink.step.Step>` and the data dependencies between them.
"""A directed acyclic graph (DAG) of :class:`Steps<easylink.step.Step>` and the data dependencies between them.
StepGraphs are DAGs with :class:`Steps<easylink.step.Step>`
for nodes and the file dependencies between them for edges. Multiple edges
between nodes are permitted.
``StepGraphs`` are DAGs with ``Steps`` for nodes and the file dependencies between
them for edges. Multiple edges between nodes are permitted.
Notes
-----
These are high-level abstractions; they represent a conceptual pipeline
graph with no detail as to how each :class:`~easylink.step.Step` is implemented.
The highest level StepGraph is the that of the entire :class:`~easylink.pipeline_schema.PipelineSchema`.
graph with no detail as to how each ``Step`` is implemented.
See Also
--------
:class:`ImplementationGraph`
:class:`~easylink.pipeline_schema.PipelineSchema`
The highest level ``StepGraph`` is that of the entire :class:`~easylink.pipeline_schema.PipelineSchema`.
"""

@property
def step_nodes(self) -> list[str]:
"""The topologically sorted list of node/:class:`~easylink.step.Step` names."""
"""The topologically sorted list of node/``Step`` names."""
ordered_nodes = list(nx.topological_sort(self))
return [node for node in ordered_nodes if node != "input_data" and node != "results"]

@property
def steps(self) -> list[Step]:
"""The list of all :class:`Steps<easylink.step.Step>` in the graph."""
"""The list of all ``Steps`` in the graph."""
return [self.nodes[node]["step"] for node in self.step_nodes]

def add_node_from_step(self, step: Step) -> None:
"""Adds a new node to the StepGraph.
"""Adds a new node to the ``StepGraph``.
Parameters
----------
step
The :class:`~easylink.step.Step` to add to the graph as a new node.
The ``Step`` to add to the graph as a new node.
"""
self.add_node(step.name, step=step)

def add_edge_from_params(self, edge_params: EdgeParams) -> None:
"""Adds a new edge to the StepGraph.
"""Adds a new edge to the ``StepGraph``.
Parameters
----------
edge_params
The :class:`EdgeParams` to add to the graph as a new edge.
The details of the new edge to be added to the graph.
"""
return self.add_edge(
edge_params.source_node,
Expand All @@ -171,62 +165,55 @@ def add_edge_from_params(self, edge_params: EdgeParams) -> None:


class ImplementationGraph(nx.MultiDiGraph):
"""A graph of :class:`Implementations<easylink.implementation.Implementation>`.
"""A directed acyclic graph (DAG) of :class:`Implementations<easylink.implementation.Implementation>`.
ImplementationGraphs are directed graphs with :class:`Implementations<easylink.implementation.Implementation>`
for nodes and the file dependencies between them for edges. Self-edges as well
as multiple edges between nodes are permitted.
``ImplementationGraphs`` are DAGs with ``Implementations`` for nodes and the
file dependencies between them for edges. Self-edges as well as multiple edges
between nodes are permitted.
Notes
-----
An ImplementationGraph is a low-level abstraction; it represents the *actual
An ``ImplementationGraph`` is a low-level abstraction; it represents the *actual
implementations* of each :class:`~easylink.step.Step` in the pipeline. This
is in contrast to a :class:`StepGraph`, which can be an intricate nested structure
due to the various complex and self-similar :class:`~easylink.step.Step` instances
(which represent abstract operations such as "loop this step N times"). An
ImplementationGraph is the flattened and concrete graph of
:class:`Implementations<easylink.implementation.Implementation>` to run.
due to the various complex and self-similar ``Step`` instances (which represent
abstract operations such as "loop this step N times"). An ``ImplementationGraph``
is the flattened and concrete graph of ``Implementations`` to run.
The highest level ImplementationGraph is the that of the entire
The highest level ``ImplementationGraph`` is that of the entire
:class:`~easylink.pipeline_graph.PipelineGraph`.
See Also
--------
:class:`StepGraph`
:class:`~easylink.pipeline_graph.PipelineGraph`
"""

@property
def implementation_nodes(self) -> list[str]:
"""The topologically sorted list of node/:class:`~easylink.implementation.Implementation` names."""
"""The topologically sorted list of node/``Implementation`` names."""
ordered_nodes = list(nx.topological_sort(self))
return [node for node in ordered_nodes if node != "input_data" and node != "results"]

@property
def implementations(self) -> list[Implementation]:
"""The list of all :class:`Implementations<easylink.implementation.Implementation>` in the graph."""
"""The list of all ``Implementations`` in the graph."""
return [self.nodes[node]["implementation"] for node in self.implementation_nodes]

def add_node_from_implementation(self, node_name, implementation: Implementation) -> None:
"""Adds a new node to the ImplementationGraph.
"""Adds a new node to the ``ImplementationGraph``.
Parameters
----------
node_name
The name of the new node.
implementation
The :class:`~easylink.implementation.Implementation` to add to the graph
as a new node.
The ``Implementation`` to add to the graph as a new node.
"""
self.add_node(node_name, implementation=implementation)

def add_edge_from_params(self, edge_params: EdgeParams) -> None:
"""Adds a new edge to the ImplementationGraph.
"""Adds a new edge to the ``ImplementationGraph``.
Parameters
----------
edge_params
The :class:`EdgeParams` to add to the graph as a new edge.
The details of the new edge to be added to the graph.
"""
return self.add_edge(
edge_params.source_node,
Expand All @@ -243,32 +230,32 @@ def add_edge_from_params(self, edge_params: EdgeParams) -> None:

@dataclass(frozen=True)
class SlotMapping(ABC):
"""A mapping between a slot on a parent Step and a slot on (one of) its child Steps.
"""A mapping between a slot on a parent :class:`~easylink.step.Step` and a slot on one of its child ``Steps``.
SlotMapping is an interface intended to be used by concrete :class:`InputSlotMapping`
``SlotMapping`` is an interface intended to be used by concrete :class:`InputSlotMapping`
and :class:`OutputSlotMapping` classes. It represents a mapping between
parent and child nodes/:class:`Steps<easylink.step.Step>` at different levels
of a potentially-nested :class:`~easylink.pipeline_schema.PipelineSchema`.
parent and child nodes/``Steps`` at different levels of a potentially-nested
:class:`~easylink.pipeline_schema.PipelineSchema`.
"""

parent_slot: str
"""The name of the parent slot."""
child_node: str
"""The name of the child node."""
"""The name of the child node/``Step``."""
child_slot: str
"""The name of the child slot."""

@abstractmethod
def remap_edge(self, edge: EdgeParams) -> EdgeParams:
"""Remaps an edge to connect the parent and child nodes."""
"""Remaps an edge to connect the parent and child nodes/``Steps``."""
pass


class InputSlotMapping(SlotMapping):
"""A mapping between :class:`InputSlots<InputSlot>` of a parent node and a child node."""
"""A mapping between :class:`InputSlots<InputSlot>` of a parent node/:class:`~easylink.step.Step` and a child node/``Step``."""

def remap_edge(self, edge: EdgeParams) -> EdgeParams:
"""Remaps an edge's :class:`InputSlot`.
"""Remaps an edge's ``InputSlot``.
Parameters
----------
Expand All @@ -278,7 +265,7 @@ def remap_edge(self, edge: EdgeParams) -> EdgeParams:
Returns
-------
EdgeParams
The remapped edge.
The details of the remapped edge.
Raises
------
Expand All @@ -296,7 +283,7 @@ def remap_edge(self, edge: EdgeParams) -> EdgeParams:


class OutputSlotMapping(SlotMapping):
"""A mapping between :class:`InputSlots<InputSlot>` of a parent node and a child node."""
"""A mapping between :class:`OutputSlots<OutputSlot>` of a parent node/:class:`~easylink.step.Step` and a child node/``Step``."""

def remap_edge(self, edge: EdgeParams) -> EdgeParams:
"""Remaps an edge's :class:`OutputSlot`.
Expand All @@ -309,7 +296,7 @@ def remap_edge(self, edge: EdgeParams) -> EdgeParams:
Returns
-------
EdgeParams
The remapped edge.
The details of the remapped edge.
Raises
------
Expand Down

0 comments on commit eb85ea1

Please sign in to comment.