Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor nodelet manager #247

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
66 changes: 54 additions & 12 deletions src/rosdiscover/acme/acme.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

The main class provided by this module is :class:`AcmeGenerator`
"""
__all__ = ("AcmeGenerator", )
__all__ = ("AcmeGenerator",)

import json
import os
import subprocess
Expand All @@ -21,6 +22,7 @@

# Constants for Acme generation
from ..interpreter.context import Provenance
from ..interpreter.summary import NodeletManagerSummary, NodeletSummary

TOPIC_CONNECTOR = """ connector {conn_name} : TopicConnectorT = new TopicConnectorT extended with {{
{roles}
Expand Down Expand Up @@ -49,6 +51,20 @@
property launchedBy = "{filename}";
}};
"""
NODELET_COMPONENT = """ component {comp_name} : ROSNodeletCompT = new ROSNodeletCompT extended with {{
{ports}
property name = "{node_name}";
property node_manager = "{manager_name}";
property launchedBy = "{filename}";
}};
"""

NODELET_MANAGER = """ groups {comp_name}: ROSNodeletManagerGroup = new ROSNodeletManagerGroup extended with {{
members {{{members}}}
property name = "{node_name}";
property launchedBy = "{filename}";
"""

ATTACHMENT = " attachment {comp}.{port} to {conn}.{role};"
SERVICE_ATTACHMENT = " attachment {qualified_port} to {conn}.{role};"
SUBSCRIBER_ROLE = """ role {role_name} : ROSTopicSubscriberRoleT = new ROSTopicSubscriberRoleT;
Expand Down Expand Up @@ -160,10 +176,10 @@ def __init__(self,
self.__to_ignore = things_to_ignore if things_to_ignore is not None else []

def get_components_and_connectors(self) \
-> Tuple[List[NodeSummary],
Dict[str, _TopicInformation],
Dict[str, _ServiceInformation],
Dict[str, _ActionInformation]]:
-> Tuple[List[NodeSummary],
Dict[str, _TopicInformation],
Dict[str, _ServiceInformation],
Dict[str, _ActionInformation]]:
components: List[NodeSummary] = []
topics: Dict[str, _TopicInformation] = {}
services: Dict[str, _ServiceInformation] = {}
Expand Down Expand Up @@ -254,10 +270,13 @@ def generate_acme(self) -> str:
service_conns: Dict[str, dict] = {}
action_conns: Dict[str, dict] = {}
attachments_to_topic: Dict[str, List[str]] = {}
for c in components:
nodelet_manager_to_nodelet: Dict[str, List[str]] = {}
for c in [c for c in components if not isinstance(c, NodeletManagerSummary)]:
ports = []
comp_name = self.to_acme_name(c.name)

is_nodelet = isinstance(c, NodeletSummary)

for pub in [t for t in c.pubs if not t.implicit and not self._ignore(t.name)]:
if pub.name not in attachments_to_topic:
attachments_to_topic[pub.name] = []
Expand Down Expand Up @@ -323,12 +342,26 @@ def generate_acme(self) -> str:
name,
f"{comp_name}.{pname}",
False)
component_template: str = NODE_COMPONENT \
if c.provenance != Provenance.PLACEHOLDER else NODE_PLACEHOLDER_COMPONENT
comp = component_template.format(comp_name=comp_name,
ports='\n'.join(ports),
node_name=c.name,
filename=c.filename)
if is_nodelet:
assert isinstance(c, NodeletSummary)
comp = NODELET_COMPONENT.format(comp_name=comp_name,
ports='\n'.join(ports),
node_name=c.name,
filename=c.filename,
manager=c.nodelet_manager)
if c.nodelet_manager in nodelet_manager_to_nodelet:
nodelets = nodelet_manager_to_nodelet[c.nodelet_manager]
else:
nodelets = []
nodelet_manager_to_nodelet[c.nodelet_manager] = nodelets
nodelets += comp_name
else:
component_template: str = NODE_COMPONENT \
if c.provenance != Provenance.PLACEHOLDER else NODE_PLACEHOLDER_COMPONENT
comp = component_template.format(comp_name=comp_name,
ports='\n'.join(ports),
node_name=c.name,
filename=c.filename)
component_strs.append(comp)

acme = acme + "\n".join(component_strs)
Expand All @@ -338,6 +371,15 @@ def generate_acme(self) -> str:
self._process_services(service_conns, attachments, connector_strs)
self._process_actions(action_conns, attachments, connector_strs)

group_strs: List[str] = []
for c in [c for c in components if isinstance(c, NodeletManagerSummary)]:
comp_name = self.to_acme_name(c.name)
members = ", ".join(nodelet_manager_to_nodelet.get(c.name, []))
group = NODELET_MANAGER.format(comp_name=comp_name, members=members, node_name=c.name, filename=c.filename)
group_strs.append(group)

acme = acme + "\n".join(group_strs)

acme = acme + "\n".join(connector_strs)
acme = acme + "\n".join(attachments) + "}"
self.generate_acme_file(acme)
Expand Down
2 changes: 1 addition & 1 deletion src/rosdiscover/interpreter/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
The main class within this module is :class:`Interpreter`, which acts as a
model evaluator / virtual machine for a ROS architecture.
"""
from .context import NodeContext
from .context import NodeContext, NodeletContext, NodeletManagerContext
from .model import model, NodeModel
from .parameter import ParameterServer
from .plugin import ModelPlugin
Expand Down
98 changes: 49 additions & 49 deletions src/rosdiscover/interpreter/context.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
import re
import typing
from typing import Any, List, Mapping, Optional, Set, Tuple, Union
import typing as t

import attr
import dockerblade
Expand All @@ -14,7 +13,7 @@
from .summary import NodeSummary
from ..core import Action, Service, Topic

if typing.TYPE_CHECKING:
if t.TYPE_CHECKING:
from .plugin import ModelPlugin
from ..recover.symbolic import SymbolicUnknown

Expand All @@ -28,24 +27,23 @@ class NodeContext:
kind: str
package: str
args: str
remappings: Mapping[str, str]
remappings: t.Mapping[str, str]
launch_filename: str
app: "AppInstance" = attr.ib(repr=False)
_params: ParameterServer = attr.ib(repr=False)
_files: dockerblade.files.FileSystem = attr.ib(repr=False)
_nodelet: bool = attr.ib(default=False, repr=False)
_provenance: "Provenance" = attr.ib(default=Provenance.UNKNOWN, repr=False)
_uses: Set[Service] = attr.ib(factory=set, repr=False)
_provides: Set[Service] = attr.ib(factory=set, repr=False)
_subs: Set[Topic] = attr.ib(factory=set, repr=False)
_pubs: Set[Topic] = attr.ib(factory=set, repr=False)
_action_servers: Set[Action] = attr.ib(factory=set, repr=False)
_action_clients: Set[Action] = attr.ib(factory=set, repr=False)
_uses: t.Set[Service] = attr.ib(factory=set, repr=False)
_provides: t.Set[Service] = attr.ib(factory=set, repr=False)
_subs: t.Set[Topic] = attr.ib(factory=set, repr=False)
_pubs: t.Set[Topic] = attr.ib(factory=set, repr=False)
_action_servers: t.Set[Action] = attr.ib(factory=set, repr=False)
_action_clients: t.Set[Action] = attr.ib(factory=set, repr=False)
# The tuple is (name, dynamic) where name is the name of the parameter
# and dynamic is whether the node reacts to updates to the parameter via reconfigure
_reads: Set[Tuple[str, bool]] = attr.ib(factory=set, repr=False)
_writes: Set[str] = attr.ib(factory=set, repr=False)
_plugins: List['ModelPlugin'] = attr.ib(factory=list)
_reads: t.Set[t.Tuple[str, bool]] = attr.ib(factory=set, repr=False)
_writes: t.Set[str] = attr.ib(factory=set, repr=False)
_plugins: t.List['ModelPlugin'] = attr.ib(factory=list)

def merge(self, context: 'NodeContext') -> None:
self._params.update(context._params)
Expand Down Expand Up @@ -95,7 +93,6 @@ def summarise(self) -> NodeSummary:
namespace=self.namespace,
kind=self.kind,
package=self.package,
nodelet=self._nodelet,
provenance=self._provenance,
reads=self._reads,
writes=self._writes,
Expand All @@ -121,7 +118,7 @@ def _resolve_without_remapping(self, name: str) -> str:
else:
return rosname.namespace_join(self.namespace, name)

def resolve(self, name: Union[str, 'SymbolicUnknown']) -> str:
def resolve(self, name: t.Union[str, 'SymbolicUnknown']) -> str:
"""Resolves a given name within the context of this node.

Returns
Expand All @@ -140,26 +137,26 @@ def resolve(self, name: Union[str, 'SymbolicUnknown']) -> str:
logger.warning(f"Unable to resolve unknown name in NodeContext [{self.name}]")
return UNKNOWN_NAME

def _name_str(self, name: Union[str, 'SymbolicUnknown']) -> str:
def _name_str(self, name: t.Union[str, 'SymbolicUnknown']) -> str:
if isinstance(name, str):
return name
return "Unknown Symbol"

def provide(self, service: Union[str, 'SymbolicUnknown'], fmt: str) -> None:
def provide(self, service: t.Union[str, 'SymbolicUnknown'], fmt: str) -> None:
"""Instructs the node to provide a service."""
logger.debug(f"node [{self.name}] provides service [{self._name_str(service)}] "
f"using format [{fmt}]")
service_name_full = self.resolve(service)
self._provides.add(Service(name=service_name_full, format=fmt))

def use(self, service: Union[str, 'SymbolicUnknown'], fmt: str) -> None:
def use(self, service: t.Union[str, 'SymbolicUnknown'], fmt: str) -> None:
"""Instructs the node to use a given service."""
logger.debug(f"node [{self.name}] uses a service [{self._name_str(service)}] "
f"with format [{fmt}]")
service_name_full = self.resolve(service)
self._uses.add(Service(name=service_name_full, format=fmt))

def sub(self, topic_name: Union[str, 'SymbolicUnknown'], fmt: str, implicit: bool = False) -> None:
def sub(self, topic_name: t.Union[str, 'SymbolicUnknown'], fmt: str, implicit: bool = False) -> None:
"""Subscribes the node to a given topic.

Parameters
Expand All @@ -177,7 +174,7 @@ def sub(self, topic_name: Union[str, 'SymbolicUnknown'], fmt: str, implicit: boo
f"[{self._name_str(topic_name)}] with format [{fmt}]")
self._subs.add(Topic(name=topic_name_full, format=fmt, implicit=implicit))

def pub(self, topic_name: Union[str, 'SymbolicUnknown'], fmt: str, implicit: bool = False) -> None:
def pub(self, topic_name: t.Union[str, 'SymbolicUnknown'], fmt: str, implicit: bool = False) -> None:
"""Instructs the node to publish to a given topic.

Parameters
Expand All @@ -196,34 +193,34 @@ def pub(self, topic_name: Union[str, 'SymbolicUnknown'], fmt: str, implicit: boo
self._pubs.add(Topic(name=topic_name_full, format=fmt, implicit=implicit))

def read(self,
param: Union[str, 'SymbolicUnknown'],
default: Optional[Any] = None,
param: t.Union[str, 'SymbolicUnknown'],
default: t.Optional[t.Any] = None,
dynamic: bool = False
) -> Any:
) -> t.Any:
"""Obtains the value of a given parameter from the parameter server."""
logger.debug(f"node [{self.name}] reads parameter [{self._name_str(param)}]")
param = self.resolve(param)
self._reads.add((param, dynamic))
return self._params.get(param, default)

def write(self, param: Union[str, 'SymbolicUnknown'], val: Any) -> None:
def write(self, param: t.Union[str, 'SymbolicUnknown'], val: t.Any) -> None:
logger.debug(f"node [{self.name}] writes [{val}] to "
f"parameter [{self._name_str(param)}]")
param = self.resolve(param)
self._writes.add(param)
self._params[param] = val

# FIXME we _may_ want to record this interaction in our summary
def has_param(self, param: Union[str, 'SymbolicUnknown']) -> bool:
def has_param(self, param: t.Union[str, 'SymbolicUnknown']) -> bool:
"""Determines whether a given parameter has been defined."""
logger.debug(f"node [{self.name}] checks for existence of parameter [{param}]")
param = self.resolve(param)
return param in self._params

def delete_param(self, param: Union[str, 'SymbolicUnknown']) -> None:
def delete_param(self, param: t.Union[str, 'SymbolicUnknown']) -> None:
raise NotImplementedError("parameter deletion is not implemented")

def read_file(self, fn: Union[str, 'SymbolicUnknown']) -> str:
def read_file(self, fn: t.Union[str, 'SymbolicUnknown']) -> str:
"""Reads the contents of a text file."""
if isinstance(fn, str):
if not self._files.exists(fn):
Expand All @@ -234,11 +231,11 @@ def read_file(self, fn: Union[str, 'SymbolicUnknown']) -> str:
logger.warning(f"Unable to resolve unknown parameter filename in NodeContext [{self.name}]")
return UNKNOWN_NAME

def parameter_keys(self, prefix: str) -> typing.Iterable[str]:
def parameter_keys(self, prefix: str) -> t.Iterable[str]:
prefix = self.resolve(prefix)
return (key for key in self._params.keys() if key.startswith(prefix))

def action_server(self, ns: Union[str, 'SymbolicUnknown'], fmt: str) -> None:
def action_server(self, ns: t.Union[str, 'SymbolicUnknown'], fmt: str) -> None:
"""Creates a new action server.

Parameters
Expand All @@ -262,7 +259,7 @@ def action_server(self, ns: Union[str, 'SymbolicUnknown'], fmt: str) -> None:
self.pub(f'{ns}/feedback', f'{fmt}Feedback', implicit=True)
self.pub(f'{ns}/result', f'{fmt}Result', implicit=True)

def action_client(self, ns: Union[str, 'SymbolicUnknown'], fmt: str) -> None:
def action_client(self, ns: t.Union[str, 'SymbolicUnknown'], fmt: str) -> None:
"""Creates a new action client.

Parameters
Expand Down Expand Up @@ -294,29 +291,11 @@ def actions_have_topics(self):
else:
return distribution < ROSDistribution.FOXY

def load_nodelet(self, nodelet_context: 'NodeContext'):
self.merge(nodelet_context)
# In the recovered architecture, the nodelets themselves don't
# report what they publish etc.
# TODO: Fix this when we have NodeletManagerContexts and NodeletContexts
nodelet_context._params.clear()
nodelet_context._uses.clear()
nodelet_context._provides.clear()
nodelet_context._subs.clear()
nodelet_context._pubs.clear()
nodelet_context._action_servers.clear()
nodelet_context._action_clients.clear()
nodelet_context._reads.clear()
nodelet_context._writes.clear()

def load_plugin(self, plugin: 'ModelPlugin') -> None:
"""Loads a given dynamic plugin."""
logger.debug(f'loading plugin in node [{self.name}]: {plugin}')
self._plugins.append(plugin)

def mark_nodelet(self) -> None:
self._nodelet = True

def mark_placeholder(self) -> None:
self._provenance = Provenance.PLACEHOLDER

Expand All @@ -325,3 +304,24 @@ def mark_handwritten(self) -> None:

def mark_recovered(self) -> None:
self._provenance = Provenance.RECOVERED


@attr.s(slots=True, auto_attribs=True)
class NodeletManagerContext(NodeContext):
_nodelets: t.Collection['NodeletContext'] = attr.ib(factory=set, repr=False)

def load_nodelet(self, nodelet_context: 'NodeletContext') -> None:
self.merge(nodelet_context)
# In the recovered architecture, the nodelets themselves don't
# report what they publish etc.
nodelets = set(self._nodelets)
nodelets.add(nodelet_context)
self.__setattr__("_nodelets", nodelets)


@attr.s(slots=True, auto_attribs=True)
class NodeletContext(NodeContext):
_nodelet_manager: 'NodeletManagerContext' = attr.ib(default=None)

def set_nodelet_manager(self, manager: 'NodeletManagerContext') -> None:
self._nodelet_manager = manager
Loading