codex: address PR review feedback (#14862)

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-17 18:05:24 +00:00
parent 25481592c2
commit 30e094e34a
2 changed files with 243 additions and 18 deletions

View File

@@ -330,13 +330,21 @@ impl ExecServerClient {
let process_id = params.process_id.clone();
let status = Arc::new(RemoteProcessStatus::new());
let (output_tx, output_rx) = broadcast::channel(256);
self.inner.processes.lock().await.insert(
process_id.clone(),
RegisteredProcess {
output_tx,
status: Arc::clone(&status),
},
);
{
let mut processes = self.inner.processes.lock().await;
if processes.contains_key(&process_id) {
return Err(ExecServerError::Protocol(format!(
"process `{process_id}` already exists"
)));
}
processes.insert(
process_id.clone(),
RegisteredProcess {
output_tx,
status: Arc::clone(&status),
},
);
}
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let client = self.clone();
@@ -1083,6 +1091,115 @@ mod tests {
);
}
#[tokio::test]
async fn start_process_rejects_duplicate_local_ids_without_orphaning_existing_process() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(initialize_request) = initialize else {
panic!("expected initialize request");
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: initialize_request.id,
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Notification(notification) = initialized else {
panic!("expected initialized notification");
};
assert_eq!(notification.method, INITIALIZED_METHOD);
let exec_request = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = exec_request else {
panic!("expected exec request");
};
assert_eq!(method, EXEC_METHOD);
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id,
result: serde_json::json!({ "processId": "proc-1" }),
}),
)
.await;
tokio::time::sleep(Duration::from_millis(25)).await;
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Notification(JSONRPCNotification {
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"stream": "stdout",
"chunk": "YWxpdmUK"
})),
}),
)
.await;
});
let client = match ExecServerClient::connect_stdio(
client_stdin,
client_stdout,
test_options(),
)
.await
{
Ok(client) => client,
Err(err) => panic!("failed to connect test client: {err}"),
};
let first_process = match client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: true,
arg0: None,
})
.await
{
Ok(process) => process,
Err(err) => panic!("failed to start first process: {err}"),
};
let duplicate_result = client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: true,
arg0: None,
})
.await;
match duplicate_result {
Err(ExecServerError::Protocol(message)) => {
assert_eq!(message, "process `proc-1` already exists");
}
Err(err) => panic!("unexpected duplicate start failure: {err}"),
Ok(_) => panic!("expected local duplicate rejection"),
}
let mut output = first_process.output_receiver();
let output = timeout(Duration::from_secs(1), output.recv())
.await
.unwrap_or_else(|err| panic!("timed out waiting for process output: {err}"))
.unwrap_or_else(|err| panic!("failed to receive process output: {err}"));
assert_eq!(output.stream, ExecOutputStream::Stdout);
assert_eq!(output.chunk, b"alive\n".to_vec());
}
#[tokio::test]
async fn transport_shutdown_marks_processes_exited_without_exit_codes() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);

View File

@@ -176,6 +176,16 @@ impl ExecServerHandler {
.split_first()
.ok_or_else(|| invalid_params("argv must not be empty".to_string()))?;
let process_id = params.process_id.clone();
{
let process_map = self.processes.lock().await;
if process_map.contains_key(&process_id) {
return Err(invalid_request(format!(
"process {process_id} already exists"
)));
}
}
let spawned = if params.tty {
codex_utils_pty::spawn_pty_process(
program,
@@ -198,7 +208,6 @@ impl ExecServerHandler {
}
.map_err(|err| internal_error(err.to_string()))?;
let process_id = params.process_id.clone();
{
let mut process_map = self.processes.lock().await;
if process_map.contains_key(&process_id) {
@@ -268,17 +277,17 @@ impl ExecServerHandler {
&self,
params: crate::protocol::TerminateParams,
) -> Result<TerminateResponse, codex_app_server_protocol::JSONRPCErrorError> {
let process = {
let mut process_map = self.processes.lock().await;
process_map.remove(&params.process_id)
let running = {
let process_map = self.processes.lock().await;
if let Some(process) = process_map.get(&params.process_id) {
process.session.terminate();
true
} else {
false
}
};
Ok(if let Some(process) = process {
process.session.terminate();
TerminateResponse { running: true }
} else {
TerminateResponse { running: false }
})
Ok(TerminateResponse { running })
}
async fn send_request_result(
@@ -568,6 +577,15 @@ mod tests {
async fn duplicate_process_ids_are_rejected_per_connection() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4);
let mut handler = ExecServerHandler::new(outgoing_tx);
let marker_path = std::env::temp_dir().join(format!(
"codex-exec-server-duplicate-{}-{}",
std::process::id(),
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system clock before unix epoch")
.as_nanos()
));
let _ = std::fs::remove_file(&marker_path);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
@@ -626,7 +644,14 @@ mod tests {
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
request_id: RequestId::Integer(3),
params,
params: crate::protocol::ExecParams {
argv: vec![
"bash".to_string(),
"-lc".to_string(),
format!("printf duplicate > {}", marker_path.display()),
],
..params
},
}))
.await
{
@@ -641,8 +666,13 @@ mod tests {
assert_eq!(request_id, RequestId::Integer(3));
assert_eq!(error.code, -32600);
assert_eq!(error.message, "process proc-1 already exists");
assert!(
!marker_path.exists(),
"duplicate process ids must be rejected before spawning the command"
);
handler.shutdown().await;
let _ = std::fs::remove_file(&marker_path);
}
#[tokio::test]
@@ -827,4 +857,82 @@ mod tests {
}
);
}
#[tokio::test]
async fn terminate_keeps_process_ids_reserved_until_exit_cleanup() {
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2);
let mut handler = ExecServerHandler::new(outgoing_tx);
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Initialize {
request_id: RequestId::Integer(1),
params: InitializeParams {
client_name: "test".to_string(),
},
},
))
.await
{
panic!("initialize should succeed: {err}");
}
let _ = recv_outbound(&mut outgoing_rx).await;
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Notification(
ExecServerClientNotification::Initialized,
))
.await
{
panic!("initialized should succeed: {err}");
}
let spawned = codex_utils_pty::spawn_pipe_process_no_stdin(
"bash",
&["-lc".to_string(), "sleep 30".to_string()],
std::env::current_dir().expect("cwd").as_path(),
&HashMap::new(),
&None,
)
.await
.expect("spawn test process");
{
let mut process_map = handler.processes.lock().await;
process_map.insert(
"proc-1".to_string(),
super::RunningProcess {
session: spawned.session,
tty: false,
},
);
}
if let Err(err) = handler
.handle_message(ExecServerInboundMessage::Request(
ExecServerRequest::Terminate {
request_id: RequestId::Integer(2),
params: crate::protocol::TerminateParams {
process_id: "proc-1".to_string(),
},
},
))
.await
{
panic!("terminate should not fail the handler: {err}");
}
assert_eq!(
recv_outbound(&mut outgoing_rx).await,
ExecServerOutboundMessage::Response {
request_id: RequestId::Integer(2),
response: ExecServerResponseMessage::Terminate(TerminateResponse { running: true }),
}
);
assert!(
handler.processes.lock().await.contains_key("proc-1"),
"terminated ids should stay reserved until exit cleanup removes them"
);
handler.shutdown().await;
}
}