restructure

This commit is contained in:
Ahmed Ibrahim
2025-08-02 13:54:13 -07:00
parent c01b9d2d2a
commit ec6a4f9e2a

View File

@@ -6,6 +6,7 @@ use std::sync::Arc;
use codex_core::Codex;
use codex_core::protocol::AgentMessageEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::FileChange;
@@ -24,6 +25,42 @@ use crate::mcp_protocol::NotificationMeta;
use crate::outgoing_message::OutgoingMessageSender;
use crate::patch_approval::handle_patch_approval_request;
/// Deferred elicitation requests to be sent after InitialState when
/// streaming is enabled. Preserves original event order (FIFO).
enum PendingElicitation {
Exec {
command: Vec<String>,
cwd: PathBuf,
event_id: String,
call_id: String,
},
PatchReq {
call_id: String,
reason: Option<String>,
grant_root: Option<PathBuf>,
changes: HashMap<PathBuf, FileChange>,
event_id: String,
},
}
/// Immutable context shared across helper functions to avoid long
/// argument lists.
struct LoopCtx {
outgoing: Arc<OutgoingMessageSender>,
codex: Arc<Codex>,
request_id: RequestId,
request_id_str: String,
}
/// Snapshot of a patch approval request used to defer elicitation.
struct PatchReq {
call_id: String,
reason: Option<String>,
grant_root: Option<PathBuf>,
changes: HashMap<PathBuf, FileChange>,
event_id: String,
}
/// Conversation event loop bridging Codex events to MCP notifications.
///
/// Semantics:
@@ -61,227 +98,146 @@ pub async fn run_conversation_loop(
loop {
tokio::select! {
res = codex.next_event() => {
match res {
Ok(event) => {
// Always buffer the event
buffered_events.push(CodexEventNotificationParams { meta: None, msg: event.msg.clone() });
// Stream immediately if enabled
stream_event_if_enabled(streaming_enabled, &ctx, &event.msg).await;
match event.msg {
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent { command, cwd, call_id, reason: _ }) => {
process_exec_request(
streaming_enabled,
&mut pending_elicitations,
command,
cwd,
call_id,
event.id.clone(),
&ctx,
)
.await;
continue;
}
EventMsg::Error(_) => {
error!("Codex runtime error");
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent { call_id, reason, grant_root, changes }) => {
process_patch_request(
streaming_enabled,
&mut pending_elicitations,
PatchReq {
call_id,
reason,
grant_root,
changes,
event_id: event.id.clone(),
},
&ctx,
)
.await;
continue;
}
EventMsg::TaskComplete(_) => {
handle_task_complete(&running_sessions, &session_id).await;
}
EventMsg::SessionConfigured(_) => {
tracing::error!("unexpected SessionConfigured event");
}
EventMsg::AgentMessageDelta(_) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::AgentReasoningDelta(_) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
// TODO: think how we want to support this in the MCP
}
EventMsg::TaskStarted
| EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::PlanUpdate(_)
| EventMsg::ShutdownComplete => {
}
}
}
Err(e) => {
error!("Codex runtime error: {e}");
}
}
handle_next_event_arm(
res,
streaming_enabled,
&mut buffered_events,
&mut pending_elicitations,
&ctx,
&running_sessions,
&session_id,
).await;
},
changed = stream_rx.changed() => {
if changed.is_ok() {
let now = *stream_rx.borrow();
handle_stream_change(
now,
&mut streaming_enabled,
session_id,
&buffered_events,
&mut pending_elicitations,
&ctx,
)
.await;
} else {
error!("stream_rx change error; streaming control channel closed");
}
handle_stream_rx_arm(
changed,
&mut stream_rx,
&mut streaming_enabled,
&session_id,
&buffered_events,
&mut pending_elicitations,
&ctx,
).await;
}
}
}
}
/// Deferred elicitation requests to be sent after InitialState when
/// streaming is enabled. Preserves original event order (FIFO).
enum PendingElicitation {
Exec {
command: Vec<String>,
cwd: PathBuf,
event_id: String,
call_id: String,
},
Patch {
call_id: String,
reason: Option<String>,
grant_root: Option<PathBuf>,
changes: HashMap<PathBuf, FileChange>,
event_id: String,
},
}
/// Immutable context shared across helper functions to avoid long
/// argument lists.
struct LoopCtx {
outgoing: Arc<OutgoingMessageSender>,
codex: Arc<Codex>,
request_id: RequestId,
request_id_str: String,
}
/// Snapshot of a patch approval request used to defer elicitation.
struct PatchReq {
call_id: String,
reason: Option<String>,
grant_root: Option<PathBuf>,
changes: HashMap<PathBuf, FileChange>,
event_id: String,
}
/// Streams a single Codex event as an MCP notification if streaming is enabled.
async fn stream_event_if_enabled(streaming_enabled: bool, ctx: &LoopCtx, msg: &EventMsg) {
if !streaming_enabled {
return;
}
let method = msg.to_string();
let params = CodexEventNotificationParams {
meta: None,
msg: msg.clone(),
};
if let Ok(params_val) = serde_json::to_value(&params) {
ctx.outgoing
.send_custom_notification(&method, params_val)
.await;
} else {
error!("Failed to serialize event params");
}
}
/// Handles an exec approval request. If streaming is disabled, defers the
/// elicitation until after InitialState; otherwise elicits immediately.
async fn process_exec_request(
/// Handles the `codex.next_event()` select arm.
async fn handle_next_event_arm<E>(
res: Result<Event, E>,
streaming_enabled: bool,
pending: &mut Vec<PendingElicitation>,
command: Vec<String>,
cwd: PathBuf,
call_id: String,
event_id: String,
buffered_events: &mut Vec<CodexEventNotificationParams>,
pending_elicitations: &mut Vec<PendingElicitation>,
ctx: &LoopCtx,
running_sessions: &Arc<Mutex<HashSet<Uuid>>>,
session_id: &Uuid,
) where
E: std::fmt::Display,
{
match res {
Ok(event) => {
buffered_events.push(CodexEventNotificationParams {
meta: None,
msg: event.msg.clone(),
});
stream_event_if_enabled(streaming_enabled, ctx, &event.msg).await;
match event.msg {
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
command,
cwd,
call_id,
reason: _,
}) => {
process_exec_request(
streaming_enabled,
pending_elicitations,
command,
cwd,
call_id,
event.id.clone(),
ctx,
)
.await;
}
EventMsg::Error(_) => {
error!("Codex runtime error");
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
reason,
grant_root,
changes,
}) => {
process_patch_request(
streaming_enabled,
pending_elicitations,
PatchReq {
call_id,
reason,
grant_root,
changes,
event_id: event.id.clone(),
},
ctx,
)
.await;
}
EventMsg::TaskComplete(_) => {
handle_task_complete(running_sessions, session_id).await;
}
EventMsg::SessionConfigured(_) => {
tracing::error!("unexpected SessionConfigured event");
}
EventMsg::AgentMessageDelta(_) => {}
EventMsg::AgentReasoningDelta(_) => {}
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {}
EventMsg::TaskStarted
| EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::PlanUpdate(_)
| EventMsg::ShutdownComplete => {}
}
}
Err(e) => {
error!("Codex runtime error: {e}");
}
}
}
/// Handles the `stream_rx.changed()` select arm.
async fn handle_stream_rx_arm(
changed: Result<(), tokio::sync::watch::error::RecvError>,
stream_rx: &mut WatchReceiver<bool>,
streaming_enabled: &mut bool,
session_id: &Uuid,
buffered_events: &[CodexEventNotificationParams],
pending_elicitations: &mut Vec<PendingElicitation>,
ctx: &LoopCtx,
) {
if streaming_enabled {
handle_exec_approval_request(
command,
cwd,
ctx.outgoing.clone(),
ctx.codex.clone(),
ctx.request_id.clone(),
ctx.request_id_str.clone(),
event_id,
call_id,
if changed.is_ok() {
let now = *stream_rx.borrow();
handle_stream_change(
now,
streaming_enabled,
*session_id,
buffered_events,
pending_elicitations,
ctx,
)
.await;
} else {
pending.push(PendingElicitation::Exec {
command,
cwd,
event_id,
call_id,
});
}
}
/// Handles a patch approval request. If streaming is disabled, defers the
/// elicitation until after InitialState; otherwise elicits immediately.
async fn process_patch_request(
streaming_enabled: bool,
pending: &mut Vec<PendingElicitation>,
req: PatchReq,
ctx: &LoopCtx,
) {
let PatchReq {
call_id,
reason,
grant_root,
changes,
event_id,
} = req;
if streaming_enabled {
handle_patch_approval_request(
call_id,
reason,
grant_root,
changes,
ctx.outgoing.clone(),
ctx.codex.clone(),
ctx.request_id.clone(),
ctx.request_id_str.clone(),
event_id,
)
.await;
} else {
pending.push(PendingElicitation::Patch {
call_id,
reason,
grant_root,
changes,
event_id,
});
error!("stream_rx change error; streaming control channel closed");
}
}
@@ -353,7 +309,7 @@ async fn drain_pending_elicitations(pending: &mut Vec<PendingElicitation>, ctx:
)
.await;
}
PendingElicitation::Patch {
PendingElicitation::PatchReq {
call_id,
reason,
grant_root,
@@ -377,6 +333,97 @@ async fn drain_pending_elicitations(pending: &mut Vec<PendingElicitation>, ctx:
}
}
/// Handles an exec approval request. If streaming is disabled, defers the
/// elicitation until after InitialState; otherwise elicits immediately.
async fn process_exec_request(
streaming_enabled: bool,
pending: &mut Vec<PendingElicitation>,
command: Vec<String>,
cwd: PathBuf,
call_id: String,
event_id: String,
ctx: &LoopCtx,
) {
if streaming_enabled {
handle_exec_approval_request(
command,
cwd,
ctx.outgoing.clone(),
ctx.codex.clone(),
ctx.request_id.clone(),
ctx.request_id_str.clone(),
event_id,
call_id,
)
.await;
} else {
pending.push(PendingElicitation::Exec {
command,
cwd,
event_id,
call_id,
});
}
}
/// Handles a patch approval request. If streaming is disabled, defers the
/// elicitation until after InitialState; otherwise elicits immediately.
async fn process_patch_request(
streaming_enabled: bool,
pending: &mut Vec<PendingElicitation>,
req: PatchReq,
ctx: &LoopCtx,
) {
let PatchReq {
call_id,
reason,
grant_root,
changes,
event_id,
} = req;
if streaming_enabled {
handle_patch_approval_request(
call_id,
reason,
grant_root,
changes,
ctx.outgoing.clone(),
ctx.codex.clone(),
ctx.request_id.clone(),
ctx.request_id_str.clone(),
event_id,
)
.await;
} else {
pending.push(PendingElicitation::PatchReq {
call_id,
reason,
grant_root,
changes,
event_id,
});
}
}
/// Streams a single Codex event as an MCP notification if streaming is enabled.
async fn stream_event_if_enabled(streaming_enabled: bool, ctx: &LoopCtx, msg: &EventMsg) {
if !streaming_enabled {
return;
}
let method = msg.to_string();
let params = CodexEventNotificationParams {
meta: None,
msg: msg.clone(),
};
if let Ok(params_val) = serde_json::to_value(&params) {
ctx.outgoing
.send_custom_notification(&method, params_val)
.await;
} else {
error!("Failed to serialize event params");
}
}
/// Removes the session id from the shared running set when a task completes.
async fn handle_task_complete(running_sessions: &Arc<Mutex<HashSet<Uuid>>>, session_id: &Uuid) {
let mut running_sessions = running_sessions.lock().await;