Improve OpenClaw delta parsing and live stream coalescing

This commit is contained in:
Dotta
2026-03-06 08:34:51 -06:00
parent 38d3d5fa59
commit 854e818b74
4 changed files with 116 additions and 12 deletions

View File

@@ -0,0 +1,16 @@
export function normalizeOpenClawStreamLine(rawLine: string): {
stream: "stdout" | "stderr" | null;
line: string;
} {
const trimmed = rawLine.trim();
if (!trimmed) return { stream: null, line: "" };
const prefixed = trimmed.match(/^(stdout|stderr)\s*[:=]?\s*(.*)$/i);
if (!prefixed) {
return { stream: null, line: trimmed };
}
const stream = prefixed[1]?.toLowerCase() === "stderr" ? "stderr" : "stdout";
const line = (prefixed[2] ?? "").trim();
return { stream, line };
}

View File

@@ -1,4 +1,5 @@
import type { TranscriptEntry } from "@paperclipai/adapter-utils"; import type { TranscriptEntry } from "@paperclipai/adapter-utils";
import { normalizeOpenClawStreamLine } from "../shared/stream.js";
function safeJsonParse(text: string): unknown { function safeJsonParse(text: string): unknown {
try { try {
@@ -21,6 +22,51 @@ function asNumber(value: unknown, fallback = 0): number {
return typeof value === "number" && Number.isFinite(value) ? value : fallback; return typeof value === "number" && Number.isFinite(value) ? value : fallback;
} }
function stringifyUnknown(value: unknown): string {
if (typeof value === "string") return value;
if (value === null || value === undefined) return "";
try {
return JSON.stringify(value);
} catch {
return String(value);
}
}
function readErrorText(value: unknown): string {
if (typeof value === "string") return value;
const obj = asRecord(value);
if (!obj) return stringifyUnknown(value);
return (
asString(obj.message).trim() ||
asString(obj.error).trim() ||
asString(obj.code).trim() ||
stringifyUnknown(obj)
);
}
function readDeltaText(payload: Record<string, unknown> | null): string {
if (!payload) return "";
if (typeof payload.delta === "string") return payload.delta;
const deltaObj = asRecord(payload.delta);
if (deltaObj) {
const nestedDelta =
asString(deltaObj.text) ||
asString(deltaObj.value) ||
asString(deltaObj.delta);
if (nestedDelta.length > 0) return nestedDelta;
}
const part = asRecord(payload.part);
if (part) {
const partText = asString(part.text);
if (partText.length > 0) return partText;
}
return "";
}
function extractResponseOutputText(response: Record<string, unknown> | null): string { function extractResponseOutputText(response: Record<string, unknown> | null): string {
if (!response) return ""; if (!response) return "";
@@ -55,8 +101,8 @@ function parseOpenClawSseLine(line: string, ts: string): TranscriptEntry[] {
return []; return [];
} }
const delta = asString(parsed?.delta); const delta = readDeltaText(parsed);
if (normalizedEventType.endsWith(".delta") && delta) { if (normalizedEventType.endsWith(".delta") && delta.length > 0) {
return [{ kind: "assistant", ts, text: delta, delta: true }]; return [{ kind: "assistant", ts, text: delta, delta: true }];
} }
@@ -65,10 +111,7 @@ function parseOpenClawSseLine(line: string, ts: string): TranscriptEntry[] {
normalizedEventType.includes("failed") || normalizedEventType.includes("failed") ||
normalizedEventType.includes("cancel") normalizedEventType.includes("cancel")
) { ) {
const message = const message = readErrorText(parsed?.error) || readErrorText(parsed?.message) || dataText;
asString(parsed?.error).trim() ||
asString(parsed?.message).trim() ||
dataText;
return message ? [{ kind: "stderr", ts, text: message }] : []; return message ? [{ kind: "stderr", ts, text: message }] : [];
} }
@@ -78,9 +121,9 @@ function parseOpenClawSseLine(line: string, ts: string): TranscriptEntry[] {
const status = asString(response?.status, asString(parsed?.status, eventType)); const status = asString(response?.status, asString(parsed?.status, eventType));
const statusLower = status.trim().toLowerCase(); const statusLower = status.trim().toLowerCase();
const errorText = const errorText =
asString(response?.error).trim() || readErrorText(response?.error).trim() ||
asString(parsed?.error).trim() || readErrorText(parsed?.error).trim() ||
asString(parsed?.message).trim(); readErrorText(parsed?.message).trim();
const isError = const isError =
statusLower === "failed" || statusLower === "failed" ||
statusLower === "error" || statusLower === "error" ||
@@ -104,7 +147,12 @@ function parseOpenClawSseLine(line: string, ts: string): TranscriptEntry[] {
} }
export function parseOpenClawStdoutLine(line: string, ts: string): TranscriptEntry[] { export function parseOpenClawStdoutLine(line: string, ts: string): TranscriptEntry[] {
const trimmed = line.trim(); const normalized = normalizeOpenClawStreamLine(line);
if (normalized.stream === "stderr") {
return [{ kind: "stderr", ts, text: normalized.line }];
}
const trimmed = normalized.line.trim();
if (!trimmed) return []; if (!trimmed) return [];
if (trimmed.startsWith("[openclaw:sse]")) { if (trimmed.startsWith("[openclaw:sse]")) {
@@ -115,5 +163,5 @@ export function parseOpenClawStdoutLine(line: string, ts: string): TranscriptEnt
return [{ kind: "system", ts, text: trimmed.replace(/^\[openclaw\]\s*/, "") }]; return [{ kind: "system", ts, text: trimmed.replace(/^\[openclaw\]\s*/, "") }];
} }
return [{ kind: "stdout", ts, text: line }]; return [{ kind: "stdout", ts, text: normalized.line }];
} }

View File

@@ -74,6 +74,21 @@ describe("openclaw ui stdout parser", () => {
]); ]);
}); });
it("parses stdout-prefixed SSE deltas and preserves spacing", () => {
const ts = "2026-03-05T23:07:16.296Z";
const line =
'stdout[openclaw:sse] event=response.output_text.delta data={"type":"response.output_text.delta","delta":" can"}';
expect(parseOpenClawStdoutLine(line, ts)).toEqual([
{
kind: "assistant",
ts,
text: " can",
delta: true,
},
]);
});
it("parses response.completed into usage-aware result entries", () => { it("parses response.completed into usage-aware result entries", () => {
const ts = "2026-03-05T23:07:20.269Z"; const ts = "2026-03-05T23:07:20.269Z";
const line = JSON.stringify({ const line = JSON.stringify({
@@ -128,6 +143,19 @@ describe("openclaw ui stdout parser", () => {
}, },
]); ]);
}); });
it("maps stderr-prefixed lines to stderr transcript entries", () => {
const ts = "2026-03-05T23:07:20.269Z";
const line = "stderr OpenClaw transport error";
expect(parseOpenClawStdoutLine(line, ts)).toEqual([
{
kind: "stderr",
ts,
text: "OpenClaw transport error",
},
]);
});
}); });
describe("openclaw adapter execute", () => { describe("openclaw adapter execute", () => {

View File

@@ -97,8 +97,20 @@ function parseStdoutChunk(
pendingByRun.set(pendingKey, split.pop() ?? ""); pendingByRun.set(pendingKey, split.pop() ?? "");
const adapter = getUIAdapter(run.adapterType); const adapter = getUIAdapter(run.adapterType);
const summarized: Array<{ text: string; tone: FeedTone; thinkingDelta?: boolean }> = []; const summarized: Array<{ text: string; tone: FeedTone; thinkingDelta?: boolean; assistantDelta?: boolean }> = [];
const appendSummary = (entry: TranscriptEntry) => { const appendSummary = (entry: TranscriptEntry) => {
if (entry.kind === "assistant" && entry.delta) {
const text = entry.text;
if (!text.trim()) return;
const last = summarized[summarized.length - 1];
if (last && last.assistantDelta) {
last.text += text;
} else {
summarized.push({ text, tone: "assistant", assistantDelta: true });
}
return;
}
if (entry.kind === "thinking" && entry.delta) { if (entry.kind === "thinking" && entry.delta) {
const text = entry.text; const text = entry.text;
if (!text.trim()) return; if (!text.trim()) return;