diff --git a/tf2_ros_py/package.xml b/tf2_ros_py/package.xml
index 4c358f8b8..aa61acd1e 100644
--- a/tf2_ros_py/package.xml
+++ b/tf2_ros_py/package.xml
@@ -18,7 +18,8 @@
tf2_py
python3-pytest
-
+ sensor_msgs
+
ament_python
diff --git a/tf2_ros_py/tf2_ros/buffer.py b/tf2_ros_py/tf2_ros/buffer.py
index 894041e7a..080fef2a2 100644
--- a/tf2_ros_py/tf2_ros/buffer.py
+++ b/tf2_ros_py/tf2_ros/buffer.py
@@ -1,9 +1,9 @@
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
-#
+#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
-#
+#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
@@ -12,7 +12,7 @@
# * Neither the name of the Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
-#
+#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
@@ -26,6 +26,12 @@
# POSSIBILITY OF SUCH DAMAGE.
# author: Wim Meeussen
+from typing import TypeVar
+from typing import Optional
+from typing import Any
+from typing import List
+from typing import Tuple
+from typing import Callable
import threading
@@ -33,12 +39,18 @@
import tf2_py as tf2
import tf2_ros
from tf2_msgs.srv import FrameGraph
+from geometry_msgs.msg import TransformStamped
# TODO(vinnamkim): It seems rosgraph is not ready
# import rosgraph.masterapi
from time import sleep
+from rclpy.node import Node
+from rclpy.time import Time
from rclpy.duration import Duration
from rclpy.task import Future
+FrameGraphSrvRequest = TypeVar('FrameGraphSrvRequest')
+FrameGraphSrvResponse = TypeVar('FrameGraphSrvResponse')
+
class Buffer(tf2.BufferCore, tf2_ros.BufferInterface):
"""
@@ -48,15 +60,19 @@ class Buffer(tf2.BufferCore, tf2_ros.BufferInterface):
Stores known frames and offers a ROS service, "tf_frames", which responds to client requests
with a response containing a :class:`tf2_msgs.FrameGraph` representing the relationship of
- known frames.
+ known frames.
"""
- def __init__(self, cache_time = None, node = None):
+ def __init__(
+ self,
+ cache_time: Optional[Duration] = None,
+ node: Optional[Node] = None
+ ) -> None:
"""
Constructor.
- :param cache_time: (Optional) Duration object describing how long to retain past information in BufferCore.
- :param node: (Optional) If node create a tf2_frames service, It responses all frames as a yaml
+ :param cache_time: Duration object describing how long to retain past information in BufferCore.
+ :param node: Create a tf2_frames service that returns all frames as a yaml document.
"""
if cache_time is not None:
tf2.BufferCore.__init__(self, cache_time)
@@ -64,25 +80,29 @@ def __init__(self, cache_time = None, node = None):
tf2.BufferCore.__init__(self)
tf2_ros.BufferInterface.__init__(self)
- self._new_data_callbacks = []
- self._callbacks_to_remove = []
+ self._new_data_callbacks: List[Callable[[], None]] = []
+ self._callbacks_to_remove: List[Callable[[], None]] = []
self._callbacks_lock = threading.RLock()
if node is not None:
self.srv = node.create_service(FrameGraph, 'tf2_frames', self.__get_frames)
- def __get_frames(self, req, res):
- return FrameGraph.Response(frame_yaml=self.all_frames_as_yaml())
+ def __get_frames(
+ self,
+ req: FrameGraphSrvRequest,
+ res: FrameGraphSrvResponse
+ ) -> FrameGraphSrvResponse:
+ return FrameGraph.Response(frame_yaml=self.all_frames_as_yaml())
- def set_transform(self, *args, **kwargs):
+ def set_transform(self, *args: Tuple[TransformStamped, str], **kwargs: Any) -> None:
super().set_transform(*args, **kwargs)
self._call_new_data_callbacks()
- def set_transform_static(self, *args, **kwargs):
+ def set_transform_static(self, *args: Tuple[TransformStamped, str], **kwargs: Any) -> None:
super().set_transform_static(*args, **kwargs)
self._call_new_data_callbacks()
- def _call_new_data_callbacks(self):
+ def _call_new_data_callbacks(self) -> None:
with self._callbacks_lock:
for callback in self._new_data_callbacks:
callback()
@@ -91,82 +111,108 @@ def _call_new_data_callbacks(self):
self._new_data_callbacks.remove(callback)
self._callbacks_to_remove.clear()
- def _remove_callback(self, callback):
+ def _remove_callback(self, callback: Callable[[], None]) -> None:
with self._callbacks_lock:
# Actually remove the callback later
self._callbacks_to_remove.append(callback)
- def lookup_transform(self, target_frame, source_frame, time, timeout=Duration()):
+ def lookup_transform(
+ self,
+ target_frame: str,
+ source_frame: str,
+ time: Time,
+ timeout: Duration = Duration()
+ ) -> TransformStamped:
"""
Get the transform from the source frame to the target frame.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
- :param time: The time at which to get the transform. (0 will get the latest)
- :param timeout: (Optional) Time to wait for the target frame to become available.
+ :param time: The time at which to get the transform (0 will get the latest).
+ :param timeout: Time to wait for the target frame to become available.
:return: The transform between the frames.
- :rtype: :class:`geometry_msgs.msg.TransformStamped`
"""
-
self.can_transform(target_frame, source_frame, time, timeout)
return self.lookup_transform_core(target_frame, source_frame, time)
- async def lookup_transform_async(self, target_frame, source_frame, time):
+ async def lookup_transform_async(
+ self,
+ target_frame: str,
+ source_frame: str,
+ time: Time
+ ) -> TransformStamped:
"""
Get the transform from the source frame to the target frame asyncronously.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
- :param time: The time at which to get the transform. (0 will get the latest)
- :return: the transform
- :rtype: :class:`geometry_msgs.msg.TransformStamped`
+ :param time: The time at which to get the transform (0 will get the latest).
+ :return: The transform between the frames.
"""
await self.wait_for_transform_async(target_frame, source_frame, time)
return self.lookup_transform_core(target_frame, source_frame, time)
-
- def lookup_transform_full(self, target_frame, target_time, source_frame, source_time, fixed_frame, timeout=Duration()):
+ def lookup_transform_full(
+ self,
+ target_frame: str,
+ target_time: Time,
+ source_frame: str,
+ source_time: Time,
+ fixed_frame: str,
+ timeout: Duration = Duration()
+ ) -> TransformStamped:
"""
Get the transform from the source frame to the target frame using the advanced API.
:param target_frame: Name of the frame to transform into.
- :param target_time: The time to transform to. (0 will get the latest)
+ :param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
- :param source_time: The time at which source_frame will be evaluated. (0 will get the latest)
+ :param source_time: The time at which source_frame will be evaluated (0 will get the latest).
:param fixed_frame: Name of the frame to consider constant in time.
- :param timeout: (Optional) Time to wait for the target frame to become available.
+ :param timeout: Time to wait for the target frame to become available.
:return: The transform between the frames.
- :rtype: :class:`geometry_msgs.msg.TransformStamped`
"""
self.can_transform_full(target_frame, target_time, source_frame, source_time, fixed_frame, timeout)
return self.lookup_transform_full_core(target_frame, target_time, source_frame, source_time, fixed_frame)
- async def lookup_transform_full_async(self, target_frame, target_time, source_frame, source_time, fixed_frame):
+ async def lookup_transform_full_async(
+ self,
+ target_frame: str,
+ target_time: Time,
+ source_frame: str,
+ source_time: Time,
+ fixed_frame: str
+ ) -> TransformStamped:
"""
Get the transform from the source frame to the target frame using the advanced API asyncronously.
:param target_frame: Name of the frame to transform into.
- :param target_time: The time to transform to. (0 will get the latest)
+ :param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
- :param source_time: The time at which source_frame will be evaluated. (0 will get the latest)
+ :param source_time: The time at which source_frame will be evaluated (0 will get the latest).
:param fixed_frame: Name of the frame to consider constant in time.
- :return: the transform
- :rtype: :class:`geometry_msgs.msg.TransformStamped`
+ :return: The transform between the frames.
"""
await self.wait_for_transform_full_async(target_frame, target_time, source_frame, source_time, fixed_frame)
return self.lookup_transform_full_core(target_frame, target_time, source_frame, source_time, fixed_frame)
- def can_transform(self, target_frame, source_frame, time, timeout=Duration(), return_debug_tuple=False):
+ def can_transform(
+ self,
+ target_frame: str,
+ source_frame: str,
+ time: Time,
+ timeout: Duration = Duration(),
+ return_debug_tuple: bool = False
+ ) -> bool:
"""
Check if a transform from the source frame to the target frame is possible.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
- :param time: The time at which to get the transform. (0 will get the latest)
- :param timeout: (Optional) Time to wait for the target frame to become available.
- :param return_debug_type: (Optional) If true, return a tuple representing debug information.
+ :param time: The time at which to get the transform (0 will get the latest).
+ :param timeout: Time to wait for the target frame to become available.
+ :param return_debug_type: If true, return a tuple representing debug information.
:return: True if the transform is possible, false otherwise.
- :rtype: bool
"""
clock = rclpy.clock.Clock()
if timeout != Duration():
@@ -185,23 +231,29 @@ def can_transform(self, target_frame, source_frame, time, timeout=Duration(), re
return core_result
return core_result[0]
- def can_transform_full(self, target_frame, target_time, source_frame, source_time, fixed_frame, timeout=Duration(),
-
- return_debug_tuple=False):
+ def can_transform_full(
+ self,
+ target_frame: str,
+ target_time: Time,
+ source_frame: str,
+ source_time: Time,
+ fixed_frame: str,
+ timeout: Duration = Duration(),
+ return_debug_tuple: bool = False
+ ) -> bool:
"""
Check if a transform from the source frame to the target frame is possible (advanced API).
Must be implemented by a subclass of BufferInterface.
:param target_frame: Name of the frame to transform into.
- :param target_time: The time to transform to. (0 will get the latest)
+ :param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
- :param source_time: The time at which source_frame will be evaluated. (0 will get the latest)
+ :param source_time: The time at which source_frame will be evaluated (0 will get the latest).
:param fixed_frame: Name of the frame to consider constant in time.
- :param timeout: (Optional) Time to wait for the target frame to become available.
- :param return_debug_type: (Optional) If true, return a tuple representing debug information.
+ :param timeout: Time to wait for the target frame to become available.
+ :param return_debug_type: If true, return a tuple representing debug information.
:return: True if the transform is possible, false otherwise.
- :rtype: bool
"""
clock = rclpy.clock.Clock()
if timeout != Duration():
@@ -219,15 +271,19 @@ def can_transform_full(self, target_frame, target_time, source_frame, source_tim
return core_result
return core_result[0]
- def wait_for_transform_async(self, target_frame, source_frame, time):
+ def wait_for_transform_async(
+ self,
+ target_frame: str,
+ source_frame: str,
+ time: Time
+ ) -> Future:
"""
Wait for a transform from the source frame to the target frame to become possible.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
- :param time: The time at which to get the transform. (0 will get the latest)
- :return: A future that becomes true when the transform is available
- :rtype: rclpy.task.Future
+ :param time: The time at which to get the transform (0 will get the latest).
+ :return: A future that becomes true when the transform is available.
"""
fut = rclpy.task.Future()
if self.can_transform_core(target_frame, source_frame, time)[0]:
@@ -247,17 +303,23 @@ def _on_new_data():
return fut
- def wait_for_transform_full_async(self, target_frame, target_time, source_frame, source_time, fixed_frame):
+ def wait_for_transform_full_async(
+ self,
+ target_frame: str,
+ target_time: Time,
+ source_frame: str,
+ source_time: Time,
+ fixed_frame: str
+ ) -> Future:
"""
Wait for a transform from the source frame to the target frame to become possible.
:param target_frame: Name of the frame to transform into.
- :param target_time: The time to transform to. (0 will get the latest)
+ :param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
- :param source_time: The time at which source_frame will be evaluated. (0 will get the latest)
+ :param source_time: The time at which source_frame will be evaluated (0 will get the latest).
:param fixed_frame: Name of the frame to consider constant in time.
- :return: A future that becomes true when the transform is available
- :rtype: rclpy.task.Future
+ :return: A future that becomes true when the transform is available.
"""
fut = rclpy.task.Future()
if self.can_transform_full_core(target_frame, target_time, source_frame, source_time, fixed_frame)[0]:
diff --git a/tf2_ros_py/tf2_ros/buffer_client.py b/tf2_ros_py/tf2_ros/buffer_client.py
index 9d1ebd43e..8f822a29a 100644
--- a/tf2_ros_py/tf2_ros/buffer_client.py
+++ b/tf2_ros_py/tf2_ros/buffer_client.py
@@ -34,8 +34,14 @@
#*
#* Author: Eitan Marder-Eppstein
#***********************************************************
+from typing import TypeVar
+
+from geometry_msgs.msg import TransformStamped
+
+from rclpy.node import Node
from rclpy.action.client import ActionClient
from rclpy.duration import Duration
+from rclpy.time import Time
from rclpy.clock import Clock
from time import sleep
import tf2_py as tf2
@@ -44,11 +50,22 @@
from tf2_msgs.action import LookupTransform
+# Used for documentation purposes only
+LookupTransformGoal = TypeVar('LookupTransformGoal')
+LookupTransformResult = TypeVar('LookupTransformResult')
+
+
class BufferClient(tf2_ros.BufferInterface):
"""
Action client-based implementation of BufferInterface.
"""
- def __init__(self, node, ns, check_frequency = 10.0, timeout_padding = Duration(seconds=2.0)):
+ def __init__(
+ self,
+ node: Node,
+ ns: str,
+ check_frequency: float = 10.0,
+ timeout_padding: Duration = Duration(seconds=2.0)
+ ) -> None:
"""
.. function:: __init__(ns, check_frequency = 10.0, timeout_padding = rospy.Duration.from_sec(2.0))
@@ -66,16 +83,21 @@ def __init__(self, node, ns, check_frequency = 10.0, timeout_padding = Duration(
self.timeout_padding = timeout_padding
# lookup, simple api
- def lookup_transform(self, target_frame, source_frame, time, timeout=Duration()):
+ def lookup_transform(
+ self,
+ target_frame: str,
+ source_frame: str,
+ time: Time,
+ timeout: Duration = Duration()
+ ) -> TransformStamped:
"""
Get the transform from the source frame to the target frame.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
- :param time: The time at which to get the transform. (0 will get the latest)
- :param timeout: (Optional) Time to wait for the target frame to become available.
+ :param time: The time at which to get the transform (0 will get the latest).
+ :param timeout: Time to wait for the target frame to become available.
:return: The transform between the frames.
- :rtype: :class:`geometry_msgs.msg.TransformStamped`
"""
goal = LookupTransform.Goal()
goal.target_frame = target_frame
@@ -87,7 +109,15 @@ def lookup_transform(self, target_frame, source_frame, time, timeout=Duration())
return self.__process_goal(goal)
# lookup, advanced api
- def lookup_transform_full(self, target_frame, target_time, source_frame, source_time, fixed_frame, timeout=Duration()):
+ def lookup_transform_full(
+ self,
+ target_frame: str,
+ target_time: Time,
+ source_frame: str,
+ source_time: Time,
+ fixed_frame: str,
+ timeout: Duration = Duration()
+ ) -> TransformStamped:
"""
Get the transform from the source frame to the target frame using the advanced API.
@@ -96,9 +126,8 @@ def lookup_transform_full(self, target_frame, target_time, source_frame, source_
:param source_frame: Name of the input frame.
:param source_time: The time at which source_frame will be evaluated. (0 will get the latest)
:param fixed_frame: Name of the frame to consider constant in time.
- :param timeout: (Optional) Time to wait for the target frame to become available.
+ :param timeout: Time to wait for the target frame to become available.
:return: The transform between the frames.
- :rtype: :class:`geometry_msgs.msg.TransformStamped`
"""
goal = LookupTransform.Goal()
goal.target_frame = target_frame
@@ -112,17 +141,22 @@ def lookup_transform_full(self, target_frame, target_time, source_frame, source_
return self.__process_goal(goal)
# can, simple api
- def can_transform(self, target_frame, source_frame, time, timeout=Duration()):
+ def can_transform(
+ self,
+ target_frame: str,
+ source_frame: str,
+ time: Time,
+ timeout: Duration = Duration()
+ ) -> bool:
"""
Check if a transform from the source frame to the target frame is possible.
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
:param time: The time at which to get the transform. (0 will get the latest)
- :param timeout: (Optional) Time to wait for the target frame to become available.
- :param return_debug_type: (Optional) If true, return a tuple representing debug information.
+ :param timeout: Time to wait for the target frame to become available.
+ :param return_debug_type: If true, return a tuple representing debug information.
:return: True if the transform is possible, false otherwise.
- :rtype: bool
"""
try:
self.lookup_transform(target_frame, source_frame, time, timeout)
@@ -130,9 +164,16 @@ def can_transform(self, target_frame, source_frame, time, timeout=Duration()):
except tf2.TransformException:
return False
-
# can, advanced api
- def can_transform_full(self, target_frame, target_time, source_frame, source_time, fixed_frame, timeout=Duration()):
+ def can_transform_full(
+ self,
+ target_frame: str,
+ target_time: Time,
+ source_frame: str,
+ source_time: Time,
+ fixed_frame: str,
+ timeout: Duration = Duration()
+ ) -> bool:
"""
Check if a transform from the source frame to the target frame is possible (advanced API).
@@ -143,10 +184,9 @@ def can_transform_full(self, target_frame, target_time, source_frame, source_tim
:param source_frame: Name of the input frame.
:param source_time: The time at which source_frame will be evaluated. (0 will get the latest)
:param fixed_frame: Name of the frame to consider constant in time.
- :param timeout: (Optional) Time to wait for the target frame to become available.
- :param return_debug_type: (Optional) If true, return a tuple representing debug information.
+ :param timeout: Time to wait for the target frame to become available.
+ :param return_debug_type: If true, return a tuple representing debug information.
:return: True if the transform is possible, false otherwise.
- :rtype: bool
"""
try:
self.lookup_transform_full(target_frame, target_time, source_frame, source_time, fixed_frame, timeout)
@@ -154,10 +194,10 @@ def can_transform_full(self, target_frame, target_time, source_frame, source_tim
except tf2.TransformException:
return False
- def __process_goal(self, goal):
+ def __process_goal(self, goal: LookupTransformGoal) -> TransformStamped:
# TODO(sloretz) why is this an action client? Service seems more appropriate.
if not self.action_client.server_is_ready():
- raise tf2.TimeoutException("The BufferServer is not ready")
+ raise tf2.TimeoutException("The BufferServer is not ready.")
event = threading.Event()
@@ -191,7 +231,7 @@ def unblock_by_timeout():
#This shouldn't happen, but could in rare cases where the server hangs
if not send_goal_future.done():
- raise tf2.TimeoutException("The LookupTransform goal sent to the BufferServer did not come back in the specified time. Something is likely wrong with the server")
+ raise tf2.TimeoutException("The LookupTransform goal sent to the BufferServer did not come back in the specified time. Something is likely wrong with the server.")
# Raises if future was given an exception
goal_handle = send_goal_future.result()
@@ -203,7 +243,7 @@ def unblock_by_timeout():
return self.__process_result(response.result)
- def __process_result(self, result):
+ def __process_result(self, result: LookupTransformResult) -> TransformStamped:
if result == None or result.error == None:
raise tf2.TransformException("The BufferServer returned None for result or result.error! Something is likely wrong with the server.")
if result.error.error != result.error.NO_ERROR:
diff --git a/tf2_ros_py/tf2_ros/buffer_interface.py b/tf2_ros_py/tf2_ros/buffer_interface.py
index 12e4aa85d..ecd677168 100644
--- a/tf2_ros_py/tf2_ros/buffer_interface.py
+++ b/tf2_ros_py/tf2_ros/buffer_interface.py
@@ -1,9 +1,9 @@
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
-#
+#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
-#
+#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
@@ -12,7 +12,7 @@
# * Neither the name of the Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
-#
+#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
@@ -26,15 +26,40 @@
# POSSIBILITY OF SUCH DAMAGE.
# author: Wim Meeussen
+from typing import Optional
+from typing import TypeVar
+from typing import Union
+from typing import Callable
+from typing import Tuple
+from typing import Any
import rclpy
import tf2_py as tf2
import tf2_ros
from copy import deepcopy
from std_msgs.msg import Header
+from geometry_msgs.msg import TransformStamped
+from geometry_msgs.msg import PointStamped
+from geometry_msgs.msg import PoseStamped
+from geometry_msgs.msg import PoseWithCovarianceStamped
+from geometry_msgs.msg import Vector3Stamped
+from sensor_msgs.msg import PointCloud2
+
from rclpy.time import Time
from rclpy.duration import Duration
+MsgStamped = Union[
+ PointStamped,
+ PoseStamped,
+ PoseWithCovarianceStamped,
+ Vector3Stamped,
+ PointCloud2
+ ]
+PyKDLType = TypeVar("PyKDLType")
+TransformableObject = Union[MsgStamped, PyKDLType]
+TransformableObjectType = TypeVar("TransformableObjectType")
+
+
class BufferInterface:
"""
Abstract interface for wrapping the Python bindings for the tf2 library in
@@ -42,11 +67,17 @@ class BufferInterface:
Implementations include :class:tf2_ros.buffer.Buffer and
:class:tf2_ros.buffer_client.BufferClient.
"""
- def __init__(self):
+ def __init__(self) -> None:
self.registration = tf2_ros.TransformRegistration()
# transform, simple api
- def transform(self, object_stamped, target_frame, timeout=Duration(), new_type = None):
+ def transform(
+ self,
+ object_stamped: TransformableObject,
+ target_frame: str,
+ timeout: Duration = Duration(),
+ new_type: Optional[TransformableObjectType] = None
+ ) -> TransformableObject:
"""
Transform an input into the target frame.
@@ -55,10 +86,10 @@ def transform(self, object_stamped, target_frame, timeout=Duration(), new_type =
If new_type is not None, the type specified must have a valid conversion from the input type,
else the function will raise an exception.
- :param object_stamped: The timestamped object the transform.
+ :param object_stamped: The timestamped object to transform.
:param target_frame: Name of the frame to transform the input into.
- :param timeout: (Optional) Time to wait for the target frame to become available.
- :param new_type: (Optional) Type to convert the object to.
+ :param timeout: Time to wait for the target frame to become available.
+ :param new_type: Type to convert the object to.
:return: The transformed, timestamped output, possibly converted to a new type.
"""
do_transform = self.registration.get(type(object_stamped))
@@ -70,7 +101,15 @@ def transform(self, object_stamped, target_frame, timeout=Duration(), new_type =
return convert(res, new_type)
# transform, advanced api
- def transform_full(self, object_stamped, target_frame, target_time, fixed_frame, timeout=Duration(), new_type = None):
+ def transform_full(
+ self,
+ object_stamped: TransformableObject,
+ target_frame: str,
+ target_time: Time,
+ fixed_frame: str,
+ timeout: Duration = Duration(),
+ new_type: Optional[TransformableObjectType] = None
+ ) -> TransformableObject:
"""
Transform an input into the target frame (advanced API).
@@ -82,24 +121,30 @@ def transform_full(self, object_stamped, target_frame, target_time, fixed_frame,
This function follows the advanced API, which allows tranforming between different time points,
as well as specifying a frame to be considered fixed in time.
- :param object_stamped: The timestamped object the transform.
+ :param object_stamped: The timestamped object to transform.
:param target_frame: Name of the frame to transform the input into.
:param target_time: Time to transform the input into.
:param fixed_frame: Name of the frame to consider constant in time.
- :param timeout: (Optional) Time to wait for the target frame to become available.
- :param new_type: (Optional) Type to convert the object to.
+ :param timeout: Time to wait for the target frame to become available.
+ :param new_type: Type to convert the object to.
:return: The transformed, timestamped output, possibly converted to a new type.
"""
do_transform = self.registration.get(type(object_stamped))
res = do_transform(object_stamped, self.lookup_transform_full(target_frame, target_time,
- object_stamped.header.frame_id, object_stamped.header.stamp,
+ object_stamped.header.frame_id, object_stamped.header.stamp,
fixed_frame, timeout))
if new_type == None:
return res
return convert(res, new_type)
- def lookup_transform(self, target_frame, source_frame, time, timeout=Duration()):
+ def lookup_transform(
+ self,
+ target_frame: str,
+ source_frame: str,
+ time: Time,
+ timeout: Duration = Duration()
+ ) -> TransformStamped:
"""
Get the transform from the source frame to the target frame.
@@ -107,32 +152,44 @@ def lookup_transform(self, target_frame, source_frame, time, timeout=Duration())
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
- :param time: The time at which to get the transform. (0 will get the latest)
- :param timeout: (Optional) Time to wait for the target frame to become available.
+ :param time: The time at which to get the transform (0 will get the latest).
+ :param timeout: Time to wait for the target frame to become available.
:return: The transform between the frames.
- :rtype: :class:`geometry_msgs.msg.TransformStamped`
"""
raise NotImplementedException()
- def lookup_transform_full(self, target_frame, target_time, source_frame, source_time, fixed_frame, timeout=Duration()):
+ def lookup_transform_full(
+ self,
+ target_frame: str,
+ target_time: Time,
+ source_frame: str,
+ source_time: Time,
+ fixed_frame: str,
+ timeout: Duration = Duration()
+ ) -> TransformStamped:
"""
Get the transform from the source frame to the target frame using the advanced API.
Must be implemented by a subclass of BufferInterface.
:param target_frame: Name of the frame to transform into.
- :param target_time: The time to transform to. (0 will get the latest)
+ :param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
- :param source_time: The time at which source_frame will be evaluated. (0 will get the latest)
+ :param source_time: The time at which source_frame will be evaluated (0 will get the latest).
:param fixed_frame: Name of the frame to consider constant in time.
- :param timeout: (Optional) Time to wait for the target frame to become available.
+ :param timeout: Time to wait for the target frame to become available.
:return: The transform between the frames.
- :rtype: :class:`geometry_msgs.msg.TransformStamped`
"""
- raise NotImplementedException()
+ raise NotImplementedException()
# can, simple api
- def can_transform(self, target_frame, source_frame, time, timeout=Duration()):
+ def can_transform(
+ self,
+ target_frame: str,
+ source_frame: str,
+ time: Time,
+ timeout: Duration = Duration()
+ ) -> bool:
"""
Check if a transform from the source frame to the target frame is possible.
@@ -140,33 +197,43 @@ def can_transform(self, target_frame, source_frame, time, timeout=Duration()):
:param target_frame: Name of the frame to transform into.
:param source_frame: Name of the input frame.
- :param time: The time at which to get the transform. (0 will get the latest)
- :param timeout: (Optional) Time to wait for the target frame to become available.
+ :param time: The time at which to get the transform (0 will get the latest).
+ :param timeout: Time to wait for the target frame to become available.
:return: True if the transform is possible, false otherwise.
- :rtype: bool
"""
- raise NotImplementedException()
-
+ raise NotImplementedException()
+
# can, advanced api
- def can_transform_full(self, target_frame, target_time, source_frame, source_time, fixed_frame, timeout=Duration()):
+ def can_transform_full(
+ self,
+ target_frame: str,
+ target_time: Time,
+ source_frame: str,
+ source_time: Time,
+ fixed_frame: str,
+ timeout: Duration = Duration()
+ ) -> bool:
"""
Check if a transform from the source frame to the target frame is possible (advanced API).
Must be implemented by a subclass of BufferInterface.
:param target_frame: Name of the frame to transform into.
- :param target_time: The time to transform to. (0 will get the latest)
+ :param target_time: The time to transform to (0 will get the latest).
:param source_frame: Name of the input frame.
- :param source_time: The time at which source_frame will be evaluated. (0 will get the latest)
+ :param source_time: The time at which source_frame will be evaluated (0 will get the latest).
:param fixed_frame: Name of the frame to consider constant in time.
- :param timeout: (Optional) Time to wait for the target frame to become available.
+ :param timeout: Time to wait for the target frame to become available.
:return: True if the transform is possible, false otherwise.
- :rtype: bool
"""
- raise NotImplementedException()
+ raise NotImplementedException()
-def Stamped(obj, stamp, frame_id):
+def Stamped(
+ obj: TransformableObject,
+ stamp: Time,
+ frame_id: str
+) -> TransformableObject:
obj.header = Header(frame_id=frame_id, stamp=stamp)
return obj
@@ -177,66 +244,97 @@ class TypeException(Exception):
Raised when an unexpected type is received while registering a transform
in :class:`tf2_ros.buffer_interface.BufferInterface`.
"""
- def __init__(self, errstr):
+ def __init__(self, errstr: str) -> None:
self.errstr = errstr
+
class NotImplementedException(Exception):
"""
Raised when can_transform or lookup_transform is not implemented in a
subclass of :class:`tf2_ros.buffer_interface.BufferInterface`.
"""
- def __init__(self):
+ def __init__(self) -> None:
self.errstr = 'CanTransform or LookupTransform not implemented'
class TransformRegistration():
__type_map = {}
-
- def print_me(self):
+
+ def print_me(self) -> None:
print(TransformRegistration.__type_map)
- def add(self, key, callback):
+ def add(
+ self,
+ key: TransformableObjectType,
+ callback: Callable[[TransformableObject, TransformStamped], TransformableObject]
+ ) -> None:
TransformRegistration.__type_map[key] = callback
- def get(self, key):
+ def get(
+ self,
+ key: TransformableObjectType
+ ) -> Callable[[TransformableObject, TransformStamped], TransformableObject]:
if not key in TransformRegistration.__type_map:
- raise TypeException('Type %s if not loaded or supported'% str(key))
+ raise TypeException('Type %s is not loaded or supported' % str(key))
else:
return TransformRegistration.__type_map[key]
+
class ConvertRegistration():
__to_msg_map = {}
__from_msg_map = {}
__convert_map = {}
-
- def add_from_msg(self, key, callback):
+
+ def add_from_msg(
+ self,
+ key: TransformableObjectType,
+ callback: Callable[[MsgStamped], TransformableObject]
+ ) -> None:
ConvertRegistration.__from_msg_map[key] = callback
- def add_to_msg(self, key, callback):
+ def add_to_msg(
+ self,
+ key: TransformableObjectType,
+ callback: Callable[[TransformableObject], MsgStamped]
+ ) -> None:
ConvertRegistration.__to_msg_map[key] = callback
- def add_convert(self, key, callback):
+ def add_convert(
+ self,
+ key: Tuple[TransformableObjectType, TransformableObjectType],
+ callback: Callable[[Any], TransformableObject]
+ ) -> None:
ConvertRegistration.__convert_map[key] = callback
- def get_from_msg(self, key):
+ def get_from_msg(
+ self,
+ key: TransformableObjectType
+ ) -> Callable[[MsgStamped], TransformableObject]:
if not key in ConvertRegistration.__from_msg_map:
- raise TypeException('Type %s if not loaded or supported'% str(key))
+ raise TypeException('Type %s is not loaded or supported' % str(key))
else:
return ConvertRegistration.__from_msg_map[key]
- def get_to_msg(self, key):
+ def get_to_msg(
+ self,
+ key: TransformableObjectType
+ ) -> Callable[[TransformableObject], MsgStamped]:
if not key in ConvertRegistration.__to_msg_map:
- raise TypeException('Type %s if not loaded or supported'%str(key))
+ raise TypeException('Type %s is not loaded or supported' % str(key))
else:
return ConvertRegistration.__to_msg_map[key]
- def get_convert(self, key):
+ def get_convert(
+ self,
+ key: Tuple[TransformableObjectType, TransformableObjectType]
+ ) -> Callable[[Any], TransformableObject]:
if not key in ConvertRegistration.__convert_map:
- raise TypeException("Type %s if not loaded or supported" % str(key))
+ raise TypeException("Type %s is not loaded or supported" % str(key))
else:
return ConvertRegistration.__convert_map[key]
-def convert(a, b_type):
+
+def convert(a: TransformableObject, b_type: TransformableObjectType) -> TransformableObject:
c = ConvertRegistration()
#check if an efficient conversion function between the types exists
try:
diff --git a/tf2_ros_py/tf2_ros/static_transform_broadcaster.py b/tf2_ros_py/tf2_ros/static_transform_broadcaster.py
index 457359e36..ed179cddb 100644
--- a/tf2_ros_py/tf2_ros/static_transform_broadcaster.py
+++ b/tf2_ros_py/tf2_ros/static_transform_broadcaster.py
@@ -29,7 +29,11 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
+from typing import Optional
+from typing import Union
+from typing import List
+from rclpy.node import Node
from rclpy.qos import DurabilityPolicy
from rclpy.qos import HistoryPolicy
from rclpy.qos import QoSProfile
@@ -42,7 +46,7 @@ class StaticTransformBroadcaster:
:class:`StaticTransformBroadcaster` is a convenient way to send static transformation on the ``"/tf_static"`` message topic.
"""
- def __init__(self, node, qos=None):
+ def __init__(self, node: Node, qos: Optional[Union[QoSProfile, int]] = None) -> None:
"""
Constructor.
@@ -57,12 +61,10 @@ def __init__(self, node, qos=None):
)
self.pub_tf = node.create_publisher(TFMessage, "/tf_static", qos)
- def sendTransform(self, transform):
+ def sendTransform(self, transform: Union[TransformStamped, List[TransformStamped]]) -> None:
if not isinstance(transform, list):
if hasattr(transform, '__iter__'):
transform = list(transform)
else:
transform = [transform]
self.pub_tf.publish(TFMessage(transforms=transform))
-
-
diff --git a/tf2_ros_py/tf2_ros/transform_broadcaster.py b/tf2_ros_py/tf2_ros/transform_broadcaster.py
index dfaf136a5..44f36f0a0 100644
--- a/tf2_ros_py/tf2_ros/transform_broadcaster.py
+++ b/tf2_ros_py/tf2_ros/transform_broadcaster.py
@@ -29,7 +29,11 @@
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
+from typing import Optional
+from typing import Union
+from typing import List
+from rclpy.node import Node
from rclpy.qos import QoSProfile
from tf2_msgs.msg import TFMessage
from geometry_msgs.msg import TransformStamped
@@ -39,7 +43,7 @@ class TransformBroadcaster:
"""
:class:`TransformBroadcaster` is a convenient way to send transformation updates on the ``"/tf"`` message topic.
"""
- def __init__(self, node, qos=QoSProfile(depth=100)):
+ def __init__(self, node: Node, qos: Union[QoSProfile, int] = QoSProfile(depth=100)) -> None:
"""
.. function:: __init__(node, qos=QoSProfile(depth=100))
@@ -50,7 +54,7 @@ def __init__(self, node, qos=QoSProfile(depth=100)):
"""
self.pub_tf = node.create_publisher(TFMessage, "/tf", qos)
- def sendTransform(self, transform):
+ def sendTransform(self, transform: Union[TransformStamped, List[TransformStamped]]) -> None:
"""
Send a transform, or a list of transforms, to the Buffer associated with this TransformBroadcaster.
@@ -62,5 +66,3 @@ def sendTransform(self, transform):
else:
transform = [transform]
self.pub_tf.publish(TFMessage(transforms=transform))
-
-
diff --git a/tf2_ros_py/tf2_ros/transform_listener.py b/tf2_ros_py/tf2_ros/transform_listener.py
index c7bbb15a8..e4c69b970 100644
--- a/tf2_ros_py/tf2_ros/transform_listener.py
+++ b/tf2_ros_py/tf2_ros/transform_listener.py
@@ -1,9 +1,9 @@
# Copyright (c) 2008, Willow Garage, Inc.
# All rights reserved.
-#
+#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
-#
+#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
@@ -12,7 +12,7 @@
# * Neither the name of the Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
-#
+#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
@@ -26,23 +26,36 @@
# POSSIBILITY OF SUCH DAMAGE.
# author: Wim Meeussen
+from typing import Optional
+from typing import Union
+from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import SingleThreadedExecutor
from rclpy.qos import DurabilityPolicy
from rclpy.qos import HistoryPolicy
from rclpy.qos import QoSProfile
import tf2_ros
+from tf2_ros.buffer import Buffer
from tf2_msgs.msg import TFMessage
from threading import Thread
+
class TransformListener:
"""
:class:`TransformListener` is a convenient way to listen for coordinate frame transformation info.
This class takes an object that instantiates the :class:`BufferInterface` interface, to which
it propagates changes to the tf frame graph.
"""
- def __init__(self, buffer, node, *, spin_thread=False, qos=None, static_qos=None):
+ def __init__(
+ self,
+ buffer: Buffer,
+ node: Node,
+ *,
+ spin_thread: bool = False,
+ qos: Optional[Union[QoSProfile, int]] = None,
+ static_qos: Optional[Union[QoSProfile, int]] = None
+ ) -> None:
"""
Constructor.
@@ -85,26 +98,26 @@ def run_func():
self.dedicated_listener_thread = Thread(target=run_func)
self.dedicated_listener_thread.start()
- def __del__(self):
+ def __del__(self) -> None:
if hasattr(self, 'dedicated_listener_thread') and hasattr(self, 'executor'):
self.executor.shutdown()
self.dedicated_listener_thread.join()
self.unregister()
- def unregister(self):
+ def unregister(self) -> None:
"""
Unregisters all tf subscribers.
"""
self.node.destroy_subscription(self.tf_sub)
self.node.destroy_subscription(self.tf_static_sub)
- def callback(self, data):
+ def callback(self, data: TFMessage) -> None:
who = 'default_authority'
for transform in data.transforms:
self.buffer.set_transform(transform, who)
- def static_callback(self, data):
+ def static_callback(self, data: TFMessage) -> None:
who = 'default_authority'
for transform in data.transforms:
self.buffer.set_transform_static(transform, who)