Compare commits

...

3 Commits

Author SHA1 Message Date
Daniel Edrisian
6ed3c92ed1 Simplify 2025-09-22 19:34:06 -07:00
Daniel Edrisian
a2cfb125dc Fix 2025-09-22 17:18:30 -07:00
Daniel Edrisian
002d877c02 Auto-compact when running out of context 2025-09-22 17:00:23 -07:00
14 changed files with 271 additions and 8 deletions

View File

@@ -21,6 +21,7 @@ use crate::client_common::ResponseEvent;
use crate::client_common::ResponseStream;
use crate::error::CodexErr;
use crate::error::Result;
use crate::error_codes::CONTEXT_LENGTH_EXCEEDED;
use crate::model_family::ModelFamily;
use crate::openai_tools::create_tools_json_for_chat_completions_api;
use crate::util::backoff;
@@ -28,6 +29,19 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemContent;
use codex_protocol::models::ResponseItem;
// Minimal error body used to parse structured provider error codes on
// Chat Completions non-2xx responses. Only the top-level `error` object is
// modelled here.
#[derive(serde::Deserialize)]
struct ChatErrorBody {
// Structured error details found under the top-level `error` key.
error: ChatErrorInner,
}
// Inner `error` object of a Chat Completions error response. Both fields are
// declared optional.
#[derive(serde::Deserialize)]
struct ChatErrorInner {
// Provider error code; `stream_chat_completions` compares it against
// `CONTEXT_LENGTH_EXCEEDED`.
code: Option<String>,
// Human-readable message; forwarded into `CodexErr::ContextLengthExceeded`
// (empty string when absent).
message: Option<String>,
}
/// Implementation for the classic Chat Completions API.
pub(crate) async fn stream_chat_completions(
prompt: &Prompt,
@@ -309,6 +323,16 @@ pub(crate) async fn stream_chat_completions(
let status = res.status();
if !(status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()) {
let body = (res.text().await).unwrap_or_default();
// Attempt to parse a structured error and map known codes.
if let Ok(parsed) = serde_json::from_str::<ChatErrorBody>(&body)
&& parsed.error.code.as_deref() == Some(CONTEXT_LENGTH_EXCEEDED)
{
return Err(CodexErr::ContextLengthExceeded(
parsed.error.message.unwrap_or_default(),
));
}
return Err(CodexErr::UnexpectedStatus(status, body));
}

View File

@@ -5,6 +5,7 @@ use std::time::Duration;
use crate::AuthManager;
use crate::auth::CodexAuth;
use crate::error_codes::CONTEXT_LENGTH_EXCEEDED;
use bytes::Bytes;
use codex_protocol::mcp_protocol::AuthMode;
use codex_protocol::mcp_protocol::ConversationId;
@@ -659,7 +660,11 @@ async fn process_sse<S>(
Ok(error) => {
let delay = try_parse_retry_after(&error);
let message = error.message.unwrap_or_default();
response_error = Some(CodexErr::Stream(message, delay));
if error.code.as_deref() == Some(CONTEXT_LENGTH_EXCEEDED) {
response_error = Some(CodexErr::ContextLengthExceeded(message));
} else {
response_error = Some(CodexErr::Stream(message, delay));
}
}
Err(e) => {
debug!("failed to parse ErrorResponse: {e}");

View File

@@ -86,6 +86,7 @@ use crate::protocol::AgentReasoningSectionBreakEvent;
use crate::protocol::ApplyPatchApprovalRequestEvent;
use crate::protocol::AskForApproval;
use crate::protocol::BackgroundEventEvent;
use crate::protocol::CompactApprovalRequestEvent;
use crate::protocol::ErrorEvent;
use crate::protocol::Event;
use crate::protocol::EventMsg;
@@ -1984,6 +1985,19 @@ async fn run_turn(
return Err(e);
}
Err(e) => {
// If we hit a context/window limit error, ask the UI to
// offer a compact confirmation and stop retrying this turn.
if matches!(e, CodexErr::ContextLengthExceeded(_)) {
let event = Event {
id: sub_id.clone(),
msg: EventMsg::CompactApprovalRequest(CompactApprovalRequestEvent {
reason: "The chat has exceeded its limits. To continue, you need to compact the chat. Confirm running compact?".to_string(),
}),
};
sess.send_event(event).await;
// Non-transient: do not retry this turn further; let the UI prompt.
return Err(e);
}
// Use the configured provider-specific stream retry budget.
let max_retries = turn_context.client.get_provider().stream_max_retries();
if retries < max_retries {

View File

@@ -115,14 +115,13 @@ async fn run_compact_task_inner(
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let instructions_override = compact_instructions;
let turn_input = sess
.turn_input_with_history(vec![initial_input_for_turn.clone().into()])
.await;
let prompt = Prompt {
input: turn_input,
tools: Vec::new(),
base_instructions_override: Some(instructions_override),
// Build an in-memory snapshot of the current history; attempts will pop
// from this vector on context-length errors without modifying the session
// transcript.
let mut working_history = {
let state = sess.state.lock().await;
state.history.contents()
};
let max_retries = turn_context.client.get_provider().stream_max_retries();
@@ -139,6 +138,17 @@ async fn run_compact_task_inner(
sess.persist_rollout_items(&[rollout_item]).await;
loop {
// Build prompt input = history + compact trigger
let mut turn_input: Vec<ResponseItem> = Vec::with_capacity(working_history.len() + 1);
turn_input.extend_from_slice(&working_history);
turn_input.push(initial_input_for_turn.clone().into());
let prompt = Prompt {
input: turn_input,
tools: Vec::new(),
base_instructions_override: Some(instructions_override.clone()),
};
let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await;
match attempt_result {
@@ -149,6 +159,28 @@ async fn run_compact_task_inner(
return;
}
Err(e) => {
// Special-case compaction overflows: trim and retry immediately with no backoff.
if matches!(e, CodexErr::ContextLengthExceeded(_)) {
if working_history.pop().is_some() {
sess.notify_stream_error(
&sub_id,
"compact input exceeds context window; retrying with 1 fewer item…",
)
.await;
continue;
} else {
let event = Event {
id: sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message:
"Unable to compact: context window too small for any history"
.to_string(),
}),
};
sess.send_event(event).await;
return;
}
}
if retries < max_retries {
retries += 1;
let delay = backoff(retries);

View File

@@ -104,6 +104,10 @@ pub enum CodexErr {
#[error("codex-linux-sandbox was required but not provided")]
LandlockSandboxExecutableNotProvided,
/// Provider reported the input exceeds the model's context window.
#[error("context_length_exceeded: {0}")]
ContextLengthExceeded(String),
// -----------------------------------------------------------------
// Automatic conversions for common external error types
// -----------------------------------------------------------------

View File

@@ -0,0 +1,2 @@
/// Structured error code that a model provider reports when the request
/// input exceeds the model's context window.
pub const CONTEXT_LENGTH_EXCEEDED: &str = "context_length_exceeded";

View File

@@ -54,6 +54,7 @@ pub use conversation_manager::NewConversation;
pub use auth::AuthManager;
pub use auth::CodexAuth;
pub mod default_client;
pub mod error_codes;
pub mod model_family;
mod openai_model_info;
mod openai_tools;

View File

@@ -60,6 +60,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::ExecCommandEnd(_)
| EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::CompactApprovalRequest(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::StreamError(_)
| EventMsg::PatchApplyBegin(_)

View File

@@ -796,3 +796,133 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
"second auto compact request should reuse summarization instructions"
);
}
/// Verifies that a manual compact is retried after a `context_length_exceeded`
/// failure and that the retry sends exactly one fewer `input` item.
///
/// Flow: two user turns seed the history, `Op::Compact` is submitted, the
/// first compact response is a `response.failed` event carrying the
/// `context_length_exceeded` code, and the second compact response succeeds.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn compact_trims_history_on_context_limit_error() {
non_sandbox_test!();
let server = start_mock_server().await;
// Minimal "completed" stream reused for the two initial user turns.
let sse_user_done = sse(vec![ev_completed("r-user")]);
// First compact attempt fails with context length exceeded.
let sse_compact_fail = sse(vec![serde_json::json!({
"type": "response.failed",
"response": { "error": { "code": "context_length_exceeded", "message": "too big" } }
})]);
// Second compact attempt succeeds.
let sse_compact_ok = sse(vec![
ev_assistant_message("m-sum", SUMMARY_TEXT),
ev_completed("r-sum"),
]);
// Matchers for the two user turns and two compact attempts.
// The user-turn matchers exclude requests containing the summarize trigger
// so that compact requests (which replay the history) do not match them.
let m_user1 = |req: &Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains("\"text\":\"u1\"") && !body.contains(SUMMARIZE_TRIGGER)
};
mount_sse_once(&server, m_user1, sse_user_done.clone()).await;
let m_user2 = |req: &Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains("\"text\":\"u2\"") && !body.contains(SUMMARIZE_TRIGGER)
};
mount_sse_once(&server, m_user2, sse_user_done.clone()).await;
// First compact attempt: includes the trigger and will fail.
let m_compact1 = |req: &Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(SUMMARIZE_TRIGGER)
};
mount_sse_once(&server, m_compact1, sse_compact_fail).await;
// Second compact attempt: also includes trigger; succeeds.
// NOTE(review): this predicate is identical to `m_compact1`; the test
// presumably relies on the one-shot mounts being consumed in mount order so
// the failure is served first — confirm against `mount_sse_once`.
let m_compact2 = |req: &Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(SUMMARIZE_TRIGGER)
};
mount_sse_once(&server, m_compact2, sse_compact_ok).await;
// Build conversation
// Point the built-in "openai" provider at the mock server.
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
..built_in_model_providers()["openai"].clone()
};
let home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&home);
config.model_provider = model_provider;
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let codex = conversation_manager
.new_conversation(config)
.await
.unwrap()
.conversation;
// Two user turns to seed history.
codex
.submit(Op::UserInput {
items: vec![InputItem::Text { text: "u1".into() }],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![InputItem::Text { text: "u2".into() }],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// Request compaction: first attempt fails with context_length_exceeded; retry trims history and succeeds.
codex.submit(Op::Compact).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// Inspect requests to verify that there were two compaction attempts and the
// second had a smaller input array than the first.
let requests = server.received_requests().await.unwrap();
let mut compact_bodies: Vec<serde_json::Value> = Vec::new();
for req in &requests {
let body = req.body_json::<serde_json::Value>().unwrap();
// A request counts as a compact attempt when any `message` item in `input`
// has a first content entry whose `text` contains the summarize trigger.
let is_compact = body
.get("input")
.and_then(|v| v.as_array())
.map(|arr| {
arr.iter().any(|it| {
it.get("type").and_then(|t| t.as_str()) == Some("message")
&& it
.get("content")
.and_then(|c| c.as_array())
.and_then(|a| a.first())
.and_then(|t| t.get("text"))
.and_then(|t| t.as_str())
.map(|t| t.contains(SUMMARIZE_TRIGGER))
.unwrap_or(false)
})
})
.unwrap_or(false);
if is_compact {
compact_bodies.push(body);
}
}
assert!(
compact_bodies.len() >= 2,
"expected at least two compact attempts (fail then success)"
);
// Compare the last two compact requests; a missing or non-array `input` is
// treated as length 0.
let n = compact_bodies.len();
let len1 = compact_bodies[n - 2]["input"]
.as_array()
.map(Vec::len)
.unwrap_or(0);
let len2 = compact_bodies[n - 1]["input"]
.as_array()
.map(Vec::len)
.unwrap_or(0);
assert_eq!(
len1,
len2 + 1,
"second compact attempt should trim exactly one item: {len1} -> {len2}"
);
}

View File

@@ -562,6 +562,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
ts_println!(self, "task aborted: review ended");
}
},
EventMsg::CompactApprovalRequest(_) => {}
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
EventMsg::ConversationPath(_) => {}
EventMsg::UserMessage(_) => {}

View File

@@ -276,6 +276,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::WebSearchEnd(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::PlanUpdate(_)
| EventMsg::CompactApprovalRequest(_)
| EventMsg::TurnAborted(_)
| EventMsg::ConversationPath(_)
| EventMsg::UserMessage(_)

View File

@@ -478,6 +478,10 @@ pub enum EventMsg {
ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent),
/// Request user confirmation to run a compact operation to reduce the
/// conversation context after encountering model context/window limits.
CompactApprovalRequest(CompactApprovalRequestEvent),
BackgroundEvent(BackgroundEventEvent),
/// Notification that a model stream experienced an error or disconnect
@@ -540,6 +544,11 @@ pub struct TaskStartedEvent {
pub model_context_window: Option<u64>,
}
/// Payload of `EventMsg::CompactApprovalRequest`: asks the user to confirm
/// running a compact operation after a model context/window limit was hit.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
pub struct CompactApprovalRequestEvent {
/// Human-readable explanation of why compaction is being requested.
pub reason: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, Default, TS)]
pub struct TokenUsage {
pub input_tokens: u64,

View File

@@ -15,6 +15,7 @@ use codex_core::protocol::AgentReasoningRawContentDeltaEvent;
use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::CompactApprovalRequestEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
use codex_core::protocol::EventMsg;
@@ -427,6 +428,12 @@ impl ChatWidget {
);
}
fn on_compact_approval_request(&mut self, ev: CompactApprovalRequestEvent) {
let request = ApprovalRequest::Compact { reason: ev.reason };
self.bottom_pane.push_approval_request(request);
self.request_redraw();
}
fn on_exec_command_begin(&mut self, ev: ExecCommandBeginEvent) {
self.flush_answer_stream_with_separator();
let ev2 = ev.clone();
@@ -1202,6 +1209,7 @@ impl ChatWidget {
EventMsg::ApplyPatchApprovalRequest(ev) => {
self.on_apply_patch_approval_request(id.unwrap_or_default(), ev)
}
EventMsg::CompactApprovalRequest(ev) => self.on_compact_approval_request(ev),
EventMsg::ExecCommandBegin(ev) => self.on_exec_command_begin(ev),
EventMsg::ExecCommandOutputDelta(delta) => self.on_exec_command_output_delta(delta),
EventMsg::PatchApplyBegin(ev) => self.on_patch_apply_begin(ev),

View File

@@ -44,6 +44,9 @@ pub(crate) enum ApprovalRequest {
reason: Option<String>,
grant_root: Option<PathBuf>,
},
Compact {
reason: String,
},
}
/// Options displayed in the *select* mode.
@@ -96,6 +99,24 @@ static PATCH_SELECT_OPTIONS: LazyLock<Vec<SelectOption>> = LazyLock::new(|| {
]
});
// Compact confirmation: a simple Yes/No prompt.
// NOTE(review): the visible decision handler sends `Op::Compact` for every
// `ApprovalRequest::Compact`, regardless of the chosen decision — confirm that
// picking "No" (`ReviewDecision::Abort`) really skips compaction.
static COMPACT_SELECT_OPTIONS: LazyLock<Vec<SelectOption>> = LazyLock::new(|| {
    // "Yes": approve and run the compact operation.
    let accept = SelectOption {
        label: Line::from(vec!["Y".underlined(), "es".into()]),
        description: "Run compact now to continue",
        key: KeyCode::Char('y'),
        decision: ReviewDecision::Approved,
    };
    // "No": abort and leave the transcript untouched.
    let decline = SelectOption {
        label: Line::from(vec!["N".underlined(), "o".into()]),
        description: "Cancel; keep the transcript as-is",
        key: KeyCode::Char('n'),
        decision: ReviewDecision::Abort,
    };
    vec![accept, decline]
});
/// A modal prompting the user to approve or deny the pending request.
pub(crate) struct UserApprovalWidget {
approval_request: ApprovalRequest,
@@ -142,12 +163,17 @@ impl UserApprovalWidget {
Paragraph::new(contents).wrap(Wrap { trim: false })
}
ApprovalRequest::Compact { reason, .. } => {
let contents: Vec<Line> = vec![Line::from(reason.clone()), "".into()];
Paragraph::new(contents).wrap(Wrap { trim: false })
}
};
Self {
select_options: match &approval_request {
ApprovalRequest::Exec { .. } => &COMMAND_SELECT_OPTIONS,
ApprovalRequest::ApplyPatch { .. } => &PATCH_SELECT_OPTIONS,
ApprovalRequest::Compact { .. } => &COMPACT_SELECT_OPTIONS,
},
approval_request,
app_event_tx,
@@ -294,6 +320,9 @@ impl UserApprovalWidget {
ApprovalRequest::ApplyPatch { .. } => {
// No history line for patch approval decisions.
}
ApprovalRequest::Compact { .. } => {
// No history line for compact approval decisions. That's handled by compact task itself.
}
}
let op = match &self.approval_request {
@@ -305,6 +334,7 @@ impl UserApprovalWidget {
id: id.clone(),
decision,
},
ApprovalRequest::Compact { .. } => Op::Compact,
};
self.app_event_tx.send(AppEvent::CodexOp(op));
@@ -357,6 +387,7 @@ impl WidgetRef for &UserApprovalWidget {
let title = match &self.approval_request {
ApprovalRequest::Exec { .. } => "Allow command?",
ApprovalRequest::ApplyPatch { .. } => "Apply changes?",
ApprovalRequest::Compact { .. } => "Run compact?",
};
Line::from(title).render(title_area, buf);