Mark incomplete resumed turns interrupted when idle (#14125)

Fixes a Codex app bug where quitting the app mid-run could leave the
reopened thread stuck in progress and non-interactable. On cold thread
resume, app-server could return an idle thread with a replayed turn
still marked in progress. This marks incomplete replayed turns as
interrupted unless the thread is actually active.
This commit is contained in:
guinness-oai
2026-03-10 10:57:03 -07:00
committed by Michael Bolin
parent c4d35084f5
commit 4ac6042850
2 changed files with 175 additions and 26 deletions

View File

@@ -25,6 +25,8 @@ use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams;
use codex_app_server_protocol::ThreadMetadataUpdateParams;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
@@ -38,9 +40,12 @@ use codex_protocol::ThreadId;
use codex_protocol::config_types::Personality;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::SessionSource as RolloutSessionSource;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::user_input::ByteRange;
use codex_protocol::user_input::TextElement;
use codex_state::StateRuntime;
@@ -398,6 +403,120 @@ stream_max_retries = 0
Ok(())
}
#[tokio::test]
async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is_idle() -> Result<()>
{
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let filename_ts = "2025-01-05T12-00-00";
let meta_rfc3339 = "2025-01-05T12:00:00Z";
let conversation_id = create_fake_rollout_with_text_elements(
codex_home.path(),
filename_ts,
meta_rfc3339,
"Saved user message",
Vec::new(),
Some("mock_provider"),
None,
)?;
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
let turn_id = "incomplete-turn";
let appended_rollout = [
json!({
"timestamp": meta_rfc3339,
"type": "event_msg",
"payload": serde_json::to_value(EventMsg::TurnStarted(TurnStartedEvent {
turn_id: turn_id.to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}))?,
})
.to_string(),
json!({
"timestamp": meta_rfc3339,
"type": "event_msg",
"payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent {
message: "Still running".to_string(),
phase: None,
}))?,
})
.to_string(),
]
.join("\n");
std::fs::write(
&rollout_file_path,
format!("{persisted_rollout}{appended_rollout}\n"),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: conversation_id,
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(thread.status, ThreadStatus::Idle);
assert_eq!(thread.turns.len(), 2);
assert_eq!(thread.turns[0].status, TurnStatus::Completed);
assert_eq!(thread.turns[1].id, turn_id);
assert_eq!(thread.turns[1].status, TurnStatus::Interrupted);
let second_resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
..Default::default()
})
.await?;
let second_resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(second_resume_id)),
)
.await??;
let ThreadResumeResponse {
thread: resumed_again,
..
} = to_response::<ThreadResumeResponse>(second_resume_resp)?;
assert_eq!(resumed_again.status, ThreadStatus::Idle);
assert_eq!(resumed_again.turns.len(), 2);
assert_eq!(resumed_again.turns[1].id, turn_id);
assert_eq!(resumed_again.turns[1].status, TurnStatus::Interrupted);
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: resumed_again.id,
include_turns: true,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse {
thread: read_thread,
} = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(read_thread.status, ThreadStatus::Idle);
assert_eq!(read_thread.turns.len(), 2);
assert_eq!(read_thread.turns[1].id, turn_id);
assert_eq!(read_thread.turns[1].status, TurnStatus::Interrupted);
Ok(())
}
#[tokio::test]
async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;