Files
paperclip/server/src/services/plugin-host-services.ts
Dotta 0c121b856f Merge remote-tracking branch 'public-gh/master' into paperclip-subissues
* public-gh/master: (51 commits)
  Use attachment-size limit for company logos
  Address Greptile company logo feedback
  Drop lockfile from PR branch
  Use asset-backed company logos
  fix: use appType "custom" for Vite dev server so worktree branding is applied
  docs: fix documentation drift — adapters, plugins, tech stack
  docs: update documentation for accuracy after plugin system launch
  chore: ignore superset artifacts
  Dark theme for CodeMirror code blocks in MDXEditor
  Remove duplicate @paperclipai/adapter-openclaw-gateway in server/package.json
  Fix code block styles with robust prose overrides
  Add Docker setup for untrusted PR review in isolated containers
  Fix org chart canvas height to fit viewport without scrolling
  Add doc-maintenance skill for periodic documentation accuracy audits
  Fix sidebar scrollbar: hide track background when not hovering
  Restyle markdown code blocks: dark background, smaller font, compact padding
  Add archive project button and filter archived projects from selectors
  fix: address review feedback — subscription cleanup, filter nullability, stale diagram
  fix: wire plugin event subscriptions from worker to host
  fix(ui): hide scrollbar track background when sidebar is not hovered
  ...

# Conflicts:
#	packages/db/src/migrations/meta/0030_snapshot.json
#	packages/db/src/migrations/meta/_journal.json
2026-03-16 16:02:37 -05:00

1093 lines
39 KiB
TypeScript

import type { Db } from "@paperclipai/db";
import { pluginLogs, agentTaskSessions as agentTaskSessionsTable } from "@paperclipai/db";
import { eq, and, like, desc } from "drizzle-orm";
import type {
HostServices,
Company,
Agent,
Project,
Issue,
Goal,
PluginWorkspace,
IssueComment,
} from "@paperclipai/plugin-sdk";
import { companyService } from "./companies.js";
import { agentService } from "./agents.js";
import { projectService } from "./projects.js";
import { issueService } from "./issues.js";
import { goalService } from "./goals.js";
import { heartbeatService } from "./heartbeat.js";
import { subscribeCompanyLiveEvents } from "./live-events.js";
import { randomUUID } from "node:crypto";
import { activityService } from "./activity.js";
import { costService } from "./costs.js";
import { assetService } from "./assets.js";
import { pluginRegistryService } from "./plugin-registry.js";
import { pluginStateStore } from "./plugin-state-store.js";
import { createPluginSecretsHandler } from "./plugin-secrets-handler.js";
import { logActivity } from "./activity-log.js";
import type { PluginEventBus } from "./plugin-event-bus.js";
import { lookup as dnsLookup } from "node:dns/promises";
import type { IncomingMessage, RequestOptions as HttpRequestOptions } from "node:http";
import { request as httpRequest } from "node:http";
import { request as httpsRequest } from "node:https";
import { isIP } from "node:net";
import { logger } from "../middleware/logger.js";
// ---------------------------------------------------------------------------
// SSRF protection for plugin HTTP fetch
// ---------------------------------------------------------------------------
/**
 * Maximum time (ms) a plugin fetch request may take before being aborted.
 * Applied through an AbortController around the pinned HTTP request; the DNS
 * lookup that precedes it is bounded separately by DNS_LOOKUP_TIMEOUT_MS.
 */
const PLUGIN_FETCH_TIMEOUT_MS = 30_000;
/** Maximum time (ms) to wait for a DNS lookup before aborting. */
const DNS_LOOKUP_TIMEOUT_MS = 5_000;
/**
 * Only these protocols are allowed for plugin HTTP requests.
 * Entries keep the trailing colon because they are compared against `URL.protocol`.
 */
const ALLOWED_PROTOCOLS = new Set(["http:", "https:"]);
/**
 * Check if an IP address is in a private/reserved range (RFC 1918, loopback,
 * link-local, etc.) that plugins should never be able to reach.
 *
 * Handles IPv4-mapped IPv6 addresses in both textual forms:
 * - dotted (`::ffff:127.0.0.1`), which Node's dns.lookup may return depending
 *   on OS configuration, and
 * - hexadecimal (`::ffff:7f00:1`), which is how the WHATWG URL parser
 *   serialises a bracketed literal such as `http://[::ffff:127.0.0.1]/`.
 *   Without this, such a URL would slip past every IPv4 check below.
 *
 * @param ip - A textual IPv4 or IPv6 address (not a hostname).
 * @returns `true` when the address must not be contacted by a plugin.
 */
function isPrivateIP(ip: string): boolean {
  const lower = ip.toLowerCase();

  // Unwrap IPv4-mapped IPv6 addresses (::ffff:x.x.x.x) and re-check as IPv4.
  const dottedMapped = lower.match(/^::ffff:(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})$/);
  if (dottedMapped && dottedMapped[1]) return isPrivateIP(dottedMapped[1]);

  // Same address family written as two hex groups (::ffff:hhhh:hhhh).
  const hexMapped = lower.match(/^::ffff:([0-9a-f]{1,4}):([0-9a-f]{1,4})$/);
  if (hexMapped && hexMapped[1] && hexMapped[2]) {
    const high = parseInt(hexMapped[1], 16);
    const low = parseInt(hexMapped[2], 16);
    return isPrivateIP(`${high >> 8}.${high & 0xff}.${low >> 8}.${low & 0xff}`);
  }

  // IPv4 patterns. `secondOctet` is NaN for IPv6 input, so range checks fail closed to `false`.
  const secondOctet = parseInt(ip.split(".")[1] ?? "", 10);
  if (ip.startsWith("0.")) return true; // 0.0.0.0/8 ("this network", includes 0.0.0.0)
  if (ip.startsWith("10.")) return true; // RFC 1918
  if (ip.startsWith("100.") && secondOctet >= 64 && secondOctet <= 127) return true; // RFC 6598 shared address space
  if (ip.startsWith("172.") && secondOctet >= 16 && secondOctet <= 31) return true; // RFC 1918
  if (ip.startsWith("192.168.")) return true; // RFC 1918
  if (ip.startsWith("127.")) return true; // loopback
  if (ip.startsWith("169.254.")) return true; // link-local

  // IPv6 patterns
  if (lower === "::1") return true; // loopback
  if (lower === "::") return true; // unspecified
  if (lower.startsWith("fc") || lower.startsWith("fd")) return true; // ULA
  if (/^fe[89ab][0-9a-f]:/.test(lower)) return true; // link-local fe80::/10
  return false;
}
/**
 * Request-routing metadata used to connect directly to a vetted IP while
 * preserving the original hostname for the HTTP Host header and TLS SNI.
 */
interface ValidatedFetchTarget {
  /** The parsed original URL; path, query, port and credentials are read from it. */
  parsedUrl: URL;
  /** The first resolved address that is not in a private/reserved range. */
  resolvedAddress: string;
  /** The original `URL.host` (includes the port if non-default). */
  hostHeader: string;
  /** Original hostname for TLS SNI; only set for https URLs whose host is not an IP literal. */
  tlsServername?: string;
  /** True when the URL uses the https: protocol. */
  useTls: boolean;
}

/**
 * Validate a URL for plugin fetch: protocol whitelist + private IP blocking.
 *
 * SSRF Prevention Strategy:
 * 1. Parse and validate the URL syntax
 * 2. Enforce protocol whitelist (http/https only)
 * 3. Resolve the hostname to IP(s) via DNS (bounded by DNS_LOOKUP_TIMEOUT_MS)
 * 4. Discard every resolved IP that is private/reserved; fail if none remain
 * 5. Return the first remaining IP so the caller connects to exactly that
 *    address instead of resolving the hostname a second time
 *
 * Step 5 closes the DNS-rebinding window in which an attacker answers with a
 * safe IP during validation and a private IP when the request is made.
 *
 * @param urlString - The URL requested by the plugin.
 * @returns Routing metadata for a connection pinned to the vetted IP.
 * @throws Error when the URL is invalid, the protocol is not allowed, DNS
 *   fails, times out or returns nothing, or every resolved IP is private.
 */
async function validateAndResolveFetchUrl(urlString: string): Promise<ValidatedFetchTarget> {
  let parsed: URL;
  try {
    parsed = new URL(urlString);
  } catch {
    throw new Error(`Invalid URL: ${urlString}`);
  }
  if (!ALLOWED_PROTOCOLS.has(parsed.protocol)) {
    throw new Error(
      `Disallowed protocol "${parsed.protocol}" — only http: and https: are permitted`,
    );
  }

  const originalHostname = parsed.hostname.replace(/^\[|\]$/g, ""); // strip IPv6 brackets
  const hostHeader = parsed.host; // includes port if non-default

  // Race the DNS lookup against a timeout to prevent indefinite hangs when DNS
  // is misconfigured or unresponsive. The timeout error is created up front so
  // it can be recognised by identity rather than by message prefix.
  const timeoutError = new Error(
    `DNS lookup timed out after ${DNS_LOOKUP_TIMEOUT_MS}ms for ${originalHostname}`,
  );
  let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
  const timeoutPromise = new Promise<never>((_, reject) => {
    timeoutHandle = setTimeout(() => reject(timeoutError), DNS_LOOKUP_TIMEOUT_MS);
  });

  const resolveWithTimeout = async () => {
    try {
      return await Promise.race([dnsLookup(originalHostname, { all: true }), timeoutPromise]);
    } catch (err) {
      if (err === timeoutError) throw err;
      const reason = err instanceof Error ? err.message : String(err);
      throw new Error(`DNS resolution failed for ${originalHostname}: ${reason}`);
    } finally {
      // Always release the timer; otherwise it stays pending (and keeps the
      // event loop busy) for the full timeout after every successful lookup.
      clearTimeout(timeoutHandle);
    }
  };

  const results = await resolveWithTimeout();
  if (results.length === 0) {
    throw new Error(`DNS resolution returned no results for ${originalHostname}`);
  }

  // Filter to only non-private IPs instead of rejecting the entire request
  // when some IPs are private. This handles multi-homed hosts that resolve
  // to both private and public addresses.
  const safeResults = results.filter((entry) => !isPrivateIP(entry.address));
  const resolved = safeResults[0];
  if (!resolved) {
    throw new Error(
      `All resolved IPs for ${originalHostname} are in private/reserved ranges`,
    );
  }

  const useTls = parsed.protocol === "https:";
  return {
    parsedUrl: parsed,
    resolvedAddress: resolved.address,
    hostHeader,
    // SNI must carry a DNS name; it is omitted when the URL host is an IP literal.
    tlsServername: useTls && isIP(originalHostname) === 0 ? originalHostname : undefined,
    useTls,
  };
}
/**
 * Translate a validated fetch target plus fetch-style `init` into options for
 * Node's `http.request` / `https.request`.
 *
 * The connection host is the pinned IP, while the Host header and TLS
 * servername carry the original hostname. Non-string bodies are converted with
 * `String(...)`. A content-length header is added for a body unless the caller
 * already supplied content-length or transfer-encoding.
 *
 * @param target - Output of validateAndResolveFetchUrl.
 * @param init - Optional fetch-style request init (method, headers, body).
 * @returns The request options and the body string to write (if any).
 */
function buildPinnedRequestOptions(
  target: ValidatedFetchTarget,
  init?: RequestInit,
): { options: HttpRequestOptions & { servername?: string }; body: string | undefined } {
  const { parsedUrl, resolvedAddress, hostHeader, tlsServername, useTls } = target;

  const headers = new Headers(init?.headers);

  let body: string | undefined;
  const rawBody = init?.body;
  if (rawBody !== undefined && rawBody !== null) {
    body = typeof rawBody === "string" ? rawBody : String(rawBody);
  }

  headers.set("Host", hostHeader);
  const framingDeclared = headers.has("content-length") || headers.has("transfer-encoding");
  if (body !== undefined && !framingDeclared) {
    headers.set("content-length", String(Buffer.byteLength(body)));
  }

  // Credentials embedded in the URL are percent-encoded; decode them for Node's `auth` option.
  let auth: string | undefined;
  if (parsedUrl.username || parsedUrl.password) {
    auth = `${decodeURIComponent(parsedUrl.username)}:${decodeURIComponent(parsedUrl.password)}`;
  }

  // URL.port is empty when the scheme's default port is in use.
  let port: number;
  if (parsedUrl.port) {
    port = Number(parsedUrl.port);
  } else {
    port = useTls ? 443 : 80;
  }

  return {
    options: {
      protocol: parsedUrl.protocol,
      host: resolvedAddress,
      port,
      path: `${parsedUrl.pathname}${parsedUrl.search}`,
      method: init?.method ?? "GET",
      headers: Object.fromEntries(headers.entries()),
      auth,
      servername: tlsServername,
    },
    body,
  };
}
/**
 * Perform a single HTTP(S) request against the pre-resolved address in
 * `target` and buffer the complete response in memory.
 *
 * The request options come from buildPinnedRequestOptions, so the socket is
 * opened to `target.resolvedAddress`. The response is returned exactly as
 * received; this function contains no redirect handling.
 *
 * @param target - Validated routing metadata for the request.
 * @param init - Optional fetch-style init (method, headers, body).
 * @param signal - Abort signal passed to the underlying request.
 * @returns Status code (500 when Node reports none), status text, headers
 *   (multi-valued headers joined with ", ") and the body decoded as UTF-8.
 * @throws Rejects on request or response stream errors, including the error
 *   raised here when the body exceeds MAX_RESPONSE_BODY_BYTES.
 */
async function executePinnedHttpRequest(
target: ValidatedFetchTarget,
init: RequestInit | undefined,
signal: AbortSignal,
): Promise<{ status: number; statusText: string; headers: Record<string, string>; body: string }> {
const { options, body } = buildPinnedRequestOptions(target, init);
// Phase 1: send the request and wait for the response headers.
const response = await new Promise<IncomingMessage>((resolve, reject) => {
const requestFn = target.useTls ? httpsRequest : httpRequest;
const req = requestFn({ ...options, signal }, resolve);
req.on("error", reject);
if (body !== undefined) {
req.write(body);
}
req.end();
});
// Phase 2: collect the body, enforcing a hard cap on buffered bytes.
const MAX_RESPONSE_BODY_BYTES = 200 * 1024 * 1024; // 200 MB
const chunks: Buffer[] = [];
let totalBytes = 0;
await new Promise<void>((resolve, reject) => {
response.on("data", (chunk: Buffer | string) => {
const buf = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
totalBytes += buf.length;
if (totalBytes > MAX_RESPONSE_BODY_BYTES) {
// Drop what was buffered so far and tear the stream down; the error
// passed to destroy() surfaces through the "error" listener below.
chunks.length = 0;
response.destroy(new Error(`Response body exceeded ${MAX_RESPONSE_BODY_BYTES} bytes`));
return;
}
chunks.push(buf);
});
response.on("end", resolve);
response.on("error", reject);
});
// Flatten Node's header map (string | string[] | undefined) to plain strings.
const headers: Record<string, string> = {};
for (const [key, value] of Object.entries(response.headers)) {
if (Array.isArray(value)) {
headers[key] = value.join(", ");
} else if (value !== undefined) {
headers[key] = value;
}
}
return {
status: response.statusCode ?? 500,
statusText: response.statusMessage ?? "",
headers,
// NOTE(review): the body is always decoded as UTF-8, so binary payloads
// will not round-trip — confirm callers only expect text.
body: Buffer.concat(chunks).toString("utf8"),
};
}
/** Matches a canonical UUID (versions 1-5, RFC 4122 variant), case-insensitively. */
const UUID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i;
/** Matches any string containing a forward slash or backslash. */
const PATH_LIKE_PATTERN = /[\\/]/;
/** Matches a Windows drive prefix such as `C:\` or `C:/`. */
const WINDOWS_DRIVE_PATH_PATTERN = /^[A-Za-z]:[\\/]/;

/**
 * Heuristic: does `value` look like a filesystem path?
 *
 * After trimming, a value counts as a path when it contains a path separator
 * or starts with a Windows drive prefix, unless the whole value is a UUID.
 */
function looksLikePath(value: string): boolean {
  const candidate = value.trim();
  if (UUID_PATTERN.test(candidate)) return false;
  return PATH_LIKE_PATTERN.test(candidate) || WINDOWS_DRIVE_PATH_PATTERN.test(candidate);
}
/**
 * Trim a workspace text field and blank it out when it is empty or is nothing
 * but a UUID.
 *
 * @returns The trimmed text, or "" when there is nothing presentable.
 */
function sanitizeWorkspaceText(value: string): string {
  const text = value.trim();
  const isBlank = text.length === 0;
  return isBlank || UUID_PATTERN.test(text) ? "" : text;
}
/**
 * Return the trimmed working directory when it looks like a filesystem path;
 * otherwise (null, empty, or not path-like) return "".
 */
function sanitizeWorkspacePath(cwd: string | null): string {
  if (cwd && looksLikePath(cwd)) {
    return cwd.trim();
  }
  return "";
}
/**
 * Pick a display name for a workspace.
 *
 * Uses `name` when it survives sanitizeWorkspaceText and does not itself look
 * like a path. Otherwise falls back to the last segment of `fallbackPath`
 * (trailing separators ignored), and finally to the literal "Workspace".
 */
function sanitizeWorkspaceName(name: string, fallbackPath: string): string {
  const candidate = sanitizeWorkspaceText(name);
  const usable = candidate !== "" && !looksLikePath(candidate);
  if (usable) return candidate;

  const withoutTrailingSeparators = fallbackPath.trim().replace(/[\\/]+$/, "");
  const parts = withoutTrailingSeparators
    .split(/[\\/]/)
    .filter((part) => part.length > 0);
  return parts.pop() ?? "Workspace";
}
// ---------------------------------------------------------------------------
// Buffered plugin log writes
// ---------------------------------------------------------------------------
/** How many buffered log entries trigger an immediate flush. */
const LOG_BUFFER_FLUSH_SIZE = 100;
/** How often (ms) the buffer is flushed regardless of size. */
const LOG_BUFFER_FLUSH_INTERVAL_MS = 5_000;
/**
 * Max length for a single plugin log message. Compared against
 * `String.length`, i.e. counted in UTF-16 code units rather than bytes.
 */
const MAX_LOG_MESSAGE_LENGTH = 10_000;
/** Max serialised JSON size (in `String.length` units) for plugin log meta objects. */
const MAX_LOG_META_JSON_LENGTH = 50_000;
/** Max length for a metric name. */
const MAX_METRIC_NAME_LENGTH = 500;
/**
 * Pino reserved field names that plugins must not overwrite.
 * Only top-level meta keys are checked against this set.
 */
const PINO_RESERVED_KEYS = new Set([
"level",
"time",
"pid",
"hostname",
"msg",
"v",
]);
/**
 * Truncate a string to at most `max` UTF-16 code units, appending a marker if
 * anything was cut.
 *
 * If the cut would fall between the two halves of a surrogate pair, the cut
 * moves back one unit so the result never ends in a lone high surrogate
 * (which would otherwise be stored or rendered as a replacement character).
 *
 * @param s - The string to limit.
 * @param max - Maximum number of code units to keep from `s`.
 * @returns `s` unchanged when short enough, otherwise the kept prefix plus
 *   "...[truncated]".
 */
function truncStr(s: string, max: number): string {
  if (s.length <= max) return s;
  let end = max;
  if (end > 0) {
    const lastKept = s.charCodeAt(end - 1);
    const firstDropped = s.charCodeAt(end);
    const splitsPair =
      lastKept >= 0xd800 && lastKept <= 0xdbff &&
      firstDropped >= 0xdc00 && firstDropped <= 0xdfff;
    if (splitsPair) end -= 1;
  }
  return s.slice(0, end) + "...[truncated]";
}
/**
 * Sanitise a plugin-supplied meta object before it is logged or persisted.
 *
 * - `null`/`undefined` become `null`.
 * - Top-level keys listed in PINO_RESERVED_KEYS are dropped.
 * - If the remaining object cannot be JSON-serialised, or its JSON exceeds
 *   MAX_LOG_META_JSON_LENGTH, a small `{ _sanitised, _error }` placeholder is
 *   returned instead of the plugin's data.
 */
function sanitiseMeta(meta: Record<string, unknown> | null | undefined): Record<string, unknown> | null {
  if (meta === null || meta === undefined) return null;

  const cleaned: Record<string, unknown> = {};
  for (const key of Object.keys(meta)) {
    if (PINO_RESERVED_KEYS.has(key)) continue;
    cleaned[key] = meta[key];
  }

  // Serialise once purely to measure size and to detect unserialisable values.
  let serialised: string;
  try {
    serialised = JSON.stringify(cleaned);
  } catch {
    return { _sanitised: true, _error: "meta was not JSON-serialisable" };
  }

  return serialised.length > MAX_LOG_META_JSON_LENGTH
    ? { _sanitised: true, _error: `meta exceeded ${MAX_LOG_META_JSON_LENGTH} chars` }
    : cleaned;
}
/** One pending plugin_logs row, together with the db handle it must be written through. */
interface BufferedLogEntry {
// Database handle the entry will be inserted with; flushes group entries by this identity.
db: Db;
pluginId: string;
// Free-form level string; metric writes use the level "metric".
level: string;
message: string;
meta: Record<string, unknown> | null;
}
// Module-level buffer shared by every buildHostServices() instance in this process.
const _logBuffer: BufferedLogEntry[] = [];
/**
 * Write every buffered plugin log entry to the database, using one batch
 * insert per distinct db instance.
 *
 * The buffer is emptied before any insert starts, so overlapping flushes never
 * insert the same entry twice. A failed insert is reported through the logger
 * (falling back to console.error) and its entries are dropped; this function
 * itself never rejects because of a database error.
 */
export async function flushPluginLogBuffer(): Promise<void> {
  if (_logBuffer.length === 0) return;

  // Take ownership of everything currently buffered in one synchronous step.
  const drained = _logBuffer.splice(0);

  // Bucket the rows by db identity so multi-db scenarios are handled correctly.
  const rowsByDb = new Map<Db, Array<Omit<BufferedLogEntry, "db">>>();
  for (const { db, pluginId, level, message, meta } of drained) {
    const row = { pluginId, level, message, meta };
    const bucket = rowsByDb.get(db);
    if (bucket === undefined) {
      rowsByDb.set(db, [row]);
    } else {
      bucket.push(row);
    }
  }

  for (const [dbInstance, rows] of rowsByDb) {
    try {
      await dbInstance.insert(pluginLogs).values(rows);
    } catch (err) {
      try {
        logger.warn({ err, count: rows.length }, "Failed to batch-persist plugin logs to DB");
      } catch {
        console.error("[plugin-host-services] Batch log flush failed:", err);
      }
    }
  }
}
/** Interval handle for the periodic log flush, started when this module is loaded. */
const _logFlushInterval = setInterval(() => {
  flushPluginLogBuffer().catch((err) => {
    console.error("[plugin-host-services] Periodic log flush error:", err);
  });
}, LOG_BUFFER_FLUSH_INTERVAL_MS);
// Unref the timer (where supported) so it doesn't keep the process alive, e.g. in tests.
_logFlushInterval.unref?.();
/** Maximum time (ms) to keep a session event subscription alive before forcing cleanup. */
const SESSION_EVENT_SUBSCRIPTION_TIMEOUT_MS = 30 * 60 * 1_000; // 30 minutes
/**
 * buildHostServices — creates a concrete implementation of the `HostServices`
 * interface for a specific plugin.
 *
 * This implementation delegates to the core Paperclip domain services,
 * providing the bridge between the plugin worker's SDK and the host platform.
 *
 * @param db - Database connection instance.
 * @param pluginId - The UUID of the plugin installation record.
 * @param pluginKey - The unique identifier from the plugin manifest (e.g., "acme.linear").
 * @param eventBus - The system-wide event bus for publishing plugin events.
 * @param notifyWorker - Optional callback used to push notifications to the
 *   plugin worker (subscribed bus events as "onEvent", agent session updates as
 *   "agents.sessions.event"). When omitted, nothing is forwarded.
 * @returns An object implementing the HostServices interface for the plugin SDK,
 *   plus a `dispose()` method that must be called when the worker goes away.
 */
export function buildHostServices(
db: Db,
pluginId: string,
pluginKey: string,
eventBus: PluginEventBus,
notifyWorker?: (method: string, params: unknown) => void,
): HostServices & { dispose(): void } {
// Domain-service facades bound to this db handle; the host-service namespaces
// returned below delegate to them.
const registry = pluginRegistryService(db);
const stateStore = pluginStateStore(db);
// Secret resolution is bound to this plugin installation's id.
const secretsHandler = createPluginSecretsHandler({ db, pluginId });
const companies = companyService(db);
const agents = agentService(db);
const heartbeat = heartbeatService(db);
const projects = projectService(db);
const issues = issueService(db);
const goals = goalService(db);
// NOTE(review): activity, costs and assets are not referenced by any of the
// namespaces shown in this function — confirm whether they are still needed.
const activity = activityService(db);
const costs = costService(db);
const assets = assetService(db);
// Event-bus view obtained for this plugin's manifest key.
const scopedBus = eventBus.forPlugin(pluginKey);
// Track active session event subscriptions for cleanup
const activeSubscriptions = new Set<{ unsubscribe: () => void; timer: ReturnType<typeof setTimeout> }>();
// Set by dispose(); agentSessions.sendMessage refuses to start once this is true.
let disposed = false;
/** Return `companyId` when it is a non-empty string; otherwise throw. */
const ensureCompanyId = (companyId?: string) => {
  if (companyId) return companyId;
  throw new Error("companyId is required for this operation");
};
/**
 * Coerce a plugin-supplied limit/offset into a non-negative integer.
 *
 * Accepts finite numbers and non-blank numeric strings; the value is floored
 * and clamped to zero. Anything else yields `null`.
 */
const parseWindowValue = (value: unknown): number | null => {
  let numeric: number;
  if (typeof value === "number") {
    numeric = value;
  } else if (typeof value === "string" && value.trim().length > 0) {
    numeric = Number(value);
  } else {
    return null;
  }
  return Number.isFinite(numeric) ? Math.max(0, Math.floor(numeric)) : null;
};
/**
 * Apply offset/limit windowing to an already-loaded list of rows.
 *
 * A missing or unusable offset means 0; a missing or unusable limit means
 * "everything from the offset onwards".
 */
const applyWindow = <T>(rows: T[], params?: { limit?: unknown; offset?: unknown }): T[] => {
  const start = parseWindowValue(params?.offset) ?? 0;
  const count = parseWindowValue(params?.limit);
  const end = count === null ? undefined : start + count;
  return rows.slice(start, end);
};
/**
 * Plugins are instance-wide in the current runtime. Company IDs are still
 * required for company-scoped data access, but there is no per-company
 * availability gate to enforce here.
 *
 * Intentionally a no-op: it is awaited at the start of company-scoped
 * operations so a real availability check can be added in this one place.
 */
const ensurePluginAvailableForCompany = async (_companyId: string) => {};
/**
 * Type guard: true only when `record` exists and its `companyId` equals the
 * given company. Used to keep plugins from reading across company boundaries.
 */
const inCompany = <T extends { companyId: string | null | undefined }>(
  record: T | null | undefined,
  companyId: string,
): record is T => {
  if (!record) return false;
  return record.companyId === companyId;
};
/**
 * Like inCompany, but throws `"<entityName> not found"` instead of returning
 * false. A record from another company is reported the same way as a missing one.
 */
const requireInCompany = <T extends { companyId: string | null | undefined }>(
  entityName: string,
  record: T | null | undefined,
  companyId: string,
): T => {
  if (inCompany(record, companyId)) {
    return record;
  }
  throw new Error(`${entityName} not found`);
};
return {
// Plugin configuration: returns the stored config JSON for this plugin
// installation, or an empty object when there is no config row or no configJson.
config: {
async get() {
const configRow = await registry.getConfig(pluginId);
return configRow?.configJson ?? {};
},
},
// Plugin key/value state. Every call passes this plugin's id to the state
// store together with the caller's scope kind/id, namespace and state key.
// NOTE(review): scopeKind is cast to `any`, so it is not validated at compile
// time here — presumably the state store validates it; confirm.
state: {
async get(params) {
return stateStore.get(pluginId, params.scopeKind as any, params.stateKey, {
scopeId: params.scopeId,
namespace: params.namespace,
});
},
async set(params) {
await stateStore.set(pluginId, {
scopeKind: params.scopeKind as any,
scopeId: params.scopeId,
namespace: params.namespace,
stateKey: params.stateKey,
value: params.value,
});
},
async delete(params) {
await stateStore.delete(pluginId, params.scopeKind as any, params.stateKey, {
scopeId: params.scopeId,
namespace: params.namespace,
});
},
},
// Plugin-owned entity records, delegated to the plugin registry under this
// plugin's id. Parameters and results are passed through with `any` casts.
entities: {
async upsert(params) {
return registry.upsertEntity(pluginId, params as any) as any;
},
async list(params) {
return registry.listEntities(pluginId, params as any) as any;
},
},
// Event bus access through the plugin-scoped bus.
events: {
// Emit an event through the plugin-scoped bus. companyId is optional here
// and is only run through the availability hook when present.
async emit(params) {
if (params.companyId) {
await ensurePluginAvailableForCompany(params.companyId);
}
await scopedBus.emit(params.name, params.companyId, params.payload);
},
// Register a bus subscription whose handler forwards each matching event to
// the worker as an "onEvent" notification. Without notifyWorker the handler
// does nothing. No unsubscribe handle is returned; subscriptions are removed
// in bulk by dispose() via scopedBus.clear().
async subscribe(params: { eventPattern: string; filter?: Record<string, unknown> | null }) {
const handler = async (event: import("@paperclipai/plugin-sdk").PluginEvent) => {
if (notifyWorker) {
notifyWorker("onEvent", { event });
}
};
if (params.filter) {
scopedBus.subscribe(params.eventPattern as any, params.filter as any, handler);
} else {
scopedBus.subscribe(params.eventPattern as any, handler);
}
},
},
// Outbound HTTP for plugins, with SSRF protection.
http: {
async fetch(params) {
// SSRF protection: validate protocol whitelist + block private IPs.
// Resolve once, then connect directly to that IP to prevent DNS rebinding.
const target = await validateAndResolveFetchUrl(params.url);
// The fetch timeout starts only after validation/DNS has completed, so it
// bounds the request itself, not the preceding lookup.
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), PLUGIN_FETCH_TIMEOUT_MS);
try {
const init = params.init as RequestInit | undefined;
return await executePinnedHttpRequest(target, init, controller.signal);
} finally {
// Always release the timer, whether the request succeeded or failed.
clearTimeout(timeout);
}
},
},
// Secret resolution, delegated to the handler created for this plugin id.
secrets: {
async resolve(params) {
return secretsHandler.resolve(params);
},
},
// Activity log entries written on behalf of the plugin.
activity: {
// Requires a companyId. The entry is attributed to actor type "system" with
// the plugin id as actor; the plugin's message becomes the `action`, and the
// entity defaults to ("plugin", pluginId) when none is given.
async log(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
await logActivity(db, {
companyId,
actorType: "system",
actorId: pluginId,
action: params.message,
entityType: params.entityType ?? "plugin",
entityId: params.entityId ?? pluginId,
details: params.metadata,
});
},
},
// Plugin metrics, recorded as plugin_logs rows with level "metric".
metrics: {
async write(params) {
// The metric name is plugin-supplied; coerce to string and cap its length.
const safeName = truncStr(String(params.name ?? ""), MAX_METRIC_NAME_LENGTH);
logger.debug({ pluginId, name: safeName, value: params.value, tags: params.tags }, "Plugin metric write");
// Persist metrics to plugin_logs via the batch buffer (same path as
// logger.log) so they benefit from batched writes and are flushed
// reliably on shutdown. Using level "metric" makes them queryable
// alongside regular logs via the same API (§26).
_logBuffer.push({
db,
pluginId,
level: "metric",
message: safeName,
meta: sanitiseMeta({ value: params.value, tags: params.tags ?? null }),
});
// Size-triggered flush is fire-and-forget; failures are only reported.
if (_logBuffer.length >= LOG_BUFFER_FLUSH_SIZE) {
flushPluginLogBuffer().catch((err) => {
console.error("[plugin-host-services] Triggered metric flush failed:", err);
});
}
},
},
// Plugin logging: mirrors each entry to the host logger and queues it for
// persistence in plugin_logs.
logger: {
  async log(params) {
    const level = params.level;
    // Message and meta are plugin-supplied: cap the message and sanitise the meta.
    const safeMessage = truncStr(String(params.message ?? ""), MAX_LOG_MESSAGE_LENGTH);
    const safeMeta = sanitiseMeta(params.meta);

    const pluginLogger = logger.child({ service: "plugin-worker", pluginId });
    const logFields = {
      ...safeMeta,
      pluginLogLevel: level,
      pluginTimestamp: new Date().toISOString(),
    };
    const line = `[plugin] ${safeMessage}`;
    // Unknown or missing levels are logged at info.
    switch (level) {
      case "error":
        pluginLogger.error(logFields, line);
        break;
      case "warn":
        pluginLogger.warn(logFields, line);
        break;
      case "debug":
        pluginLogger.debug(logFields, line);
        break;
      default:
        pluginLogger.info(logFields, line);
    }

    // Persist to plugin_logs table via the module-level batch buffer (§26.1).
    // Fire-and-forget — logging should never block the worker.
    _logBuffer.push({
      db,
      pluginId,
      level: level ?? "info",
      message: safeMessage,
      meta: safeMeta,
    });
    if (_logBuffer.length < LOG_BUFFER_FLUSH_SIZE) return;
    flushPluginLogBuffer().catch((err) => {
      console.error("[plugin-host-services] Triggered log flush failed:", err);
    });
  },
},
// Company lookups.
companies: {
// Loads every company via the company service, then applies limit/offset in memory.
async list(params) {
return applyWindow((await companies.list()) as Company[], params);
},
// NOTE(review): unlike the other namespaces this does not call
// ensureCompanyId, and the result is cast to Company without a null check —
// confirm what companies.getById returns for an unknown id.
async get(params) {
await ensurePluginAvailableForCompany(params.companyId);
return (await companies.getById(params.companyId)) as Company;
},
},
// Project and workspace lookups. Every method requires a companyId; records
// that belong to a different company are treated as missing (null / []).
projects: {
// Loads all projects of the company, then applies limit/offset in memory.
async list(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
return applyWindow((await projects.list(companyId)) as Project[], params);
},
async get(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const project = await projects.getById(params.projectId);
return (inCompany(project, companyId) ? project : null) as Project | null;
},
// Lists the project's workspace rows, exposing sanitised name/path values
// and ISO-8601 timestamps.
async listWorkspaces(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const project = await projects.getById(params.projectId);
if (!inCompany(project, companyId)) return [];
const rows = await projects.listWorkspaces(params.projectId);
return rows.map((row) => {
// path is "" unless cwd looks like a filesystem path; name falls back to
// the last path segment when the stored name is unusable.
const path = sanitizeWorkspacePath(row.cwd);
const name = sanitizeWorkspaceName(row.name, path);
return {
id: row.id,
projectId: row.projectId,
name,
path,
isPrimary: row.isPrimary,
createdAt: row.createdAt.toISOString(),
updatedAt: row.updatedAt.toISOString(),
};
});
},
// Describes the project's primary workspace. When the project has no
// primaryWorkspace row, a synthetic entry with id "<projectId>:managed" is
// built from the project's own name and timestamps. The path always comes
// from project.codebase.effectiveLocalFolder.
async getPrimaryWorkspace(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const project = await projects.getById(params.projectId);
if (!inCompany(project, companyId)) return null;
const row = project.primaryWorkspace;
const path = sanitizeWorkspacePath(project.codebase.effectiveLocalFolder);
const name = sanitizeWorkspaceName(row?.name ?? project.name, path);
return {
id: row?.id ?? `${project.id}:managed`,
projectId: project.id,
name,
path,
isPrimary: true,
createdAt: (row?.createdAt ?? project.createdAt).toISOString(),
updatedAt: (row?.updatedAt ?? project.updatedAt).toISOString(),
};
},
// Resolves issue -> project -> primary workspace. Returns null when the
// issue or project is missing, belongs to another company, or the issue has
// no projectId. The result has the same shape as getPrimaryWorkspace.
async getWorkspaceForIssue(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const issue = await issues.getById(params.issueId);
if (!inCompany(issue, companyId)) return null;
const projectId = (issue as Record<string, unknown>).projectId as string | null;
if (!projectId) return null;
const project = await projects.getById(projectId);
if (!inCompany(project, companyId)) return null;
const row = project.primaryWorkspace;
const path = sanitizeWorkspacePath(project.codebase.effectiveLocalFolder);
const name = sanitizeWorkspaceName(row?.name ?? project.name, path);
return {
id: row?.id ?? `${project.id}:managed`,
projectId: project.id,
name,
path,
isPrimary: true,
createdAt: (row?.createdAt ?? project.createdAt).toISOString(),
updatedAt: (row?.updatedAt ?? project.updatedAt).toISOString(),
};
},
},
// Issue access. Every method requires a companyId. Reads treat issues from
// another company as missing (null / []); writes throw "Issue not found".
issues: {
// The full params object is forwarded to the issue service as its filter
// (cast to any); limit/offset are then applied in memory.
async list(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
return applyWindow((await issues.list(companyId, params as any)) as Issue[], params);
},
async get(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const issue = await issues.getById(params.issueId);
return (inCompany(issue, companyId) ? issue : null) as Issue | null;
},
// The whole params object (cast to any) is passed as the creation payload.
async create(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
return (await issues.create(companyId, params as any)) as Issue;
},
// Ownership is verified with a separate read before the update is applied.
async update(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
requireInCompany("Issue", await issues.getById(params.issueId), companyId);
return (await issues.update(params.issueId, params.patch as any)) as Issue;
},
async listComments(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
if (!inCompany(await issues.getById(params.issueId), companyId)) return [];
return (await issues.listComments(params.issueId)) as IssueComment[];
},
// The comment is added with an empty third argument to issues.addComment.
// NOTE(review): presumably that argument carries author/actor info, so
// plugin comments are unattributed — confirm against issueService.
async createComment(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
requireInCompany("Issue", await issues.getById(params.issueId), companyId);
return (await issues.addComment(
params.issueId,
params.body,
{},
)) as IssueComment;
},
},
// Agent access and control. Every method requires a companyId. Reads treat
// agents from another company as missing; control operations throw
// "Agent not found".
agents: {
// Loads all agents of the company, filters by status when one is given,
// then applies limit/offset in memory.
async list(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const rows = await agents.list(companyId);
return applyWindow(
rows.filter((agent) => !params.status || agent.status === params.status) as Agent[],
params,
);
},
async get(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const agent = await agents.getById(params.agentId);
return (inCompany(agent, companyId) ? agent : null) as Agent | null;
},
async pause(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const agent = await agents.getById(params.agentId);
requireInCompany("Agent", agent, companyId);
return (await agents.pause(params.agentId)) as Agent;
},
async resume(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const agent = await agents.getById(params.agentId);
requireInCompany("Agent", agent, companyId);
return (await agents.resume(params.agentId)) as Agent;
},
// Requests a heartbeat wakeup for the agent with the plugin's prompt as
// payload, attributed to actor type "system" with the plugin id as actor.
// Throws when the heartbeat service returns no run.
async invoke(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const agent = await agents.getById(params.agentId);
requireInCompany("Agent", agent, companyId);
const run = await heartbeat.wakeup(params.agentId, {
source: "automation",
triggerDetail: "system",
reason: params.reason ?? null,
payload: { prompt: params.prompt },
requestedByActorType: "system",
requestedByActorId: pluginId,
});
if (!run) throw new Error("Agent wakeup was skipped by heartbeat policy");
return { runId: run.id };
},
},
// Goal access. Every method requires a companyId. Reads treat goals from
// another company as missing; update throws "Goal not found".
goals: {
// Loads all goals of the company, filters by level and/or status when
// given, then applies limit/offset in memory.
async list(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const rows = await goals.list(companyId);
return applyWindow(
rows.filter((goal) =>
(!params.level || goal.level === params.level) &&
(!params.status || goal.status === params.status),
) as Goal[],
params,
);
},
async get(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const goal = await goals.getById(params.goalId);
return (inCompany(goal, companyId) ? goal : null) as Goal | null;
},
// Only the listed fields are forwarded to the goal service.
// NOTE(review): parentId and ownerAgentId are not checked against the
// company here — confirm the goal service validates them.
async create(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
return (await goals.create(companyId, {
title: params.title,
description: params.description,
level: params.level as any,
status: params.status as any,
parentId: params.parentId,
ownerAgentId: params.ownerAgentId,
})) as Goal;
},
// Ownership is verified with a separate read before the update is applied.
async update(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
requireInCompany("Goal", await goals.getById(params.goalId), companyId);
return (await goals.update(params.goalId, params.patch as any)) as Goal;
},
},
agentSessions: {
/**
 * Create an agent task session for an agent in the given company.
 *
 * The session row copies the agent's adapter type. When no taskKey is
 * supplied, one is generated as `plugin:<pluginKey>:session:<uuid>`.
 *
 * NOTE(review): list/sendMessage/close only match task keys with the
 * `plugin:<pluginKey>:session:` prefix, so a session created with a custom
 * taskKey that lacks that prefix will not be found by them — confirm whether
 * custom keys should be namespaced.
 *
 * @throws Error when companyId is missing, the agent is not in the company,
 *   or the insert returns no row.
 */
async create(params) {
  const companyId = ensureCompanyId(params.companyId);
  await ensurePluginAvailableForCompany(companyId);
  // Use the narrowed return value instead of a non-null assertion on the lookup.
  const agent = requireInCompany("Agent", await agents.getById(params.agentId), companyId);
  const taskKey = params.taskKey ?? `plugin:${pluginKey}:session:${randomUUID()}`;
  const [row] = await db
    .insert(agentTaskSessionsTable)
    .values({
      companyId,
      agentId: params.agentId,
      adapterType: agent.adapterType,
      taskKey,
      sessionParamsJson: null,
      sessionDisplayId: null,
      lastRunId: null,
      lastError: null,
    })
    .returning();
  // Fail with a clear message rather than a TypeError on `row.id` below.
  if (!row) throw new Error("Failed to create agent session");
  return {
    sessionId: row.id,
    agentId: params.agentId,
    companyId,
    status: "active" as const,
    createdAt: row.createdAt.toISOString(),
  };
},
// List this plugin's sessions for one agent in the company, newest first.
// Only rows whose taskKey starts with `plugin:<pluginKey>:session:` match.
// The reported status is always "active"; it is not read from the row.
async list(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const rows = await db
.select()
.from(agentTaskSessionsTable)
.where(
and(
eq(agentTaskSessionsTable.agentId, params.agentId),
eq(agentTaskSessionsTable.companyId, companyId),
// NOTE(review): pluginKey is interpolated into a LIKE pattern, so any
// `%` or `_` in it acts as a wildcard — confirm keys cannot contain them.
like(agentTaskSessionsTable.taskKey, `plugin:${pluginKey}:session:%`),
),
)
.orderBy(desc(agentTaskSessionsTable.createdAt));
return rows.map((row) => ({
sessionId: row.id,
agentId: row.agentId,
companyId: row.companyId,
status: "active" as const,
createdAt: row.createdAt.toISOString(),
}));
},
/**
 * Send a prompt to an existing plugin session by waking the session's agent,
 * and (when a worker notifier is available) stream the resulting run's live
 * events back to the worker as "agents.sessions.event" notifications.
 *
 * The live-event subscription ends when the run reports a terminal status,
 * when SESSION_EVENT_SUBSCRIPTION_TIMEOUT_MS elapses, or on dispose().
 * Events emitted before the subscription is attached are not replayed.
 *
 * @returns The id of the run that was started.
 * @throws Error when the host services are disposed, companyId is missing,
 *   the session is not found for this plugin/company, or the heartbeat
 *   service returns no run.
 */
async sendMessage(params) {
  if (disposed) {
    throw new Error("Host services have been disposed");
  }
  const companyId = ensureCompanyId(params.companyId);
  await ensurePluginAvailableForCompany(companyId);

  // Verify session exists and belongs to this plugin
  const session = await db
    .select()
    .from(agentTaskSessionsTable)
    .where(
      and(
        eq(agentTaskSessionsTable.id, params.sessionId),
        eq(agentTaskSessionsTable.companyId, companyId),
        like(agentTaskSessionsTable.taskKey, `plugin:${pluginKey}:session:%`),
      ),
    )
    .then((rows) => rows[0] ?? null);
  if (!session) throw new Error(`Session not found: ${params.sessionId}`);

  const run = await heartbeat.wakeup(session.agentId, {
    source: "automation",
    triggerDetail: "system",
    reason: params.reason ?? null,
    payload: { prompt: params.prompt },
    contextSnapshot: {
      taskKey: session.taskKey,
      wakeSource: "automation",
      wakeTriggerDetail: "system",
    },
    requestedByActorType: "system",
    requestedByActorId: pluginId,
  });
  if (!run) throw new Error("Agent wakeup was skipped by heartbeat policy");

  // Subscribe to live events and forward to the plugin worker as notifications.
  // `disposed` is re-checked because dispose() may have run during the awaits
  // above; registering a subscription after that would leave a listener and a
  // 30-minute timer that nothing cleans up.
  if (notifyWorker && !disposed) {
    const TERMINAL_STATUSES = new Set(["succeeded", "failed", "cancelled", "timed_out"]);

    // Declared up front so cleanup() is safe to call at any point, even if the
    // listener were to fire before subscription setup has finished.
    let finished = false;
    let unsubscribe: (() => void) | undefined;
    let timeoutTimer: ReturnType<typeof setTimeout> | undefined;
    let entry: { unsubscribe: () => void; timer: ReturnType<typeof setTimeout> } | undefined;

    const cleanup = () => {
      finished = true;
      unsubscribe?.();
      clearTimeout(timeoutTimer);
      if (entry) activeSubscriptions.delete(entry);
    };

    // Safety-net timeout: if the run never reaches a terminal status,
    // force-cleanup the subscription to prevent unbounded leaks.
    timeoutTimer = setTimeout(() => {
      logger.warn(
        { pluginId, pluginKey, runId: run.id },
        "session event subscription timed out — forcing cleanup",
      );
      cleanup();
    }, SESSION_EVENT_SUBSCRIPTION_TIMEOUT_MS);

    unsubscribe = subscribeCompanyLiveEvents(companyId, (event) => {
      const payload = event.payload as Record<string, unknown> | undefined;
      // The subscription is company-wide; ignore events for other runs.
      if (!payload || payload.runId !== run.id) return;

      if (event.type === "heartbeat.run.log" || event.type === "heartbeat.run.event") {
        notifyWorker("agents.sessions.event", {
          sessionId: params.sessionId,
          runId: run.id,
          seq: (payload.seq as number) ?? 0,
          eventType: "chunk",
          stream: (payload.stream as string) ?? null,
          message: (payload.chunk as string) ?? (payload.message as string) ?? null,
          payload: payload,
        });
      } else if (event.type === "heartbeat.run.status") {
        const status = payload.status as string;
        if (TERMINAL_STATUSES.has(status)) {
          notifyWorker("agents.sessions.event", {
            sessionId: params.sessionId,
            runId: run.id,
            seq: 0,
            eventType: status === "succeeded" ? "done" : "error",
            stream: "system",
            message: status === "succeeded" ? "Run completed" : `Run ${status}`,
            payload: payload,
          });
          cleanup();
        } else {
          notifyWorker("agents.sessions.event", {
            sessionId: params.sessionId,
            runId: run.id,
            seq: 0,
            eventType: "status",
            stream: "system",
            message: `Run status: ${status}`,
            payload: payload,
          });
        }
      }
    });

    if (finished) {
      // cleanup() already ran before `unsubscribe` was assigned; detach now.
      unsubscribe();
    } else {
      // Track the subscription so dispose() can tear it down if the run never
      // reaches a terminal status (hang, crash, network partition).
      entry = { unsubscribe, timer: timeoutTimer };
      activeSubscriptions.add(entry);
    }
  }
  return { runId: run.id };
},
// Close a session by deleting its row. Only rows in the given company whose
// taskKey carries this plugin's `plugin:<pluginKey>:session:` prefix can be
// deleted; throws "Session not found" when nothing matched.
// Live-event subscriptions started by sendMessage are not touched here.
async close(params) {
const companyId = ensureCompanyId(params.companyId);
await ensurePluginAvailableForCompany(companyId);
const deleted = await db
.delete(agentTaskSessionsTable)
.where(
and(
eq(agentTaskSessionsTable.id, params.sessionId),
eq(agentTaskSessionsTable.companyId, companyId),
like(agentTaskSessionsTable.taskKey, `plugin:${pluginKey}:session:%`),
),
)
.returning()
.then((rows) => rows.length);
if (deleted === 0) throw new Error(`Session not found: ${params.sessionId}`);
},
},
/**
 * Clean up all active session event subscriptions and flush any buffered
 * log entries. Must be called when the plugin worker is stopped, crashed,
 * or unloaded to prevent leaked listeners and lost log entries.
 *
 * Also marks the services as disposed, after which agentSessions.sendMessage
 * rejects new calls. The log flush is started but not awaited, so dispose()
 * returns before buffered entries are necessarily written.
 */
dispose() {
disposed = true;
// Clear event bus subscriptions to prevent accumulation on worker restart.
// Without this, each crash/restart cycle adds duplicate subscriptions.
scopedBus.clear();
// Snapshot to avoid iterator invalidation from concurrent sendMessage() calls
const snapshot = Array.from(activeSubscriptions);
activeSubscriptions.clear();
for (const entry of snapshot) {
// Cancel the safety-net timer first, then detach the live-event listener.
clearTimeout(entry.timer);
entry.unsubscribe();
}
// Flush any buffered log entries synchronously-as-possible on dispose.
// The buffer is module-wide, so this also flushes entries queued by other plugins.
flushPluginLogBuffer().catch((err) => {
console.error("[plugin-host-services] dispose() log flush failed:", err);
});
},
};
}