
Middleware

The SDK includes a middleware system for cross-cutting concerns like multi-tenant isolation, authentication, and request processing. Middleware wraps pipeline execution and can inspect, modify, or enrich the execution context at each stage.

TenantMiddleware

The TenantMiddleware provides multi-tenant data isolation for B2B deployments. It ensures that all pipeline state, checkpoints, and cost records are scoped to the correct tenant.

```python
from src.middleware.tenant_isolation import TenantMiddleware

middleware = TenantMiddleware()
orchestrator.use(middleware)
```

When enabled, the middleware extracts tenant context from the incoming request and propagates it through the entire pipeline execution. Every downstream operation — state reads, checkpoint writes, cost recording — is automatically scoped to the tenant.
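
One common way to propagate request-scoped data like this through async Python code is a `contextvars.ContextVar`, which every asyncio task inherits from the code that created it. The sketch below assumes that approach purely for illustration; `current_tenant`, `tenant_scope`, `scoped_key`, and `save_state` are hypothetical names, not part of the SDK, and the SDK may propagate context differently.

```python
import asyncio
import contextvars
from contextlib import contextmanager
from typing import Iterator, Optional

# Hypothetical holder for the active tenant (not the SDK's actual mechanism).
current_tenant: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar(
    "current_tenant", default=None
)


@contextmanager
def tenant_scope(tenant_id: str) -> Iterator[None]:
    """Set the tenant for the duration of a block, then restore the previous value."""
    token = current_tenant.set(tenant_id)
    try:
        yield
    finally:
        current_tenant.reset(token)


def scoped_key(key: str) -> str:
    """Prefix a storage key with the active tenant; refuse to run without one."""
    tenant_id = current_tenant.get()
    if tenant_id is None:
        raise RuntimeError("no tenant context is active")
    return f"{tenant_id}:{key}"


async def save_state(key: str) -> str:
    # The downstream operation never receives the tenant as an argument;
    # it reads it from the context inherited by the current task.
    await asyncio.sleep(0)
    return scoped_key(key)


async def main() -> list[str]:
    async def run_for(tenant_id: str) -> str:
        with tenant_scope(tenant_id):
            return await save_state("checkpoint/1")

    # gather() runs each coroutine in its own task with a copied context,
    # so the two tenants never see each other's value.
    return await asyncio.gather(run_for("org_abc123"), run_for("org_xyz789"))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Failing loudly when no tenant is set is a deliberate choice in this sketch: a missing context becomes an error instead of silently reading or writing unscoped data.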

TenantContext

The TenantContext object carries tenant identity information through the pipeline:

```python
from src.middleware.tenant_isolation import TenantContext

context = TenantContext(
    tenant_id="org_abc123",
    workspace_id="ws_default",
    user_id="user_456",
)
```

| Field | Type | Description |
|---|---|---|
| `tenant_id` | `str` | Organization identifier. All data is isolated at this level. |
| `workspace_id` | `str` | Workspace within the organization. Enables sub-org segmentation. |
| `user_id` | `str` | The user who initiated the pipeline execution. Used for audit trails. |

In the B2B API, tenant context is extracted automatically from the request:

  1. JWT claims: `org_id` and `org_role` from the token payload (highest priority)
  2. `X-Org-ID` header: passed by the dashboard frontend
  3. URL path parameter: `/organizations/{org_id}/...`
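
```python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

from src.middleware.tenant_isolation import TenantContext

app = FastAPI()


# In a FastAPI route, access tenant context via request.state
@app.post("/api/v1/admin/pipelines/execute")
async def execute_pipeline(request: Request):
    org_id = request.state.org_id
    user_id = request.state.user_id
    org_role = request.state.org_role  # available for role-based checks

    context = TenantContext(tenant_id=org_id, user_id=user_id)
    input_data = await request.json()  # assumes the request body is the pipeline input

    # Stream orchestrator events through a StreamingResponse
    # instead of yielding from the route function itself
    async def event_stream():
        async for event in orchestrator.run(input_data, context=context):
            yield f"{event}\n"

    return StreamingResponse(event_stream(), media_type="text/plain")
```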
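
The extraction priority listed above can be sketched as a small resolver. This is an illustration under stated assumptions: it takes JWT claims that have already been decoded and verified (signature checking is out of scope), plus headers and path parameters as plain mappings; `resolve_org_id` is a hypothetical helper, not an SDK function.

```python
from typing import Mapping, Optional


def resolve_org_id(
    jwt_claims: Optional[Mapping[str, str]],
    headers: Mapping[str, str],
    path_params: Mapping[str, str],
) -> Optional[str]:
    """Return the tenant ID using the documented priority order.

    1. org_id claim from an already-verified JWT payload
    2. X-Org-ID request header
    3. org_id path parameter from /organizations/{org_id}/...
    """
    if jwt_claims and jwt_claims.get("org_id"):
        return jwt_claims["org_id"]
    # HTTP header names are case-insensitive, so normalize before lookup.
    lowered = {name.lower(): value for name, value in headers.items()}
    if lowered.get("x-org-id"):
        return lowered["x-org-id"]
    # Fall back to the path parameter; None means no tenant could be resolved.
    return path_params.get("org_id")
```

Checking the verified token first means a client-supplied header cannot override the organization identity carried in a signed token.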

TenantScopedCheckpointStore

The TenantScopedCheckpointStore wraps the base checkpoint store and automatically scopes all reads and writes to the current tenant. This prevents cross-tenant data leakage in the time-travel debugging system.

```python
from src.middleware.tenant_isolation import TenantScopedCheckpointStore

# Wrap the base store with tenant scoping
scoped_store = TenantScopedCheckpointStore(
    base_store=checkpoint_store,
    tenant_id=context.tenant_id,
)

# All operations are automatically scoped
checkpoints = await scoped_store.get_checkpoints(run_id="run_abc123")
# Only returns checkpoints belonging to tenant_id="org_abc123"
```

The scoped store intercepts every operation and adds tenant filtering:

| Operation | Behavior |
|---|---|
| `save_checkpoint()` | Tags the checkpoint with the tenant ID |
| `get_checkpoints()` | Filters results to only the current tenant |
| `delete_checkpoints()` | Only deletes checkpoints owned by the current tenant |
| `list_runs()` | Only returns runs belonging to the current tenant |
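
The interception pattern in the table can be sketched as a thin wrapper over a shared store. This sketch assumes an in-memory base store and a `(tenant_id, run_id, data)` record shape; `Checkpoint`, `InMemoryCheckpointStore`, and `ScopedStore` are hypothetical names, and the methods are synchronous for brevity, unlike the SDK's awaitable API.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Checkpoint:
    tenant_id: str
    run_id: str
    data: dict[str, Any]


@dataclass
class InMemoryCheckpointStore:
    """Hypothetical base store: holds checkpoints for every tenant."""
    checkpoints: list[Checkpoint] = field(default_factory=list)


class ScopedStore:
    """Wraps a base store so every operation is restricted to one tenant."""

    def __init__(self, base_store: InMemoryCheckpointStore, tenant_id: str) -> None:
        self._base = base_store
        self._tenant_id = tenant_id

    def save_checkpoint(self, run_id: str, data: dict[str, Any]) -> None:
        # Tag on write: callers cannot choose a different tenant.
        self._base.checkpoints.append(Checkpoint(self._tenant_id, run_id, data))

    def get_checkpoints(self, run_id: str) -> list[Checkpoint]:
        # Filter on read: other tenants' rows stay invisible, even for the same run_id.
        return [
            cp for cp in self._base.checkpoints
            if cp.tenant_id == self._tenant_id and cp.run_id == run_id
        ]

    def delete_checkpoints(self, run_id: str) -> int:
        # Remove only rows owned by this tenant; return how many were removed.
        keep = [
            cp for cp in self._base.checkpoints
            if not (cp.tenant_id == self._tenant_id and cp.run_id == run_id)
        ]
        removed = len(self._base.checkpoints) - len(keep)
        self._base.checkpoints[:] = keep
        return removed

    def list_runs(self) -> list[str]:
        # Distinct run IDs for this tenant, in first-seen order.
        return list(dict.fromkeys(
            cp.run_id for cp in self._base.checkpoints if cp.tenant_id == self._tenant_id
        ))
```

Because the tenant ID is fixed when the wrapper is constructed, no method accepts a tenant argument, which removes one way for a caller to reach another tenant's data by mistake.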

TenantCostTracker

The TenantCostTracker extends the base cost tracker with per-tenant cost isolation and budget enforcement:

```python
from src.middleware.tenant_isolation import TenantCostTracker

tracker = TenantCostTracker(
    tenant_id=context.tenant_id,
    budget_limit_usd=500.00,  # Monthly budget
)

# Record a cost event; it is automatically scoped to the tenant
tracker.record(
    model="gpt-4o",
    input_tokens=1200,
    output_tokens=450,
    agent="style_analysis",
)

# Query tenant-specific costs
tenant_total = tracker.total_cost  # This tenant only
by_agent = tracker.by_agent()  # Breakdown for this tenant
by_model = tracker.by_model()  # Breakdown for this tenant
budget_remaining = tracker.budget_remaining  # Remaining budget
```
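
The per-tenant totals and breakdowns amount to filtering recorded events by tenant and then grouping by agent or model. The sketch below assumes each event already carries a computed `cost_usd`; the SDK's `record()` takes token counts instead and presumably derives cost from them, which this sketch does not model. `CostEvent` and `TenantCosts` are hypothetical names.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass(frozen=True)
class CostEvent:
    tenant_id: str
    agent: str
    model: str
    cost_usd: float


@dataclass
class TenantCosts:
    """Hypothetical per-tenant view over a (possibly shared) list of cost events."""
    tenant_id: str
    budget_limit_usd: float
    events: list[CostEvent] = field(default_factory=list)

    def record(self, agent: str, model: str, cost_usd: float) -> None:
        # Every event is stamped with this tracker's tenant.
        self.events.append(CostEvent(self.tenant_id, agent, model, cost_usd))

    def _mine(self) -> list[CostEvent]:
        # Isolation: only this tenant's events feed any total or breakdown.
        return [e for e in self.events if e.tenant_id == self.tenant_id]

    @property
    def total_cost(self) -> float:
        return sum(e.cost_usd for e in self._mine())

    def by_agent(self) -> dict[str, float]:
        totals: defaultdict[str, float] = defaultdict(float)
        for e in self._mine():
            totals[e.agent] += e.cost_usd
        return dict(totals)

    def by_model(self) -> dict[str, float]:
        totals: defaultdict[str, float] = defaultdict(float)
        for e in self._mine():
            totals[e.model] += e.cost_usd
        return dict(totals)

    @property
    def budget_remaining(self) -> float:
        return self.budget_limit_usd - self.total_cost
```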

The tenant cost tracker also supports budget alerts:

```python
if tracker.budget_utilization_pct > 80:
    # Trigger warning notification
    await notify_budget_warning(context.tenant_id, tracker.budget_utilization_pct)

if tracker.budget_remaining <= 0:
    # Route to approval queue or block execution
    raise BudgetExceededError(context.tenant_id)
```
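
The two checks above can be folded into one function that classifies spend against the limit. This is a sketch: the 80% warning threshold comes from the example above, while `BudgetStatus` and `budget_status` are hypothetical names. Checking exhaustion before the warning threshold means an exhausted budget always maps to blocking rather than only a warning.

```python
from enum import Enum


class BudgetStatus(Enum):
    OK = "ok"
    WARN = "warn"
    BLOCK = "block"


def budget_status(spent_usd: float, limit_usd: float, warn_pct: float = 80.0) -> BudgetStatus:
    """Classify spend against a budget limit."""
    if limit_usd <= 0:
        raise ValueError("limit_usd must be positive")
    remaining = limit_usd - spent_usd
    # Multiply before dividing so round numbers (e.g. 400 of 500) give exact percentages.
    utilization_pct = spent_usd * 100 / limit_usd
    if remaining <= 0:
        # Mirrors `budget_remaining <= 0`: route to approval or block.
        return BudgetStatus.BLOCK
    if utilization_pct > warn_pct:
        # Mirrors `budget_utilization_pct > 80`: notify, but keep running.
        return BudgetStatus.WARN
    return BudgetStatus.OK
```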

Creating Custom Middleware

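You can create custom middleware by subclassing BaseMiddleware and implementing its lifecycle hooks. The following LoggingMiddleware implements each of the hooks before_pipeline, before_agent, after_agent, after_pipeline, and on_error: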

```python
import logging

from src.middleware.tenant_isolation import BaseMiddleware, MiddlewareContext

# Attach a handler to this logger to forward records to your log service
logger = logging.getLogger(__name__)


class LoggingMiddleware(BaseMiddleware):
    """Logs pipeline execution events to an external service."""

    async def before_pipeline(self, context: MiddlewareContext) -> None:
        """Called before the pipeline starts executing."""
        logger.info(
            "Pipeline started: %s",
            context.pipeline_id,
            extra={"tenant_id": context.tenant_id, "input_keys": list(context.input_data.keys())},
        )

    async def before_agent(self, context: MiddlewareContext, agent_name: str) -> None:
        """Called before each agent starts executing."""
        logger.info("Agent starting: %s", agent_name)

    async def after_agent(self, context: MiddlewareContext, agent_name: str, result: dict) -> None:
        """Called after each agent completes successfully."""
        logger.info(
            "Agent completed: %s",
            agent_name,
            extra={"duration_ms": context.last_duration_ms, "cost": context.last_cost},
        )

    async def after_pipeline(self, context: MiddlewareContext, result: dict) -> None:
        """Called after the pipeline completes."""
        logger.info(
            "Pipeline completed: %s",
            context.pipeline_id,
            extra={"total_duration_ms": context.total_duration_ms, "total_cost": context.total_cost},
        )

    async def on_error(self, context: MiddlewareContext, error: Exception) -> None:
        """Called when an agent or the pipeline encounters an error."""
        logger.error(
            "Pipeline error: %s",
            error,
            extra={"agent": context.current_agent, "pipeline": context.pipeline_id},
        )
```

Register custom middleware with the orchestrator:

```python
orchestrator.use(TenantMiddleware())
orchestrator.use(LoggingMiddleware())
orchestrator.use(MyCustomMiddleware())
```

Middleware executes in the order it is registered. The before_* hooks run in registration order; the after_* hooks run in reverse order (like a stack).
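
The ordering rule above can be demonstrated with a toy runner. This is a hypothetical sketch, not the orchestrator's code: `Recorder` and `run_pipeline` are illustrative names, only `before_pipeline` and `after_pipeline` are modeled, and the pipeline itself is a single async callable.

```python
import asyncio
from typing import Awaitable, Callable


class Recorder:
    """Toy middleware that records when its hooks fire."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def before_pipeline(self) -> None:
        self.log.append(f"before:{self.name}")

    async def after_pipeline(self) -> None:
        self.log.append(f"after:{self.name}")


async def run_pipeline(
    middlewares: list[Recorder], body: Callable[[], Awaitable[None]]
) -> None:
    # before_* hooks: registration order.
    for mw in middlewares:
        await mw.before_pipeline()
    await body()
    # after_* hooks: reverse registration order, so the first-registered
    # middleware wraps all the others, like frames on a stack.
    for mw in reversed(middlewares):
        await mw.after_pipeline()


async def main() -> list[str]:
    log: list[str] = []

    async def body() -> None:
        log.append("pipeline")

    chain = [Recorder("tenant", log), Recorder("logging", log), Recorder("custom", log)]
    await run_pipeline(chain, body)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Under this rule, registering TenantMiddleware first means its before hook has established tenant context before any other middleware runs, and its after hook is the last to run.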