make Session.updateMessage/updatePart generic, remove all MessageV2 casts

Generic interface preserves the narrowed type through update calls,
eliminating `as MessageV2.Assistant` / `as MessageV2.ToolPart` casts.
Async facades keep Zod validation via parse + cast, with `.force()`
bypass. Tighten test assertions for BusyError type and concurrent
caller identity.
This commit is contained in:
Kit Langton
2026-03-29 12:21:10 -04:00
parent cdac7a7dd1
commit b23e5b78a8
2 changed files with 100 additions and 31 deletions

View File

@@ -350,14 +350,14 @@ export namespace Session {
readonly messages: (input: { sessionID: SessionID; limit?: number }) => Effect.Effect<MessageV2.WithParts[]>
readonly children: (parentID: SessionID) => Effect.Effect<Info[]>
readonly remove: (sessionID: SessionID) => Effect.Effect<void>
readonly updateMessage: (msg: MessageV2.Info) => Effect.Effect<MessageV2.Info>
readonly updateMessage: <T extends MessageV2.Info>(msg: T) => Effect.Effect<T>
readonly removeMessage: (input: { sessionID: SessionID; messageID: MessageID }) => Effect.Effect<MessageID>
readonly removePart: (input: {
sessionID: SessionID
messageID: MessageID
partID: PartID
}) => Effect.Effect<PartID>
readonly updatePart: (part: MessageV2.Part) => Effect.Effect<MessageV2.Part>
readonly updatePart: <T extends MessageV2.Part>(part: T) => Effect.Effect<T>
readonly updatePartDelta: (input: {
sessionID: SessionID
messageID: MessageID
@@ -485,26 +485,23 @@ export namespace Session {
}
})
const updateMessage = Effect.fn("Session.updateMessage")(function* (msg: MessageV2.Info) {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.Updated, {
sessionID: msg.sessionID,
info: msg,
}),
)
return msg
})
const updateMessage = <T extends MessageV2.Info>(msg: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() => SyncEvent.run(MessageV2.Event.Updated, { sessionID: msg.sessionID, info: msg }))
return msg
}).pipe(Effect.withSpan("Session.updateMessage"))
const updatePart = Effect.fn("Session.updatePart")(function* (part: MessageV2.Part) {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
}),
)
return part
})
const updatePart = <T extends MessageV2.Part>(part: T): Effect.Effect<T> =>
Effect.gen(function* () {
yield* Effect.sync(() =>
SyncEvent.run(MessageV2.Event.PartUpdated, {
sessionID: part.sessionID,
part: structuredClone(part),
time: Date.now(),
}),
)
return part
}).pipe(Effect.withSpan("Session.updatePart"))
const create = Effect.fn("Session.create")(function* (input?: {
parentID?: SessionID
@@ -867,7 +864,17 @@ export namespace Session {
export const children = fn(SessionID.zod, (id) => runPromise((svc) => svc.children(id)))
export const remove = fn(SessionID.zod, (id) => runPromise((svc) => svc.remove(id)))
export const updateMessage = fn(MessageV2.Info, (msg) => runPromise((svc) => svc.updateMessage(msg)))
export const updateMessage = Object.assign(
async function updateMessage<T extends MessageV2.Info>(msg: T): Promise<T> {
return runPromise((svc) => svc.updateMessage(MessageV2.Info.parse(msg) as T))
},
{
schema: MessageV2.Info,
force<T extends MessageV2.Info>(msg: T): Promise<T> {
return runPromise((svc) => svc.updateMessage(msg))
},
},
)
export const removeMessage = fn(z.object({ sessionID: SessionID.zod, messageID: MessageID.zod }), (input) =>
runPromise((svc) => svc.removeMessage(input)),
@@ -878,7 +885,17 @@ export namespace Session {
(input) => runPromise((svc) => svc.removePart(input)),
)
export const updatePart = fn(MessageV2.Part, (part) => runPromise((svc) => svc.updatePart(part)))
export const updatePart = Object.assign(
async function updatePart<T extends MessageV2.Part>(part: T): Promise<T> {
return runPromise((svc) => svc.updatePart(MessageV2.Part.parse(part) as T))
},
{
schema: MessageV2.Part,
force<T extends MessageV2.Part>(part: T): Promise<T> {
return runPromise((svc) => svc.updatePart(part))
},
},
)
export const updatePartDelta = fn(
z.object({

View File

@@ -386,7 +386,7 @@ const boot = Effect.fn("test.boot")(function* (input?: { title?: string }) {
const test = yield* TestLLM
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const chat = yield* sessions.create(input ?? {})
const chat = yield* sessions.create(input ?? { title: "Pinned" })
return { test, prompt, sessions, chat }
})
@@ -438,6 +438,10 @@ it.effect("loop continues when finish is tool-calls", () =>
const result = yield* prompt.loop({ sessionID: chat.id })
expect(yield* test.calls).toBe(2)
expect(result.info.role).toBe("assistant")
if (result.info.role === "assistant") {
expect(result.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
expect(result.info.finish).toBe("stop")
}
}),
{ git: true, config: cfg },
),
@@ -540,7 +544,7 @@ it.effect("loop sets status to busy then idle", () =>
// Cancel semantics
it.effect(
"cancel interrupts loop and returns last assistant",
"cancel interrupts loop and resolves with an assistant message",
() =>
provideTmpdirInstance(
(dir) =>
@@ -629,9 +633,11 @@ it.effect(
yield* prompt.cancel(chat.id)
const [exitA, exitB] = yield* Effect.all([Fiber.await(a), Fiber.await(b)])
// Both should resolve (success or interrupt, not error)
expect(Exit.isFailure(exitA) && !Cause.hasInterruptsOnly(exitA.cause)).toBe(false)
expect(Exit.isFailure(exitB) && !Cause.hasInterruptsOnly(exitB.cause)).toBe(false)
expect(Exit.isSuccess(exitA)).toBe(true)
expect(Exit.isSuccess(exitB)).toBe(true)
if (Exit.isSuccess(exitA) && Exit.isSuccess(exitB)) {
expect(exitA.value.info.id).toBe(exitB.value.info.id)
}
}),
{ git: true, config: cfg },
),
@@ -678,6 +684,9 @@ it.effect("concurrent loop callers all receive same error result", () =>
if (a.info.role === "assistant") {
expect(a.info.error).toBeDefined()
}
if (b.info.role === "assistant") {
expect(b.info.error).toBeDefined()
}
}),
{ git: true, config: cfg },
),
@@ -708,6 +717,9 @@ it.effect(
const exit = yield* prompt.assertNotBusy(chat.id).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
}
yield* prompt.cancel(chat.id)
yield* Fiber.await(fiber)
@@ -755,6 +767,9 @@ it.effect(
const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
}
yield* prompt.cancel(chat.id)
yield* Fiber.await(fiber)
@@ -805,8 +820,8 @@ it.effect(
while (Date.now() - start < 2000) {
const msgs = await MessageV2.filterCompacted(MessageV2.stream(chat.id))
const taskMsg = msgs.find((item) => item.info.role === "assistant")
const tool = taskMsg ? runningTool(taskMsg.parts) : undefined
if (tool?.state.metadata?.output.includes("first")) return
const tool = taskMsg ? toolPart(taskMsg.parts) : undefined
if (tool?.state.status === "running" && tool.state.metadata?.output.includes("first")) return
await new Promise((done) => setTimeout(done, 20))
}
throw new Error("timed out waiting for running shell metadata")
@@ -909,7 +924,10 @@ it.effect(
expect(Exit.isSuccess(exit)).toBe(true)
if (Exit.isSuccess(exit)) {
expect(exit.value.info.role).toBe("assistant")
expect(exit.value.parts.some((part) => part.type === "tool")).toBe(true)
const tool = completedTool(exit.value.parts)
if (tool) {
expect(tool.state.output).toContain("User aborted the command")
}
}
const status = yield* SessionStatus.Service
@@ -922,6 +940,37 @@ it.effect(
30_000,
)
it.effect(
"cancel interrupts loop queued behind shell",
() =>
provideTmpdirInstance(
(dir) =>
Effect.gen(function* () {
const { prompt, chat } = yield* boot()
const sh = yield* prompt
.shell({ sessionID: chat.id, agent: "build", command: "sleep 30" })
.pipe(Effect.forkChild)
yield* waitMs(50)
const run = yield* prompt.loop({ sessionID: chat.id }).pipe(Effect.forkChild)
yield* waitMs(50)
yield* prompt.cancel(chat.id)
const exit = yield* Fiber.await(run)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.hasInterruptsOnly(exit.cause)).toBe(true)
}
yield* Fiber.await(sh)
}),
{ git: true, config: cfg },
),
30_000,
)
it.effect(
"shell rejects when another shell is already running",
() =>
@@ -937,6 +986,9 @@ it.effect(
const exit = yield* prompt.shell({ sessionID: chat.id, agent: "build", command: "echo hi" }).pipe(Effect.exit)
expect(Exit.isFailure(exit)).toBe(true)
if (Exit.isFailure(exit)) {
expect(Cause.squash(exit.cause)).toBeInstanceOf(Session.BusyError)
}
yield* prompt.cancel(chat.id)
yield* Fiber.await(a)