AutoGen Integration
Pulsing implements PulsingRuntime, a distributed runtime for Microsoft AutoGen.
Features
- Distributed Execution - Run agents on different cluster nodes
- Service Discovery - Automatic agent discovery via Gossip protocol
- Load Balancing - Distribute workload across agent instances
- API Compatible - Drop-in replacement for `SingleThreadedAgentRuntime`
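To give a feel for how gossip-based discovery can spread knowledge of registered agents across nodes, here is a toy, in-memory sketch. It is not Pulsing's implementation: `Node`, `gossip_with`, and `run_rounds` are hypothetical names, nothing goes over the network, and the push-pull exchange shown is just one common gossip variant.

```python
import random


class Node:
    """Toy cluster member (hypothetical, not a Pulsing class)."""

    def __init__(self, addr: str, agent_types: list[str]) -> None:
        self.addr = addr
        # Membership view: node address -> agent types hosted at that address.
        self.view: dict[str, set[str]] = {addr: set(agent_types)}

    def gossip_with(self, peer: "Node") -> None:
        """Push-pull exchange: both sides end up with the union of their views."""
        merged = {addr: set(types) for addr, types in self.view.items()}
        for addr, types in peer.view.items():
            merged.setdefault(addr, set()).update(types)
        self.view = merged
        # Give the peer its own copy so later updates do not alias.
        peer.view = {addr: set(types) for addr, types in merged.items()}


def run_rounds(nodes: list[Node], rounds: int, seed: int = 0) -> None:
    """In each round, every node gossips once with a randomly chosen peer."""
    if len(nodes) < 2:
        return
    rng = random.Random(seed)
    for _ in range(rounds):
        for node in nodes:
            peer = rng.choice([n for n in nodes if n is not node])
            node.gossip_with(peer)


if __name__ == "__main__":
    # Addresses and agent types below are made up for illustration.
    cluster = [
        Node("10.0.0.1:8000", ["writer"]),
        Node("10.0.0.2:8001", ["editor"]),
        Node("10.0.0.3:8002", ["reviewer"]),
    ]
    run_rounds(cluster, rounds=2)
    for node in cluster:
        print(node.addr, sorted(node.view))
```

With three nodes, every view is complete after two rounds, because each node initiates one exchange per round and merged views never lose entries. Larger clusters need more rounds.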
Quick Start
import asyncio

from autogen_core import AgentId, MessageContext, RoutedAgent, message_handler
from pulsing.integrations.autogen import PulsingRuntime

class MyAgent(RoutedAgent):
    def __init__(self) -> None:
        # RoutedAgent requires a description string
        super().__init__("Example agent")

    @message_handler
    async def handle_msg(self, message: str, ctx: MessageContext) -> str:
        return f"Processed: {message}"

async def main():
    # Standalone mode
    runtime = PulsingRuntime()
    await runtime.start()
    # Register agent (same API as AutoGen)
    await runtime.register_factory("my_agent", lambda: MyAgent())
    # Send message
    response = await runtime.send_message("Hello", AgentId("my_agent", "default"))
    print(response)
    await runtime.stop()

asyncio.run(main())
Distributed Mode
Node 1 (Seed)
runtime = PulsingRuntime(addr="0.0.0.0:8000")
await runtime.start()
await runtime.register_factory("writer", lambda: WriterAgent())
Node 2 (Worker)
runtime = PulsingRuntime(
    addr="0.0.0.0:8001",
    seeds=["192.168.1.100:8000"],
)
await runtime.start()
await runtime.register_factory("editor", lambda: EditorAgent())
Any Node (Client)
# Messages route automatically across cluster
await runtime.send_message(draft, AgentId("editor", "default"))
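As a rough mental model of what routing "automatically" could involve when several nodes host the same agent type, the sketch below picks a target node per message. The source does not say which balancing policy Pulsing uses; round-robin is chosen here purely for illustration, and `RoundRobinRouter` and the addresses are hypothetical.

```python
from itertools import cycle


class RoundRobinRouter:
    """Toy router (hypothetical): rotates through the nodes hosting an agent type."""

    def __init__(self, instances: dict[str, list[str]]) -> None:
        # agent type -> endless iterator over the node addresses hosting it
        self._cycles = {
            agent_type: cycle(addrs)
            for agent_type, addrs in instances.items()
            if addrs
        }

    def pick(self, agent_type: str) -> str:
        """Return the address of the next node that should receive a message."""
        try:
            return next(self._cycles[agent_type])
        except KeyError:
            raise LookupError(f"no node hosts agent type {agent_type!r}") from None


if __name__ == "__main__":
    router = RoundRobinRouter(
        {
            "writer": ["192.168.1.100:8000"],
            "editor": ["192.168.1.101:8001", "192.168.1.102:8001"],
        }
    )
    for _ in range(3):
        print("editor ->", router.pick("editor"))
```

A real runtime would also have to react to nodes joining and leaving; this sketch assumes a fixed set of instances.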
Configuration
| Parameter | Description | Default |
|---|---|---|
| `addr` | Node address | `None` (standalone) |
| `seeds` | Seed node addresses | `[]` |
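One way to keep the same script usable in both standalone and distributed mode is to read these two parameters from the environment. The sketch below is an assumption-laden helper, not part of Pulsing: the function names and the `PULSING_ADDR` / `PULSING_SEEDS` variable names are hypothetical, and the `host:port` check simply mirrors the address format used in the examples above.

```python
import os
from collections.abc import Mapping
from typing import Any, Optional


def parse_addr(value: str) -> tuple[str, int]:
    """Split 'host:port' into its parts, raising ValueError if malformed."""
    host, sep, port = value.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected 'host:port', got {value!r}")
    port_num = int(port)
    if not 0 < port_num < 65536:
        raise ValueError(f"port out of range in {value!r}")
    return host, port_num


def runtime_kwargs_from_env(env: Optional[Mapping[str, str]] = None) -> dict[str, Any]:
    """Build keyword arguments for the runtime from (hypothetical) env variables.

    Unset variables are omitted so the documented defaults apply.
    """
    env = os.environ if env is None else env
    kwargs: dict[str, Any] = {}

    addr = env.get("PULSING_ADDR", "").strip()
    if addr:
        parse_addr(addr)  # validate only; the address stays a string
        kwargs["addr"] = addr

    seeds = [s.strip() for s in env.get("PULSING_SEEDS", "").split(",") if s.strip()]
    for seed in seeds:
        parse_addr(seed)
    if seeds:
        kwargs["seeds"] = seeds

    return kwargs


if __name__ == "__main__":
    print(runtime_kwargs_from_env())
```

The resulting dictionary could then be passed along as `PulsingRuntime(**runtime_kwargs_from_env())`, assuming the constructor accepts `addr` and `seeds` as keyword arguments as in the examples above.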