Compare commits

...

2 Commits

Author SHA1 Message Date
Ruslan Nigmatullin
27ad7caff2 codex: address PR review feedback (#20487) 2026-04-30 20:03:21 +00:00
Ruslan Nigmatullin
eaf60c2b68 Use connection-specific thread unload delays 2026-04-30 19:07:57 +00:00
9 changed files with 326 additions and 157 deletions

View File

@@ -160,7 +160,7 @@ Example with notification opt-out:
- `thread/goal/cleared` — notification emitted whenever a thread goal is removed.
- `thread/status/changed` — notification emitted when a loaded thread's status changes (`threadId` + new `status`).
- `thread/archive` — move a thread's rollout file into the archived directory and attempt to move any spawned descendant thread rollout files; returns `{}` on success and emits `thread/archived` for each archived thread.
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server keeps the thread loaded and unloads it only after it has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed`.
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server unloads the thread after it has had no subscribers and no thread activity for the connection-specific idle delay, then emits `thread/closed`. In-process and stdio connections use no delay; unix-socket, TCP websocket, and remote-control connections use 30 minutes. When multiple subscribers are attached, the longest delay among them applies.
- `thread/name/set` — set or update a thread's user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
- `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications.
@@ -383,14 +383,14 @@ When `nextCursor` is `null`, you've reached the final page.
- `notSubscribed` when the connection was not subscribed to that thread.
- `notLoaded` when the thread is not loaded.
If this was the last subscriber, the server does not unload the thread immediately. It unloads the thread after the thread has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed` and a `thread/status/changed` transition to `notLoaded`.
If this was the last subscriber, the server unloads the thread after the thread has had no subscribers and no thread activity for the connection-specific idle delay, then emits `thread/closed` and a `thread/status/changed` transition to `notLoaded`. In-process and stdio connections use no delay; unix-socket, TCP websocket, and remote-control connections use 30 minutes. When multiple subscribers are attached, the longest delay among them applies.
```json
{ "method": "thread/unsubscribe", "id": 22, "params": { "threadId": "thr_123" } }
{ "id": 22, "result": { "status": "unsubscribed" } }
```
Later, after the idle unload timeout:
After the idle delay elapses:
```json
{ "method": "thread/status/changed", "params": {

View File

@@ -426,6 +426,7 @@ use crate::filters::source_kind_matches;
use crate::thread_state::ThreadListenerCommand;
use crate::thread_state::ThreadState;
use crate::thread_state::ThreadStateManager;
use crate::thread_state::ThreadSubscriptionState;
use token_usage_replay::latest_token_usage_turn_id_for_thread_path;
use token_usage_replay::latest_token_usage_turn_id_from_rollout_items;
use token_usage_replay::send_thread_token_usage_update_to_connection;
@@ -448,7 +449,6 @@ struct ThreadListFilters {
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
const LOGIN_ISSUER_OVERRIDE_ENV_VAR: &str = "CODEX_APP_SERVER_LOGIN_ISSUER";
const APP_LIST_LOAD_TIMEOUT: Duration = Duration::from_secs(90);
const THREAD_UNLOADING_DELAY: Duration = Duration::from_secs(30 * 60);
enum ActiveLogin {
Browser {
@@ -572,55 +572,53 @@ enum RefreshTokenRequestOutcome {
}
struct UnloadingState {
delay: Duration,
has_subscribers_rx: watch::Receiver<bool>,
subscription_rx: watch::Receiver<ThreadSubscriptionState>,
has_subscribers: (bool, Instant),
unloading_delay: Duration,
thread_status_rx: watch::Receiver<ThreadStatus>,
is_active: (bool, Instant),
}
impl UnloadingState {
async fn new(
listener_task_context: &ListenerTaskContext,
thread_id: ThreadId,
delay: Duration,
) -> Option<Self> {
let has_subscribers_rx = listener_task_context
async fn new(listener_task_context: &ListenerTaskContext, thread_id: ThreadId) -> Option<Self> {
let subscription_rx = listener_task_context
.thread_state_manager
.subscribe_to_has_connections(thread_id)
.subscribe_to_subscriptions(thread_id)
.await?;
let thread_status_rx = listener_task_context
.thread_watch_manager
.subscribe(thread_id)
.await?;
let has_subscribers = (*has_subscribers_rx.borrow(), Instant::now());
let subscription = *subscription_rx.borrow();
let has_subscribers = (subscription.has_connections, Instant::now());
let is_active = (
matches!(*thread_status_rx.borrow(), ThreadStatus::Active { .. }),
Instant::now(),
);
Some(Self {
delay,
has_subscribers_rx,
subscription_rx,
thread_status_rx,
has_subscribers,
unloading_delay: subscription.unloading_delay,
is_active,
})
}
fn unloading_target(&self) -> Option<Instant> {
match (self.has_subscribers, self.is_active) {
((false, has_no_subscribers_since), (false, is_inactive_since)) => {
Some(std::cmp::max(has_no_subscribers_since, is_inactive_since) + self.delay)
}
((false, has_no_subscribers_since), (false, is_inactive_since)) => Some(
std::cmp::max(has_no_subscribers_since, is_inactive_since) + self.unloading_delay,
),
_ => None,
}
}
fn sync_receiver_values(&mut self) {
let has_subscribers = *self.has_subscribers_rx.borrow();
if self.has_subscribers.0 != has_subscribers {
self.has_subscribers = (has_subscribers, Instant::now());
let subscription = *self.subscription_rx.borrow();
if self.has_subscribers.0 != subscription.has_connections {
self.has_subscribers = (subscription.has_connections, Instant::now());
}
self.unloading_delay = subscription.unloading_delay;
let is_active = matches!(*self.thread_status_rx.borrow(), ThreadStatus::Active { .. });
if self.is_active.0 != is_active {
@@ -658,7 +656,7 @@ impl UnloadingState {
};
tokio::select! {
_ = unloading_sleep => return true,
changed = self.has_subscribers_rx.changed() => {
changed = self.subscription_rx.changed() => {
if changed.is_err() {
return false;
}
@@ -4152,9 +4150,13 @@ impl CodexMessageProcessor {
self.thread_manager.subscribe_thread_created()
}
pub(crate) async fn connection_initialized(&self, connection_id: ConnectionId) {
pub(crate) async fn connection_initialized(
&self,
connection_id: ConnectionId,
unloading_delay: Duration,
) {
self.thread_state_manager
.connection_initialized(connection_id)
.connection_initialized(connection_id, unloading_delay)
.await;
}
@@ -4482,6 +4484,10 @@ impl CodexMessageProcessor {
Some(persisted_metadata)
}
#[expect(
clippy::await_holding_invalid_type,
reason = "running-thread resume subscription must be serialized against pending unloads"
)]
async fn resume_running_thread(
&self,
request_id: &ConnectionRequestId,
@@ -4535,12 +4541,6 @@ impl CodexMessageProcessor {
.thread_state_manager
.thread_state(existing_thread_id)
.await;
self.ensure_listener_task_running(
existing_thread_id,
existing_thread.clone(),
thread_state.clone(),
)
.await?;
let config_snapshot = existing_thread.config_snapshot().await;
let mismatch_details = collect_resume_override_mismatches(params, &config_snapshot);
@@ -4569,16 +4569,6 @@ impl CodexMessageProcessor {
let instruction_sources =
Self::instruction_sources_from_config(&config_for_instruction_sources).await;
let listener_command_tx = {
let thread_state = thread_state.lock().await;
thread_state.listener_command_tx()
};
let Some(listener_command_tx) = listener_command_tx else {
return Err(internal_error(format!(
"failed to enqueue running thread resume for thread {existing_thread_id}: thread listener is not running"
)));
};
let emit_thread_goal_update = self.config.features.enabled(Feature::Goals);
let thread_goal_state_db = if emit_thread_goal_update {
if let Some(state_db) = existing_thread.state_db() {
@@ -4590,19 +4580,76 @@ impl CodexMessageProcessor {
None
};
let pending_resume_request = crate::thread_state::PendingThreadResumeRequest {
request_id: request_id.clone(),
history_items,
config_snapshot,
instruction_sources,
thread_summary,
emit_thread_goal_update,
thread_goal_state_db,
include_turns: !params.exclude_turns,
};
let connection_id = request_id.connection_id;
{
let pending_thread_unloads = self.pending_thread_unloads.lock().await;
if pending_thread_unloads.contains(&existing_thread_id) {
return Err(invalid_request(format!(
"thread {existing_thread_id} is closing; retry thread/resume after the thread is closed"
)));
}
if !self
.thread_state_manager
.try_add_connection_to_thread(existing_thread_id, connection_id)
.await
{
tracing::debug!(
thread_id = %existing_thread_id,
connection_id = ?connection_id,
"skipping running thread resume for closed connection"
);
return Ok(true);
}
}
if let Err(error) = self
.ensure_listener_task_running(
existing_thread_id,
existing_thread.clone(),
thread_state.clone(),
)
.await
{
let _ = self
.thread_state_manager
.unsubscribe_connection_from_thread(existing_thread_id, connection_id)
.await;
return Err(error);
}
let listener_command_tx = {
let thread_state = thread_state.lock().await;
thread_state.listener_command_tx()
};
let Some(listener_command_tx) = listener_command_tx else {
let _ = self
.thread_state_manager
.unsubscribe_connection_from_thread(existing_thread_id, connection_id)
.await;
return Err(internal_error(format!(
"failed to enqueue running thread resume for thread {existing_thread_id}: thread listener is not running"
)));
};
let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse(
Box::new(crate::thread_state::PendingThreadResumeRequest {
request_id: request_id.clone(),
history_items,
config_snapshot,
instruction_sources,
thread_summary,
emit_thread_goal_update,
thread_goal_state_db,
include_turns: !params.exclude_turns,
}),
Box::new(pending_resume_request),
);
if listener_command_tx.send(command).is_err() {
let _ = self
.thread_state_manager
.unsubscribe_connection_from_thread(existing_thread_id, connection_id)
.await;
return Err(internal_error(format!(
"failed to enqueue running thread resume for thread {existing_thread_id}: thread listener command channel is closed"
)));
@@ -7536,12 +7583,8 @@ impl CodexMessageProcessor {
thread_state: Arc<Mutex<ThreadState>>,
) -> Result<(), JSONRPCErrorError> {
let (cancel_tx, mut cancel_rx) = oneshot::channel();
let Some(mut unloading_state) = UnloadingState::new(
&listener_task_context,
conversation_id,
THREAD_UNLOADING_DELAY,
)
.await
let Some(mut unloading_state) =
UnloadingState::new(&listener_task_context, conversation_id).await
else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
@@ -8306,6 +8349,9 @@ async fn handle_pending_thread_resume_request(
)
.await
{
let _ = thread_state_manager
.unsubscribe_connection_from_thread(conversation_id, connection_id)
.await;
outgoing
.send_error(request_id, internal_error(message))
.await;
@@ -8326,6 +8372,9 @@ async fn handle_pending_thread_resume_request(
let pending_thread_unloads = pending_thread_unloads.lock().await;
if pending_thread_unloads.contains(&conversation_id) {
drop(pending_thread_unloads);
let _ = thread_state_manager
.unsubscribe_connection_from_thread(conversation_id, connection_id)
.await;
outgoing
.send_error(
request_id,
@@ -10879,7 +10928,9 @@ mod tests {
let connection = ConnectionId(1);
let (cancel_tx, cancel_rx) = oneshot::channel();
manager.connection_initialized(connection).await;
manager
.connection_initialized(connection, Duration::ZERO)
.await;
manager
.try_ensure_connection_subscribed(
thread_id, connection, /*experimental_raw_events*/ false,
@@ -10922,8 +10973,12 @@ mod tests {
let connection_b = ConnectionId(2);
let (cancel_tx, mut cancel_rx) = oneshot::channel();
manager.connection_initialized(connection_a).await;
manager.connection_initialized(connection_b).await;
manager
.connection_initialized(connection_a, Duration::ZERO)
.await;
manager
.connection_initialized(connection_b, Duration::ZERO)
.await;
manager
.try_ensure_connection_subscribed(
thread_id,
@@ -10961,14 +11016,18 @@ mod tests {
}
#[tokio::test]
async fn adding_connection_to_thread_updates_has_connections_watcher() -> Result<()> {
async fn adding_connection_to_thread_updates_subscription_watcher() -> Result<()> {
let manager = ThreadStateManager::new();
let thread_id = ThreadId::from_string("ad7f0408-99b8-4f6e-a46f-bd0eec433370")?;
let connection_a = ConnectionId(1);
let connection_b = ConnectionId(2);
manager.connection_initialized(connection_a).await;
manager.connection_initialized(connection_b).await;
manager
.connection_initialized(connection_a, Duration::ZERO)
.await;
manager
.connection_initialized(connection_b, Duration::from_secs(30 * 60))
.await;
manager
.try_ensure_connection_subscribed(
thread_id,
@@ -10977,33 +11036,51 @@ mod tests {
)
.await
.expect("connection_a should be live");
let mut has_connections = manager
.subscribe_to_has_connections(thread_id)
let mut subscription = manager
.subscribe_to_subscriptions(thread_id)
.await
.expect("thread should have a has-connections watcher");
assert!(*has_connections.borrow());
assert!(
manager
.unsubscribe_connection_from_thread(thread_id, connection_a)
.await
);
tokio::time::timeout(Duration::from_secs(1), has_connections.changed())
.await
.expect("timed out waiting for no-subscriber update")
.expect("has-connections watcher should remain open");
assert!(!*has_connections.borrow());
.expect("thread should have a subscription watcher");
assert!(subscription.borrow().has_connections);
assert_eq!(subscription.borrow().unloading_delay, Duration::ZERO);
assert!(
manager
.try_add_connection_to_thread(thread_id, connection_b)
.await
);
tokio::time::timeout(Duration::from_secs(1), has_connections.changed())
tokio::time::timeout(Duration::from_secs(1), subscription.changed())
.await
.expect("timed out waiting for subscriber update")
.expect("has-connections watcher should remain open");
assert!(*has_connections.borrow());
.expect("timed out waiting for max-delay update")
.expect("subscription watcher should remain open");
assert!(subscription.borrow().has_connections);
assert_eq!(
subscription.borrow().unloading_delay,
Duration::from_secs(30 * 60)
);
assert!(
manager
.unsubscribe_connection_from_thread(thread_id, connection_b)
.await
);
tokio::time::timeout(Duration::from_secs(1), subscription.changed())
.await
.expect("timed out waiting for remaining-subscriber update")
.expect("subscription watcher should remain open");
assert!(subscription.borrow().has_connections);
assert_eq!(subscription.borrow().unloading_delay, Duration::ZERO);
assert!(
manager
.unsubscribe_connection_from_thread(thread_id, connection_a)
.await
);
tokio::time::timeout(Duration::from_secs(1), subscription.changed())
.await
.expect("timed out waiting for no-subscriber update")
.expect("subscription watcher should remain open");
assert!(!subscription.borrow().has_connections);
assert_eq!(subscription.borrow().unloading_delay, Duration::ZERO);
Ok(())
}
@@ -11013,7 +11090,9 @@ mod tests {
let thread_id = ThreadId::from_string("ad7f0408-99b8-4f6e-a46f-bd0eec433370")?;
let connection = ConnectionId(1);
manager.connection_initialized(connection).await;
manager
.connection_initialized(connection, Duration::ZERO)
.await;
let threads_to_unload = manager.remove_connection(connection).await;
assert_eq!(threads_to_unload, Vec::<ThreadId>::new());

View File

@@ -908,7 +908,12 @@ pub async fn run_main_with_transport_options(
),
)
.await;
processor.connection_initialized(connection_id).await;
processor
.connection_initialized(
connection_id,
&connection_state.session,
)
.await;
connection_state
.outbound_initialized
.store(true, std::sync::atomic::Ordering::Release);

View File

@@ -215,6 +215,10 @@ impl ConnectionSessionState {
self.origin.allows_device_key_requests()
}
fn thread_unloading_delay(&self) -> Duration {
self.origin.thread_unloading_delay()
}
pub(crate) fn experimental_api_enabled(&self) -> bool {
self.initialized
.get()
@@ -506,9 +510,13 @@ impl MessageProcessor {
}
}
pub(crate) async fn connection_initialized(&self, connection_id: ConnectionId) {
pub(crate) async fn connection_initialized(
&self,
connection_id: ConnectionId,
session_state: &ConnectionSessionState,
) {
self.codex_message_processor
.connection_initialized(connection_id)
.connection_initialized(connection_id, session_state.thread_unloading_delay())
.await;
}
@@ -695,7 +703,7 @@ impl MessageProcessor {
// initialize handling for the specific connection.
outbound_initialized.store(true, Ordering::Release);
self.codex_message_processor
.connection_initialized(connection_id)
.connection_initialized(connection_id, session.thread_unloading_delay())
.await;
}
return Ok(());

View File

@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Weak;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
@@ -174,25 +175,41 @@ pub(crate) async fn resolve_server_request_on_thread_listener(
struct ThreadEntry {
state: Arc<Mutex<ThreadState>>,
connection_ids: HashSet<ConnectionId>,
has_connections_watcher: watch::Sender<bool>,
connection_unloading_delays: HashMap<ConnectionId, Duration>,
idle_unloading_delay: Duration,
subscription_watcher: watch::Sender<ThreadSubscriptionState>,
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) struct ThreadSubscriptionState {
pub(crate) has_connections: bool,
pub(crate) unloading_delay: Duration,
}
impl Default for ThreadEntry {
fn default() -> Self {
Self {
state: Arc::new(Mutex::new(ThreadState::default())),
connection_ids: HashSet::new(),
has_connections_watcher: watch::channel(false).0,
connection_unloading_delays: HashMap::new(),
idle_unloading_delay: Duration::ZERO,
subscription_watcher: watch::channel(ThreadSubscriptionState::default()).0,
}
}
}
impl ThreadEntry {
fn update_has_connections(&self) {
let _ = self.has_connections_watcher.send_if_modified(|current| {
fn update_subscription_state(&mut self) {
if let Some(delay) = self.connection_unloading_delays.values().max().copied() {
self.idle_unloading_delay = delay;
}
let next = ThreadSubscriptionState {
has_connections: !self.connection_unloading_delays.is_empty(),
unloading_delay: self.idle_unloading_delay,
};
let _ = self.subscription_watcher.send_if_modified(|current| {
let prev = *current;
*current = !self.connection_ids.is_empty();
*current = next;
prev != *current
});
}
@@ -200,7 +217,7 @@ impl ThreadEntry {
#[derive(Default)]
struct ThreadStateManagerInner {
live_connections: HashSet<ConnectionId>,
live_connections: HashMap<ConnectionId, Duration>,
threads: HashMap<ThreadId, ThreadEntry>,
thread_ids_by_connection: HashMap<ConnectionId, HashSet<ThreadId>>,
}
@@ -215,12 +232,16 @@ impl ThreadStateManager {
Self::default()
}
pub(crate) async fn connection_initialized(&self, connection_id: ConnectionId) {
pub(crate) async fn connection_initialized(
&self,
connection_id: ConnectionId,
unloading_delay: Duration,
) {
self.state
.lock()
.await
.live_connections
.insert(connection_id);
.insert(connection_id, unloading_delay);
}
pub(crate) async fn subscribed_connection_ids(&self, thread_id: ThreadId) -> Vec<ConnectionId> {
@@ -228,7 +249,13 @@ impl ThreadStateManager {
state
.threads
.get(&thread_id)
.map(|thread_entry| thread_entry.connection_ids.iter().copied().collect())
.map(|thread_entry| {
thread_entry
.connection_unloading_delays
.keys()
.copied()
.collect()
})
.unwrap_or_default()
}
@@ -313,8 +340,10 @@ impl ThreadStateManager {
}
}
if let Some(thread_entry) = state.threads.get_mut(&thread_id) {
thread_entry.connection_ids.remove(&connection_id);
thread_entry.update_has_connections();
thread_entry
.connection_unloading_delays
.remove(&connection_id);
thread_entry.update_subscription_state();
}
};
@@ -328,7 +357,7 @@ impl ThreadStateManager {
.await
.threads
.get(&thread_id)
.is_some_and(|thread_entry| !thread_entry.connection_ids.is_empty())
.is_some_and(|thread_entry| !thread_entry.connection_unloading_delays.is_empty())
}
pub(crate) async fn try_ensure_connection_subscribed(
@@ -339,17 +368,17 @@ impl ThreadStateManager {
) -> Option<Arc<Mutex<ThreadState>>> {
let thread_state = {
let mut state = self.state.lock().await;
if !state.live_connections.contains(&connection_id) {
return None;
}
let unloading_delay = state.live_connections.get(&connection_id).copied()?;
state
.thread_ids_by_connection
.entry(connection_id)
.or_default()
.insert(thread_id);
let thread_entry = state.threads.entry(thread_id).or_default();
thread_entry.connection_ids.insert(connection_id);
thread_entry.update_has_connections();
thread_entry
.connection_unloading_delays
.insert(connection_id, unloading_delay);
thread_entry.update_subscription_state();
thread_entry.state.clone()
};
{
@@ -367,17 +396,19 @@ impl ThreadStateManager {
connection_id: ConnectionId,
) -> bool {
let mut state = self.state.lock().await;
if !state.live_connections.contains(&connection_id) {
let Some(unloading_delay) = state.live_connections.get(&connection_id).copied() else {
return false;
}
};
state
.thread_ids_by_connection
.entry(connection_id)
.or_default()
.insert(thread_id);
let thread_entry = state.threads.entry(thread_id).or_default();
thread_entry.connection_ids.insert(connection_id);
thread_entry.update_has_connections();
thread_entry
.connection_unloading_delays
.insert(connection_id, unloading_delay);
thread_entry.update_subscription_state();
true
}
@@ -391,30 +422,31 @@ impl ThreadStateManager {
.unwrap_or_default();
for thread_id in &thread_ids {
if let Some(thread_entry) = state.threads.get_mut(thread_id) {
thread_entry.connection_ids.remove(&connection_id);
thread_entry.update_has_connections();
thread_entry
.connection_unloading_delays
.remove(&connection_id);
thread_entry.update_subscription_state();
}
}
thread_ids
.into_iter()
.filter(|thread_id| {
state
.threads
.get(thread_id)
.is_some_and(|thread_entry| thread_entry.connection_ids.is_empty())
state.threads.get(thread_id).is_some_and(|thread_entry| {
thread_entry.connection_unloading_delays.is_empty()
})
})
.collect::<Vec<_>>()
}
}
pub(crate) async fn subscribe_to_has_connections(
pub(crate) async fn subscribe_to_subscriptions(
&self,
thread_id: ThreadId,
) -> Option<watch::Receiver<bool>> {
) -> Option<watch::Receiver<ThreadSubscriptionState>> {
let state = self.state.lock().await;
state
.threads
.get(&thread_id)
.map(|thread_entry| thread_entry.has_connections_watcher.subscribe())
.map(|thread_entry| thread_entry.subscription_watcher.subscribe())
}
}

View File

@@ -23,6 +23,7 @@ use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::error;
@@ -176,10 +177,20 @@ pub(crate) enum ConnectionOrigin {
Stdio,
InProcess,
WebSocket,
UnixSocket,
RemoteControl,
}
impl ConnectionOrigin {
pub(crate) fn thread_unloading_delay(self) -> Duration {
match self {
Self::Stdio | Self::InProcess => Duration::ZERO,
Self::WebSocket | Self::UnixSocket | Self::RemoteControl => {
Duration::from_secs(30 * 60)
}
}
}
pub(crate) fn allows_device_key_requests(self) -> bool {
// Device-key endpoints are only for local connections that own the app-server instance.
// Do not include remote transports such as SSH or remote-control websocket connections.
@@ -507,6 +518,30 @@ mod tests {
})
}
#[test]
fn connection_origin_sets_thread_unloading_delay() {
assert_eq!(
ConnectionOrigin::Stdio.thread_unloading_delay(),
Duration::ZERO
);
assert_eq!(
ConnectionOrigin::InProcess.thread_unloading_delay(),
Duration::ZERO
);
assert_eq!(
ConnectionOrigin::WebSocket.thread_unloading_delay(),
Duration::from_secs(30 * 60)
);
assert_eq!(
ConnectionOrigin::UnixSocket.thread_unloading_delay(),
Duration::from_secs(30 * 60)
);
assert_eq!(
ConnectionOrigin::RemoteControl.thread_unloading_delay(),
Duration::from_secs(30 * 60)
);
}
#[test]
fn listen_off_parses_as_off_transport() {
assert_eq!(

View File

@@ -2,6 +2,7 @@ use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::path::Path;
use super::ConnectionOrigin;
use super::TransportEvent;
use crate::transport::websocket::run_websocket_connection;
use codex_uds::UnixListener;
@@ -83,7 +84,13 @@ async fn run_control_socket_acceptor(
}
};
let (websocket_writer, websocket_reader) = websocket_stream.split();
run_websocket_connection(websocket_writer, websocket_reader, transport_event_tx).await;
run_websocket_connection(
websocket_writer,
websocket_reader,
transport_event_tx,
ConnectionOrigin::UnixSocket,
)
.await;
});
}
info!("control socket acceptor shutting down");

View File

@@ -122,8 +122,13 @@ async fn websocket_upgrade_handler(
websocket
.on_upgrade(move |stream| async move {
let (websocket_writer, websocket_reader) = stream.split();
run_websocket_connection(websocket_writer, websocket_reader, state.transport_event_tx)
.await;
run_websocket_connection(
websocket_writer,
websocket_reader,
state.transport_event_tx,
ConnectionOrigin::WebSocket,
)
.await;
})
.into_response()
}
@@ -173,6 +178,7 @@ pub(crate) async fn run_websocket_connection<M, SinkError, StreamError>(
websocket_writer: impl futures::sink::Sink<M, Error = SinkError> + Send + 'static,
websocket_reader: impl futures::stream::Stream<Item = Result<M, StreamError>> + Send + 'static,
transport_event_tx: mpsc::Sender<TransportEvent>,
origin: ConnectionOrigin,
) where
M: AppServerWebSocketMessage + Send + 'static,
SinkError: Send + 'static,
@@ -186,7 +192,7 @@ pub(crate) async fn run_websocket_connection<M, SinkError, StreamError>(
if transport_event_tx
.send(TransportEvent::ConnectionOpened {
connection_id,
origin: ConnectionOrigin::WebSocket,
origin,
writer: writer_tx,
disconnect_sender: Some(disconnect_token.clone()),
})

View File

@@ -15,8 +15,6 @@ use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
@@ -36,7 +34,7 @@ use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<()> {
async fn thread_unsubscribe_over_stdio_unloads_idle_thread_immediately() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -59,14 +57,11 @@ async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<(
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let list_id = mcp
.send_thread_loaded_list_request(ThreadLoadedListParams::default())
@@ -78,7 +73,7 @@ async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<(
.await??;
let ThreadLoadedListResponse { data, next_cursor } =
to_response::<ThreadLoadedListResponse>(list_resp)?;
assert_eq!(data, vec![thread_id]);
assert_eq!(data, Vec::<String>::new());
assert_eq!(next_cursor, None);
Ok(())
@@ -242,7 +237,7 @@ async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
}
#[tokio::test]
async fn thread_unsubscribe_preserves_cached_status_before_idle_unload() -> Result<()> {
async fn thread_unsubscribe_over_stdio_unloads_and_read_reports_not_loaded() -> Result<()> {
let server = responses::start_mock_server().await;
let _response_mock = responses::mount_sse_once(
&server,
@@ -305,34 +300,31 @@ async fn thread_unsubscribe_preserves_cached_status_before_idle_unload() -> Resu
.await??;
let unsubscribe = to_response::<ThreadUnsubscribeResponse>(unsubscribe_resp)?;
assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed);
assert!(
timeout(
std::time::Duration::from_millis(250),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await
.is_err()
);
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id,
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let resume: ThreadResumeResponse = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resume.thread.status, ThreadStatus::SystemError);
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id,
include_turns: false,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread, .. } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.status, ThreadStatus::NotLoaded);
Ok(())
}
#[tokio::test]
async fn thread_unsubscribe_reports_not_subscribed_before_idle_unload() -> Result<()> {
async fn thread_unsubscribe_over_stdio_reports_not_loaded_after_immediate_unload() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
@@ -357,6 +349,11 @@ async fn thread_unsubscribe_reports_not_subscribed_before_idle_unload() -> Resul
first_unsubscribe.status,
ThreadUnsubscribeStatus::Unsubscribed
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/closed"),
)
.await??;
let second_unsubscribe_id = mcp
.send_thread_unsubscribe_request(ThreadUnsubscribeParams { thread_id })
@@ -369,7 +366,7 @@ async fn thread_unsubscribe_reports_not_subscribed_before_idle_unload() -> Resul
let second_unsubscribe = to_response::<ThreadUnsubscribeResponse>(second_unsubscribe_resp)?;
assert_eq!(
second_unsubscribe.status,
ThreadUnsubscribeStatus::NotSubscribed
ThreadUnsubscribeStatus::NotLoaded
);
Ok(())