Filesystem-Based Agent State

Nikola Balic (@nibzard)· established

问题

许多Agent工作流属于长时运行类型,或可能因错误、超时或用户干预而中断。将所有中间状态保存在模型的上下文窗口中不仅可靠性差,而且无法跨会话持久化。当故障发生或Agent触达上下文限制时,已完成的工作会丢失,且必须从头开始重新执行。

方案

Agent会将中间结果与工作状态持久化到执行环境的文件中。这一操作可创建持久化检查点,支持工作流恢复、故障恢复,以及处理超出单会话context限制的任务。

核心模式:

# Agent 将中间状态写入文件
def multi_step_workflow():
    # 检查是否存在已完成的工作内容
    if os.path.exists("state/step1_results.json"):
        print("从步骤1恢复执行...")
        step1_data = json.load(open("state/step1_results.json"))
    else:
        print("从头开始执行...")
        step1_data = perform_step1()
        with open("state/step1_results.json", "w") as f:
            json.dump(step1_data, f)

    # 继续执行步骤2
    if os.path.exists("state/step2_results.json"):
        print("从步骤2恢复执行...")
        step2_data = json.load(open("state/step2_results.json"))
    else:
        step2_data = perform_step2(step1_data)
        with open("state/step2_results.json", "w") as f:
            json.dump(step2_data, f)

    # 最终步骤
    return perform_step3(step2_data)

状态组织结构:

工作区/
├── 状态/
│   ├── step1_results.json
│   ├── step2_results.json
│   └── progress.txt
├── 数据/
│   ├── input.csv
│   └── processed.csv
└── 日志/
    └── execution.log

如何使用

适用场景:

  • 包含高成本操作(API调用、数据处理)的多步骤工作流
  • 可能超出会话限制的长时间运行任务
  • 需要从瞬时故障中恢复的工作流
  • 由多个Agent或会话基于已有成果协作完成的任务
  • 需使用Checkpointing的批处理作业

实现模式:

  1. 高成本操作后添加Checkpoint:

    def process_large_dataset():
        checkpoint_file = "state/processed_rows.json"
    
        # 如果存在则加载进度
        if os.path.exists(checkpoint_file):
            processed = json.load(open(checkpoint_file))
            start_row = len(processed)
        else:
            processed = []
            start_row = 0
    
        # 从检查点开始处理
        for i, row in enumerate(data[start_row:]):
            result = expensive_operation(row)
            processed.append(result)
    
            # 每处理100行创建一次Checkpoint
            if (i + 1) % 100 == 0:
                with open(checkpoint_file, "w") as f:
                    json.dump(processed, f)
    
        return processed
    
  2. 带元数据的状态文件:

    {
      "workflow_id": "abc-123",
      "current_step": "data_processing",
      "completed_steps": ["data_fetch", "validation"],
      "last_update": "2024-01-15T10:30:00Z",
      "data": {
        "records_processed": 1500,
        "errors_encountered": 3
      }
    }
    
  3. 用于可视化的进度日志:

    def log_progress(step, status, details=None):
        with open("logs/progress.log", "a") as f:
            timestamp = datetime.now().isoformat()
            log_entry = f"{timestamp} | {step} | {status}"
            if details:
                log_entry += f" | {json.dumps(details)}"
            f.write(log_entry + "\n")
            print(log_entry)  # 同时在Agent Context中显示
    

权衡

优点:

  • 支持中断后恢复工作流
  • 抵御瞬时故障引发的数据丢失
  • 支持超出单会话限制的长时间运行任务
  • 允许检查中间结果
  • 便于调试(可查看每个检查点的状态)
  • 多个Agent可通过读写共享状态实现协作

缺点:

  • Agent必须编写检查点/恢复逻辑
  • 文件I/O会增加工作流执行的开销
  • 对状态的命名与组织有规范性要求
  • 若未清理过时状态文件,可能引发混淆
  • 并发访问需要协调(文件锁定、原子写入)
  • 执行环境需要持久化存储

操作注意事项:

  • 定义状态文件清理策略(保留周期、自动清理规则)
  • 使用原子写入防止文件损坏(先写入临时文件,再重命名为目标文件)
  • 在状态文件中包含时间戳和版本信息
  • 考虑状态文件大小限制(不要对大规模数据集执行检查点操作)
  • 若状态文件包含敏感数据,需对其进行安全防护

参考文献

关键词

涉及Anthropic工程领域中基于MCP的代码执行技术,以及用于实现对话级持久化的情景记忆模式。

直译
  • Anthropic工程:基于MCP的代码执行(2024)
  • 相关内容:情景记忆模式(用于对话级持久化)

来源摘要

正在获取来源并生成中文摘要…

来源: https://www.anthropic.com/engineering/code-execution-with-mcp

← 返回社区