mirror of
https://github.com/openai/codex.git
synced 2026-02-02 06:57:03 +00:00
Compare commits
13 Commits
rust-v0.67
...
prototype
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a9dd1b2e11 | ||
|
|
54f6a292bb | ||
|
|
282e696079 | ||
|
|
bb26652be1 | ||
|
|
d830ec9678 | ||
|
|
c3ef52f442 | ||
|
|
d7ad216217 | ||
|
|
e380b297d0 | ||
|
|
d63e920ea4 | ||
|
|
838d82e19c | ||
|
|
89efcac5a3 | ||
|
|
6dcb9ff39d | ||
|
|
3bf76f79f0 |
@@ -10,7 +10,7 @@ use crate::function_tool::FunctionCallError;
|
||||
use crate::mcp::auth::McpAuthStatusEntry;
|
||||
use crate::parse_command::parse_command;
|
||||
use crate::parse_turn_item;
|
||||
use crate::review_format::format_review_findings_block;
|
||||
use crate::response_processing::process_items;
|
||||
use crate::terminal;
|
||||
use crate::user_notification::UserNotifier;
|
||||
use async_channel::Receiver;
|
||||
@@ -19,7 +19,6 @@ use codex_apply_patch::ApplyPatchAction;
|
||||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::protocol::ConversationPathResponseEvent;
|
||||
use codex_protocol::protocol::ExitedReviewModeEvent;
|
||||
use codex_protocol::protocol::ItemCompletedEvent;
|
||||
use codex_protocol::protocol::ItemStartedEvent;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
@@ -46,7 +45,6 @@ use tokio_util::sync::CancellationToken;
|
||||
use tracing::debug;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::ModelProviderInfo;
|
||||
@@ -85,7 +83,6 @@ use crate::protocol::ListCustomPromptsResponseEvent;
|
||||
use crate::protocol::Op;
|
||||
use crate::protocol::RateLimitSnapshot;
|
||||
use crate::protocol::ReviewDecision;
|
||||
use crate::protocol::ReviewOutputEvent;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::protocol::StreamErrorEvent;
|
||||
@@ -266,7 +263,6 @@ pub(crate) struct TurnContext {
|
||||
pub(crate) sandbox_policy: SandboxPolicy,
|
||||
pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
|
||||
pub(crate) tools_config: ToolsConfig,
|
||||
pub(crate) is_review_mode: bool,
|
||||
pub(crate) final_output_json_schema: Option<Value>,
|
||||
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
}
|
||||
@@ -401,7 +397,6 @@ impl Session {
|
||||
sandbox_policy: session_configuration.sandbox_policy.clone(),
|
||||
shell_environment_policy: config.shell_environment_policy.clone(),
|
||||
tools_config,
|
||||
is_review_mode: false,
|
||||
final_output_json_schema: None,
|
||||
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
|
||||
}
|
||||
@@ -633,6 +628,14 @@ impl Session {
|
||||
state.session_configuration = state.session_configuration.apply(&updates);
|
||||
}
|
||||
|
||||
pub(crate) async fn base_config(&self) -> Arc<Config> {
|
||||
let state = self.state.lock().await;
|
||||
state
|
||||
.session_configuration
|
||||
.original_config_do_not_use
|
||||
.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn new_turn(&self, updates: SessionSettingsUpdate) -> Arc<TurnContext> {
|
||||
let sub_id = self.next_internal_sub_id();
|
||||
self.new_turn_with_sub_id(sub_id, updates).await
|
||||
@@ -855,7 +858,7 @@ impl Session {
|
||||
|
||||
/// Records input items: always append to conversation history and
|
||||
/// persist these response items to rollout.
|
||||
async fn record_conversation_items(&self, items: &[ResponseItem]) {
|
||||
pub(crate) async fn record_conversation_items(&self, items: &[ResponseItem]) {
|
||||
self.record_into_history(items).await;
|
||||
self.persist_rollout_response_items(items).await;
|
||||
}
|
||||
@@ -1467,7 +1470,6 @@ async fn spawn_review_thread(
|
||||
sandbox_policy: parent_turn_context.sandbox_policy.clone(),
|
||||
shell_environment_policy: parent_turn_context.shell_environment_policy.clone(),
|
||||
cwd: parent_turn_context.cwd.clone(),
|
||||
is_review_mode: true,
|
||||
final_output_json_schema: None,
|
||||
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
|
||||
};
|
||||
@@ -1517,21 +1519,8 @@ pub(crate) async fn run_task(
|
||||
sess.send_event(&turn_context, event).await;
|
||||
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
// For review threads, keep an isolated in-memory history so the
|
||||
// model sees a fresh conversation without the parent session's history.
|
||||
// For normal turns, continue recording to the session history as before.
|
||||
let is_review_mode = turn_context.is_review_mode;
|
||||
|
||||
let mut review_thread_history: ConversationHistory = ConversationHistory::new();
|
||||
if is_review_mode {
|
||||
// Seed review threads with environment context so the model knows the working directory.
|
||||
review_thread_history
|
||||
.record_items(sess.build_initial_context(turn_context.as_ref()).iter());
|
||||
review_thread_history.record_items(std::iter::once(&initial_input_for_turn.into()));
|
||||
} else {
|
||||
sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn)
|
||||
.await;
|
||||
}
|
||||
sess.record_input_and_rollout_usermsg(turn_context.as_ref(), &initial_input_for_turn)
|
||||
.await;
|
||||
|
||||
let mut last_agent_message: Option<String> = None;
|
||||
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
|
||||
@@ -1560,12 +1549,7 @@ pub(crate) async fn run_task(
|
||||
// conversation history on each turn. The rollout file, however, should
|
||||
// only record the new items that originated in this turn so that it
|
||||
// represents an append-only log without duplicates.
|
||||
let turn_input: Vec<ResponseItem> = if is_review_mode {
|
||||
if !pending_input.is_empty() {
|
||||
review_thread_history.record_items(&pending_input);
|
||||
}
|
||||
review_thread_history.get_history()
|
||||
} else {
|
||||
let turn_input: Vec<ResponseItem> = {
|
||||
sess.record_conversation_items(&pending_input).await;
|
||||
sess.history_snapshot().await
|
||||
};
|
||||
@@ -1608,109 +1592,8 @@ pub(crate) async fn run_task(
|
||||
let token_limit_reached = total_usage_tokens
|
||||
.map(|tokens| tokens >= limit)
|
||||
.unwrap_or(false);
|
||||
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
|
||||
let mut responses = Vec::<ResponseInputItem>::new();
|
||||
for processed_response_item in processed_items {
|
||||
let ProcessedResponseItem { item, response } = processed_response_item;
|
||||
match (&item, &response) {
|
||||
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
|
||||
// If the model returned a message, we need to record it.
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
}
|
||||
(
|
||||
ResponseItem::LocalShellCall { .. },
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
(
|
||||
ResponseItem::FunctionCall { .. },
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
(
|
||||
ResponseItem::CustomToolCall { .. },
|
||||
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(
|
||||
ResponseItem::CustomToolCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
},
|
||||
);
|
||||
}
|
||||
(
|
||||
ResponseItem::FunctionCall { .. },
|
||||
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
let output = match result {
|
||||
Ok(call_tool_result) => {
|
||||
convert_call_tool_result_to_function_call_output_payload(
|
||||
call_tool_result,
|
||||
)
|
||||
}
|
||||
Err(err) => FunctionCallOutputPayload {
|
||||
content: err.clone(),
|
||||
success: Some(false),
|
||||
},
|
||||
};
|
||||
items_to_record_in_conversation_history.push(
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output,
|
||||
},
|
||||
);
|
||||
}
|
||||
(
|
||||
ResponseItem::Reasoning {
|
||||
id,
|
||||
summary,
|
||||
content,
|
||||
encrypted_content,
|
||||
},
|
||||
None,
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
|
||||
id: id.clone(),
|
||||
summary: summary.clone(),
|
||||
content: content.clone(),
|
||||
encrypted_content: encrypted_content.clone(),
|
||||
});
|
||||
}
|
||||
_ => {
|
||||
warn!("Unexpected response item: {item:?} with response: {response:?}");
|
||||
}
|
||||
};
|
||||
if let Some(response) = response {
|
||||
responses.push(response);
|
||||
}
|
||||
}
|
||||
|
||||
// Only attempt to take the lock if there is something to record.
|
||||
if !items_to_record_in_conversation_history.is_empty() {
|
||||
if is_review_mode {
|
||||
review_thread_history
|
||||
.record_items(items_to_record_in_conversation_history.iter());
|
||||
} else {
|
||||
sess.record_conversation_items(&items_to_record_in_conversation_history)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
let (responses, items_to_record_in_conversation_history) =
|
||||
process_items(processed_items, &sess).await;
|
||||
|
||||
if token_limit_reached {
|
||||
if auto_compact_recently_attempted {
|
||||
@@ -1749,7 +1632,10 @@ pub(crate) async fn run_task(
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Err(CodexErr::TurnAborted) => {
|
||||
Err(CodexErr::TurnAborted {
|
||||
dangling_artifacts: processed_items,
|
||||
}) => {
|
||||
let _ = process_items(processed_items, &sess).await;
|
||||
// Aborted turn is reported via a different event.
|
||||
break;
|
||||
}
|
||||
@@ -1765,50 +1651,9 @@ pub(crate) async fn run_task(
|
||||
}
|
||||
}
|
||||
|
||||
// If this was a review thread and we have a final assistant message,
|
||||
// try to parse it as a ReviewOutput.
|
||||
//
|
||||
// If parsing fails, construct a minimal ReviewOutputEvent using the plain
|
||||
// text as the overall explanation. Else, just exit review mode with None.
|
||||
//
|
||||
// Emits an ExitedReviewMode event with the parsed review output.
|
||||
if turn_context.is_review_mode {
|
||||
exit_review_mode(
|
||||
sess.clone(),
|
||||
Arc::clone(&turn_context),
|
||||
last_agent_message.as_deref().map(parse_review_output_event),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
last_agent_message
|
||||
}
|
||||
|
||||
/// Parse the review output; when not valid JSON, build a structured
|
||||
/// fallback that carries the plain text as the overall explanation.
|
||||
///
|
||||
/// Returns: a ReviewOutputEvent parsed from JSON or a fallback populated from text.
|
||||
fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
|
||||
// Try direct parse first
|
||||
if let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(text) {
|
||||
return ev;
|
||||
}
|
||||
// If wrapped in markdown fences or extra prose, attempt to extract the first JSON object
|
||||
if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}'))
|
||||
&& start < end
|
||||
&& let Some(slice) = text.get(start..=end)
|
||||
&& let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(slice)
|
||||
{
|
||||
return ev;
|
||||
}
|
||||
// Not JSON – return a structured ReviewOutputEvent that carries
|
||||
// the plain text as the overall explanation.
|
||||
ReviewOutputEvent {
|
||||
overall_explanation: text.to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_turn(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
@@ -1850,7 +1695,13 @@ async fn run_turn(
|
||||
.await
|
||||
{
|
||||
Ok(output) => return Ok(output),
|
||||
Err(CodexErr::TurnAborted) => return Err(CodexErr::TurnAborted),
|
||||
Err(CodexErr::TurnAborted {
|
||||
dangling_artifacts: processed_items,
|
||||
}) => {
|
||||
return Err(CodexErr::TurnAborted {
|
||||
dangling_artifacts: processed_items,
|
||||
});
|
||||
}
|
||||
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
|
||||
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
|
||||
Err(e @ CodexErr::Fatal(_)) => return Err(e),
|
||||
@@ -1903,9 +1754,9 @@ async fn run_turn(
|
||||
/// "handled" such that it produces a `ResponseInputItem` that needs to be
|
||||
/// sent back to the model on the next turn.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct ProcessedResponseItem {
|
||||
pub(crate) item: ResponseItem,
|
||||
pub(crate) response: Option<ResponseInputItem>,
|
||||
pub struct ProcessedResponseItem {
|
||||
pub item: ResponseItem,
|
||||
pub response: Option<ResponseInputItem>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@@ -1954,7 +1805,15 @@ async fn try_run_turn(
|
||||
// Poll the next item from the model stream. We must inspect *both* Ok and Err
|
||||
// cases so that transient stream failures (e.g., dropped SSE connection before
|
||||
// `response.completed`) bubble up and trigger the caller's retry logic.
|
||||
let event = stream.next().or_cancel(&cancellation_token).await?;
|
||||
let event = match stream.next().or_cancel(&cancellation_token).await {
|
||||
Ok(event) => event,
|
||||
Err(codex_async_utils::CancelErr::Cancelled) => {
|
||||
let processed_items = output.try_collect().await?;
|
||||
return Err(CodexErr::TurnAborted {
|
||||
dangling_artifacts: processed_items,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
let event = match event {
|
||||
Some(res) => res?,
|
||||
@@ -1978,7 +1837,8 @@ async fn try_run_turn(
|
||||
let payload_preview = call.payload.log_payload().into_owned();
|
||||
tracing::info!("ToolCall: {} {}", call.tool_name, payload_preview);
|
||||
|
||||
let response = tool_runtime.handle_tool_call(call);
|
||||
let response =
|
||||
tool_runtime.handle_tool_call(call, cancellation_token.child_token());
|
||||
|
||||
output.push_back(
|
||||
async move {
|
||||
@@ -2060,12 +1920,7 @@ async fn try_run_turn(
|
||||
} => {
|
||||
sess.update_token_usage_info(turn_context.as_ref(), token_usage.as_ref())
|
||||
.await;
|
||||
|
||||
let processed_items = output
|
||||
.try_collect()
|
||||
.or_cancel(&cancellation_token)
|
||||
.await??;
|
||||
|
||||
let processed_items = output.try_collect().await?;
|
||||
let unified_diff = {
|
||||
let mut tracker = turn_diff_tracker.lock().await;
|
||||
tracker.get_unified_diff()
|
||||
@@ -2085,12 +1940,8 @@ async fn try_run_turn(
|
||||
ResponseEvent::OutputTextDelta(delta) => {
|
||||
// In review child threads, suppress assistant text deltas; the
|
||||
// UI will show a selection popup from the final ReviewOutput.
|
||||
if !turn_context.is_review_mode {
|
||||
let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta });
|
||||
sess.send_event(&turn_context, event).await;
|
||||
} else {
|
||||
trace!("suppressing OutputTextDelta in review mode");
|
||||
}
|
||||
let event = EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta });
|
||||
sess.send_event(&turn_context, event).await;
|
||||
}
|
||||
ResponseEvent::ReasoningSummaryDelta(delta) => {
|
||||
let event = EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta });
|
||||
@@ -2125,13 +1976,7 @@ async fn handle_non_tool_response_item(
|
||||
ResponseItem::Message { .. }
|
||||
| ResponseItem::Reasoning { .. }
|
||||
| ResponseItem::WebSearchCall { .. } => {
|
||||
let turn_item = match &item {
|
||||
ResponseItem::Message { .. } if turn_context.is_review_mode => {
|
||||
trace!("suppressing assistant Message in review mode");
|
||||
None
|
||||
}
|
||||
_ => parse_turn_item(&item),
|
||||
};
|
||||
let turn_item = parse_turn_item(&item);
|
||||
if let Some(turn_item) = turn_item {
|
||||
sess.emit_turn_item_started_completed(
|
||||
turn_context.as_ref(),
|
||||
@@ -2169,7 +2014,7 @@ pub(super) fn get_last_assistant_message_from_turn(responses: &[ResponseItem]) -
|
||||
}
|
||||
})
|
||||
}
|
||||
fn convert_call_tool_result_to_function_call_output_payload(
|
||||
pub(crate) fn convert_call_tool_result_to_function_call_output_payload(
|
||||
call_tool_result: &CallToolResult,
|
||||
) -> FunctionCallOutputPayload {
|
||||
let CallToolResult {
|
||||
@@ -2204,58 +2049,6 @@ fn convert_call_tool_result_to_function_call_output_payload(
|
||||
}
|
||||
}
|
||||
|
||||
/// Emits an ExitedReviewMode Event with optional ReviewOutput,
|
||||
/// and records a developer message with the review output.
|
||||
pub(crate) async fn exit_review_mode(
|
||||
session: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
review_output: Option<ReviewOutputEvent>,
|
||||
) {
|
||||
let event = EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
|
||||
review_output: review_output.clone(),
|
||||
});
|
||||
session.send_event(turn_context.as_ref(), event).await;
|
||||
|
||||
let mut user_message = String::new();
|
||||
if let Some(out) = review_output {
|
||||
let mut findings_str = String::new();
|
||||
let text = out.overall_explanation.trim();
|
||||
if !text.is_empty() {
|
||||
findings_str.push_str(text);
|
||||
}
|
||||
if !out.findings.is_empty() {
|
||||
let block = format_review_findings_block(&out.findings, None);
|
||||
findings_str.push_str(&format!("\n{block}"));
|
||||
}
|
||||
user_message.push_str(&format!(
|
||||
r#"<user_action>
|
||||
<context>User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve.</context>
|
||||
<action>review</action>
|
||||
<results>
|
||||
{findings_str}
|
||||
</results>
|
||||
</user_action>
|
||||
"#));
|
||||
} else {
|
||||
user_message.push_str(r#"<user_action>
|
||||
<context>User initiated a review task, but was interrupted. If user asks about this, tell them to re-initiate a review with `/review` and wait for it to complete.</context>
|
||||
<action>review</action>
|
||||
<results>
|
||||
None.
|
||||
</results>
|
||||
</user_action>
|
||||
"#);
|
||||
}
|
||||
|
||||
session
|
||||
.record_conversation_items(&[ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText { text: user_message }],
|
||||
}])
|
||||
.await;
|
||||
}
|
||||
|
||||
fn mcp_init_error_display(
|
||||
server_name: &str,
|
||||
entry: Option<&McpAuthStatusEntry>,
|
||||
@@ -2735,12 +2528,6 @@ mod tests {
|
||||
sleep(Duration::from_secs(60)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
|
||||
if let TaskKind::Review = self.kind {
|
||||
exit_review_mode(session.clone_session(), ctx, None).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::codex::ProcessedResponseItem;
|
||||
use crate::exec::ExecToolCallOutput;
|
||||
use crate::token_data::KnownPlan;
|
||||
use crate::token_data::PlanType;
|
||||
@@ -53,8 +54,11 @@ pub enum SandboxErr {
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum CodexErr {
|
||||
// todo(aibrahim): git rid of this error carrying the dangling artifacts
|
||||
#[error("turn aborted")]
|
||||
TurnAborted,
|
||||
TurnAborted {
|
||||
dangling_artifacts: Vec<ProcessedResponseItem>,
|
||||
},
|
||||
|
||||
/// Returned by ResponsesClient when the SSE stream disconnects or errors out **after** the HTTP
|
||||
/// handshake has succeeded but **before** it finished emitting `response.completed`.
|
||||
@@ -158,7 +162,9 @@ pub enum CodexErr {
|
||||
|
||||
impl From<CancelErr> for CodexErr {
|
||||
fn from(_: CancelErr) -> Self {
|
||||
CodexErr::TurnAborted
|
||||
CodexErr::TurnAborted {
|
||||
dangling_artifacts: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -14,6 +14,7 @@ mod client_common;
|
||||
pub mod codex;
|
||||
mod codex_conversation;
|
||||
pub use codex_conversation::CodexConversation;
|
||||
mod codex_delegate;
|
||||
mod command_safety;
|
||||
pub mod config;
|
||||
pub mod config_edit;
|
||||
@@ -36,6 +37,7 @@ mod mcp_tool_call;
|
||||
mod message_history;
|
||||
mod model_provider_info;
|
||||
pub mod parse_command;
|
||||
mod response_processing;
|
||||
pub mod sandboxing;
|
||||
pub mod token_data;
|
||||
mod truncate;
|
||||
|
||||
105
codex-rs/core/src/response_processing.rs
Normal file
105
codex-rs/core/src/response_processing.rs
Normal file
@@ -0,0 +1,105 @@
|
||||
use crate::codex::Session;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use tracing::warn;
|
||||
|
||||
/// Process streamed `ResponseItem`s from the model into the pair of:
|
||||
/// - items we should record in conversation history; and
|
||||
/// - `ResponseInputItem`s to send back to the model on the next turn.
|
||||
pub(crate) async fn process_items(
|
||||
processed_items: Vec<crate::codex::ProcessedResponseItem>,
|
||||
sess: &Session,
|
||||
) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
|
||||
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();
|
||||
let mut responses = Vec::<ResponseInputItem>::new();
|
||||
for processed_response_item in processed_items {
|
||||
let crate::codex::ProcessedResponseItem { item, response } = processed_response_item;
|
||||
match (&item, &response) {
|
||||
(ResponseItem::Message { role, .. }, None) if role == "assistant" => {
|
||||
// If the model returned a message, we need to record it.
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
}
|
||||
(
|
||||
ResponseItem::LocalShellCall { .. },
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::FunctionCall { .. },
|
||||
Some(ResponseInputItem::FunctionCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::CustomToolCall { .. },
|
||||
Some(ResponseInputItem::CustomToolCallOutput { call_id, output }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
items_to_record_in_conversation_history.push(ResponseItem::CustomToolCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: output.clone(),
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::FunctionCall { .. },
|
||||
Some(ResponseInputItem::McpToolCallOutput { call_id, result }),
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(item);
|
||||
let output = match result {
|
||||
Ok(call_tool_result) => {
|
||||
crate::codex::convert_call_tool_result_to_function_call_output_payload(
|
||||
call_tool_result,
|
||||
)
|
||||
}
|
||||
Err(err) => FunctionCallOutputPayload {
|
||||
content: err.clone(),
|
||||
success: Some(false),
|
||||
},
|
||||
};
|
||||
items_to_record_in_conversation_history.push(ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output,
|
||||
});
|
||||
}
|
||||
(
|
||||
ResponseItem::Reasoning {
|
||||
id,
|
||||
summary,
|
||||
content,
|
||||
encrypted_content,
|
||||
},
|
||||
None,
|
||||
) => {
|
||||
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
|
||||
id: id.clone(),
|
||||
summary: summary.clone(),
|
||||
content: content.clone(),
|
||||
encrypted_content: encrypted_content.clone(),
|
||||
});
|
||||
}
|
||||
_ => {
|
||||
warn!("Unexpected response item: {item:?} with response: {response:?}");
|
||||
}
|
||||
};
|
||||
if let Some(response) = response {
|
||||
responses.push(response);
|
||||
}
|
||||
}
|
||||
|
||||
// Only attempt to take the lock if there is something to record.
|
||||
if !items_to_record_in_conversation_history.is_empty() {
|
||||
sess.record_conversation_items(&items_to_record_in_conversation_history)
|
||||
.await;
|
||||
}
|
||||
(responses, items_to_record_in_conversation_history)
|
||||
}
|
||||
@@ -13,8 +13,10 @@ use tokio_util::task::AbortOnDropHandle;
|
||||
use tracing::trace;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::config::Config;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::TaskCompleteEvent;
|
||||
use crate::protocol::TurnAbortReason;
|
||||
@@ -44,6 +46,14 @@ impl SessionTaskContext {
|
||||
pub(crate) fn clone_session(&self) -> Arc<Session> {
|
||||
Arc::clone(&self.session)
|
||||
}
|
||||
|
||||
pub(crate) fn auth_manager(&self) -> Arc<AuthManager> {
|
||||
Arc::clone(&self.session.services.auth_manager)
|
||||
}
|
||||
|
||||
pub(crate) async fn base_config(&self) -> Arc<Config> {
|
||||
self.session.base_config().await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -1,11 +1,18 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ReviewOutputEvent;
|
||||
use codex_protocol::protocol::TaskCompleteEvent;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::codex::exit_review_mode;
|
||||
use crate::codex::run_task;
|
||||
use crate::codex_delegate::run_codex_conversation;
|
||||
// use crate::config::Config; // no longer needed directly; use session.base_config()
|
||||
use crate::review_format::format_review_findings_block;
|
||||
use crate::state::TaskKind;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
|
||||
@@ -28,11 +35,108 @@ impl SessionTask for ReviewTask {
|
||||
input: Vec<UserInput>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let sess = session.clone_session();
|
||||
run_task(sess, ctx, input, TaskKind::Review, cancellation_token).await
|
||||
// let sess = session.clone_session();
|
||||
// run_task(sess, ctx, input, TaskKind::Review, cancellation_token).await
|
||||
|
||||
let config = session.base_config().await.as_ref().clone();
|
||||
let receiver =
|
||||
match run_codex_conversation(config, session.auth_manager(), input, cancellation_token)
|
||||
.await
|
||||
{
|
||||
Ok(r) => r,
|
||||
Err(_) => return None,
|
||||
};
|
||||
while let Ok(event) = receiver.recv().await {
|
||||
session
|
||||
.clone_session()
|
||||
.send_event(ctx.as_ref(), event.clone())
|
||||
.await;
|
||||
if let EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) = event {
|
||||
exit_review_mode(
|
||||
session.clone_session(),
|
||||
last_agent_message.as_deref().map(parse_review_output_event),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
Some("".to_string())
|
||||
}
|
||||
|
||||
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
|
||||
exit_review_mode(session.clone_session(), ctx, None).await;
|
||||
async fn abort(&self, session: Arc<SessionTaskContext>, _ctx: Arc<TurnContext>) {
|
||||
exit_review_mode(session.clone_session(), None).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Emits an ExitedReviewMode Event with optional ReviewOutput,
|
||||
/// and records a developer message with the review output.
|
||||
pub(crate) async fn exit_review_mode(
|
||||
session: Arc<Session>,
|
||||
review_output: Option<ReviewOutputEvent>,
|
||||
) {
|
||||
// ExitedReviewMode event can be emitted by the caller if needed.
|
||||
|
||||
let mut user_message = String::new();
|
||||
if let Some(out) = review_output {
|
||||
let mut findings_str = String::new();
|
||||
let text = out.overall_explanation.trim();
|
||||
if !text.is_empty() {
|
||||
findings_str.push_str(text);
|
||||
}
|
||||
if !out.findings.is_empty() {
|
||||
let block = format_review_findings_block(&out.findings, None);
|
||||
findings_str.push_str(&format!("\n{block}"));
|
||||
}
|
||||
user_message.push_str(&format!(
|
||||
r#"<user_action>
|
||||
<context>User initiated a review task. Here's the full review output from reviewer model. User may select one or more comments to resolve.</context>
|
||||
<action>review</action>
|
||||
<results>
|
||||
{findings_str}
|
||||
</results>
|
||||
</user_action>
|
||||
"#));
|
||||
} else {
|
||||
user_message.push_str(r#"<user_action>
|
||||
<context>User initiated a review task, but was interrupted. If user asks about this, tell them to re-initiate a review with `/review` and wait for it to complete.</context>
|
||||
<action>review</action>
|
||||
<results>
|
||||
None.
|
||||
</results>
|
||||
</user_action>
|
||||
"#);
|
||||
}
|
||||
|
||||
session
|
||||
.record_conversation_items(&[ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText { text: user_message }],
|
||||
}])
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Parse the review output; when not valid JSON, build a structured
|
||||
/// fallback that carries the plain text as the overall explanation.
|
||||
///
|
||||
/// Returns: a ReviewOutputEvent parsed from JSON or a fallback populated from text.
|
||||
fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
|
||||
// Try direct parse first
|
||||
if let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(text) {
|
||||
return ev;
|
||||
}
|
||||
// If wrapped in markdown fences or extra prose, attempt to extract the first JSON object
|
||||
if let (Some(start), Some(end)) = (text.find('{'), text.rfind('}'))
|
||||
&& start < end
|
||||
&& let Some(slice) = text.get(start..=end)
|
||||
&& let Ok(ev) = serde_json::from_str::<ReviewOutputEvent>(slice)
|
||||
{
|
||||
return ev;
|
||||
}
|
||||
// Not JSON – return a structured ReviewOutputEvent that carries
|
||||
// the plain text as the overall explanation.
|
||||
ReviewOutputEvent {
|
||||
overall_explanation: text.to_string(),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::sync::Arc;
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_util::either::Either;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tokio_util::task::AbortOnDropHandle;
|
||||
|
||||
use crate::codex::Session;
|
||||
@@ -9,8 +10,10 @@ use crate::codex::TurnContext;
|
||||
use crate::error::CodexErr;
|
||||
use crate::function_tool::FunctionCallError;
|
||||
use crate::tools::context::SharedTurnDiffTracker;
|
||||
use crate::tools::context::ToolPayload;
|
||||
use crate::tools::router::ToolCall;
|
||||
use crate::tools::router::ToolRouter;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
|
||||
pub(crate) struct ToolCallRuntime {
|
||||
@@ -40,6 +43,7 @@ impl ToolCallRuntime {
|
||||
pub(crate) fn handle_tool_call(
|
||||
&self,
|
||||
call: ToolCall,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> impl std::future::Future<Output = Result<ResponseInputItem, CodexErr>> {
|
||||
let supports_parallel = self.router.tool_supports_parallel(&call.tool_name);
|
||||
|
||||
@@ -48,18 +52,24 @@ impl ToolCallRuntime {
|
||||
let turn = Arc::clone(&self.turn_context);
|
||||
let tracker = Arc::clone(&self.tracker);
|
||||
let lock = Arc::clone(&self.parallel_execution);
|
||||
let aborted_response = Self::aborted_response(&call);
|
||||
|
||||
let handle: AbortOnDropHandle<Result<ResponseInputItem, FunctionCallError>> =
|
||||
AbortOnDropHandle::new(tokio::spawn(async move {
|
||||
let _guard = if supports_parallel {
|
||||
Either::Left(lock.read().await)
|
||||
} else {
|
||||
Either::Right(lock.write().await)
|
||||
};
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled() => Ok(aborted_response),
|
||||
res = async {
|
||||
let _guard = if supports_parallel {
|
||||
Either::Left(lock.read().await)
|
||||
} else {
|
||||
Either::Right(lock.write().await)
|
||||
};
|
||||
|
||||
router
|
||||
.dispatch_tool_call(session, turn, tracker, call)
|
||||
.await
|
||||
router
|
||||
.dispatch_tool_call(session, turn, tracker, call)
|
||||
.await
|
||||
} => res,
|
||||
}
|
||||
}));
|
||||
|
||||
async move {
|
||||
@@ -74,3 +84,25 @@ impl ToolCallRuntime {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ToolCallRuntime {
|
||||
fn aborted_response(call: &ToolCall) -> ResponseInputItem {
|
||||
match &call.payload {
|
||||
ToolPayload::Custom { .. } => ResponseInputItem::CustomToolCallOutput {
|
||||
call_id: call.call_id.clone(),
|
||||
output: "aborted".to_string(),
|
||||
},
|
||||
ToolPayload::Mcp { .. } => ResponseInputItem::McpToolCallOutput {
|
||||
call_id: call.call_id.clone(),
|
||||
result: Err("aborted".to_string()),
|
||||
},
|
||||
_ => ResponseInputItem::FunctionCallOutput {
|
||||
call_id: call.call_id.clone(),
|
||||
output: FunctionCallOutputPayload {
|
||||
content: "aborted".to_string(),
|
||||
success: None,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,6 +35,22 @@ impl ResponseMock {
|
||||
pub fn requests(&self) -> Vec<ResponsesRequest> {
|
||||
self.requests.lock().unwrap().clone()
|
||||
}
|
||||
|
||||
/// Returns true if any captured request contains a `function_call` with the
|
||||
/// provided `call_id`.
|
||||
pub fn saw_function_call(&self, call_id: &str) -> bool {
|
||||
self.requests()
|
||||
.iter()
|
||||
.any(|req| req.has_function_call(call_id))
|
||||
}
|
||||
|
||||
/// Returns the `output` string for a matching `function_call_output` with
|
||||
/// the provided `call_id`, searching across all captured requests.
|
||||
pub fn function_call_output_text(&self, call_id: &str) -> Option<String> {
|
||||
self.requests()
|
||||
.iter()
|
||||
.find_map(|req| req.function_call_output_text(call_id))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -70,6 +86,28 @@ impl ResponsesRequest {
|
||||
.unwrap_or_else(|| panic!("function call output {call_id} item not found in request"))
|
||||
}
|
||||
|
||||
/// Returns true if this request's `input` contains a `function_call` with
|
||||
/// the specified `call_id`.
|
||||
pub fn has_function_call(&self, call_id: &str) -> bool {
|
||||
self.input().iter().any(|item| {
|
||||
item.get("type").and_then(Value::as_str) == Some("function_call")
|
||||
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
|
||||
})
|
||||
}
|
||||
|
||||
/// If present, returns the `output` string of the `function_call_output`
|
||||
/// entry matching `call_id` in this request's `input`.
|
||||
pub fn function_call_output_text(&self, call_id: &str) -> Option<String> {
|
||||
let binding = self.input();
|
||||
let item = binding.iter().find(|item| {
|
||||
item.get("type").and_then(Value::as_str) == Some("function_call_output")
|
||||
&& item.get("call_id").and_then(Value::as_str) == Some(call_id)
|
||||
})?;
|
||||
item.get("output")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
}
|
||||
|
||||
pub fn header(&self, name: &str) -> Option<String> {
|
||||
self.0
|
||||
.headers
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_core::protocol::EventMsg;
|
||||
@@ -5,7 +6,9 @@ use codex_core::protocol::Op;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
@@ -67,3 +70,98 @@ async fn interrupt_long_running_tool_emits_turn_aborted() {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
/// After an interrupt we expect the next request to the model to include both
|
||||
/// the original tool call and an `"aborted"` `function_call_output`. This test
|
||||
/// exercises the follow-up flow: it sends another user turn, inspects the mock
|
||||
/// responses server, and ensures the model receives the synthesized abort.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn interrupt_tool_records_history_entries() {
|
||||
let command = vec![
|
||||
"bash".to_string(),
|
||||
"-lc".to_string(),
|
||||
"sleep 60".to_string(),
|
||||
];
|
||||
let call_id = "call-history";
|
||||
|
||||
let args = json!({
|
||||
"command": command,
|
||||
"timeout_ms": 60_000
|
||||
})
|
||||
.to_string();
|
||||
let first_body = sse(vec![
|
||||
ev_response_created("resp-history"),
|
||||
ev_function_call(call_id, "shell", &args),
|
||||
ev_completed("resp-history"),
|
||||
]);
|
||||
let follow_up_body = sse(vec![
|
||||
ev_response_created("resp-followup"),
|
||||
ev_completed("resp-followup"),
|
||||
]);
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await;
|
||||
|
||||
let fixture = test_codex().build(&server).await.unwrap();
|
||||
let codex = Arc::clone(&fixture.codex);
|
||||
|
||||
let wait_timeout = Duration::from_millis(100);
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "start history recording".into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event_with_timeout(
|
||||
&codex,
|
||||
|ev| matches!(ev, EventMsg::ExecCommandBegin(_)),
|
||||
wait_timeout,
|
||||
)
|
||||
.await;
|
||||
|
||||
codex.submit(Op::Interrupt).await.unwrap();
|
||||
|
||||
wait_for_event_with_timeout(
|
||||
&codex,
|
||||
|ev| matches!(ev, EventMsg::TurnAborted(_)),
|
||||
wait_timeout,
|
||||
)
|
||||
.await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "follow up".into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event_with_timeout(
|
||||
&codex,
|
||||
|ev| matches!(ev, EventMsg::TaskComplete(_)),
|
||||
wait_timeout,
|
||||
)
|
||||
.await;
|
||||
|
||||
let requests = response_mock.requests();
|
||||
assert!(
|
||||
requests.len() == 2,
|
||||
"expected two calls to the responses API, got {}",
|
||||
requests.len()
|
||||
);
|
||||
|
||||
assert!(
|
||||
response_mock.saw_function_call(call_id),
|
||||
"function call not recorded in responses payload"
|
||||
);
|
||||
assert_eq!(
|
||||
response_mock.function_call_output_text(call_id).as_deref(),
|
||||
Some("aborted"),
|
||||
"aborted function call output not recorded in responses payload"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -919,6 +919,7 @@ pub enum SessionSource {
|
||||
VSCode,
|
||||
Exec,
|
||||
Mcp,
|
||||
SubAgent,
|
||||
#[serde(other)]
|
||||
Unknown,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user