Compare commits

...

2 Commits

Author SHA1 Message Date
Neil Patel
4ab8eb4a79 Fix rebased test formatting and argument comments 2026-03-28 19:15:42 -04:00
Neil Patel
6c9e77727d Forward app-server turn client metadata 2026-03-28 19:15:41 -04:00
25 changed files with 805 additions and 27 deletions

View File

@@ -3167,6 +3167,16 @@
],
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
},
"clientMetadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional turn-scoped Responses API client metadata.",
"type": [
"object",
"null"
]
},
"cwd": {
"description": "Override the working directory for this turn and subsequent turns.",
"type": [
@@ -3264,6 +3274,16 @@
},
"TurnSteerParams": {
"properties": {
"clientMetadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional turn-scoped Responses API client metadata.",
"type": [
"object",
"null"
]
},
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"

View File

@@ -14338,6 +14338,16 @@
],
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
},
"clientMetadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional turn-scoped Responses API client metadata.",
"type": [
"object",
"null"
]
},
"cwd": {
"description": "Override the working directory for this turn and subsequent turns.",
"type": [
@@ -14476,6 +14486,16 @@
"TurnSteerParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"clientMetadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional turn-scoped Responses API client metadata.",
"type": [
"object",
"null"
]
},
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"

View File

@@ -12184,6 +12184,16 @@
],
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
},
"clientMetadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional turn-scoped Responses API client metadata.",
"type": [
"object",
"null"
]
},
"cwd": {
"description": "Override the working directory for this turn and subsequent turns.",
"type": [
@@ -12322,6 +12332,16 @@
"TurnSteerParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"clientMetadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional turn-scoped Responses API client metadata.",
"type": [
"object",
"null"
]
},
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"

View File

@@ -521,6 +521,16 @@
],
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
},
"clientMetadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional turn-scoped Responses API client metadata.",
"type": [
"object",
"null"
]
},
"cwd": {
"description": "Override the working directory for this turn and subsequent turns.",
"type": [

View File

@@ -165,6 +165,16 @@
}
},
"properties": {
"clientMetadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional turn-scoped Responses API client metadata.",
"type": [
"object",
"null"
]
},
"expectedTurnId": {
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
"type": "string"

View File

@@ -13,6 +13,9 @@ import type { SandboxPolicy } from "./SandboxPolicy";
import type { UserInput } from "./UserInput";
export type TurnStartParams = {threadId: string, input: Array<UserInput>, /**
* Optional turn-scoped Responses API client metadata.
*/
clientMetadata?: { [key in string]?: string } | null, /**
* Override the working directory for this turn and subsequent turns.
*/
cwd?: string | null, /**

View File

@@ -4,6 +4,10 @@
import type { UserInput } from "./UserInput";
export type TurnSteerParams = { threadId: string, input: Array<UserInput>,
/**
* Optional turn-scoped Responses API client metadata.
*/
clientMetadata?: { [key in string]?: string } | null,
/**
* Required active turn id precondition. The request fails when it does not
* match the currently active turn.

View File

@@ -3937,6 +3937,9 @@ pub enum TurnStatus {
pub struct TurnStartParams {
pub thread_id: String,
pub input: Vec<UserInput>,
/// Optional turn-scoped Responses API client metadata.
#[ts(optional = nullable)]
pub client_metadata: Option<HashMap<String, String>>,
/// Override the working directory for this turn and subsequent turns.
#[ts(optional = nullable)]
pub cwd: Option<PathBuf>,
@@ -4053,6 +4056,9 @@ pub struct TurnStartResponse {
pub struct TurnSteerParams {
pub thread_id: String,
pub input: Vec<UserInput>,
/// Optional turn-scoped Responses API client metadata.
#[ts(optional = nullable)]
pub client_metadata: Option<HashMap<String, String>>,
/// Required active turn id precondition. The request fails when it does not
/// match the currently active turn.
pub expected_turn_id: String,
@@ -8067,6 +8073,7 @@ mod tests {
let without_override = TurnStartParams {
thread_id: "thread_123".to_string(),
input: vec![],
client_metadata: None,
cwd: None,
approval_policy: None,
approvals_reviewer: None,
@@ -8083,4 +8090,58 @@ mod tests {
serde_json::to_value(&without_override).expect("params should serialize");
assert_eq!(serialized_without_override.get("serviceTier"), None);
}
#[test]
fn turn_params_round_trip_client_metadata() {
let turn_start: TurnStartParams = serde_json::from_value(json!({
"threadId": "thread_123",
"input": [],
"clientMetadata": {
"fiber_run_id": "fiber-123",
"origin": "gaas"
}
}))
.expect("turn start params should deserialize");
assert_eq!(
turn_start.client_metadata,
Some(HashMap::from([
("fiber_run_id".to_string(), "fiber-123".to_string()),
("origin".to_string(), "gaas".to_string()),
]))
);
assert_eq!(
serde_json::to_value(&turn_start)
.expect("turn start params should serialize")
.get("clientMetadata"),
Some(&json!({
"fiber_run_id": "fiber-123",
"origin": "gaas"
}))
);
let turn_steer: TurnSteerParams = serde_json::from_value(json!({
"threadId": "thread_123",
"input": [],
"clientMetadata": {
"fiber_run_id": "fiber-456"
},
"expectedTurnId": "turn_123"
}))
.expect("turn steer params should deserialize");
assert_eq!(
turn_steer.client_metadata,
Some(HashMap::from([(
"fiber_run_id".to_string(),
"fiber-456".to_string(),
)]))
);
assert_eq!(
serde_json::to_value(&turn_steer)
.expect("turn steer params should serialize")
.get("clientMetadata"),
Some(&json!({
"fiber_run_id": "fiber-456"
}))
);
}
}

View File

@@ -148,8 +148,8 @@ Example with notification opt-out:
- `thread/shellCommand` — run a user-initiated `!` shell command against a thread; this runs unsandboxed with full access rather than inheriting the thread sandbox policy. Returns `{}` immediately while progress streams through standard turn/item notifications and any active turn receives the formatted output in its message stream.
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. Review and manual compaction turns reject `turn/steer`.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. Optional `clientMetadata` is merged into Codexs existing turn-metadata payload for that turn. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. Optional `clientMetadata` updates the extra fields merged into that turn-scoped metadata for the next outbound request. Review and manual compaction turns reject `turn/steer`.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); returns `{}` and streams `thread/realtime/*` notifications.
- `thread/realtime/appendAudio` — append an input audio chunk to the active realtime session (experimental); returns `{}`.
@@ -462,7 +462,7 @@ Turns attach user input (text or images) to a thread and trigger Codex generatio
- `{"type":"image","url":"https://…png"}`
- `{"type":"localImage","path":"/tmp/screenshot.png"}`
You can optionally specify config overrides on the new turn. If specified, these settings become the default for subsequent turns on the same thread. `outputSchema` applies only to the current turn.
You can optionally specify turn-scoped `clientMetadata` plus config overrides on the new turn. Config overrides become the default for subsequent turns on the same thread. `outputSchema` and `clientMetadata` apply only to the current turn, and `clientMetadata` is merged into the existing turn-metadata payload that Codex forwards downstream.
`approvalsReviewer` accepts:
@@ -473,6 +473,9 @@ You can optionally specify config overrides on the new turn. If specified, these
{ "method": "turn/start", "id": 30, "params": {
"threadId": "thr_123",
"input": [ { "type": "text", "text": "Run tests" } ],
"clientMetadata": {
"fiber_run_id": "fiber_123"
},
// Below are optional config overrides
"cwd": "/Users/me/project",
"approvalPolicy": "unlessTrusted",
@@ -595,6 +598,9 @@ not emit `turn/started` and does not accept turn context overrides.
{ "method": "turn/steer", "id": 32, "params": {
"threadId": "thr_123",
"input": [ { "type": "text", "text": "Actually focus on failing tests first." } ],
"clientMetadata": {
"fiber_run_id": "fiber_456"
},
"expectedTurnId": "turn_456"
} }
{ "id": 32, "result": { "turnId": "turn_456" } }

View File

@@ -6362,9 +6362,10 @@ impl CodexMessageProcessor {
.submit_core_op(
&request_id,
thread.as_ref(),
Op::UserInput {
Op::UserInputWithClientMetadata {
items: mapped_items,
final_output_json_schema: params.output_schema,
client_metadata: params.client_metadata,
},
)
.await;
@@ -6441,7 +6442,11 @@ impl CodexMessageProcessor {
.collect();
match thread
.steer_input(mapped_items, Some(&params.expected_turn_id))
.steer_input(
mapped_items,
Some(&params.expected_turn_id),
params.client_metadata,
)
.await
{
Ok(turn_id) => {

View File

@@ -600,6 +600,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
client_metadata: None,
cwd: None,
approval_policy: None,
sandbox_policy: None,
@@ -622,7 +623,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
&& span_attr(span, "rpc.method") == Some("turn/start")
&& span.span_context.trace_id() == remote_trace_id
}) && spans.iter().any(|span| {
span_attr(span, "codex.op") == Some("user_input")
span_attr(span, "codex.op") == Some("user_input_with_client_metadata")
&& span.span_context.trace_id() == remote_trace_id
})
})
@@ -630,10 +631,12 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
let server_request_span =
find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id);
let core_turn_span =
find_span_with_trace(&spans, remote_trace_id, "codex.op=user_input", |span| {
span_attr(span, "codex.op") == Some("user_input")
});
let core_turn_span = find_span_with_trace(
&spans,
remote_trace_id,
"codex.op=user_input_with_client_metadata",
|span| span_attr(span, "codex.op") == Some("user_input_with_client_metadata"),
);
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
assert!(server_request_span.parent_span_is_remote);

View File

@@ -0,0 +1,383 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_shell_command_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn turn_start_forwards_client_metadata_to_responses_request_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let response_mock = responses::mount_sse_once(
&server,
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-1"),
]),
)
.await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
/*supports_websockets*/ false,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let client_metadata = HashMap::from([
("fiber_run_id".to_string(), "fiber-start-123".to_string()),
("origin".to_string(), "gaas".to_string()),
]);
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
client_metadata: Some(client_metadata.clone()),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let request = response_mock.single_request();
let metadata = request
.header("x-codex-turn-metadata")
.as_deref()
.map(parse_json_header)
.unwrap_or_else(|| panic!("missing x-codex-turn-metadata header"));
assert_eq!(metadata["fiber_run_id"].as_str(), Some("fiber-start-123"));
assert_eq!(metadata["origin"].as_str(), Some("gaas"));
assert_eq!(metadata["turn_id"].as_str(), Some(turn.id.as_str()));
assert!(metadata.get("session_id").is_some());
Ok(())
}
#[tokio::test]
async fn turn_steer_updates_client_metadata_on_follow_up_responses_request_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
#[cfg(target_os = "windows")]
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 1".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "1".to_string()];
let codex_home = TempDir::new()?;
let workdir = codex_home.path().join("workdir");
std::fs::create_dir(&workdir)?;
let server = responses::start_mock_server().await;
let request_log = responses::mount_sse_sequence(
&server,
vec![
create_shell_command_sse_response(
shell_command,
Some(&workdir),
Some(5_000),
"call_sleep",
)?,
responses::sse(vec![
responses::ev_response_created("resp-2"),
responses::ev_assistant_message("msg-2", "Done"),
responses::ev_completed("resp-2"),
]),
],
)
.await;
create_config_toml(
codex_home.path(),
&server.uri(),
/*supports_websockets*/ false,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let start_metadata =
HashMap::from([("fiber_run_id".to_string(), "fiber-start-123".to_string())]);
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "Run sleep".to_string(),
text_elements: Vec::new(),
}],
client_metadata: Some(start_metadata.clone()),
cwd: Some(workdir.clone()),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
let turn_id = turn.id.clone();
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
wait_for_request_count(&request_log, /*expected*/ 1).await?;
let steer_metadata = HashMap::from([
("fiber_run_id".to_string(), "fiber-steer-456".to_string()),
("origin".to_string(), "gaas".to_string()),
]);
let steer_req = mcp
.send_turn_steer_request(TurnSteerParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "Focus on the failure".to_string(),
text_elements: Vec::new(),
}],
client_metadata: Some(steer_metadata.clone()),
expected_turn_id: turn_id.clone(),
})
.await?;
let steer_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(steer_req)),
)
.await??;
let _turn: TurnSteerResponse = to_response::<TurnSteerResponse>(steer_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let requests = request_log.requests();
assert_eq!(requests.len(), 2);
let first_metadata = requests[0]
.header("x-codex-turn-metadata")
.as_deref()
.map(parse_json_header)
.unwrap_or_else(|| panic!("missing first x-codex-turn-metadata header"));
assert_eq!(
first_metadata["fiber_run_id"].as_str(),
Some("fiber-start-123")
);
assert_eq!(first_metadata["turn_id"].as_str(), Some(turn_id.as_str()));
let second_metadata = requests[1]
.header("x-codex-turn-metadata")
.as_deref()
.map(parse_json_header)
.unwrap_or_else(|| panic!("missing second x-codex-turn-metadata header"));
assert_eq!(
second_metadata["fiber_run_id"].as_str(),
Some("fiber-steer-456")
);
assert_eq!(second_metadata["origin"].as_str(), Some("gaas"));
assert_eq!(second_metadata["turn_id"].as_str(), Some(turn_id.as_str()));
Ok(())
}
#[tokio::test]
async fn turn_start_forwards_client_metadata_to_responses_websocket_request_body_v2() -> Result<()>
{
skip_if_no_network!(Ok(()));
let websocket_server = responses::start_websocket_server(vec![vec![
vec![
responses::ev_response_created("warm-1"),
responses::ev_completed("warm-1"),
],
vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-1"),
],
]])
.await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&websocket_server.uri().replacen("ws://", "http://", 1),
/*supports_websockets*/ true,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let client_metadata = HashMap::from([
("fiber_run_id".to_string(), "fiber-start-123".to_string()),
("origin".to_string(), "gaas".to_string()),
]);
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
client_metadata: Some(client_metadata),
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let warmup = websocket_server
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 0)
.await
.body_json();
let request = websocket_server
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 1)
.await
.body_json();
assert_eq!(warmup["type"].as_str(), Some("response.create"));
assert_eq!(warmup["generate"].as_bool(), Some(false));
assert_eq!(request["type"].as_str(), Some("response.create"));
assert_eq!(request["previous_response_id"].as_str(), Some("warm-1"));
let metadata = request["client_metadata"]["x-codex-turn-metadata"]
.as_str()
.map(parse_json_header)
.unwrap_or_else(|| panic!("missing websocket x-codex-turn-metadata client metadata"));
assert_eq!(metadata["fiber_run_id"].as_str(), Some("fiber-start-123"));
assert_eq!(metadata["origin"].as_str(), Some("gaas"));
assert_eq!(metadata["turn_id"].as_str(), Some(turn.id.as_str()));
assert!(metadata.get("session_id").is_some());
websocket_server.shutdown().await;
Ok(())
}
fn create_config_toml(
codex_home: &Path,
server_uri: &str,
supports_websockets: bool,
) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
supports_websockets = {supports_websockets}
"#
),
)
}
fn parse_json_header(value: &str) -> serde_json::Value {
match serde_json::from_str(value) {
Ok(value) => value,
Err(err) => panic!("metadata header should be valid json: {err}"),
}
}
async fn wait_for_request_count(
request_log: &core_test_support::responses::ResponseMock,
expected: usize,
) -> Result<()> {
timeout(DEFAULT_READ_TIMEOUT, async {
loop {
if request_log.requests().len() >= expected {
return;
}
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
}
})
.await?;
Ok(())
}

View File

@@ -1,6 +1,7 @@
mod account;
mod analytics;
mod app_list;
mod client_metadata;
mod collaboration_mode_list;
#[cfg(unix)]
mod command_exec;

View File

@@ -1377,6 +1377,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
text: "first turn".to_string(),
text_elements: Vec::new(),
}],
client_metadata: None,
cwd: Some(first_cwd.clone()),
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
approvals_reviewer: None,
@@ -1416,6 +1417,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
text: "second turn".to_string(),
text_elements: Vec::new(),
}],
client_metadata: None,
cwd: Some(second_cwd.clone()),
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
approvals_reviewer: None,

View File

@@ -57,6 +57,7 @@ async fn turn_steer_requires_active_turn() -> Result<()> {
text: "steer".to_string(),
text_elements: Vec::new(),
}],
client_metadata: None,
expected_turn_id: "turn-does-not-exist".to_string(),
})
.await?;
@@ -145,6 +146,7 @@ async fn turn_steer_rejects_oversized_text_input() -> Result<()> {
text: oversized_input.clone(),
text_elements: Vec::new(),
}],
client_metadata: None,
expected_turn_id: turn.id.clone(),
})
.await?;
@@ -247,6 +249,7 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
text: "steer".to_string(),
text_elements: Vec::new(),
}],
client_metadata: None,
expected_turn_id: turn.id.clone(),
})
.await?;

View File

@@ -1364,11 +1364,17 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<Head
}
fn build_ws_client_metadata(turn_metadata_header: Option<&str>) -> Option<HashMap<String, String>> {
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header)?;
let turn_metadata = turn_metadata_header.to_str().ok()?.to_string();
let mut client_metadata = HashMap::new();
client_metadata.insert(X_CODEX_TURN_METADATA_HEADER.to_string(), turn_metadata);
Some(client_metadata)
if let Some(turn_metadata_header) = parse_turn_metadata_header(turn_metadata_header)
&& let Ok(turn_metadata) = turn_metadata_header.to_str()
{
client_metadata.insert(
X_CODEX_TURN_METADATA_HEADER.to_string(),
turn_metadata.to_string(),
);
}
(!client_metadata.is_empty()).then_some(client_metadata)
}
/// Builds the extra headers attached to Responses API requests.

View File

@@ -742,8 +742,11 @@ impl Codex {
&self,
input: Vec<UserInput>,
expected_turn_id: Option<&str>,
client_metadata: Option<HashMap<String, String>>,
) -> Result<String, SteerInputError> {
self.session.steer_input(input, expected_turn_id).await
self.session
.steer_input(input, expected_turn_id, client_metadata)
.await
}
pub(crate) async fn set_app_server_client_name(
@@ -3896,6 +3899,7 @@ impl Session {
&self,
input: Vec<UserInput>,
expected_turn_id: Option<&str>,
client_metadata: Option<HashMap<String, String>>,
) -> Result<String, SteerInputError> {
if input.is_empty() {
return Err(SteerInputError::EmptyInput);
@@ -3934,6 +3938,15 @@ impl Session {
None => return Err(SteerInputError::NoActiveTurn(input)),
}
if let Some(client_metadata) = client_metadata
&& let Some((_, active_task)) = active_turn.tasks.first()
{
active_task
.turn_context
.turn_metadata_state
.set_client_metadata(client_metadata);
}
let mut turn_state = active_turn.turn_state.lock().await;
turn_state.push_pending_input(input.into());
Ok(active_turn_id.clone())
@@ -4326,7 +4339,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
.await;
false
}
Op::UserInput { .. } | Op::UserTurn { .. } => {
Op::UserInput { .. }
| Op::UserInputWithClientMetadata { .. }
| Op::UserTurn { .. } => {
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op).await;
false
}
@@ -4570,7 +4585,7 @@ mod handlers {
}
pub async fn user_input_or_turn(sess: &Arc<Session>, sub_id: String, op: Op) {
let (items, updates) = match op {
let (items, updates, client_metadata) = match op {
Op::UserTurn {
cwd,
approval_policy,
@@ -4610,6 +4625,7 @@ mod handlers {
personality,
app_server_client_name: None,
},
None,
)
}
Op::UserInput {
@@ -4621,6 +4637,19 @@ mod handlers {
final_output_json_schema: Some(final_output_json_schema),
..Default::default()
},
None,
),
Op::UserInputWithClientMetadata {
items,
final_output_json_schema,
client_metadata,
} => (
items,
SessionSettingsUpdate {
final_output_json_schema: Some(final_output_json_schema),
..Default::default()
},
client_metadata,
),
_ => unreachable!(),
};
@@ -4632,11 +4661,20 @@ mod handlers {
sess.maybe_emit_unknown_model_warning_for_turn(current_context.as_ref())
.await;
match sess
.steer_input(items.clone(), /*expected_turn_id*/ None)
.steer_input(
items.clone(),
/*expected_turn_id*/ None,
client_metadata.clone(),
)
.await
{
Ok(_) => current_context.session_telemetry.user_prompt(&items),
Err(SteerInputError::NoActiveTurn(items)) => {
if let Some(client_metadata) = client_metadata {
current_context
.turn_metadata_state
.set_client_metadata(client_metadata);
}
current_context.session_telemetry.user_prompt(&items);
sess.refresh_mcp_servers_if_requested(&current_context)
.await;

View File

@@ -4589,7 +4589,9 @@ async fn steer_input_requires_active_turn() {
}];
let err = sess
.steer_input(input, /*expected_turn_id*/ None)
.steer_input(
input, /*expected_turn_id*/ None, /*client_metadata*/ None,
)
.await
.expect_err("steering without active turn should fail");
@@ -4618,7 +4620,11 @@ async fn steer_input_enforces_expected_turn_id() {
text_elements: Vec::new(),
}];
let err = sess
.steer_input(steer_input, Some("different-turn-id"))
.steer_input(
steer_input,
Some("different-turn-id"),
/*client_metadata*/ None,
)
.await
.expect_err("mismatched expected turn id should fail");
@@ -4660,7 +4666,11 @@ async fn steer_input_rejects_non_regular_turns() {
text_elements: Vec::new(),
}];
let err = sess
.steer_input(steer_input, /*expected_turn_id*/ None)
.steer_input(
steer_input,
/*expected_turn_id*/ None,
/*client_metadata*/ None,
)
.await
.expect_err("steering a non-regular turn should fail");
@@ -4692,7 +4702,7 @@ async fn steer_input_returns_active_turn_id() {
text_elements: Vec::new(),
}];
let turn_id = sess
.steer_input(steer_input, Some(&tc.sub_id))
.steer_input(steer_input, Some(&tc.sub_id), /*client_metadata*/ None)
.await
.expect("steering with matching expected turn id should succeed");

View File

@@ -22,6 +22,7 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::user_input::UserInput;
use std::collections::HashMap;
use std::path::PathBuf;
use tokio::sync::Mutex;
use tokio::sync::watch;
@@ -96,8 +97,11 @@ impl CodexThread {
&self,
input: Vec<UserInput>,
expected_turn_id: Option<&str>,
client_metadata: Option<HashMap<String, String>>,
) -> Result<String, SteerInputError> {
self.codex.steer_input(input, expected_turn_id).await
self.codex
.steer_input(input, expected_turn_id, client_metadata)
.await
}
pub async fn set_app_server_client_name(

View File

@@ -1,4 +1,5 @@
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -6,6 +7,7 @@ use std::sync::Mutex;
use std::sync::RwLock;
use serde::Serialize;
use serde_json::Value;
use tokio::task::JoinHandle;
use crate::sandbox_tags::sandbox_tag;
@@ -69,6 +71,20 @@ impl TurnMetadataBag {
}
}
fn merge_client_metadata(
header: &str,
client_metadata: Option<&HashMap<String, String>>,
) -> Option<String> {
let client_metadata = client_metadata?;
let mut metadata = serde_json::from_str::<serde_json::Map<String, Value>>(header).ok()?;
for (key, value) in client_metadata {
metadata
.entry(key.clone())
.or_insert_with(|| Value::String(value.clone()));
}
serde_json::to_string(&metadata).ok()
}
fn build_turn_metadata_bag(
session_id: Option<String>,
turn_id: Option<String>,
@@ -129,6 +145,7 @@ pub(crate) struct TurnMetadataState {
base_metadata: TurnMetadataBag,
base_header: String,
enriched_header: Arc<RwLock<Option<String>>>,
client_metadata: Arc<RwLock<Option<HashMap<String, String>>>>,
enrichment_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
@@ -159,21 +176,29 @@ impl TurnMetadataState {
base_metadata,
base_header,
enriched_header: Arc::new(RwLock::new(None)),
client_metadata: Arc::new(RwLock::new(None)),
enrichment_task: Arc::new(Mutex::new(None)),
}
}
pub(crate) fn current_header_value(&self) -> Option<String> {
if let Some(header) = self
let header = if let Some(header) = self
.enriched_header
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.cloned()
{
return Some(header);
}
Some(self.base_header.clone())
header
} else {
self.base_header.clone()
};
let client_metadata = self
.client_metadata
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.clone();
merge_client_metadata(&header, client_metadata.as_ref()).or(Some(header))
}
pub(crate) fn current_meta_value(&self) -> Option<serde_json::Value> {
@@ -181,6 +206,13 @@ impl TurnMetadataState {
.and_then(|header| serde_json::from_str(&header).ok())
}
pub(crate) fn set_client_metadata(&self, client_metadata: HashMap<String, String>) {
*self
.client_metadata
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(client_metadata);
}
pub(crate) fn spawn_git_enrichment_task(&self) {
if self.repo_root.is_none() {
return;

View File

@@ -1,6 +1,7 @@
use super::*;
use serde_json::Value;
use std::collections::HashMap;
use tempfile::TempDir;
use tokio::process::Command;
@@ -83,3 +84,29 @@ fn turn_metadata_state_uses_platform_sandbox_tag() {
assert_eq!(sandbox_name, Some(expected_sandbox));
assert_eq!(session_id, Some("session-a"));
}
#[test]
fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields() {
let temp_dir = TempDir::new().expect("temp dir");
let cwd = temp_dir.path().to_path_buf();
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let state = TurnMetadataState::new(
"session-a".to_string(),
"turn-a".to_string(),
cwd,
&sandbox_policy,
WindowsSandboxLevel::Disabled,
);
state.set_client_metadata(HashMap::from([
("fiber_run_id".to_string(), "fiber-123".to_string()),
("session_id".to_string(), "client-supplied".to_string()),
]));
let header = state.current_header_value().expect("header");
let json: Value = serde_json::from_str(&header).expect("json");
assert_eq!(json["fiber_run_id"].as_str(), Some("fiber-123"));
assert_eq!(json["session_id"].as_str(), Some("session-a"));
assert_eq!(json["turn_id"].as_str(), Some("turn-a"));
}

View File

@@ -1260,6 +1260,56 @@ async fn responses_websocket_forwards_turn_metadata_on_initial_and_incremental_c
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_preserves_custom_turn_metadata_fields() {
skip_if_no_network!();
let server = start_websocket_server(vec![vec![vec![
ev_response_created("resp-1"),
ev_completed("resp-1"),
]]])
.await;
let harness = websocket_harness(&server).await;
let mut client_session = harness.client.new_session();
let prompt = prompt_with_input(vec![message_item("hello")]);
let turn_metadata = json!({
"turn_id": "turn-123",
"fiber_run_id": "fiber-123",
"origin": "app-server",
})
.to_string();
stream_until_complete_with_turn_metadata(
&mut client_session,
&harness,
&prompt,
/*service_tier*/ None,
Some(&turn_metadata),
)
.await;
let body = server
.single_connection()
.first()
.expect("missing request")
.body_json();
assert_eq!(body["type"].as_str(), Some("response.create"));
assert_eq!(
body["client_metadata"]["x-codex-turn-metadata"]
.as_str()
.map(|value| serde_json::from_str::<serde_json::Value>(value).expect("valid json")),
Some(json!({
"turn_id": "turn-123",
"fiber_run_id": "fiber-123",
"origin": "app-server",
}))
);
server.shutdown().await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_websocket_uses_previous_response_id_when_prefix_after_completed() {
skip_if_no_network!();
@@ -1810,6 +1860,23 @@ async fn stream_until_complete_with_turn_metadata(
prompt: &Prompt,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
) {
stream_until_complete_with_request_metadata(
client_session,
harness,
prompt,
service_tier,
turn_metadata_header,
)
.await;
}
async fn stream_until_complete_with_request_metadata(
client_session: &mut ModelClientSession,
harness: &WebsocketTestHarness,
prompt: &Prompt,
service_tier: Option<ServiceTier>,
turn_metadata_header: Option<&str>,
) {
let mut stream = client_session
.stream(

View File

@@ -677,6 +677,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
params: TurnStartParams {
thread_id: primary_thread_id_for_span.clone(),
input: items.into_iter().map(Into::into).collect(),
client_metadata: None,
cwd: Some(default_cwd),
approval_policy: Some(default_approval_policy.into()),
approvals_reviewer: None,

View File

@@ -242,6 +242,18 @@ pub enum Op {
final_output_json_schema: Option<Value>,
},
/// App-server user input carrying turn-scoped Responses API client metadata.
UserInputWithClientMetadata {
/// User input items, see `InputItem`
items: Vec<UserInput>,
/// Optional JSON Schema used to constrain the final assistant message for this turn.
#[serde(skip_serializing_if = "Option::is_none")]
final_output_json_schema: Option<Value>,
/// Optional turn-scoped Responses API `client_metadata`.
#[serde(default, skip_serializing_if = "Option::is_none")]
client_metadata: Option<HashMap<String, String>>,
},
/// Similar to [`Op::UserInput`], but contains additional context required
/// for a turn of a [`crate::codex_thread::CodexThread`].
UserTurn {
@@ -577,6 +589,7 @@ impl Op {
Self::RealtimeConversationText(_) => "realtime_conversation_text",
Self::RealtimeConversationClose => "realtime_conversation_close",
Self::UserInput { .. } => "user_input",
Self::UserInputWithClientMetadata { .. } => "user_input_with_client_metadata",
Self::UserTurn { .. } => "user_turn",
Self::InterAgentCommunication { .. } => "inter_agent_communication",
Self::OverrideTurnContext { .. } => "override_turn_context",
@@ -4491,6 +4504,33 @@ mod tests {
Ok(())
}
#[test]
fn user_input_with_client_metadata_round_trips() -> Result<()> {
let op = Op::UserInputWithClientMetadata {
items: Vec::new(),
final_output_json_schema: None,
client_metadata: Some(HashMap::from([(
"fiber_run_id".to_string(),
"fiber-123".to_string(),
)])),
};
let json_op = serde_json::to_value(&op)?;
assert_eq!(
json_op,
json!({
"type": "user_input_with_client_metadata",
"items": [],
"client_metadata": {
"fiber_run_id": "fiber-123",
}
})
);
assert_eq!(serde_json::from_value::<Op>(json_op)?, op);
Ok(())
}
#[test]
fn user_input_text_serializes_empty_text_elements() -> Result<()> {
let input = UserInput::Text {

View File

@@ -417,6 +417,7 @@ impl AppServerSession {
params: TurnStartParams {
thread_id: thread_id.to_string(),
input: items.into_iter().map(Into::into).collect(),
client_metadata: None,
cwd: Some(cwd),
approval_policy: Some(approval_policy.into()),
approvals_reviewer: Some(approvals_reviewer.into()),
@@ -467,6 +468,7 @@ impl AppServerSession {
params: TurnSteerParams {
thread_id: thread_id.to_string(),
input: items.into_iter().map(Into::into).collect(),
client_metadata: None,
expected_turn_id: turn_id,
},
})