feat: Settings page, DELETE traces endpoint, Anthropic SDK, dashboard bug fixes

- Add /dashboard/settings with SDK connection details, data stats, purge
- Add DELETE /api/traces/[id] with cascade deletion (usage sketch below)
- Add Anthropic integration (wrap_anthropic) for Python SDK
- Fix missing root duration (totalDuration -> durationMs mapping)
- Fix truncated JSON in decision tree nodes (extract readable labels)
- Fix hardcoded 128K maxTokens in token gauge (model-aware context windows)
- Enable Settings nav item in sidebar
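
For reference, a client can exercise the new DELETE endpoint roughly like this minimal sketch (hypothetical host and trace ID; per the handler below, it answers 200 with {"success": true, "deleted": id}, 400 for a malformed ID, and 404 when the trace is missing):

import json
import urllib.request

# Hypothetical local dev host and trace ID.
req = urllib.request.Request(
    "http://localhost:3000/api/traces/trace_abc123",
    method="DELETE",
)
with urllib.request.urlopen(req) as resp:  # raises HTTPError on 400/404/500
    print(json.loads(resp.read()))  # {"success": True, "deleted": "trace_abc123"}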
Vectry
2026-02-10 02:35:50 +00:00
parent 4f7719eace
commit 0149e0a6f4
8 changed files with 1125 additions and 9 deletions

View File

@@ -0,0 +1,21 @@
import { NextResponse } from "next/server";
import { prisma } from "@/lib/prisma";
export async function POST() {
try {
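// Children first, parent tables last, so foreign-key constraints hold at
// every step of the transaction even without cascade rules.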
await prisma.$transaction([
prisma.event.deleteMany(),
prisma.decisionPoint.deleteMany(),
prisma.span.deleteMany(),
prisma.trace.deleteMany(),
]);
return NextResponse.json({ success: true }, { status: 200 });
} catch (error) {
console.error("Error purging data:", error);
return NextResponse.json(
{ error: "Internal server error" },
{ status: 500 }
);
}
}

View File

@@ -0,0 +1,25 @@
import { NextResponse } from "next/server";
import { prisma } from "@/lib/prisma";
export async function GET() {
try {
const [totalTraces, totalSpans, totalDecisions, totalEvents] =
await Promise.all([
prisma.trace.count(),
prisma.span.count(),
prisma.decisionPoint.count(),
prisma.event.count(),
]);
return NextResponse.json(
{ totalTraces, totalSpans, totalDecisions, totalEvents },
{ status: 200 }
);
} catch (error) {
console.error("Error fetching stats:", error);
return NextResponse.json(
{ error: "Internal server error" },
{ status: 500 }
);
}
}

View File

@@ -1,10 +1,26 @@
-import { NextResponse } from "next/server";
+import { NextRequest, NextResponse } from "next/server";
import { prisma } from "@/lib/prisma";
type RouteParams = { params: Promise<{ id: string }> };
function extractActionLabel(value: unknown): string {
if (typeof value === "string") return value;
if (value && typeof value === "object" && !Array.isArray(value)) {
const obj = value as Record<string, unknown>;
if (typeof obj.name === "string") return obj.name;
if (typeof obj.action === "string") return obj.action;
if (typeof obj.tool === "string") return obj.tool;
for (const v of Object.values(obj)) {
if (typeof v === "string") return v;
}
}
return JSON.stringify(value);
}
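// e.g. { name: "search_web", input: {...} } -> "search_web"; plain strings
// pass through; anything unrecognized falls back to JSON.stringify.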
// GET /api/traces/[id] — Get single trace with all relations
export async function GET(
-_request: Request,
-{ params }: { params: Promise<{ id: string }> }
+_request: NextRequest,
+{ params }: RouteParams
) {
try {
const { id } = await params;
@@ -41,11 +57,13 @@ export async function GET(
// Transform data to match frontend expectations
const transformedTrace = {
...trace,
+durationMs: trace.totalDuration,
+costUsd: trace.totalCost,
decisionPoints: trace.decisionPoints.map((dp) => ({
id: dp.id,
type: dp.type,
-chosenAction: typeof dp.chosen === "string" ? dp.chosen : JSON.stringify(dp.chosen),
-alternatives: dp.alternatives.map((alt) => (typeof alt === "string" ? alt : JSON.stringify(alt))),
+chosenAction: extractActionLabel(dp.chosen),
+alternatives: dp.alternatives.map((alt) => extractActionLabel(alt)),
reasoning: dp.reasoning,
contextSnapshot: dp.contextSnapshot as Record<string, unknown> | null,
confidence: null, // Not in schema, default to null
@@ -81,3 +99,35 @@ export async function GET(
return NextResponse.json({ error: "Internal server error" }, { status: 500 });
}
}
// DELETE /api/traces/[id] — Delete a trace and all related data (cascade)
export async function DELETE(
_request: NextRequest,
{ params }: RouteParams
) {
try {
const { id } = await params;
if (!id || typeof id !== "string") {
return NextResponse.json({ error: "Invalid trace ID" }, { status: 400 });
}
const trace = await prisma.trace.findUnique({
where: { id },
select: { id: true },
});
if (!trace) {
return NextResponse.json({ error: "Trace not found" }, { status: 404 });
}
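// Only the trace row is deleted here; spans, decision points, and events are
// assumed to be removed via onDelete: Cascade relations in the Prisma schema
// (the commit message describes this as a cascade delete).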
await prisma.trace.delete({
where: { id },
});
return NextResponse.json({ success: true, deleted: id }, { status: 200 });
} catch (error) {
console.error("Error deleting trace:", error);
return NextResponse.json({ error: "Internal server error" }, { status: 500 });
}
}

View File

@@ -22,7 +22,7 @@ interface NavItem {
const navItems: NavItem[] = [
{ href: "/dashboard", label: "Traces", icon: Activity },
{ href: "/dashboard/decisions", label: "Decisions", icon: GitBranch },
{ href: "/dashboard/settings", label: "Settings", icon: Settings, comingSoon: true },
{ href: "/dashboard/settings", label: "Settings", icon: Settings },
];
function Sidebar({ onNavigate }: { onNavigate?: () => void }) {

View File

@@ -0,0 +1,294 @@
"use client";
import { useState, useEffect, useCallback } from "react";
import {
Settings,
Key,
Globe,
Copy,
Check,
RefreshCw,
Database,
Trash2,
AlertTriangle,
} from "lucide-react";
import { cn } from "@/lib/utils";
interface Stats {
totalTraces: number;
totalSpans: number;
totalDecisions: number;
totalEvents: number;
}
export default function SettingsPage() {
const [stats, setStats] = useState<Stats | null>(null);
const [isLoadingStats, setIsLoadingStats] = useState(true);
const [copiedField, setCopiedField] = useState<string | null>(null);
const [isPurging, setIsPurging] = useState(false);
const [showPurgeConfirm, setShowPurgeConfirm] = useState(false);
const fetchStats = useCallback(async () => {
setIsLoadingStats(true);
try {
const res = await fetch("/api/settings/stats", { cache: "no-store" });
if (res.ok) {
const data = await res.json();
setStats(data);
}
} catch (error) {
console.error("Failed to fetch stats:", error);
} finally {
setIsLoadingStats(false);
}
}, []);
useEffect(() => {
fetchStats();
}, [fetchStats]);
const copyToClipboard = async (text: string, field: string) => {
try {
await navigator.clipboard.writeText(text);
setCopiedField(field);
setTimeout(() => setCopiedField(null), 2000);
} catch {
console.error("Failed to copy");
}
};
const handlePurgeAll = async () => {
setIsPurging(true);
try {
const res = await fetch("/api/settings/purge", { method: "POST" });
if (res.ok) {
setShowPurgeConfirm(false);
fetchStats();
}
} catch (error) {
console.error("Failed to purge:", error);
} finally {
setIsPurging(false);
}
};
const endpointUrl =
typeof window !== "undefined"
? `${window.location.origin}/api/traces`
: "https://agentlens.vectry.tech/api/traces";
return (
<div className="space-y-8 max-w-3xl">
<div>
<h1 className="text-2xl font-bold text-neutral-100">Settings</h1>
<p className="text-neutral-400 mt-1">
Configuration and SDK connection details
</p>
</div>
{/* SDK Connection */}
<section className="space-y-4">
<div className="flex items-center gap-2 text-neutral-300">
<Globe className="w-5 h-5 text-emerald-400" />
<h2 className="text-lg font-semibold">SDK Connection</h2>
</div>
<div className="bg-neutral-900 border border-neutral-800 rounded-xl p-6 space-y-5">
<SettingField
label="Ingest Endpoint"
value={endpointUrl}
copiedField={copiedField}
fieldKey="endpoint"
onCopy={copyToClipboard}
/>
<SettingField
label="API Key"
value="any-value-accepted"
hint="Authentication is not enforced yet. Use any non-empty string as your Bearer token."
copiedField={copiedField}
fieldKey="apikey"
onCopy={copyToClipboard}
/>
<div className="pt-4 border-t border-neutral-800">
<p className="text-xs text-neutral-500 mb-3">Quick start</p>
<div className="bg-neutral-950 border border-neutral-800 rounded-lg p-4 font-mono text-sm text-neutral-300 overflow-x-auto">
<pre>{`from agentlens import init

init(
    api_key="your-api-key",
    endpoint="${endpointUrl.replace("/api/traces", "")}",
)`}</pre>
</div>
</div>
</div>
</section>
{/* Data & Storage */}
<section className="space-y-4">
<div className="flex items-center gap-2 text-neutral-300">
<Database className="w-5 h-5 text-emerald-400" />
<h2 className="text-lg font-semibold">Data & Storage</h2>
</div>
<div className="bg-neutral-900 border border-neutral-800 rounded-xl p-6 space-y-5">
{isLoadingStats ? (
<div className="grid grid-cols-2 sm:grid-cols-4 gap-4">
{Array.from({ length: 4 }).map((_, i) => (
<div key={i} className="animate-pulse">
<div className="h-4 w-16 bg-neutral-800 rounded mb-2" />
<div className="h-8 w-12 bg-neutral-800 rounded" />
</div>
))}
</div>
) : stats ? (
<div className="grid grid-cols-2 sm:grid-cols-4 gap-4">
<StatCard label="Traces" value={stats.totalTraces} />
<StatCard label="Spans" value={stats.totalSpans} />
<StatCard label="Decisions" value={stats.totalDecisions} />
<StatCard label="Events" value={stats.totalEvents} />
</div>
) : (
<p className="text-sm text-neutral-500">
Unable to load statistics
</p>
)}
<div className="pt-4 border-t border-neutral-800 flex items-center justify-between">
<div>
<p className="text-sm text-neutral-300 font-medium">
Purge All Data
</p>
<p className="text-xs text-neutral-500 mt-0.5">
Permanently delete all traces, spans, decisions, and events
</p>
</div>
{showPurgeConfirm ? (
<div className="flex items-center gap-2">
<button
onClick={() => setShowPurgeConfirm(false)}
className="px-3 py-2 text-sm text-neutral-400 hover:text-neutral-200 transition-colors"
>
Cancel
</button>
<button
onClick={handlePurgeAll}
disabled={isPurging}
className="flex items-center gap-2 px-4 py-2 bg-red-500/20 border border-red-500/30 text-red-400 rounded-lg text-sm font-medium hover:bg-red-500/30 disabled:opacity-50 transition-colors"
>
{isPurging ? (
<RefreshCw className="w-4 h-4 animate-spin" />
) : (
<AlertTriangle className="w-4 h-4" />
)}
Confirm Purge
</button>
</div>
) : (
<button
onClick={() => setShowPurgeConfirm(true)}
className="flex items-center gap-2 px-4 py-2 bg-neutral-800 border border-neutral-700 text-neutral-400 rounded-lg text-sm font-medium hover:text-red-400 hover:border-red-500/30 transition-colors"
>
<Trash2 className="w-4 h-4" />
Purge
</button>
)}
</div>
</div>
</section>
{/* About */}
<section className="space-y-4">
<div className="flex items-center gap-2 text-neutral-300">
<Settings className="w-5 h-5 text-emerald-400" />
<h2 className="text-lg font-semibold">About</h2>
</div>
<div className="bg-neutral-900 border border-neutral-800 rounded-xl p-6">
<div className="grid grid-cols-2 gap-4 text-sm">
<div>
<p className="text-neutral-500">Version</p>
<p className="text-neutral-200 font-medium">0.1.0</p>
</div>
<div>
<p className="text-neutral-500">SDK Package</p>
<p className="text-neutral-200 font-medium">vectry-agentlens</p>
</div>
<div>
<p className="text-neutral-500">Database</p>
<p className="text-neutral-200 font-medium">PostgreSQL</p>
</div>
<div>
<p className="text-neutral-500">License</p>
<p className="text-neutral-200 font-medium">MIT</p>
</div>
</div>
</div>
</section>
</div>
);
}
function SettingField({
label,
value,
hint,
copiedField,
fieldKey,
onCopy,
}: {
label: string;
value: string;
hint?: string;
copiedField: string | null;
fieldKey: string;
onCopy: (text: string, field: string) => void;
}) {
const isCopied = copiedField === fieldKey;
return (
<div>
<label className="text-xs text-neutral-500 font-medium block mb-1.5">
{label}
</label>
<div className="flex items-center gap-2">
<div className="flex-1 flex items-center gap-2 px-3 py-2.5 bg-neutral-950 border border-neutral-800 rounded-lg">
<Key className="w-4 h-4 text-neutral-600 shrink-0" />
<code className="text-sm text-neutral-300 font-mono truncate">
{value}
</code>
</div>
<button
onClick={() => onCopy(value, fieldKey)}
className={cn(
"p-2.5 rounded-lg border transition-all",
isCopied
? "bg-emerald-500/10 border-emerald-500/30 text-emerald-400"
: "bg-neutral-800 border-neutral-700 text-neutral-400 hover:text-neutral-200"
)}
>
{isCopied ? (
<Check className="w-4 h-4" />
) : (
<Copy className="w-4 h-4" />
)}
</button>
</div>
{hint && (
<p className="text-xs text-neutral-600 mt-1.5">{hint}</p>
)}
</div>
);
}
function StatCard({ label, value }: { label: string; value: number }) {
return (
<div className="p-3 bg-neutral-800/50 rounded-lg">
<p className="text-xs text-neutral-500">{label}</p>
<p className="text-xl font-bold text-neutral-100 mt-1">
{value.toLocaleString()}
</p>
</div>
);
}

View File

@@ -461,13 +461,35 @@ function CostBreakdown({
// Section C: Token Usage Gauge
function TokenUsageGauge({ trace }: { trace: Trace }) {
const tokenData = useMemo(() => {
// Try to get total tokens from various sources
const totalTokens =
(trace.metadata?.totalTokens as number | null | undefined) ??
(trace.metadata?.tokenCount as number | null | undefined) ??
null;
-const maxTokens = 128000; // Default context window
+const modelContextWindows: Record<string, number> = {
+  "gpt-4": 8192,
+  "gpt-4-32k": 32768,
+  "gpt-4-turbo": 128000,
+  "gpt-4o": 128000,
+  "gpt-4o-mini": 128000,
+  "gpt-3.5-turbo": 16385,
+  "claude-3-opus": 200000,
+  "claude-3-sonnet": 200000,
+  "claude-3-haiku": 200000,
+  "claude-3.5-sonnet": 200000,
+  "claude-4-opus": 200000,
+  "claude-4-sonnet": 200000,
+};
+const model = (trace.metadata?.model as string | undefined) ?? "";
+const modelLower = model.toLowerCase();
+// Pick the longest matching prefix so e.g. "gpt-4o-mini-2024-07-18" resolves
+// to the "gpt-4o-mini" entry (128K) rather than stopping at the shorter
+// "gpt-4" entry (8K).
+let maxTokens = 128000;
+let bestPrefixLen = 0;
+for (const [prefix, ctx] of Object.entries(modelContextWindows)) {
+  if (modelLower.startsWith(prefix) && prefix.length > bestPrefixLen) {
+    maxTokens = ctx;
+    bestPrefixLen = prefix.length;
+  }
+}
return {
totalTokens,

View File

@@ -1 +1,8 @@
"""Integration packages for AgentLens."""
"""Integration packages for AgentLens.
Available integrations:
- ``openai``: Wrap OpenAI clients with ``wrap_openai(client)``.
- ``anthropic``: Wrap Anthropic clients with ``wrap_anthropic(client)``.
- ``langchain``: LangChain callback handler for tracing.
"""

View File

@@ -0,0 +1,697 @@
"""Anthropic integration for AgentLens.
This module provides a wrapper that auto-instruments Anthropic API calls with
tracing, span creation, decision logging for tool calls, and token tracking.
"""
import json
import logging
import time
from functools import wraps
from typing import Any, Dict, Iterator, List, Optional
from agentlens.models import (
Event,
EventType,
_now_iso,
)
from agentlens.trace import (
TraceContext,
_get_context_stack,
get_current_span_id,
get_current_trace,
)
logger = logging.getLogger("agentlens")
# Cost per 1K tokens (input/output) for common Claude models
_MODEL_COSTS: Dict[str, tuple] = {
# Claude 3 family
"claude-3-opus-20240229": (0.015, 0.075),
"claude-3-sonnet-20240229": (0.003, 0.015),
"claude-3-haiku-20240307": (0.00025, 0.00125),
# Claude 3.5 family
"claude-3-5-sonnet-20240620": (0.003, 0.015),
"claude-3-5-sonnet-20241022": (0.003, 0.015),
"claude-3-5-haiku-20241022": (0.0008, 0.004),
# Claude 4 family
"claude-sonnet-4-20250514": (0.003, 0.015),
"claude-opus-4-20250514": (0.015, 0.075),
# Short aliases for prefix matching
"claude-3-opus": (0.015, 0.075),
"claude-3-sonnet": (0.003, 0.015),
"claude-3-haiku": (0.00025, 0.00125),
"claude-3-5-sonnet": (0.003, 0.015),
"claude-3-5-haiku": (0.0008, 0.004),
"claude-3.5-sonnet": (0.003, 0.015),
"claude-3.5-haiku": (0.0008, 0.004),
"claude-sonnet-4": (0.003, 0.015),
"claude-opus-4": (0.015, 0.075),
"claude-4-sonnet": (0.003, 0.015),
"claude-4-opus": (0.015, 0.075),
}
def _truncate_data(data: Any, max_length: int = 500) -> Any:
"""Truncate data for privacy while preserving structure."""
if isinstance(data, str):
return data[:max_length] + "..." if len(data) > max_length else data
elif isinstance(data, dict):
return {k: _truncate_data(v, max_length) for k, v in data.items()}
elif isinstance(data, list):
return [_truncate_data(item, max_length) for item in data]
else:
return data
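# e.g. a 600-char string becomes its first 500 chars plus "..."; dicts and
# lists are truncated recursively, and other types pass through unchanged.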
def _calculate_cost(
model: str, input_tokens: int, output_tokens: int
) -> Optional[float]:
"""Calculate cost in USD based on model pricing."""
model_lower = model.lower()
if model_lower in _MODEL_COSTS:
input_cost, output_cost = _MODEL_COSTS[model_lower]
return (float(input_tokens) / 1000.0) * input_cost + float(
output_tokens
) / 1000.0 * output_cost
best_match = None
best_len = 0
for model_name, costs in _MODEL_COSTS.items():
if model_lower.startswith(model_name.lower()) and len(model_name) > best_len:
best_match = costs
best_len = len(model_name)
if best_match:
input_cost, output_cost = best_match
return (float(input_tokens) / 1000.0) * input_cost + float(
output_tokens
) / 1000.0 * output_cost
return None
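# Worked example: claude-3-opus-20240229 at 1,000 input / 500 output tokens:
#   (1000 / 1000) * 0.015 + (500 / 1000) * 0.075 = 0.0525 USD.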
def _extract_messages_truncated(messages: List[Any]) -> List[Dict[str, Any]]:
"""Extract and truncate message content."""
truncated = []
for msg in messages:
if isinstance(msg, dict):
truncated_msg = {"role": msg.get("role", "unknown")}
content = msg.get("content")
if content is not None:
if isinstance(content, list):
# Anthropic supports content as list of blocks
truncated_msg["content"] = _truncate_data(content)
else:
truncated_msg["content"] = _truncate_data(str(content))
truncated.append(truncated_msg)
else:
# Handle message objects
role = getattr(msg, "role", "unknown")
content = getattr(msg, "content", "")
truncated.append({"role": role, "content": _truncate_data(str(content))})
return truncated
def _extract_content_from_response(response: Any) -> Optional[str]:
"""Extract text content from Anthropic response.
Anthropic responses have a ``content`` array with blocks of type
``text`` or ``tool_use``.
"""
if hasattr(response, "content") and response.content:
text_parts = []
for block in response.content:
if hasattr(block, "type") and block.type == "text":
text_parts.append(getattr(block, "text", ""))
elif isinstance(block, dict) and block.get("type") == "text":
text_parts.append(block.get("text", ""))
if text_parts:
return _truncate_data(" ".join(text_parts))
return None
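# e.g. a response whose content is
#   [{"type": "text", "text": "Hello"}, {"type": "tool_use", ...}]
# yields "Hello"; tool_use blocks contribute no text.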
def _extract_tool_calls_from_response(response: Any) -> List[Dict[str, Any]]:
"""Extract tool_use blocks from Anthropic response.
Anthropic tool calls appear as content blocks with ``type: "tool_use"``,
containing ``name`` and ``input`` fields.
"""
tool_calls: List[Dict[str, Any]] = []
if hasattr(response, "content") and response.content:
for block in response.content:
block_type = getattr(block, "type", None) or (
block.get("type") if isinstance(block, dict) else None
)
if block_type == "tool_use":
if isinstance(block, dict):
name = block.get("name", "unknown")
arguments = block.get("input", {})
else:
name = getattr(block, "name", "unknown")
arguments = getattr(block, "input", {})
tool_calls.append({"name": name, "arguments": arguments})
return tool_calls
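# e.g. a block {"type": "tool_use", "name": "get_weather", "input": {"city": "Paris"}}
# becomes {"name": "get_weather", "arguments": {"city": "Paris"}}.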
class _StreamWrapper:
"""Wrapper for Anthropic stream responses to collect events and finalize span."""
def __init__(self, original_stream: Any, trace_ctx: Optional[TraceContext]):
self._original_stream = original_stream
self._trace_ctx = trace_ctx
self._events: List[Any] = []
self._start_time = time.time()
self._model: Optional[str] = None
self._temperature: Optional[float] = None
self._max_tokens: Optional[int] = None
self._messages: Optional[List[Any]] = None
self._parent_span_id = get_current_span_id()
# Accumulated response data from stream events
self._text_content: str = ""
self._tool_calls: List[Dict[str, Any]] = []
self._current_tool: Optional[Dict[str, Any]] = None
self._input_tokens: Optional[int] = None
self._output_tokens: Optional[int] = None
self._response_model: Optional[str] = None
self._stop_reason: Optional[str] = None
def set_params(
self,
model: str,
temperature: Optional[float],
max_tokens: Optional[int],
messages: List[Any],
) -> None:
self._model = model
self._temperature = temperature
self._max_tokens = max_tokens
self._messages = messages
def _process_event(self, event: Any) -> None:
"""Process a single stream event to accumulate response data."""
event_type = getattr(event, "type", None)
if event_type == "message_start":
message = getattr(event, "message", None)
if message:
self._response_model = getattr(message, "model", None)
usage = getattr(message, "usage", None)
if usage:
self._input_tokens = getattr(usage, "input_tokens", None)
elif event_type == "content_block_start":
block = getattr(event, "content_block", None)
if block:
block_type = getattr(block, "type", None)
if block_type == "tool_use":
self._current_tool = {
"name": getattr(block, "name", "unknown"),
"arguments": "",
}
elif event_type == "content_block_delta":
delta = getattr(event, "delta", None)
if delta:
delta_type = getattr(delta, "type", None)
if delta_type == "text_delta":
self._text_content += getattr(delta, "text", "")
elif delta_type == "input_json_delta":
if self._current_tool is not None:
self._current_tool["arguments"] += getattr(
delta, "partial_json", ""
)
elif event_type == "content_block_stop":
if self._current_tool is not None:
# Parse accumulated JSON arguments
try:
args_str = self._current_tool["arguments"]
if isinstance(args_str, str) and args_str:
self._current_tool["arguments"] = json.loads(args_str)
elif not args_str:
self._current_tool["arguments"] = {}
except (json.JSONDecodeError, TypeError):
pass
self._tool_calls.append(self._current_tool)
self._current_tool = None
elif event_type == "message_delta":
delta = getattr(event, "delta", None)
if delta:
self._stop_reason = getattr(delta, "stop_reason", None)
usage = getattr(event, "usage", None)
if usage:
self._output_tokens = getattr(usage, "output_tokens", None)
def __iter__(self) -> Iterator[Any]:
return self
def __next__(self) -> Any:
event = next(self._original_stream)
self._events.append(event)
self._process_event(event)
return event
def __aiter__(self) -> "_StreamWrapper":
return self
async def __anext__(self) -> Any:
# Async counterpart of __next__ so AsyncAnthropic streams work with `async for`.
event = await self._original_stream.__anext__()
self._events.append(event)
self._process_event(event)
return event
def __enter__(self) -> "_StreamWrapper":
"""Support context manager protocol for Anthropic streaming."""
if hasattr(self._original_stream, "__enter__"):
self._original_stream.__enter__()
return self
async def __aenter__(self) -> "_StreamWrapper":
if hasattr(self._original_stream, "__aenter__"):
await self._original_stream.__aenter__()
return self
def __exit__(
self,
exc_type: Optional[type],
exc_val: Optional[BaseException],
exc_tb: Optional[Any],
) -> None:
"""Finalize span and close underlying stream on context manager exit."""
if hasattr(self._original_stream, "__exit__"):
self._original_stream.__exit__(exc_type, exc_val, exc_tb)
self.finalize()
async def __aexit__(
self,
exc_type: Optional[type],
exc_val: Optional[BaseException],
exc_tb: Optional[Any],
) -> None:
"""Async variant: close the underlying async stream, then finalize the span."""
if hasattr(self._original_stream, "__aexit__"):
await self._original_stream.__aexit__(exc_type, exc_val, exc_tb)
self.finalize()
def finalize(self) -> None:
"""Create span after stream is fully consumed."""
if not self._events:
return
response_model = self._response_model or self._model or "unknown"
# Build a mock response object for _create_llm_span
mock = _MockResponse()
mock.model = response_model
mock.text_content = self._text_content or None
mock.tool_calls = self._tool_calls
mock.stop_reason = self._stop_reason
mock.input_tokens = self._input_tokens
mock.output_tokens = self._output_tokens
_create_llm_span(
response=mock,
start_time=self._start_time,
model=self._model or response_model,
temperature=self._temperature,
max_tokens=self._max_tokens,
messages=self._messages or [],
parent_span_id=self._parent_span_id,
trace_ctx=self._trace_ctx,
)
if self._trace_ctx:
self._trace_ctx.__exit__(None, None, None)
class _MockResponse:
"""Lightweight object to unify stream-assembled and regular responses."""
def __init__(self) -> None:
self.model: str = "unknown"
self.text_content: Optional[str] = None
self.tool_calls: List[Dict[str, Any]] = []
self.stop_reason: Optional[str] = None
self.input_tokens: Optional[int] = None
self.output_tokens: Optional[int] = None
# Fake content list for compatibility with extraction helpers
self.content: List[Any] = []
def _create_llm_span(
response: Any,
start_time: float,
model: str,
temperature: Optional[float],
max_tokens: Optional[int],
messages: List[Any],
parent_span_id: Optional[str],
trace_ctx: Optional[TraceContext],
) -> None:
"""Create LLM span from Anthropic response."""
from agentlens.models import Span, SpanStatus, SpanType
current_trace = get_current_trace()
if current_trace is None:
logger.warning("No active trace, skipping span creation")
return
end_time = time.time()
duration_ms = int((end_time - start_time) * 1000)
# Extract token usage
token_count = None
cost_usd = None
# Handle real Anthropic response
input_tokens = getattr(response, "input_tokens", None)
output_tokens = getattr(response, "output_tokens", None)
# Real responses have usage object
if input_tokens is None and hasattr(response, "usage"):
usage = response.usage
input_tokens = getattr(usage, "input_tokens", None)
output_tokens = getattr(usage, "output_tokens", None)
if input_tokens is not None and output_tokens is not None:
token_count = input_tokens + output_tokens
cost_usd = _calculate_cost(model, input_tokens, output_tokens)
# Extract content - try helpers first, fall back to mock fields
content = _extract_content_from_response(response)
if content is None:
text_content = getattr(response, "text_content", None)
if text_content:
content = _truncate_data(str(text_content))
# Extract tool calls - try helpers first, fall back to mock fields
tool_calls = _extract_tool_calls_from_response(response)
if not tool_calls:
tool_calls = getattr(response, "tool_calls", []) or []
# Extract stop reason
stop_reason = getattr(response, "stop_reason", None)
# Create span
span_name = f"anthropic.{model}"
span = Span(
name=span_name,
type=SpanType.LLM_CALL.value,
parent_span_id=parent_span_id,
input_data={"messages": _extract_messages_truncated(messages)},
output_data={"content": content, "tool_calls": tool_calls or None},
token_count=token_count,
cost_usd=cost_usd,
duration_ms=duration_ms,
status=SpanStatus.COMPLETED.value,
started_at=_now_iso(),
ended_at=_now_iso(),
metadata={
"model": model,
"temperature": temperature,
"max_tokens": max_tokens,
"stop_reason": stop_reason,
},
)
current_trace.spans.append(span)
# Push onto context stack for decision logging
stack = _get_context_stack()
stack.append(span)
# Log tool call decisions
if tool_calls:
from agentlens.decision import log_decision
# Try to get reasoning from the assistant's text content
reasoning = None
if content:
reasoning = _truncate_data(str(content))
# Build context snapshot
context_snapshot = None
if input_tokens is not None or output_tokens is not None:
context_snapshot = {
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
}
for tool_call in tool_calls:
log_decision(
type="TOOL_SELECTION",
chosen={
"name": tool_call.get("name", "unknown"),
"arguments": tool_call.get("arguments", {}),
},
alternatives=[],
reasoning=reasoning,
context_snapshot=context_snapshot,
)
# Always pop from context stack
if stack and stack[-1] == span:
stack.pop()
elif stack and isinstance(stack[-1], Span) and stack[-1].id == span.id:
stack.pop()
def _handle_error(
error: Exception,
start_time: float,
model: str,
temperature: Optional[float],
max_tokens: Optional[int],
messages: List[Any],
parent_span_id: Optional[str],
trace_ctx: Optional[TraceContext],
) -> None:
"""Handle error by creating error span and event."""
from agentlens.models import Span, SpanStatus, SpanType
current_trace = get_current_trace()
if current_trace is None:
return
end_time = time.time()
duration_ms = int((end_time - start_time) * 1000)
# Create error span
span_name = f"anthropic.{model}"
span = Span(
name=span_name,
type=SpanType.LLM_CALL.value,
parent_span_id=parent_span_id,
input_data={"messages": _extract_messages_truncated(messages)},
status=SpanStatus.ERROR.value,
status_message=str(error),
started_at=_now_iso(),
ended_at=_now_iso(),
duration_ms=duration_ms,
metadata={
"model": model,
"temperature": temperature,
"max_tokens": max_tokens,
},
)
current_trace.spans.append(span)
# Create error event
error_event = Event(
type=EventType.ERROR.value,
name=f"{span_name}: {str(error)}",
span_id=span.id,
metadata={"error_type": type(error).__name__},
)
current_trace.events.append(error_event)
# Pop from context stack if needed
stack = _get_context_stack()
if stack and isinstance(stack[-1], Span) and stack[-1].id == span.id:
stack.pop()
def _wrap_create(original_create: Any, is_async: bool = False) -> Any:
"""Wrap Anthropic messages.create method."""
if is_async:
@wraps(original_create)
async def async_traced_create(*args: Any, **kwargs: Any) -> Any:
# Extract parameters
model = kwargs.get("model", "claude-3-5-sonnet-20241022")
temperature = kwargs.get("temperature")
max_tokens = kwargs.get("max_tokens")
messages = kwargs.get("messages", [])
stream = kwargs.get("stream", False)
parent_span_id = get_current_span_id()
start_time = time.time()
# Handle streaming
if stream:
trace_ctx = None
if get_current_trace() is None:
trace_ctx = TraceContext(name=f"anthropic-{model}")
trace_ctx.__enter__()
try:
original_stream = await original_create(*args, **kwargs)
wrapper = _StreamWrapper(original_stream, trace_ctx)
wrapper.set_params(model, temperature, max_tokens, messages)
return wrapper
except Exception as e:
if trace_ctx:
trace_ctx.__exit__(type(e), e, None)
raise
# Non-streaming
trace_ctx = None
if get_current_trace() is None:
trace_ctx = TraceContext(name=f"anthropic-{model}")
trace_ctx.__enter__()
try:
response = await original_create(*args, **kwargs)
_create_llm_span(
response=response,
start_time=start_time,
model=model,
temperature=temperature,
max_tokens=max_tokens,
messages=messages,
parent_span_id=parent_span_id,
trace_ctx=trace_ctx,
)
if trace_ctx is not None:
trace_ctx.__exit__(None, None, None)
return response
except Exception as e:
_handle_error(
error=e,
start_time=start_time,
model=model,
temperature=temperature,
max_tokens=max_tokens,
messages=messages,
parent_span_id=parent_span_id,
trace_ctx=trace_ctx,
)
raise
return async_traced_create
else:
@wraps(original_create)
def traced_create(*args: Any, **kwargs: Any) -> Any:
# Extract parameters
model = kwargs.get("model", "claude-3-5-sonnet-20241022")
temperature = kwargs.get("temperature")
max_tokens = kwargs.get("max_tokens")
messages = kwargs.get("messages", [])
stream = kwargs.get("stream", False)
parent_span_id = get_current_span_id()
start_time = time.time()
# Handle streaming
if stream:
trace_ctx = None
if get_current_trace() is None:
trace_ctx = TraceContext(name=f"anthropic-{model}")
trace_ctx.__enter__()
try:
original_stream = original_create(*args, **kwargs)
wrapper = _StreamWrapper(original_stream, trace_ctx)
wrapper.set_params(model, temperature, max_tokens, messages)
return wrapper
except Exception as e:
if trace_ctx:
trace_ctx.__exit__(type(e), e, None)
raise
# Non-streaming
trace_ctx = None
if get_current_trace() is None:
trace_ctx = TraceContext(name=f"anthropic-{model}")
trace_ctx.__enter__()
try:
response = original_create(*args, **kwargs)
_create_llm_span(
response=response,
start_time=start_time,
model=model,
temperature=temperature,
max_tokens=max_tokens,
messages=messages,
parent_span_id=parent_span_id,
trace_ctx=trace_ctx,
)
if trace_ctx is not None:
trace_ctx.__exit__(None, None, None)
return response
except Exception as e:
_handle_error(
error=e,
start_time=start_time,
model=model,
temperature=temperature,
max_tokens=max_tokens,
messages=messages,
parent_span_id=parent_span_id,
trace_ctx=trace_ctx,
)
raise
return traced_create
def wrap_anthropic(client: Any) -> Any:
"""Wrap an Anthropic client to add AgentLens tracing.
Instruments ``client.messages.create()`` to automatically capture LLM spans,
token usage, cost estimation, and tool-call decisions.
Supports both sync (``anthropic.Anthropic``) and async
(``anthropic.AsyncAnthropic``) clients as well as streaming responses.
Args:
client: An ``anthropic.Anthropic`` or ``anthropic.AsyncAnthropic`` instance.
Returns:
The same client instance with ``messages.create`` wrapped for tracing.
Example::
import anthropic
from agentlens.integrations.anthropic import wrap_anthropic
client = anthropic.Anthropic(api_key="sk-...")
traced_client = wrap_anthropic(client)
response = traced_client.messages.create(
model="claude-3-sonnet-20240229",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello!"}]
)
"""
# Detect async client by checking for common async patterns
is_async = False
try:
import inspect
create_method = client.messages.create
if inspect.iscoroutinefunction(create_method) or (
hasattr(create_method, "__wrapped__")
and inspect.iscoroutinefunction(create_method.__wrapped__)
):
is_async = True
except (AttributeError, ImportError):
pass
# Also detect by class name as a fallback
client_class_name = type(client).__name__
if "Async" in client_class_name:
is_async = True
original_create = client.messages.create
traced_create = _wrap_create(original_create, is_async=is_async)
client.messages.create = traced_create
logger.debug("Anthropic client wrapped with AgentLens tracing")
return client
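
For completeness, a streaming call through the wrapper might look like the minimal sketch below (not part of the commit; the prompt is a placeholder, and ``anthropic.Anthropic()`` reads ``ANTHROPIC_API_KEY`` from the environment). Using the stream as a context manager matters here, since ``_StreamWrapper.finalize()`` runs on ``__exit__``:

import anthropic
from agentlens.integrations.anthropic import wrap_anthropic

client = wrap_anthropic(anthropic.Anthropic())
with client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=256,
    messages=[{"role": "user", "content": "Write a haiku about tracing."}],
    stream=True,
) as stream:
    for event in stream:
        if getattr(event, "type", "") == "content_block_delta":
            delta = getattr(event, "delta", None)
            print(getattr(delta, "text", ""), end="")
# On exit, the wrapper assembles the accumulated events into an LLM span.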