Python API Reference

This page contains the complete auto-generated API documentation for Pulsing's Python interface.

Installation

Pulsing requires Python 3.10+ and can be installed via pip:

pip install pulsing

For development, clone the repository and install in development mode:

git clone https://github.com/DeepLink-org/pulsing
cd pulsing
pip install -e .

Core Module

Pulsing - Distributed Actor Framework

Usage

import pulsing as pul

await pul.init()

@pul.remote
class Counter:
    def __init__(self, init=0):
        self.value = init

    def incr(self):
        self.value += 1
        return self.value

counter = await Counter.spawn(name="counter")
result = await counter.incr()

await pul.shutdown()

Classes

Actor

Bases: ABC

Base class for Python actors. Implement receive to handle messages.

Functions

receive(msg) abstractmethod async

Handle incoming message.

Source code in pulsing/core/remote.py
@abstractmethod
async def receive(self, msg) -> Any:
    """Handle incoming message."""
    pass
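Because receive is just an async method, the message-handling contract can be exercised directly in a unit test, without spawning the actor. A minimal sketch; the EchoActor class and its dict-based message shape are illustrative, not part of Pulsing's API:

```python
import asyncio

# Illustrative actor: in real code it would subclass pulsing.Actor and be
# passed to spawn(). The only contract is an async receive(msg) method.
class EchoActor:
    async def receive(self, msg):
        # Dispatch on a simple dict message shape (an assumption made for
        # this sketch; Pulsing places no constraint on message types).
        if isinstance(msg, dict) and msg.get("cmd") == "ping":
            return "pong"
        return f"echo: {msg}"

# receive() can be driven directly with asyncio, no actor system needed:
reply = asyncio.run(EchoActor().receive({"cmd": "ping"}))
```

Testing the handler in isolation like this keeps actor logic decoupled from the runtime.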

ActorProxy(actor_ref, method_names=None, async_methods=None)

Actor proxy.

Source code in pulsing/core/proxy.py
def __init__(
    self,
    actor_ref: ActorRef,
    method_names: list[str] | None = None,
    async_methods: set[str] | None = None,
):
    self._ref = actor_ref
    self._method_names = set(method_names) if method_names else None
    self._async_methods = async_methods

Attributes

ref property

Get underlying ActorRef.

Functions

as_any()

Return an untyped proxy that forwards any method call to the remote actor.

Source code in pulsing/core/proxy.py
def as_any(self) -> "ActorProxy":
    """Return an untyped proxy that forwards any method call to the remote actor."""
    return ActorProxy(self._ref, method_names=None, async_methods=None)

ActorSystem(inner)

ActorSystem wrapper with queue/topic API

This wraps the Rust ActorSystem and adds Python-level extensions like queue and topic APIs.

Source code in pulsing/__init__.py
def __init__(self, inner: _ActorSystem):
    self._inner = inner
    from pulsing.streaming import QueueAPI, TopicAPI

    self.queue = QueueAPI(inner)
    self.topic = TopicAPI(inner)

Functions

refer(actorid) async

Get actor reference by ID

Parameters:

- actorid (ActorId | str, required): Actor ID (ActorId instance or string in format "node_id:local_id")

Returns:

- ActorRef: ActorRef to the actor

Source code in pulsing/__init__.py
async def refer(self, actorid: ActorId | str) -> ActorRef:
    """Get actor reference by ID

    Args:
        actorid: Actor ID (ActorId instance or string in format "node_id:local_id")

    Returns:
        ActorRef to the actor
    """
    if isinstance(actorid, str):
        actorid = ActorId.from_str(actorid)
    return await self._inner.refer(actorid)
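The string form accepted by refer follows the "node_id:local_id" layout documented above; in real code, ActorId.from_str performs the conversion. A hypothetical standalone parser, only to make the format concrete (parse_actor_id is not a Pulsing function, and integer components are an assumption based on the numeric node IDs used elsewhere on this page):

```python
# Hypothetical helper mirroring the documented "node_id:local_id" string
# form; the real conversion is done by ActorId.from_str(s).
def parse_actor_id(s: str) -> tuple[int, int]:
    node_id, local_id = s.split(":", 1)
    return int(node_id), int(local_id)

nid, lid = parse_actor_id("3:42")
```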

PulsingActorError(message, actor_name=None, cause=None)

Bases: PulsingError

User Actor execution errors.

This corresponds to pulsing_actor::error::ActorError in Rust.

These are errors raised by user code during Actor execution:

- Business errors (user input errors)
- System errors (internal errors from user code)
- Timeout errors (operation timeouts)
- Unsupported errors (unsupported operations)

Note: Framework-level errors like "Actor not found" are RuntimeError, not ActorError.

Source code in pulsing/exceptions.py
def __init__(
    self,
    message: str,
    actor_name: str | None = None,
    cause: Exception | None = None,
):
    super().__init__(message)
    self.actor_name = actor_name
    self.cause = cause

PulsingBusinessError(code, message, details=None)

Bases: PulsingActorError

Business error: User input error, business logic error.

These errors are recoverable and should be returned to the caller. Automatically converted to ActorError::Business in Rust.

Example

@remote
class UserActor:
    async def validate_age(self, age: int) -> bool:
        if age < 18:
            raise PulsingBusinessError(400, "Age must be >= 18", details="User validation failed")
        return True

Source code in pulsing/exceptions.py
def __init__(self, code: int, message: str, details: str | None = None):
    self.code = code
    self.message = message
    self.details = details
    super().__init__(f"[{code}] {message}", cause=None)
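Since every business error derives from PulsingActorError (and ultimately PulsingError), callers can catch at whichever level of the hierarchy fits. The sketch below uses minimal stand-in classes copied from the constructors documented on this page, so the catching pattern runs without Pulsing installed; use the real pulsing.exceptions classes in actual code:

```python
# Stand-ins mirroring the documented hierarchy (constructors copied from
# this page); illustrative only.
class PulsingError(Exception):
    pass

class PulsingActorError(PulsingError):
    def __init__(self, message, actor_name=None, cause=None):
        super().__init__(message)
        self.actor_name = actor_name
        self.cause = cause

class PulsingBusinessError(PulsingActorError):
    def __init__(self, code, message, details=None):
        self.code = code
        self.message = message
        self.details = details
        super().__init__(f"[{code}] {message}", cause=None)

def validate_age(age: int) -> bool:
    if age < 18:
        raise PulsingBusinessError(400, "Age must be >= 18")
    return True

# Catch at the most specific level and inspect the structured fields:
try:
    validate_age(15)
except PulsingBusinessError as e:
    outcome = (e.code, str(e))
```

Catching PulsingActorError instead would also cover system, timeout, and unsupported errors in one handler.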

PulsingError

Bases: Exception

Base exception for all Pulsing errors.

This corresponds to pulsing_actor::error::PulsingError in Rust.

PulsingRuntimeError(message, cause=None)

Bases: PulsingError

Framework/system-level errors.

This corresponds to pulsing_actor::error::RuntimeError in Rust.

These are framework-level errors, not caused by user code:

- Actor system errors (NotFound, Stopped, etc.)
- Transport errors (ConnectionFailed, etc.)
- Cluster errors (NodeNotFound, etc.)
- Config errors (InvalidValue, etc.)
- I/O errors
- Serialization errors

Source code in pulsing/exceptions.py
def __init__(self, message: str, cause: Exception | None = None):
    super().__init__(message)
    self.cause = cause

PulsingSystemError(error, recoverable=True)

Bases: PulsingActorError

System error: Internal error, resource error.

May trigger Actor restart depending on recoverable flag. Automatically converted to ActorError::System in Rust.

Example

@remote
class DataProcessor:
    async def process(self, data: str) -> str:
        try:
            return process_data(data)
        except Exception as e:
            raise PulsingSystemError(f"Processing failed: {e}", recoverable=True)

Source code in pulsing/exceptions.py
def __init__(self, error: str, recoverable: bool = True):
    self.error = error
    self.recoverable = recoverable
    super().__init__(error, cause=None)

PulsingTimeoutError(operation, duration_ms=0)

Bases: PulsingActorError

Timeout error: Operation timed out.

Usually recoverable, can be retried. Automatically converted to ActorError::Timeout in Rust.

Example

@remote
class NetworkActor:
    async def fetch(self, url: str) -> str:
        try:
            return await asyncio.wait_for(httpx.get(url), timeout=5.0)
        except asyncio.TimeoutError:
            raise PulsingTimeoutError("fetch", duration_ms=5000)

Source code in pulsing/exceptions.py
def __init__(self, operation: str, duration_ms: int = 0):
    self.operation = operation
    self.duration_ms = duration_ms
    super().__init__(
        f"Operation '{operation}' timed out after {duration_ms}ms", cause=None
    )

PulsingUnsupportedError(operation)

Bases: PulsingActorError

Unsupported operation error.

Not recoverable. Indicates that the requested operation is not supported. Automatically converted to ActorError::Unsupported in Rust.

Example

@remote
class LegacyActor:
    async def process(self, data: str) -> str:
        if data.startswith("legacy:"):
            raise PulsingUnsupportedError("process")
        return process_data(data)

Source code in pulsing/exceptions.py
def __init__(self, operation: str):
    self.operation = operation
    super().__init__(f"Unsupported operation: {operation}", cause=None)

Functions

actor_system(addr=None, *, seeds=None, passphrase=None) async

Create a new ActorSystem (does not set global system)

This is the Actor System style API for explicit system management. Use this when you need multiple systems or want explicit control.

Parameters:

- addr (str | None, default None): Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.
- seeds (list[str] | None, default None): Seed nodes to join cluster
- passphrase (str | None, default None): Enable TLS with this passphrase

Returns:

- ActorSystem: ActorSystem instance with .queue API

Example

import pulsing as pul

# Standalone mode
system = await pul.actor_system()

# Cluster mode
system = await pul.actor_system(addr="0.0.0.0:8000")

# Join existing cluster
system = await pul.actor_system(
    addr="0.0.0.0:8001",
    seeds=["192.168.1.1:8000"]
)

# With TLS
system = await pul.actor_system(
    addr="0.0.0.0:8000",
    passphrase="my-secret"
)

# Queue API
writer = await system.queue.write("my_topic")
reader = await system.queue.read("my_topic")

Source code in pulsing/__init__.py
async def actor_system(
    addr: str | None = None,
    *,
    seeds: list[str] | None = None,
    passphrase: str | None = None,
) -> ActorSystem:
    """Create a new ActorSystem (does not set global system)

    This is the Actor System style API for explicit system management.
    Use this when you need multiple systems or want explicit control.

    Args:
        addr: Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.
        seeds: Seed nodes to join cluster
        passphrase: Enable TLS with this passphrase

    Returns:
        ActorSystem instance with .queue API

    Example:
        import pulsing as pul

        # Standalone mode
        system = await pul.actor_system()

        # Cluster mode
        system = await pul.actor_system(addr="0.0.0.0:8000")

        # Join existing cluster
        system = await pul.actor_system(
            addr="0.0.0.0:8001",
            seeds=["192.168.1.1:8000"]
        )

        # With TLS
        system = await pul.actor_system(
            addr="0.0.0.0:8000",
            passphrase="my-secret"
        )

        # Queue API
        writer = await system.queue.write("my_topic")
        reader = await system.queue.read("my_topic")
    """
    # Build config
    if addr:
        config = SystemConfig.with_addr(addr)
    else:
        config = SystemConfig.standalone()

    if seeds:
        config = config.with_seeds(seeds)

    if passphrase:
        config = config.with_passphrase(passphrase)

    loop = asyncio.get_running_loop()
    inner = await _ActorSystem.create(config, loop)

    # Wrap with Python ActorSystem
    system = ActorSystem(inner)

    # Automatically register PythonActorService (for remote actor creation)
    service = _PythonActorService(inner)
    await inner.spawn(service, name=_PYTHON_ACTOR_SERVICE_NAME, public=True)

    return system

cleanup_ray()

Clean up Pulsing state in Ray KV store

Source code in pulsing/__init__.py
def cleanup_ray():
    """Clean up Pulsing state in Ray KV store"""
    from pulsing.integrations.ray import cleanup

    return cleanup()

get_system()

Get the global actor system (must call init() first)

Source code in pulsing/core/__init__.py
def get_system() -> ActorSystem:
    """Get the global actor system (must call init() first)"""
    if _global_system is None:
        from pulsing.exceptions import PulsingRuntimeError

        raise PulsingRuntimeError(
            "Actor system not initialized. Call 'await init()' first."
        )
    return _global_system

init(addr=None, *, seeds=None, passphrase=None, head_addr=None, is_head_node=False) async

Initialize Pulsing actor system

Parameters:

- addr (str, default None): Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.
- seeds (list[str], default None): Seed nodes to join cluster (Gossip mode).
- passphrase (str, default None): Enable TLS with this passphrase.
- head_addr (str, default None): Address of head node (worker mode). Mutually exclusive with is_head_node.
- is_head_node (bool, default False): If True, this node runs as head. Mutually exclusive with head_addr.

Returns:

- ActorSystem: ActorSystem instance

Source code in pulsing/core/__init__.py
async def init(
    addr: str = None,
    *,
    seeds: list[str] = None,
    passphrase: str = None,
    head_addr: str = None,
    is_head_node: bool = False,
) -> ActorSystem:
    """Initialize Pulsing actor system

    Args:
        addr: Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.
        seeds: Seed nodes to join cluster (Gossip mode).
        passphrase: Enable TLS with this passphrase.
        head_addr: Address of head node (worker mode). Mutually exclusive with is_head_node.
        is_head_node: If True, this node runs as head. Mutually exclusive with head_addr.

    Returns:
        ActorSystem instance
    """
    global _global_system

    if _global_system is not None:
        return _global_system

    if is_head_node and head_addr:
        raise ValueError("Cannot set both is_head_node and head_addr")

    if addr:
        config = SystemConfig.with_addr(addr)
    else:
        config = SystemConfig.standalone()

    if seeds:
        config = config.with_seeds(seeds)
    if is_head_node:
        config = config.with_head_node()
    elif head_addr:
        config = config.with_head_addr(head_addr)

    if passphrase:
        config = config.with_passphrase(passphrase)

    loop = asyncio.get_running_loop()
    _global_system = await ActorSystem.create(config, loop)

    service = PythonActorService(_global_system)
    await _global_system.spawn(service, name=PYTHON_ACTOR_SERVICE_NAME, public=True)

    # Notify pulsing.subprocess of the loop so sync callers can submit coroutines
    try:
        from pulsing.subprocess.popen import _set_pulsing_loop

        _set_pulsing_loop(loop)
    except ImportError:
        pass

    return _global_system

init_inside_ray()

Initialize Pulsing in Ray worker and join cluster (async version).

Usage::

await pul.init_inside_ray()
Source code in pulsing/__init__.py
def init_inside_ray():
    """Initialize Pulsing in Ray worker and join cluster (async version).

    Usage::

        await pul.init_inside_ray()
    """
    from pulsing.integrations.ray import async_init_in_ray

    return async_init_in_ray()

init_inside_torchrun()

Initialize Pulsing in current process and join cluster via torch.distributed.

Rank 0 becomes the seed; others join with seeds=[rank0_addr]. Call after torch.distributed.init_process_group() (e.g. when launched with torchrun).

Usage::

import torch.distributed as dist
dist.init_process_group(...)
system = pul.init_inside_torchrun()
Source code in pulsing/__init__.py
def init_inside_torchrun():
    """Initialize Pulsing in current process and join cluster via torch.distributed.

    Rank 0 becomes the seed; others join with seeds=[rank0_addr]. Call after
    torch.distributed.init_process_group() (e.g. when launched with torchrun).

    Usage::

        import torch.distributed as dist
        dist.init_process_group(...)
        system = pul.init_inside_torchrun()
    """
    from pulsing.integrations.torchrun import init_in_torchrun

    return init_in_torchrun()

is_initialized()

Check if the global actor system is initialized

Source code in pulsing/core/__init__.py
def is_initialized() -> bool:
    """Check if the global actor system is initialized"""
    return _global_system is not None

mount(instance, *, name, public=True)

Mount an existing Python object to the Pulsing communication network.

A synchronous interface that can be called in __init__. Automatically:

1. Initializes Pulsing (if not already; auto-detects a Ray environment)
2. Wraps the instance as a Pulsing actor
3. Registers it to the Pulsing network — other nodes can discover it via pul.resolve(name)

Parameters:

- instance (Any, required): Object to mount (any Python instance)
- name (str, required): Pulsing name, other nodes resolve via this name
- public (bool, default True): Whether discoverable by other cluster nodes

Example::

@ray.remote
class Counter:
    def __init__(self, name, peers):
        self.name = name
        pul.mount(self, name=name)

    async def greet(self, msg):
        return f"Hello from {self.name}: {msg}"
Source code in pulsing/core/helpers.py
def mount(instance: Any, *, name: str, public: bool = True) -> None:
    """Mount an existing Python object to the Pulsing communication network.

    Synchronous interface, can be called in ``__init__``. Automatically:
      1. Initialize Pulsing (if not already, auto-detects Ray environment)
      2. Wrap instance as a Pulsing actor
      3. Register to Pulsing network — other nodes can discover via ``pul.resolve(name)``

    Args:
        instance: Object to mount (any Python instance)
        name: Pulsing name, other nodes resolve via this name
        public: Whether discoverable by other cluster nodes (default True)

    Example::

        @ray.remote
        class Counter:
            def __init__(self, name, peers):
                self.name = name
                pul.mount(self, name=name)

            async def greet(self, msg):
                return f"Hello from {self.name}: {msg}"
    """
    from . import _global_system

    if _global_system is None:
        _auto_init_pulsing()

    from . import _global_system as system

    if system is None:
        raise RuntimeError(
            "Pulsing initialization failed. Please call pul.init() or run in Ray environment."
        )

    from .remote import _WrappedActor, _register_actor_metadata

    actor_name = name if "/" in name else f"actors/{name}"
    wrapped = _WrappedActor(instance)

    async def _do_mount():
        return await system.spawn(wrapped, name=actor_name, public=public)

    actor_ref = run_sync(_do_mount())
    wrapped._inject_delayed(actor_ref)
    _register_actor_metadata(actor_name, type(instance))

refer(actorid) async

Get actor reference by ID using global system

Parameters:

- actorid (ActorId | str, required): Actor ID (ActorId instance or string)

Returns:

- ActorRef: ActorRef to the actor

Source code in pulsing/__init__.py
async def refer(actorid: ActorId | str) -> ActorRef:
    """Get actor reference by ID using global system

    Args:
        actorid: Actor ID (ActorId instance or string)

    Returns:
        ActorRef to the actor
    """
    system = get_system()
    if isinstance(actorid, str):
        # Parse string to ActorId
        actorid = ActorId.from_str(actorid)
    if isinstance(actorid, int):
        actorid = ActorId(actorid)
    return await system.refer(actorid)

resolve(name, *, cls=None, node_id=None, timeout=None) async

Resolve a named actor and return a ready-to-use proxy.

Parameters:

- name (str, required): Actor name to resolve.
- cls (type | None, default None): Optional class for typed proxy (validates method names). If omitted, returns an untyped proxy that accepts any method call.
- node_id (int | None, default None): Target node ID (None = any node via load balancing).
- timeout (float | None, default None): Retry timeout in seconds for waiting on gossip propagation.

Examples::

proxy = await pul.resolve("counter", cls=Counter, timeout=30)
result = await proxy.incr()

proxy = await pul.resolve("service")
result = await proxy.some_method()
Source code in pulsing/core/remote.py
async def resolve(
    name: str,
    *,
    cls: type | None = None,
    node_id: int | None = None,
    timeout: float | None = None,
) -> ActorProxy:
    """Resolve a named actor and return a ready-to-use proxy.

    Args:
        name: Actor name to resolve.
        cls: Optional class for typed proxy (validates method names).
             If omitted, returns an untyped proxy that accepts any method call.
        node_id: Target node ID (None = any node via load balancing).
        timeout: Retry timeout in seconds for waiting on gossip propagation.

    Examples::

        proxy = await pul.resolve("counter", cls=Counter, timeout=30)
        result = await proxy.incr()

        proxy = await pul.resolve("service")
        result = await proxy.some_method()
    """
    from . import get_system

    ref = await get_system().resolve(name, node_id=node_id, timeout=timeout)
    if cls is not None:
        methods, async_methods = _extract_methods(cls)
        return ActorProxy(ref, methods, async_methods)
    return ActorProxy(ref)

shutdown() async

Shutdown the global actor system

Source code in pulsing/core/__init__.py
async def shutdown() -> None:
    """Shutdown the global actor system"""
    global _global_system

    if _global_system is not None:
        await _global_system.shutdown()
        _global_system = None

spawn(actor, *, name=None, public=False, restart_policy='never', max_restarts=3, min_backoff=0.1, max_backoff=30.0) async

Spawn an actor using the global system

Parameters:

Name Type Description Default
actor Any

Actor instance

required
name str | None

Actor name (auto-generated if None)

None
public bool

Whether to register as public named actor

False
restart_policy str

Restart policy ("never", "always", "on-failure")

'never'
max_restarts int

Maximum restart attempts

3
min_backoff float

Minimum backoff seconds

0.1
max_backoff float

Maximum backoff seconds

30.0

Returns:

Type Description
ActorRef

ActorRef to the spawned actor

Example

import pulsing as pul

await pul.init()

class MyActor:
    async def receive(self, msg):
        return f"Got: {msg}"

ref = await pul.spawn(MyActor(), name="my_actor")

Source code in pulsing/__init__.py
async def spawn(
    actor: Any,
    *,
    name: str | None = None,
    public: bool = False,
    restart_policy: str = "never",
    max_restarts: int = 3,
    min_backoff: float = 0.1,
    max_backoff: float = 30.0,
) -> ActorRef:
    """Spawn an actor using the global system

    Args:
        actor: Actor instance
        name: Actor name (auto-generated if None)
        public: Whether to register as public named actor
        restart_policy: Restart policy ("never", "always", "on-failure")
        max_restarts: Maximum restart attempts
        min_backoff: Minimum backoff seconds
        max_backoff: Maximum backoff seconds

    Returns:
        ActorRef to the spawned actor

    Example:
        import pulsing as pul

        await pul.init()

        class MyActor:
            async def receive(self, msg):
                return f"Got: {msg}"

        ref = await pul.spawn(MyActor(), name="my_actor")
    """
    system = get_system()
    return await system.spawn(
        actor,
        name=name,
        public=public,
        restart_policy=restart_policy,
        max_restarts=max_restarts,
        min_backoff=min_backoff,
        max_backoff=max_backoff,
    )

unmount(name)

Unmount a previously mounted actor from the Pulsing network.

Parameters:

- name (str, required): Name used during mounting
Source code in pulsing/core/helpers.py
def unmount(name: str) -> None:
    """Unmount a previously mounted actor from the Pulsing network.

    Args:
        name: Name used during mounting
    """
    from . import _global_system

    if _global_system is None:
        return

    actor_name = name if "/" in name else f"actors/{name}"

    async def _do_unmount():
        await _global_system.stop(actor_name)

    run_sync(_do_unmount())
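Both mount and unmount normalize bare names under an actors/ prefix (visible in the two sources above), so the pair agrees on the registered name. The rule in isolation, as a runnable sketch:

```python
# Name normalization used by mount()/unmount() (copied from the sources
# above): bare names are placed under "actors/", qualified names kept.
def qualify(name: str) -> str:
    return name if "/" in name else f"actors/{name}"
```

This means mount(obj, name="counter") and unmount("actors/counter") refer to the same actor.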

Actor Module

Pulsing Core - Python bindings for distributed actor framework

Simple API

from pulsing.core import init, shutdown, remote

await init()

@remote
class Counter:
    def __init__(self, init=0):
        self.value = init

    def incr(self):
        self.value += 1
        return self.value

counter = await Counter.spawn(init=10)
result = await counter.incr()

await shutdown()

Classes

Actor

Bases: ABC

Base class for Python actors. Implement receive to handle messages.

Functions

receive(msg) abstractmethod async

Handle incoming message.

Source code in pulsing/core/remote.py
@abstractmethod
async def receive(self, msg) -> Any:
    """Handle incoming message."""
    pass

ActorProxy(actor_ref, method_names=None, async_methods=None)

Actor proxy.

Source code in pulsing/core/proxy.py
def __init__(
    self,
    actor_ref: ActorRef,
    method_names: list[str] | None = None,
    async_methods: set[str] | None = None,
):
    self._ref = actor_ref
    self._method_names = set(method_names) if method_names else None
    self._async_methods = async_methods

Attributes

ref property

Get underlying ActorRef.

Functions

as_any()

Return an untyped proxy that forwards any method call to the remote actor.

Source code in pulsing/core/proxy.py
def as_any(self) -> "ActorProxy":
    """Return an untyped proxy that forwards any method call to the remote actor."""
    return ActorProxy(self._ref, method_names=None, async_methods=None)

PulsingActorError(message, actor_name=None, cause=None)

Bases: PulsingError

User Actor execution errors.

This corresponds to pulsing_actor::error::ActorError in Rust.

These are errors raised by user code during Actor execution:

- Business errors (user input errors)
- System errors (internal errors from user code)
- Timeout errors (operation timeouts)
- Unsupported errors (unsupported operations)

Note: Framework-level errors like "Actor not found" are RuntimeError, not ActorError.

Source code in pulsing/exceptions.py
def __init__(
    self,
    message: str,
    actor_name: str | None = None,
    cause: Exception | None = None,
):
    super().__init__(message)
    self.actor_name = actor_name
    self.cause = cause

PulsingError

Bases: Exception

Base exception for all Pulsing errors.

This corresponds to pulsing_actor::error::PulsingError in Rust.

PulsingRuntimeError(message, cause=None)

Bases: PulsingError

Framework/system-level errors.

This corresponds to pulsing_actor::error::RuntimeError in Rust.

These are framework-level errors, not caused by user code:

- Actor system errors (NotFound, Stopped, etc.)
- Transport errors (ConnectionFailed, etc.)
- Cluster errors (NodeNotFound, etc.)
- Config errors (InvalidValue, etc.)
- I/O errors
- Serialization errors

Source code in pulsing/exceptions.py
def __init__(self, message: str, cause: Exception | None = None):
    super().__init__(message)
    self.cause = cause

SystemActorProxy(actor_ref)

Proxy for SystemActor with direct method calls.

Example

system_proxy = await get_system_actor(system)
actors = await system_proxy.list_actors()
metrics = await system_proxy.get_metrics()
await system_proxy.ping()

Source code in pulsing/core/service.py
def __init__(self, actor_ref: ActorRef):
    self._ref = actor_ref

Attributes

ref property

Get underlying ActorRef.

Functions

get_metrics() async

Get system metrics.

Source code in pulsing/core/service.py
async def get_metrics(self) -> dict:
    """Get system metrics."""
    return await self._ask("GetMetrics")

get_node_info() async

Get node info.

Source code in pulsing/core/service.py
async def get_node_info(self) -> dict:
    """Get node info."""
    return await self._ask("GetNodeInfo")

health_check() async

Health check.

Source code in pulsing/core/service.py
async def health_check(self) -> dict:
    """Health check."""
    return await self._ask("HealthCheck")

list_actors() async

List all actors on this node.

Source code in pulsing/core/service.py
async def list_actors(self) -> list[dict]:
    """List all actors on this node."""
    data = await self._ask("ListActors")
    if data.get("type") == "Error":
        raise PulsingRuntimeError(data.get("message"))
    return data.get("actors", [])

ping() async

Ping this node.

Source code in pulsing/core/service.py
async def ping(self) -> dict:
    """Ping this node."""
    return await self._ask("Ping")

Functions

get_system()

Get the global actor system (must call init() first)

Source code in pulsing/core/__init__.py
def get_system() -> ActorSystem:
    """Get the global actor system (must call init() first)"""
    if _global_system is None:
        from pulsing.exceptions import PulsingRuntimeError

        raise PulsingRuntimeError(
            "Actor system not initialized. Call 'await init()' first."
        )
    return _global_system

get_system_actor(system, node_id=None) async

Get SystemActorProxy for direct method calls.

Source code in pulsing/core/service.py
async def get_system_actor(
    system: ActorSystem, node_id: int | None = None
) -> SystemActorProxy:
    """Get SystemActorProxy for direct method calls."""
    if node_id is None:
        actor_ref = await system.system()
    else:
        actor_ref = await system.remote_system(node_id)
    return SystemActorProxy(actor_ref)

init(addr=None, *, seeds=None, passphrase=None, head_addr=None, is_head_node=False) async

Initialize Pulsing actor system

Parameters:

- addr (str, default None): Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.
- seeds (list[str], default None): Seed nodes to join cluster (Gossip mode).
- passphrase (str, default None): Enable TLS with this passphrase.
- head_addr (str, default None): Address of head node (worker mode). Mutually exclusive with is_head_node.
- is_head_node (bool, default False): If True, this node runs as head. Mutually exclusive with head_addr.

Returns:

- ActorSystem: ActorSystem instance

Source code in pulsing/core/__init__.py
async def init(
    addr: str = None,
    *,
    seeds: list[str] = None,
    passphrase: str = None,
    head_addr: str = None,
    is_head_node: bool = False,
) -> ActorSystem:
    """Initialize Pulsing actor system

    Args:
        addr: Bind address (e.g., "0.0.0.0:8000"). None for standalone mode.
        seeds: Seed nodes to join cluster (Gossip mode).
        passphrase: Enable TLS with this passphrase.
        head_addr: Address of head node (worker mode). Mutually exclusive with is_head_node.
        is_head_node: If True, this node runs as head. Mutually exclusive with head_addr.

    Returns:
        ActorSystem instance
    """
    global _global_system

    if _global_system is not None:
        return _global_system

    if is_head_node and head_addr:
        raise ValueError("Cannot set both is_head_node and head_addr")

    if addr:
        config = SystemConfig.with_addr(addr)
    else:
        config = SystemConfig.standalone()

    if seeds:
        config = config.with_seeds(seeds)
    if is_head_node:
        config = config.with_head_node()
    elif head_addr:
        config = config.with_head_addr(head_addr)

    if passphrase:
        config = config.with_passphrase(passphrase)

    loop = asyncio.get_running_loop()
    _global_system = await ActorSystem.create(config, loop)

    service = PythonActorService(_global_system)
    await _global_system.spawn(service, name=PYTHON_ACTOR_SERVICE_NAME, public=True)

    # Notify pulsing.subprocess of the loop so sync callers can submit coroutines
    try:
        from pulsing.subprocess.popen import _set_pulsing_loop

        _set_pulsing_loop(loop)
    except ImportError:
        pass

    return _global_system

is_initialized()

Check if the global actor system is initialized

Source code in pulsing/core/__init__.py
def is_initialized() -> bool:
    """Check if the global actor system is initialized"""
    return _global_system is not None

mount(instance, *, name, public=True)

Mount an existing Python object to the Pulsing communication network.

A synchronous interface that can be called in __init__. Automatically:

1. Initializes Pulsing (if not already; auto-detects a Ray environment)
2. Wraps the instance as a Pulsing actor
3. Registers it to the Pulsing network — other nodes can discover it via pul.resolve(name)

Parameters:

- instance (Any, required): Object to mount (any Python instance)
- name (str, required): Pulsing name, other nodes resolve via this name
- public (bool, default True): Whether discoverable by other cluster nodes

Example::

@ray.remote
class Counter:
    def __init__(self, name, peers):
        self.name = name
        pul.mount(self, name=name)

    async def greet(self, msg):
        return f"Hello from {self.name}: {msg}"
Source code in pulsing/core/helpers.py
def mount(instance: Any, *, name: str, public: bool = True) -> None:
    """Mount an existing Python object to the Pulsing communication network.

    Synchronous interface, can be called in ``__init__``. Automatically:
      1. Initialize Pulsing (if not already, auto-detects Ray environment)
      2. Wrap instance as a Pulsing actor
      3. Register to Pulsing network — other nodes can discover via ``pul.resolve(name)``

    Args:
        instance: Object to mount (any Python instance)
        name: Pulsing name, other nodes resolve via this name
        public: Whether discoverable by other cluster nodes (default True)

    Example::

        @ray.remote
        class Counter:
            def __init__(self, name, peers):
                self.name = name
                pul.mount(self, name=name)

            async def greet(self, msg):
                return f"Hello from {self.name}: {msg}"
    """
    from . import _global_system

    if _global_system is None:
        _auto_init_pulsing()

    from . import _global_system as system

    if system is None:
        raise RuntimeError(
            "Pulsing initialization failed. Please call pul.init() or run in Ray environment."
        )

    from .remote import _WrappedActor, _register_actor_metadata

    actor_name = name if "/" in name else f"actors/{name}"
    wrapped = _WrappedActor(instance)

    async def _do_mount():
        return await system.spawn(wrapped, name=actor_name, public=public)

    actor_ref = run_sync(_do_mount())
    wrapped._inject_delayed(actor_ref)
    _register_actor_metadata(actor_name, type(instance))

resolve(name, *, cls=None, node_id=None, timeout=None) async

Resolve a named actor and return a ready-to-use proxy.

Parameters:

Name Type Description Default
name str

Actor name to resolve.

required
cls type | None

Optional class for typed proxy (validates method names). If omitted, returns an untyped proxy that accepts any method call.

None
node_id int | None

Target node ID (None = any node via load balancing).

None
timeout float | None

Retry timeout in seconds for waiting on gossip propagation.

None

Examples::

proxy = await pul.resolve("counter", cls=Counter, timeout=30)
result = await proxy.incr()

proxy = await pul.resolve("service")
result = await proxy.some_method()
Source code in pulsing/core/remote.py
async def resolve(
    name: str,
    *,
    cls: type | None = None,
    node_id: int | None = None,
    timeout: float | None = None,
) -> ActorProxy:
    """Resolve a named actor and return a ready-to-use proxy.

    Args:
        name: Actor name to resolve.
        cls: Optional class for typed proxy (validates method names).
             If omitted, returns an untyped proxy that accepts any method call.
        node_id: Target node ID (None = any node via load balancing).
        timeout: Retry timeout in seconds for waiting on gossip propagation.

    Examples::

        proxy = await pul.resolve("counter", cls=Counter, timeout=30)
        result = await proxy.incr()

        proxy = await pul.resolve("service")
        result = await proxy.some_method()
    """
    from . import get_system

    ref = await get_system().resolve(name, node_id=node_id, timeout=timeout)
    if cls is not None:
        methods, async_methods = _extract_methods(cls)
        return ActorProxy(ref, methods, async_methods)
    return ActorProxy(ref)

shutdown() async

Shutdown the global actor system

Source code in pulsing/core/__init__.py
async def shutdown() -> None:
    """Shutdown the global actor system"""
    global _global_system

    if _global_system is not None:
        await _global_system.shutdown()
        _global_system = None

unmount(name)

Unmount a previously mounted actor from the Pulsing network.

Parameters:

Name Type Description Default
name str

Name used during mounting

required
Source code in pulsing/core/helpers.py
def unmount(name: str) -> None:
    """Unmount a previously mounted actor from the Pulsing network.

    Args:
        name: Name used during mounting
    """
    from . import _global_system

    if _global_system is None:
        return

    actor_name = name if "/" in name else f"actors/{name}"

    async def _do_unmount():
        await _global_system.stop(actor_name)

    run_sync(_do_unmount())

Agent Module

Pulsing Agent Toolbox

Lightweight multi-agent development tools, fully compatible with pulsing.core.

Core APIs:
- runtime(): Actor system lifecycle management
- agent(): @remote with metadata (for visualization)
- llm(): LLM client
- parse_json(): JSON parsing

Example

from pulsing.core import remote, resolve
from pulsing.agent import agent, runtime, llm, get_agent_meta

@remote: Basic Actor

@remote
class Worker:
    async def work(self): ...

@agent: Actor with metadata (for visualization/debugging)

@agent(role="Researcher", goal="Deep analysis")
class Researcher:
    async def analyze(self, topic: str) -> str:
        client = await llm()
        return await client.ainvoke(f"Analyze: {topic}")

async def main():
    async with runtime():
        r = await Researcher.spawn(name="researcher")
        result = await r.analyze("AI")

        # Get metadata
        meta = get_agent_meta("researcher")
        print(meta.role)  # "Researcher"

Classes

AgentMeta(role='', goal='', backstory='', tags=dict()) dataclass

Agent metadata

Functions

agent(role='', goal='', backstory='', **tags)

Agent decorator - equivalent to @remote, but with additional metadata

Metadata can be retrieved via get_agent_meta(name).

Example

@agent(role="Researcher", goal="Deep analysis")
class Researcher:
    async def analyze(self, topic: str) -> str:
        ...

async with runtime():
    r = await Researcher.spawn(name="researcher")

    # Get metadata
    meta = get_agent_meta("researcher")
    print(meta.role)  # "Researcher"
Source code in pulsing/agent/base.py
def agent(
    role: str = "",
    goal: str = "",
    backstory: str = "",
    **tags: Any,
) -> Callable[[type[T]], type[T]]:
    """
    Agent decorator - equivalent to @remote, but with additional metadata

    Metadata can be retrieved via get_agent_meta(name).

    Example:
        @agent(role="Researcher", goal="Deep analysis")
        class Researcher:
            async def analyze(self, topic: str) -> str:
                ...

        async with runtime():
            r = await Researcher.spawn(name="researcher")

            # Get metadata
            meta = get_agent_meta("researcher")
            print(meta.role)  # "Researcher"
    """

    def decorator(cls: type[T]) -> type[T]:
        # Create metadata
        meta = AgentMeta(role=role, goal=goal, backstory=backstory, tags=tags)

        # Store on class (for use during spawn)
        cls._agent_meta_template = meta  # type: ignore

        # Wrap spawn method to register metadata
        actor_cls = remote(cls)
        original_spawn = actor_cls.spawn

        async def spawn_with_meta(*args: Any, name: str | None = None, **kwargs: Any):
            proxy = await original_spawn(*args, name=name, **kwargs)
            # Register metadata
            if name:
                _agent_meta_registry[name] = meta
            return proxy

        actor_cls.spawn = spawn_with_meta
        actor_cls.__agent_meta__ = meta  # Also accessible at class level

        return actor_cls

    return decorator

cleanup()

Clean up all agent-related global state.

Includes:
- Agent metadata registry
- LLM singleton

Calling this is recommended when repeatedly creating and destroying runtimes, to avoid memory leaks.

Example

from pulsing.agent import runtime, cleanup

try:
    async with runtime():
        agent = await MyAgent.spawn(name="agent")
        await agent.work()
finally:
    cleanup()  # Clean up all global state

Source code in pulsing/agent/__init__.py
def cleanup():
    """
    Clean up all agent-related global state.

    Includes:
    - Agent metadata registry
    - LLM singleton

    Recommended to call when repeatedly creating/destroying runtime to avoid memory leaks.

    Example:
        from pulsing.agent import runtime, cleanup

        try:
            async with runtime():
                agent = await MyAgent.spawn(name="agent")
                await agent.work()
        finally:
            cleanup()  # Clean up all global state
    """
    clear_agent_registry()
    reset_llm()

clear_agent_registry()

Clear registry (for testing)

Source code in pulsing/agent/base.py
def clear_agent_registry() -> None:
    """Clear registry (for testing)"""
    _agent_meta_registry.clear()

extract_field(content, field, fallback=None)

Extract specified field from JSON output.

Example

response = '{"score": 8, "reason": "good"}'
score = extract_field(response, "score", fallback=0)  # 8

Source code in pulsing/agent/utils.py
def extract_field(content: str | None, field: str, fallback: Any = None) -> Any:
    """
    Extract specified field from JSON output.

    Example:
        response = '{"score": 8, "reason": "good"}'
        score = extract_field(response, "score", fallback=0)  # 8
    """
    data = parse_json(content, {})
    if isinstance(data, dict):
        return data.get(field, fallback)
    return fallback

get_agent_meta(name)

Get metadata by actor name

Source code in pulsing/agent/base.py
def get_agent_meta(name: str) -> AgentMeta | None:
    """Get metadata by actor name"""
    return _agent_meta_registry.get(name)

list_agents()

List all registered agents and their metadata

Source code in pulsing/agent/base.py
def list_agents() -> dict[str, AgentMeta]:
    """List all registered agents and their metadata"""
    return _agent_meta_registry.copy()

parse_json(content, fallback=None)

Parse JSON from LLM output.

Supports:
- Pure JSON strings
- Content wrapped in ```json ... ``` code blocks
- Returns fallback on parse failure

Example

from pulsing.agent import parse_json

# Pure JSON
data = parse_json('{"name": "test"}')

# Code block wrapped
data = parse_json('```json\n{"name": "test"}\n```')

# Parse failure
data = parse_json('invalid', fallback={})  # Returns {}

Source code in pulsing/agent/utils.py
def parse_json(content: str | None, fallback: Any = None) -> Any:
    """
    Parse JSON from LLM output.

    Supports:
    - Pure JSON strings
    - ```json ... ``` code block wrapped
    - Returns fallback on parse failure

    Example:
        from pulsing.agent import parse_json

        # Pure JSON
        data = parse_json('{"name": "test"}')

        # Code block wrapped
        data = parse_json('```json\\n{"name": "test"}\\n```')

        # Parse failure
        data = parse_json('invalid', fallback={})  # Returns {}
    """
    if content is None:
        return fallback

    text = str(content).strip()
    if not text:
        return fallback

    # Handle ```json ... ``` wrapping
    if text.startswith("```"):
        parts = text.split("```")
        if len(parts) >= 2:
            text = parts[1].lstrip()
            if text.lower().startswith("json"):
                text = text[4:].lstrip()

    try:
        return json.loads(text)
    except (json.JSONDecodeError, ValueError):
        return fallback

reset_llm()

Reset LLM singleton

Source code in pulsing/agent/llm.py
def reset_llm():
    """Reset LLM singleton"""
    global _default_llm
    _default_llm = None

Queue Module

Streaming - Queue (point-to-point) and Pub/Sub (topic) APIs

Queue

writer = await system.streaming.write("my_queue")  # or system.queue
reader = await system.streaming.read("my_queue")

Topic

writer = await system.topic.write("events")
reader = await system.topic.read("events")

Classes

BucketStorage(bucket_id, storage_path, batch_size=100, backend='memory', backend_options=None)

Storage Actor for a Single Bucket

Uses pluggable StorageBackend for data storage.

Parameters:

Name Type Description Default
bucket_id int

Bucket ID

required
storage_path str

Storage path

required
batch_size int

Batch size

100
backend str | type

Backend name or backend class:
- "memory": Pure in-memory backend (default)
- Custom name/class: Use register_backend() or pass a class

'memory'
backend_options dict[str, Any] | None

Additional parameters passed to backend

None
Source code in pulsing/streaming/storage.py
def __init__(
    self,
    bucket_id: int,
    storage_path: str,
    batch_size: int = 100,
    backend: str | type = "memory",
    backend_options: dict[str, Any] | None = None,
):
    self.bucket_id = bucket_id
    self.storage_path = storage_path
    self.batch_size = batch_size
    self._backend_type = backend
    self._backend_options = backend_options or {}

    # Backend instance (initialized in on_start)
    self._backend: StorageBackend | None = None
    # Typed extension references — set once in on_start via isinstance checks
    self._tensor_backend: TensorBackend | None = None
    self._consumption_backend: ConsumptionBackend | None = None
    self._production_status: dict[int, dict[str, str]] = {}
    self._consumption_status: dict[str, set[int]] = {}
    self._key_to_index: dict[str, int] = {}

Functions

flush() async

Flush pending writes.

Returns:

Type Description
dict

{"status": "ok"}

Source code in pulsing/streaming/storage.py
async def flush(self) -> dict:
    """Flush pending writes.

    Returns:
        {"status": "ok"}
    """
    await self._backend.flush()
    return {"status": "ok"}
get(limit=100, offset=0) async

Get records.

Parameters:

Name Type Description Default
limit int

Maximum number of records to return

100
offset int

Starting offset

0

Returns:

Type Description
list[dict]

List of records

Source code in pulsing/streaming/storage.py
async def get(self, limit: int = 100, offset: int = 0) -> list[dict]:
    """Get records.

    Args:
        limit: Maximum number of records to return
        offset: Starting offset

    Returns:
        List of records
    """
    return await self._backend.get(limit, offset)
get_stream(limit=100, offset=0, wait=False, timeout=None) async

Get records as a stream.

Parameters:

Name Type Description Default
limit int

Maximum number of records to return

100
offset int

Starting offset

0
wait bool

Whether to wait for new records

False
timeout float | None

Timeout in seconds

None

Yields:

Type Description
AsyncIterator[list[dict]]

Batches of records

Source code in pulsing/streaming/storage.py
async def get_stream(
    self,
    limit: int = 100,
    offset: int = 0,
    wait: bool = False,
    timeout: float | None = None,
) -> AsyncIterator[list[dict]]:
    """Get records as a stream.

    Args:
        limit: Maximum number of records to return
        offset: Starting offset
        wait: Whether to wait for new records
        timeout: Timeout in seconds

    Yields:
        Batches of records
    """
    async for records in self._backend.get_stream(limit, offset, wait, timeout):
        yield records
put(record) async

Put a single record.

Parameters:

Name Type Description Default
record dict

Record to store

required

Returns:

Type Description
dict

{"status": "ok"}

Source code in pulsing/streaming/storage.py
async def put(self, record: dict) -> dict:
    """Put a single record.

    Args:
        record: Record to store

    Returns:
        {"status": "ok"}
    """
    if not record:
        raise ValueError("Missing 'record'")
    before = self._backend.total_count()
    await self._backend.put(record)
    fields = [k for k in record.keys() if not str(k).startswith("_")]
    self._production_status[before] = {field: "ready" for field in fields}
    return {"status": "ok"}
put_batch(records) async

Put multiple records.

Parameters:

Name Type Description Default
records list[dict]

List of records to store

required

Returns:

Type Description
dict

{"status": "ok", "count": N}

Source code in pulsing/streaming/storage.py
async def put_batch(self, records: list[dict]) -> dict:
    """Put multiple records.

    Args:
        records: List of records to store

    Returns:
        {"status": "ok", "count": N}
    """
    if not records:
        raise ValueError("Missing 'records'")
    start = self._backend.total_count()
    await self._backend.put_batch(records)
    for i, record in enumerate(records):
        fields = [k for k in record.keys() if not str(k).startswith("_")]
        self._production_status[start + i] = {field: "ready" for field in fields}
    return {"status": "ok", "count": len(records)}
stats() async

Get storage statistics.

Returns:

Type Description
dict

Statistics dict from backend

Source code in pulsing/streaming/storage.py
async def stats(self) -> dict:
    """Get storage statistics.

    Returns:
        Statistics dict from backend
    """
    return await self._backend.stats()

MemoryBackend(bucket_id, **kwargs)

Pure In-Memory Backend - Built-in Default Implementation

Features:
- No persistence; data exists only in memory
- Supports blocking wait for new data
- Lightweight, suitable for testing and temporary data

For persistence, use a plugin that implements StorageBackend (e.g. register_backend).

Source code in pulsing/streaming/backend.py
def __init__(self, bucket_id: int, **kwargs):
    self.bucket_id = bucket_id
    self.buffer: list[dict[str, Any]] = []
    self._lock = asyncio.Lock()
    self._condition = asyncio.Condition(self._lock)

PublishMode

Bases: Enum

Publish mode

PublishResult(success, delivered, failed, subscriber_count, failed_subscribers=None) dataclass

Publish result

Queue(system, topic, bucket_column='id', num_buckets=4, batch_size=100, storage_path=None, backend='memory', backend_options=None)

Distributed Queue - High-level API

Each bucket corresponds to an independent BucketStorage Actor.

Parameters:

Name Type Description Default
system ActorSystem

Actor system

required
topic str

Queue topic

required
bucket_column str

Bucketing column name

'id'
num_buckets int

Number of buckets

4
batch_size int

Batch size

100
storage_path str | None

Storage path

None
backend str | type

Storage backend:
- "memory": Pure in-memory backend (default)
- Custom: register_backend() or a class implementing StorageBackend

'memory'
backend_options dict[str, Any] | None

Additional backend parameters

None
Source code in pulsing/streaming/queue.py
def __init__(
    self,
    system: ActorSystem,
    topic: str,
    bucket_column: str = "id",
    num_buckets: int = 4,
    batch_size: int = 100,
    storage_path: str | None = None,
    backend: str | type = "memory",
    backend_options: dict[str, Any] | None = None,
):
    self.system = system
    self.topic = topic
    self.bucket_column = bucket_column
    self.num_buckets = num_buckets
    self.batch_size = batch_size
    self.storage_path = storage_path or f"./queue_storage/{topic}"
    self.backend = backend
    self.backend_options = backend_options

    # Actor proxies for each bucket; per-bucket locks allow parallel resolution
    self._bucket_refs: dict[int, ActorProxy] = {}
    self._bucket_locks: dict[int, asyncio.Lock] = {}
    self._bucket_locks_meta = asyncio.Lock()

    # Save event loop reference (for sync wrapper)
    try:
        self._loop = asyncio.get_running_loop()
    except RuntimeError:
        self._loop = None

Functions

flush() async

Flush all bucket buffers

Source code in pulsing/streaming/queue.py
async def flush(self) -> None:
    """Flush all bucket buffers"""
    tasks = []
    for bucket_id in range(self.num_buckets):
        if bucket_id in self._bucket_refs:
            # Direct method call via proxy
            tasks.append(self._bucket_refs[bucket_id].flush())
    if tasks:
        await asyncio.gather(*tasks)
get(bucket_id=None, limit=100, offset=0, wait=False, timeout=None) async

Read data from queue (both in-memory and persistent data visible)

Source code in pulsing/streaming/queue.py
async def get(
    self,
    bucket_id: int | None = None,
    limit: int = 100,
    offset: int = 0,
    wait: bool = False,
    timeout: float | None = None,
) -> list[dict[str, Any]]:
    """Read data from queue (both in-memory and persistent data visible)"""
    if bucket_id is not None:
        return await self._get_from_bucket(bucket_id, limit, offset, wait, timeout)

    # Read from all buckets
    all_records = []
    for bid in range(self.num_buckets):
        if len(all_records) >= limit:
            break
        records = await self._get_from_bucket(
            bid, limit - len(all_records), offset, wait, timeout
        )
        all_records.extend(records)

    return all_records[:limit]
get_bucket_id(partition_value)

Calculate bucket ID based on partition column value

Source code in pulsing/streaming/queue.py
def get_bucket_id(self, partition_value: Any) -> int:
    """Calculate bucket ID based on partition column value"""
    return self._hash_partition(partition_value)
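
The mapping from partition value to bucket amounts to a stable hash modulo `num_buckets`. The actual `_hash_partition` implementation is not shown in this reference, so treat the following as an assumption; `bucket_for` is a hypothetical stand-in using a stable digest rather than Python's per-process salted `hash()`:

```python
import hashlib


def bucket_for(partition_value, num_buckets: int = 4) -> int:
    # A stable digest ensures every process maps the same value
    # to the same bucket, unlike the salted built-in hash()
    digest = hashlib.md5(str(partition_value).encode()).hexdigest()
    return int(digest, 16) % num_buckets
```

Records sharing a partition value always land in the same bucket, which is what makes per-bucket offsets meaningful.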
put(record) async

Write data to queue

Source code in pulsing/streaming/queue.py
async def put(
    self, record: dict[str, Any] | list[dict[str, Any]]
) -> dict[str, Any] | list[dict[str, Any]]:
    """Write data to queue"""
    if isinstance(record, dict):
        records = [record]
        single = True
    elif isinstance(record, list):
        records = record
        single = False
    else:
        raise TypeError("record must be a dict or list of dicts")

    results = []
    for rec in records:
        if self.bucket_column not in rec:
            raise ValueError(f"Missing partition column '{self.bucket_column}'")

        bucket_id = self._hash_partition(rec[self.bucket_column])
        bucket = await self._ensure_bucket(bucket_id)

        # Direct method call via proxy
        await bucket.put(rec)
        results.append({"bucket_id": bucket_id, "status": "ok"})

    return results[0] if single else results
stats() async

Get queue statistics

Source code in pulsing/streaming/queue.py
async def stats(self) -> dict[str, Any]:
    """Get queue statistics"""
    bucket_stats = {}
    for bucket_id in range(self.num_buckets):
        if bucket_id in self._bucket_refs:
            # Direct method call via proxy
            bucket_stats[bucket_id] = await self._bucket_refs[bucket_id].stats()

    return {
        "topic": self.topic,
        "bucket_column": self.bucket_column,
        "num_buckets": self.num_buckets,
        "buckets": bucket_stats,
    }
sync()

Return synchronous wrapper

Example

queue = Queue(system, topic="test")
sync_queue = queue.sync()
sync_queue.put({"id": "1", "value": 100})  # Synchronous write
records = sync_queue.get(limit=10)  # Synchronous read

Source code in pulsing/streaming/queue.py
def sync(self) -> "SyncQueue":
    """Return synchronous wrapper

    Example:
        queue = Queue(system, topic="test")
        sync_queue = queue.sync()
        sync_queue.put({"id": "1", "value": 100})  # Synchronous write
        records = sync_queue.get(limit=10)  # Synchronous read
    """
    from .sync_queue import SyncQueue

    return SyncQueue(self)

QueueAPI(system)

Queue API entry point via system.queue — delegates to write_queue/read_queue.

Source code in pulsing/streaming/__init__.py
def __init__(self, system: "ActorSystem"):
    self._system = system

QueueReader(queue, bucket_ids=None)

Queue read handle

Supports three modes:
1. bucket_ids=None: Read from all buckets
2. bucket_ids=[0, 2]: Read from the specified bucket list
3. Auto-assign buckets via rank/world_size (distributed consumption)

Source code in pulsing/streaming/queue.py
def __init__(self, queue: Queue, bucket_ids: list[int] | None = None):
    self.queue = queue
    self.bucket_ids = bucket_ids  # None means read from all buckets
    # Each bucket independently maintains offset
    self._offsets: dict[int, int] = {}
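
Mode 3 (distributed consumption) comes down to partitioning the bucket range across consumers so that each bucket has exactly one reader. A minimal round-robin sketch, assuming `assign_buckets` is a hypothetical helper rather than Pulsing's actual assignment logic:

```python
def assign_buckets(rank: int, world_size: int, num_buckets: int) -> list[int]:
    # Each consumer takes every world_size-th bucket, so the union
    # over all ranks covers every bucket exactly once
    return [b for b in range(num_buckets) if b % world_size == rank]
```

With 4 buckets and 2 consumers, rank 0 would read buckets [0, 2] and rank 1 would read [1, 3].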

Functions

get(limit=100, wait=False, timeout=None) async

Read data from assigned buckets

Source code in pulsing/streaming/queue.py
async def get(
    self,
    limit: int = 100,
    wait: bool = False,
    timeout: float | None = None,
) -> list[dict[str, Any]]:
    """Read data from assigned buckets"""
    bucket_ids = self._get_bucket_ids()
    all_records = []

    for bid in bucket_ids:
        if len(all_records) >= limit:
            break

        offset = self._offsets.get(bid, 0)
        records = await self.queue._get_from_bucket(
            bid, limit - len(all_records), offset, wait, timeout
        )
        self._offsets[bid] = offset + len(records)
        all_records.extend(records)

    return all_records[:limit]
reset()

Reset offsets for all buckets

Source code in pulsing/streaming/queue.py
def reset(self) -> None:
    """Reset offsets for all buckets"""
    self._offsets.clear()
set_offset(offset, bucket_id=None)

Set offset

Source code in pulsing/streaming/queue.py
def set_offset(self, offset: int, bucket_id: int | None = None) -> None:
    """Set offset"""
    if bucket_id is not None:
        self._offsets[bucket_id] = offset
    else:
        for bid in self._get_bucket_ids():
            self._offsets[bid] = offset
sync()

Return synchronous wrapper

Source code in pulsing/streaming/queue.py
def sync(self) -> "SyncQueueReader":
    """Return synchronous wrapper"""
    from .sync_queue import SyncQueueReader

    return SyncQueueReader(self)

StorageBackend

Bases: Protocol

Core Storage Backend Protocol.

Every backend must implement these seven methods. Duck typing is fine — inheritance from this class is not required.

Functions

flush() abstractmethod async

Flush buffer to persistent storage.

Source code in pulsing/streaming/backend.py
@abstractmethod
async def flush(self) -> None:
    """Flush buffer to persistent storage."""
    ...
get(limit, offset) abstractmethod async

Read records.

Source code in pulsing/streaming/backend.py
@abstractmethod
async def get(self, limit: int, offset: int) -> list[dict[str, Any]]:
    """Read records."""
    ...
get_stream(limit, offset, wait=False, timeout=None) abstractmethod async

Stream records.

Source code in pulsing/streaming/backend.py
@abstractmethod
async def get_stream(
    self,
    limit: int,
    offset: int,
    wait: bool = False,
    timeout: float | None = None,
) -> AsyncIterator[list[dict[str, Any]]]:
    """Stream records."""
    ...
put(record) abstractmethod async

Write a single record.

Source code in pulsing/streaming/backend.py
@abstractmethod
async def put(self, record: dict[str, Any]) -> None:
    """Write a single record."""
    ...
put_batch(records) abstractmethod async

Write records in batch.

Source code in pulsing/streaming/backend.py
@abstractmethod
async def put_batch(self, records: list[dict[str, Any]]) -> None:
    """Write records in batch."""
    ...
stats() abstractmethod async

Return statistics dict.

Source code in pulsing/streaming/backend.py
@abstractmethod
async def stats(self) -> dict[str, Any]:
    """Return statistics dict."""
    ...
total_count() abstractmethod

Total record count (synchronous).

Source code in pulsing/streaming/backend.py
@abstractmethod
def total_count(self) -> int:
    """Total record count (synchronous)."""
    ...

StorageManager(system, base_storage_path='./queue_storage', default_backend='memory')

Storage manager Actor

One instance per node, responsible for:
1. Receiving GetBucket/GetTopic requests
2. Determining whether the resource belongs to this node (consistent hashing)
3. If it belongs to this node: create/return the corresponding Actor
4. If it does not: return a Redirect response pointing to the correct node

Supported resource types:
- Queue Bucket: GetBucket -> BucketStorage Actor
- Topic Broker: GetTopic -> TopicBroker Actor

Source code in pulsing/streaming/manager.py
def __init__(
    self,
    system: ActorSystem,
    base_storage_path: str = "./queue_storage",
    default_backend: str | type = "memory",
):
    self.system = system
    self.base_storage_path = base_storage_path
    self.default_backend = default_backend

    # Buckets managed by this node: {(topic, bucket_id): ActorRef}
    self._buckets: dict[tuple[str, int], ActorRef] = {}
    # Topic brokers managed by this node: {topic_name: ActorRef}
    self._topics: dict[str, ActorRef] = {}
    self._locks: dict[str, asyncio.Lock] = {}
    self._locks_meta = asyncio.Lock()

    self._members: list[dict] = []
    self._members_updated_at: float = 0

Functions

get_stats() async

Get storage manager statistics.

Returns:

Type Description
dict

{"node_id": ..., "bucket_count": ..., "topic_count": ..., "buckets": [...], "topics": [...]}

Source code in pulsing/streaming/manager.py
async def get_stats(self) -> dict:
    """Get storage manager statistics.

    Returns:
        {"node_id": ..., "bucket_count": ..., "topic_count": ..., "buckets": [...], "topics": [...]}
    """
    return {
        "node_id": str(self.system.node_id.id),
        "bucket_count": len(self._buckets),
        "topic_count": len(self._topics),
        "buckets": [
            {"topic": t, "bucket_id": b} for (t, b) in self._buckets.keys()
        ],
        "topics": list(self._topics.keys()),
    }
list_buckets() async

List all buckets managed by this node.

Returns:

Type Description
list[dict]

List of {"topic": ..., "bucket_id": ...}

Source code in pulsing/streaming/manager.py
async def list_buckets(self) -> list[dict]:
    """List all buckets managed by this node.

    Returns:
        List of {"topic": ..., "bucket_id": ...}
    """
    return [
        {"topic": topic, "bucket_id": bid} for (topic, bid) in self._buckets.keys()
    ]
list_topics() async

List all topics managed by this node.

Returns:

Type Description
list[str]

List of topic names

Source code in pulsing/streaming/manager.py
async def list_topics(self) -> list[str]:
    """List all topics managed by this node.

    Returns:
        List of topic names
    """
    return list(self._topics.keys())

SyncQueue(queue)

Synchronous queue wrapper

Source code in pulsing/streaming/sync_queue.py
def __init__(self, queue: "Queue"):
    self._queue = queue
    self._loop = queue._loop

SyncQueueReader(reader)

Synchronous reader wrapper

Source code in pulsing/streaming/sync_queue.py
def __init__(self, reader: "QueueReader"):
    self._reader = reader
    self._loop = reader.queue._loop

TopicAPI(system)

Topic API entry point via system.topic — delegates to write_topic/read_topic.

Source code in pulsing/streaming/__init__.py
def __init__(self, system: "ActorSystem"):
    self._system = system

TopicReader(system, topic, reader_id=None)

Topic read handle

Example

reader = await read_topic(system, "events")

@reader.on_message
async def handle(msg):
    print(f"Received: {msg}")

await reader.start()

Source code in pulsing/streaming/pubsub.py
def __init__(self, system: ActorSystem, topic: str, reader_id: str | None = None):
    self._system = system
    self._topic = topic
    self._reader_id = reader_id or f"reader_{uuid.uuid4().hex[:8]}"
    self._callbacks: list[MessageCallback] = []
    self._subscriber_ref: "ActorRef | None" = None
    self._started = False

Functions

add_callback(callback)

Add message callback

Source code in pulsing/streaming/pubsub.py
def add_callback(self, callback: MessageCallback) -> None:
    """Add message callback"""
    self._callbacks.append(callback)
on_message(callback)

Register message callback (decorator style)

Example

@reader.on_message
async def handle(msg):
    print(f"Received: {msg}")

Source code in pulsing/streaming/pubsub.py
def on_message(self, callback: MessageCallback) -> MessageCallback:
    """Register message callback (decorator style)

    Example:
        @reader.on_message
        async def handle(msg):
            print(f"Received: {msg}")
    """
    self._callbacks.append(callback)
    return callback
remove_callback(callback)

Remove message callback

Source code in pulsing/streaming/pubsub.py
def remove_callback(self, callback: MessageCallback) -> bool:
    """Remove message callback"""
    try:
        self._callbacks.remove(callback)
        return True
    except ValueError:
        return False
start() async

Start receiving messages

Source code in pulsing/streaming/pubsub.py
async def start(self) -> None:
    """Start receiving messages"""
    if self._started:
        return

    if not self._callbacks:
        raise ValueError("at least one callback required")

    # Create subscriber Actor
    actor_name = f"_topic_sub_{self._topic}_{self._reader_id}"
    subscriber = _SubscriberActor(self._callbacks)
    self._subscriber_ref = await self._system.spawn(
        subscriber, name=actor_name, public=True
    )

    # Register with broker using direct method call
    broker = await _get_broker(self._system, self._topic)
    await broker.subscribe(
        self._reader_id,
        actor_name,
        node_id=self._system.node_id.id,
    )

    self._started = True
    logger.debug(f"TopicReader[{self._reader_id}] started for topic: {self._topic}")
stats() async

Get topic statistics

Source code in pulsing/streaming/pubsub.py
async def stats(self) -> dict[str, Any]:
    """Get topic statistics"""
    broker = await _get_broker(self._system, self._topic)
    # Direct method call on broker proxy
    return await broker.get_stats()
stop() async

Stop receiving messages

Source code in pulsing/streaming/pubsub.py
async def stop(self) -> None:
    """Stop receiving messages"""
    if not self._started:
        return

    # Unsubscribe from broker using direct method call
    try:
        broker = await _get_broker(self._system, self._topic)
        await broker.unsubscribe(self._reader_id)
    except Exception as e:
        logger.warning(f"Unsubscribe error: {e}")

    # Stop subscriber Actor
    if self._subscriber_ref:
        try:
            actor_name = f"_topic_sub_{self._topic}_{self._reader_id}"
            await self._system.stop(actor_name)
        except Exception as e:
            logger.warning(f"Stop subscriber error: {e}")

    self._started = False
    self._subscriber_ref = None
    logger.debug(f"TopicReader[{self._reader_id}] stopped")
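The start/stop lifecycle above spawns a subscriber actor that routes each incoming message through every registered callback. A minimal sketch of that dispatch loop (the `SubscriberSketch` class and its internals are assumptions for illustration; only the callback-list behavior mirrors the code shown):

```python
import asyncio

class SubscriberSketch:
    """Hypothetical stand-in for the subscriber actor: fans each message out to callbacks."""

    def __init__(self, callbacks):
        self._callbacks = list(callbacks)

    async def receive(self, msg):
        # Deliver the message to every registered callback, as TopicReader's
        # callback list implies.
        for cb in self._callbacks:
            await cb(msg)

async def main():
    seen = []

    async def handle(msg):
        seen.append(msg)

    sub = SubscriberSketch([handle, handle])  # two callbacks registered
    await sub.receive("hello")
    return seen

print(asyncio.run(main()))  # ['hello', 'hello'] — each callback sees the message
```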

TopicWriter(system, topic, writer_id=None)

Topic write handle

Source code in pulsing/streaming/pubsub.py
def __init__(self, system: ActorSystem, topic: str, writer_id: str | None = None):
    self._system = system
    self._topic = topic
    self._writer_id = writer_id or f"writer_{uuid.uuid4().hex[:8]}"
    self._broker: "ActorProxy | None" = None

Functions

publish(message, mode=PublishMode.FIRE_AND_FORGET, timeout=None) async

Publish message

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| message | Any | Message to publish (any Python object) | required |
| mode | PublishMode | Publish mode | FIRE_AND_FORGET |
| timeout | float \| None | Fanout timeout forwarded to the broker (seconds). The broker respects this deadline when waiting for subscriber acks and returns a structured PublishResult after the timeout. None falls back to DEFAULT_PUBLISH_TIMEOUT, applied client-side. | None |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| PublishResult | PublishResult | Publish result with delivery statistics. |

Raises:

| Type | Description |
| --- | --- |
| PulsingRuntimeError | If the broker is unreachable. |
| asyncio.TimeoutError | If the fanout deadline expires before all acks arrive. |

Source code in pulsing/streaming/pubsub.py
async def publish(
    self,
    message: Any,
    mode: PublishMode = PublishMode.FIRE_AND_FORGET,
    timeout: float | None = None,
) -> PublishResult:
    """Publish message

    Args:
        message: Message to publish (any Python object)
        mode: Publish mode
        timeout: Fanout timeout forwarded to the broker (seconds).
                 The broker respects this deadline when waiting for subscriber acks
                 and returns a structured PublishResult after the timeout.
                 None falls back to DEFAULT_PUBLISH_TIMEOUT (applied client-side).

    Returns:
        PublishResult: Publish result with delivery statistics.

    Raises:
        PulsingRuntimeError: If the broker is unreachable.
        asyncio.TimeoutError: If the fanout deadline expires before all acks arrive.
    """
    broker = await self._broker_ref()
    effective_timeout = timeout if timeout is not None else DEFAULT_PUBLISH_TIMEOUT

    data = await broker.publish(
        message,
        mode=mode.value,
        sender_id=self._writer_id,
        timeout=effective_timeout,
    )

    if data.get("timed_out"):
        raise asyncio.TimeoutError(
            f"Publish timed out after {effective_timeout}s "
            f"({data.get('delivered', 0)}/{data.get('subscriber_count', 0)} acks received)"
        )

    return PublishResult(
        success=data.get("success", False),
        delivered=data.get("delivered", 0),
        failed=data.get("failed", 0),
        subscriber_count=data.get("subscriber_count", 0),
        failed_subscribers=data.get("failed_subscribers"),
    )
stats() async

Get topic statistics

Source code in pulsing/streaming/pubsub.py
async def stats(self) -> dict[str, Any]:
    """Get topic statistics"""
    broker = await self._broker_ref()
    # Direct method call on broker proxy
    return await broker.get_stats()

Functions

get_backend_class(backend)

Get backend class

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| backend | str \| type | Backend name (str) or backend class (type) | required |

Returns:

| Type | Description |
| --- | --- |
| type | Backend class |

Source code in pulsing/streaming/backend.py
def get_backend_class(backend: str | type) -> type:
    """Get backend class

    Args:
        backend: Backend name (str) or backend class (type)

    Returns:
        Backend class
    """
    if isinstance(backend, type):
        return backend

    if backend in _REGISTERED_BACKENDS:
        return _REGISTERED_BACKENDS[backend]

    if backend in _BUILTIN_BACKENDS:
        return _BUILTIN_BACKENDS[backend]

    available = list(_BUILTIN_BACKENDS.keys()) + list(_REGISTERED_BACKENDS.keys())
    raise ValueError(
        f"Unknown backend: {backend}. Available: {available}. "
        "Use register_backend() to add custom backends."
    )

get_bucket_ref(system, topic, bucket_id, batch_size=100, storage_path=None, backend=None, backend_options=None, max_redirects=3) async

Get ActorProxy for the specified bucket, following redirects automatically.

Source code in pulsing/streaming/manager.py
async def get_bucket_ref(
    system: ActorSystem,
    topic: str,
    bucket_id: int,
    batch_size: int = 100,
    storage_path: str | None = None,
    backend: str | type | None = None,
    backend_options: dict | None = None,
    max_redirects: int = 3,
) -> "ActorProxy":
    """Get ActorProxy for the specified bucket, following redirects automatically."""
    backend_name = (
        (backend if isinstance(backend, str) else backend.__name__) if backend else None
    )
    return await _follow_redirects(
        system,
        lambda mgr: mgr.get_bucket(
            topic=topic,
            bucket_id=bucket_id,
            batch_size=batch_size,
            storage_path=storage_path,
            backend=backend_name,
            backend_options=backend_options,
        ),
        lambda: BucketStorage.resolve(f"bucket_{topic}_{bucket_id}", system=system),
        f"bucket {topic}:{bucket_id}",
        max_redirects,
    )

get_storage_manager(system) async

Get StorageManager proxy for this node, create if not exists.

Returns:

| Type | Description |
| --- | --- |
| ActorProxy | ActorProxy for direct method calls on StorageManager |

Source code in pulsing/streaming/manager.py
async def get_storage_manager(system: ActorSystem) -> "ActorProxy":
    """Get StorageManager proxy for this node, create if not exists.

    Returns:
        ActorProxy for direct method calls on StorageManager
    """
    local_node_id = system.node_id.id

    # Try to resolve local node's StorageManager
    try:
        return await StorageManager.resolve(
            STORAGE_MANAGER_NAME, system=system, node_id=local_node_id
        )
    except Exception:
        pass

    async with _get_manager_lock():
        # Check local node again
        try:
            return await StorageManager.resolve(
                STORAGE_MANAGER_NAME, system=system, node_id=local_node_id
            )
        except Exception:
            pass

        try:
            return await StorageManager.spawn(
                system, system=system, name=STORAGE_MANAGER_NAME, public=True
            )
        except Exception as e:
            if "already exists" in str(e).lower():
                return await StorageManager.resolve(
                    STORAGE_MANAGER_NAME, system=system, node_id=local_node_id
                )
            raise
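The resolve → lock → re-resolve → spawn sequence above is a standard race-safe get-or-create: the second resolve inside the lock handles a concurrent caller winning the race, and the "already exists" fallback handles a remote spawn conflict. A generic, self-contained sketch of the same pattern (the dict cache and factory are illustrative stand-ins for resolve/spawn):

```python
import asyncio

async def get_or_create(cache: dict, key: str, lock: asyncio.Lock, factory):
    """Race-safe get-or-create mirroring get_storage_manager's structure."""
    if key in cache:                  # 1. optimistic lookup, no lock (resolve)
        return cache[key]
    async with lock:
        if key in cache:              # 2. re-check: another task may have won the race
            return cache[key]
        cache[key] = await factory()  # 3. create exactly once (spawn)
        return cache[key]

async def main():
    cache, lock, calls = {}, asyncio.Lock(), []

    async def factory():
        calls.append(1)
        return "manager"

    results = await asyncio.gather(
        *(get_or_create(cache, "sm", lock, factory) for _ in range(5))
    )
    return results, len(calls)

results, n_calls = asyncio.run(main())
print(results, n_calls)  # ['manager', ...] 1 — factory ran once despite 5 callers
```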

get_topic_broker(system, topic, max_redirects=3) async

Get broker ActorProxy for the specified topic, following redirects automatically.

Source code in pulsing/streaming/manager.py
async def get_topic_broker(
    system: ActorSystem,
    topic: str,
    max_redirects: int = 3,
) -> "ActorProxy":
    """Get broker ActorProxy for the specified topic, following redirects automatically."""
    from pulsing.streaming.broker import TopicBroker

    return await _follow_redirects(
        system,
        lambda mgr: mgr.get_topic(topic=topic),
        lambda: TopicBroker.resolve(f"_topic_broker_{topic}", system=system),
        f"topic {topic}",
        max_redirects,
    )

list_backends()

List all available backends

Source code in pulsing/streaming/backend.py
def list_backends() -> list[str]:
    """List all available backends"""
    return list(_BUILTIN_BACKENDS.keys()) + list(_REGISTERED_BACKENDS.keys())
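Taken together with `get_backend_class` above, the lookup order is: a class argument passes through unchanged, then registered backends are checked before builtins, so a registration can shadow a builtin of the same name. A self-contained sketch of that precedence (backend class names here are hypothetical):

```python
# Hypothetical backend classes, for illustration only.
class MemoryBackend: ...
class CustomMemoryBackend: ...

_BUILTIN = {"memory": MemoryBackend}
_REGISTERED: dict[str, type] = {}

def resolve_backend(backend):
    """Mirrors get_backend_class's order: class passthrough, registered, builtin."""
    if isinstance(backend, type):
        return backend                  # classes are accepted as-is
    if backend in _REGISTERED:
        return _REGISTERED[backend]     # user registrations win over builtins
    if backend in _BUILTIN:
        return _BUILTIN[backend]
    raise ValueError(f"Unknown backend: {backend}")

assert resolve_backend("memory") is MemoryBackend
_REGISTERED["memory"] = CustomMemoryBackend   # shadow the builtin
assert resolve_backend("memory") is CustomMemoryBackend
print("registered backends shadow builtins of the same name")
```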

read_queue(system, topic, bucket_id=None, bucket_ids=None, rank=None, world_size=None, num_buckets=4, storage_path=None, backend='memory', backend_options=None) async

Open queue for reading

Supports three modes:

1. Default: read from all buckets
2. bucket_id/bucket_ids: read from the specified buckets
3. rank/world_size: distributed consumption with automatic bucket assignment

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| system | ActorSystem | Actor system | required |
| topic | str | Queue topic | required |
| bucket_id | int \| None | Single bucket ID (mutually exclusive with bucket_ids) | None |
| bucket_ids | list[int] \| None | List of bucket IDs (mutually exclusive with bucket_id) | None |
| rank | int \| None | Current consumer's rank (0 to world_size-1) | None |
| world_size | int \| None | Total number of consumers | None |
| num_buckets | int | Number of buckets (for rank/world_size mode) | 4 |
| storage_path | str \| None | Storage path | None |
| backend | str \| type | Storage backend (must match backend used for writing) | 'memory' |
| backend_options | dict[str, Any] \| None | Additional backend parameters | None |
Example

    # Distributed consumption: 4 buckets, 2 consumers
    reader0 = await read_queue(system, "q", rank=0, world_size=2)  # buckets 0, 2
    reader1 = await read_queue(system, "q", rank=1, world_size=2)  # buckets 1, 3

Source code in pulsing/streaming/queue.py
async def read_queue(
    system: ActorSystem,
    topic: str,
    bucket_id: int | None = None,
    bucket_ids: list[int] | None = None,
    rank: int | None = None,
    world_size: int | None = None,
    num_buckets: int = 4,
    storage_path: str | None = None,
    backend: str | type = "memory",
    backend_options: dict[str, Any] | None = None,
) -> QueueReader:
    """Open queue for reading

    Supports three modes:
    1. Default: Read from all buckets
    2. bucket_id/bucket_ids: Read from specified buckets
    3. rank/world_size: Distributed consumption, auto-assign buckets

    Args:
        system: Actor system
        topic: Queue topic
        bucket_id: Single bucket ID (mutually exclusive with bucket_ids)
        bucket_ids: List of bucket IDs (mutually exclusive with bucket_id)
        rank: Current consumer's rank (0 to world_size-1)
        world_size: Total number of consumers
        num_buckets: Number of buckets (for rank/world_size mode)
        storage_path: Storage path
        backend: Storage backend (must match backend used for writing)
        backend_options: Additional backend parameters

    Example:
        # Distributed consumption: 4 buckets, 2 consumers
        reader0 = await read_queue(system, "q", rank=0, world_size=2)  # bucket 0, 2
        reader1 = await read_queue(system, "q", rank=1, world_size=2)  # bucket 1, 3
    """
    # Ensure all nodes in cluster have StorageManager
    from .manager import ensure_storage_managers

    await ensure_storage_managers(system)

    # Determine list of buckets to read from
    if rank is not None and world_size is not None:
        # Distributed consumption mode
        assigned_buckets = _assign_buckets(num_buckets, rank, world_size)
        logger.info(
            f"Reader rank={rank}/{world_size} assigned buckets: {assigned_buckets}"
        )
    elif bucket_id is not None:
        assigned_buckets = [bucket_id]
    elif bucket_ids is not None:
        assigned_buckets = bucket_ids
    else:
        assigned_buckets = None  # Read from all buckets

    # Create Queue (reader side doesn't need bucket_column for hashing, but it must
    # keep `num_buckets/storage_path/backend` consistent with writer).
    queue = Queue(
        system=system,
        topic=topic,
        num_buckets=num_buckets,
        storage_path=storage_path,
        backend=backend,
        backend_options=backend_options,
    )

    # Try to resolve existing bucket Actors
    if assigned_buckets:
        from .storage import BucketStorage

        for bid in assigned_buckets:
            # Must match `StorageManager` bucket actor naming: "bucket_{topic}_{bucket_id}"
            actor_name = f"bucket_{topic}_{bid}"
            try:
                # Use BucketStorage.resolve to get typed ActorProxy
                queue._bucket_refs[bid] = await BucketStorage.resolve(
                    actor_name, system=system
                )
            except Exception:
                pass

    return QueueReader(queue, bucket_ids=assigned_buckets)
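The `_assign_buckets` helper itself is not shown above, but the example's comments (rank 0 gets buckets 0 and 2 out of 4 with two consumers) imply a strided assignment. A sketch under that assumption:

```python
def assign_buckets(num_buckets: int, rank: int, world_size: int) -> list[int]:
    """Strided assignment implied by the read_queue example:
    consumer `rank` takes every world_size-th bucket."""
    return [b for b in range(num_buckets) if b % world_size == rank]

print(assign_buckets(4, 0, 2))  # [0, 2]
print(assign_buckets(4, 1, 2))  # [1, 3]

# Every bucket is claimed by exactly one consumer:
claimed = sorted(b for r in range(2) for b in assign_buckets(4, r, 2))
print(claimed)  # [0, 1, 2, 3]
```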

read_topic(system, topic, reader_id=None, auto_start=False) async

Open topic for reading

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| system | ActorSystem | Actor system | required |
| topic | str | Topic name | required |
| reader_id | str \| None | Reader ID (optional) | None |
| auto_start | bool | Whether to automatically start receiving | False |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| TopicReader | TopicReader | Read handle |

Example

    reader = await read_topic(system, "events")

    @reader.on_message
    async def handle(msg):
        print(f"Received: {msg}")

    await reader.start()

Source code in pulsing/streaming/pubsub.py
async def read_topic(
    system: ActorSystem,
    topic: str,
    reader_id: str | None = None,
    auto_start: bool = False,
) -> TopicReader:
    """Open topic for reading

    Args:
        system: Actor system
        topic: Topic name
        reader_id: Reader ID (optional)
        auto_start: Whether to automatically start receiving

    Returns:
        TopicReader: Read handle

    Example:
        reader = await read_topic(system, "events")

        @reader.on_message
        async def handle(msg):
            print(f"Received: {msg}")

        await reader.start()
    """
    reader = TopicReader(system, topic, reader_id)
    if auto_start:
        await reader.start()
    return reader

register_backend(name, backend_class)

Register a custom storage backend (e.g. from a plugin package).

Example

    from my_plugin import MyBackend
    register_backend("my_backend", MyBackend)
    writer = await write_queue(system, "topic", backend="my_backend")

Source code in pulsing/streaming/backend.py
def register_backend(name: str, backend_class: type) -> None:
    """Register a custom storage backend (e.g. from a plugin package).

    Example:
        from my_plugin import MyBackend
        register_backend("my_backend", MyBackend)
        writer = await write_queue(system, "topic", backend="my_backend")
    """
    if not isinstance(backend_class, type):
        raise TypeError(f"backend_class must be a class, got {type(backend_class)}")
    _REGISTERED_BACKENDS[name] = backend_class
    logger.info(f"Registered storage backend: {name}")

subscribe_to_topic(system, topic, subscriber_id, actor_name, node_id=None) async

Subscribe an actor to a topic.

This is a helper function for manually registering subscribers with a topic broker. For normal usage, prefer using TopicReader which handles this automatically.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| system | ActorSystem | ActorSystem instance | required |
| topic | str | Topic name | required |
| subscriber_id | str | Unique subscriber identifier | required |
| actor_name | str | Name of the actor to receive messages | required |
| node_id | int \| None | Optional node ID (defaults to local node) | None |

Returns:

| Type | Description |
| --- | --- |
| dict | Response dict from broker |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If subscription fails |

Source code in pulsing/streaming/pubsub.py
async def subscribe_to_topic(
    system: ActorSystem,
    topic: str,
    subscriber_id: str,
    actor_name: str,
    node_id: int | None = None,
) -> dict:
    """Subscribe an actor to a topic.

    This is a helper function for manually registering subscribers with a topic broker.
    For normal usage, prefer using TopicReader which handles this automatically.

    Args:
        system: ActorSystem instance
        topic: Topic name
        subscriber_id: Unique subscriber identifier
        actor_name: Name of the actor to receive messages
        node_id: Optional node ID (defaults to local node)

    Returns:
        Response dict from broker

    Raises:
        RuntimeError: If subscription fails
    """
    broker = await _get_broker(system, topic)
    # Direct method call on broker proxy
    return await broker.subscribe(subscriber_id, actor_name, node_id)

write_queue(system, topic, bucket_column='id', num_buckets=4, batch_size=100, storage_path=None, backend='memory', backend_options=None) async

Open queue for writing, returns a Queue object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| system | ActorSystem | Actor system | required |
| topic | str | Queue topic | required |
| bucket_column | str | Bucketing column name | 'id' |
| num_buckets | int | Number of buckets | 4 |
| batch_size | int | Batch size | 100 |
| storage_path | str \| None | Storage path | None |
| backend | str \| type | Storage backend ("memory" by default; or a registered name / class) | 'memory' |
| backend_options | dict[str, Any] \| None | Additional backend parameters | None |

Example::

queue = await write_queue(system, "my_queue")
await queue.put({"id": "1", "value": 42})
Source code in pulsing/streaming/queue.py
async def write_queue(
    system: ActorSystem,
    topic: str,
    bucket_column: str = "id",
    num_buckets: int = 4,
    batch_size: int = 100,
    storage_path: str | None = None,
    backend: str | type = "memory",
    backend_options: dict[str, Any] | None = None,
) -> Queue:
    """Open queue for writing, returns a ``Queue`` object.

    Args:
        system: Actor system
        topic: Queue topic
        bucket_column: Bucketing column name
        num_buckets: Number of buckets
        batch_size: Batch size
        storage_path: Storage path
        backend: Storage backend ("memory" by default; or a registered name / class)
        backend_options: Additional backend parameters

    Example::

        queue = await write_queue(system, "my_queue")
        await queue.put({"id": "1", "value": 42})
    """
    from .manager import ensure_storage_managers

    await ensure_storage_managers(system)
    return Queue(
        system=system,
        topic=topic,
        bucket_column=bucket_column,
        num_buckets=num_buckets,
        batch_size=batch_size,
        storage_path=storage_path,
        backend=backend,
        backend_options=backend_options,
    )
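The `bucket_column` parameter implies that each row is routed to a bucket by hashing that column's value. The exact hash pulsing uses is not shown above; this sketch substitutes `zlib.crc32` so the routing is deterministic across runs (unlike Python's salted `hash()` for strings):

```python
import zlib

def route_bucket(row: dict, bucket_column: str = "id", num_buckets: int = 4) -> int:
    """Illustrative routing: hash the bucketing column to pick a bucket.
    crc32 is an assumption standing in for pulsing's actual hash."""
    key = str(row[bucket_column]).encode()
    return zlib.crc32(key) % num_buckets

rows = [{"id": str(i), "value": i} for i in range(8)]
buckets = [route_bucket(r) for r in rows]
print(buckets)  # each row lands in a stable bucket in [0, num_buckets)
```

Rows with the same `bucket_column` value always land in the same bucket, which is what makes the strided reader assignment in `read_queue` safe.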

write_topic(system, topic, writer_id=None) async

Open topic for writing

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| system | ActorSystem | Actor system | required |
| topic | str | Topic name | required |
| writer_id | str \| None | Writer ID (optional) | None |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| TopicWriter | TopicWriter | Write handle |

Example

    writer = await write_topic(system, "events")
    await writer.publish({"type": "user_login"})

Source code in pulsing/streaming/pubsub.py
async def write_topic(
    system: ActorSystem,
    topic: str,
    writer_id: str | None = None,
) -> TopicWriter:
    """Open topic for writing

    Args:
        system: Actor system
        topic: Topic name
        writer_id: Writer ID (optional)

    Returns:
        TopicWriter: Write handle

    Example:
        writer = await write_topic(system, "events")
        await writer.publish({"type": "user_login"})
    """
    return TopicWriter(system, topic, writer_id)

Subprocess Module

pulsing.subprocess provides a subprocess-compatible synchronous API. Without resources, calls fall back to Python's native subprocess. When resources is provided and USE_POLSING_SUBPROCESS=1 is set, the command is executed through Pulsing's backend.

pulsing.subprocess — subprocess-compatible synchronous API.

Calls without resources delegate to Python's native subprocess module. Passing a non-empty resources=... runs the subprocess through Pulsing actors only when USE_POLSING_SUBPROCESS is enabled. In that resource-backed mode this module lazily initializes Pulsing internally, so callers do not need to call await pul.init() before using it.

Usage::

import pulsing.subprocess as subprocess

result = subprocess.run(["echo", "hello"], capture_output=True)
proc = subprocess.Popen(["cat"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
stdout, _ = proc.communicate(input=b"hi")
remote = subprocess.run(
    ["echo", "hello from pulsing"],
    capture_output=True,
    resources={"num_cpus": 1},
)

Shell usage::

python examples/python/subprocess_example.py --resources
USE_POLSING_SUBPROCESS=1 python examples/python/subprocess_example.py --resources

Classes

Popen(args, **kwargs)

Subprocess-compatible wrapper that switches between native and Pulsing backends.

Source code in pulsing/subprocess/popen.py
def __init__(self, args, **kwargs):
    self.args = args
    self._args = args
    self._placement = kwargs.pop("placement", "local")
    self._system = kwargs.pop("system", None)
    self._name = kwargs.pop("name", None)
    self._resources = kwargs.pop("resources", None)
    self._spawn_kwargs = kwargs
    self._proxy = None
    self._native = None
    self._uses_pulsing = _should_use_pulsing(self._resources)

    self.returncode: int | None = None
    self.pid: int | None = None
    self.stdin = None
    self.stdout = None
    self.stderr = None

    if self._uses_pulsing:
        _ensure_module_owned_system()
        spawn_coro = self._aspawn()
        try:
            _run_sync(spawn_coro)
        except Exception:
            spawn_coro.close()
            raise
    else:
        self._spawn_native()

ProcessActor(args, *, stdin=None, stdout=None, stderr=None, cwd=None, env=None, shell=False, encoding=None, errors=None, text=False, bufsize=-1)

Actor that manages a subprocess.Popen instance.

Top-level methods (poll, wait, communicate, send_signal, terminate, kill) match subprocess.Popen exactly.

stdin/stdout/stderr are file objects in subprocess.Popen; actors cannot pass file objects across the network, so their operations are exposed as prefixed methods: stdin_write, stdin_close, stdout_read, stdout_readline, stderr_read, stderr_readline. The popen.py wrapper re-assembles these into .stdin / .stdout / .stderr proxy objects so user code looks identical to stdlib subprocess.

Source code in pulsing/subprocess/process.py
def __init__(
    self,
    args,
    *,
    stdin=None,
    stdout=None,
    stderr=None,
    cwd=None,
    env=None,
    shell=False,
    encoding=None,
    errors=None,
    text=False,
    bufsize=-1,
):
    self._proc = subprocess.Popen(
        args,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        shell=shell,
        encoding=encoding,
        errors=errors,
        text=text,
        bufsize=bufsize,
    )
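The reassembly described above (prefixed actor methods rebuilt into `.stdin` / `.stdout` / `.stderr` file-like objects) can be sketched with a small `__getattr__` proxy. The `PipeProxy` and `FakeActor` names here are hypothetical; only the prefixed method names (`stdin_write`, `stdout_read`, ...) come from the text above:

```python
class PipeProxy:
    """Sketch of how a wrapper might rebuild a file-like .stdin / .stdout from
    prefixed actor methods such as stdin_write and stdout_read."""

    def __init__(self, target, prefix: str):
        self._target = target
        self._prefix = prefix

    def __getattr__(self, name):
        # proxy.write(data) -> actor.stdin_write(data), proxy.read() -> actor.stdout_read()
        return getattr(self._target, f"{self._prefix}_{name}")

class FakeActor:
    """Hypothetical local stand-in recording forwarded calls."""
    def __init__(self):
        self.calls = []
    def stdin_write(self, data):
        self.calls.append(("stdin_write", data))
    def stdout_read(self):
        return b"output"

actor = FakeActor()
stdin = PipeProxy(actor, "stdin")
stdout = PipeProxy(actor, "stdout")
stdin.write(b"hi")
print(stdout.read())   # b'output'
print(actor.calls)     # [('stdin_write', b'hi')]
```

With proxies like these attached as `.stdin` / `.stdout`, user code reads identically to stdlib `subprocess.Popen` usage.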

Functions

run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, cwd=None, env=None, shell=False, encoding=None, errors=None, text=False, timeout=None, check=False, placement='local', system=None, name=None, resources=None)

Run a command and return subprocess.CompletedProcess.

Source code in pulsing/subprocess/popen.py
def run(
    args,
    *,
    stdin=None,
    input=None,
    stdout=None,
    stderr=None,
    capture_output: bool = False,
    cwd=None,
    env=None,
    shell: bool = False,
    encoding=None,
    errors=None,
    text: bool = False,
    timeout: float | None = None,
    check: bool = False,
    placement: str | int = "local",
    system=None,
    name: str | None = None,
    resources: dict | None = None,
) -> CompletedProcess:
    """Run a command and return ``subprocess.CompletedProcess``."""
    if capture_output:
        if stdout is not None or stderr is not None:
            raise ValueError("capture_output is mutually exclusive with stdout/stderr")
        stdout = subprocess.PIPE
        stderr = subprocess.PIPE

    if input is not None:
        if stdin is not None:
            raise ValueError("stdin and input are mutually exclusive")
        stdin = subprocess.PIPE

    proc = Popen(
        args,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        cwd=cwd,
        env=env,
        shell=shell,
        encoding=encoding,
        errors=errors,
        text=text,
        placement=placement,
        system=system,
        name=name,
        resources=resources,
    )

    try:
        out, err = proc.communicate(input=input, timeout=timeout)
    finally:
        proc.close()

    result = CompletedProcess(
        args=args, returncode=proc.returncode, stdout=out, stderr=err
    )
    if check:
        result.check_returncode()
    return result
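Without `resources`, `run` delegates to the native backend, so its call shape matches the stdlib exactly. The equivalent stdlib call, using `sys.executable` instead of a shell `echo` so it runs anywhere Python does:

```python
import subprocess
import sys

# Same call shape as pulsing.subprocess.run on the native (no-resources) path.
result = subprocess.run(
    [sys.executable, "-c", "print('hello')"],
    capture_output=True,
    text=True,
    check=True,   # raises CalledProcessError on a non-zero exit code
)
print(result.returncode)       # 0
print(result.stdout.strip())   # hello
```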