Skip to content

Commit d431dac

Browse files
committed
fix(session): merge adjacent reasoning cycles
1 parent 53e89f9 commit d431dac

2 files changed

Lines changed: 107 additions & 1 deletion

File tree

packages/opencode/src/session/processor.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ interface ProcessorContext extends Input {
7979
needsCompaction: boolean
8080
currentText: MessageV2.TextPart | undefined
8181
reasoningMap: Record<string, MessageV2.ReasoningPart>
82+
lastReasoningPart: MessageV2.ReasoningPart | undefined
8283
}
8384

8485
type StreamEvent = Event
@@ -119,6 +120,7 @@ export const layer = Layer.effect(
119120
needsCompaction: false,
120121
currentText: undefined,
121122
reasoningMap: {},
123+
lastReasoningPart: undefined,
122124
}
123125
let aborted = false
124126
const slog = log.clone().tag("session.id", input.sessionID).tag("messageID", input.assistantMessage.id)
@@ -227,6 +229,16 @@ export const layer = Layer.effect(
227229
timestamp: DateTime.makeUnsafe(Date.now()),
228230
})
229231
}
232+
if (ctx.lastReasoningPart) {
233+
ctx.reasoningMap[value.id] = {
234+
...ctx.lastReasoningPart,
235+
time: { start: ctx.lastReasoningPart.time.start },
236+
metadata: value.providerMetadata ?? ctx.lastReasoningPart.metadata,
237+
}
238+
ctx.lastReasoningPart = undefined
239+
yield* session.updatePart(ctx.reasoningMap[value.id])
240+
return
241+
}
230242
ctx.reasoningMap[value.id] = {
231243
id: PartID.ascending(),
232244
messageID: ctx.assistantMessage.id,
@@ -268,10 +280,12 @@ export const layer = Layer.effect(
268280
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
269281
if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata
270282
yield* session.updatePart(ctx.reasoningMap[value.id])
283+
ctx.lastReasoningPart = ctx.reasoningMap[value.id]
271284
delete ctx.reasoningMap[value.id]
272285
return
273286

274287
case "tool-input-start":
288+
ctx.lastReasoningPart = undefined
275289
if (ctx.assistantMessage.summary) {
276290
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
277291
}
@@ -319,6 +333,7 @@ export const layer = Layer.effect(
319333
}
320334

321335
case "tool-call": {
336+
ctx.lastReasoningPart = undefined
322337
if (ctx.assistantMessage.summary) {
323338
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
324339
}
@@ -464,6 +479,7 @@ export const layer = Layer.effect(
464479
throw value.error
465480

466481
case "start-step":
482+
ctx.lastReasoningPart = undefined
467483
if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
468484
if (!ctx.assistantMessage.summary) {
469485
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
@@ -491,6 +507,7 @@ export const layer = Layer.effect(
491507
return
492508

493509
case "finish-step": {
510+
ctx.lastReasoningPart = undefined
494511
const completedSnapshot = yield* snapshot.track()
495512
const usage = Session.getUsage({
496513
model: ctx.model,
@@ -554,6 +571,7 @@ export const layer = Layer.effect(
554571
}
555572

556573
case "text-start":
574+
ctx.lastReasoningPart = undefined
557575
if (!ctx.assistantMessage.summary) {
558576
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
559577
if (flags.experimentalEventSystem) {
@@ -660,6 +678,7 @@ export const layer = Layer.effect(
660678
})
661679
}
662680
ctx.reasoningMap = {}
681+
ctx.lastReasoningPart = undefined
663682

664683
yield* Effect.forEach(
665684
Object.values(ctx.toolcalls),
@@ -727,6 +746,7 @@ export const layer = Layer.effect(
727746
yield* Effect.gen(function* () {
728747
ctx.currentText = undefined
729748
ctx.reasoningMap = {}
749+
ctx.lastReasoningPart = undefined
730750
const stream = llm.stream(streamInput)
731751

732752
yield* stream.pipe(
@@ -738,6 +758,7 @@ export const layer = Layer.effect(
738758
Effect.onInterrupt(() =>
739759
Effect.gen(function* () {
740760
aborted = true
761+
ctx.lastReasoningPart = undefined
741762
if (!ctx.assistantMessage.error) {
742763
yield* halt(new DOMException("Aborted", "AbortError"))
743764
}

packages/opencode/test/session/processor-effect.test.ts

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { NodeFileSystem } from "@effect/platform-node"
22
import { expect } from "bun:test"
3-
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
3+
import { Cause, Effect, Exit, Fiber, Layer, Stream } from "effect"
44
import path from "path"
55
import type { Agent } from "../../src/agent/agent"
66
import { Agent as AgentSvc } from "../../src/agent/agent"
@@ -183,6 +183,18 @@ const deps = Layer.mergeAll(
183183
SyncEvent.defaultLayer,
184184
EventV2Bridge.defaultLayer,
185185
).pipe(Layer.provideMerge(infra))
186+
const depsWithoutLLM = Layer.mergeAll(
187+
Session.defaultLayer,
188+
Snapshot.defaultLayer,
189+
AgentSvc.defaultLayer,
190+
Permission.defaultLayer,
191+
Plugin.defaultLayer,
192+
Config.defaultLayer,
193+
Provider.defaultLayer,
194+
status,
195+
SyncEvent.defaultLayer,
196+
EventV2Bridge.defaultLayer,
197+
).pipe(Layer.provideMerge(infra))
186198
const env = Layer.mergeAll(
187199
TestLLMServer.layer,
188200
SessionProcessor.layer.pipe(
@@ -195,6 +207,33 @@ const env = Layer.mergeAll(
195207

196208
const it = testEffect(env)
197209

210+
const splitReasoningLLM = Layer.succeed(
211+
LLM.Service,
212+
LLM.Service.of({
213+
stream: () =>
214+
Stream.make(
215+
{ type: "reasoning-start", id: "reason-1" } as LLM.Event,
216+
{ type: "reasoning-delta", id: "reason-1", text: "The" } as LLM.Event,
217+
{ type: "reasoning-end", id: "reason-1" } as LLM.Event,
218+
{ type: "reasoning-start", id: "reason-2" } as LLM.Event,
219+
{ type: "reasoning-delta", id: "reason-2", text: " user" } as LLM.Event,
220+
{ type: "reasoning-end", id: "reason-2" } as LLM.Event,
221+
{ type: "finish" } as LLM.Event,
222+
),
223+
}),
224+
)
225+
const splitReasoningEnv = Layer.mergeAll(
226+
TestLLMServer.layer,
227+
SessionProcessor.layer.pipe(
228+
Layer.provide(summary),
229+
Layer.provide(Image.defaultLayer),
230+
Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
231+
Layer.provide(splitReasoningLLM),
232+
Layer.provideMerge(depsWithoutLLM),
233+
),
234+
)
235+
const splitReasoningIt = testEffect(splitReasoningEnv)
236+
198237
const boot = Effect.fn("test.boot")(function* () {
199238
const processors = yield* SessionProcessor.Service
200239
const session = yield* Session.Service
@@ -429,6 +468,52 @@ it.live("session.processor effect tests capture reasoning from http mock", () =>
429468
),
430469
)
431470

471+
splitReasoningIt.live("session.processor effect tests merge adjacent reasoning cycles", () =>
472+
provideTmpdirServer(
473+
({ dir }) =>
474+
Effect.gen(function* () {
475+
const { processors, session, provider } = yield* boot()
476+
477+
const chat = yield* session.create({})
478+
const parent = yield* user(chat.id, "reason")
479+
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
480+
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
481+
const handle = yield* processors.create({
482+
assistantMessage: msg,
483+
sessionID: chat.id,
484+
model: mdl,
485+
})
486+
487+
const value = yield* handle.process({
488+
user: {
489+
id: parent.id,
490+
sessionID: chat.id,
491+
role: "user",
492+
time: parent.time,
493+
agent: parent.agent,
494+
model: { providerID: ref.providerID, modelID: ref.modelID },
495+
} satisfies MessageV2.User,
496+
sessionID: chat.id,
497+
model: mdl,
498+
agent: agent(),
499+
system: [],
500+
messages: [{ role: "user", content: "reason" }],
501+
tools: {},
502+
})
503+
504+
const reasoning = MessageV2.parts(msg.id).filter(
505+
(part): part is MessageV2.ReasoningPart => part.type === "reasoning",
506+
)
507+
508+
expect(value).toBe("continue")
509+
expect(reasoning).toHaveLength(1)
510+
expect(reasoning[0]?.text).toBe("The user")
511+
expect(reasoning[0]?.time.end).toBeDefined()
512+
}),
513+
{ git: true, config: (url) => providerCfg(url) },
514+
),
515+
)
516+
432517
it.live("session.processor effect tests reset reasoning state across retries", () =>
433518
provideTmpdirServer(
434519
({ dir, llm }) =>

0 commit comments

Comments
 (0)