跳转至

Pulsing 原生 Agent

Pulsing 提供轻量的原生 Agent 工具箱,用于构建多智能体应用,与 Actor 模型完全兼容。

设计理念

简单透明。

  • @agent = @remote + 元信息(无魔法)
  • 元信息用于可视化/调试,不会自动注入 prompt
  • 完全掌控 LLM 调用和对话流程

核心 API

@agent 装饰器

@agent 装饰器等同于 @pul.remote,但附加元信息用于可视化和调试:

import pulsing as pul
from pulsing.agent import agent, llm, get_agent_meta, list_agents

# @pul.remote: 基础 Actor
@pul.remote
class Worker:
    async def work(self):
        return "done"

# @agent: 带元信息的 Actor
@agent(role="研究员", goal="深入分析", domain="AI")
class Researcher:
    async def analyze(self, topic: str) -> str:
        client = await llm()
        resp = await client.ainvoke(f"分析: {topic}")
        return resp.content

元信息访问

await pul.init()
try:
    r = await Researcher.spawn(name="researcher")

    # 通过名称获取元信息
    meta = get_agent_meta("researcher")
    print(meta.role)       # "研究员"
    print(meta.goal)       # "深入分析"
    print(meta.tags)       # {"domain": "AI"}

    # 列出所有 Agent
    for name, meta in list_agents().items():
        print(f"{name}: {meta.role}")
finally:
    await pul.shutdown()

@pul.remote vs @agent

特性 @pul.remote @agent
功能 Actor 化 Actor 化 + 元信息
用途 通用 可视化/调试
元信息 role, goal, backstory, tags
性能 基准 几乎一致

运行时管理

import pulsing as pul

await pul.init()
try:
    # 创建和使用 Agent
    agent = await MyAgent.spawn(name="agent")
    await agent.work()
finally:
    await pul.shutdown()

runtime() 仍可作为便捷 context manager 使用。本文统一使用显式 await pul.init() / await pul.shutdown()

分布式模式

# 节点 A
await pul.init(addr="0.0.0.0:8001")
try:
    await JudgeActor.spawn(name="judge")
finally:
    await pul.shutdown()

# 节点 B(自动发现节点 A)
await pul.init(addr="0.0.0.0:8002", seeds=["node_a:8001"])
try:
    judge = await JudgeActor.resolve("judge")  # 跨节点透明调用
    await judge.submit(idea)
finally:
    await pul.shutdown()

LLM 集成

from pulsing.agent import llm

async def analyze(topic: str):
    # 获取 LLM 客户端(懒加载单例)
    client = await llm(temperature=0.8)
    resp = await client.ainvoke(f"分析: {topic}")
    return resp.content

环境变量:

  • OPENAI_API_KEY: API 密钥(必需)
  • OPENAI_BASE_URL: 自定义 API 地址(可选)
  • LLM_MODEL: 默认模型名(可选,默认 gpt-4o-mini

工具函数

JSON 解析

from pulsing.agent import parse_json, extract_field

# 安全 JSON 解析,带默认值
data = parse_json('{"key": "value"}', default={})

# 提取特定字段
value = extract_field(response, "answer", default="unknown")

完整示例

import asyncio
import pulsing as pul
from pulsing.agent import agent, llm, parse_json, list_agents

@pul.remote
class Moderator:
    """使用 @pul.remote 的协调者(基础 Actor)"""

    def __init__(self, topic: str):
        self.topic = topic
        self.opinions = []

    async def collect_opinion(self, agent_name: str, opinion: str):
        self.opinions.append({"agent": agent_name, "opinion": opinion})
        return {"received": True}

    async def summarize(self):
        return {"topic": self.topic, "opinions": self.opinions}

@agent(role="分析师", goal="提供洞见", domain="tech")
class Analyst:
    """使用 @agent 的分析师(带元信息的 Actor)"""

    def __init__(self, name: str, moderator: str, mock: bool = True):
        self.name = name
        self.moderator_name = moderator
        self.mock = mock

    async def analyze(self, topic: str):
        if self.mock:
            opinion = f"[{self.name}] 对 {topic} 的分析:前景看好"
        else:
            client = await llm()
            resp = await client.ainvoke(f"简要分析: {topic}")
            opinion = resp.content

        # 提交给协调者
        moderator = await Moderator.resolve(self.moderator_name)
        await moderator.collect_opinion(self.name, opinion)
        return opinion

async def main():
    await pul.init()
    try:
        # 创建协调者
        moderator = await Moderator.spawn(topic="AI 趋势", name="moderator")

        # 创建分析师
        for i in range(3):
            name = f"analyst_{i}"
            await Analyst.spawn(
                name=name,
                moderator="moderator",
                mock=True,
                name=name,
            )

        # 显示 Agent 元信息
        print("已注册的 Agent:")
        for name, meta in list_agents().items():
            print(f"  {name}: {meta.role}")

        # 运行分析
        for i in range(3):
            analyst = await Analyst.resolve(f"analyst_{i}")
            await analyst.analyze("AI 趋势")

        # 获取总结
        result = await moderator.summarize()
        print(f"总结: {result}")
    finally:
        await pul.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

示例

参见 examples/agent/pulsing/ 目录的完整示例:

示例 说明
mbti_discussion.py 基于 MBTI 人格的多智能体讨论
parallel_ideas_async.py 并行创意生成与竞争提交
runtime_lifecycle_example.py 运行时生命周期管理
# 运行 MBTI 讨论(模拟模式)
python examples/agent/pulsing/mbti_discussion.py --mock --group-size 6

# 运行并行创意(模拟模式)
python examples/agent/pulsing/parallel_ideas_async.py --mock --n-ideas 5

下一步