API Reference¶
Complete API documentation for the Pulsing Actor Framework.
Contract & Semantics (Derived from llms.binding.md)¶
This section is the user-facing contract for Pulsing's Python API. It is derived from the repository document llms.binding.md.
- Source of truth: llms.binding.md is the canonical contract.
- This page: API reference + explicit semantics (concurrency, errors, trust boundaries).
- If there's a mismatch: treat llms.binding.md as authoritative; please open an issue/PR to sync docs.
Concurrency model for @pulsing.remote¶
For a @pulsing.remote class, method calls are translated into actor messages.
- Sync method (def method)
  - Executed serially (one request at a time) in the actor.
  - Recommended for fast CPU work and state mutation.
- Async method (async def method)
  - The call uses stream-backed execution and is scheduled as a background task on the actor side.
  - While the method is awaiting, the actor can continue receiving other messages (non-blocking behavior).
  - You can either:
    - await proxy.async_method(...) to get the final value, or
    - async for chunk in proxy.async_method(...): ... to consume streamed yields.
- Generators (sync/async)
  - Returning a generator (sync or async) is treated as a streaming response.
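The scheduling rules above can be pictured with a toy mailbox loop written in plain asyncio. This is only a sketch of the described behavior, not Pulsing's implementation: ToyActor, its message tuples, and the method names are all hypothetical. Sync messages run inline (so they never interleave), while async messages are started as background tasks and the loop keeps draining the mailbox while they await.

```python
import asyncio


class ToyActor:
    """Toy mailbox loop (hypothetical); illustrates the rules, not Pulsing internals."""

    def __init__(self):
        self.mailbox = asyncio.Queue()
        self.log = []
        self.background = set()

    def sync_method(self, tag):
        # Runs inline in the mailbox loop, so sync calls are strictly serial.
        self.log.append(f"sync:{tag}")

    async def async_method(self, tag):
        self.log.append(f"async-start:{tag}")
        await asyncio.sleep(0.01)  # while this awaits, the loop keeps running
        self.log.append(f"async-end:{tag}")

    async def run(self):
        while True:
            kind, tag = await self.mailbox.get()
            if kind == "stop":
                break
            if kind == "sync":
                self.sync_method(tag)
            else:
                # Async methods become background tasks on the actor side.
                task = asyncio.create_task(self.async_method(tag))
                self.background.add(task)
                task.add_done_callback(self.background.discard)
                await asyncio.sleep(0)  # let the new task start
        await asyncio.gather(*self.background)


async def demo():
    actor = ToyActor()
    for msg in [("async", "a"), ("sync", "b"), ("sync", "c"), ("stop", None)]:
        actor.mailbox.put_nowait(msg)
    await actor.run()
    return actor.log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this toy model the two sync calls complete between the start and the end of the async call, which is the interleaving the contract allows. State touched by an async method can therefore change across an await.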
Streaming & cancellation¶
- Streaming is implemented via Pulsing stream messages; cancellation is best-effort.
- If a caller cancels the local await/iteration, the remote side may or may not stop immediately, depending on transport-level cancellation propagation.
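To make "best-effort" concrete, here is a minimal asyncio sketch in which a local task stands in for the remote side of a stream. The producer function and the queue are assumptions made for illustration; Pulsing's transport is not modeled. The point is that the producer only observes cancellation at its next await, so it may have produced more chunks than the caller consumed.

```python
import asyncio


async def producer(queue, produced):
    """Stands in for the remote side of a stream (hypothetical)."""
    try:
        for i in range(100):
            await queue.put(i)
            produced.append(i)
            await asyncio.sleep(0)  # cancellation can only land at an await
    except asyncio.CancelledError:
        produced.append("cancelled")
        raise


async def demo():
    queue = asyncio.Queue()
    produced = []
    task = asyncio.create_task(producer(queue, produced))

    consumed = []
    while len(consumed) < 3:
        consumed.append(await queue.get())

    task.cancel()  # a request to stop, not an immediate stop
    await asyncio.gather(task, return_exceptions=True)
    return consumed, produced


if __name__ == "__main__":
    consumed, produced = asyncio.run(demo())
    print("consumed:", consumed)
    print("produced:", produced)
```

A practical consequence is that streaming handlers should tolerate being stopped at any await point, and callers should not assume that remote work ended the moment the local iteration did.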
ask vs tell¶
- ask(msg): request/response. Returns a value (or raises).
- tell(msg): fire-and-forget. No response is awaited.
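The difference between the two calls can be sketched with a reply future that is either attached to the message (ask) or omitted (tell). ToyRef below is a hypothetical stand-in built on asyncio, not Pulsing's ActorRef; it only shows why a failure inside a tell is invisible to the sender.

```python
import asyncio


class ToyRef:
    """Hypothetical stand-in for an actor reference; not Pulsing's ActorRef."""

    def __init__(self, handler):
        self._handler = handler
        self._mailbox = asyncio.Queue()
        self._worker = asyncio.create_task(self._run())

    async def _run(self):
        while True:
            msg, reply = await self._mailbox.get()
            try:
                result = self._handler(msg)
            except Exception as exc:
                if reply is not None:
                    reply.set_exception(exc)  # ask: error travels back
                # tell: nobody is waiting, so the error is dropped here
            else:
                if reply is not None:
                    reply.set_result(result)

    async def ask(self, msg):
        reply = asyncio.get_running_loop().create_future()
        await self._mailbox.put((msg, reply))
        return await reply  # the value, or the handler's exception re-raised

    async def tell(self, msg):
        await self._mailbox.put((msg, None))  # enqueue only

    def close(self):
        self._worker.cancel()


async def demo():
    seen = []

    def handler(msg):
        if msg == "boom":
            raise ValueError("bad message")
        seen.append(msg)
        return msg.upper()

    ref = ToyRef(handler)
    await ref.tell("boom")   # fails on the actor side; caller never finds out
    await ref.tell("hello")
    answer = await ref.ask("ping")
    error = None
    try:
        await ref.ask("boom")
    except ValueError as exc:
        error = str(exc)
    ref.close()
    return answer, error, seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Use ask when the caller needs the result or needs to know about failures; use tell when the sender can proceed regardless of the outcome.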
Error Model¶
Pulsing provides a unified error handling system across Rust and Python with clear error categorization:
Error Categories¶
- PulsingRuntimeError: Framework/system-level errors
  - Actor system errors (NotFound, Stopped, etc.)
  - Transport errors (ConnectionFailed, etc.)
  - Cluster errors (NodeNotFound, etc.)
  - Config errors (InvalidValue, etc.)
  - I/O errors, Serialization errors
- PulsingActorError: User Actor execution errors
  - PulsingBusinessError: User input errors, business logic errors (recoverable, return to caller)
  - PulsingSystemError: Internal errors, resource errors (may trigger actor restart)
  - PulsingTimeoutError: Operation timeouts (retryable)
  - PulsingUnsupportedError: Unsupported operations
Usage Example¶
import pulsing as pul
from pulsing.exceptions import (
    PulsingBusinessError,
    PulsingSystemError,
    PulsingTimeoutError,
    PulsingRuntimeError,
)

@pul.remote
class Service:
    async def validate(self, data: str) -> bool:
        if not data:
            raise PulsingBusinessError(400, "Data cannot be empty")
        return True

    async def process(self, data: str) -> str:
        try:
            # expensive_operation stands in for your own code
            return expensive_operation(data)
        except Exception as e:
            raise PulsingSystemError(f"Processing failed: {e}", recoverable=True) from e

# Caller side (service is a proxy, e.g. service = await Service.spawn())
try:
    result = await service.process("")
except PulsingBusinessError as e:
    print(f"Business error [{e.code}]: {e.message}")
except PulsingSystemError as e:
    print(f"System error: {e.error}, recoverable: {e.recoverable}")
except PulsingRuntimeError as e:
    print(f"Framework error: {e}")
Automatic Error Classification¶
Standard Python exceptions are automatically classified:
- ValueError, TypeError → PulsingBusinessError (code=400)
- TimeoutError → PulsingTimeoutError
- RuntimeError, SystemError → PulsingSystemError (recoverable=True)
- Other exceptions → PulsingSystemError (recoverable=True)
Note: Error type information is preserved for both local and remote calls. Remote error propagation maintains error categorization.
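The classification table can be expressed as a small lookup function. The sketch below only mirrors the documented mapping and returns category names as strings; classify is a hypothetical helper, and the use of isinstance (so subclasses such as UnicodeDecodeError follow their parent ValueError) is an assumption rather than confirmed framework behavior.

```python
def classify(exc):
    """Return (category name, extra fields) for a standard Python exception.

    Mirrors the documented table; NOT the framework's own code.
    Assumption: subclasses are classified like their parent type.
    """
    if isinstance(exc, (ValueError, TypeError)):
        return "PulsingBusinessError", {"code": 400}
    if isinstance(exc, TimeoutError):
        return "PulsingTimeoutError", {}
    # RuntimeError, SystemError and every other exception type
    return "PulsingSystemError", {"recoverable": True}


if __name__ == "__main__":
    for exc in (ValueError("bad"), TimeoutError(), RuntimeError("x"), KeyError("k")):
        print(type(exc).__name__, "->", classify(exc))
```

If the default category is not what you want, raise the specific Pulsing exception yourself instead of relying on automatic classification.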
Trust boundary & security notes¶
- Pickle-based payloads (Python ↔ Python):
  - Python-to-Python payloads are transported as pickle by default for convenience.
  - Risk: unpickling untrusted data can lead to arbitrary code execution (RCE).
  - Guideline: only use pickle payloads inside a trusted network / trusted cluster boundary.
- Transport security (TLS):
  - For production deployments, always enable TLS and treat the cluster as an authenticated trust boundary.
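The pickle risk is easy to demonstrate with the standard library alone. In the sketch below, the Sneaky class makes pickle.loads call a function chosen by the sender (here the harmless len; an attacker would choose something else). The allow-list unpickler follows the "restricting globals" pattern from the Python pickle documentation. Both classes are illustrative and not part of Pulsing, and an allow-list is a mitigation, not a replacement for the trusted-boundary guideline.

```python
import io
import pickle
from collections import OrderedDict


class Sneaky:
    """Unpickling this calls len("payload"); the sender chooses the callable."""

    def __reduce__(self):
        return (len, ("payload",))


class AllowListUnpickler(pickle.Unpickler):
    """Rejects every global that is not explicitly allowed."""

    ALLOWED = {("collections", "OrderedDict")}

    def find_class(self, module, name):
        if (module, name) not in self.ALLOWED:
            raise pickle.UnpicklingError(f"blocked global: {module}.{name}")
        return super().find_class(module, name)


def safe_loads(data):
    return AllowListUnpickler(io.BytesIO(data)).load()


if __name__ == "__main__":
    blob = pickle.dumps(Sneaky())
    print(pickle.loads(blob))  # prints 7: the call ran during unpickling
    try:
        safe_loads(blob)
    except pickle.UnpicklingError as exc:
        print("rejected:", exc)
```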
Queue semantics (distributed queue)¶
- Bucketing:
  - Writer uses bucket_column + num_buckets to partition records into buckets.
  - Readers must use a consistent num_buckets (and backend) with writers.
- Ownership:
  - Bucket ownership is computed by hashing over live cluster members; requests may be redirected to the owning node.
- Backends:
  - Default backend is in-memory; persistence depends on the selected backend.
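The following sketch shows one way the two hashing steps could work: a record's bucket is derived from the value of its bucket column, and a bucket's owner is picked among live members. The hash functions here are assumptions (SHA-256 modulo num_buckets, and rendezvous hashing for ownership); bucket_for and owner_for are hypothetical helpers, and Pulsing's actual scheme may differ.

```python
import hashlib


def bucket_for(value, num_buckets):
    """Map a bucket-column value to a bucket index (assumed scheme)."""
    digest = hashlib.sha256(str(value).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_buckets


def owner_for(bucket, members):
    """Pick the owning node by rendezvous hashing (assumed scheme).

    Every member gets a score for the bucket; the highest score wins.
    Removing a non-owner never changes the owner of that bucket.
    """
    def score(member):
        return hashlib.sha256(f"{member}:{bucket}".encode("utf-8")).digest()

    return max(members, key=score)


if __name__ == "__main__":
    record = {"user_id": "u1", "data": "hello"}
    members = ["node-a", "node-b", "node-c"]  # hypothetical node names
    bucket = bucket_for(record["user_id"], num_buckets=4)
    print("bucket:", bucket, "owner:", owner_for(bucket, members))
    # A reader configured with a different num_buckets may compute a
    # different index for the same key and look in the wrong bucket.
    print("with 8 buckets:", bucket_for(record["user_id"], num_buckets=8))
```

Whatever the concrete hash, the index depends on num_buckets, which is why readers and writers have to agree on it.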
Core Functions¶
pul.init / pul.shutdown¶
Global system initialization.
import asyncio
import pulsing as pul

async def example():
    await pul.init(addr=None, seeds=None, passphrase=None)
    await pul.shutdown()

# Usage example
if __name__ == "__main__":
    # To run the example:
    # asyncio.run(example())
    pass
Parameters:
- addr: Bind address (str or None for standalone)
- seeds: Seed nodes to join cluster (list[str] or None)
- passphrase: TLS passphrase (str or None)
Under the Hood: pul.actor_system¶
Create a new explicit ActorSystem instance when you need low-level control.
import asyncio
import pulsing as pul

async def main():
    # Standalone mode
    system = await pul.actor_system()
    await system.shutdown()

    # Cluster mode
    system = await pul.actor_system(addr="0.0.0.0:8000")
    await system.shutdown()

    # Join existing cluster
    system = await pul.actor_system(
        addr="0.0.0.0:8001",
        seeds=["127.0.0.1:8000"]
    )
    await system.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
Core Classes¶
ActorSystem¶
Main entry point for the actor system.
class ActorSystem:
    async def spawn(
        self,
        actor,                    # Actor instance
        *,                        # Keyword-only arguments follow
        name=None,                # Actor name (str or None)
        # The `public` parameter is deprecated: all named actors are resolvable
        restart_policy="never",   # Restart policy ("never", "always", "on_failure")
        max_restarts=3,           # Maximum restart attempts
        min_backoff=0.1,          # Minimum backoff seconds
        max_backoff=30.0          # Maximum backoff seconds
    ):
        """
        Spawn a new actor.
        - With name: named actor, discoverable via resolve()
        - Without name: anonymous actor, only accessible via returned ActorRef
        """
        pass

    async def refer(self, actorid):
        """Get ActorRef by ActorId."""
        pass

    async def resolve(self, name, *, node_id=None):
        """Resolve actor by name."""
        pass

    async def shutdown(self):
        """Shutdown the actor system."""
        pass
ActorRef¶
Low-level reference to an actor. Use ask() and tell() to communicate.
class ActorRef:
    @property
    def actor_id(self):
        """Get the actor's ID."""
        pass

    async def ask(self, msg):
        """Send a message and wait for response."""
        pass

    async def tell(self, msg):
        """Send a message without waiting for response (fire-and-forget)."""
        pass

    def as_any(self):
        """Get untyped ActorProxy when remote class is unknown."""
        pass

    def as_type(self, cls):
        """Get typed ActorProxy bound to class metadata."""
        pass
ActorProxy¶
High-level proxy for @remote classes. Call methods directly.
class ActorProxy:
    @property
    def ref(self):
        """Get underlying ActorRef."""
        pass

    # Call methods directly:
    # result = await proxy.my_method(arg1, arg2)
Decorators¶
@remote / @pul.remote¶
Convert a class into a distributed Actor.
import pulsing as pul

@pul.remote
class Counter:
    def __init__(self, init_value: int = 0):
        self.value = init_value

    # Sync method - sequential execution
    def incr(self):
        self.value += 1
        return self.value

    # Async method - concurrent execution during await
    async def fetch_and_add(self, url):
        data = await http_get(url)
        self.value += data
        return self.value

    # Generator - automatic streaming
    async def stream(self):
        for i in range(10):
            yield {"count": i}

# Create actor
counter = await Counter.spawn(name="counter")

# Call methods directly
result = await counter.incr()

# Streaming
async for chunk in counter.stream():
    print(chunk)

# Resolve existing actor
proxy = await Counter.resolve("counter")
Supervision parameters:
@pul.remote(
    restart_policy="on_failure",  # "never" | "on_failure" | "always"
    max_restarts=3,
    min_backoff=0.1,
    max_backoff=30.0,
)
class ResilientWorker:
    def work(self, data): ...
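To get a feel for how these parameters interact, the sketch below computes a restart decision and a delay schedule. The source only names the parameters; the doubling factor, the meaning of "always" (restart even after a clean exit), and both helper functions are assumptions made for illustration, not Pulsing's documented algorithm.

```python
def should_restart(policy, failed, restarts_so_far, max_restarts=3):
    """Decide whether a stopped actor is restarted (assumed semantics)."""
    if restarts_so_far >= max_restarts:
        return False
    if policy == "always":
        return True          # assumed: restart after any exit
    if policy == "on_failure":
        return failed        # restart only after a failure
    return False             # "never"


def backoff_schedule(max_restarts=3, min_backoff=0.1, max_backoff=30.0, factor=2.0):
    """Delays before each restart: start at min_backoff, grow by `factor`
    (an assumption), and never exceed max_backoff."""
    delays = []
    delay = min_backoff
    for _ in range(max_restarts):
        delays.append(min(delay, max_backoff))
        delay *= factor
    return delays


if __name__ == "__main__":
    print(backoff_schedule())  # [0.1, 0.2, 0.4]
    print(backoff_schedule(max_restarts=10, max_backoff=5.0))
    print(should_restart("on_failure", failed=True, restarts_so_far=3))  # False
```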
Under the Hood: Base Actor¶
For low-level control, inherit from the Actor base class.
class MyActor:
    def __init__(self):
        self.value = 0

    def on_start(self, actor_id):
        """Called when actor starts."""
        print(f"Started: {actor_id}")

    async def receive(self, msg):
        """Handle incoming messages."""
        if msg.get("action") == "add":
            self.value += msg.get("n", 1)
            return {"value": self.value}
        return {"error": "unknown action"}

# Spawn
system = await pul.actor_system()
actor = await system.spawn(MyActor(), name="my_actor")

# Communicate via ask/tell
response = await actor.ask({"action": "add", "n": 10})
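Because receive is an ordinary coroutine method, the message-handling logic can be exercised without starting an actor system. The sketch below repeats the handler from the example above and calls it directly with asyncio; it does not involve the framework at all, so it says nothing about spawn, naming, or transport behavior.

```python
import asyncio


class MyActor:
    def __init__(self):
        self.value = 0

    async def receive(self, msg):
        """Handle incoming messages (same logic as the example above)."""
        if msg.get("action") == "add":
            self.value += msg.get("n", 1)
            return {"value": self.value}
        return {"error": "unknown action"}


async def demo():
    actor = MyActor()
    first = await actor.receive({"action": "add", "n": 10})
    second = await actor.receive({"action": "add"})      # n defaults to 1
    unknown = await actor.receive({"action": "noop"})
    return first, second, unknown


if __name__ == "__main__":
    print(asyncio.run(demo()))
```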
Queue API¶
Distributed queue for data pipelines.
# Write
writer = await pul.queue.write(
    "my_queue",
    bucket_column="user_id",
    num_buckets=4,
)
await writer.put({"user_id": "u1", "data": "hello"})
await writer.flush()

# Read
reader = await pul.queue.read("my_queue")
records = await reader.get(limit=100)
Rust API¶
The Rust API is organized into three trait layers (all re-exported in pulsing_actor::prelude::*):
ActorSystemCoreExt (Primary API)¶
Core spawn and resolve operations:
// Spawn - Simple API
system.spawn(actor).await?;              // Anonymous actor (not resolvable)
system.spawn_named(name, actor).await?;  // Named actor (resolvable)

// Spawn - Builder pattern (advanced config)
system.spawning()
    .name("services/counter")            // Optional: with name = resolvable
    .supervision(SupervisionSpec::on_failure().max_restarts(3))
    .mailbox_capacity(256)
    .spawn(actor).await?;

// Resolve - Simple API
system.actor_ref(&actor_id).await?;      // Get by ActorId
system.resolve(name).await?;             // Resolve by name

// Resolve - Builder pattern (advanced config)
system.resolving()
    .node(node_id)                       // Optional: target node
    .policy(RoundRobinPolicy::new())     // Optional: load balancing
    .filter_alive(true)                  // Optional: only alive nodes
    .resolve(name).await?;               // Resolve single
system.resolving().list(name).await?;    // Get all instances
system.resolving().lazy(name)?;          // Lazy resolution (~5s TTL auto-refresh)
ActorSystemAdvancedExt (Supervision/Restart)¶
Factory-based spawning for supervision restarts (named actors only):
let options = SpawnOptions::default()
    .supervision(SupervisionSpec::on_failure().max_restarts(3));

// Only named actors support supervision (anonymous cannot be re-resolved)
system.spawn_named_factory(name, || Ok(Service::new()), options).await?;
ActorSystemOpsExt (Operations/Diagnostics)¶
System info, cluster membership, lifecycle:
system.node_id();
system.addr();
system.members().await;
system.all_named_actors().await;
system.stop(name).await?;
system.shutdown().await?;
Examples¶
See the Quick Start Guide for usage examples.