summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorAdam Malczewski <[email protected]>2026-06-01 09:30:08 +0900
committerAdam Malczewski <[email protected]>2026-06-01 09:30:08 +0900
commit7128c624cf076d698bceb354526cf4cd3dfe5434 (patch)
tree7ddc5a9d57667a493842ec98e39ce3cc279a2587
parentb8bc5c71076d0318291e120e056e8103344493ff (diff)
downloaddispatch-7128c624cf076d698bceb354526cf4cd3dfe5434.tar.gz
dispatch-7128c624cf076d698bceb354526cf4cd3dfe5434.zip
fix(api): wake scheduler — missed-wake recovery, retry consolidation, status surface
Bugs fixed:
- Missed wakes silently lost. The old loadScheduleFromDB just pushed any past next_wake_at to its 'next occurrence' in *server* local time, so a wake that came due while the API was down never ran — defeating the whole point of the panel (overnight task picks up after a 5h rate-window reset). Now: if missed by <= 2h we fire it on the next tick; either way the entry is rolled forward by 24h-multiple steps.
- Server-TZ drift. nextOccurrenceAt15 used the server's local TZ, so on a UTC Docker host running for a user in PST the reschedule slowly migrated the fire time. Now we advance by 24h * N from the original client-supplied timestamp, preserving the user's wall-clock intent.
- Retry storm. Every failed wake pushed a new entry into a retries[] array, all converging at the same +5min instant. Replaced with a single shared pending-retry slot whose budget resets on subsequent failures.
- Retry race with fresh fires. If a tick fired AND a retry was due in the same iteration we'd double-hit the upstream. Now retries only run on ticks where no fresh wake fired.
New behavior surfaced on /wake-schedule: { schedule, resetOffsetHours, lastWake, pendingRetry }. POST /wake-schedule/toggle now also rejects non-integer hours (4.5, etc.) and returns the same snapshot shape so the client can stay in sync.
Tests: 9 new HTTP route tests covering snapshot shape, add/remove, validation (range, integer, past timestamp, missing timestamp), and independent multi-hour scheduling.
-rw-r--r--packages/api/src/routes/models.ts242
-rw-r--r--packages/api/tests/routes.test.ts92
2 files changed, 255 insertions, 79 deletions
diff --git a/packages/api/src/routes/models.ts b/packages/api/src/routes/models.ts
index 1a54abb..4fc89eb 100644
--- a/packages/api/src/routes/models.ts
+++ b/packages/api/src/routes/models.ts
@@ -21,6 +21,11 @@ import {
validateAccountCredentials,
} from "@dispatch/core";
import { Hono } from "hono";
+import {
+ CLAUDE_RESET_OFFSET_HOURS,
+ nextDailyAfter,
+ recoverScheduleEntry,
+} from "../wake-scheduler.js";
let getRegistry: () => ModelRegistry | null = () => null;
let getAccounts: () => ClaudeAccount[] = () => [];
@@ -608,20 +613,24 @@ modelsRoutes.post("/wake", async (c) => {
type WakeSchedule = Record<number, number>; // hour → next wake timestamp (ms)
interface PendingRetry {
- retriesLeft: number; // starts at 6 (5 min × 6 = 30 min)
- nextRetryAt: number; // timestamp for next retry attempt
+ /** Remaining attempts. Starts at MAX_RETRIES (e.g. 6 → 30 min of retries). */
+ retriesLeft: number;
+ /** Absolute timestamp (ms) of the next retry attempt. */
+ nextRetryAt: number;
+ /** Why we entered retry mode — surfaced on /wake-schedule. */
+ reason: string;
}
-function nextOccurrenceAt15(hour: number): number {
- const now = new Date();
- const target = new Date(now);
- target.setHours(hour, 15, 0, 0);
- if (target.getTime() <= Date.now()) {
- target.setDate(target.getDate() + 1);
- }
- return target.getTime();
+interface LastWake {
+ firedAt: number;
+ ok: boolean;
+ results: Array<{ label: string; ok: boolean; error?: string }>;
}
+const MAX_RETRIES = 6;
+const RETRY_INTERVAL_MS = 5 * 60 * 1000;
+const TICK_INTERVAL_MS = 30_000;
+
function loadScheduleFromDB(): WakeSchedule {
try {
const db = getDatabase();
@@ -630,16 +639,23 @@ function loadScheduleFromDB(): WakeSchedule {
next_wake_at: number;
}>;
const schedule: WakeSchedule = {};
- let needsUpdate = false;
+ const now = Date.now();
+ let needsPersist = false;
for (const row of rows) {
- if (row.next_wake_at > Date.now()) {
- schedule[row.hour] = row.next_wake_at;
- } else {
- schedule[row.hour] = nextOccurrenceAt15(row.hour);
- needsUpdate = true;
+ const recovered = recoverScheduleEntry(row.next_wake_at, now);
+ schedule[row.hour] = recovered.nextWakeAt;
+ if (recovered.nextWakeAt !== row.next_wake_at) {
+ needsPersist = true;
+ }
+ if (recovered.shouldFireNow) {
+ // Mark the entry as "due immediately" so the next tick fires it.
+ // We accomplish this by setting next_wake_at = now; the tick will
+ // see ts <= now, fire, and advance via nextDailyAfter().
+ schedule[row.hour] = now;
+ needsPersist = true;
}
}
- if (needsUpdate) {
+ if (needsPersist) {
persistSchedule(schedule);
}
return schedule;
@@ -660,82 +676,130 @@ function persistSchedule(scheduleToSave?: WakeSchedule): void {
insert.run({ $hour: Number(hour), $nextWakeAt: nextWakeAt });
}
} catch {
- // Ignore DB errors
+ // Ignore DB errors — schedule still lives in-memory for this process.
}
}
const wakeSchedule: WakeSchedule = loadScheduleFromDB();
-const pendingRetries: PendingRetry[] = [];
-// HMR-safe: clear previous tick before starting a new one
-(globalThis as Record<string, unknown>)._dispatchWakeTimer ??= undefined;
+/**
+ * A single shared retry slot. We deliberately do NOT queue one retry per
+ * failed wake — multiple back-to-back failures (e.g. the network is down for
+ * five minutes) used to spawn retries that all converged on the same instant
+ * and hammered the upstream. One in-flight retry covers all accounts.
+ */
+let pendingRetry: PendingRetry | null = null;
+let lastWake: LastWake | null = null;
+
+// HMR-safe: track the scheduler timer on globalThis so re-imports during dev
+// don't leave orphaned timers running.
const timerKey = "_dispatchWakeTimer";
+(globalThis as Record<string, unknown>)[timerKey] ??= undefined;
let isTickRunning = false;
+/**
+ * Stores the outcome of a wake attempt in `lastWake` and reports whether the
+ * attempt counts as a success.
+ *
+ * An attempt succeeds only when there is at least one result and every result
+ * is ok; an empty result list is treated as a failure.
+ *
+ * @param results Per-account outcomes of the wake attempt.
+ * @returns true when the attempt is considered fully successful.
+ */
+function recordWake(results: Array<{ label: string; ok: boolean; error?: string }>): boolean {
+  const allSucceeded = results.length !== 0 && !results.some((entry) => !entry.ok);
+  lastWake = { firedAt: Date.now(), ok: allSucceeded, results };
+  return allSucceeded;
+}
+
+/**
+ * Arms the shared retry slot, or re-arms it when a retry is already pending.
+ *
+ * Either way the slot ends up with a full MAX_RETRIES budget, a next attempt
+ * RETRY_INTERVAL_MS from now, and the most recent `reason`.
+ *
+ * @param reason Why retry mode was entered; stored on the slot.
+ */
+function scheduleRetry(reason: string): void {
+  const rearmed: PendingRetry = {
+    retriesLeft: MAX_RETRIES,
+    nextRetryAt: Date.now() + RETRY_INTERVAL_MS,
+    reason,
+  };
+  if (pendingRetry === null) {
+    pendingRetry = rearmed;
+  } else {
+    // Already retrying: refresh the existing slot in place instead of
+    // stacking a second retry on top of it.
+    Object.assign(pendingRetry, rearmed);
+  }
+}
+
+/**
+ * Runs one wake attempt via wakeAllClaudeAccounts() and records the outcome.
+ *
+ * When the attempt is not fully successful, or the wake call throws, the
+ * shared retry is (re)armed with `reason`. A thrown error is caught here and
+ * recorded in `lastWake` as a single synthetic "(scheduler)" result.
+ *
+ * @param reason Description of what triggered this wake; passed on to the
+ *   retry slot if the attempt fails.
+ */
+async function fireWake(reason: string): Promise<void> {
+  let succeeded: boolean;
+  try {
+    succeeded = recordWake(await wakeAllClaudeAccounts());
+  } catch (err) {
+    lastWake = {
+      firedAt: Date.now(),
+      ok: false,
+      results: [
+        {
+          label: "(scheduler)",
+          ok: false,
+          error: err instanceof Error ? err.message : String(err),
+        },
+      ],
+    };
+    succeeded = false;
+  }
+  if (!succeeded) scheduleRetry(reason);
+}
+
+/**
+ * Runs the pending retry if one exists and is due at `now`; otherwise does
+ * nothing.
+ *
+ * After the attempt the slot is cleared when the wake succeeded or when this
+ * was the last remaining attempt. Otherwise one attempt is consumed and the
+ * next retry is scheduled RETRY_INTERVAL_MS from the current time. A thrown
+ * error counts as a failed attempt and is recorded in `lastWake` as a single
+ * synthetic "(retry)" result.
+ *
+ * @param now Timestamp (ms) used to decide whether the retry is due.
+ */
+async function processPendingRetry(now: number): Promise<void> {
+  // Work on a local reference: the non-null narrowing stays valid across the
+  // await even though the module-level `pendingRetry` is mutable.
+  const slot = pendingRetry;
+  if (slot === null || slot.nextRetryAt > now) return;
+
+  let succeeded = false;
+  try {
+    succeeded = recordWake(await wakeAllClaudeAccounts());
+  } catch (err) {
+    lastWake = {
+      firedAt: Date.now(),
+      ok: false,
+      results: [
+        {
+          label: "(retry)",
+          ok: false,
+          error: err instanceof Error ? err.message : String(err),
+        },
+      ],
+    };
+  }
+
+  if (succeeded || slot.retriesLeft <= 1) {
+    // Finished: either the wake went through or the budget is spent.
+    pendingRetry = null;
+    return;
+  }
+  slot.retriesLeft -= 1;
+  slot.nextRetryAt = Date.now() + RETRY_INTERVAL_MS;
+}
+
async function schedulerTick(): Promise<void> {
- // Prevent concurrent tick execution (e.g. toggle called mid-tick)
+ // Prevent concurrent tick execution (e.g. toggle called mid-tick).
if (isTickRunning) return;
isTickRunning = true;
try {
const now = Date.now();
+ // Snapshot hours so we don't iterate while mutating (toggle can race).
const hours = Object.keys(wakeSchedule).map(Number);
+ let anyFiredThisTick = false;
for (const hour of hours) {
const ts = wakeSchedule[hour];
- if (ts !== undefined && ts <= now) {
- // Reschedule for next day (recurring daily)
- wakeSchedule[hour] = nextOccurrenceAt15(hour);
- persistSchedule();
-
- // Wake accounts and track failures for retry
- try {
- const results = await wakeAllClaudeAccounts();
- const anyFailed = results.some((r) => !r.ok);
- if (anyFailed) {
- pendingRetries.push({
- retriesLeft: 6,
- nextRetryAt: now + 5 * 60 * 1000,
- });
- }
- } catch {
- // Total failure — schedule retry
- pendingRetries.push({
- retriesLeft: 6,
- nextRetryAt: now + 5 * 60 * 1000,
- });
- }
- }
+ if (ts === undefined || ts > now) continue;
+
+ // Advance the next fire to strictly > now (skips any missed days,
+ // e.g. if the tick was paused for hours).
+ wakeSchedule[hour] = nextDailyAfter(ts, now);
+ persistSchedule();
+ anyFiredThisTick = true;
+ await fireWake(`scheduled wake at hour ${hour}`);
}
- // Process pending retries (iterate backwards for safe splicing)
- for (let i = pendingRetries.length - 1; i >= 0; i--) {
- const retry = pendingRetries[i];
- if (!retry || retry.nextRetryAt > now) continue;
-
- try {
- const results = await wakeAllClaudeAccounts();
- const anyFailed = results.some((r) => !r.ok);
- if (!anyFailed || retry.retriesLeft <= 1) {
- // All succeeded or out of retries — remove
- pendingRetries.splice(i, 1);
- } else {
- retry.retriesLeft--;
- retry.nextRetryAt = now + 5 * 60 * 1000;
- }
- } catch {
- if (retry.retriesLeft <= 1) {
- pendingRetries.splice(i, 1);
- } else {
- retry.retriesLeft--;
- retry.nextRetryAt = now + 5 * 60 * 1000;
- }
- }
+ // Only attempt a retry on ticks that didn't *just* fire — otherwise we'd
+ // race the retry against a fresh attempt within the same loop iteration.
+ if (!anyFiredThisTick) {
+ await processPendingRetry(Date.now());
}
- // Schedule next tick while there's work to monitor
- if (Object.keys(wakeSchedule).length > 0 || pendingRetries.length > 0) {
- (globalThis as Record<string, unknown>)[timerKey] = setTimeout(schedulerTick, 30_000);
+ // Keep ticking while there's anything to monitor.
+ if (Object.keys(wakeSchedule).length > 0 || pendingRetry !== null) {
+ (globalThis as Record<string, unknown>)[timerKey] = setTimeout(
+ schedulerTick,
+ TICK_INTERVAL_MS,
+ );
}
} finally {
isTickRunning = false;
@@ -743,10 +807,25 @@ async function schedulerTick(): Promise<void> {
}
export function startWakeScheduler(): void {
- // Clear any previous timer (HMR-safe — works with Bun's Timer objects)
+ // Clear any previous timer (HMR-safe — works with Bun's Timer objects).
const prev = (globalThis as Record<string, unknown>)[timerKey];
if (prev != null) clearTimeout(prev as ReturnType<typeof setTimeout>);
- schedulerTick();
+ // Fire-and-forget; the tick re-arms itself.
+ void schedulerTick();
+}
+
+/**
+ * Builds the payload returned by the /wake-schedule routes: the current
+ * schedule, the reset-offset constant, the last recorded wake, and the
+ * pending retry (if any).
+ *
+ * The returned object references the live module-level values; nothing is
+ * copied.
+ */
+function scheduleSnapshot(): {
+  schedule: WakeSchedule;
+  resetOffsetHours: number;
+  lastWake: LastWake | null;
+  pendingRetry: PendingRetry | null;
+} {
+  const schedule = wakeSchedule;
+  const resetOffsetHours = CLAUDE_RESET_OFFSET_HOURS;
+  return { schedule, resetOffsetHours, lastWake, pendingRetry };
}
modelsRoutes.post("/wake-schedule/toggle", async (c) => {
@@ -755,26 +834,31 @@ modelsRoutes.post("/wake-schedule/toggle", async (c) => {
if (typeof hour !== "number" || !Number.isFinite(hour) || hour < 0 || hour > 23) {
return c.json({ error: "hour must be a number 0-23" }, 400);
}
+ // Integer-only; reject 4.7, NaN coercions, etc.
+ if (!Number.isInteger(hour)) {
+ return c.json({ error: "hour must be an integer 0-23" }, 400);
+ }
if (wakeSchedule[hour] !== undefined) {
- // Delete
+ // Toggle off — remove the entry.
delete wakeSchedule[hour];
} else {
- // Add — require a future timestamp
+ // Toggle on — require a future absolute timestamp from the client. The
+ // client is the source of truth for *local* wall-clock intent; the
+ // server just stores the ms and rolls it forward by 24h cycles.
const ts = body.timestamp;
- if (typeof ts !== "number" || ts <= Date.now()) {
+ if (typeof ts !== "number" || !Number.isFinite(ts) || ts <= Date.now()) {
return c.json({ error: "timestamp must be a future Unix ms value" }, 400);
}
wakeSchedule[hour] = ts;
}
- // Persist and restart the tick loop
persistSchedule();
startWakeScheduler();
- return c.json({ schedule: wakeSchedule });
+ return c.json(scheduleSnapshot());
});
modelsRoutes.get("/wake-schedule", (c) => {
- return c.json({ schedule: wakeSchedule });
+ return c.json(scheduleSnapshot());
});
diff --git a/packages/api/tests/routes.test.ts b/packages/api/tests/routes.test.ts
index 9ab2afe..e3dff3d 100644
--- a/packages/api/tests/routes.test.ts
+++ b/packages/api/tests/routes.test.ts
@@ -397,3 +397,95 @@ describe("POST /chat/stop", () => {
expect(res.status).toBe(400);
});
});
+// HTTP-level tests for GET /models/wake-schedule and
+// POST /models/wake-schedule/toggle.
+//
+// The toggle route flips state: posting an hour that is already scheduled
+// removes it. Every test that exercises the "add" path therefore first makes
+// sure the hour is not scheduled, so a leftover entry from an earlier run
+// cannot turn an expected add (or an expected 400) into a delete.
+describe("Wake schedule routes", () => {
+  async function getSchedule() {
+    const res = await app.request("/models/wake-schedule");
+    expect(res.status).toBe(200);
+    return (await res.json()) as {
+      schedule: Record<string, number>;
+      resetOffsetHours: number;
+      lastWake: unknown;
+      pendingRetry: unknown;
+    };
+  }
+
+  async function toggle(body: Record<string, unknown>) {
+    return app.request("/models/wake-schedule/toggle", {
+      method: "POST",
+      headers: { "Content-Type": "application/json" },
+      body: JSON.stringify(body),
+    });
+  }
+
+  // Removes each given hour from the schedule if it is currently present.
+  async function ensureUnscheduled(...hours: number[]) {
+    for (const hour of hours) {
+      const { schedule } = await getSchedule();
+      if (schedule[String(hour)] !== undefined) {
+        const res = await toggle({ hour });
+        expect(res.status).toBe(200);
+      }
+    }
+  }
+
+  it("GET returns the full snapshot shape with resetOffsetHours, lastWake, pendingRetry", async () => {
+    const snap = await getSchedule();
+    expect(snap.schedule).toBeDefined();
+    // CLAUDE_RESET_OFFSET_HOURS is currently 5; keep this loose in case the
+    // product changes the constant, but verify it's a positive integer.
+    expect(Number.isInteger(snap.resetOffsetHours)).toBe(true);
+    expect(snap.resetOffsetHours).toBeGreaterThan(0);
+    expect(snap.lastWake).toBeNull();
+    expect(snap.pendingRetry).toBeNull();
+  });
+
+  it("POST toggle adds and removes a wake hour", async () => {
+    await ensureUnscheduled(9);
+    const future = Date.now() + 60 * 60 * 1000; // 1 h ahead
+
+    const addRes = await toggle({ hour: 9, timestamp: future });
+    expect(addRes.status).toBe(200);
+    const addBody = (await addRes.json()) as { schedule: Record<string, number> };
+    expect(addBody.schedule["9"]).toBe(future);
+
+    const removeRes = await toggle({ hour: 9 });
+    expect(removeRes.status).toBe(200);
+    const removeBody = (await removeRes.json()) as { schedule: Record<string, number> };
+    expect(removeBody.schedule["9"]).toBeUndefined();
+  });
+
+  it("POST toggle rejects out-of-range hour", async () => {
+    const res = await toggle({ hour: 24, timestamp: Date.now() + 60_000 });
+    expect(res.status).toBe(400);
+  });
+
+  it("POST toggle rejects negative hour", async () => {
+    const res = await toggle({ hour: -1, timestamp: Date.now() + 60_000 });
+    expect(res.status).toBe(400);
+  });
+
+  it("POST toggle rejects non-integer hour", async () => {
+    const res = await toggle({ hour: 4.5, timestamp: Date.now() + 60_000 });
+    expect(res.status).toBe(400);
+  });
+
+  it("POST toggle rejects past timestamp on add", async () => {
+    // Must be unscheduled first, otherwise the request is a delete (200).
+    await ensureUnscheduled(7);
+    const res = await toggle({ hour: 7, timestamp: Date.now() - 1000 });
+    expect(res.status).toBe(400);
+  });
+
+  it("POST toggle rejects missing timestamp on add", async () => {
+    // Must be unscheduled first, otherwise the request is a delete (200).
+    await ensureUnscheduled(8);
+    const res = await toggle({ hour: 8 });
+    expect(res.status).toBe(400);
+  });
+
+  it("POST toggle: a delete does NOT require a timestamp", async () => {
+    await ensureUnscheduled(11);
+    const future = Date.now() + 60 * 60 * 1000;
+    const addRes = await toggle({ hour: 11, timestamp: future });
+    expect(addRes.status).toBe(200);
+    const delRes = await toggle({ hour: 11 });
+    expect(delRes.status).toBe(200);
+    const body = (await delRes.json()) as { schedule: Record<string, number> };
+    expect(body.schedule["11"]).toBeUndefined();
+  });
+
+  it("snapshot reflects multiple scheduled hours independently", async () => {
+    await ensureUnscheduled(14, 19);
+    const future = Date.now() + 2 * 60 * 60 * 1000;
+    try {
+      await toggle({ hour: 14, timestamp: future });
+      await toggle({ hour: 19, timestamp: future + 60_000 });
+      const snap = await getSchedule();
+      expect(snap.schedule["14"]).toBe(future);
+      expect(snap.schedule["19"]).toBe(future + 60_000);
+    } finally {
+      // Cleanup even when an assertion above fails, so later tests (and
+      // later runs) start clean.
+      await ensureUnscheduled(14, 19);
+    }
+  });
+});