fix(app-server): emit turn/started only when turn actually starts (#13261)

This is a follow-up for https://github.com/openai/codex/pull/13047

## Why
We had a race where `turn/started` could be observed before the thread
had actually transitioned to `Active`. This was because we eagerly
emitted `turn/started` in the request handler for `turn/start` (and
`review/start`).

That was showing up as flaky `thread/resume` tests, but the real issue
was broader: a client could see `turn/started` and still get back an
idle thread immediately afterward.

The first idea was to eagerly call
`thread_watch_manager.note_turn_started(...)` from the `turn/start`
request path. That turns out to be unsafe, because
`submit(Op::UserInput)` only queues work. If a turn starts and completes
quickly, request-path bookkeeping can race with the real lifecycle
events and leave stale running state behind.

**The real fix** is to move `turn/started` to emit only after the turn
_actually_ starts, so we do that by waiting for the
`EventMsg::TurnStarted` notification emitted by codex core. We do this
for both `turn/start` and `review/start`.

I also verified this change is safe for our first-party codex apps -
they don't have any assumptions that `turn/started` is emitted before
the RPC response to `turn/start` (which is correct anyway).

I also removed `single_client_mode` since it isn't really necessary now.

## Testing
- `cargo test -p codex-app-server thread_resume -- --nocapture`
- `cargo test -p codex-app-server
'suite::v2::turn_start::turn_start_emits_notifications_and_accepts_model_override'
-- --exact --nocapture`
- `cargo test -p codex-app-server`
This commit is contained in:
Owen Lin
2026-03-02 16:43:31 -08:00
committed by GitHub
parent b20b6aa46f
commit 146b798129
7 changed files with 59 additions and 86 deletions

View File

@@ -173,7 +173,6 @@ use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::TurnSteerResponse;
@@ -375,7 +374,6 @@ pub(crate) struct CodexMessageProcessor {
outgoing: Arc<OutgoingMessageSender>,
arg0_paths: Arg0DispatchPaths,
config: Arc<Config>,
single_client_mode: bool,
cli_overrides: Vec<(String, TomlValue)>,
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
active_login: Arc<Mutex<Option<ActiveLogin>>>,
@@ -402,7 +400,6 @@ struct ListenerTaskContext {
thread_watch_manager: ThreadWatchManager,
fallback_model_provider: String,
codex_home: PathBuf,
single_client_mode: bool,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
@@ -419,7 +416,6 @@ pub(crate) struct CodexMessageProcessorArgs {
pub(crate) config: Arc<Config>,
pub(crate) cli_overrides: Vec<(String, TomlValue)>,
pub(crate) cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
pub(crate) single_client_mode: bool,
pub(crate) feedback: CodexFeedback,
}
@@ -464,7 +460,6 @@ impl CodexMessageProcessor {
config,
cli_overrides,
cloud_requirements,
single_client_mode,
feedback,
} = args;
Self {
@@ -473,7 +468,6 @@ impl CodexMessageProcessor {
outgoing: outgoing.clone(),
arg0_paths,
config,
single_client_mode,
cli_overrides,
cloud_requirements,
active_login: Arc::new(Mutex::new(None)),
@@ -2082,7 +2076,6 @@ impl CodexMessageProcessor {
thread_watch_manager: self.thread_watch_manager.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
codex_home: self.config.codex_home.clone(),
single_client_mode: self.single_client_mode,
};
tokio::spawn(async move {
@@ -5910,17 +5903,8 @@ impl CodexMessageProcessor {
status: TurnStatus::InProgress,
};
let response = TurnStartResponse { turn: turn.clone() };
let response = TurnStartResponse { turn };
self.outgoing.send_response(request_id, response).await;
// Emit v2 turn/started notification.
let notif = TurnStartedNotification {
thread_id: params.thread_id,
turn,
};
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
@@ -6211,24 +6195,15 @@ impl CodexMessageProcessor {
&self,
request_id: &ConnectionRequestId,
turn: Turn,
parent_thread_id: String,
review_thread_id: String,
) {
let response = ReviewStartResponse {
turn: turn.clone(),
turn,
review_thread_id,
};
self.outgoing
.send_response(request_id.clone(), response)
.await;
let notif = TurnStartedNotification {
thread_id: parent_thread_id,
turn,
};
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
async fn start_inline_review(
@@ -6244,13 +6219,8 @@ impl CodexMessageProcessor {
match turn_id {
Ok(turn_id) => {
let turn = Self::build_review_turn(turn_id, display_text);
self.emit_review_started(
request_id,
turn,
parent_thread_id.clone(),
parent_thread_id,
)
.await;
self.emit_review_started(request_id, turn, parent_thread_id)
.await;
Ok(())
}
Err(err) => Err(JSONRPCErrorError {
@@ -6364,7 +6334,7 @@ impl CodexMessageProcessor {
let turn = Self::build_review_turn(turn_id, display_text);
let review_thread_id = thread_id.to_string();
self.emit_review_started(request_id, turn, review_thread_id.clone(), review_thread_id)
self.emit_review_started(request_id, turn, review_thread_id)
.await;
Ok(())
@@ -6540,7 +6510,6 @@ impl CodexMessageProcessor {
thread_watch_manager: self.thread_watch_manager.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
codex_home: self.config.codex_home.clone(),
single_client_mode: self.single_client_mode,
},
conversation_id,
connection_id,
@@ -6628,7 +6597,6 @@ impl CodexMessageProcessor {
thread_watch_manager: self.thread_watch_manager.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
codex_home: self.config.codex_home.clone(),
single_client_mode: self.single_client_mode,
},
conversation_id,
conversation,
@@ -6660,7 +6628,6 @@ impl CodexMessageProcessor {
thread_watch_manager,
fallback_model_provider,
codex_home,
single_client_mode,
} = listener_task_context;
let outgoing_for_task = Arc::clone(&outgoing);
tokio::spawn(async move {
@@ -6710,9 +6677,7 @@ impl CodexMessageProcessor {
);
let raw_events_enabled = {
let mut thread_state = thread_state.lock().await;
if !single_client_mode {
thread_state.track_current_turn_event(&event.msg);
}
thread_state.track_current_turn_event(&event.msg);
thread_state.experimental_raw_events
};
let subscribed_connection_ids = thread_state_manager