跳转至

远程 Actor 指南

在集群中运行与发现 Actor 的指南:集群搭建、命名 Actor、resolve。

前后对比:单节点 vs 集群

同一套 Actor 代码。 只有初始化以及获取引用的方式不同。

单节点(独立) 集群(两节点)
初始化 await pul.init() 节点 1:await pul.init(addr="0.0.0.0:8000")
节点 2:await pul.init(addr="0.0.0.0:8001", seeds=["127.0.0.1:8000"])
获取 Actor await Counter.spawn(value=0) 节点 1:await Counter.spawn(value=0, name="counter")
节点 2:await Counter.resolve("counter")
调用 await counter.inc() 相同:await counter.inc() — 位置透明

一旦拿到 proxy(来自 spawnresolve),API 完全一致,业务逻辑无需区分“远程”和“本地”。


集群设置

启动种子节点

import pulsing as pul

# Node 1: 启动种子节点
system = await pul.actor_system(addr="0.0.0.0:8000")

# 生成命名 actor(可通过 resolve 发现)
await system.spawn(WorkerActor(), name="worker")

加入集群

# Node 2: 加入集群
system = await pul.actor_system(
    addr="0.0.0.0:8001",
    seeds=["192.168.1.1:8000"]
)

# 等待集群同步
await asyncio.sleep(1.0)

查找远程 Actor

使用 pul.resolve()(推荐)

pul.resolve() 直接返回 ActorProxy,无需再调用 .as_type().as_any()

# 类型化代理 — 已知 actor 类型时
proxy = await pul.resolve("worker", cls=Worker, timeout=30)
result = await proxy.process("hello")

# 无类型代理 — 远端类型未知时(任意方法)
proxy = await pul.resolve("worker", timeout=30)
result = await proxy.process("hello")

使用 @remote 类的 resolve()

@pul.remote
class Worker:
    def process(self, data): return f"processed: {data}"

# 等价于 pul.resolve("worker", cls=Worker)
worker = await Worker.resolve("worker")
result = await worker.process("hello")  # 直接方法调用

使用 system.resolve()(低层)

需要原始 ActorRef 时(例如使用 .ask() / .tell() 或传入其他 API):

remote_ref = await system.resolve("worker")
response = await remote_ref.ask({"action": "process", "data": "hello"})
# 需要 proxy 时:remote_ref.as_any() 或 remote_ref.as_type(Worker)

Note

推荐使用 pul.resolve(name, cls=...)Class.resolve(name) 获得即用型 proxy。仅在需要低层 ActorRef 时使用 system.resolve(name)

命名 vs 匿名 Actor

命名 Actor(可发现)

命名 Actor 在集群中可被任意节点通过 resolve() 发现:

# 命名 actor - 可通过 resolve() 从任意节点发现
await system.spawn(WorkerActor(), name="worker")

# 其他节点可以通过名称找到
ref = await other_system.resolve("worker")

匿名 Actor(仅本地引用)

匿名 Actor 只能通过 spawn 返回的 ActorRef 访问:

# 匿名 actor - 仅通过 ActorRef 访问
local_ref = await system.spawn(WorkerActor())

# 无法通过 resolve() 找到,只能使用返回的 ActorRef
await local_ref.ask(msg)

位置透明性

命名 Actor 支持位置透明 —— 相同的 API 适用于本地和远程:

# 本地命名 actor
local_ref = await system.spawn(MyActor(), name="local-worker")

# 远程命名 actor(通过集群 resolve)
remote_ref = await system.resolve("remote-worker")

# 两者使用完全相同的 API
response1 = await local_ref.ask(msg)
response2 = await remote_ref.ask(msg)

错误处理

Pulsing 为本地和远程 Actor 提供了统一的错误类型,确保在集群中一致的错误处理。

错误类型

  • PulsingRuntimeError: 框架错误(网络、集群、Actor 系统等)
  • PulsingActorError: Actor 执行错误
  • PulsingBusinessError: 业务逻辑错误(用户输入验证等)
  • PulsingSystemError: 系统错误(可能触发 Actor 重启)
  • PulsingTimeoutError: 超时错误(可重试)

示例

from pulsing.exceptions import (
    PulsingBusinessError,
    PulsingSystemError,
    PulsingRuntimeError,
)

try:
    remote_ref = await system.resolve("worker")
    response = await remote_ref.ask(msg)
except PulsingBusinessError as e:
    # 处理业务错误(用户输入问题)
    print(f"验证失败: {e.message}")
except PulsingSystemError as e:
    # 处理系统错误(可能触发重启)
    print(f"系统错误: {e.error}, 可恢复: {e.recoverable}")
except PulsingRuntimeError as e:
    # 处理框架错误(网络、集群等)
    print(f"框架错误: {e}")

网络故障

网络相关错误会作为 PulsingRuntimeError 抛出:

try:
    remote_ref = await system.resolve("worker")
    response = await remote_ref.ask(msg)
except PulsingRuntimeError as e:
    # 网络故障、集群问题或 Actor 未找到
    if "Connection" in str(e) or "timeout" in str(e).lower():
        # 使用退避策略重试
        pass
    elif "not found" in str(e).lower():
        # Actor 不存在
        pass

超时

为远程调用使用超时,避免无限等待:

try:
    response = await asyncio.wait_for(remote_ref.ask(msg), timeout=10.0)
except asyncio.TimeoutError:
    print("请求超时")
except PulsingRuntimeError as e:
    print(f"远程调用失败: {e}")

对 proxy 方法调用可使用:await asyncio.wait_for(proxy.some_method(), timeout=10.0)

最佳实践

  1. 等待集群同步:加入集群后添加短暂延迟
  2. 优雅处理错误:在 try-except 块中包装远程调用
  3. 使用命名 actor:需要远程访问的 actor 必须有 name
  4. 使用 pul.resolve(name, cls=...) 或 Class.resolve(name):获取有类型代理以获得更好的 API 体验
  5. 使用超时:考虑为远程调用添加超时

示例:分布式计数器

import pulsing as pul

@pul.remote
class DistributedCounter:
    def __init__(self, init_value: int = 0):
        self.value = init_value

    def get(self) -> int:
        return self.value

    def increment(self, n: int = 1) -> int:
        self.value += n
        return self.value

# Node 1: 创建命名计数器(可被远程发现)
system1 = await pul.actor_system(addr="0.0.0.0:8000")
counter = await DistributedCounter.spawn(name="counter", init_value=0)

# Node 2: 访问远程计数器
system2 = await pul.actor_system(addr="0.0.0.0:8001", seeds=["127.0.0.1:8000"])
await asyncio.sleep(1.0)

# 解析并使用远程计数器
remote_counter = await DistributedCounter.resolve("counter")
value = await remote_counter.get()  # 0
value = await remote_counter.increment(5)  # 5

下一步