diff --git a/codex-rs/exec/src/lib.rs b/codex-rs/exec/src/lib.rs index 54844d52cb..0dcf7bf1ad 100644 --- a/codex-rs/exec/src/lib.rs +++ b/codex-rs/exec/src/lib.rs @@ -31,8 +31,6 @@ use codex_app_server_protocol::ReviewTarget as ApiReviewTarget; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::Thread as AppServerThread; -use codex_app_server_protocol::ThreadGoalGetParams; -use codex_app_server_protocol::ThreadGoalGetResponse; use codex_app_server_protocol::ThreadGoalStatus; use codex_app_server_protocol::ThreadItem as AppServerThreadItem; use codex_app_server_protocol::ThreadListParams; @@ -53,6 +51,7 @@ use codex_app_server_protocol::TurnInterruptResponse; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::TurnStartedNotification; +use codex_app_server_protocol::TurnStatus; use codex_arg0::Arg0DispatchPaths; use codex_cloud_requirements::cloud_requirements_loader_for_storage; use codex_config::ConfigLoadError; @@ -653,6 +652,16 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { prompt_text, ) } else { + if !imgs.is_empty() || !args.images.is_empty() { + anyhow::bail!( + "No prompt provided; images require a prompt for the resumed turn." + ); + } + if output_schema_path.is_some() { + anyhow::bail!( + "No prompt provided; --output-schema requires a prompt for the resumed turn." + ); + } (InitialOperation::FollowActiveGoal, String::new()) } } @@ -697,6 +706,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { // Handle resume subcommand through existing `thread/list` + `thread/resume` // APIs so exec no longer reaches into rollout storage directly. + let mut active_turn_id_from_resume: Option = None; let (primary_thread_id, fallback_session_configured) = if let Some(ExecCommand::Resume(args)) = command.as_ref() { @@ -713,6 +723,15 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { ) .await .map_err(anyhow::Error::msg)?; + if matches!(initial_operation, InitialOperation::FollowActiveGoal) { + active_turn_id_from_resume = response + .thread + .turns + .iter() + .rev() + .find(|turn| turn.status == TurnStatus::InProgress) + .map(|turn| turn.id.clone()); + } let session_configured = session_configured_from_thread_resume_response(&response, &config) .map_err(anyhow::Error::msg)?; @@ -756,31 +775,6 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { let primary_thread_id_for_span = primary_thread_id.to_string(); - if matches!(initial_operation, InitialOperation::FollowActiveGoal) { - let response: ThreadGoalGetResponse = send_request_with_response( - &client, - ClientRequest::ThreadGoalGet { - request_id: request_ids.next(), - params: ThreadGoalGetParams { - thread_id: primary_thread_id_for_span.clone(), - }, - }, - "thread/goal/get", - ) - .await - .map_err(|err| { - anyhow::anyhow!( - "No prompt provided and could not verify an active goal for the resumed thread: {err}" - ) - })?; - anyhow::ensure!( - response - .goal - .is_some_and(|goal| goal.status == ThreadGoalStatus::Active), - "No prompt provided and resumed thread has no active goal. Specify a prompt or pipe one into stdin." - ); - } - // Use the start/resume response as the authoritative bootstrap payload. // Waiting for a later streamed `SessionConfigured` event adds up to 10s of // avoidable startup latency on the in-process path. @@ -809,6 +803,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { }); let mut error_seen = false; + let mut interrupt_channel_open = true; let task_id = match initial_operation { InitialOperation::UserTurn { items, @@ -871,37 +866,87 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> { info!("Sent review request with event ID: {task_id}"); task_id } - InitialOperation::FollowActiveGoal => loop { - let Some(server_event) = client.next_event().await else { - anyhow::bail!("resumed active goal did not start a turn"); - }; - match server_event { - InProcessServerEvent::ServerRequest(request) => { - handle_server_request(&client, request, &mut error_seen).await; + InitialOperation::FollowActiveGoal => { + let mut active_goal_seen = false; + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30); + loop { + if active_goal_seen && let Some(task_id) = active_turn_id_from_resume.take() { + break task_id; } - InProcessServerEvent::ServerNotification(notification) => { - if let ServerNotification::TurnStarted(payload) = ¬ification - && payload.thread_id == primary_thread_id_for_span - { - let task_id = payload.turn.id.clone(); - let _ = event_processor.process_server_notification(notification); - break task_id; + + let server_event = tokio::select! { + maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => { + if maybe_interrupt.is_none() { + interrupt_channel_open = false; + continue; + } + if let Err(err) = request_shutdown( + &client, + &mut request_ids, + &primary_thread_id_for_span, + ) + .await + { + warn!("thread/unsubscribe failed during shutdown: {err}"); + } + anyhow::bail!("interrupted while waiting for active goal continuation"); + } + _ = tokio::time::sleep_until(deadline) => { + anyhow::bail!("resumed active goal did not start a turn"); + } + maybe_event = client.next_event() => maybe_event, + }; + + let Some(server_event) = server_event else { + anyhow::bail!("resumed active goal did not start a turn"); + }; + match server_event { + InProcessServerEvent::ServerRequest(request) => { + handle_server_request(&client, request, &mut error_seen).await; + } + InProcessServerEvent::ServerNotification(notification) => match ¬ification { + ServerNotification::ThreadGoalUpdated(payload) + if payload.thread_id == primary_thread_id_for_span => + { + if payload.goal.status == ThreadGoalStatus::Active { + active_goal_seen = true; + } else { + anyhow::bail!( + "No prompt provided and resumed thread has no active goal. Specify a prompt or pipe one into stdin." + ); + } + } + ServerNotification::ThreadGoalCleared(payload) + if payload.thread_id == primary_thread_id_for_span => + { + anyhow::bail!( + "No prompt provided and resumed thread has no active goal. Specify a prompt or pipe one into stdin." + ); + } + ServerNotification::TurnStarted(payload) + if active_goal_seen + && payload.thread_id == primary_thread_id_for_span => + { + let task_id = payload.turn.id.clone(); + let _ = event_processor.process_server_notification(notification); + break task_id; + } + _ => {} + }, + InProcessServerEvent::Lagged { skipped } => { + let message = lagged_event_warning_message(skipped); + warn!("{message}"); + event_processor.process_warning(message); } } - InProcessServerEvent::Lagged { skipped } => { - let message = lagged_event_warning_message(skipped); - warn!("{message}"); - event_processor.process_warning(message); - } } - }, + } }; exec_span.record("turn.id", task_id.as_str()); // Run the loop until the task is complete. // Track whether a fatal error was reported by the server so we can // exit with a non-zero status for automation-friendly signaling. - let mut interrupt_channel_open = true; let primary_thread_id_for_requests = primary_thread_id.to_string(); loop { let server_event = tokio::select! { diff --git a/codex-rs/exec/tests/suite/resume.rs b/codex-rs/exec/tests/suite/resume.rs index 904c6a8f19..eb8eb39704 100644 --- a/codex-rs/exec/tests/suite/resume.rs +++ b/codex-rs/exec/tests/suite/resume.rs @@ -127,6 +127,99 @@ async fn mount_exec_responses( responses::mount_sse_sequence(server, (0..count).map(exec_sse_response).collect()).await } +fn goal_response_sse_response(response_id: &str, message: &str) -> String { + responses::sse(vec![ + responses::ev_response_created(response_id), + responses::ev_assistant_message(response_id, message), + responses::ev_completed(response_id), + ]) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_resume_last_without_prompt_follows_active_goal() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let test = test_codex_exec(); + let server = MockServer::start().await; + let response_mock = responses::mount_sse_sequence( + &server, + vec![ + responses::sse(vec![ + responses::ev_response_created("resp-create-goal"), + responses::ev_function_call( + "call-create-goal", + "create_goal", + r#"{"objective":"manual promptless resume test"}"#, + ), + responses::ev_completed("resp-create-goal"), + ]), + goal_response_sse_response("resp-seed-goal", "seed goal is active"), + goal_response_sse_response("resp-resume-goal", "goal resumed without prompt"), + ], + ) + .await; + let repo_root = exec_repo_root()?; + + test.cmd_with_server(&server) + .arg("--skip-git-repo-check") + .arg("-C") + .arg(&repo_root) + .arg("create an active goal") + .assert() + .success(); + + test.cmd_with_server(&server) + .arg("--skip-git-repo-check") + .arg("-C") + .arg(&repo_root) + .arg("resume") + .arg("--last") + .assert() + .success(); + + let requests = response_mock.requests(); + assert_eq!(requests.len(), 3); + assert!(requests[2].body_contains_text("")); + assert!(requests[2].body_contains_text("Continue working toward the active thread goal.")); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_resume_last_without_prompt_rejects_thread_without_active_goal() -> anyhow::Result<()> +{ + skip_if_no_network!(Ok(())); + + let test = test_codex_exec(); + let server = MockServer::start().await; + let response_mock = mount_exec_responses(&server, /*count*/ 1).await; + let repo_root = exec_repo_root()?; + + test.cmd_with_server(&server) + .arg("--skip-git-repo-check") + .arg("-C") + .arg(&repo_root) + .arg("seed a normal session") + .assert() + .success(); + + let output = test + .cmd_with_server(&server) + .arg("--skip-git-repo-check") + .arg("-C") + .arg(&repo_root) + .arg("resume") + .arg("--last") + .assert() + .failure() + .get_output() + .clone(); + + let stderr = String::from_utf8_lossy(&output.stderr); + assert!(stderr.contains("resumed thread has no active goal")); + assert_eq!(response_mock.requests().len(), 1); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> { skip_if_no_network!(Ok(()));