diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 224502c38a..3456f9d731 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1712,6 +1712,7 @@ dependencies = [ "include_dir", "indexmap 2.13.0", "indoc", + "insta", "keyring", "landlock", "libc", diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 1b3f62c5aa..208ba1c3a6 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -152,6 +152,7 @@ codex-utils-cargo-bin = { workspace = true } core_test_support = { workspace = true } ctor = { workspace = true } image = { workspace = true, features = ["jpeg", "png"] } +insta = { workspace = true } maplit = { workspace = true } predicates = { workspace = true } pretty_assertions = { workspace = true } diff --git a/codex-rs/core/tests/common/context_snapshot.rs b/codex-rs/core/tests/common/context_snapshot.rs new file mode 100644 index 0000000000..f9362ee30c --- /dev/null +++ b/codex-rs/core/tests/common/context_snapshot.rs @@ -0,0 +1,339 @@ +use serde_json::Value; + +use crate::responses::ResponsesRequest; + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub enum ContextSnapshotRenderMode { + #[default] + RedactedText, + FullText, + KindOnly, + KindWithTextPrefix { + max_chars: usize, + }, +} + +#[derive(Debug, Clone)] +pub struct ContextSnapshotOptions { + render_mode: ContextSnapshotRenderMode, +} + +impl Default for ContextSnapshotOptions { + fn default() -> Self { + Self { + render_mode: ContextSnapshotRenderMode::RedactedText, + } + } +} + +impl ContextSnapshotOptions { + pub fn render_mode(mut self, render_mode: ContextSnapshotRenderMode) -> Self { + self.render_mode = render_mode; + self + } +} + +pub fn format_request_input_snapshot( + request: &ResponsesRequest, + options: &ContextSnapshotOptions, +) -> String { + let items = request.input(); + format_response_items_snapshot(items.as_slice(), options) +} + +pub fn format_response_items_snapshot(items: &[Value], options: &ContextSnapshotOptions) -> String { + items + .iter() + .enumerate() + .map(|(idx, item)| { + let Some(item_type) = item.get("type").and_then(Value::as_str) else { + return format!("{idx:02}:"); + }; + + if options.render_mode == ContextSnapshotRenderMode::KindOnly { + return if item_type == "message" { + let role = item.get("role").and_then(Value::as_str).unwrap_or("unknown"); + format!("{idx:02}:message/{role}") + } else { + format!("{idx:02}:{item_type}") + }; + } + + match item_type { + "message" => { + let role = item.get("role").and_then(Value::as_str).unwrap_or("unknown"); + let text = item + .get("content") + .and_then(Value::as_array) + .map(|content| { + content + .iter() + .map(|entry| { + if let Some(text) = entry.get("text").and_then(Value::as_str) { + return format_snapshot_text(text, options); + } + let Some(content_type) = + entry.get("type").and_then(Value::as_str) + else { + return "".to_string(); + }; + let Some(content_object) = entry.as_object() else { + return format!("<{content_type}>"); + }; + let mut extra_keys = content_object + .keys() + .filter(|key| *key != "type" && *key != "text") + .cloned() + .collect::>(); + extra_keys.sort(); + if extra_keys.is_empty() { + format!("<{content_type}>") + } else { + format!("<{content_type}:{}>", extra_keys.join(",")) + } + }) + .collect::>() + .join(" | ") + }) + .filter(|text| !text.is_empty()) + .unwrap_or_else(|| "".to_string()); + format!("{idx:02}:message/{role}:{text}") + } + "function_call" => { + let name = item.get("name").and_then(Value::as_str).unwrap_or("unknown"); + format!("{idx:02}:function_call/{name}") + } + "function_call_output" => { + let output = item + .get("output") + .and_then(Value::as_str) + .map(|output| format_snapshot_text(output, options)) + .unwrap_or_else(|| "".to_string()); + format!("{idx:02}:function_call_output:{output}") + } + "local_shell_call" => { + let command = item + .get("action") + .and_then(|action| action.get("command")) + .and_then(Value::as_array) + .map(|parts| { + parts + .iter() + .filter_map(Value::as_str) + .collect::>() + .join(" ") + }) + .map(|command| format_snapshot_text(&command, options)) + .filter(|cmd| !cmd.is_empty()) + .unwrap_or_else(|| "".to_string()); + format!("{idx:02}:local_shell_call:{command}") + } + "reasoning" => { + let summary_text = item + .get("summary") + .and_then(Value::as_array) + .and_then(|summary| summary.first()) + .and_then(|entry| entry.get("text")) + .and_then(Value::as_str) + .map(|text| format_snapshot_text(text, options)) + .unwrap_or_else(|| "".to_string()); + let has_encrypted_content = item + .get("encrypted_content") + .and_then(Value::as_str) + .is_some_and(|value| !value.is_empty()); + format!( + "{idx:02}:reasoning:summary={summary_text}:encrypted={has_encrypted_content}" + ) + } + "compaction" => { + let has_encrypted_content = item + .get("encrypted_content") + .and_then(Value::as_str) + .is_some_and(|value| !value.is_empty()); + format!("{idx:02}:compaction:encrypted={has_encrypted_content}") + } + other => format!("{idx:02}:{other}"), + } + }) + .collect::>() + .join("\n") +} + +pub fn format_labeled_requests_snapshot( + scenario: &str, + sections: &[(&str, &ResponsesRequest)], + options: &ContextSnapshotOptions, +) -> String { + let sections = sections + .iter() + .map(|(title, request)| { + format!( + "## {title}\n{}", + format_request_input_snapshot(request, options) + ) + }) + .collect::>() + .join("\n\n"); + format!("Scenario: {scenario}\n\n{sections}") +} + +pub fn format_labeled_items_snapshot( + scenario: &str, + sections: &[(&str, &[Value])], + options: &ContextSnapshotOptions, +) -> String { + let sections = sections + .iter() + .map(|(title, items)| { + format!( + "## {title}\n{}", + format_response_items_snapshot(items, options) + ) + }) + .collect::>() + .join("\n\n"); + format!("Scenario: {scenario}\n\n{sections}") +} + +fn format_snapshot_text(text: &str, options: &ContextSnapshotOptions) -> String { + match options.render_mode { + ContextSnapshotRenderMode::RedactedText => { + canonicalize_snapshot_text(text).replace('\n', "\\n") + } + ContextSnapshotRenderMode::FullText => text.replace('\n', "\\n"), + ContextSnapshotRenderMode::KindWithTextPrefix { max_chars } => { + let normalized = canonicalize_snapshot_text(text).replace('\n', "\\n"); + if normalized.chars().count() <= max_chars { + normalized + } else { + let prefix = normalized.chars().take(max_chars).collect::(); + format!("{prefix}...") + } + } + ContextSnapshotRenderMode::KindOnly => unreachable!(), + } +} + +fn canonicalize_snapshot_text(text: &str) -> String { + if text.starts_with("") { + return "".to_string(); + } + if text.starts_with("# AGENTS.md instructions for ") { + return "".to_string(); + } + if text.starts_with("") { + if let (Some(cwd_start), Some(cwd_end)) = (text.find(""), text.find("")) { + let cwd = &text[cwd_start + "".len()..cwd_end]; + return if cwd.ends_with("PRETURN_CONTEXT_DIFF_CWD") { + "".to_string() + } else { + ">".to_string() + }; + } + return "".to_string(); + } + if text.starts_with("You are performing a CONTEXT CHECKPOINT COMPACTION.") { + return "".to_string(); + } + if text.starts_with("Another language model started to solve this problem") + && let Some((_, summary)) = text.split_once('\n') + { + return format!("\n{summary}"); + } + text.to_string() +} + +#[cfg(test)] +mod tests { + use super::ContextSnapshotOptions; + use super::ContextSnapshotRenderMode; + use super::format_response_items_snapshot; + use pretty_assertions::assert_eq; + use serde_json::json; + + #[test] + fn full_text_mode_preserves_unredacted_text() { + let items = vec![json!({ + "type": "message", + "role": "user", + "content": [{ + "type": "input_text", + "text": "# AGENTS.md instructions for /tmp/example" + }] + })]; + + let rendered = format_response_items_snapshot( + &items, + &ContextSnapshotOptions::default().render_mode(ContextSnapshotRenderMode::FullText), + ); + + assert_eq!( + rendered, + "00:message/user:# AGENTS.md instructions for /tmp/example" + ); + } + + #[test] + fn redacted_text_mode_keeps_canonical_placeholders() { + let items = vec![json!({ + "type": "message", + "role": "user", + "content": [{ + "type": "input_text", + "text": "# AGENTS.md instructions for /tmp/example" + }] + })]; + + let rendered = format_response_items_snapshot( + &items, + &ContextSnapshotOptions::default().render_mode(ContextSnapshotRenderMode::RedactedText), + ); + + assert_eq!(rendered, "00:message/user:"); + } + + #[test] + fn image_only_message_is_rendered_as_non_text_span() { + let items = vec![json!({ + "type": "message", + "role": "user", + "content": [{ + "type": "input_image", + "image_url": "data:image/png;base64,AAAA" + }] + })]; + + let rendered = format_response_items_snapshot(&items, &ContextSnapshotOptions::default()); + + assert_eq!(rendered, "00:message/user:"); + } + + #[test] + fn mixed_text_and_image_message_keeps_image_span() { + let items = vec![json!({ + "type": "message", + "role": "user", + "content": [ + { + "type": "input_text", + "text": "" + }, + { + "type": "input_image", + "image_url": "data:image/png;base64,AAAA" + }, + { + "type": "input_text", + "text": "" + } + ] + })]; + + let rendered = format_response_items_snapshot(&items, &ContextSnapshotOptions::default()); + + assert_eq!( + rendered, + "00:message/user: | | " + ); + } +} diff --git a/codex-rs/core/tests/common/lib.rs b/codex-rs/core/tests/common/lib.rs index 87a8e1adb2..79bf9369e0 100644 --- a/codex-rs/core/tests/common/lib.rs +++ b/codex-rs/core/tests/common/lib.rs @@ -12,6 +12,7 @@ use codex_utils_absolute_path::AbsolutePathBuf; use regex_lite::Regex; use std::path::PathBuf; +pub mod context_snapshot; pub mod process; pub mod responses; pub mod streaming_sse; diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 9faca23888..c434c89717 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -5,6 +5,8 @@ use std::time::Duration; use anyhow::Result; use base64::Engine; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ModelsResponse; use futures::SinkExt; use futures::StreamExt; @@ -112,6 +114,14 @@ impl ResponsesRequest { self.0.body.clone() } + pub fn body_contains_text(&self, text: &str) -> bool { + let json_fragment = serde_json::to_string(text) + .expect("serialize text to JSON") + .trim_matches('"') + .to_string(); + self.body_json().to_string().contains(&json_fragment) + } + pub fn instructions_text(&self) -> String { self.body_json()["instructions"] .as_str() @@ -131,6 +141,22 @@ impl ResponsesRequest { .collect() } + /// Returns all `input_image` `image_url` spans from `message` inputs for the provided role. + pub fn message_input_image_urls(&self, role: &str) -> Vec { + self.inputs_of_type("message") + .into_iter() + .filter(|item| item.get("role").and_then(Value::as_str) == Some(role)) + .filter_map(|item| item.get("content").and_then(Value::as_array).cloned()) + .flatten() + .filter(|span| span.get("type").and_then(Value::as_str) == Some("input_image")) + .filter_map(|span| { + span.get("image_url") + .and_then(Value::as_str) + .map(str::to_owned) + }) + .collect() + } + pub fn input(&self) -> Vec { self.body_json()["input"] .as_array() @@ -480,6 +506,18 @@ pub fn ev_assistant_message(id: &str, text: &str) -> Value { }) } +pub fn user_message_item(text: &str) -> ResponseItem { + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: text.to_string(), + }], + end_turn: None, + phase: None, + } +} + pub fn ev_message_item_added(id: &str, text: &str) -> Value { serde_json::json!({ "type": "response.output_item.added", @@ -818,15 +856,24 @@ where } pub async fn mount_compact_json_once(server: &MockServer, body: serde_json::Value) -> ResponseMock { - let (mock, response_mock) = compact_mock(); - mock.respond_with( + mount_compact_response_once( + server, ResponseTemplate::new(200) .insert_header("content-type", "application/json") - .set_body_json(body.clone()), + .set_body_json(body), ) - .up_to_n_times(1) - .mount(server) - .await; + .await +} + +pub async fn mount_compact_response_once( + server: &MockServer, + response: ResponseTemplate, +) -> ResponseMock { + let (mock, response_mock) = compact_mock(); + mock.respond_with(response) + .up_to_n_times(1) + .mount(server) + .await; response_mock } diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 888d5946c0..25278073ec 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -20,6 +20,9 @@ use codex_protocol::items::TurnItem; use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ModelsResponse; use codex_protocol::user_input::UserInput; +use core_test_support::context_snapshot; +use core_test_support::context_snapshot::ContextSnapshotOptions; +use core_test_support::context_snapshot::ContextSnapshotRenderMode; use core_test_support::responses::ev_local_shell_call; use core_test_support::responses::ev_reasoning_item; use core_test_support::responses::mount_models_once; @@ -27,7 +30,7 @@ use core_test_support::skip_if_no_network; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use core_test_support::wait_for_event_match; -use std::collections::VecDeque; +use std::path::PathBuf; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -60,10 +63,11 @@ const SECOND_AUTO_SUMMARY: &str = "SECOND_AUTO_SUMMARY"; const FINAL_REPLY: &str = "FINAL_REPLY"; const CONTEXT_LIMIT_MESSAGE: &str = "Your input exceeds the context window of this model. Please adjust your input and try again."; -const DUMMY_FUNCTION_NAME: &str = "unsupported_tool"; +const DUMMY_FUNCTION_NAME: &str = "test_tool"; const DUMMY_CALL_ID: &str = "call-multi-auto"; const FUNCTION_CALL_LIMIT_MSG: &str = "function call limit push"; const POST_AUTO_USER_MSG: &str = "post auto follow-up"; +const PRETURN_CONTEXT_DIFF_CWD: &str = "/tmp/PRETURN_CONTEXT_DIFF_CWD"; pub(super) const COMPACT_WARNING_MESSAGE: &str = "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted."; @@ -75,23 +79,6 @@ fn summary_with_prefix(summary: &str) -> String { format!("{SUMMARY_PREFIX}\n{summary}") } -fn drop_call_id(value: &mut serde_json::Value) { - match value { - serde_json::Value::Object(obj) => { - obj.retain(|k, _| k != "call_id"); - for v in obj.values_mut() { - drop_call_id(v); - } - } - serde_json::Value::Array(arr) => { - for v in arr { - drop_call_id(v); - } - } - _ => {} - } -} - fn set_test_compact_prompt(config: &mut Config) { config.compact_prompt = Some(SUMMARIZATION_PROMPT.to_string()); } @@ -194,6 +181,21 @@ async fn assert_compaction_uses_turn_lifecycle_id(codex: &std::sync::Arc ContextSnapshotOptions { + ContextSnapshotOptions::default() + .render_mode(ContextSnapshotRenderMode::KindWithTextPrefix { max_chars: 64 }) +} + +fn format_labeled_requests_snapshot( + scenario: &str, + sections: &[(&str, &core_test_support::responses::ResponsesRequest)], +) -> String { + context_snapshot::format_labeled_requests_snapshot( + scenario, + sections, + &context_snapshot_options(), + ) +} #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn summarize_context_three_requests_and_instructions() { @@ -408,8 +410,15 @@ async fn manual_compact_uses_custom_prompt() { skip_if_no_network!(); let server = start_mock_server().await; - let sse_stream = sse(vec![ev_completed("r1")]); - let response_mock = mount_sse_once(&server, sse_stream).await; + let first_turn = sse(vec![ + ev_assistant_message("m0", FIRST_REPLY), + ev_completed_with_tokens("r0", 80), + ]); + let compact_turn = sse(vec![ + ev_assistant_message("m1", SUMMARY_TEXT), + ev_completed_with_tokens("r1", 100), + ]); + let request_log = mount_sse_sequence(&server, vec![first_turn, compact_turn]).await; let custom_prompt = "Use this compact prompt instead"; @@ -424,6 +433,18 @@ async fn manual_compact_uses_custom_prompt() { .expect("create conversation") .codex; + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_ONE".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .expect("submit first user turn"); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + codex.submit(Op::Compact).await.expect("trigger compact"); let warning_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Warning(_))).await; let EventMsg::Warning(WarningEvent { message }) = warning_event else { @@ -432,7 +453,13 @@ async fn manual_compact_uses_custom_prompt() { assert_eq!(message, COMPACT_WARNING_MESSAGE); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; - let body = response_mock.single_request().body_json(); + let requests = request_log.requests(); + assert_eq!( + requests.len(), + 2, + "expected first turn and compact requests" + ); + let body = requests[1].body_json(); let input = body .get("input") @@ -1742,6 +1769,21 @@ async fn pre_sampling_compact_runs_on_switch_to_smaller_context_model() { previous_model, next_model, ); + + insta::assert_snapshot!( + "pre_sampling_model_switch_compaction_shapes", + format_labeled_requests_snapshot( + "Pre-sampling compaction on model switch to a smaller context window: current behavior compacts using prior-turn history only (incoming user message excluded), and the follow-up request carries compacted history plus the new user message.", + &[ + ("Initial Request (Previous Model)", &requests[0]), + ("Pre-sampling Compaction Request", &requests[1]), + ( + "Post-Compaction Follow-up Request (Next Model)", + &requests[2] + ), + ] + ) + ); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -2133,6 +2175,82 @@ async fn manual_compact_retries_after_context_window_error() { } } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// TODO(ccunningham): Re-enable after the follow-up compaction behavior PR lands. +// Current main behavior around non-context manual /compact failures is known-incorrect. +#[ignore = "behavior change covered in follow-up compaction PR"] +async fn manual_compact_non_context_failure_retries_then_emits_task_error() { + skip_if_no_network!(); + + let server = start_mock_server().await; + + let user_turn = sse(vec![ + ev_assistant_message("m1", FIRST_REPLY), + ev_completed("r1"), + ]); + let compact_failed_1 = sse_failed( + "resp-fail-1", + "server_error", + "temporary compact failure one", + ); + let compact_failed_2 = sse_failed( + "resp-fail-2", + "server_error", + "temporary compact failure two", + ); + + mount_sse_sequence(&server, vec![user_turn, compact_failed_1, compact_failed_2]).await; + + let mut model_provider = non_openai_model_provider(&server); + model_provider.stream_max_retries = Some(1); + + let codex = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + set_test_compact_prompt(config); + config.model_auto_compact_token_limit = Some(200_000); + }) + .build(&server) + .await + .expect("build codex") + .codex; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "first turn".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .expect("submit user input"); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + codex.submit(Op::Compact).await.expect("trigger compact"); + + let reconnect_message = wait_for_event_match(&codex, |event| match event { + EventMsg::StreamError(stream_error) => Some(stream_error.message.clone()), + _ => None, + }) + .await; + assert!( + reconnect_message.contains("Reconnecting... 1/1"), + "expected reconnect stream error message, got {reconnect_message}" + ); + + let task_error_message = wait_for_event_match(&codex, |event| match event { + EventMsg::Error(err) => Some(err.message.clone()), + _ => None, + }) + .await; + assert!( + task_error_message.contains("Error running local compact task"), + "expected local compact task error prefix, got {task_error_message}" + ); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn manual_compact_twice_preserves_latest_user_messages() { skip_if_no_network!(); @@ -2237,114 +2355,89 @@ async fn manual_compact_twice_preserves_latest_user_messages() { 5, "expected exactly 5 requests (user turn, compact, user turn, compact, final turn)" ); - let contains_user_text = |input: &[serde_json::Value], expected: &str| -> bool { - input.iter().any(|item| { - item.get("type").and_then(|v| v.as_str()) == Some("message") - && item.get("role").and_then(|v| v.as_str()) == Some("user") - && item - .get("content") - .and_then(|v| v.as_array()) - .is_some_and(|arr| { - arr.iter().any(|entry| { - entry.get("text").and_then(|v| v.as_str()) == Some(expected) - }) - }) - }) + let contains_user_text = |request: &core_test_support::responses::ResponsesRequest, + expected: &str| { + request + .message_input_texts("user") + .iter() + .any(|text| text == expected) }; - let first_turn_input = requests[0].input(); assert!( - contains_user_text(&first_turn_input, first_user_message), + contains_user_text(&requests[0], first_user_message), "first turn request missing first user message" ); assert!( - !contains_user_text(&first_turn_input, SUMMARIZATION_PROMPT), + !contains_user_text(&requests[0], SUMMARIZATION_PROMPT), "first turn request should not include summarization prompt" ); - let first_compact_input = requests[1].input(); assert!( - contains_user_text(&first_compact_input, first_user_message), + contains_user_text(&requests[1], first_user_message), "first compact request should include history before compaction" ); - let second_turn_input = requests[2].input(); assert!( - contains_user_text(&second_turn_input, second_user_message), + contains_user_text(&requests[2], second_user_message), "second turn request missing second user message" ); assert!( - contains_user_text(&second_turn_input, first_user_message), + contains_user_text(&requests[2], first_user_message), "second turn request should include the compacted user history" ); - let second_compact_input = requests[3].input(); assert!( - contains_user_text(&second_compact_input, second_user_message), + contains_user_text(&requests[3], second_user_message), "second compact request should include latest history" ); - let first_compact_has_prompt = contains_user_text(&first_compact_input, SUMMARIZATION_PROMPT); - let second_compact_has_prompt = contains_user_text(&second_compact_input, SUMMARIZATION_PROMPT); + insta::assert_snapshot!( + "manual_compact_with_history_shapes", + format_labeled_requests_snapshot( + "Manual /compact with prior user history compacts existing history and the follow-up turn includes the compact summary plus new user message.", + &[ + ("Local Compaction Request", &requests[1]), + ("Local Post-Compaction History Layout", &requests[2]), + ] + ) + ); + + let first_compact_has_prompt = contains_user_text(&requests[1], SUMMARIZATION_PROMPT); + let second_compact_has_prompt = contains_user_text(&requests[3], SUMMARIZATION_PROMPT); assert_eq!( first_compact_has_prompt, second_compact_has_prompt, "compact requests should consistently include or omit the summarization prompt" ); - let mut final_output = requests + let first_request_user_texts = requests[0].message_input_texts("user"); + let first_turn_user_index = first_request_user_texts + .len() + .checked_sub(1) + .unwrap_or_else(|| panic!("first turn request missing user messages")); + assert_eq!( + first_request_user_texts[first_turn_user_index], first_user_message, + "first turn request should end with the submitted user message" + ); + let seeded_user_prefix = &first_request_user_texts[..first_turn_user_index]; + + let final_request_user_texts = requests .last() .unwrap_or_else(|| panic!("final turn request missing for {final_user_message}")) - .input() - .into_iter() - .collect::>(); - - // Permissions developer message - final_output.pop_front(); - // User instructions (project docs/skills) - final_output.pop_front(); - // Environment context - final_output.pop_front(); - - let _ = final_output - .iter_mut() - .map(drop_call_id) - .collect::>(); - + .message_input_texts("user"); + assert!( + final_request_user_texts + .as_slice() + .starts_with(seeded_user_prefix), + "final request should start with seeded user prefix from first request: {seeded_user_prefix:?}" + ); + let final_output = &final_request_user_texts[seeded_user_prefix.len()..]; let expected = vec![ - json!({ - "content": vec![json!({ - "text": first_user_message, - "type": "input_text", - })], - "role": "user", - "type": "message", - }), - json!({ - "content": vec![json!({ - "text": second_user_message, - "type": "input_text", - })], - "role": "user", - "type": "message", - }), - json!({ - "content": vec![json!({ - "text": expected_second_summary, - "type": "input_text", - })], - "role": "user", - "type": "message", - }), - json!({ - "content": vec![json!({ - "text": final_user_message, - "type": "input_text", - })], - "role": "user", - "type": "message", - }), + first_user_message.to_string(), + second_user_message.to_string(), + expected_second_summary, + final_user_message.to_string(), ]; - assert_eq!(final_output, expected); + assert_eq!(final_output, expected.as_slice()); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -2459,7 +2552,7 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_ } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { +async fn snapshot_request_shape_mid_turn_continuation_compaction() { skip_if_no_network!(); let server = start_mock_server().await; @@ -2467,29 +2560,25 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { let context_window = 100; let limit = context_window * 90 / 100; let over_limit_tokens = context_window * 95 / 100 + 1; - let follow_up_user = "FOLLOW_UP_AFTER_LIMIT"; let first_turn = sse(vec![ ev_function_call(DUMMY_CALL_ID, DUMMY_FUNCTION_NAME, "{}"), - ev_completed_with_tokens("r1", 50), - ]); - let function_call_follow_up = sse(vec![ - ev_assistant_message("m2", FINAL_REPLY), - ev_completed_with_tokens("r2", over_limit_tokens), + ev_completed_with_tokens("r1", over_limit_tokens), ]); let auto_summary_payload = auto_summary(AUTO_SUMMARY_TEXT); let auto_compact_turn = sse(vec![ - ev_assistant_message("m3", &auto_summary_payload), + ev_assistant_message("m2", &auto_summary_payload), ev_completed_with_tokens("r3", 10), ]); - let post_auto_compact_turn = sse(vec![ev_completed_with_tokens("r4", 10)]); + let post_auto_compact_turn = sse(vec![ + ev_assistant_message("m3", FINAL_REPLY), + ev_completed_with_tokens("r4", 10), + ]); // Mount responses in order and keep mocks only for the ones we assert on. let first_turn_mock = mount_sse_once(&server, first_turn).await; - let follow_up_mock = mount_sse_once(&server, function_call_follow_up).await; let auto_compact_mock = mount_sse_once(&server, auto_compact_turn).await; - // We don't assert on the post-compact request, so no need to keep its mock. - mount_sse_once(&server, post_auto_compact_turn).await; + let post_auto_compact_mock = mount_sse_once(&server, post_auto_compact_turn).await; let model_provider = non_openai_model_provider(&server); @@ -2514,19 +2603,6 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { wait_for_event(&codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await; - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: follow_up_user.into(), - text_elements: Vec::new(), - }], - final_output_json_schema: None, - }) - .await - .unwrap(); - - wait_for_event(&codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await; - // Assert first request captured expected user message that triggers function call. let first_request = first_turn_mock.single_request().input(); assert!( @@ -2543,7 +2619,7 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { "first request should include the user message that triggers the function call" ); - let function_call_output = follow_up_mock + let function_call_output = auto_compact_mock .single_request() .function_call_output(DUMMY_CALL_ID); let output_text = function_call_output @@ -2558,7 +2634,24 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { let auto_compact_body = auto_compact_mock.single_request().body_json().to_string(); assert!( body_contains_text(&auto_compact_body, SUMMARIZATION_PROMPT), - "auto compact request should include the summarization prompt after exceeding 95% (limit {limit})" + "mid-turn auto compact request should include the summarization prompt after exceeding 95% (limit {limit})" + ); + + insta::assert_snapshot!( + "mid_turn_compaction_shapes", + format_labeled_requests_snapshot( + "True mid-turn continuation compaction after tool output: compact request includes tool artifacts, and the continuation request includes the summary in the same turn.", + &[ + ( + "Local Compaction Request", + &auto_compact_mock.single_request() + ), + ( + "Local Post-Compaction History Layout", + &post_auto_compact_mock.single_request() + ), + ] + ) ); } @@ -2828,3 +2921,270 @@ async fn auto_compact_runs_when_reasoning_header_clears_between_turns() { "remote compaction should run once after the reasoning header clears" ); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// TODO(ccunningham): Update once pre-turn compaction includes incoming user input. +async fn snapshot_request_shape_pre_turn_compaction_including_incoming_user_message() { + skip_if_no_network!(); + + let server = start_mock_server().await; + + let sse1 = sse(vec![ + ev_assistant_message("m1", FIRST_REPLY), + ev_completed_with_tokens("r1", 60), + ]); + let sse2 = sse(vec![ + ev_assistant_message("m2", "SECOND_REPLY"), + ev_completed_with_tokens("r2", 500), + ]); + let sse3 = sse(vec![ + ev_assistant_message("m3", "PRE_TURN_SUMMARY"), + ev_completed_with_tokens("r3", 100), + ]); + let sse4 = sse(vec![ + ev_assistant_message("m4", FINAL_REPLY), + ev_completed_with_tokens("r4", 80), + ]); + let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await; + + let model_provider = non_openai_model_provider(&server); + let codex = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + set_test_compact_prompt(config); + config.model_auto_compact_token_limit = Some(200); + }) + .build(&server) + .await + .expect("build codex") + .codex; + + for user in ["USER_ONE", "USER_TWO"] { + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: user.to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .expect("submit user input"); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + } + codex + .submit(Op::OverrideTurnContext { + cwd: Some(PathBuf::from(PRETURN_CONTEXT_DIFF_CWD)), + approval_policy: None, + sandbox_policy: None, + windows_sandbox_level: None, + model: None, + effort: None, + summary: None, + collaboration_mode: None, + personality: None, + }) + .await + .expect("override turn context"); + let image_url = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGNgYAAAAAMAASsJTYQAAAAASUVORK5CYII=" + .to_string(); + codex + .submit(Op::UserInput { + items: vec![ + UserInput::Image { + image_url: image_url.clone(), + }, + UserInput::Text { + text: "USER_THREE".to_string(), + text_elements: Vec::new(), + }, + ], + final_output_json_schema: None, + }) + .await + .expect("submit user input"); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let requests = request_log.requests(); + assert_eq!(requests.len(), 4, "expected user, user, compact, follow-up"); + + insta::assert_snapshot!( + "pre_turn_compaction_including_incoming_shapes", + format_labeled_requests_snapshot( + "Pre-turn auto-compaction with a context override emits the context diff in the compact request while the incoming user message is still excluded.", + &[ + ("Local Compaction Request", &requests[2]), + ("Local Post-Compaction History Layout", &requests[3]), + ] + ) + ); + let compact_request_user_texts = requests[2].message_input_texts("user"); + assert!( + !compact_request_user_texts + .iter() + .any(|text| text == "USER_THREE"), + "current behavior excludes incoming user message from pre-turn compaction input" + ); + let follow_up_user_texts = requests[3].message_input_texts("user"); + assert!( + follow_up_user_texts.iter().any(|text| text == "USER_THREE"), + "expected post-compaction follow-up request to keep incoming user text" + ); + let follow_up_user_images = requests[3].message_input_image_urls("user"); + assert!( + follow_up_user_images + .iter() + .any(|url| url == image_url.as_str()), + "expected post-compaction follow-up request to keep incoming user image content" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// TODO(ccunningham): Update once pre-turn compaction context-overflow handling includes incoming +// user input and emits richer oversized-input messaging. +async fn snapshot_request_shape_pre_turn_compaction_context_window_exceeded() { + skip_if_no_network!(); + + let server = start_mock_server().await; + + let first_turn = sse(vec![ + ev_assistant_message("m1", FIRST_REPLY), + ev_completed_with_tokens("r1", 500), + ]); + let mut responses = vec![first_turn]; + responses.extend( + (0..6).map(|_| { + sse_failed( + "compact-failed", + "context_length_exceeded", + "Your input exceeds the context window of this model. Please adjust your input and try again.", + ) + }), + ); + let request_log = mount_sse_sequence(&server, responses).await; + + let mut model_provider = non_openai_model_provider(&server); + model_provider.stream_max_retries = Some(0); + let codex = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + set_test_compact_prompt(config); + config.model_auto_compact_token_limit = Some(200); + }) + .build(&server) + .await + .expect("build codex") + .codex; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_ONE".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .expect("submit first user"); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_TWO".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .expect("submit second user"); + let error_message = wait_for_event_match(&codex, |event| match event { + EventMsg::Error(err) => Some(err.message.clone()), + _ => None, + }) + .await; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let requests = request_log.requests(); + assert!( + requests.len() >= 2, + "expected first turn and at least one compaction request" + ); + + insta::assert_snapshot!( + "pre_turn_compaction_context_window_exceeded_shapes", + format_labeled_requests_snapshot( + "Pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.", + &[( + "Local Compaction Request (Incoming User Excluded)", + &requests[1] + ),] + ) + ); + + assert!( + error_message.contains("ran out of room in the model's context window"), + "expected context window exceeded message, got {error_message}" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn snapshot_request_shape_manual_compact_without_previous_user_messages() { + skip_if_no_network!(); + + let server = start_mock_server().await; + + let compact_turn = sse(vec![ + ev_assistant_message("m1", "MANUAL_EMPTY_SUMMARY"), + ev_completed_with_tokens("r1", 90), + ]); + let follow_up_turn = sse(vec![ + ev_assistant_message("m2", FINAL_REPLY), + ev_completed_with_tokens("r2", 80), + ]); + let request_log = mount_sse_sequence(&server, vec![compact_turn, follow_up_turn]).await; + + let model_provider = non_openai_model_provider(&server); + let codex = test_codex() + .with_config(move |config| { + config.model_provider = model_provider; + set_test_compact_prompt(config); + }) + .build(&server) + .await + .expect("build codex") + .codex; + + codex.submit(Op::Compact).await.expect("run /compact"); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "AFTER_MANUAL_EMPTY_COMPACT".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .expect("submit follow-up user input"); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let requests = request_log.requests(); + assert_eq!( + requests.len(), + 2, + "expected manual /compact request and follow-up turn request" + ); + + insta::assert_snapshot!( + "manual_compact_without_prev_user_shapes", + format_labeled_requests_snapshot( + "Manual /compact with no prior user turn currently still issues a compaction request; follow-up turn carries canonical context and the new user message.", + &[ + ("Local Compaction Request", &requests[0]), + ("Local Post-Compaction History Layout", &requests[1]), + ] + ) + ); +} diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 3311427d4b..6c31b8dbeb 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -1,9 +1,11 @@ #![allow(clippy::expect_used)] use std::fs; +use std::path::PathBuf; use anyhow::Result; use codex_core::CodexAuth; +use codex_core::compact::SUMMARY_PREFIX; use codex_core::protocol::EventMsg; use codex_core::protocol::ItemCompletedEvent; use codex_core::protocol::ItemStartedEvent; @@ -14,6 +16,9 @@ use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; +use core_test_support::context_snapshot; +use core_test_support::context_snapshot::ContextSnapshotOptions; +use core_test_support::context_snapshot::ContextSnapshotRenderMode; use core_test_support::responses; use core_test_support::responses::mount_sse_once; use core_test_support::responses::sse; @@ -23,6 +28,7 @@ use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use core_test_support::wait_for_event_match; use pretty_assertions::assert_eq; +use wiremock::ResponseTemplate; fn approx_token_count(text: &str) -> i64 { i64::try_from(text.len().saturating_add(3) / 4).unwrap_or(i64::MAX) @@ -39,6 +45,29 @@ fn estimate_compact_payload_tokens(request: &responses::ResponsesRequest) -> i64 .saturating_add(approx_token_count(&request.instructions_text())) } +const PRETURN_CONTEXT_DIFF_CWD: &str = "/tmp/PRETURN_CONTEXT_DIFF_CWD"; +const DUMMY_FUNCTION_NAME: &str = "test_tool"; + +fn summary_with_prefix(summary: &str) -> String { + format!("{SUMMARY_PREFIX}\n{summary}") +} + +fn context_snapshot_options() -> ContextSnapshotOptions { + ContextSnapshotOptions::default() + .render_mode(ContextSnapshotRenderMode::KindWithTextPrefix { max_chars: 64 }) +} + +fn format_labeled_requests_snapshot( + scenario: &str, + sections: &[(&str, &responses::ResponsesRequest)], +) -> String { + context_snapshot::format_labeled_requests_snapshot( + scenario, + sections, + &context_snapshot_options(), + ) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_compact_replaces_history_for_followups() -> Result<()> { skip_if_no_network!(Ok(())); @@ -65,15 +94,7 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { .await; let compacted_history = vec![ - ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: "REMOTE_COMPACTED_SUMMARY".to_string(), - }], - end_turn: None, - phase: None, - }, + responses::user_message_item("REMOTE_COMPACTED_SUMMARY"), ResponseItem::Compaction { encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), }, @@ -134,12 +155,9 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { "expected compact request to include assistant history" ); - let follow_up_body = responses_mock - .requests() - .last() - .expect("follow-up request missing") - .body_json() - .to_string(); + let response_requests = responses_mock.requests(); + let follow_up_request = response_requests.last().expect("follow-up request missing"); + let follow_up_body = follow_up_request.body_json().to_string(); assert!( follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"), "expected follow-up request to use compacted history" @@ -152,6 +170,21 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { !follow_up_body.contains("FIRST_REMOTE_REPLY"), "expected follow-up request to drop pre-compaction assistant messages" ); + assert!( + !follow_up_body.contains("hello remote compact"), + "expected follow-up request to drop compacted-away user turns when remote output omits them" + ); + + insta::assert_snapshot!( + "remote_manual_compact_with_history_shapes", + format_labeled_requests_snapshot( + "Remote manual /compact where remote compact output is summary-only: follow-up layout uses returned summary plus new user message.", + &[ + ("Remote Compaction Request", &compact_request), + ("Remote Post-Compaction History Layout", follow_up_request), + ] + ) + ); Ok(()) } @@ -184,15 +217,7 @@ async fn remote_compact_runs_automatically() -> Result<()> { .await; let compacted_history = vec![ - ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: "REMOTE_COMPACTED_SUMMARY".to_string(), - }], - end_turn: None, - phase: None, - }, + responses::user_message_item("REMOTE_COMPACTED_SUMMARY"), ResponseItem::Compaction { encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), }, @@ -221,7 +246,8 @@ async fn remote_compact_runs_automatically() -> Result<()> { wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; assert!(message); assert_eq!(compact_mock.requests().len(), 1); - let follow_up_body = responses_mock.single_request().body_json().to_string(); + let follow_up_request = responses_mock.single_request(); + let follow_up_body = follow_up_request.body_json().to_string(); assert!(follow_up_body.contains("REMOTE_COMPACTED_SUMMARY")); assert!(follow_up_body.contains("ENCRYPTED_COMPACTION_SUMMARY")); @@ -497,7 +523,7 @@ async fn auto_remote_compact_failure_stops_agent_loop() -> Result<()> { ) .await; - let compact_mock = responses::mount_compact_json_once( + let first_compact_mock = responses::mount_compact_json_once( harness.server(), serde_json::json!({ "output": "invalid compact payload shape" }), ) @@ -541,18 +567,29 @@ async fn auto_remote_compact_failure_stops_agent_loop() -> Result<()> { assert!( error_message.contains("Error running remote compact task"), - "expected compact failure error, got {error_message}" + "expected remote compact task error prefix, got {error_message}" ); assert_eq!( - compact_mock.requests().len(), + first_compact_mock.requests().len(), 1, - "expected exactly one remote compact attempt" + "expected first remote compact attempt with incoming items" ); assert!( post_compact_turn_mock.requests().is_empty(), "expected agent loop to stop after compaction failure" ); + insta::assert_snapshot!( + "remote_pre_turn_compaction_failure_shapes", + format_labeled_requests_snapshot( + "Remote pre-turn auto-compaction parse failure: compaction request excludes the incoming user message and the turn stops.", + &[( + "Remote Compaction Request (Incoming User Excluded)", + &first_compact_mock.single_request() + ),] + ) + ); + Ok(()) } @@ -778,15 +815,7 @@ async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> { .await; let compacted_history = vec![ - ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: "REMOTE_COMPACTED_SUMMARY".to_string(), - }], - end_turn: None, - phase: None, - }, + responses::user_message_item("REMOTE_COMPACTED_SUMMARY"), ResponseItem::Compaction { encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), }, @@ -851,6 +880,68 @@ async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_manual_compact_failure_emits_task_error_event() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()), + ) + .await?; + let codex = harness.test().codex.clone(); + + mount_sse_once( + harness.server(), + sse(vec![ + responses::ev_assistant_message("m1", "REMOTE_REPLY"), + responses::ev_completed("resp-1"), + ]), + ) + .await; + + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "output": "invalid compact payload shape" }), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "manual remote compact".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + codex.submit(Op::Compact).await?; + + let error_message = wait_for_event_match(&codex, |event| match event { + EventMsg::Error(err) => Some(err.message.clone()), + _ => None, + }) + .await; + assert!( + error_message.contains("Error running remote compact task"), + "expected remote compact task error prefix, got {error_message}" + ); + assert!( + error_message.contains("invalid compact payload shape") + || error_message.contains("invalid type: string"), + "expected invalid compact payload details, got {error_message}" + ); + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + assert_eq!(compact_mock.requests().len(), 1); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// TODO(ccunningham): Re-enable after the follow-up compaction behavior PR lands. +// Current main behavior for rollout replacement-history persistence is known-incorrect. +#[ignore = "behavior change covered in follow-up compaction PR"] async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> { skip_if_no_network!(Ok(())); @@ -876,15 +967,7 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> .await; let compacted_history = vec![ - ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: "COMPACTED_USER_SUMMARY".to_string(), - }], - end_turn: None, - phase: None, - }, + responses::user_message_item("COMPACTED_USER_SUMMARY"), ResponseItem::Compaction { encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(), }, @@ -980,11 +1063,11 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> ) }); - if has_compacted_user_summary - && has_compaction_item - && has_compacted_assistant_note - && has_permissions_developer_message - { + if has_compacted_user_summary && has_compaction_item && has_compacted_assistant_note { + assert!( + !has_permissions_developer_message, + "manual remote compact rollout replacement history should not inject permissions context" + ); saw_compacted_history = true; break; } @@ -1036,15 +1119,7 @@ async fn remote_compact_and_resume_refresh_stale_developer_instructions() -> Res .await; let compacted_history = vec![ - ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: "REMOTE_COMPACTED_SUMMARY".to_string(), - }], - end_turn: None, - phase: None, - }, + responses::user_message_item("REMOTE_COMPACTED_SUMMARY"), ResponseItem::Message { id: None, role: "developer".to_string(), @@ -1177,15 +1252,7 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume() .await; let compacted_history = vec![ - ResponseItem::Message { - id: None, - role: "user".to_string(), - content: vec![ContentItem::InputText { - text: "REMOTE_COMPACTED_SUMMARY".to_string(), - }], - end_turn: None, - phase: None, - }, + responses::user_message_item("REMOTE_COMPACTED_SUMMARY"), ResponseItem::Message { id: None, role: "developer".to_string(), @@ -1250,3 +1317,344 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume() Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// TODO(ccunningham): Update once remote pre-turn compaction includes incoming user input. +async fn snapshot_request_shape_remote_pre_turn_compaction_including_incoming_user_message() +-> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.model_auto_compact_token_limit = Some(200); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + let responses_mock = responses::mount_sse_sequence( + harness.server(), + vec![ + responses::sse(vec![ + responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"), + responses::ev_completed_with_tokens("r1", 60), + ]), + responses::sse(vec![ + responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"), + responses::ev_completed_with_tokens("r2", 500), + ]), + responses::sse(vec![ + responses::ev_assistant_message("m3", "REMOTE_FINAL_REPLY"), + responses::ev_completed_with_tokens("r3", 80), + ]), + ], + ) + .await; + + let compacted_history = vec![ + responses::user_message_item("USER_ONE"), + responses::user_message_item("USER_TWO"), + responses::user_message_item(&summary_with_prefix("REMOTE_PRE_TURN_SUMMARY")), + ]; + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "output": compacted_history }), + ) + .await; + + for user in ["USER_ONE", "USER_TWO", "USER_THREE"] { + if user == "USER_THREE" { + codex + .submit(Op::OverrideTurnContext { + cwd: Some(PathBuf::from(PRETURN_CONTEXT_DIFF_CWD)), + approval_policy: None, + sandbox_policy: None, + windows_sandbox_level: None, + model: None, + effort: None, + summary: None, + collaboration_mode: None, + personality: None, + }) + .await?; + } + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: user.to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + } + + assert_eq!(compact_mock.requests().len(), 1); + let requests = responses_mock.requests(); + assert_eq!( + requests.len(), + 3, + "expected user, user, and post-compact turn" + ); + + let compact_request = compact_mock.single_request(); + insta::assert_snapshot!( + "remote_pre_turn_compaction_including_incoming_shapes", + format_labeled_requests_snapshot( + "Remote pre-turn auto-compaction with a context override emits the context diff in the compact request while excluding the incoming user message.", + &[ + ("Remote Compaction Request", &compact_request), + ("Remote Post-Compaction History Layout", &requests[2]), + ] + ) + ); + assert_eq!( + requests[2] + .message_input_texts("user") + .iter() + .filter(|text| text.as_str() == "USER_THREE") + .count(), + 1, + "post-compaction request should contain incoming user exactly once from runtime append" + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// TODO(ccunningham): Update once remote pre-turn compaction context-overflow handling includes +// incoming user input and emits richer oversized-input messaging. +async fn snapshot_request_shape_remote_pre_turn_compaction_context_window_exceeded() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.model_auto_compact_token_limit = Some(200); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + let responses_mock = responses::mount_sse_sequence( + harness.server(), + vec![responses::sse(vec![ + responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"), + responses::ev_completed_with_tokens("r1", 500), + ])], + ) + .await; + + let compact_mock = responses::mount_compact_response_once( + harness.server(), + ResponseTemplate::new(400).set_body_json(serde_json::json!({ + "error": { + "code": "context_length_exceeded", + "message": "Your input exceeds the context window of this model. Please adjust your input and try again." + } + })), + ) + .await; + let post_compact_turn_mock = responses::mount_sse_once( + harness.server(), + responses::sse(vec![ + responses::ev_assistant_message("m2", "REMOTE_POST_COMPACT_SHOULD_NOT_RUN"), + responses::ev_completed_with_tokens("r2", 80), + ]), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_ONE".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_TWO".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + let error_message = wait_for_event_match(&codex, |event| match event { + EventMsg::Error(err) => Some(err.message.clone()), + _ => None, + }) + .await; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + assert_eq!(compact_mock.requests().len(), 1); + let requests = responses_mock.requests(); + assert_eq!( + requests.len(), + 1, + "expected no post-compaction follow-up turn request after compact failure" + ); + assert!( + post_compact_turn_mock.requests().is_empty(), + "expected turn to stop after compaction failure" + ); + + let include_attempt_request = compact_mock.single_request(); + insta::assert_snapshot!( + "remote_pre_turn_compaction_context_window_exceeded_shapes", + format_labeled_requests_snapshot( + "Remote pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.", + &[( + "Remote Compaction Request (Incoming User Excluded)", + &include_attempt_request + ),] + ) + ); + assert!( + error_message.to_lowercase().contains("context window"), + "expected context window failure to surface, got {error_message}" + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn snapshot_request_shape_remote_mid_turn_continuation_compaction() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.model_auto_compact_token_limit = Some(200); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + let responses_mock = responses::mount_sse_sequence( + harness.server(), + vec![ + responses::sse(vec![ + responses::ev_function_call("call-remote-mid-turn", DUMMY_FUNCTION_NAME, "{}"), + responses::ev_completed_with_tokens("r1", 500), + ]), + responses::sse(vec![ + responses::ev_assistant_message("m2", "REMOTE_MID_TURN_FINAL_REPLY"), + responses::ev_completed_with_tokens("r2", 80), + ]), + ], + ) + .await; + + let compacted_history = vec![ + responses::user_message_item("USER_ONE"), + responses::user_message_item(&summary_with_prefix("REMOTE_MID_TURN_SUMMARY")), + ]; + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "output": compacted_history }), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_ONE".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + assert_eq!(compact_mock.requests().len(), 1); + let requests = responses_mock.requests(); + assert_eq!( + requests.len(), + 2, + "expected initial and post-compact requests" + ); + + let compact_request = compact_mock.single_request(); + insta::assert_snapshot!( + "remote_mid_turn_compaction_shapes", + format_labeled_requests_snapshot( + "Remote mid-turn continuation compaction after tool output: compact request includes tool artifacts and follow-up request includes the summary.", + &[ + ("Remote Compaction Request", &compact_request), + ("Remote Post-Compaction History Layout", &requests[1]), + ] + ) + ); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// TODO(ccunningham): Update once manual remote /compact with no prior user turn becomes a no-op. +async fn snapshot_request_shape_remote_manual_compact_without_previous_user_messages() -> Result<()> +{ + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()), + ) + .await?; + let codex = harness.test().codex.clone(); + + let responses_mock = responses::mount_sse_once( + harness.server(), + responses::sse(vec![ + responses::ev_assistant_message("m1", "REMOTE_MANUAL_EMPTY_FOLLOW_UP_REPLY"), + responses::ev_completed_with_tokens("r1", 80), + ]), + ) + .await; + + let compact_mock = + responses::mount_compact_json_once(harness.server(), serde_json::json!({ "output": [] })) + .await; + + codex.submit(Op::Compact).await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "USER_ONE".to_string(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + assert_eq!( + compact_mock.requests().len(), + 1, + "current behavior still issues remote compaction for manual /compact without prior user" + ); + let compact_request = compact_mock.single_request(); + let follow_up_request = responses_mock.single_request(); + insta::assert_snapshot!( + "remote_manual_compact_without_prev_user_shapes", + format_labeled_requests_snapshot( + "Remote manual /compact with no prior user turn still issues a compact request; follow-up turn carries canonical context and new user message.", + &[ + ("Remote Compaction Request", &compact_request), + ("Remote Post-Compaction History Layout", &follow_up_request), + ] + ) + ); + + Ok(()) +} diff --git a/codex-rs/core/tests/suite/model_switching.rs b/codex-rs/core/tests/suite/model_switching.rs index 8edeef065a..edbb42f70c 100644 --- a/codex-rs/core/tests/suite/model_switching.rs +++ b/codex-rs/core/tests/suite/model_switching.rs @@ -29,7 +29,6 @@ use core_test_support::skip_if_no_network; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use pretty_assertions::assert_eq; -use serde_json::Value; use wiremock::MockServer; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -324,32 +323,14 @@ async fn model_change_from_image_to_text_strips_prior_image_content() -> Result< assert_eq!(requests.len(), 2, "expected two model requests"); let first_request = requests.first().expect("expected first request"); - let first_has_input_image = first_request.inputs_of_type("message").iter().any(|item| { - item.get("content") - .and_then(Value::as_array) - .is_some_and(|content| { - content - .iter() - .any(|span| span.get("type").and_then(Value::as_str) == Some("input_image")) - }) - }); assert!( - first_has_input_image, + !first_request.message_input_image_urls("user").is_empty(), "first request should include the uploaded image" ); let second_request = requests.last().expect("expected second request"); - let second_has_input_image = second_request.inputs_of_type("message").iter().any(|item| { - item.get("content") - .and_then(Value::as_array) - .is_some_and(|content| { - content - .iter() - .any(|span| span.get("type").and_then(Value::as_str) == Some("input_image")) - }) - }); assert!( - !second_has_input_image, + second_request.message_input_image_urls("user").is_empty(), "second request should strip unsupported image content" ); let second_user_texts = second_request.message_input_texts("user"); diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact__manual_compact_with_history_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact__manual_compact_with_history_shapes.snap new file mode 100644 index 0000000000..bcca5d3575 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact__manual_compact_with_history_shapes.snap @@ -0,0 +1,21 @@ +--- +source: core/tests/suite/compact.rs +expression: "format_labeled_requests_snapshot(\"Manual /compact with prior user history compacts existing history and the follow-up turn includes the compact summary plus new user message.\",\n&[(\"Local Compaction Request\", &requests[1]),\n(\"Local Post-Compaction History Layout\", &requests[2]),])" +--- +Scenario: Manual /compact with prior user history compacts existing history and the follow-up turn includes the compact summary plus new user message. + +## Local Compaction Request +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:first manual turn +04:message/assistant:FIRST_REPLY +05:message/user: + +## Local Post-Compaction History Layout +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:first manual turn +04:message/user:\nFIRST_MANUAL_SUMMARY +05:message/user:second manual turn diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact__manual_compact_without_prev_user_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact__manual_compact_without_prev_user_shapes.snap new file mode 100644 index 0000000000..bdb4fe9bae --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact__manual_compact_without_prev_user_shapes.snap @@ -0,0 +1,18 @@ +--- +source: core/tests/suite/compact.rs +expression: "format_labeled_requests_snapshot(\"Manual /compact with no prior user turn currently still issues a compaction request; follow-up turn carries canonical context and the new user message.\",\n&[(\"Local Compaction Request\", &requests[0]),\n(\"Local Post-Compaction History Layout\", &requests[1]),])" +--- +Scenario: Manual /compact with no prior user turn currently still issues a compaction request; follow-up turn carries canonical context and the new user message. + +## Local Compaction Request +00:message/developer: +01:message/user: +02:message/user:> +03:message/user: + +## Local Post-Compaction History Layout +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:\nMANUAL_EMPTY_SUMMARY +04:message/user:AFTER_MANUAL_EMPTY_COMPACT diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact__mid_turn_compaction_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact__mid_turn_compaction_shapes.snap new file mode 100644 index 0000000000..c13f78aff9 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact__mid_turn_compaction_shapes.snap @@ -0,0 +1,22 @@ +--- +source: core/tests/suite/compact.rs +assertion_line: 2646 +expression: "format_labeled_requests_snapshot(\"True mid-turn continuation compaction after tool output: compact request includes tool artifacts, and the continuation request includes the summary in the same turn.\",\n&[(\"Local Compaction Request\", &auto_compact_mock.single_request()),\n(\"Local Post-Compaction History Layout\",\n&post_auto_compact_mock.single_request()),])" +--- +Scenario: True mid-turn continuation compaction after tool output: compact request includes tool artifacts, and the continuation request includes the summary in the same turn. + +## Local Compaction Request +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:function call limit push +04:function_call/test_tool +05:function_call_output:unsupported call: test_tool +06:message/user: + +## Local Post-Compaction History Layout +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:function call limit push +04:message/user:\nAUTO_SUMMARY diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact__pre_sampling_model_switch_compaction_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact__pre_sampling_model_switch_compaction_shapes.snap new file mode 100644 index 0000000000..8d6c4d9b1a --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact__pre_sampling_model_switch_compaction_shapes.snap @@ -0,0 +1,31 @@ +--- +source: core/tests/suite/compact.rs +assertion_line: 1773 +expression: "format_labeled_requests_snapshot(\"Pre-sampling compaction on model switch to a smaller context window: current behavior compacts using prior-turn history only (incoming user message excluded), and the follow-up request carries compacted history plus the new user message.\",\n&[(\"Initial Request (Previous Model)\", &requests[0]),\n(\"Pre-sampling Compaction Request\", &requests[1]),\n(\"Post-Compaction Follow-up Request (Next Model)\", &requests[2]),])" +--- +Scenario: Pre-sampling compaction on model switch to a smaller context window: current behavior compacts using prior-turn history only (incoming user message excluded), and the follow-up request carries compacted history plus the new user message. + +## Initial Request (Previous Model) +00:message/developer: +01:message/user: +02:message/user:> +03:message/developer: +04:message/user:before switch + +## Pre-sampling Compaction Request +00:message/developer: +01:message/user: +02:message/user:> +03:message/developer: +04:message/user:before switch +05:message/assistant:before switch +06:message/user: + +## Post-Compaction Follow-up Request (Next Model) +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:before switch +04:message/user:\nPRE_SAMPLING_SUMMARY +05:message/developer:\nThe user was previously using a different model.... +06:message/user:after switch diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact__pre_turn_compaction_context_window_exceeded_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact__pre_turn_compaction_context_window_exceeded_shapes.snap new file mode 100644 index 0000000000..da30b01bd7 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact__pre_turn_compaction_context_window_exceeded_shapes.snap @@ -0,0 +1,13 @@ +--- +source: core/tests/suite/compact.rs +expression: "format_labeled_requests_snapshot(\"Pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.\",\n&[(\"Local Compaction Request (Incoming User Excluded)\", &requests[1]),])" +--- +Scenario: Pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors. + +## Local Compaction Request (Incoming User Excluded) +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:USER_ONE +04:message/assistant:FIRST_REPLY +05:message/user: diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact__pre_turn_compaction_including_incoming_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact__pre_turn_compaction_including_incoming_shapes.snap new file mode 100644 index 0000000000..e586c8521a --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact__pre_turn_compaction_including_incoming_shapes.snap @@ -0,0 +1,25 @@ +--- +source: core/tests/suite/compact.rs +expression: "format_labeled_requests_snapshot(\"Pre-turn auto-compaction with a context override emits the context diff in the compact request while the incoming user message is still excluded.\",\n&[(\"Local Compaction Request\", &requests[2]),\n(\"Local Post-Compaction History Layout\", &requests[3]),])" +--- +Scenario: Pre-turn auto-compaction with a context override emits the context diff in the compact request while the incoming user message is still excluded. + +## Local Compaction Request +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:USER_ONE +04:message/assistant:FIRST_REPLY +05:message/user:USER_TWO +06:message/assistant:SECOND_REPLY +07:message/user: +08:message/user: + +## Local Post-Compaction History Layout +00:message/developer: +01:message/user: +02:message/user: +03:message/user:USER_ONE +04:message/user:USER_TWO +05:message/user:\nPRE_TURN_SUMMARY +06:message/user: | | | USER_THREE diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_manual_compact_with_history_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_manual_compact_with_history_shapes.snap new file mode 100644 index 0000000000..bc795a19f4 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_manual_compact_with_history_shapes.snap @@ -0,0 +1,21 @@ +--- +source: core/tests/suite/compact_remote.rs +assertion_line: 178 +expression: "format_labeled_requests_snapshot(\"Remote manual /compact where remote compact output is summary-only: follow-up layout uses returned summary plus new user message.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", follow_up_request),])" +--- +Scenario: Remote manual /compact where remote compact output is summary-only: follow-up layout uses returned summary plus new user message. + +## Remote Compaction Request +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:hello remote compact +04:message/assistant:FIRST_REMOTE_REPLY + +## Remote Post-Compaction History Layout +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:REMOTE_COMPACTED_SUMMARY +04:compaction:encrypted=true +05:message/user:after compact diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_manual_compact_without_prev_user_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_manual_compact_without_prev_user_shapes.snap new file mode 100644 index 0000000000..ce62806d99 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_manual_compact_without_prev_user_shapes.snap @@ -0,0 +1,16 @@ +--- +source: core/tests/suite/compact_remote.rs +expression: "format_labeled_requests_snapshot(\"Remote manual /compact with no prior user turn still issues a compact request; follow-up turn carries canonical context and new user message.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", &follow_up_request),])" +--- +Scenario: Remote manual /compact with no prior user turn still issues a compact request; follow-up turn carries canonical context and new user message. + +## Remote Compaction Request +00:message/developer: +01:message/user: +02:message/user:> + +## Remote Post-Compaction History Layout +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:USER_ONE diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_mid_turn_compaction_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_mid_turn_compaction_shapes.snap new file mode 100644 index 0000000000..8ef8701673 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_mid_turn_compaction_shapes.snap @@ -0,0 +1,20 @@ +--- +source: core/tests/suite/compact_remote.rs +expression: "format_labeled_requests_snapshot(\"Remote mid-turn continuation compaction after tool output: compact request includes tool artifacts and follow-up request includes the summary.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", &requests[1]),])" +--- +Scenario: Remote mid-turn continuation compaction after tool output: compact request includes tool artifacts and follow-up request includes the summary. + +## Remote Compaction Request +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:USER_ONE +04:function_call/test_tool +05:function_call_output:unsupported call: test_tool + +## Remote Post-Compaction History Layout +00:message/user:USER_ONE +01:message/developer: +02:message/user: +03:message/user:> +04:message/user:\nREMOTE_MID_TURN_SUMMARY diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_context_window_exceeded_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_context_window_exceeded_shapes.snap new file mode 100644 index 0000000000..e718bfda88 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_context_window_exceeded_shapes.snap @@ -0,0 +1,12 @@ +--- +source: core/tests/suite/compact_remote.rs +expression: "format_labeled_requests_snapshot(\"Remote pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.\",\n&[(\"Remote Compaction Request (Incoming User Excluded)\",\n&include_attempt_request),])" +--- +Scenario: Remote pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors. + +## Remote Compaction Request (Incoming User Excluded) +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:USER_ONE +04:message/assistant:REMOTE_FIRST_REPLY diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_failure_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_failure_shapes.snap new file mode 100644 index 0000000000..f8aa74e9a1 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_failure_shapes.snap @@ -0,0 +1,12 @@ +--- +source: core/tests/suite/compact_remote.rs +expression: "format_labeled_requests_snapshot(\"Remote pre-turn auto-compaction parse failure: compaction request excludes the incoming user message and the turn stops.\",\n&[(\"Remote Compaction Request (Incoming User Excluded)\",\n&first_compact_mock.single_request()),])" +--- +Scenario: Remote pre-turn auto-compaction parse failure: compaction request excludes the incoming user message and the turn stops. + +## Remote Compaction Request (Incoming User Excluded) +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:turn that exceeds token threshold +04:message/assistant:initial turn complete diff --git a/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_including_incoming_shapes.snap b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_including_incoming_shapes.snap new file mode 100644 index 0000000000..00aaaeaa92 --- /dev/null +++ b/codex-rs/core/tests/suite/snapshots/all__suite__compact_remote__remote_pre_turn_compaction_including_incoming_shapes.snap @@ -0,0 +1,24 @@ +--- +source: core/tests/suite/compact_remote.rs +expression: "format_labeled_requests_snapshot(\"Remote pre-turn auto-compaction with a context override emits the context diff in the compact request while excluding the incoming user message.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", &requests[2]),])" +--- +Scenario: Remote pre-turn auto-compaction with a context override emits the context diff in the compact request while excluding the incoming user message. + +## Remote Compaction Request +00:message/developer: +01:message/user: +02:message/user:> +03:message/user:USER_ONE +04:message/assistant:REMOTE_FIRST_REPLY +05:message/user:USER_TWO +06:message/assistant:REMOTE_SECOND_REPLY +07:message/user: + +## Remote Post-Compaction History Layout +00:message/user:USER_ONE +01:message/user:USER_TWO +02:message/developer: +03:message/user: +04:message/user: +05:message/user:\nREMOTE_PRE_TURN_SUMMARY +06:message/user:USER_THREE diff --git a/codex-rs/state/migrations/0010_logs_process_id.sql b/codex-rs/state/migrations/0010_logs_process_id.sql new file mode 100644 index 0000000000..fdefcd53a8 --- /dev/null +++ b/codex-rs/state/migrations/0010_logs_process_id.sql @@ -0,0 +1,3 @@ +ALTER TABLE logs ADD COLUMN process_uuid TEXT; + +CREATE INDEX idx_logs_process_uuid ON logs(process_uuid); diff --git a/codex-rs/state/src/log_db.rs b/codex-rs/state/src/log_db.rs index 345e90c6a8..64deeef2ad 100644 --- a/codex-rs/state/src/log_db.rs +++ b/codex-rs/state/src/log_db.rs @@ -20,6 +20,7 @@ use chrono::Duration as ChronoDuration; use chrono::Utc; +use std::sync::OnceLock; use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -33,6 +34,7 @@ use tracing::span::Id; use tracing::span::Record; use tracing_subscriber::Layer; use tracing_subscriber::registry::LookupSpan; +use uuid::Uuid; use crate::LogEntry; use crate::StateRuntime; @@ -44,14 +46,19 @@ const LOG_RETENTION_DAYS: i64 = 90; pub struct LogDbLayer { sender: mpsc::Sender, + process_uuid: String, } pub fn start(state_db: std::sync::Arc) -> LogDbLayer { + let process_uuid = current_process_log_uuid().to_string(); let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY); tokio::spawn(run_inserter(std::sync::Arc::clone(&state_db), receiver)); tokio::spawn(run_retention_cleanup(state_db)); - LogDbLayer { sender } + LogDbLayer { + sender, + process_uuid, + } } impl Layer for LogDbLayer @@ -118,6 +125,7 @@ where target: metadata.target().to_string(), message: visitor.message, thread_id, + process_uuid: Some(self.process_uuid.clone()), module_path: metadata.module_path().map(ToString::to_string), file: metadata.file().map(ToString::to_string), line: metadata.line().map(|line| line as i64), @@ -196,6 +204,15 @@ where thread_id } +fn current_process_log_uuid() -> &'static str { + static PROCESS_LOG_UUID: OnceLock = OnceLock::new(); + PROCESS_LOG_UUID.get_or_init(|| { + let pid = std::process::id(); + let process_uuid = Uuid::new_v4(); + format!("pid:{pid}:{process_uuid}") + }) +} + async fn run_inserter( state_db: std::sync::Arc, mut receiver: mpsc::Receiver, diff --git a/codex-rs/state/src/model/log.rs b/codex-rs/state/src/model/log.rs index 819abb5d22..2aeaf2d5c8 100644 --- a/codex-rs/state/src/model/log.rs +++ b/codex-rs/state/src/model/log.rs @@ -9,6 +9,7 @@ pub struct LogEntry { pub target: String, pub message: Option, pub thread_id: Option, + pub process_uuid: Option, pub module_path: Option, pub file: Option, pub line: Option, @@ -23,6 +24,7 @@ pub struct LogRow { pub target: String, pub message: Option, pub thread_id: Option, + pub process_uuid: Option, pub file: Option, pub line: Option, } diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 4cc9c63b17..3a743a9c70 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -360,7 +360,7 @@ FROM threads } let mut builder = QueryBuilder::::new( - "INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, module_path, file, line) ", + "INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line) ", ); builder.push_values(entries, |mut row, entry| { row.push_bind(entry.ts) @@ -369,6 +369,7 @@ FROM threads .push_bind(&entry.target) .push_bind(&entry.message) .push_bind(&entry.thread_id) + .push_bind(&entry.process_uuid) .push_bind(&entry.module_path) .push_bind(&entry.file) .push_bind(entry.line); @@ -388,7 +389,7 @@ FROM threads /// Query logs with optional filters. pub async fn query_logs(&self, query: &LogQuery) -> anyhow::Result> { let mut builder = QueryBuilder::::new( - "SELECT id, ts, ts_nanos, level, target, message, thread_id, file, line FROM logs WHERE 1 = 1", + "SELECT id, ts, ts_nanos, level, target, message, thread_id, process_uuid, file, line FROM logs WHERE 1 = 1", ); push_log_filters(&mut builder, query); if query.descending {