diff options
| author | Dax Raad <[email protected]> | 2025-12-01 16:35:03 -0500 |
|---|---|---|
| committer | Dax Raad <[email protected]> | 2025-12-01 16:35:07 -0500 |
| commit | 3d99dc78dbff982cde6dd61a0629b7ee9de1a8f3 (patch) | |
| tree | 4c8dabb52db7c26d2d412c1be861f7599168a7d3 /packages/enterprise/src | |
| parent | 95c3a8b80505fcb3140989daf6fece3377aa3b95 (diff) | |
| download | opencode-3d99dc78dbff982cde6dd61a0629b7ee9de1a8f3.tar.gz opencode-3d99dc78dbff982cde6dd61a0629b7ee9de1a8f3.zip | |
core: reduce latency when loading shared sessions through event compaction
Diffstat (limited to 'packages/enterprise/src')
| -rw-r--r-- | packages/enterprise/src/core/share.ts | 100 | ||||
| -rw-r--r-- | packages/enterprise/src/core/storage.ts | 20 |
2 files changed, 85 insertions, 35 deletions
diff --git a/packages/enterprise/src/core/share.ts b/packages/enterprise/src/core/share.ts index 5c2fbc28c..bf10c4a14 100644 --- a/packages/enterprise/src/core/share.ts +++ b/packages/enterprise/src/core/share.ts @@ -1,8 +1,10 @@ import { FileDiff, Message, Model, Part, Session, SessionStatus } from "@opencode-ai/sdk" import { fn } from "@opencode-ai/util/fn" import { iife } from "@opencode-ai/util/iife" +import { Identifier } from "@opencode-ai/util/identifier" import z from "zod" import { Storage } from "./storage" +import { Binary } from "@opencode-ai/util/binary" export namespace Share { export const Info = z.object({ @@ -37,15 +39,15 @@ export namespace Share { export type Data = z.infer<typeof Data> export const create = fn(z.object({ sessionID: z.string() }), async (body) => { + const isTest = process.env.NODE_ENV === "test" || body.sessionID.startsWith("test_") const info: Info = { - id: body.sessionID.slice(-8), + id: (isTest ? "test_" : "") + body.sessionID.slice(-8), sessionID: body.sessionID, secret: crypto.randomUUID(), } const exists = await get(info.id) if (exists) throw new Errors.AlreadyExists(info.id) await Storage.write(["share", info.id], info) - console.log("created share", info.id) return info }) @@ -58,35 +60,72 @@ export namespace Share { if (!share) throw new Errors.NotFound(body.id) if (share.secret !== body.secret) throw new Errors.InvalidSecret(body.id) await Storage.remove(["share", body.id]) - const list = await Storage.list(["share_data", body.id]) + const list = await Storage.list({ prefix: ["share_data", body.id] }) for (const item of list) { await Storage.remove(item) } }) - export async function data(id: string) { - let time = Date.now() - const list = await Storage.list(["share_data", id]) - console.log("listing share data", Date.now() - time, list.length) - const promises = [] - time = Date.now() - for (const item of list) { - promises.push( - iife(async () => { - const [, , type] = item - return { - type: type as any, - data: await Storage.read<any>(item), - } as Data - }), - ) + export const sync = fn( + z.object({ + share: Info.pick({ id: true, secret: true }), + data: Data.array(), + }), + async (input) => { + const share = await get(input.share.id) + if (!share) throw new Errors.NotFound(input.share.id) + if (share.secret !== input.share.secret) throw new Errors.InvalidSecret(input.share.id) + await Storage.write(["share_event", input.share.id, Identifier.descending()], input.data) + }, + ) + + type Compaction = { + event?: string + data: Data[] + } + + export async function data(shareID: string) { + const compaction: Compaction = (await Storage.read<Compaction>(["share_compaction", shareID])) ?? { + data: [], + event: undefined, + } + + const list = await Storage.list({ + prefix: ["share_event", shareID], + end: compaction.event, + }).then((x) => x.toReversed()) + + const data = await Promise.all(list.map(async (event) => await Storage.read<Data[]>(event))).then((x) => x.flat()) + for (const item of data) { + if (!item) continue + const key = (item: Data) => { + switch (item.type) { + case "session": + return "session" + case "message": + return `message/${item.data.id}` + case "part": + return `${item.data.messageID}/${item.data.id}` + case "session_diff": + return "session_diff" + case "model": + return "model" + } + } + const id = key(item) + const result = Binary.search(compaction.data, id, key) + if (result.found) { + compaction.data[result.index] = item + } else { + compaction.data.splice(result.index, 0, item) + } } - const result = await Promise.all(promises) - console.log("read share data", Date.now() - time, result.length) - return result + compaction.event = list.at(-1)?.at(-1) + await Storage.write(["share_compaction", shareID], compaction) + return compaction.data } - export const sync = fn( + export const syncOld = fn( z.object({ share: Info.pick({ id: true, secret: true }), data: Data.array(), @@ -103,15 +142,16 @@ export namespace Share { case "session": await Storage.write(["share_data", input.share.id, "session"], item.data) break - case "message": - await Storage.write(["share_data", input.share.id, "message", item.data.id], item.data) + case "message": { + const data = item.data as Message + await Storage.write(["share_data", input.share.id, "message", data.id], item.data) break - case "part": - await Storage.write( - ["share_data", input.share.id, "part", item.data.messageID, item.data.id], - item.data, - ) + } + case "part": { + const data = item.data as Part + await Storage.write(["share_data", input.share.id, "part", data.messageID, data.id], item.data) break + } case "session_diff": await Storage.write(["share_data", input.share.id, "session_diff"], item.data) break diff --git a/packages/enterprise/src/core/storage.ts b/packages/enterprise/src/core/storage.ts index ee711458b..6edbef9ed 100644 --- a/packages/enterprise/src/core/storage.ts +++ b/packages/enterprise/src/core/storage.ts @@ -6,7 +6,7 @@ export namespace Storage { read(path: string): Promise<string | undefined> write(path: string, value: string): Promise<void> remove(path: string): Promise<void> - list(prefix: string): Promise<string[]> + list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise<string[]> } function createAdapter(client: AwsClient, endpoint: string, bucket: string): Adapter { @@ -37,8 +37,14 @@ export namespace Storage { if (!response.ok) throw new Error(`Failed to remove ${path}: ${response.status}`) }, - async list(prefix: string): Promise<string[]> { + async list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise<string[]> { + const prefix = options?.prefix || "" const params = new URLSearchParams({ "list-type": "2", prefix }) + if (options?.limit) params.set("max-keys", options.limit.toString()) + if (options?.start) { + const startPath = prefix + options.start + ".json" + params.set("start-after", startPath) + } const response = await client.fetch(`${base}?${params}`) if (!response.ok) throw new Error(`Failed to list ${prefix}: ${response.status}`) const xml = await response.text() @@ -48,6 +54,10 @@ export namespace Storage { while ((match = regex.exec(xml)) !== null) { keys.push(match[1]) } + if (options?.end) { + const endPath = prefix + options.end + ".json" + return keys.filter((key) => key <= endPath) + } return keys }, } @@ -98,9 +108,9 @@ export namespace Storage { return adapter().remove(resolve(key)) } - export async function list(prefix: string[]) { - const p = prefix.join("/") + (prefix.length ? "/" : "") - const result = await adapter().list(p) + export async function list(options?: { prefix?: string[]; limit?: number; start?: string; end?: string }) { + const p = options?.prefix ? options.prefix.join("/") + (options.prefix.length ? "/" : "") : "" + const result = await adapter().list({ prefix: p, limit: options?.limit, start: options?.start, end: options?.end }) return result.map((x) => x.replace(/\.json$/, "").split("/")) } |
