Building Pipelines

The Orchestrator coordinates multi-agent pipelines by managing dependencies, parallel execution, state propagation, and result aggregation. You define which agents to run and how they depend on each other; the Orchestrator handles the execution order and data flow.

Orchestrator

The Orchestrator class is the entry point for building and running pipelines:

    from src.orchestrator.coordinator import Orchestrator

    orchestrator = Orchestrator()

Adding Agents

Register agents with the orchestrator using add_agent(). Each agent receives a unique identifier and an optional list of dependencies.

    from src.orchestrator.coordinator import Orchestrator
    from src.agents.vision_analysis import VisionAnalysisAgent
    from src.agents.style_analysis import StyleAnalysisAgent
    from src.agents.style_critic import StyleCriticAgent

    orchestrator = Orchestrator()

    # Agent with no dependencies -- runs first
    orchestrator.add_agent("vision", VisionAnalysisAgent())

    # Agent that depends on vision -- runs after vision completes
    orchestrator.add_agent("style", StyleAnalysisAgent(), depends_on=["vision"])

    # Agent that depends on style -- runs after style completes
    orchestrator.add_agent("critic", StyleCriticAgent(), depends_on=["style"])

The depends_on parameter accepts a list of agent identifiers. An agent will not start until all of its dependencies have completed successfully.
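The ordering rule above amounts to a topological sort of the dependency graph. The following is a minimal sketch of that idea using Python's standard-library graphlib module; it is not the Orchestrator's actual implementation, and the execution_order helper is a hypothetical name introduced here for illustration.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies: dict[str, list[str]]) -> list[str]:
    """Return one valid run order for agents keyed by identifier.

    `dependencies` maps each agent identifier to the identifiers it
    depends on (the same lists you would pass as depends_on).
    This helper is hypothetical and only illustrates the ordering rule.
    """
    sorter = TopologicalSorter(dependencies)
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        # The second element of exc.args holds the nodes forming the cycle
        raise ValueError(f"Circular dependency: {exc.args[1]}") from exc


order = execution_order({"vision": [], "style": ["vision"], "critic": ["style"]})
print(order)  # ['vision', 'style', 'critic']
```

A circular dependency (for example, two agents that each list the other in depends_on) has no valid order, which is why the sketch turns it into an explicit error.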

Parallel Execution

Agents without mutual dependencies execute concurrently. The Orchestrator automatically detects independent branches in the dependency graph and runs them in parallel.

    orchestrator = Orchestrator()

    # These two agents have no dependencies on each other
    orchestrator.add_agent("color_analysis", ColorAnalysisAgent())
    orchestrator.add_agent("texture_analysis", TextureAnalysisAgent())

    # This agent depends on both -- waits for both to complete
    orchestrator.add_agent(
        "combined_report",
        CombinedReportAgent(),
        depends_on=["color_analysis", "texture_analysis"]
    )

In this example, color_analysis and texture_analysis run concurrently. combined_report starts only after both have completed, receiving their merged outputs as its input.
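To make the scheduling behavior concrete, here is a rough sketch of wave-based execution with asyncio and graphlib. It is an illustration under stated assumptions, not the Orchestrator's scheduler: run_in_waves and fake_agent are hypothetical names, and a real scheduler could start an agent as soon as its own dependencies finish rather than waiting for the whole wave.

```python
import asyncio
from graphlib import TopologicalSorter


async def run_in_waves(dependencies, run_agent):
    """Run agents wave by wave; agents within a wave run concurrently.

    Hypothetical helper: `dependencies` maps agent id -> list of ids it
    depends on, and `run_agent` is any coroutine function taking an id.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        # Every agent whose dependencies have all been marked done
        ready = list(sorter.get_ready())
        await asyncio.gather(*(run_agent(name) for name in ready))
        sorter.done(*ready)
        waves.append(sorted(ready))
    return waves


async def fake_agent(name):
    # Stand-in for real agent work
    await asyncio.sleep(0.01)


graph = {
    "color_analysis": [],
    "texture_analysis": [],
    "combined_report": ["color_analysis", "texture_analysis"],
}
waves = asyncio.run(run_in_waves(graph, fake_agent))
print(waves)  # [['color_analysis', 'texture_analysis'], ['combined_report']]
```

The two independent agents land in the same wave and are awaited together with asyncio.gather; combined_report only becomes ready once both are marked done.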

Conditional Routing

Add conditional logic to route data to different agents based on intermediate results:

    orchestrator = Orchestrator()

    orchestrator.add_agent("classifier", ClassifierAgent())

    # Route to different agents based on classifier output
    orchestrator.add_agent(
        "detailed_analysis",
        DetailedAnalysisAgent(),
        depends_on=["classifier"],
        condition=lambda result: result["classifier"]["confidence"] > 0.8
    )

    orchestrator.add_agent(
        "manual_review",
        ManualReviewAgent(),
        depends_on=["classifier"],
        condition=lambda result: result["classifier"]["confidence"] <= 0.8
    )

When a condition function is provided, the agent only executes if the condition returns True. The condition receives the accumulated results from all completed agents.
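The routing rule can be pictured as a small gate evaluated before each agent starts. This sketch assumes the condition is called with a dictionary keyed by agent identifier, as in the example above; should_run is a hypothetical helper and the classifier output is made-up sample data.

```python
from typing import Any, Callable, Dict, Optional

Condition = Callable[[Dict[str, Any]], bool]


def should_run(condition: Optional[Condition], results: Dict[str, Any]) -> bool:
    """Agents without a condition always run; otherwise ask the condition."""
    if condition is None:
        return True
    return bool(condition(results))


def high_confidence(result: Dict[str, Any]) -> bool:
    return result["classifier"]["confidence"] > 0.8


def low_confidence(result: Dict[str, Any]) -> bool:
    return result["classifier"]["confidence"] <= 0.8


# Made-up accumulated results after the classifier has completed
results = {"classifier": {"confidence": 0.65}}

selected = [
    name
    for name, condition in [
        ("detailed_analysis", high_confidence),
        ("manual_review", low_confidence),
    ]
    if should_run(condition, results)
]
print(selected)  # ['manual_review']
```

Because the two conditions are complementary (> 0.8 and <= 0.8), exactly one branch runs for any confidence value. Conditions that overlap or leave a gap would run both branches or neither.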

Pipeline State Management

The Orchestrator maintains a state dictionary that accumulates results from each agent. When an agent completes, its output is merged into the pipeline state under its identifier key.

    # After running the pipeline:
    # state = {
    #     "vision": {"entities": [...], "confidence": 0.94},
    #     "style": {"analysis_profile": {...}, "results": [...]},
    #     "critic": {"score": 8.2, "feedback": "..."}
    # }

Each agent receives the full accumulated state as its input_data, so downstream agents can access the outputs of any upstream agent:

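    from typing import AsyncIterator, Dict

    # BaseAgent and AgentEvent are SDK types; their import path is not shown here
    class StyleAnalysisAgent(BaseAgent):
        async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]:
            # Access upstream agent's output
            vision_result = input_data["vision"]
            entities = vision_result["entities"]
            # ...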
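The accumulation step described in this section can be sketched as a function that stores each agent's output under its identifier. This is an illustration only: merge_result is a hypothetical name, the sample outputs are made up, and rejecting a duplicate key is an assumption made here, not documented Orchestrator behavior.

```python
from typing import Any, Dict


def merge_result(
    state: Dict[str, Any], agent_id: str, output: Dict[str, Any]
) -> Dict[str, Any]:
    """Return a new state with `output` stored under the agent's identifier."""
    if agent_id in state:
        # Assumption for this sketch: each agent writes its key only once
        raise KeyError(f"State already has a result for {agent_id!r}")
    return {**state, agent_id: output}


state: Dict[str, Any] = {}
state = merge_result(state, "vision", {"entities": ["lamp"], "confidence": 0.94})
state = merge_result(state, "style", {"results": []})
print(list(state))  # ['vision', 'style']
```

Returning a new dictionary instead of mutating in place means agents running concurrently never see a half-updated state in this sketch.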

Running Pipelines

Execute a pipeline with orchestrator.run(), which returns an async iterator of events from all agents:

    async for event in orchestrator.run(initial_input):
        if event.type == "agent_start":
            print(f"Started: {event.agent}")
        elif event.type == "token":
            print(event.token, end="", flush=True)
        elif event.type == "agent_complete":
            print(f"Completed: {event.agent}")
        elif event.type == "error":
            print(f"Error in {event.agent}: {event.error}")
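When only the final outputs matter, the event stream can be drained into a dictionary. The sketch below assumes that agent_complete events expose the agent's output as event.result, which this page does not confirm. FakeEvent and fake_run are hypothetical stand-ins so the example runs without the SDK.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Dict, Optional


@dataclass
class FakeEvent:
    """Hypothetical stand-in for the SDK's event objects."""
    type: str
    agent: str
    result: Optional[Any] = None


async def fake_run() -> AsyncIterator[FakeEvent]:
    # Stand-in for orchestrator.run(initial_input)
    yield FakeEvent("agent_start", "vision")
    yield FakeEvent("agent_complete", "vision", {"confidence": 0.94})
    yield FakeEvent("agent_start", "style")
    yield FakeEvent("agent_complete", "style", {"results": []})


async def collect_results(events: AsyncIterator[Any]) -> Dict[str, Any]:
    """Drain the stream and keep each agent's final result."""
    results: Dict[str, Any] = {}
    async for event in events:
        if event.type == "agent_complete":
            # Assumption: completed events carry the output as .result
            results[event.agent] = event.result
        elif event.type == "error":
            raise RuntimeError(f"Error in {event.agent}")
    return results


final = asyncio.run(collect_results(fake_run()))
print(final)  # {'vision': {'confidence': 0.94}, 'style': {'results': []}}
```

With the real SDK, collect_results(orchestrator.run(initial_input)) would take the place of the fake stream, provided the event attributes match the assumption above.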

SSE Streaming with FastAPI

Stream pipeline events to the frontend in real time as Server-Sent Events (SSE) by wrapping orchestrator.run() in a FastAPI StreamingResponse:

    import json

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()

    # AnalysisRequest (a request model) and orchestrator are defined elsewhere
    @app.post("/api/v1/analyze")
    async def analyze(request: AnalysisRequest):
        async def event_stream():
            # On Pydantic v2, use request.model_dump() instead of request.dict()
            async for event in orchestrator.run(request.dict()):
                data = json.dumps({
                    "type": event.type,
                    "agent": event.agent,
                    # Not every event type carries a result or a token
                    "result": getattr(event, "result", None),
                    "token": getattr(event, "token", None),
                })
                yield f"data: {data}\n\n"
            # Sentinel that tells the client the stream is finished
            yield "data: [DONE]\n\n"

        return StreamingResponse(event_stream(), media_type="text/event-stream")

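On the frontend, consume the stream with fetch() and a stream reader. The browser's EventSource API only issues GET requests and cannot send a request body, so it cannot call this POST endpoint:

    async function streamAnalysis(input) {
      const response = await fetch('/api/v1/analyze', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(input),
      });
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      while (true) {
        const { done, value } = await reader.read();
        if (done) return;
        buffer += decoder.decode(value, { stream: true });
        // SSE messages end with a blank line; keep any partial message buffered
        const messages = buffer.split('\n\n');
        buffer = messages.pop();
        for (const message of messages) {
          const data = message.replace(/^data: /, '');
          if (data === '[DONE]') return;
          handleEvent(JSON.parse(data));
        }
      }
    }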
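The wire format used by the endpoint is simple: each message is a single "data: " line followed by a blank line, and the literal [DONE] marks the end of the stream. The following Python sketch encodes and decodes that format, which can be handy when testing the endpoint from a script. format_sse and parse_sse are hypothetical helpers, and the sample event is made up.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def format_sse(payload: Dict[str, Any]) -> str:
    """Encode one event as an SSE frame: a data line plus a blank line."""
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield decoded events from SSE lines until the [DONE] sentinel."""
    prefix = "data: "
    for line in lines:
        if not line.startswith(prefix):
            continue  # skip blank separators and non-data fields
        data = line[len(prefix):]
        if data == "[DONE]":
            return
        yield json.loads(data)


stream = (
    format_sse({"type": "token", "agent": "style", "token": "Hi"})
    + "data: [DONE]\n\n"
)
events = list(parse_sse(stream.splitlines()))
print(events)  # [{'type': 'token', 'agent': 'style', 'token': 'Hi'}]
```

This sketch handles only single-line data fields, which is all the endpoint above emits, since json.dumps produces no newlines by default.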

JSON Workflow Definitions

Pipelines can be defined as JSON for integration with the visual workflow builder or for version-controlled pipeline configurations:

    {
      "workflow_id": "content-pipeline-v2",
      "name": "Content Analysis Pipeline",
      "nodes": [
        {
          "id": "extract",
          "agent": "ContentExtractor",
          "model": "gpt-4o-mini",
          "config": { "max_tokens": 2048 }
        },
        {
          "id": "analyze",
          "agent": "ContentAnalysis",
          "model": "gpt-4o",
          "depends_on": ["extract"]
        },
        {
          "id": "summarize",
          "agent": "Summarizer",
          "model": "gpt-5-nano",
          "depends_on": ["analyze"]
        }
      ],
      "edges": [
        { "from": "extract", "to": "analyze" },
        { "from": "analyze", "to": "summarize" }
      ]
    }

Load and execute a JSON workflow:

    orchestrator = Orchestrator.from_json(workflow_definition)

    async for event in orchestrator.run(input_data):
        handle_event(event)

This approach decouples pipeline structure from application code, enabling non-technical users to create and modify workflows through the dashboard’s visual builder without touching Python.
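Because JSON definitions can be edited outside of Python, it may be worth checking them before loading. The sketch below is one possible pre-flight check, not part of the SDK: validate_workflow is a hypothetical helper, and it assumes that every depends_on entry should be mirrored by an edge and vice versa, as in the example above. The page does not state whether the loader enforces this.

```python
from typing import Any, Dict, List


def validate_workflow(workflow: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a workflow definition."""
    problems: List[str] = []
    nodes = workflow.get("nodes", [])
    node_ids = [node["id"] for node in nodes]
    known = set(node_ids)
    if len(known) != len(node_ids):
        problems.append("duplicate node ids")

    # Collect (dependency, dependent) pairs declared via depends_on
    declared = set()
    for node in nodes:
        for dep in node.get("depends_on", []):
            declared.add((dep, node["id"]))
            if dep not in known:
                problems.append(f"{node['id']} depends on unknown node {dep}")

    # Assumption: edges and depends_on describe the same graph
    edges = {(edge["from"], edge["to"]) for edge in workflow.get("edges", [])}
    for source, target in sorted(edges ^ declared):
        problems.append(f"edge {source} -> {target} does not match depends_on")
    return problems


workflow = {
    "nodes": [
        {"id": "extract", "agent": "ContentExtractor"},
        {"id": "analyze", "agent": "ContentAnalysis", "depends_on": ["extract"]},
        {"id": "summarize", "agent": "Summarizer", "depends_on": ["analyze"]},
    ],
    "edges": [
        {"from": "extract", "to": "analyze"},
        {"from": "analyze", "to": "summarize"},
    ],
}
print(validate_workflow(workflow))  # []
```

An empty list means the definition passed these checks. Running the check in CI for version-controlled workflow files would catch a mistyped node id before the pipeline is executed.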