mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Mark incomplete resumed turns interrupted when idle (#14125)
Fixes a Codex app bug where quitting the app mid-run could leave the reopened thread stuck in progress and non-interactable. On cold thread resume, app-server could return an idle thread with a replayed turn still marked in progress. This marks incomplete replayed turns as interrupted unless the thread is actually active.
This commit is contained in:
@@ -3071,7 +3071,7 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let Some(thread) = loaded_thread else {
|
||||
let Some(thread) = loaded_thread.as_ref() else {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("thread not loaded: {thread_uuid}"),
|
||||
@@ -3125,11 +3125,21 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
thread.status = resolve_thread_status(
|
||||
self.thread_watch_manager
|
||||
.loaded_status_for_thread(&thread.id)
|
||||
.await,
|
||||
false,
|
||||
let has_live_in_progress_turn = if let Some(loaded_thread) = loaded_thread.as_ref() {
|
||||
matches!(loaded_thread.agent_status().await, AgentStatus::Running)
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
let thread_status = self
|
||||
.thread_watch_manager
|
||||
.loaded_status_for_thread(&thread.id)
|
||||
.await;
|
||||
|
||||
set_thread_status_and_interrupt_stale_turns(
|
||||
&mut thread,
|
||||
thread_status,
|
||||
has_live_in_progress_turn,
|
||||
);
|
||||
let response = ThreadReadResponse { thread };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
@@ -3337,12 +3347,12 @@ impl CodexMessageProcessor {
|
||||
.upsert_thread(thread.clone())
|
||||
.await;
|
||||
|
||||
thread.status = resolve_thread_status(
|
||||
self.thread_watch_manager
|
||||
.loaded_status_for_thread(&thread.id)
|
||||
.await,
|
||||
false,
|
||||
);
|
||||
let thread_status = self
|
||||
.thread_watch_manager
|
||||
.loaded_status_for_thread(&thread.id)
|
||||
.await;
|
||||
|
||||
set_thread_status_and_interrupt_stale_turns(&mut thread, thread_status, false);
|
||||
|
||||
let response = ThreadResumeResponse {
|
||||
thread,
|
||||
@@ -6493,6 +6503,7 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
handle_thread_listener_command(
|
||||
conversation_id,
|
||||
&conversation,
|
||||
codex_home.as_path(),
|
||||
&thread_state_manager,
|
||||
&thread_state,
|
||||
@@ -6862,8 +6873,10 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn handle_thread_listener_command(
|
||||
conversation_id: ThreadId,
|
||||
conversation: &Arc<CodexThread>,
|
||||
codex_home: &Path,
|
||||
thread_state_manager: &ThreadStateManager,
|
||||
thread_state: &Arc<Mutex<ThreadState>>,
|
||||
@@ -6875,6 +6888,7 @@ async fn handle_thread_listener_command(
|
||||
ThreadListenerCommand::SendThreadResumeResponse(resume_request) => {
|
||||
handle_pending_thread_resume_request(
|
||||
conversation_id,
|
||||
conversation,
|
||||
codex_home,
|
||||
thread_state_manager,
|
||||
thread_state,
|
||||
@@ -6900,8 +6914,10 @@ async fn handle_thread_listener_command(
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn handle_pending_thread_resume_request(
|
||||
conversation_id: ThreadId,
|
||||
conversation: &Arc<CodexThread>,
|
||||
codex_home: &Path,
|
||||
thread_state_manager: &ThreadStateManager,
|
||||
thread_state: &Arc<Mutex<ThreadState>>,
|
||||
@@ -6921,9 +6937,11 @@ async fn handle_pending_thread_resume_request(
|
||||
active_turn_status = ?active_turn.as_ref().map(|turn| &turn.status),
|
||||
"composing running thread resume response"
|
||||
);
|
||||
let mut has_in_progress_turn = active_turn
|
||||
.as_ref()
|
||||
.is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress));
|
||||
let has_live_in_progress_turn =
|
||||
matches!(conversation.agent_status().await, AgentStatus::Running)
|
||||
|| active_turn
|
||||
.as_ref()
|
||||
.is_some_and(|turn| matches!(turn.status, TurnStatus::InProgress));
|
||||
|
||||
let request_id = pending.request_id;
|
||||
let connection_id = request_id.connection_id;
|
||||
@@ -6948,19 +6966,15 @@ async fn handle_pending_thread_resume_request(
|
||||
return;
|
||||
}
|
||||
|
||||
has_in_progress_turn = has_in_progress_turn
|
||||
|| thread
|
||||
.turns
|
||||
.iter()
|
||||
.any(|turn| matches!(turn.status, TurnStatus::InProgress));
|
||||
let thread_status = thread_watch_manager
|
||||
.loaded_status_for_thread(&thread.id)
|
||||
.await;
|
||||
|
||||
let status = resolve_thread_status(
|
||||
thread_watch_manager
|
||||
.loaded_status_for_thread(&thread.id)
|
||||
.await,
|
||||
has_in_progress_turn,
|
||||
set_thread_status_and_interrupt_stale_turns(
|
||||
&mut thread,
|
||||
thread_status,
|
||||
has_live_in_progress_turn,
|
||||
);
|
||||
thread.status = status;
|
||||
|
||||
match find_thread_name_by_id(codex_home, &conversation_id).await {
|
||||
Ok(thread_name) => thread.name = thread_name,
|
||||
@@ -7058,6 +7072,22 @@ fn merge_turn_history_with_active_turn(turns: &mut Vec<Turn>, active_turn: Turn)
|
||||
turns.push(active_turn);
|
||||
}
|
||||
|
||||
fn set_thread_status_and_interrupt_stale_turns(
|
||||
thread: &mut Thread,
|
||||
loaded_status: ThreadStatus,
|
||||
has_live_in_progress_turn: bool,
|
||||
) {
|
||||
let status = resolve_thread_status(loaded_status, has_live_in_progress_turn);
|
||||
if !matches!(status, ThreadStatus::Active { .. }) {
|
||||
for turn in &mut thread.turns {
|
||||
if matches!(turn.status, TurnStatus::InProgress) {
|
||||
turn.status = TurnStatus::Interrupted;
|
||||
}
|
||||
}
|
||||
}
|
||||
thread.status = status;
|
||||
}
|
||||
|
||||
fn collect_resume_override_mismatches(
|
||||
request: &ThreadResumeParams,
|
||||
config_snapshot: &ThreadConfigSnapshot,
|
||||
|
||||
@@ -25,6 +25,8 @@ use codex_app_server_protocol::SessionSource;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams;
|
||||
use codex_app_server_protocol::ThreadMetadataUpdateParams;
|
||||
use codex_app_server_protocol::ThreadReadParams;
|
||||
use codex_app_server_protocol::ThreadReadResponse;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
@@ -38,9 +40,12 @@ use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AgentMessageEvent;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource as RolloutSessionSource;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
use codex_protocol::user_input::ByteRange;
|
||||
use codex_protocol::user_input::TextElement;
|
||||
use codex_state::StateRuntime;
|
||||
@@ -398,6 +403,120 @@ stream_max_retries = 0
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_and_read_interrupt_incomplete_rollout_turn_when_thread_is_idle() -> Result<()>
|
||||
{
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let filename_ts = "2025-01-05T12-00-00";
|
||||
let meta_rfc3339 = "2025-01-05T12:00:00Z";
|
||||
let conversation_id = create_fake_rollout_with_text_elements(
|
||||
codex_home.path(),
|
||||
filename_ts,
|
||||
meta_rfc3339,
|
||||
"Saved user message",
|
||||
Vec::new(),
|
||||
Some("mock_provider"),
|
||||
None,
|
||||
)?;
|
||||
let rollout_file_path = rollout_path(codex_home.path(), filename_ts, &conversation_id);
|
||||
let persisted_rollout = std::fs::read_to_string(&rollout_file_path)?;
|
||||
let turn_id = "incomplete-turn";
|
||||
let appended_rollout = [
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "event_msg",
|
||||
"payload": serde_json::to_value(EventMsg::TurnStarted(TurnStartedEvent {
|
||||
turn_id: turn_id.to_string(),
|
||||
model_context_window: None,
|
||||
collaboration_mode_kind: Default::default(),
|
||||
}))?,
|
||||
})
|
||||
.to_string(),
|
||||
json!({
|
||||
"timestamp": meta_rfc3339,
|
||||
"type": "event_msg",
|
||||
"payload": serde_json::to_value(EventMsg::AgentMessage(AgentMessageEvent {
|
||||
message: "Still running".to_string(),
|
||||
phase: None,
|
||||
}))?,
|
||||
})
|
||||
.to_string(),
|
||||
]
|
||||
.join("\n");
|
||||
std::fs::write(
|
||||
&rollout_file_path,
|
||||
format!("{persisted_rollout}{appended_rollout}\n"),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: conversation_id,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
|
||||
assert_eq!(thread.status, ThreadStatus::Idle);
|
||||
assert_eq!(thread.turns.len(), 2);
|
||||
assert_eq!(thread.turns[0].status, TurnStatus::Completed);
|
||||
assert_eq!(thread.turns[1].id, turn_id);
|
||||
assert_eq!(thread.turns[1].status, TurnStatus::Interrupted);
|
||||
|
||||
let second_resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: thread.id.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let second_resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(second_resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse {
|
||||
thread: resumed_again,
|
||||
..
|
||||
} = to_response::<ThreadResumeResponse>(second_resume_resp)?;
|
||||
|
||||
assert_eq!(resumed_again.status, ThreadStatus::Idle);
|
||||
assert_eq!(resumed_again.turns.len(), 2);
|
||||
assert_eq!(resumed_again.turns[1].id, turn_id);
|
||||
assert_eq!(resumed_again.turns[1].status, TurnStatus::Interrupted);
|
||||
|
||||
let read_id = mcp
|
||||
.send_thread_read_request(ThreadReadParams {
|
||||
thread_id: resumed_again.id,
|
||||
include_turns: true,
|
||||
})
|
||||
.await?;
|
||||
let read_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadReadResponse {
|
||||
thread: read_thread,
|
||||
} = to_response::<ThreadReadResponse>(read_resp)?;
|
||||
|
||||
assert_eq!(read_thread.status, ThreadStatus::Idle);
|
||||
assert_eq!(read_thread.turns.len(), 2);
|
||||
assert_eq!(read_thread.turns[1].id, turn_id);
|
||||
assert_eq!(read_thread.turns[1].status, TurnStatus::Interrupted);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_without_overrides_does_not_change_updated_at_or_mtime() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
Reference in New Issue
Block a user