Files
paperclip/server/src/services/heartbeat.ts
Forgotten c9c75bbc0a Implement agent runtime services and WebSocket realtime
Expand heartbeat service with full run executor, wakeup coordinator,
and adapter lifecycle. Add run-log-store for pluggable log persistence.
Add live-events service and WebSocket handler for realtime updates.
Expand agent and issue routes with runtime operations. Add ws dependency.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 12:24:43 -06:00

1345 lines
40 KiB
TypeScript

import { spawn, type ChildProcess } from "node:child_process";
import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
import type { Db } from "@paperclip/db";
import {
agents,
agentRuntimeState,
agentWakeupRequests,
heartbeatRunEvents,
heartbeatRuns,
costEvents,
} from "@paperclip/db";
import { conflict, notFound } from "../errors.js";
import { logger } from "../middleware/logger.js";
import { publishLiveEvent } from "./live-events.js";
import { getRunLogStore, type RunLogHandle } from "./run-log-store.js";
interface RunningProcess {
child: ChildProcess;
graceSec: number;
}
interface RunProcessResult {
exitCode: number | null;
signal: string | null;
timedOut: boolean;
stdout: string;
stderr: string;
}
interface UsageSummary {
inputTokens: number;
outputTokens: number;
cachedInputTokens?: number;
}
interface AdapterExecutionResult {
exitCode: number | null;
signal: string | null;
timedOut: boolean;
errorMessage?: string | null;
usage?: UsageSummary;
sessionId?: string | null;
provider?: string | null;
model?: string | null;
costUsd?: number | null;
resultJson?: Record<string, unknown> | null;
summary?: string | null;
}
interface WakeupOptions {
source?: "timer" | "assignment" | "on_demand" | "automation";
triggerDetail?: "manual" | "ping" | "callback" | "system";
reason?: string | null;
payload?: Record<string, unknown> | null;
idempotencyKey?: string | null;
requestedByActorType?: "user" | "agent" | "system";
requestedByActorId?: string | null;
contextSnapshot?: Record<string, unknown>;
}
const runningProcesses = new Map<string, RunningProcess>();
const MAX_CAPTURE_BYTES = 4 * 1024 * 1024;
const MAX_EXCERPT_BYTES = 32 * 1024;
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;
function parseObject(value: unknown): Record<string, unknown> {
if (typeof value !== "object" || value === null || Array.isArray(value)) {
return {};
}
return value as Record<string, unknown>;
}
function asString(value: unknown, fallback: string): string {
return typeof value === "string" && value.length > 0 ? value : fallback;
}
function asNumber(value: unknown, fallback: number): number {
return typeof value === "number" && Number.isFinite(value) ? value : fallback;
}
function asBoolean(value: unknown, fallback: boolean): boolean {
return typeof value === "boolean" ? value : fallback;
}
function asStringArray(value: unknown): string[] {
return Array.isArray(value) ? value.filter((item): item is string => typeof item === "string") : [];
}
function parseJson(value: string): Record<string, unknown> | null {
try {
return JSON.parse(value) as Record<string, unknown>;
} catch {
return null;
}
}
function appendWithCap(prev: string, chunk: string, cap = MAX_CAPTURE_BYTES) {
const combined = prev + chunk;
return combined.length > cap ? combined.slice(combined.length - cap) : combined;
}
function appendExcerpt(prev: string, chunk: string) {
return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES);
}
function resolvePathValue(obj: Record<string, unknown>, dottedPath: string) {
const parts = dottedPath.split(".");
let cursor: unknown = obj;
for (const part of parts) {
if (typeof cursor !== "object" || cursor === null || Array.isArray(cursor)) {
return "";
}
cursor = (cursor as Record<string, unknown>)[part];
}
if (cursor === null || cursor === undefined) return "";
if (typeof cursor === "string") return cursor;
if (typeof cursor === "number" || typeof cursor === "boolean") return String(cursor);
try {
return JSON.stringify(cursor);
} catch {
return "";
}
}
function renderTemplate(template: string, data: Record<string, unknown>) {
return template.replace(/{{\s*([a-zA-Z0-9_.-]+)\s*}}/g, (_, path) => resolvePathValue(data, path));
}
function parseCodexJsonl(stdout: string) {
let sessionId: string | null = null;
const messages: string[] = [];
const usage = {
inputTokens: 0,
cachedInputTokens: 0,
outputTokens: 0,
};
for (const rawLine of stdout.split(/\r?\n/)) {
const line = rawLine.trim();
if (!line) continue;
const event = parseJson(line);
if (!event) continue;
const type = asString(event.type, "");
if (type === "thread.started") {
sessionId = asString(event.thread_id, sessionId ?? "") || sessionId;
continue;
}
if (type === "item.completed") {
const item = parseObject(event.item);
if (asString(item.type, "") === "agent_message") {
const text = asString(item.text, "");
if (text) messages.push(text);
}
continue;
}
if (type === "turn.completed") {
const usageObj = parseObject(event.usage);
usage.inputTokens = asNumber(usageObj.input_tokens, usage.inputTokens);
usage.cachedInputTokens = asNumber(usageObj.cached_input_tokens, usage.cachedInputTokens);
usage.outputTokens = asNumber(usageObj.output_tokens, usage.outputTokens);
}
}
return {
sessionId,
summary: messages.join("\n\n").trim(),
usage,
};
}
async function runChildProcess(
runId: string,
command: string,
args: string[],
opts: {
cwd: string;
env: Record<string, string>;
timeoutSec: number;
graceSec: number;
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>;
},
): Promise<RunProcessResult> {
return new Promise<RunProcessResult>((resolve, reject) => {
const child = spawn(command, args, {
cwd: opts.cwd,
env: { ...process.env, ...opts.env },
shell: false,
stdio: ["ignore", "pipe", "pipe"],
});
runningProcesses.set(runId, { child, graceSec: opts.graceSec });
let timedOut = false;
let stdout = "";
let stderr = "";
let logChain: Promise<void> = Promise.resolve();
const timeout = setTimeout(() => {
timedOut = true;
child.kill("SIGTERM");
setTimeout(() => {
if (!child.killed) {
child.kill("SIGKILL");
}
}, Math.max(1, opts.graceSec) * 1000);
}, Math.max(1, opts.timeoutSec) * 1000);
child.stdout?.on("data", (chunk) => {
const text = String(chunk);
stdout = appendWithCap(stdout, text);
logChain = logChain
.then(() => opts.onLog("stdout", text))
.catch((err) => logger.warn({ err, runId }, "failed to append stdout log chunk"));
});
child.stderr?.on("data", (chunk) => {
const text = String(chunk);
stderr = appendWithCap(stderr, text);
logChain = logChain
.then(() => opts.onLog("stderr", text))
.catch((err) => logger.warn({ err, runId }, "failed to append stderr log chunk"));
});
child.on("error", (err) => {
clearTimeout(timeout);
runningProcesses.delete(runId);
reject(err);
});
child.on("close", (code, signal) => {
clearTimeout(timeout);
runningProcesses.delete(runId);
void logChain.finally(() => {
resolve({
exitCode: code,
signal,
timedOut,
stdout,
stderr,
});
});
});
});
}
export function heartbeatService(db: Db) {
const runLogStore = getRunLogStore();
async function getAgent(agentId: string) {
return db
.select()
.from(agents)
.where(eq(agents.id, agentId))
.then((rows) => rows[0] ?? null);
}
async function getRun(runId: string) {
return db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.id, runId))
.then((rows) => rows[0] ?? null);
}
async function getRuntimeState(agentId: string) {
return db
.select()
.from(agentRuntimeState)
.where(eq(agentRuntimeState.agentId, agentId))
.then((rows) => rows[0] ?? null);
}
async function ensureRuntimeState(agent: typeof agents.$inferSelect) {
const existing = await getRuntimeState(agent.id);
if (existing) return existing;
return db
.insert(agentRuntimeState)
.values({
agentId: agent.id,
companyId: agent.companyId,
adapterType: agent.adapterType,
stateJson: {},
})
.returning()
.then((rows) => rows[0]);
}
async function setRunStatus(
runId: string,
status: string,
patch?: Partial<typeof heartbeatRuns.$inferInsert>,
) {
const updated = await db
.update(heartbeatRuns)
.set({ status, ...patch, updatedAt: new Date() })
.where(eq(heartbeatRuns.id, runId))
.returning()
.then((rows) => rows[0] ?? null);
if (updated) {
publishLiveEvent({
companyId: updated.companyId,
type: "heartbeat.run.status",
payload: {
runId: updated.id,
agentId: updated.agentId,
status: updated.status,
invocationSource: updated.invocationSource,
triggerDetail: updated.triggerDetail,
error: updated.error ?? null,
errorCode: updated.errorCode ?? null,
startedAt: updated.startedAt ? new Date(updated.startedAt).toISOString() : null,
finishedAt: updated.finishedAt ? new Date(updated.finishedAt).toISOString() : null,
},
});
}
return updated;
}
async function setWakeupStatus(
wakeupRequestId: string | null | undefined,
status: string,
patch?: Partial<typeof agentWakeupRequests.$inferInsert>,
) {
if (!wakeupRequestId) return;
await db
.update(agentWakeupRequests)
.set({ status, ...patch, updatedAt: new Date() })
.where(eq(agentWakeupRequests.id, wakeupRequestId));
}
async function appendRunEvent(
run: typeof heartbeatRuns.$inferSelect,
seq: number,
event: {
eventType: string;
stream?: "system" | "stdout" | "stderr";
level?: "info" | "warn" | "error";
color?: string;
message?: string;
payload?: Record<string, unknown>;
},
) {
await db.insert(heartbeatRunEvents).values({
companyId: run.companyId,
runId: run.id,
agentId: run.agentId,
seq,
eventType: event.eventType,
stream: event.stream,
level: event.level,
color: event.color,
message: event.message,
payload: event.payload,
});
publishLiveEvent({
companyId: run.companyId,
type: "heartbeat.run.event",
payload: {
runId: run.id,
agentId: run.agentId,
seq,
eventType: event.eventType,
stream: event.stream ?? null,
level: event.level ?? null,
color: event.color ?? null,
message: event.message ?? null,
payload: event.payload ?? null,
},
});
}
function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) {
const runtimeConfig = parseObject(agent.runtimeConfig);
const heartbeat = parseObject(runtimeConfig.heartbeat);
return {
enabled: asBoolean(heartbeat.enabled, true),
intervalSec: Math.max(0, asNumber(heartbeat.intervalSec, 0)),
wakeOnAssignment: asBoolean(heartbeat.wakeOnAssignment, true),
wakeOnOnDemand: asBoolean(heartbeat.wakeOnOnDemand, true),
wakeOnAutomation: asBoolean(heartbeat.wakeOnAutomation, true),
};
}
async function finalizeAgentStatus(
agentId: string,
outcome: "succeeded" | "failed" | "cancelled" | "timed_out",
) {
const existing = await getAgent(agentId);
if (!existing) return;
if (existing.status === "paused" || existing.status === "terminated") {
return;
}
const nextStatus =
outcome === "succeeded" ? "idle" : outcome === "cancelled" ? "idle" : "error";
const updated = await db
.update(agents)
.set({
status: nextStatus,
lastHeartbeatAt: new Date(),
updatedAt: new Date(),
})
.where(eq(agents.id, agentId))
.returning()
.then((rows) => rows[0] ?? null);
if (updated) {
publishLiveEvent({
companyId: updated.companyId,
type: "agent.status",
payload: {
agentId: updated.id,
status: updated.status,
lastHeartbeatAt: updated.lastHeartbeatAt
? new Date(updated.lastHeartbeatAt).toISOString()
: null,
outcome,
},
});
}
}
async function updateRuntimeState(
agent: typeof agents.$inferSelect,
run: typeof heartbeatRuns.$inferSelect,
result: AdapterExecutionResult,
) {
const existing = await ensureRuntimeState(agent);
const usage = result.usage;
const inputTokens = usage?.inputTokens ?? 0;
const outputTokens = usage?.outputTokens ?? 0;
const cachedInputTokens = usage?.cachedInputTokens ?? 0;
const additionalCostCents = Math.max(0, Math.round((result.costUsd ?? 0) * 100));
await db
.update(agentRuntimeState)
.set({
adapterType: agent.adapterType,
sessionId: result.sessionId ?? existing.sessionId,
lastRunId: run.id,
lastRunStatus: run.status,
lastError: result.errorMessage ?? null,
totalInputTokens: existing.totalInputTokens + inputTokens,
totalOutputTokens: existing.totalOutputTokens + outputTokens,
totalCachedInputTokens: existing.totalCachedInputTokens + cachedInputTokens,
totalCostCents: existing.totalCostCents + additionalCostCents,
updatedAt: new Date(),
})
.where(eq(agentRuntimeState.agentId, agent.id));
if (additionalCostCents > 0) {
await db.insert(costEvents).values({
companyId: agent.companyId,
agentId: agent.id,
provider: result.provider ?? "unknown",
model: result.model ?? "unknown",
inputTokens,
outputTokens,
costCents: additionalCostCents,
occurredAt: new Date(),
});
await db
.update(agents)
.set({
spentMonthlyCents: sql`${agents.spentMonthlyCents} + ${additionalCostCents}`,
updatedAt: new Date(),
})
.where(eq(agents.id, agent.id));
}
}
async function executeHttpRun(
runId: string,
agentId: string,
config: Record<string, unknown>,
context: Record<string, unknown>,
): Promise<AdapterExecutionResult> {
const url = asString(config.url, "");
if (!url) throw new Error("HTTP adapter missing url");
const method = asString(config.method, "POST");
const timeoutMs = asNumber(config.timeoutMs, 15000);
const headers = parseObject(config.headers) as Record<string, string>;
const payloadTemplate = parseObject(config.payloadTemplate);
const body = { ...payloadTemplate, agentId, runId, context };
const controller = new AbortController();
const timer = setTimeout(() => controller.abort(), timeoutMs);
try {
const res = await fetch(url, {
method,
headers: {
"content-type": "application/json",
...headers,
},
body: JSON.stringify(body),
signal: controller.signal,
});
if (!res.ok) {
throw new Error(`HTTP invoke failed with status ${res.status}`);
}
return {
exitCode: 0,
signal: null,
timedOut: false,
summary: `HTTP ${method} ${url}`,
};
} finally {
clearTimeout(timer);
}
}
async function executeProcessRun(
runId: string,
config: Record<string, unknown>,
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>,
): Promise<AdapterExecutionResult> {
const command = asString(config.command, "");
if (!command) throw new Error("Process adapter missing command");
const args = asStringArray(config.args);
const cwd = asString(config.cwd, process.cwd());
const envConfig = parseObject(config.env);
const env: Record<string, string> = {};
for (const [k, v] of Object.entries(envConfig)) {
if (typeof v === "string") env[k] = v;
}
const timeoutSec = asNumber(config.timeoutSec, 900);
const graceSec = asNumber(config.graceSec, 15);
const proc = await runChildProcess(runId, command, args, {
cwd,
env,
timeoutSec,
graceSec,
onLog,
});
if (proc.timedOut) {
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: true,
errorMessage: `Timed out after ${timeoutSec}s`,
};
}
if ((proc.exitCode ?? 0) !== 0) {
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: false,
errorMessage: `Process exited with code ${proc.exitCode ?? -1}`,
resultJson: {
stdout: proc.stdout,
stderr: proc.stderr,
},
};
}
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: false,
resultJson: {
stdout: proc.stdout,
stderr: proc.stderr,
},
};
}
async function executeClaudeLocalRun(
runId: string,
agent: typeof agents.$inferSelect,
runtime: typeof agentRuntimeState.$inferSelect,
config: Record<string, unknown>,
context: Record<string, unknown>,
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>,
): Promise<AdapterExecutionResult> {
const promptTemplate = asString(
config.promptTemplate,
"You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.",
);
const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate);
const model = asString(config.model, "");
const maxTurns = asNumber(config.maxTurnsPerRun, 0);
const dangerouslySkipPermissions = asBoolean(config.dangerouslySkipPermissions, false);
const cwd = asString(config.cwd, process.cwd());
const envConfig = parseObject(config.env);
const env: Record<string, string> = {};
for (const [k, v] of Object.entries(envConfig)) {
if (typeof v === "string") env[k] = v;
}
const timeoutSec = asNumber(config.timeoutSec, 1800);
const graceSec = asNumber(config.graceSec, 20);
const extraArgs = asStringArray(config.extraArgs);
const sessionId = runtime.sessionId;
const template = sessionId ? promptTemplate : bootstrapTemplate;
const prompt = renderTemplate(template, {
company: { id: agent.companyId },
agent,
run: { id: runId, source: "on_demand" },
context,
});
const args = ["--print", prompt, "--output-format", "json"];
if (sessionId) args.push("--resume", sessionId);
if (dangerouslySkipPermissions) args.push("--dangerously-skip-permissions");
if (model) args.push("--model", model);
if (maxTurns > 0) args.push("--max-turns", String(maxTurns));
if (extraArgs.length > 0) args.push(...extraArgs);
const proc = await runChildProcess(runId, "claude", args, {
cwd,
env,
timeoutSec,
graceSec,
onLog,
});
if (proc.timedOut) {
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: true,
errorMessage: `Timed out after ${timeoutSec}s`,
};
}
const parsed = parseJson(proc.stdout);
if (!parsed) {
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: false,
errorMessage:
(proc.exitCode ?? 0) === 0
? "Failed to parse claude JSON output"
: `Claude exited with code ${proc.exitCode ?? -1}`,
resultJson: {
stdout: proc.stdout,
stderr: proc.stderr,
},
};
}
const usageObj = parseObject(parsed.usage);
const usage: UsageSummary = {
inputTokens: asNumber(usageObj.input_tokens, 0),
cachedInputTokens: asNumber(usageObj.cache_read_input_tokens, 0),
outputTokens: asNumber(usageObj.output_tokens, 0),
};
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: false,
errorMessage: (proc.exitCode ?? 0) === 0 ? null : `Claude exited with code ${proc.exitCode ?? -1}`,
usage,
sessionId: asString(parsed.session_id, runtime.sessionId ?? "") || runtime.sessionId,
provider: "anthropic",
model: asString(parsed.model, model),
costUsd: asNumber(parsed.total_cost_usd, 0),
resultJson: parsed,
summary: asString(parsed.result, ""),
};
}
async function executeCodexLocalRun(
runId: string,
agent: typeof agents.$inferSelect,
runtime: typeof agentRuntimeState.$inferSelect,
config: Record<string, unknown>,
context: Record<string, unknown>,
onLog: (stream: "stdout" | "stderr", chunk: string) => Promise<void>,
): Promise<AdapterExecutionResult> {
const promptTemplate = asString(
config.promptTemplate,
"You are agent {{agent.id}} ({{agent.name}}). Continue your Paperclip work.",
);
const bootstrapTemplate = asString(config.bootstrapPromptTemplate, promptTemplate);
const model = asString(config.model, "");
const search = asBoolean(config.search, false);
const bypass = asBoolean(config.dangerouslyBypassApprovalsAndSandbox, false);
const cwd = asString(config.cwd, process.cwd());
const envConfig = parseObject(config.env);
const env: Record<string, string> = {};
for (const [k, v] of Object.entries(envConfig)) {
if (typeof v === "string") env[k] = v;
}
const timeoutSec = asNumber(config.timeoutSec, 1800);
const graceSec = asNumber(config.graceSec, 20);
const extraArgs = asStringArray(config.extraArgs);
const sessionId = runtime.sessionId;
const template = sessionId ? promptTemplate : bootstrapTemplate;
const prompt = renderTemplate(template, {
company: { id: agent.companyId },
agent,
run: { id: runId, source: "on_demand" },
context,
});
const args = ["exec", "--json"];
if (search) args.unshift("--search");
if (bypass) args.push("--dangerously-bypass-approvals-and-sandbox");
if (model) args.push("--model", model);
if (extraArgs.length > 0) args.push(...extraArgs);
if (sessionId) args.push("resume", sessionId, prompt);
else args.push(prompt);
const proc = await runChildProcess(runId, "codex", args, {
cwd,
env,
timeoutSec,
graceSec,
onLog,
});
if (proc.timedOut) {
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: true,
errorMessage: `Timed out after ${timeoutSec}s`,
};
}
const parsed = parseCodexJsonl(proc.stdout);
return {
exitCode: proc.exitCode,
signal: proc.signal,
timedOut: false,
errorMessage: (proc.exitCode ?? 0) === 0 ? null : `Codex exited with code ${proc.exitCode ?? -1}`,
usage: parsed.usage,
sessionId: parsed.sessionId ?? runtime.sessionId,
provider: "openai",
model,
costUsd: null,
resultJson: {
stdout: proc.stdout,
stderr: proc.stderr,
},
summary: parsed.summary,
};
}
async function executeRun(runId: string) {
const run = await getRun(runId);
if (!run) return;
if (run.status !== "queued" && run.status !== "running") return;
const agent = await getAgent(run.agentId);
if (!agent) {
await setRunStatus(runId, "failed", {
error: "Agent not found",
errorCode: "agent_not_found",
finishedAt: new Date(),
});
await setWakeupStatus(run.wakeupRequestId, "failed", {
finishedAt: new Date(),
error: "Agent not found",
});
return;
}
const runtime = await ensureRuntimeState(agent);
let seq = 1;
let handle: RunLogHandle | null = null;
let stdoutExcerpt = "";
let stderrExcerpt = "";
try {
await setRunStatus(runId, "running", {
startedAt: new Date(),
sessionIdBefore: runtime.sessionId,
});
await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() });
const runningAgent = await db
.update(agents)
.set({ status: "running", updatedAt: new Date() })
.where(eq(agents.id, agent.id))
.returning()
.then((rows) => rows[0] ?? null);
if (runningAgent) {
publishLiveEvent({
companyId: runningAgent.companyId,
type: "agent.status",
payload: {
agentId: runningAgent.id,
status: runningAgent.status,
outcome: "running",
},
});
}
const currentRun = (await getRun(runId)) ?? run;
await appendRunEvent(currentRun, seq++, {
eventType: "lifecycle",
stream: "system",
level: "info",
message: "run started",
});
handle = await runLogStore.begin({
companyId: run.companyId,
agentId: run.agentId,
runId,
});
await db
.update(heartbeatRuns)
.set({
logStore: handle.store,
logRef: handle.logRef,
updatedAt: new Date(),
})
.where(eq(heartbeatRuns.id, runId));
const onLog = async (stream: "stdout" | "stderr", chunk: string) => {
if (stream === "stdout") stdoutExcerpt = appendExcerpt(stdoutExcerpt, chunk);
if (stream === "stderr") stderrExcerpt = appendExcerpt(stderrExcerpt, chunk);
if (handle) {
await runLogStore.append(handle, {
stream,
chunk,
ts: new Date().toISOString(),
});
}
const payloadChunk =
chunk.length > MAX_LIVE_LOG_CHUNK_BYTES
? chunk.slice(chunk.length - MAX_LIVE_LOG_CHUNK_BYTES)
: chunk;
publishLiveEvent({
companyId: run.companyId,
type: "heartbeat.run.log",
payload: {
runId: run.id,
agentId: run.agentId,
stream,
chunk: payloadChunk,
truncated: payloadChunk.length !== chunk.length,
},
});
};
const config = parseObject(agent.adapterConfig);
const context = (run.contextSnapshot ?? {}) as Record<string, unknown>;
let adapterResult: AdapterExecutionResult;
if (agent.adapterType === "http") {
adapterResult = await executeHttpRun(run.id, agent.id, config, context);
} else if (agent.adapterType === "claude_local") {
adapterResult = await executeClaudeLocalRun(run.id, agent, runtime, config, context, onLog);
} else if (agent.adapterType === "codex_local") {
adapterResult = await executeCodexLocalRun(run.id, agent, runtime, config, context, onLog);
} else {
adapterResult = await executeProcessRun(run.id, config, onLog);
}
let outcome: "succeeded" | "failed" | "cancelled" | "timed_out";
const latestRun = await getRun(run.id);
if (latestRun?.status === "cancelled") {
outcome = "cancelled";
} else if (adapterResult.timedOut) {
outcome = "timed_out";
} else if ((adapterResult.exitCode ?? 0) === 0 && !adapterResult.errorMessage) {
outcome = "succeeded";
} else {
outcome = "failed";
}
let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;
if (handle) {
logSummary = await runLogStore.finalize(handle);
}
const status =
outcome === "succeeded"
? "succeeded"
: outcome === "cancelled"
? "cancelled"
: outcome === "timed_out"
? "timed_out"
: "failed";
await setRunStatus(run.id, status, {
finishedAt: new Date(),
error:
outcome === "succeeded"
? null
: adapterResult.errorMessage ?? (outcome === "timed_out" ? "Timed out" : "Adapter failed"),
errorCode:
outcome === "timed_out"
? "timeout"
: outcome === "cancelled"
? "cancelled"
: outcome === "failed"
? "adapter_failed"
: null,
exitCode: adapterResult.exitCode,
signal: adapterResult.signal,
usageJson: (adapterResult.usage ?? null) as Record<string, unknown> | null,
resultJson: adapterResult.resultJson ?? null,
sessionIdAfter: adapterResult.sessionId ?? runtime.sessionId,
stdoutExcerpt,
stderrExcerpt,
logBytes: logSummary?.bytes,
logSha256: logSummary?.sha256,
logCompressed: logSummary?.compressed ?? false,
});
await setWakeupStatus(run.wakeupRequestId, outcome === "succeeded" ? "completed" : status, {
finishedAt: new Date(),
error: adapterResult.errorMessage ?? null,
});
const finalizedRun = await getRun(run.id);
if (finalizedRun) {
await appendRunEvent(finalizedRun, seq++, {
eventType: "lifecycle",
stream: "system",
level: outcome === "succeeded" ? "info" : "error",
message: `run ${outcome}`,
payload: {
status,
exitCode: adapterResult.exitCode,
},
});
}
if (finalizedRun) {
await updateRuntimeState(agent, finalizedRun, adapterResult);
}
await finalizeAgentStatus(agent.id, outcome);
} catch (err) {
const message = err instanceof Error ? err.message : "Unknown adapter failure";
logger.error({ err, runId }, "heartbeat execution failed");
let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;
if (handle) {
try {
logSummary = await runLogStore.finalize(handle);
} catch (finalizeErr) {
logger.warn({ err: finalizeErr, runId }, "failed to finalize run log after error");
}
}
const failedRun = await setRunStatus(run.id, "failed", {
error: message,
errorCode: "adapter_failed",
finishedAt: new Date(),
stdoutExcerpt,
stderrExcerpt,
logBytes: logSummary?.bytes,
logSha256: logSummary?.sha256,
logCompressed: logSummary?.compressed ?? false,
});
await setWakeupStatus(run.wakeupRequestId, "failed", {
finishedAt: new Date(),
error: message,
});
if (failedRun) {
await appendRunEvent(failedRun, seq++, {
eventType: "error",
stream: "system",
level: "error",
message,
});
await updateRuntimeState(agent, failedRun, {
exitCode: null,
signal: null,
timedOut: false,
errorMessage: message,
});
}
await finalizeAgentStatus(agent.id, "failed");
}
}
async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) {
const source = opts.source ?? "on_demand";
const triggerDetail = opts.triggerDetail ?? null;
const contextSnapshot = opts.contextSnapshot ?? {};
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
if (agent.status === "paused" || agent.status === "terminated") {
throw conflict("Agent is not invokable in its current state", { status: agent.status });
}
const policy = parseHeartbeatPolicy(agent);
const writeSkippedRequest = async (reason: string) => {
await db.insert(agentWakeupRequests).values({
companyId: agent.companyId,
agentId,
source,
triggerDetail,
reason,
payload: opts.payload ?? null,
status: "skipped",
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
idempotencyKey: opts.idempotencyKey ?? null,
finishedAt: new Date(),
});
};
if (source === "timer" && !policy.enabled) {
await writeSkippedRequest("heartbeat.disabled");
return null;
}
if (source === "assignment" && !policy.wakeOnAssignment) {
await writeSkippedRequest("heartbeat.wakeOnAssignment.disabled");
return null;
}
if (source === "automation" && !policy.wakeOnAutomation) {
await writeSkippedRequest("heartbeat.wakeOnAutomation.disabled");
return null;
}
if (source === "on_demand" && triggerDetail === "ping" && !policy.wakeOnOnDemand) {
await writeSkippedRequest("heartbeat.wakeOnOnDemand.disabled");
return null;
}
const activeRun = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])))
.orderBy(desc(heartbeatRuns.createdAt))
.then((rows) => rows[0] ?? null);
if (activeRun) {
await db.insert(agentWakeupRequests).values({
companyId: agent.companyId,
agentId,
source,
triggerDetail,
reason: opts.reason ?? null,
payload: opts.payload ?? null,
status: "coalesced",
coalescedCount: 1,
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
idempotencyKey: opts.idempotencyKey ?? null,
runId: activeRun.id,
finishedAt: new Date(),
});
return activeRun;
}
const wakeupRequest = await db
.insert(agentWakeupRequests)
.values({
companyId: agent.companyId,
agentId,
source,
triggerDetail,
reason: opts.reason ?? null,
payload: opts.payload ?? null,
status: "queued",
requestedByActorType: opts.requestedByActorType ?? null,
requestedByActorId: opts.requestedByActorId ?? null,
idempotencyKey: opts.idempotencyKey ?? null,
})
.returning()
.then((rows) => rows[0]);
const runtime = await getRuntimeState(agent.id);
const run = await db
.insert(heartbeatRuns)
.values({
companyId: agent.companyId,
agentId,
invocationSource: source,
triggerDetail,
status: "queued",
wakeupRequestId: wakeupRequest.id,
contextSnapshot,
sessionIdBefore: runtime?.sessionId ?? null,
})
.returning()
.then((rows) => rows[0]);
await db
.update(agentWakeupRequests)
.set({
runId: run.id,
updatedAt: new Date(),
})
.where(eq(agentWakeupRequests.id, wakeupRequest.id));
publishLiveEvent({
companyId: run.companyId,
type: "heartbeat.run.queued",
payload: {
runId: run.id,
agentId: run.agentId,
invocationSource: run.invocationSource,
triggerDetail: run.triggerDetail,
wakeupRequestId: run.wakeupRequestId,
},
});
void executeRun(run.id).catch((err) => {
logger.error({ err, runId: run.id }, "heartbeat execution failed");
});
return run;
}
return {
list: (companyId: string, agentId?: string) => {
if (!agentId) {
return db
.select()
.from(heartbeatRuns)
.where(eq(heartbeatRuns.companyId, companyId))
.orderBy(desc(heartbeatRuns.createdAt));
}
return db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId)))
.orderBy(desc(heartbeatRuns.createdAt));
},
getRun,
getRuntimeState: async (agentId: string) => {
const state = await getRuntimeState(agentId);
if (state) return state;
const agent = await getAgent(agentId);
if (!agent) return null;
return ensureRuntimeState(agent);
},
resetRuntimeSession: async (agentId: string) => {
const agent = await getAgent(agentId);
if (!agent) throw notFound("Agent not found");
await ensureRuntimeState(agent);
return db
.update(agentRuntimeState)
.set({
sessionId: null,
stateJson: {},
lastError: null,
updatedAt: new Date(),
})
.where(eq(agentRuntimeState.agentId, agentId))
.returning()
.then((rows) => rows[0] ?? null);
},
listEvents: (runId: string, afterSeq = 0, limit = 200) =>
db
.select()
.from(heartbeatRunEvents)
.where(and(eq(heartbeatRunEvents.runId, runId), gt(heartbeatRunEvents.seq, afterSeq)))
.orderBy(asc(heartbeatRunEvents.seq))
.limit(Math.max(1, Math.min(limit, 1000))),
readLog: async (runId: string, opts?: { offset?: number; limitBytes?: number }) => {
const run = await getRun(runId);
if (!run) throw notFound("Heartbeat run not found");
if (!run.logStore || !run.logRef) throw notFound("Run log not found");
const result = await runLogStore.read(
{
store: run.logStore as "local_file",
logRef: run.logRef,
},
opts,
);
return {
runId,
store: run.logStore,
logRef: run.logRef,
...result,
};
},
invoke: async (
agentId: string,
source: "timer" | "assignment" | "on_demand" | "automation" = "on_demand",
contextSnapshot: Record<string, unknown> = {},
triggerDetail: "manual" | "ping" | "callback" | "system" = "manual",
actor?: { actorType?: "user" | "agent" | "system"; actorId?: string | null },
) =>
enqueueWakeup(agentId, {
source,
triggerDetail,
contextSnapshot,
requestedByActorType: actor?.actorType,
requestedByActorId: actor?.actorId ?? null,
}),
wakeup: enqueueWakeup,
tickTimers: async (now = new Date()) => {
const allAgents = await db.select().from(agents);
let checked = 0;
let enqueued = 0;
let skipped = 0;
for (const agent of allAgents) {
if (agent.status === "paused" || agent.status === "terminated") continue;
const policy = parseHeartbeatPolicy(agent);
if (!policy.enabled || policy.intervalSec <= 0) continue;
checked += 1;
const last = agent.lastHeartbeatAt ? new Date(agent.lastHeartbeatAt).getTime() : 0;
const elapsedMs = now.getTime() - last;
if (last && elapsedMs < policy.intervalSec * 1000) continue;
const run = await enqueueWakeup(agent.id, {
source: "timer",
triggerDetail: "system",
reason: "heartbeat_timer",
requestedByActorType: "system",
requestedByActorId: "heartbeat_scheduler",
contextSnapshot: {
source: "scheduler",
reason: "interval_elapsed",
now: now.toISOString(),
},
});
if (run) enqueued += 1;
else skipped += 1;
}
return { checked, enqueued, skipped };
},
cancelRun: async (runId: string) => {
const run = await getRun(runId);
if (!run) throw notFound("Heartbeat run not found");
if (run.status !== "running" && run.status !== "queued") return run;
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
const graceMs = Math.max(1, running.graceSec) * 1000;
setTimeout(() => {
if (!running.child.killed) {
running.child.kill("SIGKILL");
}
}, graceMs);
}
const cancelled = await setRunStatus(run.id, "cancelled", {
finishedAt: new Date(),
error: "Cancelled by control plane",
errorCode: "cancelled",
});
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
finishedAt: new Date(),
error: "Cancelled by control plane",
});
if (cancelled) {
await appendRunEvent(cancelled, 1, {
eventType: "lifecycle",
stream: "system",
level: "warn",
message: "run cancelled",
});
}
runningProcesses.delete(run.id);
await finalizeAgentStatus(run.agentId, "cancelled");
return cancelled;
},
cancelActiveForAgent: async (agentId: string) => {
const runs = await db
.select()
.from(heartbeatRuns)
.where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));
for (const run of runs) {
await setRunStatus(run.id, "cancelled", {
finishedAt: new Date(),
error: "Cancelled due to agent pause",
errorCode: "cancelled",
});
await setWakeupStatus(run.wakeupRequestId, "cancelled", {
finishedAt: new Date(),
error: "Cancelled due to agent pause",
});
const running = runningProcesses.get(run.id);
if (running) {
running.child.kill("SIGTERM");
runningProcesses.delete(run.id);
}
}
return runs.length;
},
};
}