#![allow(clippy::expect_used)] use std::fs; use anyhow::Result; use codex_core::CodexAuth; use codex_core::features::Feature; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; use codex_core::protocol::RolloutLine; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; use core_test_support::responses; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodexHarness; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_compact_replaces_history_for_followups() -> Result<()> { skip_if_no_network!(Ok(())); let harness = TestCodexHarness::with_builder( test_codex() .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) .with_config(|config| { config.features.enable(Feature::RemoteCompaction); }), ) .await?; let codex = harness.test().codex.clone(); let responses_mock = responses::mount_sse_sequence( harness.server(), vec![ responses::sse(vec![ responses::ev_assistant_message("m1", "FIRST_REMOTE_REPLY"), responses::ev_completed("resp-1"), ]), responses::sse(vec![ responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"), responses::ev_completed("resp-2"), ]), ], ) .await; let compacted_history = vec![ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "REMOTE_COMPACTED_SUMMARY".to_string(), }], }]; let compact_mock = responses::mount_compact_json_once( harness.server(), serde_json::json!({ "output": compacted_history.clone() }), ) .await; codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "hello remote compact".into(), }], }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; codex.submit(Op::Compact).await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "after compact".into(), }], }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; let compact_request = compact_mock.single_request(); assert_eq!(compact_request.path(), "/v1/responses/compact"); assert_eq!( compact_request.header("chatgpt-account-id").as_deref(), Some("account_id") ); assert_eq!( compact_request.header("authorization").as_deref(), Some("Bearer Access Token") ); let compact_body = compact_request.body_json(); assert_eq!( compact_body.get("model").and_then(|v| v.as_str()), Some(harness.test().session_configured.model.as_str()) ); let compact_body_text = compact_body.to_string(); assert!( compact_body_text.contains("hello remote compact"), "expected compact request to include user history" ); assert!( compact_body_text.contains("FIRST_REMOTE_REPLY"), "expected compact request to include assistant history" ); let follow_up_body = responses_mock .requests() .last() .expect("follow-up request missing") .body_json() .to_string(); assert!( follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"), "expected follow-up request to use compacted history" ); assert!( !follow_up_body.contains("FIRST_REMOTE_REPLY"), "expected follow-up request to drop pre-compaction assistant messages" ); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> { skip_if_no_network!(Ok(())); let harness = TestCodexHarness::with_builder( test_codex() .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) .with_config(|config| { config.features.enable(Feature::RemoteCompaction); }), ) .await?; let codex = harness.test().codex.clone(); let rollout_path = harness.test().session_configured.rollout_path.clone(); let responses_mock = responses::mount_sse_once( harness.server(), responses::sse(vec![ responses::ev_assistant_message("m1", "COMPACT_BASELINE_REPLY"), responses::ev_completed("resp-1"), ]), ) .await; let compacted_history = vec![ ResponseItem::Message { id: None, role: "user".to_string(), content: vec![ContentItem::InputText { text: "COMPACTED_USER_SUMMARY".to_string(), }], }, ResponseItem::Message { id: None, role: "assistant".to_string(), content: vec![ContentItem::OutputText { text: "COMPACTED_ASSISTANT_NOTE".to_string(), }], }, ]; let compact_mock = responses::mount_compact_json_once( harness.server(), serde_json::json!({ "output": compacted_history.clone() }), ) .await; codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "needs compaction".into(), }], }) .await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; codex.submit(Op::Compact).await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; codex.submit(Op::Shutdown).await?; wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; assert_eq!(responses_mock.requests().len(), 1); assert_eq!(compact_mock.requests().len(), 1); let rollout_text = fs::read_to_string(&rollout_path)?; let mut saw_compacted_history = false; for line in rollout_text .lines() .map(str::trim) .filter(|l| !l.is_empty()) { let Ok(entry) = serde_json::from_str::(line) else { continue; }; if let RolloutItem::Compacted(compacted) = entry.item && compacted.message.is_empty() && compacted.replacement_history.as_ref() == Some(&compacted_history) { saw_compacted_history = true; break; } } assert!( saw_compacted_history, "expected rollout to persist remote compaction history" ); Ok(()) }