深入 Open Agent SDK(四):多 Agent 协作——子代理、团队与任务编排

本文是「深入 Open Agent SDK (Swift)」系列第四篇。系列目录见这里

单个 Agent 再强,也只是一个执行者。真实的开发任务往往是多步骤、多角色的:先有人探索代码库,有人设计方案,再有人写代码、跑测试。一个 Agent 单干,上下文容易膨胀,效率也上不去。

Open Agent SDK 从三个层面解决这个问题:

  1. 子 Agent -- 主 Agent 在运行过程中动态生成子 Agent,把专门的任务委派出去
  2. Task 系统 -- 用任务追踪多步骤工作的进度和结果
  3. Team + 消息传递 -- 多个 Agent 组成团队,通过邮箱系统互相通信

这篇文章逐一分析这三个层面的实现,最后看它们怎么组合起来做任务编排。

一、子 Agent:SubAgentSpawner 协议与 AgentTool

SubAgentSpawner 协议

子 Agent 的生成不是 AgentTool 直接 new 一个 Agent 出来——中间隔了一层协议。SubAgentSpawner 定义在 Types/AgentTypes.swift 里:

public protocol SubAgentSpawner: Sendable {
    func spawn(
        prompt: String,
        model: String?,
        systemPrompt: String?,
        allowedTools: [String]?,
        maxTurns: Int?
    ) async -> SubAgentResult

    func spawn(
        prompt: String,
        model: String?,
        systemPrompt: String?,
        allowedTools: [String]?,
        maxTurns: Int?,
        disallowedTools: [String]?,
        mcpServers: [AgentMcpServerSpec]?,
        skills: [String]?,
        runInBackground: Bool?,
        isolation: String?,
        name: String?,
        teamName: String?,
        mode: PermissionMode?,
        resume: String?
    ) async -> SubAgentResult
}

两个方法,一个基础版(5 个参数),一个增强版(13 个参数)。协议还提供了默认实现,增强版直接调用基础版,这样已有的实现类不用改代码就能兼容。

为什么要把 spawner 放在 Types/ 而不是 Core/?因为 Tools/Advanced/AgentTool.swift 需要用它,但 Tools/ 不应该导入 Core/。把协议定义在 Types/,具体实现放在 Core/,通过 ToolContext.agentSpawner 注入——这是 SDK 里常见的依赖倒置。

DefaultSubAgentSpawner 实现

DefaultSubAgentSpawnerCore/DefaultSubAgentSpawner.swift 里,做了这几件事:

final class DefaultSubAgentSpawner: SubAgentSpawner, @unchecked Sendable {
    private let apiKey: String
    private let baseURL: String?
    private let parentModel: String
    private let parentTools: [ToolProtocol]
    private let provider: LLMProvider
    private let client: (any LLMClient)?

    func spawn(...) async -> SubAgentResult {
        // 1. 过滤掉 AgentTool,防止无限递归
        var subTools = parentTools.filter { $0.name != "Agent" }

        // 2. 如果指定了 allowedTools,进一步过滤
        if let allowed = allowedTools, !allowed.isEmpty {
            let allowedSet = Set(allowed)
            subTools = subTools.filter { allowedSet.contains($0.name) }
        }

        // 3. disallowedTools 再过一遍(优先级高于 allowedTools)
        if let disallowed = disallowedTools, !disallowed.isEmpty {
            let disallowedSet = Set(disallowed)
            subTools = subTools.filter { !disallowedSet.contains($0.name) }
        }

        // 4. 创建子 Agent 并执行
        let options = AgentOptions(
            apiKey: apiKey,
            model: model ?? parentModel,
            systemPrompt: systemPrompt,
            maxTurns: maxTurns ?? 10,
            tools: subTools
        )
        let agent = Agent(options: options)
        let result = await agent.prompt(prompt)

        return SubAgentResult(
            text: result.text.isEmpty
                ? "(Subagent completed with no text output)"
                : result.text,
            toolCalls: [],
            isError: result.status != .success
        )
    }
}

几个关键点:

  • 防递归:子 Agent 不会再拿到 AgentTool,所以不会出现 Agent 套 Agent 套 Agent 的情况
  • 工具继承:子 Agent 默认继承父 Agent 的所有工具(除了 AgentTool),但可以通过 allowedTools / disallowedTools 限制
  • 阻塞式执行:父 Agent 调用 spawn() 后会 await,等子 Agent 跑完才继续

AgentTool:LLM 眼里的子 Agent 工具

AgentTool 是暴露给 LLM 的工具。LLM 调用 Agent 工具时传入 prompt 和参数,AgentTool 负责调用 spawner 生成子 Agent。

它内置了两种预定义的子 Agent 类型:

private let BUILTIN_AGENTS: [String: AgentDefinition] = [
    "Explore": AgentDefinition(
        name: "Explore",
        description: "Fast agent specialized for exploring codebases...",
        systemPrompt: "You are a codebase exploration agent. Search through files and code to answer questions...",
        tools: ["Read", "Glob", "Grep", "Bash"],
        maxTurns: 10
    ),
    "Plan": AgentDefinition(
        name: "Plan",
        description: "Software architect agent for designing implementation plans...",
        systemPrompt: "You are a software architect. Design implementation plans...",
        tools: ["Read", "Glob", "Grep", "Bash"],
        maxTurns: 10
    ),
]
  • Explore:代码库探索,用 Glob 找文件、Grep 搜内容、Read 读文件
  • Plan:软件架构师,理解代码库后输出实施方案

LLM 调用 AgentTool 时,通过 subagent_type 字段指定用哪种:

{
  "prompt": "Explore the project structure and find all Swift source files",
  "description": "Explore codebase",
  "subagent_type": "Explore"
}

AgentTool 还支持一堆可选参数:model(指定模型)、maxTurns(覆盖轮次上限)、run_in_background(后台运行)、isolation(隔离模式,比如 worktree)、team_name(关联团队)、mode(权限模式)。这些参数直接透传给 spawner。

一个完整的示例

SDK 自带了一个 SubagentExample,演示了主 Agent 作为协调者,通过 AgentTool 委派 Explore 子 Agent 的完整流程:

// 主 Agent 的系统提示
let systemPrompt = """
You are a coordinator agent. When given a task, you should delegate it to a sub-agent \
using the Agent tool. The Agent tool will spawn a specialized agent (e.g., "Explore" type) \
that can use Read, Glob, Grep, and Bash tools to investigate the codebase. \
After the sub-agent returns its findings, summarize the results for the user.
"""

// 注册工具:核心工具 + AgentTool
let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: defaultModel,
    systemPrompt: systemPrompt,
    maxTurns: 10,
    tools: getAllBaseTools(tier: .core) + [createAgentTool()]
))

// 发任务——主 Agent 会调用 AgentTool 委派给 Explore 子 Agent
for await message in agent.stream("""
    Explore the current project directory. Find all Swift source files, \
    examine the project structure, and provide a summary. \
    Use the Agent tool to delegate this task to an Explore sub-agent.
""") {
    switch message {
    case .toolUse(let data):
        if data.toolName == "Agent" {
            print("[Sub-agent Delegation: \(data.toolName)]")
        }
    case .toolResult(let data):
        print("[Result: \(data.content.prefix(200))]")
    case .result(let data):
        print("Turns: \(data.numTurns), Cost: $\(data.totalCostUsd)")
    default:
        break
    }
}

执行流程:用户发 prompt -> 主 Agent 判断需要探索代码库 -> 调用 AgentTool -> AgentTool 通过 spawner 生成 Explore 子 Agent -> 子 Agent 用 Glob/Grep/Read 执行探索 -> 结果返回给主 Agent -> 主 Agent 汇总后回复用户。

二、Task 系统:任务追踪与状态机

子 Agent 解决了"谁干活"的问题,Task 系统解决的是"活干了多少、谁在干、结果是什么"的问题。

TaskStore:线程安全的 Actor

TaskStore 是一个 Swift Actor,保证并发安全:

public actor TaskStore {
    private var tasks: [String: Task] = [:]
    private var taskCounter: Int = 0

    public func create(
        subject: String,
        description: String? = nil,
        owner: String? = nil,
        status: TaskStatus = .pending
    ) -> Task {
        taskCounter += 1
        let id = "task_\(taskCounter)"
        let now = dateFormatter.string(from: Date())
        let task = Task(
            id: id, subject: subject, description: description,
            status: status, owner: owner,
            createdAt: now, updatedAt: now
        )
        tasks[id] = task
        return task
    }
}

用 Actor 而不是普通类,意味着所有方法都是隐式串行化的——不需要自己加锁。多个 Agent 同时创建任务不会出现竞态条件。

Task 的状态机

Task 有 5 种状态,流转规则很明确:

public enum TaskStatus: String, Sendable, Equatable, Codable {
    case pending      // 等待开始
    case inProgress   // 进行中
    case completed    // 已完成
    case failed       // 失败
    case cancelled    // 已取消
}

状态转换有约束:pendinginProgress 可以转到任何状态,但 completedfailedcancelled 是终态,不可再变:

private func isValidTransition(from: TaskStatus, to: TaskStatus) -> Bool {
    switch from {
    case .pending, .inProgress:
        return true
    case .completed, .failed, .cancelled:
        return false  // 终态,不能再转
    }
}

画成状态图:

pending ──→ inProgress ──→ completed
   │            │
   │            ├──→ failed
   │            │
   └──→ cancelled ←──┘

TaskStatus 还有个贴心的 parse() 方法,同时支持 camelCase(inProgress)和 snake_case(in_progress),因为 LLM 返回的 JSON 格式不一定统一:

public static func parse(_ string: String) -> TaskStatus? {
    if let direct = TaskStatus(rawValue: string) { return direct }
    // snake_case → camelCase
    let camel = string
        .split(separator: "_")
        .enumerated()
        .map { $0.offset == 0 ? String($0.element) : String($0.element).capitalized }
        .joined()
    return TaskStatus(rawValue: camel)
}

Task 结构体

一个 Task 实例除了基本的状态追踪,还预留了依赖关系和元数据:

public struct Task: Sendable, Equatable, Codable {
    public let id: String
    public var subject: String
    public var description: String?
    public var status: TaskStatus
    public var owner: String?        // 谁在干
    public let createdAt: String
    public var updatedAt: String
    public var output: String?       // 结果
    public var blockedBy: [String]?  // 被哪些任务阻塞
    public var blocks: [String]?     // 阻塞了哪些任务
    public var metadata: [String: String]?
}

blockedByblocks 字段说明 Task 系统预留了任务依赖的能力——任务 A 可以声明"我需要等任务 B 和 C 完成才能开始"。

三个 Task 工具

SDK 提供了三个工具让 LLM 操作 Task 系统:

TaskCreate -- 创建任务:

public func createTaskCreateTool() -> ToolProtocol {
    return defineTool(
        name: "TaskCreate",
        description: "Create a new task for tracking work progress.",
        inputSchema: taskCreateSchema,
        isReadOnly: false
    ) { (input: TaskCreateInput, context: ToolContext) in
        guard let taskStore = context.taskStore else {
            return ToolExecuteResult(content: "Error: TaskStore not available.", isError: true)
        }
        let initialStatus: TaskStatus = input.status.flatMap { TaskStatus.parse($0) } ?? .pending
        let task = await taskStore.create(
            subject: input.subject,
            description: input.description,
            owner: input.owner,
            status: initialStatus
        )
        return ToolExecuteResult(
            content: "Task created: \(task.id) - \"\(task.subject)\" (\(task.status.rawValue))",
            isError: false
        )
    }
}

TaskList -- 列出任务(支持按 status 和 owner 过滤):

// LLM 可以查 "列出所有 pending 状态的任务" 或 "列出分配给 agent-1 的任务"
let tasks = await taskStore.list(status: status, owner: input.owner)

TaskUpdate -- 更新任务(状态、描述、负责人、输出):

do {
    let task = try await taskStore.update(
        id: input.id,
        status: status,
        description: input.description,
        owner: input.owner,
        output: input.output
    )
    return ToolExecuteResult(
        content: "Task updated: \(task.id) - \(task.status.rawValue) - \"\(task.subject)\"",
        isError: false
    )
} catch let error as TaskStoreError {
    return ToolExecuteResult(content: "Error: \(error.localizedDescription)", isError: true)
}

注意 TaskUpdate 会抛出 invalidStatusTransition 错误——比如试图把一个 completed 的任务改成 inProgress,LLM 会收到错误提示,可以据此调整策略。

三、Team 系统:团队组建与管理

Task 系统追踪"做什么",Team 系统解决"谁跟谁一组"。

TeamStore

和 TaskStore 一样,TeamStore 也是 Actor:

public actor TeamStore {
    private var teams: [String: Team] = [:]
    private var teamCounter: Int = 0

    public func create(
        name: String,
        members: [TeamMember] = [],
        leaderId: String = "self"
    ) -> Team {
        teamCounter += 1
        let id = "team_\(teamCounter)"
        let team = Team(
            id: id, name: name, members: members,
            leaderId: leaderId,
            createdAt: dateFormatter.string(from: Date()),
            status: .active
        )
        teams[id] = team
        return team
    }
}

Team 有两种状态:activedisbanded。删除 Team 不是真删,而是把状态改成 disbanded——标记为 disbanded 的 Team 不允许添加/移除成员。

TeamMember 和角色

public enum TeamRole: String, Sendable, Equatable, Codable {
    case leader   // 团队领导
    case member   // 普通成员
}

public struct TeamMember: Sendable, Equatable, Codable {
    public let name: String
    public let role: TeamRole
}

TeamCreateTool 创建 Team 时,所有传入的成员默认都是 member 角色,leaderId 默认是 "self"(即创建者自己):

let members: [TeamMember] = input.members?.map { TeamMember(name: $0) } ?? []
let team = await teamStore.create(
    name: input.name,
    members: members,
    leaderId: "self"
)

TeamStore 还提供了动态管理成员的能力:

// 添加成员
try teamStore.addMember(teamId: "team_1", member: TeamMember(name: "agent-coder"))

// 移除成员
try teamStore.removeMember(teamId: "team_1", agentName: "agent-coder")

// 查找某个 Agent 属于哪个团队
let team = await teamStore.getTeamForAgent(agentName: "agent-coder")

getTeamForAgent 对消息传递很重要——发消息时需要知道发件人属于哪个 Team,才能验证收件人是不是队友。

AgentRegistry:Agent 注册表

除了 TeamStore,还有一个 AgentRegistry 负责追踪所有活跃的 Agent:

public actor AgentRegistry {
    private var agents: [String: AgentRegistryEntry] = [:]
    private var nameIndex: [String: String] = [:]  // name -> agentId

    public func register(agentId: String, name: String, agentType: String) throws -> AgentRegistryEntry {
        if nameIndex[name] != nil {
            throw AgentRegistryError.duplicateAgentName(name: name)
        }
        let entry = AgentRegistryEntry(...)
        agents[agentId] = entry
        nameIndex[name] = agentId
        return entry
    }

    public func getByName(name: String) -> AgentRegistryEntry? {
        guard let agentId = nameIndex[name] else { return nil }
        return agents[agentId]
    }
}

名字唯一性约束——同一个 AgentRegistry 里不能注册两个同名的 Agent。nameIndex 是一个反查索引,支持 O(1) 的名字查找。

四、消息传递:MailboxStore 与 SendMessage

有了 Team,Agent 之间需要能通信。SDK 用的是邮箱模式(Mailbox)——发消息不直接推给对方,而是放进对方的邮箱,对方自己来取。

MailboxStore

public actor MailboxStore {
    private var mailboxes: [String: [AgentMessage]] = [:]

    // 点对点发送
    public func send(from: String, to: String, content: String, type: AgentMessageType = .text) {
        let message = AgentMessage(from: from, to: to, content: content,
                                   timestamp: dateFormatter.string(from: Date()), type: type)
        if mailboxes[to] == nil { mailboxes[to] = [] }
        mailboxes[to]?.append(message)
    }

    // 广播——发给所有有邮箱的 Agent
    public func broadcast(from: String, content: String, type: AgentMessageType = .text) {
        let timestamp = dateFormatter.string(from: Date())
        for (agentName, _) in mailboxes {
            let message = AgentMessage(from: from, to: agentName, content: content,
                                       timestamp: timestamp, type: type)
            mailboxes[agentName]?.append(message)
        }
    }

    // 读取并清空邮箱
    public func read(agentName: String) -> [AgentMessage] {
        guard let messages = mailboxes[agentName] else { return [] }
        mailboxes[agentName] = []  // 读完清空
        return messages
    }
}

三个核心操作:send(点对点)、broadcast(广播)、read(读取)。read 是破坏性读取——读一次邮箱就清空了。broadcast 只发给已经有邮箱的 Agent,不会凭空创建邮箱。

消息类型除了普通文本(.text),还有 .shutdownRequest.shutdownResponse.planApprovalResponse——这些特殊类型用于团队管理的协调操作。

SendMessage 工具

SendMessageTool 做了三层校验:

// 1. 必须有 MailboxStore
guard let mailboxStore = context.mailboxStore else { ... }
// 2. 必须有 TeamStore
guard let teamStore = context.teamStore else { ... }
// 3. 必须知道发送者是谁
guard let senderName = context.senderName else { ... }

// 4. 发送者必须在某个 Team 里
guard let team = await teamStore.getTeamForAgent(agentName: senderName) else { ... }

// 5. 收件人必须是同 Team 的成员
let isMember = team.members.contains { $0.name == input.to }
guard isMember else { ... }

广播用 "*" 作为收件人:

{ "to": "*", "message": "Phase 1 complete, starting Phase 2." }

点对点用具体名字:

{ "to": "agent-coder", "message": "Here's the spec for module A." }

校验不通过时返回错误信息,LLM 能看到哪些成员可用,可以调整发送目标。

五、编排模式:怎么组合这些能力

单个 Agent、Task、Team、Mailbox 各自能做什么清楚了。实际场景中怎么组合?看一个典型的工作流。

模式一:主 Agent + 并行子 Agent

最简单的模式。主 Agent 收到复杂任务后,同时启动多个子 Agent 各自处理一部分:

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    systemPrompt: """
    You are a coordinator. Break complex tasks into subtasks, \
    delegate each to an Explore sub-agent, then synthesize results.
    """,
    maxTurns: 20,
    tools: getAllBaseTools(tier: .core) + [
        createAgentTool(),
        createTaskCreateTool(),
        createTaskUpdateTool(),
        createTaskListTool()
    ],
    taskStore: TaskStore()
))

LLM 可能这样编排:

  1. TaskCreate("Analyze module A") -- 创建任务
  2. Agent(prompt: "Analyze module A", subagent_type: "Explore") -- 委派子 Agent
  3. TaskUpdate(id: "task_1", status: "completed", output: result) -- 标记完成
  4. 重复步骤 1-3 处理其他模块
  5. 汇总所有结果

模式二:团队协作 + 消息传递

需要多个 Agent 长期协作时,用 Team + Mailbox:

let mailboxStore = MailboxStore()
let teamStore = TeamStore()

let agent = createAgent(options: AgentOptions(
    apiKey: apiKey,
    model: "claude-sonnet-4-6",
    agentName: "coordinator",
    mailboxStore: mailboxStore,
    teamStore: teamStore,
    tools: getAllBaseTools(tier: .core) + [
        createAgentTool(),
        createTeamCreateTool(),
        createTeamDeleteTool(),
        createSendMessageTool(),
        createTaskCreateTool(),
        createTaskListTool(),
        createTaskUpdateTool()
    ]
))

LLM 的编排可能像这样:

  1. TeamCreate(name: "refactor-team", members: ["explorer", "planner", "coder"]) -- 建团队
  2. TaskCreate("Explore codebase", owner: "explorer") -- 创建任务
  3. Agent(prompt: "...", name: "explorer", subagent_type: "Explore") -- 启动探索 Agent
  4. SendMessage(to: "planner", message: "Exploration done, here's the summary...") -- 通知规划 Agent
  5. TaskCreate("Write implementation plan", owner: "planner") -- 下一个任务
  6. 持续推进...

模式三:工作队列

用 Task 系统做工作队列,主 Agent 创建一批任务,子 Agent 逐个领取执行:

主 Agent:
  TaskCreate("Fix bug #1")     → task_1 (pending)
  TaskCreate("Fix bug #2")     → task_2 (pending)
  TaskCreate("Add feature X")  → task_3 (pending)

子 Agent A:
  TaskList(status: "pending")       → [task_1, task_2, task_3]
  TaskUpdate(task_1, status: "in_progress", owner: "agent-a")
  ... 干活 ...
  TaskUpdate(task_1, status: "completed", output: "Fixed by ...")

子 Agent B:
  TaskList(status: "pending")       → [task_2, task_3]
  TaskUpdate(task_2, status: "in_progress", owner: "agent-b")
  ... 干活 ...

TaskStore 是 Actor,多个 Agent 并发更新同一条任务不会出问题(先到先得),但不会自动分配——需要 LLM 自己协调谁认领哪个任务。

设计思路的取舍

这套多 Agent 协作机制有几个设计选择:

为什么子 Agent 不能再生子 Agent? DefaultSubAgentSpawner 在创建子 Agent 时过滤掉了 AgentTool。这是有意的限制——如果不限制,一个 Agent 生成一个 Agent 再生成一个 Agent,递归深度不可控,token 消耗也会指数级增长。

为什么消息是拉取(Pull)不是推送(Push)? MailboxStore.read() 是破坏性读取,Agent 需要主动调用才能收到消息。这比推送模式简单得多——不需要维护回调、不需要处理 Agent 离线的情况。代价是实时性差,但在 Agent Loop 的工具调用频率下(每个 turn 都可以调工具),拉取的延迟可以接受。

为什么 Task 的状态机没有自动流转? blockedBy 字段只是声明了依赖关系,但 TaskStore.update() 不会自动检查前置任务是否完成。这意味着"等任务 A 做完再做任务 B"这个逻辑需要 LLM 自己实现——调 TaskList 看状态,再决定下一步。这是一个务实的取舍:自动依赖解析可以加,但对 LLM 来说,显式检查反而更可控。

小结

Open Agent SDK 的多 Agent 协作由三层构成:

  • 子 Agent:通过 SubAgentSpawner 协议和 AgentTool 实现,主 Agent 在运行时动态生成子 Agent 委派任务,内置 Explore 和 Plan 两种类型
  • Task 系统:基于 TaskStore Actor 的任务追踪,有明确的状态机(pending -> inProgress -> completed/failed/cancelled),终态不可逆转
  • Team + MailboxTeamStore 管理团队和成员,MailboxStore 实现邮箱式消息传递,支持点对点和广播

三层可以独立使用,也可以组合——用 Task 追踪进度,用 Team 组织成员,用 Mailbox 协调通信,用子 Agent 执行具体工作。

下一篇会看 SDK 的 会话持久化:Agent 对话历史怎么存、怎么恢复、怎么在重启后继续之前的工作。


系列文章

GitHubterryso/open-agent-sdk-swift