
Observer SDK

The Observer SDK provides distributed tracing with per-span cost tracking and human-in-the-loop (HITL) checkpoint decorators. It ships in two languages — Python for backend instrumentation and TypeScript for real-time session observation via WebSocket.

Installation

```shell
# Python
pip install curate-me-observer

# TypeScript
npm install @curate-me/observer-sdk
```

Python: CurateMeTracer

The tracer creates spans, propagates W3C traceparent headers across services, and accumulates per-span cost data. Completed spans are sent to the gateway for storage and visualization in the time-travel debugger.

Quick Start

```python
from observer_sdk.tracer import CurateMeTracer, hitl_checkpoint

tracer = CurateMeTracer(api_key="cm_sk_xxx")

# Context manager — creates a span, records timing and cost
with tracer.span("fetch_data", metadata={"user_id": "u_123"}) as s:
    data = fetch_data()
    tracer.record_cost(0.003, "gpt-4o")
    tracer.record_tokens(500, 150, "gpt-4o")

print(f"Duration: {s.duration_ms}ms, Cost: ${s.cost:.4f}")
```

Constructor

```python
CurateMeTracer(
    api_key: str,                                 # Gateway API key (cm_sk_xxx)
    gateway_url: str = "https://api.curate-me.ai",
    *,
    enabled: bool = True,                         # Set False to no-op all spans
    on_span_end: Callable[[Span], None] = None,   # Callback when span ends
)
```

When enabled=False, all spans become no-ops, so you can temporarily disable tracing in production with negligible overhead.
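In practice, the enabled flag is often driven by a deployment setting. A minimal sketch, assuming a hypothetical CURATE_ME_TRACING environment variable (not part of the SDK):

```python
import os

def tracing_enabled(env=os.environ) -> bool:
    """Hypothetical helper: tracing is on unless CURATE_ME_TRACING=0."""
    return env.get("CURATE_ME_TRACING", "1") != "0"

# Pass the flag to the tracer; spans are no-ops when it is False:
# tracer = CurateMeTracer(api_key="cm_sk_xxx", enabled=tracing_enabled())
```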

Span Context Manager

```python
with tracer.span(
    name="llm_call",
    metadata={"model": "gpt-4o"},
    parent_traceparent="00-abc123...-def456...-01",  # Optional: join existing trace
) as span:
    response = call_llm()
    tracer.record_cost(0.01, "gpt-4o")
```

Spans nest automatically. If you open a span inside another span, the inner span becomes a child in the trace tree.

Trace Decorator

Works for both sync and async functions. The span name defaults to the function name.

```python
@tracer.trace("my_func")
def my_func(x: int) -> int:
    return x * 2

@tracer.trace()  # Uses function name as span name
async def async_func():
    await do_work()
```

Recording Cost and Tokens

Both methods must be called from within a span() context or a @trace()-decorated function.

```python
# Record a cost entry for a specific model
tracer.record_cost(amount=0.005, model="gpt-4o")

# Record token usage
tracer.record_tokens(input_tokens=500, output_tokens=150, model="gpt-4o")
```

W3C Trace Context Propagation

Pass trace context across service boundaries using the standard traceparent header.

```python
# Service A: get the traceparent to pass downstream
with tracer.span("service_a_work") as span:
    traceparent = tracer.get_traceparent()
    # Pass traceparent as a header to Service B
    response = httpx.post(
        "http://service-b/api",
        headers={"traceparent": traceparent},
    )

# Service B: continue the trace
incoming_traceparent = request.headers.get("traceparent")
with tracer.span("service_b_work", parent_traceparent=incoming_traceparent):
    process_request()
```

Retrieving Spans

```python
# Get the currently active span
current = tracer.get_current_span()

# Get all completed spans (useful for testing)
spans = tracer.get_completed_spans()
```

Python: SpanCostTracker

Accumulates costs and token usage within a single span, with breakdown by model.

```python
from observer_sdk.cost_tracker import SpanCostTracker

tracker = SpanCostTracker()
tracker.add_cost(0.003, "gpt-4o")
tracker.add_cost(0.001, "gpt-4o")
tracker.add_cost(0.0005, "claude-haiku-4-5")

tracker.get_total()      # 0.0045
tracker.get_breakdown()  # {"gpt-4o": 0.004, "claude-haiku-4-5": 0.0005}

# Token tracking
tracker.add_tokens("gpt-4o", input_tokens=500, output_tokens=150)
tracker.get_token_breakdown()  # {"gpt-4o": {"input": 500, "output": 150}}
tracker.get_total_tokens()     # {"input": 500, "output": 150}

# Merge data from another tracker
tracker.merge(other_tracker)

# Serialize for storage
tracker.to_dict()  # {total_cost, cost_by_model, tokens_by_model, ...}

# Reset
tracker.reset()
```

Python: HITL Checkpoint Decorator

Gates function execution on human approval when estimated cost exceeds a threshold.

```python
from observer_sdk.tracer import hitl_checkpoint

@hitl_checkpoint(
    reason="Expensive batch processing",
    threshold_usd=5.0,
    timeout_seconds=300.0,  # Max wait for approval (default: 5 min)
    poll_interval=2.0,      # Seconds between status polls
)
async def run_batch(items: list) -> list:
    return await process_all(items)
```

When the estimated cost (from the current span’s cost tracker) exceeds threshold_usd:

  1. An approval request is sent to the gateway
  2. The decorator polls for approval status
  3. If approved, the function executes normally
  4. If denied or timed out, raises HITLDeniedError

The checkpoint is also recorded as a timeline event for the time-travel debugger.
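The poll loop behind steps 2–4 can be sketched as a standalone approximation, with the gateway call abstracted into a poll_status callable (this is an illustration of the flow, not the SDK's internals):

```python
import time

class HITLDeniedError(Exception):
    """Raised when an approval is denied or the wait times out."""

def wait_for_approval(poll_status, timeout_seconds=300.0, poll_interval=2.0):
    """Poll until poll_status() returns 'approved' or 'denied', or until timeout.

    poll_status is any callable returning 'pending', 'approved', or 'denied'
    (in the SDK this would be a request to the gateway's approval endpoint).
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = poll_status()
        if status == "approved":
            return True
        if status == "denied":
            raise HITLDeniedError("approval denied")
        if time.monotonic() + poll_interval > deadline:
            raise HITLDeniedError("approval timed out")
        time.sleep(poll_interval)
```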

Works for both sync and async functions. Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| `reason` | `str` | required | Human-readable reason shown in approval UI |
| `threshold_usd` | `float` | `1.0` | Cost threshold that triggers approval |
| `timeout_seconds` | `float` | `300.0` | Max seconds to wait for approval |
| `poll_interval` | `float` | `2.0` | Seconds between approval status polls |
| `gateway_url` | `str` | `None` | Override gateway URL |
| `api_key` | `str` | `None` | Override API key |

TypeScript: Observer

The TypeScript Observer provides real-time session observation via WebSocket. It streams agent execution events to the dashboard for live monitoring and HITL intervention.

Quick Start

```typescript
import { Observer } from '@curate-me/observer-sdk';

const observer = new Observer({
  serverUrl: 'wss://api.curate-me.ai/ws/observe',
  orgId: 'org_12345',
  apiKey: 'cm_sk_xxx',
});

observer.on('onIntervention', (msg) => {
  console.log('Human intervention:', msg);
});

await observer.connect();
const sessionId = await observer.startSession({ workflowId: 'wf_123' });

// Track agent lifecycle
observer.trackAgentStart('VisionAgent', 'vision', { imageUrl: '...' });
observer.trackLLMRequest('openai', 'gpt-4o', 500, 4096, 0.7);
observer.trackLLMResponse('openai', 'gpt-4o', 150, 650, 1200, 'stop', 0.01);
observer.trackAgentComplete('VisionAgent', 'vision', { result: '...' }, 1500, 0.01);

await observer.endSession('completed');
await observer.disconnect();
```

Configuration

```typescript
const observer = new Observer({
  serverUrl: string;            // WebSocket server URL
  orgId: string;                // Organization ID
  apiKey?: string;              // API key auth
  authToken?: string;           // JWT auth (alternative)
  compression?: boolean;        // Enable compression (default: true)
  batchSize?: number;           // Events per batch (default: 50)
  flushInterval?: number;       // Flush interval ms (default: 1000)
  maxBufferSize?: number;       // Max buffer before dropping (default: 1000)
  debug?: boolean;              // Log events to console
  reconnect?: {
    enabled?: boolean;          // Auto-reconnect (default: true)
    initialDelay?: number;      // Initial delay ms (default: 1000)
    maxDelay?: number;          // Max delay ms (default: 30000)
    maxAttempts?: number;       // Max attempts (default: 10)
  };
  privacy?: {
    redactUserInput?: boolean;  // Redact user input (default: true)
    redactPrompts?: boolean;    // Redact LLM prompts (default: true)
    redactFields?: string[];    // Fields to always redact
    // Default: ['password', 'apiKey', 'token', 'secret', 'creditCard']
  };
});
```

Event Handlers

```typescript
observer.on('onConnect', () => { ... });
observer.on('onDisconnect', (reason: string) => { ... });
observer.on('onError', (error: Error) => { ... });
observer.on('onPause', (message) => { ... });         // Server paused the session
observer.on('onResume', (message) => { ... });        // Session resumed
observer.on('onIntervention', (message) => { ... });  // Human intervention received
observer.on('onStateChange', (state: ConnectionState) => { ... });
observer.on('onAck', (lastAckedSequence: number) => { ... });
```

Connection States

DISCONNECTED | CONNECTING | CONNECTED | RECONNECTING | PAUSED | ERROR

Tracking Methods

| Method | Purpose |
|---|---|
| `trackAgentStart(name, type, input, options?)` | Agent began execution |
| `trackAgentComplete(name, type, output, duration, cost?, tokens?)` | Agent finished |
| `trackAgentError(name, type, error, recoverable?)` | Agent failed |
| `trackAgentProgress(name, progress, message?, step?)` | Progress update (0–100) |
| `trackPipelineStart(id, name, totalSteps, input)` | Pipeline began |
| `trackPipelineComplete(id, name, output, duration, cost?, tokens?)` | Pipeline finished |
| `trackPipelineStep(id, stepIndex, name, status, duration?, output?)` | Step status change |
| `trackLLMRequest(provider, model, promptTokens, maxTokens?, temp?, preview?)` | LLM call started |
| `trackLLMResponse(provider, model, outputTokens, totalTokens, latency, reason, cost?)` | LLM response received |
| `trackHITLPause(reason, gateType, state, options?)` | Execution paused for approval |
| `trackCheckpoint(id, nodeId, stateSize, parentId?, metadata?)` | Checkpoint created |
| `trackLog(level, message, data?)` | Generic log event |
| `trackMetric(name, value, unit?, tags?)` | Metric event |
| `trackError(message, code?, context?, fatal?)` | Error event |

Buffer Management

Events are batched and flushed automatically. Critical events (HITL pauses) flush immediately.

```typescript
// Manual flush
await observer.flush();

// Check buffer stats
const stats = observer.getBufferStats();
// { buffered: 12, pending: 1, sequence: 45, lastAcked: 33 }
```

Integration with Dashboard

Spans and events from the Observer SDK appear in:

  • Time-Travel Debugging — replay execution step-by-step
  • Cost Tracking — per-span cost breakdown by model
  • Approval Queues — HITL checkpoint events appear as approval requests
  • Agent Monitoring — real-time agent status and progress

Next Steps