mirror of
https://github.com/openai/codex.git
synced 2026-05-18 10:12:59 +00:00
462 lines
14 KiB
Rust
462 lines
14 KiB
Rust
use anyhow::Result;
|
|
use app_test_support::McpProcess;
|
|
use app_test_support::create_final_assistant_message_sse_response;
|
|
use app_test_support::create_mock_responses_server_sequence_unchecked;
|
|
use app_test_support::create_shell_command_sse_response;
|
|
use app_test_support::to_response;
|
|
use codex_app_server_protocol::ItemStartedNotification;
|
|
use codex_app_server_protocol::JSONRPCResponse;
|
|
use codex_app_server_protocol::RequestId;
|
|
use codex_app_server_protocol::ThreadItem;
|
|
use codex_app_server_protocol::ThreadQueueAddResponse;
|
|
use codex_app_server_protocol::ThreadQueueDeleteResponse;
|
|
use codex_app_server_protocol::ThreadQueueListResponse;
|
|
use codex_app_server_protocol::ThreadQueueReorderResponse;
|
|
use codex_app_server_protocol::ThreadResumeParams;
|
|
use codex_app_server_protocol::ThreadResumeResponse;
|
|
use codex_app_server_protocol::ThreadStartParams;
|
|
use codex_app_server_protocol::ThreadStartResponse;
|
|
use codex_app_server_protocol::TurnStartParams;
|
|
use codex_app_server_protocol::TurnStartResponse;
|
|
use codex_app_server_protocol::UserInput;
|
|
use pretty_assertions::assert_eq;
|
|
use serde_json::json;
|
|
use tempfile::TempDir;
|
|
use tokio::time::timeout;
|
|
|
|
#[cfg(windows)]
|
|
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
|
|
#[cfg(not(windows))]
|
|
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
|
|
|
#[tokio::test]
|
|
async fn thread_queue_supports_add_list_reorder_and_delete() -> Result<()> {
|
|
let server = create_mock_responses_server_sequence_unchecked(vec![
|
|
create_final_assistant_message_sse_response("materialized")?,
|
|
create_shell_command_sse_response(
|
|
sleep_command(/*seconds*/ 10),
|
|
/*workdir*/ None,
|
|
Some(10_000),
|
|
"keep-open",
|
|
)?,
|
|
])
|
|
.await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
let thread = start_materialized_thread(&mut mcp).await?;
|
|
let blocking_turn = start_blocking_turn(&mut mcp, thread.id.as_str()).await?;
|
|
|
|
let first = add_queued_turn(&mut mcp, thread.id.as_str(), "first").await?;
|
|
let second = add_queued_turn(&mut mcp, thread.id.as_str(), "second").await?;
|
|
|
|
let list_id = mcp
|
|
.send_raw_request(
|
|
"thread/queue/list",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
})),
|
|
)
|
|
.await?;
|
|
let list_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
|
|
)
|
|
.await??;
|
|
let list: ThreadQueueListResponse = to_response(list_resp)?;
|
|
assert_eq!(
|
|
list.queued_turns
|
|
.iter()
|
|
.map(|turn| turn.id.as_str())
|
|
.collect::<Vec<_>>(),
|
|
vec![
|
|
first.queued_turn.id.as_str(),
|
|
second.queued_turn.id.as_str()
|
|
]
|
|
);
|
|
|
|
let reorder_id = mcp
|
|
.send_raw_request(
|
|
"thread/queue/reorder",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
"queuedTurnIds": [
|
|
second.queued_turn.id,
|
|
first.queued_turn.id,
|
|
],
|
|
})),
|
|
)
|
|
.await?;
|
|
let reorder_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(reorder_id)),
|
|
)
|
|
.await??;
|
|
let reordered: ThreadQueueReorderResponse = to_response(reorder_resp)?;
|
|
assert_eq!(
|
|
reordered
|
|
.queued_turns
|
|
.iter()
|
|
.map(|turn| turn.id.as_str())
|
|
.collect::<Vec<_>>(),
|
|
vec![
|
|
second.queued_turn.id.as_str(),
|
|
first.queued_turn.id.as_str()
|
|
]
|
|
);
|
|
|
|
let delete_id = mcp
|
|
.send_raw_request(
|
|
"thread/queue/delete",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
"queuedTurnId": second.queued_turn.id,
|
|
})),
|
|
)
|
|
.await?;
|
|
let delete_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(delete_id)),
|
|
)
|
|
.await??;
|
|
let deleted: ThreadQueueDeleteResponse = to_response(delete_resp)?;
|
|
assert!(deleted.deleted);
|
|
|
|
mcp.interrupt_turn_and_wait_for_aborted(thread.id, blocking_turn.id, DEFAULT_READ_TIMEOUT)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_queue_drains_after_the_next_terminal_turn() -> Result<()> {
|
|
let server = create_mock_responses_server_sequence_unchecked(vec![
|
|
create_final_assistant_message_sse_response("materialized")?,
|
|
create_shell_command_sse_response(
|
|
sleep_command(/*seconds*/ 1),
|
|
/*workdir*/ None,
|
|
Some(10_000),
|
|
"call1",
|
|
)?,
|
|
create_final_assistant_message_sse_response("manual")?,
|
|
create_final_assistant_message_sse_response("queued")?,
|
|
])
|
|
.await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
let thread = start_materialized_thread(&mut mcp).await?;
|
|
start_blocking_turn(&mut mcp, thread.id.as_str()).await?;
|
|
add_queued_turn(&mut mcp, thread.id.as_str(), "queued").await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let list_id = mcp
|
|
.send_raw_request(
|
|
"thread/queue/list",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
})),
|
|
)
|
|
.await?;
|
|
let list_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
|
|
)
|
|
.await??;
|
|
let list: ThreadQueueListResponse = to_response(list_resp)?;
|
|
assert!(list.queued_turns.is_empty());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_queue_add_drains_immediately_when_the_thread_is_idle() -> Result<()> {
|
|
let server = create_mock_responses_server_sequence_unchecked(vec![
|
|
create_final_assistant_message_sse_response("materialized")?,
|
|
create_final_assistant_message_sse_response("queued")?,
|
|
])
|
|
.await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
let thread = start_materialized_thread(&mut mcp).await?;
|
|
add_queued_turn(&mut mcp, thread.id.as_str(), "queued").await?;
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let list_id = mcp
|
|
.send_raw_request(
|
|
"thread/queue/list",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
})),
|
|
)
|
|
.await?;
|
|
let list_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
|
|
)
|
|
.await??;
|
|
let list: ThreadQueueListResponse = to_response(list_resp)?;
|
|
assert!(list.queued_turns.is_empty());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn thread_queue_drains_after_restart_and_resume() -> Result<()> {
|
|
let server = create_mock_responses_server_sequence_unchecked(vec![
|
|
create_final_assistant_message_sse_response("materialized")?,
|
|
create_shell_command_sse_response(
|
|
sleep_command(/*seconds*/ 10),
|
|
/*workdir*/ None,
|
|
Some(10_000),
|
|
"keep-open",
|
|
)?,
|
|
create_final_assistant_message_sse_response("queued")?,
|
|
])
|
|
.await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri())?;
|
|
|
|
let mut first_mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, first_mcp.initialize()).await??;
|
|
let thread = start_materialized_thread(&mut first_mcp).await?;
|
|
start_blocking_turn(&mut first_mcp, thread.id.as_str()).await?;
|
|
add_queued_turn(&mut first_mcp, thread.id.as_str(), "queued").await?;
|
|
drop(first_mcp);
|
|
|
|
let mut second_mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, second_mcp.initialize()).await??;
|
|
let resume_id = second_mcp
|
|
.send_thread_resume_request(ThreadResumeParams {
|
|
thread_id: thread.id.clone(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let resume_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
second_mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
|
)
|
|
.await??;
|
|
let ThreadResumeResponse {
|
|
thread: resumed, ..
|
|
} = to_response(resume_resp)?;
|
|
assert_eq!(resumed.id, thread.id);
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
second_mcp.read_stream_until_notification_message("thread/queue/changed"),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
second_mcp.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
second_mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
let list_id = second_mcp
|
|
.send_raw_request(
|
|
"thread/queue/list",
|
|
Some(json!({
|
|
"threadId": thread.id,
|
|
})),
|
|
)
|
|
.await?;
|
|
let list_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
second_mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
|
|
)
|
|
.await??;
|
|
let list: ThreadQueueListResponse = to_response(list_resp)?;
|
|
assert!(list.queued_turns.is_empty());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn start_materialized_thread(
|
|
mcp: &mut McpProcess,
|
|
) -> Result<codex_app_server_protocol::Thread> {
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("mock-model".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let turn_id = mcp
|
|
.send_turn_start_request(turn_start_params(thread.id.as_str(), "materialize"))
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
Ok(thread)
|
|
}
|
|
|
|
async fn add_queued_turn(
|
|
mcp: &mut McpProcess,
|
|
thread_id: &str,
|
|
text: &str,
|
|
) -> Result<ThreadQueueAddResponse> {
|
|
let add_id = mcp
|
|
.send_raw_request(
|
|
"thread/queue/add",
|
|
Some(json!({
|
|
"threadId": thread_id,
|
|
"turnStartParams": turn_start_params(thread_id, text),
|
|
})),
|
|
)
|
|
.await?;
|
|
let add_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(add_id)),
|
|
)
|
|
.await??;
|
|
to_response(add_resp)
|
|
}
|
|
|
|
async fn start_blocking_turn(
|
|
mcp: &mut McpProcess,
|
|
thread_id: &str,
|
|
) -> Result<codex_app_server_protocol::Turn> {
|
|
let turn_id = mcp
|
|
.send_turn_start_request(turn_start_params(thread_id, "manual"))
|
|
.await?;
|
|
let turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn } = to_response(turn_resp)?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
wait_for_command_execution_started(mcp).await?;
|
|
Ok(turn)
|
|
}
|
|
|
|
async fn wait_for_command_execution_started(mcp: &mut McpProcess) -> Result<()> {
|
|
loop {
|
|
let notif = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("item/started"),
|
|
)
|
|
.await??;
|
|
let started: ItemStartedNotification = serde_json::from_value(
|
|
notif
|
|
.params
|
|
.ok_or_else(|| anyhow::anyhow!("missing item/started params"))?,
|
|
)?;
|
|
if matches!(started.item, ThreadItem::CommandExecution { .. }) {
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|
|
|
|
fn sleep_command(seconds: u64) -> Vec<String> {
|
|
#[cfg(target_os = "windows")]
|
|
{
|
|
vec![
|
|
"powershell".to_string(),
|
|
"-Command".to_string(),
|
|
format!("Start-Sleep -Seconds {seconds}"),
|
|
]
|
|
}
|
|
#[cfg(not(target_os = "windows"))]
|
|
{
|
|
vec!["sleep".to_string(), seconds.to_string()]
|
|
}
|
|
}
|
|
|
|
fn turn_start_params(thread_id: &str, text: &str) -> TurnStartParams {
|
|
TurnStartParams {
|
|
thread_id: thread_id.to_string(),
|
|
input: vec![UserInput::Text {
|
|
text: text.to_string(),
|
|
text_elements: Vec::new(),
|
|
}],
|
|
..Default::default()
|
|
}
|
|
}
|
|
|
|
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
|
|
let config_toml = codex_home.join("config.toml");
|
|
std::fs::write(
|
|
config_toml,
|
|
format!(
|
|
r#"
|
|
model = "gpt-5.3-codex"
|
|
approval_policy = "never"
|
|
sandbox_mode = "read-only"
|
|
|
|
model_provider = "mock_provider"
|
|
|
|
[features]
|
|
personality = true
|
|
|
|
[model_providers.mock_provider]
|
|
name = "Mock provider for test"
|
|
base_url = "{server_uri}/v1"
|
|
wire_api = "responses"
|
|
request_max_retries = 0
|
|
stream_max_retries = 0
|
|
"#
|
|
),
|
|
)
|
|
}
|