app-server: Add streaming and tty/pty capabilities to command/exec (#13640)

* Add an ability to stream stdin, stdout, and stderr
* Streaming of stdout and stderr has a configurable cap for total amount
of transmitted bytes (with an ability to disable it)
* Add support for overriding environment variables
* Add an ability to terminate running applications (using
`command/exec/terminate`)
* Add TTY/PTY support, with an ability to resize the terminal (using
`command/exec/resize`)
This commit is contained in:
Ruslan Nigmatullin
2026-03-06 17:30:17 -08:00
committed by GitHub
parent 61098c7f51
commit e9bd8b20a1
43 changed files with 4205 additions and 70 deletions

View File

@@ -1,4 +1,6 @@
use crate::bespoke_event_handling::apply_bespoke_event_handling;
use crate::command_exec::CommandExecManager;
use crate::command_exec::StartCommandExecParams;
use crate::error_code::INPUT_TOO_LARGE_ERROR_CODE;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_PARAMS_ERROR_CODE;
@@ -34,10 +36,12 @@ use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::CollaborationModeListParams;
use codex_app_server_protocol::CollaborationModeListResponse;
use codex_app_server_protocol::CommandExecParams;
use codex_app_server_protocol::CommandExecResizeParams;
use codex_app_server_protocol::CommandExecTerminateParams;
use codex_app_server_protocol::CommandExecWriteParams;
use codex_app_server_protocol::ConversationGitInfo;
use codex_app_server_protocol::ConversationSummary;
use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec;
use codex_app_server_protocol::ExecOneOffCommandResponse;
use codex_app_server_protocol::ExperimentalFeature as ApiExperimentalFeature;
use codex_app_server_protocol::ExperimentalFeatureListParams;
use codex_app_server_protocol::ExperimentalFeatureListResponse;
@@ -193,6 +197,7 @@ use codex_core::connectors::filter_disallowed_connectors;
use codex_core::connectors::merge_plugin_apps;
use codex_core::default_client::set_default_client_residency_requirement;
use codex_core::error::CodexErr;
use codex_core::exec::ExecExpiration;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
use codex_core::features::FEATURES;
@@ -264,6 +269,7 @@ use codex_state::StateRuntime;
use codex_state::ThreadMetadataBuilder;
use codex_state::log_db::LogDbLayer;
use codex_utils_json_to_toml::json_to_toml;
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::OsStr;
@@ -282,6 +288,7 @@ use tokio::sync::Mutex;
use tokio::sync::broadcast;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
use toml::Value as TomlValue;
use tracing::error;
use tracing::info;
@@ -369,6 +376,7 @@ pub(crate) struct CodexMessageProcessor {
pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
thread_state_manager: ThreadStateManager,
thread_watch_manager: ThreadWatchManager,
command_exec_manager: CommandExecManager,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
fuzzy_search_sessions: Arc<Mutex<HashMap<String, FuzzyFileSearchSession>>>,
feedback: CodexFeedback,
@@ -482,6 +490,7 @@ impl CodexMessageProcessor {
pending_thread_unloads: Arc::new(Mutex::new(HashSet::new())),
thread_state_manager: ThreadStateManager::new(),
thread_watch_manager: ThreadWatchManager::new_with_outgoing(outgoing),
command_exec_manager: CommandExecManager::default(),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
fuzzy_search_sessions: Arc::new(Mutex::new(HashMap::new())),
feedback,
@@ -831,6 +840,18 @@ impl CodexMessageProcessor {
self.exec_one_off_command(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::CommandExecWrite { request_id, params } => {
self.command_exec_write(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::CommandExecResize { request_id, params } => {
self.command_exec_resize(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::CommandExecTerminate { request_id, params } => {
self.command_exec_terminate(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ConfigRead { .. }
| ClientRequest::ConfigValueWrite { .. }
| ClientRequest::ConfigBatchWrite { .. } => {
@@ -1503,11 +1524,84 @@ impl CodexMessageProcessor {
return;
}
let cwd = params.cwd.unwrap_or_else(|| self.config.cwd.clone());
let env = create_env(&self.config.permissions.shell_environment_policy, None);
let timeout_ms = params
.timeout_ms
.and_then(|timeout_ms| u64::try_from(timeout_ms).ok());
let CommandExecParams {
command,
process_id,
tty,
stream_stdin,
stream_stdout_stderr,
output_bytes_cap,
disable_output_cap,
disable_timeout,
timeout_ms,
cwd,
env: env_overrides,
size,
sandbox_policy,
} = params;
if size.is_some() && !tty {
let error = JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
message: "command/exec size requires tty: true".to_string(),
data: None,
};
self.outgoing.send_error(request, error).await;
return;
}
if disable_output_cap && output_bytes_cap.is_some() {
let error = JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
message: "command/exec cannot set both outputBytesCap and disableOutputCap"
.to_string(),
data: None,
};
self.outgoing.send_error(request, error).await;
return;
}
if disable_timeout && timeout_ms.is_some() {
let error = JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
message: "command/exec cannot set both timeoutMs and disableTimeout".to_string(),
data: None,
};
self.outgoing.send_error(request, error).await;
return;
}
let cwd = cwd.unwrap_or_else(|| self.config.cwd.clone());
let mut env = create_env(&self.config.permissions.shell_environment_policy, None);
if let Some(env_overrides) = env_overrides {
for (key, value) in env_overrides {
match value {
Some(value) => {
env.insert(key, value);
}
None => {
env.remove(&key);
}
}
}
}
let timeout_ms = match timeout_ms {
Some(timeout_ms) => match u64::try_from(timeout_ms) {
Ok(timeout_ms) => Some(timeout_ms),
Err(_) => {
let error = JSONRPCErrorError {
code: INVALID_PARAMS_ERROR_CODE,
message: format!(
"command/exec timeoutMs must be non-negative, got {timeout_ms}"
),
data: None,
};
self.outgoing.send_error(request, error).await;
return;
}
},
None => None,
};
let managed_network_requirements_enabled =
self.config.managed_network_requirements_enabled();
let started_network_proxy = match self.config.permissions.network.as_ref() {
@@ -1535,10 +1629,23 @@ impl CodexMessageProcessor {
None => None,
};
let windows_sandbox_level = WindowsSandboxLevel::from_config(&self.config);
let output_bytes_cap = if disable_output_cap {
None
} else {
Some(output_bytes_cap.unwrap_or(DEFAULT_OUTPUT_BYTES_CAP))
};
let expiration = if disable_timeout {
ExecExpiration::Cancellation(CancellationToken::new())
} else {
match timeout_ms {
Some(timeout_ms) => timeout_ms.into(),
None => ExecExpiration::DefaultTimeout,
}
};
let exec_params = ExecParams {
command: params.command,
command,
cwd,
expiration: timeout_ms.into(),
expiration,
env,
network: started_network_proxy
.as_ref()
@@ -1549,7 +1656,7 @@ impl CodexMessageProcessor {
arg0: None,
};
let requested_policy = params.sandbox_policy.map(|policy| policy.to_core());
let requested_policy = sandbox_policy.map(|policy| policy.to_core());
let effective_policy = match requested_policy {
Some(policy) => match self.config.permissions.sandbox_policy.can_set(&policy) {
Ok(()) => policy,
@@ -1568,41 +1675,100 @@ impl CodexMessageProcessor {
let codex_linux_sandbox_exe = self.arg0_paths.codex_linux_sandbox_exe.clone();
let outgoing = self.outgoing.clone();
let request_for_task = request;
let request_for_task = request.clone();
let sandbox_cwd = self.config.cwd.clone();
let started_network_proxy_for_task = started_network_proxy;
let use_linux_sandbox_bwrap = self.config.features.enabled(Feature::UseLinuxSandboxBwrap);
let size = match size.map(crate::command_exec::terminal_size_from_protocol) {
Some(Ok(size)) => Some(size),
Some(Err(error)) => {
self.outgoing.send_error(request, error).await;
return;
}
None => None,
};
tokio::spawn(async move {
let _started_network_proxy = started_network_proxy_for_task;
match codex_core::exec::process_exec_tool_call(
exec_params,
&effective_policy,
sandbox_cwd.as_path(),
&codex_linux_sandbox_exe,
use_linux_sandbox_bwrap,
None,
)
.await
{
Ok(output) => {
let response = ExecOneOffCommandResponse {
exit_code: output.exit_code,
stdout: output.stdout.text,
stderr: output.stderr.text,
};
outgoing.send_response(request_for_task, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("exec failed: {err}"),
data: None,
};
outgoing.send_error(request_for_task, error).await;
match codex_core::exec::build_exec_request(
exec_params,
&effective_policy,
sandbox_cwd.as_path(),
&codex_linux_sandbox_exe,
use_linux_sandbox_bwrap,
) {
Ok(exec_request) => {
if let Err(error) = self
.command_exec_manager
.start(StartCommandExecParams {
outgoing,
request_id: request_for_task,
process_id,
exec_request,
started_network_proxy: started_network_proxy_for_task,
tty,
stream_stdin,
stream_stdout_stderr,
output_bytes_cap,
size,
})
.await
{
self.outgoing.send_error(request, error).await;
}
}
});
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("exec failed: {err}"),
data: None,
};
self.outgoing.send_error(request, error).await;
}
}
}
async fn command_exec_write(
&self,
request_id: ConnectionRequestId,
params: CommandExecWriteParams,
) {
match self
.command_exec_manager
.write(request_id.clone(), params)
.await
{
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn command_exec_resize(
&self,
request_id: ConnectionRequestId,
params: CommandExecResizeParams,
) {
match self
.command_exec_manager
.resize(request_id.clone(), params)
.await
{
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn command_exec_terminate(
&self,
request_id: ConnectionRequestId,
params: CommandExecTerminateParams,
) {
match self
.command_exec_manager
.terminate(request_id.clone(), params)
.await
{
Ok(response) => self.outgoing.send_response(request_id, response).await,
Err(error) => self.outgoing.send_error(request_id, error).await,
}
}
async fn thread_start(&self, request_id: ConnectionRequestId, params: ThreadStartParams) {
@@ -2872,6 +3038,9 @@ impl CodexMessageProcessor {
}
pub(crate) async fn connection_closed(&mut self, connection_id: ConnectionId) {
self.command_exec_manager
.connection_closed(connection_id)
.await;
self.thread_state_manager
.remove_connection(connection_id)
.await;