feat(acp): add acp-next session lifecycle (#29320)

This commit is contained in:
Shoubhit Dash
2026-05-26 11:45:24 +05:30
committed by GitHub
parent 49707c2117
commit 15c5ec6d24
4 changed files with 378 additions and 6 deletions

View File

@@ -4,10 +4,14 @@ import {
type AgentSideConnection,
type AuthenticateRequest,
type CancelNotification,
type CloseSessionRequest,
type ForkSessionRequest,
type InitializeRequest,
type ListSessionsRequest,
type LoadSessionRequest,
type NewSessionRequest,
type PromptRequest,
type ResumeSessionRequest,
type SetSessionConfigOptionRequest,
type SetSessionModelRequest,
type SetSessionModeRequest,
@@ -44,6 +48,22 @@ export class Agent implements ACPAgent {
return run(this.service.loadSession(params))
}
listSessions(params: ListSessionsRequest) {
return run(this.service.listSessions(params))
}
resumeSession(params: ResumeSessionRequest) {
return run(this.service.resumeSession(params))
}
closeSession(params: CloseSessionRequest) {
return run(this.service.closeSession(params))
}
unstable_forkSession(params: ForkSessionRequest) {
return run(this.service.forkSession(params))
}
setSessionConfigOption(params: SetSessionConfigOptionRequest) {
return run(this.service.setSessionConfigOption(params))
}

View File

@@ -4,8 +4,14 @@ import {
type AuthenticateResponse,
type AuthMethod,
type CancelNotification,
type CloseSessionRequest,
type CloseSessionResponse,
type ForkSessionRequest,
type ForkSessionResponse,
type InitializeRequest,
type InitializeResponse,
type ListSessionsRequest,
type ListSessionsResponse,
type LoadSessionRequest,
type LoadSessionResponse,
type McpServer,
@@ -13,6 +19,9 @@ import {
type NewSessionResponse,
type PromptRequest,
type PromptResponse,
type ResumeSessionRequest,
type ResumeSessionResponse,
type SessionInfo,
type SetSessionConfigOptionRequest,
type SetSessionConfigOptionResponse,
type SetSessionModelRequest,
@@ -21,6 +30,7 @@ import {
type SetSessionModeResponse,
} from "@agentclientprotocol/sdk"
import { InstallationVersion } from "@opencode-ai/core/installation/version"
import * as Log from "@opencode-ai/core/util/log"
import type { OpencodeClient } from "@opencode-ai/sdk/v2"
import { Context, Effect, Layer, ManagedRuntime } from "effect"
import * as ACPNextError from "./error"
@@ -32,6 +42,7 @@ import { Provider } from "@/provider/provider"
import type { Command } from "@/command"
export const AuthMethodID = "opencode-login"
const log = Log.create({ service: "acp-next-service" })
export type Error = ACPNextError.Error
@@ -40,6 +51,10 @@ export type Interface = {
readonly authenticate: (input: AuthenticateRequest) => Effect.Effect<AuthenticateResponse, Error>
readonly newSession: (input: NewSessionRequest) => Effect.Effect<NewSessionResponse, Error>
readonly loadSession: (input: LoadSessionRequest) => Effect.Effect<LoadSessionResponse, Error>
readonly listSessions: (input: ListSessionsRequest) => Effect.Effect<ListSessionsResponse, Error>
readonly resumeSession: (input: ResumeSessionRequest) => Effect.Effect<ResumeSessionResponse, Error>
readonly closeSession: (input: CloseSessionRequest) => Effect.Effect<CloseSessionResponse, Error>
readonly forkSession: (input: ForkSessionRequest) => Effect.Effect<ForkSessionResponse, Error>
readonly setSessionConfigOption: (
input: SetSessionConfigOptionRequest,
) => Effect.Effect<SetSessionConfigOptionResponse, Error>
@@ -90,6 +105,12 @@ export function make(input: {
embeddedContext: true,
image: true,
},
sessionCapabilities: {
close: {},
fork: {},
list: {},
resume: {},
},
},
authMethods: [authMethod],
agentInfo: {
@@ -181,6 +202,135 @@ export function make(input: {
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers)
yield* sendAvailableCommands(input.connection, state.id, snapshot)
return {
configOptions: configOptions(snapshot, {
model: state.model ?? model,
variant: state.variant,
modeId: state.modeId,
}),
}
})
const listSessions = Effect.fn("ACPNext.listSessions")(function* (params: ListSessionsRequest) {
const cursor = params.cursor ? Number(params.cursor) : undefined
const limit = 100
const sessions = yield* request(
() =>
input.sdk.session.list(
{
...(params.cwd ? { directory: params.cwd } : {}),
roots: true,
},
{ throwOnError: true },
),
"session",
)
const sorted = sessions.toSorted((a, b) => b.time.updated - a.time.updated)
const filtered =
cursor === undefined || !Number.isFinite(cursor) ? sorted : sorted.filter((item) => item.time.updated < cursor)
const page = filtered.slice(0, limit)
const last = page.at(-1)
return {
sessions: page.map((item): SessionInfo => ({
sessionId: item.id,
cwd: item.directory,
title: item.title,
updatedAt: new Date(item.time.updated).toISOString(),
})),
...(filtered.length > limit && last ? { nextCursor: String(last.time.updated) } : {}),
}
})
const resumeSession = Effect.fn("ACPNext.resumeSession")(function* (params: ResumeSessionRequest) {
const snapshot = yield* directorySnapshot(params.cwd)
yield* request(
() => input.sdk.session.get({ directory: params.cwd, sessionID: params.sessionId }, { throwOnError: true }),
"session",
)
const messages = yield* request(
() =>
input.sdk.session.messages(
{ directory: params.cwd, sessionID: params.sessionId, limit: 20 },
{ throwOnError: true },
),
"session",
)
const restored = restoreFromMessages(messages.map((item) => item.info))
const model = restored.model ?? selectDefaultModel(snapshot)
const state = yield* session.load({
id: params.sessionId,
cwd: params.cwd,
mcpServers: params.mcpServers ?? [],
model,
variant: restored.variant ?? selectVariant(snapshot, model),
modeId: restored.modeId ?? (snapshot.availableModes.length > 0 ? snapshot.defaultModeID : undefined),
})
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers ?? [])
yield* sendAvailableCommands(input.connection, state.id, snapshot)
return {
configOptions: configOptions(snapshot, {
model: state.model ?? model,
variant: state.variant,
modeId: state.modeId,
}),
}
})
const closeSession = Effect.fn("ACPNext.closeSession")(function* (params: CloseSessionRequest) {
const removed = yield* session.remove(params.sessionId)
registeredMcp.delete(params.sessionId)
if (!removed) return {}
yield* request(
() => input.sdk.session.abort({ directory: removed.cwd, sessionID: params.sessionId }, { throwOnError: true }),
"session",
).pipe(
Effect.catch((error) =>
Effect.sync(() => {
log.error("failed to abort session while closing ACP session", { error, sessionID: params.sessionId })
}),
),
)
return {}
})
const forkSession = Effect.fn("ACPNext.forkSession")(function* (params: ForkSessionRequest) {
const snapshot = yield* directorySnapshot(params.cwd)
const forked = yield* request(
() =>
input.sdk.session.fork(
{
directory: params.cwd,
sessionID: params.sessionId,
},
{ throwOnError: true },
),
"session",
)
const messages = yield* request(
() =>
input.sdk.session.messages(
{ directory: params.cwd, sessionID: forked.id, limit: 20 },
{ throwOnError: true },
),
"session",
)
const restored = restoreFromMessages(messages.map((item) => item.info))
const model = restored.model ?? selectDefaultModel(snapshot)
const state = yield* session.load({
id: forked.id,
cwd: params.cwd,
mcpServers: params.mcpServers ?? [],
model,
variant: restored.variant ?? selectVariant(snapshot, model),
modeId: restored.modeId ?? (snapshot.availableModes.length > 0 ? snapshot.defaultModeID : undefined),
})
yield* registerMcpServers(input.sdk, registeredMcp, params.cwd, state.id, params.mcpServers ?? [])
yield* sendAvailableCommands(input.connection, state.id, snapshot)
return {
sessionId: state.id,
configOptions: configOptions(snapshot, {
@@ -278,6 +428,10 @@ export function make(input: {
authenticate,
newSession,
loadSession,
listSessions,
resumeSession,
closeSession,
forkSession,
setSessionConfigOption,
setSessionMode,
setSessionModel,

View File

@@ -1,16 +1,19 @@
import { describe, expect, it } from "bun:test"
import type {
AgentSideConnection,
ForkSessionResponse,
LoadSessionResponse,
NewSessionResponse,
ResumeSessionResponse,
SessionConfigOption,
SessionConfigSelectOption,
SetSessionConfigOptionResponse,
} from "@agentclientprotocol/sdk"
import type { OpencodeClient } from "@opencode-ai/sdk/v2"
import { Effect } from "effect"
import { Effect, ManagedRuntime } from "effect"
import * as ACPNextService from "@/acp-next/service"
import * as ACPNextError from "@/acp-next/error"
import { ACPNextSession } from "@/acp-next/session"
import { ModelID, ProviderID } from "@/provider/schema"
import type { Provider } from "@/provider/provider"
@@ -140,6 +143,14 @@ describe("ACP next service sessions", () => {
const makeService = (messages: readonly { info: unknown; parts: readonly unknown[] }[] = []) => {
const updates: unknown[] = []
const mcpAdds: string[] = []
const aborts: string[] = []
const forks: string[] = []
const sessions = Array.from({ length: 102 }, (_, index) => ({
id: `ses_${index + 1}`,
directory: index % 2 === 0 ? "/workspace" : "/other",
title: `Session ${index + 1}`,
time: { created: index + 1, updated: index + 1 },
}))
const sdk = {
config: {
providers: () => Promise.resolve({ data: { providers: [provider], default: { test: modelID } } }),
@@ -168,8 +179,19 @@ describe("ACP next service sessions", () => {
session: {
create: () => Promise.resolve({ data: { id: "ses_new" } }),
get: () => Promise.resolve({ data: { id: "ses_loaded" } }),
list: () => Promise.resolve({ data: [] }),
list: (input: { directory?: string }) =>
Promise.resolve({
data: input.directory ? sessions.filter((session) => session.directory === input.directory) : sessions,
}),
messages: () => Promise.resolve({ data: messages }),
abort: (input: { sessionID: string }) => {
aborts.push(input.sessionID)
return Promise.resolve({ data: true })
},
fork: (input: { sessionID: string }) => {
forks.push(input.sessionID)
return Promise.resolve({ data: { id: `fork_${input.sessionID}` } })
},
},
mcp: {
add: (input: { name?: string }) => {
@@ -185,7 +207,7 @@ describe("ACP next service sessions", () => {
},
} as Pick<AgentSideConnection, "sessionUpdate">
return { service: ACPNextService.make({ sdk, connection }), updates, mcpAdds }
return { service: ACPNextService.make({ sdk, connection }), updates, mcpAdds, aborts, forks }
}
it("creates a backed session with config options and command update", async () => {
@@ -233,6 +255,125 @@ describe("ACP next service sessions", () => {
expect(result.configOptions?.find((option) => option.id === "mode")?.currentValue).toBe("plan")
})
it("lists sessions sorted by updated time with cursor support", async () => {
const { service } = makeService()
const first = await Effect.runPromise(service.listSessions({ cwd: "/workspace" }))
const second = await Effect.runPromise(service.listSessions({ cwd: "/workspace", cursor: first.nextCursor }))
expect(first.sessions).toHaveLength(51)
expect(first.sessions[0]?.sessionId).toBe("ses_101")
expect(first.sessions.at(-1)?.sessionId).toBe("ses_1")
expect(first.nextCursor).toBeUndefined()
expect(second.sessions).toEqual(first.sessions)
})
it("lists all sessions with next cursor when the first page is full", async () => {
const { service } = makeService()
const first = await Effect.runPromise(service.listSessions({}))
const second = await Effect.runPromise(service.listSessions({ cursor: first.nextCursor }))
expect(first.sessions).toHaveLength(100)
expect(first.sessions[0]?.sessionId).toBe("ses_102")
expect(first.sessions.at(-1)?.sessionId).toBe("ses_3")
expect(first.nextCursor).toBe("3")
expect(second.sessions.map((session) => session.sessionId)).toEqual(["ses_2", "ses_1"])
})
it("resumes a session and stores restored state", async () => {
const { service } = makeService([
{
info: {
role: "user",
model: { providerID: "test", modelID: "test-model", variant: "high" },
agent: "plan",
},
parts: [],
},
])
const resumed = await Effect.runPromise(
service.resumeSession({ cwd: "/workspace", sessionId: "ses_resume", mcpServers: [] }),
)
const updated = await Effect.runPromise(
service.setSessionConfigOption({ sessionId: "ses_resume", configId: "effort", value: "default" }),
)
expect(select(resumed, "effort")?.currentValue).toBe("high")
expect(select(updated, "effort")?.currentValue).toBe("default")
})
it("closes local ACP state and aborts the backing session best-effort", async () => {
const { service, aborts } = makeService()
const created = await Effect.runPromise(service.newSession({ cwd: "/workspace", mcpServers: [] }))
expect(await Effect.runPromise(service.closeSession({ sessionId: created.sessionId }))).toEqual({})
const missing = await Effect.runPromise(
service
.setSessionConfigOption({ sessionId: created.sessionId, configId: "effort", value: "high" })
.pipe(Effect.mapError(ACPNextError.toRequestError), Effect.flip),
)
expect(missing.code).toBe(-32602)
expect(aborts).toEqual([created.sessionId])
expect(await Effect.runPromise(service.closeSession({ sessionId: "missing" }))).toEqual({})
})
it("does not fail close when backing abort fails", async () => {
const sessionService = ManagedRuntime.make(ACPNextSession.defaultLayer).runSync(
ACPNextSession.Service.use((service) => Effect.succeed(service)),
)
const { service } = makeService()
const sdk = {
config: {
providers: () => Promise.resolve({ data: { providers: [provider], default: { test: modelID } } }),
get: () => Promise.resolve({ data: {} }),
},
app: {
agents: () => Promise.resolve({ data: [{ name: "build", mode: "primary", permission: [], options: {} }] }),
skills: () => Promise.resolve({ data: [] }),
},
command: {
list: () => Promise.resolve({ data: [] }),
},
session: {
abort: () => Promise.reject(new Error("nope")),
},
mcp: {
add: () => Promise.resolve({ data: {} }),
},
} as unknown as OpencodeClient
const closing = ACPNextService.make({ sdk, session: sessionService })
await Effect.runPromise(sessionService.create({ id: "ses_close", cwd: "/workspace" }))
expect(await Effect.runPromise(closing.closeSession({ sessionId: "ses_close" }))).toEqual({})
expect(await Effect.runPromise(service.closeSession({ sessionId: "missing" }))).toEqual({})
})
it("forks a session, loads fork state, and returns config options", async () => {
const { service, forks } = makeService([
{
info: {
role: "assistant",
providerID: "test",
modelID: "second-model",
variant: "medium",
mode: "plan",
},
parts: [],
},
])
const forked = await Effect.runPromise(
service.forkSession({ cwd: "/workspace", sessionId: "ses_parent", mcpServers: [] }),
)
const updated = await Effect.runPromise(
service.setSessionConfigOption({ sessionId: forked.sessionId, configId: "effort", value: "low" }),
)
expect(forked.sessionId).toBe("fork_ses_parent")
expect(select(forked, "model")?.currentValue).toBe("test/second-model")
expect(select(forked, "effort")?.currentValue).toBe("medium")
expect(select(updated, "effort")?.currentValue).toBe("low")
expect(forks).toEqual(["ses_parent"])
})
it("restores model variant and mode from the latest user message", async () => {
const { service } = makeService([
{
@@ -522,8 +663,11 @@ function categories(result: NewSessionResponse | LoadSessionResponse) {
return result.configOptions?.map((option) => option.category) ?? []
}
function select(result: SetSessionConfigOptionResponse, id: string) {
return result.configOptions.find(
function select(
result: SetSessionConfigOptionResponse | ResumeSessionResponse | NewSessionResponse | ForkSessionResponse,
id: string,
) {
return result.configOptions?.find(
(option): option is Extract<SessionConfigOption, { type: "select" }> =>
option.id === id && option.type === "select",
)

View File

@@ -1,9 +1,11 @@
import { describe, expect } from "bun:test"
import type {
AuthenticateResponse,
CloseSessionResponse,
InitializeResponse,
LoadSessionResponse,
NewSessionResponse,
ResumeSessionResponse,
SessionNotification,
SetSessionConfigOptionResponse,
} from "@agentclientprotocol/sdk"
@@ -31,7 +33,10 @@ describe("opencode acp-next (subprocess)", () => {
expect(initialized.agentCapabilities?.mcpCapabilities?.http).toBe(true)
expect(initialized.agentCapabilities?.mcpCapabilities?.sse).toBe(true)
expect(initialized.agentCapabilities?.loadSession).toBe(true)
expect(initialized.agentCapabilities?.sessionCapabilities).toBeUndefined()
expect(initialized.agentCapabilities?.sessionCapabilities?.close).toEqual({})
expect(initialized.agentCapabilities?.sessionCapabilities?.fork).toEqual({})
expect(initialized.agentCapabilities?.sessionCapabilities?.list).toEqual({})
expect(initialized.agentCapabilities?.sessionCapabilities?.resume).toEqual({})
expect(initialized.agentInfo?.name).toBe("OpenCode")
expect(initialized.authMethods?.[0]?.id).toBe("opencode-login")
expect(initialized.authMethods?.[0]?._meta?.["terminal-auth"]).toBeDefined()
@@ -159,6 +164,55 @@ describe("opencode acp-next (subprocess)", () => {
60_000,
)
cliIt.live(
"advertises and supports close behind OPENCODE_ACP_NEXT",
({ home, llm, opencode }) =>
Effect.gen(function* () {
const acp = createAcpClient(
yield* opencode.acp({
env: {
OPENCODE_ACP_NEXT: "1",
OPENCODE_CONFIG_CONTENT: JSON.stringify(verifierConfig(llm.url)),
},
}),
)
const initialized = expectOk(yield* acp.request<InitializeResponse>("initialize", { protocolVersion: 1 }))
expect(initialized.agentCapabilities?.sessionCapabilities?.close).toEqual({})
const session = expectOk(yield* acp.request<NewSessionResponse>("session/new", { cwd: home, mcpServers: [] }))
expectOk(yield* acp.request<CloseSessionResponse>("session/close", { sessionId: session.sessionId }))
}),
60_000,
)
cliIt.live(
"advertises and supports resume behind OPENCODE_ACP_NEXT",
({ home, llm, opencode }) =>
Effect.gen(function* () {
const acp = createAcpClient(
yield* opencode.acp({
env: {
OPENCODE_ACP_NEXT: "1",
OPENCODE_CONFIG_CONTENT: JSON.stringify(verifierConfig(llm.url)),
},
}),
)
const initialized = expectOk(yield* acp.request<InitializeResponse>("initialize", { protocolVersion: 1 }))
expect(initialized.agentCapabilities?.sessionCapabilities?.resume).toEqual({})
const session = expectOk(yield* acp.request<NewSessionResponse>("session/new", { cwd: home, mcpServers: [] }))
const resumed = expectOk(
yield* acp.request<ResumeSessionResponse>("session/resume", {
cwd: home,
sessionId: session.sessionId,
mcpServers: [],
}),
)
expect(selectConfigOption(resumed.configOptions, "model")?.category).toBe("model")
}),
60_000,
)
cliIt.live(
"exits cleanly when flagged stdin is closed",
({ opencode }) =>