Example 6: Distributed Mode with Proxy Agent#

This example demonstrates distributed execution mode where agents communicate via message brokers instead of direct function calls.

What You’ll Learn#

  • Setting up distributed execution mode

  • Using Proxy for message-based coordination

  • Implementing realistic communication patterns

Architecture#

┌──────────────────────────────────────────────────────────┐
│                      Message Broker                      │
│  ┌────────────────────────────────────────────────────┐  │
│  │                  pub/sub channels                  │  │
│  └────────────────────────────────────────────────────┘  │
│         ▲              ▲              ▲                  │
│         │              │              │                  │
│    ┌────┴────┐    ┌────┴────┐    ┌────┴────┐             │
│    │ Proxy 1 │    │ Proxy 2 │    │ Proxy 3 │             │
│    │  (MG1)  │    │  (MG2)  │    │  (MG3)  │             │
│    └────┬────┘    └────┬────┘    └────┬────┘             │
│         │              │              │                  │
│    ┌────┴────┐    ┌────┴────┐    ┌────┴────┐             │
│    │ GridAgt │    │ GridAgt │    │ GridAgt │             │
│    └─────────┘    └─────────┘    └─────────┘             │
└──────────────────────────────────────────────────────────┘

Code#

from heron.messaging.memory import InMemoryBroker
from heron.agents import Proxy
from powergrid.envs import MultiAgentMicrogrids

# Create message broker
broker = InMemoryBroker()

# Configure distributed mode
env_config = {
    "centralized": False,  # Enable distributed mode
    "message_broker": broker,
    "max_episode_steps": 24,
}

env = MultiAgentMicrogrids(env_config)

Proxy Agent Setup#

Proxy wraps actual agents for message-based communication:

from heron.agents import Proxy
from heron.messaging.memory import InMemoryBroker

broker = InMemoryBroker()

# Create a proxy for each grid agent (grid_agent is an existing GridAgent instance)
proxy = Proxy(
    agent_id="mg1_proxy",
    actual_agent=grid_agent,
    broker=broker,
    upstream_id="system_operator",
)

# Distributed step (async; must be awaited inside a coroutine,
# e.g. one driven by asyncio.run)
await proxy.step_distributed()
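Stepping several proxies concurrently is the usual pattern in distributed mode. The asyncio mechanics can be sketched without heron using a stand-in class (`DummyProxy` below is illustrative only; it merely mimics the async step interface, not heron's Proxy):

```python
import asyncio


class DummyProxy:
    """Stand-in with an async step interface like Proxy's."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id

    async def step_distributed(self) -> str:
        await asyncio.sleep(0)  # yield to the event loop
        return self.agent_id


async def main():
    proxies = [DummyProxy(f"mg{i}_proxy") for i in (1, 2, 3)]
    # Step all proxies concurrently, as a distributed episode step would
    return await asyncio.gather(*(p.step_distributed() for p in proxies))


print(asyncio.run(main()))  # ['mg1_proxy', 'mg2_proxy', 'mg3_proxy']
```

`asyncio.gather` lets every proxy's message exchange proceed while the others are waiting on the broker, which is the point of the async design.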

Message Flow#

1. System Operator broadcasts price signal
   └── Message: {type: "price", value: 55.0}

2. Proxies receive and forward to actual agents
   └── Each GridAgent computes response

3. GridAgents send actions back through proxies
   └── Message: {type: "action", p_gen: 1.5, p_ess: -0.3}

4. Horizontal protocol messages (P2P trading)
   └── MG1 → MG2: {type: "offer", quantity: 0.5, price: 52.0}
   └── MG2 → MG1: {type: "accept", quantity: 0.5}
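The four steps above can be traced with plain dictionaries and a toy publish function (channel names and payload fields mirror the messages shown, but this is an illustration, not heron's wire format):

```python
from collections import defaultdict

# Toy pub/sub: each channel is a list acting as an inbox
inboxes = defaultdict(list)


def publish(channel: str, message: dict) -> None:
    inboxes[channel].append(message)


# 1. System operator broadcasts a price signal
publish("broadcast", {"type": "price", "value": 55.0})

# 2./3. Each grid agent reads the price and replies with an action
price = inboxes["broadcast"][-1]["value"]
for mg in ("mg1", "mg2", "mg3"):
    publish("actions", {"type": "action", "agent": mg,
                        "p_gen": 1.5 if price > 50 else 0.5, "p_ess": -0.3})

# 4. Horizontal P2P trade between MG1 and MG2
publish("mg2", {"type": "offer", "quantity": 0.5, "price": 52.0})
publish("mg1", {"type": "accept", "quantity": 0.5})

print(len(inboxes["actions"]))    # 3
print(inboxes["mg1"][0]["type"])  # accept
```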

Running the Example#

cd case_studies/power
python examples/06_distributed_mode_with_proxy.py

Key Concepts#

InMemoryBroker#

For local simulation without external message systems:

from heron.messaging.memory import InMemoryBroker

broker = InMemoryBroker()

def handle_control(message: dict) -> None:
    print("received:", message)

# Subscribe to channels
broker.subscribe("control", callback=handle_control)

# Publish messages
broker.publish("control", {"setpoint": 1.0})
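The pub/sub mechanism behind such a broker is small enough to sketch in full. The class below is a simplified stand-in for illustration, not heron's actual InMemoryBroker implementation:

```python
from collections import defaultdict
from typing import Callable


class MiniBroker:
    """Toy in-memory pub/sub broker: subscribe registers a callback,
    publish delivers the message to every subscriber of that channel."""

    def __init__(self):
        self._subscribers: dict[str, list[Callable]] = defaultdict(list)

    def subscribe(self, channel: str, callback: Callable) -> None:
        self._subscribers[channel].append(callback)

    def publish(self, channel: str, message: dict) -> None:
        for callback in self._subscribers[channel]:
            callback(message)


received = []
broker = MiniBroker()
broker.subscribe("control", callback=received.append)
broker.publish("control", {"setpoint": 1.0})
print(received)  # [{'setpoint': 1.0}]
```

Because delivery is a direct callback invocation in the same process, an in-memory broker adds almost no latency, which is why it suits local simulation.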

Extending to Production#

The MessageBroker interface can be extended for production systems:

import json

from kafka import KafkaProducer, KafkaConsumer  # pip install kafka-python

from heron.messaging.base import MessageBroker

class KafkaBroker(MessageBroker):
    """Kafka-based message broker for production."""

    def __init__(self, bootstrap_servers: list[str]):
        self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
        self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)

    async def publish(self, channel: str, message: dict):
        # send() is non-blocking: it buffers the record and returns a future
        self.producer.send(channel, json.dumps(message).encode())

    async def consume(self, channel: str) -> dict:
        # Note: the consumer must be subscribed to the topic beforehand, and
        # iteration blocks; a fully async setup would use aiokafka instead
        for msg in self.consumer:
            if msg.topic == channel:
                return json.loads(msg.value.decode())

Centralized vs Distributed Comparison#

# Centralized (fast, for training)
env_central = MultiAgentMicrogrids({"centralized": True})
# Observation: Full state vector
# Communication: Direct function calls

# Distributed (realistic, for deployment)
env_dist = MultiAgentMicrogrids({"centralized": False, "message_broker": broker})
# Observation: Local state + received messages
# Communication: Async message passing
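The transport difference can be sketched outside heron: the same agent logic answers a price either through a direct call or through an async message exchange. This is an illustrative comparison only; `respond` and the queues below are stand-ins, not heron APIs:

```python
import asyncio


def respond(price: float) -> dict:
    # The same agent logic is used in both modes
    return {"p_gen": 1.5 if price > 50 else 0.5}


# Centralized: a direct, synchronous function call
action = respond(55.0)


# Distributed: the same exchange over async message queues
async def distributed():
    channel: asyncio.Queue = asyncio.Queue()   # operator -> agent
    replies: asyncio.Queue = asyncio.Queue()   # agent -> operator

    async def agent():
        msg = await channel.get()              # receive the price message
        await replies.put(respond(msg["value"]))

    task = asyncio.create_task(agent())
    await channel.put({"type": "price", "value": 55.0})
    result = await replies.get()
    await task
    return result


assert action == asyncio.run(distributed())  # same action, different transport
```

The distributed path produces the identical action but pays queue and scheduling overhead, which is the trade-off quantified in the table below.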

Performance Considerations#

Aspect          Centralized        Distributed
Step time       ~10ms              ~50ms
Observability   Full               Local + messages
Scalability     Limited            High
Realism         Low                High

Next Steps#