Compare commits

..

4 Commits

Author SHA1 Message Date
Kit Langton
89c0db86b9 fix(sync): restore ALS for published events 2026-04-02 00:04:56 -04:00
Kit Langton
a68395bfef fix(sync): bind transaction inside effect 2026-04-01 23:58:06 -04:00
Kit Langton
2e6d7bb517 fix(sync): keep event application synchronous 2026-04-01 23:49:22 -04:00
Kit Langton
dc719269b6 refactor(sync): effectify sync event 2026-04-01 23:31:36 -04:00
34 changed files with 238 additions and 335 deletions

View File

@@ -382,7 +382,6 @@
"tree-sitter-powershell": "0.25.10",
"turndown": "7.2.0",
"ulid": "catalog:",
"venice-ai-sdk-provider": "2.0.1",
"vscode-jsonrpc": "8.2.1",
"web-tree-sitter": "0.25.10",
"which": "6.0.1",
@@ -4756,8 +4755,6 @@
"vary": ["vary@1.1.2", "", {}, "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg=="],
"venice-ai-sdk-provider": ["venice-ai-sdk-provider@2.0.1", "", { "dependencies": { "@ai-sdk/openai-compatible": "^2.0.37", "@ai-sdk/provider": "^3.0.8", "@ai-sdk/provider-utils": "^4.0.21" }, "peerDependencies": { "ai": "^6.0.90" } }, "sha512-6SxA8a4MoA6Q/c+D3q7My0Hfog76enN3n0MXhwosM+tso66rXBEGeBRD/0lravRDVzL2Q1w5QJPc86rAVJtfXg=="],
"verror": ["verror@1.10.1", "", { "dependencies": { "assert-plus": "^1.0.0", "core-util-is": "1.0.2", "extsprintf": "^1.2.0" } }, "sha512-veufcmxri4e3XSrT0xwfUR7kguIkaxBeosDg00yDWhk49wdwkSUrvvsm7nc75e1PUyvIeZj6nS8VQRYz2/S4Xg=="],
"vfile": ["vfile@6.0.3", "", { "dependencies": { "@types/unist": "^3.0.0", "vfile-message": "^4.0.0" } }, "sha512-KzIbH/9tXat2u30jf+smMwFCsno4wHVdNmzFyL+T/L3UGqqk6JKfVqOFOZEpZSHADH1k40ab6NUIXZq422ov3Q=="],

View File

@@ -1,8 +1,8 @@
{
"nodeModules": {
"x86_64-linux": "sha256-aUDDy/Y6qBRWbCo5oFsocii1DWSsw2qRheoKEMw5E0w=",
"aarch64-linux": "sha256-mVPve3nyrlpWkaERQfvzue8BpWgIXsIg4KnXHchYg2w=",
"aarch64-darwin": "sha256-54W7ZHqH67zib+SarZyQhP2hf18hv8fKsI+QPiUGBwY=",
"x86_64-darwin": "sha256-n1yP4kEl9+a6orKUVdeLrmSxau2+pT9dTneT8ws5Fks="
"x86_64-linux": "sha256-bjfe8/aD0hvUQQEfaNdmKV/Y3dzpf8oz1OUJdgf61WI=",
"aarch64-linux": "sha256-iU9v+ekSCB/qTUG+pOOpSMhPh+0hWnWU5jzDNllEkxU=",
"aarch64-darwin": "sha256-SgNydQLeAjbX0J49f2VKcgKg2Y30pK826R2qQJBMWE4=",
"x86_64-darwin": "sha256-/rzwNuI9x55qi0UcU7QvPUTupErmkt62T09g1omXkQk="
}
}

View File

@@ -363,8 +363,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"لقد وصلت إلى حد الإنفاق الشهري البالغ ${{amount}}. إدارة حدودك هنا: {{membersUrl}}",
"zen.api.error.modelDisabled": "النموذج معطل",
"zen.api.error.trialEnded":
"انتهى العرض المجاني لـ {{model}}. يمكنك مواصلة استخدام النموذج بالاشتراك في OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | الوصول إلى أفضل نماذج البرمجة في العالم",
"black.meta.description": "احصل على وصول إلى Claude، GPT، Gemini والمزيد مع خطط اشتراك OpenCode Black.",

View File

@@ -371,8 +371,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"Você atingiu seu limite de gastos mensais de ${{amount}}. Gerencie seus limites aqui: {{membersUrl}}",
"zen.api.error.modelDisabled": "O modelo está desabilitado",
"zen.api.error.trialEnded":
"A promoção gratuita do {{model}} terminou. Você pode continuar usando o modelo assinando o OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | Acesse os melhores modelos de codificação do mundo",
"black.meta.description": "Tenha acesso ao Claude, GPT, Gemini e mais com os planos de assinatura OpenCode Black.",

View File

@@ -368,8 +368,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"Du har nået din månedlige forbrugsgrænse på ${{amount}}. Administrer dine grænser her: {{membersUrl}}",
"zen.api.error.modelDisabled": "Modellen er deaktiveret",
"zen.api.error.trialEnded":
"Den gratis kampagne for {{model}} er afsluttet. Du kan fortsætte med at bruge modellen ved at abonnere på OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | Få adgang til verdens bedste kodningsmodeller",
"black.meta.description": "Få adgang til Claude, GPT, Gemini og mere med OpenCode Black-abonnementer.",

View File

@@ -371,8 +371,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"Du hast dein monatliches Ausgabenlimit von ${{amount}} erreicht. Verwalte deine Limits hier: {{membersUrl}}",
"zen.api.error.modelDisabled": "Modell ist deaktiviert",
"zen.api.error.trialEnded":
"Die kostenlose Aktion für {{model}} ist beendet. Du kannst das Modell weiterhin nutzen, indem du OpenCode Go abonnierst - {{link}}",
"black.meta.title": "OpenCode Black | Zugriff auf die weltweit besten Coding-Modelle",
"black.meta.description": "Erhalte Zugriff auf Claude, GPT, Gemini und mehr mit OpenCode Black Abos.",

View File

@@ -364,8 +364,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"You have reached your monthly spending limit of ${{amount}}. Manage your limits here: {{membersUrl}}",
"zen.api.error.modelDisabled": "Model is disabled",
"zen.api.error.trialEnded":
"Free promotion has ended for {{model}}. You can continue using the model by subscribing to OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | Access all the world's best coding models",
"black.meta.description": "Get access to Claude, GPT, Gemini and more with OpenCode Black subscription plans.",

View File

@@ -371,8 +371,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"Has alcanzado tu límite de gasto mensual de ${{amount}}. Gestiona tus límites aquí: {{membersUrl}}",
"zen.api.error.modelDisabled": "El modelo está deshabilitado",
"zen.api.error.trialEnded":
"La promoción gratuita de {{model}} ha finalizado. Puedes seguir usando el modelo suscribiéndote a OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | Accede a los mejores modelos de codificación del mundo",
"black.meta.description": "Obtén acceso a Claude, GPT, Gemini y más con los planes de suscripción de OpenCode Black.",

View File

@@ -372,8 +372,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"Vous avez atteint votre limite de dépense mensuelle de {{amount}} $. Gérez vos limites ici : {{membersUrl}}",
"zen.api.error.modelDisabled": "Le modèle est désactivé",
"zen.api.error.trialEnded":
"La promotion gratuite de {{model}} est terminée. Vous pouvez continuer à utiliser le modèle en vous abonnant à OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | Accédez aux meilleurs modèles de code au monde",
"black.meta.description": "Accédez à Claude, GPT, Gemini et plus avec les forfaits d'abonnement OpenCode Black.",

View File

@@ -367,8 +367,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"Hai raggiunto il tuo limite di spesa mensile di ${{amount}}. Gestisci i tuoi limiti qui: {{membersUrl}}",
"zen.api.error.modelDisabled": "Il modello è disabilitato",
"zen.api.error.trialEnded":
"La promozione gratuita di {{model}} è terminata. Puoi continuare a usare il modello abbonandoti a OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | Accedi ai migliori modelli di coding al mondo",
"black.meta.description":

View File

@@ -369,8 +369,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"月額の利用上限 ${{amount}} に達しました。こちらから上限を管理してください: {{membersUrl}}",
"zen.api.error.modelDisabled": "モデルが無効です",
"zen.api.error.trialEnded":
"{{model}} の無料プロモーションは終了しました。OpenCode Go を購読するとモデルを引き続き使用できます - {{link}}",
"black.meta.title": "OpenCode Black | 世界最高峰のコーディングモデルすべてにアクセス",
"black.meta.description": "OpenCode Black サブスクリプションプランで、Claude、GPT、Gemini などにアクセス。",

View File

@@ -363,8 +363,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"월간 지출 한도인 ${{amount}}에 도달했습니다. 한도 관리를 여기서 하세요: {{membersUrl}}",
"zen.api.error.modelDisabled": "모델이 비활성화되었습니다",
"zen.api.error.trialEnded":
"{{model}}의 무료 프로모션이 종료되었습니다. OpenCode Go를 구독하면 모델을 계속 사용할 수 있습니다 - {{link}}",
"black.meta.title": "OpenCode Black | 세계 최고의 코딩 모델에 액세스하세요",
"black.meta.description": "OpenCode Black 구독 플랜으로 Claude, GPT, Gemini 등에 액세스하세요.",

View File

@@ -368,8 +368,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"Du har nådd din månedlige utgiftsgrense på ${{amount}}. Administrer grensene dine her: {{membersUrl}}",
"zen.api.error.modelDisabled": "Modellen er deaktivert",
"zen.api.error.trialEnded":
"Den gratis kampanjen for {{model}} er avsluttet. Du kan fortsette å bruke modellen ved å abonnere på OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | Få tilgang til verdens beste kodemodeller",
"black.meta.description": "Få tilgang til Claude, GPT, Gemini og mer med OpenCode Black-abonnementer.",

View File

@@ -369,8 +369,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"Osiągnąłeś swój miesięczny limit wydatków w wysokości ${{amount}}. Zarządzaj swoimi limitami tutaj: {{membersUrl}}",
"zen.api.error.modelDisabled": "Model jest wyłączony",
"zen.api.error.trialEnded":
"Bezpłatna promocja {{model}} dobiegła końca. Możesz dalej korzystać z modelu, subskrybując OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | Dostęp do najlepszych na świecie modeli kodujących",
"black.meta.description": "Uzyskaj dostęp do Claude, GPT, Gemini i innych dzięki planom subskrypcji OpenCode Black.",

View File

@@ -373,8 +373,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"Вы достигли ежемесячного лимита расходов в ${{amount}}. Управляйте лимитами здесь: {{membersUrl}}",
"zen.api.error.modelDisabled": "Модель отключена",
"zen.api.error.trialEnded":
"Бесплатная акция для {{model}} завершена. Вы можете продолжить использование модели, подписавшись на OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | Доступ к лучшим моделям для кодинга в мире",
"black.meta.description": "Получите доступ к Claude, GPT, Gemini и другим моделям с подпиской OpenCode Black.",

View File

@@ -365,8 +365,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"คุณถึงขีดจำกัดการใช้จ่ายรายเดือนที่ ${{amount}} แล้ว จัดการขีดจำกัดของคุณที่นี่: {{membersUrl}}",
"zen.api.error.modelDisabled": "โมเดลถูกปิดใช้งาน",
"zen.api.error.trialEnded":
"โปรโมชันฟรีสำหรับ {{model}} สิ้นสุดแล้ว คุณสามารถใช้โมเดลต่อได้โดยสมัครสมาชิก OpenCode Go - {{link}}",
"black.meta.title": "OpenCode Black | เข้าถึงโมเดลเขียนโค้ดที่ดีที่สุดในโลก",
"black.meta.description": "เข้าถึง Claude, GPT, Gemini และอื่นๆ ด้วยแผนสมาชิก OpenCode Black",

View File

@@ -372,8 +372,6 @@ export const dict = {
"zen.api.error.userMonthlyLimitReached":
"Aylık ${{amount}} harcama limitinize ulaştınız. Limitlerinizi buradan yönetin: {{membersUrl}}",
"zen.api.error.modelDisabled": "Model devre dışı",
"zen.api.error.trialEnded":
"{{model}} için ücretsiz promosyon sona erdi. OpenCode Go'ya abone olarak modeli kullanmaya devam edebilirsiniz - {{link}}",
"black.meta.title": "OpenCode Black | Dünyanın en iyi kodlama modellerine erişin",
"black.meta.description": "OpenCode Black abonelik planlarıyla Claude, GPT, Gemini ve daha fazlasına erişin.",

View File

@@ -349,7 +349,6 @@ export const dict = {
"您的工作区已达到每月支出限额 ${{amount}}。请在此处管理您的限额:{{billingUrl}}",
"zen.api.error.userMonthlyLimitReached": "您已达到每月支出限额 ${{amount}}。请在此处管理您的限额:{{membersUrl}}",
"zen.api.error.modelDisabled": "模型已禁用",
"zen.api.error.trialEnded": "{{model}} 的限免活动已结束。您可以订阅 OpenCode Go 继续使用该模型 - {{link}}",
"black.meta.title": "OpenCode Black | 访问全球顶尖编程模型",
"black.meta.description": "通过 OpenCode Black 订阅计划使用 Claude, GPT, Gemini 等模型。",

View File

@@ -349,7 +349,6 @@ export const dict = {
"你的工作區已達到每月支出限額 ${{amount}}。請在此處管理你的限額:{{billingUrl}}",
"zen.api.error.userMonthlyLimitReached": "你已達到每月支出限額 ${{amount}}。請在此處管理你的限額:{{membersUrl}}",
"zen.api.error.modelDisabled": "模型已停用",
"zen.api.error.trialEnded": "{{model}} 的限免活动已結束。您可以訂閱 OpenCode Go 繼續使用該模型 - {{link}}",
"black.meta.title": "OpenCode Black | 存取全球最佳編碼模型",
"black.meta.description": "透過 OpenCode Black 訂閱方案存取 Claude、GPT、Gemini 等模型。",

View File

@@ -404,14 +404,6 @@ export async function handler(
}),
)
if (modelData.trialEnded)
throw new ModelError(
`${t("zen.api.error.trialEnded", {
model: modelData.name,
link: "https://opencode.ai/go",
})}`,
)
logger.metric({ model: modelId })
return { id: modelId, ...modelData }

View File

@@ -27,7 +27,6 @@ export namespace ZenData {
byokProvider: z.enum(["openai", "anthropic", "google"]).optional(),
stickyProvider: z.enum(["strict", "prefer"]).optional(),
trialProviders: z.array(z.string()).optional(),
trialEnded: z.boolean().optional(),
fallbackProvider: z.string().optional(),
rateLimit: z.number().optional(),
providers: z.array(

View File

@@ -147,7 +147,6 @@
"tree-sitter-powershell": "0.25.10",
"turndown": "7.2.0",
"ulid": "catalog:",
"venice-ai-sdk-provider": "2.0.1",
"vscode-jsonrpc": "8.2.1",
"web-tree-sitter": "0.25.10",
"which": "6.0.1",

View File

@@ -1,6 +1,5 @@
#!/usr/bin/env bun
import { Script } from "@opencode-ai/script"
import fs from "fs"
import path from "path"
import { fileURLToPath } from "url"
@@ -49,7 +48,6 @@ await Bun.build({
external: ["jsonc-parser"],
define: {
OPENCODE_MIGRATIONS: JSON.stringify(migrations),
OPENCODE_CHANNEL: `'${Script.channel}'`,
},
})

View File

@@ -120,10 +120,6 @@ class TokenRefreshRequest extends Schema.Class<TokenRefreshRequest>("TokenRefres
const clientId = "opencode-cli"
const eagerRefreshThreshold = Duration.minutes(5)
const eagerRefreshThresholdMs = Duration.toMillis(eagerRefreshThreshold)
const isTokenFresh = (tokenExpiry: number | null, now: number) =>
tokenExpiry != null && tokenExpiry > now + eagerRefreshThresholdMs
const mapAccountServiceError =
(message = "Account service operation failed") =>
@@ -223,7 +219,7 @@ export namespace Account {
const account = maybeAccount.value
const now = yield* Clock.currentTimeMillis
if (isTokenFresh(account.token_expiry, now)) {
if (account.token_expiry && account.token_expiry > now + Duration.toMillis(eagerRefreshThreshold)) {
return account.access_token
}
@@ -233,7 +229,7 @@ export namespace Account {
const resolveToken = Effect.fnUntraced(function* (row: AccountRow) {
const now = yield* Clock.currentTimeMillis
if (isTokenFresh(row.token_expiry, now)) {
if (row.token_expiry && row.token_expiry > now + Duration.toMillis(eagerRefreshThreshold)) {
return row.access_token
}

View File

@@ -12,4 +12,3 @@ Focus on information that would be helpful for continuing the conversation, incl
Your summary should be comprehensive enough to provide context but concise enough to be quickly understood.
Do not respond to any questions in the conversation, only output the summary.
Respond in the same language the user used in the conversation.

View File

@@ -1,7 +1,6 @@
import z from "zod"
import { EOL } from "os"
import { NamedError } from "@opencode-ai/util/error"
import { logo as glyphs } from "./logo"
export namespace UI {
const wordmark = [
@@ -48,60 +47,12 @@ export namespace UI {
}
export function logo(pad?: string) {
if (!process.stdout.isTTY && !process.stderr.isTTY) {
const result = []
for (const row of wordmark) {
if (pad) result.push(pad)
result.push(row)
result.push(EOL)
}
return result.join("").trimEnd()
}
const result: string[] = []
const reset = "\x1b[0m"
const left = {
fg: "\x1b[90m",
shadow: "\x1b[38;5;235m",
bg: "\x1b[48;5;235m",
}
const right = {
fg: reset,
shadow: "\x1b[38;5;238m",
bg: "\x1b[48;5;238m",
}
const gap = " "
const draw = (line: string, fg: string, shadow: string, bg: string) => {
const parts: string[] = []
for (const char of line) {
if (char === "_") {
parts.push(bg, " ", reset)
continue
}
if (char === "^") {
parts.push(fg, bg, "▀", reset)
continue
}
if (char === "~") {
parts.push(shadow, "▀", reset)
continue
}
if (char === " ") {
parts.push(" ")
continue
}
parts.push(fg, char, reset)
}
return parts.join("")
}
glyphs.left.forEach((row, index) => {
const result = []
for (const row of wordmark) {
if (pad) result.push(pad)
result.push(draw(row, left.fg, left.shadow, left.bg))
result.push(gap)
const other = glyphs.right[index] ?? ""
result.push(draw(other, right.fg, right.shadow, right.bg))
result.push(row)
result.push(EOL)
})
}
return result.join("").trimEnd()
}

View File

@@ -48,19 +48,7 @@ process.on("uncaughtException", (e) => {
})
})
const args = hideBin(process.argv)
function show(out: string) {
const text = out.trimStart()
if (!text.startsWith("opencode ")) {
process.stderr.write(UI.logo() + EOL + EOL)
process.stderr.write(text)
return
}
process.stderr.write(out)
}
const cli = yargs(args)
const cli = yargs(hideBin(process.argv))
.parserConfiguration({ "populate--": true })
.scriptName("opencode")
.wrap(100)
@@ -142,7 +130,7 @@ const cli = yargs(args)
process.stderr.write("Database migration complete." + EOL)
}
})
.usage("")
.usage("\n" + UI.logo())
.completion("completion", "generate shell completion script")
.command(AcpCommand)
.command(McpCommand)
@@ -174,7 +162,7 @@ const cli = yargs(args)
msg?.startsWith("Invalid values:")
) {
if (err) throw err
cli.showHelp(show)
cli.showHelp("log")
}
if (err) throw err
process.exit(1)
@@ -182,15 +170,7 @@ const cli = yargs(args)
.strict()
try {
if (args.includes("-h") || args.includes("--help")) {
await cli.parse(args, (err: Error | undefined, _argv: unknown, out: string) => {
if (err) throw err
if (!out) return
show(out)
})
} else {
await cli.parse()
}
await cli.parse()
} catch (e) {
let data: Record<string, any> = {}
if (e instanceof NamedError) {

View File

@@ -44,7 +44,6 @@ import { createGateway } from "@ai-sdk/gateway"
import { createTogetherAI } from "@ai-sdk/togetherai"
import { createPerplexity } from "@ai-sdk/perplexity"
import { createVercel } from "@ai-sdk/vercel"
import { createVenice } from "venice-ai-sdk-provider"
import {
createGitLab,
VERSION as GITLAB_PROVIDER_VERSION,
@@ -140,7 +139,6 @@ export namespace Provider {
"@ai-sdk/vercel": createVercel,
"gitlab-ai-provider": createGitLab,
"@ai-sdk/github-copilot": createGitHubCopilotOpenAICompatible,
"venice-ai-sdk-provider": createVenice,
}
type CustomModelLoader = (sdk: any, modelID: string, options?: Record<string, any>) => Promise<any>

View File

@@ -190,7 +190,6 @@ export namespace SessionCompaction {
Focus on information that would be helpful for continuing the conversation, including what we did, what we're doing, which files we're working on, and what we're going to do next.
The summary that you construct will be used so that another agent can read it and continue the work.
Do not call any tools. Respond only with the summary text.
Respond in the same language as the user's messages in the conversation.
When constructing the summary, try to stick to this template:
---

View File

@@ -1362,18 +1362,9 @@ NOTE: At any point in time through this workflow you should feel free to ask the
}
if (!lastUser) throw new Error("No user message found in stream. This should never happen.")
const lastAssistantMsg = msgs.findLast(
(msg) => msg.info.role === "assistant" && msg.info.id === lastAssistant?.id,
)
// Some providers return "stop" even when the assistant message contains tool calls.
// Keep the loop running so tool results can be sent back to the model.
const hasToolCalls = lastAssistantMsg?.parts.some((part) => part.type === "tool") ?? false
if (
lastAssistant?.finish &&
!["tool-calls"].includes(lastAssistant.finish) &&
!hasToolCalls &&
lastUser.id < lastAssistant.id
) {
log.info("exiting loop", { sessionID })

View File

@@ -54,7 +54,7 @@ export namespace SessionRetry {
if (MessageV2.APIError.isInstance(error)) {
if (!error.data.isRetryable) return undefined
if (error.data.responseBody?.includes("FreeUsageLimitError"))
return `Free usage exceeded, subscribe to Go https://opencode.ai/go`
return `Free usage exceeded, add credits https://opencode.ai/zen`
return error.data.message.includes("Overloaded") ? "Provider is overloaded" : error.data.message
}

View File

@@ -1,6 +1,9 @@
import z from "zod"
import type { ZodObject } from "zod"
import { EventEmitter } from "events"
import { Effect, Layer, ServiceMap } from "effect"
import { InstanceState } from "@/effect/instance-state"
import { makeRuntime } from "@/effect/run-service"
import { Database, eq } from "@/storage/db"
import { Bus as ProjectBus } from "@/bus"
import { BusEvent } from "@/bus/bus-event"
@@ -31,37 +34,18 @@ export namespace SyncEvent {
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
export const registry = new Map<string, Definition>()
let projectors: Map<Definition, ProjectorFunc> | undefined
const versions = new Map<string, number>()
let frozen = false
let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
const Bus = new EventEmitter<{ event: [{ def: Definition; event: Event }] }>()
export function reset() {
frozen = false
projectors = undefined
convertEvent = (_, data) => data
type State = {
projectors: Map<Definition, ProjectorFunc> | undefined
convert: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
bus: EventEmitter<{ event: [{ def: Definition; event: Event }] }>
}
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: typeof convertEvent }) {
projectors = new Map(input.projectors)
export const registry = new Map<string, Definition>()
const versions = new Map<string, number>()
let frozen = false
// Install all the latest event defs to the bus. We only ever emit
// latest versions from code, and keep around old versions for
// replaying. Replaying does not go through the bus, and it
// simplifies the bus to only use unversioned latest events
for (let [type, version] of versions.entries()) {
let def = registry.get(versionedType(type, version))!
BusEvent.define(def.type, def.properties || def.schema)
}
// Freeze the system so it clearly errors if events are defined
// after `init` which would cause bugs
frozen = true
convertEvent = input.convertEvent || ((_, data) => data)
function noop(_: string, data: Event["data"]) {
return data
}
export function versionedType<A extends string>(type: A): A
@@ -102,140 +86,224 @@ export namespace SyncEvent {
return [def, func as ProjectorFunc]
}
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
if (projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
const projector = projectors.get(def)
if (!projector) {
throw new Error(`Projector not found for event: ${def.type}`)
}
// idempotent: need to ignore any events already logged
Database.transaction((tx) => {
projector(tx, event.data)
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
tx.insert(EventSequenceTable)
.values({
aggregate_id: event.aggregateID,
seq: event.seq,
})
.onConflictDoUpdate({
target: EventSequenceTable.aggregate_id,
set: { seq: event.seq },
})
.run()
tx.insert(EventTable)
.values({
id: event.id,
seq: event.seq,
aggregate_id: event.aggregateID,
type: versionedType(def.type, def.version),
data: event.data as Record<string, unknown>,
})
.run()
}
Database.effect(() => {
Bus.emit("event", {
def,
event,
})
if (options?.publish) {
const result = convertEvent(def.type, event.data)
if (result instanceof Promise) {
result.then((data) => {
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
})
} else {
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
}
}
})
})
export interface Interface {
readonly reset: () => Effect.Effect<void>
readonly init: (input: {
projectors: Array<[Definition, ProjectorFunc]>
convertEvent?: State["convert"]
}) => Effect.Effect<void>
readonly replay: (event: SerializedEvent, options?: { republish: boolean }) => Effect.Effect<void>
readonly run: <Def extends Definition>(def: Def, data: Event<Def>["data"]) => Effect.Effect<void>
readonly remove: (aggregateID: string) => Effect.Effect<void>
readonly subscribeAll: (handler: (event: { def: Definition; event: Event }) => void) => Effect.Effect<() => void>
}
// TODO:
//
// * Support applying multiple events at one time. One transaction,
// and it validets all the sequence ids
// * when loading events from db, apply zod validation to ensure shape
export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/SyncEvent") {}
export const layer = Layer.effect(
Service,
Effect.gen(function* () {
const state: State = {
projectors: undefined,
convert: noop,
bus: new EventEmitter<{ event: [{ def: Definition; event: Event }] }>(),
}
function process<Def extends Definition>(def: Def, event: Event<Def>, options: { publish: boolean }) {
if (state.projectors == null) {
throw new Error("No projectors available. Call `SyncEvent.init` to install projectors")
}
const projector = state.projectors.get(def)
if (!projector) {
throw new Error(`Projector not found for event: ${def.type}`)
}
Database.transaction((tx) => {
projector(tx, event.data)
if (Flag.OPENCODE_EXPERIMENTAL_WORKSPACES) {
tx.insert(EventSequenceTable)
.values({
aggregate_id: event.aggregateID,
seq: event.seq,
})
.onConflictDoUpdate({
target: EventSequenceTable.aggregate_id,
set: { seq: event.seq },
})
.run()
tx.insert(EventTable)
.values({
id: event.id,
seq: event.seq,
aggregate_id: event.aggregateID,
type: versionedType(def.type, def.version),
data: event.data as Record<string, unknown>,
})
.run()
}
Database.effect(() => {
state.bus.emit("event", { def, event })
if (!options.publish) return
const result = state.convert(def.type, event.data)
if (result instanceof Promise) {
result.then((data) => {
ProjectBus.publish({ type: def.type, properties: def.schema }, data)
})
return
}
ProjectBus.publish({ type: def.type, properties: def.schema }, result)
})
})
}
const reset = Effect.fn("SyncEvent.reset")(() =>
Effect.sync(() => {
frozen = false
state.projectors = undefined
state.convert = noop
}),
)
const init = Effect.fn("SyncEvent.init")(
(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: State["convert"] }) =>
Effect.sync(() => {
state.projectors = new Map(input.projectors)
for (const [type, version] of versions.entries()) {
const def = registry.get(versionedType(type, version))!
BusEvent.define(def.type, def.properties)
}
frozen = true
state.convert = input.convertEvent || noop
}),
)
// TODO:
//
// * Support applying multiple events at one time. One transaction,
// and it validets all the sequence ids
// * when loading events from db, apply zod validation to ensure shape
const replay = Effect.fn("SyncEvent.replay")(function* (
event: SerializedEvent,
options?: { republish: boolean },
) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) {
return
}
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(
`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`,
)
}
yield* InstanceState.withALS(() => process(def, event, { publish: !!options?.republish }))
})
const run: Interface["run"] = Effect.fn("SyncEvent.run")(function* <Def extends Definition>(
def: Def,
data: Event<Def>["data"],
) {
const agg = (data as Record<string, string>)[def.aggregate]
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
yield* InstanceState.withALS(() =>
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish: true })
},
{
behavior: "immediate",
},
),
)
})
const remove = Effect.fn("SyncEvent.remove")((aggregateID: string) =>
Effect.sync(() => {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
}),
)
const subscribeAll = Effect.fn("SyncEvent.subscribeAll")(
(handler: (event: { def: Definition; event: Event }) => void) =>
Effect.sync(() => {
state.bus.on("event", handler)
return () => state.bus.off("event", handler)
}),
)
return Service.of({ reset, init, replay, run, remove, subscribeAll })
}),
)
export const defaultLayer = layer
const { runSync } = makeRuntime(Service, defaultLayer)
export function reset() {
return runSync((svc) => svc.reset())
}
export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; convertEvent?: State["convert"] }) {
return runSync((svc) => svc.init(input))
}
export function replay(event: SerializedEvent, options?: { republish: boolean }) {
const def = registry.get(event.type)
if (!def) {
throw new Error(`Unknown event type: ${event.type}`)
}
const row = Database.use((db) =>
db
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, event.aggregateID))
.get(),
)
const latest = row?.seq ?? -1
if (event.seq <= latest) {
return
}
const expected = latest + 1
if (event.seq !== expected) {
throw new Error(`Sequence mismatch for aggregate "${event.aggregateID}": expected ${expected}, got ${event.seq}`)
}
process(def, event, { publish: !!options?.republish })
return runSync((svc) => svc.replay(event, options))
}
export function run<Def extends Definition>(def: Def, data: Event<Def>["data"]) {
const agg = (data as Record<string, string>)[def.aggregate]
// This should never happen: we've enforced it via typescript in
// the definition
if (agg == null) {
throw new Error(`SyncEvent.run: "${def.aggregate}" required but not found: ${JSON.stringify(data)}`)
}
if (def.version !== versions.get(def.type)) {
throw new Error(`SyncEvent.run: running old versions of events is not allowed: ${def.type}`)
}
// Note that this is an "immediate" transaction which is critical.
// We need to make sure we can safely read and write with nothing
// else changing the data from under us
Database.transaction(
(tx) => {
const id = EventID.ascending()
const row = tx
.select({ seq: EventSequenceTable.seq })
.from(EventSequenceTable)
.where(eq(EventSequenceTable.aggregate_id, agg))
.get()
const seq = row?.seq != null ? row.seq + 1 : 0
const event = { id, seq, aggregateID: agg, data }
process(def, event, { publish: true })
},
{
behavior: "immediate",
},
)
return runSync((svc) => svc.run(def, data))
}
export function remove(aggregateID: string) {
Database.transaction((tx) => {
tx.delete(EventSequenceTable).where(eq(EventSequenceTable.aggregate_id, aggregateID)).run()
tx.delete(EventTable).where(eq(EventTable.aggregate_id, aggregateID)).run()
})
return runSync((svc) => svc.remove(aggregateID))
}
export function subscribeAll(handler: (event: { def: Definition; event: Event }) => void) {
Bus.on("event", handler)
return () => Bus.off("event", handler)
return runSync((svc) => svc.subscribeAll(handler))
}
export function payloads() {

View File

@@ -18,9 +18,6 @@ const truncate = Layer.effectDiscard(
const it = testEffect(Layer.merge(AccountRepo.layer, truncate))
const insideEagerRefreshWindow = Duration.toMillis(Duration.minutes(1))
const outsideEagerRefreshWindow = Duration.toMillis(Duration.minutes(10))
const live = (client: HttpClient.HttpClient) =>
Account.layer.pipe(Layer.provide(Layer.succeed(HttpClient.HttpClient, client)))
@@ -66,7 +63,7 @@ it.live("orgsByAccount groups orgs per account", () =>
url: "https://one.example.com",
accessToken: AccessToken.make("at_1"),
refreshToken: RefreshToken.make("rt_1"),
expiry: Date.now() + outsideEagerRefreshWindow,
expiry: Date.now() + 10 * 60_000,
orgID: Option.none(),
}),
)
@@ -78,7 +75,7 @@ it.live("orgsByAccount groups orgs per account", () =>
url: "https://two.example.com",
accessToken: AccessToken.make("at_2"),
refreshToken: RefreshToken.make("rt_2"),
expiry: Date.now() + outsideEagerRefreshWindow,
expiry: Date.now() + 10 * 60_000,
orgID: Option.none(),
}),
)
@@ -162,7 +159,7 @@ it.live("token refreshes before expiry when inside the eager refresh window", ()
url: "https://one.example.com",
accessToken: AccessToken.make("at_old"),
refreshToken: RefreshToken.make("rt_old"),
expiry: Date.now() + insideEagerRefreshWindow,
expiry: Date.now() + 60_000,
orgID: Option.none(),
}),
)
@@ -270,7 +267,7 @@ it.live("config sends the selected org header", () =>
url: "https://one.example.com",
accessToken: AccessToken.make("at_1"),
refreshToken: RefreshToken.make("rt_1"),
expiry: Date.now() + outsideEagerRefreshWindow,
expiry: Date.now() + 10 * 60_000,
orgID: Option.none(),
}),
)

View File

@@ -3,6 +3,7 @@ import { expect, spyOn } from "bun:test"
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
import path from "path"
import z from "zod"
import type { Agent } from "../../src/agent/agent"
import { Agent as AgentSvc } from "../../src/agent/agent"
import { Bus } from "../../src/bus"
import { Command } from "../../src/command"
@@ -34,7 +35,7 @@ import { Log } from "../../src/util/log"
import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
import { provideTmpdirInstance, provideTmpdirServer } from "../fixture/fixture"
import { testEffect } from "../lib/effect"
import { reply, TestLLMServer } from "../lib/llm-server"
import { TestLLMServer } from "../lib/llm-server"
Log.init({ print: false })
@@ -452,36 +453,6 @@ it.live("loop continues when finish is tool-calls", () =>
),
)
it.live("loop continues when finish is stop but assistant has tool parts", () =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {
const prompt = yield* SessionPrompt.Service
const sessions = yield* Session.Service
const session = yield* sessions.create({
title: "Pinned",
permission: [{ permission: "*", pattern: "*", action: "allow" }],
})
yield* prompt.prompt({
sessionID: session.id,
agent: "build",
noReply: true,
parts: [{ type: "text", text: "hello" }],
})
yield* llm.push(reply().tool("first", { value: "first" }).stop())
yield* llm.text("second")
const result = yield* prompt.loop({ sessionID: session.id })
expect(yield* llm.calls).toBe(2)
expect(result.info.role).toBe("assistant")
if (result.info.role === "assistant") {
expect(result.parts.some((part) => part.type === "text" && part.text === "second")).toBe(true)
expect(result.info.finish).toBe("stop")
}
}),
{ git: true, config: providerCfg },
),
)
it.live("failed subtask preserves metadata on error tool state", () =>
provideTmpdirServer(
Effect.fnUntraced(function* ({ llm }) {