Files
codex/codex-rs/app-server/src/thread_status.rs
Michael Bolin 61dfe0b86c chore: clean up argument-comment lint and roll out all-target CI on macOS (#16054)
## Why

`argument-comment-lint` was green in CI even though the repo still had
many uncommented literal arguments. The main gap was target coverage:
the repo wrapper did not force Cargo to inspect test-only call sites, so
examples like the `latest_session_lookup_params(true, ...)` tests in
`codex-rs/tui_app_server/src/lib.rs` never entered the blocking CI path.

This change cleans up the existing backlog, makes the default repo lint
path cover all Cargo targets, and starts rolling that stricter CI
enforcement out on the platform where it is currently validated.

## What changed

- mechanically fixed existing `argument-comment-lint` violations across
the `codex-rs` workspace, including tests, examples, and benches
- updated `tools/argument-comment-lint/run-prebuilt-linter.sh` and
`tools/argument-comment-lint/run.sh` so non-`--fix` runs default to
`--all-targets` unless the caller explicitly narrows the target set
- fixed both wrappers so forwarded cargo arguments after `--` are
preserved with a single separator
- documented the new default behavior in
`tools/argument-comment-lint/README.md`
- updated `rust-ci` so the macOS lint lane keeps the plain wrapper
invocation and therefore enforces `--all-targets`, while Linux and
Windows temporarily pass `-- --lib --bins`

That temporary CI split keeps the stricter all-targets check where it is
already cleaned up, while leaving room to finish the remaining Linux-
and Windows-specific target-gated cleanup before enabling
`--all-targets` on those runners. The Linux and Windows failures on the
intermediate revision were caused by the wrapper forwarding bug, not by
additional lint findings in those lanes.

## Validation

- `bash -n tools/argument-comment-lint/run.sh`
- `bash -n tools/argument-comment-lint/run-prebuilt-linter.sh`
- shell-level wrapper forwarding check for `-- --lib --bins`
- shell-level wrapper forwarding check for `-- --tests`
- `just argument-comment-lint`
- `cargo test` in `tools/argument-comment-lint`
- `cargo test -p codex-terminal-detection`

## Follow-up

- Clean up remaining Linux-only target-gated callsites, then switch the
Linux lint lane back to the plain wrapper invocation.
- Clean up remaining Windows-only target-gated callsites, then switch
the Windows lint lane back to the plain wrapper invocation.
2026-03-27 19:00:44 -07:00

813 lines
24 KiB
Rust

#[cfg(test)]
use crate::outgoing_message::OutgoingEnvelope;
#[cfg(test)]
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadActiveFlag;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use std::collections::HashMap;
#[cfg(test)]
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;
#[cfg(test)]
use tokio::sync::mpsc;
use tokio::sync::watch;
#[derive(Clone)]
pub(crate) struct ThreadWatchManager {
state: Arc<Mutex<ThreadWatchState>>,
outgoing: Option<Arc<OutgoingMessageSender>>,
running_turn_count_tx: watch::Sender<usize>,
}
pub(crate) struct ThreadWatchActiveGuard {
manager: ThreadWatchManager,
thread_id: String,
guard_type: ThreadWatchActiveGuardType,
handle: tokio::runtime::Handle,
}
impl ThreadWatchActiveGuard {
fn new(
manager: ThreadWatchManager,
thread_id: String,
guard_type: ThreadWatchActiveGuardType,
) -> Self {
Self {
manager,
thread_id,
guard_type,
handle: tokio::runtime::Handle::current(),
}
}
}
impl Drop for ThreadWatchActiveGuard {
fn drop(&mut self) {
let manager = self.manager.clone();
let thread_id = self.thread_id.clone();
let guard_type = self.guard_type;
self.handle.spawn(async move {
manager
.note_active_guard_released(thread_id, guard_type)
.await;
});
}
}
#[derive(Clone, Copy)]
enum ThreadWatchActiveGuardType {
Permission,
UserInput,
}
impl Default for ThreadWatchManager {
fn default() -> Self {
Self::new()
}
}
impl ThreadWatchManager {
pub(crate) fn new() -> Self {
let (running_turn_count_tx, _running_turn_count_rx) = watch::channel(0);
Self {
state: Arc::new(Mutex::new(ThreadWatchState::default())),
outgoing: None,
running_turn_count_tx,
}
}
pub(crate) fn new_with_outgoing(outgoing: Arc<OutgoingMessageSender>) -> Self {
let (running_turn_count_tx, _running_turn_count_rx) = watch::channel(0);
Self {
state: Arc::new(Mutex::new(ThreadWatchState::default())),
outgoing: Some(outgoing),
running_turn_count_tx,
}
}
pub(crate) async fn upsert_thread(&self, thread: Thread) {
self.mutate_and_publish(move |state| {
state.upsert_thread(thread.id, /*emit_notification*/ true)
})
.await;
}
pub(crate) async fn upsert_thread_silently(&self, thread: Thread) {
self.mutate_and_publish(move |state| {
state.upsert_thread(thread.id, /*emit_notification*/ false)
})
.await;
}
pub(crate) async fn remove_thread(&self, thread_id: &str) {
let thread_id = thread_id.to_string();
self.mutate_and_publish(move |state| state.remove_thread(&thread_id))
.await;
}
pub(crate) async fn loaded_status_for_thread(&self, thread_id: &str) -> ThreadStatus {
self.state.lock().await.loaded_status_for_thread(thread_id)
}
pub(crate) async fn loaded_statuses_for_threads(
&self,
thread_ids: Vec<String>,
) -> HashMap<String, ThreadStatus> {
let state = self.state.lock().await;
thread_ids
.into_iter()
.map(|thread_id| {
let status = state.loaded_status_for_thread(&thread_id);
(thread_id, status)
})
.collect()
}
#[cfg(test)]
pub(crate) async fn running_turn_count(&self) -> usize {
self.state
.lock()
.await
.runtime_by_thread_id
.values()
.filter(|runtime| runtime.running)
.count()
}
pub(crate) fn subscribe_running_turn_count(&self) -> watch::Receiver<usize> {
self.running_turn_count_tx.subscribe()
}
pub(crate) async fn note_turn_started(&self, thread_id: &str) {
self.update_runtime_for_thread(thread_id, |runtime| {
runtime.is_loaded = true;
runtime.running = true;
runtime.has_system_error = false;
})
.await;
}
pub(crate) async fn note_turn_completed(&self, thread_id: &str, _failed: bool) {
self.clear_active_state(thread_id).await;
}
pub(crate) async fn note_turn_interrupted(&self, thread_id: &str) {
self.clear_active_state(thread_id).await;
}
pub(crate) async fn note_thread_shutdown(&self, thread_id: &str) {
self.update_runtime_for_thread(thread_id, |runtime| {
runtime.running = false;
runtime.pending_permission_requests = 0;
runtime.pending_user_input_requests = 0;
runtime.is_loaded = false;
})
.await;
}
pub(crate) async fn note_system_error(&self, thread_id: &str) {
self.update_runtime_for_thread(thread_id, |runtime| {
runtime.running = false;
runtime.pending_permission_requests = 0;
runtime.pending_user_input_requests = 0;
runtime.has_system_error = true;
})
.await;
}
async fn clear_active_state(&self, thread_id: &str) {
self.update_runtime_for_thread(thread_id, move |runtime| {
runtime.running = false;
runtime.pending_permission_requests = 0;
runtime.pending_user_input_requests = 0;
})
.await;
}
pub(crate) async fn note_permission_requested(
&self,
thread_id: &str,
) -> ThreadWatchActiveGuard {
self.note_pending_request(thread_id, ThreadWatchActiveGuardType::Permission)
.await
}
pub(crate) async fn note_user_input_requested(
&self,
thread_id: &str,
) -> ThreadWatchActiveGuard {
self.note_pending_request(thread_id, ThreadWatchActiveGuardType::UserInput)
.await
}
async fn note_pending_request(
&self,
thread_id: &str,
guard_type: ThreadWatchActiveGuardType,
) -> ThreadWatchActiveGuard {
self.update_runtime_for_thread(thread_id, move |runtime| {
runtime.is_loaded = true;
let counter = Self::pending_counter(runtime, guard_type);
*counter = counter.saturating_add(1);
})
.await;
ThreadWatchActiveGuard::new(self.clone(), thread_id.to_string(), guard_type)
}
async fn mutate_and_publish<F>(&self, mutate: F)
where
F: FnOnce(&mut ThreadWatchState) -> Option<ThreadStatusChangedNotification>,
{
let (notification, running_turn_count) = {
let mut state = self.state.lock().await;
let notification = mutate(&mut state);
let running_turn_count = state
.runtime_by_thread_id
.values()
.filter(|runtime| runtime.running)
.count();
(notification, running_turn_count)
};
let _ = self.running_turn_count_tx.send(running_turn_count);
if let Some(notification) = notification
&& let Some(outgoing) = &self.outgoing
{
outgoing
.send_server_notification(ServerNotification::ThreadStatusChanged(notification))
.await;
}
}
async fn note_active_guard_released(
&self,
thread_id: String,
guard_type: ThreadWatchActiveGuardType,
) {
self.update_runtime_for_thread(&thread_id, move |runtime| {
let counter = Self::pending_counter(runtime, guard_type);
*counter = counter.saturating_sub(1);
})
.await;
}
async fn update_runtime_for_thread<F>(&self, thread_id: &str, update: F)
where
F: FnOnce(&mut RuntimeFacts),
{
let thread_id = thread_id.to_string();
self.mutate_and_publish(move |state| state.update_runtime(&thread_id, update))
.await;
}
fn pending_counter(
runtime: &mut RuntimeFacts,
guard_type: ThreadWatchActiveGuardType,
) -> &mut u32 {
match guard_type {
ThreadWatchActiveGuardType::Permission => &mut runtime.pending_permission_requests,
ThreadWatchActiveGuardType::UserInput => &mut runtime.pending_user_input_requests,
}
}
}
pub(crate) fn resolve_thread_status(
status: ThreadStatus,
has_in_progress_turn: bool,
) -> ThreadStatus {
// Running-turn events can arrive before the watch runtime state is observed by
// the listener loop. In that window we prefer to reflect a real active turn as
// `Active` instead of `Idle`/`NotLoaded`.
if has_in_progress_turn && matches!(status, ThreadStatus::Idle | ThreadStatus::NotLoaded) {
return ThreadStatus::Active {
active_flags: Vec::new(),
};
}
status
}
#[derive(Default)]
struct ThreadWatchState {
runtime_by_thread_id: HashMap<String, RuntimeFacts>,
}
impl ThreadWatchState {
fn upsert_thread(
&mut self,
thread_id: String,
emit_notification: bool,
) -> Option<ThreadStatusChangedNotification> {
let previous_status = self.status_for(&thread_id);
let runtime = self
.runtime_by_thread_id
.entry(thread_id.clone())
.or_default();
runtime.is_loaded = true;
if emit_notification {
self.status_changed_notification(thread_id, previous_status)
} else {
None
}
}
fn remove_thread(&mut self, thread_id: &str) -> Option<ThreadStatusChangedNotification> {
let previous_status = self.status_for(thread_id);
self.runtime_by_thread_id.remove(thread_id);
if previous_status.is_some() && previous_status != Some(ThreadStatus::NotLoaded) {
Some(ThreadStatusChangedNotification {
thread_id: thread_id.to_string(),
status: ThreadStatus::NotLoaded,
})
} else {
None
}
}
fn update_runtime<F>(
&mut self,
thread_id: &str,
mutate: F,
) -> Option<ThreadStatusChangedNotification>
where
F: FnOnce(&mut RuntimeFacts),
{
let previous_status = self.status_for(thread_id);
let runtime = self
.runtime_by_thread_id
.entry(thread_id.to_string())
.or_default();
runtime.is_loaded = true;
mutate(runtime);
self.status_changed_notification(thread_id.to_string(), previous_status)
}
fn status_for(&self, thread_id: &str) -> Option<ThreadStatus> {
self.runtime_by_thread_id
.get(thread_id)
.map(loaded_thread_status)
}
fn loaded_status_for_thread(&self, thread_id: &str) -> ThreadStatus {
self.status_for(thread_id)
.unwrap_or(ThreadStatus::NotLoaded)
}
fn status_changed_notification(
&self,
thread_id: String,
previous_status: Option<ThreadStatus>,
) -> Option<ThreadStatusChangedNotification> {
let status = self.status_for(&thread_id)?;
if previous_status.as_ref() == Some(&status) {
return None;
}
Some(ThreadStatusChangedNotification { thread_id, status })
}
}
#[derive(Clone, Default)]
struct RuntimeFacts {
is_loaded: bool,
running: bool,
pending_permission_requests: u32,
pending_user_input_requests: u32,
has_system_error: bool,
}
fn loaded_thread_status(runtime: &RuntimeFacts) -> ThreadStatus {
if !runtime.is_loaded {
return ThreadStatus::NotLoaded;
}
let mut active_flags = Vec::new();
if runtime.pending_permission_requests > 0 {
active_flags.push(ThreadActiveFlag::WaitingOnApproval);
}
if runtime.pending_user_input_requests > 0 {
active_flags.push(ThreadActiveFlag::WaitingOnUserInput);
}
if runtime.running || !active_flags.is_empty() {
return ThreadStatus::Active { active_flags };
}
if runtime.has_system_error {
return ThreadStatus::SystemError;
}
ThreadStatus::Idle
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use tokio::time::Duration;
use tokio::time::timeout;
const INTERACTIVE_THREAD_ID: &str = "00000000-0000-0000-0000-000000000001";
const NON_INTERACTIVE_THREAD_ID: &str = "00000000-0000-0000-0000-000000000002";
#[tokio::test]
async fn loaded_status_defaults_to_not_loaded_for_untracked_threads() {
let manager = ThreadWatchManager::new();
assert_eq!(
manager
.loaded_status_for_thread("00000000-0000-0000-0000-000000000003")
.await,
ThreadStatus::NotLoaded,
);
}
#[tokio::test]
async fn tracks_non_interactive_thread_status() {
let manager = ThreadWatchManager::new();
manager
.upsert_thread(test_thread(
NON_INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::AppServer,
))
.await;
manager.note_turn_started(NON_INTERACTIVE_THREAD_ID).await;
assert_eq!(
manager
.loaded_status_for_thread(NON_INTERACTIVE_THREAD_ID)
.await,
ThreadStatus::Active {
active_flags: vec![],
},
);
}
#[tokio::test]
async fn status_updates_track_single_thread() {
let manager = ThreadWatchManager::new();
manager
.upsert_thread(test_thread(
INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::Cli,
))
.await;
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
assert_eq!(
manager
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
.await,
ThreadStatus::Active {
active_flags: vec![],
},
);
let permission_guard = manager
.note_permission_requested(INTERACTIVE_THREAD_ID)
.await;
assert_eq!(
manager
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
.await,
ThreadStatus::Active {
active_flags: vec![ThreadActiveFlag::WaitingOnApproval],
},
);
let user_input_guard = manager
.note_user_input_requested(INTERACTIVE_THREAD_ID)
.await;
assert_eq!(
manager
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
.await,
ThreadStatus::Active {
active_flags: vec![
ThreadActiveFlag::WaitingOnApproval,
ThreadActiveFlag::WaitingOnUserInput,
],
},
);
drop(permission_guard);
wait_for_status(
&manager,
INTERACTIVE_THREAD_ID,
ThreadStatus::Active {
active_flags: vec![ThreadActiveFlag::WaitingOnUserInput],
},
)
.await;
drop(user_input_guard);
wait_for_status(
&manager,
INTERACTIVE_THREAD_ID,
ThreadStatus::Active {
active_flags: vec![],
},
)
.await;
manager
.note_turn_completed(INTERACTIVE_THREAD_ID, false)
.await;
assert_eq!(
manager
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
.await,
ThreadStatus::Idle,
);
}
#[test]
fn resolves_in_progress_turn_to_active_status() {
let status = resolve_thread_status(ThreadStatus::Idle, /*has_in_progress_turn*/ true);
assert_eq!(
status,
ThreadStatus::Active {
active_flags: Vec::new(),
}
);
let status =
resolve_thread_status(ThreadStatus::NotLoaded, /*has_in_progress_turn*/ true);
assert_eq!(
status,
ThreadStatus::Active {
active_flags: Vec::new(),
}
);
}
#[test]
fn keeps_status_when_no_in_progress_turn() {
assert_eq!(
resolve_thread_status(ThreadStatus::Idle, /*has_in_progress_turn*/ false),
ThreadStatus::Idle
);
assert_eq!(
resolve_thread_status(
ThreadStatus::SystemError,
/*has_in_progress_turn*/ false
),
ThreadStatus::SystemError
);
}
#[tokio::test]
async fn system_error_sets_idle_flag_until_next_turn() {
let manager = ThreadWatchManager::new();
manager
.upsert_thread(test_thread(
INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::Cli,
))
.await;
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
manager.note_system_error(INTERACTIVE_THREAD_ID).await;
assert_eq!(
manager
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
.await,
ThreadStatus::SystemError,
);
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
assert_eq!(
manager
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
.await,
ThreadStatus::Active {
active_flags: vec![],
},
);
}
#[tokio::test]
async fn shutdown_marks_thread_not_loaded() {
let manager = ThreadWatchManager::new();
manager
.upsert_thread(test_thread(
INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::Cli,
))
.await;
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
manager.note_thread_shutdown(INTERACTIVE_THREAD_ID).await;
assert_eq!(
manager
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
.await,
ThreadStatus::NotLoaded,
);
}
#[tokio::test]
async fn loaded_statuses_default_to_not_loaded_for_untracked_threads() {
let manager = ThreadWatchManager::new();
manager
.upsert_thread(test_thread(
INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::Cli,
))
.await;
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
let statuses = manager
.loaded_statuses_for_threads(vec![
INTERACTIVE_THREAD_ID.to_string(),
NON_INTERACTIVE_THREAD_ID.to_string(),
])
.await;
assert_eq!(
statuses.get(INTERACTIVE_THREAD_ID),
Some(&ThreadStatus::Active {
active_flags: vec![],
}),
);
assert_eq!(
statuses.get(NON_INTERACTIVE_THREAD_ID),
Some(&ThreadStatus::NotLoaded),
);
}
#[tokio::test]
async fn has_running_turns_tracks_runtime_running_flag_only() {
let manager = ThreadWatchManager::new();
manager
.upsert_thread(test_thread(
INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::Cli,
))
.await;
assert_eq!(manager.running_turn_count().await, 0);
let _permission_guard = manager
.note_permission_requested(INTERACTIVE_THREAD_ID)
.await;
assert_eq!(manager.running_turn_count().await, 0);
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
assert_eq!(manager.running_turn_count().await, 1);
manager
.note_turn_completed(INTERACTIVE_THREAD_ID, false)
.await;
assert_eq!(manager.running_turn_count().await, 0);
}
#[tokio::test]
async fn status_change_emits_notification() {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
outgoing_tx,
)));
manager
.upsert_thread(test_thread(
INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::Cli,
))
.await;
assert_eq!(
recv_status_changed_notification(&mut outgoing_rx).await,
ThreadStatusChangedNotification {
thread_id: INTERACTIVE_THREAD_ID.to_string(),
status: ThreadStatus::Idle,
},
);
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
assert_eq!(
recv_status_changed_notification(&mut outgoing_rx).await,
ThreadStatusChangedNotification {
thread_id: INTERACTIVE_THREAD_ID.to_string(),
status: ThreadStatus::Active {
active_flags: vec![],
},
},
);
manager.remove_thread(INTERACTIVE_THREAD_ID).await;
assert_eq!(
recv_status_changed_notification(&mut outgoing_rx).await,
ThreadStatusChangedNotification {
thread_id: INTERACTIVE_THREAD_ID.to_string(),
status: ThreadStatus::NotLoaded,
},
);
}
#[tokio::test]
async fn silent_upsert_skips_initial_notification() {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8);
let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new(
outgoing_tx,
)));
manager
.upsert_thread_silently(test_thread(
INTERACTIVE_THREAD_ID,
codex_app_server_protocol::SessionSource::Cli,
))
.await;
assert_eq!(
manager
.loaded_status_for_thread(INTERACTIVE_THREAD_ID)
.await,
ThreadStatus::Idle,
);
assert!(
timeout(Duration::from_millis(100), outgoing_rx.recv())
.await
.is_err(),
"silent upsert should not emit thread/status/changed"
);
manager.note_turn_started(INTERACTIVE_THREAD_ID).await;
assert_eq!(
recv_status_changed_notification(&mut outgoing_rx).await,
ThreadStatusChangedNotification {
thread_id: INTERACTIVE_THREAD_ID.to_string(),
status: ThreadStatus::Active {
active_flags: vec![],
},
},
);
}
async fn wait_for_status(
manager: &ThreadWatchManager,
thread_id: &str,
expected_status: ThreadStatus,
) {
timeout(Duration::from_secs(1), async {
loop {
let status = manager.loaded_status_for_thread(thread_id).await;
if status == expected_status {
break;
}
tokio::task::yield_now().await;
}
})
.await
.expect("timed out waiting for status");
}
async fn recv_status_changed_notification(
outgoing_rx: &mut mpsc::Receiver<OutgoingEnvelope>,
) -> ThreadStatusChangedNotification {
let envelope = timeout(Duration::from_secs(1), outgoing_rx.recv())
.await
.expect("timed out waiting for outgoing notification")
.expect("outgoing channel closed unexpectedly");
let OutgoingEnvelope::Broadcast { message } = envelope else {
panic!("expected broadcast notification");
};
let OutgoingMessage::AppServerNotification(ServerNotification::ThreadStatusChanged(
notification,
)) = message
else {
panic!("expected thread/status/changed notification");
};
notification
}
fn test_thread(thread_id: &str, source: codex_app_server_protocol::SessionSource) -> Thread {
Thread {
id: thread_id.to_string(),
preview: String::new(),
ephemeral: false,
model_provider: "mock-provider".to_string(),
created_at: 0,
updated_at: 0,
status: ThreadStatus::NotLoaded,
path: None,
cwd: PathBuf::from("/tmp"),
cli_version: "test".to_string(),
agent_nickname: None,
agent_role: None,
source,
git_info: None,
name: None,
turns: Vec::new(),
}
}
}