summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
author: Adam Malczewski <[email protected]> 2026-05-23 04:37:56 +0900
committer: Adam Malczewski <[email protected]> 2026-05-23 04:37:56 +0900
commit: ef427d3eae77fca716c203dd8bd84939710c518a (patch)
tree: 4241c489a199f32206cd947bec506701d86bf659
parent: 80ce5960c479fe35ab72c822e3b67799d7e1491e (diff)
download: dispatch-ef427d3eae77fca716c203dd8bd84939710c518a.tar.gz
dispatch-ef427d3eae77fca716c203dd8bd84939710c518a.zip
feat: youtube_transcribe blocks with polling, interruptible with background retrieve
- youtube_transcribe now polls until transcript is ready (waits estimated_seconds - 2s, min 2s)
- Times out after 10 minutes of polling
- When user interrupts, polling continues in background with youtube_transcribe_<uuid> job ID
- BackgroundTranscriptStore holds polling jobs, retrieve tool resolves them
- ToolCallDisplay shows 'interrupted' badge (blue) when result contains [USER INTERRUPT]
- Applies to all interruptible tools: run_shell, youtube_transcribe, retrieve
-rw-r--r--packages/api/src/agent-manager.ts16
-rw-r--r--packages/core/src/index.ts2
-rw-r--r--packages/core/src/tools/youtube-transcribe.ts237
-rw-r--r--packages/frontend/src/lib/components/ToolCallDisplay.svelte4
4 files changed, 183 insertions, 76 deletions
diff --git a/packages/api/src/agent-manager.ts b/packages/api/src/agent-manager.ts
index b0a2f56..3789b68 100644
--- a/packages/api/src/agent-manager.ts
+++ b/packages/api/src/agent-manager.ts
@@ -12,6 +12,7 @@ import {
createRetrieveTool,
createRunShellTool,
BackgroundShellStore,
+ BackgroundTranscriptStore,
createSkillsWatcher,
createSummonTool,
createTaskListTool,
@@ -141,6 +142,8 @@ interface TabAgent {
queueListeners: Array<() => void>;
/** Store for shell commands backgrounded due to user interrupt. */
shellStore: BackgroundShellStore;
+ /** Store for transcript requests backgrounded due to user interrupt. */
+ transcriptStore: BackgroundTranscriptStore;
}
export class AgentManager {
@@ -277,6 +280,7 @@ export class AgentManager {
messageQueue: [],
queueListeners: [],
shellStore: new BackgroundShellStore(),
+ transcriptStore: new BackgroundTranscriptStore(),
};
this.tabAgents.set(tabId, tabAgent);
}
@@ -360,7 +364,7 @@ export class AgentManager {
toolEntries.push({ name: "web_search", tool: createWebSearchTool() });
}
if (allowed.has("youtube_transcribe")) {
- toolEntries.push({ name: "youtube_transcribe", tool: createYoutubeTranscribeTool() });
+ toolEntries.push({ name: "youtube_transcribe", tool: createYoutubeTranscribeTool(tabAgent.transcriptStore) });
}
if (allowed.has("todo")) {
toolEntries.push({ name: "todo", tool: createTaskListTool(tabAgent.taskList) });
@@ -388,7 +392,9 @@ export class AgentManager {
getResult: (id) =>
tabAgent.shellStore.has(id)
? tabAgent.shellStore.getResult(id)
- : this.getChildResult(id),
+ : tabAgent.transcriptStore.has(id)
+ ? tabAgent.transcriptStore.getResult(id)
+ : this.getChildResult(id),
}),
});
}
@@ -405,7 +411,7 @@ export class AgentManager {
toolEntries.push({ name: "run_shell", tool: createRunShellTool(workingDirectory, tabAgent.shellStore) });
}
toolEntries.push({ name: "web_search", tool: createWebSearchTool() });
- toolEntries.push({ name: "youtube_transcribe", tool: createYoutubeTranscribeTool() });
+ toolEntries.push({ name: "youtube_transcribe", tool: createYoutubeTranscribeTool(tabAgent.transcriptStore) });
toolEntries.push({ name: "todo", tool: createTaskListTool(tabAgent.taskList) });
if (permSummon) {
// Capture parent's allowed tool names for child permission enforcement
@@ -429,7 +435,9 @@ export class AgentManager {
getResult: (id) =>
tabAgent.shellStore.has(id)
? tabAgent.shellStore.getResult(id)
- : this.getChildResult(id),
+ : tabAgent.transcriptStore.has(id)
+ ? tabAgent.transcriptStore.getResult(id)
+ : this.getChildResult(id),
}),
});
}
diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts
index b092fe2..8d5db16 100644
--- a/packages/core/src/index.ts
+++ b/packages/core/src/index.ts
@@ -57,6 +57,6 @@ export { createSummonTool, type SummonCallbacks } from "./tools/summon.js";
export { createTaskListTool, TaskList } from "./tools/task-list.js";
export { createWebSearchTool } from "./tools/web-search.js";
export { createWriteFileTool } from "./tools/write-file.js";
-export { createYoutubeTranscribeTool } from "./tools/youtube-transcribe.js";
+export { BackgroundTranscriptStore, createYoutubeTranscribeTool } from "./tools/youtube-transcribe.js";
// Types & Permissions
export * from "./types/index.js";
diff --git a/packages/core/src/tools/youtube-transcribe.ts b/packages/core/src/tools/youtube-transcribe.ts
index 58ca465..cfa006d 100644
--- a/packages/core/src/tools/youtube-transcribe.ts
+++ b/packages/core/src/tools/youtube-transcribe.ts
@@ -1,14 +1,139 @@
+import { randomUUID } from "node:crypto";
import { z } from "zod";
-import type { ToolDefinition } from "../types/index.js";
+import type { ToolDefinition, ToolExecuteContext } from "../types/index.js";
-export function createYoutubeTranscribeTool(): ToolDefinition {
+/** Base URL of the transcriber service; used to build the request URL and in the connection-refused error message. */
+const TRANSCRIBER_BASE = "http://100.102.55.49:41090";
+/** Formatted transcripts longer than this many characters are cut and suffixed with "[Transcript truncated]". */
+const MAX_OUTPUT_CHARS = 60000;
+/** Each individual HTTP request to the transcriber is aborted after this many milliseconds. */
+const REQUEST_TIMEOUT_MS = 30000;
+const MAX_WAIT_MS = 10 * 60 * 1000; // give up after 10 minutes of polling
+
+/** JSON body returned by the transcriber's `/api/transcript` endpoint, as consumed by this module. */
+interface TranscriptResponse {
+ /** Polling handles "completed", "failed", "queued" and "processing"; any other value is reported as an error. */
+ status: string;
+ /** Printed on the "Video ID" line of a transcript and in the failure message. */
+ video_id?: string;
+ /** Plain transcript text, printed under the "## Transcript" heading. */
+ full_text?: string;
+ /** Caption segments; `start` is rendered as MM:SS, so presumably seconds — confirm against the service. */
+ segments?: Array<{ text: string; start: number; duration: number }>;
+ /** Presumably the queue position — not read anywhere in the added code; confirm against the service. */
+ position?: number;
+ /** Estimated seconds until ready; used to choose the delay before the next poll (30 is assumed when absent). */
+ estimated_seconds?: number;
+ /** Failure description, included in the error string when `status` is "failed". */
+ error?: string;
+ /** Failure category, shown in square brackets in the same error string. */
+ error_type?: string;
+}
+
+/**
+ * Performs one GET against the transcriber's `/api/transcript` endpoint.
+ *
+ * The abort timer stays armed until the body has been read, so the whole
+ * request is bounded by REQUEST_TIMEOUT_MS.
+ *
+ * @param url - Video URL, sent URL-encoded as the `url` query parameter.
+ * @returns The parsed JSON body.
+ * @throws Error when the HTTP status is not OK, or when the body is not a JSON
+ *   object with a string `status`. Also rejects with the abort error on
+ *   timeout and with whatever `fetch` throws on a network failure.
+ */
+async function fetchTranscript(url: string): Promise<TranscriptResponse> {
+  const controller = new AbortController();
+  const timeout = setTimeout(() => controller.abort(), REQUEST_TIMEOUT_MS);
+  try {
+    const apiUrl = `${TRANSCRIBER_BASE}/api/transcript?url=${encodeURIComponent(url)}`;
+    const response = await fetch(apiUrl, { signal: controller.signal });
+    if (!response.ok) {
+      throw new Error(`Transcriber returned HTTP ${response.status} ${response.statusText}`);
+    }
+    // The body is untrusted JSON: verify the one field every caller branches
+    // on before treating it as a TranscriptResponse, so that a `null` or
+    // non-object body fails here with a clear message instead of crashing
+    // later on `data.status`.
+    const body: unknown = await response.json();
+    if (
+      typeof body !== "object" ||
+      body === null ||
+      typeof (body as { status?: unknown }).status !== "string"
+    ) {
+      throw new Error("Transcriber returned a malformed response (missing string 'status')");
+    }
+    return body as TranscriptResponse;
+  } finally {
+    // Always clear the timer so a finished request cannot be aborted later.
+    clearTimeout(timeout);
+  }
+}
+
+/** Renders a duration in seconds as zero-padded `MM:SS`; minutes are not rolled over into hours. */
+function formatTime(seconds: number): string {
+  const pad = (value: number): string => String(value).padStart(2, "0");
+  const wholeMinutes = Math.floor(seconds / 60);
+  const leftoverSeconds = Math.floor(seconds % 60);
+  return pad(wholeMinutes) + ":" + pad(leftoverSeconds);
+}
+
+/**
+ * Renders a completed transcript as text: a "Video ID" line, the full text
+ * under "## Transcript", then one `[MM:SS] text` line per segment under
+ * "## Timestamped Segments".
+ *
+ * Missing `full_text` or `segments` render as empty sections. A missing
+ * `video_id` renders as "unknown" rather than the literal string "undefined".
+ *
+ * @param data - Transcriber response to render.
+ * @returns The rendered text, cut to MAX_OUTPUT_CHARS and suffixed with
+ *   "[Transcript truncated]" when it is longer than that.
+ */
+function formatTranscript(data: TranscriptResponse): string {
+  const segments = data.segments ?? [];
+  const segmentsText = segments
+    .map((seg) => `[${formatTime(seg.start)}] ${seg.text}`)
+    .join("\n");
+
+  const output = [
+    // Same fallback the transcription-failure message uses for an absent id.
+    `Video ID: ${data.video_id ?? "unknown"}`,
+    "",
+    "## Transcript",
+    "",
+    data.full_text ?? "",
+    "",
+    "## Timestamped Segments",
+    "",
+    segmentsText,
+  ].join("\n");
+
+  return output.length > MAX_OUTPUT_CHARS
+    ? output.slice(0, MAX_OUTPUT_CHARS) + "\n\n[Transcript truncated]"
+    : output;
+}
+
+/** Returns a promise that resolves once `ms` milliseconds have passed. */
+function sleep(ms: number): Promise<void> {
+  return new Promise<void>((done) => {
+    setTimeout(done, ms);
+  });
+}
+
+/**
+ * Polls the transcriber for `url` until the transcript is ready, the
+ * transcription fails, or MAX_WAIT_MS has elapsed.
+ *
+ * While the service reports "queued" or "processing", the next poll is delayed
+ * by `estimated_seconds - 2` seconds (30 is assumed when the field is absent),
+ * with a floor of 2 seconds. The delay is capped at the time left in the
+ * budget, so a large estimate cannot push the total wait past MAX_WAIT_MS.
+ *
+ * @param url - Video URL to request.
+ * @returns The formatted transcript on "completed"; otherwise a string
+ *   starting with "Error:" describing the failure, the unexpected status, or
+ *   the timeout.
+ * @throws Whatever fetchTranscript throws (HTTP error, abort, network failure).
+ */
+async function pollUntilReady(url: string): Promise<string> {
+  const startTime = Date.now();
+
+  while (Date.now() - startTime < MAX_WAIT_MS) {
+    const data = await fetchTranscript(url);
+
+    if (data.status === "completed") {
+      return formatTranscript(data);
+    }
+
+    if (data.status === "failed") {
+      return `Error: Transcription failed for video ${data.video_id ?? "unknown"}: [${data.error_type ?? "unknown"}] ${data.error ?? "no details"}`;
+    }
+
+    if (data.status !== "queued" && data.status !== "processing") {
+      return `Error: Unexpected transcriber response status: ${data.status}`;
+    }
+
+    // The request itself may have used up the rest of the budget.
+    const remainingMs = MAX_WAIT_MS - (Date.now() - startTime);
+    if (remainingMs <= 0) {
+      break;
+    }
+
+    const estimate = data.estimated_seconds ?? 30;
+    const waitMs = Math.max((estimate - 2) * 1000, 2000);
+    // Never sleep past the deadline, whatever the service estimates.
+    await sleep(Math.min(waitMs, remainingMs));
+  }
+
+  return "Error: Timed out waiting for transcript after 10 minutes.";
+}
+
+/** How long a finished background job stays retrievable before it is dropped. */
+const JOB_RETENTION_MS = 10 * 60 * 1000;
+
+/**
+ * Store for transcript polls backgrounded due to user interrupt.
+ *
+ * Each job is a still-running poll promise keyed by a generated
+ * `youtube_transcribe_<uuid>` id. A job is removed JOB_RETENTION_MS after its
+ * promise settles, whether it resolved or rejected.
+ */
+export class BackgroundTranscriptStore {
+  private readonly jobs = new Map<string, { url: string; completion: Promise<string> }>();
+
+  /**
+   * Registers a running poll and returns the id to retrieve it with.
+   *
+   * @param url - Video URL the poll is for; stored alongside the promise.
+   * @param completion - The poll promise. It may reject; this method attaches
+   *   its own handlers, so a rejection never surfaces as unhandled.
+   * @returns The generated job id.
+   */
+  register(url: string, completion: Promise<string>): string {
+    const id = `youtube_transcribe_${randomUUID()}`;
+    this.jobs.set(id, { url, completion });
+
+    const scheduleCleanup = (): void => {
+      // unref() so a pending cleanup timer does not keep the process alive.
+      setTimeout(() => this.jobs.delete(id), JOB_RETENTION_MS).unref();
+    };
+    // Handle both outcomes explicitly. `completion.finally(...)` would return
+    // a derived promise that re-rejects with nobody listening, which Node
+    // reports as an unhandled rejection.
+    void completion.then(scheduleCleanup, scheduleCleanup);
+
+    return id;
+  }
+
+  /**
+   * Waits for a registered job to settle and reports its outcome.
+   *
+   * @param id - Job id returned by `register`.
+   * @returns `{ status: "done" }` with the poll's string result, or
+   *   `{ status: "error" }` when the id is unknown or the poll rejected.
+   *   Never rejects.
+   */
+  async getResult(
+    id: string,
+  ): Promise<{ status: "done"; result: string } | { status: "error"; error: string }> {
+    const job = this.jobs.get(id);
+    if (!job) {
+      return { status: "error", error: `No background transcript job found with id '${id}'` };
+    }
+    try {
+      return { status: "done", result: await job.completion };
+    } catch (err) {
+      // A backgrounded poll is outside the tool's own try/catch, so its
+      // failure has to be turned into the declared error shape here.
+      return { status: "error", error: err instanceof Error ? err.message : String(err) };
+    }
+  }
+
+  /** Returns whether a job with this id is currently stored. */
+  has(id: string): boolean {
+    return this.jobs.has(id);
+  }
+}
+
+export function createYoutubeTranscribeTool(
+ transcriptStore?: BackgroundTranscriptStore,
+): ToolDefinition {
return {
name: "youtube_transcribe",
description: [
- "Fetch the transcript/subtitles for a YouTube video from a local transcriber service.",
- "",
- "If the transcript has not been downloaded before, the video will be queued for processing.",
- "When status is 'queued' or 'processing', call this tool again later to check if the transcript is ready.",
+ "Fetch the transcript/subtitles for a YouTube video. This tool blocks until the transcript is ready.",
+ "If the video hasn't been transcribed yet, it will be queued and this tool waits for it automatically.",
+ "If the user interrupts while waiting, the request continues in the background and you receive a job ID.",
+ "Use the retrieve tool with that ID to get the transcript later.",
"",
"Accepted URL formats:",
" - youtube.com/watch?v=",
@@ -19,86 +144,58 @@ export function createYoutubeTranscribeTool(): ToolDefinition {
parameters: z.object({
url: z.string().describe("The YouTube video URL to fetch the transcript for."),
}),
- execute: async (args: Record<string, unknown>): Promise<string> => {
+ execute: async (
+ args: Record<string, unknown>,
+ context?: ToolExecuteContext,
+ ): Promise<string> => {
const url = args.url as string;
- const controller = new AbortController();
- const timeout = setTimeout(() => controller.abort(), 30000);
+ const queueCallbacks = context?.queueCallbacks;
try {
- const apiUrl = `http://100.102.55.49:41090/api/transcript?url=${encodeURIComponent(url)}`;
- const response = await fetch(apiUrl, { signal: controller.signal });
+ const pollPromise = pollUntilReady(url);
- if (!response.ok) {
- return `Error: Transcriber service returned HTTP ${response.status} ${response.statusText}`;
- }
+ if (queueCallbacks && transcriptStore) {
+ const { promise: queuePromise, cancel: cancelQueueWait } =
+ queueCallbacks.waitForQueuedMessage();
+ const queueSignal = queuePromise.then(() => "QUEUE_INTERRUPT" as const);
- const data = (await response.json()) as Record<string, unknown>;
- const status = data.status as string;
-
- if (status === "completed") {
- const videoId = data.video_id as string;
- const fullText = data.full_text as string;
- const segments = data.segments as Array<{ text: string; start: number; duration: number }>;
-
- const formatTime = (seconds: number): string => {
- const mins = Math.floor(seconds / 60);
- const secs = Math.floor(seconds % 60);
- return `${String(mins).padStart(2, "0")}:${String(secs).padStart(2, "0")}`;
- };
-
- const segmentsText = segments
- .map((seg) => `[${formatTime(seg.start)}] ${seg.text}`)
- .join("\n");
-
- const output = [
- `Video ID: ${videoId}`,
- "",
- "## Transcript",
- "",
- fullText,
- "",
- "## Timestamped Segments",
- "",
- segmentsText,
- ].join("\n");
-
- return output.length > 60000 ? output.slice(0, 60000) + "\n\n[Transcript truncated]" : output;
- }
+ const raceResult = await Promise.race([pollPromise, queueSignal]);
- if (status === "queued" || status === "processing") {
- const videoId = data.video_id as string;
- const position = data.position as number;
- const estimatedSeconds = data.estimated_seconds as number;
-
- return [
- `Transcript for video ${videoId} is being processed.`,
- `Status: ${status}`,
- `Queue position: ${position}`,
- `Estimated wait time: ${estimatedSeconds} seconds`,
- "",
- "You can try calling this tool again later to check if the transcript is ready.",
- ].join("\n");
- }
+ if (raceResult === "QUEUE_INTERRUPT") {
+ // Background the still-polling request
+ const jobId = transcriptStore.register(url, pollPromise);
+
+ const queuedMsgs = queueCallbacks.dequeueMessages();
+ const userMessages = queuedMsgs.map((m) => m.message).join("\n---\n");
- if (status === "failed") {
- const videoId = data.video_id as string;
- const error = data.error as string;
- const errorType = data.error_type as string;
+ return [
+ `Transcript request backgrounded — still waiting for transcription.`,
+ `job_id: ${jobId}`,
+ `url: ${url}`,
+ ``,
+ `Use the retrieve tool with this job_id to get the transcript when ready.`,
+ ``,
+ `[USER INTERRUPT]`,
+ `The user has sent you message(s) while you were working. You MUST address these before continuing with your current task:`,
+ ``,
+ userMessages,
+ ].join("\n");
+ }
- return `Error transcribing video ${videoId}: [${errorType}] ${error}`;
+ // Poll finished before interrupt
+ cancelQueueWait();
+ return raceResult;
}
- return `Unexpected response status: ${status}`;
+ return await pollPromise;
} catch (err) {
if (err instanceof Error && err.name === "AbortError") {
- return "Error: Request to YouTube transcriber timed out after 30 seconds.";
+ return "Error: Request to YouTube transcriber timed out.";
}
if (err instanceof Error && (err as NodeJS.ErrnoException).code === "ECONNREFUSED") {
- return "Error: Could not connect to YouTube transcriber at http://100.102.55.49:41090. Is it running?";
+ return `Error: Could not connect to YouTube transcriber at ${TRANSCRIBER_BASE}. Is it running?`;
}
return `Error: ${err instanceof Error ? err.message : String(err)}`;
- } finally {
- clearTimeout(timeout);
}
},
};
diff --git a/packages/frontend/src/lib/components/ToolCallDisplay.svelte b/packages/frontend/src/lib/components/ToolCallDisplay.svelte
index 213ba17..7c7aef6 100644
--- a/packages/frontend/src/lib/components/ToolCallDisplay.svelte
+++ b/packages/frontend/src/lib/components/ToolCallDisplay.svelte
@@ -70,7 +70,9 @@ const summonAgentId = $derived.by(() => {
>Open Tab</button>
{/if}
{#if toolCall.result !== undefined}
- {#if isShell && shellResult !== null}
+ {#if toolCall.result.includes("[USER INTERRUPT]")}
+ <span class="badge badge-info badge-sm ml-auto">interrupted</span>
+ {:else if isShell && shellResult !== null}
<span class="badge badge-sm ml-auto {shellResult.exitCode === 0 ? 'badge-success' : 'badge-error'}">
exit {shellResult.exitCode}
</span>