Isolated VM per RL Rollout

Nikola Balic (@nibzard)· emerging

问题

在训练工具使用型Agent的强化学习流程中,多轨迹会并行执行,且可能调用破坏性或有状态的工具:

  • 交叉污染:某一轨迹的操作会影响另一轨迹的运行环境
  • 破坏性命令:Agent可能执行rm -rf这类命令,破坏共享状态
  • 状态泄露:文件系统的变更会在不同轨迹间残留,导致训练数据不一致
  • 奖励失真:若轨迹B受到轨迹A的副作用影响,奖励信号将失去意义
  • 调试噩梦:因竞态条件引发的非确定性故障

Cognition公司在训练Devon文件规划Agent时就遭遇了这类问题:该Agent可访问一个shell工具,能够执行grepfind甚至rm等任意命令。在共享基础设施上并行运行32条轨迹会引发严重混乱。

方案


为每个RL Rollout启动独立的虚拟机(或容器),确保环境完全隔离。

架构设计:

  1. Rollout ID 跟踪:OpenAI的Agent RFT平台为每个Rollout分配唯一标识ID
  2. VM/容器映射:基础设施将Rollout ID与专属VM一一绑定
  3. 干净初始状态:每个VM从完全一致的文件系统、依赖包和配置的全新状态启动
  4. 自动清理:Rollout执行完成(无论成功或失败)后,立即销毁对应VM

核心组件:

  • VM 快速部署:支持快速创建VM(通常为云实例或容器)
  • 突发扩缩容能力:在训练步骤切换的边界节点,支持处理数百至数千个同时发起的VM创建请求
  • 强状态隔离:VM之间无共享文件系统或数据库,完全独立
  • 超时防护:VM超出指定时长后自动销毁,避免资源泄漏

# 基础设施配置(Cognition 落地方案)
from modal import Image, App, method
import uuid

# 包含所有依赖的基础VM镜像
base_image = (
    Image.debian_slim()
    .apt_install("git", "build-essential")  # 安装版本控制与编译工具
    .pip_install("pandas", "numpy", "openai")  # 安装Python依赖库
    .copy_local_dir("./corpus", "/workspace/corpus")  # 复制训练数据集
)

app = App("agent-rft-tool-server")

@app.cls(
    image=base_image,
    cpu=2,
    memory=4096,
    timeout=600,  # 单个Rollout最长执行时长:10分钟
)
class IsolatedToolExecutor:
    """
    每个类实例对应一台独立隔离的VM
    在RL训练阶段,为每个Rollout按需启动
    """

    def __init__(self):
        """初始化当前Rollout的全新执行状态"""
        self.rollout_id = None
        self.workspace = "/workspace"
        self.execution_history = []

    @method()
    def initialize_rollout(self, rollout_id: str):
        """
        Rollout启动时首个调用的方法
        为当前特定Rollout初始化专属隔离环境
        """
        self.rollout_id = rollout_id
        print(f"[{rollout_id}] 已完成隔离VM初始化")

        # 创建当前Rollout专属的工作目录
        import os
        self.work_dir = f"{self.workspace}/rollout_{rollout_id}"
        os.makedirs(self.work_dir, exist_ok=True)

        return {"status": "ready", "rollout_id": rollout_id}

    @method()
    def execute_shell(self, rollout_id: str, command: str):
        """
        在隔离环境中执行Shell命令
        因VM为当前Rollout专属,即使是破坏性命令也无安全风险
        """
        if rollout_id != self.rollout_id:
            raise ValueError(f"Rollout ID不匹配: {rollout_id} != {self.rollout_id}")

        import subprocess

        print(f"[{rollout_id}] 执行命令: {command}")

        # 执行Shell命令,捕获输出与返回码
        result = subprocess.run(
            command,
            shell=True,
            cwd=self.work_dir,
            capture_output=True,
            text=True,
            timeout=60
        )

        # 记录执行历史
        self.execution_history.append({
            "command": command,
            "returncode": result.returncode,
            "stdout": result.stdout[:1000],  # 截断过长输出
            "stderr": result.stderr[:1000]
        })

        return {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "returncode": result.returncode
        }

    @method()
    def read_file(self, rollout_id: str, filepath: str):
        """从语料库或当前工作目录读取文件"""
        if rollout_id != self.rollout_id:
            raise ValueError(f"Rollout ID不匹配")

        # 文件路径仅对当前隔离VM可见
        full_path = f"{self.workspace}/{filepath}"

        try:
            with open(full_path, 'r') as f:
                content = f.read()
            return {"content": content, "error": None}
        except Exception as e:
            return {"content": None, "error": str(e)}

    @method()
    def search_corpus(self, rollout_id: str, query: str):
        """对语料库文档执行语义搜索"""
        if rollout_id != self.rollout_id:
            raise ValueError(f"Rollout ID不匹配")

        # 语义搜索实现代码在此处扩展
        # 语料库为只读模式,在VM启动时已完成复制

        return {"results": [...]}

    @method()
    def cleanup(self, rollout_id: str):
        """
        可选手动清理操作(Modal平台会自动处理VM销毁)
        """
        print(f"[{rollout_id}] Rollout执行完成,VM将被销毁")
        return {"history": self.execution_history}


# 适配OpenAI Agent RFT的工具端点配置
tools_config = [
    {
        "name": "shell",
        "url": "https://your-app.modal.run/execute_shell",
        "headers": {"Authorization": "Bearer YOUR_TOKEN"}
    },
    {
        "name": "read_file",
        "url": "https://your-app.modal.run/read_file",
        "headers": {"Authorization": "Bearer YOUR_TOKEN"}
    },
    {
        "name": "search",
        "url": "https://your-app.modal.run/search_corpus",
        "headers": {"Authorization": "Bearer YOUR_TOKEN"}
    }
]

请求执行流程:

sequenceDiagram
    participant OAI as OpenAI 训练平台
    participant LB as 负载均衡器
    participant VM1 as VM(Rollout 1)
    participant VM2 as VM(Rollout 2)

    Note over OAI: 训练步骤启动
    OAI->>LB: Rollout 1: 执行shell命令("grep TODO")
    OAI->>LB: Rollout 2: 执行shell命令("rm temp.txt")

    LB->>VM1: 路由至独立VM 1
    LB->>VM2: 路由至独立VM 2

    Note over VM1: 执行grep TODO<br/>(安全隔离环境)
    Note over VM2: 执行rm temp.txt<br/>(安全隔离环境)

    VM1->>LB: 返回grep执行结果
    VM2->>LB: 返回执行成功状态

    LB->>OAI: 汇总返回结果

    Note over VM1,VM2: Rollout执行完成<br/>对应VM被销毁

如何使用

阶段1:基础设施搭建

选择隔离技术:

  • Modal/Lambda:搭载容器隔离的无服务器函数(最简便)
  • Docker:每次推演对应一个独立容器(均衡方案)
  • 云虚拟机(Cloud VMs):每次推演对应EC2/GCP实例(隔离性最强,启动速度最慢)
  • Kubernetes Jobs:每次推演对应K8s Pod(生产级标准)

阶段2:实现推演ID追踪

# 所有工具端点必须接收并验证rollout_id
@app.post("/tool/{tool_name}")
async def execute_tool(tool_name: str, rollout_id: str, params: dict):
    # 获取或创建该推演对应的隔离环境
    vm = get_or_create_vm(rollout_id)

    # 在隔离context中执行工具调用
    result = vm.execute(tool_name, params)

    return result

阶段3:应对突发流量

Agent RFT的流量呈现突发特性:

  • 训练步骤边界:100-500个并发推演请求
  • 工具调用延迟:Agent思考时的短暂停顿期
  • 清理阶段:批量销毁虚拟机(VM)

配置自动扩缩容策略:

# Modal框架示例
@app.cls(
    image=base_image,
    concurrency_limit=500,  # 最大并发VM数量
    container_idle_timeout=60,  # 闲置1分钟后自动清理
)

阶段4:基础设施监控

核心监控指标:

  • VM部署耗时:需控制在5秒以内
  • 失败率:基础设施错误会导致零奖励,进而引发训练崩溃
  • 资源泄漏:VM未被正确回收
  • 成本:500台VM持续运行整个训练周期可能产生高昂费用

来自Cognition的Sam给出的建议:

“比如有时候出现基础设施错误导致VM故障……这确实会引发训练崩溃,因为哪怕模型表现出色,最终也会得到零奖励。”

搭建监控体系:

import logging

logger = logging.getLogger("rollout-infra")

@method()
def execute_tool(self, rollout_id: str, tool: str, params: dict):
    try:
        result = self._execute(tool, params)

        # 记录成功日志
        logger.info(f"rollout={rollout_id} tool={tool} status=success")

        return result

    except Exception as e:
        # 记录失败日志——对排查训练崩溃问题至关重要
        logger.error(
            f"rollout={rollout_id} tool={tool} status=error error={str(e)}"
        )

        # 向模型返回错误信息(不要因基础设施问题判定零奖励)
        return {
            "error": "基础设施错误,请重试",
            "retryable": True
        }

权衡

优势

  • 完全隔离:不同部署实例之间无交叉污染
  • 安全性:破坏性命令不会影响其他部署实例或宿主系统
  • 确定性:环境一致,可提供可靠的奖励信号
  • 生产环境一致性:可使用与生产环境完全相同的环境

劣势

  • 成本高昂:同时运行数十上百台虚拟机(VM)可能成本不菲
  • 置备耗时:虚拟机启动会增加延迟(容器启动速度更快)
  • 复杂度高:需要健壮的基础设施及监控体系
  • 扩展性受限:云服务商的配额可能限制并发运行的虚拟机数量
  • 故障风险:基础设施问题可能导致训练进程崩溃

参考文献

关键词

涵盖OpenAI智能体强化微调认知案例分享、Modal平台技术文档、Docker容器隔离安全最佳实践,以及智能体强化微调、虚拟机操作员智能体两类相关技术模式。

直译
  • 【OpenAI 构建小时:智能体强化微调(Agent RFT)——认知案例研究(2025年11月)】(视频链接:https://youtu.be/1s_7RMG4O4U)
  • 【Modal 平台文档】(链接:https://modal.com/docs)
  • 【Docker 隔离最佳实践】(链接:https://docs.docker.com/engine/security/)
  • 相关技术模式:智能体强化微调(Agent Reinforcement Fine-Tuning)、虚拟机操作员智能体

来源摘要

正在获取来源并生成中文摘要…

来源: https://youtu.be/1s_7RMG4O4U

← 返回社区