diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index aac893075f00..6d47e3147b23 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -79,6 +79,7 @@ interface ProcessorContext extends Input { needsCompaction: boolean currentText: MessageV2.TextPart | undefined reasoningMap: Record + lastReasoningPart: MessageV2.ReasoningPart | undefined } type StreamEvent = Event @@ -119,6 +120,7 @@ export const layer = Layer.effect( needsCompaction: false, currentText: undefined, reasoningMap: {}, + lastReasoningPart: undefined, } let aborted = false const slog = log.clone().tag("session.id", input.sessionID).tag("messageID", input.assistantMessage.id) @@ -227,6 +229,16 @@ export const layer = Layer.effect( timestamp: DateTime.makeUnsafe(Date.now()), }) } + if (ctx.lastReasoningPart) { + ctx.reasoningMap[value.id] = { + ...ctx.lastReasoningPart, + time: { start: ctx.lastReasoningPart.time.start }, + metadata: value.providerMetadata ?? ctx.lastReasoningPart.metadata, + } + ctx.lastReasoningPart = undefined + yield* session.updatePart(ctx.reasoningMap[value.id]) + return + } ctx.reasoningMap[value.id] = { id: PartID.ascending(), messageID: ctx.assistantMessage.id, @@ -268,10 +280,12 @@ export const layer = Layer.effect( ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() } if (value.providerMetadata) ctx.reasoningMap[value.id].metadata = value.providerMetadata yield* session.updatePart(ctx.reasoningMap[value.id]) + ctx.lastReasoningPart = ctx.reasoningMap[value.id] delete ctx.reasoningMap[value.id] return case "tool-input-start": + ctx.lastReasoningPart = undefined if (ctx.assistantMessage.summary) { throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) } @@ -319,6 +333,7 @@ export const layer = Layer.effect( } case "tool-call": { + ctx.lastReasoningPart = undefined if (ctx.assistantMessage.summary) { throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`) } @@ -464,6 +479,7 @@ export const layer = Layer.effect( throw value.error case "start-step": + ctx.lastReasoningPart = undefined if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track() if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. @@ -491,6 +507,7 @@ export const layer = Layer.effect( return case "finish-step": { + ctx.lastReasoningPart = undefined const completedSnapshot = yield* snapshot.track() const usage = Session.getUsage({ model: ctx.model, @@ -554,6 +571,7 @@ export const layer = Layer.effect( } case "text-start": + ctx.lastReasoningPart = undefined if (!ctx.assistantMessage.summary) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. if (flags.experimentalEventSystem) { @@ -660,6 +678,7 @@ export const layer = Layer.effect( }) } ctx.reasoningMap = {} + ctx.lastReasoningPart = undefined yield* Effect.forEach( Object.values(ctx.toolcalls), @@ -727,6 +746,7 @@ export const layer = Layer.effect( yield* Effect.gen(function* () { ctx.currentText = undefined ctx.reasoningMap = {} + ctx.lastReasoningPart = undefined const stream = llm.stream(streamInput) yield* stream.pipe( @@ -738,6 +758,7 @@ export const layer = Layer.effect( Effect.onInterrupt(() => Effect.gen(function* () { aborted = true + ctx.lastReasoningPart = undefined if (!ctx.assistantMessage.error) { yield* halt(new DOMException("Aborted", "AbortError")) } diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 78c7e4c64228..fc1f63eff387 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -1,6 +1,6 @@ import { NodeFileSystem } from "@effect/platform-node" import { expect } from "bun:test" -import { Cause, Effect, Exit, Fiber, Layer } from "effect" +import { Cause, Effect, Exit, Fiber, Layer, Stream } from "effect" import path from "path" import type { Agent } from "../../src/agent/agent" import { Agent as AgentSvc } from "../../src/agent/agent" @@ -183,6 +183,18 @@ const deps = Layer.mergeAll( SyncEvent.defaultLayer, EventV2Bridge.defaultLayer, ).pipe(Layer.provideMerge(infra)) +const depsWithoutLLM = Layer.mergeAll( + Session.defaultLayer, + Snapshot.defaultLayer, + AgentSvc.defaultLayer, + Permission.defaultLayer, + Plugin.defaultLayer, + Config.defaultLayer, + Provider.defaultLayer, + status, + SyncEvent.defaultLayer, + EventV2Bridge.defaultLayer, +).pipe(Layer.provideMerge(infra)) const env = Layer.mergeAll( TestLLMServer.layer, SessionProcessor.layer.pipe( @@ -195,6 +207,33 @@ const env = Layer.mergeAll( const it = testEffect(env) +const splitReasoningLLM = Layer.succeed( + LLM.Service, + LLM.Service.of({ + stream: () => + Stream.make( + { type: "reasoning-start", id: "reason-1" } as LLM.Event, + { type: "reasoning-delta", id: "reason-1", text: "The" } as LLM.Event, + { type: "reasoning-end", id: "reason-1" } as LLM.Event, + { type: "reasoning-start", id: "reason-2" } as LLM.Event, + { type: "reasoning-delta", id: "reason-2", text: " user" } as LLM.Event, + { type: "reasoning-end", id: "reason-2" } as LLM.Event, + { type: "finish" } as LLM.Event, + ), + }), +) +const splitReasoningEnv = Layer.mergeAll( + TestLLMServer.layer, + SessionProcessor.layer.pipe( + Layer.provide(summary), + Layer.provide(Image.defaultLayer), + Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })), + Layer.provide(splitReasoningLLM), + Layer.provideMerge(depsWithoutLLM), + ), +) +const splitReasoningIt = testEffect(splitReasoningEnv) + const boot = Effect.fn("test.boot")(function* () { const processors = yield* SessionProcessor.Service const session = yield* Session.Service @@ -429,6 +468,52 @@ it.live("session.processor effect tests capture reasoning from http mock", () => ), ) +splitReasoningIt.live("session.processor effect tests merge adjacent reasoning cycles", () => + provideTmpdirServer( + ({ dir }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "reason") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "reason" }], + tools: {}, + }) + + const reasoning = MessageV2.parts(msg.id).filter( + (part): part is MessageV2.ReasoningPart => part.type === "reasoning", + ) + + expect(value).toBe("continue") + expect(reasoning).toHaveLength(1) + expect(reasoning[0]?.text).toBe("The user") + expect(reasoning[0]?.time.end).toBeDefined() + }), + { git: true, config: (url) => providerCfg(url) }, + ), +) + it.live("session.processor effect tests reset reasoning state across retries", () => provideTmpdirServer( ({ dir, llm }) =>