diff options
Diffstat (limited to 'packages/opencode/src/session/processor.ts')
| -rw-r--r-- | packages/opencode/src/session/processor.ts | 143 |
1 files changed, 138 insertions, 5 deletions
diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index b475ec1c5..1a32a656d 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -20,6 +20,9 @@ import { Question } from "@/question" import { errorMessage } from "@/util/error" import * as Log from "@opencode-ai/core/util/log" import { isRecord } from "@/util/record" +import { EventV2 } from "@/v2/event" +import { SessionEvent } from "@/v2/session-event" +import * as DateTime from "effect/DateTime" const DOOM_LOOP_THRESHOLD = 3 const log = Log.create({ service: "session.processor" }) @@ -221,6 +224,12 @@ export const layer: Layer.Layer< case "reasoning-start": if (value.id in ctx.reasoningMap) return + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Reasoning.Started.Sync, { + sessionID: ctx.sessionID, + reasoningID: value.id, + timestamp: DateTime.makeUnsafe(Date.now()), + }) ctx.reasoningMap[value.id] = { id: PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -248,6 +257,13 @@ export const layer: Layer.Layer< case "reasoning-end": if (!(value.id in ctx.reasoningMap)) return + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Reasoning.Ended.Sync, { + sessionID: ctx.sessionID, + reasoningID: value.id, + text: ctx.reasoningMap[value.id].text, + timestamp: DateTime.makeUnsafe(Date.now()), + }) // oxlint-disable-next-line no-self-assign -- reactivity trigger ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() } @@ -260,6 +276,13 @@ export const layer: Layer.Layer< if (ctx.assistantMessage.summary) { throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) } + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Tool.Input.Started.Sync, { + sessionID: ctx.sessionID, + callID: value.id, + name: value.toolName, + timestamp: DateTime.makeUnsafe(Date.now()), + }) const part = yield* session.updatePart({ id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -281,13 +304,34 @@ export const layer: Layer.Layer< case "tool-input-delta": return - case "tool-input-end": + case "tool-input-end": { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Tool.Input.Ended.Sync, { + sessionID: ctx.sessionID, + callID: value.id, + text: "", + timestamp: DateTime.makeUnsafe(Date.now()), + }) return + } case "tool-call": { if (ctx.assistantMessage.summary) { throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) } + const toolCall = yield* readToolCall(value.toolCallId) + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Tool.Called.Sync, { + sessionID: ctx.sessionID, + callID: value.toolCallId, + tool: value.toolName, + input: value.input, + provider: { + executed: toolCall?.part.metadata?.providerExecuted === true, + ...(value.providerMetadata ? { metadata: value.providerMetadata } : {}), + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) yield* updateToolCall(value.toolCallId, (match) => ({ ...match, tool: value.toolName, @@ -331,11 +375,48 @@ export const layer: Layer.Layer< } case "tool-result": { + const toolCall = yield* readToolCall(value.toolCallId) + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Tool.Success.Sync, { + sessionID: ctx.sessionID, + callID: value.toolCallId, + structured: value.output.metadata, + content: [ + { + type: "text", + text: value.output.output, + }, + ...(value.output.attachments?.map((item: MessageV2.FilePart) => ({ + type: "file", + uri: item.url, + mime: item.mime, + name: item.filename, + })) ?? []), + ], + provider: { + executed: toolCall?.part.metadata?.providerExecuted === true, + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) yield* completeToolCall(value.toolCallId, value.output) return } case "tool-error": { + const toolCall = yield* readToolCall(value.toolCallId) + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Tool.Error.Sync, { + sessionID: ctx.sessionID, + callID: value.toolCallId, + error: { + type: "unknown", + message: errorMessage(value.error), + }, + provider: { + executed: toolCall?.part.metadata?.providerExecuted === true, + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) yield* failToolCall(value.toolCallId, value.error) return } @@ -345,6 +426,20 @@ export const layer: Layer.Layer< case "start-step": if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track() + if (!ctx.assistantMessage.summary) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Step.Started.Sync, { + sessionID: ctx.sessionID, + agent: input.assistantMessage.agent, + model: { + id: ctx.model.id, + providerID: ctx.model.providerID, + variant: input.assistantMessage.variant, + }, + snapshot: ctx.snapshot, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } yield* session.updatePart({ id: PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -355,18 +450,30 @@ export const layer: Layer.Layer< return case "finish-step": { + const completedSnapshot = yield* snapshot.track() const usage = Session.getUsage({ model: ctx.model, usage: value.usage, metadata: value.providerMetadata, }) + if (!ctx.assistantMessage.summary) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Step.Ended.Sync, { + sessionID: ctx.sessionID, + finish: value.finishReason, + cost: usage.cost, + tokens: usage.tokens, + snapshot: completedSnapshot, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } ctx.assistantMessage.finish = value.finishReason ctx.assistantMessage.cost += usage.cost ctx.assistantMessage.tokens = usage.tokens yield* session.updatePart({ id: PartID.ascending(), reason: value.finishReason, - snapshot: yield* snapshot.track(), + snapshot: completedSnapshot, messageID: ctx.assistantMessage.id, sessionID: ctx.assistantMessage.sessionID, type: "step-finish", @@ -404,6 +511,13 @@ export const layer: Layer.Layer< } case "text-start": + if (!ctx.assistantMessage.summary) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Text.Started.Sync, { + sessionID: ctx.sessionID, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } ctx.currentText = { id: PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -442,6 +556,14 @@ export const layer: Layer.Layer< }, { text: ctx.currentText.text }, )).text + if (!ctx.assistantMessage.summary) { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Text.Ended.Sync, { + sessionID: ctx.sessionID, + text: ctx.currentText.text, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } { const end = Date.now() ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end } @@ -568,13 +690,24 @@ export const layer: Layer.Layer< Effect.retry( SessionRetry.policy({ parse, - set: (info) => - status.set(ctx.sessionID, { + set: (info) => { + // TODO(v2): Temporary dual-write while migrating session messages to v2 events. + EventV2.run(SessionEvent.Retried.Sync, { + sessionID: ctx.sessionID, + attempt: info.attempt, + error: { + message: info.message, + isRetryable: true, + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + return status.set(ctx.sessionID, { type: "retry", attempt: info.attempt, message: info.message, next: info.next, - }), + }) + }, }), ), Effect.catch(halt), |
