远程 Actor 指南¶
在集群中运行与发现 Actor 的指南:集群搭建、命名 Actor、resolve。
前后对比:单节点 vs 集群¶
同一套 Actor 代码。 只有初始化以及获取引用的方式不同。
| 单节点(独立) | 集群(两节点) | |
|---|---|---|
| 初始化 | await pul.init() |
节点 1:await pul.init(addr="0.0.0.0:8000")节点 2: await pul.init(addr="0.0.0.0:8001", seeds=["127.0.0.1:8000"]) |
| 获取 Actor | await Counter.spawn(value=0) |
节点 1:await Counter.spawn(value=0, name="counter")节点 2: await Counter.resolve("counter") |
| 调用 | await counter.inc() |
相同:await counter.inc() — 位置透明 |
一旦拿到 proxy(来自 spawn 或 resolve),API 完全一致,业务逻辑无需区分“远程”和“本地”。
集群设置¶
启动种子节点¶
import pulsing as pul
# Node 1: 启动种子节点
system = await pul.actor_system(addr="0.0.0.0:8000")
# 生成命名 actor(可通过 resolve 发现)
await system.spawn(WorkerActor(), name="worker")
加入集群¶
# Node 2: 加入集群
system = await pul.actor_system(
addr="0.0.0.0:8001",
seeds=["192.168.1.1:8000"]
)
# 等待集群同步
await asyncio.sleep(1.0)
查找远程 Actor¶
使用 pul.resolve()(推荐)¶
pul.resolve() 直接返回 ActorProxy,无需再调用 .as_type() 或 .as_any():
# 类型化代理 — 已知 actor 类型时
proxy = await pul.resolve("worker", cls=Worker, timeout=30)
result = await proxy.process("hello")
# 无类型代理 — 远端类型未知时(任意方法)
proxy = await pul.resolve("worker", timeout=30)
result = await proxy.process("hello")
使用 @remote 类的 resolve()¶
@pul.remote
class Worker:
def process(self, data): return f"processed: {data}"
# 等价于 pul.resolve("worker", cls=Worker)
worker = await Worker.resolve("worker")
result = await worker.process("hello") # 直接方法调用
使用 system.resolve()(低层)¶
需要原始 ActorRef 时(例如使用 .ask() / .tell() 或传入其他 API):
remote_ref = await system.resolve("worker")
response = await remote_ref.ask({"action": "process", "data": "hello"})
# 需要 proxy 时:remote_ref.as_any() 或 remote_ref.as_type(Worker)
Note
推荐使用 pul.resolve(name, cls=...) 或 Class.resolve(name) 获得即用型 proxy。仅在需要低层 ActorRef 时使用 system.resolve(name)。
命名 vs 匿名 Actor¶
命名 Actor(可发现)¶
命名 Actor 在集群中可被任意节点通过 resolve() 发现:
# 命名 actor - 可通过 resolve() 从任意节点发现
await system.spawn(WorkerActor(), name="worker")
# 其他节点可以通过名称找到
ref = await other_system.resolve("worker")
匿名 Actor(仅本地引用)¶
匿名 Actor 只能通过 spawn 返回的 ActorRef 访问:
# 匿名 actor - 仅通过 ActorRef 访问
local_ref = await system.spawn(WorkerActor())
# 无法通过 resolve() 找到,只能使用返回的 ActorRef
await local_ref.ask(msg)
位置透明性¶
命名 Actor 支持位置透明 —— 相同的 API 适用于本地和远程:
# 本地命名 actor
local_ref = await system.spawn(MyActor(), name="local-worker")
# 远程命名 actor(通过集群 resolve)
remote_ref = await system.resolve("remote-worker")
# 两者使用完全相同的 API
response1 = await local_ref.ask(msg)
response2 = await remote_ref.ask(msg)
错误处理¶
Pulsing 为本地和远程 Actor 提供了统一的错误类型,确保在集群中一致的错误处理。
错误类型¶
- PulsingRuntimeError: 框架错误(网络、集群、Actor 系统等)
- PulsingActorError: Actor 执行错误
- PulsingBusinessError: 业务逻辑错误(用户输入验证等)
- PulsingSystemError: 系统错误(可能触发 Actor 重启)
- PulsingTimeoutError: 超时错误(可重试)
示例¶
from pulsing.exceptions import (
PulsingBusinessError,
PulsingSystemError,
PulsingRuntimeError,
)
try:
remote_ref = await system.resolve("worker")
response = await remote_ref.ask(msg)
except PulsingBusinessError as e:
# 处理业务错误(用户输入问题)
print(f"验证失败: {e.message}")
except PulsingSystemError as e:
# 处理系统错误(可能触发重启)
print(f"系统错误: {e.error}, 可恢复: {e.recoverable}")
except PulsingRuntimeError as e:
# 处理框架错误(网络、集群等)
print(f"框架错误: {e}")
网络故障¶
网络相关错误会作为 PulsingRuntimeError 抛出:
try:
remote_ref = await system.resolve("worker")
response = await remote_ref.ask(msg)
except PulsingRuntimeError as e:
# 网络故障、集群问题或 Actor 未找到
if "Connection" in str(e) or "timeout" in str(e).lower():
# 使用退避策略重试
pass
elif "not found" in str(e).lower():
# Actor 不存在
pass
超时¶
为远程调用使用超时,避免无限等待:
try:
response = await asyncio.wait_for(remote_ref.ask(msg), timeout=10.0)
except asyncio.TimeoutError:
print("请求超时")
except PulsingRuntimeError as e:
print(f"远程调用失败: {e}")
对 proxy 方法调用可使用:await asyncio.wait_for(proxy.some_method(), timeout=10.0)。
最佳实践¶
- 等待集群同步:加入集群后添加短暂延迟
- 优雅处理错误:在 try-except 块中包装远程调用
- 使用命名 actor:需要远程访问的 actor 必须有
name - 使用 pul.resolve(name, cls=...) 或 Class.resolve(name):获取有类型代理以获得更好的 API 体验
- 使用超时:考虑为远程调用添加超时
示例:分布式计数器¶
import pulsing as pul
@pul.remote
class DistributedCounter:
def __init__(self, init_value: int = 0):
self.value = init_value
def get(self) -> int:
return self.value
def increment(self, n: int = 1) -> int:
self.value += n
return self.value
# Node 1: 创建命名计数器(可被远程发现)
system1 = await pul.actor_system(addr="0.0.0.0:8000")
counter = await DistributedCounter.spawn(name="counter", init_value=0)
# Node 2: 访问远程计数器
system2 = await pul.actor_system(addr="0.0.0.0:8001", seeds=["127.0.0.1:8000"])
await asyncio.sleep(1.0)
# 解析并使用远程计数器
remote_counter = await DistributedCounter.resolve("counter")
value = await remote_counter.get() # 0
value = await remote_counter.increment(5) # 5