we don't need to error on submit while turn is running

This commit is contained in:
Ahmed Ibrahim
2025-08-04 22:40:49 -07:00
parent c1e9083cbd
commit 4259e5787f

View File

@@ -34,7 +34,6 @@ pub(crate) struct Conversation {
}
struct ConversationState {
running: bool,
streaming_enabled: bool,
buffered_events: Vec<CodexEventNotificationParams>,
pending_elicitations: Vec<PendingElicitation>,
@@ -53,7 +52,6 @@ impl Conversation {
outgoing,
request_id,
state: Mutex::new(ConversationState {
running: false,
streaming_enabled: false,
buffered_events: Vec::new(),
pending_elicitations: Vec::new(),
@@ -84,11 +82,10 @@ impl Conversation {
fn spawn_loop(this: Arc<Self>) {
tokio::spawn(async move {
loop {
// Codex can be awaited without locking Conversation
let codex = this.codex.clone();
let res = codex.next_event().await;
this.handle_next_event(res).await;
// Clone once outside the loop; `Codex` is cheap to clone but we don't need to do it repeatedly.
let codex = this.codex.clone();
while let Ok(event) = codex.next_event().await {
this.handle_event(event).await;
}
});
}
@@ -102,13 +99,6 @@ impl Conversation {
request_id: RequestId,
items: Vec<codex_core::protocol::InputItem>,
) -> Result<(), String> {
{
let mut st = self.state.lock().await;
if st.running {
return Err("Session is already running".to_string());
}
st.running = true;
}
let request_id_string = match &request_id {
RequestId::String(s) => s.clone(),
RequestId::Integer(i) => i.to_string(),
@@ -121,91 +111,71 @@ impl Conversation {
})
.await;
if let Err(e) = submit_res {
let mut st = self.state.lock().await;
st.running = false;
return Err(format!("Failed to submit user input: {e}"));
}
Ok(())
}
async fn handle_next_event<E>(&self, res: Result<Event, E>)
where
E: std::fmt::Display,
{
match res {
Ok(event) => {
{
let mut st = self.state.lock().await;
st.buffered_events.push(CodexEventNotificationParams {
meta: None,
msg: event.msg.clone(),
});
}
self.stream_event_if_enabled(&event.msg).await;
async fn handle_event(&self, event: Event) {
{
let mut st = self.state.lock().await;
st.buffered_events.push(CodexEventNotificationParams {
meta: None,
msg: event.msg.clone(),
});
}
self.stream_event_if_enabled(&event.msg).await;
match event.msg {
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
command,
cwd,
call_id,
reason: _,
}) => {
self.process_exec_request(command, cwd, call_id, event.id.clone())
.await;
}
EventMsg::Error(_) => {
error!("Codex runtime error");
self.handle_task_clear().await;
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
reason,
grant_root,
changes,
}) => {
self.process_patch_request(PatchRequest {
call_id,
reason,
grant_root,
changes,
event_id: event.id.clone(),
})
.await;
}
EventMsg::TaskComplete(_) => {
self.handle_task_clear().await;
}
EventMsg::TaskStarted => {
self.handle_task_started().await;
}
EventMsg::SessionConfigured(_) => {
error!("unexpected SessionConfigured event");
}
EventMsg::AgentMessageDelta(_) => {}
EventMsg::AgentReasoningDelta(_) => {}
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {}
EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::PlanUpdate(_)
| EventMsg::TurnDiff(_)
| EventMsg::ShutdownComplete => {
self.handle_task_clear().await;
}
}
match event.msg {
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
command,
cwd,
call_id,
reason: _,
}) => {
self.process_exec_request(command, cwd, call_id, event.id.clone())
.await;
}
Err(e) => {
error!("Codex runtime error: {e}");
self.handle_task_clear().await;
EventMsg::Error(_) => {
error!("Codex runtime error");
}
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id,
reason,
grant_root,
changes,
}) => {
self.process_patch_request(PatchRequest {
call_id,
reason,
grant_root,
changes,
event_id: event.id.clone(),
})
.await;
}
EventMsg::TaskComplete(_) => {}
EventMsg::TaskStarted => {}
EventMsg::SessionConfigured(_) => {
error!("unexpected SessionConfigured event");
}
EventMsg::AgentMessageDelta(_) => {}
EventMsg::AgentReasoningDelta(_) => {}
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {}
EventMsg::TokenCount(_)
| EventMsg::AgentReasoning(_)
| EventMsg::McpToolCallBegin(_)
| EventMsg::McpToolCallEnd(_)
| EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::PlanUpdate(_)
| EventMsg::TurnDiff(_)
| EventMsg::ShutdownComplete => {}
}
}
@@ -367,16 +337,6 @@ impl Conversation {
error!("Failed to serialize event params");
}
}
async fn handle_task_started(&self) {
let mut st = self.state.lock().await;
st.running = true;
}
async fn handle_task_clear(&self) {
let mut st = self.state.lock().await;
st.running = false;
}
}
enum PendingElicitation {