diff options
| author | Adam Malczewski <[email protected]> | 2026-05-11 19:18:34 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-05-11 19:18:34 +0900 |
| commit | c23ee09f6d24832aa472298db91df3ce6e248a76 (patch) | |
| tree | 3576678394cf5eb053dc649abdf1dab559d69487 | |
| download | youtube-transcriber-c23ee09f6d24832aa472298db91df3ce6e248a76.tar.gz youtube-transcriber-c23ee09f6d24832aa472298db91df3ce6e248a76.zip | |
Initial commit: YouTube transcriber API with queue-based worker
| -rw-r--r-- | .dockerignore | 20 | ||||
| -rw-r--r-- | .gitignore | 11 | ||||
| -rw-r--r-- | Dockerfile | 33 | ||||
| -rw-r--r-- | Dockerfile.dev | 26 | ||||
| -rw-r--r-- | app/__init__.py | 0 | ||||
| -rw-r--r-- | app/main.py | 106 | ||||
| -rw-r--r-- | app/schemas.py | 35 | ||||
| -rw-r--r-- | app/storage.py | 176 | ||||
| -rw-r--r-- | app/transcriber.py | 84 | ||||
| -rw-r--r-- | app/worker.py | 58 | ||||
| -rwxr-xr-x | bin/build | 9 | ||||
| -rwxr-xr-x | bin/clean | 9 | ||||
| -rwxr-xr-x | bin/down | 9 | ||||
| -rwxr-xr-x | bin/test | 9 | ||||
| -rwxr-xr-x | bin/up | 9 | ||||
| -rw-r--r-- | docker-compose.prod.yml | 11 | ||||
| -rw-r--r-- | docker-compose.yml | 13 | ||||
| -rwxr-xr-x | docker/entrypoint.dev.sh | 6 | ||||
| -rwxr-xr-x | docker/entrypoint.prod.sh | 4 | ||||
| -rw-r--r-- | pyproject.toml | 21 | ||||
| -rw-r--r-- | tests/__init__.py | 0 | ||||
| -rw-r--r-- | tests/test_main.py | 126 | ||||
| -rw-r--r-- | tests/test_storage.py | 250 | ||||
| -rw-r--r-- | tests/test_transcriber.py | 60 | ||||
| -rw-r--r-- | tests/test_worker.py | 249 |
25 files changed, 1334 insertions, 0 deletions
diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..2bcabc9 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,20 @@ +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +.venv/ +.env +*.log +.pytest_cache/ +.git/ +.gitignore +.dockerignore +Dockerfile.dev +docker-compose.yml +docker-compose.prod.yml +bin/ +tests/ +README.md +data/ +research/ +plan/ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0baec98 --- /dev/null +++ b/.gitignore @@ -0,0 +1,11 @@ +__pycache__/ +*.py[cod] +*.egg-info/ +dist/ +.venv/ +.env +*.log +.pytest_cache/ +data/ +research/ +plan/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..eb25a59 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,33 @@ +FROM python:3.12-alpine3.21 AS builder + +WORKDIR /app + +RUN python -m venv --copies /opt/venv +ENV PATH="/opt/venv/bin:$PATH" + +COPY pyproject.toml ./ +RUN pip install --no-cache-dir $(python -c "import tomllib; print(' '.join(tomllib.load(open('pyproject.toml','rb'))['project']['dependencies']))") + +FROM python:3.12-alpine3.21 + +RUN apk add --no-cache wget + +RUN addgroup -g 1001 -S appgroup && adduser -u 1001 -S appuser -G appgroup + +WORKDIR /app + +COPY --from=builder /opt/venv /opt/venv +ENV PATH="/opt/venv/bin:$PATH" + +COPY app/ ./app/ + +RUN mkdir -p /app/data && chown appuser:appgroup /app/data + +USER appuser + +EXPOSE 41090 + +HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \ + CMD wget -q -O /dev/null http://localhost:41090/health || exit 1 + +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "41090"] diff --git a/Dockerfile.dev b/Dockerfile.dev new file mode 100644 index 0000000..1591f41 --- /dev/null +++ b/Dockerfile.dev @@ -0,0 +1,26 @@ +FROM python:3.12-alpine3.21 + +RUN apk add --no-cache bash wget + +WORKDIR /app + +RUN addgroup -g 1001 -S appgroup && adduser -u 1001 -S appuser -G appgroup + +RUN chown -R appuser:appgroup /app + +RUN mkdir -p /app/data && chown appuser:appgroup /app/data + +COPY --chown=appuser:appgroup pyproject.toml ./ + +COPY --chown=appuser:appgroup docker/entrypoint.dev.sh /usr/local/bin/entrypoint +RUN chmod +x /usr/local/bin/entrypoint + +EXPOSE 41090 + +HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --retries=3 \ + CMD wget -q -O /dev/null http://localhost:41090/health || exit 1 + +ENV PATH="/home/appuser/.local/bin:$PATH" + +ENTRYPOINT ["entrypoint"] +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "41090", "--reload"] diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/app/__init__.py diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..bece6e4 --- /dev/null +++ b/app/main.py @@ -0,0 +1,106 @@ +import asyncio +from contextlib import asynccontextmanager + +from fastapi import FastAPI, HTTPException, Query +from fastapi.middleware.cors import CORSMiddleware +from starlette.requests import Request + +from app.schemas import ( + TranscriptFailedResponse, + TranscriptQueuedResponse, + TranscriptResponse, + TranscriptResultResponse, + TranscriptSegment, +) +from app.storage import TranscriptStore +from app.transcriber import InvalidURLError, create_api, extract_video_id +from app.worker import run_worker + + +@asynccontextmanager +async def lifespan(app: FastAPI): + store = TranscriptStore("data/transcripts.db") + await store.initialize() + api = create_api() + shutdown_event = asyncio.Event() + worker_task = asyncio.create_task(run_worker(store, api, shutdown_event)) + app.state.store = store + try: + yield + finally: + shutdown_event.set() + worker_task.cancel() + try: + await worker_task + except asyncio.CancelledError: + pass + await store.close() + + +app = FastAPI( + title="YouTube Transcriber", + description="API for fetching YouTube video transcripts", + version="0.2.0", + lifespan=lifespan, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + + [email protected]("/health") +async def health() -> dict[str, str]: + return {"status": "ok"} + + [email protected]("/api/transcript", response_model=TranscriptResponse) +async def get_transcript( + request: Request, + url: str = Query(..., description="YouTube video URL", min_length=1), +) -> TranscriptResponse: + try: + video_id = extract_video_id(url) + except InvalidURLError: + raise HTTPException(status_code=400, detail=f"Invalid YouTube URL: {url}") + + store: TranscriptStore = request.app.state.store + + cached = await store.get_transcript(video_id) + if cached is not None: + return TranscriptResultResponse( + video_id=cached["video_id"], + full_text=cached["full_text"], + segments=[TranscriptSegment(**s) for s in cached["segments"]], + ) + + entry = await store.get_queue_entry(video_id) + if entry is not None: + if entry["status"] == "failed": + return TranscriptFailedResponse( + video_id=video_id, + error=entry["error"] or "", + error_type=entry["error_type"] or "", + ) + # pending or processing + estimate = await store.get_position_and_estimate(video_id) + api_status = "processing" if entry["status"] == "processing" else "queued" + return TranscriptQueuedResponse( + status=api_status, + video_id=video_id, + position=estimate["position"], + estimated_seconds=estimate["estimated_seconds"], + ) + + # Not cached and not queued — enqueue it. + await store.enqueue(video_id) + estimate = await store.get_position_and_estimate(video_id) + return TranscriptQueuedResponse( + status="queued", + video_id=video_id, + position=estimate["position"], + estimated_seconds=estimate["estimated_seconds"], + ) diff --git a/app/schemas.py b/app/schemas.py new file mode 100644 index 0000000..27ab96f --- /dev/null +++ b/app/schemas.py @@ -0,0 +1,35 @@ +from typing import Literal + +from pydantic import BaseModel + + +class TranscriptSegment(BaseModel): + text: str + start: float + duration: float + + +class TranscriptResultResponse(BaseModel): + status: Literal["completed"] = "completed" + video_id: str + full_text: str + segments: list[TranscriptSegment] + + +class TranscriptQueuedResponse(BaseModel): + status: Literal["queued", "processing"] + video_id: str + position: int + estimated_seconds: float + + +class TranscriptFailedResponse(BaseModel): + status: Literal["failed"] = "failed" + video_id: str + error: str + error_type: str + + +TranscriptResponse = ( + TranscriptResultResponse | TranscriptQueuedResponse | TranscriptFailedResponse +) diff --git a/app/storage.py b/app/storage.py new file mode 100644 index 0000000..2e991d3 --- /dev/null +++ b/app/storage.py @@ -0,0 +1,176 @@ +import aiosqlite +import json +import os +import random +from datetime import datetime + + +class TranscriptStore: + """Persists cached transcripts in SQLite.""" + + def __init__(self, db_path: str = "data/transcripts.db"): + self.db_path = db_path + self._db: aiosqlite.Connection | None = None + + async def initialize(self) -> None: + """Create the database directory, open the connection, and ensure the table exists.""" + parent = os.path.dirname(self.db_path) + if parent: + os.makedirs(parent, exist_ok=True) + + self._db = await aiosqlite.connect(self.db_path) + await self._db.execute("PRAGMA journal_mode=WAL") + await self._db.execute( + """CREATE TABLE IF NOT EXISTS transcripts ( + video_id TEXT PRIMARY KEY, + full_text TEXT NOT NULL, + segments TEXT NOT NULL, + created_at TEXT NOT NULL + )""" + ) + await self._db.execute( + """CREATE TABLE IF NOT EXISTS queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + video_id TEXT UNIQUE NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + assigned_delay REAL NOT NULL, + error TEXT, + error_type TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL, + started_at TEXT + )""" + ) + await self._db.commit() + + async def get_transcript(self, video_id: str) -> dict | None: + """Retrieve a cached transcript by video_id, or None if not found.""" + cursor = await self._db.execute( + "SELECT video_id, full_text, segments, created_at FROM transcripts WHERE video_id = ?", + (video_id,), + ) + row = await cursor.fetchone() + if row is None: + return None + return { + "video_id": row[0], + "full_text": row[1], + "segments": json.loads(row[2]), + } + + async def save_transcript( + self, video_id: str, full_text: str, segments: list[dict] + ) -> None: + """Insert or replace a transcript row.""" + await self._db.execute( + """INSERT OR REPLACE INTO transcripts (video_id, full_text, segments, created_at) + VALUES (?, ?, ?, ?)""", + (video_id, full_text, json.dumps(segments), datetime.utcnow().isoformat()), + ) + await self._db.commit() + + async def enqueue(self, video_id: str) -> dict: + """Add a video to the queue (no-op if already queued). Returns the queue entry.""" + delay = random.uniform(30.0, 60.0) + now = datetime.utcnow().isoformat() + await self._db.execute( + """INSERT OR IGNORE INTO queue + (video_id, status, assigned_delay, error, error_type, created_at, updated_at, started_at) + VALUES (?, 'pending', ?, NULL, NULL, ?, ?, NULL)""", + (video_id, delay, now, now), + ) + await self._db.commit() + cursor = await self._db.execute( + "SELECT id, video_id, status, assigned_delay, error, error_type, created_at, updated_at, started_at FROM queue WHERE video_id = ?", + (video_id,), + ) + row = await cursor.fetchone() + return { + "id": row[0], "video_id": row[1], "status": row[2], + "assigned_delay": row[3], "error": row[4], "error_type": row[5], + "created_at": row[6], "updated_at": row[7], "started_at": row[8], + } + + async def get_queue_entry(self, video_id: str) -> dict | None: + """Return the queue entry for a video, or None if not queued.""" + cursor = await self._db.execute( + "SELECT id, video_id, status, assigned_delay, error, error_type, created_at, updated_at, started_at FROM queue WHERE video_id = ?", + (video_id,), + ) + row = await cursor.fetchone() + if row is None: + return None + return { + "id": row[0], "video_id": row[1], "status": row[2], + "assigned_delay": row[3], "error": row[4], "error_type": row[5], + "created_at": row[6], "updated_at": row[7], "started_at": row[8], + } + + async def get_next_pending(self) -> dict | None: + """Atomically claim the next pending queue entry and mark it as processing.""" + cursor = await self._db.execute( + "SELECT id, video_id, status, assigned_delay, error, error_type, created_at, updated_at, started_at FROM queue WHERE status = 'pending' ORDER BY id ASC LIMIT 1" + ) + row = await cursor.fetchone() + if row is None: + return None + now = datetime.utcnow().isoformat() + await self._db.execute( + "UPDATE queue SET status = 'processing', started_at = ?, updated_at = ? WHERE video_id = ?", + (now, now, row[1]), + ) + await self._db.commit() + return { + "id": row[0], "video_id": row[1], "status": "processing", + "assigned_delay": row[3], "error": row[4], "error_type": row[5], + "created_at": row[6], "updated_at": now, "started_at": now, + } + + async def mark_completed(self, video_id: str) -> None: + """Remove the queue entry for a completed video.""" + await self._db.execute("DELETE FROM queue WHERE video_id = ?", (video_id,)) + await self._db.commit() + + async def mark_failed(self, video_id: str, error: str, error_type: str) -> None: + """Mark a queue entry as failed with an error message and type.""" + now = datetime.utcnow().isoformat() + await self._db.execute( + "UPDATE queue SET status = 'failed', error = ?, error_type = ?, updated_at = ? WHERE video_id = ?", + (error, error_type, now, video_id), + ) + await self._db.commit() + + async def get_position_and_estimate(self, video_id: str) -> dict | None: + """Return queue position (1-indexed) and estimated wait seconds for a video.""" + entry = await self.get_queue_entry(video_id) + if entry is None: + return None + if entry["status"] == "failed": + return {"position": 0, "estimated_seconds": 0.0} + now = datetime.utcnow() + if entry["status"] == "processing": + elapsed = (now - datetime.fromisoformat(entry["started_at"])).total_seconds() + remaining = max(0.0, entry["assigned_delay"] - elapsed) + return {"position": 0, "estimated_seconds": remaining} + # pending + cursor = await self._db.execute( + "SELECT status, assigned_delay, started_at FROM queue WHERE status IN ('pending', 'processing') AND id < ? ORDER BY id ASC", + (entry["id"],), + ) + rows = await cursor.fetchall() + total_wait = 0.0 + for r in rows: + if r[0] == "processing": + elapsed = (now - datetime.fromisoformat(r[2])).total_seconds() + total_wait += max(0.0, r[1] - elapsed) + else: + total_wait += r[1] + total_wait += entry["assigned_delay"] + position = len(rows) + return {"position": position, "estimated_seconds": total_wait} + + async def close(self) -> None: + """Close the database connection if it is open.""" + if self._db is not None: + await self._db.close() + self._db = None diff --git a/app/transcriber.py b/app/transcriber.py new file mode 100644 index 0000000..090082a --- /dev/null +++ b/app/transcriber.py @@ -0,0 +1,84 @@ +import re + +from requests import Session +from youtube_transcript_api import YouTubeTranscriptApi +from youtube_transcript_api._errors import ( + CouldNotRetrieveTranscript, + IpBlocked, + TranscriptsDisabled, + VideoUnavailable, +) + + +class IPBlockedError(Exception): + pass + + +class TranscriptNotAvailableError(Exception): + pass + + +class TranscriptDisabledError(Exception): + pass + + +class InvalidURLError(Exception): + pass + + +_YOUTUBE_URL_PATTERNS = [ + re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/watch\?v=([A-Za-z0-9_-]{11})"), + re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/embed/([A-Za-z0-9_-]{11})"), + re.compile(r"(?:https?://)?youtu\.be/([A-Za-z0-9_-]{11})"), + re.compile(r"(?:https?://)?(?:www\.)?youtube\.com/shorts/([A-Za-z0-9_-]{11})"), +] + + +def extract_video_id(url: str) -> str: + for pattern in _YOUTUBE_URL_PATTERNS: + match = pattern.search(url) + if match: + return match.group(1) + raise InvalidURLError(f"Could not extract YouTube video ID from URL: {url}") + + +def create_api() -> YouTubeTranscriptApi: + """Create a YouTubeTranscriptApi with a hardened browser-like Session.""" + session = Session() + session.headers.update( + { + "User-Agent": ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/124.0.0.0 Safari/537.36" + ), + "Accept-Language": "en-US,en;q=0.5", + } + ) + return YouTubeTranscriptApi(http_client=session) + + +def fetch_transcript_by_id(video_id: str, api: YouTubeTranscriptApi) -> list[dict]: + """Fetch a transcript by video_id using a pre-built API instance.""" + try: + transcript = api.fetch(video_id) + except IpBlocked as e: + raise IPBlockedError(f"YouTube is blocking requests from this IP. {e}") + except VideoUnavailable: + raise TranscriptNotAvailableError(f"Video {video_id} is unavailable") + except TranscriptsDisabled: + raise TranscriptDisabledError( + f"Transcripts are disabled for video {video_id}" + ) + except CouldNotRetrieveTranscript: + raise TranscriptNotAvailableError( + f"No transcript available for video {video_id}" + ) + return transcript.to_raw_data() + + +def fetch_transcript(url: str) -> tuple[str, list[dict]]: + video_id = extract_video_id(url) + api = create_api() + segments = fetch_transcript_by_id(video_id, api) + return video_id, segments diff --git a/app/worker.py b/app/worker.py new file mode 100644 index 0000000..ede949c --- /dev/null +++ b/app/worker.py @@ -0,0 +1,58 @@ +import asyncio + +from youtube_transcript_api import YouTubeTranscriptApi + +from app.storage import TranscriptStore +from app.transcriber import ( + IPBlockedError, + TranscriptDisabledError, + TranscriptNotAvailableError, + fetch_transcript_by_id, +) + + +async def process_next(store: TranscriptStore, api: YouTubeTranscriptApi) -> bool: + """Process the next pending queue item. Returns True if one was processed.""" + entry = await store.get_next_pending() + if entry is None: + return False + + video_id = entry["video_id"] + + try: + try: + segments = await asyncio.to_thread(fetch_transcript_by_id, video_id, api) + except IPBlockedError as e: + await store.mark_failed(video_id, str(e), "ip_blocked") + return True + except TranscriptNotAvailableError as e: + await store.mark_failed(video_id, str(e), "not_available") + return True + except TranscriptDisabledError as e: + await store.mark_failed(video_id, str(e), "transcript_disabled") + return True + except Exception as e: + await store.mark_failed(video_id, str(e), "internal_error") + return True + + full_text = " ".join(s["text"] for s in segments) + await store.save_transcript(video_id, full_text, segments) + await store.mark_completed(video_id) + return True + finally: + await asyncio.sleep(entry["assigned_delay"]) + + +async def run_worker( + store: TranscriptStore, + api: YouTubeTranscriptApi, + shutdown_event: asyncio.Event, +) -> None: + """Run the queue worker loop until shutdown_event is set.""" + while not shutdown_event.is_set(): + processed = await process_next(store, api) + if not processed: + try: + await asyncio.wait_for(shutdown_event.wait(), timeout=1.0) + except asyncio.TimeoutError: + continue diff --git a/bin/build b/bin/build new file mode 100755 index 0000000..9740f81 --- /dev/null +++ b/bin/build @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +ROOT_DIR="$(dirname "$SCRIPT_DIR")" + +cd "$ROOT_DIR" + +sudo docker build -t youtube-transcriber:latest -f Dockerfile . diff --git a/bin/clean b/bin/clean new file mode 100755 index 0000000..e09a99a --- /dev/null +++ b/bin/clean @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +ROOT_DIR="$(dirname "$SCRIPT_DIR")" + +cd "$ROOT_DIR" + +sudo docker compose stop && sudo docker compose rm -f && sudo docker volume rm -f "$(sudo docker volume ls -q | grep youtube-transcriber)" 2>/dev/null || true diff --git a/bin/down b/bin/down new file mode 100755 index 0000000..6931e7f --- /dev/null +++ b/bin/down @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +ROOT_DIR="$(dirname "$SCRIPT_DIR")" + +cd "$ROOT_DIR" + +sudo docker compose down "$@" diff --git a/bin/test b/bin/test new file mode 100755 index 0000000..8a3bbe3 --- /dev/null +++ b/bin/test @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +ROOT_DIR="$(dirname "$SCRIPT_DIR")" + +cd "$ROOT_DIR" + +sudo docker compose exec app pytest "$@" @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" +ROOT_DIR="$(dirname "$SCRIPT_DIR")" + +cd "$ROOT_DIR" + +sudo docker compose up --build "$@" diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml new file mode 100644 index 0000000..ea294e0 --- /dev/null +++ b/docker-compose.prod.yml @@ -0,0 +1,11 @@ +services: + app: + build: + context: . + dockerfile: Dockerfile + container_name: youtube-transcriber + ports: + - "41090:41090" + volumes: + - ./data:/app/data + restart: unless-stopped diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..72cd081 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,13 @@ +services: + app: + build: + context: . + dockerfile: Dockerfile.dev + container_name: youtube-transcriber-dev + ports: + - "41090:41090" + volumes: + - ./app:/app/app:ro + - ./pyproject.toml:/app/pyproject.toml:ro + - ./data:/app/data + restart: unless-stopped diff --git a/docker/entrypoint.dev.sh b/docker/entrypoint.dev.sh new file mode 100755 index 0000000..83bde02 --- /dev/null +++ b/docker/entrypoint.dev.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +set -euo pipefail + +pip install --no-cache-dir --break-system-packages $(python -c "import tomllib; print(' '.join(tomllib.load(open('/app/pyproject.toml','rb'))['project']['dependencies']))") > /dev/null + +exec "$@" diff --git a/docker/entrypoint.prod.sh b/docker/entrypoint.prod.sh new file mode 100755 index 0000000..7bc3d2f --- /dev/null +++ b/docker/entrypoint.prod.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +set -euo pipefail + +exec "$@" diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..87afb1e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[project] +name = "youtube-transcriber" +version = "0.1.0" +description = "API for fetching YouTube video transcripts" +requires-python = ">=3.12" +dependencies = [ + "fastapi>=0.115,<1", + "uvicorn[standard]>=0.34,<1", + "youtube-transcript-api>=1.2,<2", + "pydantic>=2.10,<3", + "aiosqlite>=0.20,<1", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8,<9", + "httpx>=0.28,<1", +] + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/tests/__init__.py diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..e260747 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,126 @@ +import asyncio +import os +from unittest.mock import AsyncMock, MagicMock, patch + +import httpx +from httpx import ASGITransport + +import app.main as main_module +from app.main import app +from app.storage import TranscriptStore + + +def _run_with_app(tmp_path, coro_factory): + """Initialize app lifespan with a temp-DB store, run coro_factory(client, store), tear down.""" + + async def _runner(): + db_path = os.path.join(str(tmp_path), "test.db") + + def _store_factory(_path): + return TranscriptStore(db_path=db_path) + + with patch.object(main_module, "run_worker", new=AsyncMock()), \ + patch.object(main_module, "create_api", new=MagicMock()), \ + patch.object(main_module, "TranscriptStore", side_effect=_store_factory): + async with main_module.lifespan(app): + transport = ASGITransport(app=app) + async with httpx.AsyncClient(transport=transport, base_url="http://test") as client: + await coro_factory(client, app.state.store) + + asyncio.run(_runner()) + + +class TestGetTranscript: + def test_health(self, tmp_path) -> None: + async def _do(client, store): + r = await client.get("/health") + assert r.status_code == 200 + assert r.json() == {"status": "ok"} + + _run_with_app(tmp_path, _do) + + def test_invalid_url_returns_400(self, tmp_path) -> None: + async def _do(client, store): + r = await client.get("/api/transcript", params={"url": "https://example.com/notavideo"}) + assert r.status_code == 400 + assert "Invalid" in r.json()["detail"] + + _run_with_app(tmp_path, _do) + + def test_missing_url_returns_422(self, tmp_path) -> None: + async def _do(client, store): + r = await client.get("/api/transcript") + assert r.status_code == 422 + + _run_with_app(tmp_path, _do) + + def test_new_video_returns_queued(self, tmp_path) -> None: + async def _do(client, store): + r = await client.get( + "/api/transcript", + params={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}, + ) + assert r.status_code == 200 + body = r.json() + assert body["status"] == "queued" + assert body["video_id"] == "dQw4w9WgXcQ" + assert isinstance(body["estimated_seconds"], (int, float)) + assert body["estimated_seconds"] > 0 + + _run_with_app(tmp_path, _do) + + def test_cached_video_returns_completed(self, tmp_path) -> None: + async def _do(client, store): + await store.save_transcript( + "dQw4w9WgXcQ", + "hello world", + [{"text": "hello world", "start": 0.0, "duration": 1.0}], + ) + r = await client.get( + "/api/transcript", + params={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}, + ) + assert r.status_code == 200 + body = r.json() + assert body["status"] == "completed" + assert body["full_text"] == "hello world" + assert isinstance(body["segments"], list) + assert len(body["segments"]) == 1 + + _run_with_app(tmp_path, _do) + + def test_already_queued_video_returns_existing_status(self, tmp_path) -> None: + async def _do(client, store): + await store.enqueue("dQw4w9WgXcQ") + r = await client.get( + "/api/transcript", + params={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}, + ) + assert r.status_code == 200 + body = r.json() + assert body["status"] == "queued" + assert body["video_id"] == "dQw4w9WgXcQ" + + r2 = await client.get( + "/api/transcript", + params={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}, + ) + assert r2.status_code == 200 + assert r2.json()["video_id"] == "dQw4w9WgXcQ" + + _run_with_app(tmp_path, _do) + + def test_failed_video_returns_failure(self, tmp_path) -> None: + async def _do(client, store): + await store.enqueue("dQw4w9WgXcQ") + await store.mark_failed("dQw4w9WgXcQ", "Transcripts disabled", "transcript_disabled") + r = await client.get( + "/api/transcript", + params={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ"}, + ) + assert r.status_code == 200 + body = r.json() + assert body["status"] == "failed" + assert body["error_type"] == "transcript_disabled" + + _run_with_app(tmp_path, _do) diff --git a/tests/test_storage.py b/tests/test_storage.py new file mode 100644 index 0000000..b1b914b --- /dev/null +++ b/tests/test_storage.py @@ -0,0 +1,250 @@ +import asyncio +import os + +from app.storage import TranscriptStore + + +class TestTranscriptCache: + def test_initialize_creates_database_file(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + assert os.path.exists(db_path), "Database file should exist after initialize" + await store.close() + + asyncio.run(_run()) + + def test_get_transcript_returns_none_when_not_found(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + result = await store.get_transcript("nonexistent_id") + assert result is None + await store.close() + + asyncio.run(_run()) + + def test_save_and_retrieve_transcript(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + + segments = [ + {"text": "hello", "start": 0.0, "duration": 1.0}, + {"text": "world", "start": 1.0, "duration": 1.0}, + ] + await store.save_transcript( + video_id="abc123", + full_text="hello world", + segments=segments, + ) + + result = await store.get_transcript("abc123") + assert result is not None + assert result["video_id"] == "abc123" + assert result["full_text"] == "hello world" + assert result["segments"] == segments + assert isinstance(result["segments"], list) + assert all(isinstance(s, dict) for s in result["segments"]) + + await store.close() + + asyncio.run(_run()) + + def test_save_transcript_overwrites_existing(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + + await store.save_transcript( + video_id="abc123", + full_text="first version", + segments=[], + ) + await store.save_transcript( + video_id="abc123", + full_text="second version", + segments=[], + ) + + result = await store.get_transcript("abc123") + assert result is not None + assert result["full_text"] == "second version" + + await store.close() + + asyncio.run(_run()) + + def test_multiple_transcripts(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + + await store.save_transcript( + video_id="vid1", + full_text="transcript one", + segments=[{"text": "one", "start": 0.0, "duration": 1.0}], + ) + await store.save_transcript( + video_id="vid2", + full_text="transcript two", + segments=[{"text": "two", "start": 0.0, "duration": 1.0}], + ) + + result1 = await store.get_transcript("vid1") + assert result1 is not None + assert result1["video_id"] == "vid1" + assert result1["full_text"] == "transcript one" + + result2 = await store.get_transcript("vid2") + assert result2 is not None + assert result2["video_id"] == "vid2" + assert result2["full_text"] == "transcript two" + + await store.close() + + asyncio.run(_run()) + + +class TestQueue: + def _make_store(self, tmp_path): + return TranscriptStore(db_path=os.path.join(str(tmp_path), "test.db")) + + def test_enqueue_creates_pending_entry(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + entry = await store.enqueue("vid_001") + assert entry["video_id"] == "vid_001" + assert entry["status"] == "pending" + assert isinstance(entry["assigned_delay"], float) + assert 30.0 <= entry["assigned_delay"] <= 60.0 + assert entry["error"] is None + assert entry["error_type"] is None + await store.close() + asyncio.run(_run()) + + def test_enqueue_duplicate_returns_existing(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + first = await store.enqueue("vid_001") + second = await store.enqueue("vid_001") + assert first["assigned_delay"] == second["assigned_delay"] + assert first["id"] == second["id"] + await store.close() + asyncio.run(_run()) + + def test_get_queue_entry(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + await store.enqueue("vid_001") + entry = await store.get_queue_entry("vid_001") + assert entry is not None + assert entry["video_id"] == "vid_001" + assert entry["status"] == "pending" + assert await store.get_queue_entry("nonexistent") is None + await store.close() + asyncio.run(_run()) + + def test_get_next_pending_returns_oldest_first(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + await store.enqueue("vid_001") + await store.enqueue("vid_002") + await store.enqueue("vid_003") + first = await store.get_next_pending() + assert first is not None + assert first["video_id"] == "vid_001" + assert first["status"] == "processing" + assert first["started_at"] is not None + second = await store.get_next_pending() + assert second is not None + assert second["video_id"] == "vid_002" + await store.close() + asyncio.run(_run()) + + def test_get_next_pending_returns_none_when_empty(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + assert await store.get_next_pending() is None + await store.close() + asyncio.run(_run()) + + def test_mark_completed_removes_entry(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + await store.enqueue("vid_001") + await store.mark_completed("vid_001") + assert await store.get_queue_entry("vid_001") is None + await store.close() + asyncio.run(_run()) + + def test_mark_failed_updates_entry(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + await store.enqueue("vid_001") + await store.mark_failed("vid_001", "IP was blocked", "ip_blocked") + entry = await store.get_queue_entry("vid_001") + assert entry is not None + assert entry["status"] == "failed" + assert entry["error"] == "IP was blocked" + assert entry["error_type"] == "ip_blocked" + await store.close() + asyncio.run(_run()) + + def test_get_position_and_estimate_for_pending(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + e1 = await store.enqueue("vid_001") + e2 = await store.enqueue("vid_002") + e3 = await store.enqueue("vid_003") + result = await store.get_position_and_estimate("vid_003") + assert result is not None + assert result["position"] == 2 + expected = e1["assigned_delay"] + e2["assigned_delay"] + e3["assigned_delay"] + assert abs(result["estimated_seconds"] - expected) < 0.01 + await store.close() + asyncio.run(_run()) + + def test_get_position_and_estimate_for_first_pending(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + e1 = await store.enqueue("vid_001") + result = await store.get_position_and_estimate("vid_001") + assert result is not None + assert result["position"] == 0 + assert abs(result["estimated_seconds"] - e1["assigned_delay"]) < 0.01 + await store.close() + asyncio.run(_run()) + + def test_get_position_and_estimate_for_failed(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + await store.enqueue("vid_001") + await store.mark_failed("vid_001", "error", "type") + result = await store.get_position_and_estimate("vid_001") + assert result == {"position": 0, "estimated_seconds": 0.0} + await store.close() + asyncio.run(_run()) + + def test_get_position_and_estimate_not_found(self, tmp_path) -> None: + async def _run() -> None: + store = self._make_store(tmp_path) + await store.initialize() + assert await store.get_position_and_estimate("nonexistent") is None + await store.close() + asyncio.run(_run()) diff --git a/tests/test_transcriber.py b/tests/test_transcriber.py new file mode 100644 index 0000000..c05eab5 --- /dev/null +++ b/tests/test_transcriber.py @@ -0,0 +1,60 @@ +from unittest.mock import patch + +import pytest +from youtube_transcript_api._errors import IpBlocked, TranscriptsDisabled + +from app.transcriber import ( + InvalidURLError, + IPBlockedError, + TranscriptDisabledError, + create_api, + extract_video_id, + fetch_transcript_by_id, +) + + +class TestExtractVideoId: + def test_standard_url(self) -> None: + assert extract_video_id("https://www.youtube.com/watch?v=dQw4w9WgXcQ") == "dQw4w9WgXcQ" + + def test_short_url(self) -> None: + assert extract_video_id("https://youtu.be/dQw4w9WgXcQ") == "dQw4w9WgXcQ" + + def test_embed_url(self) -> None: + assert extract_video_id("https://www.youtube.com/embed/dQw4w9WgXcQ") == "dQw4w9WgXcQ" + + def test_shorts_url(self) -> None: + assert extract_video_id("https://www.youtube.com/shorts/dQw4w9WgXcQ") == "dQw4w9WgXcQ" + + def test_no_protocol(self) -> None: + assert extract_video_id("youtube.com/watch?v=dQw4w9WgXcQ") == "dQw4w9WgXcQ" + + def test_invalid_url(self) -> None: + try: + extract_video_id("https://example.com/video") + assert False, "Expected InvalidURLError" + except InvalidURLError: + pass + + +class TestCreateApi: + def test_create_api_has_browser_user_agent(self) -> None: + api = create_api() + # The session is stored on the fetcher inside the API instance. + session = api._fetcher._http_client + ua = session.headers.get("User-Agent", "") + assert "Chrome" in ua + + +class TestFetchTranscriptById: + def test_fetch_transcript_by_id_raises_ip_blocked(self) -> None: + api = create_api() + with patch.object(api, "fetch", side_effect=IpBlocked(video_id="test")): + with pytest.raises(IPBlockedError): + fetch_transcript_by_id("test", api) + + def test_fetch_transcript_by_id_raises_transcript_disabled(self) -> None: + api = create_api() + with patch.object(api, "fetch", side_effect=TranscriptsDisabled(video_id="test")): + with pytest.raises(TranscriptDisabledError): + fetch_transcript_by_id("test", api) diff --git a/tests/test_worker.py b/tests/test_worker.py new file mode 100644 index 0000000..0ef523a --- /dev/null +++ b/tests/test_worker.py @@ -0,0 +1,249 @@ +import asyncio +import os +from unittest.mock import AsyncMock, MagicMock, patch + +from app.storage import TranscriptStore +from app.transcriber import ( + IPBlockedError, + TranscriptDisabledError, +) +from app.worker import process_next + + +def _to_thread_passthrough(func, *args, **kwargs): + """Replacement for asyncio.to_thread that runs the function synchronously.""" + + async def _coro(): + return func(*args, **kwargs) + + return _coro() + + +def _patch_to_thread(): + return patch("app.worker.asyncio.to_thread", new=_to_thread_passthrough) + + +class TestProcessNext: + def test_process_next_returns_false_when_queue_empty(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + + with patch("app.worker.asyncio.sleep", new_callable=AsyncMock) as mock_sleep: + api = MagicMock() + result = await process_next(store, api) + assert result is False + mock_sleep.assert_not_called() + + await store.close() + + asyncio.run(_run()) + + def test_process_next_success(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + entry = await store.enqueue("vid_001") + + with patch("app.worker.asyncio.sleep", new_callable=AsyncMock) as mock_sleep, \ + _patch_to_thread(), \ + patch("app.worker.fetch_transcript_by_id", return_value=[{"text": "hello", "start": 0.0, "duration": 1.0}]): + api = MagicMock() + result = await process_next(store, api) + assert result is True + mock_sleep.assert_called_once() + slept_for = mock_sleep.call_args.args[0] + assert 30.0 <= slept_for <= 60.0 + assert abs(slept_for - entry["assigned_delay"]) < 0.001 + + transcript = await store.get_transcript("vid_001") + assert transcript is not None + assert transcript["full_text"] == "hello" + assert await store.get_queue_entry("vid_001") is None + await store.close() + + asyncio.run(_run()) + + def test_process_next_ip_blocked(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + await store.enqueue("vid_001") + + with patch("app.worker.asyncio.sleep", new_callable=AsyncMock), \ + _patch_to_thread(), \ + patch("app.worker.fetch_transcript_by_id", side_effect=IPBlockedError("blocked")): + api = MagicMock() + result = await process_next(store, api) + assert result is True + + entry = await store.get_queue_entry("vid_001") + assert entry is not None + assert entry["status"] == "failed" + assert entry["error_type"] == "ip_blocked" + assert await store.get_transcript("vid_001") is None + await store.close() + + asyncio.run(_run()) + + def test_process_next_transcript_disabled(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + await store.enqueue("vid_001") + + with patch("app.worker.asyncio.sleep", new_callable=AsyncMock), \ + _patch_to_thread(), \ + patch("app.worker.fetch_transcript_by_id", side_effect=TranscriptDisabledError("disabled")): + api = MagicMock() + result = await process_next(store, api) + assert result is True + + entry = await store.get_queue_entry("vid_001") + assert entry is not None + assert entry["status"] == "failed" + assert entry["error_type"] == "transcript_disabled" + await store.close() + + asyncio.run(_run()) + + def test_process_next_downloads_before_sleeping(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + await store.enqueue("vid_001") + + call_order = [] + + async def mock_sleep(seconds): + call_order.append("sleep") + + def mock_fetch(video_id, api): + call_order.append("fetch") + return [{"text": "hello", "start": 0.0, "duration": 1.0}] + + with patch("app.worker.asyncio.sleep", side_effect=mock_sleep), \ + _patch_to_thread(), \ + patch("app.worker.fetch_transcript_by_id", side_effect=mock_fetch): + api = MagicMock() + result = await process_next(store, api) + assert result is True + + assert call_order == ["fetch", "sleep"] + await store.close() + + asyncio.run(_run()) + + def test_process_next_sleeps_after_error(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + entry = await store.enqueue("vid_001") + + with patch("app.worker.asyncio.sleep", new_callable=AsyncMock) as mock_sleep, \ + _patch_to_thread(), \ + patch("app.worker.fetch_transcript_by_id", side_effect=IPBlockedError("blocked")): + api = MagicMock() + result = await process_next(store, api) + assert result is True + mock_sleep.assert_called_once() + slept_for = mock_sleep.call_args.args[0] + assert abs(slept_for - entry["assigned_delay"]) < 0.001 + + entry = await store.get_queue_entry("vid_001") + assert entry is not None + assert entry["status"] == "failed" + await store.close() + + asyncio.run(_run()) + + def test_process_next_sleeps_after_error_before_next_download(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + await store.enqueue("vid_001") + + call_order = [] + + async def mock_sleep(seconds): + call_order.append("sleep") + + def mock_fetch(video_id, api): + call_order.append("fetch") + raise IPBlockedError("blocked") + + with patch("app.worker.asyncio.sleep", side_effect=mock_sleep), \ + _patch_to_thread(), \ + patch("app.worker.fetch_transcript_by_id", side_effect=mock_fetch): + api = MagicMock() + await process_next(store, api) + + assert call_order == ["fetch", "sleep"] + await store.close() + + asyncio.run(_run()) + + def test_process_next_no_sleep_before_first_download_after_empty_queue(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + + call_order = [] + + async def mock_sleep(seconds): + call_order.append("sleep") + + def mock_fetch(video_id, api): + call_order.append("fetch") + return [{"text": "hello", "start": 0.0, "duration": 1.0}] + + with patch("app.worker.asyncio.sleep", side_effect=mock_sleep), \ + _patch_to_thread(), \ + patch("app.worker.fetch_transcript_by_id", side_effect=mock_fetch): + api = MagicMock() + + # Queue is empty — no sleep, no fetch + result = await process_next(store, api) + assert result is False + assert call_order == [] + + # Video is added while queue was idle + await store.enqueue("vid_001") + + # Next call downloads immediately, then sleeps after + result = await process_next(store, api) + assert result is True + assert call_order == ["fetch", "sleep"] + + await store.close() + + asyncio.run(_run()) + + def test_process_next_processes_fifo_order(self, tmp_path) -> None: + async def _run() -> None: + db_path = os.path.join(str(tmp_path), "test.db") + store = TranscriptStore(db_path=db_path) + await store.initialize() + await store.enqueue("vid_001") + await store.enqueue("vid_002") + + with patch("app.worker.asyncio.sleep", new_callable=AsyncMock), \ + _patch_to_thread(), \ + patch("app.worker.fetch_transcript_by_id", return_value=[{"text": "first", "start": 0.0, "duration": 1.0}]): + api = MagicMock() + await process_next(store, api) + + assert (await store.get_transcript("vid_001")) is not None + assert (await store.get_transcript("vid_002")) is None + assert (await store.get_queue_entry("vid_002")) is not None + await store.close() + + asyncio.run(_run()) |
