codex: address PR review feedback (#17581)

This commit is contained in:
Eric Traut
2026-04-12 16:52:29 -07:00
parent 5d482b54e9
commit 09b222272b

View File

@@ -85,10 +85,22 @@ ORDER BY queued_at ASC, seq ASC
can_after_turn: bool,
can_steer_current_turn: bool,
) -> anyhow::Result<Option<ThreadMessageClaim>> {
let mut tx = self.pool.begin().await?;
let row = sqlx::query_as::<_, ThreadMessageRow>(
r#"
SELECT
DELETE FROM thread_messages
WHERE seq = (
SELECT seq
FROM thread_messages
WHERE thread_id = ?
ORDER BY queued_at ASC, seq ASC
LIMIT 1
)
AND (
delivery NOT IN (?, ?)
OR (delivery = ? AND ?)
OR (delivery = ? AND ?)
)
RETURNING
seq,
id,
thread_id,
@@ -98,6 +110,33 @@ SELECT
meta_json,
delivery,
queued_at
"#,
)
.bind(thread_id)
.bind(DELIVERY_AFTER_TURN)
.bind(DELIVERY_STEER_CURRENT_TURN)
.bind(DELIVERY_AFTER_TURN)
.bind(can_after_turn)
.bind(DELIVERY_STEER_CURRENT_TURN)
.bind(can_steer_current_turn || can_after_turn)
.fetch_optional(self.pool.as_ref())
.await?;
if let Some(row) = row {
return match row.delivery.as_str() {
DELIVERY_AFTER_TURN | DELIVERY_STEER_CURRENT_TURN => {
Ok(Some(ThreadMessageClaim::Claimed(ThreadMessage::from(row))))
}
delivery => Ok(Some(ThreadMessageClaim::Invalid {
id: row.id,
reason: format!("invalid delivery `{delivery}`"),
})),
};
}
let oldest_delivery = sqlx::query_scalar::<_, String>(
r#"
SELECT delivery
FROM thread_messages
WHERE thread_id = ?
ORDER BY queued_at ASC, seq ASC
@@ -105,45 +144,16 @@ LIMIT 1
"#,
)
.bind(thread_id)
.fetch_optional(&mut *tx)
.fetch_optional(self.pool.as_ref())
.await?;
let Some(row) = row else {
tx.commit().await?;
return Ok(None);
};
let can_claim = match row.delivery.as_str() {
DELIVERY_AFTER_TURN => can_after_turn,
DELIVERY_STEER_CURRENT_TURN => can_steer_current_turn || can_after_turn,
delivery => {
sqlx::query("DELETE FROM thread_messages WHERE seq = ? AND id = ?")
.bind(row.seq)
.bind(row.id.as_str())
.execute(&mut *tx)
.await?;
tx.commit().await?;
return Ok(Some(ThreadMessageClaim::Invalid {
id: row.id,
reason: format!("invalid delivery `{delivery}`"),
}));
match oldest_delivery.as_deref() {
Some(DELIVERY_AFTER_TURN) if !can_after_turn => Ok(Some(ThreadMessageClaim::NotReady)),
Some(DELIVERY_STEER_CURRENT_TURN) if !(can_steer_current_turn || can_after_turn) => {
Ok(Some(ThreadMessageClaim::NotReady))
}
};
if !can_claim {
tx.commit().await?;
return Ok(Some(ThreadMessageClaim::NotReady));
None | Some(_) => Ok(None),
}
let result = sqlx::query("DELETE FROM thread_messages WHERE seq = ? AND id = ?")
.bind(row.seq)
.bind(row.id.as_str())
.execute(&mut *tx)
.await?;
tx.commit().await?;
if result.rows_affected() == 0 {
return Ok(None);
}
Ok(Some(ThreadMessageClaim::Claimed(ThreadMessage::from(row))))
}
}