mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
ref: state 6 (#4168)
This commit is contained in:
@@ -504,7 +504,9 @@ impl Session {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
*active = Some(ActiveTurn {
|
||||
sub_id: current_task.sub_id.clone(),
|
||||
turn_state: std::sync::Arc::new(crate::state::TurnState),
|
||||
turn_state: std::sync::Arc::new(tokio::sync::Mutex::new(
|
||||
crate::state::TurnState::default(),
|
||||
)),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -583,8 +585,14 @@ impl Session {
|
||||
let (tx_approve, rx_approve) = oneshot::channel();
|
||||
let event_id = sub_id.clone();
|
||||
let prev_entry = {
|
||||
let mut state = self.state.lock().await;
|
||||
state.insert_pending_approval(sub_id, tx_approve)
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.insert_pending_approval(sub_id, tx_approve)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
};
|
||||
if prev_entry.is_some() {
|
||||
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
||||
@@ -615,8 +623,14 @@ impl Session {
|
||||
let (tx_approve, rx_approve) = oneshot::channel();
|
||||
let event_id = sub_id.clone();
|
||||
let prev_entry = {
|
||||
let mut state = self.state.lock().await;
|
||||
state.insert_pending_approval(sub_id, tx_approve)
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.insert_pending_approval(sub_id, tx_approve)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
};
|
||||
if prev_entry.is_some() {
|
||||
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
||||
@@ -637,8 +651,14 @@ impl Session {
|
||||
|
||||
pub async fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
|
||||
let entry = {
|
||||
let mut state = self.state.lock().await;
|
||||
state.remove_pending_approval(sub_id)
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.as_mut() {
|
||||
Some(at) => {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.remove_pending_approval(sub_id)
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
};
|
||||
match entry {
|
||||
Some(tx_approve) => {
|
||||
@@ -986,9 +1006,13 @@ impl Session {
|
||||
|
||||
/// Returns the input if there was no task running to inject into
|
||||
pub async fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
|
||||
let mut state = self.state.lock().await;
|
||||
let state = self.state.lock().await;
|
||||
if state.current_task.is_some() {
|
||||
state.push_pending_input(input.into());
|
||||
let mut active = self.active_turn.lock().await;
|
||||
if let Some(at) = active.as_mut() {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.push_pending_input(input.into());
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
Err(input)
|
||||
@@ -996,8 +1020,13 @@ impl Session {
|
||||
}
|
||||
|
||||
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
|
||||
let mut state = self.state.lock().await;
|
||||
state.take_pending_input()
|
||||
let mut active = self.active_turn.lock().await;
|
||||
if let Some(at) = active.as_mut() {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.take_pending_input()
|
||||
} else {
|
||||
Vec::with_capacity(0)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn call_tool(
|
||||
@@ -1015,7 +1044,11 @@ impl Session {
|
||||
pub async fn interrupt_task(&self) {
|
||||
info!("interrupt received: abort current task, if any");
|
||||
let mut state = self.state.lock().await;
|
||||
state.clear_pending();
|
||||
let mut active = self.active_turn.lock().await;
|
||||
if let Some(at) = active.as_mut() {
|
||||
let mut ts = at.turn_state.lock().await;
|
||||
ts.clear_pending();
|
||||
}
|
||||
if let Some(task) = state.current_task.take() {
|
||||
task.abort(TurnAbortReason::Interrupted);
|
||||
}
|
||||
@@ -1023,7 +1056,12 @@ impl Session {
|
||||
|
||||
fn interrupt_task_sync(&self) {
|
||||
if let Ok(mut state) = self.state.try_lock() {
|
||||
state.clear_pending();
|
||||
if let Ok(mut active) = self.active_turn.try_lock()
|
||||
&& let Some(at) = active.as_mut()
|
||||
&& let Ok(mut ts) = at.turn_state.try_lock()
|
||||
{
|
||||
ts.clear_pending();
|
||||
}
|
||||
if let Some(task) = state.current_task.take() {
|
||||
task.abort(TurnAbortReason::Interrupted);
|
||||
}
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
//! Session/turn state module scaffolding.
|
||||
//! Session/turn state module.
|
||||
//!
|
||||
//! This module will encapsulate all mutable state for a Codex session.
|
||||
//! It starts with lightweight placeholders to enable incremental refactors
|
||||
//! without changing behaviour.
|
||||
//! Encapsulates all mutable state for a Codex session and the currently active
|
||||
//! turn. The goal is to present lock-safe, narrow APIs so other modules never
|
||||
//! need to poke at raw mutexes or internal fields.
|
||||
//!
|
||||
//! Locking guidelines
|
||||
//! - Lock ordering: `SessionState` → `ActiveTurn` → `TurnState`.
|
||||
//! - Never hold a lock across an `.await`. Extract minimal data and drop the
|
||||
//! guard before awaiting.
|
||||
//! - Prefer helper methods on these types rather than exposing fields.
|
||||
|
||||
mod session;
|
||||
mod turn;
|
||||
|
||||
@@ -1,16 +1,12 @@
|
||||
//! Session-wide mutable state.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::codex::AgentTask;
|
||||
use crate::conversation_history::ConversationHistory;
|
||||
use crate::protocol::RateLimitSnapshot;
|
||||
use crate::protocol::ReviewDecision;
|
||||
use crate::protocol::TokenUsage;
|
||||
use crate::protocol::TokenUsageInfo;
|
||||
|
||||
@@ -19,8 +15,6 @@ use crate::protocol::TokenUsageInfo;
|
||||
pub(crate) struct SessionState {
|
||||
pub(crate) approved_commands: HashSet<Vec<String>>,
|
||||
pub(crate) current_task: Option<AgentTask>,
|
||||
pub(crate) pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
|
||||
pub(crate) pending_input: Vec<ResponseInputItem>,
|
||||
pub(crate) history: ConversationHistory,
|
||||
pub(crate) token_info: Option<TokenUsageInfo>,
|
||||
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
|
||||
@@ -84,38 +78,5 @@ impl SessionState {
|
||||
(self.token_info.clone(), self.latest_rate_limits.clone())
|
||||
}
|
||||
|
||||
// Pending input/approval helpers
|
||||
pub(crate) fn insert_pending_approval(
|
||||
&mut self,
|
||||
key: String,
|
||||
tx: oneshot::Sender<ReviewDecision>,
|
||||
) -> Option<oneshot::Sender<ReviewDecision>> {
|
||||
self.pending_approvals.insert(key, tx)
|
||||
}
|
||||
|
||||
pub(crate) fn remove_pending_approval(
|
||||
&mut self,
|
||||
key: &str,
|
||||
) -> Option<oneshot::Sender<ReviewDecision>> {
|
||||
self.pending_approvals.remove(key)
|
||||
}
|
||||
|
||||
pub(crate) fn clear_pending(&mut self) {
|
||||
self.pending_approvals.clear();
|
||||
self.pending_input.clear();
|
||||
}
|
||||
|
||||
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
|
||||
self.pending_input.push(input);
|
||||
}
|
||||
|
||||
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
|
||||
if self.pending_input.is_empty() {
|
||||
Vec::with_capacity(0)
|
||||
} else {
|
||||
let mut ret = Vec::new();
|
||||
std::mem::swap(&mut ret, &mut self.pending_input);
|
||||
ret
|
||||
}
|
||||
}
|
||||
// Pending input/approval moved to TurnState.
|
||||
}
|
||||
|
||||
@@ -1,14 +1,60 @@
|
||||
//! Turn-scoped state and active turn metadata scaffolding.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::protocol::ReviewDecision;
|
||||
|
||||
/// Metadata about the currently running turn.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct ActiveTurn {
|
||||
pub(crate) sub_id: String,
|
||||
pub(crate) turn_state: Arc<TurnState>,
|
||||
pub(crate) turn_state: Arc<Mutex<TurnState>>,
|
||||
}
|
||||
|
||||
/// Mutable state for a single turn.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct TurnState;
|
||||
pub(crate) struct TurnState {
|
||||
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
|
||||
pending_input: Vec<ResponseInputItem>,
|
||||
}
|
||||
|
||||
impl TurnState {
|
||||
pub(crate) fn insert_pending_approval(
|
||||
&mut self,
|
||||
key: String,
|
||||
tx: oneshot::Sender<ReviewDecision>,
|
||||
) -> Option<oneshot::Sender<ReviewDecision>> {
|
||||
self.pending_approvals.insert(key, tx)
|
||||
}
|
||||
|
||||
pub(crate) fn remove_pending_approval(
|
||||
&mut self,
|
||||
key: &str,
|
||||
) -> Option<oneshot::Sender<ReviewDecision>> {
|
||||
self.pending_approvals.remove(key)
|
||||
}
|
||||
|
||||
pub(crate) fn clear_pending(&mut self) {
|
||||
self.pending_approvals.clear();
|
||||
self.pending_input.clear();
|
||||
}
|
||||
|
||||
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
|
||||
self.pending_input.push(input);
|
||||
}
|
||||
|
||||
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
|
||||
if self.pending_input.is_empty() {
|
||||
Vec::with_capacity(0)
|
||||
} else {
|
||||
let mut ret = Vec::new();
|
||||
std::mem::swap(&mut ret, &mut self.pending_input);
|
||||
ret
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user