Custom Protocols#
This guide covers creating custom coordination protocols for HERON.
Protocol Types#
HERON supports two types of protocols:
Type |
Direction |
Use Case |
|---|---|---|
Vertical |
Parent → Child |
Setpoints, prices, commands |
Horizontal |
Peer ↔ Peer |
Trading, consensus, negotiation |
Creating a Vertical Protocol#
Vertical protocols coordinate between hierarchy levels:
from heron.protocols.base import Protocol
from heron.agents.base import Agent
class TargetAllocationProtocol(Protocol):
"""Allocate targets from coordinator to subordinates."""
def __init__(self, allocation_strategy: str = "proportional"):
self.allocation_strategy = allocation_strategy
def execute(self, coordinator: Agent, target: float) -> dict:
"""Allocate target among subordinates.
Args:
coordinator: Coordinator agent with subordinates
target: Total target to allocate
Returns:
Dict mapping subordinate IDs to allocated targets
"""
subordinates = coordinator.subordinates
n = len(subordinates)
if self.allocation_strategy == "equal":
allocations = {sub_id: target / n for sub_id in subordinates}
elif self.allocation_strategy == "proportional":
# Allocate based on capacity
total_capacity = sum(
sub.state.capacity for sub in subordinates.values()
)
allocations = {
sub_id: target * (sub.state.capacity / total_capacity)
for sub_id, sub in subordinates.items()
}
elif self.allocation_strategy == "priority":
# Allocate to highest priority first
sorted_subs = sorted(
subordinates.items(),
key=lambda x: x[1].priority,
reverse=True
)
allocations = {}
remaining = target
for sub_id, sub in sorted_subs:
alloc = min(remaining, sub.state.capacity)
allocations[sub_id] = alloc
remaining -= alloc
# Apply allocations
for sub_id, alloc in allocations.items():
subordinates[sub_id].set_target(alloc)
return allocations
Creating a Horizontal Protocol#
Horizontal protocols coordinate between peers:
from heron.protocols.base import CommunicationProtocol
class AuctionProtocol(CommunicationProtocol):
"""Double auction protocol for resource trading."""
def __init__(self, max_rounds: int = 5):
self.max_rounds = max_rounds
self.bids = []
self.asks = []
def execute(self, agents: list[Agent]) -> dict:
"""Execute double auction among agents.
Args:
agents: List of participating agents
Returns:
Dict with matched trades
"""
# Collect bids and asks
self.bids = []
self.asks = []
for agent in agents:
offer = agent.make_offer()
if offer["type"] == "bid":
self.bids.append({
"agent_id": agent.agent_id,
"quantity": offer["quantity"],
"price": offer["price"]
})
elif offer["type"] == "ask":
self.asks.append({
"agent_id": agent.agent_id,
"quantity": offer["quantity"],
"price": offer["price"]
})
# Match bids and asks
trades = self._match_orders()
# Execute trades
for trade in trades:
buyer = next(a for a in agents if a.agent_id == trade["buyer"])
seller = next(a for a in agents if a.agent_id == trade["seller"])
buyer.receive_resource(trade["quantity"])
seller.deliver_resource(trade["quantity"])
return {"trades": trades, "total_volume": sum(t["quantity"] for t in trades)}
def _match_orders(self) -> list:
"""Match bids and asks using price-time priority."""
# Sort bids descending by price, asks ascending
sorted_bids = sorted(self.bids, key=lambda x: -x["price"])
sorted_asks = sorted(self.asks, key=lambda x: x["price"])
trades = []
bid_idx, ask_idx = 0, 0
while bid_idx < len(sorted_bids) and ask_idx < len(sorted_asks):
bid = sorted_bids[bid_idx]
ask = sorted_asks[ask_idx]
if bid["price"] >= ask["price"]:
# Match found
quantity = min(bid["quantity"], ask["quantity"])
price = (bid["price"] + ask["price"]) / 2
trades.append({
"buyer": bid["agent_id"],
"seller": ask["agent_id"],
"quantity": quantity,
"price": price
})
bid["quantity"] -= quantity
ask["quantity"] -= quantity
if bid["quantity"] == 0:
bid_idx += 1
if ask["quantity"] == 0:
ask_idx += 1
else:
break # No more matches possible
return trades
Protocol with Message Broker#
For distributed execution, protocols use the message broker:
from heron.protocols.base import CommunicationProtocol
from heron.messaging.base import MessageBroker
class DistributedConsensusProtocol(CommunicationProtocol):
"""Consensus protocol using message passing."""
def __init__(self, broker: MessageBroker, max_iterations: int = 10, tolerance: float = 0.01):
self.broker = broker
self.max_iterations = max_iterations
self.tolerance = tolerance
async def execute(self, agents: list[Agent]) -> dict:
"""Execute distributed consensus.
Args:
agents: List of participating agents
Returns:
Dict with consensus value and convergence info
"""
# Initialize values
values = {a.agent_id: a.get_local_value() for a in agents}
for iteration in range(self.max_iterations):
# Each agent broadcasts its value
for agent in agents:
await self.broker.publish(
f"consensus/{agent.agent_id}",
{"value": values[agent.agent_id], "iteration": iteration}
)
# Each agent receives neighbors' values and updates
new_values = {}
for agent in agents:
neighbor_values = []
for neighbor_id in agent.neighbors:
msg = await self.broker.consume(f"consensus/{neighbor_id}")
neighbor_values.append(msg["value"])
# Update with average
all_values = [values[agent.agent_id]] + neighbor_values
new_values[agent.agent_id] = np.mean(all_values)
# Check convergence
max_diff = max(
abs(new_values[aid] - values[aid])
for aid in values
)
values = new_values
if max_diff < self.tolerance:
return {
"consensus_value": np.mean(list(values.values())),
"iterations": iteration + 1,
"converged": True
}
return {
"consensus_value": np.mean(list(values.values())),
"iterations": self.max_iterations,
"converged": False
}
Registering Custom Protocols#
Register protocols for use in environments:
from heron.utils.registry import ProtocolRegistry
# Register custom protocol
ProtocolRegistry.register("auction", AuctionProtocol)
ProtocolRegistry.register("target_allocation", TargetAllocationProtocol)
# Use in environment config
env_config = {
"horizontal_protocol": "auction",
"vertical_protocol": "target_allocation",
}
Best Practices#
Separate logic from communication: Protocol logic should be independent of message transport
Handle failures gracefully: Timeouts, missing messages, partial participation
Document convergence: For iterative protocols, document convergence guarantees
Test both modes: Verify behavior in both centralized and distributed modes