mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Compare commits
24 Commits
dev/cc/add
...
capture-sh
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ef89fb780f | ||
|
|
829b0f5d1a | ||
|
|
4e8b5e9cff | ||
|
|
ce94c79d4c | ||
|
|
73ea8c44ba | ||
|
|
cf3bf993b6 | ||
|
|
3f52be0153 | ||
|
|
69930d2f55 | ||
|
|
ff94c94370 | ||
|
|
d079cc1778 | ||
|
|
f3b89ce11e | ||
|
|
016f8ba2db | ||
|
|
f651e72ef4 | ||
|
|
09febdcd74 | ||
|
|
ac7a556534 | ||
|
|
3384319409 | ||
|
|
4ec9ef35e0 | ||
|
|
a0909ea1a9 | ||
|
|
282696e625 | ||
|
|
c9abdd043b | ||
|
|
fcde109c7f | ||
|
|
a8d69094ad | ||
|
|
bc6be70e60 | ||
|
|
c84baeef1f |
2
codex-rs/Cargo.lock
generated
2
codex-rs/Cargo.lock
generated
@@ -1024,6 +1024,7 @@ dependencies = [
|
||||
"shlex",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"toml 0.9.5",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
@@ -1531,6 +1532,7 @@ dependencies = [
|
||||
"seccompiler",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -42,6 +42,7 @@ tokio = { workspace = true, features = [
|
||||
"rt-multi-thread",
|
||||
"signal",
|
||||
] }
|
||||
tokio-util = { workspace = true }
|
||||
tracing = { workspace = true, features = ["log"] }
|
||||
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
|
||||
uuid = { workspace = true, features = ["serde", "v7"] }
|
||||
|
||||
@@ -140,6 +140,7 @@ use codex_core::config::edit::ConfigEditsBuilder;
|
||||
use codex_core::config::types::McpServerTransportConfig;
|
||||
use codex_core::default_client::get_codex_user_agent;
|
||||
use codex_core::error::CodexErr;
|
||||
use codex_core::exec::ExecExpiration;
|
||||
use codex_core::exec::ExecParams;
|
||||
use codex_core::exec_env::create_env;
|
||||
use codex_core::features::Feature;
|
||||
@@ -188,6 +189,7 @@ use tokio::select;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
@@ -1232,10 +1234,11 @@ impl CodexMessageProcessor {
|
||||
let timeout_ms = params
|
||||
.timeout_ms
|
||||
.and_then(|timeout_ms| u64::try_from(timeout_ms).ok());
|
||||
let cancellation_token = CancellationToken::new();
|
||||
let exec_params = ExecParams {
|
||||
command: params.command,
|
||||
cwd,
|
||||
expiration: timeout_ms.into(),
|
||||
expiration: ExecExpiration::from_timeout_ms(timeout_ms, cancellation_token),
|
||||
env,
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
justification: None,
|
||||
|
||||
@@ -3359,6 +3359,7 @@ mod tests {
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
use tokio::time::sleep;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use mcp_types::ContentBlock;
|
||||
use mcp_types::TextContent;
|
||||
@@ -4458,6 +4459,7 @@ mod tests {
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&turn_context),
|
||||
tracker,
|
||||
CancellationToken::new(),
|
||||
call,
|
||||
)
|
||||
.await
|
||||
@@ -4577,6 +4579,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_escalated_permissions_when_policy_not_on_request() {
|
||||
use crate::exec::ExecExpiration;
|
||||
use crate::exec::ExecParams;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
@@ -4607,7 +4610,7 @@ mod tests {
|
||||
]
|
||||
},
|
||||
cwd: turn_context.cwd.clone(),
|
||||
expiration: timeout_ms.into(),
|
||||
expiration: ExecExpiration::from_timeout_ms(Some(timeout_ms), CancellationToken::new()),
|
||||
env: HashMap::new(),
|
||||
sandbox_permissions,
|
||||
justification: Some("test".to_string()),
|
||||
@@ -4618,7 +4621,7 @@ mod tests {
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
command: params.command.clone(),
|
||||
cwd: params.cwd.clone(),
|
||||
expiration: timeout_ms.into(),
|
||||
expiration: ExecExpiration::from_timeout_ms(Some(timeout_ms), CancellationToken::new()),
|
||||
env: HashMap::new(),
|
||||
justification: params.justification.clone(),
|
||||
arg0: None,
|
||||
@@ -4647,6 +4650,7 @@ mod tests {
|
||||
})
|
||||
.to_string(),
|
||||
},
|
||||
cancellation_token: CancellationToken::new(),
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -4684,6 +4688,7 @@ mod tests {
|
||||
})
|
||||
.to_string(),
|
||||
},
|
||||
cancellation_token: CancellationToken::new(),
|
||||
})
|
||||
.await;
|
||||
|
||||
@@ -4737,6 +4742,7 @@ mod tests {
|
||||
})
|
||||
.to_string(),
|
||||
},
|
||||
cancellation_token: CancellationToken::new(),
|
||||
})
|
||||
.await;
|
||||
|
||||
|
||||
@@ -35,6 +35,7 @@ use crate::text_encoding::bytes_to_string_smart;
|
||||
use codex_utils_pty::process_group::kill_child_process_group;
|
||||
|
||||
pub const DEFAULT_EXEC_COMMAND_TIMEOUT_MS: u64 = 10_000;
|
||||
pub(crate) const EXEC_ABORTED_MESSAGE: &str = "command aborted by user";
|
||||
|
||||
// Hardcode these since it does not seem worth including the libc crate just
|
||||
// for these.
|
||||
@@ -64,49 +65,78 @@ pub struct ExecParams {
|
||||
|
||||
/// Mechanism to terminate an exec invocation before it finishes naturally.
|
||||
#[derive(Debug)]
|
||||
pub enum ExecExpiration {
|
||||
Timeout(Duration),
|
||||
DefaultTimeout,
|
||||
Cancellation(CancellationToken),
|
||||
pub struct ExecExpiration {
|
||||
pub timeout: TimeoutSpec,
|
||||
pub cancellation: CancellationToken,
|
||||
}
|
||||
|
||||
impl From<Option<u64>> for ExecExpiration {
|
||||
fn from(timeout_ms: Option<u64>) -> Self {
|
||||
timeout_ms.map_or(ExecExpiration::DefaultTimeout, |timeout_ms| {
|
||||
ExecExpiration::Timeout(Duration::from_millis(timeout_ms))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl From<u64> for ExecExpiration {
|
||||
fn from(timeout_ms: u64) -> Self {
|
||||
ExecExpiration::Timeout(Duration::from_millis(timeout_ms))
|
||||
}
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum TimeoutSpec {
|
||||
Default,
|
||||
Explicit(Duration),
|
||||
None,
|
||||
}
|
||||
|
||||
impl ExecExpiration {
|
||||
async fn wait(self) {
|
||||
match self {
|
||||
ExecExpiration::Timeout(duration) => tokio::time::sleep(duration).await,
|
||||
ExecExpiration::DefaultTimeout => {
|
||||
tokio::time::sleep(Duration::from_millis(DEFAULT_EXEC_COMMAND_TIMEOUT_MS)).await
|
||||
}
|
||||
ExecExpiration::Cancellation(cancel) => {
|
||||
cancel.cancelled().await;
|
||||
pub fn new(timeout: TimeoutSpec, cancellation: CancellationToken) -> Self {
|
||||
Self {
|
||||
timeout,
|
||||
cancellation,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn default(cancellation: CancellationToken) -> Self {
|
||||
Self::new(TimeoutSpec::Default, cancellation)
|
||||
}
|
||||
|
||||
pub fn from_timeout(timeout: Duration, cancellation: CancellationToken) -> Self {
|
||||
Self::new(TimeoutSpec::Explicit(timeout), cancellation)
|
||||
}
|
||||
|
||||
pub fn from_timeout_ms(timeout_ms: Option<u64>, cancellation: CancellationToken) -> Self {
|
||||
let timeout = timeout_ms.map_or(TimeoutSpec::Default, |timeout_ms| {
|
||||
TimeoutSpec::Explicit(Duration::from_millis(timeout_ms))
|
||||
});
|
||||
Self::new(timeout, cancellation)
|
||||
}
|
||||
|
||||
pub fn cancel_only(cancellation: CancellationToken) -> Self {
|
||||
Self::new(TimeoutSpec::None, cancellation)
|
||||
}
|
||||
|
||||
async fn wait(self) -> ExecExpirationOutcome {
|
||||
match self.timeout {
|
||||
TimeoutSpec::Explicit(duration) => tokio::select! {
|
||||
_ = tokio::time::sleep(duration) => ExecExpirationOutcome::TimedOut,
|
||||
_ = self.cancellation.cancelled() => ExecExpirationOutcome::Cancelled,
|
||||
},
|
||||
TimeoutSpec::Default => tokio::select! {
|
||||
_ = tokio::time::sleep(Duration::from_millis(DEFAULT_EXEC_COMMAND_TIMEOUT_MS)) => ExecExpirationOutcome::TimedOut,
|
||||
_ = self.cancellation.cancelled() => ExecExpirationOutcome::Cancelled,
|
||||
},
|
||||
TimeoutSpec::None => {
|
||||
self.cancellation.cancelled().await;
|
||||
ExecExpirationOutcome::Cancelled
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// If ExecExpiration is a timeout, returns the timeout in milliseconds.
|
||||
pub(crate) fn timeout_ms(&self) -> Option<u64> {
|
||||
match self {
|
||||
ExecExpiration::Timeout(duration) => Some(duration.as_millis() as u64),
|
||||
ExecExpiration::DefaultTimeout => Some(DEFAULT_EXEC_COMMAND_TIMEOUT_MS),
|
||||
ExecExpiration::Cancellation(_) => None,
|
||||
match self.timeout {
|
||||
TimeoutSpec::Explicit(duration) => Some(duration.as_millis() as u64),
|
||||
TimeoutSpec::Default => Some(DEFAULT_EXEC_COMMAND_TIMEOUT_MS),
|
||||
TimeoutSpec::None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
enum ExecExpirationOutcome {
|
||||
TimedOut,
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq)]
|
||||
pub enum SandboxType {
|
||||
None,
|
||||
@@ -234,8 +264,7 @@ async fn exec_windows_sandbox(
|
||||
expiration,
|
||||
..
|
||||
} = params;
|
||||
// TODO(iceweasel-oai): run_windows_sandbox_capture should support all
|
||||
// variants of ExecExpiration, not just timeout.
|
||||
// TODO(iceweasel-oai): run_windows_sandbox_capture should respect cancellation tokens.
|
||||
let timeout_ms = expiration.timeout_ms();
|
||||
|
||||
let policy_str = serde_json::to_string(sandbox_policy).map_err(|err| {
|
||||
@@ -494,6 +523,13 @@ fn append_all(dst: &mut Vec<u8>, src: &[u8]) {
|
||||
dst.extend_from_slice(src);
|
||||
}
|
||||
|
||||
pub(crate) fn append_abort_message(output: &mut Vec<u8>) {
|
||||
if !output.is_empty() && !output.ends_with(b"\n") {
|
||||
output.push(b'\n');
|
||||
}
|
||||
output.extend_from_slice(EXEC_ABORTED_MESSAGE.as_bytes());
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ExecToolCallOutput {
|
||||
pub exit_code: i32,
|
||||
@@ -599,20 +635,27 @@ async fn consume_truncated_output(
|
||||
Some(agg_tx.clone()),
|
||||
));
|
||||
|
||||
let (exit_status, timed_out) = tokio::select! {
|
||||
let (exit_status, timed_out, was_cancelled) = tokio::select! {
|
||||
status_result = child.wait() => {
|
||||
let exit_status = status_result?;
|
||||
(exit_status, false)
|
||||
(exit_status, false, false)
|
||||
}
|
||||
_ = expiration.wait() => {
|
||||
outcome = expiration.wait() => {
|
||||
kill_child_process_group(&mut child)?;
|
||||
child.start_kill()?;
|
||||
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true)
|
||||
match outcome {
|
||||
ExecExpirationOutcome::TimedOut => {
|
||||
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true, false)
|
||||
}
|
||||
ExecExpirationOutcome::Cancelled => {
|
||||
(synthetic_exit_status(0), false, true)
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = tokio::signal::ctrl_c() => {
|
||||
kill_child_process_group(&mut child)?;
|
||||
child.start_kill()?;
|
||||
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE), false)
|
||||
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + SIGKILL_CODE), false, false)
|
||||
}
|
||||
};
|
||||
|
||||
@@ -624,7 +667,7 @@ async fn consume_truncated_output(
|
||||
// That would cause the `read_capped` tasks to block on `read()`
|
||||
// indefinitely, effectively hanging the whole agent.
|
||||
|
||||
const IO_DRAIN_TIMEOUT_MS: u64 = 2_000; // 2 s should be plenty for local pipes
|
||||
const IO_DRAIN_TIMEOUT_MS: u64 = 10;
|
||||
|
||||
// We need mutable bindings so we can `abort()` them on timeout.
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -669,6 +712,10 @@ async fn consume_truncated_output(
|
||||
while let Ok(chunk) = agg_rx.recv().await {
|
||||
append_all(&mut combined_buf, &chunk);
|
||||
}
|
||||
if was_cancelled {
|
||||
append_abort_message(&mut combined_buf);
|
||||
}
|
||||
|
||||
let aggregated_output = StreamOutput {
|
||||
text: combined_buf,
|
||||
truncated_after_lines: None,
|
||||
@@ -845,7 +892,7 @@ mod tests {
|
||||
let params = ExecParams {
|
||||
command,
|
||||
cwd: std::env::current_dir()?,
|
||||
expiration: 500.into(),
|
||||
expiration: ExecExpiration::from_timeout_ms(Some(500), CancellationToken::new()),
|
||||
env,
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
justification: None,
|
||||
@@ -890,7 +937,7 @@ mod tests {
|
||||
let params = ExecParams {
|
||||
command,
|
||||
cwd: cwd.clone(),
|
||||
expiration: ExecExpiration::Cancellation(cancel_token),
|
||||
expiration: ExecExpiration::cancel_only(cancel_token),
|
||||
env,
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
justification: None,
|
||||
@@ -900,20 +947,17 @@ mod tests {
|
||||
tokio::time::sleep(Duration::from_millis(1_000)).await;
|
||||
cancel_tx.cancel();
|
||||
});
|
||||
let result = process_exec_tool_call(
|
||||
let output = process_exec_tool_call(
|
||||
params,
|
||||
&SandboxPolicy::DangerFullAccess,
|
||||
cwd.as_path(),
|
||||
&None,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
let output = match result {
|
||||
Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => output,
|
||||
other => panic!("expected timeout error, got {other:?}"),
|
||||
};
|
||||
assert!(output.timed_out);
|
||||
assert_eq!(output.exit_code, EXEC_TIMEOUT_EXIT_CODE);
|
||||
.await?;
|
||||
assert!(!output.timed_out);
|
||||
assert_eq!(output.exit_code, 0);
|
||||
assert!(output.aggregated_output.text.contains(EXEC_ABORTED_MESSAGE));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -2,14 +2,13 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_async_utils::CancelErr;
|
||||
use codex_async_utils::OrCancelExt;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::codex::TurnContext;
|
||||
use crate::exec::ExecExpiration;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::SandboxType;
|
||||
use crate::exec::StdoutStream;
|
||||
@@ -104,9 +103,12 @@ impl SessionTask for UserShellCommandTask {
|
||||
command: command.clone(),
|
||||
cwd: cwd.clone(),
|
||||
env: create_env(&turn_context.shell_environment_policy),
|
||||
// TODO(zhao-oai): Now that we have ExecExpiration::Cancellation, we
|
||||
// should use that instead of an "arbitrarily large" timeout here.
|
||||
expiration: USER_SHELL_TIMEOUT_MS.into(),
|
||||
// TODO(zhao-oai): consider whether the user shell should use a shorter
|
||||
// default timeout now that cancellation is wired through ExecExpiration.
|
||||
expiration: ExecExpiration::from_timeout_ms(
|
||||
Some(USER_SHELL_TIMEOUT_MS),
|
||||
cancellation_token.child_token(),
|
||||
),
|
||||
sandbox: SandboxType::None,
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
justification: None,
|
||||
@@ -120,52 +122,10 @@ impl SessionTask for UserShellCommandTask {
|
||||
});
|
||||
|
||||
let sandbox_policy = SandboxPolicy::DangerFullAccess;
|
||||
let exec_result = execute_exec_env(exec_env, &sandbox_policy, stdout_stream)
|
||||
.or_cancel(&cancellation_token)
|
||||
.await;
|
||||
let exec_result = execute_exec_env(exec_env, &sandbox_policy, stdout_stream).await;
|
||||
|
||||
match exec_result {
|
||||
Err(CancelErr::Cancelled) => {
|
||||
let aborted_message = "command aborted by user".to_string();
|
||||
let exec_output = ExecToolCallOutput {
|
||||
exit_code: -1,
|
||||
stdout: StreamOutput::new(String::new()),
|
||||
stderr: StreamOutput::new(aborted_message.clone()),
|
||||
aggregated_output: StreamOutput::new(aborted_message.clone()),
|
||||
duration: Duration::ZERO,
|
||||
timed_out: false,
|
||||
};
|
||||
let output_items = [user_shell_command_record_item(
|
||||
&raw_command,
|
||||
&exec_output,
|
||||
&turn_context,
|
||||
)];
|
||||
session
|
||||
.record_conversation_items(turn_context.as_ref(), &output_items)
|
||||
.await;
|
||||
session
|
||||
.send_event(
|
||||
turn_context.as_ref(),
|
||||
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
|
||||
call_id,
|
||||
process_id: None,
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
command: command.clone(),
|
||||
cwd: cwd.clone(),
|
||||
parsed_cmd: parsed_cmd.clone(),
|
||||
source: ExecCommandSource::UserShell,
|
||||
interaction_input: None,
|
||||
stdout: String::new(),
|
||||
stderr: aborted_message.clone(),
|
||||
aggregated_output: aborted_message.clone(),
|
||||
exit_code: -1,
|
||||
duration: Duration::ZERO,
|
||||
formatted_output: aborted_message,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
Ok(Ok(output)) => {
|
||||
Ok(output) => {
|
||||
session
|
||||
.send_event(
|
||||
turn_context.as_ref(),
|
||||
@@ -200,7 +160,7 @@ impl SessionTask for UserShellCommandTask {
|
||||
.record_conversation_items(turn_context.as_ref(), &output_items)
|
||||
.await;
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
Err(err) => {
|
||||
error!("user shell command failed: {err:?}");
|
||||
let message = format!("execution error: {err:?}");
|
||||
let exec_output = ExecToolCallOutput {
|
||||
|
||||
@@ -13,6 +13,7 @@ use mcp_types::CallToolResult;
|
||||
use std::borrow::Cow;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub type SharedTurnDiffTracker = Arc<Mutex<TurnDiffTracker>>;
|
||||
|
||||
@@ -24,6 +25,7 @@ pub struct ToolInvocation {
|
||||
pub call_id: String,
|
||||
pub tool_name: String,
|
||||
pub payload: ToolPayload,
|
||||
pub cancellation_token: CancellationToken,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
|
||||
@@ -30,6 +30,7 @@ use async_trait::async_trait;
|
||||
use codex_apply_patch::ApplyPatchAction;
|
||||
use codex_apply_patch::ApplyPatchFileChange;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub struct ApplyPatchHandler;
|
||||
|
||||
@@ -84,6 +85,7 @@ impl ToolHandler for ApplyPatchHandler {
|
||||
call_id,
|
||||
tool_name,
|
||||
payload,
|
||||
cancellation_token,
|
||||
} = invocation;
|
||||
|
||||
let patch_input = match payload {
|
||||
@@ -143,6 +145,7 @@ impl ToolHandler for ApplyPatchHandler {
|
||||
turn: turn.as_ref(),
|
||||
call_id: call_id.clone(),
|
||||
tool_name: tool_name.to_string(),
|
||||
cancellation_token: cancellation_token.child_token(),
|
||||
};
|
||||
let out = orchestrator
|
||||
.run(&mut runtime, &req, &tool_ctx, &turn, turn.approval_policy)
|
||||
@@ -192,6 +195,7 @@ pub(crate) async fn intercept_apply_patch(
|
||||
tracker: Option<&SharedTurnDiffTracker>,
|
||||
call_id: &str,
|
||||
tool_name: &str,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Result<Option<ToolOutput>, FunctionCallError> {
|
||||
match codex_apply_patch::maybe_parse_apply_patch_verified(command, cwd) {
|
||||
codex_apply_patch::MaybeApplyPatchVerified::Body(changes) => {
|
||||
@@ -234,6 +238,7 @@ pub(crate) async fn intercept_apply_patch(
|
||||
turn,
|
||||
call_id: call_id.to_string(),
|
||||
tool_name: tool_name.to_string(),
|
||||
cancellation_token,
|
||||
};
|
||||
let out = orchestrator
|
||||
.run(&mut runtime, &req, &tool_ctx, turn, turn.approval_policy)
|
||||
|
||||
@@ -615,6 +615,7 @@ mod tests {
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::time::timeout;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
fn invocation(
|
||||
session: Arc<crate::codex::Session>,
|
||||
@@ -629,6 +630,7 @@ mod tests {
|
||||
call_id: "call-1".to_string(),
|
||||
tool_name: tool_name.to_string(),
|
||||
payload,
|
||||
cancellation_token: CancellationToken::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,8 +2,10 @@ use async_trait::async_trait;
|
||||
use codex_protocol::models::ShellCommandToolCallParams;
|
||||
use codex_protocol::models::ShellToolCallParams;
|
||||
use std::sync::Arc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::codex::TurnContext;
|
||||
use crate::exec::ExecExpiration;
|
||||
use crate::exec::ExecParams;
|
||||
use crate::exec_env::create_env;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
@@ -29,11 +31,15 @@ pub struct ShellHandler;
|
||||
pub struct ShellCommandHandler;
|
||||
|
||||
impl ShellHandler {
|
||||
fn to_exec_params(params: ShellToolCallParams, turn_context: &TurnContext) -> ExecParams {
|
||||
fn to_exec_params(
|
||||
params: ShellToolCallParams,
|
||||
turn_context: &TurnContext,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> ExecParams {
|
||||
ExecParams {
|
||||
command: params.command,
|
||||
cwd: turn_context.resolve_path(params.workdir.clone()),
|
||||
expiration: params.timeout_ms.into(),
|
||||
expiration: ExecExpiration::from_timeout_ms(params.timeout_ms, cancellation_token),
|
||||
env: create_env(&turn_context.shell_environment_policy),
|
||||
sandbox_permissions: params.sandbox_permissions.unwrap_or_default(),
|
||||
justification: params.justification,
|
||||
@@ -52,6 +58,7 @@ impl ShellCommandHandler {
|
||||
params: ShellCommandToolCallParams,
|
||||
session: &crate::codex::Session,
|
||||
turn_context: &TurnContext,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> ExecParams {
|
||||
let shell = session.user_shell();
|
||||
let command = Self::base_command(shell.as_ref(), ¶ms.command, params.login);
|
||||
@@ -59,7 +66,7 @@ impl ShellCommandHandler {
|
||||
ExecParams {
|
||||
command,
|
||||
cwd: turn_context.resolve_path(params.workdir.clone()),
|
||||
expiration: params.timeout_ms.into(),
|
||||
expiration: ExecExpiration::from_timeout_ms(params.timeout_ms, cancellation_token),
|
||||
env: create_env(&turn_context.shell_environment_policy),
|
||||
sandbox_permissions: params.sandbox_permissions.unwrap_or_default(),
|
||||
justification: params.justification,
|
||||
@@ -101,32 +108,29 @@ impl ToolHandler for ShellHandler {
|
||||
call_id,
|
||||
tool_name,
|
||||
payload,
|
||||
cancellation_token,
|
||||
} = invocation;
|
||||
|
||||
match payload {
|
||||
ToolPayload::Function { arguments } => {
|
||||
let params: ShellToolCallParams = parse_arguments(&arguments)?;
|
||||
let exec_params = Self::to_exec_params(params, turn.as_ref());
|
||||
let exec_params =
|
||||
Self::to_exec_params(params, turn.as_ref(), cancellation_token.child_token());
|
||||
Self::run_exec_like(
|
||||
tool_name.as_str(),
|
||||
exec_params,
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
call_id,
|
||||
ShellExecContext::new(session, turn, tracker, call_id, cancellation_token),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
}
|
||||
ToolPayload::LocalShell { params } => {
|
||||
let exec_params = Self::to_exec_params(params, turn.as_ref());
|
||||
let exec_params =
|
||||
Self::to_exec_params(params, turn.as_ref(), cancellation_token.child_token());
|
||||
Self::run_exec_like(
|
||||
tool_name.as_str(),
|
||||
exec_params,
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
call_id,
|
||||
ShellExecContext::new(session, turn, tracker, call_id, cancellation_token),
|
||||
false,
|
||||
)
|
||||
.await
|
||||
@@ -170,6 +174,7 @@ impl ToolHandler for ShellCommandHandler {
|
||||
call_id,
|
||||
tool_name,
|
||||
payload,
|
||||
cancellation_token,
|
||||
} = invocation;
|
||||
|
||||
let ToolPayload::Function { arguments } = payload else {
|
||||
@@ -179,17 +184,45 @@ impl ToolHandler for ShellCommandHandler {
|
||||
};
|
||||
|
||||
let params: ShellCommandToolCallParams = parse_arguments(&arguments)?;
|
||||
let exec_params = Self::to_exec_params(params, session.as_ref(), turn.as_ref());
|
||||
let exec_params = Self::to_exec_params(
|
||||
params,
|
||||
session.as_ref(),
|
||||
turn.as_ref(),
|
||||
cancellation_token.child_token(),
|
||||
);
|
||||
ShellHandler::run_exec_like(
|
||||
tool_name.as_str(),
|
||||
exec_params,
|
||||
ShellExecContext::new(session, turn, tracker, call_id, cancellation_token),
|
||||
true,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
struct ShellExecContext {
|
||||
session: Arc<crate::codex::Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
tracker: crate::tools::context::SharedTurnDiffTracker,
|
||||
call_id: String,
|
||||
cancellation_token: CancellationToken,
|
||||
}
|
||||
|
||||
impl ShellExecContext {
|
||||
fn new(
|
||||
session: Arc<crate::codex::Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
tracker: crate::tools::context::SharedTurnDiffTracker,
|
||||
call_id: String,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Self {
|
||||
Self {
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
call_id,
|
||||
true,
|
||||
)
|
||||
.await
|
||||
cancellation_token,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -197,12 +230,16 @@ impl ShellHandler {
|
||||
async fn run_exec_like(
|
||||
tool_name: &str,
|
||||
exec_params: ExecParams,
|
||||
session: Arc<crate::codex::Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
tracker: crate::tools::context::SharedTurnDiffTracker,
|
||||
call_id: String,
|
||||
ctx: ShellExecContext,
|
||||
freeform: bool,
|
||||
) -> Result<ToolOutput, FunctionCallError> {
|
||||
let ShellExecContext {
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
call_id,
|
||||
cancellation_token,
|
||||
} = ctx;
|
||||
// Approval policy guard for explicit escalation in non-OnRequest modes.
|
||||
if exec_params
|
||||
.sandbox_permissions
|
||||
@@ -212,9 +249,9 @@ impl ShellHandler {
|
||||
codex_protocol::protocol::AskForApproval::OnRequest
|
||||
)
|
||||
{
|
||||
let policy = turn.approval_policy;
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"approval policy is {policy:?}; reject command — you should not ask for escalated permissions if the approval policy is {policy:?}",
|
||||
policy = turn.approval_policy
|
||||
"approval policy is {policy:?}; reject command — you should not ask for escalated permissions if the approval policy is {policy:?}"
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -228,6 +265,7 @@ impl ShellHandler {
|
||||
Some(&tracker),
|
||||
&call_id,
|
||||
tool_name,
|
||||
cancellation_token.child_token(),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
@@ -273,6 +311,7 @@ impl ShellHandler {
|
||||
turn: turn.as_ref(),
|
||||
call_id: call_id.clone(),
|
||||
tool_name: tool_name.to_string(),
|
||||
cancellation_token,
|
||||
};
|
||||
let out = orchestrator
|
||||
.run(&mut runtime, &req, &tool_ctx, &turn, turn.approval_policy)
|
||||
@@ -294,6 +333,7 @@ mod tests {
|
||||
|
||||
use codex_protocol::models::ShellCommandToolCallParams;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::codex::make_session_and_context;
|
||||
use crate::exec_env::create_env;
|
||||
@@ -377,7 +417,12 @@ mod tests {
|
||||
justification: justification.clone(),
|
||||
};
|
||||
|
||||
let exec_params = ShellCommandHandler::to_exec_params(params, &session, &turn_context);
|
||||
let exec_params = ShellCommandHandler::to_exec_params(
|
||||
params,
|
||||
&session,
|
||||
&turn_context,
|
||||
CancellationToken::new(),
|
||||
);
|
||||
|
||||
// ExecParams cannot derive Eq due to the CancellationToken field, so we manually compare the fields.
|
||||
assert_eq!(exec_params.command, expected_command);
|
||||
|
||||
@@ -107,6 +107,7 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
call_id,
|
||||
tool_name,
|
||||
payload,
|
||||
cancellation_token,
|
||||
..
|
||||
} = invocation;
|
||||
|
||||
@@ -120,7 +121,12 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
};
|
||||
|
||||
let manager: &UnifiedExecProcessManager = &session.services.unified_exec_manager;
|
||||
let context = UnifiedExecContext::new(session.clone(), turn.clone(), call_id.clone());
|
||||
let context = UnifiedExecContext::new(
|
||||
session.clone(),
|
||||
turn.clone(),
|
||||
call_id.clone(),
|
||||
cancellation_token,
|
||||
);
|
||||
|
||||
let response = match tool_name.as_str() {
|
||||
"exec_command" => {
|
||||
@@ -145,9 +151,9 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
)
|
||||
{
|
||||
manager.release_process_id(&process_id).await;
|
||||
let policy = context.turn.approval_policy;
|
||||
return Err(FunctionCallError::RespondToModel(format!(
|
||||
"approval policy is {policy:?}; reject command — you cannot ask for escalated permissions if the approval policy is {policy:?}",
|
||||
policy = context.turn.approval_policy
|
||||
"approval policy is {policy:?}; reject command — you cannot ask for escalated permissions if the approval policy is {policy:?}"
|
||||
)));
|
||||
}
|
||||
|
||||
@@ -165,6 +171,7 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
Some(&tracker),
|
||||
&context.call_id,
|
||||
tool_name.as_str(),
|
||||
context.cancellation_token.child_token(),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
@@ -194,12 +201,15 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
"write_stdin" => {
|
||||
let args: WriteStdinArgs = parse_arguments(&arguments)?;
|
||||
let response = manager
|
||||
.write_stdin(WriteStdinRequest {
|
||||
process_id: &args.session_id.to_string(),
|
||||
input: &args.chars,
|
||||
yield_time_ms: args.yield_time_ms,
|
||||
max_output_tokens: args.max_output_tokens,
|
||||
})
|
||||
.write_stdin(
|
||||
WriteStdinRequest {
|
||||
process_id: &args.session_id.to_string(),
|
||||
input: &args.chars,
|
||||
yield_time_ms: args.yield_time_ms,
|
||||
max_output_tokens: args.max_output_tokens,
|
||||
},
|
||||
&context,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| {
|
||||
FunctionCallError::RespondToModel(format!("write_stdin failed: {err}"))
|
||||
|
||||
@@ -1,6 +1,4 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::either::Either;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -14,10 +12,8 @@ use crate::codex::TurnContext;
|
||||
use crate::error::CodexErr;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::tools::context::SharedTurnDiffTracker;
|
||||
use crate::tools::context::ToolPayload;
|
||||
use crate::tools::router::ToolCall;
|
||||
use crate::tools::router::ToolRouter;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -58,8 +54,6 @@ impl ToolCallRuntime {
|
||||
let turn = Arc::clone(&self.turn_context);
|
||||
let tracker = Arc::clone(&self.tracker);
|
||||
let lock = Arc::clone(&self.parallel_execution);
|
||||
let started = Instant::now();
|
||||
|
||||
let dispatch_span = trace_span!(
|
||||
"dispatch_tool_call",
|
||||
otel.name = call.tool_name.as_str(),
|
||||
@@ -70,24 +64,36 @@ impl ToolCallRuntime {
|
||||
|
||||
let handle: AbortOnDropHandle<Result<ResponseInputItem, FunctionCallError>> =
|
||||
AbortOnDropHandle::new(tokio::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled() => {
|
||||
let secs = started.elapsed().as_secs_f32().max(0.1);
|
||||
dispatch_span.record("aborted", true);
|
||||
Ok(Self::aborted_response(&call, secs))
|
||||
},
|
||||
res = async {
|
||||
let _guard = if supports_parallel {
|
||||
Either::Left(lock.read().await)
|
||||
} else {
|
||||
Either::Right(lock.write().await)
|
||||
};
|
||||
let dispatch_cancellation = cancellation_token.child_token();
|
||||
let mut dispatch_future = Box::pin(async {
|
||||
let _guard = if supports_parallel {
|
||||
Either::Left(lock.read().await)
|
||||
} else {
|
||||
Either::Right(lock.write().await)
|
||||
};
|
||||
|
||||
router
|
||||
.dispatch_tool_call(session, turn, tracker, call.clone())
|
||||
.instrument(dispatch_span.clone())
|
||||
.await
|
||||
} => res,
|
||||
router
|
||||
.dispatch_tool_call(
|
||||
session,
|
||||
turn,
|
||||
tracker,
|
||||
dispatch_cancellation,
|
||||
call.clone(),
|
||||
)
|
||||
.instrument(dispatch_span.clone())
|
||||
.await
|
||||
});
|
||||
|
||||
let outcome = tokio::select! {
|
||||
res = &mut dispatch_future => Some(res),
|
||||
_ = cancellation_token.cancelled() => None,
|
||||
};
|
||||
match outcome {
|
||||
Some(res) => res,
|
||||
None => {
|
||||
dispatch_span.record("aborted", true);
|
||||
dispatch_future.await
|
||||
}
|
||||
}
|
||||
}));
|
||||
|
||||
@@ -105,33 +111,4 @@ impl ToolCallRuntime {
|
||||
}
|
||||
}
|
||||
|
||||
impl ToolCallRuntime {
|
||||
fn aborted_response(call: &ToolCall, secs: f32) -> ResponseInputItem {
|
||||
match &call.payload {
|
||||
ToolPayload::Custom { .. } => ResponseInputItem::CustomToolCallOutput {
|
||||
call_id: call.call_id.clone(),
|
||||
output: Self::abort_message(call, secs),
|
||||
},
|
||||
ToolPayload::Mcp { .. } => ResponseInputItem::McpToolCallOutput {
|
||||
call_id: call.call_id.clone(),
|
||||
result: Err(Self::abort_message(call, secs)),
|
||||
},
|
||||
_ => ResponseInputItem::FunctionCallOutput {
|
||||
call_id: call.call_id.clone(),
|
||||
output: FunctionCallOutputPayload {
|
||||
content: Self::abort_message(call, secs),
|
||||
..Default::default()
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn abort_message(call: &ToolCall, secs: f32) -> String {
|
||||
match call.tool_name.as_str() {
|
||||
"shell" | "container.exec" | "local_shell" | "shell_command" | "unified_exec" => {
|
||||
format!("Wall time: {secs:.1} seconds\naborted by user")
|
||||
}
|
||||
_ => format!("aborted by user after {secs:.1}s"),
|
||||
}
|
||||
}
|
||||
}
|
||||
impl ToolCallRuntime {}
|
||||
|
||||
@@ -16,6 +16,7 @@ use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::models::ShellToolCallParams;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::instrument;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -132,6 +133,7 @@ impl ToolRouter {
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
tracker: SharedTurnDiffTracker,
|
||||
cancellation_token: CancellationToken,
|
||||
call: ToolCall,
|
||||
) -> Result<ResponseInputItem, FunctionCallError> {
|
||||
let ToolCall {
|
||||
@@ -149,6 +151,7 @@ impl ToolRouter {
|
||||
call_id,
|
||||
tool_name,
|
||||
payload,
|
||||
cancellation_token,
|
||||
};
|
||||
|
||||
match self.registry.dispatch(invocation).await {
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
//! `codex --codex-run-as-apply-patch`, and runs under the current
|
||||
//! `SandboxAttempt` with a minimal environment.
|
||||
use crate::CODEX_APPLY_PATCH_ARG1;
|
||||
use crate::exec::ExecExpiration;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::sandboxing::CommandSpec;
|
||||
use crate::sandboxing::SandboxPermissions;
|
||||
@@ -27,6 +28,7 @@ use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use futures::future::BoxFuture;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ApplyPatchRequest {
|
||||
@@ -46,7 +48,10 @@ impl ApplyPatchRuntime {
|
||||
Self
|
||||
}
|
||||
|
||||
fn build_command_spec(req: &ApplyPatchRequest) -> Result<CommandSpec, ToolError> {
|
||||
fn build_command_spec(
|
||||
req: &ApplyPatchRequest,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Result<CommandSpec, ToolError> {
|
||||
use std::env;
|
||||
let exe = if let Some(path) = &req.codex_exe {
|
||||
path.clone()
|
||||
@@ -59,7 +64,7 @@ impl ApplyPatchRuntime {
|
||||
program,
|
||||
args: vec![CODEX_APPLY_PATCH_ARG1.to_string(), req.action.patch.clone()],
|
||||
cwd: req.action.cwd.clone(),
|
||||
expiration: req.timeout_ms.into(),
|
||||
expiration: ExecExpiration::from_timeout_ms(req.timeout_ms, cancellation_token),
|
||||
// Run apply_patch with a minimal environment for determinism and to avoid leaks.
|
||||
env: HashMap::new(),
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
@@ -149,7 +154,7 @@ impl ToolRuntime<ApplyPatchRequest, ExecToolCallOutput> for ApplyPatchRuntime {
|
||||
attempt: &SandboxAttempt<'_>,
|
||||
ctx: &ToolCtx<'_>,
|
||||
) -> Result<ExecToolCallOutput, ToolError> {
|
||||
let spec = Self::build_command_spec(req)?;
|
||||
let spec = Self::build_command_spec(req, ctx.cancellation_token.child_token())?;
|
||||
let env = attempt
|
||||
.env_for(spec)
|
||||
.map_err(|err| ToolError::Codex(err.into()))?;
|
||||
|
||||
@@ -4,6 +4,7 @@ Runtime: shell
|
||||
Executes shell requests under the orchestrator: asks for approval when needed,
|
||||
builds a CommandSpec, and runs it under the current SandboxAttempt.
|
||||
*/
|
||||
use crate::exec::ExecExpiration;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::features::Feature;
|
||||
use crate::powershell::prefix_powershell_script_with_utf8;
|
||||
@@ -155,11 +156,13 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
|
||||
command
|
||||
};
|
||||
|
||||
let expiration =
|
||||
ExecExpiration::from_timeout_ms(req.timeout_ms, ctx.cancellation_token.child_token());
|
||||
let spec = build_command_spec(
|
||||
&command,
|
||||
&req.cwd,
|
||||
&req.env,
|
||||
req.timeout_ms.into(),
|
||||
expiration,
|
||||
req.sandbox_permissions,
|
||||
req.justification.clone(),
|
||||
)?;
|
||||
|
||||
@@ -185,7 +185,7 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
&command,
|
||||
&req.cwd,
|
||||
&req.env,
|
||||
ExecExpiration::DefaultTimeout,
|
||||
ExecExpiration::default(ctx.cancellation_token.child_token()),
|
||||
req.sandbox_permissions,
|
||||
req.justification.clone(),
|
||||
)
|
||||
|
||||
@@ -23,6 +23,7 @@ use std::path::Path;
|
||||
use futures::Future;
|
||||
use futures::future::BoxFuture;
|
||||
use serde::Serialize;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
#[derive(Clone, Default, Debug)]
|
||||
pub(crate) struct ApprovalStore {
|
||||
@@ -251,6 +252,7 @@ pub(crate) struct ToolCtx<'a> {
|
||||
pub turn: &'a TurnContext,
|
||||
pub call_id: String,
|
||||
pub tool_name: String,
|
||||
pub cancellation_token: CancellationToken,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -30,6 +30,7 @@ use std::time::Duration;
|
||||
use rand::Rng;
|
||||
use rand::rng;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
@@ -60,14 +61,21 @@ pub(crate) struct UnifiedExecContext {
|
||||
pub session: Arc<Session>,
|
||||
pub turn: Arc<TurnContext>,
|
||||
pub call_id: String,
|
||||
pub cancellation_token: CancellationToken,
|
||||
}
|
||||
|
||||
impl UnifiedExecContext {
|
||||
pub fn new(session: Arc<Session>, turn: Arc<TurnContext>, call_id: String) -> Self {
|
||||
pub fn new(
|
||||
session: Arc<Session>,
|
||||
turn: Arc<TurnContext>,
|
||||
call_id: String,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Self {
|
||||
Self {
|
||||
session,
|
||||
turn,
|
||||
call_id,
|
||||
cancellation_token,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -184,8 +192,12 @@ mod tests {
|
||||
cmd: &str,
|
||||
yield_time_ms: u64,
|
||||
) -> Result<UnifiedExecResponse, UnifiedExecError> {
|
||||
let context =
|
||||
UnifiedExecContext::new(Arc::clone(session), Arc::clone(turn), "call".to_string());
|
||||
let context = UnifiedExecContext::new(
|
||||
Arc::clone(session),
|
||||
Arc::clone(turn),
|
||||
"call".to_string(),
|
||||
CancellationToken::new(),
|
||||
);
|
||||
let process_id = session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
@@ -213,19 +225,29 @@ mod tests {
|
||||
|
||||
async fn write_stdin(
|
||||
session: &Arc<Session>,
|
||||
turn: &Arc<TurnContext>,
|
||||
process_id: &str,
|
||||
input: &str,
|
||||
yield_time_ms: u64,
|
||||
) -> Result<UnifiedExecResponse, UnifiedExecError> {
|
||||
let context = UnifiedExecContext::new(
|
||||
Arc::clone(session),
|
||||
Arc::clone(turn),
|
||||
"call".to_string(),
|
||||
CancellationToken::new(),
|
||||
);
|
||||
session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.write_stdin(WriteStdinRequest {
|
||||
process_id,
|
||||
input,
|
||||
yield_time_ms,
|
||||
max_output_tokens: None,
|
||||
})
|
||||
.write_stdin(
|
||||
WriteStdinRequest {
|
||||
process_id,
|
||||
input,
|
||||
yield_time_ms,
|
||||
max_output_tokens: None,
|
||||
},
|
||||
&context,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -277,6 +299,7 @@ mod tests {
|
||||
|
||||
write_stdin(
|
||||
&session,
|
||||
&turn,
|
||||
process_id,
|
||||
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
|
||||
2_500,
|
||||
@@ -285,6 +308,7 @@ mod tests {
|
||||
|
||||
let out_2 = write_stdin(
|
||||
&session,
|
||||
&turn,
|
||||
process_id,
|
||||
"echo $CODEX_INTERACTIVE_SHELL_VAR\n",
|
||||
2_500,
|
||||
@@ -313,6 +337,7 @@ mod tests {
|
||||
|
||||
write_stdin(
|
||||
&session,
|
||||
&turn,
|
||||
session_a.as_str(),
|
||||
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
|
||||
2_500,
|
||||
@@ -333,6 +358,7 @@ mod tests {
|
||||
|
||||
let out_3 = write_stdin(
|
||||
&session,
|
||||
&turn,
|
||||
shell_a
|
||||
.process_id
|
||||
.as_ref()
|
||||
@@ -367,6 +393,7 @@ mod tests {
|
||||
|
||||
write_stdin(
|
||||
&session,
|
||||
&turn,
|
||||
process_id,
|
||||
format!("export CODEX_INTERACTIVE_SHELL_VAR={TEST_VAR_VALUE}\n").as_str(),
|
||||
2_500,
|
||||
@@ -375,6 +402,7 @@ mod tests {
|
||||
|
||||
let out_2 = write_stdin(
|
||||
&session,
|
||||
&turn,
|
||||
process_id,
|
||||
"sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n",
|
||||
10,
|
||||
@@ -387,7 +415,7 @@ mod tests {
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(7)).await;
|
||||
|
||||
let out_3 = write_stdin(&session, process_id, "", 100).await?;
|
||||
let out_3 = write_stdin(&session, &turn, process_id, "", 100).await?;
|
||||
|
||||
assert!(
|
||||
out_3.output.contains(TEST_VAR_VALUE),
|
||||
@@ -449,11 +477,11 @@ mod tests {
|
||||
.expect("expected process id")
|
||||
.as_str();
|
||||
|
||||
write_stdin(&session, process_id, "exit\n", 2_500).await?;
|
||||
write_stdin(&session, &turn, process_id, "exit\n", 2_500).await?;
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
|
||||
let err = write_stdin(&session, process_id, "", 100)
|
||||
let err = write_stdin(&session, &turn, process_id, "", 100)
|
||||
.await
|
||||
.expect_err("expected unknown process error");
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::exec::append_abort_message;
|
||||
use crate::exec_env::create_env;
|
||||
use crate::protocol::ExecCommandSource;
|
||||
use crate::sandboxing::ExecEnv;
|
||||
@@ -77,6 +78,11 @@ struct PreparedProcessHandles {
|
||||
tty: bool,
|
||||
}
|
||||
|
||||
struct CollectedOutput {
|
||||
bytes: Vec<u8>,
|
||||
was_cancelled: bool,
|
||||
}
|
||||
|
||||
impl UnifiedExecProcessManager {
|
||||
pub(crate) async fn allocate_process_id(&self) -> String {
|
||||
loop {
|
||||
@@ -175,12 +181,17 @@ impl UnifiedExecProcessManager {
|
||||
&output_buffer,
|
||||
&output_notify,
|
||||
&cancellation_token,
|
||||
&context.cancellation_token,
|
||||
deadline,
|
||||
)
|
||||
.await;
|
||||
let wall_time = Instant::now().saturating_duration_since(start);
|
||||
|
||||
let text = String::from_utf8_lossy(&collected).to_string();
|
||||
let mut collected_bytes = collected.bytes;
|
||||
if collected.was_cancelled {
|
||||
append_abort_message(&mut collected_bytes);
|
||||
}
|
||||
let text = String::from_utf8_lossy(&collected_bytes).to_string();
|
||||
let output = formatted_truncate_text(&text, TruncationPolicy::Tokens(max_tokens));
|
||||
let exit_code = process.exit_code();
|
||||
let has_exited = process.has_exited() || exit_code.is_some();
|
||||
@@ -231,7 +242,7 @@ impl UnifiedExecProcessManager {
|
||||
chunk_id,
|
||||
wall_time,
|
||||
output,
|
||||
raw_output: collected,
|
||||
raw_output: collected_bytes,
|
||||
process_id: if has_exited {
|
||||
None
|
||||
} else {
|
||||
@@ -248,6 +259,7 @@ impl UnifiedExecProcessManager {
|
||||
pub(crate) async fn write_stdin(
|
||||
&self,
|
||||
request: WriteStdinRequest<'_>,
|
||||
context: &UnifiedExecContext,
|
||||
) -> Result<UnifiedExecResponse, UnifiedExecError> {
|
||||
let process_id = request.process_id.to_string();
|
||||
|
||||
@@ -287,12 +299,17 @@ impl UnifiedExecProcessManager {
|
||||
&output_buffer,
|
||||
&output_notify,
|
||||
&cancellation_token,
|
||||
&context.cancellation_token,
|
||||
deadline,
|
||||
)
|
||||
.await;
|
||||
let wall_time = Instant::now().saturating_duration_since(start);
|
||||
|
||||
let text = String::from_utf8_lossy(&collected).to_string();
|
||||
let mut collected_bytes = collected.bytes;
|
||||
if collected.was_cancelled {
|
||||
append_abort_message(&mut collected_bytes);
|
||||
}
|
||||
let text = String::from_utf8_lossy(&collected_bytes).to_string();
|
||||
let output = formatted_truncate_text(&text, TruncationPolicy::Tokens(max_tokens));
|
||||
let original_token_count = approx_token_count(&text);
|
||||
let chunk_id = generate_chunk_id();
|
||||
@@ -324,7 +341,7 @@ impl UnifiedExecProcessManager {
|
||||
chunk_id,
|
||||
wall_time,
|
||||
output,
|
||||
raw_output: collected,
|
||||
raw_output: collected_bytes,
|
||||
process_id,
|
||||
exit_code,
|
||||
original_token_count: Some(original_token_count),
|
||||
@@ -523,6 +540,7 @@ impl UnifiedExecProcessManager {
|
||||
turn: context.turn.as_ref(),
|
||||
call_id: context.call_id.clone(),
|
||||
tool_name: "exec_command".to_string(),
|
||||
cancellation_token: context.cancellation_token.child_token(),
|
||||
};
|
||||
orchestrator
|
||||
.run(
|
||||
@@ -536,16 +554,18 @@ impl UnifiedExecProcessManager {
|
||||
.map_err(|e| UnifiedExecError::create_process(format!("{e:?}")))
|
||||
}
|
||||
|
||||
pub(super) async fn collect_output_until_deadline(
|
||||
async fn collect_output_until_deadline(
|
||||
output_buffer: &OutputBuffer,
|
||||
output_notify: &Arc<Notify>,
|
||||
cancellation_token: &CancellationToken,
|
||||
exit_token: &CancellationToken,
|
||||
tool_cancel_token: &CancellationToken,
|
||||
deadline: Instant,
|
||||
) -> Vec<u8> {
|
||||
) -> CollectedOutput {
|
||||
const POST_EXIT_OUTPUT_GRACE: Duration = Duration::from_millis(50);
|
||||
|
||||
let mut collected: Vec<u8> = Vec::with_capacity(4096);
|
||||
let mut exit_signal_received = cancellation_token.is_cancelled();
|
||||
let mut exit_signal_received = exit_token.is_cancelled();
|
||||
let mut was_cancelled = tool_cancel_token.is_cancelled();
|
||||
loop {
|
||||
let drained_chunks: Vec<Vec<u8>>;
|
||||
let mut wait_for_output = None;
|
||||
@@ -558,14 +578,15 @@ impl UnifiedExecProcessManager {
|
||||
}
|
||||
|
||||
if drained_chunks.is_empty() {
|
||||
exit_signal_received |= cancellation_token.is_cancelled();
|
||||
exit_signal_received |= exit_token.is_cancelled();
|
||||
was_cancelled |= tool_cancel_token.is_cancelled();
|
||||
let remaining = deadline.saturating_duration_since(Instant::now());
|
||||
if remaining == Duration::ZERO {
|
||||
break;
|
||||
}
|
||||
|
||||
let notified = wait_for_output.unwrap_or_else(|| output_notify.notified());
|
||||
if exit_signal_received {
|
||||
if exit_signal_received || was_cancelled {
|
||||
let grace = remaining.min(POST_EXIT_OUTPUT_GRACE);
|
||||
if tokio::time::timeout(grace, notified).await.is_err() {
|
||||
break;
|
||||
@@ -574,11 +595,14 @@ impl UnifiedExecProcessManager {
|
||||
}
|
||||
|
||||
tokio::pin!(notified);
|
||||
let exit_notified = cancellation_token.cancelled();
|
||||
let exit_notified = exit_token.cancelled();
|
||||
let tool_notified = tool_cancel_token.cancelled();
|
||||
tokio::pin!(exit_notified);
|
||||
tokio::pin!(tool_notified);
|
||||
tokio::select! {
|
||||
_ = &mut notified => {}
|
||||
_ = &mut exit_notified => exit_signal_received = true,
|
||||
_ = &mut tool_notified => was_cancelled = true,
|
||||
_ = tokio::time::sleep(remaining) => break,
|
||||
}
|
||||
continue;
|
||||
@@ -588,13 +612,17 @@ impl UnifiedExecProcessManager {
|
||||
collected.extend_from_slice(&chunk);
|
||||
}
|
||||
|
||||
exit_signal_received |= cancellation_token.is_cancelled();
|
||||
exit_signal_received |= exit_token.is_cancelled();
|
||||
was_cancelled |= tool_cancel_token.is_cancelled();
|
||||
if Instant::now() >= deadline {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
collected
|
||||
CollectedOutput {
|
||||
bytes: collected,
|
||||
was_cancelled,
|
||||
}
|
||||
}
|
||||
|
||||
fn prune_processes_if_needed(store: &mut ProcessStore) -> bool {
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
use assert_matches::assert_matches;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::assert_regex_match;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_local_shell_call;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
@@ -14,8 +13,106 @@ use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use regex_lite::Regex;
|
||||
use serde_json::json;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(windows)]
|
||||
const POWERSHELL_PARTIAL_OUTPUT_SCRIPT: &str = r#"Start-Sleep -Milliseconds 200; [Console]::Out.WriteLine('partial output'); [Console]::Out.Flush(); Start-Sleep -Seconds 60"#;
|
||||
#[cfg(not(windows))]
|
||||
const POWERSHELL_PARTIAL_OUTPUT_SCRIPT: &str = "";
|
||||
|
||||
fn long_running_exec_command_with_output() -> (String, Option<&'static str>) {
|
||||
if cfg!(windows) {
|
||||
(
|
||||
POWERSHELL_PARTIAL_OUTPUT_SCRIPT.to_string(),
|
||||
Some("powershell"),
|
||||
)
|
||||
} else {
|
||||
(
|
||||
"sleep 0.2; printf 'partial output\\n' >&2; sleep 60".to_string(),
|
||||
None,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
fn long_running_shell_command_with_output() -> Vec<String> {
|
||||
if cfg!(windows) {
|
||||
vec![
|
||||
"powershell.exe".to_string(),
|
||||
"-NoProfile".to_string(),
|
||||
"-Command".to_string(),
|
||||
POWERSHELL_PARTIAL_OUTPUT_SCRIPT.to_string(),
|
||||
]
|
||||
} else {
|
||||
vec![
|
||||
"/bin/sh".to_string(),
|
||||
"-c".to_string(),
|
||||
"sleep 0.2; printf 'partial output\\n' >&2; sleep 60".to_string(),
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
fn long_running_local_shell_call_with_output() -> Vec<&'static str> {
|
||||
if cfg!(windows) {
|
||||
vec![
|
||||
"powershell.exe",
|
||||
"-NoProfile",
|
||||
"-Command",
|
||||
POWERSHELL_PARTIAL_OUTPUT_SCRIPT,
|
||||
]
|
||||
} else {
|
||||
vec![
|
||||
"/bin/sh",
|
||||
"-c",
|
||||
"sleep 0.2; printf 'partial output\\n' >&2; sleep 60",
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
fn saw_partial_output(call_id: &str, event: &EventMsg) -> bool {
|
||||
let EventMsg::ExecCommandOutputDelta(delta) = event else {
|
||||
return false;
|
||||
};
|
||||
delta.call_id == call_id && String::from_utf8_lossy(&delta.chunk).contains("partial output")
|
||||
}
|
||||
|
||||
fn assert_aborted_output(output: &str) {
|
||||
let normalized_output = output.replace("\r\n", "\n").replace('\r', "\n");
|
||||
let normalized_output = normalized_output.trim_end_matches('\n');
|
||||
let expected_pattern = r"(?s)^Exit code: [0-9]+\nWall time: ([0-9]+(?:\.[0-9]+)?) seconds\nOutput:\npartial output\ncommand aborted by user$";
|
||||
let captures = assert_regex_match(expected_pattern, normalized_output);
|
||||
let secs: f32 = match captures.get(1) {
|
||||
Some(value) => match value.as_str().parse() {
|
||||
Ok(secs) => secs,
|
||||
Err(err) => panic!("failed to parse wall time seconds: {err}"),
|
||||
},
|
||||
None => panic!("aborted message with elapsed seconds"),
|
||||
};
|
||||
assert!(secs >= 0.0);
|
||||
}
|
||||
|
||||
fn assert_unified_exec_aborted_output(output: &str) {
|
||||
let normalized_output = output.replace("\r\n", "\n").replace('\r', "\n");
|
||||
let normalized_output = normalized_output.trim_end_matches('\n');
|
||||
let expected_pattern = concat!(
|
||||
r#"(?s)^(?:Total output lines: \d+\n\n)?"#,
|
||||
r#"(?:Chunk ID: [^\n]+\n)?"#,
|
||||
r#"Wall time: (-?[0-9]+(?:\.[0-9]+)?) seconds\n"#,
|
||||
r#"(?:Process exited with code -?\d+\n)?"#,
|
||||
r#"(?:Process running with session ID -?\d+\n)?"#,
|
||||
r#"(?:Original token count: \d+\n)?"#,
|
||||
r#"Output:\npartial output\ncommand aborted by user$"#,
|
||||
);
|
||||
let captures = assert_regex_match(expected_pattern, normalized_output);
|
||||
let secs: f64 = match captures.get(1) {
|
||||
Some(value) => match value.as_str().parse() {
|
||||
Ok(secs) => secs,
|
||||
Err(err) => panic!("failed to parse unified exec wall time seconds: {err}"),
|
||||
},
|
||||
None => panic!("unified exec aborted message with elapsed seconds"),
|
||||
};
|
||||
assert!(secs >= 0.0);
|
||||
}
|
||||
|
||||
/// Integration test: spawn a long‑running shell_command tool via a mocked Responses SSE
|
||||
/// function call, then interrupt the session and expect TurnAborted.
|
||||
@@ -70,7 +167,7 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
|
||||
/// responses server, and ensures the model receives the synthesized abort.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn interrupt_tool_records_history_entries() {
|
||||
let command = "sleep 60";
|
||||
let (command, _shell) = long_running_exec_command_with_output();
|
||||
let call_id = "call-history";
|
||||
|
||||
let args = json!({
|
||||
@@ -110,8 +207,7 @@ async fn interrupt_tool_records_history_entries() {
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
|
||||
|
||||
tokio::time::sleep(Duration::from_secs_f32(0.1)).await;
|
||||
wait_for_event(&codex, |ev| saw_partial_output(call_id, ev)).await;
|
||||
codex.submit(Op::Interrupt).await.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
|
||||
@@ -143,23 +239,209 @@ async fn interrupt_tool_records_history_entries() {
|
||||
let output = response_mock
|
||||
.function_call_output_text(call_id)
|
||||
.expect("missing function_call_output text");
|
||||
let re = Regex::new(r"^Wall time: ([0-9]+(?:\.[0-9])?) seconds\naborted by user$")
|
||||
.expect("compile regex");
|
||||
let captures = re.captures(&output);
|
||||
assert_matches!(
|
||||
captures.as_ref(),
|
||||
Some(caps) if caps.get(1).is_some(),
|
||||
"aborted message with elapsed seconds"
|
||||
);
|
||||
let secs: f32 = captures
|
||||
.expect("aborted message with elapsed seconds")
|
||||
.get(1)
|
||||
.unwrap()
|
||||
.as_str()
|
||||
.parse()
|
||||
.unwrap();
|
||||
assert!(
|
||||
secs >= 0.1,
|
||||
"expected at least one tenth of a second of elapsed time, got {secs}"
|
||||
);
|
||||
assert_aborted_output(&output);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn interrupt_shell_records_partial_output() {
|
||||
let command = long_running_shell_command_with_output();
|
||||
let call_id = "call-shell";
|
||||
|
||||
let args = json!({
|
||||
"command": command,
|
||||
"timeout_ms": 60_000
|
||||
})
|
||||
.to_string();
|
||||
let first_body = sse(vec![
|
||||
ev_response_created("resp-shell"),
|
||||
ev_function_call(call_id, "shell", &args),
|
||||
ev_completed("resp-shell"),
|
||||
]);
|
||||
let follow_up_body = sse(vec![
|
||||
ev_response_created("resp-shell-followup"),
|
||||
ev_completed("resp-shell-followup"),
|
||||
]);
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_model("gpt-5.1")
|
||||
.build(&server)
|
||||
.await
|
||||
.unwrap();
|
||||
let codex = Arc::clone(&fixture.codex);
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "start shell".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
|
||||
wait_for_event(&codex, |ev| saw_partial_output(call_id, ev)).await;
|
||||
codex.submit(Op::Interrupt).await.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "follow up".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let output = response_mock
|
||||
.function_call_output_text(call_id)
|
||||
.expect("missing shell output");
|
||||
assert_aborted_output(&output);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn interrupt_local_shell_records_partial_output() {
|
||||
let command = long_running_local_shell_call_with_output();
|
||||
let call_id = "call-local-shell";
|
||||
|
||||
let first_body = sse(vec![
|
||||
ev_response_created("resp-local-shell"),
|
||||
ev_local_shell_call(call_id, "completed", command),
|
||||
ev_completed("resp-local-shell"),
|
||||
]);
|
||||
let follow_up_body = sse(vec![
|
||||
ev_response_created("resp-local-shell-followup"),
|
||||
ev_completed("resp-local-shell-followup"),
|
||||
]);
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_model("gpt-5.1")
|
||||
.build(&server)
|
||||
.await
|
||||
.unwrap();
|
||||
let codex = Arc::clone(&fixture.codex);
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "start local shell".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
|
||||
wait_for_event(&codex, |ev| saw_partial_output(call_id, ev)).await;
|
||||
codex.submit(Op::Interrupt).await.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "follow up".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let output = response_mock
|
||||
.function_call_output_text(call_id)
|
||||
.expect("missing local shell output");
|
||||
assert_aborted_output(&output);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn interrupt_unified_exec_records_partial_output() {
|
||||
let call_id = "call-unified-exec";
|
||||
let (cmd, shell) = long_running_exec_command_with_output();
|
||||
let args = if let Some(shell) = shell {
|
||||
json!({
|
||||
"cmd": cmd,
|
||||
"shell": shell,
|
||||
"yield_time_ms": 60_000,
|
||||
})
|
||||
.to_string()
|
||||
} else {
|
||||
json!({
|
||||
"cmd": cmd,
|
||||
"yield_time_ms": 60_000,
|
||||
})
|
||||
.to_string()
|
||||
};
|
||||
let first_body = sse(vec![
|
||||
ev_response_created("resp-unified-exec"),
|
||||
ev_function_call(call_id, "exec_command", &args),
|
||||
ev_completed("resp-unified-exec"),
|
||||
]);
|
||||
let follow_up_body = sse(vec![
|
||||
ev_response_created("resp-unified-exec-followup"),
|
||||
ev_completed("resp-unified-exec-followup"),
|
||||
]);
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
|
||||
|
||||
let fixture = test_codex()
|
||||
.with_model("gpt-5.1")
|
||||
.with_config(|config| {
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
})
|
||||
.build(&server)
|
||||
.await
|
||||
.unwrap();
|
||||
let codex = Arc::clone(&fixture.codex);
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "start unified exec".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
|
||||
wait_for_event(&codex, |ev| saw_partial_output(call_id, ev)).await;
|
||||
codex.submit(Op::Interrupt).await.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "follow up".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let output = response_mock
|
||||
.function_call_output_text(call_id)
|
||||
.expect("missing exec_command output");
|
||||
assert_unified_exec_aborted_output(&output);
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use std::collections::HashMap;
|
||||
use std::string::ToString;
|
||||
|
||||
use codex_core::exec::ExecExpiration;
|
||||
use codex_core::exec::ExecParams;
|
||||
use codex_core::exec::ExecToolCallOutput;
|
||||
use codex_core::exec::SandboxType;
|
||||
@@ -15,6 +16,7 @@ use tempfile::TempDir;
|
||||
use codex_core::error::Result;
|
||||
|
||||
use codex_core::get_platform_sandbox;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
fn skip_test() -> bool {
|
||||
if std::env::var(CODEX_SANDBOX_ENV_VAR) == Ok("seatbelt".to_string()) {
|
||||
@@ -33,7 +35,7 @@ async fn run_test_cmd(tmp: TempDir, cmd: Vec<&str>) -> Result<ExecToolCallOutput
|
||||
let params = ExecParams {
|
||||
command: cmd.iter().map(ToString::to_string).collect(),
|
||||
cwd: tmp.path().to_path_buf(),
|
||||
expiration: 1000.into(),
|
||||
expiration: ExecExpiration::from_timeout_ms(Some(1000), CancellationToken::new()),
|
||||
env: HashMap::new(),
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
justification: None,
|
||||
|
||||
@@ -84,7 +84,7 @@ impl EscalateServer {
|
||||
command,
|
||||
],
|
||||
cwd: PathBuf::from(&workdir),
|
||||
expiration: ExecExpiration::Cancellation(cancel_rx),
|
||||
expiration: ExecExpiration::cancel_only(cancel_rx),
|
||||
env,
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
justification: None,
|
||||
|
||||
@@ -33,3 +33,4 @@ tokio = { workspace = true, features = [
|
||||
"rt-multi-thread",
|
||||
"signal",
|
||||
] }
|
||||
tokio-util = { workspace = true }
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use codex_core::config::types::ShellEnvironmentPolicy;
|
||||
use codex_core::error::CodexErr;
|
||||
use codex_core::error::SandboxErr;
|
||||
use codex_core::exec::ExecExpiration;
|
||||
use codex_core::exec::ExecParams;
|
||||
use codex_core::exec::process_exec_tool_call;
|
||||
use codex_core::exec_env::create_env;
|
||||
@@ -13,6 +14,7 @@ use pretty_assertions::assert_eq;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use tempfile::NamedTempFile;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
// At least on GitHub CI, the arm64 tests appear to need longer timeouts.
|
||||
|
||||
@@ -57,7 +59,7 @@ async fn run_cmd_output(
|
||||
let params = ExecParams {
|
||||
command: cmd.iter().copied().map(str::to_owned).collect(),
|
||||
cwd,
|
||||
expiration: timeout_ms.into(),
|
||||
expiration: ExecExpiration::from_timeout_ms(Some(timeout_ms), CancellationToken::new()),
|
||||
env: create_env_from_core_vars(),
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
justification: None,
|
||||
@@ -174,7 +176,10 @@ async fn assert_network_blocked(cmd: &[&str]) {
|
||||
cwd,
|
||||
// Give the tool a generous 2-second timeout so even slow DNS timeouts
|
||||
// do not stall the suite.
|
||||
expiration: NETWORK_TIMEOUT_MS.into(),
|
||||
expiration: ExecExpiration::from_timeout_ms(
|
||||
Some(NETWORK_TIMEOUT_MS),
|
||||
CancellationToken::new(),
|
||||
),
|
||||
env: create_env_from_core_vars(),
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
justification: None,
|
||||
|
||||
@@ -137,7 +137,7 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
|
||||
let (tx, rx) = mpsc::channel::<Vec<CapturedRequest>>();
|
||||
let server = thread::spawn(move || {
|
||||
let mut captured = Vec::new();
|
||||
let deadline = Instant::now() + Duration::from_secs(3);
|
||||
let deadline = Instant::now() + Duration::from_secs(12);
|
||||
|
||||
while Instant::now() < deadline {
|
||||
match listener.accept() {
|
||||
@@ -150,6 +150,7 @@ fn otlp_http_exporter_sends_metrics_to_collector() -> Result<()> {
|
||||
content_type: headers.get("content-type").cloned(),
|
||||
body,
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
|
||||
Reference in New Issue
Block a user