mirror of
https://github.com/anomalyco/opencode.git
synced 2026-05-28 15:20:24 +00:00
chore: generate
This commit is contained in:
216
infra/lake.ts
216
infra/lake.ts
@@ -13,13 +13,10 @@ const glueS3TablesDatabaseWildcardArn = $interpolate`arn:${partition.partition}:
|
||||
const glueS3TablesTableWildcardArn = $interpolate`arn:${partition.partition}:glue:${region.region}:${current.accountId}:table/${glueCatalogName}/${tableBucketName}/*/*`
|
||||
const s3TablesBucketWildcardArn = $interpolate`arn:${partition.partition}:s3tables:${region.region}:${current.accountId}:bucket/*`
|
||||
|
||||
export const tableBucket = new aws.s3tables.TableBucket(
|
||||
"LakeTableBucket",
|
||||
{
|
||||
name: tableBucketName,
|
||||
forceDestroy: $app.stage !== "production",
|
||||
},
|
||||
)
|
||||
export const tableBucket = new aws.s3tables.TableBucket("LakeTableBucket", {
|
||||
name: tableBucketName,
|
||||
forceDestroy: $app.stage !== "production",
|
||||
})
|
||||
|
||||
const s3TablesCatalog = new aws.cloudcontrol.Resource(
|
||||
"LakeS3TablesCatalog",
|
||||
@@ -54,122 +51,107 @@ const s3TablesCatalog = new aws.cloudcontrol.Resource(
|
||||
{ dependsOn: [tableBucket] },
|
||||
)
|
||||
|
||||
const athenaResultsBucket = new aws.s3.Bucket(
|
||||
"LakeAthenaResults",
|
||||
{
|
||||
bucket: `opencode-${$app.stage}-lake-athena-results`,
|
||||
forceDestroy: $app.stage !== "production",
|
||||
},
|
||||
)
|
||||
const athenaResultsBucket = new aws.s3.Bucket("LakeAthenaResults", {
|
||||
bucket: `opencode-${$app.stage}-lake-athena-results`,
|
||||
forceDestroy: $app.stage !== "production",
|
||||
})
|
||||
|
||||
const firehoseErrorBucket = new aws.s3.Bucket(
|
||||
"LakeFirehoseErrors",
|
||||
{
|
||||
bucket: `opencode-${$app.stage}-lake-firehose-errors`,
|
||||
forceDestroy: $app.stage !== "production",
|
||||
},
|
||||
)
|
||||
const firehoseErrorBucket = new aws.s3.Bucket("LakeFirehoseErrors", {
|
||||
bucket: `opencode-${$app.stage}-lake-firehose-errors`,
|
||||
forceDestroy: $app.stage !== "production",
|
||||
})
|
||||
|
||||
const athenaWorkgroup = new aws.athena.Workgroup(
|
||||
"LakeAthenaWorkgroup",
|
||||
{
|
||||
name: `opencode-${$app.stage}-lake-workgroup`,
|
||||
forceDestroy: $app.stage !== "production",
|
||||
configuration: {
|
||||
enforceWorkgroupConfiguration: true,
|
||||
publishCloudwatchMetricsEnabled: true,
|
||||
resultConfiguration: {
|
||||
outputLocation: $interpolate`s3://${athenaResultsBucket.bucket}/`,
|
||||
},
|
||||
const athenaWorkgroup = new aws.athena.Workgroup("LakeAthenaWorkgroup", {
|
||||
name: `opencode-${$app.stage}-lake-workgroup`,
|
||||
forceDestroy: $app.stage !== "production",
|
||||
configuration: {
|
||||
enforceWorkgroupConfiguration: true,
|
||||
publishCloudwatchMetricsEnabled: true,
|
||||
resultConfiguration: {
|
||||
outputLocation: $interpolate`s3://${athenaResultsBucket.bucket}/`,
|
||||
},
|
||||
},
|
||||
)
|
||||
})
|
||||
|
||||
const firehoseRole = new aws.iam.Role(
|
||||
"LakeFirehoseRole",
|
||||
{
|
||||
assumeRolePolicy: aws.iam.getPolicyDocumentOutput({
|
||||
statements: [
|
||||
{
|
||||
effect: "Allow",
|
||||
actions: ["sts:AssumeRole"],
|
||||
principals: [
|
||||
{
|
||||
type: "Service",
|
||||
identifiers: ["firehose.amazonaws.com"],
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}).json,
|
||||
},
|
||||
)
|
||||
const firehoseRole = new aws.iam.Role("LakeFirehoseRole", {
|
||||
assumeRolePolicy: aws.iam.getPolicyDocumentOutput({
|
||||
statements: [
|
||||
{
|
||||
effect: "Allow",
|
||||
actions: ["sts:AssumeRole"],
|
||||
principals: [
|
||||
{
|
||||
type: "Service",
|
||||
identifiers: ["firehose.amazonaws.com"],
|
||||
},
|
||||
],
|
||||
},
|
||||
],
|
||||
}).json,
|
||||
})
|
||||
|
||||
const firehosePolicy = new aws.iam.RolePolicy(
|
||||
"LakeFirehosePolicy",
|
||||
{
|
||||
role: firehoseRole.id,
|
||||
policy: aws.iam.getPolicyDocumentOutput({
|
||||
statements: [
|
||||
{
|
||||
effect: "Allow",
|
||||
actions: [
|
||||
"s3tables:ListTableBuckets",
|
||||
"s3tables:GetTableBucket",
|
||||
"s3tables:GetNamespace",
|
||||
"s3tables:GetTable",
|
||||
"s3tables:GetTableData",
|
||||
"s3tables:GetTableMetadataLocation",
|
||||
"s3tables:ListNamespaces",
|
||||
"s3tables:ListTables",
|
||||
"s3tables:PutTableData",
|
||||
"s3tables:UpdateTableMetadataLocation",
|
||||
],
|
||||
resources: ["*"],
|
||||
},
|
||||
{
|
||||
effect: "Allow",
|
||||
actions: [
|
||||
"glue:GetCatalog",
|
||||
"glue:GetCatalogs",
|
||||
"glue:GetDatabase",
|
||||
"glue:GetDatabases",
|
||||
"glue:GetTable",
|
||||
"glue:GetTables",
|
||||
"glue:UpdateTable",
|
||||
],
|
||||
resources: [
|
||||
glueCatalogArn,
|
||||
glueS3TablesCatalogArn,
|
||||
$interpolate`${glueS3TablesCatalogArn}/*`,
|
||||
glueS3TablesDatabaseWildcardArn,
|
||||
glueS3TablesTableWildcardArn,
|
||||
$interpolate`arn:${partition.partition}:glue:${region.region}:${current.accountId}:database/*`,
|
||||
$interpolate`arn:${partition.partition}:glue:${region.region}:${current.accountId}:table/*/*`,
|
||||
$interpolate`arn:${partition.partition}:glue:${region.region}:${current.accountId}:table/${glueCatalogName}/*`,
|
||||
],
|
||||
},
|
||||
{
|
||||
effect: "Allow",
|
||||
actions: [
|
||||
"s3:AbortMultipartUpload",
|
||||
"s3:GetBucketLocation",
|
||||
"s3:GetObject",
|
||||
"s3:ListBucket",
|
||||
"s3:ListBucketMultipartUploads",
|
||||
"s3:PutObject",
|
||||
],
|
||||
resources: [firehoseErrorBucket.arn, $interpolate`${firehoseErrorBucket.arn}/*`],
|
||||
},
|
||||
{
|
||||
effect: "Allow",
|
||||
actions: ["lakeformation:GetDataAccess"],
|
||||
resources: ["*"],
|
||||
},
|
||||
],
|
||||
}).json,
|
||||
},
|
||||
)
|
||||
const firehosePolicy = new aws.iam.RolePolicy("LakeFirehosePolicy", {
|
||||
role: firehoseRole.id,
|
||||
policy: aws.iam.getPolicyDocumentOutput({
|
||||
statements: [
|
||||
{
|
||||
effect: "Allow",
|
||||
actions: [
|
||||
"s3tables:ListTableBuckets",
|
||||
"s3tables:GetTableBucket",
|
||||
"s3tables:GetNamespace",
|
||||
"s3tables:GetTable",
|
||||
"s3tables:GetTableData",
|
||||
"s3tables:GetTableMetadataLocation",
|
||||
"s3tables:ListNamespaces",
|
||||
"s3tables:ListTables",
|
||||
"s3tables:PutTableData",
|
||||
"s3tables:UpdateTableMetadataLocation",
|
||||
],
|
||||
resources: ["*"],
|
||||
},
|
||||
{
|
||||
effect: "Allow",
|
||||
actions: [
|
||||
"glue:GetCatalog",
|
||||
"glue:GetCatalogs",
|
||||
"glue:GetDatabase",
|
||||
"glue:GetDatabases",
|
||||
"glue:GetTable",
|
||||
"glue:GetTables",
|
||||
"glue:UpdateTable",
|
||||
],
|
||||
resources: [
|
||||
glueCatalogArn,
|
||||
glueS3TablesCatalogArn,
|
||||
$interpolate`${glueS3TablesCatalogArn}/*`,
|
||||
glueS3TablesDatabaseWildcardArn,
|
||||
glueS3TablesTableWildcardArn,
|
||||
$interpolate`arn:${partition.partition}:glue:${region.region}:${current.accountId}:database/*`,
|
||||
$interpolate`arn:${partition.partition}:glue:${region.region}:${current.accountId}:table/*/*`,
|
||||
$interpolate`arn:${partition.partition}:glue:${region.region}:${current.accountId}:table/${glueCatalogName}/*`,
|
||||
],
|
||||
},
|
||||
{
|
||||
effect: "Allow",
|
||||
actions: [
|
||||
"s3:AbortMultipartUpload",
|
||||
"s3:GetBucketLocation",
|
||||
"s3:GetObject",
|
||||
"s3:ListBucket",
|
||||
"s3:ListBucketMultipartUploads",
|
||||
"s3:PutObject",
|
||||
],
|
||||
resources: [firehoseErrorBucket.arn, $interpolate`${firehoseErrorBucket.arn}/*`],
|
||||
},
|
||||
{
|
||||
effect: "Allow",
|
||||
actions: ["lakeformation:GetDataAccess"],
|
||||
resources: ["*"],
|
||||
},
|
||||
],
|
||||
}).json,
|
||||
})
|
||||
|
||||
const firehose = new aws.kinesis.FirehoseDeliveryStream(
|
||||
"LakeFirehose",
|
||||
|
||||
@@ -2,9 +2,7 @@
|
||||
"version": "6",
|
||||
"dialect": "mysql",
|
||||
"id": "72655266-65da-408e-bfd8-9f3a4ad817a5",
|
||||
"prevIds": [
|
||||
"00000000-0000-0000-0000-000000000000"
|
||||
],
|
||||
"prevIds": ["00000000-0000-0000-0000-000000000000"],
|
||||
"ddl": [
|
||||
{
|
||||
"name": "stat",
|
||||
@@ -515,9 +513,7 @@
|
||||
"table": "stat"
|
||||
},
|
||||
{
|
||||
"columns": [
|
||||
"id"
|
||||
],
|
||||
"columns": ["id"],
|
||||
"name": "PRIMARY",
|
||||
"table": "stat",
|
||||
"entityType": "pks"
|
||||
@@ -624,4 +620,4 @@
|
||||
}
|
||||
],
|
||||
"renames": []
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,9 +2,7 @@
|
||||
"version": "6",
|
||||
"dialect": "mysql",
|
||||
"id": "e246639a-0da0-4fbd-b7bb-f1781d407780",
|
||||
"prevIds": [
|
||||
"72655266-65da-408e-bfd8-9f3a4ad817a5"
|
||||
],
|
||||
"prevIds": ["72655266-65da-408e-bfd8-9f3a4ad817a5"],
|
||||
"ddl": [
|
||||
{
|
||||
"name": "geo_stat",
|
||||
@@ -1601,25 +1599,19 @@
|
||||
"table": "provider_stat"
|
||||
},
|
||||
{
|
||||
"columns": [
|
||||
"id"
|
||||
],
|
||||
"columns": ["id"],
|
||||
"name": "PRIMARY",
|
||||
"table": "geo_stat",
|
||||
"entityType": "pks"
|
||||
},
|
||||
{
|
||||
"columns": [
|
||||
"id"
|
||||
],
|
||||
"columns": ["id"],
|
||||
"name": "PRIMARY",
|
||||
"table": "model_stat",
|
||||
"entityType": "pks"
|
||||
},
|
||||
{
|
||||
"columns": [
|
||||
"id"
|
||||
],
|
||||
"columns": ["id"],
|
||||
"name": "PRIMARY",
|
||||
"table": "provider_stat",
|
||||
"entityType": "pks"
|
||||
@@ -2038,4 +2030,4 @@
|
||||
}
|
||||
],
|
||||
"renames": []
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,11 +10,7 @@ export type StatDimension = "model" | "provider" | "geo"
|
||||
export function buildStatsQuery(periodStart: Date, periodEnd: Date, dimension: StatDimension) {
|
||||
const periodStartValue = sqlString(periodStart.toISOString())
|
||||
const periodEndValue = sqlString(periodEnd.toISOString())
|
||||
const sourceTable = [
|
||||
Resource.InferenceEvent.catalog,
|
||||
Resource.InferenceEvent.database,
|
||||
Resource.InferenceEvent.table,
|
||||
]
|
||||
const sourceTable = [Resource.InferenceEvent.catalog, Resource.InferenceEvent.database, Resource.InferenceEvent.table]
|
||||
.map(sqlIdentifier)
|
||||
.join(".")
|
||||
const dimensionSql = (() => {
|
||||
|
||||
@@ -40,15 +40,13 @@ export class Ingest extends Context.Service<Ingest, Ingest.Service>()("@opencode
|
||||
})
|
||||
}
|
||||
|
||||
const failed = (
|
||||
yield* Effect.all(
|
||||
chunks(
|
||||
records.map((event) => ({ Data: Buffer.from(JSON.stringify(event)) })),
|
||||
MAX_FIREHOSE_BATCH_SIZE,
|
||||
).map((batch) => putRecords(client, Resource.LakeIngestConfig.streamName, batch)),
|
||||
{ concurrency: 8 },
|
||||
)
|
||||
).reduce((sum, item) => sum + item, 0)
|
||||
const failed = (yield* Effect.all(
|
||||
chunks(
|
||||
records.map((event) => ({ Data: Buffer.from(JSON.stringify(event)) })),
|
||||
MAX_FIREHOSE_BATCH_SIZE,
|
||||
).map((batch) => putRecords(client, Resource.LakeIngestConfig.streamName, batch)),
|
||||
{ concurrency: 8 },
|
||||
)).reduce((sum, item) => sum + item, 0)
|
||||
|
||||
if (failed > 0) {
|
||||
return yield* new IngestError({ message: "Failed to ingest all lake records", failed })
|
||||
@@ -75,7 +73,8 @@ const putRecords: (
|
||||
) {
|
||||
const result = yield* Effect.tryPromise({
|
||||
try: () => client.send(new PutRecordBatchCommand({ DeliveryStreamName: streamName, Records: records })),
|
||||
catch: (cause) => new IngestError({ message: "Failed to write lake records to Firehose", failed: records.length, cause }),
|
||||
catch: (cause) =>
|
||||
new IngestError({ message: "Failed to write lake records to Firehose", failed: records.length, cause }),
|
||||
})
|
||||
const failed =
|
||||
result.RequestResponses?.flatMap((item, index) => {
|
||||
|
||||
@@ -25,26 +25,27 @@ export const Routes = HttpRouter.use((router) =>
|
||||
}),
|
||||
)
|
||||
|
||||
const ingest = (ingestService: Ingest.Service) => Effect.gen(function* () {
|
||||
const request = yield* HttpServerRequest.HttpServerRequest
|
||||
if (!isAuthorized(request.headers)) return yield* json(401, { ok: false, error: "Unauthorized" })
|
||||
const ingest = (ingestService: Ingest.Service) =>
|
||||
Effect.gen(function* () {
|
||||
const request = yield* HttpServerRequest.HttpServerRequest
|
||||
if (!isAuthorized(request.headers)) return yield* json(401, { ok: false, error: "Unauthorized" })
|
||||
|
||||
const payload = yield* HttpServerRequest.schemaBodyJson(IngestPayload).pipe(
|
||||
Effect.match({
|
||||
onFailure: () => undefined,
|
||||
onSuccess: (value) => value,
|
||||
}),
|
||||
)
|
||||
if (!payload) return yield* json(400, { ok: false, error: "Invalid JSON body" })
|
||||
const payload = yield* HttpServerRequest.schemaBodyJson(IngestPayload).pipe(
|
||||
Effect.match({
|
||||
onFailure: () => undefined,
|
||||
onSuccess: (value) => value,
|
||||
}),
|
||||
)
|
||||
if (!payload) return yield* json(400, { ok: false, error: "Invalid JSON body" })
|
||||
|
||||
const events = Array.isArray(payload.events) ? payload.events.filter(isRecord) : []
|
||||
if (events.length === 0) return yield* json(202, { ok: true, records: 0 })
|
||||
const events = Array.isArray(payload.events) ? payload.events.filter(isRecord) : []
|
||||
if (events.length === 0) return yield* json(202, { ok: true, records: 0 })
|
||||
|
||||
return yield* ingestService.write(events).pipe(
|
||||
Effect.flatMap((result) => json(202, { ok: true, records: result.records })),
|
||||
Effect.catchTag("IngestError", (error) => json(502, { ok: false, records: events.length, failed: error.failed })),
|
||||
)
|
||||
})
|
||||
return yield* ingestService.write(events).pipe(
|
||||
Effect.flatMap((result) => json(202, { ok: true, records: result.records })),
|
||||
Effect.catchTag("IngestError", (error) => json(502, { ok: false, records: events.length, failed: error.failed })),
|
||||
)
|
||||
})
|
||||
|
||||
function isAuthorized(headers: Record<string, string | undefined>) {
|
||||
const actual = Buffer.from(headers.authorization ?? headers.Authorization ?? "")
|
||||
|
||||
Reference in New Issue
Block a user