Sunday, March 1, 2026
How to Design a Production-Grade Multi-Agent Communication System Using LangGraph Structured Message Bus, ACP Logging, and Persistent Shared State Architecture

In this tutorial, we build an advanced multi-agent communication system using a structured message bus architecture powered by LangGraph and Pydantic. We define a strict ACP-style message schema that lets agents communicate through a shared state rather than calling each other directly, which keeps them modular and makes every exchange traceable. We implement three specialized agents, a Planner, an Executor, and a Validator, that coordinate through structured messages, persistent state, and routing logic. We also integrate SQLite-based persistence to provide durable memory across executions, and we visualize the agent communication flow to see how messages propagate through the system.

!pip -q install -U "pydantic==2.12.3"
!pip -q install -U langgraph langchain-core networkx matplotlib
!pip -q install -U langgraph-checkpoint-sqlite


import os
import json
import uuid
import sqlite3
from datetime import datetime, timezone
from typing import Any, Dict, List, Literal, Optional, Tuple

from pydantic import BaseModel, Field

import networkx as nx
import matplotlib.pyplot as plt

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver


# Closed vocabularies: who may send or receive, and which kinds of message exist.
Role = Literal["planner", "executor", "validator", "user", "system"]
MsgType = Literal["task", "plan", "result", "validation", "error", "control"]


class ACPMessage(BaseModel):
    # Every message gets a unique id and a UTC timestamp ending in "Z".
    msg_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    ts: str = Field(default_factory=lambda: datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"))
    sender: Role
    receiver: Role
    msg_type: MsgType
    content: str
    meta: Dict[str, Any] = Field(default_factory=dict)
    trace: Dict[str, Any] = Field(default_factory=dict)


def acp_log_path() -> str:
    # Create the log directory on first use and return the JSONL file path.
    os.makedirs("acp_logs", exist_ok=True)
    return os.path.join("acp_logs", "acp_messages.jsonl")


def append_acp_log(m: ACPMessage) -> None:
    # One JSON object per line (JSONL), appended so earlier runs are preserved.
    with open(acp_log_path(), "a", encoding="utf-8") as f:
        f.write(m.model_dump_json() + "\n")

We install and import the libraries needed to build a structured multi-agent communication system. We define the ACP-style message schema with Pydantic, which enforces a strict format for agent communication: senders, receivers, and message types are restricted to fixed sets of values. We also implement structured logging that appends every message exchanged between agents to a JSONL file, which gives us traceability and observability.
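Because the log is plain JSONL, it can be read back with the standard library alone. The following is a minimal sketch, not part of the original notebook: the helper names `iter_log` and `messages_from` are hypothetical, and the demo writes hand-made sample rows to a temporary file instead of touching `acp_logs/`.

```python
import json
import os
import tempfile
from typing import Any, Dict, Iterator, List


def iter_log(path: str) -> Iterator[Dict[str, Any]]:
    """Yield one parsed message per non-empty line of a JSONL file."""
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                yield json.loads(line)


def messages_from(path: str, sender: str) -> List[Dict[str, Any]]:
    """Return every logged message whose sender matches."""
    return [row for row in iter_log(path) if row.get("sender") == sender]


# Demo with hand-made rows (assumed shape: the ACPMessage fields used in this tutorial).
demo_path = os.path.join(tempfile.mkdtemp(), "acp_messages.jsonl")
sample_rows = [
    {"sender": "planner", "receiver": "executor", "msg_type": "plan", "content": "1. Do it"},
    {"sender": "executor", "receiver": "validator", "msg_type": "result", "content": "{}"},
]
with open(demo_path, "w", encoding="utf-8") as f:
    for row in sample_rows:
        f.write(json.dumps(row) + "\n")

planner_rows = messages_from(demo_path, "planner")
print(len(planner_rows), planner_rows[0]["msg_type"])
```

Reading line by line rather than loading the whole file keeps memory use flat as the log grows across runs.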

class BusState(BaseModel):
    goal: str = ""
    done: bool = False
    errors: List[str] = Field(default_factory=list)
    mailbox: List[ACPMessage] = Field(default_factory=list)   # every message posted so far
    edges: List[Tuple[str, str, str]] = Field(default_factory=list)  # (sender, receiver, msg_type)
    active_role: Role = "user"   # who should act next
    step: int = 0


def bus_update(
    state: BusState,
    sender: Role,
    receiver: Role,
    msg_type: MsgType,
    content: str,
    meta: Optional[Dict[str, Any]] = None,
    trace: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    # Build and validate the message, then log it before touching the state.
    m = ACPMessage(
        sender=sender,
        receiver=receiver,
        msg_type=msg_type,
        content=content,
        meta=meta or {},
        trace=trace or {},
    )
    append_acp_log(m)
    # Return a fresh state dict; control passes to the receiver.
    return {
        "goal": state.goal,
        "done": state.done,
        "errors": state.errors,
        "mailbox": state.mailbox + [m],
        "edges": state.edges + [(sender, receiver, msg_type)],
        "active_role": receiver,
        "step": state.step + 1,
    }

We define the shared state structure that acts as the centralized message bus for all agents. The BusState class stores the goal, the mailbox, routing information, and execution progress. The bus_update function builds a structured message, appends it to the log, and returns an updated copy of the shared state in which control passes to the receiver.
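One detail worth seeing in isolation is that bus_update builds new lists by concatenation instead of appending in place, so the state passed in is left untouched. Here is a simplified sketch using plain dictionaries as a stand-in for BusState and ACPMessage; the function name `post` is hypothetical and the sketch skips validation and logging.

```python
from typing import Any, Dict


def post(state: Dict[str, Any], sender: str, receiver: str,
         msg_type: str, content: str) -> Dict[str, Any]:
    """Return a new state with one more message; the input state is not mutated."""
    message = {"sender": sender, "receiver": receiver,
               "msg_type": msg_type, "content": content}
    return {
        **state,
        "mailbox": state["mailbox"] + [message],  # new list, old one untouched
        "edges": state["edges"] + [(sender, receiver, msg_type)],
        "active_role": receiver,                  # hand control to the receiver
        "step": state["step"] + 1,
    }


before = {"goal": "demo", "mailbox": [], "edges": [], "active_role": "planner", "step": 0}
after = post(before, "planner", "executor", "plan", "1. Do the thing")

print(before["step"], len(before["mailbox"]))  # the original state is unchanged
print(after["step"], after["active_role"])
```

Returning a fresh state on every update makes each step easy to snapshot and compare, which is convenient when a checkpointer stores the state after every node.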

def planner_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    state = BusState.model_validate(state_dict)
    goal = state.goal.strip()
    # Without a goal there is nothing to plan: report the error to the validator.
    if not goal:
        return bus_update(state, "planner", "validator", "error", "No goal provided.", meta={"reason": "empty_goal"})
    plan = [
        "Interpret the goal and extract requirements.",
        "Decide an execution strategy with clear outputs.",
        "Ask Executor to produce the result.",
        "Ask Validator to check correctness + completeness.",
    ]
    plan_text = "\n".join([f"{i+1}. {p}" for i, p in enumerate(plan)])
    return bus_update(
        state,
        "planner",
        "executor",
        "plan",
        plan_text,
        meta={"goal": goal, "plan_steps": len(plan)},
        trace={"policy": "deterministic_planner_v1"},
    )


def executor_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    state = BusState.model_validate(state_dict)
    goal = state.goal.strip()
    # Look up the most recent plan addressed to the executor.
    latest_plan = None
    for m in reversed(state.mailbox):
        if m.receiver == "executor" and m.msg_type == "plan":
            latest_plan = m.content
            break
    result = {
        "goal": goal,
        "assumptions": [
            "We can produce a concise, actionable output.",
            "We can validate via rule-based checks.",
        ],
        "output": f"Executed task for goal: {goal}",
        "deliverables": [
            "A clear summary",
            "A step-by-step action list",
            "Any constraints and edge cases",
        ],
        "plan_seen": bool(latest_plan),
    }
    # The result travels over the bus as a JSON string so the validator can parse it.
    result_text = json.dumps(result, indent=2)
    return bus_update(
        state,
        "executor",
        "validator",
        "result",
        result_text,
        meta={"artifact_type": "json", "bytes": len(result_text.encode("utf-8"))},
        trace={"policy": "deterministic_executor_v1"},
    )

We implement the Planner and Executor agents, which handle task planning and execution. The Planner interprets the goal and generates a fixed four-step execution plan, which it posts to the message bus addressed to the Executor. The Executor looks up the latest plan in its mailbox and produces a structured JSON result artifact that downstream agents can validate. Both agents are deterministic here, which keeps the focus on the communication pattern rather than on the reasoning inside each agent.
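The mailbox scan in the Executor (newest message first, filtered by receiver and type) is a pattern the Validator repeats later. As an illustration only, it could be factored into a small helper. The name `latest_message` is hypothetical, and plain dictionaries stand in for ACPMessage objects.

```python
from typing import Any, Dict, Iterable, List, Optional


def latest_message(mailbox: List[Dict[str, Any]], receiver: str,
                   msg_types: Iterable[str]) -> Optional[Dict[str, Any]]:
    """Return the newest message for `receiver` whose type is in `msg_types`, or None."""
    wanted = set(msg_types)
    for message in reversed(mailbox):  # newest first
        if message["receiver"] == receiver and message["msg_type"] in wanted:
            return message
    return None


mailbox = [
    {"receiver": "executor", "msg_type": "plan", "content": "old plan"},
    {"receiver": "validator", "msg_type": "result", "content": "{}"},
    {"receiver": "executor", "msg_type": "plan", "content": "new plan"},
]

found = latest_message(mailbox, "executor", ["plan"])
missing = latest_message(mailbox, "planner", ["validation"])
print(found["content"], missing)
```

Returning None when nothing matches lets the caller decide how to react, as the Validator does when it finds no result to check.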

def validator_agent(state_dict: Dict[str, Any]) -> Dict[str, Any]:
    state = BusState.model_validate(state_dict)
    goal = state.goal.strip()
    # Find the most recent result (or upstream error) addressed to the validator.
    latest_result = None
    for m in reversed(state.mailbox):
        if m.receiver == "validator" and m.msg_type in ("result", "error"):
            latest_result = m
            break
    if latest_result is None:
        upd = bus_update(state, "validator", "planner", "error", "No result to validate.", meta={"reason": "missing_result"})
        upd["done"] = True
        upd["errors"] = state.errors + ["missing_result"]
        return upd
    # An upstream error fails validation immediately.
    if latest_result.msg_type == "error":
        upd = bus_update(
            state,
            "validator",
            "planner",
            "validation",
            f"Validation failed because an upstream error occurred: {latest_result.content}",
            meta={"status": "fail"},
        )
        upd["done"] = True
        upd["errors"] = state.errors + [latest_result.content]
        return upd
    try:
        parsed = json.loads(latest_result.content)
    except Exception as e:
        upd = bus_update(
            state,
            "validator",
            "planner",
            "validation",
            f"Result is not valid JSON: {e}",
            meta={"status": "fail"},
        )
        upd["done"] = True
        upd["errors"] = state.errors + [f"invalid_json: {e}"]
        return upd
    # Rule-based checks on the parsed artifact.
    issues = []
    if parsed.get("goal") != goal:
        issues.append("Result.goal does not match input goal.")
    if "deliverables" not in parsed or not isinstance(parsed["deliverables"], list) or len(parsed["deliverables"]) == 0:
        issues.append("Missing or empty deliverables list.")
    if issues:
        upd = bus_update(
            state,
            "validator",
            "planner",
            "validation",
            "Validation failed:\n- " + "\n- ".join(issues),
            meta={"status": "fail", "issues": issues},
        )
        upd["done"] = True
        upd["errors"] = state.errors + issues
        return upd
    upd = bus_update(
        state,
        "validator",
        "user",
        "validation",
        "Validation passed ✅ Result looks consistent and complete.",
        meta={"status": "pass"},
    )
    upd["done"] = True
    upd["errors"] = state.errors
    return upd


def route_next(state_dict: Dict[str, Any]) -> str:
    # Stop once any agent has marked the run as done.
    if state_dict.get("done", False):
        return END
    # Otherwise hand control to whichever agent the last message was addressed to.
    role = state_dict.get("active_role", "user")
    if role == "planner":
        return "planner"
    if role == "executor":
        return "executor"
    if role == "validator":
        return "validator"
    return END

We implement the Validator agent and the routing logic that controls the execution flow. The Validator inspects the latest result, checks that it is valid JSON, that its goal matches the input goal, and that it contains a non-empty list of deliverables, and then posts a pass or fail validation message. The routing function reads the shared state to decide which agent runs next, and ends the run when the state is marked done or when the active role is not one of the three agents.
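The routing rule is compact enough to check without LangGraph installed. The sketch below restates it with a set lookup. `END` here is a local stand-in string rather than the constant imported from `langgraph.graph`, and `AGENT_NODES` is a hypothetical name.

```python
from typing import Any, Dict

END = "__end__"  # local stand-in for the END marker; an assumption for this sketch
AGENT_NODES = {"planner", "executor", "validator"}


def route_next(state: Dict[str, Any]) -> str:
    """Pick the next node: stop when done, otherwise follow active_role."""
    if state.get("done", False):
        return END
    role = state.get("active_role", "user")
    # Any role that is not a registered agent (for example "user") ends the run.
    return role if role in AGENT_NODES else END


print(route_next({"active_role": "executor"}))
print(route_next({"active_role": "executor", "done": True}))
print(route_next({"active_role": "user"}))
```

Keeping the router a pure function of the state means it can be unit-tested separately from the agents and from the graph runtime.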

graph = StateGraph(dict)

graph.add_node("planner", planner_agent)
graph.add_node("executor", executor_agent)
graph.add_node("validator", validator_agent)

graph.set_entry_point("planner")

# After every node, route_next decides where control goes.
graph.add_conditional_edges("planner", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})
graph.add_conditional_edges("executor", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})
graph.add_conditional_edges("validator", route_next, {"planner": "planner", "executor": "executor", "validator": "validator", END: END})

# SQLite-backed checkpointing so state survives across executions.
os.makedirs("checkpoints", exist_ok=True)
db_path = "checkpoints/langgraph_bus.sqlite"
conn = sqlite3.connect(db_path, check_same_thread=False)
checkpointer = SqliteSaver(conn)

app = graph.compile(checkpointer=checkpointer)


def run_thread(goal: str, thread_id: str) -> BusState:
    init = BusState(goal=goal, active_role="planner", done=False).model_dump()
    final_state_dict = app.invoke(init, config={"configurable": {"thread_id": thread_id}})
    return BusState.model_validate(final_state_dict)


thread_id = "demo-thread-001"
goal = "Design an ACP-style message bus where planner/executor/validator coordinate through shared state."

final_state = run_thread(goal, thread_id)
print("Done:", final_state.done)
print("Steps:", final_state.step)
print("Errors:", final_state.errors)

print("\nLast 5 messages:")
for m in final_state.mailbox[-5:]:
    print(f"- [{m.msg_type}] {m.sender} -> {m.receiver}: {m.content[:80]}")

# Inspect the latest checkpoint; the layout is probed on a best-effort basis.
snapshot = checkpointer.get_tuple({"configurable": {"thread_id": thread_id}})
cp = (snapshot.checkpoint if snapshot else None) or {}
cv = cp.get("channel_values", {}) or {}
sv = cp.get("state", {}) or {}
vals = cv if isinstance(cv, dict) and len(cv) else sv if isinstance(sv, dict) else {}

print("\nCheckpoint keys:", list(cp.keys()))
if isinstance(cv, dict):
    print("channel_values keys:", list(cv.keys())[:30])
if isinstance(sv, dict):
    print("state keys:", list(sv.keys())[:30])

print("\nPersisted step (best-effort):", vals.get("step", "NOT_FOUND"))
print("Persisted active_role (best-effort):", vals.get("active_role", "NOT_FOUND"))

print("\nACP logs:", acp_log_path())
print("Checkpoint DB:", db_path)

# Static view: the intended orchestration order.
G = nx.DiGraph()
G.add_edge("planner", "executor")
G.add_edge("executor", "validator")
G.add_edge("validator", "user")

plt.figure(figsize=(6, 4))
pos = nx.spring_layout(G, seed=7)
nx.draw(G, pos, with_labels=True, node_size=1800, font_size=10, arrows=True)
plt.title("Orchestration Graph: Planner → Executor → Validator")
plt.show()

# Runtime view: the edges actually recorded on the bus during this run.
comm = nx.MultiDiGraph()
for (s, r, t) in final_state.edges:
    comm.add_edge(s, r, label=t)

plt.figure(figsize=(8, 5))
pos2 = nx.spring_layout(comm, seed=11)
nx.draw(comm, pos2, with_labels=True, node_size=1800, font_size=10, arrows=True)
plt.title("Communication Graph from Structured Message Bus (Runtime Edges)")
plt.show()


def tail_jsonl(path: str, n: int = 8) -> List[Dict[str, Any]]:
    # Return the last n log entries as parsed dicts (empty list if no log exists).
    if not os.path.exists(path):
        return []
    with open(path, "r", encoding="utf-8") as f:
        lines = f.readlines()[-n:]
    return [json.loads(x) for x in lines]


print("\nLast ACP log entries:")
for row in tail_jsonl(acp_log_path(), 6):
    print(f"{row['msg_type']:>10} | {row['sender']} -> {row['receiver']} | {row['ts']}")

We assemble the LangGraph state graph, enable SQLite-based persistence, and execute the multi-agent workflow. We pass a thread identifier so that the checkpointer can save the agent state under that thread and recover it in later executions. We also visualize the orchestration and communication graphs and inspect the persisted logs, which shows how agents interact through the structured message bus.
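The recorded edges also lend themselves to a quick textual summary alongside the plots. The following sketch counts how often each sender-to-receiver pair appears. The sample `edges` list is hand-written to match the (sender, receiver, msg_type) tuples stored on the bus, and `summarize_edges` is a hypothetical helper name.

```python
from collections import Counter
from typing import List, Tuple


def summarize_edges(edges: List[Tuple[str, str, str]]) -> Counter:
    """Count messages per (sender, receiver) pair, ignoring the message type."""
    return Counter((sender, receiver) for sender, receiver, _ in edges)


# Hand-written sample; in the tutorial this would come from final_state.edges.
edges = [
    ("planner", "executor", "plan"),
    ("executor", "validator", "result"),
    ("validator", "user", "validation"),
]

counts = summarize_edges(edges)
for (sender, receiver), n in counts.most_common():
    print(f"{sender} -> {receiver}: {n}")
```

On a single pass through the pipeline every pair appears once; counts above one would point to retries or loops once the routing becomes more elaborate.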

In this tutorial, we designed and implemented a structured multi-agent communication framework using LangGraph's shared-state architecture and ACP-style message-bus principles. Agents operate independently and communicate only through structured, persistent messages, which makes each exchange easy to validate, log, and replay. We logged every interaction, persisted agent state across executions, and visualized communication patterns to see how the agents coordinate. The same architecture can be extended with additional agents, LLM reasoning, memory systems, and more complex routing strategies.

