跳转至

入门指南

本指南介绍 Pulsing 的核心概念——一个用于构建可扩展 AI 系统的轻量级分布式 Actor 框架。


什么是 Actor?

Actor 是:

  • 具有私有状态的隔离计算单元
  • 顺序处理消息的消息处理器
  • 位置透明:本地和远程 Actor 使用相同的 API
graph LR
    A[发送者] -->|消息| B[Actor 邮箱]
    B --> C[Actor]
    C -->|响应| A

    style A fill:#6366F1,color:#fff
    style B fill:#818CF8,color:#fff
    style C fill:#818CF8,color:#fff

安装

pip install pulsing

第一个 Actor(30秒)

import asyncio
from pulsing.actor import Actor, SystemConfig, create_actor_system


class PingPong(Actor):
    async def receive(self, msg):
        if msg == "ping":
            return "pong"
        return f"echo: {msg}"


async def main():
    system = await create_actor_system(SystemConfig.standalone())
    actor = await system.spawn("pingpong", PingPong())

    print(await actor.ask("ping"))   # -> pong
    print(await actor.ask("hello"))  # -> echo: hello

    await system.shutdown()


asyncio.run(main())

就是这样!任意 Python 对象都可以作为消息——字符串、字典、列表或自定义类。


有状态的 Actor

class Counter(Actor):
    def __init__(self):
        self.value = 0

    async def receive(self, msg):
        if msg == "inc":
            self.value += 1
            return self.value
        if msg == "get":
            return self.value


async def main():
    system = await create_actor_system(SystemConfig.standalone())
    counter = await system.spawn("counter", Counter())

    print(await counter.ask("inc"))  # 1
    print(await counter.ask("inc"))  # 2
    print(await counter.ask("get"))  # 2

    await system.shutdown()

字典消息(最常用)

对于结构化数据,使用字典:

class Calculator(Actor):
    def __init__(self):
        self.result = 0

    async def receive(self, msg):
        if isinstance(msg, dict):
            op = msg.get("op")
            n = msg.get("n", 0)

            if op == "add":
                self.result += n
            elif op == "mul":
                self.result *= n
            elif op == "reset":
                self.result = 0

            return {"result": self.result}


# 使用
resp = await calc.ask({"op": "add", "n": 10})  # {'result': 10}
resp = await calc.ask({"op": "mul", "n": 2})   # {'result': 20}

@as_actor 装饰器(方法调用风格)

想要更面向对象的 API,使用 @as_actor

from pulsing.actor import as_actor, create_actor_system, SystemConfig


@as_actor
class Counter:
    def __init__(self, initial=0):
        self.value = initial

    def inc(self, n=1):
        self.value += n
        return self.value

    def get(self):
        return self.value


async def main():
    system = await create_actor_system(SystemConfig.standalone())
    counter = await Counter.local(system, initial=10)

    print(await counter.inc(5))   # 15
    print(await counter.get())    # 15

    await system.shutdown()

Ask vs Tell

模式 描述 使用场景
ask 发送并等待响应 需要结果
tell 发后即忘 仅副作用、日志
# ask - 等待响应
result = await actor.ask("ping")

# tell - 不等待
await actor.tell("log this event")

流式响应

用于持续输出数据(LLM token、进度更新等):

from pulsing.actor import StreamMessage

@as_actor
class TokenGenerator:
    async def generate(self, prompt: str):
        stream_msg, writer = StreamMessage.create("tokens")

        async def produce():
            for i, word in enumerate(prompt.split()):
                await writer.write({"token": word, "index": i})  # 自动序列化
            await writer.close()

        asyncio.create_task(produce())
        return stream_msg


# 消费流
response = await generator.generate("Hello world from Pulsing")
async for chunk in response.stream_reader():
    print(chunk["token"])  # chunk 已经是 Python dict

核心特性:

  • writer.write(obj) - 写入任意 Python 对象(自动 pickle)
  • stream_reader() - 迭代接收 Python 对象(自动反序列化)
  • 有界缓冲区,支持背压

集群配置

Pulsing 使用 SWIM gossip 协议——无需外部服务!

节点 1(种子节点):

config = SystemConfig.with_addr("0.0.0.0:8000")
system = await create_actor_system(config)
await system.spawn("worker", MyActor(), public=True)  # public = 对集群可见

节点 2(加入集群):

config = SystemConfig.with_addr("0.0.0.0:8001").with_seeds(["192.168.1.100:8000"])
system = await create_actor_system(config)

# 查找并调用远程 actor(API 完全相同!)
worker = await system.resolve_named("worker")
result = await worker.ask("do_work")


总结

概念 描述
Actor 具有私有状态的隔离单元
消息 任意 Python 对象(字符串、字典、列表等)
ask/tell 请求-响应 / 发后即忘
流式响应 持续数据流,自动序列化
@as_actor 将任何类转换为支持方法调用的 Actor
集群 使用 SWIM 协议自动发现

下一步