跳转至

API 参考

Pulsing Actor 框架的完整 API 文档。

契约与语义(由 llms.binding.md 派生)

本节是 Pulsing Python API 的面向用户契约,内容派生自仓库根目录的 llms.binding.md

  • 权威来源llms.binding.md 是对外契约源文件。
  • 本文档:API 参考 + 显式语义声明(并发、错误、信任边界)。
  • 若两者不一致:以 llms.binding.md 为准,并请提 Issue/PR 同步。

@pulsing.remote 的并发语义

@pulsing.remote 类,方法调用会被翻译为 actor 消息并在 actor 内执行。

  • 同步方法(def method
  • 在 actor 内串行执行(一次处理一个请求)。
  • 适合:快速计算、状态修改。
  • 异步方法(async def method
  • 调用走基于流(stream)的执行路径,在 actor 侧以后台任务调度。
  • 当方法处于 await 等待期间,actor 仍可继续处理其他消息(即“非阻塞 actor”语义)。
  • 你既可以:
    • await proxy.async_method(...) 获取最终返回值;也可以
    • async for chunk in proxy.async_method(...): ... 消费中间 yield
  • 生成器(同步/异步)
  • 返回 generator(sync/async)会被当作流式响应

流式与取消(cancellation)

  • 流式响应通过 Pulsing stream message 实现;取消传播属于 best-effort
  • 调用方取消本地 await/async for 时,远端是否立即停止取决于传输层取消传播。

asktell

  • ask(msg):请求-响应,返回值或抛异常。
  • tell(msg):fire-and-forget,不等待返回。

错误模型

Pulsing 提供了跨 Rust 和 Python 的统一错误处理系统,具有清晰的错误分类:

错误分类

  1. PulsingRuntimeError: 框架/系统级错误
  2. Actor 系统错误(NotFound, Stopped 等)
  3. 传输错误(ConnectionFailed 等)
  4. 集群错误(NodeNotFound 等)
  5. 配置错误(InvalidValue 等)
  6. I/O 错误、序列化错误

  7. PulsingActorError: 用户 Actor 执行错误

  8. PulsingBusinessError: 用户输入错误、业务逻辑错误(可恢复,返回给调用者)
  9. PulsingSystemError: 内部错误、资源错误(可能触发 Actor 重启)
  10. PulsingTimeoutError: 操作超时(可重试)
  11. PulsingUnsupportedError: 不支持的操作

使用示例

from pulsing.exceptions import (
    PulsingBusinessError,
    PulsingSystemError,
    PulsingTimeoutError,
    PulsingRuntimeError,
)

@pul.remote
class Service:
    async def validate(self, data: str) -> bool:
        if not data:
            raise PulsingBusinessError(400, "数据不能为空")
        return True

    async def process(self, data: str) -> str:
        try:
            return expensive_operation(data)
        except Exception as e:
            raise PulsingSystemError(f"处理失败: {e}", recoverable=True)

# 调用方
try:
    result = await service.process("")
except PulsingBusinessError as e:
    print(f"业务错误 [{e.code}]: {e.message}")
except PulsingSystemError as e:
    print(f"系统错误: {e.error}, 可恢复: {e.recoverable}")
except PulsingRuntimeError as e:
    print(f"框架错误: {e}")

自动错误分类

标准 Python 异常会自动分类: - ValueError, TypeErrorPulsingBusinessError (code=400) - TimeoutErrorPulsingTimeoutError - RuntimeError, SystemErrorPulsingSystemError (recoverable=True) - 其他异常 → PulsingSystemError (recoverable=True)

注意:错误类型信息在本地和远程调用中都会保留。远程错误传播会保持错误分类。

信任边界与安全声明

  • Python ↔ Python 默认使用 Pickle
  • 为了易用性,Python↔Python 对象载荷默认使用 pickle
  • 风险:对不可信数据 unpickle 可能导致任意代码执行(RCE)。
  • 建议:仅在可信网络/可信集群边界内使用。
  • 传输层安全(TLS)
  • 生产环境建议启用 TLS/mTLS,并把集群内部通信视为经过认证的信任边界。

队列语义(Distributed Queue)

  • 分桶
  • 写端使用 bucket_column + num_buckets 决定 record 落到哪个 bucket。
  • 读端必须与写端保持一致的 num_buckets(以及 backend)。
  • 归属(owner)
  • bucket 的 owner 基于集群成员一致性哈希计算;请求可能被重定向到 owner 节点。
  • 后端
  • 默认内存后端;是否持久化取决于 backend。

核心函数

pul.init / pul.shutdown

全局系统初始化。

import asyncio
import pulsing as pul

async def example():
    await pul.init(addr=None, seeds=None, passphrase=None)
    await pul.shutdown()

# 使用示例
if __name__ == "__main__":
    # 运行示例
    # system = asyncio.run(example())
    pass

参数: - addr: 绑定地址(str 或 None 表示单机模式) - seeds: 加入集群的种子节点(list[str] 或 None) - passphrase: TLS 密码短语(str 或 None)

Under the Hood:pul.actor_system

当需要低层控制时,创建显式 ActorSystem 实例。

import asyncio
import pulsing as pul

async def main():
    # 单机模式
    system = await pul.actor_system()
    await system.shutdown()

    # 集群模式
    system = await pul.actor_system(addr="0.0.0.0:8000")
    await system.shutdown()

    # 加入现有集群
    system = await pul.actor_system(
        addr="0.0.0.0:8001",
        seeds=["127.0.0.1:8000"]
    )
    await system.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

核心类

ActorSystem

Actor 系统的主入口点。

class ActorSystem:
    async def spawn(
        self,
        actor,                # Actor 实例
#        # 关键字参数开始                    # 关键字参数开始
        name=None,           # Actor 名称(str 或 None)
        # public 参数已废弃:所有命名 actor 自动可被 resolve
        restart_policy="never",  # 重启策略("never" | "always" | "on-failure")
        max_restarts=3,      # 最大重启次数
        min_backoff=0.1,     # 最小退避时间(秒)
        max_backoff=30.0     # 最大退避时间(秒)
    ):
        """
        生成新的 actor。

        - 有 name: 命名 actor,可通过 resolve() 发现
        - 无 name: 匿名 actor,仅通过返回的 ActorRef 访问
        """
        pass

    async def refer(self, actorid):
        """
        通过 ActorId 获取 ActorRef。

        **参数:**
        - `actorid`: Actor ID(ActorId 实例或字符串格式 "node_id:local_id")

        **返回:** 对应 actor 的 ActorRef
        """
        pass

    async def resolve(self, name, *, node_id=None):
        """
        通过名称解析 actor。

        **参数:**
        - `name`: Actor 名称(str)
        - `node_id`: 目标节点 ID(int 或 None)

        **返回:** 对应 actor 的 ActorRef
        """
        pass

    async def shutdown(self):
        """关闭 actor 系统。"""
        pass

ActorRef

Actor 的底层引用。使用 ask()tell() 进行通信。

class ActorRef:
    @property
    def actor_id(self):
        """获取 actor 的 ID。"""
        pass

    async def ask(self, msg):
        """
        发送消息并等待响应。

        **参数:**
        - `msg`: 任意消息对象(Any)

        **返回:** 响应消息(Any)
        """
        pass

    async def tell(self, msg):
        """发送消息但不等待响应(fire-and-forget)。"""
        pass

    def as_any(self):
        """在未知远端类型时,返回无类型 ActorProxy。"""
        pass

    def as_type(self, cls):
        """绑定类元数据并返回有类型 ActorProxy。"""
        pass

ActorProxy

@remote 类的高级代理。可直接调用方法。

class ActorProxy:
    @property
    def ref(self):
        """获取底层 ActorRef。"""
        pass

    # 直接调用方法:
    # result = await proxy.my_method(arg1, arg2)

装饰器

@remote / @pul.remote

将类转换为分布式 Actor。

import pulsing as pul

@pul.remote
class Counter:
    def __init__(self, init_value: int = 0):
        self.value = init_value

    # 同步方法 - 顺序执行
    def incr(self):
        self.value += 1
        return self.value

    # 异步方法 - await 期间可并发执行
    async def fetch_and_add(self, url):
        data = await http_get(url)
        self.value += data
        return self.value

    # Generator - 自动流式传输
    async def stream(self):
        for i in range(10):
            yield {"count": i}

# 创建 actor
counter = await Counter.spawn(name="counter")

# 直接调用方法
result = await counter.incr()

# 流式传输
async for chunk in counter.stream():
    print(chunk)

# 解析已有 actor
proxy = await Counter.resolve("counter")

监督参数:

@pul.remote(
    restart_policy="on_failure",  # "never" | "on_failure" | "always"
    max_restarts=3,
    min_backoff=0.1,
    max_backoff=30.0,
)
class ResilientWorker:
    def work(self, data): ...

Under the Hood:基础 Actor

需要底层控制时,可使用基础 Actor 类。

class MyActor:
    def __init__(self):
        self.value = 0

    def on_start(self, actor_id):
        """Actor 启动时调用。"""
        print(f"Started: {actor_id}")

    async def receive(self, msg):
        """处理传入消息。"""
        if msg.get("action") == "add":
            self.value += msg.get("n", 1)
            return {"value": self.value}
        return {"error": "unknown action"}

# 生成
system = await pul.actor_system()
actor = await system.spawn(MyActor(), name="my_actor")

# 通过 ask/tell 通信
response = await actor.ask({"action": "add", "n": 10})

队列 API

用于数据管道的分布式队列。

# 写入
writer = await pul.queue.write(
    "my_queue",
    bucket_column="user_id",
    num_buckets=4,
)
await writer.put({"user_id": "u1", "data": "hello"})
await writer.flush()

# 读取
reader = await pul.queue.read("my_queue")
records = await reader.get(limit=100)

Rust API

Rust API 通过三层 trait 组织(均在 pulsing_actor::prelude::* 中 re-export):

ActorSystemCoreExt(主路径)

核心 spawn 与 resolve 操作:

// Spawn - 简洁 API
system.spawn(actor).await?;                    // 匿名 actor(不可 resolve)
system.spawn_named(name, actor).await?;        // 命名 actor(可 resolve)

// Spawn - Builder 模式(高级配置)
system.spawning()
    .name("services/counter")                  // 可选:有 name = 可 resolve
    .supervision(SupervisionSpec::on_failure().max_restarts(3))
    .mailbox_capacity(256)
    .spawn(actor).await?;

// Resolve - 简洁 API
system.actor_ref(&actor_id).await?;            // 按 ActorId 获取
system.resolve(name).await?;                   // 按名称解析

// Resolve - Builder 模式(高级配置)
system.resolving()
    .node(node_id)                             // 可选:指定目标节点
    .policy(RoundRobinPolicy::new())           // 可选:负载均衡策略
    .filter_alive(true)                        // 可选:只选存活节点
    .resolve(name).await?;                     // 解析单个

system.resolving().list(name).await?;          // 获取所有实例
system.resolving().lazy(name)?;                // 懒解析(~5s TTL 自动刷新)

ActorSystemAdvancedExt(高级:监督/重启)

Factory 模式 spawn,支持 supervision 重启(仅命名 actor):

let options = SpawnOptions::default()
    .supervision(SupervisionSpec::on_failure().max_restarts(3));

// 仅命名 actor 支持 supervision(匿名 actor 无法重新解析)
system.spawn_named_factory(name, || Ok(Service::new()), options).await?;

ActorSystemOpsExt(运维/诊断)

系统信息、集群成员、生命周期控制:

system.node_id();
system.addr();
system.members().await;
system.all_named_actors().await;
system.stop(name).await?;
system.shutdown().await?;

示例

查看快速开始指南了解使用示例。