diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 2b27b07b66..845ef2bf53 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -9050,7 +9050,7 @@ mod tests { async fn run( self: Arc, _session: Arc, - _ctx: Arc, + _initial_turn_context: Arc, _input: Vec, _cancellation_token: CancellationToken, ) -> Option { @@ -10586,7 +10586,7 @@ mod tests { async fn run( self: Arc, _session: Arc, - _ctx: Arc, + _initial_turn_context: Arc, _input: Vec, cancellation_token: CancellationToken, ) -> Option { diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index a8c0a5e06d..1dad31c100 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -24,25 +24,26 @@ impl SessionTask for CompactTask { async fn run( self: Arc, session: Arc, - ctx: Arc, + initial_turn_context: Arc, input: Vec, _cancellation_token: CancellationToken, ) -> Option { let session = session.clone_session(); - let _ = if crate::compact::should_use_remote_compact_task(&ctx.provider) { + let _ = if crate::compact::should_use_remote_compact_task(&initial_turn_context.provider) { let _ = session.services.otel_manager.counter( "codex.task.compact", 1, &[("type", "remote")], ); - crate::compact_remote::run_remote_compact_task(session.clone(), ctx).await + crate::compact_remote::run_remote_compact_task(session.clone(), initial_turn_context) + .await } else { let _ = session.services.otel_manager.counter( "codex.task.compact", 1, &[("type", "local")], ); - crate::compact::run_compact_task(session.clone(), ctx, input).await + crate::compact::run_compact_task(session.clone(), initial_turn_context, input).await }; None } diff --git a/codex-rs/core/src/tasks/ghost_snapshot.rs b/codex-rs/core/src/tasks/ghost_snapshot.rs index ded8533a23..9fdf194937 100644 --- a/codex-rs/core/src/tasks/ghost_snapshot.rs +++ b/codex-rs/core/src/tasks/ghost_snapshot.rs @@ -39,18 +39,18 @@ impl SessionTask for GhostSnapshotTask { async fn run( self: Arc, session: Arc, - ctx: Arc, + initial_turn_context: Arc, _input: Vec, cancellation_token: CancellationToken, ) -> Option { tokio::task::spawn(async move { let token = self.token; - let warnings_enabled = !ctx.ghost_snapshot.disable_warnings; + let warnings_enabled = !initial_turn_context.ghost_snapshot.disable_warnings; // Channel used to signal when the snapshot work has finished so the // timeout warning task can exit early without sending a warning. let (snapshot_done_tx, snapshot_done_rx) = oneshot::channel::<()>(); if warnings_enabled { - let ctx_for_warning = ctx.clone(); + let initial_turn_context_for_warning = initial_turn_context.clone(); let cancellation_token_for_warning = cancellation_token.clone(); let session_for_warning = session.clone(); // Fire a generic warning if the snapshot is still running after @@ -61,7 +61,7 @@ impl SessionTask for GhostSnapshotTask { _ = tokio::time::sleep(SNAPSHOT_WARNING_THRESHOLD) => { session_for_warning.session .send_event( - &ctx_for_warning, + &initial_turn_context_for_warning, EventMsg::Warning(WarningEvent { message: "Repository snapshot is taking longer than expected. Large untracked or ignored files can slow snapshots; consider adding large files or directories to .gitignore or disabling `undo` in your config.".to_string() }), @@ -76,12 +76,12 @@ impl SessionTask for GhostSnapshotTask { drop(snapshot_done_rx); } - let ctx_for_task = ctx.clone(); + let initial_turn_context_for_task = initial_turn_context.clone(); let cancelled = tokio::select! { _ = cancellation_token.cancelled() => true, _ = async { - let repo_path = ctx_for_task.cwd.clone(); - let ghost_snapshot = ctx_for_task.ghost_snapshot.clone(); + let repo_path = initial_turn_context_for_task.cwd.clone(); + let ghost_snapshot = initial_turn_context_for_task.ghost_snapshot.clone(); let ghost_snapshot_for_commit = ghost_snapshot.clone(); // Required to run in a dedicated blocking pool. match tokio::task::spawn_blocking(move || { @@ -102,7 +102,7 @@ impl SessionTask for GhostSnapshotTask { session .session .send_event( - &ctx_for_task, + &initial_turn_context_for_task, EventMsg::Warning(WarningEvent { message }), ) .await; @@ -110,7 +110,7 @@ impl SessionTask for GhostSnapshotTask { } session .session - .record_conversation_items(&ctx, &[ResponseItem::GhostSnapshot { + .record_conversation_items(&initial_turn_context, &[ResponseItem::GhostSnapshot { ghost_commit: ghost_commit.clone(), }]) .await; @@ -118,26 +118,26 @@ impl SessionTask for GhostSnapshotTask { } Ok(Err(err)) => match err { GitToolingError::NotAGitRepository { .. } => info!( - sub_id = ctx_for_task.sub_id.as_str(), + sub_id = initial_turn_context_for_task.sub_id.as_str(), "skipping ghost snapshot because current directory is not a Git repository" ), _ => { warn!( - sub_id = ctx_for_task.sub_id.as_str(), + sub_id = initial_turn_context_for_task.sub_id.as_str(), "failed to capture ghost snapshot: {err}" ); } }, Err(err) => { warn!( - sub_id = ctx_for_task.sub_id.as_str(), + sub_id = initial_turn_context_for_task.sub_id.as_str(), "ghost snapshot task panicked: {err}" ); let message = format!("Snapshots disabled after ghost snapshot panic: {err}."); session .session - .notify_background_event(&ctx_for_task, message) + .notify_background_event(&initial_turn_context_for_task, message) .await; } } @@ -150,7 +150,7 @@ impl SessionTask for GhostSnapshotTask { info!("ghost snapshot task cancelled"); } - match ctx.tool_call_gate.mark_ready(token).await { + match initial_turn_context.tool_call_gate.mark_ready(token).await { Ok(true) => info!("ghost snapshot gate marked ready"), Ok(false) => warn!("ghost snapshot gate already ready"), Err(err) => warn!("failed to mark ghost snapshot ready: {err}"), diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 50e4505800..346936f840 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -103,7 +103,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static { async fn run( self: Arc, session: Arc, - ctx: Arc, + initial_turn_context: Arc, input: Vec, cancellation_token: CancellationToken, ) -> Option; @@ -113,15 +113,19 @@ pub(crate) trait SessionTask: Send + Sync + 'static { /// The default implementation is a no-op; override this if additional /// teardown or notifications are required once /// [`Session::abort_all_tasks`] cancels the task. - async fn abort(&self, session: Arc, ctx: Arc) { - let _ = (session, ctx); + async fn abort( + &self, + session: Arc, + initial_turn_context: Arc, + ) { + let _ = (session, initial_turn_context); } } impl Session { pub async fn spawn_task( self: &Arc, - turn_context: Arc, + initial_turn_context: Arc, input: Vec, task: T, ) { @@ -138,7 +142,7 @@ impl Session { let done_clone = Arc::clone(&done); let handle = { let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self))); - let ctx = Arc::clone(&turn_context); + let task_initial_turn_context = Arc::clone(&initial_turn_context); let task_for_run = Arc::clone(&task); let task_cancellation_token = cancellation_token.child_token(); // Task-owned turn spans keep a core-owned span open for the @@ -147,16 +151,16 @@ impl Session { "turn", otel.name = span_name, thread.id = %self.conversation_id, - turn.id = %turn_context.sub_id, - model = %turn_context.model_info.slug, + turn.id = %initial_turn_context.sub_id, + model = %initial_turn_context.model_info.slug, ); tokio::spawn( async move { - let ctx_for_finish = Arc::clone(&ctx); + let initial_turn_context_for_finish = Arc::clone(&task_initial_turn_context); let last_agent_message = task_for_run .run( Arc::clone(&session_ctx), - ctx, + task_initial_turn_context, input, task_cancellation_token.child_token(), ) @@ -165,8 +169,11 @@ impl Session { sess.flush_rollout().await; if !task_cancellation_token.is_cancelled() { // Emit completion uniformly from spawn site so all tasks share the same lifecycle. - sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message) - .await; + sess.on_task_finished( + Arc::clone(&initial_turn_context_for_finish), + last_agent_message, + ) + .await; } done_clone.notify_waiters(); } @@ -174,7 +181,7 @@ impl Session { ) }; - let timer = turn_context + let timer = initial_turn_context .otel_manager .start_timer("codex.turn.e2e_duration_ms", &[]) .ok(); @@ -185,7 +192,7 @@ impl Session { kind: task_kind, task, cancellation_token, - initial_turn_context: Arc::clone(&turn_context), + initial_turn_context: Arc::clone(&initial_turn_context), _timer: timer, }; self.register_new_active_task(running_task).await; @@ -202,10 +209,10 @@ impl Session { pub async fn on_task_finished( self: &Arc, - turn_context: Arc, + initial_turn_context: Arc, last_agent_message: Option, ) { - turn_context + initial_turn_context .turn_metadata_state .cancel_git_enrichment_task(); @@ -216,7 +223,7 @@ impl Session { let mut turn_tool_calls = 0_u64; let mut current_turn_metadata_state = None; if let Some(at) = active.as_mut() - && at.remove_task(&turn_context.sub_id) + && at.remove_task(&initial_turn_context.sub_id) { current_turn_metadata_state = at .current_turn_context @@ -246,14 +253,14 @@ impl Session { // normal pre-sampling drain. This helper records the response item once, then // emits ItemStarted/UserMessage and ItemCompleted/UserMessage for clients. self.record_user_prompt_and_emit_turn_item( - turn_context.as_ref(), + initial_turn_context.as_ref(), &user_message.content, response_item, ) .await; } else { self.record_conversation_items( - turn_context.as_ref(), + initial_turn_context.as_ref(), std::slice::from_ref(&response_item), ) .await; @@ -321,10 +328,10 @@ impl Session { ); } let event = EventMsg::TurnComplete(TurnCompleteEvent { - turn_id: turn_context.sub_id.clone(), + turn_id: initial_turn_context.sub_id.clone(), last_agent_message, }); - self.send_event(turn_context.as_ref(), event).await; + self.send_event(initial_turn_context.as_ref(), event).await; } async fn register_new_active_task(&self, task: RunningTask) { diff --git a/codex-rs/core/src/tasks/regular.rs b/codex-rs/core/src/tasks/regular.rs index e3a9ef4c38..abda6a3f89 100644 --- a/codex-rs/core/src/tasks/regular.rs +++ b/codex-rs/core/src/tasks/regular.rs @@ -75,7 +75,7 @@ impl SessionTask for RegularTask { async fn run( self: Arc, session: Arc, - ctx: Arc, + initial_turn_context: Arc, input: Vec, cancellation_token: CancellationToken, ) -> Option { @@ -85,7 +85,7 @@ impl SessionTask for RegularTask { let prewarmed_client_session = self.take_prewarmed_session().await; run_turn( sess, - ctx, + initial_turn_context, input, prewarmed_client_session, cancellation_token, diff --git a/codex-rs/core/src/tasks/review.rs b/codex-rs/core/src/tasks/review.rs index 4787ee7826..1539a4e3b3 100644 --- a/codex-rs/core/src/tasks/review.rs +++ b/codex-rs/core/src/tasks/review.rs @@ -50,7 +50,7 @@ impl SessionTask for ReviewTask { async fn run( self: Arc, session: Arc, - ctx: Arc, + initial_turn_context: Arc, input: Vec, cancellation_token: CancellationToken, ) -> Option { @@ -63,33 +63,44 @@ impl SessionTask for ReviewTask { // Start sub-codex conversation and get the receiver for events. let output = match start_review_conversation( session.clone(), - ctx.clone(), + initial_turn_context.clone(), input, cancellation_token.clone(), ) .await { - Some(receiver) => process_review_events(session.clone(), ctx.clone(), receiver).await, + Some(receiver) => { + process_review_events(session.clone(), initial_turn_context.clone(), receiver).await + } None => None, }; if !cancellation_token.is_cancelled() { - exit_review_mode(session.clone_session(), output.clone(), ctx.clone()).await; + exit_review_mode( + session.clone_session(), + output.clone(), + initial_turn_context.clone(), + ) + .await; } None } - async fn abort(&self, session: Arc, ctx: Arc) { - exit_review_mode(session.clone_session(), None, ctx).await; + async fn abort( + &self, + session: Arc, + initial_turn_context: Arc, + ) { + exit_review_mode(session.clone_session(), None, initial_turn_context).await; } } async fn start_review_conversation( session: Arc, - ctx: Arc, + initial_turn_context: Arc, input: Vec, cancellation_token: CancellationToken, ) -> Option> { - let config = ctx.config.clone(); + let config = initial_turn_context.config.clone(); let mut sub_agent_config = config.as_ref().clone(); // Carry over review-only feature restrictions so the delegate cannot // re-enable blocked tools (web search, collab tools, view image). @@ -108,7 +119,7 @@ async fn start_review_conversation( let model = config .review_model .clone() - .unwrap_or_else(|| ctx.model_info.slug.clone()); + .unwrap_or_else(|| initial_turn_context.model_info.slug.clone()); sub_agent_config.model = Some(model); (run_codex_thread_one_shot( sub_agent_config, @@ -116,7 +127,7 @@ async fn start_review_conversation( session.models_manager(), input, session.clone_session(), - ctx.clone(), + initial_turn_context.clone(), cancellation_token, None, ) @@ -127,7 +138,7 @@ async fn start_review_conversation( async fn process_review_events( session: Arc, - ctx: Arc, + initial_turn_context: Arc, receiver: async_channel::Receiver, ) -> Option { let mut prev_agent_message: Option = None; @@ -137,7 +148,7 @@ async fn process_review_events( if let Some(prev) = prev_agent_message.take() { session .clone_session() - .send_event(ctx.as_ref(), prev.msg) + .send_event(initial_turn_context.as_ref(), prev.msg) .await; } prev_agent_message = Some(event); @@ -166,7 +177,7 @@ async fn process_review_events( other => { session .clone_session() - .send_event(ctx.as_ref(), other) + .send_event(initial_turn_context.as_ref(), other) .await; } } @@ -202,7 +213,7 @@ fn parse_review_output_event(text: &str) -> ReviewOutputEvent { pub(crate) async fn exit_review_mode( session: Arc, review_output: Option, - ctx: Arc, + initial_turn_context: Arc, ) { const REVIEW_USER_MESSAGE_ID: &str = "review_rollout_user"; const REVIEW_ASSISTANT_MESSAGE_ID: &str = "review_rollout_assistant"; @@ -230,7 +241,7 @@ pub(crate) async fn exit_review_mode( session .record_conversation_items( - &ctx, + &initial_turn_context, &[ResponseItem::Message { id: Some(REVIEW_USER_MESSAGE_ID.to_string()), role: "user".to_string(), @@ -243,13 +254,13 @@ pub(crate) async fn exit_review_mode( session .send_event( - ctx.as_ref(), + initial_turn_context.as_ref(), EventMsg::ExitedReviewMode(ExitedReviewModeEvent { review_output }), ) .await; session .record_response_item_and_emit_turn_item( - ctx.as_ref(), + initial_turn_context.as_ref(), ResponseItem::Message { id: Some(REVIEW_ASSISTANT_MESSAGE_ID.to_string()), role: "assistant".to_string(), diff --git a/codex-rs/core/src/tasks/undo.rs b/codex-rs/core/src/tasks/undo.rs index 95f51c1ec3..fe6fa2be4b 100644 --- a/codex-rs/core/src/tasks/undo.rs +++ b/codex-rs/core/src/tasks/undo.rs @@ -38,7 +38,7 @@ impl SessionTask for UndoTask { async fn run( self: Arc, session: Arc, - ctx: Arc, + initial_turn_context: Arc, _input: Vec, cancellation_token: CancellationToken, ) -> Option { @@ -49,7 +49,7 @@ impl SessionTask for UndoTask { .counter("codex.task.undo", 1, &[]); let sess = session.clone_session(); sess.send_event( - ctx.as_ref(), + initial_turn_context.as_ref(), EventMsg::UndoStarted(UndoStartedEvent { message: Some("Undo in progress...".to_string()), }), @@ -58,7 +58,7 @@ impl SessionTask for UndoTask { if cancellation_token.is_cancelled() { sess.send_event( - ctx.as_ref(), + initial_turn_context.as_ref(), EventMsg::UndoCompleted(UndoCompletedEvent { success: false, message: Some("Undo cancelled.".to_string()), @@ -88,14 +88,17 @@ impl SessionTask for UndoTask { }) else { completed.message = Some("No ghost snapshot available to undo.".to_string()); - sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed)) - .await; + sess.send_event( + initial_turn_context.as_ref(), + EventMsg::UndoCompleted(completed), + ) + .await; return None; }; let commit_id = ghost_commit.id().to_string(); - let repo_path = ctx.cwd.clone(); - let ghost_snapshot = ctx.ghost_snapshot.clone(); + let repo_path = initial_turn_context.cwd.clone(); + let ghost_snapshot = initial_turn_context.ghost_snapshot.clone(); let restore_result = tokio::task::spawn_blocking(move || { let options = RestoreGhostCommitOptions::new(&repo_path).ghost_snapshot(ghost_snapshot); restore_ghost_commit_with_options(&options, &ghost_commit) @@ -124,8 +127,11 @@ impl SessionTask for UndoTask { } } - sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed)) - .await; + sess.send_event( + initial_turn_context.as_ref(), + EventMsg::UndoCompleted(completed), + ) + .await; None } } diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index 2f77d9fcee..b4e35eca80 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -73,13 +73,13 @@ impl SessionTask for UserShellCommandTask { async fn run( self: Arc, session: Arc, - turn_context: Arc, + initial_turn_context: Arc, _input: Vec, cancellation_token: CancellationToken, ) -> Option { execute_user_shell_command( session.clone_session(), - turn_context, + initial_turn_context, self.command.clone(), cancellation_token, UserShellCommandMode::StandaloneTurn, diff --git a/codex-rs/core/src/tools/network_approval.rs b/codex-rs/core/src/tools/network_approval.rs index 020de7f9d5..91e3f72373 100644 --- a/codex-rs/core/src/tools/network_approval.rs +++ b/codex-rs/core/src/tools/network_approval.rs @@ -577,7 +577,7 @@ mod tests { async fn run( self: Arc, _session: Arc, - _ctx: Arc, + _initial_turn_context: Arc, _input: Vec, _cancellation_token: CancellationToken, ) -> Option {