mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Do not terminate codex threads even when there are no subscribers
This commit is contained in:
@@ -3757,8 +3757,8 @@ impl CodexMessageProcessor {
|
||||
self.finalize_thread_teardown(thread_id).await;
|
||||
continue;
|
||||
};
|
||||
self.unload_thread_without_subscribers(thread_id, thread)
|
||||
.await;
|
||||
drop(thread);
|
||||
info!("thread {thread_id} has no subscribers after connection close; keeping loaded");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5521,7 +5521,7 @@ impl CodexMessageProcessor {
|
||||
thread_id: ThreadId,
|
||||
thread: Arc<CodexThread>,
|
||||
) {
|
||||
// This connection was the last subscriber. Only now do we unload the thread.
|
||||
// This explicit unsubscribe was the last subscriber. Only now do we unload the thread.
|
||||
info!("thread {thread_id} has no subscribers; shutting down");
|
||||
let should_start_unload_task = self.pending_thread_unloads.lock().await.insert(thread_id);
|
||||
|
||||
|
||||
@@ -338,7 +338,7 @@ async fn websocket_transport_allows_unauthenticated_non_loopback_startup_by_defa
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn websocket_disconnect_unloads_last_subscribed_thread() -> Result<()> {
|
||||
async fn websocket_disconnect_keeps_last_subscribed_thread_loaded() -> Result<()> {
|
||||
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
||||
@@ -359,7 +359,7 @@ async fn websocket_disconnect_unloads_last_subscribed_thread() -> Result<()> {
|
||||
send_initialize_request(&mut ws2, /*id*/ 4, "ws_reconnect_client").await?;
|
||||
read_response_for_id(&mut ws2, /*id*/ 4).await?;
|
||||
|
||||
wait_for_loaded_threads(&mut ws2, /*first_id*/ 5, &[]).await?;
|
||||
wait_for_loaded_threads(&mut ws2, /*first_id*/ 5, &[thread_id.as_str()]).await?;
|
||||
|
||||
process
|
||||
.kill()
|
||||
|
||||
@@ -12,13 +12,17 @@ use anyhow::bail;
|
||||
use app_test_support::create_final_assistant_message_sse_response;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use core_test_support::responses;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use std::net::SocketAddr;
|
||||
use std::process::Command as StdCommand;
|
||||
use tempfile::TempDir;
|
||||
use tokio::process::Child;
|
||||
@@ -38,6 +42,7 @@ async fn websocket_transport_ctrl_c_waits_for_running_turn_before_exit() -> Resu
|
||||
_server,
|
||||
mut process,
|
||||
mut ws,
|
||||
..
|
||||
} = start_ctrl_c_restart_fixture(Duration::from_secs(3)).await?;
|
||||
|
||||
send_sigint(&process)?;
|
||||
@@ -63,6 +68,7 @@ async fn websocket_transport_second_ctrl_c_forces_exit_while_turn_running() -> R
|
||||
_server,
|
||||
mut process,
|
||||
mut ws,
|
||||
..
|
||||
} = start_ctrl_c_restart_fixture(Duration::from_secs(3)).await?;
|
||||
|
||||
send_sigint(&process)?;
|
||||
@@ -89,6 +95,7 @@ async fn websocket_transport_sigterm_waits_for_running_turn_before_exit() -> Res
|
||||
_server,
|
||||
mut process,
|
||||
mut ws,
|
||||
..
|
||||
} = start_ctrl_c_restart_fixture(Duration::from_secs(3)).await?;
|
||||
|
||||
send_sigterm(&process)?;
|
||||
@@ -114,6 +121,7 @@ async fn websocket_transport_second_sigterm_forces_exit_while_turn_running() ->
|
||||
_server,
|
||||
mut process,
|
||||
mut ws,
|
||||
..
|
||||
} = start_ctrl_c_restart_fixture(Duration::from_secs(3)).await?;
|
||||
|
||||
send_sigterm(&process)?;
|
||||
@@ -133,10 +141,56 @@ async fn websocket_transport_second_sigterm_forces_exit_while_turn_running() ->
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn websocket_transport_disconnect_does_not_shutdown_running_thread() -> Result<()> {
|
||||
let GracefulCtrlCFixture {
|
||||
_codex_home,
|
||||
_server,
|
||||
bind_addr,
|
||||
mut process,
|
||||
thread_id,
|
||||
mut ws,
|
||||
} = start_ctrl_c_restart_fixture(Duration::from_secs(3)).await?;
|
||||
|
||||
ws.close(None)
|
||||
.await
|
||||
.context("failed to close websocket connection")?;
|
||||
|
||||
let mut resumed_ws = connect_websocket(bind_addr).await?;
|
||||
send_initialize_request(&mut resumed_ws, /*id*/ 4, "ws_reconnect_client").await?;
|
||||
read_response_for_id(&mut resumed_ws, /*id*/ 4).await?;
|
||||
|
||||
send_request(
|
||||
&mut resumed_ws,
|
||||
"thread/resume",
|
||||
5,
|
||||
Some(serde_json::to_value(ThreadResumeParams {
|
||||
thread_id: thread_id.clone(),
|
||||
..Default::default()
|
||||
})?),
|
||||
)
|
||||
.await?;
|
||||
let resume_response = read_response_for_id(&mut resumed_ws, /*id*/ 5).await?;
|
||||
let ThreadResumeResponse { thread, .. } = to_response(resume_response)?;
|
||||
assert_ne!(
|
||||
thread.status,
|
||||
ThreadStatus::NotLoaded,
|
||||
"running thread should survive transient websocket disconnect",
|
||||
);
|
||||
|
||||
process
|
||||
.kill()
|
||||
.await
|
||||
.context("failed to stop websocket app-server process")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct GracefulCtrlCFixture {
|
||||
_codex_home: TempDir,
|
||||
_server: wiremock::MockServer,
|
||||
bind_addr: SocketAddr,
|
||||
process: Child,
|
||||
thread_id: String,
|
||||
ws: WsClient,
|
||||
}
|
||||
|
||||
@@ -173,7 +227,9 @@ async fn start_ctrl_c_restart_fixture(turn_delay: Duration) -> Result<GracefulCt
|
||||
Ok(GracefulCtrlCFixture {
|
||||
_codex_home: codex_home,
|
||||
_server: server,
|
||||
bind_addr,
|
||||
process,
|
||||
thread_id: thread.id,
|
||||
ws,
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user