From e1c8cf3257cb33457aa882c548f5195ecc0f9854 Mon Sep 17 00:00:00 2001 From: Adam Malczewski Date: Sat, 6 Jun 2026 22:08:16 +0900 Subject: Slice 1: surface system + WS transport + composition root Pure-core feature libraries assembled at the composition root: - core/protocol: pure reducer over surface catalog/spec/error messages - features/surface-host: generic field-kind interpreter (toggle/progress/ selector/stat/button) + pure plan logic; no surface-id special-casing - adapters/ws: injected WebSocket client (effects at the edge) - app: composition root store (Svelte 5 runes over the pure reducer), host-relative surface WS URL resolution (resolveWsUrl), root App.svelte Verified green: svelte-check 0/0, vitest 84 passed, biome clean, vite build ok. --- src/adapters/ws/index.test.ts | 234 ++++++++++++++++++++++++++++++++++++++++++ src/adapters/ws/index.ts | 98 ++++++++++++++++++ src/adapters/ws/logic.test.ts | 195 +++++++++++++++++++++++++++++++++++ src/adapters/ws/logic.ts | 91 ++++++++++++++++ 4 files changed, 618 insertions(+) create mode 100644 src/adapters/ws/index.test.ts create mode 100644 src/adapters/ws/index.ts create mode 100644 src/adapters/ws/logic.test.ts create mode 100644 src/adapters/ws/logic.ts (limited to 'src/adapters') diff --git a/src/adapters/ws/index.test.ts b/src/adapters/ws/index.test.ts new file mode 100644 index 0000000..92b8753 --- /dev/null +++ b/src/adapters/ws/index.test.ts @@ -0,0 +1,234 @@ +import { describe, expect, it, vi } from "vitest"; +import type { WebSocketLike } from "./index"; +import { createSurfaceSocket } from "./index"; + +interface FakeSocket extends WebSocketLike { + sent: string[]; + resolveOpen(): void; + invokeMessage(data: string): void; + invokeClose(): void; +} + +function fakeSocket(): FakeSocket { + let onopen: (() => void) | null = null; + let onmessage: ((ev: { data: string }) => void) | null = null; + let onclose: ((ev: { code: number; reason: string }) => void) | null = null; + const sent: string[] = []; + + const ws: FakeSocket = { + send(data: string) { + sent.push(data); + }, + close() {}, + get onopen() { + return onopen; + }, + set onopen(fn) { + onopen = fn; + }, + get onmessage() { + return onmessage; + }, + set onmessage(fn) { + onmessage = fn; + }, + get onclose() { + return onclose; + }, + set onclose(fn) { + onclose = fn; + }, + resolveOpen() { + onopen?.(); + }, + invokeMessage(data: string) { + onmessage?.({ data }); + }, + invokeClose() { + onclose?.({ code: 1000, reason: "" }); + }, + sent, + }; + return ws; +} + +describe("createSurfaceSocket", () => { + it("sends queued messages once socket opens", () => { + const ws = fakeSocket(); + const onMessage = vi.fn(); + const handle = createSurfaceSocket({ + url: "ws://test", + onMessage, + socketFactory: () => ws, + }); + + handle.send({ type: "subscribe", surfaceId: "s1" }); + handle.send({ type: "subscribe", surfaceId: "s2" }); + expect(ws.sent).toHaveLength(0); + + ws.resolveOpen(); + expect(ws.sent).toHaveLength(2); + expect(JSON.parse(ws.sent[0] ?? "")).toEqual({ type: "subscribe", surfaceId: "s1" }); + expect(JSON.parse(ws.sent[1] ?? "")).toEqual({ type: "subscribe", surfaceId: "s2" }); + }); + + it("sends immediately when socket is already open", () => { + const ws = fakeSocket(); + const handle = createSurfaceSocket({ + url: "ws://test", + onMessage: vi.fn(), + socketFactory: () => ws, + }); + + ws.resolveOpen(); + ws.sent.length = 0; + + handle.send({ type: "subscribe", surfaceId: "s1" }); + expect(ws.sent).toHaveLength(1); + }); + + it("routes inbound messages to onMessage via parseServerMessage", () => { + const ws = fakeSocket(); + const onMessage = vi.fn(); + createSurfaceSocket({ + url: "ws://test", + onMessage, + socketFactory: () => ws, + }); + + ws.resolveOpen(); + ws.invokeMessage(JSON.stringify({ type: "catalog", catalog: [] })); + expect(onMessage).toHaveBeenCalledOnce(); + expect(onMessage).toHaveBeenCalledWith({ type: "catalog", catalog: [] }); + }); + + it("drops malformed inbound messages silently", () => { + const ws = fakeSocket(); + const onMessage = vi.fn(); + createSurfaceSocket({ + url: "ws://test", + onMessage, + socketFactory: () => ws, + }); + + ws.resolveOpen(); + ws.invokeMessage("not json"); + expect(onMessage).not.toHaveBeenCalled(); + }); + + it("auto-reconnects on close and fires onReopen after successful reconnect", () => { + vi.useFakeTimers(); + try { + const sockets: ReturnType[] = []; + const onMessage = vi.fn(); + const onReopen = vi.fn(); + createSurfaceSocket({ + url: "ws://test", + onMessage, + onReopen, + socketFactory: () => { + const ws = fakeSocket(); + sockets.push(ws); + return ws; + }, + }); + + expect(sockets).toHaveLength(1); + sockets[0]?.resolveOpen(); + + // Simulate close + sockets[0]?.invokeClose(); + + // Fast-forward past the backoff delay + vi.advanceTimersByTime(600); + + expect(sockets).toHaveLength(2); + // onReopen should NOT have fired yet (socket not open) + expect(onReopen).not.toHaveBeenCalled(); + + sockets[1]?.resolveOpen(); + expect(onReopen).toHaveBeenCalledOnce(); + } finally { + vi.useRealTimers(); + } + }); + + it("does not fire onReopen on initial connect", () => { + const ws = fakeSocket(); + const onReopen = vi.fn(); + createSurfaceSocket({ + url: "ws://test", + onMessage: vi.fn(), + onReopen, + socketFactory: () => ws, + }); + + ws.resolveOpen(); + expect(onReopen).not.toHaveBeenCalled(); + }); + + it("close() prevents further reconnects", () => { + vi.useFakeTimers(); + try { + const sockets: ReturnType[] = []; + const handle = createSurfaceSocket({ + url: "ws://test", + onMessage: vi.fn(), + socketFactory: () => { + const ws = fakeSocket(); + sockets.push(ws); + return ws; + }, + }); + + sockets[0]?.resolveOpen(); + sockets[0]?.invokeClose(); + handle.close(); + + vi.advanceTimersByTime(10_000); + expect(sockets).toHaveLength(1); + } finally { + vi.useRealTimers(); + } + }); + + it("close() prevents further sends", () => { + const ws = fakeSocket(); + const handle = createSurfaceSocket({ + url: "ws://test", + onMessage: vi.fn(), + socketFactory: () => ws, + }); + + ws.resolveOpen(); + ws.sent.length = 0; + handle.close(); + + handle.send({ type: "subscribe", surfaceId: "s1" }); + expect(ws.sent).toHaveLength(0); + }); + + it("queues multiple sends before open and flushes in order", () => { + const ws = fakeSocket(); + const handle = createSurfaceSocket({ + url: "ws://test", + onMessage: vi.fn(), + socketFactory: () => ws, + }); + + handle.send({ type: "subscribe", surfaceId: "a" }); + handle.send({ type: "subscribe", surfaceId: "b" }); + handle.send({ type: "invoke", surfaceId: "a", actionId: "x", payload: 1 }); + ws.resolveOpen(); + + expect(ws.sent).toHaveLength(3); + expect(JSON.parse(ws.sent[0] ?? "")).toEqual({ type: "subscribe", surfaceId: "a" }); + expect(JSON.parse(ws.sent[1] ?? "")).toEqual({ type: "subscribe", surfaceId: "b" }); + expect(JSON.parse(ws.sent[2] ?? "")).toEqual({ + type: "invoke", + surfaceId: "a", + actionId: "x", + payload: 1, + }); + }); +}); diff --git a/src/adapters/ws/index.ts b/src/adapters/ws/index.ts new file mode 100644 index 0000000..40eda2b --- /dev/null +++ b/src/adapters/ws/index.ts @@ -0,0 +1,98 @@ +import type { SurfaceClientMessage, SurfaceServerMessage } from "@dispatch/ui-contract"; +import { nextBackoffMs, parseServerMessage, serialize } from "./logic"; + +export interface WebSocketLike { + send(data: string): void; + close(): void; + onopen: (() => void) | null; + onmessage: ((ev: { data: string }) => void) | null; + onclose: ((ev: { code: number; reason: string }) => void) | null; +} + +export interface SurfaceSocketOptions { + url: string; + onMessage: (msg: SurfaceServerMessage) => void; + onReopen?: () => void; + socketFactory?: (url: string) => WebSocketLike; +} + +export interface SurfaceSocketHandle { + send(msg: SurfaceClientMessage): void; + close(): void; +} + +export function createSurfaceSocket(opts: SurfaceSocketOptions): SurfaceSocketHandle { + const factory = + opts.socketFactory ?? ((url: string) => new WebSocket(url) as unknown as WebSocketLike); + + let socket: WebSocketLike | null = null; + let disposed = false; + let reconnectAttempt = 0; + let reconnectTimer: ReturnType | null = null; + let isOpen = false; + const queue: string[] = []; + + function connect(isReconnect: boolean): void { + socket = factory(opts.url); + isOpen = false; + + socket.onopen = () => { + if (disposed) return; + isOpen = true; + reconnectAttempt = 0; + for (const raw of queue.splice(0)) { + socket?.send(raw); + } + if (isReconnect) { + opts.onReopen?.(); + } + }; + + socket.onmessage = (ev) => { + if (disposed) return; + const msg = parseServerMessage(ev.data); + if (msg !== null) { + opts.onMessage(msg); + } + }; + + socket.onclose = () => { + if (disposed) return; + isOpen = false; + scheduleReconnect(); + }; + } + + function scheduleReconnect(): void { + const delay = nextBackoffMs(reconnectAttempt); + reconnectAttempt++; + reconnectTimer = setTimeout(() => { + reconnectTimer = null; + if (disposed) return; + connect(true); + }, delay); + } + + connect(false); + + return { + send(msg: SurfaceClientMessage): void { + if (disposed) return; + const raw = serialize(msg); + if (isOpen) { + socket?.send(raw); + } else { + queue.push(raw); + } + }, + close(): void { + disposed = true; + if (reconnectTimer !== null) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + socket?.close(); + socket = null; + }, + }; +} diff --git a/src/adapters/ws/logic.test.ts b/src/adapters/ws/logic.test.ts new file mode 100644 index 0000000..62ae6a0 --- /dev/null +++ b/src/adapters/ws/logic.test.ts @@ -0,0 +1,195 @@ +import { describe, expect, it } from "vitest"; +import { nextBackoffMs, parseServerMessage, serialize } from "./logic"; + +describe("serialize", () => { + it("serializes a subscribe message", () => { + const msg = { type: "subscribe" as const, surfaceId: "s1" }; + expect(JSON.parse(serialize(msg))).toEqual(msg); + }); + + it("serializes an unsubscribe message", () => { + const msg = { type: "unsubscribe" as const, surfaceId: "s1" }; + expect(JSON.parse(serialize(msg))).toEqual(msg); + }); + + it("serializes an invoke message with payload", () => { + const msg = { type: "invoke" as const, surfaceId: "s1", actionId: "toggle", payload: true }; + expect(JSON.parse(serialize(msg))).toEqual(msg); + }); + + it("serializes an invoke message without payload", () => { + const msg = { type: "invoke" as const, surfaceId: "s1", actionId: "click" }; + expect(JSON.parse(serialize(msg))).toEqual(msg); + }); +}); + +describe("parseServerMessage", () => { + it("parses a catalog message", () => { + const data = JSON.stringify({ + type: "catalog", + catalog: [{ id: "s1", region: "r", title: "S1" }], + }); + const result = parseServerMessage(data); + expect(result).toEqual({ + type: "catalog", + catalog: [{ id: "s1", region: "r", title: "S1" }], + }); + }); + + it("parses a surface message", () => { + const data = JSON.stringify({ + type: "surface", + spec: { id: "s1", region: "r", title: "S1", fields: [] }, + }); + const result = parseServerMessage(data); + expect(result).toEqual({ + type: "surface", + spec: { id: "s1", region: "r", title: "S1", fields: [] }, + }); + }); + + it("parses an update message", () => { + const data = JSON.stringify({ + type: "update", + update: { + surfaceId: "s1", + spec: { id: "s1", region: "r", title: "S1", fields: [] }, + }, + }); + const result = parseServerMessage(data); + expect(result).toEqual({ + type: "update", + update: { + surfaceId: "s1", + spec: { id: "s1", region: "r", title: "S1", fields: [] }, + }, + }); + }); + + it("parses an error message with surfaceId", () => { + const data = JSON.stringify({ type: "error", surfaceId: "s1", message: "boom" }); + const result = parseServerMessage(data); + expect(result).toEqual({ type: "error", surfaceId: "s1", message: "boom" }); + }); + + it("parses an error message without surfaceId", () => { + const data = JSON.stringify({ type: "error", message: "global boom" }); + const result = parseServerMessage(data); + expect(result).toEqual({ type: "error", message: "global boom" }); + }); + + it("returns null for malformed JSON", () => { + expect(parseServerMessage("not json")).toBeNull(); + expect(parseServerMessage("{broken")).toBeNull(); + expect(parseServerMessage("")).toBeNull(); + }); + + it("returns null for non-object JSON", () => { + expect(parseServerMessage("42")).toBeNull(); + expect(parseServerMessage('"hello"')).toBeNull(); + expect(parseServerMessage("null")).toBeNull(); + expect(parseServerMessage("true")).toBeNull(); + expect(parseServerMessage("[1,2,3]")).toBeNull(); + }); + + it("returns null for unknown type", () => { + expect(parseServerMessage(JSON.stringify({ type: "unknown" }))).toBeNull(); + }); + + it("returns null when type is missing", () => { + expect(parseServerMessage(JSON.stringify({ foo: "bar" }))).toBeNull(); + }); + + it("returns null when type is not a string", () => { + expect(parseServerMessage(JSON.stringify({ type: 42 }))).toBeNull(); + }); + + it("returns null for catalog with non-array catalog field", () => { + expect(parseServerMessage(JSON.stringify({ type: "catalog", catalog: "nope" }))).toBeNull(); + }); + + it("returns null for surface with missing spec fields", () => { + expect(parseServerMessage(JSON.stringify({ type: "surface", spec: { id: "s1" } }))).toBeNull(); + }); + + it("returns null for surface with non-object spec", () => { + expect(parseServerMessage(JSON.stringify({ type: "surface", spec: "nope" }))).toBeNull(); + }); + + it("returns null for update with missing update field", () => { + expect(parseServerMessage(JSON.stringify({ type: "update" }))).toBeNull(); + }); + + it("returns null for update with invalid spec", () => { + expect( + parseServerMessage(JSON.stringify({ type: "update", update: { surfaceId: "s1", spec: {} } })), + ).toBeNull(); + }); + + it("returns null for error with non-string message", () => { + expect(parseServerMessage(JSON.stringify({ type: "error", message: 42 }))).toBeNull(); + }); + + it("returns null for error with invalid surfaceId type", () => { + expect( + parseServerMessage(JSON.stringify({ type: "error", surfaceId: 42, message: "boom" })), + ).toBeNull(); + }); +}); + +describe("round-trip: parseServerMessage(serialize(...))", () => { + it("round-trips a subscribe message through serialize only", () => { + const msg = { type: "subscribe" as const, surfaceId: "s1" }; + const wire = serialize(msg); + expect(JSON.parse(wire)).toEqual(msg); + }); + + it("round-trips an invoke message with payload", () => { + const msg = { type: "invoke" as const, surfaceId: "s1", actionId: "toggle", payload: false }; + const wire = serialize(msg); + expect(JSON.parse(wire)).toEqual(msg); + }); +}); + +describe("nextBackoffMs", () => { + it("returns a positive number", () => { + expect(nextBackoffMs(0)).toBeGreaterThan(0); + }); + + it("is capped at 30s + jitter (at most ~36s)", () => { + for (let i = 0; i < 100; i++) { + expect(nextBackoffMs(100)).toBeLessThanOrEqual(36_000); + } + }); + + it("starts around 500ms (±20% jitter)", () => { + for (let i = 0; i < 100; i++) { + const ms = nextBackoffMs(0); + expect(ms).toBeGreaterThanOrEqual(400); + expect(ms).toBeLessThanOrEqual(600); + } + }); + + it("grows exponentially with attempt", () => { + const averages = [0, 1, 2, 3].map((attempt) => { + let sum = 0; + for (let i = 0; i < 200; i++) { + sum += nextBackoffMs(attempt); + } + return sum / 200; + }); + for (let i = 1; i < averages.length; i++) { + const prev = averages[i - 1]; + if (prev === undefined) throw new Error("unreachable"); + expect(averages[i]).toBeGreaterThan(prev); + } + }); + + it("treats negative attempt as 0", () => { + for (let i = 0; i < 50; i++) { + const ms = nextBackoffMs(-5); + expect(ms).toBeGreaterThanOrEqual(400); + expect(ms).toBeLessThanOrEqual(600); + } + }); +}); diff --git a/src/adapters/ws/logic.ts b/src/adapters/ws/logic.ts new file mode 100644 index 0000000..83a5802 --- /dev/null +++ b/src/adapters/ws/logic.ts @@ -0,0 +1,91 @@ +import type { + CatalogMessage, + SurfaceClientMessage, + SurfaceErrorMessage, + SurfaceMessage, + SurfaceServerMessage, + SurfaceUpdateMessage, +} from "@dispatch/ui-contract"; + +const VALID_SERVER_TYPES = new Set(["catalog", "surface", "update", "error"]); + +/** Serialize a client message to a JSON string for the wire. */ +export function serialize(msg: SurfaceClientMessage): string { + return JSON.stringify(msg); +} + +function isRecord(v: unknown): v is Record { + return v !== null && typeof v === "object" && !Array.isArray(v); +} + +/** + * Parse a raw server message string into a typed SurfaceServerMessage. + * Returns null for malformed JSON or shapes that don't match the protocol. + */ +export function parseServerMessage(data: string): SurfaceServerMessage | null { + let parsed: unknown; + try { + parsed = JSON.parse(data); + } catch { + return null; + } + if (!isRecord(parsed)) { + return null; + } + const t = parsed.type; + if (typeof t !== "string" || !VALID_SERVER_TYPES.has(t)) { + return null; + } + switch (t) { + case "catalog": { + if (!Array.isArray(parsed.catalog)) return null; + return { type: "catalog", catalog: parsed.catalog as CatalogMessage["catalog"] }; + } + case "surface": { + const spec = parsed.spec; + if (!isRecord(spec)) return null; + if (typeof spec.id !== "string") return null; + if (typeof spec.region !== "string") return null; + if (typeof spec.title !== "string") return null; + if (!Array.isArray(spec.fields)) return null; + return { type: "surface", spec: spec as unknown as SurfaceMessage["spec"] }; + } + case "update": { + const update = parsed.update; + if (!isRecord(update)) return null; + if (typeof update.surfaceId !== "string") return null; + const spec = update.spec; + if (!isRecord(spec)) return null; + if (typeof spec.id !== "string") return null; + if (typeof spec.region !== "string") return null; + if (typeof spec.title !== "string") return null; + if (!Array.isArray(spec.fields)) return null; + return { type: "update", update: update as unknown as SurfaceUpdateMessage["update"] }; + } + case "error": { + if (typeof parsed.message !== "string") return null; + const surfaceId = parsed.surfaceId; + if (surfaceId !== undefined && typeof surfaceId !== "string") return null; + const msg: SurfaceErrorMessage = + surfaceId !== undefined + ? { type: "error", surfaceId, message: parsed.message } + : { type: "error", message: parsed.message }; + return msg; + } + default: + return null; + } +} + +/** + * Bounded exponential backoff with jitter. + * Base: 500ms, doubles each attempt, caps at 30s, adds ±20% jitter. + */ +export function nextBackoffMs(attempt: number): number { + const base = 500; + const max = 30_000; + const exponential = base * 2 ** Math.max(0, attempt); + const capped = Math.min(exponential, max); + const jitter = 0.8 + Math.random() * 0.4; + return Math.round(capped * jitter); +} -- cgit v1.2.3