Skip to Content
SdkCreating Agents

Creating Agents

Agents are the fundamental building blocks of the Curate-Me platform. Every agent extends the BaseAgent abstract class and implements a streaming execute() method that yields AgentEvent objects as it processes input.

BaseAgent

The BaseAgent class provides the contract that all agents must implement:

from src.agents.base import BaseAgent from src.models.schemas import AgentEvent from typing import AsyncIterator, Dict class MyAgent(BaseAgent): """A custom agent that processes input data.""" async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]: # Signal that this agent has started yield AgentEvent(type="agent_start", agent=self.name) # Your processing logic here result = await self.process(input_data) # Signal completion with the result yield AgentEvent( type="agent_complete", agent=self.name, result=result )

The execute() method is an async generator that receives input data and yields events as the agent progresses through its work. This streaming pattern enables real-time progress updates to the frontend via SSE.

AgentEvent Types

Agents communicate their progress through a set of typed events:

Event TypePurposePayload
agent_startSignals the agent has begun executionagent name
tokenStreams individual tokens during LLM generationtoken string, agent name
agent_progressReports intermediate progressagent name, progress percentage, message
agent_completeSignals successful completionagent name, result dict
errorReports an error during executionagent name, error message, details

Streaming Tokens

For agents that perform LLM generation, yield token events to stream output to the client in real time:

async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]: yield AgentEvent(type="agent_start", agent=self.name) async for token in self.llm.stream_generate( prompt=self.build_prompt(input_data), model=self.config.model ): yield AgentEvent(type="token", agent=self.name, token=token) yield AgentEvent(type="agent_complete", agent=self.name, result=self.get_result())

Reporting Progress

For long-running operations, yield agent_progress events to keep the client informed:

async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]: yield AgentEvent(type="agent_start", agent=self.name) items = input_data["items"] for i, item in enumerate(items): await self.process_item(item) yield AgentEvent( type="agent_progress", agent=self.name, progress=(i + 1) / len(items) * 100, message=f"Processed {i + 1}/{len(items)} items" ) yield AgentEvent(type="agent_complete", agent=self.name, result=self.results)

Agent Configuration

Agents are configured with a model, prompt version, and custom settings:

class StyleAnalysisAgent(BaseAgent): def __init__(self): super().__init__( name="style_analysis", model="gpt-5.1", prompt_version="v3", settings={ "temperature": 0.3, "max_tokens": 2048, "timeout_ms": 30000, "retry_count": 2, } ) async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]: yield AgentEvent(type="agent_start", agent=self.name) prompt = self.load_prompt(self.prompt_version) result = await self.llm.generate( prompt=prompt.format(**input_data), model=self.model, temperature=self.settings["temperature"], max_tokens=self.settings["max_tokens"] ) yield AgentEvent( type="agent_complete", agent=self.name, result={"analysis": result} )

Agent Versioning

Agent prompts are versioned to enable controlled rollouts and A/B testing. The platform supports multiple concurrent prompt versions:

VersionDescription
v1Initial prompt, basic capabilities
v2Improved accuracy with few-shot examples
v2.2Refined edge case handling
v3Current production version with structured output

Prompt versions are stored alongside the agent code and loaded at runtime based on the agent’s prompt_version configuration. This allows you to test a new prompt version on a subset of traffic before rolling it out to all users.

# Switch an agent to a new prompt version agent = StyleAnalysisAgent() agent.prompt_version = "v3" # Or configure per-environment if environment == "staging": agent.prompt_version = "v3.1-rc"

Example: Analysis Agent

Here is a complete example of a custom analysis agent that processes text input and returns structured output:

from src.agents.base import BaseAgent from src.models.schemas import AgentEvent from typing import AsyncIterator, Dict import json class ContentAnalysisAgent(BaseAgent): """Analyzes content and extracts structured metadata.""" def __init__(self): super().__init__( name="content_analysis", model="gpt-4o-mini", prompt_version="v2", settings={ "temperature": 0.1, "max_tokens": 1024, } ) async def execute(self, input_data: Dict) -> AsyncIterator[AgentEvent]: yield AgentEvent(type="agent_start", agent=self.name) content = input_data.get("content", "") if not content: yield AgentEvent( type="error", agent=self.name, error="No content provided", details={"input_keys": list(input_data.keys())} ) return yield AgentEvent( type="agent_progress", agent=self.name, progress=10, message="Analyzing content..." ) prompt = f"""Analyze the following content and return a JSON object with: - "topic": main topic - "sentiment": positive, negative, or neutral - "key_points": list of 3-5 key points - "word_count": approximate word count Content: {content}""" result_text = await self.llm.generate( prompt=prompt, model=self.model, temperature=self.settings["temperature"], max_tokens=self.settings["max_tokens"] ) try: result = json.loads(result_text) except json.JSONDecodeError: result = {"raw_output": result_text} yield AgentEvent( type="agent_complete", agent=self.name, result=result )

Testing Agents

Agents should be tested with pytest using async test fixtures:

import pytest from src.agents.content_analysis import ContentAnalysisAgent @pytest.mark.asyncio async def test_content_analysis_agent(): agent = ContentAnalysisAgent() input_data = {"content": "The product launch was a success. Sales exceeded expectations."} events = [] async for event in agent.execute(input_data): events.append(event) # Verify event sequence assert events[0].type == "agent_start" assert events[-1].type == "agent_complete" # Verify result structure result = events[-1].result assert "topic" in result assert "sentiment" in result assert result["sentiment"] in ["positive", "negative", "neutral"] @pytest.mark.asyncio async def test_content_analysis_empty_input(): agent = ContentAnalysisAgent() input_data = {"content": ""} events = [] async for event in agent.execute(input_data): events.append(event) assert events[-1].type == "error" assert "No content provided" in events[-1].error

Run agent tests with:

cd services/backend poetry run pytest tests/agents/ -v

Registering Agents

After creating an agent, register it in the orchestrator coordinator to make it available in pipelines:

# services/backend/src/orchestrator/coordinator.py from src.agents.content_analysis import ContentAnalysisAgent orchestrator.add_agent("content_analysis", ContentAnalysisAgent())

See Building Pipelines for details on wiring agents into multi-agent workflows.