fix(heartbeat): prevent false process_lost failures on queued and non-child-process runs
- reapOrphanedRuns() now only scans running runs; queued runs are legitimately absent from runningProcesses (waiting on concurrency limits or issue locks), so including them caused false process_lost failures (closes #90)
- Add a module-level activeRunExecutions set so non-child-process adapters (http, openclaw) are protected from the reaper during execution
- Add resumeQueuedRuns() to restart persisted queued runs after a server restart, called at startup and on each periodic tick
- Add an outer catch in executeRun() so setup failures (ensureRuntimeState, resolveWorkspaceForRun, etc.) are recorded as failed runs instead of being left stuck in the running state
- Guard resumeQueuedRuns() against paused/terminated/pending_approval agents
- Increase the opencode models discovery timeout from 20s to 45s
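The diff below shows where executeRun() registers and unregisters runs in activeRunExecutions, but not the reaper side that consults it. A rough sketch of the reaper shape the message describes, for orientation only: listRunsByStatus, isStale, and the runningProcesses map are illustrative names rather than confirmed API, and "process_lost" as an errorCode value is inferred from the title.

// Module-level: ids of runs this process is actively driving, whether or not
// the adapter spawns a child process (the http and openclaw adapters do not).
const activeRunExecutions = new Set<string>();

async function reapOrphanedRuns(opts: { staleThresholdMs?: number } = {}) {
  // Scan only "running" runs; queued runs are legitimately absent from
  // runningProcesses while they wait on concurrency limits or issue locks.
  for (const run of await listRunsByStatus("running")) {
    if (runningProcesses.has(run.id)) continue; // child-process adapters
    if (activeRunExecutions.has(run.id)) continue; // http/openclaw adapters
    if (opts.staleThresholdMs && !isStale(run, opts.staleThresholdMs)) continue;
    await setRunStatus(run.id, "failed", {
      errorCode: "process_lost",
      finishedAt: new Date(),
    });
  }
}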
@@ -7,6 +7,7 @@ import {
 } from "@paperclipai/adapter-utils/server-utils";
 
 const MODELS_CACHE_TTL_MS = 60_000;
+const MODELS_DISCOVERY_TIMEOUT_MS = 45_000;
 
 function resolveOpenCodeCommand(input: unknown): string {
   const envOverride =
@@ -115,14 +116,14 @@ export async function discoverOpenCodeModels(input: {
     {
       cwd,
       env: runtimeEnv,
-      timeoutSec: 20,
+      timeoutSec: MODELS_DISCOVERY_TIMEOUT_MS / 1000,
       graceSec: 3,
       onLog: async () => {},
     },
   );
 
   if (result.timedOut) {
-    throw new Error("`opencode models` timed out.");
+    throw new Error(`\`opencode models\` timed out after ${MODELS_DISCOVERY_TIMEOUT_MS / 1000}s.`);
   }
   if ((result.exitCode ?? 1) !== 0) {
     const detail = firstNonEmptyLine(result.stderr) || firstNonEmptyLine(result.stdout);
@@ -38,9 +38,9 @@
     "@paperclipai/adapter-codex-local": "workspace:*",
     "@paperclipai/adapter-cursor-local": "workspace:*",
     "@paperclipai/adapter-gemini-local": "workspace:*",
+    "@paperclipai/adapter-openclaw-gateway": "workspace:*",
     "@paperclipai/adapter-opencode-local": "workspace:*",
     "@paperclipai/adapter-pi-local": "workspace:*",
-    "@paperclipai/adapter-openclaw-gateway": "workspace:*",
    "@paperclipai/adapter-utils": "workspace:*",
    "@paperclipai/db": "workspace:*",
    "@paperclipai/shared": "workspace:*",
@@ -513,11 +513,14 @@ export async function startServer(): Promise<StartedServer> {
   if (config.heartbeatSchedulerEnabled) {
     const heartbeat = heartbeatService(db as any);
 
-    // Reap orphaned runs at startup (no threshold -- runningProcesses is empty)
-    void heartbeat.reapOrphanedRuns().catch((err) => {
-      logger.error({ err }, "startup reap of orphaned heartbeat runs failed");
-    });
+    // Reap orphaned running runs at startup while in-memory execution state is empty,
+    // then resume any persisted queued runs that were waiting on the previous process.
+    void heartbeat
+      .reapOrphanedRuns()
+      .then(() => heartbeat.resumeQueuedRuns())
+      .catch((err) => {
+        logger.error({ err }, "startup heartbeat recovery failed");
+      });
 
     setInterval(() => {
       void heartbeat
         .tickTimers(new Date())
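Ordering matters in this chain: reapOrphanedRuns() completes first, while no in-memory execution state exists yet, so stale running runs are marked failed before resumeQueuedRuns() starts driving the persisted queue, presumably to keep freshly resumed runs from racing the reaper.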
@@ -530,11 +533,13 @@ export async function startServer(): Promise<StartedServer> {
           logger.error({ err }, "heartbeat timer tick failed");
         });
 
-      // Periodically reap orphaned runs (5-min staleness threshold)
+      // Periodically reap orphaned runs (5-min staleness threshold) and make sure
+      // persisted queued work is still being driven forward.
       void heartbeat
         .reapOrphanedRuns({ staleThresholdMs: 5 * 60 * 1000 })
+        .then(() => heartbeat.resumeQueuedRuns())
         .catch((err) => {
-          logger.error({ err }, "periodic reap of orphaned heartbeat runs failed");
+          logger.error({ err }, "periodic heartbeat recovery failed");
         });
     }, config.heartbeatSchedulerIntervalMs);
   }
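resumeQueuedRuns() itself is outside this excerpt. A minimal sketch of the behavior the commit message describes; listRunsByStatus and the exact executeRun signature are assumptions, while getAgent and the paused/terminated/pending_approval guard come from the message.

async function resumeQueuedRuns() {
  for (const run of await listRunsByStatus("queued")) { // hypothetical helper
    if (activeRunExecutions.has(run.id)) continue; // already being driven here
    const agent = await getAgent(run.agentId);
    // Never auto-resume work for agents that must not run right now.
    if (
      !agent ||
      agent.status === "paused" ||
      agent.status === "terminated" ||
      agent.status === "pending_approval"
    ) {
      continue;
    }
    void executeRun(run.id).catch((err) => {
      logger.error({ err, runId: run.id }, "resume of queued heartbeat run failed");
    });
  }
}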
@@ -1089,6 +1089,9 @@ export function heartbeatService(db: Db) {
       run = claimed;
     }
 
+    activeRunExecutions.add(run.id);
+
+    try {
     const agent = await getAgent(run.agentId);
     if (!agent) {
       await setRunStatus(runId, "failed", {
@@ -1676,10 +1679,39 @@ export function heartbeatService(db: Db) {
       }
 
       await finalizeAgentStatus(agent.id, "failed");
-    } finally {
-      await releaseRuntimeServicesForRun(run.id);
-      await startNextQueuedRunForAgent(agent.id);
-    }
+    } catch (outerErr) {
+      // Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun).
+      // The inner catch did not fire, so we must record the failure here.
+      const message = outerErr instanceof Error ? outerErr.message : "Unknown setup failure";
+      logger.error({ err: outerErr, runId }, "heartbeat execution setup failed");
+      await setRunStatus(runId, "failed", {
+        error: message,
+        errorCode: "adapter_failed",
+        finishedAt: new Date(),
+      }).catch(() => undefined);
+      await setWakeupStatus(run.wakeupRequestId, "failed", {
+        finishedAt: new Date(),
+        error: message,
+      }).catch(() => undefined);
+      const failedRun = await getRun(runId).catch(() => null);
+      if (failedRun) {
+        // Emit a run-log event so the failure is visible in the run timeline,
+        // consistent with what the inner catch block does for adapter failures.
+        await appendRunEvent(failedRun, 1, {
+          eventType: "error",
+          stream: "system",
+          level: "error",
+          message,
+        }).catch(() => undefined);
+        await releaseIssueExecutionAndPromote(failedRun).catch(() => undefined);
+      }
+      // Ensure the agent is not left stuck in "running" if the inner catch handler's
+      // DB calls threw (e.g. a transient DB error in finalizeAgentStatus).
+      await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined);
+    } finally {
+      await releaseRuntimeServicesForRun(run.id).catch(() => undefined);
+      activeRunExecutions.delete(run.id);
+      await startNextQueuedRunForAgent(run.agentId);
+    }
   }
 
   async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) {
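Because the -1089 hunk inserts try { without re-indenting the existing body, the resulting control flow of executeRun() is easier to see as a skeleton assembled from the two heartbeat-service hunks above (bodies elided):

activeRunExecutions.add(run.id);

try {
  // resolve agent, workspace, runtime state, then run the adapter;
  // the pre-existing inner try/catch still records adapter failures
} catch (outerErr) {
  // setup threw before the adapter ran: persist the failure so the run
  // does not sit in "running" until the reaper mislabels it
} finally {
  await releaseRuntimeServicesForRun(run.id).catch(() => undefined);
  activeRunExecutions.delete(run.id); // the reaper may act on this run id again
  await startNextQueuedRunForAgent(run.agentId);
}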