diff --git a/agimus_controller/agimus_controller/factory/robot_model.py b/agimus_controller/agimus_controller/factory/robot_model.py index c8be7d6e..efae318a 100644 --- a/agimus_controller/agimus_controller/factory/robot_model.py +++ b/agimus_controller/agimus_controller/factory/robot_model.py @@ -1,96 +1,208 @@ +from dataclasses import dataclass, field from copy import deepcopy -from dataclasses import dataclass -import numpy as np +from dataclasses import dataclass, field from pathlib import Path + +import coal +import numpy as np +import numpy.typing as npt import pinocchio as pin -from typing import Union @dataclass class RobotModelParameters: - q0_name = str() - free_flyer = False - locked_joint_names = [] - urdf = Path() | str - srdf = Path() | str - collision_as_capsule = False - self_collision = False - - -class RobotModelFactory: + q0: npt.NDArray[np.float64] = np.array( + [], dtype=np.float64 + ) # Initial configuration of the robot + free_flyer: bool = False # True if the robot has a free flyer + locked_joint_names: list[str] = field(default_factory=list) + urdf_path: Path = Path() # Path to the URDF file + srdf_path: Path | None = None # Path to the SRDF file + urdf_meshes_dir: Path | None = ( + Path() # Path to the directory containing the meshes and the URDF file. + ) + collision_as_capsule: bool = ( + False # True if the collision model should be reduced to capsules. + ) + # By default, the collision model when convexified is a sum of spheres and cylinders, often representing capsules. Here, all the couples sphere cylinder sphere are replaced by coal capsules. + self_collision: bool = False # If True, the collision model takes into account collisions pairs written in the srdf file. + armature: npt.NDArray[np.float64] = field( + default_factory=lambda: np.array([], dtype=np.float64) + ) # Default empty NumPy array + collision_color: npt.NDArray[np.float64] = ( + np.array([249.0, 136.0, 126.0, 125.0]) / 255.0 + ) # Red color for the collision model + + def __post_init__(self): + # Check q0 is not empty + if len(self.q0) == 0: + raise ValueError("q0 cannot be empty.") + + # Handle armature: + if self.armature.size == 0: + # Use a default armature filled with 0s, based on the size of q0 + self.armature = np.zeros(len(self.q0), dtype=np.float64) + + # Ensure armature has the same shape as q0 + if ( + self.armature.shape != self.q0.shape and not self.free_flyer + ): #! TODO: Do the same for free flyer + raise ValueError( + f"Armature must have the same shape as q0. Got {self.armature.shape} and {self.q0.shape}." + ) + + # Ensure paths are valid strings + if not self.urdf_path.is_file(): + raise ValueError("urdf_path must be a valid file path.") + + if self.srdf_path is not None and not self.srdf_path.is_file(): + raise ValueError("srdf_path must be a valid file path.") + + +class RobotModels: """Parse the robot model, reduce it and filter the collision model.""" - """Complete model of the robot.""" - _complete_model = pin.Model() - """ Complete model of the robot with collision meshes. """ - _complete_collision_model = pin.GeometryModel() - """ Complete model of the robot with visualization meshes. """ - _complete_visual_model = pin.GeometryModel() - """ Reduced model of the robot. """ - _rmodel = pin.Model() - """ Reduced model of the robot with visualization meshes. """ - _rcmodel = pin.GeometryModel() - """ Reduced model of the robot with collision meshes. """ - _rvmodel = pin.GeometryModel() - """ Default configuration q0. """ - _q0 = np.array([]) - """ Parameters of the model. """ - _params = RobotModelParameters() - """ Path to the collisions environment. """ - _env = Path() - - def load_model(self, param: RobotModelParameters, env: Union[Path, None]) -> None: + def __init__(self, param: RobotModelParameters): + """Parse the robot model, reduce it and filter the collision model. + + Args: + param (RobotModelParameters): Parameters to load the robot models. + """ self._params = param - self._env = env - self._load_pinocchio_models(param.urdf, param.free_flyer) - self._load_default_configuration(param.srdf, param.q0_name) - self._load_reduced_model(param.locked_joint_names, param.q0_name) - self._update_collision_model( - env, param.collision_as_capsule, param.self_collision, param.srdf + self._full_robot_model = None + self._robot_model = None + self._collision_model = None + self._visual_model = None + self._q0 = deepcopy(self._params.q0) + self.load_models() # Populate models + + @property + def full_robot_model(self) -> pin.Model: + """Full robot model.""" + if self._full_robot_model is None: + raise AttributeError("Full robot model has not been initialized yet.") + return self._full_robot_model + + @property + def robot_model(self) -> pin.Model: + """Robot model, reduced if specified in the parameters.""" + if self._robot_model is None: + raise AttributeError("Robot model has not been computed yet.") + return self._robot_model + + @property + def visual_model(self) -> pin.GeometryModel: + """Visual model of the robot.""" + if self._visual_model is None: + raise AttributeError("Visual model has not been computed yet.") + return self._visual_model + + @property + def collision_model(self) -> pin.GeometryModel: + """Collision model of the robot.""" + if self._collision_model is None: + raise AttributeError("Visual model has not been computed yet.") + return self._collision_model + + @property + def q0(self) -> np.array: + """Initial configuration of the robot.""" + return self._q0 + + def load_models(self) -> None: + """Load and prepare robot models based on parameters.""" + self._load_full_pinocchio_models() + if self._params.locked_joint_names: + self._apply_locked_joints() + if self._params.collision_as_capsule: + self._update_collision_model_to_capsules() + if self._params.self_collision: + self._update_collision_model_to_self_collision() + + def _load_full_pinocchio_models(self) -> None: + """Load the full robot model, the visual model and the collision model.""" + try: + ( + self._full_robot_model, + self._collision_model, + self._visual_model, + ) = pin.buildModelsFromUrdf( + str(self._params.urdf_path), + str(self._params.urdf_meshes_dir), + pin.JointModelFreeFlyer() if self._params.free_flyer else None, + ) + except Exception as e: + raise ValueError( + f"Failed to load URDF models from {self._params.urdf_path}: {e}" + ) + + def _apply_locked_joints(self) -> None: + """Apply locked joints.""" + joints_to_lock = [] + for jn in self._params.locked_joint_names: + if self._full_robot_model.existJointName(jn): + joints_to_lock.append(self._full_robot_model.getJointId(jn)) + else: + raise ValueError(f"Joint {jn} not found in the robot model.") + + self._robot_model = pin.buildReducedModel( + self._full_robot_model, joints_to_lock, self._q0 ) - def _load_pinocchio_models(self, urdf: Path, free_flyer: bool) -> None: - pass - - def _load_default_configuration(self, srdf_path: Path, q0_name: str) -> None: - pass - - def _load_reduced_model(self, locked_joint_names, q0_name) -> None: - pass - - def _update_collision_model( - self, - env: Union[Path, None], - collision_as_capsule: bool, - self_collision: bool, - srdf: Path, - ) -> None: - pass - - def create_complete_robot_model(self) -> pin.Model: - return self._complete_model.copy() - - def create_complete_collision_model(self) -> pin.GeometryModel: - return self._complete_collision_model.copy() - - def create_complete_visual_model(self) -> pin.GeometryModel: - return self._complete_visual_model.copy() - - def create_reduced_robot_model(self) -> pin.Model: - return self._rmodel.copy() - - def create_reduced_collision_model(self) -> pin.GeometryModel: - return self._rcmodel.copy() - - def create_reduced_visual_model(self) -> pin.GeometryModel: - return self._rvmodel.copy() - - def create_default_configuration(self) -> np.array: - return self._q0.copy() - - def create_model_parameters(self) -> RobotModelParameters: - return deepcopy(self._params) + def _update_collision_model_to_capsules(self) -> None: + """Update the collision model to capsules.""" + cmodel = self._collision_model.copy() + list_names_capsules = [] + # Iterate through geometry objects in the collision model + for geom_object in cmodel.geometryObjects: + geometry = geom_object.geometry + # Remove superfluous suffix from the name + base_name = "_".join(geom_object.name.split("_")[:-1]) + # Convert cylinders to capsules + if isinstance(geometry, coal.Cylinder): + name = self._generate_capsule_name(base_name, list_names_capsules) + list_names_capsules.append(name) + capsule = pin.GeometryObject( + name=name, + parent_frame=geom_object.parentFrame, + parent_joint=geom_object.parentJoint, + collision_geometry=coal.Capsule( + geometry.radius, geometry.halfLength + ), + placement=geom_object.placement, + ) + capsule.meshColor = self._params.collision_color + self._collision_model.addGeometryObject(capsule) + self._collision_model.removeGeometryObject(geom_object.name) + + # Remove spheres associated with links + elif isinstance(geometry, coal.Sphere) and "link" in geom_object.name: + self._collision_model.removeGeometryObject(geom_object.name) + + def _update_collision_model_to_self_collision(self) -> None: + """Update the collision model to self collision.""" + self._collision_model.addAllCollisionPairs() + pin.removeCollisionPairs( + self._robot_model, self._collision_model, str(self._params.srdf_path) + ) - def print_model(self): - print("full model =\n", self._complete_model) - print("reduced model =\n", self._rmodel) + def _generate_capsule_name(self, base_name: str, existing_names: list[str]) -> str: + """Generates a unique capsule name for a geometry object. + Args: + base_name (str): The base name of the geometry object. + existing_names (list): List of names already assigned to capsules. + Returns: + str: Unique capsule name. + """ + i = 0 + while f"{base_name}_capsule_{i}" in existing_names: + i += 1 + return f"{base_name}_capsule_{i}" + + @property + def armature(self) -> npt.NDArray[np.float64]: + """Armature of the robot. + Returns: + npt.NDArray[np.float64]: Armature of the robot. + """ + return self._params.armature diff --git a/agimus_controller/agimus_controller/mpc_data.py b/agimus_controller/agimus_controller/mpc_data.py index a01e0a94..a8ae345f 100644 --- a/agimus_controller/agimus_controller/mpc_data.py +++ b/agimus_controller/agimus_controller/mpc_data.py @@ -16,14 +16,13 @@ class OCPResults: @dataclass class OCPDebugData: - # Solver infos - problem_solved: bool = False - # Debug data result: list[TrajectoryPoint] references: list[TrajectoryPoint] kkt_norms: list[np.float64] collision_distance_residuals: list[dict[np.float64]] + # Solver infos + problem_solved: bool = False @dataclass diff --git a/agimus_controller/agimus_controller/ocp_base.py b/agimus_controller/agimus_controller/ocp_base.py index a0475b46..19f9a77c 100644 --- a/agimus_controller/agimus_controller/ocp_base.py +++ b/agimus_controller/agimus_controller/ocp_base.py @@ -1,11 +1,17 @@ from abc import ABC, abstractmethod + import numpy as np +import numpy.typing as npt from agimus_controller.mpc_data import OCPResults, OCPDebugData from agimus_controller.trajectory import WeightedTrajectoryPoint class OCPBase(ABC): + """Base class for the Optimal Control Problem (OCP) solver. + + This class defines the interface for the OCP solver.""" + def __init__(self) -> None: pass @@ -13,30 +19,72 @@ def __init__(self) -> None: def set_reference_horizon( self, reference_trajectory: list[WeightedTrajectoryPoint] ) -> None: - ... + """Set the reference trajectory and the weights of the costs for the OCP solver. This method should be implemented by the derived class.""" + pass - @abstractmethod @property + @abstractmethod def horizon_size() -> int: - ... + """Returns the horizon size of the OCP. + + Returns: + int: size of the horizon. + """ + pass - @abstractmethod @property - def dt() -> int: - ... + @abstractmethod + def dt() -> float: + """Returns the time step of the OCP in seconds. + + Returns: + int: time step of the OCP. + """ + pass @abstractmethod def solve( - self, x0: np.ndarray, x_init: list[np.ndarray], u_init: list[np.ndarray] + self, + x0: npt.NDArray[np.float64], + x_warmstart: list[npt.NDArray[np.float64]], + u_warmstart: list[npt.NDArray[np.float64]], ) -> None: - ... + """Solver for the OCP. This method should be implemented by the derived class. + The method should solve the OCP for the given initial state and warmstart values. + + Args: + x0 (npt.NDArray[np.float64]): current state of the robot. + x_warmstart (list[npt.NDArray[np.float64]]): Warmstart values for the state. This doesn't include the current state. + u_warmstart (list[npt.NDArray[np.float64]]): Warmstart values for the control inputs. + """ + pass - @abstractmethod @property + @abstractmethod def ocp_results(self) -> OCPResults: - ... + """Returns the results of the OCP solver. + The solve method should be called before calling this method. + + Returns: + OCPResults: Class containing the results of the OCP solver. + """ + pass + + @ocp_results.setter + def ocp_results(self, value: OCPResults) -> None: + """Set the output data structure of the OCP. + + Args: + value (OCPResults): New output data structure of the OCP. + """ + pass - @abstractmethod @property + @abstractmethod def debug_data(self) -> OCPDebugData: - ... + """Returns the debug data of the OCP solver. + + Returns: + OCPDebugData: Class containing the debug data of the OCP solver. + """ + pass diff --git a/agimus_controller/agimus_controller/ocp_base_croco.py b/agimus_controller/agimus_controller/ocp_base_croco.py new file mode 100644 index 00000000..f18bece0 --- /dev/null +++ b/agimus_controller/agimus_controller/ocp_base_croco.py @@ -0,0 +1,144 @@ +from abc import abstractmethod + +import crocoddyl +import mim_solvers +import numpy as np +import numpy.typing as npt + +from agimus_controller.factory.robot_model import RobotModels +from agimus_controller.mpc_data import OCPResults, OCPDebugData +from agimus_controller.ocp_base import OCPBase +from agimus_controller.ocp_param_base import OCPParamsBaseCroco +from agimus_controller.trajectory import TrajectoryPointWeights + + +class OCPBaseCroco(OCPBase): + def __init__( + self, + robot_model: RobotModels, + ocp_params: OCPParamsBaseCroco, + ) -> None: + """Defines common behavior for all OCP using croccodyl. This is an abstract class with some helpers to design OCPs in a more friendly way. + + Args: + robot_model (RobotModelFactory): All models of the robot. + ocp_params (OCPParamsBaseCroco): Input data structure of the OCP. + """ + # Setting the robot model + self._robot_model = robot_model + self._rmodel = self._robot_model.robot_model + self._cmodel = self._robot_model.collision_model + self._armature = self._robot_model._params.armature + + # Stat and actuation model + self._state = crocoddyl.StateMultibody(self._rmodel) + self._actuation = crocoddyl.ActuationModelFull(self._state) + + # Setting the OCP parameters + self._ocp_params = ocp_params + self._ocp = None + self._ocp_results = None + + @property + def horizon_size(self) -> int: + """Number of time steps in the horizon.""" + return self._ocp_params.horizon_size + + @property + def dt(self) -> float: + """Integration step of the OCP.""" + return self._ocp_params.dt + + @abstractmethod + def create_running_model_list(self) -> list[crocoddyl.ActionModelAbstract]: + """Create the list of running models.""" + pass + + @abstractmethod + def create_terminal_model(self) -> crocoddyl.ActionModelAbstract: + """Create the terminal model.""" + pass + + @abstractmethod + def update_crocoddyl_problem( + self, + x0: npt.NDArray[np.float64], + weighted_trajectory_points: list[TrajectoryPointWeights], + ) -> None: + """Update the Crocoddyl problem's references, weights and x0.""" + pass + + def solve( + self, + x0: npt.NDArray[np.float64], + x_warmstart: list[npt.NDArray[np.float64]], + u_warmstart: list[npt.NDArray[np.float64]], + ) -> None: + """Solves the OCP. + The results can be accessed through the ocp_results property. + + Args: + x0 (npt.NDArray[np.float64]): Current state of the robot. + x_warmstart (list[npt.NDArray[np.float64]]): Predicted states for the OCP. + u_warmstart (list[npt.NDArray[np.float64]]): Predicted control inputs for the OCP. + """ + ### Creation of the state and actuation models + + if self._ocp is None: + # Create the running models + self.running_model_list = self.create_running_model_list() + # Create the terminal model + self.terminal_model = self.create_terminal_model() + # Create the shooting problem + self._problem = crocoddyl.ShootingProblem( + x0, self.running_model_list, self.terminal_model + ) + # Create solver + callbacks + self._ocp = mim_solvers.SolverCSQP(self._problem) + + # Merit function + self._ocp.use_filter_line_search = self._ocp_params.use_filter_line_search + + # Parameters of the solver + self._ocp.termination_tolerance = self._ocp_params.termination_tolerance + self._ocp.max_qp_iters = self._ocp_params.qp_iters + self._ocp.eps_abs = self._ocp_params.eps_abs + self._ocp.eps_rel = self._ocp_params.eps_rel + self._ocp.with_callbacks = self._ocp_params.callbacks + + # Creating the warmstart lists for the solver + # Solve the OCP + self._ocp.solve([x0] + x_warmstart, u_warmstart, self._ocp_params.solver_iters) + + # Store the results + self.ocp_results = OCPResults( + states=self._ocp.xs, + ricatti_gains=self._ocp.K, + feed_forward_terms=self._ocp.us, + ) + + @property + def ocp_results(self) -> OCPResults: + """Output data structure of the OCP. + + Returns: + OCPResults: Output data structure of the OCP. It contains the states, Ricatti gains, and feed-forward terms. + """ + return self._ocp_results + + @ocp_results.setter + def ocp_results(self, value: OCPResults) -> None: + """Set the output data structure of the OCP. + + Args: + value (OCPResults): New output data structure of the OCP. + """ + self._ocp_results = value + + @property + def debug_data(self) -> OCPDebugData: + pass + + @debug_data.setter + def debug_data(self, value: OCPDebugData) -> None: + pass diff --git a/agimus_controller/agimus_controller/ocp_param_base.py b/agimus_controller/agimus_controller/ocp_param_base.py new file mode 100644 index 00000000..040d2691 --- /dev/null +++ b/agimus_controller/agimus_controller/ocp_param_base.py @@ -0,0 +1,21 @@ +from dataclasses import dataclass + +import numpy as np + + +@dataclass +class OCPParamsBaseCroco: + """Input data structure of the OCP.""" + + # Data relevant to solve the OCP + dt: float # Integration step of the OCP + horizon_size: int # Number of time steps in the horizon + solver_iters: int # Number of solver iterations + qp_iters: int = 200 # Number of QP iterations (must be a multiple of 25). + termination_tolerance: float = ( + 1e-3 # Termination tolerance (norm of the KKT conditions) + ) + eps_abs: float = 1e-6 # Absolute tolerance of the solver + eps_rel: float = 0.0 # Relative tolerance of the solver + callbacks: bool = False # Flag to enable/disable callbacks + use_filter_line_search = False # Flag to enable/disable the filter line searchs diff --git a/agimus_controller/tests/ressources/simple_ocp_croco_results.npy b/agimus_controller/tests/ressources/simple_ocp_croco_results.npy new file mode 100644 index 00000000..1f976f0b Binary files /dev/null and b/agimus_controller/tests/ressources/simple_ocp_croco_results.npy differ diff --git a/agimus_controller/tests/test_ocp_croco_base.py b/agimus_controller/tests/test_ocp_croco_base.py new file mode 100644 index 00000000..9fa81a96 --- /dev/null +++ b/agimus_controller/tests/test_ocp_croco_base.py @@ -0,0 +1,250 @@ +from os.path import dirname +import unittest +from unittest.mock import MagicMock +from pathlib import Path +import crocoddyl +import example_robot_data as robex +import numpy as np +import pinocchio as pin + +from agimus_controller.ocp_base_croco import OCPBaseCroco +from agimus_controller.ocp_param_base import OCPParamsBaseCroco +from agimus_controller.factory.robot_model import RobotModels, RobotModelParameters + + +class TestOCPBaseCroco(unittest.TestCase): + @classmethod + def setUpClass(self): + # Mock the RobotModelFactory and OCPParamsCrocoBase + robot = robex.load("panda") + urdf_path = Path(robot.urdf) + srdf_path = Path(robot.urdf.replace("urdf", "srdf")) + urdf_meshes_dir = urdf_path.parent.parent.parent.parent.parent + free_flyer = False + q0 = np.zeros(robot.model.nq) + armature = np.full(robot.model.nq, 0.1) + + # Store shared initial parameters + self.params = RobotModelParameters( + q0=q0, + free_flyer=free_flyer, + locked_joint_names=["panda_joint1", "panda_joint2"], + urdf_path=urdf_path, + srdf_path=srdf_path, + urdf_meshes_dir=urdf_meshes_dir, + collision_as_capsule=True, + self_collision=True, + armature=armature, + ) + + self.robot_models = RobotModels(self.params) + self.robot_model = self.robot_models.robot_model + self.collision_model = self.robot_models.collision_model + + self.ocp_params = OCPParamsBaseCroco( + dt=0.1, horizon_size=10, solver_iters=100, callbacks=True + ) + + # Create a concrete implementation of OCPBaseCroco + class TestOCPCroco(OCPBaseCroco): + def create_running_model_list(self): + return None + + def create_terminal_model(self): + return None + + def update_crocoddyl_problem(self, x0, trajectory_points_list): + return None + + def set_reference_horizon(self, horizon_size): + return None + + self.ocp = TestOCPCroco(self.robot_models, self.ocp_params) + + def test_horizon_size(self): + """Test the horizon_size property.""" + self.assertEqual(self.ocp.horizon_size, self.ocp_params.horizon_size) + + def test_dt(self): + """Test the dt property.""" + self.assertAlmostEqual(self.ocp.dt, self.ocp_params.dt) + + +class TestSimpleOCPCroco(unittest.TestCase): + class TestOCPCroco(OCPBaseCroco): + def create_running_model_list(self): + # Running cost model + running_cost_model = crocoddyl.CostModelSum(self._state) + + ### Creation of cost terms + # State Regularization cost + x_residual = crocoddyl.ResidualModelState( + self._state, + np.concatenate((pin.neutral(self._rmodel), np.zeros(self._rmodel.nv))), + ) + x_reg_cost = crocoddyl.CostModelResidual(self._state, x_residual) + + # Control Regularization cost + u_residual = crocoddyl.ResidualModelControl(self._state) + u_reg_cost = crocoddyl.CostModelResidual(self._state, u_residual) + + # End effector frame cost + frame_placement_residual = crocoddyl.ResidualModelFramePlacement( + self._state, + self._rmodel.getFrameId("panda_hand_tcp"), + pin.SE3(np.eye(3), np.array([1.0, 1.0, 1.0])), + ) + + goal_tracking_cost = crocoddyl.CostModelResidual( + self._state, frame_placement_residual + ) + running_cost_model.addCost("stateReg", x_reg_cost, 0.1) + running_cost_model.addCost("ctrlRegGrav", u_reg_cost, 0.0001) + running_cost_model.addCost("gripperPoseRM", goal_tracking_cost, 1.0) + # Create Differential Action Model (DAM), i.e. continuous dynamics and cost functions + running_DAM = crocoddyl.DifferentialActionModelFreeFwdDynamics( + self._state, + self._actuation, + running_cost_model, + ) + running_model = crocoddyl.IntegratedActionModelEuler( + running_DAM, + ) + running_model.differential.armature = self._robot_model.armature + + running_model_list = [running_model] * (self._ocp_params.horizon_size - 1) + return running_model_list + + def create_terminal_model(self): + # Terminal cost models + terminal_cost_model = crocoddyl.CostModelSum(self._state) + + ### Creation of cost terms + # State Regularization cost + x_residual = crocoddyl.ResidualModelState( + self._state, + np.concatenate((pin.neutral(self._rmodel), np.zeros(self._rmodel.nv))), + ) + x_reg_cost = crocoddyl.CostModelResidual(self._state, x_residual) + + # End effector frame cost + frame_placement_residual = crocoddyl.ResidualModelFramePlacement( + self._state, + self._rmodel.getFrameId("panda_hand_tcp"), + pin.SE3(np.eye(3), np.array([1.0, 1.0, 1.0])), + ) + + goal_tracking_cost = crocoddyl.CostModelResidual( + self._state, frame_placement_residual + ) + terminal_cost_model.addCost("stateReg", x_reg_cost, 0.1) + terminal_cost_model.addCost("gripperPose", goal_tracking_cost, 50) + + # Create Differential Action Model (DAM), i.e. continuous dynamics and cost functions + terminal_DAM = crocoddyl.DifferentialActionModelFreeFwdDynamics( + self._state, + self._actuation, + terminal_cost_model, + ) + + terminal_model = crocoddyl.IntegratedActionModelEuler(terminal_DAM, 0.0) + terminal_model.differential.armature = self._robot_model.armature + + return terminal_model + + def set_reference_horizon(self, horizon_size): + ### Not implemented in this OCP example. + return None + + def update_crocoddyl_problem(self, x0, trajectory_points_list): + ### Not implemented in this OCP example. + return None + + @classmethod + def setUpClass(self): + # Mock the RobotModelFactory and OCPParamsCrocoBase + robot = robex.load("panda") + urdf_path = Path(robot.urdf) + srdf_path = Path(robot.urdf.replace("urdf", "srdf")) + urdf_meshes_dir = urdf_path.parent.parent.parent.parent.parent + free_flyer = False + q0 = np.zeros(robot.model.nq) + + # Store shared initial parameters + self.params = RobotModelParameters( + q0=q0, + free_flyer=free_flyer, + locked_joint_names=["panda_joint1", "panda_joint2"], + urdf_path=urdf_path, + srdf_path=srdf_path, + urdf_meshes_dir=urdf_meshes_dir, + collision_as_capsule=True, + self_collision=True, + armature=np.linspace(0.1, 0.9, robot.model.nq), + ) + + self.robot_models = RobotModels(self.params) + self.robot_model = self.robot_models.robot_model + self.collision_model = self.robot_models.collision_model + + # Set mock parameters + self.ocp_params = OCPParamsBaseCroco( + dt=0.1, horizon_size=10, solver_iters=100, callbacks=True + ) + self.state_reg = np.concatenate( + (pin.neutral(self.robot_model), np.zeros(self.robot_model.nv)) + ) + self.state_warmstart = [np.zeros(self.robot_model.nq + self.robot_model.nv)] * ( + self.ocp_params.horizon_size - 1 + ) # The first state is the current state + self.control_warmstart = [np.zeros(self.robot_model.nq)] * ( + self.ocp_params.horizon_size - 1 + ) + # Create a concrete implementation of OCPBaseCroco + self.ocp = self.TestOCPCroco(self.robot_models, self.ocp_params) + self.ocp.solve(self.state_reg, self.state_warmstart, self.control_warmstart) + # self.save_results() + + def save_results(self): + results = np.array( + [ + self.ocp.ocp_results.states.tolist(), + self.ocp.ocp_results.ricatti_gains.tolist(), + self.ocp.ocp_results.feed_forward_terms.tolist(), + ], + dtype=object, # Ensure the array is dtype=object before saving + ) + np.save( + "ressources/simple_ocp_croco_results.npy", + results, + ) + + def test_check_results(self): + results = np.load("ressources/simple_ocp_croco_results.npy", allow_pickle=True) + # Checking the states + for iter, state in enumerate(results[0]): + np.testing.assert_array_almost_equal( + state, + self.ocp.ocp_results.states.tolist()[iter], + err_msg="States are not equal", + ) + + # Checking the ricatti gains + for iter, gain in enumerate(results[1]): + np.testing.assert_array_almost_equal( + gain, + self.ocp.ocp_results.ricatti_gains.tolist()[iter], + err_msg="Ricatti gains are not equal", + ) + + # Checking the feed forward terms + for iter, term in enumerate(results[2]): + np.testing.assert_array_almost_equal( + term, + self.ocp.ocp_results.feed_forward_terms.tolist()[iter], + err_msg="Feed forward term are not equal", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/agimus_controller/tests/test_ocp_param_base.py b/agimus_controller/tests/test_ocp_param_base.py new file mode 100644 index 00000000..0855f7b4 --- /dev/null +++ b/agimus_controller/tests/test_ocp_param_base.py @@ -0,0 +1,39 @@ +import unittest + +from agimus_controller.ocp_param_base import OCPParamsBaseCroco + + +class TestOCPParamsCrocoBase(unittest.TestCase): + """ + TestOCPParamsCrocoBase unittests parameters settters and getters of OCPParamsBaseCroco class. + """ + + def __init__(self, methodName="runTest"): + super().__init__(methodName) + + def test_initialization(self): + """ + Test the initialization of the OCPParamsBaseCroco class. + """ + params = { + "dt": 0.01, + "horizon_size": 100, + "solver_iters": 50, + "qp_iters": 200, + "termination_tolerance": 1e-3, + "eps_abs": 1e-6, + "eps_rel": 0, + "callbacks": False, + } + ocp_param_base_croco = OCPParamsBaseCroco(**params) + for key, val in params.items(): + res = getattr(ocp_param_base_croco, key) + self.assertEqual( + res, + val, + f"Value missmatch for parameter '{key}'. Expected: '{val}', got: '{res}'", + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/agimus_controller/tests/test_robot_models.py b/agimus_controller/tests/test_robot_models.py new file mode 100644 index 00000000..c8ab1813 --- /dev/null +++ b/agimus_controller/tests/test_robot_models.py @@ -0,0 +1,170 @@ +from copy import deepcopy +from os.path import dirname +import unittest +from pathlib import Path +import example_robot_data as robex +import numpy as np +import pinocchio as pin + +from agimus_controller.factory.robot_model import RobotModelParameters, RobotModels + + +class TestRobotModelParameters(unittest.TestCase): + def test_valid_initialization(self): + """Test that the dataclass initializes correctly with valid input.""" + + robot = robex.load("panda") + urdf_path = Path(robot.urdf) + srdf_path = Path(robot.urdf.replace("urdf", "srdf")) + urdf_meshes_dir = urdf_path.parent.parent.parent.parent.parent + free_flyer = False + q0 = np.zeros(robot.model.nq) + locked_joint_names = ["panda_joint1", "panda_joint2"] + params = RobotModelParameters( + q0=q0, + free_flyer=free_flyer, + locked_joint_names=locked_joint_names, + urdf_path=urdf_path, + srdf_path=srdf_path, + urdf_meshes_dir=urdf_meshes_dir, + collision_as_capsule=True, + self_collision=True, + armature=np.linspace(0.1, 0.9, 9), + ) + + self.assertTrue(np.array_equal(params.q0, q0)) + self.assertEqual(params.free_flyer, free_flyer) + self.assertEqual(params.locked_joint_names, ["panda_joint1", "panda_joint2"]) + self.assertEqual(params.urdf_path, urdf_path) + self.assertEqual(params.srdf_path, srdf_path) + self.assertEqual(params.urdf_meshes_dir, urdf_meshes_dir) + self.assertTrue(params.collision_as_capsule) + self.assertTrue(params.self_collision) + self.assertTrue(np.array_equal(params.armature, np.linspace(0.1, 0.9, 9))) + + def test_empty_q0_raises_error(self): + """Test that an empty q0 raises a ValueError.""" + with self.assertRaises(ValueError): + RobotModelParameters(q0=np.array([])) + + def test_armature_default_value(self): + """Test that the armature is set to default if not provided.""" + robot = robex.load("panda") + urdf_path = Path(robot.urdf) + srdf_path = Path(robot.urdf.replace("urdf", "srdf")) + urdf_meshes_dir = urdf_path.parent.parent.parent.parent.parent + locked_joint_names = ["panda_joint1", "panda_joint2"] + free_flyer = False + q0 = np.zeros(robot.model.nq) + params = RobotModelParameters( + q0=q0, + free_flyer=free_flyer, + locked_joint_names=locked_joint_names, + urdf_path=urdf_path, + srdf_path=srdf_path, + urdf_meshes_dir=urdf_meshes_dir, + collision_as_capsule=True, + self_collision=True, + ) + self.assertTrue(np.array_equal(params.armature, np.zeros_like(q0))) + + def test_armature_mismatched_shape_raises_error(self): + """Test that a mismatched armature raises a ValueError.""" + q0 = np.array([0.0, 1.0, 2.0]) + armature = np.array([0.1, 0.2]) # Wrong shape + with self.assertRaises(ValueError): + RobotModelParameters(q0=q0, armature=armature) + + def test_invalid_urdf_path_raises_error(self): + """Test that an invalid URDF path raises a ValueError.""" + q0 = np.array([0.0, 1.0, 2.0]) + with self.assertRaises(ValueError): + RobotModelParameters(q0=q0, urdf_path=Path("invalid_path")) + + def test_invalid_srdf_path_type_raises_error(self): + """Test that a non-string SRDF path raises a ValueError.""" + q0 = np.array([0.0, 1.0, 2.0]) + with self.assertRaises(ValueError): + RobotModelParameters(q0=q0, srdf_path=Path("invalid_path")) + + +class TestRobotModels(unittest.TestCase): + @classmethod + def setUpClass(self): + """ + This method sets up the shared environment for all test cases in the class. + """ + # Load the example robot model using example robot data to get the URDF path. + robot = robex.load("panda") + urdf_path = Path(robot.urdf) + srdf_path = Path(robot.urdf.replace("urdf", "srdf")) + urdf_meshes_dir = urdf_path.parent.parent.parent.parent.parent + free_flyer = False + q0 = np.zeros(robot.model.nq) + locked_joint_names = ["panda_joint1", "panda_joint2"] + # Store shared initial parameters + self.shared_params = RobotModelParameters( + q0=q0, + free_flyer=free_flyer, + locked_joint_names=locked_joint_names, + urdf_path=urdf_path, + srdf_path=srdf_path, + urdf_meshes_dir=urdf_meshes_dir, + collision_as_capsule=True, + self_collision=True, + armature=np.linspace(0.1, 0.9, robot.model.nq), + ) + + def setUp(self): + """ + This method ensures that a fresh RobotModelParameters and RobotModels instance + are created for each test case while still benefiting from shared setup computations. + """ + self.params = deepcopy(self.shared_params) + self.robot_models = RobotModels(self.params) + + def test_initial_configuration(self): + self.assertTrue(np.array_equal(self.robot_models.q0, self.params.q0)) + + def test_load_models_populates_models(self): + self.robot_models.load_models() + self.assertIsNotNone(self.robot_models.full_robot_model) + self.assertIsNotNone(self.robot_models.visual_model) + self.assertIsNotNone(self.robot_models.collision_model) + + def test_reduced_robot_model(self): + self.robot_models.load_models() + self.assertTrue( + self.robot_models.robot_model.nq + == self.robot_models.full_robot_model.nq + - len(self.params.locked_joint_names) + ) + + def test_invalid_joint_name_raises_value_error(self): + # Modify a fresh instance of parameters for this test + self.params.locked_joint_names = ["InvalidJoint"] + with self.assertRaises(ValueError): + self.robot_models._apply_locked_joints() + + def test_armature_property(self): + self.assertTrue( + np.array_equal(self.robot_models.armature, self.params.armature) + ) + + def test_collision_pairs(self): + """Checking that the collision model has collision pairs.""" + self.assertTrue( + len(self.robot_models.collision_model.collisionPairs) == 44 + ) # Number of collision pairs in the panda model + + def test_rnea(self): + """Checking that the RNEA method works.""" + q = np.zeros(self.robot_models.robot_model.nq) + v = np.zeros(self.robot_models.robot_model.nv) + a = np.zeros(self.robot_models.robot_model.nv) + robot_data = self.robot_models.robot_model.createData() + pin.rnea(self.robot_models.robot_model, robot_data, q, v, a) + + +if __name__ == "__main__": + unittest.main()