summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorDax Raad <[email protected]>2025-12-01 16:58:59 -0500
committerDax Raad <[email protected]>2025-12-01 16:58:59 -0500
commit01e2c9cc21c5a0a5d23928cab7b3100c4d6f79a1 (patch)
tree1dc17ad9c7854bb6815477732cdc96c970c6f203
parent4acb645f04aacf0d8bad8ea526e6ba895d123d57 (diff)
downloadopencode-01e2c9cc21c5a0a5d23928cab7b3100c4d6f79a1.tar.gz
opencode-01e2c9cc21c5a0a5d23928cab7b3100c4d6f79a1.zip
core: fix share compaction reprocessing same events by making storage list boundaries exclusive
-rw-r--r--packages/enterprise/src/core/share.ts52
-rw-r--r--packages/enterprise/src/core/storage.ts25
-rw-r--r--packages/enterprise/test/core/storage.test.ts13
3 files changed, 47 insertions, 43 deletions
diff --git a/packages/enterprise/src/core/share.ts b/packages/enterprise/src/core/share.ts
index 46b277d12..a85994c31 100644
--- a/packages/enterprise/src/core/share.ts
+++ b/packages/enterprise/src/core/share.ts
@@ -93,38 +93,40 @@ export namespace Share {
console.log("reading pending events")
const list = await Storage.list({
prefix: ["share_event", shareID],
- end: compaction.event,
+ before: compaction.event,
}).then((x) => x.toReversed())
console.log("compacting", list.length)
- const data = await Promise.all(list.map(async (event) => await Storage.read<Data[]>(event))).then((x) => x.flat())
- for (const item of data) {
- if (!item) continue
- const key = (item: Data) => {
- switch (item.type) {
- case "session":
- return "session"
- case "message":
- return `message/${item.data.id}`
- case "part":
- return `${item.data.messageID}/${item.data.id}`
- case "session_diff":
- return "session_diff"
- case "model":
- return "model"
+ if (list.length > 0) {
+ const data = await Promise.all(list.map(async (event) => await Storage.read<Data[]>(event))).then((x) => x.flat())
+ for (const item of data) {
+ if (!item) continue
+ const key = (item: Data) => {
+ switch (item.type) {
+ case "session":
+ return "session"
+ case "message":
+ return `message/${item.data.id}`
+ case "part":
+ return `${item.data.messageID}/${item.data.id}`
+ case "session_diff":
+ return "session_diff"
+ case "model":
+ return "model"
+ }
+ }
+ const id = key(item)
+ const result = Binary.search(compaction.data, id, key)
+ if (result.found) {
+ compaction.data[result.index] = item
+ } else {
+ compaction.data.splice(result.index, 0, item)
}
}
- const id = key(item)
- const result = Binary.search(compaction.data, id, key)
- if (result.found) {
- compaction.data[result.index] = item
- } else {
- compaction.data.splice(result.index, 0, item)
- }
+ compaction.event = list.at(-1)?.at(-1)
+ await Storage.write(["share_compaction", shareID], compaction)
}
- compaction.event = list.at(-1)?.at(-1)
- await Storage.write(["share_compaction", shareID], compaction)
return compaction.data
}
diff --git a/packages/enterprise/src/core/storage.ts b/packages/enterprise/src/core/storage.ts
index 6edbef9ed..b8030b4f9 100644
--- a/packages/enterprise/src/core/storage.ts
+++ b/packages/enterprise/src/core/storage.ts
@@ -6,7 +6,7 @@ export namespace Storage {
read(path: string): Promise<string | undefined>
write(path: string, value: string): Promise<void>
remove(path: string): Promise<void>
- list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise<string[]>
+ list(options?: { prefix?: string; limit?: number; after?: string; before?: string }): Promise<string[]>
}
function createAdapter(client: AwsClient, endpoint: string, bucket: string): Adapter {
@@ -37,13 +37,13 @@ export namespace Storage {
if (!response.ok) throw new Error(`Failed to remove ${path}: ${response.status}`)
},
- async list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise<string[]> {
+ async list(options?: { prefix?: string; limit?: number; after?: string; before?: string }): Promise<string[]> {
const prefix = options?.prefix || ""
const params = new URLSearchParams({ "list-type": "2", prefix })
if (options?.limit) params.set("max-keys", options.limit.toString())
- if (options?.start) {
- const startPath = prefix + options.start + ".json"
- params.set("start-after", startPath)
+ if (options?.after) {
+ const afterPath = prefix + options.after + ".json"
+ params.set("start-after", afterPath)
}
const response = await client.fetch(`${base}?${params}`)
if (!response.ok) throw new Error(`Failed to list ${prefix}: ${response.status}`)
@@ -54,9 +54,9 @@ export namespace Storage {
while ((match = regex.exec(xml)) !== null) {
keys.push(match[1])
}
- if (options?.end) {
- const endPath = prefix + options.end + ".json"
- return keys.filter((key) => key <= endPath)
+ if (options?.before) {
+ const beforePath = prefix + options.before + ".json"
+ return keys.filter((key) => key < beforePath)
}
return keys
},
@@ -108,9 +108,14 @@ export namespace Storage {
return adapter().remove(resolve(key))
}
- export async function list(options?: { prefix?: string[]; limit?: number; start?: string; end?: string }) {
+ export async function list(options?: { prefix?: string[]; limit?: number; after?: string; before?: string }) {
const p = options?.prefix ? options.prefix.join("/") + (options.prefix.length ? "/" : "") : ""
- const result = await adapter().list({ prefix: p, limit: options?.limit, start: options?.start, end: options?.end })
+ const result = await adapter().list({
+ prefix: p,
+ limit: options?.limit,
+ after: options?.after,
+ before: options?.before,
+ })
return result.map((x) => x.replace(/\.json$/, "").split("/"))
}
diff --git a/packages/enterprise/test/core/storage.test.ts b/packages/enterprise/test/core/storage.test.ts
index 27e51384c..5b5281791 100644
--- a/packages/enterprise/test/core/storage.test.ts
+++ b/packages/enterprise/test/core/storage.test.ts
@@ -2,23 +2,20 @@ import { describe, expect, test, afterAll } from "bun:test"
import { Storage } from "../../src/core/storage"
describe("core.storage", () => {
- test("should list files with start and end range", async () => {
+ test("should list files with after and before range", async () => {
await Storage.write(["test", "users", "user1"], { name: "user1" })
await Storage.write(["test", "users", "user2"], { name: "user2" })
await Storage.write(["test", "users", "user3"], { name: "user3" })
await Storage.write(["test", "users", "user4"], { name: "user4" })
await Storage.write(["test", "users", "user5"], { name: "user5" })
- const result = await Storage.list({ prefix: ["test", "users"], start: "user2", end: "user4" })
+ const result = await Storage.list({ prefix: ["test", "users"], after: "user2", before: "user4" })
- expect(result).toEqual([
- ["test", "users", "user3"],
- ["test", "users", "user4"],
- ])
+ expect(result).toEqual([["test", "users", "user3"]])
})
- test("should list files with start only", async () => {
- const result = await Storage.list({ prefix: ["test", "users"], start: "user3" })
+ test("should list files with after only", async () => {
+ const result = await Storage.list({ prefix: ["test", "users"], after: "user3" })
expect(result).toEqual([
["test", "users", "user4"],