跳转至

Behavior API (Rust)

Pulsing 提供类型安全的函数式 Actor 编程接口,灵感来自 Akka Typed。Behavior API 提供了传统 Actor trait 之外的另一种选择,具备编译时消息类型检查。

设计理念

类型安全优先。

  • TypedRef<M> 确保消息在编译时进行类型检查
  • 通过 BehaviorAction::Become 进行状态转换,实现清晰的状态机模式
  • 函数式风格:Actor 就是消息处理函数

核心概念

Behavior<M>

Actor 定义为处理类型 M 消息的函数:

use pulsing_actor::behavior::{stateful, Behavior, BehaviorAction};

fn counter(initial: i32) -> Behavior<i32> {
    stateful(initial, |count, n, _ctx| {
        *count += n;
        println!("count = {}", *count);
        BehaviorAction::Same
    })
}

TypedRef<M>

类型安全的 Actor 引用,只接受类型 M 的消息:

// Behavior 实现了 IntoActor,可以直接 spawn
let counter = system.spawn_named("actors/counter", counter(0)).await?;

counter.tell(5).await?;  // OK
counter.tell(3).await?;  // OK
// counter.tell("hello").await?;  // 运行时错误(反序列化时类型不匹配)

BehaviorAction

控制 Actor 生命周期和状态转换:

Action 说明
Same 保持当前 behavior
Become(behavior) 切换到新的 behavior(状态机转换)
Stop(reason) 优雅停止 Actor

创建 Behavior

有状态 Behavior

使用 stateful() 创建有内部状态的 Actor:

use pulsing_actor::behavior::{stateful, Behavior, BehaviorAction};

fn counter(initial: i32) -> Behavior<i32> {
    stateful(initial, |count, msg, ctx| {
        *count += msg;
        println!("[{}] count = {}", ctx.name(), *count);
        BehaviorAction::Same
    })
}

处理函数接收: - &mut S - 状态的可变引用 - M - 消息 - &BehaviorContext<M> - Actor 上下文

无状态 Behavior

使用 stateless() 创建无内部状态的 Actor:

use pulsing_actor::behavior::{stateless, Behavior, BehaviorAction};

fn echo() -> Behavior<String> {
    stateless(|msg, ctx| {
        Box::pin(async move {
            println!("[{}] Received: {}", ctx.name(), msg);
            BehaviorAction::Same
        })
    })
}

状态机模式

使用 BehaviorAction::Become 实现状态机:

use pulsing_actor::behavior::{stateful, Behavior, BehaviorAction};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
enum Signal {
    Next,
    Query,
}

// 带共享数据的状态
#[derive(Clone)]
struct Stats {
    cycles: u32,
    transitions: u32,
}

fn red(stats: Stats) -> Behavior<Signal> {
    stateful(stats, |stats, msg, ctx| match msg {
        Signal::Next => {
            stats.transitions += 1;
            println!("[{}] 🔴 红灯 -> 🟢 绿灯", ctx.name());
            BehaviorAction::Become(green(stats.clone()))
        }
        Signal::Query => {
            println!("[{}] 当前: 🔴 红灯", ctx.name());
            BehaviorAction::Same
        }
    })
}

fn green(stats: Stats) -> Behavior<Signal> {
    stateful(stats, |stats, msg, ctx| match msg {
        Signal::Next => {
            stats.transitions += 1;
            println!("[{}] 🟢 绿灯 -> 🟡 黄灯", ctx.name());
            BehaviorAction::Become(yellow(stats.clone()))
        }
        Signal::Query => {
            println!("[{}] 当前: 🟢 绿灯", ctx.name());
            BehaviorAction::Same
        }
    })
}

fn yellow(stats: Stats) -> Behavior<Signal> {
    stateful(stats, |stats, msg, ctx| match msg {
        Signal::Next => {
            stats.transitions += 1;
            stats.cycles += 1;
            println!("[{}] 🟡 黄灯 -> 🔴 红灯 (周期 #{})", ctx.name(), stats.cycles);
            BehaviorAction::Become(red(stats.clone()))
        }
        Signal::Query => {
            println!("[{}] 当前: 🟡 黄灯", ctx.name());
            BehaviorAction::Same
        }
    })
}

BehaviorContext

上下文提供:

// 获取 Actor 名称
let name = ctx.name();

// 获取类型安全的自引用(用于 reply-to 模式)
let self_ref: TypedRef<M> = ctx.self_ref();

// 获取其他 Actor 的类型引用
let other: TypedRef<OtherMsg> = ctx.typed_ref("other_actor");

// 调度延迟消息给自己
ctx.schedule_self(msg, Duration::from_secs(5));

// 检查 Actor 是否应该停止
if ctx.is_cancelled() { ... }

// 访问底层 ActorSystem
let system = ctx.system();

TypedRef 操作

// 发送不等待响应
counter.tell(5).await?;

// 请求-响应
let result: i32 = counter.ask(CounterMsg::Get).await?;

// 带超时
let result: i32 = counter.ask_timeout(msg, Duration::from_secs(5)).await?;

// 检查 Actor 是否存活
if counter.is_alive() { ... }

// 获取底层无类型 ActorRef
let actor_ref = counter.as_untyped()?;

完整示例

use pulsing_actor::behavior::{stateful, Behavior, BehaviorAction};
use pulsing_actor::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
enum CounterMsg {
    Increment(i32),
    Decrement(i32),
    Get,
}

fn counter(initial: i32) -> Behavior<CounterMsg> {
    stateful(initial, |count, msg, ctx| match msg {
        CounterMsg::Increment(n) => {
            *count += n;
            println!("[{}] +{} = {}", ctx.name(), n, *count);
            BehaviorAction::Same
        }
        CounterMsg::Decrement(n) => {
            *count -= n;
            println!("[{}] -{} = {}", ctx.name(), n, *count);
            BehaviorAction::Same
        }
        CounterMsg::Get => {
            println!("[{}] 当前 = {}", ctx.name(), *count);
            BehaviorAction::Same
        }
    })
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let system = ActorSystem::builder().build().await?;

    // Behavior 实现了 IntoActor,可以直接 spawn
    let counter = system.spawn_named("actors/counter", counter(0)).await?;

    // 消息发送
    counter.tell(CounterMsg::Increment(5)).await?;
    counter.tell(CounterMsg::Increment(3)).await?;
    counter.tell(CounterMsg::Decrement(2)).await?;
    counter.tell(CounterMsg::Get).await?;

    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    system.shutdown().await
}

Actor Trait vs Behavior

特性 Actor Trait Behavior API
类型安全 运行时 (Message 类型) 编译时 (TypedRef<M>)
状态 结构体字段 封装在闭包中
状态机 手动实现 BehaviorAction::Become
风格 OOP (impl trait) 函数式 (函数)
灵活性 更高 结构化

何时使用 Behavior:

  • 需要编译时消息类型检查
  • 构建状态机
  • 偏好函数式编程风格

何时使用 Actor trait:

  • 需要最大灵活性
  • 复杂的初始化逻辑
  • 现有 OOP 代码库

运行示例

# 计数器示例
cargo run --example behavior_counter -p pulsing-actor

# 状态机示例
cargo run --example behavior_fsm -p pulsing-actor

下一步