import { and, asc, desc, eq, gt, inArray, sql } from "drizzle-orm";
import type { Db } from "@paperclip/db";
import {
  agents,
  agentRuntimeState,
  agentWakeupRequests,
  heartbeatRunEvents,
  heartbeatRuns,
  costEvents,
} from "@paperclip/db";
import { conflict, notFound } from "../errors.js";
import { logger } from "../middleware/logger.js";
import { publishLiveEvent } from "./live-events.js";
import { getRunLogStore, type RunLogHandle } from "./run-log-store.js";
import { getServerAdapter, runningProcesses } from "../adapters/index.js";
import type { AdapterExecutionResult, AdapterInvocationMeta } from "../adapters/index.js";
import { createLocalAgentJwt } from "../agent-auth-jwt.js";
import { parseObject, asBoolean, asNumber, appendWithCap, MAX_EXCERPT_BYTES } from "../adapters/utils.js";

/** Upper bound on the size of a log chunk forwarded in a single live event. */
const MAX_LIVE_LOG_CHUNK_BYTES = 8 * 1024;

/** Appends `chunk` to `prev`, capped via `appendWithCap` at `MAX_EXCERPT_BYTES`. */
function appendExcerpt(prev: string, chunk: string) {
  return appendWithCap(prev, chunk, MAX_EXCERPT_BYTES);
}

/** Options accepted by `enqueueWakeup` (exposed as `wakeup` on the service). */
interface WakeupOptions {
  source?: "timer" | "assignment" | "on_demand" | "automation";
  triggerDetail?: "manual" | "ping" | "callback" | "system";
  reason?: string | null;
  payload?: Record<string, unknown> | null;
  idempotencyKey?: string | null;
  requestedByActorType?: "user" | "agent" | "system";
  requestedByActorId?: string | null;
  contextSnapshot?: Record<string, unknown>;
}

/** Returns `value` when it is a string with non-whitespace content, otherwise `null`. */
function readNonEmptyString(value: unknown): string | null {
  return typeof value === "string" && value.trim().length > 0 ? value : null;
}

/**
 * Builds the heartbeat service bound to `db`.
 *
 * The returned object exposes run listing/lookup, runtime-state access,
 * wakeup enqueueing (`invoke` / `wakeup`), timer ticking, orphan reaping and
 * run cancellation.
 *
 * @param db Database handle used for every query issued by the service.
 */
export function heartbeatService(db: Db) {
  const runLogStore = getRunLogStore();

  /** Loads an agent row by id, or `null` when no row matches. */
  async function getAgent(agentId: string) {
    return db
      .select()
      .from(agents)
      .where(eq(agents.id, agentId))
      .then((rows) => rows[0] ?? null);
  }

  /** Loads a heartbeat run row by id, or `null` when no row matches. */
  async function getRun(runId: string) {
    return db
      .select()
      .from(heartbeatRuns)
      .where(eq(heartbeatRuns.id, runId))
      .then((rows) => rows[0] ?? null);
  }

  /** Loads the runtime-state row of an agent, or `null` when none exists. */
  async function getRuntimeState(agentId: string) {
    return db
      .select()
      .from(agentRuntimeState)
      .where(eq(agentRuntimeState.agentId, agentId))
      .then((rows) => rows[0] ?? null);
  }

  /** Returns the agent's runtime-state row, inserting an empty one first if it is missing. */
  async function ensureRuntimeState(agent: typeof agents.$inferSelect) {
    const existing = await getRuntimeState(agent.id);
    if (existing) return existing;
    return db
      .insert(agentRuntimeState)
      .values({
        agentId: agent.id,
        companyId: agent.companyId,
        adapterType: agent.adapterType,
        stateJson: {},
      })
      .returning()
      .then((rows) => rows[0]);
  }

  /**
   * Updates a run's status (plus any extra columns in `patch`) and publishes a
   * `heartbeat.run.status` live event when a row was updated.
   *
   * @returns The updated run row, or `null` when no run has that id.
   */
  async function setRunStatus(
    runId: string,
    status: string,
    patch?: Partial<typeof heartbeatRuns.$inferInsert>,
  ) {
    const updated = await db
      .update(heartbeatRuns)
      .set({ status, ...patch, updatedAt: new Date() })
      .where(eq(heartbeatRuns.id, runId))
      .returning()
      .then((rows) => rows[0] ?? null);

    if (updated) {
      publishLiveEvent({
        companyId: updated.companyId,
        type: "heartbeat.run.status",
        payload: {
          runId: updated.id,
          agentId: updated.agentId,
          status: updated.status,
          invocationSource: updated.invocationSource,
          triggerDetail: updated.triggerDetail,
          error: updated.error ?? null,
          errorCode: updated.errorCode ?? null,
          startedAt: updated.startedAt ? new Date(updated.startedAt).toISOString() : null,
          finishedAt: updated.finishedAt ? new Date(updated.finishedAt).toISOString() : null,
        },
      });
    }

    return updated;
  }

  /** Updates a wakeup request's status; a no-op when `wakeupRequestId` is missing. */
  async function setWakeupStatus(
    wakeupRequestId: string | null | undefined,
    status: string,
    patch?: Partial<typeof agentWakeupRequests.$inferInsert>,
  ) {
    if (!wakeupRequestId) return;
    await db
      .update(agentWakeupRequests)
      .set({ status, ...patch, updatedAt: new Date() })
      .where(eq(agentWakeupRequests.id, wakeupRequestId));
  }

  /** Persists one run event with the given `seq` and mirrors it as a `heartbeat.run.event` live event. */
  async function appendRunEvent(
    run: typeof heartbeatRuns.$inferSelect,
    seq: number,
    event: {
      eventType: string;
      stream?: "system" | "stdout" | "stderr";
      level?: "info" | "warn" | "error";
      color?: string;
      message?: string;
      payload?: Record<string, unknown>;
    },
  ) {
    await db.insert(heartbeatRunEvents).values({
      companyId: run.companyId,
      runId: run.id,
      agentId: run.agentId,
      seq,
      eventType: event.eventType,
      stream: event.stream,
      level: event.level,
      color: event.color,
      message: event.message,
      payload: event.payload,
    });

    publishLiveEvent({
      companyId: run.companyId,
      type: "heartbeat.run.event",
      payload: {
        runId: run.id,
        agentId: run.agentId,
        seq,
        eventType: event.eventType,
        stream: event.stream ?? null,
        level: event.level ?? null,
        color: event.color ?? null,
        message: event.message ?? null,
        payload: event.payload ?? null,
      },
    });
  }

  /**
   * Reads the heartbeat policy from `agent.runtimeConfig.heartbeat`.
   *
   * Defaults: `enabled` true, `intervalSec` 0 (negative values clamp to 0),
   * `wakeOnDemand` true. `wakeOnDemand` falls back through the
   * `wakeOnAssignment`, `wakeOnOnDemand` and `wakeOnAutomation` keys.
   */
  function parseHeartbeatPolicy(agent: typeof agents.$inferSelect) {
    const runtimeConfig = parseObject(agent.runtimeConfig);
    const heartbeat = parseObject(runtimeConfig.heartbeat);

    return {
      enabled: asBoolean(heartbeat.enabled, true),
      intervalSec: Math.max(0, asNumber(heartbeat.intervalSec, 0)),
      wakeOnDemand: asBoolean(
        heartbeat.wakeOnDemand ??
          heartbeat.wakeOnAssignment ??
          heartbeat.wakeOnOnDemand ??
          heartbeat.wakeOnAutomation,
        true,
      ),
    };
  }

  /**
   * Moves the agent to its post-run status and publishes an `agent.status`
   * live event. Succeeded and cancelled runs map to "idle", everything else to
   * "error". Paused and terminated agents are left untouched.
   */
  async function finalizeAgentStatus(
    agentId: string,
    outcome: "succeeded" | "failed" | "cancelled" | "timed_out",
  ) {
    const existing = await getAgent(agentId);
    if (!existing) return;

    if (existing.status === "paused" || existing.status === "terminated") {
      return;
    }

    const nextStatus = outcome === "succeeded" || outcome === "cancelled" ? "idle" : "error";

    const updated = await db
      .update(agents)
      .set({
        status: nextStatus,
        lastHeartbeatAt: new Date(),
        updatedAt: new Date(),
      })
      .where(eq(agents.id, agentId))
      .returning()
      .then((rows) => rows[0] ?? null);

    if (updated) {
      publishLiveEvent({
        companyId: updated.companyId,
        type: "agent.status",
        payload: {
          agentId: updated.id,
          status: updated.status,
          lastHeartbeatAt: updated.lastHeartbeatAt
            ? new Date(updated.lastHeartbeatAt).toISOString()
            : null,
          outcome,
        },
      });
    }
  }

  /**
   * Marks queued/running runs that have no entry in `runningProcesses` as
   * failed with error code `process_lost`.
   *
   * @param opts.staleThresholdMs When greater than 0, runs whose `updatedAt`
   *   is more recent than this many milliseconds are skipped.
   * @returns The number of reaped runs and their ids.
   */
  async function reapOrphanedRuns(opts?: { staleThresholdMs?: number }) {
    const staleThresholdMs = opts?.staleThresholdMs ?? 0;
    const now = new Date();

    // Find all runs in "queued" or "running" state
    const activeRuns = await db
      .select()
      .from(heartbeatRuns)
      .where(inArray(heartbeatRuns.status, ["queued", "running"]));

    const reaped: string[] = [];

    for (const run of activeRuns) {
      if (runningProcesses.has(run.id)) continue;

      // Apply staleness threshold to avoid false positives
      if (staleThresholdMs > 0) {
        const refTime = run.updatedAt ? new Date(run.updatedAt).getTime() : 0;
        if (now.getTime() - refTime < staleThresholdMs) continue;
      }

      await setRunStatus(run.id, "failed", {
        error: "Process lost -- server may have restarted",
        errorCode: "process_lost",
        finishedAt: now,
      });
      await setWakeupStatus(run.wakeupRequestId, "failed", {
        finishedAt: now,
        error: "Process lost -- server may have restarted",
      });

      const updatedRun = await getRun(run.id);
      if (updatedRun) {
        // NOTE(review): this event is written with seq 1 although executeRun
        // also numbers its events from 1 for the same run -- confirm whether
        // (runId, seq) is expected to be unique.
        await appendRunEvent(updatedRun, 1, {
          eventType: "lifecycle",
          stream: "system",
          level: "error",
          message: "Process lost -- server may have restarted",
        });
      }

      await finalizeAgentStatus(run.agentId, "failed");
      runningProcesses.delete(run.id);
      reaped.push(run.id);
    }

    if (reaped.length > 0) {
      logger.warn({ reapedCount: reaped.length, runIds: reaped }, "reaped orphaned heartbeat runs");
    }

    return { reaped: reaped.length, runIds: reaped };
  }

  /**
   * Folds an adapter result into the agent's runtime state (session id, last
   * run info, token and cost totals). When the run cost rounds to more than
   * zero cents it also records a cost event and increments the agent's
   * `spentMonthlyCents`.
   */
  async function updateRuntimeState(
    agent: typeof agents.$inferSelect,
    run: typeof heartbeatRuns.$inferSelect,
    result: AdapterExecutionResult,
  ) {
    const existing = await ensureRuntimeState(agent);
    const usage = result.usage;
    const inputTokens = usage?.inputTokens ?? 0;
    const outputTokens = usage?.outputTokens ?? 0;
    const cachedInputTokens = usage?.cachedInputTokens ?? 0;
    const additionalCostCents = Math.max(0, Math.round((result.costUsd ?? 0) * 100));

    await db
      .update(agentRuntimeState)
      .set({
        adapterType: agent.adapterType,
        sessionId: result.clearSession ? null : (result.sessionId ?? existing.sessionId),
        lastRunId: run.id,
        lastRunStatus: run.status,
        lastError: result.errorMessage ?? null,
        totalInputTokens: existing.totalInputTokens + inputTokens,
        totalOutputTokens: existing.totalOutputTokens + outputTokens,
        totalCachedInputTokens: existing.totalCachedInputTokens + cachedInputTokens,
        totalCostCents: existing.totalCostCents + additionalCostCents,
        updatedAt: new Date(),
      })
      .where(eq(agentRuntimeState.agentId, agent.id));

    if (additionalCostCents > 0) {
      await db.insert(costEvents).values({
        companyId: agent.companyId,
        agentId: agent.id,
        provider: result.provider ?? "unknown",
        model: result.model ?? "unknown",
        inputTokens,
        outputTokens,
        costCents: additionalCostCents,
        occurredAt: new Date(),
      });

      await db
        .update(agents)
        .set({
          spentMonthlyCents: sql`${agents.spentMonthlyCents} + ${additionalCostCents}`,
          updatedAt: new Date(),
        })
        .where(eq(agents.id, agent.id));
    }
  }

  /**
   * Executes a queued/running run end to end: marks the run and agent as
   * running, opens the run log, invokes the adapter, then records the outcome
   * on the run, the wakeup request, the runtime state and the agent.
   *
   * Errors thrown after the agent was resolved are caught and recorded as a
   * failed run with error code `adapter_failed`.
   */
  async function executeRun(runId: string) {
    const run = await getRun(runId);
    if (!run) return;
    if (run.status !== "queued" && run.status !== "running") return;

    const agent = await getAgent(run.agentId);
    if (!agent) {
      await setRunStatus(runId, "failed", {
        error: "Agent not found",
        errorCode: "agent_not_found",
        finishedAt: new Date(),
      });
      await setWakeupStatus(run.wakeupRequestId, "failed", {
        finishedAt: new Date(),
        error: "Agent not found",
      });
      return;
    }

    const runtime = await ensureRuntimeState(agent);

    let seq = 1;
    let handle: RunLogHandle | null = null;
    let stdoutExcerpt = "";
    let stderrExcerpt = "";

    try {
      await setRunStatus(runId, "running", {
        startedAt: new Date(),
        sessionIdBefore: runtime.sessionId,
      });
      await setWakeupStatus(run.wakeupRequestId, "claimed", { claimedAt: new Date() });

      const runningAgent = await db
        .update(agents)
        .set({ status: "running", updatedAt: new Date() })
        .where(eq(agents.id, agent.id))
        .returning()
        .then((rows) => rows[0] ?? null);

      if (runningAgent) {
        publishLiveEvent({
          companyId: runningAgent.companyId,
          type: "agent.status",
          payload: {
            agentId: runningAgent.id,
            status: runningAgent.status,
            outcome: "running",
          },
        });
      }

      const currentRun = (await getRun(runId)) ?? run;
      await appendRunEvent(currentRun, seq++, {
        eventType: "lifecycle",
        stream: "system",
        level: "info",
        message: "run started",
      });

      handle = await runLogStore.begin({
        companyId: run.companyId,
        agentId: run.agentId,
        runId,
      });

      await db
        .update(heartbeatRuns)
        .set({
          logStore: handle.store,
          logRef: handle.logRef,
          updatedAt: new Date(),
        })
        .where(eq(heartbeatRuns.id, runId));

      // Receives adapter output: keeps capped excerpts, appends to the run
      // log, and forwards at most the tail of each chunk as a live event.
      const onLog = async (stream: "stdout" | "stderr", chunk: string) => {
        if (stream === "stdout") stdoutExcerpt = appendExcerpt(stdoutExcerpt, chunk);
        if (stream === "stderr") stderrExcerpt = appendExcerpt(stderrExcerpt, chunk);

        if (handle) {
          await runLogStore.append(handle, {
            stream,
            chunk,
            ts: new Date().toISOString(),
          });
        }

        const payloadChunk =
          chunk.length > MAX_LIVE_LOG_CHUNK_BYTES
            ? chunk.slice(chunk.length - MAX_LIVE_LOG_CHUNK_BYTES)
            : chunk;

        publishLiveEvent({
          companyId: run.companyId,
          type: "heartbeat.run.log",
          payload: {
            runId: run.id,
            agentId: run.agentId,
            stream,
            chunk: payloadChunk,
            truncated: payloadChunk.length !== chunk.length,
          },
        });
      };

      const config = parseObject(agent.adapterConfig);
      const context = (run.contextSnapshot ?? {}) as Record<string, unknown>;

      const onAdapterMeta = async (meta: AdapterInvocationMeta) => {
        await appendRunEvent(currentRun, seq++, {
          eventType: "adapter.invoke",
          stream: "system",
          level: "info",
          message: "adapter invocation",
          payload: meta as unknown as Record<string, unknown>,
        });
      };

      const adapter = getServerAdapter(agent.adapterType);
      const authToken = adapter.supportsLocalAgentJwt
        ? createLocalAgentJwt(agent.id, agent.companyId, agent.adapterType, run.id)
        : null;
      if (adapter.supportsLocalAgentJwt && !authToken) {
        logger.warn(
          {
            companyId: agent.companyId,
            agentId: agent.id,
            runId: run.id,
            adapterType: agent.adapterType,
          },
          "local agent jwt secret missing or invalid; running without injected PAPERCLIP_API_KEY",
        );
      }

      const adapterResult = await adapter.execute({
        runId: run.id,
        agent,
        runtime,
        config,
        context,
        onLog,
        onMeta: onAdapterMeta,
        authToken: authToken ?? undefined,
      });

      // A cancellation recorded in the database while the adapter was running
      // takes precedence over whatever the adapter reported.
      let outcome: "succeeded" | "failed" | "cancelled" | "timed_out";
      const latestRun = await getRun(run.id);
      if (latestRun?.status === "cancelled") {
        outcome = "cancelled";
      } else if (adapterResult.timedOut) {
        outcome = "timed_out";
      } else if ((adapterResult.exitCode ?? 0) === 0 && !adapterResult.errorMessage) {
        outcome = "succeeded";
      } else {
        outcome = "failed";
      }

      let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;
      if (handle) {
        logSummary = await runLogStore.finalize(handle);
      }

      const status =
        outcome === "succeeded"
          ? "succeeded"
          : outcome === "cancelled"
            ? "cancelled"
            : outcome === "timed_out"
              ? "timed_out"
              : "failed";

      const usageJson =
        adapterResult.usage || adapterResult.costUsd != null
          ? ({
              ...(adapterResult.usage ?? {}),
              ...(adapterResult.costUsd != null ? { costUsd: adapterResult.costUsd } : {}),
            } as Record<string, unknown>)
          : null;

      await setRunStatus(run.id, status, {
        finishedAt: new Date(),
        error:
          outcome === "succeeded"
            ? null
            : adapterResult.errorMessage ?? (outcome === "timed_out" ? "Timed out" : "Adapter failed"),
        errorCode:
          outcome === "timed_out"
            ? "timeout"
            : outcome === "cancelled"
              ? "cancelled"
              : outcome === "failed"
                ? "adapter_failed"
                : null,
        exitCode: adapterResult.exitCode,
        signal: adapterResult.signal,
        usageJson,
        resultJson: adapterResult.resultJson ?? null,
        sessionIdAfter: adapterResult.sessionId ?? runtime.sessionId,
        stdoutExcerpt,
        stderrExcerpt,
        logBytes: logSummary?.bytes,
        logSha256: logSummary?.sha256,
        logCompressed: logSummary?.compressed ?? false,
      });

      await setWakeupStatus(run.wakeupRequestId, outcome === "succeeded" ? "completed" : status, {
        finishedAt: new Date(),
        error: adapterResult.errorMessage ?? null,
      });

      const finalizedRun = await getRun(run.id);
      if (finalizedRun) {
        await appendRunEvent(finalizedRun, seq++, {
          eventType: "lifecycle",
          stream: "system",
          level: outcome === "succeeded" ? "info" : "error",
          message: `run ${outcome}`,
          payload: {
            status,
            exitCode: adapterResult.exitCode,
          },
        });
      }

      if (finalizedRun) {
        await updateRuntimeState(agent, finalizedRun, adapterResult);
      }
      await finalizeAgentStatus(agent.id, outcome);
    } catch (err) {
      const message = err instanceof Error ? err.message : "Unknown adapter failure";
      logger.error({ err, runId }, "heartbeat execution failed");

      // Finalizing the log is best-effort here: a failure is logged but does
      // not prevent the run from being marked failed.
      let logSummary: { bytes: number; sha256?: string; compressed: boolean } | null = null;
      if (handle) {
        try {
          logSummary = await runLogStore.finalize(handle);
        } catch (finalizeErr) {
          logger.warn({ err: finalizeErr, runId }, "failed to finalize run log after error");
        }
      }

      const failedRun = await setRunStatus(run.id, "failed", {
        error: message,
        errorCode: "adapter_failed",
        finishedAt: new Date(),
        stdoutExcerpt,
        stderrExcerpt,
        logBytes: logSummary?.bytes,
        logSha256: logSummary?.sha256,
        logCompressed: logSummary?.compressed ?? false,
      });
      await setWakeupStatus(run.wakeupRequestId, "failed", {
        finishedAt: new Date(),
        error: message,
      });

      if (failedRun) {
        await appendRunEvent(failedRun, seq++, {
          eventType: "error",
          stream: "system",
          level: "error",
          message,
        });

        await updateRuntimeState(agent, failedRun, {
          exitCode: null,
          signal: null,
          timedOut: false,
          errorMessage: message,
        });
      }

      await finalizeAgentStatus(agent.id, "failed");
    }
  }

  /**
   * Records a wakeup request for an agent and, when allowed, queues and starts
   * a heartbeat run for it.
   *
   * @returns `null` when the request was skipped by the heartbeat policy, the
   *   already-active run when the request was coalesced into it, or the newly
   *   queued run.
   * @throws `notFound` when the agent does not exist; `conflict` when the
   *   agent is paused, terminated or pending approval.
   */
  async function enqueueWakeup(agentId: string, opts: WakeupOptions = {}) {
    const source = opts.source ?? "on_demand";
    const triggerDetail = opts.triggerDetail ?? null;
    const contextSnapshot: Record<string, unknown> = { ...(opts.contextSnapshot ?? {}) };
    const reason = opts.reason ?? null;
    const payload = opts.payload ?? null;
    const issueIdFromPayload = readNonEmptyString(payload?.["issueId"]);

    // Fill in context keys the caller did not provide; existing non-empty
    // string values are never overwritten.
    if (!readNonEmptyString(contextSnapshot["wakeReason"]) && reason) {
      contextSnapshot.wakeReason = reason;
    }
    if (!readNonEmptyString(contextSnapshot["issueId"]) && issueIdFromPayload) {
      contextSnapshot.issueId = issueIdFromPayload;
    }
    if (!readNonEmptyString(contextSnapshot["taskId"]) && issueIdFromPayload) {
      contextSnapshot.taskId = issueIdFromPayload;
    }
    if (!readNonEmptyString(contextSnapshot["wakeSource"])) {
      contextSnapshot.wakeSource = source;
    }
    if (!readNonEmptyString(contextSnapshot["wakeTriggerDetail"]) && triggerDetail) {
      contextSnapshot.wakeTriggerDetail = triggerDetail;
    }

    const agent = await getAgent(agentId);
    if (!agent) throw notFound("Agent not found");

    if (
      agent.status === "paused" ||
      agent.status === "terminated" ||
      agent.status === "pending_approval"
    ) {
      throw conflict("Agent is not invokable in its current state", { status: agent.status });
    }

    const policy = parseHeartbeatPolicy(agent);

    // Records a wakeup request that the policy rejected, with the skip reason
    // stored in place of the caller-supplied reason.
    const writeSkippedRequest = async (skipReason: string) => {
      await db.insert(agentWakeupRequests).values({
        companyId: agent.companyId,
        agentId,
        source,
        triggerDetail,
        reason: skipReason,
        payload,
        status: "skipped",
        requestedByActorType: opts.requestedByActorType ?? null,
        requestedByActorId: opts.requestedByActorId ?? null,
        idempotencyKey: opts.idempotencyKey ?? null,
        finishedAt: new Date(),
      });
    };

    if (source === "timer" && !policy.enabled) {
      await writeSkippedRequest("heartbeat.disabled");
      return null;
    }
    if (source !== "timer" && !policy.wakeOnDemand) {
      await writeSkippedRequest("heartbeat.wakeOnDemand.disabled");
      return null;
    }

    const activeRun = await db
      .select()
      .from(heartbeatRuns)
      .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])))
      .orderBy(desc(heartbeatRuns.createdAt))
      .then((rows) => rows[0] ?? null);

    // An agent already has a queued/running run: record the request as
    // coalesced into that run instead of starting another one.
    if (activeRun) {
      await db.insert(agentWakeupRequests).values({
        companyId: agent.companyId,
        agentId,
        source,
        triggerDetail,
        reason,
        payload,
        status: "coalesced",
        coalescedCount: 1,
        requestedByActorType: opts.requestedByActorType ?? null,
        requestedByActorId: opts.requestedByActorId ?? null,
        idempotencyKey: opts.idempotencyKey ?? null,
        runId: activeRun.id,
        finishedAt: new Date(),
      });
      return activeRun;
    }

    const wakeupRequest = await db
      .insert(agentWakeupRequests)
      .values({
        companyId: agent.companyId,
        agentId,
        source,
        triggerDetail,
        reason,
        payload,
        status: "queued",
        requestedByActorType: opts.requestedByActorType ?? null,
        requestedByActorId: opts.requestedByActorId ?? null,
        idempotencyKey: opts.idempotencyKey ?? null,
      })
      .returning()
      .then((rows) => rows[0]);

    const runtimeForRun = await getRuntimeState(agent.id);

    const newRun = await db
      .insert(heartbeatRuns)
      .values({
        companyId: agent.companyId,
        agentId,
        invocationSource: source,
        triggerDetail,
        status: "queued",
        wakeupRequestId: wakeupRequest.id,
        contextSnapshot,
        sessionIdBefore: runtimeForRun?.sessionId ?? null,
      })
      .returning()
      .then((rows) => rows[0]);

    await db
      .update(agentWakeupRequests)
      .set({
        runId: newRun.id,
        updatedAt: new Date(),
      })
      .where(eq(agentWakeupRequests.id, wakeupRequest.id));

    publishLiveEvent({
      companyId: newRun.companyId,
      type: "heartbeat.run.queued",
      payload: {
        runId: newRun.id,
        agentId: newRun.agentId,
        invocationSource: newRun.invocationSource,
        triggerDetail: newRun.triggerDetail,
        wakeupRequestId: newRun.wakeupRequestId,
      },
    });

    // Fire-and-forget: the caller gets the queued run back immediately while
    // execution proceeds in the background.
    void executeRun(newRun.id).catch((err) => {
      logger.error({ err, runId: newRun.id }, "heartbeat execution failed");
    });

    return newRun;
  }

  return {
    /** Lists runs of a company (optionally of one agent), newest first. */
    list: (companyId: string, agentId?: string) => {
      if (!agentId) {
        return db
          .select()
          .from(heartbeatRuns)
          .where(eq(heartbeatRuns.companyId, companyId))
          .orderBy(desc(heartbeatRuns.createdAt));
      }

      return db
        .select()
        .from(heartbeatRuns)
        .where(and(eq(heartbeatRuns.companyId, companyId), eq(heartbeatRuns.agentId, agentId)))
        .orderBy(desc(heartbeatRuns.createdAt));
    },

    getRun,

    /** Returns the agent's runtime state, creating it when the agent exists but has none. */
    getRuntimeState: async (agentId: string) => {
      const state = await getRuntimeState(agentId);
      if (state) return state;

      const agent = await getAgent(agentId);
      if (!agent) return null;
      return ensureRuntimeState(agent);
    },

    /** Clears the agent's session id, state JSON and last error. Throws `notFound` for unknown agents. */
    resetRuntimeSession: async (agentId: string) => {
      const agent = await getAgent(agentId);
      if (!agent) throw notFound("Agent not found");
      await ensureRuntimeState(agent);

      return db
        .update(agentRuntimeState)
        .set({
          sessionId: null,
          stateJson: {},
          lastError: null,
          updatedAt: new Date(),
        })
        .where(eq(agentRuntimeState.agentId, agentId))
        .returning()
        .then((rows) => rows[0] ?? null);
    },

    /** Lists events of a run with `seq` greater than `afterSeq`, ascending; `limit` is clamped to 1..1000. */
    listEvents: (runId: string, afterSeq = 0, limit = 200) =>
      db
        .select()
        .from(heartbeatRunEvents)
        .where(and(eq(heartbeatRunEvents.runId, runId), gt(heartbeatRunEvents.seq, afterSeq)))
        .orderBy(asc(heartbeatRunEvents.seq))
        .limit(Math.max(1, Math.min(limit, 1000))),

    /** Reads from a run's stored log. Throws `notFound` when the run or its log reference is missing. */
    readLog: async (runId: string, opts?: { offset?: number; limitBytes?: number }) => {
      const run = await getRun(runId);
      if (!run) throw notFound("Heartbeat run not found");
      if (!run.logStore || !run.logRef) throw notFound("Run log not found");

      const result = await runLogStore.read(
        {
          store: run.logStore as "local_file",
          logRef: run.logRef,
        },
        opts,
      );

      return {
        runId,
        store: run.logStore,
        logRef: run.logRef,
        ...result,
      };
    },

    /** Positional-argument convenience wrapper around `enqueueWakeup`. */
    invoke: async (
      agentId: string,
      source: "timer" | "assignment" | "on_demand" | "automation" = "on_demand",
      contextSnapshot: Record<string, unknown> = {},
      triggerDetail: "manual" | "ping" | "callback" | "system" = "manual",
      actor?: { actorType?: "user" | "agent" | "system"; actorId?: string | null },
    ) =>
      enqueueWakeup(agentId, {
        source,
        triggerDetail,
        contextSnapshot,
        requestedByActorType: actor?.actorType,
        requestedByActorId: actor?.actorId ?? null,
      }),

    wakeup: enqueueWakeup,

    reapOrphanedRuns,

    /**
     * Enqueues a timer wakeup for every agent whose heartbeat interval has
     * elapsed since its `lastHeartbeatAt` (or that has never had a heartbeat).
     *
     * @returns Counts of agents checked, wakeups that produced a run, and
     *   wakeups that returned no run.
     */
    tickTimers: async (now = new Date()) => {
      const allAgents = await db.select().from(agents);
      let checked = 0;
      let enqueued = 0;
      let skipped = 0;

      for (const agent of allAgents) {
        // Skip every status enqueueWakeup rejects with a conflict error;
        // otherwise a single pending_approval agent would abort the whole
        // tick and starve all agents after it in the list.
        if (
          agent.status === "paused" ||
          agent.status === "terminated" ||
          agent.status === "pending_approval"
        ) {
          continue;
        }

        const policy = parseHeartbeatPolicy(agent);
        if (!policy.enabled || policy.intervalSec <= 0) continue;

        checked += 1;
        const last = agent.lastHeartbeatAt ? new Date(agent.lastHeartbeatAt).getTime() : 0;
        const elapsedMs = now.getTime() - last;
        if (last && elapsedMs < policy.intervalSec * 1000) continue;

        const run = await enqueueWakeup(agent.id, {
          source: "timer",
          triggerDetail: "system",
          reason: "heartbeat_timer",
          requestedByActorType: "system",
          requestedByActorId: "heartbeat_scheduler",
          contextSnapshot: {
            source: "scheduler",
            reason: "interval_elapsed",
            now: now.toISOString(),
          },
        });
        if (run) enqueued += 1;
        else skipped += 1;
      }

      return { checked, enqueued, skipped };
    },

    /**
     * Cancels a queued/running run: signals its process (SIGTERM, then SIGKILL
     * after the grace period if it is still alive), marks the run and wakeup
     * request cancelled, and finalizes the agent status. Runs in any other
     * status are returned unchanged.
     *
     * @throws `notFound` when the run does not exist.
     */
    cancelRun: async (runId: string) => {
      const run = await getRun(runId);
      if (!run) throw notFound("Heartbeat run not found");
      if (run.status !== "running" && run.status !== "queued") return run;

      const running = runningProcesses.get(run.id);
      if (running) {
        running.child.kill("SIGTERM");
        const graceMs = Math.max(1, running.graceSec) * 1000;
        setTimeout(() => {
          // `child.killed` only reports that a signal was delivered (it is
          // already true after the SIGTERM above), so it cannot tell whether
          // the process is still alive. A process that has not exited has
          // neither an exit code nor a terminating signal.
          if (running.child.exitCode === null && running.child.signalCode === null) {
            running.child.kill("SIGKILL");
          }
        }, graceMs);
      }

      const cancelled = await setRunStatus(run.id, "cancelled", {
        finishedAt: new Date(),
        error: "Cancelled by control plane",
        errorCode: "cancelled",
      });

      await setWakeupStatus(run.wakeupRequestId, "cancelled", {
        finishedAt: new Date(),
        error: "Cancelled by control plane",
      });

      if (cancelled) {
        // NOTE(review): this event is written with seq 1 although executeRun
        // also numbers its events from 1 for the same run -- confirm whether
        // (runId, seq) is expected to be unique.
        await appendRunEvent(cancelled, 1, {
          eventType: "lifecycle",
          stream: "system",
          level: "warn",
          message: "run cancelled",
        });
      }

      runningProcesses.delete(run.id);
      await finalizeAgentStatus(run.agentId, "cancelled");
      return cancelled;
    },

    /**
     * Cancels every queued/running run of an agent and sends SIGTERM to any
     * tracked process.
     *
     * @returns The number of runs that were cancelled.
     */
    cancelActiveForAgent: async (agentId: string) => {
      const runs = await db
        .select()
        .from(heartbeatRuns)
        .where(and(eq(heartbeatRuns.agentId, agentId), inArray(heartbeatRuns.status, ["queued", "running"])));

      for (const run of runs) {
        await setRunStatus(run.id, "cancelled", {
          finishedAt: new Date(),
          error: "Cancelled due to agent pause",
          errorCode: "cancelled",
        });

        await setWakeupStatus(run.wakeupRequestId, "cancelled", {
          finishedAt: new Date(),
          error: "Cancelled due to agent pause",
        });

        const running = runningProcesses.get(run.id);
        if (running) {
          running.child.kill("SIGTERM");
          runningProcesses.delete(run.id);
        }
      }

      return runs.length;
    },

    /** Returns the agent's most recently started run in "running" status, or `null`. */
    getActiveRunForAgent: async (agentId: string) => {
      const [run] = await db
        .select()
        .from(heartbeatRuns)
        .where(
          and(
            eq(heartbeatRuns.agentId, agentId),
            eq(heartbeatRuns.status, "running"),
          ),
        )
        .orderBy(desc(heartbeatRuns.startedAt))
        .limit(1);
      return run ?? null;
    },
  };
}