use anyhow::Result; use app_test_support::McpProcess; use app_test_support::create_fake_rollout_with_text_elements; use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::rollout_path; use app_test_support::to_response; use chrono::Utc; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SessionSource; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadResumeParams; use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput; use codex_protocol::config_types::Personality; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::user_input::ByteRange; use codex_protocol::user_input::TextElement; use core_test_support::responses; use core_test_support::skip_if_no_network; use pretty_assertions::assert_eq; use std::fs::FileTimes; use std::path::Path; use std::path::PathBuf; use tempfile::TempDir; use tokio::time::timeout; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT: &str = "You are Codex, a coding agent based on GPT-5. You and the user share the same workspace and collaborate to achieve the user's goals."; #[tokio::test] async fn thread_resume_returns_original_thread() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; // Start a thread. let start_id = mcp .send_thread_start_request(ThreadStartParams { model: Some("gpt-5.1-codex-max".to_string()), ..Default::default() }) .await?; let start_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(start_id)), ) .await??; let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; // Resume it via v2 API. let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id: thread.id.clone(), ..Default::default() }) .await?; let resume_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), ) .await??; let ThreadResumeResponse { thread: resumed, .. } = to_response::(resume_resp)?; let mut expected = thread; expected.updated_at = resumed.updated_at; assert_eq!(resumed, expected); Ok(()) } #[tokio::test] async fn thread_resume_returns_rollout_history() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; let preview = "Saved user message"; let text_elements = vec![TextElement::new( ByteRange { start: 0, end: 5 }, Some("".into()), )]; let conversation_id = create_fake_rollout_with_text_elements( codex_home.path(), "2025-01-05T12-00-00", "2025-01-05T12:00:00Z", preview, text_elements .iter() .map(|elem| serde_json::to_value(elem).expect("serialize text element")) .collect(), Some("mock_provider"), None, )?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id: conversation_id.clone(), ..Default::default() }) .await?; let resume_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), ) .await??; let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; assert_eq!(thread.id, conversation_id); assert_eq!(thread.preview, preview); assert_eq!(thread.model_provider, "mock_provider"); assert!(thread.path.as_ref().expect("thread path").is_absolute()); assert_eq!(thread.cwd, PathBuf::from("/")); assert_eq!(thread.cli_version, "0.0.0"); assert_eq!(thread.source, SessionSource::Cli); assert_eq!(thread.git_info, None); assert_eq!( thread.turns.len(), 1, "expected rollouts to include one turn" ); let turn = &thread.turns[0]; assert_eq!(turn.status, TurnStatus::Completed); assert_eq!(turn.items.len(), 1, "expected user message item"); match &turn.items[0] { ThreadItem::UserMessage { content, .. } => { assert_eq!( content, &vec![UserInput::Text { text: preview.to_string(), text_elements: text_elements.clone().into_iter().map(Into::into).collect(), }] ); } other => panic!("expected user message item, got {other:?}"), } Ok(()) } #[tokio::test] async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?; let thread_id = rollout.conversation_id.clone(); let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id: thread_id.clone(), ..Default::default() }) .await?; let resume_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), ) .await??; let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; assert_eq!(thread.updated_at, rollout.expected_updated_at); let after_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?; assert_eq!(after_modified, rollout.before_modified); let turn_id = mcp .send_turn_start_request(TurnStartParams { thread_id, input: vec![UserInput::Text { text: "Hello".to_string(), text_elements: Vec::new(), }], ..Default::default() }) .await?; timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), ) .await??; timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_notification_message("turn/completed"), ) .await??; let after_turn_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?; assert!(after_turn_modified > rollout.before_modified); Ok(()) } #[tokio::test] async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id: rollout.conversation_id.clone(), model: Some("mock-model".to_string()), ..Default::default() }) .await?; let resume_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), ) .await??; let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; assert_eq!(thread.updated_at, rollout.expected_updated_at); let after_resume_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?; assert_eq!(after_resume_modified, rollout.before_modified); let turn_id = mcp .send_turn_start_request(TurnStartParams { thread_id: rollout.conversation_id, input: vec![UserInput::Text { text: "Hello".to_string(), text_elements: Vec::new(), }], ..Default::default() }) .await?; timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), ) .await??; timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_notification_message("turn/completed"), ) .await??; let after_turn_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?; assert!(after_turn_modified > rollout.before_modified); Ok(()) } #[tokio::test] async fn thread_resume_prefers_path_over_thread_id() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let start_id = mcp .send_thread_start_request(ThreadStartParams { model: Some("gpt-5.1-codex-max".to_string()), ..Default::default() }) .await?; let start_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(start_id)), ) .await??; let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; let thread_path = thread.path.clone().expect("thread path"); let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id: "not-a-valid-thread-id".to_string(), path: Some(thread_path), ..Default::default() }) .await?; let resume_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), ) .await??; let ThreadResumeResponse { thread: resumed, .. } = to_response::(resume_resp)?; let mut expected = thread; expected.updated_at = resumed.updated_at; assert_eq!(resumed, expected); Ok(()) } #[tokio::test] async fn thread_resume_supports_history_and_overrides() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; // Start a thread. let start_id = mcp .send_thread_start_request(ThreadStartParams { model: Some("gpt-5.1-codex-max".to_string()), ..Default::default() }) .await?; let start_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(start_id)), ) .await??; let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; let history_text = "Hello from history"; let history = vec![ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: history_text.to_string(), }], end_turn: None, }]; // Resume with explicit history and override the model. let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id: thread.id, history: Some(history), model: Some("mock-model".to_string()), model_provider: Some("mock_provider".to_string()), ..Default::default() }) .await?; let resume_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), ) .await??; let ThreadResumeResponse { thread: resumed, model_provider, .. } = to_response::(resume_resp)?; assert!(!resumed.id.is_empty()); assert_eq!(model_provider, "mock_provider"); assert_eq!(resumed.preview, history_text); Ok(()) } #[tokio::test] async fn thread_resume_accepts_personality_override() -> Result<()> { skip_if_no_network!(Ok(())); let server = responses::start_mock_server().await; let body = responses::sse(vec![ responses::ev_response_created("resp-1"), responses::ev_assistant_message("msg-1", "Done"), responses::ev_completed("resp-1"), ]); let response_mock = responses::mount_sse_once(&server, body).await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let start_id = mcp .send_thread_start_request(ThreadStartParams { model: Some("gpt-5.2-codex".to_string()), ..Default::default() }) .await?; let start_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(start_id)), ) .await??; let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id: thread.id.clone(), model: Some("gpt-5.2-codex".to_string()), personality: Some(Personality::Pragmatic), ..Default::default() }) .await?; let resume_resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), ) .await??; let _resume: ThreadResumeResponse = to_response::(resume_resp)?; let turn_id = mcp .send_turn_start_request(TurnStartParams { thread_id: thread.id, input: vec![UserInput::Text { text: "Hello".to_string(), text_elements: Vec::new(), }], ..Default::default() }) .await?; timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(turn_id)), ) .await??; timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_notification_message("turn/completed"), ) .await??; let request = response_mock.single_request(); let developer_texts = request.message_input_texts("developer"); assert!( developer_texts .iter() .any(|text| text.contains("")), "expected a personality update message in developer input, got {developer_texts:?}" ); let instructions_text = request.instructions_text(); assert!( instructions_text.contains(CODEX_5_2_INSTRUCTIONS_TEMPLATE_DEFAULT), "expected default base instructions from history, got {instructions_text:?}" ); Ok(()) } // Helper to create a config.toml pointing at the mock model server. fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); std::fs::write( config_toml, format!( r#" model = "gpt-5.2-codex" approval_policy = "never" sandbox_mode = "read-only" model_provider = "mock_provider" [features] remote_models = false personality = true [model_providers.mock_provider] name = "Mock provider for test" base_url = "{server_uri}/v1" wire_api = "responses" request_max_retries = 0 stream_max_retries = 0 "# ), ) } fn set_rollout_mtime(path: &Path, updated_at_rfc3339: &str) -> Result<()> { let parsed = chrono::DateTime::parse_from_rfc3339(updated_at_rfc3339)?.with_timezone(&Utc); let times = FileTimes::new().set_modified(parsed.into()); std::fs::OpenOptions::new() .append(true) .open(path)? .set_times(times)?; Ok(()) } struct RolloutFixture { conversation_id: String, rollout_file_path: PathBuf, before_modified: std::time::SystemTime, expected_updated_at: i64, } fn setup_rollout_fixture(codex_home: &Path, server_uri: &str) -> Result { create_config_toml(codex_home, server_uri)?; let preview = "Saved user message"; let filename_ts = "2025-01-05T12-00-00"; let meta_rfc3339 = "2025-01-05T12:00:00Z"; let expected_updated_at_rfc3339 = "2025-01-07T00:00:00Z"; let conversation_id = create_fake_rollout_with_text_elements( codex_home, filename_ts, meta_rfc3339, preview, Vec::new(), Some("mock_provider"), None, )?; let rollout_file_path = rollout_path(codex_home, filename_ts, &conversation_id); set_rollout_mtime(rollout_file_path.as_path(), expected_updated_at_rfc3339)?; let before_modified = std::fs::metadata(&rollout_file_path)?.modified()?; let expected_updated_at = chrono::DateTime::parse_from_rfc3339(expected_updated_at_rfc3339)? .with_timezone(&Utc) .timestamp(); Ok(RolloutFixture { conversation_id, rollout_file_path, before_modified, expected_updated_at, }) }