Full VLA Workflow
Introduction
This chapter integrates all VLA components - voice recognition, language planning, vision pipelines, humanoid kinematics, and manipulation - into a complete end-to-end system. We'll build a household humanoid robot that can execute complex multi-step tasks from natural language commands.
Learning Objectives:
- Integrate all VLA subsystems into unified architecture
- Handle complete task workflows from command to execution
- Implement error recovery and replanning
- Deploy VLA system on real humanoid hardware
- Benchmark and optimize system performance
Complete VLA Architecture
System Overview
┌────────────────────────────────────────────────────────────────┐
│ User: "Clean the table" │
└────────────────────────────────────────────────────────────────┘
↓
┌────────────────────────────────────────────────────────────────┐
│ Voice Interface (Whisper) │
│ Input: Audio → Output: Text │
└────────────────────────────────────────────────────────────────┘
↓
┌────────────────────────────────────────────────────────────────┐
│ Language Planner (GPT-4 / Llama 3) │
│ Input: Text → Output: Task Plan │
│ [navigate(table), pick(cup), place(cup, tray), wipe(table)] │
└────────────────────────────────────────────────────────────────┘
↓
┌────────────────────────────────────────────────────────────────┐
│ Perception Pipeline (YOLO + VSLAM + Depth) │
│ Input: RGB-D Images → Output: Scene Graph │
│ {table: pose, cup: pose, tray: pose, obstacles: [...]} │
└────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────── ─────────────────────────────────┐
│ Task Executor │
│ For each action in plan: │
│ - Navigation: ROS Nav2 + VSLAM │
│ - Manipulation: Grasp Planning + MoveIt │
│ - Whole-Body Control: Locomotion + Arms │
└────────────────────────────────────────────────────────────────┘
↓
┌────────────────────────────────────────────────────────────────┐
│ Robot Hardware (Humanoid) │
│ - Motors: Joint control │
│ - Sensors: Cameras, IMU, Force/Torque │
│ - Feedback: Success/Failure + Observations │
└────────────────────────────────────────────────────────────────┘
Implementation
Main VLA System
import rclpy
from rclpy.node import Node
import whisper
from transformers import AutoModelForCausalLM, AutoTokenizer
from ultralytics import YOLO
import numpy as np
class CompleteVLASystem(Node):
def __init__(self):
super().__init__('vla_system')
# 1. Voice Interface
self.whisper_model = whisper.load_model("base")
# 2. Language Planner
self.llm = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-3-8B")
self.tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3-8B")
# 3. Vision Pipeline
self.object_detector = YOLO("yolov8n.pt")
# 4. Controllers
self.navigation = NavigationController()
self.manipulation = ManipulationController()
self.locomotion = HumanoidController()
# ROS subscriptions
self.image_sub = self.create_subscription(
Image, '/camera/image_raw', self.image_callback, 10
)
# State
self.latest_image = None
self.scene_graph = {}
self.get_logger().info("Complete VLA system initialized")
def execute_voice_command(self, audio_data):
"""
End-to-end pipeline: Voice → Action
Example: User says "Bring me the red cup from the kitchen"
"""
# Step 1: Voice → Text
text_command = self.voice_to_text(audio_data)
self.get_logger().info(f"Recognized command: {text_command}")
# Step 2: Text → Task Plan
task_plan = self.language_to_plan(text_command)
self.get_logger().info(f"Generated plan: {task_plan}")
# Step 3: Execute Plan
success = self.execute_plan(task_plan)
if success:
self.get_logger().info("Task completed successfully!")
self.speak("Task completed!")
else:
self.get_logger().error("Task failed")
self.speak("Sorry, I couldn't complete the task")
return success
def voice_to_text(self, audio_data):
"""Convert speech to text with Whisper"""
result = self.whisper_model.transcribe(audio_data)
return result['text']
def language_to_plan(self, command: str):
"""
Use LLM to decompose command into executable steps
Input: "Bring me the red cup from the kitchen"
Output: [
{'action': 'navigate', 'location': 'kitchen'},
{'action': 'detect', 'object': 'red cup'},
{'action': 'pick', 'object': 'red cup'},
{'action': 'navigate', 'location': 'user'},
{'action': 'place', 'object': 'red cup', 'location': 'user'}
]
"""
prompt = f"""
You are a robot task planner. Break down this command into steps.
Command: "{command}"
Available actions:
- navigate(location)
- detect(object)
- pick(object)
- place(object, location)
- say(message)
Output JSON plan:
"""
inputs = self.tokenizer(prompt, return_tensors="pt")
outputs = self.llm.generate(**inputs, max_length=300)
response = self.tokenizer.decode(outputs[0])
# Parse JSON from response
plan = self.parse_plan_json(response)
return plan
def execute_plan(self, plan):
"""Execute each step in the plan"""
for step_idx, step in enumerate(plan):
self.get_logger().info(f"Executing step {step_idx + 1}/{len(plan)}: {step}")
action = step['action']
try:
if action == 'navigate':
self.execute_navigation(step['location'])
elif action == 'detect':
self.execute_detection(step['object'])
elif action == 'pick':
self.execute_pick(step['object'])
elif action == 'place':
self.execute_place(step['object'], step.get('location'))
elif action == 'say':
self.speak(step['message'])
else:
self.get_logger().error(f"Unknown action: {action}")
return False
except Exception as e:
self.get_logger().error(f"Step failed: {e}")
# Attempt recovery
if self.attempt_recovery(step, e):
self.get_logger().info("Recovery successful")
continue
else:
return False
return True
def execute_navigation(self, location: str):
"""Navigate to named location"""
# Convert location name to coordinates
location_map = {
'kitchen': (5.0, 2.0, 0.0),
'living_room': (0.0, 0.0, 0.0),
'user': (1.0, 1.0, 0.0)
}
if location not in location_map:
raise ValueError(f"Unknown location: {location}")
target_pose = location_map[location]
# Use Nav2 to navigate
self.navigation.navigate_to_pose(target_pose)
# Wait for completion
while not self.navigation.is_goal_reached():
rclpy.spin_once(self, timeout_sec=0.1)
def execute_detection(self, object_name: str):
"""Detect object in scene"""
if self.latest_image is None:
raise RuntimeError("No camera image available")
# Run object detection
detections = self.object_detector(self.latest_image)
# Find matching object
for det in detections:
if object_name.lower() in det['class'].lower():
# Estimate 3D pose
bbox = det['bbox']
depth_value = self.get_depth_at_bbox(bbox)
object_pose_3d = self.bbox_to_3d_pose(bbox, depth_value)
# Store in scene graph
self.scene_graph[object_name] = {
'pose': object_pose_3d,
'bbox': bbox,
'confidence': det['confidence']
}
self.get_logger().info(f"Detected {object_name} at {object_pose_3d}")
return
raise ValueError(f"Could not detect {object_name}")
def execute_pick(self, object_name: str):
"""Pick up object"""
if object_name not in self.scene_graph:
raise ValueError(f"Object {object_name} not in scene graph")
object_pose = self.scene_graph[object_name]['pose']
# 1. Plan grasp
grasp_pose = self.manipulation.plan_grasp(object_pose)
# 2. Reach to pre-grasp pose
pre_grasp_pose = grasp_pose.copy()
pre_grasp_pose[2] += 0.10 # 10cm above
self.manipulation.move_to_pose(pre_grasp_pose)
# 3. Approach
self.manipulation.move_to_pose(grasp_pose)
# 4. Close gripper
self.manipulation.close_gripper()
# 5. Lift
lift_pose = grasp_pose.copy()
lift_pose[2] += 0.15
self.manipulation.move_to_pose(lift_pose)
self.get_logger().info(f"Picked {object_name}")
def execute_place(self, object_name: str, location: str):
"""Place object at location"""
# Get placement location
if location in self.scene_graph:
place_pose = self.scene_graph[location]['pose']
else:
# Named location (e.g., "user")
place_pose = self.get_named_location(location)
# Adjust for placement height
place_pose[2] += 0.05 # 5cm above surface
# 1. Move to placement position
self.manipulation.move_to_pose(place_pose)
# 2. Open gripper
self.manipulation.open_gripper()
# 3. Retract
retract_pose = place_pose.copy()
retract_pose[2] += 0.10
self.manipulation.move_to_pose(retract_pose)
self.get_logger().info(f"Placed {object_name} at {location}")
def attempt_recovery(self, failed_step, error):
"""
Error recovery with LLM replanning
Example errors:
- Navigation blocked: Find alternate path
- Object not detected: Search in wider area
- Grasp failed: Try different grasp angle
"""
prompt = f"""
Step failed: {failed_step}
Error: {error}
Suggest recovery action:
"""
inputs = self.tokenizer(prompt, return_tensors="pt")
outputs = self.llm.generate(**inputs, max_length=100)
recovery_plan = self.tokenizer.decode(outputs[0])
self.get_logger().info(f"Recovery plan: {recovery_plan}")
# Execute recovery
# (Simplified - real implementation would parse and execute recovery steps)
return False
def speak(self, text: str):
"""Text-to-speech feedback"""
from gtts import gTTS
import os
tts = gTTS(text=text, lang='en')
tts.save("/tmp/speech.mp3")
os.system("mpg321 /tmp/speech.mp3")
def image_callback(self, msg):
"""Store latest camera image"""
self.latest_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
def main():
rclpy.init()
vla_system = CompleteVLASystem()
# Listen for voice commands
voice_interface = VoiceInterface()
while rclpy.ok():
# Wait for voice command
print("Listening for command...")
audio = voice_interface.listen(duration=5)
if audio is not None:
# Execute command
vla_system.execute_voice_command(audio)
# Spin ROS
rclpy.spin_once(vla_system, timeout_sec=0.1)
if __name__ == '__main__':
main()
Real-World Example: Kitchen Assistant
Complete Task Workflow
User Command: "Clean up the kitchen - put dirty dishes in the sink and wipe the counter"
System Execution:
class KitchenAssistant(CompleteVLASystem):
def clean_kitchen_workflow(self):
"""
Multi-step task execution:
1. Navigate to kitchen
2. Detect all objects on counter
3. Classify objects (dishes vs non-dishes)
4. Pick each dish and place in sink
5. Grasp cleaning cloth
6. Wipe counter with dual-arm coordination
7. Return to starting position
"""
# Step 1: Navigate to kitchen
self.get_logger().info("Navigating to kitchen...")
self.execute_navigation('kitchen')
# Step 2: Detect objects
self.get_logger().info("Detecting objects on counter...")
counter_objects = self.detect_objects_in_region(
region_name='counter',
region_bounds={'x': [0, 1.5], 'y': [-0.5, 0.5]}
)
# Step 3: Classify objects
dishes = []
other_objects = []
for obj in counter_objects:
if self.is_dish(obj):
dishes.append(obj)
else:
other_objects.append(obj)
self.get_logger().info(f"Found {len(dishes)} dishes, {len(other_objects)} other objects")
# Step 4: Move dishes to sink
for dish in dishes:
try:
self.execute_pick(dish['name'])
self.execute_place(dish['name'], 'sink')
except Exception as e:
self.get_logger().warn(f"Failed to move {dish['name']}: {e}")
continue
# Step 5: Grasp cleaning cloth
self.execute_pick('cloth')
# Step 6: Wipe counter
self.wipe_surface(
surface='counter',
wiping_pattern='zigzag',
num_passes=3
)
# Step 7: Return home
self.execute_navigation('home')
self.speak("Kitchen cleaned!")
def is_dish(self, object_dict):
"""Classify if object is a dish using vision model"""
object_class = object_dict['class'].lower()
dish_keywords = ['plate', 'bowl', 'cup', 'mug', 'glass', 'utensil']
return any(keyword in object_class for keyword in dish_keywords)
def wipe_surface(self, surface: str, wiping_pattern: str, num_passes: int):
"""
Wipe surface with cloth using dual-arm coordination
Args:
surface: Surface to wipe (e.g., 'counter')
wiping_pattern: 'zigzag', 'circular', or 'linear'
num_passes: Number of wiping passes
"""
surface_bounds = self.scene_graph[surface]['bounds']
# Generate wiping trajectory
waypoints = self.generate_wiping_trajectory(
surface_bounds, wiping_pattern, num_passes
)
# Execute wiping motion
for waypoint in waypoints:
# Move arm while applying downward force
self.manipulation.move_with_force_control(
target_pose=waypoint,
force_direction=[0, 0, -1],
force_magnitude=5.0 # 5N downward
)
def generate_wiping_trajectory(self, bounds, pattern, num_passes):
"""Generate waypoints for wiping motion"""
waypoints = []
x_min, x_max = bounds['x']
y_min, y_max = bounds['y']
z = bounds['z'] + 0.05 # 5cm above surface
if pattern == 'zigzag':
for i in range(num_passes):
y = y_min + (y_max - y_min) * i / (num_passes - 1)
# Forward pass
if i % 2 == 0:
waypoints.append([x_min, y, z])
waypoints.append([x_max, y, z])
# Backward pass
else:
waypoints.append([x_max, y, z])
waypoints.append([x_min, y, z])
return waypoints
# Usage
assistant = KitchenAssistant()
assistant.clean_kitchen_workflow()
Performance Optimization
Latency Breakdown
Typical End-to-End Latency:
| Component | Latency | Optimization |
|---|---|---|
| Voice Recognition | 500ms | Use Whisper-small (300ms) |
| LLM Planning | 2000ms | Quantize to 4-bit (800ms) |
| Object Detection | 50ms | TensorRT (30ms) |
| Grasp Planning | 100ms | Pre-compute common grasps |
| Motion Planning | 500ms | Cache trajectories |
| Execution | 5000ms | (Physical robot motion) |
| Total | 8150ms | 6630ms (optimized) |
Parallel Processing
import concurrent.futures
class OptimizedVLASystem(CompleteVLASystem):
def execute_plan_parallel(self, plan):
"""
Execute independent steps in parallel
Example: While navigating, start detecting objects
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = []
for step in plan:
# Submit step to thread pool
future = executor.submit(self.execute_step, step)
futures.append((step, future))
# Wait for completion
for step, future in futures:
try:
result = future.result(timeout=30)
self.get_logger().info(f"Step completed: {step}")
except Exception as e:
self.get_logger().error(f"Step failed: {step}, {e}")
def prefetch_perception(self):
"""Continuously update scene graph in background"""
while rclpy.ok():
if self.latest_image is not None:
# Run detection
detections = self.object_detector(self.latest_image)
# Update scene graph
for det in detections:
self.scene_graph[det['class']] = det
time.sleep(0.1) # 10 Hz update rate
Deployment Checklist
Hardware Requirements:
- Humanoid robot (e.g., Unitree H1, Figure 01)
- RGB-D camera (RealSense D435i)
- Microphone array
- Compute: Jetson Orin (edge) or Desktop GPU (RTX 4090)
Software Stack:
- ROS 2 Humble/Iron
- Isaac ROS (GPU acceleration)
- MoveIt 2 (motion planning)
- Nav2 (navigation)
- Whisper (voice recognition)
- Llama 3 or GPT-4 (planning)
- YOLOv8 (object detection)
Calibration:
- Camera intrinsics and extrinsics
- Robot URDF with accurate dimensions
- Joint offsets and limits
- Gripper force calibration
Safety:
- E-stop button accessible
- Joint torque limits configured
- Collision detection enabled
- Workspace boundaries defined
Exercises
- Voice-to-Action: Implement complete pipeline from voice command to robot motion
- Multi-Step Task: Create task plan for "Set the table" with navigation, grasping, and placement
- Error Recovery: Add replanning when object detection fails
- Performance: Benchmark and optimize end-to-end latency
- Real Robot: Deploy on physical humanoid and test household tasks
Summary
A complete VLA system integrates voice recognition, language planning, vision pipelines, locomotion, and manipulation into an end-to-end workflow. LLMs decompose natural language commands into executable action sequences. Vision provides real-time scene understanding. Motion planners and controllers execute physical actions. Error recovery and replanning handle failures. Optimization techniques reduce latency for real-time performance. This unified architecture enables humanoid robots to perform complex, multi-step tasks in unstructured environments.