通信范式指南¶
本指南解释 Pulsing 中不同通信范式的设计原理和使用场景,帮助您理解"为什么"需要这些范式,以及"何时"使用它们。
为什么需要不同的通信范式?¶
Actor 的核心特性¶
在 Actor 模型中,每个 Actor 一次只处理一条消息。这是 Actor 模型的基础保证,确保状态更新的安全性。
Actor 邮箱(FIFO 队列)
↓
[消息1] → Actor 处理 → 响应1
[消息2] → Actor 处理 → 响应2 ← 必须等待消息1完成
[消息3] → Actor 处理 → 响应3 ← 必须等待消息2完成
问题:阻塞 vs 非阻塞¶
如果 Actor 在处理一条消息时被阻塞(例如等待网络响应),那么:
结果:Actor 无法处理其他消息,吞吐量极低。
解决方案:使用异步非阻塞模式:
结果:Actor 可以并发处理多个请求,吞吐量大幅提升。
为什么需要流式响应?¶
对于需要长时间生成结果的操作(如 LLM token 生成),如果等待全部完成:
问题:用户体验差,需要等待很久。
解决方案:流式传输,边生成边返回:
结果:用户立即看到进度,体验更好。
四种通信范式¶
基于上述原理,Pulsing 提供了四种通信范式:
| 范式 | 方法类型 | 为什么需要 | 使用场景 |
|---|---|---|---|
| 同步 | def method() |
快速操作不需要并发,简单直接 | 快速 CPU 工作、状态变更 |
| 异步 | async def method() |
避免阻塞,允许并发处理 | I/O 操作、外部 API 调用 |
| 流式 | async def method() 带 yield |
增量返回,提升用户体验 | LLM token 生成、大数据传输 |
| 发送即忘 | tell() |
不需要响应,最大化吞吐量 | 日志记录、通知 |
1. 同步方法 (def method)¶
为什么需要同步方法?¶
原理:对于快速操作(< 10ms),并发带来的开销大于收益。
- ✅ 简单直接:不需要
async/await,代码更简洁 - ✅ 无并发开销:快速操作不需要并发,顺序执行即可
- ✅ 可预测:严格顺序执行,易于理解和调试
适用场景:操作足够快,阻塞时间可以忽略不计。
行为特性¶
- 顺序执行:Actor 一次处理一个请求
- 阻塞 Actor:处理时,Actor 无法处理其他消息
- 简单可预测:无并发问题
何时使用¶
✅ 最适合: - 快速 CPU 密集型操作(计算、状态更新) - 简单状态变更(递增计数器、更新字典) - 在微秒到毫秒内完成的操作(< 10ms)
❌ 避免用于: - 网络请求(HTTP、数据库查询) - 文件 I/O 操作 - 可能耗时 > 10ms 的任何操作
示例¶
@pul.remote
class Counter:
def __init__(self):
self.value = 0
self.history = []
# ✅ 好:快速状态变更
def increment(self, n: int = 1) -> int:
self.value += n
self.history.append(self.value)
return self.value
# ✅ 好:简单计算
def get_average(self) -> float:
if not self.history:
return 0.0
return sum(self.history) / len(self.history)
# ❌ 差:网络 I/O 会阻塞 Actor
def fetch_data(self, url: str) -> dict:
# 这会阻塞 Actor 整个 HTTP 请求期间!
response = requests.get(url) # 不要这样做!
return response.json()
性能特征¶
2. 异步方法 (async def method)¶
为什么需要异步方法?¶
核心问题:如果使用同步方法处理 I/O 操作,Actor 会被阻塞,无法处理其他消息。
原理:
- 异步方法在 await 时会让出控制权
- Actor 可以在等待期间处理其他消息
- 多个异步操作可以并发执行
对比:
# ❌ 同步:阻塞 Actor
def fetch_data(self, url: str) -> dict:
response = requests.get(url) # 阻塞 500ms
return response.json()
# 结果:Actor 在这 500ms 内无法处理任何其他消息
# ✅ 异步:非阻塞
async def fetch_data(self, url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url) # 等待期间可以处理其他消息
return response.json()
# 结果:Actor 可以在等待 HTTP 响应时处理其他请求
行为特性¶
- 非阻塞执行:Actor 可以在等待时处理其他消息
- 并发处理:多个异步方法可以同时运行
- 后台任务:方法作为 Actor 上的后台任务运行
何时使用¶
✅ 最适合: - I/O 操作(HTTP 请求、数据库查询、文件 I/O) - 外部 API 调用 - 可能耗时 > 10ms 的操作 - 需要并发处理多个请求
❌ 避免用于: - 快速 CPU 密集型操作(使用同步方法更简单) - 简单状态变更(同步方法更简单)
示例¶
@pul.remote
class DataService:
def __init__(self):
self.cache = {}
# ✅ 好:网络 I/O - 不阻塞 Actor
async def fetch_user(self, user_id: str) -> dict:
# 等待 HTTP 响应时,Actor 可以处理其他请求
async with httpx.AsyncClient() as client:
response = await client.get(f"https://api.example.com/users/{user_id}")
return response.json()
# ✅ 好:数据库查询
async def get_orders(self, user_id: str) -> list[dict]:
# 等待数据库时,Actor 可以处理其他请求
async with database.transaction() as tx:
return await tx.fetch("SELECT * FROM orders WHERE user_id = $1", user_id)
# ✅ 好:多个并发操作
async def fetch_user_profile(self, user_id: str) -> dict:
# 这些操作并发运行,不是顺序运行
user, orders, preferences = await asyncio.gather(
self.fetch_user(user_id),
self.get_orders(user_id),
self.get_preferences(user_id),
)
return {"user": user, "orders": orders, "preferences": preferences}
# ❌ 差:快速操作 - 同步更简单
async def get_cache(self, key: str) -> dict:
# 这个操作足够快,适合同步方法
return self.cache.get(key, {})
性能特征¶
请求 1: [████████████████████] 50ms(等待 HTTP)
请求 2: [████████████████████] 50ms(等待 HTTP) ← 并发!
请求 3: [████████████████████] 50ms(等待 HTTP) ← 并发!
总计: ~50ms(并发,不是 150ms!)
使用模式¶
模式 1:等待最终结果¶
service = await DataService.spawn()
# 等待最终结果
result = await service.fetch_user("user123")
print(result)
模式 2:发送即忘(后台任务)¶
# 启动异步操作,不等待
task = asyncio.create_task(service.fetch_user("user123"))
# 做其他工作...
await other_operations()
# 稍后获取结果
result = await task
3. 流式响应 (async def method 带 yield)¶
为什么需要流式响应?¶
核心问题:某些操作需要很长时间才能完成(如 LLM 生成 1000 个 token),如果等待全部完成再返回:
原理:
- 使用 yield 增量返回结果
- 客户端可以立即开始处理第一个结果
- 提升用户体验,减少感知延迟
额外好处: - 可以提前取消(如果用户不需要了) - 可以显示进度更新 - 可以处理大数据集(不需要全部加载到内存)
行为特性¶
- 增量交付:结果在可用时立即发送
- 非阻塞:Actor 可以在生成流时处理其他消息
- 背压:通过有界通道自然流控
- 可取消:客户端可以取消流消费
何时使用¶
✅ 最适合: - LLM token 生成(用户希望立即看到输出) - 大数据传输(分块处理,避免内存溢出) - 实时数据流(传感器数据、日志) - 进度更新(长时间任务需要反馈)
❌ 避免用于: - 小的完整响应(使用常规异步方法) - 需要原子结果时(全有或全无)
示例¶
@pul.remote
class LLMService:
# ✅ 好:流式 LLM token
async def generate(self, prompt: str):
# 在生成时流式传输 token
async for token in self.llm_client.stream(prompt):
yield {"token": token, "type": "token"}
# 最终结果
yield {"type": "done", "total_tokens": count}
# ✅ 好:大文件处理
async def process_large_file(self, file_path: str):
with open(file_path, "r") as f:
for i, line in enumerate(f):
processed = process_line(line)
yield {"line": i, "data": processed}
# 允许处理其他消息
await asyncio.sleep(0) # 让出控制权
# ✅ 好:进度更新
async def long_running_task(self, task_id: str):
for step in range(100):
result = await do_work(step)
yield {"progress": step, "result": result}
使用模式¶
模式 1:增量消费流¶
service = await LLMService.spawn()
# 在 token 到达时处理
async for chunk in service.generate("Hello, world!"):
if chunk["type"] == "token":
print(chunk["token"], end="", flush=True)
elif chunk["type"] == "done":
print(f"\n总 token 数: {chunk['total_tokens']}")
模式 2:等待最终结果(跳过中间结果)¶
模式 3:提前取消流¶
async def consume_with_timeout():
async with asyncio.timeout(5.0):
async for chunk in service.generate("很长的提示..."):
process(chunk)
# 超时时自动取消流
性能特征¶
客户端: [chunk1][chunk2][chunk3]...
↓ ↓ ↓
网络: [████][████][████]...
↓ ↓ ↓
Actor: [gen][gen][gen]... ← 非阻塞生成
↓ ↓ ↓
LLM API: [████████████████]... ← 持续生成
总延迟: 第一个块快速到达,不等待所有块
4. Ask vs Tell¶
为什么需要两种模式?¶
核心区别:是否需要等待响应。
ask():需要响应,等待结果返回tell():不需要响应,发送后立即继续
为什么重要:
❌ 所有操作都用 ask():
await logger.ask({"level": "info", "msg": "..."}) # 等待响应
await metrics.ask({"event": "..."}) # 等待响应
await notifier.ask({"user": "..."}) # 等待响应
问题:即使不需要结果,也要等待,降低吞吐量
✅ 区分使用:
await logger.tell({"level": "info", "msg": "..."}) # 不等待
await metrics.tell({"event": "..."}) # 不等待
result = await service.get_user("123") # 需要结果,使用 ask
好处:不需要响应的操作不阻塞,吞吐量更高
ask() - 请求/响应¶
为什么使用:需要知道操作结果或是否成功。
何时使用: - 需要响应进行后续处理 - 需要知道操作是否成功 - 需要错误处理
# ✅ 好:需要结果
result = await counter.increment(10)
print(f"新值: {result}")
# ✅ 好:需要检查成功
try:
user = await service.get_user("user123")
except PulsingActorError:
print("用户未找到")
tell() - 发送即忘¶
为什么使用:最大化吞吐量,不需要等待响应。
何时使用: - 不需要响应(日志、指标) - 操作可以安全丢弃 - 想要最大吞吐量
# ✅ 好:日志记录 - 不需要响应
await logger.tell({"level": "info", "message": "用户已登录"})
# ✅ 好:指标 - 发送即忘
await metrics.tell({"event": "page_view", "page": "/home"})
# ✅ 好:通知 - 最终交付即可
await notifier.tell({"user_id": "123", "message": "新邮件"})
对比¶
| 方面 | ask() |
tell() |
|---|---|---|
| 响应 | ✅ 返回值 | ❌ 无响应 |
| 错误处理 | ✅ 抛出异常 | ❌ 静默失败 |
| 吞吐量 | 较低(等待响应) | 较高(不等待) |
| 使用场景 | 需要结果的操作 | 可以丢弃的操作 |
5. 快速决策指南¶
决策流程¶
开始:你的操作需要什么?
1. 需要响应吗?
├─ 否 → 使用 `tell()`(发送即忘)
│ 原因:不需要等待,最大化吞吐量
│
└─ 是 → 继续下一步
2. 操作需要多长时间?
├─ < 10ms → 使用 `def method()`(同步)
│ 原因:足够快,不需要并发,代码更简单
│
└─ > 10ms → 继续下一步
3. 需要增量返回结果吗?
├─ 否 → 使用 `async def method()`(异步)
│ 原因:避免阻塞,允许并发处理
│
└─ 是 → 使用 `async def method()` 带 `yield`(流式)
原因:立即返回部分结果,提升用户体验
为什么这样选择?¶
| 选择 | 原因 |
|---|---|
tell() |
不需要响应,不等待可以最大化吞吐量 |
def method() |
快速操作不需要并发,同步代码更简单 |
async def method() |
避免阻塞 Actor,允许并发处理多个请求 |
async def method() + yield |
立即返回部分结果,提升用户体验 |
6. 实际示例¶
示例 1:计数器服务¶
@pul.remote
class Counter:
def __init__(self):
self.value = 0
# ✅ 同步:快速状态变更
def increment(self, n: int = 1) -> int:
self.value += n
return self.value
# ✅ 同步:简单读取
def get(self) -> int:
return self.value
# ✅ 同步:快速操作
def reset(self) -> None:
self.value = 0
为什么使用同步? - 所有操作都很快(< 1ms) - 无 I/O 操作,纯内存操作 - 不需要并发,顺序执行即可 - 同步代码更简单,易于理解
如果改用异步会怎样?
- ❌ 增加不必要的 async/await 开销
- ❌ 代码更复杂,但没有性能提升
- ❌ 操作太快,并发带来的收益为零
示例 2:HTTP API 客户端¶
@pul.remote
class APIClient:
# ✅ 异步:网络 I/O
async def fetch_data(self, url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url) # 等待期间,Actor 可以处理其他请求
return response.json()
# ✅ 异步:多个并发请求
async def fetch_multiple(self, urls: list[str]) -> list[dict]:
tasks = [self.fetch_data(url) for url in urls]
return await asyncio.gather(*tasks) # 并发执行,不是顺序执行
为什么使用异步? - 网络请求需要时间(通常 50-500ms) - 如果使用同步,Actor 会被阻塞,无法处理其他请求 - 使用异步,Actor 可以在等待 HTTP 响应时处理其他消息 - 多个请求可以并发执行,大幅提升吞吐量
如果改用同步会怎样? - ❌ Actor 在等待 HTTP 响应时无法处理任何其他消息 - ❌ 吞吐量极低(一次只能处理一个请求) - ❌ 用户体验差(所有请求排队等待)
示例 3:LLM 服务¶
@pul.remote
class LLMService:
# ✅ 流式:Token 增量到达
async def generate(self, prompt: str):
async for token in self.llm_client.stream(prompt):
yield {"token": token} # 立即返回每个 token
yield {"done": True}
# ✅ 异步:单次完成(不需要流式)
async def embed(self, text: str) -> list[float]:
return await self.llm_client.embed(text) # 快速完成,不需要流式
为什么 generate 使用流式?
- LLM 生成需要时间(可能 5-30 秒)
- 如果等待全部完成,用户需要等待很久才能看到任何内容
- 使用流式,用户立即看到第一个 token,体验更好
- 用户可以提前取消(如果不需要了)
为什么 embed 使用异步而不是流式?
- Embedding 操作通常很快(< 1 秒)
- 结果是单个向量,不需要增量返回
- 使用异步避免阻塞即可,不需要流式
如果 generate 不使用流式会怎样?
- ❌ 用户需要等待 10-30 秒才能看到任何输出
- ❌ 无法提前取消(即使不需要了也要等待)
- ❌ 用户体验极差
示例 4:混合模式¶
@pul.remote
class DataProcessor:
def __init__(self):
self.processed_count = 0 # 快速状态更新
# ✅ 同步:快速计数器更新
def get_stats(self) -> dict:
return {"processed": self.processed_count}
# ✅ 异步:I/O 操作
async def fetch_from_db(self, query: str) -> list[dict]:
return await database.query(query)
# ✅ 流式:增量处理大数据集
async def process_large_dataset(self, dataset_id: str):
async for record in self.fetch_records(dataset_id):
processed = await self.process_record(record)
self.processed_count += 1 # 快速更新
yield {"record": processed, "count": self.processed_count}
为什么混合? 不同操作有不同的特性 - 为每个操作使用正确的工具。
7. 性能对比:理解差异¶
场景:处理 1000 个请求¶
同步方法(顺序执行)¶
执行时间线:
为什么慢? 必须等待前一个请求完成才能处理下一个。
异步方法(并发执行)¶
async def process(self, data: str) -> str:
result = await external_api(data) # 每个 50ms(等待网络)
return result
执行时间线:
为什么快? 所有请求并发执行,Actor 在等待网络响应时可以处理其他请求。
流式(增量返回)¶
async def process(self, data: str):
for chunk in split_data(data):
result = await process_chunk(chunk)
yield result # 立即返回
执行时间线:
为什么更好? 用户不需要等待全部完成,可以立即开始处理第一个结果。
关键理解¶
- 同步:顺序执行,简单但慢(适合快速操作)
- 异步:并发执行,快但需要
async/await(适合 I/O 操作) - 流式:增量返回,用户体验好(适合长时间操作)
8. 常见陷阱:理解为什么错误¶
❌ 陷阱 1:对 I/O 使用同步¶
问题:阻塞 Actor,无法处理其他消息。
# ❌ 差:在 HTTP 请求期间阻塞 Actor
def fetch_data(self, url: str) -> dict:
response = requests.get(url) # 阻塞数秒!
return response.json()
# 结果:Actor 在这几秒内无法处理任何其他消息
为什么错误? - Actor 被阻塞,无法处理其他请求 - 吞吐量极低(一次只能处理一个请求) - 用户体验差(所有请求排队)
# ✅ 好:非阻塞异步
async def fetch_data(self, url: str) -> dict:
async with httpx.AsyncClient() as client:
response = await client.get(url) # 等待期间可以处理其他请求
return response.json()
# 结果:Actor 可以并发处理多个请求
❌ 陷阱 2:对快速操作使用异步¶
问题:增加不必要的复杂度,没有性能提升。
# ❌ 差:不必要的异步开销
async def increment(self, n: int) -> int:
self.value += n # 这个操作只需要 < 1ms
return self.value
# 问题:操作太快,并发带来的收益为零,但代码更复杂
为什么错误?
- 操作太快(< 1ms),不需要并发
- 增加 async/await 语法复杂度
- 没有性能提升
❌ 陷阱 3:LLM 不使用流式¶
问题:用户体验差,需要等待很久。
# ❌ 差:等待所有 token
async def generate(self, prompt: str) -> str:
tokens = []
async for token in self.llm_client.stream(prompt):
tokens.append(token)
return "".join(tokens) # 用户等待 10-30 秒才能看到任何内容
# 问题:用户需要等待全部完成,无法提前取消
为什么错误? - 用户需要等待 10-30 秒才能看到任何输出 - 无法提前取消(即使不需要了) - 用户体验极差
# ✅ 好:token 到达时流式传输
async def generate(self, prompt: str):
async for token in self.llm_client.stream(prompt):
yield token # 用户立即看到 token
# 结果:用户立即看到输出,可以提前取消
❌ 陷阱 4:对发送即忘使用 Ask¶
问题:不必要的等待,降低吞吐量。
为什么错误? - 不需要响应,但还是要等待 - 降低吞吐量(所有日志操作都要等待) - 增加延迟
9. 最佳实践总结¶
核心原则¶
- 快速操作(< 10ms):使用
def method()(同步) -
原因:足够快,不需要并发,代码更简单
-
I/O 操作(> 10ms):使用
async def method()(异步) -
原因:避免阻塞 Actor,允许并发处理
-
增量结果:使用
async def method()带yield(流式) -
原因:立即返回部分结果,提升用户体验
-
不需要响应:使用
tell()(发送即忘) -
原因:最大化吞吐量,不阻塞
-
需要响应:使用
ask()或方法调用 -
原因:需要知道操作结果或是否成功
-
LLM token 生成:始终使用流式
-
原因:生成时间长,用户希望立即看到输出
-
多个并发操作:使用
async def配合asyncio.gather() - 原因:并发执行,而不是顺序执行
10. 快速参考¶
| 操作类型 | 范式 | 示例 |
|---|---|---|
| 计数器递增 | def increment() |
快速状态更新 |
| HTTP 请求 | async def fetch() |
网络 I/O |
| 数据库查询 | async def query() |
I/O 操作 |
| LLM 生成 | async def generate() 带 yield |
流式 token |
| 文件处理 | async def process() 带 yield |
大数据 |
| 日志记录 | tell() |
发送即忘 |
| 指标收集 | tell() |
发送即忘 |
| 获取结果 | ask() 或 await method() |
需要响应 |