core: fix share compaction reprocessing same events by making storage list boundaries exclusive

This commit is contained in:
Dax Raad
2025-12-01 16:58:59 -05:00
parent 4acb645f04
commit 01e2c9cc21
3 changed files with 47 additions and 43 deletions

View File

@@ -93,38 +93,40 @@ export namespace Share {
console.log("reading pending events")
const list = await Storage.list({
prefix: ["share_event", shareID],
end: compaction.event,
before: compaction.event,
}).then((x) => x.toReversed())
console.log("compacting", list.length)
const data = await Promise.all(list.map(async (event) => await Storage.read<Data[]>(event))).then((x) => x.flat())
for (const item of data) {
if (!item) continue
const key = (item: Data) => {
switch (item.type) {
case "session":
return "session"
case "message":
return `message/${item.data.id}`
case "part":
return `${item.data.messageID}/${item.data.id}`
case "session_diff":
return "session_diff"
case "model":
return "model"
if (list.length > 0) {
const data = await Promise.all(list.map(async (event) => await Storage.read<Data[]>(event))).then((x) => x.flat())
for (const item of data) {
if (!item) continue
const key = (item: Data) => {
switch (item.type) {
case "session":
return "session"
case "message":
return `message/${item.data.id}`
case "part":
return `${item.data.messageID}/${item.data.id}`
case "session_diff":
return "session_diff"
case "model":
return "model"
}
}
const id = key(item)
const result = Binary.search(compaction.data, id, key)
if (result.found) {
compaction.data[result.index] = item
} else {
compaction.data.splice(result.index, 0, item)
}
}
const id = key(item)
const result = Binary.search(compaction.data, id, key)
if (result.found) {
compaction.data[result.index] = item
} else {
compaction.data.splice(result.index, 0, item)
}
compaction.event = list.at(-1)?.at(-1)
await Storage.write(["share_compaction", shareID], compaction)
}
compaction.event = list.at(-1)?.at(-1)
await Storage.write(["share_compaction", shareID], compaction)
return compaction.data
}

View File

@@ -6,7 +6,7 @@ export namespace Storage {
read(path: string): Promise<string | undefined>
write(path: string, value: string): Promise<void>
remove(path: string): Promise<void>
list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise<string[]>
list(options?: { prefix?: string; limit?: number; after?: string; before?: string }): Promise<string[]>
}
function createAdapter(client: AwsClient, endpoint: string, bucket: string): Adapter {
@@ -37,13 +37,13 @@ export namespace Storage {
if (!response.ok) throw new Error(`Failed to remove ${path}: ${response.status}`)
},
async list(options?: { prefix?: string; limit?: number; start?: string; end?: string }): Promise<string[]> {
async list(options?: { prefix?: string; limit?: number; after?: string; before?: string }): Promise<string[]> {
const prefix = options?.prefix || ""
const params = new URLSearchParams({ "list-type": "2", prefix })
if (options?.limit) params.set("max-keys", options.limit.toString())
if (options?.start) {
const startPath = prefix + options.start + ".json"
params.set("start-after", startPath)
if (options?.after) {
const afterPath = prefix + options.after + ".json"
params.set("start-after", afterPath)
}
const response = await client.fetch(`${base}?${params}`)
if (!response.ok) throw new Error(`Failed to list ${prefix}: ${response.status}`)
@@ -54,9 +54,9 @@ export namespace Storage {
while ((match = regex.exec(xml)) !== null) {
keys.push(match[1])
}
if (options?.end) {
const endPath = prefix + options.end + ".json"
return keys.filter((key) => key <= endPath)
if (options?.before) {
const beforePath = prefix + options.before + ".json"
return keys.filter((key) => key < beforePath)
}
return keys
},
@@ -108,9 +108,14 @@ export namespace Storage {
return adapter().remove(resolve(key))
}
export async function list(options?: { prefix?: string[]; limit?: number; start?: string; end?: string }) {
/**
 * Lists stored keys and returns each one as an array of path segments.
 *
 * @param options.prefix Path segments to list under; they are joined with "/" and,
 *   when at least one segment is given, a trailing "/" is appended before the
 *   value is handed to the adapter.
 * @param options.limit Forwarded unchanged to the adapter.
 * @param options.after Forwarded unchanged to the adapter as the lower bound.
 * @param options.before Forwarded unchanged to the adapter as the upper bound.
 * @returns One entry per key returned by the adapter, with a trailing ".json"
 *   removed and the remainder split on "/".
 *
 * NOTE(review): the range test that accompanies this change expects both
 * `after` and `before` to exclude the named key itself.
 */
export async function list(options?: { prefix?: string[]; limit?: number; after?: string; before?: string }) {
// Build the string prefix; an absent or empty `prefix` array yields "".
const p = options?.prefix ? options.prefix.join("/") + (options.prefix.length ? "/" : "") : ""
// NOTE(review): the start/end call on the next line appears to be the removed side of
// this diff; the after/before call that follows it is its replacement.
const result = await adapter().list({ prefix: p, limit: options?.limit, start: options?.start, end: options?.end })
const result = await adapter().list({
prefix: p,
limit: options?.limit,
after: options?.after,
before: options?.before,
})
// Strip the ".json" suffix and split each key back into its path segments.
return result.map((x) => x.replace(/\.json$/, "").split("/"))
}

View File

@@ -2,23 +2,20 @@ import { describe, expect, test, afterAll } from "bun:test"
import { Storage } from "../../src/core/storage"
describe("core.storage", () => {
test("should list files with start and end range", async () => {
test("should list files with after and before range", async () => {
// Seed five sibling keys so there are entries on both sides of each bound.
await Storage.write(["test", "users", "user1"], { name: "user1" })
await Storage.write(["test", "users", "user2"], { name: "user2" })
await Storage.write(["test", "users", "user3"], { name: "user3" })
await Storage.write(["test", "users", "user4"], { name: "user4" })
await Storage.write(["test", "users", "user5"], { name: "user5" })
// NOTE(review): the start/end call and the two-element expectation below appear to be
// the removed side of this diff; the after/before call and the single-element
// expectation are their replacements.
const result = await Storage.list({ prefix: ["test", "users"], start: "user2", end: "user4" })
const result = await Storage.list({ prefix: ["test", "users"], after: "user2", before: "user4" })
expect(result).toEqual([
["test", "users", "user3"],
["test", "users", "user4"],
])
// Both bounds are exclusive: neither "user2" (after) nor "user4" (before) is returned.
expect(result).toEqual([["test", "users", "user3"]])
})
test("should list files with start only", async () => {
const result = await Storage.list({ prefix: ["test", "users"], start: "user3" })
test("should list files with after only", async () => {
const result = await Storage.list({ prefix: ["test", "users"], after: "user3" })
expect(result).toEqual([
["test", "users", "user4"],