Serialize app-server startup on the Unix control socket with a file lock.

  * app-server-transport: adds `app_server_startup_lock_path`, the
    `AppServerStartupLock` guard and `acquire_app_server_startup_lock`, and
    makes `prepare_control_socket_path` public. The guard only owns the open
    lock file; the lock is taken with `File::lock` on a `spawn_blocking`
    thread because that call blocks until the lock is available.
  * app-server: for the `UnixSocket` transport, `run_main_with_transport_options`
    acquires the lock and prepares the socket path before
    `rollout_state_db::try_init`, and drops the guard after the transport match.
    Every other transport skips the lock.
  * tests: a second waiter stays blocked while the first guard is alive and
    succeeds once that guard is dropped.

diff --git a/codex-rs/app-server-transport/src/lib.rs b/codex-rs/app-server-transport/src/lib.rs
index f4eb1fc6a5..8608102f36 100644
--- a/codex-rs/app-server-transport/src/lib.rs
+++ b/codex-rs/app-server-transport/src/lib.rs
@@ -6,6 +6,7 @@ pub use outgoing_message::OutgoingError;
 pub use outgoing_message::OutgoingMessage;
 pub use outgoing_message::OutgoingResponse;
 pub use outgoing_message::QueuedOutgoingMessage;
+pub use transport::AppServerStartupLock;
 pub use transport::AppServerTransport;
 pub use transport::AppServerTransportParseError;
 pub use transport::CHANNEL_CAPACITY;
@@ -14,8 +15,11 @@ pub use transport::RemoteControlHandle;
 pub use transport::RemoteControlStartConfig;
 pub use transport::RemoteControlUnavailable;
 pub use transport::TransportEvent;
+pub use transport::acquire_app_server_startup_lock;
 pub use transport::app_server_control_socket_path;
+pub use transport::app_server_startup_lock_path;
 pub use transport::auth;
+pub use transport::prepare_control_socket_path;
 pub use transport::start_control_socket_acceptor;
 pub use transport::start_remote_control;
 pub use transport::start_stdio_connection;
diff --git a/codex-rs/app-server-transport/src/transport/mod.rs b/codex-rs/app-server-transport/src/transport/mod.rs
index a9e806c128..7452f06938 100644
--- a/codex-rs/app-server-transport/src/transport/mod.rs
+++ b/codex-rs/app-server-transport/src/transport/mod.rs
@@ -35,6 +35,9 @@ pub use remote_control::RemoteControlStartConfig;
 pub use remote_control::RemoteControlUnavailable;
 pub use remote_control::start_remote_control;
 pub use stdio::start_stdio_connection;
+pub use unix_socket::AppServerStartupLock;
+pub use unix_socket::acquire_app_server_startup_lock;
+pub use unix_socket::prepare_control_socket_path;
 pub use unix_socket::start_control_socket_acceptor;
 pub use websocket::start_websocket_acceptor;
 
@@ -42,6 +45,7 @@ const OVERLOADED_ERROR_CODE: i64 = -32001;
 
 const APP_SERVER_CONTROL_SOCKET_DIR_NAME: &str = "app-server-control";
 const APP_SERVER_CONTROL_SOCKET_FILE_NAME: &str = "app-server-control.sock";
+const APP_SERVER_STARTUP_LOCK_FILE_NAME: &str = "app-server-startup.lock";
 
 pub fn app_server_control_socket_path(codex_home: &Path) -> std::io::Result<AbsolutePathBuf> {
     AbsolutePathBuf::from_absolute_path(
@@ -51,6 +55,14 @@ pub fn app_server_control_socket_path(codex_home: &Path) -> std::io::Result<Abso
     )
 }
 
+pub fn app_server_startup_lock_path(codex_home: &Path) -> std::io::Result<AbsolutePathBuf> {
+    AbsolutePathBuf::from_absolute_path(
+        codex_home
+            .join(APP_SERVER_CONTROL_SOCKET_DIR_NAME)
+            .join(APP_SERVER_STARTUP_LOCK_FILE_NAME),
+    )
+}
+
 #[derive(Clone, Debug, Eq, PartialEq)]
 pub enum AppServerTransport {
     Stdio,
diff --git a/codex-rs/app-server-transport/src/transport/unix_socket.rs b/codex-rs/app-server-transport/src/transport/unix_socket.rs
index f75d3fe99a..dd46d88280 100644
--- a/codex-rs/app-server-transport/src/transport/unix_socket.rs
+++ b/codex-rs/app-server-transport/src/transport/unix_socket.rs
@@ -1,3 +1,4 @@
+use std::fs::OpenOptions;
 use std::io::ErrorKind;
 use std::io::Result as IoResult;
 use std::path::Path;
@@ -89,7 +90,7 @@ async fn run_control_socket_acceptor(
     info!("control socket acceptor shutting down");
 }
 
-async fn prepare_control_socket_path(socket_path: &Path) -> IoResult<()> {
+pub async fn prepare_control_socket_path(socket_path: &Path) -> IoResult<()> {
     if let Some(parent) = socket_path.parent() {
         codex_uds::prepare_private_socket_directory(parent).await?;
     }
@@ -130,6 +131,30 @@ async fn prepare_control_socket_path(socket_path: &Path) -> IoResult<()> {
     tokio::fs::remove_file(socket_path).await
 }
 
+pub struct AppServerStartupLock {
+    _file: std::fs::File,
+}
+
+pub async fn acquire_app_server_startup_lock(
+    startup_lock_path: AbsolutePathBuf,
+) -> IoResult<AppServerStartupLock> {
+    if let Some(parent) = startup_lock_path.as_path().parent() {
+        codex_uds::prepare_private_socket_directory(parent).await?;
+    }
+    tokio::task::spawn_blocking(move || {
+        let file = OpenOptions::new()
+            .create(true)
+            .truncate(false)
+            .read(true)
+            .write(true)
+            .open(startup_lock_path.as_path())?;
+        file.lock()?;
+        Ok(AppServerStartupLock { _file: file })
+    })
+    .await
+    .map_err(|err| std::io::Error::other(format!("startup lock task failed: {err}")))?
+}
+
 #[cfg(unix)]
 async fn set_control_socket_permissions(socket_path: &Path) -> IoResult<()> {
     use std::os::unix::fs::PermissionsExt;
diff --git a/codex-rs/app-server-transport/src/transport/unix_socket_tests.rs b/codex-rs/app-server-transport/src/transport/unix_socket_tests.rs
index 0b7dec0a23..ac0b2b00c4 100644
--- a/codex-rs/app-server-transport/src/transport/unix_socket_tests.rs
+++ b/codex-rs/app-server-transport/src/transport/unix_socket_tests.rs
@@ -1,6 +1,7 @@
 use super::AppServerTransport;
 use super::CHANNEL_CAPACITY;
 use super::TransportEvent;
+use super::acquire_app_server_startup_lock;
 use super::app_server_control_socket_path;
 use super::start_control_socket_acceptor;
 use codex_app_server_protocol::JSONRPCMessage;
@@ -140,6 +141,28 @@ async fn control_socket_acceptor_upgrades_and_forwards_websocket_text_messages_a
     assert_socket_path_removed(socket_path.as_path());
 }
 
+#[tokio::test]
+async fn app_server_startup_lock_serializes_waiters() {
+    let temp_dir = tempfile::TempDir::new().expect("temp dir");
+    let lock_path = test_startup_lock_path(temp_dir.path());
+    let first_lock = acquire_app_server_startup_lock(lock_path.clone())
+        .await
+        .expect("first startup lock should succeed");
+    let mut second_lock = tokio::spawn(acquire_app_server_startup_lock(lock_path));
+
+    assert!(
+        timeout(Duration::from_millis(100), &mut second_lock)
+            .await
+            .is_err()
+    );
+
+    drop(first_lock);
+    second_lock
+        .await
+        .expect("second startup lock task should join")
+        .expect("second startup lock should succeed");
+}
+
 #[cfg(unix)]
 #[tokio::test]
 async fn control_socket_file_is_private_after_bind() {
@@ -185,6 +208,15 @@ fn test_socket_path(temp_dir: &Path) -> AbsolutePathBuf {
         .expect("socket path should resolve")
 }
 
+fn test_startup_lock_path(temp_dir: &Path) -> AbsolutePathBuf {
+    AbsolutePathBuf::from_absolute_path(
+        temp_dir
+            .join("app-server-control")
+            .join("app-server-startup.lock"),
+    )
+    .expect("startup lock path should resolve")
+}
+
 async fn connect_to_socket(socket_path: &Path) -> IoResult<UnixStream> {
     UnixStream::connect(socket_path).await
 }
diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs
index 7b7cbc11c3..ef4106aa14 100644
--- a/codex-rs/app-server/src/lib.rs
+++ b/codex-rs/app-server/src/lib.rs
@@ -31,7 +31,10 @@ use crate::transport::ConnectionState;
 use crate::transport::OutboundConnectionState;
 use crate::transport::RemoteControlStartConfig;
 use crate::transport::TransportEvent;
+use crate::transport::acquire_app_server_startup_lock;
+use crate::transport::app_server_startup_lock_path;
 use crate::transport::auth::policy_from_settings;
+use crate::transport::prepare_control_socket_path;
 use crate::transport::route_outgoing_envelope;
 use crate::transport::start_control_socket_acceptor;
 use crate::transport::start_remote_control;
@@ -516,6 +519,15 @@ pub async fn run_main_with_transport_options(
     })?;
     codex_core::otel_init::record_process_start(otel.as_ref(), OTEL_SERVICE_NAME);
     codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), OTEL_SERVICE_NAME);
+    let unix_socket_startup_lock = match &transport {
+        AppServerTransport::UnixSocket { socket_path } => {
+            let startup_lock_path = app_server_startup_lock_path(&codex_home)?;
+            let startup_lock = acquire_app_server_startup_lock(startup_lock_path).await?;
+            prepare_control_socket_path(socket_path.as_path()).await?;
+            Some(startup_lock)
+        }
+        _ => None,
+    };
     let state_db = match rollout_state_db::try_init(&config).await {
         Ok(state_db) => Some(state_db),
         Err(err) => {
@@ -682,6 +694,7 @@ pub async fn run_main_with_transport_options(
         }
         AppServerTransport::Off => {}
     }
+    drop(unix_socket_startup_lock);
 
     let auth_manager =
         AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
diff --git a/codex-rs/app-server/src/transport.rs b/codex-rs/app-server/src/transport.rs
index f7222bd28f..2be5b55414 100644
--- a/codex-rs/app-server/src/transport.rs
+++ b/codex-rs/app-server/src/transport.rs
@@ -22,8 +22,11 @@ pub(crate) use codex_app_server_transport::RemoteControlHandle;
 pub(crate) use codex_app_server_transport::RemoteControlStartConfig;
 pub(crate) use codex_app_server_transport::RemoteControlUnavailable;
 pub(crate) use codex_app_server_transport::TransportEvent;
+pub(crate) use codex_app_server_transport::acquire_app_server_startup_lock;
 pub use codex_app_server_transport::app_server_control_socket_path;
+pub(crate) use codex_app_server_transport::app_server_startup_lock_path;
 pub use codex_app_server_transport::auth;
+pub(crate) use codex_app_server_transport::prepare_control_socket_path;
 pub(crate) use codex_app_server_transport::start_control_socket_acceptor;
 pub(crate) use codex_app_server_transport::start_remote_control;
 pub(crate) use codex_app_server_transport::start_stdio_connection;