mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-09 15:35:07 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
46da801f30 | ||
|
|
58a99916bb | ||
|
|
c29392d085 | ||
|
|
46f243fea7 |
@@ -182,7 +182,6 @@ function ConnectionGate(props: ParentProps<{ disableHealthCheck?: boolean }>) {
|
||||
if (checkMode() === "background" || type === "http") return false
|
||||
}
|
||||
}).pipe(
|
||||
effectMinDuration(checkMode() === "blocking" ? "1.2 seconds" : 0),
|
||||
Effect.timeoutOrElse({ duration: "10 seconds", orElse: () => Effect.succeed(false) }),
|
||||
Effect.ensuring(Effect.sync(() => setCheckMode("background"))),
|
||||
Effect.runPromise,
|
||||
|
||||
@@ -11,6 +11,8 @@ const dir = path.resolve(__dirname, "..")
|
||||
|
||||
process.chdir(dir)
|
||||
|
||||
await import("./generate.ts")
|
||||
|
||||
// Load migrations from migration directories
|
||||
const migrationDirs = (
|
||||
await fs.promises.readdir(path.join(dir, "migration"), {
|
||||
|
||||
@@ -12,24 +12,11 @@ const dir = path.resolve(__dirname, "..")
|
||||
|
||||
process.chdir(dir)
|
||||
|
||||
await import("./generate.ts")
|
||||
|
||||
import { Script } from "@opencode-ai/script"
|
||||
import pkg from "../package.json"
|
||||
|
||||
const modelsUrl = process.env.OPENCODE_MODELS_URL || "https://models.dev"
|
||||
// Fetch and generate models.dev snapshot
|
||||
const modelsData = process.env.MODELS_DEV_API_JSON
|
||||
? await Bun.file(process.env.MODELS_DEV_API_JSON).text()
|
||||
: await fetch(`${modelsUrl}/api.json`).then((x) => x.text())
|
||||
await Bun.write(
|
||||
path.join(dir, "src/provider/models-snapshot.js"),
|
||||
`// @ts-nocheck\n// Auto-generated by build.ts - do not edit\nexport const snapshot = ${modelsData}\n`,
|
||||
)
|
||||
await Bun.write(
|
||||
path.join(dir, "src/provider/models-snapshot.d.ts"),
|
||||
`// Auto-generated by build.ts - do not edit\nexport declare const snapshot: Record<string, unknown>\n`,
|
||||
)
|
||||
console.log("Generated models-snapshot.js")
|
||||
|
||||
// Load migrations from migration directories
|
||||
const migrationDirs = (
|
||||
await fs.promises.readdir(path.join(dir, "migration"), {
|
||||
|
||||
23
packages/opencode/script/generate.ts
Normal file
23
packages/opencode/script/generate.ts
Normal file
@@ -0,0 +1,23 @@
|
||||
import path from "path"
|
||||
import { fileURLToPath } from "url"
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url)
|
||||
const __dirname = path.dirname(__filename)
|
||||
const dir = path.resolve(__dirname, "..")
|
||||
|
||||
process.chdir(dir)
|
||||
|
||||
const modelsUrl = process.env.OPENCODE_MODELS_URL || "https://models.dev"
|
||||
// Fetch and generate models.dev snapshot
|
||||
const modelsData = process.env.MODELS_DEV_API_JSON
|
||||
? await Bun.file(process.env.MODELS_DEV_API_JSON).text()
|
||||
: await fetch(`${modelsUrl}/api.json`).then((x) => x.text())
|
||||
await Bun.write(
|
||||
path.join(dir, "src/provider/models-snapshot.js"),
|
||||
`// @ts-nocheck\n// Auto-generated by build.ts - do not edit\nexport const snapshot = ${modelsData}\n`,
|
||||
)
|
||||
await Bun.write(
|
||||
path.join(dir, "src/provider/models-snapshot.d.ts"),
|
||||
`// Auto-generated by build.ts - do not edit\nexport declare const snapshot: Record<string, unknown>\n`,
|
||||
)
|
||||
console.log("Generated models-snapshot.js")
|
||||
@@ -1,10 +1,10 @@
|
||||
import { Cause, Deferred, Effect, Exit, Fiber, Option, Schema, Scope, SynchronizedRef } from "effect"
|
||||
import { Cause, Deferred, Effect, Exit, Fiber, Schema, Scope, SynchronizedRef } from "effect"
|
||||
|
||||
export interface Runner<A, E = never> {
|
||||
readonly state: Runner.State<A, E>
|
||||
readonly busy: boolean
|
||||
readonly ensureRunning: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
|
||||
readonly startShell: (work: (signal: AbortSignal) => Effect.Effect<A, E>) => Effect.Effect<A, E>
|
||||
readonly startShell: (work: Effect.Effect<A, E>) => Effect.Effect<A, E>
|
||||
readonly cancel: Effect.Effect<void>
|
||||
}
|
||||
|
||||
@@ -20,7 +20,6 @@ export namespace Runner {
|
||||
interface ShellHandle<A, E> {
|
||||
id: number
|
||||
fiber: Fiber.Fiber<A, E>
|
||||
abort: AbortController
|
||||
}
|
||||
|
||||
interface PendingHandle<A, E> {
|
||||
@@ -100,13 +99,7 @@ export namespace Runner {
|
||||
}),
|
||||
).pipe(Effect.flatten)
|
||||
|
||||
const stopShell = (shell: ShellHandle<A, E>) =>
|
||||
Effect.gen(function* () {
|
||||
shell.abort.abort()
|
||||
const exit = yield* Fiber.await(shell.fiber).pipe(Effect.timeoutOption("100 millis"))
|
||||
if (Option.isNone(exit)) yield* Fiber.interrupt(shell.fiber)
|
||||
yield* Fiber.await(shell.fiber).pipe(Effect.exit, Effect.asVoid)
|
||||
})
|
||||
const stopShell = (shell: ShellHandle<A, E>) => Fiber.interrupt(shell.fiber)
|
||||
|
||||
const ensureRunning = (work: Effect.Effect<A, E>) =>
|
||||
SynchronizedRef.modifyEffect(
|
||||
@@ -138,7 +131,7 @@ export namespace Runner {
|
||||
),
|
||||
)
|
||||
|
||||
const startShell = (work: (signal: AbortSignal) => Effect.Effect<A, E>) =>
|
||||
const startShell = (work: Effect.Effect<A, E>) =>
|
||||
SynchronizedRef.modifyEffect(
|
||||
ref,
|
||||
Effect.fnUntraced(function* (st) {
|
||||
@@ -153,9 +146,8 @@ export namespace Runner {
|
||||
}
|
||||
yield* busy
|
||||
const id = next()
|
||||
const abort = new AbortController()
|
||||
const fiber = yield* work(abort.signal).pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
|
||||
const shell = { id, fiber, abort } satisfies ShellHandle<A, E>
|
||||
const fiber = yield* work.pipe(Effect.ensuring(finishShell(id)), Effect.forkChild)
|
||||
const shell = { id, fiber } satisfies ShellHandle<A, E>
|
||||
return [
|
||||
Effect.gen(function* () {
|
||||
const exit = yield* Fiber.await(fiber)
|
||||
|
||||
@@ -234,7 +234,11 @@ export namespace LLM {
|
||||
// from the workflow service are executed via opencode's tool system
|
||||
// and results sent back over the WebSocket.
|
||||
if (language instanceof GitLabWorkflowLanguageModel) {
|
||||
const workflowModel = language
|
||||
const workflowModel = language as GitLabWorkflowLanguageModel & {
|
||||
sessionID?: string
|
||||
sessionPreapprovedTools?: string[]
|
||||
approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }>
|
||||
}
|
||||
workflowModel.sessionID = input.sessionID
|
||||
workflowModel.systemPrompt = system.join("\n")
|
||||
workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => {
|
||||
@@ -301,7 +305,7 @@ export namespace LLM {
|
||||
ruleset: [],
|
||||
})
|
||||
for (const name of uniqueNames) approvedToolsForSession.add(name)
|
||||
workflowModel.sessionPreapprovedTools = [...workflowModel.sessionPreapprovedTools, ...uniqueNames]
|
||||
workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames]
|
||||
return { approved: true }
|
||||
} catch {
|
||||
return { approved: false }
|
||||
|
||||
@@ -751,16 +751,32 @@ export namespace MessageV2 {
|
||||
...(differentModel ? {} : { callProviderMetadata: providerMeta(part.metadata) }),
|
||||
})
|
||||
}
|
||||
if (part.state.status === "error")
|
||||
assistantMessage.parts.push({
|
||||
type: ("tool-" + part.tool) as `tool-${string}`,
|
||||
state: "output-error",
|
||||
toolCallId: part.callID,
|
||||
input: part.state.input,
|
||||
errorText: part.state.error,
|
||||
...(part.metadata?.providerExecuted ? { providerExecuted: true } : {}),
|
||||
...(differentModel ? {} : { callProviderMetadata: providerMeta(part.metadata) }),
|
||||
})
|
||||
if (part.state.status === "error") {
|
||||
const output = part.state.metadata?.interrupted === true ? part.state.metadata.output : undefined
|
||||
if (typeof output === "string") {
|
||||
assistantMessage.parts.push({
|
||||
type: ("tool-" + part.tool) as `tool-${string}`,
|
||||
state: "output-available",
|
||||
toolCallId: part.callID,
|
||||
input: part.state.input,
|
||||
output,
|
||||
...(part.metadata?.providerExecuted ? { providerExecuted: true } : {}),
|
||||
...(differentModel ? {} : { callProviderMetadata: providerMeta(part.metadata) }),
|
||||
})
|
||||
} else {
|
||||
assistantMessage.parts.push({
|
||||
type: ("tool-" + part.tool) as `tool-${string}`,
|
||||
state: "output-error",
|
||||
toolCallId: part.callID,
|
||||
input: part.state.input,
|
||||
errorText: part.state.error,
|
||||
...(part.metadata?.providerExecuted ? { providerExecuted: true } : {}),
|
||||
...(differentModel ? {} : { callProviderMetadata: providerMeta(part.metadata) }),
|
||||
})
|
||||
}
|
||||
}
|
||||
// Handle pending/running tool calls to prevent dangling tool_use blocks
|
||||
// Anthropic/Claude APIs require every tool_use to have a corresponding tool_result
|
||||
if (part.state.status === "pending" || part.state.status === "running")
|
||||
assistantMessage.parts.push({
|
||||
type: ("tool-" + part.tool) as `tool-${string}`,
|
||||
|
||||
@@ -18,6 +18,7 @@ import { SessionStatus } from "./status"
|
||||
import { SessionSummary } from "./summary"
|
||||
import type { Provider } from "@/provider/provider"
|
||||
import { Question } from "@/question"
|
||||
import { isRecord } from "@/util/record"
|
||||
|
||||
export namespace SessionProcessor {
|
||||
const DOOM_LOOP_THRESHOLD = 3
|
||||
@@ -351,7 +352,10 @@ export namespace SessionProcessor {
|
||||
},
|
||||
{ text: ctx.currentText.text },
|
||||
)).text
|
||||
ctx.currentText.time = { start: Date.now(), end: Date.now() }
|
||||
{
|
||||
const end = Date.now()
|
||||
ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
|
||||
}
|
||||
if (value.providerMetadata) ctx.currentText.metadata = value.providerMetadata
|
||||
yield* session.updatePart(ctx.currentText)
|
||||
ctx.currentText = undefined
|
||||
@@ -398,19 +402,21 @@ export namespace SessionProcessor {
|
||||
}
|
||||
ctx.reasoningMap = {}
|
||||
|
||||
const parts = MessageV2.parts(ctx.assistantMessage.id)
|
||||
for (const part of parts) {
|
||||
if (part.type !== "tool" || part.state.status === "completed" || part.state.status === "error") continue
|
||||
for (const part of Object.values(ctx.toolcalls)) {
|
||||
const end = Date.now()
|
||||
const metadata = "metadata" in part.state && isRecord(part.state.metadata) ? part.state.metadata : {}
|
||||
yield* session.updatePart({
|
||||
...part,
|
||||
state: {
|
||||
...part.state,
|
||||
status: "error",
|
||||
error: "Tool execution aborted",
|
||||
time: { start: Date.now(), end: Date.now() },
|
||||
metadata: { ...metadata, interrupted: true },
|
||||
time: { start: "time" in part.state ? part.state.time.start : end, end },
|
||||
},
|
||||
})
|
||||
}
|
||||
ctx.toolcalls = {}
|
||||
ctx.assistantMessage.time.completed = Date.now()
|
||||
yield* session.updateMessage(ctx.assistantMessage)
|
||||
})
|
||||
|
||||
@@ -743,7 +743,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
} satisfies MessageV2.TextPart)
|
||||
})
|
||||
|
||||
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput, signal: AbortSignal) {
|
||||
const shellImpl = Effect.fn("SessionPrompt.shellImpl")(function* (input: ShellInput) {
|
||||
const ctx = yield* InstanceState.context
|
||||
const session = yield* sessions.get(input.sessionID)
|
||||
if (session.revert) {
|
||||
@@ -1507,7 +1507,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
Effect.promise(() => SystemPrompt.skills(agent)),
|
||||
Effect.promise(() => SystemPrompt.environment(model)),
|
||||
instruction.system().pipe(Effect.orDie),
|
||||
Effect.promise(() => MessageV2.toModelMessages(msgs, model)),
|
||||
MessageV2.toModelMessagesEffect(msgs, model),
|
||||
])
|
||||
const system = [...env, ...(skills ? [skills] : []), ...instructions]
|
||||
const format = lastUser.format ?? { type: "text" as const }
|
||||
@@ -1577,7 +1577,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
||||
function* (input: ShellInput) {
|
||||
const s = yield* InstanceState.get(state)
|
||||
const runner = getRunner(s.runners, input.sessionID)
|
||||
return yield* runner.startShell((signal) => shellImpl(input, signal))
|
||||
return yield* runner.startShell(shellImpl(input))
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -250,7 +250,7 @@ describe("Runner", () => {
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
const runner = Runner.make<string>(s)
|
||||
const result = yield* runner.startShell((_signal) => Effect.succeed("shell-done"))
|
||||
const result = yield* runner.startShell(Effect.succeed("shell-done"))
|
||||
expect(result).toBe("shell-done")
|
||||
expect(runner.busy).toBe(false)
|
||||
}),
|
||||
@@ -264,7 +264,7 @@ describe("Runner", () => {
|
||||
const fiber = yield* runner.ensureRunning(Effect.never.pipe(Effect.as("x"))).pipe(Effect.forkChild)
|
||||
yield* Effect.sleep("10 millis")
|
||||
|
||||
const exit = yield* runner.startShell((_s) => Effect.succeed("nope")).pipe(Effect.exit)
|
||||
const exit = yield* runner.startShell(Effect.succeed("nope")).pipe(Effect.exit)
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
|
||||
yield* runner.cancel
|
||||
@@ -279,12 +279,10 @@ describe("Runner", () => {
|
||||
const runner = Runner.make<string>(s)
|
||||
const gate = yield* Deferred.make<void>()
|
||||
|
||||
const sh = yield* runner
|
||||
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("first")))
|
||||
.pipe(Effect.forkChild)
|
||||
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("first"))).pipe(Effect.forkChild)
|
||||
yield* Effect.sleep("10 millis")
|
||||
|
||||
const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
|
||||
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
|
||||
yield* Deferred.succeed(gate, undefined)
|
||||
@@ -302,37 +300,26 @@ describe("Runner", () => {
|
||||
},
|
||||
})
|
||||
|
||||
const sh = yield* runner
|
||||
.startShell((signal) =>
|
||||
Effect.promise(
|
||||
() =>
|
||||
new Promise<string>((resolve) => {
|
||||
signal.addEventListener("abort", () => resolve("aborted"), { once: true })
|
||||
}),
|
||||
),
|
||||
)
|
||||
.pipe(Effect.forkChild)
|
||||
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
|
||||
yield* Effect.sleep("10 millis")
|
||||
|
||||
const exit = yield* runner.startShell((_s) => Effect.succeed("second")).pipe(Effect.exit)
|
||||
const exit = yield* runner.startShell(Effect.succeed("second")).pipe(Effect.exit)
|
||||
expect(Exit.isFailure(exit)).toBe(true)
|
||||
|
||||
yield* runner.cancel
|
||||
const done = yield* Fiber.await(sh)
|
||||
expect(Exit.isSuccess(done)).toBe(true)
|
||||
expect(Exit.isFailure(done)).toBe(true)
|
||||
}),
|
||||
)
|
||||
|
||||
it.live(
|
||||
"cancel interrupts shell that ignores abort signal",
|
||||
"cancel interrupts shell",
|
||||
Effect.gen(function* () {
|
||||
const s = yield* Scope.Scope
|
||||
const runner = Runner.make<string>(s)
|
||||
const gate = yield* Deferred.make<void>()
|
||||
|
||||
const sh = yield* runner
|
||||
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ignored")))
|
||||
.pipe(Effect.forkChild)
|
||||
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ignored"))).pipe(Effect.forkChild)
|
||||
yield* Effect.sleep("10 millis")
|
||||
|
||||
const stop = yield* runner.cancel.pipe(Effect.forkChild)
|
||||
@@ -356,9 +343,7 @@ describe("Runner", () => {
|
||||
const runner = Runner.make<string>(s)
|
||||
const gate = yield* Deferred.make<void>()
|
||||
|
||||
const sh = yield* runner
|
||||
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell-result")))
|
||||
.pipe(Effect.forkChild)
|
||||
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell-result"))).pipe(Effect.forkChild)
|
||||
yield* Effect.sleep("10 millis")
|
||||
expect(runner.state._tag).toBe("Shell")
|
||||
|
||||
@@ -384,9 +369,7 @@ describe("Runner", () => {
|
||||
const calls = yield* Ref.make(0)
|
||||
const gate = yield* Deferred.make<void>()
|
||||
|
||||
const sh = yield* runner
|
||||
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("shell")))
|
||||
.pipe(Effect.forkChild)
|
||||
const sh = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("shell"))).pipe(Effect.forkChild)
|
||||
yield* Effect.sleep("10 millis")
|
||||
|
||||
const work = Effect.gen(function* () {
|
||||
@@ -414,16 +397,7 @@ describe("Runner", () => {
|
||||
const runner = Runner.make<string>(s)
|
||||
const gate = yield* Deferred.make<void>()
|
||||
|
||||
const sh = yield* runner
|
||||
.startShell((signal) =>
|
||||
Effect.promise(
|
||||
() =>
|
||||
new Promise<string>((resolve) => {
|
||||
signal.addEventListener("abort", () => resolve("aborted"), { once: true })
|
||||
}),
|
||||
),
|
||||
)
|
||||
.pipe(Effect.forkChild)
|
||||
const sh = yield* runner.startShell(Effect.never.pipe(Effect.as("aborted"))).pipe(Effect.forkChild)
|
||||
yield* Effect.sleep("10 millis")
|
||||
|
||||
const run = yield* runner.ensureRunning(Effect.succeed("y")).pipe(Effect.forkChild)
|
||||
@@ -478,7 +452,7 @@ describe("Runner", () => {
|
||||
const runner = Runner.make<string>(s, {
|
||||
onBusy: Ref.update(count, (n) => n + 1),
|
||||
})
|
||||
yield* runner.startShell((_signal) => Effect.succeed("done"))
|
||||
yield* runner.startShell(Effect.succeed("done"))
|
||||
expect(yield* Ref.get(count)).toBe(1)
|
||||
}),
|
||||
)
|
||||
@@ -509,9 +483,7 @@ describe("Runner", () => {
|
||||
const runner = Runner.make<string>(s)
|
||||
const gate = yield* Deferred.make<void>()
|
||||
|
||||
const fiber = yield* runner
|
||||
.startShell((_signal) => Deferred.await(gate).pipe(Effect.as("ok")))
|
||||
.pipe(Effect.forkChild)
|
||||
const fiber = yield* runner.startShell(Deferred.await(gate).pipe(Effect.as("ok"))).pipe(Effect.forkChild)
|
||||
yield* Effect.sleep("10 millis")
|
||||
expect(runner.busy).toBe(true)
|
||||
|
||||
|
||||
@@ -570,6 +570,81 @@ describe("session.message-v2.toModelMessage", () => {
|
||||
])
|
||||
})
|
||||
|
||||
test("forwards partial bash output for aborted tool calls", async () => {
|
||||
const userID = "m-user"
|
||||
const assistantID = "m-assistant"
|
||||
const output = [
|
||||
"31403",
|
||||
"12179",
|
||||
"4575",
|
||||
"",
|
||||
"<bash_metadata>",
|
||||
"User aborted the command",
|
||||
"</bash_metadata>",
|
||||
].join("\n")
|
||||
|
||||
const input: MessageV2.WithParts[] = [
|
||||
{
|
||||
info: userInfo(userID),
|
||||
parts: [
|
||||
{
|
||||
...basePart(userID, "u1"),
|
||||
type: "text",
|
||||
text: "run tool",
|
||||
},
|
||||
] as MessageV2.Part[],
|
||||
},
|
||||
{
|
||||
info: assistantInfo(assistantID, userID),
|
||||
parts: [
|
||||
{
|
||||
...basePart(assistantID, "a1"),
|
||||
type: "tool",
|
||||
callID: "call-1",
|
||||
tool: "bash",
|
||||
state: {
|
||||
status: "error",
|
||||
input: { command: "for i in {1..20}; do print -- $RANDOM; sleep 1; done" },
|
||||
error: "Tool execution aborted",
|
||||
metadata: { interrupted: true, output },
|
||||
time: { start: 0, end: 1 },
|
||||
},
|
||||
},
|
||||
] as MessageV2.Part[],
|
||||
},
|
||||
]
|
||||
|
||||
expect(await MessageV2.toModelMessages(input, model)).toStrictEqual([
|
||||
{
|
||||
role: "user",
|
||||
content: [{ type: "text", text: "run tool" }],
|
||||
},
|
||||
{
|
||||
role: "assistant",
|
||||
content: [
|
||||
{
|
||||
type: "tool-call",
|
||||
toolCallId: "call-1",
|
||||
toolName: "bash",
|
||||
input: { command: "for i in {1..20}; do print -- $RANDOM; sleep 1; done" },
|
||||
providerExecuted: undefined,
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
role: "tool",
|
||||
content: [
|
||||
{
|
||||
type: "tool-result",
|
||||
toolCallId: "call-1",
|
||||
toolName: "bash",
|
||||
output: { type: "text", value: output },
|
||||
},
|
||||
],
|
||||
},
|
||||
])
|
||||
})
|
||||
|
||||
test("filters assistant messages with non-abort errors", async () => {
|
||||
const assistantID = "m-assistant"
|
||||
|
||||
|
||||
@@ -21,7 +21,7 @@ import { Log } from "../../src/util/log"
|
||||
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
|
||||
import { provideTmpdirServer } from "../fixture/fixture"
|
||||
import { testEffect } from "../lib/effect"
|
||||
import { reply, TestLLMServer } from "../lib/llm-server"
|
||||
import { raw, reply, TestLLMServer } from "../lib/llm-server"
|
||||
|
||||
Log.init({ print: false })
|
||||
|
||||
@@ -218,6 +218,93 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
|
||||
),
|
||||
)
|
||||
|
||||
it.live("session.processor effect tests preserve text start time", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
Effect.gen(function* () {
|
||||
const gate = defer<void>()
|
||||
const { processors, session, provider } = yield* boot()
|
||||
|
||||
yield* llm.push(
|
||||
raw({
|
||||
head: [
|
||||
{
|
||||
id: "chatcmpl-test",
|
||||
object: "chat.completion.chunk",
|
||||
choices: [{ delta: { role: "assistant" } }],
|
||||
},
|
||||
{
|
||||
id: "chatcmpl-test",
|
||||
object: "chat.completion.chunk",
|
||||
choices: [{ delta: { content: "hello" } }],
|
||||
},
|
||||
],
|
||||
wait: gate.promise,
|
||||
tail: [
|
||||
{
|
||||
id: "chatcmpl-test",
|
||||
object: "chat.completion.chunk",
|
||||
choices: [{ delta: {}, finish_reason: "stop" }],
|
||||
},
|
||||
],
|
||||
}),
|
||||
)
|
||||
|
||||
const chat = yield* session.create({})
|
||||
const parent = yield* user(chat.id, "hi")
|
||||
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
|
||||
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
|
||||
const handle = yield* processors.create({
|
||||
assistantMessage: msg,
|
||||
sessionID: chat.id,
|
||||
model: mdl,
|
||||
})
|
||||
|
||||
const run = yield* handle
|
||||
.process({
|
||||
user: {
|
||||
id: parent.id,
|
||||
sessionID: chat.id,
|
||||
role: "user",
|
||||
time: parent.time,
|
||||
agent: parent.agent,
|
||||
model: { providerID: ref.providerID, modelID: ref.modelID },
|
||||
} satisfies MessageV2.User,
|
||||
sessionID: chat.id,
|
||||
model: mdl,
|
||||
agent: agent(),
|
||||
system: [],
|
||||
messages: [{ role: "user", content: "hi" }],
|
||||
tools: {},
|
||||
})
|
||||
.pipe(Effect.forkChild)
|
||||
|
||||
yield* Effect.promise(async () => {
|
||||
const stop = Date.now() + 500
|
||||
while (Date.now() < stop) {
|
||||
const text = MessageV2.parts(msg.id).find((part): part is MessageV2.TextPart => part.type === "text")
|
||||
if (text?.time?.start) return
|
||||
await Bun.sleep(10)
|
||||
}
|
||||
throw new Error("timed out waiting for text part")
|
||||
})
|
||||
yield* Effect.sleep("20 millis")
|
||||
gate.resolve()
|
||||
|
||||
const exit = yield* Fiber.await(run)
|
||||
const text = MessageV2.parts(msg.id).find((part): part is MessageV2.TextPart => part.type === "text")
|
||||
|
||||
expect(Exit.isSuccess(exit)).toBe(true)
|
||||
expect(text?.text).toBe("hello")
|
||||
expect(text?.time?.start).toBeDefined()
|
||||
expect(text?.time?.end).toBeDefined()
|
||||
if (!text?.time?.start || !text.time.end) return
|
||||
expect(text.time.start).toBeLessThan(text.time.end)
|
||||
}),
|
||||
{ git: true, config: (url) => providerCfg(url) },
|
||||
),
|
||||
)
|
||||
|
||||
it.live("session.processor effect tests stop after token overflow requests compaction", () =>
|
||||
provideTmpdirServer(
|
||||
({ dir, llm }) =>
|
||||
@@ -604,6 +691,7 @@ it.live("session.processor effect tests mark pending tools as aborted on cleanup
|
||||
expect(call?.state.status).toBe("error")
|
||||
if (call?.state.status === "error") {
|
||||
expect(call.state.error).toBe("Tool execution aborted")
|
||||
expect(call.state.metadata?.interrupted).toBe(true)
|
||||
expect(call.state.time.end).toBeDefined()
|
||||
}
|
||||
}),
|
||||
|
||||
Reference in New Issue
Block a user