Compare commits

...

3 Commits

Author SHA1 Message Date
Ahmed Ibrahim
377b251d59 tests 2025-10-24 13:32:54 -07:00
Ahmed Ibrahim
acd62cc610 centralized_processing 2025-10-24 13:15:00 -07:00
Ahmed Ibrahim
0e30347746 centralized_processing 2025-10-24 13:14:44 -07:00
9 changed files with 270 additions and 105 deletions

View File

@@ -59,7 +59,7 @@ use crate::client_common::ResponseEvent;
use crate::config::Config;
use crate::config_types::McpServerTransportConfig;
use crate::config_types::ShellEnvironmentPolicy;
use crate::conversation_history::ConversationHistory;
use crate::context_manager::ContextManager;
use crate::environment_context::EnvironmentContext;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
@@ -867,7 +867,7 @@ impl Session {
turn_context: &TurnContext,
rollout_items: &[RolloutItem],
) -> Vec<ResponseItem> {
let mut history = ConversationHistory::new();
let mut history = ContextManager::new();
for item in rollout_items {
match item {
RolloutItem::ResponseItem(response_item) => {
@@ -941,7 +941,7 @@ impl Session {
state.history_snapshot()
}
pub(crate) async fn clone_history(&self) -> ConversationHistory {
pub(crate) async fn clone_history(&self) -> ContextManager {
let state = self.state.lock().await;
state.clone_history()
}
@@ -1524,7 +1524,7 @@ pub(crate) async fn run_task(
// For normal turns, continue recording to the session history as before.
let is_review_mode = turn_context.is_review_mode;
let mut review_thread_history: ConversationHistory = ConversationHistory::new();
let mut review_thread_history: ContextManager = ContextManager::new();
if is_review_mode {
// Seed review threads with environment context so the model knows the working directory.
review_thread_history
@@ -2843,7 +2843,7 @@ mod tests {
turn_context: &TurnContext,
) -> (Vec<RolloutItem>, Vec<ResponseItem>) {
let mut rollout_items = Vec::new();
let mut live_history = ConversationHistory::new();
let mut live_history = ContextManager::new();
let initial_context = session.build_initial_context(turn_context);
for item in &initial_context {

View File

@@ -1,3 +1,4 @@
use crate::context_manager::truncation::truncate_context_output;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::TokenUsage;
@@ -6,13 +7,13 @@ use tracing::error;
/// Transcript of conversation history
#[derive(Debug, Clone, Default)]
pub(crate) struct ConversationHistory {
pub(crate) struct ContextManager {
/// The oldest items are at the beginning of the vector.
items: Vec<ResponseItem>,
token_info: Option<TokenUsageInfo>,
}
impl ConversationHistory {
impl ContextManager {
pub(crate) fn new() -> Self {
Self {
items: Vec::new(),
@@ -44,7 +45,8 @@ impl ConversationHistory {
continue;
}
self.items.push(item.clone());
let processed = Self::process_item(&item);
self.items.push(processed);
}
}
@@ -76,6 +78,29 @@ impl ConversationHistory {
self.remove_orphan_outputs();
}
fn process_item(item: &ResponseItem) -> ResponseItem {
match item {
ResponseItem::FunctionCallOutput { call_id, output } => {
let truncated_content = truncate_context_output(output.content.as_str());
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload {
content: truncated_content,
success: output.success,
},
}
}
ResponseItem::CustomToolCallOutput { call_id, output } => {
let truncated = truncate_context_output(output);
ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: truncated,
}
}
_ => item.clone(),
}
}
/// Returns a clone of the contents in the transcript.
fn contents(&self) -> Vec<ResponseItem> {
self.items.clone()
@@ -106,7 +131,7 @@ impl ConversationHistory {
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload {
content: "aborted".to_string(),
content: truncate_context_output("aborted"),
success: None,
},
},
@@ -129,7 +154,7 @@ impl ConversationHistory {
idx,
ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
output: "aborted".to_string(),
output: truncate_context_output("aborted"),
},
));
}
@@ -153,7 +178,7 @@ impl ConversationHistory {
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload {
content: "aborted".to_string(),
content: truncate_context_output("aborted"),
success: None,
},
},
@@ -249,7 +274,10 @@ impl ConversationHistory {
}
pub(crate) fn replace(&mut self, items: Vec<ResponseItem>) {
self.items = items;
self.items = items
.into_iter()
.map(|item| Self::process_item(&item))
.collect();
}
/// Removes the corresponding paired item for the provided `item`, if any.
@@ -362,6 +390,8 @@ fn is_api_message(message: &ResponseItem) -> bool {
#[cfg(test)]
mod tests {
use super::*;
use crate::context_manager::truncation::TELEMETRY_PREVIEW_MAX_BYTES;
use crate::context_manager::truncation::TELEMETRY_PREVIEW_TRUNCATION_NOTICE;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::LocalShellAction;
@@ -379,8 +409,8 @@ mod tests {
}
}
fn create_history_with_items(items: Vec<ResponseItem>) -> ConversationHistory {
let mut h = ConversationHistory::new();
fn create_history_with_items(items: Vec<ResponseItem>) -> ContextManager {
let mut h = ContextManager::new();
h.record_items(items.iter());
h
}
@@ -397,7 +427,7 @@ mod tests {
#[test]
fn filters_non_api_messages() {
let mut h = ConversationHistory::default();
let mut h = ContextManager::default();
// System message is not an API message; Other is ignored.
let system = ResponseItem::Message {
id: None,
@@ -435,6 +465,61 @@ mod tests {
);
}
#[test]
fn record_items_truncates_function_call_output() {
let mut h = ContextManager::new();
let long_content = "a".repeat(TELEMETRY_PREVIEW_MAX_BYTES + 32);
let item = ResponseItem::FunctionCallOutput {
call_id: "call-long".to_string(),
output: FunctionCallOutputPayload {
content: long_content.clone(),
success: Some(true),
},
};
h.record_items([&item]);
let stored = h.contents();
let ResponseItem::FunctionCallOutput { output, .. } = &stored[0] else {
panic!("expected FunctionCallOutput variant");
};
assert!(
output
.content
.ends_with(TELEMETRY_PREVIEW_TRUNCATION_NOTICE),
"truncated content should end with notice"
);
assert!(
output.content.len() < long_content.len(),
"content should shrink after truncation"
);
}
#[test]
fn record_items_truncates_custom_tool_output() {
let mut h = ContextManager::new();
let long_content = "b".repeat(TELEMETRY_PREVIEW_MAX_BYTES + 64);
let item = ResponseItem::CustomToolCallOutput {
call_id: "custom-long".to_string(),
output: long_content.clone(),
};
h.record_items([&item]);
let stored = h.contents();
let ResponseItem::CustomToolCallOutput { output, .. } = &stored[0] else {
panic!("expected CustomToolCallOutput variant");
};
assert!(
output.ends_with(TELEMETRY_PREVIEW_TRUNCATION_NOTICE),
"truncated output should end with notice"
);
assert!(
output.len() < long_content.len(),
"output should shrink after truncation"
);
}
#[test]
fn remove_first_item_removes_matching_output_for_function_call() {
let items = vec![

View File

@@ -0,0 +1,3 @@
mod manager;
pub(crate) use manager::ContextManager;
pub mod truncation;

View File

@@ -0,0 +1,159 @@
use codex_utils_string::take_bytes_at_char_boundary;
#[derive(Clone, Copy)]
pub(crate) struct TruncationConfig {
pub max_bytes: usize,
pub max_lines: usize,
pub truncation_notice: &'static str,
}
// Telemetry preview limits: keep log events smaller than model budgets.
pub(crate) const TELEMETRY_PREVIEW_MAX_BYTES: usize = 2 * 1024; // 2 KiB
pub(crate) const TELEMETRY_PREVIEW_MAX_LINES: usize = 64; // lines
pub(crate) const TELEMETRY_PREVIEW_TRUNCATION_NOTICE: &str =
"[... telemetry preview truncated ...]";
pub(crate) const CONTEXT_OUTPUT_TRUNCATION: TruncationConfig = TruncationConfig {
max_bytes: TELEMETRY_PREVIEW_MAX_BYTES,
max_lines: TELEMETRY_PREVIEW_MAX_LINES,
truncation_notice: TELEMETRY_PREVIEW_TRUNCATION_NOTICE,
};
pub(crate) fn truncate_with_config(content: &str, config: TruncationConfig) -> String {
let TruncationConfig {
max_bytes,
max_lines,
truncation_notice,
} = config;
let truncated_slice = take_bytes_at_char_boundary(content, max_bytes);
let truncated_by_bytes = truncated_slice.len() < content.len();
let mut preview = String::new();
let mut lines_iter = truncated_slice.lines();
for idx in 0..max_lines {
match lines_iter.next() {
Some(line) => {
if idx > 0 {
preview.push('\n');
}
preview.push_str(line);
}
None => break,
}
}
let truncated_by_lines = lines_iter.next().is_some();
if !truncated_by_bytes && !truncated_by_lines {
return content.to_string();
}
if preview.len() < truncated_slice.len()
&& truncated_slice
.as_bytes()
.get(preview.len())
.is_some_and(|byte| *byte == b'\n')
{
preview.push('\n');
}
if !preview.is_empty() && !preview.ends_with('\n') {
preview.push('\n');
}
preview.push_str(truncation_notice);
preview
}
pub(crate) fn truncate_context_output(content: &str) -> String {
truncate_with_config(content, CONTEXT_OUTPUT_TRUNCATION)
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn truncate_with_config_returns_original_within_limits() {
let content = "short output";
let config = TruncationConfig {
max_bytes: 64,
max_lines: 5,
truncation_notice: "[notice]",
};
assert_eq!(truncate_with_config(content, config), content);
}
#[test]
fn truncate_with_config_truncates_by_bytes() {
let config = TruncationConfig {
max_bytes: 16,
max_lines: 10,
truncation_notice: "[notice]",
};
let content = "abcdefghijklmnopqrstuvwxyz";
let truncated = truncate_with_config(content, config);
assert!(truncated.contains("[notice]"));
}
#[test]
fn truncate_with_config_truncates_by_lines() {
let config = TruncationConfig {
max_bytes: 1024,
max_lines: 2,
truncation_notice: "[notice]",
};
let content = "l1\nl2\nl3\nl4";
let truncated = truncate_with_config(content, config);
assert!(truncated.lines().count() <= 3);
assert!(truncated.contains("[notice]"));
}
#[test]
fn telemetry_preview_returns_original_within_limits() {
let content = "short output";
let config = TruncationConfig {
max_bytes: TELEMETRY_PREVIEW_MAX_BYTES,
max_lines: TELEMETRY_PREVIEW_MAX_LINES,
truncation_notice: TELEMETRY_PREVIEW_TRUNCATION_NOTICE,
};
assert_eq!(truncate_with_config(content, config), content);
}
#[test]
fn telemetry_preview_truncates_by_bytes() {
let config = TruncationConfig {
max_bytes: TELEMETRY_PREVIEW_MAX_BYTES,
max_lines: TELEMETRY_PREVIEW_MAX_LINES,
truncation_notice: TELEMETRY_PREVIEW_TRUNCATION_NOTICE,
};
let content = "x".repeat(TELEMETRY_PREVIEW_MAX_BYTES + 8);
let preview = truncate_with_config(&content, config);
assert!(preview.contains(TELEMETRY_PREVIEW_TRUNCATION_NOTICE));
assert!(
preview.len()
<= TELEMETRY_PREVIEW_MAX_BYTES + TELEMETRY_PREVIEW_TRUNCATION_NOTICE.len() + 1
);
}
#[test]
fn telemetry_preview_truncates_by_lines() {
let config = TruncationConfig {
max_bytes: TELEMETRY_PREVIEW_MAX_BYTES,
max_lines: TELEMETRY_PREVIEW_MAX_LINES,
truncation_notice: TELEMETRY_PREVIEW_TRUNCATION_NOTICE,
};
let content = (0..(TELEMETRY_PREVIEW_MAX_LINES + 5))
.map(|idx| format!("line {idx}"))
.collect::<Vec<_>>()
.join("\n");
let preview = truncate_with_config(&content, config);
let lines: Vec<&str> = preview.lines().collect();
assert!(lines.len() <= TELEMETRY_PREVIEW_MAX_LINES + 1);
assert_eq!(lines.last(), Some(&TELEMETRY_PREVIEW_TRUNCATION_NOTICE));
}
}

View File

@@ -20,7 +20,7 @@ pub mod config_edit;
pub mod config_loader;
pub mod config_profile;
pub mod config_types;
mod conversation_history;
mod context_manager;
pub mod custom_prompts;
mod environment_context;
pub mod error;

View File

@@ -1,5 +1,5 @@
use crate::codex::Session;
use crate::conversation_history::ConversationHistory;
use crate::context_manager::ContextManager;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
@@ -11,7 +11,7 @@ use tracing::warn;
pub(crate) async fn process_items(
processed_items: Vec<crate::codex::ProcessedResponseItem>,
is_review_mode: bool,
review_thread_history: &mut ConversationHistory,
review_thread_history: &mut ContextManager,
sess: &Session,
) -> (Vec<ResponseInputItem>, Vec<ResponseItem>) {
let mut items_to_record_in_conversation_history = Vec::<ResponseItem>::new();

View File

@@ -3,7 +3,7 @@
use codex_protocol::models::ResponseItem;
use crate::codex::SessionConfiguration;
use crate::conversation_history::ConversationHistory;
use crate::context_manager::ContextManager;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::TokenUsage;
use crate::protocol::TokenUsageInfo;
@@ -11,7 +11,7 @@ use crate::protocol::TokenUsageInfo;
/// Persistent, session-scoped state previously stored directly on `Session`.
pub(crate) struct SessionState {
pub(crate) session_configuration: SessionConfiguration,
pub(crate) history: ConversationHistory,
pub(crate) history: ContextManager,
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
}
@@ -20,7 +20,7 @@ impl SessionState {
pub(crate) fn new(session_configuration: SessionConfiguration) -> Self {
Self {
session_configuration,
history: ConversationHistory::new(),
history: ContextManager::new(),
latest_rate_limits: None,
}
}
@@ -38,7 +38,7 @@ impl SessionState {
self.history.get_history()
}
pub(crate) fn clone_history(&self) -> ConversationHistory {
pub(crate) fn clone_history(&self) -> ContextManager {
self.history.clone()
}

View File

@@ -1,15 +1,11 @@
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::tools::TELEMETRY_PREVIEW_MAX_BYTES;
use crate::tools::TELEMETRY_PREVIEW_MAX_LINES;
use crate::tools::TELEMETRY_PREVIEW_TRUNCATION_NOTICE;
use crate::turn_diff_tracker::TurnDiffTracker;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ShellToolCallParams;
use codex_protocol::protocol::FileChange;
use codex_utils_string::take_bytes_at_char_boundary;
use mcp_types::CallToolResult;
use std::borrow::Cow;
use std::collections::HashMap;
@@ -76,7 +72,7 @@ pub enum ToolOutput {
impl ToolOutput {
pub fn log_preview(&self) -> String {
match self {
ToolOutput::Function { content, .. } => telemetry_preview(content),
ToolOutput::Function { content, .. } => content.clone(),
ToolOutput::Mcp { result } => format!("{result:?}"),
}
}
@@ -111,46 +107,6 @@ impl ToolOutput {
}
}
fn telemetry_preview(content: &str) -> String {
let truncated_slice = take_bytes_at_char_boundary(content, TELEMETRY_PREVIEW_MAX_BYTES);
let truncated_by_bytes = truncated_slice.len() < content.len();
let mut preview = String::new();
let mut lines_iter = truncated_slice.lines();
for idx in 0..TELEMETRY_PREVIEW_MAX_LINES {
match lines_iter.next() {
Some(line) => {
if idx > 0 {
preview.push('\n');
}
preview.push_str(line);
}
None => break,
}
}
let truncated_by_lines = lines_iter.next().is_some();
if !truncated_by_bytes && !truncated_by_lines {
return content.to_string();
}
if preview.len() < truncated_slice.len()
&& truncated_slice
.as_bytes()
.get(preview.len())
.is_some_and(|byte| *byte == b'\n')
{
preview.push('\n');
}
if !preview.is_empty() && !preview.ends_with('\n') {
preview.push('\n');
}
preview.push_str(TELEMETRY_PREVIEW_TRUNCATION_NOTICE);
preview
}
#[cfg(test)]
mod tests {
use super::*;
@@ -196,38 +152,6 @@ mod tests {
other => panic!("expected FunctionCallOutput, got {other:?}"),
}
}
#[test]
fn telemetry_preview_returns_original_within_limits() {
let content = "short output";
assert_eq!(telemetry_preview(content), content);
}
#[test]
fn telemetry_preview_truncates_by_bytes() {
let content = "x".repeat(TELEMETRY_PREVIEW_MAX_BYTES + 8);
let preview = telemetry_preview(&content);
assert!(preview.contains(TELEMETRY_PREVIEW_TRUNCATION_NOTICE));
assert!(
preview.len()
<= TELEMETRY_PREVIEW_MAX_BYTES + TELEMETRY_PREVIEW_TRUNCATION_NOTICE.len() + 1
);
}
#[test]
fn telemetry_preview_truncates_by_lines() {
let content = (0..(TELEMETRY_PREVIEW_MAX_LINES + 5))
.map(|idx| format!("line {idx}"))
.collect::<Vec<_>>()
.join("\n");
let preview = telemetry_preview(&content);
let lines: Vec<&str> = preview.lines().collect();
assert!(lines.len() <= TELEMETRY_PREVIEW_MAX_LINES + 1);
assert_eq!(lines.last(), Some(&TELEMETRY_PREVIEW_TRUNCATION_NOTICE));
}
}
#[derive(Clone, Debug)]

View File

@@ -22,12 +22,6 @@ pub(crate) const MODEL_FORMAT_HEAD_LINES: usize = MODEL_FORMAT_MAX_LINES / 2;
pub(crate) const MODEL_FORMAT_TAIL_LINES: usize = MODEL_FORMAT_MAX_LINES - MODEL_FORMAT_HEAD_LINES; // 128
pub(crate) const MODEL_FORMAT_HEAD_BYTES: usize = MODEL_FORMAT_MAX_BYTES / 2;
// Telemetry preview limits: keep log events smaller than model budgets.
pub(crate) const TELEMETRY_PREVIEW_MAX_BYTES: usize = 2 * 1024; // 2 KiB
pub(crate) const TELEMETRY_PREVIEW_MAX_LINES: usize = 64; // lines
pub(crate) const TELEMETRY_PREVIEW_TRUNCATION_NOTICE: &str =
"[... telemetry preview truncated ...]";
/// Format the combined exec output for sending back to the model.
/// Includes exit code and duration metadata; truncates large bodies safely.
pub fn format_exec_output_for_model(exec_output: &ExecToolCallOutput) -> String {