mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
codex: address PR review feedback (#14862)
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -330,13 +330,21 @@ impl ExecServerClient {
|
||||
let process_id = params.process_id.clone();
|
||||
let status = Arc::new(RemoteProcessStatus::new());
|
||||
let (output_tx, output_rx) = broadcast::channel(256);
|
||||
self.inner.processes.lock().await.insert(
|
||||
process_id.clone(),
|
||||
RegisteredProcess {
|
||||
output_tx,
|
||||
status: Arc::clone(&status),
|
||||
},
|
||||
);
|
||||
{
|
||||
let mut processes = self.inner.processes.lock().await;
|
||||
if processes.contains_key(&process_id) {
|
||||
return Err(ExecServerError::Protocol(format!(
|
||||
"process `{process_id}` already exists"
|
||||
)));
|
||||
}
|
||||
processes.insert(
|
||||
process_id.clone(),
|
||||
RegisteredProcess {
|
||||
output_tx,
|
||||
status: Arc::clone(&status),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
|
||||
let client = self.clone();
|
||||
@@ -1083,6 +1091,115 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn start_process_rejects_duplicate_local_ids_without_orphaning_existing_process() {
|
||||
let (client_stdin, server_reader) = tokio::io::duplex(4096);
|
||||
let (mut server_writer, client_stdout) = tokio::io::duplex(4096);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(server_reader).lines();
|
||||
|
||||
let initialize = read_jsonrpc_line(&mut lines).await;
|
||||
let JSONRPCMessage::Request(initialize_request) = initialize else {
|
||||
panic!("expected initialize request");
|
||||
};
|
||||
write_jsonrpc_line(
|
||||
&mut server_writer,
|
||||
JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id: initialize_request.id,
|
||||
result: serde_json::json!({ "protocolVersion": PROTOCOL_VERSION }),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
|
||||
let initialized = read_jsonrpc_line(&mut lines).await;
|
||||
let JSONRPCMessage::Notification(notification) = initialized else {
|
||||
panic!("expected initialized notification");
|
||||
};
|
||||
assert_eq!(notification.method, INITIALIZED_METHOD);
|
||||
|
||||
let exec_request = read_jsonrpc_line(&mut lines).await;
|
||||
let JSONRPCMessage::Request(JSONRPCRequest { id, method, .. }) = exec_request else {
|
||||
panic!("expected exec request");
|
||||
};
|
||||
assert_eq!(method, EXEC_METHOD);
|
||||
write_jsonrpc_line(
|
||||
&mut server_writer,
|
||||
JSONRPCMessage::Response(JSONRPCResponse {
|
||||
id,
|
||||
result: serde_json::json!({ "processId": "proc-1" }),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
tokio::time::sleep(Duration::from_millis(25)).await;
|
||||
write_jsonrpc_line(
|
||||
&mut server_writer,
|
||||
JSONRPCMessage::Notification(JSONRPCNotification {
|
||||
method: EXEC_OUTPUT_DELTA_METHOD.to_string(),
|
||||
params: Some(serde_json::json!({
|
||||
"processId": "proc-1",
|
||||
"stream": "stdout",
|
||||
"chunk": "YWxpdmUK"
|
||||
})),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
});
|
||||
|
||||
let client = match ExecServerClient::connect_stdio(
|
||||
client_stdin,
|
||||
client_stdout,
|
||||
test_options(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(client) => client,
|
||||
Err(err) => panic!("failed to connect test client: {err}"),
|
||||
};
|
||||
|
||||
let first_process = match client
|
||||
.start_process(ExecParams {
|
||||
process_id: "proc-1".to_string(),
|
||||
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
|
||||
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
|
||||
env: HashMap::new(),
|
||||
tty: true,
|
||||
arg0: None,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(process) => process,
|
||||
Err(err) => panic!("failed to start first process: {err}"),
|
||||
};
|
||||
|
||||
let duplicate_result = client
|
||||
.start_process(ExecParams {
|
||||
process_id: "proc-1".to_string(),
|
||||
argv: vec!["bash".to_string(), "-lc".to_string(), "true".to_string()],
|
||||
cwd: std::env::current_dir().unwrap_or_else(|err| panic!("missing cwd: {err}")),
|
||||
env: HashMap::new(),
|
||||
tty: true,
|
||||
arg0: None,
|
||||
})
|
||||
.await;
|
||||
|
||||
match duplicate_result {
|
||||
Err(ExecServerError::Protocol(message)) => {
|
||||
assert_eq!(message, "process `proc-1` already exists");
|
||||
}
|
||||
Err(err) => panic!("unexpected duplicate start failure: {err}"),
|
||||
Ok(_) => panic!("expected local duplicate rejection"),
|
||||
}
|
||||
|
||||
let mut output = first_process.output_receiver();
|
||||
let output = timeout(Duration::from_secs(1), output.recv())
|
||||
.await
|
||||
.unwrap_or_else(|err| panic!("timed out waiting for process output: {err}"))
|
||||
.unwrap_or_else(|err| panic!("failed to receive process output: {err}"));
|
||||
assert_eq!(output.stream, ExecOutputStream::Stdout);
|
||||
assert_eq!(output.chunk, b"alive\n".to_vec());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn transport_shutdown_marks_processes_exited_without_exit_codes() {
|
||||
let (client_stdin, server_reader) = tokio::io::duplex(4096);
|
||||
|
||||
@@ -176,6 +176,16 @@ impl ExecServerHandler {
|
||||
.split_first()
|
||||
.ok_or_else(|| invalid_params("argv must not be empty".to_string()))?;
|
||||
|
||||
let process_id = params.process_id.clone();
|
||||
{
|
||||
let process_map = self.processes.lock().await;
|
||||
if process_map.contains_key(&process_id) {
|
||||
return Err(invalid_request(format!(
|
||||
"process {process_id} already exists"
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
let spawned = if params.tty {
|
||||
codex_utils_pty::spawn_pty_process(
|
||||
program,
|
||||
@@ -198,7 +208,6 @@ impl ExecServerHandler {
|
||||
}
|
||||
.map_err(|err| internal_error(err.to_string()))?;
|
||||
|
||||
let process_id = params.process_id.clone();
|
||||
{
|
||||
let mut process_map = self.processes.lock().await;
|
||||
if process_map.contains_key(&process_id) {
|
||||
@@ -268,17 +277,17 @@ impl ExecServerHandler {
|
||||
&self,
|
||||
params: crate::protocol::TerminateParams,
|
||||
) -> Result<TerminateResponse, codex_app_server_protocol::JSONRPCErrorError> {
|
||||
let process = {
|
||||
let mut process_map = self.processes.lock().await;
|
||||
process_map.remove(¶ms.process_id)
|
||||
let running = {
|
||||
let process_map = self.processes.lock().await;
|
||||
if let Some(process) = process_map.get(¶ms.process_id) {
|
||||
process.session.terminate();
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
Ok(if let Some(process) = process {
|
||||
process.session.terminate();
|
||||
TerminateResponse { running: true }
|
||||
} else {
|
||||
TerminateResponse { running: false }
|
||||
})
|
||||
Ok(TerminateResponse { running })
|
||||
}
|
||||
|
||||
async fn send_request_result(
|
||||
@@ -568,6 +577,15 @@ mod tests {
|
||||
async fn duplicate_process_ids_are_rejected_per_connection() {
|
||||
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(4);
|
||||
let mut handler = ExecServerHandler::new(outgoing_tx);
|
||||
let marker_path = std::env::temp_dir().join(format!(
|
||||
"codex-exec-server-duplicate-{}-{}",
|
||||
std::process::id(),
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.expect("system clock before unix epoch")
|
||||
.as_nanos()
|
||||
));
|
||||
let _ = std::fs::remove_file(&marker_path);
|
||||
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Request(
|
||||
@@ -626,7 +644,14 @@ mod tests {
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Request(ExecServerRequest::Exec {
|
||||
request_id: RequestId::Integer(3),
|
||||
params,
|
||||
params: crate::protocol::ExecParams {
|
||||
argv: vec![
|
||||
"bash".to_string(),
|
||||
"-lc".to_string(),
|
||||
format!("printf duplicate > {}", marker_path.display()),
|
||||
],
|
||||
..params
|
||||
},
|
||||
}))
|
||||
.await
|
||||
{
|
||||
@@ -641,8 +666,13 @@ mod tests {
|
||||
assert_eq!(request_id, RequestId::Integer(3));
|
||||
assert_eq!(error.code, -32600);
|
||||
assert_eq!(error.message, "process proc-1 already exists");
|
||||
assert!(
|
||||
!marker_path.exists(),
|
||||
"duplicate process ids must be rejected before spawning the command"
|
||||
);
|
||||
|
||||
handler.shutdown().await;
|
||||
let _ = std::fs::remove_file(&marker_path);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -827,4 +857,82 @@ mod tests {
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn terminate_keeps_process_ids_reserved_until_exit_cleanup() {
|
||||
let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(2);
|
||||
let mut handler = ExecServerHandler::new(outgoing_tx);
|
||||
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Request(
|
||||
ExecServerRequest::Initialize {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: InitializeParams {
|
||||
client_name: "test".to_string(),
|
||||
},
|
||||
},
|
||||
))
|
||||
.await
|
||||
{
|
||||
panic!("initialize should succeed: {err}");
|
||||
}
|
||||
let _ = recv_outbound(&mut outgoing_rx).await;
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Notification(
|
||||
ExecServerClientNotification::Initialized,
|
||||
))
|
||||
.await
|
||||
{
|
||||
panic!("initialized should succeed: {err}");
|
||||
}
|
||||
|
||||
let spawned = codex_utils_pty::spawn_pipe_process_no_stdin(
|
||||
"bash",
|
||||
&["-lc".to_string(), "sleep 30".to_string()],
|
||||
std::env::current_dir().expect("cwd").as_path(),
|
||||
&HashMap::new(),
|
||||
&None,
|
||||
)
|
||||
.await
|
||||
.expect("spawn test process");
|
||||
{
|
||||
let mut process_map = handler.processes.lock().await;
|
||||
process_map.insert(
|
||||
"proc-1".to_string(),
|
||||
super::RunningProcess {
|
||||
session: spawned.session,
|
||||
tty: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
if let Err(err) = handler
|
||||
.handle_message(ExecServerInboundMessage::Request(
|
||||
ExecServerRequest::Terminate {
|
||||
request_id: RequestId::Integer(2),
|
||||
params: crate::protocol::TerminateParams {
|
||||
process_id: "proc-1".to_string(),
|
||||
},
|
||||
},
|
||||
))
|
||||
.await
|
||||
{
|
||||
panic!("terminate should not fail the handler: {err}");
|
||||
}
|
||||
|
||||
assert_eq!(
|
||||
recv_outbound(&mut outgoing_rx).await,
|
||||
ExecServerOutboundMessage::Response {
|
||||
request_id: RequestId::Integer(2),
|
||||
response: ExecServerResponseMessage::Terminate(TerminateResponse { running: true }),
|
||||
}
|
||||
);
|
||||
|
||||
assert!(
|
||||
handler.processes.lock().await.contains_key("proc-1"),
|
||||
"terminated ids should stay reserved until exit cleanup removes them"
|
||||
);
|
||||
|
||||
handler.shutdown().await;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user