improve test readability

This commit is contained in:
Ahmed Ibrahim
2025-08-01 18:42:25 -07:00
parent 0110749efa
commit a5b3c151ac
4 changed files with 88 additions and 76 deletions

View File

@@ -9,7 +9,6 @@ use crate::mcp_protocol::InitialStateNotificationParams;
use crate::mcp_protocol::InitialStatePayload;
use crate::mcp_protocol::NotificationMeta;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::OutgoingNotificationMeta;
use crate::patch_approval::handle_patch_approval_request;
use codex_core::Codex;
use codex_core::protocol::AgentMessageEvent;
@@ -47,12 +46,15 @@ pub async fn run_conversation_loop(
buffered_events.push(CodexEventNotificationParams { meta: None, msg: event.msg.clone() });
if streaming_enabled {
outgoing
.send_event_as_notification(
&event,
Some(OutgoingNotificationMeta::new(Some(request_id.clone()))),
)
.await;
let method = event.msg.to_string();
let params = CodexEventNotificationParams { meta: None, msg: event.msg.clone() };
if let Ok(params_val) = serde_json::to_value(&params) {
outgoing
.send_custom_notification(&method, params_val)
.await;
} else {
error!("Failed to serialize event params");
}
}
match event.msg {
@@ -154,7 +156,7 @@ pub async fn run_conversation_loop(
streaming_enabled = true;
// Emit InitialState with all buffered events
let params = InitialStateNotificationParams {
meta: Some(NotificationMeta { conversation_id: Some(ConversationId(session_id)), request_id: Some(request_id.clone()) }),
meta: Some(NotificationMeta { conversation_id: Some(ConversationId(session_id)), request_id: None }),
initial_state: InitialStatePayload { events: buffered_events.clone() },
};
if let Ok(params_val) = serde_json::to_value(&params) {

View File

@@ -109,7 +109,7 @@ impl OutgoingMessageSender {
// should be backwards compatible.
// it will replace send_event_as_notification eventually.
async fn send_event_as_notification_new_schema(
pub(crate) async fn send_event_as_notification_new_schema(
&self,
event: &Event,
params: Option<serde_json::Value>,

View File

@@ -449,50 +449,23 @@ impl McpProcess {
pub async fn read_stream_until_configured_response_message(
&mut self,
) -> anyhow::Result<String> {
let mut sid_old: Option<String> = None;
let mut sid_new: Option<String> = None;
loop {
let message = self.read_jsonrpc_message().await?;
eprint!("message: {message:?}");
match message {
JSONRPCMessage::Notification(notification) => {
if let Some(params) = notification.params {
// Back-compat schema: method == "codex/event" and msg.type == "session_configured"
if notification.method == "codex/event" {
if let Some(msg) = params.get("msg") {
if msg.get("type").and_then(|v| v.as_str())
== Some("session_configured")
{
if let Some(session_id) =
msg.get("session_id").and_then(|v| v.as_str())
{
sid_old = Some(session_id.to_string());
}
}
}
}
// New schema: method is the Display of EventMsg::SessionConfigured => "SessionConfigured"
if notification.method == "session_configured" {
if notification.method == "session_configured" {
if let Some(params) = notification.params {
if let Some(msg) = params.get("msg") {
if let Some(session_id) =
msg.get("session_id").and_then(|v| v.as_str())
{
sid_new = Some(session_id.to_string());
return Ok(session_id.to_string());
}
}
}
}
if sid_old.is_some() && sid_new.is_some() {
// Both seen, they must match
assert_eq!(
sid_old.as_ref().unwrap(),
sid_new.as_ref().unwrap(),
"session_id mismatch between old and new schema"
);
return Ok(sid_old.unwrap());
}
}
JSONRPCMessage::Request(_) => {
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");

View File

@@ -38,15 +38,19 @@ async fn test_connect_then_send_receives_initial_state_and_notifications() {
.expect("create conversation");
// Connect the stream
let (_stream_req, params) = mcp
let (stream_req, params) = mcp
.connect_stream_and_expect_initial_state(&conv_id)
.await
.expect("initial_state params");
assert_eq!(
params["_meta"]["conversationId"].as_str(),
Some(conv_id.as_str())
);
assert_eq!(params["initial_state"], json!({ "events": [] }));
let expected_params = json!({
"_meta": {
"conversationId": conv_id.as_str(),
},
"initial_state": {
"events": []
}
});
assert_eq!(params, expected_params);
// Send a message and expect a subsequent notification (non-initial_state)
mcp.send_user_message_and_wait_ok("Hello there", &conv_id)
@@ -55,10 +59,13 @@ async fn test_connect_then_send_receives_initial_state_and_notifications() {
// Read until we see an event notification (new schema example: agent_message)
let params = mcp.wait_for_agent_message().await.expect("agent message");
assert_eq!(
params["msg"],
json!({ "type": "agent_message", "message": "Done" })
);
let expected_params = json!({
"msg": {
"type": "agent_message",
"message": "Done"
}
});
assert_eq!(params, expected_params);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -98,28 +105,34 @@ async fn test_send_then_connect_receives_initial_state_with_message() {
let events = params["initial_state"]["events"]
.as_array()
.expect("events array");
let mut agent_events: Vec<_> = events
.iter()
.filter(|ev| ev["msg"]["type"].as_str() == Some("agent_message"))
.cloned()
.collect();
if agent_events.is_empty() {
// Fallback to live notification if not present in initial state, then assert the full event list
if !events.iter().any(|ev| {
ev.get("msg")
.and_then(|m| m.get("type"))
.and_then(|t| t.as_str())
== Some("agent_message")
&& ev
.get("msg")
.and_then(|m| m.get("message"))
.and_then(|t| t.as_str())
== Some("Done")
}) {
// Fallback to live notification if not present in initial state
let note: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_method("agent_message"),
)
.await
.expect("agent_message note timeout")
.expect("agent_message note err");
let p = note.params.expect("params");
agent_events.push(json!({ "msg": { "type": "agent_message", "message": p["msg"]["message"].as_str().expect("message str") } }));
.expect("event note timeout")
.expect("event note err");
let params = note.params.expect("params");
let expected_params = json!({
"msg": {
"type": "agent_message",
"message": "Done"
}
});
assert_eq!(params, expected_params);
}
let expected = vec![json!({ "msg": { "type": "agent_message", "message": "Done" } })];
assert_eq!(
agent_events, expected,
"initial_state agent_message events should match exactly"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -189,19 +202,43 @@ async fn test_cancel_stream_then_reconnect_catches_up_initial_state() {
let events = params["initial_state"]["events"]
.as_array()
.expect("events array");
let agent_events: Vec<_> = events
.iter()
.filter(|ev| ev["msg"]["type"].as_str() == Some("agent_message"))
.cloned()
.collect();
let expected = vec![
json!({ "msg": { "type": "agent_message", "message": "Done 1" } }),
json!({ "msg": { "type": "agent_message", "message": "Done 2" } }),
json!({
"msg": {
"type": "task_started",
},
}),
json!({
"msg": {
"message": "Done 1",
"type": "agent_message",
},
}),
json!({
"msg": {
"last_agent_message": "Done 1",
"type": "task_complete",
},
}),
json!({
"msg": {
"type": "task_started",
},
}),
json!({
"msg": {
"message": "Done 2",
"type": "agent_message",
},
}),
json!({
"msg": {
"last_agent_message": "Done 2",
"type": "task_complete",
},
}),
];
assert_eq!(
agent_events, expected,
"initial_state agent_message events should match exactly"
);
assert_eq!(*events, expected);
drop(server);
}