Compare commits

...

5 Commits

Author SHA1 Message Date
Max Johnson
ae9cf6339b app-server: avoid expect in thread resume test 2026-02-13 09:49:13 -08:00
Max Johnson
69e29f1440 Merge branch 'maxj/fixauth' into merge 2026-02-13 09:42:41 -08:00
Max Johnson
ee0da1e20b app-server: simplify resume override mismatch check 2026-02-13 09:41:41 -08:00
Max Johnson
614be88ef5 app-server: rejoin running threads on resume 2026-02-13 09:25:01 -08:00
Max Johnson
aa23c966d3 fix crash in auth 2026-02-12 22:34:35 -08:00
4 changed files with 855 additions and 51 deletions

View File

@@ -2689,6 +2689,176 @@ impl CodexMessageProcessor {
}
async fn thread_resume(&mut self, request_id: ConnectionRequestId, params: ThreadResumeParams) {
if let Ok(existing_thread_id) = ThreadId::from_string(&params.thread_id)
&& let Ok(existing_thread) = self.thread_manager.get_thread(existing_thread_id).await
{
if params.history.is_some() {
self.send_invalid_request_error(
request_id,
format!(
"cannot resume thread {existing_thread_id} with history while it is already running"
),
)
.await;
return;
}
let rollout_path = if let Some(path) = existing_thread.rollout_path() {
if path.exists() {
path
} else {
match find_thread_path_by_id_str(
&self.config.codex_home,
&existing_thread_id.to_string(),
)
.await
{
Ok(Some(path)) => path,
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("no rollout found for thread id {existing_thread_id}"),
)
.await;
return;
}
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {existing_thread_id}: {err}"),
)
.await;
return;
}
}
}
} else {
match find_thread_path_by_id_str(
&self.config.codex_home,
&existing_thread_id.to_string(),
)
.await
{
Ok(Some(path)) => path,
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("no rollout found for thread id {existing_thread_id}"),
)
.await;
return;
}
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {existing_thread_id}: {err}"),
)
.await;
return;
}
}
};
if let Some(requested_path) = params.path.as_ref()
&& requested_path != &rollout_path
{
self.send_invalid_request_error(
request_id,
format!(
"cannot resume running thread {existing_thread_id} with mismatched path: requested `{}`, active `{}`",
requested_path.display(),
rollout_path.display()
),
)
.await;
return;
}
// Keep any in-flight stream attached instead of spawning a replacement thread object.
if let Err(err) = self
.ensure_conversation_listener(
existing_thread_id,
request_id.connection_id,
false,
ApiVersion::V2,
)
.await
{
tracing::warn!(
"failed to attach listener for thread {}: {}",
existing_thread_id,
err.message
);
}
let config_snapshot = existing_thread.config_snapshot().await;
let mismatch_details = collect_resume_override_mismatches(&params, &config_snapshot);
if !mismatch_details.is_empty() {
tracing::warn!(
"thread/resume overrides ignored for running thread {}: {}",
existing_thread_id,
mismatch_details.join("; ")
);
}
let mut thread = match read_summary_from_rollout(
rollout_path.as_path(),
config_snapshot.model_provider_id.as_str(),
)
.await
{
Ok(summary) => summary_to_thread(summary),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {existing_thread_id}: {err}",
rollout_path.display()
),
)
.await;
return;
}
};
match read_rollout_items_from_rollout(rollout_path.as_path()).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {existing_thread_id}: {err}",
rollout_path.display()
),
)
.await;
return;
}
}
let ThreadConfigSnapshot {
model,
model_provider_id,
approval_policy,
sandbox_policy,
cwd,
reasoning_effort,
..
} = config_snapshot;
let response = ThreadResumeResponse {
thread,
model,
model_provider: model_provider_id,
cwd,
approval_policy: approval_policy.into(),
sandbox: sandbox_policy.into(),
reasoning_effort,
};
self.outgoing.send_response(request_id, response).await;
return;
}
let ThreadResumeParams {
thread_id,
history,
@@ -5807,6 +5977,101 @@ impl CodexMessageProcessor {
}
}
fn collect_resume_override_mismatches(
request: &ThreadResumeParams,
config_snapshot: &ThreadConfigSnapshot,
) -> Vec<String> {
let mut mismatch_details = Vec::new();
if let Some(requested_model) = request.model.as_deref()
&& requested_model != config_snapshot.model
{
mismatch_details.push(format!(
"model requested={requested_model} active={}",
config_snapshot.model
));
}
if let Some(requested_provider) = request.model_provider.as_deref()
&& requested_provider != config_snapshot.model_provider_id
{
mismatch_details.push(format!(
"model_provider requested={requested_provider} active={}",
config_snapshot.model_provider_id
));
}
if let Some(requested_cwd) = request.cwd.as_deref() {
let requested_cwd_path = std::path::PathBuf::from(requested_cwd);
if requested_cwd_path != config_snapshot.cwd {
mismatch_details.push(format!(
"cwd requested={} active={}",
requested_cwd_path.display(),
config_snapshot.cwd.display()
));
}
}
if let Some(requested_approval) = request.approval_policy.as_ref() {
let active_approval: AskForApproval = config_snapshot.approval_policy.into();
if requested_approval != &active_approval {
mismatch_details.push(format!(
"approval_policy requested={requested_approval:?} active={active_approval:?}"
));
}
}
if let Some(requested_sandbox) = request.sandbox.as_ref() {
let sandbox_matches = matches!(
(requested_sandbox, &config_snapshot.sandbox_policy),
(
SandboxMode::ReadOnly,
codex_protocol::protocol::SandboxPolicy::ReadOnly { .. }
) | (
SandboxMode::WorkspaceWrite,
codex_protocol::protocol::SandboxPolicy::WorkspaceWrite { .. }
) | (
SandboxMode::DangerFullAccess,
codex_protocol::protocol::SandboxPolicy::DangerFullAccess
) | (
SandboxMode::DangerFullAccess,
codex_protocol::protocol::SandboxPolicy::ExternalSandbox { .. }
)
);
if !sandbox_matches {
mismatch_details.push(format!(
"sandbox requested={requested_sandbox:?} active={:?}",
config_snapshot.sandbox_policy
));
}
}
if let Some(requested_personality) = request.personality.as_ref()
&& config_snapshot.personality.as_ref() != Some(requested_personality)
{
mismatch_details.push(format!(
"personality requested={requested_personality:?} active={:?}",
config_snapshot.personality
));
}
if request.config.is_some() {
mismatch_details
.push("config overrides were provided and ignored while running".to_string());
}
if request.base_instructions.is_some() {
mismatch_details
.push("baseInstructions override was provided and ignored while running".to_string());
}
if request.developer_instructions.is_some() {
mismatch_details.push(
"developerInstructions override was provided and ignored while running".to_string(),
);
}
if request.persist_extended_history {
mismatch_details.push(
"persistExtendedHistory override was provided and ignored while running".to_string(),
);
}
mismatch_details
}
fn skills_to_info(
skills: &[codex_core::skills::SkillMetadata],
disabled_paths: &std::collections::HashSet<PathBuf>,

View File

@@ -209,18 +209,412 @@ async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -
Ok(())
}
#[tokio::test]
async fn thread_resume_keeps_in_flight_turn_streaming() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut primary = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
let start_id = primary
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.1-codex-max".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
primary.clear_message_buffer();
let mut secondary = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??;
let turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "respond with docs".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/started"),
)
.await??;
let resume_id = secondary
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id,
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
secondary.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
#[tokio::test]
async fn thread_resume_rejects_history_when_thread_is_running() -> Result<()> {
let server = responses::start_mock_server().await;
let first_body = responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-1"),
]);
let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]);
let _first_response_mock = responses::mount_sse_once(&server, first_body).await;
let _second_response_mock = responses::mount_sse_once(&server, second_body).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut primary = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
let start_id = primary
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.1-codex-max".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
primary.clear_message_buffer();
let running_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "keep running".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/started"),
)
.await??;
let resume_id = primary
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id,
history: Some(vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "history override".to_string(),
}],
end_turn: None,
phase: None,
}]),
..Default::default()
})
.await?;
let resume_err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_error_message(RequestId::Integer(resume_id)),
)
.await??;
assert!(
resume_err.error.message.contains("cannot resume thread")
&& resume_err.error.message.contains("with history")
&& resume_err.error.message.contains("running"),
"unexpected resume error: {}",
resume_err.error.message
);
Ok(())
}
#[tokio::test]
async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Result<()> {
let server = responses::start_mock_server().await;
let first_body = responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-1"),
]);
let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]);
let _first_response_mock = responses::mount_sse_once(&server, first_body).await;
let _second_response_mock = responses::mount_sse_once(&server, second_body).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut primary = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
let start_id = primary
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.1-codex-max".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
primary.clear_message_buffer();
let running_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "keep running".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/started"),
)
.await??;
let resume_id = primary
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id,
path: Some(PathBuf::from("/tmp/does-not-match-running-rollout.jsonl")),
..Default::default()
})
.await?;
let resume_err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_error_message(RequestId::Integer(resume_id)),
)
.await??;
assert!(
resume_err.error.message.contains("mismatched path"),
"unexpected resume error: {}",
resume_err.error.message
);
Ok(())
}
#[tokio::test]
async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> Result<()> {
let server = responses::start_mock_server().await;
let first_body = responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-1"),
]);
let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]);
let _response_mock =
responses::mount_sse_sequence(&server, vec![first_body, second_body]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut primary = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
let start_id = primary
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.1-codex-max".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let seed_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "seed history".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
primary.clear_message_buffer();
let running_turn_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "keep running".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(running_turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/started"),
)
.await??;
let resume_id = primary
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
model: Some("not-the-running-model".to_string()),
cwd: Some("/tmp".to_string()),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { model, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(model, "gpt-5.1-codex-max");
timeout(
DEFAULT_READ_TIMEOUT,
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
#[tokio::test]
async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
let rollout = setup_rollout_fixture(codex_home.path(), &server.uri())?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let RestartedThreadFixture {
mut mcp,
thread_id,
rollout_file_path,
} = start_materialized_thread_and_restart(codex_home.path(), "materialize").await?;
let expected_updated_at_rfc3339 = "2025-01-07T00:00:00Z";
set_rollout_mtime(rollout_file_path.as_path(), expected_updated_at_rfc3339)?;
let before_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
let expected_updated_at = chrono::DateTime::parse_from_rfc3339(expected_updated_at_rfc3339)?
.with_timezone(&Utc)
.timestamp();
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: rollout.conversation_id.clone(),
thread_id,
model: Some("mock-model".to_string()),
..Default::default()
})
@@ -230,16 +624,19 @@ async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Re
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
let ThreadResumeResponse {
thread: resumed_thread,
..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(thread.updated_at, rollout.expected_updated_at);
assert_eq!(resumed_thread.updated_at, expected_updated_at);
let after_resume_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?;
assert_eq!(after_resume_modified, rollout.before_modified);
let after_resume_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
assert_eq!(after_resume_modified, before_modified);
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: rollout.conversation_id,
thread_id: resumed_thread.id,
input: vec![UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
@@ -258,8 +655,8 @@ async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Re
)
.await??;
let after_turn_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?;
assert!(after_turn_modified > rollout.before_modified);
let after_turn_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
assert!(after_turn_modified > before_modified);
Ok(())
}
@@ -374,22 +771,9 @@ async fn thread_resume_supports_history_and_overrides() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Start a thread.
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.1-codex-max".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let RestartedThreadFixture {
mut mcp, thread_id, ..
} = start_materialized_thread_and_restart(codex_home.path(), "seed history").await?;
let history_text = "Hello from history";
let history = vec![ResponseItem::Message {
@@ -405,7 +789,7 @@ async fn thread_resume_supports_history_and_overrides() -> Result<()> {
// Resume with explicit history and override the model.
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id,
thread_id,
history: Some(history),
model: Some("mock-model".to_string()),
model_provider: Some("mock_provider".to_string()),
@@ -429,6 +813,70 @@ async fn thread_resume_supports_history_and_overrides() -> Result<()> {
Ok(())
}
struct RestartedThreadFixture {
mcp: McpProcess,
thread_id: String,
rollout_file_path: PathBuf,
}
async fn start_materialized_thread_and_restart(
codex_home: &Path,
seed_text: &str,
) -> Result<RestartedThreadFixture> {
let mut first_mcp = McpProcess::new(codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, first_mcp.initialize()).await??;
let start_id = first_mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.1-codex-max".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
first_mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let materialize_turn_id = first_mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: seed_text.to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
first_mcp.read_stream_until_response_message(RequestId::Integer(materialize_turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
first_mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let thread_id = thread.id;
let rollout_file_path = thread
.path
.ok_or_else(|| anyhow::anyhow!("thread path missing from thread/start response"))?;
drop(first_mcp);
let mut second_mcp = McpProcess::new(codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, second_mcp.initialize()).await??;
Ok(RestartedThreadFixture {
mcp: second_mcp,
thread_id,
rollout_file_path,
})
}
#[tokio::test]
async fn thread_resume_accepts_personality_override() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -449,10 +897,10 @@ async fn thread_resume_accepts_personality_override() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let mut primary = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
let start_id = mcp
let start_id = primary
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5.2-codex".to_string()),
..Default::default()
@@ -460,12 +908,12 @@ async fn thread_resume_accepts_personality_override() -> Result<()> {
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let materialize_id = mcp
let materialize_id = primary
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
@@ -477,16 +925,19 @@ async fn thread_resume_accepts_personality_override() -> Result<()> {
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(materialize_id)),
primary.read_stream_until_response_message(RequestId::Integer(materialize_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
primary.read_stream_until_notification_message("turn/completed"),
)
.await??;
let resume_id = mcp
let mut secondary = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??;
let resume_id = secondary
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id,
model: Some("gpt-5.2-codex".to_string()),
@@ -496,12 +947,12 @@ async fn thread_resume_accepts_personality_override() -> Result<()> {
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
secondary.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let resume: ThreadResumeResponse = to_response::<ThreadResumeResponse>(resume_resp)?;
let turn_id = mcp
let turn_id = secondary
.send_turn_start_request(TurnStartParams {
thread_id: resume.thread.id,
input: vec![UserInput::Text {
@@ -513,13 +964,13 @@ async fn thread_resume_accepts_personality_override() -> Result<()> {
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
secondary.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
secondary.read_stream_until_notification_message("turn/completed"),
)
.await??;

View File

@@ -7,6 +7,7 @@ use codex_protocol::protocol::McpAuthStatus;
use reqwest::Client;
use reqwest::StatusCode;
use reqwest::Url;
use reqwest::header::AUTHORIZATION;
use reqwest::header::HeaderMap;
use serde::Deserialize;
use tracing::debug;
@@ -33,12 +34,15 @@ pub async fn determine_streamable_http_auth_status(
return Ok(McpAuthStatus::BearerToken);
}
let default_headers = build_default_headers(http_headers, env_http_headers)?;
if default_headers.contains_key(AUTHORIZATION) {
return Ok(McpAuthStatus::BearerToken);
}
if has_oauth_tokens(server_name, url, store_mode)? {
return Ok(McpAuthStatus::OAuth);
}
let default_headers = build_default_headers(http_headers, env_http_headers)?;
match supports_oauth_login_with_headers(url, &default_headers).await {
Ok(true) => Ok(McpAuthStatus::NotLoggedIn),
Ok(false) => Ok(McpAuthStatus::Unsupported),
@@ -139,3 +143,84 @@ fn discovery_paths(base_path: &str) -> Vec<String> {
candidates
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use serial_test::serial;
use std::collections::HashMap;
use std::ffi::OsString;
struct EnvVarGuard {
key: String,
original: Option<OsString>,
}
impl EnvVarGuard {
fn set(key: &str, value: &str) -> Self {
let original = std::env::var_os(key);
unsafe {
std::env::set_var(key, value);
}
Self {
key: key.to_string(),
original,
}
}
}
impl Drop for EnvVarGuard {
fn drop(&mut self) {
if let Some(value) = &self.original {
unsafe {
std::env::set_var(&self.key, value);
}
} else {
unsafe {
std::env::remove_var(&self.key);
}
}
}
}
#[tokio::test]
async fn determine_auth_status_uses_bearer_token_when_authorization_header_present() {
let status = determine_streamable_http_auth_status(
"server",
"not-a-url",
None,
Some(HashMap::from([(
"Authorization".to_string(),
"Bearer token".to_string(),
)])),
None,
OAuthCredentialsStoreMode::Keyring,
)
.await
.expect("status should compute");
assert_eq!(status, McpAuthStatus::BearerToken);
}
#[tokio::test]
#[serial(auth_status_env)]
async fn determine_auth_status_uses_bearer_token_when_env_authorization_header_present() {
let _guard = EnvVarGuard::set("CODEX_RMCP_CLIENT_AUTH_STATUS_TEST_TOKEN", "Bearer token");
let status = determine_streamable_http_auth_status(
"server",
"not-a-url",
None,
None,
Some(HashMap::from([(
"Authorization".to_string(),
"CODEX_RMCP_CLIENT_AUTH_STATUS_TEST_TOKEN".to_string(),
)])),
OAuthCredentialsStoreMode::Keyring,
)
.await
.expect("status should compute");
assert_eq!(status, McpAuthStatus::BearerToken);
}
}

View File

@@ -11,6 +11,7 @@ use anyhow::anyhow;
use futures::FutureExt;
use futures::future::BoxFuture;
use oauth2::TokenResponse;
use reqwest::header::AUTHORIZATION;
use reqwest::header::HeaderMap;
use rmcp::model::CallToolRequestParams;
use rmcp::model::CallToolResult;
@@ -244,16 +245,18 @@ impl RmcpClient {
) -> Result<Self> {
let default_headers = build_default_headers(http_headers, env_http_headers)?;
let initial_oauth_tokens = match bearer_token {
Some(_) => None,
None => match load_oauth_tokens(server_name, url, store_mode) {
Ok(tokens) => tokens,
Err(err) => {
warn!("failed to read tokens for server `{server_name}`: {err}");
None
let initial_oauth_tokens =
if bearer_token.is_none() && !default_headers.contains_key(AUTHORIZATION) {
match load_oauth_tokens(server_name, url, store_mode) {
Ok(tokens) => tokens,
Err(err) => {
warn!("failed to read tokens for server `{server_name}`: {err}");
None
}
}
},
};
} else {
None
};
let transport = if let Some(initial_tokens) = initial_oauth_tokens.clone() {
match create_oauth_transport_and_runtime(