# Creating Agents

Agents are the fundamental building blocks of the Curate-Me platform. Every agent extends the `BaseAgent` abstract class and implements a streaming `execute()` method that yields `AgentEvent` objects as it processes input.
## BaseAgent

The `BaseAgent` class provides the contract that all agents must implement:
```python
from src.agents.base import BaseAgent
from src.models.schemas import AgentEvent
from typing import AsyncIterator, Dict


class MyAgent(BaseAgent):
    """A custom agent that processes input data."""

    async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]:
        # Signal that this agent has started
        yield AgentEvent(type="agent_start", agent=self.name)

        # Your processing logic here
        result = await self.process(input_data)

        # Signal completion with the result
        yield AgentEvent(
            type="agent_complete",
            agent=self.name,
            result=result,
        )
```

The `execute()` method is an async generator that receives input data and yields events as the agent progresses through its work. This streaming pattern enables real-time progress updates to the frontend via SSE.
## AgentEvent Types
Agents communicate their progress through a set of typed events:
| Event Type | Purpose | Payload |
|---|---|---|
| `agent_start` | Signals the agent has begun execution | agent name |
| `token` | Streams individual tokens during LLM generation | token string, agent name |
| `agent_progress` | Reports intermediate progress | agent name, progress percentage, message |
| `agent_complete` | Signals successful completion | agent name, result dict |
| `error` | Reports an error during execution | agent name, error message, details |
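Concretely, the payload columns above suggest an event shape along these lines. This is a sketch of the fields implied by the table; the actual `AgentEvent` model lives in `src.models.schemas` and may differ:

```python
from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class AgentEvent:
    # Only "type" and "agent" are common to every event;
    # the remaining fields are per-type payloads.
    type: str                         # agent_start | token | agent_progress | agent_complete | error
    agent: str                        # name of the emitting agent
    token: Optional[str] = None       # "token" events
    progress: Optional[float] = None  # "agent_progress" events, 0-100
    message: Optional[str] = None     # "agent_progress" events
    result: Optional[Dict] = None     # "agent_complete" events
    error: Optional[str] = None       # "error" events
    details: Optional[Dict] = None    # "error" events

event = AgentEvent(type="agent_progress", agent="style_analysis",
                   progress=50.0, message="halfway")
```

Keeping the optional payload fields on one type (rather than one class per event) makes the events easy to serialize uniformly for SSE.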
## Streaming Tokens
For agents that perform LLM generation, yield `token` events to stream output to the client in real time:
```python
async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]:
    yield AgentEvent(type="agent_start", agent=self.name)

    async for token in self.llm.stream_generate(
        prompt=self.build_prompt(input_data),
        model=self.model
    ):
        yield AgentEvent(type="token", agent=self.name, token=token)

    yield AgentEvent(type="agent_complete", agent=self.name, result=self.get_result())
```

## Reporting Progress
For long-running operations, yield `agent_progress` events to keep the client informed:
```python
async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]:
    yield AgentEvent(type="agent_start", agent=self.name)

    items = input_data["items"]
    for i, item in enumerate(items):
        await self.process_item(item)
        yield AgentEvent(
            type="agent_progress",
            agent=self.name,
            progress=(i + 1) / len(items) * 100,
            message=f"Processed {i + 1}/{len(items)} items"
        )

    yield AgentEvent(type="agent_complete", agent=self.name, result=self.results)
```

## Agent Configuration
Agents are configured with a model, prompt version, and custom settings:
```python
class StyleAnalysisAgent(BaseAgent):
    def __init__(self):
        super().__init__(
            name="style_analysis",
            model="gpt-5.1",
            prompt_version="v3",
            settings={
                "temperature": 0.3,
                "max_tokens": 2048,
                "timeout_ms": 30000,
                "retry_count": 2,
            }
        )

    async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]:
        yield AgentEvent(type="agent_start", agent=self.name)

        prompt = self.load_prompt(self.prompt_version)
        result = await self.llm.generate(
            prompt=prompt.format(**input_data),
            model=self.model,
            temperature=self.settings["temperature"],
            max_tokens=self.settings["max_tokens"]
        )

        yield AgentEvent(
            type="agent_complete",
            agent=self.name,
            result={"analysis": result}
        )
```

## Agent Versioning
Agent prompts are versioned to enable controlled rollouts and A/B testing. The platform supports multiple concurrent prompt versions:
| Version | Description |
|---|---|
| `v1` | Initial prompt, basic capabilities |
| `v2` | Improved accuracy with few-shot examples |
| `v2.2` | Refined edge case handling |
| `v3` | Current production version with structured output |
Prompt versions are stored alongside the agent code and loaded at runtime based on the agent's `prompt_version` configuration. This allows you to test a new prompt version on a subset of traffic before rolling it out to all users.
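The platform does not prescribe how that traffic split is chosen; one common approach is deterministic hashing on a stable user ID, sketched below. The function name, version strings, and percentages are hypothetical, not part of the platform API:

```python
import hashlib

def pick_prompt_version(user_id: str, candidate: str = "v3", stable: str = "v2",
                        rollout_pct: int = 10) -> str:
    # Hash the user ID into a stable bucket in [0, 100); the same user
    # always lands in the same bucket, so their prompt version never
    # flip-flops between requests.
    bucket = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
    return candidate if bucket < rollout_pct else stable
```

An agent could then set `agent.prompt_version = pick_prompt_version(user_id)` at request time, giving roughly `rollout_pct` percent of users the candidate prompt.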
```python
# Switch an agent to a new prompt version
agent = StyleAnalysisAgent()
agent.prompt_version = "v3"

# Or configure per-environment
if environment == "staging":
    agent.prompt_version = "v3.1-rc"
```

## Example: Analysis Agent
Here is a complete example of a custom analysis agent that processes text input and returns structured output:
```python
from src.agents.base import BaseAgent
from src.models.schemas import AgentEvent
from typing import AsyncIterator, Dict
import json


class ContentAnalysisAgent(BaseAgent):
    """Analyzes content and extracts structured metadata."""

    def __init__(self):
        super().__init__(
            name="content_analysis",
            model="gpt-4o-mini",
            prompt_version="v2",
            settings={
                "temperature": 0.1,
                "max_tokens": 1024,
            }
        )

    async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]:
        yield AgentEvent(type="agent_start", agent=self.name)

        content = input_data.get("content", "")
        if not content:
            yield AgentEvent(
                type="error",
                agent=self.name,
                error="No content provided",
                details={"input_keys": list(input_data.keys())}
            )
            return

        yield AgentEvent(
            type="agent_progress",
            agent=self.name,
            progress=10,
            message="Analyzing content..."
        )

        prompt = f"""Analyze the following content and return a JSON object with:
- "topic": main topic
- "sentiment": positive, negative, or neutral
- "key_points": list of 3-5 key points
- "word_count": approximate word count

Content:
{content}"""

        result_text = await self.llm.generate(
            prompt=prompt,
            model=self.model,
            temperature=self.settings["temperature"],
            max_tokens=self.settings["max_tokens"]
        )

        try:
            result = json.loads(result_text)
        except json.JSONDecodeError:
            result = {"raw_output": result_text}

        yield AgentEvent(
            type="agent_complete",
            agent=self.name,
            result=result
        )
```

## Testing Agents
Agents should be tested with pytest using async test fixtures:
```python
import pytest

from src.agents.content_analysis import ContentAnalysisAgent


@pytest.mark.asyncio
async def test_content_analysis_agent():
    agent = ContentAnalysisAgent()
    input_data = {"content": "The product launch was a success. Sales exceeded expectations."}

    events = []
    async for event in agent.execute(input_data):
        events.append(event)

    # Verify event sequence
    assert events[0].type == "agent_start"
    assert events[-1].type == "agent_complete"

    # Verify result structure
    result = events[-1].result
    assert "topic" in result
    assert "sentiment" in result
    assert result["sentiment"] in ["positive", "negative", "neutral"]


@pytest.mark.asyncio
async def test_content_analysis_empty_input():
    agent = ContentAnalysisAgent()
    input_data = {"content": ""}

    events = []
    async for event in agent.execute(input_data):
        events.append(event)

    assert events[-1].type == "error"
    assert "No content provided" in events[-1].error
```

Run agent tests with:
```bash
cd services/backend
poetry run pytest tests/agents/ -v
```

## Registering Agents
After creating an agent, register it in the orchestrator coordinator to make it available in pipelines:
```python
# services/backend/src/orchestrator/coordinator.py
from src.agents.content_analysis import ContentAnalysisAgent

orchestrator.add_agent("content_analysis", ContentAnalysisAgent())
```

See Building Pipelines for details on wiring agents into multi-agent workflows.
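Internally, `add_agent` presumably amounts to a name-keyed registry that pipelines use to look agents up. The class below is a minimal, hypothetical sketch of that idea; the real coordinator in `src/orchestrator/coordinator.py` may add validation, lifecycle hooks, or other behavior:

```python
class Orchestrator:
    """Minimal sketch of a name-keyed agent registry."""

    def __init__(self):
        self._agents = {}

    def add_agent(self, name: str, agent: object) -> None:
        # Reject duplicate names so two agents can't silently shadow
        # each other in pipeline definitions.
        if name in self._agents:
            raise ValueError(f"agent {name!r} already registered")
        self._agents[name] = agent

    def get_agent(self, name: str) -> object:
        return self._agents[name]

orchestrator = Orchestrator()
orchestrator.add_agent("content_analysis", object())
```

Registering by string name is what lets pipeline configurations refer to agents declaratively rather than importing agent classes directly.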