mirror of
https://github.com/anomalyco/opencode.git
synced 2026-06-01 19:05:38 +00:00
fix(pty): expose missing session errors (#28884)
This commit is contained in:
@@ -87,6 +87,10 @@ export const UpdateInput = Schema.Struct({
|
|||||||
|
|
||||||
export type UpdateInput = Types.DeepMutable<Schema.Schema.Type<typeof UpdateInput>>
|
export type UpdateInput = Types.DeepMutable<Schema.Schema.Type<typeof UpdateInput>>
|
||||||
|
|
||||||
|
export class NotFoundError extends Schema.TaggedErrorClass<NotFoundError>()("Pty.NotFoundError", {
|
||||||
|
ptyID: PtyID,
|
||||||
|
}) {}
|
||||||
|
|
||||||
export const Event = {
|
export const Event = {
|
||||||
Created: BusEvent.define("pty.created", Schema.Struct({ info: Info })),
|
Created: BusEvent.define("pty.created", Schema.Struct({ info: Info })),
|
||||||
Updated: BusEvent.define("pty.updated", Schema.Struct({ info: Info })),
|
Updated: BusEvent.define("pty.updated", Schema.Struct({ info: Info })),
|
||||||
@@ -96,17 +100,17 @@ export const Event = {
|
|||||||
|
|
||||||
export interface Interface {
|
export interface Interface {
|
||||||
readonly list: () => Effect.Effect<Info[]>
|
readonly list: () => Effect.Effect<Info[]>
|
||||||
readonly get: (id: PtyID) => Effect.Effect<Info | undefined>
|
readonly get: (id: PtyID) => Effect.Effect<Info, NotFoundError>
|
||||||
readonly create: (input: CreateInput) => Effect.Effect<Info>
|
readonly create: (input: CreateInput) => Effect.Effect<Info>
|
||||||
readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect<Info | undefined>
|
readonly update: (id: PtyID, input: UpdateInput) => Effect.Effect<Info, NotFoundError>
|
||||||
readonly remove: (id: PtyID) => Effect.Effect<void>
|
readonly remove: (id: PtyID) => Effect.Effect<void, NotFoundError>
|
||||||
readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect<void>
|
readonly resize: (id: PtyID, cols: number, rows: number) => Effect.Effect<void, NotFoundError>
|
||||||
readonly write: (id: PtyID, data: string) => Effect.Effect<void>
|
readonly write: (id: PtyID, data: string) => Effect.Effect<void, NotFoundError>
|
||||||
readonly connect: (
|
readonly connect: (
|
||||||
id: PtyID,
|
id: PtyID,
|
||||||
ws: Socket,
|
ws: Socket,
|
||||||
cursor?: number,
|
cursor?: number,
|
||||||
) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined>
|
) => Effect.Effect<{ onMessage: (message: string | ArrayBuffer) => void; onClose: () => void } | undefined, NotFoundError>
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Service extends Context.Service<Service, Interface>()("@opencode/Pty") {}
|
export class Service extends Context.Service<Service, Interface>()("@opencode/Pty") {}
|
||||||
@@ -150,10 +154,15 @@ export const layer = Layer.effect(
|
|||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const requireSession = Effect.fn("Pty.requireSession")(function* (id: PtyID) {
|
||||||
|
const session = (yield* InstanceState.get(state)).sessions.get(id)
|
||||||
|
if (!session) return yield* new NotFoundError({ ptyID: id })
|
||||||
|
return session
|
||||||
|
})
|
||||||
|
|
||||||
const remove = Effect.fn("Pty.remove")(function* (id: PtyID) {
|
const remove = Effect.fn("Pty.remove")(function* (id: PtyID) {
|
||||||
const s = yield* InstanceState.get(state)
|
const s = yield* InstanceState.get(state)
|
||||||
const session = s.sessions.get(id)
|
const session = yield* requireSession(id)
|
||||||
if (!session) return
|
|
||||||
s.sessions.delete(id)
|
s.sessions.delete(id)
|
||||||
log.info("removing session", { id })
|
log.info("removing session", { id })
|
||||||
teardown(session)
|
teardown(session)
|
||||||
@@ -166,8 +175,7 @@ export const layer = Layer.effect(
|
|||||||
})
|
})
|
||||||
|
|
||||||
const get = Effect.fn("Pty.get")(function* (id: PtyID) {
|
const get = Effect.fn("Pty.get")(function* (id: PtyID) {
|
||||||
const s = yield* InstanceState.get(state)
|
return (yield* requireSession(id)).info
|
||||||
return s.sessions.get(id)?.info
|
|
||||||
})
|
})
|
||||||
|
|
||||||
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
|
const create = Effect.fn("Pty.create")(function* (input: CreateInput) {
|
||||||
@@ -262,9 +270,7 @@ export const layer = Layer.effect(
|
|||||||
})
|
})
|
||||||
|
|
||||||
const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) {
|
const update = Effect.fn("Pty.update")(function* (id: PtyID, input: UpdateInput) {
|
||||||
const s = yield* InstanceState.get(state)
|
const session = yield* requireSession(id)
|
||||||
const session = s.sessions.get(id)
|
|
||||||
if (!session) return
|
|
||||||
if (input.title) {
|
if (input.title) {
|
||||||
session.info.title = input.title
|
session.info.title = input.title
|
||||||
}
|
}
|
||||||
@@ -276,28 +282,27 @@ export const layer = Layer.effect(
|
|||||||
})
|
})
|
||||||
|
|
||||||
const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) {
|
const resize = Effect.fn("Pty.resize")(function* (id: PtyID, cols: number, rows: number) {
|
||||||
const s = yield* InstanceState.get(state)
|
const session = yield* requireSession(id)
|
||||||
const session = s.sessions.get(id)
|
if (session.info.status === "running") {
|
||||||
if (session && session.info.status === "running") {
|
|
||||||
session.process.resize(cols, rows)
|
session.process.resize(cols, rows)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) {
|
const write = Effect.fn("Pty.write")(function* (id: PtyID, data: string) {
|
||||||
const s = yield* InstanceState.get(state)
|
const session = yield* requireSession(id)
|
||||||
const session = s.sessions.get(id)
|
if (session.info.status === "running") {
|
||||||
if (session && session.info.status === "running") {
|
|
||||||
session.process.write(data)
|
session.process.write(data)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) {
|
const connect = Effect.fn("Pty.connect")(function* (id: PtyID, ws: Socket, cursor?: number) {
|
||||||
const s = yield* InstanceState.get(state)
|
const session = yield* requireSession(id).pipe(
|
||||||
const session = s.sessions.get(id)
|
Effect.tapError(() =>
|
||||||
if (!session) {
|
Effect.sync(() => {
|
||||||
ws.close()
|
ws.close()
|
||||||
return
|
}),
|
||||||
}
|
),
|
||||||
|
)
|
||||||
log.info("client connected to session", { id })
|
log.info("client connected to session", { id })
|
||||||
|
|
||||||
const sub = sock(ws)
|
const sub = sock(ws)
|
||||||
|
|||||||
@@ -46,38 +46,50 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler
|
|||||||
})
|
})
|
||||||
|
|
||||||
const get = Effect.fn("PtyHttpApi.get")(function* (ctx: { params: { ptyID: PtyID } }) {
|
const get = Effect.fn("PtyHttpApi.get")(function* (ctx: { params: { ptyID: PtyID } }) {
|
||||||
const info = yield* pty.get(ctx.params.ptyID)
|
return yield* pty.get(ctx.params.ptyID).pipe(
|
||||||
if (!info)
|
Effect.catchTag("Pty.NotFoundError", (error) =>
|
||||||
return yield* new ApiError.PtyNotFoundError({
|
Effect.fail(
|
||||||
ptyID: ctx.params.ptyID,
|
new ApiError.PtyNotFoundError({
|
||||||
message: `PTY session not found: ${ctx.params.ptyID}`,
|
ptyID: error.ptyID,
|
||||||
})
|
message: `PTY session not found: ${error.ptyID}`,
|
||||||
return info
|
}),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
const update = Effect.fn("PtyHttpApi.update")(function* (ctx: {
|
const update = Effect.fn("PtyHttpApi.update")(function* (ctx: {
|
||||||
params: { ptyID: PtyID }
|
params: { ptyID: PtyID }
|
||||||
payload: typeof Pty.UpdateInput.Type
|
payload: typeof Pty.UpdateInput.Type
|
||||||
}) {
|
}) {
|
||||||
const info = yield* pty.update(ctx.params.ptyID, {
|
return yield* pty
|
||||||
...ctx.payload,
|
.update(ctx.params.ptyID, {
|
||||||
size: ctx.payload.size ? { ...ctx.payload.size } : undefined,
|
...ctx.payload,
|
||||||
})
|
size: ctx.payload.size ? { ...ctx.payload.size } : undefined,
|
||||||
if (!info)
|
|
||||||
return yield* new ApiError.PtyNotFoundError({
|
|
||||||
ptyID: ctx.params.ptyID,
|
|
||||||
message: `PTY session not found: ${ctx.params.ptyID}`,
|
|
||||||
})
|
})
|
||||||
return info
|
.pipe(
|
||||||
|
Effect.catchTag("Pty.NotFoundError", (error) =>
|
||||||
|
Effect.fail(
|
||||||
|
new ApiError.PtyNotFoundError({
|
||||||
|
ptyID: error.ptyID,
|
||||||
|
message: `PTY session not found: ${error.ptyID}`,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
const remove = Effect.fn("PtyHttpApi.remove")(function* (ctx: { params: { ptyID: PtyID } }) {
|
const remove = Effect.fn("PtyHttpApi.remove")(function* (ctx: { params: { ptyID: PtyID } }) {
|
||||||
if (!(yield* pty.get(ctx.params.ptyID)))
|
yield* pty.remove(ctx.params.ptyID).pipe(
|
||||||
return yield* new ApiError.PtyNotFoundError({
|
Effect.catchTag("Pty.NotFoundError", (error) =>
|
||||||
ptyID: ctx.params.ptyID,
|
Effect.fail(
|
||||||
message: `PTY session not found: ${ctx.params.ptyID}`,
|
new ApiError.PtyNotFoundError({
|
||||||
})
|
ptyID: error.ptyID,
|
||||||
yield* pty.remove(ctx.params.ptyID)
|
message: `PTY session not found: ${error.ptyID}`,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -85,11 +97,16 @@ export const ptyHandlers = HttpApiBuilder.group(InstanceHttpApi, "pty", (handler
|
|||||||
const request = yield* HttpServerRequest.HttpServerRequest
|
const request = yield* HttpServerRequest.HttpServerRequest
|
||||||
if (request.headers[PTY_CONNECT_TOKEN_HEADER] !== PTY_CONNECT_TOKEN_HEADER_VALUE || !validOrigin(request, cors))
|
if (request.headers[PTY_CONNECT_TOKEN_HEADER] !== PTY_CONNECT_TOKEN_HEADER_VALUE || !validOrigin(request, cors))
|
||||||
return yield* new ApiError.PtyForbiddenError({ message: "Invalid PTY connect token request" })
|
return yield* new ApiError.PtyForbiddenError({ message: "Invalid PTY connect token request" })
|
||||||
if (!(yield* pty.get(ctx.params.ptyID)))
|
yield* pty.get(ctx.params.ptyID).pipe(
|
||||||
return yield* new ApiError.PtyNotFoundError({
|
Effect.catchTag("Pty.NotFoundError", (error) =>
|
||||||
ptyID: ctx.params.ptyID,
|
Effect.fail(
|
||||||
message: `PTY session not found: ${ctx.params.ptyID}`,
|
new ApiError.PtyNotFoundError({
|
||||||
})
|
ptyID: error.ptyID,
|
||||||
|
message: `PTY session not found: ${error.ptyID}`,
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
return yield* tickets.issue({ ptyID: ctx.params.ptyID, ...(yield* PtyTicket.scope) })
|
return yield* tickets.issue({ ptyID: ctx.params.ptyID, ...(yield* PtyTicket.scope) })
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -114,7 +131,11 @@ export const ptyConnectRoute = HttpRouter.use((router) =>
|
|||||||
PtyPaths.connect,
|
PtyPaths.connect,
|
||||||
Effect.gen(function* () {
|
Effect.gen(function* () {
|
||||||
const params = yield* HttpRouter.schemaPathParams(Params)
|
const params = yield* HttpRouter.schemaPathParams(Params)
|
||||||
if (!(yield* pty.get(params.ptyID))) return HttpServerResponse.empty({ status: 404 })
|
const exists = yield* pty.get(params.ptyID).pipe(
|
||||||
|
Effect.as(true),
|
||||||
|
Effect.catchTag("Pty.NotFoundError", () => Effect.succeed(false)),
|
||||||
|
)
|
||||||
|
if (!exists) return HttpServerResponse.empty({ status: 404 })
|
||||||
|
|
||||||
const query = yield* HttpServerRequest.schemaSearchParams(CursorQuery)
|
const query = yield* HttpServerRequest.schemaSearchParams(CursorQuery)
|
||||||
const request = yield* HttpServerRequest.HttpServerRequest
|
const request = yield* HttpServerRequest.HttpServerRequest
|
||||||
@@ -164,11 +185,12 @@ export const ptyConnectRoute = HttpRouter.use((router) =>
|
|||||||
writeScoped(write(new Socket.CloseEvent(code, reason)))
|
writeScoped(write(new Socket.CloseEvent(code, reason)))
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
const handler = yield* pty.connect(params.ptyID, adapter, cursor)
|
const handler = yield* pty.connect(params.ptyID, adapter, cursor).pipe(
|
||||||
if (!handler) {
|
Effect.catchTag("Pty.NotFoundError", () =>
|
||||||
yield* closeAccepted(new Socket.CloseEvent(4404, "session not found"))
|
closeAccepted(new Socket.CloseEvent(4404, "session not found")).pipe(Effect.as(undefined)),
|
||||||
return HttpServerResponse.empty()
|
),
|
||||||
}
|
)
|
||||||
|
if (!handler) return HttpServerResponse.empty()
|
||||||
|
|
||||||
// No `pending[]`-style early-frame buffer (the legacy handler had one).
|
// No `pending[]`-style early-frame buffer (the legacy handler had one).
|
||||||
// `request.upgrade` returns a Socket without running the WS handshake; the
|
// `request.upgrade` returns a Socket without running the WS handshake; the
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import { Config } from "../../src/config/config"
|
|||||||
import { Plugin } from "../../src/plugin"
|
import { Plugin } from "../../src/plugin"
|
||||||
import { Pty } from "../../src/pty"
|
import { Pty } from "../../src/pty"
|
||||||
import type { PtyID } from "../../src/pty/schema"
|
import type { PtyID } from "../../src/pty/schema"
|
||||||
import { Effect, Layer, Queue } from "effect"
|
import { Cause, Effect, Exit, Layer, Queue } from "effect"
|
||||||
import { testEffect } from "../lib/effect"
|
import { testEffect } from "../lib/effect"
|
||||||
|
|
||||||
type PtyEvent = { type: "created" | "exited" | "deleted"; id: PtyID }
|
type PtyEvent = { type: "created" | "exited" | "deleted"; id: PtyID }
|
||||||
@@ -66,6 +66,54 @@ const waitForEvents = (events: Queue.Queue<PtyEvent>, id: PtyID, count: number)
|
|||||||
}
|
}
|
||||||
|
|
||||||
describe("pty", () => {
|
describe("pty", () => {
|
||||||
|
it.instance(
|
||||||
|
"returns typed not found errors for missing sessions",
|
||||||
|
() =>
|
||||||
|
Effect.gen(function* () {
|
||||||
|
const pty = yield* Pty.Service
|
||||||
|
const id = "pty_missing" as PtyID
|
||||||
|
let closed = false
|
||||||
|
const socket = {
|
||||||
|
readyState: 1,
|
||||||
|
send: () => {},
|
||||||
|
close: () => {
|
||||||
|
closed = true
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
const get = yield* pty.get(id).pipe(Effect.exit)
|
||||||
|
expect(Exit.isFailure(get)).toBe(true)
|
||||||
|
if (Exit.isFailure(get)) expect(Cause.squash(get.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id })
|
||||||
|
|
||||||
|
const update = yield* pty.update(id, { title: "missing" }).pipe(Effect.exit)
|
||||||
|
expect(Exit.isFailure(update)).toBe(true)
|
||||||
|
if (Exit.isFailure(update))
|
||||||
|
expect(Cause.squash(update.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id })
|
||||||
|
|
||||||
|
const remove = yield* pty.remove(id).pipe(Effect.exit)
|
||||||
|
expect(Exit.isFailure(remove)).toBe(true)
|
||||||
|
if (Exit.isFailure(remove))
|
||||||
|
expect(Cause.squash(remove.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id })
|
||||||
|
|
||||||
|
const resize = yield* pty.resize(id, 80, 24).pipe(Effect.exit)
|
||||||
|
expect(Exit.isFailure(resize)).toBe(true)
|
||||||
|
if (Exit.isFailure(resize))
|
||||||
|
expect(Cause.squash(resize.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id })
|
||||||
|
|
||||||
|
const write = yield* pty.write(id, "input").pipe(Effect.exit)
|
||||||
|
expect(Exit.isFailure(write)).toBe(true)
|
||||||
|
if (Exit.isFailure(write))
|
||||||
|
expect(Cause.squash(write.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id })
|
||||||
|
|
||||||
|
const connect = yield* pty.connect(id, socket).pipe(Effect.exit)
|
||||||
|
expect(Exit.isFailure(connect)).toBe(true)
|
||||||
|
if (Exit.isFailure(connect))
|
||||||
|
expect(Cause.squash(connect.cause)).toMatchObject({ _tag: "Pty.NotFoundError", ptyID: id })
|
||||||
|
expect(closed).toBe(true)
|
||||||
|
}),
|
||||||
|
{ git: true },
|
||||||
|
)
|
||||||
|
|
||||||
ptyTest(
|
ptyTest(
|
||||||
"publishes created, exited, deleted in order for a short-lived process",
|
"publishes created, exited, deleted in order for a short-lived process",
|
||||||
() =>
|
() =>
|
||||||
@@ -93,7 +141,7 @@ describe("pty", () => {
|
|||||||
expect(yield* waitForEvents(events, info.id, 1)).toEqual(["created"])
|
expect(yield* waitForEvents(events, info.id, 1)).toEqual(["created"])
|
||||||
yield* pty.write(info.id, "exit\n")
|
yield* pty.write(info.id, "exit\n")
|
||||||
expect(yield* waitForEvents(events, info.id, 2)).toEqual(["exited", "deleted"])
|
expect(yield* waitForEvents(events, info.id, 2)).toEqual(["exited", "deleted"])
|
||||||
yield* pty.remove(info.id)
|
yield* pty.remove(info.id).pipe(Effect.ignore)
|
||||||
}),
|
}),
|
||||||
{ git: true },
|
{ git: true },
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -147,6 +147,33 @@ describe("pty HttpApi bridge", () => {
|
|||||||
expect(response.status).toBe(404)
|
expect(response.status).toBe(404)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test("returns typed not found errors for missing PTY HTTP resources", async () => {
|
||||||
|
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||||
|
const headers = { "x-opencode-directory": tmp.path }
|
||||||
|
const missingID = String(PtyID.ascending())
|
||||||
|
const expected = {
|
||||||
|
_tag: "PtyNotFoundError",
|
||||||
|
ptyID: missingID,
|
||||||
|
message: `PTY session not found: ${missingID}`,
|
||||||
|
}
|
||||||
|
|
||||||
|
const found = await app().request(PtyPaths.get.replace(":ptyID", missingID), { headers })
|
||||||
|
expect(found.status).toBe(404)
|
||||||
|
expect(await found.json()).toEqual(expected)
|
||||||
|
|
||||||
|
const updated = await app().request(PtyPaths.update.replace(":ptyID", missingID), {
|
||||||
|
method: "PUT",
|
||||||
|
headers: { ...headers, "content-type": "application/json" },
|
||||||
|
body: JSON.stringify({ title: "missing" }),
|
||||||
|
})
|
||||||
|
expect(updated.status).toBe(404)
|
||||||
|
expect(await updated.json()).toEqual(expected)
|
||||||
|
|
||||||
|
const removed = await app().request(PtyPaths.remove.replace(":ptyID", missingID), { method: "DELETE", headers })
|
||||||
|
expect(removed.status).toBe(404)
|
||||||
|
expect(await removed.json()).toEqual(expected)
|
||||||
|
})
|
||||||
|
|
||||||
test("returns typed errors for PTY connect token failures", async () => {
|
test("returns typed errors for PTY connect token failures", async () => {
|
||||||
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
|
||||||
const headers = { "x-opencode-directory": tmp.path }
|
const headers = { "x-opencode-directory": tmp.path }
|
||||||
|
|||||||
Reference in New Issue
Block a user