# Python SDK
The curate-me Python SDK provides a complete client for the Curate-Me AI Gateway and managed runner platform. It includes typed models, async support, automatic retry with exponential backoff, and streaming via SSE.
## Installation
```bash
pip install curate-me
```

Requires Python 3.9 or later. The SDK uses `httpx` for HTTP and `pydantic` for typed models.
## Quick Start
### Gateway Integration (Zero Code Changes)
The fastest way to use Curate-Me is to point your existing OpenAI or Anthropic SDK at the gateway. No new libraries needed:
```python
from openai import OpenAI

client = OpenAI(
    base_url="https://api.curate-me.ai/v1/openai",
    default_headers={"X-CM-API-Key": "cm_sk_xxx"},
)

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
print(response.choices[0].message.content)
```

### Using the SDK Gateway Wrapper
For more control, use the `CurateGateway` wrapper, which configures provider SDKs automatically:
```python
from curate_me.gateway import CurateGateway

gw = CurateGateway(api_key="cm_sk_xxx", gateway_url="https://api.curate-me.ai")

# Option A: pass your own provider key
client = gw.openai(provider_key="sk-your-openai-key")

# Option B: use a stored secret (no provider key needed)
client = gw.openai()

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}],
)
```

### Using the Main Client
The `CurateMe` client provides access to all platform APIs:
```python
from curate_me import CurateMe

# Initialize with an API key
client = CurateMe(api_key="cm_xxx", org_id="org_xxx")

# Or from environment variables (CURATE_ME_API_KEY, CURATE_ME_ORG_ID)
client = CurateMe.from_env()

# Use as an async context manager
async with CurateMe.from_env() as client:
    agents = await client.agents.list()
    costs = await client.costs.summary()
```

## Gateway Methods
### CurateGateway
| Method | Description |
|---|---|
| `gw.openai(provider_key=...)` | Get an OpenAI client configured for the gateway |
| `gw.anthropic(provider_key=...)` | Get an Anthropic client configured for the gateway |
### GatewayAdmin
```python
from curate_me.gateway import GatewayAdmin

admin = GatewayAdmin(api_key="cm_sk_xxx")

# Usage and cost tracking
usage = await admin.get_usage(days=7)
costs = await admin.get_daily_costs(days=30)

# Governance policies
policies = await admin.get_policies()
await admin.update_policies(daily_budget_usd=50.0, rate_limit_rpm=100)

# API key management
keys = await admin.list_keys()
key = await admin.create_key(name="production", scopes=["chat", "admin"])
await admin.revoke_key(key_id="key_xxx")
```

## Runner Methods
The `client.runners` property provides typed, high-level runner lifecycle management:
```python
async with CurateMe(api_key="cm_xxx", org_id="org_xxx") as client:
    # List runners
    runners = await client.runners.list(status="running", limit=10)

    # Create a new runner
    runner = await client.runners.create(
        template="default",
        tool_profile="locked",  # locked | web_automation | full_vm_tools
        name="my-runner",
        max_cost=5.0,  # USD cost cap
    )

    # Start a stopped runner
    runner = await client.runners.start(runner.runner_id)

    # Execute a command
    result = await client.runners.execute(
        runner.runner_id,
        "echo 'Hello from runner!'",
        timeout=30,
    )
    print(result.stdout)       # "Hello from runner!"
    print(result.exit_code)    # 0
    print(result.duration_ms)  # 150

    # Get logs
    logs = await client.runners.logs(runner.runner_id, lines=50)
    for entry in logs:
        print(f"[{entry.level}] {entry.message}")

    # Stream logs in real time
    async for entry in client.runners.stream_logs(runner.runner_id):
        print(f"[{entry.level}] {entry.message}")

    # Stop and delete
    await client.runners.stop(runner.runner_id)
    await client.runners.delete(runner.runner_id)
```

## Runner Models
| Model | Fields |
|---|---|
| `Runner` | `runner_id`, `name`, `status`, `template`, `tool_profile`, `created_at`, `cost_total`, `org_id` |
| `CommandResult` | `exit_code`, `stdout`, `stderr`, `duration_ms` |
| `LogEntry` | `timestamp`, `level`, `message`, `runner_id` |
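To make the shape of these models concrete, here is an illustrative sketch of `CommandResult` as a plain dataclass. The field types are assumptions inferred from the usage examples above (the real SDK models are `pydantic` classes, not these definitions):

```python
from dataclasses import dataclass

@dataclass
class CommandResult:
    """Sketch of the SDK's CommandResult; types are inferred, not authoritative."""
    exit_code: int    # process exit status, 0 on success
    stdout: str       # captured standard output
    stderr: str       # captured standard error
    duration_ms: int  # wall-clock execution time in milliseconds
```

A result like the `execute()` example above would then look like `CommandResult(exit_code=0, stdout="Hello from runner!\n", stderr="", duration_ms=150)`.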
## Advanced Runner API (via GatewayAdmin)
For full control over runner sessions, artifacts, egress policies, quotas, and more:
```python
from curate_me.gateway import GatewayAdmin

admin = GatewayAdmin(api_key="cm_sk_xxx")
runners = admin.runners

# Full lifecycle with explicit session management
runner = await runners.create(tool_profile="web_automation")
session = await runners.start_session(runner["runner_id"])
result = await runners.exec_command(
    runner["runner_id"], session["session_id"], ["npm", "test"]
)
await runners.stop_session(runner["runner_id"], session["session_id"])
await runners.terminate(runner["runner_id"])

# Async command jobs
job = await runners.enqueue_command(
    runner["runner_id"], session["session_id"], ["python", "train.py"],
    timeout_seconds=300,
)
result = await runners.wait_for_command(runner["runner_id"], job["command_id"])

# Egress policy
await runners.update_egress_policy(
    allowed_domains=["api.openai.com", "pypi.org"],
    allowed_cidrs=[],
    allow_all=False,
)

# Quotas
quotas = await runners.get_quotas()
await runners.update_quotas(max_runners=20)
```

## Error Handling
The SDK provides specific exception types for different error conditions:
```python
from curate_me import CurateMe, RateLimitError, AuthenticationError
from curate_me.gateway import GatewayGovernanceError

try:
    result = await client.agents.run("agent_id", input={"query": "Hello"})
except RateLimitError as e:
    print(f"Rate limited. Retry after {e.retry_after} seconds")
except AuthenticationError:
    print("Invalid API key")
except GatewayGovernanceError as e:
    print(f"Governance policy denied: {e.message}")
```

| Exception | When |
|---|---|
| `AuthenticationError` | Invalid or missing API key |
| `AuthorizationError` | Insufficient permissions |
| `RateLimitError` | Rate limit or budget exceeded (HTTP 429) |
| `NotFoundError` | Resource not found |
| `ValidationError` | Invalid request parameters |
| `ServerError` | Gateway or provider server error |
| `GatewayGovernanceError` | Governance policy denied the request |
| `GatewayRateLimitError` | Gateway-specific rate limit |
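The SDK already retries transient errors for you, but if you want a manual retry loop around a larger unit of work (say, a whole agent run), a minimal exponential-backoff sketch might look like the following. Only `RateLimitError` and its `retry_after` attribute come from the table above; the `with_backoff` helper is hypothetical, not part of the SDK, and a stand-in exception class is defined here so the sketch is self-contained:

```python
import asyncio
import random

class RateLimitError(Exception):
    """Stand-in for curate_me.RateLimitError; carries retry_after in seconds."""
    def __init__(self, retry_after: float = 1.0):
        self.retry_after = retry_after

async def with_backoff(call, max_retries: int = 5, base_delay: float = 0.5):
    """Retry an async callable on rate limits, honoring retry_after when set."""
    for attempt in range(max_retries):
        try:
            return await call()
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise  # out of retries; surface the error
            # Prefer the server's hint; otherwise exponential backoff with jitter
            delay = e.retry_after or base_delay * (2 ** attempt) * (1 + random.random())
            await asyncio.sleep(delay)
```

You would then wrap a call as `await with_backoff(lambda: client.agents.run("agent_id", input={"query": "Hello"}))`.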
## Configuration
The SDK reads from environment variables or accepts explicit parameters:
| Environment Variable | Description | Default |
|---|---|---|
| `CURATE_ME_API_KEY` | Your API key | (required) |
| `CURATE_ME_ORG_ID` | Organization ID | (optional) |
| `CURATE_ME_BASE_URL` | API base URL | `https://api.curate-me.ai/api/v1` |
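`CurateMe.from_env()` presumably resolves these variables internally; equivalent logic, sketched with `os.environ` (the variable names and defaults come from the table above, but this helper is illustrative, not the SDK's actual code):

```python
import os

def load_config() -> dict:
    """Resolve SDK settings from the environment; illustrative sketch only."""
    api_key = os.environ.get("CURATE_ME_API_KEY")
    if not api_key:
        raise RuntimeError("CURATE_ME_API_KEY is required")
    return {
        "api_key": api_key,
        "org_id": os.environ.get("CURATE_ME_ORG_ID"),  # optional
        "base_url": os.environ.get(
            "CURATE_ME_BASE_URL", "https://api.curate-me.ai/api/v1"
        ),
    }
```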
```python
client = CurateMe(
    api_key="cm_xxx",
    org_id="org_xxx",
    base_url="http://localhost:8001/api/v1",  # Local development
    timeout=120.0,          # Request timeout in seconds
    max_retries=5,          # Retry attempts for transient errors
    tracing_enabled=True,   # Enable automatic tracing
)
```

## Streaming
The SDK supports SSE streaming for real-time responses:
```python
# Stream agent execution
async for line in client._http.stream_post(
    f"/admin/agents/{agent_id}/run",
    json={"query": "Hello"},
):
    print(line)

# Resilient streaming with automatic reconnection
from curate_me import resilient_openai_stream

async for chunk in resilient_openai_stream(
    openai_client, model="gpt-4o", messages=messages
):
    print(chunk)
```
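On the wire, SSE responses arrive as `data:`-prefixed lines, with OpenAI-style streams ending in a `[DONE]` sentinel. If you ever need to consume such a stream directly, a minimal parser sketch (not the SDK's actual implementation) might look like this:

```python
import json

def parse_sse_events(lines):
    """Yield decoded JSON payloads from an SSE line stream.
    Minimal sketch: handles `data:` fields and the OpenAI-style `[DONE]`
    sentinel; ignores blank keep-alives, comments, and other field names."""
    for line in lines:
        line = line.strip()
        if not line.startswith("data:"):
            continue  # skip event:/id: fields, comments, blank lines
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            break  # end-of-stream sentinel
        yield json.loads(payload)

events = list(parse_sse_events([
    'data: {"delta": "Hel"}',
    '',
    'data: {"delta": "lo"}',
    'data: [DONE]',
]))
# events == [{"delta": "Hel"}, {"delta": "lo"}]
```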