summaryrefslogtreecommitdiffhomepage
path: root/src/adapters/ws/index.ts
blob: d2bc13d1e04459424281fbffdae51688871aed9d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import type {
	ChatDeltaMessage,
	ChatErrorMessage,
	ConversationCompactedMessage,
	ConversationOpenMessage,
	ConversationStatusChangedMessage,
	WsClientMessage,
} from "@dispatch/transport-contract";
import type { SurfaceServerMessage } from "@dispatch/ui-contract";
import { nextBackoffMs, parseServerMessage, serialize } from "./logic";

export interface WebSocketLike {
	send(data: string): void;
	close(): void;
	onopen: (() => void) | null;
	onmessage: ((ev: { data: string }) => void) | null;
	onclose: ((ev: { code: number; reason: string }) => void) | null;
}

export interface SurfaceSocketOptions {
	url: string;
	onMessage: (msg: SurfaceServerMessage) => void;
	onChat?: (msg: ChatDeltaMessage | ChatErrorMessage) => void;
	/** Broadcast when a conversation is "opened" (e.g. CLI `--open` flag). */
	onConversationOpen?: (msg: ConversationOpenMessage) => void;
	/** Broadcast when a conversation's lifecycle status changes (active/idle/closed). */
	onConversationStatusChanged?: (msg: ConversationStatusChangedMessage) => void;
	/** Broadcast when a conversation's history has been compacted (reload needed). */
	onConversationCompacted?: (msg: ConversationCompactedMessage) => void;
	onReopen?: () => void;
	socketFactory?: (url: string) => WebSocketLike;
}

export interface SurfaceSocketHandle {
	send(msg: WsClientMessage): void;
	close(): void;
}

export function createSurfaceSocket(opts: SurfaceSocketOptions): SurfaceSocketHandle {
	const factory =
		opts.socketFactory ?? ((url: string) => new WebSocket(url) as unknown as WebSocketLike);

	let socket: WebSocketLike | null = null;
	let disposed = false;
	let reconnectAttempt = 0;
	let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
	let isOpen = false;
	const queue: string[] = [];

	function connect(isReconnect: boolean): void {
		socket = factory(opts.url);
		isOpen = false;

		socket.onopen = () => {
			if (disposed) return;
			isOpen = true;
			reconnectAttempt = 0;
			for (const raw of queue.splice(0)) {
				socket?.send(raw);
			}
			if (isReconnect) {
				opts.onReopen?.();
			}
		};

		socket.onmessage = (ev) => {
			if (disposed) return;
			const msg = parseServerMessage(ev.data);
			if (msg !== null) {
				if (msg.type === "chat.delta" || msg.type === "chat.error") {
					opts.onChat?.(msg as ChatDeltaMessage | ChatErrorMessage);
				} else if (msg.type === "conversation.open") {
					opts.onConversationOpen?.(msg as ConversationOpenMessage);
				} else if (msg.type === "conversation.statusChanged") {
					opts.onConversationStatusChanged?.(msg as ConversationStatusChangedMessage);
				} else if (msg.type === "conversation.compacted") {
					opts.onConversationCompacted?.(msg as ConversationCompactedMessage);
				} else {
					opts.onMessage(msg as SurfaceServerMessage);
				}
			}
		};

		socket.onclose = () => {
			if (disposed) return;
			isOpen = false;
			scheduleReconnect();
		};
	}

	function scheduleReconnect(): void {
		const delay = nextBackoffMs(reconnectAttempt);
		reconnectAttempt++;
		reconnectTimer = setTimeout(() => {
			reconnectTimer = null;
			if (disposed) return;
			connect(true);
		}, delay);
	}

	connect(false);

	return {
		send(msg: WsClientMessage): void {
			if (disposed) return;
			const raw = serialize(msg);
			if (isOpen) {
				socket?.send(raw);
			} else {
				queue.push(raw);
			}
		},
		close(): void {
			disposed = true;
			if (reconnectTimer !== null) {
				clearTimeout(reconnectTimer);
				reconnectTimer = null;
			}
			socket?.close();
			socket = null;
		},
	};
}