Skip to content

Data Quality for Robot Learning

Data quality is the single most important factor determining the success of data-driven robot learning.

Why Data Quality Matters

Impact on Performance:

- 100 high-quality demos often outperform 1,000 low-quality demos
- Quality issues compound through training
- Garbage in = garbage out (especially for imitation learning)

Common Quality Issues:

- Incomplete task executions
- Sensor noise and artifacts
- Synchronization errors between modalities
- Annotation errors
- Distribution shift
- Duplicate or near-duplicate data

Quality Metrics

1. Task Success Rate

Most fundamental metric - did the demonstration complete the task?

class TaskSuccessChecker:
    """Check if demonstrations successfully complete tasks.

    A demonstration (``demo``) is assumed to be a sequence of step dicts,
    each holding at least an ``'observation'`` dict — confirm against the
    collection pipeline.
    """

    def __init__(self, task_spec):
        # task_spec: dict with a 'task' name plus task-specific keys such as
        # 'target_pos', 'target_radius', 'open_threshold', 'target_area'.
        self.task_spec = task_spec

    def check_success(self, demo):
        """Dispatch to the task-specific success check.

        Returns a dict containing at least a boolean 'success' entry.
        Raises NotImplementedError for unrecognized task names.
        """
        final_state = demo[-1]['observation']

        # Task-specific checks
        if self.task_spec['task'] == 'pick_and_place':
            return self.check_pick_and_place(demo, final_state)
        elif self.task_spec['task'] == 'drawer_opening':
            return self.check_drawer_opening(demo, final_state)
        elif self.task_spec['task'] == 'wiping':
            return self.check_wiping(demo, final_state)
        else:
            raise NotImplementedError(f"Task {self.task_spec['task']} not implemented")

    def check_pick_and_place(self, demo, final_state):
        """Check pick-and-place success: grasp, placement, and release."""
        # 1. Object grasped at some point (gripper force spiked above 0.5)
        grasp_detected = any(
            step['observation'].get('gripper_force', 0) > 0.5
            for step in demo
        )

        # 2. Object within target radius at the end of the demo
        if 'object_pos' in final_state and 'target_pos' in self.task_spec:
            obj_pos = np.array(final_state['object_pos'])
            target_pos = np.array(self.task_spec['target_pos'])
            distance = np.linalg.norm(obj_pos - target_pos)

            in_target = distance < self.task_spec.get('target_radius', 0.05)
        else:
            # Missing position info counts as failure, not as "unknown".
            in_target = False

        # 3. Gripper open at end (object released)
        gripper_open = final_state.get('gripper_state', 0) < 0.1

        success = grasp_detected and in_target and gripper_open

        return {
            'success': success,
            'grasp_detected': grasp_detected,
            'in_target': in_target,
            'gripper_open': gripper_open
        }

    def check_drawer_opening(self, demo, final_state):
        """Check drawer-opening success via net drawer displacement."""
        initial_state = demo[0]['observation']

        initial_drawer_pos = initial_state.get('drawer_position', 0)
        final_drawer_pos = final_state.get('drawer_position', 0)

        # Drawer must have opened by at least the configured threshold.
        opened = (final_drawer_pos - initial_drawer_pos) > self.task_spec.get('open_threshold', 0.1)

        return {
            'success': opened,
            'drawer_displacement': final_drawer_pos - initial_drawer_pos
        }

    def check_wiping(self, demo, final_state):
        """Check wiping/cleaning success via coverage of the target area."""
        # BUG FIX: the original condition was `'ee_trajectory' in demo`, but
        # `demo` is a list of step dicts, so that membership test was always
        # False and wiping demos could never be scored.  Detect the
        # end-effector position in the per-step observations instead.
        if demo and 'ee_pos' in demo[0]['observation']:
            trajectory = np.array([
                step['observation']['ee_pos'][:2]  # x, y only
                for step in demo
            ])

            coverage = self.compute_area_coverage(
                trajectory,
                self.task_spec['target_area']
            )

            success = coverage > self.task_spec.get('coverage_threshold', 0.8)

            return {
                'success': success,
                'coverage': coverage
            }

        return {'success': False, 'coverage': 0.0}

    def compute_area_coverage(self, trajectory, target_area):
        """Return the fraction of `target_area` grid cells touched by `trajectory`.

        `target_area` is an (x_min, x_max, y_min, y_max) rectangle,
        discretized into a fixed 50x50 occupancy grid.
        """
        grid_size = 50
        x_min, x_max, y_min, y_max = target_area
        x_bins = np.linspace(x_min, x_max, grid_size)
        y_bins = np.linspace(y_min, y_max, grid_size)

        covered = np.zeros((grid_size, grid_size), dtype=bool)

        # Mark the grid cell under each trajectory sample.
        for point in trajectory:
            x_idx = np.digitize(point[0], x_bins) - 1
            y_idx = np.digitize(point[1], y_bins) - 1

            # Samples outside the target rectangle are simply ignored.
            if 0 <= x_idx < grid_size and 0 <= y_idx < grid_size:
                covered[x_idx, y_idx] = True

        return covered.mean()

2. Trajectory Smoothness

Smooth trajectories indicate confident, controlled execution:

def compute_trajectory_metrics(demo):
    """Compute trajectory quality metrics for one demonstration.

    Derivatives are taken over the per-step action sequence (a proxy for
    the commanded trajectory).  Returns a flat dict of scalar metrics.
    """
    actions = np.array([step['action'] for step in demo])

    metrics = {}

    def _derivative_stats(order, name):
        # BUG FIX: np.diff with n >= len(actions) yields an empty array, and
        # .max() on an empty array raises ValueError, so demos shorter than
        # four steps used to crash here.  Report 0.0 for derivatives that
        # cannot be computed.
        deriv = np.diff(actions, n=order, axis=0)
        if len(deriv) > 0:
            norms = np.linalg.norm(deriv, axis=1)
            metrics[f'avg_{name}'] = norms.mean()
            metrics[f'max_{name}'] = norms.max()
        else:
            metrics[f'avg_{name}'] = 0.0
            metrics[f'max_{name}'] = 0.0

    _derivative_stats(1, 'velocity')       # first derivative
    _derivative_stats(2, 'acceleration')   # second derivative
    _derivative_stats(3, 'jerk')           # third derivative (smoothness)

    # Spectral arc length (frequency-domain smoothness indicator)
    metrics['spectral_arc_length'] = compute_spectral_arc_length(actions)

    # Path efficiency of the end-effector, when positions are recorded
    if 'ee_pos' in demo[0]['observation']:
        ee_trajectory = np.array([
            step['observation']['ee_pos']
            for step in demo
        ])
        metrics['path_efficiency'] = compute_path_efficiency(ee_trajectory)

    return metrics

def compute_spectral_arc_length(trajectory):
    """Compute spectral arc length (lower = smoother)."""
    # Magnitude spectrum of the trajectory along the time axis.
    spectrum = np.abs(np.fft.fft(trajectory, axis=0))

    # Accumulate the arc-length contribution of each consecutive pair of
    # spectral samples; the mean averages over trajectory dimensions.
    total = 0
    for prev_mag, curr_mag in zip(spectrum[:-1], spectrum[1:]):
        total += np.sqrt(1 + (curr_mag - prev_mag) ** 2).mean()

    return total

def compute_path_efficiency(trajectory):
    """Ratio of direct distance to actual path length (1.0 = straight line)."""
    # Straight-line displacement between the two endpoints.
    direct_distance = np.linalg.norm(trajectory[-1] - trajectory[0])

    # Total arc length: sum the norms of all consecutive segments.
    segment_lengths = np.linalg.norm(np.diff(trajectory, axis=0), axis=1)
    path_length = segment_lengths.sum()

    # Small epsilon guards against division by zero on degenerate paths.
    return direct_distance / (path_length + 1e-6)

3. Multi-Modal Synchronization

Critical for vision-language-action models:

class SynchronizationChecker:
    """Check synchronization across modalities.

    Timestamps are assumed to be in milliseconds at a nominal 30 Hz capture
    rate — TODO confirm units against the recording stack.
    """

    def __init__(self, tolerance_ms=50):
        # Maximum allowed offset (ms) between a modality sample and its
        # nearest reference ('state') sample.
        self.tolerance_ms = tolerance_ms

    def check_demo_sync(self, demo):
        """Verify all modalities are properly synchronized.

        Expects demo['timestamps'] to map modality name -> sequence of
        timestamps.  Returns {'synchronized': bool, 'issues': [str, ...]}.
        A demo with no 'timestamps' key is reported as synchronized.
        """
        issues = []

        if 'timestamps' in demo:
            timestamps = demo['timestamps']

            # Check for missing timestamps
            for modality in ['image', 'state', 'action']:
                if modality not in timestamps:
                    issues.append(f"Missing timestamps for {modality}")

            # Check timestamp alignment (all modalities same sample count)
            if len(set(len(ts) for ts in timestamps.values())) > 1:
                issues.append("Modalities have different numbers of timestamps")

            # Check for large gaps (more than twice the expected frame period)
            for modality, ts in timestamps.items():
                gaps = np.diff(ts)
                expected_gap = 1000 / 30  # 30 Hz in ms

                large_gaps = gaps > expected_gap * 2
                if np.any(large_gaps):
                    gap_indices = np.where(large_gaps)[0]
                    issues.append(
                        f"{modality}: Found {len(gap_indices)} large timestamp gaps "
                        f"(max: {gaps.max():.1f}ms)"
                    )

            # Check relative synchronization.
            # BUG FIX: compute_sync_errors indexes timestamps['state']; the
            # original called it unconditionally and raised KeyError whenever
            # the reference modality was missing (which was already reported
            # as an issue above).
            if 'state' in timestamps:
                sync_errors = self.compute_sync_errors(timestamps)
                if sync_errors.size and np.any(sync_errors > self.tolerance_ms):
                    issues.append(
                        f"Synchronization errors up to {sync_errors.max():.1f}ms detected"
                    )

        return {
            'synchronized': len(issues) == 0,
            'issues': issues
        }

    def compute_sync_errors(self, timestamps):
        """Compute per-sample offsets of every modality vs. the 'state' clock."""
        # Use state as the reference timeline; coerce to ndarray so the
        # vectorized nearest-neighbour search also works on plain lists.
        ref_timestamps = np.asarray(timestamps['state'])
        errors = []

        for modality, ts in timestamps.items():
            if modality == 'state':
                continue

            # Distance to the nearest reference timestamp for each sample.
            for t in ts:
                nearest_ref = ref_timestamps[np.argmin(np.abs(ref_timestamps - t))]
                error = abs(t - nearest_ref)
                errors.append(error)

        return np.array(errors)

    def resync_demo(self, demo):
        """Resynchronize a demonstration by interpolating onto a 30 Hz timeline.

        NOTE(review): the returned dict carries 'timeline' plus the resampled
        modalities but no per-modality 'timestamps' — confirm downstream
        consumers expect that shape.
        """
        # Create the common timeline span from all observed timestamps.
        all_timestamps = []
        for ts in demo['timestamps'].values():
            all_timestamps.extend(ts)

        # Common timeline at 30 Hz
        start_time = min(all_timestamps)
        end_time = max(all_timestamps)
        common_timeline = np.arange(start_time, end_time, 1000/30)

        # Interpolate each modality to the common timeline.
        resynced_demo = {'timeline': common_timeline}

        for modality in ['image', 'state', 'action']:
            original_ts = demo['timestamps'][modality]
            original_data = demo[modality]

            resynced_data = self.interpolate_modality(
                original_ts,
                original_data,
                common_timeline
            )

            resynced_demo[modality] = resynced_data

        return resynced_demo

    def interpolate_modality(self, original_ts, original_data, target_ts):
        """Interpolate modality data to target timestamps.

        Numeric arrays are linearly interpolated (with extrapolation at the
        edges); non-array data such as images is resampled by nearest
        neighbour to avoid blending frames.
        """
        from scipy.interpolate import interp1d

        if isinstance(original_data[0], np.ndarray):
            # Numerical data (states, actions)
            data_array = np.array(original_data)

            # Interpolate each dimension independently.
            interpolated = []
            for dim in range(data_array.shape[1]):
                interp_func = interp1d(
                    original_ts,
                    data_array[:, dim],
                    kind='linear',
                    fill_value='extrapolate'
                )
                interpolated.append(interp_func(target_ts))

            return np.column_stack(interpolated)
        else:
            # Images - use nearest neighbor
            indices = [
                np.argmin(np.abs(original_ts - t))
                for t in target_ts
            ]
            return [original_data[i] for i in indices]

4. Sensor Data Quality

Detect and handle sensor artifacts:

class SensorQualityChecker:
    """Check for sensor data quality issues.

    Image checks rely on OpenCV (``cv2``), which is expected to be imported
    at file level; the state checks only need numpy/scipy.
    """

    def __init__(self):
        # Names of the checks applied per modality (informational only).
        self.image_checks = ['motion_blur', 'overexposure', 'occlusion']
        self.state_checks = ['outliers', 'dropouts', 'noise']

    def check_image_quality(self, images):
        """Run all per-frame vision checks and aggregate statistics."""
        issues = []

        for i, img in enumerate(images):
            # Motion blur detection
            if self.detect_motion_blur(img):
                issues.append(f"Frame {i}: Motion blur detected")

            # Exposure issues
            if self.detect_overexposure(img):
                issues.append(f"Frame {i}: Overexposed")
            if self.detect_underexposure(img):
                issues.append(f"Frame {i}: Underexposed")

            # Occlusion detection
            if self.detect_occlusion(img):
                issues.append(f"Frame {i}: Possible occlusion")

        return {
            'quality_ok': len(issues) == 0,
            'issues': issues,
            'blur_ratio': self.compute_blur_ratio(images),
            'avg_brightness': self.compute_avg_brightness(images)
        }

    def detect_motion_blur(self, image):
        """Detect motion blur via the variance of the Laplacian."""
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)
        laplacian = cv2.Laplacian(gray, cv2.CV_64F)
        variance = laplacian.var()

        # Sharp images typically have Laplacian variance > 100; below 50
        # is treated as blurry.
        return variance < 50

    def detect_overexposure(self, image):
        """Flag frames where more than 10% of pixels are near saturation."""
        # Count pixels near the maximum value (assumes 8-bit pixel range —
        # TODO confirm).
        overexposed_pixels = (image > 250).sum()
        total_pixels = image.size

        return (overexposed_pixels / total_pixels) > 0.1

    def detect_underexposure(self, image):
        """Flag frames where more than 10% of pixels are near black."""
        underexposed_pixels = (image < 10).sum()
        total_pixels = image.size

        return (underexposed_pixels / total_pixels) > 0.1

    def detect_occlusion(self, image, template=None):
        """Heuristically detect the robot/gripper occluding the view.

        Assumes the robot is darker than the environment and usually enters
        from the bottom of the frame; `template` is currently unused.
        """
        gray = cv2.cvtColor(image, cv2.COLOR_RGB2GRAY)

        # Bottom 30% of the image, where the gripper usually appears.
        bottom_region = gray[int(gray.shape[0]*0.7):, :]

        # A large dark region suggests occlusion.
        dark_pixels = (bottom_region < 50).sum()
        return (dark_pixels / bottom_region.size) > 0.3

    def compute_blur_ratio(self, images):
        """Fraction of frames flagged as blurry."""
        blurry_count = sum(self.detect_motion_blur(img) for img in images)
        return blurry_count / len(images)

    def compute_avg_brightness(self, images):
        """Mean pixel intensity across the whole sequence."""
        brightnesses = [img.mean() for img in images]
        return np.mean(brightnesses)

    def check_state_quality(self, states):
        """Check proprioceptive state quality (outliers, dropouts, noise).

        `states` must be convertible to a 2-D (T, D) array — NOTE(review):
        confirm callers never pass scalar per-step states.
        """
        states_array = np.array(states)
        issues = []

        # Detect outliers
        outliers = self.detect_outliers(states_array)
        if len(outliers) > 0:
            issues.append(f"Found {len(outliers)} outlier states at indices {outliers[:10]}")

        # Detect dropouts (repeated values)
        dropouts = self.detect_dropouts(states_array)
        if len(dropouts) > 0:
            issues.append(f"Found {len(dropouts)} potential sensor dropouts")

        # Check noise level
        noise_level = self.estimate_noise_level(states_array)
        if noise_level > 0.05:
            issues.append(f"High noise level: {noise_level:.4f}")

        return {
            'quality_ok': len(issues) == 0,
            'issues': issues,
            'outlier_count': len(outliers),
            'dropout_count': len(dropouts),
            'noise_level': noise_level
        }

    def detect_outliers(self, states, threshold=3.0):
        """Return indices of states whose z-score exceeds `threshold` in any dimension."""
        mean = states.mean(axis=0)
        std = states.std(axis=0)

        # Epsilon avoids division by zero for constant dimensions.
        z_scores = np.abs((states - mean) / (std + 1e-6))

        # A state is an outlier if ANY of its dimensions is extreme.
        outlier_indices = np.where(np.any(z_scores > threshold, axis=1))[0]

        return outlier_indices

    def detect_dropouts(self, states, threshold=5):
        """Return start indices of runs of `threshold` identical consecutive states."""
        dropout_indices = []

        for i in range(len(states) - threshold):
            # Identical consecutive samples suggest the sensor stopped
            # updating (dropout) rather than a genuinely static signal.
            window = states[i:i+threshold]
            if np.all(window == window[0]):
                dropout_indices.append(i)

        return dropout_indices

    def estimate_noise_level(self, states):
        """Estimate sensor noise as the std of the high-pass-filtered signal."""
        from scipy.signal import butter, filtfilt

        b, a = butter(4, 0.1, btype='high')

        # BUG FIX: filtfilt requires the signal to be longer than its default
        # padding length (3 * max(len(a), len(b))); very short sequences used
        # to raise ValueError.  Report zero noise for them instead.
        padlen = 3 * max(len(a), len(b))
        if states.shape[0] <= padlen:
            return 0.0

        noise_levels = []
        for dim in range(states.shape[1]):
            filtered = filtfilt(b, a, states[:, dim])
            noise_levels.append(np.std(filtered))

        return np.mean(noise_levels)

Dataset-Level Quality

Distribution Analysis

class DatasetQualityAnalyzer:
    """Analyze quality of an entire dataset of demonstrations.

    Assumes each demonstration is a sequence of step dicts with
    observation['state'] and 'action' entries; demo-level metadata (task,
    operator, success) is read via .get() where available — NOTE(review):
    confirm the exact demo container type against the loader, since some
    accessors below treat demos as dicts and others as step lists.
    """

    def __init__(self, dataset):
        self.dataset = dataset

    def analyze_distribution(self):
        """Produce a distribution/diversity/balance report for the dataset."""
        report = {}

        # State space coverage
        all_states = self.collect_all_states()
        report['state_coverage'] = self.compute_coverage(all_states)

        # Action distribution
        all_actions = self.collect_all_actions()
        report['action_distribution'] = self.compute_action_stats(all_actions)

        # Task distribution
        report['task_distribution'] = self.compute_task_distribution()

        # Diversity metrics
        report['diversity'] = self.compute_diversity()

        # Balance metrics
        report['balance'] = self.check_balance()

        return report

    def collect_all_states(self):
        """Stack every per-step state in the dataset into one (N, D) array.

        BUG FIX: this method (and collect_all_actions) was called by
        analyze_distribution but never defined, so the distribution report
        always raised AttributeError.
        """
        return np.concatenate([
            np.array([step['observation']['state'] for step in demo])
            for demo in self.dataset
        ], axis=0)

    def collect_all_actions(self):
        """Stack every per-step action in the dataset into one (N, A) array."""
        return np.concatenate([
            np.array([step['action'] for step in demo])
            for demo in self.dataset
        ], axis=0)

    def compute_coverage(self, states):
        """Compute per-dimension state-space occupancy over a fixed histogram."""
        # Discretize each state dimension into a fixed number of bins and
        # count how many bins are occupied at least once.
        bins_per_dim = 20
        coverage = []

        for dim in range(states.shape[1]):
            hist, _ = np.histogram(states[:, dim], bins=bins_per_dim)
            occupied = (hist > 0).sum()
            coverage.append(occupied / bins_per_dim)

        return {
            'per_dimension': coverage,
            'average': np.mean(coverage),
            'min': np.min(coverage)
        }

    def compute_action_stats(self, actions):
        """Compute action distribution statistics (JSON-friendly lists)."""
        return {
            'mean': actions.mean(axis=0).tolist(),
            'std': actions.std(axis=0).tolist(),
            'min': actions.min(axis=0).tolist(),
            'max': actions.max(axis=0).tolist(),
            'saturation_rate': self.compute_saturation_rate(actions)
        }

    def compute_saturation_rate(self, actions):
        """Fraction of steps where any action dimension is at its limit.

        Assumes actions are normalized to [-1, 1] — TODO confirm.
        """
        saturated = (np.abs(actions) > 0.95).any(axis=1)
        return saturated.mean()

    def compute_diversity(self):
        """Measure dataset diversity via initial states and trajectory spread."""
        # Initial state diversity: log-determinant of the covariance of the
        # first state of every demo (requires at least two demos).
        initial_states = np.array([
            demo[0]['observation']['state']
            for demo in self.dataset
        ])

        cov = np.cov(initial_states.T)
        sign, logdet = np.linalg.slogdet(cov)
        diversity_score = logdet if sign > 0 else -np.inf

        # Trajectory diversity: average pairwise distance over a random
        # sample of demos.  NOTE(review): unseeded np.random makes this
        # non-deterministic across runs.
        if len(self.dataset) > 10:
            sample_indices = np.random.choice(len(self.dataset), 10, replace=False)
            sample_demos = [self.dataset[i] for i in sample_indices]

            pairwise_distances = []
            for i in range(len(sample_demos)):
                for j in range(i+1, len(sample_demos)):
                    dist = self.compute_trajectory_distance(
                        sample_demos[i],
                        sample_demos[j]
                    )
                    pairwise_distances.append(dist)

            trajectory_diversity = np.mean(pairwise_distances)
        else:
            trajectory_diversity = None

        return {
            'initial_state_diversity': diversity_score,
            'trajectory_diversity': trajectory_diversity
        }

    def check_balance(self):
        """Check task / success / operator balance across the dataset."""
        # Task balance
        task_counts = self.compute_task_distribution()

        # Success/failure balance (missing flag counts as success —
        # NOTE(review): confirm that default is intended)
        success_count = sum(1 for demo in self.dataset if demo.get('success', True))
        failure_count = len(self.dataset) - success_count

        # Operator balance (if available)
        operator_counts = {}
        for demo in self.dataset:
            operator = demo.get('metadata', {}).get('operator_id', 'unknown')
            operator_counts[operator] = operator_counts.get(operator, 0) + 1

        return {
            'task_balance': task_counts,
            'success_rate': success_count / len(self.dataset),
            'operator_balance': operator_counts
        }

    def compute_task_distribution(self):
        """Count demonstrations per task label."""
        task_counts = {}
        for demo in self.dataset:
            task = demo.get('metadata', {}).get('task', 'unknown')
            task_counts[task] = task_counts.get(task, 0) + 1
        return task_counts

    def detect_duplicates(self, similarity_threshold=0.98):
        """Return (i, j, similarity) triples for near-duplicate demo pairs.

        O(n^2) pairwise scan — expensive on large datasets.
        """
        duplicates = []

        for i in range(len(self.dataset)):
            for j in range(i+1, len(self.dataset)):
                similarity = self.compute_demo_similarity(
                    self.dataset[i],
                    self.dataset[j]
                )

                if similarity > similarity_threshold:
                    duplicates.append((i, j, similarity))

        return duplicates

    def compute_demo_similarity(self, demo1, demo2):
        """Compute a [0, 1] similarity between two demos' state trajectories."""
        states1 = np.array([s['observation']['state'] for s in demo1])
        states2 = np.array([s['observation']['state'] for s in demo2])

        # Resample both trajectories to a common length when they differ.
        if len(states1) != len(states2):
            from scipy.interpolate import interp1d
            min_len = min(len(states1), len(states2))

            t1 = np.linspace(0, 1, len(states1))
            t2 = np.linspace(0, 1, len(states2))
            t_common = np.linspace(0, 1, min_len)

            states1_resampled = np.array([
                interp1d(t1, states1[:, dim])(t_common)
                for dim in range(states1.shape[1])
            ]).T

            states2_resampled = np.array([
                interp1d(t2, states2[:, dim])(t_common)
                for dim in range(states2.shape[1])
            ]).T

        else:
            states1_resampled = states1
            states2_resampled = states2

        # Normalized distance, turned into a similarity score.
        distance = np.linalg.norm(states1_resampled - states2_resampled)
        max_distance = np.linalg.norm(states1_resampled) + np.linalg.norm(states2_resampled)

        # BUG FIX: guard against division by zero when both trajectories are
        # all-zero (they are trivially identical).
        if max_distance == 0:
            return 1.0

        similarity = 1.0 - (distance / max_distance)

        return similarity

Data Cleaning Pipeline

class DataCleaningPipeline:
    """Automated data cleaning pipeline: filter bad demos, fix salvageable ones."""

    def __init__(self, task_spec=None):
        # BUG FIX: TaskSuccessChecker requires a task_spec argument; the
        # original called TaskSuccessChecker() with no arguments, so every
        # pipeline construction raised TypeError.  Accept an optional spec
        # (the default keeps the old no-argument call signature working).
        self.success_checker = TaskSuccessChecker(task_spec)
        self.sync_checker = SynchronizationChecker()
        self.sensor_checker = SensorQualityChecker()

    def clean_dataset(self, dataset, config):
        """Clean and filter a dataset, printing a summary report.

        `config` is a dict of thresholds/flags consumed by should_keep_demo
        and apply_fixes.  Returns the list of kept (possibly fixed) demos.
        """
        print(f"Cleaning dataset ({len(dataset)} demonstrations)...")

        cleaned_dataset = []
        removal_reasons = []

        for i, demo in enumerate(dataset):
            keep, reason = self.should_keep_demo(demo, config)

            if keep:
                # Apply automatic fixes to the demos we keep.
                demo_cleaned = self.apply_fixes(demo, config)
                cleaned_dataset.append(demo_cleaned)
            else:
                removal_reasons.append((i, reason))

        # Report
        print(f"\nCleaning complete:")
        print(f"  Kept: {len(cleaned_dataset)} / {len(dataset)}")
        print(f"  Removed: {len(removal_reasons)}")

        if removal_reasons:
            print("\nRemoval reasons:")
            reason_counts = {}
            for _, reason in removal_reasons:
                reason_counts[reason] = reason_counts.get(reason, 0) + 1

            for reason, count in sorted(reason_counts.items(), key=lambda x: -x[1]):
                print(f"  {reason}: {count}")

        return cleaned_dataset

    def should_keep_demo(self, demo, config):
        """Decide if a demonstration should be kept.

        Returns (keep: bool, reason: str | None); `reason` names the first
        failed check when keep is False.
        """
        # Check task success
        if config.get('require_success', True):
            if not demo.get('success', False):
                return False, "task_failure"

        # Check trajectory length
        min_length = config.get('min_length', 10)
        max_length = config.get('max_length', 1000)

        if len(demo) < min_length:
            return False, "too_short"
        if len(demo) > max_length:
            return False, "too_long"

        # Check trajectory smoothness
        if config.get('check_smoothness', True):
            metrics = compute_trajectory_metrics(demo)
            if metrics['avg_jerk'] > config.get('max_jerk', 1.0):
                return False, "too_jerky"

        # Check synchronization
        if config.get('check_sync', True):
            sync_result = self.sync_checker.check_demo_sync(demo)
            if not sync_result['synchronized']:
                if config.get('auto_resync', False):
                    # Kept: apply_fixes will resynchronize it.
                    pass
                else:
                    return False, "sync_error"

        # Check sensor quality
        if 'images' in demo and config.get('check_image_quality', True):
            image_quality = self.sensor_checker.check_image_quality(demo['images'])
            if image_quality['blur_ratio'] > config.get('max_blur_ratio', 0.3):
                return False, "excessive_blur"

        return True, None

    def apply_fixes(self, demo, config):
        """Apply automatic fixes to a demonstration (works on a deep copy)."""
        import copy  # local import keeps the method self-contained

        demo_fixed = copy.deepcopy(demo)

        # Resynchronize if needed
        if config.get('auto_resync', False):
            sync_result = self.sync_checker.check_demo_sync(demo_fixed)
            if not sync_result['synchronized']:
                demo_fixed = self.sync_checker.resync_demo(demo_fixed)

        # Smooth trajectory if needed
        if config.get('smooth_trajectories', False):
            demo_fixed = self.smooth_trajectory(demo_fixed, config.get('smoothing_window', 3))

        # Remove outliers
        if config.get('remove_outliers', False):
            demo_fixed = self.remove_outlier_steps(demo_fixed)

        return demo_fixed

    def smooth_trajectory(self, demo, window_size=3):
        """Apply a Savitzky-Golay filter to the demo's action sequence in place."""
        from scipy.signal import savgol_filter

        actions = np.array([step['action'] for step in demo])

        # Filter only when there are enough samples for the window.
        if len(actions) > window_size:
            smoothed = savgol_filter(actions, window_size, 2, axis=0)

            # Write the smoothed actions back into the steps.
            for i, step in enumerate(demo):
                step['action'] = smoothed[i]

        return demo

    def remove_outlier_steps(self, demo):
        """Return a copy of the demo with statistical outlier steps removed."""
        states = np.array([s['observation']['state'] for s in demo])

        # Reuse the sensor checker's z-score outlier detection.
        outlier_indices = self.sensor_checker.detect_outliers(states)

        demo_cleaned = [
            step for i, step in enumerate(demo)
            if i not in outlier_indices
        ]

        return demo_cleaned

Quality Monitoring Dashboard

def generate_quality_report(dataset):
    """Print a comprehensive, human-readable quality report for `dataset`."""
    analyzer = DatasetQualityAnalyzer(dataset)

    print("="*60)
    print("DATASET QUALITY REPORT")
    print("="*60)

    # Basic stats
    print(f"\nDataset Size: {len(dataset)} demonstrations")

    # Success rate (demos without a 'success' flag count as successes —
    # NOTE(review): confirm that default is intended)
    success_count = sum(1 for d in dataset if d.get('success', True))
    print(f"Success Rate: {success_count/len(dataset)*100:.1f}%")

    # Distribution analysis
    dist_report = analyzer.analyze_distribution()

    print(f"\nState Space Coverage:")
    print(f"  Average: {dist_report['state_coverage']['average']*100:.1f}%")
    print(f"  Min dimension: {dist_report['state_coverage']['min']*100:.1f}%")

    print(f"\nAction Statistics:")
    action_stats = dist_report['action_distribution']
    print(f"  Mean: {np.array(action_stats['mean'])}")
    print(f"  Std: {np.array(action_stats['std'])}")
    print(f"  Saturation rate: {action_stats['saturation_rate']*100:.1f}%")

    print(f"\nDiversity Metrics:")
    diversity = dist_report['diversity']
    print(f"  Initial state diversity: {diversity['initial_state_diversity']:.2f}")
    # BUG FIX: a plain truthiness test skipped a legitimate trajectory
    # diversity of exactly 0.0; only a missing (None) value should be skipped.
    if diversity['trajectory_diversity'] is not None:
        print(f"  Trajectory diversity: {diversity['trajectory_diversity']:.4f}")

    print(f"\nBalance:")
    balance = dist_report['balance']
    print(f"  Task distribution: {balance['task_balance']}")
    print(f"  Operator distribution: {balance['operator_balance']}")

    # Check for duplicates (O(n^2) pairwise scan — slow on large datasets)
    duplicates = analyzer.detect_duplicates()
    if duplicates:
        print(f"\n⚠ Found {len(duplicates)} near-duplicate pairs")

    print("="*60)

Best Practices

DO:

✓ Validate every demonstration immediately after collection
✓ Monitor quality metrics throughout collection
✓ Remove low-quality demonstrations early
✓ Check multi-modal synchronization
✓ Analyze dataset-level distributions
✓ Maintain a high success rate (>90% for imitation learning)
✓ Ensure trajectory smoothness
✓ Balance task and condition distributions

DON'T:

✗ Accept demonstrations with failed tasks (for imitation learning)
✗ Ignore sensor artifacts and noise
✗ Skip synchronization checks for multi-modal data
✗ Keep near-duplicate demonstrations
✗ Overlook distribution imbalances
✗ Forget to monitor data quality metrics

Next Steps