Compare commits

...

4 Commits

Author SHA1 Message Date
jif-oai
e7dcea2011 Merge branch 'main' into jif/implement-unified-exec-detachreatt 2026-02-19 18:45:07 +00:00
jif-oai
a789dec845 Fix watcher abort handling 2026-02-19 17:11:50 +00:00
jif-oai
62dbb4322b Review watcher task shutdown 2026-02-19 15:54:42 +00:00
jif-oai
26049c5566 Allow unified exec processes to re-attach 2026-02-19 15:48:47 +00:00
6 changed files with 474 additions and 34 deletions

View File

@@ -18,6 +18,9 @@ use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::truncation;
use crate::skills::SkillsManager;
use crate::unified_exec::DetachedProcessSummary;
use crate::unified_exec::DetachedUnifiedExecStore;
use crate::unified_exec::ReattachSummary;
use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::openai_models::ModelPreset;
@@ -127,6 +130,7 @@ pub struct ThreadManager {
/// function to require an `Arc<&Self>`.
pub(crate) struct ThreadManagerState {
threads: Arc<RwLock<HashMap<ThreadId, Arc<CodexThread>>>>,
detached_unified_exec_store: DetachedUnifiedExecStore,
thread_created_tx: broadcast::Sender<ThreadId>,
auth_manager: Arc<AuthManager>,
models_manager: Arc<ModelsManager>,
@@ -150,6 +154,7 @@ impl ThreadManager {
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
detached_unified_exec_store: DetachedUnifiedExecStore::default(),
thread_created_tx,
models_manager: Arc::new(ModelsManager::new(
codex_home,
@@ -201,6 +206,7 @@ impl ThreadManager {
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
detached_unified_exec_store: DetachedUnifiedExecStore::default(),
thread_created_tx,
models_manager: Arc::new(ModelsManager::with_provider_for_tests(
codex_home,
@@ -347,6 +353,7 @@ impl ThreadManager {
thread.submit(Op::Shutdown).await?;
}
self.state.threads.write().await.clear();
self.state.clean_all_detached_processes().await;
Ok(())
}
@@ -379,6 +386,36 @@ impl ThreadManager {
AgentControl::new(Arc::downgrade(&self.state))
}
/// Detach unified-exec processes from an attached thread so they can
/// survive thread shutdown and be reattached on a later resume.
///
/// A thread that is not currently loaded has nothing to detach, so
/// `ThreadNotFound` is swallowed and reported as an empty summary.
pub async fn detach_thread_processes(
    &self,
    thread_id: ThreadId,
) -> CodexResult<DetachedProcessSummary> {
    let result = self.state.detach_thread_processes(thread_id).await;
    if let Err(CodexErr::ThreadNotFound(_)) = result {
        // Missing thread: report "nothing detached" instead of failing.
        return Ok(DetachedProcessSummary::default());
    }
    result
}
/// Reattach unified-exec processes that were previously detached so they
/// become available again on the currently loaded thread session.
pub async fn reattach_thread_processes(
    &self,
    thread_id: ThreadId,
) -> CodexResult<ReattachSummary> {
    // Delegate to the shared manager state, which owns the detached store.
    let state = &self.state;
    state.reattach_thread_processes(thread_id).await
}
/// Terminate and forget any detached unified-exec processes recorded for
/// `thread_id`, returning a summary of what was removed.
pub async fn clean_detached_thread_processes(
    &self,
    thread_id: ThreadId,
) -> DetachedProcessSummary {
    // Delegate to the shared manager state, which owns the detached store.
    let state = &self.state;
    state.clean_detached_thread_processes(thread_id).await
}
#[cfg(test)]
pub(crate) fn captured_ops(&self) -> Vec<(ThreadId, Op)> {
self.state
@@ -415,6 +452,49 @@ impl ThreadManagerState {
self.threads.write().await.remove(thread_id)
}
/// Export the loaded thread's unified-exec processes into the detached
/// store so they survive the thread being shut down.
///
/// Returns `ThreadNotFound` (from `get_thread`) when the thread is not
/// currently loaded.
pub(crate) async fn detach_thread_processes(
    &self,
    thread_id: ThreadId,
) -> CodexResult<DetachedProcessSummary> {
    let thread = self.get_thread(thread_id).await?;
    let manager = &thread.codex.session.services.unified_exec_manager;
    Ok(self
        .detached_unified_exec_store
        .detach_from_manager(thread_id, manager)
        .await)
}
/// Move any detached unified-exec processes for `thread_id` back into the
/// loaded thread's attached process manager.
///
/// Returns `ThreadNotFound` (from `get_thread`) when the thread is not
/// currently loaded.
pub(crate) async fn reattach_thread_processes(
    &self,
    thread_id: ThreadId,
) -> CodexResult<ReattachSummary> {
    let thread = self.get_thread(thread_id).await?;
    let manager = &thread.codex.session.services.unified_exec_manager;
    Ok(self
        .detached_unified_exec_store
        .reattach_to_manager(thread_id, manager)
        .await)
}
/// Terminate and drop every detached unified-exec process stored for
/// `thread_id`, returning a summary of what was removed.
pub(crate) async fn clean_detached_thread_processes(
    &self,
    thread_id: ThreadId,
) -> DetachedProcessSummary {
    let store = &self.detached_unified_exec_store;
    store.clean_thread(thread_id).await
}
/// Terminate and drop detached unified-exec processes for every thread.
/// Called from `remove_all_threads` after all live threads are cleared.
pub(crate) async fn clean_all_detached_processes(&self) {
    self.detached_unified_exec_store.clean_all().await;
}
/// Spawn a new thread with no history using a provided config.
pub(crate) async fn spawn_new_thread(
&self,
@@ -571,6 +651,7 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia
mod tests {
use super::*;
use crate::codex::make_session_and_context;
use crate::model_provider_info::built_in_model_providers;
use assert_matches::assert_matches;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
@@ -681,4 +762,19 @@ mod tests {
serde_json::to_value(&expected).unwrap()
);
}
// Detaching from a thread ID that is not loaded must be a no-op (empty
// summary) rather than an error: ThreadManager::detach_thread_processes
// maps ThreadNotFound to Ok(DetachedProcessSummary::default()).
#[tokio::test]
async fn detach_thread_processes_is_noop_for_missing_thread() {
    let manager = ThreadManager::with_models_provider_for_tests(
        CodexAuth::from_api_key("dummy"),
        built_in_model_providers()["openai"].clone(),
    );
    // ThreadId::default() was never spawned, so no thread exists for it.
    let summary = manager
        .detach_thread_processes(ThreadId::default())
        .await
        .expect("missing thread detach should not fail");
    assert_eq!(summary, DetachedProcessSummary::default());
}
}

View File

@@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio::time::Instant;
use tokio::time::Sleep;
@@ -40,7 +41,7 @@ pub(crate) fn start_streaming_output(
process: &UnifiedExecProcess,
context: &UnifiedExecContext,
transcript: Arc<Mutex<HeadTailBuffer>>,
) {
) -> JoinHandle<()> {
let mut receiver = process.output_receiver();
let output_drained = process.output_drained_notify();
let exit_token = process.cancellation_token();
@@ -97,7 +98,7 @@ pub(crate) fn start_streaming_output(
}
}
}
});
})
}
/// Spawn a background watcher that waits for the PTY to exit and then emits a
@@ -113,7 +114,7 @@ pub(crate) fn spawn_exit_watcher(
process_id: String,
transcript: Arc<Mutex<HeadTailBuffer>>,
started_at: Instant,
) {
) -> JoinHandle<()> {
let exit_token = process.cancellation_token();
let output_drained = process.output_drained_notify();
@@ -136,7 +137,7 @@ pub(crate) fn spawn_exit_watcher(
duration,
)
.await;
});
})
}
pub(crate) fn spawn_network_denial_watcher(
@@ -144,7 +145,7 @@ pub(crate) fn spawn_network_denial_watcher(
session: Arc<Session>,
process_id: String,
network_attempt_id: String,
) {
) -> JoinHandle<()> {
let exit_token = process.cancellation_token();
tokio::spawn(async move {
let mut poll = tokio::time::interval(Duration::from_millis(100));
@@ -173,7 +174,7 @@ pub(crate) fn spawn_network_denial_watcher(
}
}
}
});
})
}
async fn process_chunk(

View File

@@ -0,0 +1,164 @@
use std::collections::HashMap;
use codex_protocol::ThreadId;
use tokio::sync::Mutex;
use crate::unified_exec::ProcessEntry;
use crate::unified_exec::UnifiedExecProcessManager;
/// Summary of a detach or cleanup operation: how many unified-exec
/// processes were affected and which ones (IDs sorted for deterministic
/// reporting).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DetachedProcessSummary {
    /// Number of processes affected; equals `process_ids.len()`.
    pub process_count: usize,
    /// Sorted unified-exec process IDs that were affected.
    pub process_ids: Vec<String>,
}
/// Summary of a reattach (import) operation. Entries whose process IDs
/// collide with IDs already present in the destination manager are skipped
/// rather than imported.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ReattachSummary {
    /// Number of processes successfully moved into the attached manager.
    pub reattached_count: usize,
    /// Number of processes skipped; equals `skipped_process_ids.len()`.
    pub skipped_count: usize,
    /// Sorted process IDs that were skipped due to ID collisions.
    pub skipped_process_ids: Vec<String>,
}
/// Holds unified-exec processes that have been detached from a live thread
/// session so they can outlive thread shutdown and later be reattached.
#[derive(Default)]
pub(crate) struct DetachedUnifiedExecStore {
    /// Detached unified-exec processes keyed by `(thread_id, process_id)`.
    entries: Mutex<HashMap<(ThreadId, String), ProcessEntry>>,
}
impl DetachedUnifiedExecStore {
    /// Export processes from an attached manager and store them under `thread_id`.
    ///
    /// This method does not hold the detached-store lock while awaiting manager
    /// export so process-manager and thread-manager lock domains stay separated.
    pub(crate) async fn detach_from_manager(
        &self,
        thread_id: ThreadId,
        manager: &UnifiedExecProcessManager,
    ) -> DetachedProcessSummary {
        let exported = manager.export_processes().await;
        let process_ids = Self::sorted_process_ids(&exported);

        let mut replaced_entries = Vec::new();
        {
            let mut entries = self.entries.lock().await;
            for entry in exported {
                let key = (thread_id, entry.process_id.clone());
                if let Some(existing) = entries.insert(key, entry) {
                    // Keep exactly one detached process entry per
                    // (thread_id, process_id). Replacements are stale detached
                    // handles and must be terminated to avoid leaked children.
                    replaced_entries.push(existing);
                }
            }
        }
        // Terminate stale replacements outside the store lock.
        for replaced in replaced_entries {
            Self::shutdown_entry(replaced);
        }
        DetachedProcessSummary {
            process_count: process_ids.len(),
            process_ids,
        }
    }

    /// Move detached processes for `thread_id` back into an attached manager.
    ///
    /// Entries the manager refuses (process-ID collisions) are put back into
    /// the detached store so the caller can clean or retry them later.
    pub(crate) async fn reattach_to_manager(
        &self,
        thread_id: ThreadId,
        manager: &UnifiedExecProcessManager,
    ) -> ReattachSummary {
        let detached_entries = self.take_thread_entries(thread_id).await;
        if detached_entries.is_empty() {
            return ReattachSummary::default();
        }

        // The store lock is not held across this await; see detach_from_manager.
        let mut skipped_entries = Vec::new();
        let summary = manager
            .import_processes(detached_entries, &mut skipped_entries)
            .await;

        if !skipped_entries.is_empty() {
            let mut entries = self.entries.lock().await;
            for entry in skipped_entries {
                let key = (thread_id, entry.process_id.clone());
                let _ = entries.insert(key, entry);
            }
        }
        summary
    }

    /// Terminate and remove all detached processes associated with `thread_id`.
    pub(crate) async fn clean_thread(&self, thread_id: ThreadId) -> DetachedProcessSummary {
        let entries = self.take_thread_entries(thread_id).await;
        let process_ids = Self::sorted_process_ids(&entries);
        for entry in entries {
            Self::shutdown_entry(entry);
        }
        DetachedProcessSummary {
            process_count: process_ids.len(),
            process_ids,
        }
    }

    /// Terminate and remove every detached process across all threads.
    pub(crate) async fn clean_all(&self) {
        let entries = {
            let mut entries = self.entries.lock().await;
            entries.drain().map(|(_, entry)| entry).collect::<Vec<_>>()
        };
        // Terminate outside the store lock.
        for entry in entries {
            Self::shutdown_entry(entry);
        }
    }

    /// Remove and return this thread's detached entries, ordered by process
    /// ID so summaries and import order are deterministic.
    async fn take_thread_entries(&self, thread_id: ThreadId) -> Vec<ProcessEntry> {
        let mut entries = self.entries.lock().await;
        let mut keys = entries
            .keys()
            .filter(|(candidate_thread_id, _)| *candidate_thread_id == thread_id)
            .cloned()
            .collect::<Vec<_>>();
        keys.sort_by(|left, right| left.1.cmp(&right.1));
        keys.into_iter()
            .filter_map(|key| entries.remove(&key))
            .collect()
    }

    /// Sorted process IDs for a batch of entries (summary ordering).
    fn sorted_process_ids(entries: &[ProcessEntry]) -> Vec<String> {
        let mut process_ids = entries
            .iter()
            .map(|entry| entry.process_id.clone())
            .collect::<Vec<_>>();
        process_ids.sort();
        process_ids
    }

    /// Abort the entry's watcher tasks, then terminate the child process.
    fn shutdown_entry(mut entry: ProcessEntry) {
        Self::abort_all_watcher_tasks(&mut entry);
        entry.process.terminate();
    }

    /// Abort every session-bound watcher task still attached to `entry`.
    fn abort_all_watcher_tasks(entry: &mut ProcessEntry) {
        if let Some(task) = entry.stream_task.take() {
            task.abort();
        }
        if let Some(task) = entry.exit_task.take() {
            task.abort();
        }
        if let Some(task) = entry.network_task.take() {
            task.abort();
        }
    }
}

View File

@@ -32,12 +32,14 @@ use codex_network_proxy::NetworkProxy;
use rand::Rng;
use rand::rng;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::sandboxing::SandboxPermissions;
mod async_watcher;
mod detached_store;
mod errors;
mod head_tail_buffer;
mod process;
@@ -47,6 +49,9 @@ pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
process_manager::set_deterministic_process_ids_for_tests(enabled);
}
pub use detached_store::DetachedProcessSummary;
pub(crate) use detached_store::DetachedUnifiedExecStore;
pub use detached_store::ReattachSummary;
pub(crate) use errors::UnifiedExecError;
pub(crate) use process::UnifiedExecProcess;
@@ -149,7 +154,7 @@ impl Default for UnifiedExecProcessManager {
}
}
struct ProcessEntry {
pub(crate) struct ProcessEntry {
process: Arc<UnifiedExecProcess>,
call_id: String,
process_id: String,
@@ -158,6 +163,12 @@ struct ProcessEntry {
network_attempt_id: Option<String>,
session: Weak<Session>,
last_used: tokio::time::Instant,
// Session-bound watcher tasks.
// - `stream_task` and `network_task` are aborted on detach.
// - `exit_task` is retained while detached, then aborted on reattach/cleanup.
stream_task: Option<JoinHandle<()>>,
exit_task: Option<JoinHandle<()>>,
network_task: Option<JoinHandle<()>>,
}
pub(crate) fn clamp_yield_time(yield_time_ms: u64) -> u64 {
@@ -499,4 +510,66 @@ mod tests {
Ok(())
}
// End-to-end check that export/import keeps the child process alive and its
// process ID stable: once exported, the process is unknown to the manager
// but still writable via its raw writer channel, and after import the same
// ID works again and output produced while detached is observable.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn export_import_keeps_process_alive_and_stable_process_id() -> anyhow::Result<()> {
    skip_if_sandbox!(Ok(()));
    let (session, turn) = test_session_and_turn().await;
    // Open an interactive shell so the process stays alive across export.
    let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
    let process_id = open_shell.process_id.clone().expect("expected process id");
    // Export drains the entry from the manager without killing the child.
    let mut detached = session
        .services
        .unified_exec_manager
        .export_processes()
        .await;
    assert_eq!(detached.len(), 1);
    let detached_entry = detached.pop().expect("expected detached entry");
    assert_eq!(detached_entry.process_id, process_id);
    // export_processes clears network-approval linkage on the entry.
    assert_eq!(detached_entry.network_attempt_id, None);
    // While detached, the old manager must no longer resolve the ID.
    let err = write_stdin(&session, process_id.as_str(), "", 100)
        .await
        .expect_err("detached process should be unavailable in old manager");
    match err {
        UnifiedExecError::UnknownProcessId { process_id: err_id } => {
            assert_eq!(err_id, process_id);
        }
        other => panic!("expected UnknownProcessId, got {other:?}"),
    }
    // The child itself is still alive: write a marker directly through the
    // writer channel, then give the shell a moment to echo it.
    detached_entry
        .process
        .writer_sender()
        .send(b"echo detached-alive\n".to_vec())
        .await
        .expect("writer channel should be available while detached");
    tokio::time::sleep(Duration::from_millis(250)).await;
    // Import restores the entry under the same (re-reserved) process ID.
    let mut skipped_entries = Vec::new();
    let import = session
        .services
        .unified_exec_manager
        .import_processes(vec![detached_entry], &mut skipped_entries)
        .await;
    assert_eq!(import.reattached_count, 1);
    assert_eq!(import.skipped_count, 0);
    assert!(import.skipped_process_ids.is_empty());
    assert!(skipped_entries.is_empty());
    // Output buffered while detached surfaces after reattach.
    let resumed = write_stdin(&session, process_id.as_str(), "", 2_500).await?;
    assert!(resumed.output.contains("detached-alive"));
    write_stdin(&session, process_id.as_str(), "exit\n", 2_500).await?;
    session
        .services
        .unified_exec_manager
        .terminate_all_processes()
        .await;
    Ok(())
}
}

View File

@@ -129,7 +129,7 @@ impl UnifiedExecProcess {
self.process_handle.exit_code()
}
pub(super) fn terminate(&self) {
pub(crate) fn terminate(&self) {
self.output_closed.store(true, Ordering::Release);
self.output_closed_notify.notify_waiters();
self.process_handle.terminate();

View File

@@ -36,6 +36,7 @@ use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
use crate::unified_exec::MIN_YIELD_TIME_MS;
use crate::unified_exec::ProcessEntry;
use crate::unified_exec::ProcessStore;
use crate::unified_exec::ReattachSummary;
use crate::unified_exec::UnifiedExecContext;
use crate::unified_exec::UnifiedExecError;
use crate::unified_exec::UnifiedExecProcessManager;
@@ -140,6 +141,9 @@ impl UnifiedExecProcessManager {
store.remove(process_id)
};
if let Some(entry) = removed {
// Intentionally do not abort watcher tasks here. In attached flows
// (for example network denial), stream/exit watchers must finish so
// ExecCommandEnd is still emitted after forced termination.
Self::unregister_network_attempt_for_entry(&entry).await;
}
}
@@ -194,7 +198,7 @@ impl UnifiedExecProcessManager {
);
emitter.emit(event_ctx, ToolEventStage::Begin).await;
start_streaming_output(&process, context, Arc::clone(&transcript));
let stream_task = start_streaming_output(&process, context, Arc::clone(&transcript));
let max_tokens = resolve_max_tokens(request.max_output_tokens);
let yield_time_ms = clamp_yield_time(request.yield_time_ms);
@@ -247,6 +251,7 @@ impl UnifiedExecProcessManager {
)
.await;
stream_task.abort();
self.release_process_id(&request.process_id).await;
if let Some(deferred) = deferred_network_approval.as_ref()
&& let Some(message) =
@@ -297,6 +302,7 @@ impl UnifiedExecProcessManager {
request.tty,
network_attempt_id,
Arc::clone(&transcript),
stream_task,
)
.await;
};
@@ -430,6 +436,8 @@ impl UnifiedExecProcessManager {
let Some(entry) = store.remove(&process_id) else {
return ProcessStatus::Unknown;
};
// Do not abort watcher tasks on natural exit detection here.
// The exit watcher emits ExecCommandEnd after output draining.
ProcessStatus::Exited {
exit_code,
entry: Box::new(entry),
@@ -504,8 +512,32 @@ impl UnifiedExecProcessManager {
tty: bool,
network_attempt_id: Option<String>,
transcript: Arc<tokio::sync::Mutex<HeadTailBuffer>>,
stream_task: tokio::task::JoinHandle<()>,
) {
let network_attempt_id_for_watcher = network_attempt_id.clone();
let exit_task = spawn_exit_watcher(
Arc::clone(&process),
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
command.to_vec(),
cwd,
process_id.clone(),
Arc::clone(&transcript),
started_at,
);
let network_task = if context.turn.config.managed_network_requirements_enabled() {
network_attempt_id_for_watcher.map(|network_attempt_id| {
spawn_network_denial_watcher(
Arc::clone(&process),
Arc::clone(&context.session),
process_id.clone(),
network_attempt_id,
)
})
} else {
None
};
let entry = ProcessEntry {
process: Arc::clone(&process),
call_id: context.call_id.clone(),
@@ -515,6 +547,9 @@ impl UnifiedExecProcessManager {
network_attempt_id,
session: Arc::downgrade(&context.session),
last_used: started_at,
stream_task: Some(stream_task),
exit_task: Some(exit_task),
network_task,
};
let (number_processes, pruned_entry) = {
let mut store = self.process_store.lock().await;
@@ -524,7 +559,8 @@ impl UnifiedExecProcessManager {
};
// prune_processes_if_needed runs while holding process_store; do async
// network-approval cleanup only after dropping that lock.
if let Some(pruned_entry) = pruned_entry {
if let Some(mut pruned_entry) = pruned_entry {
Self::abort_all_watcher_tasks(&mut pruned_entry);
Self::unregister_network_attempt_for_entry(&pruned_entry).await;
pruned_entry.process.terminate();
}
@@ -538,29 +574,6 @@ impl UnifiedExecProcessManager {
)
.await;
};
spawn_exit_watcher(
Arc::clone(&process),
Arc::clone(&context.session),
Arc::clone(&context.turn),
context.call_id.clone(),
command.to_vec(),
cwd,
process_id.clone(),
transcript,
started_at,
);
if context.turn.config.managed_network_requirements_enabled()
&& let Some(network_attempt_id) = network_attempt_id_for_watcher
{
spawn_network_denial_watcher(
Arc::clone(&process),
Arc::clone(&context.session),
process_id,
network_attempt_id,
);
}
}
pub(crate) async fn open_session_with_exec_env(
@@ -791,11 +804,104 @@ impl UnifiedExecProcessManager {
entries
};
for entry in entries {
for mut entry in entries {
Self::abort_all_watcher_tasks(&mut entry);
Self::unregister_network_attempt_for_entry(&entry).await;
entry.process.terminate();
}
}
/// Export selected process entries from this manager without terminating
/// underlying child processes.
///
/// This drains the matching entries from the session-local store, removes
/// their reserved process IDs, aborts output/network watchers, keeps the
/// exit watcher alive while detached, and clears network approval linkage.
pub(crate) async fn export_processes(&self) -> Vec<ProcessEntry> {
    // Drain every entry while holding the store lock, in sorted process-ID
    // order so exported batches are deterministic.
    let drained = {
        let mut store = self.process_store.lock().await;
        let mut process_ids = store.processes.keys().cloned().collect::<Vec<_>>();
        process_ids.sort();
        process_ids
            .into_iter()
            .filter_map(|process_id| store.remove(&process_id))
            .collect::<Vec<_>>()
    };
    // Strip session-bound watchers and network-approval state outside the
    // lock; the exit watcher deliberately stays alive while detached.
    let mut exported = Vec::with_capacity(drained.len());
    for mut entry in drained {
        Self::abort_detach_watcher_tasks(&mut entry);
        Self::unregister_network_attempt_for_entry(&entry).await;
        entry.network_attempt_id = None;
        exported.push(entry);
    }
    exported
}
/// Import previously detached process entries into this manager.
///
/// Process IDs are preserved and reserved in the destination manager so
/// existing `write_stdin(session_id=...)` IDs remain stable. Entries whose
/// IDs already exist are skipped and returned in the summary.
pub(crate) async fn import_processes(
&self,
entries: Vec<ProcessEntry>,
skipped_entries: &mut Vec<ProcessEntry>,
) -> ReattachSummary {
let mut reattached_count = 0usize;
let mut skipped_count = 0usize;
let mut skipped_process_ids = Vec::new();
let mut store = self.process_store.lock().await;
for mut entry in entries {
if store.processes.contains_key(&entry.process_id)
|| store.reserved_process_ids.contains(&entry.process_id)
{
// Keep existing attached ownership authoritative when there is
// an ID collision, and hand the detached entry back to caller.
skipped_count += 1;
skipped_process_ids.push(entry.process_id.clone());
skipped_entries.push(entry);
continue;
}
let process_id = entry.process_id.clone();
entry.network_attempt_id = None;
if let Some(task) = entry.exit_task.take() {
task.abort();
}
entry.session = std::sync::Weak::new();
entry.stream_task = None;
entry.network_task = None;
store.reserved_process_ids.insert(process_id.clone());
store.processes.insert(process_id, entry);
reattached_count += 1;
}
skipped_process_ids.sort();
ReattachSummary {
reattached_count,
skipped_count,
skipped_process_ids,
}
}
/// Abort the session-bound output and network watchers for `entry`; the
/// exit watcher is deliberately left running (see abort_all_watcher_tasks).
fn abort_detach_watcher_tasks(entry: &mut ProcessEntry) {
    for task in [entry.stream_task.take(), entry.network_task.take()]
        .into_iter()
        .flatten()
    {
        task.abort();
    }
}
/// Abort every watcher task for `entry`: the detach-time watchers plus the
/// exit watcher.
fn abort_all_watcher_tasks(entry: &mut ProcessEntry) {
    Self::abort_detach_watcher_tasks(entry);
    let exit_task = entry.exit_task.take();
    if let Some(task) = exit_task {
        task.abort();
    }
}
}
enum ProcessStatus {