Actor Guide¶
This guide covers the Actor model concepts and patterns for building robust distributed applications.
Prerequisite
If you haven't completed the Quickstart, start there first.
Communication Patterns
Not sure when to use sync vs async vs streaming? See the Communication Patterns Guide for detailed guidance.
What is an Actor?¶
An Actor is a fundamental unit of computation in concurrent and distributed systems. The Actor model, introduced by Carl Hewitt in 1973, provides a principled approach to building systems that are:
- Concurrent: Multiple actors run in parallel
- Distributed: Actors can be on different machines
- Fault-tolerant: Failures are isolated
Core Principles¶
┌─────────────────────────────────────────────────────────────┐
│ Actor │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Private │ │ Mailbox │ │ Behavior │ │
│ │ State │ │ (FIFO) │ │ (Methods) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ ▲ │ │ │
│ │ ▼ ▼ │
│ └──────── process one message at a time ────────────│
└─────────────────────────────────────────────────────────────┘
| Principle | Description |
|---|---|
| Isolation | Each actor has private state; no shared memory |
| Message-passing | Actors communicate only via async messages |
| Sequential processing | One message at a time (no internal locks) |
| Location transparency | Same API for local and remote actors |
Why Pulsing over Ray?¶
Ray's "actor" is essentially a stateful remote object — you call methods on it, but there's no formal message queue or delivery semantics.
Pulsing follows the classical Actor model (like Erlang/Akka):
| Feature | Pulsing | Ray |
|---|---|---|
| Message queue (mailbox) | ✅ FIFO | ❌ Direct call |
| Sequential guarantee | ✅ Per-actor | ⚠️ Per-method |
| Supervision/restart | ✅ Built-in | ❌ Manual |
| Zero external deps | ✅ | ❌ (needs Ray cluster) |
| Streaming messages | ✅ Native | ❌ |
Python API¶
Global Async API¶
import pulsing as pul
@pul.remote
class Calculator:
def __init__(self, initial_value: int = 0):
self.value = initial_value
def add(self, n: int) -> int:
self.value += n
return self.value
async def main():
await pul.init()
calc = await Calculator.spawn(initial_value=100)
result = await calc.add(50) # 150
await pul.shutdown()
Message Patterns¶
Ask (Request-Response)¶
Tell (Fire-and-forget)¶
Streaming Messages¶
For continuous data flow (e.g., LLM token generation), just return a generator:
@pul.remote
class TokenGenerator:
async def generate(self, prompt: str):
# Just return an async generator - Pulsing handles streaming automatically
for token in self.generate_tokens(prompt):
yield {"token": token}
# Consume the stream
async for chunk in generator.generate("Hello"):
print(chunk["token"], end="", flush=True)
Supervision (Actor-Level Restart)¶
Pulsing supports automatic actor restart on failure:
@pul.remote(
restart_policy="on_failure", # "never" | "on_failure" | "always"
max_restarts=3,
min_backoff=1.0,
max_backoff=60.0
)
class ReliableWorker:
def process(self, data):
# If this crashes, actor restarts automatically
return heavy_computation(data)
Note
Restart restores the actor but not its in-memory state. See Reliability Guide for idempotency patterns.
Error Handling¶
Pulsing provides a unified error handling system with clear error categorization.
Throwing Errors¶
from pulsing.exceptions import (
PulsingBusinessError,
PulsingSystemError,
PulsingTimeoutError,
)
@pul.remote
class Service:
async def validate(self, data: str) -> bool:
if not data:
raise PulsingBusinessError(400, "Data required")
return True
async def process(self, data: str) -> str:
try:
return expensive_operation(data)
except Exception as e:
raise PulsingSystemError(f"Processing failed: {e}", recoverable=True)
async def fetch_with_timeout(self, url: str) -> str:
try:
return await asyncio.wait_for(httpx.get(url), timeout=5.0)
except asyncio.TimeoutError:
raise PulsingTimeoutError("fetch", duration_ms=5000)
Catching Errors¶
from pulsing.exceptions import (
PulsingBusinessError,
PulsingSystemError,
PulsingRuntimeError,
)
try:
result = await service.process(data)
except PulsingBusinessError as e:
# Handle business logic error
print(f"Validation error: {e.message}")
except PulsingSystemError as e:
# Handle system error
if e.recoverable:
# May retry or wait for actor restart
pass
else:
# Non-recoverable error
logger.error(f"Fatal error: {e.error}")
except PulsingRuntimeError as e:
# Handle framework error (network, cluster, etc.)
print(f"System error: {e}")
Automatic Error Classification¶
Standard Python exceptions are automatically classified:
@pul.remote
class Processor:
def process(self, data: str) -> str:
if not data:
# ValueError → PulsingBusinessError (code=400)
raise ValueError("Data required")
# Other exceptions → PulsingSystemError (recoverable=True)
return process_data(data)
Advanced Patterns¶
1. Stateful Actor¶
@pul.remote
class SessionManager:
def __init__(self):
self.sessions = {}
def create_session(self, user_id: str) -> str:
session_id = str(uuid.uuid4())
self.sessions[session_id] = {"user_id": user_id, "data": {}}
return session_id
def get_session(self, session_id: str) -> dict | None:
return self.sessions.get(session_id)
2. Worker Pool (Round-Robin)¶
@pul.remote
class WorkerPool:
def __init__(self, workers: list):
self.workers = workers
self.idx = 0
async def submit(self, task: dict):
worker = self.workers[self.idx]
self.idx = (self.idx + 1) % len(self.workers)
return await worker.process(task)
3. Pipeline¶
@pul.remote
class PipelineStage:
def __init__(self, next_stage=None):
self.next_stage = next_stage
async def process(self, data: dict) -> dict:
result = await self.transform(data)
if self.next_stage:
return await self.next_stage.process(result)
return result
4. LLM Inference Service¶
@pul.remote
class LLMService:
def __init__(self, model_name: str):
self.model_name = model_name
self.model = None
async def load_model(self):
from transformers import AutoModelForCausalLM, AutoTokenizer
self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
self.model = AutoModelForCausalLM.from_pretrained(self.model_name)
async def generate(self, prompt: str, max_tokens: int = 100) -> str:
inputs = self.tokenizer(prompt, return_tensors="pt")
outputs = self.model.generate(**inputs, max_new_tokens=max_tokens)
return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
Best Practices¶
| ✅ DO | ❌ DON'T |
|---|---|
| Single responsibility per actor | Share mutable state between actors |
| Use async for I/O | Block in methods |
| Handle errors gracefully | Ignore exceptions |
Initialize state in __init__ |
Use global variables |
Error Handling¶
@pul.remote
class ResilientActor:
async def risky_operation(self, data: dict) -> dict:
try:
result = await self.process(data)
return {"success": True, "result": result}
except ValueError as e:
return {"success": False, "error": str(e)}
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
Quick Reference¶
Common Operations¶
import pulsing as pul
await pul.init()
# Using @pul.remote decorator (recommended)
@pul.remote
class MyService:
def process(self, data): return data
service = await MyService.spawn(name="service")
result = await service.process("hello")
# Resolve existing actor
proxy = await MyService.resolve("service")
# Shutdown
await pul.shutdown()
Next Steps¶
- Remote Actors — cluster communication
- Reliability — idempotency, retries, timeouts
- Operations — CLI tools for inspection
- LLM Inference — production inference setup