Compare commits

...

7 Commits

Author SHA1 Message Date
Eric Traut
361e633ba2 codex: address PR review feedback (#24321) 2026-05-24 03:23:34 -07:00
Eric Traut
d73829e118 codex: address PR review feedback (#24321) 2026-05-24 03:09:32 -07:00
Eric Traut
7f3448e1ff codex: address PR review feedback (#24321) 2026-05-24 03:02:54 -07:00
Eric Traut
7a9aa1ae5a codex: simplify active goal resume rejoin (#24321) 2026-05-24 02:55:02 -07:00
Eric Traut
cbd26eea41 codex: address PR review feedback (#24321) 2026-05-24 02:42:36 -07:00
Eric Traut
a7e3211614 codex: address PR review feedback (#24321) 2026-05-24 02:37:01 -07:00
Eric Traut
7814ff329c fix(exec): resume active goals without prompt
Fixes #24016.
2026-05-24 02:21:47 -07:00
2 changed files with 304 additions and 33 deletions

View File

@@ -21,6 +21,7 @@ use codex_app_server_client::InProcessClientStartArgs;
use codex_app_server_client::InProcessServerEvent;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::McpServerElicitationAction;
use codex_app_server_protocol::McpServerElicitationRequestResponse;
@@ -31,6 +32,7 @@ use codex_app_server_protocol::ReviewTarget as ApiReviewTarget;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::Thread as AppServerThread;
use codex_app_server_protocol::ThreadGoalStatus;
use codex_app_server_protocol::ThreadItem as AppServerThreadItem;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
@@ -43,13 +45,19 @@ use codex_app_server_protocol::ThreadSource;
use codex_app_server_protocol::ThreadSourceKind;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadTurnsListParams;
use codex_app_server_protocol::ThreadTurnsListResponse;
use codex_app_server_protocol::ThreadUnsubscribeParams;
use codex_app_server_protocol::ThreadUnsubscribeResponse;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnItemsView;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_arg0::Arg0DispatchPaths;
use codex_cloud_requirements::cloud_requirements_loader_for_storage;
use codex_config::ConfigLoadError;
@@ -164,8 +172,11 @@ enum InitialOperation {
Review {
review_request: ReviewRequest,
},
FollowActiveGoal,
}
const NO_ACTIVE_GOAL_PROMPTLESS_RESUME_ERROR: &str = "No prompt provided and resumed thread has no active goal. Specify a prompt or pipe one into stdin.";
enum StdinPromptBehavior {
/// Read stdin only when there is no positional prompt, which is the legacy
/// `codex exec` behavior for `codex exec` with piped input.
@@ -173,8 +184,7 @@ enum StdinPromptBehavior {
/// Always treat stdin as the prompt, used for the explicit `codex exec -`
/// sentinel and similar forced-stdin call sites.
Forced,
/// If stdin is piped alongside a positional prompt, treat stdin as
/// additional context to append rather than as the primary prompt.
/// Read stdin when it is piped and ignore terminal or empty stdin.
OptionalAppend,
}
@@ -627,25 +637,41 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
}
})
.or(root_prompt);
let prompt_text = resolve_prompt(prompt_arg);
let mut items: Vec<UserInput> = imgs
.into_iter()
.chain(args.images.iter().cloned())
.map(|path| UserInput::LocalImage { path, detail: None })
.collect();
items.push(UserInput::Text {
text: prompt_text.clone(),
// CLI input doesn't track UI element ranges, so none are available here.
text_elements: Vec::new(),
});
let output_schema = load_output_schema(output_schema_path.clone());
(
InitialOperation::UserTurn {
items,
output_schema,
},
prompt_text,
)
let prompt_text = prompt_arg
.map(|prompt| resolve_prompt(Some(prompt)))
.or_else(|| read_prompt_from_stdin(StdinPromptBehavior::OptionalAppend));
if let Some(prompt_text) = prompt_text {
let mut items: Vec<UserInput> = imgs
.into_iter()
.chain(args.images.iter().cloned())
.map(|path| UserInput::LocalImage { path, detail: None })
.collect();
items.push(UserInput::Text {
text: prompt_text.clone(),
// CLI input doesn't track UI element ranges, so none are available here.
text_elements: Vec::new(),
});
let output_schema = load_output_schema(output_schema_path);
(
InitialOperation::UserTurn {
items,
output_schema,
},
prompt_text,
)
} else {
if !imgs.is_empty() || !args.images.is_empty() {
anyhow::bail!(
"No prompt provided; images require a prompt for the resumed turn."
);
}
if output_schema_path.is_some() {
anyhow::bail!(
"No prompt provided; --output-schema requires a prompt for the resumed turn."
);
}
(InitialOperation::FollowActiveGoal, String::new())
}
}
(None, root_prompt, imgs) => {
let prompt_text = resolve_root_prompt(root_prompt);
@@ -688,6 +714,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
// Handle resume subcommand through existing `thread/list` + `thread/resume`
// APIs so exec no longer reaches into rollout storage directly.
let mut running_turn_id_from_resume = None;
let (primary_thread_id, fallback_session_configured) = if let Some(ExecCommand::Resume(args)) =
command.as_ref()
{
@@ -704,11 +731,25 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
)
.await
.map_err(anyhow::Error::msg)?;
if matches!(initial_operation, InitialOperation::FollowActiveGoal) {
running_turn_id_from_resume = response
.thread
.turns
.iter()
.rev()
.find(|turn| turn.status == TurnStatus::InProgress)
.map(|turn| turn.id.clone());
}
let session_configured =
session_configured_from_thread_resume_response(&response, &config)
.map_err(anyhow::Error::msg)?;
(session_configured.thread_id, session_configured)
} else {
if matches!(initial_operation, InitialOperation::FollowActiveGoal) {
anyhow::bail!(
"No prompt provided and no session was resumed. Specify a prompt or pipe one into stdin."
);
}
let response: ThreadStartResponse = send_request_with_response(
&client,
ClientRequest::ThreadStart {
@@ -741,6 +782,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
};
let primary_thread_id_for_span = primary_thread_id.to_string();
// Use the start/resume response as the authoritative bootstrap payload.
// Waiting for a later streamed `SessionConfigured` event adds up to 10s of
// avoidable startup latency on the in-process path.
@@ -768,7 +810,9 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
}
});
let task_id = match initial_operation {
let mut error_seen = false;
let mut interrupt_channel_open = true;
let (task_id, wait_for_completion) = match initial_operation {
InitialOperation::UserTurn {
items,
output_schema,
@@ -803,7 +847,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
.map_err(anyhow::Error::msg)?;
let task_id = response.turn.id;
info!("Sent prompt with event ID: {task_id}");
task_id
(task_id, true)
}
InitialOperation::Review { review_request } => {
let response: ReviewStartResponse = send_request_with_response(
@@ -820,15 +864,136 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
)
.await
.map_err(anyhow::Error::msg)?;
let _ = event_processor.process_server_notification(ServerNotification::TurnStarted(
TurnStartedNotification {
thread_id: response.review_thread_id.clone(),
turn: response.turn.clone(),
},
));
let task_id = response.turn.id;
let task_id = process_turn_started(
event_processor.as_mut(),
response.review_thread_id.clone(),
response.turn,
);
info!("Sent review request with event ID: {task_id}");
task_id
(task_id, true)
}
InitialOperation::FollowActiveGoal => {
let mut active_goal_seen = false;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
loop {
if active_goal_seen
&& let Some(running_turn_id) = running_turn_id_from_resume.take()
{
let response: ThreadTurnsListResponse = send_request_with_response(
&client,
ClientRequest::ThreadTurnsList {
request_id: request_ids.next(),
params: ThreadTurnsListParams {
thread_id: primary_thread_id_for_span.clone(),
cursor: None,
limit: Some(/*limit*/ 10),
sort_direction: None,
items_view: Some(TurnItemsView::Full),
},
},
"thread/turns/list",
)
.await
.map_err(anyhow::Error::msg)?;
if let Some(turn) = response
.data
.into_iter()
.find(|turn| turn.id == running_turn_id)
{
let status = turn.status.clone();
let task_id = process_turn_started(
event_processor.as_mut(),
primary_thread_id_for_span.clone(),
turn.clone(),
);
if status == TurnStatus::InProgress {
break (task_id, true);
}
for item in turn.items.iter().cloned() {
let _ = event_processor.process_server_notification(
ServerNotification::ItemCompleted(ItemCompletedNotification {
item,
thread_id: primary_thread_id_for_span.clone(),
turn_id: task_id.clone(),
completed_at_ms: 0,
}),
);
}
let _ = event_processor.process_server_notification(
ServerNotification::TurnCompleted(TurnCompletedNotification {
thread_id: primary_thread_id_for_span.clone(),
turn,
}),
);
if matches!(status, TurnStatus::Failed | TurnStatus::Interrupted) {
error_seen = true;
}
break (task_id, false);
}
}
let server_event = tokio::select! {
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
if maybe_interrupt.is_none() {
interrupt_channel_open = false;
continue;
}
if let Err(err) = request_shutdown(
&client,
&mut request_ids,
&primary_thread_id_for_span,
)
.await
{
warn!("thread/unsubscribe failed during shutdown: {err}");
}
anyhow::bail!("interrupted while waiting for active goal continuation");
}
_ = tokio::time::sleep_until(deadline) => {
anyhow::bail!("resumed active goal did not start a turn");
}
maybe_event = client.next_event() => maybe_event,
};
let Some(server_event) = server_event else {
anyhow::bail!("resumed active goal did not start a turn");
};
match server_event {
InProcessServerEvent::ServerRequest(request) => {
handle_server_request(&client, request, &mut error_seen).await;
}
InProcessServerEvent::ServerNotification(notification) => match &notification {
ServerNotification::ThreadGoalUpdated(payload)
if payload.thread_id == primary_thread_id_for_span =>
{
if payload.goal.status == ThreadGoalStatus::Active {
active_goal_seen = true;
} else {
anyhow::bail!(NO_ACTIVE_GOAL_PROMPTLESS_RESUME_ERROR);
}
}
ServerNotification::ThreadGoalCleared(payload)
if payload.thread_id == primary_thread_id_for_span =>
{
anyhow::bail!(NO_ACTIVE_GOAL_PROMPTLESS_RESUME_ERROR);
}
ServerNotification::TurnStarted(payload)
if active_goal_seen
&& payload.thread_id == primary_thread_id_for_span =>
{
let task_id = payload.turn.id.clone();
let _ = event_processor.process_server_notification(notification);
break (task_id, true);
}
_ => {}
},
InProcessServerEvent::Lagged { skipped } => {
let message = lagged_event_warning_message(skipped);
warn!("{message}");
event_processor.process_warning(message);
}
}
}
}
};
exec_span.record("turn.id", task_id.as_str());
@@ -836,9 +1001,18 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
// Run the loop until the task is complete.
// Track whether a fatal error was reported by the server so we can
// exit with a non-zero status for automation-friendly signaling.
let mut error_seen = false;
let mut interrupt_channel_open = true;
let primary_thread_id_for_requests = primary_thread_id.to_string();
if !wait_for_completion {
if let Err(err) = client.shutdown().await {
warn!("in-process app-server shutdown failed: {err}");
}
event_processor.print_final_output();
if error_seen {
std::process::exit(1);
}
return Ok(());
}
loop {
let server_event = tokio::select! {
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
@@ -943,6 +1117,18 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
Ok(())
}
fn process_turn_started(
event_processor: &mut dyn EventProcessor,
thread_id: String,
turn: Turn,
) -> String {
let task_id = turn.id.clone();
let _ = event_processor.process_server_notification(ServerNotification::TurnStarted(
TurnStartedNotification { thread_id, turn },
));
task_id
}
fn thread_start_params_from_config(config: &Config) -> ThreadStartParams {
let permissions = permissions_selection_from_config(config);
let sandbox = permissions.is_none().then(|| {

View File

@@ -127,6 +127,91 @@ async fn mount_exec_responses(
responses::mount_sse_sequence(server, (0..count).map(exec_sse_response).collect()).await
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_resume_last_without_prompt_follows_active_goal() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let test = test_codex_exec();
let server = MockServer::start().await;
let response_mock = responses::mount_sse_sequence(
&server,
vec![
responses::sse(vec![
responses::ev_response_created("resp-create-goal"),
responses::ev_function_call(
"call-create-goal",
"create_goal",
r#"{"objective":"manual promptless resume test"}"#,
),
responses::ev_completed("resp-create-goal"),
]),
exec_sse_response(/*index*/ 1),
exec_sse_response(/*index*/ 2),
],
)
.await;
let repo_root = exec_repo_root()?;
test.cmd_with_server(&server)
.arg("--skip-git-repo-check")
.arg("-C")
.arg(&repo_root)
.arg("create an active goal")
.assert()
.success();
test.cmd_with_server(&server)
.arg("--skip-git-repo-check")
.arg("-C")
.arg(&repo_root)
.arg("resume")
.arg("--last")
.assert()
.success();
let requests = response_mock.requests();
assert_eq!(requests.len(), 3);
assert!(requests[2].body_contains_text("<goal_context>"));
assert!(requests[2].body_contains_text("Continue working toward the active thread goal."));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_resume_last_without_prompt_rejects_thread_without_active_goal() -> anyhow::Result<()>
{
skip_if_no_network!(Ok(()));
let test = test_codex_exec();
let server = MockServer::start().await;
let response_mock = mount_exec_responses(&server, /*count*/ 1).await;
let repo_root = exec_repo_root()?;
test.cmd_with_server(&server)
.arg("--skip-git-repo-check")
.arg("-C")
.arg(&repo_root)
.arg("seed a normal session")
.assert()
.success();
let output = test
.cmd_with_server(&server)
.arg("--skip-git-repo-check")
.arg("-C")
.arg(&repo_root)
.arg("resume")
.arg("--last")
.assert()
.failure()
.get_output()
.clone();
let stderr = String::from_utf8_lossy(&output.stderr);
assert!(stderr.contains("resumed thread has no active goal"));
assert_eq!(response_mock.requests().len(), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));