mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Compare commits
3 Commits
shareable-
...
tibo/unifi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3632c6a3ef | ||
|
|
74d670029e | ||
|
|
4c6116b1cf |
@@ -206,7 +206,7 @@ impl Session {
|
||||
async fn close_unified_exec_sessions(&self) {
|
||||
self.services
|
||||
.unified_exec_manager
|
||||
.terminate_all_sessions()
|
||||
.terminate_turn_sessions()
|
||||
.await;
|
||||
}
|
||||
|
||||
|
||||
@@ -36,6 +36,8 @@ struct ExecCommandArgs {
|
||||
shell: Option<String>,
|
||||
#[serde(default = "default_login")]
|
||||
login: bool,
|
||||
#[serde(default)]
|
||||
long_running: bool,
|
||||
#[serde(default = "default_exec_yield_time_ms")]
|
||||
yield_time_ms: u64,
|
||||
#[serde(default)]
|
||||
@@ -135,6 +137,7 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
max_output_tokens,
|
||||
sandbox_permissions,
|
||||
justification,
|
||||
long_running,
|
||||
..
|
||||
} = args;
|
||||
|
||||
@@ -196,6 +199,7 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
workdir,
|
||||
sandbox_permissions,
|
||||
justification,
|
||||
long_running,
|
||||
},
|
||||
&context,
|
||||
)
|
||||
|
||||
@@ -196,6 +196,15 @@ fn create_exec_command_tool() -> ToolSpec {
|
||||
),
|
||||
},
|
||||
);
|
||||
properties.insert(
|
||||
"long_running".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some(
|
||||
"Whether to keep the process running across turns. Max 8 long-running processes; long-running processes do not stream output deltas or end events."
|
||||
.to_string(),
|
||||
),
|
||||
},
|
||||
);
|
||||
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
name: "exec_command".to_string(),
|
||||
@@ -1493,6 +1502,28 @@ mod tests {
|
||||
assert_contains_tool_names(&tools, &subset);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_exec_command_schema_includes_long_running() {
|
||||
let config = test_config();
|
||||
let model_family = ModelsManager::construct_model_family_offline("o3", &config);
|
||||
let mut features = Features::with_defaults();
|
||||
features.enable(Feature::UnifiedExec);
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
model_family: &model_family,
|
||||
features: &features,
|
||||
});
|
||||
let (tools, _) = build_specs(&tools_config, None).build();
|
||||
|
||||
let tool = find_tool(&tools, "exec_command");
|
||||
let ToolSpec::Function(tool) = &tool.spec else {
|
||||
panic!("expected exec_command to be a function tool");
|
||||
};
|
||||
let JsonSchema::Object { properties, .. } = &tool.parameters else {
|
||||
panic!("expected exec_command schema parameters to be an object");
|
||||
};
|
||||
assert!(properties.contains_key("long_running"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[ignore]
|
||||
fn test_parallel_support_flags() {
|
||||
|
||||
@@ -8,6 +8,10 @@ pub(crate) enum UnifiedExecError {
|
||||
// Called "session" in the model's training.
|
||||
#[error("Unknown session id {process_id}")]
|
||||
UnknownSessionId { process_id: String },
|
||||
#[error(
|
||||
"Too many long-running unified exec sessions (max {max}). Close an existing one before starting another."
|
||||
)]
|
||||
TooManyLongRunningSessions { max: usize },
|
||||
#[error("failed to write to stdin")]
|
||||
WriteToStdin,
|
||||
#[error("missing command line for unified exec request")]
|
||||
|
||||
@@ -49,6 +49,7 @@ pub(crate) const DEFAULT_MAX_OUTPUT_TOKENS: usize = 10_000;
|
||||
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB
|
||||
pub(crate) const UNIFIED_EXEC_OUTPUT_MAX_TOKENS: usize = UNIFIED_EXEC_OUTPUT_MAX_BYTES / 4;
|
||||
pub(crate) const MAX_UNIFIED_EXEC_SESSIONS: usize = 64;
|
||||
pub(crate) const MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS: usize = 8;
|
||||
|
||||
// Send a warning message to the models when it reaches this number of sessions.
|
||||
pub(crate) const WARNING_UNIFIED_EXEC_SESSIONS: usize = 60;
|
||||
@@ -96,6 +97,7 @@ pub(crate) struct ExecCommandRequest {
|
||||
pub workdir: Option<PathBuf>,
|
||||
pub sandbox_permissions: SandboxPermissions,
|
||||
pub justification: Option<String>,
|
||||
pub long_running: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -153,6 +155,7 @@ struct SessionEntry {
|
||||
process_id: String,
|
||||
command: Vec<String>,
|
||||
last_used: tokio::time::Instant,
|
||||
long_running: bool,
|
||||
}
|
||||
|
||||
pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {
|
||||
@@ -220,6 +223,7 @@ mod tests {
|
||||
workdir: None,
|
||||
sandbox_permissions: SandboxPermissions::UseDefault,
|
||||
justification: None,
|
||||
long_running: false,
|
||||
},
|
||||
&context,
|
||||
)
|
||||
|
||||
@@ -28,6 +28,7 @@ use crate::truncate::formatted_truncate_text;
|
||||
|
||||
use super::CommandTranscript;
|
||||
use super::ExecCommandRequest;
|
||||
use super::MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS;
|
||||
use super::MAX_UNIFIED_EXEC_SESSIONS;
|
||||
use super::SessionEntry;
|
||||
use super::SessionStore;
|
||||
@@ -74,6 +75,7 @@ struct PreparedSessionHandles {
|
||||
turn_ref: Arc<TurnContext>,
|
||||
command: Vec<String>,
|
||||
process_id: String,
|
||||
long_running: bool,
|
||||
}
|
||||
|
||||
impl UnifiedExecSessionManager {
|
||||
@@ -140,7 +142,9 @@ impl UnifiedExecSessionManager {
|
||||
};
|
||||
|
||||
let transcript = Arc::new(tokio::sync::Mutex::new(CommandTranscript::default()));
|
||||
start_streaming_output(&session, context, Arc::clone(&transcript));
|
||||
if !request.long_running {
|
||||
start_streaming_output(&session, context, Arc::clone(&transcript));
|
||||
}
|
||||
|
||||
let max_tokens = resolve_max_tokens(request.max_output_tokens);
|
||||
let yield_time_ms = clamp_yield_time(request.yield_time_ms);
|
||||
@@ -175,23 +179,35 @@ impl UnifiedExecSessionManager {
|
||||
// same helper as the background watcher, so all end events share
|
||||
// one implementation.
|
||||
let exit = exit_code.unwrap_or(-1);
|
||||
emit_exec_end_for_unified_exec(
|
||||
Arc::clone(&context.session),
|
||||
Arc::clone(&context.turn),
|
||||
context.call_id.clone(),
|
||||
request.command.clone(),
|
||||
cwd,
|
||||
Some(process_id),
|
||||
Arc::clone(&transcript),
|
||||
output.clone(),
|
||||
exit,
|
||||
wall_time,
|
||||
)
|
||||
.await;
|
||||
if !request.long_running {
|
||||
emit_exec_end_for_unified_exec(
|
||||
Arc::clone(&context.session),
|
||||
Arc::clone(&context.turn),
|
||||
context.call_id.clone(),
|
||||
request.command.clone(),
|
||||
cwd,
|
||||
Some(process_id),
|
||||
Arc::clone(&transcript),
|
||||
output.clone(),
|
||||
exit,
|
||||
wall_time,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
self.release_process_id(&request.process_id).await;
|
||||
session.check_for_sandbox_denial_with_text(&text).await?;
|
||||
} else {
|
||||
if request.long_running
|
||||
&& self.active_long_running_session_count().await
|
||||
>= MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS
|
||||
{
|
||||
session.terminate();
|
||||
self.release_process_id(&request.process_id).await;
|
||||
return Err(UnifiedExecError::TooManyLongRunningSessions {
|
||||
max: MAX_LONG_RUNNING_UNIFIED_EXEC_SESSIONS,
|
||||
});
|
||||
}
|
||||
// Long‑lived command: persist the session so write_stdin can reuse
|
||||
// it, and register a background watcher that will emit
|
||||
// ExecCommandEnd when the PTY eventually exits (even if no further
|
||||
@@ -204,6 +220,7 @@ impl UnifiedExecSessionManager {
|
||||
start,
|
||||
process_id,
|
||||
Arc::clone(&transcript),
|
||||
request.long_running,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -245,6 +262,7 @@ impl UnifiedExecSessionManager {
|
||||
turn_ref,
|
||||
command: session_command,
|
||||
process_id,
|
||||
long_running,
|
||||
..
|
||||
} = self.prepare_session_handles(process_id.as_str()).await?;
|
||||
|
||||
@@ -307,7 +325,7 @@ impl UnifiedExecSessionManager {
|
||||
session_command: Some(session_command.clone()),
|
||||
};
|
||||
|
||||
if response.process_id.is_some() {
|
||||
if response.process_id.is_some() && !long_running {
|
||||
Self::emit_waiting_status(&session_ref, &turn_ref, &session_command).await;
|
||||
}
|
||||
|
||||
@@ -368,6 +386,7 @@ impl UnifiedExecSessionManager {
|
||||
turn_ref: Arc::clone(&entry.turn_ref),
|
||||
command: entry.command.clone(),
|
||||
process_id: entry.process_id.clone(),
|
||||
long_running: entry.long_running,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -391,6 +410,7 @@ impl UnifiedExecSessionManager {
|
||||
started_at: Instant,
|
||||
process_id: String,
|
||||
transcript: Arc<tokio::sync::Mutex<CommandTranscript>>,
|
||||
long_running: bool,
|
||||
) {
|
||||
let entry = SessionEntry {
|
||||
session: Arc::clone(&session),
|
||||
@@ -400,6 +420,7 @@ impl UnifiedExecSessionManager {
|
||||
process_id: process_id.clone(),
|
||||
command: command.to_vec(),
|
||||
last_used: started_at,
|
||||
long_running,
|
||||
};
|
||||
let number_sessions = {
|
||||
let mut store = self.session_store.lock().await;
|
||||
@@ -418,17 +439,19 @@ impl UnifiedExecSessionManager {
|
||||
.await;
|
||||
};
|
||||
|
||||
spawn_exit_watcher(
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&context.session),
|
||||
Arc::clone(&context.turn),
|
||||
context.call_id.clone(),
|
||||
command.to_vec(),
|
||||
cwd,
|
||||
process_id,
|
||||
transcript,
|
||||
started_at,
|
||||
);
|
||||
if !long_running {
|
||||
spawn_exit_watcher(
|
||||
Arc::clone(&session),
|
||||
Arc::clone(&context.session),
|
||||
Arc::clone(&context.turn),
|
||||
context.call_id.clone(),
|
||||
command.to_vec(),
|
||||
cwd,
|
||||
process_id,
|
||||
transcript,
|
||||
started_at,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn emit_waiting_status(
|
||||
@@ -583,13 +606,19 @@ impl UnifiedExecSessionManager {
|
||||
}
|
||||
|
||||
fn prune_sessions_if_needed(store: &mut SessionStore) -> bool {
|
||||
if store.sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
|
||||
let non_long_running = store
|
||||
.sessions
|
||||
.values()
|
||||
.filter(|entry| !entry.long_running)
|
||||
.count();
|
||||
if non_long_running < MAX_UNIFIED_EXEC_SESSIONS {
|
||||
return false;
|
||||
}
|
||||
|
||||
let meta: Vec<(String, Instant, bool)> = store
|
||||
.sessions
|
||||
.iter()
|
||||
.filter(|(_, entry)| !entry.long_running)
|
||||
.map(|(id, entry)| (id.clone(), entry.last_used, entry.session.has_exited()))
|
||||
.collect();
|
||||
|
||||
@@ -645,6 +674,38 @@ impl UnifiedExecSessionManager {
|
||||
entry.session.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn terminate_turn_sessions(&self) {
|
||||
let entries: Vec<SessionEntry> = {
|
||||
let mut sessions = self.session_store.lock().await;
|
||||
let mut keep = HashMap::new();
|
||||
let mut to_terminate = Vec::new();
|
||||
let drained: Vec<(String, SessionEntry)> = sessions.sessions.drain().collect();
|
||||
for (process_id, entry) in drained {
|
||||
if entry.long_running {
|
||||
keep.insert(process_id, entry);
|
||||
} else {
|
||||
sessions.reserved_sessions_id.remove(&process_id);
|
||||
to_terminate.push(entry);
|
||||
}
|
||||
}
|
||||
sessions.sessions = keep;
|
||||
to_terminate
|
||||
};
|
||||
|
||||
for entry in entries {
|
||||
entry.session.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
async fn active_long_running_session_count(&self) -> usize {
|
||||
let store = self.session_store.lock().await;
|
||||
store
|
||||
.sessions
|
||||
.values()
|
||||
.filter(|entry| entry.long_running && !entry.session.has_exited())
|
||||
.count()
|
||||
}
|
||||
}
|
||||
|
||||
enum SessionStatus {
|
||||
|
||||
Reference in New Issue
Block a user