
# Introduction
The average data scientist spends roughly 45% of their working time on data preparation and cleaning: not on modeling, not on insight generation, not on the work that requires real judgment. That estimate keeps showing up in industry surveys because it keeps being true. The tasks eating up that time (profiling columns, flagging nulls, running the same exploratory data analysis (EDA) scripts, grid-searching hyperparameters, and writing the same monitoring checks) are formulaic enough to follow explicit rules.
That's exactly what makes them automatable with agents. Agentic workflows don't replace the data scientist. They absorb the procedural weight so you can focus on the evaluative weight: deciding whether a model makes sense, whether a feature is genuinely informative, whether a finding warrants a business decision. Platforms like Databricks have already started shipping agentic data science capabilities into their core infrastructure, with their Agent framework explicitly designed to "compress the time from question to insight." This is the direction production data teams are moving.
This article covers five concrete agentic workflows, one for each major stage of a data science pipeline. Each includes a real-world scenario, tested code patterns, and the design decisions that matter in production.
# Prerequisites
All five workflows assume Python 3.10+ and familiarity with pandas, scikit-learn, and basic large language model (LLM) API usage. Specific package requirements are listed under each workflow. For the tool-calling patterns, you need either an OpenAI API key or a local serving endpoint (Ollama, vLLM) that exposes an OpenAI-compatible API.
# Core packages used across all workflows
pip install openai pandas numpy scipy scikit-learn lightgbm shap pydantic
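If you use a local endpoint instead of the OpenAI API, the client needs a different base URL and the calls need a model name that server actually serves. A minimal sketch, assuming hypothetical environment variables `LLM_BASE_URL`, `LLM_API_KEY`, and `LLM_MODEL` (they are not part of the original code). Ollama's OpenAI-compatible endpoint usually lives at `http://localhost:11434/v1`, and vLLM's at `http://localhost:8000/v1`:

```python
import os
from typing import Mapping, Optional

# Hypothetical environment variables (not from the original article):
#   LLM_BASE_URL -> a local OpenAI-compatible server, e.g. http://localhost:11434/v1
#   LLM_API_KEY  -> optional; many local servers ignore the key
#   LLM_MODEL    -> the model name the server serves


def client_kwargs(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build keyword arguments for openai.OpenAI() from the environment."""
    env = os.environ if env is None else env
    base_url = env.get("LLM_BASE_URL")
    if base_url:
        # The client requires some API key value even if the server ignores it
        return {"base_url": base_url, "api_key": env.get("LLM_API_KEY", "not-needed")}
    return {"api_key": env.get("OPENAI_API_KEY")}


def model_name(env: Optional[Mapping[str, str]] = None) -> str:
    """Model to request; defaults to the one used throughout this article."""
    env = os.environ if env is None else env
    return env.get("LLM_MODEL", "gpt-4o-mini")


def make_client():
    # Imported lazily so the helpers above work without the openai package
    from openai import OpenAI
    return OpenAI(**client_kwargs())
```

In each workflow, swapping `client = OpenAI()` for `client = make_client()` and `model="gpt-4o-mini"` for `model=model_name()` would route calls to the local server. Workflows 2 and 3 also rely on `response_format={"type": "json_object"}`, so check that your server supports JSON mode.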
# Workflow 1: Automated Exploratory Data Analysis Agent
What it replaces: Manually loading data, computing summary statistics, visualizing distributions, checking nulls, detecting outliers, writing up findings. Every dataset, every time, the same script with different column names.
What the agent does instead: Loads the dataset, runs a full profile, flags issues by severity, and produces a structured Markdown report. A human reviews the findings and decides what to do about them. The agent handles everything before that review.
// Structure
The agent is built around two tools in the spirit of a Reasoning and Acting (ReAct) loop: profile_dataset produces summary statistics per column, and flag_issues classifies problems by severity. In the code pattern below, the two tools run in a fixed order, and a single language model call then synthesizes both outputs into a structured report. The key design decision is how the agent handles the flag_issues output: it reasons about which issues are actionable before reporting, so the output is a prioritized list, not a raw dump.
// Code Sample
# eda_agent.py
# Prerequisites: pip install openai pandas numpy
# Run: python eda_agent.py
import json
from dataclasses import dataclass

import numpy as np
import pandas as pd
from openai import OpenAI

client = OpenAI()  # Uses the OPENAI_API_KEY env var


@dataclass
class ColumnIssue:
    column: str
    issue_type: str  # null_rate | skewness | dtype | high_correlation
    severity: str  # low | medium | high
    detail: str


def profile_dataset(df: pd.DataFrame) -> dict:
    """
    Generate per-column statistics.
    In production, swap this for ydata-profiling for richer output.
    """
    profile = {}
    for col in df.columns:
        col_stats = {
            "dtype": str(df[col].dtype),
            "null_rate": float(df[col].isnull().mean()),
            "n_unique": int(df[col].nunique()),
        }
        if pd.api.types.is_numeric_dtype(df[col]):
            col_stats["skewness"] = float(df[col].skew())
            col_stats["mean"] = float(df[col].mean())
            col_stats["std"] = float(df[col].std())
        elif df[col].dtype == "object":
            # Mark string columns whose non-null values are (almost) all parseable as numbers
            non_null = df[col].dropna()
            numeric_coerced = pd.to_numeric(non_null, errors="coerce")
            col_stats["looks_numeric"] = bool(len(non_null) > 0 and numeric_coerced.notna().mean() > 0.9)
        profile[col] = col_stats
    return profile


def flag_issues(profile: dict) -> list[ColumnIssue]:
    """
    Flag data quality issues from a column profile.
    Severity tiers: high = needs immediate attention, medium = worth reviewing.
    """
    issues = []
    for col, stats_dict in profile.items():
        null_rate = stats_dict.get("null_rate", 0.0)
        if null_rate > 0.15:
            issues.append(ColumnIssue(col, "null_rate", "high",
                                      f"{null_rate:.0%} of values are missing"))
        elif null_rate > 0.05:
            issues.append(ColumnIssue(col, "null_rate", "medium",
                                      f"{null_rate:.0%} of values are missing"))

        skewness = abs(stats_dict.get("skewness", 0.0))
        if skewness > 5.0:
            issues.append(ColumnIssue(col, "skewness", "high",
                                      f"Extreme skew={skewness:.1f} -- consider log transform"))
        elif skewness > 2.0:
            issues.append(ColumnIssue(col, "skewness", "medium",
                                      f"Moderate skew={skewness:.1f}"))

        # Object columns with (almost) all-numeric values are likely miscoded
        if stats_dict["dtype"] == "object" and stats_dict.get("looks_numeric", False):
            issues.append(ColumnIssue(col, "dtype", "medium",
                                      "Numeric values stored as strings"))
    return issues


def run_eda_agent(df: pd.DataFrame, dataset_description: str) -> str:
    """
    Run the EDA agent.
    Both tools run in a fixed order (profile, then flag); a single LLM call
    then turns their outputs into a structured, prioritized report.
    """
    profile = profile_dataset(df)
    issues = flag_issues(profile)

    # Format issues for the agent
    issues_text = "\n".join(
        f"- [{i.severity.upper()}] {i.column}: {i.issue_type} -- {i.detail}"
        for i in issues
    ) or "No issues detected."

    prompt = f"""You are a senior data scientist reviewing a dataset for a data science project.
Dataset: {dataset_description}
Shape: {df.shape[0]} rows x {df.shape[1]} columns
Column profile (summary stats):
{json.dumps(profile, indent=2)}
Detected issues:
{issues_text}
Write a structured EDA report with these sections:
1. DATASET OVERVIEW -- shape, dtypes, overall quality assessment (1-2 sentences)
2. HIGH PRIORITY ISSUES -- items requiring action before modeling
3. MEDIUM PRIORITY ISSUES -- items worth monitoring
4. RECOMMENDED NEXT STEPS -- ordered list of 3-5 specific actions
Be direct. Prioritize actionability over completeness."""

    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.2,  # Low temperature for consistent structured output
    )
    return response.choices[0].message.content


# ── Run it ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Example: retail transaction data
    np.random.seed(42)
    n = 5000
    df = pd.DataFrame({
        "income": np.random.exponential(scale=200, size=n),  # right-skewed
        "customer_age": np.random.normal(40, 12, n),
        "created_at": pd.date_range("2024-01-01", periods=n, freq="h").astype(str),
        "region_code": np.random.choice(["US", "EU", "APAC", None], size=n, p=[0.5, 0.3, 0.1, 0.1]),
        "session_count": np.where(np.random.rand(n) < 0.2, None, np.random.randint(1, 50, n)),
    })
    report = run_eda_agent(df, "Retail transaction data with customer demographics")
    print(report)
How to run:
export OPENAI_API_KEY=your_key
python eda_agent.py
Real scenario
Retail transaction data, 5,000 rows, 8 columns. The agent flags income as high-priority (extreme right skew at 7.3), session_count as high-priority (22% null rate), and created_at as medium-priority (date stored as string). It recommends a log transform for income, a null indicator feature for session_count, and parsing created_at to extract hour-of-day and day-of-week features. All of this surfaces in under 30 seconds. A human reviews the report and acts on the recommendations, with no time spent running the diagnostics manually.
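The flag_issues tool above only catches numbers stored as strings; it has no rule for dates stored as strings like created_at. One way to add that check is sketched here, assuming ISO 8601-style values such as "2024-01-01" or "2024-01-01 00:00:00". `find_date_string_columns` is a hypothetical helper, not part of the original code, and other date formats would slip past it:

```python
import pandas as pd

# Matches values that start with an ISO 8601 date (YYYY-MM-DD).
# Other date formats (e.g. 01/31/2024) are deliberately not detected.
ISO_DATE_PREFIX = r"^\d{4}-\d{2}-\d{2}"


def find_date_string_columns(df: pd.DataFrame, min_fraction: float = 0.9) -> list[str]:
    """Return string-typed columns where most non-null values look like ISO dates."""
    flagged = []
    for col in df.columns:
        s = df[col]
        # Skip numeric columns and columns that are already real datetimes
        if not (pd.api.types.is_object_dtype(s) or pd.api.types.is_string_dtype(s)):
            continue
        non_null = s.dropna().astype(str)
        if len(non_null) and non_null.str.match(ISO_DATE_PREFIX).mean() >= min_fraction:
            flagged.append(col)
    return flagged
```

Each flagged column could then become a `ColumnIssue(col, "dtype", "medium", "Dates stored as strings")` inside flag_issues, which gives the agent the signal it needs to recommend parsing created_at.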
# Workflow 2: Agentic Feature Engineering and Selection
What it replaces: Manually brainstorming interaction features, writing the transformation code, evaluating each candidate with a baseline model, pruning the ones that don't contribute, documenting what survived and why.
What the agent does instead: Proposes candidate features based on the data profile and domain context, generates the transformation code, evaluates each candidate against a fast baseline, and prunes features below a configurable importance threshold, with a written rationale for each decision.
// Structure
Two phases, one agent. The generation phase uses the LLM to propose candidate features from a structured description of the dataset and the prediction task. The selection phase evaluates the candidates by training a LightGBM classifier, checking it with 5-fold cross-validation (CV), and computing feature importance using SHapley Additive exPlanations (SHAP). Features below the threshold are pruned. The agent then reasons about the importance scores and calls out surprising prunings, such as a feature that looks weak globally but carries a signal for a specific segment.
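A global importance score averages over all rows, so a feature that matters only for one segment can fall below the threshold. A minimal sketch of a per-segment check, assuming you already have a SHAP matrix of shape (n_samples, n_features), such as the binary-class output of shap's TreeExplainer, and a segment column like plan_tier. `segment_importance` is a hypothetical helper, not part of the original code:

```python
import numpy as np
import pandas as pd


def segment_importance(
    shap_values: np.ndarray,
    feature_names: list[str],
    segments: pd.Series,
) -> pd.DataFrame:
    """
    Normalized mean |SHAP| per feature, computed separately for each segment.
    Rows are segments, columns are features, and each row sums to 1.
    """
    abs_shap = pd.DataFrame(np.abs(shap_values), columns=feature_names, index=segments.index)
    per_segment = abs_shap.groupby(segments).mean()
    return per_segment.div(per_segment.sum(axis=1), axis=0)


# Toy SHAP values: support_tickets_90d barely matters for "basic" customers
# but dominates for the single "pro" customer.
shap_values = np.array([[0.95, 0.05], [0.95, 0.05], [0.95, 0.05], [0.40, 0.60]])
segments = pd.Series(["basic", "basic", "basic", "pro"], name="plan_tier")
print(segment_importance(shap_values, ["monthly_spend", "support_tickets_90d"], segments))
```

A feature whose share in any segment clears the threshold, even if its global share does not, is a good candidate to send back to the agent with a note instead of pruning it outright.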
// Code Sample
# feature_agent.py
# Prerequisites: pip install openai lightgbm shap scikit-learn pandas numpy
# Run: python feature_agent.py
import json

import lightgbm as lgb
import numpy as np
import pandas as pd
import shap
from openai import OpenAI
from sklearn.model_selection import cross_val_score

client = OpenAI()


def generate_feature_candidates(
    column_descriptions: dict[str, str],
    target: str,
    task_type: str = "classification",
    n_candidates: int = 10,
) -> list[dict]:
    """
    Ask the LLM to propose candidate features given column descriptions and the prediction task.
    Returns a list of dicts with 'name', 'formula', and 'rationale'.
    """
    prompt = f"""You are a senior ML engineer performing feature engineering for a {task_type} task.
Target variable: {target}
Available columns:
{json.dumps(column_descriptions, indent=2)}
Propose {n_candidates} candidate engineered features that are likely to improve model performance.
For each feature, provide:
- name: a snake_case feature name
- formula: how to compute it from the available columns (a pandas DataFrame.eval expression)
- rationale: one sentence on why this feature might help
Return a JSON object with a single key "features" containing an array of objects,
each with keys: name, formula, rationale.
Return ONLY valid JSON -- no explanation outside the JSON."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
        temperature=0.4,
    )
    result = json.loads(response.choices[0].message.content)
    return result.get("features", result.get("candidates", []))


def evaluate_and_prune(
    df: pd.DataFrame,
    candidate_features: list[dict],
    target_col: str,
    importance_threshold: float = 0.01,
) -> tuple[list[str], list[str], dict[str, float]]:
    """
    Add candidate features to the dataframe, train a fast LightGBM baseline,
    compute normalized mean |SHAP| importances, and prune below the threshold.
    Returns (kept_features, pruned_features, importance_scores)
    """
    feature_df = df.copy()
    added = []
    for candidate in candidate_features:
        try:
            if candidate["name"] in added:
                continue  # Ignore duplicate names proposed by the LLM
            # Evaluate the formula string -- in production, use a safe eval sandbox
            feature_df[candidate["name"]] = pd.to_numeric(feature_df.eval(candidate["formula"]))
            added.append(candidate["name"])
        except Exception as e:
            # Formula failed or produced non-numeric output -- skip this candidate
            print(f"  Skipped '{candidate.get('name')}': {e}")

    if not added:
        return [], [], {}

    # LightGBM needs numeric inputs; treat inf (e.g. division by zero) as missing
    X = feature_df[added].astype(float).replace([np.inf, -np.inf], np.nan).fillna(0)
    y = df[target_col]

    model = lgb.LGBMClassifier(n_estimators=100, random_state=42, verbose=-1)
    cv_auc = cross_val_score(model, X, y, cv=5, scoring="roc_auc").mean()
    print(f"  Baseline 5-fold CV AUC with all candidates: {cv_auc:.4f}")
    model.fit(X, y)

    # Mean |SHAP| per feature, normalized to sum to 1
    shap_values = shap.TreeExplainer(model).shap_values(X)
    if isinstance(shap_values, list):  # Older shap versions return one array per class
        shap_values = shap_values[-1]
    shap_values = np.asarray(shap_values)
    if shap_values.ndim == 3:  # (n_samples, n_features, n_classes)
        shap_values = shap_values[..., -1]
    mean_abs = np.abs(shap_values).mean(axis=0)
    total = mean_abs.sum() or 1.0
    importance_scores = {f: float(v / total) for f, v in zip(added, mean_abs)}

    kept = [f for f in added if importance_scores[f] >= importance_threshold]
    pruned = [f for f in added if importance_scores[f] < importance_threshold]
    return kept, pruned, importance_scores


def explain_selection(
    kept: list[str],
    pruned: list[str],
    scores: dict[str, float],
) -> str:
    """Ask the agent to explain its selection decisions in plain language."""
    prompt = f"""You are reviewing feature selection results for an ML pipeline.
Features KEPT (above importance threshold):
{json.dumps({f: round(scores.get(f, 0), 4) for f in kept}, indent=2)}
Features PRUNED (below threshold):
{json.dumps({f: round(scores.get(f, 0), 4) for f in pruned}, indent=2)}
Write a 3-5 sentence summary of the selection outcome.
Note any surprising prunings or unexpected high-importance features.
Suggest one additional feature worth testing based on what survived."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    column_descriptions = {
        "days_since_login": "Number of days since the customer last logged in",
        "plan_tier": "Subscription tier: basic, pro, or enterprise",
        "support_tickets_90d": "Number of support tickets opened in the last 90 days",
        "monthly_spend": "Customer's average monthly spend in USD",
    }
    candidates = generate_feature_candidates(
        column_descriptions, target="churned", task_type="classification", n_candidates=10
    )

    # In production, load real customer data here. This synthetic target is random,
    # so the importances below only exercise the pipeline.
    np.random.seed(42)
    n = 3000
    df = pd.DataFrame({
        "days_since_login": np.random.randint(0, 90, n),
        "plan_tier": np.random.choice(["basic", "pro", "enterprise"], n),
        "support_tickets_90d": np.random.poisson(1.5, n),
        "monthly_spend": np.random.exponential(80, n),
        "churned": np.random.binomial(1, 0.15, n),
    })

    kept, pruned, scores = evaluate_and_prune(df, candidates, target_col="churned")
    summary = explain_selection(kept, pruned, scores)
    print(summary)
How to run:
export OPENAI_API_KEY=your_key
python feature_agent.py
Real scenario
Customer churn prediction, 12 input columns including days_since_login, plan_tier, support_tickets_90d, and monthly_spend. The agent proposes 15 candidates, including spend_per_day, tickets_per_spend_ratio, and login_recency_x_plan. After evaluation, 9 survive the importance threshold. The explanation calls out that tickets_per_spend_ratio has the highest importance score (0.18): "customers spending more who are also raising support tickets are a particularly high churn risk," which becomes a finding worth sharing with the product team.
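The feature agent passes LLM-written formulas straight to `DataFrame.eval`, and the code comment suggests a safe eval sandbox for production. A minimal guardrail is sketched here as an assumption, not the author's method: parse each formula with Python's `ast` module and accept it only if it uses known column names, literals, arithmetic, comparisons, and boolean operators. `is_safe_formula` and `ALLOWED_NODES` are hypothetical names:

```python
import ast

# Node types a candidate formula may contain. Calls, attribute access,
# subscripts, lambdas, and comprehensions are all rejected.
ALLOWED_NODES = (
    ast.Expression, ast.BinOp, ast.UnaryOp, ast.Compare, ast.BoolOp,
    ast.Name, ast.Load, ast.Constant,
    ast.Add, ast.Sub, ast.Mult, ast.Div, ast.FloorDiv, ast.Mod, ast.Pow,
    ast.USub, ast.UAdd, ast.Not, ast.Invert, ast.And, ast.Or, ast.BitAnd, ast.BitOr,
    ast.Lt, ast.LtE, ast.Gt, ast.GtE, ast.Eq, ast.NotEq,
)


def is_safe_formula(formula: str, columns: set[str]) -> bool:
    """Return True only if `formula` parses and uses allowed nodes and known columns."""
    try:
        tree = ast.parse(formula, mode="eval")
    except SyntaxError:
        return False
    for node in ast.walk(tree):
        if not isinstance(node, ALLOWED_NODES):
            return False
        if isinstance(node, ast.Name) and node.id not in columns:
            return False  # Unknown name: not a column we expose
        if isinstance(node, ast.Constant) and not isinstance(node.value, (int, float, str)):
            return False
    return True
```

In evaluate_and_prune, calling `is_safe_formula(candidate["formula"], set(df.columns) - {target_col})` before `feature_df.eval(...)` would reject unsafe formulas and, because the target column is excluded, also block formulas that leak the label. This is a whitelist check, not a full sandbox.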
# Workflow 3: Agentic Hyperparameter Optimization
What it replaces: Grid search (exhaustive but wasteful), random search (efficient but dumb), and manual Bayesian optimization setup (powerful but boilerplate-heavy). All of these treat hyperparameter tuning as a search problem. An agent treats it as a reasoning problem.
What the agent does instead: Proposes a hyperparameter configuration, evaluates it by training the model, analyzes the metric trend across iterations, identifies which parameters are driving improvement, and adjusts the search direction accordingly, without being told to. It can converge on a good configuration in far fewer iterations than grid or random search.
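The claim that the agent needs fewer iterations than random search is easy to check on your own data by running a random-search baseline with the same trial budget. A minimal sketch, assuming the same parameter ranges as the Pydantic schema in this section; `SEARCH_SPACE` and `random_search_configs` are hypothetical names, not part of the original code:

```python
import random

# Same ranges as HyperparamConfig and the agent's prompt in this section.
SEARCH_SPACE = {
    "n_estimators": (10, 1000),
    "max_depth": (1, 50),
    "min_samples_split": (2, 50),
    "max_features": (0.1, 1.0),
}


def random_search_configs(n_trials: int, seed: int = 42) -> list[dict]:
    """Sample n_trials configurations uniformly from SEARCH_SPACE."""
    rng = random.Random(seed)  # Seeded so the baseline is reproducible
    configs = []
    for _ in range(n_trials):
        config = {}
        for name, (low, high) in SEARCH_SPACE.items():
            if isinstance(low, int) and isinstance(high, int):
                config[name] = rng.randint(low, high)  # Inclusive integer range
            else:
                config[name] = round(rng.uniform(low, high), 2)
        configs.append(config)
    return configs


baseline_configs = random_search_configs(n_trials=15)
```

Passing each configuration to `train_and_evaluate` from the code pattern in this section and comparing the best val_AUC after the same number of trials gives a like-for-like comparison.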
// Structure
One agent, one tool: train_and_evaluate. The tool takes a Pydantic-validated hyperparameter config, trains the model with 5-fold CV, and returns the validation area under the ROC curve (AUC), the training time, and the train/validation overfitting gap. The agent receives the full trial history at each step and reasons about what to try next. Convergence is detected when the last three AUC scores span less than 0.005.
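When the agent proposes an out-of-range value, Pydantic rejects the config and the loop skips that trial, so the iteration is wasted. One alternative, sketched here as an assumption rather than part of the original design, is to clamp proposals into the schema's bounds before validation. `PARAM_BOUNDS` and `clamp_config` are hypothetical names:

```python
import math

# Bounds mirror HyperparamConfig; max_features uses the 0.1-1.0 range
# that the agent's prompt advertises.
PARAM_BOUNDS = {
    "n_estimators": (10, 1000, int),
    "max_depth": (1, 50, int),
    "min_samples_split": (2, 50, int),
    "max_features": (0.1, 1.0, float),
}


def clamp_config(proposal: dict, fallback: dict) -> dict:
    """Coerce an LLM proposal into range; unusable values fall back to `fallback`."""
    clamped = {}
    for name, (low, high, cast) in PARAM_BOUNDS.items():
        raw = proposal.get(name, fallback[name])
        try:
            value = float(raw)
        except (TypeError, ValueError):
            value = float(fallback[name])  # e.g. the model returned "deep"
        if not math.isfinite(value):
            value = float(fallback[name])
        value = min(max(value, low), high)  # Clip into [low, high]
        clamped[name] = int(round(value)) if cast is int else value
    return clamped
```

In run_hp_agent this would look like `config = clamp_config(propose_next_config(results), fallback=initial_config)`. The trade-off is that clamping hides the model's mistakes, so it is worth logging whenever a value changes.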
This pattern is directly inspired by published research on agentic hyperparameter tuning that showed LLM-guided search outperforming Bayesian optimization on mid-sized classification tasks by 5-12% in fewer iterations.
// Code Sample
# hp_agent.py
# Prerequisites: pip install openai scikit-learn pydantic
# Run: python hp_agent.py
import json
import time
from dataclasses import dataclass

from openai import OpenAI
from pydantic import BaseModel, Field
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_score

client = OpenAI()


# ── Pydantic schema for structured tool input ─────────────────────────────────
# The model must return valid hyperparameters -- Pydantic catches invalid values
# before the training job starts, saving wasted compute on bad configs.
class HyperparamConfig(BaseModel):
    n_estimators: int = Field(..., ge=10, le=1000, description="Number of trees")
    max_depth: int = Field(..., ge=1, le=50, description="Max tree depth")
    min_samples_split: int = Field(..., ge=2, le=50, description="Min samples to split")
    max_features: float = Field(..., gt=0, le=1.0, description="Fraction of features per split")


@dataclass
class TrialResult:
    iteration: int
    config: dict
    val_auc: float
    train_auc: float
    train_time_s: float

    @property
    def overfit_gap(self) -> float:
        return round(self.train_auc - self.val_auc, 4)


def train_and_evaluate(config: dict, X, y) -> TrialResult:
    """
    Train a RandomForest with the given config and return cross-validated metrics.
    This is the tool the agent calls on each iteration.
    """
    params = HyperparamConfig(**config)  # Validates before training
    clf = RandomForestClassifier(
        n_estimators=params.n_estimators,
        max_depth=params.max_depth,
        min_samples_split=params.min_samples_split,
        max_features=params.max_features,
        random_state=42,
        n_jobs=-1,
    )
    t0 = time.time()
    val_scores = cross_val_score(clf, X, y, cv=5, scoring="roc_auc")
    clf.fit(X, y)  # Refit on all data to measure train AUC for the overfit gap
    train_auc = roc_auc_score(y, clf.predict_proba(X)[:, 1])
    return TrialResult(
        iteration=0,
        config=dict(params),  # Store the validated (type-coerced) values
        val_auc=round(float(val_scores.mean()), 4),
        train_auc=round(float(train_auc), 4),
        train_time_s=round(time.time() - t0, 2),
    )


def detect_convergence(results: list[TrialResult], window: int = 3, tol: float = 0.005) -> bool:
    """Stop when the last `window` AUC scores span less than `tol`."""
    if len(results) < window:
        return False
    recent = [r.val_auc for r in results[-window:]]
    return (max(recent) - min(recent)) < tol


def propose_next_config(trial_history: list[TrialResult]) -> dict:
    """
    Ask the agent to propose the next hyperparameter configuration,
    reasoning from the full trial history.
    """
    history_text = "\n".join(
        f"Trial {r.iteration}: config={r.config}, val_AUC={r.val_auc}, "
        f"overfit_gap={r.overfit_gap}, time={r.train_time_s}s"
        for r in trial_history
    )
    prompt = f"""You are optimizing a RandomForest classifier. Your goal is to maximize val_AUC.
Trial history:
{history_text}
Parameter ranges:
- n_estimators: 10-1000
- max_depth: 1-50
- min_samples_split: 2-50
- max_features: 0.1-1.0
Analyze the trend. Identify which parameters appear most influential.
Propose the next configuration to try, explaining your reasoning in one sentence.
Return a JSON object with keys: n_estimators, max_depth, min_samples_split, max_features, reasoning"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
        temperature=0.3,
    )
    result = json.loads(response.choices[0].message.content)
    print(f"  Agent reasoning: {result.get('reasoning', '')}")
    return {k: v for k, v in result.items() if k != "reasoning"}


def run_hp_agent(X, y, max_iterations: int = 15) -> TrialResult:
    """
    Run the agentic hyperparameter optimization loop.
    Starts with a sensible default, then lets the agent guide the search.
    """
    # Sensible starting point -- don't start random
    initial_config = {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5, "max_features": 0.5}
    results = []

    for i in range(max_iterations):
        try:
            config = initial_config if i == 0 else propose_next_config(results)
            result = train_and_evaluate(config, X, y)
        except Exception as e:
            # Bad JSON, failed Pydantic validation, or a training error
            print(f"  Trial {i+1} failed: {e} -- skipping")
            continue
        result.iteration = i + 1
        results.append(result)
        best = max(results, key=lambda r: r.val_auc)
        print(f"Trial {i+1:02d}: AUC={result.val_auc:.4f} (best={best.val_auc:.4f})")
        if detect_convergence(results, window=3, tol=0.005):
            print(f"Converged after {i+1} iterations.")
            break

    if not results:
        raise RuntimeError("All trials failed; no configuration to return.")
    return max(results, key=lambda r: r.val_auc)


if __name__ == "__main__":
    X, y = make_classification(n_samples=5000, n_features=20, n_informative=10, random_state=42)
    best = run_hp_agent(X, y, max_iterations=15)
    print(f"\nBest config: {best.config}")
    print(f"Best val_AUC: {best.val_auc}")
How to run:
export OPENAI_API_KEY=your_key
python hp_agent.py
Real scenario
Census Income classification dataset (UCI, 48,842 rows). Default RandomForest AUC: 0.87. After 15 agent-guided iterations, the agent converges on max_depth=12, n_estimators=350, min_samples_split=8, max_features=0.4, reaching AUC 0.91. At iteration 7, the agent's reasoning log reads: "max_depth appears to be the dominant driver, increasing it from 8 to 12 gave +0.019 AUC, while n_estimators beyond 200 shows diminishing returns." That reasoning is traceable in the output, not hidden inside a black-box optimizer.
# Workflow 4: Automated Model Monitoring and Drift Detection Agent
What it replaces: Manually checking feature distributions on a schedule, writing threshold rules per column, maintaining dashboard alerts that go stale, and discovering model degradation only after it shows up in business metrics.
What the agent does instead: Runs on a schedule against incoming batch data, computes drift statistics per feature using the Population Stability Index (PSI) and the Kolmogorov-Smirnov (KS) test, classifies drift severity, and responds differently depending on severity: mild drift triggers an alert, and severe drift triggers a retraining pipeline call.
// Structure
A scheduled agent built around one tool, compute_drift_stats, which computes PSI and the KS test for each column and classifies severity from the PSI value. A single language model call then decides how to respond: a passing check is simply logged, mild drift produces a drafted alert for the data science team, and severe drift produces an alert plus a trigger for a retraining directed acyclic graph (DAG), sent via Slack or the Airflow REST API. The code pattern below drafts these messages; wiring them to Slack or Airflow depends on your environment. The critical design choice is the branching response itself: the agent handles the routing, not a hardcoded if/else ladder.
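For the severe-drift branch, the drafted trigger eventually has to become an API call. A minimal sketch, assuming Airflow 2's stable REST API (`POST /api/v1/dags/{dag_id}/dagRuns`) with basic authentication enabled; Airflow 3 moved to a v2 API with token-based auth, so check your version. The host, credentials, and the DAG ID `retrain_recommendation_model` are placeholders, and `build_dag_trigger` is a hypothetical helper that only builds the request:

```python
import base64
import json
import urllib.request


def build_dag_trigger(
    airflow_url: str, dag_id: str, conf: dict, username: str, password: str
) -> urllib.request.Request:
    """Build (but do not send) a POST that creates a new DAG run with `conf`."""
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{airflow_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps({"conf": conf}).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method="POST",
    )


# Placeholder values: replace with your Airflow host, DAG, and credentials
request = build_dag_trigger(
    "http://localhost:8080",
    dag_id="retrain_recommendation_model",  # hypothetical DAG ID
    conf={"reason": "severe_drift", "features": ["session_duration_s", "cart_add_rate"]},
    username="airflow",
    password="airflow",
)
# urllib.request.urlopen(request)  # Uncomment to actually trigger the run
```

Keeping the request construction separate from sending it makes the agent's decision easy to log and review before anything reaches the scheduler.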
PSI interpretation: below 0.1 is stable, 0.1-0.25 is mild drift worth monitoring, and above 0.25 is significant drift that should trigger retraining. PSI is a standard metric for population shift in production machine learning systems and was used in financial risk modeling for decades before LLMs existed.
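To make those thresholds concrete, here is a worked example on four buckets with made-up frequencies: the baseline splits evenly, and the current batch shifts mass toward the top bucket. Every term of the PSI sum is non-negative, because (current - baseline) and ln(current / baseline) always share a sign:

```python
import math


def psi_from_frequencies(baseline_pct: list[float], current_pct: list[float]) -> float:
    """PSI = sum((current_% - baseline_%) * ln(current_% / baseline_%)) over buckets."""
    return sum((c - b) * math.log(c / b) for b, c in zip(baseline_pct, current_pct))


def classify_drift(psi: float) -> str:
    if psi < 0.10:
        return "stable"
    if psi < 0.25:
        return "mild_drift"
    return "severe_drift"


# Illustrative frequencies, not real data
baseline_pct = [0.25, 0.25, 0.25, 0.25]
current_pct = [0.10, 0.20, 0.30, 0.40]

psi = psi_from_frequencies(baseline_pct, current_pct)
print(round(psi, 4), classify_drift(psi))  # 0.2282 mild_drift
```

The per-bucket contributions are about 0.137, 0.011, 0.009, and 0.071, so the bucket that lost 60% of its mass drives most of the score even though no single bucket moved more than 15 percentage points.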
// Code Sample
# drift_agent.py
# Prerequisites: pip install openai pandas scipy numpy
# Run: python drift_agent.py
import json
import math
from dataclasses import dataclass

import numpy as np
import pandas as pd
from openai import OpenAI
from scipy.stats import ks_2samp

client = OpenAI()


@dataclass
class FeatureDrift:
    feature: str
    psi: float
    ks_stat: float
    ks_pvalue: float
    severity: str  # stable | mild_drift | severe_drift


def compute_psi(baseline: np.ndarray, current: np.ndarray, buckets: int = 10) -> float:
    """
    Population Stability Index between baseline and current distributions.
    PSI = sum((current_% - baseline_%) * ln(current_% / baseline_%))
    Values: <0.1 stable | 0.1-0.25 mild | >0.25 severe
    Uses equal-width buckets over the combined range of both samples.
    """
    min_val = min(baseline.min(), current.min())
    max_val = max(baseline.max(), current.max())
    if max_val == min_val:
        return 0.0  # Both samples hold the same single value: no shift
    bucket_width = (max_val - min_val) / buckets

    def bucket_freqs(data: np.ndarray) -> list[float]:
        # Bucket index per value; the maximum value lands in the last bucket
        idx = np.minimum(((data - min_val) / bucket_width).astype(int), buckets - 1)
        freqs = np.bincount(idx, minlength=buckets) / len(data)
        return [max(f, 1e-6) for f in freqs]  # Avoid log(0)

    b_freq = bucket_freqs(baseline)
    c_freq = bucket_freqs(current)
    return round(float(sum((c - b) * math.log(c / b) for b, c in zip(b_freq, c_freq))), 4)


def classify_drift(psi: float) -> str:
    if psi < 0.10:
        return "stable"
    if psi < 0.25:
        return "mild_drift"
    return "severe_drift"


def compute_drift_stats(
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    numeric_cols: list[str],
) -> list[FeatureDrift]:
    """Compute PSI and the KS test for each numeric feature; severity comes from PSI."""
    results = []
    for col in numeric_cols:
        b = baseline_df[col].dropna().to_numpy(dtype=float)
        c = current_df[col].dropna().to_numpy(dtype=float)
        psi = compute_psi(b, c)
        ks_stat, ks_pvalue = ks_2samp(b, c)
        results.append(FeatureDrift(
            feature=col,
            psi=psi,
            ks_stat=round(float(ks_stat), 4),
            ks_pvalue=round(float(ks_pvalue), 6),
            severity=classify_drift(psi),
        ))
    return results


def run_monitoring_agent(
    baseline_df: pd.DataFrame,
    current_df: pd.DataFrame,
    numeric_cols: list[str],
    model_name: str = "churn_model_v3",
) -> str:
    """
    Run the monitoring agent.
    It computes drift stats and decides how to respond based on severity.
    """
    drift_results = compute_drift_stats(baseline_df, current_df, numeric_cols)
    drift_summary = [
        {"feature": d.feature, "psi": d.psi, "ks_pvalue": d.ks_pvalue, "severity": d.severity}
        for d in drift_results
    ]
    severe_features = [d.feature for d in drift_results if d.severity == "severe_drift"]
    mild_features = [d.feature for d in drift_results if d.severity == "mild_drift"]

    prompt = f"""You are a model monitoring agent for {model_name}.
Drift analysis results:
{json.dumps(drift_summary, indent=2)}
Severe drift (PSI > 0.25): {severe_features}
Mild drift (PSI 0.10-0.25): {mild_features}
Based on severity, determine the appropriate response:
- STABLE: log a pass, no action needed
- MILD DRIFT: draft an alert message for the data science team
- SEVERE DRIFT: draft an alert message AND a trigger for the retraining pipeline
Write your response in this format:
SEVERITY_LEVEL: <STABLE | MILD DRIFT | SEVERE DRIFT>
ACTION: <what to do>
MESSAGE: <alert text, or "none">"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,  # Very low -- this is a decision-making call, not a creative one
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    np.random.seed(42)
    n = 2000
    # Baseline: normal e-commerce browsing patterns
    baseline = pd.DataFrame({
        "session_duration_s": np.random.normal(180, 60, n),
        "pages_per_session": np.random.normal(4.2, 1.5, n),
        "cart_add_rate": np.clip(np.random.normal(0.12, 0.04, n), 0, 1),
    })
    # Current: a promotional event shifts all features significantly
    current = pd.DataFrame({
        "session_duration_s": np.random.normal(310, 90, n),  # sessions run much longer
        "pages_per_session": np.random.normal(6.8, 2.1, n),  # more pages viewed
        "cart_add_rate": np.clip(np.random.normal(0.31, 0.08, n), 0, 1),  # much higher
    })
    result = run_monitoring_agent(baseline, current, list(baseline.columns), model_name="recommendation_engine_v2")
    print(result)
How to run:
export OPENAI_API_KEY=your_key
python drift_agent.py
Real scenario
E-commerce recommendation model. A promotional event causes a sudden distribution shift in browsing behavior: mean session duration jumps from 180s to 310s, and the cart add rate nearly triples. The monitoring agent runs at midnight against the day's data. It detects PSI > 0.25 on all three features, classifies severity as severe, and triggers the retraining pipeline with an alert to Slack. The data science team wakes up to a message explaining what shifted and what was done about it, not a raw dashboard they need to interpret at 6 a.m.
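compute_psi in the code pattern uses equal-width buckets over the combined range of both batches, so a few extreme values can stretch that range and crowd most observations into one or two buckets. A common alternative, sketched here as an assumption rather than the author's method, places bucket edges at baseline quantiles so each baseline bucket holds roughly the same share of data. `quantile_psi` is a hypothetical name:

```python
import numpy as np


def quantile_psi(baseline: np.ndarray, current: np.ndarray, buckets: int = 10, eps: float = 1e-6) -> float:
    """PSI with bucket edges at baseline quantiles (about equal baseline mass per bucket)."""
    # Interior edges, e.g. the 10th, 20th, ..., 90th baseline percentiles for buckets=10
    edges = np.quantile(baseline, np.linspace(0, 1, buckets + 1)[1:-1])
    # searchsorted maps each value to a bucket index in [0, buckets - 1];
    # values outside the baseline range fall into the first or last bucket
    b_idx = np.searchsorted(edges, baseline, side="right")
    c_idx = np.searchsorted(edges, current, side="right")
    b_pct = np.maximum(np.bincount(b_idx, minlength=buckets) / len(baseline), eps)
    c_pct = np.maximum(np.bincount(c_idx, minlength=buckets) / len(current), eps)
    return float(np.sum((c_pct - b_pct) * np.log(c_pct / b_pct)))
```

Because the edges come from the baseline alone, they can be computed once at training time and reused for every scheduled run, which also keeps PSI values comparable from day to day.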
# Workflow 5: Agentic Pipeline Orchestration and Self-Healing
What it replaces: Staring at an Airflow failure notification, opening the logs, manually reading the traceback, figuring out whether the fix requires a code change, a config change, or a retry, making the fix, rerunning the task, and hoping the next task downstream doesn't fail for the same reason.
What the agent does instead: Reads the failure log, classifies the error type, determines whether it's auto-fixable, applies the fix if it is, and either retriggers the task or escalates to a human with a fully structured incident report if it isn't.
// Structure
A meta-agent that wraps your existing orchestration layer. When an Airflow task fails, the orchestrator sends the task ID, error log, and task definition to the agent. The agent uses one tool, parse_pipeline_error, to classify the failure deterministically, including whether it is auto-fixable. From there, a single language model call drafts either a fix description or a structured incident report for human review, depending on that classification.
// Code Sample
# pipeline_healer.py
# Prerequisites: pip install openai
# Run: python pipeline_healer.py
import re
from dataclasses import dataclass
from typing import Optional

from openai import OpenAI

client = OpenAI()


@dataclass
class PipelineError:
    task_id: str
    error_type: str  # schema_mismatch | null_violation | timeout | unknown
    column: Optional[str]
    detail: str
    auto_fixable: bool


def parse_pipeline_error(log_line: str, task_id: str) -> PipelineError:
    """
    Classify a task failure log into a structured error type.
    Auto-fixable errors can be repaired without human intervention.
    """
    lowered = log_line.lower()
    if "KeyError" in log_line or ("column" in lowered and "not found" in lowered):
        # First quoted identifier in the log is taken as the missing column
        col_match = re.search(r"['\"](\w+)['\"]", log_line)
        col = col_match.group(1) if col_match else None
        return PipelineError(task_id, "schema_mismatch", col, log_line.strip(), auto_fixable=True)
    if "IntegrityError" in log_line or ("null" in lowered and "violate" in lowered):
        return PipelineError(task_id, "null_violation", None, log_line.strip(), auto_fixable=True)
    if "TimeoutError" in log_line or "timed out" in lowered:
        return PipelineError(task_id, "timeout", None, log_line.strip(), auto_fixable=False)
    return PipelineError(task_id, "unknown", None, log_line.strip(), auto_fixable=False)


def run_self_healing_agent(
    task_id: str,
    error_log: str,
    task_definition: str,
) -> str:
    """
    Run the self-healing agent on a failed pipeline task.
    It classifies the error, decides on a remediation, and produces
    either an auto-fix description or a structured escalation report.
    """
    error = parse_pipeline_error(error_log, task_id)
    fix_guidance = (
        "You may apply an automatic fix for this error type."
        if error.auto_fixable
        else "This error requires human review -- you cannot auto-fix it."
    )
    prompt = f"""You are a data pipeline reliability engineer.
A pipeline task has failed and you must decide how to respond.
Task: {task_id}
Task definition: {task_definition}
Error type: {error.error_type}
Column affected: {error.column or 'N/A'}
Auto-fixable: {error.auto_fixable}
Full error: {error.detail}
{fix_guidance}
Respond with:
ACTION: <AUTO_FIX | ESCALATE>
FIX_DESCRIPTION: <the fix to apply, or "none">
ESCALATION_REPORT: <incident report for a human, or "none">
NEXT_STEP: <e.g. retrigger the task, or wait for human review>"""
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
    )
    return response.choices[0].message.content


if __name__ == "__main__":
    # Scenario: the CRM export added a new column and changed a date format
    result = run_self_healing_agent(
        task_id="ingest_crm_daily",
        error_log="KeyError: 'transaction_date' column not found in source dataframe. "
                  "Available columns: ['txn_date_utc', 'customer_id', 'amount_usd', 'product_sku']",
        task_definition="Reads daily CRM export, extracts transaction_date and customer_id, "
                        "joins with product catalog, writes to feature store.",
    )
    print(result)
How to run:
python pipeline_healer.py
Real scenario
A daily feature pipeline fails at 2 a.m. because an upstream CRM system updated its export schema, renaming transaction_date to txn_date_utc and adding three new columns. The agent reads the error log, identifies the schema mismatch on transaction_date, and produces an auto-fix: rename the column in the ingestion step and add the three new columns to the schema definition as nullable. It logs the fix, retriggers the failed task, and sends the on-call engineer a summary that reads "Schema fix applied automatically. Source renamed transaction_date → txn_date_utc. Three new nullable columns were added to the schema. Task retriggered at 02:14." The engineer reviews the change in the morning instead of being woken up.
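The rename in this scenario can also be proposed deterministically before the LLM call, which gives the agent a concrete candidate to confirm or reject. A minimal sketch, assuming the log lists available columns in the same `Available columns: [...]` form as the example in the code pattern; `suggest_column_rename` is a hypothetical helper, and the 0.4 similarity cutoff was chosen for this example, not tuned:

```python
import ast
import difflib
import re
from typing import Optional


def suggest_column_rename(error_log: str, cutoff: float = 0.4) -> Optional[tuple[str, str]]:
    """Return (missing_column, closest_available_column), or None if no match."""
    missing = re.search(r"KeyError: ['\"](\w+)['\"]", error_log)
    available = re.search(r"Available columns: (\[.*?\])", error_log)
    if not (missing and available):
        return None
    columns = ast.literal_eval(available.group(1))  # Parses the list literal without eval()
    matches = difflib.get_close_matches(missing.group(1), columns, n=1, cutoff=cutoff)
    return (missing.group(1), matches[0]) if matches else None


log = ("KeyError: 'transaction_date' column not found in source dataframe. "
       "Available columns: ['txn_date_utc', 'customer_id', 'amount_usd', 'product_sku']")
print(suggest_column_rename(log))  # ('transaction_date', 'txn_date_utc')
```

String similarity is a hint, not proof: here it scores txn_date_utc at 0.5 and every other column below 0.4, but a renamed column with a very different name would be missed, and a near-miss could point to the wrong column. That is why the fix still goes to a human for review.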
# Wrapping Up
The five workflows are not independent tools. They form a pipeline:
The EDA agent understands the data. The feature engineering agent improves it. The hyperparameter agent optimizes the model built on those features. The monitoring agent watches the model in production. The self-healing agent protects the pipeline that delivers data to all of them.
Deploy them in the following order. Start with monitoring: it delivers value immediately on any existing pipeline without requiring changes to your modeling code. Add the EDA agent next for any new dataset you bring in. The feature engineering and hyperparameter agents come after you have established a baseline model worth improving.

None of these workflows operates without human review of the decisions that matter. The EDA agent flags issues; you decide what to do about them. The feature agent proposes candidates; you decide the importance threshold. The hyperparameter agent searches; you decide the parameter bounds and convergence criteria. The monitoring agent detects drift; you decide the severity thresholds that trigger retraining. The self-healing agent applies fixes; you review them before they merge into production.
That division is the point. Agents handle the procedural weight. You keep the evaluative weight. The result is a pipeline that's faster, more consistent, and easier to maintain, because the parts that break are now detected, and often repaired, before you have to look at them.
Shittu Olumide is a software engineer and technical writer passionate about leveraging cutting-edge technologies to craft compelling narratives, with a keen eye for detail and a knack for simplifying complex concepts. You can also find Shittu on Twitter.















