diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index ad5070e063..69a07ed1b9 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -11,7 +11,6 @@ use crate::mcp::auth::McpAuthStatusEntry; use crate::parse_command::parse_command; use crate::parse_turn_item; use crate::response_processing::process_items; -use crate::review_format::format_review_findings_block; use crate::terminal; use crate::user_notification::UserNotifier; use async_channel::Receiver; @@ -20,7 +19,6 @@ use codex_apply_patch::ApplyPatchAction; use codex_protocol::ConversationId; use codex_protocol::items::TurnItem; use codex_protocol::protocol::ConversationPathResponseEvent; -use codex_protocol::protocol::ExitedReviewModeEvent; use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::ItemStartedEvent; use codex_protocol::protocol::ReviewRequest; @@ -47,7 +45,6 @@ use tokio_util::sync::CancellationToken; use tracing::debug; use tracing::error; use tracing::info; -use tracing::trace; use tracing::warn; use crate::ModelProviderInfo; @@ -86,7 +83,6 @@ use crate::protocol::ListCustomPromptsResponseEvent; use crate::protocol::Op; use crate::protocol::RateLimitSnapshot; use crate::protocol::ReviewDecision; -use crate::protocol::ReviewOutputEvent; use crate::protocol::SandboxPolicy; use crate::protocol::SessionConfiguredEvent; use crate::protocol::StreamErrorEvent; @@ -267,7 +263,6 @@ pub(crate) struct TurnContext { pub(crate) sandbox_policy: SandboxPolicy, pub(crate) shell_environment_policy: ShellEnvironmentPolicy, pub(crate) tools_config: ToolsConfig, - pub(crate) is_review_mode: bool, pub(crate) final_output_json_schema: Option, pub(crate) codex_linux_sandbox_exe: Option, } @@ -402,7 +397,6 @@ impl Session { sandbox_policy: session_configuration.sandbox_policy.clone(), shell_environment_policy: config.shell_environment_policy.clone(), tools_config, - is_review_mode: false, final_output_json_schema: None, codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), } @@ -634,6 +628,14 @@ impl Session { state.session_configuration = state.session_configuration.apply(&updates); } + pub(crate) async fn base_config(&self) -> Arc { + let state = self.state.lock().await; + state + .session_configuration + .original_config_do_not_use + .clone() + } + pub(crate) async fn new_turn(&self, updates: SessionSettingsUpdate) -> Arc { let sub_id = self.next_internal_sub_id(); self.new_turn_with_sub_id(sub_id, updates).await @@ -1468,7 +1470,6 @@ async fn spawn_review_thread( sandbox_policy: parent_turn_context.sandbox_policy.clone(), shell_environment_policy: parent_turn_context.shell_environment_policy.clone(), cwd: parent_turn_context.cwd.clone(), - is_review_mode: true, final_output_json_schema: None, codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(), }; @@ -1518,21 +1519,8 @@ pub(crate) async fn run_task( sess.send_event(&turn_context, event).await; let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); - // For review threads, keep an isolated in-memory history so the - // model sees a fresh conversation without the parent session's history. - // For normal turns, continue recording to the session history as before. - let is_review_mode = turn_context.is_review_mode; - - let mut review_thread_history: ConversationHistory = ConversationHistory::new(); - if is_review_mode { - // Seed review threads with environment context so the model knows the working directory. - review_thread_history - .record_items(sess.build_initial_context(turn_context.as_ref()).iter()); - review_thread_history.record_items(std::iter::once(&initial_input_for_turn.into())); - } else { - sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn) - .await; - } + sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn) + .await; let mut last_agent_message: Option = None; // Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains @@ -1561,12 +1549,7 @@ pub(crate) async fn run_task( // conversation history on each turn. The rollout file, however, should // only record the new items that originated in this turn so that it // represents an append-only log without duplicates. - let turn_input: Vec = if is_review_mode { - if !pending_input.is_empty() { - review_thread_history.record_items(&pending_input); - } - review_thread_history.get_history() - } else { + let turn_input: Vec = { sess.record_conversation_items(&pending_input).await; sess.history_snapshot().await }; @@ -1609,13 +1592,8 @@ pub(crate) async fn run_task( let token_limit_reached = total_usage_tokens .map(|tokens| tokens >= limit) .unwrap_or(false); - let (responses, items_to_record_in_conversation_history) = process_items( - processed_items, - is_review_mode, - &mut review_thread_history, - &sess, - ) - .await; + let (responses, items_to_record_in_conversation_history) = + process_items(processed_items, &sess).await; if token_limit_reached { if auto_compact_recently_attempted { @@ -1657,13 +1635,7 @@ pub(crate) async fn run_task( Err(CodexErr::TurnAborted { dangling_artifacts: processed_items, }) => { - let _ = process_items( - processed_items, - is_review_mode, - &mut review_thread_history, - &sess, - ) - .await; + let _ = process_items(processed_items, &sess).await; // Aborted turn is reported via a different event. break; } @@ -1679,50 +1651,9 @@ pub(crate) async fn run_task( } } - // If this was a review thread and we have a final assistant message, - // try to parse it as a ReviewOutput. - // - // If parsing fails, construct a minimal ReviewOutputEvent using the plain - // text as the overall explanation. Else, just exit review mode with None. - // - // Emits an ExitedReviewMode event with the parsed review output. - if turn_context.is_review_mode { - exit_review_mode( - sess.clone(), - Arc::clone(&turn_context), - last_agent_message.as_deref().map(parse_review_output_event), - ) - .await; - } - last_agent_message } -/// Parse the review output; when not valid JSON, build a structured -/// fallback that carries the plain text as the overall explanation. -/// -/// Returns: a ReviewOutputEvent parsed from JSON or a fallback populated from text. -fn parse_review_output_event(text: &str) -> ReviewOutputEvent { - // Try direct parse first - if let Ok(ev) = serde_json::from_str::(text) { - return ev; - } - // If wrapped in markdown fences or extra prose, attempt to extract the first JSON object - if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}')) - && start < end - && let Some(slice) = text.get(start..=end) - && let Ok(ev) = serde_json::from_str::(slice) - { - return ev; - } - // Not JSON – return a structured ReviewOutputEvent that carries - // the plain text as the overall explanation. - ReviewOutputEvent { - overall_explanation: text.to_string(), - ..Default::default() - } -} - async fn run_turn( sess: Arc, turn_context: Arc, @@ -2009,12 +1940,8 @@ async fn try_run_turn( ResponseEvent::OutputTextDelta(delta) => { // In review child threads, suppress assistant text deltas; the // UI will show a selection popup from the final ReviewOutput. - if !turn_context.is_review_mode { - let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }); - sess.send_event(&turn_context, event).await; - } else { - trace!("suppressing OutputTextDelta in review mode"); - } + let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }); + sess.send_event(&turn_context, event).await; } ResponseEvent::ReasoningSummaryDelta(delta) => { let event = EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }); @@ -2049,13 +1976,7 @@ async fn handle_non_tool_response_item( ResponseItem::Message { .. } | ResponseItem::Reasoning { .. } | ResponseItem::WebSearchCall { .. } => { - let turn_item = match &item { - ResponseItem::Message { .. } if turn_context.is_review_mode => { - trace!("suppressing assistant Message in review mode"); - None - } - _ => parse_turn_item(&item), - }; + let turn_item = parse_turn_item(&item); if let Some(turn_item) = turn_item { sess.emit_turn_item_started_completed( turn_context.as_ref(), @@ -2128,58 +2049,6 @@ pub(crate) fn convert_call_tool_result_to_function_call_output_payload( } } -/// Emits an ExitedReviewMode Event with optional ReviewOutput, -/// and records a developer message with the review output. -pub(crate) async fn exit_review_mode( - session: Arc, - turn_context: Arc, - review_output: Option, -) { - let event = EventMsg::ExitedReviewMode(ExitedReviewModeEvent { - review_output: review_output.clone(), - }); - session.send_event(turn_context.as_ref(), event).await; - - let mut user_message = String::new(); - if let Some(out) = review_output { - let mut findings_str = String::new(); - let text = out.overall_explanation.trim(); - if !text.is_empty() { - findings_str.push_str(text); - } - if !out.findings.is_empty() { - let block = format_review_findings_block(&out.findings, None); - findings_str.push_str(&format!("\n{block}")); - } - user_message.push_str(&format!( - r#" - User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve. - review - - {findings_str} - - -"#)); - } else { - user_message.push_str(r#" - User initiated a review task, but was interrupted. If user asks about this, tell them to re-initiate a review with `/review` and wait for it to complete. - review - - None. - - -"#); - } - - session - .record_conversation_items(&[ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { text: user_message }], - }]) - .await; -} - fn mcp_init_error_display( server_name: &str, entry: Option<&McpAuthStatusEntry>, @@ -2659,12 +2528,6 @@ mod tests { sleep(Duration::from_secs(60)).await; } } - - async fn abort(&self, session: Arc, ctx: Arc) { - if let TaskKind::Review = self.kind { - exit_review_mode(session.clone_session(), ctx, None).await; - } - } } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 34b6df4a5a..4d8e965dcd 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -14,6 +14,7 @@ mod client_common; pub mod codex; mod codex_conversation; pub use codex_conversation::CodexConversation; +mod codex_delegate; mod command_safety; pub mod config; pub mod config_edit; diff --git a/codex-rs/core/src/response_processing.rs b/codex-rs/core/src/response_processing.rs index b9139ce6c0..0031ac1ffc 100644 --- a/codex-rs/core/src/response_processing.rs +++ b/codex-rs/core/src/response_processing.rs @@ -1,5 +1,4 @@ use crate::codex::Session; -use crate::conversation_history::ConversationHistory; use codex_protocol::models::FunctionCallOutputPayload; use codex_protocol::models::ResponseInputItem; use codex_protocol::models::ResponseItem; @@ -10,8 +9,6 @@ use tracing::warn; /// - `ResponseInputItem`s to send back to the model on the next turn. pub(crate) async fn process_items( processed_items: Vec, - is_review_mode: bool, - review_thread_history: &mut ConversationHistory, sess: &Session, ) -> (Vec, Vec) { let mut items_to_record_in_conversation_history = Vec::::new(); @@ -101,12 +98,8 @@ pub(crate) async fn process_items( // Only attempt to take the lock if there is something to record. if !items_to_record_in_conversation_history.is_empty() { - if is_review_mode { - review_thread_history.record_items(items_to_record_in_conversation_history.iter()); - } else { - sess.record_conversation_items(&items_to_record_in_conversation_history) - .await; - } + sess.record_conversation_items(&items_to_record_in_conversation_history) + .await; } (responses, items_to_record_in_conversation_history) } diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 79527814e3..5f4347e58f 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -13,8 +13,10 @@ use tokio_util::task::AbortOnDropHandle; use tracing::trace; use tracing::warn; +use crate::AuthManager; use crate::codex::Session; use crate::codex::TurnContext; +use crate::config::Config; use crate::protocol::EventMsg; use crate::protocol::TaskCompleteEvent; use crate::protocol::TurnAbortReason; @@ -44,6 +46,14 @@ impl SessionTaskContext { pub(crate) fn clone_session(&self) -> Arc { Arc::clone(&self.session) } + + pub(crate) fn auth_manager(&self) -> Arc { + Arc::clone(&self.session.services.auth_manager) + } + + pub(crate) async fn base_config(&self) -> Arc { + self.session.base_config().await + } } #[async_trait] diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index fbf553b349..fb1dac039e 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -1,11 +1,18 @@ use std::sync::Arc; use async_trait::async_trait; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ReviewOutputEvent; +use codex_protocol::protocol::TaskCompleteEvent; use tokio_util::sync::CancellationToken; +use crate::codex::Session; use crate::codex::TurnContext; -use crate::codex::exit_review_mode; -use crate::codex::run_task; +use crate::codex_delegate::run_codex_conversation; +// use crate::config::Config; // no longer needed directly; use session.base_config() +use crate::review_format::format_review_findings_block; use crate::state::TaskKind; use codex_protocol::user_input::UserInput; @@ -28,11 +35,108 @@ impl SessionTask for ReviewTask { input: Vec, cancellation_token: CancellationToken, ) -> Option { - let sess = session.clone_session(); - run_task(sess, ctx, input, TaskKind::Review, cancellation_token).await + // let sess = session.clone_session(); + // run_task(sess, ctx, input, TaskKind::Review, cancellation_token).await + + let config = session.base_config().await.as_ref().clone(); + let receiver = + match run_codex_conversation(config, session.auth_manager(), input, cancellation_token) + .await + { + Ok(r) => r, + Err(_) => return None, + }; + while let Ok(event) = receiver.recv().await { + session + .clone_session() + .send_event(ctx.as_ref(), event.clone()) + .await; + if let EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) = event { + exit_review_mode( + session.clone_session(), + last_agent_message.as_deref().map(parse_review_output_event), + ) + .await; + } + } + + Some("".to_string()) } - async fn abort(&self, session: Arc, ctx: Arc) { - exit_review_mode(session.clone_session(), ctx, None).await; + async fn abort(&self, session: Arc, _ctx: Arc) { + exit_review_mode(session.clone_session(), None).await; + } +} + +/// Emits an ExitedReviewMode Event with optional ReviewOutput, +/// and records a developer message with the review output. +pub(crate) async fn exit_review_mode( + session: Arc, + review_output: Option, +) { + // ExitedReviewMode event can be emitted by the caller if needed. + + let mut user_message = String::new(); + if let Some(out) = review_output { + let mut findings_str = String::new(); + let text = out.overall_explanation.trim(); + if !text.is_empty() { + findings_str.push_str(text); + } + if !out.findings.is_empty() { + let block = format_review_findings_block(&out.findings, None); + findings_str.push_str(&format!("\n{block}")); + } + user_message.push_str(&format!( + r#" + User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve. + review + + {findings_str} + + +"#)); + } else { + user_message.push_str(r#" + User initiated a review task, but was interrupted. If user asks about this, tell them to re-initiate a review with `/review` and wait for it to complete. + review + + None. + + +"#); + } + + session + .record_conversation_items(&[ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { text: user_message }], + }]) + .await; +} + +/// Parse the review output; when not valid JSON, build a structured +/// fallback that carries the plain text as the overall explanation. +/// +/// Returns: a ReviewOutputEvent parsed from JSON or a fallback populated from text. +fn parse_review_output_event(text: &str) -> ReviewOutputEvent { + // Try direct parse first + if let Ok(ev) = serde_json::from_str::(text) { + return ev; + } + // If wrapped in markdown fences or extra prose, attempt to extract the first JSON object + if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}')) + && start < end + && let Some(slice) = text.get(start..=end) + && let Ok(ev) = serde_json::from_str::(slice) + { + return ev; + } + // Not JSON – return a structured ReviewOutputEvent that carries + // the plain text as the overall explanation. + ReviewOutputEvent { + overall_explanation: text.to_string(), + ..Default::default() } } diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 105f028049..b515353be1 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -919,6 +919,7 @@ pub enum SessionSource { VSCode, Exec, Mcp, + SubAgent, #[serde(other)] Unknown, }