mirror of
https://github.com/openai/codex.git
synced 2026-03-21 06:03:49 +00:00
Compare commits
2 Commits
main
...
codex-exec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0ee08bc702 | ||
|
|
04190334dc |
@@ -25,7 +25,6 @@ use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
use wiremock::matchers::query_param;
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567";
|
||||
@@ -679,7 +678,6 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
|
||||
.await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/plugins/featured"))
|
||||
.and(query_param("platform", "codex"))
|
||||
.and(header("authorization", "Bearer chatgpt-token"))
|
||||
.and(header("chatgpt-account-id", "account-123"))
|
||||
.respond_with(
|
||||
@@ -786,7 +784,6 @@ async fn app_server_startup_remote_plugin_sync_runs_once() -> Result<()> {
|
||||
.await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/plugins/featured"))
|
||||
.and(query_param("platform", "codex"))
|
||||
.and(header("authorization", "Bearer chatgpt-token"))
|
||||
.and(header("chatgpt-account-id", "account-123"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
|
||||
@@ -853,7 +850,6 @@ async fn plugin_list_fetches_featured_plugin_ids_without_chatgpt_auth() -> Resul
|
||||
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/plugins/featured"))
|
||||
.and(query_param("platform", "codex"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
|
||||
.mount(&server)
|
||||
.await;
|
||||
@@ -892,7 +888,6 @@ async fn plugin_list_uses_warmed_featured_plugin_ids_cache_on_first_request() ->
|
||||
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/plugins/featured"))
|
||||
.and(query_param("platform", "codex"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
|
||||
@@ -549,7 +549,7 @@ pub fn render_decision_for_unmatched_command(
|
||||
|
||||
// On Windows, ReadOnly sandbox is not a real sandbox, so special-case it
|
||||
// here.
|
||||
let environment_lacks_sandbox_protections =
|
||||
let runtime_sandbox_provides_safety =
|
||||
cfg!(windows) && matches!(sandbox_policy, SandboxPolicy::ReadOnly { .. });
|
||||
|
||||
// If the command is flagged as dangerous or we have no sandbox protection,
|
||||
@@ -558,20 +558,9 @@ pub fn render_decision_for_unmatched_command(
|
||||
// We prefer to prompt the user rather than outright forbid the command,
|
||||
// but if the user has explicitly disabled prompts, we must
|
||||
// forbid the command.
|
||||
if command_might_be_dangerous(command) || environment_lacks_sandbox_protections {
|
||||
if command_might_be_dangerous(command) || runtime_sandbox_provides_safety {
|
||||
return match approval_policy {
|
||||
AskForApproval::Never => {
|
||||
let sandbox_is_explicitly_disabled = matches!(
|
||||
sandbox_policy,
|
||||
SandboxPolicy::DangerFullAccess | SandboxPolicy::ExternalSandbox { .. }
|
||||
);
|
||||
if sandbox_is_explicitly_disabled {
|
||||
// If the sandbox is explicitly disabled, we should allow the command to run
|
||||
Decision::Allow
|
||||
} else {
|
||||
Decision::Forbidden
|
||||
}
|
||||
}
|
||||
AskForApproval::Never => Decision::Forbidden,
|
||||
AskForApproval::OnFailure
|
||||
| AskForApproval::OnRequest
|
||||
| AskForApproval::UnlessTrusted
|
||||
|
||||
@@ -81,10 +81,6 @@ fn unrestricted_file_system_sandbox_policy() -> FileSystemSandboxPolicy {
|
||||
FileSystemSandboxPolicy::unrestricted()
|
||||
}
|
||||
|
||||
fn external_file_system_sandbox_policy() -> FileSystemSandboxPolicy {
|
||||
FileSystemSandboxPolicy::external_sandbox()
|
||||
}
|
||||
|
||||
async fn test_config() -> (TempDir, Config) {
|
||||
let home = TempDir::new().expect("create temp dir");
|
||||
let config = ConfigBuilder::default()
|
||||
@@ -1690,100 +1686,3 @@ async fn verify_approval_requirement_for_unsafe_powershell_command() {
|
||||
(unless AskForApproval::Never is specified)."#
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dangerous_command_allowed_when_sandbox_is_explicitly_disabled() {
|
||||
let command = vec_str(&["rm", "-rf", "/tmp/nonexistent"]);
|
||||
assert_exec_approval_requirement_for_command(
|
||||
ExecApprovalRequirementScenario {
|
||||
policy_src: None,
|
||||
command,
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::ExternalSandbox {
|
||||
network_access: Default::default(),
|
||||
},
|
||||
file_system_sandbox_policy: external_file_system_sandbox_policy(),
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
prefix_rule: None,
|
||||
},
|
||||
ExecApprovalRequirement::Skip {
|
||||
bypass_sandbox: false,
|
||||
proposed_execpolicy_amendment: Some(ExecPolicyAmendment {
|
||||
command: vec_str(&["rm", "-rf", "/tmp/nonexistent"]),
|
||||
}),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn dangerous_command_forbidden_in_external_sandbox_when_policy_matches() {
|
||||
let command = vec_str(&["rm", "-rf", "/tmp/nonexistent"]);
|
||||
assert_exec_approval_requirement_for_command(
|
||||
ExecApprovalRequirementScenario {
|
||||
policy_src: Some("prefix_rule(pattern=['rm'], decision='prompt')".to_string()),
|
||||
command,
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::ExternalSandbox {
|
||||
network_access: Default::default(),
|
||||
},
|
||||
file_system_sandbox_policy: external_file_system_sandbox_policy(),
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
prefix_rule: None,
|
||||
},
|
||||
ExecApprovalRequirement::Forbidden {
|
||||
reason: "approval required by policy, but AskForApproval is set to Never".to_string(),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
struct ExecApprovalRequirementScenario {
|
||||
/// Source for the Starlark `.rules` file.
|
||||
policy_src: Option<String>,
|
||||
command: Vec<String>,
|
||||
approval_policy: AskForApproval,
|
||||
sandbox_policy: SandboxPolicy,
|
||||
file_system_sandbox_policy: FileSystemSandboxPolicy,
|
||||
sandbox_permissions: SandboxPermissions,
|
||||
prefix_rule: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
async fn assert_exec_approval_requirement_for_command(
|
||||
test: ExecApprovalRequirementScenario,
|
||||
expected_requirement: ExecApprovalRequirement,
|
||||
) {
|
||||
let ExecApprovalRequirementScenario {
|
||||
policy_src,
|
||||
command,
|
||||
approval_policy,
|
||||
sandbox_policy,
|
||||
file_system_sandbox_policy,
|
||||
sandbox_permissions,
|
||||
prefix_rule,
|
||||
} = test;
|
||||
|
||||
let policy = match policy_src {
|
||||
Some(src) => {
|
||||
let mut parser = PolicyParser::new();
|
||||
parser
|
||||
.parse("test.rules", src.as_str())
|
||||
.expect("parse policy");
|
||||
Arc::new(parser.build())
|
||||
}
|
||||
None => Arc::new(Policy::empty()),
|
||||
};
|
||||
|
||||
let requirement = ExecPolicyManager::new(policy)
|
||||
.create_exec_approval_requirement_for_command(ExecApprovalRequest {
|
||||
command: &command,
|
||||
approval_policy,
|
||||
sandbox_policy: &sandbox_policy,
|
||||
file_system_sandbox_policy: &file_system_sandbox_policy,
|
||||
sandbox_permissions,
|
||||
prefix_rule,
|
||||
})
|
||||
.await;
|
||||
|
||||
assert_eq!(requirement, expected_requirement);
|
||||
}
|
||||
|
||||
@@ -622,8 +622,7 @@ impl PluginsManager {
|
||||
if let Some(featured_plugin_ids) = self.cached_featured_plugin_ids(&cache_key) {
|
||||
return Ok(featured_plugin_ids);
|
||||
}
|
||||
let featured_plugin_ids =
|
||||
fetch_remote_featured_plugin_ids(config, auth, self.restriction_product).await?;
|
||||
let featured_plugin_ids = fetch_remote_featured_plugin_ids(config, auth).await?;
|
||||
self.write_featured_plugin_ids_cache(cache_key, &featured_plugin_ids);
|
||||
Ok(featured_plugin_ids)
|
||||
}
|
||||
|
||||
@@ -23,7 +23,6 @@ use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
use wiremock::matchers::query_param;
|
||||
|
||||
fn write_plugin(root: &Path, dir_name: &str, manifest_name: &str) {
|
||||
let plugin_root = root.join(dir_name);
|
||||
@@ -1900,74 +1899,6 @@ plugins = true
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn featured_plugin_ids_for_config_uses_restriction_product_query_param() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
write_file(
|
||||
&tmp.path().join(CONFIG_TOML_FILE),
|
||||
r#"[features]
|
||||
plugins = true
|
||||
"#,
|
||||
);
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/plugins/featured"))
|
||||
.and(query_param("platform", "chat"))
|
||||
.and(header("authorization", "Bearer Access Token"))
|
||||
.and(header("chatgpt-account-id", "account_id"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["chat-plugin"]"#))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let mut config = load_config(tmp.path(), tmp.path()).await;
|
||||
config.chatgpt_base_url = format!("{}/backend-api/", server.uri());
|
||||
let manager = PluginsManager::new_with_restriction_product(
|
||||
tmp.path().to_path_buf(),
|
||||
Some(Product::Chatgpt),
|
||||
);
|
||||
|
||||
let featured_plugin_ids = manager
|
||||
.featured_plugin_ids_for_config(
|
||||
&config,
|
||||
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(featured_plugin_ids, vec!["chat-plugin".to_string()]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn featured_plugin_ids_for_config_defaults_query_param_to_codex() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
write_file(
|
||||
&tmp.path().join(CONFIG_TOML_FILE),
|
||||
r#"[features]
|
||||
plugins = true
|
||||
"#,
|
||||
);
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("GET"))
|
||||
.and(path("/backend-api/plugins/featured"))
|
||||
.and(query_param("platform", "codex"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["codex-plugin"]"#))
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let mut config = load_config(tmp.path(), tmp.path()).await;
|
||||
config.chatgpt_base_url = format!("{}/backend-api/", server.uri());
|
||||
let manager = PluginsManager::new_with_restriction_product(tmp.path().to_path_buf(), None);
|
||||
|
||||
let featured_plugin_ids = manager
|
||||
.featured_plugin_ids_for_config(&config, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(featured_plugin_ids, vec!["codex-plugin".to_string()]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn refresh_curated_plugin_cache_replaces_existing_local_version_with_sha() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use crate::auth::CodexAuth;
|
||||
use crate::config::Config;
|
||||
use crate::default_client::build_reqwest_client;
|
||||
use codex_protocol::protocol::Product;
|
||||
use serde::Deserialize;
|
||||
use std::time::Duration;
|
||||
use url::Url;
|
||||
@@ -163,17 +162,12 @@ pub(crate) async fn fetch_remote_plugin_status(
|
||||
pub async fn fetch_remote_featured_plugin_ids(
|
||||
config: &Config,
|
||||
auth: Option<&CodexAuth>,
|
||||
product: Option<Product>,
|
||||
) -> Result<Vec<String>, RemotePluginFetchError> {
|
||||
let base_url = config.chatgpt_base_url.trim_end_matches('/');
|
||||
let url = format!("{base_url}/plugins/featured");
|
||||
let client = build_reqwest_client();
|
||||
let mut request = client
|
||||
.get(&url)
|
||||
.query(&[(
|
||||
"platform",
|
||||
product.unwrap_or(Product::Codex).to_app_platform(),
|
||||
)])
|
||||
.timeout(REMOTE_FEATURED_PLUGIN_FETCH_TIMEOUT);
|
||||
|
||||
if let Some(auth) = auth.filter(|auth| auth.is_chatgpt_auth()) {
|
||||
|
||||
@@ -34,7 +34,7 @@ use crate::tools::sandboxing::with_cached_approval;
|
||||
use crate::tools::spec::UnifiedExecShellMode;
|
||||
use crate::unified_exec::NoopSpawnLifecycle;
|
||||
use crate::unified_exec::UnifiedExecError;
|
||||
use crate::unified_exec::UnifiedExecProcess;
|
||||
use crate::unified_exec::ProcessBackend;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
use codex_network_proxy::NetworkProxy;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
@@ -53,6 +53,8 @@ pub struct UnifiedExecRequest {
|
||||
pub tty: bool,
|
||||
pub sandbox_permissions: SandboxPermissions,
|
||||
pub additional_permissions: Option<PermissionProfile>,
|
||||
pub process_id: i32,
|
||||
pub use_exec_server: bool,
|
||||
#[cfg(unix)]
|
||||
pub additional_permissions_preapproved: bool,
|
||||
pub justification: Option<String>,
|
||||
@@ -71,6 +73,7 @@ pub struct UnifiedExecApprovalKey {
|
||||
pub struct UnifiedExecRuntime<'a> {
|
||||
manager: &'a UnifiedExecProcessManager,
|
||||
shell_mode: UnifiedExecShellMode,
|
||||
use_exec_server: bool,
|
||||
}
|
||||
|
||||
impl<'a> UnifiedExecRuntime<'a> {
|
||||
@@ -78,6 +81,18 @@ impl<'a> UnifiedExecRuntime<'a> {
|
||||
Self {
|
||||
manager,
|
||||
shell_mode,
|
||||
use_exec_server: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_exec_server(
|
||||
manager: &'a UnifiedExecProcessManager,
|
||||
shell_mode: UnifiedExecShellMode,
|
||||
) -> Self {
|
||||
Self {
|
||||
manager,
|
||||
shell_mode,
|
||||
use_exec_server: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -172,7 +187,7 @@ impl Approvable<UnifiedExecRequest> for UnifiedExecRuntime<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRuntime<'a> {
|
||||
impl<'a> ToolRuntime<UnifiedExecRequest, Arc<ProcessBackend>> for UnifiedExecRuntime<'a> {
|
||||
fn network_approval_spec(
|
||||
&self,
|
||||
req: &UnifiedExecRequest,
|
||||
@@ -190,7 +205,13 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
req: &UnifiedExecRequest,
|
||||
attempt: &SandboxAttempt<'_>,
|
||||
ctx: &ToolCtx,
|
||||
) -> Result<UnifiedExecProcess, ToolError> {
|
||||
) -> Result<Arc<ProcessBackend>, ToolError> {
|
||||
let use_exec_server = if self.use_exec_server {
|
||||
true
|
||||
} else {
|
||||
req.use_exec_server
|
||||
};
|
||||
|
||||
let base_command = &req.command;
|
||||
let session_shell = ctx.session.user_shell();
|
||||
let command = maybe_wrap_shell_lc_with_snapshot(
|
||||
@@ -237,7 +258,10 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
.manager
|
||||
.open_session_with_exec_env(
|
||||
&prepared.exec_request,
|
||||
req.process_id,
|
||||
req.tty,
|
||||
use_exec_server,
|
||||
Some(ctx.session.services.environment.get_executor()),
|
||||
prepared.spawn_lifecycle,
|
||||
)
|
||||
.await
|
||||
@@ -272,7 +296,18 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
.env_for(spec, req.network.as_ref())
|
||||
.map_err(|err| ToolError::Codex(err.into()))?;
|
||||
self.manager
|
||||
.open_session_with_exec_env(&exec_env, req.tty, Box::new(NoopSpawnLifecycle))
|
||||
.open_session_with_exec_env(
|
||||
&exec_env,
|
||||
req.process_id,
|
||||
req.tty,
|
||||
use_exec_server,
|
||||
if use_exec_server {
|
||||
Some(ctx.session.services.environment.get_executor())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
Box::new(NoopSpawnLifecycle),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
UnifiedExecError::SandboxDenied { output, .. } => {
|
||||
|
||||
@@ -18,10 +18,12 @@ use crate::protocol::EventMsg;
|
||||
use crate::protocol::ExecCommandOutputDeltaEvent;
|
||||
use crate::protocol::ExecCommandSource;
|
||||
use crate::protocol::ExecOutputStream;
|
||||
use crate::unified_exec::process::OutputHandles;
|
||||
use crate::tools::events::ToolEmitter;
|
||||
use crate::tools::events::ToolEventCtx;
|
||||
use crate::tools::events::ToolEventStage;
|
||||
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
|
||||
use crate::unified_exec::ProcessBackend;
|
||||
|
||||
pub(crate) const TRAILING_OUTPUT_GRACE: Duration = Duration::from_millis(100);
|
||||
|
||||
@@ -37,21 +39,81 @@ const UNIFIED_EXEC_OUTPUT_DELTA_MAX_BYTES: usize = 8192;
|
||||
/// shared transcript, and emits ExecCommandOutputDelta events on UTF‑8
|
||||
/// boundaries.
|
||||
pub(crate) fn start_streaming_output(
|
||||
process: &UnifiedExecProcess,
|
||||
process: Arc<ProcessBackend>,
|
||||
context: &UnifiedExecContext,
|
||||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||
) {
|
||||
let mut receiver = process.output_receiver();
|
||||
let output_drained = process.output_drained_notify();
|
||||
let exit_token = process.cancellation_token();
|
||||
|
||||
let session_ref = Arc::clone(&context.session);
|
||||
let turn_ref = Arc::clone(&context.turn);
|
||||
let call_id = context.call_id.clone();
|
||||
let OutputHandles {
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
cancellation_token,
|
||||
} = process.output_handles();
|
||||
|
||||
if let Some(local_process) = process.as_local_process() {
|
||||
let mut receiver = local_process.output_receiver();
|
||||
let output_drained = local_process.output_drained_notify();
|
||||
tokio::spawn(async move {
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
let mut pending = Vec::<u8>::new();
|
||||
let mut emitted_deltas: usize = 0;
|
||||
|
||||
let mut grace_sleep: Option<Pin<Box<Sleep>>> = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
|
||||
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
|
||||
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
|
||||
}
|
||||
|
||||
_ = async {
|
||||
if let Some(sleep) = grace_sleep.as_mut() {
|
||||
sleep.as_mut().await;
|
||||
}
|
||||
}, if grace_sleep.is_some() => {
|
||||
output_drained.notify_one();
|
||||
break;
|
||||
}
|
||||
|
||||
received = receiver.recv() => {
|
||||
let chunk = match received {
|
||||
Ok(chunk) => chunk,
|
||||
Err(RecvError::Lagged(_)) => {
|
||||
continue;
|
||||
},
|
||||
Err(RecvError::Closed) => {
|
||||
output_drained.notify_one();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
process_chunk(
|
||||
&mut pending,
|
||||
&transcript,
|
||||
&call_id,
|
||||
&session_ref,
|
||||
&turn_ref,
|
||||
&mut emitted_deltas,
|
||||
chunk,
|
||||
).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
let output_drained = process
|
||||
.output_drained()
|
||||
.unwrap_or_else(|| Arc::new(Notify::new()));
|
||||
|
||||
tokio::spawn(async move {
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
|
||||
let mut pending = Vec::<u8>::new();
|
||||
let mut emitted_deltas: usize = 0;
|
||||
|
||||
@@ -59,7 +121,7 @@ pub(crate) fn start_streaming_output(
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = exit_token.cancelled(), if grace_sleep.is_none() => {
|
||||
_ = cancellation_token.cancelled(), if grace_sleep.is_none() => {
|
||||
let deadline = Instant::now() + TRAILING_OUTPUT_GRACE;
|
||||
grace_sleep.replace(Box::pin(tokio::time::sleep_until(deadline)));
|
||||
}
|
||||
@@ -73,27 +135,35 @@ pub(crate) fn start_streaming_output(
|
||||
break;
|
||||
}
|
||||
|
||||
received = receiver.recv() => {
|
||||
let chunk = match received {
|
||||
Ok(chunk) => chunk,
|
||||
Err(RecvError::Lagged(_)) => {
|
||||
continue;
|
||||
},
|
||||
Err(RecvError::Closed) => {
|
||||
_ = output_notify.notified() => {
|
||||
let drained_chunks = {
|
||||
let mut guard = output_buffer.lock().await;
|
||||
guard.drain_chunks()
|
||||
};
|
||||
if drained_chunks.is_empty() {
|
||||
if cancellation_token.is_cancelled() && output_closed.load(std::sync::atomic::Ordering::Acquire) {
|
||||
output_drained.notify_one();
|
||||
break;
|
||||
}
|
||||
};
|
||||
continue;
|
||||
}
|
||||
|
||||
process_chunk(
|
||||
&mut pending,
|
||||
&transcript,
|
||||
&call_id,
|
||||
&session_ref,
|
||||
&turn_ref,
|
||||
&mut emitted_deltas,
|
||||
chunk,
|
||||
).await;
|
||||
for chunk in drained_chunks {
|
||||
process_chunk(
|
||||
&mut pending,
|
||||
&transcript,
|
||||
&call_id,
|
||||
&session_ref,
|
||||
&turn_ref,
|
||||
&mut emitted_deltas,
|
||||
chunk,
|
||||
).await;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
_ = output_closed_notify.notified(), if grace_sleep.is_none() => {
|
||||
grace_sleep.replace(Box::pin(tokio::time::sleep_until(Instant::now() + TRAILING_OUTPUT_GRACE)));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -104,7 +174,7 @@ pub(crate) fn start_streaming_output(
|
||||
/// single ExecCommandEnd event with the aggregated transcript.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn spawn_exit_watcher(
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
process: Arc<ProcessBackend>,
|
||||
session_ref: Arc<Session>,
|
||||
turn_ref: Arc<TurnContext>,
|
||||
call_id: String,
|
||||
@@ -114,8 +184,12 @@ pub(crate) fn spawn_exit_watcher(
|
||||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||
started_at: Instant,
|
||||
) {
|
||||
let exit_token = process.cancellation_token();
|
||||
let output_drained = process.output_drained_notify();
|
||||
let Some(exit_token) = process.cancellation_token() else {
|
||||
return;
|
||||
};
|
||||
let Some(output_drained) = process.output_drained() else {
|
||||
return;
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
exit_token.cancelled().await;
|
||||
|
||||
@@ -25,17 +25,30 @@ use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::Weak;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_exec_server::process::ExecProcess;
|
||||
use codex_network_proxy::NetworkProxy;
|
||||
use codex_protocol::models::PermissionProfile;
|
||||
use rand::Rng;
|
||||
use rand::rng;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Notify;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::sandboxing::SandboxPermissions;
|
||||
use crate::exec::is_likely_sandbox_denied;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::exec::SandboxType;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::truncate::TruncationPolicy;
|
||||
use crate::truncate::formatted_truncate_text;
|
||||
|
||||
mod async_watcher;
|
||||
mod errors;
|
||||
@@ -142,7 +155,7 @@ impl Default for UnifiedExecProcessManager {
|
||||
}
|
||||
|
||||
struct ProcessEntry {
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
backend: ProcessBackend,
|
||||
call_id: String,
|
||||
process_id: i32,
|
||||
command: Vec<String>,
|
||||
@@ -152,6 +165,240 @@ struct ProcessEntry {
|
||||
last_used: tokio::time::Instant,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) enum ProcessBackend {
|
||||
Local {
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
},
|
||||
ExecServer {
|
||||
process_id: String,
|
||||
executor: Arc<dyn ExecProcess>,
|
||||
output_buffer: crate::unified_exec::process::OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
output_closed: Arc<AtomicBool>,
|
||||
output_closed_notify: Arc<Notify>,
|
||||
output_drained: Arc<Notify>,
|
||||
cancellation_token: CancellationToken,
|
||||
exit_code: Arc<RwLock<Option<i32>>>,
|
||||
has_exited: Arc<AtomicBool>,
|
||||
sandbox_type: SandboxType,
|
||||
output_seq: Arc<AtomicU64>,
|
||||
},
|
||||
}
|
||||
|
||||
impl ProcessBackend {
|
||||
pub(crate) fn is_local(&self) -> bool {
|
||||
matches!(self, Self::Local { .. })
|
||||
}
|
||||
|
||||
pub(crate) fn as_local_process(&self) -> Option<&Arc<UnifiedExecProcess>> {
|
||||
match self {
|
||||
Self::Local { process } => Some(process),
|
||||
Self::ExecServer { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn output_handles(
|
||||
&self,
|
||||
) -> (
|
||||
crate::unified_exec::process::OutputBuffer,
|
||||
Arc<Notify>,
|
||||
Arc<AtomicBool>,
|
||||
Arc<Notify>,
|
||||
CancellationToken,
|
||||
) {
|
||||
match self {
|
||||
Self::Local { process } => {
|
||||
let handles = process.output_handles();
|
||||
(
|
||||
handles.output_buffer,
|
||||
handles.output_notify,
|
||||
handles.output_closed,
|
||||
handles.output_closed_notify,
|
||||
handles.cancellation_token,
|
||||
)
|
||||
}
|
||||
Self::ExecServer {
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
cancellation_token,
|
||||
..
|
||||
} => (
|
||||
Arc::clone(output_buffer),
|
||||
Arc::clone(output_notify),
|
||||
Arc::clone(output_closed),
|
||||
Arc::clone(output_closed_notify),
|
||||
cancellation_token.clone(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn check_for_sandbox_denial_with_text(
|
||||
&self,
|
||||
text: &str,
|
||||
) -> Result<(), UnifiedExecError> {
|
||||
let sandbox_type = self.sandbox_type();
|
||||
if sandbox_type == SandboxType::None || !self.has_exited() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match self {
|
||||
Self::Local { process } => {
|
||||
process.check_for_sandbox_denial_with_text(text).await
|
||||
}
|
||||
Self::ExecServer { .. } => {
|
||||
let exit_code = self.exit_code().unwrap_or(-1);
|
||||
let exec_output = ExecToolCallOutput {
|
||||
exit_code,
|
||||
stderr: StreamOutput::new(text.to_string()),
|
||||
aggregated_output: StreamOutput::new(text.to_string()),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if is_likely_sandbox_denied(sandbox_type, &exec_output) {
|
||||
let snippet = formatted_truncate_text(
|
||||
text,
|
||||
TruncationPolicy::Tokens(UNIFIED_EXEC_OUTPUT_MAX_TOKENS),
|
||||
);
|
||||
let message = if snippet.is_empty() {
|
||||
format!("Process exited with code {exit_code}")
|
||||
} else {
|
||||
snippet
|
||||
};
|
||||
return Err(UnifiedExecError::sandbox_denied(message, exec_output));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn exit_code(&self) -> Option<i32> {
|
||||
match self {
|
||||
Self::Local { process } => process.exit_code(),
|
||||
Self::ExecServer { exit_code, .. } => *exit_code.read().unwrap_or_else(|err| err.into_inner()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_exit_code(&self, exit_code: i32) {
|
||||
if let Self::ExecServer { exit_code: state, .. } = self {
|
||||
let mut guard = state.write().unwrap_or_else(|err| err.into_inner());
|
||||
*guard = Some(exit_code);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn mark_exited(&self) {
|
||||
if let Self::ExecServer { has_exited, .. } = self {
|
||||
has_exited.store(true, Ordering::Release);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn has_exited(&self) -> bool {
|
||||
match self {
|
||||
Self::Local { process } => process.has_exited(),
|
||||
Self::ExecServer { has_exited, .. } => has_exited.load(Ordering::Acquire),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn cancellation_token(&self) -> Option<CancellationToken> {
|
||||
match self {
|
||||
Self::Local { process } => Some(process.cancellation_token()),
|
||||
Self::ExecServer {
|
||||
cancellation_token, ..
|
||||
} => Some(cancellation_token.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn output_drained(&self) -> Option<Arc<Notify>> {
|
||||
match self {
|
||||
Self::Local { process } => Some(process.output_drained_notify()),
|
||||
Self::ExecServer { output_drained, .. } => Some(Arc::clone(output_drained)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn exit_code_handle(&self) -> Option<Arc<RwLock<Option<i32>>> {
|
||||
match self {
|
||||
Self::ExecServer { exit_code, .. } => Some(Arc::clone(exit_code)),
|
||||
Self::Local { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn remote_output_seq(&self) -> Option<Arc<AtomicU64>> {
|
||||
match self {
|
||||
Self::ExecServer { output_seq, .. } => Some(Arc::clone(output_seq)),
|
||||
Self::Local { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn write_stdin(&self, data: &[u8]) -> Result<(), UnifiedExecError> {
|
||||
match self {
|
||||
Self::Local { process } => process
|
||||
.writer_sender()
|
||||
.send(data.to_vec())
|
||||
.await
|
||||
.map_err(|_| UnifiedExecError::WriteToStdin)?,
|
||||
Self::ExecServer {
|
||||
process_id,
|
||||
executor,
|
||||
..
|
||||
} => {
|
||||
let response = executor
|
||||
.write(process_id.as_str(), data.to_vec())
|
||||
.await
|
||||
.map_err(|_| UnifiedExecError::WriteToStdin)?;
|
||||
if !response.accepted {
|
||||
return Err(UnifiedExecError::WriteToStdin);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn terminate(&self) {
|
||||
match self {
|
||||
Self::Local { process } => process.terminate(),
|
||||
Self::ExecServer {
|
||||
process_id,
|
||||
executor,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
output_drained,
|
||||
cancellation_token,
|
||||
has_exited,
|
||||
..
|
||||
} => {
|
||||
has_exited.store(true, Ordering::Release);
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
output_drained.notify_one();
|
||||
cancellation_token.cancel();
|
||||
let _ = executor.terminate(process_id.as_str()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn sandbox_type(&self) -> SandboxType {
|
||||
match self {
|
||||
Self::Local { process } => process.sandbox_type(),
|
||||
Self::ExecServer { sandbox_type, .. } => *sandbox_type,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn remote_exec_state(&self) -> Option<(&str, &Arc<dyn ExecProcess>)> {
|
||||
match self {
|
||||
Self::ExecServer {
|
||||
process_id,
|
||||
executor,
|
||||
..
|
||||
} => Some((process_id.as_str(), executor)),
|
||||
Self::Local { .. } => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {
|
||||
yield_time_ms.clamp(MIN_YIELD_TIME_MS, MAX_YIELD_TIME_MS)
|
||||
}
|
||||
|
||||
@@ -4,14 +4,18 @@ use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::watch;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use codex_exec_server::process::ExecProcess;
|
||||
use codex_exec_server::protocol::ExecParams;
|
||||
use codex_exec_server::protocol::ReadParams;
|
||||
|
||||
use crate::exec_env::create_env;
|
||||
use crate::exec_policy::ExecApprovalRequest;
|
||||
@@ -34,6 +38,7 @@ use crate::unified_exec::MAX_YIELD_TIME_MS;
|
||||
use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
|
||||
use crate::unified_exec::MIN_YIELD_TIME_MS;
|
||||
use crate::unified_exec::ProcessEntry;
|
||||
use crate::unified_exec::ProcessBackend;
|
||||
use crate::unified_exec::ProcessStore;
|
||||
use crate::unified_exec::UnifiedExecContext;
|
||||
use crate::unified_exec::UnifiedExecError;
|
||||
@@ -49,7 +54,6 @@ use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
|
||||
use crate::unified_exec::process::OutputBuffer;
|
||||
use crate::unified_exec::process::OutputHandles;
|
||||
use crate::unified_exec::process::SpawnLifecycleHandle;
|
||||
use crate::unified_exec::process::UnifiedExecProcess;
|
||||
|
||||
const UNIFIED_EXEC_ENV: [(&str, &str); 10] = [
|
||||
("NO_COLOR", "1"),
|
||||
@@ -90,7 +94,7 @@ fn apply_unified_exec_env(mut env: HashMap<String, String>) -> HashMap<String, S
|
||||
}
|
||||
|
||||
struct PreparedProcessHandles {
|
||||
writer_tx: mpsc::Sender<Vec<u8>>,
|
||||
process: Arc<ProcessBackend>,
|
||||
output_buffer: OutputBuffer,
|
||||
output_notify: Arc<Notify>,
|
||||
output_closed: Arc<AtomicBool>,
|
||||
@@ -166,9 +170,7 @@ impl UnifiedExecProcessManager {
|
||||
.await;
|
||||
|
||||
let (process, mut deferred_network_approval) = match process {
|
||||
Ok((process, deferred_network_approval)) => {
|
||||
(Arc::new(process), deferred_network_approval)
|
||||
}
|
||||
Ok((process, deferred_network_approval)) => (process, deferred_network_approval),
|
||||
Err(err) => {
|
||||
self.release_process_id(request.process_id).await;
|
||||
return Err(err);
|
||||
@@ -190,7 +192,7 @@ impl UnifiedExecProcessManager {
|
||||
);
|
||||
emitter.emit(event_ctx, ToolEventStage::Begin).await;
|
||||
|
||||
start_streaming_output(&process, context, Arc::clone(&transcript));
|
||||
start_streaming_output(process.clone(), context, Arc::clone(&transcript));
|
||||
let start = Instant::now();
|
||||
// Persist live sessions before the initial yield wait so interrupting the
|
||||
// turn cannot drop the last Arc and terminate the background process.
|
||||
@@ -312,7 +314,6 @@ impl UnifiedExecProcessManager {
|
||||
let process_id = request.process_id;
|
||||
|
||||
let PreparedProcessHandles {
|
||||
writer_tx,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
@@ -329,7 +330,7 @@ impl UnifiedExecProcessManager {
|
||||
if !tty {
|
||||
return Err(UnifiedExecError::StdinClosed);
|
||||
}
|
||||
Self::send_input(&writer_tx, request.input.as_bytes()).await?;
|
||||
process.write_stdin(request.input.as_bytes()).await?;
|
||||
// Give the remote process a brief window to react so that we are
|
||||
// more likely to capture its output in the poll below.
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
@@ -407,10 +408,10 @@ impl UnifiedExecProcessManager {
|
||||
return ProcessStatus::Unknown;
|
||||
};
|
||||
|
||||
let exit_code = entry.process.exit_code();
|
||||
let exit_code = entry.backend.exit_code();
|
||||
let process_id = entry.process_id;
|
||||
|
||||
if entry.process.has_exited() {
|
||||
if entry.backend.has_exited() {
|
||||
let Some(entry) = store.remove(process_id) else {
|
||||
return ProcessStatus::Unknown;
|
||||
};
|
||||
@@ -448,14 +449,13 @@ impl UnifiedExecProcessManager {
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
cancellation_token,
|
||||
} = entry.process.output_handles();
|
||||
} = entry.backend.output_handles();
|
||||
let pause_state = entry
|
||||
.session
|
||||
.upgrade()
|
||||
.map(|session| session.subscribe_out_of_band_elicitation_pause_state());
|
||||
|
||||
Ok(PreparedProcessHandles {
|
||||
writer_tx: entry.process.writer_sender(),
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
@@ -468,20 +468,10 @@ impl UnifiedExecProcessManager {
|
||||
})
|
||||
}
|
||||
|
||||
async fn send_input(
|
||||
writer_tx: &mpsc::Sender<Vec<u8>>,
|
||||
data: &[u8],
|
||||
) -> Result<(), UnifiedExecError> {
|
||||
writer_tx
|
||||
.send(data.to_vec())
|
||||
.await
|
||||
.map_err(|_| UnifiedExecError::WriteToStdin)
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn store_process(
|
||||
&self,
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
process: Arc<ProcessBackend>,
|
||||
context: &UnifiedExecContext,
|
||||
command: &[String],
|
||||
cwd: PathBuf,
|
||||
@@ -492,7 +482,7 @@ impl UnifiedExecProcessManager {
|
||||
transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
|
||||
) {
|
||||
let entry = ProcessEntry {
|
||||
process: Arc::clone(&process),
|
||||
backend: Arc::clone(&process),
|
||||
call_id: context.call_id.clone(),
|
||||
process_id,
|
||||
command: command.to_vec(),
|
||||
@@ -511,7 +501,7 @@ impl UnifiedExecProcessManager {
|
||||
// network-approval cleanup only after dropping that lock.
|
||||
if let Some(pruned_entry) = pruned_entry {
|
||||
Self::unregister_network_approval_for_entry(&pruned_entry).await;
|
||||
pruned_entry.process.terminate();
|
||||
pruned_entry.backend.terminate();
|
||||
}
|
||||
|
||||
if number_processes >= WARNING_UNIFIED_EXEC_PROCESSES {
|
||||
@@ -540,9 +530,29 @@ impl UnifiedExecProcessManager {
|
||||
pub(crate) async fn open_session_with_exec_env(
|
||||
&self,
|
||||
env: &ExecRequest,
|
||||
process_id: i32,
|
||||
tty: bool,
|
||||
use_exec_server: bool,
|
||||
mut executor: Option<Arc<dyn ExecProcess>>,
|
||||
mut spawn_lifecycle: SpawnLifecycleHandle,
|
||||
) -> Result<UnifiedExecProcess, UnifiedExecError> {
|
||||
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
|
||||
if use_exec_server {
|
||||
return self
|
||||
.open_session_with_exec_server(env, process_id, tty, &mut executor, spawn_lifecycle)
|
||||
.await;
|
||||
}
|
||||
|
||||
self.open_session_with_local_process(env, process_id, tty, spawn_lifecycle)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn open_session_with_local_process(
|
||||
&self,
|
||||
env: &ExecRequest,
|
||||
process_id: i32,
|
||||
tty: bool,
|
||||
spawn_lifecycle: SpawnLifecycleHandle,
|
||||
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
|
||||
let (program, args) = env
|
||||
.command
|
||||
.split_first()
|
||||
@@ -571,10 +581,58 @@ impl UnifiedExecProcessManager {
|
||||
)
|
||||
.await
|
||||
};
|
||||
let spawned =
|
||||
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||
|
||||
let spawned = spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||
spawn_lifecycle.after_spawn();
|
||||
UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await
|
||||
let process = UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await?;
|
||||
Ok(Arc::new(ProcessBackend::Local {
|
||||
process: Arc::new(process),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn open_session_with_exec_server(
|
||||
&self,
|
||||
env: &ExecRequest,
|
||||
process_id: i32,
|
||||
tty: bool,
|
||||
mut executor: &mut Option<Arc<dyn ExecProcess>>,
|
||||
spawn_lifecycle: SpawnLifecycleHandle,
|
||||
) -> Result<Arc<ProcessBackend>, UnifiedExecError> {
|
||||
let executor = executor
|
||||
.take()
|
||||
.ok_or_else(|| UnifiedExecError::create_process("exec-server unavailable".to_string()))?;
|
||||
let response = executor
|
||||
.start(ExecParams {
|
||||
process_id: process_id.to_string(),
|
||||
argv: env.command.clone(),
|
||||
cwd: env.cwd.clone(),
|
||||
env: env.env.clone(),
|
||||
tty,
|
||||
arg0: env.arg0.clone(),
|
||||
})
|
||||
.await
|
||||
.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
|
||||
let _ = response.process_id;
|
||||
|
||||
let process = Arc::new(ProcessBackend::ExecServer {
|
||||
process_id: process_id.to_string(),
|
||||
executor,
|
||||
output_buffer: Arc::new(tokio::sync::Mutex::new(HeadTailBuffer::default())),
|
||||
output_notify: Arc::new(Notify::new()),
|
||||
output_closed: Arc::new(AtomicBool::new(false)),
|
||||
output_closed_notify: Arc::new(Notify::new()),
|
||||
output_drained: Arc::new(Notify::new()),
|
||||
cancellation_token: CancellationToken::new(),
|
||||
exit_code: Arc::new(RwLock::new(None)),
|
||||
has_exited: Arc::new(AtomicBool::new(false)),
|
||||
sandbox_type: env.sandbox,
|
||||
output_seq: Arc::new(AtomicU64::new(0)),
|
||||
});
|
||||
|
||||
spawn_lifecycle.after_spawn();
|
||||
Self::spawn_exec_server_output_watcher(Arc::clone(&process));
|
||||
|
||||
Ok(process)
|
||||
}
|
||||
|
||||
pub(super) async fn open_session_with_sandbox(
|
||||
@@ -582,16 +640,20 @@ impl UnifiedExecProcessManager {
|
||||
request: &ExecCommandRequest,
|
||||
cwd: PathBuf,
|
||||
context: &UnifiedExecContext,
|
||||
) -> Result<(UnifiedExecProcess, Option<DeferredNetworkApproval>), UnifiedExecError> {
|
||||
) -> Result<(Arc<ProcessBackend>, Option<DeferredNetworkApproval>), UnifiedExecError> {
|
||||
let env = apply_unified_exec_env(create_env(
|
||||
&context.turn.shell_environment_policy,
|
||||
Some(context.session.conversation_id),
|
||||
));
|
||||
let mut orchestrator = ToolOrchestrator::new();
|
||||
let mut runtime = UnifiedExecRuntime::new(
|
||||
self,
|
||||
context.turn.tools_config.unified_exec_shell_mode.clone(),
|
||||
);
|
||||
let mut runtime = if context.turn.config.experimental_exec_server_url.is_some() {
|
||||
UnifiedExecRuntime::with_exec_server(
|
||||
self,
|
||||
context.turn.tools_config.unified_exec_shell_mode.clone(),
|
||||
)
|
||||
} else {
|
||||
UnifiedExecRuntime::new(self, context.turn.tools_config.unified_exec_shell_mode.clone())
|
||||
};
|
||||
let exec_approval_requirement = context
|
||||
.session
|
||||
.services
|
||||
@@ -618,6 +680,8 @@ impl UnifiedExecProcessManager {
|
||||
tty: request.tty,
|
||||
sandbox_permissions: request.sandbox_permissions,
|
||||
additional_permissions: request.additional_permissions.clone(),
|
||||
process_id: request.process_id,
|
||||
use_exec_server: context.turn.config.experimental_exec_server_url.is_some(),
|
||||
#[cfg(unix)]
|
||||
additional_permissions_preapproved: request.additional_permissions_preapproved,
|
||||
justification: request.justification.clone(),
|
||||
@@ -775,7 +839,7 @@ impl UnifiedExecProcessManager {
|
||||
let meta: Vec<(i32, Instant, bool)> = store
|
||||
.processes
|
||||
.iter()
|
||||
.map(|(id, entry)| (*id, entry.last_used, entry.process.has_exited()))
|
||||
.map(|(id, entry)| (*id, entry.last_used, entry.backend.has_exited()))
|
||||
.collect();
|
||||
|
||||
if let Some(process_id) = Self::process_id_to_prune_from_meta(&meta) {
|
||||
@@ -828,11 +892,129 @@ impl UnifiedExecProcessManager {
|
||||
|
||||
for entry in entries {
|
||||
Self::unregister_network_approval_for_entry(&entry).await;
|
||||
entry.process.terminate();
|
||||
entry.backend.terminate();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UnifiedExecProcessManager {
|
||||
fn spawn_exec_server_output_watcher(process: Arc<ProcessBackend>) {
|
||||
let (
|
||||
process_id,
|
||||
executor,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
output_drained,
|
||||
output_seq,
|
||||
has_exited,
|
||||
exit_code,
|
||||
cancellation_token,
|
||||
) = {
|
||||
if let ProcessBackend::ExecServer {
|
||||
process_id,
|
||||
executor,
|
||||
output_buffer,
|
||||
output_notify,
|
||||
output_closed,
|
||||
output_closed_notify,
|
||||
output_drained,
|
||||
output_seq,
|
||||
has_exited,
|
||||
exit_code,
|
||||
cancellation_token,
|
||||
..
|
||||
} = process.as_ref()
|
||||
{
|
||||
(
|
||||
process_id.clone(),
|
||||
Arc::clone(executor),
|
||||
Arc::clone(output_buffer),
|
||||
Arc::clone(output_notify),
|
||||
Arc::clone(output_closed),
|
||||
Arc::clone(output_closed_notify),
|
||||
Arc::clone(output_drained),
|
||||
Arc::clone(output_seq),
|
||||
Arc::clone(has_exited),
|
||||
Arc::clone(exit_code),
|
||||
cancellation_token.clone(),
|
||||
)
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut after_seq = None;
|
||||
loop {
|
||||
if cancellation_token.is_cancelled() {
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
output_drained.notify_waiters();
|
||||
break;
|
||||
}
|
||||
|
||||
let response = match executor
|
||||
.read(ReadParams {
|
||||
process_id: process_id.clone(),
|
||||
after_seq,
|
||||
max_bytes: Some(64 * 1024),
|
||||
wait_ms: Some(100),
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(response) => response,
|
||||
Err(_) => {
|
||||
let mut guard = exit_code.write().unwrap_or_else(|err| err.into_inner());
|
||||
if guard.is_none() {
|
||||
*guard = Some(-1);
|
||||
}
|
||||
has_exited.store(true, Ordering::Release);
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
output_drained.notify_waiters();
|
||||
cancellation_token.cancel();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if !response.chunks.is_empty() {
|
||||
let mut current_seq = output_seq.load(Ordering::Acquire);
|
||||
for chunk in response.chunks {
|
||||
if chunk.seq < current_seq {
|
||||
continue;
|
||||
}
|
||||
|
||||
{
|
||||
let mut guard = output_buffer.lock().await;
|
||||
guard.push_chunk(chunk.chunk.into_inner());
|
||||
}
|
||||
current_seq = chunk.seq.saturating_add(1);
|
||||
output_seq.store(current_seq, Ordering::Release);
|
||||
output_notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(code) = response.exit_code {
|
||||
let mut guard = exit_code.write().unwrap_or_else(|err| err.into_inner());
|
||||
*guard = Some(code);
|
||||
}
|
||||
after_seq = Some(response.next_seq);
|
||||
|
||||
if response.exited {
|
||||
has_exited.store(true, Ordering::Release);
|
||||
output_closed.store(true, Ordering::Release);
|
||||
output_closed_notify.notify_waiters();
|
||||
output_drained.notify_waiters();
|
||||
cancellation_token.cancel();
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
enum ProcessStatus {
|
||||
Alive {
|
||||
exit_code: Option<i32>,
|
||||
|
||||
@@ -2978,14 +2978,6 @@ pub enum Product {
|
||||
Atlas,
|
||||
}
|
||||
impl Product {
|
||||
pub fn to_app_platform(self) -> &'static str {
|
||||
match self {
|
||||
Self::Chatgpt => "chat",
|
||||
Self::Codex => "codex",
|
||||
Self::Atlas => "atlas",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_session_source_name(value: &str) -> Option<Self> {
|
||||
let normalized = value.trim().to_ascii_lowercase();
|
||||
match normalized.as_str() {
|
||||
|
||||
Reference in New Issue
Block a user