blob: d2bc13d1e04459424281fbffdae51688871aed9d (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
import type {
ChatDeltaMessage,
ChatErrorMessage,
ConversationCompactedMessage,
ConversationOpenMessage,
ConversationStatusChangedMessage,
WsClientMessage,
} from "@dispatch/transport-contract";
import type { SurfaceServerMessage } from "@dispatch/ui-contract";
import { nextBackoffMs, parseServerMessage, serialize } from "./logic";
/**
 * Minimal socket shape that `createSurfaceSocket` depends on.
 *
 * The default factory casts a `WebSocket` instance to this interface; a
 * custom `socketFactory` may return any object with these members.
 */
export interface WebSocketLike {
/** Transmits one already-serialized text frame. */
send(data: string): void;
/** Closes the connection. */
close(): void;
/** Handler slot for the connection-opened event; `createSurfaceSocket` assigns it. */
onopen: (() => void) | null;
/** Handler slot for inbound frames; `ev.data` is the raw text handed to `parseServerMessage`. */
onmessage: ((ev: { data: string }) => void) | null;
/**
 * Handler slot for the connection-closed event. The event carries `code` and
 * `reason`, though the handler installed by `createSurfaceSocket` ignores both.
 */
onclose: ((ev: { code: number; reason: string }) => void) | null;
}
/** Configuration accepted by `createSurfaceSocket`. */
export interface SurfaceSocketOptions {
/** Endpoint passed to the socket factory on the initial connect and on every reconnect. */
url: string;
/** Receives every parsed server message whose `type` is not routed to one of the callbacks below. */
onMessage: (msg: SurfaceServerMessage) => void;
/** Receives messages whose `type` is `"chat.delta"` or `"chat.error"`. */
onChat?: (msg: ChatDeltaMessage | ChatErrorMessage) => void;
/** Broadcast when a conversation is "opened" (e.g. CLI `--open` flag). */
onConversationOpen?: (msg: ConversationOpenMessage) => void;
/** Broadcast when a conversation's lifecycle status changes (active/idle/closed). */
onConversationStatusChanged?: (msg: ConversationStatusChangedMessage) => void;
/** Broadcast when a conversation's history has been compacted (reload needed). */
onConversationCompacted?: (msg: ConversationCompactedMessage) => void;
/**
 * Invoked when a connection created by a reconnect attempt opens, after any
 * queued outbound messages have been flushed. Not invoked when the very
 * first connection attempt opens.
 */
onReopen?: () => void;
/** Creates the underlying socket; when omitted, `new WebSocket(url)` is used. */
socketFactory?: (url: string) => WebSocketLike;
}
/** Control surface returned by `createSurfaceSocket`. */
export interface SurfaceSocketHandle {
/**
 * Serializes `msg` and sends it if the connection is currently open;
 * otherwise the serialized frame is queued until the next connection opens.
 * Does nothing once `close()` has been called.
 */
send(msg: WsClientMessage): void;
/**
 * Permanently shuts the socket down: cancels any pending reconnect timer and
 * closes the current connection. No further callbacks are delivered afterwards.
 */
close(): void;
}
/**
 * Opens a socket to `opts.url` and keeps it alive: whenever the connection
 * closes without the caller having asked for it, a new connection attempt is
 * scheduled after the delay returned by `nextBackoffMs`.
 *
 * Outbound messages sent while no connection is open are buffered and
 * flushed, in order, as soon as the next connection opens. Inbound frames are
 * decoded with `parseServerMessage` and routed by their `type` to the
 * matching optional callback; every other type goes to `opts.onMessage`.
 * Frames that decode to `null` are dropped.
 *
 * @param opts - Endpoint URL, message callbacks and an optional socket
 *   factory (defaults to constructing a global `WebSocket`).
 * @returns A handle for sending client messages and shutting the socket down.
 */
export function createSurfaceSocket(opts: SurfaceSocketOptions): SurfaceSocketHandle {
  const makeSocket =
    opts.socketFactory ?? ((url: string) => new WebSocket(url) as unknown as WebSocketLike);

  // Connection state shared by all handlers below.
  let current: WebSocketLike | null = null;
  let closedByCaller = false;
  let attempts = 0;
  let pendingRetry: ReturnType<typeof setTimeout> | null = null;
  let connected = false;
  // Serialized frames waiting for an open connection.
  const outbox: string[] = [];

  /** Arms a timer that starts the next connection attempt after the backoff delay. */
  function scheduleRetry(): void {
    const waitMs = nextBackoffMs(attempts);
    attempts += 1;
    pendingRetry = setTimeout(() => {
      pendingRetry = null;
      if (!closedByCaller) {
        openConnection(true);
      }
    }, waitMs);
  }

  /**
   * Creates a fresh socket and wires its event handlers.
   * `afterDrop` is true for every attempt except the very first one.
   */
  function openConnection(afterDrop: boolean): void {
    const ws = makeSocket(opts.url);
    current = ws;
    connected = false;

    ws.onopen = () => {
      if (closedByCaller) return;
      connected = true;
      // A successful open resets the backoff sequence.
      attempts = 0;
      // Take the whole backlog out of the queue first, then send it in order.
      const backlog = outbox.splice(0);
      backlog.forEach((frame) => {
        current?.send(frame);
      });
      if (afterDrop) {
        opts.onReopen?.();
      }
    };

    ws.onmessage = (ev) => {
      if (closedByCaller) return;
      const msg = parseServerMessage(ev.data);
      if (msg === null) return;
      switch (msg.type) {
        case "chat.delta":
        case "chat.error":
          opts.onChat?.(msg as ChatDeltaMessage | ChatErrorMessage);
          break;
        case "conversation.open":
          opts.onConversationOpen?.(msg as ConversationOpenMessage);
          break;
        case "conversation.statusChanged":
          opts.onConversationStatusChanged?.(msg as ConversationStatusChangedMessage);
          break;
        case "conversation.compacted":
          opts.onConversationCompacted?.(msg as ConversationCompactedMessage);
          break;
        default:
          opts.onMessage(msg as SurfaceServerMessage);
          break;
      }
    };

    ws.onclose = () => {
      if (closedByCaller) return;
      connected = false;
      scheduleRetry();
    };
  }

  openConnection(false);

  const handle: SurfaceSocketHandle = {
    send(msg: WsClientMessage): void {
      if (closedByCaller) return;
      const frame = serialize(msg);
      if (!connected) {
        // Not connected yet (or between reconnects): hold the frame for the next open.
        outbox.push(frame);
        return;
      }
      current?.send(frame);
    },
    close(): void {
      // Set the flag first so the socket's own close event does not trigger a reconnect.
      closedByCaller = true;
      if (pendingRetry !== null) {
        clearTimeout(pendingRetry);
        pendingRetry = null;
      }
      current?.close();
      current = null;
    },
  };
  return handle;
}
|