LangGraph Integration¶
Pulsing provides with_pulsing(), a one-line wrapper for distributed LangGraph execution.
Features¶
- One-line Integration - Wrap any compiled graph
- Node Mapping - Route specific nodes to remote workers
- API Compatible - Same
ainvoke(),invoke(),astream()methods - State Serialization - Automatic state transfer between nodes
Quick Start¶
from pulsing.integrations.langgraph import with_pulsing
from langgraph.graph import StateGraph
# Build graph as usual
graph = StateGraph(MyState)
graph.add_node("llm", llm_fn)
graph.add_node("tool", tool_fn)
app = graph.compile()
# One line to enable distributed execution
distributed_app = with_pulsing(
app,
node_mapping={
"llm": "langgraph_node_llm",
"tool": "langgraph_node_tool",
},
seeds=["gpu-server:8001"],
)
# Use exactly the same way
result = await distributed_app.ainvoke({"messages": [...]})
Node Mapping¶
The node_mapping parameter defines which nodes execute remotely:
- Key: LangGraph node name
- Value: Pulsing Actor name on remote worker
- Unmapped nodes: Execute locally
node_mapping={
"llm": "langgraph_node_llm", # LLM → GPU server
"tool": "langgraph_node_tool", # Tool → CPU server
}
Starting Workers¶
from pulsing.integrations.langgraph import start_worker
# GPU server
await start_worker("llm", llm_node, addr="0.0.0.0:8001")
# CPU server (joins cluster)
await start_worker("tool", tool_node, addr="0.0.0.0:8002", seeds=["gpu:8001"])
Configuration¶
| Parameter | Description | Default |
|---|---|---|
node_mapping |
Node to Actor mapping | {} (all local) |
addr |
This node's address | None (auto) |
seeds |
Seed node addresses | [] |
Example¶
API Reference¶
with_pulsing¶
def with_pulsing(
compiled_graph,
*,
node_mapping: dict[str, str] | None = None,
addr: str | None = None,
seeds: list[str] | None = None,
) -> PulsingGraphWrapper
start_worker¶
async def start_worker(
node_name: str,
node_func: Callable,
*,
addr: str,
seeds: list[str] | None = None,
actor_name: str | None = None,
) -> None