mirror of
https://github.com/openai/codex.git
synced 2026-06-02 19:31:59 +00:00
codex: address PR review feedback (#24321)
This commit is contained in:
@@ -31,8 +31,6 @@ use codex_app_server_protocol::ReviewTarget as ApiReviewTarget;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::Thread as AppServerThread;
|
||||
use codex_app_server_protocol::ThreadGoalGetParams;
|
||||
use codex_app_server_protocol::ThreadGoalGetResponse;
|
||||
use codex_app_server_protocol::ThreadGoalStatus;
|
||||
use codex_app_server_protocol::ThreadItem as AppServerThreadItem;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
@@ -53,6 +51,7 @@ use codex_app_server_protocol::TurnInterruptResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStartedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_cloud_requirements::cloud_requirements_loader_for_storage;
|
||||
use codex_config::ConfigLoadError;
|
||||
@@ -653,6 +652,16 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
prompt_text,
|
||||
)
|
||||
} else {
|
||||
if !imgs.is_empty() || !args.images.is_empty() {
|
||||
anyhow::bail!(
|
||||
"No prompt provided; images require a prompt for the resumed turn."
|
||||
);
|
||||
}
|
||||
if output_schema_path.is_some() {
|
||||
anyhow::bail!(
|
||||
"No prompt provided; --output-schema requires a prompt for the resumed turn."
|
||||
);
|
||||
}
|
||||
(InitialOperation::FollowActiveGoal, String::new())
|
||||
}
|
||||
}
|
||||
@@ -697,6 +706,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
|
||||
// Handle resume subcommand through existing `thread/list` + `thread/resume`
|
||||
// APIs so exec no longer reaches into rollout storage directly.
|
||||
let mut active_turn_id_from_resume: Option<String> = None;
|
||||
let (primary_thread_id, fallback_session_configured) = if let Some(ExecCommand::Resume(args)) =
|
||||
command.as_ref()
|
||||
{
|
||||
@@ -713,6 +723,15 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
)
|
||||
.await
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
if matches!(initial_operation, InitialOperation::FollowActiveGoal) {
|
||||
active_turn_id_from_resume = response
|
||||
.thread
|
||||
.turns
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|turn| turn.status == TurnStatus::InProgress)
|
||||
.map(|turn| turn.id.clone());
|
||||
}
|
||||
let session_configured =
|
||||
session_configured_from_thread_resume_response(&response, &config)
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
@@ -756,31 +775,6 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
|
||||
let primary_thread_id_for_span = primary_thread_id.to_string();
|
||||
|
||||
if matches!(initial_operation, InitialOperation::FollowActiveGoal) {
|
||||
let response: ThreadGoalGetResponse = send_request_with_response(
|
||||
&client,
|
||||
ClientRequest::ThreadGoalGet {
|
||||
request_id: request_ids.next(),
|
||||
params: ThreadGoalGetParams {
|
||||
thread_id: primary_thread_id_for_span.clone(),
|
||||
},
|
||||
},
|
||||
"thread/goal/get",
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
anyhow::anyhow!(
|
||||
"No prompt provided and could not verify an active goal for the resumed thread: {err}"
|
||||
)
|
||||
})?;
|
||||
anyhow::ensure!(
|
||||
response
|
||||
.goal
|
||||
.is_some_and(|goal| goal.status == ThreadGoalStatus::Active),
|
||||
"No prompt provided and resumed thread has no active goal. Specify a prompt or pipe one into stdin."
|
||||
);
|
||||
}
|
||||
|
||||
// Use the start/resume response as the authoritative bootstrap payload.
|
||||
// Waiting for a later streamed `SessionConfigured` event adds up to 10s of
|
||||
// avoidable startup latency on the in-process path.
|
||||
@@ -809,6 +803,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
});
|
||||
|
||||
let mut error_seen = false;
|
||||
let mut interrupt_channel_open = true;
|
||||
let task_id = match initial_operation {
|
||||
InitialOperation::UserTurn {
|
||||
items,
|
||||
@@ -871,37 +866,87 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
info!("Sent review request with event ID: {task_id}");
|
||||
task_id
|
||||
}
|
||||
InitialOperation::FollowActiveGoal => loop {
|
||||
let Some(server_event) = client.next_event().await else {
|
||||
anyhow::bail!("resumed active goal did not start a turn");
|
||||
};
|
||||
match server_event {
|
||||
InProcessServerEvent::ServerRequest(request) => {
|
||||
handle_server_request(&client, request, &mut error_seen).await;
|
||||
InitialOperation::FollowActiveGoal => {
|
||||
let mut active_goal_seen = false;
|
||||
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(30);
|
||||
loop {
|
||||
if active_goal_seen && let Some(task_id) = active_turn_id_from_resume.take() {
|
||||
break task_id;
|
||||
}
|
||||
InProcessServerEvent::ServerNotification(notification) => {
|
||||
if let ServerNotification::TurnStarted(payload) = ¬ification
|
||||
&& payload.thread_id == primary_thread_id_for_span
|
||||
{
|
||||
let task_id = payload.turn.id.clone();
|
||||
let _ = event_processor.process_server_notification(notification);
|
||||
break task_id;
|
||||
|
||||
let server_event = tokio::select! {
|
||||
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
|
||||
if maybe_interrupt.is_none() {
|
||||
interrupt_channel_open = false;
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = request_shutdown(
|
||||
&client,
|
||||
&mut request_ids,
|
||||
&primary_thread_id_for_span,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("thread/unsubscribe failed during shutdown: {err}");
|
||||
}
|
||||
anyhow::bail!("interrupted while waiting for active goal continuation");
|
||||
}
|
||||
_ = tokio::time::sleep_until(deadline) => {
|
||||
anyhow::bail!("resumed active goal did not start a turn");
|
||||
}
|
||||
maybe_event = client.next_event() => maybe_event,
|
||||
};
|
||||
|
||||
let Some(server_event) = server_event else {
|
||||
anyhow::bail!("resumed active goal did not start a turn");
|
||||
};
|
||||
match server_event {
|
||||
InProcessServerEvent::ServerRequest(request) => {
|
||||
handle_server_request(&client, request, &mut error_seen).await;
|
||||
}
|
||||
InProcessServerEvent::ServerNotification(notification) => match ¬ification {
|
||||
ServerNotification::ThreadGoalUpdated(payload)
|
||||
if payload.thread_id == primary_thread_id_for_span =>
|
||||
{
|
||||
if payload.goal.status == ThreadGoalStatus::Active {
|
||||
active_goal_seen = true;
|
||||
} else {
|
||||
anyhow::bail!(
|
||||
"No prompt provided and resumed thread has no active goal. Specify a prompt or pipe one into stdin."
|
||||
);
|
||||
}
|
||||
}
|
||||
ServerNotification::ThreadGoalCleared(payload)
|
||||
if payload.thread_id == primary_thread_id_for_span =>
|
||||
{
|
||||
anyhow::bail!(
|
||||
"No prompt provided and resumed thread has no active goal. Specify a prompt or pipe one into stdin."
|
||||
);
|
||||
}
|
||||
ServerNotification::TurnStarted(payload)
|
||||
if active_goal_seen
|
||||
&& payload.thread_id == primary_thread_id_for_span =>
|
||||
{
|
||||
let task_id = payload.turn.id.clone();
|
||||
let _ = event_processor.process_server_notification(notification);
|
||||
break task_id;
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
InProcessServerEvent::Lagged { skipped } => {
|
||||
let message = lagged_event_warning_message(skipped);
|
||||
warn!("{message}");
|
||||
event_processor.process_warning(message);
|
||||
}
|
||||
}
|
||||
InProcessServerEvent::Lagged { skipped } => {
|
||||
let message = lagged_event_warning_message(skipped);
|
||||
warn!("{message}");
|
||||
event_processor.process_warning(message);
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
};
|
||||
exec_span.record("turn.id", task_id.as_str());
|
||||
|
||||
// Run the loop until the task is complete.
|
||||
// Track whether a fatal error was reported by the server so we can
|
||||
// exit with a non-zero status for automation-friendly signaling.
|
||||
let mut interrupt_channel_open = true;
|
||||
let primary_thread_id_for_requests = primary_thread_id.to_string();
|
||||
loop {
|
||||
let server_event = tokio::select! {
|
||||
|
||||
@@ -127,6 +127,99 @@ async fn mount_exec_responses(
|
||||
responses::mount_sse_sequence(server, (0..count).map(exec_sse_response).collect()).await
|
||||
}
|
||||
|
||||
fn goal_response_sse_response(response_id: &str, message: &str) -> String {
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created(response_id),
|
||||
responses::ev_assistant_message(response_id, message),
|
||||
responses::ev_completed(response_id),
|
||||
])
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn exec_resume_last_without_prompt_follows_active_goal() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let test = test_codex_exec();
|
||||
let server = MockServer::start().await;
|
||||
let response_mock = responses::mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-create-goal"),
|
||||
responses::ev_function_call(
|
||||
"call-create-goal",
|
||||
"create_goal",
|
||||
r#"{"objective":"manual promptless resume test"}"#,
|
||||
),
|
||||
responses::ev_completed("resp-create-goal"),
|
||||
]),
|
||||
goal_response_sse_response("resp-seed-goal", "seed goal is active"),
|
||||
goal_response_sse_response("resp-resume-goal", "goal resumed without prompt"),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
let repo_root = exec_repo_root()?;
|
||||
|
||||
test.cmd_with_server(&server)
|
||||
.arg("--skip-git-repo-check")
|
||||
.arg("-C")
|
||||
.arg(&repo_root)
|
||||
.arg("create an active goal")
|
||||
.assert()
|
||||
.success();
|
||||
|
||||
test.cmd_with_server(&server)
|
||||
.arg("--skip-git-repo-check")
|
||||
.arg("-C")
|
||||
.arg(&repo_root)
|
||||
.arg("resume")
|
||||
.arg("--last")
|
||||
.assert()
|
||||
.success();
|
||||
|
||||
let requests = response_mock.requests();
|
||||
assert_eq!(requests.len(), 3);
|
||||
assert!(requests[2].body_contains_text("<goal_context>"));
|
||||
assert!(requests[2].body_contains_text("Continue working toward the active thread goal."));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn exec_resume_last_without_prompt_rejects_thread_without_active_goal() -> anyhow::Result<()>
|
||||
{
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let test = test_codex_exec();
|
||||
let server = MockServer::start().await;
|
||||
let response_mock = mount_exec_responses(&server, /*count*/ 1).await;
|
||||
let repo_root = exec_repo_root()?;
|
||||
|
||||
test.cmd_with_server(&server)
|
||||
.arg("--skip-git-repo-check")
|
||||
.arg("-C")
|
||||
.arg(&repo_root)
|
||||
.arg("seed a normal session")
|
||||
.assert()
|
||||
.success();
|
||||
|
||||
let output = test
|
||||
.cmd_with_server(&server)
|
||||
.arg("--skip-git-repo-check")
|
||||
.arg("-C")
|
||||
.arg(&repo_root)
|
||||
.arg("resume")
|
||||
.arg("--last")
|
||||
.assert()
|
||||
.failure()
|
||||
.get_output()
|
||||
.clone();
|
||||
|
||||
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||
assert!(stderr.contains("resumed thread has no active goal"));
|
||||
assert_eq!(response_mock.requests().len(), 1);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn exec_resume_last_appends_to_existing_file() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
Reference in New Issue
Block a user