Skip to main content

Full VLA Workflow

Introduction

This chapter integrates all VLA components - voice recognition, language planning, vision pipelines, humanoid kinematics, and manipulation - into a complete end-to-end system. We'll build a household humanoid robot that can execute complex multi-step tasks from natural language commands.

Learning Objectives:

  • Integrate all VLA subsystems into unified architecture
  • Handle complete task workflows from command to execution
  • Implement error recovery and replanning
  • Deploy VLA system on real humanoid hardware
  • Benchmark and optimize system performance

Complete VLA Architecture

System Overview

┌────────────────────────────────────────────────────────────────┐
│ User: "Clean the table" │
└────────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ Voice Interface (Whisper) │
│ Input: Audio → Output: Text │
└────────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ Language Planner (GPT-4 / Llama 3) │
│ Input: Text → Output: Task Plan │
│ [navigate(table), pick(cup), place(cup, tray), wipe(table)] │
└────────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ Perception Pipeline (YOLO + VSLAM + Depth) │
│ Input: RGB-D Images → Output: Scene Graph │
│ {table: pose, cup: pose, tray: pose, obstacles: [...]} │
└────────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ Task Executor │
│ For each action in plan: │
│ - Navigation: ROS Nav2 + VSLAM │
│ - Manipulation: Grasp Planning + MoveIt │
│ - Whole-Body Control: Locomotion + Arms │
└────────────────────────────────────────────────────────────────┘

┌────────────────────────────────────────────────────────────────┐
│ Robot Hardware (Humanoid) │
│ - Motors: Joint control │
│ - Sensors: Cameras, IMU, Force/Torque │
│ - Feedback: Success/Failure + Observations │
└────────────────────────────────────────────────────────────────┘

Implementation

Main VLA System

import rclpy
from rclpy.node import Node
import whisper
from transformers import AutoModelForCausalLM, AutoTokenizer
from ultralytics import YOLO
import numpy as np

class CompleteVLASystem(Node):
def __init__(self):
super().__init__('vla_system')

# 1. Voice Interface
self.whisper_model = whisper.load_model("base")

# 2. Language Planner
self.llm = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-3-8B")
self.tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3-8B")

# 3. Vision Pipeline
self.object_detector = YOLO("yolov8n.pt")

# 4. Controllers
self.navigation = NavigationController()
self.manipulation = ManipulationController()
self.locomotion = HumanoidController()

# ROS subscriptions
self.image_sub = self.create_subscription(
Image, '/camera/image_raw', self.image_callback, 10
)

# State
self.latest_image = None
self.scene_graph = {}

self.get_logger().info("Complete VLA system initialized")

def execute_voice_command(self, audio_data):
"""
End-to-end pipeline: Voice → Action

Example: User says "Bring me the red cup from the kitchen"
"""
# Step 1: Voice → Text
text_command = self.voice_to_text(audio_data)
self.get_logger().info(f"Recognized command: {text_command}")

# Step 2: Text → Task Plan
task_plan = self.language_to_plan(text_command)
self.get_logger().info(f"Generated plan: {task_plan}")

# Step 3: Execute Plan
success = self.execute_plan(task_plan)

if success:
self.get_logger().info("Task completed successfully!")
self.speak("Task completed!")
else:
self.get_logger().error("Task failed")
self.speak("Sorry, I couldn't complete the task")

return success

def voice_to_text(self, audio_data):
"""Convert speech to text with Whisper"""
result = self.whisper_model.transcribe(audio_data)
return result['text']

def language_to_plan(self, command: str):
"""
Use LLM to decompose command into executable steps

Input: "Bring me the red cup from the kitchen"
Output: [
{'action': 'navigate', 'location': 'kitchen'},
{'action': 'detect', 'object': 'red cup'},
{'action': 'pick', 'object': 'red cup'},
{'action': 'navigate', 'location': 'user'},
{'action': 'place', 'object': 'red cup', 'location': 'user'}
]
"""
prompt = f"""
You are a robot task planner. Break down this command into steps.

Command: "{command}"

Available actions:
- navigate(location)
- detect(object)
- pick(object)
- place(object, location)
- say(message)

Output JSON plan:
"""

inputs = self.tokenizer(prompt, return_tensors="pt")
outputs = self.llm.generate(**inputs, max_length=300)
response = self.tokenizer.decode(outputs[0])

# Parse JSON from response
plan = self.parse_plan_json(response)
return plan

def execute_plan(self, plan):
"""Execute each step in the plan"""
for step_idx, step in enumerate(plan):
self.get_logger().info(f"Executing step {step_idx + 1}/{len(plan)}: {step}")

action = step['action']

try:
if action == 'navigate':
self.execute_navigation(step['location'])

elif action == 'detect':
self.execute_detection(step['object'])

elif action == 'pick':
self.execute_pick(step['object'])

elif action == 'place':
self.execute_place(step['object'], step.get('location'))

elif action == 'say':
self.speak(step['message'])

else:
self.get_logger().error(f"Unknown action: {action}")
return False

except Exception as e:
self.get_logger().error(f"Step failed: {e}")

# Attempt recovery
if self.attempt_recovery(step, e):
self.get_logger().info("Recovery successful")
continue
else:
return False

return True

def execute_navigation(self, location: str):
"""Navigate to named location"""
# Convert location name to coordinates
location_map = {
'kitchen': (5.0, 2.0, 0.0),
'living_room': (0.0, 0.0, 0.0),
'user': (1.0, 1.0, 0.0)
}

if location not in location_map:
raise ValueError(f"Unknown location: {location}")

target_pose = location_map[location]

# Use Nav2 to navigate
self.navigation.navigate_to_pose(target_pose)

# Wait for completion
while not self.navigation.is_goal_reached():
rclpy.spin_once(self, timeout_sec=0.1)

def execute_detection(self, object_name: str):
"""Detect object in scene"""
if self.latest_image is None:
raise RuntimeError("No camera image available")

# Run object detection
detections = self.object_detector(self.latest_image)

# Find matching object
for det in detections:
if object_name.lower() in det['class'].lower():
# Estimate 3D pose
bbox = det['bbox']
depth_value = self.get_depth_at_bbox(bbox)

object_pose_3d = self.bbox_to_3d_pose(bbox, depth_value)

# Store in scene graph
self.scene_graph[object_name] = {
'pose': object_pose_3d,
'bbox': bbox,
'confidence': det['confidence']
}

self.get_logger().info(f"Detected {object_name} at {object_pose_3d}")
return

raise ValueError(f"Could not detect {object_name}")

def execute_pick(self, object_name: str):
"""Pick up object"""
if object_name not in self.scene_graph:
raise ValueError(f"Object {object_name} not in scene graph")

object_pose = self.scene_graph[object_name]['pose']

# 1. Plan grasp
grasp_pose = self.manipulation.plan_grasp(object_pose)

# 2. Reach to pre-grasp pose
pre_grasp_pose = grasp_pose.copy()
pre_grasp_pose[2] += 0.10 # 10cm above

self.manipulation.move_to_pose(pre_grasp_pose)

# 3. Approach
self.manipulation.move_to_pose(grasp_pose)

# 4. Close gripper
self.manipulation.close_gripper()

# 5. Lift
lift_pose = grasp_pose.copy()
lift_pose[2] += 0.15
self.manipulation.move_to_pose(lift_pose)

self.get_logger().info(f"Picked {object_name}")

def execute_place(self, object_name: str, location: str):
"""Place object at location"""
# Get placement location
if location in self.scene_graph:
place_pose = self.scene_graph[location]['pose']
else:
# Named location (e.g., "user")
place_pose = self.get_named_location(location)

# Adjust for placement height
place_pose[2] += 0.05 # 5cm above surface

# 1. Move to placement position
self.manipulation.move_to_pose(place_pose)

# 2. Open gripper
self.manipulation.open_gripper()

# 3. Retract
retract_pose = place_pose.copy()
retract_pose[2] += 0.10
self.manipulation.move_to_pose(retract_pose)

self.get_logger().info(f"Placed {object_name} at {location}")

def attempt_recovery(self, failed_step, error):
"""
Error recovery with LLM replanning

Example errors:
- Navigation blocked: Find alternate path
- Object not detected: Search in wider area
- Grasp failed: Try different grasp angle
"""
prompt = f"""
Step failed: {failed_step}
Error: {error}

Suggest recovery action:
"""

inputs = self.tokenizer(prompt, return_tensors="pt")
outputs = self.llm.generate(**inputs, max_length=100)
recovery_plan = self.tokenizer.decode(outputs[0])

self.get_logger().info(f"Recovery plan: {recovery_plan}")

# Execute recovery
# (Simplified - real implementation would parse and execute recovery steps)
return False

def speak(self, text: str):
"""Text-to-speech feedback"""
from gtts import gTTS
import os

tts = gTTS(text=text, lang='en')
tts.save("/tmp/speech.mp3")
os.system("mpg321 /tmp/speech.mp3")

def image_callback(self, msg):
"""Store latest camera image"""
self.latest_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")


def main():
rclpy.init()
vla_system = CompleteVLASystem()

# Listen for voice commands
voice_interface = VoiceInterface()

while rclpy.ok():
# Wait for voice command
print("Listening for command...")
audio = voice_interface.listen(duration=5)

if audio is not None:
# Execute command
vla_system.execute_voice_command(audio)

# Spin ROS
rclpy.spin_once(vla_system, timeout_sec=0.1)

if __name__ == '__main__':
main()

Real-World Example: Kitchen Assistant

Complete Task Workflow

User Command: "Clean up the kitchen - put dirty dishes in the sink and wipe the counter"

System Execution:

class KitchenAssistant(CompleteVLASystem):
def clean_kitchen_workflow(self):
"""
Multi-step task execution:
1. Navigate to kitchen
2. Detect all objects on counter
3. Classify objects (dishes vs non-dishes)
4. Pick each dish and place in sink
5. Grasp cleaning cloth
6. Wipe counter with dual-arm coordination
7. Return to starting position
"""

# Step 1: Navigate to kitchen
self.get_logger().info("Navigating to kitchen...")
self.execute_navigation('kitchen')

# Step 2: Detect objects
self.get_logger().info("Detecting objects on counter...")
counter_objects = self.detect_objects_in_region(
region_name='counter',
region_bounds={'x': [0, 1.5], 'y': [-0.5, 0.5]}
)

# Step 3: Classify objects
dishes = []
other_objects = []

for obj in counter_objects:
if self.is_dish(obj):
dishes.append(obj)
else:
other_objects.append(obj)

self.get_logger().info(f"Found {len(dishes)} dishes, {len(other_objects)} other objects")

# Step 4: Move dishes to sink
for dish in dishes:
try:
self.execute_pick(dish['name'])
self.execute_place(dish['name'], 'sink')
except Exception as e:
self.get_logger().warn(f"Failed to move {dish['name']}: {e}")
continue

# Step 5: Grasp cleaning cloth
self.execute_pick('cloth')

# Step 6: Wipe counter
self.wipe_surface(
surface='counter',
wiping_pattern='zigzag',
num_passes=3
)

# Step 7: Return home
self.execute_navigation('home')

self.speak("Kitchen cleaned!")

def is_dish(self, object_dict):
"""Classify if object is a dish using vision model"""
object_class = object_dict['class'].lower()
dish_keywords = ['plate', 'bowl', 'cup', 'mug', 'glass', 'utensil']

return any(keyword in object_class for keyword in dish_keywords)

def wipe_surface(self, surface: str, wiping_pattern: str, num_passes: int):
"""
Wipe surface with cloth using dual-arm coordination

Args:
surface: Surface to wipe (e.g., 'counter')
wiping_pattern: 'zigzag', 'circular', or 'linear'
num_passes: Number of wiping passes
"""
surface_bounds = self.scene_graph[surface]['bounds']

# Generate wiping trajectory
waypoints = self.generate_wiping_trajectory(
surface_bounds, wiping_pattern, num_passes
)

# Execute wiping motion
for waypoint in waypoints:
# Move arm while applying downward force
self.manipulation.move_with_force_control(
target_pose=waypoint,
force_direction=[0, 0, -1],
force_magnitude=5.0 # 5N downward
)

def generate_wiping_trajectory(self, bounds, pattern, num_passes):
"""Generate waypoints for wiping motion"""
waypoints = []

x_min, x_max = bounds['x']
y_min, y_max = bounds['y']
z = bounds['z'] + 0.05 # 5cm above surface

if pattern == 'zigzag':
for i in range(num_passes):
y = y_min + (y_max - y_min) * i / (num_passes - 1)

# Forward pass
if i % 2 == 0:
waypoints.append([x_min, y, z])
waypoints.append([x_max, y, z])
# Backward pass
else:
waypoints.append([x_max, y, z])
waypoints.append([x_min, y, z])

return waypoints

# Usage
assistant = KitchenAssistant()
assistant.clean_kitchen_workflow()

Performance Optimization

Latency Breakdown

Typical End-to-End Latency:

ComponentLatencyOptimization
Voice Recognition500msUse Whisper-small (300ms)
LLM Planning2000msQuantize to 4-bit (800ms)
Object Detection50msTensorRT (30ms)
Grasp Planning100msPre-compute common grasps
Motion Planning500msCache trajectories
Execution5000ms(Physical robot motion)
Total8150ms6630ms (optimized)

Parallel Processing

import concurrent.futures

class OptimizedVLASystem(CompleteVLASystem):
def execute_plan_parallel(self, plan):
"""
Execute independent steps in parallel

Example: While navigating, start detecting objects
"""
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
futures = []

for step in plan:
# Submit step to thread pool
future = executor.submit(self.execute_step, step)
futures.append((step, future))

# Wait for completion
for step, future in futures:
try:
result = future.result(timeout=30)
self.get_logger().info(f"Step completed: {step}")
except Exception as e:
self.get_logger().error(f"Step failed: {step}, {e}")

def prefetch_perception(self):
"""Continuously update scene graph in background"""
while rclpy.ok():
if self.latest_image is not None:
# Run detection
detections = self.object_detector(self.latest_image)

# Update scene graph
for det in detections:
self.scene_graph[det['class']] = det

time.sleep(0.1) # 10 Hz update rate

Deployment Checklist

Hardware Requirements:

  • Humanoid robot (e.g., Unitree H1, Figure 01)
  • RGB-D camera (RealSense D435i)
  • Microphone array
  • Compute: Jetson Orin (edge) or Desktop GPU (RTX 4090)

Software Stack:

  • ROS 2 Humble/Iron
  • Isaac ROS (GPU acceleration)
  • MoveIt 2 (motion planning)
  • Nav2 (navigation)
  • Whisper (voice recognition)
  • Llama 3 or GPT-4 (planning)
  • YOLOv8 (object detection)

Calibration:

  • Camera intrinsics and extrinsics
  • Robot URDF with accurate dimensions
  • Joint offsets and limits
  • Gripper force calibration

Safety:

  • E-stop button accessible
  • Joint torque limits configured
  • Collision detection enabled
  • Workspace boundaries defined

Exercises

  1. Voice-to-Action: Implement complete pipeline from voice command to robot motion
  2. Multi-Step Task: Create task plan for "Set the table" with navigation, grasping, and placement
  3. Error Recovery: Add replanning when object detection fails
  4. Performance: Benchmark and optimize end-to-end latency
  5. Real Robot: Deploy on physical humanoid and test household tasks

Summary

A complete VLA system integrates voice recognition, language planning, vision pipelines, locomotion, and manipulation into an end-to-end workflow. LLMs decompose natural language commands into executable action sequences. Vision provides real-time scene understanding. Motion planners and controllers execute physical actions. Error recovery and replanning handle failures. Optimization techniques reduce latency for real-time performance. This unified architecture enables humanoid robots to perform complex, multi-step tasks in unstructured environments.

Further Reading