diff --git a/apps/web/src/app/api/settings/purge/route.ts b/apps/web/src/app/api/settings/purge/route.ts
new file mode 100644
index 0000000..819eb6f
--- /dev/null
+++ b/apps/web/src/app/api/settings/purge/route.ts
@@ -0,0 +1,21 @@
+import { NextResponse } from "next/server";
+import { prisma } from "@/lib/prisma";
+
+export async function POST() {
+  try {
+    await prisma.$transaction([
+      prisma.event.deleteMany(),
+      prisma.decisionPoint.deleteMany(),
+      prisma.span.deleteMany(),
+      prisma.trace.deleteMany(),
+    ]);
+
+    return NextResponse.json({ success: true }, { status: 200 });
+  } catch (error) {
+    console.error("Error purging data:", error);
+    return NextResponse.json(
+      { error: "Internal server error" },
+      { status: 500 }
+    );
+  }
+}
diff --git a/apps/web/src/app/api/settings/stats/route.ts b/apps/web/src/app/api/settings/stats/route.ts
new file mode 100644
index 0000000..85fc13e
--- /dev/null
+++ b/apps/web/src/app/api/settings/stats/route.ts
@@ -0,0 +1,25 @@
+import { NextResponse } from "next/server";
+import { prisma } from "@/lib/prisma";
+
+export async function GET() {
+  try {
+    const [totalTraces, totalSpans, totalDecisions, totalEvents] =
+      await Promise.all([
+        prisma.trace.count(),
+        prisma.span.count(),
+        prisma.decisionPoint.count(),
+        prisma.event.count(),
+      ]);
+
+    return NextResponse.json(
+      { totalTraces, totalSpans, totalDecisions, totalEvents },
+      { status: 200 }
+    );
+  } catch (error) {
+    console.error("Error fetching stats:", error);
+    return NextResponse.json(
+      { error: "Internal server error" },
+      { status: 500 }
+    );
+  }
+}
diff --git a/apps/web/src/app/api/traces/[id]/route.ts b/apps/web/src/app/api/traces/[id]/route.ts
index e1d2910..6e79afb 100644
--- a/apps/web/src/app/api/traces/[id]/route.ts
+++ b/apps/web/src/app/api/traces/[id]/route.ts
@@ -1,10 +1,26 @@
-import { NextResponse } from "next/server";
+import { NextRequest, NextResponse } from "next/server";
 import { prisma } from "@/lib/prisma";
 
+type RouteParams = { params: Promise<{ id: string }> };
+
+function extractActionLabel(value: unknown): string {
+  if (typeof value === "string") return value;
+  if (value && typeof value === "object" && !Array.isArray(value)) {
+    const obj = value as Record<string, unknown>;
+    if (typeof obj.name === "string") return obj.name;
+    if (typeof obj.action === "string") return obj.action;
+    if (typeof obj.tool === "string") return obj.tool;
+    for (const v of Object.values(obj)) {
+      if (typeof v === "string") return v;
+    }
+  }
+  return JSON.stringify(value);
+}
+
 // GET /api/traces/[id] — Get single trace with all relations
 export async function GET(
-  _request: Request,
-  { params }: { params: Promise<{ id: string }> }
+  _request: NextRequest,
+  { params }: RouteParams
 ) {
   try {
     const { id } = await params;
@@ -41,11 +57,13 @@ export async function GET(
     // Transform data to match frontend expectations
     const transformedTrace = {
       ...trace,
+      durationMs: trace.totalDuration,
+      costUsd: trace.totalCost,
       decisionPoints: trace.decisionPoints.map((dp) => ({
         id: dp.id,
         type: dp.type,
-        chosenAction: typeof dp.chosen === "string" ? dp.chosen : JSON.stringify(dp.chosen),
-        alternatives: dp.alternatives.map((alt) => (typeof alt === "string" ? alt : JSON.stringify(alt))),
+        chosenAction: extractActionLabel(dp.chosen),
+        alternatives: dp.alternatives.map((alt) => extractActionLabel(alt)),
         reasoning: dp.reasoning,
         contextSnapshot: dp.contextSnapshot as Record<string, unknown> | null,
         confidence: null, // Not in schema, default to null
@@ -81,3 +99,35 @@
     return NextResponse.json({ error: "Internal server error" }, { status: 500 });
   }
 }
+
+// DELETE /api/traces/[id] — Delete a trace and all related data (cascade)
+export async function DELETE(
+  _request: NextRequest,
+  { params }: RouteParams
+) {
+  try {
+    const { id } = await params;
+
+    if (!id || typeof id !== "string") {
+      return NextResponse.json({ error: "Invalid trace ID" }, { status: 400 });
+    }
+
+    const trace = await prisma.trace.findUnique({
+      where: { id },
+      select: { id: true },
+    });
+
+    if (!trace) {
+      return NextResponse.json({ error: "Trace not found" }, { status: 404 });
+    }
+
+    await prisma.trace.delete({
+      where: { id },
+    });
+
+    return NextResponse.json({ success: true, deleted: id }, { status: 200 });
+  } catch (error) {
+    console.error("Error deleting trace:", error);
+    return NextResponse.json({ error: "Internal server error" }, { status: 500 });
+  }
+}
diff --git a/apps/web/src/app/dashboard/layout.tsx b/apps/web/src/app/dashboard/layout.tsx
index 4460adb..dddfd98 100644
--- a/apps/web/src/app/dashboard/layout.tsx
+++ b/apps/web/src/app/dashboard/layout.tsx
@@ -22,7 +22,7 @@ interface NavItem {
 const navItems: NavItem[] = [
   { href: "/dashboard", label: "Traces", icon: Activity },
   { href: "/dashboard/decisions", label: "Decisions", icon: GitBranch },
-  { href: "/dashboard/settings", label: "Settings", icon: Settings, comingSoon: true },
+  { href: "/dashboard/settings", label: "Settings", icon: Settings },
 ];
 
 function Sidebar({ onNavigate }: { onNavigate?: () => void }) {
diff --git a/apps/web/src/app/dashboard/settings/page.tsx b/apps/web/src/app/dashboard/settings/page.tsx
new file mode 100644
index 0000000..0bd7daa
--- /dev/null
+++ b/apps/web/src/app/dashboard/settings/page.tsx
@@ -0,0 +1,294 @@
+"use client";
+
+import { useState, useEffect, useCallback } from "react";
+import {
+  Settings,
+  Key,
+  Globe,
+  Copy,
+  Check,
+  RefreshCw,
+  Database,
+  Trash2,
+  AlertTriangle,
+} from "lucide-react";
+import { cn } from "@/lib/utils";
+
+interface Stats {
+  totalTraces: number;
+  totalSpans: number;
+  totalDecisions: number;
+  totalEvents: number;
+}
+
+export default function SettingsPage() {
+  const [stats, setStats] = useState<Stats | null>(null);
+  const [isLoadingStats, setIsLoadingStats] = useState(true);
+  const [copiedField, setCopiedField] = useState<string | null>(null);
+  const [isPurging, setIsPurging] = useState(false);
+  const [showPurgeConfirm, setShowPurgeConfirm] = useState(false);
+
+  const fetchStats = useCallback(async () => {
+    setIsLoadingStats(true);
+    try {
+      const res = await fetch("/api/settings/stats", { cache: "no-store" });
+      if (res.ok) {
+        const data = await res.json();
+        setStats(data);
+      }
+    } catch (error) {
+      console.error("Failed to fetch stats:", error);
+    } finally {
+      setIsLoadingStats(false);
+    }
+  }, []);
+
+  useEffect(() => {
+    fetchStats();
+  }, [fetchStats]);
+
+  const copyToClipboard = async (text: string, field: string) => {
+    try {
+      await navigator.clipboard.writeText(text);
+      setCopiedField(field);
+      setTimeout(() => setCopiedField(null), 2000);
+    } catch {
+      console.error("Failed to copy");
+    }
+  };
+
+  const handlePurgeAll = async () => {
+    setIsPurging(true);
+    try {
+      const res = await fetch("/api/settings/purge", { method: "POST" });
+      if (res.ok) {
+        setShowPurgeConfirm(false);
+        fetchStats();
+      }
+    } catch (error) {
+      console.error("Failed to purge:", error);
+    } finally {
+      setIsPurging(false);
+    }
+  };
+
+  const endpointUrl =
+    typeof window !== "undefined"
+      ? `${window.location.origin}/api/traces`
+      : "https://agentlens.vectry.tech/api/traces";
+
+  return (
+    <div className="mx-auto max-w-3xl space-y-8 p-6">
+      <div>
+        <h1 className="flex items-center gap-2 text-2xl font-semibold">
+          <Settings className="h-6 w-6" />
+          Settings
+        </h1>
+        <p className="mt-1 text-sm text-muted-foreground">
+          Configuration and SDK connection details
+        </p>
+      </div>
+
+      {/* SDK Connection */}
+      <section className="space-y-4 rounded-lg border p-6">
+        <h2 className="flex items-center gap-2 font-medium">
+          <Globe className="h-4 w-4" />
+          SDK Connection
+        </h2>
+
+        <SettingField
+          label="Endpoint"
+          value={endpointUrl}
+          fieldKey="endpoint"
+          copiedField={copiedField}
+          onCopy={copyToClipboard}
+        />
+        <SettingField
+          label="API Key"
+          value="your-api-key"
+          hint="Placeholder value; replace with your own key"
+          fieldKey="api-key"
+          copiedField={copiedField}
+          onCopy={copyToClipboard}
+        />
+
+        <div>
+          <div className="text-sm font-medium">Quick start</div>
+          <pre className="mt-2 overflow-x-auto rounded bg-muted p-4 text-xs">
+            {`from agentlens import init
+
+init(
+    api_key="your-api-key",
+    endpoint="${endpointUrl.replace("/api/traces", "")}",
+)`}
+          </pre>
+        </div>
+      </section>
+
+      {/* Data & Storage */}
+      <section className="space-y-4 rounded-lg border p-6">
+        <h2 className="flex items-center gap-2 font-medium">
+          <Database className="h-4 w-4" />
+          Data & Storage
+        </h2>
+
+        {isLoadingStats ? (
+          <div className="grid grid-cols-2 gap-4 md:grid-cols-4">
+            {Array.from({ length: 4 }).map((_, i) => (
+              <div key={i} className="h-20 animate-pulse rounded-lg bg-muted" />
+            ))}
+          </div>
+        ) : stats ? (
+          <div className="grid grid-cols-2 gap-4 md:grid-cols-4">
+            <StatCard label="Traces" value={stats.totalTraces} />
+            <StatCard label="Spans" value={stats.totalSpans} />
+            <StatCard label="Decisions" value={stats.totalDecisions} />
+            <StatCard label="Events" value={stats.totalEvents} />
+          </div>
+        ) : (
+          <div className="text-sm text-muted-foreground">
+            Unable to load statistics
+          </div>
+        )}
+
+        <div className="border-t pt-4">
+          <div className="flex items-center gap-2 font-medium text-destructive">
+            <AlertTriangle className="h-4 w-4" />
+            Purge All Data
+          </div>
+          <p className="mt-1 text-sm text-muted-foreground">
+            Permanently delete all traces, spans, decisions, and events
+          </p>
+          {showPurgeConfirm ? (
+            <div className="mt-3 flex items-center gap-2">
+              <button
+                type="button"
+                onClick={handlePurgeAll}
+                disabled={isPurging}
+                className="flex items-center gap-2 rounded-md bg-destructive px-3 py-1.5 text-sm text-destructive-foreground disabled:opacity-50"
+              >
+                {isPurging ? (
+                  <RefreshCw className="h-4 w-4 animate-spin" />
+                ) : (
+                  <Trash2 className="h-4 w-4" />
+                )}
+                {isPurging ? "Purging..." : "Yes, delete everything"}
+              </button>
+              <button
+                type="button"
+                onClick={() => setShowPurgeConfirm(false)}
+                className="rounded-md border px-3 py-1.5 text-sm"
+              >
+                Cancel
+              </button>
+            </div>
+          ) : (
+            <button
+              type="button"
+              onClick={() => setShowPurgeConfirm(true)}
+              className="mt-3 flex items-center gap-2 rounded-md border border-destructive px-3 py-1.5 text-sm text-destructive"
+            >
+              <Trash2 className="h-4 w-4" />
+              Purge all data
+            </button>
+          )}
+        </div>
+      </section>
+
+      {/* About */}
+      <section className="space-y-4 rounded-lg border p-6">
+        <h2 className="font-medium">About</h2>
+
+        <div className="grid grid-cols-2 gap-4 md:grid-cols-4">
+          <div>
+            <div className="text-xs text-muted-foreground">Version</div>
+            <div className="text-sm font-medium">0.1.0</div>
+          </div>
+          <div>
+            <div className="text-xs text-muted-foreground">SDK Package</div>
+            <div className="text-sm font-medium">vectry-agentlens</div>
+          </div>
+          <div>
+            <div className="text-xs text-muted-foreground">Database</div>
+            <div className="text-sm font-medium">PostgreSQL</div>
+          </div>
+          <div>
+            <div className="text-xs text-muted-foreground">License</div>
+            <div className="text-sm font-medium">MIT</div>
+          </div>
+        </div>
+      </section>
+    </div>
+  );
+}
+
+function SettingField({
+  label,
+  value,
+  hint,
+  copiedField,
+  fieldKey,
+  onCopy,
+}: {
+  label: string;
+  value: string;
+  hint?: string;
+  copiedField: string | null;
+  fieldKey: string;
+  onCopy: (text: string, field: string) => void;
+}) {
+  const isCopied = copiedField === fieldKey;
+
+  return (
+    <div>
+      <label className="text-sm font-medium">{label}</label>
+      <div className="mt-1 flex items-center gap-2">
+        <code className="flex-1 truncate rounded bg-muted px-2 py-1 text-xs">
+          {value}
+        </code>
+        <button
+          type="button"
+          onClick={() => onCopy(value, fieldKey)}
+          aria-label={`Copy ${label}`}
+        >
+          {isCopied ? <Check className="h-4 w-4" /> : <Copy className="h-4 w-4" />}
+        </button>
+      </div>
+      {hint && (
+        <p className="mt-1 text-xs text-muted-foreground">{hint}</p>
+      )}
+    </div>
+  );
+}
+
+function StatCard({ label, value }: { label: string; value: number }) {
+  return (
+    <div className="rounded-lg border p-4">
+      <div className="text-xs text-muted-foreground">{label}</div>
+      <div className="text-lg font-semibold">
+        {value.toLocaleString()}
+      </div>
+    </div>
+  );
+}
diff --git a/apps/web/src/components/trace-analytics.tsx b/apps/web/src/components/trace-analytics.tsx
index 790ec0e..4f43351 100644
--- a/apps/web/src/components/trace-analytics.tsx
+++ b/apps/web/src/components/trace-analytics.tsx
@@ -461,13 +461,35 @@ function CostBreakdown({
 // Section C: Token Usage Gauge
 function TokenUsageGauge({ trace }: { trace: Trace }) {
   const tokenData = useMemo(() => {
-    // Try to get total tokens from various sources
     const totalTokens =
       (trace.metadata?.totalTokens as number | null | undefined) ??
       (trace.metadata?.tokenCount as number | null | undefined) ??
       null;
 
-    const maxTokens = 128000; // Default context window
+    const modelContextWindows: Record<string, number> = {
+      "gpt-4": 8192,
+      "gpt-4-32k": 32768,
+      "gpt-4-turbo": 128000,
+      "gpt-4o": 128000,
+      "gpt-4o-mini": 128000,
+      "gpt-3.5-turbo": 16385,
+      "claude-3-opus": 200000,
+      "claude-3-sonnet": 200000,
+      "claude-3-haiku": 200000,
+      "claude-3.5-sonnet": 200000,
+      "claude-4-opus": 200000,
+      "claude-4-sonnet": 200000,
+    };
+
+    const model = (trace.metadata?.model as string | undefined) ?? "";
+    const modelLower = model.toLowerCase();
+    let maxTokens = 128000;
+    for (const [prefix, ctx] of Object.entries(modelContextWindows)) {
+      if (modelLower.startsWith(prefix)) {
+        maxTokens = ctx;
+        break;
+      }
+    }
 
     return {
       totalTokens,
diff --git a/packages/sdk-python/agentlens/integrations/__init__.py b/packages/sdk-python/agentlens/integrations/__init__.py
index fcdf98e..4025b3c 100644
--- a/packages/sdk-python/agentlens/integrations/__init__.py
+++ b/packages/sdk-python/agentlens/integrations/__init__.py
@@ -1 +1,8 @@
-"""Integration packages for AgentLens."""
+"""Integration packages for AgentLens.
+
+Available integrations:
+
+- ``openai``: Wrap OpenAI clients with ``wrap_openai(client)``.
+- ``anthropic``: Wrap Anthropic clients with ``wrap_anthropic(client)``.
+- ``langchain``: LangChain callback handler for tracing.
+"""
diff --git a/packages/sdk-python/agentlens/integrations/anthropic.py b/packages/sdk-python/agentlens/integrations/anthropic.py
new file mode 100644
index 0000000..cc1e505
--- /dev/null
+++ b/packages/sdk-python/agentlens/integrations/anthropic.py
@@ -0,0 +1,697 @@
+"""Anthropic integration for AgentLens.
+
+This module provides a wrapper that auto-instruments Anthropic API calls with
+tracing, span creation, decision logging for tool calls, and token tracking.
+""" + +import json +import logging +import time +from functools import wraps +from typing import Any, Dict, Iterator, List, Optional + +from agentlens.models import ( + Event, + EventType, + _now_iso, +) +from agentlens.trace import ( + TraceContext, + _get_context_stack, + get_current_span_id, + get_current_trace, +) + +logger = logging.getLogger("agentlens") + +# Cost per 1K tokens (input/output) for common Claude models +_MODEL_COSTS: Dict[str, tuple] = { + # Claude 3 family + "claude-3-opus-20240229": (0.015, 0.075), + "claude-3-sonnet-20240229": (0.003, 0.015), + "claude-3-haiku-20240307": (0.00025, 0.00125), + # Claude 3.5 family + "claude-3-5-sonnet-20240620": (0.003, 0.015), + "claude-3-5-sonnet-20241022": (0.003, 0.015), + "claude-3-5-haiku-20241022": (0.0008, 0.004), + # Claude 4 family + "claude-sonnet-4-20250514": (0.003, 0.015), + "claude-opus-4-20250514": (0.015, 0.075), + # Short aliases for prefix matching + "claude-3-opus": (0.015, 0.075), + "claude-3-sonnet": (0.003, 0.015), + "claude-3-haiku": (0.00025, 0.00125), + "claude-3-5-sonnet": (0.003, 0.015), + "claude-3-5-haiku": (0.0008, 0.004), + "claude-3.5-sonnet": (0.003, 0.015), + "claude-3.5-haiku": (0.0008, 0.004), + "claude-sonnet-4": (0.003, 0.015), + "claude-opus-4": (0.015, 0.075), + "claude-4-sonnet": (0.003, 0.015), + "claude-4-opus": (0.015, 0.075), +} + + +def _truncate_data(data: Any, max_length: int = 500) -> Any: + """Truncate data for privacy while preserving structure.""" + if isinstance(data, str): + return data[:max_length] + "..." if len(data) > max_length else data + elif isinstance(data, dict): + return {k: _truncate_data(v, max_length) for k, v in data.items()} + elif isinstance(data, list): + return [_truncate_data(item, max_length) for item in data] + else: + return data + + +def _calculate_cost( + model: str, input_tokens: int, output_tokens: int +) -> Optional[float]: + """Calculate cost in USD based on model pricing.""" + model_lower = model.lower() + + if model_lower in _MODEL_COSTS: + input_cost, output_cost = _MODEL_COSTS[model_lower] + return (float(input_tokens) / 1000.0) * input_cost + float( + output_tokens + ) / 1000.0 * output_cost + + best_match = None + best_len = 0 + for model_name, costs in _MODEL_COSTS.items(): + if model_lower.startswith(model_name.lower()) and len(model_name) > best_len: + best_match = costs + best_len = len(model_name) + + if best_match: + input_cost, output_cost = best_match + return (float(input_tokens) / 1000.0) * input_cost + float( + output_tokens + ) / 1000.0 * output_cost + + return None + + +def _extract_messages_truncated(messages: List[Any]) -> List[Dict[str, Any]]: + """Extract and truncate message content.""" + truncated = [] + for msg in messages: + if isinstance(msg, dict): + truncated_msg = {"role": msg.get("role", "unknown")} + content = msg.get("content") + if content is not None: + if isinstance(content, list): + # Anthropic supports content as list of blocks + truncated_msg["content"] = _truncate_data(content) + else: + truncated_msg["content"] = _truncate_data(str(content)) + truncated.append(truncated_msg) + else: + # Handle message objects + role = getattr(msg, "role", "unknown") + content = getattr(msg, "content", "") + truncated.append({"role": role, "content": _truncate_data(str(content))}) + return truncated + + +def _extract_content_from_response(response: Any) -> Optional[str]: + """Extract text content from Anthropic response. + + Anthropic responses have a ``content`` array with blocks of type + ``text`` or ``tool_use``. 
+ """ + if hasattr(response, "content") and response.content: + text_parts = [] + for block in response.content: + if hasattr(block, "type") and block.type == "text": + text_parts.append(getattr(block, "text", "")) + elif isinstance(block, dict) and block.get("type") == "text": + text_parts.append(block.get("text", "")) + if text_parts: + return _truncate_data(" ".join(text_parts)) + return None + + +def _extract_tool_calls_from_response(response: Any) -> List[Dict[str, Any]]: + """Extract tool_use blocks from Anthropic response. + + Anthropic tool calls appear as content blocks with ``type: "tool_use"``, + containing ``name`` and ``input`` fields. + """ + tool_calls: List[Dict[str, Any]] = [] + if hasattr(response, "content") and response.content: + for block in response.content: + block_type = getattr(block, "type", None) or ( + block.get("type") if isinstance(block, dict) else None + ) + if block_type == "tool_use": + if isinstance(block, dict): + name = block.get("name", "unknown") + arguments = block.get("input", {}) + else: + name = getattr(block, "name", "unknown") + arguments = getattr(block, "input", {}) + tool_calls.append({"name": name, "arguments": arguments}) + return tool_calls + + +class _StreamWrapper: + """Wrapper for Anthropic stream responses to collect events and finalize span.""" + + def __init__(self, original_stream: Any, trace_ctx: Optional[TraceContext]): + self._original_stream = original_stream + self._trace_ctx = trace_ctx + self._events: List[Any] = [] + self._start_time = time.time() + self._model: Optional[str] = None + self._temperature: Optional[float] = None + self._max_tokens: Optional[int] = None + self._messages: Optional[List[Any]] = None + self._parent_span_id = get_current_span_id() + # Accumulated response data from stream events + self._text_content: str = "" + self._tool_calls: List[Dict[str, Any]] = [] + self._current_tool: Optional[Dict[str, Any]] = None + self._input_tokens: Optional[int] = None + self._output_tokens: Optional[int] = None + self._response_model: Optional[str] = None + self._stop_reason: Optional[str] = None + + def set_params( + self, + model: str, + temperature: Optional[float], + max_tokens: Optional[int], + messages: List[Any], + ) -> None: + self._model = model + self._temperature = temperature + self._max_tokens = max_tokens + self._messages = messages + + def _process_event(self, event: Any) -> None: + """Process a single stream event to accumulate response data.""" + event_type = getattr(event, "type", None) + + if event_type == "message_start": + message = getattr(event, "message", None) + if message: + self._response_model = getattr(message, "model", None) + usage = getattr(message, "usage", None) + if usage: + self._input_tokens = getattr(usage, "input_tokens", None) + + elif event_type == "content_block_start": + block = getattr(event, "content_block", None) + if block: + block_type = getattr(block, "type", None) + if block_type == "tool_use": + self._current_tool = { + "name": getattr(block, "name", "unknown"), + "arguments": "", + } + + elif event_type == "content_block_delta": + delta = getattr(event, "delta", None) + if delta: + delta_type = getattr(delta, "type", None) + if delta_type == "text_delta": + self._text_content += getattr(delta, "text", "") + elif delta_type == "input_json_delta": + if self._current_tool is not None: + self._current_tool["arguments"] += getattr( + delta, "partial_json", "" + ) + + elif event_type == "content_block_stop": + if self._current_tool is not None: + # Parse accumulated JSON 
arguments + try: + args_str = self._current_tool["arguments"] + if isinstance(args_str, str) and args_str: + self._current_tool["arguments"] = json.loads(args_str) + elif not args_str: + self._current_tool["arguments"] = {} + except (json.JSONDecodeError, TypeError): + pass + self._tool_calls.append(self._current_tool) + self._current_tool = None + + elif event_type == "message_delta": + delta = getattr(event, "delta", None) + if delta: + self._stop_reason = getattr(delta, "stop_reason", None) + usage = getattr(event, "usage", None) + if usage: + self._output_tokens = getattr(usage, "output_tokens", None) + + def __iter__(self) -> Iterator[Any]: + return self + + def __next__(self) -> Any: + event = next(self._original_stream) + self._events.append(event) + self._process_event(event) + return event + + def __enter__(self) -> "_StreamWrapper": + """Support context manager protocol for Anthropic streaming.""" + if hasattr(self._original_stream, "__enter__"): + self._original_stream.__enter__() + return self + + def __exit__( + self, + exc_type: Optional[type], + exc_val: Optional[BaseException], + exc_tb: Optional[Any], + ) -> None: + """Finalize span and close underlying stream on context manager exit.""" + if hasattr(self._original_stream, "__exit__"): + self._original_stream.__exit__(exc_type, exc_val, exc_tb) + self.finalize() + + def finalize(self) -> None: + """Create span after stream is fully consumed.""" + if not self._events: + return + + response_model = self._response_model or self._model or "unknown" + + # Build a mock response object for _create_llm_span + mock = _MockResponse() + mock.model = response_model + mock.text_content = self._text_content or None + mock.tool_calls = self._tool_calls + mock.stop_reason = self._stop_reason + mock.input_tokens = self._input_tokens + mock.output_tokens = self._output_tokens + + _create_llm_span( + response=mock, + start_time=self._start_time, + model=self._model or response_model, + temperature=self._temperature, + max_tokens=self._max_tokens, + messages=self._messages or [], + parent_span_id=self._parent_span_id, + trace_ctx=self._trace_ctx, + ) + + if self._trace_ctx: + self._trace_ctx.__exit__(None, None, None) + + +class _MockResponse: + """Lightweight object to unify stream-assembled and regular responses.""" + + def __init__(self) -> None: + self.model: str = "unknown" + self.text_content: Optional[str] = None + self.tool_calls: List[Dict[str, Any]] = [] + self.stop_reason: Optional[str] = None + self.input_tokens: Optional[int] = None + self.output_tokens: Optional[int] = None + # Fake content list for compatibility with extraction helpers + self.content: List[Any] = [] + + +def _create_llm_span( + response: Any, + start_time: float, + model: str, + temperature: Optional[float], + max_tokens: Optional[int], + messages: List[Any], + parent_span_id: Optional[str], + trace_ctx: Optional[TraceContext], +) -> None: + """Create LLM span from Anthropic response.""" + from agentlens.models import Span, SpanStatus, SpanType + + current_trace = get_current_trace() + if current_trace is None: + logger.warning("No active trace, skipping span creation") + return + + end_time = time.time() + duration_ms = int((end_time - start_time) * 1000) + + # Extract token usage + token_count = None + cost_usd = None + + # Handle real Anthropic response + input_tokens = getattr(response, "input_tokens", None) + output_tokens = getattr(response, "output_tokens", None) + + # Real responses have usage object + if input_tokens is None and hasattr(response, 
"usage"): + usage = response.usage + input_tokens = getattr(usage, "input_tokens", None) + output_tokens = getattr(usage, "output_tokens", None) + + if input_tokens is not None and output_tokens is not None: + token_count = input_tokens + output_tokens + cost_usd = _calculate_cost(model, input_tokens, output_tokens) + + # Extract content - try helpers first, fall back to mock fields + content = _extract_content_from_response(response) + if content is None: + text_content = getattr(response, "text_content", None) + if text_content: + content = _truncate_data(str(text_content)) + + # Extract tool calls - try helpers first, fall back to mock fields + tool_calls = _extract_tool_calls_from_response(response) + if not tool_calls: + tool_calls = getattr(response, "tool_calls", []) or [] + + # Extract stop reason + stop_reason = getattr(response, "stop_reason", None) + + # Create span + span_name = f"anthropic.{model}" + span = Span( + name=span_name, + type=SpanType.LLM_CALL.value, + parent_span_id=parent_span_id, + input_data={"messages": _extract_messages_truncated(messages)}, + output_data={"content": content, "tool_calls": tool_calls or None}, + token_count=token_count, + cost_usd=cost_usd, + duration_ms=duration_ms, + status=SpanStatus.COMPLETED.value, + started_at=_now_iso(), + ended_at=_now_iso(), + metadata={ + "model": model, + "temperature": temperature, + "max_tokens": max_tokens, + "stop_reason": stop_reason, + }, + ) + + current_trace.spans.append(span) + + # Push onto context stack for decision logging + stack = _get_context_stack() + stack.append(span) + + # Log tool call decisions + if tool_calls: + from agentlens.decision import log_decision + + # Try to get reasoning from the assistant's text content + reasoning = None + if content: + reasoning = _truncate_data(str(content)) + + # Build context snapshot + context_snapshot = None + if input_tokens is not None or output_tokens is not None: + context_snapshot = { + "model": model, + "input_tokens": input_tokens, + "output_tokens": output_tokens, + } + + for tool_call in tool_calls: + log_decision( + type="TOOL_SELECTION", + chosen={ + "name": tool_call.get("name", "unknown"), + "arguments": tool_call.get("arguments", {}), + }, + alternatives=[], + reasoning=reasoning, + context_snapshot=context_snapshot, + ) + + # Always pop from context stack + if stack and stack[-1] == span: + stack.pop() + elif stack and isinstance(stack[-1], Span) and stack[-1].id == span.id: + stack.pop() + + +def _handle_error( + error: Exception, + start_time: float, + model: str, + temperature: Optional[float], + max_tokens: Optional[int], + messages: List[Any], + parent_span_id: Optional[str], + trace_ctx: Optional[TraceContext], +) -> None: + """Handle error by creating error span and event.""" + from agentlens.models import Span, SpanStatus, SpanType + + current_trace = get_current_trace() + if current_trace is None: + return + + end_time = time.time() + duration_ms = int((end_time - start_time) * 1000) + + # Create error span + span_name = f"anthropic.{model}" + span = Span( + name=span_name, + type=SpanType.LLM_CALL.value, + parent_span_id=parent_span_id, + input_data={"messages": _extract_messages_truncated(messages)}, + status=SpanStatus.ERROR.value, + status_message=str(error), + started_at=_now_iso(), + ended_at=_now_iso(), + duration_ms=duration_ms, + metadata={ + "model": model, + "temperature": temperature, + "max_tokens": max_tokens, + }, + ) + + current_trace.spans.append(span) + + # Create error event + error_event = Event( + 
type=EventType.ERROR.value, + name=f"{span_name}: {str(error)}", + span_id=span.id, + metadata={"error_type": type(error).__name__}, + ) + + current_trace.events.append(error_event) + + # Pop from context stack if needed + stack = _get_context_stack() + if stack and isinstance(stack[-1], Span) and stack[-1].id == span.id: + stack.pop() + + +def _wrap_create(original_create: Any, is_async: bool = False) -> Any: + """Wrap Anthropic messages.create method.""" + + if is_async: + + @wraps(original_create) + async def async_traced_create(*args: Any, **kwargs: Any) -> Any: + # Extract parameters + model = kwargs.get("model", "claude-3-5-sonnet-20241022") + temperature = kwargs.get("temperature") + max_tokens = kwargs.get("max_tokens") + messages = kwargs.get("messages", []) + stream = kwargs.get("stream", False) + + parent_span_id = get_current_span_id() + start_time = time.time() + + # Handle streaming + if stream: + trace_ctx = None + if get_current_trace() is None: + trace_ctx = TraceContext(name=f"anthropic-{model}") + trace_ctx.__enter__() + + try: + original_stream = await original_create(*args, **kwargs) + + wrapper = _StreamWrapper(original_stream, trace_ctx) + wrapper.set_params(model, temperature, max_tokens, messages) + + return wrapper + except Exception as e: + if trace_ctx: + trace_ctx.__exit__(type(e), e, None) + raise + + # Non-streaming + trace_ctx = None + if get_current_trace() is None: + trace_ctx = TraceContext(name=f"anthropic-{model}") + trace_ctx.__enter__() + + try: + response = await original_create(*args, **kwargs) + + _create_llm_span( + response=response, + start_time=start_time, + model=model, + temperature=temperature, + max_tokens=max_tokens, + messages=messages, + parent_span_id=parent_span_id, + trace_ctx=trace_ctx, + ) + + if trace_ctx is not None: + trace_ctx.__exit__(None, None, None) + + return response + except Exception as e: + _handle_error( + error=e, + start_time=start_time, + model=model, + temperature=temperature, + max_tokens=max_tokens, + messages=messages, + parent_span_id=parent_span_id, + trace_ctx=trace_ctx, + ) + raise + + return async_traced_create + + else: + + @wraps(original_create) + def traced_create(*args: Any, **kwargs: Any) -> Any: + # Extract parameters + model = kwargs.get("model", "claude-3-5-sonnet-20241022") + temperature = kwargs.get("temperature") + max_tokens = kwargs.get("max_tokens") + messages = kwargs.get("messages", []) + stream = kwargs.get("stream", False) + + parent_span_id = get_current_span_id() + start_time = time.time() + + # Handle streaming + if stream: + trace_ctx = None + if get_current_trace() is None: + trace_ctx = TraceContext(name=f"anthropic-{model}") + trace_ctx.__enter__() + + try: + original_stream = original_create(*args, **kwargs) + + wrapper = _StreamWrapper(original_stream, trace_ctx) + wrapper.set_params(model, temperature, max_tokens, messages) + + return wrapper + except Exception as e: + if trace_ctx: + trace_ctx.__exit__(type(e), e, None) + raise + + # Non-streaming + trace_ctx = None + if get_current_trace() is None: + trace_ctx = TraceContext(name=f"anthropic-{model}") + trace_ctx.__enter__() + + try: + response = original_create(*args, **kwargs) + + _create_llm_span( + response=response, + start_time=start_time, + model=model, + temperature=temperature, + max_tokens=max_tokens, + messages=messages, + parent_span_id=parent_span_id, + trace_ctx=trace_ctx, + ) + + if trace_ctx is not None: + trace_ctx.__exit__(None, None, None) + + return response + except Exception as e: + _handle_error( + 
error=e, + start_time=start_time, + model=model, + temperature=temperature, + max_tokens=max_tokens, + messages=messages, + parent_span_id=parent_span_id, + trace_ctx=trace_ctx, + ) + raise + + return traced_create + + +def wrap_anthropic(client: Any) -> Any: + """Wrap an Anthropic client to add AgentLens tracing. + + Instruments ``client.messages.create()`` to automatically capture LLM spans, + token usage, cost estimation, and tool-call decisions. + + Supports both sync (``anthropic.Anthropic``) and async + (``anthropic.AsyncAnthropic``) clients as well as streaming responses. + + Args: + client: An ``anthropic.Anthropic`` or ``anthropic.AsyncAnthropic`` instance. + + Returns: + The same client instance with ``messages.create`` wrapped for tracing. + + Example:: + + import anthropic + from agentlens.integrations.anthropic import wrap_anthropic + + client = anthropic.Anthropic(api_key="sk-...") + traced_client = wrap_anthropic(client) + + response = traced_client.messages.create( + model="claude-3-sonnet-20240229", + max_tokens=1024, + messages=[{"role": "user", "content": "Hello!"}] + ) + """ + # Detect async client by checking for common async patterns + is_async = False + try: + import asyncio + import inspect + + create_method = client.messages.create + if inspect.iscoroutinefunction(create_method) or ( + hasattr(create_method, "__wrapped__") + and inspect.iscoroutinefunction(create_method.__wrapped__) + ): + is_async = True + except (AttributeError, ImportError): + pass + + # Also detect by class name as a fallback + client_class_name = type(client).__name__ + if "Async" in client_class_name: + is_async = True + + original_create = client.messages.create + traced_create = _wrap_create(original_create, is_async=is_async) + client.messages.create = traced_create + + logger.debug("Anthropic client wrapped with AgentLens tracing") + return client
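
Note on streaming: when `stream=True` is passed, the wrapped `messages.create` returns a `_StreamWrapper` rather than the raw Anthropic stream, and the LLM span (token counts, cost estimate, tool-call decisions) is only recorded once the stream is finalized, either by using the wrapper as a context manager or by calling `finalize()` after the stream has been fully consumed. A minimal sketch of the intended usage, assuming the `init()` call from the settings quick start; the model string and message are placeholders:

    import anthropic

    from agentlens import init
    from agentlens.integrations.anthropic import wrap_anthropic

    # Placeholder values taken from the dashboard quick-start snippet.
    init(api_key="your-api-key", endpoint="https://agentlens.vectry.tech")

    client = wrap_anthropic(anthropic.Anthropic(api_key="sk-..."))

    # Using the returned _StreamWrapper as a context manager ensures
    # finalize() runs, which creates the span and closes the implicit trace.
    with client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=256,
        messages=[{"role": "user", "content": "Hello!"}],
        stream=True,
    ) as stream:
        for event in stream:
            pass  # consume events; text and tool calls accumulate in the wrapper

If the stream is iterated without the context manager, `finalize()` has to be called explicitly once iteration is done, otherwise no span is recorded for the call.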
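
The new dashboard routes in this change can also be exercised outside the UI. A rough sketch using `requests`; the base URL and trace id are placeholders for a locally running web app:

    import requests

    BASE = "http://localhost:3000"  # assumed local dev URL

    # Counts backing the Settings page: totalTraces, totalSpans,
    # totalDecisions, totalEvents.
    print(requests.get(f"{BASE}/api/settings/stats").json())

    # Delete one trace; related spans, decisions, and events go with it.
    requests.delete(f"{BASE}/api/traces/some-trace-id")

    # Equivalent of the "Purge All Data" button: deletes events, decisions,
    # spans, and traces in a single transaction.
    requests.post(f"{BASE}/api/settings/purge")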