Compare commits

..

1 Commits

Author SHA1 Message Date
Michael Bolin
0d2ceb199f Release 0.34.0 2025-09-10 14:15:41 -07:00
40 changed files with 549 additions and 1814 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -769,7 +769,6 @@ version = "0.0.0"
dependencies = [
"anyhow",
"assert_cmd",
"base64",
"codex-arg0",
"codex-common",
"codex-core",

View File

@@ -22,7 +22,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.0.0"
version = "0.34.0"
# Track the edition for all workspace crates in one place. Individual
# crates can still override this value, but keeping it here means new
# crates created with `cargo new -w ...` automatically inherit the 2024

View File

@@ -54,7 +54,6 @@ tracing = { version = "0.1.41", features = ["log"] }
tree-sitter = "0.25.9"
tree-sitter-bash = "0.25.0"
uuid = { version = "1", features = ["serde", "v4"] }
which = "6"
wildmatch = "2.4.0"
@@ -70,6 +69,9 @@ openssl-sys = { version = "*", features = ["vendored"] }
[target.aarch64-unknown-linux-musl.dependencies]
openssl-sys = { version = "*", features = ["vendored"] }
[target.'cfg(target_os = "windows")'.dependencies]
which = "6"
[dev-dependencies]
assert_cmd = "2"
core_test_support = { path = "tests/common" }

View File

@@ -19,14 +19,13 @@ use codex_apply_patch::ApplyPatchAction;
use codex_apply_patch::MaybeApplyPatchVerified;
use codex_apply_patch::maybe_parse_apply_patch_verified;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ConversationHistoryResponseEvent;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::TaskStartedEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use futures::prelude::*;
use mcp_types::CallToolResult;
use serde::Deserialize;
use serde::Serialize;
use serde_json;
use tokio::sync::oneshot;
@@ -113,7 +112,6 @@ use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
use crate::shell;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::unified_exec::UnifiedExecSessionManager;
use crate::user_instructions::UserInstructions;
use crate::user_notification::UserNotification;
use crate::util::backoff;
@@ -282,7 +280,6 @@ pub(crate) struct Session {
/// Manager for external MCP servers/tools.
mcp_connection_manager: McpConnectionManager,
session_manager: ExecSessionManager,
unified_exec_manager: UnifiedExecSessionManager,
/// External notifier command (will be passed as args to exec()). When
/// `None` this feature is disabled.
@@ -468,12 +465,12 @@ impl Session {
tools_config: ToolsConfig::new(&ToolsConfigParams {
model_family: &config.model_family,
approval_policy,
sandbox_policy: sandbox_policy.clone(),
include_plan_tool: config.include_plan_tool,
include_apply_patch_tool: config.include_apply_patch_tool,
include_web_search_request: config.tools_web_search_request,
use_streamable_shell_tool: config.use_experimental_streamable_shell_tool,
include_view_image_tool: config.include_view_image_tool,
experimental_unified_exec_tool: config.use_experimental_unified_exec_tool,
}),
user_instructions,
base_instructions,
@@ -487,7 +484,6 @@ impl Session {
tx_event: tx_event.clone(),
mcp_connection_manager,
session_manager: ExecSessionManager::default(),
unified_exec_manager: UnifiedExecSessionManager::default(),
notify,
state: Mutex::new(state),
rollout: Mutex::new(Some(rollout_recorder)),
@@ -1147,12 +1143,12 @@ async fn submission_loop(
let tools_config = ToolsConfig::new(&ToolsConfigParams {
model_family: &effective_family,
approval_policy: new_approval_policy,
sandbox_policy: new_sandbox_policy.clone(),
include_plan_tool: config.include_plan_tool,
include_apply_patch_tool: config.include_apply_patch_tool,
include_web_search_request: config.tools_web_search_request,
use_streamable_shell_tool: config.use_experimental_streamable_shell_tool,
include_view_image_tool: config.include_view_image_tool,
experimental_unified_exec_tool: config.use_experimental_unified_exec_tool,
});
let new_turn_context = TurnContext {
@@ -1184,18 +1180,26 @@ async fn submission_loop(
{
warn!("failed to persist overrides: {e:#}");
}
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
cwd,
approval_policy,
sandbox_policy,
// Shell is not configurable from turn to turn
None,
))])
.await;
}
}
Op::UserInput { items } => {
submit_user_input(
turn_context.cwd.clone(),
turn_context.approval_policy,
turn_context.sandbox_policy.clone(),
&sess,
&turn_context,
sub.id.clone(),
items,
)
.await;
// attempt to inject input into current task
if let Err(items) = sess.inject_input(items) {
// no current task, spawn a new one
let task =
AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items);
sess.set_task(task);
}
}
Op::UserTurn {
items,
@@ -1240,14 +1244,13 @@ async fn submission_loop(
tools_config: ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
approval_policy,
sandbox_policy: sandbox_policy.clone(),
include_plan_tool: config.include_plan_tool,
include_apply_patch_tool: config.include_apply_patch_tool,
include_web_search_request: config.tools_web_search_request,
use_streamable_shell_tool: config
.use_experimental_streamable_shell_tool,
include_view_image_tool: config.include_view_image_tool,
experimental_unified_exec_tool: config
.use_experimental_unified_exec_tool,
}),
user_instructions: turn_context.user_instructions.clone(),
base_instructions: turn_context.base_instructions.clone(),
@@ -1256,16 +1259,11 @@ async fn submission_loop(
shell_environment_policy: turn_context.shell_environment_policy.clone(),
cwd,
};
submit_user_input(
fresh_turn_context.cwd.clone(),
fresh_turn_context.approval_policy,
fresh_turn_context.sandbox_policy.clone(),
&sess,
&Arc::new(fresh_turn_context),
sub.id.clone(),
items,
)
.await;
// TODO: record the new environment context in the conversation history
// no current task, spawn a new one with the perturn context
let task =
AgentTask::spawn(sess.clone(), Arc::new(fresh_turn_context), sub.id, items);
sess.set_task(task);
}
}
Op::ExecApproval { id, decision } => match decision {
@@ -1399,29 +1397,14 @@ async fn submission_loop(
sess.send_event(event).await;
break;
}
Op::GetPath => {
Op::GetHistory => {
let sub_id = sub.id.clone();
// Flush rollout writes before returning the path so readers observe a consistent file.
let (path, rec_opt) = {
let guard = sess.rollout.lock_unchecked();
match guard.as_ref() {
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
None => {
error!("rollout recorder not found");
continue;
}
}
};
if let Some(rec) = rec_opt
&& let Err(e) = rec.flush().await
{
warn!("failed to flush rollout recorder before GetHistory: {e}");
}
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationPath(ConversationPathResponseEvent {
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
conversation_id: sess.conversation_id,
path,
entries: sess.state.lock_unchecked().history.contents(),
}),
};
sess.send_event(event).await;
@@ -2099,72 +2082,6 @@ async fn handle_response_item(
Ok(output)
}
async fn handle_unified_exec_tool_call(
sess: &Session,
call_id: String,
session_id: Option<String>,
arguments: Vec<String>,
timeout_ms: Option<u64>,
) -> ResponseInputItem {
let parsed_session_id = if let Some(session_id) = session_id {
match session_id.parse::<i32>() {
Ok(parsed) => Some(parsed),
Err(output) => {
return ResponseInputItem::FunctionCallOutput {
call_id: call_id.to_string(),
output: FunctionCallOutputPayload {
content: format!("invalid session_id: {session_id} due to error {output}"),
success: Some(false),
},
};
}
}
} else {
None
};
let request = crate::unified_exec::UnifiedExecRequest {
session_id: parsed_session_id,
input_chunks: &arguments,
timeout_ms,
};
let result = sess.unified_exec_manager.handle_request(request).await;
let output_payload = match result {
Ok(value) => {
#[derive(Serialize)]
struct SerializedUnifiedExecResult<'a> {
session_id: Option<String>,
output: &'a str,
}
match serde_json::to_string(&SerializedUnifiedExecResult {
session_id: value.session_id.map(|id| id.to_string()),
output: &value.output,
}) {
Ok(serialized) => FunctionCallOutputPayload {
content: serialized,
success: Some(true),
},
Err(err) => FunctionCallOutputPayload {
content: format!("failed to serialize unified exec output: {err}"),
success: Some(false),
},
}
}
Err(err) => FunctionCallOutputPayload {
content: format!("unified exec failed: {err}"),
success: Some(false),
},
};
ResponseInputItem::FunctionCallOutput {
call_id,
output: output_payload,
}
}
async fn handle_function_call(
sess: &Session,
turn_context: &TurnContext,
@@ -2192,38 +2109,6 @@ async fn handle_function_call(
)
.await
}
"unified_exec" => {
#[derive(Deserialize)]
struct UnifiedExecArgs {
input: Vec<String>,
#[serde(default)]
session_id: Option<String>,
#[serde(default)]
timeout_ms: Option<u64>,
}
let args = match serde_json::from_str::<UnifiedExecArgs>(&arguments) {
Ok(args) => args,
Err(err) => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: FunctionCallOutputPayload {
content: format!("failed to parse function arguments: {err}"),
success: Some(false),
},
};
}
};
handle_unified_exec_tool_call(
sess,
call_id,
args.session_id,
args.input,
args.timeout_ms,
)
.await
}
"view_image" => {
#[derive(serde::Deserialize)]
struct SeeImageArgs {
@@ -2827,30 +2712,6 @@ async fn handle_sandbox_error(
}
}
async fn submit_user_input(
cwd: PathBuf,
approval_policy: AskForApproval,
sandbox_policy: SandboxPolicy,
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
sub_id: String,
items: Vec<InputItem>,
) {
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
Some(cwd),
Some(approval_policy),
Some(sandbox_policy),
// Shell is not configurable from turn to turn
None,
))])
.await;
if let Err(items) = sess.inject_input(items) {
// no current task, spawn a new one
let task = AgentTask::spawn(Arc::clone(sess), Arc::clone(turn_context), sub_id, items);
sess.set_task(task);
}
}
fn format_exec_output_str(exec_output: &ExecToolCallOutput) -> String {
let ExecToolCallOutput {
aggregated_output, ..

View File

@@ -172,9 +172,6 @@ pub struct Config {
pub use_experimental_streamable_shell_tool: bool,
/// If set to `true`, used only the experimental unified exec tool.
pub use_experimental_unified_exec_tool: bool,
/// Include the `view_image` tool that lets the agent attach a local image path to context.
pub include_view_image_tool: bool,
@@ -264,7 +261,17 @@ pub fn load_config_as_toml(codex_home: &Path) -> std::io::Result<TomlValue> {
}
}
fn set_project_trusted_inner(doc: &mut DocumentMut, project_path: &Path) -> anyhow::Result<()> {
/// Patch `CODEX_HOME/config.toml` project state.
/// Use with caution.
pub fn set_project_trusted(codex_home: &Path, project_path: &Path) -> anyhow::Result<()> {
let config_path = codex_home.join(CONFIG_TOML_FILE);
// Parse existing config if present; otherwise start a new document.
let mut doc = match std::fs::read_to_string(config_path.clone()) {
Ok(s) => s.parse::<DocumentMut>()?,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => DocumentMut::new(),
Err(e) => return Err(e.into()),
};
// Ensure we render a human-friendly structure:
//
// [projects]
@@ -280,26 +287,14 @@ fn set_project_trusted_inner(doc: &mut DocumentMut, project_path: &Path) -> anyh
// Ensure top-level `projects` exists as a non-inline, explicit table. If it
// exists but was previously represented as a non-table (e.g., inline),
// replace it with an explicit table.
let mut created_projects_table = false;
{
let root = doc.as_table_mut();
// If `projects` exists but isn't a standard table (e.g., it's an inline table),
// convert it to an explicit table while preserving existing entries.
let existing_projects = root.get("projects").cloned();
if existing_projects.as_ref().is_none_or(|i| !i.is_table()) {
let mut projects_tbl = toml_edit::Table::new();
projects_tbl.set_implicit(true);
// If there was an existing inline table, migrate its entries to explicit tables.
if let Some(inline_tbl) = existing_projects.as_ref().and_then(|i| i.as_inline_table()) {
for (k, v) in inline_tbl.iter() {
if let Some(inner_tbl) = v.as_inline_table() {
let new_tbl = inner_tbl.clone().into_table();
projects_tbl.insert(k, toml_edit::Item::Table(new_tbl));
}
}
}
root.insert("projects", toml_edit::Item::Table(projects_tbl));
let needs_table = !root.contains_key("projects")
|| root.get("projects").and_then(|i| i.as_table()).is_none();
if needs_table {
root.insert("projects", toml_edit::table());
created_projects_table = true;
}
}
let Some(projects_tbl) = doc["projects"].as_table_mut() else {
@@ -308,6 +303,12 @@ fn set_project_trusted_inner(doc: &mut DocumentMut, project_path: &Path) -> anyh
));
};
// If we created the `projects` table ourselves, keep it implicit so we
// don't render a standalone `[projects]` header.
if created_projects_table {
projects_tbl.set_implicit(true);
}
// Ensure the per-project entry is its own explicit table. If it exists but
// is not a table (e.g., an inline table), replace it with an explicit table.
let needs_proj_table = !projects_tbl.contains_key(project_key.as_str())
@@ -326,21 +327,6 @@ fn set_project_trusted_inner(doc: &mut DocumentMut, project_path: &Path) -> anyh
};
proj_tbl.set_implicit(false);
proj_tbl["trust_level"] = toml_edit::value("trusted");
Ok(())
}
/// Patch `CODEX_HOME/config.toml` project state.
/// Use with caution.
pub fn set_project_trusted(codex_home: &Path, project_path: &Path) -> anyhow::Result<()> {
let config_path = codex_home.join(CONFIG_TOML_FILE);
// Parse existing config if present; otherwise start a new document.
let mut doc = match std::fs::read_to_string(config_path.clone()) {
Ok(s) => s.parse::<DocumentMut>()?,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => DocumentMut::new(),
Err(e) => return Err(e.into()),
};
set_project_trusted_inner(&mut doc, project_path)?;
// ensure codex_home exists
std::fs::create_dir_all(codex_home)?;
@@ -490,7 +476,6 @@ pub struct ConfigToml {
pub experimental_instructions_file: Option<PathBuf>,
pub experimental_use_exec_command_tool: Option<bool>,
pub experimental_use_unified_exec_tool: Option<bool>,
pub projects: Option<HashMap<String, ProjectConfig>>,
@@ -841,9 +826,6 @@ impl Config {
use_experimental_streamable_shell_tool: cfg
.experimental_use_exec_command_tool
.unwrap_or(false),
use_experimental_unified_exec_tool: cfg
.experimental_use_unified_exec_tool
.unwrap_or(true),
include_view_image_tool,
active_profile: active_profile_name,
disable_paste_burst: cfg.disable_paste_burst.unwrap_or(false),
@@ -1219,7 +1201,6 @@ model_verbosity = "high"
tools_web_search_request: false,
preferred_auth_method: AuthMode::ChatGPT,
use_experimental_streamable_shell_tool: false,
use_experimental_unified_exec_tool: true,
include_view_image_tool: true,
active_profile: Some("o3".to_string()),
disable_paste_burst: false,
@@ -1277,7 +1258,6 @@ model_verbosity = "high"
tools_web_search_request: false,
preferred_auth_method: AuthMode::ChatGPT,
use_experimental_streamable_shell_tool: false,
use_experimental_unified_exec_tool: true,
include_view_image_tool: true,
active_profile: Some("gpt3".to_string()),
disable_paste_burst: false,
@@ -1350,7 +1330,6 @@ model_verbosity = "high"
tools_web_search_request: false,
preferred_auth_method: AuthMode::ChatGPT,
use_experimental_streamable_shell_tool: false,
use_experimental_unified_exec_tool: true,
include_view_image_tool: true,
active_profile: Some("zdr".to_string()),
disable_paste_burst: false,
@@ -1409,7 +1388,6 @@ model_verbosity = "high"
tools_web_search_request: false,
preferred_auth_method: AuthMode::ChatGPT,
use_experimental_streamable_shell_tool: false,
use_experimental_unified_exec_tool: true,
include_view_image_tool: true,
active_profile: Some("gpt5".to_string()),
disable_paste_burst: false,
@@ -1422,14 +1400,17 @@ model_verbosity = "high"
#[test]
fn test_set_project_trusted_writes_explicit_tables() -> anyhow::Result<()> {
let project_dir = Path::new("/some/path");
let mut doc = DocumentMut::new();
let codex_home = TempDir::new().unwrap();
let project_dir = TempDir::new().unwrap();
set_project_trusted_inner(&mut doc, project_dir)?;
// Call the function under test
set_project_trusted(codex_home.path(), project_dir.path())?;
let contents = doc.to_string();
// Read back the generated config.toml and assert exact contents
let config_path = codex_home.path().join(CONFIG_TOML_FILE);
let contents = std::fs::read_to_string(&config_path)?;
let raw_path = project_dir.to_string_lossy();
let raw_path = project_dir.path().to_string_lossy();
let path_str = if raw_path.contains('\\') {
format!("'{raw_path}'")
} else {
@@ -1447,10 +1428,12 @@ trust_level = "trusted"
#[test]
fn test_set_project_trusted_converts_inline_to_explicit() -> anyhow::Result<()> {
let project_dir = Path::new("/some/path");
let codex_home = TempDir::new().unwrap();
let project_dir = TempDir::new().unwrap();
// Seed config.toml with an inline project entry under [projects]
let raw_path = project_dir.to_string_lossy();
let config_path = codex_home.path().join(CONFIG_TOML_FILE);
let raw_path = project_dir.path().to_string_lossy();
let path_str = if raw_path.contains('\\') {
format!("'{raw_path}'")
} else {
@@ -1462,12 +1445,13 @@ trust_level = "trusted"
{path_str} = {{ trust_level = "untrusted" }}
"#
);
let mut doc = initial.parse::<DocumentMut>()?;
std::fs::create_dir_all(codex_home.path())?;
std::fs::write(&config_path, initial)?;
// Run the function; it should convert to explicit tables and set trusted
set_project_trusted_inner(&mut doc, project_dir)?;
set_project_trusted(codex_home.path(), project_dir.path())?;
let contents = doc.to_string();
let contents = std::fs::read_to_string(&config_path)?;
// Assert exact output after conversion to explicit table
let expected = format!(
@@ -1481,38 +1465,4 @@ trust_level = "trusted"
Ok(())
}
#[test]
fn test_set_project_trusted_migrates_top_level_inline_projects_preserving_entries()
-> anyhow::Result<()> {
let initial = r#"toplevel = "baz"
projects = { "/Users/mbolin/code/codex4" = { trust_level = "trusted", foo = "bar" } , "/Users/mbolin/code/codex3" = { trust_level = "trusted" } }
model = "foo""#;
let mut doc = initial.parse::<DocumentMut>()?;
// Approve a new directory
let new_project = Path::new("/Users/mbolin/code/codex2");
set_project_trusted_inner(&mut doc, new_project)?;
let contents = doc.to_string();
// Since we created the [projects] table as part of migration, it is kept implicit.
// Expect explicit per-project tables, preserving prior entries and appending the new one.
let expected = r#"toplevel = "baz"
model = "foo"
[projects."/Users/mbolin/code/codex4"]
trust_level = "trusted"
foo = "bar"
[projects."/Users/mbolin/code/codex3"]
trust_level = "trusted"
[projects."/Users/mbolin/code/codex2"]
trust_level = "trusted"
"#;
assert_eq!(contents, expected);
Ok(())
}
}

View File

@@ -150,13 +150,13 @@ impl ConversationManager {
/// caller's `config`). The new conversation will have a fresh id.
pub async fn fork_conversation(
&self,
conversation_history: Vec<ResponseItem>,
num_messages_to_drop: usize,
config: Config,
path: PathBuf,
) -> CodexResult<NewConversation> {
// Compute the prefix up to the cut point.
let history = RolloutRecorder::get_rollout_history(&path).await?;
let history = truncate_after_dropping_last_messages(history, num_messages_to_drop);
let history =
truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop);
// Spawn a new conversation with the computed initial history.
let auth_manager = self.auth_manager.clone();
@@ -171,36 +171,36 @@ impl ConversationManager {
/// Return a prefix of `items` obtained by dropping the last `n` user messages
/// and all items that follow them.
fn truncate_after_dropping_last_messages(history: InitialHistory, n: usize) -> InitialHistory {
fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) -> InitialHistory {
if n == 0 {
return InitialHistory::Forked(history.get_rollout_items());
let rolled: Vec<RolloutItem> = items.into_iter().map(RolloutItem::ResponseItem).collect();
return InitialHistory::Forked(rolled);
}
// Work directly on rollout items, and cut the vector at the nth-from-last user message input.
let items: Vec<RolloutItem> = history.get_rollout_items();
// Find indices of user message inputs in rollout order.
let mut user_positions: Vec<usize> = Vec::new();
for (idx, item) in items.iter().enumerate() {
if let RolloutItem::ResponseItem(ResponseItem::Message { role, .. }) = item
// Walk backwards counting only `user` Message items, find cut index.
let mut count = 0usize;
let mut cut_index = 0usize;
for (idx, item) in items.iter().enumerate().rev() {
if let ResponseItem::Message { role, .. } = item
&& role == "user"
{
user_positions.push(idx);
count += 1;
if count == n {
// Cut everything from this user message to the end.
cut_index = idx;
break;
}
}
}
// If fewer than n user messages exist, treat as empty.
if user_positions.len() < n {
return InitialHistory::New;
}
// Cut strictly before the nth-from-last user message (do not keep the nth itself).
let cut_idx = user_positions[user_positions.len() - n];
let rolled: Vec<RolloutItem> = items.into_iter().take(cut_idx).collect();
if rolled.is_empty() {
if cut_index == 0 {
// No prefix remains after dropping; start a new conversation.
InitialHistory::New
} else {
let rolled: Vec<RolloutItem> = items
.into_iter()
.take(cut_index)
.map(RolloutItem::ResponseItem)
.collect();
InitialHistory::Forked(rolled)
}
}
@@ -256,13 +256,7 @@ mod tests {
assistant_msg("a4"),
];
// Wrap as InitialHistory::Forked with response items only.
let initial: Vec<RolloutItem> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_after_dropping_last_messages(InitialHistory::Forked(initial), 1);
let truncated = truncate_after_dropping_last_messages(items.clone(), 1);
let got_items = truncated.get_rollout_items();
let expected_items = vec![
RolloutItem::ResponseItem(items[0].clone()),
@@ -274,12 +268,7 @@ mod tests {
serde_json::to_value(&expected_items).unwrap()
);
let initial2: Vec<RolloutItem> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated2 = truncate_after_dropping_last_messages(InitialHistory::Forked(initial2), 2);
let truncated2 = truncate_after_dropping_last_messages(items, 2);
assert!(matches!(truncated2, InitialHistory::New));
}
}

View File

@@ -26,7 +26,6 @@ pub(crate) struct EnvironmentContext {
pub approval_policy: Option<AskForApproval>,
pub sandbox_mode: Option<SandboxMode>,
pub network_access: Option<NetworkAccess>,
pub writable_roots: Option<Vec<PathBuf>>,
pub shell: Option<Shell>,
}
@@ -58,16 +57,6 @@ impl EnvironmentContext {
}
None => None,
},
writable_roots: match sandbox_policy {
Some(SandboxPolicy::WorkspaceWrite { writable_roots, .. }) => {
if writable_roots.is_empty() {
None
} else {
Some(writable_roots.clone())
}
}
_ => None,
},
shell,
}
}
@@ -83,7 +72,6 @@ impl EnvironmentContext {
/// <cwd>...</cwd>
/// <approval_policy>...</approval_policy>
/// <sandbox_mode>...</sandbox_mode>
/// <writable_roots>...</writable_roots>
/// <network_access>...</network_access>
/// <shell>...</shell>
/// </environment_context>
@@ -106,16 +94,6 @@ impl EnvironmentContext {
" <network_access>{network_access}</network_access>"
));
}
if let Some(writable_roots) = self.writable_roots {
lines.push(" <writable_roots>".to_string());
for writable_root in writable_roots {
lines.push(format!(
" <root>{}</root>",
writable_root.to_string_lossy()
));
}
lines.push(" </writable_roots>".to_string());
}
if let Some(shell) = self.shell
&& let Some(shell_name) = shell.name()
{
@@ -137,77 +115,3 @@ impl From<EnvironmentContext> for ResponseItem {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
fn workspace_write_policy(writable_roots: Vec<&str>, network_access: bool) -> SandboxPolicy {
SandboxPolicy::WorkspaceWrite {
writable_roots: writable_roots.into_iter().map(PathBuf::from).collect(),
network_access,
exclude_tmpdir_env_var: false,
exclude_slash_tmp: false,
}
}
#[test]
fn serialize_workspace_write_environment_context() {
let context = EnvironmentContext::new(
Some(PathBuf::from("/repo")),
Some(AskForApproval::OnRequest),
Some(workspace_write_policy(vec!["/repo", "/tmp"], false)),
None,
);
let expected = r#"<environment_context>
<cwd>/repo</cwd>
<approval_policy>on-request</approval_policy>
<sandbox_mode>workspace-write</sandbox_mode>
<network_access>restricted</network_access>
<writable_roots>
<root>/repo</root>
<root>/tmp</root>
</writable_roots>
</environment_context>"#;
assert_eq!(context.serialize_to_xml(), expected);
}
#[test]
fn serialize_read_only_environment_context() {
let context = EnvironmentContext::new(
None,
Some(AskForApproval::Never),
Some(SandboxPolicy::ReadOnly),
None,
);
let expected = r#"<environment_context>
<approval_policy>never</approval_policy>
<sandbox_mode>read-only</sandbox_mode>
<network_access>restricted</network_access>
</environment_context>"#;
assert_eq!(context.serialize_to_xml(), expected);
}
#[test]
fn serialize_full_access_environment_context() {
let context = EnvironmentContext::new(
None,
Some(AskForApproval::OnFailure),
Some(SandboxPolicy::DangerFullAccess),
None,
);
let expected = r#"<environment_context>
<approval_policy>on-failure</approval_policy>
<sandbox_mode>danger-full-access</sandbox_mode>
<network_access>enabled</network_access>
</environment_context>"#;
assert_eq!(context.serialize_to_xml(), expected);
}
}

View File

@@ -24,9 +24,6 @@ pub(crate) struct ExecCommandSession {
/// JoinHandle for the child wait task.
wait_handle: StdMutex<Option<JoinHandle<()>>>,
/// Tracks whether the underlying process has exited.
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl ExecCommandSession {
@@ -37,7 +34,6 @@ impl ExecCommandSession {
reader_handle: JoinHandle<()>,
writer_handle: JoinHandle<()>,
wait_handle: JoinHandle<()>,
exit_status: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Self {
Self {
writer_tx,
@@ -46,7 +42,6 @@ impl ExecCommandSession {
reader_handle: StdMutex::new(Some(reader_handle)),
writer_handle: StdMutex::new(Some(writer_handle)),
wait_handle: StdMutex::new(Some(wait_handle)),
exit_status,
}
}
@@ -57,10 +52,6 @@ impl ExecCommandSession {
pub(crate) fn output_receiver(&self) -> broadcast::Receiver<Vec<u8>> {
self.output_tx.subscribe()
}
pub(crate) fn has_exited(&self) -> bool {
self.exit_status.load(std::sync::atomic::Ordering::SeqCst)
}
}
impl Drop for ExecCommandSession {

View File

@@ -6,7 +6,6 @@ mod session_manager;
pub use exec_command_params::ExecCommandParams;
pub use exec_command_params::WriteStdinParams;
pub(crate) use exec_command_session::ExecCommandSession;
pub use responses_api::EXEC_COMMAND_TOOL_NAME;
pub use responses_api::WRITE_STDIN_TOOL_NAME;
pub use responses_api::create_exec_command_tool_for_responses_api;

View File

@@ -3,7 +3,6 @@ use std::io::ErrorKind;
use std::io::Read;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU32;
use portable_pty::CommandBuilder;
@@ -20,7 +19,6 @@ use crate::exec_command::exec_command_params::ExecCommandParams;
use crate::exec_command::exec_command_params::WriteStdinParams;
use crate::exec_command::exec_command_session::ExecCommandSession;
use crate::exec_command::session_id::SessionId;
use crate::truncate::truncate_middle;
use codex_protocol::models::FunctionCallOutputPayload;
#[derive(Debug, Default)]
@@ -329,14 +327,11 @@ async fn create_exec_command_session(
// Keep the child alive until it exits, then signal exit code.
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = exit_status.clone();
let wait_handle = tokio::task::spawn_blocking(move || {
let code = match child.wait() {
Ok(status) => status.exit_code() as i32,
Err(_) => -1,
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
let _ = exit_tx.send(code);
});
@@ -348,11 +343,116 @@ async fn create_exec_command_session(
reader_handle,
writer_handle,
wait_handle,
exit_status,
);
Ok((session, exit_rx))
}
/// Truncate the middle of a UTF-8 string to at most `max_bytes` bytes,
/// preserving the beginning and the end. Returns the possibly truncated
/// string and `Some(original_token_count)` (estimated at 4 bytes/token)
/// if truncation occurred; otherwise returns the original string and `None`.
fn truncate_middle(s: &str, max_bytes: usize) -> (String, Option<u64>) {
// No truncation needed
if s.len() <= max_bytes {
return (s.to_string(), None);
}
let est_tokens = (s.len() as u64).div_ceil(4);
if max_bytes == 0 {
// Cannot keep any content; still return a full marker (never truncated).
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
// Helper to truncate a string to a given byte length on a char boundary.
fn truncate_on_boundary(input: &str, max_len: usize) -> &str {
if input.len() <= max_len {
return input;
}
let mut end = max_len;
while end > 0 && !input.is_char_boundary(end) {
end -= 1;
}
&input[..end]
}
// Given a left/right budget, prefer newline boundaries; otherwise fall back
// to UTF-8 char boundaries.
fn pick_prefix_end(s: &str, left_budget: usize) -> usize {
if let Some(head) = s.get(..left_budget)
&& let Some(i) = head.rfind('\n')
{
return i + 1; // keep the newline so suffix starts on a fresh line
}
truncate_on_boundary(s, left_budget).len()
}
fn pick_suffix_start(s: &str, right_budget: usize) -> usize {
let start_tail = s.len().saturating_sub(right_budget);
if let Some(tail) = s.get(start_tail..)
&& let Some(i) = tail.find('\n')
{
return start_tail + i + 1; // start after newline
}
// Fall back to a char boundary at or after start_tail.
let mut idx = start_tail.min(s.len());
while idx < s.len() && !s.is_char_boundary(idx) {
idx += 1;
}
idx
}
// Refine marker length and budgets until stable. Marker is never truncated.
let mut guess_tokens = est_tokens; // worst-case: everything truncated
for _ in 0..4 {
let marker = format!("{guess_tokens} tokens truncated…");
let marker_len = marker.len();
let keep_budget = max_bytes.saturating_sub(marker_len);
if keep_budget == 0 {
// No room for any content within the cap; return a full, untruncated marker
// that reflects the entire truncated content.
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
let left_budget = keep_budget / 2;
let right_budget = keep_budget - left_budget;
let prefix_end = pick_prefix_end(s, left_budget);
let mut suffix_start = pick_suffix_start(s, right_budget);
if suffix_start < prefix_end {
suffix_start = prefix_end;
}
let kept_content_bytes = prefix_end + (s.len() - suffix_start);
let truncated_content_bytes = s.len().saturating_sub(kept_content_bytes);
let new_tokens = (truncated_content_bytes as u64).div_ceil(4);
if new_tokens == guess_tokens {
let mut out = String::with_capacity(marker_len + kept_content_bytes + 1);
out.push_str(&s[..prefix_end]);
out.push_str(&marker);
// Place marker on its own line for symmetry when we keep line boundaries.
out.push('\n');
out.push_str(&s[suffix_start..]);
return (out, Some(est_tokens));
}
guess_tokens = new_tokens;
}
// Fallback: use last guess to build output.
let marker = format!("{guess_tokens} tokens truncated…");
let marker_len = marker.len();
let keep_budget = max_bytes.saturating_sub(marker_len);
if keep_budget == 0 {
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
let left_budget = keep_budget / 2;
let right_budget = keep_budget - left_budget;
let prefix_end = pick_prefix_end(s, left_budget);
let suffix_start = pick_suffix_start(s, right_budget);
let mut out = String::with_capacity(marker_len + prefix_end + (s.len() - suffix_start) + 1);
out.push_str(&s[..prefix_end]);
out.push_str(&marker);
out.push('\n');
out.push_str(&s[suffix_start..]);
(out, Some(est_tokens))
}
#[cfg(test)]
mod tests {
use super::*;
@@ -516,4 +616,50 @@ Output:
abc"#;
assert_eq!(expected, text);
}
#[test]
fn truncate_middle_no_newlines_fallback() {
// A long string with no newlines that exceeds the cap.
let s = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";
let max_bytes = 16; // force truncation
let (out, original) = truncate_middle(s, max_bytes);
// For very small caps, we return the full, untruncated marker,
// even if it exceeds the cap.
assert_eq!(out, "…16 tokens truncated…");
// Original string length is 62 bytes => ceil(62/4) = 16 tokens.
assert_eq!(original, Some(16));
}
#[test]
fn truncate_middle_prefers_newline_boundaries() {
// Build a multi-line string of 20 numbered lines (each "NNN\n").
let mut s = String::new();
for i in 1..=20 {
s.push_str(&format!("{i:03}\n"));
}
// Total length: 20 lines * 4 bytes per line = 80 bytes.
assert_eq!(s.len(), 80);
// Choose a cap that forces truncation while leaving room for
// a few lines on each side after accounting for the marker.
let max_bytes = 64;
// Expect exact output: first 4 lines, marker, last 4 lines, and correct token estimate (80/4 = 20).
assert_eq!(
truncate_middle(&s, max_bytes),
(
r#"001
002
003
004
…12 tokens truncated…
017
018
019
020
"#
.to_string(),
Some(20)
)
);
}
}

View File

@@ -35,8 +35,6 @@ mod mcp_tool_call;
mod message_history;
mod model_provider_info;
pub mod parse_command;
mod truncate;
mod unified_exec;
mod user_instructions;
pub use model_provider_info::BUILT_IN_OSS_MODEL_PROVIDER_ID;
pub use model_provider_info::ModelProviderInfo;

View File

@@ -17,7 +17,7 @@ use anyhow::Result;
use anyhow::anyhow;
use codex_mcp_client::McpClient;
use mcp_types::ClientCapabilities;
use mcp_types::Implementation;
use mcp_types::McpClientInfo;
use mcp_types::Tool;
use serde_json::json;
@@ -159,14 +159,10 @@ impl McpConnectionManager {
// indicates this should be an empty object.
elicitation: Some(json!({})),
},
client_info: Implementation {
client_info: McpClientInfo {
name: "codex-mcp-client".to_owned(),
version: env!("CARGO_PKG_VERSION").to_owned(),
title: Some("Codex".into()),
// This field is used by Codex when it is an MCP
// server: it should not be used when Codex is
// an MCP client.
user_agent: None,
},
protocol_version: mcp_types::MCP_SCHEMA_VERSION.to_owned(),
};

View File

@@ -8,6 +8,7 @@ use std::collections::HashMap;
use crate::model_family::ModelFamily;
use crate::plan_tool::PLAN_TOOL;
use crate::protocol::AskForApproval;
use crate::protocol::SandboxPolicy;
use crate::tool_apply_patch::ApplyPatchToolType;
use crate::tool_apply_patch::create_apply_patch_freeform_tool;
use crate::tool_apply_patch::create_apply_patch_json_tool;
@@ -57,7 +58,7 @@ pub(crate) enum OpenAiTool {
#[derive(Debug, Clone)]
pub enum ConfigShellToolType {
DefaultShell,
ShellWithRequest,
ShellWithRequest { sandbox_policy: SandboxPolicy },
LocalShell,
StreamableShell,
}
@@ -69,18 +70,17 @@ pub(crate) struct ToolsConfig {
pub apply_patch_tool_type: Option<ApplyPatchToolType>,
pub web_search_request: bool,
pub include_view_image_tool: bool,
pub experimental_unified_exec_tool: bool,
}
pub(crate) struct ToolsConfigParams<'a> {
pub(crate) model_family: &'a ModelFamily,
pub(crate) approval_policy: AskForApproval,
pub(crate) sandbox_policy: SandboxPolicy,
pub(crate) include_plan_tool: bool,
pub(crate) include_apply_patch_tool: bool,
pub(crate) include_web_search_request: bool,
pub(crate) use_streamable_shell_tool: bool,
pub(crate) include_view_image_tool: bool,
pub(crate) experimental_unified_exec_tool: bool,
}
impl ToolsConfig {
@@ -88,12 +88,12 @@ impl ToolsConfig {
let ToolsConfigParams {
model_family,
approval_policy,
sandbox_policy,
include_plan_tool,
include_apply_patch_tool,
include_web_search_request,
use_streamable_shell_tool,
include_view_image_tool,
experimental_unified_exec_tool,
} = params;
let mut shell_type = if *use_streamable_shell_tool {
ConfigShellToolType::StreamableShell
@@ -103,7 +103,9 @@ impl ToolsConfig {
ConfigShellToolType::DefaultShell
};
if matches!(approval_policy, AskForApproval::OnRequest) && !use_streamable_shell_tool {
shell_type = ConfigShellToolType::ShellWithRequest;
shell_type = ConfigShellToolType::ShellWithRequest {
sandbox_policy: sandbox_policy.clone(),
}
}
let apply_patch_tool_type = match model_family.apply_patch_tool_type {
@@ -124,7 +126,6 @@ impl ToolsConfig {
apply_patch_tool_type,
web_search_request: *include_web_search_request,
include_view_image_tool: *include_view_image_tool,
experimental_unified_exec_tool: *experimental_unified_exec_tool,
}
}
}
@@ -199,56 +200,7 @@ fn create_shell_tool() -> OpenAiTool {
})
}
fn create_unified_exec_tool() -> OpenAiTool {
let mut properties = BTreeMap::new();
properties.insert(
"input".to_string(),
JsonSchema::Array {
items: Box::new(JsonSchema::String { description: None }),
description: Some(
"When no session_id is provided, treat the array as the command and arguments \
to launch. When session_id is set, concatenate the strings (in order) and write \
them to the session's stdin."
.to_string(),
),
},
);
properties.insert(
"session_id".to_string(),
JsonSchema::String {
description: Some(
"Identifier for an existing interactive session. If omitted, a new command \
is spawned."
.to_string(),
),
},
);
properties.insert(
"timeout_ms".to_string(),
JsonSchema::Number {
description: Some(
"Maximum time in milliseconds to wait for output after writing the input."
.to_string(),
),
},
);
OpenAiTool::Function(ResponsesApiTool {
name: "unified_exec".to_string(),
description:
"Runs a command in a PTY. Provide a session_id to reuse an existing interactive session.".to_string(),
strict: false,
parameters: JsonSchema::Object {
properties,
required: Some(vec!["input".to_string()]),
additional_properties: Some(false),
},
})
}
const SHELL_TOOL_DESCRIPTION: &str = r#"Runs a shell command and returns its output"#;
fn create_shell_tool_for_sandbox() -> OpenAiTool {
fn create_shell_tool_for_sandbox(sandbox_policy: &SandboxPolicy) -> OpenAiTool {
let mut properties = BTreeMap::new();
properties.insert(
"command".to_string(),
@@ -260,29 +212,82 @@ fn create_shell_tool_for_sandbox() -> OpenAiTool {
properties.insert(
"workdir".to_string(),
JsonSchema::String {
description: Some("Working directory to execute the command in.".to_string()),
description: Some("The working directory to execute the command in".to_string()),
},
);
properties.insert(
"timeout_ms".to_string(),
JsonSchema::Number {
description: Some("Timeout for the command in milliseconds.".to_string()),
},
);
properties.insert(
"with_escalated_permissions".to_string(),
JsonSchema::Boolean {
description: Some("Request escalated permissions, only for when a command would otherwise be blocked by the sandbox.".to_string()),
},
);
properties.insert(
"justification".to_string(),
JsonSchema::String {
description: Some("Required if and only if with_escalated_permissions == true. One sentence explaining why escalation is needed (e.g., write outside CWD, network fetch, git commit).".to_string()),
description: Some("The timeout for the command in milliseconds".to_string()),
},
);
let description = SHELL_TOOL_DESCRIPTION.to_string();
if matches!(sandbox_policy, SandboxPolicy::WorkspaceWrite { .. }) {
properties.insert(
"with_escalated_permissions".to_string(),
JsonSchema::Boolean {
description: Some("Whether to request escalated permissions. Set to true if command needs to be run without sandbox restrictions".to_string()),
},
);
properties.insert(
"justification".to_string(),
JsonSchema::String {
description: Some("Only set if with_escalated_permissions is true. 1-sentence explanation of why we want to run this command.".to_string()),
},
);
}
let description = match sandbox_policy {
SandboxPolicy::WorkspaceWrite {
network_access,
writable_roots,
..
} => {
format!(
r#"
The shell tool is used to execute shell commands.
- When invoking the shell tool, your call will be running in a sandbox, and some shell commands will require escalated privileges:
- Types of actions that require escalated privileges:
- Writing files other than those in the writable roots
- writable roots:
{}{}
- Examples of commands that require escalated privileges:
- git commit
- npm install or pnpm install
- cargo build
- cargo test
- When invoking a command that will require escalated privileges:
- Provide the with_escalated_permissions parameter with the boolean value true
- Include a short, 1 sentence explanation for why we need to run with_escalated_permissions in the justification parameter."#,
writable_roots.iter().map(|wr| format!(" - {}", wr.to_string_lossy())).collect::<Vec<String>>().join("\n"),
if !network_access {
"\n - Commands that require network access\n"
} else {
""
}
)
}
SandboxPolicy::DangerFullAccess => {
"Runs a shell command and returns its output.".to_string()
}
SandboxPolicy::ReadOnly => {
r#"
The shell tool is used to execute shell commands.
- When invoking the shell tool, your call will be running in a sandbox, and some shell commands (including apply_patch) will require escalated permissions:
- Types of actions that require escalated privileges:
- Writing files
- Applying patches
- Examples of commands that require escalated privileges:
- apply_patch
- git commit
- npm install or pnpm install
- cargo build
- cargo test
- When invoking a command that will require escalated privileges:
- Provide the with_escalated_permissions parameter with the boolean value true
- Include a short, 1 sentence explanation for why we need to run with_escalated_permissions in the justification parameter"#.to_string()
}
};
OpenAiTool::Function(ResponsesApiTool {
name: "shell".to_string(),
@@ -295,6 +300,7 @@ fn create_shell_tool_for_sandbox() -> OpenAiTool {
},
})
}
fn create_view_image_tool() -> OpenAiTool {
// Support only local filesystem path.
let mut properties = BTreeMap::new();
@@ -528,27 +534,23 @@ pub(crate) fn get_openai_tools(
) -> Vec<OpenAiTool> {
let mut tools: Vec<OpenAiTool> = Vec::new();
if config.experimental_unified_exec_tool {
tools.push(create_unified_exec_tool());
} else {
match &config.shell_type {
ConfigShellToolType::DefaultShell => {
tools.push(create_shell_tool());
}
ConfigShellToolType::ShellWithRequest => {
tools.push(create_shell_tool_for_sandbox());
}
ConfigShellToolType::LocalShell => {
tools.push(OpenAiTool::LocalShell {});
}
ConfigShellToolType::StreamableShell => {
tools.push(OpenAiTool::Function(
crate::exec_command::create_exec_command_tool_for_responses_api(),
));
tools.push(OpenAiTool::Function(
crate::exec_command::create_write_stdin_tool_for_responses_api(),
));
}
match &config.shell_type {
ConfigShellToolType::DefaultShell => {
tools.push(create_shell_tool());
}
ConfigShellToolType::ShellWithRequest { sandbox_policy } => {
tools.push(create_shell_tool_for_sandbox(sandbox_policy));
}
ConfigShellToolType::LocalShell => {
tools.push(OpenAiTool::LocalShell {});
}
ConfigShellToolType::StreamableShell => {
tools.push(OpenAiTool::Function(
crate::exec_command::create_exec_command_tool_for_responses_api(),
));
tools.push(OpenAiTool::Function(
crate::exec_command::create_write_stdin_tool_for_responses_api(),
));
}
}
@@ -575,8 +577,10 @@ pub(crate) fn get_openai_tools(
if config.include_view_image_tool {
tools.push(create_view_image_tool());
}
if let Some(mcp_tools) = mcp_tools {
// Ensure deterministic ordering to maximize prompt cache hits.
// HashMap iteration order is non-deterministic, so sort by fully-qualified tool name.
let mut entries: Vec<(String, mcp_types::Tool)> = mcp_tools.into_iter().collect();
entries.sort_by(|a, b| a.0.cmp(&b.0));
@@ -632,18 +636,18 @@ mod tests {
let config = ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
include_plan_tool: true,
include_apply_patch_tool: false,
include_web_search_request: true,
use_streamable_shell_tool: false,
include_view_image_tool: true,
experimental_unified_exec_tool: true,
});
let tools = get_openai_tools(&config, Some(HashMap::new()));
assert_eq_tool_names(
&tools,
&["unified_exec", "update_plan", "web_search", "view_image"],
&["local_shell", "update_plan", "web_search", "view_image"],
);
}
@@ -653,18 +657,18 @@ mod tests {
let config = ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
include_plan_tool: true,
include_apply_patch_tool: false,
include_web_search_request: true,
use_streamable_shell_tool: false,
include_view_image_tool: true,
experimental_unified_exec_tool: true,
});
let tools = get_openai_tools(&config, Some(HashMap::new()));
assert_eq_tool_names(
&tools,
&["unified_exec", "update_plan", "web_search", "view_image"],
&["shell", "update_plan", "web_search", "view_image"],
);
}
@@ -674,12 +678,12 @@ mod tests {
let config = ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
include_plan_tool: false,
include_apply_patch_tool: false,
include_web_search_request: true,
use_streamable_shell_tool: false,
include_view_image_tool: true,
experimental_unified_exec_tool: true,
});
let tools = get_openai_tools(
&config,
@@ -722,7 +726,7 @@ mod tests {
assert_eq_tool_names(
&tools,
&[
"unified_exec",
"shell",
"web_search",
"view_image",
"test_server/do_something_cool",
@@ -779,12 +783,12 @@ mod tests {
let config = ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
include_plan_tool: false,
include_apply_patch_tool: false,
include_web_search_request: false,
use_streamable_shell_tool: false,
include_view_image_tool: true,
experimental_unified_exec_tool: true,
});
// Intentionally construct a map with keys that would sort alphabetically.
@@ -837,11 +841,11 @@ mod tests {
]);
let tools = get_openai_tools(&config, Some(tools_map));
// Expect unified_exec first, followed by MCP tools sorted by fully-qualified name.
// Expect shell first, followed by MCP tools sorted by fully-qualified name.
assert_eq_tool_names(
&tools,
&[
"unified_exec",
"shell",
"view_image",
"test_server/cool",
"test_server/do",
@@ -856,12 +860,12 @@ mod tests {
let config = ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
include_plan_tool: false,
include_apply_patch_tool: false,
include_web_search_request: true,
use_streamable_shell_tool: false,
include_view_image_tool: true,
experimental_unified_exec_tool: true,
});
let tools = get_openai_tools(
@@ -889,7 +893,7 @@ mod tests {
assert_eq_tool_names(
&tools,
&["unified_exec", "web_search", "view_image", "dash/search"],
&["shell", "web_search", "view_image", "dash/search"],
);
assert_eq!(
@@ -918,12 +922,12 @@ mod tests {
let config = ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
include_plan_tool: false,
include_apply_patch_tool: false,
include_web_search_request: true,
use_streamable_shell_tool: false,
include_view_image_tool: true,
experimental_unified_exec_tool: true,
});
let tools = get_openai_tools(
@@ -949,7 +953,7 @@ mod tests {
assert_eq_tool_names(
&tools,
&["unified_exec", "web_search", "view_image", "dash/paginate"],
&["shell", "web_search", "view_image", "dash/paginate"],
);
assert_eq!(
tools[3],
@@ -975,12 +979,12 @@ mod tests {
let config = ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
include_plan_tool: false,
include_apply_patch_tool: false,
include_web_search_request: true,
use_streamable_shell_tool: false,
include_view_image_tool: true,
experimental_unified_exec_tool: true,
});
let tools = get_openai_tools(
@@ -1004,10 +1008,7 @@ mod tests {
)])),
);
assert_eq_tool_names(
&tools,
&["unified_exec", "web_search", "view_image", "dash/tags"],
);
assert_eq_tool_names(&tools, &["shell", "web_search", "view_image", "dash/tags"]);
assert_eq!(
tools[3],
OpenAiTool::Function(ResponsesApiTool {
@@ -1035,12 +1036,12 @@ mod tests {
let config = ToolsConfig::new(&ToolsConfigParams {
model_family: &model_family,
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::ReadOnly,
include_plan_tool: false,
include_apply_patch_tool: false,
include_web_search_request: true,
use_streamable_shell_tool: false,
include_view_image_tool: true,
experimental_unified_exec_tool: true,
});
let tools = get_openai_tools(
@@ -1064,10 +1065,7 @@ mod tests {
)])),
);
assert_eq_tool_names(
&tools,
&["unified_exec", "web_search", "view_image", "dash/value"],
);
assert_eq_tool_names(&tools, &["shell", "web_search", "view_image", "dash/value"]);
assert_eq!(
tools[3],
OpenAiTool::Function(ResponsesApiTool {
@@ -1088,7 +1086,13 @@ mod tests {
#[test]
fn test_shell_tool_for_sandbox_workspace_write() {
let tool = super::create_shell_tool_for_sandbox();
let sandbox_policy = SandboxPolicy::WorkspaceWrite {
writable_roots: vec!["workspace".into()],
network_access: false,
exclude_tmpdir_env_var: false,
exclude_slash_tmp: false,
};
let tool = super::create_shell_tool_for_sandbox(&sandbox_policy);
let OpenAiTool::Function(ResponsesApiTool {
description, name, ..
}) = &tool
@@ -1097,13 +1101,29 @@ mod tests {
};
assert_eq!(name, "shell");
let expected = super::SHELL_TOOL_DESCRIPTION;
let expected = r#"
The shell tool is used to execute shell commands.
- When invoking the shell tool, your call will be running in a sandbox, and some shell commands will require escalated privileges:
- Types of actions that require escalated privileges:
- Writing files other than those in the writable roots
- writable roots:
- workspace
- Commands that require network access
- Examples of commands that require escalated privileges:
- git commit
- npm install or pnpm install
- cargo build
- cargo test
- When invoking a command that will require escalated privileges:
- Provide the with_escalated_permissions parameter with the boolean value true
- Include a short, 1 sentence explanation for why we need to run with_escalated_permissions in the justification parameter."#;
assert_eq!(description, expected);
}
#[test]
fn test_shell_tool_for_sandbox_readonly() {
let tool = super::create_shell_tool_for_sandbox();
let tool = super::create_shell_tool_for_sandbox(&SandboxPolicy::ReadOnly);
let OpenAiTool::Function(ResponsesApiTool {
description, name, ..
}) = &tool
@@ -1112,13 +1132,27 @@ mod tests {
};
assert_eq!(name, "shell");
let expected = super::SHELL_TOOL_DESCRIPTION;
let expected = r#"
The shell tool is used to execute shell commands.
- When invoking the shell tool, your call will be running in a sandbox, and some shell commands (including apply_patch) will require escalated permissions:
- Types of actions that require escalated privileges:
- Writing files
- Applying patches
- Examples of commands that require escalated privileges:
- apply_patch
- git commit
- npm install or pnpm install
- cargo build
- cargo test
- When invoking a command that will require escalated privileges:
- Provide the with_escalated_permissions parameter with the boolean value true
- Include a short, 1 sentence explanation for why we need to run with_escalated_permissions in the justification parameter"#;
assert_eq!(description, expected);
}
#[test]
fn test_shell_tool_for_sandbox_danger_full_access() {
let tool = super::create_shell_tool_for_sandbox();
let tool = super::create_shell_tool_for_sandbox(&SandboxPolicy::DangerFullAccess);
let OpenAiTool::Function(ResponsesApiTool {
description, name, ..
}) = &tool
@@ -1127,7 +1161,6 @@ mod tests {
};
assert_eq!(name, "shell");
let expected = super::SHELL_TOOL_DESCRIPTION;
assert_eq!(description, expected);
assert_eq!(description, "Runs a shell command and returns its output.");
}
}

View File

@@ -65,6 +65,6 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::PlanUpdate(_)
| EventMsg::TurnAborted(_)
| EventMsg::ShutdownComplete
| EventMsg::ConversationPath(_) => false,
| EventMsg::ConversationHistory(_) => false,
}
}

View File

@@ -77,13 +77,7 @@ pub enum RolloutRecorderParams {
enum RolloutCmd {
AddItems(Vec<RolloutItem>),
/// Ensure all prior writes are processed; respond when flushed.
Flush {
ack: oneshot::Sender<()>,
},
Shutdown {
ack: oneshot::Sender<()>,
},
Shutdown { ack: oneshot::Sender<()> },
}
impl RolloutRecorderParams {
@@ -191,17 +185,6 @@ impl RolloutRecorder {
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
/// Flush all queued writes and wait until they are committed by the writer task.
pub async fn flush(&self) -> std::io::Result<()> {
let (tx, rx) = oneshot::channel();
self.tx
.send(RolloutCmd::Flush { ack: tx })
.await
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
rx.await
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
}
pub(crate) async fn get_rollout_history(path: &Path) -> std::io::Result<InitialHistory> {
info!("Resuming rollout from {path:?}");
tracing::error!("Resuming rollout from {path:?}");
@@ -228,11 +211,11 @@ impl RolloutRecorder {
match serde_json::from_value::<RolloutLine>(v.clone()) {
Ok(rollout_line) => match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
// Use the FIRST SessionMeta encountered in the file as the canonical
// conversation id and main session information. Keep all items intact.
if conversation_id.is_none() {
conversation_id = Some(session_meta_line.meta.id);
}
tracing::error!(
"Parsed conversation ID from rollout file: {:?}",
session_meta_line.meta.id
);
conversation_id = Some(session_meta_line.meta.id);
items.push(RolloutItem::SessionMeta(session_meta_line));
}
RolloutItem::ResponseItem(item) => {
@@ -268,10 +251,6 @@ impl RolloutRecorder {
}))
}
pub(crate) fn get_rollout_path(&self) -> PathBuf {
self.rollout_path.clone()
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
@@ -372,14 +351,6 @@ async fn rollout_writer(
}
}
}
RolloutCmd::Flush { ack } => {
// Ensure underlying file is flushed and then ack.
if let Err(e) = writer.file.flush().await {
let _ = ack.send(());
return Err(e);
}
let _ = ack.send(());
}
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
}

View File

@@ -1,180 +0,0 @@
//! Utilities for truncating large chunks of output while preserving a prefix
//! and suffix on UTF-8 boundaries.
/// Truncate the middle of a UTF-8 string to at most `max_bytes` bytes,
/// preserving the beginning and the end. Returns the possibly truncated
/// string and `Some(original_token_count)` (estimated at 4 bytes/token)
/// if truncation occurred; otherwise returns the original string and `None`.
pub(crate) fn truncate_middle(s: &str, max_bytes: usize) -> (String, Option<u64>) {
if s.len() <= max_bytes {
return (s.to_string(), None);
}
let est_tokens = (s.len() as u64).div_ceil(4);
if max_bytes == 0 {
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
fn truncate_on_boundary(input: &str, max_len: usize) -> &str {
if input.len() <= max_len {
return input;
}
let mut end = max_len;
while end > 0 && !input.is_char_boundary(end) {
end -= 1;
}
&input[..end]
}
fn pick_prefix_end(s: &str, left_budget: usize) -> usize {
if let Some(head) = s.get(..left_budget)
&& let Some(i) = head.rfind('\n')
{
return i + 1;
}
truncate_on_boundary(s, left_budget).len()
}
fn pick_suffix_start(s: &str, right_budget: usize) -> usize {
let start_tail = s.len().saturating_sub(right_budget);
if let Some(tail) = s.get(start_tail..)
&& let Some(i) = tail.find('\n')
{
return start_tail + i + 1;
}
let mut idx = start_tail.min(s.len());
while idx < s.len() && !s.is_char_boundary(idx) {
idx += 1;
}
idx
}
let mut guess_tokens = est_tokens;
for _ in 0..4 {
let marker = format!("{guess_tokens} tokens truncated…");
let marker_len = marker.len();
let keep_budget = max_bytes.saturating_sub(marker_len);
if keep_budget == 0 {
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
let left_budget = keep_budget / 2;
let right_budget = keep_budget - left_budget;
let prefix_end = pick_prefix_end(s, left_budget);
let mut suffix_start = pick_suffix_start(s, right_budget);
if suffix_start < prefix_end {
suffix_start = prefix_end;
}
let kept_content_bytes = prefix_end + (s.len() - suffix_start);
let truncated_content_bytes = s.len().saturating_sub(kept_content_bytes);
let new_tokens = (truncated_content_bytes as u64).div_ceil(4);
if new_tokens == guess_tokens {
let mut out = String::with_capacity(marker_len + kept_content_bytes + 1);
out.push_str(&s[..prefix_end]);
out.push_str(&marker);
out.push('\n');
out.push_str(&s[suffix_start..]);
return (out, Some(est_tokens));
}
guess_tokens = new_tokens;
}
let marker = format!("{guess_tokens} tokens truncated…");
let marker_len = marker.len();
let keep_budget = max_bytes.saturating_sub(marker_len);
if keep_budget == 0 {
return (format!("{est_tokens} tokens truncated…"), Some(est_tokens));
}
let left_budget = keep_budget / 2;
let right_budget = keep_budget - left_budget;
let prefix_end = pick_prefix_end(s, left_budget);
let suffix_start = pick_suffix_start(s, right_budget);
let mut out = String::with_capacity(marker_len + prefix_end + (s.len() - suffix_start) + 1);
out.push_str(&s[..prefix_end]);
out.push_str(&marker);
out.push('\n');
out.push_str(&s[suffix_start..]);
(out, Some(est_tokens))
}
#[cfg(test)]
mod tests {
use super::truncate_middle;
#[test]
fn truncate_middle_no_newlines_fallback() {
let s = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ*";
let max_bytes = 32;
let (out, original) = truncate_middle(s, max_bytes);
assert!(out.starts_with("abc"));
assert!(out.contains("tokens truncated"));
assert!(out.ends_with("XYZ*"));
assert_eq!(original, Some((s.len() as u64).div_ceil(4)));
}
#[test]
fn truncate_middle_prefers_newline_boundaries() {
let mut s = String::new();
for i in 1..=20 {
s.push_str(&format!("{i:03}\n"));
}
assert_eq!(s.len(), 80);
let max_bytes = 64;
let (out, tokens) = truncate_middle(&s, max_bytes);
assert!(out.starts_with("001\n002\n003\n004\n"));
assert!(out.contains("tokens truncated"));
assert!(out.ends_with("017\n018\n019\n020\n"));
assert_eq!(tokens, Some(20));
}
#[test]
fn truncate_middle_handles_utf8_content() {
let s = "😀😀😀😀😀😀😀😀😀😀\nsecond line with ascii text\n";
let max_bytes = 32;
let (out, tokens) = truncate_middle(s, max_bytes);
assert!(out.contains("tokens truncated"));
assert!(!out.contains('\u{fffd}'));
assert_eq!(tokens, Some((s.len() as u64).div_ceil(4)));
}
#[test]
fn truncate_middle_prefers_newline_boundaries_2() {
// Build a multi-line string of 20 numbered lines (each "NNN\n").
let mut s = String::new();
for i in 1..=20 {
s.push_str(&format!("{i:03}\n"));
}
// Total length: 20 lines * 4 bytes per line = 80 bytes.
assert_eq!(s.len(), 80);
// Choose a cap that forces truncation while leaving room for
// a few lines on each side after accounting for the marker.
let max_bytes = 64;
// Expect exact output: first 4 lines, marker, last 4 lines, and correct token estimate (80/4 = 20).
assert_eq!(
truncate_middle(&s, max_bytes),
(
r#"001
002
003
004
…12 tokens truncated…
017
018
019
020
"#
.to_string(),
Some(20)
)
);
}
}

View File

@@ -1,22 +0,0 @@
use thiserror::Error;
#[derive(Debug, Error)]
pub(crate) enum UnifiedExecError {
#[error("Failed to create unified exec session: {pty_error}")]
CreateSession {
#[source]
pty_error: anyhow::Error,
},
#[error("Unknown session id {session_id}")]
UnknownSessionId { session_id: i32 },
#[error("failed to write to stdin")]
WriteToStdin,
#[error("missing command line for unified exec request")]
MissingCommandLine,
}
impl UnifiedExecError {
pub(crate) fn create_session(error: anyhow::Error) -> Self {
Self::CreateSession { pty_error: error }
}
}

View File

@@ -1,653 +0,0 @@
use portable_pty::CommandBuilder;
use portable_pty::PtySize;
use portable_pty::native_pty_system;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io::ErrorKind;
use std::io::Read;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::Instant;
use crate::exec_command::ExecCommandSession;
use crate::truncate::truncate_middle;
mod errors;
pub(crate) use errors::UnifiedExecError;
const DEFAULT_TIMEOUT_MS: u64 = 1_000;
const MAX_TIMEOUT_MS: u64 = 60_000;
const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 128 * 1024; // 128 KiB
#[derive(Debug)]
pub(crate) struct UnifiedExecRequest<'a> {
pub session_id: Option<i32>,
pub input_chunks: &'a [String],
pub timeout_ms: Option<u64>,
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct UnifiedExecResult {
pub session_id: Option<i32>,
pub output: String,
}
#[derive(Debug, Default)]
pub(crate) struct UnifiedExecSessionManager {
next_session_id: AtomicI32,
sessions: Mutex<HashMap<i32, ManagedUnifiedExecSession>>,
}
#[derive(Debug)]
struct ManagedUnifiedExecSession {
session: ExecCommandSession,
output_buffer: OutputBuffer,
/// Notifies waiters whenever new output has been appended to
/// `output_buffer`, allowing clients to poll for fresh data.
output_notify: Arc<Notify>,
output_task: JoinHandle<()>,
}
#[derive(Debug, Default)]
struct OutputBufferState {
chunks: VecDeque<Vec<u8>>,
total_bytes: usize,
}
impl OutputBufferState {
fn push_chunk(&mut self, chunk: Vec<u8>) {
self.total_bytes = self.total_bytes.saturating_add(chunk.len());
self.chunks.push_back(chunk);
let mut excess = self
.total_bytes
.saturating_sub(UNIFIED_EXEC_OUTPUT_MAX_BYTES);
while excess > 0 {
match self.chunks.front_mut() {
Some(front) if excess >= front.len() => {
excess -= front.len();
self.total_bytes = self.total_bytes.saturating_sub(front.len());
self.chunks.pop_front();
}
Some(front) => {
front.drain(..excess);
self.total_bytes = self.total_bytes.saturating_sub(excess);
break;
}
None => break,
}
}
}
fn drain(&mut self) -> Vec<Vec<u8>> {
let drained: Vec<Vec<u8>> = self.chunks.drain(..).collect();
self.total_bytes = 0;
drained
}
}
type OutputBuffer = Arc<Mutex<OutputBufferState>>;
type OutputHandles = (OutputBuffer, Arc<Notify>);
impl ManagedUnifiedExecSession {
fn new(session: ExecCommandSession) -> Self {
let output_buffer = Arc::new(Mutex::new(OutputBufferState::default()));
let output_notify = Arc::new(Notify::new());
let mut receiver = session.output_receiver();
let buffer_clone = Arc::clone(&output_buffer);
let notify_clone = Arc::clone(&output_notify);
let output_task = tokio::spawn(async move {
while let Ok(chunk) = receiver.recv().await {
let mut guard = buffer_clone.lock().await;
guard.push_chunk(chunk);
drop(guard);
notify_clone.notify_waiters();
}
});
Self {
session,
output_buffer,
output_notify,
output_task,
}
}
fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
self.session.writer_sender()
}
fn output_handles(&self) -> OutputHandles {
(
Arc::clone(&self.output_buffer),
Arc::clone(&self.output_notify),
)
}
fn has_exited(&self) -> bool {
self.session.has_exited()
}
}
impl Drop for ManagedUnifiedExecSession {
fn drop(&mut self) {
self.output_task.abort();
}
}
impl UnifiedExecSessionManager {
pub async fn handle_request(
&self,
request: UnifiedExecRequest<'_>,
) -> Result<UnifiedExecResult, UnifiedExecError> {
let (timeout_ms, timeout_warning) = match request.timeout_ms {
Some(requested) if requested > MAX_TIMEOUT_MS => (
MAX_TIMEOUT_MS,
Some(format!(
"Warning: requested timeout {requested}ms exceeds maximum of {MAX_TIMEOUT_MS}ms; clamping to {MAX_TIMEOUT_MS}ms.\n"
)),
),
Some(requested) => (requested, None),
None => (DEFAULT_TIMEOUT_MS, None),
};
let mut new_session: Option<ManagedUnifiedExecSession> = None;
let session_id;
let writer_tx;
let output_buffer;
let output_notify;
if let Some(existing_id) = request.session_id {
let mut sessions = self.sessions.lock().await;
match sessions.get(&existing_id) {
Some(session) => {
if session.has_exited() {
sessions.remove(&existing_id);
return Err(UnifiedExecError::UnknownSessionId {
session_id: existing_id,
});
}
let (buffer, notify) = session.output_handles();
session_id = existing_id;
writer_tx = session.writer_sender();
output_buffer = buffer;
output_notify = notify;
}
None => {
return Err(UnifiedExecError::UnknownSessionId {
session_id: existing_id,
});
}
}
drop(sessions);
} else {
let command = request.input_chunks.to_vec();
let new_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
let session = create_unified_exec_session(&command).await?;
let managed_session = ManagedUnifiedExecSession::new(session);
let (buffer, notify) = managed_session.output_handles();
writer_tx = managed_session.writer_sender();
output_buffer = buffer;
output_notify = notify;
session_id = new_id;
new_session = Some(managed_session);
};
if request.session_id.is_some() {
let joined_input = request.input_chunks.join(" ");
if !joined_input.is_empty() && writer_tx.send(joined_input.into_bytes()).await.is_err()
{
return Err(UnifiedExecError::WriteToStdin);
}
}
let mut collected: Vec<u8> = Vec::with_capacity(4096);
let start = Instant::now();
let deadline = start + Duration::from_millis(timeout_ms);
loop {
let drained_chunks;
let mut wait_for_output = None;
{
let mut guard = output_buffer.lock().await;
drained_chunks = guard.drain();
if drained_chunks.is_empty() {
wait_for_output = Some(output_notify.notified());
}
}
if drained_chunks.is_empty() {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining == Duration::ZERO {
break;
}
let notified = wait_for_output.unwrap_or_else(|| output_notify.notified());
tokio::pin!(notified);
tokio::select! {
_ = &mut notified => {}
_ = tokio::time::sleep(remaining) => break,
}
continue;
}
for chunk in drained_chunks {
collected.extend_from_slice(&chunk);
}
if Instant::now() >= deadline {
break;
}
}
let (output, _maybe_tokens) = truncate_middle(
&String::from_utf8_lossy(&collected),
UNIFIED_EXEC_OUTPUT_MAX_BYTES,
);
let output = if let Some(warning) = timeout_warning {
format!("{warning}{output}")
} else {
output
};
let should_store_session = if let Some(session) = new_session.as_ref() {
!session.has_exited()
} else if request.session_id.is_some() {
let mut sessions = self.sessions.lock().await;
if let Some(existing) = sessions.get(&session_id) {
if existing.has_exited() {
sessions.remove(&session_id);
false
} else {
true
}
} else {
false
}
} else {
true
};
if should_store_session {
if let Some(session) = new_session {
self.sessions.lock().await.insert(session_id, session);
}
Ok(UnifiedExecResult {
session_id: Some(session_id),
output,
})
} else {
Ok(UnifiedExecResult {
session_id: None,
output,
})
}
}
}
async fn create_unified_exec_session(
command: &[String],
) -> Result<ExecCommandSession, UnifiedExecError> {
if command.is_empty() {
return Err(UnifiedExecError::MissingCommandLine);
}
let pty_system = native_pty_system();
let pair = pty_system
.openpty(PtySize {
rows: 24,
cols: 80,
pixel_width: 0,
pixel_height: 0,
})
.map_err(UnifiedExecError::create_session)?;
// Safe thanks to the check at the top of the function.
let mut command_builder = CommandBuilder::new(command[0].clone());
for arg in &command[1..] {
command_builder.arg(arg);
}
let mut child = pair
.slave
.spawn_command(command_builder)
.map_err(UnifiedExecError::create_session)?;
let killer = child.clone_killer();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (output_tx, _) = tokio::sync::broadcast::channel::<Vec<u8>>(256);
let mut reader = pair
.master
.try_clone_reader()
.map_err(UnifiedExecError::create_session)?;
let output_tx_clone = output_tx.clone();
let reader_handle = tokio::task::spawn_blocking(move || {
let mut buf = [0u8; 8192];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let _ = output_tx_clone.send(buf[..n].to_vec());
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
std::thread::sleep(Duration::from_millis(5));
continue;
}
Err(_) => break,
}
}
});
let writer = pair
.master
.take_writer()
.map_err(UnifiedExecError::create_session)?;
let writer = Arc::new(StdMutex::new(writer));
let writer_handle = tokio::spawn({
let writer = writer.clone();
async move {
while let Some(bytes) = writer_rx.recv().await {
let writer = writer.clone();
let _ = tokio::task::spawn_blocking(move || {
if let Ok(mut guard) = writer.lock() {
use std::io::Write;
let _ = guard.write_all(&bytes);
let _ = guard.flush();
}
})
.await;
}
}
});
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = Arc::clone(&exit_status);
let wait_handle = tokio::task::spawn_blocking(move || {
let _ = child.wait();
wait_exit_status.store(true, Ordering::SeqCst);
});
Ok(ExecCommandSession::new(
writer_tx,
output_tx,
killer,
reader_handle,
writer_handle,
wait_handle,
exit_status,
))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn push_chunk_trims_only_excess_bytes() {
let mut buffer = OutputBufferState::default();
buffer.push_chunk(vec![b'a'; UNIFIED_EXEC_OUTPUT_MAX_BYTES]);
buffer.push_chunk(vec![b'b']);
buffer.push_chunk(vec![b'c']);
assert_eq!(buffer.total_bytes, UNIFIED_EXEC_OUTPUT_MAX_BYTES);
assert_eq!(buffer.chunks.len(), 3);
assert_eq!(
buffer.chunks.front().unwrap().len(),
UNIFIED_EXEC_OUTPUT_MAX_BYTES - 2
);
assert_eq!(buffer.chunks.pop_back().unwrap(), vec![b'c']);
assert_eq!(buffer.chunks.pop_back().unwrap(), vec![b'b']);
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_persists_across_requests_jif() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let open_shell = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["bash".to_string(), "-i".to_string()],
timeout_ms: Some(1_500),
})
.await?;
let session_id = open_shell.session_id.expect("expected session_id");
manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &[
"export".to_string(),
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
],
timeout_ms: Some(2_500),
})
.await?;
let out_2 = manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(1_500),
})
.await?;
assert!(out_2.output.contains("codex"));
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn multi_unified_exec_sessions() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let shell_a = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
timeout_ms: Some(1_500),
})
.await?;
let session_a = shell_a.session_id.expect("expected session id");
manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_a),
input_chunks: &["export CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string()],
timeout_ms: Some(1_500),
})
.await?;
let out_2 = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &[
"echo".to_string(),
"$CODEX_INTERACTIVE_SHELL_VAR\n".to_string(),
],
timeout_ms: Some(1_500),
})
.await?;
assert!(!out_2.output.contains("codex"));
let out_3 = manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_a),
input_chunks: &["echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(1_500),
})
.await?;
assert!(out_3.output.contains("codex"));
Ok(())
}
#[cfg(unix)]
#[tokio::test]
async fn unified_exec_timeouts() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let open_shell = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["bash".to_string(), "-i".to_string()],
timeout_ms: Some(1_500),
})
.await?;
let session_id = open_shell.session_id.expect("expected session id");
manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &[
"export".to_string(),
"CODEX_INTERACTIVE_SHELL_VAR=codex\n".to_string(),
],
timeout_ms: Some(1_500),
})
.await?;
let out_2 = manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &["sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n".to_string()],
timeout_ms: Some(10),
})
.await?;
assert!(!out_2.output.contains("codex"));
tokio::time::sleep(Duration::from_secs(7)).await;
let empty = Vec::new();
let out_3 = manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &empty,
timeout_ms: Some(100),
})
.await?;
assert!(out_3.output.contains("codex"));
Ok(())
}
#[cfg(unix)]
#[tokio::test]
async fn requests_with_large_timeout_are_capped() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let result = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["echo".to_string(), "codex".to_string()],
timeout_ms: Some(120_000),
})
.await?;
assert!(result.output.starts_with(
"Warning: requested timeout 120000ms exceeds maximum of 60000ms; clamping to 60000ms.\n"
));
assert!(result.output.contains("codex"));
Ok(())
}
#[cfg(unix)]
#[tokio::test]
async fn completed_commands_do_not_persist_sessions() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let result = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/echo".to_string(), "codex".to_string()],
timeout_ms: Some(1_500),
})
.await?;
assert!(result.session_id.is_none());
assert!(result.output.contains("codex"));
assert!(manager.sessions.lock().await.is_empty());
Ok(())
}
#[cfg(unix)]
#[tokio::test]
async fn correct_path_resolution() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let result = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["echo".to_string(), "codex".to_string()],
timeout_ms: Some(1_500),
})
.await?;
assert!(result.session_id.is_none());
assert!(result.output.contains("codex"));
assert!(manager.sessions.lock().await.is_empty());
Ok(())
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn reusing_completed_session_returns_unknown_session() -> Result<(), UnifiedExecError> {
let manager = UnifiedExecSessionManager::default();
let open_shell = manager
.handle_request(UnifiedExecRequest {
session_id: None,
input_chunks: &["/bin/bash".to_string(), "-i".to_string()],
timeout_ms: Some(1_500),
})
.await?;
let session_id = open_shell.session_id.expect("expected session id");
manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &["exit\n".to_string()],
timeout_ms: Some(1_500),
})
.await?;
tokio::time::sleep(Duration::from_millis(200)).await;
let err = manager
.handle_request(UnifiedExecRequest {
session_id: Some(session_id),
input_chunks: &[],
timeout_ms: Some(100),
})
.await
.expect_err("expected unknown session error");
match err {
UnifiedExecError::UnknownSessionId { session_id: err_id } => {
assert_eq!(err_id, session_id);
}
other => panic!("expected UnknownSessionId, got {other:?}"),
}
assert!(!manager.sessions.lock().await.contains_key(&session_id));
Ok(())
}
}

View File

@@ -277,25 +277,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
"content": [{ "type": "input_text", "text": "hello" }]
}
]);
let input_array = request_body
.get("input")
.and_then(|v| v.as_array())
.cloned()
.expect("input array in request body");
let filtered: Vec<serde_json::Value> = input_array
.into_iter()
.filter(|item| {
let text = item
.get("content")
.and_then(|c| c.as_array())
.and_then(|a| a.first())
.and_then(|o| o.get("text"))
.and_then(|t| t.as_str())
.unwrap_or("");
!text.contains("<environment_context>")
})
.collect();
assert_eq!(serde_json::json!(filtered), expected_input);
assert_eq!(request_body["input"], expected_input);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -968,6 +950,34 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() {
assert_eq!(requests.len(), 3, "expected 3 requests (one per turn)");
// Replace full-array compare with tail-only raw JSON compare using a single hard-coded value.
let r3_tail_expected = json!([
{
"type": "message",
"role": "user",
"content": [{"type":"input_text","text":"U1"}]
},
{
"type": "message",
"role": "assistant",
"content": [{"type":"output_text","text":"Hey there!\n"}]
},
{
"type": "message",
"role": "user",
"content": [{"type":"input_text","text":"U2"}]
},
{
"type": "message",
"role": "assistant",
"content": [{"type":"output_text","text":"Hey there!\n"}]
},
{
"type": "message",
"role": "user",
"content": [{"type":"input_text","text":"U3"}]
}
]);
let r3_input_array = requests[2]
.body_json::<serde_json::Value>()
.unwrap()
@@ -975,60 +985,12 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() {
.and_then(|v| v.as_array())
.cloned()
.expect("r3 missing input array");
// We only assert on the last 5 items of the input history for request 3.
// With per-turn environment context injected, the last 5 should be:
// [env_ctx, U2, assistant("Hey there!\n"), env_ctx, U3]
let actual_tail = &r3_input_array[r3_input_array.len() - 5..];
// env_ctx 1
assert_eq!(actual_tail[0]["type"], serde_json::json!("message"));
assert_eq!(actual_tail[0]["role"], serde_json::json!("user"));
let env_text_1 = &actual_tail[0]["content"][0]["text"];
assert!(
env_text_1
.as_str()
.expect("env text should be string")
.contains("<environment_context>")
);
// U2
// skipping earlier context and developer messages
let tail_len = r3_tail_expected.as_array().unwrap().len();
let actual_tail = &r3_input_array[r3_input_array.len() - tail_len..];
assert_eq!(
actual_tail[1],
serde_json::json!({
"type": "message",
"role": "user",
"content": [ { "type": "input_text", "text": "U2" } ]
})
);
// assistant response
assert_eq!(
actual_tail[2],
serde_json::json!({
"type": "message",
"role": "assistant",
"content": [ { "type": "output_text", "text": "Hey there!\n" } ]
})
);
// env_ctx 2
assert_eq!(actual_tail[3]["type"], serde_json::json!("message"));
assert_eq!(actual_tail[3]["role"], serde_json::json!("user"));
let env_text_2 = &actual_tail[3]["content"][0]["text"];
assert!(
env_text_2
.as_str()
.expect("env text should be string")
.contains("<environment_context>")
);
// U3
assert_eq!(
actual_tail[4],
serde_json::json!({
"type": "message",
"role": "user",
"content": [ { "type": "input_text", "text": "U3" } ]
})
serde_json::Value::Array(actual_tail.to_vec()),
r3_tail_expected,
"request 3 tail mismatch",
);
}

View File

@@ -1,16 +1,12 @@
use codex_core::CodexAuth;
use codex_core::ContentItem;
use codex_core::ConversationManager;
use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::ResponseItem;
use codex_core::built_in_model_providers;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::InputItem;
use codex_core::protocol::Op;
use codex_core::protocol::RolloutItem;
use codex_core::protocol::RolloutLine;
use core_test_support::load_default_config_for_test;
use core_test_support::wait_for_event;
use tempfile::TempDir;
@@ -75,121 +71,84 @@ async fn fork_conversation_twice_drops_to_first_message() {
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
}
// Request history from the base conversation to obtain rollout path.
codex.submit(Op::GetPath).await.unwrap();
// Request history from the base conversation.
codex.submit(Op::GetHistory).await.unwrap();
let base_history =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await;
let base_path = match &base_history {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await;
// Capture entries from the base history and compute expected prefixes after each fork.
let entries_after_three = match &base_history {
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
entries.clone()
}
_ => panic!("expected ConversationHistory event"),
};
// History layout for this test:
// [0] user instructions,
// [1] environment context,
// [2] "first" user message,
// [3] "second" user message,
// [4] "third" user message.
// GetHistory flushes before returning the path; no wait needed.
// Fork 1: drops the last user message and everything after.
let expected_after_first = vec![
entries_after_three[0].clone(),
entries_after_three[1].clone(),
entries_after_three[2].clone(),
entries_after_three[3].clone(),
];
// Helper: read rollout items (excluding SessionMeta) from a JSONL path.
let read_items = |p: &std::path::Path| -> Vec<RolloutItem> {
let text = std::fs::read_to_string(p).expect("read rollout file");
let mut items: Vec<RolloutItem> = Vec::new();
for line in text.lines() {
if line.trim().is_empty() {
continue;
}
let v: serde_json::Value = serde_json::from_str(line).expect("jsonl line");
let rl: RolloutLine = serde_json::from_value(v).expect("rollout line");
match rl.item {
RolloutItem::SessionMeta(_) => {}
other => items.push(other),
}
}
items
};
// Fork 2: drops the last user message and everything after.
// [0] user instructions,
// [1] environment context,
// [2] "first" user message,
let expected_after_second = vec![
entries_after_three[0].clone(),
entries_after_three[1].clone(),
entries_after_three[2].clone(),
];
// Compute expected prefixes after each fork by truncating base rollout at nth-from-last user input.
let base_items = read_items(&base_path);
let find_user_input_positions = |items: &[RolloutItem]| -> Vec<usize> {
let mut pos = Vec::new();
for (i, it) in items.iter().enumerate() {
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = it
&& role == "user"
{
// Consider any user message as an input boundary; recorder stores both EventMsg and ResponseItem.
// We specifically look for input items, which are represented as ContentItem::InputText.
if content
.iter()
.any(|c| matches!(c, ContentItem::InputText { .. }))
{
pos.push(i);
}
}
}
pos
};
let user_inputs = find_user_input_positions(&base_items);
// After dropping last user input (n=1), cut strictly before that input if present, else empty.
let cut1 = user_inputs
.get(user_inputs.len().saturating_sub(1))
.copied()
.unwrap_or(0);
let expected_after_first: Vec<RolloutItem> = base_items[..cut1].to_vec();
// After dropping again (n=1 on fork1), compute expected relative to fork1's rollout.
// Fork once with n=1 → drops the last user input and everything after.
// Fork once with n=1 → drops the last user message and everything after.
let NewConversation {
conversation: codex_fork1,
..
} = conversation_manager
.fork_conversation(1, config_for_fork.clone(), base_path.clone())
.fork_conversation(entries_after_three.clone(), 1, config_for_fork.clone())
.await
.expect("fork 1");
codex_fork1.submit(Op::GetPath).await.unwrap();
codex_fork1.submit(Op::GetHistory).await.unwrap();
let fork1_history = wait_for_event(&codex_fork1, |ev| {
matches!(ev, EventMsg::ConversationPath(_))
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
let fork1_path = match &fork1_history {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
let entries_after_first_fork = match &fork1_history {
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
assert!(matches!(
fork1_history,
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_first
));
entries.clone()
}
_ => panic!("expected ConversationHistory event after first fork"),
};
// GetHistory on fork1 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
pretty_assertions::assert_eq!(
serde_json::to_value(&fork1_items).unwrap(),
serde_json::to_value(&expected_after_first).unwrap()
);
// Fork again with n=1 → drops the (new) last user message, leaving only the first.
let NewConversation {
conversation: codex_fork2,
..
} = conversation_manager
.fork_conversation(1, config_for_fork.clone(), fork1_path.clone())
.fork_conversation(entries_after_first_fork.clone(), 1, config_for_fork.clone())
.await
.expect("fork 2");
codex_fork2.submit(Op::GetPath).await.unwrap();
codex_fork2.submit(Op::GetHistory).await.unwrap();
let fork2_history = wait_for_event(&codex_fork2, |ev| {
matches!(ev, EventMsg::ConversationPath(_))
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
let fork2_path = match &fork2_history {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
_ => panic!("expected ConversationHistory event after second fork"),
};
// GetHistory on fork2 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_user_inputs = find_user_input_positions(&fork1_items);
let cut_last_on_fork1 = fork1_user_inputs
.get(fork1_user_inputs.len().saturating_sub(1))
.copied()
.unwrap_or(0);
let expected_after_second: Vec<RolloutItem> = fork1_items[..cut_last_on_fork1].to_vec();
let fork2_items = read_items(&fork2_path);
pretty_assertions::assert_eq!(
serde_json::to_value(&fork2_items).unwrap(),
serde_json::to_value(&expected_after_second).unwrap()
);
assert!(matches!(
fork2_history,
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_second
));
}

View File

@@ -191,8 +191,7 @@ async fn prompt_tools_are_consistent_across_requests() {
let expected_instructions: &str = include_str!("../../prompt.md");
// our internal implementation is responsible for keeping tools in sync
// with the OpenAI schema, so we just verify the tool presence here
let expected_tools_names: &[&str] =
&["unified_exec", "update_plan", "apply_patch", "view_image"];
let expected_tools_names: &[&str] = &["shell", "update_plan", "apply_patch", "view_image"];
let body0 = requests[0].body_json::<serde_json::Value>().unwrap();
assert_eq!(
body0["instructions"],
@@ -272,7 +271,7 @@ async fn prefixes_context_and_instructions_once_and_consistently_across_requests
let shell = default_user_shell().await;
let expected_env_text_init = format!(
let expected_env_text = format!(
r#"<environment_context>
<cwd>{}</cwd>
<approval_policy>on-request</approval_policy>
@@ -285,28 +284,13 @@ async fn prefixes_context_and_instructions_once_and_consistently_across_requests
None => String::new(),
}
);
// Per-turn environment context omits the shell tag.
let expected_env_text_turn = format!(
r#"<environment_context>
<cwd>{}</cwd>
<approval_policy>on-request</approval_policy>
<sandbox_mode>read-only</sandbox_mode>
<network_access>restricted</network_access>
</environment_context>"#,
cwd.path().to_string_lossy(),
);
let expected_ui_text =
"<user_instructions>\n\nbe consistent and helpful\n\n</user_instructions>";
let expected_env_msg_init = serde_json::json!({
let expected_env_msg = serde_json::json!({
"type": "message",
"role": "user",
"content": [ { "type": "input_text", "text": expected_env_text_init } ]
});
let expected_env_msg_turn = serde_json::json!({
"type": "message",
"role": "user",
"content": [ { "type": "input_text", "text": expected_env_text_turn } ]
"content": [ { "type": "input_text", "text": expected_env_text } ]
});
let expected_ui_msg = serde_json::json!({
"type": "message",
@@ -322,12 +306,7 @@ async fn prefixes_context_and_instructions_once_and_consistently_across_requests
let body1 = requests[0].body_json::<serde_json::Value>().unwrap();
assert_eq!(
body1["input"],
serde_json::json!([
expected_ui_msg,
expected_env_msg_init,
expected_env_msg_turn,
expected_user_message_1
])
serde_json::json!([expected_ui_msg, expected_env_msg, expected_user_message_1])
);
let expected_user_message_2 = serde_json::json!({
@@ -339,7 +318,7 @@ async fn prefixes_context_and_instructions_once_and_consistently_across_requests
let expected_body2 = serde_json::json!(
[
body1["input"].as_array().unwrap().as_slice(),
[expected_env_msg_turn, expected_user_message_2].as_slice(),
[expected_user_message_2].as_slice(),
]
.concat()
);
@@ -447,17 +426,11 @@ async fn overrides_turn_context_but_keeps_cached_prefix_and_key_constant() {
// After overriding the turn context, the environment context should be emitted again
// reflecting the new approval policy and sandbox settings. Omit cwd because it did
// not change.
let expected_env_text_2 = format!(
r#"<environment_context>
let expected_env_text_2 = r#"<environment_context>
<approval_policy>never</approval_policy>
<sandbox_mode>workspace-write</sandbox_mode>
<network_access>enabled</network_access>
<writable_roots>
<root>{}</root>
</writable_roots>
</environment_context>"#,
writable.path().to_string_lossy()
);
</environment_context>"#;
let expected_env_msg_2 = serde_json::json!({
"type": "message",
"role": "user",
@@ -567,24 +540,10 @@ async fn per_turn_overrides_keep_cached_prefix_and_key_constant() {
"role": "user",
"content": [ { "type": "input_text", "text": "hello 2" } ]
});
let expected_env_text_2 = format!(
r#"<environment_context>
<cwd>{}</cwd>
<approval_policy>never</approval_policy>
<sandbox_mode>workspace-write</sandbox_mode>
<network_access>enabled</network_access>
</environment_context>"#,
new_cwd.path().to_string_lossy()
);
let expected_env_msg_2 = serde_json::json!({
"type": "message",
"role": "user",
"content": [ { "type": "input_text", "text": expected_env_text_2 } ]
});
let expected_body2 = serde_json::json!(
[
body1["input"].as_array().unwrap().as_slice(),
[expected_env_msg_2, expected_user_message_2].as_slice(),
[expected_user_message_2].as_slice(),
]
.concat()
);

View File

@@ -559,7 +559,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
},
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
EventMsg::ConversationPath(_) => {}
EventMsg::ConversationHistory(_) => {}
EventMsg::UserMessage(_) => {}
}
CodexStatus::Running

View File

@@ -17,10 +17,10 @@ use anyhow::Context;
use anyhow::Result;
use codex_mcp_client::McpClient;
use mcp_types::ClientCapabilities;
use mcp_types::Implementation;
use mcp_types::InitializeRequestParams;
use mcp_types::ListToolsRequestParams;
use mcp_types::MCP_SCHEMA_VERSION;
use mcp_types::McpClientInfo;
use tracing_subscriber::EnvFilter;
#[tokio::main]
@@ -60,13 +60,10 @@ async fn main() -> Result<()> {
sampling: None,
elicitation: None,
},
client_info: Implementation {
client_info: McpClientInfo {
name: "codex-mcp-client".to_owned(),
version: env!("CARGO_PKG_VERSION").to_owned(),
title: Some("Codex".to_string()),
// This field is used by Codex when it is an MCP server: it should
// not be used when Codex is an MCP client.
user_agent: None,
},
protocol_version: MCP_SCHEMA_VERSION.to_owned(),
};

View File

@@ -40,7 +40,6 @@ uuid = { version = "1", features = ["serde", "v4"] }
[dev-dependencies]
assert_cmd = "2"
base64 = "0.22"
mcp_test_support = { path = "tests/common" }
os_info = "3.12.0"
pretty_assertions = "1.4.1"

View File

@@ -11,8 +11,6 @@ use codex_core::NewConversation;
use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
use codex_core::auth::CLIENT_ID;
use codex_core::auth::get_auth_file;
use codex_core::auth::try_read_auth_json;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::config::ConfigToml;
@@ -69,7 +67,6 @@ use codex_protocol::mcp_protocol::SendUserMessageResponse;
use codex_protocol::mcp_protocol::SendUserTurnParams;
use codex_protocol::mcp_protocol::SendUserTurnResponse;
use codex_protocol::mcp_protocol::ServerNotification;
use codex_protocol::mcp_protocol::UserInfoResponse;
use codex_protocol::mcp_protocol::UserSavedConfig;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
@@ -190,9 +187,6 @@ impl CodexMessageProcessor {
ClientRequest::GetUserAgent { request_id } => {
self.get_user_agent(request_id).await;
}
ClientRequest::UserInfo { request_id } => {
self.get_user_info(request_id).await;
}
ClientRequest::ExecOneOffCommand { request_id, params } => {
self.exec_one_off_command(request_id, params).await;
}
@@ -445,18 +439,6 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn get_user_info(&self, request_id: RequestId) {
// Read alleged user email from auth.json (best-effort; not verified).
let auth_path = get_auth_file(&self.config.codex_home);
let alleged_user_email = match try_read_auth_json(&auth_path) {
Ok(auth) => auth.tokens.and_then(|t| t.id_token.email),
Err(_) => None,
};
let response = UserInfoResponse { alleged_user_email };
self.outgoing.send_response(request_id, response).await;
}
async fn exec_one_off_command(&self, request_id: RequestId, params: ExecOneOffCommandParams) {
tracing::debug!("ExecOneOffCommand params: {params:?}");

View File

@@ -277,7 +277,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::PlanUpdate(_)
| EventMsg::TurnAborted(_)
| EventMsg::ConversationPath(_)
| EventMsg::ConversationHistory(_)
| EventMsg::UserMessage(_)
| EventMsg::ShutdownComplete => {
// For now, we do not do anything extra for these

View File

@@ -234,7 +234,7 @@ impl MessageProcessor {
},
instructions: None,
protocol_version: params.protocol_version.clone(),
server_info: mcp_types::Implementation {
server_info: mcp_types::McpServerInfo {
name: "codex-mcp-server".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
title: Some("Codex".to_string()),

View File

@@ -26,13 +26,13 @@ use codex_protocol::mcp_protocol::SendUserTurnParams;
use mcp_types::CallToolRequestParams;
use mcp_types::ClientCapabilities;
use mcp_types::Implementation;
use mcp_types::InitializeRequestParams;
use mcp_types::JSONRPC_VERSION;
use mcp_types::JSONRPCMessage;
use mcp_types::JSONRPCNotification;
use mcp_types::JSONRPCRequest;
use mcp_types::JSONRPCResponse;
use mcp_types::McpClientInfo;
use mcp_types::ModelContextProtocolNotification;
use mcp_types::ModelContextProtocolRequest;
use mcp_types::RequestId;
@@ -134,11 +134,10 @@ impl McpProcess {
roots: None,
sampling: None,
},
client_info: Implementation {
client_info: McpClientInfo {
name: "elicitation test".into(),
title: Some("Elicitation Test".into()),
version: "0.0.0".into(),
user_agent: None,
},
protocol_version: mcp_types::MCP_SCHEMA_VERSION.into(),
};
@@ -295,11 +294,6 @@ impl McpProcess {
self.send_request("getUserAgent", None).await
}
/// Send a `userInfo` JSON-RPC request.
pub async fn send_user_info_request(&mut self) -> anyhow::Result<i64> {
self.send_request("userInfo", None).await
}
/// Send a `listConversations` JSON-RPC request.
pub async fn send_list_conversations_request(
&mut self,

View File

@@ -10,4 +10,3 @@ mod list_resume;
mod login;
mod send_message;
mod user_agent;
mod user_info;

View File

@@ -1,78 +0,0 @@
use std::time::Duration;
use anyhow::Context;
use base64::Engine;
use base64::engine::general_purpose::URL_SAFE_NO_PAD;
use codex_core::auth::AuthDotJson;
use codex_core::auth::get_auth_file;
use codex_core::auth::write_auth_json;
use codex_core::token_data::IdTokenInfo;
use codex_core::token_data::TokenData;
use codex_protocol::mcp_protocol::UserInfoResponse;
use mcp_test_support::McpProcess;
use mcp_test_support::to_response;
use mcp_types::JSONRPCResponse;
use mcp_types::RequestId;
use pretty_assertions::assert_eq;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(10);
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_info_returns_email_from_auth_json() {
let codex_home = TempDir::new().expect("create tempdir");
let auth_path = get_auth_file(codex_home.path());
let mut id_token = IdTokenInfo::default();
id_token.email = Some("user@example.com".to_string());
id_token.raw_jwt = encode_id_token_with_email("user@example.com").expect("encode id token");
let auth = AuthDotJson {
openai_api_key: None,
tokens: Some(TokenData {
id_token,
access_token: "access".to_string(),
refresh_token: "refresh".to_string(),
account_id: None,
}),
last_refresh: None,
};
write_auth_json(&auth_path, &auth).expect("write auth.json");
let mut mcp = McpProcess::new(codex_home.path())
.await
.expect("spawn mcp process");
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
.await
.expect("initialize timeout")
.expect("initialize request");
let request_id = mcp.send_user_info_request().await.expect("send userInfo");
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await
.expect("userInfo timeout")
.expect("userInfo response");
let received: UserInfoResponse = to_response(response).expect("deserialize userInfo response");
let expected = UserInfoResponse {
alleged_user_email: Some("user@example.com".to_string()),
};
assert_eq!(received, expected);
}
fn encode_id_token_with_email(email: &str) -> anyhow::Result<String> {
let header_b64 = URL_SAFE_NO_PAD.encode(
serde_json::to_vec(&json!({ "alg": "none", "typ": "JWT" }))
.context("serialize jwt header")?,
);
let payload =
serde_json::to_vec(&json!({ "email": email })).context("serialize jwt payload")?;
let payload_b64 = URL_SAFE_NO_PAD.encode(payload);
Ok(format!("{header_b64}.{payload_b64}.signature"))
}

View File

@@ -265,11 +265,8 @@ class StructField:
name: str
type_name: str
serde: str | None = None
comment: str | None = None
def append(self, out: list[str], supports_const: bool) -> None:
if self.comment:
out.append(f" // {self.comment}\n")
if self.serde:
out.append(f" {self.serde}\n")
if self.viz == "const":
@@ -315,18 +312,6 @@ def define_struct(
else:
fields.append(StructField("pub", rs_prop.name, prop_type, rs_prop.serde))
# Special-case: add Codex-specific user_agent to Implementation
if name == "Implementation":
fields.append(
StructField(
"pub",
"user_agent",
"Option<String>",
'#[serde(default, skip_serializing_if = "Option::is_none")]',
"This is an extra field that the Codex MCP server sends as part of InitializeResult.",
)
)
if implements_request_trait(name):
add_trait_impl(name, "ModelContextProtocolRequest", fields, out)
elif implements_notification_trait(name):

View File

@@ -482,12 +482,20 @@ pub struct ImageContent {
/// Describes the name and version of an MCP implementation, with an optional title for UI representation.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, TS)]
pub struct Implementation {
pub struct McpClientInfo {
pub name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
pub version: String,
}
/// Describes the name and version of an MCP implementation, with an optional title for UI representation.
#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, TS)]
pub struct McpServerInfo {
pub name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub title: Option<String>,
pub version: String,
// This is an extra field that the Codex MCP server sends as part of InitializeResult.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub user_agent: Option<String>,
}
@@ -505,7 +513,7 @@ impl ModelContextProtocolRequest for InitializeRequest {
pub struct InitializeRequestParams {
pub capabilities: ClientCapabilities,
#[serde(rename = "clientInfo")]
pub client_info: Implementation,
pub client_info: McpClientInfo,
#[serde(rename = "protocolVersion")]
pub protocol_version: String,
}
@@ -519,7 +527,7 @@ pub struct InitializeResult {
#[serde(rename = "protocolVersion")]
pub protocol_version: String,
#[serde(rename = "serverInfo")]
pub server_info: Implementation,
pub server_info: McpServerInfo,
}
impl From<InitializeResult> for serde_json::Value {

View File

@@ -1,10 +1,10 @@
use mcp_types::ClientCapabilities;
use mcp_types::ClientRequest;
use mcp_types::Implementation;
use mcp_types::InitializeRequestParams;
use mcp_types::JSONRPC_VERSION;
use mcp_types::JSONRPCMessage;
use mcp_types::JSONRPCRequest;
use mcp_types::McpClientInfo;
use mcp_types::RequestId;
use serde_json::json;
@@ -58,11 +58,10 @@ fn deserialize_initialize_request() {
sampling: None,
elicitation: None,
},
client_info: Implementation {
client_info: McpClientInfo {
name: "acme-client".into(),
title: Some("Acme".to_string()),
version: "1.2.3".into(),
user_agent: None,
},
protocol_version: "2025-06-18".into(),
}

View File

@@ -39,7 +39,6 @@ pub fn generate_ts(out_dir: &Path, prettier: Option<&Path>) -> Result<()> {
codex_protocol::mcp_protocol::ExecCommandApprovalResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::GetUserSavedConfigResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::GetUserAgentResponse::export_all_to(out_dir)?;
codex_protocol::mcp_protocol::UserInfoResponse::export_all_to(out_dir)?;
// All notification types reachable from this enum will be generated by
// induction, so they do not need to be listed individually.

View File

@@ -152,10 +152,6 @@ pub enum ClientRequest {
#[serde(rename = "id")]
request_id: RequestId,
},
UserInfo {
#[serde(rename = "id")]
request_id: RequestId,
},
/// Execute a command (argv vector) under the server's sandbox.
ExecOneOffCommand {
#[serde(rename = "id")]
@@ -378,16 +374,6 @@ pub struct GetUserAgentResponse {
pub user_agent: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct UserInfoResponse {
/// Note: `alleged_user_email` is not currently verified. We read it from
/// the local auth.json, which the user could theoretically modify. In the
/// future, we may add logic to verify the email against the server before
/// returning it.
pub alleged_user_email: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct GetUserSavedConfigResponse {

View File

@@ -115,6 +115,7 @@ pub enum ResponseItem {
status: Option<String>,
action: WebSearchAction,
},
#[serde(other)]
Other,
}

View File

@@ -149,7 +149,7 @@ pub enum Op {
/// Request the full in-memory conversation transcript for the current session.
/// Reply is delivered via `EventMsg::ConversationHistory`.
GetPath,
GetHistory,
/// Request the list of MCP tools available across all configured servers.
/// Reply is delivered via `EventMsg::McpListToolsResponse`.
@@ -499,7 +499,7 @@ pub enum EventMsg {
/// Notification that the agent is shutting down.
ShutdownComplete,
ConversationPath(ConversationPathResponseEvent),
ConversationHistory(ConversationHistoryResponseEvent),
}
// Individual event payload types matching each `EventMsg` variant.
@@ -801,9 +801,9 @@ pub struct WebSearchEndEvent {
/// Response payload for `Op::GetHistory` containing the current session's
/// in-memory transcript.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
pub struct ConversationPathResponseEvent {
pub struct ConversationHistoryResponseEvent {
pub conversation_id: ConversationId,
pub path: PathBuf,
pub entries: Vec<ResponseItem>,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]

View File

@@ -1,11 +1,9 @@
use std::path::PathBuf;
use crate::app::App;
use crate::backtrack_helpers;
use crate::pager_overlay::Overlay;
use crate::tui;
use crate::tui::TuiEvent;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_protocol::mcp_protocol::ConversationId;
use color_eyre::eyre::Result;
use crossterm::event::KeyCode;
@@ -100,7 +98,7 @@ impl App {
) {
self.backtrack.pending = Some((base_id, drop_last_messages, prefill));
self.app_event_tx.send(crate::app_event::AppEvent::CodexOp(
codex_core::protocol::Op::GetPath,
codex_core::protocol::Op::GetHistory,
));
}
@@ -267,7 +265,7 @@ impl App {
pub(crate) async fn on_conversation_history_for_backtrack(
&mut self,
tui: &mut tui::Tui,
ev: ConversationPathResponseEvent,
ev: ConversationHistoryResponseEvent,
) -> Result<()> {
if let Some((base_id, _, _)) = self.backtrack.pending.as_ref()
&& ev.conversation_id == *base_id
@@ -283,14 +281,14 @@ impl App {
async fn fork_and_switch_to_new_conversation(
&mut self,
tui: &mut tui::Tui,
ev: ConversationPathResponseEvent,
ev: ConversationHistoryResponseEvent,
drop_count: usize,
prefill: String,
) {
let cfg = self.chat_widget.config_ref().clone();
// Perform the fork via a thin wrapper for clarity/testability.
let result = self
.perform_fork(ev.path.clone(), drop_count, cfg.clone())
.perform_fork(ev.entries.clone(), drop_count, cfg.clone())
.await;
match result {
Ok(new_conv) => {
@@ -303,11 +301,13 @@ impl App {
/// Thin wrapper around ConversationManager::fork_conversation.
async fn perform_fork(
&self,
path: PathBuf,
entries: Vec<codex_protocol::models::ResponseItem>,
drop_count: usize,
cfg: codex_core::config::Config,
) -> codex_core::error::Result<codex_core::NewConversation> {
self.server.fork_conversation(drop_count, cfg, path).await
self.server
.fork_conversation(entries, drop_count, cfg)
.await
}
/// Install a forked conversation into the ChatWidget and update UI to reflect selection.

View File

@@ -1,4 +1,4 @@
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ConversationHistoryResponseEvent;
use codex_core::protocol::Event;
use codex_file_search::FileMatch;
@@ -58,5 +58,5 @@ pub(crate) enum AppEvent {
UpdateSandboxPolicy(SandboxPolicy),
/// Forwarded conversation history snapshot from the current conversation.
ConversationHistory(ConversationPathResponseEvent),
ConversationHistory(ConversationHistoryResponseEvent),
}

View File

@@ -1083,7 +1083,7 @@ impl ChatWidget {
self.on_user_message_event(ev);
}
}
EventMsg::ConversationPath(ev) => {
EventMsg::ConversationHistory(ev) => {
self.app_event_tx
.send(crate::app_event::AppEvent::ConversationHistory(ev));
}