1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
/**
* Message-queue service — the typed handle the session-orchestrator obtains
* (lazily) to enqueue / read / drain a conversation's steering queue.
*
* The queue is transient + per-conversation (in-memory, only meaningful during
* generation). `drain` returns + clears AND pushes a surface update (empty).
* The orchestrator owns the delivery (drain → combine → inject → carry-to-new-
* turn); this service owns only the queue state, the surface, and the
* drain-and-clear.
*/
import { defineService, type Logger, type ServiceHandle } from "@dispatch/kernel";
import type { QueuedMessage } from "@dispatch/wire";
import type { MessageQueueState, QueueDeps } from "./pure.js";
import { drain as drainQueue, enqueue as enqueueMessage, getQueue as readQueue } from "./pure.js";
/**
* The message-queue service interface. Obtained via
* `host.getService(messageQueueHandle)`.
*/
export interface MessageQueueService {
/** Append a message; return the CURRENT queue snapshot (post-append). */
enqueue(conversationId: string, text: string): QueuedMessage[];
/** Current queue snapshot (empty array if none / unknown conversation). */
getQueue(conversationId: string): QueuedMessage[];
/**
* Drain: return all queued messages, CLEAR the queue, and push a surface
* update (empty). Returns the raw `QueuedMessage[]` (NOT a ChatMessage —
* the caller combines + builds the ChatMessage). Empty array if the queue
* was empty (and then NO surface update is pushed — no change).
*/
drain(conversationId: string): QueuedMessage[];
}
/**
* Typed handle anchoring the message-queue service. The single symbol the
* session-orchestrator imports to reach the queue — no string-keyed lookup.
*/
export const messageQueueHandle: ServiceHandle<MessageQueueService> =
defineService<MessageQueueService>("message-queue");
/** Deps for the service shell — the pure core's deps plus the surface notifier. */
export interface MessageQueueDeps extends QueueDeps {
/**
* Notify the surface-registry that the queue changed, so the transport
* re-fetches + pushes a full new SurfaceUpdate. Called ONLY on a real
* change (enqueue → grew; drain-non-empty → emptied); a no-op drain of an
* already-empty queue does NOT notify (no change → no surface push).
*/
readonly notify: () => void;
/** Injected host logger (optional in tests). Logs counts/lengths at debug, never bodies. */
readonly logger?: Logger;
}
/**
* Create a MessageQueueService backed by an in-memory per-conversation Map.
* Pure decision logic (enqueue/getQueue/drain) lives in ./pure.js; this shell
* owns the state + the notify edge. State is owned (not ambient): the Map lives
* in this closure, reachable only through the returned service.
*/
export function createMessageQueueService(deps: MessageQueueDeps): MessageQueueService {
const state: MessageQueueState = new Map();
return {
enqueue(conversationId, text) {
const snapshot = enqueueMessage(state, conversationId, text, deps);
deps.logger?.debug("message-queue: enqueued", {
conversationId,
queueLen: snapshot.length,
textLen: text.length,
});
deps.notify();
return snapshot;
},
getQueue(conversationId) {
return readQueue(state, conversationId);
},
drain(conversationId) {
const drained = drainQueue(state, conversationId);
if (drained.length === 0) return drained;
deps.logger?.debug("message-queue: drained", {
conversationId,
count: drained.length,
});
deps.notify();
return drained;
},
};
}
|