API 概述¶
Pulsing 是分布式 AI 系统的通信骨干——一个具备流式支持、零依赖和内置发现的分布式 Actor 运行时。
核心概念¶
Pulsing 基于 Actor 模型构建,actor 是计算的基本单位,通过异步消息传递通信:
- 位置透明性:本地和远程 actor 使用相同 API
- 容错性:Actor 可以独立失败,不会影响其他 actor
- 并发性:Actor 一次处理一条消息,简化并发编程
主要特性¶
- 零外部依赖:纯 Rust + Tokio 实现
- 内置服务发现:SWIM/Gossip 协议管理集群
- 流式支持:原生支持流式请求/响应
- 多语言:Python 优先,Rust 核心,可扩展到其他语言
Quick Start¶
import pulsing as pul
await pul.init()
@pul.remote
class Counter:
def __init__(self): self.value = 0
def incr(self): self.value += 1; return self.value
counter = await Counter.spawn(name="counter")
print(await counter.incr()) # 1
counter2 = await Counter.resolve("counter")
print(await counter2.incr()) # 2
await pul.shutdown()
Python API¶
使用任何 API 之前,必须先调用 await pul.init()。
生命周期¶
import pulsing as pul
await pul.init(
addr=None, # 绑定地址,单机模式为 None
seeds=None, # 集群种子节点
passphrase=None, # TLS 密码短语
)
await pul.shutdown()
定义 Actor¶
使用 @pul.remote 将任意类变为分布式 actor:
@pul.remote
class Counter:
def __init__(self, init=0):
self.value = init
def incr(self): # 同步方法 — 串行执行
self.value += 1
return self.value
async def fetch_and_add(self, url): # 异步方法 — await 期间可并发
data = await http_get(url)
self.value += data
return self.value
创建与调用¶
Class.spawn() 创建 actor 并返回类型化代理:
解析已有 Actor¶
# 类型化代理 — 已知 actor 类型时
proxy = await Counter.resolve("counter")
result = await proxy.incr()
# 类型化代理 — 手动绑定
ref = await pul.resolve("counter", timeout=30)
proxy = ref.as_type(Counter)
# 无类型代理 — 远端类型未知时
ref = await pul.resolve("service_name")
proxy = ref.as_any()
result = await proxy.any_method(args)
流式响应¶
远程方法返回生成器即可进行流式传输:
@pul.remote
class StreamingService:
async def generate_tokens(self, prompt):
for token in generate_tokens(prompt):
yield token
service = await StreamingService.spawn()
async for token in service.generate_tokens("Hello world"):
print(token, end="")
监督与容错¶
@pul.remote(
restart_policy="on_failure", # "never", "on_failure", "always"
max_restarts=3,
min_backoff=0.1,
max_backoff=30.0,
)
class ResilientWorker:
def process(self, data):
return risky_computation(data)
队列 (Queue)¶
分布式队列,支持 bucket 分区:
writer = await pul.queue.write("my_queue", bucket_column="user_id")
await writer.put({"user_id": "u1", "data": "hello"})
await writer.flush()
reader = await pul.queue.read("my_queue")
records = await reader.get(limit=100)
主题 (Topic)¶
轻量级发布/订阅,用于实时消息分发:
writer = await pul.topic.write("events")
await writer.publish({"type": "user_login", "user": "alice"})
reader = await pul.topic.read("events")
@reader.on_message
async def handle(msg):
print(f"Received: {msg}")
await reader.start()
Under the Hood¶
ActorSystem(显式管理)¶
import pulsing as pul
system = await pul.actor_system(addr="0.0.0.0:8000")
class MyActor:
async def receive(self, msg):
return f"echo: {msg}"
actor = await system.spawn(MyActor(), name="my_actor")
response = await actor.ask({"message": "hello"})
await actor.tell({"event": "fire_and_forget"})
await system.shutdown()
Rust API¶
核心 Trait¶
Rust API 通过 trait 定义契约,分为三层:
ActorSystemCoreExt(主路径,prelude 自动导入)¶
use pulsing_actor::prelude::*;
// 生成 actor
let actor = system.spawn_named("services/echo", EchoActor).await?;
// 通信
let response = actor.ask(Ping(42)).await?;
ActorSystemAdvancedExt(高级:可重启监督)¶
Factory 模式生成,支持监督重启(仅命名 actor):
let options = SpawnOptions::default()
.supervision(SupervisionSpec::on_failure().max_restarts(3));
system.spawn_named_factory("services/worker", || Ok(Worker::new()), options).await?;
ActorSystemOpsExt(运维/诊断/生命周期)¶
系统信息、集群成员、停止/关闭等:
system.node_id();
system.addr();
system.members().await;
system.all_named_actors().await;
system.stop("name").await?;
system.shutdown().await?;
Behavior(类型安全,Akka Typed 风格)¶
- 核心:
Behavior<M>+TypedRef<M>+BehaviorAction (Same/Become/Stop) - 约定:
TypedRef<M>要求M: Serialize + DeserializeOwned + Send + 'static
错误处理¶
Python¶
from pulsing.exceptions import (
PulsingBusinessError,
PulsingSystemError,
PulsingRuntimeError,
)
try:
result = await service.process(data)
except PulsingBusinessError as e:
print(f"业务错误 [{e.code}]: {e.message}")
except PulsingSystemError as e:
print(f"系统错误: {e.error}, 可恢复: {e.recoverable}")
except PulsingRuntimeError as e:
print(f"框架错误: {e}")
Rust¶
use anyhow::Result;
match actor.ask(Ping(42)).await {
Ok(response) => println!("Got: {:?}", response),
Err(e) => println!("Error: {:?}", e),
}
安全考虑¶
- Pickle 载荷在 Python-Python 通信中可能导致 RCE
- 生产环境使用 TLS
- 将集群视为经过认证的信任边界
性能特性¶
- 低延迟:HTTP/2 传输与二进制序列化
- 高吞吐量:异步运行时与高效任务调度
- 内存高效:基于 actor 的并发,无需线程
- 可扩展:Gossip 基础集群发现,适用于大型部署
后续步骤¶
- Python API:Python 接口完整文档
- Rust API:Rust 接口完整文档
- 示例:工作代码示例
- 指南:深入指南和教程