API 参考¶
Pulsing Actor 框架的完整 API 文档。
契约与语义(由 llms.binding.md 派生)¶
本节是 Pulsing Python API 的面向用户契约,内容派生自仓库根目录的 llms.binding.md。
- 权威来源:
llms.binding.md是对外契约源文件。 - 本文档:API 参考 + 显式语义声明(并发、错误、信任边界)。
- 若两者不一致:以
llms.binding.md为准,并请提 Issue/PR 同步。
@pulsing.remote 的并发语义¶
对 @pulsing.remote 类,方法调用会被翻译为 actor 消息并在 actor 内执行。
- 同步方法(
def method) - 在 actor 内串行执行(一次处理一个请求)。
- 适合:快速计算、状态修改。
- 异步方法(
async def method) - 调用走基于流(stream)的执行路径,在 actor 侧以后台任务调度。
- 当方法处于
await等待期间,actor 仍可继续处理其他消息(即“非阻塞 actor”语义)。 - 你既可以:
await proxy.async_method(...)获取最终返回值;也可以async for chunk in proxy.async_method(...): ...消费中间yield。
- 生成器(同步/异步)
- 返回 generator(sync/async)会被当作流式响应。
流式与取消(cancellation)¶
- 流式响应通过 Pulsing stream message 实现;取消传播属于 best-effort。
- 调用方取消本地
await/async for时,远端是否立即停止取决于传输层取消传播。
ask 与 tell¶
ask(msg):请求-响应,返回值或抛异常。tell(msg):fire-and-forget,不等待返回。
错误模型¶
Pulsing 提供了跨 Rust 和 Python 的统一错误处理系统,具有清晰的错误分类:
错误分类¶
- PulsingRuntimeError: 框架/系统级错误
- Actor 系统错误(NotFound, Stopped 等)
- 传输错误(ConnectionFailed 等)
- 集群错误(NodeNotFound 等)
- 配置错误(InvalidValue 等)
-
I/O 错误、序列化错误
-
PulsingActorError: 用户 Actor 执行错误
- PulsingBusinessError: 用户输入错误、业务逻辑错误(可恢复,返回给调用者)
- PulsingSystemError: 内部错误、资源错误(可能触发 Actor 重启)
- PulsingTimeoutError: 操作超时(可重试)
- PulsingUnsupportedError: 不支持的操作
使用示例¶
from pulsing.exceptions import (
PulsingBusinessError,
PulsingSystemError,
PulsingTimeoutError,
PulsingRuntimeError,
)
@pul.remote
class Service:
async def validate(self, data: str) -> bool:
if not data:
raise PulsingBusinessError(400, "数据不能为空")
return True
async def process(self, data: str) -> str:
try:
return expensive_operation(data)
except Exception as e:
raise PulsingSystemError(f"处理失败: {e}", recoverable=True)
# 调用方
try:
result = await service.process("")
except PulsingBusinessError as e:
print(f"业务错误 [{e.code}]: {e.message}")
except PulsingSystemError as e:
print(f"系统错误: {e.error}, 可恢复: {e.recoverable}")
except PulsingRuntimeError as e:
print(f"框架错误: {e}")
自动错误分类¶
标准 Python 异常会自动分类:
- ValueError, TypeError → PulsingBusinessError (code=400)
- TimeoutError → PulsingTimeoutError
- RuntimeError, SystemError → PulsingSystemError (recoverable=True)
- 其他异常 → PulsingSystemError (recoverable=True)
注意:错误类型信息在本地和远程调用中都会保留。远程错误传播会保持错误分类。
信任边界与安全声明¶
- Python ↔ Python 默认使用 Pickle:
- 为了易用性,Python↔Python 对象载荷默认使用 pickle。
- 风险:对不可信数据 unpickle 可能导致任意代码执行(RCE)。
- 建议:仅在可信网络/可信集群边界内使用。
- 传输层安全(TLS):
- 生产环境建议启用 TLS/mTLS,并把集群内部通信视为经过认证的信任边界。
队列语义(Distributed Queue)¶
- 分桶:
- 写端使用
bucket_column+num_buckets决定 record 落到哪个 bucket。 - 读端必须与写端保持一致的
num_buckets(以及 backend)。 - 归属(owner):
- bucket 的 owner 基于集群成员一致性哈希计算;请求可能被重定向到 owner 节点。
- 后端:
- 默认内存后端;是否持久化取决于 backend。
核心函数¶
pul.init / pul.shutdown¶
全局系统初始化。
import asyncio
import pulsing as pul
async def example():
await pul.init(addr=None, seeds=None, passphrase=None)
await pul.shutdown()
# 使用示例
if __name__ == "__main__":
# 运行示例
# system = asyncio.run(example())
pass
参数:
- addr: 绑定地址(str 或 None 表示单机模式)
- seeds: 加入集群的种子节点(list[str] 或 None)
- passphrase: TLS 密码短语(str 或 None)
Under the Hood:pul.actor_system¶
当需要低层控制时,创建显式 ActorSystem 实例。
import asyncio
import pulsing as pul
async def main():
# 单机模式
system = await pul.actor_system()
await system.shutdown()
# 集群模式
system = await pul.actor_system(addr="0.0.0.0:8000")
await system.shutdown()
# 加入现有集群
system = await pul.actor_system(
addr="0.0.0.0:8001",
seeds=["127.0.0.1:8000"]
)
await system.shutdown()
if __name__ == "__main__":
asyncio.run(main())
核心类¶
ActorSystem¶
Actor 系统的主入口点。
class ActorSystem:
async def spawn(
self,
actor, # Actor 实例
# # 关键字参数开始 # 关键字参数开始
name=None, # Actor 名称(str 或 None)
# public 参数已废弃:所有命名 actor 自动可被 resolve
restart_policy="never", # 重启策略("never" | "always" | "on-failure")
max_restarts=3, # 最大重启次数
min_backoff=0.1, # 最小退避时间(秒)
max_backoff=30.0 # 最大退避时间(秒)
):
"""
生成新的 actor。
- 有 name: 命名 actor,可通过 resolve() 发现
- 无 name: 匿名 actor,仅通过返回的 ActorRef 访问
"""
pass
async def refer(self, actorid):
"""
通过 ActorId 获取 ActorRef。
**参数:**
- `actorid`: Actor ID(ActorId 实例或字符串格式 "node_id:local_id")
**返回:** 对应 actor 的 ActorRef
"""
pass
async def resolve(self, name, *, node_id=None):
"""
通过名称解析 actor。
**参数:**
- `name`: Actor 名称(str)
- `node_id`: 目标节点 ID(int 或 None)
**返回:** 对应 actor 的 ActorRef
"""
pass
async def shutdown(self):
"""关闭 actor 系统。"""
pass
ActorRef¶
Actor 的底层引用。使用 ask() 和 tell() 进行通信。
class ActorRef:
@property
def actor_id(self):
"""获取 actor 的 ID。"""
pass
async def ask(self, msg):
"""
发送消息并等待响应。
**参数:**
- `msg`: 任意消息对象(Any)
**返回:** 响应消息(Any)
"""
pass
async def tell(self, msg):
"""发送消息但不等待响应(fire-and-forget)。"""
pass
def as_any(self):
"""在未知远端类型时,返回无类型 ActorProxy。"""
pass
def as_type(self, cls):
"""绑定类元数据并返回有类型 ActorProxy。"""
pass
ActorProxy¶
@remote 类的高级代理。可直接调用方法。
class ActorProxy:
@property
def ref(self):
"""获取底层 ActorRef。"""
pass
# 直接调用方法:
# result = await proxy.my_method(arg1, arg2)
装饰器¶
@remote / @pul.remote¶
将类转换为分布式 Actor。
import pulsing as pul
@pul.remote
class Counter:
def __init__(self, init_value: int = 0):
self.value = init_value
# 同步方法 - 顺序执行
def incr(self):
self.value += 1
return self.value
# 异步方法 - await 期间可并发执行
async def fetch_and_add(self, url):
data = await http_get(url)
self.value += data
return self.value
# Generator - 自动流式传输
async def stream(self):
for i in range(10):
yield {"count": i}
# 创建 actor
counter = await Counter.spawn(name="counter")
# 直接调用方法
result = await counter.incr()
# 流式传输
async for chunk in counter.stream():
print(chunk)
# 解析已有 actor
proxy = await Counter.resolve("counter")
监督参数:
@pul.remote(
restart_policy="on_failure", # "never" | "on_failure" | "always"
max_restarts=3,
min_backoff=0.1,
max_backoff=30.0,
)
class ResilientWorker:
def work(self, data): ...
Under the Hood:基础 Actor¶
需要底层控制时,可使用基础 Actor 类。
class MyActor:
def __init__(self):
self.value = 0
def on_start(self, actor_id):
"""Actor 启动时调用。"""
print(f"Started: {actor_id}")
async def receive(self, msg):
"""处理传入消息。"""
if msg.get("action") == "add":
self.value += msg.get("n", 1)
return {"value": self.value}
return {"error": "unknown action"}
# 生成
system = await pul.actor_system()
actor = await system.spawn(MyActor(), name="my_actor")
# 通过 ask/tell 通信
response = await actor.ask({"action": "add", "n": 10})
队列 API¶
用于数据管道的分布式队列。
# 写入
writer = await pul.queue.write(
"my_queue",
bucket_column="user_id",
num_buckets=4,
)
await writer.put({"user_id": "u1", "data": "hello"})
await writer.flush()
# 读取
reader = await pul.queue.read("my_queue")
records = await reader.get(limit=100)
Rust API¶
Rust API 通过三层 trait 组织(均在 pulsing_actor::prelude::* 中 re-export):
ActorSystemCoreExt(主路径)¶
核心 spawn 与 resolve 操作:
// Spawn - 简洁 API
system.spawn(actor).await?; // 匿名 actor(不可 resolve)
system.spawn_named(name, actor).await?; // 命名 actor(可 resolve)
// Spawn - Builder 模式(高级配置)
system.spawning()
.name("services/counter") // 可选:有 name = 可 resolve
.supervision(SupervisionSpec::on_failure().max_restarts(3))
.mailbox_capacity(256)
.spawn(actor).await?;
// Resolve - 简洁 API
system.actor_ref(&actor_id).await?; // 按 ActorId 获取
system.resolve(name).await?; // 按名称解析
// Resolve - Builder 模式(高级配置)
system.resolving()
.node(node_id) // 可选:指定目标节点
.policy(RoundRobinPolicy::new()) // 可选:负载均衡策略
.filter_alive(true) // 可选:只选存活节点
.resolve(name).await?; // 解析单个
system.resolving().list(name).await?; // 获取所有实例
system.resolving().lazy(name)?; // 懒解析(~5s TTL 自动刷新)
ActorSystemAdvancedExt(高级:监督/重启)¶
Factory 模式 spawn,支持 supervision 重启(仅命名 actor):
let options = SpawnOptions::default()
.supervision(SupervisionSpec::on_failure().max_restarts(3));
// 仅命名 actor 支持 supervision(匿名 actor 无法重新解析)
system.spawn_named_factory(name, || Ok(Service::new()), options).await?;
ActorSystemOpsExt(运维/诊断)¶
系统信息、集群成员、生命周期控制:
system.node_id();
system.addr();
system.members().await;
system.all_named_actors().await;
system.stop(name).await?;
system.shutdown().await?;
示例¶
查看快速开始指南了解使用示例。