mirror of
https://github.com/openai/codex.git
synced 2026-05-17 17:53:06 +00:00
Compare commits
6 Commits
dev/flaky-
...
jif/state5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
23d5138f35 | ||
|
|
f66c4bcf6f | ||
|
|
c122259012 | ||
|
|
269b2999c7 | ||
|
|
29bfa77cc0 | ||
|
|
c8d65bbf5e |
@@ -1,6 +1,5 @@
|
|||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::collections::HashSet;
|
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -110,7 +109,6 @@ use crate::protocol::Submission;
|
|||||||
use crate::protocol::TaskCompleteEvent;
|
use crate::protocol::TaskCompleteEvent;
|
||||||
use crate::protocol::TokenCountEvent;
|
use crate::protocol::TokenCountEvent;
|
||||||
use crate::protocol::TokenUsage;
|
use crate::protocol::TokenUsage;
|
||||||
use crate::protocol::TokenUsageInfo;
|
|
||||||
use crate::protocol::TurnDiffEvent;
|
use crate::protocol::TurnDiffEvent;
|
||||||
use crate::protocol::WebSearchBeginEvent;
|
use crate::protocol::WebSearchBeginEvent;
|
||||||
use crate::rollout::RolloutRecorder;
|
use crate::rollout::RolloutRecorder;
|
||||||
@@ -118,7 +116,9 @@ use crate::rollout::RolloutRecorderParams;
|
|||||||
use crate::safety::SafetyCheck;
|
use crate::safety::SafetyCheck;
|
||||||
use crate::safety::assess_command_safety;
|
use crate::safety::assess_command_safety;
|
||||||
use crate::safety::assess_safety_for_untrusted_command;
|
use crate::safety::assess_safety_for_untrusted_command;
|
||||||
|
use crate::services::SessionServices;
|
||||||
use crate::shell;
|
use crate::shell;
|
||||||
|
use crate::state::ActiveTurn;
|
||||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||||
use crate::unified_exec::UnifiedExecSessionManager;
|
use crate::unified_exec::UnifiedExecSessionManager;
|
||||||
use crate::user_instructions::UserInstructions;
|
use crate::user_instructions::UserInstructions;
|
||||||
@@ -252,17 +252,7 @@ impl Codex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mutable state of the agent
|
use crate::state::SessionState;
|
||||||
#[derive(Default)]
|
|
||||||
struct State {
|
|
||||||
approved_commands: HashSet<Vec<String>>,
|
|
||||||
current_task: Option<AgentTask>,
|
|
||||||
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
|
|
||||||
pending_input: Vec<ResponseInputItem>,
|
|
||||||
history: ConversationHistory,
|
|
||||||
token_info: Option<TokenUsageInfo>,
|
|
||||||
latest_rate_limits: Option<RateLimitSnapshot>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Context for an initialized model agent
|
/// Context for an initialized model agent
|
||||||
///
|
///
|
||||||
@@ -270,21 +260,9 @@ struct State {
|
|||||||
pub(crate) struct Session {
|
pub(crate) struct Session {
|
||||||
conversation_id: ConversationId,
|
conversation_id: ConversationId,
|
||||||
tx_event: Sender<Event>,
|
tx_event: Sender<Event>,
|
||||||
|
state: Mutex<SessionState>,
|
||||||
/// Manager for external MCP servers/tools.
|
active_turn: Mutex<Option<ActiveTurn>>,
|
||||||
mcp_connection_manager: McpConnectionManager,
|
services: SessionServices,
|
||||||
session_manager: ExecSessionManager,
|
|
||||||
unified_exec_manager: UnifiedExecSessionManager,
|
|
||||||
|
|
||||||
notifier: UserNotifier,
|
|
||||||
|
|
||||||
/// Optional rollout recorder for persisting the conversation transcript so
|
|
||||||
/// sessions can be replayed or inspected later.
|
|
||||||
rollout: Mutex<Option<RolloutRecorder>>,
|
|
||||||
state: Mutex<State>,
|
|
||||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
|
||||||
user_shell: shell::Shell,
|
|
||||||
show_raw_agent_reasoning: bool,
|
|
||||||
next_internal_sub_id: AtomicU64,
|
next_internal_sub_id: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -412,10 +390,7 @@ impl Session {
|
|||||||
})?;
|
})?;
|
||||||
let rollout_path = rollout_recorder.rollout_path.clone();
|
let rollout_path = rollout_recorder.rollout_path.clone();
|
||||||
// Create the mutable state for the Session.
|
// Create the mutable state for the Session.
|
||||||
let state = State {
|
let state = SessionState::new();
|
||||||
history: ConversationHistory::new(),
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
// Handle MCP manager result and record any startup failures.
|
// Handle MCP manager result and record any startup failures.
|
||||||
let (mcp_connection_manager, failed_clients) = match mcp_res {
|
let (mcp_connection_manager, failed_clients) = match mcp_res {
|
||||||
@@ -473,18 +448,23 @@ impl Session {
|
|||||||
is_review_mode: false,
|
is_review_mode: false,
|
||||||
final_output_json_schema: None,
|
final_output_json_schema: None,
|
||||||
};
|
};
|
||||||
let sess = Arc::new(Session {
|
let services = SessionServices {
|
||||||
conversation_id,
|
|
||||||
tx_event: tx_event.clone(),
|
|
||||||
mcp_connection_manager,
|
mcp_connection_manager,
|
||||||
session_manager: ExecSessionManager::default(),
|
session_manager: ExecSessionManager::default(),
|
||||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||||
notifier: notify,
|
notifier: notify,
|
||||||
state: Mutex::new(state),
|
|
||||||
rollout: Mutex::new(Some(rollout_recorder)),
|
rollout: Mutex::new(Some(rollout_recorder)),
|
||||||
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
|
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
|
||||||
user_shell: default_shell,
|
user_shell: default_shell,
|
||||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||||
|
};
|
||||||
|
|
||||||
|
let sess = Arc::new(Session {
|
||||||
|
conversation_id,
|
||||||
|
tx_event: tx_event.clone(),
|
||||||
|
state: Mutex::new(state),
|
||||||
|
active_turn: Mutex::new(None),
|
||||||
|
services,
|
||||||
next_internal_sub_id: AtomicU64::new(0),
|
next_internal_sub_id: AtomicU64::new(0),
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -520,6 +500,15 @@ impl Session {
|
|||||||
current_task.abort(TurnAbortReason::Replaced);
|
current_task.abort(TurnAbortReason::Replaced);
|
||||||
}
|
}
|
||||||
state.current_task = Some(task);
|
state.current_task = Some(task);
|
||||||
|
if let Some(current_task) = &state.current_task {
|
||||||
|
let mut active = self.active_turn.lock().await;
|
||||||
|
*active = Some(ActiveTurn {
|
||||||
|
sub_id: current_task.sub_id.clone(),
|
||||||
|
turn_state: std::sync::Arc::new(tokio::sync::Mutex::new(
|
||||||
|
crate::state::TurnState::default(),
|
||||||
|
)),
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn remove_task(&self, sub_id: &str) {
|
pub async fn remove_task(&self, sub_id: &str) {
|
||||||
@@ -529,6 +518,12 @@ impl Session {
|
|||||||
{
|
{
|
||||||
state.current_task.take();
|
state.current_task.take();
|
||||||
}
|
}
|
||||||
|
let mut active = self.active_turn.lock().await;
|
||||||
|
if let Some(at) = &*active
|
||||||
|
&& at.sub_id == sub_id
|
||||||
|
{
|
||||||
|
*active = None;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn next_internal_sub_id(&self) -> String {
|
fn next_internal_sub_id(&self) -> String {
|
||||||
@@ -590,8 +585,14 @@ impl Session {
|
|||||||
let (tx_approve, rx_approve) = oneshot::channel();
|
let (tx_approve, rx_approve) = oneshot::channel();
|
||||||
let event_id = sub_id.clone();
|
let event_id = sub_id.clone();
|
||||||
let prev_entry = {
|
let prev_entry = {
|
||||||
let mut state = self.state.lock().await;
|
let mut active = self.active_turn.lock().await;
|
||||||
state.pending_approvals.insert(sub_id, tx_approve)
|
match active.as_mut() {
|
||||||
|
Some(at) => {
|
||||||
|
let mut ts = at.turn_state.lock().await;
|
||||||
|
ts.insert_pending_approval(sub_id, tx_approve)
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
};
|
};
|
||||||
if prev_entry.is_some() {
|
if prev_entry.is_some() {
|
||||||
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
||||||
@@ -622,8 +623,14 @@ impl Session {
|
|||||||
let (tx_approve, rx_approve) = oneshot::channel();
|
let (tx_approve, rx_approve) = oneshot::channel();
|
||||||
let event_id = sub_id.clone();
|
let event_id = sub_id.clone();
|
||||||
let prev_entry = {
|
let prev_entry = {
|
||||||
let mut state = self.state.lock().await;
|
let mut active = self.active_turn.lock().await;
|
||||||
state.pending_approvals.insert(sub_id, tx_approve)
|
match active.as_mut() {
|
||||||
|
Some(at) => {
|
||||||
|
let mut ts = at.turn_state.lock().await;
|
||||||
|
ts.insert_pending_approval(sub_id, tx_approve)
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
};
|
};
|
||||||
if prev_entry.is_some() {
|
if prev_entry.is_some() {
|
||||||
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
warn!("Overwriting existing pending approval for sub_id: {event_id}");
|
||||||
@@ -644,8 +651,14 @@ impl Session {
|
|||||||
|
|
||||||
pub async fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
|
pub async fn notify_approval(&self, sub_id: &str, decision: ReviewDecision) {
|
||||||
let entry = {
|
let entry = {
|
||||||
let mut state = self.state.lock().await;
|
let mut active = self.active_turn.lock().await;
|
||||||
state.pending_approvals.remove(sub_id)
|
match active.as_mut() {
|
||||||
|
Some(at) => {
|
||||||
|
let mut ts = at.turn_state.lock().await;
|
||||||
|
ts.remove_pending_approval(sub_id)
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
};
|
};
|
||||||
match entry {
|
match entry {
|
||||||
Some(tx_approve) => {
|
Some(tx_approve) => {
|
||||||
@@ -659,7 +672,7 @@ impl Session {
|
|||||||
|
|
||||||
pub async fn add_approved_command(&self, cmd: Vec<String>) {
|
pub async fn add_approved_command(&self, cmd: Vec<String>) {
|
||||||
let mut state = self.state.lock().await;
|
let mut state = self.state.lock().await;
|
||||||
state.approved_commands.insert(cmd);
|
state.add_approved_command(cmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Records input items: always append to conversation history and
|
/// Records input items: always append to conversation history and
|
||||||
@@ -699,7 +712,7 @@ impl Session {
|
|||||||
/// Append ResponseItems to the in-memory conversation history only.
|
/// Append ResponseItems to the in-memory conversation history only.
|
||||||
async fn record_into_history(&self, items: &[ResponseItem]) {
|
async fn record_into_history(&self, items: &[ResponseItem]) {
|
||||||
let mut state = self.state.lock().await;
|
let mut state = self.state.lock().await;
|
||||||
state.history.record_items(items.iter());
|
state.record_items(items.iter());
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
|
async fn persist_rollout_response_items(&self, items: &[ResponseItem]) {
|
||||||
@@ -720,14 +733,14 @@ impl Session {
|
|||||||
Some(turn_context.cwd.clone()),
|
Some(turn_context.cwd.clone()),
|
||||||
Some(turn_context.approval_policy),
|
Some(turn_context.approval_policy),
|
||||||
Some(turn_context.sandbox_policy.clone()),
|
Some(turn_context.sandbox_policy.clone()),
|
||||||
Some(self.user_shell.clone()),
|
Some(self.services.user_shell.clone()),
|
||||||
)));
|
)));
|
||||||
items
|
items
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn persist_rollout_items(&self, items: &[RolloutItem]) {
|
async fn persist_rollout_items(&self, items: &[RolloutItem]) {
|
||||||
let recorder = {
|
let recorder = {
|
||||||
let guard = self.rollout.lock().await;
|
let guard = self.services.rollout.lock().await;
|
||||||
guard.clone()
|
guard.clone()
|
||||||
};
|
};
|
||||||
if let Some(rec) = recorder
|
if let Some(rec) = recorder
|
||||||
@@ -746,12 +759,10 @@ impl Session {
|
|||||||
{
|
{
|
||||||
let mut state = self.state.lock().await;
|
let mut state = self.state.lock().await;
|
||||||
if let Some(token_usage) = token_usage {
|
if let Some(token_usage) = token_usage {
|
||||||
let info = TokenUsageInfo::new_or_append(
|
state.update_token_info_from_usage(
|
||||||
&state.token_info,
|
token_usage,
|
||||||
&Some(token_usage.clone()),
|
|
||||||
turn_context.client.get_model_context_window(),
|
turn_context.client.get_model_context_window(),
|
||||||
);
|
);
|
||||||
state.token_info = info;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.send_token_count_event(sub_id).await;
|
self.send_token_count_event(sub_id).await;
|
||||||
@@ -760,7 +771,7 @@ impl Session {
|
|||||||
async fn update_rate_limits(&self, sub_id: &str, new_rate_limits: RateLimitSnapshot) {
|
async fn update_rate_limits(&self, sub_id: &str, new_rate_limits: RateLimitSnapshot) {
|
||||||
{
|
{
|
||||||
let mut state = self.state.lock().await;
|
let mut state = self.state.lock().await;
|
||||||
state.latest_rate_limits = Some(new_rate_limits);
|
state.set_rate_limits(new_rate_limits);
|
||||||
}
|
}
|
||||||
self.send_token_count_event(sub_id).await;
|
self.send_token_count_event(sub_id).await;
|
||||||
}
|
}
|
||||||
@@ -768,7 +779,7 @@ impl Session {
|
|||||||
async fn send_token_count_event(&self, sub_id: &str) {
|
async fn send_token_count_event(&self, sub_id: &str) {
|
||||||
let (info, rate_limits) = {
|
let (info, rate_limits) = {
|
||||||
let state = self.state.lock().await;
|
let state = self.state.lock().await;
|
||||||
(state.token_info.clone(), state.latest_rate_limits.clone())
|
state.token_info_and_rate_limits()
|
||||||
};
|
};
|
||||||
let event = Event {
|
let event = Event {
|
||||||
id: sub_id.to_string(),
|
id: sub_id.to_string(),
|
||||||
@@ -786,8 +797,10 @@ impl Session {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
// Derive user message events and persist only UserMessage to rollout
|
// Derive user message events and persist only UserMessage to rollout
|
||||||
let msgs =
|
let msgs = map_response_item_to_event_messages(
|
||||||
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning);
|
&response_item,
|
||||||
|
self.services.show_raw_agent_reasoning,
|
||||||
|
);
|
||||||
let user_msgs: Vec<RolloutItem> = msgs
|
let user_msgs: Vec<RolloutItem> = msgs
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|m| match m {
|
.filter_map(|m| match m {
|
||||||
@@ -986,16 +999,20 @@ impl Session {
|
|||||||
pub async fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
|
pub async fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
|
||||||
let history = {
|
let history = {
|
||||||
let state = self.state.lock().await;
|
let state = self.state.lock().await;
|
||||||
state.history.contents()
|
state.history_snapshot()
|
||||||
};
|
};
|
||||||
[history, extra].concat()
|
[history, extra].concat()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the input if there was no task running to inject into
|
/// Returns the input if there was no task running to inject into
|
||||||
pub async fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
|
pub async fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
|
||||||
let mut state = self.state.lock().await;
|
let state = self.state.lock().await;
|
||||||
if state.current_task.is_some() {
|
if state.current_task.is_some() {
|
||||||
state.pending_input.push(input.into());
|
let mut active = self.active_turn.lock().await;
|
||||||
|
if let Some(at) = active.as_mut() {
|
||||||
|
let mut ts = at.turn_state.lock().await;
|
||||||
|
ts.push_pending_input(input.into());
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(input)
|
Err(input)
|
||||||
@@ -1003,13 +1020,12 @@ impl Session {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
|
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
|
||||||
let mut state = self.state.lock().await;
|
let mut active = self.active_turn.lock().await;
|
||||||
if state.pending_input.is_empty() {
|
if let Some(at) = active.as_mut() {
|
||||||
Vec::with_capacity(0)
|
let mut ts = at.turn_state.lock().await;
|
||||||
|
ts.take_pending_input()
|
||||||
} else {
|
} else {
|
||||||
let mut ret = Vec::new();
|
Vec::with_capacity(0)
|
||||||
std::mem::swap(&mut ret, &mut state.pending_input);
|
|
||||||
ret
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1019,7 +1035,8 @@ impl Session {
|
|||||||
tool: &str,
|
tool: &str,
|
||||||
arguments: Option<serde_json::Value>,
|
arguments: Option<serde_json::Value>,
|
||||||
) -> anyhow::Result<CallToolResult> {
|
) -> anyhow::Result<CallToolResult> {
|
||||||
self.mcp_connection_manager
|
self.services
|
||||||
|
.mcp_connection_manager
|
||||||
.call_tool(server, tool, arguments)
|
.call_tool(server, tool, arguments)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
@@ -1027,8 +1044,11 @@ impl Session {
|
|||||||
pub async fn interrupt_task(&self) {
|
pub async fn interrupt_task(&self) {
|
||||||
info!("interrupt received: abort current task, if any");
|
info!("interrupt received: abort current task, if any");
|
||||||
let mut state = self.state.lock().await;
|
let mut state = self.state.lock().await;
|
||||||
state.pending_approvals.clear();
|
let mut active = self.active_turn.lock().await;
|
||||||
state.pending_input.clear();
|
if let Some(at) = active.as_mut() {
|
||||||
|
let mut ts = at.turn_state.lock().await;
|
||||||
|
ts.clear_pending();
|
||||||
|
}
|
||||||
if let Some(task) = state.current_task.take() {
|
if let Some(task) = state.current_task.take() {
|
||||||
task.abort(TurnAbortReason::Interrupted);
|
task.abort(TurnAbortReason::Interrupted);
|
||||||
}
|
}
|
||||||
@@ -1036,8 +1056,12 @@ impl Session {
|
|||||||
|
|
||||||
fn interrupt_task_sync(&self) {
|
fn interrupt_task_sync(&self) {
|
||||||
if let Ok(mut state) = self.state.try_lock() {
|
if let Ok(mut state) = self.state.try_lock() {
|
||||||
state.pending_approvals.clear();
|
if let Ok(mut active) = self.active_turn.try_lock()
|
||||||
state.pending_input.clear();
|
&& let Some(at) = active.as_mut()
|
||||||
|
&& let Ok(mut ts) = at.turn_state.try_lock()
|
||||||
|
{
|
||||||
|
ts.clear_pending();
|
||||||
|
}
|
||||||
if let Some(task) = state.current_task.take() {
|
if let Some(task) = state.current_task.take() {
|
||||||
task.abort(TurnAbortReason::Interrupted);
|
task.abort(TurnAbortReason::Interrupted);
|
||||||
}
|
}
|
||||||
@@ -1045,7 +1069,7 @@ impl Session {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn notifier(&self) -> &UserNotifier {
|
pub(crate) fn notifier(&self) -> &UserNotifier {
|
||||||
&self.notifier
|
&self.services.notifier
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1417,7 +1441,7 @@ async fn submission_loop(
|
|||||||
let sub_id = sub.id.clone();
|
let sub_id = sub.id.clone();
|
||||||
|
|
||||||
// This is a cheap lookup from the connection manager's cache.
|
// This is a cheap lookup from the connection manager's cache.
|
||||||
let tools = sess.mcp_connection_manager.list_all_tools();
|
let tools = sess.services.mcp_connection_manager.list_all_tools();
|
||||||
let event = Event {
|
let event = Event {
|
||||||
id: sub_id,
|
id: sub_id,
|
||||||
msg: EventMsg::McpListToolsResponse(
|
msg: EventMsg::McpListToolsResponse(
|
||||||
@@ -1467,7 +1491,7 @@ async fn submission_loop(
|
|||||||
// Gracefully flush and shutdown rollout recorder on session end so tests
|
// Gracefully flush and shutdown rollout recorder on session end so tests
|
||||||
// that inspect the rollout file do not race with the background writer.
|
// that inspect the rollout file do not race with the background writer.
|
||||||
let recorder_opt = {
|
let recorder_opt = {
|
||||||
let mut guard = sess.rollout.lock().await;
|
let mut guard = sess.services.rollout.lock().await;
|
||||||
guard.take()
|
guard.take()
|
||||||
};
|
};
|
||||||
if let Some(rec) = recorder_opt
|
if let Some(rec) = recorder_opt
|
||||||
@@ -1494,7 +1518,7 @@ async fn submission_loop(
|
|||||||
let sub_id = sub.id.clone();
|
let sub_id = sub.id.clone();
|
||||||
// Flush rollout writes before returning the path so readers observe a consistent file.
|
// Flush rollout writes before returning the path so readers observe a consistent file.
|
||||||
let (path, rec_opt) = {
|
let (path, rec_opt) = {
|
||||||
let guard = sess.rollout.lock().await;
|
let guard = sess.services.rollout.lock().await;
|
||||||
match guard.as_ref() {
|
match guard.as_ref() {
|
||||||
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
|
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
|
||||||
None => {
|
None => {
|
||||||
@@ -1952,7 +1976,7 @@ async fn run_turn(
|
|||||||
) -> CodexResult<TurnRunResult> {
|
) -> CodexResult<TurnRunResult> {
|
||||||
let tools = get_openai_tools(
|
let tools = get_openai_tools(
|
||||||
&turn_context.tools_config,
|
&turn_context.tools_config,
|
||||||
Some(sess.mcp_connection_manager.list_all_tools()),
|
Some(sess.services.mcp_connection_manager.list_all_tools()),
|
||||||
);
|
);
|
||||||
|
|
||||||
let prompt = Prompt {
|
let prompt = Prompt {
|
||||||
@@ -2202,7 +2226,7 @@ async fn try_run_turn(
|
|||||||
sess.send_event(event).await;
|
sess.send_event(event).await;
|
||||||
}
|
}
|
||||||
ResponseEvent::ReasoningContentDelta(delta) => {
|
ResponseEvent::ReasoningContentDelta(delta) => {
|
||||||
if sess.show_raw_agent_reasoning {
|
if sess.services.show_raw_agent_reasoning {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
id: sub_id.to_string(),
|
id: sub_id.to_string(),
|
||||||
msg: EventMsg::AgentReasoningRawContentDelta(
|
msg: EventMsg::AgentReasoningRawContentDelta(
|
||||||
@@ -2324,7 +2348,10 @@ async fn handle_response_item(
|
|||||||
trace!("suppressing assistant Message in review mode");
|
trace!("suppressing assistant Message in review mode");
|
||||||
Vec::new()
|
Vec::new()
|
||||||
}
|
}
|
||||||
_ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning),
|
_ => map_response_item_to_event_messages(
|
||||||
|
&item,
|
||||||
|
sess.services.show_raw_agent_reasoning,
|
||||||
|
),
|
||||||
};
|
};
|
||||||
for msg in msgs {
|
for msg in msgs {
|
||||||
let event = Event {
|
let event = Event {
|
||||||
@@ -2370,7 +2397,11 @@ async fn handle_unified_exec_tool_call(
|
|||||||
timeout_ms,
|
timeout_ms,
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = sess.unified_exec_manager.handle_request(request).await;
|
let result = sess
|
||||||
|
.services
|
||||||
|
.unified_exec_manager
|
||||||
|
.handle_request(request)
|
||||||
|
.await;
|
||||||
|
|
||||||
let output_payload = match result {
|
let output_payload = match result {
|
||||||
Ok(value) => {
|
Ok(value) => {
|
||||||
@@ -2545,6 +2576,7 @@ async fn handle_function_call(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let result = sess
|
let result = sess
|
||||||
|
.services
|
||||||
.session_manager
|
.session_manager
|
||||||
.handle_exec_command_request(exec_params)
|
.handle_exec_command_request(exec_params)
|
||||||
.await;
|
.await;
|
||||||
@@ -2568,6 +2600,7 @@ async fn handle_function_call(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let result = sess
|
let result = sess
|
||||||
|
.services
|
||||||
.session_manager
|
.session_manager
|
||||||
.handle_write_stdin_request(write_stdin_params)
|
.handle_write_stdin_request(write_stdin_params)
|
||||||
.await;
|
.await;
|
||||||
@@ -2579,7 +2612,7 @@ async fn handle_function_call(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
match sess.mcp_connection_manager.parse_tool_name(&name) {
|
match sess.services.mcp_connection_manager.parse_tool_name(&name) {
|
||||||
Some((server, tool_name)) => {
|
Some((server, tool_name)) => {
|
||||||
handle_mcp_tool_call(sess, &sub_id, call_id, server, tool_name, arguments).await
|
handle_mcp_tool_call(sess, &sub_id, call_id, server, tool_name, arguments).await
|
||||||
}
|
}
|
||||||
@@ -2697,11 +2730,12 @@ fn maybe_translate_shell_command(
|
|||||||
sess: &Session,
|
sess: &Session,
|
||||||
turn_context: &TurnContext,
|
turn_context: &TurnContext,
|
||||||
) -> ExecParams {
|
) -> ExecParams {
|
||||||
let should_translate = matches!(sess.user_shell, crate::shell::Shell::PowerShell(_))
|
let should_translate = matches!(sess.services.user_shell, crate::shell::Shell::PowerShell(_))
|
||||||
|| turn_context.shell_environment_policy.use_profile;
|
|| turn_context.shell_environment_policy.use_profile;
|
||||||
|
|
||||||
if should_translate
|
if should_translate
|
||||||
&& let Some(command) = sess
|
&& let Some(command) = sess
|
||||||
|
.services
|
||||||
.user_shell
|
.user_shell
|
||||||
.format_default_shell_invocation(params.command.clone())
|
.format_default_shell_invocation(params.command.clone())
|
||||||
{
|
{
|
||||||
@@ -2816,7 +2850,7 @@ async fn handle_container_exec_with_params(
|
|||||||
¶ms.command,
|
¶ms.command,
|
||||||
turn_context.approval_policy,
|
turn_context.approval_policy,
|
||||||
&turn_context.sandbox_policy,
|
&turn_context.sandbox_policy,
|
||||||
&state.approved_commands,
|
state.approved_commands_ref(),
|
||||||
params.with_escalated_permissions.unwrap_or(false),
|
params.with_escalated_permissions.unwrap_or(false),
|
||||||
)
|
)
|
||||||
};
|
};
|
||||||
@@ -2895,7 +2929,7 @@ async fn handle_container_exec_with_params(
|
|||||||
sandbox_type,
|
sandbox_type,
|
||||||
sandbox_policy: &turn_context.sandbox_policy,
|
sandbox_policy: &turn_context.sandbox_policy,
|
||||||
sandbox_cwd: &turn_context.cwd,
|
sandbox_cwd: &turn_context.cwd,
|
||||||
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
|
codex_linux_sandbox_exe: &sess.services.codex_linux_sandbox_exe,
|
||||||
stdout_stream: if exec_command_context.apply_patch.is_some() {
|
stdout_stream: if exec_command_context.apply_patch.is_some() {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
@@ -3030,7 +3064,7 @@ async fn handle_sandbox_error(
|
|||||||
sandbox_type: SandboxType::None,
|
sandbox_type: SandboxType::None,
|
||||||
sandbox_policy: &turn_context.sandbox_policy,
|
sandbox_policy: &turn_context.sandbox_policy,
|
||||||
sandbox_cwd: &turn_context.cwd,
|
sandbox_cwd: &turn_context.cwd,
|
||||||
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
|
codex_linux_sandbox_exe: &sess.services.codex_linux_sandbox_exe,
|
||||||
stdout_stream: if exec_command_context.apply_patch.is_some() {
|
stdout_stream: if exec_command_context.apply_patch.is_some() {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
@@ -3384,7 +3418,7 @@ mod tests {
|
|||||||
}),
|
}),
|
||||||
));
|
));
|
||||||
|
|
||||||
let actual = tokio_test::block_on(async { session.state.lock().await.history.contents() });
|
let actual = tokio_test::block_on(async { session.state.lock().await.history_snapshot() });
|
||||||
assert_eq!(expected, actual);
|
assert_eq!(expected, actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3397,7 +3431,7 @@ mod tests {
|
|||||||
session.record_initial_history(&turn_context, InitialHistory::Forked(rollout_items)),
|
session.record_initial_history(&turn_context, InitialHistory::Forked(rollout_items)),
|
||||||
);
|
);
|
||||||
|
|
||||||
let actual = tokio_test::block_on(async { session.state.lock().await.history.contents() });
|
let actual = tokio_test::block_on(async { session.state.lock().await.history_snapshot() });
|
||||||
assert_eq!(expected, actual);
|
assert_eq!(expected, actual);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3623,21 +3657,22 @@ mod tests {
|
|||||||
is_review_mode: false,
|
is_review_mode: false,
|
||||||
final_output_json_schema: None,
|
final_output_json_schema: None,
|
||||||
};
|
};
|
||||||
let session = Session {
|
let services = SessionServices {
|
||||||
conversation_id,
|
|
||||||
tx_event,
|
|
||||||
mcp_connection_manager: McpConnectionManager::default(),
|
mcp_connection_manager: McpConnectionManager::default(),
|
||||||
session_manager: ExecSessionManager::default(),
|
session_manager: ExecSessionManager::default(),
|
||||||
unified_exec_manager: UnifiedExecSessionManager::default(),
|
unified_exec_manager: UnifiedExecSessionManager::default(),
|
||||||
notifier: UserNotifier::default(),
|
notifier: UserNotifier::default(),
|
||||||
rollout: Mutex::new(None),
|
rollout: Mutex::new(None),
|
||||||
state: Mutex::new(State {
|
|
||||||
history: ConversationHistory::new(),
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
codex_linux_sandbox_exe: None,
|
codex_linux_sandbox_exe: None,
|
||||||
user_shell: shell::Shell::Unknown,
|
user_shell: shell::Shell::Unknown,
|
||||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||||
|
};
|
||||||
|
let session = Session {
|
||||||
|
conversation_id,
|
||||||
|
tx_event,
|
||||||
|
state: Mutex::new(SessionState::new()),
|
||||||
|
active_turn: Mutex::new(None),
|
||||||
|
services,
|
||||||
next_internal_sub_id: AtomicU64::new(0),
|
next_internal_sub_id: AtomicU64::new(0),
|
||||||
};
|
};
|
||||||
(session, turn_context)
|
(session, turn_context)
|
||||||
|
|||||||
@@ -153,7 +153,7 @@ async fn run_compact_task_inner(
|
|||||||
}
|
}
|
||||||
let history_snapshot = {
|
let history_snapshot = {
|
||||||
let state = sess.state.lock().await;
|
let state = sess.state.lock().await;
|
||||||
state.history.contents()
|
state.history_snapshot()
|
||||||
};
|
};
|
||||||
let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
|
let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
|
||||||
let user_messages = collect_user_messages(&history_snapshot);
|
let user_messages = collect_user_messages(&history_snapshot);
|
||||||
@@ -161,7 +161,7 @@ async fn run_compact_task_inner(
|
|||||||
let new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
|
let new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
|
||||||
{
|
{
|
||||||
let mut state = sess.state.lock().await;
|
let mut state = sess.state.lock().await;
|
||||||
state.history.replace(new_history);
|
state.replace_history(new_history);
|
||||||
}
|
}
|
||||||
|
|
||||||
let rollout_item = RolloutItem::Compacted(CompactedItem {
|
let rollout_item = RolloutItem::Compacted(CompactedItem {
|
||||||
@@ -271,7 +271,7 @@ async fn drain_to_completed(
|
|||||||
match event {
|
match event {
|
||||||
Ok(ResponseEvent::OutputItemDone(item)) => {
|
Ok(ResponseEvent::OutputItemDone(item)) => {
|
||||||
let mut state = sess.state.lock().await;
|
let mut state = sess.state.lock().await;
|
||||||
state.history.record_items(std::slice::from_ref(&item));
|
state.record_items(std::slice::from_ref(&item));
|
||||||
}
|
}
|
||||||
Ok(ResponseEvent::Completed { .. }) => {
|
Ok(ResponseEvent::Completed { .. }) => {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ pub use model_provider_info::create_oss_provider_with_base_url;
|
|||||||
mod conversation_manager;
|
mod conversation_manager;
|
||||||
mod event_mapping;
|
mod event_mapping;
|
||||||
pub mod review_format;
|
pub mod review_format;
|
||||||
|
mod services;
|
||||||
pub use codex_protocol::protocol::InitialHistory;
|
pub use codex_protocol::protocol::InitialHistory;
|
||||||
pub use conversation_manager::ConversationManager;
|
pub use conversation_manager::ConversationManager;
|
||||||
pub use conversation_manager::NewConversation;
|
pub use conversation_manager::NewConversation;
|
||||||
@@ -75,6 +76,7 @@ pub use rollout::find_conversation_path_by_id_str;
|
|||||||
pub use rollout::list::ConversationItem;
|
pub use rollout::list::ConversationItem;
|
||||||
pub use rollout::list::ConversationsPage;
|
pub use rollout::list::ConversationsPage;
|
||||||
pub use rollout::list::Cursor;
|
pub use rollout::list::Cursor;
|
||||||
|
mod state;
|
||||||
mod user_notification;
|
mod user_notification;
|
||||||
pub mod util;
|
pub mod util;
|
||||||
|
|
||||||
|
|||||||
23
codex-rs/core/src/services/mod.rs
Normal file
23
codex-rs/core/src/services/mod.rs
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
//! Group long‑lived helpers/managers for a session.
|
||||||
|
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
use crate::exec_command::ExecSessionManager;
|
||||||
|
use crate::mcp_connection_manager::McpConnectionManager;
|
||||||
|
use crate::rollout::RolloutRecorder;
|
||||||
|
use crate::unified_exec::UnifiedExecSessionManager;
|
||||||
|
use crate::user_notification::UserNotifier;
|
||||||
|
|
||||||
|
/// Container for side‑effectful services and helpers used by `Session`.
|
||||||
|
pub(crate) struct SessionServices {
|
||||||
|
pub(crate) mcp_connection_manager: McpConnectionManager,
|
||||||
|
pub(crate) session_manager: ExecSessionManager,
|
||||||
|
pub(crate) unified_exec_manager: UnifiedExecSessionManager,
|
||||||
|
pub(crate) notifier: UserNotifier,
|
||||||
|
pub(crate) rollout: Mutex<Option<RolloutRecorder>>,
|
||||||
|
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
|
||||||
|
pub(crate) user_shell: crate::shell::Shell,
|
||||||
|
pub(crate) show_raw_agent_reasoning: bool,
|
||||||
|
}
|
||||||
18
codex-rs/core/src/state/mod.rs
Normal file
18
codex-rs/core/src/state/mod.rs
Normal file
@@ -0,0 +1,18 @@
|
|||||||
|
//! Session/turn state module.
|
||||||
|
//!
|
||||||
|
//! Encapsulates all mutable state for a Codex session and the currently active
|
||||||
|
//! turn. The goal is to present lock-safe, narrow APIs so other modules never
|
||||||
|
//! need to poke at raw mutexes or internal fields.
|
||||||
|
//!
|
||||||
|
//! Locking guidelines
|
||||||
|
//! - Lock ordering: `SessionState` → `ActiveTurn` → `TurnState`.
|
||||||
|
//! - Never hold a lock across an `.await`. Extract minimal data and drop the
|
||||||
|
//! guard before awaiting.
|
||||||
|
//! - Prefer helper methods on these types rather than exposing fields.
|
||||||
|
|
||||||
|
mod session;
|
||||||
|
mod turn;
|
||||||
|
|
||||||
|
pub(crate) use session::SessionState;
|
||||||
|
pub(crate) use turn::ActiveTurn;
|
||||||
|
pub(crate) use turn::TurnState;
|
||||||
82
codex-rs/core/src/state/session.rs
Normal file
82
codex-rs/core/src/state/session.rs
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
//! Session-wide mutable state.
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
|
use codex_protocol::models::ResponseItem;
|
||||||
|
|
||||||
|
use crate::codex::AgentTask;
|
||||||
|
use crate::conversation_history::ConversationHistory;
|
||||||
|
use crate::protocol::RateLimitSnapshot;
|
||||||
|
use crate::protocol::TokenUsage;
|
||||||
|
use crate::protocol::TokenUsageInfo;
|
||||||
|
|
||||||
|
/// Persistent, session-scoped state previously stored directly on `Session`.
|
||||||
|
#[derive(Default)]
|
||||||
|
pub(crate) struct SessionState {
|
||||||
|
pub(crate) approved_commands: HashSet<Vec<String>>,
|
||||||
|
pub(crate) current_task: Option<AgentTask>,
|
||||||
|
pub(crate) history: ConversationHistory,
|
||||||
|
pub(crate) token_info: Option<TokenUsageInfo>,
|
||||||
|
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionState {
|
||||||
|
/// Create a new session state mirroring previous `State::default()` semantics.
|
||||||
|
pub(crate) fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
history: ConversationHistory::new(),
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// History helpers
|
||||||
|
pub(crate) fn record_items<I>(&mut self, items: I)
|
||||||
|
where
|
||||||
|
I: IntoIterator,
|
||||||
|
I::Item: std::ops::Deref<Target = ResponseItem>,
|
||||||
|
{
|
||||||
|
self.history.record_items(items)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn history_snapshot(&self) -> Vec<ResponseItem> {
|
||||||
|
self.history.contents()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn replace_history(&mut self, items: Vec<ResponseItem>) {
|
||||||
|
self.history.replace(items);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Approved command helpers
|
||||||
|
pub(crate) fn add_approved_command(&mut self, cmd: Vec<String>) {
|
||||||
|
self.approved_commands.insert(cmd);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn approved_commands_ref(&self) -> &HashSet<Vec<String>> {
|
||||||
|
&self.approved_commands
|
||||||
|
}
|
||||||
|
|
||||||
|
// Token/rate limit helpers
|
||||||
|
pub(crate) fn update_token_info_from_usage(
|
||||||
|
&mut self,
|
||||||
|
usage: &TokenUsage,
|
||||||
|
model_context_window: Option<u64>,
|
||||||
|
) {
|
||||||
|
self.token_info = TokenUsageInfo::new_or_append(
|
||||||
|
&self.token_info,
|
||||||
|
&Some(usage.clone()),
|
||||||
|
model_context_window,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn set_rate_limits(&mut self, snapshot: RateLimitSnapshot) {
|
||||||
|
self.latest_rate_limits = Some(snapshot);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn token_info_and_rate_limits(
|
||||||
|
&self,
|
||||||
|
) -> (Option<TokenUsageInfo>, Option<RateLimitSnapshot>) {
|
||||||
|
(self.token_info.clone(), self.latest_rate_limits.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pending input/approval moved to TurnState.
|
||||||
|
}
|
||||||
60
codex-rs/core/src/state/turn.rs
Normal file
60
codex-rs/core/src/state/turn.rs
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
//! Turn-scoped state and active turn metadata scaffolding.
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
use codex_protocol::models::ResponseInputItem;
|
||||||
|
use tokio::sync::oneshot;
|
||||||
|
|
||||||
|
use crate::protocol::ReviewDecision;
|
||||||
|
|
||||||
|
/// Metadata about the currently running turn.
|
||||||
|
#[derive(Default)]
|
||||||
|
pub(crate) struct ActiveTurn {
|
||||||
|
pub(crate) sub_id: String,
|
||||||
|
pub(crate) turn_state: Arc<Mutex<TurnState>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Mutable state for a single turn.
|
||||||
|
#[derive(Default)]
|
||||||
|
pub(crate) struct TurnState {
|
||||||
|
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
|
||||||
|
pending_input: Vec<ResponseInputItem>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TurnState {
|
||||||
|
pub(crate) fn insert_pending_approval(
|
||||||
|
&mut self,
|
||||||
|
key: String,
|
||||||
|
tx: oneshot::Sender<ReviewDecision>,
|
||||||
|
) -> Option<oneshot::Sender<ReviewDecision>> {
|
||||||
|
self.pending_approvals.insert(key, tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn remove_pending_approval(
|
||||||
|
&mut self,
|
||||||
|
key: &str,
|
||||||
|
) -> Option<oneshot::Sender<ReviewDecision>> {
|
||||||
|
self.pending_approvals.remove(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn clear_pending(&mut self) {
|
||||||
|
self.pending_approvals.clear();
|
||||||
|
self.pending_input.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn push_pending_input(&mut self, input: ResponseInputItem) {
|
||||||
|
self.pending_input.push(input);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn take_pending_input(&mut self) -> Vec<ResponseInputItem> {
|
||||||
|
if self.pending_input.is_empty() {
|
||||||
|
Vec::with_capacity(0)
|
||||||
|
} else {
|
||||||
|
let mut ret = Vec::new();
|
||||||
|
std::mem::swap(&mut ret, &mut self.pending_input);
|
||||||
|
ret
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user