Files
codex/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs
starr-openai 62847e7554 Make thread unsubscribe test deterministic (#18000)
## Summary
- replace the unsubscribe-during-turn test's sleep/polling flow with a
gated streaming SSE response
- add request-count notification support to the streaming SSE test
server so the test can wait for the in-flight Responses request
deterministically

## Scope
- codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs
- codex-rs/core/tests/common/streaming_sse.rs

## Validation
- Not run locally; this is a narrow extraction from the prior CI-green
branch.

---------

Co-authored-by: Codex <noreply@openai.com>
2026-04-16 19:34:04 +00:00

432 lines
15 KiB
Rust

use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::DynamicToolCallOutputContentItem;
use codex_app_server_protocol::DynamicToolCallParams;
use codex_app_server_protocol::DynamicToolCallResponse;
use codex_app_server_protocol::DynamicToolSpec;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadUnsubscribeParams;
use codex_app_server_protocol::ThreadUnsubscribeResponse;
use codex_app_server_protocol::ThreadUnsubscribeStatus;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use core_test_support::responses;
use core_test_support::streaming_sse::StreamingSseChunk;
use core_test_support::streaming_sse::start_streaming_sse_server;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_thread(&mut mcp).await?;
let unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams {
thread_id: thread_id.clone(),
})
.await?;
let unsubscribe_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unsubscribe_id)),
)
.await??;
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
);
let list_id = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams::default())
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let ThreadLoadedListResponse { data, next_cursor } =
to_response::<ThreadLoadedListResponse>(list_resp)?;
assert_eq!(data, vec![thread_id]);
assert_eq!(next_cursor, None);
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
let call_id = "deterministic-wait-call";
let tool_name = "deterministic_wait";
let tool_args = json!({});
let tool_call_arguments = serde_json::to_string(&tool_args)?;
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let working_directory = tmp.path().join("workdir");
std::fs::create_dir(&working_directory)?;
let (server, mut completions) = start_streaming_sse_server(vec![
vec![StreamingSseChunk {
gate: None,
body: responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_function_call(call_id, tool_name, &tool_call_arguments),
responses::ev_completed("resp-1"),
]),
}],
vec![StreamingSseChunk {
gate: None,
body: responses::sse(vec![
responses::ev_response_created("resp-2"),
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-2"),
]),
}],
])
.await;
let first_response_completed = completions.remove(0);
let final_response_completed = completions.remove(0);
create_config_toml(&codex_home, server.uri())?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
dynamic_tools: Some(vec![DynamicToolSpec {
name: tool_name.to_string(),
description: "Deterministic wait tool".to_string(),
input_schema: json!({
"type": "object",
"properties": {},
"additionalProperties": false,
}),
defer_loading: false,
}]),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let thread_id = thread.id;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.clone(),
input: vec![V2UserInput::Text {
text: "run deterministic tool".to_string(),
text_elements: Vec::new(),
}],
cwd: Some(working_directory),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let _: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
server.wait_for_request_count(/*count*/ 1),
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, first_response_completed).await??;
let started = timeout(
DEFAULT_READ_TIMEOUT,
wait_for_dynamic_tool_started(&mut mcp, call_id),
)
.await??;
assert_eq!(started.thread_id, thread_id);
let request = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_request_message(),
)
.await??;
let (request_id, params) = match request {
ServerRequest::DynamicToolCall { request_id, params } => (request_id, params),
other => panic!("expected DynamicToolCall request, got {other:?}"),
};
assert_eq!(
params,
DynamicToolCallParams {
thread_id: thread_id.clone(),
turn_id: started.turn_id,
call_id: call_id.to_string(),
tool: tool_name.to_string(),
arguments: tool_args,
}
);
let unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams {
thread_id: thread_id.clone(),
})
.await?;
let unsubscribe_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unsubscribe_id)),
)
.await??;
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
let closed_while_tool_call_blocked = timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
);
let closed_while_tool_call_blocked = closed_while_tool_call_blocked.await;
assert!(closed_while_tool_call_blocked.is_err());
let response = DynamicToolCallResponse {
content_items: vec![DynamicToolCallOutputContentItem::InputText {
text: "dynamic-ok".to_string(),
}],
success: true,
};
mcp.send_response(request_id, serde_json::to_value(response)?)
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
server.wait_for_request_count(/*count*/ 2),
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, final_response_completed).await??;
server.shutdown().await;
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_preserves_cached_status_before_idle_unload() -> Result<()> {
let server = responses::start_mock_server().await;
let _response_mock = responses::mount_sse_once(
&server,
responses::sse_failed("resp-1", "server_error", "simulated failure"),
)
.await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_thread(&mut mcp).await?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread_id.clone(),
input: vec![V2UserInput::Text {
text: "fail this turn".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let _: TurnStartResponse = to_response::<TurnStartResponse>(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("error"),
)
.await??;
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: thread_id.clone(),
include_turns: false,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.status, ThreadStatus::SystemError);
let unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams {
thread_id: thread_id.clone(),
})
.await?;
let unsubscribe_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unsubscribe_id)),
)
.await??;
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
);
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id,
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let resume: ThreadResumeResponse = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resume.thread.status, ThreadStatus::SystemError);
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_reports_not_subscribed_before_idle_unload() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_thread(&mut mcp).await?;
let first_unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams {
thread_id: thread_id.clone(),
})
.await?;
let first_unsubscribe_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(first_unsubscribe_id)),
)
.await??;
let first_unsubscribe = to_response::<ThreadUnsubscribeResponse>(first_unsubscribe_resp)?;
assert_eq!(
first_unsubscribe.status,
ThreadUnsubscribeStatus::Unsubscribed
);
let second_unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams { thread_id })
.await?;
let second_unsubscribe_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(second_unsubscribe_id)),
)
.await??;
let second_unsubscribe = to_response::<ThreadUnsubscribeResponse>(second_unsubscribe_resp)?;
assert_eq!(
second_unsubscribe.status,
ThreadUnsubscribeStatus::NotSubscribed
);
Ok(())
}
async fn wait_for_dynamic_tool_started(
mcp: &mut McpProcess,
call_id: &str,
) -> Result<ItemStartedNotification> {
loop {
let notification = mcp
.read_stream_until_notification_message("item/started")
.await?;
let Some(params) = notification.params else {
continue;
};
let started: ItemStartedNotification = serde_json::from_value(params)?;
if matches!(&started.item, ThreadItem::DynamicToolCall { id, .. } if id == call_id) {
return Ok(started);
}
}
}
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "danger-full-access"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}
async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
let req_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
Ok(thread.id)
}