Skip to content

Rust API Reference

This page provides an overview of Pulsing's Rust API with examples and usage patterns.

Installation

Add Pulsing to your Cargo.toml:

[dependencies]
pulsing-actor = "0.1"
tokio = { version = "1.0", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }

Core Concepts

The Rust API is organized into trait layers that provide different levels of functionality:

  • ActorSystemCoreExt: Primary API for spawning and resolving actors
  • ActorSystemAdvancedExt: Advanced features like supervision and factory-based spawning
  • ActorSystemOpsExt: Operations, diagnostics, and lifecycle management

Quick Start

use pulsing_actor::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Ping(i32);

#[derive(Serialize, Deserialize)]
struct Pong(i32);

struct Echo;

#[async_trait::async_trait]
impl Actor for Echo {
    async fn receive(&mut self, msg: Message, _ctx: &mut ActorContext) -> anyhow::Result<Message> {
        let Ping(x) = msg.unpack()?;
        Message::pack(&Pong(x))
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let system = ActorSystem::builder().build().await?;

    // Spawn a named actor
    let actor = system.spawn_named("services/echo", Echo).await?;

    // Send a message and wait for response
    let Pong(x): Pong = actor.ask(Ping(42)).await?;

    println!("Received: {}", x);

    system.shutdown().await?;
    Ok(())
}

Core API

ActorSystem

The main entry point for the actor system.

pub struct ActorSystem { /* fields omitted */ }

impl ActorSystem {
    pub async fn builder() -> ActorSystemBuilder {
        // Create a new actor system builder
    }
}

ActorSystemBuilder

Builder pattern for configuring the actor system.

pub struct ActorSystemBuilder { /* fields omitted */ }

impl ActorSystemBuilder {
    pub fn addr<A: Into<String>>(self, addr: A) -> Self {
        // Set the bind address
    }

    pub fn seeds<I: IntoIterator<Item = String>>(self, seeds: I) -> Self {
        // Set seed nodes for cluster discovery
    }

    pub fn build(self) -> impl Future<Output = anyhow::Result<ActorSystem>> {
        // Build the actor system
    }
}

ActorSystemCoreExt

Core spawning and resolving functionality.

#[async_trait::async_trait]
pub trait ActorSystemCoreExt {
    async fn spawn<A>(&self, actor: A) -> anyhow::Result<TypedRef<A::Message>>
    where
        A: Actor + 'static;

    async fn spawn_named<A>(
        &self,
        name: &str,
        actor: A
    ) -> anyhow::Result<TypedRef<A::Message>>
    where
        A: Actor + 'static;

    async fn actor_ref(&self, id: &ActorId) -> anyhow::Result<ActorRef>;

    async fn resolve(&self, name: &str) -> anyhow::Result<ActorRef>;
}

Actor Trait

The core trait that all actors must implement.

#[async_trait::async_trait]
pub trait Actor: Send + 'static {
    type Message: Serialize + for<'de> Deserialize<'de> + Send + 'static;

    async fn receive(
        &mut self,
        msg: Message,
        ctx: &mut ActorContext
    ) -> anyhow::Result<Message>;

    fn on_start(&mut self, _id: ActorId, _ctx: &mut ActorContext) {}

    fn on_stop(&mut self, _ctx: &mut ActorContext) {}
}

TypedRef

Type-safe reference to an actor.

pub struct TypedRef<M> { /* fields omitted */ }

impl<M> TypedRef<M>
where
    M: Serialize + for<'de> Deserialize<'de> + Send + 'static,
{
    pub async fn ask(&self, msg: M) -> anyhow::Result<M> {
        // Send message and wait for typed response
    }

    pub async fn tell(&self, msg: M) -> anyhow::Result<()> {
        // Send message without waiting for response
    }
}

Advanced Features

Supervision

Actors can be configured with restart policies for fault tolerance.

use pulsing_actor::system::SupervisionSpec;

let options = SpawnOptions::default()
    .supervision(SupervisionSpec::on_failure().max_restarts(3));

// Factory-based spawning with supervision
system.spawn_named_factory("services/worker", || Ok(Worker::new()), options).await?;

Behavior (Type-Safe Actors)

A higher-level API for type-safe actors using the behavior pattern.

use pulsing_actor::prelude::*;

fn counter(init: i32) -> Behavior<i32> {
    stateful(init, |count, n, _ctx| {
        *count += n;
        BehaviorAction::Same
    })
}

// Behavior implements IntoActor trait
let counter = system.spawn(counter(0)).await?;
let result: i32 = counter.ask(5).await?; // Result is 5

Message Types

Regular Messages

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct MyMessage {
    action: String,
    data: Vec<i32>,
}

// Pack/unpack messages
let msg = Message::pack(&MyMessage {
    action: "process".to_string(),
    data: vec![1, 2, 3],
})?;

let MyMessage { action, data } = msg.unpack()?;

Streaming Messages

// For streaming responses
let stream_msg = Message::Stream(Stream::from_iter(items));

// Handle streaming in actor
async fn receive(&mut self, msg: Message, _ctx: &mut ActorContext) -> anyhow::Result<Message> {
    match msg {
        Message::Stream(stream) => {
            // Process stream
            let result = process_stream(stream).await?;
            Message::pack(&result)
        }
        _ => Message::pack(&"Unsupported message type")
    }
}

Cluster Management

Node Discovery

Pulsing uses the SWIM protocol for automatic cluster discovery.

// Single node
let system = ActorSystem::builder()
    .addr("0.0.0.0:8000")
    .build()
    .await?;

// Join existing cluster
let system = ActorSystem::builder()
    .addr("0.0.0.0:8001")
    .seeds(vec!["127.0.0.1:8000".to_string()])
    .build()
    .await?;

ActorSystemOpsExt

Operations and diagnostics.

#[async_trait::async_trait]
pub trait ActorSystemOpsExt {
    fn node_id(&self) -> NodeId;

    fn addr(&self) -> &str;

    async fn members(&self) -> Vec<NodeInfo>;

    async fn all_named_actors(&self) -> HashMap<String, ActorId>;

    async fn stop(&self, name: &str) -> anyhow::Result<()>;

    async fn shutdown(self) -> anyhow::Result<()>;
}

Error Handling

Pulsing uses anyhow::Result<T> for error handling throughout the API.

use anyhow::{Result, Context};

async fn my_actor_logic(system: &ActorSystem) -> Result<()> {
    let actor = system.spawn_named("my_actor", MyActor)
        .await
        .context("Failed to spawn actor")?;

    let response = actor.ask(MyMessage::default())
        .await
        .context("Failed to send message")?;

    Ok(())
}

Examples

HTTP Server Actor

use pulsing_actor::prelude::*;
use warp::Filter;

struct HttpServer {
    system: ActorSystem,
}

#[async_trait::async_trait]
impl Actor for HttpServer {
    async fn receive(&mut self, msg: Message, _ctx: &mut ActorContext) -> anyhow::Result<Message> {
        // Handle HTTP requests by forwarding to other actors
        let request: HttpRequest = msg.unpack()?;
        let processor = self.system.resolve("request_processor").await?;
        processor.ask(request).await
    }
}

Worker Pool

use pulsing_actor::prelude::*;

struct WorkerPool {
    workers: Vec<ActorRef>,
}

#[async_trait::async_trait]
impl Actor for WorkerPool {
    async fn receive(&mut self, msg: Message, _ctx: &mut ActorContext) -> anyhow::Result<Message> {
        // Round-robin task distribution
        let worker = &self.workers[self.next_worker()];
        worker.ask(msg).await
    }
}

Performance Considerations

  • Zero-copy messaging: Messages are passed by reference when possible
  • Async runtime: Built on Tokio for high concurrency
  • Binary serialization: Efficient bincode serialization
  • Connection pooling: HTTP/2 connection reuse

Integration

With Axum/Warp

use axum::{routing::post, Router};
use pulsing_actor::prelude::*;

async fn handle_request(
    Extension(system): Extension<ActorSystem>,
    Json(payload): Json<MyRequest>,
) -> Json<MyResponse> {
    let actor = system.resolve("request_handler").await?;
    let response: MyResponse = actor.ask(payload).await?;
    Ok(Json(response))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let system = ActorSystem::builder().build().await?;

    let app = Router::new()
        .route("/api", post(handle_request))
        .layer(Extension(system.clone()));

    // Start both HTTP server and actor system
    tokio::select! {
        _ = serve(app, ([127, 0, 0, 1], 3000)) => {},
        _ = system.run() => {},
    }

    Ok(())
}

Next Steps