Compare commits

...

2 Commits

Author SHA1 Message Date
David Hao
fbf0c14a27 Lint 2026-04-13 11:41:14 -07:00
David Hao
d3fcf88a3d Do not terminate codex threads even when there are no subscribers 2026-04-13 10:14:02 -07:00
3 changed files with 61 additions and 5 deletions

View File

@@ -3757,8 +3757,8 @@ impl CodexMessageProcessor {
self.finalize_thread_teardown(thread_id).await;
continue;
};
self.unload_thread_without_subscribers(thread_id, thread)
.await;
drop(thread);
info!("thread {thread_id} has no subscribers after connection close; keeping loaded");
}
}
@@ -5521,7 +5521,7 @@ impl CodexMessageProcessor {
thread_id: ThreadId,
thread: Arc<CodexThread>,
) {
// This connection was the last subscriber. Only now do we unload the thread.
// This explicit unsubscribe was the last subscriber. Only now do we unload the thread.
info!("thread {thread_id} has no subscribers; shutting down");
let should_start_unload_task = self.pending_thread_unloads.lock().await.insert(thread_id);

View File

@@ -338,7 +338,7 @@ async fn websocket_transport_allows_unauthenticated_non_loopback_startup_by_defa
}
#[tokio::test]
async fn websocket_disconnect_unloads_last_subscribed_thread() -> Result<()> {
async fn websocket_disconnect_keeps_last_subscribed_thread_loaded() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
@@ -359,7 +359,7 @@ async fn websocket_disconnect_unloads_last_subscribed_thread() -> Result<()> {
send_initialize_request(&mut ws2, /*id*/ 4, "ws_reconnect_client").await?;
read_response_for_id(&mut ws2, /*id*/ 4).await?;
wait_for_loaded_threads(&mut ws2, /*first_id*/ 5, &[]).await?;
wait_for_loaded_threads(&mut ws2, /*first_id*/ 5, &[thread_id.as_str()]).await?;
process
.kill()

View File

@@ -12,13 +12,17 @@ use anyhow::bail;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput as V2UserInput;
use core_test_support::responses;
use futures::SinkExt;
use futures::StreamExt;
use std::net::SocketAddr;
use std::process::Command as StdCommand;
use tempfile::TempDir;
use tokio::process::Child;
@@ -38,6 +42,7 @@ async fn websocket_transport_ctrl_c_waits_for_running_turn_before_exit() -> Resu
_server,
mut process,
mut ws,
..
} = start_ctrl_c_restart_fixture(Duration::from_secs(3)).await?;
send_sigint(&process)?;
@@ -63,6 +68,7 @@ async fn websocket_transport_second_ctrl_c_forces_exit_while_turn_running() -> R
_server,
mut process,
mut ws,
..
} = start_ctrl_c_restart_fixture(Duration::from_secs(3)).await?;
send_sigint(&process)?;
@@ -89,6 +95,7 @@ async fn websocket_transport_sigterm_waits_for_running_turn_before_exit() -> Res
_server,
mut process,
mut ws,
..
} = start_ctrl_c_restart_fixture(Duration::from_secs(3)).await?;
send_sigterm(&process)?;
@@ -114,6 +121,7 @@ async fn websocket_transport_second_sigterm_forces_exit_while_turn_running() ->
_server,
mut process,
mut ws,
..
} = start_ctrl_c_restart_fixture(Duration::from_secs(3)).await?;
send_sigterm(&process)?;
@@ -133,10 +141,56 @@ async fn websocket_transport_second_sigterm_forces_exit_while_turn_running() ->
Ok(())
}
#[tokio::test]
async fn websocket_transport_disconnect_does_not_shutdown_running_thread() -> Result<()> {
let GracefulCtrlCFixture {
_codex_home,
_server,
bind_addr,
mut process,
thread_id,
mut ws,
} = start_ctrl_c_restart_fixture(Duration::from_secs(3)).await?;
ws.close(None)
.await
.context("failed to close websocket connection")?;
let mut resumed_ws = connect_websocket(bind_addr).await?;
send_initialize_request(&mut resumed_ws, /*id*/ 4, "ws_reconnect_client").await?;
read_response_for_id(&mut resumed_ws, /*id*/ 4).await?;
send_request(
&mut resumed_ws,
"thread/resume",
/*id*/ 5,
Some(serde_json::to_value(ThreadResumeParams {
thread_id: thread_id.clone(),
..Default::default()
})?),
)
.await?;
let resume_response = read_response_for_id(&mut resumed_ws, /*id*/ 5).await?;
let ThreadResumeResponse { thread, .. } = to_response(resume_response)?;
assert_ne!(
thread.status,
ThreadStatus::NotLoaded,
"running thread should survive transient websocket disconnect",
);
process
.kill()
.await
.context("failed to stop websocket app-server process")?;
Ok(())
}
struct GracefulCtrlCFixture {
_codex_home: TempDir,
_server: wiremock::MockServer,
bind_addr: SocketAddr,
process: Child,
thread_id: String,
ws: WsClient,
}
@@ -173,7 +227,9 @@ async fn start_ctrl_c_restart_fixture(turn_delay: Duration) -> Result<GracefulCt
Ok(GracefulCtrlCFixture {
_codex_home: codex_home,
_server: server,
bind_addr,
process,
thread_id: thread.id,
ws,
})
}