Files
codex/codex-rs/exec-server/src/server/session_registry.rs
jif-oai 085ffb4456 feat: move exec-server ownership (#16344)
This introduces session-scoped ownership for exec-server so ws
disconnects no longer immediately kill running remote exec processes,
and it prepares the protocol for reconnect-based resume.
- add session_id / resume_session_id to the exec-server initialize
handshake
  - move process ownership under a shared session registry
- detach sessions on websocket disconnect and expire them after a TTL
instead of killing processes immediately (we will resume based on this)
- allow a new connection to resume an existing session and take over
notifications/ownership
- I use UUID to make them not predictable as we don't have auth for now
- make detached-session expiry authoritative at resume time so teardown
wins at the TTL boundary
- reject long-poll process/read calls that get resumed out from under an
older attachment

---------

Co-authored-by: Codex <noreply@openai.com>
2026-04-10 14:11:47 +01:00

260 lines
7.9 KiB
Rust

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::time::Duration;
use codex_app_server_protocol::JSONRPCErrorError;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::rpc::RpcNotificationSender;
use crate::rpc::invalid_request;
use crate::server::process_handler::ProcessHandler;
#[cfg(test)]
const DETACHED_SESSION_TTL: Duration = Duration::from_millis(200);
#[cfg(not(test))]
const DETACHED_SESSION_TTL: Duration = Duration::from_secs(10);
pub(crate) struct SessionRegistry {
sessions: Mutex<HashMap<String, Arc<SessionEntry>>>,
}
struct SessionEntry {
session_id: String,
process: ProcessHandler,
attachment: StdMutex<AttachmentState>,
}
struct AttachmentState {
current_connection_id: Option<ConnectionId>,
detached_connection_id: Option<ConnectionId>,
detached_expires_at: Option<tokio::time::Instant>,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct ConnectionId(Uuid);
impl std::fmt::Display for ConnectionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
#[derive(Clone)]
pub(crate) struct SessionHandle {
registry: Arc<SessionRegistry>,
entry: Arc<SessionEntry>,
connection_id: ConnectionId,
}
impl SessionRegistry {
pub(crate) fn new() -> Arc<Self> {
Arc::new(Self {
sessions: Mutex::new(HashMap::new()),
})
}
pub(crate) async fn attach(
self: &Arc<Self>,
resume_session_id: Option<String>,
notifications: RpcNotificationSender,
) -> Result<SessionHandle, JSONRPCErrorError> {
enum AttachOutcome {
Attached(Arc<SessionEntry>),
Expired {
session_id: String,
entry: Arc<SessionEntry>,
},
}
let connection_id = ConnectionId(Uuid::new_v4());
let outcome = {
let mut sessions = self.sessions.lock().await;
if let Some(session_id) = resume_session_id {
let entry = sessions
.get(&session_id)
.cloned()
.ok_or_else(|| invalid_request(format!("unknown session id {session_id}")))?;
if entry.is_expired(tokio::time::Instant::now()) {
let entry = sessions.remove(&session_id).ok_or_else(|| {
invalid_request(format!("unknown session id {session_id}"))
})?;
Ok(AttachOutcome::Expired { session_id, entry })
} else if entry.has_active_connection() {
Err(invalid_request(format!(
"session {session_id} is already attached to another connection"
)))
} else {
entry.process.set_notification_sender(Some(notifications));
entry.attach(connection_id);
Ok(AttachOutcome::Attached(entry))
}
} else {
let session_id = Uuid::new_v4().to_string();
let entry = Arc::new(SessionEntry::new(
session_id.clone(),
ProcessHandler::new(notifications),
connection_id,
));
sessions.insert(session_id, Arc::clone(&entry));
Ok(AttachOutcome::Attached(entry))
}
};
let entry = match outcome? {
AttachOutcome::Attached(entry) => entry,
AttachOutcome::Expired { session_id, entry } => {
entry.process.shutdown().await;
return Err(invalid_request(format!("unknown session id {session_id}")));
}
};
Ok(SessionHandle {
registry: Arc::clone(self),
entry,
connection_id,
})
}
async fn expire_if_detached(&self, session_id: String, connection_id: ConnectionId) {
tokio::time::sleep(DETACHED_SESSION_TTL).await;
let removed = {
let mut sessions = self.sessions.lock().await;
let Some(entry) = sessions.get(&session_id) else {
return;
};
if !entry.is_detached_connection_expired(connection_id, tokio::time::Instant::now()) {
return;
}
sessions.remove(&session_id)
};
if let Some(entry) = removed {
entry.process.shutdown().await;
}
}
}
impl Default for SessionRegistry {
fn default() -> Self {
Self {
sessions: Mutex::new(HashMap::new()),
}
}
}
impl SessionEntry {
fn new(session_id: String, process: ProcessHandler, connection_id: ConnectionId) -> Self {
Self {
session_id,
process,
attachment: StdMutex::new(AttachmentState {
current_connection_id: Some(connection_id),
detached_connection_id: None,
detached_expires_at: None,
}),
}
}
fn attach(&self, connection_id: ConnectionId) {
let mut attachment = self
.attachment
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
attachment.current_connection_id = Some(connection_id);
attachment.detached_connection_id = None;
attachment.detached_expires_at = None;
}
fn detach(&self, connection_id: ConnectionId) -> bool {
let mut attachment = self
.attachment
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if attachment.current_connection_id != Some(connection_id) {
return false;
}
attachment.current_connection_id = None;
attachment.detached_connection_id = Some(connection_id);
attachment.detached_expires_at = Some(tokio::time::Instant::now() + DETACHED_SESSION_TTL);
true
}
fn has_active_connection(&self) -> bool {
self.attachment
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.current_connection_id
.is_some()
}
fn is_attached_to(&self, connection_id: ConnectionId) -> bool {
self.attachment
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.current_connection_id
== Some(connection_id)
}
fn is_expired(&self, now: tokio::time::Instant) -> bool {
self.attachment
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.detached_expires_at
.is_some_and(|deadline| now >= deadline)
}
fn is_detached_connection_expired(
&self,
connection_id: ConnectionId,
now: tokio::time::Instant,
) -> bool {
let attachment = self
.attachment
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
attachment.current_connection_id.is_none()
&& attachment.detached_connection_id == Some(connection_id)
&& attachment
.detached_expires_at
.is_some_and(|deadline| now >= deadline)
}
}
impl SessionHandle {
pub(crate) fn session_id(&self) -> &str {
&self.entry.session_id
}
pub(crate) fn connection_id(&self) -> String {
self.connection_id.to_string()
}
pub(crate) fn is_session_attached(&self) -> bool {
self.entry.is_attached_to(self.connection_id)
}
pub(crate) fn process(&self) -> &ProcessHandler {
&self.entry.process
}
pub(crate) async fn detach(&self) {
if !self.entry.detach(self.connection_id) {
return;
}
self.entry
.process
.set_notification_sender(/*notifications*/ None);
let registry = Arc::clone(&self.registry);
let session_id = self.entry.session_id.clone();
let connection_id = self.connection_id;
tokio::spawn(async move {
registry.expire_if_detached(session_id, connection_id).await;
});
}
}