Files
codex/codex-rs/app-server/tests/suite/v2/thread_queue.rs
2026-05-11 01:03:23 -07:00

462 lines
14 KiB
Rust

use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadQueueAddResponse;
use codex_app_server_protocol::ThreadQueueDeleteResponse;
use codex_app_server_protocol::ThreadQueueListResponse;
use codex_app_server_protocol::ThreadQueueReorderResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
#[cfg(not(windows))]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_queue_supports_add_list_reorder_and_delete() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(vec![
create_final_assistant_message_sse_response("materialized")?,
create_shell_command_sse_response(
sleep_command(/*seconds*/ 10),
/*workdir*/ None,
Some(10_000),
"keep-open",
)?,
])
.await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread = start_materialized_thread(&mut mcp).await?;
let blocking_turn = start_blocking_turn(&mut mcp, thread.id.as_str()).await?;
let first = add_queued_turn(&mut mcp, thread.id.as_str(), "first").await?;
let second = add_queued_turn(&mut mcp, thread.id.as_str(), "second").await?;
let list_id = mcp
.send_raw_request(
"thread/queue/list",
Some(json!({
"threadId": thread.id,
})),
)
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let list: ThreadQueueListResponse = to_response(list_resp)?;
assert_eq!(
list.queued_turns
.iter()
.map(|turn| turn.id.as_str())
.collect::<Vec<_>>(),
vec![
first.queued_turn.id.as_str(),
second.queued_turn.id.as_str()
]
);
let reorder_id = mcp
.send_raw_request(
"thread/queue/reorder",
Some(json!({
"threadId": thread.id,
"queuedTurnIds": [
second.queued_turn.id,
first.queued_turn.id,
],
})),
)
.await?;
let reorder_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(reorder_id)),
)
.await??;
let reordered: ThreadQueueReorderResponse = to_response(reorder_resp)?;
assert_eq!(
reordered
.queued_turns
.iter()
.map(|turn| turn.id.as_str())
.collect::<Vec<_>>(),
vec![
second.queued_turn.id.as_str(),
first.queued_turn.id.as_str()
]
);
let delete_id = mcp
.send_raw_request(
"thread/queue/delete",
Some(json!({
"threadId": thread.id,
"queuedTurnId": second.queued_turn.id,
})),
)
.await?;
let delete_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(delete_id)),
)
.await??;
let deleted: ThreadQueueDeleteResponse = to_response(delete_resp)?;
assert!(deleted.deleted);
mcp.interrupt_turn_and_wait_for_aborted(thread.id, blocking_turn.id, DEFAULT_READ_TIMEOUT)
.await?;
Ok(())
}
#[tokio::test]
async fn thread_queue_drains_after_the_next_terminal_turn() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(vec![
create_final_assistant_message_sse_response("materialized")?,
create_shell_command_sse_response(
sleep_command(/*seconds*/ 1),
/*workdir*/ None,
Some(10_000),
"call1",
)?,
create_final_assistant_message_sse_response("manual")?,
create_final_assistant_message_sse_response("queued")?,
])
.await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread = start_materialized_thread(&mut mcp).await?;
start_blocking_turn(&mut mcp, thread.id.as_str()).await?;
add_queued_turn(&mut mcp, thread.id.as_str(), "queued").await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let list_id = mcp
.send_raw_request(
"thread/queue/list",
Some(json!({
"threadId": thread.id,
})),
)
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let list: ThreadQueueListResponse = to_response(list_resp)?;
assert!(list.queued_turns.is_empty());
Ok(())
}
#[tokio::test]
async fn thread_queue_add_drains_immediately_when_the_thread_is_idle() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(vec![
create_final_assistant_message_sse_response("materialized")?,
create_final_assistant_message_sse_response("queued")?,
])
.await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread = start_materialized_thread(&mut mcp).await?;
add_queued_turn(&mut mcp, thread.id.as_str(), "queued").await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let list_id = mcp
.send_raw_request(
"thread/queue/list",
Some(json!({
"threadId": thread.id,
})),
)
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let list: ThreadQueueListResponse = to_response(list_resp)?;
assert!(list.queued_turns.is_empty());
Ok(())
}
#[tokio::test]
async fn thread_queue_drains_after_restart_and_resume() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(vec![
create_final_assistant_message_sse_response("materialized")?,
create_shell_command_sse_response(
sleep_command(/*seconds*/ 10),
/*workdir*/ None,
Some(10_000),
"keep-open",
)?,
create_final_assistant_message_sse_response("queued")?,
])
.await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut first_mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, first_mcp.initialize()).await??;
let thread = start_materialized_thread(&mut first_mcp).await?;
start_blocking_turn(&mut first_mcp, thread.id.as_str()).await?;
add_queued_turn(&mut first_mcp, thread.id.as_str(), "queued").await?;
drop(first_mcp);
let mut second_mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, second_mcp.initialize()).await??;
let resume_id = second_mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
second_mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed, ..
} = to_response(resume_resp)?;
assert_eq!(resumed.id, thread.id);
timeout(
DEFAULT_READ_TIMEOUT,
second_mcp.read_stream_until_notification_message("thread/queue/changed"),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
second_mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
second_mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let list_id = second_mcp
.send_raw_request(
"thread/queue/list",
Some(json!({
"threadId": thread.id,
})),
)
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
second_mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let list: ThreadQueueListResponse = to_response(list_resp)?;
assert!(list.queued_turns.is_empty());
Ok(())
}
async fn start_materialized_thread(
mcp: &mut McpProcess,
) -> Result<codex_app_server_protocol::Thread> {
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let turn_id = mcp
.send_turn_start_request(turn_start_params(thread.id.as_str(), "materialize"))
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(thread)
}
async fn add_queued_turn(
mcp: &mut McpProcess,
thread_id: &str,
text: &str,
) -> Result<ThreadQueueAddResponse> {
let add_id = mcp
.send_raw_request(
"thread/queue/add",
Some(json!({
"threadId": thread_id,
"turnStartParams": turn_start_params(thread_id, text),
})),
)
.await?;
let add_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(add_id)),
)
.await??;
to_response(add_resp)
}
async fn start_blocking_turn(
mcp: &mut McpProcess,
thread_id: &str,
) -> Result<codex_app_server_protocol::Turn> {
let turn_id = mcp
.send_turn_start_request(turn_start_params(thread_id, "manual"))
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
let TurnStartResponse { turn } = to_response(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
wait_for_command_execution_started(mcp).await?;
Ok(turn)
}
async fn wait_for_command_execution_started(mcp: &mut McpProcess) -> Result<()> {
loop {
let notif = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("item/started"),
)
.await??;
let started: ItemStartedNotification = serde_json::from_value(
notif
.params
.ok_or_else(|| anyhow::anyhow!("missing item/started params"))?,
)?;
if matches!(started.item, ThreadItem::CommandExecution { .. }) {
return Ok(());
}
}
}
fn sleep_command(seconds: u64) -> Vec<String> {
#[cfg(target_os = "windows")]
{
vec![
"powershell".to_string(),
"-Command".to_string(),
format!("Start-Sleep -Seconds {seconds}"),
]
}
#[cfg(not(target_os = "windows"))]
{
vec!["sleep".to_string(), seconds.to_string()]
}
}
fn turn_start_params(thread_id: &str, text: &str) -> TurnStartParams {
TurnStartParams {
thread_id: thread_id.to_string(),
input: vec![UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}],
..Default::default()
}
}
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "gpt-5.3-codex"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[features]
personality = true
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}