diff options
| author | Adam Malczewski <[email protected]> | 2026-06-01 09:30:08 +0900 |
|---|---|---|
| committer | Adam Malczewski <[email protected]> | 2026-06-01 09:30:08 +0900 |
| commit | 7128c624cf076d698bceb354526cf4cd3dfe5434 (patch) | |
| tree | 7ddc5a9d57667a493842ec98e39ce3cc279a2587 | |
| parent | b8bc5c71076d0318291e120e056e8103344493ff (diff) | |
| download | dispatch-7128c624cf076d698bceb354526cf4cd3dfe5434.tar.gz dispatch-7128c624cf076d698bceb354526cf4cd3dfe5434.zip | |
fix(api): wake scheduler — missed-wake recovery, retry consolidation, status surface
Bugs fixed
- Missed wakes silently lost. The old loadScheduleFromDB just pushed any
past next_wake_at to its 'next occurrence' in *server* local time, so a
wake that fired while the API was down never ran — defeating the whole
point of the panel (overnight task picks up after a 5h rate-window
reset). Now: if missed by <= 2h we fire it on the next tick; either way
the entry is rolled forward by 24h-multiple steps.
- Server-TZ drift. nextOccurrenceAt15 used the server's local TZ, so on
a UTC Docker host running for a user in PST the reschedule slowly
migrated the fire time. Now we advance by 24h * N from the original
client-supplied timestamp, preserving the user's wall-clock intent.
- Retry storm. Every failed wake pushed a new entry into a retries[]
array, all converging at the same +5min instant. Replaced with a single
shared pending-retry slot whose budget resets on subsequent failures.
- Retry race with fresh fires. If a tick fired AND a retry was due in
the same iteration we'd double-hit the upstream. Now retries only run
on ticks where no fresh wake fired.
New behavior surfaced on /wake-schedule:
{ schedule, resetOffsetHours, lastWake, pendingRetry }
POST /wake-schedule/toggle now also rejects non-integer hours (4.5, etc.)
and returns the same snapshot shape so the client can stay in sync.
Tests: 9 new HTTP route tests covering snapshot shape, add/remove,
validation (range, integer, past timestamp, missing timestamp), and
independent multi-hour scheduling.
| -rw-r--r-- | packages/api/src/routes/models.ts | 242 | ||||
| -rw-r--r-- | packages/api/tests/routes.test.ts | 92 |
2 files changed, 255 insertions, 79 deletions
diff --git a/packages/api/src/routes/models.ts b/packages/api/src/routes/models.ts index 1a54abb..4fc89eb 100644 --- a/packages/api/src/routes/models.ts +++ b/packages/api/src/routes/models.ts @@ -21,6 +21,11 @@ import { validateAccountCredentials, } from "@dispatch/core"; import { Hono } from "hono"; +import { + CLAUDE_RESET_OFFSET_HOURS, + nextDailyAfter, + recoverScheduleEntry, +} from "../wake-scheduler.js"; let getRegistry: () => ModelRegistry | null = () => null; let getAccounts: () => ClaudeAccount[] = () => []; @@ -608,20 +613,24 @@ modelsRoutes.post("/wake", async (c) => { type WakeSchedule = Record<number, number>; // hour → next wake timestamp (ms) interface PendingRetry { - retriesLeft: number; // starts at 6 (5 min × 6 = 30 min) - nextRetryAt: number; // timestamp for next retry attempt + /** Remaining attempts. Starts at MAX_RETRIES (e.g. 6 → 30 min of retries). */ + retriesLeft: number; + /** Absolute timestamp (ms) of the next retry attempt. */ + nextRetryAt: number; + /** Why we entered retry mode — surfaced on /wake-schedule. */ + reason: string; } -function nextOccurrenceAt15(hour: number): number { - const now = new Date(); - const target = new Date(now); - target.setHours(hour, 15, 0, 0); - if (target.getTime() <= Date.now()) { - target.setDate(target.getDate() + 1); - } - return target.getTime(); +interface LastWake { + firedAt: number; + ok: boolean; + results: Array<{ label: string; ok: boolean; error?: string }>; } +const MAX_RETRIES = 6; +const RETRY_INTERVAL_MS = 5 * 60 * 1000; +const TICK_INTERVAL_MS = 30_000; + function loadScheduleFromDB(): WakeSchedule { try { const db = getDatabase(); @@ -630,16 +639,23 @@ function loadScheduleFromDB(): WakeSchedule { next_wake_at: number; }>; const schedule: WakeSchedule = {}; - let needsUpdate = false; + const now = Date.now(); + let needsPersist = false; for (const row of rows) { - if (row.next_wake_at > Date.now()) { - schedule[row.hour] = row.next_wake_at; - } else { - schedule[row.hour] = nextOccurrenceAt15(row.hour); - needsUpdate = true; + const recovered = recoverScheduleEntry(row.next_wake_at, now); + schedule[row.hour] = recovered.nextWakeAt; + if (recovered.nextWakeAt !== row.next_wake_at) { + needsPersist = true; + } + if (recovered.shouldFireNow) { + // Mark the entry as "due immediately" so the next tick fires it. + // We accomplish this by setting next_wake_at = now; the tick will + // see ts <= now, fire, and advance via nextDailyAfter(). + schedule[row.hour] = now; + needsPersist = true; } } - if (needsUpdate) { + if (needsPersist) { persistSchedule(schedule); } return schedule; @@ -660,82 +676,130 @@ function persistSchedule(scheduleToSave?: WakeSchedule): void { insert.run({ $hour: Number(hour), $nextWakeAt: nextWakeAt }); } } catch { - // Ignore DB errors + // Ignore DB errors — schedule still lives in-memory for this process. } } const wakeSchedule: WakeSchedule = loadScheduleFromDB(); -const pendingRetries: PendingRetry[] = []; -// HMR-safe: clear previous tick before starting a new one -(globalThis as Record<string, unknown>)._dispatchWakeTimer ??= undefined; +/** + * A single shared retry slot. We deliberately do NOT queue one retry per + * failed wake — multiple back-to-back failures (e.g. the network is down for + * five minutes) used to spawn retries that all converged on the same instant + * and hammered the upstream. One in-flight retry covers all accounts. + */ +let pendingRetry: PendingRetry | null = null; +let lastWake: LastWake | null = null; + +// HMR-safe: track the scheduler timer on globalThis so re-imports during dev +// don't leave orphaned timers running. const timerKey = "_dispatchWakeTimer"; +(globalThis as Record<string, unknown>)[timerKey] ??= undefined; let isTickRunning = false; +function recordWake(results: Array<{ label: string; ok: boolean; error?: string }>): boolean { + const ok = results.length > 0 && results.every((r) => r.ok); + lastWake = { firedAt: Date.now(), ok, results }; + return ok; +} + +function scheduleRetry(reason: string): void { + if (pendingRetry) { + // Already retrying — reset the budget so the next failure window covers + // the new incident too, but don't compound timers. + pendingRetry.retriesLeft = MAX_RETRIES; + pendingRetry.nextRetryAt = Date.now() + RETRY_INTERVAL_MS; + pendingRetry.reason = reason; + return; + } + pendingRetry = { + retriesLeft: MAX_RETRIES, + nextRetryAt: Date.now() + RETRY_INTERVAL_MS, + reason, + }; +} + +async function fireWake(reason: string): Promise<void> { + try { + const results = await wakeAllClaudeAccounts(); + const ok = recordWake(results); + if (!ok) scheduleRetry(reason); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + lastWake = { + firedAt: Date.now(), + ok: false, + results: [{ label: "(scheduler)", ok: false, error: message }], + }; + scheduleRetry(reason); + } +} + +async function processPendingRetry(now: number): Promise<void> { + // Capture into a local so TS narrowing survives across awaits, and so a + // racing toggle that clears `pendingRetry` mid-flight can't NPE us. + const retry = pendingRetry; + if (!retry || retry.nextRetryAt > now) return; + try { + const results = await wakeAllClaudeAccounts(); + const ok = recordWake(results); + if (ok || retry.retriesLeft <= 1) { + pendingRetry = null; + } else { + retry.retriesLeft -= 1; + retry.nextRetryAt = Date.now() + RETRY_INTERVAL_MS; + } + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + lastWake = { + firedAt: Date.now(), + ok: false, + results: [{ label: "(retry)", ok: false, error: message }], + }; + if (retry.retriesLeft <= 1) { + pendingRetry = null; + } else { + retry.retriesLeft -= 1; + retry.nextRetryAt = Date.now() + RETRY_INTERVAL_MS; + } + } +} + async function schedulerTick(): Promise<void> { - // Prevent concurrent tick execution (e.g. toggle called mid-tick) + // Prevent concurrent tick execution (e.g. toggle called mid-tick). if (isTickRunning) return; isTickRunning = true; try { const now = Date.now(); + // Snapshot hours so we don't iterate while mutating (toggle can race). const hours = Object.keys(wakeSchedule).map(Number); + let anyFiredThisTick = false; for (const hour of hours) { const ts = wakeSchedule[hour]; - if (ts !== undefined && ts <= now) { - // Reschedule for next day (recurring daily) - wakeSchedule[hour] = nextOccurrenceAt15(hour); - persistSchedule(); - - // Wake accounts and track failures for retry - try { - const results = await wakeAllClaudeAccounts(); - const anyFailed = results.some((r) => !r.ok); - if (anyFailed) { - pendingRetries.push({ - retriesLeft: 6, - nextRetryAt: now + 5 * 60 * 1000, - }); - } - } catch { - // Total failure — schedule retry - pendingRetries.push({ - retriesLeft: 6, - nextRetryAt: now + 5 * 60 * 1000, - }); - } - } + if (ts === undefined || ts > now) continue; + + // Advance the next fire to strictly > now (skips any missed days, + // e.g. if the tick was paused for hours). + wakeSchedule[hour] = nextDailyAfter(ts, now); + persistSchedule(); + anyFiredThisTick = true; + await fireWake(`scheduled wake at hour ${hour}`); } - // Process pending retries (iterate backwards for safe splicing) - for (let i = pendingRetries.length - 1; i >= 0; i--) { - const retry = pendingRetries[i]; - if (!retry || retry.nextRetryAt > now) continue; - - try { - const results = await wakeAllClaudeAccounts(); - const anyFailed = results.some((r) => !r.ok); - if (!anyFailed || retry.retriesLeft <= 1) { - // All succeeded or out of retries — remove - pendingRetries.splice(i, 1); - } else { - retry.retriesLeft--; - retry.nextRetryAt = now + 5 * 60 * 1000; - } - } catch { - if (retry.retriesLeft <= 1) { - pendingRetries.splice(i, 1); - } else { - retry.retriesLeft--; - retry.nextRetryAt = now + 5 * 60 * 1000; - } - } + // Only attempt a retry on ticks that didn't *just* fire — otherwise we'd + // race the retry against a fresh attempt within the same loop iteration. + if (!anyFiredThisTick) { + await processPendingRetry(Date.now()); } - // Schedule next tick while there's work to monitor - if (Object.keys(wakeSchedule).length > 0 || pendingRetries.length > 0) { - (globalThis as Record<string, unknown>)[timerKey] = setTimeout(schedulerTick, 30_000); + // Keep ticking while there's anything to monitor. + if (Object.keys(wakeSchedule).length > 0 || pendingRetry !== null) { + (globalThis as Record<string, unknown>)[timerKey] = setTimeout( + schedulerTick, + TICK_INTERVAL_MS, + ); } } finally { isTickRunning = false; @@ -743,10 +807,25 @@ async function schedulerTick(): Promise<void> { } export function startWakeScheduler(): void { - // Clear any previous timer (HMR-safe — works with Bun's Timer objects) + // Clear any previous timer (HMR-safe — works with Bun's Timer objects). const prev = (globalThis as Record<string, unknown>)[timerKey]; if (prev != null) clearTimeout(prev as ReturnType<typeof setTimeout>); - schedulerTick(); + // Fire-and-forget; the tick re-arms itself. + void schedulerTick(); +} + +function scheduleSnapshot(): { + schedule: WakeSchedule; + resetOffsetHours: number; + lastWake: LastWake | null; + pendingRetry: PendingRetry | null; +} { + return { + schedule: wakeSchedule, + resetOffsetHours: CLAUDE_RESET_OFFSET_HOURS, + lastWake, + pendingRetry, + }; } modelsRoutes.post("/wake-schedule/toggle", async (c) => { @@ -755,26 +834,31 @@ modelsRoutes.post("/wake-schedule/toggle", async (c) => { if (typeof hour !== "number" || !Number.isFinite(hour) || hour < 0 || hour > 23) { return c.json({ error: "hour must be a number 0-23" }, 400); } + // Integer-only; reject 4.7, NaN coercions, etc. + if (!Number.isInteger(hour)) { + return c.json({ error: "hour must be an integer 0-23" }, 400); + } if (wakeSchedule[hour] !== undefined) { - // Delete + // Toggle off — remove the entry. delete wakeSchedule[hour]; } else { - // Add — require a future timestamp + // Toggle on — require a future absolute timestamp from the client. The + // client is the source of truth for *local* wall-clock intent; the + // server just stores the ms and rolls it forward by 24h cycles. const ts = body.timestamp; - if (typeof ts !== "number" || ts <= Date.now()) { + if (typeof ts !== "number" || !Number.isFinite(ts) || ts <= Date.now()) { return c.json({ error: "timestamp must be a future Unix ms value" }, 400); } wakeSchedule[hour] = ts; } - // Persist and restart the tick loop persistSchedule(); startWakeScheduler(); - return c.json({ schedule: wakeSchedule }); + return c.json(scheduleSnapshot()); }); modelsRoutes.get("/wake-schedule", (c) => { - return c.json({ schedule: wakeSchedule }); + return c.json(scheduleSnapshot()); }); diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts index 9ab2afe..e3dff3d 100644 --- a/packages/api/tests/routes.test.ts +++ b/packages/api/tests/routes.test.ts @@ -397,3 +397,95 @@ describe("POST /chat/stop", () => { expect(res.status).toBe(400); }); }); +describe("Wake schedule routes", () => { + async function getSchedule() { + const res = await app.request("/models/wake-schedule"); + expect(res.status).toBe(200); + return (await res.json()) as { + schedule: Record<string, number>; + resetOffsetHours: number; + lastWake: unknown; + pendingRetry: unknown; + }; + } + + async function toggle(body: Record<string, unknown>) { + return app.request("/models/wake-schedule/toggle", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(body), + }); + } + + it("GET returns the full snapshot shape with resetOffsetHours, lastWake, pendingRetry", async () => { + const snap = await getSchedule(); + expect(snap.schedule).toBeDefined(); + // CLAUDE_RESET_OFFSET_HOURS is currently 5; keep this loose in case the + // product changes the constant, but verify it's a positive integer. + expect(Number.isInteger(snap.resetOffsetHours)).toBe(true); + expect(snap.resetOffsetHours).toBeGreaterThan(0); + expect(snap.lastWake).toBeNull(); + expect(snap.pendingRetry).toBeNull(); + }); + + it("POST toggle adds and removes a wake hour", async () => { + const future = Date.now() + 60 * 60 * 1000; // 1 h ahead + + const addRes = await toggle({ hour: 9, timestamp: future }); + expect(addRes.status).toBe(200); + const addBody = (await addRes.json()) as { schedule: Record<string, number> }; + expect(addBody.schedule["9"]).toBe(future); + + const removeRes = await toggle({ hour: 9 }); + expect(removeRes.status).toBe(200); + const removeBody = (await removeRes.json()) as { schedule: Record<string, number> }; + expect(removeBody.schedule["9"]).toBeUndefined(); + }); + + it("POST toggle rejects out-of-range hour", async () => { + const res = await toggle({ hour: 24, timestamp: Date.now() + 60_000 }); + expect(res.status).toBe(400); + }); + + it("POST toggle rejects negative hour", async () => { + const res = await toggle({ hour: -1, timestamp: Date.now() + 60_000 }); + expect(res.status).toBe(400); + }); + + it("POST toggle rejects non-integer hour", async () => { + const res = await toggle({ hour: 4.5, timestamp: Date.now() + 60_000 }); + expect(res.status).toBe(400); + }); + + it("POST toggle rejects past timestamp on add", async () => { + const res = await toggle({ hour: 7, timestamp: Date.now() - 1000 }); + expect(res.status).toBe(400); + }); + + it("POST toggle rejects missing timestamp on add", async () => { + const res = await toggle({ hour: 8 }); + expect(res.status).toBe(400); + }); + + it("POST toggle: a delete does NOT require a timestamp", async () => { + const future = Date.now() + 60 * 60 * 1000; + const addRes = await toggle({ hour: 11, timestamp: future }); + expect(addRes.status).toBe(200); + const delRes = await toggle({ hour: 11 }); + expect(delRes.status).toBe(200); + const body = (await delRes.json()) as { schedule: Record<string, number> }; + expect(body.schedule["11"]).toBeUndefined(); + }); + + it("snapshot reflects multiple scheduled hours independently", async () => { + const future = Date.now() + 2 * 60 * 60 * 1000; + await toggle({ hour: 14, timestamp: future }); + await toggle({ hour: 19, timestamp: future + 60_000 }); + const snap = await getSchedule(); + expect(snap.schedule["14"]).toBe(future); + expect(snap.schedule["19"]).toBe(future + 60_000); + // Cleanup so later tests start clean. + await toggle({ hour: 14 }); + await toggle({ hour: 19 }); + }); +}); |
