First Pipeline

This tutorial walks through building a multi-step pipeline with the Curate-Me Python SDK. A pipeline in Curate-Me is a workflow: a set of steps, each backed by an agent, wired together with dependencies. The platform resolves the execution graph, runs independent steps in parallel, and streams structured events as the run progresses.

Note: All execution happens server-side. The SDK is a thin client over the Curate-Me API — you define and run workflows; the platform handles orchestration, governance, and cost tracking. There is no client-side orchestrator class to subclass.

Install and authenticate

pip install curate-me
from curate_me import CurateMe

# Pass credentials explicitly...
client = CurateMe(api_key="cm_xxx", org_id="org_xxx")

# ...or read CURATE_ME_API_KEY / CURATE_ME_ORG_ID from the environment
client = CurateMe.from_env()
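
Before relying on the environment-based constructor, it can help to confirm that both variables are actually set. The following is a minimal sketch using only the standard library; the helper name missing_credentials is hypothetical and is not part of the SDK.

```python
import os

# Environment variable names taken from the tutorial text above.
REQUIRED_VARS = ("CURATE_ME_API_KEY", "CURATE_ME_ORG_ID")


def missing_credentials(env=None):
    """Return the names of required variables that are unset or empty.

    Hypothetical helper for illustration; not part of the Curate-Me SDK.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_credentials()
    if missing:
        print("Missing environment variables:", ", ".join(missing))
    else:
        print("Both credential variables are set.")
```

Passing a plain dict instead of os.environ makes the check easy to exercise in tests without touching the real environment.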

Reference an agent

Workflow steps run agents that already exist in your org. You can list the agents available to your key, or create one with a model and config:

# List existing agents
agents = await client.agents.list()

# Or create a new one
summarizer = await client.agents.create(
    name="summarizer",
    model="claude-sonnet-4-6",
    description="Summarizes input text",
)
print(summarizer.id)  # e.g. "agt_123", an opaque ID you reference in steps

Each step in a workflow points at an agent by its ID via the agent_id field.
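
The snippets in this tutorial use await directly, which works in a notebook or in the python -m asyncio REPL. In a regular script, the calls need to live inside a coroutine that is started with asyncio.run. The sketch below shows that shape; fake_list_agents is a hypothetical stand-in for an SDK call so the example runs without credentials, and the IDs it returns are made up.

```python
import asyncio


async def fake_list_agents():
    """Hypothetical stand-in for an awaited SDK call; returns made-up agent IDs."""
    await asyncio.sleep(0)  # yield to the event loop once, as a network call would
    return ["agt_summary", "agt_analysis", "agt_report"]


async def main():
    # In a real script, the awaited SDK calls from this tutorial would go here.
    agents = await fake_list_agents()
    return agents


if __name__ == "__main__":
    print(asyncio.run(main()))
```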

Create a workflow

A workflow is a list of step definitions. Each step has a name, the agent_id it runs, and an optional depends_on list. Steps with no dependencies on each other run in parallel; a step with depends_on waits for the listed steps to finish before it starts.

workflow = await client.workflows.create(
    name="summarize-and-report",
    steps=[
        {"name": "summary", "agent_id": "agt_summary"},
        {"name": "analysis", "agent_id": "agt_analysis"},
        {
            "name": "report",
            "agent_id": "agt_report",
            "depends_on": ["summary", "analysis"],
        },
    ],
)
print(workflow.id)  # e.g. "wf_456"

Here summary and analysis have no dependency on each other, so the platform runs them in parallel. report declares depends_on: ["summary", "analysis"], so it only starts once both upstream steps complete.
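
To build intuition for how depends_on turns into parallel groups, the step list can be resolved locally with the standard library's graphlib module. This is only an illustration of dependency resolution in general, not the platform's actual scheduler; the function name execution_waves is hypothetical.

```python
from graphlib import TopologicalSorter


def execution_waves(steps):
    """Group step names into waves; every step in a wave has all dependencies met."""
    # Map each step name to the set of step names it depends on.
    graph = {step["name"]: set(step.get("depends_on", [])) for step in steps}
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if dependencies form a cycle
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # steps that could run in parallel now
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave finished to unlock dependents
    return waves


steps = [
    {"name": "summary", "agent_id": "agt_summary"},
    {"name": "analysis", "agent_id": "agt_analysis"},
    {"name": "report", "agent_id": "agt_report", "depends_on": ["summary", "analysis"]},
]

print(execution_waves(steps))  # [['analysis', 'summary'], ['report']]
```

The first wave holds the two independent steps, and report appears alone in the second wave because it waits on both.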

Run the workflow

Call run() to execute the workflow and get the final WorkflowRun result:

run = await client.workflows.run(
    workflow.id,
    input={"text": "Curate-Me is a platform for orchestrating AI work..."},
)
print(run.status)        # ExecutionStatus, e.g. "completed"
print(run.step_results)  # per-step outputs keyed by step name
print(run.total_cost)    # total USD cost recorded for the run
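
The three attributes above are enough to build a short report for logs or a terminal. The sketch below uses a stand-in class, FakeRun, so it runs without the SDK; it assumes only that a run exposes status, step_results (a mapping keyed by step name), and total_cost, as described above. The exact types on the real WorkflowRun may differ.

```python
from dataclasses import dataclass, field


@dataclass
class FakeRun:
    """Hypothetical stand-in for WorkflowRun with only the attributes used here."""
    status: str
    step_results: dict = field(default_factory=dict)
    total_cost: float = 0.0


def summarize_run(run):
    """Build a short, human-readable report from a run-like object."""
    lines = [f"status: {run.status}", f"total cost: ${run.total_cost:.4f}"]
    for step_name, output in run.step_results.items():
        lines.append(f"- {step_name}: {output}")
    return "\n".join(lines)


example = FakeRun("completed", {"summary": "short text"}, 0.0123)
print(summarize_run(example))
```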

Stream execution events

To watch the run as it happens, use stream(). It yields StreamEvent objects with a typed event.type (an EventType) and an event.data payload:

async for event in client.workflows.stream(
    workflow.id,
    input={"text": "Curate-Me is a platform for orchestrating AI work..."},
):
    if event.type == "workflow_start":
        print("[START] workflow")
    elif event.type == "agent_start":
        print(f"[START] {event.data.get('step') or event.data.get('agent')}")
    elif event.type == "agent_complete":
        print(f"[DONE] {event.data.get('step')}: {event.data.get('result')}")
    elif event.type == "workflow_complete":
        print("Pipeline finished.")
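
As the number of handled event types grows, a lookup table of handlers can be easier to maintain than a long if/elif chain. The sketch below shows one way to structure that. FakeEvent is a hypothetical stand-in for StreamEvent, and the data keys (step, agent, result) are the ones used in the loop above. Keying the table by plain strings assumes event.type compares and hashes like a string, as the comparisons above suggest.

```python
from dataclasses import dataclass, field


@dataclass
class FakeEvent:
    """Hypothetical stand-in for StreamEvent: a type string plus a data payload."""
    type: str
    data: dict = field(default_factory=dict)


def on_agent_start(event):
    return f"[START] {event.data.get('step') or event.data.get('agent')}"


def on_agent_complete(event):
    return f"[DONE] {event.data.get('step')}: {event.data.get('result')}"


# Map each handled event type to a function that renders one log line.
HANDLERS = {
    "workflow_start": lambda event: "[START] workflow",
    "agent_start": on_agent_start,
    "agent_complete": on_agent_complete,
    "workflow_complete": lambda event: "Pipeline finished.",
}


def render(event):
    """Return a log line for the event, or None if its type is not handled."""
    handler = HANDLERS.get(event.type)
    return handler(event) if handler else None


print(render(FakeEvent("agent_complete", {"step": "summary", "result": "ok"})))
```

Unhandled types such as token simply return None, so new event types do not break the loop.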

Event types

The platform emits structured StreamEvent objects as a workflow runs. The type field is one of the SDK’s EventType values:

| Type | Description |
| --- | --- |
| workflow_start | The workflow run has begun |
| workflow_step | Execution advanced to a new step |
| agent_start | A step’s agent has begun processing |
| token | Incremental streaming token from an LLM call |
| agent_progress | A step’s agent reported progress |
| agent_complete | A step’s agent finished with a result payload |
| agent_error | A step’s agent encountered an error |
| guardrail_check | A governance/guardrail check ran for a step |
| workflow_complete | All steps finished successfully |
| workflow_error | The workflow run failed |

These are defined in the SDK’s EventType enum (from curate_me import EventType).
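
When consuming a stream, it is often useful to know whether the run has ended and whether any errors were seen. The sketch below works on plain type strings copied from the list above, so it runs without the SDK. Treating workflow_complete and workflow_error as the final events of a run is an assumption based on their descriptions, and scan_events is a hypothetical helper name.

```python
# Event type names copied from the list above.
ERROR_TYPES = {"agent_error", "workflow_error"}
# Assumption: these two types mark the end of a run.
TERMINAL_TYPES = {"workflow_complete", "workflow_error"}


def scan_events(event_types):
    """Return (finished, errors) for an iterable of event type strings."""
    errors = []
    finished = False
    for event_type in event_types:
        if event_type in ERROR_TYPES:
            errors.append(event_type)
        if event_type in TERMINAL_TYPES:
            finished = True
            break  # stop reading once a terminal event arrives
    return finished, errors


print(scan_events(["workflow_start", "agent_error", "workflow_error"]))
```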

Next steps