Stop emitting v2 turn/started

This commit is contained in:
Ahmed Ibrahim
2026-02-02 14:06:48 -08:00
parent 9c01c75697
commit edc8af2509
4 changed files with 17 additions and 70 deletions

View File

@@ -43,7 +43,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
- Initialize once: Immediately after launching the codex app-server process, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request before this handshake gets rejected.
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and youll also get a `thread/started` notification. If youre continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object and triggers a `turn/started` notification.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object; clients should rely on the core event stream (`codex/event/task_started`) and item notifications (no v2 `turn/started` notification is emitted).
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. Youll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
- Finish the turn: When the model is done (or the turn is interrupted via making the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.
@@ -81,12 +81,12 @@ Example (from OpenAI's official VSCode extension):
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders` filtering.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`.
- `thread/compact` — trigger a context compaction run for a thread; returns a `TurnStartResponse`. No `turn/started` notification is emitted; clients should rely on the core event stream (`task_started`) and item notifications.
- `thread/compact` — trigger a context compaction run for a thread; returns a `TurnStartResponse`. No v2 `turn/started` notification is emitted; clients should rely on the core event stream (`codex/event/task_started`) and item notifications.
- `thread/archive` — move a threads rollout file into the archived directory; returns `{}` on success.
- `thread/name/set` — set or update a threads user-facing name; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `item/*` and `turn/completed` notifications. Clients should rely on the core event stream (`codex/event/task_started`) for turn start (no v2 `turn/started` notification is emitted).
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `review/start` — kick off Codexs automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.
- `command/exec` — run a single command under the server sandbox without starting a thread/turn (handy for utilities and validation).
@@ -368,7 +368,7 @@ Example request/response:
For a detached review, use `"delivery": "detached"`. The response is the same shape, but `reviewThreadId` will be the id of the new review thread (different from the original `threadId`). The server also emits a `thread/started` notification for that new thread before streaming the review turn.
Codex streams the usual `turn/started` notification followed by an `item/started`
Codex emits a `codex/event/task_started` core event, followed by an `item/started`
with an `enteredReviewMode` item so clients can show progress:
```json
@@ -426,13 +426,13 @@ Notes:
## Events
Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `turn/*`, and `item/*` notifications.
Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `codex/event/*`, `turn/*`, and `item/*` notifications.
### Turn events
The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with `turn/started` (initial `turn`) and ends with `turn/completed` (final `turn` status). Token usage events stream separately via `thread/tokenUsage/updated`. Clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`.
The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with the core event stream `codex/event/task_started` (clients can treat this as "turn started") and ends with `turn/completed` (final `turn` status). Token usage events stream separately via `thread/tokenUsage/updated`. Clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`.
- `turn/started``{ turn }` with the turn id, empty `items`, and `status: "inProgress"`.
- `codex/event/task_started` — core `Event` with `id` set to the turn id, plus `conversationId`; clients can treat this as the "turn started" signal.
- `turn/completed``{ turn }` where `turn.status` is `completed`, `interrupted`, or `failed`; failures carry `{ error: { message, codexErrorInfo?, additionalDetails? } }`.
- `turn/diff/updated``{ threadId, turnId, diff }` represents the up-to-date snapshot of the turn-level unified diff, emitted after every FileChange item. `diff` is the latest aggregated unified diff across every file change in the turn. UIs can render this to show the full "what changed" view without stitching individual `fileChange` items.
- `turn/plan/updated``{ turnId, explanation?, plan }` whenever the agent shares or changes its plan; each `plan` entry is `{ step, status }` with `status` in `pending`, `inProgress`, or `completed`.

View File

@@ -125,7 +125,6 @@ use codex_app_server_protocol::TurnError;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
@@ -4136,15 +4135,6 @@ impl CodexMessageProcessor {
let response = TurnStartResponse { turn: turn.clone() };
self.outgoing.send_response(request_id, response).await;
// Emit v2 turn/started notification.
let notif = TurnStartedNotification {
thread_id: params.thread_id,
turn,
};
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
@@ -4217,24 +4207,15 @@ impl CodexMessageProcessor {
&self,
request_id: &RequestId,
turn: Turn,
parent_thread_id: String,
review_thread_id: String,
) {
let response = ReviewStartResponse {
turn: turn.clone(),
turn,
review_thread_id,
};
self.outgoing
.send_response(request_id.clone(), response)
.await;
let notif = TurnStartedNotification {
thread_id: parent_thread_id,
turn,
};
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
async fn start_inline_review(
@@ -4250,13 +4231,8 @@ impl CodexMessageProcessor {
match turn_id {
Ok(turn_id) => {
let turn = Self::build_review_turn(turn_id, display_text);
self.emit_review_started(
request_id,
turn,
parent_thread_id.clone(),
parent_thread_id,
)
.await;
self.emit_review_started(request_id, turn, parent_thread_id)
.await;
Ok(())
}
Err(err) => Err(JSONRPCErrorError {
@@ -4355,7 +4331,7 @@ impl CodexMessageProcessor {
let turn = Self::build_review_turn(turn_id, display_text);
let review_thread_id = thread_id.to_string();
self.emit_review_started(request_id, turn, review_thread_id.clone(), review_thread_id)
self.emit_review_started(request_id, turn, review_thread_id)
.await;
Ok(())

View File

@@ -26,7 +26,6 @@ use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::auth::AuthCredentialsStoreMode;
@@ -370,12 +369,9 @@ async fn wait_for_context_compaction_started_without_turn_started(
}
}
JSONRPCMessage::Notification(notification) if notification.method == "turn/started" => {
let started: TurnStartedNotification = serde_json::from_value(
notification.params.clone().expect("turn/started params"),
)?;
if started.turn.id == forbidden_turn_id {
anyhow::bail!("unexpected v2 turn/started notification for manual compaction");
}
anyhow::bail!(
"unexpected v2 turn/started notification for manual compaction (turn_id={forbidden_turn_id})"
);
}
_ => {}
}
@@ -400,12 +396,9 @@ async fn wait_for_context_compaction_completed_without_turn_started(
}
}
JSONRPCMessage::Notification(notification) if notification.method == "turn/started" => {
let started: TurnStartedNotification = serde_json::from_value(
notification.params.clone().expect("turn/started params"),
)?;
if started.turn.id == forbidden_turn_id {
anyhow::bail!("unexpected v2 turn/started notification for manual compaction");
}
anyhow::bail!(
"unexpected v2 turn/started notification for manual compaction (turn_id={forbidden_turn_id})"
);
}
_ => {}
}

View File

@@ -31,7 +31,6 @@ use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::features::FEATURES;
@@ -269,20 +268,6 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<(
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
assert!(!turn.id.is_empty());
// Expect a turn/started notification.
let notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
let started: TurnStartedNotification =
serde_json::from_value(notif.params.expect("params must be present"))?;
assert_eq!(started.thread_id, thread.id);
assert_eq!(
started.turn.status,
codex_app_server_protocol::TurnStatus::InProgress
);
// Send a second turn that exercises the overrides path: change the model.
let turn_req2 = mcp
.send_turn_start_request(TurnStartParams {
@@ -305,13 +290,6 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<(
// Ensure the second turn has a different id than the first.
assert_ne!(turn.id, turn2.id);
// Expect a second turn/started notification as well.
let _notif2: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
let completed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),