refactor(server): simplify listener lifecycle (#27413)

This commit is contained in:
Kit Langton
2026-05-14 20:00:52 -04:00
committed by GitHub
parent 78769010a1
commit 195f592640
20 changed files with 449 additions and 216 deletions

View File

@@ -32,11 +32,6 @@
"bun": "./src/pty/pty.bun.ts",
"node": "./src/pty/pty.node.ts",
"default": "./src/pty/pty.bun.ts"
},
"#httpapi-server": {
"bun": "./src/server/httpapi-server.node.ts",
"node": "./src/server/httpapi-server.node.ts",
"default": "./src/server/httpapi-server.node.ts"
}
},
"devDependencies": {

View File

@@ -1,35 +0,0 @@
import { NodeHttpServer } from "@effect/platform-node"
import { Effect, Layer } from "effect"
import { createServer } from "node:http"
import { Service } from "./httpapi-server"
export { Service }
export const name = "node-http-server"
export type Opts = { port: number; hostname: string }
export const layer = (opts: Opts) => {
const server = createServer()
const serverRef = { closeStarted: false, forceStop: false }
const close = server.close.bind(server)
// Keep shutdown owned by NodeHttpServer, but honor listener.stop(true) by
// force-closing active HTTP sockets when its finalizer calls server.close().
server.close = ((callback?: Parameters<typeof server.close>[0]) => {
serverRef.closeStarted = true
const result = close(callback)
if (serverRef.forceStop) server.closeAllConnections()
return result
}) as typeof server.close
return Layer.mergeAll(
NodeHttpServer.layer(() => server, { port: opts.port, host: opts.hostname, gracefulShutdownTimeout: "1 second" }),
Layer.succeed(Service)(
Service.of({
closeAll: Effect.sync(() => {
serverRef.forceStop = true
if (serverRef.closeStarted) server.closeAllConnections()
}),
}),
),
)
}

View File

@@ -1,9 +0,0 @@
import { Context, Effect } from "effect"
export interface Interface {
readonly closeAll: Effect.Effect<void>
}
export class Service extends Context.Service<Service, Interface>()("@opencode/HttpApiServer") {}
export * as HttpApiServer from "./httpapi-server"

View File

@@ -24,5 +24,3 @@ export function initProjectors() {
},
})
}
initProjectors()

View File

@@ -1,4 +1,4 @@
import { Context, Effect, Layer } from "effect"
import { Config as EffectConfig, Context, Effect, Layer } from "effect"
import { HttpApiBuilder, OpenApi } from "effect/unstable/httpapi"
import {
FetchHttpClient,
@@ -90,15 +90,6 @@ import { schemaErrorLayer } from "./middleware/schema-error"
export const context = Context.makeUnsafe<unknown>(new Map())
const runtime = HttpRouter.middleware()(
Effect.succeed((effect) =>
Effect.gen(function* () {
yield* Effect.annotateCurrentSpan({ "opencode.server.backend": "effect-httpapi" })
return yield* effect
}),
),
).layer
const cors = (corsOptions?: CorsOptions) =>
HttpRouter.middleware(
HttpMiddleware.cors({
@@ -180,7 +171,16 @@ const uiRoute = HttpRouter.use((router) =>
}),
).pipe(Layer.provide(authOnlyRouterLayer))
export function createRoutes(corsOptions?: CorsOptions) {
type RouteRequirements =
| HttpRouter.HttpRouter
| HttpRouter.Request<"Error", unknown>
| HttpRouter.Request<"GlobalError", unknown>
| HttpRouter.Request<"Requires", unknown>
| HttpRouter.Request<"GlobalRequires", never>
export function createRoutes(
corsOptions?: CorsOptions,
): Layer.Layer<never, EffectConfig.ConfigError, RouteRequirements> {
return Layer.mergeAll(rootApiRoutes, eventApiRoutes, instanceRoutes, docRoute, uiRoute).pipe(
Layer.provide([
errorLayer,
@@ -188,7 +188,6 @@ export function createRoutes(corsOptions?: CorsOptions) {
corsVaryFix,
fenceLayer,
cors(corsOptions),
runtime,
Account.defaultLayer,
Agent.defaultLayer,
Auth.defaultLayer,
@@ -233,28 +232,20 @@ export function createRoutes(corsOptions?: CorsOptions) {
FetchHttpClient.layer,
HttpServer.layerServices,
]),
Layer.provideMerge(Layer.succeed(CorsConfig)(corsOptions)),
Layer.provideMerge(InstanceLayer.layer),
Layer.provideMerge(Observability.layer),
Layer.provide(Layer.succeed(CorsConfig)(corsOptions)),
Layer.provide(InstanceLayer.layer),
Layer.provide(Observability.layer),
)
}
export const routes = createRoutes()
const defaultWebHandler = lazy(() =>
export const webHandler = lazy(() =>
HttpRouter.toWebHandler(routes, {
disableLogger: true,
memoMap,
middleware: disposeMiddleware,
}),
)
export function webHandler(corsOptions?: CorsOptions) {
if (!corsOptions?.cors?.length) return defaultWebHandler()
return HttpRouter.toWebHandler(createRoutes(corsOptions), {
// Server-level CORS options are dynamic; don't reuse the default route layer memoized without them.
memoMap: Layer.makeMemoMapUnsafe(),
middleware: disposeMiddleware,
})
}
export * as ExperimentalHttpApiServer from "./server"
export * as HttpApiApp from "./server"

View File

@@ -1,15 +1,17 @@
import { NodeHttpServer } from "@effect/platform-node"
import * as Log from "@opencode-ai/core/util/log"
import { ConfigProvider, Context, Effect, Exit, Layer, Scope } from "effect"
import { HttpRouter, HttpServer } from "effect/unstable/http"
import { OpenApi } from "effect/unstable/httpapi"
import * as HttpApiServer from "#httpapi-server"
import { createServer } from "node:http"
import { MDNS } from "./mdns"
import { initProjectors } from "./projectors"
import { ExperimentalHttpApiServer } from "./routes/instance/httpapi/server"
import { HttpApiApp } from "./routes/instance/httpapi/server"
import { disposeMiddleware } from "./routes/instance/httpapi/lifecycle"
import { WebSocketTracker } from "./routes/instance/httpapi/websocket-tracker"
import { PublicApi } from "./routes/instance/httpapi/public"
import type { CorsOptions } from "./cors"
import { lazy } from "@/util/lazy"
// @ts-ignore This global is needed to prevent ai-sdk from logging warnings to stdout https://github.com/vercel/ai/blob/2dc67e0ef538307f21368db32d5a12345d98831b/packages/ai/src/logger/log-warnings.ts#L85
globalThis.AI_SDK_LOG_WARNINGS = false
@@ -36,19 +38,34 @@ type ListenOptions = CorsOptions & {
mdns?: boolean
mdnsDomain?: string
}
type ListenerState = {
scope: Scope.Scope
server: Context.Service.Shape<typeof HttpServer.HttpServer>
http: ListenerServer
websockets: WebSocketTracker.Interface
}
type EffectListener = Omit<Listener, "stop"> & {
stop: (close?: boolean) => Effect.Effect<void>
}
const defaultHttpApi = (() => {
const handler = ExperimentalHttpApiServer.webHandler().handler
interface ListenerServer {
readonly closeAll: Effect.Effect<void>
}
class ListenerServerService extends Context.Service<ListenerServerService, ListenerServer>()(
"@opencode/ListenerServer",
) {}
export const Default = lazy(() => {
const handler = HttpApiApp.webHandler().handler
const app: ServerApp = {
fetch: (request: Request) => handler(request, ExperimentalHttpApiServer.context),
fetch: (request: Request) => handler(request, HttpApiApp.context),
request(input, init) {
return app.fetch(input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init))
},
}
return { app }
})()
export const Default = () => defaultHttpApi
})
export async function openapi() {
return OpenApi.fromApi(PublicApi)
@@ -57,102 +74,146 @@ export async function openapi() {
export let url: URL
export async function listen(opts: ListenOptions): Promise<Listener> {
log.info("server backend", { "opencode.server.runtime": HttpApiServer.name })
const buildLayer = (port: number) =>
HttpRouter.serve(ExperimentalHttpApiServer.createRoutes(opts), {
middleware: disposeMiddleware,
disableLogger: true,
disableListenLog: true,
}).pipe(
Layer.provideMerge(WebSocketTracker.layer),
Layer.provideMerge(HttpApiServer.layer({ port, hostname: opts.hostname })),
// Install a fresh `ConfigProvider` per listener so `Config.string(...)`
// reads reflect the current `process.env`. Effect's default
// `ConfigProvider` snapshots `process.env` on first read and caches the
// result on a module-singleton Reference; without overriding it here,
// every later `Server.listen()` keeps observing that initial snapshot.
Layer.provide(ConfigProvider.layer(ConfigProvider.fromEnv())),
)
const start = async (port: number) => {
const scope = Scope.makeUnsafe()
try {
const layer = buildLayer(port) as Layer.Layer<
HttpServer.HttpServer | WebSocketTracker.Service | HttpApiServer.Service,
unknown,
never
>
const ctx = await Effect.runPromise(Layer.buildWithMemoMap(layer, Layer.makeMemoMapUnsafe(), scope))
return { scope, ctx }
} catch (err) {
await Effect.runPromise(Scope.close(scope, Exit.void)).catch(() => undefined)
throw err
}
}
// Match the legacy adapter port-resolution behavior: explicit `0` prefers
// 4096 first, then any free port.
let resolved: Awaited<ReturnType<typeof start>> | undefined
if (opts.port === 0) {
resolved = await start(4096).catch(() => undefined)
if (!resolved) resolved = await start(0)
} else {
resolved = await start(opts.port)
}
if (!resolved) throw new Error(`Failed to start server on port ${opts.port}`)
const server = Context.get(resolved.ctx, HttpServer.HttpServer)
if (server.address._tag !== "TcpAddress") {
await Effect.runPromise(Scope.close(resolved.scope, Exit.void))
throw new Error(`Unexpected HttpServer address tag: ${server.address._tag}`)
}
const port = server.address.port
const innerUrl = new URL("http://localhost")
innerUrl.hostname = opts.hostname
innerUrl.port = String(port)
url = innerUrl
const mdns =
opts.mdns && port && opts.hostname !== "127.0.0.1" && opts.hostname !== "localhost" && opts.hostname !== "::1"
if (mdns) {
MDNS.publish(port, opts.mdnsDomain)
} else if (opts.mdns) {
log.warn("mDNS enabled but hostname is loopback; skipping mDNS publish")
}
let forceStopPromise: Promise<void> | undefined
let stopPromise: Promise<void> | undefined
let mdnsUnpublished = false
const unpublish = () => {
if (!mdns || mdnsUnpublished) return
mdnsUnpublished = true
MDNS.unpublish()
}
const forceStop = () => {
forceStopPromise ??= Effect.runPromiseExit(
Effect.gen(function* () {
yield* Context.get(resolved!.ctx, HttpApiServer.Service).closeAll
yield* Context.get(resolved!.ctx, WebSocketTracker.Service).closeAll
}),
).then(() => undefined)
return forceStopPromise
}
const listener = await Effect.runPromise(listenEffect(opts))
return {
hostname: opts.hostname,
port,
url: innerUrl,
stop: (close?: boolean) => {
unpublish()
const requested = close ? forceStop() : Promise.resolve()
stopPromise ??= requested
.then(() => Effect.runPromiseExit(Scope.close(resolved!.scope, Exit.void)))
.then(() => undefined)
return requested.then(() => stopPromise!)
},
hostname: listener.hostname,
port: listener.port,
url: listener.url,
stop: (close?: boolean) => Effect.runPromiseExit(listener.stop(close)).then(() => undefined),
}
}
const listenEffect: (opts: ListenOptions) => Effect.Effect<EffectListener, unknown> = Effect.fn("Server.listen")(
function* (opts: ListenOptions) {
const state = yield* startWithPortFallback(opts)
const address = yield* tcpAddress(state)
const listenerUrl = makeURL(opts.hostname, address.port)
url = listenerUrl
const unpublishMdns = yield* setupMdns(opts, address.port, state.scope)
return {
hostname: opts.hostname,
port: address.port,
url: listenerUrl,
stop: yield* makeStop(state, unpublishMdns),
}
},
)
function listenerLayer(opts: ListenOptions, port: number) {
return HttpRouter.serve(HttpApiApp.createRoutes(opts), {
middleware: disposeMiddleware,
disableLogger: true,
disableListenLog: true,
}).pipe(
Layer.provideMerge(WebSocketTracker.layer),
Layer.provideMerge(serverLayer({ port, hostname: opts.hostname })),
// Install a fresh `ConfigProvider` per listener so `Config.string(...)`
// reads reflect the current `process.env`. Effect's default
// `ConfigProvider` snapshots `process.env` on first read and caches the
// result on a module-singleton Reference; without overriding it here,
// every later `Server.listen()` keeps observing that initial snapshot.
Layer.provide(ConfigProvider.layer(ConfigProvider.fromEnv())),
)
}
function startWithPortFallback(opts: ListenOptions) {
if (opts.port !== 0) return startListener(opts, opts.port)
// Match the legacy listener port-resolution behavior: explicit `0` prefers
// 4096 first, then any free port.
return startListener(opts, 4096).pipe(Effect.catch(() => startListener(opts, 0)))
}
function startListener(opts: ListenOptions, port: number) {
const scope = Scope.makeUnsafe()
return Layer.buildWithMemoMap(listenerLayer(opts, port), Layer.makeMemoMapUnsafe(), scope).pipe(
Effect.provide(HttpApiApp.context),
Effect.onError(() => Scope.close(scope, Exit.void).pipe(Effect.ignore)),
Effect.map(
(ctx): ListenerState => ({
scope,
server: Context.get(ctx, HttpServer.HttpServer),
http: Context.get(ctx, ListenerServerService),
websockets: Context.get(ctx, WebSocketTracker.Service),
}),
),
)
}
function tcpAddress(state: ListenerState) {
return Effect.gen(function* () {
if (state.server.address._tag === "TcpAddress") return state.server.address
yield* Scope.close(state.scope, Exit.void).pipe(Effect.ignore)
return yield* Effect.die(new Error(`Unexpected HttpServer address tag: ${state.server.address._tag}`))
})
}
function makeURL(hostname: string, port: number) {
const result = new URL("http://localhost")
result.hostname = hostname
result.port = String(port)
return result
}
function setupMdns(opts: ListenOptions, port: number, scope: Scope.Scope) {
return Effect.gen(function* () {
const publish =
opts.mdns && port && opts.hostname !== "127.0.0.1" && opts.hostname !== "localhost" && opts.hostname !== "::1"
if (publish) {
const unpublish = yield* Effect.cached(Effect.sync(() => MDNS.unpublish()))
yield* Effect.sync(() => MDNS.publish(port, opts.mdnsDomain))
yield* Scope.addFinalizer(scope, unpublish)
return unpublish
}
if (opts.mdns) log.warn("mDNS enabled but hostname is loopback; skipping mDNS publish")
return Effect.void
})
}
function makeStop(state: ListenerState, unpublishMdns: Effect.Effect<void>) {
return Effect.gen(function* () {
const forceCloseOnce = yield* Effect.cached(forceClose(state).pipe(Effect.ignore))
const closeScopeOnce = yield* Effect.cached(Scope.close(state.scope, Exit.void).pipe(Effect.ignore))
return (close?: boolean) =>
Effect.gen(function* () {
yield* unpublishMdns
if (close) yield* forceCloseOnce
yield* closeScopeOnce
})
})
}
function forceClose(state: ListenerState) {
return Effect.all([state.http.closeAll, state.websockets.closeAll], { concurrency: "unbounded", discard: true })
}
function serverLayer(opts: { port: number; hostname: string }) {
const server = createServer()
const serverRef = { closeStarted: false, forceStop: false }
const close = server.close.bind(server)
// Keep shutdown owned by NodeHttpServer, but honor listener.stop(true) by
// force-closing active HTTP sockets when its finalizer calls server.close().
// oxlint-disable-next-line typescript-eslint/no-unsafe-type-assertion -- Node's overloads don't preserve a monkey-patched method assignment.
server.close = ((callback?: Parameters<typeof server.close>[0]) => {
serverRef.closeStarted = true
const result = close(callback)
if (serverRef.forceStop) server.closeAllConnections()
return result
}) as typeof server.close
return Layer.mergeAll(
NodeHttpServer.layer(() => server, { port: opts.port, host: opts.hostname, gracefulShutdownTimeout: "1 second" }),
Layer.succeed(ListenerServerService)(
ListenerServerService.of({
closeAll: Effect.sync(() => {
serverRef.forceStop = true
if (serverRef.closeStarted) server.closeAllConnections()
}),
}),
),
)
}
export * as Server from "./server"

View File

@@ -6,7 +6,7 @@ import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "effect/un
import * as Socket from "effect/unstable/socket/Socket"
import { Server } from "../../src/server/server"
import { InstancePaths } from "../../src/server/routes/instance/httpapi/groups/instance"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { resetDatabase } from "../fixture/db"
import { testEffect } from "../lib/effect"
@@ -27,7 +27,7 @@ const testStateLayer = Layer.effectDiscard(
)
const servedRoutes: Layer.Layer<never, Config.ConfigError, HttpServer.HttpServer> = HttpRouter.serve(
ExperimentalHttpApiServer.routes,
HttpApiApp.routes,
{ disableListenLog: true, disableLogger: true },
)
@@ -63,7 +63,7 @@ describe("HttpApi CORS", () => {
it.live("adds CORS headers to unauthorized responses", () =>
Effect.gen(function* () {
const handler = HttpRouter.toWebHandler(
ExperimentalHttpApiServer.createRoutes().pipe(
HttpApiApp.createRoutes().pipe(
Layer.provide(ConfigProvider.layer(ConfigProvider.fromUnknown({ OPENCODE_SERVER_PASSWORD: "secret" }))),
),
{ disableLogger: true },
@@ -73,7 +73,7 @@ describe("HttpApi CORS", () => {
new Request(new URL("/global/config", "http://localhost"), {
headers: { origin: "https://app.opencode.ai" },
}),
ExperimentalHttpApiServer.context,
HttpApiApp.context,
),
)
@@ -103,6 +103,20 @@ describe("HttpApi CORS", () => {
expect(response.status).toBe(204)
expect(response.headers.get("access-control-allow-origin")).toBe("https://custom.example")
expect(response.headers.get("access-control-allow-headers")).toBe("authorization")
const rejected = yield* Effect.promise(() =>
fetch(new URL(InstancePaths.path, listener.url), {
method: "OPTIONS",
headers: {
origin: "https://evil.example",
"access-control-request-method": "GET",
"access-control-request-headers": "authorization",
},
}),
)
expect(rejected.status).toBe(204)
expect(rejected.headers.get("access-control-allow-origin")).not.toBe("https://evil.example")
}),
)
})

View File

@@ -49,7 +49,7 @@ function app(modules: Runtime, options: CallOptions) {
if (appCache[cacheKey]) return appCache[cacheKey]
const handler = HttpRouter.toWebHandler(
modules.ExperimentalHttpApiServer.routes.pipe(
modules.HttpApiApp.routes.pipe(
Layer.provide(
ConfigProvider.layer(
ConfigProvider.fromUnknown({ OPENCODE_SERVER_PASSWORD: password, OPENCODE_SERVER_USERNAME: username }),
@@ -62,7 +62,7 @@ function app(modules: Runtime, options: CallOptions) {
request(input: string | URL | Request, init?: RequestInit) {
return handler(
input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init),
modules.ExperimentalHttpApiServer.context,
modules.HttpApiApp.context,
)
},
})

View File

@@ -1,6 +1,6 @@
export type Runtime = {
PublicApi: (typeof import("../../../src/server/routes/instance/httpapi/public"))["PublicApi"]
ExperimentalHttpApiServer: (typeof import("../../../src/server/routes/instance/httpapi/server"))["ExperimentalHttpApiServer"]
HttpApiApp: (typeof import("../../../src/server/routes/instance/httpapi/server"))["HttpApiApp"]
AppLayer: (typeof import("../../../src/effect/app-runtime"))["AppLayer"]
InstanceRef: (typeof import("../../../src/effect/instance-ref"))["InstanceRef"]
Instance: (typeof import("../../../src/project/instance"))["Instance"]
@@ -34,7 +34,7 @@ export function runtime() {
const db = await import("../../fixture/db")
return {
PublicApi: publicApi.PublicApi,
ExperimentalHttpApiServer: httpApiServer.ExperimentalHttpApiServer,
HttpApiApp: httpApiServer.HttpApiApp,
AppLayer: appRuntime.AppLayer,
InstanceRef: instanceRef.InstanceRef,
Instance: instance.Instance,

View File

@@ -1,7 +1,7 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Context } from "effect"
import path from "path"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { FilePaths } from "../../src/server/routes/instance/httpapi/groups/file"
import { Instance } from "../../src/project/instance"
import * as Log from "@opencode-ai/core/util/log"
@@ -17,7 +17,7 @@ function request(route: string, directory: string, query?: Record<string, string
for (const [key, value] of Object.entries(query ?? {})) {
url.searchParams.set(key, value)
}
return ExperimentalHttpApiServer.webHandler().handler(
return HttpApiApp.webHandler().handler(
new Request(url, {
headers: {
"x-opencode-directory": directory,

View File

@@ -8,7 +8,7 @@ import { WorkspaceID } from "../../src/control-plane/schema"
import { ControlPaths } from "../../src/server/routes/instance/httpapi/groups/control"
import { InstancePaths } from "../../src/server/routes/instance/httpapi/groups/instance"
import { SessionPaths } from "../../src/server/routes/instance/httpapi/groups/session"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { HEADER as FenceHeader } from "../../src/server/shared/fence"
import { resetDatabase } from "../fixture/db"
import { tmpdirScoped } from "../fixture/fixture"
@@ -37,7 +37,7 @@ const testStateLayer = Layer.effectDiscard(
// 127.0.0.1:0 and a fetch-based HttpClient that prepends the server URL. This
// keeps the test wired directly through the same route layer production uses.
const servedRoutes: Layer.Layer<never, Config.ConfigError, HttpServer.HttpServer> = HttpRouter.serve(
ExperimentalHttpApiServer.routes,
HttpApiApp.routes,
{ disableListenLog: true, disableLogger: true },
)
@@ -122,7 +122,7 @@ describe("instance HttpApi", () => {
const dir = yield* tmpdirScoped({ git: true })
const request = (path: string, init?: RequestInit) =>
Effect.promise(() =>
ExperimentalHttpApiServer.webHandler().handler(
HttpApiApp.webHandler().handler(
new Request(`http://localhost${path}`, {
...init,
headers: { "x-opencode-directory": dir, "content-type": "application/json", ...init?.headers },

View File

@@ -1,4 +1,5 @@
import { afterEach, describe, expect, test } from "bun:test"
import net from "node:net"
import { Flag } from "@opencode-ai/core/flag/flag"
import * as Log from "@opencode-ai/core/util/log"
import { Server } from "../../src/server/server"
@@ -156,6 +157,16 @@ function waitForMessage(ws: WebSocket, predicate: (message: string) => boolean)
})
}
async function openPtySocket(listener: Awaited<ReturnType<typeof startListener>>, dir: string) {
const info = await createCat(listener, dir)
const ticket = await connectTicket(listener, info.id, dir)
const ws = await openSocket(socketURL(listener, info.id, dir, ticket.ticket))
return {
ws,
closed: new Promise<void>((resolve) => ws.addEventListener("close", () => resolve(), { once: true })),
}
}
describe("HttpApi Server.listen", () => {
testPty("serves HTTP routes and upgrades PTY websocket through Server.listen", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
@@ -208,6 +219,116 @@ describe("HttpApi Server.listen", () => {
}
})
testPty("stop(true) is safe when called concurrently and repeatedly", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()
let stopped = false
try {
const socket = await openPtySocket(listener, tmp.path)
await withTimeout(
Promise.all([listener.stop(true), listener.stop(true)]).then(() => undefined),
10_000,
"timed out waiting for concurrent listener.stop(true)",
)
await withTimeout(socket.closed, 5_000, "timed out waiting for websocket close after concurrent stop")
await withTimeout(listener.stop(true), 5_000, "timed out waiting for repeated listener.stop(true)")
stopped = true
} finally {
if (!stopped) await stop(listener, "timed out cleaning up concurrent stop listener").catch(() => undefined)
}
})
testPty("stop(true) can force a graceful stop already in progress", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()
let stopped = false
try {
const socket = await openPtySocket(listener, tmp.path)
const graceful = listener.stop()
const forced = listener.stop(true)
await withTimeout(
Promise.all([graceful, forced]).then(() => undefined),
10_000,
"timed out waiting for forced listener stop",
)
await withTimeout(socket.closed, 5_000, "timed out waiting for websocket close after forced stop")
stopped = true
} finally {
if (!stopped) await stop(listener, "timed out cleaning up forced stop listener").catch(() => undefined)
}
})
testPty("graceful stop waits for an overlapping forced stop", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()
let stopped = false
try {
const socket = await openPtySocket(listener, tmp.path)
const forced = listener.stop(true)
await withTimeout(listener.stop(), 10_000, "timed out waiting for graceful stop after forced stop")
stopped = true
await withTimeout(forced, 5_000, "timed out waiting for overlapping forced stop")
await withTimeout(socket.closed, 5_000, "timed out waiting for websocket close before graceful stop resolved")
} finally {
if (!stopped) await stop(listener, "timed out cleaning up overlapping stop listener").catch(() => undefined)
}
})
test("stop() gracefully closes an idle listener and is repeat-safe", async () => {
const listener = await startListener()
await withTimeout(listener.stop(), 10_000, "timed out waiting for graceful listener.stop()")
await withTimeout(listener.stop(), 5_000, "timed out waiting for repeated graceful listener.stop()")
await expect(
fetch(new URL(PtyPaths.shells, listener.url), { headers: { authorization: authorization() } }),
).rejects.toThrow()
})
test("default in-process handler does not emit Effect HTTP response logs", async () => {
let output = ""
// oxlint-disable-next-line typescript-eslint/unbound-method -- restored in finally after temporarily capturing stderr.
const original = process.stderr.write
process.stderr.write = ((chunk) => {
output += String(chunk)
return true
}) as typeof process.stderr.write
try {
const response = await Server.Default().app.request("/status")
expect(response.status).toBe(200)
} finally {
process.stderr.write = original
}
expect(output).not.toContain("Sent HTTP response")
})
test("port 0 prefers 4096 when free", async () => {
if (!(await isPortFree(4096))) return
const listener = await startListener()
try {
expect(listener.port).toBe(4096)
} finally {
await stop(listener, "timed out cleaning up port-0 prefers-4096 listener")
}
})
test("port 0 falls back when 4096 is taken", async () => {
const blocker = await occupyPort(4096)
if (!blocker) return
try {
const listener = await startListener()
try {
expect(listener.port).not.toBe(4096)
expect(listener.port).toBeGreaterThan(0)
} finally {
await stop(listener, "timed out cleaning up port-0 fallback listener")
}
} finally {
await new Promise<void>((resolve) => blocker.close(() => resolve()))
}
})
testPty("rejects unsafe PTY ticket mint and connect requests", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const listener = await startListener()
@@ -295,3 +416,20 @@ describe("HttpApi Server.listen", () => {
}
})
})
function isPortFree(port: number) {
return new Promise<boolean>((resolve) => {
const probe = net.createServer()
probe.once("error", () => resolve(false))
probe.once("listening", () => probe.close(() => resolve(true)))
probe.listen(port, "127.0.0.1")
})
}
function occupyPort(port: number) {
return new Promise<net.Server | undefined>((resolve) => {
const server = net.createServer()
server.once("error", () => resolve(undefined))
server.listen(port, "127.0.0.1", () => resolve(server))
})
}

View File

@@ -1,6 +1,6 @@
import { describe, expect } from "bun:test"
import { Context, Effect, Layer } from "effect"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { McpPaths } from "../../src/server/routes/instance/httpapi/groups/mcp"
import { Server } from "../../src/server/server"
import * as Log from "@opencode-ai/core/util/log"
@@ -23,10 +23,10 @@ function app() {
return Server.Default().app
}
type TestApp = ReturnType<typeof app>
type TestHandler = ReturnType<typeof ExperimentalHttpApiServer.webHandler>
type TestHandler = ReturnType<typeof HttpApiApp.webHandler>
const handlerScoped = Effect.acquireRelease(
Effect.sync(() => ExperimentalHttpApiServer.webHandler()),
Effect.sync(() => HttpApiApp.webHandler()),
(handler) => Effect.promise(() => handler.dispose()).pipe(Effect.ignore),
)

View File

@@ -0,0 +1,82 @@
import { afterEach, describe, expect, mock, test } from "bun:test"
import { Flag } from "@opencode-ai/core/flag/flag"
import * as Log from "@opencode-ai/core/util/log"
import { withTimeout } from "../../src/util/timeout"
import { resetDatabase } from "../fixture/db"
import { disposeAllInstances } from "../fixture/fixture"
void Log.init({ print: false })
type Event = { kind: "publish"; port: number; name: string } | { kind: "unpublishAll" } | { kind: "destroy" }
const events: Event[] = []
void mock.module("bonjour-service", () => ({
Bonjour: class {
publish(opts: { port: number; name: string }) {
events.push({ kind: "publish", port: opts.port, name: opts.name })
return { on: () => {} }
}
unpublishAll() {
events.push({ kind: "unpublishAll" })
}
destroy() {
events.push({ kind: "destroy" })
}
},
}))
// Import Server AFTER the mock so the MDNS module picks up the stub.
const { Server } = await import("../../src/server/server")
const original = {
OPENCODE_SERVER_PASSWORD: Flag.OPENCODE_SERVER_PASSWORD,
OPENCODE_SERVER_USERNAME: Flag.OPENCODE_SERVER_USERNAME,
}
afterEach(async () => {
events.length = 0
Flag.OPENCODE_SERVER_PASSWORD = original.OPENCODE_SERVER_PASSWORD
Flag.OPENCODE_SERVER_USERNAME = original.OPENCODE_SERVER_USERNAME
await disposeAllInstances()
await resetDatabase()
})
describe("HttpApi Server.listen mDNS", () => {
test("skips publish for loopback hostnames", async () => {
Flag.OPENCODE_SERVER_PASSWORD = "mdns-secret"
Flag.OPENCODE_SERVER_USERNAME = "opencode"
const listener = await Server.listen({ hostname: "127.0.0.1", port: 0, mdns: true })
try {
expect(events.filter((e) => e.kind === "publish")).toEqual([])
} finally {
await withTimeout(listener.stop(true), 10_000, "timed out stopping loopback mdns listener")
}
expect(events.filter((e) => e.kind === "publish")).toEqual([])
})
test("publishes for non-loopback hostnames and unpublishes on stop", async () => {
Flag.OPENCODE_SERVER_PASSWORD = "mdns-secret"
Flag.OPENCODE_SERVER_USERNAME = "opencode"
const listener = await Server.listen({ hostname: "0.0.0.0", port: 0, mdns: true })
try {
const published = events.filter((e) => e.kind === "publish")
expect(published.length).toBe(1)
expect(published[0]!.port).toBe(listener.port)
expect(published[0]!.name).toBe(`opencode-${listener.port}`)
} finally {
await withTimeout(listener.stop(true), 10_000, "timed out stopping mdns listener")
}
expect(events.some((e) => e.kind === "unpublishAll")).toBe(true)
expect(events.some((e) => e.kind === "destroy")).toBe(true)
})
test("scope finalizer unpublishes even if stop() is not called for force-close", async () => {
Flag.OPENCODE_SERVER_PASSWORD = "mdns-secret"
Flag.OPENCODE_SERVER_USERNAME = "opencode"
const listener = await Server.listen({ hostname: "0.0.0.0", port: 0, mdns: true })
expect(events.filter((e) => e.kind === "publish").length).toBe(1)
// Plain (graceful) stop without close=true should still unpublish.
await withTimeout(listener.stop(), 10_000, "timed out stopping graceful mdns listener")
expect(events.some((e) => e.kind === "unpublishAll")).toBe(true)
})
})

View File

@@ -10,7 +10,7 @@ import { disposeAllInstances, tmpdir, tmpdirScoped } from "../fixture/fixture"
import { Config, Effect, Layer, Queue, Schema } from "effect"
import { HttpClient, HttpClientRequest, HttpRouter, HttpServer } from "effect/unstable/http"
import * as Socket from "effect/unstable/socket/Socket"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { Pty } from "../../src/pty"
import { testEffect } from "../lib/effect"
@@ -30,7 +30,7 @@ const testStateLayer = Layer.effectDiscard(
)
const servedRoutes: Layer.Layer<never, Config.ConfigError, HttpServer.HttpServer> = HttpRouter.serve(
ExperimentalHttpApiServer.routes,
HttpApiApp.routes,
{ disableListenLog: true, disableLogger: true },
)

View File

@@ -4,7 +4,7 @@ import { HttpRouter } from "effect/unstable/http"
import { Instance } from "../../src/project/instance"
import { EventPaths } from "../../src/server/routes/instance/httpapi/groups/event"
import { PtyPaths } from "../../src/server/routes/instance/httpapi/groups/pty"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { PtyID } from "../../src/pty/schema"
import { resetDatabase } from "../fixture/db"
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
@@ -14,7 +14,7 @@ void Log.init({ print: false })
function app(input: { password?: string; username?: string }) {
const handler = HttpRouter.toWebHandler(
ExperimentalHttpApiServer.routes.pipe(
HttpApiApp.routes.pipe(
Layer.provide(
ConfigProvider.layer(
ConfigProvider.fromUnknown({
@@ -28,7 +28,7 @@ function app(input: { password?: string; username?: string }) {
).handler
return {
fetch: (request: Request) => handler(request, ExperimentalHttpApiServer.context),
fetch: (request: Request) => handler(request, HttpApiApp.context),
request(input: string | URL | Request, init?: RequestInit) {
return this.fetch(input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init))
},

View File

@@ -10,7 +10,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk/v2"
import { validateSession } from "../../src/cli/cmd/tui/validate-session"
import { InstanceBootstrap } from "../../src/project/bootstrap-service"
import { InstanceStore } from "../../src/project/instance-store"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { Server } from "../../src/server/server"
import { MessageID, PartID, SessionID } from "../../src/session/schema"
import { MessageV2 } from "../../src/session/message-v2"
@@ -53,7 +53,7 @@ function app(serverPath: ServerPath, input?: { password?: string; username?: str
if (serverPath === "default") return Server.Default().app
const handler = HttpRouter.toWebHandler(
ExperimentalHttpApiServer.routes.pipe(
HttpApiApp.routes.pipe(
Layer.provide(
ConfigProvider.layer(
ConfigProvider.fromUnknown({
@@ -66,7 +66,7 @@ function app(serverPath: ServerPath, input?: { password?: string; username?: str
{ disableLogger: true },
).handler
return {
fetch: (request: Request) => handler(request, ExperimentalHttpApiServer.context),
fetch: (request: Request) => handler(request, HttpApiApp.context),
request(input: string | URL | Request, init?: RequestInit) {
return this.fetch(input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init))
},

View File

@@ -3,7 +3,7 @@ import { Context, Effect } from "effect"
import { Flag } from "@opencode-ai/core/flag/flag"
import { Server } from "../../src/server/server"
import { SyncPaths } from "../../src/server/routes/instance/httpapi/groups/sync"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { Session } from "@/session/session"
import * as Log from "@opencode-ai/core/util/log"
import { resetDatabase } from "../fixture/db"
@@ -144,7 +144,7 @@ describe("sync HttpApi", () => {
Effect.gen(function* () {
const tmp = yield* TestInstance
const response = yield* Effect.promise(() =>
ExperimentalHttpApiServer.webHandler().handler(
HttpApiApp.webHandler().handler(
new Request(`http://localhost${SyncPaths.history}`, {
method: "POST",
headers: { "x-opencode-directory": tmp.directory, "content-type": "application/json" },

View File

@@ -16,7 +16,7 @@ import { AppFileSystem } from "@opencode-ai/core/filesystem"
import { RuntimeFlags } from "../../src/effect/runtime-flags"
import { ServerAuth } from "../../src/server/auth"
import { authorizationRouterMiddleware } from "../../src/server/routes/instance/httpapi/middleware/authorization"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { serveEmbeddedUIEffect, serveUIEffect } from "../../src/server/shared/ui"
import { testEffect } from "../lib/effect"
@@ -54,7 +54,7 @@ function restoreEnv(key: string, value: string | undefined) {
function app(input?: { password?: string; username?: string }) {
const handler = HttpRouter.toWebHandler(
ExperimentalHttpApiServer.routes.pipe(
HttpApiApp.routes.pipe(
Layer.provide(
ConfigProvider.layer(
ConfigProvider.fromUnknown({
@@ -72,7 +72,7 @@ function app(input?: { password?: string; username?: string }) {
Promise.resolve(
handler(
input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init),
ExperimentalHttpApiServer.context,
HttpApiApp.context,
),
),
)
@@ -119,7 +119,7 @@ function uiApp(input?: {
Promise.resolve(
handler(
input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init),
ExperimentalHttpApiServer.context,
HttpApiApp.context,
),
),
)
@@ -161,7 +161,7 @@ function routeOrderingApp() {
Promise.resolve(
handler(
input instanceof Request ? input : new Request(new URL(input, "http://localhost"), init),
ExperimentalHttpApiServer.context,
HttpApiApp.context,
),
),
)
@@ -184,7 +184,7 @@ function responseText(response: Response) {
}
describe("HttpApi UI fallback", () => {
it.live("serves the web UI through the experimental backend", () =>
it.live("serves the web UI through the HTTP API app", () =>
Effect.gen(function* () {
let proxiedUrl: string | undefined

View File

@@ -4,7 +4,7 @@ import { HttpRouter } from "effect/unstable/http"
import { Flag } from "@opencode-ai/core/flag/flag"
import { GlobalBus, type GlobalEvent } from "@/bus/global"
import { Worktree } from "@/worktree"
import { ExperimentalHttpApiServer } from "../../src/server/routes/instance/httpapi/server"
import { HttpApiApp } from "../../src/server/routes/instance/httpapi/server"
import { ExperimentalPaths } from "../../src/server/routes/instance/httpapi/groups/experimental"
import { WorkspacePaths } from "../../src/server/routes/instance/httpapi/groups/workspace"
import { resetDatabase } from "../fixture/db"
@@ -36,15 +36,13 @@ type ScopedWorktree = { directory: string; body: CreatedWorktree; ready: Effect.
function serverScoped() {
return Effect.acquireRelease(
Effect.sync(() => HttpRouter.toWebHandler(ExperimentalHttpApiServer.routes, { disableLogger: true })),
Effect.sync(() => HttpRouter.toWebHandler(HttpApiApp.routes, { disableLogger: true })),
(server) => Effect.promise(() => server.dispose()).pipe(Effect.ignore),
)
}
function request(server: TestServer, input: string, init?: RequestInit) {
return Effect.promise(() =>
server.handler(new Request(new URL(input, "http://localhost"), init), ExperimentalHttpApiServer.context),
)
return Effect.promise(() => server.handler(new Request(new URL(input, "http://localhost"), init), HttpApiApp.context))
}
function withRequestTimeout(effect: Effect.Effect<Response>, label: string, ms = 5_000) {