Skip to Content
Getting StartedFirst Pipeline

First Pipeline

This tutorial walks through building a multi-agent pipeline using the Curate-Me orchestrator. You will create agents, wire them together, and run the pipeline with streaming output.

Create a simple agent

Every agent extends BaseAgent and implements an async execute method that yields AgentEvent objects:

from src.agents.base import BaseAgent from src.models.schemas import AgentEvent class SummaryAgent(BaseAgent): name = "summary" async def execute(self, input_data: dict): yield AgentEvent(type="agent_start", agent=self.name) text = input_data.get("text", "") summary = text[:200] + "..." if len(text) > 200 else text yield AgentEvent( type="agent_complete", agent=self.name, result={"summary": summary} )

Set up the orchestrator

The Orchestrator manages agent execution order, streaming, and error handling:

from src.orchestrator.coordinator import Orchestrator orchestrator = Orchestrator(agents=[SummaryAgent()]) async def run(): async for event in orchestrator.run({"text": "Your input text here..."}): print(event) import asyncio asyncio.run(run())

Event types

The pipeline emits structured AgentEvent objects as it executes. These are the core event types:

TypeDescription
agent_startAgent has begun processing
tokenIncremental streaming token from an LLM call
agent_completeAgent finished with a result payload
agent_errorAgent encountered an error
pipeline_startOrchestrator began the pipeline
pipeline_completeAll agents finished successfully

Add a second agent with dependencies

Agents can declare dependencies using depends_on. The orchestrator resolves the execution graph and runs independent agents in parallel:

from src.agents.base import BaseAgent from src.models.schemas import AgentEvent class AnalysisAgent(BaseAgent): name = "analysis" async def execute(self, input_data: dict): yield AgentEvent(type="agent_start", agent=self.name) text = input_data.get("text", "") word_count = len(text.split()) yield AgentEvent( type="agent_complete", agent=self.name, result={"word_count": word_count, "language": "en"} ) class ReportAgent(BaseAgent): name = "report" depends_on = ["summary", "analysis"] async def execute(self, input_data: dict): yield AgentEvent(type="agent_start", agent=self.name) summary_result = input_data.get("summary", {}) analysis_result = input_data.get("analysis", {}) report = { "summary": summary_result.get("summary"), "word_count": analysis_result.get("word_count"), "status": "complete" } yield AgentEvent( type="agent_complete", agent=self.name, result=report )

Run the full pipeline

Wire all three agents together. The orchestrator automatically determines that SummaryAgent and AnalysisAgent can run in parallel, while ReportAgent waits for both to complete:

from src.orchestrator.coordinator import Orchestrator orchestrator = Orchestrator( agents=[ SummaryAgent(), AnalysisAgent(), ReportAgent(), ] ) async def run(): input_data = { "text": "Curate-Me is a platform for managing AI agent pipelines..." } async for event in orchestrator.run(input_data): if event.type == "agent_start": print(f"[START] {event.agent}") elif event.type == "agent_complete": print(f"[DONE] {event.agent}: {event.result}") elif event.type == "pipeline_complete": print("Pipeline finished.") import asyncio asyncio.run(run())

Expected output:

[START] summary [START] analysis [DONE] summary: {'summary': 'Curate-Me is a platform for managing AI agent pipelines...'} [DONE] analysis: {'word_count': 9, 'language': 'en'} [START] report [DONE] report: {'summary': 'Curate-Me is a platform for managing AI agent pipelines...', 'word_count': 9, 'status': 'complete'} Pipeline finished.

Notice that summary and analysis start simultaneously because they have no dependencies on each other.

Next steps

  • Platform Overview — understand the full architecture
  • API Reference — explore the triple API architecture
  • Dashboard — manage agents, runners, and costs from the ops console
  • Gateway — deep dive into the governance chain and LLM proxy