use std::collections::HashMap; use std::collections::VecDeque; use std::sync::Arc; use std::time::Duration; use codex_utils_pty::ExecCommandSession; use codex_utils_pty::TerminalSize; use tokio::sync::Mutex; use tokio::sync::mpsc; use tracing::warn; use crate::protocol::ExecExitedNotification; use crate::protocol::ExecOutputDeltaNotification; use crate::protocol::ExecOutputStream; use crate::protocol::ExecResponse; use crate::protocol::InitializeResponse; use crate::protocol::PROTOCOL_VERSION; use crate::protocol::ProcessOutputChunk; use crate::protocol::ReadResponse; use crate::protocol::TerminateResponse; use crate::protocol::WriteResponse; use crate::server::routing::ExecServerOutboundMessage; use crate::server::routing::ExecServerServerNotification; use crate::server::routing::internal_error; use crate::server::routing::invalid_params; use crate::server::routing::invalid_request; const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024; #[derive(Clone)] struct RetainedOutputChunk { seq: u64, stream: ExecOutputStream, chunk: Vec, } struct RunningProcess { session: ExecCommandSession, tty: bool, output: VecDeque, retained_bytes: usize, next_seq: u64, exit_code: Option, } pub(crate) struct ExecServerHandler { outbound_tx: mpsc::Sender, // Keyed by client-chosen logical `processId` scoped to this connection. // This is a protocol handle, not an OS pid. processes: Arc>>, initialize_requested: bool, initialized: bool, } impl ExecServerHandler { pub(crate) fn new(outbound_tx: mpsc::Sender) -> Self { Self { outbound_tx, processes: Arc::new(Mutex::new(HashMap::new())), initialize_requested: false, initialized: false, } } pub(crate) async fn shutdown(&self) { let remaining = { let mut processes = self.processes.lock().await; processes .drain() .map(|(_, process)| process) .collect::>() }; for process in remaining { process.session.terminate(); } } pub(crate) fn initialized(&mut self) -> Result<(), String> { if !self.initialize_requested { return Err("received `initialized` notification before `initialize`".into()); } self.initialized = true; Ok(()) } pub(crate) fn initialize( &mut self, ) -> Result { if self.initialize_requested { return Err(invalid_request( "initialize may only be sent once per connection".to_string(), )); } self.initialize_requested = true; Ok(InitializeResponse { protocol_version: PROTOCOL_VERSION.to_string(), }) } fn require_initialized(&self) -> Result<(), codex_app_server_protocol::JSONRPCErrorError> { if !self.initialize_requested { return Err(invalid_request( "client must call initialize before using exec methods".to_string(), )); } if !self.initialized { return Err(invalid_request( "client must send initialized before using exec methods".to_string(), )); } Ok(()) } pub(crate) async fn exec( &self, params: crate::protocol::ExecParams, ) -> Result { self.require_initialized()?; let process_id = params.process_id.clone(); // Same-connection requests are serialized by the RPC processor, and the // in-process client holds the handler mutex across this full call. That // makes this pre-spawn duplicate check safe for the current entrypoints. { let process_map = self.processes.lock().await; if process_map.contains_key(&process_id) { return Err(invalid_request(format!( "process {process_id} already exists" ))); } } let (program, args) = params .argv .split_first() .ok_or_else(|| invalid_params("argv must not be empty".to_string()))?; let spawned = if params.tty { codex_utils_pty::spawn_pty_process( program, args, params.cwd.as_path(), ¶ms.env, ¶ms.arg0, TerminalSize::default(), ) .await } else { codex_utils_pty::spawn_pipe_process_no_stdin( program, args, params.cwd.as_path(), ¶ms.env, ¶ms.arg0, ) .await } .map_err(|err| internal_error(err.to_string()))?; { let mut process_map = self.processes.lock().await; process_map.insert( process_id.clone(), RunningProcess { session: spawned.session, tty: params.tty, output: std::collections::VecDeque::new(), retained_bytes: 0, next_seq: 1, exit_code: None, }, ); } tokio::spawn(stream_output( process_id.clone(), if params.tty { ExecOutputStream::Pty } else { ExecOutputStream::Stdout }, spawned.stdout_rx, self.outbound_tx.clone(), Arc::clone(&self.processes), )); tokio::spawn(stream_output( process_id.clone(), if params.tty { ExecOutputStream::Pty } else { ExecOutputStream::Stderr }, spawned.stderr_rx, self.outbound_tx.clone(), Arc::clone(&self.processes), )); tokio::spawn(watch_exit( process_id.clone(), spawned.exit_rx, self.outbound_tx.clone(), Arc::clone(&self.processes), )); Ok(ExecResponse { process_id }) } pub(crate) async fn read( &self, params: crate::protocol::ReadParams, ) -> Result { self.require_initialized()?; let after_seq = params.after_seq.unwrap_or(0); let max_bytes = params.max_bytes.unwrap_or(usize::MAX); let wait = Duration::from_millis(params.wait_ms.unwrap_or(0)); let deadline = tokio::time::Instant::now() + wait; loop { let response = { let process_map = self.processes.lock().await; let process = process_map.get(¶ms.process_id).ok_or_else(|| { invalid_request(format!("unknown process id {}", params.process_id)) })?; let mut chunks = Vec::new(); let mut total_bytes = 0; let mut next_seq = process.next_seq; for retained in process.output.iter().filter(|chunk| chunk.seq > after_seq) { let chunk_len = retained.chunk.len(); if !chunks.is_empty() && total_bytes + chunk_len > max_bytes { break; } total_bytes += chunk_len; chunks.push(ProcessOutputChunk { seq: retained.seq, stream: retained.stream, chunk: retained.chunk.clone().into(), }); next_seq = retained.seq + 1; if total_bytes >= max_bytes { break; } } ReadResponse { chunks, next_seq, exited: process.exit_code.is_some(), exit_code: process.exit_code, } }; if !response.chunks.is_empty() || response.exited || tokio::time::Instant::now() >= deadline { return Ok(response); } tokio::time::sleep(Duration::from_millis(10)).await; } } pub(crate) async fn write( &self, params: crate::protocol::WriteParams, ) -> Result { self.require_initialized()?; let writer_tx = { let process_map = self.processes.lock().await; let process = process_map.get(¶ms.process_id).ok_or_else(|| { invalid_request(format!("unknown process id {}", params.process_id)) })?; if !process.tty { return Err(invalid_request(format!( "stdin is closed for process {}", params.process_id ))); } process.session.writer_sender() }; writer_tx .send(params.chunk.into_inner()) .await .map_err(|_| internal_error("failed to write to process stdin".to_string()))?; Ok(WriteResponse { accepted: true }) } pub(crate) async fn terminate( &self, params: crate::protocol::TerminateParams, ) -> Result { self.require_initialized()?; let running = { let process_map = self.processes.lock().await; if let Some(process) = process_map.get(¶ms.process_id) { process.session.terminate(); true } else { false } }; Ok(TerminateResponse { running }) } } #[cfg(test)] impl ExecServerHandler { async fn handle_message( &mut self, message: crate::server::routing::ExecServerInboundMessage, ) -> Result<(), String> { match message { crate::server::routing::ExecServerInboundMessage::Request(request) => { self.handle_request(request).await } crate::server::routing::ExecServerInboundMessage::Notification( crate::server::routing::ExecServerClientNotification::Initialized, ) => self.initialized(), } } async fn handle_request( &mut self, request: crate::server::routing::ExecServerRequest, ) -> Result<(), String> { let outbound = match request { crate::server::routing::ExecServerRequest::Initialize { request_id, .. } => { Self::request_outbound( request_id, self.initialize() .map(crate::server::routing::ExecServerResponseMessage::Initialize), ) } crate::server::routing::ExecServerRequest::Exec { request_id, params } => { Self::request_outbound( request_id, self.exec(params) .await .map(crate::server::routing::ExecServerResponseMessage::Exec), ) } crate::server::routing::ExecServerRequest::Read { request_id, params } => { Self::request_outbound( request_id, self.read(params) .await .map(crate::server::routing::ExecServerResponseMessage::Read), ) } crate::server::routing::ExecServerRequest::Write { request_id, params } => { Self::request_outbound( request_id, self.write(params) .await .map(crate::server::routing::ExecServerResponseMessage::Write), ) } crate::server::routing::ExecServerRequest::Terminate { request_id, params } => { Self::request_outbound( request_id, self.terminate(params) .await .map(crate::server::routing::ExecServerResponseMessage::Terminate), ) } }; self.outbound_tx .send(outbound) .await .map_err(|_| "outbound channel closed".to_string()) } fn request_outbound( request_id: codex_app_server_protocol::RequestId, result: Result< crate::server::routing::ExecServerResponseMessage, codex_app_server_protocol::JSONRPCErrorError, >, ) -> crate::server::routing::ExecServerOutboundMessage { match result { Ok(response) => crate::server::routing::ExecServerOutboundMessage::Response { request_id, response, }, Err(error) => { crate::server::routing::ExecServerOutboundMessage::Error { request_id, error } } } } } async fn stream_output( process_id: String, stream: ExecOutputStream, mut receiver: tokio::sync::mpsc::Receiver>, outbound_tx: mpsc::Sender, processes: Arc>>, ) { while let Some(chunk) = receiver.recv().await { let notification = { let mut processes = processes.lock().await; let Some(process) = processes.get_mut(&process_id) else { break; }; let seq = process.next_seq; process.next_seq += 1; process.retained_bytes += chunk.len(); process.output.push_back(RetainedOutputChunk { seq, stream, chunk: chunk.clone(), }); while process.retained_bytes > RETAINED_OUTPUT_BYTES_PER_PROCESS { let Some(evicted) = process.output.pop_front() else { break; }; process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len()); warn!( "retained output cap exceeded for process {process_id}; dropping oldest output" ); } ExecOutputDeltaNotification { process_id: process_id.clone(), stream, chunk: chunk.into(), } }; if outbound_tx .send(ExecServerOutboundMessage::Notification( ExecServerServerNotification::OutputDelta(notification), )) .await .is_err() { break; } } } async fn watch_exit( process_id: String, exit_rx: tokio::sync::oneshot::Receiver, outbound_tx: mpsc::Sender, processes: Arc>>, ) { let exit_code = exit_rx.await.unwrap_or(-1); { let mut processes = processes.lock().await; if let Some(process) = processes.get_mut(&process_id) { process.exit_code = Some(exit_code); } } let _ = outbound_tx .send(ExecServerOutboundMessage::Notification( ExecServerServerNotification::Exited(ExecExitedNotification { process_id, exit_code, }), )) .await; } #[cfg(test)] mod tests { use std::collections::HashMap; use std::collections::VecDeque; use std::time::Duration; use pretty_assertions::assert_eq; use tokio::time::timeout; use super::ExecServerHandler; use super::RetainedOutputChunk; use super::RunningProcess; use crate::protocol::ExecOutputStream; use crate::protocol::InitializeParams; use crate::protocol::InitializeResponse; use crate::protocol::PROTOCOL_VERSION; use crate::protocol::ReadParams; use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; use crate::server::routing::ExecServerClientNotification; use crate::server::routing::ExecServerInboundMessage; use crate::server::routing::ExecServerOutboundMessage; use crate::server::routing::ExecServerRequest; use crate::server::routing::ExecServerResponseMessage; use codex_app_server_protocol::RequestId; async fn recv_outbound( outgoing_rx: &mut tokio::sync::mpsc::Receiver, ) -> ExecServerOutboundMessage { let recv_result = timeout(Duration::from_secs(1), outgoing_rx.recv()).await; let maybe_message = match recv_result { Ok(maybe_message) => maybe_message, Err(err) => panic!("timed out waiting for handler output: {err}"), }; match maybe_message { Some(message) => message, None => panic!("handler output channel closed unexpectedly"), } } #[tokio::test] async fn initialize_response_reports_protocol_version() { let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(1); let mut handler = ExecServerHandler::new(outgoing_tx); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Initialize { request_id: RequestId::Integer(1), params: InitializeParams { client_name: "test".to_string(), }, }, )) .await { panic!("initialize should succeed: {err}"); } assert_eq!( recv_outbound(&mut outgoing_rx).await, ExecServerOutboundMessage::Response { request_id: RequestId::Integer(1), response: ExecServerResponseMessage::Initialize(InitializeResponse { protocol_version: PROTOCOL_VERSION.to_string(), }), } ); } #[tokio::test] async fn exec_methods_require_initialize() { let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(1); let mut handler = ExecServerHandler::new(outgoing_tx); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec { request_id: RequestId::Integer(7), params: crate::protocol::ExecParams { process_id: "proc-1".to_string(), argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()], cwd: std::env::current_dir().expect("cwd"), env: HashMap::new(), tty: true, arg0: None, }, })) .await { panic!("request handling should not fail the handler: {err}"); } let ExecServerOutboundMessage::Error { request_id, error } = recv_outbound(&mut outgoing_rx).await else { panic!("expected invalid-request error"); }; assert_eq!(request_id, RequestId::Integer(7)); assert_eq!(error.code, -32600); assert_eq!( error.message, "client must call initialize before using exec methods" ); } #[tokio::test] async fn exec_methods_require_initialized_notification_after_initialize() { let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2); let mut handler = ExecServerHandler::new(outgoing_tx); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Initialize { request_id: RequestId::Integer(1), params: InitializeParams { client_name: "test".to_string(), }, }, )) .await { panic!("initialize should succeed: {err}"); } let _ = recv_outbound(&mut outgoing_rx).await; if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec { request_id: RequestId::Integer(2), params: crate::protocol::ExecParams { process_id: "proc-1".to_string(), argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()], cwd: std::env::current_dir().expect("cwd"), env: HashMap::new(), tty: true, arg0: None, }, })) .await { panic!("request handling should not fail the handler: {err}"); } let ExecServerOutboundMessage::Error { request_id, error } = recv_outbound(&mut outgoing_rx).await else { panic!("expected invalid-request error"); }; assert_eq!(request_id, RequestId::Integer(2)); assert_eq!(error.code, -32600); assert_eq!( error.message, "client must send initialized before using exec methods" ); } #[tokio::test] async fn initialized_before_initialize_is_a_protocol_error() { let (outgoing_tx, _outgoing_rx) = tokio::sync::mpsc::channel(1); let mut handler = ExecServerHandler::new(outgoing_tx); let result = handler .handle_message(ExecServerInboundMessage::Notification( ExecServerClientNotification::Initialized, )) .await; match result { Err(err) => { assert_eq!( err, "received `initialized` notification before `initialize`" ); } Ok(()) => panic!("expected protocol error for early initialized notification"), } } #[tokio::test] async fn initialize_may_only_be_sent_once_per_connection() { let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2); let mut handler = ExecServerHandler::new(outgoing_tx); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Initialize { request_id: RequestId::Integer(1), params: InitializeParams { client_name: "test".to_string(), }, }, )) .await { panic!("initialize should succeed: {err}"); } let _ = recv_outbound(&mut outgoing_rx).await; if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Initialize { request_id: RequestId::Integer(2), params: InitializeParams { client_name: "test".to_string(), }, }, )) .await { panic!("duplicate initialize should not fail the handler: {err}"); } let ExecServerOutboundMessage::Error { request_id, error } = recv_outbound(&mut outgoing_rx).await else { panic!("expected invalid-request error"); }; assert_eq!(request_id, RequestId::Integer(2)); assert_eq!(error.code, -32600); assert_eq!( error.message, "initialize may only be sent once per connection" ); } #[tokio::test] async fn exec_echoes_client_process_ids() { let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4); let mut handler = ExecServerHandler::new(outgoing_tx); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Initialize { request_id: RequestId::Integer(1), params: InitializeParams { client_name: "test".to_string(), }, }, )) .await { panic!("initialize should succeed: {err}"); } let _ = recv_outbound(&mut outgoing_rx).await; if let Err(err) = handler .handle_message(ExecServerInboundMessage::Notification( ExecServerClientNotification::Initialized, )) .await { panic!("initialized should succeed: {err}"); } let params = crate::protocol::ExecParams { process_id: "proc-1".to_string(), argv: vec![ "bash".to_string(), "-lc".to_string(), "sleep 30".to_string(), ], cwd: std::env::current_dir().expect("cwd"), env: HashMap::new(), tty: false, arg0: None, }; if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec { request_id: RequestId::Integer(2), params: params.clone(), })) .await { panic!("first exec should succeed: {err}"); } let ExecServerOutboundMessage::Response { request_id, response: ExecServerResponseMessage::Exec(first_exec), } = recv_outbound(&mut outgoing_rx).await else { panic!("expected first exec response"); }; assert_eq!(request_id, RequestId::Integer(2)); assert_eq!(first_exec.process_id, "proc-1"); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec { request_id: RequestId::Integer(3), params: crate::protocol::ExecParams { process_id: "proc-2".to_string(), argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()], ..params }, })) .await { panic!("second exec should succeed: {err}"); } let ExecServerOutboundMessage::Response { request_id, response: ExecServerResponseMessage::Exec(second_exec), } = recv_outbound(&mut outgoing_rx).await else { panic!("expected second exec response"); }; assert_eq!(request_id, RequestId::Integer(3)); assert_eq!(second_exec.process_id, "proc-2"); handler.shutdown().await; } #[tokio::test] async fn writes_to_pipe_backed_processes_are_rejected() { let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4); let mut handler = ExecServerHandler::new(outgoing_tx); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Initialize { request_id: RequestId::Integer(1), params: InitializeParams { client_name: "test".to_string(), }, }, )) .await { panic!("initialize should succeed: {err}"); } let _ = recv_outbound(&mut outgoing_rx).await; if let Err(err) = handler .handle_message(ExecServerInboundMessage::Notification( ExecServerClientNotification::Initialized, )) .await { panic!("initialized should succeed: {err}"); } if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec { request_id: RequestId::Integer(2), params: crate::protocol::ExecParams { process_id: "proc-1".to_string(), argv: vec![ "bash".to_string(), "-lc".to_string(), "sleep 30".to_string(), ], cwd: std::env::current_dir().expect("cwd"), env: HashMap::new(), tty: false, arg0: None, }, })) .await { panic!("exec should succeed: {err}"); } let ExecServerOutboundMessage::Response { response: ExecServerResponseMessage::Exec(exec_response), .. } = recv_outbound(&mut outgoing_rx).await else { panic!("expected exec response"); }; if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Write { request_id: RequestId::Integer(3), params: WriteParams { process_id: exec_response.process_id, chunk: b"hello\n".to_vec().into(), }, }, )) .await { panic!("write should not fail the handler: {err}"); } let ExecServerOutboundMessage::Error { request_id, error } = recv_outbound(&mut outgoing_rx).await else { panic!("expected stdin-closed error"); }; assert_eq!(request_id, RequestId::Integer(3)); assert_eq!(error.code, -32600); assert_eq!(error.message, "stdin is closed for process proc-1"); handler.shutdown().await; } #[tokio::test] async fn writes_to_unknown_processes_are_rejected() { let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2); let mut handler = ExecServerHandler::new(outgoing_tx); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Initialize { request_id: RequestId::Integer(1), params: InitializeParams { client_name: "test".to_string(), }, }, )) .await { panic!("initialize should succeed: {err}"); } let _ = recv_outbound(&mut outgoing_rx).await; if let Err(err) = handler .handle_message(ExecServerInboundMessage::Notification( ExecServerClientNotification::Initialized, )) .await { panic!("initialized should succeed: {err}"); } if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Write { request_id: RequestId::Integer(2), params: WriteParams { process_id: "missing".to_string(), chunk: b"hello\n".to_vec().into(), }, }, )) .await { panic!("write should not fail the handler: {err}"); } let ExecServerOutboundMessage::Error { request_id, error } = recv_outbound(&mut outgoing_rx).await else { panic!("expected unknown-process error"); }; assert_eq!(request_id, RequestId::Integer(2)); assert_eq!(error.code, -32600); assert_eq!(error.message, "unknown process id missing"); } #[tokio::test] async fn terminate_unknown_processes_report_running_false() { let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2); let mut handler = ExecServerHandler::new(outgoing_tx); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Initialize { request_id: RequestId::Integer(1), params: InitializeParams { client_name: "test".to_string(), }, }, )) .await { panic!("initialize should succeed: {err}"); } let _ = recv_outbound(&mut outgoing_rx).await; if let Err(err) = handler .handle_message(ExecServerInboundMessage::Notification( ExecServerClientNotification::Initialized, )) .await { panic!("initialized should succeed: {err}"); } if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Terminate { request_id: RequestId::Integer(2), params: crate::protocol::TerminateParams { process_id: "missing".to_string(), }, }, )) .await { panic!("terminate should not fail the handler: {err}"); } assert_eq!( recv_outbound(&mut outgoing_rx).await, ExecServerOutboundMessage::Response { request_id: RequestId::Integer(2), response: ExecServerResponseMessage::Terminate(TerminateResponse { running: false, }), } ); } #[tokio::test] async fn terminate_keeps_process_ids_reserved() { let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2); let mut handler = ExecServerHandler::new(outgoing_tx); if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Initialize { request_id: RequestId::Integer(1), params: InitializeParams { client_name: "test".to_string(), }, }, )) .await { panic!("initialize should succeed: {err}"); } let _ = recv_outbound(&mut outgoing_rx).await; if let Err(err) = handler .handle_message(ExecServerInboundMessage::Notification( ExecServerClientNotification::Initialized, )) .await { panic!("initialized should succeed: {err}"); } let spawned = codex_utils_pty::spawn_pipe_process_no_stdin( "bash", &["-lc".to_string(), "sleep 30".to_string()], std::env::current_dir().expect("cwd").as_path(), &HashMap::new(), &None, ) .await .expect("spawn test process"); { let mut process_map = handler.processes.lock().await; process_map.insert( "proc-1".to_string(), super::RunningProcess { session: spawned.session, tty: false, output: std::collections::VecDeque::new(), retained_bytes: 0, next_seq: 1, exit_code: None, }, ); } if let Err(err) = handler .handle_message(ExecServerInboundMessage::Request( ExecServerRequest::Terminate { request_id: RequestId::Integer(2), params: crate::protocol::TerminateParams { process_id: "proc-1".to_string(), }, }, )) .await { panic!("terminate should not fail the handler: {err}"); } assert_eq!( recv_outbound(&mut outgoing_rx).await, ExecServerOutboundMessage::Response { request_id: RequestId::Integer(2), response: ExecServerResponseMessage::Terminate(TerminateResponse { running: true }), } ); assert!( handler.processes.lock().await.contains_key("proc-1"), "terminated ids should stay reserved until exit cleanup removes them" ); handler.shutdown().await; } #[tokio::test] async fn read_paginates_retained_output_without_skipping_omitted_chunks() { let (outgoing_tx, _outgoing_rx) = tokio::sync::mpsc::channel(1); let mut handler = ExecServerHandler::new(outgoing_tx); let _ = handler.initialize().expect("initialize should succeed"); handler.initialized().expect("initialized should succeed"); let spawned = codex_utils_pty::spawn_pipe_process_no_stdin( "bash", &["-lc".to_string(), "true".to_string()], std::env::current_dir().expect("cwd").as_path(), &HashMap::new(), &None, ) .await .expect("spawn test process"); { let mut process_map = handler.processes.lock().await; process_map.insert( "proc-1".to_string(), RunningProcess { session: spawned.session, tty: false, output: VecDeque::from([ RetainedOutputChunk { seq: 1, stream: ExecOutputStream::Stdout, chunk: b"abc".to_vec(), }, RetainedOutputChunk { seq: 2, stream: ExecOutputStream::Stderr, chunk: b"def".to_vec(), }, ]), retained_bytes: 6, next_seq: 3, exit_code: None, }, ); } let first = handler .read(ReadParams { process_id: "proc-1".to_string(), after_seq: Some(0), max_bytes: Some(3), wait_ms: Some(0), }) .await .expect("first read should succeed"); assert_eq!(first.chunks.len(), 1); assert_eq!(first.chunks[0].seq, 1); assert_eq!(first.chunks[0].stream, ExecOutputStream::Stdout); assert_eq!(first.chunks[0].chunk.clone().into_inner(), b"abc".to_vec()); assert_eq!(first.next_seq, 2); let second = handler .read(ReadParams { process_id: "proc-1".to_string(), after_seq: Some(first.next_seq - 1), max_bytes: Some(3), wait_ms: Some(0), }) .await .expect("second read should succeed"); assert_eq!(second.chunks.len(), 1); assert_eq!(second.chunks[0].seq, 2); assert_eq!(second.chunks[0].stream, ExecOutputStream::Stderr); assert_eq!(second.chunks[0].chunk.clone().into_inner(), b"def".to_vec()); assert_eq!(second.next_seq, 3); handler.shutdown().await; } }