Expand heartbeat service with process adapter improvements

Add richer process lifecycle management, output capture, and error
handling to the heartbeat service. Improve run event recording and
adapter state persistence.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Forgotten
2026-02-18 13:02:17 -06:00
parent 7e4a20645c
commit 11c8c1af78

View File

@@ -1,4 +1,6 @@
import { spawn, type ChildProcess } from "node:child_process"; import { spawn, type ChildProcess } from "node:child_process";
import { constants as fsConstants, promises as fs } from "node:fs";
import path from "node:path";
import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm"; import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
import type { Db } from "@paperclip/db"; import type { Db } from "@paperclip/db";
import { import {
@@ -47,6 +49,16 @@ interface AdapterExecutionResult {
summary?: string | null; summary?: string | null;
} }
interface AdapterInvocationMeta {
adapterType: string;
command: string;
cwd?: string;
commandArgs?: string[];
env?: Record<string, string>;
prompt?: string;
context?: Record<string, unknown>;
}
interface WakeupOptions { interface WakeupOptions {
source?: "timer" | "assignment" | "on_demand" | "automation"; source?: "timer" | "assignment" | "on_demand" | "automation";
triggerDetail?: "manual" | "ping" | "callback" | "system"; triggerDetail?: "manual" | "ping" | "callback" | "system";
@@ -62,6 +74,7 @@ const runningProcesses = new Map<string, RunningProcess>();
const MAX_CAPTURE_BYTES = 4 * 1024 * 1024; const MAX_CAPTURE_BYTES = 4 * 1024 * 1024;
const MAX_EXCERPT_BYTES = 32 * 1024; const MAX_EXCERPT_BYTES = 32 * 1024;
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024; const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
const SENSITIVE_ENV_KEY = /(key|token|secret|password|passwd|authorization|cookie)/i;
function parseObject(value: unknown): Record<string, unknown> { function parseObject(value: unknown): Record<string, unknown> {
if (typeof value !== "object" || value === null || Array.isArray(value)) { if (typeof value !== "object" || value === null || Array.isArray(value)) {
@@ -175,6 +188,85 @@ function parseCodexJsonl(stdout: string) {
}; };
} }
function parseClaudeStreamJson(stdout: string) {
let sessionId: string | null = null;
let model = "";
let finalResult: Record<string, unknown> | null = null;
const assistantTexts: string[] = [];
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
if (!line) continue;
const event = parseJson(line);
if (!event) continue;
const type = asString(event.type, "");
if (type === "system" && asString(event.subtype, "") === "init") {
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
model = asString(event.model, model);
continue;
}
if (type === "assistant") {
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
const message = parseObject(event.message);
const content = Array.isArray(message.content) ? message.content : [];
for (const entry of content) {
if (typeof entry !== "object" || entry === null || Array.isArray(entry)) continue;
const block = entry as Record<string, unknown>;
if (asString(block.type, "") === "text") {
const text = asString(block.text, "");
if (text) assistantTexts.push(text);
}
}
continue;
}
if (type === "result") {
finalResult = event;
sessionId = asString(event.session_id, sessionId ?? "") || sessionId;
}
}
if (!finalResult) {
return {
sessionId,
model,
costUsd: null as number | null,
usage: null as UsageSummary | null,
summary: assistantTexts.join("\n\n").trim(),
resultJson: null as Record<string, unknown> | null,
};
}
const usageObj = parseObject(finalResult.usage);
const usage: UsageSummary = {
inputTokens: asNumber(usageObj.input_tokens, 0),
cachedInputTokens: asNumber(usageObj.cache_read_input_tokens, 0),
outputTokens: asNumber(usageObj.output_tokens, 0),
};
const costRaw = finalResult.total_cost_usd;
const costUsd = typeof costRaw === "number" && Number.isFinite(costRaw) ? costRaw : null;
const summary = asString(finalResult.result, assistantTexts.join("\n\n")).trim();
return {
sessionId,
model,
costUsd,
usage,
summary,
resultJson: finalResult,
};
}
function redactEnvForLogs(env: Record<string, string>): Record<string, string> {
const redacted: Record<string, string> = {};
for (const [key, value] of Object.entries(env)) {
redacted[key] = SENSITIVE_ENV_KEY.test(key) ? "***REDACTED***" : value;
}
return redacted;
}
async function runChildProcess( async function runChildProcess(
runId: string, runId: string,
command: string, command: string,
@@ -188,9 +280,10 @@ async function runChildProcess(
}, },
): Promise<RunProcessResult> { ): Promise<RunProcessResult> {
return new Promise<RunProcessResult>((resolve, reject) => { return new Promise<RunProcessResult>((resolve, reject) => {
const mergedEnv = ensurePathInEnv({ ...process.env, ...opts.env });
const child = spawn(command, args, { const child = spawn(command, args, {
cwd: opts.cwd, cwd: opts.cwd,
env: { ...process.env, ...opts.env }, env: mergedEnv,
shell: false, shell: false,
stdio: ["ignore", "pipe", "pipe"], stdio: ["ignore", "pipe", "pipe"],
}); });
@@ -231,7 +324,13 @@ async function runChildProcess(
child.on("error", (err) => { child.on("error", (err) => {
clearTimeout(timeout); clearTimeout(timeout);
runningProcesses.delete(runId); runningProcesses.delete(runId);
reject(err); const errno = (err as NodeJS.ErrnoException).code;
const pathValue = mergedEnv.PATH ?? mergedEnv.Path ?? "";
const msg =
errno === "ENOENT"
? `Failed to start command "${command}" in "${opts.cwd}". Verify adapter command, working directory, and PATH (${pathValue}).`
: `Failed to start command "${command}" in "${opts.cwd}": ${err.message}`;
reject(new Error(msg));
}); });
child.on("close", (code, signal) => { child.on("close", (code, signal) => {
@@ -260,6 +359,70 @@ function buildPaperclipEnv(agent: { id: string; companyId: string }): Record<str
return vars; return vars;
} }
function defaultPathForPlatform() {
if (process.platform === "win32") {
return "C:\\Windows\\System32;C:\\Windows;C:\\Windows\\System32\\Wbem";
}
return "/usr/local/bin:/opt/homebrew/bin:/usr/local/sbin:/usr/bin:/bin:/usr/sbin:/sbin";
}
function ensurePathInEnv(env: NodeJS.ProcessEnv): NodeJS.ProcessEnv {
if (typeof env.PATH === "string" && env.PATH.length > 0) return env;
if (typeof env.Path === "string" && env.Path.length > 0) return env;
return { ...env, PATH: defaultPathForPlatform() };
}
async function ensureAbsoluteDirectory(cwd: string) {
if (!path.isAbsolute(cwd)) {
throw new Error(`Working directory must be an absolute path: "${cwd}"`);
}
let stats;
try {
stats = await fs.stat(cwd);
} catch {
throw new Error(`Working directory does not exist: "${cwd}"`);
}
if (!stats.isDirectory()) {
throw new Error(`Working directory is not a directory: "${cwd}"`);
}
}
async function ensureCommandResolvable(command: string, cwd: string, env: NodeJS.ProcessEnv) {
const hasPathSeparator = command.includes("/") || command.includes("\\");
if (hasPathSeparator) {
const absolute = path.isAbsolute(command) ? command : path.resolve(cwd, command);
try {
await fs.access(absolute, fsConstants.X_OK);
} catch {
throw new Error(`Command is not executable: "${command}" (resolved: "${absolute}")`);
}
return;
}
const pathValue = env.PATH ?? env.Path ?? "";
const delimiter = process.platform === "win32" ? ";" : ":";
const dirs = pathValue.split(delimiter).filter(Boolean);
const windowsExt = process.platform === "win32"
? (env.PATHEXT ?? ".EXE;.CMD;.BAT;.COM").split(";")
: [""];
for (const dir of dirs) {
for (const ext of windowsExt) {
const candidate = path.join(dir, process.platform === "win32" ? `${command}${ext}` : command);
try {
await fs.access(candidate, fsConstants.X_OK);
return;
} catch {
// continue scanning PATH
}
}
}
throw new Error(`Command not found in PATH: "${command}"`);
}
export function heartbeatService(db: Db) { export function heartbeatService(db: Db) {
const runLogStore = getRunLogStore(); const runLogStore = getRunLogStore();
@@ -543,6 +706,7 @@ export function heartbeatService(db: Db) {
agent: typeof agents.$inferSelect, agent: typeof agents.$inferSelect,
config: Record<string, unknown>, config: Record<string, unknown>,
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>, onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>,
onMeta?: (meta: AdapterInvocationMeta) => Promise<void>,
): Promise<AdapterExecutionResult> { ): Promise<AdapterExecutionResult> {
const command = asString(config.command, ""); const command = asString(config.command, "");
if (!command) throw new Error("Process adapter missing command"); if (!command) throw new Error("Process adapter missing command");
@@ -558,6 +722,16 @@ export function heartbeatService(db: Db) {
const timeoutSec = asNumber(config.timeoutSec, 900); const timeoutSec = asNumber(config.timeoutSec, 900);
const graceSec = asNumber(config.graceSec, 15); const graceSec = asNumber(config.graceSec, 15);
if (onMeta) {
await onMeta({
adapterType: "process",
command,
cwd,
commandArgs: args,
env: redactEnvForLogs(env),
});
}
const proc = await runChildProcess(runId, command, args, { const proc = await runChildProcess(runId, command, args, {
cwd, cwd,
env, env,
@@ -606,26 +780,35 @@ export function heartbeatService(db: Db) {
config: Record<string, unknown>, config: Record<string, unknown>,
context: Record<string, unknown>, context: Record<string, unknown>,
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>, onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>,
onMeta?: (meta: AdapterInvocationMeta) => Promise<void>,
): Promise<AdapterExecutionResult> { ): Promise<AdapterExecutionResult> {
const promptTemplate = asString( const promptTemplate = asString(
config.promptTemplate, config.promptTemplate,
"You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.", "You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.",
); );
const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate); const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate);
const command = asString(config.command, "claude");
const model = asString(config.model, ""); const model = asString(config.model, "");
const maxTurns = asNumber(config.maxTurnsPerRun, 0); const maxTurns = asNumber(config.maxTurnsPerRun, 0);
const dangerouslySkipPermissions = asBoolean(config.dangerouslySkipPermissions, false); const dangerouslySkipPermissions = asBoolean(config.dangerouslySkipPermissions, false);
const cwd = asString(config.cwd, process.cwd()); const cwd = asString(config.cwd, process.cwd());
await ensureAbsoluteDirectory(cwd);
const envConfig = parseObject(config.env); const envConfig = parseObject(config.env);
const env: Record<string, string> = { ...buildPaperclipEnv(agent) }; const env: Record<string, string> = { ...buildPaperclipEnv(agent) };
for (const [k, v] of Object.entries(envConfig)) { for (const [k, v] of Object.entries(envConfig)) {
if (typeof v === "string") env[k] = v; if (typeof v === "string") env[k] = v;
} }
const runtimeEnv = ensurePathInEnv({ ...process.env, ...env });
await ensureCommandResolvable(command, cwd, runtimeEnv);
const timeoutSec = asNumber(config.timeoutSec, 1800); const timeoutSec = asNumber(config.timeoutSec, 1800);
const graceSec = asNumber(config.graceSec, 20); const graceSec = asNumber(config.graceSec, 20);
const extraArgs = asStringArray(config.extraArgs); const extraArgs = (() => {
const fromExtraArgs = asStringArray(config.extraArgs);
if (fromExtraArgs.length > 0) return fromExtraArgs;
return asStringArray(config.args);
})();
const sessionId = runtime.sessionId; const sessionId = runtime.sessionId;
const template = sessionId ? promptTemplate : bootstrapTemplate; const template = sessionId ? promptTemplate : bootstrapTemplate;
@@ -636,14 +819,26 @@ export function heartbeatService(db: Db) {
context, context,
}); });
const args = ["--print", prompt, "--output-format", "json"]; const args = ["--print", prompt, "--output-format", "stream-json", "--verbose"];
if (sessionId) args.push("--resume", sessionId); if (sessionId) args.push("--resume", sessionId);
if (dangerouslySkipPermissions) args.push("--dangerously-skip-permissions"); if (dangerouslySkipPermissions) args.push("--dangerously-skip-permissions");
if (model) args.push("--model", model); if (model) args.push("--model", model);
if (maxTurns > 0) args.push("--max-turns", String(maxTurns)); if (maxTurns > 0) args.push("--max-turns", String(maxTurns));
if (extraArgs.length > 0) args.push(...extraArgs); if (extraArgs.length > 0) args.push(...extraArgs);
const proc = await runChildProcess(runId, "claude", args, { if (onMeta) {
await onMeta({
adapterType: "claude_local",
command,
cwd,
commandArgs: args.map((value, idx) => (idx === 1 ? `<prompt ${prompt.length} chars>` : value)),
env: redactEnvForLogs(env),
prompt,
context,
});
}
const proc = await runChildProcess(runId, command, args, {
cwd, cwd,
env, env,
timeoutSec, timeoutSec,
@@ -660,7 +855,8 @@ export function heartbeatService(db: Db) {
}; };
} }
const parsed = parseJson(proc.stdout); const parsedStream = parseClaudeStreamJson(proc.stdout);
const parsed = parsedStream.resultJson ?? parseJson(proc.stdout);
if (!parsed) { if (!parsed) {
return { return {
exitCode: proc.exitCode, exitCode: proc.exitCode,
@@ -677,12 +873,16 @@ export function heartbeatService(db: Db) {
}; };
} }
const usageObj = parseObject(parsed.usage); const usage =
const usage: UsageSummary = { parsedStream.usage ??
inputTokens: asNumber(usageObj.input_tokens, 0), (() => {
cachedInputTokens: asNumber(usageObj.cache_read_input_tokens, 0), const usageObj = parseObject(parsed.usage);
outputTokens: asNumber(usageObj.output_tokens, 0), return {
}; inputTokens: asNumber(usageObj.input_tokens, 0),
cachedInputTokens: asNumber(usageObj.cache_read_input_tokens, 0),
outputTokens: asNumber(usageObj.output_tokens, 0),
};
})();
return { return {
exitCode: proc.exitCode, exitCode: proc.exitCode,
@@ -690,12 +890,14 @@ export function heartbeatService(db: Db) {
timedOut: false, timedOut: false,
errorMessage: (proc.exitCode ?? 0) === 0 ? null : `Claude exited with code ${proc.exitCode ?? -1}`, errorMessage: (proc.exitCode ?? 0) === 0 ? null : `Claude exited with code ${proc.exitCode ?? -1}`,
usage, usage,
sessionId: asString(parsed.session_id, runtime.sessionId ?? "") || runtime.sessionId, sessionId:
parsedStream.sessionId ??
(asString(parsed.session_id, runtime.sessionId ?? "") || runtime.sessionId),
provider: "anthropic", provider: "anthropic",
model: asString(parsed.model, model), model: parsedStream.model || asString(parsed.model, model),
costUsd: asNumber(parsed.total_cost_usd, 0), costUsd: parsedStream.costUsd ?? asNumber(parsed.total_cost_usd, 0),
resultJson: parsed, resultJson: parsed,
summary: asString(parsed.result, ""), summary: parsedStream.summary || asString(parsed.result, ""),
}; };
} }
@@ -706,26 +908,35 @@ export function heartbeatService(db: Db) {
config: Record<string, unknown>, config: Record<string, unknown>,
context: Record<string, unknown>, context: Record<string, unknown>,
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>, onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>,
onMeta?: (meta: AdapterInvocationMeta) => Promise<void>,
): Promise<AdapterExecutionResult> { ): Promise<AdapterExecutionResult> {
const promptTemplate = asString( const promptTemplate = asString(
config.promptTemplate, config.promptTemplate,
"You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.", "You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.",
); );
const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate); const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate);
const command = asString(config.command, "codex");
const model = asString(config.model, ""); const model = asString(config.model, "");
const search = asBoolean(config.search, false); const search = asBoolean(config.search, false);
const bypass = asBoolean(config.dangerouslyBypassApprovalsAndSandbox, false); const bypass = asBoolean(config.dangerouslyBypassApprovalsAndSandbox, false);
const cwd = asString(config.cwd, process.cwd()); const cwd = asString(config.cwd, process.cwd());
await ensureAbsoluteDirectory(cwd);
const envConfig = parseObject(config.env); const envConfig = parseObject(config.env);
const env: Record<string, string> = { ...buildPaperclipEnv(agent) }; const env: Record<string, string> = { ...buildPaperclipEnv(agent) };
for (const [k, v] of Object.entries(envConfig)) { for (const [k, v] of Object.entries(envConfig)) {
if (typeof v === "string") env[k] = v; if (typeof v === "string") env[k] = v;
} }
const runtimeEnv = ensurePathInEnv({ ...process.env, ...env });
await ensureCommandResolvable(command, cwd, runtimeEnv);
const timeoutSec = asNumber(config.timeoutSec, 1800); const timeoutSec = asNumber(config.timeoutSec, 1800);
const graceSec = asNumber(config.graceSec, 20); const graceSec = asNumber(config.graceSec, 20);
const extraArgs = asStringArray(config.extraArgs); const extraArgs = (() => {
const fromExtraArgs = asStringArray(config.extraArgs);
if (fromExtraArgs.length > 0) return fromExtraArgs;
return asStringArray(config.args);
})();
const sessionId = runtime.sessionId; const sessionId = runtime.sessionId;
const template = sessionId ? promptTemplate : bootstrapTemplate; const template = sessionId ? promptTemplate : bootstrapTemplate;
@@ -744,7 +955,23 @@ export function heartbeatService(db: Db) {
if (sessionId) args.push("resume", sessionId, prompt); if (sessionId) args.push("resume", sessionId, prompt);
else args.push(prompt); else args.push(prompt);
const proc = await runChildProcess(runId, "codex", args, { if (onMeta) {
await onMeta({
adapterType: "codex_local",
command,
cwd,
commandArgs: args.map((value, idx) => {
if (!sessionId && idx === args.length - 1) return `<prompt ${prompt.length} chars>`;
if (sessionId && idx === args.length - 1) return `<prompt ${prompt.length} chars>`;
return value;
}),
env: redactEnvForLogs(env),
prompt,
context,
});
}
const proc = await runChildProcess(runId, command, args, {
cwd, cwd,
env, env,
timeoutSec, timeoutSec,
@@ -888,16 +1115,25 @@ export function heartbeatService(db: Db) {
const config = parseObject(agent.adapterConfig); const config = parseObject(agent.adapterConfig);
const context = (run.contextSnapshot ?? {}) as Record<string, unknown>; const context = (run.contextSnapshot ?? {}) as Record<string, unknown>;
const onAdapterMeta = async (meta: AdapterInvocationMeta) => {
await appendRunEvent(currentRun, seq++, {
eventType: "adapter.invoke",
stream: "system",
level: "info",
message: "adapter invocation",
payload: meta as Record<string, unknown>,
});
};
let adapterResult: AdapterExecutionResult; let adapterResult: AdapterExecutionResult;
if (agent.adapterType === "http") { if (agent.adapterType === "http") {
adapterResult = await executeHttpRun(run.id, agent.id, config, context); adapterResult = await executeHttpRun(run.id, agent.id, config, context);
} else if (agent.adapterType === "claude_local") { } else if (agent.adapterType === "claude_local") {
adapterResult = await executeClaudeLocalRun(run.id, agent, runtime, config, context, onLog); adapterResult = await executeClaudeLocalRun(run.id, agent, runtime, config, context, onLog, onAdapterMeta);
} else if (agent.adapterType === "codex_local") { } else if (agent.adapterType === "codex_local") {
adapterResult = await executeCodexLocalRun(run.id, agent, runtime, config, context, onLog); adapterResult = await executeCodexLocalRun(run.id, agent, runtime, config, context, onLog, onAdapterMeta);
} else { } else {
adapterResult = await executeProcessRun(run.id, agent, config, onLog); adapterResult = await executeProcessRun(run.id, agent, config, onLog, onAdapterMeta);
} }
let outcome: "succeeded" | "failed" | "cancelled" | "timed_out"; let outcome: "succeeded" | "failed" | "cancelled" | "timed_out";
@@ -926,6 +1162,14 @@ export function heartbeatService(db: Db) {
? "timed_out" ? "timed_out"
: "failed"; : "failed";
const usageJson =
adapterResult.usage || adapterResult.costUsd != null
? ({
...(adapterResult.usage ?? {}),
...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}),
} as Record<string, unknown>)
: null;
await setRunStatus(run.id, status, { await setRunStatus(run.id, status, {
finishedAt: new Date(), finishedAt: new Date(),
error: error:
@@ -942,7 +1186,7 @@ export function heartbeatService(db: Db) {
: null, : null,
exitCode: adapterResult.exitCode, exitCode: adapterResult.exitCode,
signal: adapterResult.signal, signal: adapterResult.signal,
usageJson: (adapterResult.usage ?? null) as Record<string, unknown> | null, usageJson,
resultJson: adapterResult.resultJson ?? null, resultJson: adapterResult.resultJson ?? null,
sessionIdAfter: adapterResult.sessionId ?? runtime.sessionId, sessionIdAfter: adapterResult.sessionId ?? runtime.sessionId,
stdoutExcerpt, stdoutExcerpt,