Messaging#
Message broker system for distributed agent communication.
MessageBroker Interface#
- class heron.messaging.base.MessageBroker#
Abstract base class for message brokers.
- abstractmethod async publish(channel: str, message: dict) None#
Publish message to a channel.
- Parameters:
channel – Channel name
message – Message payload
- abstractmethod async consume(channel: str) dict#
Consume next message from channel.
- Parameters:
channel – Channel name
- Returns:
Message payload
- abstractmethod subscribe(channel: str, callback: Callable) None#
Subscribe to channel with callback.
- Parameters:
channel – Channel name
callback – Function called for each message
- abstractmethod create_channel(channel: str) None#
Create a new channel.
- Parameters:
channel – Channel name
InMemoryBroker#
- class heron.messaging.memory.InMemoryBroker#
In-memory message broker for local simulation.
Usage:
from heron.messaging.memory import InMemoryBroker

broker = InMemoryBroker()

# Create channels
broker.create_channel("control")
broker.create_channel("status")

# Subscribe with callback
def handle_control(msg):
    print(f"Received: {msg}")

broker.subscribe("control", handle_control)

# Publish message
await broker.publish("control", {"setpoint": 1.0})

# Or consume directly
msg = await broker.consume("status")
ChannelManager#
- class heron.messaging.base.ChannelManager#
Utility for managing broker channels.
- create_agent_channels(agent_ids: list)#
Create channels for a list of agents.
- get_channel_name(from_id: str, to_id: str) str#
Get channel name for agent-to-agent communication.
Usage:
from heron.messaging.base import ChannelManager

manager = ChannelManager(broker)

# Create channels for all agent pairs
manager.create_agent_channels(["mg1", "mg2", "mg3"])

# Get specific channel
channel = manager.get_channel_name("mg1", "mg2")
Extending for Production#
Implement custom brokers for production systems:
from heron.messaging.base import MessageBroker
import json
class KafkaBroker(MessageBroker):
    """Kafka-based message broker.

    Messages are dicts serialized as JSON. The channel name is passed to
    kafka-python as the topic name for both producing and consuming.
    Consumers are created lazily, one per channel, and cached in
    ``self.consumers``.
    """

    def __init__(self, bootstrap_servers: list[str]):
        """Create the producer and remember the servers for later consumers.

        Args:
            bootstrap_servers: Kafka bootstrap server addresses, used for
                the producer and for every consumer created by ``consume``.
        """
        from kafka import KafkaProducer

        # consume() builds its consumers on demand and needs this list;
        # without storing it here that lookup raises AttributeError.
        self.bootstrap_servers = bootstrap_servers
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode(),
        )
        self.consumers = {}

    async def publish(self, channel: str, message: dict) -> None:
        """Send ``message`` to ``channel`` and flush the producer.

        Args:
            channel: Channel (topic) name.
            message: JSON-serializable message payload.
        """
        # NOTE(review): send()/flush() are synchronous kafka-python calls, so
        # this coroutine does not yield while flushing -- consider an
        # executor if the event loop must stay responsive.
        self.producer.send(channel, message)
        self.producer.flush()

    async def consume(self, channel: str) -> dict:
        """Return the next message payload from ``channel``.

        The consumer for ``channel`` is created on first use and reused on
        later calls.

        Args:
            channel: Channel (topic) name.

        Returns:
            The decoded message payload. If the consumer's iterator ends
            without yielding a message, the method falls through and
            returns ``None``.
        """
        if channel not in self.consumers:
            from kafka import KafkaConsumer

            self.consumers[channel] = KafkaConsumer(
                channel,
                bootstrap_servers=self.bootstrap_servers,
                value_deserializer=lambda v: json.loads(v.decode()),
            )
        # Iterating the consumer yields records one at a time; only the
        # first one is wanted, so return as soon as it arrives.
        for msg in self.consumers[channel]:
            return msg.value

    def create_channel(self, channel: str) -> None:
        """No-op: Kafka topics are auto-created or pre-configured."""
        pass

    def subscribe(self, channel: str, callback) -> None:
        """Placeholder for callback-based subscription.

        Left unimplemented in this example: calling it registers nothing
        and ``callback`` is never invoked.
        """
        # Implement callback-based subscription
        pass
class RedisBroker(MessageBroker):
    """Redis-based message broker built on Redis pub/sub.

    Messages are dicts serialized as JSON. ``consume`` keeps one ``PubSub``
    per channel; ``subscribe`` runs each callback from a background
    listener thread.
    """

    def __init__(self, host: str = "localhost", port: int = 6379):
        """Connect to Redis.

        Args:
            host: Redis server host.
            port: Redis server port.
        """
        import redis

        self.client = redis.Redis(host=host, port=port)
        # One PubSub per channel for consume(): creating a fresh one on
        # every call would leak a connection each time and drop the
        # subscription between calls.
        self._pubsubs = {}
        # Listener threads started by subscribe(), keyed by channel, kept so
        # they stay referenced and can be stopped later.
        self._listeners = {}

    async def publish(self, channel: str, message: dict) -> None:
        """Publish ``message`` to ``channel`` as a JSON string.

        Args:
            channel: Channel name.
            message: JSON-serializable message payload.
        """
        self.client.publish(channel, json.dumps(message))

    async def consume(self, channel: str) -> dict:
        """Return the next message payload published on ``channel``.

        Args:
            channel: Channel name.

        Returns:
            The JSON-decoded message payload.
        """
        pubsub = self._pubsubs.get(channel)
        if pubsub is None:
            pubsub = self.client.pubsub()
            pubsub.subscribe(channel)
            self._pubsubs[channel] = pubsub
        # listen() also yields subscribe confirmations; skip everything
        # that is not an actual published message.
        for msg in pubsub.listen():
            if msg["type"] == "message":
                return json.loads(msg["data"])

    def create_channel(self, channel: str) -> None:
        """No-op: Redis channels are dynamic."""
        pass  # Redis channels are dynamic

    def subscribe(self, channel: str, callback) -> None:
        """Invoke ``callback`` with the decoded payload of each message.

        Args:
            channel: Channel name.
            callback: Function called with the JSON-decoded payload of
                every message published on ``channel``.
        """

        def _dispatch(raw):
            # Handlers receive the raw redis message dict; decode it the
            # same way consume() does so both paths deliver the same thing.
            callback(json.loads(raw["data"]))

        pubsub = self.client.pubsub()
        pubsub.subscribe(**{channel: _dispatch})
        # Registering a handler alone never fires it: something has to read
        # from the PubSub. A daemon listener thread does that without
        # blocking the caller or keeping the process alive at exit.
        listener = pubsub.run_in_thread(sleep_time=0.01, daemon=True)
        self._listeners.setdefault(channel, []).append(listener)