codex: address PR review feedback (#14862)

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
starr-openai
2026-03-17 18:05:24 +00:00
parent 008d096dc2
commit adfc5aa0c5
2 changed files with 243 additions and 18 deletions

View File

@@ -330,13 +330,21 @@ impl ExecServerClient {
let process_id = params.process_id.clone();
let status = Arc::new(RemoteProcessStatus::new());
let (output_tx, output_rx) = broadcast::channel(256);
self.inner.processes.lock().await.insert(
process_id.clone(),
RegisteredProcess {
output_tx,
status: Arc::clone(&status),
},
);
{
let mut processes = self.inner.processes.lock().await;
if processes.contains_key(&process_id) {
return Err(ExecServerError::Protocol(format!(
"process `{process_id}` already exists"
)));
}
processes.insert(
process_id.clone(),
RegisteredProcess {
output_tx,
status: Arc::clone(&status),
},
);
}
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let client = self.clone();
@@ -1083,6 +1091,115 @@ mod tests {
);
}
#[tokio::test]
async fn start_process_rejects_duplicate_local_ids_without_orphaning_existing_process() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
tokio::spawn(async move {
let mut lines = BufReader::new(server_reader).lines();
let initialize = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(initialize_request) = initialize else {
panic!("expected initialize request");
};
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id: initialize_request.id,
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
}),
)
.await;
let initialized = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Notification(notification) = initialized else {
panic!("expected initialized notification");
};
assert_eq!(notification.method, INITIALIZED_METHOD);
let exec_request = read_jsonrpc_line(&mut lines).await;
let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = exec_request else {
panic!("expected exec request");
};
assert_eq!(method, EXEC_METHOD);
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Response(JSONRPCResponse {
id,
result: serde_json::json!({ "processId": "proc-1" }),
}),
)
.await;
tokio::time::sleep(Duration::from_millis(25)).await;
write_jsonrpc_line(
&mut server_writer,
JSONRPCMessage::Notification(JSONRPCNotification {
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
params: Some(serde_json::json!({
"processId": "proc-1",
"stream": "stdout",
"chunk": "YWxpdmUK"
})),
}),
)
.await;
});
let client = match ExecServerClient::connect_stdio(
client_stdin,
client_stdout,
test_options(),
)
.await
{
Ok(client) => client,
Err(err) => panic!("failed to connect test client: {err}"),
};
let first_process = match client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: true,
arg0: None,
})
.await
{
Ok(process) => process,
Err(err) => panic!("failed to start first process: {err}"),
};
let duplicate_result = client
.start_process(ExecParams {
process_id: "proc-1".to_string(),
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
env: HashMap::new(),
tty: true,
arg0: None,
})
.await;
match duplicate_result {
Err(ExecServerError::Protocol(message)) => {
assert_eq!(message, "process `proc-1` already exists");
}
Err(err) => panic!("unexpected duplicate start failure: {err}"),
Ok(_) => panic!("expected local duplicate rejection"),
}
let mut output = first_process.output_receiver();
let output = timeout(Duration::from_secs(1), output.recv())
.await
.unwrap_or_else(|err| panic!("timed out waiting for process output: {err}"))
.unwrap_or_else(|err| panic!("failed to receive process output: {err}"));
assert_eq!(output.stream, ExecOutputStream::Stdout);
assert_eq!(output.chunk, b"alive\n".to_vec());
}
#[tokio::test]
async fn transport_shutdown_marks_processes_exited_without_exit_codes() {
let (client_stdin, server_reader) = tokio::io::duplex(4096);