diff --git a/codex-rs/exec-server/README.md b/codex-rs/exec-server/README.md index 5ec1bc9188..fbdcb6e0b5 100644 --- a/codex-rs/exec-server/README.md +++ b/codex-rs/exec-server/README.md @@ -27,7 +27,7 @@ Environment | |- current ExecServerConnection? | |- one in-flight reconnect attempt | |- terminal reconnect error? - | `- durable process_sessions: HashMap> + | `- tracked process_sessions: HashMap> |- RemoteProcess -> RemoteExecProcess -> ProcessSessionHandle |- RemoteFileSystem `- HttpClient for RemoteExecServerClient @@ -65,15 +65,17 @@ The main roles are: HTTP capability so all remote APIs share one reconnecting session. - `RemoteExecServerSession`: durable logical-session state behind the client. It remembers the resumable session id, current live connection, one shared - reconnect attempt, tracked process sessions, and any terminal resume error. + reconnect attempt, weak references to process sessions that may need rebinding, + and any terminal resume error. - `ExecServerConnection`: one live JSON-RPC transport binding. It owns connection-local routing for notifications and streamed HTTP response bodies. - `Inner`: private per-connection machinery behind `ExecServerConnection`. It owns the `RpcClient`, reader task, disconnect latch, connection-local process notification routes, connection-local HTTP body stream routes, and initialized session id for that live binding. -- `ProcessSession`: durable per-process client state. It keeps the local event - log, wake cursor, and failure state that must survive connection replacement. +- `ProcessSession`: durable per-process client state owned by the live process + handle. It keeps the local event log, wake cursor, and failure state that must + survive connection replacement while that handle still exists. - `ProcessSessionHandle`: process-facing handle used by `RemoteExecProcess`. It routes reads, writes, terminate, and unregister through either a focused direct connection test path or the logical reconnecting client path. @@ -89,6 +91,9 @@ Reconnect invariants: reconnect loop per API surface. - Reconnect resumes the same logical session id and rebinds tracked `ProcessSession` routes onto the replacement `ExecServerConnection`. +- When a reconnecting process session loses its transport, it emits a local + `ResyncRequired` event and wake so callers blocked on pushed events or wake + notifications know to recover through `process/read(afterSeq)`. - `process/read` may retry once after a transport-close race because its `afterSeq` cursor makes the replay read-only and recoverable. - `process/start`, `process/write`, `process/terminate`, filesystem RPCs, and diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index e2d98adc74..e4eae94141 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::sync::OnceLock; +use std::sync::Weak; use std::sync::atomic::AtomicU64; use std::time::Duration; @@ -16,6 +17,7 @@ use tokio::sync::Notify; use tokio::sync::mpsc; use tokio::sync::watch; +use tokio::time::sleep; use tokio::time::timeout; use tracing::debug; @@ -88,6 +90,8 @@ const CONNECT_TIMEOUT: Duration = Duration::from_secs(10); const INITIALIZE_TIMEOUT: Duration = Duration::from_secs(10); const PROCESS_EVENT_CHANNEL_CAPACITY: usize = 256; const PROCESS_EVENT_RETAINED_BYTES: usize = 1024 * 1024; +const TRANSIENT_RESUME_CONFLICT_RETRY_DELAY: Duration = Duration::from_millis(10); +const TRANSIENT_RESUME_CONFLICT_RETRY_LIMIT: usize = 3; impl Default for ExecServerClientConnectOptions { fn default() -> Self { @@ -216,14 +220,15 @@ pub(crate) struct RemoteExecServerClient { } // Shared state for one logical remote exec-server client. The logical client -// owns resumable session identity and durable process session state; individual -// ExecServerConnection values only bind that state to one live transport. +// owns resumable session identity and tracks live process sessions that may +// need rebinding; individual ExecServerConnection values only bind that state +// to one live transport. struct RemoteExecServerSession { connection: Option, connection_attempt: Option>, logical_session_id: Option, terminal_error: Option, - process_sessions: HashMap>, + process_sessions: HashMap>, } enum RemoteExecServerConnectionAction { @@ -257,6 +262,7 @@ impl RemoteExecServerClient { } pub(crate) async fn connection(&self) -> Result { + let mut transient_resume_conflicts = 0; loop { match self.next_connection_action()? { RemoteExecServerConnectionAction::Ready(connection) => return Ok(connection), @@ -271,11 +277,22 @@ impl RemoteExecServerClient { let connection = self .connect_and_rebind(resume_session_id.clone(), process_sessions) .await; - return self.finish_connection_attempt( + match self.finish_connection_attempt( connection_attempt, resume_session_id, connection, - ); + ) { + Ok(connection) => return Ok(connection), + Err(err) + if transient_resume_conflicts + < TRANSIENT_RESUME_CONFLICT_RETRY_LIMIT + && is_transient_resume_conflict(&err) => + { + transient_resume_conflicts += 1; + sleep(TRANSIENT_RESUME_CONFLICT_RETRY_DELAY).await; + } + Err(err) => return Err(err), + } } } } @@ -284,23 +301,34 @@ impl RemoteExecServerClient { pub(crate) async fn register_process_session( &self, process_id: &ProcessId, - ) -> Result { + ) -> Result<(ProcessSessionHandle, ExecServerConnection), ExecServerError> { let process_session = Arc::new(ProcessSession::new( ProcessSessionDisconnectBehavior::Preserve, )); { let mut session = self.lock_session(); - if session.process_sessions.contains_key(process_id) { + if session + .process_sessions + .get(process_id) + .and_then(Weak::upgrade) + .is_some() + { return Err(ExecServerError::Protocol(format!( "session already registered for process {process_id}" ))); } session .process_sessions - .insert(process_id.clone(), Arc::clone(&process_session)); + .insert(process_id.clone(), Arc::downgrade(&process_session)); } - let connection = self.connection().await?; + let connection = match self.connection().await { + Ok(connection) => connection, + Err(err) => { + self.unregister_process_session(process_id).await; + return Err(err); + } + }; if let Err(err) = connection .register_process_session_route(process_id, Arc::clone(&process_session)) .await @@ -309,11 +337,14 @@ impl RemoteExecServerClient { return Err(err); } - Ok(ProcessSessionHandle { - control: ProcessSessionControl::RemoteClient(self.clone()), - process_id: process_id.clone(), - session: process_session, - }) + Ok(( + ProcessSessionHandle { + control: ProcessSessionControl::RemoteClient(self.clone()), + process_id: process_id.clone(), + session: process_session, + }, + connection, + )) } async fn read(&self, params: ReadParams) -> Result { @@ -378,11 +409,16 @@ impl RemoteExecServerClient { let connection_attempt = Arc::new(Notify::new()); let resume_session_id = session.logical_session_id.clone(); - let process_sessions = session + let mut process_sessions = Vec::new(); + session .process_sessions - .iter() - .map(|(process_id, process_session)| (process_id.clone(), Arc::clone(process_session))) - .collect(); + .retain(|process_id, process_session| { + let Some(process_session) = process_session.upgrade() else { + return false; + }; + process_sessions.push((process_id.clone(), process_session)); + true + }); session.connection_attempt = Some(Arc::clone(&connection_attempt)); Ok(RemoteExecServerConnectionAction::Connect { connection_attempt, @@ -824,6 +860,16 @@ impl ProcessSession { let _ = self.wake_tx.send(next); } + fn notify_wake(&self) { + let next = (*self.wake_tx.borrow()).saturating_add(1); + let _ = self.wake_tx.send(next); + } + + fn note_resync_required(&self) { + self.notify_wake(); + self.events.publish(ExecProcessEvent::ResyncRequired); + } + /// Publishes a process event only when all earlier sequenced events have /// already been published. /// @@ -875,8 +921,7 @@ impl ProcessSession { *failure = Some(message.clone()); } drop(failure); - let next = (*self.wake_tx.borrow()).saturating_add(1); - let _ = self.wake_tx.send(next); + self.notify_wake(); if should_publish { let _ = self.publish_ordered_event(ExecProcessEvent::Failed(message)); } @@ -1007,10 +1052,14 @@ impl ProcessSessionControl { impl TerminalReconnectError { fn from_error(error: &ExecServerError) -> Option { match error { - ExecServerError::Server { code, message } if *code == -32600 => Some(Self { - code: *code, - message: message.clone(), - }), + ExecServerError::Server { code, message } + if *code == -32600 && message.starts_with("unknown session id ") => + { + Some(Self { + code: *code, + message: message.clone(), + }) + } _ => None, } } @@ -1123,6 +1172,14 @@ fn record_disconnected(inner: &Arc, message: String) -> String { } } +fn is_transient_resume_conflict(error: &ExecServerError) -> bool { + matches!( + error, + ExecServerError::Server { code, message } + if *code == -32600 && message.ends_with("is already attached to another connection") + ) +} + async fn fail_all_process_sessions(inner: &Arc, message: String) { let routes = inner.take_all_process_session_routes().await; @@ -1130,8 +1187,13 @@ async fn fail_all_process_sessions(inner: &Arc, message: String) { // One-shot sessions synthesize a closed read response and emit a // pushed Failed event. Reconnecting remote sessions keep their local // event state so a reattached client can bind them again. - if session.disconnect_behavior == ProcessSessionDisconnectBehavior::Fail { - session.set_failure(message.clone()).await; + match session.disconnect_behavior { + ProcessSessionDisconnectBehavior::Fail => { + session.set_failure(message.clone()).await; + } + ProcessSessionDisconnectBehavior::Preserve => { + session.note_resync_required(); + } } } } diff --git a/codex-rs/exec-server/src/client/reconnect_tests.rs b/codex-rs/exec-server/src/client/reconnect_tests.rs index 48284bf1f0..3e07cc14a7 100644 --- a/codex-rs/exec-server/src/client/reconnect_tests.rs +++ b/codex-rs/exec-server/src/client/reconnect_tests.rs @@ -1,6 +1,8 @@ use anyhow::Context; use anyhow::Result; use base64::Engine as _; +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; @@ -34,6 +36,7 @@ use crate::RemoveOptions; use crate::client_api::ExecServerTransportParams; use crate::client_api::HttpClient; use crate::process::ExecBackend; +use crate::process::ExecProcessEvent; use crate::protocol::ByteChunk; use crate::protocol::EXEC_METHOD; use crate::protocol::EXEC_READ_METHOD; @@ -310,6 +313,23 @@ impl WebSocketJsonRpcPeer { .await; } + async fn write_error( + &mut self, + id: codex_app_server_protocol::RequestId, + code: i64, + message: impl Into, + ) { + self.write_message(JSONRPCMessage::Error(JSONRPCError { + id, + error: JSONRPCErrorError { + code, + message: message.into(), + data: None, + }, + })) + .await; + } + async fn write_notification(&mut self, method: &str, params: T) where T: serde::Serialize, @@ -693,6 +713,283 @@ async fn remote_client_reuses_one_reconnect_attempt_for_concurrent_callers() -> Ok(()) } +#[tokio::test] +async fn remote_process_disconnect_notifies_resync_before_cursor_recovery() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .context("test websocket listener should bind")?; + let websocket_url = format!("ws://{}", listener.local_addr()?); + let (disconnect_tx, disconnect_rx) = tokio::sync::oneshot::channel(); + let server = tokio::spawn(async move { + let mut first = WebSocketJsonRpcPeer::accept(&listener).await; + first + .complete_initialize(/*expected_resume_session_id*/ None, "session-1") + .await; + let start_request = first.read_request(EXEC_METHOD).await; + first + .write_response( + start_request.id, + ExecResponse { + process_id: ProcessId::from("proc"), + }, + ) + .await; + disconnect_rx + .await + .expect("test should trigger idle disconnect"); + drop(first); + + let mut second = WebSocketJsonRpcPeer::accept(&listener).await; + second + .complete_initialize(Some("session-1"), "session-1") + .await; + let read_request = second.read_request(EXEC_READ_METHOD).await; + assert_eq!( + decode_request_params::(&read_request), + ReadParams { + process_id: ProcessId::from("proc"), + after_seq: None, + max_bytes: None, + wait_ms: Some(0), + } + ); + second + .write_response(read_request.id, successful_read_response()) + .await; + }); + + let client = test_remote_client(websocket_url); + let process = RemoteProcess::new(client); + let started = process + .start(test_exec_params(&ProcessId::from("proc"))) + .await?; + let mut wake = started.process.subscribe_wake(); + let mut events = started.process.subscribe_events(); + disconnect_tx + .send(()) + .expect("idle disconnect should signal"); + + timeout(Duration::from_secs(1), wake.changed()) + .await + .context("idle process wake should surface transport resync")??; + assert_eq!( + timeout(Duration::from_secs(1), events.recv()) + .await + .context("idle process event should surface transport resync")??, + ExecProcessEvent::ResyncRequired + ); + assert_eq!( + started + .process + .read( + /*after_seq*/ None, + /*max_bytes*/ None, + /*wait_ms*/ Some(0), + ) + .await?, + successful_read_response() + ); + + server.await.expect("test websocket server should finish"); + Ok(()) +} + +#[tokio::test] +async fn remote_client_retries_transient_resume_conflict() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .context("test websocket listener should bind")?; + let websocket_url = format!("ws://{}", listener.local_addr()?); + let accepted_connections = Arc::new(AtomicUsize::new(0)); + let server_connections = Arc::clone(&accepted_connections); + let server = tokio::spawn(async move { + let mut first = WebSocketJsonRpcPeer::accept(&listener).await; + server_connections.fetch_add(1, Ordering::SeqCst); + first + .complete_initialize(/*expected_resume_session_id*/ None, "session-1") + .await; + drop(first); + + let mut second = WebSocketJsonRpcPeer::accept(&listener).await; + server_connections.fetch_add(1, Ordering::SeqCst); + let request = second.read_request(INITIALIZE_METHOD).await; + assert_eq!( + decode_request_params::(&request), + InitializeParams { + client_name: crate::client_transport::ENVIRONMENT_CLIENT_NAME.to_string(), + resume_session_id: Some("session-1".to_string()), + } + ); + second + .write_error( + request.id, + -32600, + "session session-1 is already attached to another connection", + ) + .await; + + let mut third = WebSocketJsonRpcPeer::accept(&listener).await; + server_connections.fetch_add(1, Ordering::SeqCst); + third + .complete_initialize(Some("session-1"), "session-1") + .await; + }); + + let client = test_remote_client(websocket_url); + let first_connection = client.connection().await?; + wait_for_disconnect(&first_connection).await; + assert_eq!( + client.connection().await?.session_id(), + Some("session-1".to_string()) + ); + + server.await.expect("test websocket server should finish"); + assert_eq!(accepted_connections.load(Ordering::SeqCst), 3); + Ok(()) +} + +#[tokio::test] +async fn remote_client_caches_unknown_session_resume_failure() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .context("test websocket listener should bind")?; + let websocket_url = format!("ws://{}", listener.local_addr()?); + let accepted_connections = Arc::new(AtomicUsize::new(0)); + let server_connections = Arc::clone(&accepted_connections); + let server = tokio::spawn(async move { + let mut first = WebSocketJsonRpcPeer::accept(&listener).await; + server_connections.fetch_add(1, Ordering::SeqCst); + first + .complete_initialize(/*expected_resume_session_id*/ None, "session-1") + .await; + drop(first); + + let mut second = WebSocketJsonRpcPeer::accept(&listener).await; + server_connections.fetch_add(1, Ordering::SeqCst); + let request = second.read_request(INITIALIZE_METHOD).await; + second + .write_error(request.id, -32600, "unknown session id session-1") + .await; + + let extra_connection = timeout(Duration::from_millis(200), listener.accept()).await; + assert!( + extra_connection.is_err(), + "terminal resume failure should not open another websocket" + ); + }); + + let client = test_remote_client(websocket_url); + let first_connection = client.connection().await?; + wait_for_disconnect(&first_connection).await; + let first_error = match client.connection().await { + Ok(_) => panic!("unknown session should fail reconnect"), + Err(err) => err, + }; + assert_eq!( + first_error.to_string(), + "exec-server rejected request (-32600): unknown session id session-1" + ); + let second_error = match client.connection().await { + Ok(_) => panic!("unknown session should stay terminal"), + Err(err) => err, + }; + assert_eq!( + second_error.to_string(), + "exec-server rejected request (-32600): unknown session id session-1" + ); + + server.await.expect("test websocket server should finish"); + assert_eq!(accepted_connections.load(Ordering::SeqCst), 2); + Ok(()) +} + +#[tokio::test] +async fn remote_process_start_releases_session_after_initial_connect_failure() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .context("test websocket listener should bind")?; + let websocket_url = format!("ws://{}", listener.local_addr()?); + let server = tokio::spawn(async move { + let first = WebSocketJsonRpcPeer::accept(&listener).await; + drop(first); + + let mut second = WebSocketJsonRpcPeer::accept(&listener).await; + second + .complete_initialize(/*expected_resume_session_id*/ None, "session-1") + .await; + let request = second.read_request(EXEC_METHOD).await; + second + .write_response( + request.id, + ExecResponse { + process_id: ProcessId::from("proc"), + }, + ) + .await; + }); + + let client = test_remote_client(websocket_url); + let process = RemoteProcess::new(client); + let params = test_exec_params(&ProcessId::from("proc")); + assert!( + process.start(params.clone()).await.is_err(), + "initial connect should fail before dispatch" + ); + let started = process.start(params).await?; + assert_eq!(started.process.process_id(), &ProcessId::from("proc")); + + server.await.expect("test websocket server should finish"); + Ok(()) +} + +#[tokio::test] +async fn remote_process_drop_releases_session_for_process_id_reuse() -> Result<()> { + let listener = TcpListener::bind("127.0.0.1:0") + .await + .context("test websocket listener should bind")?; + let websocket_url = format!("ws://{}", listener.local_addr()?); + let server = tokio::spawn(async move { + let mut peer = WebSocketJsonRpcPeer::accept(&listener).await; + peer.complete_initialize(/*expected_resume_session_id*/ None, "session-1") + .await; + for _ in 0..2 { + let request = peer.read_request(EXEC_METHOD).await; + peer.write_response( + request.id, + ExecResponse { + process_id: ProcessId::from("proc"), + }, + ) + .await; + } + }); + + let client = test_remote_client(websocket_url); + let process = RemoteProcess::new(client); + let params = test_exec_params(&ProcessId::from("proc")); + let started = process.start(params.clone()).await?; + drop(started); + let restarted = timeout(Duration::from_secs(1), async { + loop { + match process.start(params.clone()).await { + Ok(restarted) => return Ok(restarted), + Err(crate::ExecServerError::Protocol(message)) + if message == "session already registered for process proc" => + { + tokio::task::yield_now().await; + } + Err(err) => return Err(err), + } + } + }) + .await + .context("dropped process session should unregister")??; + assert_eq!(restarted.process.process_id(), &ProcessId::from("proc")); + + server.await.expect("test websocket server should finish"); + Ok(()) +} + #[tokio::test] async fn remote_client_reconnects_before_dispatching_every_remote_api() -> Result<()> { for api in RemoteApi::ALL { diff --git a/codex-rs/exec-server/src/process.rs b/codex-rs/exec-server/src/process.rs index 052e4aa643..12a2737f0f 100644 --- a/codex-rs/exec-server/src/process.rs +++ b/codex-rs/exec-server/src/process.rs @@ -23,13 +23,16 @@ pub struct StartedExecProcess { /// The stream is scoped to one [`ExecProcess`] handle. `Output` events carry /// stdout, stderr, or pty bytes. `Exited` reports the process exit status, while /// `Closed` means all output streams have ended and no more output events will -/// arrive. `Failed` is used when the process session cannot continue, for -/// example because the remote executor connection disconnected. +/// arrive. `ResyncRequired` means a reconnecting remote process session should +/// recover through [`ExecProcess::read`] using its last delivered sequence. +/// `Failed` is used when the process session cannot continue, for example +/// because a direct one-shot executor connection disconnected. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ExecProcessEvent { Output(ProcessOutputChunk), Exited { seq: u64, exit_code: i32 }, Closed { seq: u64 }, + ResyncRequired, Failed(String), } @@ -60,13 +63,14 @@ struct ExecProcessEventHistory { impl ExecProcessEvent { /// Sequence number used to order process-owned events. /// - /// `Failed` is intentionally unsequenced because it is synthesized by the - /// client when the session or transport fails, not emitted by the process. + /// `ResyncRequired` and `Failed` are intentionally unsequenced because they + /// are synthesized by the client when the session or transport changes, + /// not emitted by the process. pub(crate) fn seq(&self) -> Option { match self { ExecProcessEvent::Output(chunk) => Some(chunk.seq), ExecProcessEvent::Exited { seq, .. } | ExecProcessEvent::Closed { seq } => Some(*seq), - ExecProcessEvent::Failed(_) => None, + ExecProcessEvent::ResyncRequired | ExecProcessEvent::Failed(_) => None, } } @@ -74,7 +78,9 @@ impl ExecProcessEvent { match self { ExecProcessEvent::Output(chunk) => chunk.chunk.0.len(), ExecProcessEvent::Failed(message) => message.len(), - ExecProcessEvent::Exited { .. } | ExecProcessEvent::Closed { .. } => 0, + ExecProcessEvent::Exited { .. } + | ExecProcessEvent::Closed { .. } + | ExecProcessEvent::ResyncRequired => 0, } } } diff --git a/codex-rs/exec-server/src/remote_process.rs b/codex-rs/exec-server/src/remote_process.rs index 903d3b056e..ba67f42695 100644 --- a/codex-rs/exec-server/src/remote_process.rs +++ b/codex-rs/exec-server/src/remote_process.rs @@ -1,8 +1,10 @@ use std::sync::Arc; use async_trait::async_trait; +use tokio::runtime::Handle; use tokio::sync::watch; use tracing::trace; +use tracing::warn; use crate::ExecBackend; use crate::ExecProcess; @@ -35,8 +37,7 @@ impl RemoteProcess { impl ExecBackend for RemoteProcess { async fn start(&self, params: ExecParams) -> Result { let process_id = params.process_id.clone(); - let session = self.client.register_process_session(&process_id).await?; - let connection = self.client.connection().await?; + let (session, connection) = self.client.register_process_session(&process_id).await?; if let Err(err) = connection.exec(params).await { session.unregister().await; return Err(err); @@ -85,8 +86,14 @@ impl ExecProcess for RemoteExecProcess { impl Drop for RemoteExecProcess { fn drop(&mut self) { let session = self.session.clone(); - tokio::spawn(async move { + let Ok(handle) = Handle::try_current() else { + warn!( + "Could not schedule remote exec process unregister on drop: no Tokio runtime is available" + ); + return; + }; + std::mem::drop(handle.spawn(async move { session.unregister().await; - }); + })); } } diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index 02e9cd48c5..c055d32582 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -166,6 +166,7 @@ async fn collect_process_output_from_events( drop(session); return Ok((stdout, stderr, exit_code, true)); } + ExecProcessEvent::ResyncRequired => continue, ExecProcessEvent::Failed(message) => { anyhow::bail!("process failed before closed state: {message}"); } @@ -189,6 +190,7 @@ async fn collect_process_event_snapshots( ProcessEventSnapshot::Exited { seq, exit_code } } ExecProcessEvent::Closed { seq } => ProcessEventSnapshot::Closed { seq }, + ExecProcessEvent::ResyncRequired => continue, ExecProcessEvent::Failed(message) => { anyhow::bail!("process failed before closed state: {message}"); } diff --git a/codex-rs/rmcp-client/src/executor_process_transport.rs b/codex-rs/rmcp-client/src/executor_process_transport.rs index 41f0b7660d..21bcc5fb88 100644 --- a/codex-rs/rmcp-client/src/executor_process_transport.rs +++ b/codex-rs/rmcp-client/src/executor_process_transport.rs @@ -193,6 +193,15 @@ impl ExecutorProcessTransport { self.note_seq(seq); self.closed = true; } + Ok(ExecProcessEvent::ResyncRequired) => { + if let Err(error) = self.recover_process_events().await { + warn!( + "Failed to resync remote MCP server output stream ({}): {error}", + self.program_name + ); + self.closed = true; + } + } Ok(ExecProcessEvent::Failed(message)) => { warn!( "Remote MCP server process failed ({}): {message}", @@ -205,7 +214,7 @@ impl ExecutorProcessTransport { "Remote MCP server output stream lagged ({}): skipped {skipped} events", self.program_name ); - if let Err(error) = self.recover_lagged_events().await { + if let Err(error) = self.recover_process_events().await { warn!( "Failed to recover remote MCP server output stream ({}): {error}", self.program_name @@ -232,7 +241,7 @@ impl ExecutorProcessTransport { true } - async fn recover_lagged_events(&mut self) -> io::Result<()> { + async fn recover_process_events(&mut self) -> io::Result<()> { let response = self .process .read(