diff options
| author | Adam Malczewski <[email protected]> | 2026-03-24 14:01:20 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-03-24 14:01:20 +0900 |
| commit | 7f9b25a1479f9897aea7f85c3fb58a568b0bd642 (patch) | |
| tree | ecb9dec930ba46c4f749d07ec023c2a1960599f8 /src/ollama-client.ts | |
| parent | 5a44a97111d304945bbfc3da02d29a83191d816c (diff) | |
| download | ai-pulse-obsidian-plugin-7f9b25a1479f9897aea7f85c3fb58a568b0bd642.tar.gz ai-pulse-obsidian-plugin-7f9b25a1479f9897aea7f85c3fb58a568b0bd642.zip | |
add streaming of ai text
Diffstat (limited to 'src/ollama-client.ts')
| -rw-r--r-- | src/ollama-client.ts | 167 |
1 files changed, 167 insertions, 0 deletions
// Streaming additions to src/ollama-client.ts (diff 91bd40c..66badc6, hunk @@ -190,3 +190,170 @@),
// appended after the end of sendChatMessage().

/**
 * Options accepted by {@link sendChatMessageStreaming}.
 */
export interface StreamingChatOptions {
  /** Base URL of the Ollama server; `/api/chat` is appended to it. */
  ollamaUrl: string;
  /** Model name, sent as the `model` field of the request body. */
  model: string;
  /** Conversation so far. Each message is shallow-copied; the caller's array is not mutated. */
  messages: ChatMessage[];
  /** Tool definitions; only sent to the server when the array is non-empty. */
  tools?: OllamaToolDefinition[];
  /** Passed to each tool's `execute`; required as soon as the model requests a tool call. */
  app?: App;
  /** Invoked with every non-empty text fragment as it arrives. */
  onChunk: (text: string) => void;
  /** Invoked once per tool call, after the tool result (or unknown-tool error text) is known. */
  onToolCall?: (event: ToolCallEvent) => void;
  /** Passed to `fetch` as `signal` so the request can be cancelled. */
  abortSignal?: AbortSignal;
}

/**
 * Parse one ndjson line.
 *
 * @returns The parsed object, or `undefined` for blank lines and for JSON values
 *          that are not plain objects (`null`, arrays, primitives), which carry
 *          no fields a consumer could read.
 * @throws SyntaxError if the line is non-blank and not valid JSON.
 */
function parseNdjsonLine(line: string): Record<string, unknown> | undefined {
  const trimmed = line.trim();
  if (trimmed === "") return undefined;

  const parsed: unknown = JSON.parse(trimmed);
  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    return undefined;
  }
  return parsed as Record<string, unknown>;
}

/**
 * Parse ndjson objects from a streamed response body.
 * Handles lines (and multi-byte characters) that span several reader chunks.
 *
 * @param reader  Reader over the raw response bytes.
 * @param decoder Decoder used in streaming mode; it is flushed at end of stream.
 * @throws SyntaxError if a non-blank line is not valid JSON.
 */
async function* readNdjsonStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  decoder: TextDecoder,
): AsyncGenerator<Record<string, unknown>> {
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    // Last element may be an incomplete line — keep it for the next chunk.
    buffer = lines.pop() ?? "";

    for (const line of lines) {
      const parsed = parseNdjsonLine(line);
      if (parsed !== undefined) yield parsed;
    }
  }

  // End of stream: flush bytes the decoder is still holding, then handle a
  // final line that was not newline-terminated.
  buffer += decoder.decode();
  const last = parseNdjsonLine(buffer);
  if (last !== undefined) yield last;
}

/**
 * Send a chat message to Ollama with `stream: true` and run the tool-calling loop.
 *
 * Every round is streamed: each non-empty text fragment is forwarded to
 * `onChunk` as it arrives, including text produced in a round that also
 * requests tool calls. When a round ends with tool calls, the tools are
 * executed sequentially, their results are appended to a private copy of the
 * conversation, and another round is requested (at most 10 rounds).
 *
 * @returns The text of the final round (the first round without tool calls).
 *          If the stream is aborted while being read, the text received so far
 *          in the current round is returned instead.
 * @throws Error if the HTTP status is not OK, the response has no body, the
 *         server reports an error inside the stream, a tool call is requested
 *         without `app`, or the round limit is exceeded. Errors from `fetch`
 *         itself (including an abort before the response arrives) propagate.
 */
export async function sendChatMessageStreaming(
  opts: StreamingChatOptions,
): Promise<string> {
  const { ollamaUrl, model, messages, tools, app, onChunk, onToolCall, abortSignal } = opts;
  const maxIterations = 10;

  const workingMessages = messages.map((m) => ({ ...m }));

  for (let iteration = 0; iteration < maxIterations; iteration++) {
    const body: Record<string, unknown> = {
      model,
      messages: workingMessages,
      stream: true,
    };

    if (tools !== undefined && tools.length > 0) {
      body.tools = tools;
    }

    const response = await fetch(`${ollamaUrl}/api/chat`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(body),
      signal: abortSignal,
    });

    if (!response.ok) {
      throw new Error(`Ollama returned status ${response.status}.`);
    }

    if (response.body === null) {
      throw new Error("Response body is null — streaming not supported.");
    }

    const reader = response.body.getReader();
    const decoder = new TextDecoder();

    // Per-round state: text from a tool-calling round is not carried over
    // into the text returned for the final round.
    let content = "";
    const toolCalls: ToolCallResponse[] = [];

    try {
      for await (const chunk of readNdjsonStream(reader, decoder)) {
        // Once streaming has begun the HTTP status can no longer change, so the
        // server reports failures as an object with an `error` field.
        if (typeof chunk.error === "string" && chunk.error !== "") {
          throw new Error(`Ollama returned an error while streaming: ${chunk.error}`);
        }

        const msg = chunk.message;
        if (typeof msg !== "object" || msg === null) continue;

        const { content: text, tool_calls: calls } = msg as Record<string, unknown>;
        if (typeof text === "string" && text !== "") {
          content += text;
          onChunk(text);
        }
        if (Array.isArray(calls)) {
          toolCalls.push(...(calls as ToolCallResponse[]));
        }
      }
    } catch (err: unknown) {
      if (err instanceof DOMException && err.name === "AbortError") {
        // User cancelled — return whatever we accumulated
        return content;
      }
      throw err;
    } finally {
      // Release the connection on early exit. Cancelling a finished stream is a
      // no-op and cancelling an errored one rejects, which is irrelevant here.
      void reader.cancel().catch(() => undefined);
    }

    // If no tool calls, we're done
    if (toolCalls.length === 0) {
      return content;
    }

    // Tool calling: append assistant message and execute tools
    const assistantMsg: ChatMessage = {
      role: "assistant",
      content,
      tool_calls: toolCalls,
    };
    workingMessages.push(assistantMsg);

    if (app === undefined) {
      throw new Error("App reference required for tool execution.");
    }

    for (const tc of toolCalls) {
      const fnName = tc.function.name;
      const fnArgs = tc.function.arguments;
      const toolEntry = findToolByName(fnName);

      let result: string;
      if (toolEntry === undefined) {
        // Report the problem to the model as the tool result instead of failing.
        result = `Error: Unknown tool "${fnName}".`;
      } else {
        result = await toolEntry.execute(app, fnArgs);
      }

      if (onToolCall !== undefined) {
        const friendlyName = toolEntry !== undefined ? toolEntry.friendlyName : fnName;
        const summary = toolEntry !== undefined ? toolEntry.summarize(fnArgs) : `Called ${fnName}`;
        const resultSummary = toolEntry !== undefined ? toolEntry.summarizeResult(result) : "";
        onToolCall({ toolName: fnName, friendlyName, summary, resultSummary, args: fnArgs, result });
      }

      workingMessages.push({
        role: "tool",
        tool_name: fnName,
        content: result,
      });
    }
    // Next iteration sends the extended conversation and streams the follow-up.
  }

  throw new Error("Tool calling loop exceeded maximum iterations.");
}
