use Session.Service and Agent.Service directly instead of Effect.promise wrappers

Yield the effectified services in the layer and call their methods
directly, eliminating the double Effect→Promise→Effect bounce through
async facades. Layer.unwrap(Effect.sync(...)) breaks the circular
import. Also improves the assertNotBusy test with a proper gate/spy
so it deterministically catches the busy state.
This commit is contained in:
Kit Langton
2026-03-28 12:15:53 -04:00
parent 5719a13cbf
commit 76373f0c73
2 changed files with 80 additions and 46 deletions

View File

@@ -90,6 +90,8 @@ export namespace SessionPrompt {
Effect.gen(function* () {
const bus = yield* Bus.Service
const status = yield* SessionStatus.Service
const sessions = yield* Session.Service
const agents = yield* Agent.Service
const scope = yield* Scope.Scope
const cache = yield* InstanceState.make(
@@ -137,10 +139,10 @@ export namespace SessionPrompt {
})
const promptE = Effect.fn("SessionPrompt.prompt")(function* (input: PromptInput) {
const session = yield* Effect.promise(() => Session.get(input.sessionID))
const session = yield* sessions.get(input.sessionID)
yield* Effect.promise(() => SessionRevert.cleanup(session))
const message = yield* Effect.promise(() => createUserMessage(input))
yield* Effect.promise(() => Session.touch(input.sessionID))
yield* sessions.touch(input.sessionID)
const permissions: Permission.Ruleset = []
for (const [t, enabled] of Object.entries(input.tools ?? {})) {
@@ -148,7 +150,7 @@ export namespace SessionPrompt {
}
if (permissions.length > 0) {
session.permission = permissions
yield* Effect.promise(() => Session.setPermission({ sessionID: session.id, permission: permissions }))
yield* sessions.setPermission({ sessionID: session.id, permission: permissions })
}
if (input.noReply === true) return message
@@ -158,7 +160,7 @@ export namespace SessionPrompt {
const runLoop = Effect.fn("SessionPrompt.run")(function* (sessionID: SessionID) {
let structured: unknown | undefined
let step = 0
const session = yield* Effect.promise(() => Session.get(sessionID))
const session = yield* sessions.get(sessionID)
while (true) {
yield* status.set(sessionID, { type: "busy" })
@@ -251,11 +253,9 @@ export namespace SessionPrompt {
continue
}
const agent = yield* Effect.promise(() => Agent.get(lastUser!.agent))
const agent = yield* agents.get(lastUser!.agent)
if (!agent) {
const available = yield* Effect.promise(() =>
Agent.list().then((agents) => agents.filter((a) => !a.hidden).map((a) => a.name)),
)
const available = (yield* agents.list()).filter((a) => !a.hidden).map((a) => a.name)
const hint = available.length ? ` Available agents: ${available.join(", ")}` : ""
const error = new NamedError.Unknown({ message: `Agent not found: "${lastUser!.agent}".${hint}` })
yield* bus.publish(Session.Event.Error, { sessionID, error: error.toObject() })
@@ -265,30 +265,28 @@ export namespace SessionPrompt {
const isLastStep = step >= maxSteps
msgs = yield* Effect.promise(() => insertReminders({ messages: msgs, agent, session }))
const msg = yield* Effect.promise(() =>
Session.updateMessage({
id: MessageID.ascending(),
parentID: lastUser!.id,
role: "assistant",
mode: agent.name,
agent: agent.name,
variant: lastUser!.variant,
path: { cwd: Instance.directory, root: Instance.worktree },
cost: 0,
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
modelID: model.id,
providerID: model.providerID,
time: { created: Date.now() },
sessionID,
}),
)
const msg = yield* sessions.updateMessage({
id: MessageID.ascending(),
parentID: lastUser!.id,
role: "assistant",
mode: agent.name,
agent: agent.name,
variant: lastUser!.variant,
path: { cwd: Instance.directory, root: Instance.worktree },
cost: 0,
tokens: { input: 0, output: 0, reasoning: 0, cache: { read: 0, write: 0 } },
modelID: model.id,
providerID: model.providerID,
time: { created: Date.now() },
sessionID,
})
const processor = yield* Effect.promise((signal) =>
Promise.resolve(SessionProcessor.create({
SessionProcessor.create({
assistantMessage: msg as MessageV2.Assistant,
sessionID,
model,
abort: signal,
})),
}),
)
const outcome: "break" | "continue" = yield* Effect.ensuring(
@@ -363,7 +361,7 @@ export namespace SessionPrompt {
if (structured !== undefined) {
processor.message.structured = structured
processor.message.finish = processor.message.finish ?? "stop"
yield* Effect.promise(() => Session.updateMessage(processor.message))
yield* sessions.updateMessage(processor.message)
return "break" as const
}
@@ -374,7 +372,7 @@ export namespace SessionPrompt {
message: "Model did not produce structured output",
retries: 0,
}).toObject()
yield* Effect.promise(() => Session.updateMessage(processor.message))
yield* sessions.updateMessage(processor.message)
return "break" as const
}
}
@@ -506,9 +504,15 @@ export namespace SessionPrompt {
}),
)
const defaultLayer = layer.pipe(
Layer.provide(SessionStatus.layer),
Layer.provide(Bus.layer),
const defaultLayer = Layer.unwrap(
Effect.sync(() =>
layer.pipe(
Layer.provide(SessionStatus.layer),
Layer.provide(Session.defaultLayer),
Layer.provide(Agent.defaultLayer),
Layer.provide(Bus.layer),
),
),
)
const { runPromise } = makeRuntime(Service, defaultLayer)

View File

@@ -1,5 +1,6 @@
import { describe, expect, test } from "bun:test"
import { describe, expect, spyOn, test } from "bun:test"
import { Instance } from "../../src/project/instance"
import { Provider } from "../../src/provider/provider"
import { Session } from "../../src/session"
import { MessageV2 } from "../../src/session/message-v2"
import { SessionPrompt } from "../../src/session/prompt"
@@ -10,6 +11,14 @@ import { tmpdir } from "../fixture/fixture"
Log.init({ print: false })
function deferred() {
let resolve!: () => void
const promise = new Promise<void>((done) => {
resolve = done
})
return { promise, resolve }
}
// Helper: seed a session with a user message + finished assistant message
// so loop() exits immediately without calling any LLM
async function seed(sessionID: SessionID) {
@@ -101,23 +110,44 @@ describe("session.prompt concurrency", () => {
directory: tmp.path,
fn: async () => {
const session = await Session.create({})
await seed(session.id)
const userMsg: MessageV2.Info = {
id: MessageID.ascending(),
role: "user",
sessionID: session.id,
time: { created: Date.now() },
agent: "build",
model: { providerID: "openai" as any, modelID: "gpt-5.2" as any },
}
await Session.updateMessage(userMsg)
await Session.updatePart({
id: PartID.ascending(),
messageID: userMsg.id,
sessionID: session.id,
type: "text",
text: "hello",
})
// Start loop — it'll finish fast but we can race assertNotBusy
const loopPromise = SessionPrompt.loop({ sessionID: session.id })
const ready = deferred()
const gate = deferred()
const getModel = spyOn(Provider, "getModel").mockImplementation(async () => {
ready.resolve()
await gate.promise
throw new Error("test stop")
})
// Give the loop fiber a tick to register in the map
await new Promise((r) => setTimeout(r, 5))
try {
const loopPromise = SessionPrompt.loop({ sessionID: session.id }).catch(() => undefined)
await ready.promise
// The loop may already be done on fast machines, so this test
// verifies the mechanism works — if the loop is still running,
// assertNotBusy should throw
const busyCheck = SessionPrompt.assertNotBusy(session.id).then(
() => "not-busy",
(e) => (e instanceof Session.BusyError ? "busy" : "other-error"),
)
await expect(SessionPrompt.assertNotBusy(session.id)).rejects.toBeInstanceOf(Session.BusyError)
gate.resolve()
await loopPromise
} finally {
gate.resolve()
getModel.mockRestore()
}
await loopPromise
// After loop completes, assertNotBusy should succeed
await SessionPrompt.assertNotBusy(session.id)
},