Pulsing Actors: 完整指南¶
目录¶
简介¶
Actor 是 Pulsing 应用的基本构建块。它们是通过异步消息传递进行通信的隔离并发状态机。本文档提供了在 Pulsing 中理解和使用 Actor 的完整指南。
核心设计理念: Pulsing 设计为轻量级且自包含。与其他 Actor 框架不同,它不需要 etcd、NATS 或 Consul 等外部依赖。所需的一切都是内置的。
什么是 Actor?¶
定义¶
Pulsing 中的 Actor 是:
- 具有私有状态的隔离计算单元
- 按顺序处理消息的消息处理器
- 具有用于远程调用的方法的类型化实体
- 位置透明:本地和远程 Actor 使用相同的 API
核心特性¶
graph LR
A[Actor] --> B[私有状态]
A --> C[消息邮箱]
A --> D[方法/端点]
A --> E[生命周期钩子]
style A fill:#6366F1,stroke:#333,stroke-width:2px,color:#fff
style B fill:#818CF8,color:#fff
style C fill:#818CF8,color:#fff
style D fill:#818CF8,color:#fff
style E fill:#818CF8,color:#fff
1. 隔离性
- 每个 Actor 都有自己的私有状态
- 其他 Actor 永远不能直接访问状态
- 所有交互都通过消息进行
2. 顺序处理
- 消息一次处理一条
- 下一条消息等待当前消息处理完成
- 保证 Actor 内部状态一致性
3. 异步通信
- 消息异步发送
- 发送者可以等待响应或即发即忘
- 结果作为可等待的 future 返回
4. 位置透明
- Actor 可以是本地或远程的
- 无论位置如何,API 都相同
- 框架处理序列化和路由
Actor 生命周期¶
生命周期阶段¶
stateDiagram-v2
[*] --> 创建中: spawn()
创建中 --> 初始化中: __init__()
初始化中 --> 运行中: on_start()
运行中 --> 运行中: 处理消息
运行中 --> 停止中: stop()
停止中 --> [*]: on_stop()
运行中 --> 失败: 未处理的错误
失败 --> [*]: 传播错误
1. 创建阶段¶
生成 Actor:
from pulsing.actor import ActorSystem, SystemConfig
# 创建 Actor 系统
system = await ActorSystem.create(SystemConfig.standalone())
# 生成 Actor
actor_ref = await system.spawn(MyActor(), "my-actor")
发生了什么:
- 运行时为 Actor 分配资源
- Actor 在系统中注册
- 创建用于消息传递的邮箱
- 分配 Actor ID
2. 初始化阶段¶
__init__ 方法:
class DataProcessor:
def __init__(self, buffer_size: int):
# 初始化状态
self.buffer_size = buffer_size
self.buffer = []
self.processed_count = 0
重要说明:
__init__在 Actor 构造期间调用- 只应初始化状态
- 还没有消息传递 - Actor 尚未完全注册
3. 运行阶段¶
初始化后,Actor 进入主生命周期,开始处理消息。
sequenceDiagram
participant 发送者
participant 邮箱
participant Actor
loop 消息处理
发送者->>邮箱: 发送消息
邮箱->>Actor: 传递消息
Actor->>Actor: 处理消息
Actor-->>发送者: 返回结果
end
4. 终止阶段¶
正常终止:
- 处理所有待处理消息
- 清空邮箱
- 清理资源
- 通知系统
错误终止:
- 处理器中未处理的异常
- 错误传播给调用者
创建 Actor¶
方法 1:使用 @as_actor 装饰器(推荐)¶
@as_actor 装饰器是创建 Actor 的最简单方式:
from pulsing.actor import as_actor, SystemConfig, create_actor_system
@as_actor
class Calculator:
"""一个简单的计算器 Actor。"""
def __init__(self, initial_value: int = 0):
self.value = initial_value
self.history = []
def add(self, n: int) -> int:
"""将 n 加到当前值。"""
self.value += n
self.history.append(("add", n, self.value))
return self.value
def subtract(self, n: int) -> int:
"""从当前值减去 n。"""
self.value -= n
self.history.append(("subtract", n, self.value))
return self.value
def get_value(self) -> int:
"""获取当前值。"""
return self.value
async def main():
system = await create_actor_system(SystemConfig.standalone())
# 创建本地 Actor
calc = await Calculator.local(system, initial_value=100)
# 调用方法
result = await calc.add(50) # 150
result = await calc.subtract(30) # 120
value = await calc.get_value() # 120
await system.shutdown()
@as_actor 的优点:
- 无样板代码
- 方法自动成为端点
- 保留类型提示
- IDE 自动补全正常工作
方法 2:使用 Actor 基类¶
需要更多控制时,扩展 Actor 基类:
from pulsing.actor import Actor, Message, SystemConfig, create_actor_system
class EchoActor(Actor):
"""回显消息的 Actor。"""
def __init__(self):
self.message_count = 0
async def receive(self, msg: Message) -> Message:
"""处理传入消息。"""
self.message_count += 1
if msg.msg_type == "echo":
return Message.single("echo_response", msg.payload)
elif msg.msg_type == "count":
return Message.single("count_response",
str(self.message_count).encode())
else:
raise ValueError(f"未知消息类型: {msg.msg_type}")
async def main():
system = await create_actor_system(SystemConfig.standalone())
actor_ref = await system.spawn(EchoActor(), "echo")
# 发送消息并获取响应
response = await actor_ref.ask(Message.single("echo", b"hello"))
print(response.payload) # b"hello"
await system.shutdown()
方法 3:异步方法¶
Actor 可以有用于 I/O 操作的异步方法:
@as_actor
class AsyncWorker:
"""具有异步方法的 Actor。"""
def __init__(self):
self.cache = {}
async def fetch_data(self, url: str) -> dict:
"""从 URL 获取数据(异步操作)。"""
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
data = await response.json()
self.cache[url] = data
return data
async def process_batch(self, items: list) -> list:
"""使用异步操作处理项目。"""
results = []
for item in items:
await asyncio.sleep(0.01) # 模拟异步工作
results.append(item.upper())
return results
消息传递¶
消息类型¶
Pulsing 支持两种类型的消息:
graph TD
A[消息类型] --> B[单条消息]
A --> C[流式消息]
B --> B1[请求-响应]
B --> B2[即发即忘]
C --> C1[持续数据流]
C --> C2[分块处理]
style A fill:#6366F1,color:#fff
style B fill:#818CF8,color:#fff
style C fill:#818CF8,color:#fff
Ask 模式(请求-响应)¶
发送消息并等待响应:
# 使用 @as_actor
result = await calc.add(10)
# 使用 Actor 基类
response = await actor_ref.ask(Message.single("operation", data))
Tell 模式(即发即忘)¶
发送消息而不等待响应:
流式消息¶
用于持续数据流:
from pulsing.actor import StreamMessage
# 返回流式响应的 Actor
@as_actor
class TokenGenerator:
async def generate(self, prompt: str) -> Message:
stream_msg, writer = StreamMessage.create("tokens")
async def produce():
for i, token in enumerate(self.generate_tokens(prompt)):
# 直接写入 Python 对象 - 自动序列化
await writer.write({"token": token, "index": i})
await writer.close()
asyncio.create_task(produce())
return stream_msg
# 客户端消费流
response = await generator.generate("Hello")
async for chunk in response.stream_reader():
# chunk 已经是 Python dict - 自动反序列化
print(chunk["token"], end="", flush=True)
核心特性:
- 透明序列化:直接读写 Python 对象
- 异构流:每个 chunk 可以有不同的结构
- 背压控制:有界缓冲防止内存溢出
远程 Actor¶
集群设置¶
启动种子节点:
# 节点 1:启动种子节点
config = SystemConfig.with_addr("0.0.0.0:8000")
system = await create_actor_system(config)
# 生成公共 Actor(对集群可见)
await system.spawn(WorkerActor(), "worker", public=True)
加入集群:
# 节点 2:加入集群
config = (SystemConfig
.with_addr("0.0.0.0:8001")
.with_seeds(["192.168.1.1:8000"]))
system = await create_actor_system(config)
# 等待集群同步
await asyncio.sleep(1.0)
# 查找远程 Actor
worker = await system.find("worker")
result = await worker.process(data)
位置透明¶
# 本地 Actor
local_ref = await system.spawn(MyActor(), "local")
# 远程 Actor(通过集群找到)
remote_ref = await system.find("remote-worker")
# 两者使用相同的 API!
response1 = await local_ref.process(data)
response2 = await remote_ref.process(data)
公共 vs 私有 Actor¶
| 特性 | 公共 Actor | 私有 Actor |
|---|---|---|
| 集群可见性 | ✅ 对所有节点可见 | ❌ 仅本地 |
| 通过 find() 发现 | ✅ 是 | ❌ 否 |
| Gossip 注册 | ✅ 已注册 | ❌ 未注册 |
| 用例 | 服务、共享工作器 | 内部辅助器 |
# 公共 Actor - 可被其他节点找到
await system.spawn(ServiceActor(), "api-service", public=True)
# 私有 Actor - 仅本地
await system.spawn(HelperActor(), "internal-helper", public=False)
高级模式¶
1. LLM 推理服务模式¶
@as_actor
class LLMService:
"""用于 LLM 推理的 Actor。"""
def __init__(self, model_name: str):
self.model_name = model_name
self.model = None
self.tokenizer = None
async def load_model(self):
"""加载模型(创建后调用一次)。"""
from transformers import AutoModelForCausalLM, AutoTokenizer
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
self.model = AutoModelForCausalLM.from_pretrained(self.model_name)
async def generate(self, prompt: str, max_tokens: int = 100) -> str:
"""从提示生成文本。"""
inputs = self.tokenizer(prompt, return_tensors="pt")
outputs = self.model.generate(**inputs, max_new_tokens=max_tokens)
return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
2. 工作池模式¶
@as_actor
class WorkerPool:
def __init__(self, num_workers: int):
self.workers = []
self.current_worker = 0
async def submit(self, task: dict) -> any:
"""将任务提交给下一个可用的工作器(轮询)。"""
worker = self.workers[self.current_worker]
self.current_worker = (self.current_worker + 1) % len(self.workers)
return await worker.process(task)
最佳实践¶
1. Actor 设计¶
✅ 应该:
- 保持 Actor 专注于单一职责
- 尽可能使用不可变消息
- 优雅地处理错误
- 记录方法契约
❌ 不应该:
- 在 Actor 之间共享可变状态
- 在方法中阻塞(使用 async)
- 创建循环依赖
- 忽略错误处理
2. 错误处理¶
@as_actor
class ResilientActor:
async def risky_operation(self, data: dict) -> dict:
try:
result = await self.process(data)
return {"success": True, "result": result}
except ValueError as e:
# 预期错误 - 返回错误响应
return {"success": False, "error": str(e)}
except Exception as e:
# 意外错误 - 记录并重新抛出
logger.error(f"意外错误: {e}")
raise
快速参考¶
基本 Actor¶
from pulsing.actor import as_actor, create_actor_system, SystemConfig
@as_actor
class MyActor:
def __init__(self, param: int):
self.param = param
def method(self, arg: int) -> int:
return self.param + arg
async def main():
system = await create_actor_system(SystemConfig.standalone())
actor = await MyActor.local(system, param=10)
result = await actor.method(5) # 15
await system.shutdown()
集群设置¶
# 种子节点
config = SystemConfig.with_addr("0.0.0.0:8000")
system = await create_actor_system(config)
await system.spawn(MyActor(), "service", public=True)
# 加入节点
config = SystemConfig.with_addr("0.0.0.0:8001").with_seeds(["seed:8000"])
system = await create_actor_system(config)
service = await system.find("service")
常用操作¶
# 生成本地 Actor
actor = await system.spawn(MyActor(), "name")
# 生成公共 Actor
actor = await system.spawn(MyActor(), "name", public=True)
# 查找远程 Actor
actor = await system.find("name")
# 检查 Actor 是否存在
exists = await system.has_actor("name")
# 停止 Actor
await system.stop("name")
# 关闭系统
await system.shutdown()
总结¶
关键要点¶
- Actor 是隔离的:私有状态,基于消息的通信
- 顺序处理:一次处理一条消息,FIFO 顺序
- @as_actor 装饰器:创建 Actor 的最简单方式
- 位置透明:本地/远程 Actor 使用相同代码
- 零外部依赖:不需要 etcd、NATS 或 Consul
- 内置集群:用于发现的 SWIM 协议
下一步¶
- 阅读远程 Actor 指南了解集群详情
- 探索设计文档了解实现细节
- 查看示例了解实用模式
- 参阅 API 参考获取完整文档