summaryrefslogtreecommitdiffhomepage
path: root/src/adapters
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-06 22:08:16 +0900
committerAdam Malczewski <[email protected]>2026-06-06 22:08:16 +0900
commite1c8cf3257cb33457aa882c548f5195ecc0f9854 (patch)
treed355147cdab8eb77917ad02caedf26b3d8d0be57 /src/adapters
downloaddispatch-web-e1c8cf3257cb33457aa882c548f5195ecc0f9854.tar.gz
dispatch-web-e1c8cf3257cb33457aa882c548f5195ecc0f9854.zip
Slice 1: surface system + WS transport + composition root
Pure-core feature libraries assembled at the composition root: - core/protocol: pure reducer over surface catalog/spec/error messages - features/surface-host: generic field-kind interpreter (toggle/progress/ selector/stat/button) + pure plan logic; no surface-id special-casing - adapters/ws: injected WebSocket client (effects at the edge) - app: composition root store (Svelte 5 runes over the pure reducer), host-relative surface WS URL resolution (resolveWsUrl), root App.svelte Verified green: svelte-check 0/0, vitest 84 passed, biome clean, vite build ok.
Diffstat (limited to 'src/adapters')
-rw-r--r--src/adapters/ws/index.test.ts234
-rw-r--r--src/adapters/ws/index.ts98
-rw-r--r--src/adapters/ws/logic.test.ts195
-rw-r--r--src/adapters/ws/logic.ts91
4 files changed, 618 insertions, 0 deletions
diff --git a/src/adapters/ws/index.test.ts b/src/adapters/ws/index.test.ts
new file mode 100644
index 0000000..92b8753
--- /dev/null
+++ b/src/adapters/ws/index.test.ts
@@ -0,0 +1,234 @@
+import { describe, expect, it, vi } from "vitest";
+import type { WebSocketLike } from "./index";
+import { createSurfaceSocket } from "./index";
+
+interface FakeSocket extends WebSocketLike {
+ sent: string[];
+ resolveOpen(): void;
+ invokeMessage(data: string): void;
+ invokeClose(): void;
+}
+
+function fakeSocket(): FakeSocket {
+ let onopen: (() => void) | null = null;
+ let onmessage: ((ev: { data: string }) => void) | null = null;
+ let onclose: ((ev: { code: number; reason: string }) => void) | null = null;
+ const sent: string[] = [];
+
+ const ws: FakeSocket = {
+ send(data: string) {
+ sent.push(data);
+ },
+ close() {},
+ get onopen() {
+ return onopen;
+ },
+ set onopen(fn) {
+ onopen = fn;
+ },
+ get onmessage() {
+ return onmessage;
+ },
+ set onmessage(fn) {
+ onmessage = fn;
+ },
+ get onclose() {
+ return onclose;
+ },
+ set onclose(fn) {
+ onclose = fn;
+ },
+ resolveOpen() {
+ onopen?.();
+ },
+ invokeMessage(data: string) {
+ onmessage?.({ data });
+ },
+ invokeClose() {
+ onclose?.({ code: 1000, reason: "" });
+ },
+ sent,
+ };
+ return ws;
+}
+
+describe("createSurfaceSocket", () => {
+ it("sends queued messages once socket opens", () => {
+ const ws = fakeSocket();
+ const onMessage = vi.fn();
+ const handle = createSurfaceSocket({
+ url: "ws://test",
+ onMessage,
+ socketFactory: () => ws,
+ });
+
+ handle.send({ type: "subscribe", surfaceId: "s1" });
+ handle.send({ type: "subscribe", surfaceId: "s2" });
+ expect(ws.sent).toHaveLength(0);
+
+ ws.resolveOpen();
+ expect(ws.sent).toHaveLength(2);
+ expect(JSON.parse(ws.sent[0] ?? "")).toEqual({ type: "subscribe", surfaceId: "s1" });
+ expect(JSON.parse(ws.sent[1] ?? "")).toEqual({ type: "subscribe", surfaceId: "s2" });
+ });
+
+ it("sends immediately when socket is already open", () => {
+ const ws = fakeSocket();
+ const handle = createSurfaceSocket({
+ url: "ws://test",
+ onMessage: vi.fn(),
+ socketFactory: () => ws,
+ });
+
+ ws.resolveOpen();
+ ws.sent.length = 0;
+
+ handle.send({ type: "subscribe", surfaceId: "s1" });
+ expect(ws.sent).toHaveLength(1);
+ });
+
+ it("routes inbound messages to onMessage via parseServerMessage", () => {
+ const ws = fakeSocket();
+ const onMessage = vi.fn();
+ createSurfaceSocket({
+ url: "ws://test",
+ onMessage,
+ socketFactory: () => ws,
+ });
+
+ ws.resolveOpen();
+ ws.invokeMessage(JSON.stringify({ type: "catalog", catalog: [] }));
+ expect(onMessage).toHaveBeenCalledOnce();
+ expect(onMessage).toHaveBeenCalledWith({ type: "catalog", catalog: [] });
+ });
+
+ it("drops malformed inbound messages silently", () => {
+ const ws = fakeSocket();
+ const onMessage = vi.fn();
+ createSurfaceSocket({
+ url: "ws://test",
+ onMessage,
+ socketFactory: () => ws,
+ });
+
+ ws.resolveOpen();
+ ws.invokeMessage("not json");
+ expect(onMessage).not.toHaveBeenCalled();
+ });
+
+ it("auto-reconnects on close and fires onReopen after successful reconnect", () => {
+ vi.useFakeTimers();
+ try {
+ const sockets: ReturnType<typeof fakeSocket>[] = [];
+ const onMessage = vi.fn();
+ const onReopen = vi.fn();
+ createSurfaceSocket({
+ url: "ws://test",
+ onMessage,
+ onReopen,
+ socketFactory: () => {
+ const ws = fakeSocket();
+ sockets.push(ws);
+ return ws;
+ },
+ });
+
+ expect(sockets).toHaveLength(1);
+ sockets[0]?.resolveOpen();
+
+ // Simulate close
+ sockets[0]?.invokeClose();
+
+ // Fast-forward past the backoff delay
+ vi.advanceTimersByTime(600);
+
+ expect(sockets).toHaveLength(2);
+ // onReopen should NOT have fired yet (socket not open)
+ expect(onReopen).not.toHaveBeenCalled();
+
+ sockets[1]?.resolveOpen();
+ expect(onReopen).toHaveBeenCalledOnce();
+ } finally {
+ vi.useRealTimers();
+ }
+ });
+
+ it("does not fire onReopen on initial connect", () => {
+ const ws = fakeSocket();
+ const onReopen = vi.fn();
+ createSurfaceSocket({
+ url: "ws://test",
+ onMessage: vi.fn(),
+ onReopen,
+ socketFactory: () => ws,
+ });
+
+ ws.resolveOpen();
+ expect(onReopen).not.toHaveBeenCalled();
+ });
+
+ it("close() prevents further reconnects", () => {
+ vi.useFakeTimers();
+ try {
+ const sockets: ReturnType<typeof fakeSocket>[] = [];
+ const handle = createSurfaceSocket({
+ url: "ws://test",
+ onMessage: vi.fn(),
+ socketFactory: () => {
+ const ws = fakeSocket();
+ sockets.push(ws);
+ return ws;
+ },
+ });
+
+ sockets[0]?.resolveOpen();
+ sockets[0]?.invokeClose();
+ handle.close();
+
+ vi.advanceTimersByTime(10_000);
+ expect(sockets).toHaveLength(1);
+ } finally {
+ vi.useRealTimers();
+ }
+ });
+
+ it("close() prevents further sends", () => {
+ const ws = fakeSocket();
+ const handle = createSurfaceSocket({
+ url: "ws://test",
+ onMessage: vi.fn(),
+ socketFactory: () => ws,
+ });
+
+ ws.resolveOpen();
+ ws.sent.length = 0;
+ handle.close();
+
+ handle.send({ type: "subscribe", surfaceId: "s1" });
+ expect(ws.sent).toHaveLength(0);
+ });
+
+ it("queues multiple sends before open and flushes in order", () => {
+ const ws = fakeSocket();
+ const handle = createSurfaceSocket({
+ url: "ws://test",
+ onMessage: vi.fn(),
+ socketFactory: () => ws,
+ });
+
+ handle.send({ type: "subscribe", surfaceId: "a" });
+ handle.send({ type: "subscribe", surfaceId: "b" });
+ handle.send({ type: "invoke", surfaceId: "a", actionId: "x", payload: 1 });
+ ws.resolveOpen();
+
+ expect(ws.sent).toHaveLength(3);
+ expect(JSON.parse(ws.sent[0] ?? "")).toEqual({ type: "subscribe", surfaceId: "a" });
+ expect(JSON.parse(ws.sent[1] ?? "")).toEqual({ type: "subscribe", surfaceId: "b" });
+ expect(JSON.parse(ws.sent[2] ?? "")).toEqual({
+ type: "invoke",
+ surfaceId: "a",
+ actionId: "x",
+ payload: 1,
+ });
+ });
+});
diff --git a/src/adapters/ws/index.ts b/src/adapters/ws/index.ts
new file mode 100644
index 0000000..40eda2b
--- /dev/null
+++ b/src/adapters/ws/index.ts
@@ -0,0 +1,98 @@
+import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract";
+import { nextBackoffMs, parseServerMessage, serialize } from "./logic";
+
+export interface WebSocketLike {
+ send(data: string): void;
+ close(): void;
+ onopen: (() => void) | null;
+ onmessage: ((ev: { data: string }) => void) | null;
+ onclose: ((ev: { code: number; reason: string }) => void) | null;
+}
+
+export interface SurfaceSocketOptions {
+ url: string;
+ onMessage: (msg: SurfaceServerMessage) => void;
+ onReopen?: () => void;
+ socketFactory?: (url: string) => WebSocketLike;
+}
+
+export interface SurfaceSocketHandle {
+ send(msg: SurfaceClientMessage): void;
+ close(): void;
+}
+
+export function createSurfaceSocket(opts: SurfaceSocketOptions): SurfaceSocketHandle {
+ const factory =
+ opts.socketFactory ?? ((url: string) => new WebSocket(url) as unknown as WebSocketLike);
+
+ let socket: WebSocketLike | null = null;
+ let disposed = false;
+ let reconnectAttempt = 0;
+ let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
+ let isOpen = false;
+ const queue: string[] = [];
+
+ function connect(isReconnect: boolean): void {
+ socket = factory(opts.url);
+ isOpen = false;
+
+ socket.onopen = () => {
+ if (disposed) return;
+ isOpen = true;
+ reconnectAttempt = 0;
+ for (const raw of queue.splice(0)) {
+ socket?.send(raw);
+ }
+ if (isReconnect) {
+ opts.onReopen?.();
+ }
+ };
+
+ socket.onmessage = (ev) => {
+ if (disposed) return;
+ const msg = parseServerMessage(ev.data);
+ if (msg !== null) {
+ opts.onMessage(msg);
+ }
+ };
+
+ socket.onclose = () => {
+ if (disposed) return;
+ isOpen = false;
+ scheduleReconnect();
+ };
+ }
+
+ function scheduleReconnect(): void {
+ const delay = nextBackoffMs(reconnectAttempt);
+ reconnectAttempt++;
+ reconnectTimer = setTimeout(() => {
+ reconnectTimer = null;
+ if (disposed) return;
+ connect(true);
+ }, delay);
+ }
+
+ connect(false);
+
+ return {
+ send(msg: SurfaceClientMessage): void {
+ if (disposed) return;
+ const raw = serialize(msg);
+ if (isOpen) {
+ socket?.send(raw);
+ } else {
+ queue.push(raw);
+ }
+ },
+ close(): void {
+ disposed = true;
+ if (reconnectTimer !== null) {
+ clearTimeout(reconnectTimer);
+ reconnectTimer = null;
+ }
+ socket?.close();
+ socket = null;
+ },
+ };
+}
diff --git a/src/adapters/ws/logic.test.ts b/src/adapters/ws/logic.test.ts
new file mode 100644
index 0000000..62ae6a0
--- /dev/null
+++ b/src/adapters/ws/logic.test.ts
@@ -0,0 +1,195 @@
+import { describe, expect, it } from "vitest";
+import { nextBackoffMs, parseServerMessage, serialize } from "./logic";
+
+describe("serialize", () => {
+ it("serializes a subscribe message", () => {
+ const msg = { type: "subscribe" as const, surfaceId: "s1" };
+ expect(JSON.parse(serialize(msg))).toEqual(msg);
+ });
+
+ it("serializes an unsubscribe message", () => {
+ const msg = { type: "unsubscribe" as const, surfaceId: "s1" };
+ expect(JSON.parse(serialize(msg))).toEqual(msg);
+ });
+
+ it("serializes an invoke message with payload", () => {
+ const msg = { type: "invoke" as const, surfaceId: "s1", actionId: "toggle", payload: true };
+ expect(JSON.parse(serialize(msg))).toEqual(msg);
+ });
+
+ it("serializes an invoke message without payload", () => {
+ const msg = { type: "invoke" as const, surfaceId: "s1", actionId: "click" };
+ expect(JSON.parse(serialize(msg))).toEqual(msg);
+ });
+});
+
+describe("parseServerMessage", () => {
+ it("parses a catalog message", () => {
+ const data = JSON.stringify({
+ type: "catalog",
+ catalog: [{ id: "s1", region: "r", title: "S1" }],
+ });
+ const result = parseServerMessage(data);
+ expect(result).toEqual({
+ type: "catalog",
+ catalog: [{ id: "s1", region: "r", title: "S1" }],
+ });
+ });
+
+ it("parses a surface message", () => {
+ const data = JSON.stringify({
+ type: "surface",
+ spec: { id: "s1", region: "r", title: "S1", fields: [] },
+ });
+ const result = parseServerMessage(data);
+ expect(result).toEqual({
+ type: "surface",
+ spec: { id: "s1", region: "r", title: "S1", fields: [] },
+ });
+ });
+
+ it("parses an update message", () => {
+ const data = JSON.stringify({
+ type: "update",
+ update: {
+ surfaceId: "s1",
+ spec: { id: "s1", region: "r", title: "S1", fields: [] },
+ },
+ });
+ const result = parseServerMessage(data);
+ expect(result).toEqual({
+ type: "update",
+ update: {
+ surfaceId: "s1",
+ spec: { id: "s1", region: "r", title: "S1", fields: [] },
+ },
+ });
+ });
+
+ it("parses an error message with surfaceId", () => {
+ const data = JSON.stringify({ type: "error", surfaceId: "s1", message: "boom" });
+ const result = parseServerMessage(data);
+ expect(result).toEqual({ type: "error", surfaceId: "s1", message: "boom" });
+ });
+
+ it("parses an error message without surfaceId", () => {
+ const data = JSON.stringify({ type: "error", message: "global boom" });
+ const result = parseServerMessage(data);
+ expect(result).toEqual({ type: "error", message: "global boom" });
+ });
+
+ it("returns null for malformed JSON", () => {
+ expect(parseServerMessage("not json")).toBeNull();
+ expect(parseServerMessage("{broken")).toBeNull();
+ expect(parseServerMessage("")).toBeNull();
+ });
+
+ it("returns null for non-object JSON", () => {
+ expect(parseServerMessage("42")).toBeNull();
+ expect(parseServerMessage('"hello"')).toBeNull();
+ expect(parseServerMessage("null")).toBeNull();
+ expect(parseServerMessage("true")).toBeNull();
+ expect(parseServerMessage("[1,2,3]")).toBeNull();
+ });
+
+ it("returns null for unknown type", () => {
+ expect(parseServerMessage(JSON.stringify({ type: "unknown" }))).toBeNull();
+ });
+
+ it("returns null when type is missing", () => {
+ expect(parseServerMessage(JSON.stringify({ foo: "bar" }))).toBeNull();
+ });
+
+ it("returns null when type is not a string", () => {
+ expect(parseServerMessage(JSON.stringify({ type: 42 }))).toBeNull();
+ });
+
+ it("returns null for catalog with non-array catalog field", () => {
+ expect(parseServerMessage(JSON.stringify({ type: "catalog", catalog: "nope" }))).toBeNull();
+ });
+
+ it("returns null for surface with missing spec fields", () => {
+ expect(parseServerMessage(JSON.stringify({ type: "surface", spec: { id: "s1" } }))).toBeNull();
+ });
+
+ it("returns null for surface with non-object spec", () => {
+ expect(parseServerMessage(JSON.stringify({ type: "surface", spec: "nope" }))).toBeNull();
+ });
+
+ it("returns null for update with missing update field", () => {
+ expect(parseServerMessage(JSON.stringify({ type: "update" }))).toBeNull();
+ });
+
+ it("returns null for update with invalid spec", () => {
+ expect(
+ parseServerMessage(JSON.stringify({ type: "update", update: { surfaceId: "s1", spec: {} } })),
+ ).toBeNull();
+ });
+
+ it("returns null for error with non-string message", () => {
+ expect(parseServerMessage(JSON.stringify({ type: "error", message: 42 }))).toBeNull();
+ });
+
+ it("returns null for error with invalid surfaceId type", () => {
+ expect(
+ parseServerMessage(JSON.stringify({ type: "error", surfaceId: 42, message: "boom" })),
+ ).toBeNull();
+ });
+});
+
+describe("round-trip: parseServerMessage(serialize(...))", () => {
+ it("round-trips a subscribe message through serialize only", () => {
+ const msg = { type: "subscribe" as const, surfaceId: "s1" };
+ const wire = serialize(msg);
+ expect(JSON.parse(wire)).toEqual(msg);
+ });
+
+ it("round-trips an invoke message with payload", () => {
+ const msg = { type: "invoke" as const, surfaceId: "s1", actionId: "toggle", payload: false };
+ const wire = serialize(msg);
+ expect(JSON.parse(wire)).toEqual(msg);
+ });
+});
+
+describe("nextBackoffMs", () => {
+ it("returns a positive number", () => {
+ expect(nextBackoffMs(0)).toBeGreaterThan(0);
+ });
+
+ it("is capped at 30s + jitter (at most ~36s)", () => {
+ for (let i = 0; i < 100; i++) {
+ expect(nextBackoffMs(100)).toBeLessThanOrEqual(36_000);
+ }
+ });
+
+ it("starts around 500ms (±20% jitter)", () => {
+ for (let i = 0; i < 100; i++) {
+ const ms = nextBackoffMs(0);
+ expect(ms).toBeGreaterThanOrEqual(400);
+ expect(ms).toBeLessThanOrEqual(600);
+ }
+ });
+
+ it("grows exponentially with attempt", () => {
+ const averages = [0, 1, 2, 3].map((attempt) => {
+ let sum = 0;
+ for (let i = 0; i < 200; i++) {
+ sum += nextBackoffMs(attempt);
+ }
+ return sum / 200;
+ });
+ for (let i = 1; i < averages.length; i++) {
+ const prev = averages[i - 1];
+ if (prev === undefined) throw new Error("unreachable");
+ expect(averages[i]).toBeGreaterThan(prev);
+ }
+ });
+
+ it("treats negative attempt as 0", () => {
+ for (let i = 0; i < 50; i++) {
+ const ms = nextBackoffMs(-5);
+ expect(ms).toBeGreaterThanOrEqual(400);
+ expect(ms).toBeLessThanOrEqual(600);
+ }
+ });
+});
diff --git a/src/adapters/ws/logic.ts b/src/adapters/ws/logic.ts
new file mode 100644
index 0000000..83a5802
--- /dev/null
+++ b/src/adapters/ws/logic.ts
@@ -0,0 +1,91 @@
+import type {
+ CatalogMessage,
+ SurfaceClientMessage,
+ SurfaceErrorMessage,
+ SurfaceMessage,
+ SurfaceServerMessage,
+ SurfaceUpdateMessage,
+} from "@dispatch/ui-contract";
+
+const VALID_SERVER_TYPES = new Set(["catalog", "surface", "update", "error"]);
+
+/** Serialize a client message to a JSON string for the wire. */
+export function serialize(msg: SurfaceClientMessage): string {
+ return JSON.stringify(msg);
+}
+
+function isRecord(v: unknown): v is Record<string, unknown> {
+ return v !== null && typeof v === "object" && !Array.isArray(v);
+}
+
+/**
+ * Parse a raw server message string into a typed SurfaceServerMessage.
+ * Returns null for malformed JSON or shapes that don't match the protocol.
+ */
+export function parseServerMessage(data: string): SurfaceServerMessage | null {
+ let parsed: unknown;
+ try {
+ parsed = JSON.parse(data);
+ } catch {
+ return null;
+ }
+ if (!isRecord(parsed)) {
+ return null;
+ }
+ const t = parsed.type;
+ if (typeof t !== "string" || !VALID_SERVER_TYPES.has(t)) {
+ return null;
+ }
+ switch (t) {
+ case "catalog": {
+ if (!Array.isArray(parsed.catalog)) return null;
+ return { type: "catalog", catalog: parsed.catalog as CatalogMessage["catalog"] };
+ }
+ case "surface": {
+ const spec = parsed.spec;
+ if (!isRecord(spec)) return null;
+ if (typeof spec.id !== "string") return null;
+ if (typeof spec.region !== "string") return null;
+ if (typeof spec.title !== "string") return null;
+ if (!Array.isArray(spec.fields)) return null;
+ return { type: "surface", spec: spec as unknown as SurfaceMessage["spec"] };
+ }
+ case "update": {
+ const update = parsed.update;
+ if (!isRecord(update)) return null;
+ if (typeof update.surfaceId !== "string") return null;
+ const spec = update.spec;
+ if (!isRecord(spec)) return null;
+ if (typeof spec.id !== "string") return null;
+ if (typeof spec.region !== "string") return null;
+ if (typeof spec.title !== "string") return null;
+ if (!Array.isArray(spec.fields)) return null;
+ return { type: "update", update: update as unknown as SurfaceUpdateMessage["update"] };
+ }
+ case "error": {
+ if (typeof parsed.message !== "string") return null;
+ const surfaceId = parsed.surfaceId;
+ if (surfaceId !== undefined && typeof surfaceId !== "string") return null;
+ const msg: SurfaceErrorMessage =
+ surfaceId !== undefined
+ ? { type: "error", surfaceId, message: parsed.message }
+ : { type: "error", message: parsed.message };
+ return msg;
+ }
+ default:
+ return null;
+ }
+}
+
+/**
+ * Bounded exponential backoff with jitter.
+ * Base: 500ms, doubles each attempt, caps at 30s, adds ±20% jitter.
+ */
+export function nextBackoffMs(attempt: number): number {
+ const base = 500;
+ const max = 30_000;
+ const exponential = base * 2 ** Math.max(0, attempt);
+ const capped = Math.min(exponential, max);
+ const jitter = 0.8 + Math.random() * 0.4;
+ return Math.round(capped * jitter);
+}