summaryrefslogtreecommitdiffhomepage
path: root/src/ollama-client.ts
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-03-24 14:01:20 +0900
committerAdam Malczewski <[email protected]>2026-03-24 14:01:20 +0900
commit7f9b25a1479f9897aea7f85c3fb58a568b0bd642 (patch)
treeecb9dec930ba46c4f749d07ec023c2a1960599f8 /src/ollama-client.ts
parent5a44a97111d304945bbfc3da02d29a83191d816c (diff)
downloadai-pulse-obsidian-plugin-7f9b25a1479f9897aea7f85c3fb58a568b0bd642.tar.gz
ai-pulse-obsidian-plugin-7f9b25a1479f9897aea7f85c3fb58a568b0bd642.zip
Add streaming of AI text
Diffstat (limited to 'src/ollama-client.ts')
-rw-r--r--src/ollama-client.ts167
1 files changed, 167 insertions, 0 deletions
diff --git a/src/ollama-client.ts b/src/ollama-client.ts
index 91bd40c..66badc6 100644
--- a/src/ollama-client.ts
+++ b/src/ollama-client.ts
@@ -190,3 +190,170 @@ export async function sendChatMessage(
throw new Error("Tool calling loop exceeded maximum iterations.");
}
+
/**
 * Options for a streaming chat request (see sendChatMessageStreaming).
 */
export interface StreamingChatOptions {
  /** Base URL of the Ollama server; "/api/chat" is appended to it. */
  ollamaUrl: string;
  /** Name of the model to run the chat against. */
  model: string;
  /** Conversation history sent as the chat context. */
  messages: ChatMessage[];
  /** Optional tool definitions; included in the request only when non-empty. */
  tools?: OllamaToolDefinition[];
  /** Obsidian App reference; required only if the model issues tool calls. */
  app?: App;
  /** Invoked with each streamed text fragment as it arrives. */
  onChunk: (text: string) => void;
  /** Invoked once per executed tool call with a summary of the call/result. */
  onToolCall?: (event: ToolCallEvent) => void;
  /** Signal used to cancel the request mid-stream. */
  abortSignal?: AbortSignal;
}
+
+/**
+ * Parse ndjson lines from a streamed response body.
+ * Handles partial lines that may span across chunks from the reader.
+ */
+async function* readNdjsonStream(
+ reader: ReadableStreamDefaultReader<Uint8Array>,
+ decoder: TextDecoder,
+): AsyncGenerator<Record<string, unknown>> {
+ let buffer = "";
+
+ while (true) {
+ const { done, value } = await reader.read();
+ if (done) break;
+
+ buffer += decoder.decode(value, { stream: true });
+ const lines = buffer.split("\n");
+ // Last element may be incomplete — keep it in buffer
+ buffer = lines.pop() ?? "";
+
+ for (const line of lines) {
+ const trimmed = line.trim();
+ if (trimmed === "") continue;
+ yield JSON.parse(trimmed) as Record<string, unknown>;
+ }
+ }
+
+ // Process any remaining data in buffer
+ const trimmed = buffer.trim();
+ if (trimmed !== "") {
+ yield JSON.parse(trimmed) as Record<string, unknown>;
+ }
+}
+
+/**
+ * Send a chat message with streaming.
+ * Streams text chunks via onChunk callback. Supports tool-calling agent loop:
+ * tool execution rounds are non-streamed, only the final text response streams.
+ * Returns the full accumulated response text.
+ */
+export async function sendChatMessageStreaming(
+ opts: StreamingChatOptions,
+): Promise<string> {
+ const { ollamaUrl, model, messages, tools, app, onChunk, onToolCall, abortSignal } = opts;
+ const maxIterations = 10;
+ let iterations = 0;
+
+ const workingMessages = messages.map((m) => ({ ...m }));
+
+ while (iterations < maxIterations) {
+ iterations++;
+
+ const body: Record<string, unknown> = {
+ model,
+ messages: workingMessages,
+ stream: true,
+ };
+
+ if (tools !== undefined && tools.length > 0) {
+ body.tools = tools;
+ }
+
+ const response = await fetch(`${ollamaUrl}/api/chat`, {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify(body),
+ signal: abortSignal,
+ });
+
+ if (!response.ok) {
+ throw new Error(`Ollama returned status ${response.status}.`);
+ }
+
+ if (response.body === null) {
+ throw new Error("Response body is null — streaming not supported.");
+ }
+
+ const reader = response.body.getReader();
+ const decoder = new TextDecoder();
+
+ let content = "";
+ const toolCalls: ToolCallResponse[] = [];
+
+ try {
+ for await (const chunk of readNdjsonStream(reader, decoder)) {
+ const msg = chunk.message as Record<string, unknown> | undefined;
+ if (msg !== undefined && msg !== null) {
+ if (typeof msg.content === "string" && msg.content !== "") {
+ content += msg.content;
+ onChunk(msg.content);
+ }
+ if (Array.isArray(msg.tool_calls)) {
+ toolCalls.push(...(msg.tool_calls as ToolCallResponse[]));
+ }
+ }
+ }
+ } catch (err: unknown) {
+ if (err instanceof DOMException && err.name === "AbortError") {
+ // User cancelled — return whatever we accumulated
+ return content;
+ }
+ throw err;
+ }
+
+ // If no tool calls, we're done
+ if (toolCalls.length === 0) {
+ return content;
+ }
+
+ // Tool calling: append assistant message and execute tools
+ const assistantMsg: ChatMessage = {
+ role: "assistant",
+ content,
+ tool_calls: toolCalls,
+ };
+ workingMessages.push(assistantMsg);
+
+ if (app === undefined) {
+ throw new Error("App reference required for tool execution.");
+ }
+
+ for (const tc of toolCalls) {
+ const fnName = tc.function.name;
+ const fnArgs = tc.function.arguments;
+ const toolEntry = findToolByName(fnName);
+
+ let result: string;
+ if (toolEntry === undefined) {
+ result = `Error: Unknown tool "${fnName}".`;
+ } else {
+ result = await toolEntry.execute(app, fnArgs);
+ }
+
+ if (onToolCall !== undefined) {
+ const friendlyName = toolEntry !== undefined ? toolEntry.friendlyName : fnName;
+ const summary = toolEntry !== undefined ? toolEntry.summarize(fnArgs) : `Called ${fnName}`;
+ const resultSummary = toolEntry !== undefined ? toolEntry.summarizeResult(result) : "";
+ onToolCall({ toolName: fnName, friendlyName, summary, resultSummary, args: fnArgs, result });
+ }
+
+ workingMessages.push({
+ role: "tool",
+ tool_name: fnName,
+ content: result,
+ });
+ }
+
+ // Reset content for next streaming round
+ // (tool call content was intermediate, next round streams the final answer)
+ }
+
+ throw new Error("Tool calling loop exceeded maximum iterations.");
+}