summaryrefslogtreecommitdiffhomepage
path: root/tests/test_storage.py
blob: b1b914b0efdc6512be19277639c9fdcead5463f6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
import asyncio
import os

from app.storage import TranscriptStore


class TestTranscriptCache:
    def test_initialize_creates_database_file(self, tmp_path) -> None:
        async def _run() -> None:
            db_path = os.path.join(str(tmp_path), "test.db")
            store = TranscriptStore(db_path=db_path)
            await store.initialize()
            assert os.path.exists(db_path), "Database file should exist after initialize"
            await store.close()

        asyncio.run(_run())

    def test_get_transcript_returns_none_when_not_found(self, tmp_path) -> None:
        async def _run() -> None:
            db_path = os.path.join(str(tmp_path), "test.db")
            store = TranscriptStore(db_path=db_path)
            await store.initialize()
            result = await store.get_transcript("nonexistent_id")
            assert result is None
            await store.close()

        asyncio.run(_run())

    def test_save_and_retrieve_transcript(self, tmp_path) -> None:
        async def _run() -> None:
            db_path = os.path.join(str(tmp_path), "test.db")
            store = TranscriptStore(db_path=db_path)
            await store.initialize()

            segments = [
                {"text": "hello", "start": 0.0, "duration": 1.0},
                {"text": "world", "start": 1.0, "duration": 1.0},
            ]
            await store.save_transcript(
                video_id="abc123",
                full_text="hello world",
                segments=segments,
            )

            result = await store.get_transcript("abc123")
            assert result is not None
            assert result["video_id"] == "abc123"
            assert result["full_text"] == "hello world"
            assert result["segments"] == segments
            assert isinstance(result["segments"], list)
            assert all(isinstance(s, dict) for s in result["segments"])

            await store.close()

        asyncio.run(_run())

    def test_save_transcript_overwrites_existing(self, tmp_path) -> None:
        async def _run() -> None:
            db_path = os.path.join(str(tmp_path), "test.db")
            store = TranscriptStore(db_path=db_path)
            await store.initialize()

            await store.save_transcript(
                video_id="abc123",
                full_text="first version",
                segments=[],
            )
            await store.save_transcript(
                video_id="abc123",
                full_text="second version",
                segments=[],
            )

            result = await store.get_transcript("abc123")
            assert result is not None
            assert result["full_text"] == "second version"

            await store.close()

        asyncio.run(_run())

    def test_multiple_transcripts(self, tmp_path) -> None:
        async def _run() -> None:
            db_path = os.path.join(str(tmp_path), "test.db")
            store = TranscriptStore(db_path=db_path)
            await store.initialize()

            await store.save_transcript(
                video_id="vid1",
                full_text="transcript one",
                segments=[{"text": "one", "start": 0.0, "duration": 1.0}],
            )
            await store.save_transcript(
                video_id="vid2",
                full_text="transcript two",
                segments=[{"text": "two", "start": 0.0, "duration": 1.0}],
            )

            result1 = await store.get_transcript("vid1")
            assert result1 is not None
            assert result1["video_id"] == "vid1"
            assert result1["full_text"] == "transcript one"

            result2 = await store.get_transcript("vid2")
            assert result2 is not None
            assert result2["video_id"] == "vid2"
            assert result2["full_text"] == "transcript two"

            await store.close()

        asyncio.run(_run())


class TestQueue:
    def _make_store(self, tmp_path):
        return TranscriptStore(db_path=os.path.join(str(tmp_path), "test.db"))

    def test_enqueue_creates_pending_entry(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            entry = await store.enqueue("vid_001")
            assert entry["video_id"] == "vid_001"
            assert entry["status"] == "pending"
            assert isinstance(entry["assigned_delay"], float)
            assert 30.0 <= entry["assigned_delay"] <= 60.0
            assert entry["error"] is None
            assert entry["error_type"] is None
            await store.close()
        asyncio.run(_run())

    def test_enqueue_duplicate_returns_existing(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            first = await store.enqueue("vid_001")
            second = await store.enqueue("vid_001")
            assert first["assigned_delay"] == second["assigned_delay"]
            assert first["id"] == second["id"]
            await store.close()
        asyncio.run(_run())

    def test_get_queue_entry(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            await store.enqueue("vid_001")
            entry = await store.get_queue_entry("vid_001")
            assert entry is not None
            assert entry["video_id"] == "vid_001"
            assert entry["status"] == "pending"
            assert await store.get_queue_entry("nonexistent") is None
            await store.close()
        asyncio.run(_run())

    def test_get_next_pending_returns_oldest_first(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            await store.enqueue("vid_001")
            await store.enqueue("vid_002")
            await store.enqueue("vid_003")
            first = await store.get_next_pending()
            assert first is not None
            assert first["video_id"] == "vid_001"
            assert first["status"] == "processing"
            assert first["started_at"] is not None
            second = await store.get_next_pending()
            assert second is not None
            assert second["video_id"] == "vid_002"
            await store.close()
        asyncio.run(_run())

    def test_get_next_pending_returns_none_when_empty(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            assert await store.get_next_pending() is None
            await store.close()
        asyncio.run(_run())

    def test_mark_completed_removes_entry(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            await store.enqueue("vid_001")
            await store.mark_completed("vid_001")
            assert await store.get_queue_entry("vid_001") is None
            await store.close()
        asyncio.run(_run())

    def test_mark_failed_updates_entry(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            await store.enqueue("vid_001")
            await store.mark_failed("vid_001", "IP was blocked", "ip_blocked")
            entry = await store.get_queue_entry("vid_001")
            assert entry is not None
            assert entry["status"] == "failed"
            assert entry["error"] == "IP was blocked"
            assert entry["error_type"] == "ip_blocked"
            await store.close()
        asyncio.run(_run())

    def test_get_position_and_estimate_for_pending(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            e1 = await store.enqueue("vid_001")
            e2 = await store.enqueue("vid_002")
            e3 = await store.enqueue("vid_003")
            result = await store.get_position_and_estimate("vid_003")
            assert result is not None
            assert result["position"] == 2
            expected = e1["assigned_delay"] + e2["assigned_delay"] + e3["assigned_delay"]
            assert abs(result["estimated_seconds"] - expected) < 0.01
            await store.close()
        asyncio.run(_run())

    def test_get_position_and_estimate_for_first_pending(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            e1 = await store.enqueue("vid_001")
            result = await store.get_position_and_estimate("vid_001")
            assert result is not None
            assert result["position"] == 0
            assert abs(result["estimated_seconds"] - e1["assigned_delay"]) < 0.01
            await store.close()
        asyncio.run(_run())

    def test_get_position_and_estimate_for_failed(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            await store.enqueue("vid_001")
            await store.mark_failed("vid_001", "error", "type")
            result = await store.get_position_and_estimate("vid_001")
            assert result == {"position": 0, "estimated_seconds": 0.0}
            await store.close()
        asyncio.run(_run())

    def test_get_position_and_estimate_not_found(self, tmp_path) -> None:
        async def _run() -> None:
            store = self._make_store(tmp_path)
            await store.initialize()
            assert await store.get_position_and_estimate("nonexistent") is None
            await store.close()
        asyncio.run(_run())