跳转至

教程:分布式 Agent

将 Pulsing 与 AutoGenLangGraph 集成,跨机器分布你的 Agent。

你将学到:

  • 使用 PulsingRuntime 分布 AutoGen Agent
  • 使用 with_pulsing() 分布 LangGraph 节点
  • LLM 在 GPU 上运行,工具在 CPU 上运行
┌─────────────────────────────────────────────────────────────┐
│                    你的 Agent 代码                          │
│              (AutoGen / LangGraph)                          │
└─────────────────────────┬───────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                  Pulsing Actor System                       │
└─────────────────────────┬───────────────────────────────────┘
          ┌───────────────┼───────────────┐
          ▼               ▼               ▼
    ┌──────────┐   ┌──────────┐   ┌──────────┐
    │  节点 1  │   │  节点 2  │   │  节点 3  │
    │   GPU    │   │   CPU    │   │   CPU    │
    │   LLM    │   │   工具   │   │   工具   │
    └──────────┘   └──────────┘   └──────────┘

前置条件

pip install pulsing

# AutoGen
pip install autogen-agentchat autogen-ext

# LangGraph
pip install langgraph langchain-openai

方案 A:AutoGen

步骤 1:定义 Agent

from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

model = OpenAIChatCompletionClient(model="gpt-4o-mini")
agent = AssistantAgent("assistant", model_client=model)

步骤 2:使用 PulsingRuntime

PulsingRuntime 替换 SingleThreadedAgentRuntime

from pulsing.integrations.autogen import PulsingRuntime

# 单进程(默认)
runtime = PulsingRuntime()

# 分布式模式
runtime = PulsingRuntime(addr="0.0.0.0:8000")

步骤 3:注册并运行

await runtime.start()
await runtime.register_factory("assistant", lambda: agent)

# 发送消息
result = await runtime.send_message(
    "Hello, agent!",
    AgentId("assistant", "default")
)

完整示例

import asyncio
from autogen_agentchat.agents import AssistantAgent
from autogen_core import AgentId
from autogen_ext.models.openai import OpenAIChatCompletionClient
from pulsing.integrations.autogen import PulsingRuntime

async def main():
    model = OpenAIChatCompletionClient(model="gpt-4o-mini")
    agent = AssistantAgent("assistant", model_client=model)

    runtime = PulsingRuntime(addr="0.0.0.0:8000")
    await runtime.start()
    await runtime.register_factory("assistant", lambda: agent)

    result = await runtime.send_message(
        "2 + 2 等于多少?",
        AgentId("assistant", "default")
    )
    print(result)

asyncio.run(main())

方案 B:LangGraph

步骤 1:构建图

from langgraph.graph import StateGraph

def llm_node(state):
    # LLM 调用
    return {"messages": [...]}

def tool_node(state):
    # 工具执行
    return {"messages": [...]}

graph = StateGraph(State)
graph.add_node("llm", llm_node)
graph.add_node("tool", tool_node)
graph.add_edge("llm", "tool")
app = graph.compile()

步骤 2:用 Pulsing 包装

from pulsing.integrations.langgraph import with_pulsing

distributed_app = with_pulsing(
    app,
    node_mapping={
        "llm": "langgraph_node_llm",    # → GPU 服务器
        "tool": "langgraph_node_tool",  # → CPU 服务器
    },
    seeds=["gpu-server:8001"],
)

步骤 3:运行

result = await distributed_app.ainvoke({"messages": [...]})

运行示例

# AutoGen 分布式
cd examples/agent/autogen && ./run_distributed.sh

# LangGraph 分布式
cd examples/agent/langgraph && ./run_distributed.sh

使用场景

场景 配置
LLM + 工具 LLM 在 GPU 节点,工具在 CPU 节点
多 Agent 每个 Agent 在不同节点
弹性扩展 无需代码改动即可增减节点

下一步