diff options
| author | Adam Malczewski <[email protected]> | 2026-05-23 04:37:56 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-05-23 04:37:56 +0900 |
| commit | ef427d3eae77fca716c203dd8bd84939710c518a (patch) | |
| tree | 4241c489a199f32206cd947bec506701d86bf659 | |
| parent | 80ce5960c479fe35ab72c822e3b67799d7e1491e (diff) | |
| download | dispatch-ef427d3eae77fca716c203dd8bd84939710c518a.tar.gz dispatch-ef427d3eae77fca716c203dd8bd84939710c518a.zip | |
feat: youtube_transcribe blocks with polling, interruptible with background retrieve
- youtube_transcribe now polls until the transcript is ready (between polls it waits estimated_seconds - 2s, with a minimum of 2s)
- Times out after 10 minutes of polling
- When the user interrupts, polling continues in the background under a youtube_transcribe_<uuid> job ID
- BackgroundTranscriptStore holds the backgrounded polling jobs; the retrieve tool resolves them
- ToolCallDisplay shows 'interrupted' badge (blue) when result contains [USER INTERRUPT]
- Applies to all interruptible tools: run_shell, youtube_transcribe, retrieve
| -rw-r--r-- | packages/api/src/agent-manager.ts | 16 | ||||
| -rw-r--r-- | packages/core/src/index.ts | 2 | ||||
| -rw-r--r-- | packages/core/src/tools/youtube-transcribe.ts | 237 | ||||
| -rw-r--r-- | packages/frontend/src/lib/components/ToolCallDisplay.svelte | 4 |
4 files changed, 183 insertions, 76 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts index b0a2f56..3789b68 100644 --- a/packages/api/src/agent-manager.ts +++ b/packages/api/src/agent-manager.ts @@ -12,6 +12,7 @@ import { createRetrieveTool, createRunShellTool, BackgroundShellStore, + BackgroundTranscriptStore, createSkillsWatcher, createSummonTool, createTaskListTool, @@ -141,6 +142,8 @@ interface TabAgent { queueListeners: Array<() => void>; /** Store for shell commands backgrounded due to user interrupt. */ shellStore: BackgroundShellStore; + /** Store for transcript requests backgrounded due to user interrupt. */ + transcriptStore: BackgroundTranscriptStore; } export class AgentManager { @@ -277,6 +280,7 @@ export class AgentManager { messageQueue: [], queueListeners: [], shellStore: new BackgroundShellStore(), + transcriptStore: new BackgroundTranscriptStore(), }; this.tabAgents.set(tabId, tabAgent); } @@ -360,7 +364,7 @@ export class AgentManager { toolEntries.push({ name: "web_search", tool: createWebSearchTool() }); } if (allowed.has("youtube_transcribe")) { - toolEntries.push({ name: "youtube_transcribe", tool: createYoutubeTranscribeTool() }); + toolEntries.push({ name: "youtube_transcribe", tool: createYoutubeTranscribeTool(tabAgent.transcriptStore) }); } if (allowed.has("todo")) { toolEntries.push({ name: "todo", tool: createTaskListTool(tabAgent.taskList) }); @@ -388,7 +392,9 @@ export class AgentManager { getResult: (id) => tabAgent.shellStore.has(id) ? tabAgent.shellStore.getResult(id) - : this.getChildResult(id), + : tabAgent.transcriptStore.has(id) + ? 
tabAgent.transcriptStore.getResult(id) + : this.getChildResult(id), }), }); } @@ -405,7 +411,7 @@ export class AgentManager { toolEntries.push({ name: "run_shell", tool: createRunShellTool(workingDirectory, tabAgent.shellStore) }); } toolEntries.push({ name: "web_search", tool: createWebSearchTool() }); - toolEntries.push({ name: "youtube_transcribe", tool: createYoutubeTranscribeTool() }); + toolEntries.push({ name: "youtube_transcribe", tool: createYoutubeTranscribeTool(tabAgent.transcriptStore) }); toolEntries.push({ name: "todo", tool: createTaskListTool(tabAgent.taskList) }); if (permSummon) { // Capture parent's allowed tool names for child permission enforcement @@ -429,7 +435,9 @@ export class AgentManager { getResult: (id) => tabAgent.shellStore.has(id) ? tabAgent.shellStore.getResult(id) - : this.getChildResult(id), + : tabAgent.transcriptStore.has(id) + ? tabAgent.transcriptStore.getResult(id) + : this.getChildResult(id), }), }); } diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index b092fe2..8d5db16 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -57,6 +57,6 @@ export { createSummonTool, type SummonCallbacks } from "./tools/summon.js"; export { createTaskListTool, TaskList } from "./tools/task-list.js"; export { createWebSearchTool } from "./tools/web-search.js"; export { createWriteFileTool } from "./tools/write-file.js"; -export { createYoutubeTranscribeTool } from "./tools/youtube-transcribe.js"; +export { BackgroundTranscriptStore, createYoutubeTranscribeTool } from "./tools/youtube-transcribe.js"; // Types & Permissions export * from "./types/index.js"; diff --git a/packages/core/src/tools/youtube-transcribe.ts b/packages/core/src/tools/youtube-transcribe.ts index 58ca465..cfa006d 100644 --- a/packages/core/src/tools/youtube-transcribe.ts +++ b/packages/core/src/tools/youtube-transcribe.ts @@ -1,14 +1,139 @@ +import { randomUUID } from "node:crypto"; import { z } from "zod"; -import type { 
ToolDefinition } from "../types/index.js"; +import type { ToolDefinition, ToolExecuteContext } from "../types/index.js"; -export function createYoutubeTranscribeTool(): ToolDefinition { +const TRANSCRIBER_BASE = "http://100.102.55.49:41090"; +const MAX_OUTPUT_CHARS = 60000; +const REQUEST_TIMEOUT_MS = 30000; +const MAX_WAIT_MS = 10 * 60 * 1000; // give up after 10 minutes of polling + +interface TranscriptResponse { + status: string; + video_id?: string; + full_text?: string; + segments?: Array<{ text: string; start: number; duration: number }>; + position?: number; + estimated_seconds?: number; + error?: string; + error_type?: string; +} + +async function fetchTranscript(url: string): Promise<TranscriptResponse> { + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS); + try { + const apiUrl = `${TRANSCRIBER_BASE}/api/transcript?url=${encodeURIComponent(url)}`; + const response = await fetch(apiUrl, { signal: controller.signal }); + if (!response.ok) { + throw new Error(`Transcriber returned HTTP ${response.status} ${response.statusText}`); + } + return (await response.json()) as TranscriptResponse; + } finally { + clearTimeout(timeout); + } +} + +function formatTime(seconds: number): string { + const mins = Math.floor(seconds / 60); + const secs = Math.floor(seconds % 60); + return `${String(mins).padStart(2, "0")}:${String(secs).padStart(2, "0")}`; +} + +function formatTranscript(data: TranscriptResponse): string { + const segments = data.segments ?? []; + const segmentsText = segments + .map((seg) => `[${formatTime(seg.start)}] ${seg.text}`) + .join("\n"); + + const output = [ + `Video ID: ${data.video_id}`, + "", + "## Transcript", + "", + data.full_text ?? "", + "", + "## Timestamped Segments", + "", + segmentsText, + ].join("\n"); + + return output.length > MAX_OUTPUT_CHARS + ? 
output.slice(0, MAX_OUTPUT_CHARS) + "\n\n[Transcript truncated]" + : output; +} + +function sleep(ms: number): Promise<void> { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** Polls until the transcript is ready, fails, or times out. */ +async function pollUntilReady(url: string): Promise<string> { + const startTime = Date.now(); + + while (Date.now() - startTime < MAX_WAIT_MS) { + const data = await fetchTranscript(url); + + if (data.status === "completed") { + return formatTranscript(data); + } + + if (data.status === "failed") { + return `Error: Transcription failed for video ${data.video_id ?? "unknown"}: [${data.error_type ?? "unknown"}] ${data.error ?? "no details"}`; + } + + if (data.status === "queued" || data.status === "processing") { + const estimate = data.estimated_seconds ?? 30; + const waitMs = Math.max((estimate - 2) * 1000, 2000); + await sleep(waitMs); + continue; + } + + return `Error: Unexpected transcriber response status: ${data.status}`; + } + + return "Error: Timed out waiting for transcript after 10 minutes."; +} + +/** Store for transcript polls backgrounded due to user interrupt. 
*/ +export class BackgroundTranscriptStore { + private jobs = new Map<string, { url: string; completion: Promise<string> }>(); + + register(url: string, completion: Promise<string>): string { + const id = `youtube_transcribe_${randomUUID()}`; + this.jobs.set(id, { url, completion }); + // Auto-cleanup 10 minutes after completion + completion.finally(() => { + setTimeout(() => this.jobs.delete(id), 10 * 60 * 1000); + }); + return id; + } + + async getResult( + id: string, + ): Promise<{ status: "done"; result: string } | { status: "error"; error: string }> { + const job = this.jobs.get(id); + if (!job) { + return { status: "error", error: `No background transcript job found with id '${id}'` }; + } + const result = await job.completion; + return { status: "done", result }; + } + + has(id: string): boolean { + return this.jobs.has(id); + } +} + +export function createYoutubeTranscribeTool( + transcriptStore?: BackgroundTranscriptStore, +): ToolDefinition { return { name: "youtube_transcribe", description: [ - "Fetch the transcript/subtitles for a YouTube video from a local transcriber service.", - "", - "If the transcript has not been downloaded before, the video will be queued for processing.", - "When status is 'queued' or 'processing', call this tool again later to check if the transcript is ready.", + "Fetch the transcript/subtitles for a YouTube video. 
This tool blocks until the transcript is ready.", + "If the video hasn't been transcribed yet, it will be queued and this tool waits for it automatically.", + "If the user interrupts while waiting, the request continues in the background and you receive a job ID.", + "Use the retrieve tool with that ID to get the transcript later.", "", "Accepted URL formats:", " - youtube.com/watch?v=", @@ -19,86 +144,58 @@ export function createYoutubeTranscribeTool(): ToolDefinition { parameters: z.object({ url: z.string().describe("The YouTube video URL to fetch the transcript for."), }), - execute: async (args: Record<string, unknown>): Promise<string> => { + execute: async ( + args: Record<string, unknown>, + context?: ToolExecuteContext, + ): Promise<string> => { const url = args.url as string; - const controller = new AbortController(); - const timeout = setTimeout(() => controller.abort(), 30000); + const queueCallbacks = context?.queueCallbacks; try { - const apiUrl = `http://100.102.55.49:41090/api/transcript?url=${encodeURIComponent(url)}`; - const response = await fetch(apiUrl, { signal: controller.signal }); + const pollPromise = pollUntilReady(url); - if (!response.ok) { - return `Error: Transcriber service returned HTTP ${response.status} ${response.statusText}`; - } + if (queueCallbacks && transcriptStore) { + const { promise: queuePromise, cancel: cancelQueueWait } = + queueCallbacks.waitForQueuedMessage(); + const queueSignal = queuePromise.then(() => "QUEUE_INTERRUPT" as const); - const data = (await response.json()) as Record<string, unknown>; - const status = data.status as string; - - if (status === "completed") { - const videoId = data.video_id as string; - const fullText = data.full_text as string; - const segments = data.segments as Array<{ text: string; start: number; duration: number }>; - - const formatTime = (seconds: number): string => { - const mins = Math.floor(seconds / 60); - const secs = Math.floor(seconds % 60); - return 
`${String(mins).padStart(2, "0")}:${String(secs).padStart(2, "0")}`; - }; - - const segmentsText = segments - .map((seg) => `[${formatTime(seg.start)}] ${seg.text}`) - .join("\n"); - - const output = [ - `Video ID: ${videoId}`, - "", - "## Transcript", - "", - fullText, - "", - "## Timestamped Segments", - "", - segmentsText, - ].join("\n"); - - return output.length > 60000 ? output.slice(0, 60000) + "\n\n[Transcript truncated]" : output; - } + const raceResult = await Promise.race([pollPromise, queueSignal]); - if (status === "queued" || status === "processing") { - const videoId = data.video_id as string; - const position = data.position as number; - const estimatedSeconds = data.estimated_seconds as number; - - return [ - `Transcript for video ${videoId} is being processed.`, - `Status: ${status}`, - `Queue position: ${position}`, - `Estimated wait time: ${estimatedSeconds} seconds`, - "", - "You can try calling this tool again later to check if the transcript is ready.", - ].join("\n"); - } + if (raceResult === "QUEUE_INTERRUPT") { + // Background the still-polling request + const jobId = transcriptStore.register(url, pollPromise); + + const queuedMsgs = queueCallbacks.dequeueMessages(); + const userMessages = queuedMsgs.map((m) => m.message).join("\n---\n"); - if (status === "failed") { - const videoId = data.video_id as string; - const error = data.error as string; - const errorType = data.error_type as string; + return [ + `Transcript request backgrounded — still waiting for transcription.`, + `job_id: ${jobId}`, + `url: ${url}`, + ``, + `Use the retrieve tool with this job_id to get the transcript when ready.`, + ``, + `[USER INTERRUPT]`, + `The user has sent you message(s) while you were working. 
You MUST address these before continuing with your current task:`, + ``, + userMessages, + ].join("\n"); + } - return `Error transcribing video ${videoId}: [${errorType}] ${error}`; + // Poll finished before interrupt + cancelQueueWait(); + return raceResult; } - return `Unexpected response status: ${status}`; + return await pollPromise; } catch (err) { if (err instanceof Error && err.name === "AbortError") { - return "Error: Request to YouTube transcriber timed out after 30 seconds."; + return "Error: Request to YouTube transcriber timed out."; } if (err instanceof Error && (err as NodeJS.ErrnoException).code === "ECONNREFUSED") { - return "Error: Could not connect to YouTube transcriber at http://100.102.55.49:41090. Is it running?"; + return `Error: Could not connect to YouTube transcriber at ${TRANSCRIBER_BASE}. Is it running?`; } return `Error: ${err instanceof Error ? err.message : String(err)}`; - } finally { - clearTimeout(timeout); } }, }; diff --git a/packages/frontend/src/lib/components/ToolCallDisplay.svelte b/packages/frontend/src/lib/components/ToolCallDisplay.svelte index 213ba17..7c7aef6 100644 --- a/packages/frontend/src/lib/components/ToolCallDisplay.svelte +++ b/packages/frontend/src/lib/components/ToolCallDisplay.svelte @@ -70,7 +70,9 @@ const summonAgentId = $derived.by(() => { >Open Tab</button> {/if} {#if toolCall.result !== undefined} - {#if isShell && shellResult !== null} + {#if toolCall.result.includes("[USER INTERRUPT]")} + <span class="badge badge-info badge-sm ml-auto">interrupted</span> + {:else if isShell && shellResult !== null} <span class="badge badge-sm ml-auto {shellResult.exitCode === 0 ? 'badge-success' : 'badge-error'}"> exit {shellResult.exitCode} </span> |
