Skip to content

Python API Reference

This page contains the complete auto-generated API documentation for Pulsing's Python interface.

Installation

Pulsing requires Python 3.10+ and can be installed via pip:

pip install pulsing

For development, clone the repository and install in development mode:

git clone https://github.com/DeepLink-org/pulsing
cd pulsing
pip install -e .

Core Module

Pulsing - Distributed Actor Framework

Usage

import pulsing as pul

await pul.init()

@pul.remote
class Counter:
    def __init__(self, init=0):
        self.value = init

    def incr(self):
        self.value += 1
        return self.value

counter = await Counter.spawn(name="counter")
result = await counter.incr()

await pul.shutdown()

Classes

Actor

Bases: ABC

Base class for Python actors. Implement receive to handle messages.

Python actors can receive and return arbitrary Python objects when communicating with other Python actors. The objects are automatically pickled and unpickled.

For communication with Rust actors, use Message.from_json() and msg.to_json().

Functions

metadata()

Return actor metadata for diagnostics

Source code in pulsing/core/__init__.py
def metadata(self) -> dict[str, str]:
    """Return actor metadata for diagnostics"""
    return {}
on_start(actor_id)

Called when actor starts. Override to handle actor startup.

Source code in pulsing/core/__init__.py
def on_start(self, actor_id: ActorId) -> None:  # noqa: B027
    """Called when actor starts. Override to handle actor startup."""
    pass
on_stop()

Called when actor stops. Override to handle actor cleanup.

Source code in pulsing/core/__init__.py
def on_stop(self) -> None:  # noqa: B027
    """Called when actor stops. Override to handle actor cleanup."""
    pass
receive(msg) abstractmethod async

Handle incoming message

Parameters:

Name Type Description Default
msg

Incoming message. Can be:
  • Any Python object (when called from Python actors with ask/tell)
  • Message object (when called from Rust actors or with Message.from_json)

required

Returns:

Type Description
  • Any Python object: automatically pickled for Python-to-Python communication
  • Message.from_json("Type", {...}): JSON response for Rust actor communication
  • StreamMessage.create(...): Streaming response
  • None: No response

Example (Python-to-Python, simple objects):

# Caller:
result = await counter.ask({"action": "increment", "n": 10})

# Actor receive:
async def receive(self, msg):
    if isinstance(msg, dict) and msg.get("action") == "increment":
        self.value += msg["n"]
        return {"value": self.value}

Example (Rust actor communication):

async def receive(self, msg):
    if isinstance(msg, Message) and msg.msg_type == "Ping":
        return Message.from_json("Pong", {"count": 1})
    return None

Source code in pulsing/core/__init__.py
@abstractmethod
async def receive(self, msg):
    """
    Handle incoming message

    Args:
        msg: Incoming message. Can be:
             - Any Python object (when called from Python actors with ask/tell)
             - Message object (when called from Rust actors or with Message.from_json)

    Returns:
        - Any Python object: automatically pickled for Python-to-Python communication
        - Message.from_json("Type", {...}): JSON response for Rust actor communication
        - StreamMessage.create(...): Streaming response
        - None: No response

    Example (Python-to-Python, simple objects):
        # Caller:
        result = await counter.ask({"action": "increment", "n": 10})

        # Actor receive:
        async def receive(self, msg):
            if isinstance(msg, dict) and msg.get("action") == "increment":
                self.value += msg["n"]
                return {"value": self.value}

    Example (Rust actor communication):
        async def receive(self, msg):
            if isinstance(msg, Message) and msg.msg_type == "Ping":
                return Message.from_json("Pong", {"count": 1})
            return None
    """
    pass

ActorProxy(actor_ref, method_names=None, async_methods=None)

Actor proxy.

Source code in pulsing/core/remote.py
def __init__(
    self,
    actor_ref: ActorRef,
    method_names: list[str] | None = None,
    async_methods: set[str] | None = None,
):
    self._ref = actor_ref
    self._method_names = set(method_names) if method_names else None
    # None means "any proxy": allow any method, treat all as async (streaming support)
    self._async_methods = async_methods

Attributes

ref property

Get underlying ActorRef.

Functions

as_any()

Return an untyped proxy that forwards any method call to the remote actor.

Source code in pulsing/core/remote.py
def as_any(self) -> "ActorProxy":
    """Return an untyped proxy that forwards any method call to the remote actor."""
    return ActorProxy(self._ref, method_names=None, async_methods=None)
from_ref(actor_ref, methods=None, async_methods=None) classmethod

Create ActorProxy from ActorRef.

Source code in pulsing/core/remote.py
@classmethod
def from_ref(
    cls,
    actor_ref: ActorRef,
    methods: list[str] | None = None,
    async_methods: set[str] | None = None,
) -> "ActorProxy":
    """Create ActorProxy from ActorRef."""
    return cls(actor_ref, methods, async_methods)

ActorSystem(inner)

ActorSystem wrapper with queue/topic API

This wraps the Rust ActorSystem and adds Python-level extensions like queue and topic APIs.

Source code in pulsing/__init__.py
def __init__(self, inner: _ActorSystem):
    self._inner = inner
    from pulsing.streaming import QueueAPI, TopicAPI

    self.queue = QueueAPI(inner)
    self.topic = TopicAPI(inner)

Functions

refer(actorid) async

Get actor reference by ID

Parameters:

Name Type Description Default
actorid ActorId | str

Actor ID (ActorId instance or string in format "node_id:local_id")

required

Returns:

Type Description
ActorRef

ActorRef to the actor

Source code in pulsing/__init__.py
async def refer(self, actorid: ActorId | str) -> ActorRef:
    """Get actor reference by ID

    Args:
        actorid: Actor ID (ActorId instance or string in format "node_id:local_id")

    Returns:
        ActorRef to the actor
    """
    if isinstance(actorid, str):
        actorid = ActorId.from_str(actorid)
    return await self._inner.refer(actorid)

PulsingActorError(message, actor_name=None, cause=None)

Bases: PulsingError

User Actor execution errors.

This corresponds to pulsing_actor::error::ActorError in Rust.

These are errors raised by user code during Actor execution:
  • Business errors (user input errors)
  • System errors (internal errors from user code)
  • Timeout errors (operation timeouts)
  • Unsupported errors (unsupported operations)

Note: Framework-level errors like "Actor not found" are RuntimeError, not ActorError.

Source code in pulsing/exceptions.py
def __init__(
    self,
    message: str,
    actor_name: str | None = None,
    cause: Exception | None = None,
):
    super().__init__(message)
    self.actor_name = actor_name
    self.cause = cause

PulsingBusinessError(code, message, details=None)

Bases: PulsingActorError

Business error: User input error, business logic error.

These errors are recoverable and should be returned to the caller. Automatically converted to ActorError::Business in Rust.

Example

@remote
class UserActor:
    async def validate_age(self, age: int) -> bool:
        if age < 18:
            raise PulsingBusinessError(400, "Age must be >= 18", details="User validation failed")
        return True

Source code in pulsing/exceptions.py
def __init__(self, code: int, message: str, details: str | None = None):
    self.code = code
    self.message = message
    self.details = details
    super().__init__(f"[{code}] {message}", cause=None)

PulsingError

Bases: Exception

Base exception for all Pulsing errors.

This corresponds to pulsing_actor::error::PulsingError in Rust.

PulsingRuntimeError(message, cause=None)

Bases: PulsingError

Framework/system-level errors.

This corresponds to pulsing_actor::error::RuntimeError in Rust.

These are framework-level errors, not caused by user code:
  • Actor system errors (NotFound, Stopped, etc.)
  • Transport errors (ConnectionFailed, etc.)
  • Cluster errors (NodeNotFound, etc.)
  • Config errors (InvalidValue, etc.)
  • I/O errors
  • Serialization errors

Source code in pulsing/exceptions.py
def __init__(self, message: str, cause: Exception | None = None):
    super().__init__(message)
    self.cause = cause

PulsingSystemError(error, recoverable=True)

Bases: PulsingActorError

System error: Internal error, resource error.

May trigger Actor restart depending on recoverable flag. Automatically converted to ActorError::System in Rust.

Example

@remote
class DataProcessor:
    async def process(self, data: str) -> str:
        try:
            return process_data(data)
        except Exception as e:
            raise PulsingSystemError(f"Processing failed: {e}", recoverable=True)

Source code in pulsing/exceptions.py
def __init__(self, error: str, recoverable: bool = True):
    self.error = error
    self.recoverable = recoverable
    super().__init__(error, cause=None)

PulsingTimeoutError(operation, duration_ms=0)

Bases: PulsingActorError

Timeout error: Operation timed out.

Usually recoverable, can be retried. Automatically converted to ActorError::Timeout in Rust.

Example

@remote
class NetworkActor:
    async def fetch(self, url: str) -> str:
        try:
            return await asyncio.wait_for(httpx.get(url), timeout=5.0)
        except asyncio.TimeoutError:
            raise PulsingTimeoutError("fetch", duration_ms=5000)

Source code in pulsing/exceptions.py
def __init__(self, operation: str, duration_ms: int = 0):
    self.operation = operation
    self.duration_ms = duration_ms
    super().__init__(
        f"Operation '{operation}' timed out after {duration_ms}ms", cause=None
    )

PulsingUnsupportedError(operation)

Bases: PulsingActorError

Unsupported operation error.

Not recoverable. Indicates that the requested operation is not supported. Automatically converted to ActorError::Unsupported in Rust.

Example

@remote
class LegacyActor:
    async def process(self, data: str) -> str:
        if data.startswith("legacy:"):
            raise PulsingUnsupportedError("process")
        return process_data(data)

Source code in pulsing/exceptions.py
def __init__(self, operation: str):
    self.operation = operation
    super().__init__(f"Unsupported operation: {operation}", cause=None)

Functions

actor_system(addr=None, *, seeds=None, passphrase=None) async

Create a new ActorSystem (does not set global system)

This is the Actor System style API for explicit system management. Use this when you need multiple systems or want explicit control.

Parameters:

Name Type Description Default
addr str | None

Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.

None
seeds list[str] | None

Seed nodes to join cluster

None
passphrase str | None

Enable TLS with this passphrase

None

Returns:

Type Description
ActorSystem

ActorSystem instance with .queue API

Example

import pulsing as pul

Standalone mode

system = await pul.actor_system()

Cluster mode

system = await pul.actor_system(addr="0.0.0.0:8000")

Join existing cluster

system = await pul.actor_system(
    addr="0.0.0.0:8001",
    seeds=["192.168.1.1:8000"]
)

With TLS

system = await pul.actor_system(
    addr="0.0.0.0:8000",
    passphrase="my-secret"
)

Queue API

writer = await system.queue.write("my_topic")
reader = await system.queue.read("my_topic")

Source code in pulsing/__init__.py
async def actor_system(
    addr: str | None = None,
    *,
    seeds: list[str] | None = None,
    passphrase: str | None = None,
) -> ActorSystem:
    """Create a new ActorSystem (does not set global system)

    This is the Actor System style API for explicit system management.
    Use this when you need multiple systems or want explicit control.

    Args:
        addr: Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.
        seeds: Seed nodes to join cluster
        passphrase: Enable TLS with this passphrase

    Returns:
        ActorSystem instance with .queue API

    Example:
        import pulsing as pul

        # Standalone mode
        system = await pul.actor_system()

        # Cluster mode
        system = await pul.actor_system(addr="0.0.0.0:8000")

        # Join existing cluster
        system = await pul.actor_system(
            addr="0.0.0.0:8001",
            seeds=["192.168.1.1:8000"]
        )

        # With TLS
        system = await pul.actor_system(
            addr="0.0.0.0:8000",
            passphrase="my-secret"
        )

        # Queue API
        writer = await system.queue.write("my_topic")
        reader = await system.queue.read("my_topic")
    """
    # Build config
    if addr:
        config = SystemConfig.with_addr(addr)
    else:
        config = SystemConfig.standalone()

    if seeds:
        config = config.with_seeds(seeds)

    if passphrase:
        config = config.with_passphrase(passphrase)

    loop = asyncio.get_running_loop()
    inner = await _ActorSystem.create(config, loop)

    # Wrap with Python ActorSystem
    system = ActorSystem(inner)

    # Automatically register PythonActorService (for remote actor creation)
    service = PythonActorService(inner)
    await inner.spawn(service, name=PYTHON_ACTOR_SERVICE_NAME, public=True)

    return system

as_any(ref)

Return an untyped proxy that forwards any method call to the remote actor.

Use when you have an ActorRef and want to call methods by name without the typed class.

Parameters:

Name Type Description Default
ref ActorRef

ActorRef from resolve(name).

required
Example

ref = await resolve("channel.discord")
proxy = as_any(ref)  # or proxy = ref.as_any()
await proxy.send_text(chat_id, content)

Source code in pulsing/core/remote.py
def as_any(ref: ActorRef) -> ActorProxy:
    """Return an untyped proxy that forwards any method call to the remote actor.

    Use when you have an ActorRef and want to call methods by name
    without the typed class.

    Args:
        ref: ActorRef from resolve(name).

    Example:
        ref = await resolve("channel.discord")
        proxy = as_any(ref)  # or proxy = ref.as_any()
        await proxy.send_text(chat_id, content)
    """
    return ref.as_any()

cleanup_ray()

Clean up Pulsing state in Ray KV store

Source code in pulsing/__init__.py
def cleanup_ray():
    """Clean up Pulsing state in Ray KV store"""
    from pulsing.integrations.ray import cleanup

    return cleanup()

get_system()

Get the global actor system (must call init() first)

Source code in pulsing/core/__init__.py
def get_system() -> ActorSystem:
    """Get the global actor system (must call init() first)"""
    if _global_system is None:
        from pulsing.exceptions import PulsingRuntimeError

        raise PulsingRuntimeError(
            "Actor system not initialized. Call 'await init()' first."
        )
    return _global_system

init(addr=None, *, seeds=None, passphrase=None, head_addr=None, is_head_node=False) async

Initialize Pulsing actor system

Parameters:

Name Type Description Default
addr str

Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.

None
seeds list[str]

Seed nodes to join cluster (Gossip mode).

None
passphrase str

Enable TLS with this passphrase.

None
head_addr str

Address of head node (worker mode). Mutually exclusive with is_head_node.

None
is_head_node bool

If True, this node runs as head. Mutually exclusive with head_addr.

False

Returns:

Type Description
ActorSystem

ActorSystem instance

Example

Standalone mode

await init()

Cluster mode (Gossip + seed)

await init(addr="0.0.0.0:8001", seeds=["192.168.1.1:8000"])

Head node

await init(addr="0.0.0.0:8000", is_head_node=True)

Worker node

await init(addr="0.0.0.0:8001", head_addr="192.168.1.1:8000")

Source code in pulsing/core/__init__.py
async def init(
    addr: str = None,
    *,
    seeds: list[str] = None,
    passphrase: str = None,
    head_addr: str = None,
    is_head_node: bool = False,
) -> ActorSystem:
    """Initialize Pulsing actor system

    Args:
        addr: Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.
        seeds: Seed nodes to join cluster (Gossip mode).
        passphrase: Enable TLS with this passphrase.
        head_addr: Address of head node (worker mode). Mutually exclusive with is_head_node.
        is_head_node: If True, this node runs as head. Mutually exclusive with head_addr.

    Returns:
        ActorSystem instance

    Example:
        # Standalone mode
        await init()

        # Cluster mode (Gossip + seed)
        await init(addr="0.0.0.0:8001", seeds=["192.168.1.1:8000"])

        # Head node
        await init(addr="0.0.0.0:8000", is_head_node=True)

        # Worker node
        await init(addr="0.0.0.0:8001", head_addr="192.168.1.1:8000")
    """
    global _global_system

    if _global_system is not None:
        return _global_system

    if is_head_node and head_addr:
        raise ValueError("Cannot set both is_head_node and head_addr")

    # Build config
    if addr:
        config = SystemConfig.with_addr(addr)
    else:
        config = SystemConfig.standalone()

    if seeds:
        config = config.with_seeds(seeds)
    if is_head_node:
        config = config.with_head_node()
    elif head_addr:
        config = config.with_head_addr(head_addr)

    if passphrase:
        config = config.with_passphrase(passphrase)

    loop = asyncio.get_running_loop()
    _global_system = await ActorSystem.create(config, loop)
    # Automatically register PythonActorService for remote actor creation
    from .remote import PYTHON_ACTOR_SERVICE_NAME, PythonActorService

    service = PythonActorService(_global_system)
    await _global_system.spawn(service, name=PYTHON_ACTOR_SERVICE_NAME, public=True)
    return _global_system

init_inside_ray()

Initialize Pulsing in Ray worker and join cluster (async version).

Usage::

await pul.init_inside_ray()
Source code in pulsing/__init__.py
def init_inside_ray():
    """Initialize Pulsing in Ray worker and join cluster (async version).

    Usage::

        await pul.init_inside_ray()
    """
    from pulsing.integrations.ray import async_init_in_ray

    return async_init_in_ray()

init_inside_torchrun()

Initialize Pulsing in current process and join cluster via torch.distributed.

Rank 0 becomes the seed; others join with seeds=[rank0_addr]. Call after torch.distributed.init_process_group() (e.g. when launched with torchrun).

Usage::

import torch.distributed as dist
dist.init_process_group(...)
system = pul.init_inside_torchrun()
Source code in pulsing/__init__.py
def init_inside_torchrun():
    """Initialize Pulsing in current process and join cluster via torch.distributed.

    Rank 0 becomes the seed; others join with seeds=[rank0_addr]. Call after
    torch.distributed.init_process_group() (e.g. when launched with torchrun).

    Usage::

        import torch.distributed as dist
        dist.init_process_group(...)
        system = pul.init_inside_torchrun()
    """
    from pulsing.integrations.torchrun import init_in_torchrun

    return init_in_torchrun()

is_initialized()

Check if the global actor system is initialized

Source code in pulsing/core/__init__.py
def is_initialized() -> bool:
    """Check if the global actor system is initialized"""
    return _global_system is not None

mount(instance, *, name, public=True)

Mount an existing Python object to the Pulsing communication network.

Synchronous interface, can be called in __init__. Automatically:
  1. Initialize Pulsing (if not already, auto-detects Ray environment)
  2. Wrap instance as a Pulsing actor
  3. Register to Pulsing network, other nodes can discover via pul.resolve(name)

Parameters:

Name Type Description Default
instance Any

Object to mount (any Python instance)

required
name str

Pulsing name, other nodes resolve via this name

required
public bool

Whether discoverable by other cluster nodes (default True)

True

Example::

@ray.remote
class Counter:
    def __init__(self, name, peers):
        self.name = name
        self.peers = sorted(peers)
        pul.mount(self, name=name)

    async def greet(self, msg):
        return f"Hello from {self.name}: {msg}"
Source code in pulsing/core/remote.py
def mount(instance: Any, *, name: str, public: bool = True) -> None:
    """Mount an existing Python object to the Pulsing communication network.

    Synchronous interface, can be called in ``__init__``. Automatically:
      1. Initialize Pulsing (if not already, auto-detects Ray environment)
      2. Wrap instance as a Pulsing actor
      3. Register to Pulsing network, other nodes can discover via ``pul.resolve(name)``

    Args:
        instance: Object to mount (any Python instance)
        name: Pulsing name, other nodes resolve via this name
        public: Whether discoverable by other cluster nodes (default True)

    Example::

        @ray.remote
        class Counter:
            def __init__(self, name, peers):
                self.name = name
                self.peers = sorted(peers)
                pul.mount(self, name=name)

            async def greet(self, msg):
                return f"Hello from {self.name}: {msg}"
    """
    from . import _global_system

    # Auto-initialize Pulsing
    if _global_system is None:
        _auto_init_pulsing()

    from . import _global_system as system

    if system is None:
        raise RuntimeError(
            "Pulsing initialization failed. Please call pul.init() or run in Ray environment."
        )

    actor_name = name if "/" in name else f"actors/{name}"
    wrapped = _WrappedActor(instance)

    async def _do_mount():
        ref = await system.spawn(wrapped, name=actor_name, public=public)
        return ref

    actor_ref = _run_sync_on_pulsing_loop(_do_mount())
    wrapped._inject_delayed(actor_ref)
    _register_actor_metadata(actor_name, type(instance))

refer(actorid) async

Get actor reference by ID using global system

Parameters:

Name Type Description Default
actorid ActorId | str

Actor ID (ActorId instance or string)

required

Returns:

Type Description
ActorRef

ActorRef to the actor

Source code in pulsing/__init__.py
async def refer(actorid: ActorId | str) -> ActorRef:
    """Get actor reference by ID using global system

    Args:
        actorid: Actor ID (ActorId instance or string)

    Returns:
        ActorRef to the actor
    """
    system = get_system()
    if isinstance(actorid, str):
        # Parse string to ActorId
        actorid = ActorId.from_str(actorid)
    if isinstance(actorid, int):
        actorid = ActorId(actorid)
    return await system.refer(actorid)

resolve(name, *, node_id=None, timeout=None) async

Resolve a named actor by name.

Returns an ActorRef that supports .ask(), .tell(), .as_any(), and .as_type(). Use .as_any() to get an untyped proxy that forwards any method call. Use .as_type(Counter) to get a typed proxy with method validation.

For typed ActorProxy with method calls, use Counter.resolve(name) instead.

Parameters:

Name Type Description Default
name str

Actor name

required
node_id int | None

Target node ID, searches in cluster if not provided

None
timeout float | None

Seconds to wait for the name to appear (gossip convergence). None means no wait (error immediately if not found).

None

Returns:

Name Type Description
ActorRef

Actor reference with .as_any() / .as_type() for proxy generation.

Example

from pulsing.core import init, remote, resolve

await init()

By name only (no type needed)

ref = await resolve("channel.discord")
proxy = ref.as_any()
await proxy.send_text(chat_id, content)

Wait for name to appear (gossip convergence)

ref = await resolve("peer_node", timeout=30)

Low-level ask

ref = await resolve("my_counter")
result = await ref.ask({"__call__": "increment", "args": [], "kwargs": {}})

Source code in pulsing/core/remote.py
async def resolve(
    name: str,
    *,
    node_id: int | None = None,
    timeout: float | None = None,
):
    """Resolve a named actor by name.

    Returns an ActorRef that supports .ask(), .tell(), .as_any(), and .as_type().
    Use .as_any() to get an untyped proxy that forwards any method call.
    Use .as_type(Counter) to get a typed proxy with method validation.

    For typed ActorProxy with method calls, use Counter.resolve(name) instead.

    Args:
        name: Actor name
        node_id: Target node ID, searches in cluster if not provided
        timeout: Seconds to wait for the name to appear (gossip convergence).
                 None means no wait (error immediately if not found).

    Returns:
        ActorRef: Actor reference with .as_any() / .as_type() for proxy generation.

    Example:
        from pulsing.core import init, remote, resolve

        await init()

        # By name only (no type needed)
        ref = await resolve("channel.discord")
        proxy = ref.as_any()
        await proxy.send_text(chat_id, content)

        # Wait for name to appear (gossip convergence)
        ref = await resolve("peer_node", timeout=30)

        # Low-level ask
        ref = await resolve("my_counter")
        result = await ref.ask({"__call__": "increment", "args": [], "kwargs": {}})
    """
    from . import _global_system

    if _global_system is None:
        raise RuntimeError("Actor system not initialized. Call 'await init()' first.")

    try:
        return await _global_system.resolve(name, node_id=node_id, timeout=timeout)
    except RuntimeError as e:
        raise _convert_rust_error(e) from e

shutdown() async

Shutdown the global actor system

Source code in pulsing/core/__init__.py
async def shutdown() -> None:
    """Shutdown the global actor system"""
    global _global_system

    if _global_system is not None:
        await _global_system.shutdown()
        _global_system = None

spawn(actor, *, name=None, public=False, restart_policy='never', max_restarts=3, min_backoff=0.1, max_backoff=30.0) async

Spawn an actor using the global system

Parameters:

Name Type Description Default
actor Any

Actor instance

required
name str | None

Actor name (auto-generated if None)

None
public bool

Whether to register as public named actor

False
restart_policy str

Restart policy ("never", "always", "on-failure")

'never'
max_restarts int

Maximum restart attempts

3
min_backoff float

Minimum backoff seconds

0.1
max_backoff float

Maximum backoff seconds

30.0

Returns:

Type Description
ActorRef

ActorRef to the spawned actor

Example

import pulsing as pul

await pul.init()

class MyActor:
    async def receive(self, msg):
        return f"Got: {msg}"

ref = await pul.spawn(MyActor(), name="my_actor")

Source code in pulsing/__init__.py
async def spawn(
    actor: Any,
    *,
    name: str | None = None,
    public: bool = False,
    restart_policy: str = "never",
    max_restarts: int = 3,
    min_backoff: float = 0.1,
    max_backoff: float = 30.0,
) -> ActorRef:
    """Spawn an actor using the global system

    Args:
        actor: Actor instance
        name: Actor name (auto-generated if None)
        public: Whether to register as public named actor
        restart_policy: Restart policy ("never", "always", "on-failure")
        max_restarts: Maximum restart attempts
        min_backoff: Minimum backoff seconds
        max_backoff: Maximum backoff seconds

    Returns:
        ActorRef to the spawned actor

    Example:
        import pulsing as pul

        await pul.init()

        class MyActor:
            async def receive(self, msg):
                return f"Got: {msg}"

        ref = await pul.spawn(MyActor(), name="my_actor")
    """
    system = get_system()
    return await system.spawn(
        actor,
        name=name,
        public=public,
        restart_policy=restart_policy,
        max_restarts=max_restarts,
        min_backoff=min_backoff,
        max_backoff=max_backoff,
    )

unmount(name)

Unmount a previously mounted actor from the Pulsing network.

Parameters:

Name Type Description Default
name str

Name used during mounting

required
Source code in pulsing/core/remote.py
def unmount(name: str) -> None:
    """Unmount a previously mounted actor from the Pulsing network.

    Args:
        name: Name used during mounting
    """
    from . import _global_system

    if _global_system is None:
        return

    actor_name = name if "/" in name else f"actors/{name}"

    async def _do_unmount():
        await _global_system.stop(actor_name)

    _run_sync_on_pulsing_loop(_do_unmount())

Actor Module

Pulsing Core - Python bindings for distributed actor framework

Simple API

from pulsing.core import init, shutdown, remote

await init()

@remote
class Counter:
    def __init__(self, init=0):
        self.value = init

    def incr(self):
        self.value += 1
        return self.value

counter = await Counter.spawn(init=10)
result = await counter.incr()

await shutdown()

Advanced API

from pulsing.core import ActorSystem, Actor, Message, SystemConfig

Classes

Actor

Bases: ABC

Base class for Python actors. Implement receive to handle messages.

Python actors can receive and return arbitrary Python objects when communicating with other Python actors. The objects are automatically pickled and unpickled.

For communication with Rust actors, use Message.from_json() and msg.to_json().

Functions

metadata()

Return actor metadata for diagnostics

Source code in pulsing/core/__init__.py
def metadata(self) -> dict[str, str]:
    """Return actor metadata for diagnostics"""
    return {}
on_start(actor_id)

Called when actor starts. Override to handle actor startup.

Source code in pulsing/core/__init__.py
def on_start(self, actor_id: ActorId) -> None:  # noqa: B027
    """Called when actor starts. Override to handle actor startup."""
    pass
on_stop()

Called when actor stops. Override to handle actor cleanup.

Source code in pulsing/core/__init__.py
def on_stop(self) -> None:  # noqa: B027
    """Called when actor stops. Override to handle actor cleanup."""
    pass
receive(msg) abstractmethod async

Handle incoming message

Parameters:

Name Type Description Default
msg

Incoming message. Can be:
  • Any Python object (when called from Python actors with ask/tell)
  • Message object (when called from Rust actors or with Message.from_json)

required

Returns:

Type Description
  • Any Python object: automatically pickled for Python-to-Python communication
  • Message.from_json("Type", {...}): JSON response for Rust actor communication
  • StreamMessage.create(...): Streaming response
  • None: No response

Example (Python-to-Python, simple objects):

# Caller:
result = await counter.ask({"action": "increment", "n": 10})

# Actor receive:
async def receive(self, msg):
    if isinstance(msg, dict) and msg.get("action") == "increment":
        self.value += msg["n"]
        return {"value": self.value}

Example (Rust actor communication):

async def receive(self, msg):
    if isinstance(msg, Message) and msg.msg_type == "Ping":
        return Message.from_json("Pong", {"count": 1})
    return None

Source code in pulsing/core/__init__.py
@abstractmethod
async def receive(self, msg):
    """
    Handle incoming message

    Args:
        msg: Incoming message. Can be:
             - Any Python object (when called from Python actors with ask/tell)
             - Message object (when called from Rust actors or with Message.from_json)

    Returns:
        - Any Python object: automatically pickled for Python-to-Python communication
        - Message.from_json("Type", {...}): JSON response for Rust actor communication
        - StreamMessage.create(...): Streaming response
        - None: No response

    Example (Python-to-Python, simple objects):
        # Caller:
        result = await counter.ask({"action": "increment", "n": 10})

        # Actor receive:
        async def receive(self, msg):
            if isinstance(msg, dict) and msg.get("action") == "increment":
                self.value += msg["n"]
                return {"value": self.value}

    Example (Rust actor communication):
        async def receive(self, msg):
            if isinstance(msg, Message) and msg.msg_type == "Ping":
                return Message.from_json("Pong", {"count": 1})
            return None
    """
    pass

ActorProxy(actor_ref, method_names=None, async_methods=None)

Actor proxy.

Source code in pulsing/core/remote.py
def __init__(
    self,
    actor_ref: ActorRef,
    method_names: list[str] | None = None,
    async_methods: set[str] | None = None,
):
    """Initialize the proxy around an ActorRef.

    Args:
        actor_ref: Underlying actor reference calls are forwarded to.
        method_names: Allowed method names; None allows any method.
        async_methods: Methods to treat as async; None treats all as async.
    """
    self._ref = actor_ref
    # Store as a set for O(1) membership checks; None stays the "unrestricted" sentinel.
    self._method_names = set(method_names) if method_names else None
    # None means "any proxy": allow any method, treat all as async (streaming support)
    self._async_methods = async_methods

Attributes

ref property

Get underlying ActorRef.

Functions

as_any()

Return an untyped proxy that forwards any method call to the remote actor.

Source code in pulsing/core/remote.py
def as_any(self) -> "ActorProxy":
    """Build an unrestricted proxy: every method name is forwarded to the remote actor."""
    untyped = ActorProxy(self._ref, method_names=None, async_methods=None)
    return untyped
from_ref(actor_ref, methods=None, async_methods=None) classmethod

Create ActorProxy from ActorRef.

Source code in pulsing/core/remote.py
@classmethod
def from_ref(
    cls,
    actor_ref: ActorRef,
    methods: list[str] | None = None,
    async_methods: set[str] | None = None,
) -> "ActorProxy":
    """Alternate constructor: build a proxy from an existing ActorRef."""
    proxy = cls(actor_ref, methods, async_methods)
    return proxy

PulsingActorError(message, actor_name=None, cause=None)

Bases: PulsingError

User Actor execution errors.

This corresponds to pulsing_actor::error::ActorError in Rust.

These are errors raised by user code during Actor execution: - Business errors (user input errors) - System errors (internal errors from user code) - Timeout errors (operation timeouts) - Unsupported errors (unsupported operations)

Note: Framework-level errors like "Actor not found" are RuntimeError, not ActorError.

Source code in pulsing/exceptions.py
def __init__(
    self,
    message: str,
    actor_name: str | None = None,
    cause: Exception | None = None,
):
    """Create an actor-execution error.

    Args:
        message: Human-readable error description.
        actor_name: Name of the actor that raised, if known.
        cause: Original exception that triggered this error, if any.
    """
    super().__init__(message)
    self.actor_name = actor_name
    self.cause = cause

PulsingError

Bases: Exception

Base exception for all Pulsing errors.

This corresponds to pulsing_actor::error::PulsingError in Rust.

PulsingRuntimeError(message, cause=None)

Bases: PulsingError

Framework/system-level errors.

This corresponds to pulsing_actor::error::RuntimeError in Rust.

These are framework-level errors, not caused by user code: - Actor system errors (NotFound, Stopped, etc.) - Transport errors (ConnectionFailed, etc.) - Cluster errors (NodeNotFound, etc.) - Config errors (InvalidValue, etc.) - I/O errors - Serialization errors

Source code in pulsing/exceptions.py
def __init__(self, message: str, cause: Exception | None = None):
    """Create a framework-level error.

    Args:
        message: Human-readable error description.
        cause: Original exception that triggered this error, if any.
    """
    super().__init__(message)
    self.cause = cause

PythonActorService(system)

Bases: _ActorBase

Python Actor creation service - one per node, handles Python actor creation requests.

Note: Rust SystemActor (path "system/core") handles system-level operations, this service specifically handles Python actor creation.

Source code in pulsing/core/remote.py
def __init__(self, system: ActorSystem):
    """Keep a handle to the ActorSystem so creation requests can spawn actors on it."""
    self.system = system

SystemActorProxy(actor_ref)

Proxy for SystemActor with direct method calls.

Example

system_proxy = await get_system_actor(system) actors = await system_proxy.list_actors() metrics = await system_proxy.get_metrics() await system_proxy.ping()

Source code in pulsing/core/remote.py
def __init__(self, actor_ref: ActorRef):
    """Wrap the node's SystemActor reference; all proxy methods go through it."""
    self._ref = actor_ref

Attributes

ref property

Get underlying ActorRef.

Functions

get_metrics() async

Get system metrics.

Source code in pulsing/core/remote.py
async def get_metrics(self) -> dict:
    """Fetch system metrics from the SystemActor."""
    response = await self._ask("GetMetrics")
    return response
get_node_info() async

Get node info.

Source code in pulsing/core/remote.py
async def get_node_info(self) -> dict:
    """Fetch node information from the SystemActor."""
    response = await self._ask("GetNodeInfo")
    return response
health_check() async

Health check.

Source code in pulsing/core/remote.py
async def health_check(self) -> dict:
    """Run a health check against the SystemActor."""
    response = await self._ask("HealthCheck")
    return response
list_actors() async

List all actors on this node.

Source code in pulsing/core/remote.py
async def list_actors(self) -> list[dict]:
    """List all actors registered on this node.

    Raises:
        PulsingRuntimeError: If the system message itself failed.
    """
    response = await self._ask("ListActors")
    # An explicit Error envelope means the system message failed, not the actors.
    if response.get("type") == "Error":
        raise PulsingRuntimeError(response.get("message"))
    return response.get("actors", [])
ping() async

Ping this node.

Source code in pulsing/core/remote.py
async def ping(self) -> dict:
    """Ping this node's SystemActor."""
    response = await self._ask("Ping")
    return response

Functions

as_any(ref)

Return an untyped proxy that forwards any method call to the remote actor.

Use when you have an ActorRef and want to call methods by name without the typed class.

Parameters:

Name Type Description Default
ref ActorRef

ActorRef from resolve(name).

required
Example

ref = await resolve("channel.discord") proxy = as_any(ref) # or proxy = ref.as_any() await proxy.send_text(chat_id, content)

Source code in pulsing/core/remote.py
def as_any(ref: ActorRef) -> ActorProxy:
    """Build an untyped proxy that forwards arbitrary method calls to *ref*.

    Convenience for when you hold an ActorRef and do not have (or need)
    the typed actor class.

    Args:
        ref: ActorRef obtained from resolve(name).

    Example:
        ref = await resolve("channel.discord")
        proxy = as_any(ref)  # or proxy = ref.as_any()
        await proxy.send_text(chat_id, content)
    """
    # Delegates to the ActorRef's own as_any() implementation.
    return ref.as_any()

ask_with_timeout(actor_ref, msg, timeout=DEFAULT_ASK_TIMEOUT) async

Send a message and wait for response with timeout.

This is a convenience wrapper around ActorRef.ask() that adds timeout support. When timeout occurs, the local task is cancelled. Note that this does NOT guarantee the remote handler will stop - it relies on HTTP/2 RST_STREAM propagation for stream cancellation.

For handlers that may run long, implement idempotent operations and/or check for stream closure in streaming scenarios.

Parameters:

Name Type Description Default
actor_ref ActorRef

Target actor reference

required
msg Any

Message to send (any Python object or Message)

required
timeout float

Timeout in seconds (default: 30.0)

DEFAULT_ASK_TIMEOUT

Returns:

Type Description
Any

Response from the actor

Raises:

Type Description
TimeoutError

If timeout expires before response

Exception

Any error from the actor

Example

try: result = await ask_with_timeout(actor_ref, {"action": "compute"}, timeout=10.0) except asyncio.TimeoutError: print("Request timed out")

Source code in pulsing/core/__init__.py
async def ask_with_timeout(
    actor_ref: ActorRef,
    msg: Any,
    timeout: float = DEFAULT_ASK_TIMEOUT,
) -> Any:
    """Send a message and await the response, bounded by *timeout* seconds.

    Thin wrapper around ActorRef.ask() that adds a timeout. On timeout the
    local task is cancelled; the remote handler is NOT guaranteed to stop —
    cancellation relies on HTTP/2 RST_STREAM propagation for streams.

    For potentially long-running handlers, prefer idempotent operations
    and/or check for stream closure in streaming scenarios.

    Args:
        actor_ref: Target actor reference.
        msg: Message to send (any Python object or Message).
        timeout: Timeout in seconds (default: 30.0).

    Returns:
        Response from the actor.

    Raises:
        asyncio.TimeoutError: If the timeout expires before a response arrives.
        Exception: Any error raised by the actor.

    Example:
        try:
            result = await ask_with_timeout(actor_ref, {"action": "compute"}, timeout=10.0)
        except asyncio.TimeoutError:
            print("Request timed out")
    """
    pending = actor_ref.ask(msg)
    return await asyncio.wait_for(pending, timeout=timeout)

get_system()

Get the global actor system (must call init() first)

Source code in pulsing/core/__init__.py
def get_system() -> ActorSystem:
    """Return the global actor system; requires a prior ``await init()``."""
    if _global_system is not None:
        return _global_system

    # Imported lazily to avoid a module-level import cycle with pulsing.exceptions.
    from pulsing.exceptions import PulsingRuntimeError

    raise PulsingRuntimeError(
        "Actor system not initialized. Call 'await init()' first."
    )

get_system_actor(system, node_id=None) async

Get SystemActorProxy for direct method calls.

Parameters:

Name Type Description Default
system ActorSystem

ActorSystem instance

required
node_id int | None

Target node ID (None means local node)

None

Returns:

Type Description
SystemActorProxy

SystemActorProxy with methods: list_actors(), get_metrics(), etc.

Example

sys = await get_system_actor(system) actors = await sys.list_actors() await sys.ping()

Source code in pulsing/core/remote.py
async def get_system_actor(
    system: ActorSystem, node_id: int | None = None
) -> SystemActorProxy:
    """Obtain a SystemActorProxy for direct system-level method calls.

    Args:
        system: ActorSystem instance.
        node_id: Target node ID; None targets the local node.

    Returns:
        SystemActorProxy with methods: list_actors(), get_metrics(), etc.

    Example:
        sys = await get_system_actor(system)
        actors = await sys.list_actors()
        await sys.ping()
    """
    # Local node uses system(); remote nodes go through remote_system(node_id).
    ref = (
        await system.system()
        if node_id is None
        else await system.remote_system(node_id)
    )
    return SystemActorProxy(ref)

init(addr=None, *, seeds=None, passphrase=None, head_addr=None, is_head_node=False) async

Initialize Pulsing actor system

Parameters:

Name Type Description Default
addr str

Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.

None
seeds list[str]

Seed nodes to join cluster (Gossip mode).

None
passphrase str

Enable TLS with this passphrase.

None
head_addr str

Address of head node (worker mode). Mutually exclusive with is_head_node.

None
is_head_node bool

If True, this node runs as head. Mutually exclusive with head_addr.

False

Returns:

Type Description
ActorSystem

ActorSystem instance

Example

Standalone mode

await init()

Cluster mode (Gossip + seed)

await init(addr="0.0.0.0:8001", seeds=["192.168.1.1:8000"])

Head node

await init(addr="0.0.0.0:8000", is_head_node=True)

Worker node

await init(addr="0.0.0.0:8001", head_addr="192.168.1.1:8000")

Source code in pulsing/core/__init__.py
async def init(
    addr: str | None = None,
    *,
    seeds: list[str] | None = None,
    passphrase: str | None = None,
    head_addr: str | None = None,
    is_head_node: bool = False,
) -> ActorSystem:
    """Initialize Pulsing actor system

    Args:
        addr: Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.
        seeds: Seed nodes to join cluster (Gossip mode).
        passphrase: Enable TLS with this passphrase.
        head_addr: Address of head node (worker mode). Mutually exclusive with is_head_node.
        is_head_node: If True, this node runs as head. Mutually exclusive with head_addr.

    Returns:
        ActorSystem instance

    Example:
        # Standalone mode
        await init()

        # Cluster mode (Gossip + seed)
        await init(addr="0.0.0.0:8001", seeds=["192.168.1.1:8000"])

        # Head node
        await init(addr="0.0.0.0:8000", is_head_node=True)

        # Worker node
        await init(addr="0.0.0.0:8001", head_addr="192.168.1.1:8000")
    """
    global _global_system

    # Idempotent: a second call returns the existing system (new args are ignored).
    if _global_system is not None:
        return _global_system

    if is_head_node and head_addr:
        raise ValueError("Cannot set both is_head_node and head_addr")

    # Build config
    if addr:
        config = SystemConfig.with_addr(addr)
    else:
        config = SystemConfig.standalone()

    if seeds:
        config = config.with_seeds(seeds)
    if is_head_node:
        config = config.with_head_node()
    elif head_addr:
        config = config.with_head_addr(head_addr)

    if passphrase:
        config = config.with_passphrase(passphrase)

    loop = asyncio.get_running_loop()
    _global_system = await ActorSystem.create(config, loop)
    # Automatically register PythonActorService for remote actor creation
    from .remote import PYTHON_ACTOR_SERVICE_NAME, PythonActorService

    service = PythonActorService(_global_system)
    await _global_system.spawn(service, name=PYTHON_ACTOR_SERVICE_NAME, public=True)
    return _global_system

is_initialized()

Check if the global actor system is initialized

Source code in pulsing/core/__init__.py
def is_initialized() -> bool:
    """Return True if the global actor system has been initialized via init()."""
    return _global_system is not None

mount(instance, *, name, public=True)

Mount an existing Python object to the Pulsing communication network.

Synchronous interface, can be called in __init__. Automatically: 1. Initialize Pulsing (if not already, auto-detects Ray environment) 2. Wrap instance as a Pulsing actor 3. Register to Pulsing network, other nodes can discover via pul.resolve(name)

Parameters:

Name Type Description Default
instance Any

Object to mount (any Python instance)

required
name str

Pulsing name, other nodes resolve via this name

required
public bool

Whether discoverable by other cluster nodes (default True)

True

Example::

@ray.remote
class Counter:
    def __init__(self, name, peers):
        self.name = name
        self.peers = sorted(peers)
        pul.mount(self, name=name)

    async def greet(self, msg):
        return f"Hello from {self.name}: {msg}"
Source code in pulsing/core/remote.py
def mount(instance: Any, *, name: str, public: bool = True) -> None:
    """Mount an existing Python object to the Pulsing communication network.

    Synchronous interface, can be called in ``__init__``. Automatically:
      1. Initialize Pulsing (if not already, auto-detects Ray environment)
      2. Wrap instance as a Pulsing actor
      3. Register to Pulsing network, other nodes can discover via ``pul.resolve(name)``

    Args:
        instance: Object to mount (any Python instance)
        name: Pulsing name, other nodes resolve via this name
        public: Whether discoverable by other cluster nodes (default True)

    Example::

        @ray.remote
        class Counter:
            def __init__(self, name, peers):
                self.name = name
                self.peers = sorted(peers)
                pul.mount(self, name=name)

            async def greet(self, msg):
                return f"Hello from {self.name}: {msg}"
    """
    from . import _global_system

    # Auto-initialize Pulsing
    if _global_system is None:
        _auto_init_pulsing()

    # Re-import: _auto_init_pulsing() presumably rebinds the module-level
    # global, so the name imported above may be stale — re-read it here.
    from . import _global_system as system

    if system is None:
        raise RuntimeError(
            "Pulsing initialization failed. Please call pul.init() or run in Ray environment."
        )

    # Names without an explicit namespace are placed under the "actors/" prefix.
    actor_name = name if "/" in name else f"actors/{name}"
    wrapped = _WrappedActor(instance)

    async def _do_mount():
        ref = await system.spawn(wrapped, name=actor_name, public=public)
        return ref

    # The spawn coroutine runs on the Pulsing event loop; block until done.
    actor_ref = _run_sync_on_pulsing_loop(_do_mount())
    wrapped._inject_delayed(actor_ref)
    _register_actor_metadata(actor_name, type(instance))

resolve(name, *, node_id=None, timeout=None) async

Resolve a named actor by name.

Returns an ActorRef that supports .ask(), .tell(), .as_any(), and .as_type(). Use .as_any() to get an untyped proxy that forwards any method call. Use .as_type(Counter) to get a typed proxy with method validation.

For typed ActorProxy with method calls, use Counter.resolve(name) instead.

Parameters:

Name Type Description Default
name str

Actor name

required
node_id int | None

Target node ID, searches in cluster if not provided

None
timeout float | None

Seconds to wait for the name to appear (gossip convergence). None means no wait (error immediately if not found).

None

Returns:

Name Type Description
ActorRef

Actor reference with .as_any() / .as_type() for proxy generation.

Example

from pulsing.core import init, remote, resolve

await init()

By name only (no type needed)

ref = await resolve("channel.discord") proxy = ref.as_any() await proxy.send_text(chat_id, content)

Wait for name to appear (gossip convergence)

ref = await resolve("peer_node", timeout=30)

Low-level ask

ref = await resolve("my_counter") result = await ref.ask({"call": "increment", "args": [], "kwargs": {}})

Source code in pulsing/core/remote.py
async def resolve(
    name: str,
    *,
    node_id: int | None = None,
    timeout: float | None = None,
):
    """Resolve a named actor by name.

    Returns an ActorRef that supports .ask(), .tell(), .as_any(), and .as_type().
    Use .as_any() to get an untyped proxy that forwards any method call.
    Use .as_type(Counter) to get a typed proxy with method validation.

    For typed ActorProxy with method calls, use Counter.resolve(name) instead.

    Args:
        name: Actor name
        node_id: Target node ID, searches in cluster if not provided
        timeout: Seconds to wait for the name to appear (gossip convergence).
                 None means no wait (error immediately if not found).

    Returns:
        ActorRef: Actor reference with .as_any() / .as_type() for proxy generation.

    Example:
        from pulsing.core import init, remote, resolve

        await init()

        # By name only (no type needed)
        ref = await resolve("channel.discord")
        proxy = ref.as_any()
        await proxy.send_text(chat_id, content)

        # Wait for name to appear (gossip convergence)
        ref = await resolve("peer_node", timeout=30)

        # Low-level ask
        ref = await resolve("my_counter")
        result = await ref.ask({"__call__": "increment", "args": [], "kwargs": {}})
    """
    from . import _global_system

    if _global_system is None:
        raise RuntimeError("Actor system not initialized. Call 'await init()' first.")

    # Translate low-level RuntimeErrors from the Rust layer into Pulsing exceptions.
    try:
        return await _global_system.resolve(name, node_id=node_id, timeout=timeout)
    except RuntimeError as e:
        raise _convert_rust_error(e) from e

shutdown() async

Shutdown the global actor system

Source code in pulsing/core/__init__.py
async def shutdown() -> None:
    """Stop and clear the global actor system; no-op when not initialized."""
    global _global_system

    if _global_system is None:
        return
    # Clear the global only after a successful shutdown.
    await _global_system.shutdown()
    _global_system = None

tell_with_timeout(actor_ref, msg, timeout=DEFAULT_ASK_TIMEOUT) async

Send a fire-and-forget message with timeout.

Parameters:

Name Type Description Default
actor_ref ActorRef

Target actor reference

required
msg Any

Message to send

required
timeout float

Timeout in seconds (default: 30.0)

DEFAULT_ASK_TIMEOUT

Raises:

Type Description
TimeoutError

If timeout expires

Source code in pulsing/core/__init__.py
async def tell_with_timeout(
    actor_ref: ActorRef,
    msg: Any,
    timeout: float = DEFAULT_ASK_TIMEOUT,
) -> None:
    """Fire-and-forget send, bounded by *timeout* seconds.

    Args:
        actor_ref: Target actor reference.
        msg: Message to send.
        timeout: Timeout in seconds (default: 30.0).

    Raises:
        asyncio.TimeoutError: If the send does not complete in time.
    """
    pending = actor_ref.tell(msg)
    await asyncio.wait_for(pending, timeout=timeout)

unmount(name)

Unmount a previously mounted actor from the Pulsing network.

Parameters:

Name Type Description Default
name str

Name used during mounting

required
Source code in pulsing/core/remote.py
def unmount(name: str) -> None:
    """Unmount a previously mounted actor from the Pulsing network.

    Args:
        name: Name used during mounting
    """
    from . import _global_system

    # Nothing to do if the system was never initialized (or already shut down).
    if _global_system is None:
        return

    # Apply the same default "actors/" prefix used by mount().
    actor_name = name if "/" in name else f"actors/{name}"

    async def _do_unmount():
        await _global_system.stop(actor_name)

    # Stop runs on the Pulsing event loop; block until it completes.
    _run_sync_on_pulsing_loop(_do_unmount())

Agent Module

Pulsing Agent Toolbox

Lightweight multi-agent development tools, fully compatible with pulsing.core.

Core APIs: - runtime(): Actor system lifecycle management - agent(): @remote with metadata (for visualization) - llm(): LLM client - parse_json(): JSON parsing

Example

from pulsing.core import remote, resolve from pulsing.agent import agent, runtime, llm, get_agent_meta

@remote: Basic Actor

@remote class Worker: async def work(self): ...

@agent: Actor with metadata (for visualization/debugging)

@agent(role="Researcher", goal="Deep analysis") class Researcher: async def analyze(self, topic: str) -> str: client = await llm() return await client.ainvoke(f"Analyze: {topic}")

async def main(): async with runtime(): r = await Researcher.spawn(name="researcher") result = await r.analyze("AI")

    # Get metadata
    meta = get_agent_meta("researcher")
    print(meta.role)  # "Researcher"

Classes

AgentMeta(role='', goal='', backstory='', tags=dict()) dataclass

Agent metadata

Functions

agent(role='', goal='', backstory='', **tags)

Agent decorator - equivalent to @remote, but with additional metadata

Metadata can be retrieved via get_agent_meta(name).

Example

@agent(role="Researcher", goal="Deep analysis") class Researcher: async def analyze(self, topic: str) -> str: ...

async with runtime(): r = await Researcher.spawn(name="researcher")

# Get metadata
meta = get_agent_meta("researcher")
print(meta.role)  # "Researcher"
Source code in pulsing/agent/base.py
def agent(
    role: str = "",
    goal: str = "",
    backstory: str = "",
    **tags: Any,
) -> Callable[[type[T]], type[T]]:
    """
    Agent decorator - equivalent to @remote, but with additional metadata

    Metadata can be retrieved via get_agent_meta(name).

    Args:
        role: Short role description for the agent.
        goal: What the agent is meant to achieve.
        backstory: Free-form background context.
        **tags: Arbitrary extra metadata key/value pairs.

    Example:
        @agent(role="Researcher", goal="Deep analysis")
        class Researcher:
            async def analyze(self, topic: str) -> str:
                ...

        async with runtime():
            r = await Researcher.spawn(name="researcher")

            # Get metadata
            meta = get_agent_meta("researcher")
            print(meta.role)  # "Researcher"
    """

    def decorator(cls: type[T]) -> type[T]:
        # Create metadata
        meta = AgentMeta(role=role, goal=goal, backstory=backstory, tags=tags)

        # Store on class (for use during spawn)
        cls._agent_meta_template = meta  # type: ignore

        # Wrap spawn method to register metadata
        actor_cls = remote(cls)
        original_spawn = actor_cls.spawn

        async def spawn_with_meta(*args: Any, name: str | None = None, **kwargs: Any):
            proxy = await original_spawn(*args, name=name, **kwargs)
            # Register metadata only for named actors; anonymous spawns are not tracked.
            if name:
                _agent_meta_registry[name] = meta
            return proxy

        actor_cls.spawn = spawn_with_meta
        actor_cls.__agent_meta__ = meta  # Also accessible at class level

        return actor_cls

    return decorator

cleanup()

Clean up all agent-related global state.

Includes: - Agent metadata registry - LLM singleton

Recommended to call when repeatedly creating/destroying runtime to avoid memory leaks.

Example

from pulsing.agent import runtime, cleanup

try: async with runtime(): agent = await MyAgent.spawn(name="agent") await agent.work() finally: cleanup() # Clean up all global state

Source code in pulsing/agent/__init__.py
def cleanup():
    """
    Clean up all agent-related global state.

    Includes:
    - Agent metadata registry
    - LLM singleton

    Recommended to call when repeatedly creating/destroying runtime to avoid memory leaks.

    Example:
        from pulsing.agent import runtime, cleanup

        try:
            async with runtime():
                agent = await MyAgent.spawn(name="agent")
                await agent.work()
        finally:
            cleanup()  # Clean up all global state
    """
    # Drop all registered agent metadata.
    clear_agent_registry()
    # Drop the cached LLM singleton.
    reset_llm()

clear_agent_registry()

Clear registry (for testing)

Source code in pulsing/agent/base.py
def clear_agent_registry() -> None:
    """Clear registry (for testing)"""
    # Remove every name -> AgentMeta entry from the module-level registry.
    _agent_meta_registry.clear()

extract_field(content, field, fallback=None)

Extract specified field from JSON output.

Example

response = '{"score": 8, "reason": "good"}' score = extract_field(response, "score", fallback=0) # 8

Source code in pulsing/agent/utils.py
def extract_field(content: str | None, field: str, fallback: Any = None) -> Any:
    """
    Extract a single field from JSON (possibly fenced) LLM output.

    Example:
        response = '{"score": 8, "reason": "good"}'
        score = extract_field(response, "score", fallback=0)  # 8
    """
    parsed = parse_json(content, {})
    # Non-dict JSON (list, scalar) cannot carry named fields.
    if not isinstance(parsed, dict):
        return fallback
    return parsed.get(field, fallback)

get_agent_meta(name)

Get metadata by actor name

Source code in pulsing/agent/base.py
def get_agent_meta(name: str) -> AgentMeta | None:
    """Get metadata by actor name; returns None when the name is not registered."""
    return _agent_meta_registry.get(name)

list_agents()

List all registered agents and their metadata

Source code in pulsing/agent/base.py
def list_agents() -> dict[str, AgentMeta]:
    """List all registered agents and their metadata (returned as a shallow copy)."""
    return _agent_meta_registry.copy()

parse_json(content, fallback=None)

Parse JSON from LLM output.

Supports: - Pure JSON strings - ```json ... ``` fenced code blocks - Returns fallback on parse failure

Example

from pulsing.agent import parse_json

Pure JSON

data = parse_json('{"name": "test"}')

Code block wrapped

data = parse_json('```json\n{"name": "test"}\n```')

Parse failure

data = parse_json('invalid', fallback={}) # Returns {}

Source code in pulsing/agent/utils.py
def parse_json(content: str | None, fallback: Any = None) -> Any:
    """
    Parse JSON from LLM output.

    Supports:
    - Pure JSON strings
    - ```json ... ``` code block wrapped
    - Returns fallback on parse failure

    Example:
        from pulsing.agent import parse_json

        # Pure JSON
        data = parse_json('{"name": "test"}')

        # Code block wrapped
        data = parse_json('```json\\n{"name": "test"}\\n```')

        # Parse failure
        data = parse_json('invalid', fallback={})  # Returns {}
    """
    if content is None:
        return fallback

    text = str(content).strip()
    if not text:
        return fallback

    # Handle ```json ... ``` wrapping
    if text.startswith("```"):
        parts = text.split("```")
        if len(parts) >= 2:
            text = parts[1].lstrip()
            if text.lower().startswith("json"):
                text = text[4:].lstrip()

    try:
        return json.loads(text)
    except (json.JSONDecodeError, ValueError):
        return fallback

reset_llm()

Reset LLM singleton

Source code in pulsing/agent/llm.py
def reset_llm():
    """Reset LLM singleton"""
    # Clear the module-level singleton so it can be rebuilt later.
    global _default_llm
    _default_llm = None

Queue Module

Streaming - Queue (point-to-point) and Pub/Sub (topic) APIs

Queue

writer = await system.streaming.write("my_queue") # or system.queue reader = await system.streaming.read("my_queue")

Topic

writer = await system.topic.write("events") reader = await system.topic.read("events")

Classes

BucketStorage(bucket_id, storage_path, batch_size=100, backend='memory', backend_options=None)

Storage Actor for a Single Bucket

Uses pluggable StorageBackend for data storage.

Parameters:

Name Type Description Default
bucket_id int

Bucket ID

required
storage_path str

Storage path

required
batch_size int

Batch size

100
backend str | type

Backend name or backend class - "memory": Pure in-memory backend (default) - Custom name/class: Use register_backend() or pass class

'memory'
backend_options dict[str, Any] | None

Additional parameters passed to backend

None
Source code in pulsing/streaming/storage.py
def __init__(
    self,
    bucket_id: int,
    storage_path: str,
    batch_size: int = 100,
    backend: str | type = "memory",
    backend_options: dict[str, Any] | None = None,
):
    """Store configuration; the backend itself is created later in on_start.

    Args:
        bucket_id: Bucket ID.
        storage_path: Storage path.
        batch_size: Batch size.
        backend: Backend name (e.g. "memory") or a backend class.
        backend_options: Extra options forwarded to the backend.
    """
    self.bucket_id = bucket_id
    self.storage_path = storage_path
    self.batch_size = batch_size
    self._backend_type = backend
    self._backend_options = backend_options or {}

    # Backend instance (initialized in on_start)
    self._backend: StorageBackend | None = None
    # record index -> {field name: production state, e.g. "ready"} (see put/put_batch)
    self._production_status: dict[int, dict[str, str]] = {}
    # presumably consumer id -> set of consumed record indices — not used in this excerpt; TODO confirm
    self._consumption_status: dict[str, set[int]] = {}
    # presumably record key -> record index mapping — not used in this excerpt; TODO confirm
    self._key_to_index: dict[str, int] = {}

Functions

flush() async

Flush pending writes.

Returns:

Type Description
dict

{"status": "ok"}

Source code in pulsing/streaming/storage.py
async def flush(self) -> dict:
    """Flush any pending backend writes.

    Returns:
        {"status": "ok"}
    """
    backend = self._backend
    await backend.flush()
    return {"status": "ok"}
get(limit=100, offset=0) async

Get records.

Parameters:

Name Type Description Default
limit int

Maximum number of records to return

100
offset int

Starting offset

0

Returns:

Type Description
list[dict]

List of records

Source code in pulsing/streaming/storage.py
async def get(self, limit: int = 100, offset: int = 0) -> list[dict]:
    """Fetch up to *limit* records starting at *offset* from the backend.

    Args:
        limit: Maximum number of records to return.
        offset: Starting offset.

    Returns:
        List of records.
    """
    records = await self._backend.get(limit, offset)
    return records
get_stream(limit=100, offset=0, wait=False, timeout=None) async

Get records as a stream.

Parameters:

Name Type Description Default
limit int

Maximum number of records to return

100
offset int

Starting offset

0
wait bool

Whether to wait for new records

False
timeout float | None

Timeout in seconds

None

Yields:

Type Description
AsyncIterator[list[dict]]

Batches of records

Source code in pulsing/streaming/storage.py
async def get_stream(
    self,
    limit: int = 100,
    offset: int = 0,
    wait: bool = False,
    timeout: float | None = None,
) -> AsyncIterator[list[dict]]:
    """Get records as a stream.

    Args:
        limit: Maximum number of records to return
        offset: Starting offset
        wait: Whether to wait for new records
        timeout: Timeout in seconds

    Yields:
        Batches of records
    """
    async for records in self._backend.get_stream(limit, offset, wait, timeout):
        yield records
put(record) async

Put a single record.

Parameters:

Name Type Description Default
record dict

Record to store

required

Returns:

Type Description
dict

{"status": "ok"}

Source code in pulsing/streaming/storage.py
async def put(self, record: dict) -> dict:
    """Store one record and mark its public fields as produced.

    Args:
        record: Record to store.

    Returns:
        {"status": "ok"}

    Raises:
        ValueError: If *record* is empty or falsy.
    """
    if not record:
        raise ValueError("Missing 'record'")
    # The record's index is the backend count before insertion.
    index = self._backend.total_count()
    await self._backend.put(record)
    # Fields prefixed with "_" are excluded from production status.
    public_fields = [k for k in record if not str(k).startswith("_")]
    self._production_status[index] = dict.fromkeys(public_fields, "ready")
    return {"status": "ok"}
put_batch(records) async

Put multiple records.

Parameters:

Name Type Description Default
records list[dict]

List of records to store

required

Returns:

Type Description
dict

{"status": "ok", "count": N}

Source code in pulsing/streaming/storage.py
async def put_batch(self, records: list[dict]) -> dict:
    """Store multiple records and mark their public fields as produced.

    Args:
        records: List of records to store.

    Returns:
        {"status": "ok", "count": N}

    Raises:
        ValueError: If *records* is empty or falsy.
    """
    if not records:
        raise ValueError("Missing 'records'")
    # Indices continue from the backend count before insertion.
    base = self._backend.total_count()
    await self._backend.put_batch(records)
    for position, record in enumerate(records):
        # Fields prefixed with "_" are excluded from production status.
        public_fields = [k for k in record if not str(k).startswith("_")]
        self._production_status[base + position] = dict.fromkeys(public_fields, "ready")
    return {"status": "ok", "count": len(records)}
stats() async

Get storage statistics.

Returns:

Type Description
dict

Statistics dict from backend

Source code in pulsing/streaming/storage.py
async def stats(self) -> dict:
    """Return the backend's storage statistics."""
    report = await self._backend.stats()
    return report

MemoryBackend(bucket_id, **kwargs)

Pure In-Memory Backend - Built-in Default Implementation

Features: - No persistence, data exists only in memory - Supports blocking wait for new data - Lightweight, suitable for testing and temporary data

For persistence, use a plugin that implements StorageBackend (e.g. register_backend).

Source code in pulsing/streaming/backend.py
def __init__(self, bucket_id: int, **kwargs):
    self.bucket_id = bucket_id
    self.buffer: list[dict[str, Any]] = []
    self._lock = asyncio.Lock()
    self._condition = asyncio.Condition(self._lock)

PublishMode

Bases: Enum

Publish mode

PublishResult(success, delivered, failed, subscriber_count, failed_subscribers=None) dataclass

Publish result

Queue(system, topic, bucket_column='id', num_buckets=4, batch_size=100, storage_path=None, backend='memory', backend_options=None)

Distributed Queue - High-level API

Each bucket corresponds to an independent BucketStorage Actor.

Parameters:

Name Type Description Default
system ActorSystem

Actor system

required
topic str

Queue topic

required
bucket_column str

Bucketing column name

'id'
num_buckets int

Number of buckets

4
batch_size int

Batch size

100
storage_path str | None

Storage path

None
backend str | type

Storage backend - "memory": Pure in-memory backend (default) - Custom: register_backend() or class implementing StorageBackend

'memory'
backend_options dict[str, Any] | None

Additional backend parameters

None
Source code in pulsing/streaming/queue.py
def __init__(
    self,
    system: ActorSystem,
    topic: str,
    bucket_column: str = "id",
    num_buckets: int = 4,
    batch_size: int = 100,
    storage_path: str | None = None,
    backend: str | type = "memory",
    backend_options: dict[str, Any] | None = None,
):
    """Create a distributed queue handle; one BucketStorage actor per bucket.

    Args:
        system: Actor system used to spawn/resolve bucket actors.
        topic: Queue topic name.
        bucket_column: Record key used to pick a bucket.
        num_buckets: Number of independent buckets.
        batch_size: Batch size handed to bucket storage.
        storage_path: Storage path; defaults to ./queue_storage/<topic>.
        backend: Backend name or class ("memory" by default).
        backend_options: Additional backend parameters.
    """
    self.system = system
    self.topic = topic
    self.bucket_column = bucket_column
    self.num_buckets = num_buckets
    self.batch_size = batch_size
    # Default to a per-topic directory when no explicit path is given.
    self.storage_path = storage_path or f"./queue_storage/{topic}"
    self.backend = backend
    self.backend_options = backend_options

    # Actor proxies for each bucket; per-bucket locks allow parallel resolution
    self._bucket_refs: dict[int, ActorProxy] = {}
    self._bucket_locks: dict[int, asyncio.Lock] = {}
    self._bucket_locks_meta = asyncio.Lock()

    # Save event loop reference (for sync wrapper)
    # NOTE(review): when constructed outside a running loop, _loop is None —
    # presumably the sync wrapper handles that case; confirm.
    try:
        self._loop = asyncio.get_running_loop()
    except RuntimeError:
        self._loop = None

Functions

flush() async

Flush all bucket buffers

Source code in pulsing/streaming/queue.py
async def flush(self) -> None:
    """Flush the buffers of every already-resolved bucket concurrently."""
    # Buckets without a resolved proxy are skipped — nothing to flush there.
    pending = [
        self._bucket_refs[bid].flush()
        for bid in range(self.num_buckets)
        if bid in self._bucket_refs
    ]
    if pending:
        await asyncio.gather(*pending)
get(bucket_id=None, limit=100, offset=0, wait=False, timeout=None) async

Read data from queue (both in-memory and persistent data visible)

Source code in pulsing/streaming/queue.py
async def get(
    self,
    bucket_id: int | None = None,
    limit: int = 100,
    offset: int = 0,
    wait: bool = False,
    timeout: float | None = None,
) -> list[dict[str, Any]]:
    """Read data from queue (both in-memory and persistent data visible)"""
    if bucket_id is not None:
        return await self._get_from_bucket(bucket_id, limit, offset, wait, timeout)

    # Read from all buckets
    all_records = []
    for bid in range(self.num_buckets):
        if len(all_records) >= limit:
            break
        records = await self._get_from_bucket(
            bid, limit - len(all_records), offset, wait, timeout
        )
        all_records.extend(records)

    return all_records[:limit]
get_bucket_id(partition_value)

Calculate bucket ID based on partition column value

Source code in pulsing/streaming/queue.py
def get_bucket_id(self, partition_value: Any) -> int:
    """Map a partition-column value to its bucket ID via the hash function."""
    bucket = self._hash_partition(partition_value)
    return bucket
put(record) async

Write data to queue

Source code in pulsing/streaming/queue.py
async def put(
    self, record: dict[str, Any] | list[dict[str, Any]]
) -> dict[str, Any] | list[dict[str, Any]]:
    """Write data to queue"""
    if isinstance(record, dict):
        records = [record]
        single = True
    elif isinstance(record, list):
        records = record
        single = False
    else:
        raise TypeError("record must be a dict or list of dicts")

    results = []
    for rec in records:
        if self.bucket_column not in rec:
            raise ValueError(f"Missing partition column '{self.bucket_column}'")

        bucket_id = self._hash_partition(rec[self.bucket_column])
        bucket = await self._ensure_bucket(bucket_id)

        # Direct method call via proxy
        await bucket.put(rec)
        results.append({"bucket_id": bucket_id, "status": "ok"})

    return results[0] if single else results
stats() async

Get queue statistics

Source code in pulsing/streaming/queue.py
async def stats(self) -> dict[str, Any]:
    """Aggregate per-bucket statistics for this queue."""
    # Only buckets whose proxies were already resolved are queried.
    per_bucket = {
        bid: await self._bucket_refs[bid].stats()
        for bid in range(self.num_buckets)
        if bid in self._bucket_refs
    }
    return {
        "topic": self.topic,
        "bucket_column": self.bucket_column,
        "num_buckets": self.num_buckets,
        "buckets": per_bucket,
    }
sync()

Return synchronous wrapper

Example

queue = Queue(system, topic="test")
sync_queue = queue.sync()
sync_queue.put({"id": "1", "value": 100})  # Synchronous write
records = sync_queue.get(limit=10)  # Synchronous read

Source code in pulsing/streaming/queue.py
def sync(self) -> "SyncQueue":
    """Return a blocking facade over this queue.

    Example:
        queue = Queue(system, topic="test")
        sync_queue = queue.sync()
        sync_queue.put({"id": "1", "value": 100})  # Synchronous write
        records = sync_queue.get(limit=10)  # Synchronous read
    """
    # Lazy import — presumably avoids an import cycle with sync_queue; confirm.
    from .sync_queue import SyncQueue as _SyncQueue

    return _SyncQueue(self)

QueueAPI(system)

Queue API entry point via system.queue

Source code in pulsing/streaming/__init__.py
def __init__(self, system: "ActorSystem"):
    """Bind the queue API to *system* (exposed as system.queue)."""
    self._system = system

Functions

read(topic, *, bucket_id=None, bucket_ids=None, rank=None, world_size=None, num_buckets=4, storage_path=None, backend='memory', backend_options=None) async

Open queue for reading

Source code in pulsing/streaming/__init__.py
async def read(
    self,
    topic: str,
    *,
    bucket_id: int | None = None,
    bucket_ids: list[int] | None = None,
    rank: int | None = None,
    world_size: int | None = None,
    num_buckets: int = 4,
    storage_path: str | None = None,
    backend: str | type = "memory",
    backend_options: dict[str, Any] | None = None,
) -> QueueReader:
    """Open queue for reading; thin forwarder to read_queue()."""
    # Collect keyword options once and forward them verbatim.
    options = dict(
        bucket_id=bucket_id,
        bucket_ids=bucket_ids,
        rank=rank,
        world_size=world_size,
        num_buckets=num_buckets,
        storage_path=storage_path,
        backend=backend,
        backend_options=backend_options,
    )
    return await read_queue(self._system, topic, **options)
write(topic, *, bucket_column='id', num_buckets=4, batch_size=100, storage_path=None, backend='memory', backend_options=None) async

Open queue for writing

Source code in pulsing/streaming/__init__.py
async def write(
    self,
    topic: str,
    *,
    bucket_column: str = "id",
    num_buckets: int = 4,
    batch_size: int = 100,
    storage_path: str | None = None,
    backend: str | type = "memory",
    backend_options: dict[str, Any] | None = None,
) -> QueueWriter:
    """Open queue for writing; thin forwarder to write_queue()."""
    # Collect keyword options once and forward them verbatim.
    options = dict(
        bucket_column=bucket_column,
        num_buckets=num_buckets,
        batch_size=batch_size,
        storage_path=storage_path,
        backend=backend,
        backend_options=backend_options,
    )
    return await write_queue(self._system, topic, **options)

QueueReader(queue, bucket_ids=None)

Queue read handle

Supports three modes:

1. bucket_ids=None: read from all buckets
2. bucket_ids=[0, 2]: read from the specified bucket list
3. Auto-assign buckets via rank/world_size (distributed consumption)

Source code in pulsing/streaming/queue.py
def __init__(self, queue: Queue, bucket_ids: list[int] | None = None):
    """Create a read handle over *queue*.

    Args:
        queue: Queue to consume from.
        bucket_ids: Buckets this reader consumes; None means all buckets.
    """
    self.queue = queue
    self.bucket_ids = bucket_ids  # None means read from all buckets
    # Each bucket independently maintains offset; populated lazily on read.
    self._offsets: dict[int, int] = {}

Functions

get(limit=100, wait=False, timeout=None) async

Read data from assigned buckets

Source code in pulsing/streaming/queue.py
async def get(
    self,
    limit: int = 100,
    wait: bool = False,
    timeout: float | None = None,
) -> list[dict[str, Any]]:
    """Read data from assigned buckets"""
    bucket_ids = self._get_bucket_ids()
    all_records = []

    for bid in bucket_ids:
        if len(all_records) >= limit:
            break

        offset = self._offsets.get(bid, 0)
        records = await self.queue._get_from_bucket(
            bid, limit - len(all_records), offset, wait, timeout
        )
        self._offsets[bid] = offset + len(records)
        all_records.extend(records)

    return all_records[:limit]
reset()

Reset offsets for all buckets

Source code in pulsing/streaming/queue.py
def reset(self) -> None:
    """Reset offsets for all buckets so the next read starts from position 0."""
    self._offsets.clear()
set_offset(offset, bucket_id=None)

Set offset

Source code in pulsing/streaming/queue.py
def set_offset(self, offset: int, bucket_id: int | None = None) -> None:
    """Set offset"""
    if bucket_id is not None:
        self._offsets[bucket_id] = offset
    else:
        for bid in self._get_bucket_ids():
            self._offsets[bid] = offset
sync()

Return synchronous wrapper

Source code in pulsing/streaming/queue.py
def sync(self) -> "SyncQueueReader":
    """Return a blocking facade over this reader."""
    # Lazy import — presumably avoids an import cycle with sync_queue; confirm.
    from .sync_queue import SyncQueueReader as _SyncReader

    return _SyncReader(self)

QueueWriter(queue)

Queue write handle

Source code in pulsing/streaming/queue.py
def __init__(self, queue: Queue):
    """Write handle bound to *queue*."""
    self.queue = queue

Functions

sync()

Return synchronous wrapper

Source code in pulsing/streaming/queue.py
def sync(self) -> "SyncQueueWriter":
    """Return a blocking facade over this writer."""
    # Lazy import — presumably avoids an import cycle with sync_queue; confirm.
    from .sync_queue import SyncQueueWriter as _SyncWriter

    return _SyncWriter(self)

StorageBackend

Bases: Protocol

Storage Backend Protocol

All storage backends must implement this protocol. Can be implemented via inheritance or duck typing.

Functions

flush() abstractmethod async

Flush buffer to persistent storage

Source code in pulsing/streaming/backend.py
@abstractmethod
async def flush(self) -> None:
    """Flush any buffered records to persistent storage."""
    ...
get(limit, offset) abstractmethod async

Read records

Source code in pulsing/streaming/backend.py
@abstractmethod
async def get(self, limit: int, offset: int) -> list[dict[str, Any]]:
    """Read records.

    Args:
        limit: Maximum number of records to return.
        offset: Position to start reading from.

    Returns:
        Up to *limit* records; may be fewer if less data is available.
    """
    ...
get_data(batch_meta, fields=None) async

Optional tensor-native batch data API.

Source code in pulsing/streaming/backend.py
async def get_data(self, batch_meta: Any, fields: list[str] | None = None) -> Any:
    """Optional tensor-native batch data API.

    Raises:
        NotImplementedError: Always, unless a tensor-capable backend overrides it.
    """
    raise NotImplementedError
get_meta(fields, batch_size, task_name='default', sampler=None, **sampling_kwargs) async

Optional tensor-native metadata API.

Source code in pulsing/streaming/backend.py
async def get_meta(
    self,
    fields: list[str],
    batch_size: int,
    task_name: str = "default",
    sampler: Any = None,
    **sampling_kwargs: Any,
) -> Any:
    """Optional tensor-native metadata API.

    Raises:
        NotImplementedError: Always, unless a tensor-capable backend overrides it.
    """
    raise NotImplementedError
get_stream(limit, offset, wait=False, timeout=None) abstractmethod async

Stream read records

Source code in pulsing/streaming/backend.py
@abstractmethod
async def get_stream(
    self,
    limit: int,
    offset: int,
    wait: bool = False,
    timeout: float | None = None,
) -> AsyncIterator[list[dict[str, Any]]]:
    """Stream read records in batches.

    Args:
        limit: Maximum number of records to read.
        offset: Position to start reading from.
        wait: If True, block for new data (MemoryBackend supports this).
        timeout: Optional bound, in seconds, on how long to wait.
    """
    ...
put(record) abstractmethod async

Write a single record

Source code in pulsing/streaming/backend.py
@abstractmethod
async def put(self, record: dict[str, Any]) -> None:
    """Write a single record to the store."""
    ...
put_batch(records) abstractmethod async

Write records in batch

Source code in pulsing/streaming/backend.py
@abstractmethod
async def put_batch(self, records: list[dict[str, Any]]) -> None:
    """Write *records* to the store in one batch operation."""
    ...
put_tensor(data, **kwargs) async

Optional tensor-native put API.

Source code in pulsing/streaming/backend.py
async def put_tensor(self, data: Any, **kwargs: Any) -> Any:
    """Optional tensor-native put API.

    Raises:
        NotImplementedError: Always, unless a tensor-capable backend overrides it.
    """
    raise NotImplementedError
stats() abstractmethod async

Get statistics

Source code in pulsing/streaming/backend.py
@abstractmethod
async def stats(self) -> dict[str, Any]:
    """Return backend statistics as a plain dict."""
    ...
total_count() abstractmethod

Total record count

Source code in pulsing/streaming/backend.py
@abstractmethod
def total_count(self) -> int:
    """Return the total number of records currently stored (synchronous)."""
    ...

StorageManager(system, base_storage_path='./queue_storage', default_backend='memory')

Storage manager Actor

One instance per node, responsible for:

1. Receiving GetBucket/GetTopic requests
2. Determining whether the resource belongs to this node (consistent hashing)
3. If it belongs to this node: creating/returning the corresponding Actor
4. If it does not belong to this node: returning a Redirect response pointing to the correct node

Supported resource types: - Queue Bucket: GetBucket -> BucketStorage Actor - Topic Broker: GetTopic -> TopicBroker Actor

Source code in pulsing/streaming/manager.py
def __init__(
    self,
    system: ActorSystem,
    base_storage_path: str = "./queue_storage",
    default_backend: str | type = "memory",
):
    """Per-node manager that owns this node's bucket and topic actors.

    Args:
        system: Actor system of the local node.
        base_storage_path: Root directory for bucket storage.
        default_backend: Backend used when a request does not specify one.
    """
    self.system = system
    self.base_storage_path = base_storage_path
    self.default_backend = default_backend

    # Buckets managed by this node: {(topic, bucket_id): ActorRef}
    self._buckets: dict[tuple[str, int], ActorRef] = {}
    # Topic brokers managed by this node: {topic_name: ActorRef}
    self._topics: dict[str, ActorRef] = {}
    # Per-resource locks so different buckets/topics can be created in parallel
    self._bucket_locks: dict[tuple[str, int], asyncio.Lock] = {}
    self._topic_locks: dict[str, asyncio.Lock] = {}
    # Guards creation of the per-resource locks above.
    self._locks_meta = asyncio.Lock()

    # Cached cluster member information
    self._members: list[dict] = []
    # 0 means the member list has never been refreshed.
    self._members_updated_at: float = 0

Functions

get_bucket(topic, bucket_id, batch_size=100, storage_path=None, backend=None, backend_options=None) async

Get bucket reference.

Returns:

Type Description
dict
  • {"_type": "BucketReady", "topic": ..., "bucket_id": ..., "actor_id": ..., "node_id": ...}
dict
  • {"_type": "Redirect", "topic": ..., "bucket_id": ..., "owner_node_id": ..., "owner_addr": ...}
Source code in pulsing/streaming/manager.py
async def get_bucket(
    self,
    topic: str,
    bucket_id: int,
    batch_size: int = 100,
    storage_path: str | None = None,
    backend: str | None = None,
    backend_options: dict | None = None,
) -> dict:
    """Resolve a bucket, creating it locally or redirecting to its owner.

    Returns:
        - {"_type": "BucketReady", "topic": ..., "bucket_id": ..., "actor_id": ..., "node_id": ...}
        - {"_type": "Redirect", "topic": ..., "bucket_id": ..., "owner_node_id": ..., "owner_addr": ...}
    """
    # Determine which cluster member owns this bucket key.
    bucket_key = self._bucket_key(topic, bucket_id)
    members = await self._refresh_members()
    owner_node_id = _compute_owner(bucket_key, members)
    local_node_id = str(self.system.node_id.id)

    if owner_node_id is not None and str(owner_node_id) != local_node_id:
        # Owned elsewhere: look up the owner's address and redirect the caller.
        owner_addr = next(
            (
                m.get("addr")
                for m in members
                if m.get("node_id") is not None
                and str(m.get("node_id")) == str(owner_node_id)
            ),
            None,
        )
        return {
            "_type": "Redirect",
            "topic": topic,
            "bucket_id": bucket_id,
            "owner_node_id": str(owner_node_id),
            "owner_addr": owner_addr,
        }

    # This node is responsible (or ownership is unknown): create or reuse.
    bucket_ref = await self._get_or_create_bucket(
        topic, bucket_id, batch_size, storage_path, backend, backend_options
    )
    return {
        "_type": "BucketReady",
        "topic": topic,
        "bucket_id": bucket_id,
        "actor_id": str(bucket_ref.actor_id.id),
        "node_id": str(local_node_id),
    }
get_stats() async

Get storage manager statistics.

Returns:

Type Description
dict

{"node_id": ..., "bucket_count": ..., "topic_count": ..., "buckets": [...], "topics": [...]}

Source code in pulsing/streaming/manager.py
async def get_stats(self) -> dict:
    """Summarize the buckets and topics hosted on this node.

    Returns:
        {"node_id": ..., "bucket_count": ..., "topic_count": ..., "buckets": [...], "topics": [...]}
    """
    bucket_descriptors = [
        {"topic": topic, "bucket_id": bid} for (topic, bid) in self._buckets
    ]
    return {
        "node_id": str(self.system.node_id.id),
        "bucket_count": len(self._buckets),
        "topic_count": len(self._topics),
        "buckets": bucket_descriptors,
        "topics": list(self._topics),
    }
get_topic(topic) async

Get topic broker reference.

Returns:

Type Description
dict
  • {"_type": "TopicReady", "topic": ..., "actor_id": ..., "node_id": ...}
dict
  • {"_type": "Redirect", "topic": ..., "owner_node_id": ..., "owner_addr": ...}
Source code in pulsing/streaming/manager.py
async def get_topic(self, topic: str) -> dict:
    """Resolve a topic broker, creating it locally or redirecting to its owner.

    Returns:
        - {"_type": "TopicReady", "topic": ..., "actor_id": ..., "node_id": ...}
        - {"_type": "Redirect", "topic": ..., "owner_node_id": ..., "owner_addr": ...}
    """
    # Determine which cluster member owns this topic key.
    topic_key = self._topic_key(topic)
    members = await self._refresh_members()
    owner_node_id = _compute_owner(topic_key, members)
    local_node_id = str(self.system.node_id.id)

    if owner_node_id is not None and str(owner_node_id) != local_node_id:
        # Owned elsewhere: look up the owner's address and redirect the caller.
        owner_addr = next(
            (
                m.get("addr")
                for m in members
                if m.get("node_id") is not None
                and str(m.get("node_id")) == str(owner_node_id)
            ),
            None,
        )
        return {
            "_type": "Redirect",
            "topic": topic,
            "owner_node_id": str(owner_node_id),
            "owner_addr": owner_addr,
        }

    # This node is responsible (or ownership is unknown): create or reuse.
    broker_ref = await self._get_or_create_topic_broker(topic)
    return {
        "_type": "TopicReady",
        "topic": topic,
        "actor_id": str(broker_ref.actor_id.id),
        "node_id": str(local_node_id),
    }
list_buckets() async

List all buckets managed by this node.

Returns:

Type Description
list[dict]

List of {"topic": ..., "bucket_id": ...}

Source code in pulsing/streaming/manager.py
async def list_buckets(self) -> list[dict]:
    """List all buckets managed by this node.

    Returns:
        List of {"topic": ..., "bucket_id": ...}
    """
    # Iterating a dict yields its keys directly; .keys() was redundant.
    return [{"topic": topic, "bucket_id": bid} for (topic, bid) in self._buckets]
list_topics() async

List all topics managed by this node.

Returns:

Type Description
list[str]

List of topic names

Source code in pulsing/streaming/manager.py
async def list_topics(self) -> list[str]:
    """List the names of all topics whose brokers live on this node."""
    # list() over a dict materializes its keys.
    return list(self._topics)

SyncQueue(queue)

Synchronous queue wrapper

Source code in pulsing/streaming/sync_queue.py
def __init__(self, queue: "Queue"):
    """Blocking facade over an async Queue."""
    self._queue = queue
    # Reuses the loop the queue captured at construction; may be None.
    self._loop = queue._loop

SyncQueueReader(reader)

Synchronous reader wrapper

Source code in pulsing/streaming/sync_queue.py
def __init__(self, reader: "QueueReader"):
    """Blocking facade over an async QueueReader."""
    self._reader = reader
    # Reuses the loop captured by the reader's underlying queue; may be None.
    self._loop = reader.queue._loop

SyncQueueWriter(writer)

Synchronous writer wrapper

Source code in pulsing/streaming/sync_queue.py
def __init__(self, writer: "QueueWriter"):
    """Blocking facade over an async QueueWriter."""
    self._writer = writer
    # Reuses the loop captured by the writer's underlying queue; may be None.
    self._loop = writer.queue._loop

TopicAPI(system)

Topic API entry point via system.topic

Source code in pulsing/streaming/__init__.py
def __init__(self, system: "ActorSystem"):
    """Bind the topic API to *system* (exposed as system.topic)."""
    self._system = system

Functions

read(topic, *, reader_id=None, auto_start=False) async

Open topic for reading

Source code in pulsing/streaming/__init__.py
async def read(
    self,
    topic: str,
    *,
    reader_id: str | None = None,
    auto_start: bool = False,
) -> TopicReader:
    """Open topic for reading.

    Args:
        topic: Topic name.
        reader_id: Stable reader identity; auto-generated when omitted.
        auto_start: Forwarded to read_topic() — presumably starts the
            reader immediately; confirm against read_topic's definition.
    """
    return await read_topic(
        self._system, topic, reader_id=reader_id, auto_start=auto_start
    )
write(topic, *, writer_id=None) async

Open topic for writing

Source code in pulsing/streaming/__init__.py
async def write(
    self,
    topic: str,
    *,
    writer_id: str | None = None,
) -> TopicWriter:
    """Open topic for writing.

    Args:
        topic: Topic name.
        writer_id: Stable writer identity; auto-generated when omitted.
    """
    return await write_topic(self._system, topic, writer_id=writer_id)

TopicReader(system, topic, reader_id=None)

Topic read handle

Example

reader = await read_topic(system, "events")

@reader.on_message
async def handle(msg):
    print(f"Received: {msg}")

await reader.start()

Source code in pulsing/streaming/pubsub.py
def __init__(self, system: ActorSystem, topic: str, reader_id: str | None = None):
    """Create a read handle for *topic*.

    Args:
        system: Actor system used to spawn the subscriber actor.
        topic: Topic name.
        reader_id: Stable reader identity; auto-generated when omitted.
    """
    self._system = system
    self._topic = topic
    # Random 8-hex-char suffix keeps auto-generated reader IDs unique.
    self._reader_id = reader_id or f"reader_{uuid.uuid4().hex[:8]}"
    self._callbacks: list[MessageCallback] = []
    # Spawned lazily in start(); None while not started.
    self._subscriber_ref: "ActorRef | None" = None
    self._started = False

Functions

add_callback(callback)

Add message callback

Source code in pulsing/streaming/pubsub.py
def add_callback(self, callback: MessageCallback) -> None:
    """Register *callback* to be invoked for received messages.

    Duplicate registrations are allowed (the list is simply appended to).
    """
    self._callbacks.append(callback)
on_message(callback)

Register message callback (decorator style)

Example

@reader.on_message
async def handle(msg):
    print(f"Received: {msg}")

Source code in pulsing/streaming/pubsub.py
def on_message(self, callback: MessageCallback) -> MessageCallback:
    """Register message callback (decorator style)

    Example:
        @reader.on_message
        async def handle(msg):
            print(f"Received: {msg}")
    """
    # Delegate to add_callback, then hand the function back unchanged so
    # the decorator form leaves the name bound to the original coroutine.
    self.add_callback(callback)
    return callback
remove_callback(callback)

Remove message callback

Source code in pulsing/streaming/pubsub.py
def remove_callback(self, callback: MessageCallback) -> bool:
    """Remove a previously registered callback.

    Returns:
        True if the callback was present and removed, False otherwise.
    """
    if callback in self._callbacks:
        self._callbacks.remove(callback)
        return True
    return False
start() async

Start receiving messages

Source code in pulsing/streaming/pubsub.py
async def start(self) -> None:
    """Spawn a subscriber actor and register it with the topic broker.

    Raises:
        ValueError: If no callback has been registered yet.
    """
    # Idempotent: a second start() is a no-op.
    if self._started:
        return

    if not self._callbacks:
        raise ValueError("at least one callback required")

    # Create subscriber Actor
    # The name encodes topic + reader id so stop() can rebuild it later.
    actor_name = f"_topic_sub_{self._topic}_{self._reader_id}"
    subscriber = _SubscriberActor(self._callbacks)
    self._subscriber_ref = await self._system.spawn(
        subscriber, name=actor_name, public=True
    )

    # Register with broker using direct method call
    broker = await _get_broker(self._system, self._topic)
    await broker.subscribe(
        self._reader_id,
        actor_name,
        node_id=self._system.node_id.id,
    )

    self._started = True
    logger.debug(f"TopicReader[{self._reader_id}] started for topic: {self._topic}")
stats() async

Get topic statistics

Source code in pulsing/streaming/pubsub.py
async def stats(self) -> dict[str, Any]:
    """Fetch statistics from this topic's broker."""
    broker_proxy = await _get_broker(self._system, self._topic)
    return await broker_proxy.get_stats()
stop() async

Stop receiving messages

Source code in pulsing/streaming/pubsub.py
async def stop(self) -> None:
    """Unsubscribe from the broker and tear down the subscriber actor.

    Best-effort: unsubscribe/stop failures are logged, not raised, so the
    reader always ends in the stopped state.
    """
    # Idempotent: stopping a non-started reader is a no-op.
    if not self._started:
        return

    # Unsubscribe from broker using direct method call
    try:
        broker = await _get_broker(self._system, self._topic)
        await broker.unsubscribe(self._reader_id)
    except Exception as e:
        logger.warning(f"Unsubscribe error: {e}")

    # Stop subscriber Actor
    if self._subscriber_ref:
        try:
            # Addressed by the same name start() used to spawn it.
            actor_name = f"_topic_sub_{self._topic}_{self._reader_id}"
            await self._system.stop(actor_name)
        except Exception as e:
            logger.warning(f"Stop subscriber error: {e}")

    self._started = False
    self._subscriber_ref = None
    logger.debug(f"TopicReader[{self._reader_id}] stopped")

TopicWriter(system, topic, writer_id=None)

Topic write handle

Source code in pulsing/streaming/pubsub.py
def __init__(self, system: ActorSystem, topic: str, writer_id: str | None = None):
    """Create a write handle for *topic*.

    Args:
        system: Actor system used to reach the topic broker.
        topic: Topic name.
        writer_id: Stable writer identity; auto-generated when omitted.
    """
    self._system = system
    self._topic = topic
    # Random 8-hex-char suffix keeps auto-generated writer IDs unique.
    self._writer_id = writer_id or f"writer_{uuid.uuid4().hex[:8]}"
    # Broker proxy is resolved lazily; None until first use.
    self._broker: "ActorProxy | None" = None

Functions

publish(message, mode=PublishMode.FIRE_AND_FORGET, timeout=None) async

Publish message

Parameters:

Name Type Description Default
message Any

Message to publish (any Python object)

required
mode PublishMode

Publish mode

FIRE_AND_FORGET
timeout float | None

Timeout in seconds. None means use default timeout. For WAIT_ANY_ACK and WAIT_ALL_ACKS modes, local task will be cancelled after timeout, but remote handler may still be executing (relies on HTTP/2 RST_STREAM to propagate cancellation).

None

Returns:

Name Type Description
PublishResult PublishResult

Publish result

Raises:

Type Description
TimeoutError

Timeout

RuntimeError

Other errors

Source code in pulsing/streaming/pubsub.py
async def publish(
    self,
    message: Any,
    mode: PublishMode = PublishMode.FIRE_AND_FORGET,
    timeout: float | None = None,
) -> PublishResult:
    """Publish message

    Args:
        message: Message to publish (any Python object)
        mode: Publish mode
        timeout: Timeout in seconds. None means use default timeout.
                 For WAIT_ANY_ACK and WAIT_ALL_ACKS modes, local task will be cancelled after timeout,
                 but remote handler may still be executing (relies on HTTP/2 RST_STREAM to propagate cancellation).

    Returns:
        PublishResult: Publish result

    Raises:
        asyncio.TimeoutError: Timeout
        RuntimeError: Other errors
    """
    broker = await self._broker_ref()

    # Determine timeout value
    effective_timeout = timeout if timeout is not None else DEFAULT_PUBLISH_TIMEOUT

    async def _do_publish():
        # Direct method call on broker proxy
        return await broker.publish(
            message,
            mode=mode.value,
            sender_id=self._writer_id,
            timeout=effective_timeout,
        )

    # NOTE(review): the timeout is both forwarded to the broker and enforced
    # locally via wait_for — presumably an intentional double bound; confirm.
    data = await asyncio.wait_for(_do_publish(), timeout=effective_timeout)

    # Broker replies with a plain dict; translate it into a PublishResult,
    # tolerating missing keys with safe defaults.
    return PublishResult(
        success=data.get("success", False),
        delivered=data.get("delivered", 0),
        failed=data.get("failed", 0),
        subscriber_count=data.get("subscriber_count", 0),
        failed_subscribers=data.get("failed_subscribers"),
    )
stats() async

Get topic statistics

Source code in pulsing/streaming/pubsub.py
async def stats(self) -> dict[str, Any]:
    """Fetch statistics from this topic's broker."""
    broker_proxy = await self._broker_ref()
    return await broker_proxy.get_stats()

Functions

get_backend_class(backend)

Get backend class

Parameters:

Name Type Description Default
backend str | type

Backend name (str) or backend class (type)

required

Returns:

Type Description
type

Backend class

Source code in pulsing/streaming/backend.py
def get_backend_class(backend: str | type) -> type:
    """Resolve a backend specifier to a backend class.

    Args:
        backend: Backend name (str) or backend class (type)

    Returns:
        Backend class

    Raises:
        ValueError: If the name matches no builtin or registered backend.
    """
    # A class is returned as-is; only string names need a registry lookup.
    if isinstance(backend, type):
        return backend

    # User-registered backends take precedence over builtins.
    for registry in (_REGISTERED_BACKENDS, _BUILTIN_BACKENDS):
        if backend in registry:
            return registry[backend]

    available = list(_BUILTIN_BACKENDS.keys()) + list(_REGISTERED_BACKENDS.keys())
    raise ValueError(
        f"Unknown backend: {backend}. Available: {available}. "
        "Use register_backend() to add custom backends."
    )

get_bucket_ref(system, topic, bucket_id, batch_size=100, storage_path=None, backend=None, backend_options=None, max_redirects=3) async

Get ActorProxy for specified bucket

Automatically handles redirects to ensure getting the bucket on the correct node. Returns ActorProxy for direct method calls on BucketStorage.

Parameters:

Name Type Description Default
system ActorSystem

Actor system

required
topic str

Queue topic

required
bucket_id int

Bucket ID

required
batch_size int

Batch size

100
storage_path str | None

Custom storage path (optional)

None
backend str | type | None

Storage backend name or class (optional)

None
backend_options dict | None

Additional backend options (optional)

None
max_redirects int

Maximum redirect count

3
Source code in pulsing/streaming/manager.py
async def get_bucket_ref(
    system: ActorSystem,
    topic: str,
    bucket_id: int,
    batch_size: int = 100,
    storage_path: str | None = None,
    backend: str | type | None = None,
    backend_options: dict | None = None,
    max_redirects: int = 3,
) -> "ActorProxy":
    """Get ActorProxy for specified bucket

    Automatically handles redirects to ensure getting the bucket on the correct node.
    Returns ActorProxy for direct method calls on BucketStorage.

    Args:
        system: Actor system
        topic: Queue topic
        bucket_id: Bucket ID
        batch_size: Batch size
        storage_path: Custom storage path (optional)
        backend: Storage backend name or class (optional)
        backend_options: Additional backend options (optional)
        max_redirects: Maximum redirect count

    Returns:
        ActorProxy bound to the bucket's BucketStorage actor.

    Raises:
        RuntimeError: On too many redirects, a redirect loop, an unexpected
            response type, or when the owner's StorageManager cannot be
            resolved after retries.
    """
    # Request from local StorageManager first
    manager = await get_storage_manager(system)

    # Convert backend class to name if needed
    # (the manager protocol transports the backend by name, not by class).
    backend_name = None
    if backend:
        backend_name = backend if isinstance(backend, str) else backend.__name__

    for redirect_count in range(max_redirects + 1):
        # Call manager.get_bucket() via proxy
        resp_data = await manager.get_bucket(
            topic=topic,
            bucket_id=bucket_id,
            batch_size=batch_size,
            storage_path=storage_path,
            backend=backend_name,
            backend_options=backend_options,
        )

        msg_type = resp_data.get("_type", "")

        if msg_type == "BucketReady":
            # Successfully got bucket - resolve by actor name for typed proxy
            actor_name = f"bucket_{topic}_{bucket_id}"
            # Use BucketStorage.resolve to get typed ActorProxy
            return await BucketStorage.resolve(actor_name, system=system)

        elif msg_type == "Redirect":
            # Need to redirect to other node
            # owner_node_id transmitted as string, keep as string for comparison
            owner_node_id_str = str(resp_data.get("owner_node_id"))
            owner_addr = resp_data.get("owner_addr")

            logger.debug(
                f"Redirecting bucket {topic}:{bucket_id} to node {owner_node_id_str} @ {owner_addr}"
            )

            if redirect_count >= max_redirects:
                raise RuntimeError(f"Too many redirects for bucket {topic}:{bucket_id}")

            # Check if redirecting to self (avoid infinite loop)
            # Compare as strings for consistency
            if str(owner_node_id_str) == str(system.node_id.id):
                raise RuntimeError(
                    f"Redirect loop detected for bucket {topic}:{bucket_id}"
                )

            # Get owner node's StorageManager (with retry, wait for remote node initialization)
            # Convert to int for resolve_named which expects int
            owner_node_id_int = int(owner_node_id_str)
            max_resolve_retries = 10
            for resolve_retry in range(max_resolve_retries):
                try:
                    manager = await StorageManager.resolve(
                        STORAGE_MANAGER_NAME, system=system, node_id=owner_node_id_int
                    )
                    # Next outer iteration re-asks the owner's manager directly.
                    break
                except Exception as e:
                    if resolve_retry < max_resolve_retries - 1:
                        logger.debug(
                            f"StorageManager not found on node {owner_node_id_str}, "
                            f"retry {resolve_retry + 1}/{max_resolve_retries}"
                        )
                        await asyncio.sleep(0.5)
                    else:
                        raise RuntimeError(
                            f"StorageManager not found on node {owner_node_id_str} after "
                            f"{max_resolve_retries} retries: {e}"
                        ) from e

        else:
            raise RuntimeError(f"Unexpected response: {msg_type}")

    raise RuntimeError(f"Failed to get bucket {topic}:{bucket_id}")

get_storage_manager(system) async

Get StorageManager proxy for this node, create if not exists.

Returns:

Type Description
ActorProxy

ActorProxy for direct method calls on StorageManager

Source code in pulsing/streaming/manager.py
async def get_storage_manager(system: ActorSystem) -> "ActorProxy":
    """Get StorageManager proxy for this node, create if not exists.

    Returns:
        ActorProxy for direct method calls on StorageManager
    """
    node_id = system.node_id.id

    async def _resolve_local():
        # Return the local node's StorageManager proxy, or None when it
        # has not been registered yet (resolution failure is expected here).
        try:
            return await StorageManager.resolve(
                STORAGE_MANAGER_NAME, system=system, node_id=node_id
            )
        except Exception:
            return None

    # Fast path: the manager already exists on this node.
    proxy = await _resolve_local()
    if proxy is not None:
        return proxy

    async with _get_manager_lock():
        # Double-check under the lock: another coroutine may have created it
        # while we were waiting.
        proxy = await _resolve_local()
        if proxy is not None:
            return proxy

        # Spawn a new StorageManager on this node using .local()
        try:
            return await StorageManager.local(
                system, system, name=STORAGE_MANAGER_NAME, public=True
            )
        except Exception as e:
            # Lost a cross-process creation race: resolve the winner instead.
            if "already exists" in str(e).lower():
                return await StorageManager.resolve(
                    STORAGE_MANAGER_NAME, system=system, node_id=node_id
                )
            raise

get_topic_broker(system, topic, max_redirects=3) async

Get broker ActorProxy for specified topic

Automatically handles redirects to ensure getting the broker on the correct node. Returns ActorProxy for direct method calls on TopicBroker.

Parameters:

Name Type Description Default
system ActorSystem

Actor system

required
topic str

Topic name

required
max_redirects int

Maximum redirect count

3
Source code in pulsing/streaming/manager.py
async def get_topic_broker(
    system: ActorSystem,
    topic: str,
    max_redirects: int = 3,
) -> "ActorProxy":
    """Get broker ActorProxy for specified topic

    Automatically handles redirects to ensure getting the broker on the correct node.
    Returns ActorProxy for direct method calls on TopicBroker.

    Args:
        system: Actor system
        topic: Topic name
        max_redirects: Maximum redirect count

    Raises:
        RuntimeError: On a redirect loop, too many redirects, an unexpected
            response type, or when the owner node's StorageManager cannot be
            resolved after all retries.
    """
    from pulsing.streaming.broker import TopicBroker

    # Retry budget while waiting for a remote node's StorageManager to come up
    # (kept consistent with the bucket-resolution path in this module).
    max_resolve_retries = 10
    resolve_retry_delay = 0.5

    manager = await get_storage_manager(system)

    for redirect_count in range(max_redirects + 1):
        # Call manager.get_topic() via proxy
        resp_data = await manager.get_topic(topic=topic)
        msg_type = resp_data.get("_type", "")

        if msg_type == "TopicReady":
            # Successfully got topic - resolve by actor name for typed proxy
            actor_name = f"_topic_broker_{topic}"
            return await TopicBroker.resolve(actor_name, system=system)

        elif msg_type == "Redirect":
            # owner_node_id transmitted as string, keep as string for comparison
            owner_node_id_str = str(resp_data["owner_node_id"])

            logger.debug(f"Redirecting topic {topic} to node {owner_node_id_str}")

            if redirect_count >= max_redirects:
                raise RuntimeError(f"Too many redirects for topic: {topic}")

            # A redirect pointing back at ourselves means ownership metadata
            # is inconsistent; bail out instead of looping forever.
            if owner_node_id_str == str(system.node_id.id):
                raise RuntimeError(f"Redirect loop for topic: {topic}")

            # Get owner node's StorageManager via proxy
            # Convert to int for resolve_named which expects int
            owner_node_id_int = int(owner_node_id_str)
            for retry in range(max_resolve_retries):
                try:
                    manager = await StorageManager.resolve(
                        STORAGE_MANAGER_NAME, system=system, node_id=owner_node_id_int
                    )
                    break
                except Exception as e:
                    if retry < max_resolve_retries - 1:
                        # Remote node may still be initializing; wait and retry.
                        logger.debug(
                            f"StorageManager not found on node {owner_node_id_str}, "
                            f"retry {retry + 1}/{max_resolve_retries}"
                        )
                        await asyncio.sleep(resolve_retry_delay)
                    else:
                        raise RuntimeError(
                            f"StorageManager not found on node {owner_node_id_str}: {e}"
                        ) from e

        else:
            raise RuntimeError(f"Unexpected response: {msg_type}")

    raise RuntimeError(f"Failed to get topic broker: {topic}")

list_backends()

List all available backends

Source code in pulsing/streaming/backend.py
def list_backends() -> list[str]:
    """List all available backends"""
    # Builtin backends first, then plugin-registered ones (dicts iterate keys).
    return [*_BUILTIN_BACKENDS, *_REGISTERED_BACKENDS]

read_queue(system, topic, bucket_id=None, bucket_ids=None, rank=None, world_size=None, num_buckets=4, storage_path=None, backend='memory', backend_options=None) async

Open queue for reading

Supports three modes: 1. Default: Read from all buckets 2. bucket_id/bucket_ids: Read from specified buckets 3. rank/world_size: Distributed consumption, auto-assign buckets

Parameters:

Name Type Description Default
system ActorSystem

Actor system

required
topic str

Queue topic

required
bucket_id int | None

Single bucket ID (mutually exclusive with bucket_ids)

None
bucket_ids list[int] | None

List of bucket IDs (mutually exclusive with bucket_id)

None
rank int | None

Current consumer's rank (0 to world_size-1)

None
world_size int | None

Total number of consumers

None
num_buckets int

Number of buckets (for rank/world_size mode)

4
storage_path str | None

Storage path

None
backend str | type

Storage backend (must match backend used for writing)

'memory'
backend_options dict[str, Any] | None

Additional backend parameters

None
Example

Distributed consumption: 4 buckets, 2 consumers

reader0 = await read_queue(system, "q", rank=0, world_size=2)  # buckets 0, 2
reader1 = await read_queue(system, "q", rank=1, world_size=2)  # buckets 1, 3

Source code in pulsing/streaming/queue.py
async def read_queue(
    system: ActorSystem,
    topic: str,
    bucket_id: int | None = None,
    bucket_ids: list[int] | None = None,
    rank: int | None = None,
    world_size: int | None = None,
    num_buckets: int = 4,
    storage_path: str | None = None,
    backend: str | type = "memory",
    backend_options: dict[str, Any] | None = None,
) -> QueueReader:
    """Open queue for reading

    Supports three modes:
    1. Default: Read from all buckets
    2. bucket_id/bucket_ids: Read from specified buckets
    3. rank/world_size: Distributed consumption, auto-assign buckets

    Args:
        system: Actor system
        topic: Queue topic
        bucket_id: Single bucket ID (mutually exclusive with bucket_ids)
        bucket_ids: List of bucket IDs (mutually exclusive with bucket_id)
        rank: Current consumer's rank (0 to world_size-1)
        world_size: Total number of consumers
        num_buckets: Number of buckets (for rank/world_size mode)
        storage_path: Storage path
        backend: Storage backend (must match backend used for writing)
        backend_options: Additional backend parameters

    Raises:
        ValueError: If both bucket_id and bucket_ids are given, or if only
            one of rank/world_size is given.

    Example:
        # Distributed consumption: 4 buckets, 2 consumers
        reader0 = await read_queue(system, "q", rank=0, world_size=2)  # bucket 0, 2
        reader1 = await read_queue(system, "q", rank=1, world_size=2)  # bucket 1, 3
    """
    # Enforce the documented mutual exclusivity instead of silently
    # preferring bucket_id over bucket_ids.
    if bucket_id is not None and bucket_ids is not None:
        raise ValueError("bucket_id and bucket_ids are mutually exclusive")
    # rank/world_size only make sense together; a lone value was previously
    # ignored silently, which hid caller mistakes.
    if (rank is None) != (world_size is None):
        raise ValueError("rank and world_size must be provided together")

    # Ensure all nodes in cluster have StorageManager
    from .manager import ensure_storage_managers

    await ensure_storage_managers(system)

    # Determine list of buckets to read from
    if rank is not None and world_size is not None:
        # Distributed consumption mode
        assigned_buckets = _assign_buckets(num_buckets, rank, world_size)
        logger.info(
            f"Reader rank={rank}/{world_size} assigned buckets: {assigned_buckets}"
        )
    elif bucket_id is not None:
        assigned_buckets = [bucket_id]
    elif bucket_ids is not None:
        assigned_buckets = bucket_ids
    else:
        assigned_buckets = None  # Read from all buckets

    # Create Queue (reader side doesn't need bucket_column for hashing, but it must
    # keep `num_buckets/storage_path/backend` consistent with writer).
    queue = Queue(
        system=system,
        topic=topic,
        num_buckets=num_buckets,
        storage_path=storage_path,
        backend=backend,
        backend_options=backend_options,
    )

    # Try to resolve existing bucket Actors
    if assigned_buckets:
        from .storage import BucketStorage

        for bid in assigned_buckets:
            # Must match `StorageManager` bucket actor naming: "bucket_{topic}_{bucket_id}"
            actor_name = f"bucket_{topic}_{bid}"
            try:
                # Use BucketStorage.resolve to get typed ActorProxy
                queue._bucket_refs[bid] = await BucketStorage.resolve(
                    actor_name, system=system
                )
            except Exception:
                # Bucket actor may not exist yet; reads will resolve it lazily.
                pass

    return QueueReader(queue, bucket_ids=assigned_buckets)

read_topic(system, topic, reader_id=None, auto_start=False) async

Open topic for reading

Parameters:

Name Type Description Default
system ActorSystem

Actor system

required
topic str

Topic name

required
reader_id str | None

Reader ID (optional)

None
auto_start bool

Whether to automatically start receiving

False

Returns:

Name Type Description
TopicReader TopicReader

Read handle

Example

reader = await read_topic(system, "events")

@reader.on_message
async def handle(msg):
    print(f"Received: {msg}")

await reader.start()

Source code in pulsing/streaming/pubsub.py
async def read_topic(
    system: ActorSystem,
    topic: str,
    reader_id: str | None = None,
    auto_start: bool = False,
) -> TopicReader:
    """Open topic for reading

    Args:
        system: Actor system
        topic: Topic name
        reader_id: Reader ID (optional)
        auto_start: Whether to automatically start receiving

    Returns:
        TopicReader: Read handle

    Example:
        reader = await read_topic(system, "events")

        @reader.on_message
        async def handle(msg):
            print(f"Received: {msg}")

        await reader.start()
    """
    handle = TopicReader(system, topic, reader_id)
    if not auto_start:
        return handle
    # Caller requested immediate consumption; start before handing back.
    await handle.start()
    return handle

register_backend(name, backend_class)

Register a custom storage backend (e.g. from a plugin package).

Example

from my_plugin import MyBackend
register_backend("my_backend", MyBackend)
writer = await write_queue(system, "topic", backend="my_backend")

Source code in pulsing/streaming/backend.py
def register_backend(name: str, backend_class: type) -> None:
    """Register a custom storage backend (e.g. from a plugin package).

    Example:
        from my_plugin import MyBackend
        register_backend("my_backend", MyBackend)
        writer = await write_queue(system, "topic", backend="my_backend")
    """
    if isinstance(backend_class, type):
        _REGISTERED_BACKENDS[name] = backend_class
        logger.info(f"Registered storage backend: {name}")
    else:
        # Reject instances/callables that are not classes.
        raise TypeError(f"backend_class must be a class, got {type(backend_class)}")

subscribe_to_topic(system, topic, subscriber_id, actor_name, node_id=None) async

Subscribe an actor to a topic.

This is a helper function for manually registering subscribers with a topic broker. For normal usage, prefer using TopicReader which handles this automatically.

Parameters:

Name Type Description Default
system ActorSystem

ActorSystem instance

required
topic str

Topic name

required
subscriber_id str

Unique subscriber identifier

required
actor_name str

Name of the actor to receive messages

required
node_id int | None

Optional node ID (defaults to local node)

None

Returns:

Type Description
dict

Response dict from broker

Raises:

Type Description
RuntimeError

If subscription fails

Source code in pulsing/streaming/pubsub.py
async def subscribe_to_topic(
    system: ActorSystem,
    topic: str,
    subscriber_id: str,
    actor_name: str,
    node_id: int | None = None,
) -> dict:
    """Subscribe an actor to a topic.

    This is a helper function for manually registering subscribers with a topic broker.
    For normal usage, prefer using TopicReader which handles this automatically.

    Args:
        system: ActorSystem instance
        topic: Topic name
        subscriber_id: Unique subscriber identifier
        actor_name: Name of the actor to receive messages
        node_id: Optional node ID (defaults to local node)

    Returns:
        Response dict from broker

    Raises:
        RuntimeError: If subscription fails
    """
    # Resolve the topic's broker, then register via a direct proxy method call.
    topic_broker = await _get_broker(system, topic)
    response = await topic_broker.subscribe(subscriber_id, actor_name, node_id)
    return response

write_queue(system, topic, bucket_column='id', num_buckets=4, batch_size=100, storage_path=None, backend='memory', backend_options=None) async

Open queue for writing

Parameters:

Name Type Description Default
system ActorSystem

Actor system

required
topic str

Queue topic

required
bucket_column str

Bucketing column name

'id'
num_buckets int

Number of buckets

4
batch_size int

Batch size

100
storage_path str | None

Storage path

None
backend str | type

Storage backend - "memory": Pure in-memory backend (default) - Custom: register_backend() or pass StorageBackend class

'memory'
backend_options dict[str, Any] | None

Additional backend parameters

None
Example

writer = await write_queue(system, "my_queue")

Custom backend from a plugin

from my_plugin import MyBackend
from .backend import register_backend
register_backend("my_backend", MyBackend)
writer = await write_queue(system, "my_queue", backend="my_backend")

Source code in pulsing/streaming/queue.py
async def write_queue(
    system: ActorSystem,
    topic: str,
    bucket_column: str = "id",
    num_buckets: int = 4,
    batch_size: int = 100,
    storage_path: str | None = None,
    backend: str | type = "memory",
    backend_options: dict[str, Any] | None = None,
) -> QueueWriter:
    """Open queue for writing

    Args:
        system: Actor system
        topic: Queue topic
        bucket_column: Bucketing column name
        num_buckets: Number of buckets
        batch_size: Batch size
        storage_path: Storage path
        backend: Storage backend
            - "memory": Pure in-memory backend (default)
            - Custom: register_backend() or pass StorageBackend class
        backend_options: Additional backend parameters

    Example:
        writer = await write_queue(system, "my_queue")

        # Custom backend from a plugin
        from my_plugin import MyBackend
        from .backend import register_backend
        register_backend("my_backend", MyBackend)
        writer = await write_queue(system, "my_queue", backend="my_backend")
    """
    # Make sure every node in the cluster runs a StorageManager before writing.
    from .manager import ensure_storage_managers

    await ensure_storage_managers(system)

    # Wrap a freshly configured Queue in its writer handle.
    return QueueWriter(
        Queue(
            system=system,
            topic=topic,
            bucket_column=bucket_column,
            num_buckets=num_buckets,
            batch_size=batch_size,
            storage_path=storage_path,
            backend=backend,
            backend_options=backend_options,
        )
    )

write_topic(system, topic, writer_id=None) async

Open topic for writing

Parameters:

Name Type Description Default
system ActorSystem

Actor system

required
topic str

Topic name

required
writer_id str | None

Writer ID (optional)

None

Returns:

Name Type Description
TopicWriter TopicWriter

Write handle

Example

writer = await write_topic(system, "events")
await writer.publish({"type": "user_login"})

Source code in pulsing/streaming/pubsub.py
async def write_topic(
    system: ActorSystem,
    topic: str,
    writer_id: str | None = None,
) -> TopicWriter:
    """Open topic for writing

    Args:
        system: Actor system
        topic: Topic name
        writer_id: Writer ID (optional)

    Returns:
        TopicWriter: Write handle

    Example:
        writer = await write_topic(system, "events")
        await writer.publish({"type": "user_login"})
    """
    # TopicWriter is a lightweight handle; nothing to await at creation time.
    handle = TopicWriter(system, topic, writer_id)
    return handle