mirror of
https://github.com/openai/codex.git
synced 2026-04-19 20:24:50 +00:00
Compare commits
20 Commits
codex-debu
...
nicholascl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ee421c407 | ||
|
|
5305a38085 | ||
|
|
656d2f5724 | ||
|
|
2bcc504df8 | ||
|
|
6bf3961d66 | ||
|
|
5bf604ca69 | ||
|
|
b679c7596b | ||
|
|
cbd0700837 | ||
|
|
ec278abd94 | ||
|
|
8efc728deb | ||
|
|
6e8ebdc5bc | ||
|
|
db3ddeae35 | ||
|
|
88653c9245 | ||
|
|
98f97df37f | ||
|
|
e84f54294a | ||
|
|
fb816b1322 | ||
|
|
edc8e17c5c | ||
|
|
9124fe9cbc | ||
|
|
7df3c80f25 | ||
|
|
bab2a28f26 |
@@ -3116,6 +3116,16 @@
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"metadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional string metadata attached only to this turn.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"model": {
|
||||
"description": "Override the model for this turn and subsequent turns.",
|
||||
"type": [
|
||||
|
||||
@@ -13954,6 +13954,16 @@
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"metadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional string metadata attached only to this turn.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"model": {
|
||||
"description": "Override the model for this turn and subsequent turns.",
|
||||
"type": [
|
||||
|
||||
@@ -11714,6 +11714,16 @@
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"metadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional string metadata attached only to this turn.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"model": {
|
||||
"description": "Override the model for this turn and subsequent turns.",
|
||||
"type": [
|
||||
|
||||
@@ -545,6 +545,16 @@
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"metadata": {
|
||||
"additionalProperties": {
|
||||
"type": "string"
|
||||
},
|
||||
"description": "Optional string metadata attached only to this turn.",
|
||||
"type": [
|
||||
"object",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"model": {
|
||||
"description": "Override the model for this turn and subsequent turns.",
|
||||
"type": [
|
||||
|
||||
@@ -13,6 +13,9 @@ import type { SandboxPolicy } from "./SandboxPolicy";
|
||||
import type { UserInput } from "./UserInput";
|
||||
|
||||
export type TurnStartParams = {threadId: string, input: Array<UserInput>, /**
|
||||
* Optional string metadata attached only to this turn.
|
||||
*/
|
||||
metadata?: { [key in string]?: string } | null, /**
|
||||
* Override the working directory for this turn and subsequent turns.
|
||||
*/
|
||||
cwd?: string | null, /**
|
||||
|
||||
@@ -3831,6 +3831,9 @@ pub enum TurnStatus {
|
||||
pub struct TurnStartParams {
|
||||
pub thread_id: String,
|
||||
pub input: Vec<UserInput>,
|
||||
/// Optional string metadata attached only to this turn.
|
||||
#[ts(optional = nullable)]
|
||||
pub metadata: Option<BTreeMap<String, String>>,
|
||||
/// Override the working directory for this turn and subsequent turns.
|
||||
#[ts(optional = nullable)]
|
||||
pub cwd: Option<PathBuf>,
|
||||
@@ -7938,6 +7941,7 @@ mod tests {
|
||||
let without_override = TurnStartParams {
|
||||
thread_id: "thread_123".to_string(),
|
||||
input: vec![],
|
||||
metadata: None,
|
||||
cwd: None,
|
||||
approval_policy: None,
|
||||
approvals_reviewer: None,
|
||||
|
||||
@@ -69,7 +69,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
|
||||
- Initialize once per connection: Immediately after opening a transport connection, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request on that connection before this handshake gets rejected.
|
||||
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you’ll also get a `thread/started` notification. If you’re continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history. Like `thread/start`, `thread/fork` also accepts `ephemeral: true` for an in-memory temporary thread.
|
||||
The returned `thread.ephemeral` flag tells you whether the session is intentionally in-memory only; when it is `true`, `thread.path` is `null`.
|
||||
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, approval policy, approvals reviewer, etc. This immediately returns the new turn object. The app-server emits `turn/started` when that turn actually begins running.
|
||||
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, approval policy, approvals reviewer, etc. `turn/start.metadata` also lets callers attach arbitrary string metadata to that turn. This immediately returns the new turn object. The app-server emits `turn/started` when that turn actually begins running.
|
||||
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. You’ll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
|
||||
- Finish the turn: When the model is done (or the turn is interrupted via making the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.
|
||||
|
||||
@@ -139,7 +139,7 @@ Example with notification opt-out:
|
||||
- `thread/shellCommand` — run a user-initiated `!` shell command against a thread; this runs unsandboxed with full access rather than inheriting the thread sandbox policy. Returns `{}` immediately while progress streams through standard turn/item notifications and any active turn receives the formatted output in its message stream.
|
||||
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
|
||||
- `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
|
||||
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
|
||||
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. `metadata` accepts arbitrary string key/value pairs for turn-scoped observability and request forwarding. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
|
||||
- `turn/steer` — add user input to an already in-flight turn without starting a new turn; returns the active `turnId` that accepted the input.
|
||||
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
|
||||
- `thread/realtime/start` — start a thread-scoped realtime session (experimental); returns `{}` and streams `thread/realtime/*` notifications.
|
||||
|
||||
@@ -279,6 +279,7 @@ use codex_state::ThreadMetadataBuilder;
|
||||
use codex_state::log_db::LogDbLayer;
|
||||
use codex_utils_json_to_toml::json_to_toml;
|
||||
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::ffi::OsStr;
|
||||
@@ -5973,6 +5974,8 @@ impl CodexMessageProcessor {
|
||||
.into_iter()
|
||||
.map(V2UserInput::into_core)
|
||||
.collect();
|
||||
let next_turn_metadata = params.metadata.clone();
|
||||
let clear_next_turn_metadata_on_error = next_turn_metadata.is_some();
|
||||
|
||||
let has_any_overrides = params.cwd.is_some()
|
||||
|| params.approval_policy.is_some()
|
||||
@@ -6010,6 +6013,13 @@ impl CodexMessageProcessor {
|
||||
.await;
|
||||
}
|
||||
|
||||
if let Some(metadata) = next_turn_metadata
|
||||
&& let Err(error) = Self::set_next_turn_metadata(thread.as_ref(), metadata).await
|
||||
{
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
// Start the turn by submitting the user input. Return its submission id as turn_id.
|
||||
let turn_id = self
|
||||
.submit_core_op(
|
||||
@@ -6038,6 +6048,15 @@ impl CodexMessageProcessor {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
Err(err) => {
|
||||
if clear_next_turn_metadata_on_error
|
||||
&& let Err(clear_error) =
|
||||
Self::set_next_turn_metadata(thread.as_ref(), BTreeMap::new()).await
|
||||
{
|
||||
warn!(
|
||||
error = %clear_error.message,
|
||||
"failed to clear next turn metadata after turn/start submission error"
|
||||
);
|
||||
}
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to start turn: {err}"),
|
||||
@@ -6062,6 +6081,20 @@ impl CodexMessageProcessor {
|
||||
})
|
||||
}
|
||||
|
||||
async fn set_next_turn_metadata(
|
||||
thread: &CodexThread,
|
||||
metadata: BTreeMap<String, String>,
|
||||
) -> Result<(), JSONRPCErrorError> {
|
||||
thread
|
||||
.set_next_turn_metadata(metadata)
|
||||
.await
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INVALID_PARAMS_ERROR_CODE,
|
||||
message: err.to_string(),
|
||||
data: None,
|
||||
})
|
||||
}
|
||||
|
||||
async fn turn_steer(&self, request_id: ConnectionRequestId, params: TurnSteerParams) {
|
||||
let (_, thread) = match self.load_thread(¶ms.thread_id).await {
|
||||
Ok(v) => v,
|
||||
|
||||
@@ -730,6 +730,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ConfigRequirementsReadResponse;
|
||||
use codex_app_server_protocol::SessionSource as ApiSessionSource;
|
||||
@@ -737,9 +738,13 @@ mod tests {
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::TempDir;
|
||||
|
||||
async fn build_test_config() -> Config {
|
||||
match ConfigBuilder::default().build().await {
|
||||
@@ -857,6 +862,175 @@ mod tests {
|
||||
.expect("in-process runtime should shutdown cleanly");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_process_start_forwards_turn_metadata_on_turn_requests() {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new().expect("tempdir should create");
|
||||
let mut client = start(InProcessStartArgs {
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config: Arc::new(
|
||||
ConfigBuilder::default()
|
||||
.codex_home(codex_home.path().to_path_buf())
|
||||
.build()
|
||||
.await
|
||||
.expect("test config should load"),
|
||||
),
|
||||
cli_overrides: vec![
|
||||
(
|
||||
"model_provider".to_string(),
|
||||
TomlValue::String("mock_provider".to_string()),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.name".to_string(),
|
||||
TomlValue::String("Mock provider for test".to_string()),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.base_url".to_string(),
|
||||
TomlValue::String(format!("{}/v1", server.uri())),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.wire_api".to_string(),
|
||||
TomlValue::String("responses".to_string()),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.request_max_retries".to_string(),
|
||||
TomlValue::Integer(0),
|
||||
),
|
||||
(
|
||||
"model_providers.mock_provider.stream_max_retries".to_string(),
|
||||
TomlValue::Integer(0),
|
||||
),
|
||||
],
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
auth_manager: None,
|
||||
thread_manager: None,
|
||||
feedback: CodexFeedback::new(),
|
||||
config_warnings: Vec::new(),
|
||||
session_source: SessionSource::Cli,
|
||||
enable_codex_api_key_env: false,
|
||||
initialize: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-in-process-test".to_string(),
|
||||
title: None,
|
||||
version: "0.0.0".to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
},
|
||||
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
|
||||
})
|
||||
.await
|
||||
.expect("in-process runtime should start");
|
||||
|
||||
let thread_start_response: ThreadStartResponse = serde_json::from_value(
|
||||
client
|
||||
.request(ClientRequest::ThreadStart {
|
||||
request_id: RequestId::Integer(5),
|
||||
params: ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
ephemeral: Some(true),
|
||||
..ThreadStartParams::default()
|
||||
},
|
||||
})
|
||||
.await
|
||||
.expect("thread/start request should work")
|
||||
.expect("thread/start should succeed"),
|
||||
)
|
||||
.expect("thread/start response should parse");
|
||||
|
||||
let _turn_start_response: TurnStartResponse = serde_json::from_value(
|
||||
client
|
||||
.request(ClientRequest::TurnStart {
|
||||
request_id: RequestId::Integer(6),
|
||||
params: TurnStartParams {
|
||||
thread_id: thread_start_response.thread.id,
|
||||
input: vec![UserInput::Text {
|
||||
text: "Hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
metadata: Some(std::collections::BTreeMap::from([
|
||||
(
|
||||
"parentConversationId".to_string(),
|
||||
"parent-conversation-123".to_string(),
|
||||
),
|
||||
(
|
||||
"parentMessageId".to_string(),
|
||||
"parent-message-123".to_string(),
|
||||
),
|
||||
("parentTurnId".to_string(), "parent-turn-123".to_string()),
|
||||
])),
|
||||
..TurnStartParams::default()
|
||||
},
|
||||
})
|
||||
.await
|
||||
.expect("turn/start request should work")
|
||||
.expect("turn/start should succeed"),
|
||||
)
|
||||
.expect("turn/start response should parse");
|
||||
|
||||
let turn_completed = loop {
|
||||
let event = timeout(std::time::Duration::from_secs(10), client.next_event())
|
||||
.await
|
||||
.expect("timed out waiting for turn completion");
|
||||
match event {
|
||||
Some(InProcessServerEvent::ServerNotification(
|
||||
ServerNotification::TurnCompleted(notification),
|
||||
)) => break notification,
|
||||
Some(_) => continue,
|
||||
None => panic!("in-process runtime exited before turn completion"),
|
||||
}
|
||||
};
|
||||
assert_eq!(
|
||||
turn_completed.turn.status,
|
||||
TurnStatus::Completed,
|
||||
"turn error: {:?}",
|
||||
turn_completed.turn.error
|
||||
);
|
||||
assert_eq!(turn_completed.turn.error, None);
|
||||
|
||||
let requests = timeout(std::time::Duration::from_secs(5), async {
|
||||
loop {
|
||||
let requests = server
|
||||
.received_requests()
|
||||
.await
|
||||
.expect("received requests should be available");
|
||||
if !requests.is_empty() {
|
||||
break requests;
|
||||
}
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("timed out waiting for outbound requests");
|
||||
assert!(!requests.is_empty());
|
||||
for request in requests {
|
||||
let turn_metadata_header = request
|
||||
.headers
|
||||
.get("x-codex-turn-metadata")
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.expect("turn metadata header should be present");
|
||||
let turn_metadata: serde_json::Value =
|
||||
serde_json::from_str(turn_metadata_header).expect("turn metadata should be JSON");
|
||||
|
||||
assert_eq!(
|
||||
turn_metadata.pointer("/metadata/parentConversationId"),
|
||||
Some(&serde_json::json!("parent-conversation-123"))
|
||||
);
|
||||
assert_eq!(
|
||||
turn_metadata.pointer("/metadata/parentMessageId"),
|
||||
Some(&serde_json::json!("parent-message-123"))
|
||||
);
|
||||
assert_eq!(
|
||||
turn_metadata.pointer("/metadata/parentTurnId"),
|
||||
Some(&serde_json::json!("parent-turn-123"))
|
||||
);
|
||||
}
|
||||
|
||||
client
|
||||
.shutdown()
|
||||
.await
|
||||
.expect("in-process runtime should shutdown cleanly");
|
||||
}
|
||||
#[test]
|
||||
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
|
||||
assert!(server_notification_requires_delivery(
|
||||
|
||||
@@ -601,6 +601,7 @@ async fn turn_start_jsonrpc_span_parents_core_turn_spans() -> Result<()> {
|
||||
personality: None,
|
||||
output_schema: None,
|
||||
collaboration_mode: None,
|
||||
metadata: None,
|
||||
},
|
||||
},
|
||||
Some(remote_trace),
|
||||
|
||||
@@ -1394,6 +1394,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
personality: None,
|
||||
output_schema: None,
|
||||
collaboration_mode: None,
|
||||
metadata: None,
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
@@ -1427,6 +1428,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
personality: None,
|
||||
output_schema: None,
|
||||
collaboration_mode: None,
|
||||
metadata: None,
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
@@ -591,6 +592,7 @@ impl Codex {
|
||||
persist_extended_history,
|
||||
inherited_shell_snapshot,
|
||||
user_shell_override,
|
||||
next_turn_metadata: BTreeMap::new(),
|
||||
};
|
||||
|
||||
// Generate a unique ID for the lifetime of this Codex session.
|
||||
@@ -716,6 +718,18 @@ impl Codex {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn set_next_turn_metadata(
|
||||
&self,
|
||||
metadata: BTreeMap<String, String>,
|
||||
) -> ConstraintResult<()> {
|
||||
self.session
|
||||
.update_settings(SessionSettingsUpdate {
|
||||
next_turn_metadata: Some(metadata),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn agent_status(&self) -> AgentStatus {
|
||||
self.agent_status.borrow().clone()
|
||||
}
|
||||
@@ -1056,6 +1070,8 @@ pub(crate) struct SessionConfiguration {
|
||||
persist_extended_history: bool,
|
||||
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
|
||||
user_shell_override: Option<shell::Shell>,
|
||||
/// One-shot metadata consumed by the next turn created from this session.
|
||||
next_turn_metadata: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
impl SessionConfiguration {
|
||||
@@ -1098,6 +1114,9 @@ impl SessionConfiguration {
|
||||
if let Some(personality) = updates.personality {
|
||||
next_configuration.personality = Some(personality);
|
||||
}
|
||||
if let Some(next_turn_metadata) = updates.next_turn_metadata.clone() {
|
||||
next_configuration.next_turn_metadata = next_turn_metadata;
|
||||
}
|
||||
if let Some(approval_policy) = updates.approval_policy {
|
||||
next_configuration.approval_policy.set(approval_policy)?;
|
||||
}
|
||||
@@ -1148,6 +1167,8 @@ pub(crate) struct SessionSettingsUpdate {
|
||||
pub(crate) final_output_json_schema: Option<Option<Value>>,
|
||||
pub(crate) personality: Option<Personality>,
|
||||
pub(crate) app_server_client_name: Option<String>,
|
||||
/// One-shot metadata for the next turn created from this update.
|
||||
pub(crate) next_turn_metadata: Option<BTreeMap<String, String>>,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
@@ -1286,6 +1307,7 @@ impl Session {
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn make_turn_context(
|
||||
conversation_id: ThreadId,
|
||||
auth_manager: Option<Arc<AuthManager>>,
|
||||
session_telemetry: &SessionTelemetry,
|
||||
provider: ModelProviderInfo,
|
||||
@@ -1336,10 +1358,12 @@ impl Session {
|
||||
|
||||
let cwd = session_configuration.cwd.clone();
|
||||
let turn_metadata_state = Arc::new(TurnMetadataState::new(
|
||||
conversation_id.to_string(),
|
||||
sub_id.clone(),
|
||||
cwd.clone(),
|
||||
session_configuration.sandbox_policy.get(),
|
||||
session_configuration.windows_sandbox_level,
|
||||
session_configuration.next_turn_metadata.clone(),
|
||||
));
|
||||
let (current_date, timezone) = local_time_context();
|
||||
TurnContext {
|
||||
@@ -2306,7 +2330,9 @@ impl Session {
|
||||
state.session_configuration.sandbox_policy != next.sandbox_policy;
|
||||
let codex_home = next.codex_home.clone();
|
||||
let session_source = next.session_source.clone();
|
||||
state.session_configuration = next.clone();
|
||||
let mut persisted_configuration = next.clone();
|
||||
persisted_configuration.next_turn_metadata.clear();
|
||||
state.session_configuration = persisted_configuration;
|
||||
(
|
||||
next,
|
||||
sandbox_policy_changed,
|
||||
@@ -2394,6 +2420,7 @@ impl Session {
|
||||
.skills_for_config(&per_turn_config),
|
||||
);
|
||||
let mut turn_context: TurnContext = Self::make_turn_context(
|
||||
self.conversation_id,
|
||||
Some(Arc::clone(&self.services.auth_manager)),
|
||||
&self.services.session_telemetry,
|
||||
session_configuration.provider.clone(),
|
||||
@@ -4491,6 +4518,7 @@ mod handlers {
|
||||
reasoning_summary: summary,
|
||||
service_tier,
|
||||
final_output_json_schema: Some(final_output_json_schema),
|
||||
next_turn_metadata: None,
|
||||
personality,
|
||||
app_server_client_name: None,
|
||||
},
|
||||
@@ -5220,10 +5248,12 @@ async fn spawn_review_thread(
|
||||
let per_turn_config = Arc::new(per_turn_config);
|
||||
let review_turn_id = sub_id.to_string();
|
||||
let turn_metadata_state = Arc::new(TurnMetadataState::new(
|
||||
sess.conversation_id.to_string(),
|
||||
review_turn_id.clone(),
|
||||
parent_turn_context.cwd.clone(),
|
||||
parent_turn_context.sandbox_policy.get(),
|
||||
parent_turn_context.windows_sandbox_level,
|
||||
parent_turn_context.turn_metadata_state.metadata().clone(),
|
||||
));
|
||||
|
||||
let review_turn_context = TurnContext {
|
||||
|
||||
@@ -1660,6 +1660,7 @@ async fn set_rate_limits_retains_previous_credits() {
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
user_shell_override: None,
|
||||
next_turn_metadata: std::collections::BTreeMap::new(),
|
||||
};
|
||||
|
||||
let mut state = SessionState::new(session_configuration);
|
||||
@@ -1758,6 +1759,7 @@ async fn set_rate_limits_updates_plan_type_when_present() {
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
user_shell_override: None,
|
||||
next_turn_metadata: std::collections::BTreeMap::new(),
|
||||
};
|
||||
|
||||
let mut state = SessionState::new(session_configuration);
|
||||
@@ -2102,6 +2104,7 @@ pub(crate) async fn make_session_configuration_for_tests() -> SessionConfigurati
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
user_shell_override: None,
|
||||
next_turn_metadata: std::collections::BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2275,6 +2278,46 @@ async fn session_configuration_apply_rederives_legacy_file_system_policy_on_cwd_
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn new_turn_with_sub_id_consumes_next_turn_metadata_once() {
|
||||
let (session, _) = make_session_and_context().await;
|
||||
let next_turn_metadata = std::collections::BTreeMap::from([(
|
||||
"parentTurnId".to_string(),
|
||||
"parent-turn-123".to_string(),
|
||||
)]);
|
||||
|
||||
session
|
||||
.update_settings(SessionSettingsUpdate {
|
||||
next_turn_metadata: Some(next_turn_metadata.clone()),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("test setup should allow updating next turn metadata");
|
||||
|
||||
let first_turn_context = session
|
||||
.new_turn_with_sub_id("turn-1".to_string(), SessionSettingsUpdate::default())
|
||||
.await
|
||||
.expect("first turn should be created");
|
||||
assert_eq!(
|
||||
first_turn_context.turn_metadata_state.metadata(),
|
||||
&next_turn_metadata
|
||||
);
|
||||
let state = session.state.lock().await;
|
||||
assert!(state.session_configuration.next_turn_metadata.is_empty());
|
||||
drop(state);
|
||||
|
||||
let second_turn_context = session
|
||||
.new_turn_with_sub_id("turn-2".to_string(), SessionSettingsUpdate::default())
|
||||
.await
|
||||
.expect("second turn should be created");
|
||||
assert!(
|
||||
second_turn_context
|
||||
.turn_metadata_state
|
||||
.metadata()
|
||||
.is_empty()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
|
||||
let codex_home = tempfile::tempdir().expect("create temp dir");
|
||||
@@ -2333,6 +2376,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
user_shell_override: None,
|
||||
next_turn_metadata: std::collections::BTreeMap::new(),
|
||||
};
|
||||
|
||||
let (tx_event, _rx_event) = async_channel::unbounded();
|
||||
@@ -2428,6 +2472,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
user_shell_override: None,
|
||||
next_turn_metadata: std::collections::BTreeMap::new(),
|
||||
};
|
||||
let per_turn_config = Session::build_per_turn_config(&session_configuration);
|
||||
let model_info = ModelsManager::construct_model_info_offline_for_tests(
|
||||
@@ -2517,6 +2562,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
|
||||
let skills_outcome = Arc::new(services.skills_manager.skills_for_config(&per_turn_config));
|
||||
let turn_context = Session::make_turn_context(
|
||||
conversation_id,
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
&session_telemetry,
|
||||
session_configuration.provider.clone(),
|
||||
@@ -3226,6 +3272,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
|
||||
persist_extended_history: false,
|
||||
inherited_shell_snapshot: None,
|
||||
user_shell_override: None,
|
||||
next_turn_metadata: std::collections::BTreeMap::new(),
|
||||
};
|
||||
let per_turn_config = Session::build_per_turn_config(&session_configuration);
|
||||
let model_info = ModelsManager::construct_model_info_offline_for_tests(
|
||||
@@ -3315,6 +3362,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
|
||||
|
||||
let skills_outcome = Arc::new(services.skills_manager.skills_for_config(&per_turn_config));
|
||||
let turn_context = Arc::new(Session::make_turn_context(
|
||||
conversation_id,
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
&session_telemetry,
|
||||
session_configuration.provider.clone(),
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::PathBuf;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::watch;
|
||||
@@ -99,6 +100,13 @@ impl CodexThread {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn set_next_turn_metadata(
|
||||
&self,
|
||||
metadata: BTreeMap<String, String>,
|
||||
) -> ConstraintResult<()> {
|
||||
self.codex.set_next_turn_metadata(metadata).await
|
||||
}
|
||||
|
||||
/// Use sparingly: this is intended to be removed soon.
|
||||
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
|
||||
self.codex.submit_with_id(sub).await
|
||||
|
||||
@@ -119,7 +119,8 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
);
|
||||
return CallToolResult::from_result(result);
|
||||
}
|
||||
let request_meta = build_mcp_tool_call_request_meta(&server, metadata.as_ref());
|
||||
let request_meta =
|
||||
build_mcp_tool_call_request_meta(turn_context.as_ref(), &server, metadata.as_ref());
|
||||
|
||||
let tool_call_begin_event = EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
|
||||
call_id: call_id.clone(),
|
||||
@@ -388,20 +389,31 @@ pub(crate) struct McpToolApprovalMetadata {
|
||||
}
|
||||
|
||||
const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
|
||||
|
||||
fn build_mcp_tool_call_request_meta(
|
||||
turn_context: &TurnContext,
|
||||
server: &str,
|
||||
metadata: Option<&McpToolApprovalMetadata>,
|
||||
) -> Option<serde_json::Value> {
|
||||
if server != CODEX_APPS_MCP_SERVER_NAME {
|
||||
return None;
|
||||
let mut request_meta = serde_json::Map::new();
|
||||
|
||||
if let Some(turn_metadata) = turn_context.turn_metadata_state.current_meta_value() {
|
||||
request_meta.insert(
|
||||
crate::X_CODEX_TURN_METADATA_HEADER.to_string(),
|
||||
turn_metadata,
|
||||
);
|
||||
}
|
||||
|
||||
let codex_apps_meta = metadata.and_then(|metadata| metadata.codex_apps_meta.as_ref())?;
|
||||
if server == CODEX_APPS_MCP_SERVER_NAME
|
||||
&& let Some(codex_apps_meta) =
|
||||
metadata.and_then(|metadata| metadata.codex_apps_meta.clone())
|
||||
{
|
||||
request_meta.insert(
|
||||
MCP_TOOL_CODEX_APPS_META_KEY.to_string(),
|
||||
serde_json::Value::Object(codex_apps_meta),
|
||||
);
|
||||
}
|
||||
|
||||
Some(serde_json::json!({
|
||||
MCP_TOOL_CODEX_APPS_META_KEY: codex_apps_meta,
|
||||
}))
|
||||
(!request_meta.is_empty()).then_some(serde_json::Value::Object(request_meta))
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
|
||||
@@ -439,8 +439,39 @@ fn sanitize_mcp_tool_result_for_model_preserves_image_when_supported() {
|
||||
assert_eq!(got, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn codex_apps_tool_call_request_meta_includes_codex_apps_meta() {
|
||||
#[tokio::test]
|
||||
async fn mcp_tool_call_request_meta_includes_turn_metadata_for_custom_server() {
|
||||
let (_, turn_context) = make_session_and_context().await;
|
||||
let expected_turn_metadata = serde_json::from_str::<serde_json::Value>(
|
||||
&turn_context
|
||||
.turn_metadata_state
|
||||
.current_header_value()
|
||||
.expect("turn metadata header"),
|
||||
)
|
||||
.expect("turn metadata json");
|
||||
|
||||
let meta =
|
||||
build_mcp_tool_call_request_meta(&turn_context, "custom_server", /*metadata*/ None)
|
||||
.expect("custom servers should receive turn metadata");
|
||||
|
||||
assert_eq!(
|
||||
meta,
|
||||
serde_json::json!({
|
||||
crate::X_CODEX_TURN_METADATA_HEADER: expected_turn_metadata,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn codex_apps_tool_call_request_meta_includes_turn_metadata_and_codex_apps_meta() {
|
||||
let (_, turn_context) = make_session_and_context().await;
|
||||
let expected_turn_metadata = serde_json::from_str::<serde_json::Value>(
|
||||
&turn_context
|
||||
.turn_metadata_state
|
||||
.current_header_value()
|
||||
.expect("turn metadata header"),
|
||||
)
|
||||
.expect("turn metadata json");
|
||||
let metadata = McpToolApprovalMetadata {
|
||||
annotations: None,
|
||||
connector_id: Some("calendar".to_string()),
|
||||
@@ -461,8 +492,13 @@ fn codex_apps_tool_call_request_meta_includes_codex_apps_meta() {
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
build_mcp_tool_call_request_meta(CODEX_APPS_MCP_SERVER_NAME, Some(&metadata)),
|
||||
build_mcp_tool_call_request_meta(
|
||||
&turn_context,
|
||||
CODEX_APPS_MCP_SERVER_NAME,
|
||||
Some(&metadata),
|
||||
),
|
||||
Some(serde_json::json!({
|
||||
crate::X_CODEX_TURN_METADATA_HEADER: expected_turn_metadata,
|
||||
MCP_TOOL_CODEX_APPS_META_KEY: {
|
||||
"resource_uri": "connector://calendar/tools/calendar_create_event",
|
||||
"contains_mcp_source": true,
|
||||
|
||||
@@ -53,12 +53,16 @@ impl From<WorkspaceGitMetadata> for TurnMetadataWorkspace {
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Default)]
|
||||
pub(crate) struct TurnMetadataBag {
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
session_id: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
turn_id: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
|
||||
workspaces: BTreeMap<String, TurnMetadataWorkspace>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
sandbox: Option<String>,
|
||||
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
|
||||
metadata: BTreeMap<String, String>,
|
||||
}
|
||||
|
||||
impl TurnMetadataBag {
|
||||
@@ -68,10 +72,12 @@ impl TurnMetadataBag {
|
||||
}
|
||||
|
||||
fn build_turn_metadata_bag(
|
||||
session_id: Option<String>,
|
||||
turn_id: Option<String>,
|
||||
sandbox: Option<String>,
|
||||
repo_root: Option<String>,
|
||||
workspace_git_metadata: Option<WorkspaceGitMetadata>,
|
||||
metadata: BTreeMap<String, String>,
|
||||
) -> TurnMetadataBag {
|
||||
let mut workspaces = BTreeMap::new();
|
||||
if let (Some(repo_root), Some(workspace_git_metadata)) = (repo_root, workspace_git_metadata)
|
||||
@@ -81,9 +87,11 @@ fn build_turn_metadata_bag(
|
||||
}
|
||||
|
||||
TurnMetadataBag {
|
||||
session_id,
|
||||
turn_id,
|
||||
workspaces,
|
||||
sandbox,
|
||||
metadata,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,6 +112,7 @@ pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Op
|
||||
}
|
||||
|
||||
build_turn_metadata_bag(
|
||||
/*session_id*/ None,
|
||||
/*turn_id*/ None,
|
||||
sandbox.map(ToString::to_string),
|
||||
repo_root,
|
||||
@@ -112,6 +121,7 @@ pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Op
|
||||
latest_git_commit_hash,
|
||||
has_changes,
|
||||
}),
|
||||
BTreeMap::new(),
|
||||
)
|
||||
.to_header_value()
|
||||
}
|
||||
@@ -128,18 +138,22 @@ pub(crate) struct TurnMetadataState {
|
||||
|
||||
impl TurnMetadataState {
|
||||
pub(crate) fn new(
|
||||
session_id: String,
|
||||
turn_id: String,
|
||||
cwd: PathBuf,
|
||||
sandbox_policy: &SandboxPolicy,
|
||||
windows_sandbox_level: WindowsSandboxLevel,
|
||||
metadata: BTreeMap<String, String>,
|
||||
) -> Self {
|
||||
let repo_root = get_git_repo_root(&cwd).map(|root| root.to_string_lossy().into_owned());
|
||||
let sandbox = Some(sandbox_tag(sandbox_policy, windows_sandbox_level).to_string());
|
||||
let base_metadata = build_turn_metadata_bag(
|
||||
Some(session_id),
|
||||
Some(turn_id),
|
||||
sandbox,
|
||||
/*repo_root*/ None,
|
||||
/*workspace_git_metadata*/ None,
|
||||
metadata,
|
||||
);
|
||||
let base_header = base_metadata
|
||||
.to_header_value()
|
||||
@@ -168,6 +182,15 @@ impl TurnMetadataState {
|
||||
Some(self.base_header.clone())
|
||||
}
|
||||
|
||||
pub(crate) fn current_meta_value(&self) -> Option<serde_json::Value> {
|
||||
self.current_header_value()
|
||||
.and_then(|header| serde_json::from_str(&header).ok())
|
||||
}
|
||||
|
||||
pub(crate) fn metadata(&self) -> &BTreeMap<String, String> {
|
||||
&self.base_metadata.metadata
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_git_enrichment_task(&self) {
|
||||
if self.repo_root.is_none() {
|
||||
return;
|
||||
@@ -189,10 +212,12 @@ impl TurnMetadataState {
|
||||
};
|
||||
|
||||
let enriched_metadata = build_turn_metadata_bag(
|
||||
state.base_metadata.session_id.clone(),
|
||||
state.base_metadata.turn_id.clone(),
|
||||
state.base_metadata.sandbox.clone(),
|
||||
Some(repo_root),
|
||||
Some(workspace_git_metadata),
|
||||
state.base_metadata.metadata.clone(),
|
||||
);
|
||||
if enriched_metadata.workspaces.is_empty() {
|
||||
return;
|
||||
@@ -231,7 +256,6 @@ impl TurnMetadataState {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "turn_metadata_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use super::*;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use serde_json::Value;
|
||||
use tempfile::TempDir;
|
||||
use tokio::process::Command;
|
||||
@@ -67,16 +69,46 @@ fn turn_metadata_state_uses_platform_sandbox_tag() {
|
||||
let sandbox_policy = SandboxPolicy::new_read_only_policy();
|
||||
|
||||
let state = TurnMetadataState::new(
|
||||
"session-a".to_string(),
|
||||
"turn-a".to_string(),
|
||||
cwd,
|
||||
&sandbox_policy,
|
||||
WindowsSandboxLevel::Disabled,
|
||||
BTreeMap::new(),
|
||||
);
|
||||
|
||||
let header = state.current_header_value().expect("header");
|
||||
let json: Value = serde_json::from_str(&header).expect("json");
|
||||
let sandbox_name = json.get("sandbox").and_then(Value::as_str);
|
||||
let session_id = json.get("session_id").and_then(Value::as_str);
|
||||
|
||||
let expected_sandbox = sandbox_tag(&sandbox_policy, WindowsSandboxLevel::Disabled);
|
||||
assert_eq!(sandbox_name, Some(expected_sandbox));
|
||||
assert_eq!(session_id, Some("session-a"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_metadata_state_serializes_custom_metadata() {
|
||||
let temp_dir = TempDir::new().expect("temp dir");
|
||||
let cwd = temp_dir.path().to_path_buf();
|
||||
let sandbox_policy = SandboxPolicy::new_read_only_policy();
|
||||
let metadata = BTreeMap::from([
|
||||
("parentConversationId".to_string(), "conv-123".to_string()),
|
||||
("parentMessageId".to_string(), "msg-123".to_string()),
|
||||
("parentTurnId".to_string(), "turn-123".to_string()),
|
||||
]);
|
||||
|
||||
let state = TurnMetadataState::new(
|
||||
"session-a".to_string(),
|
||||
"turn-a".to_string(),
|
||||
cwd,
|
||||
&sandbox_policy,
|
||||
WindowsSandboxLevel::Disabled,
|
||||
metadata.clone(),
|
||||
);
|
||||
|
||||
let header = state.current_header_value().expect("header");
|
||||
let json: Value = serde_json::from_str(&header).expect("json");
|
||||
|
||||
assert_eq!(json.get("metadata"), Some(&serde_json::json!(metadata)));
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ use core_test_support::responses;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use futures::StreamExt;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
use wiremock::matchers::header;
|
||||
|
||||
@@ -546,3 +547,134 @@ async fn responses_stream_includes_turn_metadata_header_for_git_workspace_e2e()
|
||||
Some(false)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn responses_stream_includes_parent_metadata_in_turn_metadata_header() {
|
||||
core_test_support::skip_if_no_network!();
|
||||
|
||||
let server = responses::start_mock_server().await;
|
||||
let request_recorder = responses::mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let provider = ModelProviderInfo {
|
||||
name: "mock".into(),
|
||||
base_url: Some(format!("{}/v1", server.uri())),
|
||||
env_key: None,
|
||||
env_key_instructions: None,
|
||||
experimental_bearer_token: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: Some(0),
|
||||
stream_max_retries: Some(0),
|
||||
stream_idle_timeout_ms: Some(5_000),
|
||||
websocket_connect_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: false,
|
||||
};
|
||||
|
||||
let codex_home = TempDir::new().expect("failed to create TempDir");
|
||||
let mut config = load_default_config_for_test(&codex_home).await;
|
||||
config.model_provider_id = provider.name.clone();
|
||||
config.model_provider = provider.clone();
|
||||
let effort = config.model_reasoning_effort;
|
||||
let summary = config.model_reasoning_summary;
|
||||
let model = codex_core::test_support::get_model_offline(config.model.as_deref());
|
||||
config.model = Some(model.clone());
|
||||
let config = Arc::new(config);
|
||||
|
||||
let conversation_id = ThreadId::new();
|
||||
let auth_mode = TelemetryAuthMode::Chatgpt;
|
||||
let session_source = SessionSource::Exec;
|
||||
let model_info =
|
||||
codex_core::test_support::construct_model_info_offline(model.as_str(), &config);
|
||||
let session_telemetry = SessionTelemetry::new(
|
||||
conversation_id,
|
||||
model.as_str(),
|
||||
model_info.slug.as_str(),
|
||||
None,
|
||||
Some("test@test.com".to_string()),
|
||||
Some(auth_mode),
|
||||
"test_originator".to_string(),
|
||||
false,
|
||||
"test".to_string(),
|
||||
session_source.clone(),
|
||||
);
|
||||
|
||||
let client = ModelClient::new(
|
||||
None,
|
||||
conversation_id,
|
||||
provider,
|
||||
session_source,
|
||||
config.model_verbosity,
|
||||
/*enable_request_compression*/ false,
|
||||
/*include_timing_metrics*/ false,
|
||||
None,
|
||||
);
|
||||
let mut client_session = client.new_session();
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
prompt.input = vec![ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".into(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: "hello".into(),
|
||||
}],
|
||||
end_turn: None,
|
||||
phase: None,
|
||||
}];
|
||||
let turn_metadata_header = serde_json::to_string(&json!({
|
||||
"turn_id": "turn-123",
|
||||
"metadata": {
|
||||
"parentConversationId": "conv-123",
|
||||
"parentMessageId": "msg-123",
|
||||
"parentTurnId": "turn-123",
|
||||
},
|
||||
}))
|
||||
.expect("turn metadata json");
|
||||
|
||||
let mut stream = client_session
|
||||
.stream(
|
||||
&prompt,
|
||||
&model_info,
|
||||
&session_telemetry,
|
||||
effort,
|
||||
summary.unwrap_or(model_info.default_reasoning_summary),
|
||||
None,
|
||||
Some(turn_metadata_header.as_str()),
|
||||
)
|
||||
.await
|
||||
.expect("stream failed");
|
||||
while let Some(event) = stream.next().await {
|
||||
if matches!(event, Ok(ResponseEvent::Completed { .. })) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let request = request_recorder.single_request();
|
||||
let turn_metadata_header = request
|
||||
.header("x-codex-turn-metadata")
|
||||
.expect("request should include turn metadata");
|
||||
let turn_metadata: serde_json::Value =
|
||||
serde_json::from_str(&turn_metadata_header).expect("turn metadata should be valid JSON");
|
||||
|
||||
assert_eq!(
|
||||
turn_metadata.pointer("/metadata/parentConversationId"),
|
||||
Some(&json!("conv-123"))
|
||||
);
|
||||
assert_eq!(
|
||||
turn_metadata.pointer("/metadata/parentMessageId"),
|
||||
Some(&json!("msg-123"))
|
||||
);
|
||||
assert_eq!(
|
||||
turn_metadata.pointer("/metadata/parentTurnId"),
|
||||
Some(&json!("turn-123"))
|
||||
);
|
||||
}
|
||||
|
||||
@@ -424,6 +424,39 @@ async fn tool_search_returns_deferred_tools_without_follow_up_tool_injection() -
|
||||
let requests = mock.requests();
|
||||
assert_eq!(requests.len(), 3);
|
||||
|
||||
let apps_tool_call = server
|
||||
.received_requests()
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.find_map(|request| {
|
||||
let body: Value = serde_json::from_slice(&request.body).ok()?;
|
||||
(request.url.path() == "/api/codex/apps"
|
||||
&& body.get("method").and_then(Value::as_str) == Some("tools/call"))
|
||||
.then_some(body)
|
||||
})
|
||||
.expect("apps tools/call request should be recorded");
|
||||
|
||||
assert_eq!(
|
||||
apps_tool_call.pointer("/params/_meta/_codex_apps"),
|
||||
Some(&json!({
|
||||
"resource_uri": CALENDAR_CREATE_EVENT_RESOURCE_URI,
|
||||
"contains_mcp_source": true,
|
||||
"connector_id": "calendar",
|
||||
}))
|
||||
);
|
||||
assert_eq!(
|
||||
apps_tool_call.pointer("/params/_meta/x-codex-turn-metadata/session_id"),
|
||||
Some(&json!(test.session_configured.session_id.to_string()))
|
||||
);
|
||||
assert!(
|
||||
apps_tool_call
|
||||
.pointer("/params/_meta/x-codex-turn-metadata/turn_id")
|
||||
.and_then(Value::as_str)
|
||||
.is_some_and(|turn_id| !turn_id.is_empty()),
|
||||
"apps tools/call should include turn metadata turn_id: {apps_tool_call:?}"
|
||||
);
|
||||
|
||||
let first_request_tools = tool_names(&requests[0].body_json());
|
||||
assert!(
|
||||
first_request_tools
|
||||
|
||||
@@ -723,7 +723,7 @@ impl RmcpClient {
|
||||
None => None,
|
||||
};
|
||||
let rmcp_params = CallToolRequestParams {
|
||||
meta,
|
||||
meta: None,
|
||||
name: name.into(),
|
||||
arguments,
|
||||
task: None,
|
||||
@@ -731,7 +731,30 @@ impl RmcpClient {
|
||||
let result = self
|
||||
.run_service_operation("tools/call", timeout, move |service| {
|
||||
let rmcp_params = rmcp_params.clone();
|
||||
async move { service.call_tool(rmcp_params).await }.boxed()
|
||||
let meta = meta.clone();
|
||||
async move {
|
||||
let result = service
|
||||
.peer()
|
||||
.send_request_with_option(
|
||||
ClientRequest::CallToolRequest(rmcp::model::CallToolRequest {
|
||||
method: Default::default(),
|
||||
params: rmcp_params,
|
||||
extensions: Default::default(),
|
||||
}),
|
||||
rmcp::service::PeerRequestOptions {
|
||||
timeout: None,
|
||||
meta,
|
||||
},
|
||||
)
|
||||
.await?
|
||||
.await_response()
|
||||
.await?;
|
||||
match result {
|
||||
ServerResult::CallToolResult(result) => Ok(result),
|
||||
_ => Err(rmcp::service::ServiceError::UnexpectedResponse),
|
||||
}
|
||||
}
|
||||
.boxed()
|
||||
})
|
||||
.await?;
|
||||
self.persist_oauth_tokens().await;
|
||||
|
||||
Reference in New Issue
Block a user