
Hooks & ops#

Three operational primitives: lifecycle hooks, cron jobs, and batch runners. All three are first-class citizens with REST + CLI + dashboard surfaces.

Lifecycle hooks#

swarm exposes five hook events matching the Claude Code format:

| Event | When it fires | Handler can |
| --- | --- | --- |
| SESSION_START | Once per pipeline run, before the first LLM call | Rewrite the system prompt, inject context |
| PRE_TOOL | Before every tool dispatch | Mutate arguments, deny (carry_on=False), or pass through |
| POST_TOOL | After every tool result | Mutate content (PII redaction, enrichment) |
| PRE_COMPACTION | Before context compaction | Mark preservation rules |
| POST_COMPACTION | After compaction | Re-inject critical context |

Implemented in ml_team/core/hooks.py.

Registering a hook#

From a plugin (hooks/hooks.json):

{
  "hooks": {
    "PostTool": [
      {
        "hooks": [
          {
            "type": "command",
            "command": "\"${CLAUDE_PLUGIN_ROOT}/hooks/redact_pan.sh\"",
            "timeout": 5
          }
        ]
      }
    ]
  }
}

Or from Python (internal extension):

import re

from ml_team.core.hooks import registry, HookEvent, HookResult

def mask_pan(ctx: dict) -> HookResult:
    content = ctx.get("result", {}).get("content", "")
    masked = re.sub(r"\b\d{14,16}\b", "XXXX-XXXX-XXXX-XXXX", content)
    return HookResult(carry_on=True, mutation={"result": {"content": masked}})

registry().register(HookEvent.POST_TOOL, mask_pan, plugin="acme-bfsi", priority=10)

Composition#

  • Handlers fire in descending priority order, stable FIFO within a tier
  • Each handler's mutation is shallow-merged into context before the next handler runs
  • carry_on=False on PRE_TOOL short-circuits the chain and returns a skipped_by_hook tool result
  • Feature-flag gated (hooks_enabled) — when off, hooks.run() is a hard no-op
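The composition rules above can be sketched as a small standalone loop. This is an illustrative re-implementation, not the actual ml_team.core.hooks registry; run_chain and its tuple-based handler list are assumptions for the example:

```python
# Illustrative re-implementation of the composition rules above --
# NOT the real ml_team.core.hooks registry.
from dataclasses import dataclass, field

@dataclass
class HookResult:
    carry_on: bool = True
    mutation: dict = field(default_factory=dict)

def run_chain(handlers, ctx):
    """handlers: list of (priority, fn) pairs. Fires in descending
    priority; Python's sort is stable, so equal priorities keep
    registration (FIFO) order."""
    for _, fn in sorted(handlers, key=lambda h: -h[0]):
        result = fn(ctx)
        if not result.carry_on:           # short-circuit (PRE_TOOL deny)
            return ctx, "skipped_by_hook"
        ctx = {**ctx, **result.mutation}  # shallow merge before next handler
    return ctx, None
```

Note that because the merge is shallow, a later (lower-priority) handler's mutation wins on key collisions.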

Security (shell-command hooks)#

Plugin shell-command hooks are disabled by default behind plugin_shell_hooks_enabled. When enabled, each invocation:

  • Validates against a command allowlist + the plugin's declared allowed_shell_commands
  • Injects CLAUDE_PLUGIN_ROOT; scrubs the rest of the env
  • Applies rlimits (Linux) + a hard timeout (default 10s, max 60s)
  • Logs every execution to plugin_shell_executions audit table
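A minimal sketch of what an invocation under those constraints might look like, assuming Linux. The function name, the allowlist contents, and the specific rlimit values are illustrative, not swarm's actual implementation:

```python
# Sketch of a sandboxed shell-hook invocation per the constraints above.
# Illustrative only: allowlist contents and rlimit values are assumptions.
import os, resource, shlex, subprocess

ALLOWLIST = {"sed", "awk", "jq"}  # hypothetical global allowlist

def run_shell_hook(command, plugin_root, plugin_allowed=(), timeout=10):
    timeout = min(timeout, 60)                       # hard cap at 60s
    argv = shlex.split(command)
    if os.path.basename(argv[0]) not in ALLOWLIST | set(plugin_allowed):
        raise PermissionError(f"{argv[0]} not allowlisted")

    def limits():                                    # runs in the child
        resource.setrlimit(resource.RLIMIT_CPU, (5, 5))
        resource.setrlimit(resource.RLIMIT_NOFILE, (64, 64))

    env = {"CLAUDE_PLUGIN_ROOT": plugin_root,        # inject plugin root,
           "PATH": "/usr/bin:/bin"}                  # scrub everything else
    return subprocess.run(argv, env=env, timeout=timeout,
                          preexec_fn=limits, capture_output=True)
```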

Cron jobs#

Scheduled operations — retraining, drift checks, audit-PDF rollups, retention sweeps — run via swarm's built-in cron scheduler. Vendored from Hermes' design, tailored for regulated use cases.

Schedule formats#

All of these parse:

| Format | Meaning |
| --- | --- |
| 30m | Every 30 minutes |
| every 2h | Every 2 hours |
| @daily | Once a day at midnight |
| 0 2 * * * | Cron expression — 02:00 daily |
| 2026-05-15T09:00Z | ISO — one-shot at that time |
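A sketch of how the friendly formats above might normalize. This is not swarm's parser; a real implementation would evaluate the cron expressions too (e.g. with croniter), which this sketch leaves as opaque strings:

```python
# Illustrative normalizer for the schedule formats listed above.
import re
from datetime import datetime

def parse_schedule(spec):
    """Return ('interval', seconds), ('cron', expr), or ('once', datetime)."""
    spec = spec.strip()
    m = re.fullmatch(r"(?:every\s+)?(\d+)([mh])", spec)  # "30m", "every 2h"
    if m:
        n, unit = int(m.group(1)), m.group(2)
        return ("interval", n * (60 if unit == "m" else 3600))
    if spec == "@daily":                                 # midnight shorthand
        return ("cron", "0 0 * * *")
    if re.fullmatch(r"\S+ \S+ \S+ \S+ \S+", spec):       # 5-field cron expr
        return ("cron", spec)
    # otherwise: ISO one-shot, e.g. 2026-05-15T09:00Z
    return ("once", datetime.fromisoformat(spec.replace("Z", "+00:00")))
```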

Task kinds#

| Kind | What it does |
| --- | --- |
| retrain | Reruns a pipeline with the latest data snapshot |
| drift_check | Computes drift against a baseline, alerts on threshold |
| audit_pdf | Rolls up a date range into a combined regulator PDF |
| custom | Arbitrary bash (gated by the permission engine) |

Creating a job#

swarm cron create \
  --name "nightly_fraud_drift" \
  --schedule "0 3 * * *" \
  --task drift_check \
  --config '{"baseline_run_id": "abc123", "window_hours": 24}'

Storage + daemon#

  • File-backed at ~/.swarm/cron/jobs.json (atomic write + fcntl locking)
  • 60-second tick loop running as part of the FastAPI lifespan
  • Each run logs output to ~/.swarm/cron/output/{job_id}_{run_id}.log
  • Feature-flag gated (cron_scheduler, default on — table stakes)
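The atomic-write + locking pattern mentioned above can be sketched as follows. Function and lockfile names are illustrative, not swarm's actual code:

```python
# Sketch of the atomic-write + fcntl-locking pattern used for jobs.json.
import fcntl, json, os, tempfile

def save_jobs(path, jobs):
    # Exclusive lock on a sidecar lockfile serializes concurrent writers
    with open(path + ".lock", "w") as lock:
        fcntl.flock(lock, fcntl.LOCK_EX)
        # Write to a temp file in the same directory, then atomically rename
        fd, tmp = tempfile.mkstemp(dir=os.path.dirname(path) or ".")
        with os.fdopen(fd, "w") as f:
            json.dump(jobs, f, indent=2)
            f.flush()
            os.fsync(f.fileno())       # durable before the rename
        os.replace(tmp, path)          # atomic on POSIX
```

Readers never observe a half-written jobs.json: they see either the old file or the new one.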

REST surface#

  • GET /api/v1/cron/jobs — list
  • POST /api/v1/cron/jobs — create (admin)
  • GET /api/v1/cron/jobs/{id}/runs — run history
  • POST /api/v1/cron/jobs/{id}/run — run-now (bypass schedule)

See Reference: CLI + YAML: cron job schema.


Batch runner#

When you need to score 10,000+ records through a trained model and aggregate metrics, the batch runner is the primitive. Vendored + slimmed from Hermes' batch_runner.py.

Shape#

input.jsonl                          output
  ├── {record_id: "r1", ...}         ├── results.jsonl (streaming)
  ├── {record_id: "r2", ...}         ├── checkpoint.json (every 10 records)
  └── ...                            └── stats.json (final aggregation)
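The shape above — results streamed line-by-line, a checkpoint rewritten every 10 records, stats at the end — can be sketched as a simple loop. Names and the checkpoint/stats payloads are illustrative:

```python
# Sketch of the streaming-output shape above. Illustrative only.
import json, os

def run_batch(records, process, outdir, checkpoint_every=10):
    os.makedirs(outdir, exist_ok=True)
    done = 0
    with open(os.path.join(outdir, "results.jsonl"), "a") as out:
        for rec in records:
            out.write(json.dumps({"record_id": rec["record_id"],
                                  "result": process(rec)}) + "\n")
            done += 1
            if done % checkpoint_every == 0:   # periodic checkpoint
                with open(os.path.join(outdir, "checkpoint.json"), "w") as cp:
                    json.dump({"processed": done}, cp)
    with open(os.path.join(outdir, "stats.json"), "w") as st:
        json.dump({"total": done}, st)         # final aggregation
```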

Processors#

| Processor | What it does |
| --- | --- |
| inference | Loads a trained joblib model, runs predict per record |
| echo | For testing — echoes input |
| custom | pkg.mod:func supplied by operator, runs in worker process |
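Resolving a pkg.mod:func spec to a callable might look like the sketch below; the function name is an assumption, and the real worker-process plumbing is omitted:

```python
# Sketch of resolving a "pkg.mod:func" custom-processor spec.
import importlib

def load_processor(spec):
    """Resolve 'pkg.mod:func' to the callable it names."""
    mod_name, _, func_name = spec.partition(":")
    return getattr(importlib.import_module(mod_name), func_name)
```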

Creating a batch run#

swarm batch submit \
  --input transactions_20260415.jsonl \
  --processor inference \
  --config '{"model_path": "models/fraud_v3.joblib"}' \
  --concurrency 8

Resume on restart:

swarm batch resume <run_id>

Already-processed record_ids are skipped (deduplicated against results.jsonl).
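The resume-time dedup described above amounts to a set lookup against the existing results file. A minimal sketch, with an assumed function name:

```python
# Sketch of resume-time dedup against results.jsonl.
import json

def pending_records(records, results_path):
    """Skip records whose record_id already appears in results.jsonl."""
    try:
        with open(results_path) as f:
            seen = {json.loads(line)["record_id"]
                    for line in f if line.strip()}
    except FileNotFoundError:
        seen = set()   # fresh run: nothing processed yet
    return [r for r in records if r["record_id"] not in seen]
```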

REST + WebSocket#

  • POST /api/v1/batch/{run_id} — submit JSONL body
  • GET /api/v1/batch/{run_id}/status — poll
  • WS /api/v1/batch/{run_id}/stream — checkpoint progress events
  • GET /api/v1/batch/{run_id}/results — download results.jsonl

Why not threading.Timer / custom loop?#

Because the cron + batch primitives are regulated-audit-compatible: every invocation gets a record in run_events, every failure is persisted, every scheduled task has an approval gate if the YAML policy says so. Homegrown scheduling skips all of that.
