use anyhow::Result; use app_test_support::ChatGptAuthFixture; use app_test_support::McpProcess; use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::to_response; use app_test_support::write_chatgpt_auth; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::McpServerStartupState; use codex_app_server_protocol::McpServerStatusUpdatedNotification; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; use codex_app_server_protocol::ThreadStartedNotification; use codex_app_server_protocol::ThreadStatus; use codex_app_server_protocol::ThreadStatusChangedNotification; use codex_core::config::set_project_trust_level; use codex_login::AuthCredentialsStoreMode; use codex_login::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR; use codex_protocol::config_types::ServiceTier; use codex_protocol::config_types::TrustLevel; use codex_protocol::openai_models::ReasoningEffort; use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; use std::path::Path; use std::time::Duration; use tempfile::TempDir; use tokio::time::timeout; use wiremock::Mock; use wiremock::MockServer; use wiremock::ResponseTemplate; use wiremock::matchers::method; use wiremock::matchers::path; use super::analytics::assert_basic_thread_initialized_event; use super::analytics::enable_analytics_capture; use super::analytics::thread_initialized_event; use super::analytics::wait_for_analytics_payload; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); #[tokio::test] async fn thread_start_creates_thread_and_emits_started() -> Result<()> { // Provide a mock server and config so model wiring is valid. 
let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; // Start server and initialize. let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; // Start a v2 thread with an explicit model override. let req_id = mcp .send_thread_start_request(ThreadStartParams { model: Some("gpt-5.1".to_string()), ..Default::default() }) .await?; // Expect a proper JSON-RPC response with a thread id. let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(req_id)), ) .await??; let resp_result = resp.result.clone(); let ThreadStartResponse { thread, model_provider, .. } = to_response::(resp)?; assert!(!thread.id.is_empty(), "thread id should not be empty"); assert!( thread.preview.is_empty(), "new threads should start with an empty preview" ); assert_eq!(model_provider, "mock_provider"); assert!( thread.created_at > 0, "created_at should be a positive UNIX timestamp" ); assert!( !thread.ephemeral, "new persistent threads should not be ephemeral" ); assert_eq!(thread.status, ThreadStatus::Idle); let thread_path = thread.path.clone().expect("thread path should be present"); assert!(thread_path.is_absolute(), "thread path should be absolute"); assert!( !thread_path.exists(), "fresh thread rollout should not be materialized until first user message" ); // Wire contract: thread title field is `name`, serialized as null when unset. 
let thread_json = resp_result .get("thread") .and_then(Value::as_object) .expect("thread/start result.thread must be an object"); assert_eq!( thread_json.get("name"), Some(&Value::Null), "new threads should serialize `name: null`" ); assert_eq!( thread_json.get("ephemeral").and_then(Value::as_bool), Some(false), "new persistent threads should serialize `ephemeral: false`" ); assert_eq!(thread.name, None); // A corresponding thread/started notification should arrive. let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT; let notif = loop { let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); let message = timeout(remaining, mcp.read_next_message()).await??; let JSONRPCMessage::Notification(notif) = message else { continue; }; if notif.method == "thread/status/changed" { let status_changed: ThreadStatusChangedNotification = serde_json::from_value(notif.params.expect("params must be present"))?; if status_changed.thread_id == thread.id { anyhow::bail!( "thread/start should introduce the thread without a preceding thread/status/changed" ); } continue; } if notif.method == "thread/started" { break notif; } }; let started_params = notif.params.clone().expect("params must be present"); let started_thread_json = started_params .get("thread") .and_then(Value::as_object) .expect("thread/started params.thread must be an object"); assert_eq!( started_thread_json.get("name"), Some(&Value::Null), "thread/started should serialize `name: null` for new threads" ); assert_eq!( started_thread_json .get("ephemeral") .and_then(Value::as_bool), Some(false), "thread/started should serialize `ephemeral: false` for new persistent threads" ); let started: ThreadStartedNotification = serde_json::from_value(notif.params.expect("params must be present"))?; assert_eq!(started.thread, thread); Ok(()) } #[tokio::test] async fn thread_start_tracks_thread_initialized_analytics() -> Result<()> { let server = 
create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml_with_chatgpt_base_url( codex_home.path(), &server.uri(), &server.uri(), /*general_analytics_enabled*/ true, )?; enable_analytics_capture(&server, codex_home.path()).await?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let req_id = mcp .send_thread_start_request(ThreadStartParams::default()) .await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(req_id)), ) .await??; let ThreadStartResponse { thread, .. } = to_response::(resp)?; let payload = wait_for_analytics_payload(&server, DEFAULT_READ_TIMEOUT).await?; assert_eq!(payload["events"].as_array().expect("events array").len(), 1); let event = thread_initialized_event(&payload)?; assert_basic_thread_initialized_event(event, &thread.id, "mock-model", "new"); Ok(()) } #[tokio::test] async fn thread_start_does_not_track_thread_initialized_analytics_without_feature() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml_with_chatgpt_base_url( codex_home.path(), &server.uri(), &server.uri(), /*general_analytics_enabled*/ false, )?; enable_analytics_capture(&server, codex_home.path()).await?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let req_id = mcp .send_thread_start_request(ThreadStartParams::default()) .await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(req_id)), ) .await??; let _ = to_response::(resp)?; let payload = wait_for_analytics_payload(&server, Duration::from_millis(250)).await; assert!( payload.is_err(), "thread analytics should be gated off when general_analytics is disabled" ); Ok(()) } #[tokio::test] async fn 
thread_start_respects_project_config_from_cwd() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; let workspace = TempDir::new()?; let project_config_dir = workspace.path().join(".codex"); std::fs::create_dir_all(&project_config_dir)?; std::fs::write( project_config_dir.join("config.toml"), r#" model_reasoning_effort = "high" "#, )?; set_project_trust_level(codex_home.path(), workspace.path(), TrustLevel::Trusted)?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let req_id = mcp .send_thread_start_request(ThreadStartParams { cwd: Some(workspace.path().to_string_lossy().into_owned()), ..Default::default() }) .await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(req_id)), ) .await??; let ThreadStartResponse { reasoning_effort, .. } = to_response::(resp)?; assert_eq!(reasoning_effort, Some(ReasoningEffort::High)); Ok(()) } #[tokio::test] async fn thread_start_accepts_flex_service_tier() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; let read_timeout = if cfg!(windows) { std::time::Duration::from_secs(15) } else { DEFAULT_READ_TIMEOUT }; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(read_timeout, mcp.initialize()).await??; let req_id = mcp .send_thread_start_request(ThreadStartParams { service_tier: Some(Some(ServiceTier::Flex)), ..Default::default() }) .await?; let resp: JSONRPCResponse = timeout( read_timeout, mcp.read_stream_until_response_message(RequestId::Integer(req_id)), ) .await??; let ThreadStartResponse { service_tier, .. 
} = to_response::(resp)?; assert_eq!(service_tier, Some(ServiceTier::Flex)); Ok(()) } #[tokio::test] async fn thread_start_accepts_metrics_service_name() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let req_id = mcp .send_thread_start_request(ThreadStartParams { service_name: Some("my_app_server_client".to_string()), ..Default::default() }) .await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(req_id)), ) .await??; let ThreadStartResponse { thread, .. } = to_response::(resp)?; assert!(!thread.id.is_empty(), "thread id should not be empty"); Ok(()) } #[tokio::test] async fn thread_start_ephemeral_remains_pathless() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let req_id = mcp .send_thread_start_request(ThreadStartParams { model: Some("gpt-5.1".to_string()), ephemeral: Some(true), ..Default::default() }) .await?; let resp: JSONRPCResponse = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(req_id)), ) .await??; let resp_result = resp.result.clone(); let ThreadStartResponse { thread, .. 
} = to_response::(resp)?; assert!( thread.ephemeral, "ephemeral threads should be marked explicitly" ); assert_eq!( thread.path, None, "ephemeral threads should not expose a path" ); let thread_json = resp_result .get("thread") .and_then(Value::as_object) .expect("thread/start result.thread must be an object"); assert_eq!( thread_json.get("ephemeral").and_then(Value::as_bool), Some(true), "ephemeral threads should serialize `ephemeral: true`" ); Ok(()) } #[tokio::test] async fn thread_start_fails_when_required_mcp_server_fails_to_initialize() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml_with_required_broken_mcp(codex_home.path(), &server.uri())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let req_id = mcp .send_thread_start_request(ThreadStartParams::default()) .await?; let err: JSONRPCError = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_error_message(RequestId::Integer(req_id)), ) .await??; assert!( err.error .message .contains("required MCP servers failed to initialize"), "unexpected error message: {}", err.error.message ); assert!( err.error.message.contains("required_broken"), "unexpected error message: {}", err.error.message ); Ok(()) } #[tokio::test] async fn thread_start_emits_mcp_server_status_updated_notifications() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml_with_optional_broken_mcp(codex_home.path(), &server.uri())?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let req_id = mcp .send_thread_start_request(ThreadStartParams::default()) .await?; let _: ThreadStartResponse = to_response( timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_response_message(RequestId::Integer(req_id)), ) .await??, )?; let starting 
= timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_matching_notification( "mcpServer/startupStatus/updated starting", |notification| { notification.method == "mcpServer/startupStatus/updated" && notification .params .as_ref() .and_then(|params| params.get("name")) .and_then(Value::as_str) == Some("optional_broken") && notification .params .as_ref() .and_then(|params| params.get("status")) .and_then(Value::as_str) == Some("starting") }, ), ) .await??; let starting: ServerNotification = starting.try_into()?; let ServerNotification::McpServerStatusUpdated(starting) = starting else { anyhow::bail!("unexpected notification variant"); }; assert_eq!( starting, McpServerStatusUpdatedNotification { name: "optional_broken".to_string(), status: McpServerStartupState::Starting, error: None, } ); let failed = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_matching_notification( "mcpServer/startupStatus/updated failed", |notification| { notification.method == "mcpServer/startupStatus/updated" && notification .params .as_ref() .and_then(|params| params.get("name")) .and_then(Value::as_str) == Some("optional_broken") && notification .params .as_ref() .and_then(|params| params.get("status")) .and_then(Value::as_str) == Some("failed") }, ), ) .await??; let failed: ServerNotification = failed.try_into()?; let ServerNotification::McpServerStatusUpdated(failed) = failed else { anyhow::bail!("unexpected notification variant"); }; assert_eq!(failed.name, "optional_broken"); assert_eq!(failed.status, McpServerStartupState::Failed); assert!( failed .error .as_deref() .is_some_and(|error| error.contains("MCP client for `optional_broken` failed to start")), "unexpected MCP startup error: {:?}", failed.error ); Ok(()) } #[tokio::test] async fn thread_start_surfaces_cloud_requirements_load_errors() -> Result<()> { let server = MockServer::start().await; Mock::given(method("GET")) .and(path("/backend-api/wham/config/requirements")) .respond_with( ResponseTemplate::new(401) 
.insert_header("content-type", "text/html") .set_body_string("nope"), ) .mount(&server) .await; Mock::given(method("POST")) .and(path("/oauth/token")) .respond_with(ResponseTemplate::new(401).set_body_json(json!({ "error": { "code": "refresh_token_invalidated" } }))) .mount(&server) .await; let codex_home = TempDir::new()?; let model_server = create_mock_responses_server_repeating_assistant("Done").await; let chatgpt_base_url = format!("{}/backend-api", server.uri()); create_config_toml_with_chatgpt_base_url( codex_home.path(), &model_server.uri(), &chatgpt_base_url, /*general_analytics_enabled*/ false, )?; write_chatgpt_auth( codex_home.path(), ChatGptAuthFixture::new("chatgpt-token") .refresh_token("stale-refresh-token") .plan_type("business") .chatgpt_user_id("user-123") .chatgpt_account_id("account-123") .account_id("account-123"), AuthCredentialsStoreMode::File, )?; let refresh_token_url = format!("{}/oauth/token", server.uri()); let mut mcp = McpProcess::new_with_env( codex_home.path(), &[ ("OPENAI_API_KEY", None), ( REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR, Some(refresh_token_url.as_str()), ), ], ) .await?; timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; let req_id = mcp .send_thread_start_request(ThreadStartParams::default()) .await?; let err: JSONRPCError = timeout( DEFAULT_READ_TIMEOUT, mcp.read_stream_until_error_message(RequestId::Integer(req_id)), ) .await??; assert!( err.error.message.contains("failed to load configuration"), "unexpected error message: {}", err.error.message ); assert_eq!( err.error.data, Some(json!({ "reason": "cloudRequirements", "errorCode": "Auth", "action": "relogin", "statusCode": 401, "detail": "Your access token could not be refreshed because your refresh token was revoked. Please log out and sign in again.", })) ); Ok(()) } // Helper to create a config.toml pointing at the mock model server. 
/// Write a minimal `config.toml` into `codex_home` wiring the mock model
/// provider at `server_uri`.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
    let config_path = codex_home.join("config.toml");
    let contents = format!(
        r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
    );
    std::fs::write(config_path, contents)
}

/// Like [`create_config_toml`], but also sets `chatgpt_base_url` and can
/// enable the `general_analytics` feature flag.
fn create_config_toml_with_chatgpt_base_url(
    codex_home: &Path,
    server_uri: &str,
    chatgpt_base_url: &str,
    general_analytics_enabled: bool,
) -> std::io::Result<()> {
    // Optional `[features]` table that switches on the analytics pipeline.
    let general_analytics_toml = if general_analytics_enabled {
        "\n[features]\ngeneral_analytics = true\n".to_string()
    } else {
        String::new()
    };
    let contents = format!(
        r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
chatgpt_base_url = "{chatgpt_base_url}"
model_provider = "mock_provider"
{general_analytics_toml}
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
    );
    std::fs::write(codex_home.join("config.toml"), contents)
}

/// Config with an MCP server marked `required = true` whose process exits
/// immediately, so thread/start must fail.
fn create_config_toml_with_required_broken_mcp(
    codex_home: &Path,
    server_uri: &str,
) -> std::io::Result<()> {
    let transport = broken_mcp_transport_toml();
    let contents = format!(
        r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
[mcp_servers.required_broken]
{required_broken_transport}
required = true
"#,
        required_broken_transport = transport
    );
    std::fs::write(codex_home.join("config.toml"), contents)
}

/// Config with an optional (non-required) MCP server whose process exits
/// immediately; thread/start should still succeed.
fn create_config_toml_with_optional_broken_mcp(
    codex_home: &Path,
    server_uri: &str,
) -> std::io::Result<()> {
    let transport = broken_mcp_transport_toml();
    let contents = format!(
        r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
[mcp_servers.optional_broken]
{optional_broken_transport}
"#,
        optional_broken_transport = transport
    );
    std::fs::write(codex_home.join("config.toml"), contents)
}

/// Per-platform TOML transport snippet for an MCP server command that exits
/// with a nonzero status right away.
fn broken_mcp_transport_toml() -> &'static str {
    if cfg!(target_os = "windows") {
        r#"command = "cmd"
args = ["/C", "exit 1"]"#
    } else {
        r#"command = "/bin/sh"
args = ["-c", "exit 1"]"#
    }
}