Compare commits

...

1 Commits

Author SHA1 Message Date
James Long
66ab7332de fix(core): switch /event route to effect and implement queue 2026-03-18 21:25:13 -04:00
2 changed files with 93 additions and 61 deletions

View File

@@ -0,0 +1,91 @@
import { Hono } from "hono"
import { describeRoute, resolver } from "hono-openapi"
import { streamSSE } from "hono/streaming"
import { Effect, Queue, Stream } from "effect"
import { Log } from "@/util/log"
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { lazy } from "../../util/lazy"
const log = Log.create({ service: "server" })
export const EventRoutes = lazy(() =>
new Hono().get(
"/event",
describeRoute({
summary: "Subscribe to events",
description: "Get events",
operationId: "event.subscribe",
responses: {
200: {
description: "Event stream",
content: {
"text/event-stream": {
schema: resolver(BusEvent.payloads()),
},
},
},
},
}),
async (c) => {
log.info("event connected")
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
await Effect.runPromise(
Stream.callback<string>((q) =>
Effect.acquireRelease(
Effect.sync(() => {
stream.onAbort(() => {
Queue.endUnsafe(q)
})
Queue.offerUnsafe(
q,
JSON.stringify({
type: "server.connected",
properties: {},
}),
)
const unsub = Bus.subscribeAll((event) => {
Queue.offerUnsafe(q, JSON.stringify(event))
if (event.type === Bus.InstanceDisposed.type) {
Queue.endUnsafe(q)
}
})
// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
Queue.offerUnsafe(
q,
JSON.stringify({
type: "server.heartbeat",
properties: {},
}),
)
}, 10_000)
return { heartbeat, unsub }
}),
(x) =>
Effect.sync(() => {
clearInterval(x.heartbeat)
x.unsub()
Queue.endUnsafe(q)
log.info("event disconnected")
}),
),
).pipe(
Stream.runForEach((data) =>
Effect.tryPromise({
try: () => stream.writeSSE({ data }),
catch: () => {},
}),
),
),
)
})
},
),
)

View File

@@ -1,10 +1,7 @@
import { BusEvent } from "@/bus/bus-event"
import { Bus } from "@/bus"
import { Log } from "../util/log"
import { describeRoute, generateSpecs, validator, resolver, openAPIRouteHandler } from "hono-openapi"
import { Hono } from "hono"
import { cors } from "hono/cors"
import { streamSSE } from "hono/streaming"
import { proxy } from "hono/proxy"
import { basicAuth } from "hono/basic-auth"
import z from "zod"
@@ -34,6 +31,7 @@ import { FileRoutes } from "./routes/file"
import { ConfigRoutes } from "./routes/config"
import { ExperimentalRoutes } from "./routes/experimental"
import { ProviderRoutes } from "./routes/provider"
import { EventRoutes } from "./routes/event"
import { InstanceBootstrap } from "../project/bootstrap"
import { NotFoundError } from "../storage/db"
import type { ContentfulStatusCode } from "hono/utils/http-status"
@@ -251,6 +249,7 @@ export namespace Server {
.route("/question", QuestionRoutes())
.route("/provider", ProviderRoutes())
.route("/", FileRoutes())
.route("/", EventRoutes())
.route("/mcp", McpRoutes())
.route("/tui", TuiRoutes())
.post(
@@ -498,64 +497,6 @@ export namespace Server {
return c.json(await Format.status())
},
)
.get(
"/event",
describeRoute({
summary: "Subscribe to events",
description: "Get events",
operationId: "event.subscribe",
responses: {
200: {
description: "Event stream",
content: {
"text/event-stream": {
schema: resolver(BusEvent.payloads()),
},
},
},
},
}),
async (c) => {
log.info("event connected")
c.header("X-Accel-Buffering", "no")
c.header("X-Content-Type-Options", "nosniff")
return streamSSE(c, async (stream) => {
stream.writeSSE({
data: JSON.stringify({
type: "server.connected",
properties: {},
}),
})
const unsub = Bus.subscribeAll(async (event) => {
await stream.writeSSE({
data: JSON.stringify(event),
})
if (event.type === Bus.InstanceDisposed.type) {
stream.close()
}
})
// Send heartbeat every 10s to prevent stalled proxy streams.
const heartbeat = setInterval(() => {
stream.writeSSE({
data: JSON.stringify({
type: "server.heartbeat",
properties: {},
}),
})
}, 10_000)
await new Promise<void>((resolve) => {
stream.onAbort(() => {
clearInterval(heartbeat)
unsub()
resolve()
log.info("event disconnected")
})
})
})
},
)
.all("/*", async (c) => {
const path = c.req.path