Actor 指南¶
本指南介绍 Actor 模型 概念和构建健壮分布式应用的模式。
前置要求
如果尚未完成 快速开始,请先阅读。
通信范式
不确定何时使用同步、异步还是流式?请参阅通信范式指南获取详细指导。
什么是 Actor?¶
Actor 是并发和分布式系统中的基本计算单元。Actor 模型由 Carl Hewitt 于 1973 年提出,提供了一种构建以下系统的原则性方法:
- 并发:多个 Actor 并行运行
- 分布式:Actor 可以在不同机器上
- 容错:故障被隔离
核心原则¶
┌─────────────────────────────────────────────────────────────┐
│ Actor │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 私有状态 │ │ 消息邮箱 │ │ 行为 │ │
│ │ (State) │ │ (FIFO) │ │ (Methods) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ▲ │ │ │
│ │ ▼ ▼ │
│ └───────── 每次只处理一条消息 ──────────────────────│
└─────────────────────────────────────────────────────────────┘
| 原则 | 描述 |
|---|---|
| 隔离性 | 每个 Actor 有私有状态;无共享内存 |
| 消息传递 | Actor 仅通过异步消息通信 |
| 顺序处理 | 每次只处理一条消息(无内部锁) |
| 位置透明 | 本地和远程 Actor 使用相同 API |
为什么选择 Pulsing 而非 Ray?¶
Ray 的 "actor" 本质上是带状态的远程对象 — 你调用它的方法,但没有正式的消息队列或投递语义。
Pulsing 遵循经典 Actor 模型(类似 Erlang/Akka):
| 特性 | Pulsing | Ray |
|---|---|---|
| 消息队列(邮箱) | ✅ FIFO | ❌ 直接调用 |
| 顺序保证 | ✅ 每 Actor | ⚠️ 每方法 |
| 监督/重启 | ✅ 内置 | ❌ 手动 |
| 零外部依赖 | ✅ | ❌ (需 Ray 集群) |
| 流式消息 | ✅ 原生 | ❌ |
Python API¶
全局异步 API¶
import pulsing as pul
@pul.remote
class Calculator:
def __init__(self, initial_value: int = 0):
self.value = initial_value
def add(self, n: int) -> int:
self.value += n
return self.value
async def main():
await pul.init()
calc = await Calculator.spawn(initial_value=100)
result = await calc.add(50) # 150
await pul.shutdown()
消息模式¶
Ask(请求-响应)¶
Tell(即发即忘)¶
流式消息¶
用于持续数据流(如 LLM token 生成),只需返回 generator:
@pul.remote
class TokenGenerator:
async def generate(self, prompt: str):
# 直接返回 async generator - Pulsing 自动处理流式传输
for token in self.generate_tokens(prompt):
yield {"token": token}
# 消费流
async for chunk in generator.generate("Hello"):
print(chunk["token"], end="", flush=True)
监督(Actor 级重启)¶
Pulsing 支持 Actor 失败后自动重启:
@pul.remote(
restart_policy="on_failure", # "never" | "on_failure" | "always"
max_restarts=3,
min_backoff=1.0,
max_backoff=60.0
)
class ReliableWorker:
def process(self, data):
# 如果崩溃,Actor 会自动重启
return heavy_computation(data)
Note
重启恢复 Actor 但不恢复内存状态。参阅 可靠性指南 了解幂等模式。
错误处理¶
Pulsing 提供了统一的错误处理系统,具有清晰的错误分类。
抛出错误¶
from pulsing.exceptions import (
PulsingBusinessError,
PulsingSystemError,
PulsingTimeoutError,
)
@pul.remote
class Service:
async def validate(self, data: str) -> bool:
if not data:
raise PulsingBusinessError(400, "数据必需")
return True
async def process(self, data: str) -> str:
try:
return expensive_operation(data)
except Exception as e:
raise PulsingSystemError(f"处理失败: {e}", recoverable=True)
async def fetch_with_timeout(self, url: str) -> str:
try:
return await asyncio.wait_for(httpx.get(url), timeout=5.0)
except asyncio.TimeoutError:
raise PulsingTimeoutError("fetch", duration_ms=5000)
捕获错误¶
from pulsing.exceptions import (
PulsingBusinessError,
PulsingSystemError,
PulsingRuntimeError,
)
try:
result = await service.process(data)
except PulsingBusinessError as e:
# 处理业务逻辑错误
print(f"验证错误: {e.message}")
except PulsingSystemError as e:
# 处理系统错误
if e.recoverable:
# 可以重试或等待 Actor 重启
pass
else:
# 不可恢复的错误
logger.error(f"致命错误: {e.error}")
except PulsingRuntimeError as e:
# 处理框架错误(网络、集群等)
print(f"系统错误: {e}")
自动错误分类¶
标准 Python 异常会自动分类:
@pul.remote
class Processor:
def process(self, data: str) -> str:
if not data:
# ValueError → PulsingBusinessError (code=400)
raise ValueError("数据必需")
# 其他异常 → PulsingSystemError (recoverable=True)
return process_data(data)
进阶模式¶
1. 有状态 Actor¶
@pul.remote
class SessionManager:
def __init__(self):
self.sessions = {}
def create_session(self, user_id: str) -> str:
session_id = str(uuid.uuid4())
self.sessions[session_id] = {"user_id": user_id, "data": {}}
return session_id
def get_session(self, session_id: str) -> dict | None:
return self.sessions.get(session_id)
2. Worker 池(轮询)¶
@pul.remote
class WorkerPool:
def __init__(self, workers: list):
self.workers = workers
self.idx = 0
async def submit(self, task: dict):
worker = self.workers[self.idx]
self.idx = (self.idx + 1) % len(self.workers)
return await worker.process(task)
3. 流水线¶
@pul.remote
class PipelineStage:
def __init__(self, next_stage=None):
self.next_stage = next_stage
async def process(self, data: dict) -> dict:
result = await self.transform(data)
if self.next_stage:
return await self.next_stage.process(result)
return result
4. LLM 推理服务¶
@pul.remote
class LLMService:
def __init__(self, model_name: str):
self.model_name = model_name
self.model = None
async def load_model(self):
from transformers import AutoModelForCausalLM, AutoTokenizer
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
self.model = AutoModelForCausalLM.from_pretrained(self.model_name)
async def generate(self, prompt: str, max_tokens: int = 100) -> str:
inputs = self.tokenizer(prompt, return_tensors="pt")
outputs = self.model.generate(**inputs, max_new_tokens=max_tokens)
return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
最佳实践¶
| ✅ 应该 | ❌ 不应该 |
|---|---|
| Actor 单一职责 | 在 Actor 间共享可变状态 |
| I/O 使用 async | 在方法中阻塞 |
| 优雅处理错误 | 忽略异常 |
在 __init__ 初始化状态 |
使用全局变量 |
错误处理¶
@pul.remote
class ResilientActor:
async def risky_operation(self, data: dict) -> dict:
try:
result = await self.process(data)
return {"success": True, "result": result}
except ValueError as e:
return {"success": False, "error": str(e)}
except Exception as e:
logger.error(f"意外错误: {e}")
raise
快速参考¶
常用操作¶
import pulsing as pul
await pul.init()
# 使用 @remote 装饰器(推荐)
@pul.remote
class MyService:
def process(self, data): return data
service = await MyService.spawn(name="service")
result = await service.process("hello")
# 解析已有 actor
proxy = await MyService.resolve("service")
# 关闭
await pul.shutdown()