Compare commits

...

20 Commits

Author SHA1 Message Date
nicholasclark-openai
8ee421c407 Merge branch 'nicholasclark/tool-call-task-headers-_meta' into nicholasclark/app-server-inprocess-parent-request-headers 2026-03-19 14:56:09 -07:00
nicholasclark-openai
5305a38085 Merge branch 'main' into nicholasclark/tool-call-task-headers-_meta 2026-03-19 14:33:47 -07:00
nicholasclark-openai
656d2f5724 Merge branch 'main' into nicholasclark/tool-call-task-headers-_meta 2026-03-19 14:15:56 -07:00
nicholasclark-openai
2bcc504df8 Merge branch 'nicholasclark/tool-call-task-headers-_meta' into nicholasclark/app-server-inprocess-parent-request-headers 2026-03-19 13:51:58 -07:00
nicholasclark-openai
6bf3961d66 Merge branch 'main' into nicholasclark/tool-call-task-headers-_meta 2026-03-19 13:32:18 -07:00
nicholasclark-openai
5bf604ca69 Keep parent request values in turn metadata only
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 13:13:06 -07:00
nicholasclark-openai
b679c7596b Merge branch 'main' into nicholasclark/tool-call-task-headers-_meta 2026-03-19 12:45:36 -07:00
nicholasclark-openai
cbd0700837 Restore explicit RMCP tools/call request path
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 12:38:16 -07:00
nicholasclark-openai
ec278abd94 codex: tighten turn metadata staging
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 12:21:18 -07:00
nicholasclark-openai
8efc728deb codex: narrow turn metadata propagation
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 12:03:57 -07:00
nicholasclark-openai
6e8ebdc5bc codex: seed turn metadata from app-server turn start
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 12:03:57 -07:00
nicholasclark-openai
db3ddeae35 Restore RMCP meta local variable name
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 11:58:51 -07:00
nicholasclark-openai
88653c9245 Simplify RMCP tool call _meta plumbing
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 11:50:06 -07:00
nicholasclark-openai
98f97df37f Skip non-JSON apps requests in search tool test
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 11:35:21 -07:00
nicholasclark-openai
e84f54294a Include session id in MCP turn metadata
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 11:18:05 -07:00
nicholasclark-openai
fb816b1322 Merge branch 'main' into nicholasclark/tool-call-task-headers-_meta 2026-03-19 10:46:08 -07:00
nicholasclark-openai
edc8e17c5c Merge RMCP tool call _meta entries
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 10:31:51 -07:00
nicholasclark-openai
9124fe9cbc Plumb MCP turn metadata through _meta
Co-authored-by: Codex <noreply@openai.com>
2026-03-19 09:29:54 -07:00
nicholasclark-openai
7df3c80f25 Merge branch 'main' into revert-15011-nicholasclark/tool-call-task-headers 2026-03-19 09:26:43 -07:00
nicholasclark-openai
bab2a28f26 Revert "Forward session and turn headers to MCP HTTP requests (#15011)"
This reverts commit b14689df3b.
2026-03-18 21:40:32 -07:00
21 changed files with 652 additions and 17 deletions

View File

@@ -3116,6 +3116,16 @@
},
"type": "array"
},
"metadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional string metadata attached only to this turn.",
"type": [
"object",
"null"
]
},
"model": {
"description": "Override the model for this turn and subsequent turns.",
"type": [

View File

@@ -13954,6 +13954,16 @@
},
"type": "array"
},
"metadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional string metadata attached only to this turn.",
"type": [
"object",
"null"
]
},
"model": {
"description": "Override the model for this turn and subsequent turns.",
"type": [

View File

@@ -11714,6 +11714,16 @@
},
"type": "array"
},
"metadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional string metadata attached only to this turn.",
"type": [
"object",
"null"
]
},
"model": {
"description": "Override the model for this turn and subsequent turns.",
"type": [

View File

@@ -545,6 +545,16 @@
},
"type": "array"
},
"metadata": {
"additionalProperties": {
"type": "string"
},
"description": "Optional string metadata attached only to this turn.",
"type": [
"object",
"null"
]
},
"model": {
"description": "Override the model for this turn and subsequent turns.",
"type": [

View File

@@ -13,6 +13,9 @@ import type { SandboxPolicy } from "./SandboxPolicy";
import type { UserInput } from "./UserInput";
export type TurnStartParams = {threadId: string, input: Array<UserInput>, /**
* Optional string metadata attached only to this turn.
*/
metadata?: { [key in string]?: string } | null, /**
* Override the working directory for this turn and subsequent turns.
*/
cwd?: string | null, /**

View File

@@ -3831,6 +3831,9 @@ pub enum TurnStatus {
pub struct TurnStartParams {
pub thread_id: String,
pub input: Vec<UserInput>,
/// Optional string metadata attached only to this turn.
#[ts(optional = nullable)]
pub metadata: Option<BTreeMap<String, String>>,
/// Override the working directory for this turn and subsequent turns.
#[ts(optional = nullable)]
pub cwd: Option<PathBuf>,
@@ -7938,6 +7941,7 @@ mod tests {
let without_override = TurnStartParams {
thread_id: "thread_123".to_string(),
input: vec![],
metadata: None,
cwd: None,
approval_policy: None,
approvals_reviewer: None,

View File

@@ -69,7 +69,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
- Initialize once per connection: Immediately after opening a transport connection, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request on that connection before this handshake gets rejected.
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and youll also get a `thread/started` notification. If youre continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history. Like `thread/start`, `thread/fork` also accepts `ephemeral: true` for an in-memory temporary thread.
The returned `thread.ephemeral` flag tells you whether the session is intentionally in-memory only; when it is `true`, `thread.path` is `null`.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, approval policy, approvals reviewer, etc. This immediately returns the new turn object. The app-server emits `turn/started` when that turn actually begins running.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, approval policy, approvals reviewer, etc. `turn/start.metadata` also lets callers attach arbitrary string metadata to that turn. This immediately returns the new turn object. The app-server emits `turn/started` when that turn actually begins running.
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. Youll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
- Finish the turn: When the model is done (or the turn is interrupted via making the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.
@@ -139,7 +139,7 @@ Example with notification opt-out:
- `thread/shellCommand` — run a user-initiated `!` shell command against a thread; this runs unsandboxed with full access rather than inheriting the thread sandbox policy. Returns `{}` immediately while progress streams through standard turn/item notifications and any active turn receives the formatted output in its message stream.
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. `metadata` accepts arbitrary string key/value pairs for turn-scoped observability and request forwarding. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `turn/steer` — add user input to an already in-flight turn without starting a new turn; returns the active `turnId` that accepted the input.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); returns `{}` and streams `thread/realtime/*` notifications.

View File

@@ -279,6 +279,7 @@ use codex_state::ThreadMetadataBuilder;
use codex_state::log_db::LogDbLayer;
use codex_utils_json_to_toml::json_to_toml;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::OsStr;
@@ -5973,6 +5974,8 @@ impl CodexMessageProcessor {
.into_iter()
.map(V2UserInput::into_core)
.collect();
let next_turn_metadata = params.metadata.clone();
let clear_next_turn_metadata_on_error = next_turn_metadata.is_some();
let has_any_overrides = params.cwd.is_some()
|| params.approval_policy.is_some()
@@ -6010,6 +6013,13 @@ impl CodexMessageProcessor {
.await;
}
if let Some(metadata) = next_turn_metadata
&& let Err(error) = Self::set_next_turn_metadata(thread.as_ref(), metadata).await
{
self.outgoing.send_error(request_id, error).await;
return;
}
// Start the turn by submitting the user input. Return its submission id as turn_id.
let turn_id = self
.submit_core_op(
@@ -6038,6 +6048,15 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
if clear_next_turn_metadata_on_error
&& let Err(clear_error) =
Self::set_next_turn_metadata(thread.as_ref(), BTreeMap::new()).await
{
warn!(
error = %clear_error.message,
"failed to clear next turn metadata after turn/start submission error"
);
}
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start turn: {err}"),
@@ -6062,6 +6081,20 @@ impl CodexMessageProcessor {
})
}
async fn set_next_turn_metadata(
thread: &CodexThread,
metadata: BTreeMap<String, String>,
) -> Result<(), JSONRPCErrorError> {
thread
.set_next_turn_metadata(metadata)
.await
.map_err(|err| JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
message: err.to_string(),
data: None,
})
}
async fn turn_steer(&self, request_id: ConnectionRequestId, params: TurnSteerParams) {
let (_, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,

View File

@@ -730,6 +730,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
#[cfg(test)]
mod tests {
use super::*;
use app_test_support::create_mock_responses_server_repeating_assistant;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ConfigRequirementsReadResponse;
use codex_app_server_protocol::SessionSource as ApiSessionSource;
@@ -737,9 +738,13 @@ mod tests {
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_core::config::ConfigBuilder;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
async fn build_test_config() -> Config {
match ConfigBuilder::default().build().await {
@@ -857,6 +862,175 @@ mod tests {
.expect("in-process runtime should shutdown cleanly");
}
#[tokio::test]
async fn in_process_start_forwards_turn_metadata_on_turn_requests() {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new().expect("tempdir should create");
let mut client = start(InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(
ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.build()
.await
.expect("test config should load"),
),
cli_overrides: vec![
(
"model_provider".to_string(),
TomlValue::String("mock_provider".to_string()),
),
(
"model_providers.mock_provider.name".to_string(),
TomlValue::String("Mock provider for test".to_string()),
),
(
"model_providers.mock_provider.base_url".to_string(),
TomlValue::String(format!("{}/v1", server.uri())),
),
(
"model_providers.mock_provider.wire_api".to_string(),
TomlValue::String("responses".to_string()),
),
(
"model_providers.mock_provider.request_max_retries".to_string(),
TomlValue::Integer(0),
),
(
"model_providers.mock_provider.stream_max_retries".to_string(),
TomlValue::Integer(0),
),
],
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
auth_manager: None,
thread_manager: None,
feedback: CodexFeedback::new(),
config_warnings: Vec::new(),
session_source: SessionSource::Cli,
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-in-process-test".to_string(),
title: None,
version: "0.0.0".to_string(),
},
capabilities: None,
},
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await
.expect("in-process runtime should start");
let thread_start_response: ThreadStartResponse = serde_json::from_value(
client
.request(ClientRequest::ThreadStart {
request_id: RequestId::Integer(5),
params: ThreadStartParams {
model: Some("mock-model".to_string()),
ephemeral: Some(true),
..ThreadStartParams::default()
},
})
.await
.expect("thread/start request should work")
.expect("thread/start should succeed"),
)
.expect("thread/start response should parse");
let _turn_start_response: TurnStartResponse = serde_json::from_value(
client
.request(ClientRequest::TurnStart {
request_id: RequestId::Integer(6),
params: TurnStartParams {
thread_id: thread_start_response.thread.id,
input: vec![UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
metadata: Some(std::collections::BTreeMap::from([
(
"parentConversationId".to_string(),
"parent-conversation-123".to_string(),
),
(
"parentMessageId".to_string(),
"parent-message-123".to_string(),
),
("parentTurnId".to_string(), "parent-turn-123".to_string()),
])),
..TurnStartParams::default()
},
})
.await
.expect("turn/start request should work")
.expect("turn/start should succeed"),
)
.expect("turn/start response should parse");
let turn_completed = loop {
let event = timeout(std::time::Duration::from_secs(10), client.next_event())
.await
.expect("timed out waiting for turn completion");
match event {
Some(InProcessServerEvent::ServerNotification(
ServerNotification::TurnCompleted(notification),
)) => break notification,
Some(_) => continue,
None => panic!("in-process runtime exited before turn completion"),
}
};
assert_eq!(
turn_completed.turn.status,
TurnStatus::Completed,
"turn error: {:?}",
turn_completed.turn.error
);
assert_eq!(turn_completed.turn.error, None);
let requests = timeout(std::time::Duration::from_secs(5), async {
loop {
let requests = server
.received_requests()
.await
.expect("received requests should be available");
if !requests.is_empty() {
break requests;
}
tokio::task::yield_now().await;
}
})
.await
.expect("timed out waiting for outbound requests");
assert!(!requests.is_empty());
for request in requests {
let turn_metadata_header = request
.headers
.get("x-codex-turn-metadata")
.and_then(|value| value.to_str().ok())
.expect("turn metadata header should be present");
let turn_metadata: serde_json::Value =
serde_json::from_str(turn_metadata_header).expect("turn metadata should be JSON");
assert_eq!(
turn_metadata.pointer("/metadata/parentConversationId"),
Some(&serde_json::json!("parent-conversation-123"))
);
assert_eq!(
turn_metadata.pointer("/metadata/parentMessageId"),
Some(&serde_json::json!("parent-message-123"))
);
assert_eq!(
turn_metadata.pointer("/metadata/parentTurnId"),
Some(&serde_json::json!("parent-turn-123"))
);
}
client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}
#[test]
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
assert!(server_notification_requires_delivery(

View File

@@ -601,6 +601,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
personality: None,
output_schema: None,
collaboration_mode: None,
metadata: None,
},
},
Some(remote_trace),

View File

@@ -1394,6 +1394,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
personality: None,
output_schema: None,
collaboration_mode: None,
metadata: None,
})
.await?;
timeout(
@@ -1427,6 +1428,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
personality: None,
output_schema: None,
collaboration_mode: None,
metadata: None,
})
.await?;
timeout(

View File

@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::Debug;
@@ -591,6 +592,7 @@ impl Codex {
persist_extended_history,
inherited_shell_snapshot,
user_shell_override,
next_turn_metadata: BTreeMap::new(),
};
// Generate a unique ID for the lifetime of this Codex session.
@@ -716,6 +718,18 @@ impl Codex {
.await
}
pub(crate) async fn set_next_turn_metadata(
&self,
metadata: BTreeMap<String, String>,
) -> ConstraintResult<()> {
self.session
.update_settings(SessionSettingsUpdate {
next_turn_metadata: Some(metadata),
..Default::default()
})
.await
}
pub(crate) async fn agent_status(&self) -> AgentStatus {
self.agent_status.borrow().clone()
}
@@ -1056,6 +1070,8 @@ pub(crate) struct SessionConfiguration {
persist_extended_history: bool,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
user_shell_override: Option<shell::Shell>,
/// One-shot metadata consumed by the next turn created from this session.
next_turn_metadata: BTreeMap<String, String>,
}
impl SessionConfiguration {
@@ -1098,6 +1114,9 @@ impl SessionConfiguration {
if let Some(personality) = updates.personality {
next_configuration.personality = Some(personality);
}
if let Some(next_turn_metadata) = updates.next_turn_metadata.clone() {
next_configuration.next_turn_metadata = next_turn_metadata;
}
if let Some(approval_policy) = updates.approval_policy {
next_configuration.approval_policy.set(approval_policy)?;
}
@@ -1148,6 +1167,8 @@ pub(crate) struct SessionSettingsUpdate {
pub(crate) final_output_json_schema: Option<Option<Value>>,
pub(crate) personality: Option<Personality>,
pub(crate) app_server_client_name: Option<String>,
/// One-shot metadata for the next turn created from this update.
pub(crate) next_turn_metadata: Option<BTreeMap<String, String>>,
}
impl Session {
@@ -1286,6 +1307,7 @@ impl Session {
#[allow(clippy::too_many_arguments)]
fn make_turn_context(
conversation_id: ThreadId,
auth_manager: Option<Arc<AuthManager>>,
session_telemetry: &SessionTelemetry,
provider: ModelProviderInfo,
@@ -1336,10 +1358,12 @@ impl Session {
let cwd = session_configuration.cwd.clone();
let turn_metadata_state = Arc::new(TurnMetadataState::new(
conversation_id.to_string(),
sub_id.clone(),
cwd.clone(),
session_configuration.sandbox_policy.get(),
session_configuration.windows_sandbox_level,
session_configuration.next_turn_metadata.clone(),
));
let (current_date, timezone) = local_time_context();
TurnContext {
@@ -2306,7 +2330,9 @@ impl Session {
state.session_configuration.sandbox_policy != next.sandbox_policy;
let codex_home = next.codex_home.clone();
let session_source = next.session_source.clone();
state.session_configuration = next.clone();
let mut persisted_configuration = next.clone();
persisted_configuration.next_turn_metadata.clear();
state.session_configuration = persisted_configuration;
(
next,
sandbox_policy_changed,
@@ -2394,6 +2420,7 @@ impl Session {
.skills_for_config(&per_turn_config),
);
let mut turn_context: TurnContext = Self::make_turn_context(
self.conversation_id,
Some(Arc::clone(&self.services.auth_manager)),
&self.services.session_telemetry,
session_configuration.provider.clone(),
@@ -4491,6 +4518,7 @@ mod handlers {
reasoning_summary: summary,
service_tier,
final_output_json_schema: Some(final_output_json_schema),
next_turn_metadata: None,
personality,
app_server_client_name: None,
},
@@ -5220,10 +5248,12 @@ async fn spawn_review_thread(
let per_turn_config = Arc::new(per_turn_config);
let review_turn_id = sub_id.to_string();
let turn_metadata_state = Arc::new(TurnMetadataState::new(
sess.conversation_id.to_string(),
review_turn_id.clone(),
parent_turn_context.cwd.clone(),
parent_turn_context.sandbox_policy.get(),
parent_turn_context.windows_sandbox_level,
parent_turn_context.turn_metadata_state.metadata().clone(),
));
let review_turn_context = TurnContext {

View File

@@ -1660,6 +1660,7 @@ async fn set_rate_limits_retains_previous_credits() {
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
next_turn_metadata: std::collections::BTreeMap::new(),
};
let mut state = SessionState::new(session_configuration);
@@ -1758,6 +1759,7 @@ async fn set_rate_limits_updates_plan_type_when_present() {
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
next_turn_metadata: std::collections::BTreeMap::new(),
};
let mut state = SessionState::new(session_configuration);
@@ -2102,6 +2104,7 @@ pub(crate) async fn make_session_configuration_for_tests() -> SessionConfigurati
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
next_turn_metadata: std::collections::BTreeMap::new(),
}
}
@@ -2275,6 +2278,46 @@ async fn session_configuration_apply_rederives_legacy_file_system_policy_on_cwd_
);
}
#[tokio::test]
async fn new_turn_with_sub_id_consumes_next_turn_metadata_once() {
let (session, _) = make_session_and_context().await;
let next_turn_metadata = std::collections::BTreeMap::from([(
"parentTurnId".to_string(),
"parent-turn-123".to_string(),
)]);
session
.update_settings(SessionSettingsUpdate {
next_turn_metadata: Some(next_turn_metadata.clone()),
..Default::default()
})
.await
.expect("test setup should allow updating next turn metadata");
let first_turn_context = session
.new_turn_with_sub_id("turn-1".to_string(), SessionSettingsUpdate::default())
.await
.expect("first turn should be created");
assert_eq!(
first_turn_context.turn_metadata_state.metadata(),
&next_turn_metadata
);
let state = session.state.lock().await;
assert!(state.session_configuration.next_turn_metadata.is_empty());
drop(state);
let second_turn_context = session
.new_turn_with_sub_id("turn-2".to_string(), SessionSettingsUpdate::default())
.await
.expect("second turn should be created");
assert!(
second_turn_context
.turn_metadata_state
.metadata()
.is_empty()
);
}
#[tokio::test]
async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
let codex_home = tempfile::tempdir().expect("create temp dir");
@@ -2333,6 +2376,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
next_turn_metadata: std::collections::BTreeMap::new(),
};
let (tx_event, _rx_event) = async_channel::unbounded();
@@ -2428,6 +2472,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
next_turn_metadata: std::collections::BTreeMap::new(),
};
let per_turn_config = Session::build_per_turn_config(&session_configuration);
let model_info = ModelsManager::construct_model_info_offline_for_tests(
@@ -2517,6 +2562,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
let skills_outcome = Arc::new(services.skills_manager.skills_for_config(&per_turn_config));
let turn_context = Session::make_turn_context(
conversation_id,
Some(Arc::clone(&auth_manager)),
&session_telemetry,
session_configuration.provider.clone(),
@@ -3226,6 +3272,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
persist_extended_history: false,
inherited_shell_snapshot: None,
user_shell_override: None,
next_turn_metadata: std::collections::BTreeMap::new(),
};
let per_turn_config = Session::build_per_turn_config(&session_configuration);
let model_info = ModelsManager::construct_model_info_offline_for_tests(
@@ -3315,6 +3362,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
let skills_outcome = Arc::new(services.skills_manager.skills_for_config(&per_turn_config));
let turn_context = Arc::new(Session::make_turn_context(
conversation_id,
Some(Arc::clone(&auth_manager)),
&session_telemetry,
session_configuration.provider.clone(),

View File

@@ -22,6 +22,7 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::W3cTraceContext;
use codex_protocol::user_input::UserInput;
use std::collections::BTreeMap;
use std::path::PathBuf;
use tokio::sync::Mutex;
use tokio::sync::watch;
@@ -99,6 +100,13 @@ impl CodexThread {
.await
}
pub async fn set_next_turn_metadata(
&self,
metadata: BTreeMap<String, String>,
) -> ConstraintResult<()> {
self.codex.set_next_turn_metadata(metadata).await
}
/// Use sparingly: this is intended to be removed soon.
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
self.codex.submit_with_id(sub).await

View File

@@ -119,7 +119,8 @@ pub(crate) async fn handle_mcp_tool_call(
);
return CallToolResult::from_result(result);
}
let request_meta = build_mcp_tool_call_request_meta(&server, metadata.as_ref());
let request_meta =
build_mcp_tool_call_request_meta(turn_context.as_ref(), &server, metadata.as_ref());
let tool_call_begin_event = EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id: call_id.clone(),
@@ -388,20 +389,31 @@ pub(crate) struct McpToolApprovalMetadata {
}
const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
fn build_mcp_tool_call_request_meta(
turn_context: &TurnContext,
server: &str,
metadata: Option<&McpToolApprovalMetadata>,
) -> Option<serde_json::Value> {
if server != CODEX_APPS_MCP_SERVER_NAME {
return None;
let mut request_meta = serde_json::Map::new();
if let Some(turn_metadata) = turn_context.turn_metadata_state.current_meta_value() {
request_meta.insert(
crate::X_CODEX_TURN_METADATA_HEADER.to_string(),
turn_metadata,
);
}
let codex_apps_meta = metadata.and_then(|metadata| metadata.codex_apps_meta.as_ref())?;
if server == CODEX_APPS_MCP_SERVER_NAME
&& let Some(codex_apps_meta) =
metadata.and_then(|metadata| metadata.codex_apps_meta.clone())
{
request_meta.insert(
MCP_TOOL_CODEX_APPS_META_KEY.to_string(),
serde_json::Value::Object(codex_apps_meta),
);
}
Some(serde_json::json!({
MCP_TOOL_CODEX_APPS_META_KEY: codex_apps_meta,
}))
(!request_meta.is_empty()).then_some(serde_json::Value::Object(request_meta))
}
#[derive(Clone, Copy)]

View File

@@ -439,8 +439,39 @@ fn sanitize_mcp_tool_result_for_model_preserves_image_when_supported() {
assert_eq!(got, original);
}
#[test]
fn codex_apps_tool_call_request_meta_includes_codex_apps_meta() {
#[tokio::test]
async fn mcp_tool_call_request_meta_includes_turn_metadata_for_custom_server() {
let (_, turn_context) = make_session_and_context().await;
let expected_turn_metadata = serde_json::from_str::<serde_json::Value>(
&turn_context
.turn_metadata_state
.current_header_value()
.expect("turn metadata header"),
)
.expect("turn metadata json");
let meta =
build_mcp_tool_call_request_meta(&turn_context, "custom_server", /*metadata*/ None)
.expect("custom servers should receive turn metadata");
assert_eq!(
meta,
serde_json::json!({
crate::X_CODEX_TURN_METADATA_HEADER: expected_turn_metadata,
})
);
}
#[tokio::test]
async fn codex_apps_tool_call_request_meta_includes_turn_metadata_and_codex_apps_meta() {
let (_, turn_context) = make_session_and_context().await;
let expected_turn_metadata = serde_json::from_str::<serde_json::Value>(
&turn_context
.turn_metadata_state
.current_header_value()
.expect("turn metadata header"),
)
.expect("turn metadata json");
let metadata = McpToolApprovalMetadata {
annotations: None,
connector_id: Some("calendar".to_string()),
@@ -461,8 +492,13 @@ fn codex_apps_tool_call_request_meta_includes_codex_apps_meta() {
};
assert_eq!(
build_mcp_tool_call_request_meta(CODEX_APPS_MCP_SERVER_NAME, Some(&metadata)),
build_mcp_tool_call_request_meta(
&turn_context,
CODEX_APPS_MCP_SERVER_NAME,
Some(&metadata),
),
Some(serde_json::json!({
crate::X_CODEX_TURN_METADATA_HEADER: expected_turn_metadata,
MCP_TOOL_CODEX_APPS_META_KEY: {
"resource_uri": "connector://calendar/tools/calendar_create_event",
"contains_mcp_source": true,

View File

@@ -53,12 +53,16 @@ impl From<WorkspaceGitMetadata> for TurnMetadataWorkspace {
#[derive(Clone, Debug, Serialize, Default)]
pub(crate) struct TurnMetadataBag {
#[serde(default, skip_serializing_if = "Option::is_none")]
session_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
turn_id: Option<String>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
workspaces: BTreeMap<String, TurnMetadataWorkspace>,
#[serde(default, skip_serializing_if = "Option::is_none")]
sandbox: Option<String>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
metadata: BTreeMap<String, String>,
}
impl TurnMetadataBag {
@@ -68,10 +72,12 @@ impl TurnMetadataBag {
}
fn build_turn_metadata_bag(
session_id: Option<String>,
turn_id: Option<String>,
sandbox: Option<String>,
repo_root: Option<String>,
workspace_git_metadata: Option<WorkspaceGitMetadata>,
metadata: BTreeMap<String, String>,
) -> TurnMetadataBag {
let mut workspaces = BTreeMap::new();
if let (Some(repo_root), Some(workspace_git_metadata)) = (repo_root, workspace_git_metadata)
@@ -81,9 +87,11 @@ fn build_turn_metadata_bag(
}
TurnMetadataBag {
session_id,
turn_id,
workspaces,
sandbox,
metadata,
}
}
@@ -104,6 +112,7 @@ pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Op
}
build_turn_metadata_bag(
/*session_id*/ None,
/*turn_id*/ None,
sandbox.map(ToString::to_string),
repo_root,
@@ -112,6 +121,7 @@ pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Op
latest_git_commit_hash,
has_changes,
}),
BTreeMap::new(),
)
.to_header_value()
}
@@ -128,18 +138,22 @@ pub(crate) struct TurnMetadataState {
impl TurnMetadataState {
pub(crate) fn new(
session_id: String,
turn_id: String,
cwd: PathBuf,
sandbox_policy: &SandboxPolicy,
windows_sandbox_level: WindowsSandboxLevel,
metadata: BTreeMap<String, String>,
) -> Self {
let repo_root = get_git_repo_root(&cwd).map(|root| root.to_string_lossy().into_owned());
let sandbox = Some(sandbox_tag(sandbox_policy, windows_sandbox_level).to_string());
let base_metadata = build_turn_metadata_bag(
Some(session_id),
Some(turn_id),
sandbox,
/*repo_root*/ None,
/*workspace_git_metadata*/ None,
metadata,
);
let base_header = base_metadata
.to_header_value()
@@ -168,6 +182,15 @@ impl TurnMetadataState {
Some(self.base_header.clone())
}
pub(crate) fn current_meta_value(&self) -> Option<serde_json::Value> {
self.current_header_value()
.and_then(|header| serde_json::from_str(&header).ok())
}
pub(crate) fn metadata(&self) -> &BTreeMap<String, String> {
&self.base_metadata.metadata
}
pub(crate) fn spawn_git_enrichment_task(&self) {
if self.repo_root.is_none() {
return;
@@ -189,10 +212,12 @@ impl TurnMetadataState {
};
let enriched_metadata = build_turn_metadata_bag(
state.base_metadata.session_id.clone(),
state.base_metadata.turn_id.clone(),
state.base_metadata.sandbox.clone(),
Some(repo_root),
Some(workspace_git_metadata),
state.base_metadata.metadata.clone(),
);
if enriched_metadata.workspaces.is_empty() {
return;
@@ -231,7 +256,6 @@ impl TurnMetadataState {
}
}
}
#[cfg(test)]
#[path = "turn_metadata_tests.rs"]
mod tests;

View File

@@ -1,5 +1,7 @@
use super::*;
use std::collections::BTreeMap;
use serde_json::Value;
use tempfile::TempDir;
use tokio::process::Command;
@@ -67,16 +69,46 @@ fn turn_metadata_state_uses_platform_sandbox_tag() {
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let state = TurnMetadataState::new(
"session-a".to_string(),
"turn-a".to_string(),
cwd,
&sandbox_policy,
WindowsSandboxLevel::Disabled,
BTreeMap::new(),
);
let header = state.current_header_value().expect("header");
let json: Value = serde_json::from_str(&header).expect("json");
let sandbox_name = json.get("sandbox").and_then(Value::as_str);
let session_id = json.get("session_id").and_then(Value::as_str);
let expected_sandbox = sandbox_tag(&sandbox_policy, WindowsSandboxLevel::Disabled);
assert_eq!(sandbox_name, Some(expected_sandbox));
assert_eq!(session_id, Some("session-a"));
}
#[test]
fn turn_metadata_state_serializes_custom_metadata() {
let temp_dir = TempDir::new().expect("temp dir");
let cwd = temp_dir.path().to_path_buf();
let sandbox_policy = SandboxPolicy::new_read_only_policy();
let metadata = BTreeMap::from([
("parentConversationId".to_string(), "conv-123".to_string()),
("parentMessageId".to_string(), "msg-123".to_string()),
("parentTurnId".to_string(), "turn-123".to_string()),
]);
let state = TurnMetadataState::new(
"session-a".to_string(),
"turn-a".to_string(),
cwd,
&sandbox_policy,
WindowsSandboxLevel::Disabled,
metadata.clone(),
);
let header = state.current_header_value().expect("header");
let json: Value = serde_json::from_str(&header).expect("json");
assert_eq!(json.get("metadata"), Some(&serde_json::json!(metadata)));
}

View File

@@ -20,6 +20,7 @@ use core_test_support::responses;
use core_test_support::test_codex::test_codex;
use futures::StreamExt;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use wiremock::matchers::header;
@@ -546,3 +547,134 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
Some(false)
);
}
#[tokio::test]
async fn responses_stream_includes_parent_metadata_in_turn_metadata_header() {
core_test_support::skip_if_no_network!();
let server = responses::start_mock_server().await;
let request_recorder = responses::mount_sse_once(
&server,
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_completed("resp-1"),
]),
)
.await;
let provider = ModelProviderInfo {
name: "mock".into(),
base_url: Some(format!("{}/v1", server.uri())),
env_key: None,
env_key_instructions: None,
experimental_bearer_token: None,
wire_api: WireApi::Responses,
query_params: None,
http_headers: None,
env_http_headers: None,
request_max_retries: Some(0),
stream_max_retries: Some(0),
stream_idle_timeout_ms: Some(5_000),
websocket_connect_timeout_ms: None,
requires_openai_auth: false,
supports_websockets: false,
};
let codex_home = TempDir::new().expect("failed to create TempDir");
let mut config = load_default_config_for_test(&codex_home).await;
config.model_provider_id = provider.name.clone();
config.model_provider = provider.clone();
let effort = config.model_reasoning_effort;
let summary = config.model_reasoning_summary;
let model = codex_core::test_support::get_model_offline(config.model.as_deref());
config.model = Some(model.clone());
let config = Arc::new(config);
let conversation_id = ThreadId::new();
let auth_mode = TelemetryAuthMode::Chatgpt;
let session_source = SessionSource::Exec;
let model_info =
codex_core::test_support::construct_model_info_offline(model.as_str(), &config);
let session_telemetry = SessionTelemetry::new(
conversation_id,
model.as_str(),
model_info.slug.as_str(),
None,
Some("test@test.com".to_string()),
Some(auth_mode),
"test_originator".to_string(),
false,
"test".to_string(),
session_source.clone(),
);
let client = ModelClient::new(
None,
conversation_id,
provider,
session_source,
config.model_verbosity,
/*enable_request_compression*/ false,
/*include_timing_metrics*/ false,
None,
);
let mut client_session = client.new_session();
let mut prompt = Prompt::default();
prompt.input = vec![ResponseItem::Message {
id: None,
role: "user".into(),
content: vec![ContentItem::InputText {
text: "hello".into(),
}],
end_turn: None,
phase: None,
}];
let turn_metadata_header = serde_json::to_string(&json!({
"turn_id": "turn-123",
"metadata": {
"parentConversationId": "conv-123",
"parentMessageId": "msg-123",
"parentTurnId": "turn-123",
},
}))
.expect("turn metadata json");
let mut stream = client_session
.stream(
&prompt,
&model_info,
&session_telemetry,
effort,
summary.unwrap_or(model_info.default_reasoning_summary),
None,
Some(turn_metadata_header.as_str()),
)
.await
.expect("stream failed");
while let Some(event) = stream.next().await {
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
break;
}
}
let request = request_recorder.single_request();
let turn_metadata_header = request
.header("x-codex-turn-metadata")
.expect("request should include turn metadata");
let turn_metadata: serde_json::Value =
serde_json::from_str(&turn_metadata_header).expect("turn metadata should be valid JSON");
assert_eq!(
turn_metadata.pointer("/metadata/parentConversationId"),
Some(&json!("conv-123"))
);
assert_eq!(
turn_metadata.pointer("/metadata/parentMessageId"),
Some(&json!("msg-123"))
);
assert_eq!(
turn_metadata.pointer("/metadata/parentTurnId"),
Some(&json!("turn-123"))
);
}

View File

@@ -424,6 +424,39 @@ async fn tool_search_returns_deferred_tools_without_follow_up_tool_injection() -
let requests = mock.requests();
assert_eq!(requests.len(), 3);
let apps_tool_call = server
.received_requests()
.await
.unwrap_or_default()
.into_iter()
.find_map(|request| {
let body: Value = serde_json::from_slice(&request.body).ok()?;
(request.url.path() == "/api/codex/apps"
&& body.get("method").and_then(Value::as_str) == Some("tools/call"))
.then_some(body)
})
.expect("apps tools/call request should be recorded");
assert_eq!(
apps_tool_call.pointer("/params/_meta/_codex_apps"),
Some(&json!({
"resource_uri": CALENDAR_CREATE_EVENT_RESOURCE_URI,
"contains_mcp_source": true,
"connector_id": "calendar",
}))
);
assert_eq!(
apps_tool_call.pointer("/params/_meta/x-codex-turn-metadata/session_id"),
Some(&json!(test.session_configured.session_id.to_string()))
);
assert!(
apps_tool_call
.pointer("/params/_meta/x-codex-turn-metadata/turn_id")
.and_then(Value::as_str)
.is_some_and(|turn_id| !turn_id.is_empty()),
"apps tools/call should include turn metadata turn_id: {apps_tool_call:?}"
);
let first_request_tools = tool_names(&requests[0].body_json());
assert!(
first_request_tools

View File

@@ -723,7 +723,7 @@ impl RmcpClient {
None => None,
};
let rmcp_params = CallToolRequestParams {
meta,
meta: None,
name: name.into(),
arguments,
task: None,
@@ -731,7 +731,30 @@ impl RmcpClient {
let result = self
.run_service_operation("tools/call", timeout, move |service| {
let rmcp_params = rmcp_params.clone();
async move { service.call_tool(rmcp_params).await }.boxed()
let meta = meta.clone();
async move {
let result = service
.peer()
.send_request_with_option(
ClientRequest::CallToolRequest(rmcp::model::CallToolRequest {
method: Default::default(),
params: rmcp_params,
extensions: Default::default(),
}),
rmcp::service::PeerRequestOptions {
timeout: None,
meta,
},
)
.await?
.await_response()
.await?;
match result {
ServerResult::CallToolResult(result) => Ok(result),
_ => Err(rmcp::service::ServiceError::UnexpectedResponse),
}
}
.boxed()
})
.await?;
self.persist_oauth_tokens().await;