Clarify initial turn context naming in task lifecycle

Rename task-snapshot TurnContext parameters and locals to initial_turn_context across task spawning, completion, and task implementations to avoid confusion with mutable active-turn context.

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Charles Cunningham
2026-03-06 01:33:15 -08:00
parent e2fae5de8d
commit ff975f2f62
9 changed files with 96 additions and 71 deletions

View File

@@ -9050,7 +9050,7 @@ mod tests {
async fn run(
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_initial_turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
@@ -10586,7 +10586,7 @@ mod tests {
async fn run(
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<TurnContext>,
_initial_turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {

View File

@@ -24,25 +24,26 @@ impl SessionTask for CompactTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();
let _ = if crate::compact::should_use_remote_compact_task(&ctx.provider) {
let _ = if crate::compact::should_use_remote_compact_task(&initial_turn_context.provider) {
let _ = session.services.otel_manager.counter(
"codex.task.compact",
1,
&[("type", "remote")],
);
crate::compact_remote::run_remote_compact_task(session.clone(), ctx).await
crate::compact_remote::run_remote_compact_task(session.clone(), initial_turn_context)
.await
} else {
let _ = session.services.otel_manager.counter(
"codex.task.compact",
1,
&[("type", "local")],
);
crate::compact::run_compact_task(session.clone(), ctx, input).await
crate::compact::run_compact_task(session.clone(), initial_turn_context, input).await
};
None
}

View File

@@ -39,18 +39,18 @@ impl SessionTask for GhostSnapshotTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
tokio::task::spawn(async move {
let token = self.token;
let warnings_enabled = !ctx.ghost_snapshot.disable_warnings;
let warnings_enabled = !initial_turn_context.ghost_snapshot.disable_warnings;
// Channel used to signal when the snapshot work has finished so the
// timeout warning task can exit early without sending a warning.
let (snapshot_done_tx, snapshot_done_rx) = oneshot::channel::<()>();
if warnings_enabled {
let ctx_for_warning = ctx.clone();
let initial_turn_context_for_warning = initial_turn_context.clone();
let cancellation_token_for_warning = cancellation_token.clone();
let session_for_warning = session.clone();
// Fire a generic warning if the snapshot is still running after
@@ -61,7 +61,7 @@ impl SessionTask for GhostSnapshotTask {
_ = tokio::time::sleep(SNAPSHOT_WARNING_THRESHOLD) => {
session_for_warning.session
.send_event(
&ctx_for_warning,
&initial_turn_context_for_warning,
EventMsg::Warning(WarningEvent {
message: "Repository snapshot is taking longer than expected. Large untracked or ignored files can slow snapshots; consider adding large files or directories to .gitignore or disabling `undo` in your config.".to_string()
}),
@@ -76,12 +76,12 @@ impl SessionTask for GhostSnapshotTask {
drop(snapshot_done_rx);
}
let ctx_for_task = ctx.clone();
let initial_turn_context_for_task = initial_turn_context.clone();
let cancelled = tokio::select! {
_ = cancellation_token.cancelled() => true,
_ = async {
let repo_path = ctx_for_task.cwd.clone();
let ghost_snapshot = ctx_for_task.ghost_snapshot.clone();
let repo_path = initial_turn_context_for_task.cwd.clone();
let ghost_snapshot = initial_turn_context_for_task.ghost_snapshot.clone();
let ghost_snapshot_for_commit = ghost_snapshot.clone();
// Required to run in a dedicated blocking pool.
match tokio::task::spawn_blocking(move || {
@@ -102,7 +102,7 @@ impl SessionTask for GhostSnapshotTask {
session
.session
.send_event(
&ctx_for_task,
&initial_turn_context_for_task,
EventMsg::Warning(WarningEvent { message }),
)
.await;
@@ -110,7 +110,7 @@ impl SessionTask for GhostSnapshotTask {
}
session
.session
.record_conversation_items(&ctx, &[ResponseItem::GhostSnapshot {
.record_conversation_items(&initial_turn_context, &[ResponseItem::GhostSnapshot {
ghost_commit: ghost_commit.clone(),
}])
.await;
@@ -118,26 +118,26 @@ impl SessionTask for GhostSnapshotTask {
}
Ok(Err(err)) => match err {
GitToolingError::NotAGitRepository { .. } => info!(
sub_id = ctx_for_task.sub_id.as_str(),
sub_id = initial_turn_context_for_task.sub_id.as_str(),
"skipping ghost snapshot because current directory is not a Git repository"
),
_ => {
warn!(
sub_id = ctx_for_task.sub_id.as_str(),
sub_id = initial_turn_context_for_task.sub_id.as_str(),
"failed to capture ghost snapshot: {err}"
);
}
},
Err(err) => {
warn!(
sub_id = ctx_for_task.sub_id.as_str(),
sub_id = initial_turn_context_for_task.sub_id.as_str(),
"ghost snapshot task panicked: {err}"
);
let message =
format!("Snapshots disabled after ghost snapshot panic: {err}.");
session
.session
.notify_background_event(&ctx_for_task, message)
.notify_background_event(&initial_turn_context_for_task, message)
.await;
}
}
@@ -150,7 +150,7 @@ impl SessionTask for GhostSnapshotTask {
info!("ghost snapshot task cancelled");
}
match ctx.tool_call_gate.mark_ready(token).await {
match initial_turn_context.tool_call_gate.mark_ready(token).await {
Ok(true) => info!("ghost snapshot gate marked ready"),
Ok(false) => warn!("ghost snapshot gate already ready"),
Err(err) => warn!("failed to mark ghost snapshot ready: {err}"),

View File

@@ -103,7 +103,7 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String>;
@@ -113,15 +113,19 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
/// The default implementation is a no-op; override this if additional
/// teardown or notifications are required once
/// [`Session::abort_all_tasks`] cancels the task.
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
let _ = (session, ctx);
async fn abort(
&self,
session: Arc<SessionTaskContext>,
initial_turn_context: Arc<TurnContext>,
) {
let _ = (session, initial_turn_context);
}
}
impl Session {
pub async fn spawn_task<T: SessionTask>(
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
task: T,
) {
@@ -138,7 +142,7 @@ impl Session {
let done_clone = Arc::clone(&done);
let handle = {
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
let ctx = Arc::clone(&turn_context);
let task_initial_turn_context = Arc::clone(&initial_turn_context);
let task_for_run = Arc::clone(&task);
let task_cancellation_token = cancellation_token.child_token();
// Task-owned turn spans keep a core-owned span open for the
@@ -147,16 +151,16 @@ impl Session {
"turn",
otel.name = span_name,
thread.id = %self.conversation_id,
turn.id = %turn_context.sub_id,
model = %turn_context.model_info.slug,
turn.id = %initial_turn_context.sub_id,
model = %initial_turn_context.model_info.slug,
);
tokio::spawn(
async move {
let ctx_for_finish = Arc::clone(&ctx);
let initial_turn_context_for_finish = Arc::clone(&task_initial_turn_context);
let last_agent_message = task_for_run
.run(
Arc::clone(&session_ctx),
ctx,
task_initial_turn_context,
input,
task_cancellation_token.child_token(),
)
@@ -165,8 +169,11 @@ impl Session {
sess.flush_rollout().await;
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
.await;
sess.on_task_finished(
Arc::clone(&initial_turn_context_for_finish),
last_agent_message,
)
.await;
}
done_clone.notify_waiters();
}
@@ -174,7 +181,7 @@ impl Session {
)
};
let timer = turn_context
let timer = initial_turn_context
.otel_manager
.start_timer("codex.turn.e2e_duration_ms", &[])
.ok();
@@ -185,7 +192,7 @@ impl Session {
kind: task_kind,
task,
cancellation_token,
initial_turn_context: Arc::clone(&turn_context),
initial_turn_context: Arc::clone(&initial_turn_context),
_timer: timer,
};
self.register_new_active_task(running_task).await;
@@ -202,10 +209,10 @@ impl Session {
pub async fn on_task_finished(
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
last_agent_message: Option<String>,
) {
turn_context
initial_turn_context
.turn_metadata_state
.cancel_git_enrichment_task();
@@ -216,7 +223,7 @@ impl Session {
let mut turn_tool_calls = 0_u64;
let mut current_turn_metadata_state = None;
if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
&& at.remove_task(&initial_turn_context.sub_id)
{
current_turn_metadata_state = at
.current_turn_context
@@ -246,14 +253,14 @@ impl Session {
// normal pre-sampling drain. This helper records the response item once, then
// emits ItemStarted/UserMessage and ItemCompleted/UserMessage for clients.
self.record_user_prompt_and_emit_turn_item(
turn_context.as_ref(),
initial_turn_context.as_ref(),
&user_message.content,
response_item,
)
.await;
} else {
self.record_conversation_items(
turn_context.as_ref(),
initial_turn_context.as_ref(),
std::slice::from_ref(&response_item),
)
.await;
@@ -321,10 +328,10 @@ impl Session {
);
}
let event = EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: turn_context.sub_id.clone(),
turn_id: initial_turn_context.sub_id.clone(),
last_agent_message,
});
self.send_event(turn_context.as_ref(), event).await;
self.send_event(initial_turn_context.as_ref(), event).await;
}
async fn register_new_active_task(&self, task: RunningTask) {

View File

@@ -75,7 +75,7 @@ impl SessionTask for RegularTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
@@ -85,7 +85,7 @@ impl SessionTask for RegularTask {
let prewarmed_client_session = self.take_prewarmed_session().await;
run_turn(
sess,
ctx,
initial_turn_context,
input,
prewarmed_client_session,
cancellation_token,

View File

@@ -50,7 +50,7 @@ impl SessionTask for ReviewTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
@@ -63,33 +63,44 @@ impl SessionTask for ReviewTask {
// Start sub-codex conversation and get the receiver for events.
let output = match start_review_conversation(
session.clone(),
ctx.clone(),
initial_turn_context.clone(),
input,
cancellation_token.clone(),
)
.await
{
Some(receiver) => process_review_events(session.clone(), ctx.clone(), receiver).await,
Some(receiver) => {
process_review_events(session.clone(), initial_turn_context.clone(), receiver).await
}
None => None,
};
if !cancellation_token.is_cancelled() {
exit_review_mode(session.clone_session(), output.clone(), ctx.clone()).await;
exit_review_mode(
session.clone_session(),
output.clone(),
initial_turn_context.clone(),
)
.await;
}
None
}
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
exit_review_mode(session.clone_session(), None, ctx).await;
async fn abort(
&self,
session: Arc<SessionTaskContext>,
initial_turn_context: Arc<TurnContext>,
) {
exit_review_mode(session.clone_session(), None, initial_turn_context).await;
}
}
async fn start_review_conversation(
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<async_channel::Receiver<Event>> {
let config = ctx.config.clone();
let config = initial_turn_context.config.clone();
let mut sub_agent_config = config.as_ref().clone();
// Carry over review-only feature restrictions so the delegate cannot
// re-enable blocked tools (web search, collab tools, view image).
@@ -108,7 +119,7 @@ async fn start_review_conversation(
let model = config
.review_model
.clone()
.unwrap_or_else(|| ctx.model_info.slug.clone());
.unwrap_or_else(|| initial_turn_context.model_info.slug.clone());
sub_agent_config.model = Some(model);
(run_codex_thread_one_shot(
sub_agent_config,
@@ -116,7 +127,7 @@ async fn start_review_conversation(
session.models_manager(),
input,
session.clone_session(),
ctx.clone(),
initial_turn_context.clone(),
cancellation_token,
None,
)
@@ -127,7 +138,7 @@ async fn start_review_conversation(
async fn process_review_events(
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
receiver: async_channel::Receiver<Event>,
) -> Option<ReviewOutputEvent> {
let mut prev_agent_message: Option<Event> = None;
@@ -137,7 +148,7 @@ async fn process_review_events(
if let Some(prev) = prev_agent_message.take() {
session
.clone_session()
.send_event(ctx.as_ref(), prev.msg)
.send_event(initial_turn_context.as_ref(), prev.msg)
.await;
}
prev_agent_message = Some(event);
@@ -166,7 +177,7 @@ async fn process_review_events(
other => {
session
.clone_session()
.send_event(ctx.as_ref(), other)
.send_event(initial_turn_context.as_ref(), other)
.await;
}
}
@@ -202,7 +213,7 @@ fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
pub(crate) async fn exit_review_mode(
session: Arc<Session>,
review_output: Option<ReviewOutputEvent>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
) {
const REVIEW_USER_MESSAGE_ID: &str = "review_rollout_user";
const REVIEW_ASSISTANT_MESSAGE_ID: &str = "review_rollout_assistant";
@@ -230,7 +241,7 @@ pub(crate) async fn exit_review_mode(
session
.record_conversation_items(
&ctx,
&initial_turn_context,
&[ResponseItem::Message {
id: Some(REVIEW_USER_MESSAGE_ID.to_string()),
role: "user".to_string(),
@@ -243,13 +254,13 @@ pub(crate) async fn exit_review_mode(
session
.send_event(
ctx.as_ref(),
initial_turn_context.as_ref(),
EventMsg::ExitedReviewMode(ExitedReviewModeEvent { review_output }),
)
.await;
session
.record_response_item_and_emit_turn_item(
ctx.as_ref(),
initial_turn_context.as_ref(),
ResponseItem::Message {
id: Some(REVIEW_ASSISTANT_MESSAGE_ID.to_string()),
role: "assistant".to_string(),

View File

@@ -38,7 +38,7 @@ impl SessionTask for UndoTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
@@ -49,7 +49,7 @@ impl SessionTask for UndoTask {
.counter("codex.task.undo", 1, &[]);
let sess = session.clone_session();
sess.send_event(
ctx.as_ref(),
initial_turn_context.as_ref(),
EventMsg::UndoStarted(UndoStartedEvent {
message: Some("Undo in progress...".to_string()),
}),
@@ -58,7 +58,7 @@ impl SessionTask for UndoTask {
if cancellation_token.is_cancelled() {
sess.send_event(
ctx.as_ref(),
initial_turn_context.as_ref(),
EventMsg::UndoCompleted(UndoCompletedEvent {
success: false,
message: Some("Undo cancelled.".to_string()),
@@ -88,14 +88,17 @@ impl SessionTask for UndoTask {
})
else {
completed.message = Some("No ghost snapshot available to undo.".to_string());
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
.await;
sess.send_event(
initial_turn_context.as_ref(),
EventMsg::UndoCompleted(completed),
)
.await;
return None;
};
let commit_id = ghost_commit.id().to_string();
let repo_path = ctx.cwd.clone();
let ghost_snapshot = ctx.ghost_snapshot.clone();
let repo_path = initial_turn_context.cwd.clone();
let ghost_snapshot = initial_turn_context.ghost_snapshot.clone();
let restore_result = tokio::task::spawn_blocking(move || {
let options = RestoreGhostCommitOptions::new(&repo_path).ghost_snapshot(ghost_snapshot);
restore_ghost_commit_with_options(&options, &ghost_commit)
@@ -124,8 +127,11 @@ impl SessionTask for UndoTask {
}
}
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
.await;
sess.send_event(
initial_turn_context.as_ref(),
EventMsg::UndoCompleted(completed),
)
.await;
None
}
}

View File

@@ -73,13 +73,13 @@ impl SessionTask for UserShellCommandTask {
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
turn_context: Arc<TurnContext>,
initial_turn_context: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
execute_user_shell_command(
session.clone_session(),
turn_context,
initial_turn_context,
self.command.clone(),
cancellation_token,
UserShellCommandMode::StandaloneTurn,

View File

@@ -577,7 +577,7 @@ mod tests {
async fn run(
self: Arc<Self>,
_session: Arc<SessionTaskContext>,
_ctx: Arc<crate::codex::TurnContext>,
_initial_turn_context: Arc<crate::codex::TurnContext>,
_input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {