fix(httpapi): preserve event stream context (#27425)

Co-authored-by: Aiden Cline <aidenpcline@gmail.com>
Co-authored-by: James Long <longster@gmail.com>
This commit is contained in:
Kit Langton
2026-05-13 22:02:30 -04:00
committed by GitHub
parent 04286d0415
commit b928a1fff9
2 changed files with 102 additions and 37 deletions

View File

@@ -40,30 +40,37 @@ function eventData(data: unknown): Sse.Event {
}
function eventResponse(bus: Bus.Interface) {
const events = bus.subscribeAll().pipe(Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type))
const heartbeat = Stream.tick("10 seconds").pipe(
Stream.drop(1),
Stream.map(() => ({ id: Bus.createID(), type: "server.heartbeat", properties: {} })),
)
return Effect.gen(function* () {
const context = yield* Effect.context()
log.info("event connected")
return HttpServerResponse.stream(
Stream.make({ id: Bus.createID(), type: "server.connected", properties: {} }).pipe(
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
Stream.map(eventData),
Stream.pipeThroughChannel(Sse.encode()),
Stream.encodeText,
Stream.ensuring(Effect.sync(() => log.info("event disconnected"))),
),
{
contentType: "text/event-stream",
headers: {
"Cache-Control": "no-cache, no-transform",
"X-Accel-Buffering": "no",
"X-Content-Type-Options": "nosniff",
const events = bus.subscribeAll().pipe(
Stream.provideContext(context),
Stream.takeUntil((event) => event.type === Bus.InstanceDisposed.type),
)
const heartbeat = Stream.tick("10 seconds").pipe(
Stream.drop(1),
Stream.map(() => ({ id: Bus.createID(), type: "server.heartbeat", properties: {} })),
)
log.info("event connected")
return HttpServerResponse.stream(
Stream.make({ id: Bus.createID(), type: "server.connected", properties: {} }).pipe(
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
Stream.map(eventData),
Stream.pipeThroughChannel(Sse.encode()),
Stream.encodeText,
Stream.ensuring(Effect.sync(() => log.info("event disconnected"))),
),
{
contentType: "text/event-stream",
headers: {
"Cache-Control": "no-cache, no-transform",
"X-Accel-Buffering": "no",
"X-Content-Type-Options": "nosniff",
},
},
},
)
)
})
}
export const eventHandlers = HttpApiBuilder.group(EventApi, "event", (handlers) =>
@@ -72,7 +79,7 @@ export const eventHandlers = HttpApiBuilder.group(EventApi, "event", (handlers)
return handlers.handleRaw(
"subscribe",
Effect.fn("EventHttpApi.subscribe")(function* () {
return eventResponse(bus)
return yield* eventResponse(bus)
}),
)
}),

View File

@@ -1,10 +1,13 @@
import { afterEach, describe, expect, test } from "bun:test"
import { Bus } from "../../src/bus"
import { Instance } from "../../src/project/instance"
import { Server } from "../../src/server/server"
import { EventPaths } from "../../src/server/routes/instance/httpapi/event"
import { Event as ServerEvent } from "../../src/server/event"
import * as Log from "@opencode-ai/core/util/log"
import { Schema } from "effect"
import { resetDatabase } from "../fixture/db"
import { disposeAllInstances, tmpdir } from "../fixture/fixture"
import { disposeAllInstances, reloadTestInstance, tmpdir } from "../fixture/fixture"
void Log.init({ print: false })
@@ -12,25 +15,42 @@ function app() {
return Server.Default().app
}
async function readFirstChunk(response: Response) {
if (!response.body) throw new Error("missing response body")
const reader = response.body.getReader()
const result = await Promise.race([
reader.read(),
new Promise<never>((_, reject) => setTimeout(() => reject(new Error("timed out waiting for event")), 5_000)),
])
await reader.cancel()
return new TextDecoder().decode(result.value)
const EventData = Schema.Struct({
id: Schema.optional(Schema.String),
type: Schema.String,
properties: Schema.Record(Schema.String, Schema.Any),
})
async function readChunk(reader: ReadableStreamDefaultReader<Uint8Array>) {
let timeout: ReturnType<typeof setTimeout> | undefined
try {
return await Promise.race([
reader.read(),
new Promise<never>((_, reject) => {
timeout = setTimeout(() => reject(new Error("timed out waiting for event")), 5_000)
}),
])
} finally {
if (timeout) clearTimeout(timeout)
}
}
async function readFirstEvent(response: Response) {
return JSON.parse((await readFirstChunk(response)).replace(/^data: /, "")) as {
id?: string
type: string
properties: Record<string, unknown>
if (!response.body) throw new Error("missing response body")
const reader = response.body.getReader()
try {
return await readEvent(reader)
} finally {
await reader.cancel()
}
}
async function readEvent(reader: ReadableStreamDefaultReader<Uint8Array>) {
const result = await readChunk(reader)
if (result.done || !result.value) throw new Error("event stream closed")
return Schema.decodeUnknownSync(EventData)(JSON.parse(new TextDecoder().decode(result.value).replace(/^data: /, "")))
}
afterEach(async () => {
await disposeAllInstances()
await resetDatabase()
@@ -56,4 +76,42 @@ describe("event HttpApi", () => {
expect(await readFirstEvent(response)).toMatchObject({ type: "server.connected", properties: {} })
})
test("keeps the event stream open after the initial event", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
if (!response.body) throw new Error("missing response body")
const reader = response.body.getReader()
try {
expect(await readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} })
const next = await Promise.race([
reader.read().then((result) => (result.done ? "closed" : "event")),
new Promise<"open">((resolve) => setTimeout(() => resolve("open"), 250)),
])
expect(next).toBe("open")
} finally {
await reader.cancel()
}
})
test("delivers instance bus events after the initial event", async () => {
await using tmp = await tmpdir({ git: true, config: { formatter: false, lsp: false } })
const response = await app().request(EventPaths.event, { headers: { "x-opencode-directory": tmp.path } })
if (!response.body) throw new Error("missing response body")
const reader = response.body.getReader()
try {
expect(await readEvent(reader)).toMatchObject({ type: "server.connected", properties: {} })
const next = readEvent(reader)
const ctx = await reloadTestInstance({ directory: tmp.path })
await Instance.restore(ctx, () => Bus.publish(ServerEvent.Connected, {}))
expect(await next).toMatchObject({ type: "server.connected", properties: {} })
} finally {
await reader.cancel()
}
})
})