Harden exec-server reconnect recovery

This commit is contained in:
starr-openai
2026-05-18 23:33:43 -07:00
parent d82bb0d81e
commit edd0d6a66e
7 changed files with 430 additions and 42 deletions

View File

@@ -27,7 +27,7 @@ Environment
| |- current ExecServerConnection?
| |- one in-flight reconnect attempt
| |- terminal reconnect error?
| `- durable process_sessions: HashMap<ProcessId, Arc<ProcessSession>>
| `- tracked process_sessions: HashMap<ProcessId, Weak<ProcessSession>>
|- RemoteProcess -> RemoteExecProcess -> ProcessSessionHandle
|- RemoteFileSystem
`- HttpClient for RemoteExecServerClient
@@ -65,15 +65,17 @@ The main roles are:
HTTP capability so all remote APIs share one reconnecting session.
- `RemoteExecServerSession`: durable logical-session state behind the client.
It remembers the resumable session id, current live connection, one shared
reconnect attempt, tracked process sessions, and any terminal resume error.
reconnect attempt, weak references to process sessions that may need rebinding,
and any terminal resume error.
- `ExecServerConnection`: one live JSON-RPC transport binding. It owns
connection-local routing for notifications and streamed HTTP response bodies.
- `Inner`: private per-connection machinery behind `ExecServerConnection`.
It owns the `RpcClient`, reader task, disconnect latch, connection-local
process notification routes, connection-local HTTP body stream routes, and
initialized session id for that live binding.
- `ProcessSession`: durable per-process client state. It keeps the local event
log, wake cursor, and failure state that must survive connection replacement.
- `ProcessSession`: durable per-process client state owned by the live process
handle. It keeps the local event log, wake cursor, and failure state that must
survive connection replacement while that handle still exists.
- `ProcessSessionHandle`: process-facing handle used by `RemoteExecProcess`.
It routes reads, writes, terminate, and unregister through either a focused
direct connection test path or the logical reconnecting client path.
@@ -89,6 +91,9 @@ Reconnect invariants:
reconnect loop per API surface.
- Reconnect resumes the same logical session id and rebinds tracked
`ProcessSession` routes onto the replacement `ExecServerConnection`.
- When a reconnecting process session loses its transport, it emits a local
`ResyncRequired` event and wake so callers blocked on pushed events or wake
notifications know to recover through `process/read(afterSeq)`.
- `process/read` may retry once after a transport-close race because its
`afterSeq` cursor makes the replay read-only and recoverable.
- `process/start`, `process/write`, `process/terminate`, filesystem RPCs, and

View File

@@ -3,6 +3,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::sync::OnceLock;
use std::sync::Weak;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
@@ -16,6 +17,7 @@ use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::sync::watch;
use tokio::time::sleep;
use tokio::time::timeout;
use tracing::debug;
@@ -88,6 +90,8 @@ const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10);
const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256;
const PROCESS_EVENT_RETAINED_BYTES: usize = 1024 * 1024;
const TRANSIENT_RESUME_CONFLICT_RETRY_DELAY: Duration = Duration::from_millis(10);
const TRANSIENT_RESUME_CONFLICT_RETRY_LIMIT: usize = 3;
impl Default for ExecServerClientConnectOptions {
fn default() -> Self {
@@ -216,14 +220,15 @@ pub(crate) struct RemoteExecServerClient {
}
// Shared state for one logical remote exec-server client. The logical client
// owns resumable session identity and durable process session state; individual
// ExecServerConnection values only bind that state to one live transport.
// owns resumable session identity and tracks live process sessions that may
// need rebinding; individual ExecServerConnection values only bind that state
// to one live transport.
struct RemoteExecServerSession {
connection: Option<ExecServerConnection>,
connection_attempt: Option<Arc<Notify>>,
logical_session_id: Option<String>,
terminal_error: Option<TerminalReconnectError>,
process_sessions: HashMap<ProcessId, Arc<ProcessSession>>,
process_sessions: HashMap<ProcessId, Weak<ProcessSession>>,
}
enum RemoteExecServerConnectionAction {
@@ -257,6 +262,7 @@ impl RemoteExecServerClient {
}
pub(crate) async fn connection(&self) -> Result<ExecServerConnection, ExecServerError> {
let mut transient_resume_conflicts = 0;
loop {
match self.next_connection_action()? {
RemoteExecServerConnectionAction::Ready(connection) => return Ok(connection),
@@ -271,11 +277,22 @@ impl RemoteExecServerClient {
let connection = self
.connect_and_rebind(resume_session_id.clone(), process_sessions)
.await;
return self.finish_connection_attempt(
match self.finish_connection_attempt(
connection_attempt,
resume_session_id,
connection,
);
) {
Ok(connection) => return Ok(connection),
Err(err)
if transient_resume_conflicts
< TRANSIENT_RESUME_CONFLICT_RETRY_LIMIT
&& is_transient_resume_conflict(&err) =>
{
transient_resume_conflicts += 1;
sleep(TRANSIENT_RESUME_CONFLICT_RETRY_DELAY).await;
}
Err(err) => return Err(err),
}
}
}
}
@@ -284,23 +301,34 @@ impl RemoteExecServerClient {
pub(crate) async fn register_process_session(
&self,
process_id: &ProcessId,
) -> Result<ProcessSessionHandle, ExecServerError> {
) -> Result<(ProcessSessionHandle, ExecServerConnection), ExecServerError> {
let process_session = Arc::new(ProcessSession::new(
ProcessSessionDisconnectBehavior::Preserve,
));
{
let mut session = self.lock_session();
if session.process_sessions.contains_key(process_id) {
if session
.process_sessions
.get(process_id)
.and_then(Weak::upgrade)
.is_some()
{
return Err(ExecServerError::Protocol(format!(
"session already registered for process {process_id}"
)));
}
session
.process_sessions
.insert(process_id.clone(), Arc::clone(&process_session));
.insert(process_id.clone(), Arc::downgrade(&process_session));
}
let connection = self.connection().await?;
let connection = match self.connection().await {
Ok(connection) => connection,
Err(err) => {
self.unregister_process_session(process_id).await;
return Err(err);
}
};
if let Err(err) = connection
.register_process_session_route(process_id, Arc::clone(&process_session))
.await
@@ -309,11 +337,14 @@ impl RemoteExecServerClient {
return Err(err);
}
Ok(ProcessSessionHandle {
control: ProcessSessionControl::RemoteClient(self.clone()),
process_id: process_id.clone(),
session: process_session,
})
Ok((
ProcessSessionHandle {
control: ProcessSessionControl::RemoteClient(self.clone()),
process_id: process_id.clone(),
session: process_session,
},
connection,
))
}
async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
@@ -378,11 +409,16 @@ impl RemoteExecServerClient {
let connection_attempt = Arc::new(Notify::new());
let resume_session_id = session.logical_session_id.clone();
let process_sessions = session
let mut process_sessions = Vec::new();
session
.process_sessions
.iter()
.map(|(process_id, process_session)| (process_id.clone(), Arc::clone(process_session)))
.collect();
.retain(|process_id, process_session| {
let Some(process_session) = process_session.upgrade() else {
return false;
};
process_sessions.push((process_id.clone(), process_session));
true
});
session.connection_attempt = Some(Arc::clone(&connection_attempt));
Ok(RemoteExecServerConnectionAction::Connect {
connection_attempt,
@@ -824,6 +860,16 @@ impl ProcessSession {
let _ = self.wake_tx.send(next);
}
fn notify_wake(&self) {
let next = (*self.wake_tx.borrow()).saturating_add(1);
let _ = self.wake_tx.send(next);
}
fn note_resync_required(&self) {
self.notify_wake();
self.events.publish(ExecProcessEvent::ResyncRequired);
}
/// Publishes a process event only when all earlier sequenced events have
/// already been published.
///
@@ -875,8 +921,7 @@ impl ProcessSession {
*failure = Some(message.clone());
}
drop(failure);
let next = (*self.wake_tx.borrow()).saturating_add(1);
let _ = self.wake_tx.send(next);
self.notify_wake();
if should_publish {
let _ = self.publish_ordered_event(ExecProcessEvent::Failed(message));
}
@@ -1007,10 +1052,14 @@ impl ProcessSessionControl {
impl TerminalReconnectError {
fn from_error(error: &ExecServerError) -> Option<Self> {
match error {
ExecServerError::Server { code, message } if *code == -32600 => Some(Self {
code: *code,
message: message.clone(),
}),
ExecServerError::Server { code, message }
if *code == -32600 && message.starts_with("unknown session id ") =>
{
Some(Self {
code: *code,
message: message.clone(),
})
}
_ => None,
}
}
@@ -1123,6 +1172,14 @@ fn record_disconnected(inner: &Arc<Inner>, message: String) -> String {
}
}
fn is_transient_resume_conflict(error: &ExecServerError) -> bool {
matches!(
error,
ExecServerError::Server { code, message }
if *code == -32600 && message.ends_with("is already attached to another connection")
)
}
async fn fail_all_process_sessions(inner: &Arc<Inner>, message: String) {
let routes = inner.take_all_process_session_routes().await;
@@ -1130,8 +1187,13 @@ async fn fail_all_process_sessions(inner: &Arc<Inner>, message: String) {
// One-shot sessions synthesize a closed read response and emit a
// pushed Failed event. Reconnecting remote sessions keep their local
// event state so a reattached client can bind them again.
if session.disconnect_behavior == ProcessSessionDisconnectBehavior::Fail {
session.set_failure(message.clone()).await;
match session.disconnect_behavior {
ProcessSessionDisconnectBehavior::Fail => {
session.set_failure(message.clone()).await;
}
ProcessSessionDisconnectBehavior::Preserve => {
session.note_resync_required();
}
}
}
}

View File

@@ -1,6 +1,8 @@
use anyhow::Context;
use anyhow::Result;
use base64::Engine as _;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCRequest;
@@ -34,6 +36,7 @@ use crate::RemoveOptions;
use crate::client_api::ExecServerTransportParams;
use crate::client_api::HttpClient;
use crate::process::ExecBackend;
use crate::process::ExecProcessEvent;
use crate::protocol::ByteChunk;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_READ_METHOD;
@@ -310,6 +313,23 @@ impl WebSocketJsonRpcPeer {
.await;
}
async fn write_error(
&mut self,
id: codex_app_server_protocol::RequestId,
code: i64,
message: impl Into<String>,
) {
self.write_message(JSONRPCMessage::Error(JSONRPCError {
id,
error: JSONRPCErrorError {
code,
message: message.into(),
data: None,
},
}))
.await;
}
async fn write_notification<T>(&mut self, method: &str, params: T)
where
T: serde::Serialize,
@@ -693,6 +713,283 @@ async fn remote_client_reuses_one_reconnect_attempt_for_concurrent_callers() ->
Ok(())
}
#[tokio::test]
async fn remote_process_disconnect_notifies_resync_before_cursor_recovery() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.context("test websocket listener should bind")?;
let websocket_url = format!("ws://{}", listener.local_addr()?);
let (disconnect_tx, disconnect_rx) = tokio::sync::oneshot::channel();
let server = tokio::spawn(async move {
let mut first = WebSocketJsonRpcPeer::accept(&listener).await;
first
.complete_initialize(/*expected_resume_session_id*/ None, "session-1")
.await;
let start_request = first.read_request(EXEC_METHOD).await;
first
.write_response(
start_request.id,
ExecResponse {
process_id: ProcessId::from("proc"),
},
)
.await;
disconnect_rx
.await
.expect("test should trigger idle disconnect");
drop(first);
let mut second = WebSocketJsonRpcPeer::accept(&listener).await;
second
.complete_initialize(Some("session-1"), "session-1")
.await;
let read_request = second.read_request(EXEC_READ_METHOD).await;
assert_eq!(
decode_request_params::<ReadParams>(&read_request),
ReadParams {
process_id: ProcessId::from("proc"),
after_seq: None,
max_bytes: None,
wait_ms: Some(0),
}
);
second
.write_response(read_request.id, successful_read_response())
.await;
});
let client = test_remote_client(websocket_url);
let process = RemoteProcess::new(client);
let started = process
.start(test_exec_params(&ProcessId::from("proc")))
.await?;
let mut wake = started.process.subscribe_wake();
let mut events = started.process.subscribe_events();
disconnect_tx
.send(())
.expect("idle disconnect should signal");
timeout(Duration::from_secs(1), wake.changed())
.await
.context("idle process wake should surface transport resync")??;
assert_eq!(
timeout(Duration::from_secs(1), events.recv())
.await
.context("idle process event should surface transport resync")??,
ExecProcessEvent::ResyncRequired
);
assert_eq!(
started
.process
.read(
/*after_seq*/ None,
/*max_bytes*/ None,
/*wait_ms*/ Some(0),
)
.await?,
successful_read_response()
);
server.await.expect("test websocket server should finish");
Ok(())
}
#[tokio::test]
async fn remote_client_retries_transient_resume_conflict() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.context("test websocket listener should bind")?;
let websocket_url = format!("ws://{}", listener.local_addr()?);
let accepted_connections = Arc::new(AtomicUsize::new(0));
let server_connections = Arc::clone(&accepted_connections);
let server = tokio::spawn(async move {
let mut first = WebSocketJsonRpcPeer::accept(&listener).await;
server_connections.fetch_add(1, Ordering::SeqCst);
first
.complete_initialize(/*expected_resume_session_id*/ None, "session-1")
.await;
drop(first);
let mut second = WebSocketJsonRpcPeer::accept(&listener).await;
server_connections.fetch_add(1, Ordering::SeqCst);
let request = second.read_request(INITIALIZE_METHOD).await;
assert_eq!(
decode_request_params::<InitializeParams>(&request),
InitializeParams {
client_name: crate::client_transport::ENVIRONMENT_CLIENT_NAME.to_string(),
resume_session_id: Some("session-1".to_string()),
}
);
second
.write_error(
request.id,
-32600,
"session session-1 is already attached to another connection",
)
.await;
let mut third = WebSocketJsonRpcPeer::accept(&listener).await;
server_connections.fetch_add(1, Ordering::SeqCst);
third
.complete_initialize(Some("session-1"), "session-1")
.await;
});
let client = test_remote_client(websocket_url);
let first_connection = client.connection().await?;
wait_for_disconnect(&first_connection).await;
assert_eq!(
client.connection().await?.session_id(),
Some("session-1".to_string())
);
server.await.expect("test websocket server should finish");
assert_eq!(accepted_connections.load(Ordering::SeqCst), 3);
Ok(())
}
#[tokio::test]
async fn remote_client_caches_unknown_session_resume_failure() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.context("test websocket listener should bind")?;
let websocket_url = format!("ws://{}", listener.local_addr()?);
let accepted_connections = Arc::new(AtomicUsize::new(0));
let server_connections = Arc::clone(&accepted_connections);
let server = tokio::spawn(async move {
let mut first = WebSocketJsonRpcPeer::accept(&listener).await;
server_connections.fetch_add(1, Ordering::SeqCst);
first
.complete_initialize(/*expected_resume_session_id*/ None, "session-1")
.await;
drop(first);
let mut second = WebSocketJsonRpcPeer::accept(&listener).await;
server_connections.fetch_add(1, Ordering::SeqCst);
let request = second.read_request(INITIALIZE_METHOD).await;
second
.write_error(request.id, -32600, "unknown session id session-1")
.await;
let extra_connection = timeout(Duration::from_millis(200), listener.accept()).await;
assert!(
extra_connection.is_err(),
"terminal resume failure should not open another websocket"
);
});
let client = test_remote_client(websocket_url);
let first_connection = client.connection().await?;
wait_for_disconnect(&first_connection).await;
let first_error = match client.connection().await {
Ok(_) => panic!("unknown session should fail reconnect"),
Err(err) => err,
};
assert_eq!(
first_error.to_string(),
"exec-server rejected request (-32600): unknown session id session-1"
);
let second_error = match client.connection().await {
Ok(_) => panic!("unknown session should stay terminal"),
Err(err) => err,
};
assert_eq!(
second_error.to_string(),
"exec-server rejected request (-32600): unknown session id session-1"
);
server.await.expect("test websocket server should finish");
assert_eq!(accepted_connections.load(Ordering::SeqCst), 2);
Ok(())
}
#[tokio::test]
async fn remote_process_start_releases_session_after_initial_connect_failure() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.context("test websocket listener should bind")?;
let websocket_url = format!("ws://{}", listener.local_addr()?);
let server = tokio::spawn(async move {
let first = WebSocketJsonRpcPeer::accept(&listener).await;
drop(first);
let mut second = WebSocketJsonRpcPeer::accept(&listener).await;
second
.complete_initialize(/*expected_resume_session_id*/ None, "session-1")
.await;
let request = second.read_request(EXEC_METHOD).await;
second
.write_response(
request.id,
ExecResponse {
process_id: ProcessId::from("proc"),
},
)
.await;
});
let client = test_remote_client(websocket_url);
let process = RemoteProcess::new(client);
let params = test_exec_params(&ProcessId::from("proc"));
assert!(
process.start(params.clone()).await.is_err(),
"initial connect should fail before dispatch"
);
let started = process.start(params).await?;
assert_eq!(started.process.process_id(), &ProcessId::from("proc"));
server.await.expect("test websocket server should finish");
Ok(())
}
#[tokio::test]
async fn remote_process_drop_releases_session_for_process_id_reuse() -> Result<()> {
let listener = TcpListener::bind("127.0.0.1:0")
.await
.context("test websocket listener should bind")?;
let websocket_url = format!("ws://{}", listener.local_addr()?);
let server = tokio::spawn(async move {
let mut peer = WebSocketJsonRpcPeer::accept(&listener).await;
peer.complete_initialize(/*expected_resume_session_id*/ None, "session-1")
.await;
for _ in 0..2 {
let request = peer.read_request(EXEC_METHOD).await;
peer.write_response(
request.id,
ExecResponse {
process_id: ProcessId::from("proc"),
},
)
.await;
}
});
let client = test_remote_client(websocket_url);
let process = RemoteProcess::new(client);
let params = test_exec_params(&ProcessId::from("proc"));
let started = process.start(params.clone()).await?;
drop(started);
let restarted = timeout(Duration::from_secs(1), async {
loop {
match process.start(params.clone()).await {
Ok(restarted) => return Ok(restarted),
Err(crate::ExecServerError::Protocol(message))
if message == "session already registered for process proc" =>
{
tokio::task::yield_now().await;
}
Err(err) => return Err(err),
}
}
})
.await
.context("dropped process session should unregister")??;
assert_eq!(restarted.process.process_id(), &ProcessId::from("proc"));
server.await.expect("test websocket server should finish");
Ok(())
}
#[tokio::test]
async fn remote_client_reconnects_before_dispatching_every_remote_api() -> Result<()> {
for api in RemoteApi::ALL {

View File

@@ -23,13 +23,16 @@ pub struct StartedExecProcess {
/// The stream is scoped to one [`ExecProcess`] handle. `Output` events carry
/// stdout, stderr, or pty bytes. `Exited` reports the process exit status, while
/// `Closed` means all output streams have ended and no more output events will
/// arrive. `Failed` is used when the process session cannot continue, for
/// example because the remote executor connection disconnected.
/// arrive. `ResyncRequired` means a reconnecting remote process session should
/// recover through [`ExecProcess::read`] using its last delivered sequence.
/// `Failed` is used when the process session cannot continue, for example
/// because a direct one-shot executor connection disconnected.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecProcessEvent {
Output(ProcessOutputChunk),
Exited { seq: u64, exit_code: i32 },
Closed { seq: u64 },
ResyncRequired,
Failed(String),
}
@@ -60,13 +63,14 @@ struct ExecProcessEventHistory {
impl ExecProcessEvent {
/// Sequence number used to order process-owned events.
///
/// `Failed` is intentionally unsequenced because it is synthesized by the
/// client when the session or transport fails, not emitted by the process.
/// `ResyncRequired` and `Failed` are intentionally unsequenced because they
/// are synthesized by the client when the session or transport changes,
/// not emitted by the process.
pub(crate) fn seq(&self) -> Option<u64> {
match self {
ExecProcessEvent::Output(chunk) => Some(chunk.seq),
ExecProcessEvent::Exited { seq, .. } | ExecProcessEvent::Closed { seq } => Some(*seq),
ExecProcessEvent::Failed(_) => None,
ExecProcessEvent::ResyncRequired | ExecProcessEvent::Failed(_) => None,
}
}
@@ -74,7 +78,9 @@ impl ExecProcessEvent {
match self {
ExecProcessEvent::Output(chunk) => chunk.chunk.0.len(),
ExecProcessEvent::Failed(message) => message.len(),
ExecProcessEvent::Exited { .. } | ExecProcessEvent::Closed { .. } => 0,
ExecProcessEvent::Exited { .. }
| ExecProcessEvent::Closed { .. }
| ExecProcessEvent::ResyncRequired => 0,
}
}
}

View File

@@ -1,8 +1,10 @@
use std::sync::Arc;
use async_trait::async_trait;
use tokio::runtime::Handle;
use tokio::sync::watch;
use tracing::trace;
use tracing::warn;
use crate::ExecBackend;
use crate::ExecProcess;
@@ -35,8 +37,7 @@ impl RemoteProcess {
impl ExecBackend for RemoteProcess {
async fn start(&self, params: ExecParams) -> Result<StartedExecProcess, ExecServerError> {
let process_id = params.process_id.clone();
let session = self.client.register_process_session(&process_id).await?;
let connection = self.client.connection().await?;
let (session, connection) = self.client.register_process_session(&process_id).await?;
if let Err(err) = connection.exec(params).await {
session.unregister().await;
return Err(err);
@@ -85,8 +86,14 @@ impl ExecProcess for RemoteExecProcess {
impl Drop for RemoteExecProcess {
fn drop(&mut self) {
let session = self.session.clone();
tokio::spawn(async move {
let Ok(handle) = Handle::try_current() else {
warn!(
"Could not schedule remote exec process unregister on drop: no Tokio runtime is available"
);
return;
};
std::mem::drop(handle.spawn(async move {
session.unregister().await;
});
}));
}
}

View File

@@ -166,6 +166,7 @@ async fn collect_process_output_from_events(
drop(session);
return Ok((stdout, stderr, exit_code, true));
}
ExecProcessEvent::ResyncRequired => continue,
ExecProcessEvent::Failed(message) => {
anyhow::bail!("process failed before closed state: {message}");
}
@@ -189,6 +190,7 @@ async fn collect_process_event_snapshots(
ProcessEventSnapshot::Exited { seq, exit_code }
}
ExecProcessEvent::Closed { seq } => ProcessEventSnapshot::Closed { seq },
ExecProcessEvent::ResyncRequired => continue,
ExecProcessEvent::Failed(message) => {
anyhow::bail!("process failed before closed state: {message}");
}