summary | refs | log | tree | commit | diff | homepage
diff options
context:
space:
mode:
author: Adam Malczewski <[email protected]> 2026-05-11 19:18:34 +0900
committer: Adam Malczewski <[email protected]> 2026-05-11 19:18:34 +0900
commit: c23ee09f6d24832aa472298db91df3ce6e248a76 (patch)
tree: 3576678394cf5eb053dc649abdf1dab559d69487
download: youtube-transcriber-c23ee09f6d24832aa472298db91df3ce6e248a76.tar.gz
youtube-transcriber-c23ee09f6d24832aa472298db91df3ce6e248a76.zip
Initial commit: YouTube transcriber API with queue-based worker
-rw-r--r-- .dockerignore 20
-rw-r--r-- .gitignore 11
-rw-r--r-- Dockerfile 33
-rw-r--r-- Dockerfile.dev 26
-rw-r--r-- app/__init__.py 0
-rw-r--r-- app/main.py 106
-rw-r--r-- app/schemas.py 35
-rw-r--r-- app/storage.py 176
-rw-r--r-- app/transcriber.py 84
-rw-r--r-- app/worker.py 58
-rwxr-xr-x bin/build 9
-rwxr-xr-x bin/clean 9
-rwxr-xr-x bin/down 9
-rwxr-xr-x bin/test 9
-rwxr-xr-x bin/up 9
-rw-r--r-- docker-compose.prod.yml 11
-rw-r--r-- docker-compose.yml 13
-rwxr-xr-x docker/entrypoint.dev.sh 6
-rwxr-xr-x docker/entrypoint.prod.sh 4
-rw-r--r-- pyproject.toml 21
-rw-r--r-- tests/__init__.py 0
-rw-r--r-- tests/test_main.py 126
-rw-r--r-- tests/test_storage.py 250
-rw-r--r-- tests/test_transcriber.py 60
-rw-r--r-- tests/test_worker.py 249
25 files changed, 1334 insertions, 0 deletions
diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..2bcabc9
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,20 @@
+__pycache__/
+*.py[cod]
+*.egg-info/
+dist/
+.venv/
+.env
+*.log
+.pytest_cache/
+.git/
+.gitignore
+.dockerignore
+Dockerfile.dev
+docker-compose.yml
+docker-compose.prod.yml
+bin/
+tests/
+README.md
+data/
+research/
+plan/
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..0baec98
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,11 @@
+__pycache__/
+*.py[cod]
+*.egg-info/
+dist/
+.venv/
+.env
+*.log
+.pytest_cache/
+data/
+research/
+plan/
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..eb25a59
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,33 @@
+# Production image. Two stages: the first builds a virtualenv holding the
+# runtime dependencies, the second copies that virtualenv next to the app code.
+FROM python:3.12-alpine3.21 AS builder
+
+WORKDIR /app
+
+# --copies asks venv to copy the interpreter files instead of symlinking them;
+# the whole /opt/venv tree is copied into the final stage further down.
+RUN python -m venv --copies /opt/venv
+ENV PATH="/opt/venv/bin:$PATH"
+
+# Only pyproject.toml is needed here: the specifiers listed under
+# [project].dependencies are read with tomllib and handed to pip. The project
+# itself is not installed as a package.
+COPY pyproject.toml ./
+RUN pip install --no-cache-dir $(python -c "import tomllib; print(' '.join(tomllib.load(open('pyproject.toml','rb'))['project']['dependencies']))")
+
+# Runtime stage.
+FROM python:3.12-alpine3.21
+
+# wget is the client used by the HEALTHCHECK below.
+RUN apk add --no-cache wget
+
+# Dedicated user and group (uid/gid 1001) that the service runs as.
+RUN addgroup -g 1001 -S appgroup && adduser -u 1001 -S appuser -G appgroup
+
+WORKDIR /app
+
+# Bring over the virtualenv built in the first stage and put it first on PATH
+# so `uvicorn` resolves to the venv's copy.
+COPY --from=builder /opt/venv /opt/venv
+ENV PATH="/opt/venv/bin:$PATH"
+
+COPY app/ ./app/
+
+# The app opens its SQLite database at the relative path data/transcripts.db,
+# i.e. /app/data here, so that directory must be writable by appuser.
+RUN mkdir -p /app/data && chown appuser:appgroup /app/data
+
+USER appuser
+
+EXPOSE 41090
+
+# Probes the /health endpoint on the port uvicorn listens on.
+HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
+    CMD wget -q -O /dev/null http://localhost:41090/health || exit 1
+
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "41090"]
diff --git a/Dockerfile.dev b/Dockerfile.dev
new file mode 100644
index 0000000..1591f41
--- /dev/null
+++ b/Dockerfile.dev
@@ -0,0 +1,26 @@
+# Development image. Application code is not copied in; dependencies are
+# installed by the entrypoint script each time the container starts.
+FROM python:3.12-alpine3.21
+
+# bash is required by the entrypoint script (#!/usr/bin/env bash); wget is
+# used by the HEALTHCHECK below.
+RUN apk add --no-cache bash wget
+
+WORKDIR /app
+
+RUN addgroup -g 1001 -S appgroup && adduser -u 1001 -S appuser -G appgroup
+
+RUN chown -R appuser:appgroup /app
+
+RUN mkdir -p /app/data && chown appuser:appgroup /app/data
+
+COPY --chown=appuser:appgroup pyproject.toml ./
+
+# Installed under the name `entrypoint` so ENTRYPOINT can refer to it by name.
+COPY --chown=appuser:appgroup docker/entrypoint.dev.sh /usr/local/bin/entrypoint
+RUN chmod +x /usr/local/bin/entrypoint
+
+EXPOSE 41090
+
+HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \
+    CMD wget -q -O /dev/null http://localhost:41090/health || exit 1
+
+# NOTE(review): this file never switches to appuser (there is no USER
+# instruction), so the container runs as the base image's default user even
+# though ownership and this PATH entry are prepared for appuser. Confirm
+# whether that is intended.
+ENV PATH="/home/appuser/.local/bin:$PATH"
+
+ENTRYPOINT ["entrypoint"]
+# --reload makes uvicorn restart when source files change.
+CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "41090", "--reload"]
diff --git a/app/__init__.py b/app/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/app/__init__.py
diff --git a/app/main.py b/app/main.py
new file mode 100644
index 0000000..bece6e4
--- /dev/null
+++ b/app/main.py
@@ -0,0 +1,106 @@
+import asyncio
+from contextlib import asynccontextmanager
+
+from fastapi import FastAPI, HTTPException, Query
+from fastapi.middleware.cors import CORSMiddleware
+from starlette.requests import Request
+
+from app.schemas import (
+ TranscriptFailedResponse,
+ TranscriptQueuedResponse,
+ TranscriptResponse,
+ TranscriptResultResponse,
+ TranscriptSegment,
+)
+from app.storage import TranscriptStore
+from app.transcriber import InvalidURLError, create_api, extract_video_id
+from app.worker import run_worker
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    """Own the transcript store and the queue worker for the app's lifetime.
+
+    On startup the SQLite-backed store is opened, the background worker task
+    is started, and the store is published as ``app.state.store`` for request
+    handlers. On shutdown the worker is signalled and cancelled, then the
+    store is closed.
+
+    The store is closed in an outer ``finally`` so the connection is released
+    even when startup fails after ``initialize()`` or when the worker task
+    ended with an exception (which is re-raised by ``await worker_task``).
+    """
+    store = TranscriptStore("data/transcripts.db")
+    await store.initialize()
+    try:
+        api = create_api()
+        shutdown_event = asyncio.Event()
+        worker_task = asyncio.create_task(run_worker(store, api, shutdown_event))
+        app.state.store = store
+        try:
+            yield
+        finally:
+            # Ask the loop to stop, then cancel so a worker that is sleeping
+            # or fetching does not delay shutdown.
+            shutdown_event.set()
+            worker_task.cancel()
+            try:
+                await worker_task
+            except asyncio.CancelledError:
+                pass
+    finally:
+        await store.close()
+
+
+# ASGI application object; this is the `app.main:app` target given to uvicorn.
+# NOTE(review): version here is "0.2.0" while pyproject.toml declares "0.1.0" —
+# confirm which one is authoritative.
+app = FastAPI(
+    title="YouTube Transcriber",
+    description="API for fetching YouTube video transcripts",
+    version="0.2.0",
+    lifespan=lifespan,
+)
+
+# CORS is fully open: any origin, method and header is accepted.
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+
+@app.get("/health")
+async def health() -> dict[str, str]:
+    """Liveness probe: always answers ``{"status": "ok"}``.
+
+    Registered at ``/health``, the path polled by the Docker HEALTHCHECK and
+    requested by the test suite.
+    """
+    return {"status": "ok"}
+
+
+@app.get("/api/transcript", response_model=TranscriptResponse)
+async def get_transcript(
+    request: Request,
+    url: str = Query(..., description="YouTube video URL", min_length=1),
+) -> TranscriptResponse:
+    """Return the transcript for ``url``, or its queue status if not ready yet.
+
+    Resolution order:
+
+    1. A cached transcript is returned as a ``completed`` response.
+    2. An existing queue row is reported as ``failed``, ``processing`` or
+       ``queued`` (the latter two with position and estimated wait).
+    3. Otherwise the video is enqueued and reported as ``queued``.
+
+    Raises:
+        HTTPException: 400 when no video id can be extracted from ``url``;
+            503 when the queue row vanished between two reads and no cached
+            transcript exists either (the caller should simply retry).
+    """
+    try:
+        video_id = extract_video_id(url)
+    except InvalidURLError as exc:
+        raise HTTPException(
+            status_code=400, detail=f"Invalid YouTube URL: {url}"
+        ) from exc
+
+    store: TranscriptStore = request.app.state.store
+
+    def completed(row: dict) -> TranscriptResultResponse:
+        return TranscriptResultResponse(
+            video_id=row["video_id"],
+            full_text=row["full_text"],
+            segments=[TranscriptSegment(**s) for s in row["segments"]],
+        )
+
+    cached = await store.get_transcript(video_id)
+    if cached is not None:
+        return completed(cached)
+
+    entry = await store.get_queue_entry(video_id)
+    if entry is not None and entry["status"] == "failed":
+        return TranscriptFailedResponse(
+            video_id=video_id,
+            error=entry["error"] or "",
+            error_type=entry["error_type"] or "",
+        )
+
+    if entry is None:
+        # Not cached and not queued — enqueue it.
+        await store.enqueue(video_id)
+        api_status = "queued"
+    else:
+        # pending or processing
+        api_status = "processing" if entry["status"] == "processing" else "queued"
+
+    estimate = await store.get_position_and_estimate(video_id)
+    if estimate is None:
+        # The queue row disappeared between the reads above. The worker saves
+        # the transcript before deleting the row, so look in the cache again
+        # instead of failing on a missing estimate.
+        cached = await store.get_transcript(video_id)
+        if cached is not None:
+            return completed(cached)
+        raise HTTPException(
+            status_code=503,
+            detail="Transcript state changed while handling the request; retry",
+        )
+
+    return TranscriptQueuedResponse(
+        status=api_status,
+        video_id=video_id,
+        position=estimate["position"],
+        estimated_seconds=estimate["estimated_seconds"],
+    )
diff --git a/app/schemas.py b/app/schemas.py
new file mode 100644
index 0000000..27ab96f
--- /dev/null
+++ b/app/schemas.py
@@ -0,0 +1,35 @@
+from typing import Literal
+
+from pydantic import BaseModel
+
+
+# One timed piece of a transcript. Built in app/main.py from the dicts stored
+# in the transcript cache via TranscriptSegment(**segment).
+class TranscriptSegment(BaseModel):
+    text: str
+    # NOTE(review): presumably seconds from the start of the video and the
+    # segment length in seconds — confirm against the transcript source.
+    start: float
+    duration: float
+
+
+# Response body for a transcript that is already cached.
+class TranscriptResultResponse(BaseModel):
+    # Fixed tag that distinguishes this variant of the response union.
+    status: Literal["completed"] = "completed"
+    video_id: str
+    # The worker builds this by joining all segment texts with single spaces.
+    full_text: str
+    segments: list[TranscriptSegment]
+
+
+# Response body for a video that is waiting in the queue or being fetched.
+class TranscriptQueuedResponse(BaseModel):
+    # "processing" when the queue row has status 'processing', else "queued".
+    status: Literal["queued", "processing"]
+    video_id: str
+    # Number of unfinished queue entries ahead of this one (0 when none).
+    position: int
+    # Estimated wait, as computed by TranscriptStore.get_position_and_estimate.
+    estimated_seconds: float
+
+
+# Response body for a video whose fetch attempt failed.
+class TranscriptFailedResponse(BaseModel):
+    # Fixed tag that distinguishes this variant of the response union.
+    status: Literal["failed"] = "failed"
+    video_id: str
+    # Human-readable error message recorded by the worker.
+    error: str
+    # Category recorded by the worker: "ip_blocked", "not_available",
+    # "transcript_disabled" or "internal_error".
+    error_type: str
+
+
+# Union of every body /api/transcript can return; used as its response_model.
+# Each variant carries a distinct `status` literal.
+TranscriptResponse = (
+    TranscriptResultResponse | TranscriptQueuedResponse | TranscriptFailedResponse
+)
diff --git a/app/storage.py b/app/storage.py
new file mode 100644
index 0000000..2e991d3
--- /dev/null
+++ b/app/storage.py
@@ -0,0 +1,176 @@
+import json
+import os
+import random
+from datetime import datetime, timezone
+
+import aiosqlite
+
+
+class TranscriptStore:
+    """Persist cached transcripts and the fetch queue in one SQLite file.
+
+    Two tables are managed: ``transcripts`` (one row per fetched video) and
+    ``queue`` (one row per video that is pending, being processed, or failed).
+    Timestamps are stored as naive-UTC ISO-8601 strings.
+
+    ``initialize()`` must be awaited before any other method is used.
+    """
+
+    # Keys of a queue-entry dict, in the order the columns are selected.
+    _QUEUE_FIELDS = (
+        "id",
+        "video_id",
+        "status",
+        "assigned_delay",
+        "error",
+        "error_type",
+        "created_at",
+        "updated_at",
+        "started_at",
+    )
+    # Constant column list (never user input) interpolated into queue SELECTs.
+    _QUEUE_COLUMNS = ", ".join(_QUEUE_FIELDS)
+
+    def __init__(self, db_path: str = "data/transcripts.db"):
+        self.db_path = db_path
+        self._db: aiosqlite.Connection | None = None
+
+    @staticmethod
+    def _utcnow() -> datetime:
+        """Return the current UTC time as a naive ``datetime``.
+
+        Replaces the deprecated ``datetime.utcnow()`` while producing the same
+        naive values, so previously stored timestamps stay comparable.
+        """
+        return datetime.now(timezone.utc).replace(tzinfo=None)
+
+    @classmethod
+    def _row_to_entry(cls, row) -> dict:
+        """Map a row selected with ``_QUEUE_COLUMNS`` to a queue-entry dict."""
+        return dict(zip(cls._QUEUE_FIELDS, row))
+
+    @staticmethod
+    def _remaining_delay(
+        assigned_delay: float, started_at: str | None, now: datetime
+    ) -> float:
+        """Seconds of ``assigned_delay`` left for an entry started at ``started_at``."""
+        if started_at is None:
+            return assigned_delay
+        elapsed = (now - datetime.fromisoformat(started_at)).total_seconds()
+        return max(0.0, assigned_delay - elapsed)
+
+    async def initialize(self) -> None:
+        """Create the database directory, open the connection, ensure the schema.
+
+        Queue rows still marked ``processing`` are reset to ``pending``: when
+        the store is being opened no worker has claimed anything yet, so such
+        rows were interrupted by a previous shutdown and would otherwise never
+        be picked up again.
+        NOTE(review): assumes a single process uses the database file.
+        """
+        parent = os.path.dirname(self.db_path)
+        if parent:
+            os.makedirs(parent, exist_ok=True)
+
+        self._db = await aiosqlite.connect(self.db_path)
+        await self._db.execute("PRAGMA journal_mode=WAL")
+        await self._db.execute(
+            """CREATE TABLE IF NOT EXISTS transcripts (
+                video_id TEXT PRIMARY KEY,
+                full_text TEXT NOT NULL,
+                segments TEXT NOT NULL,
+                created_at TEXT NOT NULL
+            )"""
+        )
+        await self._db.execute(
+            """CREATE TABLE IF NOT EXISTS queue (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                video_id TEXT UNIQUE NOT NULL,
+                status TEXT NOT NULL DEFAULT 'pending',
+                assigned_delay REAL NOT NULL,
+                error TEXT,
+                error_type TEXT,
+                created_at TEXT NOT NULL,
+                updated_at TEXT NOT NULL,
+                started_at TEXT
+            )"""
+        )
+        await self._db.execute(
+            "UPDATE queue SET status = 'pending', started_at = NULL, updated_at = ? "
+            "WHERE status = 'processing'",
+            (self._utcnow().isoformat(),),
+        )
+        await self._db.commit()
+
+    async def get_transcript(self, video_id: str) -> dict | None:
+        """Return ``{"video_id", "full_text", "segments"}`` or None if not cached."""
+        cursor = await self._db.execute(
+            "SELECT video_id, full_text, segments FROM transcripts WHERE video_id = ?",
+            (video_id,),
+        )
+        row = await cursor.fetchone()
+        if row is None:
+            return None
+        return {
+            "video_id": row[0],
+            "full_text": row[1],
+            # Segments are stored as a JSON array of dicts.
+            "segments": json.loads(row[2]),
+        }
+
+    async def save_transcript(
+        self, video_id: str, full_text: str, segments: list[dict]
+    ) -> None:
+        """Insert or replace the transcript row for ``video_id``."""
+        await self._db.execute(
+            """INSERT OR REPLACE INTO transcripts (video_id, full_text, segments, created_at)
+               VALUES (?, ?, ?, ?)""",
+            (video_id, full_text, json.dumps(segments), self._utcnow().isoformat()),
+        )
+        await self._db.commit()
+
+    async def enqueue(self, video_id: str) -> dict:
+        """Add a video to the queue and return its queue entry.
+
+        A no-op when a row for ``video_id`` already exists (whatever its
+        status); the existing entry is returned unchanged. New rows get a
+        random ``assigned_delay`` between 30 and 60 seconds.
+        """
+        delay = random.uniform(30.0, 60.0)
+        now = self._utcnow().isoformat()
+        await self._db.execute(
+            """INSERT OR IGNORE INTO queue
+               (video_id, status, assigned_delay, error, error_type, created_at, updated_at, started_at)
+               VALUES (?, 'pending', ?, NULL, NULL, ?, ?, NULL)""",
+            (video_id, delay, now, now),
+        )
+        await self._db.commit()
+        cursor = await self._db.execute(
+            f"SELECT {self._QUEUE_COLUMNS} FROM queue WHERE video_id = ?",
+            (video_id,),
+        )
+        return self._row_to_entry(await cursor.fetchone())
+
+    async def get_queue_entry(self, video_id: str) -> dict | None:
+        """Return the queue entry for a video, or None if it has no queue row."""
+        cursor = await self._db.execute(
+            f"SELECT {self._QUEUE_COLUMNS} FROM queue WHERE video_id = ?",
+            (video_id,),
+        )
+        row = await cursor.fetchone()
+        return None if row is None else self._row_to_entry(row)
+
+    async def get_next_pending(self) -> dict | None:
+        """Claim the oldest pending queue entry and mark it as processing.
+
+        Returns the claimed entry (with the updated status and timestamps), or
+        None when nothing is pending. The UPDATE only applies while the row is
+        still ``pending``; if it was changed in the meantime nothing is
+        claimed and None is returned.
+        """
+        cursor = await self._db.execute(
+            f"SELECT {self._QUEUE_COLUMNS} FROM queue "
+            "WHERE status = 'pending' ORDER BY id ASC LIMIT 1"
+        )
+        row = await cursor.fetchone()
+        if row is None:
+            return None
+        entry = self._row_to_entry(row)
+        now = self._utcnow().isoformat()
+        claim = await self._db.execute(
+            "UPDATE queue SET status = 'processing', started_at = ?, updated_at = ? "
+            "WHERE id = ? AND status = 'pending'",
+            (now, now, entry["id"]),
+        )
+        await self._db.commit()
+        if claim.rowcount == 0:
+            return None
+        entry.update(status="processing", updated_at=now, started_at=now)
+        return entry
+
+    async def mark_completed(self, video_id: str) -> None:
+        """Remove the queue entry for a completed video."""
+        await self._db.execute("DELETE FROM queue WHERE video_id = ?", (video_id,))
+        await self._db.commit()
+
+    async def mark_failed(self, video_id: str, error: str, error_type: str) -> None:
+        """Mark a queue entry as failed with an error message and type."""
+        now = self._utcnow().isoformat()
+        await self._db.execute(
+            "UPDATE queue SET status = 'failed', error = ?, error_type = ?, updated_at = ? WHERE video_id = ?",
+            (error, error_type, now, video_id),
+        )
+        await self._db.commit()
+
+    async def get_position_and_estimate(self, video_id: str) -> dict | None:
+        """Return queue position and estimated wait seconds for a video.
+
+        ``position`` is the number of pending/processing entries ahead of the
+        video, so the next entry to run has position 0. Failed entries report
+        ``{"position": 0, "estimated_seconds": 0.0}``; a processing entry
+        reports position 0 and the remainder of its own delay. Returns None
+        when the video has no queue row.
+        """
+        entry = await self.get_queue_entry(video_id)
+        if entry is None:
+            return None
+        if entry["status"] == "failed":
+            return {"position": 0, "estimated_seconds": 0.0}
+        now = self._utcnow()
+        if entry["status"] == "processing":
+            remaining = self._remaining_delay(
+                entry["assigned_delay"], entry["started_at"], now
+            )
+            return {"position": 0, "estimated_seconds": remaining}
+        # pending: sum the delays of everything ahead, then add our own.
+        cursor = await self._db.execute(
+            "SELECT status, assigned_delay, started_at FROM queue WHERE status IN ('pending', 'processing') AND id < ? ORDER BY id ASC",
+            (entry["id"],),
+        )
+        ahead = await cursor.fetchall()
+        total_wait = 0.0
+        for status, delay, started_at in ahead:
+            if status == "processing":
+                total_wait += self._remaining_delay(delay, started_at, now)
+            else:
+                total_wait += delay
+        total_wait += entry["assigned_delay"]
+        return {"position": len(ahead), "estimated_seconds": total_wait}
+
+    async def close(self) -> None:
+        """Close the database connection if it is open."""
+        if self._db is not None:
+            await self._db.close()
+            self._db = None
diff --git a/app/transcriber.py b/app/transcriber.py
new file mode 100644
index 0000000..090082a
--- /dev/null
+++ b/app/transcriber.py
@@ -0,0 +1,84 @@
+import re
+
+from requests import Session
+from youtube_transcript_api import YouTubeTranscriptApi
+from youtube_transcript_api._errors import (
+ CouldNotRetrieveTranscript,
+ IpBlocked,
+ TranscriptsDisabled,
+ VideoUnavailable,
+)
+
+
+class IPBlockedError(Exception):
+    """Raised when the transcript API reports that this IP is blocked."""
+
+
+class TranscriptNotAvailableError(Exception):
+    """Raised when the video is unavailable or no transcript can be retrieved."""
+
+
+class TranscriptDisabledError(Exception):
+    """Raised when transcripts are disabled for the requested video."""
+
+
+class InvalidURLError(Exception):
+    """Raised when no YouTube video id can be extracted from a URL."""
+
+
+# A YouTube video id: exactly 11 URL-safe base64 characters.
+_VIDEO_ID = r"([A-Za-z0-9_-]{11})"
+
+# Recognised URL shapes; each pattern captures the video id in group 1.
+_YOUTUBE_URL_PATTERNS = [
+    # watch URLs. The lazy optional group lets `v=` follow other query
+    # parameters (e.g. ?feature=share&v=...) while still preferring a `v=`
+    # that comes directly after the `?`.
+    re.compile(
+        r"(?:https?://)?(?:www\.)?youtube\.com/watch\?(?:[^#\s]*?&)??v=" + _VIDEO_ID
+    ),
+    re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/embed/" + _VIDEO_ID),
+    re.compile(r"(?:https?://)?youtu\.be/" + _VIDEO_ID),
+    re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/shorts/" + _VIDEO_ID),
+    re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/live/" + _VIDEO_ID),
+]
+
+
+def extract_video_id(url: str) -> str:
+    """Return the 11-character video id found in ``url``.
+
+    Patterns are tried in order with ``search``, so the scheme and ``www.``
+    prefix are optional and trailing parameters are ignored.
+
+    Raises:
+        InvalidURLError: if ``url`` matches none of the known URL shapes.
+    """
+    for pattern in _YOUTUBE_URL_PATTERNS:
+        if match := pattern.search(url):
+            return match.group(1)
+    raise InvalidURLError(f"Could not extract YouTube video ID from URL: {url}")
+
+
+def create_api() -> YouTubeTranscriptApi:
+    """Build a YouTubeTranscriptApi backed by a Session with browser-like headers.
+
+    The session advertises a desktop Chrome User-Agent and an English
+    Accept-Language header.
+    """
+    browser_headers = {
+        "User-Agent": (
+            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
+            "AppleWebKit/537.36 (KHTML, like Gecko) "
+            "Chrome/124.0.0.0 Safari/537.36"
+        ),
+        "Accept-Language": "en-US,en;q=0.5",
+    }
+    http_session = Session()
+    http_session.headers.update(browser_headers)
+    return YouTubeTranscriptApi(http_client=http_session)
+
+
+def fetch_transcript_by_id(video_id: str, api: YouTubeTranscriptApi) -> list[dict]:
+    """Fetch a transcript by video_id using a pre-built API instance.
+
+    Library errors are translated into this module's exceptions, with the
+    original error attached as ``__cause__``. The more specific handlers come
+    first and ``CouldNotRetrieveTranscript`` acts as the catch-all for the
+    library's remaining retrieval errors.
+
+    Returns:
+        The transcript as returned by ``to_raw_data()``.
+
+    Raises:
+        IPBlockedError: the library reported ``IpBlocked``.
+        TranscriptDisabledError: the library reported ``TranscriptsDisabled``.
+        TranscriptNotAvailableError: the video is unavailable or any other
+            ``CouldNotRetrieveTranscript`` was raised.
+    """
+    try:
+        transcript = api.fetch(video_id)
+    except IpBlocked as exc:
+        raise IPBlockedError(
+            f"YouTube is blocking requests from this IP. {exc}"
+        ) from exc
+    except VideoUnavailable as exc:
+        raise TranscriptNotAvailableError(
+            f"Video {video_id} is unavailable"
+        ) from exc
+    except TranscriptsDisabled as exc:
+        raise TranscriptDisabledError(
+            f"Transcripts are disabled for video {video_id}"
+        ) from exc
+    except CouldNotRetrieveTranscript as exc:
+        raise TranscriptNotAvailableError(
+            f"No transcript available for video {video_id}"
+        ) from exc
+    return transcript.to_raw_data()
+
+
+def fetch_transcript(url: str) -> tuple[str, list[dict]]:
+    """Resolve ``url`` to a video id and fetch its transcript with a fresh API.
+
+    Returns a ``(video_id, segments)`` tuple. Errors from ``extract_video_id``
+    and ``fetch_transcript_by_id`` propagate unchanged.
+    """
+    video_id = extract_video_id(url)
+    return video_id, fetch_transcript_by_id(video_id, create_api())
diff --git a/app/worker.py b/app/worker.py
new file mode 100644
index 0000000..ede949c
--- /dev/null
+++ b/app/worker.py
@@ -0,0 +1,58 @@
+import asyncio
+
+from youtube_transcript_api import YouTubeTranscriptApi
+
+from app.storage import TranscriptStore
+from app.transcriber import (
+ IPBlockedError,
+ TranscriptDisabledError,
+ TranscriptNotAvailableError,
+ fetch_transcript_by_id,
+)
+
+
+# Maps each known fetch failure to the error_type stored on the queue entry.
+# Anything else raised by the fetch is recorded as "internal_error".
+_FETCH_ERROR_TYPES = (
+    (IPBlockedError, "ip_blocked"),
+    (TranscriptNotAvailableError, "not_available"),
+    (TranscriptDisabledError, "transcript_disabled"),
+)
+
+
+async def process_next(store: TranscriptStore, api: YouTubeTranscriptApi) -> bool:
+    """Process the next pending queue item. Returns True if one was processed.
+
+    The blocking fetch runs in a worker thread. A fetch failure is recorded on
+    the queue entry; a success is saved to the transcript cache and the queue
+    entry is removed. In both cases the function then sleeps for the entry's
+    ``assigned_delay`` to space out requests.
+
+    The sleep is deliberately NOT in a ``finally`` block: when the task is
+    cancelled during the fetch, or a store call raises, the error propagates
+    immediately instead of first waiting out the delay (which would stall
+    shutdown by up to the full delay).
+    """
+    entry = await store.get_next_pending()
+    if entry is None:
+        return False
+
+    video_id = entry["video_id"]
+
+    try:
+        segments = await asyncio.to_thread(fetch_transcript_by_id, video_id, api)
+    except Exception as exc:
+        error_type = next(
+            (label for kind, label in _FETCH_ERROR_TYPES if isinstance(exc, kind)),
+            "internal_error",
+        )
+        await store.mark_failed(video_id, str(exc), error_type)
+    else:
+        full_text = " ".join(s["text"] for s in segments)
+        # Save before removing the queue row so a reader never finds the
+        # video neither queued nor cached.
+        await store.save_transcript(video_id, full_text, segments)
+        await store.mark_completed(video_id)
+
+    await asyncio.sleep(entry["assigned_delay"])
+    return True
+
+
+async def run_worker(
+    store: TranscriptStore,
+    api: YouTubeTranscriptApi,
+    shutdown_event: asyncio.Event,
+) -> None:
+    """Run the queue worker loop until shutdown_event is set.
+
+    Items are processed back to back; when the queue is empty the loop waits
+    up to one second on ``shutdown_event`` before polling again.
+
+    An unexpected exception from one iteration is logged and treated like an
+    empty queue (short wait, then retry) so a single failure does not stop
+    the worker for the rest of the process lifetime. Cancellation is not
+    intercepted.
+    """
+    import logging  # function-scope import: the module header does not import it
+
+    logger = logging.getLogger(__name__)
+
+    while not shutdown_event.is_set():
+        try:
+            processed = await process_next(store, api)
+        except Exception:
+            logger.exception("Queue worker iteration failed; will retry")
+            processed = False
+        if not processed:
+            try:
+                await asyncio.wait_for(shutdown_event.wait(), timeout=1.0)
+            except asyncio.TimeoutError:
+                continue
diff --git a/bin/build b/bin/build
new file mode 100755
index 0000000..9740f81
--- /dev/null
+++ b/bin/build
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+# Build the production image (Dockerfile) and tag it youtube-transcriber:latest.
+set -euo pipefail
+
+# Work from the repository root, i.e. the parent of this script's directory,
+# so the Dockerfile path and build context resolve.
+repo_root="$(dirname "$(cd "$(dirname "$0")" && pwd)")"
+cd "$repo_root"
+
+sudo docker build -t youtube-transcriber:latest -f Dockerfile .
diff --git a/bin/clean b/bin/clean
new file mode 100755
index 0000000..e09a99a
--- /dev/null
+++ b/bin/clean
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+# Stop and remove the compose containers, then delete every docker volume
+# whose name contains "youtube-transcriber". The whole cleanup is best-effort:
+# a failing step never makes the script exit non-zero.
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+ROOT_DIR="$(dirname "$SCRIPT_DIR")"
+
+cd "$ROOT_DIR"
+
+if sudo docker compose stop && sudo docker compose rm -f; then
+    # Remove volumes one by one. A single quoted "$(...)" would merge several
+    # names into one argument and pass an empty argument when none match.
+    sudo docker volume ls -q --filter name=youtube-transcriber 2>/dev/null \
+        | while IFS= read -r volume; do
+            sudo docker volume rm -f "$volume" 2>/dev/null || true
+        done || true
+fi
diff --git a/bin/down b/bin/down
new file mode 100755
index 0000000..6931e7f
--- /dev/null
+++ b/bin/down
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+# Run `docker compose down` for this project; extra arguments are passed through.
+set -euo pipefail
+
+# Work from the repository root, i.e. the parent of this script's directory,
+# so compose finds docker-compose.yml.
+repo_root="$(dirname "$(cd "$(dirname "$0")" && pwd)")"
+cd "$repo_root"
+
+sudo docker compose down "$@"
diff --git a/bin/test b/bin/test
new file mode 100755
index 0000000..8a3bbe3
--- /dev/null
+++ b/bin/test
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+# Run pytest inside the running `app` compose service; extra arguments are
+# passed through to pytest.
+set -euo pipefail
+
+# Work from the repository root, i.e. the parent of this script's directory,
+# so compose finds docker-compose.yml.
+repo_root="$(dirname "$(cd "$(dirname "$0")" && pwd)")"
+cd "$repo_root"
+
+sudo docker compose exec app pytest "$@"
diff --git a/bin/up b/bin/up
new file mode 100755
index 0000000..3054cea
--- /dev/null
+++ b/bin/up
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+# Build and start the compose stack; extra arguments (e.g. -d) are passed through.
+set -euo pipefail
+
+# Work from the repository root, i.e. the parent of this script's directory,
+# so compose finds docker-compose.yml.
+repo_root="$(dirname "$(cd "$(dirname "$0")" && pwd)")"
+cd "$repo_root"
+
+sudo docker compose up --build "$@"
diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml
new file mode 100644
index 0000000..ea294e0
--- /dev/null
+++ b/docker-compose.prod.yml
@@ -0,0 +1,11 @@
+# Production stack: one service built from the production Dockerfile.
+services:
+  app:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    container_name: youtube-transcriber
+    ports:
+      - "41090:41090"
+    volumes:
+      # Host directory mounted over /app/data, where the app keeps its
+      # SQLite database.
+      - ./data:/app/data
+    restart: unless-stopped
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..72cd081
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,13 @@
+# Development stack: image from Dockerfile.dev, with the source tree mounted
+# read-only so the reloading server picks up edits without a rebuild.
+services:
+  app:
+    build:
+      context: .
+      dockerfile: Dockerfile.dev
+    container_name: youtube-transcriber-dev
+    ports:
+      - "41090:41090"
+    volumes:
+      - ./app:/app/app:ro
+      # bin/test runs pytest inside this container, and pyproject.toml points
+      # pytest at tests/, so the test suite has to be visible here too.
+      - ./tests:/app/tests:ro
+      - ./pyproject.toml:/app/pyproject.toml:ro
+      - ./data:/app/data
+    restart: unless-stopped
diff --git a/docker/entrypoint.dev.sh b/docker/entrypoint.dev.sh
new file mode 100755
index 0000000..83bde02
--- /dev/null
+++ b/docker/entrypoint.dev.sh
@@ -0,0 +1,6 @@
+#!/usr/bin/env bash
+# Dev-container entrypoint: install the project's requirements from the
+# mounted pyproject.toml, then hand over to the container command.
+set -euo pipefail
+
+# Collect the runtime dependencies plus the "dev" extras (pytest, httpx), one
+# specifier per line. The extras are needed because bin/test runs pytest
+# inside this container.
+requirements="$(python -c "
+import tomllib
+with open('/app/pyproject.toml', 'rb') as fh:
+    project = tomllib.load(fh)['project']
+specs = list(project['dependencies'])
+specs += project.get('optional-dependencies', {}).get('dev', [])
+print('\n'.join(specs))
+")"
+
+# Read the specifiers into an array so each one reaches pip as a single
+# argument, without glob expansion of entries such as uvicorn[standard].
+mapfile -t requirement_args <<<"$requirements"
+
+pip install --no-cache-dir --break-system-packages "${requirement_args[@]}" > /dev/null
+
+exec "$@"
diff --git a/docker/entrypoint.prod.sh b/docker/entrypoint.prod.sh
new file mode 100755
index 0000000..7bc3d2f
--- /dev/null
+++ b/docker/entrypoint.prod.sh
@@ -0,0 +1,4 @@
+#!/usr/bin/env bash
+# Pass-through entrypoint: replaces this shell with the given command.
+# NOTE(review): the production Dockerfile in this commit neither copies this
+# script nor sets an ENTRYPOINT — confirm whether it is meant to be used.
+set -euo pipefail
+
+exec "$@"
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..87afb1e
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,21 @@
+[project]
+name = "youtube-transcriber"
+# NOTE(review): app/main.py reports version "0.2.0" for the API — confirm
+# which number is authoritative.
+version = "0.1.0"
+description = "API for fetching YouTube video transcripts"
+requires-python = ">=3.12"
+# The Dockerfile and the dev entrypoint read this list directly and pass it
+# to pip.
+dependencies = [
+    "fastapi>=0.115,<1",
+    "uvicorn[standard]>=0.34,<1",
+    "youtube-transcript-api>=1.2,<2",
+    # Imported directly by app/transcriber.py (requests.Session), so it is
+    # declared here instead of relying on a transitive install.
+    "requests>=2.32,<3",
+    "pydantic>=2.10,<3",
+    "aiosqlite>=0.20,<1",
+]
+
+[project.optional-dependencies]
+dev = [
+    "pytest>=8,<9",
+    "httpx>=0.28,<1",
+]
+
+[tool.pytest.ini_options]
+testpaths = ["tests"]
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/test_main.py b/tests/test_main.py
new file mode 100644
index 0000000..e260747
--- /dev/null
+++ b/tests/test_main.py
@@ -0,0 +1,126 @@
+import asyncio
+import os
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import httpx
+from httpx import ASGITransport
+
+import app.main as main_module
+from app.main import app
+from app.storage import TranscriptStore
+
+
+def _run_with_app(tmp_path, coro_factory):
+    """Run ``coro_factory(client, store)`` against the app with a temp database.
+
+    The app lifespan is entered with the worker and the API factory replaced
+    by mocks and ``TranscriptStore`` redirected to a database file under
+    ``tmp_path``. The client talks to the app in-process through an ASGI
+    transport. Everything runs inside a fresh event loop.
+    """
+    db_file = os.path.join(str(tmp_path), "test.db")
+
+    def _temp_store(_requested_path):
+        # Ignore the path the app asks for; always use the temp database.
+        return TranscriptStore(db_path=db_file)
+
+    async def _session():
+        no_worker = patch.object(main_module, "run_worker", new=AsyncMock())
+        no_api = patch.object(main_module, "create_api", new=MagicMock())
+        temp_store = patch.object(
+            main_module, "TranscriptStore", side_effect=_temp_store
+        )
+        with no_worker, no_api, temp_store:
+            async with main_module.lifespan(app):
+                async with httpx.AsyncClient(
+                    transport=ASGITransport(app=app), base_url="http://test"
+                ) as client:
+                    await coro_factory(client, app.state.store)
+
+    asyncio.run(_session())
+
+
+class TestGetTranscript:
+    """HTTP-level tests for /health and /api/transcript."""
+
+    VIDEO_ID = "dQw4w9WgXcQ"
+    WATCH_PARAMS = {"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}
+
+    def test_health(self, tmp_path) -> None:
+        async def scenario(client, store):
+            response = await client.get("/health")
+            assert response.status_code == 200
+            assert response.json() == {"status": "ok"}
+
+        _run_with_app(tmp_path, scenario)
+
+    def test_invalid_url_returns_400(self, tmp_path) -> None:
+        async def scenario(client, store):
+            response = await client.get(
+                "/api/transcript", params={"url": "https://example.com/notavideo"}
+            )
+            assert response.status_code == 400
+            assert "Invalid" in response.json()["detail"]
+
+        _run_with_app(tmp_path, scenario)
+
+    def test_missing_url_returns_422(self, tmp_path) -> None:
+        async def scenario(client, store):
+            response = await client.get("/api/transcript")
+            assert response.status_code == 422
+
+        _run_with_app(tmp_path, scenario)
+
+    def test_new_video_returns_queued(self, tmp_path) -> None:
+        async def scenario(client, store):
+            response = await client.get("/api/transcript", params=self.WATCH_PARAMS)
+            assert response.status_code == 200
+            payload = response.json()
+            assert payload["status"] == "queued"
+            assert payload["video_id"] == self.VIDEO_ID
+            assert isinstance(payload["estimated_seconds"], (int, float))
+            assert payload["estimated_seconds"] > 0
+
+        _run_with_app(tmp_path, scenario)
+
+    def test_cached_video_returns_completed(self, tmp_path) -> None:
+        async def scenario(client, store):
+            await store.save_transcript(
+                self.VIDEO_ID,
+                "hello world",
+                [{"text": "hello world", "start": 0.0, "duration": 1.0}],
+            )
+            response = await client.get("/api/transcript", params=self.WATCH_PARAMS)
+            assert response.status_code == 200
+            payload = response.json()
+            assert payload["status"] == "completed"
+            assert payload["full_text"] == "hello world"
+            assert isinstance(payload["segments"], list)
+            assert len(payload["segments"]) == 1
+
+        _run_with_app(tmp_path, scenario)
+
+    def test_already_queued_video_returns_existing_status(self, tmp_path) -> None:
+        async def scenario(client, store):
+            await store.enqueue(self.VIDEO_ID)
+            first = await client.get("/api/transcript", params=self.WATCH_PARAMS)
+            assert first.status_code == 200
+            payload = first.json()
+            assert payload["status"] == "queued"
+            assert payload["video_id"] == self.VIDEO_ID
+
+            # Asking again must keep reporting the same video.
+            second = await client.get("/api/transcript", params=self.WATCH_PARAMS)
+            assert second.status_code == 200
+            assert second.json()["video_id"] == self.VIDEO_ID
+
+        _run_with_app(tmp_path, scenario)
+
+    def test_failed_video_returns_failure(self, tmp_path) -> None:
+        async def scenario(client, store):
+            await store.enqueue(self.VIDEO_ID)
+            await store.mark_failed(
+                self.VIDEO_ID, "Transcripts disabled", "transcript_disabled"
+            )
+            response = await client.get("/api/transcript", params=self.WATCH_PARAMS)
+            assert response.status_code == 200
+            payload = response.json()
+            assert payload["status"] == "failed"
+            assert payload["error_type"] == "transcript_disabled"
+
+        _run_with_app(tmp_path, scenario)
diff --git a/tests/test_storage.py b/tests/test_storage.py
new file mode 100644
index 0000000..b1b914b
--- /dev/null
+++ b/tests/test_storage.py
@@ -0,0 +1,250 @@
+import asyncio
+import os
+
+from app.storage import TranscriptStore
+
+
+class TestTranscriptCache:
+    """Tests for the transcript cache table of TranscriptStore."""
+
+    @staticmethod
+    def _with_store(tmp_path, scenario) -> None:
+        """Run ``scenario(store)`` against an initialized temp-file store.
+
+        The store is closed in ``finally`` so a failing assertion does not
+        leave the database connection open.
+        """
+
+        async def _run() -> None:
+            store = TranscriptStore(db_path=os.path.join(str(tmp_path), "test.db"))
+            await store.initialize()
+            try:
+                await scenario(store)
+            finally:
+                await store.close()
+
+        asyncio.run(_run())
+
+    def test_initialize_creates_database_file(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            assert os.path.exists(
+                store.db_path
+            ), "Database file should exist after initialize"
+
+        self._with_store(tmp_path, scenario)
+
+    def test_get_transcript_returns_none_when_not_found(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            assert await store.get_transcript("nonexistent_id") is None
+
+        self._with_store(tmp_path, scenario)
+
+    def test_save_and_retrieve_transcript(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            segments = [
+                {"text": "hello", "start": 0.0, "duration": 1.0},
+                {"text": "world", "start": 1.0, "duration": 1.0},
+            ]
+            await store.save_transcript(
+                video_id="abc123",
+                full_text="hello world",
+                segments=segments,
+            )
+
+            result = await store.get_transcript("abc123")
+            assert result is not None
+            assert result["video_id"] == "abc123"
+            assert result["full_text"] == "hello world"
+            assert result["segments"] == segments
+            assert isinstance(result["segments"], list)
+            assert all(isinstance(s, dict) for s in result["segments"])
+
+        self._with_store(tmp_path, scenario)
+
+    def test_save_transcript_overwrites_existing(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            for text in ("first version", "second version"):
+                await store.save_transcript(
+                    video_id="abc123",
+                    full_text=text,
+                    segments=[],
+                )
+
+            result = await store.get_transcript("abc123")
+            assert result is not None
+            assert result["full_text"] == "second version"
+
+        self._with_store(tmp_path, scenario)
+
+    def test_multiple_transcripts(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            await store.save_transcript(
+                video_id="vid1",
+                full_text="transcript one",
+                segments=[{"text": "one", "start": 0.0, "duration": 1.0}],
+            )
+            await store.save_transcript(
+                video_id="vid2",
+                full_text="transcript two",
+                segments=[{"text": "two", "start": 0.0, "duration": 1.0}],
+            )
+
+            result1 = await store.get_transcript("vid1")
+            assert result1 is not None
+            assert result1["video_id"] == "vid1"
+            assert result1["full_text"] == "transcript one"
+
+            result2 = await store.get_transcript("vid2")
+            assert result2 is not None
+            assert result2["video_id"] == "vid2"
+            assert result2["full_text"] == "transcript two"
+
+        self._with_store(tmp_path, scenario)
+
+
+class TestQueue:
+    """Tests for the queue table of TranscriptStore."""
+
+    def _make_store(self, tmp_path):
+        return TranscriptStore(db_path=os.path.join(str(tmp_path), "test.db"))
+
+    def _with_store(self, tmp_path, scenario) -> None:
+        """Run ``scenario(store)`` against an initialized temp-file store.
+
+        The store is closed in ``finally`` so a failing assertion does not
+        leave the database connection open.
+        """
+
+        async def _run() -> None:
+            store = self._make_store(tmp_path)
+            await store.initialize()
+            try:
+                await scenario(store)
+            finally:
+                await store.close()
+
+        asyncio.run(_run())
+
+    def test_enqueue_creates_pending_entry(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            entry = await store.enqueue("vid_001")
+            assert entry["video_id"] == "vid_001"
+            assert entry["status"] == "pending"
+            assert isinstance(entry["assigned_delay"], float)
+            assert 30.0 <= entry["assigned_delay"] <= 60.0
+            assert entry["error"] is None
+            assert entry["error_type"] is None
+
+        self._with_store(tmp_path, scenario)
+
+    def test_enqueue_duplicate_returns_existing(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            first = await store.enqueue("vid_001")
+            second = await store.enqueue("vid_001")
+            assert first["assigned_delay"] == second["assigned_delay"]
+            assert first["id"] == second["id"]
+
+        self._with_store(tmp_path, scenario)
+
+    def test_get_queue_entry(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            await store.enqueue("vid_001")
+            entry = await store.get_queue_entry("vid_001")
+            assert entry is not None
+            assert entry["video_id"] == "vid_001"
+            assert entry["status"] == "pending"
+            assert await store.get_queue_entry("nonexistent") is None
+
+        self._with_store(tmp_path, scenario)
+
+    def test_get_next_pending_returns_oldest_first(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            await store.enqueue("vid_001")
+            await store.enqueue("vid_002")
+            await store.enqueue("vid_003")
+            first = await store.get_next_pending()
+            assert first is not None
+            assert first["video_id"] == "vid_001"
+            assert first["status"] == "processing"
+            assert first["started_at"] is not None
+            second = await store.get_next_pending()
+            assert second is not None
+            assert second["video_id"] == "vid_002"
+
+        self._with_store(tmp_path, scenario)
+
+    def test_get_next_pending_returns_none_when_empty(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            assert await store.get_next_pending() is None
+
+        self._with_store(tmp_path, scenario)
+
+    def test_mark_completed_removes_entry(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            await store.enqueue("vid_001")
+            await store.mark_completed("vid_001")
+            assert await store.get_queue_entry("vid_001") is None
+
+        self._with_store(tmp_path, scenario)
+
+    def test_mark_failed_updates_entry(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            await store.enqueue("vid_001")
+            await store.mark_failed("vid_001", "IP was blocked", "ip_blocked")
+            entry = await store.get_queue_entry("vid_001")
+            assert entry is not None
+            assert entry["status"] == "failed"
+            assert entry["error"] == "IP was blocked"
+            assert entry["error_type"] == "ip_blocked"
+
+        self._with_store(tmp_path, scenario)
+
+    def test_get_position_and_estimate_for_pending(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            e1 = await store.enqueue("vid_001")
+            e2 = await store.enqueue("vid_002")
+            e3 = await store.enqueue("vid_003")
+            result = await store.get_position_and_estimate("vid_003")
+            assert result is not None
+            # Two entries are ahead of vid_003.
+            assert result["position"] == 2
+            expected = e1["assigned_delay"] + e2["assigned_delay"] + e3["assigned_delay"]
+            assert abs(result["estimated_seconds"] - expected) < 0.01
+
+        self._with_store(tmp_path, scenario)
+
+    def test_get_position_and_estimate_for_first_pending(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            e1 = await store.enqueue("vid_001")
+            result = await store.get_position_and_estimate("vid_001")
+            assert result is not None
+            assert result["position"] == 0
+            assert abs(result["estimated_seconds"] - e1["assigned_delay"]) < 0.01
+
+        self._with_store(tmp_path, scenario)
+
+    def test_get_position_and_estimate_for_failed(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            await store.enqueue("vid_001")
+            await store.mark_failed("vid_001", "error", "type")
+            result = await store.get_position_and_estimate("vid_001")
+            assert result == {"position": 0, "estimated_seconds": 0.0}
+
+        self._with_store(tmp_path, scenario)
+
+    def test_get_position_and_estimate_not_found(self, tmp_path) -> None:
+        async def scenario(store) -> None:
+            assert await store.get_position_and_estimate("nonexistent") is None
+
+        self._with_store(tmp_path, scenario)
diff --git a/tests/test_transcriber.py b/tests/test_transcriber.py
new file mode 100644
index 0000000..c05eab5
--- /dev/null
+++ b/tests/test_transcriber.py
@@ -0,0 +1,60 @@
+from unittest.mock import patch
+
+import pytest
+from youtube_transcript_api._errors import IpBlocked, TranscriptsDisabled
+
+from app.transcriber import (
+ InvalidURLError,
+ IPBlockedError,
+ TranscriptDisabledError,
+ create_api,
+ extract_video_id,
+ fetch_transcript_by_id,
+)
+
+
+class TestExtractVideoId:
+    """``extract_video_id`` against the URL shapes the tests feed it."""
+
+    def test_standard_url(self) -> None:
+        """A ``watch?v=`` URL maps to its video ID."""
+        assert extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ") == "dQw4w9WgXcQ"
+
+    def test_short_url(self) -> None:
+        """A ``youtu.be`` short link maps to its video ID."""
+        assert extract_video_id("https://youtu.be/dQw4w9WgXcQ") == "dQw4w9WgXcQ"
+
+    def test_embed_url(self) -> None:
+        """An ``/embed/`` URL maps to its video ID."""
+        assert extract_video_id("https://www.youtube.com/embed/dQw4w9WgXcQ") == "dQw4w9WgXcQ"
+
+    def test_shorts_url(self) -> None:
+        """A ``/shorts/`` URL maps to its video ID."""
+        assert extract_video_id("https://www.youtube.com/shorts/dQw4w9WgXcQ") == "dQw4w9WgXcQ"
+
+    def test_no_protocol(self) -> None:
+        """A URL without a scheme is still accepted."""
+        assert extract_video_id("youtube.com/watch?v=dQw4w9WgXcQ") == "dQw4w9WgXcQ"
+
+    def test_invalid_url(self) -> None:
+        """A URL on another host is rejected with ``InvalidURLError``."""
+        # pytest.raises fails the test by itself when nothing is raised, so no
+        # ``assert False`` sentinel (which ``python -O`` would strip) is needed.
+        with pytest.raises(InvalidURLError):
+            extract_video_id("https://example.com/video")
+
+
+class TestCreateApi:
+    """Checks on the client object returned by ``create_api``."""
+
+    def test_create_api_has_browser_user_agent(self) -> None:
+        """The client's HTTP session sends a User-Agent containing "Chrome"."""
+        # The session is stored on the fetcher inside the API instance.
+        # NOTE(review): both attributes are private to the API object —
+        # presumably tied to the installed library version; confirm on upgrade.
+        http_session = create_api()._fetcher._http_client
+        user_agent = http_session.headers.get("User-Agent", "")
+        assert "Chrome" in user_agent
+
+
+class TestFetchTranscriptById:
+    """Error translation performed by ``fetch_transcript_by_id``."""
+
+    def test_fetch_transcript_by_id_raises_ip_blocked(self) -> None:
+        """``IpBlocked`` raised by ``api.fetch`` surfaces as ``IPBlockedError``."""
+        client = create_api()
+        upstream_error = IpBlocked(video_id="test")
+        with patch.object(client, "fetch", side_effect=upstream_error), \
+                pytest.raises(IPBlockedError):
+            fetch_transcript_by_id("test", client)
+
+    def test_fetch_transcript_by_id_raises_transcript_disabled(self) -> None:
+        """``TranscriptsDisabled`` from ``api.fetch`` surfaces as ``TranscriptDisabledError``."""
+        client = create_api()
+        upstream_error = TranscriptsDisabled(video_id="test")
+        with patch.object(client, "fetch", side_effect=upstream_error), \
+                pytest.raises(TranscriptDisabledError):
+            fetch_transcript_by_id("test", client)
diff --git a/tests/test_worker.py b/tests/test_worker.py
new file mode 100644
index 0000000..0ef523a
--- /dev/null
+++ b/tests/test_worker.py
@@ -0,0 +1,249 @@
+import asyncio
+import os
+from unittest.mock import AsyncMock, MagicMock, patch
+
+from app.storage import TranscriptStore
+from app.transcriber import (
+ IPBlockedError,
+ TranscriptDisabledError,
+)
+from app.worker import process_next
+
+
+def _to_thread_passthrough(func, *args, **kwargs):
+    """Stand-in for ``asyncio.to_thread`` that runs ``func`` synchronously.
+
+    Returns a coroutine object; ``func(*args, **kwargs)`` is only invoked once
+    that coroutine is awaited, and its return value is the awaited result.
+    """
+
+    async def _invoke_inline():
+        outcome = func(*args, **kwargs)
+        return outcome
+
+    pending = _invoke_inline()
+    return pending
+
+
+def _patch_to_thread():
+    """Build a patcher that swaps ``app.worker.asyncio.to_thread`` for the inline stand-in."""
+    # NOTE(review): ``app.worker.asyncio`` presumably resolves to the shared
+    # asyncio module, so the swap would be visible process-wide while the
+    # patcher is active — confirm against app/worker.py.
+    patcher = patch("app.worker.asyncio.to_thread", new=_to_thread_passthrough)
+    return patcher
+
+
+class TestProcessNext:
+    """Behaviour of ``process_next`` against a ``TranscriptStore`` on disk.
+
+    ``asyncio.sleep``, ``asyncio.to_thread`` and ``fetch_transcript_by_id`` are
+    patched through ``app.worker``, so the tests neither sleep nor call the
+    real fetcher.
+    """
+
+    @staticmethod
+    def _run_with_store(tmp_path, scenario) -> None:
+        """Run ``await scenario(store)`` on a fresh store and always close it.
+
+        The store is backed by ``test.db`` under ``tmp_path``. Closing happens
+        in ``finally`` so a failing assertion inside the scenario does not
+        leave the store open.
+        """
+
+        async def _runner() -> None:
+            db_path = os.path.join(str(tmp_path), "test.db")
+            store = TranscriptStore(db_path=db_path)
+            await store.initialize()
+            try:
+                await scenario(store)
+            finally:
+                await store.close()
+
+        asyncio.run(_runner())
+
+    @staticmethod
+    def _snippets(text: str = "hello") -> list:
+        """Return a fresh one-snippet transcript payload carrying ``text``."""
+        return [{"text": text, "start": 0.0, "duration": 1.0}]
+
+    def test_process_next_returns_false_when_queue_empty(self, tmp_path) -> None:
+        """With nothing queued, ``process_next`` returns ``False`` and never sleeps."""
+
+        async def scenario(store) -> None:
+            with patch("app.worker.asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
+                result = await process_next(store, MagicMock())
+                assert result is False
+                mock_sleep.assert_not_called()
+
+        self._run_with_store(tmp_path, scenario)
+
+    def test_process_next_success(self, tmp_path) -> None:
+        """A successful fetch stores the transcript, clears the queue entry and
+        sleeps once for the entry's ``assigned_delay`` (within 30-60 seconds)."""
+
+        async def scenario(store) -> None:
+            entry = await store.enqueue("vid_001")
+
+            with patch("app.worker.asyncio.sleep", new_callable=AsyncMock) as mock_sleep, \
+                    _patch_to_thread(), \
+                    patch("app.worker.fetch_transcript_by_id", return_value=self._snippets()):
+                result = await process_next(store, MagicMock())
+                assert result is True
+                mock_sleep.assert_called_once()
+                slept_for = mock_sleep.call_args.args[0]
+                assert 30.0 <= slept_for <= 60.0
+                assert abs(slept_for - entry["assigned_delay"]) < 0.001
+
+            transcript = await store.get_transcript("vid_001")
+            assert transcript is not None
+            assert transcript["full_text"] == "hello"
+            assert await store.get_queue_entry("vid_001") is None
+
+        self._run_with_store(tmp_path, scenario)
+
+    def test_process_next_ip_blocked(self, tmp_path) -> None:
+        """``IPBlockedError`` marks the entry failed as ``ip_blocked``; no transcript is stored."""
+
+        async def scenario(store) -> None:
+            await store.enqueue("vid_001")
+
+            with patch("app.worker.asyncio.sleep", new_callable=AsyncMock), \
+                    _patch_to_thread(), \
+                    patch("app.worker.fetch_transcript_by_id", side_effect=IPBlockedError("blocked")):
+                result = await process_next(store, MagicMock())
+                assert result is True
+
+            failed_entry = await store.get_queue_entry("vid_001")
+            assert failed_entry is not None
+            assert failed_entry["status"] == "failed"
+            assert failed_entry["error_type"] == "ip_blocked"
+            assert await store.get_transcript("vid_001") is None
+
+        self._run_with_store(tmp_path, scenario)
+
+    def test_process_next_transcript_disabled(self, tmp_path) -> None:
+        """``TranscriptDisabledError`` marks the entry failed as ``transcript_disabled``."""
+
+        async def scenario(store) -> None:
+            await store.enqueue("vid_001")
+
+            with patch("app.worker.asyncio.sleep", new_callable=AsyncMock), \
+                    _patch_to_thread(), \
+                    patch("app.worker.fetch_transcript_by_id", side_effect=TranscriptDisabledError("disabled")):
+                result = await process_next(store, MagicMock())
+                assert result is True
+
+            failed_entry = await store.get_queue_entry("vid_001")
+            assert failed_entry is not None
+            assert failed_entry["status"] == "failed"
+            assert failed_entry["error_type"] == "transcript_disabled"
+
+        self._run_with_store(tmp_path, scenario)
+
+    def test_process_next_downloads_before_sleeping(self, tmp_path) -> None:
+        """On success the fetch happens first and the sleep second."""
+
+        async def scenario(store) -> None:
+            await store.enqueue("vid_001")
+
+            call_order = []
+
+            async def mock_sleep(seconds):
+                call_order.append("sleep")
+
+            def mock_fetch(video_id, api):
+                call_order.append("fetch")
+                return self._snippets()
+
+            with patch("app.worker.asyncio.sleep", side_effect=mock_sleep), \
+                    _patch_to_thread(), \
+                    patch("app.worker.fetch_transcript_by_id", side_effect=mock_fetch):
+                result = await process_next(store, MagicMock())
+                assert result is True
+
+            assert call_order == ["fetch", "sleep"]
+
+        self._run_with_store(tmp_path, scenario)
+
+    def test_process_next_sleeps_after_error(self, tmp_path) -> None:
+        """A failed fetch still sleeps once for the entry's ``assigned_delay``."""
+
+        async def scenario(store) -> None:
+            entry = await store.enqueue("vid_001")
+
+            with patch("app.worker.asyncio.sleep", new_callable=AsyncMock) as mock_sleep, \
+                    _patch_to_thread(), \
+                    patch("app.worker.fetch_transcript_by_id", side_effect=IPBlockedError("blocked")):
+                result = await process_next(store, MagicMock())
+                assert result is True
+                mock_sleep.assert_called_once()
+                slept_for = mock_sleep.call_args.args[0]
+                assert abs(slept_for - entry["assigned_delay"]) < 0.001
+
+            failed_entry = await store.get_queue_entry("vid_001")
+            assert failed_entry is not None
+            assert failed_entry["status"] == "failed"
+
+        self._run_with_store(tmp_path, scenario)
+
+    def test_process_next_sleeps_after_error_before_next_download(self, tmp_path) -> None:
+        """When the fetch raises, the order is still fetch first, then sleep."""
+
+        async def scenario(store) -> None:
+            await store.enqueue("vid_001")
+
+            call_order = []
+
+            async def mock_sleep(seconds):
+                call_order.append("sleep")
+
+            def mock_fetch(video_id, api):
+                call_order.append("fetch")
+                raise IPBlockedError("blocked")
+
+            with patch("app.worker.asyncio.sleep", side_effect=mock_sleep), \
+                    _patch_to_thread(), \
+                    patch("app.worker.fetch_transcript_by_id", side_effect=mock_fetch):
+                await process_next(store, MagicMock())
+
+            assert call_order == ["fetch", "sleep"]
+
+        self._run_with_store(tmp_path, scenario)
+
+    def test_process_next_no_sleep_before_first_download_after_empty_queue(self, tmp_path) -> None:
+        """An idle (empty) pass does nothing; the next pass fetches before sleeping."""
+
+        async def scenario(store) -> None:
+            call_order = []
+
+            async def mock_sleep(seconds):
+                call_order.append("sleep")
+
+            def mock_fetch(video_id, api):
+                call_order.append("fetch")
+                return self._snippets()
+
+            with patch("app.worker.asyncio.sleep", side_effect=mock_sleep), \
+                    _patch_to_thread(), \
+                    patch("app.worker.fetch_transcript_by_id", side_effect=mock_fetch):
+                api = MagicMock()
+
+                # Queue is empty — no sleep, no fetch
+                result = await process_next(store, api)
+                assert result is False
+                assert call_order == []
+
+                # Video is added while queue was idle
+                await store.enqueue("vid_001")
+
+                # Next call downloads immediately, then sleeps after
+                result = await process_next(store, api)
+                assert result is True
+                assert call_order == ["fetch", "sleep"]
+
+        self._run_with_store(tmp_path, scenario)
+
+    def test_process_next_processes_fifo_order(self, tmp_path) -> None:
+        """One pass handles only the first-enqueued video; the second stays queued."""
+
+        async def scenario(store) -> None:
+            await store.enqueue("vid_001")
+            await store.enqueue("vid_002")
+
+            with patch("app.worker.asyncio.sleep", new_callable=AsyncMock), \
+                    _patch_to_thread(), \
+                    patch("app.worker.fetch_transcript_by_id", return_value=self._snippets("first")):
+                await process_next(store, MagicMock())
+
+            assert (await store.get_transcript("vid_001")) is not None
+            assert (await store.get_transcript("vid_002")) is None
+            assert (await store.get_queue_entry("vid_002")) is not None
+
+        self._run_with_store(tmp_path, scenario)