Jump to content
View in the app

A better way to browse. Learn more.

T.M.I IThub

A full-screen app on your home screen with push notifications, badges and more.

To install this app on iOS and iPadOS
  1. Tap the Share icon in Safari
  2. Scroll the menu and tap Add to Home Screen.
  3. Tap Add in the top-right corner.
To install this app on Android
  1. Tap the 3-dot menu (⋮) in the top-right corner of the browser.
  2. Tap Add to Home screen or Install app.
  3. Confirm by tapping Install.

Complete Tutorial: Deep RL for Autonomous Obstacle Avoidance (Lidar + ROS 2 + Isaac Sim)

(0 reviews)

This comprehensive guide walks you through building a drone obstacle avoidance system from scratch, combining Reinforcement Learning theory, ROS 2 infrastructure, and Isaac Sim physics simulation. By the end, you'll have a trained "brain" that can be deployed to real hardware like NVIDIA Jetson.

1. Technology Stack

  • OS: Ubuntu 22.04 LTS (recommended for stability)

  • Communication: ROS 2 Humble (LTS)

  • Simulator: NVIDIA Isaac Sim 2023.1.1 or later (Omniverse)

  • RL Libraries: Stable Baselines3 (PPO), PyTorch 2.0+, Gymnasium

  • Sensor: 3D LiDAR (simulated Velodyne VLP-16 in Isaac Sim)

  • Python: 3.10+

2. Environment Setup

2.1 Install ROS 2 Humble

# Add ROS 2 repository
sudo apt update && sudo apt install software-properties-common
sudo add-apt-repository universe
sudo apt update && sudo apt install curl -y
sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.asc | sudo apt-key add -
sudo sh -c 'echo "deb [arch=$(dpkg --print-architecture)] http://packages.ros.org/ros2/ubuntu $(lsb_release -cs) main" > /etc/apt/sources.list.d/ros2-latest.list'

# Install ROS 2 Humble
sudo apt update
sudo apt install ros-humble-desktop ros-humble-ros2bag ros-humble-rosbag2-storage-default-plugins

# Source ROS 2
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
source ~/.bashrc

2.2 Install Isaac Sim

  1. Download and install NVIDIA Omniverse Launcher

  2. From the launcher, install Isaac Sim 2023.1.1 or later

  3. Enable the ROS 2 bridge extension:

    • Open Isaac Sim

    • Go to Window → Extensions

    • Search for omni.isaac.ros2_bridge

    • Enable it and allow dependencies to install

2.3 Install Python Dependencies

# Create a virtual environment (recommended)
python3 -m venv ~/rl_drone_env
source ~/rl_drone_env/bin/activate

# Install core dependencies
pip install --upgrade pip
pip install gymnasium==0.29.1
pip install stable-baselines3[extra]==2.2.1
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
pip install tensorboard
pip install numpy scipy
pip install open3d  # For point cloud visualization/processing

# Install ROS 2 Python bindings (if not already available)
pip install rclpy sensor-msgs-py geometry-msgs-py

3. Isaac Sim Scene Setup

3.1 Create the Drone Model

  1. Launch Isaac Sim and create a new stage

  2. Add the quadcopter:

    • Go to Isaac Utils → Robots → Quadcopter

    • Or manually: Create → Physics → Rigid Body, then add mesh and configure

  3. Configure physics properties:

    Mass: 1.5 kg
    Center of Mass: (0, 0, -0.05)
    Linear Damping: 0.05
    Angular Damping: 0.05
    
  4. Add motors/thrusters: The Isaac quadcopter comes with 4 rotor controllers

3.2 Add LiDAR Sensor

  1. Add Velodyne VLP-16:

    • Select your drone root prim

    • Go to Create → Isaac → Sensors → Lidar → Rotating

    • Set position: (0.1, 0, 0.05) relative to drone body

  2. Configure LiDAR parameters:

    Horizontal Resolution: 1.0 degree (360 beams)Vertical FOV: 30 degrees (-15 to +15)Vertical Resolution: 2 degrees (16 channels)Range: 0.1 to 10.0 metersRotation Rate: 10 Hz
    

3.3 Setup Action Graph for ROS 2

  1. Open Action Graph:

    • Window → Visual Scripting → Action Graph

  2. Create ROS 2 publishing node:

    Add nodes:
    - On Playback Tick (trigger)
    - Isaac Read Lidar Point Cloud
    - ROS2 Publish Point Cloud
    - ROS2 Context (set domain_id = 0)
    
    Connections:
    - Tick → Read Lidar → Publish Point Cloud
    - Set topic name: "/lidar/points"
    - Frame ID: "lidar_frame"
    
  3. Create velocity command subscriber:

    Add nodes:
    - ROS2 Subscribe Twist
    - Isaac Articulation Controller
    
    Set topic: "/cmd_vel"
    Connect Twist message to quadcopter controller
    

3.4 Create Training Environment

  1. Add obstacles: Create a room with walls and random obstacles

    - Create → Shapes → Cube (for walls)
    - Scale: Make walls 0.1m thick, 3m high
    - Add physics collision
    - Add 5-10 random boxes/cylinders as obstacles
    
  2. Add randomization script (optional but recommended):

    • Randomize obstacle positions on reset

    • Randomize lighting conditions

    • Add procedural textures

  3. Save the scene: File → Save As → drone_training_env.usd

4. Point Cloud Processing Implementation

Here's the complete implementation for converting 3D LiDAR data to a 360-dimensional observation:

Code: point_cloud_processor.py

import numpy as np
from sensor_msgs.msg import PointCloud2
import struct

class PointCloudProcessor:
    def __init__(self, num_beams=360, max_range=10.0, height_threshold=0.5):
        """
        Args:
            num_beams: Number of angular bins (360 for 1-degree resolution)
            max_range: Maximum detection distance (normalized to 1.0)
            height_threshold: Accept points within ±height_threshold meters
        """
        self.num_beams = num_beams
        self.max_range = max_range
        self.height_threshold = height_threshold
        
    def process_point_cloud(self, msg: PointCloud2) -> np.ndarray:
        """
        Convert 3D PointCloud2 to 360-dimensional array (bird's-eye view)
        
        Process:
        1. Parse binary point cloud data
        2. Filter points by height (take horizontal slice)
        3. Convert to polar coordinates
        4. Bin by angle (360 bins)
        5. Take minimum distance in each bin
        6. Normalize to [0, 1]
        """
        # Initialize output array (1.0 = no obstacle detected)
        ranges = np.ones(self.num_beams, dtype=np.float32)
        
        # Parse PointCloud2 binary data
        points = self._parse_pointcloud2(msg)
        
        if points is None or len(points) == 0:
            return ranges
        
        # Extract x, y, z
        x = points[:, 0]
        y = points[:, 1]
        z = points[:, 2]
        
        # Filter by height: only keep points near drone's horizontal plane
        height_mask = np.abs(z) < self.height_threshold
        x = x[height_mask]
        y = y[height_mask]
        
        if len(x) == 0:
            return ranges
        
        # Convert to polar coordinates
        distances = np.sqrt(x**2 + y**2)
        angles = np.arctan2(y, x)  # Range: [-pi, pi]
        
        # Convert angles to degrees and shift to [0, 360)
        angles_deg = np.degrees(angles) % 360
        
        # Bin points by angle
        angle_bins = (angles_deg * self.num_beams / 360.0).astype(np.int32)
        angle_bins = np.clip(angle_bins, 0, self.num_beams - 1)
        
        # For each bin, keep the minimum distance
        for i in range(len(distances)):
            bin_idx = angle_bins[i]
            if distances[i] < ranges[bin_idx] * self.max_range:
                ranges[bin_idx] = distances[i] / self.max_range
        
        # Clip to [0, 1] range
        ranges = np.clip(ranges, 0.0, 1.0)
        
        return ranges
    
    def _parse_pointcloud2(self, msg: PointCloud2) -> np.ndarray:
        """
        Parse binary PointCloud2 data to numpy array
        
        PointCloud2 format:
        - data: byte array with packed XYZ values
        - point_step: bytes per point (usually 16: x,y,z,intensity as floats)
        - row_step: bytes per row
        """
        # Determine format
        point_step = msg.point_step
        num_points = len(msg.data) // point_step
        
        if num_points == 0:
            return None
        
        # Parse binary data (assumes XYZ as first 3 floats)
        points = []
        for i in range(num_points):
            offset = i * point_step
            # Extract x, y, z (first 12 bytes = 3 floats)
            x, y, z = struct.unpack('fff', msg.data[offset:offset+12])
            points.append([x, y, z])
        
        return np.array(points, dtype=np.float32)
    
    def visualize_scan(self, scan: np.ndarray):
        """
        Optional: Visualize the 360-degree scan as a polar plot
        Useful for debugging
        """
        import matplotlib.pyplot as plt
        
        angles = np.linspace(0, 2*np.pi, self.num_beams)
        distances = scan * self.max_range
        
        fig, ax = plt.subplots(subplot_kw=dict(projection='polar'))
        ax.plot(angles, distances)
        ax.set_ylim(0, self.max_range)
        ax.set_title("LiDAR Scan (Bird's Eye View)")
        plt.show()

5. Complete Gymnasium Environment

Code: drone_lidar_env.py

import gymnasium as gym
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import Twist
from std_srvs.srv import Empty
import numpy as np
import time
from point_cloud_processor import PointCloudProcessor

class IsaacDroneEnv(gym.Env):
    """
    Gymnasium environment for drone obstacle avoidance using Isaac Sim
    """
    metadata = {'render_modes': ['human']}
    
    def __init__(self, max_steps=1000, render_mode=None):
        super().__init__()
        
        # Initialize ROS 2
        if not rclpy.ok():
            rclpy.init()
        
        # Create node with proper QoS settings
        self.node = Node('rl_drone_env')
        
        # QoS profile for reliable communication
        qos_profile = QoSProfile(
            reliability=ReliabilityPolicy.RELIABLE,
            history=HistoryPolicy.KEEP_LAST,
            depth=10
        )
        
        # Observation space: 360 lidar beams (normalized distances)
        self.observation_space = gym.spaces.Box(
            low=0.0, 
            high=1.0, 
            shape=(360,), 
            dtype=np.float32
        )
        
        # Action space: [linear_x, angular_z] in normalized range
        self.action_space = gym.spaces.Box(
            low=np.array([-1.0, -1.0]), 
            high=np.array([1.0, 1.0]), 
            dtype=np.float32
        )
        
        # Publishers and subscribers
        self.vel_pub = self.node.create_publisher(
            Twist, '/cmd_vel', qos_profile
        )
        
        self.lidar_sub = self.node.create_subscription(
            PointCloud2, 
            '/lidar/points', 
            self._lidar_callback, 
            qos_profile
        )
        
        # Point cloud processor
        self.pc_processor = PointCloudProcessor(
            num_beams=360,
            max_range=10.0,
            height_threshold=0.5
        )
        
        # State variables
        self.latest_scan = np.ones(360, dtype=np.float32)
        self.scan_received = False
        self.collision = False
        self.steps_count = 0
        self.max_steps = max_steps
        
        # Action scaling (tune based on drone capabilities)
        self.max_linear_vel = 2.0  # m/s
        self.max_angular_vel = 1.5  # rad/s
        
        # Collision threshold
        self.collision_distance = 0.3  # meters (normalized: 0.03 for 10m max)
        
        # Rendering
        self.render_mode = render_mode
        
    def _lidar_callback(self, msg: PointCloud2):
        """Process incoming LiDAR point cloud"""
        # Convert 3D point cloud to 2D scan
        self.latest_scan = self.pc_processor.process_point_cloud(msg)
        self.scan_received = True
        
        # Check for collision (any beam < threshold)
        min_distance = np.min(self.latest_scan)
        if min_distance < (self.collision_distance / self.pc_processor.max_range):
            self.collision = True
    
    def step(self, action):
        """Execute one step in the environment"""
        self.steps_count += 1
        
        # Scale action to real velocity commands
        cmd_vel = Twist()
        cmd_vel.linear.x = float(action[0] * self.max_linear_vel)
        cmd_vel.angular.z = float(action[1] * self.max_angular_vel)
        
        # Publish command
        self.vel_pub.publish(cmd_vel)
        
        # Wait for sensor update (spin ROS)
        self.scan_received = False
        timeout = time.time() + 0.1  # 100ms timeout
        
        while not self.scan_received and time.time() < timeout:
            rclpy.spin_once(self.node, timeout_sec=0.01)
        
        # Calculate reward
        reward = self._calculate_reward(action)
        
        # Check termination conditions
        terminated = self.collision
        truncated = self.steps_count >= self.max_steps
        
        # Additional info for debugging
        info = {
            'collision': self.collision,
            'min_distance': float(np.min(self.latest_scan)),
            'steps': self.steps_count
        }
        
        return self.latest_scan.copy(), reward, terminated, truncated, info
    
    def _calculate_reward(self, action):
        """
        Reward function design:
        - Survival bonus: +1.0 per step
        - Collision penalty: -200
        - Progress reward: bonus for moving forward
        - Smoothness penalty: discourage erratic movements
        - Proximity penalty: penalize getting too close
        """
        reward = 0.0
        
        if self.collision:
            return -200.0
        
        # Survival bonus
        reward += 1.0
        
        # Progress reward (encourage forward movement)
        forward_speed = action[0]
        if forward_speed > 0:
            reward += forward_speed * 0.5
        
        # Proximity penalty (discourage getting too close to obstacles)
        min_dist = np.min(self.latest_scan)
        if min_dist < 0.1:  # Within 1 meter (0.1 normalized)
            reward -= (0.1 - min_dist) * 10.0
        
        # Smoothness penalty (discourage sharp turns at high speed)
        if abs(action[0]) > 0.5 and abs(action[1]) > 0.5:
            reward -= 0.2
        
        return reward
    
    def reset(self, seed=None, options=None):
        """Reset the environment to initial state"""
        super().reset(seed=seed)
        
        # Stop the drone
        stop_cmd = Twist()
        self.vel_pub.publish(stop_cmd)
        
        # Reset state variables
        self.collision = False
        self.steps_count = 0
        self.latest_scan = np.ones(360, dtype=np.float32)
        
        # Wait for environment to stabilize
        time.sleep(0.5)
        
        # Spin to get fresh sensor data
        for _ in range(10):
            rclpy.spin_once(self.node, timeout_sec=0.01)
        
        return self.latest_scan.copy(), {}
    
    def close(self):
        """Cleanup"""
        # Stop the drone
        stop_cmd = Twist()
        self.vel_pub.publish(stop_cmd)
        
        # Shutdown ROS
        self.node.destroy_node()
        if rclpy.ok():
            rclpy.shutdown()

6. Training Script with Frame Stacking

Code: train.py

import numpy as np
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecFrameStack
from stable_baselines3.common.callbacks import CheckpointCallback, EvalCallback
from stable_baselines3.common.monitor import Monitor
from drone_lidar_env import IsaacDroneEnv
import os

def make_env():
    """Create and wrap the environment"""
    env = IsaacDroneEnv(max_steps=1000)
    env = Monitor(env)  # For logging
    return env

# Create directories
os.makedirs("./models", exist_ok=True)
os.makedirs("./logs", exist_ok=True)

# Create vectorized environment
env = DummyVecEnv([make_env])

# Add frame stacking for temporal information
# Agent sees last 4 scans (360x4 = 1440 inputs)
env = VecFrameStack(env, n_stack=4)

# Checkpoint callback: save model every 50k steps
checkpoint_callback = CheckpointCallback(
    save_freq=50000,
    save_path="./models/",
    name_prefix="drone_rl_model"
)

# Configure PPO with optimized hyperparameters
model = PPO(
    policy="MlpPolicy",
    env=env,
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=64,
    n_epochs=10,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.2,
    ent_coef=0.01,  # Encourage exploration
    vf_coef=0.5,
    max_grad_norm=0.5,
    verbose=1,
    tensorboard_log="./logs/",
    device="cuda"  # Use GPU
)

print("Starting training...")
print("Monitor with: tensorboard --logdir ./logs/")

# Train the model
model.learn(
    total_timesteps=1_000_000,
    callback=checkpoint_callback,
    progress_bar=True
)

# Save final model
model.save("./models/drone_avoidance_final")
print("Training complete! Model saved to ./models/drone_avoidance_final.zip")

7. Running the Complete Pipeline

7.1 Launch Isaac Sim

  1. Open Isaac Sim

  2. Load your saved scene: drone_training_env.usd

  3. Press Play to start the simulation

  4. Verify ROS 2 topics are publishing:

    ros2 topic list# Should see: /cmd_vel, /lidar/points
    

7.2 Start Training

# Activate virtual environment
source ~/rl_drone_env/bin/activate

# Start training
python train.py

7.3 Monitor Progress

In a new terminal:

tensorboard --logdir ./logs/

Open browser to http://localhost:6006

What to look for:

  • rollout/ep_rew_mean: Should increase over time (target: > 500)

  • train/entropy_loss: Should gradually decrease

  • Episode length: Should increase as drone survives longer

7.4 Expected Training Timeline

  • First 100k steps: Random exploration, many collisions

  • 100k-300k steps: Begins avoiding obvious obstacles

  • 300k-500k steps: Smooth navigation in simple environments

  • 500k+ steps: Handles complex scenarios

8. Deployment to Real Hardware (Sim-to-Real)

8.1 Export Model to ONNX

# export_model.py
import torch
from stable_baselines3 import PPO
import numpy as np

# Load trained model
model = PPO.load("./models/drone_avoidance_final")

# Extract policy network
policy = model.policy

# Create dummy input (4 stacked frames of 360 beams)
dummy_input = torch.FloatTensor(np.zeros((1, 1440)))

# Export to ONNX
torch.onnx.export(
    policy,
    dummy_input,
    "drone_brain.onnx",
    export_params=True,
    opset_version=11,
    input_names=['observation'],
    output_names=['action'],
    dynamic_axes={'observation': {0: 'batch_size'}}
)

print("Model exported to drone_brain.onnx")

8.2 Inference Node for NVIDIA Jetson

Code: inference_node.py

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2
from geometry_msgs.msg import Twist
import onnxruntime as ort
import numpy as np
from collections import deque
from point_cloud_processor import PointCloudProcessor

class DroneInferenceNode(Node):
    def __init__(self):
        super().__init__('drone_inference_node')
        
        # Load ONNX model
        self.session = ort.InferenceSession(
            "drone_brain.onnx",
            providers=['TensorrtExecutionProvider', 'CUDAExecutionProvider']
        )
        
        # Frame buffer for stacking
        self.frame_buffer = deque(maxlen=4)
        for _ in range(4):
            self.frame_buffer.append(np.ones(360, dtype=np.float32))
        
        # Point cloud processor
        self.pc_processor = PointCloudProcessor()
        
        # ROS communication
        self.vel_pub = self.create_publisher(Twist, '/cmd_vel', 10)
        self.lidar_sub = self.create_subscription(
            PointCloud2,
            '/lidar/points',
            self.lidar_callback,
            10
        )
        
        # Safety parameters
        self.emergency_stop_distance = 0.2  # meters
        self.max_linear_vel = 1.5  # m/s (reduced for real flight)
        self.max_angular_vel = 1.0  # rad/s
        
    def lidar_callback(self, msg):
        # Process point cloud
        scan = self.pc_processor.process_point_cloud(msg)
        
        # Emergency stop check
        if np.min(scan) < (self.emergency_stop_distance / 10.0):
            self.get_logger().warn("EMERGENCY STOP: Obstacle too close!")
            self.publish_stop()
            return
        
        # Update frame buffer
        self.frame_buffer.append(scan)
        
        # Stack frames
        stacked_obs = np.concatenate(list(self.frame_buffer))
        stacked_obs = stacked_obs.reshape(1, -1).astype(np.float32)
        
        # Run inference
        action = self.session.run(None, {'observation': stacked_obs})[0][0]
        
        # Publish command
        cmd = Twist()
        cmd.linear.x = float(np.clip(action[0], -1, 1) * self.max_linear_vel)
        cmd.angular.z = float(np.clip(action[1], -1, 1) * self.max_angular_vel)
        self.vel_pub.publish(cmd)
        
    def publish_stop(self):
        stop = Twist()
        self.vel_pub.publish(stop)

def main():
    rclpy.init()
    node = DroneInferenceNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

8.3 Deploy to Jetson

# On your development machine
scp drone_brain.onnx jetson@drone-jetson:~/
scp inference_node.py jetson@drone-jetson:~/
scp point_cloud_processor.py jetson@drone-jetson:~/

# SSH into Jetson
ssh jetson@drone-jetson

# Install dependencies
pip install onnxruntime-gpu numpy

# Run inference node
python3 inference_node.py

9. Safety Considerations

Critical safety features implemented:

  1. Emergency Stop: Hardware override always available

  2. Minimum Distance Check: Classic algorithm runs in parallel

  3. Velocity Limits: Capped at safe speeds for real hardware

  4. Timeout Handling: Stops drone if no sensor data received

  5. Gradual Deployment: Test in safe environment first

Pre-flight checklist:

  • [ ] Test in simulation until 95%+ success rate

  • [ ] Verify all sensors publishing at expected rates

  • [ ] Test emergency stop button

  • [ ] Start with reduced velocity limits (50% of max)

  • [ ] Use safety nets or fly in padded area

  • [ ] Keep manual override pilot ready

10. Troubleshooting

LiDAR data not received:

# Check topics
ros2 topic list
ros2 topic hz /lidar/points

# Verify Isaac Sim ROS bridge is running
# Check Action Graph is enabled

Training not converging:

  • Increase training steps to 2M+

  • Reduce action space (limit max velocities)

  • Simplify environment (fewer obstacles initially)

  • Check reward function (use tensorboard to monitor)

Sim-to-real gap:

  • Add sensor noise during training

  • Randomize lighting in Isaac Sim

  • Calibrate LiDAR parameters to match real sensor

  • Use domain randomization for textures

11. Next Steps

Enhancements to try:

  • Add GPS waypoint navigation

  • Implement dynamic obstacle avoidance

  • Train with curriculum learning (start easy, increase difficulty)

  • Add camera vision for semantic understanding

  • Multi-agent training (multiple drones)


Edited by Elena Eroshina

0 Comments

Recommended Comments

There are no comments to display.

Configure browser push notifications

Chrome (Android)
  1. Tap the lock icon next to the address bar.
  2. Tap Permissions → Notifications.
  3. Adjust your preference.
Chrome (Desktop)
  1. Click the padlock icon in the address bar.
  2. Select Site settings.
  3. Find Notifications and adjust your preference.