mirror of
https://github.com/openai/codex.git
synced 2026-04-24 22:54:54 +00:00
What changed
- Added `outputSchema` support to the app-server APIs, mirroring
`codex exec --output-schema` behavior.
- V1 `sendUserTurn` now accepts `outputSchema` and constrains the final
assistant message for that turn.
- V2 `turn/start` now accepts `outputSchema` and constrains the final
assistant message for that turn (explicitly per-turn only).
Core behavior
- `Op::UserTurn` already supported `final_output_json_schema`; now V1
`sendUserTurn` forwards `outputSchema` into that field.
- `Op::UserInput` now carries `final_output_json_schema` for per-turn
settings updates; core maps it into
`SessionSettingsUpdate.final_output_json_schema` so it applies to the
created turn context.
- V2 `turn/start` does NOT persist the schema via `OverrideTurnContext`
(it’s applied only for the current turn). Other overrides
(cwd/model/etc) keep their existing persistent behavior.
API / docs
- `codex-rs/app-server-protocol/src/protocol/v1.rs`: add
`output_schema: Option<serde_json::Value>` to `SendUserTurnParams`
(serialized as `outputSchema`).
- `codex-rs/app-server-protocol/src/protocol/v2.rs`: add
`output_schema: Option<JsonValue>` to `TurnStartParams` (serialized as `outputSchema`).
- `codex-rs/app-server/README.md`: document `outputSchema` for
`turn/start` and clarify it applies only to the current turn.
- `codex-rs/docs/codex_mcp_interface.md`: document `outputSchema` for v1
`sendUserTurn` and v2 `turn/start`.
Tests added/updated
- New app-server integration tests asserting `outputSchema` is forwarded
into outbound `/responses` requests as `text.format`:
- `codex-rs/app-server/tests/suite/output_schema.rs`
- `codex-rs/app-server/tests/suite/v2/output_schema.rs`
- Added per-turn semantics tests (schema does not leak to the next
turn):
- `send_user_turn_output_schema_is_per_turn_v1`
- `turn_start_output_schema_is_per_turn_v2`
- Added protocol wire-compat tests for the merged op:
- serialize omits `final_output_json_schema` when `None`
- deserialize works when field is missing
- serialize includes `final_output_json_schema` when `Some(schema)`
Call site updates (high level)
- Updated all `Op::UserInput { .. }` constructions to include
`final_output_json_schema`:
- `codex-rs/app-server/src/codex_message_processor.rs`
- `codex-rs/core/src/codex_delegate.rs`
- `codex-rs/mcp-server/src/codex_tool_runner.rs`
- `codex-rs/tui/src/chatwidget.rs`
- `codex-rs/tui2/src/chatwidget.rs`
- plus impacted core tests.
Validation
- `just fmt`
- `cargo test -p codex-core`
- `cargo test -p codex-app-server`
- `cargo test -p codex-mcp-server`
- `cargo test -p codex-tui`
- `cargo test -p codex-tui2`
- `cargo test -p codex-protocol`
- `cargo clippy --all-features --tests --profile dev --fix -- -D warnings`
1088 lines
37 KiB
Rust
1088 lines
37 KiB
Rust
use anyhow::Result;
|
|
use app_test_support::McpProcess;
|
|
use app_test_support::create_apply_patch_sse_response;
|
|
use app_test_support::create_exec_command_sse_response;
|
|
use app_test_support::create_final_assistant_message_sse_response;
|
|
use app_test_support::create_mock_chat_completions_server;
|
|
use app_test_support::create_mock_chat_completions_server_unchecked;
|
|
use app_test_support::create_shell_command_sse_response;
|
|
use app_test_support::format_with_current_shell_display;
|
|
use app_test_support::to_response;
|
|
use codex_app_server_protocol::ApprovalDecision;
|
|
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
|
|
use codex_app_server_protocol::CommandExecutionStatus;
|
|
use codex_app_server_protocol::FileChangeOutputDeltaNotification;
|
|
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
|
|
use codex_app_server_protocol::ItemCompletedNotification;
|
|
use codex_app_server_protocol::ItemStartedNotification;
|
|
use codex_app_server_protocol::JSONRPCNotification;
|
|
use codex_app_server_protocol::JSONRPCResponse;
|
|
use codex_app_server_protocol::PatchApplyStatus;
|
|
use codex_app_server_protocol::PatchChangeKind;
|
|
use codex_app_server_protocol::RequestId;
|
|
use codex_app_server_protocol::ServerRequest;
|
|
use codex_app_server_protocol::ThreadItem;
|
|
use codex_app_server_protocol::ThreadStartParams;
|
|
use codex_app_server_protocol::ThreadStartResponse;
|
|
use codex_app_server_protocol::TurnCompletedNotification;
|
|
use codex_app_server_protocol::TurnStartParams;
|
|
use codex_app_server_protocol::TurnStartResponse;
|
|
use codex_app_server_protocol::TurnStartedNotification;
|
|
use codex_app_server_protocol::TurnStatus;
|
|
use codex_app_server_protocol::UserInput as V2UserInput;
|
|
use codex_core::protocol_config_types::ReasoningSummary;
|
|
use codex_protocol::openai_models::ReasoningEffort;
|
|
use core_test_support::skip_if_no_network;
|
|
use pretty_assertions::assert_eq;
|
|
use std::path::Path;
|
|
use tempfile::TempDir;
|
|
use tokio::time::timeout;
|
|
|
|
/// Deadline (10 seconds) passed to `tokio::time::timeout` around each awaited
/// `McpProcess` call in these tests, so a missing message fails the test
/// instead of hanging it.
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
|
|
|
#[tokio::test]
|
|
async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<()> {
|
|
// Provide a mock server and config so model wiring is valid.
|
|
// Three Codex turns hit the mock model (session start + two turn/start calls).
|
|
let responses = vec![
|
|
create_final_assistant_message_sse_response("Done")?,
|
|
create_final_assistant_message_sse_response("Done")?,
|
|
create_final_assistant_message_sse_response("Done")?,
|
|
];
|
|
let server = create_mock_chat_completions_server_unchecked(responses).await;
|
|
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
// Start a thread (v2) and capture its id.
|
|
let thread_req = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("mock-model".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let thread_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
|
|
|
// Start a turn with only input and thread_id set (no overrides).
|
|
let turn_req = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::Text {
|
|
text: "Hello".to_string(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
|
assert!(!turn.id.is_empty());
|
|
|
|
// Expect a turn/started notification.
|
|
let notif: JSONRPCNotification = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
let started: TurnStartedNotification =
|
|
serde_json::from_value(notif.params.expect("params must be present"))?;
|
|
assert_eq!(started.thread_id, thread.id);
|
|
assert_eq!(
|
|
started.turn.status,
|
|
codex_app_server_protocol::TurnStatus::InProgress
|
|
);
|
|
|
|
// Send a second turn that exercises the overrides path: change the model.
|
|
let turn_req2 = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::Text {
|
|
text: "Second".to_string(),
|
|
}],
|
|
model: Some("mock-model-override".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let turn_resp2: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_req2)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn: turn2 } = to_response::<TurnStartResponse>(turn_resp2)?;
|
|
assert!(!turn2.id.is_empty());
|
|
// Ensure the second turn has a different id than the first.
|
|
assert_ne!(turn.id, turn2.id);
|
|
|
|
// Expect a second turn/started notification as well.
|
|
let _notif2: JSONRPCNotification = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/started"),
|
|
)
|
|
.await??;
|
|
|
|
let completed_notif: JSONRPCNotification = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
let completed: TurnCompletedNotification = serde_json::from_value(
|
|
completed_notif
|
|
.params
|
|
.expect("turn/completed params must be present"),
|
|
)?;
|
|
assert_eq!(completed.thread_id, thread.id);
|
|
assert_eq!(completed.turn.status, TurnStatus::Completed);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn turn_start_accepts_local_image_input() -> Result<()> {
|
|
// Two Codex turns hit the mock model (session start + turn/start).
|
|
let responses = vec![
|
|
create_final_assistant_message_sse_response("Done")?,
|
|
create_final_assistant_message_sse_response("Done")?,
|
|
];
|
|
// Use the unchecked variant because the request payload includes a LocalImage
|
|
// which the strict matcher does not currently cover.
|
|
let server = create_mock_chat_completions_server_unchecked(responses).await;
|
|
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let thread_req = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("mock-model".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let thread_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
|
|
|
let image_path = codex_home.path().join("image.png");
|
|
// No need to actually write the file; we just exercise the input path.
|
|
|
|
let turn_req = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::LocalImage { path: image_path }],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
|
assert!(!turn.id.is_empty());
|
|
|
|
// This test only validates that turn/start responds and returns a turn.
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let tmp = TempDir::new()?;
|
|
let codex_home = tmp.path().to_path_buf();
|
|
|
|
// Mock server: first turn requests a shell call (elicitation), then completes.
|
|
// Second turn same, but we'll set approval_policy=never to avoid elicitation.
|
|
let responses = vec![
|
|
create_shell_command_sse_response(
|
|
vec![
|
|
"python3".to_string(),
|
|
"-c".to_string(),
|
|
"print(42)".to_string(),
|
|
],
|
|
None,
|
|
Some(5000),
|
|
"call1",
|
|
)?,
|
|
create_final_assistant_message_sse_response("done 1")?,
|
|
create_shell_command_sse_response(
|
|
vec![
|
|
"python3".to_string(),
|
|
"-c".to_string(),
|
|
"print(42)".to_string(),
|
|
],
|
|
None,
|
|
Some(5000),
|
|
"call2",
|
|
)?,
|
|
create_final_assistant_message_sse_response("done 2")?,
|
|
];
|
|
let server = create_mock_chat_completions_server(responses).await;
|
|
// Default approval is untrusted to force elicitation on first turn.
|
|
create_config_toml(codex_home.as_path(), &server.uri(), "untrusted")?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.as_path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
// thread/start
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("mock-model".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
// turn/start — expect CommandExecutionRequestApproval request from server
|
|
let first_turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::Text {
|
|
text: "run python".to_string(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
// Acknowledge RPC
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(first_turn_id)),
|
|
)
|
|
.await??;
|
|
|
|
// Receive elicitation
|
|
let server_req = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_request_message(),
|
|
)
|
|
.await??;
|
|
let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req else {
|
|
panic!("expected CommandExecutionRequestApproval request");
|
|
};
|
|
assert_eq!(params.item_id, "call1");
|
|
|
|
// Approve and wait for task completion
|
|
mcp.send_response(
|
|
request_id,
|
|
serde_json::json!({ "decision": codex_core::protocol::ReviewDecision::Approved }),
|
|
)
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
// Second turn with approval_policy=never should not elicit approval
|
|
let second_turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::Text {
|
|
text: "run python again".to_string(),
|
|
}],
|
|
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
|
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
|
|
model: Some("mock-model".to_string()),
|
|
effort: Some(ReasoningEffort::Medium),
|
|
summary: Some(ReasoningSummary::Auto),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(second_turn_id)),
|
|
)
|
|
.await??;
|
|
|
|
// Ensure we do NOT receive a CommandExecutionRequestApproval request before task completes
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn turn_start_exec_approval_decline_v2() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let tmp = TempDir::new()?;
|
|
let codex_home = tmp.path().to_path_buf();
|
|
let workspace = tmp.path().join("workspace");
|
|
std::fs::create_dir(&workspace)?;
|
|
|
|
let responses = vec![
|
|
create_shell_command_sse_response(
|
|
vec![
|
|
"python3".to_string(),
|
|
"-c".to_string(),
|
|
"print(42)".to_string(),
|
|
],
|
|
None,
|
|
Some(5000),
|
|
"call-decline",
|
|
)?,
|
|
create_final_assistant_message_sse_response("done")?,
|
|
];
|
|
let server = create_mock_chat_completions_server(responses).await;
|
|
create_config_toml(codex_home.as_path(), &server.uri(), "untrusted")?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.as_path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("mock-model".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::Text {
|
|
text: "run python".to_string(),
|
|
}],
|
|
cwd: Some(workspace.clone()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
|
|
|
let started_command_execution = timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let started_notif = mcp
|
|
.read_stream_until_notification_message("item/started")
|
|
.await?;
|
|
let started: ItemStartedNotification =
|
|
serde_json::from_value(started_notif.params.clone().expect("item/started params"))?;
|
|
if let ThreadItem::CommandExecution { .. } = started.item {
|
|
return Ok::<ThreadItem, anyhow::Error>(started.item);
|
|
}
|
|
}
|
|
})
|
|
.await??;
|
|
let ThreadItem::CommandExecution { id, status, .. } = started_command_execution else {
|
|
unreachable!("loop ensures we break on command execution items");
|
|
};
|
|
assert_eq!(id, "call-decline");
|
|
assert_eq!(status, CommandExecutionStatus::InProgress);
|
|
|
|
let server_req = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_request_message(),
|
|
)
|
|
.await??;
|
|
let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req else {
|
|
panic!("expected CommandExecutionRequestApproval request")
|
|
};
|
|
assert_eq!(params.item_id, "call-decline");
|
|
assert_eq!(params.thread_id, thread.id);
|
|
assert_eq!(params.turn_id, turn.id);
|
|
|
|
mcp.send_response(
|
|
request_id,
|
|
serde_json::to_value(CommandExecutionRequestApprovalResponse {
|
|
decision: ApprovalDecision::Decline,
|
|
})?,
|
|
)
|
|
.await?;
|
|
|
|
let completed_command_execution = timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let completed_notif = mcp
|
|
.read_stream_until_notification_message("item/completed")
|
|
.await?;
|
|
let completed: ItemCompletedNotification = serde_json::from_value(
|
|
completed_notif
|
|
.params
|
|
.clone()
|
|
.expect("item/completed params"),
|
|
)?;
|
|
if let ThreadItem::CommandExecution { .. } = completed.item {
|
|
return Ok::<ThreadItem, anyhow::Error>(completed.item);
|
|
}
|
|
}
|
|
})
|
|
.await??;
|
|
let ThreadItem::CommandExecution {
|
|
id,
|
|
status,
|
|
exit_code,
|
|
aggregated_output,
|
|
..
|
|
} = completed_command_execution
|
|
else {
|
|
unreachable!("loop ensures we break on command execution items");
|
|
};
|
|
assert_eq!(id, "call-decline");
|
|
assert_eq!(status, CommandExecutionStatus::Declined);
|
|
assert!(exit_code.is_none());
|
|
assert!(aggregated_output.is_none());
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
|
)
|
|
.await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let tmp = TempDir::new()?;
|
|
let codex_home = tmp.path().join("codex_home");
|
|
std::fs::create_dir(&codex_home)?;
|
|
let workspace_root = tmp.path().join("workspace");
|
|
std::fs::create_dir(&workspace_root)?;
|
|
let first_cwd = workspace_root.join("turn1");
|
|
let second_cwd = workspace_root.join("turn2");
|
|
std::fs::create_dir(&first_cwd)?;
|
|
std::fs::create_dir(&second_cwd)?;
|
|
|
|
let responses = vec![
|
|
create_shell_command_sse_response(
|
|
vec!["echo".to_string(), "first".to_string(), "turn".to_string()],
|
|
None,
|
|
Some(5000),
|
|
"call-first",
|
|
)?,
|
|
create_final_assistant_message_sse_response("done first")?,
|
|
create_shell_command_sse_response(
|
|
vec!["echo".to_string(), "second".to_string(), "turn".to_string()],
|
|
None,
|
|
Some(5000),
|
|
"call-second",
|
|
)?,
|
|
create_final_assistant_message_sse_response("done second")?,
|
|
];
|
|
let server = create_mock_chat_completions_server(responses).await;
|
|
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
|
|
|
|
let mut mcp = McpProcess::new(&codex_home).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
// thread/start
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("mock-model".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
// first turn with workspace-write sandbox and first_cwd
|
|
let first_turn = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::Text {
|
|
text: "first turn".to_string(),
|
|
}],
|
|
cwd: Some(first_cwd.clone()),
|
|
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
|
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::WorkspaceWrite {
|
|
writable_roots: vec![first_cwd.try_into()?],
|
|
network_access: false,
|
|
exclude_tmpdir_env_var: false,
|
|
exclude_slash_tmp: false,
|
|
}),
|
|
model: Some("mock-model".to_string()),
|
|
effort: Some(ReasoningEffort::Medium),
|
|
summary: Some(ReasoningSummary::Auto),
|
|
output_schema: None,
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(first_turn)),
|
|
)
|
|
.await??;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
|
)
|
|
.await??;
|
|
|
|
// second turn with workspace-write and second_cwd, ensure exec begins in second_cwd
|
|
let second_turn = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::Text {
|
|
text: "second turn".to_string(),
|
|
}],
|
|
cwd: Some(second_cwd.clone()),
|
|
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
|
|
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
|
|
model: Some("mock-model".to_string()),
|
|
effort: Some(ReasoningEffort::Medium),
|
|
summary: Some(ReasoningSummary::Auto),
|
|
output_schema: None,
|
|
})
|
|
.await?;
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(second_turn)),
|
|
)
|
|
.await??;
|
|
|
|
let command_exec_item = timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let item_started_notification = mcp
|
|
.read_stream_until_notification_message("item/started")
|
|
.await?;
|
|
let params = item_started_notification
|
|
.params
|
|
.clone()
|
|
.expect("item/started params");
|
|
let item_started: ItemStartedNotification =
|
|
serde_json::from_value(params).expect("deserialize item/started notification");
|
|
if matches!(item_started.item, ThreadItem::CommandExecution { .. }) {
|
|
return Ok::<ThreadItem, anyhow::Error>(item_started.item);
|
|
}
|
|
}
|
|
})
|
|
.await??;
|
|
let ThreadItem::CommandExecution {
|
|
cwd,
|
|
command,
|
|
status,
|
|
..
|
|
} = command_exec_item
|
|
else {
|
|
unreachable!("loop ensures we break on command execution items");
|
|
};
|
|
assert_eq!(cwd, second_cwd);
|
|
let expected_command = format_with_current_shell_display("echo second turn");
|
|
assert_eq!(command, expected_command);
|
|
assert_eq!(status, CommandExecutionStatus::InProgress);
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
|
)
|
|
.await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn turn_start_file_change_approval_v2() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let tmp = TempDir::new()?;
|
|
let codex_home = tmp.path().join("codex_home");
|
|
std::fs::create_dir(&codex_home)?;
|
|
let workspace = tmp.path().join("workspace");
|
|
std::fs::create_dir(&workspace)?;
|
|
|
|
let patch = r#"*** Begin Patch
|
|
*** Add File: README.md
|
|
+new line
|
|
*** End Patch
|
|
"#;
|
|
let responses = vec![
|
|
create_apply_patch_sse_response(patch, "patch-call")?,
|
|
create_final_assistant_message_sse_response("patch applied")?,
|
|
];
|
|
let server = create_mock_chat_completions_server(responses).await;
|
|
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
|
|
|
|
let mut mcp = McpProcess::new(&codex_home).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let start_req = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("mock-model".to_string()),
|
|
cwd: Some(workspace.to_string_lossy().into_owned()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_req)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let turn_req = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::Text {
|
|
text: "apply patch".into(),
|
|
}],
|
|
cwd: Some(workspace.clone()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
|
|
|
let started_file_change = timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let started_notif = mcp
|
|
.read_stream_until_notification_message("item/started")
|
|
.await?;
|
|
let started: ItemStartedNotification =
|
|
serde_json::from_value(started_notif.params.clone().expect("item/started params"))?;
|
|
if let ThreadItem::FileChange { .. } = started.item {
|
|
return Ok::<ThreadItem, anyhow::Error>(started.item);
|
|
}
|
|
}
|
|
})
|
|
.await??;
|
|
let ThreadItem::FileChange {
|
|
ref id,
|
|
status,
|
|
ref changes,
|
|
} = started_file_change
|
|
else {
|
|
unreachable!("loop ensures we break on file change items");
|
|
};
|
|
assert_eq!(id, "patch-call");
|
|
assert_eq!(status, PatchApplyStatus::InProgress);
|
|
let started_changes = changes.clone();
|
|
|
|
let server_req = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_request_message(),
|
|
)
|
|
.await??;
|
|
let ServerRequest::FileChangeRequestApproval { request_id, params } = server_req else {
|
|
panic!("expected FileChangeRequestApproval request")
|
|
};
|
|
assert_eq!(params.item_id, "patch-call");
|
|
assert_eq!(params.thread_id, thread.id);
|
|
assert_eq!(params.turn_id, turn.id);
|
|
let expected_readme_path = workspace.join("README.md");
|
|
let expected_readme_path = expected_readme_path.to_string_lossy().into_owned();
|
|
pretty_assertions::assert_eq!(
|
|
started_changes,
|
|
vec![codex_app_server_protocol::FileUpdateChange {
|
|
path: expected_readme_path.clone(),
|
|
kind: PatchChangeKind::Add,
|
|
diff: "new line\n".to_string(),
|
|
}]
|
|
);
|
|
|
|
mcp.send_response(
|
|
request_id,
|
|
serde_json::to_value(FileChangeRequestApprovalResponse {
|
|
decision: ApprovalDecision::Accept,
|
|
})?,
|
|
)
|
|
.await?;
|
|
|
|
let output_delta_notif = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("item/fileChange/outputDelta"),
|
|
)
|
|
.await??;
|
|
let output_delta: FileChangeOutputDeltaNotification = serde_json::from_value(
|
|
output_delta_notif
|
|
.params
|
|
.clone()
|
|
.expect("item/fileChange/outputDelta params"),
|
|
)?;
|
|
assert_eq!(output_delta.thread_id, thread.id);
|
|
assert_eq!(output_delta.turn_id, turn.id);
|
|
assert_eq!(output_delta.item_id, "patch-call");
|
|
assert!(
|
|
!output_delta.delta.is_empty(),
|
|
"expected delta to be non-empty, got: {}",
|
|
output_delta.delta
|
|
);
|
|
|
|
let completed_file_change = timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let completed_notif = mcp
|
|
.read_stream_until_notification_message("item/completed")
|
|
.await?;
|
|
let completed: ItemCompletedNotification = serde_json::from_value(
|
|
completed_notif
|
|
.params
|
|
.clone()
|
|
.expect("item/completed params"),
|
|
)?;
|
|
if let ThreadItem::FileChange { .. } = completed.item {
|
|
return Ok::<ThreadItem, anyhow::Error>(completed.item);
|
|
}
|
|
}
|
|
})
|
|
.await??;
|
|
let ThreadItem::FileChange { ref id, status, .. } = completed_file_change else {
|
|
unreachable!("loop ensures we break on file change items");
|
|
};
|
|
assert_eq!(id, "patch-call");
|
|
assert_eq!(status, PatchApplyStatus::Completed);
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
|
)
|
|
.await??;
|
|
|
|
let readme_contents = std::fs::read_to_string(expected_readme_path)?;
|
|
assert_eq!(readme_contents, "new line\n");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn turn_start_file_change_approval_decline_v2() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let tmp = TempDir::new()?;
|
|
let codex_home = tmp.path().join("codex_home");
|
|
std::fs::create_dir(&codex_home)?;
|
|
let workspace = tmp.path().join("workspace");
|
|
std::fs::create_dir(&workspace)?;
|
|
|
|
let patch = r#"*** Begin Patch
|
|
*** Add File: README.md
|
|
+new line
|
|
*** End Patch
|
|
"#;
|
|
let responses = vec![
|
|
create_apply_patch_sse_response(patch, "patch-call")?,
|
|
create_final_assistant_message_sse_response("patch declined")?,
|
|
];
|
|
let server = create_mock_chat_completions_server(responses).await;
|
|
create_config_toml(&codex_home, &server.uri(), "untrusted")?;
|
|
|
|
let mut mcp = McpProcess::new(&codex_home).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let start_req = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("mock-model".to_string()),
|
|
cwd: Some(workspace.to_string_lossy().into_owned()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_req)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let turn_req = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::Text {
|
|
text: "apply patch".into(),
|
|
}],
|
|
cwd: Some(workspace.clone()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
|
|
|
let started_file_change = timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let started_notif = mcp
|
|
.read_stream_until_notification_message("item/started")
|
|
.await?;
|
|
let started: ItemStartedNotification =
|
|
serde_json::from_value(started_notif.params.clone().expect("item/started params"))?;
|
|
if let ThreadItem::FileChange { .. } = started.item {
|
|
return Ok::<ThreadItem, anyhow::Error>(started.item);
|
|
}
|
|
}
|
|
})
|
|
.await??;
|
|
let ThreadItem::FileChange {
|
|
ref id,
|
|
status,
|
|
ref changes,
|
|
} = started_file_change
|
|
else {
|
|
unreachable!("loop ensures we break on file change items");
|
|
};
|
|
assert_eq!(id, "patch-call");
|
|
assert_eq!(status, PatchApplyStatus::InProgress);
|
|
let started_changes = changes.clone();
|
|
|
|
let server_req = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_request_message(),
|
|
)
|
|
.await??;
|
|
let ServerRequest::FileChangeRequestApproval { request_id, params } = server_req else {
|
|
panic!("expected FileChangeRequestApproval request")
|
|
};
|
|
assert_eq!(params.item_id, "patch-call");
|
|
assert_eq!(params.thread_id, thread.id);
|
|
assert_eq!(params.turn_id, turn.id);
|
|
let expected_readme_path = workspace.join("README.md");
|
|
let expected_readme_path_str = expected_readme_path.to_string_lossy().into_owned();
|
|
pretty_assertions::assert_eq!(
|
|
started_changes,
|
|
vec![codex_app_server_protocol::FileUpdateChange {
|
|
path: expected_readme_path_str.clone(),
|
|
kind: PatchChangeKind::Add,
|
|
diff: "new line\n".to_string(),
|
|
}]
|
|
);
|
|
|
|
mcp.send_response(
|
|
request_id,
|
|
serde_json::to_value(FileChangeRequestApprovalResponse {
|
|
decision: ApprovalDecision::Decline,
|
|
})?,
|
|
)
|
|
.await?;
|
|
|
|
let completed_file_change = timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let completed_notif = mcp
|
|
.read_stream_until_notification_message("item/completed")
|
|
.await?;
|
|
let completed: ItemCompletedNotification = serde_json::from_value(
|
|
completed_notif
|
|
.params
|
|
.clone()
|
|
.expect("item/completed params"),
|
|
)?;
|
|
if let ThreadItem::FileChange { .. } = completed.item {
|
|
return Ok::<ThreadItem, anyhow::Error>(completed.item);
|
|
}
|
|
}
|
|
})
|
|
.await??;
|
|
let ThreadItem::FileChange { ref id, status, .. } = completed_file_change else {
|
|
unreachable!("loop ensures we break on file change items");
|
|
};
|
|
assert_eq!(id, "patch-call");
|
|
assert_eq!(status, PatchApplyStatus::Declined);
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("codex/event/task_complete"),
|
|
)
|
|
.await??;
|
|
|
|
assert!(
|
|
!expected_readme_path.exists(),
|
|
"declined patch should not be applied"
|
|
);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
#[cfg_attr(windows, ignore = "process id reporting differs on Windows")]
|
|
async fn command_execution_notifications_include_process_id() -> Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let responses = vec![
|
|
create_exec_command_sse_response("uexec-1")?,
|
|
create_final_assistant_message_sse_response("done")?,
|
|
];
|
|
let server = create_mock_chat_completions_server(responses).await;
|
|
let codex_home = TempDir::new()?;
|
|
create_config_toml(codex_home.path(), &server.uri(), "never")?;
|
|
let config_toml = codex_home.path().join("config.toml");
|
|
let mut config_contents = std::fs::read_to_string(&config_toml)?;
|
|
config_contents.push_str(
|
|
r#"
|
|
[features]
|
|
unified_exec = true
|
|
"#,
|
|
);
|
|
std::fs::write(&config_toml, config_contents)?;
|
|
|
|
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
|
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
|
|
|
let start_id = mcp
|
|
.send_thread_start_request(ThreadStartParams {
|
|
model: Some("mock-model".to_string()),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let start_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
|
|
)
|
|
.await??;
|
|
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
|
|
|
let turn_id = mcp
|
|
.send_turn_start_request(TurnStartParams {
|
|
thread_id: thread.id.clone(),
|
|
input: vec![V2UserInput::Text {
|
|
text: "run a command".to_string(),
|
|
}],
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let turn_resp: JSONRPCResponse = timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
|
)
|
|
.await??;
|
|
let TurnStartResponse { turn: _turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
|
|
|
let started_command = timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let notif = mcp
|
|
.read_stream_until_notification_message("item/started")
|
|
.await?;
|
|
let started: ItemStartedNotification = serde_json::from_value(
|
|
notif
|
|
.params
|
|
.clone()
|
|
.expect("item/started should include params"),
|
|
)?;
|
|
if let ThreadItem::CommandExecution { .. } = started.item {
|
|
return Ok::<ThreadItem, anyhow::Error>(started.item);
|
|
}
|
|
}
|
|
})
|
|
.await??;
|
|
let ThreadItem::CommandExecution {
|
|
id,
|
|
process_id: started_process_id,
|
|
status,
|
|
..
|
|
} = started_command
|
|
else {
|
|
unreachable!("loop ensures we break on command execution items");
|
|
};
|
|
assert_eq!(id, "uexec-1");
|
|
assert_eq!(status, CommandExecutionStatus::InProgress);
|
|
let started_process_id = started_process_id.expect("process id should be present");
|
|
|
|
let completed_command = timeout(DEFAULT_READ_TIMEOUT, async {
|
|
loop {
|
|
let notif = mcp
|
|
.read_stream_until_notification_message("item/completed")
|
|
.await?;
|
|
let completed: ItemCompletedNotification = serde_json::from_value(
|
|
notif
|
|
.params
|
|
.clone()
|
|
.expect("item/completed should include params"),
|
|
)?;
|
|
if let ThreadItem::CommandExecution { .. } = completed.item {
|
|
return Ok::<ThreadItem, anyhow::Error>(completed.item);
|
|
}
|
|
}
|
|
})
|
|
.await??;
|
|
let ThreadItem::CommandExecution {
|
|
id: completed_id,
|
|
process_id: completed_process_id,
|
|
status: completed_status,
|
|
exit_code,
|
|
..
|
|
} = completed_command
|
|
else {
|
|
unreachable!("loop ensures we break on command execution items");
|
|
};
|
|
assert_eq!(completed_id, "uexec-1");
|
|
assert_eq!(completed_status, CommandExecutionStatus::Completed);
|
|
assert_eq!(exit_code, Some(0));
|
|
assert_eq!(
|
|
completed_process_id.as_deref(),
|
|
Some(started_process_id.as_str())
|
|
);
|
|
|
|
timeout(
|
|
DEFAULT_READ_TIMEOUT,
|
|
mcp.read_stream_until_notification_message("turn/completed"),
|
|
)
|
|
.await??;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Helper to create a `config.toml` pointing at the mock model server.
///
/// The file is written to `codex_home/config.toml`. It selects `mock-model`, sets
/// `approval_policy` to the given value, uses `sandbox_mode = "read-only"`, and
/// registers a `mock_provider` whose `base_url` is `{server_uri}/v1` with
/// `wire_api = "chat"` and both retry counts set to 0.
///
/// # Errors
///
/// Returns the underlying I/O error if the file cannot be written (for example,
/// when `codex_home` does not exist).
fn create_config_toml(
    codex_home: &Path,
    server_uri: &str,
    approval_policy: &str,
) -> std::io::Result<()> {
    let config_toml = codex_home.join("config.toml");
    std::fs::write(
        config_toml,
        format!(
            r#"
model = "mock-model"
approval_policy = "{approval_policy}"
sandbox_mode = "read-only"

model_provider = "mock_provider"

[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
        ),
    )
}
|