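Pulsing Actor System Design Document
Overview
Pulsing Actor System is a lightweight distributed actor framework built for the Pulsing project. It provides an easy-to-use implementation of the actor model, supports both single-node and cluster deployments, and requires no external services such as etcd or NATS.
Design Goals
- Lightweight - minimal dependencies, easy to integrate
- Zero external dependencies - no reliance on external services such as etcd or NATS
- Location transparency - local and remote actors share the same API
- High performance - built on the Tokio async runtime
- Easy to use - a concise API
Architecture Overview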
graph TB
subgraph ActorSystem["ActorSystem"]
subgraph LocalActors["Local Actors"]
A["Actor A<br/>(Mailbox)"]
B["Actor B<br/>(Mailbox)"]
C["Actor C<br/>(Mailbox)"]
end
subgraph Transport["HTTP Transport"]
T1["POST /actor/{name}<br/>Actor Messages"]
T2["POST /cluster/gossip<br/>Cluster Protocol"]
end
subgraph Cluster["GossipCluster"]
M["Member Discovery<br/>(Membership)"]
R["Actor Location<br/>(Actor Registry)"]
S["Failure Detection<br/>(SWIM)"]
end
end
A & B & C --> Transport
Transport --> Cluster
style ActorSystem fill:#f5f5f5,stroke:#333,stroke-width:2px
style LocalActors fill:#e3f2fd,stroke:#1976d2
style Transport fill:#fff3e0,stroke:#f57c00
style Cluster fill:#e8f5e9,stroke:#388e3c
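Core Components
1. Actor
An actor is the system's basic unit of computation. It has the following properties:
- Encapsulated state - state can be accessed only through messages
- Asynchronous message handling - messages are processed without blocking
- Lifecycle management - on_start / on_stop callbacks
#[async_trait]
pub trait Actor: Send + 'static {
    /// Unique identifier of the actor
    fn id(&self) -> &ActorId;
    /// Called when the actor starts
    async fn on_start(&mut self, _ctx: &mut ActorContext) -> anyhow::Result<()> {
        Ok(())
    }
    /// Called when the actor stops
    async fn on_stop(&mut self, _ctx: &mut ActorContext) -> anyhow::Result<()> {
        Ok(())
    }
    /// Handles a raw message (used for type-erased remote messages)
    async fn receive(
        &mut self,
        msg: RawMessage,
        ctx: &mut ActorContext,
    ) -> anyhow::Result<RawMessage>;
}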
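The lifecycle order (on_start, then one receive call per message, then on_stop) can be illustrated with a synchronous, standard-library-only sketch. `SimpleActor`, `Recorder`, `run_actor`, and `lifecycle_demo` are hypothetical names for this illustration; the real trait is async and runs on Tokio, so this shows only the ordering, not the framework's run loop.

```rust
use std::sync::mpsc;
use std::thread;

/// Simplified, synchronous stand-in for the `Actor` trait (assumption).
trait SimpleActor: Send + 'static {
    fn on_start(&mut self) {}
    fn on_stop(&mut self) {}
    fn receive(&mut self, msg: String) -> String;
}

/// A message together with the channel on which the reply is sent.
type Mail = (String, mpsc::Sender<String>);

/// Records the order in which its callbacks run.
struct Recorder {
    log: Vec<String>,
    done: mpsc::Sender<Vec<String>>,
}

impl SimpleActor for Recorder {
    fn on_start(&mut self) {
        self.log.push("start".to_string());
    }
    fn on_stop(&mut self) {
        self.log.push("stop".to_string());
        // Hand the log back so the caller can inspect it.
        let _ = self.done.send(self.log.clone());
    }
    fn receive(&mut self, msg: String) -> String {
        self.log.push(format!("recv:{}", msg));
        msg.to_uppercase()
    }
}

/// Runs the actor on its own thread: on_start, one receive per message,
/// then on_stop once every sender has been dropped.
fn run_actor<A: SimpleActor>(mut actor: A) -> mpsc::Sender<Mail> {
    let (tx, rx) = mpsc::channel::<Mail>();
    thread::spawn(move || {
        actor.on_start();
        for (msg, reply) in rx {
            let _ = reply.send(actor.receive(msg));
        }
        actor.on_stop();
    });
    tx
}

/// Sends one message, then closes the mailbox and collects the callback log.
fn lifecycle_demo() -> (String, Vec<String>) {
    let (done_tx, done_rx) = mpsc::channel();
    let mailbox = run_actor(Recorder { log: Vec::new(), done: done_tx });
    let (reply_tx, reply_rx) = mpsc::channel();
    mailbox.send(("ping".to_string(), reply_tx)).unwrap();
    let answer = reply_rx.recv().unwrap();
    drop(mailbox); // closing the mailbox ends the loop and triggers on_stop
    (answer, done_rx.recv().unwrap())
}
```

Because the actor owns its state and is driven by a single loop, no locking is needed around `log`; that is the practical meaning of "state can be accessed only through messages".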
2. Message
Messages carry the communication between actors:
pub trait Message: Serialize + DeserializeOwned + Send + 'static {
    /// Message type identifier (used for serialization/deserialization)
    fn type_id() -> &'static str;
}
// Example
#[derive(Serialize, Deserialize)]
struct Ping { value: i32 }
impl Message for Ping {
    fn type_id() -> &'static str { "Ping" }
}
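To show why a string type identifier is useful, here is a minimal sketch of packing a typed message into a type-erased form and checking the tag on the way back. `WireMessage`, `pack`, and `unpack` are hypothetical helpers, and a hand-written little-endian encoding stands in for serde so the example needs only the standard library.

```rust
/// Type-erased message: a type tag plus an opaque payload.
#[derive(Debug, Clone, PartialEq)]
struct RawMessage {
    msg_type: String,
    payload: Vec<u8>,
}

/// Hypothetical stand-in for `Message`; manual encoding replaces serde.
trait WireMessage: Sized {
    fn type_id() -> &'static str;
    fn encode(&self) -> Vec<u8>;
    fn decode(bytes: &[u8]) -> Option<Self>;
}

/// Decodes exactly four little-endian bytes into an i32.
fn decode_i32(bytes: &[u8]) -> Option<i32> {
    if bytes.len() != 4 {
        return None;
    }
    let mut arr = [0u8; 4];
    arr.copy_from_slice(bytes);
    Some(i32::from_le_bytes(arr))
}

#[derive(Debug, PartialEq)]
struct Ping { value: i32 }

#[derive(Debug, PartialEq)]
struct Pong { result: i32 }

impl WireMessage for Ping {
    fn type_id() -> &'static str { "Ping" }
    fn encode(&self) -> Vec<u8> { self.value.to_le_bytes().to_vec() }
    fn decode(bytes: &[u8]) -> Option<Self> {
        decode_i32(bytes).map(|value| Ping { value })
    }
}

impl WireMessage for Pong {
    fn type_id() -> &'static str { "Pong" }
    fn encode(&self) -> Vec<u8> { self.result.to_le_bytes().to_vec() }
    fn decode(bytes: &[u8]) -> Option<Self> {
        decode_i32(bytes).map(|result| Pong { result })
    }
}

/// Wraps a typed message in its type-erased form.
fn pack<M: WireMessage>(msg: &M) -> RawMessage {
    RawMessage { msg_type: M::type_id().to_string(), payload: msg.encode() }
}

/// Recovers a typed message, rejecting a mismatched type tag.
fn unpack<M: WireMessage>(raw: &RawMessage) -> Result<M, String> {
    if raw.msg_type != M::type_id() {
        return Err(format!("expected {}, got {}", M::type_id(), raw.msg_type));
    }
    M::decode(&raw.payload).ok_or_else(|| "malformed payload".to_string())
}
```

Checking the tag before decoding matters here because `Ping` and `Pong` have the same byte layout; without the tag, a `Ping` payload would decode as a `Pong` without any error.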
3. ActorRef
An ActorRef is a handle to an actor that provides location-transparent message sending:
pub struct ActorRef {
    actor_id: ActorId,
    inner: ActorRefInner, // Local or Remote
}
impl ActorRef {
    /// Request-response pattern
    pub async fn ask<M, R>(&self, msg: M) -> anyhow::Result<R>
    where
        M: Message,
        R: Message;
    /// One-way message (fire-and-forget)
    pub async fn tell<M>(&self, msg: M) -> anyhow::Result<()>
    where
        M: Message;
}
Location transparency:
// Local actor
let local_ref = system.spawn(MyActor::new()).await?;
// Remote actor (the API is exactly the same)
let remote_ref = system.actor_ref(&remote_actor_id).await?;
// Both references are used the same way
let response: Pong = local_ref.ask(Ping { value: 1 }).await?;
let response: Pong = remote_ref.ask(Ping { value: 1 }).await?;
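One way to picture how a single `ask` can hide the local/remote distinction is an enum behind the reference. The sketch below is an assumption about the general shape, not the framework's implementation: `SimpleRef`, `RefInner`, and `Handler` are hypothetical, and the "remote node" is simulated by an in-process map instead of an HTTP call.

```rust
use std::collections::HashMap;
use std::rc::Rc;

/// Hypothetical message handler: takes a value and returns a reply.
type Handler = Box<dyn Fn(i32) -> i32>;

enum RefInner {
    /// Calls the handler directly in this process.
    Local(Handler),
    /// Looks the actor up by name on a simulated remote node.
    Remote { node: Rc<HashMap<String, Handler>>, name: String },
}

struct SimpleRef {
    inner: RefInner,
}

impl SimpleRef {
    /// Same call for both variants; only the delivery path differs.
    fn ask(&self, value: i32) -> Result<i32, String> {
        match &self.inner {
            RefInner::Local(handler) => Ok(handler(value)),
            RefInner::Remote { node, name } => node
                .get(name)
                .map(|handler| handler(value))
                .ok_or_else(|| format!("actor {} not found", name)),
        }
    }
}

/// Asks a local actor, a remote actor, and a remote actor that does not exist.
fn transparency_demo() -> (Result<i32, String>, Result<i32, String>, Result<i32, String>) {
    let mut actors: HashMap<String, Handler> = HashMap::new();
    actors.insert("echo".to_string(), Box::new(|v: i32| v * 2));
    let node = Rc::new(actors);

    let local = SimpleRef { inner: RefInner::Local(Box::new(|v: i32| v * 2)) };
    let remote = SimpleRef {
        inner: RefInner::Remote { node: Rc::clone(&node), name: "echo".to_string() },
    };
    let missing = SimpleRef {
        inner: RefInner::Remote { node: Rc::clone(&node), name: "nope".to_string() },
    };
    (local.ask(21), remote.ask(21), missing.ask(21))
}
```

The caller's code is identical in all three cases; the remote path simply has additional ways to fail, which is why `ask` returns a `Result` even for local actors.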
4. ActorContext
ActorContext provides the execution context for an actor:
pub struct ActorContext {
    actor_id: Option<ActorId>,
    node_id: Option<NodeId>,
    system: Option<Arc<dyn ActorSystemRef>>,
    cancel_token: CancellationToken,
}
impl ActorContext {
    /// Gets a reference to another actor
    pub async fn actor_ref(&self, id: &ActorId) -> anyhow::Result<ActorRef>;
    /// Checks whether the actor should stop
    pub fn is_cancelled(&self) -> bool;
}
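The cooperative-cancellation idea behind `is_cancelled` can be sketched with a shared atomic flag. `CancelFlag` and `cancel_demo` are hypothetical names for a standard-library stand-in; the `CancellationToken` in the struct above presumably offers more (for example, awaiting cancellation), which this sketch does not attempt to reproduce.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for a cancellation token: a shared boolean flag.
#[derive(Clone, Default)]
struct CancelFlag(Arc<AtomicBool>);

impl CancelFlag {
    fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Starts a worker that polls the flag, cancels it, and waits for it to exit.
fn cancel_demo() -> bool {
    let flag = CancelFlag::default();
    let worker_flag = flag.clone();
    let worker = thread::spawn(move || {
        let mut iterations = 0u64;
        // Long-running work checks the flag between steps.
        while !worker_flag.is_cancelled() {
            iterations += 1;
            thread::yield_now();
        }
        iterations
    });
    flag.cancel();
    // join() returns only after the worker has observed the flag.
    worker.join().is_ok() && flag.is_cancelled()
}
```

Cancellation here is cooperative: nothing interrupts the worker, so an actor that never checks the flag during a long operation will not stop promptly.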
5. Mailbox
A Mailbox is an actor's message queue:
pub struct Mailbox {
    sender: mpsc::Sender<Envelope>,
    receiver: mpsc::Receiver<Envelope>,
}
pub struct Envelope {
    msg_type: String,
    payload: Vec<u8>,
    respond_to: Option<oneshot::Sender<Result<Vec<u8>>>>,
}
Features:
- Bounded queue (default capacity 256)
- Backpressure support
- Two modes: ask and tell
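The effect of a bounded queue can be sketched with `std::sync::mpsc::sync_channel`. This is an illustration under assumptions: the helper names (`mailbox`, `tell`, `serve_one`, `backpressure_demo`) are hypothetical, and this sketch rejects a message when the queue is full, whereas an async bounded channel would typically make the sender wait instead.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender, TrySendError};

struct Envelope {
    msg_type: String,
    payload: Vec<u8>,
    /// `Some` for ask (the caller waits for a reply), `None` for tell.
    respond_to: Option<mpsc::Sender<Result<Vec<u8>, String>>>,
}

/// Creates a bounded mailbox holding at most `capacity` envelopes.
fn mailbox(capacity: usize) -> (SyncSender<Envelope>, Receiver<Envelope>) {
    mpsc::sync_channel(capacity)
}

/// Fire-and-forget send that fails fast when the mailbox is full.
fn tell(tx: &SyncSender<Envelope>, msg_type: &str, payload: Vec<u8>) -> Result<(), String> {
    let env = Envelope { msg_type: msg_type.to_string(), payload, respond_to: None };
    match tx.try_send(env) {
        Ok(()) => Ok(()),
        Err(TrySendError::Full(_)) => Err("mailbox full".to_string()),
        Err(TrySendError::Disconnected(_)) => Err("actor stopped".to_string()),
    }
}

/// Actor side: takes one envelope and echoes the payload back if a reply is expected.
fn serve_one(rx: &Receiver<Envelope>) {
    if let Ok(env) = rx.recv() {
        if let Some(reply) = env.respond_to {
            let _ = reply.send(Ok(env.payload));
        }
    }
}

/// Fills a capacity-2 mailbox, overflows it, drains one slot, and sends again.
fn backpressure_demo() -> Vec<bool> {
    let (tx, rx) = mailbox(2);
    let mut accepted = Vec::new();
    accepted.push(tell(&tx, "Ping", vec![1]).is_ok()); // first slot
    accepted.push(tell(&tx, "Ping", vec![2]).is_ok()); // second slot
    accepted.push(tell(&tx, "Ping", vec![3]).is_ok()); // queue full, rejected
    serve_one(&rx); // the actor consumes one message
    accepted.push(tell(&tx, "Ping", vec![4]).is_ok()); // accepted again
    accepted
}
```

Either policy (reject or wait) bounds memory; the difference is whether a slow actor shows up to senders as an error or as latency.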
6. ActorSystem
ActorSystem is the entry point to the framework:
pub struct ActorSystem {
    node_id: NodeId,
    addr: SocketAddr,
    local_actors: Arc<DashMap<String, LocalActorHandle>>,
    cluster: Arc<RwLock<Option<Arc<GossipCluster>>>>,
    transport: Arc<HttpTransport>,
    cancel_token: CancellationToken,
}
impl ActorSystem {
    /// Creates a new system
    pub async fn new(config: SystemConfig) -> anyhow::Result<Arc<Self>>;
    /// Spawns an actor
    pub async fn spawn<A: Actor>(&self, actor: A) -> anyhow::Result<ActorRef>;
    /// Gets a reference to an actor
    pub async fn actor_ref(&self, id: &ActorId) -> anyhow::Result<ActorRef>;
    /// Stops an actor
    pub async fn stop(&self, actor_name: &str) -> anyhow::Result<()>;
    /// Shuts down the system
    pub async fn shutdown(&self) -> anyhow::Result<()>;
}
Message Handling Flow
Local Messages
sequenceDiagram
participant S as Sender
participant M as Mailbox
participant A as Actor
S->>M: ask(Ping)
M->>A: recv()
A->>A: handle()
A-->>M: respond(Pong)
M-->>S: Pong
Remote Messages
sequenceDiagram
participant S as Sender
participant R as ActorRef(Remote)
participant N as Network
participant A as Actor (Node B)
S->>R: ask(Ping)
R->>N: HTTP POST /actor/{name}
Note over R,N: {msg_type, payload}
N->>A: Envelope
A->>A: handle()
A-->>N: {result}
N-->>R: HTTP Response
R-->>S: Pong
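On the receiving node, an incoming request has to be mapped to either an actor message or a cluster message. The following sketch shows that routing step for the two endpoints named in the architecture diagram (`POST /actor/{name}` and `POST /cluster/gossip`). The `Route` enum and `route` function are hypothetical; how the framework's HTTP layer actually dispatches is not described in this document.

```rust
/// Where an incoming HTTP request should be delivered.
#[derive(Debug, PartialEq)]
enum Route {
    /// A message for the named local actor.
    ActorMessage { name: String },
    /// A cluster gossip exchange.
    Gossip,
}

/// Maps a method and path to a route; returns None for anything unrecognized.
fn route(method: &str, path: &str) -> Option<Route> {
    if method != "POST" {
        return None;
    }
    if path == "/cluster/gossip" {
        return Some(Route::Gossip);
    }
    let name = path.strip_prefix("/actor/")?;
    // Reject empty names and nested paths such as "/actor/a/b".
    if name.is_empty() || name.contains('/') {
        return None;
    }
    Some(Route::ActorMessage { name: name.to_string() })
}
```

For example, `route("POST", "/actor/echo")` yields `Route::ActorMessage { name: "echo" }`, after which the body (`msg_type` and `payload`) would be wrapped in an Envelope and placed in that actor's mailbox.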
Cluster Management
GossipCluster
GossipCluster handles cluster membership and actor location discovery:
pub struct GossipCluster {
    local_node: NodeId,
    local_addr: SocketAddr,
    members: Arc<RwLock<HashMap<NodeId, MemberInfo>>>,
    actors: Arc<RwLock<HashMap<ActorId, NodeId>>>,
    transport: Arc<HttpTransport>,
    seed_addrs: Arc<RwLock<Vec<SocketAddr>>>,
    config: GossipConfig,
    swim: SwimDetector,
}
Responsibilities:
- Member discovery and synchronization
- Actor location registration and lookup
- SWIM failure detection
- Periodic probing of seed nodes
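Member synchronization in gossip protocols usually comes down to merging a peer's view into the local one. The sketch below assumes a SWIM-style rule: each entry carries an incarnation number, a higher incarnation wins, and at equal incarnation the more severe status wins. The `Status` variants, the `incarnation` field, and this precedence rule are assumptions for illustration; the fields of the real `MemberInfo` and `MemberStatus` are not given in this document.

```rust
use std::collections::HashMap;

/// Member health, ordered from least to most severe (assumed variants).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Status {
    Alive,
    Suspect,
    Dead,
}

/// Simplified member record (assumed fields).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct MemberInfo {
    incarnation: u64,
    status: Status,
}

type Members = HashMap<String, MemberInfo>;

/// Merges a peer's view into ours, keeping the newer entry for each node.
fn merge(local: &mut Members, remote: &Members) {
    for (node, theirs) in remote {
        let newer = match local.get(node) {
            None => true,
            // Compare incarnation first, then status severity.
            Some(ours) => (theirs.incarnation, theirs.status) > (ours.incarnation, ours.status),
        };
        if newer {
            local.insert(node.clone(), *theirs);
        }
    }
}

/// Merges three kinds of updates: a worse status, a stale entry, and a new node.
fn merge_demo() -> Members {
    let mut local = Members::new();
    local.insert("a".to_string(), MemberInfo { incarnation: 1, status: Status::Alive });
    local.insert("b".to_string(), MemberInfo { incarnation: 3, status: Status::Alive });

    let mut remote = Members::new();
    // Same incarnation, more severe status: replaces the local entry.
    remote.insert("a".to_string(), MemberInfo { incarnation: 1, status: Status::Suspect });
    // Older incarnation: ignored even though the status is worse.
    remote.insert("b".to_string(), MemberInfo { incarnation: 2, status: Status::Dead });
    // Unknown node: added.
    remote.insert("c".to_string(), MemberInfo { incarnation: 1, status: Status::Alive });

    merge(&mut local, &remote);
    local
}
```

A merge of this kind is commutative and idempotent, so nodes converge on the same view regardless of the order in which gossip messages arrive.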
Node Discovery
Configuration Options
pub struct SystemConfig {
    /// HTTP bind address
    pub addr: SocketAddr,
    /// Seed node addresses
    pub seed_nodes: Vec<SocketAddr>,
    /// Gossip configuration
    pub gossip_config: GossipConfig,
    /// HTTP transport configuration
    pub http_config: HttpTransportConfig,
}
pub struct HttpTransportConfig {
    /// Request timeout (default 30s)
    pub request_timeout: Duration,
    /// Connection timeout (default 5s)
    pub connect_timeout: Duration,
    /// Keep-alive timeout (default 60s)
    pub keepalive_timeout: Duration,
    /// Maximum connections per host (default 32)
    pub max_connections_per_host: usize,
}
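The documented defaults can be expressed as a `Default` implementation, which also makes it easy to override a single field. Whether the crate itself implements `Default` for `HttpTransportConfig` is an assumption; the struct is redeclared here so the sketch compiles on its own, and `short_request_timeout` is a hypothetical helper.

```rust
use std::time::Duration;

/// Standalone copy of the transport configuration for illustration.
#[derive(Debug, Clone, PartialEq)]
pub struct HttpTransportConfig {
    pub request_timeout: Duration,
    pub connect_timeout: Duration,
    pub keepalive_timeout: Duration,
    pub max_connections_per_host: usize,
}

impl Default for HttpTransportConfig {
    /// Uses the default values listed in the field documentation.
    fn default() -> Self {
        HttpTransportConfig {
            request_timeout: Duration::from_secs(30),
            connect_timeout: Duration::from_secs(5),
            keepalive_timeout: Duration::from_secs(60),
            max_connections_per_host: 32,
        }
    }
}

/// Overrides one field and keeps the remaining defaults.
pub fn short_request_timeout() -> HttpTransportConfig {
    HttpTransportConfig {
        request_timeout: Duration::from_secs(10),
        ..HttpTransportConfig::default()
    }
}
```

Note that the connection timeout should stay below the request timeout; otherwise a request can time out before the connection attempt has had a chance to fail on its own.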
Usage Examples
Defining an Actor
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct Ping { value: i32 }
impl Message for Ping {
    fn type_id() -> &'static str { "Ping" }
}
#[derive(Serialize, Deserialize)]
struct Pong { result: i32 }
impl Message for Pong {
    fn type_id() -> &'static str { "Pong" }
}
struct EchoActor {
    id: ActorId,
}
#[async_trait]
impl Actor for EchoActor {
    fn id(&self) -> &ActorId { &self.id }
    async fn receive(
        &mut self,
        msg: RawMessage,
        _ctx: &mut ActorContext,
    ) -> anyhow::Result<RawMessage> {
        // Dispatch on the message type tag
        match msg.msg_type.as_str() {
            "Ping" => {
                let ping: Ping = bincode::deserialize(&msg.payload)?;
                let pong = Pong { result: ping.value * 2 };
                RawMessage::from_message(&pong)
            }
            other => Err(anyhow::anyhow!("Unknown message type: {other}")),
        }
    }
}
Single-Node Mode
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Create the system
    let system = ActorSystem::new(SystemConfig::standalone()).await?;
    // Spawn the actor
    let actor = EchoActor { id: ActorId::local("echo") };
    let actor_ref = system.spawn(actor).await?;
    // Send a message and wait for the reply
    let response: Pong = actor_ref.ask(Ping { value: 21 }).await?;
    assert_eq!(response.result, 42);
    // Shut down
    system.shutdown().await?;
    Ok(())
}
Cluster Mode
use std::time::Duration;
// Node 1 (seed)
let system1 = ActorSystem::new(
    SystemConfig::with_addr("0.0.0.0:8001".parse()?)
).await?;
let actor = EchoActor { id: ActorId::local("echo") };
let _ref = system1.spawn(actor).await?;
// Node 2 (joins the cluster)
let system2 = ActorSystem::new(
    SystemConfig::with_addr("0.0.0.0:8002".parse()?)
        .with_seeds(vec!["127.0.0.1:8001".parse()?])
).await?;
// Wait for the cluster to synchronize
tokio::time::sleep(Duration::from_millis(500)).await;
// Access node 1's actor from node 2
let remote_id = ActorId::new(system1.node_id().clone(), "echo");
let remote_ref = system2.actor_ref(&remote_id).await?;
let response: Pong = remote_ref.ask(Ping { value: 10 }).await?;
assert_eq!(response.result, 20);
Module Structure
pulsing/actor_system/
├── src/
│ ├── lib.rs # Library entry point, prelude exports
│ ├── system.rs # ActorSystem implementation
│ ├── actor/
│ │ ├── mod.rs
│ │ ├── traits.rs # Actor, Message, Handler traits
│ │ ├── context.rs # ActorContext
│ │ ├── mailbox.rs # Mailbox, Envelope
│ │ └── reference.rs # ActorRef, RemoteTransport
│ ├── cluster/
│ │ ├── mod.rs
│ │ ├── gossip.rs # GossipCluster, GossipMessage
│ │ ├── member.rs # MemberInfo, MemberStatus
│ │ └── swim.rs # SWIM failure detection
│ └── transport/
│ ├── mod.rs
│ ├── http.rs # HTTP transport layer
│ ├── tcp.rs # TCP transport layer (reserved)
│ └── codec.rs # Message encoding/decoding
├── tests/ # Integration tests
├── examples/ # Usage examples
└── Cargo.toml
Error Handling
// Error inside an actor
async fn receive(&mut self, msg: RawMessage, _ctx: &mut ActorContext)
    -> anyhow::Result<RawMessage>
{
    // Returning Err passes the error to the caller through the response channel
    Err(anyhow::anyhow!("Processing failed"))
}
// Handling on the caller's side
match actor_ref.ask::<Ping, Pong>(msg).await {
    Ok(response) => { /* success */ }
    Err(e) => { /* handle the error */ }
}
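How an error travels back through a response channel can be sketched with standard-library channels and a thread standing in for the actor. `spawn_doubler` and `ask` are hypothetical helpers, and errors are plain `String`s here instead of `anyhow::Error`.

```rust
use std::sync::mpsc;
use std::thread;

/// What the caller receives: a value or an error description.
type Reply = Result<i32, String>;
/// A request: message type, argument, and the channel for the reply.
type Request = (String, i32, mpsc::Sender<Reply>);

/// Starts an actor-like thread that doubles the value of "Ping" messages.
fn spawn_doubler() -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        for (msg_type, value, reply) in rx {
            let result = match msg_type.as_str() {
                "Ping" => Ok(value * 2),
                other => Err(format!("unknown message type: {}", other)),
            };
            // The handler's error is sent back like any other reply.
            let _ = reply.send(result);
        }
    });
    tx
}

/// Sends a request and waits for the reply, surfacing both handler
/// errors and delivery failures as Err.
fn ask(actor: &mpsc::Sender<Request>, msg_type: &str, value: i32) -> Reply {
    let (reply_tx, reply_rx) = mpsc::channel();
    actor
        .send((msg_type.to_string(), value, reply_tx))
        .map_err(|_| "actor stopped".to_string())?;
    reply_rx
        .recv()
        .map_err(|_| "actor dropped the request".to_string())?
}
```

The caller sees a single `Result`, but it can fail for two different reasons: the handler returned an error, or the message never produced a reply at all. Keeping those distinguishable in the error text (or type) helps when diagnosing remote failures.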
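Performance Considerations
- Message serialization - bincode provides compact binary serialization
- Connection reuse - HTTP keep-alive and connection pooling
- Asynchronous processing - non-blocking I/O on Tokio
- Bounded queues - bounded mailboxes keep memory use from growing without limit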
Roadmap
- [ ] Actor supervision trees (Supervision)
- [ ] Persistence support
- [ ] More complete leader election
- [ ] Metrics and tracing integration
- [ ] Actor migration support