diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 4eb4a14d6a..a7504939c1 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -980,7 +980,7 @@ impl Session { pub async fn next_turn_readiness(&self) -> Option> { let mut state = self.state.lock().await; - state.next_readiness() + state.next_readiness().and_then(super::state::session::TurnReadinessGuard::take) } /// Returns the input if there was no task running to inject into @@ -1686,7 +1686,7 @@ async fn run_task(sess: Arc, turn_state: Arc) { } loop { - let (pending_input, turn_readiness) = turn_state.drain_mailbox().await; + let (pending_input, turn_readiness) = turn_state.drain_mailbox().await.into_parts(); let turn_input: Vec = if is_review_mode { if !pending_input.is_empty() { diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 11132aa83c..98310c9534 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -26,11 +26,42 @@ impl SessionState { self.readiness_queue.push_back(flag); } - pub(crate) fn next_readiness(&mut self) -> Option> { - self.readiness_queue.pop_front() + pub(crate) fn next_readiness(&mut self) -> Option> { + if self.readiness_queue.is_empty() { + None + } else { + Some(TurnReadinessGuard::new(&mut self.readiness_queue)) + } } pub(crate) fn clear_readiness(&mut self) { self.readiness_queue.clear(); } } + +pub(crate) struct TurnReadinessGuard<'a> { + queue: &'a mut VecDeque>, + consumed: bool, +} + +impl<'a> TurnReadinessGuard<'a> { + fn new(queue: &'a mut VecDeque>) -> Self { + Self { + queue, + consumed: false, + } + } + + pub(crate) fn take(mut self) -> Option> { + self.consumed = true; + self.queue.pop_front() + } +} + +impl Drop for TurnReadinessGuard<'_> { + fn drop(&mut self) { + if !self.consumed { + let _ = self.queue.pop_front(); + } + } +} diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index 61e8a709f7..afafe60d8d 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -1,11 +1,12 @@ use std::collections::HashMap; +use std::collections::VecDeque; use std::path::PathBuf; use std::sync::Arc; use anyhow::Result; use codex_utils_readiness::ReadinessFlag; use serde_json::Value; -use tokio::sync::Mutex; +use tokio::sync::RwLock; use crate::client::ModelClient; use crate::config_types::ShellEnvironmentPolicy; @@ -18,8 +19,6 @@ use crate::turn_diff_tracker::TurnDiffTracker; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; -use std::collections::VecDeque; - #[derive(Debug)] pub(crate) struct TurnContext { pub(crate) client: ModelClient, @@ -45,6 +44,7 @@ impl TurnContext { } } +#[derive(Default)] struct TurnMailbox { initial_input: Option, latest_readiness: Option>, @@ -52,7 +52,7 @@ struct TurnMailbox { } impl TurnMailbox { - fn new( + fn seed( initial_input: Option, readiness: Option>, ) -> Self { @@ -83,17 +83,29 @@ impl TurnMailbox { self.pending.push_back(items.into()); } - fn drain(&mut self) -> (Vec, Option>) { + fn drain(&mut self) -> TurnDrain { let items = self .pending .drain(..) .map(ResponseItem::from) .collect::>(); let readiness = self.latest_readiness.clone(); - (items, readiness) + TurnDrain { items, readiness } } } +pub(crate) struct TurnDrain { + items: Vec, + readiness: Option>, +} + +impl TurnDrain { + pub(crate) fn into_parts(self) -> (Vec, Option>) { + (self.items, self.readiness) + } +} + +#[derive(Default)] struct TurnRuntime { mailbox: TurnMailbox, review_history: Vec, @@ -108,11 +120,8 @@ impl TurnRuntime { readiness: Option>, ) -> Self { Self { - mailbox: TurnMailbox::new(initial_input, readiness), - review_history: Vec::new(), - last_agent_message: None, - auto_compact_recently_attempted: false, - diff_tracker: TurnDiffTracker::new(), + mailbox: TurnMailbox::seed(initial_input, readiness), + ..Self::default() } } } @@ -120,7 +129,7 @@ impl TurnRuntime { pub(crate) struct TurnState { sub_id: String, turn_context: Arc, - runtime: Mutex, + runtime: RwLock, } impl TurnState { @@ -139,7 +148,7 @@ impl TurnState { Self { sub_id, turn_context, - runtime: Mutex::new(runtime), + runtime: RwLock::new(runtime), } } @@ -152,12 +161,12 @@ impl TurnState { } pub(crate) async fn take_initial_input(&self) -> Option { - let mut runtime = self.runtime.lock().await; + let mut runtime = self.runtime.write().await; runtime.mailbox.take_initial_input() } - pub(crate) async fn drain_mailbox(&self) -> (Vec, Option>) { - let mut runtime = self.runtime.lock().await; + pub(crate) async fn drain_mailbox(&self) -> TurnDrain { + let mut runtime = self.runtime.write().await; runtime.mailbox.drain() } @@ -166,12 +175,12 @@ impl TurnState { items: Vec, readiness: Option>, ) { - let mut runtime = self.runtime.lock().await; + let mut runtime = self.runtime.write().await; runtime.mailbox.enqueue(items, readiness); } pub(crate) async fn set_review_history(&self, history: Vec) { - let mut runtime = self.runtime.lock().await; + let mut runtime = self.runtime.write().await; runtime.review_history = history; } @@ -179,44 +188,44 @@ impl TurnState { if items.is_empty() { return; } - let mut runtime = self.runtime.lock().await; + let mut runtime = self.runtime.write().await; runtime.review_history.extend(items.iter().cloned()); } pub(crate) async fn review_history(&self) -> Vec { - let runtime = self.runtime.lock().await; + let runtime = self.runtime.read().await; runtime.review_history.clone() } pub(crate) async fn mark_auto_compact_attempted(&self) -> bool { - let mut runtime = self.runtime.lock().await; + let mut runtime = self.runtime.write().await; let already_attempted = runtime.auto_compact_recently_attempted; runtime.auto_compact_recently_attempted = true; already_attempted } pub(crate) async fn reset_auto_compact_attempted(&self) { - let mut runtime = self.runtime.lock().await; + let mut runtime = self.runtime.write().await; runtime.auto_compact_recently_attempted = false; } pub(crate) async fn set_last_agent_message(&self, message: Option) { - let mut runtime = self.runtime.lock().await; + let mut runtime = self.runtime.write().await; runtime.last_agent_message = message; } pub(crate) async fn last_agent_message(&self) -> Option { - let runtime = self.runtime.lock().await; + let runtime = self.runtime.read().await; runtime.last_agent_message.clone() } pub(crate) async fn on_patch_begin(&self, changes: &HashMap) { - let mut runtime = self.runtime.lock().await; + let mut runtime = self.runtime.write().await; runtime.diff_tracker.on_patch_begin(changes); } pub(crate) async fn take_unified_diff(&self) -> Result> { - let mut runtime = self.runtime.lock().await; + let mut runtime = self.runtime.write().await; runtime.diff_tracker.get_unified_diff() } } diff --git a/codex-rs/state.md b/codex-rs/state.md index 3c6ef51f73..5408921be5 100644 --- a/codex-rs/state.md +++ b/codex-rs/state.md @@ -66,7 +66,7 @@ struct TurnMailbox { 4. **Turn execution** – `run_task` now receives `Arc` instead of a raw `Vec` / readiness pair. It: - Grabs the initial input via `turn_state.take_initial_input()` to seed history and the review mailbox. - - On each iteration, calls `turn_state.drain_mailbox()` which returns `(Vec, Option>)` so the loop no longer needs to manipulate the readiness flag manually. `TurnMailbox` ensures we always hand out the most recent readiness flag (the newest non-`None` entry wins). + - On each iteration, calls `turn_state.drain_mailbox()` which returns a `TurnDrain` bundling the pending `ResponseItem`s and the latest readiness flag so the loop no longer needs to manipulate the readiness flag manually. `TurnMailbox` ensures we always hand out the most recent readiness flag (the newest non-`None` entry wins). - Accesses the diff tracker, review history, and auto-compaction flag through the `TurnState` rather than local variables. This keeps the single source of truth tied to the turn’s lifetime and makes debugging easier. - Writes the last assistant message into `turn_state` before signalling `TaskComplete` so listeners can retrieve it even if the task is aborted elsewhere.