mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
fix: flaky tests
This commit is contained in:
@@ -61,7 +61,7 @@ pub struct McpProcess {
|
||||
process: Child,
|
||||
stdin: ChildStdin,
|
||||
stdout: BufReader<ChildStdout>,
|
||||
pending_user_messages: VecDeque<JSONRPCNotification>,
|
||||
pending_notifications: VecDeque<JSONRPCNotification>,
|
||||
}
|
||||
|
||||
impl McpProcess {
|
||||
@@ -132,7 +132,7 @@ impl McpProcess {
|
||||
process,
|
||||
stdin,
|
||||
stdout,
|
||||
pending_user_messages: VecDeque::new(),
|
||||
pending_notifications: VecDeque::new(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -484,6 +484,10 @@ impl McpProcess {
|
||||
method: &str,
|
||||
params: Option<serde_json::Value>,
|
||||
) -> anyhow::Result<i64> {
|
||||
// Drop any stale notifications from previous requests so callers don't
|
||||
// accidentally consume old events when waiting for the next response.
|
||||
self.pending_notifications.clear();
|
||||
|
||||
let request_id = self.next_request_id.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
let message = JSONRPCMessage::Request(JSONRPCRequest {
|
||||
@@ -546,7 +550,7 @@ impl McpProcess {
|
||||
match message {
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
eprintln!("notification: {notification:?}");
|
||||
self.enqueue_user_message(notification);
|
||||
self.enqueue_notification(notification);
|
||||
}
|
||||
JSONRPCMessage::Request(jsonrpc_request) => {
|
||||
return jsonrpc_request.try_into().with_context(
|
||||
@@ -574,7 +578,7 @@ impl McpProcess {
|
||||
match message {
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
eprintln!("notification: {notification:?}");
|
||||
self.enqueue_user_message(notification);
|
||||
self.enqueue_notification(notification);
|
||||
}
|
||||
JSONRPCMessage::Request(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
|
||||
@@ -600,7 +604,7 @@ impl McpProcess {
|
||||
match message {
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
eprintln!("notification: {notification:?}");
|
||||
self.enqueue_user_message(notification);
|
||||
self.enqueue_notification(notification);
|
||||
}
|
||||
JSONRPCMessage::Request(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
|
||||
@@ -634,7 +638,7 @@ impl McpProcess {
|
||||
if notification.method == method {
|
||||
return Ok(notification);
|
||||
}
|
||||
self.enqueue_user_message(notification);
|
||||
self.enqueue_notification(notification);
|
||||
}
|
||||
JSONRPCMessage::Request(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
|
||||
@@ -651,18 +655,16 @@ impl McpProcess {
|
||||
|
||||
fn take_pending_notification_by_method(&mut self, method: &str) -> Option<JSONRPCNotification> {
|
||||
if let Some(pos) = self
|
||||
.pending_user_messages
|
||||
.pending_notifications
|
||||
.iter()
|
||||
.position(|notification| notification.method == method)
|
||||
{
|
||||
return self.pending_user_messages.remove(pos);
|
||||
return self.pending_notifications.remove(pos);
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
fn enqueue_user_message(&mut self, notification: JSONRPCNotification) {
|
||||
if notification.method == "codex/event/user_message" {
|
||||
self.pending_user_messages.push_back(notification);
|
||||
}
|
||||
fn enqueue_notification(&mut self, notification: JSONRPCNotification) {
|
||||
self.pending_notifications.push_back(notification);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user