diff options
| author | Dax Raad <[email protected]> | 2025-05-18 14:13:04 -0400 |
|---|---|---|
| committer | Dax Raad <[email protected]> | 2025-05-26 12:40:17 -0400 |
| commit | 0e303e6508edb4374213d1f98ec383b266339774 (patch) | |
| tree | f7dc146eb58126f55f470ef135b66c678bf16898 /js/src | |
| parent | bcd2fd68b7fa00af055f558049994c2975d9515d (diff) | |
| download | opencode-0e303e6508edb4374213d1f98ec383b266339774.tar.gz opencode-0e303e6508edb4374213d1f98ec383b266339774.zip | |
sync
Diffstat (limited to 'js/src')
| -rw-r--r-- | js/src/bus/index.ts | 79 | ||||
| -rw-r--r-- | js/src/id/id.ts | 11 | ||||
| -rw-r--r-- | js/src/index.ts | 22 | ||||
| -rw-r--r-- | js/src/server/server.ts | 86 | ||||
| -rw-r--r-- | js/src/session/session.ts | 138 | ||||
| -rw-r--r-- | js/src/storage/storage.ts | 20 | ||||
| -rw-r--r-- | js/src/util/event.ts | 0 | ||||
| -rw-r--r-- | js/src/util/log.ts | 21 |
8 files changed, 270 insertions, 107 deletions
diff --git a/js/src/bus/index.ts b/js/src/bus/index.ts new file mode 100644 index 000000000..5359debd9 --- /dev/null +++ b/js/src/bus/index.ts @@ -0,0 +1,79 @@ +import type { z, ZodSchema } from "zod/v4"; +import { App } from "../app"; +import { Log } from "../util/log"; + +export namespace Bus { + const log = Log.create({ service: "bus" }); + type Subscription = (event: any) => void; + + const state = App.state("bus", () => { + const subscriptions = new Map<any, Subscription[]>(); + + return { + subscriptions, + }; + }); + + export type EventDefinition = ReturnType<typeof event>; + + export function event<Type extends string, Properties extends ZodSchema>( + type: Type, + properties: Properties, + ) { + return { + type, + properties, + }; + } + + export function publish<Definition extends EventDefinition>( + def: Definition, + properties: z.output<Definition["properties"]>, + ) { + const payload = { + type: def.type, + properties, + }; + log.info("publishing", { + type: def.type, + ...properties, + }); + for (const key of [def.type, "*"]) { + const match = state().subscriptions.get(key); + for (const sub of match ?? []) { + sub(payload); + } + } + } + + export function subscribe<Definition extends EventDefinition>( + def: Definition, + callback: (event: { + type: Definition["type"]; + properties: z.infer<Definition["properties"]>; + }) => void, + ) { + return raw(def.type, callback); + } + + export function subscribeAll(callback: (event: any) => void) { + return raw("*", callback); + } + + function raw(type: string, callback: (event: any) => void) { + log.info("subscribing", { type }); + const subscriptions = state().subscriptions; + let match = subscriptions.get(type) ?? []; + match.push(callback); + subscriptions.set(type, match); + + return () => { + log.info("unsubscribing", { type }); + const match = subscriptions.get(type); + if (!match) return; + const index = match.indexOf(callback); + if (index === -1) return; + match.splice(index, 1); + }; + } +} diff --git a/js/src/id/id.ts b/js/src/id/id.ts index e4744f172..39ee5326f 100644 --- a/js/src/id/id.ts +++ b/js/src/id/id.ts @@ -1,4 +1,4 @@ -import { z } from "zod"; +import { z } from "zod/v4"; import { randomBytes } from "crypto"; export namespace Identifier { @@ -11,7 +11,7 @@ export namespace Identifier { return z.string().startsWith(prefixes[prefix]); } - const LENGTH = 24; + const LENGTH = 26; export function ascending(prefix: keyof typeof prefixes, given?: string) { return generateID(prefix, false, given); @@ -45,6 +45,11 @@ export namespace Identifier { const randLength = (LENGTH - 12) / 2; const random = randomBytes(randLength); - return prefix + "_" + timeBytes.toString("hex") + random.toString("hex"); + return ( + prefixes[prefix] + + "_" + + timeBytes.toString("hex") + + random.toString("hex") + ); } } diff --git a/js/src/index.ts b/js/src/index.ts index 72c32c8e9..8f98310be 100644 --- a/js/src/index.ts +++ b/js/src/index.ts @@ -1,29 +1,11 @@ import { App } from "./app"; import process from "node:process"; -import { RPC } from "./server/server"; -import { Session } from "./session/session"; -import { Identifier } from "./id/id"; +import { Server } from "./server/server"; const app = await App.create({ directory: process.cwd(), }); App.provide(app, async () => { - const sessionID = await Session.list() - [Symbol.asyncIterator]() - .next() - .then((v) => v.value ?? Session.create().then((s) => s.id)); - - await Session.chat(sessionID, { - role: "user", - id: Identifier.ascending("message"), - parts: [ - { - type: "text", - text: "Hey how are you? try to use tools", - }, - ], - }); - - const rpc = RPC.listen(); + const server = Server.listen(); }); diff --git a/js/src/server/server.ts b/js/src/server/server.ts index 6266e9421..dd066c400 100644 --- a/js/src/server/server.ts +++ b/js/src/server/server.ts @@ -1,34 +1,64 @@ import { Log } from "../util/log"; +import { Bus } from "../bus"; -export namespace RPC { - const log = Log.create({ service: "rpc" }); +import { Hono } from "hono"; +import { streamSSE } from "hono/streaming"; +import { Session } from "../session/session"; +import { zValidator } from "@hono/zod-validator"; +import { z } from "zod"; + +export namespace Server { + const log = Log.create({ service: "server" }); const PORT = 16713; - export function listen(input?: { port?: number }) { - const port = input?.port ?? PORT; - log.info("trying", { port }); - try { - const server = Bun.serve({ - port, - websocket: { - open() {}, - message() {}, - }, - routes: { - "/ws": (req, server) => { - if (server.upgrade(req)) return; - return new Response("Not a websocket request", { status: 400 }); - }, + + export type App = ReturnType<typeof listen>; + + export function listen() { + const app = new Hono() + .get("/event", async (c) => { + log.info("event connected"); + return streamSSE(c, async (stream) => { + const unsub = Bus.subscribeAll(async (event) => { + await stream.writeSSE({ + data: JSON.stringify(event), + }); + }); + await new Promise<void>((resolve) => { + stream.onAbort(() => { + unsub(); + resolve(); + log.info("event disconnected"); + }); + }); + }); + }) + .post("/session_create", async (c) => { + const session = await Session.create(); + return c.json(session); + }) + .post( + "/session_chat", + zValidator( + "json", + z.object({ + sessionID: z.string(), + parts: z.custom<Session.Message["parts"]>(), + }), + ), + async (c) => { + const body = c.req.valid("json"); + const msg = await Session.chat(body.sessionID, ...body.parts); + return c.json(msg); }, - }); - log.info("listening", { port }); - return { - server, - }; - } catch (e: any) { - if (e?.code === "EADDRINUSE") { - return listen({ port: port + 1 }); - } - throw e; - } + ); + + Bun.serve({ + port: PORT, + hostname: "0.0.0.0", + idleTimeout: 0, + fetch: app.fetch, + }); + + return app; } } diff --git a/js/src/session/session.ts b/js/src/session/session.ts index d07c799b9..fb45f0e59 100644 --- a/js/src/session/session.ts +++ b/js/src/session/session.ts @@ -1,5 +1,5 @@ import path from "path"; -import { z } from "zod"; +import { z } from "zod/v3"; import { App } from "../app/"; import { Identifier } from "../id/id"; import { LLM } from "../llm/llm"; @@ -11,7 +11,9 @@ import { tool, type TextUIPart, type ToolInvocationUIPart, + type UIDataTypes, type UIMessage, + type UIMessagePart, } from "ai"; export namespace Session { @@ -20,11 +22,18 @@ export namespace Session { export interface Info { id: string; title: string; + tokens: { + input: number; + output: number; + reasoning: number; + }; } + export type Message = UIMessage<{ sessionID: string }>; + const state = App.state("session", () => { const sessions = new Map<string, Info>(); - const messages = new Map<string, UIMessage[]>(); + const messages = new Map<string, Message[]>(); return { sessions, @@ -36,12 +45,14 @@ export namespace Session { const result: Info = { id: Identifier.descending("session"), title: "New Session - " + new Date().toISOString(), + tokens: { + input: 0, + output: 0, + reasoning: 0, + }, }; log.info("created", result); - await Storage.write( - "session/info/" + result.id + ".json", - JSON.stringify(result), - ); + await Storage.writeJSON("session/info/" + result.id, result); state().sessions.set(result.id, result); return result; } @@ -51,23 +62,35 @@ export namespace Session { if (result) { return result; } - const read = JSON.parse(await Storage.readToString("session/info/" + id)); + const read = await Storage.readJSON<Info>("session/info/" + id); state().sessions.set(id, read); - return read; + return read as Info; + } + + export async function update(session: Info) { + state().sessions.set(session.id, session); + await Storage.writeJSON("session/info/" + session.id, session); } export async function messages(sessionID: string) { - const result = state().messages.get(sessionID); - if (result) { - return result; + const match = state().messages.get(sessionID); + if (match) { + return match; + } + const result = [] as Message[]; + const list = await Storage.list("session/message/" + sessionID) + .then((x) => x.toArray()) + .catch(() => {}); + if (!list) return result; + for (const item of list) { + const messageID = path.basename(item.path, ".json"); + const read = await Storage.readJSON<Message>( + "session/message/" + sessionID + "/" + messageID, + ); + result.push(read); } - const read = JSON.parse( - await Storage.readToString( - "session/message/" + sessionID + ".json", - ).catch(() => "[]"), - ); - state().messages.set(sessionID, read); - return read; + state().messages.set(sessionID, result); + return result; } export async function* list() { @@ -81,11 +104,23 @@ export namespace Session { } } - export async function chat(sessionID: string, msg: UIMessage) { + export async function chat( + sessionID: string, + ...parts: UIMessagePart<UIDataTypes>[] + ) { + const session = await get(sessionID); const l = log.clone().tag("session", sessionID); l.info("chatting"); - const msgs = (await messages(sessionID)) ?? [ - { + + const msgs = await messages(sessionID); + async function write(msg: Message) { + return Storage.writeJSON( + "session/message/" + sessionID + "/" + msg.id, + msg, + ); + } + if (msgs.length === 0) { + const system: UIMessage<{ sessionID: string }> = { id: Identifier.ascending("message"), role: "system", parts: [ @@ -94,40 +129,38 @@ export namespace Session { text: "You are a helpful assistant called opencode", }, ], - } as UIMessage, - ]; - msgs.push(msg); - state().messages.set(sessionID, msgs); - async function write() { - return Storage.write( - "session/message/" + sessionID + ".json", - JSON.stringify(msgs), - ); + metadata: { + sessionID, + }, + }; + msgs.push(system); + state().messages.set(sessionID, msgs); + await write(system); } - await write(); + const msg: Message = { + role: "user", + id: Identifier.ascending("message"), + parts, + metadata: { + sessionID, + }, + }; + msgs.push(msg); + await write(msg); const model = await LLM.findModel("claude-3-7-sonnet-20250219"); const result = streamText({ messages: convertToModelMessages(msgs), temperature: 0, - tools: { - test: tool({ - id: "opencode.test" as const, - parameters: z.object({ - feeling: z.string(), - }), - execute: async () => { - return `Hello`; - }, - description: "call this tool to get a greeting", - }), - }, model, }); - const next: UIMessage = { + const next: Message = { id: Identifier.ascending("message"), role: "assistant", parts: [], + metadata: { + sessionID, + }, }; msgs.push(next); let text: TextUIPart | undefined; @@ -135,7 +168,9 @@ export namespace Session { while (true) { const { done, value } = await reader.read(); if (done) break; - l.info("part", value); + l.info("part", { + type: value.type, + }); switch (value.type) { case "start": break; @@ -175,15 +210,15 @@ export namespace Session { state: "result", result: value.result, }; - await write(); } break; case "finish": - await write(); break; case "finish-step": - await write(); + break; + case "error": + log.error("error", value); break; default: @@ -191,6 +226,13 @@ export namespace Session { type: value.type, }); } + await write(next); } + const usage = await result.totalUsage; + session.tokens.input += usage.inputTokens || 0; + session.tokens.output += usage.outputTokens || 0; + session.tokens.reasoning += usage.reasoningTokens || 0; + await update(session); + return next; } } diff --git a/js/src/storage/storage.ts b/js/src/storage/storage.ts index 19e8dc06f..89c8b4d17 100644 --- a/js/src/storage/storage.ts +++ b/js/src/storage/storage.ts @@ -4,10 +4,19 @@ import fs from "fs/promises"; import { Log } from "../util/log"; import { App } from "../app"; import { AppPath } from "../app/path"; +import { Bus } from "../bus"; +import z from "zod/v4"; export namespace Storage { const log = Log.create({ service: "storage" }); + export const Event = { + Write: Bus.event( + "storage.write", + z.object({ key: z.string(), body: z.any() }), + ), + }; + const state = App.state("storage", async () => { const app = await App.use(); const storageDir = AppPath.storage(app.root); @@ -36,4 +45,15 @@ export namespace Storage { export const read = expose("read"); export const list = expose("list"); export const readToString = expose("readToString"); + + export async function readJSON<T>(key: string) { + const data = await readToString(key + ".json"); + return JSON.parse(data) as T; + } + + export async function writeJSON<T>(key: string, data: T) { + Bus.publish(Event.Write, { key, body: data }); + const json = JSON.stringify(data); + await write(key + ".json", json); + } } diff --git a/js/src/util/event.ts b/js/src/util/event.ts new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/js/src/util/event.ts diff --git a/js/src/util/log.ts b/js/src/util/log.ts index 9de4eb495..8f7157140 100644 --- a/js/src/util/log.ts +++ b/js/src/util/log.ts @@ -2,16 +2,21 @@ export namespace Log { export function create(tags?: Record<string, any>) { tags = tags || {}; + function build(message: any, extra?: Record<string, any>) { + const prefix = Object.entries({ + ...tags, + ...extra, + }) + .map(([key, value]) => `${key}=${value}`) + .join(" "); + return [prefix, message]; + } const result = { info(message?: any, extra?: Record<string, any>) { - const prefix = Object.entries({ - ...tags, - ...extra, - }) - .map(([key, value]) => `${key}=${value}`) - .join(" "); - console.log(prefix, message); - return result; + console.log(...build(message, extra)); + }, + error(message?: any, extra?: Record<string, any>) { + console.error(...build(message, extra)); }, tag(key: string, value: string) { if (tags) tags[key] = value; |
