summaryrefslogtreecommitdiffhomepage
path: root/packages/opencode/src/session/processor.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/opencode/src/session/processor.ts')
-rw-r--r--packages/opencode/src/session/processor.ts143
1 files changed, 138 insertions, 5 deletions
diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index b475ec1c5..1a32a656d 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -20,6 +20,9 @@ import { Question } from "@/question"
import { errorMessage } from "@/util/error"
import * as Log from "@opencode-ai/core/util/log"
import { isRecord } from "@/util/record"
+import { EventV2 } from "@/v2/event"
+import { SessionEvent } from "@/v2/session-event"
+import * as DateTime from "effect/DateTime"
const DOOM_LOOP_THRESHOLD = 3
const log = Log.create({ service: "session.processor" })
@@ -221,6 +224,12 @@ export const layer: Layer.Layer<
case "reasoning-start":
if (value.id in ctx.reasoningMap) return
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Reasoning.Started.Sync, {
+ sessionID: ctx.sessionID,
+ reasoningID: value.id,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
ctx.reasoningMap[value.id] = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@@ -248,6 +257,13 @@ export const layer: Layer.Layer<
case "reasoning-end":
if (!(value.id in ctx.reasoningMap)) return
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Reasoning.Ended.Sync, {
+ sessionID: ctx.sessionID,
+ reasoningID: value.id,
+ text: ctx.reasoningMap[value.id].text,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
// oxlint-disable-next-line no-self-assign -- reactivity trigger
ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
@@ -260,6 +276,13 @@ export const layer: Layer.Layer<
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Tool.Input.Started.Sync, {
+ sessionID: ctx.sessionID,
+ callID: value.id,
+ name: value.toolName,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
const part = yield* session.updatePart({
id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
messageID: ctx.assistantMessage.id,
@@ -281,13 +304,34 @@ export const layer: Layer.Layer<
case "tool-input-delta":
return
- case "tool-input-end":
+ case "tool-input-end": {
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Tool.Input.Ended.Sync, {
+ sessionID: ctx.sessionID,
+ callID: value.id,
+ text: "",
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
return
+ }
case "tool-call": {
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
+ const toolCall = yield* readToolCall(value.toolCallId)
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Tool.Called.Sync, {
+ sessionID: ctx.sessionID,
+ callID: value.toolCallId,
+ tool: value.toolName,
+ input: value.input,
+ provider: {
+ executed: toolCall?.part.metadata?.providerExecuted === true,
+ ...(value.providerMetadata ? { metadata: value.providerMetadata } : {}),
+ },
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
yield* updateToolCall(value.toolCallId, (match) => ({
...match,
tool: value.toolName,
@@ -331,11 +375,48 @@ export const layer: Layer.Layer<
}
case "tool-result": {
+ const toolCall = yield* readToolCall(value.toolCallId)
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Tool.Success.Sync, {
+ sessionID: ctx.sessionID,
+ callID: value.toolCallId,
+ structured: value.output.metadata,
+ content: [
+ {
+ type: "text",
+ text: value.output.output,
+ },
+ ...(value.output.attachments?.map((item: MessageV2.FilePart) => ({
+ type: "file",
+ uri: item.url,
+ mime: item.mime,
+ name: item.filename,
+ })) ?? []),
+ ],
+ provider: {
+ executed: toolCall?.part.metadata?.providerExecuted === true,
+ },
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
yield* completeToolCall(value.toolCallId, value.output)
return
}
case "tool-error": {
+ const toolCall = yield* readToolCall(value.toolCallId)
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Tool.Error.Sync, {
+ sessionID: ctx.sessionID,
+ callID: value.toolCallId,
+ error: {
+ type: "unknown",
+ message: errorMessage(value.error),
+ },
+ provider: {
+ executed: toolCall?.part.metadata?.providerExecuted === true,
+ },
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
yield* failToolCall(value.toolCallId, value.error)
return
}
@@ -345,6 +426,20 @@ export const layer: Layer.Layer<
case "start-step":
if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
+ if (!ctx.assistantMessage.summary) {
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Step.Started.Sync, {
+ sessionID: ctx.sessionID,
+ agent: input.assistantMessage.agent,
+ model: {
+ id: ctx.model.id,
+ providerID: ctx.model.providerID,
+ variant: input.assistantMessage.variant,
+ },
+ snapshot: ctx.snapshot,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
yield* session.updatePart({
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@@ -355,18 +450,30 @@ export const layer: Layer.Layer<
return
case "finish-step": {
+ const completedSnapshot = yield* snapshot.track()
const usage = Session.getUsage({
model: ctx.model,
usage: value.usage,
metadata: value.providerMetadata,
})
+ if (!ctx.assistantMessage.summary) {
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Step.Ended.Sync, {
+ sessionID: ctx.sessionID,
+ finish: value.finishReason,
+ cost: usage.cost,
+ tokens: usage.tokens,
+ snapshot: completedSnapshot,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
ctx.assistantMessage.finish = value.finishReason
ctx.assistantMessage.cost += usage.cost
ctx.assistantMessage.tokens = usage.tokens
yield* session.updatePart({
id: PartID.ascending(),
reason: value.finishReason,
- snapshot: yield* snapshot.track(),
+ snapshot: completedSnapshot,
messageID: ctx.assistantMessage.id,
sessionID: ctx.assistantMessage.sessionID,
type: "step-finish",
@@ -404,6 +511,13 @@ export const layer: Layer.Layer<
}
case "text-start":
+ if (!ctx.assistantMessage.summary) {
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Text.Started.Sync, {
+ sessionID: ctx.sessionID,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
ctx.currentText = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
@@ -442,6 +556,14 @@ export const layer: Layer.Layer<
},
{ text: ctx.currentText.text },
)).text
+ if (!ctx.assistantMessage.summary) {
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Text.Ended.Sync, {
+ sessionID: ctx.sessionID,
+ text: ctx.currentText.text,
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ }
{
const end = Date.now()
ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
@@ -568,13 +690,24 @@ export const layer: Layer.Layer<
Effect.retry(
SessionRetry.policy({
parse,
- set: (info) =>
- status.set(ctx.sessionID, {
+ set: (info) => {
+ // TODO(v2): Temporary dual-write while migrating session messages to v2 events.
+ EventV2.run(SessionEvent.Retried.Sync, {
+ sessionID: ctx.sessionID,
+ attempt: info.attempt,
+ error: {
+ message: info.message,
+ isRetryable: true,
+ },
+ timestamp: DateTime.makeUnsafe(Date.now()),
+ })
+ return status.set(ctx.sessionID, {
type: "retry",
attempt: info.attempt,
message: info.message,
next: info.next,
- }),
+ })
+ },
}),
),
Effect.catch(halt),