Compare commits

...

13 Commits

Author SHA1 Message Date
Ahmed Ibrahim
a9dd1b2e11 proto 2025-10-23 12:12:48 -07:00
Ahmed Ibrahim
54f6a292bb progress 2025-10-23 08:16:49 -07:00
Ahmed Ibrahim
282e696079 feedback 2025-10-23 07:53:03 -07:00
Ahmed Ibrahim
bb26652be1 feedback 2025-10-23 07:52:05 -07:00
Ahmed Ibrahim
d830ec9678 clean 2025-10-22 16:25:52 -07:00
Ahmed Ibrahim
c3ef52f442 clean 2025-10-22 16:25:27 -07:00
Ahmed Ibrahim
d7ad216217 clean 2025-10-22 16:24:19 -07:00
Ahmed Ibrahim
e380b297d0 clean 2025-10-22 16:21:52 -07:00
Ahmed Ibrahim
d63e920ea4 tests 2025-10-22 16:17:05 -07:00
Ahmed Ibrahim
838d82e19c clean 2025-10-22 16:14:20 -07:00
Ahmed Ibrahim
89efcac5a3 clean 2025-10-22 16:12:33 -07:00
Ahmed Ibrahim
6dcb9ff39d clippy 2025-10-22 16:05:32 -07:00
Ahmed Ibrahim
3bf76f79f0 progress 2025-10-22 15:39:34 -07:00
10 changed files with 457 additions and 274 deletions

View File

@@ -10,7 +10,7 @@ use crate::function_tool::FunctionCallError;
use crate::mcp::auth::McpAuthStatusEntry;
use crate::parse_command::parse_command;
use crate::parse_turn_item;
use crate::review_format::format_review_findings_block;
use crate::response_processing::process_items;
use crate::terminal;
use crate::user_notification::UserNotifier;
use async_channel::Receiver;
@@ -19,7 +19,6 @@ use codex_apply_patch::ApplyPatchAction;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use codex_protocol::protocol::ReviewRequest;
@@ -46,7 +45,6 @@ use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tracing::warn;
use crate::ModelProviderInfo;
@@ -85,7 +83,6 @@ use crate::protocol::ListCustomPromptsResponseEvent;
use crate::protocol::Op;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::ReviewDecision;
use crate::protocol::ReviewOutputEvent;
use crate::protocol::SandboxPolicy;
use crate::protocol::SessionConfiguredEvent;
use crate::protocol::StreamErrorEvent;
@@ -266,7 +263,6 @@ pub(crate) struct TurnContext {
pub(crate) sandbox_policy: SandboxPolicy,
pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
pub(crate) tools_config: ToolsConfig,
pub(crate) is_review_mode: bool,
pub(crate) final_output_json_schema: Option<Value>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
}
@@ -401,7 +397,6 @@ impl Session {
sandbox_policy: session_configuration.sandbox_policy.clone(),
shell_environment_policy: config.shell_environment_policy.clone(),
tools_config,
is_review_mode: false,
final_output_json_schema: None,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
}
@@ -633,6 +628,14 @@ impl Session {
state.session_configuration = state.session_configuration.apply(&updates);
}
pub(crate) async fn base_config(&self) -> Arc<Config> {
let state = self.state.lock().await;
state
.session_configuration
.original_config_do_not_use
.clone()
}
pub(crate) async fn new_turn(&self, updates: SessionSettingsUpdate) -> Arc<TurnContext> {
let sub_id = self.next_internal_sub_id();
self.new_turn_with_sub_id(sub_id, updates).await
@@ -855,7 +858,7 @@ impl Session {
/// Records input items: always append to conversation history and
/// persist these response items to rollout.
async fn record_conversation_items(&self, items: &[ResponseItem]) {
pub(crate) async fn record_conversation_items(&self, items: &[ResponseItem]) {
self.record_into_history(items).await;
self.persist_rollout_response_items(items).await;
}
@@ -1467,7 +1470,6 @@ async fn spawn_review_thread(
sandbox_policy: parent_turn_context.sandbox_policy.clone(),
shell_environment_policy: parent_turn_context.shell_environment_policy.clone(),
cwd: parent_turn_context.cwd.clone(),
is_review_mode: true,
final_output_json_schema: None,
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
};
@@ -1517,21 +1519,8 @@ pub(crate) async fn run_task(
sess.send_event(&turn_context, event).await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
// For review threads, keep an isolated in-memory history so the
// model sees a fresh conversation without the parent session's history.
// For normal turns, continue recording to the session history as before.
let is_review_mode = turn_context.is_review_mode;
let mut review_thread_history: ConversationHistory = ConversationHistory::new();
if is_review_mode {
// Seed review threads with environment context so the model knows the working directory.
review_thread_history
.record_items(sess.build_initial_context(turn_context.as_ref()).iter());
review_thread_history.record_items(std::iter::once(&initial_input_for_turn.into()));
} else {
sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn)
.await;
}
sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn)
.await;
let mut last_agent_message: Option<String> = None;
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
@@ -1560,12 +1549,7 @@ pub(crate) async fn run_task(
// conversation history on each turn. The rollout file, however, should
// only record the new items that originated in this turn so that it
// represents an append-only log without duplicates.
let turn_input: Vec<ResponseItem> = if is_review_mode {
if !pending_input.is_empty() {
review_thread_history.record_items(&pending_input);
}
review_thread_history.get_history()
} else {
let turn_input: Vec<ResponseItem> = {
sess.record_conversation_items(&pending_input).await;
sess.history_snapshot().await
};
@@ -1608,109 +1592,8 @@ pub(crate) async fn run_task(
let token_limit_reached = total_usage_tokens
.map(|tokens| tokens >= limit)
.unwrap_or(false);
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
let mut responses = Vec::<ResponseInputItem>::new();
for processed_response_item in processed_items {
let ProcessedResponseItem { item, response } = processed_response_item;
match (&item, &response) {
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
// If the model returned a message, we need to record it.
items_to_record_in_conversation_history.push(item);
}
(
ResponseItem::LocalShellCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::CustomToolCall { .. },
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(
ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
},
);
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
) => {
items_to_record_in_conversation_history.push(item);
let output = match result {
Ok(call_tool_result) => {
convert_call_tool_result_to_function_call_output_payload(
call_tool_result,
)
}
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
},
};
items_to_record_in_conversation_history.push(
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
},
);
}
(
ResponseItem::Reasoning {
id,
summary,
content,
encrypted_content,
},
None,
) => {
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
id: id.clone(),
summary: summary.clone(),
content: content.clone(),
encrypted_content: encrypted_content.clone(),
});
}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
}
};
if let Some(response) = response {
responses.push(response);
}
}
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
if is_review_mode {
review_thread_history
.record_items(items_to_record_in_conversation_history.iter());
} else {
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
}
}
let (responses, items_to_record_in_conversation_history) =
process_items(processed_items, &sess).await;
if token_limit_reached {
if auto_compact_recently_attempted {
@@ -1749,7 +1632,10 @@ pub(crate) async fn run_task(
}
continue;
}
Err(CodexErr::TurnAborted) => {
Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
}) => {
let _ = process_items(processed_items, &sess).await;
// Aborted turn is reported via a different event.
break;
}
@@ -1765,50 +1651,9 @@ pub(crate) async fn run_task(
}
}
// If this was a review thread and we have a final assistant message,
// try to parse it as a ReviewOutput.
//
// If parsing fails, construct a minimal ReviewOutputEvent using the plain
// text as the overall explanation. Else, just exit review mode with None.
//
// Emits an ExitedReviewMode event with the parsed review output.
if turn_context.is_review_mode {
exit_review_mode(
sess.clone(),
Arc::clone(&turn_context),
last_agent_message.as_deref().map(parse_review_output_event),
)
.await;
}
last_agent_message
}
/// Parse the review output; when not valid JSON, build a structured
/// fallback that carries the plain text as the overall explanation.
///
/// Returns: a ReviewOutputEvent parsed from JSON or a fallback populated from text.
fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
// Try direct parse first
if let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(text) {
return ev;
}
// If wrapped in markdown fences or extra prose, attempt to extract the first JSON object
if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}'))
&& start < end
&& let Some(slice) = text.get(start..=end)
&& let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(slice)
{
return ev;
}
// Not JSON return a structured ReviewOutputEvent that carries
// the plain text as the overall explanation.
ReviewOutputEvent {
overall_explanation: text.to_string(),
..Default::default()
}
}
async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
@@ -1850,7 +1695,13 @@ async fn run_turn(
.await
{
Ok(output) => return Ok(output),
Err(CodexErr::TurnAborted) => return Err(CodexErr::TurnAborted),
Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
}) => {
return Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
});
}
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
Err(e @ CodexErr::Fatal(_)) => return Err(e),
@@ -1903,9 +1754,9 @@ async fn run_turn(
/// "handled" such that it produces a `ResponseInputItem` that needs to be
/// sent back to the model on the next turn.
#[derive(Debug)]
pub(crate) struct ProcessedResponseItem {
pub(crate) item: ResponseItem,
pub(crate) response: Option<ResponseInputItem>,
pub struct ProcessedResponseItem {
pub item: ResponseItem,
pub response: Option<ResponseInputItem>,
}
#[derive(Debug)]
@@ -1954,7 +1805,15 @@ async fn try_run_turn(
// Poll the next item from the model stream. We must inspect *both* Ok and Err
// cases so that transient stream failures (e.g., dropped SSE connection before
// `response.completed`) bubble up and trigger the caller's retry logic.
let event = stream.next().or_cancel(&cancellation_token).await?;
let event = match stream.next().or_cancel(&cancellation_token).await {
Ok(event) => event,
Err(codex_async_utils::CancelErr::Cancelled) => {
let processed_items = output.try_collect().await?;
return Err(CodexErr::TurnAborted {
dangling_artifacts: processed_items,
});
}
};
let event = match event {
Some(res) => res?,
@@ -1978,7 +1837,8 @@ async fn try_run_turn(
let payload_preview = call.payload.log_payload().into_owned();
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
let response = tool_runtime.handle_tool_call(call);
let response =
tool_runtime.handle_tool_call(call, cancellation_token.child_token());
output.push_back(
async move {
@@ -2060,12 +1920,7 @@ async fn try_run_turn(
} => {
sess.update_token_usage_info(turn_context.as_ref(), token_usage.as_ref())
.await;
let processed_items = output
.try_collect()
.or_cancel(&cancellation_token)
.await??;
let processed_items = output.try_collect().await?;
let unified_diff = {
let mut tracker = turn_diff_tracker.lock().await;
tracker.get_unified_diff()
@@ -2085,12 +1940,8 @@ async fn try_run_turn(
ResponseEvent::OutputTextDelta(delta) => {
// In review child threads, suppress assistant text deltas; the
// UI will show a selection popup from the final ReviewOutput.
if !turn_context.is_review_mode {
let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta });
sess.send_event(&turn_context, event).await;
} else {
trace!("suppressing OutputTextDelta in review mode");
}
let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta });
sess.send_event(&turn_context, event).await;
}
ResponseEvent::ReasoningSummaryDelta(delta) => {
let event = EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta });
@@ -2125,13 +1976,7 @@ async fn handle_non_tool_response_item(
ResponseItem::Message { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => {
let turn_item = match &item {
ResponseItem::Message { .. } if turn_context.is_review_mode => {
trace!("suppressing assistant Message in review mode");
None
}
_ => parse_turn_item(&item),
};
let turn_item = parse_turn_item(&item);
if let Some(turn_item) = turn_item {
sess.emit_turn_item_started_completed(
turn_context.as_ref(),
@@ -2169,7 +2014,7 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -
}
})
}
fn convert_call_tool_result_to_function_call_output_payload(
pub(crate) fn convert_call_tool_result_to_function_call_output_payload(
call_tool_result: &CallToolResult,
) -> FunctionCallOutputPayload {
let CallToolResult {
@@ -2204,58 +2049,6 @@ fn convert_call_tool_result_to_function_call_output_payload(
}
}
/// Emits an ExitedReviewMode Event with optional ReviewOutput,
/// and records a developer message with the review output.
pub(crate) async fn exit_review_mode(
session: Arc<Session>,
turn_context: Arc<TurnContext>,
review_output: Option<ReviewOutputEvent>,
) {
let event = EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
review_output: review_output.clone(),
});
session.send_event(turn_context.as_ref(), event).await;
let mut user_message = String::new();
if let Some(out) = review_output {
let mut findings_str = String::new();
let text = out.overall_explanation.trim();
if !text.is_empty() {
findings_str.push_str(text);
}
if !out.findings.is_empty() {
let block = format_review_findings_block(&out.findings, None);
findings_str.push_str(&format!("\n{block}"));
}
user_message.push_str(&format!(
r#"<user_action>
<context>User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve.</context>
<action>review</action>
<results>
{findings_str}
</results>
</user_action>
"#));
} else {
user_message.push_str(r#"<user_action>
<context>User initiated a review task, but was interrupted. If user asks about this, tell them to re-initiate a review with `/review` and wait for it to complete.</context>
<action>review</action>
<results>
None.
</results>
</user_action>
"#);
}
session
.record_conversation_items(&[ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text: user_message }],
}])
.await;
}
fn mcp_init_error_display(
server_name: &str,
entry: Option<&McpAuthStatusEntry>,
@@ -2735,12 +2528,6 @@ mod tests {
sleep(Duration::from_secs(60)).await;
}
}
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
if let TaskKind::Review = self.kind {
exit_review_mode(session.clone_session(), ctx, None).await;
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]

View File

@@ -1,3 +1,4 @@
use crate::codex::ProcessedResponseItem;
use crate::exec::ExecToolCallOutput;
use crate::token_data::KnownPlan;
use crate::token_data::PlanType;
@@ -53,8 +54,11 @@ pub enum SandboxErr {
#[derive(Error, Debug)]
pub enum CodexErr {
// todo(aibrahim): git rid of this error carrying the dangling artifacts
#[error("turn aborted")]
TurnAborted,
TurnAborted {
dangling_artifacts: Vec<ProcessedResponseItem>,
},
/// Returned by ResponsesClient when the SSE stream disconnects or errors out **after** the HTTP
/// handshake has succeeded but **before** it finished emitting `response.completed`.
@@ -158,7 +162,9 @@ pub enum CodexErr {
impl From<CancelErr> for CodexErr {
fn from(_: CancelErr) -> Self {
CodexErr::TurnAborted
CodexErr::TurnAborted {
dangling_artifacts: Vec::new(),
}
}
}

View File

@@ -14,6 +14,7 @@ mod client_common;
pub mod codex;
mod codex_conversation;
pub use codex_conversation::CodexConversation;
mod codex_delegate;
mod command_safety;
pub mod config;
pub mod config_edit;
@@ -36,6 +37,7 @@ mod mcp_tool_call;
mod message_history;
mod model_provider_info;
pub mod parse_command;
mod response_processing;
pub mod sandboxing;
pub mod token_data;
mod truncate;

View File

@@ -0,0 +1,105 @@
use crate::codex::Session;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use tracing::warn;
/// Process streamed `ResponseItem`s from the model into the pair of:
/// - items we should record in conversation history; and
/// - `ResponseInputItem`s to send back to the model on the next turn.
pub(crate) async fn process_items(
processed_items: Vec<crate::codex::ProcessedResponseItem>,
sess: &Session,
) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
let mut responses = Vec::<ResponseInputItem>::new();
for processed_response_item in processed_items {
let crate::codex::ProcessedResponseItem { item, response } = processed_response_item;
match (&item, &response) {
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
// If the model returned a message, we need to record it.
items_to_record_in_conversation_history.push(item);
}
(
ResponseItem::LocalShellCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::CustomToolCall { .. },
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
) => {
items_to_record_in_conversation_history.push(item);
items_to_record_in_conversation_history.push(ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: output.clone(),
});
}
(
ResponseItem::FunctionCall { .. },
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
) => {
items_to_record_in_conversation_history.push(item);
let output = match result {
Ok(call_tool_result) => {
crate::codex::convert_call_tool_result_to_function_call_output_payload(
call_tool_result,
)
}
Err(err) => FunctionCallOutputPayload {
content: err.clone(),
success: Some(false),
},
};
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output,
});
}
(
ResponseItem::Reasoning {
id,
summary,
content,
encrypted_content,
},
None,
) => {
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
id: id.clone(),
summary: summary.clone(),
content: content.clone(),
encrypted_content: encrypted_content.clone(),
});
}
_ => {
warn!("Unexpected response item: {item:?} with response: {response:?}");
}
};
if let Some(response) = response {
responses.push(response);
}
}
// Only attempt to take the lock if there is something to record.
if !items_to_record_in_conversation_history.is_empty() {
sess.record_conversation_items(&items_to_record_in_conversation_history)
.await;
}
(responses, items_to_record_in_conversation_history)
}

View File

@@ -13,8 +13,10 @@ use tokio_util::task::AbortOnDropHandle;
use tracing::trace;
use tracing::warn;
use crate::AuthManager;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::protocol::EventMsg;
use crate::protocol::TaskCompleteEvent;
use crate::protocol::TurnAbortReason;
@@ -44,6 +46,14 @@ impl SessionTaskContext {
pub(crate) fn clone_session(&self) -> Arc<Session> {
Arc::clone(&self.session)
}
pub(crate) fn auth_manager(&self) -> Arc<AuthManager> {
Arc::clone(&self.session.services.auth_manager)
}
pub(crate) async fn base_config(&self) -> Arc<Config> {
self.session.base_config().await
}
}
#[async_trait]

View File

@@ -1,11 +1,18 @@
use std::sync::Arc;
use async_trait::async_trait;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ReviewOutputEvent;
use codex_protocol::protocol::TaskCompleteEvent;
use tokio_util::sync::CancellationToken;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::exit_review_mode;
use crate::codex::run_task;
use crate::codex_delegate::run_codex_conversation;
// use crate::config::Config; // no longer needed directly; use session.base_config()
use crate::review_format::format_review_findings_block;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;
@@ -28,11 +35,108 @@ impl SessionTask for ReviewTask {
input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
let sess = session.clone_session();
run_task(sess, ctx, input, TaskKind::Review, cancellation_token).await
// let sess = session.clone_session();
// run_task(sess, ctx, input, TaskKind::Review, cancellation_token).await
let config = session.base_config().await.as_ref().clone();
let receiver =
match run_codex_conversation(config, session.auth_manager(), input, cancellation_token)
.await
{
Ok(r) => r,
Err(_) => return None,
};
while let Ok(event) = receiver.recv().await {
session
.clone_session()
.send_event(ctx.as_ref(), event.clone())
.await;
if let EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) = event {
exit_review_mode(
session.clone_session(),
last_agent_message.as_deref().map(parse_review_output_event),
)
.await;
}
}
Some("".to_string())
}
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
exit_review_mode(session.clone_session(), ctx, None).await;
async fn abort(&self, session: Arc<SessionTaskContext>, _ctx: Arc<TurnContext>) {
exit_review_mode(session.clone_session(), None).await;
}
}
/// Emits an ExitedReviewMode Event with optional ReviewOutput,
/// and records a developer message with the review output.
pub(crate) async fn exit_review_mode(
session: Arc<Session>,
review_output: Option<ReviewOutputEvent>,
) {
// ExitedReviewMode event can be emitted by the caller if needed.
let mut user_message = String::new();
if let Some(out) = review_output {
let mut findings_str = String::new();
let text = out.overall_explanation.trim();
if !text.is_empty() {
findings_str.push_str(text);
}
if !out.findings.is_empty() {
let block = format_review_findings_block(&out.findings, None);
findings_str.push_str(&format!("\n{block}"));
}
user_message.push_str(&format!(
r#"<user_action>
<context>User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve.</context>
<action>review</action>
<results>
{findings_str}
</results>
</user_action>
"#));
} else {
user_message.push_str(r#"<user_action>
<context>User initiated a review task, but was interrupted. If user asks about this, tell them to re-initiate a review with `/review` and wait for it to complete.</context>
<action>review</action>
<results>
None.
</results>
</user_action>
"#);
}
session
.record_conversation_items(&[ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text: user_message }],
}])
.await;
}
/// Parse the review output; when not valid JSON, build a structured
/// fallback that carries the plain text as the overall explanation.
///
/// Returns: a ReviewOutputEvent parsed from JSON or a fallback populated from text.
fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
// Try direct parse first
if let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(text) {
return ev;
}
// If wrapped in markdown fences or extra prose, attempt to extract the first JSON object
if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}'))
&& start < end
&& let Some(slice) = text.get(start..=end)
&& let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(slice)
{
return ev;
}
// Not JSON return a structured ReviewOutputEvent that carries
// the plain text as the overall explanation.
ReviewOutputEvent {
overall_explanation: text.to_string(),
..Default::default()
}
}

View File

@@ -2,6 +2,7 @@ use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_util::either::Either;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use crate::codex::Session;
@@ -9,8 +10,10 @@ use crate::codex::TurnContext;
use crate::error::CodexErr;
use crate::function_tool::FunctionCallError;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::context::ToolPayload;
use crate::tools::router::ToolCall;
use crate::tools::router::ToolRouter;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
pub(crate) struct ToolCallRuntime {
@@ -40,6 +43,7 @@ impl ToolCallRuntime {
pub(crate) fn handle_tool_call(
&self,
call: ToolCall,
cancellation_token: CancellationToken,
) -> impl std::future::Future<Output = Result<ResponseInputItem, CodexErr>> {
let supports_parallel = self.router.tool_supports_parallel(&call.tool_name);
@@ -48,18 +52,24 @@ impl ToolCallRuntime {
let turn = Arc::clone(&self.turn_context);
let tracker = Arc::clone(&self.tracker);
let lock = Arc::clone(&self.parallel_execution);
let aborted_response = Self::aborted_response(&call);
let handle: AbortOnDropHandle<Result<ResponseInputItem, FunctionCallError>> =
AbortOnDropHandle::new(tokio::spawn(async move {
let _guard = if supports_parallel {
Either::Left(lock.read().await)
} else {
Either::Right(lock.write().await)
};
tokio::select! {
_ = cancellation_token.cancelled() => Ok(aborted_response),
res = async {
let _guard = if supports_parallel {
Either::Left(lock.read().await)
} else {
Either::Right(lock.write().await)
};
router
.dispatch_tool_call(session, turn, tracker, call)
.await
router
.dispatch_tool_call(session, turn, tracker, call)
.await
} => res,
}
}));
async move {
@@ -74,3 +84,25 @@ impl ToolCallRuntime {
}
}
}
impl ToolCallRuntime {
fn aborted_response(call: &ToolCall) -> ResponseInputItem {
match &call.payload {
ToolPayload::Custom { .. } => ResponseInputItem::CustomToolCallOutput {
call_id: call.call_id.clone(),
output: "aborted".to_string(),
},
ToolPayload::Mcp { .. } => ResponseInputItem::McpToolCallOutput {
call_id: call.call_id.clone(),
result: Err("aborted".to_string()),
},
_ => ResponseInputItem::FunctionCallOutput {
call_id: call.call_id.clone(),
output: FunctionCallOutputPayload {
content: "aborted".to_string(),
success: None,
},
},
}
}
}

View File

@@ -35,6 +35,22 @@ impl ResponseMock {
pub fn requests(&self) -> Vec<ResponsesRequest> {
self.requests.lock().unwrap().clone()
}
/// Returns true if any captured request contains a `function_call` with the
/// provided `call_id`.
pub fn saw_function_call(&self, call_id: &str) -> bool {
self.requests()
.iter()
.any(|req| req.has_function_call(call_id))
}
/// Returns the `output` string for a matching `function_call_output` with
/// the provided `call_id`, searching across all captured requests.
pub fn function_call_output_text(&self, call_id: &str) -> Option<String> {
self.requests()
.iter()
.find_map(|req| req.function_call_output_text(call_id))
}
}
#[derive(Debug, Clone)]
@@ -70,6 +86,28 @@ impl ResponsesRequest {
.unwrap_or_else(|| panic!("function call output {call_id} item not found in request"))
}
/// Returns true if this request's `input` contains a `function_call` with
/// the specified `call_id`.
pub fn has_function_call(&self, call_id: &str) -> bool {
self.input().iter().any(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call")
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
})
}
/// If present, returns the `output` string of the `function_call_output`
/// entry matching `call_id` in this request's `input`.
pub fn function_call_output_text(&self, call_id: &str) -> Option<String> {
let binding = self.input();
let item = binding.iter().find(|item| {
item.get("type").and_then(Value::as_str) == Some("function_call_output")
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
})?;
item.get("output")
.and_then(Value::as_str)
.map(str::to_string)
}
pub fn header(&self, name: &str) -> Option<String> {
self.0
.headers

View File

@@ -1,3 +1,4 @@
use std::sync::Arc;
use std::time::Duration;
use codex_core::protocol::EventMsg;
@@ -5,7 +6,9 @@ use codex_core::protocol::Op;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::test_codex;
@@ -67,3 +70,98 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
)
.await;
}
/// After an interrupt we expect the next request to the model to include both
/// the original tool call and an `"aborted"` `function_call_output`. This test
/// exercises the follow-up flow: it sends another user turn, inspects the mock
/// responses server, and ensures the model receives the synthesized abort.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn interrupt_tool_records_history_entries() {
let command = vec![
"bash".to_string(),
"-lc".to_string(),
"sleep 60".to_string(),
];
let call_id = "call-history";
let args = json!({
"command": command,
"timeout_ms": 60_000
})
.to_string();
let first_body = sse(vec![
ev_response_created("resp-history"),
ev_function_call(call_id, "shell", &args),
ev_completed("resp-history"),
]);
let follow_up_body = sse(vec![
ev_response_created("resp-followup"),
ev_completed("resp-followup"),
]);
let server = start_mock_server().await;
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
let fixture = test_codex().build(&server).await.unwrap();
let codex = Arc::clone(&fixture.codex);
let wait_timeout = Duration::from_millis(100);
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "start history recording".into(),
}],
})
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::ExecCommandBegin(_)),
wait_timeout,
)
.await;
codex.submit(Op::Interrupt).await.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TurnAborted(_)),
wait_timeout,
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "follow up".into(),
}],
})
.await
.unwrap();
wait_for_event_with_timeout(
&codex,
|ev| matches!(ev, EventMsg::TaskComplete(_)),
wait_timeout,
)
.await;
let requests = response_mock.requests();
assert!(
requests.len() == 2,
"expected two calls to the responses API, got {}",
requests.len()
);
assert!(
response_mock.saw_function_call(call_id),
"function call not recorded in responses payload"
);
assert_eq!(
response_mock.function_call_output_text(call_id).as_deref(),
Some("aborted"),
"aborted function call output not recorded in responses payload"
);
}

View File

@@ -919,6 +919,7 @@ pub enum SessionSource {
VSCode,
Exec,
Mcp,
SubAgent,
#[serde(other)]
Unknown,
}