This commit is contained in:
jimmyfraiture
2025-09-24 14:33:32 +01:00
parent 8bbcbdc0ed
commit 3ea07b67d4
4 changed files with 71 additions and 31 deletions

View File

@@ -980,7 +980,7 @@ impl Session {
pub async fn next_turn_readiness(&self) -> Option<Arc<ReadinessFlag>> {
let mut state = self.state.lock().await;
state.next_readiness()
state.next_readiness().and_then(super::state::session::TurnReadinessGuard::take)
}
/// Returns the input if there was no task running to inject into
@@ -1686,7 +1686,7 @@ async fn run_task(sess: Arc<Session>, turn_state: Arc<TurnState>) {
}
loop {
let (pending_input, turn_readiness) = turn_state.drain_mailbox().await;
let (pending_input, turn_readiness) = turn_state.drain_mailbox().await.into_parts();
let turn_input: Vec<ResponseItem> = if is_review_mode {
if !pending_input.is_empty() {

View File

@@ -26,11 +26,42 @@ impl SessionState {
self.readiness_queue.push_back(flag);
}
pub(crate) fn next_readiness(&mut self) -> Option<Arc<ReadinessFlag>> {
self.readiness_queue.pop_front()
pub(crate) fn next_readiness(&mut self) -> Option<TurnReadinessGuard<'_>> {
if self.readiness_queue.is_empty() {
None
} else {
Some(TurnReadinessGuard::new(&mut self.readiness_queue))
}
}
pub(crate) fn clear_readiness(&mut self) {
self.readiness_queue.clear();
}
}
pub(crate) struct TurnReadinessGuard<'a> {
queue: &'a mut VecDeque<Arc<ReadinessFlag>>,
consumed: bool,
}
impl<'a> TurnReadinessGuard<'a> {
fn new(queue: &'a mut VecDeque<Arc<ReadinessFlag>>) -> Self {
Self {
queue,
consumed: false,
}
}
pub(crate) fn take(mut self) -> Option<Arc<ReadinessFlag>> {
self.consumed = true;
self.queue.pop_front()
}
}
impl Drop for TurnReadinessGuard<'_> {
fn drop(&mut self) {
if !self.consumed {
let _ = self.queue.pop_front();
}
}
}

View File

@@ -1,11 +1,12 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::path::PathBuf;
use std::sync::Arc;
use anyhow::Result;
use codex_utils_readiness::ReadinessFlag;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::RwLock;
use crate::client::ModelClient;
use crate::config_types::ShellEnvironmentPolicy;
@@ -18,8 +19,6 @@ use crate::turn_diff_tracker::TurnDiffTracker;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use std::collections::VecDeque;
#[derive(Debug)]
pub(crate) struct TurnContext {
pub(crate) client: ModelClient,
@@ -45,6 +44,7 @@ impl TurnContext {
}
}
#[derive(Default)]
struct TurnMailbox {
initial_input: Option<ResponseInputItem>,
latest_readiness: Option<Arc<ReadinessFlag>>,
@@ -52,7 +52,7 @@ struct TurnMailbox {
}
impl TurnMailbox {
fn new(
fn seed(
initial_input: Option<ResponseInputItem>,
readiness: Option<Arc<ReadinessFlag>>,
) -> Self {
@@ -83,17 +83,29 @@ impl TurnMailbox {
self.pending.push_back(items.into());
}
fn drain(&mut self) -> (Vec<ResponseItem>, Option<Arc<ReadinessFlag>>) {
fn drain(&mut self) -> TurnDrain {
let items = self
.pending
.drain(..)
.map(ResponseItem::from)
.collect::<Vec<_>>();
let readiness = self.latest_readiness.clone();
(items, readiness)
TurnDrain { items, readiness }
}
}
pub(crate) struct TurnDrain {
items: Vec<ResponseItem>,
readiness: Option<Arc<ReadinessFlag>>,
}
impl TurnDrain {
pub(crate) fn into_parts(self) -> (Vec<ResponseItem>, Option<Arc<ReadinessFlag>>) {
(self.items, self.readiness)
}
}
#[derive(Default)]
struct TurnRuntime {
mailbox: TurnMailbox,
review_history: Vec<ResponseItem>,
@@ -108,11 +120,8 @@ impl TurnRuntime {
readiness: Option<Arc<ReadinessFlag>>,
) -> Self {
Self {
mailbox: TurnMailbox::new(initial_input, readiness),
review_history: Vec::new(),
last_agent_message: None,
auto_compact_recently_attempted: false,
diff_tracker: TurnDiffTracker::new(),
mailbox: TurnMailbox::seed(initial_input, readiness),
..Self::default()
}
}
}
@@ -120,7 +129,7 @@ impl TurnRuntime {
pub(crate) struct TurnState {
sub_id: String,
turn_context: Arc<TurnContext>,
runtime: Mutex<TurnRuntime>,
runtime: RwLock<TurnRuntime>,
}
impl TurnState {
@@ -139,7 +148,7 @@ impl TurnState {
Self {
sub_id,
turn_context,
runtime: Mutex::new(runtime),
runtime: RwLock::new(runtime),
}
}
@@ -152,12 +161,12 @@ impl TurnState {
}
pub(crate) async fn take_initial_input(&self) -> Option<ResponseInputItem> {
let mut runtime = self.runtime.lock().await;
let mut runtime = self.runtime.write().await;
runtime.mailbox.take_initial_input()
}
pub(crate) async fn drain_mailbox(&self) -> (Vec<ResponseItem>, Option<Arc<ReadinessFlag>>) {
let mut runtime = self.runtime.lock().await;
pub(crate) async fn drain_mailbox(&self) -> TurnDrain {
let mut runtime = self.runtime.write().await;
runtime.mailbox.drain()
}
@@ -166,12 +175,12 @@ impl TurnState {
items: Vec<InputItem>,
readiness: Option<Arc<ReadinessFlag>>,
) {
let mut runtime = self.runtime.lock().await;
let mut runtime = self.runtime.write().await;
runtime.mailbox.enqueue(items, readiness);
}
pub(crate) async fn set_review_history(&self, history: Vec<ResponseItem>) {
let mut runtime = self.runtime.lock().await;
let mut runtime = self.runtime.write().await;
runtime.review_history = history;
}
@@ -179,44 +188,44 @@ impl TurnState {
if items.is_empty() {
return;
}
let mut runtime = self.runtime.lock().await;
let mut runtime = self.runtime.write().await;
runtime.review_history.extend(items.iter().cloned());
}
pub(crate) async fn review_history(&self) -> Vec<ResponseItem> {
let runtime = self.runtime.lock().await;
let runtime = self.runtime.read().await;
runtime.review_history.clone()
}
pub(crate) async fn mark_auto_compact_attempted(&self) -> bool {
let mut runtime = self.runtime.lock().await;
let mut runtime = self.runtime.write().await;
let already_attempted = runtime.auto_compact_recently_attempted;
runtime.auto_compact_recently_attempted = true;
already_attempted
}
pub(crate) async fn reset_auto_compact_attempted(&self) {
let mut runtime = self.runtime.lock().await;
let mut runtime = self.runtime.write().await;
runtime.auto_compact_recently_attempted = false;
}
pub(crate) async fn set_last_agent_message(&self, message: Option<String>) {
let mut runtime = self.runtime.lock().await;
let mut runtime = self.runtime.write().await;
runtime.last_agent_message = message;
}
pub(crate) async fn last_agent_message(&self) -> Option<String> {
let runtime = self.runtime.lock().await;
let runtime = self.runtime.read().await;
runtime.last_agent_message.clone()
}
pub(crate) async fn on_patch_begin(&self, changes: &HashMap<PathBuf, FileChange>) {
let mut runtime = self.runtime.lock().await;
let mut runtime = self.runtime.write().await;
runtime.diff_tracker.on_patch_begin(changes);
}
pub(crate) async fn take_unified_diff(&self) -> Result<Option<String>> {
let mut runtime = self.runtime.lock().await;
let mut runtime = self.runtime.write().await;
runtime.diff_tracker.get_unified_diff()
}
}

View File

@@ -66,7 +66,7 @@ struct TurnMailbox {
4. **Turn execution** `run_task` now receives `Arc<TurnState>` instead of a raw `Vec<InputItem>` / readiness pair. It:
- Grabs the initial input via `turn_state.take_initial_input()` to seed history and the review mailbox.
- On each iteration, calls `turn_state.drain_mailbox()` which returns `(Vec<ResponseItem>, Option<Arc<ReadinessFlag>>)` so the loop no longer needs to manipulate the readiness flag manually. `TurnMailbox` ensures we always hand out the most recent readiness flag (the newest non-`None` entry wins).
- On each iteration, calls `turn_state.drain_mailbox()` which returns a `TurnDrain` bundling the pending `ResponseItem`s and the latest readiness flag so the loop no longer needs to manipulate the readiness flag manually. `TurnMailbox` ensures we always hand out the most recent readiness flag (the newest non-`None` entry wins).
- Accesses the diff tracker, review history, and auto-compaction flag through the `TurnState` rather than local variables. This keeps the single source of truth tied to the turns lifetime and makes debugging easier.
- Writes the last assistant message into `turn_state` before signalling `TaskComplete` so listeners can retrieve it even if the task is aborted elsewhere.