Multi-Agent Coordination Patterns

Reviewed: June 4, 2026

Published: May 26, 2026 | Multi-Agent Coordination Architecture

When a single agent can’t handle the complexity, you need multiple agents working together. But multi-agent systems introduce coordination overhead, communication costs, and emergent failure modes. Here’s how to manage them.

Coordination Topologies

class AgentTopology:
    """Namespace grouping the supported agent communication layouts."""

    class StarTopology:
        """Hub-and-spoke layout: one orchestrator hands work to specialists."""

        def __init__(self, orchestrator, specialists: dict):
            self.orchestrator = orchestrator
            self.specialists = specialists

        def execute(self, task: str) -> dict:
            """Run every planned step on its specialist, then synthesize.

            The orchestrator's plan is iterated in order; each step is indexed
            by its "agent", "task" and "id" keys. The specialist registered
            under the step's "agent" name receives the step's "task", and its
            output is collected under the step's "id".

            Returns:
                Whatever ``orchestrator.synthesize`` produces from the
                collected per-step outputs.
            """
            outcomes = {}
            for step in self.orchestrator.plan(task):
                specialist = self.specialists[step["agent"]]
                # Run the specialist before reading the step id, so a step
                # without an "id" still executes before the lookup fails.
                outcome = specialist.execute(step["task"])
                outcomes[step["id"]] = outcome
            return self.orchestrator.synthesize(outcomes)

    class PeerTopology:
        """Flat layout: peers compete for a task through bids."""

        def __init__(self, agents: list, shared_memory):
            self.agents = agents
            self.memory = shared_memory

        def execute(self, task: str) -> dict:
            """Let the highest bidder run the task and record its result.

            Every agent is asked for a bid before any comparison is made. When
            several agents share the top bid, the one listed first wins.

            Returns:
                The winning agent's result, which is also stored in the shared
                memory under ``task``.
            """
            offers = [(candidate, candidate.bid(task)) for candidate in self.agents]
            chosen, _ = max(offers, key=lambda offer: offer[1])
            outcome = chosen.execute(task)
            self.memory.store(task, outcome)
            return outcome

    class PipelineTopology:
        """Chain layout: each stage consumes the previous stage's output."""

        def __init__(self, stages: list):
            self.stages = stages

        def execute(self, input_data: dict) -> dict:
            """Feed the payload through the stages, stopping at the first error.

            Returns:
                The output of the last stage that ran. If a stage's output has
                a truthy "error" entry, that output is returned immediately and
                the remaining stages are skipped. With no stages the input is
                returned unchanged.
            """
            payload = input_data
            for stage in self.stages:
                payload = stage.process(payload)
                if payload.get("error"):
                    return payload
            return payload

Conflict Resolution

class ConflictResolver:
    """Resolve disagreements between agents.

    Proposals are plain dicts. ``resolve_by_voting`` reads each proposal's
    "decision" key; ``resolve_by_merge`` reads the optional numeric
    "confidence" key to choose between conflicting values.
    """

    def resolve_by_voting(self, proposals: list[dict]) -> dict:
        """Majority vote on discrete choices.

        Args:
            proposals: Dicts that each carry a hashable "decision" value.

        Returns:
            A dict with the most common "decision" and a "confidence" equal
            to the share of proposals that voted for it. Ties go to the
            decision seen first. With no proposals there is nothing to
            elect, so the result is ``{"decision": None, "confidence": 0.0}``.

        Raises:
            KeyError: If a proposal has no "decision" key.
        """
        from collections import Counter

        if not proposals:
            # most_common(1) would be empty here and indexing it would fail.
            return {"decision": None, "confidence": 0.0}
        choices = [p["decision"] for p in proposals]
        decision, votes = Counter(choices).most_common(1)[0]
        return {"decision": decision, "confidence": votes / len(proposals)}

    def resolve_by_arbitrator(self, proposals: list[dict],
                               arbitrator) -> dict:
        """Use a trusted arbitrator to decide.

        Returns:
            Whatever ``arbitrator.judge(proposals)`` returns.
        """
        return arbitrator.judge(proposals)

    def resolve_by_merge(self, proposals: list[dict]) -> dict:
        """Merge compatible proposals key by key.

        A key seen for the first time is copied as-is. When a later proposal
        carries a different value for the same key, the value that came from
        the proposal with the higher "confidence" is kept; on equal
        confidence the earlier value stays. Proposals without a numeric
        "confidence" count as 0.0.

        Args:
            proposals: Dicts to merge, in priority order for ties.

        Returns:
            The merged dict (empty when there are no proposals).
        """
        merged = {}
        # Confidence of the proposal that supplied each merged value. The
        # values alone carry no confidence, so it has to be tracked here.
        source_confidence = {}
        for p in proposals:
            confidence = self._confidence_of(p)
            for key, value in p.items():
                if key not in merged:
                    merged[key] = value
                    source_confidence[key] = confidence
                elif merged[key] != value:
                    merged[key] = self._prefer_higher_confidence(
                        merged[key], value,
                        source_confidence[key], confidence,
                    )
                    # The helper keeps the candidate only when it is strictly
                    # more confident, so the survivor's confidence is the max.
                    source_confidence[key] = max(
                        source_confidence[key], confidence
                    )
        return merged

    @staticmethod
    def _confidence_of(proposal: dict) -> float:
        """Return the proposal's numeric "confidence", or 0.0 if absent."""
        score = proposal.get("confidence", 0.0)
        return score if isinstance(score, (int, float)) else 0.0

    def _prefer_higher_confidence(self, current, candidate,
                                  current_confidence: float = 0.0,
                                  candidate_confidence: float = 0.0):
        """Pick between two conflicting values by their source confidence.

        Returns ``candidate`` only when it is strictly more confident than
        ``current``; otherwise the already-merged ``current`` value wins.
        """
        if candidate_confidence > current_confidence:
            return candidate
        return current

Shared Memory Patterns

class SharedAgentMemory:
    """Thread-safe shared memory for multi-agent systems.

    Holds a key/value fact store and a task list. Every method that touches
    ``facts`` or ``pending`` does so while holding ``self.lock``.
    """

    def __init__(self):
        self.facts = {}         # key -> {"value", "written_by", "timestamp"}
        self.pending = []       # Published task dicts, in publication order
        self.completed = []     # Completed task results (no method here fills it)
        self.lock = threading.Lock()

    def write(self, key: str, value: object, agent_id: str):
        """Store ``value`` under ``key``, replacing any earlier entry.

        The entry also records the writing agent and an ISO-8601 timestamp
        taken from ``datetime.utcnow()``.
        """
        with self.lock:
            self.facts[key] = {
                "value": value,
                "written_by": agent_id,
                "timestamp": datetime.utcnow().isoformat()
            }

    def read(self, key: str) -> Optional[object]:
        """Return the value stored under ``key``, or None if there is none."""
        # Take the same lock as write() so reads follow the class's locking
        # discipline instead of touching the dict unguarded.
        with self.lock:
            entry = self.facts.get(key)
        return entry["value"] if entry else None

    def publish_task(self, task: dict):
        """Append a copy of ``task`` to the pending list.

        The copy's "status" is set to "pending" and "created_at" to the
        current ISO-8601 timestamp, overriding those keys if ``task`` already
        has them. The caller's dict is not modified.
        """
        with self.lock:
            self.pending.append({
                **task,
                "status": "pending",
                "created_at": datetime.utcnow().isoformat()
            })

    def claim_task(self, agent_id: str) -> Optional[dict]:
        """Claim the oldest task whose status is still "pending".

        The task is marked "claimed" and tagged with ``agent_id`` under the
        lock, so two agents cannot claim the same task. Claimed tasks stay in
        ``pending``; only their status changes.

        Returns:
            The claimed task dict (the stored object, not a copy), or None
            when no pending task is available.
        """
        with self.lock:
            for task in self.pending:
                if task["status"] == "pending":
                    task["status"] = "claimed"
                    task["claimed_by"] = agent_id
                    return task
            return None

Anti-Patterns to Avoid

Part of the Agent Architecture series on DataGate.ch.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert