Pipeline step anatomy
Every governance rule lives in a pipeline step. This page shows how one actually works, end to end — from an HTTP call hitting the gateway to a Detection object landing in the audit trail.
The protocol
```python
from typing import Literal, Protocol


class PipelineStep(Protocol):
    name: str
    phase: Literal["pre", "post"]
    default_config: StepConfig

    async def run(
        self,
        ctx: PipelineContext,
        config: StepConfig,
    ) -> list[Detection]:
        """Inspect ctx. Return findings. Optionally modify ctx in place."""
```

One file per step under src/tappass/pipeline/steps/.
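Because PipelineStep is a typing.Protocol, a step class does not have to inherit from it: having the right attributes and a matching run method is enough. The sketch below illustrates that with simplified placeholder types (StepConfig, PipelineContext and Detection here are not the real tappass classes). NoopStep is a hypothetical step, and @runtime_checkable is added only so the demo can call isinstance(); the real protocol may not declare it.

```python
import asyncio
from dataclasses import dataclass
from typing import Literal, Protocol, runtime_checkable


# Simplified placeholders for illustration; the real classes live in tappass.
@dataclass
class StepConfig:
    enabled: bool = True
    on_detection: str = "notify"


@dataclass
class PipelineContext:
    user_message: str = ""


@dataclass
class Detection:
    category: str


@runtime_checkable  # assumption: added here so isinstance() works in this demo
class PipelineStep(Protocol):
    name: str
    phase: Literal["pre", "post"]
    default_config: StepConfig

    async def run(self, ctx: PipelineContext, config: StepConfig) -> list[Detection]: ...


class NoopStep:  # hypothetical step; note that it has no base class
    name = "noop"
    phase = "pre"
    default_config = StepConfig()

    async def run(self, ctx: PipelineContext, config: StepConfig) -> list[Detection]:
        return []  # inspects nothing, reports nothing


step = NoopStep()
# Structural check: only the presence of the attributes and run() is verified.
conforms = isinstance(step, PipelineStep)
findings = asyncio.run(step.run(PipelineContext("hello"), step.default_config))
```

The runtime check only confirms that the members exist; signature and type mismatches are caught by a static type checker, not by isinstance().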
Minimal example
```python
from tappass.pipeline.protocol import PipelineStep, StepConfig
from tappass.pipeline.context import PipelineContext
from tappass.pipeline.types import Detection, Severity
from tappass.pipeline.backends import LLMGuardBackend


class DetectPIIStep:
    name = "detect_pii"
    phase = "pre"
    default_config = StepConfig(enabled=True, on_detection="redact")

    def __init__(self, backend: LLMGuardBackend):
        self._backend = backend

    async def run(
        self,
        ctx: PipelineContext,
        config: StepConfig,
    ) -> list[Detection]:
        findings = await self._backend.scan(
            ctx.user_message,
            direction="input",
            config=config,
        )

        if config.on_detection == "redact":
            ctx.user_message = _apply_redactions(ctx.user_message, findings)

        return [
            Detection(
                category=f.category,
                severity=Severity(f.severity),
                offset=f.offset,
                length=f.length,
                replacement=f.replacement if config.on_detection == "redact" else None,
            )
            for f in findings
        ]
```

What PipelineContext gives you
```python
@dataclass
class PipelineContext:
    tenant: Tenant
    agent: Agent
    session_id: str
    audit_id: str  # the event ID, fixed at ctx construction

    # The request as the agent sent it
    original_payload: dict
    user_message: str  # the "current" text (may be modified)
    messages: list[Message]
    model: str

    # Tool calls, if any
    tool_calls: list[ToolCall]

    # Accumulators
    detections: list[Detection]
    policy_decisions: list[PolicyDecision]
    tokens_used: int
    cost_usd: Decimal

    # Shortcut accessors
    def redact(self, offset: int, length: int, replacement: str) -> None: ...
```

Rule: a step must leave ctx in a valid state, even on failure. If you partially redact and then raise, the next step gets garbage.
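One way to honour this rule is to build the redacted text in a local variable and assign it to ctx only after every redaction has succeeded. The sketch below is a guess at what a helper in the spirit of _apply_redactions could look like. The Finding shape (offset, length, replacement) is assumed from the fields used in the minimal example, and Ctx is a stand-in for PipelineContext.

```python
from dataclasses import dataclass


@dataclass
class Finding:  # assumed shape, mirroring the fields used in the minimal example
    offset: int
    length: int
    replacement: str


@dataclass
class Ctx:  # stand-in for PipelineContext
    user_message: str


def apply_redactions(text: str, findings: list[Finding]) -> str:
    """Return a redacted copy of text; the input string is never mutated."""
    # Validate every span against the original text before changing anything.
    for f in findings:
        if f.offset < 0 or f.length < 0 or f.offset + f.length > len(text):
            raise ValueError(f"finding out of range: {f}")
    # Apply from the rightmost span to the leftmost so earlier offsets stay valid.
    for f in sorted(findings, key=lambda f: f.offset, reverse=True):
        text = text[: f.offset] + f.replacement + text[f.offset + f.length :]
    return text


def redact_in_place(ctx: Ctx, findings: list[Finding]) -> None:
    # Compute first, commit last: if apply_redactions raises, ctx is untouched.
    redacted = apply_redactions(ctx.user_message, findings)
    ctx.user_message = redacted
```

The sketch assumes findings do not overlap; overlapping spans would need merging before they are applied.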
Registering the step
```python
REGISTRY: dict[str, type[PipelineStep]] = {
    "detect_pii": DetectPIIStep,
    "detect_injection": DetectInjectionStep,
    # ...
}
```

Adding a new step takes three changes: add the file, add one line here, and add the step to the default policy config.
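Presumably the engine resolves the step names listed in a policy against this mapping. A minimal sketch of such a lookup follows; build_steps is a hypothetical helper and the two step classes are empty placeholders (real steps may need constructor arguments such as a backend). It fails loudly on an unknown name instead of skipping it silently.

```python
class DetectPIIStep:  # placeholder standing in for the real step
    name = "detect_pii"


class DetectInjectionStep:  # placeholder standing in for the real step
    name = "detect_injection"


REGISTRY: dict[str, type] = {
    "detect_pii": DetectPIIStep,
    "detect_injection": DetectInjectionStep,
}


def build_steps(names: list[str]) -> list[object]:
    """Instantiate steps in the order the policy lists them (hypothetical helper)."""
    unknown = [n for n in names if n not in REGISTRY]
    if unknown:
        # A typo in policy config should fail at startup rather than be ignored.
        raise KeyError(f"unregistered pipeline steps: {unknown}")
    return [REGISTRY[n]() for n in names]
```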
Configuration shape
Every step can be tuned per-tenant via step_configs. The Pydantic model:

```python
from typing import Literal

from pydantic import BaseModel, ConfigDict


class StepConfig(BaseModel):
    enabled: bool = True
    on_detection: Literal["block", "redact", "notify", "allow"] = "notify"
    threshold: float | None = None

    # Step-specific fields via extra="allow"
    model_config = ConfigDict(extra="allow")
```

Read step-specific extras through model_extra, supplying a default:

```python
validators = config.model_extra.get("validators", ["pii", "regex"])
```

How the engine runs your step
```python
# src/tappass/pipeline/engine.py (simplified)
async def run_pipeline(phase, ctx, steps, configs, policy):
    for step in steps:
        if not configs[step.name].enabled:
            continue

        findings = await step.run(ctx, configs[step.name])
        ctx.detections.extend(findings)

        for f in findings:
            verdict = await policy.decide(step.name, f, ctx)
            ctx.policy_decisions.append(verdict)

            if verdict.action == "block":
                raise PolicyBlockError(step=step.name, reason=f.category)
            elif verdict.action == "redact":
                # Already applied by the step; policy just confirms
                pass
            # notify / allow: nothing extra
```

So a step returns Detection objects, and the policy engine decides the action. The on_detection field on the step config is only a hint that the step uses to decide what to put in replacement. Policy has the final say.
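To make that division of labour concrete, here is a toy policy whose decide method can override the step's hint. ToyPolicy, Verdict, the "critical" severity value and the pii.ssn category are all invented for illustration; the real policy engine is not shown on this page.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Detection:  # trimmed stand-in for the real Detection
    category: str
    severity: str


@dataclass
class Verdict:  # hypothetical shape; the engine loop only reads .action
    action: str


class ToyPolicy:
    """Blocks anything critical, otherwise follows the step's on_detection hint."""

    def __init__(self, hints: dict[str, str]):
        self._hints = hints  # step name -> on_detection value

    async def decide(self, step_name: str, finding: Detection, ctx: object) -> Verdict:
        if finding.severity == "critical":  # assumed severity value
            return Verdict("block")  # policy overrides the hint
        return Verdict(self._hints.get(step_name, "notify"))


async def demo() -> list[str]:
    policy = ToyPolicy({"detect_pii": "redact"})
    findings = [
        Detection("pii.email", "warn"),
        Detection("pii.ssn", "critical"),  # invented category
    ]
    return [(await policy.decide("detect_pii", f, ctx=None)).action for f in findings]


actions = asyncio.run(demo())
```

With these inputs the first finding follows the step's redact hint and the second is escalated to block, even though the step itself asked only for redaction.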
Testing a step
Three test layers. All live in tests/:

```python
async def test_detect_pii_catches_email(fake_backend):
    ctx = PipelineContextFactory(user_message="Email me at alice@acme.com")
    step = DetectPIIStep(fake_backend)
    findings = await step.run(ctx, StepConfig(on_detection="redact"))

    assert findings == [
        Detection(category="pii.email", severity=Severity.WARN, ...)
    ]
    assert "alice@acme.com" not in ctx.user_message
```

```python
async def test_pii_end_to_end(client, real_backend, seed_agent):
    response = await client.post(
        "/v1/chat/completions",
        headers={"Authorization": f"Bearer {seed_agent.api_key}"},
        json={
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "ping alice@acme.com"}],
        },
    )
    # Fake provider returns 200 with the forwarded user content,
    # so we can check that the redacted version was what got sent.
    assert "alice@acme.com" not in response.json()["echoed_user_message"]
```

See Testing for the fixture plumbing.
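The fake_backend fixture is not shown here. One plausible shape, assuming the backend only needs an async scan method that returns objects with the fields DetectPIIStep reads, is sketched below. FakeBackend, FakeFinding, the regex and the "warn" severity string are all hypothetical.

```python
import asyncio
import re
from dataclasses import dataclass


@dataclass
class FakeFinding:  # carries the fields DetectPIIStep reads from a finding
    category: str
    severity: str
    offset: int
    length: int
    replacement: str


class FakeBackend:
    """Deterministic stand-in for LLMGuardBackend: flags anything email-shaped."""

    _EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")

    async def scan(self, text: str, direction: str, config: object) -> list[FakeFinding]:
        # direction and config are accepted for signature parity and ignored.
        return [
            FakeFinding("pii.email", "warn", m.start(), m.end() - m.start(), "[EMAIL]")
            for m in self._EMAIL.finditer(text)
        ]


found = asyncio.run(FakeBackend().scan("Email me at alice@acme.com", "input", None))
```

In a pytest suite this class would typically be returned from a fixture named fake_backend, which keeps unit tests independent of the real scanning backend.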
Performance budget
Every step runs on every call. Budget: p95 under 5ms for anything in the pre-LLM phase unless it’s I/O-bound (e.g., calls an external backend like Azure Content Safety).
For I/O steps:
- Strict timeout: 2-second hard cap
- On timeout: fail-open-to-audit (the event still lands, with a `timeout` category on the step)
- Never block the response on retry loops
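These three rules for I/O steps can be sketched with asyncio.wait_for. How the timeout is actually recorded is not specified on this page, so representing it as a Detection with category "timeout" is an assumption, as are the names scan_with_timeout and slow_scan.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass

IO_TIMEOUT_SECONDS = 2.0  # the hard cap from the budget


@dataclass
class Detection:  # trimmed stand-in for the real Detection
    category: str


async def scan_with_timeout(
    scan: Callable[[str], Awaitable[list[Detection]]],
    text: str,
    timeout: float = IO_TIMEOUT_SECONDS,
) -> list[Detection]:
    """Run an external scan once; on timeout, record it and let the request continue."""
    try:
        return await asyncio.wait_for(scan(text), timeout=timeout)
    except asyncio.TimeoutError:
        # Fail open to audit: no retry loop, just a finding that records the timeout.
        return [Detection(category="timeout")]


async def slow_scan(text: str) -> list[Detection]:  # simulates a hung backend
    await asyncio.sleep(10)
    return []


# A short timeout keeps the demo fast; production code would use the default.
result = asyncio.run(scan_with_timeout(slow_scan, "hello", timeout=0.05))
```

asyncio.wait_for cancels the pending scan when the deadline passes, so a hung backend does not keep running in the background.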
Common gotchas
- Modifying the user message without returning a Detection. The redaction disappears from the audit trail. Always return a `Detection` with a `replacement` field so evidence is preserved.
- Reading secrets from the context. `ctx` doesn’t have the provider key; that’s resolved by the gateway AFTER the pipeline. If your step needs a secret, it’s a vault step or a backend with its own config.
- Assuming a single message. `ctx.messages` is a list. Multi-turn conversations flow through. Scan all of them unless you have a reason not to.
- Blocking on first detection. Steps should return all findings. Policy decides to short-circuit.
- Forgetting to add the step to the default policy. Step registered but disabled everywhere. Add to `config/policies/default.yaml` too.
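Two of the gotchas above, scanning every message and returning every finding, can be illustrated together. In this sketch Message and Detection are stand-ins, message_index is a hypothetical extra field used to say which turn a finding came from, and the regex is only a rough email matcher.

```python
import re
from dataclasses import dataclass


@dataclass
class Message:  # stand-in; the real Message type is not shown on this page
    role: str
    content: str


@dataclass
class Detection:  # trimmed; message_index is a hypothetical extra field
    category: str
    message_index: int
    offset: int
    length: int


EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")  # rough matcher, for illustration


def scan_all(messages: list[Message]) -> list[Detection]:
    """Collect every match in every message; never stop at the first hit."""
    findings: list[Detection] = []
    for i, msg in enumerate(messages):
        for m in EMAIL.finditer(msg.content):
            findings.append(Detection("pii.email", i, m.start(), m.end() - m.start()))
    return findings
```

Returning the full list leaves the short-circuit decision to policy, which matches how the engine treats findings.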
When NOT to add a new step
If the rule you want is per-request (a flag), it belongs in Governance Flags, not a step. Flags are per-call overrides on existing steps, not new logic.
If the rule is customer-specific, write it as a Rego override in config/policies/rego/<customer>.rego. Rego is where per-customer deviations live — steps are the platform’s menu.