Compare commits

..

4 Commits

Author SHA1 Message Date
Kit Langton
f53b3a56ba Merge branch 'dev' into kit/effectify-lsp 2026-03-20 14:52:12 -04:00
Kit Langton
cb39863e95 use forkScoped + Fiber.join for lazy LSP init (match old Instance.state behavior) 2026-03-20 09:39:09 -04:00
Kit Langton
a7d55d642b Merge branch 'dev' into kit/effectify-lsp 2026-03-20 09:17:39 -04:00
Kit Langton
cf7762957f effectify LSP service: migrate from Instance.state to Effect service pattern
Replace the legacy Instance.state() pattern in LSP with Interface,
Service, layer, and async facades using runPromiseInstance. Wire
LSP.Service into InstanceServices and add runSyncInstance helper.
2026-03-19 21:22:58 -04:00
5 changed files with 690 additions and 591 deletions

View File

@@ -128,7 +128,7 @@ Still open and likely worth migrating:
- [ ] `Plugin`
- [ ] `ToolRegistry`
- [x] `Pty`
- [ ] `Pty`
- [ ] `Worktree`
- [ ] `Installation`
- [ ] `Bus`
@@ -140,5 +140,5 @@ Still open and likely worth migrating:
- [ ] `SessionCompaction`
- [ ] `Provider`
- [ ] `Project`
- [ ] `LSP`
- [x] `LSP`
- [ ] `MCP`

View File

@@ -136,8 +136,6 @@ export class AccountRepo extends ServiceMap.Service<AccountRepo, AccountRepo.Ser
.onConflictDoUpdate({
target: AccountTable.id,
set: {
email: input.email,
url: input.url,
access_token: input.accessToken,
refresh_token: input.refreshToken,
token_expiry: input.expiry,

View File

@@ -3,8 +3,8 @@ import { File } from "@/file/service"
import { FileTime } from "@/file/time-service"
import { FileWatcher } from "@/file/watcher"
import { Format } from "@/format/service"
import { LSP } from "@/lsp"
import { Permission } from "@/permission/service"
import { Pty } from "@/pty"
import { Instance } from "@/project/instance"
import { Vcs } from "@/project/vcs"
import { ProviderAuth } from "@/provider/auth-service"
@@ -25,9 +25,9 @@ export type InstanceServices =
| FileTime.Service
| Format.Service
| File.Service
| Pty.Service
| Skill.Service
| Snapshot.Service
| LSP.Service
// NOTE: LayerMap only passes the key (directory string) to lookup, but we need
// the full instance context (directory, worktree, project). We read from the
@@ -46,9 +46,9 @@ function lookup(_key: string) {
FileTime.layer,
Format.layer,
File.layer,
Pty.layer,
Skill.defaultLayer,
Snapshot.defaultLayer,
LSP.layer,
).pipe(Layer.provide(ctx))
}

View File

@@ -1,5 +1,7 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceContext } from "@/effect/instance-context"
import { runPromiseInstance } from "@/effect/runtime"
import { Log } from "../util/log"
import { LSPClient } from "./client"
import path from "path"
@@ -7,10 +9,10 @@ import { pathToFileURL, fileURLToPath } from "url"
import { LSPServer } from "./server"
import z from "zod"
import { Config } from "../config/config"
import { Instance } from "../project/instance"
import { Flag } from "@/flag/flag"
import { Process } from "../util/process"
import { spawn as lspspawn } from "./launch"
import { Effect, Fiber, Layer, ServiceMap } from "effect"
export namespace LSP {
const log = Log.create({ service: "lsp" })
@@ -77,77 +79,6 @@ export namespace LSP {
}
}
const state = Instance.state(
async () => {
const clients: LSPClient.Info[] = []
const servers: Record<string, LSPServer.Info> = {}
const cfg = await Config.get()
if (cfg.lsp === false) {
log.info("all LSPs are disabled")
return {
broken: new Set<string>(),
servers,
clients,
spawning: new Map<string, Promise<LSPClient.Info | undefined>>(),
}
}
for (const server of Object.values(LSPServer)) {
servers[server.id] = server
}
filterExperimentalServers(servers)
for (const [name, item] of Object.entries(cfg.lsp ?? {})) {
const existing = servers[name]
if (item.disabled) {
log.info(`LSP server ${name} is disabled`)
delete servers[name]
continue
}
servers[name] = {
...existing,
id: name,
root: existing?.root ?? (async () => Instance.directory),
extensions: item.extensions ?? existing?.extensions ?? [],
spawn: async (root) => {
return {
process: lspspawn(item.command[0], item.command.slice(1), {
cwd: root,
env: {
...process.env,
...item.env,
},
}),
initialization: item.initialization,
}
},
}
}
log.info("enabled LSP servers", {
serverIds: Object.values(servers)
.map((server) => server.id)
.join(", "),
})
return {
broken: new Set<string>(),
servers,
clients,
spawning: new Map<string, Promise<LSPClient.Info | undefined>>(),
}
},
async (state) => {
await Promise.all(state.clients.map((client) => client.shutdown()))
},
)
export async function init() {
return state()
}
export const Status = z
.object({
id: z.string(),
@@ -160,162 +91,6 @@ export namespace LSP {
})
export type Status = z.infer<typeof Status>
export async function status() {
return state().then((x) => {
const result: Status[] = []
for (const client of x.clients) {
result.push({
id: client.serverID,
name: x.servers[client.serverID].id,
root: path.relative(Instance.directory, client.root),
status: "connected",
})
}
return result
})
}
async function getClients(file: string) {
const s = await state()
const extension = path.parse(file).ext || file
const result: LSPClient.Info[] = []
async function schedule(server: LSPServer.Info, root: string, key: string) {
const handle = await server
.spawn(root)
.then((value) => {
if (!value) s.broken.add(key)
return value
})
.catch((err) => {
s.broken.add(key)
log.error(`Failed to spawn LSP server ${server.id}`, { error: err })
return undefined
})
if (!handle) return undefined
log.info("spawned lsp server", { serverID: server.id })
const client = await LSPClient.create({
serverID: server.id,
server: handle,
root,
}).catch(async (err) => {
s.broken.add(key)
await Process.stop(handle.process)
log.error(`Failed to initialize LSP client ${server.id}`, { error: err })
return undefined
})
if (!client) {
return undefined
}
const existing = s.clients.find((x) => x.root === root && x.serverID === server.id)
if (existing) {
await Process.stop(handle.process)
return existing
}
s.clients.push(client)
return client
}
for (const server of Object.values(s.servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (s.broken.has(root + server.id)) continue
const match = s.clients.find((x) => x.root === root && x.serverID === server.id)
if (match) {
result.push(match)
continue
}
const inflight = s.spawning.get(root + server.id)
if (inflight) {
const client = await inflight
if (!client) continue
result.push(client)
continue
}
const task = schedule(server, root, root + server.id)
s.spawning.set(root + server.id, task)
task.finally(() => {
if (s.spawning.get(root + server.id) === task) {
s.spawning.delete(root + server.id)
}
})
const client = await task
if (!client) continue
result.push(client)
Bus.publish(Event.Updated, {})
}
return result
}
export async function hasClients(file: string) {
const s = await state()
const extension = path.parse(file).ext || file
for (const server of Object.values(s.servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (s.broken.has(root + server.id)) continue
return true
}
return false
}
export async function touchFile(input: string, waitForDiagnostics?: boolean) {
log.info("touching file", { file: input })
const clients = await getClients(input)
await Promise.all(
clients.map(async (client) => {
const wait = waitForDiagnostics ? client.waitForDiagnostics({ path: input }) : Promise.resolve()
await client.notify.open({ path: input })
return wait
}),
).catch((err) => {
log.error("failed to touch file", { err, file: input })
})
}
export async function diagnostics() {
const results: Record<string, LSPClient.Diagnostic[]> = {}
for (const result of await runAll(async (client) => client.diagnostics)) {
for (const [path, diagnostics] of result.entries()) {
const arr = results[path] || []
arr.push(...diagnostics)
results[path] = arr
}
}
return results
}
export async function hover(input: { file: string; line: number; character: number }) {
return run(input.file, (client) => {
return client.connection
.sendRequest("textDocument/hover", {
textDocument: {
uri: pathToFileURL(input.file).href,
},
position: {
line: input.line,
character: input.character,
},
})
.catch(() => null)
})
}
enum SymbolKind {
File = 1,
Module = 2,
@@ -356,114 +131,504 @@ export namespace LSP {
SymbolKind.Enum,
]
export async function workspaceSymbol(query: string) {
return runAll((client) =>
client.connection
.sendRequest("workspace/symbol", {
query,
export interface Interface {
readonly init: () => Effect.Effect<void>
readonly status: () => Effect.Effect<Status[]>
readonly hasClients: (file: string) => Effect.Effect<boolean>
readonly touchFile: (input: string, waitForDiagnostics?: boolean) => Effect.Effect<void>
readonly diagnostics: () => Effect.Effect<Record<string, LSPClient.Diagnostic[]>>
readonly hover: (input: { file: string; line: number; character: number }) => Effect.Effect<any>
readonly workspaceSymbol: (query: string) => Effect.Effect<LSP.Symbol[]>
readonly documentSymbol: (uri: string) => Effect.Effect<(LSP.DocumentSymbol | LSP.Symbol)[]>
readonly definition: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
readonly references: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
readonly implementation: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
readonly prepareCallHierarchy: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
readonly incomingCalls: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
readonly outgoingCalls: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/LSP") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const instance = yield* InstanceContext
const directory = instance.directory
const clients: LSPClient.Info[] = []
const servers: Record<string, LSPServer.Info> = {}
const broken = new Set<string>()
const spawning = new Map<string, Promise<LSPClient.Info | undefined>>()
// Load server configs lazily — forkScoped so it doesn't block layer construction
const load = Effect.fn("LSP.load")(function* () {
yield* Effect.promise(async () => {
const cfg = await Config.get()
if (cfg.lsp === false) {
log.info("all LSPs are disabled")
return
}
for (const server of Object.values(LSPServer)) {
servers[server.id] = server
}
filterExperimentalServers(servers)
for (const [name, item] of Object.entries(cfg.lsp ?? {})) {
const existing = servers[name]
if (item.disabled) {
log.info(`LSP server ${name} is disabled`)
delete servers[name]
continue
}
servers[name] = {
...existing,
id: name,
root: existing?.root ?? (async () => directory),
extensions: item.extensions ?? existing?.extensions ?? [],
spawn: async (root) => {
return {
process: lspspawn(item.command[0], item.command.slice(1), {
cwd: root,
env: {
...process.env,
...item.env,
},
}),
initialization: item.initialization,
}
},
}
}
log.info("enabled LSP servers", {
serverIds: Object.values(servers)
.map((server) => server.id)
.join(", "),
})
})
.then((result: any) => result.filter((x: LSP.Symbol) => kinds.includes(x.kind)))
.then((result: any) => result.slice(0, 10))
.catch(() => []),
).then((result) => result.flat() as LSP.Symbol[])
})
const loadFiber = yield* load().pipe(
Effect.catchCause((cause) => Effect.sync(() => log.error("init failed", { cause }))),
Effect.forkScoped,
)
// Cleanup: shut down all LSP clients when the scope is closed
yield* Effect.addFinalizer(() =>
Effect.promise(async () => {
await Promise.all(clients.map((client) => client.shutdown()))
}),
)
async function getClientsForFile(file: string) {
const extension = path.parse(file).ext || file
const result: LSPClient.Info[] = []
async function schedule(server: LSPServer.Info, root: string, key: string) {
const handle = await server
.spawn(root)
.then((value) => {
if (!value) broken.add(key)
return value
})
.catch((err) => {
broken.add(key)
log.error(`Failed to spawn LSP server ${server.id}`, { error: err })
return undefined
})
if (!handle) return undefined
log.info("spawned lsp server", { serverID: server.id })
const client = await LSPClient.create({
serverID: server.id,
server: handle,
root,
}).catch(async (err) => {
broken.add(key)
await Process.stop(handle.process)
log.error(`Failed to initialize LSP client ${server.id}`, { error: err })
return undefined
})
if (!client) {
return undefined
}
const existing = clients.find((x) => x.root === root && x.serverID === server.id)
if (existing) {
await Process.stop(handle.process)
return existing
}
clients.push(client)
return client
}
for (const server of Object.values(servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (broken.has(root + server.id)) continue
const match = clients.find((x) => x.root === root && x.serverID === server.id)
if (match) {
result.push(match)
continue
}
const inflight = spawning.get(root + server.id)
if (inflight) {
const client = await inflight
if (!client) continue
result.push(client)
continue
}
const task = schedule(server, root, root + server.id)
spawning.set(root + server.id, task)
task.finally(() => {
if (spawning.get(root + server.id) === task) {
spawning.delete(root + server.id)
}
})
const client = await task
if (!client) continue
result.push(client)
Bus.publish(Event.Updated, {})
}
return result
}
async function runAllClients<T>(input: (client: LSPClient.Info) => Promise<T>): Promise<T[]> {
const tasks = clients.map((x) => input(x))
return Promise.all(tasks)
}
async function runForFile<T>(file: string, input: (client: LSPClient.Info) => Promise<T>): Promise<T[]> {
const matched = await getClientsForFile(file)
const tasks = matched.map((x) => input(x))
return Promise.all(tasks)
}
const init = Effect.fn("LSP.init")(function* () {
yield* Fiber.join(loadFiber)
})
const status = Effect.fn("LSP.status")(function* () {
yield* Fiber.join(loadFiber)
const result: Status[] = []
for (const client of clients) {
result.push({
id: client.serverID,
name: servers[client.serverID].id,
root: path.relative(directory, client.root),
status: "connected",
})
}
return result
})
const hasClients = Effect.fn("LSP.hasClients")(function* (file: string) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
const extension = path.parse(file).ext || file
for (const server of Object.values(servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (broken.has(root + server.id)) continue
return true
}
return false
})
})
const touchFile = Effect.fn("LSP.touchFile")(function* (input: string, waitForDiagnostics?: boolean) {
yield* Fiber.join(loadFiber)
yield* Effect.promise(async () => {
log.info("touching file", { file: input })
const matched = await getClientsForFile(input)
await Promise.all(
matched.map(async (client) => {
const wait = waitForDiagnostics ? client.waitForDiagnostics({ path: input }) : Promise.resolve()
await client.notify.open({ path: input })
return wait
}),
).catch((err) => {
log.error("failed to touch file", { err, file: input })
})
})
})
const diagnostics = Effect.fn("LSP.diagnostics")(function* () {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
const results: Record<string, LSPClient.Diagnostic[]> = {}
for (const result of await runAllClients(async (client) => client.diagnostics)) {
for (const [path, diagnostics] of result.entries()) {
const arr = results[path] || []
arr.push(...diagnostics)
results[path] = arr
}
}
return results
})
})
const hover = Effect.fn("LSP.hover")(function* (input: { file: string; line: number; character: number }) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, (client) => {
return client.connection
.sendRequest("textDocument/hover", {
textDocument: {
uri: pathToFileURL(input.file).href,
},
position: {
line: input.line,
character: input.character,
},
})
.catch(() => null)
})
})
})
const workspaceSymbol = Effect.fn("LSP.workspaceSymbol")(function* (query: string) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runAllClients((client) =>
client.connection
.sendRequest("workspace/symbol", {
query,
})
.then((result: any) => result.filter((x: LSP.Symbol) => kinds.includes(x.kind)))
.then((result: any) => result.slice(0, 10))
.catch(() => []),
).then((result) => result.flat() as LSP.Symbol[])
})
})
const documentSymbol = Effect.fn("LSP.documentSymbol")(function* (uri: string) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
const file = fileURLToPath(uri)
return runForFile(file, (client) =>
client.connection
.sendRequest("textDocument/documentSymbol", {
textDocument: {
uri,
},
})
.catch(() => []),
)
.then((result) => result.flat() as (LSP.DocumentSymbol | LSP.Symbol)[])
.then((result) => result.filter(Boolean))
})
})
const definition = Effect.fn("LSP.definition")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/definition", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
).then((result) => result.flat().filter(Boolean))
})
})
const references = Effect.fn("LSP.references")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/references", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
context: { includeDeclaration: true },
})
.catch(() => []),
).then((result) => result.flat().filter(Boolean))
})
})
const implementation = Effect.fn("LSP.implementation")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/implementation", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
).then((result) => result.flat().filter(Boolean))
})
})
const prepareCallHierarchy = Effect.fn("LSP.prepareCallHierarchy")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => []),
).then((result) => result.flat().filter(Boolean))
})
})
const incomingCalls = Effect.fn("LSP.incomingCalls")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection
.sendRequest("callHierarchy/incomingCalls", { item: items[0] })
.catch(() => [])
}).then((result) => result.flat().filter(Boolean))
})
})
const outgoingCalls = Effect.fn("LSP.outgoingCalls")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection
.sendRequest("callHierarchy/outgoingCalls", { item: items[0] })
.catch(() => [])
}).then((result) => result.flat().filter(Boolean))
})
})
log.info("init")
return Service.of({
init,
status,
hasClients,
touchFile,
diagnostics,
hover,
workspaceSymbol,
documentSymbol,
definition,
references,
implementation,
prepareCallHierarchy,
incomingCalls,
outgoingCalls,
})
}),
)
// Async facades
export async function init() {
return runPromiseInstance(Service.use((svc) => svc.init()))
}
export async function status() {
return runPromiseInstance(Service.use((svc) => svc.status()))
}
export async function hasClients(file: string) {
return runPromiseInstance(Service.use((svc) => svc.hasClients(file)))
}
export async function touchFile(input: string, waitForDiagnostics?: boolean) {
return runPromiseInstance(Service.use((svc) => svc.touchFile(input, waitForDiagnostics)))
}
export async function diagnostics() {
return runPromiseInstance(Service.use((svc) => svc.diagnostics()))
}
export async function hover(input: { file: string; line: number; character: number }) {
return runPromiseInstance(Service.use((svc) => svc.hover(input)))
}
export async function workspaceSymbol(query: string) {
return runPromiseInstance(Service.use((svc) => svc.workspaceSymbol(query)))
}
export async function documentSymbol(uri: string) {
const file = fileURLToPath(uri)
return run(file, (client) =>
client.connection
.sendRequest("textDocument/documentSymbol", {
textDocument: {
uri,
},
})
.catch(() => []),
)
.then((result) => result.flat() as (LSP.DocumentSymbol | LSP.Symbol)[])
.then((result) => result.filter(Boolean))
return runPromiseInstance(Service.use((svc) => svc.documentSymbol(uri)))
}
export async function definition(input: { file: string; line: number; character: number }) {
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/definition", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
).then((result) => result.flat().filter(Boolean))
return runPromiseInstance(Service.use((svc) => svc.definition(input)))
}
export async function references(input: { file: string; line: number; character: number }) {
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/references", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
context: { includeDeclaration: true },
})
.catch(() => []),
).then((result) => result.flat().filter(Boolean))
return runPromiseInstance(Service.use((svc) => svc.references(input)))
}
export async function implementation(input: { file: string; line: number; character: number }) {
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/implementation", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
).then((result) => result.flat().filter(Boolean))
return runPromiseInstance(Service.use((svc) => svc.implementation(input)))
}
export async function prepareCallHierarchy(input: { file: string; line: number; character: number }) {
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => []),
).then((result) => result.flat().filter(Boolean))
return runPromiseInstance(Service.use((svc) => svc.prepareCallHierarchy(input)))
}
export async function incomingCalls(input: { file: string; line: number; character: number }) {
return run(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection.sendRequest("callHierarchy/incomingCalls", { item: items[0] }).catch(() => [])
}).then((result) => result.flat().filter(Boolean))
return runPromiseInstance(Service.use((svc) => svc.incomingCalls(input)))
}
export async function outgoingCalls(input: { file: string; line: number; character: number }) {
return run(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection.sendRequest("callHierarchy/outgoingCalls", { item: items[0] }).catch(() => [])
}).then((result) => result.flat().filter(Boolean))
}
async function runAll<T>(input: (client: LSPClient.Info) => Promise<T>): Promise<T[]> {
const clients = await state().then((x) => x.clients)
const tasks = clients.map((x) => input(x))
return Promise.all(tasks)
}
async function run<T>(file: string, input: (client: LSPClient.Info) => Promise<T>): Promise<T[]> {
const clients = await getClients(file)
const tasks = clients.map((x) => input(x))
return Promise.all(tasks)
return runPromiseInstance(Service.use((svc) => svc.outgoingCalls(input)))
}
export namespace Diagnostic {

View File

@@ -1,12 +1,13 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceContext } from "@/effect/instance-context"
import { type IPty } from "bun-pty"
import z from "zod"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { lazy } from "@opencode-ai/util/lazy"
import { Shell } from "@/shell/shell"
import { Plugin } from "@/plugin"
import { PtyID } from "./schema"
import { Effect, Layer, ServiceMap } from "effect"
export namespace Pty {
const log = Log.create({ service: "pty" })
@@ -89,297 +90,232 @@ export namespace Pty {
subscribers: Map<unknown, Socket>
}
export interface Interface {
readonly list: () => Effect.Effect<Info[]>
readonly get: (id: PtyID) => Effect.Effect<Info | undefined>
readonly create: (input: CreateInput) => Effect.Effect<Info>
readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect<Info | undefined>
readonly remove: (id: PtyID) => Effect.Effect<void>
readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect<void>
readonly write: (id: PtyID, data: string) => Effect.Effect<void>
readonly connect: (
id: PtyID,
ws: Socket,
cursor?: number,
) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Pty") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const instance = yield* InstanceContext
const sessions = new Map<PtyID, ActiveSession>()
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
for (const session of sessions.values()) {
try {
session.process.kill()
} catch {}
for (const [key, ws] of session.subscribers.entries()) {
try {
if (ws.data === key) ws.close()
} catch {}
}
}
sessions.clear()
}),
)
const removeSession = (id: PtyID) => {
const session = sessions.get(id)
if (!session) return
sessions.delete(id)
log.info("removing session", { id })
const state = Instance.state(
() => new Map<PtyID, ActiveSession>(),
async (sessions) => {
for (const session of sessions.values()) {
try {
session.process.kill()
} catch {}
for (const [key, ws] of session.subscribers.entries()) {
try {
if (ws.data === key) ws.close()
} catch {}
}
session.subscribers.clear()
Bus.publish(Event.Deleted, { id: session.info.id })
}
const list = Effect.fn("Pty.list")(function* () {
return Array.from(sessions.values()).map((s) => s.info)
})
const get = Effect.fn("Pty.get")(function* (id: PtyID) {
return sessions.get(id)?.info
})
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
return yield* Effect.promise(async () => {
const [{ Shell }, { Plugin }] = await Promise.all([import("@/shell/shell"), import("@/plugin")])
const id = PtyID.ascending()
const command = input.command || Shell.preferred()
const args = input.args || []
if (command.endsWith("sh")) {
args.push("-l")
}
const cwd = input.cwd || instance.directory
const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
const env = {
...process.env,
...input.env,
...shellEnv.env,
TERM: "xterm-256color",
OPENCODE_TERMINAL: "1",
} as Record<string, string>
if (process.platform === "win32") {
env.LC_ALL = "C.UTF-8"
env.LC_CTYPE = "C.UTF-8"
env.LANG = "C.UTF-8"
}
log.info("creating session", { id, cmd: command, args, cwd })
const spawn = await pty()
const ptyProcess = spawn(command, args, {
name: "xterm-256color",
cwd,
env,
})
const info = {
id,
title: input.title || `Terminal ${id.slice(-4)}`,
command,
args,
cwd,
status: "running",
pid: ptyProcess.pid,
} as const
const session: ActiveSession = {
info,
process: ptyProcess,
buffer: "",
bufferCursor: 0,
cursor: 0,
subscribers: new Map(),
}
sessions.set(id, session)
ptyProcess.onData((chunk) => {
session.cursor += chunk.length
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1) {
session.subscribers.delete(key)
continue
}
if (ws.data !== key) {
session.subscribers.delete(key)
continue
}
try {
ws.send(chunk)
} catch {
session.subscribers.delete(key)
}
}
session.buffer += chunk
if (session.buffer.length <= BUFFER_LIMIT) return
const excess = session.buffer.length - BUFFER_LIMIT
session.buffer = session.buffer.slice(excess)
session.bufferCursor += excess
})
ptyProcess.onExit(({ exitCode }) => {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
Bus.publish(Event.Exited, { id, exitCode })
removeSession(id)
})
Bus.publish(Event.Created, { info })
return info
})
})
const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) {
const session = sessions.get(id)
if (!session) return
if (input.title) {
session.info.title = input.title
}
if (input.size) {
session.process.resize(input.size.cols, input.size.rows)
}
Bus.publish(Event.Updated, { info: session.info })
return session.info
})
const remove = Effect.fn("Pty.remove")(function* (id: PtyID) {
removeSession(id)
})
const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) {
const session = sessions.get(id)
if (session && session.info.status === "running") {
session.process.resize(cols, rows)
}
})
const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) {
const session = sessions.get(id)
if (session && session.info.status === "running") {
session.process.write(data)
}
})
const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) {
const session = sessions.get(id)
if (!session) {
ws.close()
return
}
log.info("client connected to session", { id })
const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
session.subscribers.delete(connectionKey)
session.subscribers.set(connectionKey, ws)
const cleanup = () => {
session.subscribers.delete(connectionKey)
}
const start = session.bufferCursor
const end = session.cursor
const from =
cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
const data = (() => {
if (!session.buffer) return ""
if (from >= end) return ""
const offset = Math.max(0, from - start)
if (offset >= session.buffer.length) return ""
return session.buffer.slice(offset)
})()
if (data) {
try {
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
ws.send(data.slice(i, i + BUFFER_CHUNK))
}
} catch {
cleanup()
ws.close()
return
// ignore
}
}
try {
ws.send(meta(end))
} catch {
cleanup()
ws.close()
return
}
return {
onMessage: (message: string | ArrayBuffer) => {
session.process.write(String(message))
},
onClose: () => {
log.info("client disconnected from session", { id })
cleanup()
},
}
})
return Service.of({ list, get, create, update, remove, resize, write, connect })
}),
}
sessions.clear()
},
)
function runtime() {
return require("@/effect/runtime") as typeof import("@/effect/runtime")
}
function run<A, E>(effect: Effect.Effect<A, E, Service>) {
return runtime().runPromiseInstance(effect)
}
function runSync<A, E>(effect: Effect.Effect<A, E, Service>) {
return runtime().runSyncInstance(effect)
}
// Sync facades
export function list() {
return runSync(Service.use((svc) => svc.list()))
return Array.from(state().values()).map((s) => s.info)
}
export function get(id: PtyID) {
return runSync(Service.use((svc) => svc.get(id)))
return state().get(id)?.info
}
export function resize(id: PtyID, cols: number, rows: number) {
runSync(Service.use((svc) => svc.resize(id, cols, rows)))
}
export function write(id: PtyID, data: string) {
runSync(Service.use((svc) => svc.write(id, data)))
}
export function connect(id: PtyID, ws: Socket, cursor?: number) {
return runSync(Service.use((svc) => svc.connect(id, ws, cursor)))
}
// Async facades
export async function create(input: CreateInput) {
return run(Service.use((svc) => svc.create(input)))
const id = PtyID.ascending()
const command = input.command || Shell.preferred()
const args = input.args || []
if (command.endsWith("sh")) {
args.push("-l")
}
const cwd = input.cwd || Instance.directory
const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
const env = {
...process.env,
...input.env,
...shellEnv.env,
TERM: "xterm-256color",
OPENCODE_TERMINAL: "1",
} as Record<string, string>
if (process.platform === "win32") {
env.LC_ALL = "C.UTF-8"
env.LC_CTYPE = "C.UTF-8"
env.LANG = "C.UTF-8"
}
log.info("creating session", { id, cmd: command, args, cwd })
const spawn = await pty()
const ptyProcess = spawn(command, args, {
name: "xterm-256color",
cwd,
env,
})
const info = {
id,
title: input.title || `Terminal ${id.slice(-4)}`,
command,
args,
cwd,
status: "running",
pid: ptyProcess.pid,
} as const
const session: ActiveSession = {
info,
process: ptyProcess,
buffer: "",
bufferCursor: 0,
cursor: 0,
subscribers: new Map(),
}
state().set(id, session)
ptyProcess.onData(
Instance.bind((chunk) => {
session.cursor += chunk.length
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1) {
session.subscribers.delete(key)
continue
}
if (ws.data !== key) {
session.subscribers.delete(key)
continue
}
try {
ws.send(chunk)
} catch {
session.subscribers.delete(key)
}
}
session.buffer += chunk
if (session.buffer.length <= BUFFER_LIMIT) return
const excess = session.buffer.length - BUFFER_LIMIT
session.buffer = session.buffer.slice(excess)
session.bufferCursor += excess
}),
)
ptyProcess.onExit(
Instance.bind(({ exitCode }) => {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
Bus.publish(Event.Exited, { id, exitCode })
remove(id)
}),
)
Bus.publish(Event.Created, { info })
return info
}
export async function update(id: PtyID, input: UpdateInput) {
return run(Service.use((svc) => svc.update(id, input)))
const session = state().get(id)
if (!session) return
if (input.title) {
session.info.title = input.title
}
if (input.size) {
session.process.resize(input.size.cols, input.size.rows)
}
Bus.publish(Event.Updated, { info: session.info })
return session.info
}
export async function remove(id: PtyID) {
return run(Service.use((svc) => svc.remove(id)))
const session = state().get(id)
if (!session) return
state().delete(id)
log.info("removing session", { id })
try {
session.process.kill()
} catch {}
for (const [key, ws] of session.subscribers.entries()) {
try {
if (ws.data === key) ws.close()
} catch {
// ignore
}
}
session.subscribers.clear()
Bus.publish(Event.Deleted, { id: session.info.id })
}
export function resize(id: PtyID, cols: number, rows: number) {
const session = state().get(id)
if (session && session.info.status === "running") {
session.process.resize(cols, rows)
}
}
export function write(id: PtyID, data: string) {
const session = state().get(id)
if (session && session.info.status === "running") {
session.process.write(data)
}
}
export function connect(id: PtyID, ws: Socket, cursor?: number) {
const session = state().get(id)
if (!session) {
ws.close()
return
}
log.info("client connected to session", { id })
// Use ws.data as the unique key for this connection lifecycle.
// If ws.data is undefined, fallback to ws object.
const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
// Optionally cleanup if the key somehow exists
session.subscribers.delete(connectionKey)
session.subscribers.set(connectionKey, ws)
const cleanup = () => {
session.subscribers.delete(connectionKey)
}
const start = session.bufferCursor
const end = session.cursor
const from =
cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
const data = (() => {
if (!session.buffer) return ""
if (from >= end) return ""
const offset = Math.max(0, from - start)
if (offset >= session.buffer.length) return ""
return session.buffer.slice(offset)
})()
if (data) {
try {
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
ws.send(data.slice(i, i + BUFFER_CHUNK))
}
} catch {
cleanup()
ws.close()
return
}
}
try {
ws.send(meta(end))
} catch {
cleanup()
ws.close()
return
}
return {
onMessage: (message: string | ArrayBuffer) => {
session.process.write(String(message))
},
onClose: () => {
log.info("client disconnected from session", { id })
cleanup()
},
}
}
}