mirror of
https://github.com/openai/codex.git
synced 2026-04-22 05:34:49 +00:00
Compare commits
2 Commits
codex-debu
...
codex-exec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ee08bc702 | ||
|
|
04190334dc |
@@ -34,7 +34,7 @@ use crate::tools::sandboxing::with_cached_approval;
|
|||||||
use crate::tools::spec::UnifiedExecShellMode;
|
use crate::tools::spec::UnifiedExecShellMode;
|
||||||
use crate::unified_exec::NoopSpawnLifecycle;
|
use crate::unified_exec::NoopSpawnLifecycle;
|
||||||
use crate::unified_exec::UnifiedExecError;
|
use crate::unified_exec::UnifiedExecError;
|
||||||
use crate::unified_exec::UnifiedExecProcess;
|
use crate::unified_exec::ProcessBackend;
|
||||||
use crate::unified_exec::UnifiedExecProcessManager;
|
use crate::unified_exec::UnifiedExecProcessManager;
|
||||||
use codex_network_proxy::NetworkProxy;
|
use codex_network_proxy::NetworkProxy;
|
||||||
use codex_protocol::models::PermissionProfile;
|
use codex_protocol::models::PermissionProfile;
|
||||||
@@ -53,6 +53,8 @@ pub struct UnifiedExecRequest {
|
|||||||
pub tty: bool,
|
pub tty: bool,
|
||||||
pub sandbox_permissions: SandboxPermissions,
|
pub sandbox_permissions: SandboxPermissions,
|
||||||
pub additional_permissions: Option<PermissionProfile>,
|
pub additional_permissions: Option<PermissionProfile>,
|
||||||
|
pub process_id: i32,
|
||||||
|
pub use_exec_server: bool,
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
pub additional_permissions_preapproved: bool,
|
pub additional_permissions_preapproved: bool,
|
||||||
pub justification: Option<String>,
|
pub justification: Option<String>,
|
||||||
@@ -71,6 +73,7 @@ pub struct UnifiedExecApprovalKey {
|
|||||||
pub struct UnifiedExecRuntime<'a> {
|
pub struct UnifiedExecRuntime<'a> {
|
||||||
manager: &'a UnifiedExecProcessManager,
|
manager: &'a UnifiedExecProcessManager,
|
||||||
shell_mode: UnifiedExecShellMode,
|
shell_mode: UnifiedExecShellMode,
|
||||||
|
use_exec_server: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> UnifiedExecRuntime<'a> {
|
impl<'a> UnifiedExecRuntime<'a> {
|
||||||
@@ -78,6 +81,18 @@ impl<'a> UnifiedExecRuntime<'a> {
|
|||||||
Self {
|
Self {
|
||||||
manager,
|
manager,
|
||||||
shell_mode,
|
shell_mode,
|
||||||
|
use_exec_server: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_exec_server(
|
||||||
|
manager: &'a UnifiedExecProcessManager,
|
||||||
|
shell_mode: UnifiedExecShellMode,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
manager,
|
||||||
|
shell_mode,
|
||||||
|
use_exec_server: true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -172,7 +187,7 @@ impl Approvable<UnifiedExecRequest> for UnifiedExecRuntime<'_> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRuntime<'a> {
|
impl<'a> ToolRuntime<UnifiedExecRequest, Arc<ProcessBackend>> for UnifiedExecRuntime<'a> {
|
||||||
fn network_approval_spec(
|
fn network_approval_spec(
|
||||||
&self,
|
&self,
|
||||||
req: &UnifiedExecRequest,
|
req: &UnifiedExecRequest,
|
||||||
@@ -190,7 +205,13 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
|||||||
req: &UnifiedExecRequest,
|
req: &UnifiedExecRequest,
|
||||||
attempt: &SandboxAttempt<'_>,
|
attempt: &SandboxAttempt<'_>,
|
||||||
ctx: &ToolCtx,
|
ctx: &ToolCtx,
|
||||||
) -> Result<UnifiedExecProcess, ToolError> {
|
) -> Result<Arc<ProcessBackend>, ToolError> {
|
||||||
|
let use_exec_server = if self.use_exec_server {
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
req.use_exec_server
|
||||||
|
};
|
||||||
|
|
||||||
let base_command = &req.command;
|
let base_command = &req.command;
|
||||||
let session_shell = ctx.session.user_shell();
|
let session_shell = ctx.session.user_shell();
|
||||||
let command = maybe_wrap_shell_lc_with_snapshot(
|
let command = maybe_wrap_shell_lc_with_snapshot(
|
||||||
@@ -237,7 +258,10 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
|||||||
.manager
|
.manager
|
||||||
.open_session_with_exec_env(
|
.open_session_with_exec_env(
|
||||||
&prepared.exec_request,
|
&prepared.exec_request,
|
||||||
|
req.process_id,
|
||||||
req.tty,
|
req.tty,
|
||||||
|
use_exec_server,
|
||||||
|
Some(ctx.session.services.environment.get_executor()),
|
||||||
prepared.spawn_lifecycle,
|
prepared.spawn_lifecycle,
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
@@ -272,7 +296,18 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
|||||||
.env_for(spec, req.network.as_ref())
|
.env_for(spec, req.network.as_ref())
|
||||||
.map_err(|err| ToolError::Codex(err.into()))?;
|
.map_err(|err| ToolError::Codex(err.into()))?;
|
||||||
self.manager
|
self.manager
|
||||||
.open_session_with_exec_env(&exec_env, req.tty, Box::new(NoopSpawnLifecycle))
|
.open_session_with_exec_env(
|
||||||
|
&exec_env,
|
||||||
|
req.process_id,
|
||||||
|
req.tty,
|
||||||
|
use_exec_server,
|
||||||
|
if use_exec_server {
|
||||||
|
Some(ctx.session.services.environment.get_executor())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
},
|
||||||
|
Box::new(NoopSpawnLifecycle),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.map_err(|err| match err {
|
.map_err(|err| match err {
|
||||||
UnifiedExecError::SandboxDenied { output, .. } => {
|
UnifiedExecError::SandboxDenied { output, .. } => {
|
||||||
|
|||||||
@@ -18,10 +18,12 @@ use crate::protocol::EventMsg;
|
|||||||
use crate::protocol::ExecCommandOutputDeltaEvent;
|
use crate::protocol::ExecCommandOutputDeltaEvent;
|
||||||
use crate::protocol::ExecCommandSource;
|
use crate::protocol::ExecCommandSource;
|
||||||
use crate::protocol::ExecOutputStream;
|
use crate::protocol::ExecOutputStream;
|
||||||
|
use crate::unified_exec::process::OutputHandles;
|
||||||
use crate::tools::events::ToolEmitter;
|
use crate::tools::events::ToolEmitter;
|
||||||
use crate::tools::events::ToolEventCtx;
|
use crate::tools::events::ToolEventCtx;
|
||||||
use crate::tools::events::ToolEventStage;
|
use crate::tools::events::ToolEventStage;
|
||||||
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
|
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
|
||||||
|
use crate::unified_exec::ProcessBackend;
|
||||||
|
|
||||||
pub(crate) const TRAILING_OUTPUT_GRACE: Duration = Duration::from_millis(100);
|
pub(crate) const TRAILING_OUTPUT_GRACE: Duration = Duration::from_millis(100);
|
||||||
|
|
||||||
@@ -37,21 +39,81 @@ const UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES: usize = 8192;
|
|||||||
/// shared transcript, and emits ExecCommandOutputDelta events on UTF‑8
|
/// shared transcript, and emits ExecCommandOutputDelta events on UTF‑8
|
||||||
/// boundaries.
|
/// boundaries.
|
||||||
pub(crate) fn start_streaming_output(
|
pub(crate) fn start_streaming_output(
|
||||||
process: &UnifiedExecProcess,
|
process: Arc<ProcessBackend>,
|
||||||
context: &UnifiedExecContext,
|
context: &UnifiedExecContext,
|
||||||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||||
) {
|
) {
|
||||||
let mut receiver = process.output_receiver();
|
|
||||||
let output_drained = process.output_drained_notify();
|
|
||||||
let exit_token = process.cancellation_token();
|
|
||||||
|
|
||||||
let session_ref = Arc::clone(&context.session);
|
let session_ref = Arc::clone(&context.session);
|
||||||
let turn_ref = Arc::clone(&context.turn);
|
let turn_ref = Arc::clone(&context.turn);
|
||||||
let call_id = context.call_id.clone();
|
let call_id = context.call_id.clone();
|
||||||
|
let OutputHandles {
|
||||||
|
output_buffer,
|
||||||
|
output_notify,
|
||||||
|
output_closed,
|
||||||
|
output_closed_notify,
|
||||||
|
cancellation_token,
|
||||||
|
} = process.output_handles();
|
||||||
|
|
||||||
|
if let Some(local_process) = process.as_local_process() {
|
||||||
|
let mut receiver = local_process.output_receiver();
|
||||||
|
let output_drained = local_process.output_drained_notify();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
use tokio::sync::broadcast::error::RecvError;
|
||||||
|
|
||||||
|
let mut pending = Vec::<u8>::new();
|
||||||
|
let mut emitted_deltas: usize = 0;
|
||||||
|
|
||||||
|
let mut grace_sleep: Option<Pin<Box<Sleep>>> = None;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
|
||||||
|
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
|
||||||
|
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = async {
|
||||||
|
if let Some(sleep) = grace_sleep.as_mut() {
|
||||||
|
sleep.as_mut().await;
|
||||||
|
}
|
||||||
|
}, if grace_sleep.is_some() => {
|
||||||
|
output_drained.notify_one();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
received = receiver.recv() => {
|
||||||
|
let chunk = match received {
|
||||||
|
Ok(chunk) => chunk,
|
||||||
|
Err(RecvError::Lagged(_)) => {
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
Err(RecvError::Closed) => {
|
||||||
|
output_drained.notify_one();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
process_chunk(
|
||||||
|
&mut pending,
|
||||||
|
&transcript,
|
||||||
|
&call_id,
|
||||||
|
&session_ref,
|
||||||
|
&turn_ref,
|
||||||
|
&mut emitted_deltas,
|
||||||
|
chunk,
|
||||||
|
).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let output_drained = process
|
||||||
|
.output_drained()
|
||||||
|
.unwrap_or_else(|| Arc::new(Notify::new()));
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
use tokio::sync::broadcast::error::RecvError;
|
|
||||||
|
|
||||||
let mut pending = Vec::<u8>::new();
|
let mut pending = Vec::<u8>::new();
|
||||||
let mut emitted_deltas: usize = 0;
|
let mut emitted_deltas: usize = 0;
|
||||||
|
|
||||||
@@ -59,7 +121,7 @@ pub(crate) fn start_streaming_output(
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = exit_token.cancelled(), if grace_sleep.is_none() => {
|
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
|
||||||
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
|
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
|
||||||
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
|
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
|
||||||
}
|
}
|
||||||
@@ -73,27 +135,35 @@ pub(crate) fn start_streaming_output(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
received = receiver.recv() => {
|
_ = output_notify.notified() => {
|
||||||
let chunk = match received {
|
let drained_chunks = {
|
||||||
Ok(chunk) => chunk,
|
let mut guard = output_buffer.lock().await;
|
||||||
Err(RecvError::Lagged(_)) => {
|
guard.drain_chunks()
|
||||||
continue;
|
};
|
||||||
},
|
if drained_chunks.is_empty() {
|
||||||
Err(RecvError::Closed) => {
|
if cancellation_token.is_cancelled() && output_closed.load(std::sync::atomic::Ordering::Acquire) {
|
||||||
output_drained.notify_one();
|
output_drained.notify_one();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
};
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
process_chunk(
|
for chunk in drained_chunks {
|
||||||
&mut pending,
|
process_chunk(
|
||||||
&transcript,
|
&mut pending,
|
||||||
&call_id,
|
&transcript,
|
||||||
&session_ref,
|
&call_id,
|
||||||
&turn_ref,
|
&session_ref,
|
||||||
&mut emitted_deltas,
|
&turn_ref,
|
||||||
chunk,
|
&mut emitted_deltas,
|
||||||
).await;
|
chunk,
|
||||||
|
).await;
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
_ = output_closed_notify.notified(), if grace_sleep.is_none() => {
|
||||||
|
grace_sleep.replace(Box::pin(tokio::time::sleep_until(Instant::now() + TRAILING_OUTPUT_GRACE)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -104,7 +174,7 @@ pub(crate) fn start_streaming_output(
|
|||||||
/// single ExecCommandEnd event with the aggregated transcript.
|
/// single ExecCommandEnd event with the aggregated transcript.
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub(crate) fn spawn_exit_watcher(
|
pub(crate) fn spawn_exit_watcher(
|
||||||
process: Arc<UnifiedExecProcess>,
|
process: Arc<ProcessBackend>,
|
||||||
session_ref: Arc<Session>,
|
session_ref: Arc<Session>,
|
||||||
turn_ref: Arc<TurnContext>,
|
turn_ref: Arc<TurnContext>,
|
||||||
call_id: String,
|
call_id: String,
|
||||||
@@ -114,8 +184,12 @@ pub(crate) fn spawn_exit_watcher(
|
|||||||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||||
started_at: Instant,
|
started_at: Instant,
|
||||||
) {
|
) {
|
||||||
let exit_token = process.cancellation_token();
|
let Some(exit_token) = process.cancellation_token() else {
|
||||||
let output_drained = process.output_drained_notify();
|
return;
|
||||||
|
};
|
||||||
|
let Some(output_drained) = process.output_drained() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
exit_token.cancelled().await;
|
exit_token.cancelled().await;
|
||||||
|
|||||||
@@ -25,17 +25,30 @@ use std::collections::HashMap;
|
|||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::RwLock;
|
||||||
|
use std::sync::atomic::AtomicBool;
|
||||||
|
use std::sync::atomic::AtomicU64;
|
||||||
use std::sync::Weak;
|
use std::sync::Weak;
|
||||||
|
use std::sync::atomic::Ordering;
|
||||||
|
|
||||||
|
use codex_exec_server::process::ExecProcess;
|
||||||
use codex_network_proxy::NetworkProxy;
|
use codex_network_proxy::NetworkProxy;
|
||||||
use codex_protocol::models::PermissionProfile;
|
use codex_protocol::models::PermissionProfile;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use rand::rng;
|
use rand::rng;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
use crate::codex::Session;
|
use crate::codex::Session;
|
||||||
use crate::codex::TurnContext;
|
use crate::codex::TurnContext;
|
||||||
use crate::sandboxing::SandboxPermissions;
|
use crate::sandboxing::SandboxPermissions;
|
||||||
|
use crate::exec::is_likely_sandbox_denied;
|
||||||
|
use crate::exec::ExecToolCallOutput;
|
||||||
|
use crate::exec::SandboxType;
|
||||||
|
use crate::exec::StreamOutput;
|
||||||
|
use crate::truncate::TruncationPolicy;
|
||||||
|
use crate::truncate::formatted_truncate_text;
|
||||||
|
|
||||||
mod async_watcher;
|
mod async_watcher;
|
||||||
mod errors;
|
mod errors;
|
||||||
@@ -142,7 +155,7 @@ impl Default for UnifiedExecProcessManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct ProcessEntry {
|
struct ProcessEntry {
|
||||||
process: Arc<UnifiedExecProcess>,
|
backend: ProcessBackend,
|
||||||
call_id: String,
|
call_id: String,
|
||||||
process_id: i32,
|
process_id: i32,
|
||||||
command: Vec<String>,
|
command: Vec<String>,
|
||||||
@@ -152,6 +165,240 @@ struct ProcessEntry {
|
|||||||
last_used: tokio::time::Instant,
|
last_used: tokio::time::Instant,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub(crate) enum ProcessBackend {
|
||||||
|
Local {
|
||||||
|
process: Arc<UnifiedExecProcess>,
|
||||||
|
},
|
||||||
|
ExecServer {
|
||||||
|
process_id: String,
|
||||||
|
executor: Arc<dyn ExecProcess>,
|
||||||
|
output_buffer: crate::unified_exec::process::OutputBuffer,
|
||||||
|
output_notify: Arc<Notify>,
|
||||||
|
output_closed: Arc<AtomicBool>,
|
||||||
|
output_closed_notify: Arc<Notify>,
|
||||||
|
output_drained: Arc<Notify>,
|
||||||
|
cancellation_token: CancellationToken,
|
||||||
|
exit_code: Arc<RwLock<Option<i32>>>,
|
||||||
|
has_exited: Arc<AtomicBool>,
|
||||||
|
sandbox_type: SandboxType,
|
||||||
|
output_seq: Arc<AtomicU64>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ProcessBackend {
|
||||||
|
pub(crate) fn is_local(&self) -> bool {
|
||||||
|
matches!(self, Self::Local { .. })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn as_local_process(&self) -> Option<&Arc<UnifiedExecProcess>> {
|
||||||
|
match self {
|
||||||
|
Self::Local { process } => Some(process),
|
||||||
|
Self::ExecServer { .. } => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn output_handles(
|
||||||
|
&self,
|
||||||
|
) -> (
|
||||||
|
crate::unified_exec::process::OutputBuffer,
|
||||||
|
Arc<Notify>,
|
||||||
|
Arc<AtomicBool>,
|
||||||
|
Arc<Notify>,
|
||||||
|
CancellationToken,
|
||||||
|
) {
|
||||||
|
match self {
|
||||||
|
Self::Local { process } => {
|
||||||
|
let handles = process.output_handles();
|
||||||
|
(
|
||||||
|
handles.output_buffer,
|
||||||
|
handles.output_notify,
|
||||||
|
handles.output_closed,
|
||||||
|
handles.output_closed_notify,
|
||||||
|
handles.cancellation_token,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
Self::ExecServer {
|
||||||
|
output_buffer,
|
||||||
|
output_notify,
|
||||||
|
output_closed,
|
||||||
|
output_closed_notify,
|
||||||
|
cancellation_token,
|
||||||
|
..
|
||||||
|
} => (
|
||||||
|
Arc::clone(output_buffer),
|
||||||
|
Arc::clone(output_notify),
|
||||||
|
Arc::clone(output_closed),
|
||||||
|
Arc::clone(output_closed_notify),
|
||||||
|
cancellation_token.clone(),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn check_for_sandbox_denial_with_text(
|
||||||
|
&self,
|
||||||
|
text: &str,
|
||||||
|
) -> Result<(), UnifiedExecError> {
|
||||||
|
let sandbox_type = self.sandbox_type();
|
||||||
|
if sandbox_type == SandboxType::None || !self.has_exited() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
match self {
|
||||||
|
Self::Local { process } => {
|
||||||
|
process.check_for_sandbox_denial_with_text(text).await
|
||||||
|
}
|
||||||
|
Self::ExecServer { .. } => {
|
||||||
|
let exit_code = self.exit_code().unwrap_or(-1);
|
||||||
|
let exec_output = ExecToolCallOutput {
|
||||||
|
exit_code,
|
||||||
|
stderr: StreamOutput::new(text.to_string()),
|
||||||
|
aggregated_output: StreamOutput::new(text.to_string()),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
if is_likely_sandbox_denied(sandbox_type, &exec_output) {
|
||||||
|
let snippet = formatted_truncate_text(
|
||||||
|
text,
|
||||||
|
TruncationPolicy::Tokens(UNIFIED_EXEC_OUTPUT_MAX_TOKENS),
|
||||||
|
);
|
||||||
|
let message = if snippet.is_empty() {
|
||||||
|
format!("Process exited with code {exit_code}")
|
||||||
|
} else {
|
||||||
|
snippet
|
||||||
|
};
|
||||||
|
return Err(UnifiedExecError::sandbox_denied(message, exec_output));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exit_code(&self) -> Option<i32> {
|
||||||
|
match self {
|
||||||
|
Self::Local { process } => process.exit_code(),
|
||||||
|
Self::ExecServer { exit_code, .. } => *exit_code.read().unwrap_or_else(|err| err.into_inner()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn set_exit_code(&self, exit_code: i32) {
|
||||||
|
if let Self::ExecServer { exit_code: state, .. } = self {
|
||||||
|
let mut guard = state.write().unwrap_or_else(|err| err.into_inner());
|
||||||
|
*guard = Some(exit_code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn mark_exited(&self) {
|
||||||
|
if let Self::ExecServer { has_exited, .. } = self {
|
||||||
|
has_exited.store(true, Ordering::Release);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn has_exited(&self) -> bool {
|
||||||
|
match self {
|
||||||
|
Self::Local { process } => process.has_exited(),
|
||||||
|
Self::ExecServer { has_exited, .. } => has_exited.load(Ordering::Acquire),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn cancellation_token(&self) -> Option<CancellationToken> {
|
||||||
|
match self {
|
||||||
|
Self::Local { process } => Some(process.cancellation_token()),
|
||||||
|
Self::ExecServer {
|
||||||
|
cancellation_token, ..
|
||||||
|
} => Some(cancellation_token.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn output_drained(&self) -> Option<Arc<Notify>> {
|
||||||
|
match self {
|
||||||
|
Self::Local { process } => Some(process.output_drained_notify()),
|
||||||
|
Self::ExecServer { output_drained, .. } => Some(Arc::clone(output_drained)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn exit_code_handle(&self) -> Option<Arc<RwLock<Option<i32>>> {
|
||||||
|
match self {
|
||||||
|
Self::ExecServer { exit_code, .. } => Some(Arc::clone(exit_code)),
|
||||||
|
Self::Local { .. } => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn remote_output_seq(&self) -> Option<Arc<AtomicU64>> {
|
||||||
|
match self {
|
||||||
|
Self::ExecServer { output_seq, .. } => Some(Arc::clone(output_seq)),
|
||||||
|
Self::Local { .. } => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn write_stdin(&self, data: &[u8]) -> Result<(), UnifiedExecError> {
|
||||||
|
match self {
|
||||||
|
Self::Local { process } => process
|
||||||
|
.writer_sender()
|
||||||
|
.send(data.to_vec())
|
||||||
|
.await
|
||||||
|
.map_err(|_| UnifiedExecError::WriteToStdin)?,
|
||||||
|
Self::ExecServer {
|
||||||
|
process_id,
|
||||||
|
executor,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
let response = executor
|
||||||
|
.write(process_id.as_str(), data.to_vec())
|
||||||
|
.await
|
||||||
|
.map_err(|_| UnifiedExecError::WriteToStdin)?;
|
||||||
|
if !response.accepted {
|
||||||
|
return Err(UnifiedExecError::WriteToStdin);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn terminate(&self) {
|
||||||
|
match self {
|
||||||
|
Self::Local { process } => process.terminate(),
|
||||||
|
Self::ExecServer {
|
||||||
|
process_id,
|
||||||
|
executor,
|
||||||
|
output_closed,
|
||||||
|
output_closed_notify,
|
||||||
|
output_drained,
|
||||||
|
cancellation_token,
|
||||||
|
has_exited,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
has_exited.store(true, Ordering::Release);
|
||||||
|
output_closed.store(true, Ordering::Release);
|
||||||
|
output_closed_notify.notify_waiters();
|
||||||
|
output_drained.notify_one();
|
||||||
|
cancellation_token.cancel();
|
||||||
|
let _ = executor.terminate(process_id.as_str()).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn sandbox_type(&self) -> SandboxType {
|
||||||
|
match self {
|
||||||
|
Self::Local { process } => process.sandbox_type(),
|
||||||
|
Self::ExecServer { sandbox_type, .. } => *sandbox_type,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn remote_exec_state(&self) -> Option<(&str, &Arc<dyn ExecProcess>)> {
|
||||||
|
match self {
|
||||||
|
Self::ExecServer {
|
||||||
|
process_id,
|
||||||
|
executor,
|
||||||
|
..
|
||||||
|
} => Some((process_id.as_str(), executor)),
|
||||||
|
Self::Local { .. } => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {
|
pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {
|
||||||
yield_time_ms.clamp(MIN_YIELD_TIME_MS, MAX_YIELD_TIME_MS)
|
yield_time_ms.clamp(MIN_YIELD_TIME_MS, MAX_YIELD_TIME_MS)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,14 +4,18 @@ use std::collections::HashMap;
|
|||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::sync::RwLock;
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
|
use std::sync::atomic::AtomicU64;
|
||||||
use tokio::sync::Notify;
|
use tokio::sync::Notify;
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
use tokio::time::Duration;
|
use tokio::time::Duration;
|
||||||
use tokio::time::Instant;
|
use tokio::time::Instant;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
use codex_exec_server::process::ExecProcess;
|
||||||
|
use codex_exec_server::protocol::ExecParams;
|
||||||
|
use codex_exec_server::protocol::ReadParams;
|
||||||
|
|
||||||
use crate::exec_env::create_env;
|
use crate::exec_env::create_env;
|
||||||
use crate::exec_policy::ExecApprovalRequest;
|
use crate::exec_policy::ExecApprovalRequest;
|
||||||
@@ -34,6 +38,7 @@ use crate::unified_exec::MAX_YIELD_TIME_MS;
|
|||||||
use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
|
use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
|
||||||
use crate::unified_exec::MIN_YIELD_TIME_MS;
|
use crate::unified_exec::MIN_YIELD_TIME_MS;
|
||||||
use crate::unified_exec::ProcessEntry;
|
use crate::unified_exec::ProcessEntry;
|
||||||
|
use crate::unified_exec::ProcessBackend;
|
||||||
use crate::unified_exec::ProcessStore;
|
use crate::unified_exec::ProcessStore;
|
||||||
use crate::unified_exec::UnifiedExecContext;
|
use crate::unified_exec::UnifiedExecContext;
|
||||||
use crate::unified_exec::UnifiedExecError;
|
use crate::unified_exec::UnifiedExecError;
|
||||||
@@ -49,7 +54,6 @@ use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
|
|||||||
use crate::unified_exec::process::OutputBuffer;
|
use crate::unified_exec::process::OutputBuffer;
|
||||||
use crate::unified_exec::process::OutputHandles;
|
use crate::unified_exec::process::OutputHandles;
|
||||||
use crate::unified_exec::process::SpawnLifecycleHandle;
|
use crate::unified_exec::process::SpawnLifecycleHandle;
|
||||||
use crate::unified_exec::process::UnifiedExecProcess;
|
|
||||||
|
|
||||||
const UNIFIED_EXEC_ENV: [(&str, &str); 10] = [
|
const UNIFIED_EXEC_ENV: [(&str, &str); 10] = [
|
||||||
("NO_COLOR", "1"),
|
("NO_COLOR", "1"),
|
||||||
@@ -90,7 +94,7 @@ fn apply_unified_exec_env(mut env: HashMap<String, String>) -> HashMap<String, S
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct PreparedProcessHandles {
|
struct PreparedProcessHandles {
|
||||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
process: Arc<ProcessBackend>,
|
||||||
output_buffer: OutputBuffer,
|
output_buffer: OutputBuffer,
|
||||||
output_notify: Arc<Notify>,
|
output_notify: Arc<Notify>,
|
||||||
output_closed: Arc<AtomicBool>,
|
output_closed: Arc<AtomicBool>,
|
||||||
@@ -166,9 +170,7 @@ impl UnifiedExecProcessManager {
|
|||||||
.await;
|
.await;
|
||||||
|
|
||||||
let (process, mut deferred_network_approval) = match process {
|
let (process, mut deferred_network_approval) = match process {
|
||||||
Ok((process, deferred_network_approval)) => {
|
Ok((process, deferred_network_approval)) => (process, deferred_network_approval),
|
||||||
(Arc::new(process), deferred_network_approval)
|
|
||||||
}
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
self.release_process_id(request.process_id).await;
|
self.release_process_id(request.process_id).await;
|
||||||
return Err(err);
|
return Err(err);
|
||||||
@@ -190,7 +192,7 @@ impl UnifiedExecProcessManager {
|
|||||||
);
|
);
|
||||||
emitter.emit(event_ctx, ToolEventStage::Begin).await;
|
emitter.emit(event_ctx, ToolEventStage::Begin).await;
|
||||||
|
|
||||||
start_streaming_output(&process, context, Arc::clone(&transcript));
|
start_streaming_output(process.clone(), context, Arc::clone(&transcript));
|
||||||
let start = Instant::now();
|
let start = Instant::now();
|
||||||
// Persist live sessions before the initial yield wait so interrupting the
|
// Persist live sessions before the initial yield wait so interrupting the
|
||||||
// turn cannot drop the last Arc and terminate the background process.
|
// turn cannot drop the last Arc and terminate the background process.
|
||||||
@@ -312,7 +314,6 @@ impl UnifiedExecProcessManager {
|
|||||||
let process_id = request.process_id;
|
let process_id = request.process_id;
|
||||||
|
|
||||||
let PreparedProcessHandles {
|
let PreparedProcessHandles {
|
||||||
writer_tx,
|
|
||||||
output_buffer,
|
output_buffer,
|
||||||
output_notify,
|
output_notify,
|
||||||
output_closed,
|
output_closed,
|
||||||
@@ -329,7 +330,7 @@ impl UnifiedExecProcessManager {
|
|||||||
if !tty {
|
if !tty {
|
||||||
return Err(UnifiedExecError::StdinClosed);
|
return Err(UnifiedExecError::StdinClosed);
|
||||||
}
|
}
|
||||||
Self::send_input(&writer_tx, request.input.as_bytes()).await?;
|
process.write_stdin(request.input.as_bytes()).await?;
|
||||||
// Give the remote process a brief window to react so that we are
|
// Give the remote process a brief window to react so that we are
|
||||||
// more likely to capture its output in the poll below.
|
// more likely to capture its output in the poll below.
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
@@ -407,10 +408,10 @@ impl UnifiedExecProcessManager {
|
|||||||
return ProcessStatus::Unknown;
|
return ProcessStatus::Unknown;
|
||||||
};
|
};
|
||||||
|
|
||||||
let exit_code = entry.process.exit_code();
|
let exit_code = entry.backend.exit_code();
|
||||||
let process_id = entry.process_id;
|
let process_id = entry.process_id;
|
||||||
|
|
||||||
if entry.process.has_exited() {
|
if entry.backend.has_exited() {
|
||||||
let Some(entry) = store.remove(process_id) else {
|
let Some(entry) = store.remove(process_id) else {
|
||||||
return ProcessStatus::Unknown;
|
return ProcessStatus::Unknown;
|
||||||
};
|
};
|
||||||
@@ -448,14 +449,13 @@ impl UnifiedExecProcessManager {
|
|||||||
output_closed,
|
output_closed,
|
||||||
output_closed_notify,
|
output_closed_notify,
|
||||||
cancellation_token,
|
cancellation_token,
|
||||||
} = entry.process.output_handles();
|
} = entry.backend.output_handles();
|
||||||
let pause_state = entry
|
let pause_state = entry
|
||||||
.session
|
.session
|
||||||
.upgrade()
|
.upgrade()
|
||||||
.map(|session| session.subscribe_out_of_band_elicitation_pause_state());
|
.map(|session| session.subscribe_out_of_band_elicitation_pause_state());
|
||||||
|
|
||||||
Ok(PreparedProcessHandles {
|
Ok(PreparedProcessHandles {
|
||||||
writer_tx: entry.process.writer_sender(),
|
|
||||||
output_buffer,
|
output_buffer,
|
||||||
output_notify,
|
output_notify,
|
||||||
output_closed,
|
output_closed,
|
||||||
@@ -468,20 +468,10 @@ impl UnifiedExecProcessManager {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_input(
|
|
||||||
writer_tx: &mpsc::Sender<Vec<u8>>,
|
|
||||||
data: &[u8],
|
|
||||||
) -> Result<(), UnifiedExecError> {
|
|
||||||
writer_tx
|
|
||||||
.send(data.to_vec())
|
|
||||||
.await
|
|
||||||
.map_err(|_| UnifiedExecError::WriteToStdin)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
async fn store_process(
|
async fn store_process(
|
||||||
&self,
|
&self,
|
||||||
process: Arc<UnifiedExecProcess>,
|
process: Arc<ProcessBackend>,
|
||||||
context: &UnifiedExecContext,
|
context: &UnifiedExecContext,
|
||||||
command: &[String],
|
command: &[String],
|
||||||
cwd: PathBuf,
|
cwd: PathBuf,
|
||||||
@@ -492,7 +482,7 @@ impl UnifiedExecProcessManager {
|
|||||||
transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
|
transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
|
||||||
) {
|
) {
|
||||||
let entry = ProcessEntry {
|
let entry = ProcessEntry {
|
||||||
process: Arc::clone(&process),
|
backend: Arc::clone(&process),
|
||||||
call_id: context.call_id.clone(),
|
call_id: context.call_id.clone(),
|
||||||
process_id,
|
process_id,
|
||||||
command: command.to_vec(),
|
command: command.to_vec(),
|
||||||
@@ -511,7 +501,7 @@ impl UnifiedExecProcessManager {
|
|||||||
// network-approval cleanup only after dropping that lock.
|
// network-approval cleanup only after dropping that lock.
|
||||||
if let Some(pruned_entry) = pruned_entry {
|
if let Some(pruned_entry) = pruned_entry {
|
||||||
Self::unregister_network_approval_for_entry(&pruned_entry).await;
|
Self::unregister_network_approval_for_entry(&pruned_entry).await;
|
||||||
pruned_entry.process.terminate();
|
pruned_entry.backend.terminate();
|
||||||
}
|
}
|
||||||
|
|
||||||
if number_processes >= WARNING_UNIFIED_EXEC_PROCESSES {
|
if number_processes >= WARNING_UNIFIED_EXEC_PROCESSES {
|
||||||
@@ -540,9 +530,29 @@ impl UnifiedExecProcessManager {
|
|||||||
pub(crate) async fn open_session_with_exec_env(
|
pub(crate) async fn open_session_with_exec_env(
|
||||||
&self,
|
&self,
|
||||||
env: &ExecRequest,
|
env: &ExecRequest,
|
||||||
|
process_id: i32,
|
||||||
tty: bool,
|
tty: bool,
|
||||||
|
use_exec_server: bool,
|
||||||
|
mut executor: Option<Arc<dyn ExecProcess>>,
|
||||||
mut spawn_lifecycle: SpawnLifecycleHandle,
|
mut spawn_lifecycle: SpawnLifecycleHandle,
|
||||||
) -> Result<UnifiedExecProcess, UnifiedExecError> {
|
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
|
||||||
|
if use_exec_server {
|
||||||
|
return self
|
||||||
|
.open_session_with_exec_server(env, process_id, tty, &mut executor, spawn_lifecycle)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.open_session_with_local_process(env, process_id, tty, spawn_lifecycle)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn open_session_with_local_process(
|
||||||
|
&self,
|
||||||
|
env: &ExecRequest,
|
||||||
|
process_id: i32,
|
||||||
|
tty: bool,
|
||||||
|
spawn_lifecycle: SpawnLifecycleHandle,
|
||||||
|
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
|
||||||
let (program, args) = env
|
let (program, args) = env
|
||||||
.command
|
.command
|
||||||
.split_first()
|
.split_first()
|
||||||
@@ -571,10 +581,58 @@ impl UnifiedExecProcessManager {
|
|||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
};
|
};
|
||||||
let spawned =
|
|
||||||
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
let spawned = spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||||
spawn_lifecycle.after_spawn();
|
spawn_lifecycle.after_spawn();
|
||||||
UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await
|
let process = UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await?;
|
||||||
|
Ok(Arc::new(ProcessBackend::Local {
|
||||||
|
process: Arc::new(process),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn open_session_with_exec_server(
|
||||||
|
&self,
|
||||||
|
env: &ExecRequest,
|
||||||
|
process_id: i32,
|
||||||
|
tty: bool,
|
||||||
|
mut executor: &mut Option<Arc<dyn ExecProcess>>,
|
||||||
|
spawn_lifecycle: SpawnLifecycleHandle,
|
||||||
|
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
|
||||||
|
let executor = executor
|
||||||
|
.take()
|
||||||
|
.ok_or_else(|| UnifiedExecError::create_process("exec-server unavailable".to_string()))?;
|
||||||
|
let response = executor
|
||||||
|
.start(ExecParams {
|
||||||
|
process_id: process_id.to_string(),
|
||||||
|
argv: env.command.clone(),
|
||||||
|
cwd: env.cwd.clone(),
|
||||||
|
env: env.env.clone(),
|
||||||
|
tty,
|
||||||
|
arg0: env.arg0.clone(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||||
|
let _ = response.process_id;
|
||||||
|
|
||||||
|
let process = Arc::new(ProcessBackend::ExecServer {
|
||||||
|
process_id: process_id.to_string(),
|
||||||
|
executor,
|
||||||
|
output_buffer: Arc::new(tokio::sync::Mutex::new(HeadTailBuffer::default())),
|
||||||
|
output_notify: Arc::new(Notify::new()),
|
||||||
|
output_closed: Arc::new(AtomicBool::new(false)),
|
||||||
|
output_closed_notify: Arc::new(Notify::new()),
|
||||||
|
output_drained: Arc::new(Notify::new()),
|
||||||
|
cancellation_token: CancellationToken::new(),
|
||||||
|
exit_code: Arc::new(RwLock::new(None)),
|
||||||
|
has_exited: Arc::new(AtomicBool::new(false)),
|
||||||
|
sandbox_type: env.sandbox,
|
||||||
|
output_seq: Arc::new(AtomicU64::new(0)),
|
||||||
|
});
|
||||||
|
|
||||||
|
spawn_lifecycle.after_spawn();
|
||||||
|
Self::spawn_exec_server_output_watcher(Arc::clone(&process));
|
||||||
|
|
||||||
|
Ok(process)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(super) async fn open_session_with_sandbox(
|
pub(super) async fn open_session_with_sandbox(
|
||||||
@@ -582,16 +640,20 @@ impl UnifiedExecProcessManager {
|
|||||||
request: &ExecCommandRequest,
|
request: &ExecCommandRequest,
|
||||||
cwd: PathBuf,
|
cwd: PathBuf,
|
||||||
context: &UnifiedExecContext,
|
context: &UnifiedExecContext,
|
||||||
) -> Result<(UnifiedExecProcess, Option<DeferredNetworkApproval>), UnifiedExecError> {
|
) -> Result<(Arc<ProcessBackend>, Option<DeferredNetworkApproval>), UnifiedExecError> {
|
||||||
let env = apply_unified_exec_env(create_env(
|
let env = apply_unified_exec_env(create_env(
|
||||||
&context.turn.shell_environment_policy,
|
&context.turn.shell_environment_policy,
|
||||||
Some(context.session.conversation_id),
|
Some(context.session.conversation_id),
|
||||||
));
|
));
|
||||||
let mut orchestrator = ToolOrchestrator::new();
|
let mut orchestrator = ToolOrchestrator::new();
|
||||||
let mut runtime = UnifiedExecRuntime::new(
|
let mut runtime = if context.turn.config.experimental_exec_server_url.is_some() {
|
||||||
self,
|
UnifiedExecRuntime::with_exec_server(
|
||||||
context.turn.tools_config.unified_exec_shell_mode.clone(),
|
self,
|
||||||
);
|
context.turn.tools_config.unified_exec_shell_mode.clone(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
UnifiedExecRuntime::new(self, context.turn.tools_config.unified_exec_shell_mode.clone())
|
||||||
|
};
|
||||||
let exec_approval_requirement = context
|
let exec_approval_requirement = context
|
||||||
.session
|
.session
|
||||||
.services
|
.services
|
||||||
@@ -618,6 +680,8 @@ impl UnifiedExecProcessManager {
|
|||||||
tty: request.tty,
|
tty: request.tty,
|
||||||
sandbox_permissions: request.sandbox_permissions,
|
sandbox_permissions: request.sandbox_permissions,
|
||||||
additional_permissions: request.additional_permissions.clone(),
|
additional_permissions: request.additional_permissions.clone(),
|
||||||
|
process_id: request.process_id,
|
||||||
|
use_exec_server: context.turn.config.experimental_exec_server_url.is_some(),
|
||||||
#[cfg(unix)]
|
#[cfg(unix)]
|
||||||
additional_permissions_preapproved: request.additional_permissions_preapproved,
|
additional_permissions_preapproved: request.additional_permissions_preapproved,
|
||||||
justification: request.justification.clone(),
|
justification: request.justification.clone(),
|
||||||
@@ -775,7 +839,7 @@ impl UnifiedExecProcessManager {
|
|||||||
let meta: Vec<(i32, Instant, bool)> = store
|
let meta: Vec<(i32, Instant, bool)> = store
|
||||||
.processes
|
.processes
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(id, entry)| (*id, entry.last_used, entry.process.has_exited()))
|
.map(|(id, entry)| (*id, entry.last_used, entry.backend.has_exited()))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
if let Some(process_id) = Self::process_id_to_prune_from_meta(&meta) {
|
if let Some(process_id) = Self::process_id_to_prune_from_meta(&meta) {
|
||||||
@@ -828,11 +892,129 @@ impl UnifiedExecProcessManager {
|
|||||||
|
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
Self::unregister_network_approval_for_entry(&entry).await;
|
Self::unregister_network_approval_for_entry(&entry).await;
|
||||||
entry.process.terminate();
|
entry.backend.terminate();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl UnifiedExecProcessManager {
|
||||||
|
fn spawn_exec_server_output_watcher(process: Arc<ProcessBackend>) {
|
||||||
|
let (
|
||||||
|
process_id,
|
||||||
|
executor,
|
||||||
|
output_buffer,
|
||||||
|
output_notify,
|
||||||
|
output_closed,
|
||||||
|
output_closed_notify,
|
||||||
|
output_drained,
|
||||||
|
output_seq,
|
||||||
|
has_exited,
|
||||||
|
exit_code,
|
||||||
|
cancellation_token,
|
||||||
|
) = {
|
||||||
|
if let ProcessBackend::ExecServer {
|
||||||
|
process_id,
|
||||||
|
executor,
|
||||||
|
output_buffer,
|
||||||
|
output_notify,
|
||||||
|
output_closed,
|
||||||
|
output_closed_notify,
|
||||||
|
output_drained,
|
||||||
|
output_seq,
|
||||||
|
has_exited,
|
||||||
|
exit_code,
|
||||||
|
cancellation_token,
|
||||||
|
..
|
||||||
|
} = process.as_ref()
|
||||||
|
{
|
||||||
|
(
|
||||||
|
process_id.clone(),
|
||||||
|
Arc::clone(executor),
|
||||||
|
Arc::clone(output_buffer),
|
||||||
|
Arc::clone(output_notify),
|
||||||
|
Arc::clone(output_closed),
|
||||||
|
Arc::clone(output_closed_notify),
|
||||||
|
Arc::clone(output_drained),
|
||||||
|
Arc::clone(output_seq),
|
||||||
|
Arc::clone(has_exited),
|
||||||
|
Arc::clone(exit_code),
|
||||||
|
cancellation_token.clone(),
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut after_seq = None;
|
||||||
|
loop {
|
||||||
|
if cancellation_token.is_cancelled() {
|
||||||
|
output_closed.store(true, Ordering::Release);
|
||||||
|
output_closed_notify.notify_waiters();
|
||||||
|
output_drained.notify_waiters();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let response = match executor
|
||||||
|
.read(ReadParams {
|
||||||
|
process_id: process_id.clone(),
|
||||||
|
after_seq,
|
||||||
|
max_bytes: Some(64 * 1024),
|
||||||
|
wait_ms: Some(100),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(response) => response,
|
||||||
|
Err(_) => {
|
||||||
|
let mut guard = exit_code.write().unwrap_or_else(|err| err.into_inner());
|
||||||
|
if guard.is_none() {
|
||||||
|
*guard = Some(-1);
|
||||||
|
}
|
||||||
|
has_exited.store(true, Ordering::Release);
|
||||||
|
output_closed.store(true, Ordering::Release);
|
||||||
|
output_closed_notify.notify_waiters();
|
||||||
|
output_drained.notify_waiters();
|
||||||
|
cancellation_token.cancel();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !response.chunks.is_empty() {
|
||||||
|
let mut current_seq = output_seq.load(Ordering::Acquire);
|
||||||
|
for chunk in response.chunks {
|
||||||
|
if chunk.seq < current_seq {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut guard = output_buffer.lock().await;
|
||||||
|
guard.push_chunk(chunk.chunk.into_inner());
|
||||||
|
}
|
||||||
|
current_seq = chunk.seq.saturating_add(1);
|
||||||
|
output_seq.store(current_seq, Ordering::Release);
|
||||||
|
output_notify.notify_waiters();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(code) = response.exit_code {
|
||||||
|
let mut guard = exit_code.write().unwrap_or_else(|err| err.into_inner());
|
||||||
|
*guard = Some(code);
|
||||||
|
}
|
||||||
|
after_seq = Some(response.next_seq);
|
||||||
|
|
||||||
|
if response.exited {
|
||||||
|
has_exited.store(true, Ordering::Release);
|
||||||
|
output_closed.store(true, Ordering::Release);
|
||||||
|
output_closed_notify.notify_waiters();
|
||||||
|
output_drained.notify_waiters();
|
||||||
|
cancellation_token.cancel();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
enum ProcessStatus {
|
enum ProcessStatus {
|
||||||
Alive {
|
Alive {
|
||||||
exit_code: Option<i32>,
|
exit_code: Option<i32>,
|
||||||
|
|||||||
Reference in New Issue
Block a user