Compare commits

...

1 Commits

Author SHA1 Message Date
Kit Langton
6a3cb06c6c effectify Pty service, add runSyncInstance helper 2026-03-19 21:18:15 -04:00
4 changed files with 263 additions and 202 deletions

View File

@@ -128,7 +128,7 @@ Still open and likely worth migrating:
- [ ] `Plugin`
- [ ] `ToolRegistry`
- [ ] `Pty`
- [x] `Pty`
- [ ] `Worktree`
- [ ] `Installation`
- [ ] `Bus`

View File

@@ -4,6 +4,7 @@ import { FileTime } from "@/file/time"
import { FileWatcher } from "@/file/watcher"
import { Format } from "@/format"
import { PermissionNext } from "@/permission"
import { Pty } from "@/pty"
import { Instance } from "@/project/instance"
import { Vcs } from "@/project/vcs"
import { ProviderAuth } from "@/provider/auth"
@@ -24,6 +25,7 @@ export type InstanceServices =
| FileTime.Service
| Format.Service
| File.Service
| Pty.Service
| Skill.Service
| Snapshot.Service
@@ -44,6 +46,7 @@ function lookup(_key: string) {
Layer.fresh(FileTime.layer).pipe(Layer.orDie),
Layer.fresh(Format.layer),
Layer.fresh(File.layer),
Layer.fresh(Pty.layer),
Layer.fresh(Skill.defaultLayer),
Layer.fresh(Snapshot.defaultLayer),
).pipe(Layer.provide(ctx))

View File

@@ -18,6 +18,10 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function runSyncInstance<A, E>(effect: Effect.Effect<A, E, InstanceServices>) {
return runtime.runSync(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
}
export function disposeRuntime() {
return runtime.dispose()
}

View File

@@ -1,13 +1,15 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { InstanceContext } from "@/effect/instance-context"
import { runPromiseInstance, runSyncInstance } from "@/effect/runtime"
import { type IPty } from "bun-pty"
import z from "zod"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { lazy } from "@opencode-ai/util/lazy"
import { Shell } from "@/shell/shell"
import { Plugin } from "@/plugin"
import { PtyID } from "./schema"
import { Effect, Layer, ServiceMap } from "effect"
export namespace Pty {
const log = Log.create({ service: "pty" })
@@ -90,232 +92,284 @@ export namespace Pty {
subscribers: Map<unknown, Socket>
}
const state = Instance.state(
() => new Map<PtyID, ActiveSession>(),
async (sessions) => {
for (const session of sessions.values()) {
export interface Interface {
readonly list: () => Effect.Effect<Info[]>
readonly get: (id: PtyID) => Effect.Effect<Info | undefined>
readonly create: (input: CreateInput) => Effect.Effect<Info>
readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect<Info | undefined>
readonly remove: (id: PtyID) => Effect.Effect<void>
readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect<void>
readonly write: (id: PtyID, data: string) => Effect.Effect<void>
readonly connect: (
id: PtyID,
ws: Socket,
cursor?: number,
) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Pty") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const instance = yield* InstanceContext
const sessions = new Map<PtyID, ActiveSession>()
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
for (const session of sessions.values()) {
try {
session.process.kill()
} catch {}
for (const [key, ws] of session.subscribers.entries()) {
try {
if (ws.data === key) ws.close()
} catch {}
}
}
sessions.clear()
}),
)
const removeSession = (id: PtyID) => {
const session = sessions.get(id)
if (!session) return
sessions.delete(id)
log.info("removing session", { id })
try {
session.process.kill()
} catch {}
for (const [key, ws] of session.subscribers.entries()) {
try {
if (ws.data === key) ws.close()
} catch {}
}
session.subscribers.clear()
Bus.publish(Event.Deleted, { id: session.info.id })
}
const list = Effect.fn("Pty.list")(function* () {
return Array.from(sessions.values()).map((s) => s.info)
})
const get = Effect.fn("Pty.get")(function* (id: PtyID) {
return sessions.get(id)?.info
})
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
return yield* Effect.promise(async () => {
const id = PtyID.ascending()
const command = input.command || Shell.preferred()
const args = input.args || []
if (command.endsWith("sh")) {
args.push("-l")
}
const cwd = input.cwd || instance.directory
const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
const env = {
...process.env,
...input.env,
...shellEnv.env,
TERM: "xterm-256color",
OPENCODE_TERMINAL: "1",
} as Record<string, string>
if (process.platform === "win32") {
env.LC_ALL = "C.UTF-8"
env.LC_CTYPE = "C.UTF-8"
env.LANG = "C.UTF-8"
}
log.info("creating session", { id, cmd: command, args, cwd })
const spawn = await pty()
const ptyProcess = spawn(command, args, {
name: "xterm-256color",
cwd,
env,
})
const info = {
id,
title: input.title || `Terminal ${id.slice(-4)}`,
command,
args,
cwd,
status: "running",
pid: ptyProcess.pid,
} as const
const session: ActiveSession = {
info,
process: ptyProcess,
buffer: "",
bufferCursor: 0,
cursor: 0,
subscribers: new Map(),
}
sessions.set(id, session)
ptyProcess.onData((chunk) => {
session.cursor += chunk.length
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1) {
session.subscribers.delete(key)
continue
}
if (ws.data !== key) {
session.subscribers.delete(key)
continue
}
try {
ws.send(chunk)
} catch {
session.subscribers.delete(key)
}
}
session.buffer += chunk
if (session.buffer.length <= BUFFER_LIMIT) return
const excess = session.buffer.length - BUFFER_LIMIT
session.buffer = session.buffer.slice(excess)
session.bufferCursor += excess
})
ptyProcess.onExit(({ exitCode }) => {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
Bus.publish(Event.Exited, { id, exitCode })
removeSession(id)
})
Bus.publish(Event.Created, { info })
return info
})
})
const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) {
const session = sessions.get(id)
if (!session) return
if (input.title) {
session.info.title = input.title
}
if (input.size) {
session.process.resize(input.size.cols, input.size.rows)
}
Bus.publish(Event.Updated, { info: session.info })
return session.info
})
const remove = Effect.fn("Pty.remove")(function* (id: PtyID) {
removeSession(id)
})
const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) {
const session = sessions.get(id)
if (session && session.info.status === "running") {
session.process.resize(cols, rows)
}
})
const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) {
const session = sessions.get(id)
if (session && session.info.status === "running") {
session.process.write(data)
}
})
const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) {
const session = sessions.get(id)
if (!session) {
ws.close()
return
}
log.info("client connected to session", { id })
const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
session.subscribers.delete(connectionKey)
session.subscribers.set(connectionKey, ws)
const cleanup = () => {
session.subscribers.delete(connectionKey)
}
const start = session.bufferCursor
const end = session.cursor
const from =
cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
const data = (() => {
if (!session.buffer) return ""
if (from >= end) return ""
const offset = Math.max(0, from - start)
if (offset >= session.buffer.length) return ""
return session.buffer.slice(offset)
})()
if (data) {
try {
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
ws.send(data.slice(i, i + BUFFER_CHUNK))
}
} catch {
// ignore
cleanup()
ws.close()
return
}
}
}
sessions.clear()
},
try {
ws.send(meta(end))
} catch {
cleanup()
ws.close()
return
}
return {
onMessage: (message: string | ArrayBuffer) => {
session.process.write(String(message))
},
onClose: () => {
log.info("client disconnected from session", { id })
cleanup()
},
}
})
return Service.of({ list, get, create, update, remove, resize, write, connect })
}),
)
// Sync facades
export function list() {
return Array.from(state().values()).map((s) => s.info)
return runSyncInstance(Service.use((svc) => svc.list()))
}
export function get(id: PtyID) {
return state().get(id)?.info
}
export async function create(input: CreateInput) {
const id = PtyID.ascending()
const command = input.command || Shell.preferred()
const args = input.args || []
if (command.endsWith("sh")) {
args.push("-l")
}
const cwd = input.cwd || Instance.directory
const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
const env = {
...process.env,
...input.env,
...shellEnv.env,
TERM: "xterm-256color",
OPENCODE_TERMINAL: "1",
} as Record<string, string>
if (process.platform === "win32") {
env.LC_ALL = "C.UTF-8"
env.LC_CTYPE = "C.UTF-8"
env.LANG = "C.UTF-8"
}
log.info("creating session", { id, cmd: command, args, cwd })
const spawn = await pty()
const ptyProcess = spawn(command, args, {
name: "xterm-256color",
cwd,
env,
})
const info = {
id,
title: input.title || `Terminal ${id.slice(-4)}`,
command,
args,
cwd,
status: "running",
pid: ptyProcess.pid,
} as const
const session: ActiveSession = {
info,
process: ptyProcess,
buffer: "",
bufferCursor: 0,
cursor: 0,
subscribers: new Map(),
}
state().set(id, session)
ptyProcess.onData(
Instance.bind((chunk) => {
session.cursor += chunk.length
for (const [key, ws] of session.subscribers.entries()) {
if (ws.readyState !== 1) {
session.subscribers.delete(key)
continue
}
if (ws.data !== key) {
session.subscribers.delete(key)
continue
}
try {
ws.send(chunk)
} catch {
session.subscribers.delete(key)
}
}
session.buffer += chunk
if (session.buffer.length <= BUFFER_LIMIT) return
const excess = session.buffer.length - BUFFER_LIMIT
session.buffer = session.buffer.slice(excess)
session.bufferCursor += excess
}),
)
ptyProcess.onExit(
Instance.bind(({ exitCode }) => {
if (session.info.status === "exited") return
log.info("session exited", { id, exitCode })
session.info.status = "exited"
Bus.publish(Event.Exited, { id, exitCode })
remove(id)
}),
)
Bus.publish(Event.Created, { info })
return info
}
export async function update(id: PtyID, input: UpdateInput) {
const session = state().get(id)
if (!session) return
if (input.title) {
session.info.title = input.title
}
if (input.size) {
session.process.resize(input.size.cols, input.size.rows)
}
Bus.publish(Event.Updated, { info: session.info })
return session.info
}
export async function remove(id: PtyID) {
const session = state().get(id)
if (!session) return
state().delete(id)
log.info("removing session", { id })
try {
session.process.kill()
} catch {}
for (const [key, ws] of session.subscribers.entries()) {
try {
if (ws.data === key) ws.close()
} catch {
// ignore
}
}
session.subscribers.clear()
Bus.publish(Event.Deleted, { id: session.info.id })
return runSyncInstance(Service.use((svc) => svc.get(id)))
}
export function resize(id: PtyID, cols: number, rows: number) {
const session = state().get(id)
if (session && session.info.status === "running") {
session.process.resize(cols, rows)
}
runSyncInstance(Service.use((svc) => svc.resize(id, cols, rows)))
}
export function write(id: PtyID, data: string) {
const session = state().get(id)
if (session && session.info.status === "running") {
session.process.write(data)
}
runSyncInstance(Service.use((svc) => svc.write(id, data)))
}
export function connect(id: PtyID, ws: Socket, cursor?: number) {
const session = state().get(id)
if (!session) {
ws.close()
return
}
log.info("client connected to session", { id })
return runSyncInstance(Service.use((svc) => svc.connect(id, ws, cursor)))
}
// Use ws.data as the unique key for this connection lifecycle.
// If ws.data is undefined, fallback to ws object.
const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
// Async facades
export async function create(input: CreateInput) {
return runPromiseInstance(Service.use((svc) => svc.create(input)))
}
// Optionally cleanup if the key somehow exists
session.subscribers.delete(connectionKey)
session.subscribers.set(connectionKey, ws)
export async function update(id: PtyID, input: UpdateInput) {
return runPromiseInstance(Service.use((svc) => svc.update(id, input)))
}
const cleanup = () => {
session.subscribers.delete(connectionKey)
}
const start = session.bufferCursor
const end = session.cursor
const from =
cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
const data = (() => {
if (!session.buffer) return ""
if (from >= end) return ""
const offset = Math.max(0, from - start)
if (offset >= session.buffer.length) return ""
return session.buffer.slice(offset)
})()
if (data) {
try {
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
ws.send(data.slice(i, i + BUFFER_CHUNK))
}
} catch {
cleanup()
ws.close()
return
}
}
try {
ws.send(meta(end))
} catch {
cleanup()
ws.close()
return
}
return {
onMessage: (message: string | ArrayBuffer) => {
session.process.write(String(message))
},
onClose: () => {
log.info("client disconnected from session", { id })
cleanup()
},
}
export async function remove(id: PtyID) {
return runPromiseInstance(Service.use((svc) => svc.remove(id)))
}
}