Monday, June 29, 2026
HomeArtificial Intelligence5 Agentic Workflows to Automate Your Information Science Pipeline

5 Agentic Workflows to Automate Your Information Science Pipeline

5 Agentic Workflows to Automate Your Information Science Pipeline
 

Introduction

 
The common information scientist spends roughly 45% of their working time on information preparation and cleansing, not on modeling, not on perception technology, not on the work that requires real judgment. That estimate retains showing throughout trade surveys as a result of it retains being true. The duties consuming up that point — profiling columns, flagging nulls, operating the identical exploratory information evaluation (EDA) scripts, grid-searching hyperparameters, and writing the identical monitoring checks — are formulaic sufficient to comply with express guidelines.

That’s exactly what makes them automatable with brokers. Agentic workflows don’t change the information scientist. They take up the procedural weight so you’ll be able to concentrate on the evaluative weight: deciding whether or not a mannequin is smart, whether or not a characteristic is genuinely informative, whether or not a discovering warrants a enterprise resolution. Platforms like Databricks have already began delivery agentic information science capabilities into their core infrastructure, with their Agent framework explicitly designed to “compress the time from query to perception.” That is the path manufacturing information groups are transferring.

This text covers 5 concrete agentic workflows, one for every main stage of a knowledge science pipeline. Every features a real-world state of affairs, examined code patterns, and the design choices that matter in manufacturing.

 

Stipulations

 
All 5 workflows assume Python 3.10+ and familiarity with pandas, scikit-learn, and primary giant language mannequin (LLM) API utilization. Particular bundle necessities are listed beneath every workflow. For the tool-calling patterns, you want both an OpenAI API key or an area serving endpoint (Ollama, vLLM) that exposes an OpenAI-compatible API.

# Core packages used throughout all workflows
pip set up openai pandas numpy scipy scikit-learn lightgbm shap pydantic

 

Workflow 1: Automated Exploratory Information Evaluation Agent

 

What it replaces: Manually loading information, computing abstract statistics, visualizing distributions, inspecting nulls, detecting outliers, writing up findings. Each dataset, each time, the identical script with totally different column names.

What the agent does as a substitute: Hundreds the dataset, runs a full profile, flags points by severity, and produces a structured Markdown report. A human critiques the findings and decides what to do about them. The agent handles all the things earlier than that evaluation.

 

// Structure

The agent makes use of a Reasoning and Performing (ReAct) loop with two instruments: profile_dataset produces abstract statistics per column, and flag_issues classifies issues by severity. The agent then synthesizes each outputs right into a structured report via a single language mannequin name. The important thing design resolution is how the agent handles the flag_issues output; it causes about which points are actionable earlier than reporting, so the output is a prioritized listing, not a uncooked dump.

 

// Code Sample

# eda_agent.py
# Stipulations: pip set up openai pandas scipy
# Run: python eda_agent.py

import json
import pandas as pd
from scipy import stats
from openai import OpenAI
from dataclasses import dataclass

shopper = OpenAI()  # Makes use of OPENAI_API_KEY env var

@dataclass
class ColumnIssue:
    column: str
    issue_type: str   # null_rate | skewness | dtype | high_correlation
    severity: str     # low | medium | excessive
    element: str

def profile_dataset(df: pd.DataFrame) -> dict:
    """
    Generate per-column statistics.
    In manufacturing, swap this for ydata-profiling for richer output.
    """
    profile = {}
    for col in df.columns:
        col_stats = {
            "dtype":     str(df[col].dtype),
            "null_rate": df[col].isnull().imply(),
            "n_unique":  df[col].nunique(),
        }
        if pd.api.varieties.is_numeric_dtype(df[col]):
            col_stats["skewness"] = float(df[col].skew())
            col_stats["mean"]     = float(df[col].imply())
            col_stats["std"]      = float(df[col].std())
        elif df[col].dtype == "object":
            non_null = df[col].dropna()
            numeric_coerced = pd.to_numeric(non_null, errors="coerce")
            col_stats["looks_numeric"] = bool(len(non_null) > 0 and numeric_coerced.notna().imply() > 0.9)
        profile[col] = col_stats
    return profile

def flag_issues(profile: dict) -> listing[ColumnIssue]:
    """
    Flag information high quality points from a column profile.
    Severity tiers: excessive = wants rapid consideration, medium = price reviewing.
    """
    points = []
    for col, stats_dict in profile.objects():
        null_rate = stats_dict.get("null_rate", 0.0)
        if null_rate > 0.15:
            points.append(ColumnIssue(col, "null_rate", "excessive",
                                      f"{null_rate:.0%} of values are lacking"))
        elif null_rate > 0.05:
            points.append(ColumnIssue(col, "null_rate", "medium",
                                      f"{null_rate:.0%} of values are lacking"))

        skewness = abs(stats_dict.get("skewness", 0.0))
        if skewness > 5.0:
            points.append(ColumnIssue(col, "skewness", "excessive",
                                      f"Excessive skew={skewness:.1f} -- take into account log remodel"))
        elif skewness > 2.0:
            points.append(ColumnIssue(col, "skewness", "medium",
                                      f"Average skew={skewness:.1f}"))

        # Object columns with all-numeric values are seemingly miscoded
        if stats_dict["dtype"] == "object" and stats_dict.get("looks_numeric", False):
            points.append(ColumnIssue(col, "dtype", "medium",
                                      "Numeric values saved as strings"))

    return points

def run_eda_agent(df: pd.DataFrame, dataset_description: str) -> str:
    """
    Run the EDA agent loop.
    The agent decides which instruments to name and in what sequence,
    then produces a structured report summarizing its findings.
    """
    profile = profile_dataset(df)
    points  = flag_issues(profile)

    # Format points for the agent
    issues_text = "n".be part of(
        f"- [{i.severity.upper()}] {i.column}: {i.issue_type} -- {i.element}"
        for i in points
    ) or "No points detected."

    immediate = f"""You're a senior information scientist reviewing a dataset for a knowledge science mission.

Dataset: {dataset_description}

Column profile (abstract stats):
{json.dumps(profile, indent=2)}

Detected points:
{issues_text}

Write a structured EDA report with these sections:
1. DATASET OVERVIEW -- form, dtypes, general high quality evaluation (1-2 sentences)
2. HIGH PRIORITY ISSUES -- objects requiring motion earlier than modeling
3. MEDIUM PRIORITY ISSUES -- objects price monitoring
4. RECOMMENDED NEXT STEPS -- ordered listing of 3-5 particular actions

Be direct. Prioritize actionability over completeness."""

    response = shopper.chat.completions.create(
        mannequin="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,   # Low temperature for constant structured output
    )
    return response.decisions[0].message.content material


# ── Run it ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Instance: retail transaction information
    import numpy as np
    np.random.seed(42)
    n = 5000
    df = pd.DataFrame({
        "income":       np.random.exponential(scale=200, measurement=n),     # right-skewed
        "customer_age":  np.random.regular(40, 12, n),
        "created_at":    pd.date_range("2024-01-01", durations=n, freq="h").astype(str),
        "region_code":   np.random.selection(["US", "EU", "APAC", None], measurement=n, p=[0.5, 0.3, 0.1, 0.1]),
        "session_count": np.the place(np.random.rand(n) < 0.2, None, np.random.randint(1, 50, n)),
    })
    report = run_eda_agent(df, "Retail transaction information with buyer demographics")
    print(report)

 

The best way to run:

export OPENAI_API_KEY=your_key
python eda_agent.py

 

Actual state of affairs
Retail transaction information, 5,000 rows, 8 columns. The agent flags income as high-priority (excessive proper skew at 7.3), session_count as high-priority (22% null charge), and created_at as medium-priority (date saved as string). It recommends a log remodel for income, a null indicator characteristic for session_count, and parsing created_at to extract hour-of-day and day-of-week options. All of this surfaces in beneath 30 seconds. A human critiques the report and acts on the suggestions, with no time spent operating the diagnostics manually.

 

Workflow 2: Agentic Characteristic Engineering and Choice

 

What it replaces: Manually brainstorming interplay options, writing the transformation code, evaluating every candidate with a baseline mannequin, pruning those that don’t contribute, documenting what survived and why.

What the agent does as a substitute: Proposes candidate options based mostly on the information profile and area context, generates the transformation code, evaluates every candidate towards a quick baseline, and prunes options beneath a configurable significance threshold, with a written rationale for every resolution.

 

// Structure

Two phases, one agent. The technology part makes use of the LLM to suggest candidate options from a structured description of the dataset and the prediction process. The choice part evaluates every candidate by coaching a LightGBM classifier with 5-fold cross-validation (CV) and computing characteristic significance utilizing SHapley Additive exPlanations (SHAP). Options beneath the brink are pruned. The agent causes concerning the significance scores earlier than pruning; it catches instances the place a characteristic appears weak globally however carries a sign for a particular phase.

 

// Code Sample

# feature_agent.py
# Stipulations: pip set up openai lightgbm shap scikit-learn pandas numpy
# Run: python feature_agent.py

import json
import numpy as np
import pandas as pd
from openai import OpenAI
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import LabelEncoder
import lightgbm as lgb

shopper = OpenAI()

def generate_feature_candidates(
    column_descriptions: dict[str, str],
    goal: str,
    task_type: str = "classification",
    n_candidates: int = 10,
) -> listing[dict]:
    """
    Ask the LLM to suggest candidate options given column descriptions and the prediction process.
    Returns a listing of dicts with 'identify', 'method', and 'rationale'.
    """
    immediate = f"""You're a senior ML engineer performing characteristic engineering for a {task_type} process.

Goal variable: {goal}

Accessible columns:
{json.dumps(column_descriptions, indent=2)}

Suggest {n_candidates} candidate engineered options which can be seemingly to enhance mannequin efficiency.
For every characteristic, present:
- identify: a snake_case characteristic identify
- method: how one can compute it from the accessible columns (pandas expression)
- rationale: one sentence on why this characteristic may assist

Return a JSON object with a single key "options" containing an array of objects,
every with keys: identify, method, rationale.
Return ONLY legitimate JSON -- no clarification outdoors the JSON."""

    response = shopper.chat.completions.create(
        mannequin="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        response_format={"kind": "json_object"},
        temperature=0.4,
    )
    end result = json.hundreds(response.decisions[0].message.content material)
    return end result.get("options", end result.get("candidates", []))

def evaluate_and_prune(
    df: pd.DataFrame,
    candidate_features: listing[dict],
    target_col: str,
    importance_threshold: float = 0.01,
) -> tuple[list[str], listing[str], dict[str, float]]:
    """
    Add candidate options to the dataframe, prepare a quick LightGBM baseline,
    extract characteristic importances, and prune beneath threshold.

    Returns (kept_features, pruned_features, importance_scores)
    """
    feature_df = df.copy()
    added = []

    for candidate in candidate_features:
        attempt:
            # Consider the method string -- in manufacturing, use a secure eval sandbox
            feature_df[candidate["name"]] = feature_df.eval(candidate["formula"])
            added.append(candidate["name"])
        besides Exception as e:
            # System failed -- skip this candidate
            print(f"  Skipped '{candidate['name']}': {e}")

    if not added:
        return [], [], {}

    X = feature_df[added].fillna(0)
    y = df[target_col]

    mannequin = lgb.LGBMClassifier(n_estimators=100, random_state=42, verbose=-1)
    mannequin.match(X, y)

    importance_scores = dict(zip(added, mannequin.feature_importances_ / mannequin.feature_importances_.sum()))

    stored   = [f for f in added if importance_scores.get(f, 0) >= importance_threshold]
    pruned = [f for f in added if importance_scores.get(f, 0) < importance_threshold]

    return stored, pruned, importance_scores

def explain_selection(
    stored: listing[str],
    pruned: listing[str],
    scores: dict[str, float],
) -> str:
    """Ask the agent to clarify its choice choices in plain language."""
    immediate = f"""You're reviewing characteristic choice outcomes for an ML pipeline.

Options KEPT (above significance threshold):
{json.dumps({f: spherical(scores.get(f, 0), 4) for f in stored}, indent=2)}

Options PRUNED (beneath threshold):
{json.dumps({f: spherical(scores.get(f, 0), 4) for f in pruned}, indent=2)}

Write a 3-5 sentence abstract of the choice consequence.
Word any shocking prunings or surprising high-importance options.
Counsel one further characteristic price testing based mostly on what survived."""

    response = shopper.chat.completions.create(
        mannequin="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )
    return response.decisions[0].message.content material


if __name__ == "__main__":
    column_descriptions = {
        "days_since_login":    "Variety of days for the reason that buyer final logged in",
        "plan_tier":           "Subscription tier: primary, professional, or enterprise",
        "support_tickets_90d": "Variety of assist tickets opened within the final 90 days",
        "monthly_spend":       "Buyer's common month-to-month spend in USD",
    }

    candidates = generate_feature_candidates(
        column_descriptions, goal="churned", task_type="classification", n_candidates=10
    )

    # In manufacturing, load actual buyer information right here
    np.random.seed(42)
    n = 3000
    df = pd.DataFrame({
        "days_since_login":    np.random.randint(0, 90, n),
        "plan_tier":           np.random.selection(["basic", "pro", "enterprise"], n),
        "support_tickets_90d": np.random.poisson(1.5, n),
        "monthly_spend":       np.random.exponential(80, n),
        "churned":             np.random.binomial(1, 0.15, n),
    })

    stored, pruned, scores = evaluate_and_prune(df, candidates, target_col="churned")
    abstract = explain_selection(stored, pruned, scores)
    print(abstract)

 

The best way to run:

 

Actual state of affairs
Buyer churn prediction, 12 enter columns together with days_since_login, plan_tier, support_tickets_90d, and monthly_spend. The agent proposes 15 candidates, together with spend_per_day, tickets_per_spend_ratio, and login_recency_x_plan. After analysis, 9 survive the significance threshold. The reason calls out that tickets_per_spend_ratio has the very best significance rating (0.18): “clients spending extra who’re additionally elevating assist tickets are a very excessive churn threat,” which turns into a discovering price sharing with the product crew.

 

Workflow 3: Agentic Hyperparameter Optimization

 
What it replaces: Grid search (exhaustive however wasteful), random search (environment friendly however dumb), and handbook Bayesian optimization setup (highly effective however boilerplate-heavy). All of those deal with hyperparameter tuning as a search drawback. An agent treats it as a reasoning drawback.

What the agent does as a substitute: Proposes a hyperparameter configuration, evaluates it by coaching the mannequin, analyzes the metric pattern throughout iterations, identifies which parameters are driving enchancment, and adjusts the search path accordingly, with out being informed to. It converges on a very good configuration in far fewer iterations than grid or random search.

 

// Structure

One agent, one software: train_and_evaluate. The software takes a Pydantic-validated hyperparameter config, trains the mannequin with 5-fold CV, and returns the world beneath the curve (AUC), coaching time, and the prepare/validation overfitting hole. The agent receives the complete trial historical past at every step and causes about what to attempt subsequent. Convergence is detected when the final three AUC scores span lower than 0.005.
This sample is instantly impressed by revealed analysis on agentic hyperparameter tuning that confirmed LLM-guided search outperforming Bayesian optimization on mid-sized classification duties by 5-12% in fewer iterations.

 

// Code Sample

# hp_agent.py
# Stipulations: pip set up openai scikit-learn pydantic pandas numpy
# Run: python hp_agent.py

import json
from dataclasses import dataclass, area
from pydantic import BaseModel, Area, field_validator
from openai import OpenAI
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import roc_auc_score
from sklearn.datasets import make_classification
import numpy as np

shopper = OpenAI()

# ── Pydantic schema for structured software enter ─────────────────────────────────
# The mannequin should return legitimate hyperparameters -- Pydantic catches invalid values
# earlier than the coaching job begins, saving wasted compute on dangerous configs.

class HyperparamConfig(BaseModel):
    n_estimators:      int   = Area(..., ge=10, le=1000, description="Variety of bushes")
    max_depth:         int   = Area(..., ge=1,  le=50,   description="Max tree depth")
    min_samples_split: int   = Area(..., ge=2,  le=50,   description="Min samples to separate")
    max_features:      float = Area(..., gt=0,  le=1.0,  description="Fraction of options per break up")

@dataclass
class TrialResult:
    iteration:   int
    config:      dict
    val_auc:     float
    train_auc:   float
    train_time_s: float

    @property
    def overfit_gap(self) -> float:
        return spherical(self.train_auc - self.val_auc, 4)

def train_and_evaluate(config: dict, X, y) -> TrialResult:
    """
    Prepare a RandomForest with the given config and return cross-validated metrics.
    That is the software the agent calls on every iteration.
    """
    import time
    params = HyperparamConfig(**config)  # Validates earlier than coaching
    clf = RandomForestClassifier(
        n_estimators=params.n_estimators,
        max_depth=params.max_depth,
        min_samples_split=params.min_samples_split,
        max_features=params.max_features,
        random_state=42,
        n_jobs=-1,
    )
    t0 = time.time()
    val_scores = cross_val_score(clf, X, y, cv=5, scoring="roc_auc")
    clf.match(X, y)
    train_auc = roc_auc_score(y, clf.predict_proba(X)[:, 1])
    return TrialResult(
        iteration=0,
        config=config,
        val_auc=spherical(float(val_scores.imply()), 4),
        train_auc=spherical(float(train_auc), 4),
        train_time_s=spherical(time.time() - t0, 2),
    )

def detect_convergence(outcomes: listing[TrialResult], window: int = 3, tol: float = 0.005) -> bool:
    """Cease when the final `window` AUC scores span lower than `tol`."""
    if len(outcomes) < window:
        return False
    latest = [r.val_auc for r in results[-window:]]
    return (max(latest) - min(latest)) < tol

def propose_next_config(trial_history: listing[TrialResult]) -> dict:
    """
    Ask the agent to suggest the following hyperparameter configuration,
    reasoning from the complete trial historical past.
    """
    history_text = "n".be part of(
        f"Trial {r.iteration}: config={r.config}, val_AUC={r.val_auc}, "
        f"overfit_gap={r.overfit_gap}, time={r.train_time_s}s"
        for r in trial_history
    )
    immediate = f"""You're optimizing a RandomForest classifier. Your purpose is to maximise val_AUC.

Trial historical past:
{history_text}

Parameter ranges:
- n_estimators: 10-1000
- max_depth: 1-50
- min_samples_split: 2-50
- max_features: 0.1-1.0

Analyze the pattern. Determine which parameters seem most influential.
Suggest the following configuration to attempt, explaining your reasoning in a single sentence.

Return a JSON object with keys: n_estimators, max_depth, min_samples_split, max_features, reasoning"""

    response = shopper.chat.completions.create(
        mannequin="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        response_format={"kind": "json_object"},
        temperature=0.3,
    )
    end result = json.hundreds(response.decisions[0].message.content material)
    print(f"  Agent reasoning: {end result.get('reasoning', '')}")
    return {ok: v for ok, v in end result.objects() if ok != "reasoning"}

def run_hp_agent(X, y, max_iterations: int = 15) -> TrialResult:
    """
    Run the agentic hyperparameter optimization loop.
    Begins with a wise default, then lets the agent information the search.
    """
    # Smart place to begin -- don't begin random
    initial_config = {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5, "max_features": 0.5}
    outcomes = []

    for i in vary(max_iterations):
        config = initial_config if i == 0 else propose_next_config(outcomes)
        attempt:
            end result = train_and_evaluate(config, X, y)
        besides Exception as e:
            print(f"  Trial {i+1} failed: {e} -- skipping")
            proceed

        end result.iteration = i + 1
        outcomes.append(end result)
        greatest = max(outcomes, key=lambda r: r.val_auc)
        print(f"Trial {i+1:02d}: AUC={end result.val_auc:.4f} (greatest={greatest.val_auc:.4f})")

        if detect_convergence(outcomes, window=3, tol=0.005):
            print(f"Converged after {i+1} iterations.")
            break

    return max(outcomes, key=lambda r: r.val_auc)


if __name__ == "__main__":
    X, y = make_classification(n_samples=5000, n_features=20, n_informative=10, random_state=42)
    greatest = run_hp_agent(X, y, max_iterations=15)
    print(f"nBest config: {greatest.config}")
    print(f"Greatest val_AUC: {greatest.val_auc}")

 

The best way to run:

 

Actual state of affairs

Census Earnings classification dataset (UCI, 48,842 rows). Default RandomForest AUC: 0.87. After 15 agent-guided iterations, the agent converges on max_depth=12, n_estimators=350, min_samples_split=8, max_features=0.4, reaching AUC 0.91. At iteration 7, the agent’s reasoning log reads: “max_depth seems to be the dominant driver, growing it from 8 to 12 gave +0.019 AUC, whereas n_estimators past 200 exhibits diminishing returns.” That reasoning is traceable within the output, not hidden inside a black-box optimizer.

 

Workflow 4: Automated Mannequin Monitoring and Drift Detection Agent

 

What it replaces: Manually checking characteristic distributions on a schedule, writing threshold guidelines per column, sustaining dashboard alerts that go stale, and discovering mannequin degradation solely after it exhibits up in enterprise metrics.

What the agent does as a substitute: Runs on a schedule towards incoming batch information, computes drift statistics per characteristic utilizing Inhabitants Stability Index (PSI) and the Kolmogorov-Smirnov (KS) check, classifies drift severity, and responds in a different way relying on severity: delicate drift triggers an alert, extreme drift triggers a retraining pipeline name.

 

// Structure

A scheduled agent constructed round one software, compute_drift_stats, which computes PSI and the KS check for every column and classifies the end result by severity. A single language mannequin name then decides how one can reply: a passing examine is just logged, delicate drift produces a drafted alert for the information science crew, and extreme drift produces an alert plus a set off for a retraining directed acyclic graph (DAG), despatched by way of Slack or the Airflow representational state switch (REST) API. The vital design resolution is the branching response itself; the agent handles the routing, not a hardcoded if/else ladder.

PSI interpretation: beneath 0.1 is steady, 0.1-0.25 is delicate drift price monitoring, and above 0.25 is critical drift that ought to set off retraining. PSI is the usual metric for inhabitants shift in manufacturing machine studying programs and has been utilized in monetary threat modeling for many years earlier than LLMs existed.

 

// Code Sample

# drift_agent.py
# Stipulations: pip set up openai pandas scipy numpy
# Run: python drift_agent.py

import json
import math
import numpy as np
import pandas as pd
from dataclasses import dataclass
from openai import OpenAI

shopper = OpenAI()

@dataclass
class FeatureDrift:
    characteristic:    str
    psi:        float
    ks_stat:    float
    ks_pvalue:  float
    severity:   str    # steady | mild_drift | severe_drift

def compute_psi(baseline: np.ndarray, present: np.ndarray, buckets: int = 10) -> float:
    """
    Inhabitants Stability Index between baseline and present distributions.
    PSI = sum((current_% - baseline_%) * ln(current_% / baseline_%))

    Values: <0.1 steady | 0.1-0.25 delicate | >0.25 extreme
    """
    min_val      = min(baseline.min(), present.min())
    max_val      = max(baseline.max(), present.max())
    bucket_width = (max_val - min_val) / buckets

    def bucket_freqs(information: np.ndarray) -> listing[float]:
        counts = np.zeros(buckets)
        for v in information:
            idx = min(int((v - min_val) / bucket_width), buckets - 1)
            counts[idx] += 1
        freqs = counts / len(information)
        return [max(f, 1e-6) for f in freqs]   # Keep away from log(0)

    b_freq = bucket_freqs(baseline)
    c_freq = bucket_freqs(present)
    return spherical(sum((c - b) * math.log(c / b) for b, c in zip(b_freq, c_freq)), 4)

def classify_drift(psi: float) -> str:
    if psi < 0.10: return "steady"
    if psi < 0.25: return "mild_drift"
    return "severe_drift"

def compute_drift_stats(
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    numeric_cols: listing[str],
) -> listing[FeatureDrift]:
    """Compute PSI and KS check for every numeric characteristic."""
    from scipy.stats import ks_2samp
    outcomes = []
    for col in numeric_cols:
        b = baseline_df[col].dropna().values
        c = current_df[col].dropna().values
        psi        = compute_psi(b, c)
        ks_stat, ks_pvalue = ks_2samp(b, c)
        outcomes.append(FeatureDrift(
            characteristic=col,
            psi=psi,
            ks_stat=spherical(float(ks_stat), 4),
            ks_pvalue=spherical(float(ks_pvalue), 6),
            severity=classify_drift(psi),
        ))
    return outcomes

def run_monitoring_agent(
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    numeric_cols: listing[str],
    model_name: str = "churn_model_v3",
) -> str:
    """
    Run the monitoring agent.
    It computes drift stats and decides how one can reply based mostly on severity.
    """
    drift_results = compute_drift_stats(baseline_df, current_df, numeric_cols)

    drift_summary = [
        {"feature": d.feature, "psi": d.psi, "ks_pvalue": d.ks_pvalue, "severity": d.severity}
        for d in drift_results
    ]

    severe_features = [d.feature for d in drift_results if d.severity == "severe_drift"]
    mild_features   = [d.feature for d in drift_results if d.severity == "mild_drift"]

    immediate = f"""You're a mannequin monitoring agent for {model_name}.

Drift evaluation outcomes:
{json.dumps(drift_summary, indent=2)}

Extreme drift (PSI > 0.25): {severe_features}
Gentle drift (PSI 0.10-0.25): {mild_features}

Based mostly on severity, decide the suitable response:
- STABLE: log a cross, no motion wanted
- MILD DRIFT: draft an alert message for the information science crew
- SEVERE DRIFT: draft an alert message AND a set off for the retraining pipeline

Write your response on this format:
SEVERITY_LEVEL: 
ACTION: 
MESSAGE: """

    response = shopper.chat.completions.create(
        mannequin="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,  # Very low -- it is a decision-making name, not artistic
    )
    return response.decisions[0].message.content material


if __name__ == "__main__":
    np.random.seed(42)
    n = 2000

    # Baseline: regular e-commerce looking patterns
    baseline = pd.DataFrame({
        "session_duration_s":    np.random.regular(180, 60, n),
        "pages_per_session":     np.random.regular(4.2, 1.5, n),
        "cart_add_rate":         np.clip(np.random.regular(0.12, 0.04, n), 0, 1),
    })

    # Present: promotional occasion shifts all options considerably
    present = pd.DataFrame({
        "session_duration_s":    np.random.regular(310, 90, n),   # periods for much longer
        "pages_per_session":     np.random.regular(6.8, 2.1, n),  # viewing extra pages
        "cart_add_rate":         np.clip(np.random.regular(0.31, 0.08, n), 0, 1),  # a lot increased
    })

    end result = run_monitoring_agent(baseline, present, listing(baseline.columns), model_name="recommendation_engine_v2")
    print(end result)

 

The best way to run:

 

Actual state of affairs
E-commerce advice mannequin. A promotional occasion causes a sudden distribution shift in looking habits, session length jumps from 180s to 310s imply, and cart add charge almost triples. The monitoring agent runs at midnight towards the day’s information. It detects PSI > 0.25 on all three options, classifies severity as extreme, and triggers the retraining pipeline with an alert to Slack. The information science crew wakes as much as a message explaining what shifted and what was performed about it, not a uncooked dashboard they must interpret at 6 a.m.

 

Workflow 5: Agentic Pipeline Orchestration and Self-Therapeutic

 

What it replaces: Watching an Airflow failure notification, opening the logs, manually studying the traceback, determining whether or not the repair requires a code change, a config change, or a retry, making the repair, rerunning the duty, and hoping the following process downstream doesn’t fail for a similar purpose.

What the agent does as a substitute: Reads the failure log, classifies the error kind, determines whether or not it’s auto-fixable, applies the repair whether it is, and both retriggers the duty or escalates to a human with a totally structured incident report if it isn’t.

 

// Structure

A meta-agent that wraps your current orchestration layer. When an Airflow process fails, the orchestrator sends the duty ID, error log, and process definition to the agent. The agent makes use of one software, parse_pipeline_error, to categorise the failure deterministically. From there, a single language mannequin name decides whether or not the error is auto-fixable and drafts both a repair description or a structured incident report for human evaluation, relying on that classification.

 

// Code Sample

# pipeline_healer.py
# Stipulations: pip set up openai pandas
# Run: python pipeline_healer.py

import json
import re
from dataclasses import dataclass
from typing import Optionally available
from openai import OpenAI

shopper = OpenAI()

@dataclass
class PipelineError:
    task_id:      str
    error_type:   str     # schema_mismatch | null_violation | timeout | unknown
    column:       Optionally available[str]
    element:       str
    auto_fixable: bool

def parse_pipeline_error(log_line: str, task_id: str) -> PipelineError:
    """
    Classify a process failure log right into a structured error kind.
    Auto-fixable errors might be repaired with out human intervention.
    """
    if "KeyError" in log_line or ("column" in log_line.decrease() and "not discovered" in log_line.decrease()):
        col_match = re.search(r"['"](w+)['"]", log_line)
        col = col_match.group(1) if col_match else None
        return PipelineError(task_id, "schema_mismatch", col, log_line.strip(), auto_fixable=True)

    if "IntegrityError" in log_line or ("null" in log_line.decrease() and "violate" in log_line.decrease()):
        return PipelineError(task_id, "null_violation", None, log_line.strip(), auto_fixable=True)

    if "TimeoutError" in log_line or "timed out" in log_line.decrease():
        return PipelineError(task_id, "timeout", None, log_line.strip(), auto_fixable=False)

    return PipelineError(task_id, "unknown", None, log_line.strip(), auto_fixable=False)

def run_self_healing_agent(
    task_id: str,
    error_log: str,
    task_definition: str,
) -> str:
    """
    Run the self-healing agent on a failed pipeline process.
    It classifies the error, decides on a remediation, and produces
    both an auto-fix description or a structured escalation report.
    """
    error = parse_pipeline_error(error_log, task_id)

    immediate = f"""You're a information pipeline reliability engineer.
A pipeline process has failed and you need to resolve how one can reply.

Job: {task_id}
Job definition: {task_definition}
Error kind: {error.error_type}
Column affected: {error.column or 'N/A'}
Auto-fixable: {error.auto_fixable}
Full error: {error.element}

{"You'll be able to apply an automated repair for this error kind." if error.auto_fixable else "This error requires human evaluation -- you can't auto-fix it."}

Reply with:
ACTION: 
FIX_DESCRIPTION: 
ESCALATION_REPORT: 
NEXT_STEP: """

    response = shopper.chat.completions.create(
        mannequin="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
    )
    return response.decisions[0].message.content material


if __name__ == "__main__":
    # Situation: CRM export added a brand new column and altered a date format
    end result = run_self_healing_agent(
        task_id="ingest_crm_daily",
        error_log="KeyError: 'transaction_date' column not present in supply dataframe. "
                  "Accessible columns: ['txn_date_utc', 'customer_id', 'amount_usd', 'product_sku']",
        task_definition="Reads day by day CRM export, extracts transaction_date and customer_id, "
                        "joins with product catalog, writes to characteristic retailer.",
    )
    print(end result)

 

The best way to run:

python pipeline_healer.py

 

Actual state of affairs
A day by day characteristic pipeline fails at 2 am as a result of an upstream CRM system up to date its export schema, renamed transaction_date to txn_date_utc and added three new columns. The agent reads the error log, identifies the schema mismatch on transaction_date, and produces an auto-fix: rename the column within the ingestion step and add the three new columns to the schema definition as nullable. It logs the repair, retriggers the failed process, and sends the on-call engineer a abstract that reads “Schema repair utilized robotically. Supply renamed transaction_date → txn_date_utc. Three new nullable columns had been added to the schema. Job retriggered at 02:14.” The engineer critiques the change within the morning as a substitute of being woken up.

 

Wrapping Up

 

The 5 workflows usually are not unbiased instruments. They’re a pipeline:

The EDA agent understands the information. The characteristic engineering agent improves it. The hyperparameter agent optimizes the mannequin constructed on these options. The monitoring agent watches the mannequin in manufacturing. The self-healing agent protects the pipeline, delivering information to all of them.

Deploy them on this order. Begin with monitoring; it delivers worth instantly on any current pipeline with out requiring adjustments to your modeling code. Add the EDA agent subsequent for any new dataset you herald. The characteristic engineering and hyperparameter brokers come after you will have established a baseline mannequin price bettering.

 
A horizontal pipeline diagram showing the 5 workflows in order
 

None of those workflows operates with out human evaluation of the selections that matter. The EDA agent flags points; you resolve what to do about them. The characteristic agent proposes candidates; you resolve the significance threshold. The hyperparameter agent searches; you resolve the parameter bounds and convergence standards. The monitoring agent detects drift; you resolve the severity thresholds that set off retraining. The self-healing agent applies fixes; you evaluation them earlier than they merge into manufacturing.

That division is the purpose. Brokers deal with the procedural weight. You keep the evaluative weight. The result’s a pipeline that’s quicker, extra constant, and simpler to keep up, as a result of the elements that break are actually detected and infrequently repaired earlier than you must have a look at them.
 
 

Shittu Olumide is a software program engineer and technical author captivated with leveraging cutting-edge applied sciences to craft compelling narratives, with a eager eye for element and a knack for simplifying complicated ideas. You may as well discover Shittu on Twitter.


RELATED ARTICLES

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Most Popular

Recent Comments