MPC Planner Implementation Guide

For Developers: This document provides technical guidance for implementing the MPC planner in curobo_ros. It complements the Unified Planner Architecture specification and MPC Planner Tutorial user documentation.


Overview

The MPC (Model Predictive Control) planner implementation will add closed-loop, real-time trajectory planning capabilities to curobo_ros. This guide covers:

  1. Architecture integration points

  2. cuRobo MPC API usage

  3. Real-time control loop design

  4. ROS 2 interface implementation

  5. Testing and validation strategy


Architecture Integration

File Structure

curobo_ros/
├── planners/
│   ├── __init__.py
│   ├── base_planner.py          # Abstract base (Phase 2)
│   ├── planner_factory.py       # Factory pattern (Phase 2)
│   ├── planner_manager.py       # Runtime switching (Phase 2)
│   ├── classic_planner.py       # Existing MotionGen wrapper (Phase 2)
│   └── mpc_planner.py           # ⭐ MPC implementation (Phase 3)
├── core/
│   ├── config_wrapper.py        # Extend with MPC params
│   └── config_wrapper_motion.py # MPC config integration
└── interfaces/
    └── unified_planner_node.py  # ROS node exposing all planners

Integration with Existing Code

The MPC planner integrates with:

  1. ConfigWrapper (curobo_ros/core/config_wrapper.py)

    • Add MPC-specific parameters

    • Manage solver configuration

    • Handle GPU resource allocation

  2. RobotContext (curobo_ros/robot/robot_context.py)

    • Real-time state feedback loop

    • Command publishing at control frequency

    • Joint state monitoring

  3. CollisionContext (voxel grid updates)

    • Dynamic obstacle updates during execution

    • Camera integration for real-time perception


cuRobo MPC API Integration

Understanding cuRobo’s MPC

cuRobo provides MPC capabilities through its optimization framework. Key components:

from curobo.wrap.reacher.mpc import MpcSolver
from curobo.types.math import Pose
from curobo.types.robot import RobotConfig
from curobo.types.state import JointState

# MPC solver initialization
mpc_solver = MpcSolver(
    robot_config=robot_config,
    world_model=world_model,
    mpc_config=mpc_config
)

Key cuRobo APIs to Use

  1. MpcSolver.solve_step() - Single MPC iteration

  2. MpcSolver.update_goal() - Update target during execution

  3. MpcSolver.update_world() - Update obstacles in real-time

  4. MpcSolver.get_trajectory() - Get predicted trajectory

  5. MpcSolver.reset() - Reset for new planning task

Configuration Parameters

Map ROS parameters to cuRobo MPC config:

# ROS parameters → cuRobo MpcConfig
mpc_config = MpcConfig(
    horizon=self.get_parameter('mpc_horizon').value,  # 5-15 steps
    dt=1.0 / self.get_parameter('mpc_control_frequency').value,  # 0.01s @ 100Hz
    max_iterations=self.get_parameter('mpc_max_iterations').value,  # 1000
    convergence_threshold=self.get_parameter('mpc_convergence_threshold').value,  # 0.01m
    cost_weights={
        'position': 1.0,
        'rotation': 1.0,
        'smoothness': 0.1,
        'obstacle': 10.0
    }
)

MPC Planner Class Design

Class Structure

# curobo_ros/planners/mpc_planner.py

from .base_planner import TrajectoryPlanner, PlannerResult, ExecutionMode
from curobo.wrap.reacher.mpc import MpcSolver
import rclpy
from rclpy.node import Node
import threading

class MPCPlanner(TrajectoryPlanner):
    """
    Model Predictive Control planner for real-time reactive trajectory planning.

    Implements closed-loop control with continuous replanning at each control step.
    """

    def __init__(self, node: Node, config_wrapper):
        """
        Initialize MPC planner.

        Args:
            node: ROS 2 node for parameter access and communication
            config_wrapper: Configuration wrapper with MPC parameters
        """
        self.node = node
        self.config_wrapper = config_wrapper

        # Load MPC-specific parameters
        self.horizon = node.get_parameter('mpc_horizon').value
        self.control_freq = node.get_parameter('mpc_control_frequency').value
        self.max_iterations = node.get_parameter('mpc_max_iterations').value
        self.convergence_threshold = node.get_parameter('mpc_convergence_threshold').value

        # Initialize cuRobo MPC solver
        self.mpc_solver = self._create_mpc_solver()

        # Control loop state
        self.is_executing = False
        self.execution_thread = None
        self.goal_pose = None

        # Performance monitoring
        self.stats_publisher = node.create_publisher(
            MPCStats,
            'mpc_stats',
            10
        )

        self.node.get_logger().info('MPC Planner initialized')

    def _create_mpc_solver(self) -> MpcSolver:
        """Create and configure cuRobo MPC solver."""
        # Get robot and world config from ConfigWrapper
        robot_config = self.config_wrapper.get_robot_config()
        world_model = self.config_wrapper.get_world_model()

        # Build MPC configuration
        mpc_config = MpcConfig(
            horizon=self.horizon,
            dt=1.0 / self.control_freq,
            max_iterations=self.max_iterations,
            convergence_threshold=self.convergence_threshold,
            # Cost weights for optimization
            cost_weights={
                'position_error': 1.0,
                'rotation_error': 1.0,
                'smoothness': 0.1,
                'obstacle_avoidance': 10.0,
                'joint_limits': 5.0,
                'velocity_limits': 2.0
            }
        )

        return MpcSolver(
            robot_config=robot_config,
            world_model=world_model,
            mpc_config=mpc_config
        )

    def get_execution_mode(self) -> ExecutionMode:
        """MPC uses closed-loop execution."""
        return ExecutionMode.CLOSED_LOOP

    def plan(self, start_state, goal_pose, config) -> PlannerResult:
        """
        Plan initial trajectory to goal using MPC.

        For MPC, this performs initial feasibility check and warm-start.
        The actual trajectory is generated continuously during execute().

        Args:
            start_state: Current robot joint state
            goal_pose: Target end-effector pose
            config: Planning configuration (optional overrides)

        Returns:
            PlannerResult with initial trajectory and feasibility
        """
        self.node.get_logger().info('MPC planning started')

        # Store goal for execution phase
        self.goal_pose = goal_pose

        # Reset MPC solver
        self.mpc_solver.reset()
        self.mpc_solver.update_goal(goal_pose)

        # Perform initial solve for feasibility check
        initial_result = self.mpc_solver.solve_step(
            current_state=start_state,
            goal_pose=goal_pose
        )

        if not initial_result.success:
            self.node.get_logger().error(
                f'MPC initial planning failed: {initial_result.error_msg}'
            )
            return PlannerResult(
                success=False,
                message='MPC initial solve failed - goal may be unreachable',
                trajectory=None
            )

        # Return initial trajectory (will be refined during execution)
        return PlannerResult(
            success=True,
            message='MPC planning succeeded - ready for closed-loop execution',
            trajectory=initial_result.trajectory,
            computation_time=initial_result.solve_time
        )

    def execute(self, robot_context, goal_handle) -> bool:
        """
        Execute trajectory using closed-loop MPC control.

        Continuously replans at control frequency until goal is reached.

        Args:
            robot_context: Interface to robot for state/command
            goal_handle: ROS action goal handle for feedback/cancellation

        Returns:
            True if goal reached successfully, False if failed/cancelled
        """
        self.node.get_logger().info('MPC execution started')

        self.is_executing = True
        control_rate = self.node.create_rate(self.control_freq)

        iteration = 0
        start_time = self.node.get_clock().now()

        try:
            while self.is_executing and rclpy.ok():
                # Check for cancellation
                if goal_handle.is_cancel_requested:
                    self.node.get_logger().warn('MPC execution cancelled')
                    goal_handle.canceled()
                    return False

                # Get current robot state
                current_state = robot_context.get_current_joint_state()

                # Update world model (dynamic obstacles from cameras)
                self._update_world_model()

                # Solve MPC step
                step_start = self.node.get_clock().now()
                result = self.mpc_solver.solve_step(
                    current_state=current_state,
                    goal_pose=self.goal_pose
                )
                step_time = (self.node.get_clock().now() - step_start).nanoseconds / 1e6

                if not result.success:
                    self.node.get_logger().error(f'MPC step failed: {result.error_msg}')
                    return False

                # Execute first control action
                command = result.trajectory.position[0]  # First waypoint
                robot_context.send_joint_command(command)

                # Compute distance to goal
                current_pose = robot_context.compute_fk(current_state)
                goal_error = self._compute_pose_error(current_pose, self.goal_pose)

                # Publish performance stats
                self._publish_stats(iteration, goal_error, step_time, result)

                # Publish feedback
                feedback = SendTrajectoryFeedback()
                feedback.progress = max(0.0, 1.0 - goal_error / 1.0)  # Normalize
                feedback.current_error = goal_error
                goal_handle.publish_feedback(feedback)

                # Check convergence
                if goal_error < self.convergence_threshold:
                    self.node.get_logger().info(
                        f'MPC converged after {iteration} iterations '
                        f'({(self.node.get_clock().now() - start_time).nanoseconds / 1e9:.2f}s)'
                    )
                    goal_handle.succeed()
                    return True

                # Check max iterations
                if iteration >= self.max_iterations:
                    self.node.get_logger().warn(
                        f'MPC reached max iterations ({self.max_iterations}) '
                        f'without converging (error: {goal_error:.4f}m)'
                    )
                    return False

                iteration += 1
                control_rate.sleep()

            return False

        except Exception as e:
            self.node.get_logger().error(f'MPC execution error: {str(e)}')
            return False

        finally:
            self.is_executing = False

    def _update_world_model(self):
        """Update collision world with latest camera data."""
        # Get latest voxel grid from cameras
        voxel_grid = self.config_wrapper.get_voxel_grid()

        # Update MPC solver's world model
        self.mpc_solver.update_world(voxel_grid)

    def _compute_pose_error(self, current_pose, goal_pose) -> float:
        """Compute position + orientation error to goal."""
        pos_error = np.linalg.norm(
            current_pose.position - goal_pose.position
        )

        # Quaternion distance (simplified)
        rot_error = 1.0 - abs(np.dot(
            current_pose.quaternion,
            goal_pose.quaternion
        ))

        # Weighted combination
        return pos_error + 0.1 * rot_error

    def _publish_stats(self, iteration, error, step_time, result):
        """Publish MPC performance statistics."""
        stats = MPCStats()
        stats.stamp = self.node.get_clock().now().to_msg()
        stats.iteration = iteration
        stats.convergence_error = error
        stats.computation_time = step_time
        stats.horizon_length = self.horizon
        stats.success = result.success

        self.stats_publisher.publish(stats)

    def stop(self):
        """Stop MPC execution immediately."""
        self.is_executing = False
        self.node.get_logger().info('MPC execution stopped')

    def update_parameters(self, params: dict):
        """Dynamically update MPC parameters."""
        if 'mpc_horizon' in params:
            self.horizon = params['mpc_horizon']
            self.mpc_solver.update_config(horizon=self.horizon)

        if 'mpc_convergence_threshold' in params:
            self.convergence_threshold = params['mpc_convergence_threshold']

        # Recreate solver if major config changed
        if 'mpc_control_frequency' in params:
            self.control_freq = params['mpc_control_frequency']
            self.mpc_solver = self._create_mpc_solver()

ROS 2 Parameters

Add to curobo_ros/core/config_wrapper.py:

# MPC Planner Parameters
self.declare_parameter('mpc_convergence_threshold', 0.01)
self.declare_parameter('mpc_max_iterations', 1000)
self.declare_parameter('mpc_horizon', 10)
self.declare_parameter('mpc_control_frequency', 100.0)
self.declare_parameter('mpc_position_weight', 1.0)
self.declare_parameter('mpc_rotation_weight', 1.0)
self.declare_parameter('mpc_smoothness_weight', 0.1)
self.declare_parameter('mpc_obstacle_weight', 10.0)

ROS 2 Messages

Add to curobo_msgs/msg/MPCStats.msg:

# MPC performance statistics
std_msgs/Header header

int32 iteration              # Current iteration number
float64 convergence_error    # Distance to goal (meters)
float64 computation_time     # Step solve time (ms)
int32 horizon_length         # Planning horizon
bool success                 # Step succeeded

Testing Strategy

Unit Tests

# tests/test_mpc_planner.py

def test_mpc_planner_initialization():
    """Test MPC planner creates successfully."""
    planner = MPCPlanner(node, config_wrapper)
    assert planner.get_execution_mode() == ExecutionMode.CLOSED_LOOP

def test_mpc_planning_feasibility():
    """Test MPC can find initial solution."""
    start = get_test_joint_state()
    goal = get_reachable_pose()

    result = planner.plan(start, goal, {})
    assert result.success
    assert result.trajectory is not None

def test_mpc_execution_convergence():
    """Test MPC execution converges to goal."""
    # Mock robot context
    robot_context = MockRobotContext()
    goal_handle = MockGoalHandle()

    success = planner.execute(robot_context, goal_handle)
    assert success
    assert robot_context.final_error < 0.01

Integration Tests

def test_mpc_with_dynamic_obstacles():
    """Test MPC avoids obstacles added during execution."""
    # Start MPC execution
    threading.Thread(target=planner.execute, args=(robot_context, goal_handle)).start()

    # Wait then add obstacle in path
    time.sleep(1.0)
    add_collision_sphere([0.5, 0.3, 0.4], radius=0.1)

    # MPC should avoid and still reach goal
    wait_for_completion(timeout=10.0)
    assert goal_reached()
    assert no_collisions()

Performance Optimization

GPU Memory Management

  • Pre-allocate tensors for MPC solver

  • Reuse memory across iterations

  • Monitor GPU utilization

Real-Time Considerations

  • Use dedicated high-priority thread for control loop

  • Ensure control frequency is maintained

  • Add watchdog for missed deadlines

Benchmarking

Target performance:

  • Control frequency: 50-100 Hz

  • Step solve time: < 10 ms (at 100 Hz)

  • Convergence: < 5 seconds for typical tasks

  • GPU usage: 50-80% sustained


Future Enhancements

  1. Adaptive Horizon: Adjust horizon based on distance to goal

  2. Warm-Start: Use previous solution for faster convergence

  3. Multi-Goal MPC: Switch goals dynamically

  4. Learned Cost Functions: Integrate learned models

  5. Distributed MPC: Multi-robot coordination


References


Questions / Support

For implementation questions:

  1. Check existing robot strategy pattern in curobo_ros/robot/

  2. Review config_wrapper.py for parameter management patterns

  3. Refer to cuRobo examples in curobo/examples/

  4. Open GitHub issue with [MPC Implementation] tag