Middleware
The SDK includes a middleware system for cross-cutting concerns like multi-tenant isolation, authentication, and request processing. Middleware wraps pipeline execution and can inspect, modify, or enrich the execution context at each stage.
TenantMiddleware
The TenantMiddleware provides multi-tenant data isolation for B2B deployments. It ensures that all pipeline state, checkpoints, and cost records are scoped to the correct tenant.
```python
from src.middleware.tenant_isolation import TenantMiddleware

middleware = TenantMiddleware()
orchestrator.use(middleware)
```

When enabled, the middleware extracts tenant context from the incoming request and propagates it through the entire pipeline execution. Every downstream operation (state reads, checkpoint writes, cost recording) is automatically scoped to the tenant.
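This page does not say how the middleware carries tenant context to downstream operations. One common Python technique for request-scoped state is the standard-library `contextvars` module, which gives each asyncio task its own copy of the context. The sketch below illustrates that technique only: `current_tenant`, `tenant_scope`, `save_checkpoint`, and `run_pipeline` are hypothetical names, not SDK APIs, and a plain dictionary stands in for a real store.

```python
import asyncio
import contextvars
from contextlib import contextmanager

# Hypothetical context variable holding the active tenant ID (not part of the SDK).
current_tenant = contextvars.ContextVar("current_tenant")


@contextmanager
def tenant_scope(tenant_id: str):
    """Bind tenant_id for everything that runs inside this block."""
    token = current_tenant.set(tenant_id)
    try:
        yield
    finally:
        # Restore the previous value so one request's tenant never leaks into another.
        current_tenant.reset(token)


async def save_checkpoint(store: dict, key: str, value: str) -> None:
    """A downstream write that scopes itself without taking tenant_id as an argument."""
    tenant_id = current_tenant.get()  # raises LookupError if no tenant is bound
    store[(tenant_id, key)] = value


async def run_pipeline(store: dict, tenant_id: str) -> None:
    with tenant_scope(tenant_id):
        await save_checkpoint(store, "step_1", "done")


async def main() -> dict:
    store: dict = {}
    # Two concurrent "requests": gather wraps each in a task with its own context copy.
    await asyncio.gather(run_pipeline(store, "org_a"), run_pipeline(store, "org_b"))
    return store


store = asyncio.run(main())
print(sorted(store))
```

Because the write reads the tenant from the context rather than from a parameter, no call site can forget to pass it, and the `reset()` in `finally` keeps the binding from outliving the request.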
TenantContext
The TenantContext object carries tenant identity information through the pipeline:
```python
from src.middleware.tenant_isolation import TenantContext

context = TenantContext(
    tenant_id="org_abc123",
    workspace_id="ws_default",
    user_id="user_456"
)
```

| Field | Type | Description |
|---|---|---|
| `tenant_id` | `str` | Organization identifier. All data is isolated at this level. |
| `workspace_id` | `str` | Workspace within the organization. Enables sub-org segmentation. |
| `user_id` | `str` | The user who initiated the pipeline execution. Used for audit trails. |
In the B2B API, tenant context is extracted automatically from the request:
- JWT claims: `org_id` and `org_role` from the token payload (highest priority)
- `X-Org-ID` header: passed by the dashboard frontend
- URL path parameter: `/organizations/{org_id}/...`
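The precedence above can be expressed as a small resolver. This is a minimal sketch, not the B2B API's actual code: the function `resolve_org_id` and its plain-mapping inputs are hypothetical, JWT signature verification is assumed to have already happened, and because the source only marks JWT claims as highest priority, ranking the header above the path parameter is an assumption that follows the list order.

```python
from typing import Mapping, Optional


def resolve_org_id(
    claims: Optional[Mapping[str, str]],
    headers: Mapping[str, str],
    path_params: Mapping[str, str],
) -> Optional[str]:
    """Return the org ID from the first source that provides one.

    Hypothetical precedence: verified JWT claims, then the X-Org-ID header,
    then the {org_id} URL path parameter.
    """
    if claims and claims.get("org_id"):
        return claims["org_id"]
    # HTTP header names are case-insensitive, so normalize before the lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    if lowered.get("x-org-id"):
        return lowered["x-org-id"]
    return path_params.get("org_id")


# The JWT claim wins even when the header names a different org.
print(resolve_org_id({"org_id": "org_abc123"}, {"X-Org-ID": "org_other"}, {}))
print(resolve_org_id(None, {"x-org-id": "org_xyz"}, {"org_id": "org_path"}))
print(resolve_org_id(None, {}, {"org_id": "org_path"}))
```

Checking the signed token first means a client cannot switch organizations just by sending a different header value.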
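```python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

from src.middleware.tenant_isolation import TenantContext

app = FastAPI()

# In a FastAPI route, access tenant context via request.state
@app.post("/api/v1/admin/pipelines/execute")
async def execute_pipeline(request: Request):
    org_id = request.state.org_id
    user_id = request.state.user_id
    org_role = request.state.org_role  # available for role-based checks

    context = TenantContext(tenant_id=org_id, user_id=user_id)
    input_data = await request.json()  # assumes the request body is the pipeline input

    async def event_stream():
        # Pass context to orchestrator and stream each event back to the client
        async for event in orchestrator.run(input_data, context=context):
            yield f"data: {event}\n\n"  # assumes events render as text; adapt serialization as needed

    # A route cannot yield directly; wrap the async generator in a streaming response
    return StreamingResponse(event_stream(), media_type="text/event-stream")
```

TenantScopedCheckpointStore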
The TenantScopedCheckpointStore wraps the base checkpoint store and automatically scopes all reads and writes to the current tenant. This prevents cross-tenant data leakage in the time-travel debugging system.
```python
from src.middleware.tenant_isolation import TenantScopedCheckpointStore

# Wrap the base store with tenant scoping
scoped_store = TenantScopedCheckpointStore(
    base_store=checkpoint_store,
    tenant_id=context.tenant_id
)

# All operations are automatically scoped
checkpoints = await scoped_store.get_checkpoints(run_id="run_abc123")
# Only returns checkpoints belonging to tenant_id="org_abc123"
```

The scoped store intercepts every operation and adds tenant filtering:
| Operation | Behavior |
|---|---|
| `save_checkpoint()` | Tags the checkpoint with the tenant ID |
| `get_checkpoints()` | Filters results to only the current tenant |
| `delete_checkpoints()` | Only deletes checkpoints owned by the current tenant |
| `list_runs()` | Only returns runs belonging to the current tenant |
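The behavior in the table is essentially a wrapper around the base store: tag on write, filter on read and delete. The following sketch shows that pattern with an in-memory store. `InMemoryCheckpointStore`, `ScopedStore`, and the row layout are hypothetical, and unlike `TenantScopedCheckpointStore` these methods are synchronous.

```python
from typing import Any, Dict, List


class InMemoryCheckpointStore:
    """Hypothetical base store that knows nothing about tenants."""

    def __init__(self) -> None:
        self.rows: List[Dict[str, Any]] = []

    def save(self, row: Dict[str, Any]) -> None:
        self.rows.append(row)

    def query(self, **filters: Any) -> List[Dict[str, Any]]:
        return [r for r in self.rows if all(r.get(k) == v for k, v in filters.items())]

    def delete(self, **filters: Any) -> int:
        keep = [r for r in self.rows if not all(r.get(k) == v for k, v in filters.items())]
        removed = len(self.rows) - len(keep)
        self.rows = keep
        return removed


class ScopedStore:
    """Wraps a base store and forces every call through a tenant filter."""

    def __init__(self, base_store: InMemoryCheckpointStore, tenant_id: str) -> None:
        self._base = base_store
        self._tenant_id = tenant_id

    def save_checkpoint(self, run_id: str, state: Dict[str, Any]) -> None:
        # Tag on write; callers cannot choose a different tenant.
        self._base.save({"tenant_id": self._tenant_id, "run_id": run_id, "state": state})

    def get_checkpoints(self, run_id: str) -> List[Dict[str, Any]]:
        # Filter on read, even when another tenant reuses the same run_id.
        return self._base.query(tenant_id=self._tenant_id, run_id=run_id)

    def delete_checkpoints(self, run_id: str) -> int:
        return self._base.delete(tenant_id=self._tenant_id, run_id=run_id)

    def list_runs(self) -> List[str]:
        return sorted({r["run_id"] for r in self._base.query(tenant_id=self._tenant_id)})


base = InMemoryCheckpointStore()
acme = ScopedStore(base, tenant_id="org_acme")
other = ScopedStore(base, tenant_id="org_other")

acme.save_checkpoint("run_1", {"step": 1})
other.save_checkpoint("run_1", {"step": 99})  # same run_id, different tenant

print(len(acme.get_checkpoints("run_1")))
print(other.delete_checkpoints("run_1"))  # removes only org_other's row
print(acme.list_runs())
```

The tenant ID is fixed when the wrapper is constructed and is never accepted as a per-call argument, so code holding a scoped store has no way to address another tenant's rows.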
TenantCostTracker
The TenantCostTracker extends the base cost tracker with per-tenant cost isolation and budget enforcement:
```python
from src.middleware.tenant_isolation import TenantCostTracker

tracker = TenantCostTracker(
    tenant_id=context.tenant_id,
    budget_limit_usd=500.00  # Monthly budget
)

# Record a cost event -- automatically scoped to tenant
tracker.record(
    model="gpt-4o",
    input_tokens=1200,
    output_tokens=450,
    agent="style_analysis"
)

# Query tenant-specific costs
tenant_total = tracker.total_cost             # This tenant only
by_agent = tracker.by_agent()                 # Breakdown for this tenant
by_model = tracker.by_model()                 # Breakdown for this tenant
budget_remaining = tracker.budget_remaining   # Remaining budget
```

The tenant cost tracker also supports budget alerts:
```python
if tracker.budget_utilization_pct > 80:
    # Trigger warning notification
    await notify_budget_warning(context.tenant_id, tracker.budget_utilization_pct)

if tracker.budget_remaining <= 0:
    # Route to approval queue or block execution
    raise BudgetExceededError(context.tenant_id)
```

Creating Custom Middleware
You can create custom middleware by subclassing `BaseMiddleware` and implementing its lifecycle hooks:
```python
import logging

from src.middleware.tenant_isolation import BaseMiddleware, MiddlewareContext


class LoggingMiddleware(BaseMiddleware):
    """Logs pipeline execution events to an external service."""

    # Standard-library logger used by the hooks below; attach a handler
    # that forwards records to your external logging service.
    logger = logging.getLogger(__name__)

    async def before_pipeline(self, context: MiddlewareContext) -> None:
        """Called before the pipeline starts executing."""
        self.logger.info(
            f"Pipeline started: {context.pipeline_id}",
            extra={"tenant_id": context.tenant_id, "input_keys": list(context.input_data.keys())},
        )

    async def before_agent(self, context: MiddlewareContext, agent_name: str) -> None:
        """Called before each agent starts executing."""
        self.logger.info(f"Agent starting: {agent_name}")

    async def after_agent(self, context: MiddlewareContext, agent_name: str, result: dict) -> None:
        """Called after each agent completes successfully."""
        self.logger.info(
            f"Agent completed: {agent_name}",
            extra={"duration_ms": context.last_duration_ms, "cost": context.last_cost},
        )

    async def after_pipeline(self, context: MiddlewareContext, result: dict) -> None:
        """Called after the pipeline completes."""
        self.logger.info(
            f"Pipeline completed: {context.pipeline_id}",
            extra={"total_duration_ms": context.total_duration_ms, "total_cost": context.total_cost},
        )

    async def on_error(self, context: MiddlewareContext, error: Exception) -> None:
        """Called when an agent or the pipeline encounters an error."""
        self.logger.error(
            f"Pipeline error: {error}",
            extra={"agent": context.current_agent, "pipeline": context.pipeline_id},
        )
```

Register custom middleware with the orchestrator:
```python
orchestrator.use(TenantMiddleware())
orchestrator.use(LoggingMiddleware())
orchestrator.use(MyCustomMiddleware())
```

Middleware executes in the order it is registered. The `before_*` hooks run in registration order; the `after_*` hooks run in reverse order (like a stack).
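The ordering rule can be checked with a toy runner. This is a minimal sketch, not the orchestrator's implementation: `Recorder` and `run_with_middleware` are hypothetical, agents are plain async callables, the hooks take no context argument, and `on_error` is left out.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

trace: List[str] = []


class Recorder:
    """Hypothetical middleware that records each hook call in a shared trace."""

    def __init__(self, name: str) -> None:
        self.name = name

    async def before_pipeline(self) -> None:
        trace.append(f"{self.name}.before_pipeline")

    async def before_agent(self, agent_name: str) -> None:
        trace.append(f"{self.name}.before_agent({agent_name})")

    async def after_agent(self, agent_name: str) -> None:
        trace.append(f"{self.name}.after_agent({agent_name})")

    async def after_pipeline(self) -> None:
        trace.append(f"{self.name}.after_pipeline")


async def run_with_middleware(
    middleware: List[Recorder], agents: Dict[str, Callable[[], Awaitable[Any]]]
) -> None:
    for mw in middleware:  # before_* hooks: registration order
        await mw.before_pipeline()
    for agent_name, agent in agents.items():
        for mw in middleware:
            await mw.before_agent(agent_name)
        await agent()
        for mw in reversed(middleware):  # after_* hooks: reverse order, like unwinding a stack
            await mw.after_agent(agent_name)
    for mw in reversed(middleware):
        await mw.after_pipeline()


async def noop_agent() -> None:
    return None


asyncio.run(run_with_middleware([Recorder("tenant"), Recorder("logging")], {"style_analysis": noop_agent}))
for entry in trace:
    print(entry)
```

In this model the first-registered middleware wraps all the others: its `before_*` hooks run before those of any middleware registered after it, and its `after_*` hooks run last.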