diff options
| author | Frank <[email protected]> | 2025-05-23 14:17:45 -0400 |
|---|---|---|
| committer | Dax Raad <[email protected]> | 2025-05-26 12:40:17 -0400 |
| commit | 83974e0c95d65c72d12e7d58a287514c39f3768f (patch) | |
| tree | 274625aa4693b0d71b50eae3cfedb5d6245f99f9 /app/packages/function/src/api.ts | |
| parent | 59d43fa5da0619655c7ba3360e969bcbda1716ea (diff) | |
| download | opencode-83974e0c95d65c72d12e7d58a287514c39f3768f.tar.gz opencode-83974e0c95d65c72d12e7d58a287514c39f3768f.zip | |
Share: sync
Diffstat (limited to 'app/packages/function/src/api.ts')
| -rw-r--r-- | app/packages/function/src/api.ts | 167 |
1 files changed, 167 insertions, 0 deletions
diff --git a/app/packages/function/src/api.ts b/app/packages/function/src/api.ts new file mode 100644 index 000000000..fe8dc9ab5 --- /dev/null +++ b/app/packages/function/src/api.ts @@ -0,0 +1,167 @@ +import { DurableObject } from "cloudflare:workers" +import { + DurableObjectNamespace, + ExecutionContext, +} from "@cloudflare/workers-types" +import { createHash } from "node:crypto" +import path from "node:path" +import { Resource } from "sst" + +type Bindings = { + SYNC_SERVER: DurableObjectNamespace<WebSocketHibernationServer> +} + +export class SyncServer extends DurableObject { + private files: Map<string, string> = new Map() + + constructor(ctx, env) { + super(ctx, env) + this.ctx.blockConcurrencyWhile(async () => { + this.files = await this.ctx.storage.list() + }) + } + + async publish(filename: string, content: string) { + console.log( + "SyncServer publish", + filename, + content, + "to", + this.ctx.getWebSockets().length, + "subscribers", + ) + this.files.set(filename, content) + await this.ctx.storage.put(filename, content) + + this.ctx.getWebSockets().forEach((client) => { + client.send(JSON.stringify({ filename, content })) + }) + } + + async webSocketMessage(ws, message) { + if (message === "load_history") { + } + } + + async webSocketClose(ws, code, reason, wasClean) { + ws.close(code, "Durable Object is closing WebSocket") + } + + async fetch(req: Request) { + console.log("SyncServer subscribe") + + // Creates two ends of a WebSocket connection. + const webSocketPair = new WebSocketPair() + const [client, server] = Object.values(webSocketPair) + + this.ctx.acceptWebSocket(server) + + setTimeout(() => { + this.files.forEach((content, filename) => + server.send(JSON.stringify({ filename, content })), + ) + }, 0) + + return new Response(null, { + status: 101, + webSocket: client, + }) + } +} + +export default { + async fetch(request: Request, env: Bindings, ctx: ExecutionContext) { + const url = new URL(request.url) + + if (request.method === "GET" && url.pathname === "/") { + return new Response("Hello, world!", { + headers: { "Content-Type": "text/plain" }, + }) + } + if (request.method === "POST" && url.pathname.endsWith("/share_create")) { + const body = await request.json() + const sessionID = body.session_id + const shareID = createHash("sha256").update(sessionID).digest("hex") + const infoFile = `${shareID}/info/${sessionID}.json` + const ret = await Resource.Bucket.get(infoFile) + if (ret) + return new Response("Error: Session already sharing", { status: 400 }) + + await Resource.Bucket.put(infoFile, "") + + return new Response(JSON.stringify({ share_id: shareID }), { + headers: { "Content-Type": "application/json" }, + }) + } + if (request.method === "POST" && url.pathname.endsWith("/share_delete")) { + const body = await request.json() + const sessionID = body.session_id + const shareID = body.share_id + const infoFile = `${shareID}/info/${sessionID}.json` + await Resource.Bucket.delete(infoFile) + return new Response(JSON.stringify({}), { + headers: { "Content-Type": "application/json" }, + }) + } + if (request.method === "POST" && url.pathname.endsWith("/share_sync")) { + const body = await request.json() + const sessionID = body.session_id + const shareID = body.share_id + const filename = body.filename + const content = body.content + + // validate filename + if (!filename.startsWith("info/") && !filename.startsWith("message/")) + return new Response("Error: Invalid filename", { status: 400 }) + + const infoFile = `${shareID}/info/${sessionID}.json` + const ret = await Resource.Bucket.get(infoFile) + if (!ret) + return new Response("Error: Session not shared", { status: 400 }) + + // send message to server + const id = env.SYNC_SERVER.idFromName(sessionID) + const stub = env.SYNC_SERVER.get(id) + await stub.publish(filename, content) + + // store message + await Resource.Bucket.put(`${shareID}/${filename}`, content) + + return new Response(JSON.stringify({}), { + headers: { "Content-Type": "application/json" }, + }) + } + if (request.method === "GET" && url.pathname.endsWith("/share_poll")) { + // Expect to receive a WebSocket Upgrade request. + // If there is one, accept the request and return a WebSocket Response. + const upgradeHeader = request.headers.get("Upgrade") + if (!upgradeHeader || upgradeHeader !== "websocket") { + return new Response("Error: Upgrade header is required", { + status: 426, + }) + } + + // get query parameters + const shareID = url.searchParams.get("share_id") + if (!shareID) + return new Response("Error: Share ID is required", { status: 400 }) + + // Get session ID + const listRet = await Resource.Bucket.list({ + prefix: `${shareID}/info/`, + delimiter: "/", + }) + + if (listRet.objects.length === 0) + return new Response("Error: Session not shared", { status: 400 }) + if (listRet.objects.length > 1) + return new Response("Error: Multiple sessions found", { status: 400 }) + const sessionID = path.parse(listRet.objects[0].key).name + + // subscribe to server + const id = env.SYNC_SERVER.idFromName(sessionID) + const stub = env.SYNC_SERVER.get(id) + return stub.fetch(request) + } + }, +} |
