Mirror of https://github.com/anomalyco/opencode.git, synced 2026-04-24 06:45:22 +00:00
core: significantly speed up data migration by optimizing SQLite settings and batch processing
Reduces migration time from minutes to seconds by enabling WAL mode, increasing batch size to 1000, and pre-scanning files upfront. Users upgrading to the SQLite backend will now see much faster startup times when their existing data is migrated.
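The speedup comes from three changes visible in the diff below: SQLite is switched into WAL mode with relaxed synchronization before the bulk load, the JSON files are read and inserted in batches of 1000 instead of 32, and every storage directory is globbed once up front. A minimal sketch of that insert pattern, assuming plain bun:sqlite with an invented database path and items table (the real migration goes through drizzle and the project's own tables):

import { Database } from "bun:sqlite"

// Hypothetical database path and table, for illustration only.
const sqlite = new Database("migration.db")

// Same bulk-load settings the commit enables before migrating.
sqlite.exec("PRAGMA journal_mode = WAL")
sqlite.exec("PRAGMA synchronous = OFF")
sqlite.exec("PRAGMA cache_size = 10000")
sqlite.exec("PRAGMA temp_store = MEMORY")

sqlite.exec("CREATE TABLE IF NOT EXISTS items (id TEXT PRIMARY KEY, data TEXT)")
const stmt = sqlite.prepare("INSERT OR IGNORE INTO items (id, data) VALUES (?, ?)")

// One transaction around the whole load, rows handled in chunks of 1000.
const batchSize = 1000
function bulkInsert(rows: { id: string; data: string }[]) {
  sqlite.exec("BEGIN TRANSACTION")
  for (let i = 0; i < rows.length; i += batchSize) {
    // The real migration hands each 1000-row slice to a single
    // drizzle insert(...).values(batch).onConflictDoNothing() call.
    for (const row of rows.slice(i, i + batchSize)) stmt.run(row.id, row.data)
  }
  sqlite.exec("COMMIT")
}

WAL plus synchronous = OFF trades durability guarantees for write throughput, which is the usual trade for a one-shot bulk load.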
@@ -14,8 +14,15 @@ export namespace JsonMigration {
const storageDir = path.join(Global.Path.data, "storage")

log.info("starting json to sqlite migration", { storageDir })
const start = performance.now()

const db = drizzle({ client: sqlite })

// Optimize SQLite for bulk inserts
sqlite.exec("PRAGMA journal_mode = WAL")
sqlite.exec("PRAGMA synchronous = OFF")
sqlite.exec("PRAGMA cache_size = 10000")
sqlite.exec("PRAGMA temp_store = MEMORY")
const stats = {
projects: 0,
sessions: 0,
@@ -26,8 +33,16 @@ export namespace JsonMigration {
shares: 0,
errors: [] as string[],
}
const orphans = {
sessions: 0,
todos: 0,
permissions: 0,
shares: 0,
}
const errs = stats.errors

const limit = 32
const batchSize = 1000
const now = Date.now()

async function list(pattern: string) {
const items: string[] = []
@@ -38,74 +53,112 @@ export namespace JsonMigration {
return items
}

async function read(files: string[]) {
const results = await Promise.allSettled(files.map((file) => Bun.file(file).json()))
const items: { file: string; data: any }[] = []
async function read(files: string[], start: number, end: number) {
const count = end - start
const tasks = new Array(count)
for (let i = 0; i < count; i++) {
tasks[i] = Bun.file(files[start + i]).json()
}
const results = await Promise.allSettled(tasks)
const items = new Array(count)
for (let i = 0; i < results.length; i++) {
const result = results[i]
const file = files[i]
if (result.status === "fulfilled") {
items.push({ file, data: result.value })
items[i] = result.value
continue
}
stats.errors.push(`failed to read ${file}: ${result.reason}`)
errs.push(`failed to read ${files[start + i]}: ${result.reason}`)
}
return items
}

function insert(values: any[], table: any, label: string) {
if (values.length === 0) return 0
try {
db.insert(table).values(values).onConflictDoNothing().run()
return values.length
} catch (e) {
errs.push(`failed to migrate ${label} batch: ${e}`)
return 0
}
}

// Pre-scan all files upfront to avoid repeated glob operations
log.info("scanning files...")
const [projectFiles, sessionFiles, messageFiles, partFiles, todoFiles, permFiles, shareFiles] = await Promise.all([
list("project/*.json"),
list("session/*/*.json"),
list("message/*/*.json"),
list("part/*/*.json"),
list("todo/*.json"),
list("permission/*.json"),
list("session_share/*.json"),
])

log.info("file scan complete", {
projects: projectFiles.length,
sessions: sessionFiles.length,
messages: messageFiles.length,
parts: partFiles.length,
todos: todoFiles.length,
permissions: permFiles.length,
shares: shareFiles.length,
})

sqlite.exec("BEGIN TRANSACTION")

// Migrate projects first (no FK deps)
const projectFiles = await list("project/*.json")
for (let i = 0; i < projectFiles.length; i += limit) {
const batch = await read(projectFiles.slice(i, i + limit))
const values = [] as any[]
for (const item of batch) {
const data = item.data
const projectIds = new Set<string>()
const projectValues = [] as any[]
for (let i = 0; i < projectFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, projectFiles.length)
const batch = await read(projectFiles, i, end)
projectValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
if (!data?.id) {
stats.errors.push(`project missing id: ${item.file}`)
errs.push(`project missing id: ${projectFiles[i + j]}`)
continue
}
values.push({
projectIds.add(data.id)
projectValues.push({
id: data.id,
worktree: data.worktree ?? "/",
vcs: data.vcs,
name: data.name ?? undefined,
icon_url: data.icon?.url,
icon_color: data.icon?.color,
time_created: data.time?.created ?? Date.now(),
time_updated: data.time?.updated ?? Date.now(),
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
time_initialized: data.time?.initialized,
sandboxes: data.sandboxes ?? [],
})
}
if (values.length === 0) continue
try {
db.insert(ProjectTable).values(values).onConflictDoNothing().run()
stats.projects += values.length
} catch (e) {
stats.errors.push(`failed to migrate project batch: ${e}`)
}
stats.projects += insert(projectValues, ProjectTable, "project")
}
log.info("migrated projects", { count: stats.projects })

const projectRows = db.select({ id: ProjectTable.id }).from(ProjectTable).all()
const projectIds = new Set(projectRows.map((item) => item.id))
log.info("migrated projects", { count: stats.projects, duration: Math.round(performance.now() - start) })

// Migrate sessions (depends on projects)
const sessionFiles = await list("session/*/*.json")
for (let i = 0; i < sessionFiles.length; i += limit) {
const batch = await read(sessionFiles.slice(i, i + limit))
const values = [] as any[]
for (const item of batch) {
const data = item.data
const sessionIds = new Set<string>()
const sessionValues = [] as any[]
for (let i = 0; i < sessionFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, sessionFiles.length)
const batch = await read(sessionFiles, i, end)
sessionValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
if (!data?.id || !data?.projectID) {
stats.errors.push(`session missing id or projectID: ${item.file}`)
errs.push(`session missing id or projectID: ${sessionFiles[i + j]}`)
continue
}
if (!projectIds.has(data.projectID)) {
log.warn("skipping orphaned session", { sessionID: data.id, projectID: data.projectID })
orphans.sessions++
continue
}
values.push({
sessionIds.add(data.id)
sessionValues.push({
id: data.id,
project_id: data.projectID,
parent_id: data.parentID ?? null,
@@ -120,117 +173,110 @@ export namespace JsonMigration {
summary_diffs: data.summary?.diffs ?? null,
revert: data.revert ?? null,
permission: data.permission ?? null,
time_created: data.time?.created ?? Date.now(),
time_updated: data.time?.updated ?? Date.now(),
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
time_compacting: data.time?.compacting ?? null,
time_archived: data.time?.archived ?? null,
})
}
if (values.length === 0) continue
try {
db.insert(SessionTable).values(values).onConflictDoNothing().run()
stats.sessions += values.length
} catch (e) {
stats.errors.push(`failed to migrate session batch: ${e}`)
}
stats.sessions += insert(sessionValues, SessionTable, "session")
}
log.info("migrated sessions", { count: stats.sessions })
if (orphans.sessions > 0) {
log.warn("skipped orphaned sessions", { count: orphans.sessions })
}

const sessionRows = db.select({ id: SessionTable.id }).from(SessionTable).all()
const sessionIds = new Set(sessionRows.map((item) => item.id))
// Migrate messages using pre-scanned file map
const allMessageFiles = [] as string[]
const allMessageSessions = [] as string[]
for (const file of messageFiles) {
const sessionID = path.basename(path.dirname(file))
if (!sessionIds.has(sessionID)) continue
allMessageFiles.push(file)
allMessageSessions.push(sessionID)
}

// Migrate messages + parts per session
const sessionList = Array.from(sessionIds)
for (let i = 0; i < sessionList.length; i += limit) {
const batch = sessionList.slice(i, i + limit)
await Promise.allSettled(
batch.map(async (sessionID) => {
const messageFiles = await list(`message/${sessionID}/*.json`)
const messageIds = new Set<string>()
for (let j = 0; j < messageFiles.length; j += limit) {
const chunk = await read(messageFiles.slice(j, j + limit))
const values = [] as any[]
for (const item of chunk) {
const data = item.data
if (!data?.id) {
stats.errors.push(`message missing id: ${item.file}`)
continue
}
const { id, sessionID: _, ...rest } = data
values.push({
id: data.id,
session_id: sessionID,
time_created: data.time?.created ?? Date.now(),
time_updated: data.time?.updated ?? Date.now(),
data: rest,
})
messageIds.add(data.id)
}
if (values.length === 0) continue
try {
db.insert(MessageTable).values(values).onConflictDoNothing().run()
stats.messages += values.length
} catch (e) {
stats.errors.push(`failed to migrate message batch: ${e}`)
}
}

const messageList = Array.from(messageIds)
for (let j = 0; j < messageList.length; j += limit) {
const messageBatch = messageList.slice(j, j + limit)
await Promise.allSettled(
messageBatch.map(async (messageID) => {
const partFiles = await list(`part/${messageID}/*.json`)
for (let k = 0; k < partFiles.length; k += limit) {
const chunk = await read(partFiles.slice(k, k + limit))
const values = [] as any[]
for (const item of chunk) {
const data = item.data
if (!data?.id || !data?.messageID) {
stats.errors.push(`part missing id or messageID: ${item.file}`)
continue
}
const { id, messageID, sessionID: _, ...rest } = data
values.push({
id: data.id,
message_id: data.messageID,
session_id: sessionID,
time_created: data.time?.created ?? Date.now(),
time_updated: data.time?.updated ?? Date.now(),
data: rest,
})
}
if (values.length === 0) continue
try {
db.insert(PartTable).values(values).onConflictDoNothing().run()
stats.parts += values.length
} catch (e) {
stats.errors.push(`failed to migrate part batch: ${e}`)
}
}
}),
)
}
}),
)
for (let i = 0; i < allMessageFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, allMessageFiles.length)
const batch = await read(allMessageFiles, i, end)
const values = new Array(batch.length)
let count = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
if (!data?.id) {
errs.push(`message missing id: ${allMessageFiles[i + j]}`)
continue
}
const sessionID = allMessageSessions[i + j]
const id = data.id
const rest = data
delete rest.id
delete rest.sessionID
values[count++] = {
id,
session_id: sessionID,
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
data: rest,
}
}
values.length = count
stats.messages += insert(values, MessageTable, "message")
}
log.info("migrated messages", { count: stats.messages })

// Migrate parts using pre-scanned file map
for (let i = 0; i < partFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, partFiles.length)
const batch = await read(partFiles, i, end)
const values = new Array(batch.length)
let count = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
if (!data?.id || !data?.messageID || !data?.sessionID) {
errs.push(`part missing id/messageID/sessionID: ${partFiles[i + j]}`)
continue
}
if (!sessionIds.has(data.sessionID)) continue
const id = data.id
const messageID = data.messageID
const sessionID = data.sessionID
const rest = data
delete rest.id
delete rest.messageID
delete rest.sessionID
values[count++] = {
id,
message_id: messageID,
session_id: sessionID,
time_created: data.time?.created ?? now,
time_updated: data.time?.updated ?? now,
data: rest,
}
}
values.length = count
stats.parts += insert(values, PartTable, "part")
}
log.info("migrated parts", { count: stats.parts })

// Migrate todos
const todoFiles = await list("todo/*.json")
for (let i = 0; i < todoFiles.length; i += limit) {
const batch = await read(todoFiles.slice(i, i + limit))
const todoSessions = todoFiles.map((file) => path.basename(file, ".json"))
for (let i = 0; i < todoFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, todoFiles.length)
const batch = await read(todoFiles, i, end)
const values = [] as any[]
for (const item of batch) {
const data = item.data
const sessionID = path.basename(item.file, ".json")
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const sessionID = todoSessions[i + j]
if (!sessionIds.has(sessionID)) {
log.warn("skipping orphaned todo", { sessionID })
orphans.todos++
continue
}
if (!Array.isArray(data)) {
stats.errors.push(`todo not an array: ${item.file}`)
errs.push(`todo not an array: ${todoFiles[i + j]}`)
continue
}
for (let position = 0; position < data.length; position++) {
@@ -242,72 +288,71 @@ export namespace JsonMigration {
status: todo.status,
priority: todo.priority,
position,
time_created: Date.now(),
time_updated: Date.now(),
time_created: now,
time_updated: now,
})
}
}
if (values.length === 0) continue
try {
db.insert(TodoTable).values(values).onConflictDoNothing().run()
stats.todos += values.length
} catch (e) {
stats.errors.push(`failed to migrate todo batch: ${e}`)
}
stats.todos += insert(values, TodoTable, "todo")
}
log.info("migrated todos", { count: stats.todos })
if (orphans.todos > 0) {
log.warn("skipped orphaned todos", { count: orphans.todos })
}

// Migrate permissions
const permFiles = await list("permission/*.json")
for (let i = 0; i < permFiles.length; i += limit) {
const batch = await read(permFiles.slice(i, i + limit))
const values = [] as any[]
for (const item of batch) {
const data = item.data
const projectID = path.basename(item.file, ".json")
const permProjects = permFiles.map((file) => path.basename(file, ".json"))
const permValues = [] as any[]
for (let i = 0; i < permFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, permFiles.length)
const batch = await read(permFiles, i, end)
permValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const projectID = permProjects[i + j]
if (!projectIds.has(projectID)) {
log.warn("skipping orphaned permission", { projectID })
orphans.permissions++
continue
}
values.push({ project_id: projectID, data })
}
if (values.length === 0) continue
try {
db.insert(PermissionTable).values(values).onConflictDoNothing().run()
stats.permissions += values.length
} catch (e) {
stats.errors.push(`failed to migrate permission batch: ${e}`)
permValues.push({ project_id: projectID, data })
}
stats.permissions += insert(permValues, PermissionTable, "permission")
}
log.info("migrated permissions", { count: stats.permissions })
if (orphans.permissions > 0) {
log.warn("skipped orphaned permissions", { count: orphans.permissions })
}

// Migrate session shares
const shareFiles = await list("session_share/*.json")
for (let i = 0; i < shareFiles.length; i += limit) {
const batch = await read(shareFiles.slice(i, i + limit))
const values = [] as any[]
for (const item of batch) {
const data = item.data
const sessionID = path.basename(item.file, ".json")
const shareSessions = shareFiles.map((file) => path.basename(file, ".json"))
const shareValues = [] as any[]
for (let i = 0; i < shareFiles.length; i += batchSize) {
const end = Math.min(i + batchSize, shareFiles.length)
const batch = await read(shareFiles, i, end)
shareValues.length = 0
for (let j = 0; j < batch.length; j++) {
const data = batch[j]
if (!data) continue
const sessionID = shareSessions[i + j]
if (!sessionIds.has(sessionID)) {
log.warn("skipping orphaned session_share", { sessionID })
orphans.shares++
continue
}
if (!data?.id || !data?.secret || !data?.url) {
stats.errors.push(`session_share missing id/secret/url: ${item.file}`)
errs.push(`session_share missing id/secret/url: ${shareFiles[i + j]}`)
continue
}
values.push({ session_id: sessionID, id: data.id, secret: data.secret, url: data.url })
}
if (values.length === 0) continue
try {
db.insert(SessionShareTable).values(values).onConflictDoNothing().run()
stats.shares += values.length
} catch (e) {
stats.errors.push(`failed to migrate session_share batch: ${e}`)
shareValues.push({ session_id: sessionID, id: data.id, secret: data.secret, url: data.url })
}
stats.shares += insert(shareValues, SessionShareTable, "session_share")
}
log.info("migrated session shares", { count: stats.shares })
if (orphans.shares > 0) {
log.warn("skipped orphaned session shares", { count: orphans.shares })
}

sqlite.exec("COMMIT")

log.info("json migration complete", {
projects: stats.projects,
@@ -318,6 +363,7 @@ export namespace JsonMigration {
permissions: stats.permissions,
shares: stats.shares,
errorCount: stats.errors.length,
duration: Math.round(performance.now() - start),
})

if (stats.errors.length > 0) {