Python API Reference¶
This page contains the complete auto-generated API documentation for Pulsing's Python interface.
Installation¶
Pulsing requires Python 3.10+ and can be installed via pip:
For development, clone the repository and install in development mode:
Core Module¶
Pulsing - Distributed Actor Framework
Usage
import pulsing as pul
await pul.init()
@pul.remote class Counter: def init(self, init=0): self.value = init def incr(self): self.value += 1; return self.value
counter = await Counter.spawn(name="counter") result = await counter.incr()
await pul.shutdown()
Classes¶
Actor
¶
Bases: ABC
Base class for Python actors. Implement receive to handle messages.
Python actors can receive and return arbitrary Python objects when communicating with other Python actors. The objects are automatically pickled and unpickled.
For communication with Rust actors, use Message.from_json() and msg.to_json().
Functions¶
metadata()
¶
on_start(actor_id)
¶
on_stop()
¶
receive(msg)
abstractmethod
async
¶
Handle incoming message
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
msg
|
Incoming message. Can be: - Any Python object (when called from Python actors with ask/tell) - Message object (when called from Rust actors or with Message.from_json) |
required |
Returns:
| Type | Description |
|---|---|
|
|
|
|
|
|
|
Example (Python-to-Python, simple objects): # Caller: result = await counter.ask({"action": "increment", "n": 10})
# Actor receive:
async def receive(self, msg):
if isinstance(msg, dict) and msg.get("action") == "increment":
self.value += msg["n"]
return {"value": self.value}
Example (Rust actor communication): async def receive(self, msg): if isinstance(msg, Message) and msg.msg_type == "Ping": return Message.from_json("Pong", {"count": 1}) return None
Source code in pulsing/core/__init__.py
ActorProxy(actor_ref, method_names=None, async_methods=None)
¶
Actor proxy.
Source code in pulsing/core/remote.py
ActorSystem(inner)
¶
ActorSystem wrapper with queue/topic API
This wraps the Rust ActorSystem and adds Python-level extensions like queue and topic APIs.
Source code in pulsing/__init__.py
Functions¶
refer(actorid)
async
¶
Get actor reference by ID
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
actorid
|
ActorId | str
|
Actor ID (ActorId instance or string in format "node_id:local_id") |
required |
Returns:
| Type | Description |
|---|---|
ActorRef
|
ActorRef to the actor |
Source code in pulsing/__init__.py
PulsingActorError(message, actor_name=None, cause=None)
¶
Bases: PulsingError
User Actor execution errors.
This corresponds to pulsing_actor::error::ActorError in Rust.
These are errors raised by user code during Actor execution: - Business errors (user input errors) - System errors (internal errors from user code) - Timeout errors (operation timeouts) - Unsupported errors (unsupported operations)
Note: Framework-level errors like "Actor not found" are RuntimeError, not ActorError.
Source code in pulsing/exceptions.py
PulsingBusinessError(code, message, details=None)
¶
Bases: PulsingActorError
Business error: User input error, business logic error.
These errors are recoverable and should be returned to the caller. Automatically converted to ActorError::Business in Rust.
Example
@remote class UserActor: async def validate_age(self, age: int) -> bool: if age < 18: raise PulsingBusinessError(400, "Age must be >= 18", details="User validation failed") return True
Source code in pulsing/exceptions.py
PulsingError
¶
Bases: Exception
Base exception for all Pulsing errors.
This corresponds to pulsing_actor::error::PulsingError in Rust.
PulsingRuntimeError(message, cause=None)
¶
Bases: PulsingError
Framework/system-level errors.
This corresponds to pulsing_actor::error::RuntimeError in Rust.
These are framework-level errors, not caused by user code: - Actor system errors (NotFound, Stopped, etc.) - Transport errors (ConnectionFailed, etc.) - Cluster errors (NodeNotFound, etc.) - Config errors (InvalidValue, etc.) - I/O errors - Serialization errors
Source code in pulsing/exceptions.py
PulsingSystemError(error, recoverable=True)
¶
Bases: PulsingActorError
System error: Internal error, resource error.
May trigger Actor restart depending on recoverable flag. Automatically converted to ActorError::System in Rust.
Example
@remote class DataProcessor: async def process(self, data: str) -> str: try: return process_data(data) except Exception as e: raise PulsingSystemError(f"Processing failed: {e}", recoverable=True)
Source code in pulsing/exceptions.py
PulsingTimeoutError(operation, duration_ms=0)
¶
Bases: PulsingActorError
Timeout error: Operation timed out.
Usually recoverable, can be retried. Automatically converted to ActorError::Timeout in Rust.
Example
@remote class NetworkActor: async def fetch(self, url: str) -> str: try: return await asyncio.wait_for(httpx.get(url), timeout=5.0) except asyncio.TimeoutError: raise PulsingTimeoutError("fetch", duration_ms=5000)
Source code in pulsing/exceptions.py
PulsingUnsupportedError(operation)
¶
Bases: PulsingActorError
Unsupported operation error.
Not recoverable. Indicates that the requested operation is not supported. Automatically converted to ActorError::Unsupported in Rust.
Example
@remote class LegacyActor: async def process(self, data: str) -> str: if data.startswith("legacy:"): raise PulsingUnsupportedError("process") return process_data(data)
Source code in pulsing/exceptions.py
Functions¶
actor_system(addr=None, *, seeds=None, passphrase=None)
async
¶
Create a new ActorSystem (does not set global system)
This is the Actor System style API for explicit system management. Use this when you need multiple systems or want explicit control.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
addr
|
str | None
|
Bind address (e.g., "0.0.0.0:8000"). None for standalone mode. |
None
|
seeds
|
list[str] | None
|
Seed nodes to join cluster |
None
|
passphrase
|
str | None
|
Enable TLS with this passphrase |
None
|
Returns:
| Type | Description |
|---|---|
ActorSystem
|
ActorSystem instance with .queue API |
Example
import pulsing as pul
Standalone mode¶
system = await pul.actor_system()
Cluster mode¶
system = await pul.actor_system(addr="0.0.0.0:8000")
Join existing cluster¶
system = await pul.actor_system( addr="0.0.0.0:8001", seeds=["192.168.1.1:8000"] )
With TLS¶
system = await pul.actor_system( addr="0.0.0.0:8000", passphrase="my-secret" )
Queue API¶
writer = await system.queue.write("my_topic") reader = await system.queue.read("my_topic")
Source code in pulsing/__init__.py
as_any(ref)
¶
Return an untyped proxy that forwards any method call to the remote actor.
Use when you have an ActorRef and want to call methods by name without the typed class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ref
|
ActorRef
|
ActorRef from resolve(name). |
required |
Example
ref = await resolve("channel.discord") proxy = as_any(ref) # or proxy = ref.as_any() await proxy.send_text(chat_id, content)
Source code in pulsing/core/remote.py
cleanup_ray()
¶
get_system()
¶
Get the global actor system (must call init() first)
Source code in pulsing/core/__init__.py
init(addr=None, *, seeds=None, passphrase=None, head_addr=None, is_head_node=False)
async
¶
Initialize Pulsing actor system
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
addr
|
str
|
Bind address (e.g., "0.0.0.0:8000"). None for standalone mode. |
None
|
seeds
|
list[str]
|
Seed nodes to join cluster (Gossip mode). |
None
|
passphrase
|
str
|
Enable TLS with this passphrase. |
None
|
head_addr
|
str
|
Address of head node (worker mode). Mutually exclusive with is_head_node. |
None
|
is_head_node
|
bool
|
If True, this node runs as head. Mutually exclusive with head_addr. |
False
|
Returns:
| Type | Description |
|---|---|
ActorSystem
|
ActorSystem instance |
Example
Standalone mode¶
await init()
Cluster mode (Gossip + seed)¶
await init(addr="0.0.0.0:8001", seeds=["192.168.1.1:8000"])
Head node¶
await init(addr="0.0.0.0:8000", is_head_node=True)
Worker node¶
await init(addr="0.0.0.0:8001", head_addr="192.168.1.1:8000")
Source code in pulsing/core/__init__.py
init_inside_ray()
¶
Initialize Pulsing in Ray worker and join cluster (async version).
Usage::
await pul.init_inside_ray()
init_inside_torchrun()
¶
Initialize Pulsing in current process and join cluster via torch.distributed.
Rank 0 becomes the seed; others join with seeds=[rank0_addr]. Call after torch.distributed.init_process_group() (e.g. when launched with torchrun).
Usage::
import torch.distributed as dist
dist.init_process_group(...)
system = pul.init_inside_torchrun()
Source code in pulsing/__init__.py
is_initialized()
¶
mount(instance, *, name, public=True)
¶
Mount an existing Python object to the Pulsing communication network.
Synchronous interface, can be called in __init__. Automatically:
1. Initialize Pulsing (if not already, auto-detects Ray environment)
2. Wrap instance as a Pulsing actor
3. Register to Pulsing network, other nodes can discover via pul.resolve(name)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
instance
|
Any
|
Object to mount (any Python instance) |
required |
name
|
str
|
Pulsing name, other nodes resolve via this name |
required |
public
|
bool
|
Whether discoverable by other cluster nodes (default True) |
True
|
Example::
@ray.remote
class Counter:
def __init__(self, name, peers):
self.name = name
self.peers = sorted(peers)
pul.mount(self, name=name)
async def greet(self, msg):
return f"Hello from {self.name}: {msg}"
Source code in pulsing/core/remote.py
refer(actorid)
async
¶
Get actor reference by ID using global system
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
actorid
|
ActorId | str
|
Actor ID (ActorId instance or string) |
required |
Returns:
| Type | Description |
|---|---|
ActorRef
|
ActorRef to the actor |
Source code in pulsing/__init__.py
resolve(name, *, node_id=None, timeout=None)
async
¶
Resolve a named actor by name.
Returns an ActorRef that supports .ask(), .tell(), .as_any(), and .as_type(). Use .as_any() to get an untyped proxy that forwards any method call. Use .as_type(Counter) to get a typed proxy with method validation.
For typed ActorProxy with method calls, use Counter.resolve(name) instead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Actor name |
required |
node_id
|
int | None
|
Target node ID, searches in cluster if not provided |
None
|
timeout
|
float | None
|
Seconds to wait for the name to appear (gossip convergence). None means no wait (error immediately if not found). |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
ActorRef |
Actor reference with .as_any() / .as_type() for proxy generation. |
Example
from pulsing.core import init, remote, resolve
await init()
By name only (no type needed)¶
ref = await resolve("channel.discord") proxy = ref.as_any() await proxy.send_text(chat_id, content)
Wait for name to appear (gossip convergence)¶
ref = await resolve("peer_node", timeout=30)
Low-level ask¶
ref = await resolve("my_counter") result = await ref.ask({"call": "increment", "args": [], "kwargs": {}})
Source code in pulsing/core/remote.py
shutdown()
async
¶
spawn(actor, *, name=None, public=False, restart_policy='never', max_restarts=3, min_backoff=0.1, max_backoff=30.0)
async
¶
Spawn an actor using the global system
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
actor
|
Any
|
Actor instance |
required |
name
|
str | None
|
Actor name (auto-generated if None) |
None
|
public
|
bool
|
Whether to register as public named actor |
False
|
restart_policy
|
str
|
Restart policy ("never", "always", "on-failure") |
'never'
|
max_restarts
|
int
|
Maximum restart attempts |
3
|
min_backoff
|
float
|
Minimum backoff seconds |
0.1
|
max_backoff
|
float
|
Maximum backoff seconds |
30.0
|
Returns:
| Type | Description |
|---|---|
ActorRef
|
ActorRef to the spawned actor |
Example
import pulsing as pul
await pul.init()
class MyActor: async def receive(self, msg): return f"Got: {msg}"
ref = await pul.spawn(MyActor(), name="my_actor")
Source code in pulsing/__init__.py
unmount(name)
¶
Unmount a previously mounted actor from the Pulsing network.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Name used during mounting |
required |
Source code in pulsing/core/remote.py
Actor Module¶
Pulsing Core - Python bindings for distributed actor framework
Simple API
from pulsing.core import init, shutdown, remote
await init()
@remote class Counter: def init(self, init=0): self.value = init def incr(self): self.value += 1; return self.value
counter = await Counter.spawn(init=10) result = await counter.incr()
await shutdown()
Advanced API
from pulsing.core import ActorSystem, Actor, Message, SystemConfig
Classes¶
Actor
¶
Bases: ABC
Base class for Python actors. Implement receive to handle messages.
Python actors can receive and return arbitrary Python objects when communicating with other Python actors. The objects are automatically pickled and unpickled.
For communication with Rust actors, use Message.from_json() and msg.to_json().
Functions¶
metadata()
¶
on_start(actor_id)
¶
on_stop()
¶
receive(msg)
abstractmethod
async
¶
Handle incoming message
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
msg
|
Incoming message. Can be: - Any Python object (when called from Python actors with ask/tell) - Message object (when called from Rust actors or with Message.from_json) |
required |
Returns:
| Type | Description |
|---|---|
|
|
|
|
|
|
|
Example (Python-to-Python, simple objects): # Caller: result = await counter.ask({"action": "increment", "n": 10})
# Actor receive:
async def receive(self, msg):
if isinstance(msg, dict) and msg.get("action") == "increment":
self.value += msg["n"]
return {"value": self.value}
Example (Rust actor communication): async def receive(self, msg): if isinstance(msg, Message) and msg.msg_type == "Ping": return Message.from_json("Pong", {"count": 1}) return None
Source code in pulsing/core/__init__.py
ActorProxy(actor_ref, method_names=None, async_methods=None)
¶
Actor proxy.
Source code in pulsing/core/remote.py
PulsingActorError(message, actor_name=None, cause=None)
¶
Bases: PulsingError
User Actor execution errors.
This corresponds to pulsing_actor::error::ActorError in Rust.
These are errors raised by user code during Actor execution: - Business errors (user input errors) - System errors (internal errors from user code) - Timeout errors (operation timeouts) - Unsupported errors (unsupported operations)
Note: Framework-level errors like "Actor not found" are RuntimeError, not ActorError.
Source code in pulsing/exceptions.py
PulsingError
¶
Bases: Exception
Base exception for all Pulsing errors.
This corresponds to pulsing_actor::error::PulsingError in Rust.
PulsingRuntimeError(message, cause=None)
¶
Bases: PulsingError
Framework/system-level errors.
This corresponds to pulsing_actor::error::RuntimeError in Rust.
These are framework-level errors, not caused by user code: - Actor system errors (NotFound, Stopped, etc.) - Transport errors (ConnectionFailed, etc.) - Cluster errors (NodeNotFound, etc.) - Config errors (InvalidValue, etc.) - I/O errors - Serialization errors
Source code in pulsing/exceptions.py
PythonActorService(system)
¶
Bases: _ActorBase
Python Actor creation service - one per node, handles Python actor creation requests.
Note: Rust SystemActor (path "system/core") handles system-level operations, this service specifically handles Python actor creation.
Source code in pulsing/core/remote.py
SystemActorProxy(actor_ref)
¶
Proxy for SystemActor with direct method calls.
Example
system_proxy = await get_system_actor(system) actors = await system_proxy.list_actors() metrics = await system_proxy.get_metrics() await system_proxy.ping()
Source code in pulsing/core/remote.py
Functions¶
as_any(ref)
¶
Return an untyped proxy that forwards any method call to the remote actor.
Use when you have an ActorRef and want to call methods by name without the typed class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
ref
|
ActorRef
|
ActorRef from resolve(name). |
required |
Example
ref = await resolve("channel.discord") proxy = as_any(ref) # or proxy = ref.as_any() await proxy.send_text(chat_id, content)
Source code in pulsing/core/remote.py
ask_with_timeout(actor_ref, msg, timeout=DEFAULT_ASK_TIMEOUT)
async
¶
Send a message and wait for response with timeout.
This is a convenience wrapper around ActorRef.ask() that adds timeout support. When timeout occurs, the local task is cancelled. Note that this does NOT guarantee the remote handler will stop - it relies on HTTP/2 RST_STREAM propagation for stream cancellation.
For handlers that may run long, implement idempotent operations and/or check for stream closure in streaming scenarios.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
actor_ref
|
ActorRef
|
Target actor reference |
required |
msg
|
Any
|
Message to send (any Python object or Message) |
required |
timeout
|
float
|
Timeout in seconds (default: 30.0) |
DEFAULT_ASK_TIMEOUT
|
Returns:
| Type | Description |
|---|---|
Any
|
Response from the actor |
Raises:
| Type | Description |
|---|---|
TimeoutError
|
If timeout expires before response |
Exception
|
Any error from the actor |
Example
try: result = await ask_with_timeout(actor_ref, {"action": "compute"}, timeout=10.0) except asyncio.TimeoutError: print("Request timed out")
Source code in pulsing/core/__init__.py
get_system()
¶
Get the global actor system (must call init() first)
Source code in pulsing/core/__init__.py
get_system_actor(system, node_id=None)
async
¶
Get SystemActorProxy for direct method calls.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
system
|
ActorSystem
|
ActorSystem instance |
required |
node_id
|
int | None
|
Target node ID (None means local node) |
None
|
Returns:
| Type | Description |
|---|---|
SystemActorProxy
|
SystemActorProxy with methods: list_actors(), get_metrics(), etc. |
Example
sys = await get_system_actor(system) actors = await sys.list_actors() await sys.ping()
Source code in pulsing/core/remote.py
init(addr=None, *, seeds=None, passphrase=None, head_addr=None, is_head_node=False)
async
¶
Initialize Pulsing actor system
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
addr
|
str
|
Bind address (e.g., "0.0.0.0:8000"). None for standalone mode. |
None
|
seeds
|
list[str]
|
Seed nodes to join cluster (Gossip mode). |
None
|
passphrase
|
str
|
Enable TLS with this passphrase. |
None
|
head_addr
|
str
|
Address of head node (worker mode). Mutually exclusive with is_head_node. |
None
|
is_head_node
|
bool
|
If True, this node runs as head. Mutually exclusive with head_addr. |
False
|
Returns:
| Type | Description |
|---|---|
ActorSystem
|
ActorSystem instance |
Example
Standalone mode¶
await init()
Cluster mode (Gossip + seed)¶
await init(addr="0.0.0.0:8001", seeds=["192.168.1.1:8000"])
Head node¶
await init(addr="0.0.0.0:8000", is_head_node=True)
Worker node¶
await init(addr="0.0.0.0:8001", head_addr="192.168.1.1:8000")
Source code in pulsing/core/__init__.py
is_initialized()
¶
mount(instance, *, name, public=True)
¶
Mount an existing Python object to the Pulsing communication network.
Synchronous interface, can be called in __init__. Automatically:
1. Initialize Pulsing (if not already, auto-detects Ray environment)
2. Wrap instance as a Pulsing actor
3. Register to Pulsing network, other nodes can discover via pul.resolve(name)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
instance
|
Any
|
Object to mount (any Python instance) |
required |
name
|
str
|
Pulsing name, other nodes resolve via this name |
required |
public
|
bool
|
Whether discoverable by other cluster nodes (default True) |
True
|
Example::
@ray.remote
class Counter:
def __init__(self, name, peers):
self.name = name
self.peers = sorted(peers)
pul.mount(self, name=name)
async def greet(self, msg):
return f"Hello from {self.name}: {msg}"
Source code in pulsing/core/remote.py
resolve(name, *, node_id=None, timeout=None)
async
¶
Resolve a named actor by name.
Returns an ActorRef that supports .ask(), .tell(), .as_any(), and .as_type(). Use .as_any() to get an untyped proxy that forwards any method call. Use .as_type(Counter) to get a typed proxy with method validation.
For typed ActorProxy with method calls, use Counter.resolve(name) instead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Actor name |
required |
node_id
|
int | None
|
Target node ID, searches in cluster if not provided |
None
|
timeout
|
float | None
|
Seconds to wait for the name to appear (gossip convergence). None means no wait (error immediately if not found). |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
ActorRef |
Actor reference with .as_any() / .as_type() for proxy generation. |
Example
from pulsing.core import init, remote, resolve
await init()
By name only (no type needed)¶
ref = await resolve("channel.discord") proxy = ref.as_any() await proxy.send_text(chat_id, content)
Wait for name to appear (gossip convergence)¶
ref = await resolve("peer_node", timeout=30)
Low-level ask¶
ref = await resolve("my_counter") result = await ref.ask({"call": "increment", "args": [], "kwargs": {}})
Source code in pulsing/core/remote.py
shutdown()
async
¶
tell_with_timeout(actor_ref, msg, timeout=DEFAULT_ASK_TIMEOUT)
async
¶
Send a fire-and-forget message with timeout.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
actor_ref
|
ActorRef
|
Target actor reference |
required |
msg
|
Any
|
Message to send |
required |
timeout
|
float
|
Timeout in seconds (default: 30.0) |
DEFAULT_ASK_TIMEOUT
|
Raises:
| Type | Description |
|---|---|
TimeoutError
|
If timeout expires |
Source code in pulsing/core/__init__.py
unmount(name)
¶
Unmount a previously mounted actor from the Pulsing network.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Name used during mounting |
required |
Source code in pulsing/core/remote.py
Agent Module¶
Pulsing Agent Toolbox
Lightweight multi-agent development tools, fully compatible with pulsing.core.
Core APIs: - runtime(): Actor system lifecycle management - agent(): @remote with metadata (for visualization) - llm(): LLM client - parse_json(): JSON parsing
Example
from pulsing.core import remote, resolve from pulsing.agent import agent, runtime, llm, get_agent_meta
@remote: Basic Actor¶
@remote class Worker: async def work(self): ...
@agent: Actor with metadata (for visualization/debugging)¶
@agent(role="Researcher", goal="Deep analysis") class Researcher: async def analyze(self, topic: str) -> str: client = await llm() return await client.ainvoke(f"Analyze: {topic}")
async def main(): async with runtime(): r = await Researcher.spawn(name="researcher") result = await r.analyze("AI")
# Get metadata
meta = get_agent_meta("researcher")
print(meta.role) # "Researcher"
Classes¶
AgentMeta(role='', goal='', backstory='', tags=dict())
dataclass
¶
Agent metadata
Functions¶
agent(role='', goal='', backstory='', **tags)
¶
Agent decorator - equivalent to @remote, but with additional metadata
Metadata can be retrieved via get_agent_meta(name).
Example
@agent(role="Researcher", goal="Deep analysis") class Researcher: async def analyze(self, topic: str) -> str: ...
async with runtime(): r = await Researcher.spawn(name="researcher")
# Get metadata
meta = get_agent_meta("researcher")
print(meta.role) # "Researcher"
Source code in pulsing/agent/base.py
cleanup()
¶
Clean up all agent-related global state.
Includes: - Agent metadata registry - LLM singleton
Recommended to call when repeatedly creating/destroying runtime to avoid memory leaks.
Example
from pulsing.agent import runtime, cleanup
try: async with runtime(): agent = await MyAgent.spawn(name="agent") await agent.work() finally: cleanup() # Clean up all global state
Source code in pulsing/agent/__init__.py
clear_agent_registry()
¶
extract_field(content, field, fallback=None)
¶
Extract specified field from JSON output.
Example
response = '{"score": 8, "reason": "good"}' score = extract_field(response, "score", fallback=0) # 8
Source code in pulsing/agent/utils.py
get_agent_meta(name)
¶
list_agents()
¶
parse_json(content, fallback=None)
¶
Parse JSON from LLM output.
Supports:
- Pure JSON strings
- json ... code block wrapped
- Returns fallback on parse failure
Example
from pulsing.agent import parse_json
Pure JSON¶
data = parse_json('{"name": "test"}')
Code block wrapped¶
data = parse_json('json\n{"name": "test"}\n')
Parse failure¶
data = parse_json('invalid', fallback={}) # Returns {}
Source code in pulsing/agent/utils.py
Queue Module¶
Streaming - Queue (point-to-point) and Pub/Sub (topic) APIs
Queue
writer = await system.streaming.write("my_queue") # or system.queue reader = await system.streaming.read("my_queue")
Topic
writer = await system.topic.write("events") reader = await system.topic.read("events")
Classes¶
BucketStorage(bucket_id, storage_path, batch_size=100, backend='memory', backend_options=None)
¶
Storage Actor for a Single Bucket
Uses pluggable StorageBackend for data storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
bucket_id
|
int
|
Bucket ID |
required |
storage_path
|
str
|
Storage path |
required |
batch_size
|
int
|
Batch size |
100
|
backend
|
str | type
|
Backend name or backend class - "memory": Pure in-memory backend (default) - Custom name/class: Use register_backend() or pass class |
'memory'
|
backend_options
|
dict[str, Any] | None
|
Additional parameters passed to backend |
None
|
Source code in pulsing/streaming/storage.py
Functions¶
flush()
async
¶
get(limit=100, offset=0)
async
¶
Get records.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum number of records to return |
100
|
offset
|
int
|
Starting offset |
0
|
Returns:
| Type | Description |
|---|---|
list[dict]
|
List of records |
Source code in pulsing/streaming/storage.py
get_stream(limit=100, offset=0, wait=False, timeout=None)
async
¶
Get records as a stream.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
limit
|
int
|
Maximum number of records to return |
100
|
offset
|
int
|
Starting offset |
0
|
wait
|
bool
|
Whether to wait for new records |
False
|
timeout
|
float | None
|
Timeout in seconds |
None
|
Yields:
| Type | Description |
|---|---|
AsyncIterator[list[dict]]
|
Batches of records |
Source code in pulsing/streaming/storage.py
put(record)
async
¶
Put a single record.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
record
|
dict
|
Record to store |
required |
Returns:
| Type | Description |
|---|---|
dict
|
{"status": "ok"} |
Source code in pulsing/streaming/storage.py
put_batch(records)
async
¶
Put multiple records.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
records
|
list[dict]
|
List of records to store |
required |
Returns:
| Type | Description |
|---|---|
dict
|
{"status": "ok", "count": N} |
Source code in pulsing/streaming/storage.py
stats()
async
¶
MemoryBackend(bucket_id, **kwargs)
¶
Pure In-Memory Backend - Built-in Default Implementation
Features: - No persistence, data exists only in memory - Supports blocking wait for new data - Lightweight, suitable for testing and temporary data
For persistence, use a plugin that implements StorageBackend (e.g. register_backend).
Source code in pulsing/streaming/backend.py
PublishMode
¶
Bases: Enum
Publish mode
PublishResult(success, delivered, failed, subscriber_count, failed_subscribers=None)
dataclass
¶
Publish result
Queue(system, topic, bucket_column='id', num_buckets=4, batch_size=100, storage_path=None, backend='memory', backend_options=None)
¶
Distributed Queue - High-level API
Each bucket corresponds to an independent BucketStorage Actor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
system
|
ActorSystem
|
Actor system |
required |
topic
|
str
|
Queue topic |
required |
bucket_column
|
str
|
Bucketing column name |
'id'
|
num_buckets
|
int
|
Number of buckets |
4
|
batch_size
|
int
|
Batch size |
100
|
storage_path
|
str | None
|
Storage path |
None
|
backend
|
str | type
|
Storage backend - "memory": Pure in-memory backend (default) - Custom: register_backend() or class implementing StorageBackend |
'memory'
|
backend_options
|
dict[str, Any] | None
|
Additional backend parameters |
None
|
Source code in pulsing/streaming/queue.py
Functions¶
flush()
async
¶
Flush all bucket buffers
Source code in pulsing/streaming/queue.py
get(bucket_id=None, limit=100, offset=0, wait=False, timeout=None)
async
¶
Read data from queue (both in-memory and persistent data visible)
Source code in pulsing/streaming/queue.py
get_bucket_id(partition_value)
¶
put(record)
async
¶
Write data to queue
Source code in pulsing/streaming/queue.py
stats()
async
¶
Get queue statistics
Source code in pulsing/streaming/queue.py
sync()
¶
Return synchronous wrapper
Example
queue = Queue(system, topic="test") sync_queue = queue.sync() sync_queue.put({"id": "1", "value": 100}) # Synchronous write records = sync_queue.get(limit=10) # Synchronous read
Source code in pulsing/streaming/queue.py
QueueAPI(system)
¶
Queue API entry point via system.queue
Source code in pulsing/streaming/__init__.py
Functions¶
read(topic, *, bucket_id=None, bucket_ids=None, rank=None, world_size=None, num_buckets=4, storage_path=None, backend='memory', backend_options=None)
async
¶
Open queue for reading
Source code in pulsing/streaming/__init__.py
write(topic, *, bucket_column='id', num_buckets=4, batch_size=100, storage_path=None, backend='memory', backend_options=None)
async
¶
Open queue for writing
Source code in pulsing/streaming/__init__.py
QueueReader(queue, bucket_ids=None)
¶
Queue read handle
Supports three modes: 1. bucket_ids=None: Read from all buckets 2. bucket_ids=[0, 2]: Read from specified bucket list 3. Auto-assign buckets via rank/world_size (distributed consumption)
Source code in pulsing/streaming/queue.py
Functions¶
get(limit=100, wait=False, timeout=None)
async
¶
Read data from assigned buckets
Source code in pulsing/streaming/queue.py
reset()
¶
set_offset(offset, bucket_id=None)
¶
QueueWriter(queue)
¶
Queue write handle
Source code in pulsing/streaming/queue.py
StorageBackend
¶
Bases: Protocol
Storage Backend Protocol
All storage backends must implement this protocol. Can be implemented via inheritance or duck typing.
Functions¶
flush()
abstractmethod
async
¶
get(limit, offset)
abstractmethod
async
¶
get_data(batch_meta, fields=None)
async
¶
get_meta(fields, batch_size, task_name='default', sampler=None, **sampling_kwargs)
async
¶
Optional tensor-native metadata API.
get_stream(limit, offset, wait=False, timeout=None)
abstractmethod
async
¶
put(record)
abstractmethod
async
¶
put_batch(records)
abstractmethod
async
¶
put_tensor(data, **kwargs)
async
¶
stats()
abstractmethod
async
¶
StorageManager(system, base_storage_path='./queue_storage', default_backend='memory')
¶
Storage manager Actor
One instance per node, responsible for: 1. Receiving GetBucket/GetTopic requests 2. Determining if resource belongs to this node (consistent hashing) 3. If belongs to this node: create/return corresponding Actor 4. If not belongs to this node: return Redirect response pointing to correct node
Supported resource types: - Queue Bucket: GetBucket -> BucketStorage Actor - Topic Broker: GetTopic -> TopicBroker Actor
Source code in pulsing/streaming/manager.py
Functions¶
get_bucket(topic, bucket_id, batch_size=100, storage_path=None, backend=None, backend_options=None)
async
¶
Get bucket reference.
Returns:
| Type | Description |
|---|---|
dict
|
|
dict
|
|
Source code in pulsing/streaming/manager.py
get_stats()
async
¶
Get storage manager statistics.
Returns:
| Type | Description |
|---|---|
dict
|
{"node_id": ..., "bucket_count": ..., "topic_count": ..., "buckets": [...], "topics": [...]} |
Source code in pulsing/streaming/manager.py
get_topic(topic)
async
¶
Get topic broker reference.
Returns:
| Type | Description |
|---|---|
dict
|
|
dict
|
|
Source code in pulsing/streaming/manager.py
list_buckets()
async
¶
List all buckets managed by this node.
Returns:
| Type | Description |
|---|---|
list[dict]
|
List of {"topic": ..., "bucket_id": ...} |
Source code in pulsing/streaming/manager.py
list_topics()
async
¶
List all topics managed by this node.
Returns:
| Type | Description |
|---|---|
list[str]
|
List of topic names |
SyncQueue(queue)
¶
SyncQueueReader(reader)
¶
SyncQueueWriter(writer)
¶
TopicAPI(system)
¶
TopicReader(system, topic, reader_id=None)
¶
Topic read handle
Example
reader = await read_topic(system, "events")
@reader.on_message async def handle(msg): print(f"Received: {msg}")
await reader.start()
Source code in pulsing/streaming/pubsub.py
Functions¶
add_callback(callback)
¶
on_message(callback)
¶
Register message callback (decorator style)
Example
@reader.on_message async def handle(msg): print(f"Received: {msg}")
Source code in pulsing/streaming/pubsub.py
remove_callback(callback)
¶
start()
async
¶
Start receiving messages
Source code in pulsing/streaming/pubsub.py
stats()
async
¶
stop()
async
¶
Stop receiving messages
Source code in pulsing/streaming/pubsub.py
TopicWriter(system, topic, writer_id=None)
¶
Topic write handle
Source code in pulsing/streaming/pubsub.py
Functions¶
publish(message, mode=PublishMode.FIRE_AND_FORGET, timeout=None)
async
¶
Publish message
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message
|
Any
|
Message to publish (any Python object) |
required |
mode
|
PublishMode
|
Publish mode |
FIRE_AND_FORGET
|
timeout
|
float | None
|
Timeout in seconds. None means use default timeout. For WAIT_ANY_ACK and WAIT_ALL_ACKS modes, local task will be cancelled after timeout, but remote handler may still be executing (relies on HTTP/2 RST_STREAM to propagate cancellation). |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
PublishResult |
PublishResult
|
Publish result |
Raises:
| Type | Description |
|---|---|
TimeoutError
|
Timeout |
RuntimeError
|
Other errors |
Source code in pulsing/streaming/pubsub.py
Functions¶
get_backend_class(backend)
¶
Get backend class
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
backend
|
str | type
|
Backend name (str) or backend class (type) |
required |
Returns:
| Type | Description |
|---|---|
type
|
Backend class |
Source code in pulsing/streaming/backend.py
get_bucket_ref(system, topic, bucket_id, batch_size=100, storage_path=None, backend=None, backend_options=None, max_redirects=3)
async
¶
Get ActorProxy for specified bucket
Automatically handles redirects to ensure getting the bucket on the correct node. Returns ActorProxy for direct method calls on BucketStorage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
system
|
ActorSystem
|
Actor system |
required |
topic
|
str
|
Queue topic |
required |
bucket_id
|
int
|
Bucket ID |
required |
batch_size
|
int
|
Batch size |
100
|
storage_path
|
str | None
|
Custom storage path (optional) |
None
|
backend
|
str | type | None
|
Storage backend name or class (optional) |
None
|
backend_options
|
dict | None
|
Additional backend options (optional) |
None
|
max_redirects
|
int
|
Maximum redirect count |
3
|
Source code in pulsing/streaming/manager.py
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 | |
get_storage_manager(system)
async
¶
Get StorageManager proxy for this node, create if not exists.
Returns:
| Type | Description |
|---|---|
ActorProxy
|
ActorProxy for direct method calls on StorageManager |
Source code in pulsing/streaming/manager.py
get_topic_broker(system, topic, max_redirects=3)
async
¶
Get broker ActorProxy for specified topic
Automatically handles redirects to ensure getting the broker on the correct node. Returns ActorProxy for direct method calls on TopicBroker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
system
|
ActorSystem
|
Actor system |
required |
topic
|
str
|
Topic name |
required |
max_redirects
|
int
|
Maximum redirect count |
3
|
Source code in pulsing/streaming/manager.py
list_backends()
¶
read_queue(system, topic, bucket_id=None, bucket_ids=None, rank=None, world_size=None, num_buckets=4, storage_path=None, backend='memory', backend_options=None)
async
¶
Open queue for reading
Supports three modes: 1. Default: Read from all buckets 2. bucket_id/bucket_ids: Read from specified buckets 3. rank/world_size: Distributed consumption, auto-assign buckets
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
system
|
ActorSystem
|
Actor system |
required |
topic
|
str
|
Queue topic |
required |
bucket_id
|
int | None
|
Single bucket ID (mutually exclusive with bucket_ids) |
None
|
bucket_ids
|
list[int] | None
|
List of bucket IDs (mutually exclusive with bucket_id) |
None
|
rank
|
int | None
|
Current consumer's rank (0 to world_size-1) |
None
|
world_size
|
int | None
|
Total number of consumers |
None
|
num_buckets
|
int
|
Number of buckets (for rank/world_size mode) |
4
|
storage_path
|
str | None
|
Storage path |
None
|
backend
|
str | type
|
Storage backend (must match backend used for writing) |
'memory'
|
backend_options
|
dict[str, Any] | None
|
Additional backend parameters |
None
|
Example
Distributed consumption: 4 buckets, 2 consumers¶
reader0 = await read_queue(system, "q", rank=0, world_size=2) # bucket 0, 2 reader1 = await read_queue(system, "q", rank=1, world_size=2) # bucket 1, 3
Source code in pulsing/streaming/queue.py
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 | |
read_topic(system, topic, reader_id=None, auto_start=False)
async
¶
Open topic for reading
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
system
|
ActorSystem
|
Actor system |
required |
topic
|
str
|
Topic name |
required |
reader_id
|
str | None
|
Reader ID (optional) |
None
|
auto_start
|
bool
|
Whether to automatically start receiving |
False
|
Returns:
| Name | Type | Description |
|---|---|---|
TopicReader |
TopicReader
|
Read handle |
Example
reader = await read_topic(system, "events")
@reader.on_message async def handle(msg): print(f"Received: {msg}")
await reader.start()
Source code in pulsing/streaming/pubsub.py
register_backend(name, backend_class)
¶
Register a custom storage backend (e.g. from a plugin package).
Example
from my_plugin import MyBackend register_backend("my_backend", MyBackend) writer = await write_queue(system, "topic", backend="my_backend")
Source code in pulsing/streaming/backend.py
subscribe_to_topic(system, topic, subscriber_id, actor_name, node_id=None)
async
¶
Subscribe an actor to a topic.
This is a helper function for manually registering subscribers with a topic broker. For normal usage, prefer using TopicReader which handles this automatically.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
system
|
ActorSystem
|
ActorSystem instance |
required |
topic
|
str
|
Topic name |
required |
subscriber_id
|
str
|
Unique subscriber identifier |
required |
actor_name
|
str
|
Name of the actor to receive messages |
required |
node_id
|
int | None
|
Optional node ID (defaults to local node) |
None
|
Returns:
| Type | Description |
|---|---|
dict
|
Response dict from broker |
Raises:
| Type | Description |
|---|---|
RuntimeError
|
If subscription fails |
Source code in pulsing/streaming/pubsub.py
write_queue(system, topic, bucket_column='id', num_buckets=4, batch_size=100, storage_path=None, backend='memory', backend_options=None)
async
¶
Open queue for writing
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
system
|
ActorSystem
|
Actor system |
required |
topic
|
str
|
Queue topic |
required |
bucket_column
|
str
|
Bucketing column name |
'id'
|
num_buckets
|
int
|
Number of buckets |
4
|
batch_size
|
int
|
Batch size |
100
|
storage_path
|
str | None
|
Storage path |
None
|
backend
|
str | type
|
Storage backend - "memory": Pure in-memory backend (default) - Custom: register_backend() or pass StorageBackend class |
'memory'
|
backend_options
|
dict[str, Any] | None
|
Additional backend parameters |
None
|
Example
writer = await write_queue(system, "my_queue")
Custom backend from a plugin¶
from my_plugin import MyBackend from .backend import register_backend register_backend("my_backend", MyBackend) writer = await write_queue(system, "my_queue", backend="my_backend")
Source code in pulsing/streaming/queue.py
write_topic(system, topic, writer_id=None)
async
¶
Open topic for writing
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
system
|
ActorSystem
|
Actor system |
required |
topic
|
str
|
Topic name |
required |
writer_id
|
str | None
|
Writer ID (optional) |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
TopicWriter |
TopicWriter
|
Write handle |
Example
writer = await write_topic(system, "events") await writer.publish({"type": "user_login"})