Compare commits

..

4 Commits

Author SHA1 Message Date
Kit Langton
a2b81cd8a5 Merge branch 'dev' into kit/effectify-pty 2026-03-20 15:25:17 -04:00
Kit Langton
24f9df5463 fix: update stale account url/email on re-login (#18426) 2026-03-20 14:50:01 -04:00
Kit Langton
771f1918b5 Merge branch 'dev' into kit/effectify-pty 2026-03-20 09:17:37 -04:00
Kit Langton
6a3cb06c6c effectify Pty service, add runSyncInstance helper 2026-03-19 21:18:15 -04:00
5 changed files with 588 additions and 687 deletions

View File

@@ -128,7 +128,7 @@ Still open and likely worth migrating:
- [ ] `Plugin`
- [ ] `ToolRegistry`
- [ ] `Pty`
- [x] `Pty`
- [ ] `Worktree`
- [ ] `Installation`
- [ ] `Bus`
@@ -140,5 +140,5 @@ Still open and likely worth migrating:
- [ ] `SessionCompaction`
- [ ] `Provider`
- [ ] `Project`
- [x] `LSP`
- [ ] `LSP`
- [ ] `MCP`

View File

@@ -136,6 +136,8 @@ export class AccountRepo extends ServiceMap.Service<AccountRepo, AccountRepo.Ser
.onConflictDoUpdate({
target: AccountTable.id,
set: {
email: input.email,
url: input.url,
access_token: input.accessToken,
refresh_token: input.refreshToken,
token_expiry: input.expiry,

View File

@@ -3,8 +3,8 @@ import { File } from "@/file/service"
import { FileTime } from "@/file/time-service"
import { FileWatcher } from "@/file/watcher"
import { Format } from "@/format/service"
import { LSP } from "@/lsp"
import { Permission } from "@/permission/service"
import { Pty } from "@/pty"
import { Instance } from "@/project/instance"
import { Vcs } from "@/project/vcs"
import { ProviderAuth } from "@/provider/auth-service"
@@ -25,9 +25,9 @@ export type InstanceServices =
| FileTime.Service
| Format.Service
| File.Service
| Pty.Service
| Skill.Service
| Snapshot.Service
| LSP.Service
// NOTE: LayerMap only passes the key (directory string) to lookup, but we need
// the full instance context (directory, worktree, project). We read from the
@@ -46,9 +46,9 @@ function lookup(_key: string) {
FileTime.layer,
Format.layer,
File.layer,
Pty.layer,
Skill.defaultLayer,
Snapshot.defaultLayer,
LSP.layer,
).pipe(Layer.provide(ctx))
}

View File

@@ -1,7 +1,5 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceContext } from "@/effect/instance-context"
import { runPromiseInstance } from "@/effect/runtime"
import { Log } from "../util/log"
import { LSPClient } from "./client"
import path from "path"
@@ -9,10 +7,10 @@ import { pathToFileURL, fileURLToPath } from "url"
import { LSPServer } from "./server"
import z from "zod"
import { Config } from "../config/config"
import { Instance } from "../project/instance"
import { Flag } from "@/flag/flag"
import { Process } from "../util/process"
import { spawn as lspspawn } from "./launch"
import { Effect, Fiber, Layer, ServiceMap } from "effect"
export namespace LSP {
const log = Log.create({ service: "lsp" })
@@ -79,6 +77,77 @@ export namespace LSP {
}
}
const state = Instance.state(
async () => {
const clients: LSPClient.Info[] = []
const servers: Record<string, LSPServer.Info> = {}
const cfg = await Config.get()
if (cfg.lsp === false) {
log.info("all LSPs are disabled")
return {
broken: new Set<string>(),
servers,
clients,
spawning: new Map<string, Promise<LSPClient.Info | undefined>>(),
}
}
for (const server of Object.values(LSPServer)) {
servers[server.id] = server
}
filterExperimentalServers(servers)
for (const [name, item] of Object.entries(cfg.lsp ?? {})) {
const existing = servers[name]
if (item.disabled) {
log.info(`LSP server ${name} is disabled`)
delete servers[name]
continue
}
servers[name] = {
...existing,
id: name,
root: existing?.root ?? (async () => Instance.directory),
extensions: item.extensions ?? existing?.extensions ?? [],
spawn: async (root) => {
return {
process: lspspawn(item.command[0], item.command.slice(1), {
cwd: root,
env: {
...process.env,
...item.env,
},
}),
initialization: item.initialization,
}
},
}
}
log.info("enabled LSP servers", {
serverIds: Object.values(servers)
.map((server) => server.id)
.join(", "),
})
return {
broken: new Set<string>(),
servers,
clients,
spawning: new Map<string, Promise<LSPClient.Info | undefined>>(),
}
},
async (state) => {
await Promise.all(state.clients.map((client) => client.shutdown()))
},
)
export async function init() {
return state()
}
export const Status = z
.object({
id: z.string(),
@@ -91,6 +160,162 @@ export namespace LSP {
})
export type Status = z.infer<typeof Status>
export async function status() {
return state().then((x) => {
const result: Status[] = []
for (const client of x.clients) {
result.push({
id: client.serverID,
name: x.servers[client.serverID].id,
root: path.relative(Instance.directory, client.root),
status: "connected",
})
}
return result
})
}
async function getClients(file: string) {
const s = await state()
const extension = path.parse(file).ext || file
const result: LSPClient.Info[] = []
async function schedule(server: LSPServer.Info, root: string, key: string) {
const handle = await server
.spawn(root)
.then((value) => {
if (!value) s.broken.add(key)
return value
})
.catch((err) => {
s.broken.add(key)
log.error(`Failed to spawn LSP server ${server.id}`, { error: err })
return undefined
})
if (!handle) return undefined
log.info("spawned lsp server", { serverID: server.id })
const client = await LSPClient.create({
serverID: server.id,
server: handle,
root,
}).catch(async (err) => {
s.broken.add(key)
await Process.stop(handle.process)
log.error(`Failed to initialize LSP client ${server.id}`, { error: err })
return undefined
})
if (!client) {
return undefined
}
const existing = s.clients.find((x) => x.root === root && x.serverID === server.id)
if (existing) {
await Process.stop(handle.process)
return existing
}
s.clients.push(client)
return client
}
for (const server of Object.values(s.servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (s.broken.has(root + server.id)) continue
const match = s.clients.find((x) => x.root === root && x.serverID === server.id)
if (match) {
result.push(match)
continue
}
const inflight = s.spawning.get(root + server.id)
if (inflight) {
const client = await inflight
if (!client) continue
result.push(client)
continue
}
const task = schedule(server, root, root + server.id)
s.spawning.set(root + server.id, task)
task.finally(() => {
if (s.spawning.get(root + server.id) === task) {
s.spawning.delete(root + server.id)
}
})
const client = await task
if (!client) continue
result.push(client)
Bus.publish(Event.Updated, {})
}
return result
}
export async function hasClients(file: string) {
const s = await state()
const extension = path.parse(file).ext || file
for (const server of Object.values(s.servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (s.broken.has(root + server.id)) continue
return true
}
return false
}
export async function touchFile(input: string, waitForDiagnostics?: boolean) {
log.info("touching file", { file: input })
const clients = await getClients(input)
await Promise.all(
clients.map(async (client) => {
const wait = waitForDiagnostics ? client.waitForDiagnostics({ path: input }) : Promise.resolve()
await client.notify.open({ path: input })
return wait
}),
).catch((err) => {
log.error("failed to touch file", { err, file: input })
})
}
export async function diagnostics() {
const results: Record<string, LSPClient.Diagnostic[]> = {}
for (const result of await runAll(async (client) => client.diagnostics)) {
for (const [path, diagnostics] of result.entries()) {
const arr = results[path] || []
arr.push(...diagnostics)
results[path] = arr
}
}
return results
}
export async function hover(input: { file: string; line: number; character: number }) {
return run(input.file, (client) => {
return client.connection
.sendRequest("textDocument/hover", {
textDocument: {
uri: pathToFileURL(input.file).href,
},
position: {
line: input.line,
character: input.character,
},
})
.catch(() => null)
})
}
enum SymbolKind {
File = 1,
Module = 2,
@@ -131,504 +356,114 @@ export namespace LSP {
SymbolKind.Enum,
]
export interface Interface {
readonly init: () => Effect.Effect<void>
readonly status: () => Effect.Effect<Status[]>
readonly hasClients: (file: string) => Effect.Effect<boolean>
readonly touchFile: (input: string, waitForDiagnostics?: boolean) => Effect.Effect<void>
readonly diagnostics: () => Effect.Effect<Record<string, LSPClient.Diagnostic[]>>
readonly hover: (input: { file: string; line: number; character: number }) => Effect.Effect<any>
readonly workspaceSymbol: (query: string) => Effect.Effect<LSP.Symbol[]>
readonly documentSymbol: (uri: string) => Effect.Effect<(LSP.DocumentSymbol | LSP.Symbol)[]>
readonly definition: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
readonly references: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
readonly implementation: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
readonly prepareCallHierarchy: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
readonly incomingCalls: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
readonly outgoingCalls: (input: { file: string; line: number; character: number }) => Effect.Effect<any[]>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/LSP") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const instance = yield* InstanceContext
const directory = instance.directory
const clients: LSPClient.Info[] = []
const servers: Record<string, LSPServer.Info> = {}
const broken = new Set<string>()
const spawning = new Map<string, Promise<LSPClient.Info | undefined>>()
// Load server configs lazily — forkScoped so it doesn't block layer construction
const load = Effect.fn("LSP.load")(function* () {
yield* Effect.promise(async () => {
const cfg = await Config.get()
if (cfg.lsp === false) {
log.info("all LSPs are disabled")
return
}
for (const server of Object.values(LSPServer)) {
servers[server.id] = server
}
filterExperimentalServers(servers)
for (const [name, item] of Object.entries(cfg.lsp ?? {})) {
const existing = servers[name]
if (item.disabled) {
log.info(`LSP server ${name} is disabled`)
delete servers[name]
continue
}
servers[name] = {
...existing,
id: name,
root: existing?.root ?? (async () => directory),
extensions: item.extensions ?? existing?.extensions ?? [],
spawn: async (root) => {
return {
process: lspspawn(item.command[0], item.command.slice(1), {
cwd: root,
env: {
...process.env,
...item.env,
},
}),
initialization: item.initialization,
}
},
}
}
log.info("enabled LSP servers", {
serverIds: Object.values(servers)
.map((server) => server.id)
.join(", "),
})
})
})
const loadFiber = yield* load().pipe(
Effect.catchCause((cause) => Effect.sync(() => log.error("init failed", { cause }))),
Effect.forkScoped,
)
// Cleanup: shut down all LSP clients when the scope is closed
yield* Effect.addFinalizer(() =>
Effect.promise(async () => {
await Promise.all(clients.map((client) => client.shutdown()))
}),
)
async function getClientsForFile(file: string) {
const extension = path.parse(file).ext || file
const result: LSPClient.Info[] = []
async function schedule(server: LSPServer.Info, root: string, key: string) {
const handle = await server
.spawn(root)
.then((value) => {
if (!value) broken.add(key)
return value
})
.catch((err) => {
broken.add(key)
log.error(`Failed to spawn LSP server ${server.id}`, { error: err })
return undefined
})
if (!handle) return undefined
log.info("spawned lsp server", { serverID: server.id })
const client = await LSPClient.create({
serverID: server.id,
server: handle,
root,
}).catch(async (err) => {
broken.add(key)
await Process.stop(handle.process)
log.error(`Failed to initialize LSP client ${server.id}`, { error: err })
return undefined
})
if (!client) {
return undefined
}
const existing = clients.find((x) => x.root === root && x.serverID === server.id)
if (existing) {
await Process.stop(handle.process)
return existing
}
clients.push(client)
return client
}
for (const server of Object.values(servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (broken.has(root + server.id)) continue
const match = clients.find((x) => x.root === root && x.serverID === server.id)
if (match) {
result.push(match)
continue
}
const inflight = spawning.get(root + server.id)
if (inflight) {
const client = await inflight
if (!client) continue
result.push(client)
continue
}
const task = schedule(server, root, root + server.id)
spawning.set(root + server.id, task)
task.finally(() => {
if (spawning.get(root + server.id) === task) {
spawning.delete(root + server.id)
}
})
const client = await task
if (!client) continue
result.push(client)
Bus.publish(Event.Updated, {})
}
return result
}
async function runAllClients<T>(input: (client: LSPClient.Info) => Promise<T>): Promise<T[]> {
const tasks = clients.map((x) => input(x))
return Promise.all(tasks)
}
async function runForFile<T>(file: string, input: (client: LSPClient.Info) => Promise<T>): Promise<T[]> {
const matched = await getClientsForFile(file)
const tasks = matched.map((x) => input(x))
return Promise.all(tasks)
}
const init = Effect.fn("LSP.init")(function* () {
yield* Fiber.join(loadFiber)
})
const status = Effect.fn("LSP.status")(function* () {
yield* Fiber.join(loadFiber)
const result: Status[] = []
for (const client of clients) {
result.push({
id: client.serverID,
name: servers[client.serverID].id,
root: path.relative(directory, client.root),
status: "connected",
})
}
return result
})
const hasClients = Effect.fn("LSP.hasClients")(function* (file: string) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
const extension = path.parse(file).ext || file
for (const server of Object.values(servers)) {
if (server.extensions.length && !server.extensions.includes(extension)) continue
const root = await server.root(file)
if (!root) continue
if (broken.has(root + server.id)) continue
return true
}
return false
})
})
const touchFile = Effect.fn("LSP.touchFile")(function* (input: string, waitForDiagnostics?: boolean) {
yield* Fiber.join(loadFiber)
yield* Effect.promise(async () => {
log.info("touching file", { file: input })
const matched = await getClientsForFile(input)
await Promise.all(
matched.map(async (client) => {
const wait = waitForDiagnostics ? client.waitForDiagnostics({ path: input }) : Promise.resolve()
await client.notify.open({ path: input })
return wait
}),
).catch((err) => {
log.error("failed to touch file", { err, file: input })
})
})
})
const diagnostics = Effect.fn("LSP.diagnostics")(function* () {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
const results: Record<string, LSPClient.Diagnostic[]> = {}
for (const result of await runAllClients(async (client) => client.diagnostics)) {
for (const [path, diagnostics] of result.entries()) {
const arr = results[path] || []
arr.push(...diagnostics)
results[path] = arr
}
}
return results
})
})
const hover = Effect.fn("LSP.hover")(function* (input: { file: string; line: number; character: number }) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, (client) => {
return client.connection
.sendRequest("textDocument/hover", {
textDocument: {
uri: pathToFileURL(input.file).href,
},
position: {
line: input.line,
character: input.character,
},
})
.catch(() => null)
})
})
})
const workspaceSymbol = Effect.fn("LSP.workspaceSymbol")(function* (query: string) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runAllClients((client) =>
client.connection
.sendRequest("workspace/symbol", {
query,
})
.then((result: any) => result.filter((x: LSP.Symbol) => kinds.includes(x.kind)))
.then((result: any) => result.slice(0, 10))
.catch(() => []),
).then((result) => result.flat() as LSP.Symbol[])
})
})
const documentSymbol = Effect.fn("LSP.documentSymbol")(function* (uri: string) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
const file = fileURLToPath(uri)
return runForFile(file, (client) =>
client.connection
.sendRequest("textDocument/documentSymbol", {
textDocument: {
uri,
},
})
.catch(() => []),
)
.then((result) => result.flat() as (LSP.DocumentSymbol | LSP.Symbol)[])
.then((result) => result.filter(Boolean))
})
})
const definition = Effect.fn("LSP.definition")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/definition", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
).then((result) => result.flat().filter(Boolean))
})
})
const references = Effect.fn("LSP.references")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/references", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
context: { includeDeclaration: true },
})
.catch(() => []),
).then((result) => result.flat().filter(Boolean))
})
})
const implementation = Effect.fn("LSP.implementation")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/implementation", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
).then((result) => result.flat().filter(Boolean))
})
})
const prepareCallHierarchy = Effect.fn("LSP.prepareCallHierarchy")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, (client) =>
client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => []),
).then((result) => result.flat().filter(Boolean))
})
})
const incomingCalls = Effect.fn("LSP.incomingCalls")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection
.sendRequest("callHierarchy/incomingCalls", { item: items[0] })
.catch(() => [])
}).then((result) => result.flat().filter(Boolean))
})
})
const outgoingCalls = Effect.fn("LSP.outgoingCalls")(function* (input: {
file: string
line: number
character: number
}) {
yield* Fiber.join(loadFiber)
return yield* Effect.promise(async () => {
return runForFile(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection
.sendRequest("callHierarchy/outgoingCalls", { item: items[0] })
.catch(() => [])
}).then((result) => result.flat().filter(Boolean))
})
})
log.info("init")
return Service.of({
init,
status,
hasClients,
touchFile,
diagnostics,
hover,
workspaceSymbol,
documentSymbol,
definition,
references,
implementation,
prepareCallHierarchy,
incomingCalls,
outgoingCalls,
})
}),
)
// Async facades
export async function init() {
return runPromiseInstance(Service.use((svc) => svc.init()))
}
export async function status() {
return runPromiseInstance(Service.use((svc) => svc.status()))
}
export async function hasClients(file: string) {
return runPromiseInstance(Service.use((svc) => svc.hasClients(file)))
}
export async function touchFile(input: string, waitForDiagnostics?: boolean) {
return runPromiseInstance(Service.use((svc) => svc.touchFile(input, waitForDiagnostics)))
}
export async function diagnostics() {
return runPromiseInstance(Service.use((svc) => svc.diagnostics()))
}
export async function hover(input: { file: string; line: number; character: number }) {
return runPromiseInstance(Service.use((svc) => svc.hover(input)))
}
export async function workspaceSymbol(query: string) {
return runPromiseInstance(Service.use((svc) => svc.workspaceSymbol(query)))
return runAll((client) =>
client.connection
.sendRequest("workspace/symbol", {
query,
})
.then((result: any) => result.filter((x: LSP.Symbol) => kinds.includes(x.kind)))
.then((result: any) => result.slice(0, 10))
.catch(() => []),
).then((result) => result.flat() as LSP.Symbol[])
}
export async function documentSymbol(uri: string) {
return runPromiseInstance(Service.use((svc) => svc.documentSymbol(uri)))
const file = fileURLToPath(uri)
return run(file, (client) =>
client.connection
.sendRequest("textDocument/documentSymbol", {
textDocument: {
uri,
},
})
.catch(() => []),
)
.then((result) => result.flat() as (LSP.DocumentSymbol | LSP.Symbol)[])
.then((result) => result.filter(Boolean))
}
export async function definition(input: { file: string; line: number; character: number }) {
return runPromiseInstance(Service.use((svc) => svc.definition(input)))
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/definition", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
).then((result) => result.flat().filter(Boolean))
}
export async function references(input: { file: string; line: number; character: number }) {
return runPromiseInstance(Service.use((svc) => svc.references(input)))
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/references", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
context: { includeDeclaration: true },
})
.catch(() => []),
).then((result) => result.flat().filter(Boolean))
}
export async function implementation(input: { file: string; line: number; character: number }) {
return runPromiseInstance(Service.use((svc) => svc.implementation(input)))
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/implementation", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => null),
).then((result) => result.flat().filter(Boolean))
}
export async function prepareCallHierarchy(input: { file: string; line: number; character: number }) {
return runPromiseInstance(Service.use((svc) => svc.prepareCallHierarchy(input)))
return run(input.file, (client) =>
client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => []),
).then((result) => result.flat().filter(Boolean))
}
export async function incomingCalls(input: { file: string; line: number; character: number }) {
return runPromiseInstance(Service.use((svc) => svc.incomingCalls(input)))
return run(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection.sendRequest("callHierarchy/incomingCalls", { item: items[0] }).catch(() => [])
}).then((result) => result.flat().filter(Boolean))
}
export async function outgoingCalls(input: { file: string; line: number; character: number }) {
return runPromiseInstance(Service.use((svc) => svc.outgoingCalls(input)))
return run(input.file, async (client) => {
const items = (await client.connection
.sendRequest("textDocument/prepareCallHierarchy", {
textDocument: { uri: pathToFileURL(input.file).href },
position: { line: input.line, character: input.character },
})
.catch(() => [])) as any[]
if (!items?.length) return []
return client.connection.sendRequest("callHierarchy/outgoingCalls", { item: items[0] }).catch(() => [])
}).then((result) => result.flat().filter(Boolean))
}
async function runAll<T>(input: (client: LSPClient.Info) => Promise<T>): Promise<T[]> {
const clients = await state().then((x) => x.clients)
const tasks = clients.map((x) => input(x))
return Promise.all(tasks)
}
async function run<T>(file: string, input: (client: LSPClient.Info) => Promise<T>): Promise<T[]> {
const clients = await getClients(file)
const tasks = clients.map((x) => input(x))
return Promise.all(tasks)
}
export namespace Diagnostic {

View File

@@ -1,13 +1,12 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceContext } from "@/effect/instance-context"
import { type IPty } from "bun-pty"
import z from "zod"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { lazy } from "@opencode-ai/util/lazy"
import { Shell } from "@/shell/shell"
import { Plugin } from "@/plugin"
import { PtyID } from "./schema"
import { Effect, Layer, ServiceMap } from "effect"
export namespace Pty {
const log = Log.create({ service: "pty" })
@@ -90,232 +89,297 @@ export namespace Pty {
subscribers: Map<unknown, Socket>
}
const state = Instance.state(
() => new Map<PtyID, ActiveSession>(),
async (sessions) => {
for (const session of sessions.values()) {
export interface Interface {
readonly list: () => Effect.Effect<Info[]>
readonly get: (id: PtyID) => Effect.Effect<Info | undefined>
readonly create: (input: CreateInput) => Effect.Effect<Info>
readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect<Info | undefined>
readonly remove: (id: PtyID) => Effect.Effect<void>
readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect<void>
readonly write: (id: PtyID, data: string) => Effect.Effect<void>
readonly connect: (
id: PtyID,
ws: Socket,
cursor?: number,
) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Pty") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const instance = yield* InstanceContext
const sessions = new Map<PtyID, ActiveSession>()
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
for (const session of sessions.values()) {
try {
session.process.kill()
} catch {}
for (const [key, ws] of session.subscribers.entries()) {
try {
if (ws.data === key) ws.close()
} catch {}
}
}
sessions.clear()
}),
)
const removeSession = (id: PtyID) => {
const session = sessions.get(id)
if (!session) return
sessions.delete(id)
log.info("removing session", { id })
try {
session.process.kill()
} catch {}
for (const [key, ws] of session.subscribers.entries()) {
try {
if (ws.data === key) ws.close()
} catch {}
}
session.subscribers.clear()
Bus.publish(Event.Deleted, { id: session.info.id })
}
const list = Effect.fn("Pty.list")(function* () {
return Array.from(sessions.values()).map((s) => s.info)
})
const get = Effect.fn("Pty.get")(function* (id: PtyID) {
return sessions.get(id)?.info
})
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
return yield* Effect.promise(async () => {
const [{ Shell }, { Plugin }] = await Promise.all([import("@/shell/shell"), import("@/plugin")])
const id = PtyID.ascending()
const command = input.command || Shell.preferred()
const args = input.args || []
if (command.endsWith("sh")) {
args.push("-l")
}
const cwd = input.cwd || instance.directory
const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
const env = {
...process.env,
...input.env,
...shellEnv.env,
TERM: "xterm-256color",
OPENCODE_TERMINAL: "1",
} as Record<string, string>
if (process.platform === "win32") {
env.LC_ALL = "C.UTF-8"
env.LC_CTYPE = "C.UTF-8"
env.LANG = "C.UTF-8"
}
log.info("creating session", { id, cmd: command, args, cwd })
const spawn = await pty()
const ptyProcess = spawn(command, args, {
name: "xterm-256color",
cwd,
env,
})
const info = {
id,
title: input.title || `Terminal ${id.slice(-4)}`,
command,
args,
cwd,
status: "running",
pid: ptyProcess.pid,
} as const
const session: ActiveSession = {
info,
process: ptyProcess,
buffer: "",
bufferCursor: 0,
cursor: 0,
subscribers: new Map(),
}
sessions.set(id, session)
ptyProcess.onData((chunk) => {
session.cursor += chunk.length
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1) {
session.subscribers.delete(key)
continue
}
if (ws.data !== key) {
session.subscribers.delete(key)
continue
}
try {
ws.send(chunk)
} catch {
session.subscribers.delete(key)
}
}
session.buffer += chunk
if (session.buffer.length <= BUFFER_LIMIT) return
const excess = session.buffer.length - BUFFER_LIMIT
session.buffer = session.buffer.slice(excess)
session.bufferCursor += excess
})
ptyProcess.onExit(({ exitCode }) => {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
Bus.publish(Event.Exited, { id, exitCode })
removeSession(id)
})
Bus.publish(Event.Created, { info })
return info
})
})
const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) {
const session = sessions.get(id)
if (!session) return
if (input.title) {
session.info.title = input.title
}
if (input.size) {
session.process.resize(input.size.cols, input.size.rows)
}
Bus.publish(Event.Updated, { info: session.info })
return session.info
})
const remove = Effect.fn("Pty.remove")(function* (id: PtyID) {
removeSession(id)
})
const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) {
const session = sessions.get(id)
if (session && session.info.status === "running") {
session.process.resize(cols, rows)
}
})
const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) {
const session = sessions.get(id)
if (session && session.info.status === "running") {
session.process.write(data)
}
})
const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) {
const session = sessions.get(id)
if (!session) {
ws.close()
return
}
log.info("client connected to session", { id })
const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
session.subscribers.delete(connectionKey)
session.subscribers.set(connectionKey, ws)
const cleanup = () => {
session.subscribers.delete(connectionKey)
}
const start = session.bufferCursor
const end = session.cursor
const from =
cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
const data = (() => {
if (!session.buffer) return ""
if (from >= end) return ""
const offset = Math.max(0, from - start)
if (offset >= session.buffer.length) return ""
return session.buffer.slice(offset)
})()
if (data) {
try {
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
ws.send(data.slice(i, i + BUFFER_CHUNK))
}
} catch {
// ignore
cleanup()
ws.close()
return
}
}
}
sessions.clear()
},
try {
ws.send(meta(end))
} catch {
cleanup()
ws.close()
return
}
return {
onMessage: (message: string | ArrayBuffer) => {
session.process.write(String(message))
},
onClose: () => {
log.info("client disconnected from session", { id })
cleanup()
},
}
})
return Service.of({ list, get, create, update, remove, resize, write, connect })
}),
)
function runtime() {
return require("@/effect/runtime") as typeof import("@/effect/runtime")
}
function run<A, E>(effect: Effect.Effect<A, E, Service>) {
return runtime().runPromiseInstance(effect)
}
function runSync<A, E>(effect: Effect.Effect<A, E, Service>) {
return runtime().runSyncInstance(effect)
}
// Sync facades
export function list() {
return Array.from(state().values()).map((s) => s.info)
return runSync(Service.use((svc) => svc.list()))
}
export function get(id: PtyID) {
return state().get(id)?.info
}
export async function create(input: CreateInput) {
const id = PtyID.ascending()
const command = input.command || Shell.preferred()
const args = input.args || []
if (command.endsWith("sh")) {
args.push("-l")
}
const cwd = input.cwd || Instance.directory
const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
const env = {
...process.env,
...input.env,
...shellEnv.env,
TERM: "xterm-256color",
OPENCODE_TERMINAL: "1",
} as Record<string, string>
if (process.platform === "win32") {
env.LC_ALL = "C.UTF-8"
env.LC_CTYPE = "C.UTF-8"
env.LANG = "C.UTF-8"
}
log.info("creating session", { id, cmd: command, args, cwd })
const spawn = await pty()
const ptyProcess = spawn(command, args, {
name: "xterm-256color",
cwd,
env,
})
const info = {
id,
title: input.title || `Terminal ${id.slice(-4)}`,
command,
args,
cwd,
status: "running",
pid: ptyProcess.pid,
} as const
const session: ActiveSession = {
info,
process: ptyProcess,
buffer: "",
bufferCursor: 0,
cursor: 0,
subscribers: new Map(),
}
state().set(id, session)
ptyProcess.onData(
Instance.bind((chunk) => {
session.cursor += chunk.length
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1) {
session.subscribers.delete(key)
continue
}
if (ws.data !== key) {
session.subscribers.delete(key)
continue
}
try {
ws.send(chunk)
} catch {
session.subscribers.delete(key)
}
}
session.buffer += chunk
if (session.buffer.length <= BUFFER_LIMIT) return
const excess = session.buffer.length - BUFFER_LIMIT
session.buffer = session.buffer.slice(excess)
session.bufferCursor += excess
}),
)
ptyProcess.onExit(
Instance.bind(({ exitCode }) => {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
Bus.publish(Event.Exited, { id, exitCode })
remove(id)
}),
)
Bus.publish(Event.Created, { info })
return info
}
export async function update(id: PtyID, input: UpdateInput) {
const session = state().get(id)
if (!session) return
if (input.title) {
session.info.title = input.title
}
if (input.size) {
session.process.resize(input.size.cols, input.size.rows)
}
Bus.publish(Event.Updated, { info: session.info })
return session.info
}
export async function remove(id: PtyID) {
const session = state().get(id)
if (!session) return
state().delete(id)
log.info("removing session", { id })
try {
session.process.kill()
} catch {}
for (const [key, ws] of session.subscribers.entries()) {
try {
if (ws.data === key) ws.close()
} catch {
// ignore
}
}
session.subscribers.clear()
Bus.publish(Event.Deleted, { id: session.info.id })
return runSync(Service.use((svc) => svc.get(id)))
}
export function resize(id: PtyID, cols: number, rows: number) {
const session = state().get(id)
if (session && session.info.status === "running") {
session.process.resize(cols, rows)
}
runSync(Service.use((svc) => svc.resize(id, cols, rows)))
}
export function write(id: PtyID, data: string) {
const session = state().get(id)
if (session && session.info.status === "running") {
session.process.write(data)
}
runSync(Service.use((svc) => svc.write(id, data)))
}
export function connect(id: PtyID, ws: Socket, cursor?: number) {
const session = state().get(id)
if (!session) {
ws.close()
return
}
log.info("client connected to session", { id })
return runSync(Service.use((svc) => svc.connect(id, ws, cursor)))
}
// Use ws.data as the unique key for this connection lifecycle.
// If ws.data is undefined, fallback to ws object.
const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
// Async facades
export async function create(input: CreateInput) {
return run(Service.use((svc) => svc.create(input)))
}
// Optionally cleanup if the key somehow exists
session.subscribers.delete(connectionKey)
session.subscribers.set(connectionKey, ws)
export async function update(id: PtyID, input: UpdateInput) {
return run(Service.use((svc) => svc.update(id, input)))
}
const cleanup = () => {
session.subscribers.delete(connectionKey)
}
const start = session.bufferCursor
const end = session.cursor
const from =
cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
const data = (() => {
if (!session.buffer) return ""
if (from >= end) return ""
const offset = Math.max(0, from - start)
if (offset >= session.buffer.length) return ""
return session.buffer.slice(offset)
})()
if (data) {
try {
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
ws.send(data.slice(i, i + BUFFER_CHUNK))
}
} catch {
cleanup()
ws.close()
return
}
}
try {
ws.send(meta(end))
} catch {
cleanup()
ws.close()
return
}
return {
onMessage: (message: string | ArrayBuffer) => {
session.process.write(String(message))
},
onClose: () => {
log.info("client disconnected from session", { id })
cleanup()
},
}
export async function remove(id: PtyID) {
return run(Service.use((svc) => svc.remove(id)))
}
}