summaryrefslogtreecommitdiffhomepage
path: root/js/src
diff options
context:
space:
mode:
authorDax Raad <[email protected]>2025-05-18 14:13:04 -0400
committerDax Raad <[email protected]>2025-05-26 12:40:17 -0400
commit0e303e6508edb4374213d1f98ec383b266339774 (patch)
treef7dc146eb58126f55f470ef135b66c678bf16898 /js/src
parentbcd2fd68b7fa00af055f558049994c2975d9515d (diff)
downloadopencode-0e303e6508edb4374213d1f98ec383b266339774.tar.gz
opencode-0e303e6508edb4374213d1f98ec383b266339774.zip
sync
Diffstat (limited to 'js/src')
-rw-r--r--js/src/bus/index.ts79
-rw-r--r--js/src/id/id.ts11
-rw-r--r--js/src/index.ts22
-rw-r--r--js/src/server/server.ts86
-rw-r--r--js/src/session/session.ts138
-rw-r--r--js/src/storage/storage.ts20
-rw-r--r--js/src/util/event.ts0
-rw-r--r--js/src/util/log.ts21
8 files changed, 270 insertions, 107 deletions
diff --git a/js/src/bus/index.ts b/js/src/bus/index.ts
new file mode 100644
index 000000000..5359debd9
--- /dev/null
+++ b/js/src/bus/index.ts
@@ -0,0 +1,79 @@
+import type { z, ZodSchema } from "zod/v4";
+import { App } from "../app";
+import { Log } from "../util/log";
+
+export namespace Bus {
+ const log = Log.create({ service: "bus" });
+ type Subscription = (event: any) => void;
+
+ const state = App.state("bus", () => {
+ const subscriptions = new Map<any, Subscription[]>();
+
+ return {
+ subscriptions,
+ };
+ });
+
+ export type EventDefinition = ReturnType<typeof event>;
+
+ export function event<Type extends string, Properties extends ZodSchema>(
+ type: Type,
+ properties: Properties,
+ ) {
+ return {
+ type,
+ properties,
+ };
+ }
+
+ export function publish<Definition extends EventDefinition>(
+ def: Definition,
+ properties: z.output<Definition["properties"]>,
+ ) {
+ const payload = {
+ type: def.type,
+ properties,
+ };
+ log.info("publishing", {
+ type: def.type,
+ ...properties,
+ });
+ for (const key of [def.type, "*"]) {
+ const match = state().subscriptions.get(key);
+ for (const sub of match ?? []) {
+ sub(payload);
+ }
+ }
+ }
+
+ export function subscribe<Definition extends EventDefinition>(
+ def: Definition,
+ callback: (event: {
+ type: Definition["type"];
+ properties: z.infer<Definition["properties"]>;
+ }) => void,
+ ) {
+ return raw(def.type, callback);
+ }
+
+ export function subscribeAll(callback: (event: any) => void) {
+ return raw("*", callback);
+ }
+
+ function raw(type: string, callback: (event: any) => void) {
+ log.info("subscribing", { type });
+ const subscriptions = state().subscriptions;
+ let match = subscriptions.get(type) ?? [];
+ match.push(callback);
+ subscriptions.set(type, match);
+
+ return () => {
+ log.info("unsubscribing", { type });
+ const match = subscriptions.get(type);
+ if (!match) return;
+ const index = match.indexOf(callback);
+ if (index === -1) return;
+ match.splice(index, 1);
+ };
+ }
+}
diff --git a/js/src/id/id.ts b/js/src/id/id.ts
index e4744f172..39ee5326f 100644
--- a/js/src/id/id.ts
+++ b/js/src/id/id.ts
@@ -1,4 +1,4 @@
-import { z } from "zod";
+import { z } from "zod/v4";
import { randomBytes } from "crypto";
export namespace Identifier {
@@ -11,7 +11,7 @@ export namespace Identifier {
return z.string().startsWith(prefixes[prefix]);
}
- const LENGTH = 24;
+ const LENGTH = 26;
export function ascending(prefix: keyof typeof prefixes, given?: string) {
return generateID(prefix, false, given);
@@ -45,6 +45,11 @@ export namespace Identifier {
const randLength = (LENGTH - 12) / 2;
const random = randomBytes(randLength);
- return prefix + "_" + timeBytes.toString("hex") + random.toString("hex");
+ return (
+ prefixes[prefix] +
+ "_" +
+ timeBytes.toString("hex") +
+ random.toString("hex")
+ );
}
}
diff --git a/js/src/index.ts b/js/src/index.ts
index 72c32c8e9..8f98310be 100644
--- a/js/src/index.ts
+++ b/js/src/index.ts
@@ -1,29 +1,11 @@
import { App } from "./app";
import process from "node:process";
-import { RPC } from "./server/server";
-import { Session } from "./session/session";
-import { Identifier } from "./id/id";
+import { Server } from "./server/server";
const app = await App.create({
directory: process.cwd(),
});
App.provide(app, async () => {
- const sessionID = await Session.list()
- [Symbol.asyncIterator]()
- .next()
- .then((v) => v.value ?? Session.create().then((s) => s.id));
-
- await Session.chat(sessionID, {
- role: "user",
- id: Identifier.ascending("message"),
- parts: [
- {
- type: "text",
- text: "Hey how are you? try to use tools",
- },
- ],
- });
-
- const rpc = RPC.listen();
+ const server = Server.listen();
});
diff --git a/js/src/server/server.ts b/js/src/server/server.ts
index 6266e9421..dd066c400 100644
--- a/js/src/server/server.ts
+++ b/js/src/server/server.ts
@@ -1,34 +1,64 @@
import { Log } from "../util/log";
+import { Bus } from "../bus";
-export namespace RPC {
- const log = Log.create({ service: "rpc" });
+import { Hono } from "hono";
+import { streamSSE } from "hono/streaming";
+import { Session } from "../session/session";
+import { zValidator } from "@hono/zod-validator";
+import { z } from "zod";
+
+export namespace Server {
+ const log = Log.create({ service: "server" });
const PORT = 16713;
- export function listen(input?: { port?: number }) {
- const port = input?.port ?? PORT;
- log.info("trying", { port });
- try {
- const server = Bun.serve({
- port,
- websocket: {
- open() {},
- message() {},
- },
- routes: {
- "/ws": (req, server) => {
- if (server.upgrade(req)) return;
- return new Response("Not a websocket request", { status: 400 });
- },
+
+ export type App = ReturnType<typeof listen>;
+
+ export function listen() {
+ const app = new Hono()
+ .get("/event", async (c) => {
+ log.info("event connected");
+ return streamSSE(c, async (stream) => {
+ const unsub = Bus.subscribeAll(async (event) => {
+ await stream.writeSSE({
+ data: JSON.stringify(event),
+ });
+ });
+ await new Promise<void>((resolve) => {
+ stream.onAbort(() => {
+ unsub();
+ resolve();
+ log.info("event disconnected");
+ });
+ });
+ });
+ })
+ .post("/session_create", async (c) => {
+ const session = await Session.create();
+ return c.json(session);
+ })
+ .post(
+ "/session_chat",
+ zValidator(
+ "json",
+ z.object({
+ sessionID: z.string(),
+ parts: z.custom<Session.Message["parts"]>(),
+ }),
+ ),
+ async (c) => {
+ const body = c.req.valid("json");
+ const msg = await Session.chat(body.sessionID, ...body.parts);
+ return c.json(msg);
},
- });
- log.info("listening", { port });
- return {
- server,
- };
- } catch (e: any) {
- if (e?.code === "EADDRINUSE") {
- return listen({ port: port + 1 });
- }
- throw e;
- }
+ );
+
+ Bun.serve({
+ port: PORT,
+ hostname: "0.0.0.0",
+ idleTimeout: 0,
+ fetch: app.fetch,
+ });
+
+ return app;
}
}
diff --git a/js/src/session/session.ts b/js/src/session/session.ts
index d07c799b9..fb45f0e59 100644
--- a/js/src/session/session.ts
+++ b/js/src/session/session.ts
@@ -1,5 +1,5 @@
import path from "path";
-import { z } from "zod";
+import { z } from "zod/v3";
import { App } from "../app/";
import { Identifier } from "../id/id";
import { LLM } from "../llm/llm";
@@ -11,7 +11,9 @@ import {
tool,
type TextUIPart,
type ToolInvocationUIPart,
+ type UIDataTypes,
type UIMessage,
+ type UIMessagePart,
} from "ai";
export namespace Session {
@@ -20,11 +22,18 @@ export namespace Session {
export interface Info {
id: string;
title: string;
+ tokens: {
+ input: number;
+ output: number;
+ reasoning: number;
+ };
}
+ export type Message = UIMessage<{ sessionID: string }>;
+
const state = App.state("session", () => {
const sessions = new Map<string, Info>();
- const messages = new Map<string, UIMessage[]>();
+ const messages = new Map<string, Message[]>();
return {
sessions,
@@ -36,12 +45,14 @@ export namespace Session {
const result: Info = {
id: Identifier.descending("session"),
title: "New Session - " + new Date().toISOString(),
+ tokens: {
+ input: 0,
+ output: 0,
+ reasoning: 0,
+ },
};
log.info("created", result);
- await Storage.write(
- "session/info/" + result.id + ".json",
- JSON.stringify(result),
- );
+ await Storage.writeJSON("session/info/" + result.id, result);
state().sessions.set(result.id, result);
return result;
}
@@ -51,23 +62,35 @@ export namespace Session {
if (result) {
return result;
}
- const read = JSON.parse(await Storage.readToString("session/info/" + id));
+ const read = await Storage.readJSON<Info>("session/info/" + id);
state().sessions.set(id, read);
- return read;
+ return read as Info;
+ }
+
+ export async function update(session: Info) {
+ state().sessions.set(session.id, session);
+ await Storage.writeJSON("session/info/" + session.id, session);
}
export async function messages(sessionID: string) {
- const result = state().messages.get(sessionID);
- if (result) {
- return result;
+ const match = state().messages.get(sessionID);
+ if (match) {
+ return match;
+ }
+ const result = [] as Message[];
+ const list = await Storage.list("session/message/" + sessionID)
+ .then((x) => x.toArray())
+ .catch(() => {});
+ if (!list) return result;
+ for (const item of list) {
+ const messageID = path.basename(item.path, ".json");
+ const read = await Storage.readJSON<Message>(
+ "session/message/" + sessionID + "/" + messageID,
+ );
+ result.push(read);
}
- const read = JSON.parse(
- await Storage.readToString(
- "session/message/" + sessionID + ".json",
- ).catch(() => "[]"),
- );
- state().messages.set(sessionID, read);
- return read;
+ state().messages.set(sessionID, result);
+ return result;
}
export async function* list() {
@@ -81,11 +104,23 @@ export namespace Session {
}
}
- export async function chat(sessionID: string, msg: UIMessage) {
+ export async function chat(
+ sessionID: string,
+ ...parts: UIMessagePart<UIDataTypes>[]
+ ) {
+ const session = await get(sessionID);
const l = log.clone().tag("session", sessionID);
l.info("chatting");
- const msgs = (await messages(sessionID)) ?? [
- {
+
+ const msgs = await messages(sessionID);
+ async function write(msg: Message) {
+ return Storage.writeJSON(
+ "session/message/" + sessionID + "/" + msg.id,
+ msg,
+ );
+ }
+ if (msgs.length === 0) {
+ const system: UIMessage<{ sessionID: string }> = {
id: Identifier.ascending("message"),
role: "system",
parts: [
@@ -94,40 +129,38 @@ export namespace Session {
text: "You are a helpful assistant called opencode",
},
],
- } as UIMessage,
- ];
- msgs.push(msg);
- state().messages.set(sessionID, msgs);
- async function write() {
- return Storage.write(
- "session/message/" + sessionID + ".json",
- JSON.stringify(msgs),
- );
+ metadata: {
+ sessionID,
+ },
+ };
+ msgs.push(system);
+ state().messages.set(sessionID, msgs);
+ await write(system);
}
- await write();
+ const msg: Message = {
+ role: "user",
+ id: Identifier.ascending("message"),
+ parts,
+ metadata: {
+ sessionID,
+ },
+ };
+ msgs.push(msg);
+ await write(msg);
const model = await LLM.findModel("claude-3-7-sonnet-20250219");
const result = streamText({
messages: convertToModelMessages(msgs),
temperature: 0,
- tools: {
- test: tool({
- id: "opencode.test" as const,
- parameters: z.object({
- feeling: z.string(),
- }),
- execute: async () => {
- return `Hello`;
- },
- description: "call this tool to get a greeting",
- }),
- },
model,
});
- const next: UIMessage = {
+ const next: Message = {
id: Identifier.ascending("message"),
role: "assistant",
parts: [],
+ metadata: {
+ sessionID,
+ },
};
msgs.push(next);
let text: TextUIPart | undefined;
@@ -135,7 +168,9 @@ export namespace Session {
while (true) {
const { done, value } = await reader.read();
if (done) break;
- l.info("part", value);
+ l.info("part", {
+ type: value.type,
+ });
switch (value.type) {
case "start":
break;
@@ -175,15 +210,15 @@ export namespace Session {
state: "result",
result: value.result,
};
- await write();
}
break;
case "finish":
- await write();
break;
case "finish-step":
- await write();
+ break;
+ case "error":
+ log.error("error", value);
break;
default:
@@ -191,6 +226,13 @@ export namespace Session {
type: value.type,
});
}
+ await write(next);
}
+ const usage = await result.totalUsage;
+ session.tokens.input += usage.inputTokens || 0;
+ session.tokens.output += usage.outputTokens || 0;
+ session.tokens.reasoning += usage.reasoningTokens || 0;
+ await update(session);
+ return next;
}
}
diff --git a/js/src/storage/storage.ts b/js/src/storage/storage.ts
index 19e8dc06f..89c8b4d17 100644
--- a/js/src/storage/storage.ts
+++ b/js/src/storage/storage.ts
@@ -4,10 +4,19 @@ import fs from "fs/promises";
import { Log } from "../util/log";
import { App } from "../app";
import { AppPath } from "../app/path";
+import { Bus } from "../bus";
+import z from "zod/v4";
export namespace Storage {
const log = Log.create({ service: "storage" });
+ export const Event = {
+ Write: Bus.event(
+ "storage.write",
+ z.object({ key: z.string(), body: z.any() }),
+ ),
+ };
+
const state = App.state("storage", async () => {
const app = await App.use();
const storageDir = AppPath.storage(app.root);
@@ -36,4 +45,15 @@ export namespace Storage {
export const read = expose("read");
export const list = expose("list");
export const readToString = expose("readToString");
+
+ export async function readJSON<T>(key: string) {
+ const data = await readToString(key + ".json");
+ return JSON.parse(data) as T;
+ }
+
+ export async function writeJSON<T>(key: string, data: T) {
+ Bus.publish(Event.Write, { key, body: data });
+ const json = JSON.stringify(data);
+ await write(key + ".json", json);
+ }
}
diff --git a/js/src/util/event.ts b/js/src/util/event.ts
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/js/src/util/event.ts
diff --git a/js/src/util/log.ts b/js/src/util/log.ts
index 9de4eb495..8f7157140 100644
--- a/js/src/util/log.ts
+++ b/js/src/util/log.ts
@@ -2,16 +2,21 @@ export namespace Log {
export function create(tags?: Record<string, any>) {
tags = tags || {};
+ function build(message: any, extra?: Record<string, any>) {
+ const prefix = Object.entries({
+ ...tags,
+ ...extra,
+ })
+ .map(([key, value]) => `${key}=${value}`)
+ .join(" ");
+ return [prefix, message];
+ }
const result = {
info(message?: any, extra?: Record<string, any>) {
- const prefix = Object.entries({
- ...tags,
- ...extra,
- })
- .map(([key, value]) => `${key}=${value}`)
- .join(" ");
- console.log(prefix, message);
- return result;
+ console.log(...build(message, extra));
+ },
+ error(message?: any, extra?: Record<string, any>) {
+ console.error(...build(message, extra));
},
tag(key: string, value: string) {
if (tags) tags[key] = value;