Skip to content

API Reference

Complete API documentation for Pulsing Actor Framework.

Contract & Semantics (Derived from llms.binding.md)

This section is the user-facing contract for Pulsing's Python API. It is derived from the repository document llms.binding.md.

  • Source of truth: llms.binding.md is the canonical contract.
  • This page: API reference + explicit semantics (concurrency, errors, trust boundaries).
  • If there's a mismatch: treat llms.binding.md as authoritative; please open an issue/PR to sync docs.

Concurrency model for @pulsing.remote

For a @pulsing.remote class, method calls are translated into actor messages.

  • Sync method (def method)
  • Executed serially (one request at a time) in the actor.
  • Recommended for fast CPU work and state mutation.
  • Async method (async def method)
  • The call uses stream-backed execution and is scheduled as a background task on the actor side.
  • While the method is awaiting, the actor can continue receiving other messages (non-blocking behavior).
  • You can either:
    • await proxy.async_method(...) to get the final value, or
    • async for chunk in proxy.async_method(...): ... to consume streamed yields.
  • Generators (sync/async)
  • Returning a generator (sync or async) is treated as a streaming response.

Streaming & cancellation

  • Streaming is implemented via Pulsing stream messages; cancellation is best-effort.
  • If a caller cancels the local await/iteration, the remote side may or may not stop immediately, depending on transport-level cancellation propagation.

ask vs tell

  • ask(msg): request/response. Returns a value (or raises).
  • tell(msg): fire-and-forget. No response is awaited.

Error Model

Pulsing provides a unified error handling system across Rust and Python with clear error categorization:

Error Categories

  1. PulsingRuntimeError: Framework/system-level errors
  2. Actor system errors (NotFound, Stopped, etc.)
  3. Transport errors (ConnectionFailed, etc.)
  4. Cluster errors (NodeNotFound, etc.)
  5. Config errors (InvalidValue, etc.)
  6. I/O errors, Serialization errors

  7. PulsingActorError: User Actor execution errors

  8. PulsingBusinessError: User input errors, business logic errors (recoverable, return to caller)
  9. PulsingSystemError: Internal errors, resource errors (may trigger actor restart)
  10. PulsingTimeoutError: Operation timeouts (retryable)
  11. PulsingUnsupportedError: Unsupported operations

Usage Example

from pulsing.exceptions import (
    PulsingBusinessError,
    PulsingSystemError,
    PulsingTimeoutError,
    PulsingRuntimeError,
)

@pul.remote
class Service:
    async def validate(self, data: str) -> bool:
        if not data:
            raise PulsingBusinessError(400, "Data cannot be empty")
        return True

    async def process(self, data: str) -> str:
        try:
            return expensive_operation(data)
        except Exception as e:
            raise PulsingSystemError(f"Processing failed: {e}", recoverable=True)

# Caller side
try:
    result = await service.process("")
except PulsingBusinessError as e:
    print(f"Business error [{e.code}]: {e.message}")
except PulsingSystemError as e:
    print(f"System error: {e.error}, recoverable: {e.recoverable}")
except PulsingRuntimeError as e:
    print(f"Framework error: {e}")

Automatic Error Classification

Standard Python exceptions are automatically classified: - ValueError, TypeErrorPulsingBusinessError (code=400) - TimeoutErrorPulsingTimeoutError - RuntimeError, SystemErrorPulsingSystemError (recoverable=True) - Other exceptions → PulsingSystemError (recoverable=True)

Note: Error type information is preserved for both local and remote calls. Remote error propagation maintains error categorization.

Trust boundary & security notes

  • Pickle-based payloads (Python ↔ Python):
  • Python-to-Python payloads are transported as pickle by default for convenience.
  • Risk: unpickling untrusted data can lead to arbitrary code execution (RCE).
  • Guideline: only use pickle payloads inside a trusted network / trusted cluster boundary.
  • Transport security (TLS):
  • For production deployments, always enable TLS and treat the cluster as an authenticated trust boundary.

Queue semantics (distributed queue)

  • Bucketing:
  • Writer uses bucket_column + num_buckets to partition records into buckets.
  • Readers must use a consistent num_buckets (and backend) with writers.
  • Ownership:
  • Bucket ownership is computed by hashing over live cluster members; requests may be redirected to the owning node.
  • Backends:
  • Default backend is in-memory; persistence depends on the selected backend.

Core Functions

pul.init / pul.shutdown

Global system initialization.

import asyncio
import pulsing as pul

async def example():
    await pul.init(addr=None, seeds=None, passphrase=None)
    await pul.shutdown()

# Usage example
if __name__ == "__main__":
    # To run the example
    # system = asyncio.run(example())
    pass

Parameters: - addr: Bind address (str or None for standalone) - seeds: Seed nodes to join cluster (list[str] or None) - passphrase: TLS passphrase (str or None)

Under the Hood: pul.actor_system

Create a new explicit ActorSystem instance when you need low-level control.

import asyncio
import pulsing as pul

async def main():
    # Standalone mode
    system = await pul.actor_system()
    await system.shutdown()

    # Cluster mode
    system = await pul.actor_system(addr="0.0.0.0:8000")
    await system.shutdown()

    # Join existing cluster
    system = await pul.actor_system(
        addr="0.0.0.0:8001",
        seeds=["127.0.0.1:8000"]
    )
    await system.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

Core Classes

ActorSystem

Main entry point for the actor system.

class ActorSystem:
    async def spawn(
        self,
        actor,                # Actor instance
        # Keyword-only arguments follow
        name=None,           # Actor name (str or None)
        # public parameter is deprecated: all named actors are resolvable
        restart_policy="never",  # Restart policy ("never", "always", "on-failure")
        max_restarts=3,      # Maximum restart attempts
        min_backoff=0.1,     # Minimum backoff seconds
        max_backoff=30.0     # Maximum backoff seconds
    ):
        """
        Spawn a new actor.

        - With name: named actor, discoverable via resolve()
        - Without name: anonymous actor, only accessible via returned ActorRef
        """
        pass

    async def refer(self, actorid):
        """Get ActorRef by ActorId."""
        pass

    async def resolve(self, name, *, node_id=None):
        """Resolve actor by name."""
        pass

    async def shutdown(self):
        """Shutdown the actor system."""
        pass

ActorRef

Low-level reference to an actor. Use ask() and tell() to communicate.

class ActorRef:
    @property
    def actor_id(self):
        """Get the actor's ID."""
        pass

    async def ask(self, msg):
        """Send a message and wait for response."""
        pass

    async def tell(self, msg):
        """Send a message without waiting for response (fire-and-forget)."""
        pass

    def as_any(self):
        """Get untyped ActorProxy when remote class is unknown."""
        pass

    def as_type(self, cls):
        """Get typed ActorProxy bound to class metadata."""
        pass

ActorProxy

High-level proxy for @remote classes. Call methods directly.

class ActorProxy:
    @property
    def ref(self):
        """Get underlying ActorRef."""
        pass

    # Call methods directly:
    # result = await proxy.my_method(arg1, arg2)

Decorators

@remote / @pul.remote

Convert a class into a distributed Actor.

import pulsing as pul

@pul.remote
class Counter:
    def __init__(self, init_value: int = 0):
        self.value = init_value

    # Sync method - sequential execution
    def incr(self):
        self.value += 1
        return self.value

    # Async method - concurrent execution during await
    async def fetch_and_add(self, url):
        data = await http_get(url)
        self.value += data
        return self.value

    # Generator - automatic streaming
    async def stream(self):
        for i in range(10):
            yield {"count": i}

# Create actor
counter = await Counter.spawn(name="counter")

# Call methods directly
result = await counter.incr()

# Streaming
async for chunk in counter.stream():
    print(chunk)

# Resolve existing actor
proxy = await Counter.resolve("counter")

Supervision parameters:

@pul.remote(
    restart_policy="on_failure",  # "never" | "on_failure" | "always"
    max_restarts=3,
    min_backoff=0.1,
    max_backoff=30.0,
)
class ResilientWorker:
    def work(self, data): ...

Under the Hood: Base Actor

For low-level control, inherit from Actor base class.

class MyActor:
    def __init__(self):
        self.value = 0

    def on_start(self, actor_id):
        """Called when actor starts."""
        print(f"Started: {actor_id}")

    async def receive(self, msg):
        """Handle incoming messages."""
        if msg.get("action") == "add":
            self.value += msg.get("n", 1)
            return {"value": self.value}
        return {"error": "unknown action"}

# Spawn
system = await pul.actor_system()
actor = await system.spawn(MyActor(), name="my_actor")

# Communicate via ask/tell
response = await actor.ask({"action": "add", "n": 10})

Queue API

Distributed queue for data pipelines.

# Write
writer = await pul.queue.write(
    "my_queue",
    bucket_column="user_id",
    num_buckets=4,
)
await writer.put({"user_id": "u1", "data": "hello"})
await writer.flush()

# Read
reader = await pul.queue.read("my_queue")
records = await reader.get(limit=100)

Rust API

The Rust API is organized into three trait layers (all re-exported in pulsing_actor::prelude::*):

ActorSystemCoreExt (Primary API)

Core spawn and resolve operations:

// Spawn - Simple API
system.spawn(actor).await?;                    // Anonymous actor (not resolvable)
system.spawn_named(name, actor).await?;        // Named actor (resolvable)

// Spawn - Builder pattern (advanced config)
system.spawning()
    .name("services/counter")                  // Optional: with name = resolvable
    .supervision(SupervisionSpec::on_failure().max_restarts(3))
    .mailbox_capacity(256)
    .spawn(actor).await?;

// Resolve - Simple API
system.actor_ref(&actor_id).await?;            // Get by ActorId
system.resolve(name).await?;                   // Resolve by name

// Resolve - Builder pattern (advanced config)
system.resolving()
    .node(node_id)                             // Optional: target node
    .policy(RoundRobinPolicy::new())           // Optional: load balancing
    .filter_alive(true)                        // Optional: only alive nodes
    .resolve(name).await?;                     // Resolve single

system.resolving().list(name).await?;          // Get all instances
system.resolving().lazy(name)?;                // Lazy resolution (~5s TTL auto-refresh)

ActorSystemAdvancedExt (Supervision/Restart)

Factory-based spawning for supervision restarts (named actors only):

let options = SpawnOptions::default()
    .supervision(SupervisionSpec::on_failure().max_restarts(3));

// Only named actors support supervision (anonymous cannot be re-resolved)
system.spawn_named_factory(name, || Ok(Service::new()), options).await?;

ActorSystemOpsExt (Operations/Diagnostics)

System info, cluster membership, lifecycle:

system.node_id();
system.addr();
system.members().await;
system.all_named_actors().await;
system.stop(name).await?;
system.shutdown().await?;

Examples

See the Quick Start Guide for usage examples.