Reintroduce OpenClaw webhook transport alongside SSE
This commit is contained in:
@@ -9,15 +9,17 @@ Adapter: openclaw
|
|||||||
|
|
||||||
Use when:
|
Use when:
|
||||||
- You run an OpenClaw agent remotely and wake it over HTTP.
|
- You run an OpenClaw agent remotely and wake it over HTTP.
|
||||||
- You want SSE-first execution so one Paperclip run captures live progress and completion.
|
- You want selectable transport:
|
||||||
|
- \`sse\` for streaming execution in one Paperclip run.
|
||||||
|
- \`webhook\` for wake-style callbacks (including /hooks/wake compatibility).
|
||||||
|
|
||||||
Don't use when:
|
Don't use when:
|
||||||
- You need local CLI execution inside Paperclip (use claude_local/codex_local/opencode_local/process).
|
- You need local CLI execution inside Paperclip (use claude_local/codex_local/opencode_local/process).
|
||||||
- The OpenClaw endpoint is not reachable from the Paperclip server.
|
- The OpenClaw endpoint is not reachable from the Paperclip server.
|
||||||
|
|
||||||
Core fields:
|
Core fields:
|
||||||
- url (string, required): OpenClaw SSE endpoint URL
|
- url (string, required): OpenClaw endpoint URL
|
||||||
- streamTransport (string, optional): must be \`sse\` when provided
|
- streamTransport (string, optional): \`sse\` (default) or \`webhook\`
|
||||||
- method (string, optional): HTTP method, default POST
|
- method (string, optional): HTTP method, default POST
|
||||||
- headers (object, optional): extra HTTP headers for requests
|
- headers (object, optional): extra HTTP headers for requests
|
||||||
- webhookAuthHeader (string, optional): Authorization header value if your endpoint requires auth
|
- webhookAuthHeader (string, optional): Authorization header value if your endpoint requires auth
|
||||||
|
|||||||
394
packages/adapters/openclaw/src/server/execute-common.ts
Normal file
394
packages/adapters/openclaw/src/server/execute-common.ts
Normal file
@@ -0,0 +1,394 @@
|
|||||||
|
import type { AdapterExecutionContext } from "@paperclipai/adapter-utils";
|
||||||
|
import { asNumber, asString, buildPaperclipEnv, parseObject } from "@paperclipai/adapter-utils/server-utils";
|
||||||
|
import { createHash } from "node:crypto";
|
||||||
|
import { parseOpenClawResponse } from "./parse.js";
|
||||||
|
|
||||||
|
export type OpenClawTransport = "sse" | "webhook";
|
||||||
|
export type SessionKeyStrategy = "fixed" | "issue" | "run";
|
||||||
|
|
||||||
|
export type WakePayload = {
|
||||||
|
runId: string;
|
||||||
|
agentId: string;
|
||||||
|
companyId: string;
|
||||||
|
taskId: string | null;
|
||||||
|
issueId: string | null;
|
||||||
|
wakeReason: string | null;
|
||||||
|
wakeCommentId: string | null;
|
||||||
|
approvalId: string | null;
|
||||||
|
approvalStatus: string | null;
|
||||||
|
issueIds: string[];
|
||||||
|
};
|
||||||
|
|
||||||
|
export type OpenClawExecutionState = {
|
||||||
|
method: string;
|
||||||
|
timeoutSec: number;
|
||||||
|
headers: Record<string, string>;
|
||||||
|
payloadTemplate: Record<string, unknown>;
|
||||||
|
wakePayload: WakePayload;
|
||||||
|
sessionKey: string;
|
||||||
|
paperclipEnv: Record<string, string>;
|
||||||
|
wakeText: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
const SENSITIVE_LOG_KEY_PATTERN =
|
||||||
|
/(^|[_-])(auth|authorization|token|secret|password|api[_-]?key|private[_-]?key)([_-]|$)|^x-openclaw-auth$/i;
|
||||||
|
|
||||||
|
export function nonEmpty(value: unknown): string | null {
|
||||||
|
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function toAuthorizationHeaderValue(rawToken: string): string {
|
||||||
|
const trimmed = rawToken.trim();
|
||||||
|
if (!trimmed) return trimmed;
|
||||||
|
return /^bearer\s+/i.test(trimmed) ? trimmed : `Bearer ${trimmed}`;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resolvePaperclipApiUrlOverride(value: unknown): string | null {
|
||||||
|
const raw = nonEmpty(value);
|
||||||
|
if (!raw) return null;
|
||||||
|
try {
|
||||||
|
const parsed = new URL(raw);
|
||||||
|
if (parsed.protocol !== "http:" && parsed.protocol !== "https:") return null;
|
||||||
|
return parsed.toString();
|
||||||
|
} catch {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function normalizeSessionKeyStrategy(value: unknown): SessionKeyStrategy {
|
||||||
|
const normalized = asString(value, "fixed").trim().toLowerCase();
|
||||||
|
if (normalized === "issue" || normalized === "run") return normalized;
|
||||||
|
return "fixed";
|
||||||
|
}
|
||||||
|
|
||||||
|
export function resolveSessionKey(input: {
|
||||||
|
strategy: SessionKeyStrategy;
|
||||||
|
configuredSessionKey: string | null;
|
||||||
|
runId: string;
|
||||||
|
issueId: string | null;
|
||||||
|
}): string {
|
||||||
|
const fallback = input.configuredSessionKey ?? "paperclip";
|
||||||
|
if (input.strategy === "run") return `paperclip:run:${input.runId}`;
|
||||||
|
if (input.strategy === "issue" && input.issueId) return `paperclip:issue:${input.issueId}`;
|
||||||
|
return fallback;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function isWakeCompatibilityEndpoint(url: string): boolean {
|
||||||
|
try {
|
||||||
|
const parsed = new URL(url);
|
||||||
|
const path = parsed.pathname.toLowerCase();
|
||||||
|
return path === "/hooks/wake" || path.endsWith("/hooks/wake");
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function isOpenResponsesEndpoint(url: string): boolean {
|
||||||
|
try {
|
||||||
|
const parsed = new URL(url);
|
||||||
|
const path = parsed.pathname.toLowerCase();
|
||||||
|
return path === "/v1/responses" || path.endsWith("/v1/responses");
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function toStringRecord(value: unknown): Record<string, string> {
|
||||||
|
const parsed = parseObject(value);
|
||||||
|
const out: Record<string, string> = {};
|
||||||
|
for (const [key, entry] of Object.entries(parsed)) {
|
||||||
|
if (typeof entry === "string") {
|
||||||
|
out[key] = entry;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
|
||||||
|
function isSensitiveLogKey(key: string): boolean {
|
||||||
|
return SENSITIVE_LOG_KEY_PATTERN.test(key.trim());
|
||||||
|
}
|
||||||
|
|
||||||
|
function sha256Prefix(value: string): string {
|
||||||
|
return createHash("sha256").update(value).digest("hex").slice(0, 12);
|
||||||
|
}
|
||||||
|
|
||||||
|
function redactSecretForLog(value: string): string {
|
||||||
|
return `[redacted len=${value.length} sha256=${sha256Prefix(value)}]`;
|
||||||
|
}
|
||||||
|
|
||||||
|
function truncateForLog(value: string, maxChars = 320): string {
|
||||||
|
if (value.length <= maxChars) return value;
|
||||||
|
return `${value.slice(0, maxChars)}... [truncated ${value.length - maxChars} chars]`;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function redactForLog(value: unknown, keyPath: string[] = [], depth = 0): unknown {
|
||||||
|
const currentKey = keyPath[keyPath.length - 1] ?? "";
|
||||||
|
if (typeof value === "string") {
|
||||||
|
if (isSensitiveLogKey(currentKey)) return redactSecretForLog(value);
|
||||||
|
return truncateForLog(value);
|
||||||
|
}
|
||||||
|
if (typeof value === "number" || typeof value === "boolean" || value == null) {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
if (Array.isArray(value)) {
|
||||||
|
if (depth >= 6) return "[array-truncated]";
|
||||||
|
const out = value.slice(0, 20).map((entry, index) => redactForLog(entry, [...keyPath, `${index}`], depth + 1));
|
||||||
|
if (value.length > 20) out.push(`[+${value.length - 20} more items]`);
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
if (typeof value === "object") {
|
||||||
|
if (depth >= 6) return "[object-truncated]";
|
||||||
|
const entries = Object.entries(value as Record<string, unknown>);
|
||||||
|
const out: Record<string, unknown> = {};
|
||||||
|
for (const [key, entry] of entries.slice(0, 80)) {
|
||||||
|
out[key] = redactForLog(entry, [...keyPath, key], depth + 1);
|
||||||
|
}
|
||||||
|
if (entries.length > 80) {
|
||||||
|
out.__truncated__ = `+${entries.length - 80} keys`;
|
||||||
|
}
|
||||||
|
return out;
|
||||||
|
}
|
||||||
|
return String(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
export function stringifyForLog(value: unknown, maxChars: number): string {
|
||||||
|
const text = JSON.stringify(value);
|
||||||
|
if (text.length <= maxChars) return text;
|
||||||
|
return `${text.slice(0, maxChars)}... [truncated ${text.length - maxChars} chars]`;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function buildWakePayload(ctx: AdapterExecutionContext): WakePayload {
|
||||||
|
const { runId, agent, context } = ctx;
|
||||||
|
return {
|
||||||
|
runId,
|
||||||
|
agentId: agent.id,
|
||||||
|
companyId: agent.companyId,
|
||||||
|
taskId: nonEmpty(context.taskId) ?? nonEmpty(context.issueId),
|
||||||
|
issueId: nonEmpty(context.issueId),
|
||||||
|
wakeReason: nonEmpty(context.wakeReason),
|
||||||
|
wakeCommentId: nonEmpty(context.wakeCommentId) ?? nonEmpty(context.commentId),
|
||||||
|
approvalId: nonEmpty(context.approvalId),
|
||||||
|
approvalStatus: nonEmpty(context.approvalStatus),
|
||||||
|
issueIds: Array.isArray(context.issueIds)
|
||||||
|
? context.issueIds.filter(
|
||||||
|
(value): value is string => typeof value === "string" && value.trim().length > 0,
|
||||||
|
)
|
||||||
|
: [],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function buildPaperclipEnvForWake(ctx: AdapterExecutionContext, wakePayload: WakePayload): Record<string, string> {
|
||||||
|
const paperclipApiUrlOverride = resolvePaperclipApiUrlOverride(ctx.config.paperclipApiUrl);
|
||||||
|
const paperclipEnv: Record<string, string> = {
|
||||||
|
...buildPaperclipEnv(ctx.agent),
|
||||||
|
PAPERCLIP_RUN_ID: ctx.runId,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (paperclipApiUrlOverride) {
|
||||||
|
paperclipEnv.PAPERCLIP_API_URL = paperclipApiUrlOverride;
|
||||||
|
}
|
||||||
|
if (wakePayload.taskId) paperclipEnv.PAPERCLIP_TASK_ID = wakePayload.taskId;
|
||||||
|
if (wakePayload.wakeReason) paperclipEnv.PAPERCLIP_WAKE_REASON = wakePayload.wakeReason;
|
||||||
|
if (wakePayload.wakeCommentId) paperclipEnv.PAPERCLIP_WAKE_COMMENT_ID = wakePayload.wakeCommentId;
|
||||||
|
if (wakePayload.approvalId) paperclipEnv.PAPERCLIP_APPROVAL_ID = wakePayload.approvalId;
|
||||||
|
if (wakePayload.approvalStatus) paperclipEnv.PAPERCLIP_APPROVAL_STATUS = wakePayload.approvalStatus;
|
||||||
|
if (wakePayload.issueIds.length > 0) {
|
||||||
|
paperclipEnv.PAPERCLIP_LINKED_ISSUE_IDS = wakePayload.issueIds.join(",");
|
||||||
|
}
|
||||||
|
|
||||||
|
return paperclipEnv;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function buildWakeText(payload: WakePayload, paperclipEnv: Record<string, string>): string {
|
||||||
|
const claimedApiKeyPath = "~/.openclaw/workspace/paperclip-claimed-api-key.json";
|
||||||
|
const orderedKeys = [
|
||||||
|
"PAPERCLIP_RUN_ID",
|
||||||
|
"PAPERCLIP_AGENT_ID",
|
||||||
|
"PAPERCLIP_COMPANY_ID",
|
||||||
|
"PAPERCLIP_API_URL",
|
||||||
|
"PAPERCLIP_TASK_ID",
|
||||||
|
"PAPERCLIP_WAKE_REASON",
|
||||||
|
"PAPERCLIP_WAKE_COMMENT_ID",
|
||||||
|
"PAPERCLIP_APPROVAL_ID",
|
||||||
|
"PAPERCLIP_APPROVAL_STATUS",
|
||||||
|
"PAPERCLIP_LINKED_ISSUE_IDS",
|
||||||
|
];
|
||||||
|
|
||||||
|
const envLines: string[] = [];
|
||||||
|
for (const key of orderedKeys) {
|
||||||
|
const value = paperclipEnv[key];
|
||||||
|
if (!value) continue;
|
||||||
|
envLines.push(`${key}=${value}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const lines = [
|
||||||
|
"Paperclip wake event for a cloud adapter.",
|
||||||
|
"",
|
||||||
|
"Set these values in your run context:",
|
||||||
|
...envLines,
|
||||||
|
`PAPERCLIP_API_KEY=<token from ${claimedApiKeyPath}>`,
|
||||||
|
"",
|
||||||
|
`Load PAPERCLIP_API_KEY from ${claimedApiKeyPath} (the token you saved after claim-api-key).`,
|
||||||
|
"",
|
||||||
|
`task_id=${payload.taskId ?? ""}`,
|
||||||
|
`issue_id=${payload.issueId ?? ""}`,
|
||||||
|
`wake_reason=${payload.wakeReason ?? ""}`,
|
||||||
|
`wake_comment_id=${payload.wakeCommentId ?? ""}`,
|
||||||
|
`approval_id=${payload.approvalId ?? ""}`,
|
||||||
|
`approval_status=${payload.approvalStatus ?? ""}`,
|
||||||
|
`linked_issue_ids=${payload.issueIds.join(",")}`,
|
||||||
|
];
|
||||||
|
|
||||||
|
lines.push("", "Run your Paperclip heartbeat procedure now.");
|
||||||
|
return lines.join("\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
export function appendWakeText(baseText: string, wakeText: string): string {
|
||||||
|
const trimmedBase = baseText.trim();
|
||||||
|
return trimmedBase.length > 0 ? `${trimmedBase}\n\n${wakeText}` : wakeText;
|
||||||
|
}
|
||||||
|
|
||||||
|
function buildOpenResponsesWakeInputMessage(wakeText: string): Record<string, unknown> {
|
||||||
|
return {
|
||||||
|
type: "message",
|
||||||
|
role: "user",
|
||||||
|
content: [
|
||||||
|
{
|
||||||
|
type: "input_text",
|
||||||
|
text: wakeText,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function appendWakeTextToOpenResponsesInput(input: unknown, wakeText: string): unknown {
|
||||||
|
if (typeof input === "string") {
|
||||||
|
return appendWakeText(input, wakeText);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Array.isArray(input)) {
|
||||||
|
return [...input, buildOpenResponsesWakeInputMessage(wakeText)];
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof input === "object" && input !== null) {
|
||||||
|
const parsed = parseObject(input);
|
||||||
|
const content = parsed.content;
|
||||||
|
if (typeof content === "string") {
|
||||||
|
return {
|
||||||
|
...parsed,
|
||||||
|
content: appendWakeText(content, wakeText),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
if (Array.isArray(content)) {
|
||||||
|
return {
|
||||||
|
...parsed,
|
||||||
|
content: [
|
||||||
|
...content,
|
||||||
|
{
|
||||||
|
type: "input_text",
|
||||||
|
text: wakeText,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
}
|
||||||
|
return [parsed, buildOpenResponsesWakeInputMessage(wakeText)];
|
||||||
|
}
|
||||||
|
|
||||||
|
return wakeText;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function isTextRequiredResponse(responseText: string): boolean {
|
||||||
|
const parsed = parseOpenClawResponse(responseText);
|
||||||
|
const parsedError = parsed && typeof parsed.error === "string" ? parsed.error : null;
|
||||||
|
if (parsedError && parsedError.toLowerCase().includes("text required")) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return responseText.toLowerCase().includes("text required");
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function sendJsonRequest(params: {
|
||||||
|
url: string;
|
||||||
|
method: string;
|
||||||
|
headers: Record<string, string>;
|
||||||
|
payload: Record<string, unknown>;
|
||||||
|
signal: AbortSignal;
|
||||||
|
}): Promise<Response> {
|
||||||
|
return fetch(params.url, {
|
||||||
|
method: params.method,
|
||||||
|
headers: params.headers,
|
||||||
|
body: JSON.stringify(params.payload),
|
||||||
|
signal: params.signal,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function readAndLogResponseText(params: {
|
||||||
|
response: Response;
|
||||||
|
onLog: AdapterExecutionContext["onLog"];
|
||||||
|
}): Promise<string> {
|
||||||
|
const responseText = await params.response.text();
|
||||||
|
if (responseText.trim().length > 0) {
|
||||||
|
await params.onLog(
|
||||||
|
"stdout",
|
||||||
|
`[openclaw] response (${params.response.status}) ${responseText.slice(0, 2000)}\n`,
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
await params.onLog("stdout", `[openclaw] response (${params.response.status}) <empty>\n`);
|
||||||
|
}
|
||||||
|
return responseText;
|
||||||
|
}
|
||||||
|
|
||||||
|
export function buildExecutionState(ctx: AdapterExecutionContext): OpenClawExecutionState {
|
||||||
|
const method = asString(ctx.config.method, "POST").trim().toUpperCase() || "POST";
|
||||||
|
const timeoutSecRaw = asNumber(ctx.config.timeoutSec, 0);
|
||||||
|
const timeoutSec = timeoutSecRaw > 0 ? Math.max(1, Math.floor(timeoutSecRaw)) : 0;
|
||||||
|
const headersConfig = parseObject(ctx.config.headers) as Record<string, unknown>;
|
||||||
|
const payloadTemplate = parseObject(ctx.config.payloadTemplate);
|
||||||
|
const webhookAuthHeader = nonEmpty(ctx.config.webhookAuthHeader);
|
||||||
|
const sessionKeyStrategy = normalizeSessionKeyStrategy(ctx.config.sessionKeyStrategy);
|
||||||
|
|
||||||
|
const headers: Record<string, string> = {
|
||||||
|
"content-type": "application/json",
|
||||||
|
};
|
||||||
|
for (const [key, value] of Object.entries(headersConfig)) {
|
||||||
|
if (typeof value === "string" && value.trim().length > 0) {
|
||||||
|
headers[key] = value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const openClawAuthHeader = nonEmpty(headers["x-openclaw-auth"] ?? headers["X-OpenClaw-Auth"]);
|
||||||
|
if (openClawAuthHeader && !headers.authorization && !headers.Authorization) {
|
||||||
|
headers.authorization = toAuthorizationHeaderValue(openClawAuthHeader);
|
||||||
|
}
|
||||||
|
if (webhookAuthHeader && !headers.authorization && !headers.Authorization) {
|
||||||
|
headers.authorization = webhookAuthHeader;
|
||||||
|
}
|
||||||
|
|
||||||
|
const wakePayload = buildWakePayload(ctx);
|
||||||
|
const sessionKey = resolveSessionKey({
|
||||||
|
strategy: sessionKeyStrategy,
|
||||||
|
configuredSessionKey: nonEmpty(ctx.config.sessionKey),
|
||||||
|
runId: ctx.runId,
|
||||||
|
issueId: wakePayload.issueId ?? wakePayload.taskId,
|
||||||
|
});
|
||||||
|
|
||||||
|
const paperclipEnv = buildPaperclipEnvForWake(ctx, wakePayload);
|
||||||
|
const wakeText = buildWakeText(wakePayload, paperclipEnv);
|
||||||
|
|
||||||
|
return {
|
||||||
|
method,
|
||||||
|
timeoutSec,
|
||||||
|
headers,
|
||||||
|
payloadTemplate,
|
||||||
|
wakePayload,
|
||||||
|
sessionKey,
|
||||||
|
paperclipEnv,
|
||||||
|
wakeText,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function buildWakeCompatibilityPayload(wakeText: string): Record<string, unknown> {
|
||||||
|
return {
|
||||||
|
text: wakeText,
|
||||||
|
mode: "now",
|
||||||
|
};
|
||||||
|
}
|
||||||
469
packages/adapters/openclaw/src/server/execute-sse.ts
Normal file
469
packages/adapters/openclaw/src/server/execute-sse.ts
Normal file
@@ -0,0 +1,469 @@
|
|||||||
|
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
|
||||||
|
import {
|
||||||
|
appendWakeTextToOpenResponsesInput,
|
||||||
|
buildExecutionState,
|
||||||
|
isOpenResponsesEndpoint,
|
||||||
|
isTextRequiredResponse,
|
||||||
|
readAndLogResponseText,
|
||||||
|
redactForLog,
|
||||||
|
sendJsonRequest,
|
||||||
|
stringifyForLog,
|
||||||
|
toStringRecord,
|
||||||
|
type OpenClawExecutionState,
|
||||||
|
} from "./execute-common.js";
|
||||||
|
import { parseOpenClawResponse } from "./parse.js";
|
||||||
|
|
||||||
|
type ConsumedSse = {
|
||||||
|
eventCount: number;
|
||||||
|
lastEventType: string | null;
|
||||||
|
lastData: string | null;
|
||||||
|
lastPayload: Record<string, unknown> | null;
|
||||||
|
terminal: boolean;
|
||||||
|
failed: boolean;
|
||||||
|
errorMessage: string | null;
|
||||||
|
};
|
||||||
|
|
||||||
|
function nonEmpty(value: unknown): string | null {
|
||||||
|
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function inferSseTerminal(input: {
|
||||||
|
eventType: string;
|
||||||
|
data: string;
|
||||||
|
parsedPayload: Record<string, unknown> | null;
|
||||||
|
}): { terminal: boolean; failed: boolean; errorMessage: string | null } {
|
||||||
|
const normalizedType = input.eventType.trim().toLowerCase();
|
||||||
|
const trimmedData = input.data.trim();
|
||||||
|
const payload = input.parsedPayload;
|
||||||
|
const payloadType = nonEmpty(payload?.type)?.toLowerCase() ?? null;
|
||||||
|
const payloadStatus = nonEmpty(payload?.status)?.toLowerCase() ?? null;
|
||||||
|
|
||||||
|
if (trimmedData === "[DONE]") {
|
||||||
|
return { terminal: true, failed: false, errorMessage: null };
|
||||||
|
}
|
||||||
|
|
||||||
|
const failType =
|
||||||
|
normalizedType.includes("error") ||
|
||||||
|
normalizedType.includes("failed") ||
|
||||||
|
normalizedType.includes("cancel");
|
||||||
|
if (failType) {
|
||||||
|
return {
|
||||||
|
terminal: true,
|
||||||
|
failed: true,
|
||||||
|
errorMessage:
|
||||||
|
nonEmpty(payload?.error) ??
|
||||||
|
nonEmpty(payload?.message) ??
|
||||||
|
(trimmedData.length > 0 ? trimmedData : "OpenClaw SSE error"),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const doneType =
|
||||||
|
normalizedType === "done" ||
|
||||||
|
normalizedType.endsWith(".completed") ||
|
||||||
|
normalizedType === "completed";
|
||||||
|
if (doneType) {
|
||||||
|
return { terminal: true, failed: false, errorMessage: null };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadStatus) {
|
||||||
|
if (
|
||||||
|
payloadStatus === "completed" ||
|
||||||
|
payloadStatus === "succeeded" ||
|
||||||
|
payloadStatus === "done"
|
||||||
|
) {
|
||||||
|
return { terminal: true, failed: false, errorMessage: null };
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
payloadStatus === "failed" ||
|
||||||
|
payloadStatus === "cancelled" ||
|
||||||
|
payloadStatus === "error"
|
||||||
|
) {
|
||||||
|
return {
|
||||||
|
terminal: true,
|
||||||
|
failed: true,
|
||||||
|
errorMessage:
|
||||||
|
nonEmpty(payload?.error) ??
|
||||||
|
nonEmpty(payload?.message) ??
|
||||||
|
`OpenClaw SSE status ${payloadStatus}`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payloadType) {
|
||||||
|
if (payloadType.endsWith(".completed")) {
|
||||||
|
return { terminal: true, failed: false, errorMessage: null };
|
||||||
|
}
|
||||||
|
if (
|
||||||
|
payloadType.endsWith(".failed") ||
|
||||||
|
payloadType.endsWith(".cancelled") ||
|
||||||
|
payloadType.endsWith(".error")
|
||||||
|
) {
|
||||||
|
return {
|
||||||
|
terminal: true,
|
||||||
|
failed: true,
|
||||||
|
errorMessage:
|
||||||
|
nonEmpty(payload?.error) ??
|
||||||
|
nonEmpty(payload?.message) ??
|
||||||
|
`OpenClaw SSE type ${payloadType}`,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (payload?.done === true) {
|
||||||
|
return { terminal: true, failed: false, errorMessage: null };
|
||||||
|
}
|
||||||
|
|
||||||
|
return { terminal: false, failed: false, errorMessage: null };
|
||||||
|
}
|
||||||
|
|
||||||
|
async function consumeSseResponse(params: {
|
||||||
|
response: Response;
|
||||||
|
onLog: AdapterExecutionContext["onLog"];
|
||||||
|
}): Promise<ConsumedSse> {
|
||||||
|
const reader = params.response.body?.getReader();
|
||||||
|
if (!reader) {
|
||||||
|
throw new Error("OpenClaw SSE response body is missing");
|
||||||
|
}
|
||||||
|
|
||||||
|
const decoder = new TextDecoder();
|
||||||
|
let buffer = "";
|
||||||
|
let eventType = "message";
|
||||||
|
let dataLines: string[] = [];
|
||||||
|
let eventCount = 0;
|
||||||
|
let lastEventType: string | null = null;
|
||||||
|
let lastData: string | null = null;
|
||||||
|
let lastPayload: Record<string, unknown> | null = null;
|
||||||
|
let terminal = false;
|
||||||
|
let failed = false;
|
||||||
|
let errorMessage: string | null = null;
|
||||||
|
|
||||||
|
const dispatchEvent = async (): Promise<boolean> => {
|
||||||
|
if (dataLines.length === 0) {
|
||||||
|
eventType = "message";
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const data = dataLines.join("\n");
|
||||||
|
const trimmedData = data.trim();
|
||||||
|
const parsedPayload = parseOpenClawResponse(trimmedData);
|
||||||
|
|
||||||
|
eventCount += 1;
|
||||||
|
lastEventType = eventType;
|
||||||
|
lastData = data;
|
||||||
|
if (parsedPayload) lastPayload = parsedPayload;
|
||||||
|
|
||||||
|
const preview =
|
||||||
|
trimmedData.length > 1000 ? `${trimmedData.slice(0, 1000)}...` : trimmedData;
|
||||||
|
await params.onLog("stdout", `[openclaw:sse] event=${eventType} data=${preview}\n`);
|
||||||
|
|
||||||
|
const resolution = inferSseTerminal({
|
||||||
|
eventType,
|
||||||
|
data,
|
||||||
|
parsedPayload,
|
||||||
|
});
|
||||||
|
|
||||||
|
dataLines = [];
|
||||||
|
eventType = "message";
|
||||||
|
|
||||||
|
if (resolution.terminal) {
|
||||||
|
terminal = true;
|
||||||
|
failed = resolution.failed;
|
||||||
|
errorMessage = resolution.errorMessage;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
let shouldStop = false;
|
||||||
|
while (!shouldStop) {
|
||||||
|
const { done, value } = await reader.read();
|
||||||
|
if (done) break;
|
||||||
|
|
||||||
|
buffer += decoder.decode(value, { stream: true });
|
||||||
|
|
||||||
|
while (!shouldStop) {
|
||||||
|
const newlineIndex = buffer.indexOf("\n");
|
||||||
|
if (newlineIndex === -1) break;
|
||||||
|
|
||||||
|
let line = buffer.slice(0, newlineIndex);
|
||||||
|
buffer = buffer.slice(newlineIndex + 1);
|
||||||
|
if (line.endsWith("\r")) line = line.slice(0, -1);
|
||||||
|
|
||||||
|
if (line.length === 0) {
|
||||||
|
shouldStop = await dispatchEvent();
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (line.startsWith(":")) continue;
|
||||||
|
|
||||||
|
const colonIndex = line.indexOf(":");
|
||||||
|
const field = colonIndex === -1 ? line : line.slice(0, colonIndex);
|
||||||
|
const rawValue =
|
||||||
|
colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, "");
|
||||||
|
|
||||||
|
if (field === "event") {
|
||||||
|
eventType = rawValue || "message";
|
||||||
|
} else if (field === "data") {
|
||||||
|
dataLines.push(rawValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer += decoder.decode();
|
||||||
|
if (!shouldStop && buffer.trim().length > 0) {
|
||||||
|
for (const rawLine of buffer.split(/\r?\n/)) {
|
||||||
|
const line = rawLine.trimEnd();
|
||||||
|
if (line.length === 0) {
|
||||||
|
shouldStop = await dispatchEvent();
|
||||||
|
if (shouldStop) break;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (line.startsWith(":")) continue;
|
||||||
|
|
||||||
|
const colonIndex = line.indexOf(":");
|
||||||
|
const field = colonIndex === -1 ? line : line.slice(0, colonIndex);
|
||||||
|
const rawValue =
|
||||||
|
colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, "");
|
||||||
|
|
||||||
|
if (field === "event") {
|
||||||
|
eventType = rawValue || "message";
|
||||||
|
} else if (field === "data") {
|
||||||
|
dataLines.push(rawValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!shouldStop && dataLines.length > 0) {
|
||||||
|
await dispatchEvent();
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
eventCount,
|
||||||
|
lastEventType,
|
||||||
|
lastData,
|
||||||
|
lastPayload,
|
||||||
|
terminal,
|
||||||
|
failed,
|
||||||
|
errorMessage,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
function buildSseBody(input: {
|
||||||
|
url: string;
|
||||||
|
state: OpenClawExecutionState;
|
||||||
|
context: AdapterExecutionContext["context"];
|
||||||
|
configModel: unknown;
|
||||||
|
}): { headers: Record<string, string>; body: Record<string, unknown> } {
|
||||||
|
const { url, state, context, configModel } = input;
|
||||||
|
const templateText = nonEmpty(state.payloadTemplate.text);
|
||||||
|
const payloadText = templateText ? `${templateText}\n\n${state.wakeText}` : state.wakeText;
|
||||||
|
|
||||||
|
const isOpenResponses = isOpenResponsesEndpoint(url);
|
||||||
|
const openResponsesInput = Object.prototype.hasOwnProperty.call(state.payloadTemplate, "input")
|
||||||
|
? appendWakeTextToOpenResponsesInput(state.payloadTemplate.input, state.wakeText)
|
||||||
|
: payloadText;
|
||||||
|
|
||||||
|
const body: Record<string, unknown> = isOpenResponses
|
||||||
|
? {
|
||||||
|
...state.payloadTemplate,
|
||||||
|
stream: true,
|
||||||
|
model:
|
||||||
|
nonEmpty(state.payloadTemplate.model) ??
|
||||||
|
nonEmpty(configModel) ??
|
||||||
|
"openclaw",
|
||||||
|
input: openResponsesInput,
|
||||||
|
metadata: {
|
||||||
|
...toStringRecord(state.payloadTemplate.metadata),
|
||||||
|
...state.paperclipEnv,
|
||||||
|
paperclip_session_key: state.sessionKey,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
: {
|
||||||
|
...state.payloadTemplate,
|
||||||
|
stream: true,
|
||||||
|
sessionKey: state.sessionKey,
|
||||||
|
text: payloadText,
|
||||||
|
paperclip: {
|
||||||
|
...state.wakePayload,
|
||||||
|
sessionKey: state.sessionKey,
|
||||||
|
streamTransport: "sse",
|
||||||
|
env: state.paperclipEnv,
|
||||||
|
context,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const headers: Record<string, string> = {
|
||||||
|
...state.headers,
|
||||||
|
accept: "text/event-stream",
|
||||||
|
};
|
||||||
|
|
||||||
|
if (isOpenResponses && !headers["x-openclaw-session-key"] && !headers["X-OpenClaw-Session-Key"]) {
|
||||||
|
headers["x-openclaw-session-key"] = state.sessionKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
return { headers, body };
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function executeSse(ctx: AdapterExecutionContext, url: string): Promise<AdapterExecutionResult> {
|
||||||
|
const { onLog, onMeta, context } = ctx;
|
||||||
|
const state = buildExecutionState(ctx);
|
||||||
|
|
||||||
|
if (onMeta) {
|
||||||
|
await onMeta({
|
||||||
|
adapterType: "openclaw",
|
||||||
|
command: "sse",
|
||||||
|
commandArgs: [state.method, url],
|
||||||
|
context,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const { headers, body } = buildSseBody({
|
||||||
|
url,
|
||||||
|
state,
|
||||||
|
context,
|
||||||
|
configModel: ctx.config.model,
|
||||||
|
});
|
||||||
|
|
||||||
|
const outboundHeaderKeys = Object.keys(headers).sort();
|
||||||
|
await onLog(
|
||||||
|
"stdout",
|
||||||
|
`[openclaw] outbound headers (redacted): ${stringifyForLog(redactForLog(headers), 4_000)}\n`,
|
||||||
|
);
|
||||||
|
await onLog(
|
||||||
|
"stdout",
|
||||||
|
`[openclaw] outbound payload (redacted): ${stringifyForLog(redactForLog(body), 12_000)}\n`,
|
||||||
|
);
|
||||||
|
await onLog("stdout", `[openclaw] outbound header keys: ${outboundHeaderKeys.join(", ")}\n`);
|
||||||
|
await onLog("stdout", `[openclaw] invoking ${state.method} ${url} (transport=sse)\n`);
|
||||||
|
|
||||||
|
const controller = new AbortController();
|
||||||
|
const timeout = state.timeoutSec > 0 ? setTimeout(() => controller.abort(), state.timeoutSec * 1000) : null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const response = await sendJsonRequest({
|
||||||
|
url,
|
||||||
|
method: state.method,
|
||||||
|
headers,
|
||||||
|
payload: body,
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
const responseText = await readAndLogResponseText({ response, onLog });
|
||||||
|
return {
|
||||||
|
exitCode: 1,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage:
|
||||||
|
isTextRequiredResponse(responseText)
|
||||||
|
? "OpenClaw endpoint rejected the payload as text-required."
|
||||||
|
: `OpenClaw SSE request failed with status ${response.status}`,
|
||||||
|
errorCode: isTextRequiredResponse(responseText)
|
||||||
|
? "openclaw_text_required"
|
||||||
|
: "openclaw_http_error",
|
||||||
|
resultJson: {
|
||||||
|
status: response.status,
|
||||||
|
statusText: response.statusText,
|
||||||
|
response: parseOpenClawResponse(responseText) ?? responseText,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const contentType = (response.headers.get("content-type") ?? "").toLowerCase();
|
||||||
|
if (!contentType.includes("text/event-stream")) {
|
||||||
|
const responseText = await readAndLogResponseText({ response, onLog });
|
||||||
|
return {
|
||||||
|
exitCode: 1,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage: "OpenClaw SSE endpoint did not return text/event-stream",
|
||||||
|
errorCode: "openclaw_sse_expected_event_stream",
|
||||||
|
resultJson: {
|
||||||
|
status: response.status,
|
||||||
|
statusText: response.statusText,
|
||||||
|
contentType,
|
||||||
|
response: parseOpenClawResponse(responseText) ?? responseText,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const consumed = await consumeSseResponse({ response, onLog });
|
||||||
|
if (consumed.failed) {
|
||||||
|
return {
|
||||||
|
exitCode: 1,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage: consumed.errorMessage ?? "OpenClaw SSE stream failed",
|
||||||
|
errorCode: "openclaw_sse_stream_failed",
|
||||||
|
resultJson: {
|
||||||
|
eventCount: consumed.eventCount,
|
||||||
|
terminal: consumed.terminal,
|
||||||
|
lastEventType: consumed.lastEventType,
|
||||||
|
lastData: consumed.lastData,
|
||||||
|
response: consumed.lastPayload ?? consumed.lastData,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!consumed.terminal) {
|
||||||
|
return {
|
||||||
|
exitCode: 1,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage: "OpenClaw SSE stream closed without a terminal event",
|
||||||
|
errorCode: "openclaw_sse_stream_incomplete",
|
||||||
|
resultJson: {
|
||||||
|
eventCount: consumed.eventCount,
|
||||||
|
terminal: consumed.terminal,
|
||||||
|
lastEventType: consumed.lastEventType,
|
||||||
|
lastData: consumed.lastData,
|
||||||
|
response: consumed.lastPayload ?? consumed.lastData,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
exitCode: 0,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
provider: "openclaw",
|
||||||
|
model: null,
|
||||||
|
summary: `OpenClaw SSE ${state.method} ${url}`,
|
||||||
|
resultJson: {
|
||||||
|
eventCount: consumed.eventCount,
|
||||||
|
terminal: consumed.terminal,
|
||||||
|
lastEventType: consumed.lastEventType,
|
||||||
|
lastData: consumed.lastData,
|
||||||
|
response: consumed.lastPayload ?? consumed.lastData,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
} catch (err) {
|
||||||
|
if (err instanceof Error && err.name === "AbortError") {
|
||||||
|
const timeoutMessage =
|
||||||
|
state.timeoutSec > 0
|
||||||
|
? `[openclaw] SSE request timed out after ${state.timeoutSec}s\n`
|
||||||
|
: "[openclaw] SSE request aborted\n";
|
||||||
|
await onLog("stderr", timeoutMessage);
|
||||||
|
return {
|
||||||
|
exitCode: null,
|
||||||
|
signal: null,
|
||||||
|
timedOut: true,
|
||||||
|
errorMessage: state.timeoutSec > 0 ? `Timed out after ${state.timeoutSec}s` : "Request aborted",
|
||||||
|
errorCode: "openclaw_sse_timeout",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = err instanceof Error ? err.message : String(err);
|
||||||
|
await onLog("stderr", `[openclaw] request failed: ${message}\n`);
|
||||||
|
return {
|
||||||
|
exitCode: 1,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage: message,
|
||||||
|
errorCode: "openclaw_request_failed",
|
||||||
|
};
|
||||||
|
} finally {
|
||||||
|
if (timeout) clearTimeout(timeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
227
packages/adapters/openclaw/src/server/execute-webhook.ts
Normal file
227
packages/adapters/openclaw/src/server/execute-webhook.ts
Normal file
@@ -0,0 +1,227 @@
|
|||||||
|
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
|
||||||
|
import {
|
||||||
|
appendWakeText,
|
||||||
|
buildExecutionState,
|
||||||
|
buildWakeCompatibilityPayload,
|
||||||
|
isTextRequiredResponse,
|
||||||
|
isWakeCompatibilityEndpoint,
|
||||||
|
readAndLogResponseText,
|
||||||
|
redactForLog,
|
||||||
|
sendJsonRequest,
|
||||||
|
stringifyForLog,
|
||||||
|
type OpenClawExecutionState,
|
||||||
|
} from "./execute-common.js";
|
||||||
|
import { parseOpenClawResponse } from "./parse.js";
|
||||||
|
|
||||||
|
function nonEmpty(value: unknown): string | null {
|
||||||
|
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
function buildWebhookBody(input: {
|
||||||
|
state: OpenClawExecutionState;
|
||||||
|
context: AdapterExecutionContext["context"];
|
||||||
|
}): Record<string, unknown> {
|
||||||
|
const { state, context } = input;
|
||||||
|
const templateText = nonEmpty(state.payloadTemplate.text);
|
||||||
|
const payloadText = templateText ? appendWakeText(templateText, state.wakeText) : state.wakeText;
|
||||||
|
|
||||||
|
return {
|
||||||
|
...state.payloadTemplate,
|
||||||
|
stream: false,
|
||||||
|
sessionKey: state.sessionKey,
|
||||||
|
text: payloadText,
|
||||||
|
paperclip: {
|
||||||
|
...state.wakePayload,
|
||||||
|
sessionKey: state.sessionKey,
|
||||||
|
streamTransport: "webhook",
|
||||||
|
env: state.paperclipEnv,
|
||||||
|
context,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function sendWebhookRequest(params: {
|
||||||
|
url: string;
|
||||||
|
method: string;
|
||||||
|
headers: Record<string, string>;
|
||||||
|
payload: Record<string, unknown>;
|
||||||
|
onLog: AdapterExecutionContext["onLog"];
|
||||||
|
signal: AbortSignal;
|
||||||
|
}): Promise<{ response: Response; responseText: string }> {
|
||||||
|
const response = await sendJsonRequest({
|
||||||
|
url: params.url,
|
||||||
|
method: params.method,
|
||||||
|
headers: params.headers,
|
||||||
|
payload: params.payload,
|
||||||
|
signal: params.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
const responseText = await readAndLogResponseText({ response, onLog: params.onLog });
|
||||||
|
return { response, responseText };
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function executeWebhook(ctx: AdapterExecutionContext, url: string): Promise<AdapterExecutionResult> {
|
||||||
|
const { onLog, onMeta, context } = ctx;
|
||||||
|
const state = buildExecutionState(ctx);
|
||||||
|
|
||||||
|
if (onMeta) {
|
||||||
|
await onMeta({
|
||||||
|
adapterType: "openclaw",
|
||||||
|
command: "webhook",
|
||||||
|
commandArgs: [state.method, url],
|
||||||
|
context,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
const headers = { ...state.headers };
|
||||||
|
const webhookBody = buildWebhookBody({ state, context });
|
||||||
|
const wakeCompatibilityBody = buildWakeCompatibilityPayload(state.wakeText);
|
||||||
|
const preferWakeCompatibilityBody = isWakeCompatibilityEndpoint(url);
|
||||||
|
const initialBody = preferWakeCompatibilityBody ? wakeCompatibilityBody : webhookBody;
|
||||||
|
|
||||||
|
const outboundHeaderKeys = Object.keys(headers).sort();
|
||||||
|
await onLog(
|
||||||
|
"stdout",
|
||||||
|
`[openclaw] outbound headers (redacted): ${stringifyForLog(redactForLog(headers), 4_000)}\n`,
|
||||||
|
);
|
||||||
|
await onLog(
|
||||||
|
"stdout",
|
||||||
|
`[openclaw] outbound payload (redacted): ${stringifyForLog(redactForLog(initialBody), 12_000)}\n`,
|
||||||
|
);
|
||||||
|
await onLog("stdout", `[openclaw] outbound header keys: ${outboundHeaderKeys.join(", ")}\n`);
|
||||||
|
await onLog("stdout", `[openclaw] invoking ${state.method} ${url} (transport=webhook)\n`);
|
||||||
|
|
||||||
|
if (preferWakeCompatibilityBody) {
|
||||||
|
await onLog("stdout", "[openclaw] using wake text payload for /hooks/wake compatibility\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
const controller = new AbortController();
|
||||||
|
const timeout = state.timeoutSec > 0 ? setTimeout(() => controller.abort(), state.timeoutSec * 1000) : null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
const initialResponse = await sendWebhookRequest({
|
||||||
|
url,
|
||||||
|
method: state.method,
|
||||||
|
headers,
|
||||||
|
payload: initialBody,
|
||||||
|
onLog,
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!initialResponse.response.ok) {
|
||||||
|
const canRetryWithWakeCompatibility =
|
||||||
|
!preferWakeCompatibilityBody && isTextRequiredResponse(initialResponse.responseText);
|
||||||
|
|
||||||
|
if (canRetryWithWakeCompatibility) {
|
||||||
|
await onLog(
|
||||||
|
"stdout",
|
||||||
|
"[openclaw] endpoint requires text payload; retrying with wake compatibility format\n",
|
||||||
|
);
|
||||||
|
|
||||||
|
const retryResponse = await sendWebhookRequest({
|
||||||
|
url,
|
||||||
|
method: state.method,
|
||||||
|
headers,
|
||||||
|
payload: wakeCompatibilityBody,
|
||||||
|
onLog,
|
||||||
|
signal: controller.signal,
|
||||||
|
});
|
||||||
|
|
||||||
|
if (retryResponse.response.ok) {
|
||||||
|
return {
|
||||||
|
exitCode: 0,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
provider: "openclaw",
|
||||||
|
model: null,
|
||||||
|
summary: `OpenClaw webhook ${state.method} ${url} (wake compatibility)`,
|
||||||
|
resultJson: {
|
||||||
|
status: retryResponse.response.status,
|
||||||
|
statusText: retryResponse.response.statusText,
|
||||||
|
compatibilityMode: "wake_text",
|
||||||
|
response: parseOpenClawResponse(retryResponse.responseText) ?? retryResponse.responseText,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
exitCode: 1,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage:
|
||||||
|
isTextRequiredResponse(retryResponse.responseText)
|
||||||
|
? "OpenClaw endpoint rejected the wake compatibility payload as text-required."
|
||||||
|
: `OpenClaw webhook failed with status ${retryResponse.response.status}`,
|
||||||
|
errorCode: isTextRequiredResponse(retryResponse.responseText)
|
||||||
|
? "openclaw_text_required"
|
||||||
|
: "openclaw_http_error",
|
||||||
|
resultJson: {
|
||||||
|
status: retryResponse.response.status,
|
||||||
|
statusText: retryResponse.response.statusText,
|
||||||
|
compatibilityMode: "wake_text",
|
||||||
|
response: parseOpenClawResponse(retryResponse.responseText) ?? retryResponse.responseText,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
exitCode: 1,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage:
|
||||||
|
isTextRequiredResponse(initialResponse.responseText)
|
||||||
|
? "OpenClaw endpoint rejected the payload as text-required."
|
||||||
|
: `OpenClaw webhook failed with status ${initialResponse.response.status}`,
|
||||||
|
errorCode: isTextRequiredResponse(initialResponse.responseText)
|
||||||
|
? "openclaw_text_required"
|
||||||
|
: "openclaw_http_error",
|
||||||
|
resultJson: {
|
||||||
|
status: initialResponse.response.status,
|
||||||
|
statusText: initialResponse.response.statusText,
|
||||||
|
response: parseOpenClawResponse(initialResponse.responseText) ?? initialResponse.responseText,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
exitCode: 0,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
provider: "openclaw",
|
||||||
|
model: null,
|
||||||
|
summary: `OpenClaw webhook ${state.method} ${url}`,
|
||||||
|
resultJson: {
|
||||||
|
status: initialResponse.response.status,
|
||||||
|
statusText: initialResponse.response.statusText,
|
||||||
|
response: parseOpenClawResponse(initialResponse.responseText) ?? initialResponse.responseText,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
} catch (err) {
|
||||||
|
if (err instanceof Error && err.name === "AbortError") {
|
||||||
|
const timeoutMessage =
|
||||||
|
state.timeoutSec > 0
|
||||||
|
? `[openclaw] webhook request timed out after ${state.timeoutSec}s\n`
|
||||||
|
: "[openclaw] webhook request aborted\n";
|
||||||
|
await onLog("stderr", timeoutMessage);
|
||||||
|
return {
|
||||||
|
exitCode: null,
|
||||||
|
signal: null,
|
||||||
|
timedOut: true,
|
||||||
|
errorMessage: state.timeoutSec > 0 ? `Timed out after ${state.timeoutSec}s` : "Request aborted",
|
||||||
|
errorCode: "openclaw_webhook_timeout",
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
const message = err instanceof Error ? err.message : String(err);
|
||||||
|
await onLog("stderr", `[openclaw] request failed: ${message}\n`);
|
||||||
|
return {
|
||||||
|
exitCode: 1,
|
||||||
|
signal: null,
|
||||||
|
timedOut: false,
|
||||||
|
errorMessage: message,
|
||||||
|
errorCode: "openclaw_request_failed",
|
||||||
|
};
|
||||||
|
} finally {
|
||||||
|
if (timeout) clearTimeout(timeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,523 +1,18 @@
|
|||||||
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
|
import type { AdapterExecutionContext, AdapterExecutionResult } from "@paperclipai/adapter-utils";
|
||||||
import { asNumber, asString, buildPaperclipEnv, parseObject } from "@paperclipai/adapter-utils/server-utils";
|
import { asString } from "@paperclipai/adapter-utils/server-utils";
|
||||||
import { createHash } from "node:crypto";
|
import { isWakeCompatibilityEndpoint } from "./execute-common.js";
|
||||||
import { parseOpenClawResponse } from "./parse.js";
|
import { executeSse } from "./execute-sse.js";
|
||||||
|
import { executeWebhook } from "./execute-webhook.js";
|
||||||
|
|
||||||
type SessionKeyStrategy = "fixed" | "issue" | "run";
|
function normalizeTransport(value: unknown): "sse" | "webhook" | null {
|
||||||
|
const normalized = asString(value, "sse").trim().toLowerCase();
|
||||||
function nonEmpty(value: unknown): string | null {
|
if (!normalized || normalized === "sse") return "sse";
|
||||||
return typeof value === "string" && value.trim().length > 0 ? value.trim() : null;
|
if (normalized === "webhook") return "webhook";
|
||||||
}
|
return null;
|
||||||
|
|
||||||
function toAuthorizationHeaderValue(rawToken: string): string {
|
|
||||||
const trimmed = rawToken.trim();
|
|
||||||
if (!trimmed) return trimmed;
|
|
||||||
return /^bearer\s+/i.test(trimmed) ? trimmed : `Bearer ${trimmed}`;
|
|
||||||
}
|
|
||||||
|
|
||||||
function resolvePaperclipApiUrlOverride(value: unknown): string | null {
|
|
||||||
const raw = nonEmpty(value);
|
|
||||||
if (!raw) return null;
|
|
||||||
try {
|
|
||||||
const parsed = new URL(raw);
|
|
||||||
if (parsed.protocol !== "http:" && parsed.protocol !== "https:") return null;
|
|
||||||
return parsed.toString();
|
|
||||||
} catch {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function normalizeSessionKeyStrategy(value: unknown): SessionKeyStrategy {
|
|
||||||
const normalized = asString(value, "fixed").trim().toLowerCase();
|
|
||||||
if (normalized === "issue" || normalized === "run") return normalized;
|
|
||||||
return "fixed";
|
|
||||||
}
|
|
||||||
|
|
||||||
function resolveSessionKey(input: {
|
|
||||||
strategy: SessionKeyStrategy;
|
|
||||||
configuredSessionKey: string | null;
|
|
||||||
runId: string;
|
|
||||||
issueId: string | null;
|
|
||||||
}): string {
|
|
||||||
const fallback = input.configuredSessionKey ?? "paperclip";
|
|
||||||
if (input.strategy === "run") return `paperclip:run:${input.runId}`;
|
|
||||||
if (input.strategy === "issue" && input.issueId) return `paperclip:issue:${input.issueId}`;
|
|
||||||
return fallback;
|
|
||||||
}
|
|
||||||
|
|
||||||
function isWakeCompatibilityEndpoint(url: string): boolean {
|
|
||||||
try {
|
|
||||||
const parsed = new URL(url);
|
|
||||||
const path = parsed.pathname.toLowerCase();
|
|
||||||
return path === "/hooks/wake" || path.endsWith("/hooks/wake");
|
|
||||||
} catch {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function isOpenResponsesEndpoint(url: string): boolean {
|
|
||||||
try {
|
|
||||||
const parsed = new URL(url);
|
|
||||||
const path = parsed.pathname.toLowerCase();
|
|
||||||
return path === "/v1/responses" || path.endsWith("/v1/responses");
|
|
||||||
} catch {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
function toStringRecord(value: unknown): Record<string, string> {
|
|
||||||
const parsed = parseObject(value);
|
|
||||||
const out: Record<string, string> = {};
|
|
||||||
for (const [key, entry] of Object.entries(parsed)) {
|
|
||||||
if (typeof entry === "string") {
|
|
||||||
out[key] = entry;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
const SENSITIVE_LOG_KEY_PATTERN =
|
|
||||||
/(^|[_-])(auth|authorization|token|secret|password|api[_-]?key|private[_-]?key)([_-]|$)|^x-openclaw-auth$/i;
|
|
||||||
|
|
||||||
function isSensitiveLogKey(key: string): boolean {
|
|
||||||
return SENSITIVE_LOG_KEY_PATTERN.test(key.trim());
|
|
||||||
}
|
|
||||||
|
|
||||||
function sha256Prefix(value: string): string {
|
|
||||||
return createHash("sha256").update(value).digest("hex").slice(0, 12);
|
|
||||||
}
|
|
||||||
|
|
||||||
function redactSecretForLog(value: string): string {
|
|
||||||
return `[redacted len=${value.length} sha256=${sha256Prefix(value)}]`;
|
|
||||||
}
|
|
||||||
|
|
||||||
function truncateForLog(value: string, maxChars = 320): string {
|
|
||||||
if (value.length <= maxChars) return value;
|
|
||||||
return `${value.slice(0, maxChars)}... [truncated ${value.length - maxChars} chars]`;
|
|
||||||
}
|
|
||||||
|
|
||||||
function redactForLog(value: unknown, keyPath: string[] = [], depth = 0): unknown {
|
|
||||||
const currentKey = keyPath[keyPath.length - 1] ?? "";
|
|
||||||
if (typeof value === "string") {
|
|
||||||
if (isSensitiveLogKey(currentKey)) return redactSecretForLog(value);
|
|
||||||
return truncateForLog(value);
|
|
||||||
}
|
|
||||||
if (typeof value === "number" || typeof value === "boolean" || value == null) {
|
|
||||||
return value;
|
|
||||||
}
|
|
||||||
if (Array.isArray(value)) {
|
|
||||||
if (depth >= 6) return "[array-truncated]";
|
|
||||||
const out = value.slice(0, 20).map((entry, index) => redactForLog(entry, [...keyPath, `${index}`], depth + 1));
|
|
||||||
if (value.length > 20) out.push(`[+${value.length - 20} more items]`);
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
if (typeof value === "object") {
|
|
||||||
if (depth >= 6) return "[object-truncated]";
|
|
||||||
const entries = Object.entries(value as Record<string, unknown>);
|
|
||||||
const out: Record<string, unknown> = {};
|
|
||||||
for (const [key, entry] of entries.slice(0, 80)) {
|
|
||||||
out[key] = redactForLog(entry, [...keyPath, key], depth + 1);
|
|
||||||
}
|
|
||||||
if (entries.length > 80) {
|
|
||||||
out.__truncated__ = `+${entries.length - 80} keys`;
|
|
||||||
}
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
return String(value);
|
|
||||||
}
|
|
||||||
|
|
||||||
function stringifyForLog(value: unknown, maxChars: number): string {
|
|
||||||
const text = JSON.stringify(value);
|
|
||||||
if (text.length <= maxChars) return text;
|
|
||||||
return `${text.slice(0, maxChars)}... [truncated ${text.length - maxChars} chars]`;
|
|
||||||
}
|
|
||||||
|
|
||||||
type WakePayload = {
|
|
||||||
runId: string;
|
|
||||||
agentId: string;
|
|
||||||
companyId: string;
|
|
||||||
taskId: string | null;
|
|
||||||
issueId: string | null;
|
|
||||||
wakeReason: string | null;
|
|
||||||
wakeCommentId: string | null;
|
|
||||||
approvalId: string | null;
|
|
||||||
approvalStatus: string | null;
|
|
||||||
issueIds: string[];
|
|
||||||
};
|
|
||||||
|
|
||||||
function buildWakeText(payload: WakePayload, paperclipEnv: Record<string, string>): string {
|
|
||||||
const claimedApiKeyPath = "~/.openclaw/workspace/paperclip-claimed-api-key.json";
|
|
||||||
const orderedKeys = [
|
|
||||||
"PAPERCLIP_RUN_ID",
|
|
||||||
"PAPERCLIP_AGENT_ID",
|
|
||||||
"PAPERCLIP_COMPANY_ID",
|
|
||||||
"PAPERCLIP_API_URL",
|
|
||||||
"PAPERCLIP_TASK_ID",
|
|
||||||
"PAPERCLIP_WAKE_REASON",
|
|
||||||
"PAPERCLIP_WAKE_COMMENT_ID",
|
|
||||||
"PAPERCLIP_APPROVAL_ID",
|
|
||||||
"PAPERCLIP_APPROVAL_STATUS",
|
|
||||||
"PAPERCLIP_LINKED_ISSUE_IDS",
|
|
||||||
];
|
|
||||||
|
|
||||||
const envLines: string[] = [];
|
|
||||||
for (const key of orderedKeys) {
|
|
||||||
const value = paperclipEnv[key];
|
|
||||||
if (!value) continue;
|
|
||||||
envLines.push(`${key}=${value}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
const lines = [
|
|
||||||
"Paperclip wake event for a cloud adapter.",
|
|
||||||
"",
|
|
||||||
"Set these values in your run context:",
|
|
||||||
...envLines,
|
|
||||||
`PAPERCLIP_API_KEY=<token from ${claimedApiKeyPath}>`,
|
|
||||||
"",
|
|
||||||
`Load PAPERCLIP_API_KEY from ${claimedApiKeyPath} (the token you saved after claim-api-key).`,
|
|
||||||
"",
|
|
||||||
`task_id=${payload.taskId ?? ""}`,
|
|
||||||
`issue_id=${payload.issueId ?? ""}`,
|
|
||||||
`wake_reason=${payload.wakeReason ?? ""}`,
|
|
||||||
`wake_comment_id=${payload.wakeCommentId ?? ""}`,
|
|
||||||
`approval_id=${payload.approvalId ?? ""}`,
|
|
||||||
`approval_status=${payload.approvalStatus ?? ""}`,
|
|
||||||
`linked_issue_ids=${payload.issueIds.join(",")}`,
|
|
||||||
];
|
|
||||||
|
|
||||||
lines.push("", "Run your Paperclip heartbeat procedure now.");
|
|
||||||
return lines.join("\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
function appendWakeText(baseText: string, wakeText: string): string {
|
|
||||||
const trimmedBase = baseText.trim();
|
|
||||||
return trimmedBase.length > 0 ? `${trimmedBase}\n\n${wakeText}` : wakeText;
|
|
||||||
}
|
|
||||||
|
|
||||||
function buildOpenResponsesWakeInputMessage(wakeText: string): Record<string, unknown> {
|
|
||||||
return {
|
|
||||||
type: "message",
|
|
||||||
role: "user",
|
|
||||||
content: [
|
|
||||||
{
|
|
||||||
type: "input_text",
|
|
||||||
text: wakeText,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
function appendWakeTextToOpenResponsesInput(input: unknown, wakeText: string): unknown {
|
|
||||||
if (typeof input === "string") {
|
|
||||||
return appendWakeText(input, wakeText);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (Array.isArray(input)) {
|
|
||||||
return [...input, buildOpenResponsesWakeInputMessage(wakeText)];
|
|
||||||
}
|
|
||||||
|
|
||||||
if (typeof input === "object" && input !== null) {
|
|
||||||
const parsed = parseObject(input);
|
|
||||||
const content = parsed.content;
|
|
||||||
if (typeof content === "string") {
|
|
||||||
return {
|
|
||||||
...parsed,
|
|
||||||
content: appendWakeText(content, wakeText),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
if (Array.isArray(content)) {
|
|
||||||
return {
|
|
||||||
...parsed,
|
|
||||||
content: [
|
|
||||||
...content,
|
|
||||||
{
|
|
||||||
type: "input_text",
|
|
||||||
text: wakeText,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
};
|
|
||||||
}
|
|
||||||
return [parsed, buildOpenResponsesWakeInputMessage(wakeText)];
|
|
||||||
}
|
|
||||||
|
|
||||||
return wakeText;
|
|
||||||
}
|
|
||||||
|
|
||||||
function isTextRequiredResponse(responseText: string): boolean {
|
|
||||||
const parsed = parseOpenClawResponse(responseText);
|
|
||||||
const parsedError = parsed && typeof parsed.error === "string" ? parsed.error : null;
|
|
||||||
if (parsedError && parsedError.toLowerCase().includes("text required")) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return responseText.toLowerCase().includes("text required");
|
|
||||||
}
|
|
||||||
|
|
||||||
async function sendJsonRequest(params: {
|
|
||||||
url: string;
|
|
||||||
method: string;
|
|
||||||
headers: Record<string, string>;
|
|
||||||
payload: Record<string, unknown>;
|
|
||||||
signal: AbortSignal;
|
|
||||||
}): Promise<Response> {
|
|
||||||
return fetch(params.url, {
|
|
||||||
method: params.method,
|
|
||||||
headers: params.headers,
|
|
||||||
body: JSON.stringify(params.payload),
|
|
||||||
signal: params.signal,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async function readAndLogResponseText(params: {
|
|
||||||
response: Response;
|
|
||||||
onLog: AdapterExecutionContext["onLog"];
|
|
||||||
}): Promise<string> {
|
|
||||||
const responseText = await params.response.text();
|
|
||||||
if (responseText.trim().length > 0) {
|
|
||||||
await params.onLog(
|
|
||||||
"stdout",
|
|
||||||
`[openclaw] response (${params.response.status}) ${responseText.slice(0, 2000)}\n`,
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
await params.onLog("stdout", `[openclaw] response (${params.response.status}) <empty>\n`);
|
|
||||||
}
|
|
||||||
return responseText;
|
|
||||||
}
|
|
||||||
|
|
||||||
type ConsumedSse = {
|
|
||||||
eventCount: number;
|
|
||||||
lastEventType: string | null;
|
|
||||||
lastData: string | null;
|
|
||||||
lastPayload: Record<string, unknown> | null;
|
|
||||||
terminal: boolean;
|
|
||||||
failed: boolean;
|
|
||||||
errorMessage: string | null;
|
|
||||||
};
|
|
||||||
|
|
||||||
function inferSseTerminal(input: {
|
|
||||||
eventType: string;
|
|
||||||
data: string;
|
|
||||||
parsedPayload: Record<string, unknown> | null;
|
|
||||||
}): { terminal: boolean; failed: boolean; errorMessage: string | null } {
|
|
||||||
const normalizedType = input.eventType.trim().toLowerCase();
|
|
||||||
const trimmedData = input.data.trim();
|
|
||||||
const payload = input.parsedPayload;
|
|
||||||
const payloadType = nonEmpty(payload?.type)?.toLowerCase() ?? null;
|
|
||||||
const payloadStatus = nonEmpty(payload?.status)?.toLowerCase() ?? null;
|
|
||||||
|
|
||||||
if (trimmedData === "[DONE]") {
|
|
||||||
return { terminal: true, failed: false, errorMessage: null };
|
|
||||||
}
|
|
||||||
|
|
||||||
const failType =
|
|
||||||
normalizedType.includes("error") ||
|
|
||||||
normalizedType.includes("failed") ||
|
|
||||||
normalizedType.includes("cancel");
|
|
||||||
if (failType) {
|
|
||||||
return {
|
|
||||||
terminal: true,
|
|
||||||
failed: true,
|
|
||||||
errorMessage:
|
|
||||||
nonEmpty(payload?.error) ??
|
|
||||||
nonEmpty(payload?.message) ??
|
|
||||||
(trimmedData.length > 0 ? trimmedData : "OpenClaw SSE error"),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
const doneType =
|
|
||||||
normalizedType === "done" ||
|
|
||||||
normalizedType.endsWith(".completed") ||
|
|
||||||
normalizedType === "completed";
|
|
||||||
if (doneType) {
|
|
||||||
return { terminal: true, failed: false, errorMessage: null };
|
|
||||||
}
|
|
||||||
|
|
||||||
if (payloadStatus) {
|
|
||||||
if (
|
|
||||||
payloadStatus === "completed" ||
|
|
||||||
payloadStatus === "succeeded" ||
|
|
||||||
payloadStatus === "done"
|
|
||||||
) {
|
|
||||||
return { terminal: true, failed: false, errorMessage: null };
|
|
||||||
}
|
|
||||||
if (
|
|
||||||
payloadStatus === "failed" ||
|
|
||||||
payloadStatus === "cancelled" ||
|
|
||||||
payloadStatus === "error"
|
|
||||||
) {
|
|
||||||
return {
|
|
||||||
terminal: true,
|
|
||||||
failed: true,
|
|
||||||
errorMessage:
|
|
||||||
nonEmpty(payload?.error) ??
|
|
||||||
nonEmpty(payload?.message) ??
|
|
||||||
`OpenClaw SSE status ${payloadStatus}`,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (payloadType) {
|
|
||||||
if (payloadType.endsWith(".completed")) {
|
|
||||||
return { terminal: true, failed: false, errorMessage: null };
|
|
||||||
}
|
|
||||||
if (
|
|
||||||
payloadType.endsWith(".failed") ||
|
|
||||||
payloadType.endsWith(".cancelled") ||
|
|
||||||
payloadType.endsWith(".error")
|
|
||||||
) {
|
|
||||||
return {
|
|
||||||
terminal: true,
|
|
||||||
failed: true,
|
|
||||||
errorMessage:
|
|
||||||
nonEmpty(payload?.error) ??
|
|
||||||
nonEmpty(payload?.message) ??
|
|
||||||
`OpenClaw SSE type ${payloadType}`,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (payload?.done === true) {
|
|
||||||
return { terminal: true, failed: false, errorMessage: null };
|
|
||||||
}
|
|
||||||
|
|
||||||
return { terminal: false, failed: false, errorMessage: null };
|
|
||||||
}
|
|
||||||
|
|
||||||
async function consumeSseResponse(params: {
|
|
||||||
response: Response;
|
|
||||||
onLog: AdapterExecutionContext["onLog"];
|
|
||||||
}): Promise<ConsumedSse> {
|
|
||||||
const reader = params.response.body?.getReader();
|
|
||||||
if (!reader) {
|
|
||||||
throw new Error("OpenClaw SSE response body is missing");
|
|
||||||
}
|
|
||||||
|
|
||||||
const decoder = new TextDecoder();
|
|
||||||
let buffer = "";
|
|
||||||
let eventType = "message";
|
|
||||||
let dataLines: string[] = [];
|
|
||||||
let eventCount = 0;
|
|
||||||
let lastEventType: string | null = null;
|
|
||||||
let lastData: string | null = null;
|
|
||||||
let lastPayload: Record<string, unknown> | null = null;
|
|
||||||
let terminal = false;
|
|
||||||
let failed = false;
|
|
||||||
let errorMessage: string | null = null;
|
|
||||||
|
|
||||||
const dispatchEvent = async (): Promise<boolean> => {
|
|
||||||
if (dataLines.length === 0) {
|
|
||||||
eventType = "message";
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
const data = dataLines.join("\n");
|
|
||||||
const trimmedData = data.trim();
|
|
||||||
const parsedPayload = parseOpenClawResponse(trimmedData);
|
|
||||||
|
|
||||||
eventCount += 1;
|
|
||||||
lastEventType = eventType;
|
|
||||||
lastData = data;
|
|
||||||
if (parsedPayload) lastPayload = parsedPayload;
|
|
||||||
|
|
||||||
const preview =
|
|
||||||
trimmedData.length > 1000 ? `${trimmedData.slice(0, 1000)}...` : trimmedData;
|
|
||||||
await params.onLog("stdout", `[openclaw:sse] event=${eventType} data=${preview}\n`);
|
|
||||||
|
|
||||||
const resolution = inferSseTerminal({
|
|
||||||
eventType,
|
|
||||||
data,
|
|
||||||
parsedPayload,
|
|
||||||
});
|
|
||||||
|
|
||||||
dataLines = [];
|
|
||||||
eventType = "message";
|
|
||||||
|
|
||||||
if (resolution.terminal) {
|
|
||||||
terminal = true;
|
|
||||||
failed = resolution.failed;
|
|
||||||
errorMessage = resolution.errorMessage;
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
|
|
||||||
let shouldStop = false;
|
|
||||||
while (!shouldStop) {
|
|
||||||
const { done, value } = await reader.read();
|
|
||||||
if (done) break;
|
|
||||||
|
|
||||||
buffer += decoder.decode(value, { stream: true });
|
|
||||||
|
|
||||||
while (!shouldStop) {
|
|
||||||
const newlineIndex = buffer.indexOf("\n");
|
|
||||||
if (newlineIndex === -1) break;
|
|
||||||
|
|
||||||
let line = buffer.slice(0, newlineIndex);
|
|
||||||
buffer = buffer.slice(newlineIndex + 1);
|
|
||||||
if (line.endsWith("\r")) line = line.slice(0, -1);
|
|
||||||
|
|
||||||
if (line.length === 0) {
|
|
||||||
shouldStop = await dispatchEvent();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (line.startsWith(":")) continue;
|
|
||||||
|
|
||||||
const colonIndex = line.indexOf(":");
|
|
||||||
const field = colonIndex === -1 ? line : line.slice(0, colonIndex);
|
|
||||||
const rawValue =
|
|
||||||
colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, "");
|
|
||||||
|
|
||||||
if (field === "event") {
|
|
||||||
eventType = rawValue || "message";
|
|
||||||
} else if (field === "data") {
|
|
||||||
dataLines.push(rawValue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
buffer += decoder.decode();
|
|
||||||
if (!shouldStop && buffer.trim().length > 0) {
|
|
||||||
for (const rawLine of buffer.split(/\r?\n/)) {
|
|
||||||
const line = rawLine.trimEnd();
|
|
||||||
if (line.length === 0) {
|
|
||||||
shouldStop = await dispatchEvent();
|
|
||||||
if (shouldStop) break;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (line.startsWith(":")) continue;
|
|
||||||
|
|
||||||
const colonIndex = line.indexOf(":");
|
|
||||||
const field = colonIndex === -1 ? line : line.slice(0, colonIndex);
|
|
||||||
const rawValue =
|
|
||||||
colonIndex === -1 ? "" : line.slice(colonIndex + 1).replace(/^ /, "");
|
|
||||||
|
|
||||||
if (field === "event") {
|
|
||||||
eventType = rawValue || "message";
|
|
||||||
} else if (field === "data") {
|
|
||||||
dataLines.push(rawValue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!shouldStop && dataLines.length > 0) {
|
|
||||||
await dispatchEvent();
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
eventCount,
|
|
||||||
lastEventType,
|
|
||||||
lastData,
|
|
||||||
lastPayload,
|
|
||||||
terminal,
|
|
||||||
failed,
|
|
||||||
errorMessage,
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExecutionResult> {
|
export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExecutionResult> {
|
||||||
const { config, runId, agent, context, onLog, onMeta } = ctx;
|
const url = asString(ctx.config.url, "").trim();
|
||||||
const url = asString(config.url, "").trim();
|
|
||||||
if (!url) {
|
if (!url) {
|
||||||
return {
|
return {
|
||||||
exitCode: 1,
|
exitCode: 1,
|
||||||
@@ -528,289 +23,31 @@ export async function execute(ctx: AdapterExecutionContext): Promise<AdapterExec
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isWakeCompatibilityEndpoint(url)) {
|
const transportInput = ctx.config.streamTransport ?? ctx.config.transport;
|
||||||
|
const transport = normalizeTransport(transportInput);
|
||||||
|
if (!transport) {
|
||||||
return {
|
return {
|
||||||
exitCode: 1,
|
exitCode: 1,
|
||||||
signal: null,
|
signal: null,
|
||||||
timedOut: false,
|
timedOut: false,
|
||||||
errorMessage: "OpenClaw /hooks/wake is not stream-capable. Use a streaming endpoint.",
|
errorMessage: `OpenClaw adapter does not support transport: ${String(transportInput)}`,
|
||||||
errorCode: "openclaw_sse_incompatible_endpoint",
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
const streamTransport = asString(config.streamTransport, "sse").trim().toLowerCase();
|
|
||||||
if (streamTransport && streamTransport !== "sse") {
|
|
||||||
return {
|
|
||||||
exitCode: 1,
|
|
||||||
signal: null,
|
|
||||||
timedOut: false,
|
|
||||||
errorMessage: "OpenClaw adapter only supports streamTransport=sse.",
|
|
||||||
errorCode: "openclaw_stream_transport_unsupported",
|
errorCode: "openclaw_stream_transport_unsupported",
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
const method = asString(config.method, "POST").trim().toUpperCase() || "POST";
|
if (transport === "sse" && isWakeCompatibilityEndpoint(url)) {
|
||||||
const timeoutSecRaw = asNumber(config.timeoutSec, 0);
|
|
||||||
const timeoutSec = timeoutSecRaw > 0 ? Math.max(1, Math.floor(timeoutSecRaw)) : 0;
|
|
||||||
const headersConfig = parseObject(config.headers) as Record<string, unknown>;
|
|
||||||
const payloadTemplate = parseObject(config.payloadTemplate);
|
|
||||||
const webhookAuthHeader = nonEmpty(config.webhookAuthHeader);
|
|
||||||
const sessionKeyStrategy = normalizeSessionKeyStrategy(config.sessionKeyStrategy);
|
|
||||||
|
|
||||||
const headers: Record<string, string> = {
|
|
||||||
"content-type": "application/json",
|
|
||||||
};
|
|
||||||
for (const [key, value] of Object.entries(headersConfig)) {
|
|
||||||
if (typeof value === "string" && value.trim().length > 0) {
|
|
||||||
headers[key] = value;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const openClawAuthHeader = nonEmpty(headers["x-openclaw-auth"] ?? headers["X-OpenClaw-Auth"]);
|
|
||||||
if (openClawAuthHeader && !headers.authorization && !headers.Authorization) {
|
|
||||||
headers.authorization = toAuthorizationHeaderValue(openClawAuthHeader);
|
|
||||||
}
|
|
||||||
if (webhookAuthHeader && !headers.authorization && !headers.Authorization) {
|
|
||||||
headers.authorization = webhookAuthHeader;
|
|
||||||
}
|
|
||||||
|
|
||||||
const wakePayload = {
|
|
||||||
runId,
|
|
||||||
agentId: agent.id,
|
|
||||||
companyId: agent.companyId,
|
|
||||||
taskId: nonEmpty(context.taskId) ?? nonEmpty(context.issueId),
|
|
||||||
issueId: nonEmpty(context.issueId),
|
|
||||||
wakeReason: nonEmpty(context.wakeReason),
|
|
||||||
wakeCommentId: nonEmpty(context.wakeCommentId) ?? nonEmpty(context.commentId),
|
|
||||||
approvalId: nonEmpty(context.approvalId),
|
|
||||||
approvalStatus: nonEmpty(context.approvalStatus),
|
|
||||||
issueIds: Array.isArray(context.issueIds)
|
|
||||||
? context.issueIds.filter(
|
|
||||||
(value): value is string => typeof value === "string" && value.trim().length > 0,
|
|
||||||
)
|
|
||||||
: [],
|
|
||||||
};
|
|
||||||
|
|
||||||
const sessionKey = resolveSessionKey({
|
|
||||||
strategy: sessionKeyStrategy,
|
|
||||||
configuredSessionKey: nonEmpty(config.sessionKey),
|
|
||||||
runId,
|
|
||||||
issueId: wakePayload.issueId ?? wakePayload.taskId,
|
|
||||||
});
|
|
||||||
|
|
||||||
const templateText = nonEmpty(payloadTemplate.text);
|
|
||||||
const paperclipApiUrlOverride = resolvePaperclipApiUrlOverride(config.paperclipApiUrl);
|
|
||||||
const paperclipEnv: Record<string, string> = {
|
|
||||||
...buildPaperclipEnv(agent),
|
|
||||||
PAPERCLIP_RUN_ID: runId,
|
|
||||||
};
|
|
||||||
if (paperclipApiUrlOverride) {
|
|
||||||
paperclipEnv.PAPERCLIP_API_URL = paperclipApiUrlOverride;
|
|
||||||
}
|
|
||||||
if (wakePayload.taskId) paperclipEnv.PAPERCLIP_TASK_ID = wakePayload.taskId;
|
|
||||||
if (wakePayload.wakeReason) paperclipEnv.PAPERCLIP_WAKE_REASON = wakePayload.wakeReason;
|
|
||||||
if (wakePayload.wakeCommentId) paperclipEnv.PAPERCLIP_WAKE_COMMENT_ID = wakePayload.wakeCommentId;
|
|
||||||
if (wakePayload.approvalId) paperclipEnv.PAPERCLIP_APPROVAL_ID = wakePayload.approvalId;
|
|
||||||
if (wakePayload.approvalStatus) paperclipEnv.PAPERCLIP_APPROVAL_STATUS = wakePayload.approvalStatus;
|
|
||||||
if (wakePayload.issueIds.length > 0) {
|
|
||||||
paperclipEnv.PAPERCLIP_LINKED_ISSUE_IDS = wakePayload.issueIds.join(",");
|
|
||||||
}
|
|
||||||
|
|
||||||
const wakeText = buildWakeText(wakePayload, paperclipEnv);
|
|
||||||
const payloadText = templateText ? `${templateText}\n\n${wakeText}` : wakeText;
|
|
||||||
const isOpenResponses = isOpenResponsesEndpoint(url);
|
|
||||||
const openResponsesInput = Object.prototype.hasOwnProperty.call(payloadTemplate, "input")
|
|
||||||
? appendWakeTextToOpenResponsesInput(payloadTemplate.input, wakeText)
|
|
||||||
: payloadText;
|
|
||||||
|
|
||||||
const paperclipBody: Record<string, unknown> = isOpenResponses
|
|
||||||
? {
|
|
||||||
...payloadTemplate,
|
|
||||||
stream: true,
|
|
||||||
model:
|
|
||||||
nonEmpty(payloadTemplate.model) ??
|
|
||||||
nonEmpty(config.model) ??
|
|
||||||
"openclaw",
|
|
||||||
input: openResponsesInput,
|
|
||||||
metadata: {
|
|
||||||
...toStringRecord(payloadTemplate.metadata),
|
|
||||||
...paperclipEnv,
|
|
||||||
paperclip_session_key: sessionKey,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
: {
|
|
||||||
...payloadTemplate,
|
|
||||||
stream: true,
|
|
||||||
sessionKey,
|
|
||||||
text: payloadText,
|
|
||||||
paperclip: {
|
|
||||||
...wakePayload,
|
|
||||||
sessionKey,
|
|
||||||
streamTransport: "sse",
|
|
||||||
env: paperclipEnv,
|
|
||||||
context,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
if (isOpenResponses) {
|
|
||||||
delete paperclipBody.text;
|
|
||||||
delete paperclipBody.sessionKey;
|
|
||||||
delete paperclipBody.paperclip;
|
|
||||||
if (!headers["x-openclaw-session-key"] && !headers["X-OpenClaw-Session-Key"]) {
|
|
||||||
headers["x-openclaw-session-key"] = sessionKey;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (onMeta) {
|
|
||||||
await onMeta({
|
|
||||||
adapterType: "openclaw",
|
|
||||||
command: "sse",
|
|
||||||
commandArgs: [method, url],
|
|
||||||
context,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
const outboundHeaderKeys = Array.from(new Set([...Object.keys(headers), "accept"])).sort();
|
|
||||||
await onLog(
|
|
||||||
"stdout",
|
|
||||||
`[openclaw] outbound headers (redacted): ${stringifyForLog(redactForLog(headers), 4_000)}\n`,
|
|
||||||
);
|
|
||||||
await onLog(
|
|
||||||
"stdout",
|
|
||||||
`[openclaw] outbound payload (redacted): ${stringifyForLog(redactForLog(paperclipBody), 12_000)}\n`,
|
|
||||||
);
|
|
||||||
await onLog("stdout", `[openclaw] outbound header keys: ${outboundHeaderKeys.join(", ")}\n`);
|
|
||||||
await onLog("stdout", `[openclaw] invoking ${method} ${url} (transport=sse)\n`);
|
|
||||||
|
|
||||||
const controller = new AbortController();
|
|
||||||
const timeout = timeoutSec > 0 ? setTimeout(() => controller.abort(), timeoutSec * 1000) : null;
|
|
||||||
|
|
||||||
try {
|
|
||||||
const response = await sendJsonRequest({
|
|
||||||
url,
|
|
||||||
method,
|
|
||||||
headers: {
|
|
||||||
...headers,
|
|
||||||
accept: "text/event-stream",
|
|
||||||
},
|
|
||||||
payload: paperclipBody,
|
|
||||||
signal: controller.signal,
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!response.ok) {
|
|
||||||
const responseText = await readAndLogResponseText({ response, onLog });
|
|
||||||
return {
|
|
||||||
exitCode: 1,
|
|
||||||
signal: null,
|
|
||||||
timedOut: false,
|
|
||||||
errorMessage:
|
|
||||||
isTextRequiredResponse(responseText)
|
|
||||||
? "OpenClaw endpoint rejected the payload as text-required."
|
|
||||||
: `OpenClaw SSE request failed with status ${response.status}`,
|
|
||||||
errorCode: isTextRequiredResponse(responseText)
|
|
||||||
? "openclaw_text_required"
|
|
||||||
: "openclaw_http_error",
|
|
||||||
resultJson: {
|
|
||||||
status: response.status,
|
|
||||||
statusText: response.statusText,
|
|
||||||
response: parseOpenClawResponse(responseText) ?? responseText,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
const contentType = (response.headers.get("content-type") ?? "").toLowerCase();
|
|
||||||
if (!contentType.includes("text/event-stream")) {
|
|
||||||
const responseText = await readAndLogResponseText({ response, onLog });
|
|
||||||
return {
|
|
||||||
exitCode: 1,
|
|
||||||
signal: null,
|
|
||||||
timedOut: false,
|
|
||||||
errorMessage: "OpenClaw SSE endpoint did not return text/event-stream",
|
|
||||||
errorCode: "openclaw_sse_expected_event_stream",
|
|
||||||
resultJson: {
|
|
||||||
status: response.status,
|
|
||||||
statusText: response.statusText,
|
|
||||||
contentType,
|
|
||||||
response: parseOpenClawResponse(responseText) ?? responseText,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
const consumed = await consumeSseResponse({ response, onLog });
|
|
||||||
if (consumed.failed) {
|
|
||||||
return {
|
|
||||||
exitCode: 1,
|
|
||||||
signal: null,
|
|
||||||
timedOut: false,
|
|
||||||
errorMessage: consumed.errorMessage ?? "OpenClaw SSE stream failed",
|
|
||||||
errorCode: "openclaw_sse_stream_failed",
|
|
||||||
resultJson: {
|
|
||||||
eventCount: consumed.eventCount,
|
|
||||||
terminal: consumed.terminal,
|
|
||||||
lastEventType: consumed.lastEventType,
|
|
||||||
lastData: consumed.lastData,
|
|
||||||
response: consumed.lastPayload ?? consumed.lastData,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!consumed.terminal) {
|
|
||||||
return {
|
|
||||||
exitCode: 1,
|
|
||||||
signal: null,
|
|
||||||
timedOut: false,
|
|
||||||
errorMessage: "OpenClaw SSE stream closed without a terminal event",
|
|
||||||
errorCode: "openclaw_sse_stream_incomplete",
|
|
||||||
resultJson: {
|
|
||||||
eventCount: consumed.eventCount,
|
|
||||||
terminal: consumed.terminal,
|
|
||||||
lastEventType: consumed.lastEventType,
|
|
||||||
lastData: consumed.lastData,
|
|
||||||
response: consumed.lastPayload ?? consumed.lastData,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
exitCode: 0,
|
|
||||||
signal: null,
|
|
||||||
timedOut: false,
|
|
||||||
provider: "openclaw",
|
|
||||||
model: null,
|
|
||||||
summary: `OpenClaw SSE ${method} ${url}`,
|
|
||||||
resultJson: {
|
|
||||||
eventCount: consumed.eventCount,
|
|
||||||
terminal: consumed.terminal,
|
|
||||||
lastEventType: consumed.lastEventType,
|
|
||||||
lastData: consumed.lastData,
|
|
||||||
response: consumed.lastPayload ?? consumed.lastData,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
} catch (err) {
|
|
||||||
if (err instanceof Error && err.name === "AbortError") {
|
|
||||||
const timeoutMessage =
|
|
||||||
timeoutSec > 0
|
|
||||||
? `[openclaw] SSE request timed out after ${timeoutSec}s\n`
|
|
||||||
: "[openclaw] SSE request aborted\n";
|
|
||||||
await onLog("stderr", timeoutMessage);
|
|
||||||
return {
|
|
||||||
exitCode: null,
|
|
||||||
signal: null,
|
|
||||||
timedOut: true,
|
|
||||||
errorMessage: timeoutSec > 0 ? `Timed out after ${timeoutSec}s` : "Request aborted",
|
|
||||||
errorCode: "openclaw_sse_timeout",
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
const message = err instanceof Error ? err.message : String(err);
|
|
||||||
await onLog("stderr", `[openclaw] request failed: ${message}\n`);
|
|
||||||
return {
|
return {
|
||||||
exitCode: 1,
|
exitCode: 1,
|
||||||
signal: null,
|
signal: null,
|
||||||
timedOut: false,
|
timedOut: false,
|
||||||
errorMessage: message,
|
errorMessage: "OpenClaw /hooks/wake is not stream-capable. Use SSE transport with a streaming endpoint.",
|
||||||
errorCode: "openclaw_request_failed",
|
errorCode: "openclaw_sse_incompatible_endpoint",
|
||||||
};
|
};
|
||||||
} finally {
|
|
||||||
if (timeout) clearTimeout(timeout);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (transport === "webhook") {
|
||||||
|
return executeWebhook(ctx, url);
|
||||||
|
}
|
||||||
|
|
||||||
|
return executeSse(ctx, url);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,6 +34,13 @@ function isWakePath(pathname: string): boolean {
|
|||||||
return value === "/hooks/wake" || value.endsWith("/hooks/wake");
|
return value === "/hooks/wake" || value.endsWith("/hooks/wake");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function normalizeTransport(value: unknown): "sse" | "webhook" | null {
|
||||||
|
const normalized = asString(value, "sse").trim().toLowerCase();
|
||||||
|
if (!normalized || normalized === "sse") return "sse";
|
||||||
|
if (normalized === "webhook") return "webhook";
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
function pushDeploymentDiagnostics(
|
function pushDeploymentDiagnostics(
|
||||||
checks: AdapterEnvironmentCheck[],
|
checks: AdapterEnvironmentCheck[],
|
||||||
ctx: AdapterEnvironmentTestContext,
|
ctx: AdapterEnvironmentTestContext,
|
||||||
@@ -102,13 +109,15 @@ export async function testEnvironment(
|
|||||||
const checks: AdapterEnvironmentCheck[] = [];
|
const checks: AdapterEnvironmentCheck[] = [];
|
||||||
const config = parseObject(ctx.config);
|
const config = parseObject(ctx.config);
|
||||||
const urlValue = asString(config.url, "");
|
const urlValue = asString(config.url, "");
|
||||||
|
const streamTransportValue = config.streamTransport ?? config.transport;
|
||||||
|
const streamTransport = normalizeTransport(streamTransportValue);
|
||||||
|
|
||||||
if (!urlValue) {
|
if (!urlValue) {
|
||||||
checks.push({
|
checks.push({
|
||||||
code: "openclaw_url_missing",
|
code: "openclaw_url_missing",
|
||||||
level: "error",
|
level: "error",
|
||||||
message: "OpenClaw adapter requires a streaming endpoint URL.",
|
message: "OpenClaw adapter requires an endpoint URL.",
|
||||||
hint: "Set adapterConfig.url to your OpenClaw SSE endpoint.",
|
hint: "Set adapterConfig.url to your OpenClaw transport endpoint.",
|
||||||
});
|
});
|
||||||
return {
|
return {
|
||||||
adapterType: ctx.adapterType,
|
adapterType: ctx.adapterType,
|
||||||
@@ -154,23 +163,28 @@ export async function testEnvironment(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isWakePath(url.pathname)) {
|
if (streamTransport === "sse" && isWakePath(url.pathname)) {
|
||||||
checks.push({
|
checks.push({
|
||||||
code: "openclaw_wake_endpoint_incompatible",
|
code: "openclaw_wake_endpoint_incompatible",
|
||||||
level: "error",
|
level: "error",
|
||||||
message: "Endpoint targets /hooks/wake, which is not stream-capable for strict SSE mode.",
|
message: "Endpoint targets /hooks/wake, which is not stream-capable for SSE transport.",
|
||||||
hint: "Use an endpoint that returns text/event-stream for the full run duration.",
|
hint: "Use an endpoint that returns text/event-stream for the full run duration.",
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const streamTransport = asString(config.streamTransport, "sse").trim().toLowerCase();
|
if (!streamTransport) {
|
||||||
if (streamTransport && streamTransport !== "sse") {
|
|
||||||
checks.push({
|
checks.push({
|
||||||
code: "openclaw_stream_transport_unsupported",
|
code: "openclaw_stream_transport_unsupported",
|
||||||
level: "error",
|
level: "error",
|
||||||
message: `Unsupported streamTransport: ${streamTransport}`,
|
message: `Unsupported streamTransport: ${String(streamTransportValue)}`,
|
||||||
hint: "OpenClaw adapter now requires streamTransport=sse.",
|
hint: "Use streamTransport=sse or streamTransport=webhook.",
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
checks.push({
|
||||||
|
code: "openclaw_stream_transport_configured",
|
||||||
|
level: "info",
|
||||||
|
message: `Configured stream transport: ${streamTransport}`,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -159,7 +159,7 @@ describe("openclaw ui stdout parser", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe("openclaw adapter execute", () => {
|
describe("openclaw adapter execute", () => {
|
||||||
it("uses strict SSE and includes canonical PAPERCLIP context in text payload", async () => {
|
it("uses SSE transport and includes canonical PAPERCLIP context in text payload", async () => {
|
||||||
const fetchMock = vi.fn().mockResolvedValue(
|
const fetchMock = vi.fn().mockResolvedValue(
|
||||||
sseResponse([
|
sseResponse([
|
||||||
"event: response.completed\n",
|
"event: response.completed\n",
|
||||||
@@ -534,14 +534,109 @@ describe("openclaw adapter execute", () => {
|
|||||||
expect(result.errorCode).toBe("openclaw_text_required");
|
expect(result.errorCode).toBe("openclaw_text_required");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("rejects non-sse transport configuration", async () => {
|
it("supports webhook transport and sends Paperclip webhook payloads", async () => {
|
||||||
|
const fetchMock = vi.fn().mockResolvedValue(
|
||||||
|
new Response(JSON.stringify({ ok: true }), {
|
||||||
|
status: 200,
|
||||||
|
statusText: "OK",
|
||||||
|
headers: {
|
||||||
|
"content-type": "application/json",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
vi.stubGlobal("fetch", fetchMock);
|
||||||
|
|
||||||
|
const result = await execute(
|
||||||
|
buildContext({
|
||||||
|
url: "https://agent.example/webhook",
|
||||||
|
streamTransport: "webhook",
|
||||||
|
payloadTemplate: { foo: "bar" },
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(result.exitCode).toBe(0);
|
||||||
|
expect(fetchMock).toHaveBeenCalledTimes(1);
|
||||||
|
const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record<string, unknown>;
|
||||||
|
expect(body.foo).toBe("bar");
|
||||||
|
expect(body.stream).toBe(false);
|
||||||
|
expect(body.sessionKey).toBe("paperclip");
|
||||||
|
expect(String(body.text ?? "")).toContain("PAPERCLIP_RUN_ID=run-123");
|
||||||
|
expect((body.paperclip as Record<string, unknown>).streamTransport).toBe("webhook");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("uses wake compatibility payloads for /hooks/wake when transport=webhook", async () => {
|
||||||
|
const fetchMock = vi.fn().mockResolvedValue(
|
||||||
|
new Response(JSON.stringify({ ok: true }), {
|
||||||
|
status: 200,
|
||||||
|
statusText: "OK",
|
||||||
|
headers: {
|
||||||
|
"content-type": "application/json",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
vi.stubGlobal("fetch", fetchMock);
|
||||||
|
|
||||||
|
const result = await execute(
|
||||||
|
buildContext({
|
||||||
|
url: "https://agent.example/hooks/wake",
|
||||||
|
streamTransport: "webhook",
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(result.exitCode).toBe(0);
|
||||||
|
const body = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record<string, unknown>;
|
||||||
|
expect(body.mode).toBe("now");
|
||||||
|
expect(String(body.text ?? "")).toContain("PAPERCLIP_RUN_ID=run-123");
|
||||||
|
expect(body.paperclip).toBeUndefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("retries webhook payloads with wake compatibility format on text-required errors", async () => {
|
||||||
|
const fetchMock = vi
|
||||||
|
.fn()
|
||||||
|
.mockResolvedValueOnce(
|
||||||
|
new Response(JSON.stringify({ error: "text required" }), {
|
||||||
|
status: 400,
|
||||||
|
statusText: "Bad Request",
|
||||||
|
headers: {
|
||||||
|
"content-type": "application/json",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
.mockResolvedValueOnce(
|
||||||
|
new Response(JSON.stringify({ ok: true }), {
|
||||||
|
status: 200,
|
||||||
|
statusText: "OK",
|
||||||
|
headers: {
|
||||||
|
"content-type": "application/json",
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
vi.stubGlobal("fetch", fetchMock);
|
||||||
|
|
||||||
|
const result = await execute(
|
||||||
|
buildContext({
|
||||||
|
url: "https://agent.example/v1/responses",
|
||||||
|
streamTransport: "webhook",
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(result.exitCode).toBe(0);
|
||||||
|
expect(fetchMock).toHaveBeenCalledTimes(2);
|
||||||
|
const firstBody = JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body ?? "{}")) as Record<string, unknown>;
|
||||||
|
const secondBody = JSON.parse(String(fetchMock.mock.calls[1]?.[1]?.body ?? "{}")) as Record<string, unknown>;
|
||||||
|
expect(firstBody.paperclip).toBeTypeOf("object");
|
||||||
|
expect(secondBody.mode).toBe("now");
|
||||||
|
expect(String(secondBody.text ?? "")).toContain("PAPERCLIP_RUN_ID=run-123");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects unsupported transport configuration", async () => {
|
||||||
const fetchMock = vi.fn();
|
const fetchMock = vi.fn();
|
||||||
vi.stubGlobal("fetch", fetchMock);
|
vi.stubGlobal("fetch", fetchMock);
|
||||||
|
|
||||||
const result = await execute(
|
const result = await execute(
|
||||||
buildContext({
|
buildContext({
|
||||||
url: "https://agent.example/sse",
|
url: "https://agent.example/sse",
|
||||||
streamTransport: "webhook",
|
streamTransport: "invalid",
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -550,7 +645,7 @@ describe("openclaw adapter execute", () => {
|
|||||||
expect(fetchMock).not.toHaveBeenCalled();
|
expect(fetchMock).not.toHaveBeenCalled();
|
||||||
});
|
});
|
||||||
|
|
||||||
it("rejects /hooks/wake compatibility endpoints in strict SSE mode", async () => {
|
it("rejects /hooks/wake compatibility endpoints in SSE mode", async () => {
|
||||||
const fetchMock = vi.fn();
|
const fetchMock = vi.fn();
|
||||||
vi.stubGlobal("fetch", fetchMock);
|
vi.stubGlobal("fetch", fetchMock);
|
||||||
|
|
||||||
@@ -567,7 +662,7 @@ describe("openclaw adapter execute", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
describe("openclaw adapter environment checks", () => {
|
describe("openclaw adapter environment checks", () => {
|
||||||
it("reports /hooks/wake endpoints as incompatible for strict SSE mode", async () => {
|
it("reports /hooks/wake endpoints as incompatible for SSE mode", async () => {
|
||||||
const fetchMock = vi
|
const fetchMock = vi
|
||||||
.fn()
|
.fn()
|
||||||
.mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" }));
|
.mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" }));
|
||||||
@@ -602,13 +697,36 @@ describe("openclaw adapter environment checks", () => {
|
|||||||
adapterType: "openclaw",
|
adapterType: "openclaw",
|
||||||
config: {
|
config: {
|
||||||
url: "https://agent.example/sse",
|
url: "https://agent.example/sse",
|
||||||
streamTransport: "webhook",
|
streamTransport: "invalid",
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
const check = result.checks.find((entry) => entry.code === "openclaw_stream_transport_unsupported");
|
const check = result.checks.find((entry) => entry.code === "openclaw_stream_transport_unsupported");
|
||||||
expect(check?.level).toBe("error");
|
expect(check?.level).toBe("error");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("accepts webhook streamTransport settings", async () => {
|
||||||
|
const fetchMock = vi
|
||||||
|
.fn()
|
||||||
|
.mockResolvedValue(new Response(null, { status: 405, statusText: "Method Not Allowed" }));
|
||||||
|
vi.stubGlobal("fetch", fetchMock);
|
||||||
|
|
||||||
|
const result = await testEnvironment({
|
||||||
|
companyId: "company-123",
|
||||||
|
adapterType: "openclaw",
|
||||||
|
config: {
|
||||||
|
url: "https://agent.example/hooks/wake",
|
||||||
|
streamTransport: "webhook",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const unsupported = result.checks.find((entry) => entry.code === "openclaw_stream_transport_unsupported");
|
||||||
|
const configured = result.checks.find((entry) => entry.code === "openclaw_stream_transport_configured");
|
||||||
|
const wakeIncompatible = result.checks.find((entry) => entry.code === "openclaw_wake_endpoint_incompatible");
|
||||||
|
expect(unsupported).toBeUndefined();
|
||||||
|
expect(configured?.level).toBe("info");
|
||||||
|
expect(wakeIncompatible).toBeUndefined();
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("onHireApproved", () => {
|
describe("onHireApproved", () => {
|
||||||
|
|||||||
@@ -135,6 +135,14 @@ function isWakePath(pathname: string): boolean {
|
|||||||
return value === "/hooks/wake" || value.endsWith("/hooks/wake");
|
return value === "/hooks/wake" || value.endsWith("/hooks/wake");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function normalizeOpenClawTransport(value: unknown): "sse" | "webhook" | null {
|
||||||
|
if (typeof value !== "string") return "sse";
|
||||||
|
const normalized = value.trim().toLowerCase();
|
||||||
|
if (!normalized || normalized === "sse") return "sse";
|
||||||
|
if (normalized === "webhook") return "webhook";
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
function normalizeHostname(value: string | null | undefined): string | null {
|
function normalizeHostname(value: string | null | undefined): string | null {
|
||||||
if (!value) return null;
|
if (!value) return null;
|
||||||
const trimmed = value.trim();
|
const trimmed = value.trim();
|
||||||
@@ -592,13 +600,25 @@ function normalizeAgentDefaultsForJoin(input: {
|
|||||||
level: "warn",
|
level: "warn",
|
||||||
message:
|
message:
|
||||||
"No OpenClaw callback config was provided in agentDefaultsPayload.",
|
"No OpenClaw callback config was provided in agentDefaultsPayload.",
|
||||||
hint: "Include agentDefaultsPayload.url so Paperclip can invoke the OpenClaw SSE endpoint immediately after approval."
|
hint: "Include agentDefaultsPayload.url so Paperclip can invoke the OpenClaw endpoint immediately after approval."
|
||||||
});
|
});
|
||||||
return { normalized: null as Record<string, unknown> | null, diagnostics };
|
return { normalized: null as Record<string, unknown> | null, diagnostics };
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaults = input.defaultsPayload as Record<string, unknown>;
|
const defaults = input.defaultsPayload as Record<string, unknown>;
|
||||||
|
const streamTransportInput = defaults.streamTransport ?? defaults.transport;
|
||||||
|
const streamTransport = normalizeOpenClawTransport(streamTransportInput);
|
||||||
const normalized: Record<string, unknown> = { streamTransport: "sse" };
|
const normalized: Record<string, unknown> = { streamTransport: "sse" };
|
||||||
|
if (!streamTransport) {
|
||||||
|
diagnostics.push({
|
||||||
|
code: "openclaw_stream_transport_unsupported",
|
||||||
|
level: "warn",
|
||||||
|
message: `Unsupported streamTransport: ${String(streamTransportInput)}`,
|
||||||
|
hint: "Use streamTransport=sse or streamTransport=webhook."
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
normalized.streamTransport = streamTransport;
|
||||||
|
}
|
||||||
|
|
||||||
let callbackUrl: URL | null = null;
|
let callbackUrl: URL | null = null;
|
||||||
const rawUrl = typeof defaults.url === "string" ? defaults.url.trim() : "";
|
const rawUrl = typeof defaults.url === "string" ? defaults.url.trim() : "";
|
||||||
@@ -607,7 +627,7 @@ function normalizeAgentDefaultsForJoin(input: {
|
|||||||
code: "openclaw_callback_url_missing",
|
code: "openclaw_callback_url_missing",
|
||||||
level: "warn",
|
level: "warn",
|
||||||
message: "OpenClaw callback URL is missing.",
|
message: "OpenClaw callback URL is missing.",
|
||||||
hint: "Set agentDefaultsPayload.url to your OpenClaw SSE endpoint."
|
hint: "Set agentDefaultsPayload.url to your OpenClaw endpoint."
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
@@ -630,12 +650,12 @@ function normalizeAgentDefaultsForJoin(input: {
|
|||||||
message: `Callback endpoint set to ${callbackUrl.toString()}`
|
message: `Callback endpoint set to ${callbackUrl.toString()}`
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
if (isWakePath(callbackUrl.pathname)) {
|
if ((streamTransport ?? "sse") === "sse" && isWakePath(callbackUrl.pathname)) {
|
||||||
diagnostics.push({
|
diagnostics.push({
|
||||||
code: "openclaw_callback_wake_path_incompatible",
|
code: "openclaw_callback_wake_path_incompatible",
|
||||||
level: "warn",
|
level: "warn",
|
||||||
message:
|
message:
|
||||||
"Configured callback path targets /hooks/wake, which is not stream-capable for strict SSE mode.",
|
"Configured callback path targets /hooks/wake, which is not stream-capable for SSE transport.",
|
||||||
hint: "Use an endpoint that returns text/event-stream for the full run duration."
|
hint: "Use an endpoint that returns text/event-stream for the full run duration."
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -696,7 +716,7 @@ function normalizeAgentDefaultsForJoin(input: {
|
|||||||
code: "openclaw_auth_header_missing",
|
code: "openclaw_auth_header_missing",
|
||||||
level: "warn",
|
level: "warn",
|
||||||
message: "Gateway auth token is missing from agent defaults.",
|
message: "Gateway auth token is missing from agent defaults.",
|
||||||
hint: "Set agentDefaultsPayload.headers.x-openclaw-auth to the token your OpenClaw /v1/responses endpoint requires."
|
hint: "Set agentDefaultsPayload.headers.x-openclaw-auth to the token your OpenClaw endpoint requires."
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -943,10 +963,10 @@ function buildInviteOnboardingManifest(
|
|||||||
requiredFields: {
|
requiredFields: {
|
||||||
requestType: "agent",
|
requestType: "agent",
|
||||||
agentName: "Display name for this agent",
|
agentName: "Display name for this agent",
|
||||||
adapterType: "Use 'openclaw' for OpenClaw streaming agents",
|
adapterType: "Use 'openclaw' for OpenClaw agents",
|
||||||
capabilities: "Optional capability summary",
|
capabilities: "Optional capability summary",
|
||||||
agentDefaultsPayload:
|
agentDefaultsPayload:
|
||||||
"Adapter config for OpenClaw SSE endpoint. MUST include headers.x-openclaw-auth; also include url/method/paperclipApiUrl (and optional webhookAuthHeader/timeoutSec/payloadTemplate)."
|
"Adapter config for OpenClaw endpoint. MUST include headers.x-openclaw-auth; include streamTransport ('sse' or 'webhook') plus url/method/paperclipApiUrl (and optional webhookAuthHeader/timeoutSec/payloadTemplate)."
|
||||||
},
|
},
|
||||||
registrationEndpoint: {
|
registrationEndpoint: {
|
||||||
method: "POST",
|
method: "POST",
|
||||||
@@ -1103,6 +1123,7 @@ export function buildInviteOnboardingTextDocument(
|
|||||||
|
|
||||||
IMPORTANT: You MUST include agentDefaultsPayload.headers.x-openclaw-auth with your gateway token.
|
IMPORTANT: You MUST include agentDefaultsPayload.headers.x-openclaw-auth with your gateway token.
|
||||||
Without this token, Paperclip callback requests to your OpenClaw endpoint will fail with 401 Unauthorized.
|
Without this token, Paperclip callback requests to your OpenClaw endpoint will fail with 401 Unauthorized.
|
||||||
|
Set "streamTransport" to "sse" for streaming /v1/responses endpoints, or "webhook" for wake-style callbacks.
|
||||||
|
|
||||||
Body (JSON):
|
Body (JSON):
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user