Compare commits

...

12 Commits

Author SHA1 Message Date
jif-oai
fa75e63824 core: clear active turn on task abort to avoid stale turn state (#4173) 2025-09-24 18:13:34 +01:00
jif-oai
e553ff6f17 ref: state 10 (#4172) 2025-09-24 18:13:27 +01:00
jif-oai
06a0b36ee5 ref: state 9 (#4171) 2025-09-24 18:13:18 +01:00
jif-oai
f4ceeba263 core/compact: stop locking Session.state; use Session::record_into_hi… (#4170)
…story during streaming

# External (non-OpenAI) Pull Request Requirements

Before opening this Pull Request, please read the dedicated
"Contributing" markdown file or your PR may be closed:
https://github.com/openai/codex/blob/main/docs/contributing.md

If your PR conforms to our contribution guidelines, replace this text
with a detailed and high quality description of your changes.
2025-09-24 17:48:33 +01:00
jimmyfraiture
55abf54a7d core: add Session::history_snapshot and use in compact task to avoid external state locking 2025-09-24 17:31:30 +01:00
jimmyfraiture
d36bae194b core: document lock ordering and usage in state module 2025-09-24 17:29:45 +01:00
jimmyfraiture
862c9e494b core: move pending approvals/input to TurnState; use ActiveTurn in Session for approvals and input lifecycle 2025-09-24 17:28:30 +01:00
jimmyfraiture
f66c4bcf6f core: add ActiveTurn wrapper, track current active turn in Session 2025-09-24 17:25:42 +01:00
jimmyfraiture
c122259012 core: add SessionState helper methods and migrate call sites (history, approvals, tokens, rate limits) 2025-09-24 17:16:59 +01:00
jimmyfraiture
269b2999c7 core: introduce services::SessionServices and move helpers off Session 2025-09-24 17:12:13 +01:00
jimmyfraiture
29bfa77cc0 core: move anonymous State into state::SessionState and update Session to use it 2025-09-24 17:08:07 +01:00
jimmyfraiture
c8d65bbf5e core: scaffold state module with SessionState/ActiveTurn/TurnState and wire into lib 2025-09-24 17:04:38 +01:00
7 changed files with 317 additions and 104 deletions

View File

@@ -1,6 +1,5 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -110,7 +109,6 @@ use crate::protocol::Submission;
use crate::protocol::TaskCompleteEvent;
use crate::protocol::TokenCountEvent;
use crate::protocol::TokenUsage;
use crate::protocol::TokenUsageInfo;
use crate::protocol::TurnDiffEvent;
use crate::protocol::WebSearchBeginEvent;
use crate::rollout::RolloutRecorder;
@@ -119,6 +117,8 @@ use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
use crate::shell;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::unified_exec::UnifiedExecSessionManager;
use crate::user_instructions::UserInstructions;
@@ -252,17 +252,7 @@ impl Codex {
}
}
/// Mutable state of the agent
#[derive(Default)]
struct State {
approved_commands: HashSet<Vec<String>>,
current_task: Option<AgentTask>,
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
pending_input: Vec<ResponseInputItem>,
history: ConversationHistory,
token_info: Option<TokenUsageInfo>,
latest_rate_limits: Option<RateLimitSnapshot>,
}
use crate::state::SessionState;
/// Context for an initialized model agent
///
@@ -270,21 +260,9 @@ struct State {
pub(crate) struct Session {
conversation_id: ConversationId,
tx_event: Sender<Event>,
/// Manager for external MCP servers/tools.
mcp_connection_manager: McpConnectionManager,
session_manager: ExecSessionManager,
unified_exec_manager: UnifiedExecSessionManager,
notifier: UserNotifier,
/// Optional rollout recorder for persisting the conversation transcript so
/// sessions can be replayed or inspected later.
rollout: Mutex<Option<RolloutRecorder>>,
state: Mutex<State>,
codex_linux_sandbox_exe: Option<PathBuf>,
user_shell: shell::Shell,
show_raw_agent_reasoning: bool,
state: Mutex<SessionState>,
active_turn: Mutex<Option<ActiveTurn>>,
services: SessionServices,
next_internal_sub_id: AtomicU64,
}
@@ -412,10 +390,7 @@ impl Session {
})?;
let rollout_path = rollout_recorder.rollout_path.clone();
// Create the mutable state for the Session.
let state = State {
history: ConversationHistory::new(),
..Default::default()
};
let state = SessionState::new();
// Handle MCP manager result and record any startup failures.
let (mcp_connection_manager, failed_clients) = match mcp_res {
@@ -473,18 +448,23 @@ impl Session {
is_review_mode: false,
final_output_json_schema: None,
};
let sess = Arc::new(Session {
conversation_id,
tx_event: tx_event.clone(),
let services = SessionServices {
mcp_connection_manager,
session_manager: ExecSessionManager::default(),
unified_exec_manager: UnifiedExecSessionManager::default(),
notifier: notify,
state: Mutex::new(state),
rollout: Mutex::new(Some(rollout_recorder)),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
user_shell: default_shell,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
};
let sess = Arc::new(Session {
conversation_id,
tx_event: tx_event.clone(),
state: Mutex::new(state),
active_turn: Mutex::new(None),
services,
next_internal_sub_id: AtomicU64::new(0),
});
@@ -520,6 +500,15 @@ impl Session {
current_task.abort(TurnAbortReason::Replaced);
}
state.current_task = Some(task);
if let Some(current_task) = &state.current_task {
let mut active = self.active_turn.lock().await;
*active = Some(ActiveTurn {
sub_id: current_task.sub_id.clone(),
turn_state: std::sync::Arc::new(tokio::sync::Mutex::new(
crate::state::TurnState::default(),
)),
});
}
}
pub async fn remove_task(&self, sub_id: &str) {
@@ -529,6 +518,12 @@ impl Session {
{
state.current_task.take();
}
let mut active = self.active_turn.lock().await;
if let Some(at) = &*active
&& at.sub_id == sub_id
{
*active = None;
}
}
fn next_internal_sub_id(&self) -> String {
@@ -590,8 +585,14 @@ impl Session {
let (tx_approve, rx_approve) = oneshot::channel();
let event_id = sub_id.clone();
let prev_entry = {
let mut state = self.state.lock().await;
state.pending_approvals.insert(sub_id, tx_approve)
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.insert_pending_approval(sub_id, tx_approve)
}
None => None,
}
};
if prev_entry.is_some() {
warn!("Overwriting existing pending approval for sub_id: {event_id}");
@@ -622,8 +623,14 @@ impl Session {
let (tx_approve, rx_approve) = oneshot::channel();
let event_id = sub_id.clone();
let prev_entry = {
let mut state = self.state.lock().await;
state.pending_approvals.insert(sub_id, tx_approve)
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.insert_pending_approval(sub_id, tx_approve)
}
None => None,
}
};
if prev_entry.is_some() {
warn!("Overwriting existing pending approval for sub_id: {event_id}");
@@ -644,8 +651,14 @@ impl Session {
pub async fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
let entry = {
let mut state = self.state.lock().await;
state.pending_approvals.remove(sub_id)
let mut active = self.active_turn.lock().await;
match active.as_mut() {
Some(at) => {
let mut ts = at.turn_state.lock().await;
ts.remove_pending_approval(sub_id)
}
None => None,
}
};
match entry {
Some(tx_approve) => {
@@ -659,7 +672,7 @@ impl Session {
pub async fn add_approved_command(&self, cmd: Vec<String>) {
let mut state = self.state.lock().await;
state.approved_commands.insert(cmd);
state.add_approved_command(cmd);
}
/// Records input items: always append to conversation history and
@@ -699,7 +712,12 @@ impl Session {
/// Append ResponseItems to the in-memory conversation history only.
async fn record_into_history(&self, items: &[ResponseItem]) {
let mut state = self.state.lock().await;
state.history.record_items(items.iter());
state.record_items(items.iter());
}
async fn replace_history(&self, items: Vec<ResponseItem>) {
let mut state = self.state.lock().await;
state.replace_history(items);
}
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
@@ -720,14 +738,14 @@ impl Session {
Some(turn_context.cwd.clone()),
Some(turn_context.approval_policy),
Some(turn_context.sandbox_policy.clone()),
Some(self.user_shell.clone()),
Some(self.user_shell().clone()),
)));
items
}
async fn persist_rollout_items(&self, items: &[RolloutItem]) {
let recorder = {
let guard = self.rollout.lock().await;
let guard = self.services.rollout.lock().await;
guard.clone()
};
if let Some(rec) = recorder
@@ -737,6 +755,11 @@ impl Session {
}
}
pub(crate) async fn history_snapshot(&self) -> Vec<ResponseItem> {
let state = self.state.lock().await;
state.history_snapshot()
}
async fn update_token_usage_info(
&self,
sub_id: &str,
@@ -746,12 +769,10 @@ impl Session {
{
let mut state = self.state.lock().await;
if let Some(token_usage) = token_usage {
let info = TokenUsageInfo::new_or_append(
&state.token_info,
&Some(token_usage.clone()),
state.update_token_info_from_usage(
token_usage,
turn_context.client.get_model_context_window(),
);
state.token_info = info;
}
}
self.send_token_count_event(sub_id).await;
@@ -760,7 +781,7 @@ impl Session {
async fn update_rate_limits(&self, sub_id: &str, new_rate_limits: RateLimitSnapshot) {
{
let mut state = self.state.lock().await;
state.latest_rate_limits = Some(new_rate_limits);
state.set_rate_limits(new_rate_limits);
}
self.send_token_count_event(sub_id).await;
}
@@ -768,7 +789,7 @@ impl Session {
async fn send_token_count_event(&self, sub_id: &str) {
let (info, rate_limits) = {
let state = self.state.lock().await;
(state.token_info.clone(), state.latest_rate_limits.clone())
state.token_info_and_rate_limits()
};
let event = Event {
id: sub_id.to_string(),
@@ -787,7 +808,7 @@ impl Session {
// Derive user message events and persist only UserMessage to rollout
let msgs =
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning);
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning());
let user_msgs: Vec<RolloutItem> = msgs
.into_iter()
.filter_map(|m| match m {
@@ -986,16 +1007,20 @@ impl Session {
pub async fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
let history = {
let state = self.state.lock().await;
state.history.contents()
state.history_snapshot()
};
[history, extra].concat()
}
/// Returns the input if there was no task running to inject into
pub async fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
let mut state = self.state.lock().await;
let state = self.state.lock().await;
if state.current_task.is_some() {
state.pending_input.push(input.into());
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut() {
let mut ts = at.turn_state.lock().await;
ts.push_pending_input(input.into());
}
Ok(())
} else {
Err(input)
@@ -1003,13 +1028,12 @@ impl Session {
}
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
let mut state = self.state.lock().await;
if state.pending_input.is_empty() {
Vec::with_capacity(0)
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut() {
let mut ts = at.turn_state.lock().await;
ts.take_pending_input()
} else {
let mut ret = Vec::new();
std::mem::swap(&mut ret, &mut state.pending_input);
ret
Vec::with_capacity(0)
}
}
@@ -1019,7 +1043,8 @@ impl Session {
tool: &str,
arguments: Option<serde_json::Value>,
) -> anyhow::Result<CallToolResult> {
self.mcp_connection_manager
self.services
.mcp_connection_manager
.call_tool(server, tool, arguments)
.await
}
@@ -1027,8 +1052,11 @@ impl Session {
pub async fn interrupt_task(&self) {
info!("interrupt received: abort current task, if any");
let mut state = self.state.lock().await;
state.pending_approvals.clear();
state.pending_input.clear();
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut() {
let mut ts = at.turn_state.lock().await;
ts.clear_pending();
}
if let Some(task) = state.current_task.take() {
task.abort(TurnAbortReason::Interrupted);
}
@@ -1036,8 +1064,12 @@ impl Session {
fn interrupt_task_sync(&self) {
if let Ok(mut state) = self.state.try_lock() {
state.pending_approvals.clear();
state.pending_input.clear();
if let Ok(mut active) = self.active_turn.try_lock()
&& let Some(at) = active.as_mut()
&& let Ok(mut ts) = at.turn_state.try_lock()
{
ts.clear_pending();
}
if let Some(task) = state.current_task.take() {
task.abort(TurnAbortReason::Interrupted);
}
@@ -1045,7 +1077,15 @@ impl Session {
}
pub(crate) fn notifier(&self) -> &UserNotifier {
&self.notifier
&self.services.notifier
}
fn user_shell(&self) -> &shell::Shell {
&self.services.user_shell
}
fn show_raw_agent_reasoning(&self) -> bool {
self.services.show_raw_agent_reasoning
}
}
@@ -1151,15 +1191,19 @@ impl AgentTask {
// TOCTOU?
if !self.handle.is_finished() {
self.handle.abort();
let sub_id = self.sub_id.clone();
let is_review = self.kind == AgentTaskKind::Review;
let sess = self.sess;
let event = Event {
id: self.sub_id.clone(),
id: sub_id.clone(),
msg: EventMsg::TurnAborted(TurnAbortedEvent { reason }),
};
let sess = self.sess;
tokio::spawn(async move {
if self.kind == AgentTaskKind::Review {
exit_review_mode(sess.clone(), self.sub_id, None).await;
if is_review {
exit_review_mode(sess.clone(), sub_id.clone(), None).await;
}
// Ensure active turn state is cleared when a task is aborted.
sess.remove_task(&sub_id).await;
sess.send_event(event).await;
});
}
@@ -1417,7 +1461,7 @@ async fn submission_loop(
let sub_id = sub.id.clone();
// This is a cheap lookup from the connection manager's cache.
let tools = sess.mcp_connection_manager.list_all_tools();
let tools = sess.services.mcp_connection_manager.list_all_tools();
let event = Event {
id: sub_id,
msg: EventMsg::McpListToolsResponse(
@@ -1467,7 +1511,7 @@ async fn submission_loop(
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
let recorder_opt = {
let mut guard = sess.rollout.lock().await;
let mut guard = sess.services.rollout.lock().await;
guard.take()
};
if let Some(rec) = recorder_opt
@@ -1494,7 +1538,7 @@ async fn submission_loop(
let sub_id = sub.id.clone();
// Flush rollout writes before returning the path so readers observe a consistent file.
let (path, rec_opt) = {
let guard = sess.rollout.lock().await;
let guard = sess.services.rollout.lock().await;
match guard.as_ref() {
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
None => {
@@ -1952,7 +1996,7 @@ async fn run_turn(
) -> CodexResult<TurnRunResult> {
let tools = get_openai_tools(
&turn_context.tools_config,
Some(sess.mcp_connection_manager.list_all_tools()),
Some(sess.services.mcp_connection_manager.list_all_tools()),
);
let prompt = Prompt {
@@ -2202,7 +2246,7 @@ async fn try_run_turn(
sess.send_event(event).await;
}
ResponseEvent::ReasoningContentDelta(delta) => {
if sess.show_raw_agent_reasoning {
if sess.show_raw_agent_reasoning() {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningRawContentDelta(
@@ -2324,7 +2368,7 @@ async fn handle_response_item(
trace!("suppressing assistant Message in review mode");
Vec::new()
}
_ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning),
_ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning()),
};
for msg in msgs {
let event = Event {
@@ -2370,7 +2414,11 @@ async fn handle_unified_exec_tool_call(
timeout_ms,
};
let result = sess.unified_exec_manager.handle_request(request).await;
let result = sess
.services
.unified_exec_manager
.handle_request(request)
.await;
let output_payload = match result {
Ok(value) => {
@@ -2545,6 +2593,7 @@ async fn handle_function_call(
}
};
let result = sess
.services
.session_manager
.handle_exec_command_request(exec_params)
.await;
@@ -2568,6 +2617,7 @@ async fn handle_function_call(
}
};
let result = sess
.services
.session_manager
.handle_write_stdin_request(write_stdin_params)
.await;
@@ -2579,7 +2629,7 @@ async fn handle_function_call(
}
}
_ => {
match sess.mcp_connection_manager.parse_tool_name(&name) {
match sess.services.mcp_connection_manager.parse_tool_name(&name) {
Some((server, tool_name)) => {
handle_mcp_tool_call(sess, &sub_id, call_id, server, tool_name, arguments).await
}
@@ -2697,12 +2747,12 @@ fn maybe_translate_shell_command(
sess: &Session,
turn_context: &TurnContext,
) -> ExecParams {
let should_translate = matches!(sess.user_shell, crate::shell::Shell::PowerShell(_))
let should_translate = matches!(sess.user_shell(), crate::shell::Shell::PowerShell(_))
|| turn_context.shell_environment_policy.use_profile;
if should_translate
&& let Some(command) = sess
.user_shell
.user_shell()
.format_default_shell_invocation(params.command.clone())
{
return ExecParams { command, ..params };
@@ -2816,7 +2866,7 @@ async fn handle_container_exec_with_params(
&params.command,
turn_context.approval_policy,
&turn_context.sandbox_policy,
&state.approved_commands,
state.approved_commands_ref(),
params.with_escalated_permissions.unwrap_or(false),
)
};
@@ -2895,7 +2945,7 @@ async fn handle_container_exec_with_params(
sandbox_type,
sandbox_policy: &turn_context.sandbox_policy,
sandbox_cwd: &turn_context.cwd,
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
codex_linux_sandbox_exe: &sess.services.codex_linux_sandbox_exe,
stdout_stream: if exec_command_context.apply_patch.is_some() {
None
} else {
@@ -3030,7 +3080,7 @@ async fn handle_sandbox_error(
sandbox_type: SandboxType::None,
sandbox_policy: &turn_context.sandbox_policy,
sandbox_cwd: &turn_context.cwd,
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
codex_linux_sandbox_exe: &sess.services.codex_linux_sandbox_exe,
stdout_stream: if exec_command_context.apply_patch.is_some() {
None
} else {
@@ -3384,7 +3434,7 @@ mod tests {
}),
));
let actual = tokio_test::block_on(async { session.state.lock().await.history.contents() });
let actual = tokio_test::block_on(async { session.state.lock().await.history_snapshot() });
assert_eq!(expected, actual);
}
@@ -3397,7 +3447,7 @@ mod tests {
session.record_initial_history(&turn_context, InitialHistory::Forked(rollout_items)),
);
let actual = tokio_test::block_on(async { session.state.lock().await.history.contents() });
let actual = tokio_test::block_on(async { session.state.lock().await.history_snapshot() });
assert_eq!(expected, actual);
}
@@ -3623,21 +3673,22 @@ mod tests {
is_review_mode: false,
final_output_json_schema: None,
};
let session = Session {
conversation_id,
tx_event,
let services = SessionServices {
mcp_connection_manager: McpConnectionManager::default(),
session_manager: ExecSessionManager::default(),
unified_exec_manager: UnifiedExecSessionManager::default(),
notifier: UserNotifier::default(),
rollout: Mutex::new(None),
state: Mutex::new(State {
history: ConversationHistory::new(),
..Default::default()
}),
codex_linux_sandbox_exe: None,
user_shell: shell::Shell::Unknown,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
};
let session = Session {
conversation_id,
tx_event,
state: Mutex::new(SessionState::new()),
active_turn: Mutex::new(None),
services,
next_internal_sub_id: AtomicU64::new(0),
};
(session, turn_context)

View File

@@ -151,18 +151,12 @@ async fn run_compact_task_inner(
if remove_task_on_completion {
sess.remove_task(&sub_id).await;
}
let history_snapshot = {
let state = sess.state.lock().await;
state.history.contents()
};
let history_snapshot = sess.history_snapshot().await;
let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
let user_messages = collect_user_messages(&history_snapshot);
let initial_context = sess.build_initial_context(turn_context.as_ref());
let new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
{
let mut state = sess.state.lock().await;
state.history.replace(new_history);
}
sess.replace_history(new_history).await;
let rollout_item = RolloutItem::Compacted(CompactedItem {
message: summary_text.clone(),
@@ -270,8 +264,7 @@ async fn drain_to_completed(
};
match event {
Ok(ResponseEvent::OutputItemDone(item)) => {
let mut state = sess.state.lock().await;
state.history.record_items(std::slice::from_ref(&item));
sess.record_into_history(std::slice::from_ref(&item)).await;
}
Ok(ResponseEvent::Completed { .. }) => {
return Ok(());

View File

@@ -75,6 +75,7 @@ pub use rollout::find_conversation_path_by_id_str;
pub use rollout::list::ConversationItem;
pub use rollout::list::ConversationsPage;
pub use rollout::list::Cursor;
mod state;
mod user_notification;
pub mod util;

View File

@@ -0,0 +1,8 @@
mod service;
mod session;
mod turn;
pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;
pub(crate) use turn::TurnState;

View File

@@ -0,0 +1,18 @@
use crate::RolloutRecorder;
use crate::exec_command::ExecSessionManager;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::unified_exec::UnifiedExecSessionManager;
use crate::user_notification::UserNotifier;
use std::path::PathBuf;
use tokio::sync::Mutex;
pub(crate) struct SessionServices {
pub(crate) mcp_connection_manager: McpConnectionManager,
pub(crate) session_manager: ExecSessionManager,
pub(crate) unified_exec_manager: UnifiedExecSessionManager,
pub(crate) notifier: UserNotifier,
pub(crate) rollout: Mutex<Option<RolloutRecorder>>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) user_shell: crate::shell::Shell,
pub(crate) show_raw_agent_reasoning: bool,
}

View File

@@ -0,0 +1,82 @@
//! Session-wide mutable state.
use std::collections::HashSet;
use codex_protocol::models::ResponseItem;
use crate::codex::AgentTask;
use crate::conversation_history::ConversationHistory;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::TokenUsage;
use crate::protocol::TokenUsageInfo;
/// Persistent, session-scoped state previously stored directly on `Session`.
#[derive(Default)]
pub(crate) struct SessionState {
pub(crate) approved_commands: HashSet<Vec<String>>,
pub(crate) current_task: Option<AgentTask>,
pub(crate) history: ConversationHistory,
pub(crate) token_info: Option<TokenUsageInfo>,
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
}
impl SessionState {
/// Create a new session state mirroring previous `State::default()` semantics.
pub(crate) fn new() -> Self {
Self {
history: ConversationHistory::new(),
..Default::default()
}
}
// History helpers
pub(crate) fn record_items<I>(&mut self, items: I)
where
I: IntoIterator,
I::Item: std::ops::Deref<Target = ResponseItem>,
{
self.history.record_items(items)
}
pub(crate) fn history_snapshot(&self) -> Vec<ResponseItem> {
self.history.contents()
}
pub(crate) fn replace_history(&mut self, items: Vec<ResponseItem>) {
self.history.replace(items);
}
// Approved command helpers
pub(crate) fn add_approved_command(&mut self, cmd: Vec<String>) {
self.approved_commands.insert(cmd);
}
pub(crate) fn approved_commands_ref(&self) -> &HashSet<Vec<String>> {
&self.approved_commands
}
// Token/rate limit helpers
pub(crate) fn update_token_info_from_usage(
&mut self,
usage: &TokenUsage,
model_context_window: Option<u64>,
) {
self.token_info = TokenUsageInfo::new_or_append(
&self.token_info,
&Some(usage.clone()),
model_context_window,
);
}
pub(crate) fn set_rate_limits(&mut self, snapshot: RateLimitSnapshot) {
self.latest_rate_limits = Some(snapshot);
}
pub(crate) fn token_info_and_rate_limits(
&self,
) -> (Option<TokenUsageInfo>, Option<RateLimitSnapshot>) {
(self.token_info.clone(), self.latest_rate_limits.clone())
}
// Pending input/approval moved to TurnState.
}

View File

@@ -0,0 +1,60 @@
//! Turn-scoped state and active turn metadata scaffolding.
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use codex_protocol::models::ResponseInputItem;
use tokio::sync::oneshot;
use crate::protocol::ReviewDecision;
/// Metadata about the currently running turn.
#[derive(Default)]
pub(crate) struct ActiveTurn {
pub(crate) sub_id: String,
pub(crate) turn_state: Arc<Mutex<TurnState>>,
}
/// Mutable state for a single turn.
#[derive(Default)]
pub(crate) struct TurnState {
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
pending_input: Vec<ResponseInputItem>,
}
impl TurnState {
pub(crate) fn insert_pending_approval(
&mut self,
key: String,
tx: oneshot::Sender<ReviewDecision>,
) -> Option<oneshot::Sender<ReviewDecision>> {
self.pending_approvals.insert(key, tx)
}
pub(crate) fn remove_pending_approval(
&mut self,
key: &str,
) -> Option<oneshot::Sender<ReviewDecision>> {
self.pending_approvals.remove(key)
}
pub(crate) fn clear_pending(&mut self) {
self.pending_approvals.clear();
self.pending_input.clear();
}
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
self.pending_input.push(input);
}
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
if self.pending_input.is_empty() {
Vec::with_capacity(0)
} else {
let mut ret = Vec::new();
std::mem::swap(&mut ret, &mut self.pending_input);
ret
}
}
}