summaryrefslogtreecommitdiffhomepage
path: root/app/packages/function
diff options
context:
space:
mode:
authorFrank <[email protected]>2025-05-23 14:17:45 -0400
committerDax Raad <[email protected]>2025-05-26 12:40:17 -0400
commit83974e0c95d65c72d12e7d58a287514c39f3768f (patch)
tree274625aa4693b0d71b50eae3cfedb5d6245f99f9 /app/packages/function
parent59d43fa5da0619655c7ba3360e969bcbda1716ea (diff)
downloadopencode-83974e0c95d65c72d12e7d58a287514c39f3768f.tar.gz
opencode-83974e0c95d65c72d12e7d58a287514c39f3768f.zip
Share: sync
Diffstat (limited to 'app/packages/function')
-rw-r--r--app/packages/function/package.json10
-rw-r--r--app/packages/function/src/api.ts167
-rw-r--r--app/packages/function/sst-env.d.ts25
-rw-r--r--app/packages/function/tsconfig.json8
4 files changed, 210 insertions, 0 deletions
diff --git a/app/packages/function/package.json b/app/packages/function/package.json
new file mode 100644
index 000000000..46c83e840
--- /dev/null
+++ b/app/packages/function/package.json
@@ -0,0 +1,10 @@
+{
+ "name": "@opencode/function",
+ "version": "0.0.1",
+ "$schema": "https://json.schemastore.org/package.json",
+ "private": true,
+ "type": "module",
+ "devDependencies": {
+ "@cloudflare/workers-types": "^4.20250522.0"
+ }
+}
diff --git a/app/packages/function/src/api.ts b/app/packages/function/src/api.ts
new file mode 100644
index 000000000..fe8dc9ab5
--- /dev/null
+++ b/app/packages/function/src/api.ts
@@ -0,0 +1,167 @@
+import { DurableObject } from "cloudflare:workers"
+import {
+ DurableObjectNamespace,
+ ExecutionContext,
+} from "@cloudflare/workers-types"
+import { createHash } from "node:crypto"
+import path from "node:path"
+import { Resource } from "sst"
+
+type Bindings = {
+ SYNC_SERVER: DurableObjectNamespace<WebSocketHibernationServer>
+}
+
+export class SyncServer extends DurableObject {
+ private files: Map<string, string> = new Map()
+
+ constructor(ctx, env) {
+ super(ctx, env)
+ this.ctx.blockConcurrencyWhile(async () => {
+ this.files = await this.ctx.storage.list()
+ })
+ }
+
+ async publish(filename: string, content: string) {
+ console.log(
+ "SyncServer publish",
+ filename,
+ content,
+ "to",
+ this.ctx.getWebSockets().length,
+ "subscribers",
+ )
+ this.files.set(filename, content)
+ await this.ctx.storage.put(filename, content)
+
+ this.ctx.getWebSockets().forEach((client) => {
+ client.send(JSON.stringify({ filename, content }))
+ })
+ }
+
+ async webSocketMessage(ws, message) {
+ if (message === "load_history") {
+ }
+ }
+
+ async webSocketClose(ws, code, reason, wasClean) {
+ ws.close(code, "Durable Object is closing WebSocket")
+ }
+
+ async fetch(req: Request) {
+ console.log("SyncServer subscribe")
+
+ // Creates two ends of a WebSocket connection.
+ const webSocketPair = new WebSocketPair()
+ const [client, server] = Object.values(webSocketPair)
+
+ this.ctx.acceptWebSocket(server)
+
+ setTimeout(() => {
+ this.files.forEach((content, filename) =>
+ server.send(JSON.stringify({ filename, content })),
+ )
+ }, 0)
+
+ return new Response(null, {
+ status: 101,
+ webSocket: client,
+ })
+ }
+}
+
+export default {
+ async fetch(request: Request, env: Bindings, ctx: ExecutionContext) {
+ const url = new URL(request.url)
+
+ if (request.method === "GET" && url.pathname === "/") {
+ return new Response("Hello, world!", {
+ headers: { "Content-Type": "text/plain" },
+ })
+ }
+ if (request.method === "POST" && url.pathname.endsWith("/share_create")) {
+ const body = await request.json()
+ const sessionID = body.session_id
+ const shareID = createHash("sha256").update(sessionID).digest("hex")
+ const infoFile = `${shareID}/info/${sessionID}.json`
+ const ret = await Resource.Bucket.get(infoFile)
+ if (ret)
+ return new Response("Error: Session already sharing", { status: 400 })
+
+ await Resource.Bucket.put(infoFile, "")
+
+ return new Response(JSON.stringify({ share_id: shareID }), {
+ headers: { "Content-Type": "application/json" },
+ })
+ }
+ if (request.method === "POST" && url.pathname.endsWith("/share_delete")) {
+ const body = await request.json()
+ const sessionID = body.session_id
+ const shareID = body.share_id
+ const infoFile = `${shareID}/info/${sessionID}.json`
+ await Resource.Bucket.delete(infoFile)
+ return new Response(JSON.stringify({}), {
+ headers: { "Content-Type": "application/json" },
+ })
+ }
+ if (request.method === "POST" && url.pathname.endsWith("/share_sync")) {
+ const body = await request.json()
+ const sessionID = body.session_id
+ const shareID = body.share_id
+ const filename = body.filename
+ const content = body.content
+
+ // validate filename
+ if (!filename.startsWith("info/") && !filename.startsWith("message/"))
+ return new Response("Error: Invalid filename", { status: 400 })
+
+ const infoFile = `${shareID}/info/${sessionID}.json`
+ const ret = await Resource.Bucket.get(infoFile)
+ if (!ret)
+ return new Response("Error: Session not shared", { status: 400 })
+
+ // send message to server
+ const id = env.SYNC_SERVER.idFromName(sessionID)
+ const stub = env.SYNC_SERVER.get(id)
+ await stub.publish(filename, content)
+
+ // store message
+ await Resource.Bucket.put(`${shareID}/${filename}`, content)
+
+ return new Response(JSON.stringify({}), {
+ headers: { "Content-Type": "application/json" },
+ })
+ }
+ if (request.method === "GET" && url.pathname.endsWith("/share_poll")) {
+ // Expect to receive a WebSocket Upgrade request.
+ // If there is one, accept the request and return a WebSocket Response.
+ const upgradeHeader = request.headers.get("Upgrade")
+ if (!upgradeHeader || upgradeHeader !== "websocket") {
+ return new Response("Error: Upgrade header is required", {
+ status: 426,
+ })
+ }
+
+ // get query parameters
+ const shareID = url.searchParams.get("share_id")
+ if (!shareID)
+ return new Response("Error: Share ID is required", { status: 400 })
+
+ // Get session ID
+ const listRet = await Resource.Bucket.list({
+ prefix: `${shareID}/info/`,
+ delimiter: "/",
+ })
+
+ if (listRet.objects.length === 0)
+ return new Response("Error: Session not shared", { status: 400 })
+ if (listRet.objects.length > 1)
+ return new Response("Error: Multiple sessions found", { status: 400 })
+ const sessionID = path.parse(listRet.objects[0].key).name
+
+ // subscribe to server
+ const id = env.SYNC_SERVER.idFromName(sessionID)
+ const stub = env.SYNC_SERVER.get(id)
+ return stub.fetch(request)
+ }
+ },
+}
diff --git a/app/packages/function/sst-env.d.ts b/app/packages/function/sst-env.d.ts
new file mode 100644
index 000000000..41727ee9d
--- /dev/null
+++ b/app/packages/function/sst-env.d.ts
@@ -0,0 +1,25 @@
+/* This file is auto-generated by SST. Do not edit. */
+/* tslint:disable */
+/* eslint-disable */
+/* deno-fmt-ignore-file */
+
+import "sst"
+declare module "sst" {
+ export interface Resource {
+ "Web": {
+ "type": "sst.cloudflare.StaticSite"
+ "url": string
+ }
+ }
+}
+// cloudflare
+import * as cloudflare from "@cloudflare/workers-types";
+declare module "sst" {
+ export interface Resource {
+ "Api": cloudflare.Service
+ "Bucket": cloudflare.R2Bucket
+ }
+}
+
+import "sst"
+export {} \ No newline at end of file
diff --git a/app/packages/function/tsconfig.json b/app/packages/function/tsconfig.json
new file mode 100644
index 000000000..f66a7be0c
--- /dev/null
+++ b/app/packages/function/tsconfig.json
@@ -0,0 +1,8 @@
+{
+ "$schema": "https://json.schemastore.org/tsconfig",
+ "extends": "@tsconfig/node22/tsconfig.json",
+ "compilerOptions": {
+ "module": "ESNext",
+ "moduleResolution": "bundler"
+ }
+}