core: introduce services::SessionServices and move helpers off Session

This commit is contained in:
jimmyfraiture
2025-09-24 17:12:13 +01:00
parent 29bfa77cc0
commit 269b2999c7
3 changed files with 71 additions and 39 deletions

View File

@@ -117,6 +117,7 @@ use crate::rollout::RolloutRecorderParams;
use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
use crate::services::SessionServices;
use crate::shell;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::unified_exec::UnifiedExecSessionManager;
@@ -259,21 +260,8 @@ use crate::state::SessionState;
pub(crate) struct Session {
conversation_id: ConversationId,
tx_event: Sender<Event>,
/// Manager for external MCP servers/tools.
mcp_connection_manager: McpConnectionManager,
session_manager: ExecSessionManager,
unified_exec_manager: UnifiedExecSessionManager,
notifier: UserNotifier,
/// Optional rollout recorder for persisting the conversation transcript so
/// sessions can be replayed or inspected later.
rollout: Mutex<Option<RolloutRecorder>>,
state: Mutex<SessionState>,
codex_linux_sandbox_exe: Option<PathBuf>,
user_shell: shell::Shell,
show_raw_agent_reasoning: bool,
services: SessionServices,
next_internal_sub_id: AtomicU64,
}
@@ -459,18 +447,22 @@ impl Session {
is_review_mode: false,
final_output_json_schema: None,
};
let sess = Arc::new(Session {
conversation_id,
tx_event: tx_event.clone(),
let services = SessionServices {
mcp_connection_manager,
session_manager: ExecSessionManager::default(),
unified_exec_manager: UnifiedExecSessionManager::default(),
notifier: notify,
state: Mutex::new(state),
rollout: Mutex::new(Some(rollout_recorder)),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
user_shell: default_shell,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
};
let sess = Arc::new(Session {
conversation_id,
tx_event: tx_event.clone(),
state: Mutex::new(state),
services,
next_internal_sub_id: AtomicU64::new(0),
});
@@ -706,14 +698,14 @@ impl Session {
Some(turn_context.cwd.clone()),
Some(turn_context.approval_policy),
Some(turn_context.sandbox_policy.clone()),
Some(self.user_shell.clone()),
Some(self.services.user_shell.clone()),
)));
items
}
async fn persist_rollout_items(&self, items: &[RolloutItem]) {
let recorder = {
let guard = self.rollout.lock().await;
let guard = self.services.rollout.lock().await;
guard.clone()
};
if let Some(rec) = recorder
@@ -772,8 +764,10 @@ impl Session {
.await;
// Derive user message events and persist only UserMessage to rollout
let msgs =
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning);
let msgs = map_response_item_to_event_messages(
&response_item,
self.services.show_raw_agent_reasoning,
);
let user_msgs: Vec<RolloutItem> = msgs
.into_iter()
.filter_map(|m| match m {
@@ -1005,7 +999,8 @@ impl Session {
tool: &str,
arguments: Option<serde_json::Value>,
) -> anyhow::Result<CallToolResult> {
self.mcp_connection_manager
self.services
.mcp_connection_manager
.call_tool(server, tool, arguments)
.await
}
@@ -1031,7 +1026,7 @@ impl Session {
}
pub(crate) fn notifier(&self) -> &UserNotifier {
&self.notifier
&self.services.notifier
}
}
@@ -1403,7 +1398,7 @@ async fn submission_loop(
let sub_id = sub.id.clone();
// This is a cheap lookup from the connection manager's cache.
let tools = sess.mcp_connection_manager.list_all_tools();
let tools = sess.services.mcp_connection_manager.list_all_tools();
let event = Event {
id: sub_id,
msg: EventMsg::McpListToolsResponse(
@@ -1453,7 +1448,7 @@ async fn submission_loop(
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
let recorder_opt = {
let mut guard = sess.rollout.lock().await;
let mut guard = sess.services.rollout.lock().await;
guard.take()
};
if let Some(rec) = recorder_opt
@@ -1480,7 +1475,7 @@ async fn submission_loop(
let sub_id = sub.id.clone();
// Flush rollout writes before returning the path so readers observe a consistent file.
let (path, rec_opt) = {
let guard = sess.rollout.lock().await;
let guard = sess.services.rollout.lock().await;
match guard.as_ref() {
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
None => {
@@ -1938,7 +1933,7 @@ async fn run_turn(
) -> CodexResult<TurnRunResult> {
let tools = get_openai_tools(
&turn_context.tools_config,
Some(sess.mcp_connection_manager.list_all_tools()),
Some(sess.services.mcp_connection_manager.list_all_tools()),
);
let prompt = Prompt {
@@ -2188,7 +2183,7 @@ async fn try_run_turn(
sess.send_event(event).await;
}
ResponseEvent::ReasoningContentDelta(delta) => {
if sess.show_raw_agent_reasoning {
if sess.services.show_raw_agent_reasoning {
let event = Event {
id: sub_id.to_string(),
msg: EventMsg::AgentReasoningRawContentDelta(
@@ -2310,7 +2305,10 @@ async fn handle_response_item(
trace!("suppressing assistant Message in review mode");
Vec::new()
}
_ => map_response_item_to_event_messages(&item, sess.show_raw_agent_reasoning),
_ => map_response_item_to_event_messages(
&item,
sess.services.show_raw_agent_reasoning,
),
};
for msg in msgs {
let event = Event {
@@ -2356,7 +2354,11 @@ async fn handle_unified_exec_tool_call(
timeout_ms,
};
let result = sess.unified_exec_manager.handle_request(request).await;
let result = sess
.services
.unified_exec_manager
.handle_request(request)
.await;
let output_payload = match result {
Ok(value) => {
@@ -2531,6 +2533,7 @@ async fn handle_function_call(
}
};
let result = sess
.services
.session_manager
.handle_exec_command_request(exec_params)
.await;
@@ -2554,6 +2557,7 @@ async fn handle_function_call(
}
};
let result = sess
.services
.session_manager
.handle_write_stdin_request(write_stdin_params)
.await;
@@ -2565,7 +2569,7 @@ async fn handle_function_call(
}
}
_ => {
match sess.mcp_connection_manager.parse_tool_name(&name) {
match sess.services.mcp_connection_manager.parse_tool_name(&name) {
Some((server, tool_name)) => {
handle_mcp_tool_call(sess, &sub_id, call_id, server, tool_name, arguments).await
}
@@ -2683,11 +2687,12 @@ fn maybe_translate_shell_command(
sess: &Session,
turn_context: &TurnContext,
) -> ExecParams {
let should_translate = matches!(sess.user_shell, crate::shell::Shell::PowerShell(_))
let should_translate = matches!(sess.services.user_shell, crate::shell::Shell::PowerShell(_))
|| turn_context.shell_environment_policy.use_profile;
if should_translate
&& let Some(command) = sess
.services
.user_shell
.format_default_shell_invocation(params.command.clone())
{
@@ -2881,7 +2886,7 @@ async fn handle_container_exec_with_params(
sandbox_type,
sandbox_policy: &turn_context.sandbox_policy,
sandbox_cwd: &turn_context.cwd,
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
codex_linux_sandbox_exe: &sess.services.codex_linux_sandbox_exe,
stdout_stream: if exec_command_context.apply_patch.is_some() {
None
} else {
@@ -3016,7 +3021,7 @@ async fn handle_sandbox_error(
sandbox_type: SandboxType::None,
sandbox_policy: &turn_context.sandbox_policy,
sandbox_cwd: &turn_context.cwd,
codex_linux_sandbox_exe: &sess.codex_linux_sandbox_exe,
codex_linux_sandbox_exe: &sess.services.codex_linux_sandbox_exe,
stdout_stream: if exec_command_context.apply_patch.is_some() {
None
} else {
@@ -3609,18 +3614,21 @@ mod tests {
is_review_mode: false,
final_output_json_schema: None,
};
let session = Session {
conversation_id,
tx_event,
let services = SessionServices {
mcp_connection_manager: McpConnectionManager::default(),
session_manager: ExecSessionManager::default(),
unified_exec_manager: UnifiedExecSessionManager::default(),
notifier: UserNotifier::default(),
rollout: Mutex::new(None),
state: Mutex::new(SessionState::new()),
codex_linux_sandbox_exe: None,
user_shell: shell::Shell::Unknown,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
};
let session = Session {
conversation_id,
tx_event,
state: Mutex::new(SessionState::new()),
services,
next_internal_sub_id: AtomicU64::new(0),
};
(session, turn_context)

View File

@@ -47,6 +47,7 @@ pub use model_provider_info::create_oss_provider_with_base_url;
mod conversation_manager;
mod event_mapping;
pub mod review_format;
mod services;
pub use codex_protocol::protocol::InitialHistory;
pub use conversation_manager::ConversationManager;
pub use conversation_manager::NewConversation;

View File

@@ -0,0 +1,23 @@
//! Group longlived helpers/managers for a session.
use std::path::PathBuf;
use tokio::sync::Mutex;
use crate::exec_command::ExecSessionManager;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::rollout::RolloutRecorder;
use crate::unified_exec::UnifiedExecSessionManager;
use crate::user_notification::UserNotifier;
/// Container for sideeffectful services and helpers used by `Session`.
pub(crate) struct SessionServices {
pub(crate) mcp_connection_manager: McpConnectionManager,
pub(crate) session_manager: ExecSessionManager,
pub(crate) unified_exec_manager: UnifiedExecSessionManager,
pub(crate) notifier: UserNotifier,
pub(crate) rollout: Mutex<Option<RolloutRecorder>>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) user_shell: crate::shell::Shell,
pub(crate) show_raw_agent_reasoning: bool,
}