// This file is auto-generated by @hey-api/openapi-ts

import type { Config } from "./types.gen.js"

/**
 * Options accepted by {@link createSseClient}: the standard `RequestInit`
 * fields (minus `method`, which comes from `Config`) plus SSE-specific hooks
 * and retry settings.
 */
export type ServerSentEventsOptions<TData = unknown> = Omit<RequestInit, "method"> &
  Pick<Config, "method" | "responseTransformer" | "responseValidator"> & {
    /**
     * Fetch API implementation. You can use this option to provide a custom
     * fetch instance.
     *
     * @default globalThis.fetch
     */
    fetch?: typeof fetch
    /**
     * Implementing clients can call request interceptors inside this hook.
     */
    onRequest?: (url: string, init: RequestInit) => Promise<Request>
    /**
     * Callback invoked when a network or parsing error occurs during streaming.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @param error The error that occurred.
     */
    onSseError?: (error: unknown) => void
    /**
     * Callback invoked when an event is streamed from the server.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @param event Event streamed from the server.
     * @returns Nothing (void).
     */
    onSseEvent?: (event: StreamEvent<TData>) => void
    serializedBody?: RequestInit["body"]
    /**
     * Default retry delay in milliseconds.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @default 3000
     */
    sseDefaultRetryDelay?: number
    /**
     * Maximum number of retry attempts before giving up.
     */
    sseMaxRetryAttempts?: number
    /**
     * Maximum retry delay in milliseconds.
     *
     * Applies only when exponential backoff is used.
     *
     * This option applies only if the endpoint returns a stream of events.
     *
     * @default 30000
     */
    sseMaxRetryDelay?: number
    /**
     * Optional sleep function for retry backoff.
     *
     * Defaults to using `setTimeout`.
     */
    sseSleepFn?: (ms: number) => Promise<void>
    url: string
  }

/**
 * A single server-sent event as passed to `onSseEvent`.
 */
export interface StreamEvent<TData = unknown> {
  data: TData
  event?: string
  id?: string
  retry?: number
}

/**
 * Result of {@link createSseClient}: an async generator yielding the payload
 * of every event that carried at least one `data:` line.
 */
export type ServerSentEventsResult<TData = unknown, TReturn = void, TNext = unknown> = {
  stream: AsyncGenerator<TData extends Record<string, unknown> ? TData[keyof TData] : TData, TReturn, TNext>
}

/**
 * Creates a lazily started server-sent events stream.
 *
 * The request is only issued once the returned generator is iterated. Each
 * event block is parsed for `data:`, `event:`, `id:` and `retry:` fields;
 * joined data lines are JSON-parsed when possible (and then passed through
 * `responseValidator` / `responseTransformer`), otherwise delivered as the raw
 * string. `onSseEvent` fires for every event block, while the generator only
 * yields blocks that contained data.
 *
 * When the request or the stream fails, `onSseError` is called and the
 * connection is retried with exponential backoff (sending `Last-Event-ID` if
 * an id was seen) until `sseMaxRetryAttempts` is reached or the signal is
 * aborted. A stream that ends normally is not reconnected.
 *
 * @returns An object whose `stream` property is the async generator.
 */
export const createSseClient = <TData = unknown>({
  onRequest,
  onSseError,
  onSseEvent,
  responseTransformer,
  responseValidator,
  sseDefaultRetryDelay,
  sseMaxRetryAttempts,
  sseMaxRetryDelay,
  sseSleepFn,
  url,
  ...options
}: ServerSentEventsOptions): ServerSentEventsResult<TData> => {
  let lastEventId: string | undefined

  const sleep = sseSleepFn ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)))

  // Field prefixes (including any whitespace after the colon) stripped from each line.
  const dataPrefix = /^data:\s*/
  const eventPrefix = /^event:\s*/
  const idPrefix = /^id:\s*/
  const retryPrefix = /^retry:\s*/

  const createStream = async function* () {
    let retryDelay: number = sseDefaultRetryDelay ?? 3000
    let attempt = 0
    const signal = options.signal ?? new AbortController().signal

    while (true) {
      if (signal.aborted) break

      attempt++

      // Always copy: setting Last-Event-ID must not mutate a Headers instance
      // owned by the caller.
      const headers = new Headers(options.headers)

      if (lastEventId !== undefined) {
        headers.set("Last-Event-ID", lastEventId)
      }

      try {
        const requestInit: RequestInit = {
          redirect: "follow",
          ...options,
          body: options.serializedBody,
          headers,
          signal,
        }
        let request = new Request(url, requestInit)
        if (onRequest) {
          request = await onRequest(url, requestInit)
        }
        // fetch must be assigned here, otherwise it would throw the error:
        // TypeError: Failed to execute 'fetch' on 'Window': Illegal invocation
        const _fetch = options.fetch ?? globalThis.fetch
        const response = await _fetch(request)

        if (!response.ok) throw new Error(`SSE failed: ${response.status} ${response.statusText}`)

        if (!response.body) throw new Error("No body in SSE response")

        const reader = response.body.pipeThrough(new TextDecoderStream()).getReader()

        let buffer = ""

        const abortHandler = () => {
          // cancel() returns a promise; a synchronous try/catch would not see
          // its rejection, so swallow it explicitly (best effort).
          reader.cancel().catch(() => {
            // noop
          })
        }

        signal.addEventListener("abort", abortHandler)

        try {
          let streamEnded = false
          while (!streamEnded) {
            const { done, value } = await reader.read()
            if (done) {
              // Only a held-back trailing CR needs one final pass; anything
              // else left in the buffer is an incomplete event and is dropped.
              if (!buffer.endsWith("\r")) break
              streamEnded = true
            } else {
              buffer += value
            }

            // A trailing CR may be the first half of a CRLF that is split
            // across two chunks. Hold it back until the next chunk (or the end
            // of the stream) shows whether an LF follows; normalizing it now
            // would fabricate an extra blank line and split one event in two.
            let heldCr = ""
            if (!streamEnded && buffer.endsWith("\r")) {
              buffer = buffer.slice(0, -1)
              heldCr = "\r"
            }

            // Normalize line endings: CRLF -> LF, then CR -> LF
            buffer = buffer.replace(/\r\n/g, "\n").replace(/\r/g, "\n")

            // Events are separated by a blank line; the last piece is incomplete.
            const chunks = buffer.split("\n\n")
            buffer = (chunks.pop() ?? "") + heldCr

            for (const chunk of chunks) {
              const lines = chunk.split("\n")
              const dataLines: Array<string> = []
              let eventName: string | undefined

              for (const line of lines) {
                if (line.startsWith("data:")) {
                  dataLines.push(line.replace(dataPrefix, ""))
                } else if (line.startsWith("event:")) {
                  eventName = line.replace(eventPrefix, "")
                } else if (line.startsWith("id:")) {
                  lastEventId = line.replace(idPrefix, "")
                } else if (line.startsWith("retry:")) {
                  const parsed = Number.parseInt(line.replace(retryPrefix, ""), 10)
                  if (!Number.isNaN(parsed)) {
                    retryDelay = parsed
                  }
                }
              }

              let data: unknown
              let parsedJson = false

              if (dataLines.length) {
                const rawData = dataLines.join("\n")
                try {
                  data = JSON.parse(rawData)
                  parsedJson = true
                } catch {
                  // not JSON: deliver the raw text
                  data = rawData
                }
              }

              // Validation and transformation only apply to JSON payloads.
              if (parsedJson) {
                if (responseValidator) {
                  await responseValidator(data)
                }

                if (responseTransformer) {
                  data = await responseTransformer(data)
                }
              }

              onSseEvent?.({
                data,
                event: eventName,
                id: lastEventId,
                retry: retryDelay,
              })

              if (dataLines.length) {
                // The yielded type is derived from TData, which cannot be
                // checked against runtime data here.
                yield data as any
              }
            }
          }
        } finally {
          signal.removeEventListener("abort", abortHandler)
          reader.releaseLock()
        }

        break // exit loop on normal completion
      } catch (error) {
        // connection failed or aborted; retry after delay
        onSseError?.(error)

        // An aborted signal ends the stream; do not wait out a backoff first.
        if (signal.aborted) break

        if (sseMaxRetryAttempts !== undefined && attempt >= sseMaxRetryAttempts) {
          break // stop after firing error
        }

        // exponential backoff: double the delay each attempt, capped at
        // sseMaxRetryDelay (30000 ms when not provided)
        const backoff = Math.min(retryDelay * 2 ** (attempt - 1), sseMaxRetryDelay ?? 30000)
        await sleep(backoff)
      }
    }
  }

  const stream = createStream()

  return { stream }
}