mirror of
https://github.com/openai/codex.git
synced 2026-05-20 03:05:02 +00:00
# Summary Unix-socket app-server startup can currently race when multiple launch attempts target the same `CODEX_HOME`. Those processes can overlap before the control socket exists, which lets them enter SQLite state initialization concurrently and reproduce the startup corruption pattern seen in SSH mode. This change makes the app-server own that singleton startup guarantee. Unix-socket startup now takes a `CODEX_HOME`-scoped advisory lock before SQLite initialization, runs the existing control-socket preparation check while holding that lock, returns the established `AddrInUse` error when another live listener already owns the socket, and releases the lock once the new listener has bound its socket. # Design decisions - The singleton rule lives in `app-server --listen unix://`, not in a desktop-only caller path, so every Unix-socket launch gets the same race protection. - A duplicate raw app-server launch returns an error instead of silently succeeding. The attach operation remains `app-server proxy`, which continues to connect to an already-running listener. - The lock is held only across the dangerous startup window: socket preparation, SQLite initialization, and socket bind. It is not held for the app-server lifetime. - Listener detection stays in `prepare_control_socket_path(...)`, so the preexisting live-listener and stale-socket behavior remains the single source of truth. # Testing Tests: targeted Unix-socket transport tests on the branch checkout, full `codex-cli` build on `efrazer-db10`, and an SSH-style smoke on `efrazer-db10` covering concurrent app-server starts, explicit duplicate-start errors, and absence of SQLite startup-error matches in launch logs.
238 lines
8.2 KiB
Rust
238 lines
8.2 KiB
Rust
use crate::message_processor::ConnectionSessionState;
|
|
use crate::outgoing_message::OutgoingEnvelope;
|
|
use codex_app_server_protocol::ExperimentalApi;
|
|
use codex_app_server_protocol::ServerRequest;
|
|
use std::collections::HashMap;
|
|
use std::collections::HashSet;
|
|
use std::sync::Arc;
|
|
use std::sync::RwLock;
|
|
use std::sync::atomic::AtomicBool;
|
|
use std::sync::atomic::Ordering;
|
|
use tokio::sync::mpsc;
|
|
use tokio_util::sync::CancellationToken;
|
|
use tracing::warn;
|
|
|
|
pub use codex_app_server_transport::AppServerTransport;
|
|
pub(crate) use codex_app_server_transport::CHANNEL_CAPACITY;
|
|
pub(crate) use codex_app_server_transport::ConnectionId;
|
|
pub(crate) use codex_app_server_transport::ConnectionOrigin;
|
|
pub(crate) use codex_app_server_transport::OutgoingMessage;
|
|
pub(crate) use codex_app_server_transport::QueuedOutgoingMessage;
|
|
pub(crate) use codex_app_server_transport::RemoteControlHandle;
|
|
pub(crate) use codex_app_server_transport::RemoteControlStartConfig;
|
|
pub(crate) use codex_app_server_transport::RemoteControlUnavailable;
|
|
pub(crate) use codex_app_server_transport::TransportEvent;
|
|
pub(crate) use codex_app_server_transport::acquire_app_server_startup_lock;
|
|
pub use codex_app_server_transport::app_server_control_socket_path;
|
|
pub(crate) use codex_app_server_transport::app_server_startup_lock_path;
|
|
pub use codex_app_server_transport::auth;
|
|
pub(crate) use codex_app_server_transport::prepare_control_socket_path;
|
|
pub(crate) use codex_app_server_transport::start_control_socket_acceptor;
|
|
pub(crate) use codex_app_server_transport::start_remote_control;
|
|
pub(crate) use codex_app_server_transport::start_stdio_connection;
|
|
pub(crate) use codex_app_server_transport::start_websocket_acceptor;
|
|
|
|
pub(crate) struct ConnectionState {
|
|
pub(crate) outbound_initialized: Arc<AtomicBool>,
|
|
pub(crate) outbound_experimental_api_enabled: Arc<AtomicBool>,
|
|
pub(crate) outbound_opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
|
|
pub(crate) session: Arc<ConnectionSessionState>,
|
|
}
|
|
|
|
impl ConnectionState {
|
|
pub(crate) fn new(
|
|
_origin: ConnectionOrigin,
|
|
outbound_initialized: Arc<AtomicBool>,
|
|
outbound_experimental_api_enabled: Arc<AtomicBool>,
|
|
outbound_opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
|
|
) -> Self {
|
|
Self {
|
|
outbound_initialized,
|
|
outbound_experimental_api_enabled,
|
|
outbound_opted_out_notification_methods,
|
|
session: Arc::new(ConnectionSessionState::new()),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub(crate) struct OutboundConnectionState {
|
|
pub(crate) initialized: Arc<AtomicBool>,
|
|
pub(crate) experimental_api_enabled: Arc<AtomicBool>,
|
|
pub(crate) opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
|
|
pub(crate) writer: mpsc::Sender<QueuedOutgoingMessage>,
|
|
disconnect_sender: Option<CancellationToken>,
|
|
}
|
|
|
|
impl OutboundConnectionState {
|
|
pub(crate) fn new(
|
|
writer: mpsc::Sender<QueuedOutgoingMessage>,
|
|
initialized: Arc<AtomicBool>,
|
|
experimental_api_enabled: Arc<AtomicBool>,
|
|
opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
|
|
disconnect_sender: Option<CancellationToken>,
|
|
) -> Self {
|
|
Self {
|
|
initialized,
|
|
experimental_api_enabled,
|
|
opted_out_notification_methods,
|
|
writer,
|
|
disconnect_sender,
|
|
}
|
|
}
|
|
|
|
fn can_disconnect(&self) -> bool {
|
|
self.disconnect_sender.is_some()
|
|
}
|
|
|
|
pub(crate) fn request_disconnect(&self) {
|
|
if let Some(disconnect_sender) = &self.disconnect_sender {
|
|
disconnect_sender.cancel();
|
|
}
|
|
}
|
|
}
|
|
|
|
fn should_skip_notification_for_connection(
|
|
connection_state: &OutboundConnectionState,
|
|
message: &OutgoingMessage,
|
|
) -> bool {
|
|
let Ok(opted_out_notification_methods) = connection_state.opted_out_notification_methods.read()
|
|
else {
|
|
warn!("failed to read outbound opted-out notifications");
|
|
return false;
|
|
};
|
|
match message {
|
|
OutgoingMessage::AppServerNotification(notification) => {
|
|
if notification.experimental_reason().is_some()
|
|
&& !connection_state
|
|
.experimental_api_enabled
|
|
.load(Ordering::Acquire)
|
|
{
|
|
return true;
|
|
}
|
|
let method = notification.to_string();
|
|
opted_out_notification_methods.contains(method.as_str())
|
|
}
|
|
_ => false,
|
|
}
|
|
}
|
|
|
|
fn disconnect_connection(
|
|
connections: &mut HashMap<ConnectionId, OutboundConnectionState>,
|
|
connection_id: ConnectionId,
|
|
) -> bool {
|
|
if let Some(connection_state) = connections.remove(&connection_id) {
|
|
connection_state.request_disconnect();
|
|
return true;
|
|
}
|
|
false
|
|
}
|
|
|
|
async fn send_message_to_connection(
|
|
connections: &mut HashMap<ConnectionId, OutboundConnectionState>,
|
|
connection_id: ConnectionId,
|
|
message: OutgoingMessage,
|
|
write_complete_tx: Option<tokio::sync::oneshot::Sender<()>>,
|
|
) -> bool {
|
|
let Some(connection_state) = connections.get(&connection_id) else {
|
|
warn!("dropping message for disconnected connection: {connection_id:?}");
|
|
return false;
|
|
};
|
|
let message = filter_outgoing_message_for_connection(connection_state, message);
|
|
if should_skip_notification_for_connection(connection_state, &message) {
|
|
return false;
|
|
}
|
|
|
|
let writer = connection_state.writer.clone();
|
|
let queued_message = QueuedOutgoingMessage {
|
|
message,
|
|
write_complete_tx,
|
|
};
|
|
if connection_state.can_disconnect() {
|
|
match writer.try_send(queued_message) {
|
|
Ok(()) => false,
|
|
Err(mpsc::error::TrySendError::Full(_)) => {
|
|
warn!(
|
|
"disconnecting slow connection after outbound queue filled: {connection_id:?}"
|
|
);
|
|
disconnect_connection(connections, connection_id)
|
|
}
|
|
Err(mpsc::error::TrySendError::Closed(_)) => {
|
|
disconnect_connection(connections, connection_id)
|
|
}
|
|
}
|
|
} else if writer.send(queued_message).await.is_err() {
|
|
disconnect_connection(connections, connection_id)
|
|
} else {
|
|
false
|
|
}
|
|
}
|
|
|
|
fn filter_outgoing_message_for_connection(
|
|
connection_state: &OutboundConnectionState,
|
|
message: OutgoingMessage,
|
|
) -> OutgoingMessage {
|
|
let experimental_api_enabled = connection_state
|
|
.experimental_api_enabled
|
|
.load(Ordering::Acquire);
|
|
match message {
|
|
OutgoingMessage::Request(ServerRequest::CommandExecutionRequestApproval {
|
|
request_id,
|
|
mut params,
|
|
}) => {
|
|
if !experimental_api_enabled {
|
|
params.strip_experimental_fields();
|
|
}
|
|
OutgoingMessage::Request(ServerRequest::CommandExecutionRequestApproval {
|
|
request_id,
|
|
params,
|
|
})
|
|
}
|
|
_ => message,
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn route_outgoing_envelope(
|
|
connections: &mut HashMap<ConnectionId, OutboundConnectionState>,
|
|
envelope: OutgoingEnvelope,
|
|
) {
|
|
match envelope {
|
|
OutgoingEnvelope::ToConnection {
|
|
connection_id,
|
|
message,
|
|
write_complete_tx,
|
|
} => {
|
|
let _ =
|
|
send_message_to_connection(connections, connection_id, message, write_complete_tx)
|
|
.await;
|
|
}
|
|
OutgoingEnvelope::Broadcast { message } => {
|
|
let target_connections: Vec<ConnectionId> = connections
|
|
.iter()
|
|
.filter_map(|(connection_id, connection_state)| {
|
|
if connection_state.initialized.load(Ordering::Acquire)
|
|
&& !should_skip_notification_for_connection(connection_state, &message)
|
|
{
|
|
Some(*connection_id)
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect();
|
|
|
|
for connection_id in target_connections {
|
|
let _ = send_message_to_connection(
|
|
connections,
|
|
connection_id,
|
|
message.clone(),
|
|
/*write_complete_tx*/ None,
|
|
)
|
|
.await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
#[path = "transport_tests.rs"]
|
|
mod tests;
|