Improve OpenClaw SSE transcript parsing and stream readability
This commit is contained in:
@@ -135,7 +135,7 @@ export interface ServerAdapterModule {
|
|||||||
// ---------------------------------------------------------------------------
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
export type TranscriptEntry =
|
export type TranscriptEntry =
|
||||||
| { kind: "assistant"; ts: string; text: string }
|
| { kind: "assistant"; ts: string; text: string; delta?: boolean }
|
||||||
| { kind: "thinking"; ts: string; text: string; delta?: boolean }
|
| { kind: "thinking"; ts: string; text: string; delta?: boolean }
|
||||||
| { kind: "user"; ts: string; text: string }
|
| { kind: "user"; ts: string; text: string }
|
||||||
| { kind: "tool_call"; ts: string; name: string; input: unknown }
|
| { kind: "tool_call"; ts: string; name: string; input: unknown }
|
||||||
|
|||||||
@@ -1,5 +1,119 @@
|
|||||||
import type { TranscriptEntry } from "@paperclipai/adapter-utils";
|
import type { TranscriptEntry } from "@paperclipai/adapter-utils";
|
||||||
|
|
||||||
|
function safeJsonParse(text: string): unknown {
|
||||||
|
try {
|
||||||
|
return JSON.parse(text);
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function asRecord(value: unknown): Record<string, unknown> | null {
|
||||||
|
if (typeof value !== "object" || value === null || Array.isArray(value)) return null;
|
||||||
|
return value as Record<string, unknown>;
|
||||||
|
}
|
||||||
|
|
||||||
|
function asString(value: unknown, fallback = ""): string {
|
||||||
|
return typeof value === "string" ? value : fallback;
|
||||||
|
}
|
||||||
|
|
||||||
|
function asNumber(value: unknown, fallback = 0): number {
|
||||||
|
return typeof value === "number" && Number.isFinite(value) ? value : fallback;
|
||||||
|
}
|
||||||
|
|
||||||
|
function extractResponseOutputText(response: Record<string, unknown> | null): string {
|
||||||
|
if (!response) return "";
|
||||||
|
|
||||||
|
const output = Array.isArray(response.output) ? response.output : [];
|
||||||
|
const parts: string[] = [];
|
||||||
|
for (const itemRaw of output) {
|
||||||
|
const item = asRecord(itemRaw);
|
||||||
|
if (!item) continue;
|
||||||
|
const content = Array.isArray(item.content) ? item.content : [];
|
||||||
|
for (const partRaw of content) {
|
||||||
|
const part = asRecord(partRaw);
|
||||||
|
if (!part) continue;
|
||||||
|
const type = asString(part.type).trim().toLowerCase();
|
||||||
|
if (type !== "output_text" && type !== "text" && type !== "refusal") continue;
|
||||||
|
const text = asString(part.text).trim();
|
||||||
|
if (text) parts.push(text);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return parts.join("\n\n").trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseOpenClawSseLine(line: string, ts: string): TranscriptEntry[] {
|
||||||
|
const match = line.match(/^\[openclaw:sse\]\s+event=([^\s]+)\s+data=(.*)$/s);
|
||||||
|
if (!match) return [{ kind: "stdout", ts, text: line }];
|
||||||
|
|
||||||
|
const eventType = (match[1] ?? "").trim();
|
||||||
|
const dataText = (match[2] ?? "").trim();
|
||||||
|
const parsed = asRecord(safeJsonParse(dataText));
|
||||||
|
const normalizedEventType = eventType.toLowerCase();
|
||||||
|
|
||||||
|
if (dataText === "[DONE]") {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
const delta = asString(parsed?.delta);
|
||||||
|
if (normalizedEventType.endsWith(".delta") && delta) {
|
||||||
|
return [{ kind: "assistant", ts, text: delta, delta: true }];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
normalizedEventType.includes("error") ||
|
||||||
|
normalizedEventType.includes("failed") ||
|
||||||
|
normalizedEventType.includes("cancel")
|
||||||
|
) {
|
||||||
|
const message =
|
||||||
|
asString(parsed?.error).trim() ||
|
||||||
|
asString(parsed?.message).trim() ||
|
||||||
|
dataText;
|
||||||
|
return message ? [{ kind: "stderr", ts, text: message }] : [];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (normalizedEventType === "response.completed" || normalizedEventType.endsWith(".completed")) {
|
||||||
|
const response = asRecord(parsed?.response);
|
||||||
|
const usage = asRecord(response?.usage);
|
||||||
|
const status = asString(response?.status, asString(parsed?.status, eventType));
|
||||||
|
const statusLower = status.trim().toLowerCase();
|
||||||
|
const errorText =
|
||||||
|
asString(response?.error).trim() ||
|
||||||
|
asString(parsed?.error).trim() ||
|
||||||
|
asString(parsed?.message).trim();
|
||||||
|
const isError =
|
||||||
|
statusLower === "failed" ||
|
||||||
|
statusLower === "error" ||
|
||||||
|
statusLower === "cancelled";
|
||||||
|
|
||||||
|
return [{
|
||||||
|
kind: "result",
|
||||||
|
ts,
|
||||||
|
text: extractResponseOutputText(response),
|
||||||
|
inputTokens: asNumber(usage?.input_tokens),
|
||||||
|
outputTokens: asNumber(usage?.output_tokens),
|
||||||
|
cachedTokens: asNumber(usage?.cached_input_tokens),
|
||||||
|
costUsd: asNumber(usage?.cost_usd, asNumber(usage?.total_cost_usd)),
|
||||||
|
subtype: status || eventType,
|
||||||
|
isError,
|
||||||
|
errors: errorText ? [errorText] : [],
|
||||||
|
}];
|
||||||
|
}
|
||||||
|
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
export function parseOpenClawStdoutLine(line: string, ts: string): TranscriptEntry[] {
|
export function parseOpenClawStdoutLine(line: string, ts: string): TranscriptEntry[] {
|
||||||
|
const trimmed = line.trim();
|
||||||
|
if (!trimmed) return [];
|
||||||
|
|
||||||
|
if (trimmed.startsWith("[openclaw:sse]")) {
|
||||||
|
return parseOpenClawSseLine(trimmed, ts);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (trimmed.startsWith("[openclaw]")) {
|
||||||
|
return [{ kind: "system", ts, text: trimmed.replace(/^\[openclaw\]\s*/, "") }];
|
||||||
|
}
|
||||||
|
|
||||||
return [{ kind: "stdout", ts, text: line }];
|
return [{ kind: "stdout", ts, text: line }];
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { afterEach, describe, expect, it, vi } from "vitest";
|
import { afterEach, describe, expect, it, vi } from "vitest";
|
||||||
import { execute, testEnvironment } from "@paperclipai/adapter-openclaw/server";
|
import { execute, testEnvironment } from "@paperclipai/adapter-openclaw/server";
|
||||||
|
import { parseOpenClawStdoutLine } from "@paperclipai/adapter-openclaw/ui";
|
||||||
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
|
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
|
||||||
|
|
||||||
function buildContext(
|
function buildContext(
|
||||||
@@ -57,6 +58,78 @@ afterEach(() => {
|
|||||||
vi.unstubAllGlobals();
|
vi.unstubAllGlobals();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe("openclaw ui stdout parser", () => {
|
||||||
|
it("parses SSE deltas into assistant streaming entries", () => {
|
||||||
|
const ts = "2026-03-05T23:07:16.296Z";
|
||||||
|
const line =
|
||||||
|
'[openclaw:sse] event=response.output_text.delta data={"type":"response.output_text.delta","delta":"hello"}';
|
||||||
|
|
||||||
|
expect(parseOpenClawStdoutLine(line, ts)).toEqual([
|
||||||
|
{
|
||||||
|
kind: "assistant",
|
||||||
|
ts,
|
||||||
|
text: "hello",
|
||||||
|
delta: true,
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("parses response.completed into usage-aware result entries", () => {
|
||||||
|
const ts = "2026-03-05T23:07:20.269Z";
|
||||||
|
const line = JSON.stringify({
|
||||||
|
type: "response.completed",
|
||||||
|
response: {
|
||||||
|
status: "completed",
|
||||||
|
usage: {
|
||||||
|
input_tokens: 12,
|
||||||
|
output_tokens: 34,
|
||||||
|
cached_input_tokens: 5,
|
||||||
|
},
|
||||||
|
output: [
|
||||||
|
{
|
||||||
|
type: "message",
|
||||||
|
content: [
|
||||||
|
{
|
||||||
|
type: "output_text",
|
||||||
|
text: "All done",
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(parseOpenClawStdoutLine(`[openclaw:sse] event=response.completed data=${line}`, ts)).toEqual([
|
||||||
|
{
|
||||||
|
kind: "result",
|
||||||
|
ts,
|
||||||
|
text: "All done",
|
||||||
|
inputTokens: 12,
|
||||||
|
outputTokens: 34,
|
||||||
|
cachedTokens: 5,
|
||||||
|
costUsd: 0,
|
||||||
|
subtype: "completed",
|
||||||
|
isError: false,
|
||||||
|
errors: [],
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("maps SSE errors to stderr entries", () => {
|
||||||
|
const ts = "2026-03-05T23:07:20.269Z";
|
||||||
|
const line =
|
||||||
|
'[openclaw:sse] event=response.failed data={"type":"response.failed","error":"timeout"}';
|
||||||
|
|
||||||
|
expect(parseOpenClawStdoutLine(line, ts)).toEqual([
|
||||||
|
{
|
||||||
|
kind: "stderr",
|
||||||
|
ts,
|
||||||
|
text: "timeout",
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe("openclaw adapter execute", () => {
|
describe("openclaw adapter execute", () => {
|
||||||
it("uses strict SSE and includes canonical PAPERCLIP context in text payload", async () => {
|
it("uses strict SSE and includes canonical PAPERCLIP context in text payload", async () => {
|
||||||
const fetchMock = vi.fn().mockResolvedValue(
|
const fetchMock = vi.fn().mockResolvedValue(
|
||||||
|
|||||||
@@ -3,9 +3,9 @@ import type { TranscriptEntry, StdoutLineParser } from "./types";
|
|||||||
type RunLogChunk = { ts: string; stream: "stdout" | "stderr" | "system"; chunk: string };
|
type RunLogChunk = { ts: string; stream: "stdout" | "stderr" | "system"; chunk: string };
|
||||||
|
|
||||||
function appendTranscriptEntry(entries: TranscriptEntry[], entry: TranscriptEntry) {
|
function appendTranscriptEntry(entries: TranscriptEntry[], entry: TranscriptEntry) {
|
||||||
if (entry.kind === "thinking" && entry.delta) {
|
if ((entry.kind === "thinking" || entry.kind === "assistant") && entry.delta) {
|
||||||
const last = entries[entries.length - 1];
|
const last = entries[entries.length - 1];
|
||||||
if (last && last.kind === "thinking" && last.delta) {
|
if (last && last.kind === entry.kind && last.delta) {
|
||||||
last.text += entry.text;
|
last.text += entry.text;
|
||||||
last.ts = entry.ts;
|
last.ts = entry.ts;
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -108,8 +108,20 @@ function parseStdoutChunk(
|
|||||||
pendingByRun.set(pendingKey, split.pop() ?? "");
|
pendingByRun.set(pendingKey, split.pop() ?? "");
|
||||||
const adapter = getUIAdapter(run.adapterType);
|
const adapter = getUIAdapter(run.adapterType);
|
||||||
|
|
||||||
const summarized: Array<{ text: string; tone: FeedTone; thinkingDelta?: boolean }> = [];
|
const summarized: Array<{ text: string; tone: FeedTone; thinkingDelta?: boolean; assistantDelta?: boolean }> = [];
|
||||||
const appendSummary = (entry: TranscriptEntry) => {
|
const appendSummary = (entry: TranscriptEntry) => {
|
||||||
|
if (entry.kind === "assistant" && entry.delta) {
|
||||||
|
const text = entry.text;
|
||||||
|
if (!text.trim()) return;
|
||||||
|
const last = summarized[summarized.length - 1];
|
||||||
|
if (last && last.assistantDelta) {
|
||||||
|
last.text += text;
|
||||||
|
} else {
|
||||||
|
summarized.push({ text, tone: "assistant", assistantDelta: true });
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (entry.kind === "thinking" && entry.delta) {
|
if (entry.kind === "thinking" && entry.delta) {
|
||||||
const text = entry.text;
|
const text = entry.text;
|
||||||
if (!text.trim()) return;
|
if (!text.trim()) return;
|
||||||
@@ -133,6 +145,9 @@ function parseStdoutChunk(
|
|||||||
if (!trimmed) continue;
|
if (!trimmed) continue;
|
||||||
const parsed = adapter.parseStdoutLine(trimmed, ts);
|
const parsed = adapter.parseStdoutLine(trimmed, ts);
|
||||||
if (parsed.length === 0) {
|
if (parsed.length === 0) {
|
||||||
|
if (run.adapterType === "openclaw") {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
const fallback = createFeedItem(run, ts, trimmed, "info", nextIdRef.current++);
|
const fallback = createFeedItem(run, ts, trimmed, "info", nextIdRef.current++);
|
||||||
if (fallback) items.push(fallback);
|
if (fallback) items.push(fallback);
|
||||||
continue;
|
continue;
|
||||||
|
|||||||
Reference in New Issue
Block a user