Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ interface ProcessorContext extends Input {
needsCompaction: boolean
currentText: MessageV2.TextPart | undefined
reasoningMap: Record<string, MessageV2.ReasoningPart>
lastReasoningPart: MessageV2.ReasoningPart | undefined
}

type StreamEvent = Event
Expand Down Expand Up @@ -119,6 +120,7 @@ export const layer = Layer.effect(
needsCompaction: false,
currentText: undefined,
reasoningMap: {},
lastReasoningPart: undefined,
}
let aborted = false
const slog = log.clone().tag("session.id", input.sessionID).tag("messageID", input.assistantMessage.id)
Expand Down Expand Up @@ -227,6 +229,16 @@ export const layer = Layer.effect(
timestamp: DateTime.makeUnsafe(Date.now()),
})
}
if (ctx.lastReasoningPart) {
ctx.reasoningMap[value.id] = {
...ctx.lastReasoningPart,
time: { start: ctx.lastReasoningPart.time.start },
metadata: value.providerMetadata ?? ctx.lastReasoningPart.metadata,
}
ctx.lastReasoningPart = undefined
yield* session.updatePart(ctx.reasoningMap[value.id])
return
}
ctx.reasoningMap[value.id] = {
id: PartID.ascending(),
messageID: ctx.assistantMessage.id,
Expand Down Expand Up @@ -268,10 +280,12 @@ export const layer = Layer.effect(
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
yield* session.updatePart(ctx.reasoningMap[value.id])
ctx.lastReasoningPart = ctx.reasoningMap[value.id]
delete ctx.reasoningMap[value.id]
return

case "tool-input-start":
ctx.lastReasoningPart = undefined
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
Expand Down Expand Up @@ -319,6 +333,7 @@ export const layer = Layer.effect(
}

case "tool-call": {
ctx.lastReasoningPart = undefined
if (ctx.assistantMessage.summary) {
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
}
Expand Down Expand Up @@ -464,6 +479,7 @@ export const layer = Layer.effect(
throw value.error

case "start-step":
ctx.lastReasoningPart = undefined
if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
Expand Down Expand Up @@ -491,6 +507,7 @@ export const layer = Layer.effect(
return

case "finish-step": {
ctx.lastReasoningPart = undefined
const completedSnapshot = yield* snapshot.track()
const usage = Session.getUsage({
model: ctx.model,
Expand Down Expand Up @@ -554,6 +571,7 @@ export const layer = Layer.effect(
}

case "text-start":
ctx.lastReasoningPart = undefined
if (!ctx.assistantMessage.summary) {
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
if (flags.experimentalEventSystem) {
Expand Down Expand Up @@ -660,6 +678,7 @@ export const layer = Layer.effect(
})
}
ctx.reasoningMap = {}
ctx.lastReasoningPart = undefined

yield* Effect.forEach(
Object.values(ctx.toolcalls),
Expand Down Expand Up @@ -727,6 +746,7 @@ export const layer = Layer.effect(
yield* Effect.gen(function* () {
ctx.currentText = undefined
ctx.reasoningMap = {}
ctx.lastReasoningPart = undefined
const stream = llm.stream(streamInput)

yield* stream.pipe(
Expand All @@ -738,6 +758,7 @@ export const layer = Layer.effect(
Effect.onInterrupt(() =>
Effect.gen(function* () {
aborted = true
ctx.lastReasoningPart = undefined
if (!ctx.assistantMessage.error) {
yield* halt(new DOMException("Aborted", "AbortError"))
}
Expand Down
87 changes: 86 additions & 1 deletion packages/opencode/test/session/processor-effect.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
import { Cause, Effect, Exit, Fiber, Layer, Stream } from "effect"
import path from "path"
import type { Agent } from "../../src/agent/agent"
import { Agent as AgentSvc } from "../../src/agent/agent"
Expand Down Expand Up @@ -183,6 +183,18 @@ const deps = Layer.mergeAll(
SyncEvent.defaultLayer,
EventV2Bridge.defaultLayer,
).pipe(Layer.provideMerge(infra))
const depsWithoutLLM = Layer.mergeAll(
Session.defaultLayer,
Snapshot.defaultLayer,
AgentSvc.defaultLayer,
Permission.defaultLayer,
Plugin.defaultLayer,
Config.defaultLayer,
Provider.defaultLayer,
status,
SyncEvent.defaultLayer,
EventV2Bridge.defaultLayer,
).pipe(Layer.provideMerge(infra))
const env = Layer.mergeAll(
TestLLMServer.layer,
SessionProcessor.layer.pipe(
Expand All @@ -195,6 +207,33 @@ const env = Layer.mergeAll(

const it = testEffect(env)

const splitReasoningLLM = Layer.succeed(
LLM.Service,
LLM.Service.of({
stream: () =>
Stream.make(
{ type: "reasoning-start", id: "reason-1" } as LLM.Event,
{ type: "reasoning-delta", id: "reason-1", text: "The" } as LLM.Event,
{ type: "reasoning-end", id: "reason-1" } as LLM.Event,
{ type: "reasoning-start", id: "reason-2" } as LLM.Event,
{ type: "reasoning-delta", id: "reason-2", text: " user" } as LLM.Event,
{ type: "reasoning-end", id: "reason-2" } as LLM.Event,
{ type: "finish" } as LLM.Event,
),
}),
)
const splitReasoningEnv = Layer.mergeAll(
TestLLMServer.layer,
SessionProcessor.layer.pipe(
Layer.provide(summary),
Layer.provide(Image.defaultLayer),
Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
Layer.provide(splitReasoningLLM),
Layer.provideMerge(depsWithoutLLM),
),
)
const splitReasoningIt = testEffect(splitReasoningEnv)

const boot = Effect.fn("test.boot")(function* () {
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
Expand Down Expand Up @@ -429,6 +468,52 @@ it.live("session.processor effect tests capture reasoning from http mock", () =>
),
)

splitReasoningIt.live("session.processor effect tests merge adjacent reasoning cycles", () =>
provideTmpdirServer(
({ dir }) =>
Effect.gen(function* () {
const { processors, session, provider } = yield* boot()

const chat = yield* session.create({})
const parent = yield* user(chat.id, "reason")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
})

const value = yield* handle.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
messages: [{ role: "user", content: "reason" }],
tools: {},
})

const reasoning = MessageV2.parts(msg.id).filter(
(part): part is MessageV2.ReasoningPart => part.type === "reasoning",
)

expect(value).toBe("continue")
expect(reasoning).toHaveLength(1)
expect(reasoning[0]?.text).toBe("The user")
expect(reasoning[0]?.time.end).toBeDefined()
}),
{ git: true, config: (url) => providerCfg(url) },
),
)

it.live("session.processor effect tests reset reasoning state across retries", () =>
provideTmpdirServer(
({ dir, llm }) =>
Expand Down
Loading