mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Compare commits
4 Commits
ccunningha
...
jif/implem
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e7dcea2011 | ||
|
|
a789dec845 | ||
|
|
62dbb4322b | ||
|
|
26049c5566 |
@@ -18,6 +18,9 @@ use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::truncation;
|
||||
use crate::skills::SkillsManager;
|
||||
use crate::unified_exec::DetachedProcessSummary;
|
||||
use crate::unified_exec::DetachedUnifiedExecStore;
|
||||
use crate::unified_exec::ReattachSummary;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::CollaborationModeMask;
|
||||
use codex_protocol::openai_models::ModelPreset;
|
||||
@@ -127,6 +130,7 @@ pub struct ThreadManager {
|
||||
/// function to require an `Arc<&Self>`.
|
||||
pub(crate) struct ThreadManagerState {
|
||||
threads: Arc<RwLock<HashMap<ThreadId, Arc<CodexThread>>>>,
|
||||
detached_unified_exec_store: DetachedUnifiedExecStore,
|
||||
thread_created_tx: broadcast::Sender<ThreadId>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: Arc<ModelsManager>,
|
||||
@@ -150,6 +154,7 @@ impl ThreadManager {
|
||||
Self {
|
||||
state: Arc::new(ThreadManagerState {
|
||||
threads: Arc::new(RwLock::new(HashMap::new())),
|
||||
detached_unified_exec_store: DetachedUnifiedExecStore::default(),
|
||||
thread_created_tx,
|
||||
models_manager: Arc::new(ModelsManager::new(
|
||||
codex_home,
|
||||
@@ -201,6 +206,7 @@ impl ThreadManager {
|
||||
Self {
|
||||
state: Arc::new(ThreadManagerState {
|
||||
threads: Arc::new(RwLock::new(HashMap::new())),
|
||||
detached_unified_exec_store: DetachedUnifiedExecStore::default(),
|
||||
thread_created_tx,
|
||||
models_manager: Arc::new(ModelsManager::with_provider_for_tests(
|
||||
codex_home,
|
||||
@@ -347,6 +353,7 @@ impl ThreadManager {
|
||||
thread.submit(Op::Shutdown).await?;
|
||||
}
|
||||
self.state.threads.write().await.clear();
|
||||
self.state.clean_all_detached_processes().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -379,6 +386,36 @@ impl ThreadManager {
|
||||
AgentControl::new(Arc::downgrade(&self.state))
|
||||
}
|
||||
|
||||
/// Detach unified-exec processes from an attached thread so they can
|
||||
/// survive thread shutdown and be reattached on a later resume.
|
||||
pub async fn detach_thread_processes(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> CodexResult<DetachedProcessSummary> {
|
||||
match self.state.detach_thread_processes(thread_id).await {
|
||||
Ok(summary) => Ok(summary),
|
||||
Err(CodexErr::ThreadNotFound(_)) => Ok(DetachedProcessSummary::default()),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
/// Reattach previously detached unified-exec processes to the currently
|
||||
/// loaded thread session.
|
||||
pub async fn reattach_thread_processes(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> CodexResult<ReattachSummary> {
|
||||
self.state.reattach_thread_processes(thread_id).await
|
||||
}
|
||||
|
||||
/// Terminate and remove any detached unified-exec processes for a thread.
|
||||
pub async fn clean_detached_thread_processes(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> DetachedProcessSummary {
|
||||
self.state.clean_detached_thread_processes(thread_id).await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn captured_ops(&self) -> Vec<(ThreadId, Op)> {
|
||||
self.state
|
||||
@@ -415,6 +452,49 @@ impl ThreadManagerState {
|
||||
self.threads.write().await.remove(thread_id)
|
||||
}
|
||||
|
||||
pub(crate) async fn detach_thread_processes(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> CodexResult<DetachedProcessSummary> {
|
||||
let thread = self.get_thread(thread_id).await?;
|
||||
let summary = self
|
||||
.detached_unified_exec_store
|
||||
.detach_from_manager(
|
||||
thread_id,
|
||||
&thread.codex.session.services.unified_exec_manager,
|
||||
)
|
||||
.await;
|
||||
Ok(summary)
|
||||
}
|
||||
|
||||
pub(crate) async fn reattach_thread_processes(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> CodexResult<ReattachSummary> {
|
||||
let thread = self.get_thread(thread_id).await?;
|
||||
let summary = self
|
||||
.detached_unified_exec_store
|
||||
.reattach_to_manager(
|
||||
thread_id,
|
||||
&thread.codex.session.services.unified_exec_manager,
|
||||
)
|
||||
.await;
|
||||
Ok(summary)
|
||||
}
|
||||
|
||||
pub(crate) async fn clean_detached_thread_processes(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
) -> DetachedProcessSummary {
|
||||
self.detached_unified_exec_store
|
||||
.clean_thread(thread_id)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn clean_all_detached_processes(&self) {
|
||||
self.detached_unified_exec_store.clean_all().await;
|
||||
}
|
||||
|
||||
/// Spawn a new thread with no history using a provided config.
|
||||
pub(crate) async fn spawn_new_thread(
|
||||
&self,
|
||||
@@ -571,6 +651,7 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::codex::make_session_and_context;
|
||||
use crate::model_provider_info::built_in_model_providers;
|
||||
use assert_matches::assert_matches;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||
@@ -681,4 +762,19 @@ mod tests {
|
||||
serde_json::to_value(&expected).unwrap()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn detach_thread_processes_is_noop_for_missing_thread() {
|
||||
let manager = ThreadManager::with_models_provider_for_tests(
|
||||
CodexAuth::from_api_key("dummy"),
|
||||
built_in_model_providers()["openai"].clone(),
|
||||
);
|
||||
|
||||
let summary = manager
|
||||
.detach_thread_processes(ThreadId::default())
|
||||
.await
|
||||
.expect("missing thread detach should not fail");
|
||||
|
||||
assert_eq!(summary, DetachedProcessSummary::default());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::Instant;
|
||||
use tokio::time::Sleep;
|
||||
@@ -40,7 +41,7 @@ pub(crate) fn start_streaming_output(
|
||||
process: &UnifiedExecProcess,
|
||||
context: &UnifiedExecContext,
|
||||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||
) {
|
||||
) -> JoinHandle<()> {
|
||||
let mut receiver = process.output_receiver();
|
||||
let output_drained = process.output_drained_notify();
|
||||
let exit_token = process.cancellation_token();
|
||||
@@ -97,7 +98,7 @@ pub(crate) fn start_streaming_output(
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
/// Spawn a background watcher that waits for the PTY to exit and then emits a
|
||||
@@ -113,7 +114,7 @@ pub(crate) fn spawn_exit_watcher(
|
||||
process_id: String,
|
||||
transcript: Arc<Mutex<HeadTailBuffer>>,
|
||||
started_at: Instant,
|
||||
) {
|
||||
) -> JoinHandle<()> {
|
||||
let exit_token = process.cancellation_token();
|
||||
let output_drained = process.output_drained_notify();
|
||||
|
||||
@@ -136,7 +137,7 @@ pub(crate) fn spawn_exit_watcher(
|
||||
duration,
|
||||
)
|
||||
.await;
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn spawn_network_denial_watcher(
|
||||
@@ -144,7 +145,7 @@ pub(crate) fn spawn_network_denial_watcher(
|
||||
session: Arc<Session>,
|
||||
process_id: String,
|
||||
network_attempt_id: String,
|
||||
) {
|
||||
) -> JoinHandle<()> {
|
||||
let exit_token = process.cancellation_token();
|
||||
tokio::spawn(async move {
|
||||
let mut poll = tokio::time::interval(Duration::from_millis(100));
|
||||
@@ -173,7 +174,7 @@ pub(crate) fn spawn_network_denial_watcher(
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
async fn process_chunk(
|
||||
|
||||
164
codex-rs/core/src/unified_exec/detached_store.rs
Normal file
164
codex-rs/core/src/unified_exec/detached_store.rs
Normal file
@@ -0,0 +1,164 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use crate::unified_exec::ProcessEntry;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
||||
pub struct DetachedProcessSummary {
|
||||
pub process_count: usize,
|
||||
pub process_ids: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
||||
pub struct ReattachSummary {
|
||||
pub reattached_count: usize,
|
||||
pub skipped_count: usize,
|
||||
pub skipped_process_ids: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct DetachedUnifiedExecStore {
|
||||
/// Detached unified-exec processes keyed by `(thread_id, process_id)`.
|
||||
entries: Mutex<HashMap<(ThreadId, String), ProcessEntry>>,
|
||||
}
|
||||
|
||||
impl DetachedUnifiedExecStore {
|
||||
/// Export processes from an attached manager and store them under `thread_id`.
|
||||
///
|
||||
/// This method does not hold the detached-store lock while awaiting manager
|
||||
/// export so process-manager and thread-manager lock domains stay separated.
|
||||
pub(crate) async fn detach_from_manager(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
manager: &UnifiedExecProcessManager,
|
||||
) -> DetachedProcessSummary {
|
||||
let exported = manager.export_processes().await;
|
||||
let mut process_ids = exported
|
||||
.iter()
|
||||
.map(|entry| entry.process_id.clone())
|
||||
.collect::<Vec<_>>();
|
||||
process_ids.sort();
|
||||
|
||||
let mut replaced_entries = Vec::new();
|
||||
{
|
||||
let mut entries = self.entries.lock().await;
|
||||
for entry in exported {
|
||||
let process_id = entry.process_id.clone();
|
||||
let key = (thread_id, process_id);
|
||||
if let Some(existing) = entries.insert(key, entry) {
|
||||
// Keep exactly one detached process entry per
|
||||
// (thread_id, process_id). Replacements are stale detached
|
||||
// handles and must be terminated to avoid leaked children.
|
||||
replaced_entries.push(existing);
|
||||
}
|
||||
}
|
||||
}
|
||||
for mut replaced in replaced_entries {
|
||||
Self::abort_all_watcher_tasks(&mut replaced);
|
||||
replaced.process.terminate();
|
||||
}
|
||||
|
||||
DetachedProcessSummary {
|
||||
process_count: process_ids.len(),
|
||||
process_ids,
|
||||
}
|
||||
}
|
||||
|
||||
/// Move detached processes for `thread_id` back into an attached manager.
|
||||
pub(crate) async fn reattach_to_manager(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
manager: &UnifiedExecProcessManager,
|
||||
) -> ReattachSummary {
|
||||
let detached_entries = {
|
||||
let mut entries = self.entries.lock().await;
|
||||
let mut keys = entries
|
||||
.keys()
|
||||
.filter(|(candidate_thread_id, _)| *candidate_thread_id == thread_id)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
keys.sort_by(|left, right| left.1.cmp(&right.1));
|
||||
|
||||
keys.into_iter()
|
||||
.filter_map(|key| entries.remove(&key))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
if detached_entries.is_empty() {
|
||||
return ReattachSummary::default();
|
||||
}
|
||||
|
||||
let mut skipped_entries = Vec::new();
|
||||
let summary = manager
|
||||
.import_processes(detached_entries, &mut skipped_entries)
|
||||
.await;
|
||||
if !skipped_entries.is_empty() {
|
||||
let mut entries = self.entries.lock().await;
|
||||
for entry in skipped_entries {
|
||||
let key = (thread_id, entry.process_id.clone());
|
||||
let _ = entries.insert(key, entry);
|
||||
}
|
||||
}
|
||||
|
||||
summary
|
||||
}
|
||||
|
||||
/// Terminate and remove all detached processes associated with `thread_id`.
|
||||
pub(crate) async fn clean_thread(&self, thread_id: ThreadId) -> DetachedProcessSummary {
|
||||
let entries = {
|
||||
let mut entries = self.entries.lock().await;
|
||||
let mut keys = entries
|
||||
.keys()
|
||||
.filter(|(candidate_thread_id, _)| *candidate_thread_id == thread_id)
|
||||
.cloned()
|
||||
.collect::<Vec<_>>();
|
||||
keys.sort_by(|left, right| left.1.cmp(&right.1));
|
||||
|
||||
keys.into_iter()
|
||||
.filter_map(|key| entries.remove(&key))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
let mut process_ids = entries
|
||||
.iter()
|
||||
.map(|entry| entry.process_id.clone())
|
||||
.collect::<Vec<_>>();
|
||||
process_ids.sort();
|
||||
for mut entry in entries {
|
||||
Self::abort_all_watcher_tasks(&mut entry);
|
||||
entry.process.terminate();
|
||||
}
|
||||
|
||||
DetachedProcessSummary {
|
||||
process_count: process_ids.len(),
|
||||
process_ids,
|
||||
}
|
||||
}
|
||||
|
||||
/// Terminate and remove every detached process across all threads.
|
||||
pub(crate) async fn clean_all(&self) {
|
||||
let entries = {
|
||||
let mut entries = self.entries.lock().await;
|
||||
entries.drain().map(|(_, entry)| entry).collect::<Vec<_>>()
|
||||
};
|
||||
for mut entry in entries {
|
||||
Self::abort_all_watcher_tasks(&mut entry);
|
||||
entry.process.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
fn abort_all_watcher_tasks(entry: &mut ProcessEntry) {
|
||||
if let Some(task) = entry.stream_task.take() {
|
||||
task.abort();
|
||||
}
|
||||
if let Some(task) = entry.exit_task.take() {
|
||||
task.abort();
|
||||
}
|
||||
if let Some(task) = entry.network_task.take() {
|
||||
task.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -32,12 +32,14 @@ use codex_network_proxy::NetworkProxy;
|
||||
use rand::Rng;
|
||||
use rand::rng;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::sandboxing::SandboxPermissions;
|
||||
|
||||
mod async_watcher;
|
||||
mod detached_store;
|
||||
mod errors;
|
||||
mod head_tail_buffer;
|
||||
mod process;
|
||||
@@ -47,6 +49,9 @@ pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
|
||||
process_manager::set_deterministic_process_ids_for_tests(enabled);
|
||||
}
|
||||
|
||||
pub use detached_store::DetachedProcessSummary;
|
||||
pub(crate) use detached_store::DetachedUnifiedExecStore;
|
||||
pub use detached_store::ReattachSummary;
|
||||
pub(crate) use errors::UnifiedExecError;
|
||||
pub(crate) use process::UnifiedExecProcess;
|
||||
|
||||
@@ -149,7 +154,7 @@ impl Default for UnifiedExecProcessManager {
|
||||
}
|
||||
}
|
||||
|
||||
struct ProcessEntry {
|
||||
pub(crate) struct ProcessEntry {
|
||||
process: Arc<UnifiedExecProcess>,
|
||||
call_id: String,
|
||||
process_id: String,
|
||||
@@ -158,6 +163,12 @@ struct ProcessEntry {
|
||||
network_attempt_id: Option<String>,
|
||||
session: Weak<Session>,
|
||||
last_used: tokio::time::Instant,
|
||||
// Session-bound watcher tasks.
|
||||
// - `stream_task` and `network_task` are aborted on detach.
|
||||
// - `exit_task` is retained while detached, then aborted on reattach/cleanup.
|
||||
stream_task: Option<JoinHandle<()>>,
|
||||
exit_task: Option<JoinHandle<()>>,
|
||||
network_task: Option<JoinHandle<()>>,
|
||||
}
|
||||
|
||||
pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {
|
||||
@@ -499,4 +510,66 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn export_import_keeps_process_alive_and_stable_process_id() -> anyhow::Result<()> {
|
||||
skip_if_sandbox!(Ok(()));
|
||||
|
||||
let (session, turn) = test_session_and_turn().await;
|
||||
|
||||
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
|
||||
let process_id = open_shell.process_id.clone().expect("expected process id");
|
||||
|
||||
let mut detached = session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.export_processes()
|
||||
.await;
|
||||
assert_eq!(detached.len(), 1);
|
||||
let detached_entry = detached.pop().expect("expected detached entry");
|
||||
assert_eq!(detached_entry.process_id, process_id);
|
||||
assert_eq!(detached_entry.network_attempt_id, None);
|
||||
|
||||
let err = write_stdin(&session, process_id.as_str(), "", 100)
|
||||
.await
|
||||
.expect_err("detached process should be unavailable in old manager");
|
||||
match err {
|
||||
UnifiedExecError::UnknownProcessId { process_id: err_id } => {
|
||||
assert_eq!(err_id, process_id);
|
||||
}
|
||||
other => panic!("expected UnknownProcessId, got {other:?}"),
|
||||
}
|
||||
|
||||
detached_entry
|
||||
.process
|
||||
.writer_sender()
|
||||
.send(b"echo detached-alive\n".to_vec())
|
||||
.await
|
||||
.expect("writer channel should be available while detached");
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(250)).await;
|
||||
|
||||
let mut skipped_entries = Vec::new();
|
||||
let import = session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.import_processes(vec![detached_entry], &mut skipped_entries)
|
||||
.await;
|
||||
assert_eq!(import.reattached_count, 1);
|
||||
assert_eq!(import.skipped_count, 0);
|
||||
assert!(import.skipped_process_ids.is_empty());
|
||||
assert!(skipped_entries.is_empty());
|
||||
|
||||
let resumed = write_stdin(&session, process_id.as_str(), "", 2_500).await?;
|
||||
assert!(resumed.output.contains("detached-alive"));
|
||||
|
||||
write_stdin(&session, process_id.as_str(), "exit\n", 2_500).await?;
|
||||
session
|
||||
.services
|
||||
.unified_exec_manager
|
||||
.terminate_all_processes()
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -129,7 +129,7 @@ impl UnifiedExecProcess {
|
||||
self.process_handle.exit_code()
|
||||
}
|
||||
|
||||
pub(super) fn terminate(&self) {
|
||||
pub(crate) fn terminate(&self) {
|
||||
self.output_closed.store(true, Ordering::Release);
|
||||
self.output_closed_notify.notify_waiters();
|
||||
self.process_handle.terminate();
|
||||
|
||||
@@ -36,6 +36,7 @@ use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
|
||||
use crate::unified_exec::MIN_YIELD_TIME_MS;
|
||||
use crate::unified_exec::ProcessEntry;
|
||||
use crate::unified_exec::ProcessStore;
|
||||
use crate::unified_exec::ReattachSummary;
|
||||
use crate::unified_exec::UnifiedExecContext;
|
||||
use crate::unified_exec::UnifiedExecError;
|
||||
use crate::unified_exec::UnifiedExecProcessManager;
|
||||
@@ -140,6 +141,9 @@ impl UnifiedExecProcessManager {
|
||||
store.remove(process_id)
|
||||
};
|
||||
if let Some(entry) = removed {
|
||||
// Intentionally do not abort watcher tasks here. In attached flows
|
||||
// (for example network denial), stream/exit watchers must finish so
|
||||
// ExecCommandEnd is still emitted after forced termination.
|
||||
Self::unregister_network_attempt_for_entry(&entry).await;
|
||||
}
|
||||
}
|
||||
@@ -194,7 +198,7 @@ impl UnifiedExecProcessManager {
|
||||
);
|
||||
emitter.emit(event_ctx, ToolEventStage::Begin).await;
|
||||
|
||||
start_streaming_output(&process, context, Arc::clone(&transcript));
|
||||
let stream_task = start_streaming_output(&process, context, Arc::clone(&transcript));
|
||||
let max_tokens = resolve_max_tokens(request.max_output_tokens);
|
||||
let yield_time_ms = clamp_yield_time(request.yield_time_ms);
|
||||
|
||||
@@ -247,6 +251,7 @@ impl UnifiedExecProcessManager {
|
||||
)
|
||||
.await;
|
||||
|
||||
stream_task.abort();
|
||||
self.release_process_id(&request.process_id).await;
|
||||
if let Some(deferred) = deferred_network_approval.as_ref()
|
||||
&& let Some(message) =
|
||||
@@ -297,6 +302,7 @@ impl UnifiedExecProcessManager {
|
||||
request.tty,
|
||||
network_attempt_id,
|
||||
Arc::clone(&transcript),
|
||||
stream_task,
|
||||
)
|
||||
.await;
|
||||
};
|
||||
@@ -430,6 +436,8 @@ impl UnifiedExecProcessManager {
|
||||
let Some(entry) = store.remove(&process_id) else {
|
||||
return ProcessStatus::Unknown;
|
||||
};
|
||||
// Do not abort watcher tasks on natural exit detection here.
|
||||
// The exit watcher emits ExecCommandEnd after output draining.
|
||||
ProcessStatus::Exited {
|
||||
exit_code,
|
||||
entry: Box::new(entry),
|
||||
@@ -504,8 +512,32 @@ impl UnifiedExecProcessManager {
|
||||
tty: bool,
|
||||
network_attempt_id: Option<String>,
|
||||
transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
|
||||
stream_task: tokio::task::JoinHandle<()>,
|
||||
) {
|
||||
let network_attempt_id_for_watcher = network_attempt_id.clone();
|
||||
let exit_task = spawn_exit_watcher(
|
||||
Arc::clone(&process),
|
||||
Arc::clone(&context.session),
|
||||
Arc::clone(&context.turn),
|
||||
context.call_id.clone(),
|
||||
command.to_vec(),
|
||||
cwd,
|
||||
process_id.clone(),
|
||||
Arc::clone(&transcript),
|
||||
started_at,
|
||||
);
|
||||
let network_task = if context.turn.config.managed_network_requirements_enabled() {
|
||||
network_attempt_id_for_watcher.map(|network_attempt_id| {
|
||||
spawn_network_denial_watcher(
|
||||
Arc::clone(&process),
|
||||
Arc::clone(&context.session),
|
||||
process_id.clone(),
|
||||
network_attempt_id,
|
||||
)
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let entry = ProcessEntry {
|
||||
process: Arc::clone(&process),
|
||||
call_id: context.call_id.clone(),
|
||||
@@ -515,6 +547,9 @@ impl UnifiedExecProcessManager {
|
||||
network_attempt_id,
|
||||
session: Arc::downgrade(&context.session),
|
||||
last_used: started_at,
|
||||
stream_task: Some(stream_task),
|
||||
exit_task: Some(exit_task),
|
||||
network_task,
|
||||
};
|
||||
let (number_processes, pruned_entry) = {
|
||||
let mut store = self.process_store.lock().await;
|
||||
@@ -524,7 +559,8 @@ impl UnifiedExecProcessManager {
|
||||
};
|
||||
// prune_processes_if_needed runs while holding process_store; do async
|
||||
// network-approval cleanup only after dropping that lock.
|
||||
if let Some(pruned_entry) = pruned_entry {
|
||||
if let Some(mut pruned_entry) = pruned_entry {
|
||||
Self::abort_all_watcher_tasks(&mut pruned_entry);
|
||||
Self::unregister_network_attempt_for_entry(&pruned_entry).await;
|
||||
pruned_entry.process.terminate();
|
||||
}
|
||||
@@ -538,29 +574,6 @@ impl UnifiedExecProcessManager {
|
||||
)
|
||||
.await;
|
||||
};
|
||||
|
||||
spawn_exit_watcher(
|
||||
Arc::clone(&process),
|
||||
Arc::clone(&context.session),
|
||||
Arc::clone(&context.turn),
|
||||
context.call_id.clone(),
|
||||
command.to_vec(),
|
||||
cwd,
|
||||
process_id.clone(),
|
||||
transcript,
|
||||
started_at,
|
||||
);
|
||||
|
||||
if context.turn.config.managed_network_requirements_enabled()
|
||||
&& let Some(network_attempt_id) = network_attempt_id_for_watcher
|
||||
{
|
||||
spawn_network_denial_watcher(
|
||||
Arc::clone(&process),
|
||||
Arc::clone(&context.session),
|
||||
process_id,
|
||||
network_attempt_id,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn open_session_with_exec_env(
|
||||
@@ -791,11 +804,104 @@ impl UnifiedExecProcessManager {
|
||||
entries
|
||||
};
|
||||
|
||||
for entry in entries {
|
||||
for mut entry in entries {
|
||||
Self::abort_all_watcher_tasks(&mut entry);
|
||||
Self::unregister_network_attempt_for_entry(&entry).await;
|
||||
entry.process.terminate();
|
||||
}
|
||||
}
|
||||
|
||||
/// Export selected process entries from this manager without terminating
|
||||
/// underlying child processes.
|
||||
///
|
||||
/// This drains the matching entries from the session-local store, removes
|
||||
/// their reserved process IDs, aborts output/network watchers, keeps the
|
||||
/// exit watcher alive while detached, and clears network approval linkage.
|
||||
pub(crate) async fn export_processes(&self) -> Vec<ProcessEntry> {
|
||||
let entries = {
|
||||
let mut store = self.process_store.lock().await;
|
||||
let mut selected = store.processes.keys().cloned().collect::<Vec<_>>();
|
||||
selected.sort();
|
||||
selected
|
||||
.into_iter()
|
||||
.filter_map(|process_id| store.remove(&process_id))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
let mut exported = Vec::with_capacity(entries.len());
|
||||
for mut entry in entries {
|
||||
Self::abort_detach_watcher_tasks(&mut entry);
|
||||
Self::unregister_network_attempt_for_entry(&entry).await;
|
||||
entry.network_attempt_id = None;
|
||||
exported.push(entry);
|
||||
}
|
||||
|
||||
exported
|
||||
}
|
||||
|
||||
/// Import previously detached process entries into this manager.
|
||||
///
|
||||
/// Process IDs are preserved and reserved in the destination manager so
|
||||
/// existing `write_stdin(session_id=...)` IDs remain stable. Entries whose
|
||||
/// IDs already exist are skipped and returned in the summary.
|
||||
pub(crate) async fn import_processes(
|
||||
&self,
|
||||
entries: Vec<ProcessEntry>,
|
||||
skipped_entries: &mut Vec<ProcessEntry>,
|
||||
) -> ReattachSummary {
|
||||
let mut reattached_count = 0usize;
|
||||
let mut skipped_count = 0usize;
|
||||
let mut skipped_process_ids = Vec::new();
|
||||
|
||||
let mut store = self.process_store.lock().await;
|
||||
for mut entry in entries {
|
||||
if store.processes.contains_key(&entry.process_id)
|
||||
|| store.reserved_process_ids.contains(&entry.process_id)
|
||||
{
|
||||
// Keep existing attached ownership authoritative when there is
|
||||
// an ID collision, and hand the detached entry back to caller.
|
||||
skipped_count += 1;
|
||||
skipped_process_ids.push(entry.process_id.clone());
|
||||
skipped_entries.push(entry);
|
||||
continue;
|
||||
}
|
||||
|
||||
let process_id = entry.process_id.clone();
|
||||
entry.network_attempt_id = None;
|
||||
if let Some(task) = entry.exit_task.take() {
|
||||
task.abort();
|
||||
}
|
||||
entry.session = std::sync::Weak::new();
|
||||
entry.stream_task = None;
|
||||
entry.network_task = None;
|
||||
store.reserved_process_ids.insert(process_id.clone());
|
||||
store.processes.insert(process_id, entry);
|
||||
reattached_count += 1;
|
||||
}
|
||||
skipped_process_ids.sort();
|
||||
|
||||
ReattachSummary {
|
||||
reattached_count,
|
||||
skipped_count,
|
||||
skipped_process_ids,
|
||||
}
|
||||
}
|
||||
|
||||
fn abort_detach_watcher_tasks(entry: &mut ProcessEntry) {
|
||||
if let Some(task) = entry.stream_task.take() {
|
||||
task.abort();
|
||||
}
|
||||
if let Some(task) = entry.network_task.take() {
|
||||
task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
fn abort_all_watcher_tasks(entry: &mut ProcessEntry) {
|
||||
Self::abort_detach_watcher_tasks(entry);
|
||||
if let Some(task) = entry.exit_task.take() {
|
||||
task.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum ProcessStatus {
|
||||
|
||||
Reference in New Issue
Block a user