app-server: Only unload threads which were unused for some time (#17398)

Currently app-server may unload actively running threads once the last
connection disconnects, which is not expected.
Instead track when was the last active turn & when there were any
subscribers the last time, also add 30 minute idleness/no subscribers
timer to reduce the churn.
This commit is contained in:
Ruslan Nigmatullin
2026-04-13 12:25:26 -07:00
committed by GitHub
parent d905376628
commit a5507b59c4
6 changed files with 495 additions and 160 deletions

View File

@@ -338,7 +338,8 @@ async fn websocket_transport_allows_unauthenticated_non_loopback_startup_by_defa
}
#[tokio::test]
async fn websocket_disconnect_unloads_last_subscribed_thread() -> Result<()> {
async fn websocket_disconnect_keeps_last_subscribed_thread_loaded_until_idle_timeout() -> Result<()>
{
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
@@ -359,7 +360,7 @@ async fn websocket_disconnect_unloads_last_subscribed_thread() -> Result<()> {
send_initialize_request(&mut ws2, /*id*/ 4, "ws_reconnect_client").await?;
read_response_for_id(&mut ws2, /*id*/ 4).await?;
wait_for_loaded_threads(&mut ws2, /*first_id*/ 5, &[]).await?;
wait_for_loaded_threads(&mut ws2, /*first_id*/ 5, &[thread_id.as_str()]).await?;
process
.kill()

View File

@@ -7,10 +7,8 @@ use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
@@ -21,7 +19,6 @@ use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use codex_app_server_protocol::ThreadUnsubscribeParams;
use codex_app_server_protocol::ThreadUnsubscribeResponse;
use codex_app_server_protocol::ThreadUnsubscribeStatus;
@@ -81,7 +78,7 @@ async fn wait_for_responses_request_count_to_stabilize(
}
#[tokio::test]
async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification() -> Result<()> {
async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -104,20 +101,14 @@ async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification(
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
let closed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let parsed: ServerNotification = closed_notif.try_into()?;
let ServerNotification::ThreadClosed(payload) = parsed else {
anyhow::bail!("expected thread/closed notification");
};
assert_eq!(payload.thread_id, thread_id);
let status_changed = wait_for_thread_status_not_loaded(&mut mcp, &payload.thread_id).await?;
assert_eq!(status_changed.thread_id, payload.thread_id);
assert_eq!(status_changed.status, ThreadStatus::NotLoaded);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
);
let list_id = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams::default())
@@ -129,22 +120,22 @@ async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification(
.await??;
let ThreadLoadedListResponse { data, next_cursor } =
to_response::<ThreadLoadedListResponse>(list_resp)?;
assert_eq!(data, Vec::<String>::new());
assert_eq!(data, vec![thread_id]);
assert_eq!(next_cursor, None);
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed() -> Result<()> {
async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
#[cfg(target_os = "windows")]
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 10".to_string(),
"Start-Sleep -Seconds 1".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "10".to_string()];
let shell_command = vec!["sleep".to_string(), "1".to_string()];
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
@@ -206,20 +197,18 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
let closed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let parsed: ServerNotification = closed_notif.try_into()?;
let ServerNotification::ThreadClosed(payload) = parsed else {
anyhow::bail!("expected thread/closed notification");
};
assert_eq!(payload.thread_id, thread_id);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
);
wait_for_responses_request_count_to_stabilize(
&server,
/*expected_count*/ 1,
/*expected_count*/ 2,
std::time::Duration::from_millis(200),
)
.await?;
@@ -228,7 +217,7 @@ async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed(
}
#[tokio::test]
async fn thread_unsubscribe_clears_cached_status_before_resume() -> Result<()> {
async fn thread_unsubscribe_preserves_cached_status_before_idle_unload() -> Result<()> {
let server = responses::start_mock_server().await;
let _response_mock = responses::mount_sse_once(
&server,
@@ -291,11 +280,14 @@ async fn thread_unsubscribe_clears_cached_status_before_resume() -> Result<()> {
.await??;
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
);
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
@@ -309,13 +301,13 @@ async fn thread_unsubscribe_clears_cached_status_before_resume() -> Result<()> {
)
.await??;
let resume: ThreadResumeResponse = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resume.thread.status, ThreadStatus::Idle);
assert_eq!(resume.thread.status, ThreadStatus::SystemError);
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_reports_not_loaded_after_thread_is_unloaded() -> Result<()> {
async fn thread_unsubscribe_reports_not_subscribed_before_idle_unload() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -341,12 +333,6 @@ async fn thread_unsubscribe_reports_not_loaded_after_thread_is_unloaded() -> Res
ThreadUnsubscribeStatus::Unsubscribed
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let second_unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams { thread_id })
.await?;
@@ -358,7 +344,7 @@ async fn thread_unsubscribe_reports_not_loaded_after_thread_is_unloaded() -> Res
let second_unsubscribe = to_response::<ThreadUnsubscribeResponse>(second_unsubscribe_resp)?;
assert_eq!(
second_unsubscribe.status,
ThreadUnsubscribeStatus::NotLoaded
ThreadUnsubscribeStatus::NotSubscribed
);
Ok(())
@@ -377,28 +363,6 @@ async fn wait_for_command_execution_item_started(mcp: &mut McpProcess) -> Result
}
}
async fn wait_for_thread_status_not_loaded(
mcp: &mut McpProcess,
thread_id: &str,
) -> Result<ThreadStatusChangedNotification> {
loop {
let status_changed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/status/changed"),
)
.await??;
let status_changed_params = status_changed_notif
.params
.context("thread/status/changed params must be present")?;
let status_changed: ThreadStatusChangedNotification =
serde_json::from_value(status_changed_params)?;
if status_changed.thread_id == thread_id && status_changed.status == ThreadStatus::NotLoaded
{
return Ok(status_changed);
}
}
}
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(