This commit is contained in:
Ahmed Ibrahim
2025-08-01 17:56:57 -07:00
parent 4c13829e8b
commit bea4a5358a
3 changed files with 35 additions and 4 deletions

View File

@@ -1,4 +1,6 @@
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::exec_approval::handle_exec_approval_request;
use crate::mcp_protocol::CodexEventNotificationParams;
@@ -25,6 +27,7 @@ pub async fn run_conversation_loop(
request_id: RequestId,
mut stream_rx: WatchReceiver<bool>,
session_id: Uuid,
running_sessions: Arc<Mutex<HashSet<Uuid>>>,
) {
let request_id_str = match &request_id {
RequestId::String(s) => s.clone(),
@@ -99,7 +102,11 @@ pub async fn run_conversation_loop(
}
continue;
}
EventMsg::TaskComplete(_) => {}
EventMsg::TaskComplete(_) => {
// remove running session id
let mut running_sessions = running_sessions.lock().await;
running_sessions.remove(&session_id);
}
EventMsg::SessionConfigured(_) => {
tracing::error!("unexpected SessionConfigured event");
}

View File

@@ -140,8 +140,17 @@ pub(crate) async fn handle_create_conversation(
// Run the conversation loop in the background so this request can return immediately.
let outgoing = message_processor.outgoing();
let spawn_id = id.clone();
let running_sessions = message_processor.running_session_ids();
tokio::spawn(async move {
run_conversation_loop(codex_arc.clone(), outgoing, spawn_id, stream_rx, session_id).await;
run_conversation_loop(
codex_arc.clone(),
outgoing,
spawn_id,
stream_rx,
session_id,
running_sessions,
)
.await;
});
// Reply with the new conversation id and effective model

View File

@@ -130,7 +130,10 @@ async fn test_send_then_connect_receives_initial_state_with_message() {
async fn test_cancel_stream_then_reconnect_catches_up_initial_state() {
// One response is sufficient for the assertions in this test
let responses = vec![
create_final_assistant_message_sse_response("Done").expect("build mock assistant message"),
create_final_assistant_message_sse_response("Done 1")
.expect("build mock assistant message"),
create_final_assistant_message_sse_response("Done 2")
.expect("build mock assistant message"),
];
let server = create_mock_chat_completions_server(responses).await;
@@ -199,8 +202,20 @@ async fn test_cancel_stream_then_reconnect_catches_up_initial_state() {
.get("msg")
.and_then(|m| m.get("message"))
.and_then(|t| t.as_str())
== Some("Done")
== Some("Done 1")
}));
assert!(events.iter().any(|ev| {
ev.get("msg")
.and_then(|m| m.get("type"))
.and_then(|t| t.as_str())
== Some("agent_message")
&& ev
.get("msg")
.and_then(|m| m.get("message"))
.and_then(|t| t.as_str())
== Some("Done 2")
}));
drop(server);
}
// Helper to create a config.toml pointing at the mock model server.