MPC Planner Implementation Guide¶
For Developers: This document provides technical guidance for implementing the MPC planner in curobo_ros. It complements the Unified Planner Architecture specification and MPC Planner Tutorial user documentation.
Overview¶
The MPC (Model Predictive Control) planner implementation will add closed-loop, real-time trajectory planning capabilities to curobo_ros. This guide covers:
Architecture integration points
cuRobo MPC API usage
Real-time control loop design
ROS 2 interface implementation
Testing and validation strategy
Architecture Integration¶
File Structure¶
curobo_ros/
├── planners/
│ ├── __init__.py
│ ├── base_planner.py # Abstract base (Phase 2)
│ ├── planner_factory.py # Factory pattern (Phase 2)
│ ├── planner_manager.py # Runtime switching (Phase 2)
│ ├── classic_planner.py # Existing MotionGen wrapper (Phase 2)
│ └── mpc_planner.py # ⭐ MPC implementation (Phase 3)
├── core/
│ ├── config_wrapper.py # Extend with MPC params
│ └── config_wrapper_motion.py # MPC config integration
└── interfaces/
└── unified_planner_node.py # ROS node exposing all planners
Integration with Existing Code¶
The MPC planner integrates with:
ConfigWrapper (
curobo_ros/core/config_wrapper.py)Add MPC-specific parameters
Manage solver configuration
Handle GPU resource allocation
RobotContext (
curobo_ros/robot/robot_context.py)Real-time state feedback loop
Command publishing at control frequency
Joint state monitoring
CollisionContext (voxel grid updates)
Dynamic obstacle updates during execution
Camera integration for real-time perception
cuRobo MPC API Integration¶
Understanding cuRobo’s MPC¶
cuRobo provides MPC capabilities through its optimization framework. Key components:
from curobo.wrap.reacher.mpc import MpcSolver
from curobo.types.math import Pose
from curobo.types.robot import RobotConfig
from curobo.types.state import JointState
# MPC solver initialization
mpc_solver = MpcSolver(
robot_config=robot_config,
world_model=world_model,
mpc_config=mpc_config
)
Key cuRobo APIs to Use¶
MpcSolver.solve_step() - Single MPC iteration
MpcSolver.update_goal() - Update target during execution
MpcSolver.update_world() - Update obstacles in real-time
MpcSolver.get_trajectory() - Get predicted trajectory
MpcSolver.reset() - Reset for new planning task
Configuration Parameters¶
Map ROS parameters to cuRobo MPC config:
# ROS parameters → cuRobo MpcConfig
mpc_config = MpcConfig(
horizon=self.get_parameter('mpc_horizon').value, # 5-15 steps
dt=1.0 / self.get_parameter('mpc_control_frequency').value, # 0.01s @ 100Hz
max_iterations=self.get_parameter('mpc_max_iterations').value, # 1000
convergence_threshold=self.get_parameter('mpc_convergence_threshold').value, # 0.01m
cost_weights={
'position': 1.0,
'rotation': 1.0,
'smoothness': 0.1,
'obstacle': 10.0
}
)
MPC Planner Class Design¶
Class Structure¶
# curobo_ros/planners/mpc_planner.py
from .base_planner import TrajectoryPlanner, PlannerResult, ExecutionMode
from curobo.wrap.reacher.mpc import MpcSolver
import rclpy
from rclpy.node import Node
import threading
class MPCPlanner(TrajectoryPlanner):
"""
Model Predictive Control planner for real-time reactive trajectory planning.
Implements closed-loop control with continuous replanning at each control step.
"""
def __init__(self, node: Node, config_wrapper):
"""
Initialize MPC planner.
Args:
node: ROS 2 node for parameter access and communication
config_wrapper: Configuration wrapper with MPC parameters
"""
self.node = node
self.config_wrapper = config_wrapper
# Load MPC-specific parameters
self.horizon = node.get_parameter('mpc_horizon').value
self.control_freq = node.get_parameter('mpc_control_frequency').value
self.max_iterations = node.get_parameter('mpc_max_iterations').value
self.convergence_threshold = node.get_parameter('mpc_convergence_threshold').value
# Initialize cuRobo MPC solver
self.mpc_solver = self._create_mpc_solver()
# Control loop state
self.is_executing = False
self.execution_thread = None
self.goal_pose = None
# Performance monitoring
self.stats_publisher = node.create_publisher(
MPCStats,
'mpc_stats',
10
)
self.node.get_logger().info('MPC Planner initialized')
def _create_mpc_solver(self) -> MpcSolver:
"""Create and configure cuRobo MPC solver."""
# Get robot and world config from ConfigWrapper
robot_config = self.config_wrapper.get_robot_config()
world_model = self.config_wrapper.get_world_model()
# Build MPC configuration
mpc_config = MpcConfig(
horizon=self.horizon,
dt=1.0 / self.control_freq,
max_iterations=self.max_iterations,
convergence_threshold=self.convergence_threshold,
# Cost weights for optimization
cost_weights={
'position_error': 1.0,
'rotation_error': 1.0,
'smoothness': 0.1,
'obstacle_avoidance': 10.0,
'joint_limits': 5.0,
'velocity_limits': 2.0
}
)
return MpcSolver(
robot_config=robot_config,
world_model=world_model,
mpc_config=mpc_config
)
def get_execution_mode(self) -> ExecutionMode:
"""MPC uses closed-loop execution."""
return ExecutionMode.CLOSED_LOOP
def plan(self, start_state, goal_pose, config) -> PlannerResult:
"""
Plan initial trajectory to goal using MPC.
For MPC, this performs initial feasibility check and warm-start.
The actual trajectory is generated continuously during execute().
Args:
start_state: Current robot joint state
goal_pose: Target end-effector pose
config: Planning configuration (optional overrides)
Returns:
PlannerResult with initial trajectory and feasibility
"""
self.node.get_logger().info('MPC planning started')
# Store goal for execution phase
self.goal_pose = goal_pose
# Reset MPC solver
self.mpc_solver.reset()
self.mpc_solver.update_goal(goal_pose)
# Perform initial solve for feasibility check
initial_result = self.mpc_solver.solve_step(
current_state=start_state,
goal_pose=goal_pose
)
if not initial_result.success:
self.node.get_logger().error(
f'MPC initial planning failed: {initial_result.error_msg}'
)
return PlannerResult(
success=False,
message='MPC initial solve failed - goal may be unreachable',
trajectory=None
)
# Return initial trajectory (will be refined during execution)
return PlannerResult(
success=True,
message='MPC planning succeeded - ready for closed-loop execution',
trajectory=initial_result.trajectory,
computation_time=initial_result.solve_time
)
def execute(self, robot_context, goal_handle) -> bool:
"""
Execute trajectory using closed-loop MPC control.
Continuously replans at control frequency until goal is reached.
Args:
robot_context: Interface to robot for state/command
goal_handle: ROS action goal handle for feedback/cancellation
Returns:
True if goal reached successfully, False if failed/cancelled
"""
self.node.get_logger().info('MPC execution started')
self.is_executing = True
control_rate = self.node.create_rate(self.control_freq)
iteration = 0
start_time = self.node.get_clock().now()
try:
while self.is_executing and rclpy.ok():
# Check for cancellation
if goal_handle.is_cancel_requested:
self.node.get_logger().warn('MPC execution cancelled')
goal_handle.canceled()
return False
# Get current robot state
current_state = robot_context.get_current_joint_state()
# Update world model (dynamic obstacles from cameras)
self._update_world_model()
# Solve MPC step
step_start = self.node.get_clock().now()
result = self.mpc_solver.solve_step(
current_state=current_state,
goal_pose=self.goal_pose
)
step_time = (self.node.get_clock().now() - step_start).nanoseconds / 1e6
if not result.success:
self.node.get_logger().error(f'MPC step failed: {result.error_msg}')
return False
# Execute first control action
command = result.trajectory.position[0] # First waypoint
robot_context.send_joint_command(command)
# Compute distance to goal
current_pose = robot_context.compute_fk(current_state)
goal_error = self._compute_pose_error(current_pose, self.goal_pose)
# Publish performance stats
self._publish_stats(iteration, goal_error, step_time, result)
# Publish feedback
feedback = SendTrajectoryFeedback()
feedback.progress = max(0.0, 1.0 - goal_error / 1.0) # Normalize
feedback.current_error = goal_error
goal_handle.publish_feedback(feedback)
# Check convergence
if goal_error < self.convergence_threshold:
self.node.get_logger().info(
f'MPC converged after {iteration} iterations '
f'({(self.node.get_clock().now() - start_time).nanoseconds / 1e9:.2f}s)'
)
goal_handle.succeed()
return True
# Check max iterations
if iteration >= self.max_iterations:
self.node.get_logger().warn(
f'MPC reached max iterations ({self.max_iterations}) '
f'without converging (error: {goal_error:.4f}m)'
)
return False
iteration += 1
control_rate.sleep()
return False
except Exception as e:
self.node.get_logger().error(f'MPC execution error: {str(e)}')
return False
finally:
self.is_executing = False
def _update_world_model(self):
"""Update collision world with latest camera data."""
# Get latest voxel grid from cameras
voxel_grid = self.config_wrapper.get_voxel_grid()
# Update MPC solver's world model
self.mpc_solver.update_world(voxel_grid)
def _compute_pose_error(self, current_pose, goal_pose) -> float:
"""Compute position + orientation error to goal."""
pos_error = np.linalg.norm(
current_pose.position - goal_pose.position
)
# Quaternion distance (simplified)
rot_error = 1.0 - abs(np.dot(
current_pose.quaternion,
goal_pose.quaternion
))
# Weighted combination
return pos_error + 0.1 * rot_error
def _publish_stats(self, iteration, error, step_time, result):
"""Publish MPC performance statistics."""
stats = MPCStats()
stats.stamp = self.node.get_clock().now().to_msg()
stats.iteration = iteration
stats.convergence_error = error
stats.computation_time = step_time
stats.horizon_length = self.horizon
stats.success = result.success
self.stats_publisher.publish(stats)
def stop(self):
"""Stop MPC execution immediately."""
self.is_executing = False
self.node.get_logger().info('MPC execution stopped')
def update_parameters(self, params: dict):
"""Dynamically update MPC parameters."""
if 'mpc_horizon' in params:
self.horizon = params['mpc_horizon']
self.mpc_solver.update_config(horizon=self.horizon)
if 'mpc_convergence_threshold' in params:
self.convergence_threshold = params['mpc_convergence_threshold']
# Recreate solver if major config changed
if 'mpc_control_frequency' in params:
self.control_freq = params['mpc_control_frequency']
self.mpc_solver = self._create_mpc_solver()
ROS 2 Parameters¶
Add to curobo_ros/core/config_wrapper.py:
# MPC Planner Parameters
self.declare_parameter('mpc_convergence_threshold', 0.01)
self.declare_parameter('mpc_max_iterations', 1000)
self.declare_parameter('mpc_horizon', 10)
self.declare_parameter('mpc_control_frequency', 100.0)
self.declare_parameter('mpc_position_weight', 1.0)
self.declare_parameter('mpc_rotation_weight', 1.0)
self.declare_parameter('mpc_smoothness_weight', 0.1)
self.declare_parameter('mpc_obstacle_weight', 10.0)
ROS 2 Messages¶
Add to curobo_msgs/msg/MPCStats.msg:
# MPC performance statistics
std_msgs/Header header
int32 iteration # Current iteration number
float64 convergence_error # Distance to goal (meters)
float64 computation_time # Step solve time (ms)
int32 horizon_length # Planning horizon
bool success # Step succeeded
Testing Strategy¶
Unit Tests¶
# tests/test_mpc_planner.py
def test_mpc_planner_initialization():
"""Test MPC planner creates successfully."""
planner = MPCPlanner(node, config_wrapper)
assert planner.get_execution_mode() == ExecutionMode.CLOSED_LOOP
def test_mpc_planning_feasibility():
"""Test MPC can find initial solution."""
start = get_test_joint_state()
goal = get_reachable_pose()
result = planner.plan(start, goal, {})
assert result.success
assert result.trajectory is not None
def test_mpc_execution_convergence():
"""Test MPC execution converges to goal."""
# Mock robot context
robot_context = MockRobotContext()
goal_handle = MockGoalHandle()
success = planner.execute(robot_context, goal_handle)
assert success
assert robot_context.final_error < 0.01
Integration Tests¶
def test_mpc_with_dynamic_obstacles():
"""Test MPC avoids obstacles added during execution."""
# Start MPC execution
threading.Thread(target=planner.execute, args=(robot_context, goal_handle)).start()
# Wait then add obstacle in path
time.sleep(1.0)
add_collision_sphere([0.5, 0.3, 0.4], radius=0.1)
# MPC should avoid and still reach goal
wait_for_completion(timeout=10.0)
assert goal_reached()
assert no_collisions()
Performance Optimization¶
GPU Memory Management¶
Pre-allocate tensors for MPC solver
Reuse memory across iterations
Monitor GPU utilization
Real-Time Considerations¶
Use dedicated high-priority thread for control loop
Ensure control frequency is maintained
Add watchdog for missed deadlines
Benchmarking¶
Target performance:
Control frequency: 50-100 Hz
Step solve time: < 10 ms (at 100 Hz)
Convergence: < 5 seconds for typical tasks
GPU usage: 50-80% sustained
Future Enhancements¶
Adaptive Horizon: Adjust horizon based on distance to goal
Warm-Start: Use previous solution for faster convergence
Multi-Goal MPC: Switch goals dynamically
Learned Cost Functions: Integrate learned models
Distributed MPC: Multi-robot coordination
References¶
Questions / Support¶
For implementation questions:
Check existing robot strategy pattern in
curobo_ros/robot/Review
config_wrapper.pyfor parameter management patternsRefer to cuRobo examples in
curobo/examples/Open GitHub issue with
[MPC Implementation]tag