Compare commits

...

24 Commits

Author SHA1 Message Date
Ahmed Ibrahim
ef89fb780f Merge branch 'main' into capture-shell-partial-output 2026-01-19 23:49:40 -08:00
Ahmed Ibrahim
829b0f5d1a fix 2026-01-19 09:28:19 -08:00
Ahmed Ibrahim
4e8b5e9cff Merge branch 'main' into capture-shell-partial-output 2026-01-19 09:13:35 -08:00
Ahmed Ibrahim
ce94c79d4c Update exec.rs 2026-01-13 01:20:03 -08:00
Ahmed Ibrahim
73ea8c44ba graceful 2026-01-13 00:36:55 -08:00
Ahmed Ibrahim
cf3bf993b6 Merge branch 'capture-shell-partial-output' of https://github.com/openai/codex into capture-shell-partial-output 2026-01-12 23:20:26 -08:00
Ahmed Ibrahim
3f52be0153 tests 2026-01-12 23:20:13 -08:00
Ahmed Ibrahim
69930d2f55 Fix collab test tool invocation 2026-01-12 23:03:28 -08:00
Ahmed Ibrahim
ff94c94370 Merge branch 'main' into capture-shell-partial-output 2026-01-12 20:32:49 -08:00
Ahmed Ibrahim
d079cc1778 childtoken and tests 2026-01-11 23:28:43 -08:00
Ahmed Ibrahim
f3b89ce11e childtoken and tests 2026-01-11 23:18:46 -08:00
Ahmed Ibrahim
016f8ba2db childtoken and tests 2026-01-11 22:52:46 -08:00
Ahmed Ibrahim
f651e72ef4 windows 2026-01-11 20:36:17 -08:00
Ahmed Ibrahim
09febdcd74 strong tests 2026-01-11 18:45:39 -08:00
Ahmed Ibrahim
ac7a556534 strong tests 2026-01-11 17:43:37 -08:00
Ahmed Ibrahim
3384319409 strong tests 2026-01-11 16:41:01 -08:00
Ahmed Ibrahim
4ec9ef35e0 strong tests 2026-01-11 16:30:43 -08:00
Ahmed Ibrahim
a0909ea1a9 strong tests 2026-01-11 16:13:27 -08:00
Ahmed Ibrahim
282696e625 fix 2026-01-11 15:55:19 -08:00
Ahmed Ibrahim
c9abdd043b fix 2026-01-11 15:24:20 -08:00
Ahmed Ibrahim
fcde109c7f fix 2026-01-11 15:22:00 -08:00
Ahmed Ibrahim
a8d69094ad unrelated 2026-01-11 14:12:30 -08:00
Ahmed Ibrahim
bc6be70e60 tests 2026-01-11 14:05:16 -08:00
Ahmed Ibrahim
c84baeef1f Capture partial output on shell aborts 2026-01-11 14:01:14 -08:00
25 changed files with 664 additions and 247 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -1024,6 +1024,7 @@ dependencies = [
"shlex",
"tempfile",
"tokio",
"tokio-util",
"toml 0.9.5",
"tracing",
"tracing-subscriber",
@@ -1531,6 +1532,7 @@ dependencies = [
"seccompiler",
"tempfile",
"tokio",
"tokio-util",
]
[[package]]

View File

@@ -42,6 +42,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"signal",
] }
tokio-util = { workspace = true }
tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
uuid = { workspace = true, features = ["serde", "v7"] }

View File

@@ -140,6 +140,7 @@ use codex_core::config::edit::ConfigEditsBuilder;
use codex_core::config::types::McpServerTransportConfig;
use codex_core::default_client::get_codex_user_agent;
use codex_core::error::CodexErr;
use codex_core::exec::ExecExpiration;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
use codex_core::features::Feature;
@@ -188,6 +189,7 @@ use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::broadcast;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use toml::Value as TomlValue;
use tracing::error;
use tracing::info;
@@ -1232,10 +1234,11 @@ impl CodexMessageProcessor {
let timeout_ms = params
.timeout_ms
.and_then(|timeout_ms| u64::try_from(timeout_ms).ok());
let cancellation_token = CancellationToken::new();
let exec_params = ExecParams {
command: params.command,
cwd,
expiration: timeout_ms.into(),
expiration: ExecExpiration::from_timeout_ms(timeout_ms, cancellation_token),
env,
sandbox_permissions: SandboxPermissions::UseDefault,
justification: None,

View File

@@ -3359,6 +3359,7 @@ mod tests {
use std::path::Path;
use std::time::Duration;
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use mcp_types::ContentBlock;
use mcp_types::TextContent;
@@ -4458,6 +4459,7 @@ mod tests {
Arc::clone(&session),
Arc::clone(&turn_context),
tracker,
CancellationToken::new(),
call,
)
.await
@@ -4577,6 +4579,7 @@ mod tests {
#[tokio::test]
async fn rejects_escalated_permissions_when_policy_not_on_request() {
use crate::exec::ExecExpiration;
use crate::exec::ExecParams;
use crate::protocol::AskForApproval;
use crate::protocol::SandboxPolicy;
@@ -4607,7 +4610,7 @@ mod tests {
]
},
cwd: turn_context.cwd.clone(),
expiration: timeout_ms.into(),
expiration: ExecExpiration::from_timeout_ms(Some(timeout_ms), CancellationToken::new()),
env: HashMap::new(),
sandbox_permissions,
justification: Some("test".to_string()),
@@ -4618,7 +4621,7 @@ mod tests {
sandbox_permissions: SandboxPermissions::UseDefault,
command: params.command.clone(),
cwd: params.cwd.clone(),
expiration: timeout_ms.into(),
expiration: ExecExpiration::from_timeout_ms(Some(timeout_ms), CancellationToken::new()),
env: HashMap::new(),
justification: params.justification.clone(),
arg0: None,
@@ -4647,6 +4650,7 @@ mod tests {
})
.to_string(),
},
cancellation_token: CancellationToken::new(),
})
.await;
@@ -4684,6 +4688,7 @@ mod tests {
})
.to_string(),
},
cancellation_token: CancellationToken::new(),
})
.await;
@@ -4737,6 +4742,7 @@ mod tests {
})
.to_string(),
},
cancellation_token: CancellationToken::new(),
})
.await;

View File

@@ -35,6 +35,7 @@ use crate::text_encoding::bytes_to_string_smart;
use codex_utils_pty::process_group::kill_child_process_group;
pub const DEFAULT_EXEC_COMMAND_TIMEOUT_MS: u64 = 10_000;
pub(crate) const EXEC_ABORTED_MESSAGE: &str = "command aborted by user";
// Hardcode these since it does not seem worth including the libc crate just
// for these.
@@ -64,49 +65,78 @@ pub struct ExecParams {
/// Mechanism to terminate an exec invocation before it finishes naturally.
#[derive(Debug)]
pub enum ExecExpiration {
Timeout(Duration),
DefaultTimeout,
Cancellation(CancellationToken),
pub struct ExecExpiration {
pub timeout: TimeoutSpec,
pub cancellation: CancellationToken,
}
impl From<Option<u64>> for ExecExpiration {
fn from(timeout_ms: Option<u64>) -> Self {
timeout_ms.map_or(ExecExpiration::DefaultTimeout, |timeout_ms| {
ExecExpiration::Timeout(Duration::from_millis(timeout_ms))
})
}
}
impl From<u64> for ExecExpiration {
fn from(timeout_ms: u64) -> Self {
ExecExpiration::Timeout(Duration::from_millis(timeout_ms))
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutSpec {
Default,
Explicit(Duration),
None,
}
impl ExecExpiration {
async fn wait(self) {
match self {
ExecExpiration::Timeout(duration) => tokio::time::sleep(duration).await,
ExecExpiration::DefaultTimeout => {
tokio::time::sleep(Duration::from_millis(DEFAULT_EXEC_COMMAND_TIMEOUT_MS)).await
}
ExecExpiration::Cancellation(cancel) => {
cancel.cancelled().await;
pub fn new(timeout: TimeoutSpec, cancellation: CancellationToken) -> Self {
Self {
timeout,
cancellation,
}
}
pub fn default(cancellation: CancellationToken) -> Self {
Self::new(TimeoutSpec::Default, cancellation)
}
pub fn from_timeout(timeout: Duration, cancellation: CancellationToken) -> Self {
Self::new(TimeoutSpec::Explicit(timeout), cancellation)
}
pub fn from_timeout_ms(timeout_ms: Option<u64>, cancellation: CancellationToken) -> Self {
let timeout = timeout_ms.map_or(TimeoutSpec::Default, |timeout_ms| {
TimeoutSpec::Explicit(Duration::from_millis(timeout_ms))
});
Self::new(timeout, cancellation)
}
pub fn cancel_only(cancellation: CancellationToken) -> Self {
Self::new(TimeoutSpec::None, cancellation)
}
async fn wait(self) -> ExecExpirationOutcome {
match self.timeout {
TimeoutSpec::Explicit(duration) => tokio::select! {
_ = tokio::time::sleep(duration) => ExecExpirationOutcome::TimedOut,
_ = self.cancellation.cancelled() => ExecExpirationOutcome::Cancelled,
},
TimeoutSpec::Default => tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(DEFAULT_EXEC_COMMAND_TIMEOUT_MS)) => ExecExpirationOutcome::TimedOut,
_ = self.cancellation.cancelled() => ExecExpirationOutcome::Cancelled,
},
TimeoutSpec::None => {
self.cancellation.cancelled().await;
ExecExpirationOutcome::Cancelled
}
}
}
/// If ExecExpiration is a timeout, returns the timeout in milliseconds.
pub(crate) fn timeout_ms(&self) -> Option<u64> {
match self {
ExecExpiration::Timeout(duration) => Some(duration.as_millis() as u64),
ExecExpiration::DefaultTimeout => Some(DEFAULT_EXEC_COMMAND_TIMEOUT_MS),
ExecExpiration::Cancellation(_) => None,
match self.timeout {
TimeoutSpec::Explicit(duration) => Some(duration.as_millis() as u64),
TimeoutSpec::Default => Some(DEFAULT_EXEC_COMMAND_TIMEOUT_MS),
TimeoutSpec::None => None,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ExecExpirationOutcome {
TimedOut,
Cancelled,
}
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SandboxType {
None,
@@ -234,8 +264,7 @@ async fn exec_windows_sandbox(
expiration,
..
} = params;
// TODO(iceweasel-oai): run_windows_sandbox_capture should support all
// variants of ExecExpiration, not just timeout.
// TODO(iceweasel-oai): run_windows_sandbox_capture should respect cancellation tokens.
let timeout_ms = expiration.timeout_ms();
let policy_str = serde_json::to_string(sandbox_policy).map_err(|err| {
@@ -494,6 +523,13 @@ fn append_all(dst: &mut Vec<u8>, src: &[u8]) {
dst.extend_from_slice(src);
}
pub(crate) fn append_abort_message(output: &mut Vec<u8>) {
if !output.is_empty() && !output.ends_with(b"\n") {
output.push(b'\n');
}
output.extend_from_slice(EXEC_ABORTED_MESSAGE.as_bytes());
}
#[derive(Clone, Debug)]
pub struct ExecToolCallOutput {
pub exit_code: i32,
@@ -599,20 +635,27 @@ async fn consume_truncated_output(
Some(agg_tx.clone()),
));
let (exit_status, timed_out) = tokio::select! {
let (exit_status, timed_out, was_cancelled) = tokio::select! {
status_result = child.wait() => {
let exit_status = status_result?;
(exit_status, false)
(exit_status, false, false)
}
_ = expiration.wait() => {
outcome = expiration.wait() => {
kill_child_process_group(&mut child)?;
child.start_kill()?;
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true)
match outcome {
ExecExpirationOutcome::TimedOut => {
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true, false)
}
ExecExpirationOutcome::Cancelled => {
(synthetic_exit_status(0), false, true)
}
}
}
_ = tokio::signal::ctrl_c() => {
kill_child_process_group(&mut child)?;
child.start_kill()?;
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE), false)
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE), false, false)
}
};
@@ -624,7 +667,7 @@ async fn consume_truncated_output(
// That would cause the `read_capped` tasks to block on `read()`
// indefinitely, effectively hanging the whole agent.
const IO_DRAIN_TIMEOUT_MS: u64 = 2_000; // 2 s should be plenty for local pipes
const IO_DRAIN_TIMEOUT_MS: u64 = 10;
// We need mutable bindings so we can `abort()` them on timeout.
use tokio::task::JoinHandle;
@@ -669,6 +712,10 @@ async fn consume_truncated_output(
while let Ok(chunk) = agg_rx.recv().await {
append_all(&mut combined_buf, &chunk);
}
if was_cancelled {
append_abort_message(&mut combined_buf);
}
let aggregated_output = StreamOutput {
text: combined_buf,
truncated_after_lines: None,
@@ -845,7 +892,7 @@ mod tests {
let params = ExecParams {
command,
cwd: std::env::current_dir()?,
expiration: 500.into(),
expiration: ExecExpiration::from_timeout_ms(Some(500), CancellationToken::new()),
env,
sandbox_permissions: SandboxPermissions::UseDefault,
justification: None,
@@ -890,7 +937,7 @@ mod tests {
let params = ExecParams {
command,
cwd: cwd.clone(),
expiration: ExecExpiration::Cancellation(cancel_token),
expiration: ExecExpiration::cancel_only(cancel_token),
env,
sandbox_permissions: SandboxPermissions::UseDefault,
justification: None,
@@ -900,20 +947,17 @@ mod tests {
tokio::time::sleep(Duration::from_millis(1_000)).await;
cancel_tx.cancel();
});
let result = process_exec_tool_call(
let output = process_exec_tool_call(
params,
&SandboxPolicy::DangerFullAccess,
cwd.as_path(),
&None,
None,
)
.await;
let output = match result {
Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => output,
other => panic!("expected timeout error, got {other:?}"),
};
assert!(output.timed_out);
assert_eq!(output.exit_code, EXEC_TIMEOUT_EXIT_CODE);
.await?;
assert!(!output.timed_out);
assert_eq!(output.exit_code, 0);
assert!(output.aggregated_output.text.contains(EXEC_ABORTED_MESSAGE));
Ok(())
}

View File

@@ -2,14 +2,13 @@ use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use codex_async_utils::CancelErr;
use codex_async_utils::OrCancelExt;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;
use tracing::error;
use uuid::Uuid;
use crate::codex::TurnContext;
use crate::exec::ExecExpiration;
use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StdoutStream;
@@ -104,9 +103,12 @@ impl SessionTask for UserShellCommandTask {
command: command.clone(),
cwd: cwd.clone(),
env: create_env(&turn_context.shell_environment_policy),
// TODO(zhao-oai): Now that we have ExecExpiration::Cancellation, we
// should use that instead of an "arbitrarily large" timeout here.
expiration: USER_SHELL_TIMEOUT_MS.into(),
// TODO(zhao-oai): consider whether the user shell should use a shorter
// default timeout now that cancellation is wired through ExecExpiration.
expiration: ExecExpiration::from_timeout_ms(
Some(USER_SHELL_TIMEOUT_MS),
cancellation_token.child_token(),
),
sandbox: SandboxType::None,
sandbox_permissions: SandboxPermissions::UseDefault,
justification: None,
@@ -120,52 +122,10 @@ impl SessionTask for UserShellCommandTask {
});
let sandbox_policy = SandboxPolicy::DangerFullAccess;
let exec_result = execute_exec_env(exec_env, &sandbox_policy, stdout_stream)
.or_cancel(&cancellation_token)
.await;
let exec_result = execute_exec_env(exec_env, &sandbox_policy, stdout_stream).await;
match exec_result {
Err(CancelErr::Cancelled) => {
let aborted_message = "command aborted by user".to_string();
let exec_output = ExecToolCallOutput {
exit_code: -1,
stdout: StreamOutput::new(String::new()),
stderr: StreamOutput::new(aborted_message.clone()),
aggregated_output: StreamOutput::new(aborted_message.clone()),
duration: Duration::ZERO,
timed_out: false,
};
let output_items = [user_shell_command_record_item(
&raw_command,
&exec_output,
&turn_context,
)];
session
.record_conversation_items(turn_context.as_ref(), &output_items)
.await;
session
.send_event(
turn_context.as_ref(),
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
process_id: None,
turn_id: turn_context.sub_id.clone(),
command: command.clone(),
cwd: cwd.clone(),
parsed_cmd: parsed_cmd.clone(),
source: ExecCommandSource::UserShell,
interaction_input: None,
stdout: String::new(),
stderr: aborted_message.clone(),
aggregated_output: aborted_message.clone(),
exit_code: -1,
duration: Duration::ZERO,
formatted_output: aborted_message,
}),
)
.await;
}
Ok(Ok(output)) => {
Ok(output) => {
session
.send_event(
turn_context.as_ref(),
@@ -200,7 +160,7 @@ impl SessionTask for UserShellCommandTask {
.record_conversation_items(turn_context.as_ref(), &output_items)
.await;
}
Ok(Err(err)) => {
Err(err) => {
error!("user shell command failed: {err:?}");
let message = format!("execution error: {err:?}");
let exec_output = ExecToolCallOutput {

View File

@@ -13,6 +13,7 @@ use mcp_types::CallToolResult;
use std::borrow::Cow;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
pub type SharedTurnDiffTracker = Arc<Mutex<TurnDiffTracker>>;
@@ -24,6 +25,7 @@ pub struct ToolInvocation {
pub call_id: String,
pub tool_name: String,
pub payload: ToolPayload,
pub cancellation_token: CancellationToken,
}
#[derive(Clone, Debug)]

View File

@@ -30,6 +30,7 @@ use async_trait::async_trait;
use codex_apply_patch::ApplyPatchAction;
use codex_apply_patch::ApplyPatchFileChange;
use codex_utils_absolute_path::AbsolutePathBuf;
use tokio_util::sync::CancellationToken;
pub struct ApplyPatchHandler;
@@ -84,6 +85,7 @@ impl ToolHandler for ApplyPatchHandler {
call_id,
tool_name,
payload,
cancellation_token,
} = invocation;
let patch_input = match payload {
@@ -143,6 +145,7 @@ impl ToolHandler for ApplyPatchHandler {
turn: turn.as_ref(),
call_id: call_id.clone(),
tool_name: tool_name.to_string(),
cancellation_token: cancellation_token.child_token(),
};
let out = orchestrator
.run(&mut runtime, &req, &tool_ctx, &turn, turn.approval_policy)
@@ -192,6 +195,7 @@ pub(crate) async fn intercept_apply_patch(
tracker: Option<&SharedTurnDiffTracker>,
call_id: &str,
tool_name: &str,
cancellation_token: CancellationToken,
) -> Result<Option<ToolOutput>, FunctionCallError> {
match codex_apply_patch::maybe_parse_apply_patch_verified(command, cwd) {
codex_apply_patch::MaybeApplyPatchVerified::Body(changes) => {
@@ -234,6 +238,7 @@ pub(crate) async fn intercept_apply_patch(
turn,
call_id: call_id.to_string(),
tool_name: tool_name.to_string(),
cancellation_token,
};
let out = orchestrator
.run(&mut runtime, &req, &tool_ctx, turn, turn.approval_policy)

View File

@@ -615,6 +615,7 @@ mod tests {
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
fn invocation(
session: Arc<crate::codex::Session>,
@@ -629,6 +630,7 @@ mod tests {
call_id: "call-1".to_string(),
tool_name: tool_name.to_string(),
payload,
cancellation_token: CancellationToken::new(),
}
}

View File

@@ -2,8 +2,10 @@ use async_trait::async_trait;
use codex_protocol::models::ShellCommandToolCallParams;
use codex_protocol::models::ShellToolCallParams;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use crate::codex::TurnContext;
use crate::exec::ExecExpiration;
use crate::exec::ExecParams;
use crate::exec_env::create_env;
use crate::function_tool::FunctionCallError;
@@ -29,11 +31,15 @@ pub struct ShellHandler;
pub struct ShellCommandHandler;
impl ShellHandler {
fn to_exec_params(params: ShellToolCallParams, turn_context: &TurnContext) -> ExecParams {
fn to_exec_params(
params: ShellToolCallParams,
turn_context: &TurnContext,
cancellation_token: CancellationToken,
) -> ExecParams {
ExecParams {
command: params.command,
cwd: turn_context.resolve_path(params.workdir.clone()),
expiration: params.timeout_ms.into(),
expiration: ExecExpiration::from_timeout_ms(params.timeout_ms, cancellation_token),
env: create_env(&turn_context.shell_environment_policy),
sandbox_permissions: params.sandbox_permissions.unwrap_or_default(),
justification: params.justification,
@@ -52,6 +58,7 @@ impl ShellCommandHandler {
params: ShellCommandToolCallParams,
session: &crate::codex::Session,
turn_context: &TurnContext,
cancellation_token: CancellationToken,
) -> ExecParams {
let shell = session.user_shell();
let command = Self::base_command(shell.as_ref(), &params.command, params.login);
@@ -59,7 +66,7 @@ impl ShellCommandHandler {
ExecParams {
command,
cwd: turn_context.resolve_path(params.workdir.clone()),
expiration: params.timeout_ms.into(),
expiration: ExecExpiration::from_timeout_ms(params.timeout_ms, cancellation_token),
env: create_env(&turn_context.shell_environment_policy),
sandbox_permissions: params.sandbox_permissions.unwrap_or_default(),
justification: params.justification,
@@ -101,32 +108,29 @@ impl ToolHandler for ShellHandler {
call_id,
tool_name,
payload,
cancellation_token,
} = invocation;
match payload {
ToolPayload::Function { arguments } => {
let params: ShellToolCallParams = parse_arguments(&arguments)?;
let exec_params = Self::to_exec_params(params, turn.as_ref());
let exec_params =
Self::to_exec_params(params, turn.as_ref(), cancellation_token.child_token());
Self::run_exec_like(
tool_name.as_str(),
exec_params,
session,
turn,
tracker,
call_id,
ShellExecContext::new(session, turn, tracker, call_id, cancellation_token),
false,
)
.await
}
ToolPayload::LocalShell { params } => {
let exec_params = Self::to_exec_params(params, turn.as_ref());
let exec_params =
Self::to_exec_params(params, turn.as_ref(), cancellation_token.child_token());
Self::run_exec_like(
tool_name.as_str(),
exec_params,
session,
turn,
tracker,
call_id,
ShellExecContext::new(session, turn, tracker, call_id, cancellation_token),
false,
)
.await
@@ -170,6 +174,7 @@ impl ToolHandler for ShellCommandHandler {
call_id,
tool_name,
payload,
cancellation_token,
} = invocation;
let ToolPayload::Function { arguments } = payload else {
@@ -179,17 +184,45 @@ impl ToolHandler for ShellCommandHandler {
};
let params: ShellCommandToolCallParams = parse_arguments(&arguments)?;
let exec_params = Self::to_exec_params(params, session.as_ref(), turn.as_ref());
let exec_params = Self::to_exec_params(
params,
session.as_ref(),
turn.as_ref(),
cancellation_token.child_token(),
);
ShellHandler::run_exec_like(
tool_name.as_str(),
exec_params,
ShellExecContext::new(session, turn, tracker, call_id, cancellation_token),
true,
)
.await
}
}
struct ShellExecContext {
session: Arc<crate::codex::Session>,
turn: Arc<TurnContext>,
tracker: crate::tools::context::SharedTurnDiffTracker,
call_id: String,
cancellation_token: CancellationToken,
}
impl ShellExecContext {
fn new(
session: Arc<crate::codex::Session>,
turn: Arc<TurnContext>,
tracker: crate::tools::context::SharedTurnDiffTracker,
call_id: String,
cancellation_token: CancellationToken,
) -> Self {
Self {
session,
turn,
tracker,
call_id,
true,
)
.await
cancellation_token,
}
}
}
@@ -197,12 +230,16 @@ impl ShellHandler {
async fn run_exec_like(
tool_name: &str,
exec_params: ExecParams,
session: Arc<crate::codex::Session>,
turn: Arc<TurnContext>,
tracker: crate::tools::context::SharedTurnDiffTracker,
call_id: String,
ctx: ShellExecContext,
freeform: bool,
) -> Result<ToolOutput, FunctionCallError> {
let ShellExecContext {
session,
turn,
tracker,
call_id,
cancellation_token,
} = ctx;
// Approval policy guard for explicit escalation in non-OnRequest modes.
if exec_params
.sandbox_permissions
@@ -212,9 +249,9 @@ impl ShellHandler {
codex_protocol::protocol::AskForApproval::OnRequest
)
{
let policy = turn.approval_policy;
return Err(FunctionCallError::RespondToModel(format!(
"approval policy is {policy:?}; reject command — you should not ask for escalated permissions if the approval policy is {policy:?}",
policy = turn.approval_policy
"approval policy is {policy:?}; reject command — you should not ask for escalated permissions if the approval policy is {policy:?}"
)));
}
@@ -228,6 +265,7 @@ impl ShellHandler {
Some(&tracker),
&call_id,
tool_name,
cancellation_token.child_token(),
)
.await?
{
@@ -273,6 +311,7 @@ impl ShellHandler {
turn: turn.as_ref(),
call_id: call_id.clone(),
tool_name: tool_name.to_string(),
cancellation_token,
};
let out = orchestrator
.run(&mut runtime, &req, &tool_ctx, &turn, turn.approval_policy)
@@ -294,6 +333,7 @@ mod tests {
use codex_protocol::models::ShellCommandToolCallParams;
use pretty_assertions::assert_eq;
use tokio_util::sync::CancellationToken;
use crate::codex::make_session_and_context;
use crate::exec_env::create_env;
@@ -377,7 +417,12 @@ mod tests {
justification: justification.clone(),
};
let exec_params = ShellCommandHandler::to_exec_params(params, &session, &turn_context);
let exec_params = ShellCommandHandler::to_exec_params(
params,
&session,
&turn_context,
CancellationToken::new(),
);
// ExecParams cannot derive Eq due to the CancellationToken field, so we manually compare the fields.
assert_eq!(exec_params.command, expected_command);

View File

@@ -107,6 +107,7 @@ impl ToolHandler for UnifiedExecHandler {
call_id,
tool_name,
payload,
cancellation_token,
..
} = invocation;
@@ -120,7 +121,12 @@ impl ToolHandler for UnifiedExecHandler {
};
let manager: &UnifiedExecProcessManager = &session.services.unified_exec_manager;
let context = UnifiedExecContext::new(session.clone(), turn.clone(), call_id.clone());
let context = UnifiedExecContext::new(
session.clone(),
turn.clone(),
call_id.clone(),
cancellation_token,
);
let response = match tool_name.as_str() {
"exec_command" => {
@@ -145,9 +151,9 @@ impl ToolHandler for UnifiedExecHandler {
)
{
manager.release_process_id(&process_id).await;
let policy = context.turn.approval_policy;
return Err(FunctionCallError::RespondToModel(format!(
"approval policy is {policy:?}; reject command — you cannot ask for escalated permissions if the approval policy is {policy:?}",
policy = context.turn.approval_policy
"approval policy is {policy:?}; reject command — you cannot ask for escalated permissions if the approval policy is {policy:?}"
)));
}
@@ -165,6 +171,7 @@ impl ToolHandler for UnifiedExecHandler {
Some(&tracker),
&context.call_id,
tool_name.as_str(),
context.cancellation_token.child_token(),
)
.await?
{
@@ -194,12 +201,15 @@ impl ToolHandler for UnifiedExecHandler {
"write_stdin" => {
let args: WriteStdinArgs = parse_arguments(&arguments)?;
let response = manager
.write_stdin(WriteStdinRequest {
process_id: &args.session_id.to_string(),
input: &args.chars,
yield_time_ms: args.yield_time_ms,
max_output_tokens: args.max_output_tokens,
})
.write_stdin(
WriteStdinRequest {
process_id: &args.session_id.to_string(),
input: &args.chars,
yield_time_ms: args.yield_time_ms,
max_output_tokens: args.max_output_tokens,
},
&context,
)
.await
.map_err(|err| {
FunctionCallError::RespondToModel(format!("write_stdin failed: {err}"))

View File

@@ -1,6 +1,4 @@
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::RwLock;
use tokio_util::either::Either;
use tokio_util::sync::CancellationToken;
@@ -14,10 +12,8 @@ use crate::codex::TurnContext;
use crate::error::CodexErr;
use crate::function_tool::FunctionCallError;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::context::ToolPayload;
use crate::tools::router::ToolCall;
use crate::tools::router::ToolRouter;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
#[derive(Clone)]
@@ -58,8 +54,6 @@ impl ToolCallRuntime {
let turn = Arc::clone(&self.turn_context);
let tracker = Arc::clone(&self.tracker);
let lock = Arc::clone(&self.parallel_execution);
let started = Instant::now();
let dispatch_span = trace_span!(
"dispatch_tool_call",
otel.name = call.tool_name.as_str(),
@@ -70,24 +64,36 @@ impl ToolCallRuntime {
let handle: AbortOnDropHandle<Result<ResponseInputItem, FunctionCallError>> =
AbortOnDropHandle::new(tokio::spawn(async move {
tokio::select! {
_ = cancellation_token.cancelled() => {
let secs = started.elapsed().as_secs_f32().max(0.1);
dispatch_span.record("aborted", true);
Ok(Self::aborted_response(&call, secs))
},
res = async {
let _guard = if supports_parallel {
Either::Left(lock.read().await)
} else {
Either::Right(lock.write().await)
};
let dispatch_cancellation = cancellation_token.child_token();
let mut dispatch_future = Box::pin(async {
let _guard = if supports_parallel {
Either::Left(lock.read().await)
} else {
Either::Right(lock.write().await)
};
router
.dispatch_tool_call(session, turn, tracker, call.clone())
.instrument(dispatch_span.clone())
.await
} => res,
router
.dispatch_tool_call(
session,
turn,
tracker,
dispatch_cancellation,
call.clone(),
)
.instrument(dispatch_span.clone())
.await
});
let outcome = tokio::select! {
res = &mut dispatch_future => Some(res),
_ = cancellation_token.cancelled() => None,
};
match outcome {
Some(res) => res,
None => {
dispatch_span.record("aborted", true);
dispatch_future.await
}
}
}));
@@ -105,33 +111,4 @@ impl ToolCallRuntime {
}
}
impl ToolCallRuntime {
fn aborted_response(call: &ToolCall, secs: f32) -> ResponseInputItem {
match &call.payload {
ToolPayload::Custom { .. } => ResponseInputItem::CustomToolCallOutput {
call_id: call.call_id.clone(),
output: Self::abort_message(call, secs),
},
ToolPayload::Mcp { .. } => ResponseInputItem::McpToolCallOutput {
call_id: call.call_id.clone(),
result: Err(Self::abort_message(call, secs)),
},
_ => ResponseInputItem::FunctionCallOutput {
call_id: call.call_id.clone(),
output: FunctionCallOutputPayload {
content: Self::abort_message(call, secs),
..Default::default()
},
},
}
}
fn abort_message(call: &ToolCall, secs: f32) -> String {
match call.tool_name.as_str() {
"shell" | "container.exec" | "local_shell" | "shell_command" | "unified_exec" => {
format!("Wall time: {secs:.1} seconds\naborted by user")
}
_ => format!("aborted by user after {secs:.1}s"),
}
}
}
impl ToolCallRuntime {}

View File

@@ -16,6 +16,7 @@ use codex_protocol::models::ResponseItem;
use codex_protocol::models::ShellToolCallParams;
use std::collections::HashMap;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
#[derive(Clone, Debug)]
@@ -132,6 +133,7 @@ impl ToolRouter {
session: Arc<Session>,
turn: Arc<TurnContext>,
tracker: SharedTurnDiffTracker,
cancellation_token: CancellationToken,
call: ToolCall,
) -> Result<ResponseInputItem, FunctionCallError> {
let ToolCall {
@@ -149,6 +151,7 @@ impl ToolRouter {
call_id,
tool_name,
payload,
cancellation_token,
};
match self.registry.dispatch(invocation).await {

View File

@@ -5,6 +5,7 @@
//! `codex --codex-run-as-apply-patch`, and runs under the current
//! `SandboxAttempt` with a minimal environment.
use crate::CODEX_APPLY_PATCH_ARG1;
use crate::exec::ExecExpiration;
use crate::exec::ExecToolCallOutput;
use crate::sandboxing::CommandSpec;
use crate::sandboxing::SandboxPermissions;
@@ -27,6 +28,7 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use futures::future::BoxFuture;
use std::collections::HashMap;
use std::path::PathBuf;
use tokio_util::sync::CancellationToken;
#[derive(Debug)]
pub struct ApplyPatchRequest {
@@ -46,7 +48,10 @@ impl ApplyPatchRuntime {
Self
}
fn build_command_spec(req: &ApplyPatchRequest) -> Result<CommandSpec, ToolError> {
fn build_command_spec(
req: &ApplyPatchRequest,
cancellation_token: CancellationToken,
) -> Result<CommandSpec, ToolError> {
use std::env;
let exe = if let Some(path) = &req.codex_exe {
path.clone()
@@ -59,7 +64,7 @@ impl ApplyPatchRuntime {
program,
args: vec![CODEX_APPLY_PATCH_ARG1.to_string(), req.action.patch.clone()],
cwd: req.action.cwd.clone(),
expiration: req.timeout_ms.into(),
expiration: ExecExpiration::from_timeout_ms(req.timeout_ms, cancellation_token),
// Run apply_patch with a minimal environment for determinism and to avoid leaks.
env: HashMap::new(),
sandbox_permissions: SandboxPermissions::UseDefault,
@@ -149,7 +154,7 @@ impl ToolRuntime<ApplyPatchRequest, ExecToolCallOutput> for ApplyPatchRuntime {
attempt: &SandboxAttempt<'_>,
ctx: &ToolCtx<'_>,
) -> Result<ExecToolCallOutput, ToolError> {
let spec = Self::build_command_spec(req)?;
let spec = Self::build_command_spec(req, ctx.cancellation_token.child_token())?;
let env = attempt
.env_for(spec)
.map_err(|err| ToolError::Codex(err.into()))?;

View File

@@ -4,6 +4,7 @@ Runtime: shell
Executes shell requests under the orchestrator: asks for approval when needed,
builds a CommandSpec, and runs it under the current SandboxAttempt.
*/
use crate::exec::ExecExpiration;
use crate::exec::ExecToolCallOutput;
use crate::features::Feature;
use crate::powershell::prefix_powershell_script_with_utf8;
@@ -155,11 +156,13 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
command
};
let expiration =
ExecExpiration::from_timeout_ms(req.timeout_ms, ctx.cancellation_token.child_token());
let spec = build_command_spec(
&command,
&req.cwd,
&req.env,
req.timeout_ms.into(),
expiration,
req.sandbox_permissions,
req.justification.clone(),
)?;

View File

@@ -185,7 +185,7 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
&command,
&req.cwd,
&req.env,
ExecExpiration::DefaultTimeout,
ExecExpiration::default(ctx.cancellation_token.child_token()),
req.sandbox_permissions,
req.justification.clone(),
)

View File

@@ -23,6 +23,7 @@ use std::path::Path;
use futures::Future;
use futures::future::BoxFuture;
use serde::Serialize;
use tokio_util::sync::CancellationToken;
#[derive(Clone, Default, Debug)]
pub(crate) struct ApprovalStore {
@@ -251,6 +252,7 @@ pub(crate) struct ToolCtx<'a> {
pub turn: &'a TurnContext,
pub call_id: String,
pub tool_name: String,
pub cancellation_token: CancellationToken,
}
#[derive(Debug)]

View File

@@ -30,6 +30,7 @@ use std::time::Duration;
use rand::Rng;
use rand::rng;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use crate::codex::Session;
use crate::codex::TurnContext;
@@ -60,14 +61,21 @@ pub(crate) struct UnifiedExecContext {
pub session: Arc<Session>,
pub turn: Arc<TurnContext>,
pub call_id: String,
pub cancellation_token: CancellationToken,
}
impl UnifiedExecContext {
pub fn new(session: Arc<Session>, turn: Arc<TurnContext>, call_id: String) -> Self {
pub fn new(
session: Arc<Session>,
turn: Arc<TurnContext>,
call_id: String,
cancellation_token: CancellationToken,
) -> Self {
Self {
session,
turn,
call_id,
cancellation_token,
}
}
}
@@ -184,8 +192,12 @@ mod tests {
cmd: &str,
yield_time_ms: u64,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let context =
UnifiedExecContext::new(Arc::clone(session), Arc::clone(turn), "call".to_string());
let context = UnifiedExecContext::new(
Arc::clone(session),
Arc::clone(turn),
"call".to_string(),
CancellationToken::new(),
);
let process_id = session
.services
.unified_exec_manager
@@ -213,19 +225,29 @@ mod tests {
async fn write_stdin(
session: &Arc<Session>,
turn: &Arc<TurnContext>,
process_id: &str,
input: &str,
yield_time_ms: u64,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let context = UnifiedExecContext::new(
Arc::clone(session),
Arc::clone(turn),
"call".to_string(),
CancellationToken::new(),
);
session
.services
.unified_exec_manager
.write_stdin(WriteStdinRequest {
process_id,
input,
yield_time_ms,
max_output_tokens: None,
})
.write_stdin(
WriteStdinRequest {
process_id,
input,
yield_time_ms,
max_output_tokens: None,
},
&context,
)
.await
}
@@ -277,6 +299,7 @@ mod tests {
write_stdin(
&session,
&turn,
process_id,
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
2_500,
@@ -285,6 +308,7 @@ mod tests {
let out_2 = write_stdin(
&session,
&turn,
process_id,
"echo $CODEX_INTERACTIVE_SHELL_VAR\n",
2_500,
@@ -313,6 +337,7 @@ mod tests {
write_stdin(
&session,
&turn,
session_a.as_str(),
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
2_500,
@@ -333,6 +358,7 @@ mod tests {
let out_3 = write_stdin(
&session,
&turn,
shell_a
.process_id
.as_ref()
@@ -367,6 +393,7 @@ mod tests {
write_stdin(
&session,
&turn,
process_id,
format!("export CODEX_INTERACTIVE_SHELL_VAR={TEST_VAR_VALUE}\n").as_str(),
2_500,
@@ -375,6 +402,7 @@ mod tests {
let out_2 = write_stdin(
&session,
&turn,
process_id,
"sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n",
10,
@@ -387,7 +415,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(7)).await;
let out_3 = write_stdin(&session, process_id, "", 100).await?;
let out_3 = write_stdin(&session, &turn, process_id, "", 100).await?;
assert!(
out_3.output.contains(TEST_VAR_VALUE),
@@ -449,11 +477,11 @@ mod tests {
.expect("expected process id")
.as_str();
write_stdin(&session, process_id, "exit\n", 2_500).await?;
write_stdin(&session, &turn, process_id, "exit\n", 2_500).await?;
tokio::time::sleep(Duration::from_millis(200)).await;
let err = write_stdin(&session, process_id, "", 100)
let err = write_stdin(&session, &turn, process_id, "", 100)
.await
.expect_err("expected unknown process error");

View File

@@ -10,6 +10,7 @@ use tokio::time::Duration;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use crate::exec::append_abort_message;
use crate::exec_env::create_env;
use crate::protocol::ExecCommandSource;
use crate::sandboxing::ExecEnv;
@@ -77,6 +78,11 @@ struct PreparedProcessHandles {
tty: bool,
}
struct CollectedOutput {
bytes: Vec<u8>,
was_cancelled: bool,
}
impl UnifiedExecProcessManager {
pub(crate) async fn allocate_process_id(&self) -> String {
loop {
@@ -175,12 +181,17 @@ impl UnifiedExecProcessManager {
&output_buffer,
&output_notify,
&cancellation_token,
&context.cancellation_token,
deadline,
)
.await;
let wall_time = Instant::now().saturating_duration_since(start);
let text = String::from_utf8_lossy(&collected).to_string();
let mut collected_bytes = collected.bytes;
if collected.was_cancelled {
append_abort_message(&mut collected_bytes);
}
let text = String::from_utf8_lossy(&collected_bytes).to_string();
let output = formatted_truncate_text(&text, TruncationPolicy::Tokens(max_tokens));
let exit_code = process.exit_code();
let has_exited = process.has_exited() || exit_code.is_some();
@@ -231,7 +242,7 @@ impl UnifiedExecProcessManager {
chunk_id,
wall_time,
output,
raw_output: collected,
raw_output: collected_bytes,
process_id: if has_exited {
None
} else {
@@ -248,6 +259,7 @@ impl UnifiedExecProcessManager {
pub(crate) async fn write_stdin(
&self,
request: WriteStdinRequest<'_>,
context: &UnifiedExecContext,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let process_id = request.process_id.to_string();
@@ -287,12 +299,17 @@ impl UnifiedExecProcessManager {
&output_buffer,
&output_notify,
&cancellation_token,
&context.cancellation_token,
deadline,
)
.await;
let wall_time = Instant::now().saturating_duration_since(start);
let text = String::from_utf8_lossy(&collected).to_string();
let mut collected_bytes = collected.bytes;
if collected.was_cancelled {
append_abort_message(&mut collected_bytes);
}
let text = String::from_utf8_lossy(&collected_bytes).to_string();
let output = formatted_truncate_text(&text, TruncationPolicy::Tokens(max_tokens));
let original_token_count = approx_token_count(&text);
let chunk_id = generate_chunk_id();
@@ -324,7 +341,7 @@ impl UnifiedExecProcessManager {
chunk_id,
wall_time,
output,
raw_output: collected,
raw_output: collected_bytes,
process_id,
exit_code,
original_token_count: Some(original_token_count),
@@ -523,6 +540,7 @@ impl UnifiedExecProcessManager {
turn: context.turn.as_ref(),
call_id: context.call_id.clone(),
tool_name: "exec_command".to_string(),
cancellation_token: context.cancellation_token.child_token(),
};
orchestrator
.run(
@@ -536,16 +554,18 @@ impl UnifiedExecProcessManager {
.map_err(|e| UnifiedExecError::create_process(format!("{e:?}")))
}
pub(super) async fn collect_output_until_deadline(
async fn collect_output_until_deadline(
output_buffer: &OutputBuffer,
output_notify: &Arc<Notify>,
cancellation_token: &CancellationToken,
exit_token: &CancellationToken,
tool_cancel_token: &CancellationToken,
deadline: Instant,
) -> Vec<u8> {
) -> CollectedOutput {
const POST_EXIT_OUTPUT_GRACE: Duration = Duration::from_millis(50);
let mut collected: Vec<u8> = Vec::with_capacity(4096);
let mut exit_signal_received = cancellation_token.is_cancelled();
let mut exit_signal_received = exit_token.is_cancelled();
let mut was_cancelled = tool_cancel_token.is_cancelled();
loop {
let drained_chunks: Vec<Vec<u8>>;
let mut wait_for_output = None;
@@ -558,14 +578,15 @@ impl UnifiedExecProcessManager {
}
if drained_chunks.is_empty() {
exit_signal_received |= cancellation_token.is_cancelled();
exit_signal_received |= exit_token.is_cancelled();
was_cancelled |= tool_cancel_token.is_cancelled();
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining == Duration::ZERO {
break;
}
let notified = wait_for_output.unwrap_or_else(|| output_notify.notified());
if exit_signal_received {
if exit_signal_received || was_cancelled {
let grace = remaining.min(POST_EXIT_OUTPUT_GRACE);
if tokio::time::timeout(grace, notified).await.is_err() {
break;
@@ -574,11 +595,14 @@ impl UnifiedExecProcessManager {
}
tokio::pin!(notified);
let exit_notified = cancellation_token.cancelled();
let exit_notified = exit_token.cancelled();
let tool_notified = tool_cancel_token.cancelled();
tokio::pin!(exit_notified);
tokio::pin!(tool_notified);
tokio::select! {
_ = &mut notified => {}
_ = &mut exit_notified => exit_signal_received = true,
_ = &mut tool_notified => was_cancelled = true,
_ = tokio::time::sleep(remaining) => break,
}
continue;
@@ -588,13 +612,17 @@ impl UnifiedExecProcessManager {
collected.extend_from_slice(&chunk);
}
exit_signal_received |= cancellation_token.is_cancelled();
exit_signal_received |= exit_token.is_cancelled();
was_cancelled |= tool_cancel_token.is_cancelled();
if Instant::now() >= deadline {
break;
}
}
collected
CollectedOutput {
bytes: collected,
was_cancelled,
}
}
fn prune_processes_if_needed(store: &mut ProcessStore) -> bool {

View File

@@ -1,12 +1,11 @@
use assert_matches::assert_matches;
use std::sync::Arc;
use std::time::Duration;
use codex_core::features::Feature;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_protocol::user_input::UserInput;
use core_test_support::assert_regex_match;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_local_shell_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
@@ -14,8 +13,106 @@ use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use regex_lite::Regex;
use serde_json::json;
use std::sync::Arc;
#[cfg(windows)]
const POWERSHELL_PARTIAL_OUTPUT_SCRIPT: &str = r#"Start-Sleep -Milliseconds 200; [Console]::Out.WriteLine('partial output'); [Console]::Out.Flush(); Start-Sleep -Seconds 60"#;
#[cfg(not(windows))]
const POWERSHELL_PARTIAL_OUTPUT_SCRIPT: &str = "";
fn long_running_exec_command_with_output() -> (String, Option<&'static str>) {
if cfg!(windows) {
(
POWERSHELL_PARTIAL_OUTPUT_SCRIPT.to_string(),
Some("powershell"),
)
} else {
(
"sleep 0.2; printf 'partial output\\n' >&2; sleep 60".to_string(),
None,
)
}
}
fn long_running_shell_command_with_output() -> Vec<String> {
if cfg!(windows) {
vec![
"powershell.exe".to_string(),
"-NoProfile".to_string(),
"-Command".to_string(),
POWERSHELL_PARTIAL_OUTPUT_SCRIPT.to_string(),
]
} else {
vec![
"/bin/sh".to_string(),
"-c".to_string(),
"sleep 0.2; printf 'partial output\\n' >&2; sleep 60".to_string(),
]
}
}
fn long_running_local_shell_call_with_output() -> Vec<&'static str> {
if cfg!(windows) {
vec![
"powershell.exe",
"-NoProfile",
"-Command",
POWERSHELL_PARTIAL_OUTPUT_SCRIPT,
]
} else {
vec![
"/bin/sh",
"-c",
"sleep 0.2; printf 'partial output\\n' >&2; sleep 60",
]
}
}
fn saw_partial_output(call_id: &str, event: &EventMsg) -> bool {
let EventMsg::ExecCommandOutputDelta(delta) = event else {
return false;
};
delta.call_id == call_id && String::from_utf8_lossy(&delta.chunk).contains("partial output")
}
fn assert_aborted_output(output: &str) {
let normalized_output = output.replace("\r\n", "\n").replace('\r', "\n");
let normalized_output = normalized_output.trim_end_matches('\n');
let expected_pattern = r"(?s)^Exit code: [0-9]+\nWall time: ([0-9]+(?:\.[0-9]+)?) seconds\nOutput:\npartial output\ncommand aborted by user$";
let captures = assert_regex_match(expected_pattern, normalized_output);
let secs: f32 = match captures.get(1) {
Some(value) => match value.as_str().parse() {
Ok(secs) => secs,
Err(err) => panic!("failed to parse wall time seconds: {err}"),
},
None => panic!("aborted message with elapsed seconds"),
};
assert!(secs >= 0.0);
}
fn assert_unified_exec_aborted_output(output: &str) {
let normalized_output = output.replace("\r\n", "\n").replace('\r', "\n");
let normalized_output = normalized_output.trim_end_matches('\n');
let expected_pattern = concat!(
r#"(?s)^(?:Total output lines: \d+\n\n)?"#,
r#"(?:Chunk ID: [^\n]+\n)?"#,
r#"Wall time: (-?[0-9]+(?:\.[0-9]+)?) seconds\n"#,
r#"(?:Process exited with code -?\d+\n)?"#,
r#"(?:Process running with session ID -?\d+\n)?"#,
r#"(?:Original token count: \d+\n)?"#,
r#"Output:\npartial output\ncommand aborted by user$"#,
);
let captures = assert_regex_match(expected_pattern, normalized_output);
let secs: f64 = match captures.get(1) {
Some(value) => match value.as_str().parse() {
Ok(secs) => secs,
Err(err) => panic!("failed to parse unified exec wall time seconds: {err}"),
},
None => panic!("unified exec aborted message with elapsed seconds"),
};
assert!(secs >= 0.0);
}
/// Integration test: spawn a longrunning shell_command tool via a mocked Responses SSE
/// function call, then interrupt the session and expect TurnAborted.
@@ -70,7 +167,7 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
/// responses server, and ensures the model receives the synthesized abort.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_tool_records_history_entries() {
let command = "sleep 60";
let (command, _shell) = long_running_exec_command_with_output();
let call_id = "call-history";
let args = json!({
@@ -110,8 +207,7 @@ async fn interrupt_tool_records_history_entries() {
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
wait_for_event(&codex, |ev| saw_partial_output(call_id, ev)).await;
codex.submit(Op::Interrupt).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
@@ -143,23 +239,209 @@ async fn interrupt_tool_records_history_entries() {
let output = response_mock
.function_call_output_text(call_id)
.expect("missing function_call_output text");
let re = Regex::new(r"^Wall time: ([0-9]+(?:\.[0-9])?) seconds\naborted by user$")
.expect("compile regex");
let captures = re.captures(&output);
assert_matches!(
captures.as_ref(),
Some(caps) if caps.get(1).is_some(),
"aborted message with elapsed seconds"
);
let secs: f32 = captures
.expect("aborted message with elapsed seconds")
.get(1)
.unwrap()
.as_str()
.parse()
.unwrap();
assert!(
secs >= 0.1,
"expected at least one tenth of a second of elapsed time, got {secs}"
);
assert_aborted_output(&output);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_shell_records_partial_output() {
let command = long_running_shell_command_with_output();
let call_id = "call-shell";
let args = json!({
"command": command,
"timeout_ms": 60_000
})
.to_string();
let first_body = sse(vec![
ev_response_created("resp-shell"),
ev_function_call(call_id, "shell", &args),
ev_completed("resp-shell"),
]);
let follow_up_body = sse(vec![
ev_response_created("resp-shell-followup"),
ev_completed("resp-shell-followup"),
]);
let server = start_mock_server().await;
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
let fixture = test_codex()
.with_model("gpt-5.1")
.build(&server)
.await
.unwrap();
let codex = Arc::clone(&fixture.codex);
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "start shell".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
wait_for_event(&codex, |ev| saw_partial_output(call_id, ev)).await;
codex.submit(Op::Interrupt).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "follow up".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let output = response_mock
.function_call_output_text(call_id)
.expect("missing shell output");
assert_aborted_output(&output);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_local_shell_records_partial_output() {
let command = long_running_local_shell_call_with_output();
let call_id = "call-local-shell";
let first_body = sse(vec![
ev_response_created("resp-local-shell"),
ev_local_shell_call(call_id, "completed", command),
ev_completed("resp-local-shell"),
]);
let follow_up_body = sse(vec![
ev_response_created("resp-local-shell-followup"),
ev_completed("resp-local-shell-followup"),
]);
let server = start_mock_server().await;
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
let fixture = test_codex()
.with_model("gpt-5.1")
.build(&server)
.await
.unwrap();
let codex = Arc::clone(&fixture.codex);
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "start local shell".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
wait_for_event(&codex, |ev| saw_partial_output(call_id, ev)).await;
codex.submit(Op::Interrupt).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "follow up".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let output = response_mock
.function_call_output_text(call_id)
.expect("missing local shell output");
assert_aborted_output(&output);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_unified_exec_records_partial_output() {
let call_id = "call-unified-exec";
let (cmd, shell) = long_running_exec_command_with_output();
let args = if let Some(shell) = shell {
json!({
"cmd": cmd,
"shell": shell,
"yield_time_ms": 60_000,
})
.to_string()
} else {
json!({
"cmd": cmd,
"yield_time_ms": 60_000,
})
.to_string()
};
let first_body = sse(vec![
ev_response_created("resp-unified-exec"),
ev_function_call(call_id, "exec_command", &args),
ev_completed("resp-unified-exec"),
]);
let follow_up_body = sse(vec![
ev_response_created("resp-unified-exec-followup"),
ev_completed("resp-unified-exec-followup"),
]);
let server = start_mock_server().await;
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
let fixture = test_codex()
.with_model("gpt-5.1")
.with_config(|config| {
config.features.enable(Feature::UnifiedExec);
})
.build(&server)
.await
.unwrap();
let codex = Arc::clone(&fixture.codex);
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "start unified exec".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
wait_for_event(&codex, |ev| saw_partial_output(call_id, ev)).await;
codex.submit(Op::Interrupt).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "follow up".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let output = response_mock
.function_call_output_text(call_id)
.expect("missing exec_command output");
assert_unified_exec_aborted_output(&output);
}

View File

@@ -3,6 +3,7 @@
use std::collections::HashMap;
use std::string::ToString;
use codex_core::exec::ExecExpiration;
use codex_core::exec::ExecParams;
use codex_core::exec::ExecToolCallOutput;
use codex_core::exec::SandboxType;
@@ -15,6 +16,7 @@ use tempfile::TempDir;
use codex_core::error::Result;
use codex_core::get_platform_sandbox;
use tokio_util::sync::CancellationToken;
fn skip_test() -> bool {
if std::env::var(CODEX_SANDBOX_ENV_VAR) == Ok("seatbelt".to_string()) {
@@ -33,7 +35,7 @@ async fn run_test_cmd(tmp: TempDir, cmd: Vec<&str>) -> Result<ExecToolCallOutput
let params = ExecParams {
command: cmd.iter().map(ToString::to_string).collect(),
cwd: tmp.path().to_path_buf(),
expiration: 1000.into(),
expiration: ExecExpiration::from_timeout_ms(Some(1000), CancellationToken::new()),
env: HashMap::new(),
sandbox_permissions: SandboxPermissions::UseDefault,
justification: None,

View File

@@ -84,7 +84,7 @@ impl EscalateServer {
command,
],
cwd: PathBuf::from(&workdir),
expiration: ExecExpiration::Cancellation(cancel_rx),
expiration: ExecExpiration::cancel_only(cancel_rx),
env,
sandbox_permissions: SandboxPermissions::UseDefault,
justification: None,

View File

@@ -33,3 +33,4 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"signal",
] }
tokio-util = { workspace = true }

View File

@@ -3,6 +3,7 @@
use codex_core::config::types::ShellEnvironmentPolicy;
use codex_core::error::CodexErr;
use codex_core::error::SandboxErr;
use codex_core::exec::ExecExpiration;
use codex_core::exec::ExecParams;
use codex_core::exec::process_exec_tool_call;
use codex_core::exec_env::create_env;
@@ -13,6 +14,7 @@ use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::path::PathBuf;
use tempfile::NamedTempFile;
use tokio_util::sync::CancellationToken;
// At least on GitHub CI, the arm64 tests appear to need longer timeouts.
@@ -57,7 +59,7 @@ async fn run_cmd_output(
let params = ExecParams {
command: cmd.iter().copied().map(str::to_owned).collect(),
cwd,
expiration: timeout_ms.into(),
expiration: ExecExpiration::from_timeout_ms(Some(timeout_ms), CancellationToken::new()),
env: create_env_from_core_vars(),
sandbox_permissions: SandboxPermissions::UseDefault,
justification: None,
@@ -174,7 +176,10 @@ async fn assert_network_blocked(cmd: &[&str]) {
cwd,
// Give the tool a generous 2-second timeout so even slow DNS timeouts
// do not stall the suite.
expiration: NETWORK_TIMEOUT_MS.into(),
expiration: ExecExpiration::from_timeout_ms(
Some(NETWORK_TIMEOUT_MS),
CancellationToken::new(),
),
env: create_env_from_core_vars(),
sandbox_permissions: SandboxPermissions::UseDefault,
justification: None,

View File

@@ -137,7 +137,7 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
let server = thread::spawn(move || {
let mut captured = Vec::new();
let deadline = Instant::now() + Duration::from_secs(3);
let deadline = Instant::now() + Duration::from_secs(12);
while Instant::now() < deadline {
match listener.accept() {
@@ -150,6 +150,7 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
content_type: headers.get("content-type").cloned(),
body,
});
break;
}
}
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {