Files
agentlens/packages/sdk-python/agentlens/models.py
Vectry 3fe9013838 feat: Python SDK real implementation + API ingestion routes
- SDK: client with BatchTransport, trace decorator/context manager,
  log_decision, thread-local context stack, nested trace→span support
- API: POST /api/traces (batch ingest), GET /api/traces (paginated list),
  GET /api/traces/[id] (full trace with relations), GET /api/health
- Tests: 8 unit tests for SDK (all passing)
- Transport: thread-safe buffer with background flush thread
2026-02-09 23:25:34 +00:00

212 lines
6.4 KiB
Python

"""Data models for AgentLens SDK, matching the server-side Prisma schema."""
import time
import uuid
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional
from enum import Enum
class TraceStatus(str, Enum):
    """Status values a trace can carry.

    The ``str`` mix-in makes every member compare equal to its plain string
    value, so ``TraceStatus.RUNNING == "RUNNING"`` holds.
    """

    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    ERROR = "ERROR"
class DecisionType(str, Enum):
    """Categories of decision that can be recorded on a ``DecisionPoint``.

    The ``str`` mix-in makes every member compare equal to its plain string
    value.
    """

    TOOL_SELECTION = "TOOL_SELECTION"
    ROUTING = "ROUTING"
    RETRY = "RETRY"
    ESCALATION = "ESCALATION"
    MEMORY_RETRIEVAL = "MEMORY_RETRIEVAL"
    PLANNING = "PLANNING"
    CUSTOM = "CUSTOM"
class SpanType(str, Enum):
    """Kinds of span that can be recorded inside a trace.

    The ``str`` mix-in makes every member compare equal to its plain string
    value.
    """

    LLM_CALL = "LLM_CALL"
    TOOL_CALL = "TOOL_CALL"
    MEMORY_OP = "MEMORY_OP"
    CHAIN = "CHAIN"
    AGENT = "AGENT"
    CUSTOM = "CUSTOM"
class SpanStatus(str, Enum):
    """Status values a span can carry.

    The ``str`` mix-in makes every member compare equal to its plain string
    value.
    """

    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    ERROR = "ERROR"
class EventType(str, Enum):
    """Kinds of event that can be recorded on a trace.

    The ``str`` mix-in makes every member compare equal to its plain string
    value.
    """

    ERROR = "ERROR"
    RETRY = "RETRY"
    FALLBACK = "FALLBACK"
    CONTEXT_OVERFLOW = "CONTEXT_OVERFLOW"
    USER_FEEDBACK = "USER_FEEDBACK"
    CUSTOM = "CUSTOM"
def _generate_id() -> str:
    """Return a new random (version 4) UUID in its canonical string form."""
    return str(uuid.uuid4())
def _now_ms() -> int:
    """Return the current Unix time in whole milliseconds.

    The fractional millisecond is truncated by ``int``, not rounded.
    """
    return int(time.time() * 1000)
def _now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string.

    The result always has the form ``YYYY-MM-DDTHH:MM:SS.ffffff+00:00``.
    """
    from datetime import datetime, timezone

    # A bare isoformat() drops the fractional part whenever microsecond == 0,
    # so the string width would occasionally change; pin it explicitly.
    return datetime.now(timezone.utc).isoformat(timespec="microseconds")
@dataclass
class DecisionPoint:
    """A recorded decision: what was chosen and which alternatives existed.

    ``id`` and ``timestamp`` are filled in automatically when not supplied.
    """

    type: str  # DecisionType value
    chosen: Dict[str, Any]  # {name, confidence, params}
    alternatives: List[Dict[str, Any]]  # [{name, confidence, reason_rejected}]
    reasoning: Optional[str] = None
    # {window_usage_pct, tokens_used, tokens_available}
    context_snapshot: Optional[Dict[str, Any]] = None
    duration_ms: Optional[int] = None
    cost_usd: Optional[float] = None
    parent_span_id: Optional[str] = None
    id: str = field(default_factory=_generate_id)
    timestamp: str = field(default_factory=_now_iso)

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict of this decision point with camelCase keys.

        Required fields are always present. Optional fields are included
        only when they are not ``None``. Values are passed through by
        reference; nothing is copied.
        """
        payload: Dict[str, Any] = {
            "id": self.id,
            "type": self.type,
            "chosen": self.chosen,
            "alternatives": self.alternatives,
            "timestamp": self.timestamp,
        }
        # Listed in the order the keys should appear in the output.
        optional_fields = (
            ("reasoning", self.reasoning),
            ("contextSnapshot", self.context_snapshot),
            ("durationMs", self.duration_ms),
            ("costUsd", self.cost_usd),
            ("parentSpanId", self.parent_span_id),
        )
        payload.update(
            {key: value for key, value in optional_fields if value is not None}
        )
        return payload
@dataclass
class Span:
    """A named unit of work recorded inside a trace.

    ``id`` and ``started_at`` are filled in automatically when not supplied,
    and ``status`` defaults to ``SpanStatus.RUNNING``.
    """

    name: str
    type: str  # SpanType value
    id: str = field(default_factory=_generate_id)
    parent_span_id: Optional[str] = None
    input_data: Optional[Any] = None
    output_data: Optional[Any] = None
    token_count: Optional[int] = None
    cost_usd: Optional[float] = None
    duration_ms: Optional[int] = None
    status: str = field(default_factory=lambda: SpanStatus.RUNNING.value)
    status_message: Optional[str] = None
    started_at: str = field(default_factory=_now_iso)
    ended_at: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict of this span with camelCase keys.

        Required fields are always present. Optional fields are included
        only when they are not ``None``. ``input_data`` and ``output_data``
        are emitted under the keys ``"input"`` and ``"output"``.
        """
        payload: Dict[str, Any] = {
            "id": self.id,
            "name": self.name,
            "type": self.type,
            "status": self.status,
            "startedAt": self.started_at,
        }
        # Listed in the order the keys should appear in the output.
        optional_fields = (
            ("parentSpanId", self.parent_span_id),
            ("input", self.input_data),
            ("output", self.output_data),
            ("tokenCount", self.token_count),
            ("costUsd", self.cost_usd),
            ("durationMs", self.duration_ms),
            ("statusMessage", self.status_message),
            ("endedAt", self.ended_at),
            ("metadata", self.metadata),
        )
        payload.update(
            {key: value for key, value in optional_fields if value is not None}
        )
        return payload
@dataclass
class Event:
    """A named event, optionally attached to a span.

    ``id`` and ``timestamp`` are filled in automatically when not supplied.
    """

    type: str  # EventType value
    name: str
    span_id: Optional[str] = None
    metadata: Optional[Dict[str, Any]] = None
    id: str = field(default_factory=_generate_id)
    timestamp: str = field(default_factory=_now_iso)

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict of this event with camelCase keys.

        ``spanId`` and ``metadata`` are included only when they are not
        ``None``.
        """
        payload: Dict[str, Any] = {
            "id": self.id,
            "type": self.type,
            "name": self.name,
            "timestamp": self.timestamp,
        }
        # Listed in the order the keys should appear in the output.
        optional_fields = (
            ("spanId", self.span_id),
            ("metadata", self.metadata),
        )
        payload.update(
            {key: value for key, value in optional_fields if value is not None}
        )
        return payload
@dataclass
class TraceData:
    """Represents a complete trace to be sent to the server.

    Holds the trace's own fields plus its decision points, spans and events.
    ``id`` and ``started_at`` are filled in automatically when not supplied,
    and ``status`` defaults to ``TraceStatus.RUNNING``.
    """

    name: str
    id: str = field(default_factory=_generate_id)
    session_id: Optional[str] = None
    status: str = field(default_factory=lambda: TraceStatus.RUNNING.value)
    tags: List[str] = field(default_factory=list)
    metadata: Optional[Dict[str, Any]] = None
    total_cost: Optional[float] = None
    total_tokens: Optional[int] = None
    total_duration: Optional[int] = None  # ms
    started_at: str = field(default_factory=_now_iso)
    ended_at: Optional[str] = None
    decision_points: List[DecisionPoint] = field(default_factory=list)
    spans: List[Span] = field(default_factory=list)
    events: List[Event] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict of the whole trace with camelCase keys.

        Each decision point, span and event is converted with its own
        ``to_dict``. Optional scalar fields are included only when they are
        not ``None``. ``tags`` is passed through by reference, not copied.
        """
        payload: Dict[str, Any] = {
            "id": self.id,
            "name": self.name,
            "status": self.status,
            "tags": self.tags,
            "startedAt": self.started_at,
            "decisionPoints": [dp.to_dict() for dp in self.decision_points],
            "spans": [span.to_dict() for span in self.spans],
            "events": [event.to_dict() for event in self.events],
        }
        # Listed in the order the keys should appear in the output.
        optional_fields = (
            ("sessionId", self.session_id),
            ("metadata", self.metadata),
            ("totalCost", self.total_cost),
            ("totalTokens", self.total_tokens),
            ("totalDuration", self.total_duration),
            ("endedAt", self.ended_at),
        )
        payload.update(
            {key: value for key, value in optional_fields if value is not None}
        )
        return payload