Tuesday, January 27, 2026
HomeArtificial IntelligenceHow a Haystack-Powered Multi-Agent System Detects Incidents, Investigates Metrics and Logs, and...

How a Haystack-Powered Multi-Agent System Detects Incidents, Investigates Metrics and Logs, and Produces Production-Grade Incident Reviews End-to-End

@tool
def sql_investigate(query: str) -> dict:
    """Run an arbitrary SQL query against the DuckDB connection and summarize the result.

    Args:
        query: SQL string executed via the module-level DuckDB connection `con`.

    Returns:
        dict with total row count, column names, and a preview of up to 30 rows
        (list of per-row dicts). On any failure, returns {"error": <message>}
        instead of raising, so the calling agent can recover gracefully.
    """
    try:
        df = con.execute(query).df()
        head = df.head(30)  # cap the preview so tool output stays small for the LLM
        return {
            "rows": int(len(df)),
            "columns": list(df.columns),
            # orient="records" -> one dict per row, the LLM-friendliest shape
            "preview": head.to_dict(orient="records"),
        }
    except Exception as e:
        # Surface the failure as data rather than an exception.
        return {"error": str(e)}


@tool
def log_pattern_scan(window_start_iso: str, window_end_iso: str, top_k: int = 8) -> dict:
    """Scan logs inside a time window and summarize the dominant error patterns.

    Filters the module-level `logs_df` to [window_start_iso, window_end_iso]
    (inclusive on both ends), then counts WARN/ERROR rows by error kind,
    service, endpoint, and region, and computes the window's p95 latency.

    Args:
        window_start_iso: Window start, any string pandas.to_datetime accepts.
        window_end_iso: Window end, same format.
        top_k: How many top entries to keep per breakdown (default 8).

    Returns:
        dict of counts and breakdowns; a minimal empty-shape dict when no
        log rows fall inside the window.
    """
    ws = pd.to_datetime(window_start_iso)
    we = pd.to_datetime(window_end_iso)
    df = logs_df[(logs_df["ts"] >= ws) & (logs_df["ts"] <= we)].copy()
    if df.empty:
        return {"rows": 0, "top_error_kinds": [], "top_services": [], "top_endpoints": []}
    # Normalize missing error kinds to a sentinel so they count as a category.
    df["error_kind_norm"] = df["error_kind"].fillna("").replace("", "NONE")
    err = df[df["level"].isin(["WARN", "ERROR"])].copy()
    top_err = err["error_kind_norm"].value_counts().head(int(top_k)).to_dict()
    top_svc = err["service"].value_counts().head(int(top_k)).to_dict()
    top_ep = err["endpoint"].value_counts().head(int(top_k)).to_dict()
    by_region = err.groupby("region").size().sort_values(ascending=False).head(int(top_k)).to_dict()
    # p95 over ALL rows in the window (not just WARN/ERROR) — captures latency drift.
    p95_latency = float(np.percentile(df["latency_ms"].values, 95))
    return {
        "rows": int(len(df)),
        "warn_error_rows": int(len(err)),
        "p95_latency_ms": p95_latency,
        "top_error_kinds": top_err,
        "top_services": top_svc,
        "top_endpoints": top_ep,
        "error_by_region": by_region,
    }


@tool
def propose_mitigations(hypothesis: str) -> dict:
    """Map a root-cause hypothesis to a concrete, owned mitigation plan.

    Performs simple case-insensitive keyword matching on the hypothesis text
    ("conn"/"pool"/"db", "timeout"/"upstream", "cache") and accumulates the
    matching mitigation sets; multiple categories can match at once. Falls
    back to generic observability/load-test actions when nothing matches.

    Args:
        hypothesis: Free-text root-cause hypothesis produced by the profiler.

    Returns:
        dict with the original hypothesis and a list (capped at 10) of
        mitigation dicts, each carrying action, owner, and eta_days.
    """
    h = hypothesis.lower()
    mitigations = []
    if "conn" in h or "pool" in h or "db" in h:
        mitigations += [
            {"action": "Increase DB connection pool size (bounded) and add backpressure at db-proxy", "owner": "Platform", "eta_days": 3},
            {"action": "Add circuit breaker + adaptive timeouts between api-gateway and db-proxy", "owner": "Backend", "eta_days": 5},
            {"action": "Tune query hotspots; add indexes for top offending endpoints", "owner": "Data/DBA", "eta_days": 7},
        ]
    if "timeout" in h or "upstream" in h:
        mitigations += [
            {"action": "Implement hedged requests for idempotent calls (carefully) and tighten retry budgets", "owner": "Backend", "eta_days": 6},
            {"action": "Add upstream SLO-aware load shedding at api-gateway", "owner": "Platform", "eta_days": 7},
        ]
    if "cache" in h:
        mitigations += [
            {"action": "Add request coalescing and negative caching to prevent cache-miss storms", "owner": "Backend", "eta_days": 6},
            {"action": "Prewarm cache for top endpoints during deploys", "owner": "SRE", "eta_days": 4},
        ]
    if not mitigations:
        # No keyword matched: propose investigation steps instead of guessing.
        mitigations += [
            {"action": "Add targeted dashboards and alerts for the suspected bottleneck metric", "owner": "SRE", "eta_days": 3},
            {"action": "Run controlled load test to reproduce and validate the hypothesis", "owner": "Perf Eng", "eta_days": 5},
        ]
    mitigations = mitigations[:10]  # keep the plan digestible
    return {"hypothesis": hypothesis, "mitigations": mitigations}


@tool
def draft_postmortem(title: str, window_start_iso: str, window_end_iso: str, customer_impact: str, suspected_root_cause: str, key_facts_json: str, mitigations_json: str) -> dict:
    """Assemble a structured postmortem document from the investigation outputs.

    Args:
        title: Incident title.
        window_start_iso / window_end_iso: Incident window bounds (ISO strings).
        customer_impact: Human-readable impact statement.
        suspected_root_cause: Specific root-cause statement.
        key_facts_json: JSON-encoded evidence dict from the profiler.
        mitigations_json: JSON-encoded output of propose_mitigations.

    Returns:
        {"postmortem_json": <dict>} — never raises; malformed JSON inputs are
        replaced with an explanatory note so drafting always succeeds.
    """
    try:
        facts = json.loads(key_facts_json)
    except Exception:
        facts = {"note": "key_facts_json was not valid JSON"}
    try:
        mits = json.loads(mitigations_json)
    except Exception:
        mits = {"note": "mitigations_json was not valid JSON"}
    doc = {
        "title": title,
        # NOTE(review): utcnow() is deprecated in 3.12+; kept for compatibility
        # with the file's existing datetime import style.
        "date_utc": datetime.utcnow().strftime("%Y-%m-%d"),
        "incident_window_utc": {"start": window_start_iso, "end": window_end_iso},
        "customer_impact": customer_impact,
        "suspected_root_cause": suspected_root_cause,
        "detection": {
            "how_detected": "Automated anomaly detection + error-rate spike triage",
            "gaps": ["Add earlier saturation alerting", "Improve symptom-to-cause correlation dashboards"]
        },
        # Relative T+ offsets are template placeholders between the two real bounds.
        "timeline": [
            {"t": window_start_iso, "event": "Symptoms begin (latency/error anomalies)"},
            {"t": "T+10m", "event": "On-call begins triage; identifies top services/endpoints"},
            {"t": "T+25m", "event": "Mitigation actions initiated (throttling/backpressure)"},
            {"t": window_end_iso, "event": "Customer impact ends; metrics stabilize"},
        ],
        "key_facts": facts,
        # Prefer the nested "mitigations" list; fall back to the whole payload.
        "corrective_actions": mits.get("mitigations", mits),
        "followups": [
            {"area": "Reliability", "task": "Add saturation signals + budget-based retries", "priority": "P1"},
            {"area": "Observability", "task": "Add golden signals per service/endpoint", "priority": "P1"},
            {"area": "Performance", "task": "Reproduce with load test and validate fix", "priority": "P2"},
        ],
        "appendix": {"notes": "Generated by a Haystack multi-agent workflow (non-RAG)."}
    }
    return {"postmortem_json": doc}


# Shared chat generator used by all three agents in the workflow.
llm = OpenAIChatGenerator(model="gpt-4o-mini")


# Shared agent state schema: each entry declares the value's type; entries
# with a "handler" define how concurrent writes merge (notes accumulate).
state_schema = {
    "metrics_csv_path": {"type": str},
    "logs_csv_path": {"type": str},
    "metrics_summary": {"type": dict},
    "logs_summary": {"type": dict},
    "incident_window": {"type": dict},
    "investigation_notes": {"type": list, "handler": merge_lists},
    "hypothesis": {"type": str},
    "key_facts": {"type": dict},
    "mitigation_plan": {"type": dict},
    "postmortem": {"type": dict},
}


# System prompt for the evidence-synthesis specialist agent.
profiler_prompt = """You are a specialist incident profiler.
Goal: turn raw metrics/log summaries into crisp, high-signal findings.
Rules:
- Prefer calling tools over guessing.
- Output must be a JSON object with keys: window, symptoms, top_contributors, hypothesis, key_facts.
- Hypothesis must be falsifiable and mention at least one specific service and mechanism.
"""


# System prompt for the postmortem-drafting specialist agent.
writer_prompt = """You are a specialist postmortem writer.
Goal: produce a high-quality postmortem JSON (not prose) using the provided evidence and mitigation plan.
Rules:
- Call tools only if needed.
- Keep 'suspected_root_cause' specific and not generic.
- Ensure corrective actions have owners and eta_days.
"""


# System prompt for the top-level orchestrating (incident commander) agent.
coordinator_prompt = """You are an incident commander coordinating a non-RAG multi-agent workflow.
You must:
1) Load inputs
2) Find an incident window (use p95_ms or error_rate)
3) Investigate with targeted SQL and log pattern scan
4) Ask the specialist profiler to synthesize evidence
5) Propose mitigations
6) Ask the specialist writer to draft a postmortem JSON
Return a final response with:
- A short executive summary (max 10 lines)
- The postmortem JSON
- A compact runbook checklist (bulleted)
"""


# Specialist agent: investigates with data tools and synthesizes evidence.
# exit_conditions=["text"] -> the agent stops as soon as it produces a
# plain-text (non-tool-call) reply.
profiler_agent = Agent(
    chat_generator=llm,
    tools=[load_inputs, detect_incident_window, sql_investigate, log_pattern_scan],
    system_prompt=profiler_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)


# Specialist agent: drafts the structured postmortem document.
writer_agent = Agent(
    chat_generator=llm,
    tools=[draft_postmortem],
    system_prompt=writer_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)


# Wrap the profiler agent as a tool the coordinator can call; its last chat
# message becomes the tool's string output.
profiler_tool = ComponentTool(
    component=profiler_agent,
    name="profiler_specialist",
    description="Synthesizes incident evidence into a falsifiable hypothesis and key facts (JSON output).",
    outputs_to_string={"source": "last_message"}
)


# Wrap the writer agent as a tool the coordinator can call; its last chat
# message becomes the tool's string output.
writer_tool = ComponentTool(
    component=writer_agent,
    name="postmortem_writer_specialist",
    description="Drafts a postmortem JSON using title/window/impact/rca/facts/mitigations.",
    outputs_to_string={"source": "last_message"}
)


# Top-level incident-commander agent: has direct access to the data tools
# plus both specialist agents (as tools), so it can delegate or act itself.
coordinator_agent = Agent(
    chat_generator=llm,
    tools=[
        load_inputs,
        detect_incident_window,
        sql_investigate,
        log_pattern_scan,
        propose_mitigations,
        profiler_tool,
        writer_tool,
        draft_postmortem
    ],
    system_prompt=coordinator_prompt,
    exit_conditions=["text"],
    state_schema=state_schema
)

RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular

Recent Comments