summaryrefslogtreecommitdiffhomepage
path: root/packages/kernel
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-04 22:40:46 +0900
committerAdam Malczewski <[email protected]>2026-06-04 22:40:56 +0900
commit669c269d972f7bfcd9036017ff5e668d2eeafdd8 (patch)
tree9dfd465feb78c1c30c75d09528476b0531689bcd /packages/kernel
parent0af8b1de733a199dbf5dcf779cc45f495048b1c3 (diff)
downloaddispatch-669c269d972f7bfcd9036017ff5e668d2eeafdd8.tar.gz
dispatch-669c269d972f7bfcd9036017ff5e668d2eeafdd8.zip
feat(kernel): event/hook bus (events, filters, services) — pure dispatch + stateful shell
Diffstat (limited to 'packages/kernel')
-rw-r--r--packages/kernel/src/bus/bus.test.ts365
-rw-r--r--packages/kernel/src/bus/bus.ts139
-rw-r--r--packages/kernel/src/bus/index.ts4
-rw-r--r--packages/kernel/src/bus/pure.ts83
-rw-r--r--packages/kernel/src/index.ts4
5 files changed, 593 insertions, 2 deletions
diff --git a/packages/kernel/src/bus/bus.test.ts b/packages/kernel/src/bus/bus.test.ts
new file mode 100644
index 0000000..7366b50
--- /dev/null
+++ b/packages/kernel/src/bus/bus.test.ts
@@ -0,0 +1,365 @@
+import { beforeEach, describe, expect, it } from "vitest";
+import type { Logger } from "../contracts/extension.js";
+import { defineEventHook, defineFilter, defineService } from "../contracts/hooks.js";
+import { type Bus, createBus } from "./bus.js";
+import { applyFilterChain, dispatchEventSync, sortFilters } from "./pure.js";
+
+interface FakeLogger extends Logger {
+ readonly errors: Array<{ message: string; args: unknown[] }>;
+}
+
+function createFakeLogger(): FakeLogger {
+ const errors: Array<{ message: string; args: unknown[] }> = [];
+ return {
+ errors,
+ debug: () => {},
+ info: () => {},
+ warn: () => {},
+ error: (message: string, ...args: unknown[]) => {
+ errors.push({ message, args });
+ },
+ };
+}
+
+describe("event hooks", () => {
+ let logger: FakeLogger;
+ let bus: Bus;
+
+ beforeEach(() => {
+ logger = createFakeLogger();
+ bus = createBus(logger);
+ });
+
+ it("fires all registered listeners", () => {
+ const hook = defineEventHook<{ value: number }>("test/event");
+ const received: number[] = [];
+
+ bus.on(hook, (payload) => {
+ received.push(payload.value);
+ });
+ bus.on(hook, (payload) => {
+ received.push(payload.value * 10);
+ });
+
+ bus.emit(hook, { value: 3 });
+
+ expect(received).toEqual([3, 30]);
+ });
+
+ it("isolates a throwing listener (others still run, error logged)", () => {
+ const hook = defineEventHook<string>("test/isolate");
+ const received: string[] = [];
+
+ bus.on(hook, () => {
+ throw new Error("boom");
+ });
+ bus.on(hook, (payload) => {
+ received.push(payload);
+ });
+
+ bus.emit(hook, "hello");
+
+ expect(received).toEqual(["hello"]);
+ expect(logger.errors).toHaveLength(1);
+ expect(logger.errors[0]?.message).toContain("test/isolate");
+ });
+
+ it("isolates an async handler rejection", async () => {
+ const hook = defineEventHook<string>("test/async-reject");
+ const received: string[] = [];
+
+ bus.on(hook, async () => {
+ throw new Error("async boom");
+ });
+ bus.on(hook, async (payload) => {
+ received.push(payload);
+ });
+
+ bus.emit(hook, "data");
+
+ await new Promise((resolve) => {
+ setTimeout(resolve, 10);
+ });
+
+ expect(received).toEqual(["data"]);
+ expect(logger.errors).toHaveLength(1);
+ expect(logger.errors[0]?.message).toContain("test/async-reject");
+ });
+
+ it("unsubscribe removes the handler", () => {
+ const hook = defineEventHook<void>("test/unsub");
+ let count = 0;
+
+ const unsub = bus.on(hook, () => {
+ count++;
+ });
+
+ bus.emit(hook, undefined);
+ expect(count).toBe(1);
+
+ unsub();
+ bus.emit(hook, undefined);
+ expect(count).toBe(1);
+ });
+
+ it("emit with no handlers is a no-op", () => {
+ const hook = defineEventHook<string>("test/empty");
+ expect(() => bus.emit(hook, "nothing")).not.toThrow();
+ });
+
+ it("emitAsync awaits all handlers", async () => {
+ const hook = defineEventHook<number>("test/async");
+ const received: number[] = [];
+
+ bus.on(hook, async (payload) => {
+ await new Promise((resolve) => {
+ setTimeout(resolve, 5);
+ });
+ received.push(payload);
+ });
+ bus.on(hook, async (payload) => {
+ received.push(payload * 2);
+ });
+
+ await bus.emitAsync(hook, 5);
+
+ expect(received).toEqual([10, 5]);
+ });
+
+ it("emitAsync respects timeout", async () => {
+ const hook = defineEventHook<void>("test/timeout");
+ let completed = false;
+
+ bus.on(hook, async () => {
+ await new Promise((resolve) => {
+ setTimeout(resolve, 100);
+ });
+ completed = true;
+ });
+
+ await bus.emitAsync(hook, undefined, 10);
+
+ expect(completed).toBe(false);
+ });
+});
+
+describe("filter hooks", () => {
+ let logger: FakeLogger;
+ let bus: Bus;
+
+ beforeEach(() => {
+ logger = createFakeLogger();
+ bus = createBus(logger);
+ });
+
+ it("chains filters in registration order", async () => {
+ const hook = defineFilter<string>("test/chain");
+
+ bus.addFilter(hook, (value) => `${value}-a`);
+ bus.addFilter(hook, (value) => `${value}-b`);
+
+ const result = await bus.applyFilters(hook, "start");
+ expect(result).toBe("start-a-b");
+ });
+
+ it("respects priority ordering (lower runs first)", async () => {
+ const hook = defineFilter<string>("test/priority");
+
+ bus.addFilter(hook, (value) => `${value}-second`);
+ bus.addFilter(hook, (value) => `${value}-first`, { priority: -1 });
+
+ const result = await bus.applyFilters(hook, "start");
+ expect(result).toBe("start-first-second");
+ });
+
+ it("fail-open passes value through on throw", async () => {
+ const hook = defineFilter<number>("test/fail-open");
+
+ bus.addFilter(hook, (value) => value + 1);
+ bus.addFilter(hook, () => {
+ throw new Error("filter boom");
+ });
+ bus.addFilter(hook, (value) => value * 2);
+
+ const result = await bus.applyFilters(hook, 5);
+ expect(result).toBe(12);
+ expect(logger.errors).toHaveLength(1);
+ expect(logger.errors[0]?.message).toContain("test/fail-open");
+ });
+
+ it("fail-closed propagates the error", async () => {
+ const hook = defineFilter<number>("test/fail-closed");
+
+ bus.addFilter(hook, () => {
+ throw new Error("closed boom");
+ });
+
+ await expect(bus.applyFilters(hook, 5, { failClosed: true })).rejects.toThrow("closed boom");
+ });
+
+ it("applyFilters with no filters returns value unchanged", async () => {
+ const hook = defineFilter<string>("test/no-filters");
+ const result = await bus.applyFilters(hook, "unchanged");
+ expect(result).toBe("unchanged");
+ });
+
+ it("unsubscribe removes a filter from the chain", async () => {
+ const hook = defineFilter<string>("test/filter-unsub");
+
+ const unsub = bus.addFilter(hook, (value) => `${value}-removed`);
+ bus.addFilter(hook, (value) => `${value}-kept`);
+
+ unsub();
+
+ const result = await bus.applyFilters(hook, "start");
+ expect(result).toBe("start-kept");
+ });
+});
+
+describe("services", () => {
+ let logger: FakeLogger;
+ let bus: Bus;
+
+ beforeEach(() => {
+ logger = createFakeLogger();
+ bus = createBus(logger);
+ });
+
+ it("provide and get round-trips", () => {
+ const handle = defineService<{ greet: (name: string) => string }>("test/service");
+ const impl = { greet: (name: string) => `hello ${name}` };
+
+ bus.provideService(handle, impl);
+ const retrieved = bus.getService(handle);
+
+ expect(retrieved.greet("world")).toBe("hello world");
+ });
+
+ it("getService on missing service throws", () => {
+ const handle = defineService<string>("test/missing");
+ expect(() => bus.getService(handle)).toThrow("test/missing");
+ });
+
+ it("double-provide throws", () => {
+ const handle = defineService<number>("test/double");
+
+ bus.provideService(handle, 1);
+ expect(() => bus.provideService(handle, 2)).toThrow("test/double");
+ });
+});
+
+describe("pure functions", () => {
+ describe("dispatchEventSync", () => {
+ it("calls all handlers with the payload", () => {
+ const logger = createFakeLogger();
+ const received: number[] = [];
+
+ dispatchEventSync(
+ [
+ (payload) => {
+ received.push(payload);
+ },
+ (payload) => {
+ received.push(payload * 2);
+ },
+ ],
+ 5,
+ logger,
+ "test",
+ );
+
+ expect(received).toEqual([5, 10]);
+ });
+
+ it("catches sync throws and logs them", () => {
+ const logger = createFakeLogger();
+ const received: number[] = [];
+
+ dispatchEventSync(
+ [
+ () => {
+ throw new Error("sync boom");
+ },
+ (payload) => {
+ received.push(payload);
+ },
+ ],
+ 42,
+ logger,
+ "test/sync",
+ );
+
+ expect(received).toEqual([42]);
+ expect(logger.errors).toHaveLength(1);
+ });
+ });
+
+ describe("sortFilters", () => {
+ it("sorts by priority ascending, then by order ascending", () => {
+ const entries = [
+ { fn: async (v: number) => v, priority: 10, order: 0 },
+ { fn: async (v: number) => v, priority: -1, order: 1 },
+ { fn: async (v: number) => v, priority: 10, order: 2 },
+ { fn: async (v: number) => v, priority: 0, order: 3 },
+ ];
+
+ const sorted = sortFilters(entries);
+ expect(sorted.map((e) => e.order)).toEqual([1, 3, 0, 2]);
+ });
+
+ it("preserves registration order when priorities are equal", () => {
+ const entries = [
+ { fn: async (v: string) => v, priority: 0, order: 0 },
+ { fn: async (v: string) => v, priority: 0, order: 1 },
+ { fn: async (v: string) => v, priority: 0, order: 2 },
+ ];
+
+ const sorted = sortFilters(entries);
+ expect(sorted.map((e) => e.order)).toEqual([0, 1, 2]);
+ });
+ });
+
+ describe("applyFilterChain", () => {
+ it("applies filters in order", async () => {
+ const logger = createFakeLogger();
+ const result = await applyFilterChain([(v) => v + 1, (v) => v * 3], 2, logger, "test", false);
+ expect(result).toBe(9);
+ });
+
+ it("fail-open skips the throwing filter", async () => {
+ const logger = createFakeLogger();
+ const result = await applyFilterChain(
+ [
+ (v) => v + 10,
+ () => {
+ throw new Error("skip me");
+ },
+ (v) => v + 1,
+ ],
+ 0,
+ logger,
+ "test",
+ false,
+ );
+ expect(result).toBe(11);
+ expect(logger.errors).toHaveLength(1);
+ });
+
+ it("fail-closed throws on error", async () => {
+ const logger = createFakeLogger();
+ await expect(
+ applyFilterChain(
+ [
+ () => {
+ throw new Error("closed");
+ },
+ ],
+ 0,
+ logger,
+ "test",
+ true,
+ ),
+ ).rejects.toThrow("closed");
+ });
+ });
+});
diff --git a/packages/kernel/src/bus/bus.ts b/packages/kernel/src/bus/bus.ts
new file mode 100644
index 0000000..03d692e
--- /dev/null
+++ b/packages/kernel/src/bus/bus.ts
@@ -0,0 +1,139 @@
+import type { Logger } from "../contracts/extension.js";
+import type {
+ EventHandler,
+ EventHookDescriptor,
+ FilterDescriptor,
+ FilterHandler,
+ ServiceHandle,
+} from "../contracts/hooks.js";
+import {
+ applyFilterChain,
+ dispatchEventAsync,
+ dispatchEventSync,
+ type FilterEntry,
+ sortFilters,
+} from "./pure.js";
+
+export interface Bus {
+ readonly on: <T>(hook: EventHookDescriptor<T>, handler: EventHandler<T>) => () => void;
+ readonly emit: <T>(hook: EventHookDescriptor<T>, payload: T) => void;
+ readonly emitAsync: <T>(
+ hook: EventHookDescriptor<T>,
+ payload: T,
+ timeoutMs?: number,
+ ) => Promise<void>;
+ readonly addFilter: <T>(
+ hook: FilterDescriptor<T>,
+ fn: FilterHandler<T>,
+ opts?: { readonly priority?: number },
+ ) => () => void;
+ readonly applyFilters: <T>(
+ hook: FilterDescriptor<T>,
+ value: T,
+ opts?: { readonly failClosed?: boolean },
+ ) => Promise<T>;
+ readonly provideService: <T>(handle: ServiceHandle<T>, impl: T) => void;
+ readonly getService: <T>(handle: ServiceHandle<T>) => T;
+}
+
+interface StoredFilterEntry {
+ readonly fn: unknown;
+ readonly priority: number;
+ readonly order: number;
+}
+
+export function createBus(logger: Logger): Bus {
+ const eventHandlers = new Map<string, Set<unknown>>();
+ const filterEntries = new Map<string, StoredFilterEntry[]>();
+ const services = new Map<string, unknown>();
+ let filterOrderCounter = 0;
+
+ return {
+ on<T>(hook: EventHookDescriptor<T>, handler: EventHandler<T>): () => void {
+ let set = eventHandlers.get(hook.id);
+ if (set === undefined) {
+ set = new Set();
+ eventHandlers.set(hook.id, set);
+ }
+ const stored: unknown = handler;
+ set.add(stored);
+ return () => {
+ const current = eventHandlers.get(hook.id);
+ if (current !== undefined) current.delete(stored);
+ };
+ },
+
+ emit<T>(hook: EventHookDescriptor<T>, payload: T): void {
+ const set = eventHandlers.get(hook.id);
+ if (set === undefined || set.size === 0) return;
+ const handlers = [...set] as Array<EventHandler<T>>;
+ dispatchEventSync(handlers, payload, logger, hook.id);
+ },
+
+ async emitAsync<T>(
+ hook: EventHookDescriptor<T>,
+ payload: T,
+ timeoutMs?: number,
+ ): Promise<void> {
+ const set = eventHandlers.get(hook.id);
+ if (set === undefined || set.size === 0) return;
+ const handlers = [...set] as Array<EventHandler<T>>;
+ await dispatchEventAsync(handlers, payload, logger, hook.id, timeoutMs);
+ },
+
+ addFilter<T>(
+ hook: FilterDescriptor<T>,
+ fn: FilterHandler<T>,
+ opts?: { readonly priority?: number },
+ ): () => void {
+ let entries = filterEntries.get(hook.id);
+ if (entries === undefined) {
+ entries = [];
+ filterEntries.set(hook.id, entries);
+ }
+ const entry: StoredFilterEntry = {
+ fn,
+ priority: opts?.priority ?? 0,
+ order: filterOrderCounter++,
+ };
+ entries.push(entry);
+ return () => {
+ const current = filterEntries.get(hook.id);
+ if (current === undefined) return;
+ const idx = current.indexOf(entry);
+ if (idx !== -1) current.splice(idx, 1);
+ };
+ },
+
+ async applyFilters<T>(
+ hook: FilterDescriptor<T>,
+ value: T,
+ opts?: { readonly failClosed?: boolean },
+ ): Promise<T> {
+ const entries = filterEntries.get(hook.id);
+ if (entries === undefined || entries.length === 0) return value;
+ const sorted = sortFilters(entries as ReadonlyArray<FilterEntry<T>>);
+ const fns = sorted.map((e) => e.fn) as Array<FilterHandler<T>>;
+ return applyFilterChain(fns, value, logger, hook.id, opts?.failClosed ?? false);
+ },
+
+ provideService<T>(handle: ServiceHandle<T>, impl: T): void {
+ if (services.has(handle.id)) {
+ throw new Error(
+ `Service "${handle.id}" is already provided. Only one provider per handle is allowed.`,
+ );
+ }
+ services.set(handle.id, impl);
+ },
+
+ getService<T>(handle: ServiceHandle<T>): T {
+ const impl = services.get(handle.id);
+ if (impl === undefined) {
+ throw new Error(
+ `Service "${handle.id}" has no provider. Call provideService before getService.`,
+ );
+ }
+ return impl as T;
+ },
+ };
+}
diff --git a/packages/kernel/src/bus/index.ts b/packages/kernel/src/bus/index.ts
new file mode 100644
index 0000000..860bab0
--- /dev/null
+++ b/packages/kernel/src/bus/index.ts
@@ -0,0 +1,4 @@
+export type { Bus } from "./bus.js";
+export { createBus } from "./bus.js";
+export type { FilterEntry } from "./pure.js";
+export { applyFilterChain, dispatchEventAsync, dispatchEventSync, sortFilters } from "./pure.js";
diff --git a/packages/kernel/src/bus/pure.ts b/packages/kernel/src/bus/pure.ts
new file mode 100644
index 0000000..7cd9143
--- /dev/null
+++ b/packages/kernel/src/bus/pure.ts
@@ -0,0 +1,83 @@
+import type { Logger } from "../contracts/extension.js";
+import type { EventHandler, FilterHandler } from "../contracts/hooks.js";
+
+export function dispatchEventSync<T>(
+ handlers: ReadonlyArray<EventHandler<T>>,
+ payload: T,
+ logger: Logger,
+ hookId: string,
+): void {
+ for (const handler of handlers) {
+ try {
+ const result = handler(payload);
+ if (result instanceof Promise) {
+ result.catch((err: unknown) => {
+ logger.error(`Event hook "${hookId}" handler rejected`, err);
+ });
+ }
+ } catch (err) {
+ logger.error(`Event hook "${hookId}" handler threw`, err);
+ }
+ }
+}
+
+export async function dispatchEventAsync<T>(
+ handlers: ReadonlyArray<EventHandler<T>>,
+ payload: T,
+ logger: Logger,
+ hookId: string,
+ timeoutMs?: number,
+): Promise<void> {
+ const promises = handlers.map(async (handler) => {
+ try {
+ await handler(payload);
+ } catch (err) {
+ logger.error(`Event hook "${hookId}" handler threw`, err);
+ }
+ });
+
+ if (timeoutMs !== undefined) {
+ await Promise.race([
+ Promise.all(promises),
+ new Promise<void>((resolve) => {
+ setTimeout(resolve, timeoutMs);
+ }),
+ ]);
+ } else {
+ await Promise.all(promises);
+ }
+}
+
+export interface FilterEntry<T> {
+ readonly fn: FilterHandler<T>;
+ readonly priority: number;
+ readonly order: number;
+}
+
+export function sortFilters<T>(
+ entries: ReadonlyArray<FilterEntry<T>>,
+): ReadonlyArray<FilterEntry<T>> {
+ return [...entries].sort((a, b) => {
+ if (a.priority !== b.priority) return a.priority - b.priority;
+ return a.order - b.order;
+ });
+}
+
+export async function applyFilterChain<T>(
+ filters: ReadonlyArray<FilterHandler<T>>,
+ value: T,
+ logger: Logger,
+ hookId: string,
+ failClosed: boolean,
+): Promise<T> {
+ let current = value;
+ for (const fn of filters) {
+ try {
+ current = await fn(current);
+ } catch (err) {
+ if (failClosed) throw err;
+ logger.error(`Filter "${hookId}" handler threw (fail-open, passing through)`, err);
+ }
+ }
+ return current;
+}
diff --git a/packages/kernel/src/index.ts b/packages/kernel/src/index.ts
index a9ff31f..ac1a734 100644
--- a/packages/kernel/src/index.ts
+++ b/packages/kernel/src/index.ts
@@ -1,7 +1,7 @@
// @dispatch/kernel — the minimal runtime core.
//
// Exposes the ABI (contracts) that every extension and the runtime compile
-// against. Host, runtime, and bus implementations are added by their own
-// owner-agents and re-exported here as they land.
+// against, plus kernel implementations (bus, host, runtime) as they land.
+export * from "./bus/index.js";
export * from "./contracts/index.js";