fix: address review feedback — subscription cleanup, filter nullability, stale diagram
- Add scopedBus.clear() in dispose() to prevent subscription accumulation on worker crash/restart cycles - Use two-arg subscribe() overload when filter is null instead of passing empty object; fix filter type to include null - Update ASCII flow diagram: onEvent is a notification, not request/response
This commit is contained in:
@@ -19,8 +19,7 @@
|
|||||||
* |--- request(initialize) -------------> | → calls plugin.setup(ctx)
|
* |--- request(initialize) -------------> | → calls plugin.setup(ctx)
|
||||||
* |<-- response(ok:true) ---------------- |
|
* |<-- response(ok:true) ---------------- |
|
||||||
* | |
|
* | |
|
||||||
* |--- request(onEvent) ----------------> | → dispatches to registered handler
|
* |--- notification(onEvent) -----------> | → dispatches to registered handler
|
||||||
* |<-- response(void) ------------------ |
|
|
||||||
* | |
|
* | |
|
||||||
* |<-- request(state.get) --------------- | ← SDK client call from plugin code
|
* |<-- request(state.get) --------------- | ← SDK client call from plugin code
|
||||||
* |--- response(result) ----------------> |
|
* |--- response(result) ----------------> |
|
||||||
|
|||||||
@@ -556,16 +556,17 @@ export function buildHostServices(
|
|||||||
}
|
}
|
||||||
await scopedBus.emit(params.name, params.companyId, params.payload);
|
await scopedBus.emit(params.name, params.companyId, params.payload);
|
||||||
},
|
},
|
||||||
async subscribe(params: { eventPattern: string; filter?: Record<string, unknown> }) {
|
async subscribe(params: { eventPattern: string; filter?: Record<string, unknown> | null }) {
|
||||||
scopedBus.subscribe(
|
const handler = async (event: import("@paperclipai/plugin-sdk").PluginEvent) => {
|
||||||
params.eventPattern as any,
|
|
||||||
params.filter as any ?? {},
|
|
||||||
async (event) => {
|
|
||||||
if (notifyWorker) {
|
if (notifyWorker) {
|
||||||
notifyWorker("onEvent", { event });
|
notifyWorker("onEvent", { event });
|
||||||
}
|
}
|
||||||
},
|
};
|
||||||
);
|
if (params.filter) {
|
||||||
|
scopedBus.subscribe(params.eventPattern as any, params.filter as any, handler);
|
||||||
|
} else {
|
||||||
|
scopedBus.subscribe(params.eventPattern as any, handler);
|
||||||
|
}
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
@@ -1071,6 +1072,10 @@ export function buildHostServices(
|
|||||||
dispose() {
|
dispose() {
|
||||||
disposed = true;
|
disposed = true;
|
||||||
|
|
||||||
|
// Clear event bus subscriptions to prevent accumulation on worker restart.
|
||||||
|
// Without this, each crash/restart cycle adds duplicate subscriptions.
|
||||||
|
scopedBus.clear();
|
||||||
|
|
||||||
// Snapshot to avoid iterator invalidation from concurrent sendMessage() calls
|
// Snapshot to avoid iterator invalidation from concurrent sendMessage() calls
|
||||||
const snapshot = Array.from(activeSubscriptions);
|
const snapshot = Array.from(activeSubscriptions);
|
||||||
activeSubscriptions.clear();
|
activeSubscriptions.clear();
|
||||||
|
|||||||
Reference in New Issue
Block a user