Merge pull request #909 from mvanhorn/feat/plugin-domain-event-bridge
feat(plugins): bridge core domain events to plugin event bus
This commit is contained in:
@@ -37,6 +37,7 @@ import { pluginLifecycleManager } from "./services/plugin-lifecycle.js";
|
|||||||
import { createPluginJobCoordinator } from "./services/plugin-job-coordinator.js";
|
import { createPluginJobCoordinator } from "./services/plugin-job-coordinator.js";
|
||||||
import { buildHostServices, flushPluginLogBuffer } from "./services/plugin-host-services.js";
|
import { buildHostServices, flushPluginLogBuffer } from "./services/plugin-host-services.js";
|
||||||
import { createPluginEventBus } from "./services/plugin-event-bus.js";
|
import { createPluginEventBus } from "./services/plugin-event-bus.js";
|
||||||
|
import { setPluginEventBus } from "./services/activity-log.js";
|
||||||
import { createPluginDevWatcher } from "./services/plugin-dev-watcher.js";
|
import { createPluginDevWatcher } from "./services/plugin-dev-watcher.js";
|
||||||
import { createPluginHostServiceCleanup } from "./services/plugin-host-service-cleanup.js";
|
import { createPluginHostServiceCleanup } from "./services/plugin-host-service-cleanup.js";
|
||||||
import { pluginRegistryService } from "./services/plugin-registry.js";
|
import { pluginRegistryService } from "./services/plugin-registry.js";
|
||||||
@@ -141,6 +142,7 @@ export async function createApp(
|
|||||||
const workerManager = createPluginWorkerManager();
|
const workerManager = createPluginWorkerManager();
|
||||||
const pluginRegistry = pluginRegistryService(db);
|
const pluginRegistry = pluginRegistryService(db);
|
||||||
const eventBus = createPluginEventBus();
|
const eventBus = createPluginEventBus();
|
||||||
|
setPluginEventBus(eventBus);
|
||||||
const jobStore = pluginJobStore(db);
|
const jobStore = pluginJobStore(db);
|
||||||
const lifecycle = pluginLifecycleManager(db, { workerManager });
|
const lifecycle = pluginLifecycleManager(db, { workerManager });
|
||||||
const scheduler = createPluginJobScheduler({
|
const scheduler = createPluginJobScheduler({
|
||||||
|
|||||||
@@ -1,8 +1,25 @@
|
|||||||
|
import { randomUUID } from "node:crypto";
|
||||||
import type { Db } from "@paperclipai/db";
|
import type { Db } from "@paperclipai/db";
|
||||||
import { activityLog } from "@paperclipai/db";
|
import { activityLog } from "@paperclipai/db";
|
||||||
|
import { PLUGIN_EVENT_TYPES, type PluginEventType } from "@paperclipai/shared";
|
||||||
|
import type { PluginEvent } from "@paperclipai/plugin-sdk";
|
||||||
import { publishLiveEvent } from "./live-events.js";
|
import { publishLiveEvent } from "./live-events.js";
|
||||||
import { redactCurrentUserValue } from "../log-redaction.js";
|
import { redactCurrentUserValue } from "../log-redaction.js";
|
||||||
import { sanitizeRecord } from "../redaction.js";
|
import { sanitizeRecord } from "../redaction.js";
|
||||||
|
import { logger } from "../middleware/logger.js";
|
||||||
|
import type { PluginEventBus } from "./plugin-event-bus.js";
|
||||||
|
|
||||||
|
const PLUGIN_EVENT_SET: ReadonlySet<string> = new Set(PLUGIN_EVENT_TYPES);
|
||||||
|
|
||||||
|
let _pluginEventBus: PluginEventBus | null = null;
|
||||||
|
|
||||||
|
/** Wire the plugin event bus so domain events are forwarded to plugins. */
|
||||||
|
export function setPluginEventBus(bus: PluginEventBus): void {
|
||||||
|
if (_pluginEventBus) {
|
||||||
|
logger.warn("setPluginEventBus called more than once, replacing existing bus");
|
||||||
|
}
|
||||||
|
_pluginEventBus = bus;
|
||||||
|
}
|
||||||
|
|
||||||
export interface LogActivityInput {
|
export interface LogActivityInput {
|
||||||
companyId: string;
|
companyId: string;
|
||||||
@@ -45,4 +62,27 @@ export async function logActivity(db: Db, input: LogActivityInput) {
|
|||||||
details: redactedDetails,
|
details: redactedDetails,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (_pluginEventBus && PLUGIN_EVENT_SET.has(input.action)) {
|
||||||
|
const event: PluginEvent = {
|
||||||
|
eventId: randomUUID(),
|
||||||
|
eventType: input.action as PluginEventType,
|
||||||
|
occurredAt: new Date().toISOString(),
|
||||||
|
actorId: input.actorId,
|
||||||
|
actorType: input.actorType,
|
||||||
|
entityId: input.entityId,
|
||||||
|
entityType: input.entityType,
|
||||||
|
companyId: input.companyId,
|
||||||
|
payload: {
|
||||||
|
...redactedDetails,
|
||||||
|
agentId: input.agentId ?? null,
|
||||||
|
runId: input.runId ?? null,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
void _pluginEventBus.emit(event).then(({ errors }) => {
|
||||||
|
for (const { pluginId, error } of errors) {
|
||||||
|
logger.warn({ pluginId, eventType: event.eventType, err: error }, "plugin event handler failed");
|
||||||
|
}
|
||||||
|
}).catch(() => {});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user