Pipeline step anatomy

Every governance rule lives in a pipeline step. This page shows how one actually works, end to end — from an HTTP call hitting the gateway to a Detection object landing in the audit trail.

src/tappass/pipeline/protocol.py
from typing import Literal, Protocol

from tappass.pipeline.context import PipelineContext
from tappass.pipeline.types import Detection

class PipelineStep(Protocol):
    name: str
    phase: Literal["pre", "post"]
    default_config: StepConfig

    async def run(
        self,
        ctx: PipelineContext,
        config: StepConfig,
    ) -> list[Detection]:
        """Inspect ctx. Return findings. Optionally modify ctx in place."""

One file per step under src/tappass/pipeline/steps/.

src/tappass/pipeline/steps/detect_pii.py
from tappass.pipeline.protocol import PipelineStep, StepConfig
from tappass.pipeline.context import PipelineContext
from tappass.pipeline.types import Detection, Severity
from tappass.pipeline.backends import LLMGuardBackend

class DetectPIIStep:
    name = "detect_pii"
    phase = "pre"
    default_config = StepConfig(enabled=True, on_detection="redact")

    def __init__(self, backend: LLMGuardBackend):
        self._backend = backend

    async def run(
        self,
        ctx: PipelineContext,
        config: StepConfig,
    ) -> list[Detection]:
        findings = await self._backend.scan(
            ctx.user_message,
            direction="input",
            config=config,
        )
        if config.on_detection == "redact":
            ctx.user_message = _apply_redactions(ctx.user_message, findings)
        return [
            Detection(
                category=f.category,
                severity=Severity(f.severity),
                offset=f.offset,
                length=f.length,
                replacement=f.replacement if config.on_detection == "redact" else None,
            )
            for f in findings
        ]
src/tappass/pipeline/context.py
from dataclasses import dataclass
from decimal import Decimal

@dataclass
class PipelineContext:
    tenant: Tenant
    agent: Agent
    session_id: str
    audit_id: str  # the event ID — fixed at ctx construction

    # The request as the agent sent it
    original_payload: dict
    user_message: str  # the "current" text (may be modified)
    messages: list[Message]
    model: str

    # Tool calls, if any
    tool_calls: list[ToolCall]

    # Accumulators
    detections: list[Detection]
    policy_decisions: list[PolicyDecision]
    tokens_used: int
    cost_usd: Decimal

    # Shortcut accessors
    def redact(self, offset: int, length: int, replacement: str) -> None: ...

Rule: a step must leave ctx in a valid state, even on failure. If you partially redact and then raise, the next step gets garbage.
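One way to honor that rule (a hypothetical pattern, not the actual engine code) is snapshot-and-restore: copy the mutable fields before scanning, and restore them if the step raises partway through. The `Ctx` class below is a minimal stand-in for `PipelineContext`:

```python
import asyncio

class Ctx:
    # Minimal stand-in for PipelineContext, just for the demo.
    def __init__(self, user_message: str):
        self.user_message = user_message

async def run(ctx: Ctx) -> None:
    original = ctx.user_message
    try:
        # Simulate a partial mutation followed by a backend failure.
        ctx.user_message = ctx.user_message[:5] + "[PARTIAL]"
        raise RuntimeError("backend failed mid-redaction")
    except Exception:
        ctx.user_message = original  # restore so the next step sees valid input
        raise

ctx = Ctx("hello world")
try:
    asyncio.run(run(ctx))
except RuntimeError:
    pass
print(ctx.user_message)  # the original text survives the failure
```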

src/tappass/pipeline/registry.py
REGISTRY: dict[str, type[PipelineStep]] = {
    "detect_pii": DetectPIIStep,
    "detect_injection": DetectInjectionStep,
    # ...
}

Adding a new step = add the file, add one line here, add it to the default policy config.

Every step can be tuned per-tenant via step_configs. The Pydantic model:

from typing import Literal

from pydantic import BaseModel, ConfigDict

class StepConfig(BaseModel):
    enabled: bool = True
    on_detection: Literal["block", "redact", "notify", "allow"] = "notify"
    threshold: float | None = None

    # Step-specific fields via extra="allow"
    model_config = ConfigDict(extra="allow")

Step-specific extras come back untyped, so read them with a fallback:

validators = config.model_extra.get("validators", ["pii", "regex"])
# src/tappass/pipeline/engine.py (simplified)
async def run_pipeline(phase, ctx, steps, configs, policy):
    for step in steps:
        if not configs[step.name].enabled:
            continue
        findings = await step.run(ctx, configs[step.name])
        ctx.detections.extend(findings)
        for f in findings:
            verdict = await policy.decide(step.name, f, ctx)
            ctx.policy_decisions.append(verdict)
            if verdict.action == "block":
                raise PolicyBlockError(step=step.name, reason=f.category)
            elif verdict.action == "redact":
                # Already applied by the step; policy just confirms
                pass
            # notify / allow — nothing extra

So a step returns Detection objects; the policy engine decides the action. The on_detection field on the step config is a hint the step uses to decide what to put in replacement — policy has the final say.

Three test layers. All live in tests/:

tests/unit/pipeline/steps/test_detect_pii.py
async def test_detect_pii_catches_email(fake_backend):
    ctx = PipelineContextFactory(user_message="Email me at alice@acme.com")
    step = DetectPIIStep(fake_backend)
    findings = await step.run(ctx, StepConfig(on_detection="redact"))
    assert findings == [
        Detection(category="pii.email", severity=Severity.WARN, ...)
    ]
    assert "alice@acme.com" not in ctx.user_message
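The `fake_backend` fixture isn't shown on this page. One hypothetical shape for it: a regex-backed stand-in for `LLMGuardBackend` that returns canned email findings with the fields `DetectPIIStep` reads (no network, no model):

```python
import asyncio
import re
from dataclasses import dataclass

@dataclass
class Finding:
    category: str
    severity: str
    offset: int
    length: int
    replacement: str

class FakeBackend:
    # Stands in for LLMGuardBackend in unit tests: just a regex that
    # flags anything shaped like an email address.
    async def scan(self, text: str, direction: str, config) -> list[Finding]:
        return [
            Finding("pii.email", "warn", m.start(), m.end() - m.start(), "[EMAIL]")
            for m in re.finditer(r"[\w.+-]+@[\w-]+\.[\w.]+", text)
        ]

findings = asyncio.run(FakeBackend().scan("Email me at alice@acme.com", "input", None))
print(findings[0].category, findings[0].offset)  # pii.email 12
```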
tests/integration/test_pipeline_pii.py
async def test_pii_end_to_end(client, real_backend, seed_agent):
    response = await client.post(
        "/v1/chat/completions",
        headers={"Authorization": f"Bearer {seed_agent.api_key}"},
        json={
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "ping alice@acme.com"}],
        },
    )
    # Fake provider returns 200 with the forwarded user content,
    # so we can check that the redacted version was what got sent.
    assert "alice@acme.com" not in response.json()["echoed_user_message"]

See Testing for the fixture plumbing.

Every step runs on every call. Budget: p95 under 5ms for anything in the pre-LLM phase unless it’s I/O-bound (e.g., calls an external backend like Azure Content Safety).

For I/O steps:

  • Strict timeout: 2-second hard cap
  • On timeout: fail open to audit (the event still lands, with a timeout category on the step)
  • Never block the response on retry loops
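The timeout behavior can be sketched with `asyncio.wait_for` (a hypothetical wrapper, not the real engine's): on timeout, the step contributes a single timeout-category detection instead of failing the request.

```python
import asyncio
from dataclasses import dataclass

@dataclass
class Detection:
    category: str

async def run_io_step(step, ctx, config, timeout: float = 2.0) -> list[Detection]:
    try:
        return await asyncio.wait_for(step.run(ctx, config), timeout)
    except asyncio.TimeoutError:
        # Fail open to audit: the request proceeds, but the event
        # still lands with a timeout category on this step.
        return [Detection(category=f"timeout.{step.name}")]

class SlowStep:
    name = "detect_injection"
    async def run(self, ctx, config):
        await asyncio.sleep(10)  # simulates a hung external backend
        return []

findings = asyncio.run(run_io_step(SlowStep(), None, None, timeout=0.01))
print(findings[0].category)  # timeout.detect_injection
```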
Common pitfalls:

  1. Modifying the user message without returning a Detection. The redaction disappears from the audit trail. Always return a Detection with a replacement field so evidence is preserved.
  2. Reading secrets from the context. ctx doesn’t have the provider key — that’s resolved by the gateway AFTER the pipeline. If your step needs a secret, it’s a vault step or a backend with its own config.
  3. Assuming a single message. ctx.messages is a list. Multi-turn conversations flow through. Scan all of them unless you have a reason not to.
  4. Blocking on first detection. Steps should return all findings. Policy decides to short-circuit.
  5. Forgetting to add the step to the default policy. Step registered but disabled everywhere. Add to config/policies/default.yaml too.

If the rule you want is per-request (a flag), it belongs in Governance Flags, not a step. Flags are per-call overrides on existing steps, not new logic.

If the rule is customer-specific, write it as a Rego override in config/policies/rego/<customer>.rego. Rego is where per-customer deviations live — steps are the platform’s menu.