# First Pipeline
This tutorial walks through building a multi-agent pipeline using the Curate-Me orchestrator. You will create agents, wire them together, and run the pipeline with streaming output.
## Create a simple agent

Every agent extends `BaseAgent` and implements an async `execute` method that yields `AgentEvent` objects:

```python
from src.agents.base import BaseAgent
from src.models.schemas import AgentEvent


class SummaryAgent(BaseAgent):
    name = "summary"

    async def execute(self, input_data: dict):
        yield AgentEvent(type="agent_start", agent=self.name)
        text = input_data.get("text", "")
        # The conditional binds looser than +, so the truncated string is
        # the "if" branch: truncate only when the text exceeds 200 chars.
        summary = text[:200] + "..." if len(text) > 200 else text
        yield AgentEvent(
            type="agent_complete",
            agent=self.name,
            result={"summary": summary}
        )
```

## Set up the orchestrator
The `Orchestrator` manages agent execution order, streaming, and error handling:

```python
import asyncio

from src.orchestrator.coordinator import Orchestrator

orchestrator = Orchestrator(agents=[SummaryAgent()])


async def run():
    async for event in orchestrator.run({"text": "Your input text here..."}):
        print(event)


asyncio.run(run())
```

## Event types
The pipeline emits structured `AgentEvent` objects as it executes. These are the core event types:

| Type | Description |
|---|---|
| `agent_start` | Agent has begun processing |
| `token` | Incremental streaming token from an LLM call |
| `agent_complete` | Agent finished with a result payload |
| `agent_error` | Agent encountered an error |
| `pipeline_start` | Orchestrator began the pipeline |
| `pipeline_complete` | All agents finished successfully |
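A consumer typically dispatches on the event's `type` field. Here is a minimal, self-contained sketch of such a dispatcher; the `AgentEvent` dataclass below is a stand-in for `src.models.schemas.AgentEvent`, and its exact field names are an assumption for illustration:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AgentEvent:
    """Stand-in for src.models.schemas.AgentEvent (fields assumed)."""
    type: str
    agent: Optional[str] = None
    result: Optional[dict] = None


def render(event: AgentEvent) -> str:
    """Format one event for console logging, dispatching on its type."""
    if event.type == "agent_start":
        return f"[START] {event.agent}"
    if event.type == "agent_complete":
        return f"[DONE] {event.agent}: {event.result}"
    if event.type == "agent_error":
        return f"[ERROR] {event.agent}"
    # Fall back to the raw type for pipeline_start, token, etc.
    return f"[{event.type}]"


print(render(AgentEvent(type="agent_start", agent="summary")))
# [START] summary
```

Dispatching through one helper keeps the event-handling logic in a single place as you add more event types.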
## Add a second agent with dependencies
Agents can declare dependencies using `depends_on`. The orchestrator resolves the execution graph and runs independent agents in parallel:
```python
from src.agents.base import BaseAgent
from src.models.schemas import AgentEvent


class AnalysisAgent(BaseAgent):
    name = "analysis"

    async def execute(self, input_data: dict):
        yield AgentEvent(type="agent_start", agent=self.name)
        text = input_data.get("text", "")
        word_count = len(text.split())
        yield AgentEvent(
            type="agent_complete",
            agent=self.name,
            result={"word_count": word_count, "language": "en"}
        )


class ReportAgent(BaseAgent):
    name = "report"
    depends_on = ["summary", "analysis"]

    async def execute(self, input_data: dict):
        yield AgentEvent(type="agent_start", agent=self.name)
        # Upstream results arrive in input_data keyed by agent name.
        summary_result = input_data.get("summary", {})
        analysis_result = input_data.get("analysis", {})
        report = {
            "summary": summary_result.get("summary"),
            "word_count": analysis_result.get("word_count"),
            "status": "complete"
        }
        yield AgentEvent(
            type="agent_complete",
            agent=self.name,
            result=report
        )
```

## Run the full pipeline
Wire all three agents together. The orchestrator automatically determines that `SummaryAgent` and `AnalysisAgent` can run in parallel, while `ReportAgent` waits for both to complete:
```python
import asyncio

from src.orchestrator.coordinator import Orchestrator

orchestrator = Orchestrator(
    agents=[
        SummaryAgent(),
        AnalysisAgent(),
        ReportAgent(),
    ]
)


async def run():
    input_data = {
        "text": "Curate-Me is a platform for managing AI agent pipelines..."
    }
    async for event in orchestrator.run(input_data):
        if event.type == "agent_start":
            print(f"[START] {event.agent}")
        elif event.type == "agent_complete":
            print(f"[DONE] {event.agent}: {event.result}")
        elif event.type == "pipeline_complete":
            print("Pipeline finished.")


asyncio.run(run())
```

Expected output:
```
[START] summary
[START] analysis
[DONE] summary: {'summary': 'Curate-Me is a platform for managing AI agent pipelines...'}
[DONE] analysis: {'word_count': 9, 'language': 'en'}
[START] report
[DONE] report: {'summary': 'Curate-Me is a platform for managing AI agent pipelines...', 'word_count': 9, 'status': 'complete'}
Pipeline finished.
```

Notice that `summary` and `analysis` start simultaneously because they have no dependencies on each other.
## Next steps
- Platform Overview — understand the full architecture
- API Reference — explore the triple API architecture
- Dashboard — manage agents, runners, and costs from the ops console
- Gateway — deep dive into the governance chain and LLM proxy