Octo: Open-Source Generalist Robot Policy¶
Octo is a versatile, open-source transformer-based robot policy trained on diverse datasets for cross-embodiment transfer.
Paper: "Octo: An Open-Source Generalist Robot Policy" (Octo Model Team et al., 2024)
GitHub: https://github.com/octo-models/octo
Models: https://huggingface.co/rail-berkeley/octo
Overview¶
Octo addresses the challenge of building generalist robot policies that work across different robots and tasks:
- ✓ Open-source: Fully available weights, code, and training data
- ✓ Cross-embodiment: Trained on 800K+ trajectories from 9+ robot types
- ✓ Flexible: Supports goal images, language, both, or neither
- ✓ Efficient: 93M parameters (much smaller than RT-2/OpenVLA)
- ✓ Fast fine-tuning: Adapts with 50-200 demonstrations
Architecture¶
Octo uses a transformer-based architecture optimized for robotics:
```mermaid
graph TD
    A[Images<br/>224x224] --> B[ResNet-18<br/>Vision Encoder]
    C[Language/Goal Image] --> D[T5<br/>Language Encoder]
    B --> E[Transformer<br/>8 layers]
    D --> E
    F[Proprio State] --> E
    E --> G[Diffusion Head<br/>Action Chunking]
    G --> H[Action Chunk<br/>4 steps]
```
Key Design Choices¶
1. Lightweight Vision Encoder¶
Unlike VLA models that use massive vision encoders (DINOv2, SigLIP), Octo uses ResNet-18:
```python
import torch.nn as nn
import torchvision


class OctoVisionEncoder(nn.Module):
    """Lightweight vision encoder for Octo."""

    def __init__(self):
        super().__init__()
        # ResNet-18 backbone with ImageNet-pretrained weights (much smaller than ViT-L)
        resnet = torchvision.models.resnet18(
            weights=torchvision.models.ResNet18_Weights.DEFAULT
        )
        # Drop the classification head; keep the conv trunk + global average pooling
        self.resnet = nn.Sequential(*list(resnet.children())[:-1])
        # Project to the transformer dimension
        self.projection = nn.Linear(512, 512)

    def forward(self, images):
        """
        Args:
            images: (B, 3, 224, 224) RGB images
        Returns:
            features: (B, 512) visual features
        """
        features = self.resnet(images)   # (B, 512, 1, 1)
        features = features.flatten(1)   # (B, 512)
        return self.projection(features)
```
**Why ResNet-18?**

- Much faster inference (important for real-time control)
- Sufficient for manipulation tasks (less demanding than general vision)
- Enables deployment on edge devices
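To make the size gap concrete, here is a back-of-the-envelope comparison using approximate published parameter counts (ResNet-18 is about 11.7M parameters; ViT-Large, a common VLA vision backbone, is about 304M). The numbers are rounded, illustrative figures, not measurements from this repo:

```python
# Approximate published parameter counts, in millions (illustrative only)
RESNET18_PARAMS_M = 11.7   # torchvision ResNet-18
VIT_L_PARAMS_M = 304.0     # ViT-Large

ratio = VIT_L_PARAMS_M / RESNET18_PARAMS_M
print(f"ViT-L is ~{ratio:.0f}x larger than ResNet-18")  # ~26x
```

At inference time the gap in FLOPs is similarly large, which is where the real-time control budget is won.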
2. Flexible Task Specification¶
Octo supports multiple ways to specify tasks:
```python
import torch
import torch.nn as nn
from transformers import T5EncoderModel


class OctoTaskEncoder(nn.Module):
    """Encode tasks from language, goal images, or both."""

    def __init__(self):
        super().__init__()
        # Language encoder (T5, 768-d output)
        self.language_encoder = T5EncoderModel.from_pretrained('t5-base')
        # Project language embeddings into the shared 512-d task space
        self.language_proj = nn.Linear(768, 512)
        # Goal image encoder (same architecture as the observation encoder)
        self.goal_encoder = OctoVisionEncoder()
        # Fusion of language and goal-image embeddings
        self.fusion = nn.Linear(512 + 512, 512)
        # Learned embedding for the "no task" case
        self.null_embedding = nn.Parameter(torch.zeros(1, 512))

    def forward(self, language=None, goal_image=None, batch_size=1):
        """
        Flexible task encoding.

        Args:
            language: optional tokenized language instruction
            goal_image: optional goal image batch
            batch_size: batch size to use when neither modality is given
        Returns:
            task_embedding: (B, 512)
        """
        embeddings = []
        if language is not None:
            lang_embed = self.language_encoder(language).last_hidden_state.mean(dim=1)
            embeddings.append(self.language_proj(lang_embed))  # (B, 512)
        if goal_image is not None:
            embeddings.append(self.goal_encoder(goal_image))   # (B, 512)

        if len(embeddings) == 0:
            # No task specification: use the learned null embedding
            return self.null_embedding.expand(batch_size, -1)
        if len(embeddings) == 1:
            return embeddings[0]
        # Both language and goal image: concatenate and fuse
        return self.fusion(torch.cat(embeddings, dim=1))
```
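The branching above reduces to a simple dispatch over which inputs are present. Here is a torch-free sketch of that same logic with string stand-ins for the encoders (the names `encode_task` and the tag strings are illustrative, not Octo API):

```python
def encode_task(language=None, goal_image=None):
    """Mirror the task encoder's dispatch with stand-in encoders."""
    embeddings = []
    if language is not None:
        embeddings.append("lang")       # stand-in for the T5 embedding
    if goal_image is not None:
        embeddings.append("goal")       # stand-in for the goal-image embedding
    if not embeddings:
        return "null-embedding"         # learned placeholder for "no task"
    if len(embeddings) == 1:
        return embeddings[0]            # single-modality embedding
    return "fused(" + "+".join(embeddings) + ")"  # concatenate + fuse

assert encode_task() == "null-embedding"
assert encode_task(language="pick up the block") == "lang"
assert encode_task(goal_image="img") == "goal"
assert encode_task(language="x", goal_image="img") == "fused(lang+goal)"
```

All four conditioning modes (neither, language-only, goal-only, both) hit a distinct branch, which is what lets a single trained policy accept any of them at inference time.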
3. Transformer Backbone¶
Octo uses a standard transformer to process temporal observations:
```python
import torch
import torch.nn as nn


class TransformerBlock(nn.Module):
    """Standard pre-norm transformer block."""

    def __init__(self, dim, heads):
        super().__init__()
        # batch_first=True so inputs are (B, T, dim)
        self.attention = nn.MultiheadAttention(dim, heads, batch_first=True)
        self.feed_forward = nn.Sequential(
            nn.Linear(dim, dim * 4),
            nn.GELU(),
            nn.Linear(dim * 4, dim)
        )
        self.norm1 = nn.LayerNorm(dim)
        self.norm2 = nn.LayerNorm(dim)

    def forward(self, x):
        # Self-attention with residual connection
        x_norm = self.norm1(x)
        attn_out, _ = self.attention(x_norm, x_norm, x_norm)
        x = x + attn_out
        # Feed-forward with residual connection
        x = x + self.feed_forward(self.norm2(x))
        return x


class OctoTransformer(nn.Module):
    """Transformer backbone for Octo."""

    def __init__(self, dim=512, depth=8, heads=8, context_length=32):
        super().__init__()
        self.dim = dim
        self.context_length = context_length
        # Learned positional encoding
        self.pos_embed = nn.Parameter(torch.randn(1, context_length, dim))
        # Transformer layers
        self.layers = nn.ModuleList([
            TransformerBlock(dim=dim, heads=heads)
            for _ in range(depth)
        ])
        self.norm = nn.LayerNorm(dim)

    def forward(self, tokens):
        """
        Args:
            tokens: (B, T, dim) sequence of observation/task tokens
        Returns:
            output: (B, T, dim) processed tokens
        """
        B, T, _ = tokens.shape
        # Add positional encoding, truncated to the sequence length
        tokens = tokens + self.pos_embed[:, :T, :]
        for layer in self.layers:
            tokens = layer(tokens)
        return self.norm(tokens)
```
4. Diffusion Action Head¶
Octo predicts actions using a diffusion model (inspired by Diffusion Policy):
```python
import torch
import torch.nn as nn
import torch.nn.functional as F


class OctoDiffusionHead(nn.Module):
    """Diffusion-based action prediction with chunking."""

    def __init__(self, obs_dim, action_dim, action_horizon=4, diffusion_steps=10):
        super().__init__()
        self.action_dim = action_dim
        self.action_horizon = action_horizon
        self.diffusion_steps = diffusion_steps
        # Conditioning network
        self.obs_encoder = nn.Sequential(
            nn.Linear(obs_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256)
        )
        # Noise prediction network (1-D conditional U-Net, as in Diffusion Policy)
        self.noise_pred_net = ConditionalUNet1D(
            input_dim=action_dim,
            global_cond_dim=256,
            diffusion_step_embed_dim=64,
            down_dims=[128, 256],
            n_groups=8
        )
        # Noise schedule, registered as buffers so .to()/.cuda() moves them
        betas = self.make_beta_schedule()
        self.register_buffer('betas', betas)
        self.register_buffer('alphas', 1 - betas)
        self.register_buffer('alpha_bars', torch.cumprod(1 - betas, dim=0))

    def forward(self, obs_features, actions=None):
        """
        Training: denoise ground-truth actions. Inference: generate from noise.

        Args:
            obs_features: (B, obs_dim) observation encoding
            actions: (B, action_horizon, action_dim) ground truth (training only)
        Returns:
            Training: scalar loss. Inference: predicted action chunk.
        """
        if actions is not None:
            return self.compute_loss(obs_features, actions)
        return self.sample_actions(obs_features)

    def compute_loss(self, obs_features, actions):
        """Diffusion training loss (noise-prediction MSE)."""
        B, T, D = actions.shape
        # Random timestep for each sample
        timesteps = torch.randint(
            0, self.diffusion_steps, (B,),
            device=actions.device
        )
        # Forward diffusion: add noise to the ground-truth actions
        noise = torch.randn_like(actions)
        alpha_bar = self.alpha_bars[timesteps].view(-1, 1, 1)
        noisy_actions = (
            torch.sqrt(alpha_bar) * actions +
            torch.sqrt(1 - alpha_bar) * noise
        )
        # Predict the added noise, conditioned on observations
        obs_cond = self.obs_encoder(obs_features)
        predicted_noise = self.noise_pred_net(
            noisy_actions,
            timesteps,
            global_cond=obs_cond
        )
        return F.mse_loss(predicted_noise, noise)

    @torch.no_grad()
    def sample_actions(self, obs_features):
        """Sample an action chunk via DDPM ancestral sampling."""
        B = obs_features.shape[0]
        # Start from pure Gaussian noise
        actions = torch.randn(
            B, self.action_horizon, self.action_dim,
            device=obs_features.device
        )
        obs_cond = self.obs_encoder(obs_features)
        # Iterative denoising from t = T-1 down to 0
        for t in reversed(range(self.diffusion_steps)):
            timesteps = torch.full((B,), t, device=actions.device)
            predicted_noise = self.noise_pred_net(
                actions,
                timesteps,
                global_cond=obs_cond
            )
            alpha = self.alphas[t]
            alpha_bar = self.alpha_bars[t]
            # No noise is injected at the final step
            noise = torch.randn_like(actions) if t > 0 else torch.zeros_like(actions)
            actions = (1 / torch.sqrt(alpha)) * (
                actions - ((1 - alpha) / torch.sqrt(1 - alpha_bar)) * predicted_noise
            ) + torch.sqrt(self.betas[t]) * noise
        return actions

    def make_beta_schedule(self):
        """Linear beta schedule."""
        return torch.linspace(1e-4, 0.02, self.diffusion_steps)
```
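The schedule from `make_beta_schedule` can be sanity-checked without any framework: the cumulative product ᾱ_t = ∏(1 − β_s), which scales the clean signal in the forward process, should start near 1 (almost no noise at t = 0) and decrease monotonically. A pure-Python sketch of the same arithmetic:

```python
def linear_betas(steps=10, beta_start=1e-4, beta_end=0.02):
    """Linear beta schedule, same endpoints as make_beta_schedule above."""
    if steps == 1:
        return [beta_start]
    step = (beta_end - beta_start) / (steps - 1)
    return [beta_start + i * step for i in range(steps)]

def alpha_bars(betas):
    """Cumulative product of (1 - beta_t)."""
    bars, prod = [], 1.0
    for b in betas:
        prod *= 1.0 - b
        bars.append(prod)
    return bars

bars = alpha_bars(linear_betas(10))
assert abs(bars[0] - (1 - 1e-4)) < 1e-12               # nearly noise-free at t=0
assert all(b2 < b1 for b1, b2 in zip(bars, bars[1:]))  # strictly decreasing
```

With only 10 steps the schedule stays mild, which is part of why Octo-style heads can denoise in real time.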
Training Data¶
Octo is trained on the Open X-Embodiment dataset:
- Size: 800K+ trajectories
- Robots: Franka, UR5, xArm, Google Robot, etc.
- Tasks: Manipulation, navigation, mobile manipulation
- Environments: Labs, kitchens, outdoor
```python
import pickle
from pathlib import Path

import numpy as np
import torch


class OpenXDataset(torch.utils.data.Dataset):
    """Open X-Embodiment dataset loader."""

    def __init__(self, data_dir, robots=None, transform=None):
        self.data_dir = Path(data_dir)
        self.transform = transform
        # Load trajectories, optionally filtered by robot type
        self.trajectories = []
        for traj_file in self.data_dir.glob("**/*.pkl"):
            with open(traj_file, 'rb') as f:
                traj = pickle.load(f)
            if robots is None or traj['robot'] in robots:
                self.trajectories.append(traj)
        print(f"Loaded {len(self.trajectories)} trajectories")

    def __len__(self):
        return len(self.trajectories)

    def __getitem__(self, idx):
        traj = self.trajectories[idx]
        # Sample a random window of up to 32 steps
        T = len(traj['observations'])
        start_idx = np.random.randint(0, max(1, T - 32))
        end_idx = min(start_idx + 32, T)
        observations = traj['observations'][start_idx:end_idx]
        actions = traj['actions'][start_idx:end_idx]
        if self.transform:
            observations = self.transform(observations)
        return {
            'observations': observations,
            'actions': actions,
            'language': traj.get('language', ''),
            'robot': traj['robot']
        }
```
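The random-window logic in `__getitem__` has one subtle edge case: trajectories shorter than the 32-step context. A standalone check of the same index arithmetic, with the random `np.random.randint` call replaced by exhaustive enumeration:

```python
def window_bounds(T, context=32):
    """All (start, end) windows the sampler above can produce for length T."""
    # Matches np.random.randint(0, max(1, T - context))
    starts = range(0, max(1, T - context))
    return [(s, min(s + context, T)) for s in starts]

# Long trajectory: every window is exactly `context` steps
assert all(e - s == 32 for s, e in window_bounds(T=100))
# Short trajectory: a single window covering the whole trajectory
assert window_bounds(T=10) == [(0, 10)]
```

The `max(1, ...)` guard is what keeps short trajectories usable instead of crashing the sampler.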
Training Procedure¶
```python
import torch
from torch.utils.data import DataLoader
from transformers import get_cosine_schedule_with_warmup


def train_octo(config):
    """Train an Octo model."""
    # Initialize model
    model = OctoModel(config).cuda()

    # Load dataset
    dataset = OpenXDataset(config.data_dir)
    dataloader = DataLoader(
        dataset,
        batch_size=config.batch_size,
        num_workers=config.num_workers,
        shuffle=True,
        pin_memory=True
    )

    # Optimizer and learning rate schedule
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=config.learning_rate,
        weight_decay=config.weight_decay
    )
    scheduler = get_cosine_schedule_with_warmup(
        optimizer,
        num_warmup_steps=config.warmup_steps,
        num_training_steps=config.max_steps
    )

    # Training loop
    global_step = 0
    for epoch in range(config.num_epochs):
        for batch in dataloader:
            # Move tensors to GPU
            batch = {k: v.cuda() if isinstance(v, torch.Tensor) else v
                     for k, v in batch.items()}

            # Forward and backward passes with gradient clipping
            loss = model.compute_loss(batch)
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), config.grad_clip)
            optimizer.step()
            scheduler.step()
            global_step += 1

            # Logging
            if global_step % 100 == 0:
                print(f"Step {global_step}: Loss = {loss.item():.4f}, "
                      f"LR = {scheduler.get_last_lr()[0]:.2e}")

            # Checkpointing
            if global_step % 10000 == 0:
                save_checkpoint(model, optimizer, global_step, config.checkpoint_dir)

    return model
```
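The schedule returned by `get_cosine_schedule_with_warmup` ramps linearly to the peak learning rate over the warmup steps, then decays to zero along a half-cosine. This sketch mirrors the shape of that schedule, not the library's exact implementation:

```python
import math

def lr_multiplier(step, warmup_steps, max_steps):
    """Linear warmup to 1.0, then cosine decay to 0.0."""
    if step < warmup_steps:
        return step / max(1, warmup_steps)
    progress = (step - warmup_steps) / max(1, max_steps - warmup_steps)
    return 0.5 * (1.0 + math.cos(math.pi * progress))

assert lr_multiplier(0, 1000, 10000) == 0.0             # start of warmup
assert lr_multiplier(1000, 1000, 10000) == 1.0          # peak at end of warmup
assert abs(lr_multiplier(10000, 1000, 10000)) < 1e-12   # decayed to ~0
```

Warmup matters here because the diffusion loss is noisy early in training; ramping the learning rate avoids destabilizing the pretrained T5 and ResNet weights.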
Fine-Tuning for Your Robot¶
Octo excels at few-shot adaptation:
```python
from octo.model import OctoModel
from peft import LoraConfig, get_peft_model

# Load pre-trained Octo
model = OctoModel.load_pretrained('octo-base')

# Prepare your robot data (50-200 demos)
your_dataset = YourRobotDataset(num_demos=100)

# Fine-tune with LoRA for parameter efficiency
lora_config = LoraConfig(
    r=16,                                  # LoRA rank
    lora_alpha=32,
    target_modules=["q_proj", "v_proj"],   # transformer attention projections
    lora_dropout=0.05
)
model = get_peft_model(model, lora_config)

# Fine-tune
finetune_octo(model, your_dataset, num_epochs=20, lr=1e-4)
```
Fine-Tuning Results¶
| Demos | Success Rate | Training Time |
|---|---|---|
| 0 (zero-shot) | 35% | 0 |
| 50 | 68% | ~30 min |
| 100 | 79% | ~1 hour |
| 200 | 87% | ~2 hours |
Deployment¶
Inference¶
```python
import torch
from octo.model import OctoModel

# Load model
model = OctoModel.load_pretrained('octo-base')
model.eval()
model = model.cuda()


@torch.no_grad()
def predict_action(model, observation, language_instruction):
    """
    Predict an action from the current observation.

    Args:
        observation: dict with 'image' and 'state'
        language_instruction: str
    Returns:
        action: (action_dim,) numpy array
    """
    # Preprocess inputs
    image = preprocess_image(observation['image'])
    state = torch.tensor(observation['state']).float()
    language = model.tokenizer(language_instruction)

    # Move to GPU
    image = image.cuda()
    state = state.cuda()

    # Predict an action chunk: (1, action_horizon, action_dim)
    action_chunk = model.predict(
        image=image.unsqueeze(0),
        state=state.unsqueeze(0),
        language=language
    )

    # Return only the first action of the chunk
    return action_chunk[0, 0].cpu().numpy()


# Control loop
obs = env.reset()
for t in range(100):
    action = predict_action(model, obs, "pick up the red block")
    obs, reward, done, info = env.step(action)
    if done:
        break
```
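The loop above replans every step and discards the rest of the 4-step chunk. A common alternative is receding-horizon execution: run k steps from each chunk before querying the model again, cutting forward passes at the cost of slower reaction to disturbances. A toy sketch of the bookkeeping (the `execute_k` knob is a generic design choice, not an Octo parameter):

```python
def count_model_queries(total_steps, horizon=4, execute_k=2):
    """Forward passes needed when executing k actions per predicted chunk."""
    assert 1 <= execute_k <= horizon
    queries, t = 0, 0
    while t < total_steps:
        queries += 1                          # one forward pass yields `horizon` actions
        t += min(execute_k, total_steps - t)  # execute k of them (fewer at episode end)
    return queries

assert count_model_queries(100, execute_k=1) == 100  # replan every step, as above
assert count_model_queries(100, execute_k=2) == 50   # half the forward passes
assert count_model_queries(100, execute_k=4) == 25
```

Executing more steps per chunk is how chunked policies reach high control rates even when a single forward pass is relatively slow.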
Optimization for Real-Time Control¶
```python
# 1. Compile the model (PyTorch 2.0+)
model = torch.compile(model, mode='reduce-overhead')

# 2. Use half precision
model = model.half()

# 3. Reduce diffusion steps (faster, slight accuracy drop)
model.diffusion_head.diffusion_steps = 5

# 4. Batch inference when controlling multiple robots
actions = model.predict(
    image=images,   # (num_robots, 3, 224, 224)
    state=states,
    language=["pick red block"] * num_robots
)
```
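Reducing diffusion steps cuts latency roughly linearly, since each step is one U-Net forward pass on top of a fixed encoder cost. With purely hypothetical timings (not measurements of Octo):

```python
def control_rate_hz(encoder_ms, per_step_ms, steps):
    """Rough control rate: fixed encoder cost plus one U-Net pass per step."""
    return 1000.0 / (encoder_ms + per_step_ms * steps)

# Illustrative numbers only
rate_10_steps = control_rate_hz(encoder_ms=20, per_step_ms=8, steps=10)
rate_5_steps = control_rate_hz(encoder_ms=20, per_step_ms=8, steps=5)
assert rate_5_steps > rate_10_steps  # fewer denoising steps -> faster control loop
```

The fixed encoder cost means halving the steps does not quite double the rate; profiling your own deployment is the only reliable guide.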
Comparison with Other Models¶
| Model | Params | Open | Multi-Robot | Action Pred | Inference |
|---|---|---|---|---|---|
| RT-1 | 35M | ✗ | ✗ | Categorical | ~3 Hz |
| RT-2 | 5B-562B | ✗ | ✗ | Categorical | ~1 Hz |
| OpenVLA | 7B | ✓ | ✓ | Categorical | ~8 Hz |
| Octo | 93M | ✓ | ✓ | Diffusion | ~10 Hz |
Key Advantages¶
**Compared to OpenVLA/RT-2:**

- ✓ Faster inference (93M vs 7B parameters)
- ✓ Better action precision (diffusion vs categorical)
- ✓ Easier to deploy (smaller model)

**Compared to RT-1:**

- ✓ Cross-embodiment (trained on 9+ robots)
- ✓ Flexible task specification (language + goal images)
- ✓ Better sample efficiency (transfer learning)
Limitations¶
- ✗ Weaker language understanding (vs RT-2/OpenVLA with large LLMs)
- ✗ Lower zero-shot performance (vs RT-2-PaLM-E)
- ✗ Requires more demos for new tasks (vs OpenVLA)
Best Use Cases¶
**Use Octo when:**

- Real-time control is critical (need >5 Hz)
- You have 50-200 demonstrations available
- Deploying on edge devices with limited compute
- Task specification via goal images is sufficient
- You need precise, multi-modal action distributions

**Use OpenVLA/RT-2 when:**

- Zero-shot generalization is critical
- Complex language understanding is needed
- Slower inference is tolerable
- GPUs are available for deployment
Resources¶
- Code: https://github.com/octo-models/octo
- Models: https://huggingface.co/rail-berkeley/octo
- Paper & project page: https://octo-models.github.io
- Colab Tutorial: https://colab.research.google.com/github/octo-models/octo/blob/main/examples/octo_example.ipynb
Next Steps¶
- OpenVLA - Compare with OpenVLA approach
- Diffusion Policies - Deep dive into diffusion for actions
- Fine-tuning - Adapt Octo to your robot
- Deployment - Deploy on edge devices