Compare commits

...

2 Commits

Author SHA1 Message Date
jif-oai
03c5bfdd28 mvoes 2026-04-29 17:17:19 +01:00
jif-oai
63402cbc3b feat: codex journal 2026-04-29 13:09:30 +01:00
47 changed files with 3939 additions and 1023 deletions

19
codex-rs/Cargo.lock generated
View File

@@ -2374,6 +2374,7 @@ dependencies = [
"codex-feedback",
"codex-git-utils",
"codex-hooks",
"codex-journal",
"codex-login",
"codex-mcp",
"codex-memories-read",
@@ -2815,6 +2816,23 @@ dependencies = [
"tempfile",
]
[[package]]
name = "codex-journal"
version = "0.0.0"
dependencies = [
"base64 0.22.1",
"codex-protocol",
"codex-utils-cache",
"codex-utils-output-truncation",
"image",
"indexmap 2.13.0",
"pretty_assertions",
"serde_json",
"tempfile",
"thiserror 2.0.18",
"tracing",
]
[[package]]
name = "codex-keyring-store"
version = "0.0.0"
@@ -3484,6 +3502,7 @@ dependencies = [
"async-trait",
"chrono",
"codex-git-utils",
"codex-journal",
"codex-protocol",
"codex-rollout",
"codex-state",

View File

@@ -98,6 +98,7 @@ members = [
"terminal-detection",
"test-binary-support",
"thread-store",
"journal",
"uds",
"codex-experimental-api-macros",
"plugin",
@@ -190,6 +191,7 @@ codex-stdio-to-uds = { path = "stdio-to-uds" }
codex-terminal-detection = { path = "terminal-detection" }
codex-test-binary-support = { path = "test-binary-support" }
codex-thread-store = { path = "thread-store" }
codex-journal = { path = "journal" }
codex-tools = { path = "tools" }
codex-tui = { path = "tui" }
codex-uds = { path = "uds" }

View File

@@ -207,7 +207,7 @@ pub struct ConfigToml {
/// Ordered list of fallback filenames to look for when AGENTS.md is missing.
pub project_doc_fallback_filenames: Option<Vec<String>>,
/// Token budget applied when storing tool/function outputs in the context manager.
/// Token budget applied when storing tool/function outputs in thread history.
pub tool_output_token_limit: Option<usize>,
/// Maximum poll window for background terminal output (`write_stdin`), in milliseconds.

View File

@@ -60,6 +60,7 @@ codex-sandboxing = { workspace = true }
codex-state = { workspace = true }
codex-terminal-detection = { workspace = true }
codex-thread-store = { workspace = true }
codex-journal = { workspace = true }
codex-tools = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-cache = { workspace = true }

View File

@@ -3885,7 +3885,7 @@
"type": "boolean"
},
"tool_output_token_limit": {
"description": "Token budget applied when storing tool/function outputs in the context manager.",
"description": "Token budget applied when storing tool/function outputs in thread history.",
"format": "uint",
"minimum": 0.0,
"type": "integer"
@@ -3946,4 +3946,4 @@
},
"title": "ConfigToml",
"type": "object"
}
}

View File

@@ -7,8 +7,10 @@ use crate::config::Config;
use crate::config::ConfigBuilder;
use crate::context::ContextualUserFragment;
use crate::context::SubagentNotification;
use crate::state::history as session_history;
use assert_matches::assert_matches;
use codex_features::Feature;
use codex_journal::Journal;
use codex_login::CodexAuth;
use codex_protocol::AgentPath;
use codex_protocol::config_types::ModeKind;
@@ -34,6 +36,16 @@ use tokio::time::sleep;
use tokio::time::timeout;
use toml::Value as TomlValue;
trait JournalTestExt {
fn raw_items(&self) -> Vec<ResponseItem>;
}
impl JournalTestExt for Journal {
fn raw_items(&self) -> Vec<ResponseItem> {
session_history::raw_items(self)
}
}
async fn test_config_with_cli_overrides(
cli_overrides: Vec<(String, TomlValue)>,
) -> (TempDir, Config) {
@@ -796,8 +808,9 @@ async fn spawn_agent_fork_flushes_parent_rollout_before_loading_history() {
.await
.expect("child thread should be registered");
let history = child_thread.codex.session.clone_history().await;
let history_items = history.raw_items();
assert!(
history_contains_text(history.raw_items(), "unflushed final answer"),
history_contains_text(&history_items, "unflushed final answer"),
"forked child history should include unflushed assistant final answers after flushing the parent rollout"
);
@@ -906,21 +919,22 @@ async fn spawn_agent_fork_last_n_turns_keeps_only_recent_turns() {
.await
.expect("child thread should be registered");
let history = child_thread.codex.session.clone_history().await;
let history_items = history.raw_items();
assert!(
!history_contains_text(history.raw_items(), "old parent context"),
!history_contains_text(&history_items, "old parent context"),
"forked child history should drop parent context outside the requested last-N turn window"
);
assert!(
!history_contains_text(history.raw_items(), "queued message"),
!history_contains_text(&history_items, "queued message"),
"forked child history should drop queued inter-agent messages outside the requested last-N turn window"
);
assert!(
!history_contains_text(history.raw_items(), "triggered context"),
!history_contains_text(&history_items, "triggered context"),
"forked child history should filter assistant inter-agent messages even when they fall inside the requested last-N turn window"
);
assert!(
history_contains_text(history.raw_items(), "current parent task"),
history_contains_text(&history_items, "current parent task"),
"forked child history should keep the parent user message from the requested last-N turn window"
);

View File

@@ -9,6 +9,7 @@ use crate::compact::content_items_to_text;
use crate::event_mapping::is_contextual_user_message_content;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::history as session_history;
use codex_login::default_client::build_reqwest_client;
use codex_protocol::models::MessagePhase;
use codex_protocol::models::ResponseItem;
@@ -224,7 +225,8 @@ async fn build_arc_monitor_request(
protection_client_callsite: &'static str,
) -> ArcMonitorRequest {
let history = sess.clone_history().await;
let mut messages = build_arc_monitor_messages(history.raw_items());
let history_items = session_history::raw_items(&history);
let mut messages = build_arc_monitor_messages(&history_items);
if messages.is_empty() {
messages.push(build_arc_monitor_message(
"user",

View File

@@ -9,6 +9,7 @@ use crate::session::PreviousTurnSettings;
use crate::session::session::Session;
use crate::session::turn::get_last_assistant_message_from_turn;
use crate::session::turn_context::TurnContext;
use crate::state::history as session_history;
use crate::util::backoff;
use codex_analytics::CodexCompactionEvent;
use codex_analytics::CompactionImplementation;
@@ -160,7 +161,8 @@ async fn run_compact_task_inner_impl(
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
history.record_items(
session_history::record_items(
&mut history,
&[initial_input_for_turn.into()],
turn_context.truncation_policy,
);
@@ -176,9 +178,8 @@ async fn run_compact_task_inner_impl(
loop {
// Clone is required because of the loop
let turn_input = history
.clone()
.for_prompt(&turn_context.model_info.input_modalities);
let turn_input =
session_history::for_prompt(&history, &turn_context.model_info.input_modalities);
let turn_input_len = turn_input.len();
let prompt = Prompt {
input: turn_input,
@@ -218,7 +219,7 @@ async fn run_compact_task_inner_impl(
error!(
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
);
history.remove_first_item();
session_history::remove_first_item(&mut history);
truncated_count += 1;
retries = 0;
continue;
@@ -250,10 +251,11 @@ async fn run_compact_task_inner_impl(
}
let history_snapshot = sess.clone_history().await;
let history_items = history_snapshot.raw_items();
let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default();
let history_items = session_history::raw_items(&history_snapshot);
let summary_suffix =
get_last_assistant_message_from_turn(history_items.as_slice()).unwrap_or_default();
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
let user_messages = collect_user_messages(history_items);
let user_messages = collect_user_messages(history_items.as_slice());
let mut new_history = build_compacted_history(Vec::new(), &user_messages, &summary_text);

View File

@@ -6,17 +6,18 @@ use crate::compact::CompactionAnalyticsAttempt;
use crate::compact::InitialContextInjection;
use crate::compact::compaction_status_from_result;
use crate::compact::insert_initial_context_before_last_real_user_or_summary;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::context_manager::estimate_response_item_model_visible_bytes;
use crate::context_manager::is_codex_generated_item;
use crate::session::session::Session;
use crate::session::turn::built_tools;
use crate::session::turn_context::TurnContext;
use crate::state::TotalTokenUsageBreakdown;
use crate::state::history as session_history;
use codex_analytics::CompactionImplementation;
use codex_analytics::CompactionPhase;
use codex_analytics::CompactionReason;
use codex_analytics::CompactionTrigger;
use codex_journal::Journal;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::ContextCompactionItem;
@@ -144,8 +145,9 @@ async fn run_remote_compact_task_inner_impl(
// This is the history selected for remote compaction, after any trimming required to fit the
// compact endpoint. The checkpoint below records it separately from the next sampling request,
// whose prompt will repeat current developer/context prefix items.
let trace_input_history = history.raw_items().to_vec();
let prompt_input = history.for_prompt(&turn_context.model_info.input_modalities);
let trace_input_history = session_history::raw_items(&history);
let prompt_input =
session_history::for_prompt(&history, &turn_context.model_info.input_modalities);
let tool_router = built_tools(
sess.as_ref(),
turn_context.as_ref(),
@@ -325,7 +327,7 @@ fn log_remote_compact_failure(
}
fn trim_function_call_history_to_fit_context_window(
history: &mut ContextManager,
history: &mut Journal,
turn_context: &TurnContext,
base_instructions: &BaseInstructions,
) -> usize {
@@ -334,17 +336,17 @@ fn trim_function_call_history_to_fit_context_window(
return deleted_items;
};
while history
.estimate_token_count_with_base_instructions(base_instructions)
while session_history::estimate_token_count_with_base_instructions(history, base_instructions)
.is_some_and(|estimated_tokens| estimated_tokens > context_window)
{
let Some(last_item) = history.raw_items().last() else {
let history_items = session_history::raw_items(history);
let Some(last_item) = history_items.last() else {
break;
};
if !is_codex_generated_item(last_item) {
break;
}
if !history.remove_last_item() {
if !session_history::remove_last_item(history) {
break;
}
deleted_items += 1;

View File

@@ -559,7 +559,7 @@ pub struct Config {
/// Additional filenames to try when looking for project-level docs.
pub project_doc_fallback_filenames: Vec<String>,
/// Token budget applied when storing tool/function outputs in the context manager.
/// Token budget applied when storing tool/function outputs in thread history.
pub tool_output_token_limit: Option<usize>,
/// Maximum number of agent threads that can be open concurrently.

View File

@@ -1,726 +0,0 @@
use crate::context_manager::normalize;
use crate::event_mapping::has_non_contextual_dev_message_content;
use crate::event_mapping::is_contextual_dev_message_content;
use crate::event_mapping::is_contextual_user_message_content;
use crate::session::turn_context::TurnContext;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ImageDetail;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::InputModality;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use codex_protocol::protocol::TurnContextItem;
use codex_utils_cache::BlockingLruCache;
use codex_utils_cache::sha1_digest;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::approx_bytes_for_tokens;
use codex_utils_output_truncation::approx_token_count;
use codex_utils_output_truncation::approx_tokens_from_byte_count_i64;
use codex_utils_output_truncation::truncate_function_output_items_with_policy;
use codex_utils_output_truncation::truncate_text;
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::sync::LazyLock;
/// Transcript of thread history
#[derive(Debug, Clone, Default)]
pub(crate) struct ContextManager {
/// The oldest items are at the beginning of the vector.
items: Vec<ResponseItem>,
/// Bumped whenever history is rewritten, such as compaction or rollback.
history_version: u64,
token_info: Option<TokenUsageInfo>,
/// Reference context snapshot used for diffing and producing model-visible
/// settings update items.
///
/// This is the baseline for the next regular model turn, and may already
/// match the current turn after context updates are persisted.
///
/// When this is `None`, settings diffing treats the next turn as having no
/// baseline and emits a full reinjection of context state. Rollback may
/// also clear this when it trims a mixed initial-context developer bundle
/// whose non-diff fragments no longer exist in the surviving history.
reference_context_item: Option<TurnContextItem>,
}
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct TotalTokenUsageBreakdown {
pub last_api_response_total_tokens: i64,
pub all_history_items_model_visible_bytes: i64,
pub estimated_tokens_of_items_added_since_last_successful_api_response: i64,
pub estimated_bytes_of_items_added_since_last_successful_api_response: i64,
}
impl ContextManager {
pub(crate) fn new() -> Self {
Self {
items: Vec::new(),
history_version: 0,
token_info: TokenUsageInfo::new_or_append(
&None, &None, /*model_context_window*/ None,
),
reference_context_item: None,
}
}
pub(crate) fn token_info(&self) -> Option<TokenUsageInfo> {
self.token_info.clone()
}
pub(crate) fn set_token_info(&mut self, info: Option<TokenUsageInfo>) {
self.token_info = info;
}
pub(crate) fn set_reference_context_item(&mut self, item: Option<TurnContextItem>) {
self.reference_context_item = item;
}
pub(crate) fn reference_context_item(&self) -> Option<TurnContextItem> {
self.reference_context_item.clone()
}
pub(crate) fn set_token_usage_full(&mut self, context_window: i64) {
match &mut self.token_info {
Some(info) => info.fill_to_context_window(context_window),
None => {
self.token_info = Some(TokenUsageInfo::full_context_window(context_window));
}
}
}
/// `items` is ordered from oldest to newest.
pub(crate) fn record_items<I>(&mut self, items: I, policy: TruncationPolicy)
where
I: IntoIterator,
I::Item: std::ops::Deref<Target = ResponseItem>,
{
for item in items {
let item_ref = item.deref();
if !is_api_message(item_ref) {
continue;
}
let processed = self.process_item(item_ref, policy);
self.items.push(processed);
}
}
/// Returns the history prepared for sending to the model. This applies a proper
/// normalization and drops un-suited items. When `input_modalities` does not
/// include `InputModality::Image`, images are stripped from messages and tool
/// outputs.
pub(crate) fn for_prompt(mut self, input_modalities: &[InputModality]) -> Vec<ResponseItem> {
self.normalize_history(input_modalities);
self.items
}
/// Returns raw items in the history.
pub(crate) fn raw_items(&self) -> &[ResponseItem] {
&self.items
}
pub(crate) fn history_version(&self) -> u64 {
self.history_version
}
// Estimate token usage using byte-based heuristics from the truncation helpers.
// This is a coarse lower bound, not a tokenizer-accurate count.
pub(crate) fn estimate_token_count(&self, turn_context: &TurnContext) -> Option<i64> {
let model_info = &turn_context.model_info;
let personality = turn_context.personality.or(turn_context.config.personality);
let base_instructions = BaseInstructions {
text: model_info.get_model_instructions(personality),
};
self.estimate_token_count_with_base_instructions(&base_instructions)
}
pub(crate) fn estimate_token_count_with_base_instructions(
&self,
base_instructions: &BaseInstructions,
) -> Option<i64> {
let base_tokens =
i64::try_from(approx_token_count(&base_instructions.text)).unwrap_or(i64::MAX);
let items_tokens = self
.items
.iter()
.map(estimate_item_token_count)
.fold(0i64, i64::saturating_add);
Some(base_tokens.saturating_add(items_tokens))
}
pub(crate) fn remove_first_item(&mut self) {
if !self.items.is_empty() {
// Remove the oldest item (front of the list). Items are ordered from
// oldest → newest, so index 0 is the first entry recorded.
let removed = self.items.remove(0);
// If the removed item participates in a call/output pair, also remove
// its corresponding counterpart to keep the invariants intact without
// running a full normalization pass.
normalize::remove_corresponding_for(&mut self.items, &removed);
}
}
pub(crate) fn remove_last_item(&mut self) -> bool {
if let Some(removed) = self.items.pop() {
normalize::remove_corresponding_for(&mut self.items, &removed);
self.history_version = self.history_version.saturating_add(1);
true
} else {
false
}
}
pub(crate) fn replace(&mut self, items: Vec<ResponseItem>) {
self.items = items;
self.history_version = self.history_version.saturating_add(1);
}
/// Replace image content in the last turn if it originated from a tool output.
/// Returns true when a tool image was replaced, false otherwise.
pub(crate) fn replace_last_turn_images(&mut self, placeholder: &str) -> bool {
let Some(index) = self.items.iter().rposition(|item| {
matches!(item, ResponseItem::FunctionCallOutput { .. }) || is_user_turn_boundary(item)
}) else {
return false;
};
match &mut self.items[index] {
ResponseItem::FunctionCallOutput { output, .. } => {
let Some(content_items) = output.content_items_mut() else {
return false;
};
let mut replaced = false;
let placeholder = placeholder.to_string();
for item in content_items.iter_mut() {
if matches!(item, FunctionCallOutputContentItem::InputImage { .. }) {
*item = FunctionCallOutputContentItem::InputText {
text: placeholder.clone(),
};
replaced = true;
}
}
if replaced {
self.history_version = self.history_version.saturating_add(1);
}
replaced
}
ResponseItem::Message { .. } => false,
_ => false,
}
}
/// Drop the last `num_turns` instruction turns from this history.
///
/// Instruction turns are history messages that should behave like a new prompt boundary:
/// ordinary user messages and structured assistant inter-agent instructions.
///
/// This mirrors thread-rollback semantics:
/// - `num_turns == 0` is a no-op
/// - if there are no user turns, this is a no-op
/// - if `num_turns` exceeds the number of user turns, all user turns are dropped while
/// preserving any items that occurred before the first user message.
///
/// If rollback trims a pre-turn developer message that mixes contextual fragments with
/// persistent developer text from `build_initial_context`, this also clears
/// `reference_context_item`. The surviving history no longer contains the full bundle that
/// established the prior baseline, so future turns must fall back to full reinjection instead
/// of diffing against stale state.
pub(crate) fn drop_last_n_user_turns(&mut self, num_turns: u32) {
if num_turns == 0 {
return;
}
let snapshot = self.items.clone();
let user_positions = user_message_positions(&snapshot);
let Some(&first_instruction_turn_idx) = user_positions.first() else {
self.replace(snapshot);
return;
};
let n_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX);
let mut cut_idx = if n_from_end >= user_positions.len() {
first_instruction_turn_idx
} else {
user_positions[user_positions.len() - n_from_end]
};
cut_idx =
self.trim_pre_turn_context_updates(&snapshot, first_instruction_turn_idx, cut_idx);
self.replace(snapshot[..cut_idx].to_vec());
}
pub(crate) fn update_token_info(
&mut self,
usage: &TokenUsage,
model_context_window: Option<i64>,
) {
self.token_info = TokenUsageInfo::new_or_append(
&self.token_info,
&Some(usage.clone()),
model_context_window,
);
}
fn get_non_last_reasoning_items_tokens(&self) -> i64 {
// Get reasoning items excluding all the ones after the last instruction boundary.
let Some(last_user_index) = self.items.iter().rposition(is_user_turn_boundary) else {
return 0;
};
self.items
.iter()
.take(last_user_index)
.filter(|item| {
matches!(
item,
ResponseItem::Reasoning {
encrypted_content: Some(_),
..
}
)
})
.map(estimate_item_token_count)
.fold(0i64, i64::saturating_add)
}
// These are local items added after the most recent model-emitted item.
// They are not reflected in `last_token_usage.total_tokens`.
fn items_after_last_model_generated_item(&self) -> &[ResponseItem] {
let start = self
.items
.iter()
.rposition(is_model_generated_item)
.map_or(self.items.len(), |index| index.saturating_add(1));
&self.items[start..]
}
/// When true, the server already accounted for past reasoning tokens and
/// the client should not re-estimate them.
pub(crate) fn get_total_token_usage(&self, server_reasoning_included: bool) -> i64 {
let last_tokens = self
.token_info
.as_ref()
.map(|info| info.last_token_usage.total_tokens)
.unwrap_or(0);
let items_after_last_model_generated_tokens = self
.items_after_last_model_generated_item()
.iter()
.map(estimate_item_token_count)
.fold(0i64, i64::saturating_add);
if server_reasoning_included {
last_tokens.saturating_add(items_after_last_model_generated_tokens)
} else {
last_tokens
.saturating_add(self.get_non_last_reasoning_items_tokens())
.saturating_add(items_after_last_model_generated_tokens)
}
}
pub(crate) fn get_total_token_usage_breakdown(&self) -> TotalTokenUsageBreakdown {
let last_usage = self
.token_info
.as_ref()
.map(|info| info.last_token_usage.clone())
.unwrap_or_default();
let items_after_last_model_generated = self.items_after_last_model_generated_item();
TotalTokenUsageBreakdown {
last_api_response_total_tokens: last_usage.total_tokens,
all_history_items_model_visible_bytes: self
.items
.iter()
.map(estimate_response_item_model_visible_bytes)
.fold(0i64, i64::saturating_add),
estimated_tokens_of_items_added_since_last_successful_api_response:
items_after_last_model_generated
.iter()
.map(estimate_item_token_count)
.fold(0i64, i64::saturating_add),
estimated_bytes_of_items_added_since_last_successful_api_response:
items_after_last_model_generated
.iter()
.map(estimate_response_item_model_visible_bytes)
.fold(0i64, i64::saturating_add),
}
}
/// This function enforces a couple of invariants on the in-memory history:
/// 1. every call (function/custom) has a corresponding output entry
/// 2. every output has a corresponding call entry
/// 3. when images are unsupported, image content is stripped from messages and tool outputs
fn normalize_history(&mut self, input_modalities: &[InputModality]) {
// all function/tool calls must have a corresponding output
normalize::ensure_call_outputs_present(&mut self.items);
// all outputs must have a corresponding function/tool call
normalize::remove_orphan_outputs(&mut self.items);
// strip images when model does not support them
normalize::strip_images_when_unsupported(input_modalities, &mut self.items);
}
fn process_item(&self, item: &ResponseItem, policy: TruncationPolicy) -> ResponseItem {
let policy_with_serialization_budget = policy * 1.2;
match item {
ResponseItem::FunctionCallOutput { call_id, output } => {
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: truncate_function_output_payload(
output,
policy_with_serialization_budget,
),
}
}
ResponseItem::CustomToolCallOutput {
call_id,
name,
output,
} => ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
name: name.clone(),
output: truncate_function_output_payload(output, policy_with_serialization_budget),
},
ResponseItem::Message { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::ToolSearchCall { .. }
| ResponseItem::ToolSearchOutput { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::Compaction { .. }
| ResponseItem::Other => item.clone(),
}
}
/// Walk backward from a rollback cut and trim contiguous pre-turn context-update items.
///
/// Returns the adjusted cut index after removing contextual developer/user items immediately
/// above the rolled-back turn boundary.
///
/// `first_instruction_turn_idx` is the earliest rollback-eligible instruction-turn boundary
/// in `snapshot`; the trim walk never crosses it so any session-prefix items that predate the
/// first real turn survive rollback.
///
/// `cut_idx` is the tentative slice boundary after dropping the requested number of
/// instruction turns, before stripping contextual pre-turn items that sit immediately above
/// that boundary.
///
/// If any trimmed developer message was a mixed `build_initial_context` bundle containing both
/// rollback-trimmable contextual fragments and persistent developer text, this also clears the
/// stored `reference_context_item` baseline so the next real turn falls back to full
/// reinjection.
fn trim_pre_turn_context_updates(
&mut self,
snapshot: &[ResponseItem],
first_instruction_turn_idx: usize,
mut cut_idx: usize,
) -> usize {
while cut_idx > first_instruction_turn_idx {
match &snapshot[cut_idx - 1] {
ResponseItem::Message { role, content, .. }
if role == "developer" && is_contextual_dev_message_content(content) =>
{
if has_non_contextual_dev_message_content(content) {
// Mixed `build_initial_context` bundles are not reconstructible from
// steady-state diffs once trimmed, so the next real turn must fully
// reinject context instead of diffing against a stale baseline.
self.reference_context_item = None;
}
cut_idx -= 1;
}
ResponseItem::Message { role, content, .. }
if role == "user" && is_contextual_user_message_content(content) =>
{
cut_idx -= 1;
}
_ => break,
}
}
cut_idx
}
}
fn truncate_function_output_payload(
output: &FunctionCallOutputPayload,
policy: TruncationPolicy,
) -> FunctionCallOutputPayload {
let body = match &output.body {
FunctionCallOutputBody::Text(content) => {
FunctionCallOutputBody::Text(truncate_text(content, policy))
}
FunctionCallOutputBody::ContentItems(items) => FunctionCallOutputBody::ContentItems(
truncate_function_output_items_with_policy(items, policy),
),
};
FunctionCallOutputPayload {
body,
success: output.success,
}
}
/// API messages include every non-system item (user/assistant messages, reasoning,
/// tool calls, tool outputs, shell calls, web-search calls, and image-generation
/// calls).
fn is_api_message(message: &ResponseItem) -> bool {
match message {
ResponseItem::Message { role, .. } => role.as_str() != "system",
ResponseItem::FunctionCallOutput { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::ToolSearchCall { .. }
| ResponseItem::ToolSearchOutput { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::LocalShellCall { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::Compaction { .. } => true,
ResponseItem::Other => false,
}
}
fn estimate_reasoning_length(encoded_len: usize) -> usize {
encoded_len
.saturating_mul(3)
.checked_div(4)
.unwrap_or(0)
.saturating_sub(650)
}
fn estimate_item_token_count(item: &ResponseItem) -> i64 {
let model_visible_bytes = estimate_response_item_model_visible_bytes(item);
approx_tokens_from_byte_count_i64(model_visible_bytes)
}
/// Approximate model-visible byte cost for one image input.
///
/// The estimator later converts bytes to tokens using a 4-bytes/token heuristic
/// with ceiling division, so 7,373 bytes maps to approximately 1,844 tokens.
const RESIZED_IMAGE_BYTES_ESTIMATE: i64 = 7373;
// See https://platform.openai.com/docs/guides/images-vision#calculating-costs.
// Use a direct 32px patch count only for `detail: "original"`;
// all other image inputs continue to use `RESIZED_IMAGE_BYTES_ESTIMATE`.
const ORIGINAL_IMAGE_PATCH_SIZE: u32 = 32;
// See https://platform.openai.com/docs/guides/images-vision#model-sizing-behavior.
// Keep this hard-coded for now; move it into model capabilities if the patch
// budget starts changing often across model releases.
const ORIGINAL_IMAGE_MAX_PATCHES: usize = 10_000;
const ORIGINAL_IMAGE_ESTIMATE_CACHE_SIZE: usize = 32;
static ORIGINAL_IMAGE_ESTIMATE_CACHE: LazyLock<BlockingLruCache<[u8; 20], Option<i64>>> =
LazyLock::new(|| {
BlockingLruCache::new(
NonZeroUsize::new(ORIGINAL_IMAGE_ESTIMATE_CACHE_SIZE).unwrap_or(NonZeroUsize::MIN),
)
});
pub(crate) fn estimate_response_item_model_visible_bytes(item: &ResponseItem) -> i64 {
match item {
ResponseItem::Reasoning {
encrypted_content: Some(content),
..
}
| ResponseItem::Compaction {
encrypted_content: content,
} => i64::try_from(estimate_reasoning_length(content.len())).unwrap_or(i64::MAX),
item => {
let raw = serde_json::to_string(item)
.map(|serialized| i64::try_from(serialized.len()).unwrap_or(i64::MAX))
.unwrap_or_default();
let (payload_bytes, replacement_bytes) = image_data_url_estimate_adjustment(item);
if payload_bytes == 0 || replacement_bytes == 0 {
raw
} else {
// Replace raw base64 payload bytes with a per-image estimate.
// We intentionally preserve the data URL prefix and JSON
// wrapper bytes already included in `raw`.
raw.saturating_sub(payload_bytes)
.saturating_add(replacement_bytes)
}
}
}
}
/// Returns the base64 payload byte length for inline image data URLs that are
/// eligible for token-estimation discounting.
///
/// We only discount payloads for `data:image/...;base64,...` URLs (case
/// insensitive markers) and leave everything else at raw serialized size.
fn parse_base64_image_data_url(url: &str) -> Option<&str> {
if !url
.get(.."data:".len())
.is_some_and(|prefix| prefix.eq_ignore_ascii_case("data:"))
{
return None;
}
let comma_index = url.find(',')?;
let metadata = &url[..comma_index];
let payload = &url[comma_index + 1..];
// Parse the media type and parameters without decoding. This keeps the
// estimator cheap while ensuring we only apply the fixed-cost image
// heuristic to image-typed base64 data URLs.
let metadata_without_scheme = &metadata["data:".len()..];
let mut metadata_parts = metadata_without_scheme.split(';');
let mime_type = metadata_parts.next().unwrap_or_default();
let has_base64_marker = metadata_parts.any(|part| part.eq_ignore_ascii_case("base64"));
if !mime_type
.get(.."image/".len())
.is_some_and(|prefix| prefix.eq_ignore_ascii_case("image/"))
{
return None;
}
if !has_base64_marker {
return None;
}
Some(payload)
}
/// Estimates the model-visible byte cost of an `original`-detail inline image.
///
/// The result is memoized in `ORIGINAL_IMAGE_ESTIMATE_CACHE`, keyed by a SHA-1
/// digest of the full data URL, so each distinct image is decoded at most once.
/// Returns `None` when the URL is not a base64 image data URL or its payload
/// cannot be decoded as an image.
fn estimate_original_image_bytes(image_url: &str) -> Option<i64> {
    let cache_key = sha1_digest(image_url.as_bytes());
    ORIGINAL_IMAGE_ESTIMATE_CACHE.get_or_insert_with(cache_key, || {
        let Some(payload) = parse_base64_image_data_url(image_url) else {
            tracing::trace!("skipping original-detail estimate for non-base64 image data URL");
            return None;
        };
        let decoded = BASE64_STANDARD
            .decode(payload)
            .map_err(|error| {
                tracing::trace!("failed to decode original-detail image payload: {error}");
            })
            .ok()?;
        let parsed_image = image::load_from_memory(&decoded)
            .map_err(|error| {
                tracing::trace!("failed to decode original-detail image bytes: {error}");
            })
            .ok()?;
        // Count ORIGINAL_IMAGE_PATCH_SIZE-square patches (rounding each
        // dimension up), then clamp to the per-image patch cap.
        let patch_side = i64::from(ORIGINAL_IMAGE_PATCH_SIZE);
        let columns = i64::from(parsed_image.width())
            .saturating_add(patch_side.saturating_sub(1))
            / patch_side;
        let rows = i64::from(parsed_image.height())
            .saturating_add(patch_side.saturating_sub(1))
            / patch_side;
        let capped_patches = usize::try_from(columns.saturating_mul(rows))
            .unwrap_or(usize::MAX)
            .min(ORIGINAL_IMAGE_MAX_PATCHES);
        Some(i64::try_from(approx_bytes_for_tokens(capped_patches)).unwrap_or(i64::MAX))
    })
}
/// Scans one response item for inline base64 image data URLs that qualify for
/// the image-size discount and returns, as a pair:
/// - total base64 payload bytes to subtract from the raw serialized size
/// - total replacement byte estimate standing in for those images
fn image_data_url_estimate_adjustment(item: &ResponseItem) -> (i64, i64) {
    let mut subtracted_payload = 0i64;
    let mut estimated_replacement = 0i64;
    let mut tally = |url: &str, detail: Option<ImageDetail>| {
        let Some(payload_len) = parse_base64_image_data_url(url).map(str::len) else {
            return;
        };
        subtracted_payload =
            subtracted_payload.saturating_add(i64::try_from(payload_len).unwrap_or(i64::MAX));
        // Only `original`-detail images get a decoded-size estimate; everything
        // else uses the fixed resized-image cost.
        let replacement = match detail {
            Some(ImageDetail::Original) => {
                estimate_original_image_bytes(url).unwrap_or(RESIZED_IMAGE_BYTES_ESTIMATE)
            }
            _ => RESIZED_IMAGE_BYTES_ESTIMATE,
        };
        estimated_replacement = estimated_replacement.saturating_add(replacement);
    };
    match item {
        ResponseItem::Message { content, .. } => {
            for part in content {
                if let ContentItem::InputImage { image_url, detail } = part {
                    tally(image_url, *detail);
                }
            }
        }
        ResponseItem::FunctionCallOutput { output, .. }
        | ResponseItem::CustomToolCallOutput { output, .. } => {
            if let FunctionCallOutputBody::ContentItems(parts) = &output.body {
                for part in parts {
                    if let FunctionCallOutputContentItem::InputImage { image_url, detail } = part {
                        tally(image_url, *detail);
                    }
                }
            }
        }
        _ => {}
    }
    (subtracted_payload, estimated_replacement)
}
/// Returns `true` for history items the model itself produced (assistant
/// messages plus model-initiated reasoning/tool-call items), `false` for
/// everything supplied to the model (tool outputs, non-assistant messages,
/// and `Other`). Exhaustive on purpose: adding a `ResponseItem` variant
/// forces an explicit decision here.
fn is_model_generated_item(item: &ResponseItem) -> bool {
    match item {
        ResponseItem::FunctionCallOutput { .. }
        | ResponseItem::ToolSearchOutput { .. }
        | ResponseItem::CustomToolCallOutput { .. }
        | ResponseItem::Other => false,
        ResponseItem::Message { role, .. } => role == "assistant",
        ResponseItem::Reasoning { .. }
        | ResponseItem::FunctionCall { .. }
        | ResponseItem::ToolSearchCall { .. }
        | ResponseItem::WebSearchCall { .. }
        | ResponseItem::ImageGenerationCall { .. }
        | ResponseItem::CustomToolCall { .. }
        | ResponseItem::LocalShellCall { .. }
        | ResponseItem::Compaction { .. } => true,
    }
}
/// Returns `true` for items synthesized by Codex itself rather than the user
/// or the model: tool-call outputs of any kind, plus developer-role messages.
pub(crate) fn is_codex_generated_item(item: &ResponseItem) -> bool {
    match item {
        ResponseItem::FunctionCallOutput { .. }
        | ResponseItem::ToolSearchOutput { .. }
        | ResponseItem::CustomToolCallOutput { .. } => true,
        ResponseItem::Message { role, .. } => role == "developer",
        _ => false,
    }
}
/// Returns `true` when `item` starts a new user turn: either a genuine user
/// message (i.e. not injected contextual content) or an assistant message
/// carrying an inter-agent instruction. All other items are not boundaries.
pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool {
    match item {
        ResponseItem::Message { role, content, .. } => {
            (role == "user" && !is_contextual_user_message_content(content))
                || (role == "assistant" && is_inter_agent_instruction_content(content))
        }
        _ => false,
    }
}
/// Returns `true` when `content` matches the inter-agent communication
/// message shape recognized by [`InterAgentCommunication::is_message_content`].
fn is_inter_agent_instruction_content(content: &[ContentItem]) -> bool {
    InterAgentCommunication::is_message_content(content)
}
/// Returns the indices of every item in `items` that starts a user turn, in
/// ascending order. See [`is_user_turn_boundary`] for what counts as a
/// boundary. Returns an empty vector for an empty slice.
fn user_message_positions(items: &[ResponseItem]) -> Vec<usize> {
    // Idiomatic iterator pipeline replaces the manual index loop; behavior
    // (ascending indices of boundary items) is unchanged.
    items
        .iter()
        .enumerate()
        .filter(|(_, item)| is_user_turn_boundary(item))
        .map(|(idx, _)| idx)
        .collect()
}
#[cfg(test)]
#[path = "history_tests.rs"]
mod tests;

View File

@@ -1,9 +1,5 @@
mod history;
mod normalize;
pub(crate) mod updates;
pub(crate) use history::ContextManager;
pub(crate) use history::TotalTokenUsageBreakdown;
pub(crate) use history::estimate_response_item_model_visible_bytes;
pub(crate) use history::is_codex_generated_item;
pub(crate) use history::is_user_turn_boundary;
pub(crate) use codex_journal::history::estimate_response_item_model_visible_bytes;
pub(crate) use codex_journal::history::is_codex_generated_item;
pub(crate) use codex_journal::history::is_user_turn_boundary;

View File

@@ -12,17 +12,43 @@ use crate::session::turn_context::TurnContext;
use crate::shell::Shell;
use codex_execpolicy::Policy;
use codex_features::Feature;
use codex_journal::Journal;
use codex_journal::JournalEntry;
use codex_journal::KeyFilter;
use codex_journal::MetadataEntryBuilder;
use codex_journal::PromptMessage;
use codex_journal::PromptRenderer;
use codex_protocol::config_types::Personality;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::protocol::TurnContextItem;
const PROMPT_BUNDLE_KEY_PREFIX: &str = "prompt";
pub(crate) const DEVELOPER_BUNDLE: &str = "developer";
pub(crate) const USAGE_HINT_BUNDLE: &str = "usage_hint";
pub(crate) const CONTEXTUAL_USER_BUNDLE: &str = "contextual_user";
pub(crate) const GUARDIAN_BUNDLE: &str = "guardian";
pub(crate) fn context_prompt_renderer() -> PromptRenderer {
PromptRenderer::new()
.group(KeyFilter::prefix([
PROMPT_BUNDLE_KEY_PREFIX,
DEVELOPER_BUNDLE,
]))
.group(KeyFilter::prefix([
PROMPT_BUNDLE_KEY_PREFIX,
USAGE_HINT_BUNDLE,
]))
.group(KeyFilter::prefix([
PROMPT_BUNDLE_KEY_PREFIX,
CONTEXTUAL_USER_BUNDLE,
]))
}
fn build_environment_update_item(
previous: Option<&TurnContextItem>,
next: &TurnContext,
shell: &Shell,
) -> Option<ResponseItem> {
) -> Option<String> {
if !next.config.include_environment_context {
return None;
}
@@ -34,9 +60,7 @@ fn build_environment_update_item(
return None;
}
Some(ContextualUserFragment::into(
EnvironmentContext::diff_from_turn_context_item(prev, &next_context),
))
Some(EnvironmentContext::diff_from_turn_context_item(prev, &next_context).render())
}
fn build_permissions_update_item(
@@ -175,64 +199,105 @@ pub(crate) fn build_model_instructions_update_item(
Some(ModelSwitchInstructions::new(model_instructions).render())
}
pub(crate) fn build_developer_update_item(text_sections: Vec<String>) -> Option<ResponseItem> {
build_text_message("developer", text_sections)
pub(crate) fn context_entry(
bundle: &str,
name: &str,
prompt_order: i64,
message: PromptMessage,
) -> Option<JournalEntry> {
context_entry_builder(bundle, name, message)
.prompt_order(prompt_order)
.build()
}
pub(crate) fn build_contextual_user_message(text_sections: Vec<String>) -> Option<ResponseItem> {
build_text_message("user", text_sections)
fn context_entry_builder(bundle: &str, name: &str, message: PromptMessage) -> MetadataEntryBuilder {
Journal::metadata_entry_builder([PROMPT_BUNDLE_KEY_PREFIX, bundle, name], message)
}
fn build_text_message(role: &str, text_sections: Vec<String>) -> Option<ResponseItem> {
if text_sections.is_empty() {
return None;
}
let content = text_sections
.into_iter()
.map(|text| ContentItem::InputText { text })
.collect();
Some(ResponseItem::Message {
id: None,
role: role.to_string(),
content,
phase: None,
})
}
pub(crate) fn build_settings_update_items(
pub(crate) fn build_settings_update_entries(
previous: Option<&TurnContextItem>,
previous_turn_settings: Option<&PreviousTurnSettings>,
next: &TurnContext,
shell: &Shell,
exec_policy: &Policy,
personality_feature_enabled: bool,
) -> Vec<ResponseItem> {
) -> Vec<JournalEntry> {
// TODO(ccunningham): build_settings_update_items still does not cover every
// model-visible item emitted by build_initial_context. Persist the remaining
// inputs or add explicit replay events so fork/resume can diff everything
// deterministically.
let contextual_user_message = build_environment_update_item(previous, next, shell);
let developer_update_sections = [
// Keep model-switch instructions first so model-specific guidance is read before
// any other context diffs on this turn.
build_model_instructions_update_item(previous_turn_settings, next),
build_permissions_update_item(previous, next, exec_policy),
build_collaboration_mode_update_item(previous, next),
build_realtime_update_item(previous, previous_turn_settings, next),
build_personality_update_item(previous, next, personality_feature_enabled),
]
.into_iter()
.flatten()
.collect();
let mut entries = Vec::with_capacity(6);
let mut items = Vec::with_capacity(2);
if let Some(developer_message) = build_developer_update_item(developer_update_sections) {
items.push(developer_message);
if let Some(item) =
build_model_instructions_update_item(previous_turn_settings, next).and_then(|text| {
// Keep model-switch instructions first so model-specific guidance is read before
// any other context diffs on this turn.
context_entry(
DEVELOPER_BUNDLE,
"model_switch",
10,
PromptMessage::developer_text(text),
)
})
{
entries.push(item);
}
if let Some(contextual_user_message) = contextual_user_message {
items.push(contextual_user_message);
if let Some(item) =
build_permissions_update_item(previous, next, exec_policy).and_then(|text| {
context_entry(
DEVELOPER_BUNDLE,
"permissions",
20,
PromptMessage::developer_text(text),
)
})
{
entries.push(item);
}
items
if let Some(item) = build_collaboration_mode_update_item(previous, next).and_then(|text| {
context_entry(
DEVELOPER_BUNDLE,
"collaboration_mode",
30,
PromptMessage::developer_text(text),
)
}) {
entries.push(item);
}
if let Some(item) =
build_realtime_update_item(previous, previous_turn_settings, next).and_then(|text| {
context_entry(
DEVELOPER_BUNDLE,
"realtime",
40,
PromptMessage::developer_text(text),
)
})
{
entries.push(item);
}
if let Some(item) = build_personality_update_item(previous, next, personality_feature_enabled)
.and_then(|text| {
context_entry(
DEVELOPER_BUNDLE,
"personality",
50,
PromptMessage::developer_text(text),
)
})
{
entries.push(item);
}
if let Some(item) = build_environment_update_item(previous, next, shell).and_then(|text| {
context_entry(
CONTEXTUAL_USER_BUNDLE,
"environment",
60,
PromptMessage::user_text(text),
)
}) {
entries.push(item);
}
entries
}

View File

@@ -10,6 +10,7 @@ use serde_json::Value;
use crate::compact::content_items_to_text;
use crate::event_mapping::is_contextual_user_message_content;
use crate::session::session::Session;
use crate::state::history as session_history;
use codex_utils_output_truncation::approx_bytes_for_tokens;
use codex_utils_output_truncation::approx_token_count;
use codex_utils_output_truncation::approx_tokens_from_byte_count;
@@ -93,9 +94,10 @@ pub(crate) async fn build_guardian_prompt_items(
mode: GuardianPromptMode,
) -> serde_json::Result<GuardianPromptItems> {
let history = session.clone_history().await;
let transcript_entries = collect_guardian_transcript_entries(history.raw_items());
let history_items = session_history::raw_items(&history);
let transcript_entries = collect_guardian_transcript_entries(&history_items);
let transcript_cursor = GuardianTranscriptCursor {
parent_history_version: history.history_version(),
parent_history_version: session.history_version().await,
transcript_entry_count: transcript_entries.len(),
};
let planned_action_json = format_guardian_action_pretty(&request)?;

View File

@@ -18,6 +18,7 @@ use crate::config::Config;
use crate::session::session::Session;
use crate::session::turn::build_prompt;
use crate::session::turn::built_tools;
use crate::state::history as session_history;
use crate::thread_manager::ThreadManager;
/// Build the model-visible `input` list for a single debug turn.
@@ -73,10 +74,9 @@ pub(crate) async fn build_prompt_input_from_session(
.await;
}
let prompt_input = sess
.clone_history()
.await
.for_prompt(&turn_context.model_info.input_modalities);
let history = sess.clone_history().await;
let prompt_input =
session_history::for_prompt(&history, &turn_context.model_info.input_modalities);
let router = built_tools(
sess,
turn_context.as_ref(),

View File

@@ -1,6 +1,7 @@
use crate::compact::content_items_to_text;
use crate::event_mapping::is_contextual_user_message_content;
use crate::session::session::Session;
use crate::state::history as session_history;
use chrono::Utc;
use codex_exec_server::LOCAL_FS;
use codex_git_utils::resolve_root_git_project_for_trust;
@@ -62,7 +63,8 @@ pub(crate) async fn build_realtime_startup_context(
let config = sess.get_config().await;
let cwd = config.cwd.clone();
let history = sess.clone_history().await;
let current_thread_section = build_current_thread_section(history.raw_items());
let history_items = session_history::raw_items(&history);
let current_thread_section = build_current_thread_section(&history_items);
let recent_threads = load_recent_threads(sess).await;
let recent_work_section = build_recent_work_section(&cwd, &recent_threads).await;
let workspace_section = build_workspace_section_with_user_root(&cwd, home_dir()).await;

View File

@@ -19,6 +19,7 @@ use crate::realtime_context::truncate_realtime_text_to_token_budget;
use crate::realtime_conversation::REALTIME_USER_TEXT_PREFIX;
use crate::realtime_conversation::prefix_realtime_v2_text;
use crate::session::spawn_review_thread;
use crate::state::history as session_history;
use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_config::loader::load_config_layers_state;
@@ -891,8 +892,8 @@ pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
sess.guardian_review_session.shutdown().await;
info!("Shutting down Codex instance");
let history = sess.clone_history().await;
let turn_count = history
.raw_items()
let history_items = session_history::raw_items(&history);
let turn_count = history_items
.iter()
.filter(|item| is_user_turn_boundary(item))
.count();

View File

@@ -59,6 +59,7 @@ use codex_features::Feature;
use codex_features::unstable_features_warning_event;
use codex_hooks::Hooks;
use codex_hooks::HooksConfig;
use codex_journal::PromptMessage;
use codex_login::AuthManager;
use codex_login::CodexAuth;
use codex_login::auth_env_telemetry::collect_auth_env_telemetry;
@@ -171,8 +172,7 @@ use crate::config::Constrained;
use crate::config::ConstraintResult;
use crate::config::StartedNetworkProxy;
use crate::config::resolve_web_search_mode_for_turn;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::state::TotalTokenUsageBreakdown;
use crate::thread_rollout_truncation::initial_history_has_prior_user_turns;
use codex_config::CONFIG_TOML_FILE;
use codex_config::types::McpServerConfig;
@@ -1084,7 +1084,7 @@ impl Session {
pub(crate) async fn get_total_token_usage_breakdown(&self) -> TotalTokenUsageBreakdown {
let state = self.state.lock().await;
state.history.get_total_token_usage_breakdown()
state.get_total_token_usage_breakdown()
}
pub(crate) async fn total_token_usage(&self) -> Option<TokenUsage> {
@@ -1108,7 +1108,7 @@ impl Session {
turn_context: &TurnContext,
) -> Option<i64> {
let state = self.state.lock().await;
state.history.estimate_token_count(turn_context)
state.estimate_token_count(turn_context)
}
pub(crate) async fn get_base_instructions(&self) -> BaseInstructions {
@@ -1430,14 +1430,20 @@ impl Session {
};
let shell = self.user_shell();
let exec_policy = self.services.exec_policy.current();
crate::context_manager::updates::build_settings_update_items(
let entries = crate::context_manager::updates::build_settings_update_entries(
reference_context_item,
previous_turn_settings.as_ref(),
current_context,
shell.as_ref(),
exec_policy.as_ref(),
self.features.enabled(Feature::Personality),
)
);
let resolved = match codex_journal::Journal::from_entries(entries).resolve() {
Ok(resolved) => resolved,
Err(error) => unreachable!("settings update entries should resolve: {error}"),
};
crate::context_manager::updates::context_prompt_renderer()
.render_metadata(resolved.metadata())
}
/// Persist the event to rollout and send it to clients.
@@ -2499,16 +2505,31 @@ impl Session {
}
}
#[expect(
clippy::await_holding_invalid_type,
reason = "MCP app context rendering reads through the session-owned manager guard"
)]
pub(crate) async fn build_initial_context(
&self,
turn_context: &TurnContext,
) -> Vec<ResponseItem> {
let mut developer_sections = Vec::<String>::with_capacity(8);
let mut contextual_user_sections = Vec::<String>::with_capacity(2);
let resolved = match codex_journal::Journal::from_entries(
self.build_initial_context_entries(turn_context).await,
)
.resolve()
{
Ok(resolved) => resolved,
Err(error) => unreachable!("initial context entries should resolve: {error}"),
};
crate::context_manager::updates::context_prompt_renderer()
.render_metadata(resolved.metadata())
}
#[expect(
clippy::await_holding_invalid_type,
reason = "MCP app context rendering reads through the session-owned manager guard"
)]
async fn build_initial_context_entries(
&self,
turn_context: &TurnContext,
) -> Vec<codex_journal::JournalEntry> {
let mut entries = Vec::<codex_journal::JournalEntry>::with_capacity(12);
let shell = self.user_shell();
let (
reference_context_item,
@@ -2531,26 +2552,38 @@ impl Session {
previous_turn_settings.as_ref(),
turn_context,
)
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"model_switch",
10,
PromptMessage::developer_text(model_switch_message),
)
{
developer_sections.push(model_switch_message);
entries.push(entry);
}
if turn_context.config.include_permissions_instructions {
developer_sections.push(
PermissionsInstructions::from_permission_profile(
&turn_context.permission_profile,
turn_context.approval_policy.value(),
turn_context.config.approvals_reviewer,
self.services.exec_policy.current().as_ref(),
&turn_context.cwd,
turn_context
.features
.enabled(Feature::ExecPermissionApprovals),
turn_context
.features
.enabled(Feature::RequestPermissionsTool),
)
.render(),
);
let permissions_instructions = PermissionsInstructions::from_permission_profile(
&turn_context.permission_profile,
turn_context.approval_policy.value(),
turn_context.config.approvals_reviewer,
self.services.exec_policy.current().as_ref(),
&turn_context.cwd,
turn_context
.features
.enabled(Feature::ExecPermissionApprovals),
turn_context
.features
.enabled(Feature::RequestPermissionsTool),
)
.render();
if let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"permissions",
20,
PromptMessage::developer_text(permissions_instructions),
) {
entries.push(entry);
}
}
let separate_guardian_developer_message =
crate::guardian::is_guardian_reviewer_source(&session_source);
@@ -2559,29 +2592,52 @@ impl Session {
if !separate_guardian_developer_message
&& let Some(developer_instructions) = turn_context.developer_instructions.as_deref()
&& !developer_instructions.is_empty()
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"developer_instructions",
30,
PromptMessage::developer_text(developer_instructions.to_string()),
)
{
developer_sections.push(developer_instructions.to_string());
entries.push(entry);
}
// Add developer instructions for memories.
if turn_context.features.enabled(Feature::MemoryTool)
&& turn_context.config.memories.use_memories
&& let Some(memory_prompt) =
build_memory_tool_developer_instructions(&turn_context.config.codex_home).await
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"memory_tool",
40,
PromptMessage::developer_text(memory_prompt),
)
{
developer_sections.push(memory_prompt);
entries.push(entry);
}
// Add developer instructions from collaboration_mode if they exist and are non-empty
if let Some(collab_instructions) =
CollaborationModeInstructions::from_collaboration_mode(&collaboration_mode)
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"collaboration_mode",
50,
PromptMessage::developer_text(collab_instructions.render()),
)
{
developer_sections.push(collab_instructions.render());
entries.push(entry);
}
if let Some(realtime_update) = crate::context_manager::updates::build_initial_realtime_item(
reference_context_item.as_ref(),
previous_turn_settings.as_ref(),
turn_context,
) && let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"realtime",
60,
PromptMessage::developer_text(realtime_update),
) {
developer_sections.push(realtime_update);
entries.push(entry);
}
if self.features.enabled(Feature::Personality)
&& let Some(personality) = turn_context.personality
@@ -2595,9 +2651,16 @@ impl Session {
&model_info,
personality,
)
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"personality",
70,
PromptMessage::developer_text(
PersonalitySpecInstructions::new(personality_message).render(),
),
)
{
developer_sections
.push(PersonalitySpecInstructions::new(personality_message).render());
entries.push(entry);
}
}
if turn_context.config.include_apps_instructions && turn_context.apps_enabled() {
@@ -2610,8 +2673,14 @@ impl Session {
.await;
if let Some(apps_instructions) =
AppsInstructions::from_connectors(&accessible_and_enabled_connectors)
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"apps",
80,
PromptMessage::developer_text(apps_instructions.render()),
)
{
developer_sections.push(apps_instructions.render());
entries.push(entry);
}
}
if turn_context.config.include_skill_instructions {
@@ -2634,7 +2703,14 @@ impl Session {
})
.await;
}
developer_sections.push(skills_instructions.render());
if let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"skills",
90,
PromptMessage::developer_text(skills_instructions.render()),
) {
entries.push(entry);
}
}
}
let loaded_plugins = self
@@ -2644,24 +2720,43 @@ impl Session {
.await;
if let Some(plugin_instructions) =
AvailablePluginsInstructions::from_plugins(loaded_plugins.capability_summaries())
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"plugins",
100,
PromptMessage::developer_text(plugin_instructions.render()),
)
{
developer_sections.push(plugin_instructions.render());
entries.push(entry);
}
if turn_context.features.enabled(Feature::CodexGitCommit)
&& let Some(commit_message_instruction) = commit_message_trailer_instruction(
turn_context.config.commit_attribution.as_deref(),
)
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::DEVELOPER_BUNDLE,
"commit_message_trailer",
110,
PromptMessage::developer_text(commit_message_instruction),
)
{
developer_sections.push(commit_message_instruction);
entries.push(entry);
}
if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
contextual_user_sections.push(
UserInstructions {
text: user_instructions.to_string(),
directory: turn_context.cwd.to_string_lossy().into_owned(),
}
.render(),
);
if let Some(user_instructions) = turn_context.user_instructions.as_deref()
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::CONTEXTUAL_USER_BUNDLE,
"user_instructions",
130,
PromptMessage::user_text(
UserInstructions {
text: user_instructions.to_string(),
directory: turn_context.cwd.to_string_lossy().into_owned(),
}
.render(),
),
)
{
entries.push(entry);
}
if turn_context.config.include_environment_context {
let subagents = self
@@ -2669,48 +2764,51 @@ impl Session {
.agent_control
.format_environment_context_subagents(self.conversation_id)
.await;
contextual_user_sections.push(
crate::context::EnvironmentContext::from_turn_context(turn_context, shell.as_ref())
if let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::CONTEXTUAL_USER_BUNDLE,
"environment",
140,
PromptMessage::user_text(
crate::context::EnvironmentContext::from_turn_context(
turn_context,
shell.as_ref(),
)
.with_subagents(subagents)
.render(),
);
),
) {
entries.push(entry);
}
}
let multi_agent_v2_usage_hint_text =
multi_agents::usage_hint_text(turn_context, &session_source);
let mut items = Vec::with_capacity(4);
if let Some(developer_message) =
crate::context_manager::updates::build_developer_update_item(developer_sections)
{
items.push(developer_message);
}
if let Some(usage_hint_text) = multi_agent_v2_usage_hint_text
&& let Some(usage_hint_message) =
crate::context_manager::updates::build_developer_update_item(vec![
usage_hint_text.to_string(),
])
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::USAGE_HINT_BUNDLE,
"multi_agent_v2",
120,
PromptMessage::developer_text(usage_hint_text.to_string()),
)
{
items.push(usage_hint_message);
}
if let Some(contextual_user_message) =
crate::context_manager::updates::build_contextual_user_message(contextual_user_sections)
{
items.push(contextual_user_message);
entries.push(entry);
}
// Emit the guardian policy prompt as a separate developer item so the guardian
// subagent sees a distinct, easy-to-audit instruction block.
if separate_guardian_developer_message
&& let Some(developer_instructions) = turn_context.developer_instructions.as_deref()
&& !developer_instructions.is_empty()
&& let Some(guardian_developer_message) =
crate::context_manager::updates::build_developer_update_item(vec![
developer_instructions.to_string(),
])
&& let Some(entry) = crate::context_manager::updates::context_entry(
crate::context_manager::updates::GUARDIAN_BUNDLE,
"developer_instructions",
150,
PromptMessage::developer_text(developer_instructions.to_string()),
)
{
items.push(guardian_developer_message);
entries.push(entry);
}
items
entries
}
pub(crate) async fn persist_rollout_items(&self, items: &[RolloutItem]) {
@@ -2721,11 +2819,16 @@ impl Session {
}
}
pub(crate) async fn clone_history(&self) -> ContextManager {
pub(crate) async fn clone_history(&self) -> codex_journal::Journal {
let state = self.state.lock().await;
state.clone_history()
}
pub(crate) async fn history_version(&self) -> u64 {
let state = self.state.lock().await;
state.history_version()
}
pub(crate) async fn reference_context_item(&self) -> Option<TurnContextItem> {
let state = self.state.lock().await;
state.reference_context_item()
@@ -2792,7 +2895,10 @@ impl Session {
let history = self.clone_history().await;
let base_instructions = self.get_base_instructions().await;
let Some(estimated_total_tokens) =
history.estimate_token_count_with_base_instructions(&base_instructions)
crate::state::history::estimate_token_count_with_base_instructions(
&history,
&base_instructions,
)
else {
return;
};

View File

@@ -1,5 +1,7 @@
use super::*;
use crate::context_manager::is_user_turn_boundary;
use crate::state::history as session_history;
use codex_journal::Journal;
// Return value of `Session::reconstruct_history_from_rollout`, bundling the rebuilt history with
// the resume/fork hydration metadata derived from the same replay.
@@ -231,10 +233,11 @@ impl Session {
);
}
let mut history = ContextManager::new();
let mut history = Journal::new();
let mut rebuilt_history_reference_context_item = None;
let mut saw_legacy_compaction_without_replacement_history = false;
if let Some(base_replacement_history) = base_replacement_history {
history.replace(base_replacement_history.to_vec());
session_history::replace_history(&mut history, base_replacement_history.to_vec());
}
// Materialize exact history semantics from the replay-derived suffix. The eventual lazy
// design should keep this same replay shape, but drive it from a resumable reverse source
@@ -242,7 +245,8 @@ impl Session {
for item in rollout_suffix {
match item {
RolloutItem::ResponseItem(response_item) => {
history.record_items(
session_history::record_items(
&mut history,
std::iter::once(response_item),
turn_context.truncation_policy,
);
@@ -251,7 +255,7 @@ impl Session {
if let Some(replacement_history) = &compacted.replacement_history {
// This should actually never happen, because the reverse loop above (to build rollout_suffix)
// should stop before any compaction that has Some replacement_history
history.replace(replacement_history.clone());
session_history::replace_history(&mut history, replacement_history.clone());
} else {
saw_legacy_compaction_without_replacement_history = true;
// Legacy rollouts without `replacement_history` should rebuild the
@@ -262,17 +266,22 @@ impl Session {
// prompt shape.
// TODO(ccunningham): if we drop support for None replacement_history compaction items,
// we can get rid of this second loop entirely and just build `history` directly in the first loop.
let user_messages = collect_user_messages(history.raw_items());
let history_items = session_history::raw_items(&history);
let user_messages = collect_user_messages(history_items.as_slice());
let rebuilt = compact::build_compacted_history(
Vec::new(),
&user_messages,
&compacted.message,
);
history.replace(rebuilt);
session_history::replace_history(&mut history, rebuilt);
}
}
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
history.drop_last_n_user_turns(rollback.num_turns);
session_history::drop_last_n_user_turns(
&mut history,
&mut rebuilt_history_reference_context_item,
rollback.num_turns,
);
}
RolloutItem::EventMsg(_)
| RolloutItem::TurnContext(_)
@@ -293,7 +302,7 @@ impl Session {
};
RolloutReconstruction {
history: history.raw_items().to_vec(),
history: session_history::raw_items(&history),
previous_turn_settings,
reference_context_item,
}

View File

@@ -9,6 +9,7 @@ use crate::function_tool::FunctionCallError;
use crate::shell::default_user_shell;
use crate::skills::SkillRenderSideEffects;
use crate::skills::render::SkillMetadataBudget;
use crate::state::history as session_history;
use crate::test_support::models_manager_with_provider;
use crate::tools::format_exec_output_str;
use codex_config::ConfigLayerStack;
@@ -20,6 +21,7 @@ use codex_config::RequirementSource;
use codex_config::Sourced;
use codex_config::loader::project_trust_key;
use codex_config::types::ToolSuggestDisabledTool;
use codex_journal::Journal;
use codex_features::Feature;
use codex_features::Features;
@@ -78,6 +80,63 @@ use codex_execpolicy::Policy;
use codex_network_proxy::NetworkProxyConfig;
use codex_otel::MetricsClient;
use codex_otel::MetricsConfig;
trait JournalTestExt {
fn raw_items(&self) -> Vec<ResponseItem>;
fn for_prompt(
&self,
input_modalities: &[codex_protocol::openai_models::InputModality],
) -> Vec<ResponseItem>;
fn estimate_token_count(&self, turn_context: &TurnContext) -> Option<i64>;
fn estimate_token_count_with_base_instructions(
&self,
base_instructions: &BaseInstructions,
) -> Option<i64>;
fn record_items<I>(
&mut self,
items: I,
policy: codex_utils_output_truncation::TruncationPolicy,
) where
I: IntoIterator,
I::Item: std::ops::Deref<Target = ResponseItem>;
fn replace(&mut self, items: Vec<ResponseItem>);
}
impl JournalTestExt for Journal {
fn raw_items(&self) -> Vec<ResponseItem> {
session_history::raw_items(self)
}
fn for_prompt(
&self,
input_modalities: &[codex_protocol::openai_models::InputModality],
) -> Vec<ResponseItem> {
session_history::for_prompt(self, input_modalities)
}
fn estimate_token_count(&self, turn_context: &TurnContext) -> Option<i64> {
session_history::estimate_token_count(self, turn_context)
}
fn estimate_token_count_with_base_instructions(
&self,
base_instructions: &BaseInstructions,
) -> Option<i64> {
session_history::estimate_token_count_with_base_instructions(self, base_instructions)
}
fn record_items<I>(&mut self, items: I, policy: codex_utils_output_truncation::TruncationPolicy)
where
I: IntoIterator,
I::Item: std::ops::Deref<Target = ResponseItem>,
{
session_history::record_items(self, items, policy);
}
fn replace(&mut self, items: Vec<ResponseItem>) {
session_history::replace_history(self, items);
}
}
use codex_otel::THREAD_SKILLS_DESCRIPTION_TRUNCATED_CHARS_METRIC;
use codex_otel::THREAD_SKILLS_ENABLED_TOTAL_METRIC;
use codex_otel::THREAD_SKILLS_KEPT_TOTAL_METRIC;
@@ -7629,7 +7688,7 @@ async fn sample_rollout(
_turn_context: &TurnContext,
) -> (Vec<RolloutItem>, Vec<ResponseItem>) {
let mut rollout_items = Vec::new();
let mut live_history = ContextManager::new();
let mut live_history = Journal::new();
// Use the same turn_context source as record_initial_history so model_info (and thus
// personality_spec) matches reconstruction.

View File

@@ -42,6 +42,7 @@ use crate::resolve_skill_dependencies_for_turn;
use crate::session::PreviousTurnSettings;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::state::history as session_history;
use crate::stream_events_utils::HandleOutputCtx;
use crate::stream_events_utils::handle_non_tool_response_item;
use crate::stream_events_utils::handle_output_item_done;
@@ -428,9 +429,8 @@ pub(crate) async fn run_turn(
// Construct the input that we will send to the model.
let sampling_request_input: Vec<ResponseItem> = {
sess.clone_history()
.await
.for_prompt(&turn_context.model_info.input_modalities)
let history = sess.clone_history().await;
session_history::for_prompt(&history, &turn_context.model_info.input_modalities)
};
let sampling_request_input_messages = sampling_request_input
@@ -629,7 +629,7 @@ pub(crate) async fn run_turn(
error_or_panic(
"Invalid image detected; sanitizing tool output to prevent poisoning",
);
if state.history.replace_last_turn_images("Invalid image") {
if state.replace_last_turn_images("Invalid image") {
continue;
}
}
@@ -1005,9 +1005,8 @@ async fn run_sampling_request(
let prompt_input = if let Some(input) = initial_input.take() {
input
} else {
sess.clone_history()
.await
.for_prompt(&turn_context.model_info.input_modalities)
let history = sess.clone_history().await;
session_history::for_prompt(&history, &turn_context.model_info.input_modalities)
};
let prompt = build_prompt(
prompt_input,

View File

@@ -0,0 +1,314 @@
use crate::event_mapping::has_non_contextual_dev_message_content;
use crate::event_mapping::is_contextual_dev_message_content;
use crate::event_mapping::is_contextual_user_message_content;
use crate::session::turn_context::TurnContext;
use codex_journal::Journal;
use codex_journal::JournalItem;
use codex_journal::JournalKey;
use codex_journal::JournalTranscriptItem;
use codex_journal::history as thread_history;
use codex_journal::history::estimate_item_token_count;
use codex_journal::history::estimate_response_item_model_visible_bytes;
use codex_journal::history::is_api_message;
use codex_journal::history::is_model_generated_item;
use codex_journal::history::is_user_turn_boundary;
use codex_journal::history::user_turn_boundary_positions;
use codex_protocol::models::BaseInstructions;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::InputModality;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use codex_protocol::protocol::TurnContextItem;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::approx_token_count;
use std::ops::Deref;
/// Decomposition of the total-token estimate; produced by
/// `get_total_token_usage_breakdown`.
#[derive(Debug, Clone, Copy, Default)]
pub(crate) struct TotalTokenUsageBreakdown {
    /// `total_tokens` reported by the most recent API response (0 when none).
    pub(crate) last_api_response_total_tokens: i64,
    /// Sum of model-visible byte estimates over every history item.
    pub(crate) all_history_items_model_visible_bytes: i64,
    /// Estimated tokens of items appended after the last model-generated item.
    pub(crate) estimated_tokens_of_items_added_since_last_successful_api_response: i64,
    /// Estimated model-visible bytes of those same trailing items.
    pub(crate) estimated_bytes_of_items_added_since_last_successful_api_response: i64,
}
/// Appends items to `journal`, keeping only API-visible ones and truncating
/// each according to `policy` before storage.
pub(crate) fn record_items<I>(journal: &mut Journal, items: I, policy: TruncationPolicy)
where
    I: IntoIterator,
    I::Item: Deref<Target = ResponseItem>,
{
    items
        .into_iter()
        .filter(|item| is_api_message(item.deref()))
        .for_each(|item| {
            let truncated = thread_history::truncate_history_item(item.deref(), policy);
            push_history_item(journal, truncated);
        });
}
/// Returns clones of every transcript item in `journal`, in order, skipping
/// metadata and checkpoint entries.
pub(crate) fn raw_items(journal: &Journal) -> Vec<ResponseItem> {
    let mut items = Vec::new();
    for entry in journal.entries() {
        if let JournalItem::Transcript(transcript) = &entry.item {
            items.push(transcript.item.clone());
        }
    }
    items
}
/// Projects the journal into a prompt-ready item list: the raw transcript
/// normalized for the model (see `normalize_history` for the exact passes).
pub(crate) fn for_prompt(
    journal: &Journal,
    input_modalities: &[InputModality],
) -> Vec<ResponseItem> {
    let mut items = raw_items(journal);
    normalize_history(&mut items, input_modalities);
    items
}
/// Estimates the journal's token footprint under the turn's effective model
/// instructions.
pub(crate) fn estimate_token_count(journal: &Journal, turn_context: &TurnContext) -> Option<i64> {
    let model_info = &turn_context.model_info;
    // Turn-scoped personality takes precedence; otherwise fall back to the
    // session config's personality.
    let personality = turn_context.personality.or(turn_context.config.personality);
    let base_instructions = BaseInstructions {
        text: model_info.get_model_instructions(personality),
    };
    estimate_token_count_with_base_instructions(journal, &base_instructions)
}
pub(crate) fn estimate_token_count_with_base_instructions(
journal: &Journal,
base_instructions: &BaseInstructions,
) -> Option<i64> {
let base_tokens =
i64::try_from(approx_token_count(&base_instructions.text)).unwrap_or(i64::MAX);
let items_tokens = raw_items(journal)
.iter()
.map(estimate_item_token_count)
.fold(0i64, i64::saturating_add);
Some(base_tokens.saturating_add(items_tokens))
}
/// Removes the oldest transcript item (and its paired call/output item, if
/// any) and rebuilds the journal. Returns whether anything was removed.
pub(crate) fn remove_first_item(journal: &mut Journal) -> bool {
    let mut remaining = raw_items(journal);
    if remaining.is_empty() {
        return false;
    }
    let dropped = remaining.remove(0);
    // Also drop the matching call/output so the pair stays consistent.
    thread_history::remove_corresponding_for(&mut remaining, &dropped);
    replace_history(journal, remaining);
    true
}
/// Removes the newest transcript item (and its paired call/output item, if
/// any) and rebuilds the journal. Returns whether anything was removed.
pub(crate) fn remove_last_item(journal: &mut Journal) -> bool {
    let mut remaining = raw_items(journal);
    let Some(dropped) = remaining.pop() else {
        return false;
    };
    // Also drop the matching call/output so the pair stays consistent.
    thread_history::remove_corresponding_for(&mut remaining, &dropped);
    replace_history(journal, remaining);
    true
}
/// Rebuilds `journal` from `items`, silently dropping any non-API items.
pub(crate) fn replace_history(journal: &mut Journal, items: Vec<ResponseItem>) {
    *journal = journal_from_items(items);
}
/// Replaces images in the last turn with `placeholder` text; rebuilds the
/// journal only when something was actually replaced. Returns whether a
/// replacement happened.
pub(crate) fn replace_last_turn_images(journal: &mut Journal, placeholder: &str) -> bool {
    let mut items = raw_items(journal);
    if !thread_history::replace_last_turn_images(&mut items, placeholder) {
        return false;
    }
    replace_history(journal, items);
    true
}
/// Drops the last `num_turns` instruction turns from the journal, also
/// trimming contextual developer/user messages that immediately precede the
/// cut point (see `trim_pre_turn_context_updates`).
///
/// `reference_context_item` may be cleared when a trimmed developer message
/// carried non-contextual content.
pub(crate) fn drop_last_n_user_turns(
    journal: &mut Journal,
    reference_context_item: &mut Option<TurnContextItem>,
    num_turns: u32,
) {
    if num_turns == 0 {
        return;
    }
    let snapshot = raw_items(journal);
    let user_positions = user_turn_boundary_positions(&snapshot);
    // No instruction-turn boundaries means there is nothing to drop. Return
    // early instead of rewriting the journal with its own contents: the
    // rewrite was a no-op for the transcript but rebuilt every entry (and
    // regenerated transcript-item ids) for nothing.
    let Some(&first_instruction_turn_idx) = user_positions.first() else {
        return;
    };
    let n_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX);
    // Dropping at least as many turns as exist cuts back to the first one.
    let mut cut_idx = if n_from_end >= user_positions.len() {
        first_instruction_turn_idx
    } else {
        user_positions[user_positions.len() - n_from_end]
    };
    cut_idx = trim_pre_turn_context_updates(
        reference_context_item,
        &snapshot,
        first_instruction_turn_idx,
        cut_idx,
    );
    replace_history(journal, snapshot[..cut_idx].to_vec());
}
/// Folds a new token-usage sample into the running `TokenUsageInfo`.
pub(crate) fn update_token_info(
    token_info: &mut Option<TokenUsageInfo>,
    usage: &TokenUsage,
    model_context_window: Option<i64>,
) {
    *token_info =
        TokenUsageInfo::new_or_append(token_info, &Some(usage.clone()), model_context_window);
}
/// Marks token usage as having consumed the whole context window, creating
/// the info record if none exists yet.
pub(crate) fn set_token_usage_full(token_info: &mut Option<TokenUsageInfo>, context_window: i64) {
    if let Some(info) = token_info.as_mut() {
        info.fill_to_context_window(context_window);
    } else {
        *token_info = Some(TokenUsageInfo::full_context_window(context_window));
    }
}
pub(crate) fn get_total_token_usage(
journal: &Journal,
token_info: &Option<TokenUsageInfo>,
server_reasoning_included: bool,
) -> i64 {
let items = raw_items(journal);
let last_tokens = token_info
.as_ref()
.map(|info| info.last_token_usage.total_tokens)
.unwrap_or(0);
let items_after_last_model_generated_tokens = items_after_last_model_generated_item(&items)
.iter()
.map(estimate_item_token_count)
.fold(0i64, i64::saturating_add);
if server_reasoning_included {
last_tokens.saturating_add(items_after_last_model_generated_tokens)
} else {
last_tokens
.saturating_add(get_non_last_reasoning_items_tokens(&items))
.saturating_add(items_after_last_model_generated_tokens)
}
}
pub(crate) fn get_total_token_usage_breakdown(
journal: &Journal,
token_info: &Option<TokenUsageInfo>,
) -> TotalTokenUsageBreakdown {
let items = raw_items(journal);
let last_usage = token_info
.as_ref()
.map(|info| info.last_token_usage.clone())
.unwrap_or_default();
let items_after_last_model_generated = items_after_last_model_generated_item(&items);
TotalTokenUsageBreakdown {
last_api_response_total_tokens: last_usage.total_tokens,
all_history_items_model_visible_bytes: items
.iter()
.map(estimate_response_item_model_visible_bytes)
.fold(0i64, i64::saturating_add),
estimated_tokens_of_items_added_since_last_successful_api_response:
items_after_last_model_generated
.iter()
.map(estimate_item_token_count)
.fold(0i64, i64::saturating_add),
estimated_bytes_of_items_added_since_last_successful_api_response:
items_after_last_model_generated
.iter()
.map(estimate_response_item_model_visible_bytes)
.fold(0i64, i64::saturating_add),
}
}
/// Appends one transcript item to the journal under the
/// `["history", <item id>]` key space.
fn push_history_item(journal: &mut Journal, item: ResponseItem) {
    let history_item = JournalTranscriptItem::new(item);
    let key = JournalKey::new(vec!["history".to_string(), history_item.id.clone()]);
    journal.add(key, history_item);
}
fn journal_from_items(items: Vec<ResponseItem>) -> Journal {
let mut journal = Journal::new();
for item in items {
if is_api_message(&item) {
push_history_item(&mut journal, item);
}
}
journal
}
fn get_non_last_reasoning_items_tokens(items: &[ResponseItem]) -> i64 {
let Some(last_user_index) = items.iter().rposition(is_user_turn_boundary) else {
return 0;
};
items
.iter()
.take(last_user_index)
.filter(|item| {
matches!(
item,
ResponseItem::Reasoning {
encrypted_content: Some(_),
..
}
)
})
.map(estimate_item_token_count)
.fold(0i64, i64::saturating_add)
}
/// Returns the suffix of `items` strictly after the last model-generated
/// item; an empty slice when no model-generated item exists (matching the
/// original `map_or(items.len(), ...)` behavior).
fn items_after_last_model_generated_item(items: &[ResponseItem]) -> &[ResponseItem] {
    let start = match items.iter().rposition(is_model_generated_item) {
        Some(index) => index.saturating_add(1),
        None => items.len(),
    };
    &items[start..]
}
/// Applies prompt-side normalization passes in place: ensure call outputs are
/// present, remove outputs whose call is gone, and strip images when the
/// model's input modalities do not support them.
fn normalize_history(items: &mut Vec<ResponseItem>, input_modalities: &[InputModality]) {
    thread_history::ensure_call_outputs_present(items);
    thread_history::remove_orphan_outputs(items);
    thread_history::strip_images_when_unsupported(input_modalities, items);
}
/// Walks backwards from `cut_idx` and extends the cut over contextual
/// developer/user messages immediately preceding the dropped turn, returning
/// the adjusted cut index. Never trims past `first_instruction_turn_idx`.
///
/// Clears `reference_context_item` when a trimmed developer message also
/// carried non-contextual content — presumably because the cached reference
/// may no longer match the remaining history (TODO confirm intent).
fn trim_pre_turn_context_updates(
    reference_context_item: &mut Option<TurnContextItem>,
    snapshot: &[ResponseItem],
    first_instruction_turn_idx: usize,
    mut cut_idx: usize,
) -> usize {
    while cut_idx > first_instruction_turn_idx {
        match &snapshot[cut_idx - 1] {
            // Contextual developer message: absorb it into the cut.
            ResponseItem::Message { role, content, .. }
                if role == "developer" && is_contextual_dev_message_content(content) =>
            {
                if has_non_contextual_dev_message_content(content) {
                    *reference_context_item = None;
                }
                cut_idx -= 1;
            }
            // Contextual user message (env context, skills, etc.): absorb it.
            ResponseItem::Message { role, content, .. }
                if role == "user" && is_contextual_user_message_content(content) =>
            {
                cut_idx -= 1;
            }
            // First non-contextual item: stop extending the cut.
            _ => break,
        }
    }
    cut_idx
}
#[cfg(test)]
#[path = "history_tests.rs"]
mod tests;

View File

@@ -1,6 +1,9 @@
use super::*;
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_journal::Journal;
use codex_journal::history::ORIGINAL_IMAGE_MAX_PATCHES;
use codex_journal::history::RESIZED_IMAGE_BYTES_ESTIMATE;
use codex_protocol::AgentPath;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::models::BaseInstructions;
@@ -20,8 +23,11 @@ use codex_protocol::openai_models::default_input_modalities;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::InterAgentCommunication;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use codex_protocol::protocol::TurnContextItem;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::approx_bytes_for_tokens;
use codex_utils_output_truncation::truncate_text;
use image::ImageBuffer;
use image::ImageFormat;
@@ -34,6 +40,98 @@ use std::path::PathBuf;
const EXEC_FORMAT_MAX_BYTES: usize = 10_000;
const EXEC_FORMAT_MAX_TOKENS: usize = 2_500;
/// Test-only facade that mirrors the old `ContextManager` method surface on
/// top of the free functions in this module, holding the journal plus the
/// token/context state that used to live alongside it.
#[derive(Clone, Default)]
struct TestHistory {
    journal: Journal,
    token_info: Option<TokenUsageInfo>,
    reference_context_item: Option<TurnContextItem>,
}
impl TestHistory {
    fn new() -> Self {
        Self {
            journal: Journal::new(),
            token_info: TokenUsageInfo::new_or_append(
                &None, &None, /*model_context_window*/ None,
            ),
            reference_context_item: None,
        }
    }
    fn record_items<I>(&mut self, items: I, policy: TruncationPolicy)
    where
        I: IntoIterator,
        I::Item: std::ops::Deref<Target = ResponseItem>,
    {
        super::record_items(&mut self.journal, items, policy);
    }
    fn raw_items(&self) -> Vec<ResponseItem> {
        super::raw_items(&self.journal)
    }
    fn for_prompt(&self, input_modalities: &[InputModality]) -> Vec<ResponseItem> {
        super::for_prompt(&self.journal, input_modalities)
    }
    fn estimate_token_count_with_base_instructions(
        &self,
        base_instructions: &BaseInstructions,
    ) -> Option<i64> {
        super::estimate_token_count_with_base_instructions(&self.journal, base_instructions)
    }
    fn remove_first_item(&mut self) {
        super::remove_first_item(&mut self.journal);
    }
    fn remove_last_item(&mut self) -> bool {
        super::remove_last_item(&mut self.journal)
    }
    fn replace_last_turn_images(&mut self, placeholder: &str) -> bool {
        super::replace_last_turn_images(&mut self.journal, placeholder)
    }
    fn drop_last_n_user_turns(&mut self, num_turns: u32) {
        super::drop_last_n_user_turns(
            &mut self.journal,
            &mut self.reference_context_item,
            num_turns,
        );
    }
    fn update_token_info(&mut self, usage: &TokenUsage, model_context_window: Option<i64>) {
        super::update_token_info(&mut self.token_info, usage, model_context_window);
    }
    fn get_total_token_usage(&self, server_reasoning_included: bool) -> i64 {
        super::get_total_token_usage(&self.journal, &self.token_info, server_reasoning_included)
    }
    fn set_reference_context_item(&mut self, item: Option<TurnContextItem>) {
        self.reference_context_item = item;
    }
    fn reference_context_item(&self) -> Option<TurnContextItem> {
        self.reference_context_item.clone()
    }
    // Normalizes via a raw-items round trip, as the production code does for
    // prompt projection, then writes the result back.
    fn normalize_history(&mut self, input_modalities: &[InputModality]) {
        let mut items = self.raw_items();
        super::normalize_history(&mut items, input_modalities);
        super::replace_history(&mut self.journal, items);
    }
    fn get_non_last_reasoning_items_tokens(&self) -> i64 {
        super::get_non_last_reasoning_items_tokens(&self.raw_items())
    }
    fn items_after_last_model_generated_item(&self) -> Vec<ResponseItem> {
        super::items_after_last_model_generated_item(&self.raw_items()).to_vec()
    }
}
fn assistant_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
@@ -63,8 +161,8 @@ fn inter_agent_assistant_msg(text: &str) -> ResponseItem {
}
}
fn create_history_with_items(items: Vec<ResponseItem>) -> ContextManager {
let mut h = ContextManager::new();
fn create_history_with_items(items: Vec<ResponseItem>) -> TestHistory {
let mut h = TestHistory::new();
// Use a generous but fixed token budget; tests only rely on truncation
// behavior, not on a specific model's token limit.
h.record_items(items.iter(), TruncationPolicy::Tokens(10_000));
@@ -185,7 +283,7 @@ fn approx_token_count_for_text(text: &str) -> i64 {
#[test]
fn filters_non_api_messages() {
let mut h = ContextManager::default();
let mut h = TestHistory::default();
let policy = TruncationPolicy::Tokens(10_000);
// System message is not API messages; Other is ignored.
let system = ResponseItem::Message {
@@ -327,7 +425,7 @@ fn drop_last_n_user_turns_treats_inter_agent_assistant_messages_as_instruction_t
history.drop_last_n_user_turns(/*num_turns*/ 1);
assert_eq!(history.raw_items(), &vec![first_turn, first_reply]);
assert_eq!(history.raw_items(), vec![first_turn, first_reply]);
}
#[test]
@@ -1014,7 +1112,7 @@ fn normalization_retains_local_shell_outputs() {
#[test]
fn record_items_truncates_function_call_output_content() {
let mut history = ContextManager::new();
let mut history = TestHistory::new();
// Any reasonably small token budget works; the test only cares that
// truncation happens and the marker is present.
let policy = TruncationPolicy::Tokens(1_000);
@@ -1030,8 +1128,9 @@ fn record_items_truncates_function_call_output_content() {
history.record_items([&item], policy);
assert_eq!(history.items.len(), 1);
match &history.items[0] {
let history_items = history.raw_items();
assert_eq!(history_items.len(), 1);
match &history_items[0] {
ResponseItem::FunctionCallOutput { output, .. } => {
let content = output.text_content().unwrap_or_default();
assert_ne!(content, long_output);
@@ -1050,7 +1149,7 @@ fn record_items_truncates_function_call_output_content() {
#[test]
fn record_items_truncates_custom_tool_call_output_content() {
let mut history = ContextManager::new();
let mut history = TestHistory::new();
let policy = TruncationPolicy::Tokens(1_000);
let line = "custom output that is very long\n";
let long_output = line.repeat(2_500);
@@ -1062,8 +1161,9 @@ fn record_items_truncates_custom_tool_call_output_content() {
history.record_items([&item], policy);
assert_eq!(history.items.len(), 1);
match &history.items[0] {
let history_items = history.raw_items();
assert_eq!(history_items.len(), 1);
match &history_items[0] {
ResponseItem::CustomToolCallOutput { output, .. } => {
let output = output.text_content().unwrap_or_default();
assert_ne!(output, long_output);
@@ -1082,7 +1182,7 @@ fn record_items_truncates_custom_tool_call_output_content() {
#[test]
fn record_items_respects_custom_token_limit() {
let mut history = ContextManager::new();
let mut history = TestHistory::new();
let policy = TruncationPolicy::Tokens(10);
let long_output = "tokenized content repeated many times ".repeat(200);
let item = ResponseItem::FunctionCallOutput {
@@ -1095,7 +1195,8 @@ fn record_items_respects_custom_token_limit() {
history.record_items([&item], policy);
let stored = match &history.items[0] {
let history_items = history.raw_items();
let stored = match &history_items[0] {
ResponseItem::FunctionCallOutput { output, .. } => output,
other => panic!("unexpected history item: {other:?}"),
};

View File

@@ -1,7 +1,9 @@
pub(crate) mod history;
mod service;
mod session;
mod turn;
pub(crate) use history::TotalTokenUsageBreakdown;
pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;

View File

@@ -1,15 +1,17 @@
//! Session-wide mutable state.
use codex_journal::Journal;
use codex_protocol::models::AdditionalPermissionProfile;
use codex_protocol::models::ResponseItem;
use codex_sandboxing::policy_transforms::merge_permission_profiles;
use std::collections::HashMap;
use std::collections::HashSet;
use crate::context_manager::ContextManager;
use crate::session::PreviousTurnSettings;
use crate::session::session::SessionConfiguration;
use crate::session_startup_prewarm::SessionStartupPrewarmHandle;
use crate::state::history as session_history;
use crate::state::history::TotalTokenUsageBreakdown;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
@@ -19,7 +21,10 @@ use codex_utils_output_truncation::TruncationPolicy;
/// Persistent, session-scoped state previously stored directly on `Session`.
pub(crate) struct SessionState {
pub(crate) session_configuration: SessionConfiguration,
pub(crate) history: ContextManager,
journal: Journal,
history_version: u64,
token_info: Option<TokenUsageInfo>,
reference_context_item: Option<TurnContextItem>,
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
pub(crate) server_reasoning_included: bool,
pub(crate) dependency_env: HashMap<String, String>,
@@ -39,10 +44,14 @@ pub(crate) struct SessionState {
impl SessionState {
/// Create a new session state mirroring previous `State::default()` semantics.
pub(crate) fn new(session_configuration: SessionConfiguration) -> Self {
let history = ContextManager::new();
Self {
session_configuration,
history,
journal: Journal::new(),
history_version: 0,
token_info: TokenUsageInfo::new_or_append(
&None, &None, /*model_context_window*/ None,
),
reference_context_item: None,
latest_rate_limits: None,
server_reasoning_included: false,
dependency_env: HashMap::new(),
@@ -62,7 +71,7 @@ impl SessionState {
I: IntoIterator,
I::Item: std::ops::Deref<Target = ResponseItem>,
{
self.history.record_items(items, policy);
session_history::record_items(&mut self.journal, items, policy);
}
pub(crate) fn previous_turn_settings(&self) -> Option<PreviousTurnSettings> {
@@ -85,8 +94,8 @@ impl SessionState {
is_first_turn
}
pub(crate) fn clone_history(&self) -> ContextManager {
self.history.clone()
pub(crate) fn clone_history(&self) -> Journal {
self.journal.clone()
}
pub(crate) fn replace_history(
@@ -94,21 +103,21 @@ impl SessionState {
items: Vec<ResponseItem>,
reference_context_item: Option<TurnContextItem>,
) {
self.history.replace(items);
self.history
.set_reference_context_item(reference_context_item);
session_history::replace_history(&mut self.journal, items);
self.history_version = self.history_version.saturating_add(1);
self.reference_context_item = reference_context_item;
}
pub(crate) fn set_token_info(&mut self, info: Option<TokenUsageInfo>) {
self.history.set_token_info(info);
self.token_info = info;
}
pub(crate) fn set_reference_context_item(&mut self, item: Option<TurnContextItem>) {
self.history.set_reference_context_item(item);
self.reference_context_item = item;
}
pub(crate) fn reference_context_item(&self) -> Option<TurnContextItem> {
self.history.reference_context_item()
self.reference_context_item.clone()
}
// Token/rate limit helpers
@@ -117,11 +126,11 @@ impl SessionState {
usage: &TokenUsage,
model_context_window: Option<i64>,
) {
self.history.update_token_info(usage, model_context_window);
session_history::update_token_info(&mut self.token_info, usage, model_context_window);
}
pub(crate) fn token_info(&self) -> Option<TokenUsageInfo> {
self.history.token_info()
self.token_info.clone()
}
pub(crate) fn set_rate_limits(&mut self, snapshot: RateLimitSnapshot) {
@@ -138,12 +147,38 @@ impl SessionState {
}
pub(crate) fn set_token_usage_full(&mut self, context_window: i64) {
self.history.set_token_usage_full(context_window);
session_history::set_token_usage_full(&mut self.token_info, context_window);
}
pub(crate) fn get_total_token_usage(&self, server_reasoning_included: bool) -> i64 {
self.history
.get_total_token_usage(server_reasoning_included)
session_history::get_total_token_usage(
&self.journal,
&self.token_info,
server_reasoning_included,
)
}
pub(crate) fn get_total_token_usage_breakdown(&self) -> TotalTokenUsageBreakdown {
session_history::get_total_token_usage_breakdown(&self.journal, &self.token_info)
}
pub(crate) fn estimate_token_count(
&self,
turn_context: &crate::session::turn_context::TurnContext,
) -> Option<i64> {
session_history::estimate_token_count(&self.journal, turn_context)
}
pub(crate) fn history_version(&self) -> u64 {
self.history_version
}
pub(crate) fn replace_last_turn_images(&mut self, placeholder: &str) -> bool {
let replaced = session_history::replace_last_turn_images(&mut self.journal, placeholder);
if replaced {
self.history_version = self.history_version.saturating_add(1);
}
replaced
}
pub(crate) fn set_server_reasoning_included(&mut self, included: bool) {

View File

@@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "journal",
crate_name = "codex_journal",
)

View File

@@ -0,0 +1,24 @@
[package]
name = "codex-journal"
version.workspace = true
edition.workspace = true
license.workspace = true
readme = "README.md"
[lints]
workspace = true
[dependencies]
base64 = { workspace = true }
codex-utils-cache = { workspace = true }
codex-utils-output-truncation = { workspace = true }
codex-protocol = { workspace = true }
image = { workspace = true, features = ["jpeg", "png", "webp"] }
indexmap = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }
tempfile = { workspace = true }

View File

@@ -0,0 +1,86 @@
# codex-journal
`codex-journal` is the typed journal model behind prompt metadata, transcript rewriting, forked views,
and prompt-ready projection in Codex.
## Model
A `Journal` stores one append-only sequence of `JournalEntry` values. Each entry is one of:
- prompt metadata: keyed entries that can replace older entries with the same key
- transcript: ordered prompt-visible items
- checkpoints: transcript rewrite operations such as prefix replacement or truncation
The append-only journal is the source of truth. Derived views are produced by resolving it.
## Resolved Views
`Journal::resolve()` returns a `ResolvedJournal` with two typed views:
- `ResolvedMetadata`: effective prompt-metadata entries after key-based replacement
- `ResolvedTranscript`: effective transcript after checkpoint application
These two views come from the same journal, but they behave differently:
- metadata is deduplicated by key and ordered by `prompt_order`
- transcript preserves order and is rewritten only by checkpoints
## Building Metadata Entries
Use `Journal::metadata_entry_builder(...)` when you want explicit control over metadata:
```rust
use codex_journal::Journal;
use codex_journal::JournalContextAudience;
use codex_journal::PromptMessage;
let entry = Journal::metadata_entry_builder(
["prompt", "developer", "permissions"],
PromptMessage::developer_text("sandbox is workspace-write"),
)
.prompt_order(20)
.audience(JournalContextAudience::All)
.build();
```
`Journal::metadata_entry(...)` remains available as a shorthand for the common case.
## Rendering
Prompt rendering lives in `PromptRenderer`, not in `Journal` itself. Grouping is explicit and
applied only to consecutive resolved metadata entries:
```rust
use codex_journal::Journal;
use codex_journal::KeyFilter;
use codex_journal::PromptMessage;
use codex_journal::PromptRenderer;
let journal = Journal::from_entries(vec![
Journal::metadata_entry(
["prompt", "developer", "one"],
10,
PromptMessage::developer_text("first"),
).unwrap(),
Journal::metadata_entry(
["prompt", "developer", "two"],
20,
PromptMessage::developer_text("second"),
).unwrap(),
]);
let resolved = journal.resolve().unwrap();
let prompt = PromptRenderer::new()
.group(KeyFilter::prefix(["prompt", "developer"]))
.render_metadata(resolved.metadata());
```
## Transformations
`Journal` also supports:
- `filter` and `resolve_with_filter` for key-scoped views
- `flatten` for dropping obsolete entries while keeping the current effective view
- `fork` for producing child views with audience and `on_fork` filtering
- `with_history_window` for keeping only a recent hot history suffix
- `persist_jsonl` / `load_jsonl` for durable JSONL storage

View File

@@ -0,0 +1,87 @@
use crate::JournalContextAudience;
use crate::JournalContextForkBehavior;
use crate::JournalEntry;
use crate::JournalKey;
use crate::JournalMetadataItem;
use crate::PromptMessage;
use codex_protocol::models::ContentItem;
/// Builder for one prompt-metadata journal entry.
#[derive(Debug, Clone)]
pub struct MetadataEntryBuilder {
    // Journal key the entry is stored (and replaced) under.
    key: JournalKey,
    // Message payload carried by the entry.
    message: PromptMessage,
    // Ordering used after resolution (see `prompt_order`).
    prompt_order: i64,
    // Audience used when projecting or forking the journal.
    audience: JournalContextAudience,
    // Behavior when creating a forked journal view.
    on_fork: JournalContextForkBehavior,
    // Arbitrary tags for downstream classification.
    tags: Vec<String>,
    // Optional record of where this entry came from.
    source: Option<String>,
}
impl MetadataEntryBuilder {
    pub(crate) fn new(key: impl Into<JournalKey>, message: impl Into<PromptMessage>) -> Self {
        Self {
            key: key.into(),
            message: message.into(),
            prompt_order: 0,
            audience: JournalContextAudience::default(),
            on_fork: JournalContextForkBehavior::default(),
            tags: Vec::new(),
            source: None,
        }
    }
    /// Sets the prompt ordering used after resolution.
    pub fn prompt_order(self, prompt_order: i64) -> Self {
        Self {
            prompt_order,
            ..self
        }
    }
    /// Sets the audience used when projecting or forking the journal.
    pub fn audience(self, audience: JournalContextAudience) -> Self {
        Self { audience, ..self }
    }
    /// Sets how this context entry behaves when creating a forked journal view.
    pub fn on_fork(self, on_fork: JournalContextForkBehavior) -> Self {
        Self { on_fork, ..self }
    }
    /// Sets arbitrary tags for downstream classification.
    pub fn tags(self, tags: Vec<String>) -> Self {
        Self { tags, ..self }
    }
    /// Records the origin of this prompt-metadata entry.
    pub fn source(self, source: impl Into<String>) -> Self {
        Self {
            source: Some(source.into()),
            ..self
        }
    }
    /// Builds one prompt-metadata entry; returns `None` when the message has
    /// no payload (empty, or only whitespace text; images always count).
    pub fn build(self) -> Option<JournalEntry> {
        let has_payload = self.message.content.iter().any(|item| match item {
            ContentItem::InputText { text } | ContentItem::OutputText { text } => {
                !text.trim().is_empty()
            }
            ContentItem::InputImage { .. } => true,
        });
        if !has_payload {
            return None;
        }
        let mut item = JournalMetadataItem::new(self.message)
            .with_prompt_order(self.prompt_order)
            .with_audience(self.audience)
            .with_on_fork(self.on_fork)
            .with_tags(self.tags);
        if let Some(source) = self.source {
            item = item.with_source(source);
        }
        Some(JournalEntry::new(self.key, item))
    }
}

View File

@@ -0,0 +1,24 @@
use thiserror::Error;
/// Convenience result type for journal operations.
pub type Result<T> = std::result::Result<T, JournalError>;
/// Errors produced while materializing or persisting a journal.
#[derive(Debug, Error)]
pub enum JournalError {
    /// An underlying filesystem read or write failed.
    #[error("failed to read or write journal")]
    Io(#[from] std::io::Error),
    /// A journal item could not be serialized to JSON.
    #[error("failed to serialize journal item")]
    SerializeJson {
        #[from]
        source: serde_json::Error,
    },
    /// A persisted JSONL line failed to parse; `line_number` identifies the
    /// offending line.
    #[error("failed to parse journal item at line {line_number}")]
    ParseJson {
        line_number: usize,
        #[source]
        source: serde_json::Error,
    },
    /// A history cursor pointed at an id not present in the journal.
    #[error("history cursor referenced unknown history item id `{history_item_id}`")]
    UnknownHistoryItemId { history_item_id: String },
}

View File

@@ -0,0 +1,133 @@
use codex_protocol::items::parse_hook_prompt_fragment;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ENVIRONMENT_CONTEXT_CLOSE_TAG;
use codex_protocol::protocol::ENVIRONMENT_CONTEXT_OPEN_TAG;
use codex_protocol::protocol::InterAgentCommunication;
const USER_INSTRUCTIONS_START_MARKER: &str = "# AGENTS.md instructions for ";
const USER_INSTRUCTIONS_END_MARKER: &str = "</INSTRUCTIONS>";
const SKILL_START_MARKER: &str = "<skill>";
const SKILL_END_MARKER: &str = "</skill>";
const USER_SHELL_COMMAND_START_MARKER: &str = "<user_shell_command>";
const USER_SHELL_COMMAND_END_MARKER: &str = "</user_shell_command>";
const TURN_ABORTED_START_MARKER: &str = "<turn_aborted>";
const TURN_ABORTED_END_MARKER: &str = "</turn_aborted>";
const SUBAGENT_NOTIFICATION_START_MARKER: &str = "<subagent_notification>";
const SUBAGENT_NOTIFICATION_END_MARKER: &str = "</subagent_notification>";
/// Returns whether an item should be carried in API-visible conversation history.
///
/// Everything except system-role messages and `Other` is API-visible. The
/// match is deliberately exhaustive so a new `ResponseItem` variant forces a
/// decision here.
pub fn is_api_message(message: &ResponseItem) -> bool {
    match message {
        // System messages stay client-side.
        ResponseItem::Message { role, .. } => role.as_str() != "system",
        ResponseItem::FunctionCallOutput { .. }
        | ResponseItem::FunctionCall { .. }
        | ResponseItem::ToolSearchCall { .. }
        | ResponseItem::ToolSearchOutput { .. }
        | ResponseItem::CustomToolCall { .. }
        | ResponseItem::CustomToolCallOutput { .. }
        | ResponseItem::LocalShellCall { .. }
        | ResponseItem::Reasoning { .. }
        | ResponseItem::WebSearchCall { .. }
        | ResponseItem::ImageGenerationCall { .. }
        | ResponseItem::Compaction { .. } => true,
        ResponseItem::Other => false,
    }
}
/// Returns whether an item originated from model generation rather than client-side bookkeeping.
///
/// Assistant messages, reasoning, and all call/search/compaction items count
/// as model-generated; tool call *outputs* are produced client-side.
pub fn is_model_generated_item(item: &ResponseItem) -> bool {
    match item {
        ResponseItem::Message { role, .. } => role == "assistant",
        ResponseItem::Reasoning { .. }
        | ResponseItem::FunctionCall { .. }
        | ResponseItem::ToolSearchCall { .. }
        | ResponseItem::WebSearchCall { .. }
        | ResponseItem::ImageGenerationCall { .. }
        | ResponseItem::CustomToolCall { .. }
        | ResponseItem::LocalShellCall { .. }
        | ResponseItem::Compaction { .. } => true,
        ResponseItem::FunctionCallOutput { .. }
        | ResponseItem::ToolSearchOutput { .. }
        | ResponseItem::CustomToolCallOutput { .. }
        | ResponseItem::Other => false,
    }
}
/// Returns whether an item was injected by Codex rather than supplied by the user or model.
///
/// Covers tool call outputs and developer-role messages.
pub fn is_codex_generated_item(item: &ResponseItem) -> bool {
    matches!(
        item,
        ResponseItem::FunctionCallOutput { .. }
            | ResponseItem::ToolSearchOutput { .. }
            | ResponseItem::CustomToolCallOutput { .. }
    ) || matches!(item, ResponseItem::Message { role, .. } if role == "developer")
}
/// Returns whether an item should count as an instruction-turn boundary for
/// history rewrites: a non-contextual user message, or an assistant message
/// carrying an inter-agent instruction.
pub fn is_user_turn_boundary(item: &ResponseItem) -> bool {
    match item {
        ResponseItem::Message { role, content, .. } if role == "user" => {
            !is_contextual_user_message_content(content)
        }
        ResponseItem::Message { role, content, .. } if role == "assistant" => {
            is_inter_agent_instruction_content(content)
        }
        _ => false,
    }
}
/// Returns the indexes of all instruction-turn boundaries in order.
pub fn user_turn_boundary_positions(items: &[ResponseItem]) -> Vec<usize> {
    items
        .iter()
        .enumerate()
        .filter_map(|(index, item)| is_user_turn_boundary(item).then_some(index))
        .collect()
}
// Delegates to the protocol-level check for serialized inter-agent messages.
fn is_inter_agent_instruction_content(content: &[ContentItem]) -> bool {
    InterAgentCommunication::is_message_content(content)
}
// Returns true when any text fragment is a known injected/contextual payload
// (hook prompt, user instructions, environment context, skill, shell command,
// turn-aborted notice, or sub-agent notification) rather than genuine user input.
fn is_contextual_user_message_content(content: &[ContentItem]) -> bool {
    content.iter().any(|item| {
        let ContentItem::InputText { text } = item else {
            // Images and assistant output text never mark a message as contextual.
            return false;
        };
        if parse_hook_prompt_fragment(text).is_some() {
            return true;
        }
        let marker_pairs = [
            (USER_INSTRUCTIONS_START_MARKER, USER_INSTRUCTIONS_END_MARKER),
            (ENVIRONMENT_CONTEXT_OPEN_TAG, ENVIRONMENT_CONTEXT_CLOSE_TAG),
            (SKILL_START_MARKER, SKILL_END_MARKER),
            (USER_SHELL_COMMAND_START_MARKER, USER_SHELL_COMMAND_END_MARKER),
            (TURN_ABORTED_START_MARKER, TURN_ABORTED_END_MARKER),
            (
                SUBAGENT_NOTIFICATION_START_MARKER,
                SUBAGENT_NOTIFICATION_END_MARKER,
            ),
        ];
        marker_pairs
            .iter()
            .any(|&(start, end)| matches_fragment(text, start, end))
    })
}
// Returns true when `text` (ignoring surrounding whitespace) starts with
// `start_marker` and ends with `end_marker`, compared ASCII-case-insensitively.
fn matches_fragment(text: &str, start_marker: &str, end_marker: &str) -> bool {
    let body = text.trim_start();
    let Some(head) = body.get(..start_marker.len()) else {
        // Shorter than the marker (or not a char boundary): cannot match.
        return false;
    };
    if !head.eq_ignore_ascii_case(start_marker) {
        return false;
    }
    let body = body.trim_end();
    let tail_start = body.len().saturating_sub(end_marker.len());
    body.get(tail_start..)
        .is_some_and(|tail| tail.eq_ignore_ascii_case(end_marker))
}

View File

@@ -0,0 +1,174 @@
use base64::Engine;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
use codex_protocol::models::ContentItem;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::ImageDetail;
use codex_protocol::models::ResponseItem;
use codex_utils_cache::BlockingLruCache;
use codex_utils_cache::sha1_digest;
use codex_utils_output_truncation::approx_bytes_for_tokens;
use codex_utils_output_truncation::approx_tokens_from_byte_count_i64;
use std::num::NonZeroUsize;
use std::sync::LazyLock;
/// Estimates the decoded length of a base64-encoded reasoning payload.
///
/// Base64 decodes to roughly 3/4 of the encoded length; a fixed ~650-byte
/// overhead is subtracted (saturating at zero). The original
/// `checked_div(4).unwrap_or(0)` was a needless fallible division by a
/// nonzero constant; a plain `/ 4` is equivalent and clearer.
fn estimate_reasoning_length(encoded_len: usize) -> usize {
    (encoded_len.saturating_mul(3) / 4).saturating_sub(650)
}
/// Approximates the token cost of one history item using model-visible byte heuristics.
pub fn estimate_item_token_count(item: &ResponseItem) -> i64 {
    approx_tokens_from_byte_count_i64(estimate_response_item_model_visible_bytes(item))
}
/// Approximates the model-visible byte size of one history item.
///
/// Inline base64 image payloads are discounted to a fixed vision-token estimate instead of their
/// raw serialized size, while encrypted reasoning content uses a coarse decoded-length heuristic.
pub fn estimate_response_item_model_visible_bytes(item: &ResponseItem) -> i64 {
    // Encrypted reasoning and compaction payloads: use the decoded-length heuristic.
    if let ResponseItem::Reasoning {
        encrypted_content: Some(encrypted),
        ..
    }
    | ResponseItem::Compaction {
        encrypted_content: encrypted,
    } = item
    {
        return i64::try_from(estimate_reasoning_length(encrypted.len())).unwrap_or(i64::MAX);
    }
    // All other items: serialized JSON size, with inline image payloads swapped
    // for their fixed vision estimates.
    let serialized_len = serde_json::to_string(item)
        .map(|serialized| i64::try_from(serialized.len()).unwrap_or(i64::MAX))
        .unwrap_or_default();
    let (payload_bytes, replacement_bytes) = image_data_url_estimate_adjustment(item);
    if payload_bytes == 0 || replacement_bytes == 0 {
        serialized_len
    } else {
        serialized_len
            .saturating_sub(payload_bytes)
            .saturating_add(replacement_bytes)
    }
}
/// Approximate model-visible byte cost for one resized image input.
pub const RESIZED_IMAGE_BYTES_ESTIMATE: i64 = 7373;

// Side length, in pixels, of one estimation patch for `detail: "original"` images.
const ORIGINAL_IMAGE_PATCH_SIZE: u32 = 32;

/// Maximum patch budget used when estimating `detail: "original"` image cost.
pub const ORIGINAL_IMAGE_MAX_PATCHES: usize = 10_000;

// Number of per-URL estimates retained by the LRU cache below.
const ORIGINAL_IMAGE_ESTIMATE_CACHE_SIZE: usize = 32;

// Process-wide cache keyed by the SHA-1 of the data URL, so repeated estimates
// of the same image avoid re-decoding it. `None` results are cached too.
static ORIGINAL_IMAGE_ESTIMATE_CACHE: LazyLock<BlockingLruCache<[u8; 20], Option<i64>>> =
    LazyLock::new(|| {
        BlockingLruCache::new(
            NonZeroUsize::new(ORIGINAL_IMAGE_ESTIMATE_CACHE_SIZE).unwrap_or(NonZeroUsize::MIN),
        )
    });
// Extracts the base64 payload from a `data:image/...;base64,...` URL.
//
// Returns `None` unless the URL has a case-insensitive `data:` scheme, an
// `image/` MIME type, and a `base64` marker among the metadata parameters.
fn parse_base64_image_data_url(url: &str) -> Option<&str> {
    let scheme_len = "data:".len();
    let scheme = url.get(..scheme_len)?;
    if !scheme.eq_ignore_ascii_case("data:") {
        return None;
    }
    let (metadata, payload) = url.split_once(',')?;
    // `metadata` starts with the 5-byte ASCII scheme, so slicing it off is safe.
    let mut metadata_parts = metadata[scheme_len..].split(';');
    let mime_type = metadata_parts.next().unwrap_or_default();
    let mime_prefix = mime_type.get(.."image/".len())?;
    if !mime_prefix.eq_ignore_ascii_case("image/") {
        return None;
    }
    metadata_parts
        .any(|part| part.eq_ignore_ascii_case("base64"))
        .then_some(payload)
}
/// Estimates the model-visible byte cost of one `detail: "original"` image data URL.
///
/// Returns `None` when the URL is not an inline base64 image or its payload cannot be decoded;
/// results (including `None`) are memoized by the SHA-1 digest of the full URL.
fn estimate_original_image_bytes(image_url: &str) -> Option<i64> {
    let key = sha1_digest(image_url.as_bytes());
    ORIGINAL_IMAGE_ESTIMATE_CACHE.get_or_insert_with(key, || {
        // Only inline base64 image payloads can be measured locally; anything else
        // falls back to the caller's default estimate.
        let payload = match parse_base64_image_data_url(image_url) {
            Some(payload) => payload,
            None => {
                tracing::trace!("skipping original-detail estimate for non-base64 image data URL");
                return None;
            }
        };
        let bytes = match BASE64_STANDARD.decode(payload) {
            Ok(bytes) => bytes,
            Err(error) => {
                tracing::trace!("failed to decode original-detail image payload: {error}");
                return None;
            }
        };
        let dynamic = match image::load_from_memory(&bytes) {
            Ok(dynamic) => dynamic,
            Err(error) => {
                tracing::trace!("failed to decode original-detail image bytes: {error}");
                return None;
            }
        };
        // Ceiling-divide each dimension into fixed-size patches, cap the total patch
        // count, then convert that token budget back into an approximate byte count.
        let width = i64::from(dynamic.width());
        let height = i64::from(dynamic.height());
        let patch_size = i64::from(ORIGINAL_IMAGE_PATCH_SIZE);
        let patches_wide = width.saturating_add(patch_size.saturating_sub(1)) / patch_size;
        let patches_high = height.saturating_add(patch_size.saturating_sub(1)) / patch_size;
        let patch_count = patches_wide.saturating_mul(patches_high);
        let patch_count = usize::try_from(patch_count).unwrap_or(usize::MAX);
        let patch_count = patch_count.min(ORIGINAL_IMAGE_MAX_PATCHES);
        Some(i64::try_from(approx_bytes_for_tokens(patch_count)).unwrap_or(i64::MAX))
    })
}
// Computes (inline base64 payload bytes, discounted replacement bytes) for every
// inline image carried by `item`. Both totals are zero when no inline images exist.
fn image_data_url_estimate_adjustment(item: &ResponseItem) -> (i64, i64) {
    // First gather every inline image reference, then fold the totals.
    let mut inline_images: Vec<(&str, Option<ImageDetail>)> = Vec::new();
    match item {
        ResponseItem::Message { content, .. } => {
            for content_item in content {
                if let ContentItem::InputImage { image_url, detail } = content_item {
                    inline_images.push((image_url, *detail));
                }
            }
        }
        ResponseItem::FunctionCallOutput { output, .. }
        | ResponseItem::CustomToolCallOutput { output, .. } => {
            if let FunctionCallOutputBody::ContentItems(items) = &output.body {
                for content_item in items {
                    if let FunctionCallOutputContentItem::InputImage { image_url, detail } =
                        content_item
                    {
                        inline_images.push((image_url, *detail));
                    }
                }
            }
        }
        _ => {}
    }

    let mut payload_bytes = 0i64;
    let mut replacement_bytes = 0i64;
    for (image_url, detail) in inline_images {
        // Non-base64 URLs contribute nothing to either total.
        let Some(payload_len) = parse_base64_image_data_url(image_url).map(str::len) else {
            continue;
        };
        payload_bytes =
            payload_bytes.saturating_add(i64::try_from(payload_len).unwrap_or(i64::MAX));
        let replacement = match detail {
            // `original` detail gets a measured estimate when the image decodes.
            Some(ImageDetail::Original) => {
                estimate_original_image_bytes(image_url).unwrap_or(RESIZED_IMAGE_BYTES_ESTIMATE)
            }
            _ => RESIZED_IMAGE_BYTES_ESTIMATE,
        };
        replacement_bytes = replacement_bytes.saturating_add(replacement);
    }
    (payload_bytes, replacement_bytes)
}

View File

@@ -0,0 +1,26 @@
//! Pure history utilities for normalizing, classifying, rewriting, and estimating prompt-ready
//! history items.

// Implementation modules; public items are re-exported below so callers use a flat path.
mod classify;
mod estimate;
mod normalize;
mod transform;

#[cfg(test)]
mod tests;

// Classification predicates over history items.
pub use classify::is_api_message;
pub use classify::is_codex_generated_item;
pub use classify::is_model_generated_item;
pub use classify::is_user_turn_boundary;
pub use classify::user_turn_boundary_positions;
// Token/byte-cost estimation helpers.
pub use estimate::ORIGINAL_IMAGE_MAX_PATCHES;
pub use estimate::RESIZED_IMAGE_BYTES_ESTIMATE;
pub use estimate::estimate_item_token_count;
pub use estimate::estimate_response_item_model_visible_bytes;
// History normalization: pairing tool calls with outputs, dropping orphans, image stripping.
pub use normalize::ensure_call_outputs_present;
pub use normalize::remove_corresponding_for;
pub use normalize::remove_orphan_outputs;
pub use normalize::strip_images_when_unsupported;
// In-place history rewrites.
pub use transform::replace_last_turn_images;
pub use transform::truncate_history_item;

View File

@@ -3,24 +3,21 @@ use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::InputModality;
use std::collections::HashSet;
use crate::util::error_or_panic;
use tracing::error;
use tracing::info;
const IMAGE_CONTENT_OMITTED_PLACEHOLDER: &str =
"image content omitted because you do not support image input";
pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
// Collect synthetic outputs to insert immediately after their calls.
// Store the insertion position (index of call) alongside the item so
// we can insert in reverse order and avoid index shifting.
/// Ensures every tool-call item has a matching output item, inserting synthetic aborted outputs
/// immediately after calls that are missing one.
pub fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
let mut missing_outputs_to_insert: Vec<(usize, ResponseItem)> = Vec::new();
for (idx, item) in items.iter().enumerate() {
for (index, item) in items.iter().enumerate() {
match item {
ResponseItem::FunctionCall { call_id, .. } => {
let has_output = items.iter().any(|i| match i {
let has_output = items.iter().any(|other| match other {
ResponseItem::FunctionCallOutput {
call_id: existing, ..
} => existing == call_id,
@@ -30,7 +27,7 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
if !has_output {
info!("Function call output is missing for call id: {call_id}");
missing_outputs_to_insert.push((
idx,
index,
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload::from_text("aborted".to_string()),
@@ -42,7 +39,7 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
call_id: Some(call_id),
..
} => {
let has_output = items.iter().any(|i| match i {
let has_output = items.iter().any(|other| match other {
ResponseItem::ToolSearchOutput {
call_id: Some(existing),
..
@@ -53,7 +50,7 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
if !has_output {
info!("Tool search output is missing for call id: {call_id}");
missing_outputs_to_insert.push((
idx,
index,
ResponseItem::ToolSearchOutput {
call_id: Some(call_id.clone()),
status: "completed".to_string(),
@@ -64,7 +61,7 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
}
}
ResponseItem::CustomToolCall { call_id, .. } => {
let has_output = items.iter().any(|i| match i {
let has_output = items.iter().any(|other| match other {
ResponseItem::CustomToolCallOutput {
call_id: existing, ..
} => existing == call_id,
@@ -72,11 +69,11 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
});
if !has_output {
error_or_panic(format!(
report_invariant_violation(format!(
"Custom tool call output is missing for call id: {call_id}"
));
missing_outputs_to_insert.push((
idx,
index,
ResponseItem::CustomToolCallOutput {
call_id: call_id.clone(),
name: None,
@@ -85,10 +82,9 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
));
}
}
// LocalShellCall is represented in upstream streams by a FunctionCallOutput
ResponseItem::LocalShellCall { call_id, .. } => {
if let Some(call_id) = call_id.as_ref() {
let has_output = items.iter().any(|i| match i {
let has_output = items.iter().any(|other| match other {
ResponseItem::FunctionCallOutput {
call_id: existing, ..
} => existing == call_id,
@@ -96,11 +92,11 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
});
if !has_output {
error_or_panic(format!(
report_invariant_violation(format!(
"Local shell call output is missing for call id: {call_id}"
));
missing_outputs_to_insert.push((
idx,
index,
ResponseItem::FunctionCallOutput {
call_id: call_id.clone(),
output: FunctionCallOutputPayload::from_text("aborted".to_string()),
@@ -113,57 +109,57 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
}
}
// Insert synthetic outputs in reverse index order to avoid re-indexing.
for (idx, output_item) in missing_outputs_to_insert.into_iter().rev() {
items.insert(idx + 1, output_item);
for (index, output_item) in missing_outputs_to_insert.into_iter().rev() {
items.insert(index + 1, output_item);
}
}
pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
let function_call_ids: HashSet<String> = items
/// Removes output items whose corresponding tool-call items no longer exist.
pub fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
let function_call_ids = items
.iter()
.filter_map(|i| match i {
.filter_map(|item| match item {
ResponseItem::FunctionCall { call_id, .. } => Some(call_id.clone()),
_ => None,
})
.collect();
.collect::<std::collections::HashSet<_>>();
let tool_search_call_ids: HashSet<String> = items
let tool_search_call_ids = items
.iter()
.filter_map(|i| match i {
.filter_map(|item| match item {
ResponseItem::ToolSearchCall {
call_id: Some(call_id),
..
} => Some(call_id.clone()),
_ => None,
})
.collect();
.collect::<std::collections::HashSet<_>>();
let local_shell_call_ids: HashSet<String> = items
let local_shell_call_ids = items
.iter()
.filter_map(|i| match i {
.filter_map(|item| match item {
ResponseItem::LocalShellCall {
call_id: Some(call_id),
..
} => Some(call_id.clone()),
_ => None,
})
.collect();
.collect::<std::collections::HashSet<_>>();
let custom_tool_call_ids: HashSet<String> = items
let custom_tool_call_ids = items
.iter()
.filter_map(|i| match i {
.filter_map(|item| match item {
ResponseItem::CustomToolCall { call_id, .. } => Some(call_id.clone()),
_ => None,
})
.collect();
.collect::<std::collections::HashSet<_>>();
items.retain(|item| match item {
ResponseItem::FunctionCallOutput { call_id, .. } => {
let has_match =
function_call_ids.contains(call_id) || local_shell_call_ids.contains(call_id);
if !has_match {
error_or_panic(format!(
report_invariant_violation(format!(
"Orphan function call output for call id: {call_id}"
));
}
@@ -172,7 +168,7 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
ResponseItem::CustomToolCallOutput { call_id, .. } => {
let has_match = custom_tool_call_ids.contains(call_id);
if !has_match {
error_or_panic(format!(
report_invariant_violation(format!(
"Orphan custom tool call output for call id: {call_id}"
));
}
@@ -185,7 +181,9 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
} => {
let has_match = tool_search_call_ids.contains(call_id);
if !has_match {
error_or_panic(format!("Orphan tool search output for call id: {call_id}"));
report_invariant_violation(format!(
"Orphan tool search output for call id: {call_id}"
));
}
has_match
}
@@ -194,12 +192,13 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
});
}
pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &ResponseItem) {
/// Removes the matching paired item for `item` from the history, if one exists.
pub fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &ResponseItem) {
match item {
ResponseItem::FunctionCall { call_id, .. } => {
remove_first_matching(items, |i| {
remove_first_matching(items, |other| {
matches!(
i,
other,
ResponseItem::FunctionCallOutput {
call_id: existing, ..
} if existing == call_id
@@ -207,23 +206,35 @@ pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &Res
});
}
ResponseItem::FunctionCallOutput { call_id, .. } => {
if let Some(pos) = items.iter().position(|i| {
matches!(i, ResponseItem::FunctionCall { call_id: existing, .. } if existing == call_id)
if let Some(position) = items.iter().position(|other| {
matches!(
other,
ResponseItem::FunctionCall {
call_id: existing,
..
} if existing == call_id
)
}) {
items.remove(pos);
} else if let Some(pos) = items.iter().position(|i| {
matches!(i, ResponseItem::LocalShellCall { call_id: Some(existing), .. } if existing == call_id)
items.remove(position);
} else if let Some(position) = items.iter().position(|other| {
matches!(
other,
ResponseItem::LocalShellCall {
call_id: Some(existing),
..
} if existing == call_id
)
}) {
items.remove(pos);
items.remove(position);
}
}
ResponseItem::ToolSearchCall {
call_id: Some(call_id),
..
} => {
remove_first_matching(items, |i| {
remove_first_matching(items, |other| {
matches!(
i,
other,
ResponseItem::ToolSearchOutput {
call_id: Some(existing),
..
@@ -235,23 +246,20 @@ pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &Res
call_id: Some(call_id),
..
} => {
remove_first_matching(
items,
|i| {
matches!(
i,
ResponseItem::ToolSearchCall {
call_id: Some(existing),
..
} if existing == call_id
)
},
);
remove_first_matching(items, |other| {
matches!(
other,
ResponseItem::ToolSearchCall {
call_id: Some(existing),
..
} if existing == call_id
)
});
}
ResponseItem::CustomToolCall { call_id, .. } => {
remove_first_matching(items, |i| {
remove_first_matching(items, |other| {
matches!(
i,
other,
ResponseItem::CustomToolCallOutput {
call_id: existing, ..
} if existing == call_id
@@ -259,18 +267,23 @@ pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &Res
});
}
ResponseItem::CustomToolCallOutput { call_id, .. } => {
remove_first_matching(
items,
|i| matches!(i, ResponseItem::CustomToolCall { call_id: existing, .. } if existing == call_id),
);
remove_first_matching(items, |other| {
matches!(
other,
ResponseItem::CustomToolCall {
call_id: existing,
..
} if existing == call_id
)
});
}
ResponseItem::LocalShellCall {
call_id: Some(call_id),
..
} => {
remove_first_matching(items, |i| {
remove_first_matching(items, |other| {
matches!(
i,
other,
ResponseItem::FunctionCallOutput {
call_id: existing, ..
} if existing == call_id
@@ -281,23 +294,15 @@ pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &Res
}
}
fn remove_first_matching<F>(items: &mut Vec<ResponseItem>, predicate: F)
where
F: Fn(&ResponseItem) -> bool,
{
if let Some(pos) = items.iter().position(predicate) {
items.remove(pos);
}
}
/// Strip image content from messages and tool outputs when the model does not support images.
/// When `input_modalities` contains `InputModality::Image`, no stripping is performed.
pub(crate) fn strip_images_when_unsupported(
/// Strips image inputs from messages and tool outputs when the model does not support images.
///
/// Image-generation call results are cleared in the same mode so text-only prompts do not carry
/// image payloads forward.
pub fn strip_images_when_unsupported(
input_modalities: &[InputModality],
items: &mut [ResponseItem],
) {
let supports_images = input_modalities.contains(&InputModality::Image);
if supports_images {
if input_modalities.contains(&InputModality::Image) {
return;
}
@@ -343,3 +348,20 @@ pub(crate) fn strip_images_when_unsupported(
}
}
}
fn remove_first_matching<F>(items: &mut Vec<ResponseItem>, predicate: F)
where
F: Fn(&ResponseItem) -> bool,
{
if let Some(position) = items.iter().position(predicate) {
items.remove(position);
}
}
fn report_invariant_violation(message: String) {
if cfg!(debug_assertions) {
panic!("{message}");
} else {
error!("{message}");
}
}

View File

@@ -0,0 +1,286 @@
use super::ensure_call_outputs_present;
use super::estimate_response_item_model_visible_bytes;
use super::is_user_turn_boundary;
use super::remove_corresponding_for;
use super::replace_last_turn_images;
use super::strip_images_when_unsupported;
use super::truncate_history_item;
use codex_protocol::AgentPath;
use codex_protocol::models::ContentItem;
use codex_protocol::models::DEFAULT_IMAGE_DETAIL;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::InputModality;
use codex_protocol::protocol::InterAgentCommunication;
use codex_utils_output_truncation::TruncationPolicy;
use pretty_assertions::assert_eq;
// Builds a plain user-role message containing a single text input.
fn user_message(text: &str) -> ResponseItem {
    let content = vec![ContentItem::InputText {
        text: text.to_owned(),
    }];
    ResponseItem::Message {
        id: None,
        role: "user".to_string(),
        content,
        phase: None,
    }
}
// Builds a plain assistant-role message containing a single text output.
fn assistant_message(text: &str) -> ResponseItem {
    let content = vec![ContentItem::OutputText {
        text: text.to_owned(),
    }];
    ResponseItem::Message {
        id: None,
        role: "assistant".to_string(),
        content,
        phase: None,
    }
}
#[test]
fn ensure_call_outputs_present_inserts_missing_function_output_after_call() {
    // A function call whose output is absent from the history.
    let mut items = vec![ResponseItem::FunctionCall {
        id: None,
        name: "shell".to_string(),
        namespace: None,
        arguments: "{}".to_string(),
        call_id: "call-1".to_string(),
    }];
    ensure_call_outputs_present(&mut items);
    // A synthetic "aborted" output is inserted immediately after the call.
    assert_eq!(
        items,
        vec![
            ResponseItem::FunctionCall {
                id: None,
                name: "shell".to_string(),
                namespace: None,
                arguments: "{}".to_string(),
                call_id: "call-1".to_string(),
            },
            ResponseItem::FunctionCallOutput {
                call_id: "call-1".to_string(),
                output: FunctionCallOutputPayload::from_text("aborted".to_string()),
            },
        ]
    );
}
#[test]
fn remove_corresponding_for_removes_matching_tool_output() {
    let removed = ResponseItem::FunctionCall {
        id: None,
        name: "shell".to_string(),
        namespace: None,
        arguments: "{}".to_string(),
        call_id: "call-1".to_string(),
    };
    let mut items = vec![
        removed.clone(),
        ResponseItem::FunctionCallOutput {
            call_id: "call-1".to_string(),
            output: FunctionCallOutputPayload::from_text("done".to_string()),
        },
    ];
    // Simulate the call itself having already been dropped from history.
    items.remove(0);
    remove_corresponding_for(&mut items, &removed);
    // Its paired output is removed as well, leaving the history empty.
    assert_eq!(items, Vec::new());
}
#[test]
fn strip_images_when_unsupported_rewrites_messages_and_tool_outputs() {
    // One image in a user message, one in a tool output, and one image-generation result.
    let mut items = vec![
        ResponseItem::Message {
            id: None,
            role: "user".to_string(),
            content: vec![
                ContentItem::InputText {
                    text: "look".to_string(),
                },
                ContentItem::InputImage {
                    image_url: "https://example.com/img.png".to_string(),
                    detail: Some(DEFAULT_IMAGE_DETAIL),
                },
            ],
            phase: None,
        },
        ResponseItem::FunctionCallOutput {
            call_id: "call-1".to_string(),
            output: FunctionCallOutputPayload::from_content_items(vec![
                FunctionCallOutputContentItem::InputImage {
                    image_url: "https://example.com/tool.png".to_string(),
                    detail: Some(DEFAULT_IMAGE_DETAIL),
                },
            ]),
        },
        ResponseItem::ImageGenerationCall {
            id: "ig-1".to_string(),
            status: "completed".to_string(),
            revised_prompt: None,
            result: "Zm9v".to_string(),
        },
    ];
    // Text-only modality: every image must be replaced by placeholder text or cleared.
    strip_images_when_unsupported(&[InputModality::Text], &mut items);
    assert_eq!(
        items,
        vec![
            ResponseItem::Message {
                id: None,
                role: "user".to_string(),
                content: vec![
                    ContentItem::InputText {
                        text: "look".to_string(),
                    },
                    ContentItem::InputText {
                        text: "image content omitted because you do not support image input"
                            .to_string(),
                    },
                ],
                phase: None,
            },
            ResponseItem::FunctionCallOutput {
                call_id: "call-1".to_string(),
                output: FunctionCallOutputPayload::from_content_items(vec![
                    FunctionCallOutputContentItem::InputText {
                        text: "image content omitted because you do not support image input"
                            .to_string(),
                    },
                ]),
            },
            // The generated-image payload is cleared rather than replaced with text.
            ResponseItem::ImageGenerationCall {
                id: "ig-1".to_string(),
                status: "completed".to_string(),
                revised_prompt: None,
                result: String::new(),
            },
        ]
    );
}
#[test]
fn inter_agent_assistant_messages_are_turn_boundaries() {
    // Build a serialized root -> worker inter-agent instruction.
    let communication = InterAgentCommunication::new(
        AgentPath::root(),
        AgentPath::root()
            .join("worker")
            .expect("sub-agent path should be valid"),
        Vec::new(),
        "continue".to_string(),
        /*trigger_turn*/ true,
    );
    // Wrap it in an assistant-role message, the shape inter-agent traffic uses.
    let item = ResponseItem::Message {
        id: None,
        role: "assistant".to_string(),
        content: vec![ContentItem::OutputText {
            text: serde_json::to_string(&communication).expect("message should serialize"),
        }],
        phase: None,
    };
    assert!(is_user_turn_boundary(&item));
}
#[test]
fn estimate_response_item_model_visible_bytes_discounts_inline_image_data_urls() {
    // A 20 KB inline base64 payload should be discounted to a fixed vision estimate.
    let payload = "a".repeat(20_000);
    let image_item = ResponseItem::Message {
        id: None,
        role: "user".to_string(),
        content: vec![ContentItem::InputImage {
            image_url: format!("data:image/png;base64,{payload}"),
            detail: Some(DEFAULT_IMAGE_DETAIL),
        }],
        phase: None,
    };
    let estimated = estimate_response_item_model_visible_bytes(&image_item);
    let raw = i64::try_from(
        serde_json::to_string(&image_item)
            .expect("item should serialize")
            .len(),
    )
    .expect("raw length should fit");
    // The estimate stays positive but strictly below the raw serialized size.
    assert!(estimated > 0);
    assert!(estimated < raw);
}
#[test]
fn truncate_history_item_truncates_tool_outputs_but_not_messages() {
    let output = "x".repeat(20_000);
    let function_output = ResponseItem::FunctionCallOutput {
        call_id: "call-1".to_string(),
        output: FunctionCallOutputPayload::from_text(output),
    };
    let user = user_message("hello");
    // An oversized tool output is truncated under a small token budget...
    let truncated = truncate_history_item(
        &function_output,
        TruncationPolicy::Tokens(/*max_tokens*/ 64),
    );
    assert_ne!(truncated, function_output);
    // ...while plain messages pass through unchanged.
    assert_eq!(
        truncate_history_item(&user, TruncationPolicy::Tokens(/*max_tokens*/ 64)),
        user
    );
}
#[test]
fn replace_last_turn_images_only_rewrites_latest_tool_output() {
    // Two turns: an older tool output with an image, then a newer one after a user boundary.
    let mut items = vec![
        assistant_message("already done"),
        ResponseItem::FunctionCallOutput {
            call_id: "call-1".to_string(),
            output: FunctionCallOutputPayload::from_content_items(vec![
                FunctionCallOutputContentItem::InputImage {
                    image_url: "https://example.com/older.png".to_string(),
                    detail: Some(DEFAULT_IMAGE_DETAIL),
                },
            ]),
        },
        user_message("new turn"),
        ResponseItem::FunctionCallOutput {
            call_id: "call-2".to_string(),
            output: FunctionCallOutputPayload::from_content_items(vec![
                FunctionCallOutputContentItem::InputImage {
                    image_url: "https://example.com/newer.png".to_string(),
                    detail: Some(DEFAULT_IMAGE_DETAIL),
                },
            ]),
        },
    ];
    let replaced = replace_last_turn_images(&mut items, "omitted");
    assert!(replaced);
    // The older turn's output keeps its image...
    assert_eq!(
        items[1],
        ResponseItem::FunctionCallOutput {
            call_id: "call-1".to_string(),
            output: FunctionCallOutputPayload::from_content_items(vec![
                FunctionCallOutputContentItem::InputImage {
                    image_url: "https://example.com/older.png".to_string(),
                    detail: Some(DEFAULT_IMAGE_DETAIL),
                },
            ]),
        }
    );
    // ...while the latest output's image becomes the placeholder text.
    assert_eq!(
        items[3],
        ResponseItem::FunctionCallOutput {
            call_id: "call-2".to_string(),
            output: FunctionCallOutputPayload::from_content_items(vec![
                FunctionCallOutputContentItem::InputText {
                    text: "omitted".to_string(),
                },
            ]),
        }
    );
}

View File

@@ -0,0 +1,92 @@
use crate::history::is_user_turn_boundary;
use codex_protocol::models::FunctionCallOutputBody;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::truncate_function_output_items_with_policy;
use codex_utils_output_truncation::truncate_text;
/// Truncates one history item before it is recorded in the in-memory journal.
///
/// Tool outputs receive a small serialization headroom multiplier so the JSON wrapper bytes do not
/// cause unexpected overages after truncation. All other item kinds are returned unchanged.
pub fn truncate_history_item(item: &ResponseItem, policy: TruncationPolicy) -> ResponseItem {
    // 20% headroom for the JSON envelope around the truncated payload.
    let padded_policy = policy * 1.2;
    match item {
        ResponseItem::FunctionCallOutput { call_id, output } => ResponseItem::FunctionCallOutput {
            call_id: call_id.clone(),
            output: truncate_function_output_payload(output, padded_policy),
        },
        ResponseItem::CustomToolCallOutput {
            call_id,
            name,
            output,
        } => ResponseItem::CustomToolCallOutput {
            call_id: call_id.clone(),
            name: name.clone(),
            output: truncate_function_output_payload(output, padded_policy),
        },
        // Everything else is recorded verbatim.
        passthrough => passthrough.clone(),
    }
}
/// Replaces image content in the last tool-output item of the current turn.
///
/// Returns `true` only when at least one image was swapped for `placeholder` text. If the most
/// recent candidate is a turn boundary (or has no rewritable content), nothing changes.
pub fn replace_last_turn_images(items: &mut [ResponseItem], placeholder: &str) -> bool {
    // Scan backwards for either a tool output or a turn boundary, whichever comes last.
    let Some(index) = items.iter().rposition(|item| {
        matches!(item, ResponseItem::FunctionCallOutput { .. }) || is_user_turn_boundary(item)
    }) else {
        return false;
    };
    // A boundary hit means the current turn has produced no tool output yet.
    let ResponseItem::FunctionCallOutput { output, .. } = &mut items[index] else {
        return false;
    };
    let Some(content_items) = output.content_items_mut() else {
        return false;
    };
    let mut replaced = false;
    for content_item in content_items.iter_mut() {
        if matches!(content_item, FunctionCallOutputContentItem::InputImage { .. }) {
            *content_item = FunctionCallOutputContentItem::InputText {
                text: placeholder.to_string(),
            };
            replaced = true;
        }
    }
    replaced
}
// Applies `policy` to either form of a tool-output payload, preserving the
// success flag and the body's text-vs-content-items shape.
fn truncate_function_output_payload(
    output: &FunctionCallOutputPayload,
    policy: TruncationPolicy,
) -> FunctionCallOutputPayload {
    let truncated_body = match &output.body {
        FunctionCallOutputBody::Text(text) => {
            FunctionCallOutputBody::Text(truncate_text(text, policy))
        }
        FunctionCallOutputBody::ContentItems(content_items) => {
            let truncated = truncate_function_output_items_with_policy(content_items, policy);
            FunctionCallOutputBody::ContentItems(truncated)
        }
    };
    FunctionCallOutputPayload {
        body: truncated_body,
        success: output.success,
    }
}

View File

@@ -0,0 +1,545 @@
use crate::JournalEntry;
use crate::JournalError;
use crate::JournalItem;
use crate::JournalKey;
use crate::KeyFilter;
use crate::MetadataEntryBuilder;
use crate::PromptView;
use crate::Result;
use codex_protocol::journal::JournalCheckpointItem;
use codex_protocol::journal::JournalContextAudience;
use codex_protocol::journal::JournalContextForkBehavior;
use codex_protocol::journal::JournalHistoryCursor;
use codex_protocol::journal::JournalMetadataItem;
use codex_protocol::journal::JournalReplacePrefixCheckpoint;
use codex_protocol::journal::JournalTranscriptItem;
use codex_protocol::journal::JournalTruncateHistoryCheckpoint;
use codex_protocol::models::ResponseItem;
use indexmap::IndexMap;
use std::fs::File;
use std::io::BufRead;
use std::io::BufReader;
use std::io::BufWriter;
use std::io::Write;
use std::path::Path;
/// Canonical typed journal.
///
/// Wraps an ordered, append-only list of keyed [`JournalEntry`] values.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct Journal {
    // Entries in insertion order; grown via `add`/`extend`.
    entries: Vec<JournalEntry>,
}
impl Journal {
/// Creates an empty journal.
pub fn new() -> Self {
Self::default()
}
    /// Creates a journal from an explicit list of entries.
    ///
    /// Entries are taken as-is, in the order provided.
    pub fn from_entries(entries: Vec<JournalEntry>) -> Self {
        Self { entries }
    }
    /// Appends one keyed item to the journal.
    ///
    /// Accepts anything convertible into [`JournalKey`] and [`JournalItem`].
    pub fn add<K, T>(&mut self, key: K, item: T)
    where
        K: Into<JournalKey>,
        T: Into<JournalItem>,
    {
        self.entries.push(JournalEntry::new(key, item));
    }
    /// Appends several keyed journal entries.
    ///
    /// Accepts anything convertible into [`JournalEntry`], preserving iteration order.
    pub fn extend<I, T>(&mut self, entries: I)
    where
        I: IntoIterator<Item = T>,
        T: Into<JournalEntry>,
    {
        self.entries.extend(entries.into_iter().map(Into::into));
    }
    /// Returns the raw append-only journal entries.
    ///
    /// The slice reflects insertion order and includes obsolete entries that a
    /// flattened view would drop.
    pub fn entries(&self) -> &[JournalEntry] {
        &self.entries
    }
    /// Starts building one prompt-metadata entry.
    ///
    /// Associated function: does not read or modify any journal instance.
    pub fn metadata_entry_builder(
        key: impl Into<JournalKey>,
        message: impl Into<crate::PromptMessage>,
    ) -> MetadataEntryBuilder {
        MetadataEntryBuilder::new(key, message)
    }
    /// Builds one prompt-metadata entry if the message is non-empty after trimming text content.
    ///
    /// Returns `None` when the builder rejects the message (empty after trimming).
    pub fn metadata_entry(
        key: impl Into<JournalKey>,
        prompt_order: i64,
        message: impl Into<crate::PromptMessage>,
    ) -> Option<JournalEntry> {
        Self::metadata_entry_builder(key, message)
            .prompt_order(prompt_order)
            .build()
    }
    /// Alias for [`Journal::metadata_entry_builder`]; context entries share the
    /// metadata-entry representation.
    pub fn context_entry_builder(
        key: impl Into<JournalKey>,
        message: impl Into<crate::PromptMessage>,
    ) -> MetadataEntryBuilder {
        Self::metadata_entry_builder(key, message)
    }
    /// Alias for [`Journal::metadata_entry`]; context entries share the
    /// metadata-entry representation.
    pub fn context_entry(
        key: impl Into<JournalKey>,
        prompt_order: i64,
        message: impl Into<crate::PromptMessage>,
    ) -> Option<JournalEntry> {
        Self::metadata_entry(key, prompt_order, message)
    }
/// Returns a journal containing only journal entries whose keys match the filter.
pub fn filter(&self, filter: &KeyFilter) -> Self {
let entries = self
.entries
.iter()
.filter(|entry| filter.matches(&entry.key))
.cloned()
.collect();
Self::from_entries(entries)
}
/// Returns the number of journal entries.
pub fn len(&self) -> usize {
self.entries.len()
}
/// Returns whether the journal is empty.
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
/// Renders the current effective journal view into model prompt items.
pub fn to_prompt(&self, view: &PromptView) -> Result<Vec<ResponseItem>> {
self.to_prompt_matching_filter(view, None)
}
/// Renders the current effective journal view into model prompt items after selecting only
/// journal entries whose keys match the filter.
pub fn to_prompt_with_filter(
&self,
view: &PromptView,
filter: &KeyFilter,
) -> Result<Vec<ResponseItem>> {
self.to_prompt_matching_filter(view, Some(filter))
}
/// Produces a flattened child state for the provided view.
pub fn fork(&self, view: &PromptView) -> Result<Self> {
self.fork_matching_filter(view, None)
}
/// Produces a flattened child state for the provided view after selecting only
/// journal entries whose keys match the filter.
pub fn fork_with_filter(&self, view: &PromptView, filter: &KeyFilter) -> Result<Self> {
self.fork_matching_filter(view, Some(filter))
}
/// Drops obsolete journal entries and keeps only the current effective journal view.
///
/// This is the first building block for a rolling in-memory window: callers can
/// persist the full journal elsewhere, then keep only the flattened journal hot.
pub fn flatten(&self) -> Result<Self> {
let resolved = self.resolve()?;
Ok(Self::from_entries(resolved.into_entries()))
}
/// Keeps only the current effective journal view plus the history suffix that starts
/// at the resolved cursor.
///
/// This is a lightweight rolling-window helper: callers can persist the full
/// journal on disk, then keep only a recent hot suffix in memory.
pub fn with_history_window(&self, start: &JournalHistoryCursor) -> Result<Self> {
let resolved = self.resolve()?;
let start_index = resolve_cursor(resolved.transcript().entries(), start)?;
let ResolvedJournal {
metadata,
transcript,
} = resolved;
Ok(Self::from_entries(
metadata
.into_iter()
.chain(transcript.into_iter().skip(start_index))
.collect(),
))
}
/// Persists the raw journal to a JSONL file, one `JournalEntry` per line.
pub fn persist_jsonl(&self, path: &Path) -> Result<()> {
let file = File::create(path)?;
let mut writer = BufWriter::new(file);
for entry in &self.entries {
serde_json::to_writer(&mut writer, entry)?;
writer.write_all(b"\n")?;
}
writer.flush()?;
Ok(())
}
/// Loads a raw journal from a JSONL file written by [`Self::persist_jsonl`].
pub fn load_jsonl(path: &Path) -> Result<Self> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut entries = Vec::new();
for (line_index, line) in reader.lines().enumerate() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let entry = serde_json::from_str::<JournalEntry>(&line).map_err(|source| {
JournalError::ParseJson {
line_number: line_index + 1,
source,
}
})?;
entries.push(entry);
}
Ok(Self::from_entries(entries))
}
/// Resolves the current effective journal into prompt metadata and transcript views.
pub fn resolve(&self) -> Result<ResolvedJournal> {
self.resolve_filter(None)
}
/// Resolves the current effective journal after selecting only entries whose keys match the
/// filter.
pub fn resolve_with_filter(&self, filter: &KeyFilter) -> Result<ResolvedJournal> {
self.resolve_filter(Some(filter))
}
fn to_prompt_matching_filter(
&self,
view: &PromptView,
filter: Option<&KeyFilter>,
) -> Result<Vec<ResponseItem>> {
let resolved = self.resolve_filter(filter)?;
let mut prompt: Vec<ResponseItem> = resolved
.metadata
.into_iter()
.filter(|entry| metadata_visible_in_view(metadata_item(entry), view))
.map(|entry| match entry.item {
JournalItem::Metadata(item) => ResponseItem::from(item),
_ => unreachable!("resolved metadata entries must be metadata items"),
})
.collect();
prompt.extend(
resolved
.transcript
.into_iter()
.map(|entry| match entry.item {
JournalItem::Transcript(item) => ResponseItem::from(item),
_ => unreachable!("resolved transcript entries must be transcript items"),
}),
);
Ok(prompt)
}
fn fork_matching_filter(&self, view: &PromptView, filter: Option<&KeyFilter>) -> Result<Self> {
let resolved = self.resolve_filter(filter)?;
Ok(Self::from_entries(
resolved
.metadata
.into_iter()
.filter(|entry| metadata_item(entry).on_fork == JournalContextForkBehavior::Keep)
.filter(|entry| metadata_visible_in_view(metadata_item(entry), view))
.chain(resolved.transcript)
.collect(),
))
}
fn resolve_filter(&self, filter: Option<&KeyFilter>) -> Result<ResolvedJournal> {
let mut transcript = Vec::<JournalEntry>::new();
let mut latest_metadata_by_key = IndexMap::<JournalKey, (usize, JournalEntry)>::new();
for (index, entry) in self.entries.iter().enumerate() {
if let Some(filter) = filter
&& !filter.matches(&entry.key)
{
continue;
}
match &entry.item {
JournalItem::Transcript(_) => transcript.push(entry.clone()),
JournalItem::Metadata(_) => {
latest_metadata_by_key.insert(entry.key.clone(), (index, entry.clone()));
}
JournalItem::Checkpoint(checkpoint) => {
apply_checkpoint(&mut transcript, &entry.key, checkpoint)?;
}
}
}
let mut metadata = latest_metadata_by_key.into_values().collect::<Vec<_>>();
metadata.sort_by(|(left_index, left_entry), (right_index, right_entry)| {
metadata_item(left_entry)
.prompt_order
.cmp(&metadata_item(right_entry).prompt_order)
.then_with(|| left_index.cmp(right_index))
});
Ok(ResolvedJournal::new(
metadata.into_iter().map(|(_, entry)| entry).collect(),
transcript,
))
}
}
/// Effective journal view after deduplicating prompt context and applying history checkpoints.
///
/// `contexts` and `history` are derived from the same append-only source of truth, but they have
/// different semantics:
///
/// - [`ResolvedMetadata`] contains keyed prompt-metadata entries. Later entries with the same key
///   replace earlier ones and the result is ordered by `prompt_order`.
/// - [`ResolvedTranscript`] contains ordered transcript entries after checkpoint application.
///   Transcript preserves order and can be truncated or rewritten by checkpoints.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedJournal {
    metadata: ResolvedMetadata,
    transcript: ResolvedTranscript,
}
impl ResolvedJournal {
    fn new(metadata: Vec<JournalEntry>, transcript: Vec<JournalEntry>) -> Self {
        Self {
            metadata: ResolvedMetadata::new(metadata),
            transcript: ResolvedTranscript::new(transcript),
        }
    }
    /// Returns the effective prompt-metadata view.
    pub fn metadata(&self) -> &ResolvedMetadata {
        &self.metadata
    }
    /// Returns the effective transcript view.
    pub fn transcript(&self) -> &ResolvedTranscript {
        &self.transcript
    }
    /// Returns the effective prompt-metadata view.
    pub fn contexts(&self) -> &ResolvedMetadata {
        self.metadata()
    }
    /// Returns the effective transcript view.
    pub fn history(&self) -> &ResolvedTranscript {
        self.transcript()
    }
    /// Consumes the resolved view and returns only the prompt-metadata entries.
    pub fn into_metadata(self) -> ResolvedMetadata {
        self.metadata
    }
    /// Consumes the resolved view and returns only the transcript entries.
    pub fn into_transcript(self) -> ResolvedTranscript {
        self.transcript
    }
    /// Alias for [`Self::into_metadata`] under the `contexts` naming.
    pub fn into_contexts(self) -> ResolvedMetadata {
        self.into_metadata()
    }
    /// Alias for [`Self::into_transcript`] under the `history` naming.
    pub fn into_history(self) -> ResolvedTranscript {
        self.into_transcript()
    }
    /// Consumes the resolved view and concatenates metadata followed by transcript entries.
    pub fn into_entries(self) -> Vec<JournalEntry> {
        self.metadata
            .into_entries()
            .into_iter()
            .chain(self.transcript.into_entries())
            .collect()
    }
}
/// Effective prompt-metadata entries derived from a journal.
///
/// These entries are deduplicated by key and sorted for prompt rendering. They are suitable for
/// rendering or for building flattened and forked journal states.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedMetadata {
    entries: Vec<JournalEntry>,
}
impl ResolvedMetadata {
    fn new(entries: Vec<JournalEntry>) -> Self {
        Self { entries }
    }
    /// Returns the resolved prompt-metadata entries.
    pub fn entries(&self) -> &[JournalEntry] {
        self.entries.as_slice()
    }
    /// Returns the number of resolved prompt-metadata entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
    /// Returns whether there are no resolved prompt-metadata entries.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Consumes the view and returns its entries.
    pub fn into_entries(self) -> Vec<JournalEntry> {
        self.entries
    }
}
impl IntoIterator for ResolvedMetadata {
    type Item = JournalEntry;
    type IntoIter = std::vec::IntoIter<JournalEntry>;
    fn into_iter(self) -> Self::IntoIter {
        self.into_entries().into_iter()
    }
}
/// Effective transcript entries derived from a journal.
///
/// These entries preserve prompt-visible order after checkpoint application. Unlike prompt
/// metadata, transcript is not deduplicated by key.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedTranscript {
    entries: Vec<JournalEntry>,
}
impl ResolvedTranscript {
    fn new(entries: Vec<JournalEntry>) -> Self {
        Self { entries }
    }
    /// Returns the resolved transcript entries.
    pub fn entries(&self) -> &[JournalEntry] {
        self.entries.as_slice()
    }
    /// Returns the number of resolved transcript entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
    /// Returns whether there are no resolved transcript entries.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Consumes the view and returns its entries.
    pub fn into_entries(self) -> Vec<JournalEntry> {
        self.entries
    }
}
impl IntoIterator for ResolvedTranscript {
    type Item = JournalEntry;
    type IntoIter = std::vec::IntoIter<JournalEntry>;
    fn into_iter(self) -> Self::IntoIter {
        self.into_entries().into_iter()
    }
}
/// Rewrites the accumulated transcript in place according to one checkpoint.
///
/// `ReplacePrefix` swaps everything up to the cursor for the replacement items (each given a key
/// derived from the checkpoint key); `TruncateHistory` drops everything after the cursor.
fn apply_checkpoint(
    transcript: &mut Vec<JournalEntry>,
    checkpoint_key: &JournalKey,
    checkpoint: &JournalCheckpointItem,
) -> Result<()> {
    match checkpoint {
        JournalCheckpointItem::ReplacePrefix(JournalReplacePrefixCheckpoint {
            through,
            replacement,
        }) => {
            let replaced_prefix_len = resolve_cursor(transcript.as_slice(), through)?;
            let replacement_entries: Vec<JournalEntry> = replacement
                .iter()
                .cloned()
                .enumerate()
                .map(|(index, item)| {
                    JournalEntry::new(
                        replacement_transcript_key(checkpoint_key, index, &item),
                        item,
                    )
                })
                .collect();
            // Replace the prefix in place; the suffix after the cursor is preserved.
            transcript.splice(..replaced_prefix_len, replacement_entries);
        }
        JournalCheckpointItem::TruncateHistory(JournalTruncateHistoryCheckpoint { through }) => {
            let keep_len = resolve_cursor(transcript.as_slice(), through)?;
            transcript.truncate(keep_len);
        }
    }
    Ok(())
}
/// Translates a history cursor into an index into `transcript`.
///
/// `AfterItem` yields the position just past the matching item and fails with
/// [`JournalError::UnknownHistoryItemId`] when no transcript entry has that id.
fn resolve_cursor(transcript: &[JournalEntry], cursor: &JournalHistoryCursor) -> Result<usize> {
    let index = match cursor {
        JournalHistoryCursor::Start => 0,
        JournalHistoryCursor::End => transcript.len(),
        JournalHistoryCursor::AfterItem(history_item_id) => {
            let found = transcript
                .iter()
                .position(|entry| transcript_item(entry).id == *history_item_id);
            match found {
                Some(position) => position + 1,
                None => {
                    return Err(JournalError::UnknownHistoryItemId {
                        history_item_id: history_item_id.clone(),
                    });
                }
            }
        }
    };
    Ok(index)
}
/// Derives a unique key for a checkpoint-replacement transcript entry:
/// `<checkpoint key>/replacement/<index>/<item id>`.
fn replacement_transcript_key(
    checkpoint_key: &JournalKey,
    index: usize,
    transcript_item: &JournalTranscriptItem,
) -> JournalKey {
    let base = checkpoint_key.child("replacement");
    base.child(index.to_string())
        .child(transcript_item.id.clone())
}
/// Extracts the metadata payload from an entry known to be a metadata entry.
///
/// # Panics
/// Panics if the entry holds any other item kind; resolved metadata views only
/// contain metadata items.
fn metadata_item(entry: &JournalEntry) -> &JournalMetadataItem {
    let JournalItem::Metadata(item) = &entry.item else {
        unreachable!("resolved metadata entries must be metadata items")
    };
    item
}
/// Extracts the transcript payload from an entry known to be a transcript entry.
///
/// # Panics
/// Panics if the entry holds any other item kind; resolved transcript views only
/// contain transcript items.
fn transcript_item(entry: &JournalEntry) -> &JournalTranscriptItem {
    let JournalItem::Transcript(item) = &entry.item else {
        unreachable!("resolved transcript entries must be transcript items")
    };
    item
}
/// Returns whether a metadata item's audience admits the given view.
///
/// Path- and role-based audiences never match a view that has no agent path or
/// role, respectively.
fn metadata_visible_in_view(item: &JournalMetadataItem, view: &PromptView) -> bool {
    match &item.audience {
        JournalContextAudience::All => true,
        JournalContextAudience::RootOnly => view.is_root,
        JournalContextAudience::SubAgentsOnly => !view.is_root,
        JournalContextAudience::AgentPathPrefix(prefix) => match view.agent_path.as_deref() {
            Some(agent_path) => agent_path.starts_with(prefix),
            None => false,
        },
        JournalContextAudience::AgentRole(role) => match view.agent_role.as_deref() {
            Some(agent_role) => agent_role == role,
            None => false,
        },
    }
}

View File

@@ -0,0 +1,50 @@
//! Typed journal model for prompt rendering, filtering, forking, and persistence.
//!
//! A [`Journal`] stores one append-only sequence of [`JournalEntry`] values and resolves it into
//! two derived views:
//!
//! - prompt metadata: keyed, deduplicated entries ordered by `prompt_order`
//! - transcript: ordered prompt-visible items after checkpoint application
//!
//! Callers can resolve the journal once, then choose how to project those views into prompt
//! messages with [`PromptRenderer`].
mod context_builder;
mod error;
pub mod history;
mod journal;
mod prompt_view;
mod render;
#[cfg(test)]
mod tests;
pub use codex_protocol::journal::JournalCheckpointItem;
pub use codex_protocol::journal::JournalContextAudience;
pub use codex_protocol::journal::JournalContextForkBehavior;
pub use codex_protocol::journal::JournalEntry;
pub use codex_protocol::journal::JournalHistoryCursor;
pub use codex_protocol::journal::JournalItem;
pub use codex_protocol::journal::JournalKey;
pub use codex_protocol::journal::JournalMetadataItem;
pub use codex_protocol::journal::JournalReplacePrefixCheckpoint;
pub use codex_protocol::journal::JournalTranscriptItem;
pub use codex_protocol::journal::JournalTruncateHistoryCheckpoint;
pub use codex_protocol::journal::KeyFilter;
pub use codex_protocol::journal::PromptMessage;
pub use codex_protocol::journal::PromptMessageRole;
pub use context_builder::MetadataEntryBuilder;
pub use error::JournalError;
pub use error::Result;
pub use journal::Journal;
pub use journal::ResolvedJournal;
pub use journal::ResolvedMetadata;
pub use journal::ResolvedTranscript;
pub use prompt_view::PromptView;
pub use render::PromptRenderer;
/// Alias for [`JournalMetadataItem`] under the `context` naming.
pub type JournalContextItem = JournalMetadataItem;
/// Alias for [`JournalTranscriptItem`] under the `history` naming.
pub type JournalHistoryItem = JournalTranscriptItem;
/// Alias for [`MetadataEntryBuilder`] under the `context` naming.
pub type ContextEntryBuilder = MetadataEntryBuilder;
/// Alias for [`ResolvedMetadata`] under the `contexts` naming.
pub type ResolvedContexts = ResolvedMetadata;
/// Alias for [`ResolvedTranscript`] under the `history` naming.
pub type ResolvedHistory = ResolvedTranscript;

View File

@@ -0,0 +1,33 @@
/// View configuration used when rendering or forking a journal.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PromptView {
    pub(crate) is_root: bool,
    pub(crate) agent_path: Option<String>,
    pub(crate) agent_role: Option<String>,
}
impl PromptView {
    /// Returns the view for the root agent.
    pub fn root() -> Self {
        Self {
            is_root: true,
            ..Self::default()
        }
    }
    /// Returns the view for a spawned subagent.
    pub fn subagent(agent_path: impl Into<String>, agent_role: Option<String>) -> Self {
        let agent_path = Some(agent_path.into());
        Self {
            is_root: false,
            agent_path,
            agent_role,
        }
    }
    /// Sets the agent role used for audience matching.
    pub fn with_agent_role(mut self, agent_role: impl Into<String>) -> Self {
        self.agent_role.replace(agent_role.into());
        self
    }
}

View File

@@ -0,0 +1,86 @@
use crate::JournalItem;
use crate::KeyFilter;
use crate::PromptMessage;
use crate::PromptMessageRole;
use crate::ResolvedMetadata;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
/// Renders resolved prompt metadata into model-visible prompt messages.
///
/// Group filters are applied in declaration order. Entries are merged only when they are
/// consecutive, match the same group filter, and share the same role.
#[derive(Debug, Clone, Default)]
pub struct PromptRenderer {
    /// Declaration-ordered grouping rules; the first matching filter wins.
    group_filters: Vec<KeyFilter>,
}
impl PromptRenderer {
    /// Creates a renderer with no grouping rules.
    pub fn new() -> Self {
        Self::default()
    }
    /// Adds one grouping rule for consecutive resolved metadata entries.
    pub fn group(mut self, filter: KeyFilter) -> Self {
        self.group_filters.push(filter);
        self
    }
    /// Renders resolved prompt metadata according to the configured grouping rules.
    pub fn render_metadata(&self, metadata: &ResolvedMetadata) -> Vec<ResponseItem> {
        let mut rendered = Vec::new();
        // Index of the group filter the in-progress merge run belongs to, if any.
        let mut current_group: Option<usize> = None;
        let mut current_role: Option<PromptMessageRole> = None;
        // Content accumulated for the in-progress merged message.
        let mut current_content = Vec::new();
        for entry in metadata.entries() {
            let JournalItem::Metadata(item) = &entry.item else {
                continue;
            };
            // First group filter (in declaration order) that matches this key.
            let group = self
                .group_filters
                .iter()
                .position(|filter| filter.matches(&entry.key));
            if group.is_none() {
                // Ungrouped entries end any open run and are emitted verbatim.
                flush_prompt_message(&mut rendered, &mut current_role, &mut current_content);
                rendered.push(ResponseItem::from(item.clone()));
                current_group = None;
                continue;
            }
            if current_group != group || current_role != Some(item.message.role) {
                // Group or role changed: close the previous run before starting a new one.
                flush_prompt_message(&mut rendered, &mut current_role, &mut current_content);
                current_group = group;
                current_role = Some(item.message.role);
            }
            current_content.extend(item.message.content.clone());
        }
        // Emit any run still open at the end of the input.
        flush_prompt_message(&mut rendered, &mut current_role, &mut current_content);
        rendered
    }
    /// Alias for [`Self::render_metadata`] under the `contexts` naming.
    pub fn render_contexts(&self, contexts: &ResolvedMetadata) -> Vec<ResponseItem> {
        self.render_metadata(contexts)
    }
}
/// Closes the in-progress merge run: emits one message built from the pending role
/// and accumulated content, then clears both.
///
/// The pending role is consumed even when there is no content to emit, so a flush
/// always resets the caller's run state.
fn flush_prompt_message(
    rendered: &mut Vec<ResponseItem>,
    role: &mut Option<PromptMessageRole>,
    content: &mut Vec<ContentItem>,
) {
    if let Some(pending_role) = role.take() {
        if !content.is_empty() {
            let merged = std::mem::take(content);
            rendered.push(ResponseItem::from(PromptMessage::new(pending_role, merged)));
        }
    }
}

View File

@@ -0,0 +1,516 @@
use crate::Journal;
use crate::JournalCheckpointItem;
use crate::JournalContextAudience;
use crate::JournalContextForkBehavior;
use crate::JournalEntry;
use crate::JournalHistoryCursor;
use crate::JournalMetadataItem;
use crate::JournalReplacePrefixCheckpoint;
use crate::JournalTranscriptItem;
use crate::JournalTruncateHistoryCheckpoint;
use crate::KeyFilter;
use crate::PromptMessage;
use crate::PromptRenderer;
use crate::PromptView;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq;
use tempfile::tempdir;
/// Builds a user `ResponseItem` message with a single input-text block.
fn user_message(text: &str) -> ResponseItem {
    ResponseItem::Message {
        id: None,
        role: "user".to_string(),
        content: vec![ContentItem::InputText {
            text: text.to_string(),
        }],
        phase: None,
    }
}
/// Builds an assistant `ResponseItem` message with a single output-text block.
fn assistant_message(text: &str) -> ResponseItem {
    ResponseItem::Message {
        id: None,
        role: "assistant".to_string(),
        content: vec![ContentItem::OutputText {
            text: text.to_string(),
        }],
        phase: None,
    }
}
/// Builds a developer-text metadata item with the given prompt order.
fn developer_context(text: &str, prompt_order: i64) -> JournalMetadataItem {
    JournalMetadataItem::new(PromptMessage::developer_text(text)).with_prompt_order(prompt_order)
}
// Latest metadata entry for a key wins over earlier entries with the same key.
#[test]
fn to_prompt_uses_latest_context_for_key() {
    let mut state = Journal::new();
    state.add(
        ["prompt", "permissions"],
        developer_context("older permissions", 10),
    );
    state.add(["history", "hello"], user_message("hello"));
    state.add(
        ["prompt", "permissions"],
        developer_context("newer permissions", 10),
    );
    let prompt = state
        .to_prompt(&PromptView::root())
        .expect("prompt should render");
    assert_eq!(
        prompt,
        vec![
            ResponseItem::from(PromptMessage::developer_text("newer permissions")),
            user_message("hello"),
        ]
    );
}
// Audience settings gate which views see each metadata entry.
#[test]
fn to_prompt_filters_context_by_audience() {
    let mut state = Journal::new();
    state.add(
        ["prompt", "root", "hint"],
        developer_context("root-only", 0).with_audience(JournalContextAudience::RootOnly),
    );
    state.add(
        ["prompt", "child", "hint"],
        developer_context("child-only", 1).with_audience(JournalContextAudience::SubAgentsOnly),
    );
    let root_prompt = state
        .to_prompt(&PromptView::root())
        .expect("root prompt should render");
    let child_prompt = state
        .to_prompt(&PromptView::subagent(
            "/root/worker",
            Option::<String>::None,
        ))
        .expect("child prompt should render");
    assert_eq!(
        root_prompt,
        vec![ResponseItem::from(PromptMessage::developer_text(
            "root-only"
        ))]
    );
    assert_eq!(
        child_prompt,
        vec![ResponseItem::from(PromptMessage::developer_text(
            "child-only"
        ))]
    );
}
// Key-prefix filters drop non-matching metadata before rendering.
#[test]
fn to_prompt_with_filter_matches_key_prefix() {
    let mut state = Journal::new();
    state.add(["prompt", "root", "keep"], developer_context("keep me", 0));
    state.add(["prompt", "child", "drop"], developer_context("drop me", 1));
    let prompt = state
        .to_prompt_with_filter(&PromptView::root(), &KeyFilter::prefix(["prompt", "root"]))
        .expect("prompt should render");
    assert_eq!(
        prompt,
        vec![ResponseItem::from(PromptMessage::developer_text("keep me"))]
    );
}
// ReplacePrefix swaps early history for a summary; TruncateHistory then trims after it.
#[test]
fn checkpoints_replace_prefix_and_then_truncate_history() {
    let first = JournalTranscriptItem::new(user_message("turn 1"));
    let second = JournalTranscriptItem::new(assistant_message("turn 1 answer"));
    let third = JournalTranscriptItem::new(user_message("turn 2"));
    let summary = JournalTranscriptItem {
        id: "summary".to_string(),
        turn_id: None,
        item: assistant_message("summary"),
    };
    let state = Journal::from_entries(vec![
        JournalEntry::new(["history", "1"], first),
        JournalEntry::new(["history", "2"], second.clone()),
        JournalEntry::new(["history", "3"], third),
        JournalEntry::new(
            ["checkpoint", "replace"],
            JournalCheckpointItem::ReplacePrefix(JournalReplacePrefixCheckpoint {
                through: JournalHistoryCursor::AfterItem(second.id),
                replacement: vec![summary.clone()],
            }),
        ),
        JournalEntry::new(
            ["checkpoint", "truncate"],
            JournalCheckpointItem::TruncateHistory(JournalTruncateHistoryCheckpoint {
                through: JournalHistoryCursor::AfterItem(summary.id),
            }),
        ),
    ]);
    let prompt = state
        .to_prompt(&PromptView::root())
        .expect("prompt should render");
    assert_eq!(prompt, vec![assistant_message("summary")]);
}
// Flattening keeps the rendered prompt identical while dropping superseded entries.
#[test]
fn flatten_preserves_prompt_and_drops_obsolete_items() {
    let first = JournalTranscriptItem::new(user_message("turn 1"));
    let answer = JournalTranscriptItem::new(assistant_message("turn 1 answer"));
    let summary = JournalTranscriptItem {
        id: "summary".to_string(),
        turn_id: None,
        item: assistant_message("summary"),
    };
    let state = Journal::from_entries(vec![
        JournalEntry::new(["prompt", "permissions"], developer_context("old", 0)),
        JournalEntry::new(["prompt", "permissions"], developer_context("new", 0)),
        JournalEntry::new(["history", "1"], first),
        JournalEntry::new(["history", "2"], answer.clone()),
        JournalEntry::new(
            ["checkpoint", "replace"],
            JournalCheckpointItem::ReplacePrefix(JournalReplacePrefixCheckpoint {
                through: JournalHistoryCursor::AfterItem(answer.id),
                replacement: vec![summary.clone()],
            }),
        ),
    ]);
    let before = state
        .to_prompt(&PromptView::root())
        .expect("prompt should render");
    let flattened = state.flatten().expect("flatten should succeed");
    let after = flattened
        .to_prompt(&PromptView::root())
        .expect("flattened prompt should render");
    assert_eq!(before, after);
    assert_eq!(
        flattened.entries(),
        vec![
            JournalEntry::new(["prompt", "permissions"], developer_context("new", 0),),
            JournalEntry::new(
                ["checkpoint", "replace", "replacement", "0", "summary"],
                summary,
            ),
        ]
    );
}
// Windowing keeps all metadata plus only the transcript suffix after the cursor.
#[test]
fn with_history_window_keeps_only_recent_effective_history() {
    let first = JournalTranscriptItem::new(user_message("turn 1"));
    let second = JournalTranscriptItem::new(assistant_message("turn 1 answer"));
    let third = JournalTranscriptItem::new(user_message("turn 2"));
    let state = Journal::from_entries(vec![
        JournalEntry::new(
            ["prompt", "permissions", "current"],
            developer_context("p", 0),
        ),
        JournalEntry::new(["history", "1"], first),
        JournalEntry::new(["history", "2"], second.clone()),
        JournalEntry::new(["history", "3"], third.clone()),
    ]);
    let windowed = state
        .with_history_window(&JournalHistoryCursor::AfterItem(second.id))
        .expect("windowing should succeed");
    assert_eq!(
        windowed.entries(),
        vec![
            JournalEntry::new(
                ["prompt", "permissions", "current"],
                developer_context("p", 0),
            ),
            JournalEntry::new(["history", "3"], third),
        ]
    );
}
// Forking drops Regenerate-on-fork metadata and entries invisible to the child view.
#[test]
fn fork_drops_non_keep_context_and_respects_audience() {
    let history = JournalTranscriptItem::new(user_message("hello"));
    let state = Journal::from_entries(vec![
        JournalEntry::new(
            ["prompt", "child", "shared"],
            developer_context("shared child context", 0)
                .with_audience(JournalContextAudience::SubAgentsOnly),
        ),
        JournalEntry::new(
            ["prompt", "child", "regenerate"],
            developer_context("usage hint", 1)
                .with_audience(JournalContextAudience::SubAgentsOnly)
                .with_on_fork(JournalContextForkBehavior::Regenerate),
        ),
        JournalEntry::new(
            ["prompt", "root", "only"],
            developer_context("root only", 2).with_audience(JournalContextAudience::RootOnly),
        ),
        JournalEntry::new(["history", "hello"], history.clone()),
    ]);
    let forked = state
        .fork(&PromptView::subagent(
            "/root/worker",
            Option::<String>::None,
        ))
        .expect("fork should succeed");
    assert_eq!(
        forked.entries(),
        vec![
            JournalEntry::new(
                ["prompt", "child", "shared"],
                developer_context("shared child context", 0)
                    .with_audience(JournalContextAudience::SubAgentsOnly),
            ),
            JournalEntry::new(["history", "hello"], history),
        ]
    );
}
// JSONL persistence round-trips the raw journal exactly.
#[test]
fn persist_and_load_jsonl_round_trip() {
    let dir = tempdir().expect("tempdir");
    let path = dir.path().join("journal.jsonl");
    let history = JournalTranscriptItem::new(user_message("hello"));
    let state = Journal::from_entries(vec![
        JournalEntry::new(
            ["prompt", "permissions", "current"],
            developer_context("p", 0),
        ),
        JournalEntry::new(["history", "hello"], history),
    ]);
    state
        .persist_jsonl(path.as_path())
        .expect("journal should persist");
    let loaded = Journal::load_jsonl(path.as_path()).expect("journal should load");
    assert_eq!(loaded, state);
}
// `filter` selects raw entries by key without resolving checkpoints or dedup.
#[test]
fn filter_returns_matching_raw_entries() {
    let state = Journal::from_entries(vec![
        JournalEntry::new(["prompt", "root", "keep"], developer_context("keep me", 0)),
        JournalEntry::new(["prompt", "child", "drop"], developer_context("drop me", 1)),
        JournalEntry::new(["history", "hello"], user_message("hello")),
    ]);
    let filtered = state.filter(&KeyFilter::prefix(["prompt", "root"]));
    assert_eq!(
        filtered.entries(),
        vec![JournalEntry::new(
            ["prompt", "root", "keep"],
            developer_context("keep me", 0),
        )]
    );
}
// Whitespace-only messages produce no metadata entry.
#[test]
fn context_entry_skips_blank_text_messages() {
    assert_eq!(
        Journal::context_entry(
            ["prompt", "developer", "blank"],
            10,
            PromptMessage::developer_text("   "),
        ),
        None
    );
}
// The builder carries prompt order, audience, fork behavior, tags, and source through.
#[test]
fn context_entry_builder_carries_optional_fields() {
    let entry = Journal::context_entry_builder(
        ["prompt", "developer", "one"],
        PromptMessage::developer_text("first"),
    )
    .prompt_order(10)
    .audience(JournalContextAudience::SubAgentsOnly)
    .on_fork(JournalContextForkBehavior::Regenerate)
    .tags(vec!["foo".to_string(), "bar".to_string()])
    .source("unit-test")
    .build()
    .expect("entry should be kept");
    assert_eq!(
        entry,
        JournalEntry::new(
            ["prompt", "developer", "one"],
            JournalMetadataItem::new(PromptMessage::developer_text("first"))
                .with_prompt_order(10)
                .with_audience(JournalContextAudience::SubAgentsOnly)
                .with_on_fork(JournalContextForkBehavior::Regenerate)
                .with_tags(vec!["foo".to_string(), "bar".to_string()])
                .with_source("unit-test"),
        )
    );
}
// `resolve` partitions entries into metadata and transcript views.
#[test]
fn resolve_splits_contexts_from_history() {
    let history = JournalTranscriptItem::new(user_message("hello"));
    let journal = Journal::from_entries(vec![
        Journal::context_entry(
            ["prompt", "developer", "one"],
            10,
            PromptMessage::developer_text("first"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "guardian", "one"],
            20,
            PromptMessage::developer_text("guardian one"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "guardian", "two"],
            30,
            PromptMessage::developer_text("guardian two"),
        )
        .expect("entry should be kept"),
        JournalEntry::new(["history", "hello"], history.clone()),
    ]);
    let resolved = journal.resolve().expect("journal should resolve");
    assert_eq!(
        resolved.metadata().entries(),
        vec![
            Journal::context_entry(
                ["prompt", "developer", "one"],
                10,
                PromptMessage::developer_text("first"),
            )
            .expect("entry should be kept"),
            Journal::context_entry(
                ["prompt", "guardian", "one"],
                20,
                PromptMessage::developer_text("guardian one"),
            )
            .expect("entry should be kept"),
            Journal::context_entry(
                ["prompt", "guardian", "two"],
                30,
                PromptMessage::developer_text("guardian two"),
            )
            .expect("entry should be kept"),
        ]
    );
    assert_eq!(
        resolved.transcript().entries(),
        vec![JournalEntry::new(["history", "hello"], history)]
    );
}
// Consecutive same-group, same-role entries merge into a single message.
#[test]
fn prompt_renderer_groups_by_declared_prefix_and_role() {
    let resolved = Journal::from_entries(vec![
        Journal::context_entry(
            ["prompt", "developer", "one"],
            10,
            PromptMessage::developer_text("first"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "developer", "two"],
            20,
            PromptMessage::developer_text("second"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "contextual_user", "one"],
            30,
            PromptMessage::user_text("third"),
        )
        .expect("entry should be kept"),
    ])
    .resolve()
    .expect("entries should resolve");
    let rendered = PromptRenderer::new()
        .group(KeyFilter::prefix(["prompt", "developer"]))
        .group(KeyFilter::prefix(["prompt", "contextual_user"]))
        .render_metadata(resolved.metadata());
    assert_eq!(
        rendered,
        vec![
            ResponseItem::Message {
                id: None,
                role: "developer".to_string(),
                content: vec![
                    ContentItem::InputText {
                        text: "first".to_string(),
                    },
                    ContentItem::InputText {
                        text: "second".to_string(),
                    },
                ],
                phase: None,
            },
            ResponseItem::Message {
                id: None,
                role: "user".to_string(),
                content: vec![ContentItem::InputText {
                    text: "third".to_string(),
                }],
                phase: None,
            },
        ]
    );
}
// Entries matching no group render as separate, unmerged messages.
#[test]
fn prompt_renderer_leaves_ungrouped_entries_separate() {
    let resolved = Journal::from_entries(vec![
        Journal::context_entry(
            ["prompt", "developer", "one"],
            10,
            PromptMessage::developer_text("first"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "guardian", "one"],
            20,
            PromptMessage::developer_text("guardian one"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "guardian", "two"],
            30,
            PromptMessage::developer_text("guardian two"),
        )
        .expect("entry should be kept"),
    ])
    .resolve()
    .expect("entries should resolve");
    let rendered = PromptRenderer::new()
        .group(KeyFilter::prefix(["prompt", "developer"]))
        .render_metadata(resolved.metadata());
    assert_eq!(
        rendered,
        vec![
            ResponseItem::Message {
                id: None,
                role: "developer".to_string(),
                content: vec![ContentItem::InputText {
                    text: "first".to_string(),
                }],
                phase: None,
            },
            ResponseItem::from(PromptMessage::developer_text("guardian one")),
            ResponseItem::from(PromptMessage::developer_text("guardian two")),
        ]
    );
}

View File

@@ -0,0 +1,364 @@
use crate::models::ContentItem;
use crate::models::ResponseItem;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use ts_rs::TS;
use uuid::Uuid;
/// Stable key attached to a journal entry.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema, TS, Default)]
pub struct JournalKey {
    pub parts: Vec<String>,
}
impl JournalKey {
    /// Creates a key from an ordered sequence of parts.
    pub fn new<I, S>(parts: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let parts = parts.into_iter().map(Into::into).collect();
        Self { parts }
    }
    /// Returns the ordered key parts.
    pub fn parts(&self) -> &[String] {
        self.parts.as_slice()
    }
    /// Returns a new key with one child part appended.
    pub fn child(&self, part: impl Into<String>) -> Self {
        let mut next = self.clone();
        next.parts.push(part.into());
        next
    }
    /// Returns whether this key starts with the provided prefix.
    pub fn starts_with(&self, prefix: &JournalKey) -> bool {
        self.parts.starts_with(&prefix.parts)
    }
}
/// Lets array literals such as `["prompt", "permissions"]` be used anywhere an
/// `impl Into<JournalKey>` is accepted.
impl<S, const N: usize> From<[S; N]> for JournalKey
where
    S: Into<String>,
{
    fn from(value: [S; N]) -> Self {
        Self::new(value)
    }
}
/// Predicate over [`JournalKey`] values used to select journal entries.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
#[ts(tag = "type", content = "value", rename_all = "snake_case")]
pub enum KeyFilter {
    /// Matches exactly one key.
    Exact(JournalKey),
    /// Matches any key that starts with the stored prefix.
    Prefix(JournalKey),
}
impl KeyFilter {
    /// Matches one exact key.
    pub fn exact(key: impl Into<JournalKey>) -> Self {
        Self::Exact(key.into())
    }
    /// Matches any key with the provided prefix.
    pub fn prefix(prefix: impl Into<JournalKey>) -> Self {
        Self::Prefix(prefix.into())
    }
    /// Returns whether the filter matches the key.
    pub fn matches(&self, key: &JournalKey) -> bool {
        match self {
            Self::Exact(expected) => key == expected,
            Self::Prefix(prefix) => key.starts_with(prefix),
        }
    }
}
/// Minimal message block that can be projected into a model-visible prompt item.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
pub struct PromptMessage {
    pub role: PromptMessageRole,
    pub content: Vec<ContentItem>,
}

impl PromptMessage {
    /// Creates a message from an explicit role and content list.
    pub fn new(role: PromptMessageRole, content: Vec<ContentItem>) -> Self {
        Self { role, content }
    }

    /// Convenience constructor for a developer-role text message.
    pub fn developer_text(text: impl Into<String>) -> Self {
        Self::text(PromptMessageRole::Developer, text)
    }

    /// Convenience constructor for a user-role text message.
    pub fn user_text(text: impl Into<String>) -> Self {
        Self::text(PromptMessageRole::User, text)
    }

    /// Convenience constructor for an assistant-role text message.
    pub fn assistant_text(text: impl Into<String>) -> Self {
        Self::text(PromptMessageRole::Assistant, text)
    }

    /// Builds single-item text content whose kind follows the role:
    /// assistant text becomes output, developer/user text becomes input.
    fn text(role: PromptMessageRole, text: impl Into<String>) -> Self {
        let text = text.into();
        let content = match role {
            PromptMessageRole::Assistant => vec![ContentItem::OutputText { text }],
            PromptMessageRole::Developer | PromptMessageRole::User => {
                vec![ContentItem::InputText { text }]
            }
        };
        PromptMessage::new(role, content)
    }
}

impl From<PromptMessage> for ResponseItem {
    /// Projects the message into a `ResponseItem::Message` with no id or phase.
    fn from(value: PromptMessage) -> Self {
        let PromptMessage { role, content } = value;
        ResponseItem::Message {
            id: None,
            role: role.as_str().to_string(),
            content,
            phase: None,
        }
    }
}
/// Role attached to a [`PromptMessage`]; defaults to `Developer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS, Default)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum PromptMessageRole {
    #[default]
    Developer,
    User,
    Assistant,
}

impl PromptMessageRole {
    /// Wire-format role string, matching what `ResponseItem::Message` carries.
    pub fn as_str(self) -> &'static str {
        match self {
            PromptMessageRole::Developer => "developer",
            PromptMessageRole::User => "user",
            PromptMessageRole::Assistant => "assistant",
        }
    }
}
/// Durable transcript item. Unlike prompt metadata, transcript keeps original ordering.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
pub struct JournalTranscriptItem {
pub id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub turn_id: Option<String>,
pub item: ResponseItem,
}
impl JournalTranscriptItem {
pub fn new(item: ResponseItem) -> Self {
Self {
id: Uuid::new_v4().to_string(),
turn_id: None,
item,
}
}
pub fn with_turn_id(mut self, turn_id: impl Into<String>) -> Self {
self.turn_id = Some(turn_id.into());
self
}
}
impl From<ResponseItem> for JournalTranscriptItem {
fn from(value: ResponseItem) -> Self {
Self::new(value)
}
}
impl From<JournalTranscriptItem> for ResponseItem {
fn from(value: JournalTranscriptItem) -> Self {
value.item
}
}
impl From<JournalMetadataItem> for ResponseItem {
fn from(value: JournalMetadataItem) -> Self {
value.message.into()
}
}
/// Prompt-metadata entry payload and filtering metadata.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
pub struct JournalMetadataItem {
    /// The message projected into the prompt.
    pub message: PromptMessage,
    /// Relative ordering among prompt-metadata entries; defaults to 0.
    #[serde(default)]
    pub prompt_order: i64,
    /// Which agents this entry is visible to.
    #[serde(default)]
    pub audience: JournalContextAudience,
    /// What happens to this entry when the thread is forked.
    #[serde(default)]
    pub on_fork: JournalContextForkBehavior,
    /// Free-form labels; omitted from serialization when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tags: Vec<String>,
    /// Optional provenance marker for the entry.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[ts(optional)]
    pub source: Option<String>,
}

impl JournalMetadataItem {
    /// Creates a metadata item with default ordering, audience, fork behavior,
    /// no tags, and no source.
    pub fn new(message: PromptMessage) -> Self {
        Self {
            message,
            prompt_order: 0,
            audience: JournalContextAudience::default(),
            on_fork: JournalContextForkBehavior::default(),
            tags: Vec::new(),
            source: None,
        }
    }

    /// Builder-style setter for the prompt ordering.
    pub fn with_prompt_order(self, prompt_order: i64) -> Self {
        Self {
            prompt_order,
            ..self
        }
    }

    /// Builder-style setter for the audience.
    pub fn with_audience(self, audience: JournalContextAudience) -> Self {
        Self { audience, ..self }
    }

    /// Builder-style setter for the fork behavior.
    pub fn with_on_fork(self, on_fork: JournalContextForkBehavior) -> Self {
        Self { on_fork, ..self }
    }

    /// Builder-style setter for the tags.
    pub fn with_tags(self, tags: Vec<String>) -> Self {
        Self { tags, ..self }
    }

    /// Builder-style setter for the provenance source.
    pub fn with_source(self, source: impl Into<String>) -> Self {
        Self {
            source: Some(source.into()),
            ..self
        }
    }
}
/// Selects which agents a prompt-metadata entry applies to.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS, Default)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
#[ts(tag = "type", content = "value", rename_all = "snake_case")]
pub enum JournalContextAudience {
    /// Applies to every agent (the default).
    #[default]
    All,
    // NOTE(review): the remaining variant semantics are inferred from their
    // names — confirm against the projection logic that consumes this enum.
    RootOnly,
    SubAgentsOnly,
    AgentPathPrefix(String),
    AgentRole(String),
}
/// How a prompt-metadata entry behaves when a thread is forked.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS, Default)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum JournalContextForkBehavior {
    /// Entry is carried over unchanged on fork (the default).
    #[default]
    Keep,
    // NOTE(review): `Drop` and `Regenerate` semantics are inferred from their
    // names — confirm against the fork implementation.
    Drop,
    Regenerate,
}
/// Cursor into the effective history at the point a checkpoint is applied.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
#[ts(tag = "type", content = "value", rename_all = "snake_case")]
pub enum JournalHistoryCursor {
    /// Before the first history item.
    Start,
    /// After the item with this id — presumably a [`JournalTranscriptItem::id`];
    /// confirm against the code that resolves cursors.
    AfterItem(String),
    /// After the last history item.
    End,
}
/// Replace the current history prefix through the resolved cursor.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
pub struct JournalReplacePrefixCheckpoint {
    /// Cursor bounding the prefix that gets replaced.
    pub through: JournalHistoryCursor,
    /// Items substituted for the removed prefix.
    pub replacement: Vec<JournalTranscriptItem>,
}
/// Keep only the current history prefix through the resolved cursor.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
pub struct JournalTruncateHistoryCheckpoint {
    /// Cursor bounding the prefix that is kept.
    pub through: JournalHistoryCursor,
}
/// History-rewrite instruction stored in the journal.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
#[ts(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum JournalCheckpointItem {
    /// Replace the prefix through a cursor with new items.
    ReplacePrefix(JournalReplacePrefixCheckpoint),
    /// Drop everything after the cursor.
    TruncateHistory(JournalTruncateHistoryCheckpoint),
}
/// One logical record kind stored in the journal.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
#[ts(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum JournalItem {
    /// Durable transcript item; serialized with tag `"history"` — presumably to
    /// stay compatible with the older `history`/`context` naming.
    #[serde(rename = "history")]
    #[ts(rename = "history")]
    Transcript(JournalTranscriptItem),
    /// Prompt-metadata item; serialized with tag `"context"`.
    #[serde(rename = "context")]
    #[ts(rename = "context")]
    Metadata(JournalMetadataItem),
    /// History-rewrite instruction (replace or truncate a prefix).
    Checkpoint(JournalCheckpointItem),
}
impl From<ResponseItem> for JournalItem {
fn from(value: ResponseItem) -> Self {
Self::Transcript(value.into())
}
}
impl From<JournalTranscriptItem> for JournalItem {
fn from(value: JournalTranscriptItem) -> Self {
Self::Transcript(value)
}
}
impl From<JournalMetadataItem> for JournalItem {
fn from(value: JournalMetadataItem) -> Self {
Self::Metadata(value)
}
}
impl From<JournalCheckpointItem> for JournalItem {
fn from(value: JournalCheckpointItem) -> Self {
Self::Checkpoint(value)
}
}
/// Keyed record stored in the journal.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
pub struct JournalEntry {
    pub key: JournalKey,
    pub item: JournalItem,
}

impl JournalEntry {
    /// Creates a keyed journal entry.
    pub fn new(key: impl Into<JournalKey>, item: impl Into<JournalItem>) -> Self {
        let (key, item) = (key.into(), item.into());
        Self { key, item }
    }
}

impl<K, T> From<(K, T)> for JournalEntry
where
    K: Into<JournalKey>,
    T: Into<JournalItem>,
{
    fn from(value: (K, T)) -> Self {
        let (key, item) = value;
        JournalEntry::new(key, item)
    }
}

// Aliases matching the serialized `history`/`context` naming — presumably kept
// for callers still using the older type names; confirm before removing.
pub type JournalHistoryItem = JournalTranscriptItem;
pub type JournalContextItem = JournalMetadataItem;

View File

@@ -12,6 +12,7 @@ pub mod dynamic_tools;
pub mod error;
pub mod exec_output;
pub mod items;
pub mod journal;
pub mod mcp;
pub mod memory_citation;
pub mod message_history;

View File

@@ -22,11 +22,12 @@ codex-git-utils = { workspace = true }
codex-protocol = { workspace = true }
codex-rollout = { workspace = true }
codex-state = { workspace = true }
codex-journal = { workspace = true }
prost = "0.14.3"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true, features = ["fs", "io-util", "rt", "sync"] }
tonic = { workspace = true }
tonic-prost = { workspace = true }
tracing = { workspace = true }

View File

@@ -4,7 +4,7 @@ use codex_protocol::ThreadId;
pub type ThreadStoreResult<T> = Result<T, ThreadStoreError>;
/// Error type shared by thread-store implementations.
#[derive(Debug, thiserror::Error)]
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
pub enum ThreadStoreError {
/// The requested thread does not exist in this store.
#[error("thread {thread_id} not found")]

View File

@@ -0,0 +1,351 @@
use std::path::Path;
use std::path::PathBuf;
use codex_journal::JournalEntry;
use tokio::fs::OpenOptions;
use tokio::io::AsyncWriteExt;
use tokio::io::BufWriter;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use crate::ThreadStoreError;
use crate::ThreadStoreResult;
/// Default bound on the number of commands queued for the writer task.
const DEFAULT_QUEUE_CAPACITY: usize = 1024;
/// Async append-only JSONL journal for [`JournalEntry`] items.
///
/// Callers update the in-memory journal first, then enqueue the same keyed entries here for durable
/// persistence. Writes happen on a dedicated worker task; use [`Self::flush`] or
/// [`Self::shutdown`] when durability matters.
pub struct JournalWriter {
    /// Location of the JSONL file backing this journal.
    path: PathBuf,
    /// Command channel feeding the background writer task.
    tx: mpsc::Sender<Command>,
    /// Handle to the writer task; taken (set to `None`) during `shutdown`.
    worker: Option<JoinHandle<ThreadStoreResult<()>>>,
}
impl JournalWriter {
    /// Opens or creates a journal at `path` using the default queue capacity.
    pub async fn open(path: impl Into<PathBuf>) -> ThreadStoreResult<Self> {
        Self::open_with_capacity(path, DEFAULT_QUEUE_CAPACITY).await
    }

    /// Opens or creates a journal at `path` with a custom queue capacity.
    ///
    /// # Errors
    /// Returns `InvalidRequest` for a zero capacity and `Internal` for I/O
    /// failures while creating the parent directory or opening the file.
    pub async fn open_with_capacity(
        path: impl Into<PathBuf>,
        queue_capacity: usize,
    ) -> ThreadStoreResult<Self> {
        if queue_capacity == 0 {
            return Err(ThreadStoreError::InvalidRequest {
                message: "journal queue capacity must be greater than zero".to_string(),
            });
        }
        let path = path.into();
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await.map_err(io_error)?;
        }
        // Append mode keeps any existing journal contents intact across reopen.
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(path.as_path())
            .await
            .map_err(io_error)?;
        let (tx, rx) = mpsc::channel(queue_capacity);
        let worker = {
            let worker_path = path.clone();
            let writer = BufWriter::new(file);
            tokio::spawn(async move { run_worker(worker_path, writer, rx).await })
        };
        Ok(Self {
            path,
            tx,
            worker: Some(worker),
        })
    }

    /// Returns the backing journal path.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Enqueues one entry for async persistence.
    pub async fn enqueue(&self, entry: JournalEntry) -> ThreadStoreResult<()> {
        self.enqueue_all(std::iter::once(entry)).await
    }

    /// Enqueues several entries for async persistence; a no-op for an empty batch.
    pub async fn enqueue_all<I>(&self, entries: I) -> ThreadStoreResult<()>
    where
        I: IntoIterator<Item = JournalEntry>,
    {
        let batch: Vec<JournalEntry> = entries.into_iter().collect();
        if batch.is_empty() {
            return Ok(());
        }
        self.tx
            .send(Command::Append(batch))
            .await
            .map_err(send_error)
    }

    /// Waits until all previously enqueued entries are durable.
    pub async fn flush(&self) -> ThreadStoreResult<()> {
        let (done_tx, done_rx) = oneshot::channel();
        self.tx
            .send(Command::Flush { reply: done_tx })
            .await
            .map_err(send_error)?;
        done_rx.await.map_err(recv_error)?
    }

    /// Flushes pending entries and stops the background worker.
    ///
    /// The flush outcome takes precedence over the worker's exit result; a
    /// failure to join the worker task is reported first.
    pub async fn shutdown(mut self) -> ThreadStoreResult<()> {
        let (done_tx, done_rx) = oneshot::channel();
        self.tx
            .send(Command::Shutdown { reply: done_tx })
            .await
            .map_err(send_error)?;
        let flush_result = done_rx.await.map_err(recv_error)?;
        match self.worker.take() {
            None => flush_result,
            Some(handle) => {
                let worker_result = handle.await.map_err(join_error)?;
                flush_result?;
                worker_result
            }
        }
    }
}
/// Messages processed by the background writer task.
enum Command {
    /// Append a batch of entries to the buffered writer.
    Append(Vec<JournalEntry>),
    /// Flush and sync the file, then report the outcome on `reply`.
    Flush {
        reply: oneshot::Sender<ThreadStoreResult<()>>,
    },
    /// Flush and sync, report on `reply`, and stop the worker.
    Shutdown {
        reply: oneshot::Sender<ThreadStoreResult<()>>,
    },
}
async fn run_worker(
path: PathBuf,
mut writer: BufWriter<tokio::fs::File>,
mut rx: mpsc::Receiver<Command>,
) -> ThreadStoreResult<()> {
let mut failure: Option<ThreadStoreError> = None;
while let Some(command) = rx.recv().await {
match command {
Command::Append(entries) => {
if failure.is_none()
&& let Err(err) = append_entries(&mut writer, entries.as_slice()).await
{
failure = Some(err);
}
}
Command::Flush { reply } => {
let result = flush_writer(&mut writer, failure.clone(), path.as_path()).await;
if let Err(err) = &result {
failure = Some(err.clone());
}
let _ = reply.send(result);
}
Command::Shutdown { reply } => {
let result = flush_writer(&mut writer, failure.clone(), path.as_path()).await;
let worker_result = result.clone();
let _ = reply.send(result);
return worker_result;
}
}
}
flush_writer(&mut writer, failure, path.as_path()).await
}
/// Serializes `entries` as JSONL (one entry per line) and appends them to `writer`.
///
/// The whole batch is encoded before any bytes reach the writer, so a
/// serialization failure on a later entry no longer leaves a partial batch in
/// the buffered stream; previously, entries encoded before the failing one had
/// already been written.
///
/// # Errors
/// Returns `ThreadStoreError::Internal` when an entry cannot be serialized or
/// when the buffered write fails.
async fn append_entries(
    writer: &mut BufWriter<tokio::fs::File>,
    entries: &[JournalEntry],
) -> ThreadStoreResult<()> {
    // Encode everything first; only touch the writer once encoding succeeded.
    let mut batch = Vec::new();
    for entry in entries {
        let mut line = serde_json::to_vec(entry).map_err(|source| ThreadStoreError::Internal {
            message: format!("failed to serialize journal entry: {source}"),
        })?;
        line.push(b'\n');
        batch.extend_from_slice(line.as_slice());
    }
    writer.write_all(batch.as_slice()).await.map_err(io_error)?;
    Ok(())
}
/// Fails fast on a latched error; otherwise flushes the buffer and syncs the
/// file data to disk.
async fn flush_writer(
    writer: &mut BufWriter<tokio::fs::File>,
    failure: Option<ThreadStoreError>,
    path: &Path,
) -> ThreadStoreResult<()> {
    match failure {
        Some(err) => Err(err),
        None => {
            writer.flush().await.map_err(io_error)?;
            writer
                .get_mut()
                .sync_data()
                .await
                .map_err(|err| ThreadStoreError::Internal {
                    message: format!("failed to sync journal at {}: {err}", path.display()),
                })
        }
    }
}
/// Wraps a raw I/O failure in the store-level `Internal` error.
fn io_error(err: std::io::Error) -> ThreadStoreError {
    let message = err.to_string();
    ThreadStoreError::Internal { message }
}

/// Error used when the worker's command channel has closed.
fn send_error(_: mpsc::error::SendError<Command>) -> ThreadStoreError {
    let message = String::from("journal worker is not available");
    ThreadStoreError::Internal { message }
}

/// Error used when the worker drops a flush/shutdown reply channel.
fn recv_error(_: oneshot::error::RecvError) -> ThreadStoreError {
    let message = String::from("journal worker dropped its response channel");
    ThreadStoreError::Internal { message }
}

/// Error used when awaiting the worker task itself fails.
fn join_error(err: tokio::task::JoinError) -> ThreadStoreError {
    let message = format!("journal worker task failed: {err}");
    ThreadStoreError::Internal { message }
}
#[cfg(test)]
mod tests {
    use codex_journal::Journal;
    use codex_journal::JournalMetadataItem;
    use codex_journal::JournalTranscriptItem;
    use codex_journal::PromptMessage;
    use codex_protocol::models::ContentItem;
    use codex_protocol::models::ResponseItem;
    use pretty_assertions::assert_eq;
    use tempfile::tempdir;

    use super::JournalWriter;

    /// Builds a prompt-metadata entry keyed under `["prompt", text]`.
    fn developer_entry(text: &str) -> codex_journal::JournalEntry {
        codex_journal::JournalEntry::new(
            ["prompt", text],
            JournalMetadataItem::new(PromptMessage::developer_text(text)),
        )
    }

    /// Builds a transcript entry with a deterministic id so that equality
    /// comparisons against reloaded entries are stable (no random UUID).
    fn user_entry(text: &str) -> codex_journal::JournalEntry {
        codex_journal::JournalEntry::new(
            ["history", text],
            JournalTranscriptItem {
                id: format!("history-{text}"),
                turn_id: None,
                item: ResponseItem::Message {
                    id: None,
                    role: "user".to_string(),
                    content: vec![ContentItem::InputText {
                        text: text.to_string(),
                    }],
                    phase: None,
                },
            },
        )
    }

    /// After `flush`, previously enqueued entries are readable in enqueue order.
    #[tokio::test]
    async fn enqueue_and_flush_persists_entries_in_order() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("journal.jsonl");
        let journal = JournalWriter::open(path.clone())
            .await
            .expect("journal should open");
        journal
            .enqueue(developer_entry("first"))
            .await
            .expect("first entry should enqueue");
        journal
            .enqueue(user_entry("hello"))
            .await
            .expect("second entry should enqueue");
        journal.flush().await.expect("journal should flush");
        let loaded = Journal::load_jsonl(path.as_path()).expect("journal should load");
        assert_eq!(
            loaded.entries(),
            vec![developer_entry("first"), user_entry("hello")]
        );
    }

    /// `shutdown` implies a final flush, so batched entries are durable.
    #[tokio::test]
    async fn enqueue_all_and_shutdown_persist_entries() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("journal.jsonl");
        let journal = JournalWriter::open(path.clone())
            .await
            .expect("journal should open");
        journal
            .enqueue_all(vec![developer_entry("one"), developer_entry("two")])
            .await
            .expect("entries should enqueue");
        journal.shutdown().await.expect("journal should shut down");
        let loaded = Journal::load_jsonl(path.as_path()).expect("journal should load");
        assert_eq!(
            loaded.entries(),
            vec![developer_entry("one"), developer_entry("two")]
        );
    }

    /// Reopening the same path appends (append-mode open) rather than truncating.
    #[tokio::test]
    async fn reopen_appends_to_existing_journal() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("journal.jsonl");
        let first = JournalWriter::open(path.clone())
            .await
            .expect("first journal should open");
        first
            .enqueue(developer_entry("first"))
            .await
            .expect("entry should enqueue");
        first
            .shutdown()
            .await
            .expect("first journal should shut down");
        let second = JournalWriter::open(path.clone())
            .await
            .expect("second journal should open");
        second
            .enqueue(user_entry("second"))
            .await
            .expect("entry should enqueue");
        second
            .shutdown()
            .await
            .expect("second journal should shut down");
        let loaded = Journal::load_jsonl(path.as_path()).expect("journal should load");
        assert_eq!(
            loaded.entries(),
            vec![developer_entry("first"), user_entry("second")]
        );
    }
}

View File

@@ -7,6 +7,7 @@
mod error;
#[cfg(debug_assertions)]
mod in_memory;
mod journal_writer;
mod live_thread;
mod local;
mod remote;
@@ -19,6 +20,7 @@ pub use error::ThreadStoreResult;
pub use in_memory::InMemoryThreadStore;
#[cfg(debug_assertions)]
pub use in_memory::InMemoryThreadStoreCalls;
pub use journal_writer::JournalWriter;
pub use live_thread::LiveThread;
pub use live_thread::LiveThreadInitGuard;
pub use local::LocalThreadStore;