feat: remote compaction (#6795)

Co-authored-by: pakrym-oai <pakrym@openai.com>
This commit is contained in:
jif-oai
2025-11-18 16:51:16 +00:00
committed by GitHub
parent 0eb2e6f9ee
commit 838531d3e4
20 changed files with 688 additions and 134 deletions

View File

@@ -81,6 +81,7 @@ pub(crate) async fn stream_chat_completions(
ResponseItem::CustomToolCallOutput { .. } => {}
ResponseItem::WebSearchCall { .. } => {}
ResponseItem::GhostSnapshot { .. } => {}
ResponseItem::CompactionSummary { .. } => {}
}
}
@@ -320,7 +321,8 @@ pub(crate) async fn stream_chat_completions(
}
ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::Other => {
| ResponseItem::Other
| ResponseItem::CompactionSummary { .. } => {
// Omit these items from the conversation history.
continue;
}

View File

@@ -26,6 +26,7 @@ use tokio::sync::mpsc;
use tokio::time::timeout;
use tokio_util::io::ReaderStream;
use tracing::debug;
use tracing::enabled;
use tracing::trace;
use tracing::warn;
@@ -78,6 +79,18 @@ struct Error {
resets_at: Option<i64>,
}
/// JSON request body sent to the provider's remote `/compact` endpoint.
#[derive(Debug, Serialize)]
struct CompactHistoryRequest<'a> {
    /// Model slug the compaction is performed for.
    model: &'a str,
    /// Conversation items to be compacted.
    input: &'a [ResponseItem],
    /// Full instructions accompanying the conversation.
    instructions: &'a str,
}
/// JSON response body returned by the remote `/compact` endpoint.
#[derive(Debug, Deserialize)]
struct CompactHistoryResponse {
    /// Replacement conversation history produced by the server.
    output: Vec<ResponseItem>,
}
#[derive(Debug, Clone)]
pub struct ModelClient {
config: Arc<Config>,
@@ -507,6 +520,70 @@ impl ModelClient {
pub fn get_auth_manager(&self) -> Option<Arc<AuthManager>> {
self.auth_manager.clone()
}
/// Compact the conversation history via the provider's remote `/compact`
/// endpoint, returning the replacement history items.
///
/// Returns an empty `Vec` without issuing a request when the prompt has no
/// input. Transport failures surface as `ConnectionFailed`; non-2xx
/// responses as `UnexpectedStatus` with the raw body attached.
pub async fn compact_conversation_history(&self, prompt: &Prompt) -> Result<Vec<ResponseItem>> {
    if prompt.input.is_empty() {
        return Ok(Vec::new());
    }
    let auth_manager = self.auth_manager.clone();
    let auth = auth_manager.as_ref().and_then(|m| m.auth());
    let mut req_builder = self
        .provider
        .create_compact_request_builder(&self.client, &auth)
        .await?;
    // Identify sub-agent sessions via a dedicated header.
    if let SessionSource::SubAgent(sub) = &self.session_source {
        let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub {
            label.clone()
        } else {
            // Non-`Other` variants serialize to a plain string; fall back to
            // "other" if serialization yields anything unexpected.
            serde_json::to_value(sub)
                .ok()
                .and_then(|v| v.as_str().map(std::string::ToString::to_string))
                .unwrap_or_else(|| "other".to_string())
        };
        req_builder = req_builder.header("x-openai-subagent", subagent);
    }
    // ChatGPT-mode auth additionally requires the account id header.
    if let Some(auth) = auth.as_ref()
        && auth.mode == AuthMode::ChatGPT
        && let Some(account_id) = auth.get_account_id()
    {
        req_builder = req_builder.header("chatgpt-account-id", account_id);
    }
    let payload = CompactHistoryRequest {
        model: &self.config.model,
        input: &prompt.input,
        instructions: &prompt.get_full_instructions(&self.config.model_family),
    };
    if enabled!(tracing::Level::TRACE) {
        trace!(
            "POST to {}: {}",
            self.provider
                .get_compact_url(&auth)
                // `unwrap_or_else` avoids allocating the fallback string on
                // the common path where a URL is present.
                .unwrap_or_else(|| "<none>".to_string()),
            serde_json::to_value(&payload).unwrap_or_default()
        );
    }
    let response = req_builder
        .json(&payload)
        .send()
        .await
        .map_err(|source| CodexErr::ConnectionFailed(ConnectionFailedError { source }))?;
    let status = response.status();
    // Read the body before checking status so error responses can carry it.
    let body = response
        .text()
        .await
        .map_err(|source| CodexErr::ConnectionFailed(ConnectionFailedError { source }))?;
    if !status.is_success() {
        return Err(CodexErr::UnexpectedStatus(UnexpectedResponseError {
            status,
            body,
            request_id: None,
        }));
    }
    let CompactHistoryResponse { output } = serde_json::from_str(&body)?;
    Ok(output)
}
}
enum StreamAttemptError {

View File

@@ -55,6 +55,7 @@ use crate::ModelProviderInfo;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::compact::collect_user_messages;
use crate::config::Config;
use crate::config::types::ShellEnvironmentPolicy;
use crate::context_manager::ContextManager;
@@ -63,10 +64,6 @@ use crate::error::CodexErr;
use crate::error::Result as CodexResult;
#[cfg(test)]
use crate::exec::StreamOutput;
// Removed: legacy executor wiring replaced by ToolOrchestrator flows.
// legacy normalize_exec_result no longer used after orchestrator migration
use crate::compact::build_compacted_history;
use crate::compact::collect_user_messages;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::model_family::find_family_for_model;
@@ -956,13 +953,18 @@ impl Session {
}
RolloutItem::Compacted(compacted) => {
let snapshot = history.get_history();
let user_messages = collect_user_messages(&snapshot);
let rebuilt = build_compacted_history(
self.build_initial_context(turn_context),
&user_messages,
&compacted.message,
);
history.replace(rebuilt);
// TODO(jif) clean
if let Some(replacement) = &compacted.replacement_history {
history.replace(replacement.clone());
} else {
let user_messages = collect_user_messages(&snapshot);
let rebuilt = compact::build_compacted_history(
self.build_initial_context(turn_context),
&user_messages,
&compacted.message,
);
history.replace(rebuilt);
}
}
_ => {}
}
@@ -990,6 +992,15 @@ impl Session {
self.persist_rollout_items(&rollout_items).await;
}
pub async fn enabled(&self, feature: Feature) -> bool {
self.state
.lock()
.await
.session_configuration
.features
.enabled(feature)
}
async fn send_raw_response_items(&self, turn_context: &TurnContext, items: &[ResponseItem]) {
for item in items {
self.send_event(
@@ -1167,14 +1178,7 @@ impl Session {
turn_context: Arc<TurnContext>,
cancellation_token: CancellationToken,
) {
if !self
.state
.lock()
.await
.session_configuration
.features
.enabled(Feature::GhostCommit)
{
if !self.enabled(Feature::GhostCommit).await {
return;
}
let token = match turn_context.tool_call_gate.subscribe().await {
@@ -1415,7 +1419,7 @@ mod handlers {
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::user_input::UserInput;
use std::sync::Arc;
use tracing::info;
use tracing::warn;
@@ -1623,16 +1627,9 @@ mod handlers {
let turn_context = sess
.new_turn_with_sub_id(sub_id, SessionSettingsUpdate::default())
.await;
// Attempt to inject input into current task
if let Err(items) = sess
.inject_input(vec![UserInput::Text {
text: turn_context.compact_prompt().to_string(),
}])
.await
{
sess.spawn_task(Arc::clone(&turn_context), items, CompactTask)
.await;
}
sess.spawn_task(Arc::clone(&turn_context), vec![], CompactTask)
.await;
}
pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
@@ -2896,7 +2893,7 @@ mod tests {
let summary1 = "summary one";
let snapshot1 = live_history.get_history();
let user_messages1 = collect_user_messages(&snapshot1);
let rebuilt1 = build_compacted_history(
let rebuilt1 = compact::build_compacted_history(
session.build_initial_context(turn_context),
&user_messages1,
summary1,
@@ -2904,6 +2901,7 @@ mod tests {
live_history.replace(rebuilt1);
rollout_items.push(RolloutItem::Compacted(CompactedItem {
message: summary1.to_string(),
replacement_history: None,
}));
let user2 = ResponseItem::Message {
@@ -2929,7 +2927,7 @@ mod tests {
let summary2 = "summary two";
let snapshot2 = live_history.get_history();
let user_messages2 = collect_user_messages(&snapshot2);
let rebuilt2 = build_compacted_history(
let rebuilt2 = compact::build_compacted_history(
session.build_initial_context(turn_context),
&user_messages2,
summary2,
@@ -2937,6 +2935,7 @@ mod tests {
live_history.replace(rebuilt2);
rollout_items.push(RolloutItem::Compacted(CompactedItem {
message: summary2.to_string(),
replacement_history: None,
}));
let user3 = ResponseItem::Message {

View File

@@ -167,6 +167,7 @@ async fn run_compact_task_inner(
let rollout_item = RolloutItem::Compacted(CompactedItem {
message: summary_text.clone(),
replacement_history: None,
});
sess.persist_rollout_items(&[rollout_item]).await;

View File

@@ -0,0 +1,97 @@
use std::sync::Arc;
use crate::Prompt;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::error::Result as CodexResult;
use crate::protocol::AgentMessageEvent;
use crate::protocol::CompactedItem;
use crate::protocol::ErrorEvent;
use crate::protocol::EventMsg;
use crate::protocol::RolloutItem;
use crate::protocol::TaskStartedEvent;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;
pub(crate) async fn run_remote_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) -> Option<String> {
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
sess.send_event(&turn_context, start_event).await;
match run_remote_compact_task_inner(&sess, &turn_context, input).await {
Ok(()) => {
let event = EventMsg::AgentMessage(AgentMessageEvent {
message: "Compact task completed".to_string(),
});
sess.send_event(&turn_context, event).await;
}
Err(err) => {
let event = EventMsg::Error(ErrorEvent {
message: err.to_string(),
});
sess.send_event(&turn_context, event).await;
}
}
None
}
/// Perform the remote compaction round-trip for the current session.
///
/// Sends the (optionally extended) conversation history to the provider's
/// compaction endpoint, swaps the session history for the returned items,
/// refreshes the token-usage estimate, and persists the replacement history
/// to the rollout so resumed sessions rebuild the same state.
async fn run_remote_compact_task_inner(
    sess: &Arc<Session>,
    turn_context: &Arc<TurnContext>,
    input: Vec<UserInput>,
) -> CodexResult<()> {
    let mut history = sess.clone_history().await;
    // Record any pending user input so it is included in the compaction.
    if !input.is_empty() {
        let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
        history.record_items(&[initial_input_for_turn.into()]);
    }
    let prompt = Prompt {
        input: history.get_history_for_prompt(),
        tools: vec![],
        parallel_tool_calls: false,
        base_instructions_override: turn_context.base_instructions.clone(),
        output_schema: None,
    };
    let mut new_history = turn_context
        .client
        .compact_conversation_history(&prompt)
        .await?;
    // Carry ghost snapshots over verbatim; they are required to keep `/undo`
    // available after compaction. Extending directly from the filtered
    // iterator avoids the intermediate Vec and the redundant emptiness check
    // (`extend` over an empty iterator is a no-op).
    new_history.extend(
        history
            .get_history()
            .iter()
            .filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
            .cloned(),
    );
    sess.replace_history(new_history.clone()).await;
    // Re-estimate token usage for the freshly compacted history.
    if let Some(estimated_tokens) = sess
        .clone_history()
        .await
        .estimate_token_count(turn_context.as_ref())
    {
        sess.override_last_token_usage_estimate(turn_context.as_ref(), estimated_tokens)
            .await;
    }
    // Persist the full replacement history so resume/fork rebuilds it exactly.
    let compacted_item = CompactedItem {
        message: String::new(),
        replacement_history: Some(new_history),
    };
    sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
        .await;
    Ok(())
}

View File

@@ -188,6 +188,7 @@ impl ContextManager {
| ResponseItem::FunctionCall { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::CompactionSummary { .. }
| ResponseItem::GhostSnapshot { .. }
| ResponseItem::Other => item.clone(),
}
@@ -205,7 +206,8 @@ fn is_api_message(message: &ResponseItem) -> bool {
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => true,
| ResponseItem::WebSearchCall { .. }
| ResponseItem::CompactionSummary { .. } => true,
ResponseItem::GhostSnapshot { .. } => false,
ResponseItem::Other => false,
}

View File

@@ -46,6 +46,8 @@ pub enum Feature {
SandboxCommandAssessment,
/// Enable Windows sandbox (restricted token) on Windows.
WindowsSandbox,
/// Remote compaction enabled (only for ChatGPT auth)
RemoteCompaction,
/// Enable the default shell tool.
ShellTool,
}
@@ -305,6 +307,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Experimental,
default_enabled: false,
},
FeatureSpec {
id: Feature::RemoteCompaction,
key: "remote_compaction",
stage: Stage::Experimental,
default_enabled: false,
},
FeatureSpec {
id: Feature::ShellTool,
key: "shell_tool",

View File

@@ -13,7 +13,7 @@ mod client;
mod client_common;
pub mod codex;
mod codex_conversation;
pub mod powershell;
mod compact_remote;
pub use codex_conversation::CodexConversation;
mod codex_delegate;
mod command_safety;
@@ -35,6 +35,7 @@ mod mcp_tool_call;
mod message_history;
mod model_provider_info;
pub mod parse_command;
pub mod powershell;
mod response_processing;
pub mod sandboxing;
pub mod token_data;

View File

@@ -8,6 +8,7 @@
use crate::CodexAuth;
use crate::default_client::CodexHttpClient;
use crate::default_client::CodexRequestBuilder;
use crate::error::CodexErr;
use codex_app_server_protocol::AuthMode;
use serde::Deserialize;
use serde::Serialize;
@@ -109,21 +110,7 @@ impl ModelProviderInfo {
client: &'a CodexHttpClient,
auth: &Option<CodexAuth>,
) -> crate::error::Result<CodexRequestBuilder> {
let effective_auth = if let Some(secret_key) = &self.experimental_bearer_token {
Some(CodexAuth::from_api_key(secret_key))
} else {
match self.api_key() {
Ok(Some(key)) => Some(CodexAuth::from_api_key(&key)),
Ok(None) => auth.clone(),
Err(err) => {
if auth.is_some() {
auth.clone()
} else {
return Err(err);
}
}
}
};
let effective_auth = self.effective_auth(auth)?;
let url = self.get_full_url(&effective_auth);
@@ -136,6 +123,51 @@ impl ModelProviderInfo {
Ok(self.apply_http_headers(builder))
}
/// Build an authenticated POST request for the provider's compaction
/// endpoint. Fails with `UnsupportedOperation` for non-Responses-API
/// providers (which have no compaction endpoint).
pub async fn create_compact_request_builder<'a>(
    &'a self,
    client: &'a CodexHttpClient,
    auth: &Option<CodexAuth>,
) -> crate::error::Result<CodexRequestBuilder> {
    // Single source for the "not supported" error used by both guards below.
    let unsupported = || {
        CodexErr::UnsupportedOperation(
            "Compaction endpoint requires Responses API providers".to_string(),
        )
    };
    if self.wire_api != WireApi::Responses {
        return Err(unsupported());
    }
    let effective_auth = self.effective_auth(auth)?;
    let url = self
        .get_compact_url(&effective_auth)
        .ok_or_else(unsupported)?;
    let mut builder = client.post(url);
    if let Some(auth) = effective_auth.as_ref() {
        builder = builder.bearer_auth(auth.get_token().await?);
    }
    Ok(self.apply_http_headers(builder))
}
/// Resolve the auth to use for a request: an experimental bearer token
/// wins, then a provider-configured API key, then the caller-supplied auth.
/// An API-key lookup error is only fatal when no caller auth is available.
fn effective_auth(&self, auth: &Option<CodexAuth>) -> crate::error::Result<Option<CodexAuth>> {
    // An explicitly configured bearer token always takes precedence.
    if let Some(secret_key) = &self.experimental_bearer_token {
        return Ok(Some(CodexAuth::from_api_key(secret_key)));
    }
    match self.api_key() {
        Ok(Some(key)) => Ok(Some(CodexAuth::from_api_key(&key))),
        Ok(None) => Ok(auth.clone()),
        // Tolerate the lookup failure when the caller already supplied auth.
        Err(_) if auth.is_some() => Ok(auth.clone()),
        Err(err) => Err(err),
    }
}
fn get_query_string(&self) -> String {
self.query_params
.as_ref()
@@ -173,6 +205,18 @@ impl ModelProviderInfo {
}
}
/// Derive the compaction endpoint URL from the provider's full URL by
/// appending `/compact` to the path, preserving any query string.
/// Returns `None` for non-Responses-API providers.
pub(crate) fn get_compact_url(&self, auth: &Option<CodexAuth>) -> Option<String> {
    if self.wire_api != WireApi::Responses {
        return None;
    }
    let full = self.get_full_url(auth);
    // Insert `/compact` before the query string, if one exists.
    let url = match full.split_once('?') {
        Some((path, query)) => format!("{path}/compact?{query}"),
        None => format!("{full}/compact"),
    };
    Some(url)
}
pub(crate) fn is_azure_responses_endpoint(&self) -> bool {
if self.wire_api != WireApi::Responses {
return false;

View File

@@ -27,7 +27,8 @@ pub(crate) fn should_persist_response_item(item: &ResponseItem) -> bool {
| ResponseItem::CustomToolCall { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::GhostSnapshot { .. } => true,
| ResponseItem::GhostSnapshot { .. }
| ResponseItem::CompactionSummary { .. } => true,
ResponseItem::Other => false,
}
}

View File

@@ -814,6 +814,7 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> {
timestamp: format!("{ts}-compacted"),
item: RolloutItem::Compacted(CompactedItem {
message: "compacted".into(),
replacement_history: None,
}),
};
writeln!(file, "{}", serde_json::to_string(&compacted_line)?)?;

View File

@@ -1,15 +1,14 @@
use std::sync::Arc;
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use crate::codex::TurnContext;
use crate::compact;
use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;
use super::SessionTask;
use super::SessionTaskContext;
use crate::codex::TurnContext;
use crate::features::Feature;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_app_server_protocol::AuthMode;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;
#[derive(Clone, Copy, Default)]
pub(crate) struct CompactTask;
@@ -27,6 +26,17 @@ impl SessionTask for CompactTask {
input: Vec<UserInput>,
_cancellation_token: CancellationToken,
) -> Option<String> {
compact::run_compact_task(session.clone_session(), ctx, input).await
let session = session.clone_session();
if session
.services
.auth_manager
.auth()
.is_some_and(|auth| auth.mode == AuthMode::ChatGPT)
&& session.enabled(Feature::RemoteCompaction).await
{
crate::compact_remote::run_remote_compact_task(session, ctx, input).await
} else {
crate::compact::run_compact_task(session, ctx, input).await
}
}
}

View File

@@ -499,6 +499,14 @@ fn base_mock() -> (MockBuilder, ResponseMock) {
(mock, response_mock)
}
/// Build a wiremock matcher for POSTs to the `/responses/compact` endpoint,
/// paired with the `ResponseMock` that records matched requests.
fn compact_mock() -> (MockBuilder, ResponseMock) {
    let recorder = ResponseMock::new();
    let builder = Mock::given(method("POST"))
        .and(path_regex(".*/responses/compact$"))
        .and(recorder.clone());
    (builder, recorder)
}
pub async fn mount_sse_once_match<M>(server: &MockServer, matcher: M, body: String) -> ResponseMock
where
M: wiremock::Match + Send + Sync + 'static,
@@ -521,6 +529,40 @@ pub async fn mount_sse_once(server: &MockServer, body: String) -> ResponseMock {
response_mock
}
/// Mount a one-shot JSON response for the compaction endpoint that only
/// fires when `matcher` also matches the request.
///
/// Returns the `ResponseMock` recording the requests received.
pub async fn mount_compact_json_once_match<M>(
    server: &MockServer,
    matcher: M,
    body: serde_json::Value,
) -> ResponseMock
where
    M: wiremock::Match + Send + Sync + 'static,
{
    let (mock, response_mock) = compact_mock();
    mock.and(matcher)
        .respond_with(
            ResponseTemplate::new(200)
                .insert_header("content-type", "application/json")
                // `body` is owned and not used again, so no clone is needed.
                .set_body_json(body),
        )
        .up_to_n_times(1)
        .mount(server)
        .await;
    response_mock
}
/// Mount a one-shot JSON response for the compaction endpoint (no extra
/// request matcher). Returns the `ResponseMock` recording the requests.
pub async fn mount_compact_json_once(server: &MockServer, body: serde_json::Value) -> ResponseMock {
    let (mock, response_mock) = compact_mock();
    mock.respond_with(
        ResponseTemplate::new(200)
            .insert_header("content-type", "application/json")
            // `body` is owned and not used again, so no clone is needed.
            .set_body_json(body),
    )
    .up_to_n_times(1)
    .mount(server)
    .await;
    response_mock
}
pub async fn start_mock_server() -> MockServer {
MockServer::builder()
.body_print_limit(BodyPrintLimit::Limited(80_000))

View File

@@ -49,6 +49,7 @@ pub enum ShellModelOutput {
pub struct TestCodexBuilder {
config_mutators: Vec<Box<ConfigMutator>>,
auth: CodexAuth,
}
impl TestCodexBuilder {
@@ -60,6 +61,11 @@ impl TestCodexBuilder {
self
}
/// Override the auth used by the test conversation (the builder starts
/// with a dummy API key; see `test_codex()`).
pub fn with_auth(mut self, auth: CodexAuth) -> Self {
    self.auth = auth;
    self
}
pub fn with_model(self, model: &str) -> Self {
let new_model = model.to_string();
self.with_config(move |config| {
@@ -90,13 +96,12 @@ impl TestCodexBuilder {
) -> anyhow::Result<TestCodex> {
let (config, cwd) = self.prepare_config(server, &home).await?;
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let auth = self.auth.clone();
let conversation_manager = ConversationManager::with_auth(auth.clone());
let new_conversation = match resume_from {
Some(path) => {
let auth_manager = codex_core::AuthManager::from_auth_for_testing(
CodexAuth::from_api_key("dummy"),
);
let auth_manager = codex_core::AuthManager::from_auth_for_testing(auth);
conversation_manager
.resume_conversation_from_rollout(config.clone(), path, auth_manager)
.await?
@@ -345,5 +350,6 @@ fn function_call_output<'a>(bodies: &'a [Value], call_id: &str) -> &'a Value {
/// Entry point for building a test Codex: no config mutators and dummy
/// API-key auth by default (override via `with_auth`).
pub fn test_codex() -> TestCodexBuilder {
    TestCodexBuilder {
        config_mutators: vec![],
        auth: CodexAuth::from_api_key("dummy"),
    }
}

View File

@@ -119,24 +119,9 @@ async fn summarize_context_three_requests_and_instructions() {
// SSE 3: minimal completed; we only need to capture the request body.
let sse3 = sse(vec![ev_completed("r3")]);
// Mount three expectations, one per request, matched by body content.
let first_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains("\"text\":\"hello world\"") && !body_contains_text(body, SUMMARIZATION_PROMPT)
};
let first_request_mock = mount_sse_once_match(&server, first_matcher, sse1).await;
let second_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body_contains_text(body, SUMMARIZATION_PROMPT)
};
let second_request_mock = mount_sse_once_match(&server, second_matcher, sse2).await;
let third_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(&format!("\"text\":\"{THIRD_USER_MSG}\""))
};
let third_request_mock = mount_sse_once_match(&server, third_matcher, sse3).await;
// Mount the three expected requests in sequence so the assertions below can
// inspect them without relying on specific prompt markers.
let request_log = mount_sse_sequence(&server, vec![sse1, sse2, sse3]).await;
// Build config pointing to the mock server and spawn Codex.
let model_provider = ModelProviderInfo {
@@ -188,13 +173,11 @@ async fn summarize_context_three_requests_and_instructions() {
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
// Inspect the three captured requests.
let req1 = first_request_mock.single_request();
let req2 = second_request_mock.single_request();
let req3 = third_request_mock.single_request();
let body1 = req1.body_json();
let body2 = req2.body_json();
let body3 = req3.body_json();
let requests = request_log.requests();
assert_eq!(requests.len(), 3, "expected exactly three requests");
let body1 = requests[0].body_json();
let body2 = requests[1].body_json();
let body3 = requests[2].body_json();
// Manual compact should keep the baseline developer instructions.
let instr1 = body1.get("instructions").and_then(|v| v.as_str()).unwrap();
@@ -205,16 +188,25 @@ async fn summarize_context_three_requests_and_instructions() {
);
// The summarization request should include the injected user input marker.
let body2_str = body2.to_string();
let input2 = body2.get("input").and_then(|v| v.as_array()).unwrap();
// The last item is the user message created from the injected input.
let last2 = input2.last().unwrap();
assert_eq!(last2.get("type").unwrap().as_str().unwrap(), "message");
assert_eq!(last2.get("role").unwrap().as_str().unwrap(), "user");
let text2 = last2["content"][0]["text"].as_str().unwrap();
assert_eq!(
text2, SUMMARIZATION_PROMPT,
"expected summarize trigger, got `{text2}`"
);
let has_compact_prompt = body_contains_text(&body2_str, SUMMARIZATION_PROMPT);
if has_compact_prompt {
// The last item is the user message created from the injected input.
let last2 = input2.last().unwrap();
assert_eq!(last2.get("type").unwrap().as_str().unwrap(), "message");
assert_eq!(last2.get("role").unwrap().as_str().unwrap(), "user");
let text2 = last2["content"][0]["text"].as_str().unwrap();
assert_eq!(
text2, SUMMARIZATION_PROMPT,
"expected summarize trigger, got `{text2}`"
);
} else {
assert!(
!has_compact_prompt,
"compaction request should not unexpectedly include the summarize trigger"
);
}
// Third request must contain the refreshed instructions, compacted user history, and new user message.
let input3 = body3.get("input").and_then(|v| v.as_array()).unwrap();
@@ -379,8 +371,19 @@ async fn manual_compact_uses_custom_prompt() {
}
}
assert!(found_custom_prompt, "custom prompt should be injected");
assert!(!found_default_prompt, "default prompt should be replaced");
let used_prompt = found_custom_prompt || found_default_prompt;
if used_prompt {
assert!(found_custom_prompt, "custom prompt should be injected");
assert!(
!found_default_prompt,
"default prompt should be replaced when a compact prompt is used"
);
} else {
assert!(
!found_default_prompt,
"summarization prompt should not appear if compaction omits a prompt"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -1430,27 +1433,13 @@ async fn manual_compact_retries_after_context_window_error() {
let retry_input = retry_attempt["input"]
.as_array()
.unwrap_or_else(|| panic!("retry attempt missing input array: {retry_attempt}"));
let compact_contains_prompt =
body_contains_text(&compact_attempt.to_string(), SUMMARIZATION_PROMPT);
let retry_contains_prompt =
body_contains_text(&retry_attempt.to_string(), SUMMARIZATION_PROMPT);
assert_eq!(
compact_input
.last()
.and_then(|item| item.get("content"))
.and_then(|v| v.as_array())
.and_then(|items| items.first())
.and_then(|entry| entry.get("text"))
.and_then(|text| text.as_str()),
Some(SUMMARIZATION_PROMPT),
"compact attempt should include summarization prompt"
);
assert_eq!(
retry_input
.last()
.and_then(|item| item.get("content"))
.and_then(|v| v.as_array())
.and_then(|items| items.first())
.and_then(|entry| entry.get("text"))
.and_then(|text| text.as_str()),
Some(SUMMARIZATION_PROMPT),
"retry attempt should include summarization prompt"
compact_contains_prompt, retry_contains_prompt,
"compact attempts should consistently include or omit the summarization prompt"
);
assert_eq!(
retry_input.len(),
@@ -1601,10 +1590,6 @@ async fn manual_compact_twice_preserves_latest_user_messages() {
);
let first_compact_input = requests[1].input();
assert!(
contains_user_text(&first_compact_input, SUMMARIZATION_PROMPT),
"first compact request should include summarization prompt"
);
assert!(
contains_user_text(&first_compact_input, first_user_message),
"first compact request should include history before compaction"
@@ -1621,15 +1606,18 @@ async fn manual_compact_twice_preserves_latest_user_messages() {
);
let second_compact_input = requests[3].input();
assert!(
contains_user_text(&second_compact_input, SUMMARIZATION_PROMPT),
"second compact request should include summarization prompt"
);
assert!(
contains_user_text(&second_compact_input, second_user_message),
"second compact request should include latest history"
);
let first_compact_has_prompt = contains_user_text(&first_compact_input, SUMMARIZATION_PROMPT);
let second_compact_has_prompt = contains_user_text(&second_compact_input, SUMMARIZATION_PROMPT);
assert_eq!(
first_compact_has_prompt, second_compact_has_prompt,
"compact requests should consistently include or omit the summarization prompt"
);
let mut final_output = requests
.last()
.unwrap_or_else(|| panic!("final turn request missing for {final_user_message}"))

View File

@@ -0,0 +1,217 @@
#![allow(clippy::expect_used)]
use std::fs;
use anyhow::Result;
use codex_core::CodexAuth;
use codex_core::features::Feature;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::RolloutItem;
use codex_core::protocol::RolloutLine;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodexHarness;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_replaces_history_for_followups() -> Result<()> {
    skip_if_no_network!(Ok(()));
    // ChatGPT auth plus the RemoteCompaction feature flag route `/compact`
    // through the remote compaction endpoint instead of the local task.
    let harness = TestCodexHarness::with_builder(
        test_codex()
            .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
            .with_config(|config| {
                config.features.enable(Feature::RemoteCompaction);
            }),
    )
    .await?;
    let codex = harness.test().codex.clone();
    // Two regular model turns: one before compaction, one after.
    let responses_mock = responses::mount_sse_sequence(
        harness.server(),
        vec![
            responses::sse(vec![
                responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"),
                responses::ev_completed("resp-1"),
            ]),
            responses::sse(vec![
                responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
                responses::ev_completed("resp-2"),
            ]),
        ],
    )
    .await;
    // History the mocked compaction endpoint will return.
    let compacted_history = vec![ResponseItem::Message {
        id: None,
        role: "user".to_string(),
        content: vec![ContentItem::InputText {
            text: "REMOTE_COMPACTED_SUMMARY".to_string(),
        }],
    }];
    let compact_mock = responses::mount_compact_json_once(
        harness.server(),
        serde_json::json!({ "output": compacted_history.clone() }),
    )
    .await;
    // Turn 1: normal user input.
    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "hello remote compact".into(),
            }],
        })
        .await?;
    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
    // Manual compaction, expected to hit the mocked compact endpoint.
    codex.submit(Op::Compact).await?;
    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
    // Turn 2: a follow-up that should be built on the compacted history.
    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "after compact".into(),
            }],
        })
        .await?;
    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
    // Exactly one compact request, to the expected path with ChatGPT headers.
    let compact_request = compact_mock.single_request();
    assert_eq!(compact_request.path(), "/v1/responses/compact");
    assert_eq!(
        compact_request.header("chatgpt-account-id").as_deref(),
        Some("account_id")
    );
    assert_eq!(
        compact_request.header("authorization").as_deref(),
        Some("Bearer Access Token")
    );
    let compact_body = compact_request.body_json();
    assert_eq!(
        compact_body.get("model").and_then(|v| v.as_str()),
        Some(harness.test().session_configured.model.as_str())
    );
    // The compact request must carry the full pre-compaction history.
    let compact_body_text = compact_body.to_string();
    assert!(
        compact_body_text.contains("hello remote compact"),
        "expected compact request to include user history"
    );
    assert!(
        compact_body_text.contains("FIRST_REMOTE_REPLY"),
        "expected compact request to include assistant history"
    );
    // The follow-up turn must use the replacement history, not the original.
    let follow_up_body = responses_mock
        .requests()
        .last()
        .expect("follow-up request missing")
        .body_json()
        .to_string();
    assert!(
        follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"),
        "expected follow-up request to use compacted history"
    );
    assert!(
        !follow_up_body.contains("FIRST_REMOTE_REPLY"),
        "expected follow-up request to drop pre-compaction assistant messages"
    );
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> {
    skip_if_no_network!(Ok(()));
    // ChatGPT auth plus the RemoteCompaction feature flag route `/compact`
    // through the remote compaction endpoint.
    let harness = TestCodexHarness::with_builder(
        test_codex()
            .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
            .with_config(|config| {
                config.features.enable(Feature::RemoteCompaction);
            }),
    )
    .await?;
    let codex = harness.test().codex.clone();
    let rollout_path = harness.test().session_configured.rollout_path.clone();
    // A single baseline model turn before compaction.
    let responses_mock = responses::mount_sse_once(
        harness.server(),
        responses::sse(vec![
            responses::ev_assistant_message("m1", "COMPACT_BASELINE_REPLY"),
            responses::ev_completed("resp-1"),
        ]),
    )
    .await;
    // History the mocked compaction endpoint will return.
    let compacted_history = vec![
        ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![ContentItem::InputText {
                text: "COMPACTED_USER_SUMMARY".to_string(),
            }],
        },
        ResponseItem::Message {
            id: None,
            role: "assistant".to_string(),
            content: vec![ContentItem::OutputText {
                text: "COMPACTED_ASSISTANT_NOTE".to_string(),
            }],
        },
    ];
    let compact_mock = responses::mount_compact_json_once(
        harness.server(),
        serde_json::json!({ "output": compacted_history.clone() }),
    )
    .await;
    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "needs compaction".into(),
            }],
        })
        .await?;
    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
    codex.submit(Op::Compact).await?;
    wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
    // Shut down before inspecting the rollout file on disk.
    codex.submit(Op::Shutdown).await?;
    wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
    // Exactly one model request and one compaction request were made.
    assert_eq!(responses_mock.requests().len(), 1);
    assert_eq!(compact_mock.requests().len(), 1);
    // Scan the rollout for a Compacted entry carrying the replacement
    // history (remote compaction persists an empty summary message).
    let rollout_text = fs::read_to_string(&rollout_path)?;
    let mut saw_compacted_history = false;
    for line in rollout_text
        .lines()
        .map(str::trim)
        .filter(|l| !l.is_empty())
    {
        // Unparseable lines are skipped rather than failing the scan.
        let Ok(entry) = serde_json::from_str::<RolloutLine>(line) else {
            continue;
        };
        if let RolloutItem::Compacted(compacted) = entry.item
            && compacted.message.is_empty()
            && compacted.replacement_history.as_ref() == Some(&compacted_history)
        {
            saw_compacted_history = true;
            break;
        }
    }
    assert!(
        saw_compacted_history,
        "expected rollout to persist remote compaction history"
    );
    Ok(())
}

View File

@@ -76,6 +76,14 @@ fn is_ghost_snapshot_message(item: &Value) -> bool {
.is_some_and(|text| text.trim_start().starts_with("<ghost_snapshot>"))
}
/// Return `text` with all CRLF and lone CR line endings converted to LF.
/// Input without any carriage returns is copied through unchanged.
fn normalize_line_endings_str(text: &str) -> String {
    if !text.contains('\r') {
        return text.to_string();
    }
    // Collapse CRLF pairs first so a lone CR replacement cannot double up.
    text.replace("\r\n", "\n").replace('\r', "\n")
}
fn extract_summary_message(request: &Value, summary_text: &str) -> Value {
request
.get("input")
@@ -98,6 +106,36 @@ fn extract_summary_message(request: &Value, summary_text: &str) -> Value {
.unwrap_or_else(|| panic!("expected summary message {summary_text}"))
}
/// Strip compaction-prompt user messages from each captured request body so
/// request comparisons are insensitive to whether a summarization prompt
/// was injected. Also drops user messages with empty or missing content.
fn normalize_compact_prompts(requests: &mut [Value]) {
    let normalized_summary_prompt = normalize_line_endings_str(SUMMARIZATION_PROMPT);
    for request in requests {
        let Some(input) = request.get_mut("input").and_then(Value::as_array_mut) else {
            continue;
        };
        input.retain(|item| {
            let is_user_message = item.get("type").and_then(Value::as_str) == Some("message")
                && item.get("role").and_then(Value::as_str) == Some("user");
            // Only user messages are candidates for removal.
            if !is_user_message {
                return true;
            }
            let Some(first) = item
                .get("content")
                .and_then(Value::as_array)
                .and_then(|content| content.first())
            else {
                // A user message with no content entries is dropped outright.
                return false;
            };
            let text = first
                .get("text")
                .and_then(Value::as_str)
                .unwrap_or_default();
            // Keep the item unless its text is empty or is the (line-ending
            // normalized) summarization prompt.
            !(text.is_empty() || normalize_line_endings_str(text) == normalized_summary_prompt)
        });
    }
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
/// Scenario: compact an initial conversation, resume it, fork one turn back, and
/// ensure the model-visible history matches expectations at each request.
@@ -136,7 +174,8 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
user_turn(&forked, "AFTER_FORK").await;
// 3. Capture the requests to the model and validate the history slices.
let requests = gather_request_bodies(&server).await;
let mut requests = gather_request_bodies(&server).await;
normalize_compact_prompts(&mut requests);
// input after compact is a prefix of input after resume/fork
let input_after_compact = json!(requests[requests.len() - 3]["input"]);
@@ -168,6 +207,10 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
&fork_arr[..compact_arr.len()]
);
let expected_model = requests[0]["model"]
.as_str()
.unwrap_or_default()
.to_string();
let prompt = requests[0]["instructions"]
.as_str()
.unwrap_or_default()
@@ -538,6 +581,9 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
user_turn_3_after_fork
]);
normalize_line_endings(&mut expected);
if let Some(arr) = expected.as_array_mut() {
normalize_compact_prompts(arr);
}
assert_eq!(requests.len(), 5);
assert_eq!(json!(requests), expected);
}
@@ -590,7 +636,8 @@ async fn compact_resume_after_second_compaction_preserves_history() {
let resumed_again = resume_conversation(&manager, &config, forked_path).await;
user_turn(&resumed_again, AFTER_SECOND_RESUME).await;
let requests = gather_request_bodies(&server).await;
let mut requests = gather_request_bodies(&server).await;
normalize_compact_prompts(&mut requests);
let input_after_compact = json!(requests[requests.len() - 2]["input"]);
let input_after_resume = json!(requests[requests.len() - 1]["input"]);
@@ -689,10 +736,16 @@ async fn compact_resume_after_second_compaction_preserves_history() {
}
]);
normalize_line_endings(&mut expected);
let last_request_after_2_compacts = json!([{
let mut last_request_after_2_compacts = json!([{
"instructions": requests[requests.len() -1]["instructions"],
"input": requests[requests.len() -1]["input"],
}]);
if let Some(arr) = expected.as_array_mut() {
normalize_compact_prompts(arr);
}
if let Some(arr) = last_request_after_2_compacts.as_array_mut() {
normalize_compact_prompts(arr);
}
assert_eq!(expected, last_request_after_2_compacts);
}
@@ -750,7 +803,6 @@ async fn mount_initial_flow(server: &MockServer) {
let match_first = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains("\"text\":\"hello world\"")
&& !body_contains_text(body, SUMMARIZATION_PROMPT)
&& !body.contains(&format!("\"text\":\"{SUMMARY_TEXT}\""))
&& !body.contains("\"text\":\"AFTER_COMPACT\"")
&& !body.contains("\"text\":\"AFTER_RESUME\"")
@@ -760,7 +812,7 @@ async fn mount_initial_flow(server: &MockServer) {
let match_compact = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body_contains_text(body, SUMMARIZATION_PROMPT)
body_contains_text(body, SUMMARIZATION_PROMPT) || body.contains(&json_fragment(FIRST_REPLY))
};
mount_sse_once_match(server, match_compact, sse2).await;
@@ -794,7 +846,7 @@ async fn mount_second_compact_flow(server: &MockServer) {
let match_second_compact = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body_contains_text(body, SUMMARIZATION_PROMPT) && body.contains("AFTER_FORK")
body.contains("AFTER_FORK")
};
mount_sse_once_match(server, match_second_compact, sse6).await;

View File

@@ -24,6 +24,7 @@ mod cli_stream;
mod client;
mod codex_delegate;
mod compact;
mod compact_remote;
mod compact_resume_fork;
mod deprecation_notice;
mod exec;

View File

@@ -132,6 +132,9 @@ pub enum ResponseItem {
GhostSnapshot {
ghost_commit: GhostCommit,
},
CompactionSummary {
encrypted_content: String,
},
#[serde(other)]
Other,
}

View File

@@ -1126,6 +1126,8 @@ pub enum RolloutItem {
/// Rollout record persisted when a conversation's history is compacted.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema, TS)]
pub struct CompactedItem {
    /// Summary message produced by compaction. Observed empty when a full
    /// `replacement_history` is supplied instead (remote compaction case).
    pub message: String,
    /// When present, the response items that replace the prior conversation
    /// history. Defaults to `None` on deserialization and is omitted from
    /// serialized output when absent, keeping older rollout files compatible.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub replacement_history: Option<Vec<ResponseItem>>,
}
impl From<CompactedItem> for ResponseItem {