mirror of
https://github.com/openai/codex.git
synced 2026-05-29 23:40:29 +00:00
fix: serialize unix app-server startup (#23516)
# Summary Unix-socket app-server startup can currently race when multiple launch attempts target the same `CODEX_HOME`. Those processes can overlap before the control socket exists, which lets them enter SQLite state initialization concurrently and reproduce the startup corruption pattern seen in SSH mode. This change makes the app-server own that singleton startup guarantee. Unix-socket startup now takes a `CODEX_HOME`-scoped advisory lock before SQLite initialization, runs the existing control-socket preparation check while holding that lock, returns the established `AddrInUse` error when another live listener already owns the socket, and releases the lock once the new listener has bound its socket. # Design decisions - The singleton rule lives in `app-server --listen unix://`, not in a desktop-only caller path, so every Unix-socket launch gets the same race protection. - A duplicate raw app-server launch returns an error instead of silently succeeding. The attach operation remains `app-server proxy`, which continues to connect to an already-running listener. - The lock is held only across the dangerous startup window: socket preparation, SQLite initialization, and socket bind. It is not held for the app-server lifetime. - Listener detection stays in `prepare_control_socket_path(...)`, so the preexisting live-listener and stale-socket behavior remains the single source of truth. # Testing Tests: targeted Unix-socket transport tests on the branch checkout, full `codex-cli` build on `efrazer-db10`, and an SSH-style smoke on `efrazer-db10` covering concurrent app-server starts, explicit duplicate-start errors, and absence of SQLite startup-error matches in launch logs.
This commit is contained in:
@@ -6,6 +6,7 @@ pub use outgoing_message::OutgoingError;
|
||||
pub use outgoing_message::OutgoingMessage;
|
||||
pub use outgoing_message::OutgoingResponse;
|
||||
pub use outgoing_message::QueuedOutgoingMessage;
|
||||
pub use transport::AppServerStartupLock;
|
||||
pub use transport::AppServerTransport;
|
||||
pub use transport::AppServerTransportParseError;
|
||||
pub use transport::CHANNEL_CAPACITY;
|
||||
@@ -14,8 +15,11 @@ pub use transport::RemoteControlHandle;
|
||||
pub use transport::RemoteControlStartConfig;
|
||||
pub use transport::RemoteControlUnavailable;
|
||||
pub use transport::TransportEvent;
|
||||
pub use transport::acquire_app_server_startup_lock;
|
||||
pub use transport::app_server_control_socket_path;
|
||||
pub use transport::app_server_startup_lock_path;
|
||||
pub use transport::auth;
|
||||
pub use transport::prepare_control_socket_path;
|
||||
pub use transport::start_control_socket_acceptor;
|
||||
pub use transport::start_remote_control;
|
||||
pub use transport::start_stdio_connection;
|
||||
|
||||
@@ -35,6 +35,9 @@ pub use remote_control::RemoteControlStartConfig;
|
||||
pub use remote_control::RemoteControlUnavailable;
|
||||
pub use remote_control::start_remote_control;
|
||||
pub use stdio::start_stdio_connection;
|
||||
pub use unix_socket::AppServerStartupLock;
|
||||
pub use unix_socket::acquire_app_server_startup_lock;
|
||||
pub use unix_socket::prepare_control_socket_path;
|
||||
pub use unix_socket::start_control_socket_acceptor;
|
||||
pub use websocket::start_websocket_acceptor;
|
||||
|
||||
@@ -42,6 +45,7 @@ const OVERLOADED_ERROR_CODE: i64 = -32001;
|
||||
|
||||
const APP_SERVER_CONTROL_SOCKET_DIR_NAME: &str = "app-server-control";
|
||||
const APP_SERVER_CONTROL_SOCKET_FILE_NAME: &str = "app-server-control.sock";
|
||||
const APP_SERVER_STARTUP_LOCK_FILE_NAME: &str = "app-server-startup.lock";
|
||||
|
||||
pub fn app_server_control_socket_path(codex_home: &Path) -> std::io::Result<AbsolutePathBuf> {
|
||||
AbsolutePathBuf::from_absolute_path(
|
||||
@@ -51,6 +55,14 @@ pub fn app_server_control_socket_path(codex_home: &Path) -> std::io::Result<Abso
|
||||
)
|
||||
}
|
||||
|
||||
pub fn app_server_startup_lock_path(codex_home: &Path) -> std::io::Result<AbsolutePathBuf> {
|
||||
AbsolutePathBuf::from_absolute_path(
|
||||
codex_home
|
||||
.join(APP_SERVER_CONTROL_SOCKET_DIR_NAME)
|
||||
.join(APP_SERVER_STARTUP_LOCK_FILE_NAME),
|
||||
)
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub enum AppServerTransport {
|
||||
Stdio,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Result as IoResult;
|
||||
use std::path::Path;
|
||||
@@ -89,7 +90,7 @@ async fn run_control_socket_acceptor(
|
||||
info!("control socket acceptor shutting down");
|
||||
}
|
||||
|
||||
async fn prepare_control_socket_path(socket_path: &Path) -> IoResult<()> {
|
||||
pub async fn prepare_control_socket_path(socket_path: &Path) -> IoResult<()> {
|
||||
if let Some(parent) = socket_path.parent() {
|
||||
codex_uds::prepare_private_socket_directory(parent).await?;
|
||||
}
|
||||
@@ -130,6 +131,30 @@ async fn prepare_control_socket_path(socket_path: &Path) -> IoResult<()> {
|
||||
tokio::fs::remove_file(socket_path).await
|
||||
}
|
||||
|
||||
pub struct AppServerStartupLock {
|
||||
_file: std::fs::File,
|
||||
}
|
||||
|
||||
pub async fn acquire_app_server_startup_lock(
|
||||
startup_lock_path: AbsolutePathBuf,
|
||||
) -> IoResult<AppServerStartupLock> {
|
||||
if let Some(parent) = startup_lock_path.as_path().parent() {
|
||||
codex_uds::prepare_private_socket_directory(parent).await?;
|
||||
}
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.truncate(false)
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(startup_lock_path.as_path())?;
|
||||
file.lock()?;
|
||||
Ok(AppServerStartupLock { _file: file })
|
||||
})
|
||||
.await
|
||||
.map_err(|err| std::io::Error::other(format!("startup lock task failed: {err}")))?
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
async fn set_control_socket_permissions(socket_path: &Path) -> IoResult<()> {
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::AppServerTransport;
|
||||
use super::CHANNEL_CAPACITY;
|
||||
use super::TransportEvent;
|
||||
use super::acquire_app_server_startup_lock;
|
||||
use super::app_server_control_socket_path;
|
||||
use super::start_control_socket_acceptor;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
@@ -140,6 +141,28 @@ async fn control_socket_acceptor_upgrades_and_forwards_websocket_text_messages_a
|
||||
assert_socket_path_removed(socket_path.as_path());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn app_server_startup_lock_serializes_waiters() {
|
||||
let temp_dir = tempfile::TempDir::new().expect("temp dir");
|
||||
let lock_path = test_startup_lock_path(temp_dir.path());
|
||||
let first_lock = acquire_app_server_startup_lock(lock_path.clone())
|
||||
.await
|
||||
.expect("first startup lock should succeed");
|
||||
let mut second_lock = tokio::spawn(acquire_app_server_startup_lock(lock_path));
|
||||
|
||||
assert!(
|
||||
timeout(Duration::from_millis(100), &mut second_lock)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
drop(first_lock);
|
||||
second_lock
|
||||
.await
|
||||
.expect("second startup lock task should join")
|
||||
.expect("second startup lock should succeed");
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
async fn control_socket_file_is_private_after_bind() {
|
||||
@@ -185,6 +208,15 @@ fn test_socket_path(temp_dir: &Path) -> AbsolutePathBuf {
|
||||
.expect("socket path should resolve")
|
||||
}
|
||||
|
||||
fn test_startup_lock_path(temp_dir: &Path) -> AbsolutePathBuf {
|
||||
AbsolutePathBuf::from_absolute_path(
|
||||
temp_dir
|
||||
.join("app-server-control")
|
||||
.join("app-server-startup.lock"),
|
||||
)
|
||||
.expect("startup lock path should resolve")
|
||||
}
|
||||
|
||||
async fn connect_to_socket(socket_path: &Path) -> IoResult<UnixStream> {
|
||||
UnixStream::connect(socket_path).await
|
||||
}
|
||||
|
||||
@@ -31,7 +31,10 @@ use crate::transport::ConnectionState;
|
||||
use crate::transport::OutboundConnectionState;
|
||||
use crate::transport::RemoteControlStartConfig;
|
||||
use crate::transport::TransportEvent;
|
||||
use crate::transport::acquire_app_server_startup_lock;
|
||||
use crate::transport::app_server_startup_lock_path;
|
||||
use crate::transport::auth::policy_from_settings;
|
||||
use crate::transport::prepare_control_socket_path;
|
||||
use crate::transport::route_outgoing_envelope;
|
||||
use crate::transport::start_control_socket_acceptor;
|
||||
use crate::transport::start_remote_control;
|
||||
@@ -516,6 +519,15 @@ pub async fn run_main_with_transport_options(
|
||||
})?;
|
||||
codex_core::otel_init::record_process_start(otel.as_ref(), OTEL_SERVICE_NAME);
|
||||
codex_core::otel_init::install_sqlite_telemetry(otel.as_ref(), OTEL_SERVICE_NAME);
|
||||
let unix_socket_startup_lock = match &transport {
|
||||
AppServerTransport::UnixSocket { socket_path } => {
|
||||
let startup_lock_path = app_server_startup_lock_path(&codex_home)?;
|
||||
let startup_lock = acquire_app_server_startup_lock(startup_lock_path).await?;
|
||||
prepare_control_socket_path(socket_path.as_path()).await?;
|
||||
Some(startup_lock)
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
let state_db = match rollout_state_db::try_init(&config).await {
|
||||
Ok(state_db) => Some(state_db),
|
||||
Err(err) => {
|
||||
@@ -682,6 +694,7 @@ pub async fn run_main_with_transport_options(
|
||||
}
|
||||
AppServerTransport::Off => {}
|
||||
}
|
||||
drop(unix_socket_startup_lock);
|
||||
|
||||
let auth_manager =
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
|
||||
|
||||
@@ -22,8 +22,11 @@ pub(crate) use codex_app_server_transport::RemoteControlHandle;
|
||||
pub(crate) use codex_app_server_transport::RemoteControlStartConfig;
|
||||
pub(crate) use codex_app_server_transport::RemoteControlUnavailable;
|
||||
pub(crate) use codex_app_server_transport::TransportEvent;
|
||||
pub(crate) use codex_app_server_transport::acquire_app_server_startup_lock;
|
||||
pub use codex_app_server_transport::app_server_control_socket_path;
|
||||
pub(crate) use codex_app_server_transport::app_server_startup_lock_path;
|
||||
pub use codex_app_server_transport::auth;
|
||||
pub(crate) use codex_app_server_transport::prepare_control_socket_path;
|
||||
pub(crate) use codex_app_server_transport::start_control_socket_acceptor;
|
||||
pub(crate) use codex_app_server_transport::start_remote_control;
|
||||
pub(crate) use codex_app_server_transport::start_stdio_connection;
|
||||
|
||||
Reference in New Issue
Block a user