restructure

This commit is contained in:
Ahmed Ibrahim
2025-08-04 13:40:22 -07:00
parent bfbe523f81
commit c515d2869e
2 changed files with 30 additions and 40 deletions

View File

@@ -24,8 +24,7 @@ use crate::mcp_protocol::NotificationMeta;
use crate::outgoing_message::OutgoingMessageSender;
use crate::patch_approval::handle_patch_approval_request;
/// A single source of truth for an active conversation.
/// Owns the Codex session and all per-conversation state.
/// Conversation struct that owns the Codex session and all per-conversation state.
pub(crate) struct Conversation {
codex: Arc<Codex>,
session_id: Uuid,
@@ -37,26 +36,12 @@ pub(crate) struct Conversation {
pending_elicitations: Vec<PendingElicitation>,
}
/// Deferred elicitation requests to be sent after InitialState when
/// streaming is enabled. Preserves original event order (FIFO).
enum PendingElicitation {
Exec {
command: Vec<String>,
cwd: PathBuf,
event_id: String,
call_id: String,
},
PatchReq {
call_id: String,
reason: Option<String>,
grant_root: Option<PathBuf>,
changes: HashMap<PathBuf, FileChange>,
event_id: String,
},
ExecRequest(ExecRequest),
PatchRequest(PatchRequest),
}
/// Snapshot of a patch approval request used to defer elicitation.
struct PatchReq {
struct PatchRequest {
call_id: String,
reason: Option<String>,
grant_root: Option<PathBuf>,
@@ -64,6 +49,13 @@ struct PatchReq {
event_id: String,
}
struct ExecRequest {
command: Vec<String>,
cwd: PathBuf,
event_id: String,
call_id: String,
}
impl Conversation {
pub(crate) fn new(
codex: Arc<Codex>,
@@ -87,11 +79,11 @@ impl Conversation {
}
pub(crate) async fn set_streaming(&mut self, enabled: bool) {
if enabled && !self.streaming_enabled {
if enabled {
self.streaming_enabled = true;
self.emit_initial_state().await;
self.drain_pending_elicitations().await;
} else if !enabled && self.streaming_enabled {
} else {
self.streaming_enabled = false;
}
}
@@ -173,7 +165,7 @@ impl Conversation {
grant_root,
changes,
}) => {
self.process_patch_request(PatchReq {
self.process_patch_request(PatchRequest {
call_id,
reason,
grant_root,
@@ -189,7 +181,7 @@ impl Conversation {
self.handle_task_started().await;
}
EventMsg::SessionConfigured(_) => {
tracing::error!("unexpected SessionConfigured event");
error!("unexpected SessionConfigured event");
}
EventMsg::AgentMessageDelta(_) => {}
EventMsg::AgentReasoningDelta(_) => {}
@@ -219,8 +211,6 @@ impl Conversation {
}
}
// streaming toggling handled by set_streaming()
async fn emit_initial_state(&self) {
let params = InitialStateNotificationParams {
meta: Some(NotificationMeta {
@@ -243,12 +233,12 @@ impl Conversation {
async fn drain_pending_elicitations(&mut self) {
for item in self.pending_elicitations.drain(..) {
match item {
PendingElicitation::Exec {
PendingElicitation::ExecRequest(ExecRequest {
command,
cwd,
event_id,
call_id,
} => {
}) => {
handle_exec_approval_request(
command,
cwd,
@@ -264,13 +254,13 @@ impl Conversation {
)
.await;
}
PendingElicitation::PatchReq {
PendingElicitation::PatchRequest(PatchRequest {
call_id,
reason,
grant_root,
changes,
event_id,
} => {
}) => {
handle_patch_approval_request(
call_id,
reason,
@@ -314,17 +304,18 @@ impl Conversation {
)
.await;
} else {
self.pending_elicitations.push(PendingElicitation::Exec {
command,
cwd,
event_id,
call_id,
});
self.pending_elicitations
.push(PendingElicitation::ExecRequest(ExecRequest {
command,
cwd,
event_id,
call_id,
}));
}
}
async fn process_patch_request(&mut self, req: PatchReq) {
let PatchReq {
async fn process_patch_request(&mut self, req: PatchRequest) {
let PatchRequest {
call_id,
reason,
grant_root,
@@ -349,13 +340,13 @@ impl Conversation {
.await;
} else {
self.pending_elicitations
.push(PendingElicitation::PatchReq {
.push(PendingElicitation::PatchRequest(PatchRequest {
call_id,
reason,
grant_root,
changes,
event_id,
});
}));
}
}

View File

@@ -594,7 +594,6 @@ impl MessageProcessor {
// ---------------------------------------------------------------------
// Notification handlers
// ---------------------------------------------------------------------
#[allow(clippy::collapsible_match)]
async fn handle_cancelled_notification(
&self,
params: <mcp_types::CancelledNotification as mcp_types::ModelContextProtocolNotification>::Params,