diff options
| author | Adam Malczewski <[email protected]> | 2026-06-04 22:40:46 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-04 22:40:56 +0900 |
| commit | 669c269d972f7bfcd9036017ff5e668d2eeafdd8 (patch) | |
| tree | 9dfd465feb78c1c30c75d09528476b0531689bcd | |
| parent | 0af8b1de733a199dbf5dcf779cc45f495048b1c3 (diff) | |
| download | dispatch-669c269d972f7bfcd9036017ff5e668d2eeafdd8.tar.gz dispatch-669c269d972f7bfcd9036017ff5e668d2eeafdd8.zip | |
feat(kernel): event/hook bus (events, filters, services) — pure dispatch + stateful shell
| -rw-r--r-- | packages/kernel/src/bus/bus.test.ts | 365 | ||||
| -rw-r--r-- | packages/kernel/src/bus/bus.ts | 139 | ||||
| -rw-r--r-- | packages/kernel/src/bus/index.ts | 4 | ||||
| -rw-r--r-- | packages/kernel/src/bus/pure.ts | 83 | ||||
| -rw-r--r-- | packages/kernel/src/index.ts | 4 |
5 files changed, 593 insertions, 2 deletions
diff --git a/packages/kernel/src/bus/bus.test.ts b/packages/kernel/src/bus/bus.test.ts new file mode 100644 index 0000000..7366b50 --- /dev/null +++ b/packages/kernel/src/bus/bus.test.ts @@ -0,0 +1,365 @@ +import { beforeEach, describe, expect, it } from "vitest"; +import type { Logger } from "../contracts/extension.js"; +import { defineEventHook, defineFilter, defineService } from "../contracts/hooks.js"; +import { type Bus, createBus } from "./bus.js"; +import { applyFilterChain, dispatchEventSync, sortFilters } from "./pure.js"; + +interface FakeLogger extends Logger { + readonly errors: Array<{ message: string; args: unknown[] }>; +} + +function createFakeLogger(): FakeLogger { + const errors: Array<{ message: string; args: unknown[] }> = []; + return { + errors, + debug: () => {}, + info: () => {}, + warn: () => {}, + error: (message: string, ...args: unknown[]) => { + errors.push({ message, args }); + }, + }; +} + +describe("event hooks", () => { + let logger: FakeLogger; + let bus: Bus; + + beforeEach(() => { + logger = createFakeLogger(); + bus = createBus(logger); + }); + + it("fires all registered listeners", () => { + const hook = defineEventHook<{ value: number }>("test/event"); + const received: number[] = []; + + bus.on(hook, (payload) => { + received.push(payload.value); + }); + bus.on(hook, (payload) => { + received.push(payload.value * 10); + }); + + bus.emit(hook, { value: 3 }); + + expect(received).toEqual([3, 30]); + }); + + it("isolates a throwing listener (others still run, error logged)", () => { + const hook = defineEventHook<string>("test/isolate"); + const received: string[] = []; + + bus.on(hook, () => { + throw new Error("boom"); + }); + bus.on(hook, (payload) => { + received.push(payload); + }); + + bus.emit(hook, "hello"); + + expect(received).toEqual(["hello"]); + expect(logger.errors).toHaveLength(1); + expect(logger.errors[0]?.message).toContain("test/isolate"); + }); + + it("isolates an async handler rejection", async () => { + const hook = defineEventHook<string>("test/async-reject"); + const received: string[] = []; + + bus.on(hook, async () => { + throw new Error("async boom"); + }); + bus.on(hook, async (payload) => { + received.push(payload); + }); + + bus.emit(hook, "data"); + + await new Promise((resolve) => { + setTimeout(resolve, 10); + }); + + expect(received).toEqual(["data"]); + expect(logger.errors).toHaveLength(1); + expect(logger.errors[0]?.message).toContain("test/async-reject"); + }); + + it("unsubscribe removes the handler", () => { + const hook = defineEventHook<void>("test/unsub"); + let count = 0; + + const unsub = bus.on(hook, () => { + count++; + }); + + bus.emit(hook, undefined); + expect(count).toBe(1); + + unsub(); + bus.emit(hook, undefined); + expect(count).toBe(1); + }); + + it("emit with no handlers is a no-op", () => { + const hook = defineEventHook<string>("test/empty"); + expect(() => bus.emit(hook, "nothing")).not.toThrow(); + }); + + it("emitAsync awaits all handlers", async () => { + const hook = defineEventHook<number>("test/async"); + const received: number[] = []; + + bus.on(hook, async (payload) => { + await new Promise((resolve) => { + setTimeout(resolve, 5); + }); + received.push(payload); + }); + bus.on(hook, async (payload) => { + received.push(payload * 2); + }); + + await bus.emitAsync(hook, 5); + + expect(received).toEqual([10, 5]); + }); + + it("emitAsync respects timeout", async () => { + const hook = defineEventHook<void>("test/timeout"); + let completed = false; + + bus.on(hook, async () => { + await new Promise((resolve) => { + setTimeout(resolve, 100); + }); + completed = true; + }); + + await bus.emitAsync(hook, undefined, 10); + + expect(completed).toBe(false); + }); +}); + +describe("filter hooks", () => { + let logger: FakeLogger; + let bus: Bus; + + beforeEach(() => { + logger = createFakeLogger(); + bus = createBus(logger); + }); + + it("chains filters in registration order", async () => { + const hook = defineFilter<string>("test/chain"); + + bus.addFilter(hook, (value) => `${value}-a`); + bus.addFilter(hook, (value) => `${value}-b`); + + const result = await bus.applyFilters(hook, "start"); + expect(result).toBe("start-a-b"); + }); + + it("respects priority ordering (lower runs first)", async () => { + const hook = defineFilter<string>("test/priority"); + + bus.addFilter(hook, (value) => `${value}-second`); + bus.addFilter(hook, (value) => `${value}-first`, { priority: -1 }); + + const result = await bus.applyFilters(hook, "start"); + expect(result).toBe("start-first-second"); + }); + + it("fail-open passes value through on throw", async () => { + const hook = defineFilter<number>("test/fail-open"); + + bus.addFilter(hook, (value) => value + 1); + bus.addFilter(hook, () => { + throw new Error("filter boom"); + }); + bus.addFilter(hook, (value) => value * 2); + + const result = await bus.applyFilters(hook, 5); + expect(result).toBe(12); + expect(logger.errors).toHaveLength(1); + expect(logger.errors[0]?.message).toContain("test/fail-open"); + }); + + it("fail-closed propagates the error", async () => { + const hook = defineFilter<number>("test/fail-closed"); + + bus.addFilter(hook, () => { + throw new Error("closed boom"); + }); + + await expect(bus.applyFilters(hook, 5, { failClosed: true })).rejects.toThrow("closed boom"); + }); + + it("applyFilters with no filters returns value unchanged", async () => { + const hook = defineFilter<string>("test/no-filters"); + const result = await bus.applyFilters(hook, "unchanged"); + expect(result).toBe("unchanged"); + }); + + it("unsubscribe removes a filter from the chain", async () => { + const hook = defineFilter<string>("test/filter-unsub"); + + const unsub = bus.addFilter(hook, (value) => `${value}-removed`); + bus.addFilter(hook, (value) => `${value}-kept`); + + unsub(); + + const result = await bus.applyFilters(hook, "start"); + expect(result).toBe("start-kept"); + }); +}); + +describe("services", () => { + let logger: FakeLogger; + let bus: Bus; + + beforeEach(() => { + logger = createFakeLogger(); + bus = createBus(logger); + }); + + it("provide and get round-trips", () => { + const handle = defineService<{ greet: (name: string) => string }>("test/service"); + const impl = { greet: (name: string) => `hello ${name}` }; + + bus.provideService(handle, impl); + const retrieved = bus.getService(handle); + + expect(retrieved.greet("world")).toBe("hello world"); + }); + + it("getService on missing service throws", () => { + const handle = defineService<string>("test/missing"); + expect(() => bus.getService(handle)).toThrow("test/missing"); + }); + + it("double-provide throws", () => { + const handle = defineService<number>("test/double"); + + bus.provideService(handle, 1); + expect(() => bus.provideService(handle, 2)).toThrow("test/double"); + }); +}); + +describe("pure functions", () => { + describe("dispatchEventSync", () => { + it("calls all handlers with the payload", () => { + const logger = createFakeLogger(); + const received: number[] = []; + + dispatchEventSync( + [ + (payload) => { + received.push(payload); + }, + (payload) => { + received.push(payload * 2); + }, + ], + 5, + logger, + "test", + ); + + expect(received).toEqual([5, 10]); + }); + + it("catches sync throws and logs them", () => { + const logger = createFakeLogger(); + const received: number[] = []; + + dispatchEventSync( + [ + () => { + throw new Error("sync boom"); + }, + (payload) => { + received.push(payload); + }, + ], + 42, + logger, + "test/sync", + ); + + expect(received).toEqual([42]); + expect(logger.errors).toHaveLength(1); + }); + }); + + describe("sortFilters", () => { + it("sorts by priority ascending, then by order ascending", () => { + const entries = [ + { fn: async (v: number) => v, priority: 10, order: 0 }, + { fn: async (v: number) => v, priority: -1, order: 1 }, + { fn: async (v: number) => v, priority: 10, order: 2 }, + { fn: async (v: number) => v, priority: 0, order: 3 }, + ]; + + const sorted = sortFilters(entries); + expect(sorted.map((e) => e.order)).toEqual([1, 3, 0, 2]); + }); + + it("preserves registration order when priorities are equal", () => { + const entries = [ + { fn: async (v: string) => v, priority: 0, order: 0 }, + { fn: async (v: string) => v, priority: 0, order: 1 }, + { fn: async (v: string) => v, priority: 0, order: 2 }, + ]; + + const sorted = sortFilters(entries); + expect(sorted.map((e) => e.order)).toEqual([0, 1, 2]); + }); + }); + + describe("applyFilterChain", () => { + it("applies filters in order", async () => { + const logger = createFakeLogger(); + const result = await applyFilterChain([(v) => v + 1, (v) => v * 3], 2, logger, "test", false); + expect(result).toBe(9); + }); + + it("fail-open skips the throwing filter", async () => { + const logger = createFakeLogger(); + const result = await applyFilterChain( + [ + (v) => v + 10, + () => { + throw new Error("skip me"); + }, + (v) => v + 1, + ], + 0, + logger, + "test", + false, + ); + expect(result).toBe(11); + expect(logger.errors).toHaveLength(1); + }); + + it("fail-closed throws on error", async () => { + const logger = createFakeLogger(); + await expect( + applyFilterChain( + [ + () => { + throw new Error("closed"); + }, + ], + 0, + logger, + "test", + true, + ), + ).rejects.toThrow("closed"); + }); + }); +}); diff --git a/packages/kernel/src/bus/bus.ts b/packages/kernel/src/bus/bus.ts new file mode 100644 index 0000000..03d692e --- /dev/null +++ b/packages/kernel/src/bus/bus.ts @@ -0,0 +1,139 @@ +import type { Logger } from "../contracts/extension.js"; +import type { + EventHandler, + EventHookDescriptor, + FilterDescriptor, + FilterHandler, + ServiceHandle, +} from "../contracts/hooks.js"; +import { + applyFilterChain, + dispatchEventAsync, + dispatchEventSync, + type FilterEntry, + sortFilters, +} from "./pure.js"; + +export interface Bus { + readonly on: <T>(hook: EventHookDescriptor<T>, handler: EventHandler<T>) => () => void; + readonly emit: <T>(hook: EventHookDescriptor<T>, payload: T) => void; + readonly emitAsync: <T>( + hook: EventHookDescriptor<T>, + payload: T, + timeoutMs?: number, + ) => Promise<void>; + readonly addFilter: <T>( + hook: FilterDescriptor<T>, + fn: FilterHandler<T>, + opts?: { readonly priority?: number }, + ) => () => void; + readonly applyFilters: <T>( + hook: FilterDescriptor<T>, + value: T, + opts?: { readonly failClosed?: boolean }, + ) => Promise<T>; + readonly provideService: <T>(handle: ServiceHandle<T>, impl: T) => void; + readonly getService: <T>(handle: ServiceHandle<T>) => T; +} + +interface StoredFilterEntry { + readonly fn: unknown; + readonly priority: number; + readonly order: number; +} + +export function createBus(logger: Logger): Bus { + const eventHandlers = new Map<string, Set<unknown>>(); + const filterEntries = new Map<string, StoredFilterEntry[]>(); + const services = new Map<string, unknown>(); + let filterOrderCounter = 0; + + return { + on<T>(hook: EventHookDescriptor<T>, handler: EventHandler<T>): () => void { + let set = eventHandlers.get(hook.id); + if (set === undefined) { + set = new Set(); + eventHandlers.set(hook.id, set); + } + const stored: unknown = handler; + set.add(stored); + return () => { + const current = eventHandlers.get(hook.id); + if (current !== undefined) current.delete(stored); + }; + }, + + emit<T>(hook: EventHookDescriptor<T>, payload: T): void { + const set = eventHandlers.get(hook.id); + if (set === undefined || set.size === 0) return; + const handlers = [...set] as Array<EventHandler<T>>; + dispatchEventSync(handlers, payload, logger, hook.id); + }, + + async emitAsync<T>( + hook: EventHookDescriptor<T>, + payload: T, + timeoutMs?: number, + ): Promise<void> { + const set = eventHandlers.get(hook.id); + if (set === undefined || set.size === 0) return; + const handlers = [...set] as Array<EventHandler<T>>; + await dispatchEventAsync(handlers, payload, logger, hook.id, timeoutMs); + }, + + addFilter<T>( + hook: FilterDescriptor<T>, + fn: FilterHandler<T>, + opts?: { readonly priority?: number }, + ): () => void { + let entries = filterEntries.get(hook.id); + if (entries === undefined) { + entries = []; + filterEntries.set(hook.id, entries); + } + const entry: StoredFilterEntry = { + fn, + priority: opts?.priority ?? 0, + order: filterOrderCounter++, + }; + entries.push(entry); + return () => { + const current = filterEntries.get(hook.id); + if (current === undefined) return; + const idx = current.indexOf(entry); + if (idx !== -1) current.splice(idx, 1); + }; + }, + + async applyFilters<T>( + hook: FilterDescriptor<T>, + value: T, + opts?: { readonly failClosed?: boolean }, + ): Promise<T> { + const entries = filterEntries.get(hook.id); + if (entries === undefined || entries.length === 0) return value; + const sorted = sortFilters(entries as ReadonlyArray<FilterEntry<T>>); + const fns = sorted.map((e) => e.fn) as Array<FilterHandler<T>>; + return applyFilterChain(fns, value, logger, hook.id, opts?.failClosed ?? false); + }, + + provideService<T>(handle: ServiceHandle<T>, impl: T): void { + if (services.has(handle.id)) { + throw new Error( + `Service "${handle.id}" is already provided. Only one provider per handle is allowed.`, + ); + } + services.set(handle.id, impl); + }, + + getService<T>(handle: ServiceHandle<T>): T { + const impl = services.get(handle.id); + if (impl === undefined) { + throw new Error( + `Service "${handle.id}" has no provider. Call provideService before getService.`, + ); + } + return impl as T; + }, + }; +} diff --git a/packages/kernel/src/bus/index.ts b/packages/kernel/src/bus/index.ts new file mode 100644 index 0000000..860bab0 --- /dev/null +++ b/packages/kernel/src/bus/index.ts @@ -0,0 +1,4 @@ +export type { Bus } from "./bus.js"; +export { createBus } from "./bus.js"; +export type { FilterEntry } from "./pure.js"; +export { applyFilterChain, dispatchEventAsync, dispatchEventSync, sortFilters } from "./pure.js"; diff --git a/packages/kernel/src/bus/pure.ts b/packages/kernel/src/bus/pure.ts new file mode 100644 index 0000000..7cd9143 --- /dev/null +++ b/packages/kernel/src/bus/pure.ts @@ -0,0 +1,83 @@ +import type { Logger } from "../contracts/extension.js"; +import type { EventHandler, FilterHandler } from "../contracts/hooks.js"; + +export function dispatchEventSync<T>( + handlers: ReadonlyArray<EventHandler<T>>, + payload: T, + logger: Logger, + hookId: string, +): void { + for (const handler of handlers) { + try { + const result = handler(payload); + if (result instanceof Promise) { + result.catch((err: unknown) => { + logger.error(`Event hook "${hookId}" handler rejected`, err); + }); + } + } catch (err) { + logger.error(`Event hook "${hookId}" handler threw`, err); + } + } +} + +export async function dispatchEventAsync<T>( + handlers: ReadonlyArray<EventHandler<T>>, + payload: T, + logger: Logger, + hookId: string, + timeoutMs?: number, +): Promise<void> { + const promises = handlers.map(async (handler) => { + try { + await handler(payload); + } catch (err) { + logger.error(`Event hook "${hookId}" handler threw`, err); + } + }); + + if (timeoutMs !== undefined) { + await Promise.race([ + Promise.all(promises), + new Promise<void>((resolve) => { + setTimeout(resolve, timeoutMs); + }), + ]); + } else { + await Promise.all(promises); + } +} + +export interface FilterEntry<T> { + readonly fn: FilterHandler<T>; + readonly priority: number; + readonly order: number; +} + +export function sortFilters<T>( + entries: ReadonlyArray<FilterEntry<T>>, +): ReadonlyArray<FilterEntry<T>> { + return [...entries].sort((a, b) => { + if (a.priority !== b.priority) return a.priority - b.priority; + return a.order - b.order; + }); +} + +export async function applyFilterChain<T>( + filters: ReadonlyArray<FilterHandler<T>>, + value: T, + logger: Logger, + hookId: string, + failClosed: boolean, +): Promise<T> { + let current = value; + for (const fn of filters) { + try { + current = await fn(current); + } catch (err) { + if (failClosed) throw err; + logger.error(`Filter "${hookId}" handler threw (fail-open, passing through)`, err); + } + } + return current; +} diff --git a/packages/kernel/src/index.ts b/packages/kernel/src/index.ts index a9ff31f..ac1a734 100644 --- a/packages/kernel/src/index.ts +++ b/packages/kernel/src/index.ts @@ -1,7 +1,7 @@ // @dispatch/kernel — the minimal runtime core. // // Exposes the ABI (contracts) that every extension and the runtime compile -// against. Host, runtime, and bus implementations are added by their own -// owner-agents and re-exported here as they land. +// against, plus kernel implementations (bus, host, runtime) as they land. +export * from "./bus/index.js"; export * from "./contracts/index.js"; |
