Compare commits

...

3 Commits

Author SHA1 Message Date
Thibault Sottiaux
3632c6a3ef Revert "execpolicy: suppress proposals for multiline commands"
This reverts commit 74d670029e.
2026-01-05 21:54:08 -08:00
Thibault Sottiaux
74d670029e execpolicy: suppress proposals for multiline commands 2026-01-05 17:44:36 -08:00
Thibault Sottiaux
4c6116b1cf unified exec: add long-running sessions 2026-01-05 03:19:39 -08:00
6 changed files with 132 additions and 28 deletions

View File

@@ -206,7 +206,7 @@ impl Session {
async fn close_unified_exec_sessions(&self) {
self.services
.unified_exec_manager
.terminate_all_sessions()
.terminate_turn_sessions()
.await;
}

View File

@@ -36,6 +36,8 @@ struct ExecCommandArgs {
shell: Option<String>,
#[serde(default = "default_login")]
login: bool,
#[serde(default)]
long_running: bool,
#[serde(default = "default_exec_yield_time_ms")]
yield_time_ms: u64,
#[serde(default)]
@@ -135,6 +137,7 @@ impl ToolHandler for UnifiedExecHandler {
max_output_tokens,
sandbox_permissions,
justification,
long_running,
..
} = args;
@@ -196,6 +199,7 @@ impl ToolHandler for UnifiedExecHandler {
workdir,
sandbox_permissions,
justification,
long_running,
},
&context,
)

View File

@@ -196,6 +196,15 @@ fn create_exec_command_tool() -> ToolSpec {
),
},
);
properties.insert(
"long_running".to_string(),
JsonSchema::Boolean {
description: Some(
"Whether to keep the process running across turns. Max 8 long-running processes; long-running processes do not stream output deltas or end events."
.to_string(),
),
},
);
ToolSpec::Function(ResponsesApiTool {
name: "exec_command".to_string(),
@@ -1493,6 +1502,28 @@ mod tests {
assert_contains_tool_names(&tools, &subset);
}
/// Checks that, with the `UnifiedExec` feature enabled, the `exec_command`
/// function tool exposes a `long_running` property in its parameter schema.
#[test]
fn test_exec_command_schema_includes_long_running() {
    let config = test_config();
    let model_family = ModelsManager::construct_model_family_offline("o3", &config);

    // Start from the default feature set and switch unified exec on.
    let features = {
        let mut enabled = Features::with_defaults();
        enabled.enable(Feature::UnifiedExec);
        enabled
    };

    let params = ToolsConfigParams {
        model_family: &model_family,
        features: &features,
    };
    let tools_config = ToolsConfig::new(&params);
    let (tools, _) = build_specs(&tools_config, None).build();

    let exec_command = find_tool(&tools, "exec_command");
    let function = match &exec_command.spec {
        ToolSpec::Function(function) => function,
        _ => panic!("expected exec_command to be a function tool"),
    };
    let properties = match &function.parameters {
        JsonSchema::Object { properties, .. } => properties,
        _ => panic!("expected exec_command schema parameters to be an object"),
    };

    assert!(properties.contains_key("long_running"));
}
#[test]
#[ignore]
fn test_parallel_support_flags() {

View File

@@ -8,6 +8,10 @@ pub(crate) enum UnifiedExecError {
// Called "session" in the model's training.
#[error("Unknown session id {process_id}")]
UnknownSessionId { process_id: String },
#[error(
"Too many long-running unified exec sessions (max {max}). Close an existing one before starting another."
)]
TooManyLongRunningSessions { max: usize },
#[error("failed to write to stdin")]
WriteToStdin,
#[error("missing command line for unified exec request")]

View File

@@ -49,6 +49,7 @@ pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000;
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_TOKENS: usize = UNIFIED_EXEC_OUTPUT_MAX_BYTES / 4;
pub(crate) const MAX_UNIFIED_EXEC_SESSIONS: usize = 64;
pub(crate) const MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS: usize = 8;
// Send a warning message to the model when the number of sessions reaches this value.
pub(crate) const WARNING_UNIFIED_EXEC_SESSIONS: usize = 60;
@@ -96,6 +97,7 @@ pub(crate) struct ExecCommandRequest {
pub workdir: Option<PathBuf>,
pub sandbox_permissions: SandboxPermissions,
pub justification: Option<String>,
pub long_running: bool,
}
#[derive(Debug)]
@@ -153,6 +155,7 @@ struct SessionEntry {
process_id: String,
command: Vec<String>,
last_used: tokio::time::Instant,
long_running: bool,
}
pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {
@@ -220,6 +223,7 @@ mod tests {
workdir: None,
sandbox_permissions: SandboxPermissions::UseDefault,
justification: None,
long_running: false,
},
&context,
)

View File

@@ -28,6 +28,7 @@ use crate::truncate::formatted_truncate_text;
use super::CommandTranscript;
use super::ExecCommandRequest;
use super::MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS;
use super::MAX_UNIFIED_EXEC_SESSIONS;
use super::SessionEntry;
use super::SessionStore;
@@ -74,6 +75,7 @@ struct PreparedSessionHandles {
turn_ref: Arc<TurnContext>,
command: Vec<String>,
process_id: String,
long_running: bool,
}
impl UnifiedExecSessionManager {
@@ -140,7 +142,9 @@ impl UnifiedExecSessionManager {
};
let transcript = Arc::new(tokio::sync::Mutex::new(CommandTranscript::default()));
start_streaming_output(&session, context, Arc::clone(&transcript));
if !request.long_running {
start_streaming_output(&session, context, Arc::clone(&transcript));
}
let max_tokens = resolve_max_tokens(request.max_output_tokens);
let yield_time_ms = clamp_yield_time(request.yield_time_ms);
@@ -175,23 +179,35 @@ impl UnifiedExecSessionManager {
// same helper as the background watcher, so all end events share
// one implementation.
let exit = exit_code.unwrap_or(-1);
emit_exec_end_for_unified_exec(
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
request.command.clone(),
cwd,
Some(process_id),
Arc::clone(&transcript),
output.clone(),
exit,
wall_time,
)
.await;
if !request.long_running {
emit_exec_end_for_unified_exec(
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
request.command.clone(),
cwd,
Some(process_id),
Arc::clone(&transcript),
output.clone(),
exit,
wall_time,
)
.await;
}
self.release_process_id(&request.process_id).await;
session.check_for_sandbox_denial_with_text(&text).await?;
} else {
if request.long_running
&& self.active_long_running_session_count().await
>= MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS
{
session.terminate();
self.release_process_id(&request.process_id).await;
return Err(UnifiedExecError::TooManyLongRunningSessions {
max: MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS,
});
}
// Long-lived command: persist the session so write_stdin can reuse
// it, and register a background watcher that will emit
// ExecCommandEnd when the PTY eventually exits (even if no further
@@ -204,6 +220,7 @@ impl UnifiedExecSessionManager {
start,
process_id,
Arc::clone(&transcript),
request.long_running,
)
.await;
@@ -245,6 +262,7 @@ impl UnifiedExecSessionManager {
turn_ref,
command: session_command,
process_id,
long_running,
..
} = self.prepare_session_handles(process_id.as_str()).await?;
@@ -307,7 +325,7 @@ impl UnifiedExecSessionManager {
session_command: Some(session_command.clone()),
};
if response.process_id.is_some() {
if response.process_id.is_some() && !long_running {
Self::emit_waiting_status(&session_ref, &turn_ref, &session_command).await;
}
@@ -368,6 +386,7 @@ impl UnifiedExecSessionManager {
turn_ref: Arc::clone(&entry.turn_ref),
command: entry.command.clone(),
process_id: entry.process_id.clone(),
long_running: entry.long_running,
})
}
@@ -391,6 +410,7 @@ impl UnifiedExecSessionManager {
started_at: Instant,
process_id: String,
transcript: Arc<tokio::sync::Mutex<CommandTranscript>>,
long_running: bool,
) {
let entry = SessionEntry {
session: Arc::clone(&session),
@@ -400,6 +420,7 @@ impl UnifiedExecSessionManager {
process_id: process_id.clone(),
command: command.to_vec(),
last_used: started_at,
long_running,
};
let number_sessions = {
let mut store = self.session_store.lock().await;
@@ -418,17 +439,19 @@ impl UnifiedExecSessionManager {
.await;
};
spawn_exit_watcher(
Arc::clone(&session),
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
command.to_vec(),
cwd,
process_id,
transcript,
started_at,
);
if !long_running {
spawn_exit_watcher(
Arc::clone(&session),
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
command.to_vec(),
cwd,
process_id,
transcript,
started_at,
);
}
}
async fn emit_waiting_status(
@@ -583,13 +606,19 @@ impl UnifiedExecSessionManager {
}
fn prune_sessions_if_needed(store: &mut SessionStore) -> bool {
if store.sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
let non_long_running = store
.sessions
.values()
.filter(|entry| !entry.long_running)
.count();
if non_long_running < MAX_UNIFIED_EXEC_SESSIONS {
return false;
}
let meta: Vec<(String, Instant, bool)> = store
.sessions
.iter()
.filter(|(_, entry)| !entry.long_running)
.map(|(id, entry)| (id.clone(), entry.last_used, entry.session.has_exited()))
.collect();
@@ -645,6 +674,38 @@ impl UnifiedExecSessionManager {
entry.session.terminate();
}
}
pub(crate) async fn terminate_turn_sessions(&self) {
let entries: Vec<SessionEntry> = {
let mut sessions = self.session_store.lock().await;
let mut keep = HashMap::new();
let mut to_terminate = Vec::new();
let drained: Vec<(String, SessionEntry)> = sessions.sessions.drain().collect();
for (process_id, entry) in drained {
if entry.long_running {
keep.insert(process_id, entry);
} else {
sessions.reserved_sessions_id.remove(&process_id);
to_terminate.push(entry);
}
}
sessions.sessions = keep;
to_terminate
};
for entry in entries {
entry.session.terminate();
}
}
/// Returns how many stored sessions are marked `long_running` and have not
/// yet exited, counted while holding the session store lock.
async fn active_long_running_session_count(&self) -> usize {
    let guard = self.session_store.lock().await;
    guard
        .sessions
        .values()
        // Only long-running entries are asked whether they have exited.
        .filter(|entry| entry.long_running)
        .filter(|entry| !entry.session.has_exited())
        .count()
}
}
enum SessionStatus {