mirror of
https://github.com/openai/codex.git
synced 2026-04-05 05:14:47 +00:00
Compare commits
2 Commits
pr16659
...
codex/forw
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ab8eb4a79 | ||
|
|
6c9e77727d |
@@ -3167,6 +3167,16 @@
|
||||
],
|
||||
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
|
||||
},
|
||||
"clientMetadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional turn-scoped Responses API client metadata.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"cwd": {
|
||||
"description": "Override the working directory for this turn and subsequent turns.",
|
||||
"type": [
|
||||
@@ -3264,6 +3274,16 @@
|
||||
},
|
||||
"TurnSteerParams": {
|
||||
"properties": {
|
||||
"clientMetadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional turn-scoped Responses API client metadata.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"expectedTurnId": {
|
||||
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
|
||||
"type": "string"
|
||||
|
||||
@@ -14338,6 +14338,16 @@
|
||||
],
|
||||
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
|
||||
},
|
||||
"clientMetadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional turn-scoped Responses API client metadata.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"cwd": {
|
||||
"description": "Override the working directory for this turn and subsequent turns.",
|
||||
"type": [
|
||||
@@ -14476,6 +14486,16 @@
|
||||
"TurnSteerParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"clientMetadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional turn-scoped Responses API client metadata.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"expectedTurnId": {
|
||||
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
|
||||
"type": "string"
|
||||
|
||||
@@ -12184,6 +12184,16 @@
|
||||
],
|
||||
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
|
||||
},
|
||||
"clientMetadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional turn-scoped Responses API client metadata.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"cwd": {
|
||||
"description": "Override the working directory for this turn and subsequent turns.",
|
||||
"type": [
|
||||
@@ -12322,6 +12332,16 @@
|
||||
"TurnSteerParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"clientMetadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional turn-scoped Responses API client metadata.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"expectedTurnId": {
|
||||
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
|
||||
"type": "string"
|
||||
|
||||
@@ -521,6 +521,16 @@
|
||||
],
|
||||
"description": "Override where approval requests are routed for review on this turn and subsequent turns."
|
||||
},
|
||||
"clientMetadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional turn-scoped Responses API client metadata.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"cwd": {
|
||||
"description": "Override the working directory for this turn and subsequent turns.",
|
||||
"type": [
|
||||
|
||||
@@ -165,6 +165,16 @@
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"clientMetadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional turn-scoped Responses API client metadata.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"expectedTurnId": {
|
||||
"description": "Required active turn id precondition. The request fails when it does not match the currently active turn.",
|
||||
"type": "string"
|
||||
|
||||
@@ -13,6 +13,9 @@ import type { SandboxPolicy } from "./SandboxPolicy";
|
||||
import type { UserInput } from "./UserInput";
|
||||
|
||||
export type TurnStartParams = {threadId: string, input: Array<UserInput>, /**
|
||||
* Optional turn-scoped Responses API client metadata.
|
||||
*/
|
||||
clientMetadata?: { [key in string]?: string } | null, /**
|
||||
* Override the working directory for this turn and subsequent turns.
|
||||
*/
|
||||
cwd?: string | null, /**
|
||||
|
||||
@@ -4,6 +4,10 @@
|
||||
import type { UserInput } from "./UserInput";
|
||||
|
||||
export type TurnSteerParams = { threadId: string, input: Array<UserInput>,
|
||||
/**
|
||||
* Optional turn-scoped Responses API client metadata.
|
||||
*/
|
||||
clientMetadata?: { [key in string]?: string } | null,
|
||||
/**
|
||||
* Required active turn id precondition. The request fails when it does not
|
||||
* match the currently active turn.
|
||||
|
||||
@@ -3937,6 +3937,9 @@ pub enum TurnStatus {
|
||||
pub struct TurnStartParams {
|
||||
pub thread_id: String,
|
||||
pub input: Vec<UserInput>,
|
||||
/// Optional turn-scoped Responses API client metadata.
|
||||
#[ts(optional = nullable)]
|
||||
pub client_metadata: Option<HashMap<String, String>>,
|
||||
/// Override the working directory for this turn and subsequent turns.
|
||||
#[ts(optional = nullable)]
|
||||
pub cwd: Option<PathBuf>,
|
||||
@@ -4053,6 +4056,9 @@ pub struct TurnStartResponse {
|
||||
pub struct TurnSteerParams {
|
||||
pub thread_id: String,
|
||||
pub input: Vec<UserInput>,
|
||||
/// Optional turn-scoped Responses API client metadata.
|
||||
#[ts(optional = nullable)]
|
||||
pub client_metadata: Option<HashMap<String, String>>,
|
||||
/// Required active turn id precondition. The request fails when it does not
|
||||
/// match the currently active turn.
|
||||
pub expected_turn_id: String,
|
||||
@@ -8067,6 +8073,7 @@ mod tests {
|
||||
let without_override = TurnStartParams {
|
||||
thread_id: "thread_123".to_string(),
|
||||
input: vec![],
|
||||
client_metadata: None,
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
approvals_reviewer: None,
|
||||
@@ -8083,4 +8090,58 @@ mod tests {
|
||||
serde_json::to_value(&without_override).expect("params should serialize");
|
||||
assert_eq!(serialized_without_override.get("serviceTier"), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_params_round_trip_client_metadata() {
|
||||
let turn_start: TurnStartParams = serde_json::from_value(json!({
|
||||
"threadId": "thread_123",
|
||||
"input": [],
|
||||
"clientMetadata": {
|
||||
"fiber_run_id": "fiber-123",
|
||||
"origin": "gaas"
|
||||
}
|
||||
}))
|
||||
.expect("turn start params should deserialize");
|
||||
assert_eq!(
|
||||
turn_start.client_metadata,
|
||||
Some(HashMap::from([
|
||||
("fiber_run_id".to_string(), "fiber-123".to_string()),
|
||||
("origin".to_string(), "gaas".to_string()),
|
||||
]))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(&turn_start)
|
||||
.expect("turn start params should serialize")
|
||||
.get("clientMetadata"),
|
||||
Some(&json!({
|
||||
"fiber_run_id": "fiber-123",
|
||||
"origin": "gaas"
|
||||
}))
|
||||
);
|
||||
|
||||
let turn_steer: TurnSteerParams = serde_json::from_value(json!({
|
||||
"threadId": "thread_123",
|
||||
"input": [],
|
||||
"clientMetadata": {
|
||||
"fiber_run_id": "fiber-456"
|
||||
},
|
||||
"expectedTurnId": "turn_123"
|
||||
}))
|
||||
.expect("turn steer params should deserialize");
|
||||
assert_eq!(
|
||||
turn_steer.client_metadata,
|
||||
Some(HashMap::from([(
|
||||
"fiber_run_id".to_string(),
|
||||
"fiber-456".to_string(),
|
||||
)]))
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(&turn_steer)
|
||||
.expect("turn steer params should serialize")
|
||||
.get("clientMetadata"),
|
||||
Some(&json!({
|
||||
"fiber_run_id": "fiber-456"
|
||||
}))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,8 +148,8 @@ Example with notification opt-out:
|
||||
- `thread/shellCommand` — run a user-initiated `!` shell command against a thread; this runs unsandboxed with full access rather than inheriting the thread sandbox policy. Returns `{}` immediately while progress streams through standard turn/item notifications and any active turn receives the formatted output in its message stream.
|
||||
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
|
||||
- `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
|
||||
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
|
||||
- `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. Review and manual compaction turns reject `turn/steer`.
|
||||
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. Optional `clientMetadata` is merged into Codex’s existing turn-metadata payload for that turn. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
|
||||
- `turn/steer` — add user input to an already in-flight regular turn without starting a new turn; returns the active `turnId` that accepted the input. Optional `clientMetadata` updates the extra fields merged into that turn-scoped metadata for the next outbound request. Review and manual compaction turns reject `turn/steer`.
|
||||
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
|
||||
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); returns `{}` and streams `thread/realtime/*` notifications.
|
||||
- `thread/realtime/appendAudio` — append an input audio chunk to the active realtime session (experimental); returns `{}`.
|
||||
@@ -462,7 +462,7 @@ Turns attach user input (text or images) to a thread and trigger Codex generatio
|
||||
- `{"type":"image","url":"https://…png"}`
|
||||
- `{"type":"localImage","path":"/tmp/screenshot.png"}`
|
||||
|
||||
You can optionally specify config overrides on the new turn. If specified, these settings become the default for subsequent turns on the same thread. `outputSchema` applies only to the current turn.
|
||||
You can optionally specify turn-scoped `clientMetadata` plus config overrides on the new turn. Config overrides become the default for subsequent turns on the same thread. `outputSchema` and `clientMetadata` apply only to the current turn, and `clientMetadata` is merged into the existing turn-metadata payload that Codex forwards downstream.
|
||||
|
||||
`approvalsReviewer` accepts:
|
||||
|
||||
@@ -473,6 +473,9 @@ You can optionally specify config overrides on the new turn. If specified, these
|
||||
{ "method": "turn/start", "id": 30, "params": {
|
||||
"threadId": "thr_123",
|
||||
"input": [ { "type": "text", "text": "Run tests" } ],
|
||||
"clientMetadata": {
|
||||
"fiber_run_id": "fiber_123"
|
||||
},
|
||||
// Below are optional config overrides
|
||||
"cwd": "/Users/me/project",
|
||||
"approvalPolicy": "unlessTrusted",
|
||||
@@ -595,6 +598,9 @@ not emit `turn/started` and does not accept turn context overrides.
|
||||
{ "method": "turn/steer", "id": 32, "params": {
|
||||
"threadId": "thr_123",
|
||||
"input": [ { "type": "text", "text": "Actually focus on failing tests first." } ],
|
||||
"clientMetadata": {
|
||||
"fiber_run_id": "fiber_456"
|
||||
},
|
||||
"expectedTurnId": "turn_456"
|
||||
} }
|
||||
{ "id": 32, "result": { "turnId": "turn_456" } }
|
||||
|
||||
@@ -6362,9 +6362,10 @@ impl CodexMessageProcessor {
|
||||
.submit_core_op(
|
||||
&request_id,
|
||||
thread.as_ref(),
|
||||
Op::UserInput {
|
||||
Op::UserInputWithClientMetadata {
|
||||
items: mapped_items,
|
||||
final_output_json_schema: params.output_schema,
|
||||
client_metadata: params.client_metadata,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
@@ -6441,7 +6442,11 @@ impl CodexMessageProcessor {
|
||||
.collect();
|
||||
|
||||
match thread
|
||||
.steer_input(mapped_items, Some(¶ms.expected_turn_id))
|
||||
.steer_input(
|
||||
mapped_items,
|
||||
Some(¶ms.expected_turn_id),
|
||||
params.client_metadata,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(turn_id) => {
|
||||
|
||||
@@ -600,6 +600,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
client_metadata: None,
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
sandbox_policy: None,
|
||||
@@ -622,7 +623,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
&& span_attr(span, "rpc.method") == Some("turn/start")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
}) && spans.iter().any(|span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
span_attr(span, "codex.op") == Some("user_input_with_client_metadata")
|
||||
&& span.span_context.trace_id() == remote_trace_id
|
||||
})
|
||||
})
|
||||
@@ -630,10 +631,12 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
|
||||
let server_request_span =
|
||||
find_rpc_span_with_trace(&spans, SpanKind::Server, "turn/start", remote_trace_id);
|
||||
let core_turn_span =
|
||||
find_span_with_trace(&spans, remote_trace_id, "codex.op=user_input", |span| {
|
||||
span_attr(span, "codex.op") == Some("user_input")
|
||||
});
|
||||
let core_turn_span = find_span_with_trace(
|
||||
&spans,
|
||||
remote_trace_id,
|
||||
"codex.op=user_input_with_client_metadata",
|
||||
|span| span_attr(span, "codex.op") == Some("user_input_with_client_metadata"),
|
||||
);
|
||||
|
||||
assert_eq!(server_request_span.parent_span_id, remote_parent_span_id);
|
||||
assert!(server_request_span.parent_span_is_remote);
|
||||
|
||||
383
codex-rs/app-server/tests/suite/v2/client_metadata.rs
Normal file
383
codex-rs/app-server/tests/suite/v2/client_metadata.rs
Normal file
@@ -0,0 +1,383 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_shell_command_sse_response;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnSteerParams;
|
||||
use codex_app_server_protocol::TurnSteerResponse;
|
||||
use codex_app_server_protocol::UserInput as V2UserInput;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_forwards_client_metadata_to_responses_request_v2() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let response_mock = responses::mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", "Done"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
/*supports_websockets*/ false,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams::default())
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let client_metadata = HashMap::from([
|
||||
("fiber_run_id".to_string(), "fiber-start-123".to_string()),
|
||||
("origin".to_string(), "gaas".to_string()),
|
||||
]);
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id,
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
client_metadata: Some(client_metadata.clone()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let request = response_mock.single_request();
|
||||
let metadata = request
|
||||
.header("x-codex-turn-metadata")
|
||||
.as_deref()
|
||||
.map(parse_json_header)
|
||||
.unwrap_or_else(|| panic!("missing x-codex-turn-metadata header"));
|
||||
assert_eq!(metadata["fiber_run_id"].as_str(), Some("fiber-start-123"));
|
||||
assert_eq!(metadata["origin"].as_str(), Some("gaas"));
|
||||
assert_eq!(metadata["turn_id"].as_str(), Some(turn.id.as_str()));
|
||||
assert!(metadata.get("session_id").is_some());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_steer_updates_client_metadata_on_follow_up_responses_request_v2() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
#[cfg(target_os = "windows")]
|
||||
let shell_command = vec![
|
||||
"powershell".to_string(),
|
||||
"-Command".to_string(),
|
||||
"Start-Sleep -Seconds 1".to_string(),
|
||||
];
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
let shell_command = vec!["sleep".to_string(), "1".to_string()];
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
let workdir = codex_home.path().join("workdir");
|
||||
std::fs::create_dir(&workdir)?;
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let request_log = responses::mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
create_shell_command_sse_response(
|
||||
shell_command,
|
||||
Some(&workdir),
|
||||
Some(5_000),
|
||||
"call_sleep",
|
||||
)?,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-2"),
|
||||
responses::ev_assistant_message("msg-2", "Done"),
|
||||
responses::ev_completed("resp-2"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
/*supports_websockets*/ false,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams::default())
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let start_metadata =
|
||||
HashMap::from([("fiber_run_id".to_string(), "fiber-start-123".to_string())]);
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Run sleep".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
client_metadata: Some(start_metadata.clone()),
|
||||
cwd: Some(workdir.clone()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
let turn_id = turn.id.clone();
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/started"),
|
||||
)
|
||||
.await??;
|
||||
wait_for_request_count(&request_log, /*expected*/ 1).await?;
|
||||
|
||||
let steer_metadata = HashMap::from([
|
||||
("fiber_run_id".to_string(), "fiber-steer-456".to_string()),
|
||||
("origin".to_string(), "gaas".to_string()),
|
||||
]);
|
||||
let steer_req = mcp
|
||||
.send_turn_steer_request(TurnSteerParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Focus on the failure".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
client_metadata: Some(steer_metadata.clone()),
|
||||
expected_turn_id: turn_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let steer_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(steer_req)),
|
||||
)
|
||||
.await??;
|
||||
let _turn: TurnSteerResponse = to_response::<TurnSteerResponse>(steer_resp)?;
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert_eq!(requests.len(), 2);
|
||||
let first_metadata = requests[0]
|
||||
.header("x-codex-turn-metadata")
|
||||
.as_deref()
|
||||
.map(parse_json_header)
|
||||
.unwrap_or_else(|| panic!("missing first x-codex-turn-metadata header"));
|
||||
assert_eq!(
|
||||
first_metadata["fiber_run_id"].as_str(),
|
||||
Some("fiber-start-123")
|
||||
);
|
||||
assert_eq!(first_metadata["turn_id"].as_str(), Some(turn_id.as_str()));
|
||||
|
||||
let second_metadata = requests[1]
|
||||
.header("x-codex-turn-metadata")
|
||||
.as_deref()
|
||||
.map(parse_json_header)
|
||||
.unwrap_or_else(|| panic!("missing second x-codex-turn-metadata header"));
|
||||
assert_eq!(
|
||||
second_metadata["fiber_run_id"].as_str(),
|
||||
Some("fiber-steer-456")
|
||||
);
|
||||
assert_eq!(second_metadata["origin"].as_str(), Some("gaas"));
|
||||
assert_eq!(second_metadata["turn_id"].as_str(), Some(turn_id.as_str()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_forwards_client_metadata_to_responses_websocket_request_body_v2() -> Result<()>
|
||||
{
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let websocket_server = responses::start_websocket_server(vec![vec![
|
||||
vec![
|
||||
responses::ev_response_created("warm-1"),
|
||||
responses::ev_completed("warm-1"),
|
||||
],
|
||||
vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", "Done"),
|
||||
responses::ev_completed("resp-1"),
|
||||
],
|
||||
]])
|
||||
.await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&websocket_server.uri().replacen("ws://", "http://", 1),
|
||||
/*supports_websockets*/ true,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams::default())
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let client_metadata = HashMap::from([
|
||||
("fiber_run_id".to_string(), "fiber-start-123".to_string()),
|
||||
("origin".to_string(), "gaas".to_string()),
|
||||
]);
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id,
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
client_metadata: Some(client_metadata),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let warmup = websocket_server
|
||||
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 0)
|
||||
.await
|
||||
.body_json();
|
||||
let request = websocket_server
|
||||
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 1)
|
||||
.await
|
||||
.body_json();
|
||||
|
||||
assert_eq!(warmup["type"].as_str(), Some("response.create"));
|
||||
assert_eq!(warmup["generate"].as_bool(), Some(false));
|
||||
assert_eq!(request["type"].as_str(), Some("response.create"));
|
||||
assert_eq!(request["previous_response_id"].as_str(), Some("warm-1"));
|
||||
|
||||
let metadata = request["client_metadata"]["x-codex-turn-metadata"]
|
||||
.as_str()
|
||||
.map(parse_json_header)
|
||||
.unwrap_or_else(|| panic!("missing websocket x-codex-turn-metadata client metadata"));
|
||||
assert_eq!(metadata["fiber_run_id"].as_str(), Some("fiber-start-123"));
|
||||
assert_eq!(metadata["origin"].as_str(), Some("gaas"));
|
||||
assert_eq!(metadata["turn_id"].as_str(), Some(turn.id.as_str()));
|
||||
assert!(metadata.get("session_id").is_some());
|
||||
|
||||
websocket_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_config_toml(
|
||||
codex_home: &Path,
|
||||
server_uri: &str,
|
||||
supports_websockets: bool,
|
||||
) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
supports_websockets = {supports_websockets}
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
fn parse_json_header(value: &str) -> serde_json::Value {
|
||||
match serde_json::from_str(value) {
|
||||
Ok(value) => value,
|
||||
Err(err) => panic!("metadata header should be valid json: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_request_count(
|
||||
request_log: &core_test_support::responses::ResponseMock,
|
||||
expected: usize,
|
||||
) -> Result<()> {
|
||||
timeout(DEFAULT_READ_TIMEOUT, async {
|
||||
loop {
|
||||
if request_log.requests().len() >= expected {
|
||||
return;
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
|
||||
}
|
||||
})
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
mod account;
|
||||
mod analytics;
|
||||
mod app_list;
|
||||
mod client_metadata;
|
||||
mod collaboration_mode_list;
|
||||
#[cfg(unix)]
|
||||
mod command_exec;
|
||||
|
||||
@@ -1377,6 +1377,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
text: "first turn".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
client_metadata: None,
|
||||
cwd: Some(first_cwd.clone()),
|
||||
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
||||
approvals_reviewer: None,
|
||||
@@ -1416,6 +1417,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
text: "second turn".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
client_metadata: None,
|
||||
cwd: Some(second_cwd.clone()),
|
||||
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
||||
approvals_reviewer: None,
|
||||
|
||||
@@ -57,6 +57,7 @@ async fn turn_steer_requires_active_turn() -> Result<()> {
|
||||
text: "steer".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
client_metadata: None,
|
||||
expected_turn_id: "turn-does-not-exist".to_string(),
|
||||
})
|
||||
.await?;
|
||||
@@ -145,6 +146,7 @@ async fn turn_steer_rejects_oversized_text_input() -> Result<()> {
|
||||
text: oversized_input.clone(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
client_metadata: None,
|
||||
expected_turn_id: turn.id.clone(),
|
||||
})
|
||||
.await?;
|
||||
@@ -247,6 +249,7 @@ async fn turn_steer_returns_active_turn_id() -> Result<()> {
|
||||
text: "steer".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
client_metadata: None,
|
||||
expected_turn_id: turn.id.clone(),
|
||||
})
|
||||
.await?;
|
||||
|
||||
@@ -1364,11 +1364,17 @@ fn parse_turn_metadata_header(turn_metadata_header: Option<&str>) -> Option<Head
|
||||
}
|
||||
|
||||
fn build_ws_client_metadata(turn_metadata_header: Option<&str>) -> Option<HashMap<String, String>> {
|
||||
let turn_metadata_header = parse_turn_metadata_header(turn_metadata_header)?;
|
||||
let turn_metadata = turn_metadata_header.to_str().ok()?.to_string();
|
||||
let mut client_metadata = HashMap::new();
|
||||
client_metadata.insert(X_CODEX_TURN_METADATA_HEADER.to_string(), turn_metadata);
|
||||
Some(client_metadata)
|
||||
if let Some(turn_metadata_header) = parse_turn_metadata_header(turn_metadata_header)
|
||||
&& let Ok(turn_metadata) = turn_metadata_header.to_str()
|
||||
{
|
||||
client_metadata.insert(
|
||||
X_CODEX_TURN_METADATA_HEADER.to_string(),
|
||||
turn_metadata.to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
(!client_metadata.is_empty()).then_some(client_metadata)
|
||||
}
|
||||
|
||||
/// Builds the extra headers attached to Responses API requests.
|
||||
|
||||
@@ -742,8 +742,11 @@ impl Codex {
|
||||
&self,
|
||||
input: Vec<UserInput>,
|
||||
expected_turn_id: Option<&str>,
|
||||
client_metadata: Option<HashMap<String, String>>,
|
||||
) -> Result<String, SteerInputError> {
|
||||
self.session.steer_input(input, expected_turn_id).await
|
||||
self.session
|
||||
.steer_input(input, expected_turn_id, client_metadata)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn set_app_server_client_name(
|
||||
@@ -3896,6 +3899,7 @@ impl Session {
|
||||
&self,
|
||||
input: Vec<UserInput>,
|
||||
expected_turn_id: Option<&str>,
|
||||
client_metadata: Option<HashMap<String, String>>,
|
||||
) -> Result<String, SteerInputError> {
|
||||
if input.is_empty() {
|
||||
return Err(SteerInputError::EmptyInput);
|
||||
@@ -3934,6 +3938,15 @@ impl Session {
|
||||
None => return Err(SteerInputError::NoActiveTurn(input)),
|
||||
}
|
||||
|
||||
if let Some(client_metadata) = client_metadata
|
||||
&& let Some((_, active_task)) = active_turn.tasks.first()
|
||||
{
|
||||
active_task
|
||||
.turn_context
|
||||
.turn_metadata_state
|
||||
.set_client_metadata(client_metadata);
|
||||
}
|
||||
|
||||
let mut turn_state = active_turn.turn_state.lock().await;
|
||||
turn_state.push_pending_input(input.into());
|
||||
Ok(active_turn_id.clone())
|
||||
@@ -4326,7 +4339,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
.await;
|
||||
false
|
||||
}
|
||||
Op::UserInput { .. } | Op::UserTurn { .. } => {
|
||||
Op::UserInput { .. }
|
||||
| Op::UserInputWithClientMetadata { .. }
|
||||
| Op::UserTurn { .. } => {
|
||||
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op).await;
|
||||
false
|
||||
}
|
||||
@@ -4570,7 +4585,7 @@ mod handlers {
|
||||
}
|
||||
|
||||
pub async fn user_input_or_turn(sess: &Arc<Session>, sub_id: String, op: Op) {
|
||||
let (items, updates) = match op {
|
||||
let (items, updates, client_metadata) = match op {
|
||||
Op::UserTurn {
|
||||
cwd,
|
||||
approval_policy,
|
||||
@@ -4610,6 +4625,7 @@ mod handlers {
|
||||
personality,
|
||||
app_server_client_name: None,
|
||||
},
|
||||
None,
|
||||
)
|
||||
}
|
||||
Op::UserInput {
|
||||
@@ -4621,6 +4637,19 @@ mod handlers {
|
||||
final_output_json_schema: Some(final_output_json_schema),
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
),
|
||||
Op::UserInputWithClientMetadata {
|
||||
items,
|
||||
final_output_json_schema,
|
||||
client_metadata,
|
||||
} => (
|
||||
items,
|
||||
SessionSettingsUpdate {
|
||||
final_output_json_schema: Some(final_output_json_schema),
|
||||
..Default::default()
|
||||
},
|
||||
client_metadata,
|
||||
),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
@@ -4632,11 +4661,20 @@ mod handlers {
|
||||
sess.maybe_emit_unknown_model_warning_for_turn(current_context.as_ref())
|
||||
.await;
|
||||
match sess
|
||||
.steer_input(items.clone(), /*expected_turn_id*/ None)
|
||||
.steer_input(
|
||||
items.clone(),
|
||||
/*expected_turn_id*/ None,
|
||||
client_metadata.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => current_context.session_telemetry.user_prompt(&items),
|
||||
Err(SteerInputError::NoActiveTurn(items)) => {
|
||||
if let Some(client_metadata) = client_metadata {
|
||||
current_context
|
||||
.turn_metadata_state
|
||||
.set_client_metadata(client_metadata);
|
||||
}
|
||||
current_context.session_telemetry.user_prompt(&items);
|
||||
sess.refresh_mcp_servers_if_requested(¤t_context)
|
||||
.await;
|
||||
|
||||
@@ -4589,7 +4589,9 @@ async fn steer_input_requires_active_turn() {
|
||||
}];
|
||||
|
||||
let err = sess
|
||||
.steer_input(input, /*expected_turn_id*/ None)
|
||||
.steer_input(
|
||||
input, /*expected_turn_id*/ None, /*client_metadata*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("steering without active turn should fail");
|
||||
|
||||
@@ -4618,7 +4620,11 @@ async fn steer_input_enforces_expected_turn_id() {
|
||||
text_elements: Vec::new(),
|
||||
}];
|
||||
let err = sess
|
||||
.steer_input(steer_input, Some("different-turn-id"))
|
||||
.steer_input(
|
||||
steer_input,
|
||||
Some("different-turn-id"),
|
||||
/*client_metadata*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("mismatched expected turn id should fail");
|
||||
|
||||
@@ -4660,7 +4666,11 @@ async fn steer_input_rejects_non_regular_turns() {
|
||||
text_elements: Vec::new(),
|
||||
}];
|
||||
let err = sess
|
||||
.steer_input(steer_input, /*expected_turn_id*/ None)
|
||||
.steer_input(
|
||||
steer_input,
|
||||
/*expected_turn_id*/ None,
|
||||
/*client_metadata*/ None,
|
||||
)
|
||||
.await
|
||||
.expect_err("steering a non-regular turn should fail");
|
||||
|
||||
@@ -4692,7 +4702,7 @@ async fn steer_input_returns_active_turn_id() {
|
||||
text_elements: Vec::new(),
|
||||
}];
|
||||
let turn_id = sess
|
||||
.steer_input(steer_input, Some(&tc.sub_id))
|
||||
.steer_input(steer_input, Some(&tc.sub_id), /*client_metadata*/ None)
|
||||
.await
|
||||
.expect("steering with matching expected turn id should succeed");
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::watch;
|
||||
@@ -96,8 +97,11 @@ impl CodexThread {
|
||||
&self,
|
||||
input: Vec<UserInput>,
|
||||
expected_turn_id: Option<&str>,
|
||||
client_metadata: Option<HashMap<String, String>>,
|
||||
) -> Result<String, SteerInputError> {
|
||||
self.codex.steer_input(input, expected_turn_id).await
|
||||
self.codex
|
||||
.steer_input(input, expected_turn_id, client_metadata)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn set_app_server_client_name(
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
@@ -6,6 +7,7 @@ use std::sync::Mutex;
|
||||
use std::sync::RwLock;
|
||||
|
||||
use serde::Serialize;
|
||||
use serde_json::Value;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::sandbox_tags::sandbox_tag;
|
||||
@@ -69,6 +71,20 @@ impl TurnMetadataBag {
|
||||
}
|
||||
}
|
||||
|
||||
fn merge_client_metadata(
|
||||
header: &str,
|
||||
client_metadata: Option<&HashMap<String, String>>,
|
||||
) -> Option<String> {
|
||||
let client_metadata = client_metadata?;
|
||||
let mut metadata = serde_json::from_str::<serde_json::Map<String, Value>>(header).ok()?;
|
||||
for (key, value) in client_metadata {
|
||||
metadata
|
||||
.entry(key.clone())
|
||||
.or_insert_with(|| Value::String(value.clone()));
|
||||
}
|
||||
serde_json::to_string(&metadata).ok()
|
||||
}
|
||||
|
||||
fn build_turn_metadata_bag(
|
||||
session_id: Option<String>,
|
||||
turn_id: Option<String>,
|
||||
@@ -129,6 +145,7 @@ pub(crate) struct TurnMetadataState {
|
||||
base_metadata: TurnMetadataBag,
|
||||
base_header: String,
|
||||
enriched_header: Arc<RwLock<Option<String>>>,
|
||||
client_metadata: Arc<RwLock<Option<HashMap<String, String>>>>,
|
||||
enrichment_task: Arc<Mutex<Option<JoinHandle<()>>>>,
|
||||
}
|
||||
|
||||
@@ -159,21 +176,29 @@ impl TurnMetadataState {
|
||||
base_metadata,
|
||||
base_header,
|
||||
enriched_header: Arc::new(RwLock::new(None)),
|
||||
client_metadata: Arc::new(RwLock::new(None)),
|
||||
enrichment_task: Arc::new(Mutex::new(None)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn current_header_value(&self) -> Option<String> {
|
||||
if let Some(header) = self
|
||||
let header = if let Some(header) = self
|
||||
.enriched_header
|
||||
.read()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
.as_ref()
|
||||
.cloned()
|
||||
{
|
||||
return Some(header);
|
||||
}
|
||||
Some(self.base_header.clone())
|
||||
header
|
||||
} else {
|
||||
self.base_header.clone()
|
||||
};
|
||||
let client_metadata = self
|
||||
.client_metadata
|
||||
.read()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner)
|
||||
.clone();
|
||||
merge_client_metadata(&header, client_metadata.as_ref()).or(Some(header))
|
||||
}
|
||||
|
||||
pub(crate) fn current_meta_value(&self) -> Option<serde_json::Value> {
|
||||
@@ -181,6 +206,13 @@ impl TurnMetadataState {
|
||||
.and_then(|header| serde_json::from_str(&header).ok())
|
||||
}
|
||||
|
||||
pub(crate) fn set_client_metadata(&self, client_metadata: HashMap<String, String>) {
|
||||
*self
|
||||
.client_metadata
|
||||
.write()
|
||||
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(client_metadata);
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_git_enrichment_task(&self) {
|
||||
if self.repo_root.is_none() {
|
||||
return;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::*;
|
||||
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use tempfile::TempDir;
|
||||
use tokio::process::Command;
|
||||
|
||||
@@ -83,3 +84,29 @@ fn turn_metadata_state_uses_platform_sandbox_tag() {
|
||||
assert_eq!(sandbox_name, Some(expected_sandbox));
|
||||
assert_eq!(session_id, Some("session-a"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields() {
|
||||
let temp_dir = TempDir::new().expect("temp dir");
|
||||
let cwd = temp_dir.path().to_path_buf();
|
||||
let sandbox_policy = SandboxPolicy::new_read_only_policy();
|
||||
|
||||
let state = TurnMetadataState::new(
|
||||
"session-a".to_string(),
|
||||
"turn-a".to_string(),
|
||||
cwd,
|
||||
&sandbox_policy,
|
||||
WindowsSandboxLevel::Disabled,
|
||||
);
|
||||
state.set_client_metadata(HashMap::from([
|
||||
("fiber_run_id".to_string(), "fiber-123".to_string()),
|
||||
("session_id".to_string(), "client-supplied".to_string()),
|
||||
]));
|
||||
|
||||
let header = state.current_header_value().expect("header");
|
||||
let json: Value = serde_json::from_str(&header).expect("json");
|
||||
|
||||
assert_eq!(json["fiber_run_id"].as_str(), Some("fiber-123"));
|
||||
assert_eq!(json["session_id"].as_str(), Some("session-a"));
|
||||
assert_eq!(json["turn_id"].as_str(), Some("turn-a"));
|
||||
}
|
||||
|
||||
@@ -1260,6 +1260,56 @@ async fn responses_websocket_forwards_turn_metadata_on_initial_and_incremental_c
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_preserves_custom_turn_metadata_fields() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness(&server).await;
|
||||
let mut client_session = harness.client.new_session();
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
let turn_metadata = json!({
|
||||
"turn_id": "turn-123",
|
||||
"fiber_run_id": "fiber-123",
|
||||
"origin": "app-server",
|
||||
})
|
||||
.to_string();
|
||||
|
||||
stream_until_complete_with_turn_metadata(
|
||||
&mut client_session,
|
||||
&harness,
|
||||
&prompt,
|
||||
/*service_tier*/ None,
|
||||
Some(&turn_metadata),
|
||||
)
|
||||
.await;
|
||||
|
||||
let body = server
|
||||
.single_connection()
|
||||
.first()
|
||||
.expect("missing request")
|
||||
.body_json();
|
||||
|
||||
assert_eq!(body["type"].as_str(), Some("response.create"));
|
||||
assert_eq!(
|
||||
body["client_metadata"]["x-codex-turn-metadata"]
|
||||
.as_str()
|
||||
.map(|value| serde_json::from_str::<serde_json::Value>(value).expect("valid json")),
|
||||
Some(json!({
|
||||
"turn_id": "turn-123",
|
||||
"fiber_run_id": "fiber-123",
|
||||
"origin": "app-server",
|
||||
}))
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_uses_previous_response_id_when_prefix_after_completed() {
|
||||
skip_if_no_network!();
|
||||
@@ -1810,6 +1860,23 @@ async fn stream_until_complete_with_turn_metadata(
|
||||
prompt: &Prompt,
|
||||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) {
|
||||
stream_until_complete_with_request_metadata(
|
||||
client_session,
|
||||
harness,
|
||||
prompt,
|
||||
service_tier,
|
||||
turn_metadata_header,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn stream_until_complete_with_request_metadata(
|
||||
client_session: &mut ModelClientSession,
|
||||
harness: &WebsocketTestHarness,
|
||||
prompt: &Prompt,
|
||||
service_tier: Option<ServiceTier>,
|
||||
turn_metadata_header: Option<&str>,
|
||||
) {
|
||||
let mut stream = client_session
|
||||
.stream(
|
||||
|
||||
@@ -677,6 +677,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
params: TurnStartParams {
|
||||
thread_id: primary_thread_id_for_span.clone(),
|
||||
input: items.into_iter().map(Into::into).collect(),
|
||||
client_metadata: None,
|
||||
cwd: Some(default_cwd),
|
||||
approval_policy: Some(default_approval_policy.into()),
|
||||
approvals_reviewer: None,
|
||||
|
||||
@@ -242,6 +242,18 @@ pub enum Op {
|
||||
final_output_json_schema: Option<Value>,
|
||||
},
|
||||
|
||||
/// App-server user input carrying turn-scoped Responses API client metadata.
|
||||
UserInputWithClientMetadata {
|
||||
/// User input items, see `InputItem`
|
||||
items: Vec<UserInput>,
|
||||
/// Optional JSON Schema used to constrain the final assistant message for this turn.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
final_output_json_schema: Option<Value>,
|
||||
/// Optional turn-scoped Responses API `client_metadata`.
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
client_metadata: Option<HashMap<String, String>>,
|
||||
},
|
||||
|
||||
/// Similar to [`Op::UserInput`], but contains additional context required
|
||||
/// for a turn of a [`crate::codex_thread::CodexThread`].
|
||||
UserTurn {
|
||||
@@ -577,6 +589,7 @@ impl Op {
|
||||
Self::RealtimeConversationText(_) => "realtime_conversation_text",
|
||||
Self::RealtimeConversationClose => "realtime_conversation_close",
|
||||
Self::UserInput { .. } => "user_input",
|
||||
Self::UserInputWithClientMetadata { .. } => "user_input_with_client_metadata",
|
||||
Self::UserTurn { .. } => "user_turn",
|
||||
Self::InterAgentCommunication { .. } => "inter_agent_communication",
|
||||
Self::OverrideTurnContext { .. } => "override_turn_context",
|
||||
@@ -4491,6 +4504,33 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn user_input_with_client_metadata_round_trips() -> Result<()> {
|
||||
let op = Op::UserInputWithClientMetadata {
|
||||
items: Vec::new(),
|
||||
final_output_json_schema: None,
|
||||
client_metadata: Some(HashMap::from([(
|
||||
"fiber_run_id".to_string(),
|
||||
"fiber-123".to_string(),
|
||||
)])),
|
||||
};
|
||||
|
||||
let json_op = serde_json::to_value(&op)?;
|
||||
assert_eq!(
|
||||
json_op,
|
||||
json!({
|
||||
"type": "user_input_with_client_metadata",
|
||||
"items": [],
|
||||
"client_metadata": {
|
||||
"fiber_run_id": "fiber-123",
|
||||
}
|
||||
})
|
||||
);
|
||||
assert_eq!(serde_json::from_value::<Op>(json_op)?, op);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn user_input_text_serializes_empty_text_elements() -> Result<()> {
|
||||
let input = UserInput::Text {
|
||||
|
||||
@@ -417,6 +417,7 @@ impl AppServerSession {
|
||||
params: TurnStartParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
input: items.into_iter().map(Into::into).collect(),
|
||||
client_metadata: None,
|
||||
cwd: Some(cwd),
|
||||
approval_policy: Some(approval_policy.into()),
|
||||
approvals_reviewer: Some(approvals_reviewer.into()),
|
||||
@@ -467,6 +468,7 @@ impl AppServerSession {
|
||||
params: TurnSteerParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
input: items.into_iter().map(Into::into).collect(),
|
||||
client_metadata: None,
|
||||
expected_turn_id: turn_id,
|
||||
},
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user