跳转至

Actor 指南

本指南介绍 Actor 模型 概念和构建健壮分布式应用的模式。

前置要求

如果尚未完成 快速开始,请先阅读。

通信范式

不确定何时使用同步、异步还是流式?请参阅通信范式指南获取详细指导。


什么是 Actor?

Actor 是并发和分布式系统中的基本计算单元。Actor 模型由 Carl Hewitt 于 1973 年提出,提供了一种构建以下系统的原则性方法:

  • 并发:多个 Actor 并行运行
  • 分布式:Actor 可以在不同机器上
  • 容错:故障被隔离

核心原则

┌─────────────────────────────────────────────────────────────┐
│                         Actor                               │
├─────────────────────────────────────────────────────────────┤
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐     │
│  │ 私有状态     │    │  消息邮箱    │    │  行为       │     │
│  │ (State)     │    │  (FIFO)     │    │  (Methods)  │     │
│  └─────────────┘    └─────────────┘    └─────────────┘     │
│        ▲                  │                   │             │
│        │                  ▼                   ▼             │
│        └───────── 每次只处理一条消息 ──────────────────────│
└─────────────────────────────────────────────────────────────┘
原则 描述
隔离性 每个 Actor 有私有状态;无共享内存
消息传递 Actor 仅通过异步消息通信
顺序处理 每次只处理一条消息(无内部锁)
位置透明 本地和远程 Actor 使用相同 API

为什么选择 Pulsing 而非 Ray?

Ray 的 "actor" 本质上是带状态的远程对象 — 你调用它的方法,但没有正式的消息队列或投递语义。

Pulsing 遵循经典 Actor 模型(类似 Erlang/Akka):

特性 Pulsing Ray
消息队列(邮箱) ✅ FIFO ❌ 直接调用
顺序保证 ✅ 每 Actor ⚠️ 每方法
监督/重启 ✅ 内置 ❌ 手动
零外部依赖 ❌ (需 Ray 集群)
流式消息 ✅ 原生

Python API

全局异步 API

import pulsing as pul

@pul.remote
class Calculator:
    def __init__(self, initial_value: int = 0):
        self.value = initial_value

    def add(self, n: int) -> int:
        self.value += n
        return self.value

async def main():
    await pul.init()
    calc = await Calculator.spawn(initial_value=100)
    result = await calc.add(50)  # 150
    await pul.shutdown()

消息模式

Ask(请求-响应)

result = await calc.add(10)

Tell(即发即忘)

await actor_ref.tell({"event": "notify", "data": "event_data"})

流式消息

用于持续数据流(如 LLM token 生成),只需返回 generator:

@pul.remote
class TokenGenerator:
    async def generate(self, prompt: str):
        # 直接返回 async generator - Pulsing 自动处理流式传输
        for token in self.generate_tokens(prompt):
            yield {"token": token}

# 消费流
async for chunk in generator.generate("Hello"):
    print(chunk["token"], end="", flush=True)

监督(Actor 级重启)

Pulsing 支持 Actor 失败后自动重启:

@pul.remote(
    restart_policy="on_failure",  # "never" | "on_failure" | "always"
    max_restarts=3,
    min_backoff=1.0,
    max_backoff=60.0
)
class ReliableWorker:
    def process(self, data):
        # 如果崩溃,Actor 会自动重启
        return heavy_computation(data)

Note

重启恢复 Actor 但不恢复内存状态。参阅 可靠性指南 了解幂等模式。


错误处理

Pulsing 提供了统一的错误处理系统,具有清晰的错误分类。

抛出错误

from pulsing.exceptions import (
    PulsingBusinessError,
    PulsingSystemError,
    PulsingTimeoutError,
)

@pul.remote
class Service:
    async def validate(self, data: str) -> bool:
        if not data:
            raise PulsingBusinessError(400, "数据必需")
        return True

    async def process(self, data: str) -> str:
        try:
            return expensive_operation(data)
        except Exception as e:
            raise PulsingSystemError(f"处理失败: {e}", recoverable=True)

    async def fetch_with_timeout(self, url: str) -> str:
        try:
            return await asyncio.wait_for(httpx.get(url), timeout=5.0)
        except asyncio.TimeoutError:
            raise PulsingTimeoutError("fetch", duration_ms=5000)

捕获错误

from pulsing.exceptions import (
    PulsingBusinessError,
    PulsingSystemError,
    PulsingRuntimeError,
)

try:
    result = await service.process(data)
except PulsingBusinessError as e:
    # 处理业务逻辑错误
    print(f"验证错误: {e.message}")
except PulsingSystemError as e:
    # 处理系统错误
    if e.recoverable:
        # 可以重试或等待 Actor 重启
        pass
    else:
        # 不可恢复的错误
        logger.error(f"致命错误: {e.error}")
except PulsingRuntimeError as e:
    # 处理框架错误(网络、集群等)
    print(f"系统错误: {e}")

自动错误分类

标准 Python 异常会自动分类:

@pul.remote
class Processor:
    def process(self, data: str) -> str:
        if not data:
            # ValueError → PulsingBusinessError (code=400)
            raise ValueError("数据必需")

        # 其他异常 → PulsingSystemError (recoverable=True)
        return process_data(data)

进阶模式

1. 有状态 Actor

@pul.remote
class SessionManager:
    def __init__(self):
        self.sessions = {}

    def create_session(self, user_id: str) -> str:
        session_id = str(uuid.uuid4())
        self.sessions[session_id] = {"user_id": user_id, "data": {}}
        return session_id

    def get_session(self, session_id: str) -> dict | None:
        return self.sessions.get(session_id)

2. Worker 池(轮询)

@pul.remote
class WorkerPool:
    def __init__(self, workers: list):
        self.workers = workers
        self.idx = 0

    async def submit(self, task: dict):
        worker = self.workers[self.idx]
        self.idx = (self.idx + 1) % len(self.workers)
        return await worker.process(task)

3. 流水线

@pul.remote
class PipelineStage:
    def __init__(self, next_stage=None):
        self.next_stage = next_stage

    async def process(self, data: dict) -> dict:
        result = await self.transform(data)
        if self.next_stage:
            return await self.next_stage.process(result)
        return result

4. LLM 推理服务

@pul.remote
class LLMService:
    def __init__(self, model_name: str):
        self.model_name = model_name
        self.model = None

    async def load_model(self):
        from transformers import AutoModelForCausalLM, AutoTokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(self.model_name)

    async def generate(self, prompt: str, max_tokens: int = 100) -> str:
        inputs = self.tokenizer(prompt, return_tensors="pt")
        outputs = self.model.generate(**inputs, max_new_tokens=max_tokens)
        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

最佳实践

✅ 应该 ❌ 不应该
Actor 单一职责 在 Actor 间共享可变状态
I/O 使用 async 在方法中阻塞
优雅处理错误 忽略异常
__init__ 初始化状态 使用全局变量

错误处理

@pul.remote
class ResilientActor:
    async def risky_operation(self, data: dict) -> dict:
        try:
            result = await self.process(data)
            return {"success": True, "result": result}
        except ValueError as e:
            return {"success": False, "error": str(e)}
        except Exception as e:
            logger.error(f"意外错误: {e}")
            raise

快速参考

常用操作

import pulsing as pul

await pul.init()

# 使用 @remote 装饰器(推荐)
@pul.remote
class MyService:
    def process(self, data): return data

service = await MyService.spawn(name="service")
result = await service.process("hello")

# 解析已有 actor
proxy = await MyService.resolve("service")

# 关闭
await pul.shutdown()

下一步