Compare commits

..

5 Commits

Author SHA1 Message Date
Dylan Hurd
968c029471 fix(core) updated request_rule guidance (#10379)
## Summary
Update guidance for request_rule

## Testing
- [x] Unit tests pass
2026-02-03 22:29:52 -08:00
pakrym-oai
56ebfff1a8 Move metadata calculation out of client (#10589)
Model client shouldn't be responsible for this.
2026-02-03 21:59:13 -08:00
Ahmed Ibrahim
38a47700b5 Add thread/compact v2 (#10445)
- add `thread/compact` as a trigger-only v2 RPC that submits
`Op::Compact` and returns `{}` immediately.
- add v2 compaction e2e coverage for success and invalid/unknown thread
ids, and update protocol schemas/docs.
2026-02-03 18:15:55 -08:00
Anton Panasenko
fcaed4cb88 feat: log webscocket timing into runtime metrics (#10577) 2026-02-03 18:04:07 -08:00
Charley Cunningham
a9eb766f33 tui: make Esc clear request_user_input notes while notes are shown (#10569)
## Summary

This PR updates the `request_user_input` TUI overlay so `Esc` is
context-aware:

- When notes are visible for an option question, `Esc` now clears notes
and exits notes mode.
- When notes are not visible (normal option selection UI), `Esc` still
interrupts as before.

It also updates footer guidance text to match behavior.

## Changes

- Added a shared notes-clear path for option questions:
- `Tab` and `Esc` now both clear notes and return focus to options when
notes are visible.
- Updated footer hint text in notes-visible state:
  - from: `tab to clear notes | ... | esc to interrupt`
  - to: `tab or esc to clear notes | ...`
- Hid `esc to interrupt` hint while notes are visible for option
questions.
- Kept `esc to interrupt` visible and functional in normal
option-selection mode.
- Updated tests to assert the new `Esc` behavior in notes mode.
- Updated snapshot output for the notes-visible footer row.
- Updated docs in `docs/tui-request-user-input.md` to reflect
mode-specific `Esc` behavior.
2026-02-03 16:17:06 -08:00
32 changed files with 615 additions and 200 deletions

View File

@@ -55,7 +55,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.95.0"
version = "0.0.0"
# Track the edition for all workspace crates in one place. Individual
# crates can still override this value, but keeping it here means new
# crates created with `cargo new -w ...` automatically inherit the 2024

View File

@@ -2235,6 +2235,17 @@
],
"type": "object"
},
"ThreadCompactStartParams": {
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"type": "object"
},
"ThreadForkParams": {
"description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.",
"properties": {
@@ -3210,6 +3221,30 @@
"title": "Thread/unarchiveRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/compact/start"
],
"title": "Thread/compact/startRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadCompactStartParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/compact/startRequest",
"type": "object"
},
{
"properties": {
"id": {

View File

@@ -598,6 +598,30 @@
"title": "Thread/unarchiveRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/compact/start"
],
"title": "Thread/compact/startRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadCompactStartParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/compact/startRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -13855,6 +13879,24 @@
"title": "ThreadArchiveResponse",
"type": "object"
},
"ThreadCompactStartParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadCompactStartParams",
"type": "object"
},
"ThreadCompactStartResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCompactStartResponse",
"type": "object"
},
"ThreadForkParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.",

View File

@@ -0,0 +1,13 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadCompactStartParams",
"type": "object"
}

View File

@@ -0,0 +1,5 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadCompactStartResponse",
"type": "object"
}

View File

@@ -39,6 +39,7 @@ import type { SkillsListParams } from "./v2/SkillsListParams";
import type { SkillsRemoteReadParams } from "./v2/SkillsRemoteReadParams";
import type { SkillsRemoteWriteParams } from "./v2/SkillsRemoteWriteParams";
import type { ThreadArchiveParams } from "./v2/ThreadArchiveParams";
import type { ThreadCompactStartParams } from "./v2/ThreadCompactStartParams";
import type { ThreadForkParams } from "./v2/ThreadForkParams";
import type { ThreadListParams } from "./v2/ThreadListParams";
import type { ThreadLoadedListParams } from "./v2/ThreadLoadedListParams";
@@ -54,4 +55,4 @@ import type { TurnStartParams } from "./v2/TurnStartParams";
/**
* Request from the client to the server.
*/
export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "skills/remote/read", id: RequestId, params: SkillsRemoteReadParams, } | { "method": "skills/remote/write", id: RequestId, params: SkillsRemoteWriteParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": 
"account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "newConversation", id: RequestId, params: NewConversationParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "listConversations", id: RequestId, params: ListConversationsParams, } | { "method": "resumeConversation", id: RequestId, params: ResumeConversationParams, } | { "method": "forkConversation", id: RequestId, params: ForkConversationParams, } | { "method": "archiveConversation", id: RequestId, params: ArchiveConversationParams, } | { "method": "sendUserMessage", id: RequestId, params: SendUserMessageParams, } | { "method": "sendUserTurn", id: RequestId, params: SendUserTurnParams, } | { "method": "interruptConversation", id: RequestId, params: InterruptConversationParams, } | { "method": "addConversationListener", id: RequestId, params: AddConversationListenerParams, } | { "method": "removeConversationListener", id: RequestId, params: RemoveConversationListenerParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "loginApiKey", id: RequestId, params: LoginApiKeyParams, } | { "method": "loginChatGpt", id: RequestId, params: undefined, } | { "method": "cancelLoginChatGpt", id: RequestId, params: CancelLoginChatGptParams, } | { "method": "logoutChatGpt", id: RequestId, 
params: undefined, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "getUserSavedConfig", id: RequestId, params: undefined, } | { "method": "setDefaultModel", id: RequestId, params: SetDefaultModelParams, } | { "method": "getUserAgent", id: RequestId, params: undefined, } | { "method": "userInfo", id: RequestId, params: undefined, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, } | { "method": "execOneOffCommand", id: RequestId, params: ExecOneOffCommandParams, };
export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/compact/start", id: RequestId, params: ThreadCompactStartParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "skills/remote/read", id: RequestId, params: SkillsRemoteReadParams, } | { "method": "skills/remote/write", id: RequestId, params: SkillsRemoteWriteParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": 
"account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "newConversation", id: RequestId, params: NewConversationParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "listConversations", id: RequestId, params: ListConversationsParams, } | { "method": "resumeConversation", id: RequestId, params: ResumeConversationParams, } | { "method": "forkConversation", id: RequestId, params: ForkConversationParams, } | { "method": "archiveConversation", id: RequestId, params: ArchiveConversationParams, } | { "method": "sendUserMessage", id: RequestId, params: SendUserMessageParams, } | { "method": "sendUserTurn", id: RequestId, params: SendUserTurnParams, } | { "method": "interruptConversation", id: RequestId, params: InterruptConversationParams, } | { "method": "addConversationListener", id: RequestId, params: AddConversationListenerParams, } | { "method": "removeConversationListener", id: RequestId, params: RemoveConversationListenerParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "loginApiKey", id: RequestId, params: LoginApiKeyParams, } | { "method": "loginChatGpt", id: RequestId, params: undefined, } | { "method": "cancelLoginChatGpt", id: 
RequestId, params: CancelLoginChatGptParams, } | { "method": "logoutChatGpt", id: RequestId, params: undefined, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "getUserSavedConfig", id: RequestId, params: undefined, } | { "method": "setDefaultModel", id: RequestId, params: SetDefaultModelParams, } | { "method": "getUserAgent", id: RequestId, params: undefined, } | { "method": "userInfo", id: RequestId, params: undefined, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, } | { "method": "execOneOffCommand", id: RequestId, params: ExecOneOffCommandParams, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadCompactStartParams = { threadId: string, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadCompactStartResponse = Record<string, never>;

View File

@@ -128,6 +128,8 @@ export type { TextRange } from "./TextRange";
export type { Thread } from "./Thread";
export type { ThreadArchiveParams } from "./ThreadArchiveParams";
export type { ThreadArchiveResponse } from "./ThreadArchiveResponse";
export type { ThreadCompactStartParams } from "./ThreadCompactStartParams";
export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse";
export type { ThreadForkParams } from "./ThreadForkParams";
export type { ThreadForkResponse } from "./ThreadForkResponse";
export type { ThreadItem } from "./ThreadItem";

View File

@@ -208,6 +208,10 @@ client_request_definitions! {
params: v2::ThreadUnarchiveParams,
response: v2::ThreadUnarchiveResponse,
},
ThreadCompactStart => "thread/compact/start" {
params: v2::ThreadCompactStartParams,
response: v2::ThreadCompactStartResponse,
},
ThreadRollback => "thread/rollback" {
params: v2::ThreadRollbackParams,
response: v2::ThreadRollbackResponse,

View File

@@ -1408,6 +1408,18 @@ pub struct ThreadUnarchiveResponse {
pub thread: Thread,
}
// Parameters for the v2 `thread/compact/start` request.
// NOTE(review): deliberately `//` rather than `///` so the derived JSON schema
// (via `JsonSchema`) and ts-rs output are unchanged.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactStartParams {
    // Id of the thread whose history should be compacted (wire name: `threadId`).
    pub thread_id: String,
}
// Empty response for `thread/compact/start`: the RPC acknowledges immediately;
// compaction progress streams separately as turn/item notifications.
// NOTE(review): `//` on purpose so the generated schema stays unchanged.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactStartResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -85,6 +85,7 @@ Example (from OpenAI's official VSCode extension):
- `thread/archive` — move a thread's rollout file into the archived directory; returns `{}` on success.
- `thread/name/set` — set or update a thread's user-facing name; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success.
- `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications.
- `thread/rollback` — drop the last N turns from the agent's in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
@@ -239,6 +240,22 @@ Use `thread/unarchive` to move an archived rollout back into the sessions direct
{ "id": 24, "result": { "thread": { "id": "thr_b" } } }
```
### Example: Trigger thread compaction
Use `thread/compact/start` to trigger manual history compaction for a thread. The request returns immediately with `{}`.
Progress is emitted as standard `turn/*` and `item/*` notifications on the same `threadId`. Clients should expect a single compaction item:
- `item/started` with `item: { "type": "contextCompaction", ... }`
- `item/completed` with the same `contextCompaction` item id
While compaction is running, the thread is effectively in a turn so clients should surface progress UI based on the notifications.
```json
{ "method": "thread/compact/start", "id": 25, "params": { "threadId": "thr_b" } }
{ "id": 25, "result": {} }
```
### Example: Start a turn (send user input)
Turns attach user input (text or images) to a thread and trigger Codex generation. The `input` field is a list of discriminated unions:

View File

@@ -104,6 +104,8 @@ use codex_app_server_protocol::SkillsRemoteWriteResponse;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadCompactStartResponse;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadItem;
@@ -455,6 +457,9 @@ impl CodexMessageProcessor {
ClientRequest::ThreadUnarchive { request_id, params } => {
self.thread_unarchive(request_id, params).await;
}
ClientRequest::ThreadCompactStart { request_id, params } => {
self.thread_compact_start(request_id, params).await;
}
ClientRequest::ThreadRollback { request_id, params } => {
self.thread_rollback(request_id, params).await;
}
@@ -2094,6 +2099,30 @@ impl CodexMessageProcessor {
}
}
// Handles the v2 `thread/compact/start` RPC: resolve the thread, submit
// `Op::Compact`, and reply with an empty response immediately. Compaction
// progress is delivered asynchronously via the standard turn/item notifications.
async fn thread_compact_start(&self, request_id: RequestId, params: ThreadCompactStartParams) {
    let ThreadCompactStartParams { thread_id } = params;
    // Invalid or unknown thread ids surface as JSON-RPC errors from load_thread.
    let (_, thread) = match self.load_thread(&thread_id).await {
        Ok(v) => v,
        Err(error) => {
            self.outgoing.send_error(request_id, error).await;
            return;
        }
    };
    match thread.submit(Op::Compact).await {
        Ok(_) => {
            // Trigger-only semantics: respond `{}` without waiting for compaction.
            self.outgoing
                .send_response(request_id, ThreadCompactStartResponse {})
                .await;
        }
        Err(err) => {
            self.send_internal_error(request_id, format!("failed to start compaction: {err}"))
                .await;
        }
    }
}
async fn thread_list(&self, request_id: RequestId, params: ThreadListParams) {
let ThreadListParams {
cursor,

View File

@@ -50,6 +50,7 @@ use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadLoadedListParams;
@@ -418,6 +419,15 @@ impl McpProcess {
self.send_request("thread/unarchive", params).await
}
/// Issue a `thread/compact/start` JSON-RPC request and return its request id.
pub async fn send_thread_compact_start_request(
    &mut self,
    params: ThreadCompactStartParams,
) -> anyhow::Result<i64> {
    let serialized = Some(serde_json::to_value(params)?);
    self.send_request("thread/compact/start", serialized).await
}
/// Send a `thread/rollback` JSON-RPC request.
pub async fn send_thread_rollback_request(
&mut self,

View File

@@ -15,9 +15,12 @@ use app_test_support::write_chatgpt_auth;
use app_test_support::write_mock_responses_config_toml;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadCompactStartParams;
use codex_app_server_protocol::ThreadCompactStartResponse;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
@@ -39,6 +42,7 @@ use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const AUTO_COMPACT_LIMIT: i64 = 1_000;
const COMPACT_PROMPT: &str = "Summarize the conversation.";
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compaction_local_emits_started_and_completed_items() -> Result<()> {
@@ -196,6 +200,134 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// e2e: `thread/compact/start` resolves immediately with `{}`, after which the
// compaction surfaces as a single contextCompaction item (started + completed)
// on the same thread.
async fn thread_compact_start_triggers_compaction_and_returns_empty_response() -> Result<()> {
    skip_if_no_network!(Ok(()));

    // Mock model backend: one assistant message, then a completed event.
    let server = responses::start_mock_server().await;
    let sse = responses::sse(vec![
        responses::ev_assistant_message("m1", "MANUAL_COMPACT_SUMMARY"),
        responses::ev_completed_with_tokens("r1", 200),
    ]);
    responses::mount_sse_sequence(&server, vec![sse]).await;

    let codex_home = TempDir::new()?;
    write_mock_responses_config_toml(
        codex_home.path(),
        &server.uri(),
        &BTreeMap::default(),
        AUTO_COMPACT_LIMIT,
        None,
        "mock_provider",
        COMPACT_PROMPT,
    )?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let thread_id = start_thread(&mut mcp).await?;

    // The RPC itself must respond right away with an empty result.
    let compact_id = mcp
        .send_thread_compact_start_request(ThreadCompactStartParams {
            thread_id: thread_id.clone(),
        })
        .await?;
    let compact_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(compact_id)),
    )
    .await??;
    let _compact: ThreadCompactStartResponse =
        to_response::<ThreadCompactStartResponse>(compact_resp)?;

    // Progress then arrives as item/started and item/completed notifications
    // carrying the same contextCompaction item id on the same thread.
    let started = wait_for_context_compaction_started(&mut mcp).await?;
    let completed = wait_for_context_compaction_completed(&mut mcp).await?;
    let ThreadItem::ContextCompaction { id: started_id } = started.item else {
        unreachable!("started item should be context compaction");
    };
    let ThreadItem::ContextCompaction { id: completed_id } = completed.item else {
        unreachable!("completed item should be context compaction");
    };
    assert_eq!(started.thread_id, thread_id);
    assert_eq!(completed.thread_id, thread_id);
    assert_eq!(started_id, completed_id);

    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// e2e: a malformed (non-UUID) thread id is rejected with a JSON-RPC
// invalid-request error whose message names the problem.
async fn thread_compact_start_rejects_invalid_thread_id() -> Result<()> {
    skip_if_no_network!(Ok(()));

    // No SSE mounts needed: the request fails before any model traffic.
    let server = responses::start_mock_server().await;
    let codex_home = TempDir::new()?;
    write_mock_responses_config_toml(
        codex_home.path(),
        &server.uri(),
        &BTreeMap::default(),
        AUTO_COMPACT_LIMIT,
        None,
        "mock_provider",
        COMPACT_PROMPT,
    )?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let request_id = mcp
        .send_thread_compact_start_request(ThreadCompactStartParams {
            thread_id: "not-a-thread-id".to_string(),
        })
        .await?;
    let error: JSONRPCError = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
    )
    .await??;
    assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
    assert!(error.error.message.contains("invalid thread id"));
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// e2e: a syntactically valid UUID that matches no stored thread is rejected
// with a JSON-RPC invalid-request error ("thread not found").
async fn thread_compact_start_rejects_unknown_thread_id() -> Result<()> {
    skip_if_no_network!(Ok(()));

    // No SSE mounts needed: lookup fails before any model traffic.
    let server = responses::start_mock_server().await;
    let codex_home = TempDir::new()?;
    write_mock_responses_config_toml(
        codex_home.path(),
        &server.uri(),
        &BTreeMap::default(),
        AUTO_COMPACT_LIMIT,
        None,
        "mock_provider",
        COMPACT_PROMPT,
    )?;

    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let request_id = mcp
        .send_thread_compact_start_request(ThreadCompactStartParams {
            // Well-formed UUID that was never created as a thread.
            thread_id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(),
        })
        .await?;
    let error: JSONRPCError = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
    )
    .await??;
    assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
    assert!(error.error.message.contains("thread not found"));
    Ok(())
}
async fn start_thread(mcp: &mut McpProcess) -> Result<String> {
let thread_id = mcp
.send_thread_start_request(ThreadStartParams {

View File

@@ -1,13 +1,10 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::OnceLock;
use std::sync::RwLock;
use crate::api_bridge::CoreAuthProvider;
use crate::api_bridge::auth_provider_from_auth;
use crate::api_bridge::map_api_error;
use crate::auth::UnauthorizedRecovery;
use crate::turn_metadata::build_turn_metadata_header;
use codex_api::CompactClient as ApiCompactClient;
use codex_api::CompactionInput as ApiCompactionInput;
use codex_api::Prompt as ApiPrompt;
@@ -72,12 +69,8 @@ use crate::transport_manager::TransportManager;
pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible";
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
#[derive(Debug, Default)]
struct TurnMetadataCache {
cwd: Option<PathBuf>,
header: Option<HeaderValue>,
}
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
"x-responsesapi-include-timing-metrics";
#[derive(Debug)]
struct ModelClientState {
@@ -91,7 +84,6 @@ struct ModelClientState {
summary: ReasoningSummaryConfig,
session_source: SessionSource,
transport_manager: TransportManager,
turn_metadata_cache: Arc<RwLock<TurnMetadataCache>>,
}
#[derive(Debug, Clone)]
@@ -104,6 +96,7 @@ pub struct ModelClientSession {
connection: Option<ApiWebSocketConnection>,
websocket_last_items: Vec<ResponseItem>,
transport_manager: TransportManager,
turn_metadata_header: Option<String>,
/// Turn state for sticky routing.
///
/// This is an `OnceLock` that stores the turn state value received from the server
@@ -143,53 +136,20 @@ impl ModelClient {
summary,
session_source,
transport_manager,
turn_metadata_cache: Arc::new(RwLock::new(TurnMetadataCache::default())),
}),
}
}
pub fn new_session(&self, turn_metadata_cwd: Option<PathBuf>) -> ModelClientSession {
self.prewarm_turn_metadata_header(turn_metadata_cwd);
pub fn new_session(&self, turn_metadata_header: Option<String>) -> ModelClientSession {
ModelClientSession {
state: Arc::clone(&self.state),
connection: None,
websocket_last_items: Vec::new(),
transport_manager: self.state.transport_manager.clone(),
turn_metadata_header,
turn_state: Arc::new(OnceLock::new()),
}
}
/// Refresh turn metadata in the background and update a cached header that request
/// builders can read without blocking.
fn prewarm_turn_metadata_header(&self, turn_metadata_cwd: Option<PathBuf>) {
let turn_metadata_cwd =
turn_metadata_cwd.map(|cwd| std::fs::canonicalize(&cwd).unwrap_or(cwd));
if let Ok(mut cache) = self.state.turn_metadata_cache.write()
&& cache.cwd != turn_metadata_cwd
{
cache.cwd = turn_metadata_cwd.clone();
cache.header = None;
}
let Some(cwd) = turn_metadata_cwd else {
return;
};
let turn_metadata_cache = Arc::clone(&self.state.turn_metadata_cache);
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let _task = handle.spawn(async move {
let header = build_turn_metadata_header(cwd.as_path())
.await
.and_then(|value| HeaderValue::from_str(value.as_str()).ok());
if let Ok(mut cache) = turn_metadata_cache.write()
&& cache.cwd.as_ref() == Some(&cwd)
{
cache.header = header;
}
});
}
}
}
impl ModelClient {
@@ -298,14 +258,6 @@ impl ModelClient {
}
impl ModelClientSession {
fn turn_metadata_header(&self) -> Option<HeaderValue> {
self.state
.turn_metadata_cache
.try_read()
.ok()
.and_then(|cache| cache.header.clone())
}
/// Streams a single model turn using the configured Responses transport.
pub async fn stream(&mut self, prompt: &Prompt) -> Result<ResponseStream> {
let wire_api = self.state.provider.wire_api;
@@ -362,7 +314,10 @@ impl ModelClientSession {
prompt: &Prompt,
compression: Compression,
) -> ApiResponsesOptions {
let turn_metadata_header = self.turn_metadata_header();
let turn_metadata_header = self
.turn_metadata_header
.as_deref()
.and_then(|value| HeaderValue::from_str(value).ok());
let model_info = &self.state.model_info;
let default_reasoning_effort = model_info.default_reasoning_level;
@@ -489,6 +444,12 @@ impl ModelClientSession {
if needs_new {
let mut headers = options.extra_headers.clone();
headers.extend(build_conversation_headers(options.conversation_id.clone()));
if self.state.config.features.enabled(Feature::RuntimeMetrics) {
headers.insert(
X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER,
HeaderValue::from_static("true"),
);
}
let websocket_telemetry = self.build_websocket_telemetry();
let new_conn: ApiWebSocketConnection =
ApiWebSocketResponsesClient::new(api_provider, api_auth)

View File

@@ -34,6 +34,7 @@ use crate::stream_events_utils::last_assistant_message_from_item;
use crate::terminal;
use crate::transport_manager::TransportManager;
use crate::truncate::TruncationPolicy;
use crate::turn_metadata::build_turn_metadata_header;
use crate::user_notification::UserNotifier;
use crate::util::error_or_panic;
use async_channel::Receiver;
@@ -80,6 +81,7 @@ use rmcp::model::RequestId;
use serde_json;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::OnceCell;
use tokio::sync::RwLock;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
@@ -90,6 +92,7 @@ use tracing::field;
use tracing::info;
use tracing::info_span;
use tracing::instrument;
use tracing::trace;
use tracing::trace_span;
use tracing::warn;
@@ -501,6 +504,7 @@ pub(crate) struct TurnContext {
pub(crate) tool_call_gate: Arc<ReadinessFlag>,
pub(crate) truncation_policy: TruncationPolicy,
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
turn_metadata_header: OnceCell<Option<String>>,
}
impl TurnContext {
pub(crate) fn resolve_path(&self, path: Option<String>) -> PathBuf {
@@ -514,6 +518,38 @@ impl TurnContext {
.as_deref()
.unwrap_or(compact::SUMMARIZATION_PROMPT)
}
/// Computes the turn metadata header for this turn's `cwd`, memoized in the
/// `OnceCell` so the underlying (git-inspecting) work runs at most once per
/// turn context; later callers get the cached value.
async fn build_turn_metadata_header(&self) -> Option<String> {
self.turn_metadata_header
.get_or_init(|| async { build_turn_metadata_header(self.cwd.as_path()).await })
.await
.clone()
}
/// Resolves the turn metadata header, waiting at most
/// `TURN_METADATA_HEADER_TIMEOUT_MS` for the computation to finish.
///
/// On timeout, falls back to whatever value a background task has already
/// cached in the `OnceCell` (possibly `None`) rather than blocking the turn.
pub async fn resolve_turn_metadata_header(&self) -> Option<String> {
    const TURN_METADATA_HEADER_TIMEOUT_MS: u64 = 250;
    match tokio::time::timeout(
        std::time::Duration::from_millis(TURN_METADATA_HEADER_TIMEOUT_MS),
        self.build_turn_metadata_header(),
    )
    .await
    {
        Ok(header) => header,
        Err(_) => {
            // Interpolate the constant so the log message cannot drift from
            // the actual timeout value (it previously hardcoded "250ms").
            warn!(
                "timed out after {TURN_METADATA_HEADER_TIMEOUT_MS}ms while building turn metadata header"
            );
            // Best effort: reuse an already-computed value if one exists.
            self.turn_metadata_header.get().cloned().flatten()
        }
    }
}
/// Kicks off the turn-metadata computation on a background task so the
/// result is likely already cached (via the `OnceCell`) by the time
/// `resolve_turn_metadata_header` is awaited for the turn.
pub fn spawn_turn_metadata_header_task(self: &Arc<Self>) {
// Clone the Arc so the detached task keeps the context alive.
let context = Arc::clone(self);
tokio::spawn(async move {
trace!("Spawning turn metadata calculation task");
context.build_turn_metadata_header().await;
trace!("Turn metadata calculation task completed");
});
}
}
#[derive(Clone)]
@@ -682,10 +718,11 @@ impl Session {
web_search_mode: per_turn_config.web_search_mode,
});
let cwd = session_configuration.cwd.clone();
TurnContext {
sub_id,
client,
cwd: session_configuration.cwd.clone(),
cwd,
developer_instructions: session_configuration.developer_instructions.clone(),
compact_prompt: session_configuration.compact_prompt.clone(),
user_instructions: session_configuration.user_instructions.clone(),
@@ -702,6 +739,7 @@ impl Session {
tool_call_gate: Arc::new(ReadinessFlag::new()),
truncation_policy: model_info.truncation_policy.into(),
dynamic_tools: session_configuration.dynamic_tools.clone(),
turn_metadata_header: OnceCell::new(),
}
}
@@ -1246,10 +1284,13 @@ impl Session {
sub_id,
self.services.transport_manager.clone(),
);
if let Some(final_schema) = final_output_json_schema {
turn_context.final_output_json_schema = final_schema;
}
Arc::new(turn_context)
let turn_context = Arc::new(turn_context);
turn_context.spawn_turn_metadata_header_task();
turn_context
}
pub(crate) async fn new_default_turn(&self) -> Arc<TurnContext> {
@@ -3274,6 +3315,7 @@ async fn spawn_review_thread(
tool_call_gate: Arc::new(ReadinessFlag::new()),
dynamic_tools: parent_turn_context.dynamic_tools.clone(),
truncation_policy: model_info.truncation_policy.into(),
turn_metadata_header: parent_turn_context.turn_metadata_header.clone(),
};
// Seed the child task with the review prompt as the initial user message.
@@ -3478,9 +3520,8 @@ pub(crate) async fn run_turn(
// many turns, from the perspective of the user, it is a single turn.
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let mut client_session = turn_context
.client
.new_session(Some(turn_context.cwd.clone()));
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
let mut client_session = turn_context.client.new_session(turn_metadata_header);
loop {
// Note that pending_input would be something like a message the user

View File

@@ -337,9 +337,8 @@ async fn drain_to_completed(
turn_context: &TurnContext,
prompt: &Prompt,
) -> CodexResult<()> {
let mut client_session = turn_context
.client
.new_session(Some(turn_context.cwd.clone()));
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
let mut client_session = turn_context.client.new_session(turn_metadata_header);
let mut stream = client_session.stream(prompt).await?;
loop {
let maybe_event = stream.next().await;

View File

@@ -61,6 +61,7 @@ pub mod token_data;
mod truncate;
mod unified_exec;
pub mod windows_sandbox;
pub use client::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
pub use model_provider_info::DEFAULT_LMSTUDIO_PORT;
pub use model_provider_info::DEFAULT_OLLAMA_PORT;
pub use model_provider_info::LMSTUDIO_OSS_PROVIDER_ID;
@@ -138,6 +139,7 @@ pub use exec_policy::check_execpolicy_for_warnings;
pub use exec_policy::load_exec_policy;
pub use safety::get_platform_sandbox;
pub use tools::spec::parse_tool_input_schema;
pub use turn_metadata::build_turn_metadata_header;
// Re-export the protocol types from the standalone `codex-protocol` crate so existing
// `codex_core::protocol::...` references continue to work across the workspace.
pub use codex_protocol::protocol;

View File

@@ -20,7 +20,7 @@ struct TurnMetadata {
workspaces: BTreeMap<String, TurnMetadataWorkspace>,
}
pub(crate) async fn build_turn_metadata_header(cwd: &Path) -> Option<String> {
pub async fn build_turn_metadata_header(cwd: &Path) -> Option<String> {
let repo_root = get_git_repo_root(cwd)?;
let (latest_git_commit_hash, associated_remote_urls) = tokio::join!(

View File

@@ -408,88 +408,14 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
responses::ev_response_created("resp-1"),
responses::ev_completed("resp-1"),
]);
let provider = ModelProviderInfo {
name: "mock".into(),
base_url: Some(format!("{}/v1", server.uri())),
env_key: None,
env_key_instructions: None,
experimental_bearer_token: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(5_000),
requires_openai_auth: false,
supports_websockets: false,
};
let codex_home = TempDir::new().expect("failed to create TempDir");
let mut config = load_default_config_for_test(&codex_home).await;
config.model_provider_id = provider.name.clone();
config.model_provider = provider.clone();
let effort = config.model_reasoning_effort;
let summary = config.model_reasoning_summary;
let model = ModelsManager::get_model_offline(config.model.as_deref());
config.model = Some(model.clone());
let config = Arc::new(config);
let conversation_id = ThreadId::new();
let auth_mode = AuthMode::Chatgpt;
let session_source =
SessionSource::SubAgent(SubAgentSource::Other("turn-metadata-e2e".to_string()));
let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config);
let otel_manager = OtelManager::new(
conversation_id,
model.as_str(),
model_info.slug.as_str(),
None,
Some("test@test.com".to_string()),
Some(auth_mode),
false,
"test".to_string(),
session_source.clone(),
);
let client = ModelClient::new(
Arc::clone(&config),
None,
model_info,
otel_manager,
provider,
effort,
summary,
conversation_id,
session_source,
TransportManager::new(),
);
let workspace = TempDir::new().expect("workspace tempdir");
let cwd = workspace.path();
let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message {
id: None,
role: "user".into(),
content: vec![ContentItem::InputText {
text: "hello".into(),
}],
end_turn: None,
phase: None,
}];
let test = test_codex().build(&server).await.expect("build test codex");
let cwd = test.cwd_path();
let first_request = responses::mount_sse_once(&server, response_body.clone()).await;
let mut first_session = client.new_session(Some(cwd.to_path_buf()));
let mut first_stream = first_session
.stream(&prompt)
test.submit_turn("hello")
.await
.expect("stream first turn");
while let Some(event) = first_stream.next().await {
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
break;
}
}
.expect("submit first turn prompt");
assert_eq!(
first_request
.single_request()
@@ -539,21 +465,13 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
.trim()
.to_string();
let repo_root = std::fs::canonicalize(cwd)
.unwrap_or_else(|_| cwd.to_path_buf())
.to_string_lossy()
.into_owned();
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
loop {
let request_recorder = responses::mount_sse_once(&server, response_body.clone()).await;
let mut session = client.new_session(Some(cwd.to_path_buf()));
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut stream = session.stream(&prompt).await.expect("stream post-git turn");
while let Some(event) = stream.next().await {
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
break;
}
}
test.submit_turn("hello")
.await
.expect("submit post-git turn prompt");
let maybe_header = request_recorder
.single_request()
@@ -561,11 +479,14 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
if let Some(header_value) = maybe_header {
let parsed: serde_json::Value = serde_json::from_str(&header_value)
.expect("x-codex-turn-metadata should be valid JSON");
let workspace = parsed
let workspaces = parsed
.get("workspaces")
.and_then(serde_json::Value::as_object)
.and_then(|workspaces| workspaces.get(&repo_root))
.expect("metadata should include cwd repo root workspace entry");
.expect("metadata should include workspaces");
let workspace = workspaces
.values()
.next()
.expect("metadata should include at least one workspace entry");
assert_eq!(
workspace

View File

@@ -10,6 +10,7 @@ use codex_core::ResponseEvent;
use codex_core::ResponseItem;
use codex_core::TransportManager;
use codex_core::WireApi;
use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
use codex_core::features::Feature;
use codex_core::models_manager::manager::ModelsManager;
use codex_core::protocol::SessionSource;
@@ -102,6 +103,72 @@ async fn responses_websocket_emits_websocket_telemetry_events() {
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_includes_timing_metrics_header_when_runtime_metrics_enabled() {
skip_if_no_network!();
// Server script: a single response whose stream includes a
// `responsesapi.websocket_timing` event between created/completed, so the
// client has timing metrics to record.
let server = start_websocket_server(vec![vec![vec![
ev_response_created("resp-1"),
serde_json::json!({
"type": "responsesapi.websocket_timing",
"timing_metrics": {
"responses_duration_excl_engine_and_client_tool_time_ms": 120,
"engine_service_total_ms": 450
}
}),
ev_completed("resp-1"),
]]])
.await;
// RuntimeMetrics enabled: the opt-in timing header should be sent.
let harness = websocket_harness_with_runtime_metrics(&server, true).await;
harness.otel_manager.reset_runtime_metrics();
let mut session = harness.client.new_session(None);
let prompt = prompt_with_input(vec![message_item("hello")]);
stream_until_complete(&mut session, &prompt).await;
// Give the async metrics pipeline a moment to record before asserting.
tokio::time::sleep(Duration::from_millis(10)).await;
let handshake = server.single_handshake();
assert_eq!(
handshake.header(X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER),
Some("true".to_string())
);
// Both timing fields from the event should surface in the summary.
let summary = harness
.otel_manager
.runtime_metrics_summary()
.expect("runtime metrics summary");
assert_eq!(summary.responses_api_overhead_ms, 120);
assert_eq!(summary.responses_api_inference_time_ms, 450);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_omits_timing_metrics_header_when_runtime_metrics_disabled() {
skip_if_no_network!();
let server = start_websocket_server(vec![vec![vec![
ev_response_created("resp-1"),
ev_completed("resp-1"),
]]])
.await;
// RuntimeMetrics disabled: the opt-in timing header must NOT be sent on
// the websocket handshake.
let harness = websocket_harness_with_runtime_metrics(&server, false).await;
let mut session = harness.client.new_session(None);
let prompt = prompt_with_input(vec![message_item("hello")]);
stream_until_complete(&mut session, &prompt).await;
let handshake = server.single_handshake();
assert_eq!(
handshake.header(X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER),
None
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_emits_reasoning_included_event() {
skip_if_no_network!();
@@ -241,11 +308,21 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo {
}
async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness {
websocket_harness_with_runtime_metrics(server, false).await
}
async fn websocket_harness_with_runtime_metrics(
server: &WebSocketTestServer,
runtime_metrics_enabled: bool,
) -> WebsocketTestHarness {
let provider = websocket_provider(server);
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home).await;
config.model = Some(MODEL.to_string());
config.features.enable(Feature::ResponsesWebsockets);
if runtime_metrics_enabled {
config.features.enable(Feature::RuntimeMetrics);
}
let config = Arc::new(config);
let model_info = ModelsManager::construct_model_info_offline(MODEL, &config);
let conversation_id = ThreadId::new();

View File

@@ -8,3 +8,7 @@ pub(crate) const WEBSOCKET_REQUEST_COUNT_METRIC: &str = "codex.websocket.request
pub(crate) const WEBSOCKET_REQUEST_DURATION_METRIC: &str = "codex.websocket.request.duration_ms";
pub(crate) const WEBSOCKET_EVENT_COUNT_METRIC: &str = "codex.websocket.event";
pub(crate) const WEBSOCKET_EVENT_DURATION_METRIC: &str = "codex.websocket.event.duration_ms";
// Histogram: Responses API time excluding engine and client-tool time
// (sourced from `responses_duration_excl_engine_and_client_tool_time_ms`).
pub(crate) const RESPONSES_API_OVERHEAD_DURATION_METRIC: &str =
"codex.responses_api_overhead.duration_ms";
// Histogram: total engine service (inference) time reported by the API
// (sourced from `engine_service_total_ms`).
pub(crate) const RESPONSES_API_INFERENCE_TIME_DURATION_METRIC: &str =
"codex.responses_api_inference_time.duration_ms";

View File

@@ -1,5 +1,7 @@
use crate::metrics::names::API_CALL_COUNT_METRIC;
use crate::metrics::names::API_CALL_DURATION_METRIC;
use crate::metrics::names::RESPONSES_API_INFERENCE_TIME_DURATION_METRIC;
use crate::metrics::names::RESPONSES_API_OVERHEAD_DURATION_METRIC;
use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
@@ -32,6 +34,8 @@ pub struct RuntimeMetricsSummary {
pub streaming_events: RuntimeMetricTotals,
pub websocket_calls: RuntimeMetricTotals,
pub websocket_events: RuntimeMetricTotals,
pub responses_api_overhead_ms: u64,
pub responses_api_inference_time_ms: u64,
}
impl RuntimeMetricsSummary {
@@ -41,6 +45,8 @@ impl RuntimeMetricsSummary {
&& self.streaming_events.is_empty()
&& self.websocket_calls.is_empty()
&& self.websocket_events.is_empty()
&& self.responses_api_overhead_ms == 0
&& self.responses_api_inference_time_ms == 0
}
pub(crate) fn from_snapshot(snapshot: &ResourceMetrics) -> Self {
@@ -64,12 +70,18 @@ impl RuntimeMetricsSummary {
count: sum_counter(snapshot, WEBSOCKET_EVENT_COUNT_METRIC),
duration_ms: sum_histogram_ms(snapshot, WEBSOCKET_EVENT_DURATION_METRIC),
};
let responses_api_overhead_ms =
sum_histogram_ms(snapshot, RESPONSES_API_OVERHEAD_DURATION_METRIC);
let responses_api_inference_time_ms =
sum_histogram_ms(snapshot, RESPONSES_API_INFERENCE_TIME_DURATION_METRIC);
Self {
tool_calls,
api_calls,
streaming_events,
websocket_calls,
websocket_events,
responses_api_overhead_ms,
responses_api_inference_time_ms,
}
}
}

View File

@@ -1,5 +1,7 @@
use crate::metrics::names::API_CALL_COUNT_METRIC;
use crate::metrics::names::API_CALL_DURATION_METRIC;
use crate::metrics::names::RESPONSES_API_INFERENCE_TIME_DURATION_METRIC;
use crate::metrics::names::RESPONSES_API_OVERHEAD_DURATION_METRIC;
use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
@@ -42,6 +44,10 @@ pub use crate::ToolDecisionSource;
const SSE_UNKNOWN_KIND: &str = "unknown";
const WEBSOCKET_UNKNOWN_KIND: &str = "unknown";
const RESPONSES_WEBSOCKET_TIMING_KIND: &str = "responsesapi.websocket_timing";
const RESPONSES_WEBSOCKET_TIMING_METRICS_FIELD: &str = "timing_metrics";
const RESPONSES_API_OVERHEAD_FIELD: &str = "responses_duration_excl_engine_and_client_tool_time_ms";
const RESPONSES_API_INFERENCE_FIELD: &str = "engine_service_total_ms";
impl OtelManager {
#[allow(clippy::too_many_arguments)]
@@ -252,6 +258,9 @@ impl OtelManager {
.get("type")
.and_then(|value| value.as_str())
.map(std::string::ToString::to_string);
if kind.as_deref() == Some(RESPONSES_WEBSOCKET_TIMING_KIND) {
self.record_responses_websocket_timing_metrics(&value);
}
if kind.as_deref() == Some("response.failed") {
success = false;
error_message = value
@@ -651,6 +660,22 @@ impl OtelManager {
);
}
/// Records duration histograms from a `responsesapi.websocket_timing` event
/// payload: the Responses-API overhead field and the engine (inference)
/// service-time field.
///
/// Each field is extracted and recorded independently; a missing or
/// malformed field is silently skipped rather than failing the event.
fn record_responses_websocket_timing_metrics(&self, value: &serde_json::Value) {
let timing_metrics = value.get(RESPONSES_WEBSOCKET_TIMING_METRICS_FIELD);
let overhead_value =
timing_metrics.and_then(|value| value.get(RESPONSES_API_OVERHEAD_FIELD));
if let Some(duration) = duration_from_ms_value(overhead_value) {
self.record_duration(RESPONSES_API_OVERHEAD_DURATION_METRIC, duration, &[]);
}
let inference_value =
timing_metrics.and_then(|value| value.get(RESPONSES_API_INFERENCE_FIELD));
if let Some(duration) = duration_from_ms_value(inference_value) {
self.record_duration(RESPONSES_API_INFERENCE_TIME_DURATION_METRIC, duration, &[]);
}
}
fn responses_type(event: &ResponseEvent) -> String {
match event {
ResponseEvent::Created => "created".into(),
@@ -689,3 +714,16 @@ impl OtelManager {
/// Current UTC time as an RFC 3339 string with millisecond precision
/// (uses the "Z" suffix for the UTC offset).
fn timestamp() -> String {
    let now = Utc::now();
    now.to_rfc3339_opts(SecondsFormat::Millis, true)
}
/// Interprets an optional JSON value as a millisecond count and converts it
/// to a `Duration`.
///
/// Accepts float, signed, and unsigned JSON numbers; returns `None` for a
/// missing value, a non-numeric value, NaN/infinity, or a negative count.
/// Values beyond `u64::MAX` milliseconds saturate at the maximum.
fn duration_from_ms_value(value: Option<&serde_json::Value>) -> Option<Duration> {
    let raw = value?;
    let millis = raw
        .as_f64()
        .or_else(|| raw.as_i64().map(|v| v as f64))
        .or_else(|| raw.as_u64().map(|v| v as f64))?;
    if !millis.is_finite() || millis < 0.0 {
        return None;
    }
    // Clamp before rounding so the f64 -> u64 conversion cannot overflow.
    let whole_ms = millis.min(u64::MAX as f64).round() as u64;
    Some(Duration::from_millis(whole_ms))
}

View File

@@ -62,6 +62,14 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
r#"{"type":"response.created"}"#.into(),
))));
manager.record_websocket_event(&ws_response, Duration::from_millis(80));
let ws_timing_response: std::result::Result<
Option<std::result::Result<Message, tokio_tungstenite::tungstenite::Error>>,
codex_api::ApiError,
> = Ok(Some(Ok(Message::Text(
r#"{"type":"responsesapi.websocket_timing","timing_metrics":{"responses_duration_excl_engine_and_client_tool_time_ms":124,"engine_service_total_ms":457}}"#
.into(),
))));
manager.record_websocket_event(&ws_timing_response, Duration::from_millis(20));
let summary = manager
.runtime_metrics_summary()
@@ -84,9 +92,11 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
duration_ms: 400,
},
websocket_events: RuntimeMetricTotals {
count: 1,
duration_ms: 80,
count: 2,
duration_ms: 100,
},
responses_api_overhead_ms: 124,
responses_api_inference_time_ms: 457,
};
assert_eq!(summary, expected);

View File

@@ -249,7 +249,7 @@ impl DeveloperInstructions {
match command_prefixes {
Some(prefixes) => {
format!(
"{APPROVAL_POLICY_ON_REQUEST_RULE}\nApproved command prefixes:\n{prefixes}"
"{APPROVAL_POLICY_ON_REQUEST_RULE}\n## Approved command prefixes\nThe following prefix rules have already been approved: {prefixes}"
)
}
None => APPROVAL_POLICY_ON_REQUEST_RULE.to_string(),

View File

@@ -37,25 +37,20 @@ While commands are running inside the sandbox, here are some scenarios that will
- You need to run a GUI app (e.g., open/xdg-open/osascript) to open browsers or files.
- If you run a command that is important to solving the user's query, but it fails because of sandboxing, rerun the command with `require_escalated`. ALWAYS proceed to use the `sandbox_permissions` and `justification` parameters. Do not message the user before requesting approval for the command.
- You are about to take a potentially destructive action such as an `rm` or `git reset` that the user did not explicitly ask for.
Only run commands that require approval if it is absolutely necessary to solve the user's query, don't try and circumvent approvals by using other tools.
- Be judicious with escalating, but if completing the user's request requires it, you should do so - don't try and circumvent approvals by using other tools.
## prefix_rule guidance
When choosing a `prefix_rule`, request one that will allow you to fulfill similar requests from the user in the future without re-requesting escalation. It should be categorical and reasonably scoped to similar capabilities. You MUST NOT pass the entire command into `prefix_rule`.
When choosing a `prefix_rule`, request one that will allow you to fulfill similar requests from the user in the future without re-requesting escalation. It should be categorical and reasonably scoped to similar capabilities. You should rarely pass the entire command into `prefix_rule`.
### Banned prefix_rules
NEVER provide a prefix_rule argument for destructive commands like rm.
Do not provide a prefix rule if your command uses a heredoc or herestring.
### Examples
Good examples of prefixes:
- ["npm", "run", "dev"]
- ["gh", "pr", "check"]
- ["pytest"]
- ["cargo", "test"]
<good_example reason="frequently run command">
["npm", "run", "dev"]
</good_example>
<good_example reason="generic and reusable">
["gh", "pr", "checks"]
</good_example>
<good_example reason="helpful for development cycle">
["pytest"]
</good_example>
<bad_example reason="too specific">
["cargo", "test", "-p", "codex-app-server"]
<correction_to_good_example>
["cargo", "test"]
</correction_to_good_example>
</bad_example>

View File

@@ -427,8 +427,8 @@ impl RequestUserInputOverlay {
if self.selected_option_index().is_some() && !notes_visible {
tips.push(FooterTip::highlighted("tab to add notes"));
}
if self.selected_option_index().is_some() && notes_visible && self.focus_is_notes() {
tips.push(FooterTip::new("tab to clear notes"));
if self.selected_option_index().is_some() && notes_visible {
tips.push(FooterTip::new("tab or esc to clear notes"));
}
}
@@ -449,7 +449,9 @@ impl RequestUserInputOverlay {
tips.push(FooterTip::new("ctrl + n next question"));
}
}
tips.push(FooterTip::new("esc to interrupt"));
if !(self.has_options() && notes_visible) {
tips.push(FooterTip::new("esc to interrupt"));
}
tips
}
@@ -663,6 +665,23 @@ impl RequestUserInputOverlay {
self.sync_composer_placeholder();
}
/// Shared handler for `Tab`/`Esc` while notes are visible on an option
/// question: discards the notes draft and returns focus to the option list.
fn clear_notes_and_focus_options(&mut self) {
// Notes-clearing only applies to option questions; freeform questions
// keep their composer state.
if !self.has_options() {
return;
}
if let Some(answer) = self.current_answer_mut() {
// Drop the draft and un-commit so the answer reverts to pure
// option selection with notes hidden.
answer.draft = ComposerDraft::default();
answer.answer_committed = false;
answer.notes_visible = false;
}
self.pending_submission_draft = None;
// Reset the composer so stale note text cannot reappear if notes are
// reopened later.
self.composer
.set_text_content(String::new(), Vec::new(), Vec::new());
self.composer.move_cursor_to_end();
self.focus = Focus::Options;
self.sync_composer_placeholder();
}
/// Ensure there is a selection before allowing notes entry.
fn ensure_selected_for_notes(&mut self) {
if let Some(answer) = self.current_answer_mut() {
@@ -976,6 +995,10 @@ impl BottomPaneView for RequestUserInputOverlay {
}
if matches!(key_event.code, KeyCode::Esc) {
if self.has_options() && self.notes_ui_visible() {
self.clear_notes_and_focus_options();
return;
}
// TODO: Emit interrupted request_user_input results (including committed answers)
// once core supports persisting them reliably without follow-up turn issues.
self.app_event_tx.send(AppEvent::CodexOp(Op::Interrupt));
@@ -1093,16 +1116,7 @@ impl BottomPaneView for RequestUserInputOverlay {
Focus::Notes => {
let notes_empty = self.composer.current_text_with_pending().trim().is_empty();
if self.has_options() && matches!(key_event.code, KeyCode::Tab) {
if let Some(answer) = self.current_answer_mut() {
answer.draft = ComposerDraft::default();
answer.answer_committed = false;
answer.notes_visible = false;
}
self.composer
.set_text_content(String::new(), Vec::new(), Vec::new());
self.composer.move_cursor_to_end();
self.focus = Focus::Options;
self.sync_composer_placeholder();
self.clear_notes_and_focus_options();
return;
}
if self.has_options() && matches!(key_event.code, KeyCode::Backspace) && notes_empty
@@ -1753,7 +1767,7 @@ mod tests {
}
#[test]
fn esc_in_notes_mode_interrupts() {
fn esc_in_notes_mode_clears_notes_and_hides_ui() {
let (tx, mut rx) = test_sender();
let mut overlay = RequestUserInputOverlay::new(
request_event("turn-1", vec![question_with_options("q1", "Pick one")]),
@@ -1769,12 +1783,19 @@ mod tests {
overlay.handle_key_event(KeyEvent::from(KeyCode::Tab));
overlay.handle_key_event(KeyEvent::from(KeyCode::Esc));
assert_eq!(overlay.done, true);
expect_interrupt_only(&mut rx);
let answer = overlay.current_answer().expect("answer missing");
assert_eq!(overlay.done, false);
assert!(matches!(overlay.focus, Focus::Options));
assert_eq!(overlay.notes_ui_visible(), false);
assert_eq!(overlay.composer.current_text_with_pending(), "");
assert_eq!(answer.draft.text, "");
assert_eq!(answer.options_state.selected_idx, Some(0));
assert_eq!(answer.answer_committed, false);
assert!(rx.try_recv().is_err());
}
#[test]
fn esc_in_notes_mode_interrupts_with_notes_visible() {
fn esc_in_notes_mode_with_text_clears_notes_and_hides_ui() {
let (tx, mut rx) = test_sender();
let mut overlay = RequestUserInputOverlay::new(
request_event("turn-1", vec![question_with_options("q1", "Pick one")]),
@@ -1791,8 +1812,15 @@ mod tests {
overlay.handle_key_event(KeyEvent::from(KeyCode::Char('a')));
overlay.handle_key_event(KeyEvent::from(KeyCode::Esc));
assert_eq!(overlay.done, true);
expect_interrupt_only(&mut rx);
let answer = overlay.current_answer().expect("answer missing");
assert_eq!(overlay.done, false);
assert!(matches!(overlay.focus, Focus::Options));
assert_eq!(overlay.notes_ui_visible(), false);
assert_eq!(overlay.composer.current_text_with_pending(), "");
assert_eq!(answer.draft.text, "");
assert_eq!(answer.options_state.selected_idx, Some(0));
assert_eq!(answer.answer_committed, false);
assert!(rx.try_recv().is_err());
}
#[test]

View File

@@ -1,5 +1,6 @@
---
source: tui/src/bottom_pane/request_user_input/mod.rs
assertion_line: 2321
expression: "render_snapshot(&overlay, area)"
---
@@ -16,4 +17,4 @@ expression: "render_snapshot(&overlay, area)"
tab to clear notes | enter to submit answer | esc to interrupt
tab or esc to clear notes | enter to submit answer

View File

@@ -2213,6 +2213,14 @@ fn runtime_metrics_label(summary: RuntimeMetricsSummary) -> Option<String> {
summary.websocket_events.count
));
}
if summary.responses_api_overhead_ms > 0 {
let duration = format_duration_ms(summary.responses_api_overhead_ms);
parts.push(format!("Responses API overhead: {duration}"));
}
if summary.responses_api_inference_time_ms > 0 {
let duration = format_duration_ms(summary.responses_api_inference_time_ms);
parts.push(format!("Responses API inference: {duration}"));
}
if parts.is_empty() {
None
} else {
@@ -2381,9 +2389,11 @@ mod tests {
count: 4,
duration_ms: 1_200,
},
responses_api_overhead_ms: 650,
responses_api_inference_time_ms: 1_940,
};
let cell = FinalMessageSeparator::new(Some(12), Some(summary));
let rendered = render_lines(&cell.display_lines(200));
let rendered = render_lines(&cell.display_lines(300));
assert_eq!(rendered.len(), 1);
assert!(!rendered[0].contains("Worked for"));
@@ -2392,6 +2402,8 @@ mod tests {
assert!(rendered[0].contains("WebSocket: 1 events send (700ms)"));
assert!(rendered[0].contains("Streams: 6 events (900ms)"));
assert!(rendered[0].contains("4 events received (1.2s)"));
assert!(rendered[0].contains("Responses API overhead: 650ms"));
assert!(rendered[0].contains("Responses API inference: 1.9s"));
}
#[test]

View File

@@ -30,7 +30,9 @@ friction for freeform input.
- Enter advances to the next question.
- Enter on the last question submits all answers.
- PageUp/PageDown navigate across questions (when multiple are present).
- Esc interrupts the run.
- Esc interrupts the run in option selection mode.
- When notes are open for an option question, Tab or Esc clears notes and returns
to option selection.
## Layout priorities