Compare commits

...

1 Commits

Author SHA1 Message Date
Dylan Hurd
a772c86cd8 feat(core) python_tool 2026-02-07 11:31:16 -05:00
10 changed files with 848 additions and 1 deletions

7
codex-rs/Cargo.lock generated
View File

@@ -1567,6 +1567,7 @@ dependencies = [
"landlock",
"libc",
"maplit",
"monty",
"multimap",
"notify",
"once_cell",
@@ -5158,6 +5159,12 @@ dependencies = [
"uuid",
]
[[package]]
name = "monty"
version = "0.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07773618cf354db873984814969d052a0072b1f99e88ca29850259272a8f7ddd"
[[package]]
name = "moxcms"
version = "0.7.11"

View File

@@ -109,6 +109,7 @@ uuid = { workspace = true, features = ["serde", "v4", "v5"] }
which = { workspace = true }
wildmatch = { workspace = true }
zip = { workspace = true }
monty = "0.0.0"
[features]
deterministic_process_ids = []

View File

@@ -232,6 +232,8 @@ use codex_utils_readiness::Readiness;
use codex_utils_readiness::ReadinessFlag;
use tokio::sync::watch;
/// Developer-message text for the `python` tool, embedded at compile time from
/// `python_tool_developer_message.md`.
const PYTHON_TOOL_DEVELOPER_INSTRUCTIONS: &str = include_str!("python_tool_developer_message.md");
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
pub struct Codex {
@@ -2129,6 +2131,9 @@ impl Session {
)
.into(),
);
if self.features.enabled(Feature::PythonTool) {
items.push(DeveloperInstructions::new(PYTHON_TOOL_DEVELOPER_INSTRUCTIONS).into());
}
if let Some(developer_instructions) = turn_context.developer_instructions.as_deref() {
items.push(DeveloperInstructions::new(developer_instructions.to_string()).into());
}

View File

@@ -105,6 +105,8 @@ pub enum Feature {
Sqlite,
/// Enable the get_memory tool backed by SQLite thread memories.
MemoryTool,
/// Enable the dedicated Python execution tool.
PythonTool,
/// Append additional AGENTS.md guidance to user instructions.
ChildAgentsMd,
/// Enforce UTF8 output in Powershell.
@@ -453,6 +455,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::PythonTool,
key: "python_tool",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::ChildAgentsMd,
key: "child_agents_md",

View File

@@ -0,0 +1,11 @@
Python tool mode is enabled.
Use the `python` function tool for code execution instead of shell tools.
Tool interface:
- required: `code` (Python source string)
- optional: `args` (argv values), `workdir`, `timeout_ms` (or `timeout`), `python` (executable path), `sandbox_permissions`, `justification`, `prefix_rule`
Behavior:
- runs as `<python-or-python3> -c <code> [args...]`
- returns combined stdout/stderr plus metadata in the tool output

View File

@@ -7,6 +7,7 @@ mod list_dir;
mod mcp;
mod mcp_resource;
mod plan;
mod python;
mod read_file;
mod request_user_input;
mod shell;
@@ -27,6 +28,7 @@ pub use list_dir::ListDirHandler;
pub use mcp::McpHandler;
pub use mcp_resource::McpResourceHandler;
pub use plan::PlanHandler;
pub use python::PythonHandler;
pub use read_file::ReadFileHandler;
pub use request_user_input::RequestUserInputHandler;
pub(crate) use request_user_input::request_user_input_tool_description;

View File

@@ -0,0 +1,173 @@
use async_trait::async_trait;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::SandboxPermissions;
use serde::Deserialize;
use crate::exec_env::create_env;
use crate::exec_policy::ExecApprovalRequest;
use crate::function_tool::FunctionCallError;
use crate::protocol::ExecCommandSource;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolOutput;
use crate::tools::context::ToolPayload;
use crate::tools::events::ToolEmitter;
use crate::tools::events::ToolEventCtx;
use crate::tools::handlers::parse_arguments;
use crate::tools::orchestrator::ToolOrchestrator;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use crate::tools::runtimes::shell::ShellRequest;
use crate::tools::runtimes::shell::ShellRuntime;
use crate::tools::sandboxing::ToolCtx;
/// Handler for the `python` function tool: runs model-supplied Python source as a
/// `<interpreter> -c <code> [args...]` command through the shell runtime.
pub struct PythonHandler;
/// Arguments of a `python` tool call, deserialized from the function-call JSON payload.
///
/// Only `code` is required; every other field may be omitted by the caller.
#[derive(Debug, Deserialize)]
struct PythonToolCallParams {
    /// Python source handed to the interpreter after `-c`.
    code: String,

    /// Extra command-line values appended after the source; empty when omitted.
    #[serde(default)]
    args: Vec<String>,

    /// Working directory for the command, resolved against the turn by the handler.
    #[serde(default)]
    workdir: Option<String>,

    /// Timeout forwarded to the shell request; also accepted under the key `timeout`.
    #[serde(default, alias = "timeout")]
    timeout_ms: Option<u64>,

    /// Interpreter executable to run; the handler falls back to `python3` when absent.
    #[serde(default)]
    python: Option<String>,

    /// Sandbox permissions requested for this call; the type's default is used when absent.
    #[serde(default)]
    sandbox_permissions: Option<SandboxPermissions>,

    /// Proposed command prefix rule; the handler drops it unless `Feature::RequestRule` is on.
    #[serde(default)]
    prefix_rule: Option<Vec<String>>,

    /// Free-form justification forwarded unchanged in the shell request.
    #[serde(default)]
    justification: Option<String>,
}
#[async_trait]
impl ToolHandler for PythonHandler {
    /// The `python` tool is a function tool.
    fn kind(&self) -> ToolKind {
        ToolKind::Function
    }

    /// Accepts only function-call payloads.
    fn matches_kind(&self, payload: &ToolPayload) -> bool {
        matches!(payload, ToolPayload::Function { .. })
    }

    /// Every invocation is reported as mutating, regardless of the code being run.
    async fn is_mutating(&self, _invocation: &ToolInvocation) -> bool {
        true
    }

    /// Executes a `python` tool call.
    ///
    /// Parses the call arguments, builds `<interpreter> -c <code> [args...]`, and runs it
    /// through `ToolOrchestrator` with a `ShellRuntime`, emitting begin/finish tool events
    /// around the execution.
    ///
    /// # Errors
    ///
    /// Returns `FunctionCallError::RespondToModel` when the payload is not a function call,
    /// or when escalated sandbox permissions are requested while the approval policy is
    /// anything other than `OnRequest`. Errors from argument parsing and from
    /// `emitter.finish` are propagated unchanged.
    async fn handle(&self, invocation: ToolInvocation) -> Result<ToolOutput, FunctionCallError> {
        let ToolInvocation {
            session,
            turn,
            call_id,
            tool_name,
            payload,
            ..
        } = invocation;

        let ToolPayload::Function { arguments } = payload else {
            return Err(FunctionCallError::RespondToModel(format!(
                "unsupported payload for python handler: {tool_name}"
            )));
        };

        let params: PythonToolCallParams = parse_arguments(&arguments)?;
        let PythonToolCallParams {
            code,
            args,
            workdir,
            timeout_ms,
            python,
            sandbox_permissions,
            prefix_rule,
            justification,
        } = params;

        // A blank `python` value is treated as "not provided": spawning an empty program
        // name can only fail, so fall back to the default interpreter instead.
        let interpreter = python
            .filter(|path| !path.trim().is_empty())
            .unwrap_or_else(|| "python3".to_string());
        let mut command = vec![interpreter, "-c".to_string(), code];
        command.extend(args);

        // Reject explicit escalation requests up front unless the policy is `OnRequest`.
        let sandbox_permissions = sandbox_permissions.unwrap_or_default();
        if sandbox_permissions.requires_escalated_permissions()
            && !matches!(
                turn.approval_policy,
                codex_protocol::protocol::AskForApproval::OnRequest
            )
        {
            let approval_policy = turn.approval_policy;
            return Err(FunctionCallError::RespondToModel(format!(
                "approval policy is {approval_policy:?}; reject command — you should not ask for escalated permissions if the approval policy is {approval_policy:?}"
            )));
        }

        // A caller-supplied prefix rule is only honoured when the feature is enabled.
        let features = session.features();
        let request_rule_enabled = features.enabled(crate::features::Feature::RequestRule);
        let prefix_rule = if request_rule_enabled {
            prefix_rule
        } else {
            None
        };

        let cwd = turn.resolve_path(workdir);

        // Environment from the turn's shell policy, extended with the session's
        // dependency environment when it has any entries.
        let mut env = create_env(
            &turn.shell_environment_policy,
            Some(session.conversation_id),
        );
        let dependency_env = session.dependency_env().await;
        if !dependency_env.is_empty() {
            env.extend(dependency_env);
        }

        let emitter = ToolEmitter::shell(
            command.clone(),
            cwd.clone(),
            ExecCommandSource::Agent,
            false,
        );
        let event_ctx = ToolEventCtx::new(session.as_ref(), turn.as_ref(), &call_id, None);
        emitter.begin(event_ctx).await;

        let exec_approval_requirement = session
            .services
            .exec_policy
            .create_exec_approval_requirement_for_command(ExecApprovalRequest {
                command: &command,
                approval_policy: turn.approval_policy,
                sandbox_policy: &turn.sandbox_policy,
                sandbox_permissions,
                prefix_rule,
            })
            .await;

        let req = ShellRequest {
            command,
            cwd,
            timeout_ms,
            env,
            sandbox_permissions,
            justification,
            exec_approval_requirement,
        };

        let mut orchestrator = ToolOrchestrator::new();
        let mut runtime = ShellRuntime::new();
        let tool_ctx = ToolCtx {
            session: session.as_ref(),
            turn: turn.as_ref(),
            call_id: call_id.clone(),
            tool_name,
        };
        let out = orchestrator
            .run(&mut runtime, &req, &tool_ctx, &turn, turn.approval_policy)
            .await;

        // `finish` turns the execution result into the text returned to the model.
        let event_ctx = ToolEventCtx::new(session.as_ref(), turn.as_ref(), &call_id, None);
        let content = emitter.finish(event_ctx, out).await?;
        Ok(ToolOutput::Function {
            body: FunctionCallOutputBody::Text(content),
            success: Some(true),
        })
    }
}

View File

@@ -29,6 +29,7 @@ pub(crate) struct ToolsConfig {
pub shell_type: ConfigShellToolType,
pub apply_patch_tool_type: Option<ApplyPatchToolType>,
pub web_search_mode: Option<WebSearchMode>,
pub python_tool: bool,
pub collab_tools: bool,
pub collaboration_modes_tools: bool,
pub memory_tools: bool,
@@ -50,12 +51,13 @@ impl ToolsConfig {
web_search_mode,
} = params;
let include_apply_patch_tool = features.enabled(Feature::ApplyPatchFreeform);
let include_python_tool = features.enabled(Feature::PythonTool);
let include_collab_tools = features.enabled(Feature::Collab);
let include_collaboration_modes_tools = features.enabled(Feature::CollaborationModes);
let include_memory_tools = features.enabled(Feature::MemoryTool);
let request_rule_enabled = features.enabled(Feature::RequestRule);
let shell_type = if !features.enabled(Feature::ShellTool) {
let shell_type = if include_python_tool || !features.enabled(Feature::ShellTool) {
ConfigShellToolType::Disabled
} else if features.enabled(Feature::UnifiedExec) {
// If ConPTY not supported (for old Windows versions), fallback on ShellCommand.
@@ -84,6 +86,7 @@ impl ToolsConfig {
shell_type,
apply_patch_tool_type,
web_search_mode: *web_search_mode,
python_tool: include_python_tool,
collab_tools: include_collab_tools,
collaboration_modes_tools: include_collaboration_modes_tools,
memory_tools: include_memory_tools,
@@ -265,6 +268,64 @@ fn create_exec_command_tool(include_prefix_rule: bool) -> ToolSpec {
})
}
/// Builds the spec for the `python` function tool.
///
/// The schema exposes `code` (the only required property), `args`, `python`, `workdir`
/// and `timeout_ms`, plus whatever `create_approval_parameters(include_prefix_rule)`
/// contributes.
fn create_python_tool(include_prefix_rule: bool) -> ToolSpec {
    // Shorthand for the plain string properties below.
    let string_property = |description: &str| JsonSchema::String {
        description: Some(description.to_string()),
    };

    let mut properties = BTreeMap::new();
    properties.insert(
        "code".to_string(),
        string_property("Python source code to execute with `python3 -c`."),
    );
    properties.insert(
        "args".to_string(),
        JsonSchema::Array {
            items: Box::new(JsonSchema::String { description: None }),
            description: Some(
                "Optional command line arguments passed to the script as `sys.argv[1:]`."
                    .to_string(),
            ),
        },
    );
    properties.insert(
        "python".to_string(),
        string_property("Optional Python executable path. Defaults to `python3`."),
    );
    properties.insert(
        "workdir".to_string(),
        string_property(
            "Optional working directory to run the command in; defaults to the turn cwd.",
        ),
    );
    properties.insert(
        "timeout_ms".to_string(),
        JsonSchema::Number {
            description: Some(
                "Maximum runtime in milliseconds before the process is terminated.".to_string(),
            ),
        },
    );
    properties.extend(create_approval_parameters(include_prefix_rule));

    let parameters = JsonSchema::Object {
        properties,
        required: Some(vec!["code".to_string()]),
        additional_properties: Some(false.into()),
    };

    ToolSpec::Function(ResponsesApiTool {
        name: "python".to_string(),
        description: "Run a Python snippet in a subprocess and return stdout/stderr.".to_string(),
        strict: false,
        parameters,
    })
}
fn create_write_stdin_tool() -> ToolSpec {
let properties = BTreeMap::from([
(
@@ -1259,6 +1320,7 @@ pub(crate) fn build_specs(
use crate::tools::handlers::McpHandler;
use crate::tools::handlers::McpResourceHandler;
use crate::tools::handlers::PlanHandler;
use crate::tools::handlers::PythonHandler;
use crate::tools::handlers::ReadFileHandler;
use crate::tools::handlers::RequestUserInputHandler;
use crate::tools::handlers::ShellCommandHandler;
@@ -1276,6 +1338,7 @@ pub(crate) fn build_specs(
let apply_patch_handler = Arc::new(ApplyPatchHandler);
let dynamic_tool_handler = Arc::new(DynamicToolHandler);
let get_memory_handler = Arc::new(GetMemoryHandler);
let python_handler = Arc::new(PythonHandler);
let view_image_handler = Arc::new(ViewImageHandler);
let mcp_handler = Arc::new(McpHandler);
let mcp_resource_handler = Arc::new(McpResourceHandler);
@@ -1320,6 +1383,12 @@ pub(crate) fn build_specs(
builder.register_handler("shell_command", shell_command_handler);
}
if config.python_tool {
builder
.push_spec_with_parallel_support(create_python_tool(config.request_rule_enabled), true);
builder.register_handler("python", python_handler);
}
builder.push_spec_with_parallel_support(create_list_mcp_resources_tool(), true);
builder.push_spec_with_parallel_support(create_list_mcp_resource_templates_tool(), true);
builder.push_spec_with_parallel_support(create_read_mcp_resource_tool(), true);

View File

@@ -95,6 +95,7 @@ mod permissions_messages;
mod personality;
mod personality_migration;
mod prompt_caching;
mod python_tool;
mod quota_exceeded;
mod read_file;
mod remote_models;

View File

@@ -0,0 +1,570 @@
#![cfg(not(target_os = "windows"))]
#![allow(clippy::expect_used, clippy::unwrap_used)]
use std::fs;
use anyhow::Context;
use anyhow::Result;
use codex_core::features::Feature;
use codex_core::protocol::AskForApproval;
use codex_core::protocol::SandboxPolicy;
use codex_core::sandboxing::SandboxPermissions;
use core_test_support::assert_regex_match;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use which::which;
/// Returns `true` when `which` cannot locate a `python3` executable, i.e. when tests
/// that actually run Python should be skipped.
fn skip_if_no_python3() -> bool {
    let lookup = which("python3");
    lookup.is_err()
}
/// Extracts one label per entry of the request body's `tools` array.
///
/// Each entry contributes its `name` field, or its `type` field when `name` is absent;
/// entries whose chosen field is not a string are skipped. A missing or non-array
/// `tools` value yields an empty list.
fn tool_names(body: &Value) -> Vec<String> {
    let Some(tools) = body.get("tools").and_then(Value::as_array) else {
        return Vec::new();
    };

    let mut names = Vec::new();
    for tool in tools {
        let label = tool
            .get("name")
            .or_else(|| tool.get("type"))
            .and_then(Value::as_str);
        if let Some(label) = label {
            names.push(label.to_string());
        }
    }
    names
}
/// Parses a tool output string as JSON, attaching context to the error on failure.
fn parse_structured_tool_output(output: &str) -> Result<Value> {
    let parsed = serde_json::from_str::<Value>(output);
    parsed.context("structured tool output should be valid json")
}
/// Submits a single turn with `Feature::PythonTool` enabled or disabled and returns the
/// tool names found in the request captured by the mock server.
async fn collect_tools(enable_python_tool: bool) -> Result<Vec<String>> {
    let server = start_mock_server().await;
    let response = sse(vec![
        ev_response_created("resp-1"),
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-1"),
    ]);
    let mock = mount_sse_once(&server, response).await;

    let mut builder = test_codex().with_config(move |config| {
        if enable_python_tool {
            config.features.enable(Feature::PythonTool);
        } else {
            config.features.disable(Feature::PythonTool);
        }
    });
    let test = builder.build(&server).await?;
    test.submit_turn("list tools").await?;

    let request_body = mock.single_request().body_json();
    let names = tool_names(&request_body);
    Ok(names)
}
/// The `python` tool is advertised only when the feature is enabled, and enabling it
/// removes the shell-style tools from the advertised list.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_spec_toggle_end_to_end() -> Result<()> {
    skip_if_no_network!(Ok(()));

    // Feature off: no `python` entry.
    let tools_disabled = collect_tools(false).await?;
    let python_listed_when_disabled = tools_disabled.iter().any(|name| name == "python");
    assert!(
        !python_listed_when_disabled,
        "tools list should not include python when disabled: {tools_disabled:?}"
    );

    // Feature on: `python` is present...
    let tools_enabled = collect_tools(true).await?;
    let python_listed_when_enabled = tools_enabled.iter().any(|name| name == "python");
    assert!(
        python_listed_when_enabled,
        "tools list should include python when enabled: {tools_enabled:?}"
    );

    // ...and none of the shell tools are.
    let shell_tools = [
        "shell",
        "shell_command",
        "local_shell",
        "exec_command",
        "write_stdin",
    ];
    for shell_tool in shell_tools {
        let shell_tool_listed = tools_enabled.iter().any(|name| name == shell_tool);
        assert!(
            !shell_tool_listed,
            "tools list should not include {shell_tool} when python_tool is enabled: {tools_enabled:?}"
        );
    }

    Ok(())
}
/// With the feature enabled, the request carries a developer message containing the
/// Python tool instructions.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_appends_developer_instructions() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let server = start_mock_server().await;
    let response = sse(vec![
        ev_response_created("resp-1"),
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-1"),
    ]);
    let mock = mount_sse_once(&server, response).await;

    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::PythonTool);
    });
    let test = builder.build(&server).await?;
    test.submit_turn("hello").await?;

    let request = mock.single_request();
    let developer_messages = request.message_input_texts("developer");
    let has_python_instructions = developer_messages
        .iter()
        .any(|msg| msg.contains("Python tool mode is enabled."));
    assert!(
        has_python_instructions,
        "expected python tool developer instructions in prompt, got: {developer_messages:?}"
    );

    Ok(())
}
/// A trivial `print` call exits with code 0 and its text appears in the structured output.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_executes_code_and_returns_structured_output() -> Result<()> {
    skip_if_no_network!(Ok(()));
    if skip_if_no_python3() {
        return Ok(());
    }

    let server = start_mock_server().await;
    let call_id = "python-basic";
    let args = json!({"code": "print('hi from python')"});
    let serialized_args = serde_json::to_string(&args)?;

    // First response issues the tool call; the second closes the turn.
    let tool_call_response = sse(vec![
        ev_response_created("resp-1"),
        ev_function_call(call_id, "python", &serialized_args),
        ev_completed("resp-1"),
    ]);
    let final_response = sse(vec![
        ev_response_created("resp-2"),
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    let mock = mount_sse_sequence(&server, vec![tool_call_response, final_response]).await;

    let mut builder = test_codex().with_model("gpt-5").with_config(|config| {
        config.features.enable(Feature::PythonTool);
    });
    let test = builder.build(&server).await?;
    test.submit_turn("run python").await?;

    let output = mock
        .function_call_output_text(call_id)
        .context("python output present")?;
    let output_json = parse_structured_tool_output(&output)?;

    let exit_code = output_json["metadata"]["exit_code"].as_i64();
    assert_eq!(exit_code, Some(0));
    let stdout = output_json["output"].as_str().unwrap_or_default();
    assert_regex_match(r"(?s)^hi from python\n?$", stdout);

    Ok(())
}
/// Calling `python` while the feature is disabled yields an "unsupported call" output.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_call_fails_when_feature_is_disabled() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let server = start_mock_server().await;
    let call_id = "python-disabled";
    let args = json!({"code": "print('should not run')"});
    let serialized_args = serde_json::to_string(&args)?;

    let tool_call_response = sse(vec![
        ev_response_created("resp-1"),
        ev_function_call(call_id, "python", &serialized_args),
        ev_completed("resp-1"),
    ]);
    let final_response = sse(vec![
        ev_response_created("resp-2"),
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    let mock = mount_sse_sequence(&server, vec![tool_call_response, final_response]).await;

    let mut builder = test_codex().with_config(|config| {
        config.features.disable(Feature::PythonTool);
    });
    let test = builder.build(&server).await?;
    test.submit_turn("run python while disabled").await?;

    let output = mock
        .function_call_output_text(call_id)
        .context("python output present")?;
    assert_eq!(output, "unsupported call: python");

    Ok(())
}
/// Values supplied in `args` reach the script as `sys.argv[1:]`, in order.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_passes_args_to_script() -> Result<()> {
    skip_if_no_network!(Ok(()));
    if skip_if_no_python3() {
        return Ok(());
    }

    let server = start_mock_server().await;
    let call_id = "python-args";
    let args = json!({
        "code": "import sys; print('|'.join(sys.argv[1:]))",
        "args": ["alpha", "beta", "gamma"],
    });
    let serialized_args = serde_json::to_string(&args)?;

    let tool_call_response = sse(vec![
        ev_response_created("resp-1"),
        ev_function_call(call_id, "python", &serialized_args),
        ev_completed("resp-1"),
    ]);
    let final_response = sse(vec![
        ev_response_created("resp-2"),
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    let mock = mount_sse_sequence(&server, vec![tool_call_response, final_response]).await;

    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::PythonTool);
    });
    let test = builder.build(&server).await?;
    test.submit_turn("run python args").await?;

    let output = mock
        .function_call_output_text(call_id)
        .context("python output present")?;
    let output_json = parse_structured_tool_output(&output)?;

    let exit_code = output_json["metadata"]["exit_code"].as_i64();
    assert_eq!(exit_code, Some(0));
    let stdout = output_json["output"].as_str().unwrap_or_default();
    assert_regex_match(r"(?s)^alpha\|beta\|gamma\n?$", stdout);

    Ok(())
}
/// A relative `workdir` is honoured: the script's reported cwd contains the nested
/// workspace directory.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_respects_workdir() -> Result<()> {
    skip_if_no_network!(Ok(()));
    if skip_if_no_python3() {
        return Ok(());
    }

    let server = start_mock_server().await;
    let call_id = "python-workdir";
    let args = json!({
        "code": "import os; print(os.getcwd())",
        "workdir": "nested/dir",
    });
    let serialized_args = serde_json::to_string(&args)?;

    let tool_call_response = sse(vec![
        ev_response_created("resp-1"),
        ev_function_call(call_id, "python", &serialized_args),
        ev_completed("resp-1"),
    ]);
    let final_response = sse(vec![
        ev_response_created("resp-2"),
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    let mock = mount_sse_sequence(&server, vec![tool_call_response, final_response]).await;

    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::PythonTool);
    });
    let test = builder.build(&server).await?;

    // The directory must exist before the turn runs the command inside it.
    let nested_dir = test.workspace_path("nested/dir");
    fs::create_dir_all(&nested_dir)?;
    test.submit_turn("run python in workdir").await?;

    let output = mock
        .function_call_output_text(call_id)
        .context("python output present")?;
    let output_json = parse_structured_tool_output(&output)?;

    let exit_code = output_json["metadata"]["exit_code"].as_i64();
    assert_eq!(exit_code, Some(0));
    let stdout = output_json["output"].as_str().unwrap_or_default();
    let expected_dir = nested_dir.to_string_lossy();
    assert!(stdout.contains(expected_dir.as_ref()));

    Ok(())
}
/// A script that writes to stderr and exits with status 7 is reported with that exit
/// code, and the stderr text is part of the returned output.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_reports_nonzero_exit_and_stderr() -> Result<()> {
    skip_if_no_network!(Ok(()));
    if skip_if_no_python3() {
        return Ok(());
    }

    let server = start_mock_server().await;
    let call_id = "python-nonzero";
    let args = json!({
        "code": "import sys; sys.stderr.write('boom\\n'); raise SystemExit(7)",
    });
    let serialized_args = serde_json::to_string(&args)?;

    let tool_call_response = sse(vec![
        ev_response_created("resp-1"),
        ev_function_call(call_id, "python", &serialized_args),
        ev_completed("resp-1"),
    ]);
    let final_response = sse(vec![
        ev_response_created("resp-2"),
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    let mock = mount_sse_sequence(&server, vec![tool_call_response, final_response]).await;

    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::PythonTool);
    });
    let test = builder.build(&server).await?;
    test.submit_turn("run failing python").await?;

    let output = mock
        .function_call_output_text(call_id)
        .context("python output present")?;
    let output_json = parse_structured_tool_output(&output)?;

    let exit_code = output_json["metadata"]["exit_code"].as_i64();
    assert_eq!(exit_code, Some(7));
    let combined_output = output_json["output"].as_str().unwrap_or_default();
    assert!(combined_output.contains("boom"));

    Ok(())
}
/// Requesting escalated sandbox permissions is rejected with an explanatory message
/// when the approval policy is `Never`.
///
/// No `python3` availability check is needed here: the handler returns the rejection
/// before building or spawning any command, so this test can run on machines without
/// a Python interpreter.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_escalation_rejected_with_never_approval() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let server = start_mock_server().await;
    let call_id = "python-escalation";
    let args = json!({
        "code": "print('blocked')",
        "sandbox_permissions": SandboxPermissions::RequireEscalated,
    });
    let mock = mount_sse_sequence(
        &server,
        vec![
            sse(vec![
                ev_response_created("resp-1"),
                ev_function_call(call_id, "python", &serde_json::to_string(&args)?),
                ev_completed("resp-1"),
            ]),
            sse(vec![
                ev_response_created("resp-2"),
                ev_assistant_message("msg-1", "done"),
                ev_completed("resp-2"),
            ]),
        ],
    )
    .await;

    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::PythonTool);
    });
    let test = builder.build(&server).await?;
    test.submit_turn("run escalated python").await?;

    let output = mock
        .function_call_output_text(call_id)
        .context("python output present")?;

    // The expected text mirrors the handler's rejection message for this policy.
    let policy = AskForApproval::Never;
    let expected_message = format!(
        "approval policy is {policy:?}; reject command — you should not ask for escalated permissions if the approval policy is {policy:?}"
    );
    assert_eq!(output, expected_message);

    Ok(())
}
/// A script that outlives `timeout_ms` is reported either as structured output with
/// exit code 124 and a timeout message, or as an execution error mentioning a signal.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_timeout_includes_timeout_message() -> Result<()> {
    skip_if_no_network!(Ok(()));
    if skip_if_no_python3() {
        return Ok(());
    }

    let server = start_mock_server().await;
    let call_id = "python-timeout";
    let args = json!({
        "code": "import time; time.sleep(2); print('done')",
        "timeout_ms": 50,
    });
    let serialized_args = serde_json::to_string(&args)?;

    let tool_call_response = sse(vec![
        ev_response_created("resp-1"),
        ev_function_call(call_id, "python", &serialized_args),
        ev_completed("resp-1"),
    ]);
    let final_response = sse(vec![
        ev_response_created("resp-2"),
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    let mock = mount_sse_sequence(&server, vec![tool_call_response, final_response]).await;

    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::PythonTool);
    });
    let test = builder.build(&server).await?;
    test.submit_turn("run slow python").await?;

    let output = mock
        .function_call_output_text(call_id)
        .context("python output present")?;

    // Both output shapes are accepted; which one appears is not fixed by this test.
    match parse_structured_tool_output(&output) {
        Ok(output_json) => {
            let exit_code = output_json["metadata"]["exit_code"].as_i64();
            assert_eq!(exit_code, Some(124));
            let stdout = output_json["output"].as_str().unwrap_or_default();
            assert!(stdout.contains("command timed out"));
        }
        Err(_) => {
            assert_regex_match(r"(?is)^execution error:.*signal.*$", &output);
        }
    }

    Ok(())
}
/// The `timeout` key is accepted as an alias of `timeout_ms` and produces the same
/// timeout outcome.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_timeout_alias_matches_timeout_ms_behavior() -> Result<()> {
    skip_if_no_network!(Ok(()));
    if skip_if_no_python3() {
        return Ok(());
    }

    let server = start_mock_server().await;
    let call_id = "python-timeout-alias";
    let args = json!({
        "code": "import time; time.sleep(2)",
        "timeout": 50,
    });
    let serialized_args = serde_json::to_string(&args)?;

    let tool_call_response = sse(vec![
        ev_response_created("resp-1"),
        ev_function_call(call_id, "python", &serialized_args),
        ev_completed("resp-1"),
    ]);
    let final_response = sse(vec![
        ev_response_created("resp-2"),
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    let mock = mount_sse_sequence(&server, vec![tool_call_response, final_response]).await;

    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::PythonTool);
    });
    let test = builder.build(&server).await?;
    test.submit_turn("run slow python with timeout alias")
        .await?;

    let output = mock
        .function_call_output_text(call_id)
        .context("python output present")?;

    match parse_structured_tool_output(&output) {
        Ok(output_json) => {
            let exit_code = output_json["metadata"]["exit_code"].as_i64();
            assert_eq!(exit_code, Some(124));
        }
        Err(_) => {
            assert_regex_match(r"(?is)^execution error:.*signal.*$", &output);
        }
    }

    Ok(())
}
/// Under a read-only sandbox policy, a script that tries to write a file fails with a
/// non-zero exit code and the denial reason is visible in the tool output.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn python_tool_surfaces_sandbox_denial_output() -> Result<()> {
    skip_if_no_network!(Ok(()));
    if skip_if_no_python3() {
        return Ok(());
    }

    let server = start_mock_server().await;
    let call_id = "python-sandbox-denied";
    let args = json!({
        "code": "from pathlib import Path\nPath('sandbox-denied-python.txt').write_text('blocked')",
    });
    let serialized_args = serde_json::to_string(&args)?;

    let tool_call_response = sse(vec![
        ev_response_created("resp-1"),
        ev_function_call(call_id, "python", &serialized_args),
        ev_completed("resp-1"),
    ]);
    let final_response = sse(vec![
        ev_response_created("resp-2"),
        ev_assistant_message("msg-1", "done"),
        ev_completed("resp-2"),
    ]);
    let mock = mount_sse_sequence(&server, vec![tool_call_response, final_response]).await;

    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::PythonTool);
    });
    let test = builder.build(&server).await?;
    test.submit_turn_with_policy("run denied python", SandboxPolicy::ReadOnly)
        .await?;

    let output = mock
        .function_call_output_text(call_id)
        .context("python output present")?;
    let output_json = parse_structured_tool_output(&output)?;

    let exit_code = output_json["metadata"]["exit_code"]
        .as_i64()
        .context("exit code should exist")?;
    assert_ne!(exit_code, 0);

    // Match case-insensitively against the denial messages this test accepts.
    let body = output_json["output"]
        .as_str()
        .unwrap_or_default()
        .to_lowercase();
    let denial_markers = [
        "permission denied",
        "operation not permitted",
        "read-only file system",
    ];
    let mentions_denial = denial_markers.iter().any(|marker| body.contains(marker));
    assert!(
        mentions_denial,
        "expected sandbox denial details in output: {output_json}"
    );

    Ok(())
}