Tutorial 2 — Drift investigation¶
Your nightly drift check just fired: the amount-feature KS statistic jumped from 0.04 to 0.21. This tutorial walks through triage, root-cause analysis, and remediation.
Time: ~20 minutes interactive.
Assumes: you've completed Tutorial 1 — End-to-end fraud classifier and have a deployed fraud_classifier:v1 with a scheduled drift check.
1. Simulate production drift¶
In real life, drift emerges from your production traffic. For this tutorial we'll simulate it by shifting the training data's distribution and logging predictions from the shifted data.
import os
import time
from pathlib import Path
import numpy as np
import pandas as pd
import httpx
SWARM_API = os.environ.get('SWARM_API', 'http://localhost:8000')
SWARM_TOKEN = os.environ['SWARM_TOKEN']
# Load the training data from tutorial 1
train = pd.read_csv('ml_team/data/fraud_synthetic.csv')
# Simulate April 2026 traffic: amounts shifted up, velocity slightly different
shifted = train.sample(10_000, random_state=99).reset_index(drop=True)
shifted['amount_zscore'] = shifted['amount_zscore'] * 1.4 + 0.3 # distribution shift
shifted['velocity_zscore'] = shifted['velocity_zscore'] + 0.2
shifted.to_csv('ml_team/data/fraud_april_shifted.csv', index=False)
print(f'Original: amount mean={train.amount_zscore.mean():.3f} std={train.amount_zscore.std():.3f}')
print(f'Shifted: amount mean={shifted.amount_zscore.mean():.3f} std={shifted.amount_zscore.std():.3f}')
2. Run the drift check¶
We'll invoke the detect_drift tool directly, pointing at our pinned baseline + the shifted data as the observation window.
drift_resp = httpx.post(
    f'{SWARM_API}/api/v1/tools/invoke',
    headers={'Authorization': f'Bearer {SWARM_TOKEN}'},
    json={
        'tool': 'detect_drift',
        'arguments': {
            'baseline_dataset': 'fraud_synthetic.csv',
            'observation_dataset': 'fraud_april_shifted.csv',
            'features': ['amount_zscore', 'velocity_zscore', 'merchant_risk', 'hour_of_day'],
            'alert_threshold_ks': 0.15,
        },
    },
    timeout=60,
)
drift_report = drift_resp.json()
print(drift_report)
Expected output: amount_zscore and velocity_zscore both alert. The report tells you:
- Baseline vs observation mean, std, quantiles
- KS statistic and p-value per feature
- Interpretation string
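If you want to sanity-check the tool's numbers, the two-sample KS statistic is easy to reproduce locally with scipy. A minimal sketch using synthetic stand-ins for the two amount_zscore columns (the 1.4x + 0.3 shift matches step 1):

```python
import numpy as np
from scipy.stats import ks_2samp

rng = np.random.default_rng(99)
baseline = rng.normal(0.0, 1.0, 10_000)   # stand-in for train['amount_zscore']
observed = baseline * 1.4 + 0.3           # the same shift applied in step 1

# Two-sample Kolmogorov-Smirnov test: max distance between the empirical CDFs
stat, p_value = ks_2samp(baseline, observed)
print(f'KS={stat:.3f}  p={p_value:.3g}')
```

With 10,000 samples the p-value collapses to essentially zero even for moderate shifts, which is why the alert keys on the KS statistic itself rather than the p-value alone.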
3. Triage — is this concept drift or feature drift?¶
Feature drift = distribution moved but the relationship between features and outcome is intact; model may still perform fine.
Concept drift = the relationship itself changed; model performance is degrading.
Score the shifted data with the deployed model and compare the resulting metrics against the held-out baseline numbers from Tutorial 1:
# Score the shifted dataset with the deployed model
scored_resp = httpx.post(
    f'{SWARM_API}/api/v1/batch/fraud_shifted_triage',
    headers={'Authorization': f'Bearer {SWARM_TOKEN}'},
    json={
        'input_path': 'fraud_april_shifted.csv',
        'processor': 'inference',
        'config': {'model_path': 'models/fraud_classifier_v1.joblib'},
        'concurrency': 8,
    },
    timeout=30,
)
batch_id = scored_resp.json()['run_id']
# Wait for completion
while True:
    s = httpx.get(
        f'{SWARM_API}/api/v1/batch/{batch_id}/status',
        headers={'Authorization': f'Bearer {SWARM_TOKEN}'},
    ).json()
    if s['state'] in {'completed', 'failed'}:
        break
    time.sleep(3)
# Pull results + compute metrics
import json
from sklearn.metrics import roc_auc_score, average_precision_score
results = [json.loads(line) for line in Path(f'batch_runs/{batch_id}/results.jsonl').read_text().splitlines()]
y_true = [r['actual'] for r in results]
y_pred_prob = [r['prediction_prob'] for r in results]
print(f'On shifted data: ROC-AUC={roc_auc_score(y_true, y_pred_prob):.3f}')
print(f' PR-AUC= {average_precision_score(y_true, y_pred_prob):.3f}')
Compare against the baseline's held-out PR-AUC from Tutorial 1. If comparable (within 5%) → feature drift but model is robust. If worse → concept drift + retrain needed.
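That rule of thumb can be codified. A sketch, where `baseline_pr_auc` is a placeholder for whatever held-out PR-AUC you recorded in Tutorial 1:

```python
def classify_drift(baseline_pr_auc: float, shifted_pr_auc: float,
                   tolerance: float = 0.05) -> str:
    """Label the drift type from the relative PR-AUC drop."""
    drop = (baseline_pr_auc - shifted_pr_auc) / baseline_pr_auc
    if drop <= tolerance:
        return 'feature_drift'   # distribution moved, model still robust
    return 'concept_drift'       # performance degraded -> retrain

# e.g. baseline 0.82 (placeholder), shifted 0.80 -> ~2.4% drop
print(classify_drift(0.82, 0.80))
```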
4. Root-cause — what actually moved?¶
Slice by time, by merchant segment, by geo, by customer cohort. The error_analyzer agent is designed for this.
investigation = httpx.post(
    f'{SWARM_API}/api/v1/pipelines',
    headers={'Authorization': f'Bearer {SWARM_TOKEN}'},
    json={
        'problem_statement': (
            'Investigate drift on fraud_classifier v1. The amount_zscore feature '
            'has a KS statistic of 0.21 vs baseline. Determine whether this is (a) '
            'an upstream data-pipeline bug, (b) a genuine change in customer '
            'behaviour, or (c) a bug in the feature engineering. Produce a report '
            'with specific slice breakdowns and a recommendation.'
        ),
        'dataset_path': 'fraud_april_shifted.csv',
        'template': 'fast_prototype',
        'name': 'fraud-drift-investigation',
    },
)
inv_run_id = investigation.json()['run_id']
print(f'Investigation started: {inv_run_id}')
Let it run for ~5-10 min. Inspect pipeline_runs/<inv_run_id>/reports/ for the breakdown.
Typical findings in real investigations:
- Upstream data-pipeline bug — a field-encoding change broke the ETL; the feature hasn't actually changed, the ingest pipeline is lying. Fix: ETL, not model.
- Genuine behaviour change — real customers shifting; model must retrain on new data.
- Feature engineering bug — the z-score normalization is using a stale mean/std; recompute.
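The third failure mode is worth demonstrating, because it produces convincing-looking drift with no change in the raw data. A sketch with hypothetical amounts and deliberately stale normalization stats:

```python
import numpy as np
import pandas as pd

raw = pd.Series(np.random.default_rng(0).normal(120.0, 30.0, 1_000), name='amount')

stale_mean, stale_std = 80.0, 20.0             # hypothetical out-of-date snapshot
fresh_mean, fresh_std = raw.mean(), raw.std()  # recomputed on the current window

z_stale = (raw - stale_mean) / stale_std
z_fresh = (raw - fresh_mean) / fresh_std

# The stale z-scores look drifted even though `raw` never changed
print(f'stale z mean={z_stale.mean():.2f}  fresh z mean={z_fresh.mean():.2f}')
```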
5. Remediate¶
Based on the finding, take one of the following paths:
If upstream pipeline bug:¶
# File a ticket with the data platform team
# Meanwhile, add a sanity-check to the ingestion step
swarm cron create \
--name 'fraud_etl_sanity_check' \
--schedule '@hourly' \
--task drift_check \
--config '{"features": ["amount_zscore", ...], "alert_threshold_ks": 0.10}'
If genuine drift + model still performing:¶
No action needed — feature drift without concept drift means the model is robust. Update the drift baseline at next scheduled retrain.
If concept drift:¶
# Retrain on recent data
swarm pipelines run \
--problem '<same as v1>' \
--dataset 'fraud_latest_3months.csv' \
--template default_ml_pipeline \
--compliance rbi_free_ai
# Shadow v2 against v1 for a week
# Promote if wins
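What "wins" means should be pinned down before the shadow week starts. A sketch, assuming each shadow run writes the same results.jsonl schema (`actual`, `prediction_prob`) as the batch scoring above; the shadow/ paths are hypothetical:

```python
import json
from pathlib import Path
from sklearn.metrics import average_precision_score

def pr_auc_from_jsonl(path: str) -> float:
    """PR-AUC from a results file with `actual` and `prediction_prob` fields."""
    rows = [json.loads(line) for line in Path(path).read_text().splitlines()]
    y_true = [r['actual'] for r in rows]
    y_prob = [r['prediction_prob'] for r in rows]
    return average_precision_score(y_true, y_prob)

# After the shadow week, both models have scored the same traffic:
#   v1 = pr_auc_from_jsonl('shadow/fraud_v1_week.jsonl')
#   v2 = pr_auc_from_jsonl('shadow/fraud_v2_week.jsonl')
#   promote = v2 >= v1   # promote only if v2 matches or beats v1
```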
6. Update the audit trail¶
Drift incidents should land in the monthly audit PDF rollup. The audit_logger agent does this automatically when the drift alert fires — but for regulators you may also want a standalone drift incident report:
httpx.post(
    f'{SWARM_API}/api/v1/audit/incidents',
    headers={'Authorization': f'Bearer {SWARM_TOKEN}'},
    json={
        'kind': 'drift_alert',
        'model': 'fraud_classifier',
        'version': 'v1',
        'detected_at': '2026-04-15T03:00:00Z',
        'remediation': 'Feature ETL fix; no retrain needed — model performance intact.',
        'approver': 'alice@yourorg.com',
        'references': [f'/pipelines/{inv_run_id}'],
    },
)
Summary¶
Drift monitoring gives you:
- Early detection — catch distribution shifts before customers complain
- Root-cause triage — is it our ETL or their behaviour?
- Audit evidence — every drift event + remediation logged
- Informed retrain decisions — retrain when concept drift, not reflexively on feature drift
The full flow on cron:
(every night 03:00 IST)
drift_check
├── if drift <threshold → log to audit, no action
└── if drift ≥threshold →
alert email
+ run investigation pipeline
+ write incident record
+ dashboard banner
Next¶
- Plugin authoring tutorial
- Schedule a drift check — the full cron setup
- Concepts: Permissions & audit