mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-19 02:53:08 +00:00
feat(core): exponential backoff of workspace reconnect (#23083)
This commit is contained in:
@@ -229,6 +229,10 @@ export function DialogWorkspaceCreate(props: { onSelect: (workspaceID: string) =
|
||||
})
|
||||
|
||||
const result = await sdk.client.experimental.workspace.create({ type, branch: null }).catch((err) => {
|
||||
toast.show({
|
||||
message: "Creating workspace failed",
|
||||
variant: "error",
|
||||
})
|
||||
log.error("workspace create request failed", {
|
||||
type,
|
||||
error: errorData(err),
|
||||
|
||||
@@ -34,7 +34,6 @@ export type Info = z.infer<typeof Info>
|
||||
export const ConnectionStatus = z.object({
|
||||
workspaceID: WorkspaceID.zod,
|
||||
status: z.enum(["connected", "connecting", "disconnected", "error"]),
|
||||
error: z.string().optional(),
|
||||
})
|
||||
export type ConnectionStatus = z.infer<typeof ConnectionStatus>
|
||||
|
||||
@@ -345,10 +344,10 @@ const connections = new Map<WorkspaceID, ConnectionStatus>()
|
||||
const aborts = new Map<WorkspaceID, AbortController>()
|
||||
const TIMEOUT = 5000
|
||||
|
||||
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"], error?: string) {
|
||||
function setStatus(id: WorkspaceID, status: ConnectionStatus["status"]) {
|
||||
const prev = connections.get(id)
|
||||
if (prev?.status === status && prev?.error === error) return
|
||||
const next = { workspaceID: id, status, error }
|
||||
if (prev?.status === status) return
|
||||
const next = { workspaceID: id, status }
|
||||
connections.set(id, next)
|
||||
|
||||
if (status === "error") {
|
||||
@@ -425,68 +424,78 @@ function route(url: string | URL, path: string) {
|
||||
return next
|
||||
}
|
||||
|
||||
async function syncWorkspace(space: Info, signal: AbortSignal) {
|
||||
async function connectSSE(url: URL | string, headers: HeadersInit | undefined, signal: AbortSignal) {
|
||||
const res = await fetch(route(url, "/global/event"), {
|
||||
method: "GET",
|
||||
headers,
|
||||
signal,
|
||||
})
|
||||
|
||||
if (!res.ok) throw new Error(`Workspace sync HTTP failure: ${res.status}`)
|
||||
if (!res.body) throw new Error("No response body from global sync")
|
||||
|
||||
return res.body
|
||||
}
|
||||
|
||||
async function syncWorkspaceLoop(space: Info, signal: AbortSignal) {
|
||||
const adaptor = await getAdaptor(space.projectID, space.type)
|
||||
const target = await adaptor.target(space)
|
||||
|
||||
if (target.type === "local") return null
|
||||
|
||||
let attempt = 0
|
||||
|
||||
while (!signal.aborted) {
|
||||
log.info("connecting to global sync", { workspace: space.name })
|
||||
setStatus(space.id, "connecting")
|
||||
|
||||
const adaptor = await getAdaptor(space.projectID, space.type)
|
||||
const target = await adaptor.target(space)
|
||||
|
||||
if (target.type === "local") return
|
||||
|
||||
const res = await fetch(route(target.url, "/global/event"), {
|
||||
method: "GET",
|
||||
headers: target.headers,
|
||||
signal,
|
||||
}).catch((err: unknown) => {
|
||||
setStatus(space.id, "error", err instanceof Error ? err.message : String(err))
|
||||
|
||||
let stream
|
||||
try {
|
||||
stream = await connectSSE(target.url, target.headers, signal)
|
||||
} catch (err) {
|
||||
setStatus(space.id, "error")
|
||||
log.info("failed to connect to global sync", {
|
||||
workspace: space.name,
|
||||
error: err,
|
||||
err,
|
||||
})
|
||||
return undefined
|
||||
})
|
||||
|
||||
if (!res || !res.ok || !res.body) {
|
||||
const error = !res ? "No response from global sync" : `Global sync HTTP ${res.status}`
|
||||
log.info("failed to connect to global sync", { workspace: space.name, error })
|
||||
setStatus(space.id, "error", error)
|
||||
await sleep(1000)
|
||||
continue
|
||||
}
|
||||
|
||||
log.info("global sync connected", { workspace: space.name })
|
||||
setStatus(space.id, "connected")
|
||||
if (stream) {
|
||||
attempt = 0
|
||||
|
||||
await parseSSE(res.body, signal, (evt: any) => {
|
||||
try {
|
||||
if (!("payload" in evt)) return
|
||||
log.info("global sync connected", { workspace: space.name })
|
||||
setStatus(space.id, "connected")
|
||||
|
||||
if (evt.payload.type === "sync") {
|
||||
SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent)
|
||||
await parseSSE(stream, signal, (evt: any) => {
|
||||
try {
|
||||
if (!("payload" in evt)) return
|
||||
|
||||
if (evt.payload.type === "sync") {
|
||||
SyncEvent.replay(evt.payload.syncEvent as SyncEvent.SerializedEvent)
|
||||
}
|
||||
|
||||
GlobalBus.emit("event", {
|
||||
directory: evt.directory,
|
||||
project: evt.project,
|
||||
workspace: space.id,
|
||||
payload: evt.payload,
|
||||
})
|
||||
} catch (err) {
|
||||
log.info("failed to replay global event", {
|
||||
workspaceID: space.id,
|
||||
error: err,
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
GlobalBus.emit("event", {
|
||||
directory: evt.directory,
|
||||
project: evt.project,
|
||||
workspace: space.id,
|
||||
payload: evt.payload,
|
||||
})
|
||||
} catch (err) {
|
||||
log.info("failed to replay global event", {
|
||||
workspaceID: space.id,
|
||||
error: err,
|
||||
})
|
||||
}
|
||||
})
|
||||
log.info("disconnected from global sync: " + space.id)
|
||||
setStatus(space.id, "disconnected")
|
||||
}
|
||||
|
||||
log.info("disconnected from global sync: " + space.id)
|
||||
setStatus(space.id, "disconnected")
|
||||
|
||||
// TODO: Implement exponential backoff
|
||||
await sleep(1000)
|
||||
// Back off reconnect attempts up to 2 minutes while the workspace
|
||||
// stays unavailable.
|
||||
await sleep(Math.min(120_000, 1_000 * 2 ** attempt))
|
||||
attempt += 1
|
||||
}
|
||||
}
|
||||
|
||||
@@ -498,7 +507,7 @@ async function startSync(space: Info) {
|
||||
|
||||
if (target.type === "local") {
|
||||
void Filesystem.exists(target.directory).then((exists) => {
|
||||
setStatus(space.id, exists ? "connected" : "error", exists ? undefined : "directory does not exist")
|
||||
setStatus(space.id, exists ? "connected" : "error")
|
||||
})
|
||||
return
|
||||
}
|
||||
@@ -510,10 +519,10 @@ async function startSync(space: Info) {
|
||||
const abort = new AbortController()
|
||||
aborts.set(space.id, abort)
|
||||
|
||||
void syncWorkspace(space, abort.signal).catch((error) => {
|
||||
void syncWorkspaceLoop(space, abort.signal).catch((error) => {
|
||||
aborts.delete(space.id)
|
||||
|
||||
setStatus(space.id, "error", String(error))
|
||||
setStatus(space.id, "error")
|
||||
log.warn("workspace listener failed", {
|
||||
workspaceID: space.id,
|
||||
error,
|
||||
|
||||
Reference in New Issue
Block a user