diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index 17df7b30ed..b5da8588b8 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -52,6 +52,9 @@ struct ResolvedTurnContextOverrides { overrides: CodexThreadTurnContextOverrides, } +const TURN_CONTEXT_APPLY_TIMEOUT: Duration = Duration::from_secs(5); +const TURN_CONTEXT_APPLY_POLL_INTERVAL: Duration = Duration::from_millis(10); + impl TurnRequestProcessor { #[allow(clippy::too_many_arguments)] pub(crate) fn new( @@ -569,6 +572,30 @@ impl TurnRequestProcessor { error })?; + if let Some(after_turn_context) = after_turn_context { + if before_turn_context != after_turn_context { + let started = Instant::now(); + loop { + let config_snapshot = thread.config_snapshot().await; + if thread_turn_context_from_snapshot(&config_snapshot) == after_turn_context { + break; + } + if started.elapsed() >= TURN_CONTEXT_APPLY_TIMEOUT { + return Err(internal_error( + "timed out waiting for turn context overrides to apply".to_string(), + )); + } + tokio::time::sleep(TURN_CONTEXT_APPLY_POLL_INTERVAL).await; + } + } + self.maybe_emit_turn_context_updated( + ¶ms.thread_id, + &before_turn_context, + after_turn_context, + ) + .await; + } + if turn_has_input { let config_snapshot = thread.config_snapshot().await; codex_memories_write::start_memories_startup_task( @@ -580,14 +607,6 @@ impl TurnRequestProcessor { &config_snapshot.session_source, ); } - if let Some(after_turn_context) = after_turn_context { - self.maybe_emit_turn_context_updated( - ¶ms.thread_id, - &before_turn_context, - after_turn_context, - ) - .await; - } self.outgoing .record_request_turn_id(&request_id, &turn_id) diff --git a/codex-rs/app-server/tests/suite/v2/thread_turn_context.rs b/codex-rs/app-server/tests/suite/v2/thread_turn_context.rs index b33f6442d7..5c6f4f3ad4 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_turn_context.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_turn_context.rs @@ -251,3 +251,82 @@ async fn turn_start_emits_turn_context_updated_when_overrides_change_defaults() Ok(()) } + +#[tokio::test] +async fn thread_turn_context_update_after_turn_start_preserves_newer_update() -> Result<()> { + let server = create_mock_responses_server_sequence_unchecked(vec![ + create_final_assistant_message_sse_response("Done")?, + ]) + .await; + let codex_home = TempDir::new()?; + write_config(&codex_home, &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?; + + let turn_request_id = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "Hello".to_string(), + text_elements: Vec::new(), + }], + model: Some("gpt-5.2".to_string()), + effort: Some(ReasoningEffort::Low), + ..Default::default() + }) + .await?; + let update_request_id = mcp + .send_thread_turn_context_update_request(ThreadTurnContextUpdateParams { + thread_id: thread.id.clone(), + model: Some("gpt-5.4".to_string()), + effort: Some(Some(ReasoningEffort::High)), + ..Default::default() + }) + .await?; + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_request_id)), + ) + .await??; + let update_response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(update_request_id)), + ) + .await??; + let update_response = to_response::(update_response)?; + assert_eq!(update_response.turn_context.model, "gpt-5.4"); + assert_eq!( + update_response.turn_context.effort, + Some(ReasoningEffort::High) + ); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + mcp.clear_message_buffer(); + let read_current_request_id = mcp + .send_thread_turn_context_update_request(ThreadTurnContextUpdateParams { + thread_id: thread.id, + ..Default::default() + }) + .await?; + let read_current_response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(read_current_request_id)), + ) + .await??; + let read_current_response = + to_response::(read_current_response)?; + assert_eq!( + read_current_response.turn_context, + update_response.turn_context + ); + + Ok(()) +}