app-server: expose loaded thread status via read/list and notifications (#11786)

Motivation
- Today, a newly connected client has no direct way to determine the
current runtime status of threads from read/list responses alone.
- This forces clients to infer state from transient events, which can
lead to stale or inconsistent UI when reconnecting or attaching late.

Changes
- Add `status` to `thread/read` responses.
- Add `statuses` to `thread/list` responses.
- Emit `thread/status/changed` notifications with `threadId` and the new
status.
- Track runtime status for all loaded threads and default unknown
threads to `idle`.
- Update protocol/docs/tests/schema fixtures for the revised API.

Testing
- Validated protocol API changes with automated protocol tests and
regenerated schema/type fixtures.
- Validated app-server behavior with unit and integration test suites,
including status transitions and notifications.
This commit is contained in:
Ruslan Nigmatullin
2026-02-18 15:20:03 -08:00
committed by GitHub
parent 216fe7f2ef
commit 1f54496c48
34 changed files with 2563 additions and 119 deletions

View File

@@ -25,6 +25,7 @@ mod thread_read;
mod thread_resume;
mod thread_rollback;
mod thread_start;
mod thread_status;
mod thread_unarchive;
mod turn_interrupt;
mod turn_start;

View File

@@ -14,6 +14,7 @@ use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use pretty_assertions::assert_eq;
@@ -80,6 +81,7 @@ async fn thread_fork_creates_new_thread_and_emits_started() -> Result<()> {
assert_ne!(thread.id, conversation_id);
assert_eq!(thread.preview, preview);
assert_eq!(thread.model_provider, "mock_provider");
assert_eq!(thread.status, ThreadStatus::Idle);
let thread_path = thread.path.clone().expect("thread path");
assert!(thread_path.is_absolute());
assert_ne!(thread_path, original_path);

View File

@@ -2,6 +2,8 @@ use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_fake_rollout_with_source;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::rollout_path;
use app_test_support::to_response;
use chrono::DateTime;
@@ -14,6 +16,12 @@ use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_protocol::ThreadId;
use codex_protocol::protocol::GitInfo as CoreGitInfo;
@@ -21,6 +29,7 @@ use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionSource as CoreSessionSource;
use codex_protocol::protocol::SubAgentSource;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use std::cmp::Reverse;
use std::fs;
@@ -157,7 +166,9 @@ async fn thread_list_basic_empty() -> Result<()> {
let mut mcp = init_mcp(codex_home.path()).await?;
let ThreadListResponse { data, next_cursor } = list_threads(
let ThreadListResponse {
data, next_cursor, ..
} = list_threads(
&mut mcp,
None,
Some(10),
@@ -172,6 +183,97 @@ async fn thread_list_basic_empty() -> Result<()> {
Ok(())
}
/// Verifies that `thread/list` reports `ThreadStatus::SystemError` for a
/// loaded thread whose most recent turn failed, even when an earlier turn on
/// the same thread completed successfully.
#[tokio::test]
async fn thread_list_reports_system_error_idle_flag_after_failed_turn() -> Result<()> {
    // First mock response seeds history successfully; the second simulates a
    // provider failure so the thread ends in a system-error state.
    let responses = vec![
        create_final_assistant_message_sse_response("seeded")?,
        responses::sse_failed("resp-2", "server_error", "simulated failure"),
    ];
    let server = create_mock_responses_server_sequence(responses).await;
    let codex_home = TempDir::new()?;
    create_runtime_config(codex_home.path(), &server.uri())?;
    let mut mcp = init_mcp(codex_home.path()).await?;

    // Start a thread against the mock model.
    let start_id = mcp
        .send_thread_start_request(ThreadStartParams {
            model: Some("mock-model".to_string()),
            ..Default::default()
        })
        .await?;
    let start_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
    )
    .await??;
    let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;

    // Run one successful turn so the thread has prior good history.
    let seed_turn_id = mcp
        .send_turn_start_request(TurnStartParams {
            thread_id: thread.id.clone(),
            input: vec![UserInput::Text {
                text: "seed history".to_string(),
                text_elements: Vec::new(),
            }],
            ..Default::default()
        })
        .await?;
    let seed_turn_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(seed_turn_id)),
    )
    .await??;
    let _: TurnStartResponse = to_response::<TurnStartResponse>(seed_turn_resp)?;
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("turn/completed"),
    )
    .await??;

    // Run a second turn that the mock server fails with a server error.
    let failed_turn_id = mcp
        .send_turn_start_request(TurnStartParams {
            thread_id: thread.id.clone(),
            input: vec![UserInput::Text {
                text: "fail turn".to_string(),
                text_elements: Vec::new(),
            }],
            ..Default::default()
        })
        .await?;
    let failed_turn_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(failed_turn_id)),
    )
    .await??;
    let _: TurnStartResponse = to_response::<TurnStartResponse>(failed_turn_resp)?;
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("error"),
    )
    .await??;

    // List threads and confirm the failed thread surfaces as SystemError.
    let ThreadListResponse { data, .. } = list_threads(
        &mut mcp,
        None,
        Some(10),
        Some(vec!["mock_provider".to_string()]),
        Some(vec![
            ThreadSourceKind::AppServer,
            ThreadSourceKind::Cli,
            ThreadSourceKind::VsCode,
        ]),
        None,
    )
    .await?;
    let listed = data
        .iter()
        .find(|candidate| candidate.id == thread.id)
        .expect("expected started thread to be listed");
    // Dropped the stray trailing comma that left an empty format-message slot
    // in the original `assert_eq!`.
    assert_eq!(listed.status, ThreadStatus::SystemError);
    Ok(())
}
// Minimal config.toml for listing.
fn create_minimal_config(codex_home: &std::path::Path) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
@@ -184,6 +286,29 @@ approval_policy = "never"
)
}
/// Writes a `config.toml` into `codex_home` that routes all model traffic to
/// the mock responses server at `server_uri`, with retries disabled so failed
/// turns surface immediately in tests.
fn create_runtime_config(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
    let contents = format!(
        r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
    );
    std::fs::write(codex_home.join("config.toml"), contents)
}
#[tokio::test]
async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -240,6 +365,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
assert_eq!(thread.cli_version, "0.0.0");
assert_eq!(thread.source, SessionSource::Cli);
assert_eq!(thread.git_info, None);
assert_eq!(thread.status, ThreadStatus::NotLoaded);
}
let cursor1 = cursor1.expect("expected nextCursor on first page");
@@ -266,6 +392,7 @@ async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
assert_eq!(thread.cli_version, "0.0.0");
assert_eq!(thread.source, SessionSource::Cli);
assert_eq!(thread.git_info, None);
assert_eq!(thread.status, ThreadStatus::NotLoaded);
}
assert_eq!(cursor2, None, "expected nextCursor to be null on last page");
@@ -298,7 +425,9 @@ async fn thread_list_respects_provider_filter() -> Result<()> {
let mut mcp = init_mcp(codex_home.path()).await?;
// Filter to only other_provider; expect 1 item, nextCursor None.
let ThreadListResponse { data, next_cursor } = list_threads(
let ThreadListResponse {
data, next_cursor, ..
} = list_threads(
&mut mcp,
None,
Some(10),
@@ -369,7 +498,9 @@ async fn thread_list_respects_cwd_filter() -> Result<()> {
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let ThreadListResponse { data, next_cursor } = to_response::<ThreadListResponse>(resp)?;
let ThreadListResponse {
data, next_cursor, ..
} = to_response::<ThreadListResponse>(resp)?;
assert_eq!(next_cursor, None);
assert_eq!(data.len(), 1);
@@ -405,7 +536,9 @@ async fn thread_list_empty_source_kinds_defaults_to_interactive_only() -> Result
let mut mcp = init_mcp(codex_home.path()).await?;
let ThreadListResponse { data, next_cursor } = list_threads(
let ThreadListResponse {
data, next_cursor, ..
} = list_threads(
&mut mcp,
None,
Some(10),
@@ -454,7 +587,9 @@ async fn thread_list_filters_by_source_kind_subagent_thread_spawn() -> Result<()
let mut mcp = init_mcp(codex_home.path()).await?;
let ThreadListResponse { data, next_cursor } = list_threads(
let ThreadListResponse {
data, next_cursor, ..
} = list_threads(
&mut mcp,
None,
Some(10),
@@ -607,7 +742,9 @@ async fn thread_list_fetches_until_limit_or_exhausted() -> Result<()> {
// Request 8 threads for the target provider; the matches only start on the
// third page so we rely on pagination to reach the limit.
let ThreadListResponse { data, next_cursor } = list_threads(
let ThreadListResponse {
data, next_cursor, ..
} = list_threads(
&mut mcp,
None,
Some(8),
@@ -653,7 +790,9 @@ async fn thread_list_enforces_max_limit() -> Result<()> {
let mut mcp = init_mcp(codex_home.path()).await?;
let ThreadListResponse { data, next_cursor } = list_threads(
let ThreadListResponse {
data, next_cursor, ..
} = list_threads(
&mut mcp,
None,
Some(200),
@@ -700,7 +839,9 @@ async fn thread_list_stops_when_not_enough_filtered_results_exist() -> Result<()
// Request more threads than exist after filtering; expect all matches to be
// returned with nextCursor None.
let ThreadListResponse { data, next_cursor } = list_threads(
let ThreadListResponse {
data, next_cursor, ..
} = list_threads(
&mut mcp,
None,
Some(10),
@@ -934,6 +1075,7 @@ async fn thread_list_updated_at_paginates_with_cursor() -> Result<()> {
let ThreadListResponse {
data: page1,
next_cursor: cursor1,
..
} = list_threads_with_sort(
&mut mcp,
None,
@@ -951,6 +1093,7 @@ async fn thread_list_updated_at_paginates_with_cursor() -> Result<()> {
let ThreadListResponse {
data: page2,
next_cursor: cursor2,
..
} = list_threads_with_sort(
&mut mcp,
Some(cursor1),

View File

@@ -12,10 +12,14 @@ use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_protocol::user_input::ByteRange;
use codex_protocol::user_input::TextElement;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::path::PathBuf;
@@ -73,6 +77,7 @@ async fn thread_read_returns_summary_without_turns() -> Result<()> {
assert_eq!(thread.source, SessionSource::Cli);
assert_eq!(thread.git_info, None);
assert_eq!(thread.turns.len(), 0);
assert_eq!(thread.status, ThreadStatus::NotLoaded);
Ok(())
}
@@ -133,6 +138,7 @@ async fn thread_read_can_include_turns() -> Result<()> {
}
other => panic!("expected user message item, got {other:?}"),
}
assert_eq!(thread.status, ThreadStatus::NotLoaded);
Ok(())
}
@@ -181,6 +187,7 @@ async fn thread_read_loaded_thread_returns_precomputed_path_before_materializati
assert_eq!(read.path, Some(thread_path));
assert!(read.preview.is_empty());
assert_eq!(read.turns.len(), 0);
assert_eq!(read.status, ThreadStatus::Idle);
Ok(())
}
@@ -236,6 +243,73 @@ async fn thread_read_include_turns_rejects_unmaterialized_loaded_thread() -> Res
Ok(())
}
/// Verifies that `thread/read` reports `ThreadStatus::SystemError` for a
/// loaded thread whose turn failed with a provider error.
#[tokio::test]
async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Result<()> {
    // Mock server fails the only turn with a server error.
    let server = responses::start_mock_server().await;
    let _response_mock = responses::mount_sse_once(
        &server,
        responses::sse_failed("resp-1", "server_error", "simulated failure"),
    )
    .await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

    // Start a thread, then run a turn that is expected to fail.
    let start_id = mcp
        .send_thread_start_request(ThreadStartParams {
            model: Some("mock-model".to_string()),
            ..Default::default()
        })
        .await?;
    let start_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
    )
    .await??;
    let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
    let turn_start_id = mcp
        .send_turn_start_request(TurnStartParams {
            thread_id: thread.id.clone(),
            input: vec![UserInput::Text {
                text: "fail this turn".to_string(),
                text_elements: Vec::new(),
            }],
            ..Default::default()
        })
        .await?;
    let turn_start_response: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
    )
    .await??;
    let _: TurnStartResponse = to_response::<TurnStartResponse>(turn_start_response)?;
    // Wait for the server-side error notification before reading the thread.
    timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("error"),
    )
    .await??;

    // Read the thread summary and confirm it surfaces the error status.
    let read_id = mcp
        .send_thread_read_request(ThreadReadParams {
            thread_id: thread.id,
            include_turns: false,
        })
        .await?;
    let read_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
    )
    .await??;
    let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
    // Dropped the stray trailing comma that left an empty format-message slot
    // in the original `assert_eq!`.
    assert_eq!(thread.status, ThreadStatus::SystemError);
    Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");

View File

@@ -14,6 +14,7 @@ use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
@@ -129,6 +130,7 @@ async fn thread_resume_returns_rollout_history() -> Result<()> {
assert_eq!(thread.cli_version, "0.0.0");
assert_eq!(thread.source, SessionSource::Cli);
assert_eq!(thread.git_info, None);
assert_eq!(thread.status, ThreadStatus::Idle);
assert_eq!(
thread.turns.len(),
@@ -178,6 +180,7 @@ async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(thread.updated_at, rollout.expected_updated_at);
assert_eq!(thread.status, ThreadStatus::Idle);
let after_modified = std::fs::metadata(&rollout.rollout_file_path)?.modified()?;
assert_eq!(after_modified, rollout.before_modified);
@@ -283,11 +286,16 @@ async fn thread_resume_keeps_in_flight_turn_streaming() -> Result<()> {
..Default::default()
})
.await?;
timeout(
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
secondary.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed_thread,
..
} = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_ne!(resumed_thread.status, ThreadStatus::NotLoaded);
timeout(
DEFAULT_READ_TIMEOUT,
@@ -582,8 +590,15 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R
primary.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { model, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
let ThreadResumeResponse { thread, model, .. } =
to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(model, "gpt-5.1-codex-max");
assert_eq!(
thread.status,
ThreadStatus::Active {
active_flags: vec![],
}
);
timeout(
DEFAULT_READ_TIMEOUT,
@@ -630,6 +645,7 @@ async fn thread_resume_with_overrides_defers_updated_at_until_turn_start() -> Re
} = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resumed_thread.updated_at, expected_updated_at);
assert_eq!(resumed_thread.status, ThreadStatus::Idle);
let after_resume_modified = std::fs::metadata(&rollout_file_path)?.modified()?;
assert_eq!(after_resume_modified, before_modified);
@@ -761,6 +777,7 @@ async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
} = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resumed.id, thread.id);
assert_eq!(resumed.path, thread.path);
assert_eq!(resumed.status, ThreadStatus::Idle);
Ok(())
}
@@ -809,6 +826,7 @@ async fn thread_resume_supports_history_and_overrides() -> Result<()> {
assert!(!resumed.id.is_empty());
assert_eq!(model_provider, "mock_provider");
assert_eq!(resumed.preview, history_text);
assert_eq!(resumed.status, ThreadStatus::Idle);
Ok(())
}
@@ -951,6 +969,7 @@ async fn thread_resume_accepts_personality_override() -> Result<()> {
)
.await??;
let resume: ThreadResumeResponse = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resume.thread.status, ThreadStatus::Idle);
let turn_id = secondary
.send_turn_start_request(TurnStartParams {

View File

@@ -12,6 +12,7 @@ use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadRollbackResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput as V2UserInput;
use pretty_assertions::assert_eq;
@@ -111,6 +112,7 @@ async fn thread_rollback_drops_last_turns_and_persists_to_rollout() -> Result<()
} = to_response::<ThreadRollbackResponse>(rollback_resp)?;
assert_eq!(rolled_back_thread.turns.len(), 1);
assert_eq!(rolled_back_thread.status, ThreadStatus::Idle);
assert_eq!(rolled_back_thread.turns[0].items.len(), 2);
match &rolled_back_thread.turns[0].items[0] {
ThreadItem::UserMessage { content, .. } => {
@@ -140,6 +142,7 @@ async fn thread_rollback_drops_last_turns_and_persists_to_rollout() -> Result<()
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(thread.turns.len(), 1);
assert_eq!(thread.status, ThreadStatus::Idle);
assert_eq!(thread.turns[0].items.len(), 2);
match &thread.turns[0].items[0] {
ThreadItem::UserMessage { content, .. } => {

View File

@@ -9,6 +9,7 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_core::config::set_project_trust_level;
use codex_protocol::config_types::TrustLevel;
use codex_protocol::openai_models::ReasoningEffort;
@@ -59,6 +60,7 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
thread.created_at > 0,
"created_at should be a positive UNIX timestamp"
);
assert_eq!(thread.status, ThreadStatus::Idle);
let thread_path = thread.path.clone().expect("thread path should be present");
assert!(thread_path.is_absolute(), "thread path should be absolute");
assert!(

View File

@@ -0,0 +1,240 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence;
use app_test_support::to_response;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadStatusChangedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn thread_status_changed_emits_runtime_updates() -> Result<()> {
let codex_home = TempDir::new()?;
let responses = vec![create_final_assistant_message_sse_response("done")?];
let server = create_mock_responses_server_sequence(responses).await;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp =
McpProcess::new_with_env(codex_home.path(), &[("RUST_LOG", Some("info"))]).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response(thread_start_resp)?;
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "collect status updates".to_string(),
text_elements: Vec::new(),
}],
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let turn_start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
)
.await??;
let _: TurnStartResponse = to_response(turn_start_resp)?;
let mut saw_active_running = false;
let mut saw_idle_after_turn = false;
let deadline = tokio::time::Instant::now() + DEFAULT_READ_TIMEOUT;
while tokio::time::Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let message = match timeout(remaining, mcp.read_next_message()).await {
Ok(Ok(message)) => message,
_ => break,
};
match message {
JSONRPCMessage::Notification(JSONRPCNotification {
method,
params: Some(params),
}) if method == "thread/status/changed" => {
let notification: ThreadStatusChangedNotification = serde_json::from_value(params)?;
if notification.thread_id != thread.id {
continue;
}
match notification.status {
ThreadStatus::Active { .. } => {
saw_active_running = true;
}
ThreadStatus::Idle => {
if saw_active_running {
saw_idle_after_turn = true;
}
}
ThreadStatus::SystemError => {
if saw_active_running {
saw_idle_after_turn = true;
}
}
ThreadStatus::NotLoaded => {
if saw_active_running {
saw_idle_after_turn = true;
}
}
}
}
_ => {}
}
if saw_active_running && saw_idle_after_turn {
break;
}
}
assert!(
saw_active_running,
"expected running active flag in thread/status/changed notifications"
);
assert!(
saw_idle_after_turn,
"expected idle status after turn completion in thread/status/changed notifications"
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
#[tokio::test]
async fn thread_status_changed_can_be_opted_out() -> Result<()> {
let codex_home = TempDir::new()?;
let responses = vec![create_final_assistant_message_sse_response("done")?];
let server = create_mock_responses_server_sequence(responses).await;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
let message = timeout(
DEFAULT_READ_TIMEOUT,
mcp.initialize_with_capabilities(
ClientInfo {
name: "codex_vscode".to_string(),
title: Some("Codex VS Code Extension".to_string()),
version: "0.1.0".to_string(),
},
Some(InitializeCapabilities {
experimental_api: true,
opt_out_notification_methods: Some(vec!["thread/status/changed".to_string()]),
}),
),
)
.await??;
let JSONRPCMessage::Response(_) = message else {
anyhow::bail!("expected initialize response, got {message:?}");
};
let thread_start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response(thread_start_resp)?;
let turn_start_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "run once".to_string(),
text_elements: Vec::new(),
}],
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let turn_start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_start_id)),
)
.await??;
let _: TurnStartResponse = to_response(turn_start_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let status_update = timeout(
std::time::Duration::from_millis(500),
mcp.read_stream_until_notification_message("thread/status/changed"),
)
.await;
match status_update {
Err(_) => {}
Ok(Ok(notification)) => {
anyhow::bail!(
"thread/status/changed should be filtered by optOutNotificationMethods; got: {notification:?}"
);
}
Ok(Err(err)) => {
anyhow::bail!(
"expected timeout waiting for filtered thread/status/changed, got: {err}"
);
}
}
Ok(())
}
/// Writes a minimal `config.toml` into `codex_home` pointing model traffic at
/// the mock responses server at `server_uri`, with retries disabled and the
/// collaboration-modes feature enabled.
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
    let contents = format!(
        r#"
model = "mock-model"
approval_policy = "untrusted"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[features]
collaboration_modes = true
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
    );
    std::fs::write(codex_home.join("config.toml"), contents)
}

View File

@@ -8,6 +8,7 @@ use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::ThreadUnarchivedNotification;
@@ -137,6 +138,7 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
unarchived_thread.updated_at > old_timestamp,
"expected updated_at to be bumped on unarchive"
);
assert_eq!(unarchived_thread.status, ThreadStatus::NotLoaded);
let rollout_path_display = rollout_path.display();
assert!(