mirror of
https://github.com/anomalyco/opencode.git
synced 2026-04-24 06:45:22 +00:00
Apply PR #17640: refactor(file-time): effectify FileTimeService with Semaphore locks
This commit is contained in:
@@ -6,6 +6,7 @@ import { QuestionService } from "@/question/service"
|
||||
import { PermissionService } from "@/permission/service"
|
||||
import { FileWatcherService } from "@/file/watcher"
|
||||
import { VcsService } from "@/project/vcs"
|
||||
import { FileTimeService } from "@/file/time"
|
||||
import { Instance } from "@/project/instance"
|
||||
|
||||
export { InstanceContext } from "./instance-context"
|
||||
@@ -16,6 +17,7 @@ export type InstanceServices =
|
||||
| ProviderAuthService
|
||||
| FileWatcherService
|
||||
| VcsService
|
||||
| FileTimeService
|
||||
|
||||
function lookup(directory: string) {
|
||||
const project = Instance.project
|
||||
@@ -24,8 +26,9 @@ function lookup(directory: string) {
|
||||
Layer.fresh(QuestionService.layer),
|
||||
Layer.fresh(PermissionService.layer),
|
||||
Layer.fresh(ProviderAuthService.layer),
|
||||
Layer.fresh(FileWatcherService.layer),
|
||||
Layer.fresh(FileWatcherService.layer).pipe(Layer.orDie),
|
||||
Layer.fresh(VcsService.layer),
|
||||
Layer.fresh(FileTimeService.layer).pipe(Layer.orDie),
|
||||
).pipe(Layer.provide(ctx))
|
||||
}
|
||||
|
||||
|
||||
@@ -1,71 +1,88 @@
|
||||
import { Instance } from "../project/instance"
|
||||
import { Log } from "../util/log"
|
||||
import { Flag } from "../flag/flag"
|
||||
import { Flag } from "@/flag/flag"
|
||||
import { Filesystem } from "../util/filesystem"
|
||||
import { Effect, Layer, ServiceMap, Semaphore } from "effect"
|
||||
import { runPromiseInstance } from "@/effect/runtime"
|
||||
|
||||
export namespace FileTime {
|
||||
const log = Log.create({ service: "file.time" })
|
||||
// Per-session read times plus per-file write locks.
|
||||
// All tools that overwrite existing files should run their
|
||||
// assert/read/write/update sequence inside withLock(filepath, ...)
|
||||
// so concurrent writes to the same file are serialized.
|
||||
export const state = Instance.state(() => {
|
||||
const read: {
|
||||
[sessionID: string]: {
|
||||
[path: string]: Date | undefined
|
||||
const log = Log.create({ service: "file.time" })
|
||||
|
||||
export namespace FileTimeService {
|
||||
export interface Service {
|
||||
readonly read: (sessionID: string, file: string) => Effect.Effect<void>
|
||||
readonly get: (sessionID: string, file: string) => Effect.Effect<Date | undefined>
|
||||
readonly assert: (sessionID: string, filepath: string) => Effect.Effect<void>
|
||||
readonly withLock: <T>(filepath: string, fn: () => Promise<T>) => Effect.Effect<T>
|
||||
}
|
||||
}
|
||||
|
||||
export class FileTimeService extends ServiceMap.Service<FileTimeService, FileTimeService.Service>()(
|
||||
"@opencode/FileTime",
|
||||
) {
|
||||
static readonly layer = Layer.effect(
|
||||
FileTimeService,
|
||||
Effect.gen(function* () {
|
||||
const disableCheck = yield* Flag.OPENCODE_DISABLE_FILETIME_CHECK
|
||||
const reads: { [sessionID: string]: { [path: string]: Date | undefined } } = {}
|
||||
const locks = new Map<string, Semaphore.Semaphore>()
|
||||
|
||||
function getLock(filepath: string) {
|
||||
let lock = locks.get(filepath)
|
||||
if (!lock) {
|
||||
lock = Semaphore.makeUnsafe(1)
|
||||
locks.set(filepath, lock)
|
||||
}
|
||||
return lock
|
||||
}
|
||||
} = {}
|
||||
const locks = new Map<string, Promise<void>>()
|
||||
return {
|
||||
read,
|
||||
locks,
|
||||
}
|
||||
})
|
||||
|
||||
return FileTimeService.of({
|
||||
read: Effect.fn("FileTimeService.read")(function* (sessionID: string, file: string) {
|
||||
log.info("read", { sessionID, file })
|
||||
reads[sessionID] = reads[sessionID] || {}
|
||||
reads[sessionID][file] = new Date()
|
||||
}),
|
||||
|
||||
get: Effect.fn("FileTimeService.get")(function* (sessionID: string, file: string) {
|
||||
return reads[sessionID]?.[file]
|
||||
}),
|
||||
|
||||
assert: Effect.fn("FileTimeService.assert")(function* (sessionID: string, filepath: string) {
|
||||
if (disableCheck) return
|
||||
|
||||
const time = reads[sessionID]?.[filepath]
|
||||
if (!time) throw new Error(`You must read file ${filepath} before overwriting it. Use the Read tool first`)
|
||||
const mtime = Filesystem.stat(filepath)?.mtime
|
||||
if (mtime && mtime.getTime() > time.getTime() + 50) {
|
||||
throw new Error(
|
||||
`File ${filepath} has been modified since it was last read.\nLast modification: ${mtime.toISOString()}\nLast read: ${time.toISOString()}\n\nPlease read the file again before modifying it.`,
|
||||
)
|
||||
}
|
||||
}),
|
||||
|
||||
withLock: Effect.fn("FileTimeService.withLock")(function* <T>(filepath: string, fn: () => Promise<T>) {
|
||||
const lock = getLock(filepath)
|
||||
return yield* Effect.promise(fn).pipe(lock.withPermits(1))
|
||||
}),
|
||||
})
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
// Legacy facade — callers don't need to change
|
||||
export namespace FileTime {
|
||||
export function read(sessionID: string, file: string) {
|
||||
log.info("read", { sessionID, file })
|
||||
const { read } = state()
|
||||
read[sessionID] = read[sessionID] || {}
|
||||
read[sessionID][file] = new Date()
|
||||
// Fire-and-forget — callers never await this
|
||||
runPromiseInstance(FileTimeService.use((s) => s.read(sessionID, file)))
|
||||
}
|
||||
|
||||
export function get(sessionID: string, file: string) {
|
||||
return state().read[sessionID]?.[file]
|
||||
}
|
||||
|
||||
export async function withLock<T>(filepath: string, fn: () => Promise<T>): Promise<T> {
|
||||
const current = state()
|
||||
const currentLock = current.locks.get(filepath) ?? Promise.resolve()
|
||||
let release: () => void = () => {}
|
||||
const nextLock = new Promise<void>((resolve) => {
|
||||
release = resolve
|
||||
})
|
||||
const chained = currentLock.then(() => nextLock)
|
||||
current.locks.set(filepath, chained)
|
||||
await currentLock
|
||||
try {
|
||||
return await fn()
|
||||
} finally {
|
||||
release()
|
||||
if (current.locks.get(filepath) === chained) {
|
||||
current.locks.delete(filepath)
|
||||
}
|
||||
}
|
||||
return runPromiseInstance(FileTimeService.use((s) => s.get(sessionID, file)))
|
||||
}
|
||||
|
||||
export async function assert(sessionID: string, filepath: string) {
|
||||
if (Flag.OPENCODE_DISABLE_FILETIME_CHECK === true) {
|
||||
return
|
||||
}
|
||||
return runPromiseInstance(FileTimeService.use((s) => s.assert(sessionID, filepath)))
|
||||
}
|
||||
|
||||
const time = get(sessionID, filepath)
|
||||
if (!time) throw new Error(`You must read file ${filepath} before overwriting it. Use the Read tool first`)
|
||||
const mtime = Filesystem.stat(filepath)?.mtime
|
||||
// Allow a 50ms tolerance for Windows NTFS timestamp fuzziness / async flushing
|
||||
if (mtime && mtime.getTime() > time.getTime() + 50) {
|
||||
throw new Error(
|
||||
`File ${filepath} has been modified since it was last read.\nLast modification: ${mtime.toISOString()}\nLast read: ${time.toISOString()}\n\nPlease read the file again before modifying it.`,
|
||||
)
|
||||
}
|
||||
export async function withLock<T>(filepath: string, fn: () => Promise<T>): Promise<T> {
|
||||
return runPromiseInstance(FileTimeService.use((s) => s.withLock(filepath, fn)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,7 +72,8 @@ export class FileWatcherService extends ServiceMap.Service<FileWatcherService, F
|
||||
FileWatcherService,
|
||||
Effect.gen(function* () {
|
||||
const instance = yield* InstanceContext
|
||||
if (yield* Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER) return FileWatcherService.of({ init })
|
||||
if (yield* Flag.OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER)
|
||||
return FileWatcherService.of({ init })
|
||||
|
||||
log.info("init", { directory: instance.directory })
|
||||
|
||||
|
||||
@@ -61,7 +61,9 @@ export namespace Flag {
|
||||
export const OPENCODE_EXPERIMENTAL_OXFMT = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_OXFMT")
|
||||
export const OPENCODE_EXPERIMENTAL_LSP_TY = truthy("OPENCODE_EXPERIMENTAL_LSP_TY")
|
||||
export const OPENCODE_EXPERIMENTAL_LSP_TOOL = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_LSP_TOOL")
|
||||
export const OPENCODE_DISABLE_FILETIME_CHECK = truthy("OPENCODE_DISABLE_FILETIME_CHECK")
|
||||
export const OPENCODE_DISABLE_FILETIME_CHECK = Config.boolean("OPENCODE_DISABLE_FILETIME_CHECK").pipe(
|
||||
Config.withDefault(false),
|
||||
)
|
||||
export const OPENCODE_EXPERIMENTAL_PLAN_MODE = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_PLAN_MODE")
|
||||
export const OPENCODE_EXPERIMENTAL_WORKSPACES = OPENCODE_EXPERIMENTAL || truthy("OPENCODE_EXPERIMENTAL_WORKSPACES")
|
||||
export const OPENCODE_EXPERIMENTAL_MARKDOWN = !falsy("OPENCODE_EXPERIMENTAL_MARKDOWN")
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { describe, test, expect, beforeEach } from "bun:test"
|
||||
import { describe, test, expect, afterEach } from "bun:test"
|
||||
import path from "path"
|
||||
import fs from "fs/promises"
|
||||
import { FileTime } from "../../src/file/time"
|
||||
@@ -6,6 +6,8 @@ import { Instance } from "../../src/project/instance"
|
||||
import { Filesystem } from "../../src/util/filesystem"
|
||||
import { tmpdir } from "../fixture/fixture"
|
||||
|
||||
afterEach(() => Instance.disposeAll())
|
||||
|
||||
describe("file/time", () => {
|
||||
const sessionID = "test-session-123"
|
||||
|
||||
@@ -18,12 +20,13 @@ describe("file/time", () => {
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const before = FileTime.get(sessionID, filepath)
|
||||
const before = await FileTime.get(sessionID, filepath)
|
||||
expect(before).toBeUndefined()
|
||||
|
||||
FileTime.read(sessionID, filepath)
|
||||
await Bun.sleep(10)
|
||||
|
||||
const after = FileTime.get(sessionID, filepath)
|
||||
const after = await FileTime.get(sessionID, filepath)
|
||||
expect(after).toBeInstanceOf(Date)
|
||||
expect(after!.getTime()).toBeGreaterThan(0)
|
||||
},
|
||||
@@ -40,9 +43,10 @@ describe("file/time", () => {
|
||||
fn: async () => {
|
||||
FileTime.read("session1", filepath)
|
||||
FileTime.read("session2", filepath)
|
||||
await Bun.sleep(10)
|
||||
|
||||
const time1 = FileTime.get("session1", filepath)
|
||||
const time2 = FileTime.get("session2", filepath)
|
||||
const time1 = await FileTime.get("session1", filepath)
|
||||
const time2 = await FileTime.get("session2", filepath)
|
||||
|
||||
expect(time1).toBeDefined()
|
||||
expect(time2).toBeDefined()
|
||||
@@ -59,14 +63,16 @@ describe("file/time", () => {
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
FileTime.read(sessionID, filepath)
|
||||
const first = FileTime.get(sessionID, filepath)!
|
||||
await Bun.sleep(10)
|
||||
const first = await FileTime.get(sessionID, filepath)
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 10))
|
||||
await Bun.sleep(10)
|
||||
|
||||
FileTime.read(sessionID, filepath)
|
||||
const second = FileTime.get(sessionID, filepath)!
|
||||
await Bun.sleep(10)
|
||||
const second = await FileTime.get(sessionID, filepath)
|
||||
|
||||
expect(second.getTime()).toBeGreaterThanOrEqual(first.getTime())
|
||||
expect(second!.getTime()).toBeGreaterThanOrEqual(first!.getTime())
|
||||
},
|
||||
})
|
||||
})
|
||||
@@ -82,8 +88,7 @@ describe("file/time", () => {
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
FileTime.read(sessionID, filepath)
|
||||
|
||||
// Should not throw
|
||||
await Bun.sleep(10)
|
||||
await FileTime.assert(sessionID, filepath)
|
||||
},
|
||||
})
|
||||
@@ -111,13 +116,8 @@ describe("file/time", () => {
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
FileTime.read(sessionID, filepath)
|
||||
|
||||
// Wait to ensure different timestamps
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
|
||||
// Modify file after reading
|
||||
await Bun.sleep(100)
|
||||
await fs.writeFile(filepath, "modified content", "utf-8")
|
||||
|
||||
await expect(FileTime.assert(sessionID, filepath)).rejects.toThrow("modified since it was last read")
|
||||
},
|
||||
})
|
||||
@@ -132,7 +132,7 @@ describe("file/time", () => {
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
FileTime.read(sessionID, filepath)
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
await Bun.sleep(100)
|
||||
await fs.writeFile(filepath, "modified", "utf-8")
|
||||
|
||||
let error: Error | undefined
|
||||
@@ -147,28 +147,6 @@ describe("file/time", () => {
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("skips check when OPENCODE_DISABLE_FILETIME_CHECK is true", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const filepath = path.join(tmp.path, "file.txt")
|
||||
await fs.writeFile(filepath, "content", "utf-8")
|
||||
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
const { Flag } = await import("../../src/flag/flag")
|
||||
const original = Flag.OPENCODE_DISABLE_FILETIME_CHECK
|
||||
;(Flag as { OPENCODE_DISABLE_FILETIME_CHECK: boolean }).OPENCODE_DISABLE_FILETIME_CHECK = true
|
||||
|
||||
try {
|
||||
// Should not throw even though file wasn't read
|
||||
await FileTime.assert(sessionID, filepath)
|
||||
} finally {
|
||||
;(Flag as { OPENCODE_DISABLE_FILETIME_CHECK: boolean }).OPENCODE_DISABLE_FILETIME_CHECK = original
|
||||
}
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("withLock()", () => {
|
||||
@@ -215,7 +193,7 @@ describe("file/time", () => {
|
||||
|
||||
const op1 = FileTime.withLock(filepath, async () => {
|
||||
order.push(1)
|
||||
await new Promise((resolve) => setTimeout(resolve, 10))
|
||||
await Bun.sleep(50)
|
||||
order.push(2)
|
||||
})
|
||||
|
||||
@@ -225,12 +203,7 @@ describe("file/time", () => {
|
||||
})
|
||||
|
||||
await Promise.all([op1, op2])
|
||||
|
||||
// Operations should be serialized
|
||||
expect(order).toContain(1)
|
||||
expect(order).toContain(2)
|
||||
expect(order).toContain(3)
|
||||
expect(order).toContain(4)
|
||||
expect(order).toEqual([1, 2, 3, 4])
|
||||
},
|
||||
})
|
||||
})
|
||||
@@ -248,8 +221,8 @@ describe("file/time", () => {
|
||||
|
||||
const op1 = FileTime.withLock(filepath1, async () => {
|
||||
started1 = true
|
||||
await new Promise((resolve) => setTimeout(resolve, 50))
|
||||
expect(started2).toBe(true) // op2 should have started while op1 is running
|
||||
await Bun.sleep(50)
|
||||
expect(started2).toBe(true)
|
||||
})
|
||||
|
||||
const op2 = FileTime.withLock(filepath2, async () => {
|
||||
@@ -257,7 +230,6 @@ describe("file/time", () => {
|
||||
})
|
||||
|
||||
await Promise.all([op1, op2])
|
||||
|
||||
expect(started1).toBe(true)
|
||||
expect(started2).toBe(true)
|
||||
},
|
||||
@@ -277,7 +249,6 @@ describe("file/time", () => {
|
||||
}),
|
||||
).rejects.toThrow("Test error")
|
||||
|
||||
// Lock should be released, subsequent operations should work
|
||||
let executed = false
|
||||
await FileTime.withLock(filepath, async () => {
|
||||
executed = true
|
||||
@@ -286,31 +257,6 @@ describe("file/time", () => {
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test("deadlocks on nested locks (expected behavior)", async () => {
|
||||
await using tmp = await tmpdir()
|
||||
const filepath = path.join(tmp.path, "file.txt")
|
||||
|
||||
await Instance.provide({
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
// Nested locks on same file cause deadlock - this is expected
|
||||
// The outer lock waits for inner to complete, but inner waits for outer to release
|
||||
const timeout = new Promise<never>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("Deadlock detected")), 100),
|
||||
)
|
||||
|
||||
const nestedLock = FileTime.withLock(filepath, async () => {
|
||||
return FileTime.withLock(filepath, async () => {
|
||||
return "inner"
|
||||
})
|
||||
})
|
||||
|
||||
// Should timeout due to deadlock
|
||||
await expect(Promise.race([nestedLock, timeout])).rejects.toThrow("Deadlock detected")
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe("stat() Filesystem.stat pattern", () => {
|
||||
@@ -323,12 +269,12 @@ describe("file/time", () => {
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
FileTime.read(sessionID, filepath)
|
||||
await Bun.sleep(10)
|
||||
|
||||
const stats = Filesystem.stat(filepath)
|
||||
expect(stats?.mtime).toBeInstanceOf(Date)
|
||||
expect(stats!.mtime.getTime()).toBeGreaterThan(0)
|
||||
|
||||
// FileTime.assert uses this stat internally
|
||||
await FileTime.assert(sessionID, filepath)
|
||||
},
|
||||
})
|
||||
@@ -343,11 +289,11 @@ describe("file/time", () => {
|
||||
directory: tmp.path,
|
||||
fn: async () => {
|
||||
FileTime.read(sessionID, filepath)
|
||||
await Bun.sleep(10)
|
||||
|
||||
const originalStat = Filesystem.stat(filepath)
|
||||
|
||||
// Wait and modify
|
||||
await new Promise((resolve) => setTimeout(resolve, 100))
|
||||
await Bun.sleep(100)
|
||||
await fs.writeFile(filepath, "modified", "utf-8")
|
||||
|
||||
const newStat = Filesystem.stat(filepath)
|
||||
|
||||
Reference in New Issue
Block a user