Merge branch 'main' of https://github.com/openai/codex into dev/mzeng/disk_cache_apps

This commit is contained in:
Matthew Zeng
2026-02-15 09:56:47 -08:00
24 changed files with 1620 additions and 224 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -1712,6 +1712,7 @@ dependencies = [
"include_dir",
"indexmap 2.13.0",
"indoc",
"insta",
"keyring",
"landlock",
"libc",

View File

@@ -152,6 +152,7 @@ codex-utils-cargo-bin = { workspace = true }
core_test_support = { workspace = true }
ctor = { workspace = true }
image = { workspace = true, features = ["jpeg", "png"] }
insta = { workspace = true }
maplit = { workspace = true }
predicates = { workspace = true }
pretty_assertions = { workspace = true }

View File

@@ -0,0 +1,339 @@
use serde_json::Value;
use crate::responses::ResponsesRequest;
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ContextSnapshotRenderMode {
#[default]
RedactedText,
FullText,
KindOnly,
KindWithTextPrefix {
max_chars: usize,
},
}
#[derive(Debug, Clone)]
pub struct ContextSnapshotOptions {
render_mode: ContextSnapshotRenderMode,
}
impl Default for ContextSnapshotOptions {
fn default() -> Self {
Self {
render_mode: ContextSnapshotRenderMode::RedactedText,
}
}
}
impl ContextSnapshotOptions {
pub fn render_mode(mut self, render_mode: ContextSnapshotRenderMode) -> Self {
self.render_mode = render_mode;
self
}
}
pub fn format_request_input_snapshot(
request: &ResponsesRequest,
options: &ContextSnapshotOptions,
) -> String {
let items = request.input();
format_response_items_snapshot(items.as_slice(), options)
}
pub fn format_response_items_snapshot(items: &[Value], options: &ContextSnapshotOptions) -> String {
items
.iter()
.enumerate()
.map(|(idx, item)| {
let Some(item_type) = item.get("type").and_then(Value::as_str) else {
return format!("{idx:02}:<MISSING_TYPE>");
};
if options.render_mode == ContextSnapshotRenderMode::KindOnly {
return if item_type == "message" {
let role = item.get("role").and_then(Value::as_str).unwrap_or("unknown");
format!("{idx:02}:message/{role}")
} else {
format!("{idx:02}:{item_type}")
};
}
match item_type {
"message" => {
let role = item.get("role").and_then(Value::as_str).unwrap_or("unknown");
let text = item
.get("content")
.and_then(Value::as_array)
.map(|content| {
content
.iter()
.map(|entry| {
if let Some(text) = entry.get("text").and_then(Value::as_str) {
return format_snapshot_text(text, options);
}
let Some(content_type) =
entry.get("type").and_then(Value::as_str)
else {
return "<UNKNOWN_CONTENT_ITEM>".to_string();
};
let Some(content_object) = entry.as_object() else {
return format!("<{content_type}>");
};
let mut extra_keys = content_object
.keys()
.filter(|key| *key != "type" && *key != "text")
.cloned()
.collect::<Vec<String>>();
extra_keys.sort();
if extra_keys.is_empty() {
format!("<{content_type}>")
} else {
format!("<{content_type}:{}>", extra_keys.join(","))
}
})
.collect::<Vec<String>>()
.join(" | ")
})
.filter(|text| !text.is_empty())
.unwrap_or_else(|| "<NO_TEXT>".to_string());
format!("{idx:02}:message/{role}:{text}")
}
"function_call" => {
let name = item.get("name").and_then(Value::as_str).unwrap_or("unknown");
format!("{idx:02}:function_call/{name}")
}
"function_call_output" => {
let output = item
.get("output")
.and_then(Value::as_str)
.map(|output| format_snapshot_text(output, options))
.unwrap_or_else(|| "<NON_STRING_OUTPUT>".to_string());
format!("{idx:02}:function_call_output:{output}")
}
"local_shell_call" => {
let command = item
.get("action")
.and_then(|action| action.get("command"))
.and_then(Value::as_array)
.map(|parts| {
parts
.iter()
.filter_map(Value::as_str)
.collect::<Vec<&str>>()
.join(" ")
})
.map(|command| format_snapshot_text(&command, options))
.filter(|cmd| !cmd.is_empty())
.unwrap_or_else(|| "<NO_COMMAND>".to_string());
format!("{idx:02}:local_shell_call:{command}")
}
"reasoning" => {
let summary_text = item
.get("summary")
.and_then(Value::as_array)
.and_then(|summary| summary.first())
.and_then(|entry| entry.get("text"))
.and_then(Value::as_str)
.map(|text| format_snapshot_text(text, options))
.unwrap_or_else(|| "<NO_SUMMARY>".to_string());
let has_encrypted_content = item
.get("encrypted_content")
.and_then(Value::as_str)
.is_some_and(|value| !value.is_empty());
format!(
"{idx:02}:reasoning:summary={summary_text}:encrypted={has_encrypted_content}"
)
}
"compaction" => {
let has_encrypted_content = item
.get("encrypted_content")
.and_then(Value::as_str)
.is_some_and(|value| !value.is_empty());
format!("{idx:02}:compaction:encrypted={has_encrypted_content}")
}
other => format!("{idx:02}:{other}"),
}
})
.collect::<Vec<String>>()
.join("\n")
}
pub fn format_labeled_requests_snapshot(
scenario: &str,
sections: &[(&str, &ResponsesRequest)],
options: &ContextSnapshotOptions,
) -> String {
let sections = sections
.iter()
.map(|(title, request)| {
format!(
"## {title}\n{}",
format_request_input_snapshot(request, options)
)
})
.collect::<Vec<String>>()
.join("\n\n");
format!("Scenario: {scenario}\n\n{sections}")
}
pub fn format_labeled_items_snapshot(
scenario: &str,
sections: &[(&str, &[Value])],
options: &ContextSnapshotOptions,
) -> String {
let sections = sections
.iter()
.map(|(title, items)| {
format!(
"## {title}\n{}",
format_response_items_snapshot(items, options)
)
})
.collect::<Vec<String>>()
.join("\n\n");
format!("Scenario: {scenario}\n\n{sections}")
}
fn format_snapshot_text(text: &str, options: &ContextSnapshotOptions) -> String {
match options.render_mode {
ContextSnapshotRenderMode::RedactedText => {
canonicalize_snapshot_text(text).replace('\n', "\\n")
}
ContextSnapshotRenderMode::FullText => text.replace('\n', "\\n"),
ContextSnapshotRenderMode::KindWithTextPrefix { max_chars } => {
let normalized = canonicalize_snapshot_text(text).replace('\n', "\\n");
if normalized.chars().count() <= max_chars {
normalized
} else {
let prefix = normalized.chars().take(max_chars).collect::<String>();
format!("{prefix}...")
}
}
ContextSnapshotRenderMode::KindOnly => unreachable!(),
}
}
fn canonicalize_snapshot_text(text: &str) -> String {
if text.starts_with("<permissions instructions>") {
return "<PERMISSIONS_INSTRUCTIONS>".to_string();
}
if text.starts_with("# AGENTS.md instructions for ") {
return "<AGENTS_MD>".to_string();
}
if text.starts_with("<environment_context>") {
if let (Some(cwd_start), Some(cwd_end)) = (text.find("<cwd>"), text.find("</cwd>")) {
let cwd = &text[cwd_start + "<cwd>".len()..cwd_end];
return if cwd.ends_with("PRETURN_CONTEXT_DIFF_CWD") {
"<ENVIRONMENT_CONTEXT:cwd=PRETURN_CONTEXT_DIFF_CWD>".to_string()
} else {
"<ENVIRONMENT_CONTEXT:cwd=<CWD>>".to_string()
};
}
return "<ENVIRONMENT_CONTEXT>".to_string();
}
if text.starts_with("You are performing a CONTEXT CHECKPOINT COMPACTION.") {
return "<SUMMARIZATION_PROMPT>".to_string();
}
if text.starts_with("Another language model started to solve this problem")
&& let Some((_, summary)) = text.split_once('\n')
{
return format!("<COMPACTION_SUMMARY>\n{summary}");
}
text.to_string()
}
#[cfg(test)]
mod tests {
use super::ContextSnapshotOptions;
use super::ContextSnapshotRenderMode;
use super::format_response_items_snapshot;
use pretty_assertions::assert_eq;
use serde_json::json;
#[test]
fn full_text_mode_preserves_unredacted_text() {
let items = vec![json!({
"type": "message",
"role": "user",
"content": [{
"type": "input_text",
"text": "# AGENTS.md instructions for /tmp/example"
}]
})];
let rendered = format_response_items_snapshot(
&items,
&ContextSnapshotOptions::default().render_mode(ContextSnapshotRenderMode::FullText),
);
assert_eq!(
rendered,
"00:message/user:# AGENTS.md instructions for /tmp/example"
);
}
#[test]
fn redacted_text_mode_keeps_canonical_placeholders() {
let items = vec![json!({
"type": "message",
"role": "user",
"content": [{
"type": "input_text",
"text": "# AGENTS.md instructions for /tmp/example"
}]
})];
let rendered = format_response_items_snapshot(
&items,
&ContextSnapshotOptions::default().render_mode(ContextSnapshotRenderMode::RedactedText),
);
assert_eq!(rendered, "00:message/user:<AGENTS_MD>");
}
#[test]
fn image_only_message_is_rendered_as_non_text_span() {
let items = vec![json!({
"type": "message",
"role": "user",
"content": [{
"type": "input_image",
"image_url": "data:image/png;base64,AAAA"
}]
})];
let rendered = format_response_items_snapshot(&items, &ContextSnapshotOptions::default());
assert_eq!(rendered, "00:message/user:<input_image:image_url>");
}
#[test]
fn mixed_text_and_image_message_keeps_image_span() {
let items = vec![json!({
"type": "message",
"role": "user",
"content": [
{
"type": "input_text",
"text": "<image>"
},
{
"type": "input_image",
"image_url": "data:image/png;base64,AAAA"
},
{
"type": "input_text",
"text": "</image>"
}
]
})];
let rendered = format_response_items_snapshot(&items, &ContextSnapshotOptions::default());
assert_eq!(
rendered,
"00:message/user:<image> | <input_image:image_url> | </image>"
);
}
}

View File

@@ -12,6 +12,7 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use regex_lite::Regex;
use std::path::PathBuf;
pub mod context_snapshot;
pub mod process;
pub mod responses;
pub mod streaming_sse;

View File

@@ -5,6 +5,8 @@ use std::time::Duration;
use anyhow::Result;
use base64::Engine;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelsResponse;
use futures::SinkExt;
use futures::StreamExt;
@@ -112,6 +114,14 @@ impl ResponsesRequest {
self.0.body.clone()
}
pub fn body_contains_text(&self, text: &str) -> bool {
let json_fragment = serde_json::to_string(text)
.expect("serialize text to JSON")
.trim_matches('"')
.to_string();
self.body_json().to_string().contains(&json_fragment)
}
pub fn instructions_text(&self) -> String {
self.body_json()["instructions"]
.as_str()
@@ -131,6 +141,22 @@ impl ResponsesRequest {
.collect()
}
/// Returns all `input_image` `image_url` spans from `message` inputs for the provided role.
pub fn message_input_image_urls(&self, role: &str) -> Vec<String> {
self.inputs_of_type("message")
.into_iter()
.filter(|item| item.get("role").and_then(Value::as_str) == Some(role))
.filter_map(|item| item.get("content").and_then(Value::as_array).cloned())
.flatten()
.filter(|span| span.get("type").and_then(Value::as_str) == Some("input_image"))
.filter_map(|span| {
span.get("image_url")
.and_then(Value::as_str)
.map(str::to_owned)
})
.collect()
}
pub fn input(&self) -> Vec<Value> {
self.body_json()["input"]
.as_array()
@@ -480,6 +506,18 @@ pub fn ev_assistant_message(id: &str, text: &str) -> Value {
})
}
pub fn user_message_item(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
pub fn ev_message_item_added(id: &str, text: &str) -> Value {
serde_json::json!({
"type": "response.output_item.added",
@@ -818,15 +856,24 @@ where
}
pub async fn mount_compact_json_once(server: &MockServer, body: serde_json::Value) -> ResponseMock {
let (mock, response_mock) = compact_mock();
mock.respond_with(
mount_compact_response_once(
server,
ResponseTemplate::new(200)
.insert_header("content-type", "application/json")
.set_body_json(body.clone()),
.set_body_json(body),
)
.up_to_n_times(1)
.mount(server)
.await;
.await
}
pub async fn mount_compact_response_once(
server: &MockServer,
response: ResponseTemplate,
) -> ResponseMock {
let (mock, response_mock) = compact_mock();
mock.respond_with(response)
.up_to_n_times(1)
.mount(server)
.await;
response_mock
}

View File

@@ -20,6 +20,9 @@ use codex_protocol::items::TurnItem;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ModelsResponse;
use codex_protocol::user_input::UserInput;
use core_test_support::context_snapshot;
use core_test_support::context_snapshot::ContextSnapshotOptions;
use core_test_support::context_snapshot::ContextSnapshotRenderMode;
use core_test_support::responses::ev_local_shell_call;
use core_test_support::responses::ev_reasoning_item;
use core_test_support::responses::mount_models_once;
@@ -27,7 +30,7 @@ use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use std::collections::VecDeque;
use std::path::PathBuf;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
@@ -60,10 +63,11 @@ const SECOND_AUTO_SUMMARY: &str = "SECOND_AUTO_SUMMARY";
const FINAL_REPLY: &str = "FINAL_REPLY";
const CONTEXT_LIMIT_MESSAGE: &str =
"Your input exceeds the context window of this model. Please adjust your input and try again.";
const DUMMY_FUNCTION_NAME: &str = "unsupported_tool";
const DUMMY_FUNCTION_NAME: &str = "test_tool";
const DUMMY_CALL_ID: &str = "call-multi-auto";
const FUNCTION_CALL_LIMIT_MSG: &str = "function call limit push";
const POST_AUTO_USER_MSG: &str = "post auto follow-up";
const PRETURN_CONTEXT_DIFF_CWD: &str = "/tmp/PRETURN_CONTEXT_DIFF_CWD";
pub(super) const COMPACT_WARNING_MESSAGE: &str = "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.";
@@ -75,23 +79,6 @@ fn summary_with_prefix(summary: &str) -> String {
format!("{SUMMARY_PREFIX}\n{summary}")
}
fn drop_call_id(value: &mut serde_json::Value) {
match value {
serde_json::Value::Object(obj) => {
obj.retain(|k, _| k != "call_id");
for v in obj.values_mut() {
drop_call_id(v);
}
}
serde_json::Value::Array(arr) => {
for v in arr {
drop_call_id(v);
}
}
_ => {}
}
}
fn set_test_compact_prompt(config: &mut Config) {
config.compact_prompt = Some(SUMMARIZATION_PROMPT.to_string());
}
@@ -194,6 +181,21 @@ async fn assert_compaction_uses_turn_lifecycle_id(codex: &std::sync::Arc<codex_c
"compaction item completion should use the turn event id"
);
}
fn context_snapshot_options() -> ContextSnapshotOptions {
ContextSnapshotOptions::default()
.render_mode(ContextSnapshotRenderMode::KindWithTextPrefix { max_chars: 64 })
}
fn format_labeled_requests_snapshot(
scenario: &str,
sections: &[(&str, &core_test_support::responses::ResponsesRequest)],
) -> String {
context_snapshot::format_labeled_requests_snapshot(
scenario,
sections,
&context_snapshot_options(),
)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn summarize_context_three_requests_and_instructions() {
@@ -408,8 +410,15 @@ async fn manual_compact_uses_custom_prompt() {
skip_if_no_network!();
let server = start_mock_server().await;
let sse_stream = sse(vec![ev_completed("r1")]);
let response_mock = mount_sse_once(&server, sse_stream).await;
let first_turn = sse(vec![
ev_assistant_message("m0", FIRST_REPLY),
ev_completed_with_tokens("r0", 80),
]);
let compact_turn = sse(vec![
ev_assistant_message("m1", SUMMARY_TEXT),
ev_completed_with_tokens("r1", 100),
]);
let request_log = mount_sse_sequence(&server, vec![first_turn, compact_turn]).await;
let custom_prompt = "Use this compact prompt instead";
@@ -424,6 +433,18 @@ async fn manual_compact_uses_custom_prompt() {
.expect("create conversation")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit first user turn");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await.expect("trigger compact");
let warning_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Warning(_))).await;
let EventMsg::Warning(WarningEvent { message }) = warning_event else {
@@ -432,7 +453,13 @@ async fn manual_compact_uses_custom_prompt() {
assert_eq!(message, COMPACT_WARNING_MESSAGE);
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let body = response_mock.single_request().body_json();
let requests = request_log.requests();
assert_eq!(
requests.len(),
2,
"expected first turn and compact requests"
);
let body = requests[1].body_json();
let input = body
.get("input")
@@ -1742,6 +1769,21 @@ async fn pre_sampling_compact_runs_on_switch_to_smaller_context_model() {
previous_model,
next_model,
);
insta::assert_snapshot!(
"pre_sampling_model_switch_compaction_shapes",
format_labeled_requests_snapshot(
"Pre-sampling compaction on model switch to a smaller context window: current behavior compacts using prior-turn history only (incoming user message excluded), and the follow-up request carries compacted history plus the new user message.",
&[
("Initial Request (Previous Model)", &requests[0]),
("Pre-sampling Compaction Request", &requests[1]),
(
"Post-Compaction Follow-up Request (Next Model)",
&requests[2]
),
]
)
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -2133,6 +2175,82 @@ async fn manual_compact_retries_after_context_window_error() {
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Re-enable after the follow-up compaction behavior PR lands.
// Current main behavior around non-context manual /compact failures is known-incorrect.
#[ignore = "behavior change covered in follow-up compaction PR"]
async fn manual_compact_non_context_failure_retries_then_emits_task_error() {
skip_if_no_network!();
let server = start_mock_server().await;
let user_turn = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed("r1"),
]);
let compact_failed_1 = sse_failed(
"resp-fail-1",
"server_error",
"temporary compact failure one",
);
let compact_failed_2 = sse_failed(
"resp-fail-2",
"server_error",
"temporary compact failure two",
);
mount_sse_sequence(&server, vec![user_turn, compact_failed_1, compact_failed_2]).await;
let mut model_provider = non_openai_model_provider(&server);
model_provider.stream_max_retries = Some(1);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200_000);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "first turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await.expect("trigger compact");
let reconnect_message = wait_for_event_match(&codex, |event| match event {
EventMsg::StreamError(stream_error) => Some(stream_error.message.clone()),
_ => None,
})
.await;
assert!(
reconnect_message.contains("Reconnecting... 1/1"),
"expected reconnect stream error message, got {reconnect_message}"
);
let task_error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
assert!(
task_error_message.contains("Error running local compact task"),
"expected local compact task error prefix, got {task_error_message}"
);
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn manual_compact_twice_preserves_latest_user_messages() {
skip_if_no_network!();
@@ -2237,114 +2355,89 @@ async fn manual_compact_twice_preserves_latest_user_messages() {
5,
"expected exactly 5 requests (user turn, compact, user turn, compact, final turn)"
);
let contains_user_text = |input: &[serde_json::Value], expected: &str| -> bool {
input.iter().any(|item| {
item.get("type").and_then(|v| v.as_str()) == Some("message")
&& item.get("role").and_then(|v| v.as_str()) == Some("user")
&& item
.get("content")
.and_then(|v| v.as_array())
.is_some_and(|arr| {
arr.iter().any(|entry| {
entry.get("text").and_then(|v| v.as_str()) == Some(expected)
})
})
})
let contains_user_text = |request: &core_test_support::responses::ResponsesRequest,
expected: &str| {
request
.message_input_texts("user")
.iter()
.any(|text| text == expected)
};
let first_turn_input = requests[0].input();
assert!(
contains_user_text(&first_turn_input, first_user_message),
contains_user_text(&requests[0], first_user_message),
"first turn request missing first user message"
);
assert!(
!contains_user_text(&first_turn_input, SUMMARIZATION_PROMPT),
!contains_user_text(&requests[0], SUMMARIZATION_PROMPT),
"first turn request should not include summarization prompt"
);
let first_compact_input = requests[1].input();
assert!(
contains_user_text(&first_compact_input, first_user_message),
contains_user_text(&requests[1], first_user_message),
"first compact request should include history before compaction"
);
let second_turn_input = requests[2].input();
assert!(
contains_user_text(&second_turn_input, second_user_message),
contains_user_text(&requests[2], second_user_message),
"second turn request missing second user message"
);
assert!(
contains_user_text(&second_turn_input, first_user_message),
contains_user_text(&requests[2], first_user_message),
"second turn request should include the compacted user history"
);
let second_compact_input = requests[3].input();
assert!(
contains_user_text(&second_compact_input, second_user_message),
contains_user_text(&requests[3], second_user_message),
"second compact request should include latest history"
);
let first_compact_has_prompt = contains_user_text(&first_compact_input, SUMMARIZATION_PROMPT);
let second_compact_has_prompt = contains_user_text(&second_compact_input, SUMMARIZATION_PROMPT);
insta::assert_snapshot!(
"manual_compact_with_history_shapes",
format_labeled_requests_snapshot(
"Manual /compact with prior user history compacts existing history and the follow-up turn includes the compact summary plus new user message.",
&[
("Local Compaction Request", &requests[1]),
("Local Post-Compaction History Layout", &requests[2]),
]
)
);
let first_compact_has_prompt = contains_user_text(&requests[1], SUMMARIZATION_PROMPT);
let second_compact_has_prompt = contains_user_text(&requests[3], SUMMARIZATION_PROMPT);
assert_eq!(
first_compact_has_prompt, second_compact_has_prompt,
"compact requests should consistently include or omit the summarization prompt"
);
let mut final_output = requests
let first_request_user_texts = requests[0].message_input_texts("user");
let first_turn_user_index = first_request_user_texts
.len()
.checked_sub(1)
.unwrap_or_else(|| panic!("first turn request missing user messages"));
assert_eq!(
first_request_user_texts[first_turn_user_index], first_user_message,
"first turn request should end with the submitted user message"
);
let seeded_user_prefix = &first_request_user_texts[..first_turn_user_index];
let final_request_user_texts = requests
.last()
.unwrap_or_else(|| panic!("final turn request missing for {final_user_message}"))
.input()
.into_iter()
.collect::<VecDeque<_>>();
// Permissions developer message
final_output.pop_front();
// User instructions (project docs/skills)
final_output.pop_front();
// Environment context
final_output.pop_front();
let _ = final_output
.iter_mut()
.map(drop_call_id)
.collect::<Vec<_>>();
.message_input_texts("user");
assert!(
final_request_user_texts
.as_slice()
.starts_with(seeded_user_prefix),
"final request should start with seeded user prefix from first request: {seeded_user_prefix:?}"
);
let final_output = &final_request_user_texts[seeded_user_prefix.len()..];
let expected = vec![
json!({
"content": vec![json!({
"text": first_user_message,
"type": "input_text",
})],
"role": "user",
"type": "message",
}),
json!({
"content": vec![json!({
"text": second_user_message,
"type": "input_text",
})],
"role": "user",
"type": "message",
}),
json!({
"content": vec![json!({
"text": expected_second_summary,
"type": "input_text",
})],
"role": "user",
"type": "message",
}),
json!({
"content": vec![json!({
"text": final_user_message,
"type": "input_text",
})],
"role": "user",
"type": "message",
}),
first_user_message.to_string(),
second_user_message.to_string(),
expected_second_summary,
final_user_message.to_string(),
];
assert_eq!(final_output, expected);
assert_eq!(final_output, expected.as_slice());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -2459,7 +2552,7 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
async fn snapshot_request_shape_mid_turn_continuation_compaction() {
skip_if_no_network!();
let server = start_mock_server().await;
@@ -2467,29 +2560,25 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
let context_window = 100;
let limit = context_window * 90 / 100;
let over_limit_tokens = context_window * 95 / 100 + 1;
let follow_up_user = "FOLLOW_UP_AFTER_LIMIT";
let first_turn = sse(vec![
ev_function_call(DUMMY_CALL_ID, DUMMY_FUNCTION_NAME, "{}"),
ev_completed_with_tokens("r1", 50),
]);
let function_call_follow_up = sse(vec![
ev_assistant_message("m2", FINAL_REPLY),
ev_completed_with_tokens("r2", over_limit_tokens),
ev_completed_with_tokens("r1", over_limit_tokens),
]);
let auto_summary_payload = auto_summary(AUTO_SUMMARY_TEXT);
let auto_compact_turn = sse(vec![
ev_assistant_message("m3", &auto_summary_payload),
ev_assistant_message("m2", &auto_summary_payload),
ev_completed_with_tokens("r3", 10),
]);
let post_auto_compact_turn = sse(vec![ev_completed_with_tokens("r4", 10)]);
let post_auto_compact_turn = sse(vec![
ev_assistant_message("m3", FINAL_REPLY),
ev_completed_with_tokens("r4", 10),
]);
// Mount responses in order and keep mocks only for the ones we assert on.
let first_turn_mock = mount_sse_once(&server, first_turn).await;
let follow_up_mock = mount_sse_once(&server, function_call_follow_up).await;
let auto_compact_mock = mount_sse_once(&server, auto_compact_turn).await;
// We don't assert on the post-compact request, so no need to keep its mock.
mount_sse_once(&server, post_auto_compact_turn).await;
let post_auto_compact_mock = mount_sse_once(&server, post_auto_compact_turn).await;
let model_provider = non_openai_model_provider(&server);
@@ -2514,19 +2603,6 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: follow_up_user.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.unwrap();
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TurnComplete(_))).await;
// Assert first request captured expected user message that triggers function call.
let first_request = first_turn_mock.single_request().input();
assert!(
@@ -2543,7 +2619,7 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
"first request should include the user message that triggers the function call"
);
let function_call_output = follow_up_mock
let function_call_output = auto_compact_mock
.single_request()
.function_call_output(DUMMY_CALL_ID);
let output_text = function_call_output
@@ -2558,7 +2634,24 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
let auto_compact_body = auto_compact_mock.single_request().body_json().to_string();
assert!(
body_contains_text(&auto_compact_body, SUMMARIZATION_PROMPT),
"auto compact request should include the summarization prompt after exceeding 95% (limit {limit})"
"mid-turn auto compact request should include the summarization prompt after exceeding 95% (limit {limit})"
);
insta::assert_snapshot!(
"mid_turn_compaction_shapes",
format_labeled_requests_snapshot(
"True mid-turn continuation compaction after tool output: compact request includes tool artifacts, and the continuation request includes the summary in the same turn.",
&[
(
"Local Compaction Request",
&auto_compact_mock.single_request()
),
(
"Local Post-Compaction History Layout",
&post_auto_compact_mock.single_request()
),
]
)
);
}
@@ -2828,3 +2921,270 @@ async fn auto_compact_runs_when_reasoning_header_clears_between_turns() {
"remote compaction should run once after the reasoning header clears"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Update once pre-turn compaction includes incoming user input.
async fn snapshot_request_shape_pre_turn_compaction_including_incoming_user_message() {
skip_if_no_network!();
let server = start_mock_server().await;
let sse1 = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed_with_tokens("r1", 60),
]);
let sse2 = sse(vec![
ev_assistant_message("m2", "SECOND_REPLY"),
ev_completed_with_tokens("r2", 500),
]);
let sse3 = sse(vec![
ev_assistant_message("m3", "PRE_TURN_SUMMARY"),
ev_completed_with_tokens("r3", 100),
]);
let sse4 = sse(vec![
ev_assistant_message("m4", FINAL_REPLY),
ev_completed_with_tokens("r4", 80),
]);
let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4]).await;
let model_provider = non_openai_model_provider(&server);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200);
})
.build(&server)
.await
.expect("build codex")
.codex;
for user in ["USER_ONE", "USER_TWO"] {
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: user.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
codex
.submit(Op::OverrideTurnContext {
cwd: Some(PathBuf::from(PRETURN_CONTEXT_DIFF_CWD)),
approval_policy: None,
sandbox_policy: None,
windows_sandbox_level: None,
model: None,
effort: None,
summary: None,
collaboration_mode: None,
personality: None,
})
.await
.expect("override turn context");
let image_url = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGNgYAAAAAMAASsJTYQAAAAASUVORK5CYII="
.to_string();
codex
.submit(Op::UserInput {
items: vec![
UserInput::Image {
image_url: image_url.clone(),
},
UserInput::Text {
text: "USER_THREE".to_string(),
text_elements: Vec::new(),
},
],
final_output_json_schema: None,
})
.await
.expect("submit user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert_eq!(requests.len(), 4, "expected user, user, compact, follow-up");
insta::assert_snapshot!(
"pre_turn_compaction_including_incoming_shapes",
format_labeled_requests_snapshot(
"Pre-turn auto-compaction with a context override emits the context diff in the compact request while the incoming user message is still excluded.",
&[
("Local Compaction Request", &requests[2]),
("Local Post-Compaction History Layout", &requests[3]),
]
)
);
let compact_request_user_texts = requests[2].message_input_texts("user");
assert!(
!compact_request_user_texts
.iter()
.any(|text| text == "USER_THREE"),
"current behavior excludes incoming user message from pre-turn compaction input"
);
let follow_up_user_texts = requests[3].message_input_texts("user");
assert!(
follow_up_user_texts.iter().any(|text| text == "USER_THREE"),
"expected post-compaction follow-up request to keep incoming user text"
);
let follow_up_user_images = requests[3].message_input_image_urls("user");
assert!(
follow_up_user_images
.iter()
.any(|url| url == image_url.as_str()),
"expected post-compaction follow-up request to keep incoming user image content"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Update once pre-turn compaction context-overflow handling includes incoming
// user input and emits richer oversized-input messaging.
async fn snapshot_request_shape_pre_turn_compaction_context_window_exceeded() {
skip_if_no_network!();
let server = start_mock_server().await;
let first_turn = sse(vec![
ev_assistant_message("m1", FIRST_REPLY),
ev_completed_with_tokens("r1", 500),
]);
let mut responses = vec![first_turn];
responses.extend(
(0..6).map(|_| {
sse_failed(
"compact-failed",
"context_length_exceeded",
"Your input exceeds the context window of this model. Please adjust your input and try again.",
)
}),
);
let request_log = mount_sse_sequence(&server, responses).await;
let mut model_provider = non_openai_model_provider(&server);
model_provider.stream_max_retries = Some(0);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
config.model_auto_compact_token_limit = Some(200);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit first user");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit second user");
let error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert!(
requests.len() >= 2,
"expected first turn and at least one compaction request"
);
insta::assert_snapshot!(
"pre_turn_compaction_context_window_exceeded_shapes",
format_labeled_requests_snapshot(
"Pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.",
&[(
"Local Compaction Request (Incoming User Excluded)",
&requests[1]
),]
)
);
assert!(
error_message.contains("ran out of room in the model's context window"),
"expected context window exceeded message, got {error_message}"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_manual_compact_without_previous_user_messages() {
skip_if_no_network!();
let server = start_mock_server().await;
let compact_turn = sse(vec![
ev_assistant_message("m1", "MANUAL_EMPTY_SUMMARY"),
ev_completed_with_tokens("r1", 90),
]);
let follow_up_turn = sse(vec![
ev_assistant_message("m2", FINAL_REPLY),
ev_completed_with_tokens("r2", 80),
]);
let request_log = mount_sse_sequence(&server, vec![compact_turn, follow_up_turn]).await;
let model_provider = non_openai_model_provider(&server);
let codex = test_codex()
.with_config(move |config| {
config.model_provider = model_provider;
set_test_compact_prompt(config);
})
.build(&server)
.await
.expect("build codex")
.codex;
codex.submit(Op::Compact).await.expect("run /compact");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "AFTER_MANUAL_EMPTY_COMPACT".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await
.expect("submit follow-up user input");
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert_eq!(
requests.len(),
2,
"expected manual /compact request and follow-up turn request"
);
insta::assert_snapshot!(
"manual_compact_without_prev_user_shapes",
format_labeled_requests_snapshot(
"Manual /compact with no prior user turn currently still issues a compaction request; follow-up turn carries canonical context and the new user message.",
&[
("Local Compaction Request", &requests[0]),
("Local Post-Compaction History Layout", &requests[1]),
]
)
);
}

View File

@@ -1,9 +1,11 @@
#![allow(clippy::expect_used)]
use std::fs;
use std::path::PathBuf;
use anyhow::Result;
use codex_core::CodexAuth;
use codex_core::compact::SUMMARY_PREFIX;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ItemCompletedEvent;
use codex_core::protocol::ItemStartedEvent;
@@ -14,6 +16,9 @@ use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;
use core_test_support::context_snapshot;
use core_test_support::context_snapshot::ContextSnapshotOptions;
use core_test_support::context_snapshot::ContextSnapshotRenderMode;
use core_test_support::responses;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
@@ -23,6 +28,7 @@ use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use wiremock::ResponseTemplate;
fn approx_token_count(text: &str) -> i64 {
i64::try_from(text.len().saturating_add(3) / 4).unwrap_or(i64::MAX)
@@ -39,6 +45,29 @@ fn estimate_compact_payload_tokens(request: &responses::ResponsesRequest) -> i64
.saturating_add(approx_token_count(&request.instructions_text()))
}
const PRETURN_CONTEXT_DIFF_CWD: &str = "/tmp/PRETURN_CONTEXT_DIFF_CWD";
const DUMMY_FUNCTION_NAME: &str = "test_tool";
fn summary_with_prefix(summary: &str) -> String {
format!("{SUMMARY_PREFIX}\n{summary}")
}
fn context_snapshot_options() -> ContextSnapshotOptions {
ContextSnapshotOptions::default()
.render_mode(ContextSnapshotRenderMode::KindWithTextPrefix { max_chars: 64 })
}
fn format_labeled_requests_snapshot(
scenario: &str,
sections: &[(&str, &responses::ResponsesRequest)],
) -> String {
context_snapshot::format_labeled_requests_snapshot(
scenario,
sections,
&context_snapshot_options(),
)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_replaces_history_for_followups() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -65,15 +94,7 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
.await;
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "REMOTE_COMPACTED_SUMMARY".to_string(),
}],
end_turn: None,
phase: None,
},
responses::user_message_item("REMOTE_COMPACTED_SUMMARY"),
ResponseItem::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
},
@@ -134,12 +155,9 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
"expected compact request to include assistant history"
);
let follow_up_body = responses_mock
.requests()
.last()
.expect("follow-up request missing")
.body_json()
.to_string();
let response_requests = responses_mock.requests();
let follow_up_request = response_requests.last().expect("follow-up request missing");
let follow_up_body = follow_up_request.body_json().to_string();
assert!(
follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"),
"expected follow-up request to use compacted history"
@@ -152,6 +170,21 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
!follow_up_body.contains("FIRST_REMOTE_REPLY"),
"expected follow-up request to drop pre-compaction assistant messages"
);
assert!(
!follow_up_body.contains("hello remote compact"),
"expected follow-up request to drop compacted-away user turns when remote output omits them"
);
insta::assert_snapshot!(
"remote_manual_compact_with_history_shapes",
format_labeled_requests_snapshot(
"Remote manual /compact where remote compact output is summary-only: follow-up layout uses returned summary plus new user message.",
&[
("Remote Compaction Request", &compact_request),
("Remote Post-Compaction History Layout", follow_up_request),
]
)
);
Ok(())
}
@@ -184,15 +217,7 @@ async fn remote_compact_runs_automatically() -> Result<()> {
.await;
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "REMOTE_COMPACTED_SUMMARY".to_string(),
}],
end_turn: None,
phase: None,
},
responses::user_message_item("REMOTE_COMPACTED_SUMMARY"),
ResponseItem::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
},
@@ -221,7 +246,8 @@ async fn remote_compact_runs_automatically() -> Result<()> {
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert!(message);
assert_eq!(compact_mock.requests().len(), 1);
let follow_up_body = responses_mock.single_request().body_json().to_string();
let follow_up_request = responses_mock.single_request();
let follow_up_body = follow_up_request.body_json().to_string();
assert!(follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"));
assert!(follow_up_body.contains("ENCRYPTED_COMPACTION_SUMMARY"));
@@ -497,7 +523,7 @@ async fn auto_remote_compact_failure_stops_agent_loop() -> Result<()> {
)
.await;
let compact_mock = responses::mount_compact_json_once(
let first_compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": "invalid compact payload shape" }),
)
@@ -541,18 +567,29 @@ async fn auto_remote_compact_failure_stops_agent_loop() -> Result<()> {
assert!(
error_message.contains("Error running remote compact task"),
"expected compact failure error, got {error_message}"
"expected remote compact task error prefix, got {error_message}"
);
assert_eq!(
compact_mock.requests().len(),
first_compact_mock.requests().len(),
1,
"expected exactly one remote compact attempt"
"expected first remote compact attempt with incoming items"
);
assert!(
post_compact_turn_mock.requests().is_empty(),
"expected agent loop to stop after compaction failure"
);
insta::assert_snapshot!(
"remote_pre_turn_compaction_failure_shapes",
format_labeled_requests_snapshot(
"Remote pre-turn auto-compaction parse failure: compaction request excludes the incoming user message and the turn stops.",
&[(
"Remote Compaction Request (Incoming User Excluded)",
&first_compact_mock.single_request()
),]
)
);
Ok(())
}
@@ -778,15 +815,7 @@ async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> {
.await;
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "REMOTE_COMPACTED_SUMMARY".to_string(),
}],
end_turn: None,
phase: None,
},
responses::user_message_item("REMOTE_COMPACTED_SUMMARY"),
ResponseItem::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
},
@@ -851,6 +880,68 @@ async fn remote_manual_compact_emits_context_compaction_items() -> Result<()> {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_manual_compact_failure_emits_task_error_event() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
mount_sse_once(
harness.server(),
sse(vec![
responses::ev_assistant_message("m1", "REMOTE_REPLY"),
responses::ev_completed("resp-1"),
]),
)
.await;
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": "invalid compact payload shape" }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "manual remote compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex.submit(Op::Compact).await?;
let error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
assert!(
error_message.contains("Error running remote compact task"),
"expected remote compact task error prefix, got {error_message}"
);
assert!(
error_message.contains("invalid compact payload shape")
|| error_message.contains("invalid type: string"),
"expected invalid compact payload details, got {error_message}"
);
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Re-enable after the follow-up compaction behavior PR lands.
// Current main behavior for rollout replacement-history persistence is known-incorrect.
#[ignore = "behavior change covered in follow-up compaction PR"]
async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> {
skip_if_no_network!(Ok(()));
@@ -876,15 +967,7 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
.await;
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "COMPACTED_USER_SUMMARY".to_string(),
}],
end_turn: None,
phase: None,
},
responses::user_message_item("COMPACTED_USER_SUMMARY"),
ResponseItem::Compaction {
encrypted_content: "ENCRYPTED_COMPACTION_SUMMARY".to_string(),
},
@@ -980,11 +1063,11 @@ async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()>
)
});
if has_compacted_user_summary
&& has_compaction_item
&& has_compacted_assistant_note
&& has_permissions_developer_message
{
if has_compacted_user_summary && has_compaction_item && has_compacted_assistant_note {
assert!(
!has_permissions_developer_message,
"manual remote compact rollout replacement history should not inject permissions context"
);
saw_compacted_history = true;
break;
}
@@ -1036,15 +1119,7 @@ async fn remote_compact_and_resume_refresh_stale_developer_instructions() -> Res
.await;
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "REMOTE_COMPACTED_SUMMARY".to_string(),
}],
end_turn: None,
phase: None,
},
responses::user_message_item("REMOTE_COMPACTED_SUMMARY"),
ResponseItem::Message {
id: None,
role: "developer".to_string(),
@@ -1177,15 +1252,7 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume()
.await;
let compacted_history = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "REMOTE_COMPACTED_SUMMARY".to_string(),
}],
end_turn: None,
phase: None,
},
responses::user_message_item("REMOTE_COMPACTED_SUMMARY"),
ResponseItem::Message {
id: None,
role: "developer".to_string(),
@@ -1250,3 +1317,344 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume()
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Update once remote pre-turn compaction includes incoming user input.
async fn snapshot_request_shape_remote_pre_turn_compaction_including_incoming_user_message()
-> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", 60),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"),
responses::ev_completed_with_tokens("r2", 500),
]),
responses::sse(vec![
responses::ev_assistant_message("m3", "REMOTE_FINAL_REPLY"),
responses::ev_completed_with_tokens("r3", 80),
]),
],
)
.await;
let compacted_history = vec![
responses::user_message_item("USER_ONE"),
responses::user_message_item("USER_TWO"),
responses::user_message_item(&summary_with_prefix("REMOTE_PRE_TURN_SUMMARY")),
];
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_history }),
)
.await;
for user in ["USER_ONE", "USER_TWO", "USER_THREE"] {
if user == "USER_THREE" {
codex
.submit(Op::OverrideTurnContext {
cwd: Some(PathBuf::from(PRETURN_CONTEXT_DIFF_CWD)),
approval_policy: None,
sandbox_policy: None,
windows_sandbox_level: None,
model: None,
effort: None,
summary: None,
collaboration_mode: None,
personality: None,
})
.await?;
}
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: user.to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
}
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(
requests.len(),
3,
"expected user, user, and post-compact turn"
);
let compact_request = compact_mock.single_request();
insta::assert_snapshot!(
"remote_pre_turn_compaction_including_incoming_shapes",
format_labeled_requests_snapshot(
"Remote pre-turn auto-compaction with a context override emits the context diff in the compact request while excluding the incoming user message.",
&[
("Remote Compaction Request", &compact_request),
("Remote Post-Compaction History Layout", &requests[2]),
]
)
);
assert_eq!(
requests[2]
.message_input_texts("user")
.iter()
.filter(|text| text.as_str() == "USER_THREE")
.count(),
1,
"post-compaction request should contain incoming user exactly once from runtime append"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Update once remote pre-turn compaction context-overflow handling includes
// incoming user input and emits richer oversized-input messaging.
async fn snapshot_request_shape_remote_pre_turn_compaction_context_window_exceeded() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
responses::ev_completed_with_tokens("r1", 500),
])],
)
.await;
let compact_mock = responses::mount_compact_response_once(
harness.server(),
ResponseTemplate::new(400).set_body_json(serde_json::json!({
"error": {
"code": "context_length_exceeded",
"message": "Your input exceeds the context window of this model. Please adjust your input and try again."
}
})),
)
.await;
let post_compact_turn_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_POST_COMPACT_SHOULD_NOT_RUN"),
responses::ev_completed_with_tokens("r2", 80),
]),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_TWO".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
let error_message = wait_for_event_match(&codex, |event| match event {
EventMsg::Error(err) => Some(err.message.clone()),
_ => None,
})
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(
requests.len(),
1,
"expected no post-compaction follow-up turn request after compact failure"
);
assert!(
post_compact_turn_mock.requests().is_empty(),
"expected turn to stop after compaction failure"
);
let include_attempt_request = compact_mock.single_request();
insta::assert_snapshot!(
"remote_pre_turn_compaction_context_window_exceeded_shapes",
format_labeled_requests_snapshot(
"Remote pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.",
&[(
"Remote Compaction Request (Incoming User Excluded)",
&include_attempt_request
),]
)
);
assert!(
error_message.to_lowercase().contains("context window"),
"expected context window failure to surface, got {error_message}"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_mid_turn_continuation_compaction() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_auto_compact_token_limit = Some(200);
}),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_sequence(
harness.server(),
vec![
responses::sse(vec![
responses::ev_function_call("call-remote-mid-turn", DUMMY_FUNCTION_NAME, "{}"),
responses::ev_completed_with_tokens("r1", 500),
]),
responses::sse(vec![
responses::ev_assistant_message("m2", "REMOTE_MID_TURN_FINAL_REPLY"),
responses::ev_completed_with_tokens("r2", 80),
]),
],
)
.await;
let compacted_history = vec![
responses::user_message_item("USER_ONE"),
responses::user_message_item(&summary_with_prefix("REMOTE_MID_TURN_SUMMARY")),
];
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_history }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(compact_mock.requests().len(), 1);
let requests = responses_mock.requests();
assert_eq!(
requests.len(),
2,
"expected initial and post-compact requests"
);
let compact_request = compact_mock.single_request();
insta::assert_snapshot!(
"remote_mid_turn_compaction_shapes",
format_labeled_requests_snapshot(
"Remote mid-turn continuation compaction after tool output: compact request includes tool artifacts and follow-up request includes the summary.",
&[
("Remote Compaction Request", &compact_request),
("Remote Post-Compaction History Layout", &requests[1]),
]
)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Update once manual remote /compact with no prior user turn becomes a no-op.
async fn snapshot_request_shape_remote_manual_compact_without_previous_user_messages() -> Result<()>
{
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()),
)
.await?;
let codex = harness.test().codex.clone();
let responses_mock = responses::mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m1", "REMOTE_MANUAL_EMPTY_FOLLOW_UP_REPLY"),
responses::ev_completed_with_tokens("r1", 80),
]),
)
.await;
let compact_mock =
responses::mount_compact_json_once(harness.server(), serde_json::json!({ "output": [] }))
.await;
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "USER_ONE".to_string(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
assert_eq!(
compact_mock.requests().len(),
1,
"current behavior still issues remote compaction for manual /compact without prior user"
);
let compact_request = compact_mock.single_request();
let follow_up_request = responses_mock.single_request();
insta::assert_snapshot!(
"remote_manual_compact_without_prev_user_shapes",
format_labeled_requests_snapshot(
"Remote manual /compact with no prior user turn still issues a compact request; follow-up turn carries canonical context and new user message.",
&[
("Remote Compaction Request", &compact_request),
("Remote Post-Compaction History Layout", &follow_up_request),
]
)
);
Ok(())
}

View File

@@ -29,7 +29,6 @@ use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use serde_json::Value;
use wiremock::MockServer;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -324,32 +323,14 @@ async fn model_change_from_image_to_text_strips_prior_image_content() -> Result<
assert_eq!(requests.len(), 2, "expected two model requests");
let first_request = requests.first().expect("expected first request");
let first_has_input_image = first_request.inputs_of_type("message").iter().any(|item| {
item.get("content")
.and_then(Value::as_array)
.is_some_and(|content| {
content
.iter()
.any(|span| span.get("type").and_then(Value::as_str) == Some("input_image"))
})
});
assert!(
first_has_input_image,
!first_request.message_input_image_urls("user").is_empty(),
"first request should include the uploaded image"
);
let second_request = requests.last().expect("expected second request");
let second_has_input_image = second_request.inputs_of_type("message").iter().any(|item| {
item.get("content")
.and_then(Value::as_array)
.is_some_and(|content| {
content
.iter()
.any(|span| span.get("type").and_then(Value::as_str) == Some("input_image"))
})
});
assert!(
!second_has_input_image,
second_request.message_input_image_urls("user").is_empty(),
"second request should strip unsupported image content"
);
let second_user_texts = second_request.message_input_texts("user");

View File

@@ -0,0 +1,21 @@
---
source: core/tests/suite/compact.rs
expression: "format_labeled_requests_snapshot(\"Manual /compact with prior user history compacts existing history and the follow-up turn includes the compact summary plus new user message.\",\n&[(\"Local Compaction Request\", &requests[1]),\n(\"Local Post-Compaction History Layout\", &requests[2]),])"
---
Scenario: Manual /compact with prior user history compacts existing history and the follow-up turn includes the compact summary plus new user message.
## Local Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:first manual turn
04:message/assistant:FIRST_REPLY
05:message/user:<SUMMARIZATION_PROMPT>
## Local Post-Compaction History Layout
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:first manual turn
04:message/user:<COMPACTION_SUMMARY>\nFIRST_MANUAL_SUMMARY
05:message/user:second manual turn

View File

@@ -0,0 +1,18 @@
---
source: core/tests/suite/compact.rs
expression: "format_labeled_requests_snapshot(\"Manual /compact with no prior user turn currently still issues a compaction request; follow-up turn carries canonical context and the new user message.\",\n&[(\"Local Compaction Request\", &requests[0]),\n(\"Local Post-Compaction History Layout\", &requests[1]),])"
---
Scenario: Manual /compact with no prior user turn currently still issues a compaction request; follow-up turn carries canonical context and the new user message.
## Local Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:<SUMMARIZATION_PROMPT>
## Local Post-Compaction History Layout
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:<COMPACTION_SUMMARY>\nMANUAL_EMPTY_SUMMARY
04:message/user:AFTER_MANUAL_EMPTY_COMPACT

View File

@@ -0,0 +1,22 @@
---
source: core/tests/suite/compact.rs
assertion_line: 2646
expression: "format_labeled_requests_snapshot(\"True mid-turn continuation compaction after tool output: compact request includes tool artifacts, and the continuation request includes the summary in the same turn.\",\n&[(\"Local Compaction Request\", &auto_compact_mock.single_request()),\n(\"Local Post-Compaction History Layout\",\n&post_auto_compact_mock.single_request()),])"
---
Scenario: True mid-turn continuation compaction after tool output: compact request includes tool artifacts, and the continuation request includes the summary in the same turn.
## Local Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:function call limit push
04:function_call/test_tool
05:function_call_output:unsupported call: test_tool
06:message/user:<SUMMARIZATION_PROMPT>
## Local Post-Compaction History Layout
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:function call limit push
04:message/user:<COMPACTION_SUMMARY>\nAUTO_SUMMARY

View File

@@ -0,0 +1,31 @@
---
source: core/tests/suite/compact.rs
assertion_line: 1773
expression: "format_labeled_requests_snapshot(\"Pre-sampling compaction on model switch to a smaller context window: current behavior compacts using prior-turn history only (incoming user message excluded), and the follow-up request carries compacted history plus the new user message.\",\n&[(\"Initial Request (Previous Model)\", &requests[0]),\n(\"Pre-sampling Compaction Request\", &requests[1]),\n(\"Post-Compaction Follow-up Request (Next Model)\", &requests[2]),])"
---
Scenario: Pre-sampling compaction on model switch to a smaller context window: current behavior compacts using prior-turn history only (incoming user message excluded), and the follow-up request carries compacted history plus the new user message.
## Initial Request (Previous Model)
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/developer:<PERMISSIONS_INSTRUCTIONS>
04:message/user:before switch
## Pre-sampling Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/developer:<PERMISSIONS_INSTRUCTIONS>
04:message/user:before switch
05:message/assistant:before switch
06:message/user:<SUMMARIZATION_PROMPT>
## Post-Compaction Follow-up Request (Next Model)
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:before switch
04:message/user:<COMPACTION_SUMMARY>\nPRE_SAMPLING_SUMMARY
05:message/developer:<model_switch>\nThe user was previously using a different model....
06:message/user:after switch

View File

@@ -0,0 +1,13 @@
---
source: core/tests/suite/compact.rs
expression: "format_labeled_requests_snapshot(\"Pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.\",\n&[(\"Local Compaction Request (Incoming User Excluded)\", &requests[1]),])"
---
Scenario: Pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.
## Local Compaction Request (Incoming User Excluded)
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_ONE
04:message/assistant:FIRST_REPLY
05:message/user:<SUMMARIZATION_PROMPT>

View File

@@ -0,0 +1,25 @@
---
source: core/tests/suite/compact.rs
expression: "format_labeled_requests_snapshot(\"Pre-turn auto-compaction with a context override emits the context diff in the compact request while the incoming user message is still excluded.\",\n&[(\"Local Compaction Request\", &requests[2]),\n(\"Local Post-Compaction History Layout\", &requests[3]),])"
---
Scenario: Pre-turn auto-compaction with a context override emits the context diff in the compact request while the incoming user message is still excluded.
## Local Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_ONE
04:message/assistant:FIRST_REPLY
05:message/user:USER_TWO
06:message/assistant:SECOND_REPLY
07:message/user:<ENVIRONMENT_CONTEXT:cwd=PRETURN_CONTEXT_DIFF_CWD>
08:message/user:<SUMMARIZATION_PROMPT>
## Local Post-Compaction History Layout
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=PRETURN_CONTEXT_DIFF_CWD>
03:message/user:USER_ONE
04:message/user:USER_TWO
05:message/user:<COMPACTION_SUMMARY>\nPRE_TURN_SUMMARY
06:message/user:<image> | <input_image:image_url> | </image> | USER_THREE

View File

@@ -0,0 +1,21 @@
---
source: core/tests/suite/compact_remote.rs
assertion_line: 178
expression: "format_labeled_requests_snapshot(\"Remote manual /compact where remote compact output is summary-only: follow-up layout uses returned summary plus new user message.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", follow_up_request),])"
---
Scenario: Remote manual /compact where remote compact output is summary-only: follow-up layout uses returned summary plus new user message.
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:hello remote compact
04:message/assistant:FIRST_REMOTE_REPLY
## Remote Post-Compaction History Layout
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:REMOTE_COMPACTED_SUMMARY
04:compaction:encrypted=true
05:message/user:after compact

View File

@@ -0,0 +1,16 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format_labeled_requests_snapshot(\"Remote manual /compact with no prior user turn still issues a compact request; follow-up turn carries canonical context and new user message.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", &follow_up_request),])"
---
Scenario: Remote manual /compact with no prior user turn still issues a compact request; follow-up turn carries canonical context and new user message.
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
## Remote Post-Compaction History Layout
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_ONE

View File

@@ -0,0 +1,20 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format_labeled_requests_snapshot(\"Remote mid-turn continuation compaction after tool output: compact request includes tool artifacts and follow-up request includes the summary.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", &requests[1]),])"
---
Scenario: Remote mid-turn continuation compaction after tool output: compact request includes tool artifacts and follow-up request includes the summary.
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_ONE
04:function_call/test_tool
05:function_call_output:unsupported call: test_tool
## Remote Post-Compaction History Layout
00:message/user:USER_ONE
01:message/developer:<PERMISSIONS_INSTRUCTIONS>
02:message/user:<AGENTS_MD>
03:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
04:message/user:<COMPACTION_SUMMARY>\nREMOTE_MID_TURN_SUMMARY

View File

@@ -0,0 +1,12 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format_labeled_requests_snapshot(\"Remote pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.\",\n&[(\"Remote Compaction Request (Incoming User Excluded)\",\n&include_attempt_request),])"
---
Scenario: Remote pre-turn auto-compaction context-window failure: compaction request excludes the incoming user message and the turn errors.
## Remote Compaction Request (Incoming User Excluded)
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_ONE
04:message/assistant:REMOTE_FIRST_REPLY

View File

@@ -0,0 +1,12 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format_labeled_requests_snapshot(\"Remote pre-turn auto-compaction parse failure: compaction request excludes the incoming user message and the turn stops.\",\n&[(\"Remote Compaction Request (Incoming User Excluded)\",\n&first_compact_mock.single_request()),])"
---
Scenario: Remote pre-turn auto-compaction parse failure: compaction request excludes the incoming user message and the turn stops.
## Remote Compaction Request (Incoming User Excluded)
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:turn that exceeds token threshold
04:message/assistant:initial turn complete

View File

@@ -0,0 +1,24 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format_labeled_requests_snapshot(\"Remote pre-turn auto-compaction with a context override emits the context diff in the compact request while excluding the incoming user message.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", &requests[2]),])"
---
Scenario: Remote pre-turn auto-compaction with a context override emits the context diff in the compact request while excluding the incoming user message.
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user:<AGENTS_MD>
02:message/user:<ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_ONE
04:message/assistant:REMOTE_FIRST_REPLY
05:message/user:USER_TWO
06:message/assistant:REMOTE_SECOND_REPLY
07:message/user:<ENVIRONMENT_CONTEXT:cwd=PRETURN_CONTEXT_DIFF_CWD>
## Remote Post-Compaction History Layout
00:message/user:USER_ONE
01:message/user:USER_TWO
02:message/developer:<PERMISSIONS_INSTRUCTIONS>
03:message/user:<AGENTS_MD>
04:message/user:<ENVIRONMENT_CONTEXT:cwd=PRETURN_CONTEXT_DIFF_CWD>
05:message/user:<COMPACTION_SUMMARY>\nREMOTE_PRE_TURN_SUMMARY
06:message/user:USER_THREE

View File

@@ -0,0 +1,3 @@
ALTER TABLE logs ADD COLUMN process_uuid TEXT;
CREATE INDEX idx_logs_process_uuid ON logs(process_uuid);

View File

@@ -20,6 +20,7 @@
use chrono::Duration as ChronoDuration;
use chrono::Utc;
use std::sync::OnceLock;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
@@ -33,6 +34,7 @@ use tracing::span::Id;
use tracing::span::Record;
use tracing_subscriber::Layer;
use tracing_subscriber::registry::LookupSpan;
use uuid::Uuid;
use crate::LogEntry;
use crate::StateRuntime;
@@ -44,14 +46,19 @@ const LOG_RETENTION_DAYS: i64 = 90;
pub struct LogDbLayer {
sender: mpsc::Sender<LogEntry>,
process_uuid: String,
}
pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer {
let process_uuid = current_process_log_uuid().to_string();
let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY);
tokio::spawn(run_inserter(std::sync::Arc::clone(&state_db), receiver));
tokio::spawn(run_retention_cleanup(state_db));
LogDbLayer { sender }
LogDbLayer {
sender,
process_uuid,
}
}
impl<S> Layer<S> for LogDbLayer
@@ -118,6 +125,7 @@ where
target: metadata.target().to_string(),
message: visitor.message,
thread_id,
process_uuid: Some(self.process_uuid.clone()),
module_path: metadata.module_path().map(ToString::to_string),
file: metadata.file().map(ToString::to_string),
line: metadata.line().map(|line| line as i64),
@@ -196,6 +204,15 @@ where
thread_id
}
fn current_process_log_uuid() -> &'static str {
static PROCESS_LOG_UUID: OnceLock<String> = OnceLock::new();
PROCESS_LOG_UUID.get_or_init(|| {
let pid = std::process::id();
let process_uuid = Uuid::new_v4();
format!("pid:{pid}:{process_uuid}")
})
}
async fn run_inserter(
state_db: std::sync::Arc<StateRuntime>,
mut receiver: mpsc::Receiver<LogEntry>,

View File

@@ -9,6 +9,7 @@ pub struct LogEntry {
pub target: String,
pub message: Option<String>,
pub thread_id: Option<String>,
pub process_uuid: Option<String>,
pub module_path: Option<String>,
pub file: Option<String>,
pub line: Option<i64>,
@@ -23,6 +24,7 @@ pub struct LogRow {
pub target: String,
pub message: Option<String>,
pub thread_id: Option<String>,
pub process_uuid: Option<String>,
pub file: Option<String>,
pub line: Option<i64>,
}

View File

@@ -360,7 +360,7 @@ FROM threads
}
let mut builder = QueryBuilder::<Sqlite>::new(
"INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, module_path, file, line) ",
"INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line) ",
);
builder.push_values(entries, |mut row, entry| {
row.push_bind(entry.ts)
@@ -369,6 +369,7 @@ FROM threads
.push_bind(&entry.target)
.push_bind(&entry.message)
.push_bind(&entry.thread_id)
.push_bind(&entry.process_uuid)
.push_bind(&entry.module_path)
.push_bind(&entry.file)
.push_bind(entry.line);
@@ -388,7 +389,7 @@ FROM threads
/// Query logs with optional filters.
pub async fn query_logs(&self, query: &LogQuery) -> anyhow::Result<Vec<LogRow>> {
let mut builder = QueryBuilder::<Sqlite>::new(
"SELECT id, ts, ts_nanos, level, target, message, thread_id, file, line FROM logs WHERE 1 = 1",
"SELECT id, ts, ts_nanos, level, target, message, thread_id, process_uuid, file, line FROM logs WHERE 1 = 1",
);
push_log_filters(&mut builder, query);
if query.descending {