mirror of
https://github.com/openai/codex.git
synced 2026-03-21 14:13:56 +00:00
Compare commits
2 Commits
main
...
codex-exec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ee08bc702 | ||
|
|
04190334dc |
@@ -34,7 +34,7 @@ use crate::tools::sandboxing::with_cached_approval;
|
||||
use crate::tools::spec::UnifiedExecShellMode;
|
||||
use crate::unified_exec::NoopSpawnLifecycle;
|
||||
use crate::unified_exec::UnifiedExecError;
|
||||
use crate::unified_exec::UnifiedExecProcess;
|
||||
use crate::unified_exec::ProcessBackend;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use codex_network_proxy::NetworkProxy;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
@@ -53,6 +53,8 @@ pub struct UnifiedExecRequest {
|
||||
pub tty: bool,
|
||||
pub sandbox_permissions: SandboxPermissions,
|
||||
pub additional_permissions: Option<PermissionProfile>,
|
||||
pub process_id: i32,
|
||||
pub use_exec_server: bool,
|
||||
#[cfg(unix)]
|
||||
pub additional_permissions_preapproved: bool,
|
||||
pub justification: Option<String>,
|
||||
@@ -71,6 +73,7 @@ pub struct UnifiedExecApprovalKey {
|
||||
pub struct UnifiedExecRuntime<'a> {
|
||||
manager: &'a UnifiedExecProcessManager,
|
||||
shell_mode: UnifiedExecShellMode,
|
||||
use_exec_server: bool,
|
||||
}
|
||||
|
||||
impl<'a> UnifiedExecRuntime<'a> {
|
||||
@@ -78,6 +81,18 @@ impl<'a> UnifiedExecRuntime<'a> {
|
||||
Self {
|
||||
manager,
|
||||
shell_mode,
|
||||
use_exec_server: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_exec_server(
|
||||
manager: &'a UnifiedExecProcessManager,
|
||||
shell_mode: UnifiedExecShellMode,
|
||||
) -> Self {
|
||||
Self {
|
||||
manager,
|
||||
shell_mode,
|
||||
use_exec_server: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -172,7 +187,7 @@ impl Approvable<UnifiedExecRequest> for UnifiedExecRuntime<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRuntime<'a> {
|
||||
impl<'a> ToolRuntime<UnifiedExecRequest, Arc<ProcessBackend>> for UnifiedExecRuntime<'a> {
|
||||
fn network_approval_spec(
|
||||
&self,
|
||||
req: &UnifiedExecRequest,
|
||||
@@ -190,7 +205,13 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
req: &UnifiedExecRequest,
|
||||
attempt: &SandboxAttempt<'_>,
|
||||
ctx: &ToolCtx,
|
||||
) -> Result<UnifiedExecProcess, ToolError> {
|
||||
) -> Result<Arc<ProcessBackend>, ToolError> {
|
||||
let use_exec_server = if self.use_exec_server {
|
||||
true
|
||||
} else {
|
||||
req.use_exec_server
|
||||
};
|
||||
|
||||
let base_command = &req.command;
|
||||
let session_shell = ctx.session.user_shell();
|
||||
let command = maybe_wrap_shell_lc_with_snapshot(
|
||||
@@ -237,7 +258,10 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
.manager
|
||||
.open_session_with_exec_env(
|
||||
&prepared.exec_request,
|
||||
req.process_id,
|
||||
req.tty,
|
||||
use_exec_server,
|
||||
Some(ctx.session.services.environment.get_executor()),
|
||||
prepared.spawn_lifecycle,
|
||||
)
|
||||
.await
|
||||
@@ -272,7 +296,18 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
.env_for(spec, req.network.as_ref())
|
||||
.map_err(|err| ToolError::Codex(err.into()))?;
|
||||
self.manager
|
||||
.open_session_with_exec_env(&exec_env, req.tty, Box::new(NoopSpawnLifecycle))
|
||||
.open_session_with_exec_env(
|
||||
&exec_env,
|
||||
req.process_id,
|
||||
req.tty,
|
||||
use_exec_server,
|
||||
if use_exec_server {
|
||||
Some(ctx.session.services.environment.get_executor())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
Box::new(NoopSpawnLifecycle),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
UnifiedExecError::SandboxDenied { output, .. } => {
|
||||
|
||||
@@ -18,10 +18,12 @@ use crate::protocol::EventMsg;
|
||||
use crate::protocol::ExecCommandOutputDeltaEvent;
|
||||
use crate::protocol::ExecCommandSource;
|
||||
use crate::protocol::ExecOutputStream;
|
||||
use crate::unified_exec::process::OutputHandles;
|
||||
use crate::tools::events::ToolEmitter;
|
||||
use crate::tools::events::ToolEventCtx;
|
||||
use crate::tools::events::ToolEventStage;
|
||||
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
|
||||
use crate::unified_exec::ProcessBackend;
|
||||
|
||||
pub(crate) const TRAILING_OUTPUT_GRACE: Duration = Duration::from_millis(100);
|
||||
|
||||
@@ -37,21 +39,81 @@ const UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES: usize = 8192;
|
||||
/// shared transcript, and emits ExecCommandOutputDelta events on UTF‑8
|
||||
/// boundaries.
|
||||
pub(crate) fn start_streaming_output(
|
||||
process: &UnifiedExecProcess,
|
||||
process: Arc<ProcessBackend>,
|
||||
context: &UnifiedExecContext,
|
||||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||
) {
|
||||
let mut receiver = process.output_receiver();
|
||||
let output_drained = process.output_drained_notify();
|
||||
let exit_token = process.cancellation_token();
|
||||
|
||||
let session_ref = Arc::clone(&context.session);
|
||||
let turn_ref = Arc::clone(&context.turn);
|
||||
let call_id = context.call_id.clone();
|
||||
let OutputHandles {
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
cancellation_token,
|
||||
} = process.output_handles();
|
||||
|
||||
if let Some(local_process) = process.as_local_process() {
|
||||
let mut receiver = local_process.output_receiver();
|
||||
let output_drained = local_process.output_drained_notify();
|
||||
tokio::spawn(async move {
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
let mut pending = Vec::<u8>::new();
|
||||
let mut emitted_deltas: usize = 0;
|
||||
|
||||
let mut grace_sleep: Option<Pin<Box<Sleep>>> = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
|
||||
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
|
||||
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
|
||||
}
|
||||
|
||||
_ = async {
|
||||
if let Some(sleep) = grace_sleep.as_mut() {
|
||||
sleep.as_mut().await;
|
||||
}
|
||||
}, if grace_sleep.is_some() => {
|
||||
output_drained.notify_one();
|
||||
break;
|
||||
}
|
||||
|
||||
received = receiver.recv() => {
|
||||
let chunk = match received {
|
||||
Ok(chunk) => chunk,
|
||||
Err(RecvError::Lagged(_)) => {
|
||||
continue;
|
||||
},
|
||||
Err(RecvError::Closed) => {
|
||||
output_drained.notify_one();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
process_chunk(
|
||||
&mut pending,
|
||||
&transcript,
|
||||
&call_id,
|
||||
&session_ref,
|
||||
&turn_ref,
|
||||
&mut emitted_deltas,
|
||||
chunk,
|
||||
).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
let output_drained = process
|
||||
.output_drained()
|
||||
.unwrap_or_else(|| Arc::new(Notify::new()));
|
||||
|
||||
tokio::spawn(async move {
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
let mut pending = Vec::<u8>::new();
|
||||
let mut emitted_deltas: usize = 0;
|
||||
|
||||
@@ -59,7 +121,7 @@ pub(crate) fn start_streaming_output(
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = exit_token.cancelled(), if grace_sleep.is_none() => {
|
||||
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
|
||||
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
|
||||
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
|
||||
}
|
||||
@@ -73,27 +135,35 @@ pub(crate) fn start_streaming_output(
|
||||
break;
|
||||
}
|
||||
|
||||
received = receiver.recv() => {
|
||||
let chunk = match received {
|
||||
Ok(chunk) => chunk,
|
||||
Err(RecvError::Lagged(_)) => {
|
||||
continue;
|
||||
},
|
||||
Err(RecvError::Closed) => {
|
||||
_ = output_notify.notified() => {
|
||||
let drained_chunks = {
|
||||
let mut guard = output_buffer.lock().await;
|
||||
guard.drain_chunks()
|
||||
};
|
||||
if drained_chunks.is_empty() {
|
||||
if cancellation_token.is_cancelled() && output_closed.load(std::sync::atomic::Ordering::Acquire) {
|
||||
output_drained.notify_one();
|
||||
break;
|
||||
}
|
||||
};
|
||||
continue;
|
||||
}
|
||||
|
||||
process_chunk(
|
||||
&mut pending,
|
||||
&transcript,
|
||||
&call_id,
|
||||
&session_ref,
|
||||
&turn_ref,
|
||||
&mut emitted_deltas,
|
||||
chunk,
|
||||
).await;
|
||||
for chunk in drained_chunks {
|
||||
process_chunk(
|
||||
&mut pending,
|
||||
&transcript,
|
||||
&call_id,
|
||||
&session_ref,
|
||||
&turn_ref,
|
||||
&mut emitted_deltas,
|
||||
chunk,
|
||||
).await;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
_ = output_closed_notify.notified(), if grace_sleep.is_none() => {
|
||||
grace_sleep.replace(Box::pin(tokio::time::sleep_until(Instant::now() + TRAILING_OUTPUT_GRACE)));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -104,7 +174,7 @@ pub(crate) fn start_streaming_output(
|
||||
/// single ExecCommandEnd event with the aggregated transcript.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn spawn_exit_watcher(
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
process: Arc<ProcessBackend>,
|
||||
session_ref: Arc<Session>,
|
||||
turn_ref: Arc<TurnContext>,
|
||||
call_id: String,
|
||||
@@ -114,8 +184,12 @@ pub(crate) fn spawn_exit_watcher(
|
||||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||
started_at: Instant,
|
||||
) {
|
||||
let exit_token = process.cancellation_token();
|
||||
let output_drained = process.output_drained_notify();
|
||||
let Some(exit_token) = process.cancellation_token() else {
|
||||
return;
|
||||
};
|
||||
let Some(output_drained) = process.output_drained() else {
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
exit_token.cancelled().await;
|
||||
|
||||
@@ -25,17 +25,30 @@ use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Weak;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_exec_server::process::ExecProcess;
|
||||
use codex_network_proxy::NetworkProxy;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use rand::Rng;
|
||||
use rand::rng;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::sandboxing::SandboxPermissions;
|
||||
use crate::exec::is_likely_sandbox_denied;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::SandboxType;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::truncate::TruncationPolicy;
|
||||
use crate::truncate::formatted_truncate_text;
|
||||
|
||||
mod async_watcher;
|
||||
mod errors;
|
||||
@@ -142,7 +155,7 @@ impl Default for UnifiedExecProcessManager {
|
||||
}
|
||||
|
||||
struct ProcessEntry {
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
backend: ProcessBackend,
|
||||
call_id: String,
|
||||
process_id: i32,
|
||||
command: Vec<String>,
|
||||
@@ -152,6 +165,240 @@ struct ProcessEntry {
|
||||
last_used: tokio::time::Instant,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) enum ProcessBackend {
|
||||
Local {
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
},
|
||||
ExecServer {
|
||||
process_id: String,
|
||||
executor: Arc<dyn ExecProcess>,
|
||||
output_buffer: crate::unified_exec::process::OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
output_closed: Arc<AtomicBool>,
|
||||
output_closed_notify: Arc<Notify>,
|
||||
output_drained: Arc<Notify>,
|
||||
cancellation_token: CancellationToken,
|
||||
exit_code: Arc<RwLock<Option<i32>>>,
|
||||
has_exited: Arc<AtomicBool>,
|
||||
sandbox_type: SandboxType,
|
||||
output_seq: Arc<AtomicU64>,
|
||||
},
|
||||
}
|
||||
|
||||
impl ProcessBackend {
|
||||
pub(crate) fn is_local(&self) -> bool {
|
||||
matches!(self, Self::Local { .. })
|
||||
}
|
||||
|
||||
pub(crate) fn as_local_process(&self) -> Option<&Arc<UnifiedExecProcess>> {
|
||||
match self {
|
||||
Self::Local { process } => Some(process),
|
||||
Self::ExecServer { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn output_handles(
|
||||
&self,
|
||||
) -> (
|
||||
crate::unified_exec::process::OutputBuffer,
|
||||
Arc<Notify>,
|
||||
Arc<AtomicBool>,
|
||||
Arc<Notify>,
|
||||
CancellationToken,
|
||||
) {
|
||||
match self {
|
||||
Self::Local { process } => {
|
||||
let handles = process.output_handles();
|
||||
(
|
||||
handles.output_buffer,
|
||||
handles.output_notify,
|
||||
handles.output_closed,
|
||||
handles.output_closed_notify,
|
||||
handles.cancellation_token,
|
||||
)
|
||||
}
|
||||
Self::ExecServer {
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
cancellation_token,
|
||||
..
|
||||
} => (
|
||||
Arc::clone(output_buffer),
|
||||
Arc::clone(output_notify),
|
||||
Arc::clone(output_closed),
|
||||
Arc::clone(output_closed_notify),
|
||||
cancellation_token.clone(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn check_for_sandbox_denial_with_text(
|
||||
&self,
|
||||
text: &str,
|
||||
) -> Result<(), UnifiedExecError> {
|
||||
let sandbox_type = self.sandbox_type();
|
||||
if sandbox_type == SandboxType::None || !self.has_exited() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match self {
|
||||
Self::Local { process } => {
|
||||
process.check_for_sandbox_denial_with_text(text).await
|
||||
}
|
||||
Self::ExecServer { .. } => {
|
||||
let exit_code = self.exit_code().unwrap_or(-1);
|
||||
let exec_output = ExecToolCallOutput {
|
||||
exit_code,
|
||||
stderr: StreamOutput::new(text.to_string()),
|
||||
aggregated_output: StreamOutput::new(text.to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if is_likely_sandbox_denied(sandbox_type, &exec_output) {
|
||||
let snippet = formatted_truncate_text(
|
||||
text,
|
||||
TruncationPolicy::Tokens(UNIFIED_EXEC_OUTPUT_MAX_TOKENS),
|
||||
);
|
||||
let message = if snippet.is_empty() {
|
||||
format!("Process exited with code {exit_code}")
|
||||
} else {
|
||||
snippet
|
||||
};
|
||||
return Err(UnifiedExecError::sandbox_denied(message, exec_output));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn exit_code(&self) -> Option<i32> {
|
||||
match self {
|
||||
Self::Local { process } => process.exit_code(),
|
||||
Self::ExecServer { exit_code, .. } => *exit_code.read().unwrap_or_else(|err| err.into_inner()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_exit_code(&self, exit_code: i32) {
|
||||
if let Self::ExecServer { exit_code: state, .. } = self {
|
||||
let mut guard = state.write().unwrap_or_else(|err| err.into_inner());
|
||||
*guard = Some(exit_code);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn mark_exited(&self) {
|
||||
if let Self::ExecServer { has_exited, .. } = self {
|
||||
has_exited.store(true, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn has_exited(&self) -> bool {
|
||||
match self {
|
||||
Self::Local { process } => process.has_exited(),
|
||||
Self::ExecServer { has_exited, .. } => has_exited.load(Ordering::Acquire),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn cancellation_token(&self) -> Option<CancellationToken> {
|
||||
match self {
|
||||
Self::Local { process } => Some(process.cancellation_token()),
|
||||
Self::ExecServer {
|
||||
cancellation_token, ..
|
||||
} => Some(cancellation_token.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn output_drained(&self) -> Option<Arc<Notify>> {
|
||||
match self {
|
||||
Self::Local { process } => Some(process.output_drained_notify()),
|
||||
Self::ExecServer { output_drained, .. } => Some(Arc::clone(output_drained)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn exit_code_handle(&self) -> Option<Arc<RwLock<Option<i32>>> {
|
||||
match self {
|
||||
Self::ExecServer { exit_code, .. } => Some(Arc::clone(exit_code)),
|
||||
Self::Local { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn remote_output_seq(&self) -> Option<Arc<AtomicU64>> {
|
||||
match self {
|
||||
Self::ExecServer { output_seq, .. } => Some(Arc::clone(output_seq)),
|
||||
Self::Local { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn write_stdin(&self, data: &[u8]) -> Result<(), UnifiedExecError> {
|
||||
match self {
|
||||
Self::Local { process } => process
|
||||
.writer_sender()
|
||||
.send(data.to_vec())
|
||||
.await
|
||||
.map_err(|_| UnifiedExecError::WriteToStdin)?,
|
||||
Self::ExecServer {
|
||||
process_id,
|
||||
executor,
|
||||
..
|
||||
} => {
|
||||
let response = executor
|
||||
.write(process_id.as_str(), data.to_vec())
|
||||
.await
|
||||
.map_err(|_| UnifiedExecError::WriteToStdin)?;
|
||||
if !response.accepted {
|
||||
return Err(UnifiedExecError::WriteToStdin);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn terminate(&self) {
|
||||
match self {
|
||||
Self::Local { process } => process.terminate(),
|
||||
Self::ExecServer {
|
||||
process_id,
|
||||
executor,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
output_drained,
|
||||
cancellation_token,
|
||||
has_exited,
|
||||
..
|
||||
} => {
|
||||
has_exited.store(true, Ordering::Release);
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
output_drained.notify_one();
|
||||
cancellation_token.cancel();
|
||||
let _ = executor.terminate(process_id.as_str()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn sandbox_type(&self) -> SandboxType {
|
||||
match self {
|
||||
Self::Local { process } => process.sandbox_type(),
|
||||
Self::ExecServer { sandbox_type, .. } => *sandbox_type,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn remote_exec_state(&self) -> Option<(&str, &Arc<dyn ExecProcess>)> {
|
||||
match self {
|
||||
Self::ExecServer {
|
||||
process_id,
|
||||
executor,
|
||||
..
|
||||
} => Some((process_id.as_str(), executor)),
|
||||
Self::Local { .. } => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {
|
||||
yield_time_ms.clamp(MIN_YIELD_TIME_MS, MAX_YIELD_TIME_MS)
|
||||
}
|
||||
|
||||
@@ -4,14 +4,18 @@ use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::watch;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use codex_exec_server::process::ExecProcess;
|
||||
use codex_exec_server::protocol::ExecParams;
|
||||
use codex_exec_server::protocol::ReadParams;
|
||||
|
||||
use crate::exec_env::create_env;
|
||||
use crate::exec_policy::ExecApprovalRequest;
|
||||
@@ -34,6 +38,7 @@ use crate::unified_exec::MAX_YIELD_TIME_MS;
|
||||
use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
|
||||
use crate::unified_exec::MIN_YIELD_TIME_MS;
|
||||
use crate::unified_exec::ProcessEntry;
|
||||
use crate::unified_exec::ProcessBackend;
|
||||
use crate::unified_exec::ProcessStore;
|
||||
use crate::unified_exec::UnifiedExecContext;
|
||||
use crate::unified_exec::UnifiedExecError;
|
||||
@@ -49,7 +54,6 @@ use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
|
||||
use crate::unified_exec::process::OutputBuffer;
|
||||
use crate::unified_exec::process::OutputHandles;
|
||||
use crate::unified_exec::process::SpawnLifecycleHandle;
|
||||
use crate::unified_exec::process::UnifiedExecProcess;
|
||||
|
||||
const UNIFIED_EXEC_ENV: [(&str, &str); 10] = [
|
||||
("NO_COLOR", "1"),
|
||||
@@ -90,7 +94,7 @@ fn apply_unified_exec_env(mut env: HashMap<String, String>) -> HashMap<String, S
|
||||
}
|
||||
|
||||
struct PreparedProcessHandles {
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
process: Arc<ProcessBackend>,
|
||||
output_buffer: OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
output_closed: Arc<AtomicBool>,
|
||||
@@ -166,9 +170,7 @@ impl UnifiedExecProcessManager {
|
||||
.await;
|
||||
|
||||
let (process, mut deferred_network_approval) = match process {
|
||||
Ok((process, deferred_network_approval)) => {
|
||||
(Arc::new(process), deferred_network_approval)
|
||||
}
|
||||
Ok((process, deferred_network_approval)) => (process, deferred_network_approval),
|
||||
Err(err) => {
|
||||
self.release_process_id(request.process_id).await;
|
||||
return Err(err);
|
||||
@@ -190,7 +192,7 @@ impl UnifiedExecProcessManager {
|
||||
);
|
||||
emitter.emit(event_ctx, ToolEventStage::Begin).await;
|
||||
|
||||
start_streaming_output(&process, context, Arc::clone(&transcript));
|
||||
start_streaming_output(process.clone(), context, Arc::clone(&transcript));
|
||||
let start = Instant::now();
|
||||
// Persist live sessions before the initial yield wait so interrupting the
|
||||
// turn cannot drop the last Arc and terminate the background process.
|
||||
@@ -312,7 +314,6 @@ impl UnifiedExecProcessManager {
|
||||
let process_id = request.process_id;
|
||||
|
||||
let PreparedProcessHandles {
|
||||
writer_tx,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
@@ -329,7 +330,7 @@ impl UnifiedExecProcessManager {
|
||||
if !tty {
|
||||
return Err(UnifiedExecError::StdinClosed);
|
||||
}
|
||||
Self::send_input(&writer_tx, request.input.as_bytes()).await?;
|
||||
process.write_stdin(request.input.as_bytes()).await?;
|
||||
// Give the remote process a brief window to react so that we are
|
||||
// more likely to capture its output in the poll below.
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
@@ -407,10 +408,10 @@ impl UnifiedExecProcessManager {
|
||||
return ProcessStatus::Unknown;
|
||||
};
|
||||
|
||||
let exit_code = entry.process.exit_code();
|
||||
let exit_code = entry.backend.exit_code();
|
||||
let process_id = entry.process_id;
|
||||
|
||||
if entry.process.has_exited() {
|
||||
if entry.backend.has_exited() {
|
||||
let Some(entry) = store.remove(process_id) else {
|
||||
return ProcessStatus::Unknown;
|
||||
};
|
||||
@@ -448,14 +449,13 @@ impl UnifiedExecProcessManager {
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
cancellation_token,
|
||||
} = entry.process.output_handles();
|
||||
} = entry.backend.output_handles();
|
||||
let pause_state = entry
|
||||
.session
|
||||
.upgrade()
|
||||
.map(|session| session.subscribe_out_of_band_elicitation_pause_state());
|
||||
|
||||
Ok(PreparedProcessHandles {
|
||||
writer_tx: entry.process.writer_sender(),
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
@@ -468,20 +468,10 @@ impl UnifiedExecProcessManager {
|
||||
})
|
||||
}
|
||||
|
||||
async fn send_input(
|
||||
writer_tx: &mpsc::Sender<Vec<u8>>,
|
||||
data: &[u8],
|
||||
) -> Result<(), UnifiedExecError> {
|
||||
writer_tx
|
||||
.send(data.to_vec())
|
||||
.await
|
||||
.map_err(|_| UnifiedExecError::WriteToStdin)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn store_process(
|
||||
&self,
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
process: Arc<ProcessBackend>,
|
||||
context: &UnifiedExecContext,
|
||||
command: &[String],
|
||||
cwd: PathBuf,
|
||||
@@ -492,7 +482,7 @@ impl UnifiedExecProcessManager {
|
||||
transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
|
||||
) {
|
||||
let entry = ProcessEntry {
|
||||
process: Arc::clone(&process),
|
||||
backend: Arc::clone(&process),
|
||||
call_id: context.call_id.clone(),
|
||||
process_id,
|
||||
command: command.to_vec(),
|
||||
@@ -511,7 +501,7 @@ impl UnifiedExecProcessManager {
|
||||
// network-approval cleanup only after dropping that lock.
|
||||
if let Some(pruned_entry) = pruned_entry {
|
||||
Self::unregister_network_approval_for_entry(&pruned_entry).await;
|
||||
pruned_entry.process.terminate();
|
||||
pruned_entry.backend.terminate();
|
||||
}
|
||||
|
||||
if number_processes >= WARNING_UNIFIED_EXEC_PROCESSES {
|
||||
@@ -540,9 +530,29 @@ impl UnifiedExecProcessManager {
|
||||
pub(crate) async fn open_session_with_exec_env(
|
||||
&self,
|
||||
env: &ExecRequest,
|
||||
process_id: i32,
|
||||
tty: bool,
|
||||
use_exec_server: bool,
|
||||
mut executor: Option<Arc<dyn ExecProcess>>,
|
||||
mut spawn_lifecycle: SpawnLifecycleHandle,
|
||||
) -> Result<UnifiedExecProcess, UnifiedExecError> {
|
||||
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
|
||||
if use_exec_server {
|
||||
return self
|
||||
.open_session_with_exec_server(env, process_id, tty, &mut executor, spawn_lifecycle)
|
||||
.await;
|
||||
}
|
||||
|
||||
self.open_session_with_local_process(env, process_id, tty, spawn_lifecycle)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn open_session_with_local_process(
|
||||
&self,
|
||||
env: &ExecRequest,
|
||||
process_id: i32,
|
||||
tty: bool,
|
||||
spawn_lifecycle: SpawnLifecycleHandle,
|
||||
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
|
||||
let (program, args) = env
|
||||
.command
|
||||
.split_first()
|
||||
@@ -571,10 +581,58 @@ impl UnifiedExecProcessManager {
|
||||
)
|
||||
.await
|
||||
};
|
||||
let spawned =
|
||||
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||
|
||||
let spawned = spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||
spawn_lifecycle.after_spawn();
|
||||
UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await
|
||||
let process = UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await?;
|
||||
Ok(Arc::new(ProcessBackend::Local {
|
||||
process: Arc::new(process),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn open_session_with_exec_server(
|
||||
&self,
|
||||
env: &ExecRequest,
|
||||
process_id: i32,
|
||||
tty: bool,
|
||||
mut executor: &mut Option<Arc<dyn ExecProcess>>,
|
||||
spawn_lifecycle: SpawnLifecycleHandle,
|
||||
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
|
||||
let executor = executor
|
||||
.take()
|
||||
.ok_or_else(|| UnifiedExecError::create_process("exec-server unavailable".to_string()))?;
|
||||
let response = executor
|
||||
.start(ExecParams {
|
||||
process_id: process_id.to_string(),
|
||||
argv: env.command.clone(),
|
||||
cwd: env.cwd.clone(),
|
||||
env: env.env.clone(),
|
||||
tty,
|
||||
arg0: env.arg0.clone(),
|
||||
})
|
||||
.await
|
||||
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||
let _ = response.process_id;
|
||||
|
||||
let process = Arc::new(ProcessBackend::ExecServer {
|
||||
process_id: process_id.to_string(),
|
||||
executor,
|
||||
output_buffer: Arc::new(tokio::sync::Mutex::new(HeadTailBuffer::default())),
|
||||
output_notify: Arc::new(Notify::new()),
|
||||
output_closed: Arc::new(AtomicBool::new(false)),
|
||||
output_closed_notify: Arc::new(Notify::new()),
|
||||
output_drained: Arc::new(Notify::new()),
|
||||
cancellation_token: CancellationToken::new(),
|
||||
exit_code: Arc::new(RwLock::new(None)),
|
||||
has_exited: Arc::new(AtomicBool::new(false)),
|
||||
sandbox_type: env.sandbox,
|
||||
output_seq: Arc::new(AtomicU64::new(0)),
|
||||
});
|
||||
|
||||
spawn_lifecycle.after_spawn();
|
||||
Self::spawn_exec_server_output_watcher(Arc::clone(&process));
|
||||
|
||||
Ok(process)
|
||||
}
|
||||
|
||||
pub(super) async fn open_session_with_sandbox(
|
||||
@@ -582,16 +640,20 @@ impl UnifiedExecProcessManager {
|
||||
request: &ExecCommandRequest,
|
||||
cwd: PathBuf,
|
||||
context: &UnifiedExecContext,
|
||||
) -> Result<(UnifiedExecProcess, Option<DeferredNetworkApproval>), UnifiedExecError> {
|
||||
) -> Result<(Arc<ProcessBackend>, Option<DeferredNetworkApproval>), UnifiedExecError> {
|
||||
let env = apply_unified_exec_env(create_env(
|
||||
&context.turn.shell_environment_policy,
|
||||
Some(context.session.conversation_id),
|
||||
));
|
||||
let mut orchestrator = ToolOrchestrator::new();
|
||||
let mut runtime = UnifiedExecRuntime::new(
|
||||
self,
|
||||
context.turn.tools_config.unified_exec_shell_mode.clone(),
|
||||
);
|
||||
let mut runtime = if context.turn.config.experimental_exec_server_url.is_some() {
|
||||
UnifiedExecRuntime::with_exec_server(
|
||||
self,
|
||||
context.turn.tools_config.unified_exec_shell_mode.clone(),
|
||||
)
|
||||
} else {
|
||||
UnifiedExecRuntime::new(self, context.turn.tools_config.unified_exec_shell_mode.clone())
|
||||
};
|
||||
let exec_approval_requirement = context
|
||||
.session
|
||||
.services
|
||||
@@ -618,6 +680,8 @@ impl UnifiedExecProcessManager {
|
||||
tty: request.tty,
|
||||
sandbox_permissions: request.sandbox_permissions,
|
||||
additional_permissions: request.additional_permissions.clone(),
|
||||
process_id: request.process_id,
|
||||
use_exec_server: context.turn.config.experimental_exec_server_url.is_some(),
|
||||
#[cfg(unix)]
|
||||
additional_permissions_preapproved: request.additional_permissions_preapproved,
|
||||
justification: request.justification.clone(),
|
||||
@@ -775,7 +839,7 @@ impl UnifiedExecProcessManager {
|
||||
let meta: Vec<(i32, Instant, bool)> = store
|
||||
.processes
|
||||
.iter()
|
||||
.map(|(id, entry)| (*id, entry.last_used, entry.process.has_exited()))
|
||||
.map(|(id, entry)| (*id, entry.last_used, entry.backend.has_exited()))
|
||||
.collect();
|
||||
|
||||
if let Some(process_id) = Self::process_id_to_prune_from_meta(&meta) {
|
||||
@@ -828,11 +892,129 @@ impl UnifiedExecProcessManager {
|
||||
|
||||
for entry in entries {
|
||||
Self::unregister_network_approval_for_entry(&entry).await;
|
||||
entry.process.terminate();
|
||||
entry.backend.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UnifiedExecProcessManager {
|
||||
fn spawn_exec_server_output_watcher(process: Arc<ProcessBackend>) {
|
||||
let (
|
||||
process_id,
|
||||
executor,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
output_drained,
|
||||
output_seq,
|
||||
has_exited,
|
||||
exit_code,
|
||||
cancellation_token,
|
||||
) = {
|
||||
if let ProcessBackend::ExecServer {
|
||||
process_id,
|
||||
executor,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
output_drained,
|
||||
output_seq,
|
||||
has_exited,
|
||||
exit_code,
|
||||
cancellation_token,
|
||||
..
|
||||
} = process.as_ref()
|
||||
{
|
||||
(
|
||||
process_id.clone(),
|
||||
Arc::clone(executor),
|
||||
Arc::clone(output_buffer),
|
||||
Arc::clone(output_notify),
|
||||
Arc::clone(output_closed),
|
||||
Arc::clone(output_closed_notify),
|
||||
Arc::clone(output_drained),
|
||||
Arc::clone(output_seq),
|
||||
Arc::clone(has_exited),
|
||||
Arc::clone(exit_code),
|
||||
cancellation_token.clone(),
|
||||
)
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut after_seq = None;
|
||||
loop {
|
||||
if cancellation_token.is_cancelled() {
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
output_drained.notify_waiters();
|
||||
break;
|
||||
}
|
||||
|
||||
let response = match executor
|
||||
.read(ReadParams {
|
||||
process_id: process_id.clone(),
|
||||
after_seq,
|
||||
max_bytes: Some(64 * 1024),
|
||||
wait_ms: Some(100),
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(_) => {
|
||||
let mut guard = exit_code.write().unwrap_or_else(|err| err.into_inner());
|
||||
if guard.is_none() {
|
||||
*guard = Some(-1);
|
||||
}
|
||||
has_exited.store(true, Ordering::Release);
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
output_drained.notify_waiters();
|
||||
cancellation_token.cancel();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if !response.chunks.is_empty() {
|
||||
let mut current_seq = output_seq.load(Ordering::Acquire);
|
||||
for chunk in response.chunks {
|
||||
if chunk.seq < current_seq {
|
||||
continue;
|
||||
}
|
||||
|
||||
{
|
||||
let mut guard = output_buffer.lock().await;
|
||||
guard.push_chunk(chunk.chunk.into_inner());
|
||||
}
|
||||
current_seq = chunk.seq.saturating_add(1);
|
||||
output_seq.store(current_seq, Ordering::Release);
|
||||
output_notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(code) = response.exit_code {
|
||||
let mut guard = exit_code.write().unwrap_or_else(|err| err.into_inner());
|
||||
*guard = Some(code);
|
||||
}
|
||||
after_seq = Some(response.next_seq);
|
||||
|
||||
if response.exited {
|
||||
has_exited.store(true, Ordering::Release);
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
output_drained.notify_waiters();
|
||||
cancellation_token.cancel();
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
enum ProcessStatus {
|
||||
Alive {
|
||||
exit_code: Option<i32>,
|
||||
|
||||
Reference in New Issue
Block a user