# Hooks & ops
Three operational primitives: lifecycle hooks, cron jobs, and batch runners. All three are first-class citizens with REST + CLI + dashboard surfaces.
## Lifecycle hooks
swarm exposes five hook events matching the Claude Code format:
| Event | When it fires | Handler can |
|---|---|---|
| `SESSION_START` | Once per pipeline run, before first LLM call | Rewrite system prompt, inject context |
| `PRE_TOOL` | Before every tool dispatch | Mutate arguments, deny (`carry_on=False`), or pass through |
| `POST_TOOL` | After every tool result | Mutate content (PII redaction, enrichment) |
| `PRE_COMPACTION` | Before context compaction | Mark preservation rules |
| `POST_COMPACTION` | After compaction | Re-inject critical context |
Implemented at `ml_team/core/hooks.py`.
### Registering a hook

From a plugin (`hooks/hooks.json`):
```json
{
  "hooks": {
    "PostTool": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "\"${CLAUDE_PLUGIN_ROOT}/hooks/redact_pan.sh\"",
            "timeout": 5
          }
        ]
      }
    ]
  }
}
```
Or from Python (internal extension):
```python
import re

from ml_team.core.hooks import registry, HookEvent, HookResult

def mask_pan(ctx: dict) -> HookResult:
    content = ctx.get("result", {}).get("content", "")
    masked = re.sub(r"\b\d{14,16}\b", "XXXX-XXXX-XXXX-XXXX", content)
    return HookResult(carry_on=True, mutation={"result": {"content": masked}})

registry().register(HookEvent.POST_TOOL, mask_pan, plugin="acme-bfsi", priority=10)
```
### Composition

- Handlers fire in descending priority order, stable FIFO within a tier
- Each handler's `mutation` is shallow-merged into context before the next handler runs
- `carry_on=False` on `PRE_TOOL` short-circuits the chain and returns a `skipped_by_hook` tool result
- Feature-flag gated (`hooks_enabled`) — when off, `hooks.run()` is a hard no-op
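The chain semantics above can be sketched in a few lines. This is a simplified stand-in, not the real `ml_team.core.hooks` implementation — the `HookResult` here is a local toy class:

```python
# Sketch of the composition rules: descending priority (stable FIFO
# within a tier), shallow-merge of each mutation into the context,
# and carry_on=False short-circuiting the rest of the chain.
from dataclasses import dataclass, field

@dataclass
class HookResult:
    carry_on: bool = True
    mutation: dict = field(default_factory=dict)

def run_chain(handlers, ctx):
    # handlers: list of (priority, callable) pairs.
    # Python's sort is stable, so equal-priority handlers keep
    # their registration (FIFO) order.
    for _, handler in sorted(handlers, key=lambda pair: -pair[0]):
        result = handler(ctx)
        ctx = {**ctx, **result.mutation}  # shallow merge, later keys win
        if not result.carry_on:
            break  # short-circuit: remaining handlers never run
    return ctx
```

Note the merge is shallow by design: a handler that mutates `result.content` replaces the whole `result` key, which is why the PAN-masking example above returns the full nested dict.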
### Security (shell-command hooks)

Plugin shell-command hooks are disabled by default behind `plugin_shell_hooks_enabled`. When enabled, invocation:

- Validates against a command allowlist + the plugin's declared `allowed_shell_commands`
- Injects `CLAUDE_PLUGIN_ROOT`; scrubs the rest of the env
- Applies rlimits (Linux) + a hard timeout (default 10s, max 60s)
- Logs every execution to the `plugin_shell_executions` audit table
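A minimal sketch of that invocation discipline — allowlist gate, scrubbed environment, hard timeout cap. This is illustrative only (the function name and signature are assumptions, and the real implementation also applies rlimits and audit logging):

```python
# Illustrative shell-hook invocation: only allowlisted argv[0],
# an environment containing nothing but CLAUDE_PLUGIN_ROOT, and a
# timeout capped at 60 seconds.
import shlex
import subprocess

def run_shell_hook(command: str, plugin_root: str, allowed: set[str],
                   timeout: int = 10):
    argv = shlex.split(command)
    if argv[0] not in allowed:  # allowlist gate
        raise PermissionError(f"{argv[0]} not in allowed_shell_commands")
    env = {"CLAUDE_PLUGIN_ROOT": plugin_root}  # everything else scrubbed
    return subprocess.run(
        argv,
        env=env,
        timeout=min(timeout, 60),  # hard cap, matching the 60s maximum
        capture_output=True,
        text=True,
    )
```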
## Cron jobs

Scheduled operations — retraining, drift checks, audit-PDF rollups, retention sweeps — run via swarm's built-in cron scheduler, vendored from Hermes' design and tailored for regulated use cases.
### Schedule formats
All of these parse:
| Format | Meaning |
|---|---|
| `30m` | Every 30 minutes |
| `every 2h` | Every 2 hours |
| `@daily` | Once a day at midnight |
| `0 2 * * *` | Cron expression — 02:00 daily |
| `2026-05-15T09:00Z` | ISO timestamp — one-shot at that time |
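The shorthand forms in the table could normalize to a plain interval in seconds along these lines — a hedged sketch, not swarm's actual parser, which also has to handle the cron-expression and ISO one-shot rows:

```python
# Normalize the shorthand schedule formats ("30m", "every 2h",
# "@daily") to an interval in seconds. Cron expressions and ISO
# timestamps fall through as None for a different code path.
import re

UNITS = {"m": 60, "h": 3600, "d": 86400}

def parse_interval(spec: str):
    spec = spec.removeprefix("every ").strip()
    if spec == "@daily":
        return 86400
    m = re.fullmatch(r"(\d+)([mhd])", spec)
    if m:
        return int(m.group(1)) * UNITS[m.group(2)]
    return None  # cron expression or ISO one-shot: handled elsewhere
```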
### Task kinds

| Kind | What it does |
|---|---|
| `retrain` | Reruns a pipeline with the latest data snapshot |
| `drift_check` | Computes drift against a baseline, alerts on threshold |
| `audit_pdf` | Rolls up a date range into a combined regulator PDF |
| `custom` | Arbitrary bash (gated by the permission engine) |
### Creating a job

```shell
swarm cron create \
  --name "nightly_fraud_drift" \
  --schedule "0 3 * * *" \
  --task drift_check \
  --config '{"baseline_run_id": "abc123", "window_hours": 24}'
```
### Storage + daemon

- File-backed at `~/.swarm/cron/jobs.json` (atomic write + `fcntl` locking)
- 60-second tick loop running as part of the FastAPI lifespan
- Each run logs output to `~/.swarm/cron/output/{job_id}_{run_id}.log`
- Feature-flag gated (`cron_scheduler`, default on — table stakes)
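The "atomic write + `fcntl` locking" line above usually means write-to-temp-then-rename under an advisory lock. A sketch of that discipline (the function name and job shape are assumptions):

```python
# Atomic, lock-guarded persistence for a JSON job store:
# write to a temp file in the same directory, fsync, then
# os.replace() so readers only ever see a complete file.
import fcntl
import json
import os
import tempfile

def save_jobs(path: str, jobs: list) -> None:
    directory = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        fcntl.flock(f, fcntl.LOCK_EX)  # advisory lock while writing
        json.dump(jobs, f, indent=2)
        f.flush()
        os.fsync(f.fileno())           # durable before the rename
    os.replace(tmp, path)              # atomic on POSIX filesystems
```

`os.replace` is the key move: a crash mid-write leaves only the temp file behind, never a truncated `jobs.json`.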
### REST surface

- `GET /api/v1/cron/jobs` — list
- `POST /api/v1/cron/jobs` — create (admin)
- `GET /api/v1/cron/jobs/{id}/runs` — run history
- `POST /api/v1/cron/jobs/{id}/run` — run-now (bypass schedule)

See Reference: CLI + YAML: cron job schema.
## Batch runner

When you need to score 10,000+ records through a trained model and aggregate metrics, the batch runner is the primitive. Vendored + slimmed from Hermes' `batch_runner.py`.
### Shape

```
input.jsonl                     output
├── {record_id: "r1", ...}      ├── results.jsonl (streaming)
├── {record_id: "r2", ...}      ├── checkpoint.json (every 10 records)
└── ...                         └── stats.json (final aggregation)
```
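The streaming contract in that diagram — append each result as it completes, snapshot a checkpoint every 10 records — can be sketched as follows. Function names and the checkpoint payload are illustrative, not the vendored implementation:

```python
# Streaming batch loop: results.jsonl grows one line per record,
# checkpoint.json is rewritten every CHECKPOINT_EVERY records so a
# restart knows roughly how far the previous run got.
import json

CHECKPOINT_EVERY = 10

def run_batch(records, process, results_path, checkpoint_path):
    with open(results_path, "a") as out:
        for i, record in enumerate(records, start=1):
            out.write(json.dumps(process(record)) + "\n")
            out.flush()  # stream: readers see results immediately
            if i % CHECKPOINT_EVERY == 0:
                with open(checkpoint_path, "w") as ck:
                    json.dump({"processed": i}, ck)
```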
### Processors

| Processor | What it does |
|---|---|
| `inference` | Loads a trained joblib model, runs predict per record |
| `echo` | For testing — echoes input |
| `custom` | `pkg.mod:func` supplied by operator, runs in worker process |
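A `custom` processor under the `pkg.mod:func` contract is presumably just a function from one decoded JSONL record to a result dict. A hypothetical example — the field names and the toy rule are made up for illustration:

```python
# Hypothetical custom processor: takes one decoded input record,
# returns one result dict to be appended to results.jsonl.
# Would be referenced as e.g. mypkg.scoring:score_record.
def score_record(record: dict) -> dict:
    amount = float(record.get("amount", 0.0))
    return {
        "record_id": record["record_id"],
        "flagged": amount > 10_000,  # toy rule standing in for a model
    }
```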
### Creating a batch run

```shell
swarm batch submit \
  --input transactions_20260415.jsonl \
  --processor inference \
  --config '{"model_path": "models/fraud_v3.joblib"}' \
  --concurrency 8
```
Resume on restart: already-processed `record_id`s are skipped (deduplicated against `results.jsonl`).
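That dedup-on-resume behavior amounts to: collect the `record_id`s already present in `results.jsonl`, then yield only input records not in that set. A sketch under those assumptions:

```python
# Resume helper: skip input records whose record_id already appears
# in results.jsonl from a previous (interrupted) run.
import json

def pending_records(input_path: str, results_path: str):
    done = set()
    try:
        with open(results_path) as f:
            done = {json.loads(line)["record_id"]
                    for line in f if line.strip()}
    except FileNotFoundError:
        pass  # fresh run: nothing processed yet
    with open(input_path) as f:
        for line in f:
            record = json.loads(line)
            if record["record_id"] not in done:
                yield record
```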
### REST + WebSocket

- `POST /api/v1/batch/{run_id}` — submit JSONL body
- `GET /api/v1/batch/{run_id}/status` — poll
- `WS /api/v1/batch/{run_id}/stream` — checkpoint progress events
- `GET /api/v1/batch/{run_id}/results` — download `results.jsonl`
## Why not threading.Timer / a custom loop?

Because the cron + batch primitives are regulated-audit-compatible: every invocation gets a record in `run_events`, every failure is persisted, and every scheduled task has an approval gate if the YAML policy says so. Homegrown scheduling skips all of that.