Compare commits

...

5 Commits

Author SHA1 Message Date
Kit Langton
e4fbf5c657 fix(mcp): use Effect.timeoutOrElse for transport connect timeout
Replace withTimeout (Promise.race) with Effect.timeoutOrElse so that
fiber interruption properly triggers acquireUseRelease cleanup. The
Promise.race approach left the underlying connect promise dangling,
which prevented the release function from running on some platforms.
2026-03-26 11:19:10 -04:00
Kit Langton
3439521ef0 effectify MCP service: convert remaining Promise code to idiomatic Effect
- fetchFromClient: async function -> Effect.tryPromise + Effect.map
- collectFromConnected: compose directly with Effect fetchFromClient
- closeClient: Effect.promise(.catch) -> Effect.tryPromise + Effect.ignore
- Bus.publish calls: .catch() -> busPublish helper with Effect.tryPromise + Effect.ignore
- startAuth: Effect.promise(async try/catch) -> Effect.tryPromise + Effect.catch
- finishAuth: simplify inner async/await
- create(): eliminate mutable let vars, extract connectRemote/connectLocal
- Extract sanitize helper, CreateResult interface, busPublish helper
- watch callback: use busPublish via Effect.runPromise
- getConfig helper function for repeated Config.get() pattern
2026-03-26 10:39:33 -04:00
Kit Langton
7965300ce5 refactor(mcp): convert create/defs/tryConnect to Effect-native patterns
Replace the async tryConnect helper with connectTransport using
Effect.acquireUseRelease for guaranteed transport cleanup on failure.
Convert create() from async function to Effect.gen and defs() to
return an Effect. Callers now yield* directly instead of wrapping
in Effect.promise.
2026-03-26 10:39:33 -04:00
Kit Langton
ebdecf2ec7 fix(mcp): close transport on failed/timed-out connections
When withTimeout rejects during MCP connect, the transport (and its
child process for stdio servers) was never closed. Extract a tryConnect
helper that ensures transport.close() is always called on failure,
eliminating process/connection leaks in all connect paths.

Fixes #19168
2026-03-26 10:39:33 -04:00
Kit Langton
4342ef6c97 review test 2026-03-26 10:39:32 -04:00
9 changed files with 1253 additions and 229 deletions

View File

@@ -0,0 +1,65 @@
---
description: Review pull requests for correctness bugs and regressions
mode: primary
model: opencode/gpt-5.4
reasoningEffort: high
textVerbosity: low
temperature: 0.1
tools:
write: false
edit: false
bash: false
webfetch: false
task: false
todowrite: false
---
You are a pull request reviewer focused on correctness.
Start by reading `.opencode-review/pr.json`, `.opencode-review/files.json`, and
`.opencode-review/diff.patch`.
You have read access to the full repository. Use that access only for targeted
follow-up on changed files: direct callees, direct callers, touched tests,
related types, or helpers needed to confirm a concrete bug.
Review strategy:
1. Start with changed hunks.
2. Read the full changed file only when a hunk needs more context.
3. Expand to other files only when they are directly relevant to a suspected
bug.
4. Stop once you have enough evidence to either report the issue or discard it.
Avoid broad repo exploration. Do not read unrelated files just to learn the
architecture. Prefer depth on a few relevant files over breadth across many
files.
Report only concrete issues with a plausible failure mode. Ignore formatting,
micro-optimizations, and weak style opinions.
Do not report more than 5 findings.
Return only JSON. The response must be an array of objects with this exact
shape:
```json
[
{
"category": "correctness",
"severity": "must-fix",
"confidence": "high",
"file": "path/to/file.ts",
"line": 12,
"summary": "Short one-line bug summary",
"evidence": "Why this is a real issue in the current code",
"suggestion": "Optional fix direction",
"introduced": true
}
]
```
Severity must be one of `must-fix`, `should-fix`, or `suggestion`.
Confidence must be one of `high`, `medium`, or `low`.
If there are no issues, return `[]`.

View File

@@ -0,0 +1,64 @@
---
description: Review pull requests for high-signal maintainability issues
mode: primary
model: opencode/gpt-5.4
reasoningEffort: high
textVerbosity: low
temperature: 0.1
tools:
write: false
edit: false
bash: false
webfetch: false
task: false
todowrite: false
---
You are a pull request reviewer focused on maintainability.
Start by reading `.opencode-review/pr.json`, `.opencode-review/files.json`, and
`.opencode-review/diff.patch`.
Use repository guidance from `AGENTS.md` and `REVIEW.md` when present. Be
strict about real repo conventions, but do not nitpick personal taste.
Review strategy:
1. Start with changed hunks.
2. Read the full changed file when needed.
3. Expand to nearby helpers, tests, or conventions only when the diff suggests
a real maintainability problem.
4. Stop when you have enough evidence.
Avoid repo-wide convention hunts. Do not search broadly for every possible
style rule.
Only report issues that create meaningful maintenance cost, hide bugs, or break
clear project conventions. Ignore harmless formatting or one-off stylistic
differences.
Do not report more than 5 findings.
Return only JSON. The response must be an array of objects with this exact
shape:
```json
[
{
"category": "maintainability",
"severity": "should-fix",
"confidence": "high",
"file": "path/to/file.ts",
"line": 12,
"summary": "Short one-line maintainability issue summary",
"evidence": "Why this matters in this codebase",
"suggestion": "Optional fix direction",
"introduced": true
}
]
```
Severity must be one of `must-fix`, `should-fix`, or `suggestion`.
Confidence must be one of `high`, `medium`, or `low`.
If there are no issues, return `[]`.

View File

@@ -0,0 +1,63 @@
---
description: Review pull requests for security issues and unsafe changes
mode: primary
model: opencode/gpt-5.4
reasoningEffort: high
textVerbosity: low
temperature: 0.1
tools:
write: false
edit: false
bash: false
webfetch: false
task: false
todowrite: false
---
You are a pull request reviewer focused on security.
Start by reading `.opencode-review/pr.json`, `.opencode-review/files.json`, and
`.opencode-review/diff.patch`.
You have read access to the full repository. Inspect related code only when it
is directly connected to changed code, especially auth, validation,
persistence, secrets handling, logging, and data exposure paths.
Review strategy:
1. Start with changed hunks.
2. Read the full changed file only when needed.
3. Expand only to directly connected validation, auth, storage, or transport
code.
4. Stop once you can prove or reject the issue.
Avoid broad repo sweeps or generic checklist-driven exploration.
Only report concrete issues introduced or exposed by this pull request. Ignore
generic OWASP checklists unless the code actually shows the problem.
Do not report more than 5 findings.
Return only JSON. The response must be an array of objects with this exact
shape:
```json
[
{
"category": "security",
"severity": "must-fix",
"confidence": "high",
"file": "path/to/file.ts",
"line": 12,
"summary": "Short one-line security issue summary",
"evidence": "Why this is a real issue in the current code",
"suggestion": "Optional fix direction",
"introduced": true
}
]
```
Severity must be one of `must-fix`, `should-fix`, or `suggestion`.
Confidence must be one of `high`, `medium`, or `low`.
If there are no issues, return `[]`.

View File

@@ -0,0 +1,56 @@
---
description: Verify pull request review findings and remove weak claims
mode: primary
model: opencode/gpt-5.4
reasoningEffort: high
textVerbosity: low
temperature: 0.1
tools:
write: false
edit: false
bash: false
webfetch: false
task: false
todowrite: false
---
You are a verification pass for pull request review findings.
Start by reading `.opencode-review/pr.json`, `.opencode-review/files.json`,
`.opencode-review/diff.patch`, and `.opencode-review/candidates.json`.
For each candidate, inspect the cited code and reject anything that is:
- vague or speculative
- duplicated by a stronger finding
- unsupported by the current code
- not meaningfully attributable to this pull request
- a harmless style preference
Keep only findings with concrete evidence and an actionable explanation.
Prefer reading the cited file and directly related context only. Do not do a
broad repo search unless a candidate specifically depends on another file.
Return no more than 8 findings.
Return only JSON. The response must be an array of objects with this exact
shape:
```json
[
{
"category": "correctness",
"severity": "must-fix",
"confidence": "high",
"file": "path/to/file.ts",
"line": 12,
"summary": "Short one-line issue summary",
"evidence": "Why this survived verification",
"suggestion": "Optional fix direction",
"introduced": true
}
]
```
If there are no verified issues, return `[]`.

21
REVIEW.md Normal file
View File

@@ -0,0 +1,21 @@
# Review Guidelines
## Prioritize
- correctness bugs, regressions, and unsafe edge cases
- security issues with concrete impact
- maintainability problems that clearly violate repo conventions
## Flag
- unnecessary `any` in new code when a precise type is practical
- deep nesting when early returns would make the flow clearer
- duplicated logic that should obviously reuse existing helpers
- new routes, migrations, or persistence changes that look untested or unsafe
## Skip
- harmless formatting differences
- stylistic nits without clear repo guidance
- optional micro-optimizations without user impact
- pre-existing issues unrelated to the pull request

View File

@@ -0,0 +1,664 @@
#!/usr/bin/env bun
import { NodeFileSystem, NodePath } from "@effect/platform-node"
import * as CrossSpawnSpawner from "../src/effect/cross-spawn-spawner"
import { makeRuntime } from "../src/effect/run-service"
import path from "path"
import { Duration, Effect, Fiber, FileSystem, Layer, Schema, Schedule, ServiceMap, Stream } from "effect"
import type { PlatformError } from "effect/PlatformError"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
const Category = Schema.Union([
Schema.Literal("correctness"),
Schema.Literal("security"),
Schema.Literal("maintainability"),
])
const Severity = Schema.Union([Schema.Literal("must-fix"), Schema.Literal("should-fix"), Schema.Literal("suggestion")])
const Confidence = Schema.Union([Schema.Literal("high"), Schema.Literal("medium"), Schema.Literal("low")])
class Base extends Schema.Class<Base>("ReviewBase")({
ref: Schema.String,
}) {}
class Head extends Schema.Class<Head>("ReviewHead")({
sha: Schema.String,
ref: Schema.String,
}) {}
class Pull extends Schema.Class<Pull>("ReviewPull")({
number: Schema.Number,
title: Schema.String,
body: Schema.NullOr(Schema.String),
head: Head,
base: Base,
}) {}
class PullFile extends Schema.Class<PullFile>("ReviewPullFile")({
filename: Schema.String,
status: Schema.String,
patch: Schema.optional(Schema.String),
}) {}
class PullContext extends Schema.Class<PullContext>("ReviewPullContext")({
repo: Schema.String,
mergeBase: Schema.String,
pull: Pull,
}) {}
class Finding extends Schema.Class<Finding>("ReviewFinding")({
category: Category,
severity: Severity,
confidence: Confidence,
file: Schema.String,
line: Schema.Number,
summary: Schema.String,
evidence: Schema.String,
suggestion: Schema.String,
introduced: Schema.Boolean,
}) {}
class ReviewError extends Schema.TaggedErrorClass<ReviewError>()("ReviewError", {
message: Schema.String,
cause: Schema.optional(Schema.Defect),
}) {}
const PullFiles = Schema.Array(PullFile)
const Findings = Schema.Array(Finding)
const decodePullJson = Schema.decodeSync(Schema.fromJsonString(Pull))
const decodePullFilesJson = Schema.decodeSync(Schema.fromJsonString(PullFiles))
const decodeFindingsJson = Schema.decodeSync(Schema.fromJsonString(Findings))
const encodePullContext = Schema.encodeSync(Schema.fromJsonString(PullContext))
const encodePullFiles = Schema.encodeSync(Schema.fromJsonString(PullFiles))
const encodeFindings = Schema.encodeSync(Schema.fromJsonString(Findings))
const args = parse(process.argv.slice(2))
export namespace Review {
export interface Interface {
readonly run: (input: {
repo: string
pr: number
post: boolean
}) => Effect.Effect<void, ReviewError | PlatformError>
}
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Review") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const fs = yield* FileSystem.FileSystem
const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
const root = process.cwd()
const bin = process.env.OPENCODE_BIN ?? "opencode"
const note = (text: string) => Effect.sync(() => console.error(`[review] ${text}`))
const fail = (message: string) => (cause: unknown) =>
new ReviewError({
message,
cause,
})
const cmd = Effect.fn("Review.cmd")(function* (file: string, argv: string[], cwd: string) {
const handle = yield* spawner.spawn(
ChildProcess.make(file, argv, {
cwd,
extendEnv: true,
stdin: "ignore",
stdout: "pipe",
stderr: "pipe",
}),
)
const [stdout, stderr, code] = yield* Effect.all(
[
Stream.mkString(Stream.decodeText(handle.stdout)),
Stream.mkString(Stream.decodeText(handle.stderr)),
handle.exitCode,
],
{ concurrency: 3 },
)
if (code !== ChildProcessSpawner.ExitCode(0)) {
return yield* new ReviewError({
message: `${file} ${argv.join(" ")} failed`,
cause: new Error(stderr.trim() || stdout.trim() || `exit=${code}`),
})
}
return stdout
}, Effect.scoped)
const pull = (text: string) =>
Effect.try({
try: () => decodePullJson(text),
catch: fail("pull decode failed"),
})
const files = (text: string) =>
Effect.try({
try: () => decodePullFilesJson(text),
catch: fail("pull files decode failed"),
})
const findings = (text: string) =>
Effect.try({
try: () => decodeFindingsJson(text),
catch: fail("findings decode failed"),
})
const gh = Effect.fn("Review.gh")(function* (argv: string[]) {
return yield* cmd("gh", argv, root)
})
const git = Effect.fn("Review.git")(function* (argv: string[], cwd: string) {
return yield* cmd("git", argv, cwd)
})
const sync = Effect.fn("Review.sync")(function* (dir: string, box: string) {
const src = path.join(root, ".opencode", "agents")
const dst = path.join(dir, ".opencode", "agents")
yield* fs.makeDirectory(dst, { recursive: true }).pipe(Effect.mapError(fail("create agents dir failed")))
for (const name of [
"review-correctness.md",
"review-security.md",
"review-maintainability.md",
"review-verify.md",
]) {
const text = yield* fs.readFileString(path.join(src, name)).pipe(Effect.mapError(fail(`read ${name} failed`)))
yield* fs.writeFileString(path.join(dst, name), text).pipe(Effect.mapError(fail(`write ${name} failed`)))
}
const review = yield* fs
.readFileString(path.join(root, "REVIEW.md"))
.pipe(Effect.mapError(fail("read REVIEW.md failed")))
yield* fs
.writeFileString(path.join(box, "REVIEW.md"), review)
.pipe(Effect.mapError(fail("write REVIEW.md failed")))
})
const parseText = Effect.fn("Review.parseText")(function* (text: string) {
const body = text.trim()
if (!body) return yield* new ReviewError({ message: "review agent returned no text" })
const clean = strip(body)
try {
return decodeFindingsJson(clean)
} catch {}
const start = clean.indexOf("[")
const end = clean.lastIndexOf("]")
if (start !== -1 && end > start) {
return yield* findings(clean.slice(start, end + 1))
}
return yield* new ReviewError({ message: `could not parse findings JSON\n\n${clean}` })
})
const talk = Effect.fn("Review.talk")(function* (agent: string, prompt: string, cwd: string) {
const out: string[] = []
const err: string[] = []
const handle = yield* spawner.spawn(
ChildProcess.make(bin, ["run", "--agent", agent, "--format", "json", prompt], {
cwd,
extendEnv: true,
stdin: "ignore",
stdout: "pipe",
stderr: "pipe",
}),
)
const [, , code] = yield* Effect.all(
[
handle.stdout.pipe(
Stream.decodeText(),
Stream.splitLines,
Stream.runForEach((line) =>
Effect.sync(() => {
out.push(line)
trace(agent, line)
}),
),
),
handle.stderr.pipe(
Stream.decodeText(),
Stream.splitLines,
Stream.runForEach((line) =>
Effect.sync(() => {
err.push(line)
if (line.trim()) console.error(`[review:${agent}] ${line}`)
}),
),
),
handle.exitCode,
],
{ concurrency: 3 },
)
if (code !== ChildProcessSpawner.ExitCode(0)) {
return yield* new ReviewError({
message: `${agent} failed`,
cause: new Error(err.join("\n").trim() || out.join("\n").trim() || `exit=${code}`),
})
}
return out.join("\n")
}, Effect.scoped)
const pass = Effect.fn("Review.pass")(function* (agent: string, prompt: string, cwd: string) {
yield* note(`${agent} tools: read/glob/grep/list allowed; write/edit/bash denied`)
const raw = yield* talk(agent, prompt, cwd)
return yield* parseText(collect(raw))
})
const job = Effect.fn("Review.job")(function* (
name: string,
fx: Effect.Effect<readonly Finding[], ReviewError | PlatformError>,
) {
yield* note(`${name} started`)
const beat = yield* note(`${name} still running`).pipe(
Effect.repeat(Schedule.spaced(Duration.seconds(15))),
Effect.delay(Duration.seconds(15)),
Effect.forkScoped,
)
const out = yield* fx.pipe(
Effect.timeout(Duration.minutes(10)),
Effect.catchTag("TimeoutError", () =>
Effect.fail(new ReviewError({ message: `${name} timed out after 600s` })),
),
Effect.ensuring(Fiber.interrupt(beat)),
)
yield* note(`${name} finished (${out.length} findings)`)
return out
}, Effect.scoped)
const safe = (name: string, fx: Effect.Effect<readonly Finding[], ReviewError | PlatformError>) =>
fx.pipe(Effect.catch((err) => note(`pass failed: ${name}: ${err.message}`).pipe(Effect.as([] as const))))
const inline = Effect.fn("Review.inline")(function* (repo: string, pr: number, sha: string, item: Finding) {
yield* gh([
"api",
"--method",
"POST",
"-H",
"Accept: application/vnd.github+json",
"-H",
"X-GitHub-Api-Version: 2022-11-28",
`/repos/${repo}/pulls/${pr}/comments`,
"-f",
`body=${body(item)}`,
"-f",
`commit_id=${sha}`,
"-f",
`path=${item.file}`,
"-F",
`line=${Math.trunc(item.line)}`,
"-f",
"side=RIGHT",
])
})
const top = Effect.fn("Review.top")(function* (repo: string, pr: number, text: string) {
yield* gh(["pr", "comment", String(pr), "--repo", repo, "--body", text])
})
const run = Effect.fn("Review.run")(function* (input: { repo: string; pr: number; post: boolean }) {
yield* note(`loading PR #${input.pr}`)
const data = yield* gh(["api", `/repos/${input.repo}/pulls/${input.pr}`]).pipe(Effect.flatMap(pull))
const list = yield* gh(["api", `/repos/${input.repo}/pulls/${input.pr}/files?per_page=100`]).pipe(
Effect.flatMap(files),
)
const tmp = yield* fs
.makeTempDirectoryScoped({ prefix: "opencode-review-" })
.pipe(Effect.mapError(fail("create temp dir failed")))
const dir = path.join(tmp, `pr-${input.pr}`)
yield* note("preparing worktree")
yield* git(
["fetch", "origin", data.base.ref, `refs/pull/${input.pr}/head:refs/remotes/origin/pr-${input.pr}`],
root,
)
yield* Effect.acquireRelease(
git(["worktree", "add", "--detach", dir, `refs/remotes/origin/pr-${input.pr}`], root),
() => git(["worktree", "remove", "--force", dir], root).pipe(Effect.catch(() => Effect.void)),
)
const base = (yield* git(["merge-base", `origin/${data.base.ref}`, "HEAD"], dir)).trim()
const diff = yield* git(["diff", "--unified=3", `${base}...HEAD`], dir)
const box = path.join(dir, ".opencode-review")
yield* fs.makeDirectory(box, { recursive: true }).pipe(Effect.mapError(fail("create review dir failed")))
yield* sync(dir, box)
yield* fs
.writeFileString(
path.join(box, "pr.json"),
encodePullContext(
new PullContext({
repo: input.repo,
mergeBase: base,
pull: data,
}),
),
)
.pipe(Effect.mapError(fail("write pr.json failed")))
yield* fs
.writeFileString(path.join(box, "files.json"), encodePullFiles(list))
.pipe(Effect.mapError(fail("write files.json failed")))
yield* fs
.writeFileString(path.join(box, "diff.patch"), diff)
.pipe(Effect.mapError(fail("write diff.patch failed")))
const out = yield* Effect.all(
[
safe("correctness", job("correctness", pass("review-correctness", correctness(data, list), dir))),
safe("security", job("security", pass("review-security", security(data, list), dir))),
safe(
"maintainability",
job("maintainability", pass("review-maintainability", maintainability(data, list), dir)),
),
],
{ concurrency: 3 },
)
const merged = dedupe(out.flatMap((item) => [...item]))
yield* fs
.writeFileString(path.join(box, "candidates.json"), encodeFindings(merged))
.pipe(Effect.mapError(fail("write candidates.json failed")))
const kept = merged.length
? dedupe(yield* job("verifier", pass("review-verify", verify(data, merged), dir)))
: []
const ranges = new Map(list.map((item) => [item.filename, hunks(item.patch)]))
const notes = kept.filter((item) => inDiff(ranges.get(item.file), item.line))
const rest = kept.filter((item) => !inDiff(ranges.get(item.file), item.line))
if (!input.post) {
yield* Effect.sync(() => print(kept, notes, rest))
return
}
if (!kept.length) {
yield* top(input.repo, input.pr, "lgtm")
return
}
yield* Effect.all(
notes.map((item) => inline(input.repo, input.pr, data.head.sha, item)),
{ concurrency: 1 },
)
if (rest.length) yield* top(input.repo, input.pr, summary(rest))
})
return Service.of({
run: (input) => run(input).pipe(Effect.scoped),
})
}),
)
export const defaultLayer = layer.pipe(
Layer.provide(CrossSpawnSpawner.layer),
Layer.provide(NodeFileSystem.layer),
Layer.provide(NodePath.layer),
)
const { runPromise } = makeRuntime(Service, defaultLayer)
export function run(input: { repo: string; pr: number; post: boolean }) {
return runPromise((svc) => svc.run(input))
}
}
await Review.run(args)
function parse(argv: string[]) {
let repo: string | undefined
let pr: number | undefined
let post = false
for (let i = 0; i < argv.length; i++) {
const arg = argv[i]
if (arg === "--repo") repo = argv[++i]
if (arg === "--pr") pr = Number(argv[++i])
if (arg === "--post") post = true
}
if (!repo) throw new Error("Missing --repo")
if (!pr) throw new Error("Missing --pr")
return { repo, pr, post }
}
function collect(raw: string) {
const seen = new Set<string>()
const out: string[] = []
for (const row of raw.split(/\r?\n/)) {
if (!row.trim()) continue
let item: unknown
try {
item = JSON.parse(row)
} catch {
continue
}
if (!item || typeof item !== "object" || !("type" in item) || item.type !== "text") continue
if (!("part" in item) || !item.part || typeof item.part !== "object") continue
const part = item.part as { id?: string; text?: string }
if (!part.id || seen.has(part.id)) continue
seen.add(part.id)
if (typeof part.text === "string") out.push(part.text)
}
return out.join("\n")
}
function trace(agent: string, row: string) {
if (!row.trim()) return
let item: unknown
try {
item = JSON.parse(row)
} catch {
console.error(`[review:${agent}] ${row}`)
return
}
if (!item || typeof item !== "object") return
const type = "type" in item && typeof item.type === "string" ? item.type : undefined
if (!type) return
if (type === "tool_use") {
const part = "part" in item && item.part && typeof item.part === "object" ? item.part : undefined
const tool = part && "tool" in part && typeof part.tool === "string" ? part.tool : "tool"
const state = part && "state" in part && part.state && typeof part.state === "object" ? part.state : undefined
const input = state && "input" in state ? brief(state.input) : ""
console.error(`[review:${agent}] ${tool}${input ? ` ${input}` : ""}`)
return
}
if (type === "step_start") {
console.error(`[review:${agent}] step started`)
return
}
if (type === "step_finish") {
const part = "part" in item && item.part && typeof item.part === "object" ? item.part : undefined
const reason = part && "reason" in part && typeof part.reason === "string" ? part.reason : "step"
console.error(`[review:${agent}] step finished (${reason})`)
}
}
function brief(input: unknown) {
if (!input || typeof input !== "object") return ""
if ("filePath" in input && typeof input.filePath === "string") return input.filePath
if ("path" in input && typeof input.path === "string") return input.path
if ("pattern" in input && typeof input.pattern === "string") return input.pattern
if ("command" in input && typeof input.command === "string") return input.command
if ("include" in input && typeof input.include === "string") return input.include
return ""
}
function strip(text: string) {
if (!text.startsWith("```") || !text.endsWith("```")) return text
return text
.replace(/^```[a-zA-Z]*\n?/, "")
.replace(/\n?```$/, "")
.trim()
}
function correctness(data: Pull, list: readonly PullFile[]) {
return [
`Review pull request #${data.number}: ${data.title}.`,
`Base ref: ${data.base.ref}. Head ref: ${data.head.ref}.`,
`Changed files: ${list.map((item) => item.filename).join(", ")}.`,
"Read `.opencode-review/REVIEW.md` before reviewing.",
"Start with the diff. Use the rest of the repo only for targeted confirmation.",
"Avoid broad exploration. Follow direct references only.",
"Find correctness bugs, regressions, missing edge-case handling, broken invariants, and unsafe assumptions.",
"Only report issues introduced or exposed by this pull request.",
].join("\n")
}
function security(data: Pull, list: readonly PullFile[]) {
return [
`Review pull request #${data.number}: ${data.title}.`,
`Base ref: ${data.base.ref}. Head ref: ${data.head.ref}.`,
`Changed files: ${list.map((item) => item.filename).join(", ")}.`,
"Read `.opencode-review/REVIEW.md` before reviewing.",
"Start with the diff. Use the rest of the repo only for targeted confirmation.",
"Avoid broad exploration. Follow direct auth, validation, storage, or transport links only.",
"Find concrete security issues such as missing validation, unsafe auth checks, secrets exposure, or data leaks.",
"Only report issues introduced or exposed by this pull request.",
].join("\n")
}
function maintainability(data: Pull, list: readonly PullFile[]) {
return [
`Review pull request #${data.number}: ${data.title}.`,
`Base ref: ${data.base.ref}. Head ref: ${data.head.ref}.`,
`Changed files: ${list.map((item) => item.filename).join(", ")}.`,
"Read `.opencode-review/REVIEW.md` before reviewing.",
"Start with the diff. Use the rest of the repo only for targeted confirmation.",
"Avoid broad exploration. Focus on maintainability issues made visible by the changed files.",
"Find high-signal maintainability issues that clearly violate repo conventions or make future bugs likely.",
"Do not nitpick harmless style differences.",
].join("\n")
}
function verify(data: Pull, list: readonly Finding[]) {
return [
`Verify review findings for pull request #${data.number}: ${data.title}.`,
`Candidates: ${list.length}.`,
"Inspect the cited file first and expand only if needed to confirm or reject the finding.",
"Reject anything vague, duplicated, unsupported, or not attributable to the pull request.",
].join("\n")
}
function dedupe(list: readonly Finding[]) {
const seen = new Set<string>()
return order(list).filter((item) => {
const key = [item.category, item.file, Math.trunc(item.line), item.summary.trim().toLowerCase()].join(":")
if (seen.has(key)) return false
seen.add(key)
return true
})
}
function order(list: readonly Finding[]) {
const rank = {
"must-fix": 0,
"should-fix": 1,
suggestion: 2,
}
return [...list].sort((a, b) => {
const left = rank[a.severity] - rank[b.severity]
if (left) return left
return a.file.localeCompare(b.file) || a.line - b.line
})
}
function hunks(patch?: string) {
if (!patch) return [] as [number, number][]
const out: [number, number][] = []
let line = 0
let start = -1
let end = -1
for (const row of patch.split("\n")) {
if (row.startsWith("@@")) {
push(out, start, end)
start = -1
end = -1
const hit = /\+([0-9]+)(?:,([0-9]+))?/.exec(row)
line = hit ? Number(hit[1]) : 0
continue
}
if (row.startsWith("+") && !row.startsWith("+++")) {
start = start === -1 ? line : start
end = line
line += 1
continue
}
if (row.startsWith("-") && !row.startsWith("---")) continue
push(out, start, end)
start = -1
end = -1
line += 1
}
push(out, start, end)
return out
}
function push(out: [number, number][], start: number, end: number) {
if (start === -1 || end === -1) return
const prev = out.at(-1)
if (prev && prev[1] + 1 >= start) {
prev[1] = Math.max(prev[1], end)
return
}
out.push([start, end])
}
function inDiff(list: [number, number][] | undefined, line: number) {
return !!list?.some((item) => line >= item[0] && line <= item[1])
}
function body(item: Finding) {
const out = [`[${item.severity}] ${item.summary}`, "", item.evidence]
if (item.suggestion.trim()) out.push("", `Suggestion: ${item.suggestion.trim()}`)
return out.join("\n")
}
function summary(list: readonly Finding[]) {
const head = "OpenCode review found additional PR-relevant issues that could not be placed on changed lines:"
const body = order(list).map(
(item) => `- [${item.severity}] \`${item.file}:${Math.trunc(item.line)}\` ${item.summary}`,
)
return [head, "", ...body].join("\n")
}
function print(all: readonly Finding[], notes: readonly Finding[], rest: readonly Finding[]) {
console.log("# OpenCode Review")
console.log()
console.log(`- total: ${all.length}`)
console.log(`- inline-ready: ${notes.length}`)
console.log(`- summary-only: ${rest.length}`)
console.log()
for (const item of order(all)) {
console.log(`- [${item.severity}] ${item.file}:${Math.trunc(item.line)} ${item.summary}`)
console.log(` ${item.evidence}`)
if (item.suggestion.trim()) console.log(` suggestion: ${item.suggestion.trim()}`)
}
}

View File

@@ -3,6 +3,15 @@ import * as ServiceMap from "effect/ServiceMap"
export const memoMap = Layer.makeMemoMapUnsafe()
export function makeRunPromise<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
return <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) => {
rt ??= ManagedRuntime.make(layer, { memoMap })
return rt.runPromise(service.use(fn), options)
}
}
export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))

View File

@@ -24,7 +24,7 @@ import { BusEvent } from "../bus/bus-event"
import { Bus } from "@/bus"
import { TuiEvent } from "@/cli/cmd/tui/event"
import open from "open"
import { Effect, Layer, Option, ServiceMap, Stream } from "effect"
import { Effect, Exit, Layer, Option, ServiceMap, Stream } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
@@ -129,6 +129,8 @@ export namespace MCP {
return typeof entry === "object" && entry !== null && "type" in entry
}
const sanitize = (s: string) => s.replace(/[^a-zA-Z0-9_-]/g, "_")
// Convert MCP tool definition to AI SDK Tool type
function convertMcpTool(mcpTool: MCPToolDef, client: MCPClient, timeout?: number): Tool {
const inputSchema = mcpTool.inputSchema
@@ -160,141 +162,157 @@ export namespace MCP {
})
}
async function defs(key: string, client: MCPClient, timeout?: number) {
const result = await withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT).catch((err) => {
log.error("failed to get tools from client", { key, error: err })
return undefined
})
return result?.tools
function defs(key: string, client: MCPClient, timeout?: number) {
return Effect.tryPromise({
try: () => withTimeout(client.listTools(), timeout ?? DEFAULT_TIMEOUT),
catch: (err) => err instanceof Error ? err : new Error(String(err)),
}).pipe(
Effect.map((result) => result.tools),
Effect.catch((err) => {
log.error("failed to get tools from client", { key, error: err })
return Effect.succeed(undefined)
}),
)
}
async function fetchFromClient<T extends { name: string }>(
function fetchFromClient<T extends { name: string }>(
clientName: string,
client: Client,
listFn: (c: Client) => Promise<T[]>,
label: string,
): Promise<Record<string, T & { client: string }> | undefined> {
const items = await listFn(client).catch((e: any) => {
log.error(`failed to get ${label}`, { clientName, error: e.message })
return undefined
})
if (!items) return undefined
const out: Record<string, T & { client: string }> = {}
const sanitizedClient = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
for (const item of items) {
const sanitizedName = item.name.replace(/[^a-zA-Z0-9_-]/g, "_")
out[sanitizedClient + ":" + sanitizedName] = { ...item, client: clientName }
}
return out
) {
return Effect.tryPromise({
try: () => listFn(client),
catch: (e: any) => {
log.error(`failed to get ${label}`, { clientName, error: e.message })
return e
},
}).pipe(
Effect.map((items) => {
const out: Record<string, T & { client: string }> = {}
const sanitizedClient = sanitize(clientName)
for (const item of items) {
out[sanitizedClient + ":" + sanitize(item.name)] = { ...item, client: clientName }
}
return out
}),
Effect.orElseSucceed(() => undefined),
)
}
async function create(key: string, mcp: Config.Mcp) {
if (mcp.enabled === false) {
log.info("mcp server disabled", { key })
return {
mcpClient: undefined,
status: { status: "disabled" as const },
}
type Transport = StdioClientTransport | StreamableHTTPClientTransport | SSEClientTransport
/**
* Connect a client via the given transport with resource safety:
* on failure the transport is closed; on success the caller owns it.
*/
const connectTransport = (transport: Transport, timeout: number) =>
Effect.acquireUseRelease(
Effect.succeed(transport),
(t) => {
const client = new Client({ name: "opencode", version: Installation.VERSION })
return Effect.tryPromise({
try: () => client.connect(t).then(() => client),
catch: (e) => (e instanceof Error ? e : new Error(String(e))),
}).pipe(
Effect.timeoutOrElse({
duration: `${timeout} millis`,
onTimeout: () => Effect.fail(new Error(`Operation timed out after ${timeout}ms`)),
}),
)
},
(t, exit) =>
Exit.isFailure(exit)
? Effect.tryPromise(() => t.close()).pipe(Effect.ignore)
: Effect.void,
)
/** Fire-and-forget Bus.publish wrapped in Effect */
const busPublish = <D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) =>
Effect.tryPromise(() => Bus.publish(def, properties)).pipe(Effect.ignore)
interface CreateResult {
mcpClient?: MCPClient
status: Status
defs?: MCPToolDef[]
}
const DISABLED_RESULT: CreateResult = { status: { status: "disabled" } }
const connectRemote = Effect.fn("MCP.connectRemote")(function* (key: string, mcp: Config.Mcp & { type: "remote" }) {
const oauthDisabled = mcp.oauth === false
const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined
let authProvider: McpOAuthProvider | undefined
if (!oauthDisabled) {
authProvider = new McpOAuthProvider(
key,
mcp.url,
{
clientId: oauthConfig?.clientId,
clientSecret: oauthConfig?.clientSecret,
scope: oauthConfig?.scope,
},
{
onRedirect: async (url) => {
log.info("oauth redirect requested", { key, url: url.toString() })
},
},
)
}
log.info("found", { key, type: mcp.type })
let mcpClient: MCPClient | undefined
let status: Status | undefined = undefined
const transports: Array<{ name: string; transport: TransportWithAuth }> = [
{
name: "StreamableHTTP",
transport: new StreamableHTTPClientTransport(new URL(mcp.url), {
authProvider,
requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
}),
},
{
name: "SSE",
transport: new SSEClientTransport(new URL(mcp.url), {
authProvider,
requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
}),
},
]
if (mcp.type === "remote") {
// OAuth is enabled by default for remote servers unless explicitly disabled with oauth: false
const oauthDisabled = mcp.oauth === false
const oauthConfig = typeof mcp.oauth === "object" ? mcp.oauth : undefined
let authProvider: McpOAuthProvider | undefined
const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
let lastStatus: Status | undefined
if (!oauthDisabled) {
authProvider = new McpOAuthProvider(
key,
mcp.url,
{
clientId: oauthConfig?.clientId,
clientSecret: oauthConfig?.clientSecret,
scope: oauthConfig?.scope,
},
{
onRedirect: async (url) => {
log.info("oauth redirect requested", { key, url: url.toString() })
// Store the URL - actual browser opening is handled by startAuth
},
},
)
}
const transports: Array<{ name: string; transport: TransportWithAuth }> = [
{
name: "StreamableHTTP",
transport: new StreamableHTTPClientTransport(new URL(mcp.url), {
authProvider,
requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
}),
},
{
name: "SSE",
transport: new SSEClientTransport(new URL(mcp.url), {
authProvider,
requestInit: mcp.headers ? { headers: mcp.headers } : undefined,
}),
},
]
let lastError: Error | undefined
const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
for (const { name, transport } of transports) {
try {
const client = new Client({
name: "opencode",
version: Installation.VERSION,
})
await withTimeout(client.connect(transport), connectTimeout)
mcpClient = client
log.info("connected", { key, transport: name })
status = { status: "connected" }
break
} catch (error) {
lastError = error instanceof Error ? error : new Error(String(error))
// Handle OAuth-specific errors.
// The SDK throws UnauthorizedError when auth() returns 'REDIRECT',
// but may also throw plain Errors when auth() fails internally
// (e.g. during discovery, registration, or state generation).
// When an authProvider is attached, treat both cases as auth-related.
for (const { name, transport } of transports) {
const result = yield* connectTransport(transport, connectTimeout).pipe(
Effect.map((client) => ({ client, transportName: name })),
Effect.catch((error) => {
const lastError = error instanceof Error ? error : new Error(String(error))
const isAuthError =
error instanceof UnauthorizedError || (authProvider && lastError.message.includes("OAuth"))
if (isAuthError) {
log.info("mcp server requires authentication", { key, transport: name })
// Check if this is a "needs registration" error
if (lastError.message.includes("registration") || lastError.message.includes("client_id")) {
status = {
lastStatus = {
status: "needs_client_registration" as const,
error: "Server does not support dynamic client registration. Please provide clientId in config.",
}
// Show toast for needs_client_registration
Bus.publish(TuiEvent.ToastShow, {
return busPublish(TuiEvent.ToastShow, {
title: "MCP Authentication Required",
message: `Server "${key}" requires a pre-registered client ID. Add clientId to your config.`,
variant: "warning",
duration: 8000,
}).catch((e) => log.debug("failed to show toast", { error: e }))
}).pipe(Effect.as(undefined))
} else {
// Store transport for later finishAuth call
pendingOAuthTransports.set(key, transport)
status = { status: "needs_auth" as const }
// Show toast for needs_auth
Bus.publish(TuiEvent.ToastShow, {
lastStatus = { status: "needs_auth" as const }
return busPublish(TuiEvent.ToastShow, {
title: "MCP Authentication Required",
message: `Server "${key}" requires authentication. Run: opencode mcp auth ${key}`,
variant: "warning",
duration: 8000,
}).catch((e) => log.debug("failed to show toast", { error: e }))
}).pipe(Effect.as(undefined))
}
break
}
log.debug("transport connection failed", {
@@ -303,91 +321,75 @@ export namespace MCP {
url: mcp.url,
error: lastError.message,
})
status = {
status: "failed" as const,
error: lastError.message,
}
}
lastStatus = { status: "failed" as const, error: lastError.message }
return Effect.succeed(undefined)
}),
)
if (result) {
log.info("connected", { key, transport: result.transportName })
return { client: result.client as MCPClient | undefined, status: { status: "connected" } as Status }
}
// If this was an auth error, stop trying other transports
if (lastStatus?.status === "needs_auth" || lastStatus?.status === "needs_client_registration") break
}
if (mcp.type === "local") {
const [cmd, ...args] = mcp.command
const cwd = Instance.directory
const transport = new StdioClientTransport({
stderr: "pipe",
command: cmd,
args,
cwd,
env: {
...process.env,
...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}),
...mcp.environment,
},
})
transport.stderr?.on("data", (chunk: Buffer) => {
log.info(`mcp stderr: ${chunk.toString()}`, { key })
})
return { client: undefined as MCPClient | undefined, status: (lastStatus ?? { status: "failed", error: "Unknown error" }) as Status }
})
const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
try {
const client = new Client({
name: "opencode",
version: Installation.VERSION,
})
await withTimeout(client.connect(transport), connectTimeout)
mcpClient = client
status = {
status: "connected",
}
} catch (error) {
log.error("local mcp startup failed", {
key,
command: mcp.command,
cwd,
error: error instanceof Error ? error.message : String(error),
})
status = {
status: "failed" as const,
error: error instanceof Error ? error.message : String(error),
}
}
const connectLocal = Effect.fn("MCP.connectLocal")(function* (key: string, mcp: Config.Mcp & { type: "local" }) {
const [cmd, ...args] = mcp.command
const cwd = Instance.directory
const transport = new StdioClientTransport({
stderr: "pipe",
command: cmd,
args,
cwd,
env: {
...process.env,
...(cmd === "opencode" ? { BUN_BE_BUN: "1" } : {}),
...mcp.environment,
},
})
transport.stderr?.on("data", (chunk: Buffer) => {
log.info(`mcp stderr: ${chunk.toString()}`, { key })
})
const connectTimeout = mcp.timeout ?? DEFAULT_TIMEOUT
return yield* connectTransport(transport, connectTimeout).pipe(
Effect.map((client): { client: MCPClient | undefined; status: Status } => ({ client, status: { status: "connected" } })),
Effect.catch((error): Effect.Effect<{ client: MCPClient | undefined; status: Status }> => {
const msg = error instanceof Error ? error.message : String(error)
log.error("local mcp startup failed", { key, command: mcp.command, cwd, error: msg })
return Effect.succeed({ client: undefined, status: { status: "failed", error: msg } })
}),
)
})
const create = Effect.fn("MCP.create")(function* (key: string, mcp: Config.Mcp) {
if (mcp.enabled === false) {
log.info("mcp server disabled", { key })
return DISABLED_RESULT
}
if (!status) {
status = {
status: "failed" as const,
error: "Unknown error",
}
}
log.info("found", { key, type: mcp.type })
const { client: mcpClient, status } = mcp.type === "remote"
? yield* connectRemote(key, mcp as Config.Mcp & { type: "remote" })
: yield* connectLocal(key, mcp as Config.Mcp & { type: "local" })
if (!mcpClient) {
return {
mcpClient: undefined,
status,
}
return { status } satisfies CreateResult
}
const listed = await defs(key, mcpClient, mcp.timeout)
const listed = yield* defs(key, mcpClient, mcp.timeout)
if (!listed) {
await mcpClient.close().catch((error) => {
log.error("Failed to close MCP client", {
error,
})
})
return {
mcpClient: undefined,
status: { status: "failed" as const, error: "Failed to get tools" },
}
yield* Effect.tryPromise(() => mcpClient.close()).pipe(Effect.ignore)
return { status: { status: "failed", error: "Failed to get tools" } } satisfies CreateResult
}
log.info("create() successfully created client", { key, toolCount: listed.length })
return {
mcpClient,
status,
defs: listed,
}
}
return { mcpClient, status, defs: listed } satisfies CreateResult
})
// --- Effect Service ---
@@ -463,20 +465,20 @@ export namespace MCP {
log.info("tools list changed notification received", { server: name })
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
const listed = await defs(name, client, timeout)
const listed = await Effect.runPromise(defs(name, client, timeout))
if (!listed) return
if (s.clients[name] !== client || s.status[name]?.status !== "connected") return
s.defs[name] = listed
await Bus.publish(ToolsChanged, { server: name }).catch((error) =>
log.warn("failed to publish tools changed", { server: name, error }),
)
await Effect.runPromise(busPublish(ToolsChanged, { server: name }))
})
}
const getConfig = () => Effect.promise(() => Config.get())
const cache = yield* InstanceState.make<State>(
Effect.fn("MCP.state")(function* () {
const cfg = yield* Effect.promise(() => Config.get())
const cfg = yield* getConfig()
const config = cfg.mcp ?? {}
const s: State = {
status: {},
@@ -498,13 +500,15 @@ export namespace MCP {
return
}
const result = yield* Effect.promise(() => create(key, mcp).catch(() => undefined))
const result = yield* create(key, mcp).pipe(
Effect.catch(() => Effect.succeed(undefined)),
)
if (!result) return
s.status[key] = result.status
if (result.mcpClient) {
s.clients[key] = result.mcpClient
s.defs[key] = result.defs
s.defs[key] = result.defs!
watch(s, key, result.mcpClient, mcp.timeout)
}
}),
@@ -542,14 +546,12 @@ export namespace MCP {
const client = s.clients[name]
delete s.defs[name]
if (!client) return Effect.void
return Effect.promise(() =>
client.close().catch((error: any) => log.error("failed to close MCP client", { name, error })),
)
return Effect.tryPromise(() => client.close()).pipe(Effect.ignore)
}
const status = Effect.fn("MCP.status")(function* () {
const s = yield* InstanceState.get(cache)
const cfg = yield* Effect.promise(() => Config.get())
const cfg = yield* getConfig()
const config = cfg.mcp ?? {}
const result: Record<string, Status> = {}
@@ -568,14 +570,7 @@ export namespace MCP {
const createAndStore = Effect.fn("MCP.createAndStore")(function* (name: string, mcp: Config.Mcp) {
const s = yield* InstanceState.get(cache)
const result = yield* Effect.promise(() => create(name, mcp))
if (!result) {
yield* closeClient(s, name)
delete s.clients[name]
s.status[name] = { status: "failed" as const, error: "unknown error" }
return s.status[name]
}
const result = yield* create(name, mcp)
s.status[name] = result.status
if (!result.mcpClient) {
@@ -586,7 +581,7 @@ export namespace MCP {
yield* closeClient(s, name)
s.clients[name] = result.mcpClient
s.defs[name] = result.defs
s.defs[name] = result.defs!
watch(s, name, result.mcpClient, mcp.timeout)
return result.status
})
@@ -616,7 +611,7 @@ export namespace MCP {
const tools = Effect.fn("MCP.tools")(function* () {
const result: Record<string, Tool> = {}
const s = yield* InstanceState.get(cache)
const cfg = yield* Effect.promise(() => Config.get())
const cfg = yield* getConfig()
const config = cfg.mcp ?? {}
const defaultTimeout = cfg.experimental?.mcp_timeout
@@ -639,9 +634,7 @@ export namespace MCP {
const timeout = entry?.timeout ?? defaultTimeout
for (const mcpTool of listed) {
const sanitizedClientName = clientName.replace(/[^a-zA-Z0-9_-]/g, "_")
const sanitizedToolName = mcpTool.name.replace(/[^a-zA-Z0-9_-]/g, "_")
result[sanitizedClientName + "_" + sanitizedToolName] = convertMcpTool(mcpTool, client, timeout)
result[sanitize(clientName) + "_" + sanitize(mcpTool.name)] = convertMcpTool(mcpTool, client, timeout)
}
}),
{ concurrency: "unbounded" },
@@ -649,30 +642,29 @@ export namespace MCP {
return result
})
function collectFromConnected<T>(
function collectFromConnected<T extends { name: string }>(
s: State,
fetchFn: (clientName: string, client: Client) => Promise<Record<string, T> | undefined>,
listFn: (c: Client) => Promise<T[]>,
label: string,
) {
return Effect.forEach(
Object.entries(s.clients).filter(([name]) => s.status[name]?.status === "connected"),
([clientName, client]) =>
Effect.promise(async () => Object.entries((await fetchFn(clientName, client)) ?? {})),
fetchFromClient(clientName, client, listFn, label).pipe(
Effect.map((items) => Object.entries(items ?? {})),
),
{ concurrency: "unbounded" },
).pipe(Effect.map((results) => Object.fromEntries<T>(results.flat())))
).pipe(Effect.map((results) => Object.fromEntries<T & { client: string }>(results.flat())))
}
const prompts = Effect.fn("MCP.prompts")(function* () {
const s = yield* InstanceState.get(cache)
return yield* collectFromConnected(s, (name, client) =>
fetchFromClient(name, client, (c) => c.listPrompts().then((r) => r.prompts), "prompts"),
)
return yield* collectFromConnected(s, (c) => c.listPrompts().then((r) => r.prompts), "prompts")
})
const resources = Effect.fn("MCP.resources")(function* () {
const s = yield* InstanceState.get(cache)
return yield* collectFromConnected(s, (name, client) =>
fetchFromClient(name, client, (c) => c.listResources().then((r) => r.resources), "resources"),
)
return yield* collectFromConnected(s, (c) => c.listResources().then((r) => r.resources), "resources")
})
const withClient = Effect.fnUntraced(function* <A>(
@@ -713,7 +705,7 @@ export namespace MCP {
})
const getMcpConfig = Effect.fnUntraced(function* (mcpName: string) {
const cfg = yield* Effect.promise(() => Config.get())
const cfg = yield* getConfig()
const mcpConfig = cfg.mcp?.[mcpName]
if (!mcpConfig || !isMcpConfigured(mcpConfig)) return undefined
return mcpConfig
@@ -750,19 +742,21 @@ export namespace MCP {
const transport = new StreamableHTTPClientTransport(new URL(mcpConfig.url), { authProvider })
return yield* Effect.promise(async () => {
try {
return yield* Effect.tryPromise({
try: () => {
const client = new Client({ name: "opencode", version: Installation.VERSION })
await client.connect(transport)
return { authorizationUrl: "", oauthState }
} catch (error) {
return client.connect(transport).then(() => ({ authorizationUrl: "", oauthState }))
},
catch: (error) => error,
}).pipe(
Effect.catch((error) => {
if (error instanceof UnauthorizedError && capturedUrl) {
pendingOAuthTransports.set(mcpName, transport)
return { authorizationUrl: capturedUrl.toString(), oauthState }
return Effect.succeed({ authorizationUrl: capturedUrl.toString(), oauthState })
}
throw error
}
})
return Effect.die(error)
}),
)
})
const authenticate = Effect.fn("MCP.authenticate")(function* (mcpName: string) {
@@ -791,7 +785,7 @@ export namespace MCP {
),
Effect.catch(() => {
log.warn("failed to open browser, user must open URL manually", { mcpName })
return Effect.promise(() => Bus.publish(BrowserOpenFailed, { mcpName, url: authorizationUrl }))
return busPublish(BrowserOpenFailed, { mcpName, url: authorizationUrl })
}),
)
@@ -811,10 +805,7 @@ export namespace MCP {
if (!transport) throw new Error(`No pending OAuth flow for MCP server: ${mcpName}`)
const result = yield* Effect.tryPromise({
try: async () => {
await transport.finishAuth(authorizationCode)
return true
},
try: () => transport.finishAuth(authorizationCode).then(() => true as const),
catch: (error) => {
log.error("failed to finish oauth", { mcpName, error })
return error

View File

@@ -19,9 +19,12 @@ interface MockClientState {
const clientStates = new Map<string, MockClientState>()
let lastCreatedClientName: string | undefined
let connectShouldFail = false
let connectShouldHang = false
let connectError = "Mock transport cannot connect"
// Tracks how many Client instances were created (detects leaks)
let clientCreateCount = 0
// Tracks how many times transport.close() is called across all mock transports
let transportCloseCount = 0
function getOrCreateClientState(name?: string): MockClientState {
const key = name ?? "default"
@@ -44,32 +47,42 @@ function getOrCreateClientState(name?: string): MockClientState {
return state
}
// Mock transport that succeeds or fails based on connectShouldFail
// Mock transport that succeeds or fails based on connectShouldFail / connectShouldHang
class MockStdioTransport {
stderr: null = null
pid = 12345
constructor(_opts: any) {}
async start() {
if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
if (connectShouldFail) throw new Error(connectError)
if (connectShouldHang) await new Promise(() => {}) // never resolves
}
async close() {
transportCloseCount++
}
async close() {}
}
class MockStreamableHTTP {
constructor(_url: URL, _opts?: any) {}
async start() {
if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
if (connectShouldFail) throw new Error(connectError)
}
async close() {}
async close() {
transportCloseCount++
}
async finishAuth() {}
}
class MockSSE {
constructor(_url: URL, _opts?: any) {}
async start() {
throw new Error("SSE fallback - not used in these tests")
if (connectShouldHang) return new Promise<void>(() => {}) // never resolves
if (connectShouldFail) throw new Error(connectError)
}
async close() {
transportCloseCount++
}
async close() {}
}
mock.module("@modelcontextprotocol/sdk/client/stdio.js", () => ({
@@ -145,8 +158,10 @@ beforeEach(() => {
clientStates.clear()
lastCreatedClientName = undefined
connectShouldFail = false
connectShouldHang = false
connectError = "Mock transport cannot connect"
clientCreateCount = 0
transportCloseCount = 0
})
// Import after mocks
@@ -658,3 +673,79 @@ test(
},
),
)
// ========================================================================
// Test: transport leak — local stdio timeout (#19168)
// ========================================================================
test(
"local stdio transport is closed when connect times out (no process leak)",
withInstance({}, async () => {
lastCreatedClientName = "hanging-server"
getOrCreateClientState("hanging-server")
connectShouldHang = true
const addResult = await MCP.add("hanging-server", {
type: "local",
command: ["node", "fake.js"],
timeout: 100,
})
const serverStatus = (addResult.status as any)["hanging-server"] ?? addResult.status
expect(serverStatus.status).toBe("failed")
expect(serverStatus.error).toContain("timed out")
// Transport must be closed to avoid orphaned child process
expect(transportCloseCount).toBeGreaterThanOrEqual(1)
}),
)
// ========================================================================
// Test: transport leak — remote timeout (#19168)
// ========================================================================
test(
"remote transport is closed when connect times out",
withInstance({}, async () => {
lastCreatedClientName = "hanging-remote"
getOrCreateClientState("hanging-remote")
connectShouldHang = true
const addResult = await MCP.add("hanging-remote", {
type: "remote",
url: "http://localhost:9999/mcp",
timeout: 100,
oauth: false,
})
const serverStatus = (addResult.status as any)["hanging-remote"] ?? addResult.status
expect(serverStatus.status).toBe("failed")
// Transport must be closed to avoid leaked HTTP connections
expect(transportCloseCount).toBeGreaterThanOrEqual(1)
}),
)
// ========================================================================
// Test: transport leak — failed remote transports not closed (#19168)
// ========================================================================
test(
"failed remote transport is closed before trying next transport",
withInstance({}, async () => {
lastCreatedClientName = "fail-remote"
getOrCreateClientState("fail-remote")
connectShouldFail = true
connectError = "Connection refused"
const addResult = await MCP.add("fail-remote", {
type: "remote",
url: "http://localhost:9999/mcp",
timeout: 5000,
oauth: false,
})
const serverStatus = (addResult.status as any)["fail-remote"] ?? addResult.status
expect(serverStatus.status).toBe("failed")
// Both StreamableHTTP and SSE transports should be closed
expect(transportCloseCount).toBeGreaterThanOrEqual(2)
}),
)