Observer SDK
The Observer SDK provides distributed tracing with per-span cost tracking and human-in-the-loop (HITL) checkpoint decorators. It ships in two languages — Python for backend instrumentation and TypeScript for real-time session observation via WebSocket.
Installation
# Python
pip install curate-me-observer
# TypeScript
npm install @curate-me/observer-sdk
Python: CurateMeTracer
The tracer creates spans, propagates W3C traceparent headers across services, and accumulates per-span cost data. Completed spans are sent to the gateway for storage and visualization in the time-travel debugger.
Quick Start
from observer_sdk.tracer import CurateMeTracer, hitl_checkpoint
tracer = CurateMeTracer(api_key="cm_sk_xxx")
# Context manager — creates a span, records timing and cost
with tracer.span("fetch_data", metadata={"user_id": "u_123"}) as s:
    data = fetch_data()
    tracer.record_cost(0.003, "gpt-4o")
    tracer.record_tokens(500, 150, "gpt-4o")

print(f"Duration: {s.duration_ms}ms, Cost: ${s.cost:.4f}")
Constructor
CurateMeTracer(
    api_key: str,                                # Gateway API key (cm_sk_xxx)
    gateway_url: str = "https://api.curate-me.ai",
    *,
    enabled: bool = True,                        # Set False to no-op all spans
    on_span_end: Callable[[Span], None] = None,  # Callback when span ends
)
When enabled=False, spans are no-ops — zero overhead in production if you need to disable tracing temporarily.
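One common pattern (an assumption on our part, not something the SDK mandates) is to drive the enabled flag from an environment variable so tracing can be toggled without a code change. The helper name and variable below are illustrative:

```python
import os

# Hypothetical helper (not part of the SDK): resolve the `enabled` flag
# from an environment variable; set OBSERVER_TRACING=0 to no-op all spans.
def tracing_enabled(default: bool = True) -> bool:
    raw = os.getenv("OBSERVER_TRACING")
    if raw is None:
        return default
    return raw.strip().lower() not in ("0", "false", "off", "no")
```

The result can then be passed straight through: CurateMeTracer(api_key="cm_sk_xxx", enabled=tracing_enabled()).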
Span Context Manager
with tracer.span(
    name="llm_call",
    metadata={"model": "gpt-4o"},
    parent_traceparent="00-abc123...-def456...-01",  # Optional: join existing trace
) as span:
    response = call_llm()
    tracer.record_cost(0.01, "gpt-4o")
Spans nest automatically. If you open a span inside another span, the inner span becomes a child in the trace tree.
Trace Decorator
Works for both sync and async functions. The span name defaults to the function name.
@tracer.trace("my_func")
def my_func(x: int) -> int:
    return x * 2

@tracer.trace()  # Uses function name as span name
async def async_func():
    await do_work()
Recording Cost and Tokens
Must be called from within a span() context or @trace()-decorated function.
# Record a cost entry for a specific model
tracer.record_cost(amount=0.005, model="gpt-4o")
# Record token usage
tracer.record_tokens(input_tokens=500, output_tokens=150, model="gpt-4o")
W3C Trace Context Propagation
Pass trace context across service boundaries using the standard traceparent header.
# Service A: get the traceparent to pass downstream
with tracer.span("service_a_work") as span:
    traceparent = tracer.get_traceparent()
    # Pass traceparent as a header to Service B
    response = httpx.post(
        "http://service-b/api",
        headers={"traceparent": traceparent},
    )

# Service B: continue the trace
incoming_traceparent = request.headers.get("traceparent")
with tracer.span("service_b_work", parent_traceparent=incoming_traceparent):
    process_request()
Retrieving Spans
# Get the currently active span
current = tracer.get_current_span()
# Get all completed spans (useful for testing)
spans = tracer.get_completed_spans()
Python: SpanCostTracker
Accumulates costs and token usage within a single span, with breakdown by model.
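The accumulation semantics can be mirrored with a small stand-alone sketch (illustrative only, not the SDK source): costs keyed by model, a total over all models, and merge as per-model addition:

```python
from collections import defaultdict

class ToyCostTracker:
    """Illustrative stand-in mirroring the per-model accumulation
    semantics described above (not the SDK's implementation)."""

    def __init__(self) -> None:
        self._cost_by_model: defaultdict = defaultdict(float)

    def add_cost(self, amount: float, model: str) -> None:
        self._cost_by_model[model] += amount

    def get_total(self) -> float:
        return sum(self._cost_by_model.values())

    def get_breakdown(self) -> dict:
        return dict(self._cost_by_model)

    def merge(self, other: "ToyCostTracker") -> None:
        # Merging adds the other tracker's per-model totals into this one.
        for model, amount in other.get_breakdown().items():
            self._cost_by_model[model] += amount
```

The real API below follows the same shape, adding token tracking and serialization on top.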
from observer_sdk.cost_tracker import SpanCostTracker
tracker = SpanCostTracker()
tracker.add_cost(0.003, "gpt-4o")
tracker.add_cost(0.001, "gpt-4o")
tracker.add_cost(0.0005, "claude-haiku-4-5")
tracker.get_total() # 0.0045
tracker.get_breakdown() # {"gpt-4o": 0.004, "claude-haiku-4-5": 0.0005}
# Token tracking
tracker.add_tokens("gpt-4o", input_tokens=500, output_tokens=150)
tracker.get_token_breakdown() # {"gpt-4o": {"input": 500, "output": 150}}
tracker.get_total_tokens() # {"input": 500, "output": 150}
# Merge data from another tracker
tracker.merge(other_tracker)
# Serialize for storage
tracker.to_dict() # {total_cost, cost_by_model, tokens_by_model, ...}
# Reset
tracker.reset()
Python: HITL Checkpoint Decorator
Gates function execution on human approval when estimated cost exceeds a threshold.
from observer_sdk.tracer import hitl_checkpoint
@hitl_checkpoint(
    reason="Expensive batch processing",
    threshold_usd=5.0,
    timeout_seconds=300.0,  # Max wait for approval (default: 5 min)
    poll_interval=2.0,      # Seconds between status polls
)
async def run_batch(items: list) -> list:
    return await process_all(items)
When the estimated cost (from the current span’s cost tracker) exceeds threshold_usd:
- An approval request is sent to the gateway
- The decorator polls for approval status
- If approved, the function executes normally
- If denied or timed out, raises HITLDeniedError
The checkpoint is also recorded as a timeline event for the time-travel debugger.
Works for both sync and async functions. Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| reason | str | required | Human-readable reason shown in approval UI |
| threshold_usd | float | 1.0 | Cost threshold that triggers approval |
| timeout_seconds | float | 300.0 | Max seconds to wait for approval |
| poll_interval | float | 2.0 | Seconds between approval status polls |
| gateway_url | str | None | Override gateway URL |
| api_key | str | None | Override API key |
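The approve/deny/timeout flow above can be sketched as a polling loop. Everything here is illustrative: wait_for_approval and check_status are hypothetical names (check_status stands in for the gateway status call), and the injectable clock/sleep exist only to make the sketch testable:

```python
import time

class HITLDeniedError(Exception):
    """Raised when an approval request is denied or times out."""

def wait_for_approval(check_status, timeout_seconds: float = 300.0,
                      poll_interval: float = 2.0,
                      clock=time.monotonic, sleep=time.sleep) -> None:
    # Poll until the request resolves or the deadline passes.
    # `check_status` is expected to return "pending", "approved", or "denied".
    deadline = clock() + timeout_seconds
    while clock() < deadline:
        status = check_status()
        if status == "approved":
            return
        if status == "denied":
            raise HITLDeniedError("approval denied")
        sleep(poll_interval)
    raise HITLDeniedError("approval timed out")
```

Note that both the denied and timed-out paths surface as the same exception type, matching the decorator's documented behavior.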
TypeScript: Observer
The TypeScript Observer provides real-time session observation via WebSocket. It streams agent execution events to the dashboard for live monitoring and HITL intervention.
Quick Start
import { Observer } from '@curate-me/observer-sdk';
const observer = new Observer({
  serverUrl: 'wss://api.curate-me.ai/ws/observe',
  orgId: 'org_12345',
  apiKey: 'cm_sk_xxx',
});

observer.on('onIntervention', (msg) => {
  console.log('Human intervention:', msg);
});

await observer.connect();
const sessionId = await observer.startSession({ workflowId: 'wf_123' });

// Track agent lifecycle
observer.trackAgentStart('VisionAgent', 'vision', { imageUrl: '...' });
observer.trackLLMRequest('openai', 'gpt-4o', 500, 4096, 0.7);
observer.trackLLMResponse('openai', 'gpt-4o', 150, 650, 1200, 'stop', 0.01);
observer.trackAgentComplete('VisionAgent', 'vision', { result: '...' }, 1500, 0.01);

await observer.endSession('completed');
await observer.disconnect();
Configuration
const observer = new Observer({
  serverUrl: string;              // WebSocket server URL
  orgId: string;                  // Organization ID
  apiKey?: string;                // API key auth
  authToken?: string;             // JWT auth (alternative)
  compression?: boolean;          // Enable compression (default: true)
  batchSize?: number;             // Events per batch (default: 50)
  flushInterval?: number;         // Flush interval ms (default: 1000)
  maxBufferSize?: number;         // Max buffer before dropping (default: 1000)
  debug?: boolean;                // Log events to console
  reconnect?: {
    enabled?: boolean;            // Auto-reconnect (default: true)
    initialDelay?: number;        // Initial delay ms (default: 1000)
    maxDelay?: number;            // Max delay ms (default: 30000)
    maxAttempts?: number;         // Max attempts (default: 10)
  };
  privacy?: {
    redactUserInput?: boolean;    // Redact user input (default: true)
    redactPrompts?: boolean;      // Redact LLM prompts (default: true)
    redactFields?: string[];      // Fields to always redact
    // Default: ['password', 'apiKey', 'token', 'secret', 'creditCard']
  };
});
Event Handlers
observer.on('onConnect', () => { ... });
observer.on('onDisconnect', (reason: string) => { ... });
observer.on('onError', (error: Error) => { ... });
observer.on('onPause', (message) => { ... }); // Server paused the session
observer.on('onResume', (message) => { ... }); // Session resumed
observer.on('onIntervention', (message) => { ... }); // Human intervention received
observer.on('onStateChange', (state: ConnectionState) => { ... });
observer.on('onAck', (lastAckedSequence: number) => { ... });
Connection States
DISCONNECTED | CONNECTING | CONNECTED | RECONNECTING | PAUSED | ERROR
Tracking Methods
| Method | Purpose |
|---|---|
| trackAgentStart(name, type, input, options?) | Agent began execution |
| trackAgentComplete(name, type, output, duration, cost?, tokens?) | Agent finished |
| trackAgentError(name, type, error, recoverable?) | Agent failed |
| trackAgentProgress(name, progress, message?, step?) | Progress update (0-100) |
| trackPipelineStart(id, name, totalSteps, input) | Pipeline began |
| trackPipelineComplete(id, name, output, duration, cost?, tokens?) | Pipeline finished |
| trackPipelineStep(id, stepIndex, name, status, duration?, output?) | Step status change |
| trackLLMRequest(provider, model, promptTokens, maxTokens?, temp?, preview?) | LLM call started |
| trackLLMResponse(provider, model, outputTokens, totalTokens, latency, reason, cost?) | LLM response received |
| trackHITLPause(reason, gateType, state, options?) | Execution paused for approval |
| trackCheckpoint(id, nodeId, stateSize, parentId?, metadata?) | Checkpoint created |
| trackLog(level, message, data?) | Generic log event |
| trackMetric(name, value, unit?, tags?) | Metric event |
| trackError(message, code?, context?, fatal?) | Error event |
Buffer Management
Events are batched and flushed automatically. Critical events (HITL pauses) flush immediately.
// Manual flush
await observer.flush();
// Check buffer stats
const stats = observer.getBufferStats();
// { buffered: 12, pending: 1, sequence: 45, lastAcked: 33 }
Integration with Dashboard
Spans and events from the Observer SDK appear in:
- Time-Travel Debugging — replay execution step-by-step
- Cost Tracking — per-span cost breakdown by model
- Approval Queues — HITL checkpoint events appear as approval requests
- Agent Monitoring — real-time agent status and progress
Next Steps
- Time-Travel Debugging — visualize traces in the dashboard
- Cost Tracking — configure budgets and alerts
- Approval Queues — manage HITL approvals
- Gateway Overview — understand the governance chain that generates trace data