summaryrefslogtreecommitdiffhomepage
path: root/packages
diff options
context:
space:
mode:
authorDax Raad <[email protected]>2025-12-01 16:35:03 -0500
committerDax Raad <[email protected]>2025-12-01 16:35:07 -0500
commit3d99dc78dbff982cde6dd61a0629b7ee9de1a8f3 (patch)
tree4c8dabb52db7c26d2d412c1be861f7599168a7d3 /packages
parent95c3a8b80505fcb3140989daf6fece3377aa3b95 (diff)
downloadopencode-3d99dc78dbff982cde6dd61a0629b7ee9de1a8f3.tar.gz
opencode-3d99dc78dbff982cde6dd61a0629b7ee9de1a8f3.zip
core: reduce latency when loading shared sessions through event compaction
Diffstat (limited to 'packages')
-rw-r--r--packages/enterprise/src/core/share.ts100
-rw-r--r--packages/enterprise/src/core/storage.ts20
-rw-r--r--packages/enterprise/test-debug.ts37
-rw-r--r--packages/enterprise/test/core/share.test.ts269
-rw-r--r--packages/enterprise/test/core/storage.test.ts67
-rw-r--r--packages/util/src/identifier.ts48
6 files changed, 506 insertions, 35 deletions
diff --git a/packages/enterprise/src/core/share.ts b/packages/enterprise/src/core/share.ts
index 5c2fbc28c..bf10c4a14 100644
--- a/packages/enterprise/src/core/share.ts
+++ b/packages/enterprise/src/core/share.ts
@@ -1,8 +1,10 @@
import { FileDiff, Message, Model, Part, Session, SessionStatus } from "@opencode-ai/sdk"
import { fn } from "@opencode-ai/util/fn"
import { iife } from "@opencode-ai/util/iife"
+import { Identifier } from "@opencode-ai/util/identifier"
import z from "zod"
import { Storage } from "./storage"
+import { Binary } from "@opencode-ai/util/binary"
export namespace Share {
export const Info = z.object({
@@ -37,15 +39,15 @@ export namespace Share {
export type Data = z.infer<typeof Data>
export const create = fn(z.object({ sessionID: z.string() }), async (body) => {
+ const isTest = process.env.NODE_ENV === "test" || body.sessionID.startsWith("test_")
const info: Info = {
- id: body.sessionID.slice(-8),
+ id: (isTest ? "test_" : "") + body.sessionID.slice(-8),
sessionID: body.sessionID,
secret: crypto.randomUUID(),
}
const exists = await get(info.id)
if (exists) throw new Errors.AlreadyExists(info.id)
await Storage.write(["share", info.id], info)
- console.log("created share", info.id)
return info
})
@@ -58,35 +60,72 @@ export namespace Share {
if (!share) throw new Errors.NotFound(body.id)
if (share.secret !== body.secret) throw new Errors.InvalidSecret(body.id)
await Storage.remove(["share", body.id])
- const list = await Storage.list(["share_data", body.id])
+ const list = await Storage.list({ prefix: ["share_data", body.id] })
for (const item of list) {
await Storage.remove(item)
}
})
- export async function data(id: string) {
- let time = Date.now()
- const list = await Storage.list(["share_data", id])
- console.log("listing share data", Date.now() - time, list.length)
- const promises = []
- time = Date.now()
- for (const item of list) {
- promises.push(
- iife(async () => {
- const [, , type] = item
- return {
- type: type as any,
- data: await Storage.read<any>(item),
- } as Data
- }),
- )
+ export const sync = fn(
+ z.object({
+ share: Info.pick({ id: true, secret: true }),
+ data: Data.array(),
+ }),
+ async (input) => {
+ const share = await get(input.share.id)
+ if (!share) throw new Errors.NotFound(input.share.id)
+ if (share.secret !== input.share.secret) throw new Errors.InvalidSecret(input.share.id)
+ await Storage.write(["share_event", input.share.id, Identifier.descending()], input.data)
+ },
+ )
+
+ type Compaction = {
+ event?: string
+ data: Data[]
+ }
+
+ export async function data(shareID: string) {
+ const compaction: Compaction = (await Storage.read<Compaction>(["share_compaction", shareID])) ?? {
+ data: [],
+ event: undefined,
+ }
+
+ const list = await Storage.list({
+ prefix: ["share_event", shareID],
+ end: compaction.event,
+ }).then((x) => x.toReversed())
+
+ const data = await Promise.all(list.map(async (event) => await Storage.read<Data[]>(event))).then((x) => x.flat())
+ for (const item of data) {
+ if (!item) continue
+ const key = (item: Data) => {
+ switch (item.type) {
+ case "session":
+ return "session"
+ case "message":
+ return `message/${item.data.id}`
+ case "part":
+ return `${item.data.messageID}/${item.data.id}`
+ case "session_diff":
+ return "session_diff"
+ case "model":
+ return "model"
+ }
+ }
+ const id = key(item)
+ const result = Binary.search(compaction.data, id, key)
+ if (result.found) {
+ compaction.data[result.index] = item
+ } else {
+ compaction.data.splice(result.index, 0, item)
+ }
}
- const result = await Promise.all(promises)
- console.log("read share data", Date.now() - time, result.length)
- return result
+ compaction.event = list.at(-1)?.at(-1)
+ await Storage.write(["share_compaction", shareID], compaction)
+ return compaction.data
}
- export const sync = fn(
+ export const syncOld = fn(
z.object({
share: Info.pick({ id: true, secret: true }),
data: Data.array(),
@@ -103,15 +142,16 @@ export namespace Share {
case "session":
await Storage.write(["share_data", input.share.id, "session"], item.data)
break
- case "message":
- await Storage.write(["share_data", input.share.id, "message", item.data.id], item.data)
+ case "message": {
+ const data = item.data as Message
+ await Storage.write(["share_data", input.share.id, "message", data.id], item.data)
break
- case "part":
- await Storage.write(
- ["share_data", input.share.id, "part", item.data.messageID, item.data.id],
- item.data,
- )
+ }
+ case "part": {
+ const data = item.data as Part
+ await Storage.write(["share_data", input.share.id, "part", data.messageID, data.id], item.data)
break
+ }
case "session_diff":
await Storage.write(["share_data", input.share.id, "session_diff"], item.data)
break
diff --git a/packages/enterprise/src/core/storage.ts b/packages/enterprise/src/core/storage.ts
index ee711458b..6edbef9ed 100644
--- a/packages/enterprise/src/core/storage.ts
+++ b/packages/enterprise/src/core/storage.ts
@@ -6,7 +6,7 @@ export namespace Storage {
read(path: string): Promise<string | undefined>
write(path: string, value: string): Promise<void>
remove(path: string): Promise<void>
- list(prefix: string): Promise<string[]>
+ list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise<string[]>
}
function createAdapter(client: AwsClient, endpoint: string, bucket: string): Adapter {
@@ -37,8 +37,14 @@ export namespace Storage {
if (!response.ok) throw new Error(`Failed to remove ${path}: ${response.status}`)
},
- async list(prefix: string): Promise<string[]> {
+ async list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise<string[]> {
+ const prefix = options?.prefix || ""
const params = new URLSearchParams({ "list-type": "2", prefix })
+ if (options?.limit) params.set("max-keys", options.limit.toString())
+ if (options?.start) {
+ const startPath = prefix + options.start + ".json"
+ params.set("start-after", startPath)
+ }
const response = await client.fetch(`${base}?${params}`)
if (!response.ok) throw new Error(`Failed to list ${prefix}: ${response.status}`)
const xml = await response.text()
@@ -48,6 +54,10 @@ export namespace Storage {
while ((match = regex.exec(xml)) !== null) {
keys.push(match[1])
}
+ if (options?.end) {
+ const endPath = prefix + options.end + ".json"
+ return keys.filter((key) => key <= endPath)
+ }
return keys
},
}
@@ -98,9 +108,9 @@ export namespace Storage {
return adapter().remove(resolve(key))
}
- export async function list(prefix: string[]) {
- const p = prefix.join("/") + (prefix.length ? "/" : "")
- const result = await adapter().list(p)
+ export async function list(options?: { prefix?: string[]; limit?: number; start?: string; end?: string }) {
+ const p = options?.prefix ? options.prefix.join("/") + (options.prefix.length ? "/" : "") : ""
+ const result = await adapter().list({ prefix: p, limit: options?.limit, start: options?.start, end: options?.end })
return result.map((x) => x.replace(/\.json$/, "").split("/"))
}
diff --git a/packages/enterprise/test-debug.ts b/packages/enterprise/test-debug.ts
new file mode 100644
index 000000000..fca995297
--- /dev/null
+++ b/packages/enterprise/test-debug.ts
@@ -0,0 +1,37 @@
+import { Share } from "./src/core/share"
+import { Storage } from "./src/core/storage"
+
+async function test() {
+ const shareInfo = await Share.create({ sessionID: "test-debug-" + Date.now() })
+
+ const batch1: Share.Data[] = [
+ { type: "part", data: { id: "part1", sessionID: "session1", messageID: "msg1", type: "text", text: "Hello" } },
+ ]
+
+ const batch2: Share.Data[] = [
+ { type: "part", data: { id: "part1", sessionID: "session1", messageID: "msg1", type: "text", text: "Hello Updated" } },
+ ]
+
+ await Share.sync({
+ share: { id: shareInfo.id, secret: shareInfo.secret },
+ data: batch1,
+ })
+
+ await Share.sync({
+ share: { id: shareInfo.id, secret: shareInfo.secret },
+ data: batch2,
+ })
+
+ const events = await Storage.list({ prefix: ["share_event", shareInfo.id] })
+ console.log("Events (raw):", events)
+ console.log("Events (reversed):", events.toReversed())
+
+ for (const event of events.toReversed()) {
+ const data = await Storage.read(event)
+ console.log("Event data (reversed order):", event, data)
+ }
+
+ await Share.remove({ id: shareInfo.id, secret: shareInfo.secret })
+}
+
+test()
diff --git a/packages/enterprise/test/core/share.test.ts b/packages/enterprise/test/core/share.test.ts
new file mode 100644
index 000000000..d3bf6a2c2
--- /dev/null
+++ b/packages/enterprise/test/core/share.test.ts
@@ -0,0 +1,269 @@
+import { describe, expect, test, afterAll } from "bun:test"
+import { Share } from "../../src/core/share"
+import { Storage } from "../../src/core/storage"
+import { Identifier } from "@opencode-ai/util/identifier"
+
+describe.concurrent("core.share", () => {
+ test("should create a share", async () => {
+ const sessionID = Identifier.descending()
+ const share = await Share.create({ sessionID })
+
+ expect(share.sessionID).toBe(sessionID)
+ expect(share.secret).toBeDefined()
+
+ await Share.remove({ id: share.id, secret: share.secret })
+ })
+
+ test("should sync data to a share", async () => {
+ const sessionID = Identifier.descending()
+ const share = await Share.create({ sessionID })
+
+ const data: Share.Data[] = [
+ {
+ type: "part",
+ data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+ },
+ ]
+
+ await Share.sync({
+ share: { id: share.id, secret: share.secret },
+ data,
+ })
+
+ const events = await Storage.list({ prefix: ["share_event", share.id] })
+ expect(events.length).toBe(1)
+
+ await Share.remove({ id: share.id, secret: share.secret })
+ })
+
+ test("should sync multiple batches of data", async () => {
+ const sessionID = Identifier.descending()
+ const share = await Share.create({ sessionID })
+
+ const data1: Share.Data[] = [
+ {
+ type: "part",
+ data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+ },
+ ]
+
+ const data2: Share.Data[] = [
+ {
+ type: "part",
+ data: { id: "part2", sessionID, messageID: "msg1", type: "text", text: "World" },
+ },
+ ]
+
+ await Share.sync({
+ share: { id: share.id, secret: share.secret },
+ data: data1,
+ })
+
+ await Share.sync({
+ share: { id: share.id, secret: share.secret },
+ data: data2,
+ })
+
+ const events = await Storage.list({ prefix: ["share_event", share.id] })
+ expect(events.length).toBe(2)
+
+ await Share.remove({ id: share.id, secret: share.secret })
+ })
+
+ test("should retrieve synced data", async () => {
+ const sessionID = Identifier.descending()
+ const share = await Share.create({ sessionID })
+
+ const data: Share.Data[] = [
+ {
+ type: "part",
+ data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+ },
+ {
+ type: "part",
+ data: { id: "part2", sessionID, messageID: "msg1", type: "text", text: "World" },
+ },
+ ]
+
+ await Share.sync({
+ share: { id: share.id, secret: share.secret },
+ data,
+ })
+
+ const result = await Share.data(share.id)
+
+ expect(result.length).toBe(2)
+ expect(result[0].type).toBe("part")
+ expect(result[1].type).toBe("part")
+
+ await Share.remove({ id: share.id, secret: share.secret })
+ })
+
+ test("should retrieve data from multiple syncs", async () => {
+ const sessionID = Identifier.descending()
+ const share = await Share.create({ sessionID })
+
+ const data1: Share.Data[] = [
+ {
+ type: "part",
+ data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+ },
+ ]
+
+ const data2: Share.Data[] = [
+ {
+ type: "part",
+ data: { id: "part2", sessionID, messageID: "msg2", type: "text", text: "World" },
+ },
+ ]
+
+ const data3: Share.Data[] = [
+ { type: "part", data: { id: "part3", sessionID, messageID: "msg3", type: "text", text: "!" } },
+ ]
+
+ await Share.sync({
+ share: { id: share.id, secret: share.secret },
+ data: data1,
+ })
+
+ await Share.sync({
+ share: { id: share.id, secret: share.secret },
+ data: data2,
+ })
+
+ await Share.sync({
+ share: { id: share.id, secret: share.secret },
+ data: data3,
+ })
+
+ const result = await Share.data(share.id)
+
+ expect(result.length).toBe(3)
+ const parts = result.filter((d) => d.type === "part")
+ expect(parts.length).toBe(3)
+
+ await Share.remove({ id: share.id, secret: share.secret })
+ })
+
+ test("should return latest data when syncing duplicate parts", async () => {
+ const sessionID = Identifier.descending()
+ const share = await Share.create({ sessionID })
+
+ const data1: Share.Data[] = [
+ {
+ type: "part",
+ data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+ },
+ ]
+
+ const data2: Share.Data[] = [
+ {
+ type: "part",
+ data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello Updated" },
+ },
+ ]
+
+ await Share.sync({
+ share: { id: share.id, secret: share.secret },
+ data: data1,
+ })
+
+ await Share.sync({
+ share: { id: share.id, secret: share.secret },
+ data: data2,
+ })
+
+ const result = await Share.data(share.id)
+
+ expect(result.length).toBe(1)
+ const [first] = result
+ expect(first.type).toBe("part")
+ expect(first.type === "part" && first.data.type === "text" && first.data.text).toBe("Hello Updated")
+
+ await Share.remove({ id: share.id, secret: share.secret })
+ })
+
+ test("should return empty array for share with no data", async () => {
+ const sessionID = Identifier.descending()
+ const share = await Share.create({ sessionID })
+
+ const result = await Share.data(share.id)
+
+ expect(result).toEqual([])
+
+ await Share.remove({ id: share.id, secret: share.secret })
+ })
+
+ test("should throw error for invalid secret", async () => {
+ const sessionID = Identifier.descending()
+ const share = await Share.create({ sessionID })
+
+ const data: Share.Data[] = [
+ {
+ type: "part",
+ data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Test" },
+ },
+ ]
+
+ expect(async () => {
+ await Share.sync({
+ share: { id: share.id, secret: "invalid-secret" },
+ data,
+ })
+ }).toThrow()
+
+ await Share.remove({ id: share.id, secret: share.secret })
+ })
+
+ test("should throw error for non-existent share", async () => {
+ const sessionID = Identifier.descending()
+ const data: Share.Data[] = [
+ {
+ type: "part",
+ data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Test" },
+ },
+ ]
+
+ expect(async () => {
+ await Share.sync({
+ share: { id: "non-existent-id", secret: "some-secret" },
+ data,
+ })
+ }).toThrow()
+ })
+
+ test("should handle different data types", async () => {
+ const sessionID = Identifier.descending()
+ const share = await Share.create({ sessionID })
+
+ const data: Share.Data[] = [
+ { type: "session", data: { id: sessionID, status: "running" } as any },
+ { type: "message", data: { id: "msg1", sessionID } as any },
+ {
+ type: "part",
+ data: { id: "part1", sessionID, messageID: "msg1", type: "text", text: "Hello" },
+ },
+ ]
+
+ await Share.sync({
+ share: { id: share.id, secret: share.secret },
+ data,
+ })
+
+ const result = await Share.data(share.id)
+
+ expect(result.length).toBe(3)
+ expect(result.some((d) => d.type === "session")).toBe(true)
+ expect(result.some((d) => d.type === "message")).toBe(true)
+ expect(result.some((d) => d.type === "part")).toBe(true)
+
+ await Share.remove({ id: share.id, secret: share.secret })
+ })
+
+ afterAll(async () => {
+ const files = await Storage.list()
+ for (const file of files) {
+ Storage.remove(file)
+ }
+ })
+})
diff --git a/packages/enterprise/test/core/storage.test.ts b/packages/enterprise/test/core/storage.test.ts
new file mode 100644
index 000000000..27e51384c
--- /dev/null
+++ b/packages/enterprise/test/core/storage.test.ts
@@ -0,0 +1,67 @@
+import { describe, expect, test, afterAll } from "bun:test"
+import { Storage } from "../../src/core/storage"
+
+describe("core.storage", () => {
+ test("should list files with start and end range", async () => {
+ await Storage.write(["test", "users", "user1"], { name: "user1" })
+ await Storage.write(["test", "users", "user2"], { name: "user2" })
+ await Storage.write(["test", "users", "user3"], { name: "user3" })
+ await Storage.write(["test", "users", "user4"], { name: "user4" })
+ await Storage.write(["test", "users", "user5"], { name: "user5" })
+
+ const result = await Storage.list({ prefix: ["test", "users"], start: "user2", end: "user4" })
+
+ expect(result).toEqual([
+ ["test", "users", "user3"],
+ ["test", "users", "user4"],
+ ])
+ })
+
+ test("should list files with start only", async () => {
+ const result = await Storage.list({ prefix: ["test", "users"], start: "user3" })
+
+ expect(result).toEqual([
+ ["test", "users", "user4"],
+ ["test", "users", "user5"],
+ ])
+ })
+
+ test("should list files with limit", async () => {
+ const result = await Storage.list({ prefix: ["test", "users"], limit: 3 })
+
+ expect(result).toEqual([
+ ["test", "users", "user1"],
+ ["test", "users", "user2"],
+ ["test", "users", "user3"],
+ ])
+ })
+
+ test("should list all files without prefix", async () => {
+ const result = await Storage.list()
+
+ expect(result.length).toBeGreaterThan(0)
+ })
+
+ test("should list all files with prefix", async () => {
+ const result = await Storage.list({ prefix: ["test", "users"] })
+
+ expect(result).toEqual([
+ ["test", "users", "user1"],
+ ["test", "users", "user2"],
+ ["test", "users", "user3"],
+ ["test", "users", "user4"],
+ ["test", "users", "user5"],
+ ])
+ })
+
+ afterAll(async () => {
+ const testFiles = await Storage.list({ prefix: ["test"] })
+
+ for (const file of testFiles) {
+ await Storage.remove(file)
+ }
+
+ const remainingFiles = await Storage.list({ prefix: ["test"] })
+ expect(remainingFiles).toEqual([])
+ })
+})
diff --git a/packages/util/src/identifier.ts b/packages/util/src/identifier.ts
new file mode 100644
index 000000000..ba28a351b
--- /dev/null
+++ b/packages/util/src/identifier.ts
@@ -0,0 +1,48 @@
+import { randomBytes } from "crypto"
+
+export namespace Identifier {
+ const LENGTH = 26
+
+ // State for monotonic ID generation
+ let lastTimestamp = 0
+ let counter = 0
+
+ export function ascending() {
+ return create(false)
+ }
+
+ export function descending() {
+ return create(true)
+ }
+
+ function randomBase62(length: number): string {
+ const chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
+ let result = ""
+ const bytes = randomBytes(length)
+ for (let i = 0; i < length; i++) {
+ result += chars[bytes[i] % 62]
+ }
+ return result
+ }
+
+ export function create(descending: boolean, timestamp?: number): string {
+ const currentTimestamp = timestamp ?? Date.now()
+
+ if (currentTimestamp !== lastTimestamp) {
+ lastTimestamp = currentTimestamp
+ counter = 0
+ }
+ counter++
+
+ let now = BigInt(currentTimestamp) * BigInt(0x1000) + BigInt(counter)
+
+ now = descending ? ~now : now
+
+ const timeBytes = Buffer.alloc(6)
+ for (let i = 0; i < 6; i++) {
+ timeBytes[i] = Number((now >> BigInt(40 - 8 * i)) & BigInt(0xff))
+ }
+
+ return timeBytes.toString("hex") + randomBase62(LENGTH - 12)
+ }
+}