Compare commits

...

1 Commits

Author SHA1 Message Date
Kit Langton
3f93669f4c refactor(session): replace Effect.promise with Effect.sync for sync MessageV2.stream calls
MessageV2.stream is a synchronous generator, so wrapping it in
Effect.promise with `for await` is an unnecessary async roundtrip.
Use Effect.sync with a plain `for` loop instead.
2026-04-10 23:33:46 -04:00
2 changed files with 36 additions and 17 deletions

View File

@@ -29,7 +29,7 @@ import type { Provider } from "@/provider/provider"
import { Permission } from "@/permission"
import { Global } from "@/global"
import type { LanguageModelV2Usage } from "@ai-sdk/provider"
import { Effect, Layer, Context } from "effect"
import { Effect, Layer, Option, Context } from "effect"
import { makeRuntime } from "@/effect/run-service"
export namespace Session {
@@ -352,6 +352,17 @@ export namespace Session {
field: string
delta: string
}) => Effect.Effect<void>
/** Finds the first message matching the predicate, searching newest-first. */
readonly findMessage: {
<T extends MessageV2.WithParts>(
sessionID: SessionID,
predicate: (msg: MessageV2.WithParts) => msg is T,
): Effect.Effect<Option.Option<T>>
(
sessionID: SessionID,
predicate: (msg: MessageV2.WithParts) => boolean,
): Effect.Effect<Option.Option<MessageV2.WithParts>>
}
}
export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}
@@ -636,6 +647,17 @@ export namespace Session {
yield* bus.publish(MessageV2.Event.PartDelta, input)
})
/** Finds the first message matching the predicate, searching newest-first. */
const findMessage = Effect.fn("Session.findMessage")(function* (
sessionID: SessionID,
predicate: (msg: MessageV2.WithParts) => boolean,
) {
for (const item of MessageV2.stream(sessionID)) {
if (predicate(item)) return Option.some(item)
}
return Option.none<MessageV2.WithParts>()
})
return Service.of({
create,
fork,
@@ -657,6 +679,7 @@ export namespace Session {
updatePart,
getPart,
updatePartDelta,
findMessage,
})
}),
)

View File

@@ -907,12 +907,11 @@ NOTE: At any point in time through this workflow you should feel free to ask the
})
const lastModel = Effect.fnUntraced(function* (sessionID: SessionID) {
const model = yield* Effect.promise(async () => {
for await (const item of MessageV2.stream(sessionID)) {
if (item.info.role === "user" && item.info.model) return item.info.model
}
})
if (model) return model
const match = yield* sessions.findMessage(
sessionID,
(m): m is MessageV2.WithParts & { info: MessageV2.User } => m.info.role === "user" && !!m.info.model,
)
if (Option.isSome(match)) return match.value.info.model
return yield* provider.defaultModel()
})
@@ -1295,16 +1294,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the
},
)
const lastAssistant = (sessionID: SessionID) =>
Effect.promise(async () => {
let latest: MessageV2.WithParts | undefined
for await (const item of MessageV2.stream(sessionID)) {
latest ??= item
if (item.info.role !== "user") return item
}
if (latest) return latest
throw new Error("Impossible")
})
const lastAssistant = Effect.fnUntraced(function* (sessionID: SessionID) {
const match = yield* sessions.findMessage(sessionID, (m) => m.info.role !== "user")
if (Option.isSome(match)) return match.value
const msgs = yield* sessions.messages({ sessionID, limit: 1 })
if (msgs.length > 0) return msgs[0]
throw new Error("Impossible")
})
const runLoop: (sessionID: SessionID) => Effect.Effect<MessageV2.WithParts> = Effect.fn("SessionPrompt.run")(
function* (sessionID: SessionID) {