Compare commits

...

6 Commits

Author SHA1 Message Date
Aiden Cline
2d037966f4 add note 2026-03-25 23:10:25 -05:00
Aiden Cline
10089c9d28 tweak: adjust codex plugin to distinguish between the 2 session id occurrences 2026-03-25 20:11:54 -05:00
Aiden Cline
f8232986d6 tweak: adjust plugin api to accept 'small' as an input for chat.headers hook 2026-03-25 17:42:59 -05:00
Aiden Cline
cea7b7e182 DEBUG LOGGING STUFF 2026-03-25 13:53:19 -05:00
Aiden Cline
33ac63f5b8 fix: get websocket to work w/ oauth 2026-03-25 00:34:55 -05:00
Aiden Cline
05346fdc50 wip 2026-03-24 23:01:08 -05:00
9 changed files with 293 additions and 12 deletions

View File

@@ -383,6 +383,7 @@
"vscode-jsonrpc": "8.2.1",
"web-tree-sitter": "0.25.10",
"which": "6.0.1",
"ws": "8.18.0",
"xdg-basedir": "5.1.0",
"yargs": "18.0.0",
"zod": "catalog:",
@@ -410,6 +411,7 @@
"@types/semver": "^7.5.8",
"@types/turndown": "5.0.5",
"@types/which": "3.0.4",
"@types/ws": "^8.18.1",
"@types/yargs": "17.0.33",
"@typescript/native-preview": "catalog:",
"drizzle-kit": "catalog:",

View File

@@ -55,6 +55,7 @@
"@types/semver": "^7.5.8",
"@types/turndown": "5.0.5",
"@types/which": "3.0.4",
"@types/ws": "^8.18.1",
"@types/yargs": "17.0.33",
"@typescript/native-preview": "catalog:",
"drizzle-kit": "catalog:",
@@ -133,9 +134,9 @@
"minimatch": "10.0.3",
"open": "10.1.2",
"opencode-gitlab-auth": "2.0.0",
"opencode-poe-auth": "0.0.1",
"opentui-spinner": "0.0.6",
"partial-json": "0.1.7",
"opencode-poe-auth": "0.0.1",
"remeda": "catalog:",
"semver": "^7.6.3",
"solid-js": "catalog:",
@@ -145,6 +146,7 @@
"ulid": "catalog:",
"vscode-jsonrpc": "8.2.1",
"web-tree-sitter": "0.25.10",
"ws": "8.18.0",
"which": "6.0.1",
"xdg-basedir": "5.1.0",
"yargs": "18.0.0",

View File

@@ -6,7 +6,7 @@ import { createOpencodeClient } from "@opencode-ai/sdk"
import { Server } from "../server/server"
import { BunProc } from "../bun"
import { Flag } from "../flag/flag"
import { CodexAuthPlugin } from "./codex"
import { CodexAuthPlugin } from "./openai/codex"
import { Session } from "../session"
import { NamedError } from "@opencode-ai/util/error"
import { CopilotAuthPlugin } from "./copilot"

View File

@@ -1,11 +1,12 @@
import type { Hooks, PluginInput } from "@opencode-ai/plugin"
import { Log } from "../util/log"
import { Installation } from "../installation"
import { Auth, OAUTH_DUMMY_KEY } from "../auth"
import { Log } from "../../util/log"
import { Installation } from "../../installation"
import { OAUTH_DUMMY_KEY } from "../../auth"
import os from "os"
import { ProviderTransform } from "@/provider/transform"
import { ModelID, ProviderID } from "@/provider/schema"
import { setTimeout as sleep } from "node:timers/promises"
import { createWebSocketFetch } from "./websocket"
const log = Log.create({ service: "plugin.codex" })
@@ -351,12 +352,18 @@ function waitForOAuthCallback(pkce: PkceCodes, state: string): Promise<TokenResp
}
export async function CodexAuthPlugin(input: PluginInput): Promise<Hooks> {
const ws = createWebSocketFetch()
return {
auth: {
provider: "openai",
async loader(getAuth, provider) {
const auth = await getAuth()
if (auth.type !== "oauth") return {}
if (auth.type !== "oauth")
return {
async fetch(requestInput: RequestInfo | URL, init?: RequestInit) {
return ws(requestInput, init)
},
}
// Filter models to only allowed Codex models for OAuth
const allowedModels = new Set([
@@ -491,7 +498,7 @@ export async function CodexAuthPlugin(input: PluginInput): Promise<Hooks> {
? new URL(CODEX_API_ENDPOINT)
: parsed
return fetch(url, {
return ws(url, {
...init,
headers,
})
@@ -622,7 +629,14 @@ export async function CodexAuthPlugin(input: PluginInput): Promise<Hooks> {
if (input.model.providerID !== "openai") return
output.headers.originator = "opencode"
output.headers["User-Agent"] = `opencode/${Installation.VERSION} (${os.platform()} ${os.release()}; ${os.arch()})`
output.headers.session_id = input.sessionID
// TODO: this is kinda hacky, we need to move the transport creation to a later point so it can accept more arguments rather than just relying on whatever is passed to fetch...
// distinguish between title gen and actual chat session
if (!input.small) {
output.headers.session_id = input.sessionID
} else {
output.headers.session_id = input.sessionID + "_title"
}
},
}
}

View File

@@ -0,0 +1,248 @@
import WebSocket from "ws"
import { Log } from "@/util/log"
const log = Log.create({ service: "plugin.openai.websocket" })
/** Options accepted by {@link createWebSocketFetch}. */
export interface CreateWebSocketFetchOptions {
  /**
   * WebSocket endpoint URL.
   * @default 'wss://api.openai.com/v1/responses'
   */
  url?: string
}
/**
 * Creates a `fetch` function that routes OpenAI Responses API streaming
 * requests through a persistent WebSocket connection instead of HTTP.
 *
 * Non-streaming requests and requests to other endpoints are passed
 * through to the standard `fetch`.
 *
 * The connection is created lazily on the first streaming request and
 * reused for subsequent ones, which is the main source of latency
 * savings in multi-step tool-calling workflows.
 *
 * @param options - optional override for the WebSocket endpoint URL.
 * @returns a fetch-compatible function, augmented with a `close()` method
 *   that tears down the cached socket.
 *
 * @example
 * ```ts
 * import { createOpenAI } from '@ai-sdk/openai';
 * import { createWebSocketFetch } from 'ai-sdk-openai-websocket-fetch';
 *
 * const wsFetch = createWebSocketFetch();
 * const openai = createOpenAI({ fetch: wsFetch });
 *
 * const result = streamText({
 *   model: openai('gpt-4.1-mini'),
 *   prompt: 'Hello!',
 *   onFinish: () => wsFetch.close(),
 * });
 * ```
 */
export function createWebSocketFetch(options?: CreateWebSocketFetchOptions) {
  // Connection state shared by every request routed through this fetch:
  // the cached open socket, an in-flight connection attempt, and whether a
  // response is currently being streamed over the cached socket.
  let ws: WebSocket | null = null
  let connecting: Promise<WebSocket> | null = null
  let busy = false

  // Resolve an open socket: reuse the cached idle connection, join an
  // in-flight connection attempt, or dial a fresh socket.
  // NOTE(review): the single `busy` flag means a request issued while
  // another is still streaming dials a brand-new socket and overwrites
  // `ws`/`connecting` (last writer wins) — confirm overlapping streaming
  // requests are not expected through one instance.
  function getConnection(url: string, headers: Record<string, string>): Promise<WebSocket> {
    if (ws?.readyState === WebSocket.OPEN && !busy) {
      log.debug("reusing websocket", { url })
      return Promise.resolve(ws)
    }
    if (connecting && !busy) return connecting
    connecting = new Promise<WebSocket>((resolve, reject) => {
      log.debug("connecting websocket", {
        url,
        headers: Object.keys(headers).sort().join(","),
      })
      const socket = new WebSocket(url, { headers })
      socket.on("open", () => {
        ws = socket
        connecting = null
        log.debug("websocket connected", { url })
        resolve(socket)
      })
      socket.on("error", (err) => {
        log.debug("websocket connect error", {
          url,
          error: err.message,
        })
        // Only reject while still connecting; errors after open are handled
        // by the per-request "error" listener attached in websocketFetch.
        if (connecting) {
          connecting = null
          reject(err)
        }
      })
      socket.on("close", () => {
        log.debug("websocket closed", { url })
        // Drop the cache only if this socket is still the active one.
        if (ws === socket) ws = null
      })
    })
    return connecting
  }

  // fetch-compatible entry point. Streaming POSTs to */responses are
  // relayed over the WebSocket; everything else falls through to fetch.
  async function websocketFetch(input: RequestInfo | URL, init?: RequestInit): Promise<Response> {
    const url = input instanceof URL ? input.toString() : typeof input === "string" ? input : input.url
    // NOTE(review): the method compare is case-sensitive — a lowercase
    // "post" bypasses the websocket path; confirm callers always uppercase.
    if (init?.method !== "POST" || !url.endsWith("/responses")) {
      return globalThis.fetch(input, init)
    }
    let body: Record<string, unknown>
    try {
      // Non-string bodies (Buffer, stream, …) fail this parse and fall
      // through to plain fetch.
      body = JSON.parse(typeof init.body === "string" ? init.body : "")
    } catch {
      return globalThis.fetch(input, init)
    }
    if (!body.stream) {
      return globalThis.fetch(input, init)
    }
    const wsUrl = getWebSocketURL(url, options?.url)
    const headers = getWebSocketHeaders(init.headers)
    log.debug("intercepting responses request", {
      url,
      wsUrl,
      stream: true,
    })
    const connection = await getConnection(wsUrl, headers)
    busy = true
    // The websocket transport has no `stream` flag; strip it from the body.
    const { stream: _, ...requestBody } = body
    const encoder = new TextEncoder()
    // Re-encode websocket frames as an SSE byte stream so the caller's
    // regular event-stream parsing keeps working unchanged.
    const responseStream = new ReadableStream<Uint8Array>({
      start(controller) {
        // Detach the per-request listeners and free the socket for reuse.
        function cleanup() {
          connection.off("message", onMessage)
          connection.off("error", onError)
          connection.off("close", onClose)
          busy = false
        }
        function onMessage(data: WebSocket.RawData) {
          const text = data.toString()
          log.debug("websocket event", { event: pretty(text), url: wsUrl })
          controller.enqueue(encoder.encode(`data: ${text}\n\n`))
          try {
            const event = JSON.parse(text)
            // Terminal events end the SSE stream with the [DONE] sentinel.
            if (event.type === "response.completed" || event.type === "error") {
              controller.enqueue(encoder.encode("data: [DONE]\n\n"))
              cleanup()
              controller.close()
            }
          } catch {
            // non-JSON frame, continue
          }
        }
        function onError(err: Error) {
          log.debug("websocket stream error", { url: wsUrl, error: err.message })
          cleanup()
          controller.error(err)
        }
        function onClose() {
          log.debug("websocket stream close", { url: wsUrl })
          cleanup()
          try {
            controller.close()
          } catch {
            // already closed
          }
        }
        connection.on("message", onMessage)
        connection.on("error", onError)
        connection.on("close", onClose)
        // Propagate caller-side aborts into the stream.
        if (init?.signal) {
          if (init.signal.aborted) {
            cleanup()
            controller.error(init.signal.reason ?? new DOMException("Aborted", "AbortError"))
            return
          }
          init.signal.addEventListener(
            "abort",
            () => {
              cleanup()
              try {
                controller.error(init!.signal!.reason ?? new DOMException("Aborted", "AbortError"))
              } catch {
                // already closed
              }
            },
            { once: true },
          )
        }
        connection.send(JSON.stringify({ type: "response.create", ...requestBody }))
      },
    })
    return new Response(responseStream, {
      status: 200,
      headers: { "content-type": "text/event-stream" },
    })
  }

  return Object.assign(websocketFetch, {
    /** Close the underlying WebSocket connection. */
    close() {
      if (ws) {
        ws.close()
        ws = null
      }
    },
  })
}
/**
 * Flattens any `HeadersInit` shape (Headers, entry pairs, or plain record)
 * into a plain object with lowercased keys. Nullish values are dropped.
 */
function normalizeHeaders(headers: HeadersInit | undefined): Record<string, string> {
  const out: Record<string, string> = {}
  if (headers == null) return out
  if (headers instanceof Headers) {
    for (const [key, value] of headers.entries()) {
      out[key.toLowerCase()] = value
    }
  } else if (Array.isArray(headers)) {
    headers.forEach(([key, value]) => {
      out[key.toLowerCase()] = value
    })
  } else {
    for (const key of Object.keys(headers)) {
      const value = (headers as Record<string, string>)[key]
      if (value != null) out[key.toLowerCase()] = value
    }
  }
  return out
}
/**
 * Builds the handshake headers for the WebSocket connection: lowercased,
 * without `content-length` (no HTTP body is sent), and with the
 * `openai-beta` opt-in header defaulted if the caller did not set one.
 */
function getWebSocketHeaders(headers: HeadersInit | undefined) {
  const { "content-length": _dropped, ...handshake } = normalizeHeaders(headers)
  if (handshake["openai-beta"] == null) {
    handshake["openai-beta"] = "responses_websockets=2026-02-06"
  }
  return handshake
}
/**
 * Derives the WebSocket endpoint for an HTTP(S) request URL by swapping the
 * scheme (`https:` → `wss:`, anything else → `ws:`). An explicit `fallback`
 * endpoint, when provided, wins outright.
 */
function getWebSocketURL(url: string, fallback?: string) {
  if (fallback) return fallback
  const target = new URL(url)
  if (target.protocol === "https:") {
    target.protocol = "wss:"
  } else {
    target.protocol = "ws:"
  }
  return target.toString()
}
/**
 * Pretty-prints a JSON payload for debug logging (2-space indent); input
 * that is not valid JSON is returned untouched.
 */
function pretty(text: string) {
  let parsed: unknown
  try {
    parsed = JSON.parse(text)
  } catch {
    return text
  }
  return JSON.stringify(parsed, null, 2)
}

View File

@@ -155,6 +155,7 @@ export namespace LLM {
model: input.model,
provider,
message: input.user,
small: input.small === true,
},
{
headers: {},

View File

@@ -22,6 +22,10 @@ export namespace Log {
return levelPriority[input] >= levelPriority[level]
}
function shouldServiceLog(service: unknown) {
return service === "plugin.openai.websocket"
}
export type Logger = {
debug(message?: any, extra?: Record<string, any>): void
info(message?: any, extra?: Record<string, any>): void
@@ -62,8 +66,10 @@ export namespace Log {
cleanup(Global.Path.log)
if (options.print) return
logpath = path.join(
Global.Path.log,
options.dev ? "dev.log" : new Date().toISOString().split(".")[0].replace(/:/g, "") + ".log",
// TODO: STOP DOING THIS!!!!!
"dev.log",
// Global.Path.log,
// options.dev ? "dev.log" : new Date().toISOString().split(".")[0].replace(/:/g, "") + ".log",
)
await fs.truncate(logpath).catch(() => {})
const stream = createWriteStream(logpath, { flags: "a" })
@@ -109,6 +115,7 @@ export namespace Log {
}
function build(message: any, extra?: Record<string, any>) {
if (!shouldServiceLog(tags?.["service"])) return
const prefix = Object.entries({
...tags,
...extra,

View File

@@ -4,7 +4,7 @@ import {
extractAccountIdFromClaims,
extractAccountId,
type IdTokenClaims,
} from "../../src/plugin/codex"
} from "../../src/plugin/openai/codex"
function createTestJwt(payload: object): string {
const header = Buffer.from(JSON.stringify({ alg: "none" })).toString("base64url")

View File

@@ -187,7 +187,14 @@ export interface Hooks {
output: { temperature: number; topP: number; topK: number; options: Record<string, any> },
) => Promise<void>
"chat.headers"?: (
input: { sessionID: string; agent: string; model: Model; provider: ProviderContext; message: UserMessage },
input: {
sessionID: string
agent: string
model: Model
provider: ProviderContext
message: UserMessage
small: boolean
},
output: { headers: Record<string, string> },
) => Promise<void>
"permission.ask"?: (input: Permission, output: { status: "ask" | "deny" | "allow" }) => Promise<void>