fix: resolve type errors in process-lost-reaper PR
- Fix malformed try/catch/finally blocks in heartbeat executeRun - Declare activeRunExecutions Set to track in-flight runs - Add resumeQueuedRuns function and export from heartbeat service - Add initdbFlags to EmbeddedPostgresCtor type Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -53,6 +53,7 @@ type EmbeddedPostgresCtor = new (opts: {
|
|||||||
password: string;
|
password: string;
|
||||||
port: number;
|
port: number;
|
||||||
persistent: boolean;
|
persistent: boolean;
|
||||||
|
initdbFlags?: string[];
|
||||||
onLog?: (message: unknown) => void;
|
onLog?: (message: unknown) => void;
|
||||||
onError?: (message: unknown) => void;
|
onError?: (message: unknown) => void;
|
||||||
}) => EmbeddedPostgresInstance;
|
}) => EmbeddedPostgresInstance;
|
||||||
|
|||||||
@@ -455,6 +455,7 @@ export function heartbeatService(db: Db) {
|
|||||||
const runLogStore = getRunLogStore();
|
const runLogStore = getRunLogStore();
|
||||||
const secretsSvc = secretService(db);
|
const secretsSvc = secretService(db);
|
||||||
const issuesSvc = issueService(db);
|
const issuesSvc = issueService(db);
|
||||||
|
const activeRunExecutions = new Set<string>();
|
||||||
|
|
||||||
async function getAgent(agentId: string) {
|
async function getAgent(agentId: string) {
|
||||||
return db
|
return db
|
||||||
@@ -959,7 +960,7 @@ export function heartbeatService(db: Db) {
|
|||||||
const reaped: string[] = [];
|
const reaped: string[] = [];
|
||||||
|
|
||||||
for (const run of activeRuns) {
|
for (const run of activeRuns) {
|
||||||
if (runningProcesses.has(run.id)) continue;
|
if (runningProcesses.has(run.id) || activeRunExecutions.has(run.id)) continue;
|
||||||
|
|
||||||
// Apply staleness threshold to avoid false positives
|
// Apply staleness threshold to avoid false positives
|
||||||
if (staleThresholdMs > 0) {
|
if (staleThresholdMs > 0) {
|
||||||
@@ -998,6 +999,18 @@ export function heartbeatService(db: Db) {
|
|||||||
return { reaped: reaped.length, runIds: reaped };
|
return { reaped: reaped.length, runIds: reaped };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function resumeQueuedRuns() {
|
||||||
|
const queuedRuns = await db
|
||||||
|
.select({ agentId: heartbeatRuns.agentId })
|
||||||
|
.from(heartbeatRuns)
|
||||||
|
.where(eq(heartbeatRuns.status, "queued"));
|
||||||
|
|
||||||
|
const agentIds = [...new Set(queuedRuns.map((r) => r.agentId))];
|
||||||
|
for (const agentId of agentIds) {
|
||||||
|
await startNextQueuedRunForAgent(agentId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function updateRuntimeState(
|
async function updateRuntimeState(
|
||||||
agent: typeof agents.$inferSelect,
|
agent: typeof agents.$inferSelect,
|
||||||
run: typeof heartbeatRuns.$inferSelect,
|
run: typeof heartbeatRuns.$inferSelect,
|
||||||
@@ -1679,7 +1692,8 @@ export function heartbeatService(db: Db) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
await finalizeAgentStatus(agent.id, "failed");
|
await finalizeAgentStatus(agent.id, "failed");
|
||||||
} catch (outerErr) {
|
}
|
||||||
|
} catch (outerErr) {
|
||||||
// Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun).
|
// Setup code before adapter.execute threw (e.g. ensureRuntimeState, resolveWorkspaceForRun).
|
||||||
// The inner catch did not fire, so we must record the failure here.
|
// The inner catch did not fire, so we must record the failure here.
|
||||||
const message = outerErr instanceof Error ? outerErr.message : "Unknown setup failure";
|
const message = outerErr instanceof Error ? outerErr.message : "Unknown setup failure";
|
||||||
@@ -1710,8 +1724,9 @@ export function heartbeatService(db: Db) {
|
|||||||
await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined);
|
await finalizeAgentStatus(run.agentId, "failed").catch(() => undefined);
|
||||||
} finally {
|
} finally {
|
||||||
await releaseRuntimeServicesForRun(run.id).catch(() => undefined);
|
await releaseRuntimeServicesForRun(run.id).catch(() => undefined);
|
||||||
activeRunExecutions.delete(run.id);
|
activeRunExecutions.delete(run.id);
|
||||||
await startNextQueuedRunForAgent(run.agentId);
|
await startNextQueuedRunForAgent(run.agentId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) {
|
async function releaseIssueExecutionAndPromote(run: typeof heartbeatRuns.$inferSelect) {
|
||||||
@@ -2457,6 +2472,8 @@ export function heartbeatService(db: Db) {
|
|||||||
|
|
||||||
reapOrphanedRuns,
|
reapOrphanedRuns,
|
||||||
|
|
||||||
|
resumeQueuedRuns,
|
||||||
|
|
||||||
tickTimers: async (now = new Date()) => {
|
tickTimers: async (now = new Date()) => {
|
||||||
const allAgents = await db.select().from(agents);
|
const allAgents = await db.select().from(agents);
|
||||||
let checked = 0;
|
let checked = 0;
|
||||||
|
|||||||
Reference in New Issue
Block a user