fix: trace ingest FK violation on tool-call decisions

Spans must be inserted before decision points because the
DecisionPoint.parentSpanId FK references Span.id. Switched from a
nested Prisma create to an interactive transaction with topological
span ordering. Also adds a real MoonshotAI LLM test script.
Vectry
2026-02-10 02:16:10 +00:00
parent 98bfa968ce
commit d91fdfc81a
2 changed files with 432 additions and 36 deletions
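
For context, the failure the message describes, sketched minimally. The model and field names come from the commit message itself; the Prisma calls are illustrative, not this repo's actual code:

// Illustrative sketch, assuming a DecisionPoint.parentSpanId -> Span.id FK:
// writing the decision point before the span it references makes the database
// reject the insert, which Prisma surfaces as error P2003
// ("Foreign key constraint failed").
await prisma.decisionPoint.create({
  data: { id: "dp-1", parentSpanId: "span-1" }, // span-1 not inserted yet -> P2003
});
await prisma.span.create({ data: { id: "span-1" } }); // arrives too late

Reversing the order, spans first and decision points second, is exactly what the diff below enforces.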


@@ -63,6 +63,24 @@ interface BatchTracesRequest {
   traces: TracePayload[];
 }
 
+function topologicalSortSpans(spans: SpanPayload[]): SpanPayload[] {
+  const byId = new Map(spans.map((s) => [s.id, s]));
+  const sorted: SpanPayload[] = [];
+  const visited = new Set<string>();
+
+  function visit(span: SpanPayload) {
+    if (visited.has(span.id)) return;
+    visited.add(span.id);
+    if (span.parentSpanId && byId.has(span.parentSpanId)) {
+      visit(byId.get(span.parentSpanId)!);
+    }
+    sorted.push(span);
+  }
+
+  for (const span of spans) visit(span);
+  return sorted;
+}
+
 // POST /api/traces — Batch ingest traces from SDK
 export async function POST(request: NextRequest) {
   try {
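
A quick check of the guarantee the sort provides: parents always come out before children, whatever the input order. Minimal sketch with SpanPayload stubbed down to the two fields the sort reads (the real payload type has more):

// Stubbed SpanPayload; pair with topologicalSortSpans as defined above.
type SpanPayload = { id: string; parentSpanId?: string | null };

const spans: SpanPayload[] = [
  { id: "grandchild", parentSpanId: "child" },
  { id: "child", parentSpanId: "root" },
  { id: "root", parentSpanId: null },
];

console.log(topologicalSortSpans(spans).map((s) => s.id));
// -> ["root", "child", "grandchild"]

The visited set also keeps the recursion finite if a malformed payload ever contained a parent cycle, though the resulting order is then arbitrary and, if Span.parentSpanId is itself an enforced FK, one of the cyclic spans could still be rejected.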
@@ -166,10 +184,15 @@ export async function POST(request: NextRequest) {
       }
     }
 
-    // Insert traces using transaction
-    const result = await prisma.$transaction(
-      body.traces.map((trace) =>
-        prisma.trace.create({
+    // Insert traces using interactive transaction to control insert order.
+    // Spans must be inserted before decision points due to the
+    // DecisionPoint.parentSpanId FK referencing Span.id.
+    const result = await prisma.$transaction(async (tx) => {
+      const created: string[] = [];
+
+      for (const trace of body.traces) {
+        // 1. Create the trace record (no nested relations)
+        await tx.trace.create({
           data: {
             id: trace.id,
             name: trace.name,
@@ -182,23 +205,18 @@ export async function POST(request: NextRequest) {
             totalDuration: trace.totalDuration,
             startedAt: new Date(trace.startedAt),
             endedAt: trace.endedAt ? new Date(trace.endedAt) : null,
-            decisionPoints: {
-              create: trace.decisionPoints.map((dp) => ({
-                id: dp.id,
-                type: dp.type,
-                reasoning: dp.reasoning,
-                chosen: dp.chosen as Prisma.InputJsonValue,
-                alternatives: dp.alternatives as Prisma.InputJsonValue[],
-                contextSnapshot: dp.contextSnapshot as Prisma.InputJsonValue | undefined,
-                durationMs: dp.durationMs,
-                costUsd: dp.costUsd,
-                parentSpanId: dp.parentSpanId,
-                timestamp: new Date(dp.timestamp),
-              })),
-            },
-            spans: {
-              create: trace.spans.map((span) => ({
+          },
+        });
+
+        // 2. Create spans FIRST (decision points may reference them via parentSpanId)
+        if (trace.spans.length > 0) {
+          // Sort spans so parents come before children
+          const spanOrder = topologicalSortSpans(trace.spans);
+          for (const span of spanOrder) {
+            await tx.span.create({
+              data: {
                 id: span.id,
+                traceId: trace.id,
                 parentSpanId: span.parentSpanId,
                 name: span.name,
                 type: span.type,
@@ -212,22 +230,53 @@ export async function POST(request: NextRequest) {
                 startedAt: new Date(span.startedAt),
                 endedAt: span.endedAt ? new Date(span.endedAt) : null,
                 metadata: span.metadata as Prisma.InputJsonValue | undefined,
-              })),
-            },
-            events: {
-              create: trace.events.map((event) => ({
+              },
+            });
+          }
+        }
+
+        // 3. Create decision points AFTER spans exist
+        if (trace.decisionPoints.length > 0) {
+          // Collect valid span IDs so we can null-out invalid parentSpanId refs
+          const validSpanIds = new Set(trace.spans.map((s) => s.id));
+          await tx.decisionPoint.createMany({
+            data: trace.decisionPoints.map((dp) => ({
+              id: dp.id,
+              traceId: trace.id,
+              type: dp.type,
+              reasoning: dp.reasoning,
+              chosen: dp.chosen as Prisma.InputJsonValue,
+              alternatives: dp.alternatives as Prisma.InputJsonValue[],
+              contextSnapshot: dp.contextSnapshot as Prisma.InputJsonValue | undefined,
+              durationMs: dp.durationMs,
+              costUsd: dp.costUsd,
+              parentSpanId: dp.parentSpanId && validSpanIds.has(dp.parentSpanId) ? dp.parentSpanId : null,
+              timestamp: new Date(dp.timestamp),
+            })),
+          });
+        }
+
+        // 4. Create events
+        if (trace.events.length > 0) {
+          await tx.event.createMany({
+            data: trace.events.map((event) => ({
               id: event.id,
+              traceId: trace.id,
               spanId: event.spanId,
               type: event.type,
               name: event.name,
               metadata: event.metadata as Prisma.InputJsonValue | undefined,
               timestamp: new Date(event.timestamp),
             })),
-          },
-        },
-      })
-    )
-  );
+          });
+        }
+
+        created.push(trace.id);
+      }
+
+      return created;
+    });
 
     return NextResponse.json({ success: true, count: result.length }, { status: 200 });
   } catch (error) {
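
To make the insert order concrete, here is a hypothetical batch body for POST /api/traces in which the decision point references a span from the same trace. Under the old nested create the decision point could reach the database before its span; with the interactive transaction above, spans always land first. All values are invented, and the shape is abbreviated to the fields visible in the diff:

// Hypothetical payload (values invented) that used to trip the FK:
const body = {
  traces: [
    {
      id: "trace-1",
      name: "moonshot-function-calling",
      totalDuration: 1200,
      startedAt: "2026-02-10T02:00:00Z",
      endedAt: "2026-02-10T02:00:01Z",
      spans: [
        {
          id: "span-llm",
          parentSpanId: null,
          name: "chat.completions",
          type: "llm",
          startedAt: "2026-02-10T02:00:00Z",
          endedAt: "2026-02-10T02:00:01Z",
        },
      ],
      decisionPoints: [
        {
          id: "dp-1",
          type: "TOOL_SELECTION",
          reasoning: "Model requested get_weather",
          chosen: { name: "get_weather" },
          alternatives: [],
          durationMs: 3,
          costUsd: 0,
          parentSpanId: "span-llm", // FK into spans[] of the same batch
          timestamp: "2026-02-10T02:00:00.500Z",
        },
      ],
      events: [],
    },
  ],
};

Note the handler also nulls out any parentSpanId that does not resolve within the batch, trading referential strictness for ingest robustness.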

moonshot_real_test.py

@@ -0,0 +1,347 @@
"""
AgentLens Real LLM Test — MoonshotAI (Kimi) via OpenAI-compatible API.
Tests the full pipeline: SDK → wrap_openai() → real LLM completion → AgentLens dashboard.
Uses MoonshotAI (OpenAI-compatible) with kimi-k2-turbo-preview model.
Usage:
pip install vectry-agentlens openai httpx
python moonshot_real_test.py
"""
import time
import json
import agentlens
from agentlens.integrations.openai import wrap_openai
import openai
# ── Config ──────────────────────────────────────────────────────────
MOONSHOT_API_KEY = "sk-2uhpGUeqISKtiGwd14aGuYJ4tt2p0Ad98qke9T8Ykdc4dEPp"
MOONSHOT_BASE_URL = "https://api.moonshot.ai/v1"
MOONSHOT_MODEL = "kimi-k2-turbo-preview"
AGENTLENS_ENDPOINT = "https://agentlens.vectry.tech"
AGENTLENS_API_KEY = "test-moonshot-key"
# ── Initialize ──────────────────────────────────────────────────────
print("=" * 60)
print("AgentLens Real LLM Test — MoonshotAI (Kimi)")
print("=" * 60)
agentlens.init(
api_key=AGENTLENS_API_KEY,
endpoint=AGENTLENS_ENDPOINT,
)
print(f"[✓] AgentLens initialized → {AGENTLENS_ENDPOINT}")
# Create OpenAI client pointing to MoonshotAI
client = openai.OpenAI(
api_key=MOONSHOT_API_KEY,
base_url=MOONSHOT_BASE_URL,
)
wrap_openai(client)
print(f"[✓] OpenAI client wrapped → {MOONSHOT_BASE_URL}")
print(f"[✓] Model: {MOONSHOT_MODEL}")
print()
# ── Test 1: Basic Completion ────────────────────────────────────────
print("─── Test 1: Basic Completion ───")
with agentlens.trace(
"moonshot-basic-completion",
tags=["moonshot", "test", "basic"],
metadata={"provider": "moonshot", "model": MOONSHOT_MODEL, "test": "basic"},
):
agentlens.log_decision(
type="TOOL_SELECTION",
chosen={
"name": MOONSHOT_MODEL,
"confidence": 0.95,
"params": {"temperature": 0.7, "max_tokens": 200},
},
alternatives=[
{
"name": "moonshot-v1-8k",
"confidence": 0.6,
"reason_rejected": "Older model, less capable",
}
],
reasoning="Using kimi-k2-turbo-preview for best quality/speed balance.",
)
start = time.time()
response = client.chat.completions.create(
model=MOONSHOT_MODEL,
messages=[
{
"role": "system",
"content": "You are a helpful AI assistant. Be concise.",
},
{
"role": "user",
"content": "What are the 3 most important principles of software engineering? Answer in one sentence each.",
},
],
temperature=0.7,
max_tokens=200,
)
elapsed = time.time() - start
content = response.choices[0].message.content
usage = response.usage
print(f" Response ({elapsed:.2f}s):")
print(f" {content[:200]}...")
print(
f" Tokens: {usage.prompt_tokens} in / {usage.completion_tokens} out / {usage.total_tokens} total"
)
print()
# ── Test 2: Multi-turn Conversation with Decision Logging ──────────
print("─── Test 2: Multi-turn with Decisions ───")
with agentlens.trace(
"moonshot-multi-turn-agent",
tags=["moonshot", "test", "multi-turn", "agent"],
metadata={"provider": "moonshot", "model": MOONSHOT_MODEL, "test": "multi-turn"},
):
# Step 1: Classify user intent
agentlens.log_decision(
type="PLANNING",
chosen={
"name": "classify-then-respond",
"confidence": 0.9,
"params": {"strategy": "two-step"},
},
alternatives=[
{
"name": "direct-response",
"confidence": 0.5,
"reason_rejected": "Classification first improves response quality",
}
],
reasoning="Two-step approach: classify intent first, then generate targeted response.",
context_snapshot={"user_query": "Help me debug a Python TypeError"},
)
with agentlens.trace("classify-intent", tags=["classification"]):
classification = client.chat.completions.create(
model=MOONSHOT_MODEL,
messages=[
{
"role": "system",
"content": "Classify the user's programming question into one category: 'syntax', 'runtime', 'logic', 'design', 'performance'. Reply with just the category.",
},
{
"role": "user",
"content": "I'm getting a TypeError: unsupported operand type(s) for +: 'int' and 'str' in my Python code",
},
],
temperature=0.2,
max_tokens=20,
)
category = classification.choices[0].message.content.strip()
print(f" Intent classified: {category}")
# Step 2: Route to appropriate response strategy
agentlens.log_decision(
type="ROUTING",
chosen={
"name": f"respond-as-{category}",
"confidence": 0.85,
},
alternatives=[
{
"name": "generic-response",
"confidence": 0.3,
"reason_rejected": "Classified response is more helpful",
}
],
reasoning=f"User question classified as '{category}' — routing to specialized response.",
context_snapshot={"category": category},
)
# Step 3: Generate response
with agentlens.trace("generate-response", tags=["response"]):
response = client.chat.completions.create(
model=MOONSHOT_MODEL,
messages=[
{
"role": "system",
"content": f"You are an expert Python debugger specializing in {category} errors. Give a concise, actionable fix.",
},
{
"role": "user",
"content": "I'm getting a TypeError: unsupported operand type(s) for +: 'int' and 'str' in my Python code",
},
{
"role": "assistant",
"content": f"This is a {category} error. Let me help you fix it.",
},
{
"role": "user",
"content": "Here's my code: total = count + name where count=5 and name='hello'",
},
],
temperature=0.5,
max_tokens=300,
)
answer = response.choices[0].message.content
print(f" Response: {answer[:150]}...")
print()
# ── Test 3: Tool/Function Calling ───────────────────────────────────
print("─── Test 3: Function Calling ───")
with agentlens.trace(
"moonshot-function-calling",
tags=["moonshot", "test", "tools", "function-calling"],
metadata={
"provider": "moonshot",
"model": MOONSHOT_MODEL,
"test": "function-calling",
},
):
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get the current weather for a location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City name, e.g. 'San Francisco'",
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Temperature unit",
},
},
"required": ["location"],
},
},
},
{
"type": "function",
"function": {
"name": "search_web",
"description": "Search the web for information",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query",
},
},
"required": ["query"],
},
},
},
]
agentlens.log_decision(
type="TOOL_SELECTION",
chosen={
"name": "provide-tools",
"confidence": 0.9,
"params": {"tools": ["get_weather", "search_web"]},
},
alternatives=[],
reasoning="User query likely requires weather data — providing weather and search tools.",
)
response = client.chat.completions.create(
model=MOONSHOT_MODEL,
messages=[
{
"role": "system",
"content": "You are a helpful assistant with access to tools. Use them when needed.",
},
{"role": "user", "content": "What's the weather like in Lisbon today?"},
],
tools=tools,
temperature=0.3,
max_tokens=200,
)
message = response.choices[0].message
if message.tool_calls:
print(f" Tool calls requested: {len(message.tool_calls)}")
for tc in message.tool_calls:
print(f"{tc.function.name}({tc.function.arguments})")
# Simulate tool response
agentlens.log_decision(
type="TOOL_SELECTION",
chosen={
"name": tc.function.name,
"confidence": 1.0,
},
alternatives=[],
reasoning=f"Model requested {tc.function.name} — executing tool call.",
)
# Send fake tool result back
tool_result = json.dumps(
{
"temperature": 18,
"unit": "celsius",
"condition": "sunny",
"location": "Lisbon",
}
)
final_response = client.chat.completions.create(
model=MOONSHOT_MODEL,
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What's the weather like in Lisbon today?"},
message,
{
"role": "tool",
"tool_call_id": message.tool_calls[0].id,
"content": tool_result,
},
],
temperature=0.5,
max_tokens=200,
)
print(f" Final answer: {final_response.choices[0].message.content[:150]}...")
else:
print(f" Direct response (no tool calls): {message.content[:150]}...")
print()
# ── Shutdown & Verify ───────────────────────────────────────────────
print("─── Flushing traces to AgentLens... ───")
agentlens.shutdown()
print("[✓] All traces flushed")
# Wait a moment for async processing
time.sleep(2)
# Verify traces arrived
print()
print("─── Verifying traces in dashboard... ───")
import httpx
resp = httpx.get(
f"{AGENTLENS_ENDPOINT}/api/traces",
params={"search": "moonshot", "limit": "10"},
headers={"Authorization": f"Bearer {AGENTLENS_API_KEY}"},
)
if resp.status_code == 200:
data = resp.json()
traces = data.get("traces", [])
print(f"[✓] Found {len(traces)} moonshot traces in dashboard:")
for t in traces:
spans = t.get("_count", {}).get("spans", "?")
decisions = t.get("_count", {}).get("decisionPoints", "?")
print(
f"{t['name']} — status={t['status']}, spans={spans}, decisions={decisions}"
)
else:
print(f"[✗] API returned {resp.status_code}: {resp.text[:200]}")
print()
print("=" * 60)
print("Test complete! Visit https://agentlens.vectry.tech/dashboard")
print("=" * 60)