Compare commits

...

2 Commits

Author SHA1 Message Date
Ahmed Ibrahim
edc8af2509 Stop emitting v2 turn/started 2026-02-02 14:06:48 -08:00
Ahmed Ibrahim
9c01c75697 Add v2 thread/compact 2026-02-02 13:47:54 -08:00
7 changed files with 194 additions and 56 deletions

View File

@@ -152,6 +152,10 @@ client_request_definitions! {
params: v2::ThreadReadParams,
response: v2::ThreadReadResponse,
},
ThreadCompact => "thread/compact" {
params: v2::ThreadCompactParams,
response: v2::TurnStartResponse,
},
SkillsList => "skills/list" {
params: v2::SkillsListParams,
response: v2::SkillsListResponse,

View File

@@ -1449,6 +1449,13 @@ pub struct ThreadReadResponse {
pub thread: Thread,
}
// Parameters for the v2 `thread/compact` client request.
// Fields are serialized in camelCase, so `thread_id` travels as `threadId`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactParams {
// Id of the thread whose context should be compacted.
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -43,7 +43,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
- Initialize once: Immediately after launching the codex app-server process, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request before this handshake gets rejected.
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you'll also get a `thread/started` notification. If you're continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object and triggers a `turn/started` notification.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object; clients should rely on the core event stream (`codex/event/task_started`) and item notifications (no v2 `turn/started` notification is emitted).
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. You'll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
- Finish the turn: When the model is done (or the turn is interrupted via making the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.
@@ -81,11 +81,12 @@ Example (from OpenAI's official VSCode extension):
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders` filtering.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`.
- `thread/compact` — trigger a context compaction run for a thread; returns a `TurnStartResponse`. No v2 `turn/started` notification is emitted; clients should rely on the core event stream (`codex/event/task_started`) and item notifications.
- `thread/archive` — move a thread's rollout file into the archived directory; returns `{}` on success.
- `thread/name/set` — set or update a thread's user-facing name; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success.
- `thread/rollback` — drop the last N turns from the agent's in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `item/*` and `turn/completed` notifications. Clients should rely on the core event stream (`codex/event/task_started`) for turn start (no v2 `turn/started` notification is emitted).
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `review/start` — kick off Codex's automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.
- `command/exec` — run a single command under the server sandbox without starting a thread/turn (handy for utilities and validation).
@@ -367,7 +368,7 @@ Example request/response:
For a detached review, use `"delivery": "detached"`. The response is the same shape, but `reviewThreadId` will be the id of the new review thread (different from the original `threadId`). The server also emits a `thread/started` notification for that new thread before streaming the review turn.
Codex streams the usual `turn/started` notification followed by an `item/started`
Codex emits a `codex/event/task_started` core event, followed by an `item/started`
with an `enteredReviewMode` item so clients can show progress:
```json
@@ -425,13 +426,13 @@ Notes:
## Events
Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `turn/*`, and `item/*` notifications.
Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `codex/event/*`, `turn/*`, and `item/*` notifications.
### Turn events
The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with `turn/started` (initial `turn`) and ends with `turn/completed` (final `turn` status). Token usage events stream separately via `thread/tokenUsage/updated`. Clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`.
The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with the core event stream `codex/event/task_started` (clients can treat this as "turn started") and ends with `turn/completed` (final `turn` status). Token usage events stream separately via `thread/tokenUsage/updated`. Clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`.
- `turn/started` — `{ turn }` with the turn id, empty `items`, and `status: "inProgress"`.
- `codex/event/task_started` — core `Event` with `id` set to the turn id, plus `conversationId`; clients can treat this as the "turn started" signal.
- `turn/completed` — `{ turn }` where `turn.status` is `completed`, `interrupted`, or `failed`; failures carry `{ error: { message, codexErrorInfo?, additionalDetails? } }`.
- `turn/diff/updated` — `{ threadId, turnId, diff }` represents the up-to-date snapshot of the turn-level unified diff, emitted after every FileChange item. `diff` is the latest aggregated unified diff across every file change in the turn. UIs can render this to show the full "what changed" view without stitching individual `fileChange` items.
- `turn/plan/updated` — `{ turnId, explanation?, plan }` whenever the agent shares or changes its plan; each `plan` entry is `{ step, status }` with `status` in `pending`, `inProgress`, or `completed`.

View File

@@ -98,6 +98,7 @@ use codex_app_server_protocol::SkillsListResponse;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadCompactParams;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
@@ -124,7 +125,6 @@ use codex_app_server_protocol::TurnError;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
@@ -456,6 +456,9 @@ impl CodexMessageProcessor {
ClientRequest::ThreadRead { request_id, params } => {
self.thread_read(request_id, params).await;
}
ClientRequest::ThreadCompact { request_id, params } => {
self.thread_compact(request_id, params).await;
}
ClientRequest::SkillsList { request_id, params } => {
self.skills_list(request_id, params).await;
}
@@ -4132,15 +4135,6 @@ impl CodexMessageProcessor {
let response = TurnStartResponse { turn: turn.clone() };
self.outgoing.send_response(request_id, response).await;
// Emit v2 turn/started notification.
let notif = TurnStartedNotification {
thread_id: params.thread_id,
turn,
};
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
@@ -4153,6 +4147,40 @@ impl CodexMessageProcessor {
}
}
/// Handles a v2 `thread/compact` request.
///
/// Loads the thread named by `params.thread_id` and submits `Op::Compact` to it.
/// On success the client receives a `TurnStartResponse` whose turn carries the
/// id returned by `submit`, no items, no error, and `TurnStatus::InProgress`.
/// If loading the thread fails, that error is forwarded unchanged; if the
/// submit fails, an `INTERNAL_ERROR_CODE` JSON-RPC error is sent instead.
/// This handler sends exactly one reply and no notifications of its own.
async fn thread_compact(&self, request_id: RequestId, params: ThreadCompactParams) {
    let thread = match self.load_thread(&params.thread_id).await {
        Ok((_, thread)) => thread,
        Err(load_error) => {
            self.outgoing.send_error(request_id, load_error).await;
            return;
        }
    };

    match thread.submit(Op::Compact).await {
        Ok(turn_id) => {
            // The compaction has only just been queued, so report an empty in-progress turn.
            let response = TurnStartResponse {
                turn: Turn {
                    id: turn_id,
                    items: Vec::new(),
                    error: None,
                    status: TurnStatus::InProgress,
                },
            };
            self.outgoing.send_response(request_id, response).await;
        }
        Err(err) => {
            let message = format!("failed to compact thread: {err}");
            let error = JSONRPCErrorError {
                code: INTERNAL_ERROR_CODE,
                message,
                data: None,
            };
            self.outgoing.send_error(request_id, error).await;
        }
    }
}
fn build_review_turn(turn_id: String, display_text: &str) -> Turn {
let items = if display_text.is_empty() {
Vec::new()
@@ -4179,24 +4207,15 @@ impl CodexMessageProcessor {
&self,
request_id: &RequestId,
turn: Turn,
parent_thread_id: String,
review_thread_id: String,
) {
let response = ReviewStartResponse {
turn: turn.clone(),
turn,
review_thread_id,
};
self.outgoing
.send_response(request_id.clone(), response)
.await;
let notif = TurnStartedNotification {
thread_id: parent_thread_id,
turn,
};
self.outgoing
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
async fn start_inline_review(
@@ -4212,13 +4231,8 @@ impl CodexMessageProcessor {
match turn_id {
Ok(turn_id) => {
let turn = Self::build_review_turn(turn_id, display_text);
self.emit_review_started(
request_id,
turn,
parent_thread_id.clone(),
parent_thread_id,
)
.await;
self.emit_review_started(request_id, turn, parent_thread_id)
.await;
Ok(())
}
Err(err) => Err(JSONRPCErrorError {
@@ -4317,7 +4331,7 @@ impl CodexMessageProcessor {
let turn = Self::build_review_turn(turn_id, display_text);
let review_thread_id = thread_id.to_string();
self.emit_review_started(request_id, turn, review_thread_id.clone(), review_thread_id)
self.emit_review_started(request_id, turn, review_thread_id)
.await;
Ok(())

View File

@@ -48,6 +48,7 @@ use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadCompactParams;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadLoadedListParams;
@@ -427,6 +428,15 @@ impl McpProcess {
self.send_request("thread/read", params).await
}
/// Send a `thread/compact` JSON-RPC request.
///
/// Serializes `params` to a JSON value and hands it to `send_request` under
/// the `thread/compact` method, returning whatever `send_request` yields.
///
/// # Errors
/// Fails if `params` cannot be serialized or if `send_request` fails.
pub async fn send_thread_compact_request(
    &mut self,
    params: ThreadCompactParams,
) -> anyhow::Result<i64> {
    let payload = serde_json::to_value(params)?;
    self.send_request("thread/compact", Some(payload)).await
}
/// Send a `model/list` JSON-RPC request.
pub async fn send_list_models_request(
&mut self,

View File

@@ -15,15 +15,18 @@ use app_test_support::write_chatgpt_auth;
use app_test_support::write_mock_responses_config_toml;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadCompactParams;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::features::Feature;
@@ -38,6 +41,7 @@ use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const AUTO_COMPACT_LIMIT: i64 = 1_000;
const MANUAL_COMPACT_LIMIT: i64 = 1_000_000;
const COMPACT_PROMPT: &str = "Summarize the conversation.";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -195,6 +199,74 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()
Ok(())
}
// End-to-end check of the manual `thread/compact` flow:
//   1. run one ordinary turn against a mock responses server,
//   2. send `thread/compact` and expect a `TurnStartResponse` with an in-progress turn,
//   3. expect a context-compaction `item/started` followed by `item/completed`,
//      with the wait helpers failing if a `turn/started` notification shows up first.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_compaction_returns_turn_start_response_and_emits_items() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
// Two canned SSE streams, mounted as a sequence: presumably the first serves
// the regular turn and the second serves the compaction request — confirm
// against `mount_sse_sequence`.
let sse1 = responses::sse(vec![
responses::ev_assistant_message("m1", "FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", 200),
]);
let sse2 = responses::sse(vec![
responses::ev_assistant_message("m2", "LOCAL_SUMMARY"),
responses::ev_completed_with_tokens("r2", 200),
]);
responses::mount_sse_sequence(&server, vec![sse1, sse2]).await;
let codex_home = TempDir::new()?;
// NOTE(review): MANUAL_COMPACT_LIMIT (1_000_000) is presumably the auto-compact
// threshold, set high so only the explicit request triggers compaction — confirm
// against `write_mock_responses_config_toml`.
write_mock_responses_config_toml(
codex_home.path(),
&server.uri(),
&BTreeMap::default(),
MANUAL_COMPACT_LIMIT,
None,
"mock_provider",
COMPACT_PROMPT,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_id = start_thread(&mut mcp).await?;
send_turn_and_wait(&mut mcp, &thread_id, "first").await?;
// Clear turn/started, item/*, etc from the prior turn so we can assert about
// the compaction flow without unrelated buffered notifications.
mcp.clear_message_buffer();
let compact_request_id = mcp
.send_thread_compact_request(ThreadCompactParams {
thread_id: thread_id.clone(),
})
.await?;
// The response to `thread/compact` must decode as a `TurnStartResponse`
// describing a turn that is still in progress.
let compact_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(compact_request_id)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(compact_resp)?;
assert_eq!(turn.status, TurnStatus::InProgress);
// Both helpers bail out if a `turn/started` notification is read before the
// expected context-compaction item notification.
let started =
wait_for_context_compaction_started_without_turn_started(&mut mcp, &turn.id).await?;
let completed =
wait_for_context_compaction_completed_without_turn_started(&mut mcp, &turn.id).await?;
// The helpers only return context-compaction items, so any other variant
// here would be a bug in the helpers themselves.
let ThreadItem::ContextCompaction { id: started_id } = started.item else {
unreachable!("started item should be context compaction");
};
let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
unreachable!("completed item should be context compaction");
};
// Started and completed notifications must refer to this thread and to the
// same compaction item.
assert_eq!(started.thread_id, thread_id);
assert_eq!(completed.thread_id, thread_id);
assert_eq!(started_id, completed_id);
Ok(())
}
async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
let thread_id = mcp
.send_thread_start_request(ThreadStartParams {
@@ -280,3 +352,55 @@ async fn wait_for_context_compaction_completed(
}
}
}
/// Reads messages until an `item/started` notification carrying a
/// `ThreadItem::ContextCompaction` item arrives, and returns that notification.
///
/// Non-notification messages, unrelated notifications, and `item/started`
/// notifications for other item kinds are skipped.
///
/// `forbidden_turn_id` is only interpolated into the failure message.
///
/// # Errors
/// Fails if a `turn/started` notification is read before the expected item,
/// if a single read takes longer than `DEFAULT_READ_TIMEOUT`, if reading
/// fails, or if the `item/started` params do not deserialize.
///
/// # Panics
/// Panics if an `item/started` notification has no params.
async fn wait_for_context_compaction_started_without_turn_started(
    mcp: &mut McpProcess,
    forbidden_turn_id: &str,
) -> Result<ItemStartedNotification> {
    loop {
        let JSONRPCMessage::Notification(notification) =
            timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??
        else {
            continue;
        };
        match notification.method.as_str() {
            "item/started" => {
                // The notification is owned here, so its params are moved out
                // instead of being cloned first.
                let params = notification.params.expect("item/started params");
                let started: ItemStartedNotification = serde_json::from_value(params)?;
                if matches!(started.item, ThreadItem::ContextCompaction { .. }) {
                    return Ok(started);
                }
            }
            "turn/started" => {
                anyhow::bail!(
                    "unexpected v2 turn/started notification for manual compaction (turn_id={forbidden_turn_id})"
                );
            }
            _ => {}
        }
    }
}
/// Reads messages until an `item/completed` notification carrying a
/// `ThreadItem::ContextCompaction` item arrives, and returns that notification.
///
/// Non-notification messages, unrelated notifications, and `item/completed`
/// notifications for other item kinds are skipped.
///
/// `forbidden_turn_id` is only interpolated into the failure message.
///
/// # Errors
/// Fails if a `turn/started` notification is read before the expected item,
/// if a single read takes longer than `DEFAULT_READ_TIMEOUT`, if reading
/// fails, or if the `item/completed` params do not deserialize.
///
/// # Panics
/// Panics if an `item/completed` notification has no params.
async fn wait_for_context_compaction_completed_without_turn_started(
    mcp: &mut McpProcess,
    forbidden_turn_id: &str,
) -> Result<ItemCompletedNotification> {
    loop {
        let JSONRPCMessage::Notification(notification) =
            timeout(DEFAULT_READ_TIMEOUT, mcp.read_next_message()).await??
        else {
            continue;
        };
        match notification.method.as_str() {
            "item/completed" => {
                // The notification is owned here, so its params are moved out
                // instead of being cloned first.
                let params = notification.params.expect("item/completed params");
                let completed: ItemCompletedNotification = serde_json::from_value(params)?;
                if matches!(completed.item, ThreadItem::ContextCompaction { .. }) {
                    return Ok(completed);
                }
            }
            "turn/started" => {
                anyhow::bail!(
                    "unexpected v2 turn/started notification for manual compaction (turn_id={forbidden_turn_id})"
                );
            }
            _ => {}
        }
    }
}

View File

@@ -31,7 +31,6 @@ use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::features::FEATURES;
@@ -269,20 +268,6 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<(
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
assert!(!turn.id.is_empty());
// Expect a turn/started notification.
let notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
let started: TurnStartedNotification =
serde_json::from_value(notif.params.expect("params must be present"))?;
assert_eq!(started.thread_id, thread.id);
assert_eq!(
started.turn.status,
codex_app_server_protocol::TurnStatus::InProgress
);
// Send a second turn that exercises the overrides path: change the model.
let turn_req2 = mcp
.send_turn_start_request(TurnStartParams {
@@ -305,13 +290,6 @@ async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<(
// Ensure the second turn has a different id than the first.
assert_ne!(turn.id, turn2.id);
// Expect a second turn/started notification as well.
let _notif2: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
let completed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),