mirror of
https://github.com/anomalyco/opencode.git
synced 2026-03-20 05:34:45 +00:00
Compare commits
2 Commits
production
...
kit/effect
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a3cb06c6c | ||
|
|
ddcb32ae0b |
@@ -128,7 +128,7 @@ Still open and likely worth migrating:
|
||||
|
||||
- [ ] `Plugin`
|
||||
- [ ] `ToolRegistry`
|
||||
- [ ] `Pty`
|
||||
- [x] `Pty`
|
||||
- [ ] `Worktree`
|
||||
- [ ] `Installation`
|
||||
- [ ] `Bus`
|
||||
|
||||
@@ -907,12 +907,12 @@ export function Session() {
|
||||
const filename = options.filename.trim()
|
||||
const filepath = path.join(exportDir, filename)
|
||||
|
||||
await Bun.write(filepath, transcript)
|
||||
await Filesystem.write(filepath, transcript)
|
||||
|
||||
// Open with EDITOR if available
|
||||
const result = await Editor.open({ value: transcript, renderer })
|
||||
if (result !== undefined) {
|
||||
await Bun.write(filepath, result)
|
||||
await Filesystem.write(filepath, result)
|
||||
}
|
||||
|
||||
toast.show({ message: `Session exported to ${filename}`, variant: "success" })
|
||||
|
||||
@@ -8,7 +8,6 @@ import { upgrade } from "@/cli/upgrade"
|
||||
import { Config } from "@/config/config"
|
||||
import { GlobalBus } from "@/bus/global"
|
||||
import { createOpencodeClient, type Event } from "@opencode-ai/sdk/v2"
|
||||
import type { BunWebSocketData } from "hono/bun"
|
||||
import { Flag } from "@/flag/flag"
|
||||
import { setTimeout as sleep } from "node:timers/promises"
|
||||
|
||||
@@ -38,7 +37,7 @@ GlobalBus.on("event", (event) => {
|
||||
Rpc.emit("global.event", event)
|
||||
})
|
||||
|
||||
let server: Bun.Server<BunWebSocketData> | undefined
|
||||
let server: Awaited<ReturnType<typeof Server.listen>> | undefined
|
||||
|
||||
const eventStream = {
|
||||
abort: undefined as AbortController | undefined,
|
||||
@@ -120,7 +119,7 @@ export const rpc = {
|
||||
},
|
||||
async server(input: { port: number; hostname: string; mdns?: boolean; cors?: string[] }) {
|
||||
if (server) await server.stop(true)
|
||||
server = Server.listen(input)
|
||||
server = await Server.listen(input)
|
||||
return { url: server.url.toString() }
|
||||
},
|
||||
async checkUpgrade(input: { directory: string }) {
|
||||
@@ -143,7 +142,7 @@ export const rpc = {
|
||||
Log.Default.info("worker shutting down")
|
||||
if (eventStream.abort) eventStream.abort.abort()
|
||||
await Instance.disposeAll()
|
||||
if (server) server.stop(true)
|
||||
if (server) await server.stop(true)
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ import { FileTime } from "@/file/time"
|
||||
import { FileWatcher } from "@/file/watcher"
|
||||
import { Format } from "@/format"
|
||||
import { PermissionNext } from "@/permission"
|
||||
import { Pty } from "@/pty"
|
||||
import { Instance } from "@/project/instance"
|
||||
import { Vcs } from "@/project/vcs"
|
||||
import { ProviderAuth } from "@/provider/auth"
|
||||
@@ -24,6 +25,7 @@ export type InstanceServices =
|
||||
| FileTime.Service
|
||||
| Format.Service
|
||||
| File.Service
|
||||
| Pty.Service
|
||||
| Skill.Service
|
||||
| Snapshot.Service
|
||||
|
||||
@@ -44,6 +46,7 @@ function lookup(_key: string) {
|
||||
Layer.fresh(FileTime.layer).pipe(Layer.orDie),
|
||||
Layer.fresh(Format.layer),
|
||||
Layer.fresh(File.layer),
|
||||
Layer.fresh(Pty.layer),
|
||||
Layer.fresh(Skill.defaultLayer),
|
||||
Layer.fresh(Snapshot.defaultLayer),
|
||||
).pipe(Layer.provide(ctx))
|
||||
|
||||
@@ -18,6 +18,10 @@ export function runPromiseInstance<A, E>(effect: Effect.Effect<A, E, InstanceSer
|
||||
return runtime.runPromise(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
|
||||
}
|
||||
|
||||
export function runSyncInstance<A, E>(effect: Effect.Effect<A, E, InstanceServices>) {
|
||||
return runtime.runSync(effect.pipe(Effect.provide(Instances.get(Instance.directory))))
|
||||
}
|
||||
|
||||
export function disposeRuntime() {
|
||||
return runtime.dispose()
|
||||
}
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
import { BusEvent } from "@/bus/bus-event"
|
||||
import { Bus } from "@/bus"
|
||||
import { InstanceContext } from "@/effect/instance-context"
|
||||
import { runPromiseInstance, runSyncInstance } from "@/effect/runtime"
|
||||
import { type IPty } from "bun-pty"
|
||||
import z from "zod"
|
||||
import { Log } from "../util/log"
|
||||
import { Instance } from "../project/instance"
|
||||
import { lazy } from "@opencode-ai/util/lazy"
|
||||
import { Shell } from "@/shell/shell"
|
||||
import { Plugin } from "@/plugin"
|
||||
import { PtyID } from "./schema"
|
||||
import { Effect, Layer, ServiceMap } from "effect"
|
||||
|
||||
export namespace Pty {
|
||||
const log = Log.create({ service: "pty" })
|
||||
@@ -90,232 +92,284 @@ export namespace Pty {
|
||||
subscribers: Map<unknown, Socket>
|
||||
}
|
||||
|
||||
const state = Instance.state(
|
||||
() => new Map<PtyID, ActiveSession>(),
|
||||
async (sessions) => {
|
||||
for (const session of sessions.values()) {
|
||||
export interface Interface {
|
||||
readonly list: () => Effect.Effect<Info[]>
|
||||
readonly get: (id: PtyID) => Effect.Effect<Info | undefined>
|
||||
readonly create: (input: CreateInput) => Effect.Effect<Info>
|
||||
readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect<Info | undefined>
|
||||
readonly remove: (id: PtyID) => Effect.Effect<void>
|
||||
readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect<void>
|
||||
readonly write: (id: PtyID, data: string) => Effect.Effect<void>
|
||||
readonly connect: (
|
||||
id: PtyID,
|
||||
ws: Socket,
|
||||
cursor?: number,
|
||||
) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined>
|
||||
}
|
||||
|
||||
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Pty") {}
|
||||
|
||||
export const layer = Layer.effect(
|
||||
Service,
|
||||
Effect.gen(function* () {
|
||||
const instance = yield* InstanceContext
|
||||
const sessions = new Map<PtyID, ActiveSession>()
|
||||
|
||||
yield* Effect.addFinalizer(() =>
|
||||
Effect.sync(() => {
|
||||
for (const session of sessions.values()) {
|
||||
try {
|
||||
session.process.kill()
|
||||
} catch {}
|
||||
for (const [key, ws] of session.subscribers.entries()) {
|
||||
try {
|
||||
if (ws.data === key) ws.close()
|
||||
} catch {}
|
||||
}
|
||||
}
|
||||
sessions.clear()
|
||||
}),
|
||||
)
|
||||
|
||||
const removeSession = (id: PtyID) => {
|
||||
const session = sessions.get(id)
|
||||
if (!session) return
|
||||
sessions.delete(id)
|
||||
log.info("removing session", { id })
|
||||
try {
|
||||
session.process.kill()
|
||||
} catch {}
|
||||
for (const [key, ws] of session.subscribers.entries()) {
|
||||
try {
|
||||
if (ws.data === key) ws.close()
|
||||
} catch {}
|
||||
}
|
||||
session.subscribers.clear()
|
||||
Bus.publish(Event.Deleted, { id: session.info.id })
|
||||
}
|
||||
|
||||
const list = Effect.fn("Pty.list")(function* () {
|
||||
return Array.from(sessions.values()).map((s) => s.info)
|
||||
})
|
||||
|
||||
const get = Effect.fn("Pty.get")(function* (id: PtyID) {
|
||||
return sessions.get(id)?.info
|
||||
})
|
||||
|
||||
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
|
||||
return yield* Effect.promise(async () => {
|
||||
const id = PtyID.ascending()
|
||||
const command = input.command || Shell.preferred()
|
||||
const args = input.args || []
|
||||
if (command.endsWith("sh")) {
|
||||
args.push("-l")
|
||||
}
|
||||
|
||||
const cwd = input.cwd || instance.directory
|
||||
const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
|
||||
const env = {
|
||||
...process.env,
|
||||
...input.env,
|
||||
...shellEnv.env,
|
||||
TERM: "xterm-256color",
|
||||
OPENCODE_TERMINAL: "1",
|
||||
} as Record<string, string>
|
||||
|
||||
if (process.platform === "win32") {
|
||||
env.LC_ALL = "C.UTF-8"
|
||||
env.LC_CTYPE = "C.UTF-8"
|
||||
env.LANG = "C.UTF-8"
|
||||
}
|
||||
log.info("creating session", { id, cmd: command, args, cwd })
|
||||
|
||||
const spawn = await pty()
|
||||
const ptyProcess = spawn(command, args, {
|
||||
name: "xterm-256color",
|
||||
cwd,
|
||||
env,
|
||||
})
|
||||
|
||||
const info = {
|
||||
id,
|
||||
title: input.title || `Terminal ${id.slice(-4)}`,
|
||||
command,
|
||||
args,
|
||||
cwd,
|
||||
status: "running",
|
||||
pid: ptyProcess.pid,
|
||||
} as const
|
||||
const session: ActiveSession = {
|
||||
info,
|
||||
process: ptyProcess,
|
||||
buffer: "",
|
||||
bufferCursor: 0,
|
||||
cursor: 0,
|
||||
subscribers: new Map(),
|
||||
}
|
||||
sessions.set(id, session)
|
||||
ptyProcess.onData((chunk) => {
|
||||
session.cursor += chunk.length
|
||||
|
||||
for (const [key, ws] of session.subscribers.entries()) {
|
||||
if (ws.readyState !== 1) {
|
||||
session.subscribers.delete(key)
|
||||
continue
|
||||
}
|
||||
if (ws.data !== key) {
|
||||
session.subscribers.delete(key)
|
||||
continue
|
||||
}
|
||||
try {
|
||||
ws.send(chunk)
|
||||
} catch {
|
||||
session.subscribers.delete(key)
|
||||
}
|
||||
}
|
||||
|
||||
session.buffer += chunk
|
||||
if (session.buffer.length <= BUFFER_LIMIT) return
|
||||
const excess = session.buffer.length - BUFFER_LIMIT
|
||||
session.buffer = session.buffer.slice(excess)
|
||||
session.bufferCursor += excess
|
||||
})
|
||||
ptyProcess.onExit(({ exitCode }) => {
|
||||
if (session.info.status === "exited") return
|
||||
log.info("session exited", { id, exitCode })
|
||||
session.info.status = "exited"
|
||||
Bus.publish(Event.Exited, { id, exitCode })
|
||||
removeSession(id)
|
||||
})
|
||||
Bus.publish(Event.Created, { info })
|
||||
return info
|
||||
})
|
||||
})
|
||||
|
||||
const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) {
|
||||
const session = sessions.get(id)
|
||||
if (!session) return
|
||||
if (input.title) {
|
||||
session.info.title = input.title
|
||||
}
|
||||
if (input.size) {
|
||||
session.process.resize(input.size.cols, input.size.rows)
|
||||
}
|
||||
Bus.publish(Event.Updated, { info: session.info })
|
||||
return session.info
|
||||
})
|
||||
|
||||
const remove = Effect.fn("Pty.remove")(function* (id: PtyID) {
|
||||
removeSession(id)
|
||||
})
|
||||
|
||||
const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) {
|
||||
const session = sessions.get(id)
|
||||
if (session && session.info.status === "running") {
|
||||
session.process.resize(cols, rows)
|
||||
}
|
||||
})
|
||||
|
||||
const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) {
|
||||
const session = sessions.get(id)
|
||||
if (session && session.info.status === "running") {
|
||||
session.process.write(data)
|
||||
}
|
||||
})
|
||||
|
||||
const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) {
|
||||
const session = sessions.get(id)
|
||||
if (!session) {
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
log.info("client connected to session", { id })
|
||||
|
||||
const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
|
||||
session.subscribers.delete(connectionKey)
|
||||
session.subscribers.set(connectionKey, ws)
|
||||
|
||||
const cleanup = () => {
|
||||
session.subscribers.delete(connectionKey)
|
||||
}
|
||||
|
||||
const start = session.bufferCursor
|
||||
const end = session.cursor
|
||||
|
||||
const from =
|
||||
cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
|
||||
|
||||
const data = (() => {
|
||||
if (!session.buffer) return ""
|
||||
if (from >= end) return ""
|
||||
const offset = Math.max(0, from - start)
|
||||
if (offset >= session.buffer.length) return ""
|
||||
return session.buffer.slice(offset)
|
||||
})()
|
||||
|
||||
if (data) {
|
||||
try {
|
||||
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
|
||||
ws.send(data.slice(i, i + BUFFER_CHUNK))
|
||||
}
|
||||
} catch {
|
||||
// ignore
|
||||
cleanup()
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
sessions.clear()
|
||||
},
|
||||
|
||||
try {
|
||||
ws.send(meta(end))
|
||||
} catch {
|
||||
cleanup()
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
return {
|
||||
onMessage: (message: string | ArrayBuffer) => {
|
||||
session.process.write(String(message))
|
||||
},
|
||||
onClose: () => {
|
||||
log.info("client disconnected from session", { id })
|
||||
cleanup()
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
return Service.of({ list, get, create, update, remove, resize, write, connect })
|
||||
}),
|
||||
)
|
||||
|
||||
// Sync facades
|
||||
export function list() {
|
||||
return Array.from(state().values()).map((s) => s.info)
|
||||
return runSyncInstance(Service.use((svc) => svc.list()))
|
||||
}
|
||||
|
||||
export function get(id: PtyID) {
|
||||
return state().get(id)?.info
|
||||
}
|
||||
|
||||
export async function create(input: CreateInput) {
|
||||
const id = PtyID.ascending()
|
||||
const command = input.command || Shell.preferred()
|
||||
const args = input.args || []
|
||||
if (command.endsWith("sh")) {
|
||||
args.push("-l")
|
||||
}
|
||||
|
||||
const cwd = input.cwd || Instance.directory
|
||||
const shellEnv = await Plugin.trigger("shell.env", { cwd }, { env: {} })
|
||||
const env = {
|
||||
...process.env,
|
||||
...input.env,
|
||||
...shellEnv.env,
|
||||
TERM: "xterm-256color",
|
||||
OPENCODE_TERMINAL: "1",
|
||||
} as Record<string, string>
|
||||
|
||||
if (process.platform === "win32") {
|
||||
env.LC_ALL = "C.UTF-8"
|
||||
env.LC_CTYPE = "C.UTF-8"
|
||||
env.LANG = "C.UTF-8"
|
||||
}
|
||||
log.info("creating session", { id, cmd: command, args, cwd })
|
||||
|
||||
const spawn = await pty()
|
||||
const ptyProcess = spawn(command, args, {
|
||||
name: "xterm-256color",
|
||||
cwd,
|
||||
env,
|
||||
})
|
||||
|
||||
const info = {
|
||||
id,
|
||||
title: input.title || `Terminal ${id.slice(-4)}`,
|
||||
command,
|
||||
args,
|
||||
cwd,
|
||||
status: "running",
|
||||
pid: ptyProcess.pid,
|
||||
} as const
|
||||
const session: ActiveSession = {
|
||||
info,
|
||||
process: ptyProcess,
|
||||
buffer: "",
|
||||
bufferCursor: 0,
|
||||
cursor: 0,
|
||||
subscribers: new Map(),
|
||||
}
|
||||
state().set(id, session)
|
||||
ptyProcess.onData(
|
||||
Instance.bind((chunk) => {
|
||||
session.cursor += chunk.length
|
||||
|
||||
for (const [key, ws] of session.subscribers.entries()) {
|
||||
if (ws.readyState !== 1) {
|
||||
session.subscribers.delete(key)
|
||||
continue
|
||||
}
|
||||
|
||||
if (ws.data !== key) {
|
||||
session.subscribers.delete(key)
|
||||
continue
|
||||
}
|
||||
|
||||
try {
|
||||
ws.send(chunk)
|
||||
} catch {
|
||||
session.subscribers.delete(key)
|
||||
}
|
||||
}
|
||||
|
||||
session.buffer += chunk
|
||||
if (session.buffer.length <= BUFFER_LIMIT) return
|
||||
const excess = session.buffer.length - BUFFER_LIMIT
|
||||
session.buffer = session.buffer.slice(excess)
|
||||
session.bufferCursor += excess
|
||||
}),
|
||||
)
|
||||
ptyProcess.onExit(
|
||||
Instance.bind(({ exitCode }) => {
|
||||
if (session.info.status === "exited") return
|
||||
log.info("session exited", { id, exitCode })
|
||||
session.info.status = "exited"
|
||||
Bus.publish(Event.Exited, { id, exitCode })
|
||||
remove(id)
|
||||
}),
|
||||
)
|
||||
Bus.publish(Event.Created, { info })
|
||||
return info
|
||||
}
|
||||
|
||||
export async function update(id: PtyID, input: UpdateInput) {
|
||||
const session = state().get(id)
|
||||
if (!session) return
|
||||
if (input.title) {
|
||||
session.info.title = input.title
|
||||
}
|
||||
if (input.size) {
|
||||
session.process.resize(input.size.cols, input.size.rows)
|
||||
}
|
||||
Bus.publish(Event.Updated, { info: session.info })
|
||||
return session.info
|
||||
}
|
||||
|
||||
export async function remove(id: PtyID) {
|
||||
const session = state().get(id)
|
||||
if (!session) return
|
||||
state().delete(id)
|
||||
log.info("removing session", { id })
|
||||
try {
|
||||
session.process.kill()
|
||||
} catch {}
|
||||
for (const [key, ws] of session.subscribers.entries()) {
|
||||
try {
|
||||
if (ws.data === key) ws.close()
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
session.subscribers.clear()
|
||||
Bus.publish(Event.Deleted, { id: session.info.id })
|
||||
return runSyncInstance(Service.use((svc) => svc.get(id)))
|
||||
}
|
||||
|
||||
export function resize(id: PtyID, cols: number, rows: number) {
|
||||
const session = state().get(id)
|
||||
if (session && session.info.status === "running") {
|
||||
session.process.resize(cols, rows)
|
||||
}
|
||||
runSyncInstance(Service.use((svc) => svc.resize(id, cols, rows)))
|
||||
}
|
||||
|
||||
export function write(id: PtyID, data: string) {
|
||||
const session = state().get(id)
|
||||
if (session && session.info.status === "running") {
|
||||
session.process.write(data)
|
||||
}
|
||||
runSyncInstance(Service.use((svc) => svc.write(id, data)))
|
||||
}
|
||||
|
||||
export function connect(id: PtyID, ws: Socket, cursor?: number) {
|
||||
const session = state().get(id)
|
||||
if (!session) {
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
log.info("client connected to session", { id })
|
||||
return runSyncInstance(Service.use((svc) => svc.connect(id, ws, cursor)))
|
||||
}
|
||||
|
||||
// Use ws.data as the unique key for this connection lifecycle.
|
||||
// If ws.data is undefined, fallback to ws object.
|
||||
const connectionKey = ws.data && typeof ws.data === "object" ? ws.data : ws
|
||||
// Async facades
|
||||
export async function create(input: CreateInput) {
|
||||
return runPromiseInstance(Service.use((svc) => svc.create(input)))
|
||||
}
|
||||
|
||||
// Optionally cleanup if the key somehow exists
|
||||
session.subscribers.delete(connectionKey)
|
||||
session.subscribers.set(connectionKey, ws)
|
||||
export async function update(id: PtyID, input: UpdateInput) {
|
||||
return runPromiseInstance(Service.use((svc) => svc.update(id, input)))
|
||||
}
|
||||
|
||||
const cleanup = () => {
|
||||
session.subscribers.delete(connectionKey)
|
||||
}
|
||||
|
||||
const start = session.bufferCursor
|
||||
const end = session.cursor
|
||||
|
||||
const from =
|
||||
cursor === -1 ? end : typeof cursor === "number" && Number.isSafeInteger(cursor) ? Math.max(0, cursor) : 0
|
||||
|
||||
const data = (() => {
|
||||
if (!session.buffer) return ""
|
||||
if (from >= end) return ""
|
||||
const offset = Math.max(0, from - start)
|
||||
if (offset >= session.buffer.length) return ""
|
||||
return session.buffer.slice(offset)
|
||||
})()
|
||||
|
||||
if (data) {
|
||||
try {
|
||||
for (let i = 0; i < data.length; i += BUFFER_CHUNK) {
|
||||
ws.send(data.slice(i, i + BUFFER_CHUNK))
|
||||
}
|
||||
} catch {
|
||||
cleanup()
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
ws.send(meta(end))
|
||||
} catch {
|
||||
cleanup()
|
||||
ws.close()
|
||||
return
|
||||
}
|
||||
return {
|
||||
onMessage: (message: string | ArrayBuffer) => {
|
||||
session.process.write(String(message))
|
||||
},
|
||||
onClose: () => {
|
||||
log.info("client disconnected from session", { id })
|
||||
cleanup()
|
||||
},
|
||||
}
|
||||
export async function remove(id: PtyID) {
|
||||
return runPromiseInstance(Service.use((svc) => svc.remove(id)))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user