mirror of
https://github.com/openai/codex.git
synced 2026-05-06 20:36:33 +00:00
Compare commits
2 Commits
jif/featur
...
jif/codex-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
03c5bfdd28 | ||
|
|
63402cbc3b |
19
codex-rs/Cargo.lock
generated
19
codex-rs/Cargo.lock
generated
@@ -2374,6 +2374,7 @@ dependencies = [
|
||||
"codex-feedback",
|
||||
"codex-git-utils",
|
||||
"codex-hooks",
|
||||
"codex-journal",
|
||||
"codex-login",
|
||||
"codex-mcp",
|
||||
"codex-memories-read",
|
||||
@@ -2815,6 +2816,23 @@ dependencies = [
|
||||
"tempfile",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-journal"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"base64 0.22.1",
|
||||
"codex-protocol",
|
||||
"codex-utils-cache",
|
||||
"codex-utils-output-truncation",
|
||||
"image",
|
||||
"indexmap 2.13.0",
|
||||
"pretty_assertions",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"thiserror 2.0.18",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-keyring-store"
|
||||
version = "0.0.0"
|
||||
@@ -3484,6 +3502,7 @@ dependencies = [
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"codex-git-utils",
|
||||
"codex-journal",
|
||||
"codex-protocol",
|
||||
"codex-rollout",
|
||||
"codex-state",
|
||||
|
||||
@@ -98,6 +98,7 @@ members = [
|
||||
"terminal-detection",
|
||||
"test-binary-support",
|
||||
"thread-store",
|
||||
"journal",
|
||||
"uds",
|
||||
"codex-experimental-api-macros",
|
||||
"plugin",
|
||||
@@ -190,6 +191,7 @@ codex-stdio-to-uds = { path = "stdio-to-uds" }
|
||||
codex-terminal-detection = { path = "terminal-detection" }
|
||||
codex-test-binary-support = { path = "test-binary-support" }
|
||||
codex-thread-store = { path = "thread-store" }
|
||||
codex-journal = { path = "journal" }
|
||||
codex-tools = { path = "tools" }
|
||||
codex-tui = { path = "tui" }
|
||||
codex-uds = { path = "uds" }
|
||||
|
||||
@@ -207,7 +207,7 @@ pub struct ConfigToml {
|
||||
/// Ordered list of fallback filenames to look for when AGENTS.md is missing.
|
||||
pub project_doc_fallback_filenames: Option<Vec<String>>,
|
||||
|
||||
/// Token budget applied when storing tool/function outputs in the context manager.
|
||||
/// Token budget applied when storing tool/function outputs in thread history.
|
||||
pub tool_output_token_limit: Option<usize>,
|
||||
|
||||
/// Maximum poll window for background terminal output (`write_stdin`), in milliseconds.
|
||||
|
||||
@@ -60,6 +60,7 @@ codex-sandboxing = { workspace = true }
|
||||
codex-state = { workspace = true }
|
||||
codex-terminal-detection = { workspace = true }
|
||||
codex-thread-store = { workspace = true }
|
||||
codex-journal = { workspace = true }
|
||||
codex-tools = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-cache = { workspace = true }
|
||||
|
||||
@@ -3885,7 +3885,7 @@
|
||||
"type": "boolean"
|
||||
},
|
||||
"tool_output_token_limit": {
|
||||
"description": "Token budget applied when storing tool/function outputs in the context manager.",
|
||||
"description": "Token budget applied when storing tool/function outputs in thread history.",
|
||||
"format": "uint",
|
||||
"minimum": 0.0,
|
||||
"type": "integer"
|
||||
@@ -3946,4 +3946,4 @@
|
||||
},
|
||||
"title": "ConfigToml",
|
||||
"type": "object"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,8 +7,10 @@ use crate::config::Config;
|
||||
use crate::config::ConfigBuilder;
|
||||
use crate::context::ContextualUserFragment;
|
||||
use crate::context::SubagentNotification;
|
||||
use crate::state::history as session_history;
|
||||
use assert_matches::assert_matches;
|
||||
use codex_features::Feature;
|
||||
use codex_journal::Journal;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
@@ -34,6 +36,16 @@ use tokio::time::sleep;
|
||||
use tokio::time::timeout;
|
||||
use toml::Value as TomlValue;
|
||||
|
||||
trait JournalTestExt {
|
||||
fn raw_items(&self) -> Vec<ResponseItem>;
|
||||
}
|
||||
|
||||
impl JournalTestExt for Journal {
|
||||
fn raw_items(&self) -> Vec<ResponseItem> {
|
||||
session_history::raw_items(self)
|
||||
}
|
||||
}
|
||||
|
||||
async fn test_config_with_cli_overrides(
|
||||
cli_overrides: Vec<(String, TomlValue)>,
|
||||
) -> (TempDir, Config) {
|
||||
@@ -796,8 +808,9 @@ async fn spawn_agent_fork_flushes_parent_rollout_before_loading_history() {
|
||||
.await
|
||||
.expect("child thread should be registered");
|
||||
let history = child_thread.codex.session.clone_history().await;
|
||||
let history_items = history.raw_items();
|
||||
assert!(
|
||||
history_contains_text(history.raw_items(), "unflushed final answer"),
|
||||
history_contains_text(&history_items, "unflushed final answer"),
|
||||
"forked child history should include unflushed assistant final answers after flushing the parent rollout"
|
||||
);
|
||||
|
||||
@@ -906,21 +919,22 @@ async fn spawn_agent_fork_last_n_turns_keeps_only_recent_turns() {
|
||||
.await
|
||||
.expect("child thread should be registered");
|
||||
let history = child_thread.codex.session.clone_history().await;
|
||||
let history_items = history.raw_items();
|
||||
|
||||
assert!(
|
||||
!history_contains_text(history.raw_items(), "old parent context"),
|
||||
!history_contains_text(&history_items, "old parent context"),
|
||||
"forked child history should drop parent context outside the requested last-N turn window"
|
||||
);
|
||||
assert!(
|
||||
!history_contains_text(history.raw_items(), "queued message"),
|
||||
!history_contains_text(&history_items, "queued message"),
|
||||
"forked child history should drop queued inter-agent messages outside the requested last-N turn window"
|
||||
);
|
||||
assert!(
|
||||
!history_contains_text(history.raw_items(), "triggered context"),
|
||||
!history_contains_text(&history_items, "triggered context"),
|
||||
"forked child history should filter assistant inter-agent messages even when they fall inside the requested last-N turn window"
|
||||
);
|
||||
assert!(
|
||||
history_contains_text(history.raw_items(), "current parent task"),
|
||||
history_contains_text(&history_items, "current parent task"),
|
||||
"forked child history should keep the parent user message from the requested last-N turn window"
|
||||
);
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::compact::content_items_to_text;
|
||||
use crate::event_mapping::is_contextual_user_message_content;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::history as session_history;
|
||||
use codex_login::default_client::build_reqwest_client;
|
||||
use codex_protocol::models::MessagePhase;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -224,7 +225,8 @@ async fn build_arc_monitor_request(
|
||||
protection_client_callsite: &'static str,
|
||||
) -> ArcMonitorRequest {
|
||||
let history = sess.clone_history().await;
|
||||
let mut messages = build_arc_monitor_messages(history.raw_items());
|
||||
let history_items = session_history::raw_items(&history);
|
||||
let mut messages = build_arc_monitor_messages(&history_items);
|
||||
if messages.is_empty() {
|
||||
messages.push(build_arc_monitor_message(
|
||||
"user",
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::session::PreviousTurnSettings;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn::get_last_assistant_message_from_turn;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::history as session_history;
|
||||
use crate::util::backoff;
|
||||
use codex_analytics::CodexCompactionEvent;
|
||||
use codex_analytics::CompactionImplementation;
|
||||
@@ -160,7 +161,8 @@ async fn run_compact_task_inner_impl(
|
||||
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
|
||||
|
||||
let mut history = sess.clone_history().await;
|
||||
history.record_items(
|
||||
session_history::record_items(
|
||||
&mut history,
|
||||
&[initial_input_for_turn.into()],
|
||||
turn_context.truncation_policy,
|
||||
);
|
||||
@@ -176,9 +178,8 @@ async fn run_compact_task_inner_impl(
|
||||
|
||||
loop {
|
||||
// Clone is required because of the loop
|
||||
let turn_input = history
|
||||
.clone()
|
||||
.for_prompt(&turn_context.model_info.input_modalities);
|
||||
let turn_input =
|
||||
session_history::for_prompt(&history, &turn_context.model_info.input_modalities);
|
||||
let turn_input_len = turn_input.len();
|
||||
let prompt = Prompt {
|
||||
input: turn_input,
|
||||
@@ -218,7 +219,7 @@ async fn run_compact_task_inner_impl(
|
||||
error!(
|
||||
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
|
||||
);
|
||||
history.remove_first_item();
|
||||
session_history::remove_first_item(&mut history);
|
||||
truncated_count += 1;
|
||||
retries = 0;
|
||||
continue;
|
||||
@@ -250,10 +251,11 @@ async fn run_compact_task_inner_impl(
|
||||
}
|
||||
|
||||
let history_snapshot = sess.clone_history().await;
|
||||
let history_items = history_snapshot.raw_items();
|
||||
let summary_suffix = get_last_assistant_message_from_turn(history_items).unwrap_or_default();
|
||||
let history_items = session_history::raw_items(&history_snapshot);
|
||||
let summary_suffix =
|
||||
get_last_assistant_message_from_turn(history_items.as_slice()).unwrap_or_default();
|
||||
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
|
||||
let user_messages = collect_user_messages(history_items);
|
||||
let user_messages = collect_user_messages(history_items.as_slice());
|
||||
|
||||
let mut new_history = build_compacted_history(Vec::new(), &user_messages, &summary_text);
|
||||
|
||||
|
||||
@@ -6,17 +6,18 @@ use crate::compact::CompactionAnalyticsAttempt;
|
||||
use crate::compact::InitialContextInjection;
|
||||
use crate::compact::compaction_status_from_result;
|
||||
use crate::compact::insert_initial_context_before_last_real_user_or_summary;
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::context_manager::TotalTokenUsageBreakdown;
|
||||
use crate::context_manager::estimate_response_item_model_visible_bytes;
|
||||
use crate::context_manager::is_codex_generated_item;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn::built_tools;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::TotalTokenUsageBreakdown;
|
||||
use crate::state::history as session_history;
|
||||
use codex_analytics::CompactionImplementation;
|
||||
use codex_analytics::CompactionPhase;
|
||||
use codex_analytics::CompactionReason;
|
||||
use codex_analytics::CompactionTrigger;
|
||||
use codex_journal::Journal;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::items::ContextCompactionItem;
|
||||
@@ -144,8 +145,9 @@ async fn run_remote_compact_task_inner_impl(
|
||||
// This is the history selected for remote compaction, after any trimming required to fit the
|
||||
// compact endpoint. The checkpoint below records it separately from the next sampling request,
|
||||
// whose prompt will repeat current developer/context prefix items.
|
||||
let trace_input_history = history.raw_items().to_vec();
|
||||
let prompt_input = history.for_prompt(&turn_context.model_info.input_modalities);
|
||||
let trace_input_history = session_history::raw_items(&history);
|
||||
let prompt_input =
|
||||
session_history::for_prompt(&history, &turn_context.model_info.input_modalities);
|
||||
let tool_router = built_tools(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
@@ -325,7 +327,7 @@ fn log_remote_compact_failure(
|
||||
}
|
||||
|
||||
fn trim_function_call_history_to_fit_context_window(
|
||||
history: &mut ContextManager,
|
||||
history: &mut Journal,
|
||||
turn_context: &TurnContext,
|
||||
base_instructions: &BaseInstructions,
|
||||
) -> usize {
|
||||
@@ -334,17 +336,17 @@ fn trim_function_call_history_to_fit_context_window(
|
||||
return deleted_items;
|
||||
};
|
||||
|
||||
while history
|
||||
.estimate_token_count_with_base_instructions(base_instructions)
|
||||
while session_history::estimate_token_count_with_base_instructions(history, base_instructions)
|
||||
.is_some_and(|estimated_tokens| estimated_tokens > context_window)
|
||||
{
|
||||
let Some(last_item) = history.raw_items().last() else {
|
||||
let history_items = session_history::raw_items(history);
|
||||
let Some(last_item) = history_items.last() else {
|
||||
break;
|
||||
};
|
||||
if !is_codex_generated_item(last_item) {
|
||||
break;
|
||||
}
|
||||
if !history.remove_last_item() {
|
||||
if !session_history::remove_last_item(history) {
|
||||
break;
|
||||
}
|
||||
deleted_items += 1;
|
||||
|
||||
@@ -559,7 +559,7 @@ pub struct Config {
|
||||
/// Additional filenames to try when looking for project-level docs.
|
||||
pub project_doc_fallback_filenames: Vec<String>,
|
||||
|
||||
/// Token budget applied when storing tool/function outputs in the context manager.
|
||||
/// Token budget applied when storing tool/function outputs in thread history.
|
||||
pub tool_output_token_limit: Option<usize>,
|
||||
|
||||
/// Maximum number of agent threads that can be open concurrently.
|
||||
|
||||
@@ -1,726 +0,0 @@
|
||||
use crate::context_manager::normalize;
|
||||
use crate::event_mapping::has_non_contextual_dev_message_content;
|
||||
use crate::event_mapping::is_contextual_dev_message_content;
|
||||
use crate::event_mapping::is_contextual_user_message_content;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use codex_protocol::models::BaseInstructions;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
use codex_protocol::models::FunctionCallOutputContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ImageDetail;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::InputModality;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_utils_cache::BlockingLruCache;
|
||||
use codex_utils_cache::sha1_digest;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::approx_bytes_for_tokens;
|
||||
use codex_utils_output_truncation::approx_token_count;
|
||||
use codex_utils_output_truncation::approx_tokens_from_byte_count_i64;
|
||||
use codex_utils_output_truncation::truncate_function_output_items_with_policy;
|
||||
use codex_utils_output_truncation::truncate_text;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::Deref;
|
||||
use std::sync::LazyLock;
|
||||
|
||||
/// Transcript of thread history
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub(crate) struct ContextManager {
|
||||
/// The oldest items are at the beginning of the vector.
|
||||
items: Vec<ResponseItem>,
|
||||
/// Bumped whenever history is rewritten, such as compaction or rollback.
|
||||
history_version: u64,
|
||||
token_info: Option<TokenUsageInfo>,
|
||||
/// Reference context snapshot used for diffing and producing model-visible
|
||||
/// settings update items.
|
||||
///
|
||||
/// This is the baseline for the next regular model turn, and may already
|
||||
/// match the current turn after context updates are persisted.
|
||||
///
|
||||
/// When this is `None`, settings diffing treats the next turn as having no
|
||||
/// baseline and emits a full reinjection of context state. Rollback may
|
||||
/// also clear this when it trims a mixed initial-context developer bundle
|
||||
/// whose non-diff fragments no longer exist in the surviving history.
|
||||
reference_context_item: Option<TurnContextItem>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub(crate) struct TotalTokenUsageBreakdown {
|
||||
pub last_api_response_total_tokens: i64,
|
||||
pub all_history_items_model_visible_bytes: i64,
|
||||
pub estimated_tokens_of_items_added_since_last_successful_api_response: i64,
|
||||
pub estimated_bytes_of_items_added_since_last_successful_api_response: i64,
|
||||
}
|
||||
|
||||
impl ContextManager {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
items: Vec::new(),
|
||||
history_version: 0,
|
||||
token_info: TokenUsageInfo::new_or_append(
|
||||
&None, &None, /*model_context_window*/ None,
|
||||
),
|
||||
reference_context_item: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn token_info(&self) -> Option<TokenUsageInfo> {
|
||||
self.token_info.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn set_token_info(&mut self, info: Option<TokenUsageInfo>) {
|
||||
self.token_info = info;
|
||||
}
|
||||
|
||||
pub(crate) fn set_reference_context_item(&mut self, item: Option<TurnContextItem>) {
|
||||
self.reference_context_item = item;
|
||||
}
|
||||
|
||||
pub(crate) fn reference_context_item(&self) -> Option<TurnContextItem> {
|
||||
self.reference_context_item.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn set_token_usage_full(&mut self, context_window: i64) {
|
||||
match &mut self.token_info {
|
||||
Some(info) => info.fill_to_context_window(context_window),
|
||||
None => {
|
||||
self.token_info = Some(TokenUsageInfo::full_context_window(context_window));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `items` is ordered from oldest to newest.
|
||||
pub(crate) fn record_items<I>(&mut self, items: I, policy: TruncationPolicy)
|
||||
where
|
||||
I: IntoIterator,
|
||||
I::Item: std::ops::Deref<Target = ResponseItem>,
|
||||
{
|
||||
for item in items {
|
||||
let item_ref = item.deref();
|
||||
if !is_api_message(item_ref) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let processed = self.process_item(item_ref, policy);
|
||||
self.items.push(processed);
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the history prepared for sending to the model. This applies a proper
|
||||
/// normalization and drops un-suited items. When `input_modalities` does not
|
||||
/// include `InputModality::Image`, images are stripped from messages and tool
|
||||
/// outputs.
|
||||
pub(crate) fn for_prompt(mut self, input_modalities: &[InputModality]) -> Vec<ResponseItem> {
|
||||
self.normalize_history(input_modalities);
|
||||
self.items
|
||||
}
|
||||
|
||||
/// Returns raw items in the history.
|
||||
pub(crate) fn raw_items(&self) -> &[ResponseItem] {
|
||||
&self.items
|
||||
}
|
||||
|
||||
pub(crate) fn history_version(&self) -> u64 {
|
||||
self.history_version
|
||||
}
|
||||
|
||||
// Estimate token usage using byte-based heuristics from the truncation helpers.
|
||||
// This is a coarse lower bound, not a tokenizer-accurate count.
|
||||
pub(crate) fn estimate_token_count(&self, turn_context: &TurnContext) -> Option<i64> {
|
||||
let model_info = &turn_context.model_info;
|
||||
let personality = turn_context.personality.or(turn_context.config.personality);
|
||||
let base_instructions = BaseInstructions {
|
||||
text: model_info.get_model_instructions(personality),
|
||||
};
|
||||
self.estimate_token_count_with_base_instructions(&base_instructions)
|
||||
}
|
||||
|
||||
pub(crate) fn estimate_token_count_with_base_instructions(
|
||||
&self,
|
||||
base_instructions: &BaseInstructions,
|
||||
) -> Option<i64> {
|
||||
let base_tokens =
|
||||
i64::try_from(approx_token_count(&base_instructions.text)).unwrap_or(i64::MAX);
|
||||
|
||||
let items_tokens = self
|
||||
.items
|
||||
.iter()
|
||||
.map(estimate_item_token_count)
|
||||
.fold(0i64, i64::saturating_add);
|
||||
|
||||
Some(base_tokens.saturating_add(items_tokens))
|
||||
}
|
||||
|
||||
pub(crate) fn remove_first_item(&mut self) {
|
||||
if !self.items.is_empty() {
|
||||
// Remove the oldest item (front of the list). Items are ordered from
|
||||
// oldest → newest, so index 0 is the first entry recorded.
|
||||
let removed = self.items.remove(0);
|
||||
// If the removed item participates in a call/output pair, also remove
|
||||
// its corresponding counterpart to keep the invariants intact without
|
||||
// running a full normalization pass.
|
||||
normalize::remove_corresponding_for(&mut self.items, &removed);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn remove_last_item(&mut self) -> bool {
|
||||
if let Some(removed) = self.items.pop() {
|
||||
normalize::remove_corresponding_for(&mut self.items, &removed);
|
||||
self.history_version = self.history_version.saturating_add(1);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replace(&mut self, items: Vec<ResponseItem>) {
|
||||
self.items = items;
|
||||
self.history_version = self.history_version.saturating_add(1);
|
||||
}
|
||||
|
||||
/// Replace image content in the last turn if it originated from a tool output.
|
||||
/// Returns true when a tool image was replaced, false otherwise.
|
||||
pub(crate) fn replace_last_turn_images(&mut self, placeholder: &str) -> bool {
|
||||
let Some(index) = self.items.iter().rposition(|item| {
|
||||
matches!(item, ResponseItem::FunctionCallOutput { .. }) || is_user_turn_boundary(item)
|
||||
}) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
match &mut self.items[index] {
|
||||
ResponseItem::FunctionCallOutput { output, .. } => {
|
||||
let Some(content_items) = output.content_items_mut() else {
|
||||
return false;
|
||||
};
|
||||
let mut replaced = false;
|
||||
let placeholder = placeholder.to_string();
|
||||
for item in content_items.iter_mut() {
|
||||
if matches!(item, FunctionCallOutputContentItem::InputImage { .. }) {
|
||||
*item = FunctionCallOutputContentItem::InputText {
|
||||
text: placeholder.clone(),
|
||||
};
|
||||
replaced = true;
|
||||
}
|
||||
}
|
||||
if replaced {
|
||||
self.history_version = self.history_version.saturating_add(1);
|
||||
}
|
||||
replaced
|
||||
}
|
||||
ResponseItem::Message { .. } => false,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Drop the last `num_turns` instruction turns from this history.
|
||||
///
|
||||
/// Instruction turns are history messages that should behave like a new prompt boundary:
|
||||
/// ordinary user messages and structured assistant inter-agent instructions.
|
||||
///
|
||||
/// This mirrors thread-rollback semantics:
|
||||
/// - `num_turns == 0` is a no-op
|
||||
/// - if there are no user turns, this is a no-op
|
||||
/// - if `num_turns` exceeds the number of user turns, all user turns are dropped while
|
||||
/// preserving any items that occurred before the first user message.
|
||||
///
|
||||
/// If rollback trims a pre-turn developer message that mixes contextual fragments with
|
||||
/// persistent developer text from `build_initial_context`, this also clears
|
||||
/// `reference_context_item`. The surviving history no longer contains the full bundle that
|
||||
/// established the prior baseline, so future turns must fall back to full reinjection instead
|
||||
/// of diffing against stale state.
|
||||
pub(crate) fn drop_last_n_user_turns(&mut self, num_turns: u32) {
|
||||
if num_turns == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let snapshot = self.items.clone();
|
||||
let user_positions = user_message_positions(&snapshot);
|
||||
let Some(&first_instruction_turn_idx) = user_positions.first() else {
|
||||
self.replace(snapshot);
|
||||
return;
|
||||
};
|
||||
|
||||
let n_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX);
|
||||
let mut cut_idx = if n_from_end >= user_positions.len() {
|
||||
first_instruction_turn_idx
|
||||
} else {
|
||||
user_positions[user_positions.len() - n_from_end]
|
||||
};
|
||||
|
||||
cut_idx =
|
||||
self.trim_pre_turn_context_updates(&snapshot, first_instruction_turn_idx, cut_idx);
|
||||
|
||||
self.replace(snapshot[..cut_idx].to_vec());
|
||||
}
|
||||
|
||||
pub(crate) fn update_token_info(
|
||||
&mut self,
|
||||
usage: &TokenUsage,
|
||||
model_context_window: Option<i64>,
|
||||
) {
|
||||
self.token_info = TokenUsageInfo::new_or_append(
|
||||
&self.token_info,
|
||||
&Some(usage.clone()),
|
||||
model_context_window,
|
||||
);
|
||||
}
|
||||
|
||||
fn get_non_last_reasoning_items_tokens(&self) -> i64 {
|
||||
// Get reasoning items excluding all the ones after the last instruction boundary.
|
||||
let Some(last_user_index) = self.items.iter().rposition(is_user_turn_boundary) else {
|
||||
return 0;
|
||||
};
|
||||
|
||||
self.items
|
||||
.iter()
|
||||
.take(last_user_index)
|
||||
.filter(|item| {
|
||||
matches!(
|
||||
item,
|
||||
ResponseItem::Reasoning {
|
||||
encrypted_content: Some(_),
|
||||
..
|
||||
}
|
||||
)
|
||||
})
|
||||
.map(estimate_item_token_count)
|
||||
.fold(0i64, i64::saturating_add)
|
||||
}
|
||||
|
||||
// These are local items added after the most recent model-emitted item.
|
||||
// They are not reflected in `last_token_usage.total_tokens`.
|
||||
fn items_after_last_model_generated_item(&self) -> &[ResponseItem] {
|
||||
let start = self
|
||||
.items
|
||||
.iter()
|
||||
.rposition(is_model_generated_item)
|
||||
.map_or(self.items.len(), |index| index.saturating_add(1));
|
||||
&self.items[start..]
|
||||
}
|
||||
|
||||
/// When true, the server already accounted for past reasoning tokens and
|
||||
/// the client should not re-estimate them.
|
||||
pub(crate) fn get_total_token_usage(&self, server_reasoning_included: bool) -> i64 {
|
||||
let last_tokens = self
|
||||
.token_info
|
||||
.as_ref()
|
||||
.map(|info| info.last_token_usage.total_tokens)
|
||||
.unwrap_or(0);
|
||||
let items_after_last_model_generated_tokens = self
|
||||
.items_after_last_model_generated_item()
|
||||
.iter()
|
||||
.map(estimate_item_token_count)
|
||||
.fold(0i64, i64::saturating_add);
|
||||
if server_reasoning_included {
|
||||
last_tokens.saturating_add(items_after_last_model_generated_tokens)
|
||||
} else {
|
||||
last_tokens
|
||||
.saturating_add(self.get_non_last_reasoning_items_tokens())
|
||||
.saturating_add(items_after_last_model_generated_tokens)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_total_token_usage_breakdown(&self) -> TotalTokenUsageBreakdown {
|
||||
let last_usage = self
|
||||
.token_info
|
||||
.as_ref()
|
||||
.map(|info| info.last_token_usage.clone())
|
||||
.unwrap_or_default();
|
||||
let items_after_last_model_generated = self.items_after_last_model_generated_item();
|
||||
|
||||
TotalTokenUsageBreakdown {
|
||||
last_api_response_total_tokens: last_usage.total_tokens,
|
||||
all_history_items_model_visible_bytes: self
|
||||
.items
|
||||
.iter()
|
||||
.map(estimate_response_item_model_visible_bytes)
|
||||
.fold(0i64, i64::saturating_add),
|
||||
estimated_tokens_of_items_added_since_last_successful_api_response:
|
||||
items_after_last_model_generated
|
||||
.iter()
|
||||
.map(estimate_item_token_count)
|
||||
.fold(0i64, i64::saturating_add),
|
||||
estimated_bytes_of_items_added_since_last_successful_api_response:
|
||||
items_after_last_model_generated
|
||||
.iter()
|
||||
.map(estimate_response_item_model_visible_bytes)
|
||||
.fold(0i64, i64::saturating_add),
|
||||
}
|
||||
}
|
||||
|
||||
/// This function enforces a couple of invariants on the in-memory history:
|
||||
/// 1. every call (function/custom) has a corresponding output entry
|
||||
/// 2. every output has a corresponding call entry
|
||||
/// 3. when images are unsupported, image content is stripped from messages and tool outputs
|
||||
fn normalize_history(&mut self, input_modalities: &[InputModality]) {
|
||||
// all function/tool calls must have a corresponding output
|
||||
normalize::ensure_call_outputs_present(&mut self.items);
|
||||
|
||||
// all outputs must have a corresponding function/tool call
|
||||
normalize::remove_orphan_outputs(&mut self.items);
|
||||
|
||||
// strip images when model does not support them
|
||||
normalize::strip_images_when_unsupported(input_modalities, &mut self.items);
|
||||
}
|
||||
|
||||
fn process_item(&self, item: &ResponseItem, policy: TruncationPolicy) -> ResponseItem {
|
||||
let policy_with_serialization_budget = policy * 1.2;
|
||||
match item {
|
||||
ResponseItem::FunctionCallOutput { call_id, output } => {
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: truncate_function_output_payload(
|
||||
output,
|
||||
policy_with_serialization_budget,
|
||||
),
|
||||
}
|
||||
}
|
||||
ResponseItem::CustomToolCallOutput {
|
||||
call_id,
|
||||
name,
|
||||
output,
|
||||
} => ResponseItem::CustomToolCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
name: name.clone(),
|
||||
output: truncate_function_output_payload(output, policy_with_serialization_budget),
|
||||
},
|
||||
ResponseItem::Message { .. }
|
||||
| ResponseItem::Reasoning { .. }
|
||||
| ResponseItem::LocalShellCall { .. }
|
||||
| ResponseItem::FunctionCall { .. }
|
||||
| ResponseItem::ToolSearchCall { .. }
|
||||
| ResponseItem::ToolSearchOutput { .. }
|
||||
| ResponseItem::WebSearchCall { .. }
|
||||
| ResponseItem::ImageGenerationCall { .. }
|
||||
| ResponseItem::CustomToolCall { .. }
|
||||
| ResponseItem::Compaction { .. }
|
||||
| ResponseItem::Other => item.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Walk backward from a rollback cut and trim contiguous pre-turn context-update items.
|
||||
///
|
||||
/// Returns the adjusted cut index after removing contextual developer/user items immediately
|
||||
/// above the rolled-back turn boundary.
|
||||
///
|
||||
/// `first_instruction_turn_idx` is the earliest rollback-eligible instruction-turn boundary
|
||||
/// in `snapshot`; the trim walk never crosses it so any session-prefix items that predate the
|
||||
/// first real turn survive rollback.
|
||||
///
|
||||
/// `cut_idx` is the tentative slice boundary after dropping the requested number of
|
||||
/// instruction turns, before stripping contextual pre-turn items that sit immediately above
|
||||
/// that boundary.
|
||||
///
|
||||
/// If any trimmed developer message was a mixed `build_initial_context` bundle containing both
|
||||
/// rollback-trimmable contextual fragments and persistent developer text, this also clears the
|
||||
/// stored `reference_context_item` baseline so the next real turn falls back to full
|
||||
/// reinjection.
|
||||
fn trim_pre_turn_context_updates(
|
||||
&mut self,
|
||||
snapshot: &[ResponseItem],
|
||||
first_instruction_turn_idx: usize,
|
||||
mut cut_idx: usize,
|
||||
) -> usize {
|
||||
while cut_idx > first_instruction_turn_idx {
|
||||
match &snapshot[cut_idx - 1] {
|
||||
ResponseItem::Message { role, content, .. }
|
||||
if role == "developer" && is_contextual_dev_message_content(content) =>
|
||||
{
|
||||
if has_non_contextual_dev_message_content(content) {
|
||||
// Mixed `build_initial_context` bundles are not reconstructible from
|
||||
// steady-state diffs once trimmed, so the next real turn must fully
|
||||
// reinject context instead of diffing against a stale baseline.
|
||||
self.reference_context_item = None;
|
||||
}
|
||||
cut_idx -= 1;
|
||||
}
|
||||
ResponseItem::Message { role, content, .. }
|
||||
if role == "user" && is_contextual_user_message_content(content) =>
|
||||
{
|
||||
cut_idx -= 1;
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
cut_idx
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate_function_output_payload(
|
||||
output: &FunctionCallOutputPayload,
|
||||
policy: TruncationPolicy,
|
||||
) -> FunctionCallOutputPayload {
|
||||
let body = match &output.body {
|
||||
FunctionCallOutputBody::Text(content) => {
|
||||
FunctionCallOutputBody::Text(truncate_text(content, policy))
|
||||
}
|
||||
FunctionCallOutputBody::ContentItems(items) => FunctionCallOutputBody::ContentItems(
|
||||
truncate_function_output_items_with_policy(items, policy),
|
||||
),
|
||||
};
|
||||
|
||||
FunctionCallOutputPayload {
|
||||
body,
|
||||
success: output.success,
|
||||
}
|
||||
}
|
||||
|
||||
/// API messages include every non-system item (user/assistant messages, reasoning,
/// tool calls, tool outputs, shell calls, web-search calls, and image-generation
/// calls).
fn is_api_message(message: &ResponseItem) -> bool {
    match message {
        // Only the role decides for plain messages: everything but "system".
        ResponseItem::Message { role, .. } => role.as_str() != "system",
        // Exhaustive listing (no `_` arm) so adding a `ResponseItem` variant
        // forces an explicit decision here.
        ResponseItem::FunctionCallOutput { .. }
        | ResponseItem::FunctionCall { .. }
        | ResponseItem::ToolSearchCall { .. }
        | ResponseItem::ToolSearchOutput { .. }
        | ResponseItem::CustomToolCall { .. }
        | ResponseItem::CustomToolCallOutput { .. }
        | ResponseItem::LocalShellCall { .. }
        | ResponseItem::Reasoning { .. }
        | ResponseItem::WebSearchCall { .. }
        | ResponseItem::ImageGenerationCall { .. }
        | ResponseItem::Compaction { .. } => true,
        ResponseItem::Other => false,
    }
}
|
||||
|
||||
/// Estimate the decoded (model-visible) byte length of base64-encoded
/// reasoning content from its encoded length.
///
/// Base64 expands data by 4/3, so the decoded size is roughly
/// `encoded_len * 3 / 4`; the fixed 650-byte deduction presumably accounts
/// for constant envelope overhead in the encrypted payload — TODO confirm
/// against the encoder. Saturates to 0 rather than underflowing.
fn estimate_reasoning_length(encoded_len: usize) -> usize {
    // `saturating_mul` guards against overflow on pathological lengths.
    // Dividing by the nonzero constant 4 can never fail, so the previous
    // `checked_div(4).unwrap_or(0)` was dead code and is simplified away.
    (encoded_len.saturating_mul(3) / 4).saturating_sub(650)
}
|
||||
|
||||
fn estimate_item_token_count(item: &ResponseItem) -> i64 {
|
||||
let model_visible_bytes = estimate_response_item_model_visible_bytes(item);
|
||||
approx_tokens_from_byte_count_i64(model_visible_bytes)
|
||||
}
|
||||
|
||||
/// Approximate model-visible byte cost for one image input.
///
/// The estimator later converts bytes to tokens using a 4-bytes/token heuristic
/// with ceiling division, so 7,373 bytes maps to approximately 1,844 tokens.
const RESIZED_IMAGE_BYTES_ESTIMATE: i64 = 7373;
// See https://platform.openai.com/docs/guides/images-vision#calculating-costs.
// Use a direct 32px patch count only for `detail: "original"`;
// all other image inputs continue to use `RESIZED_IMAGE_BYTES_ESTIMATE`.
const ORIGINAL_IMAGE_PATCH_SIZE: u32 = 32;
// See https://platform.openai.com/docs/guides/images-vision#model-sizing-behavior.
// Keep this hard-coded for now; move it into model capabilities if the patch
// budget starts changing often across model releases.
const ORIGINAL_IMAGE_MAX_PATCHES: usize = 10_000;
// Capacity of the per-process LRU cache of original-detail image estimates.
const ORIGINAL_IMAGE_ESTIMATE_CACHE_SIZE: usize = 32;

// Caches original-detail image estimates keyed by a 20-byte SHA-1 digest of
// the full data URL, so repeated history scans do not re-decode the same
// image bytes. `None` results (non-image or undecodable URLs) are cached too.
static ORIGINAL_IMAGE_ESTIMATE_CACHE: LazyLock<BlockingLruCache<[u8; 20], Option<i64>>> =
    LazyLock::new(|| {
        BlockingLruCache::new(
            // `unwrap_or(NonZeroUsize::MIN)` is defensive; the constant is 32.
            NonZeroUsize::new(ORIGINAL_IMAGE_ESTIMATE_CACHE_SIZE).unwrap_or(NonZeroUsize::MIN),
        )
    });
|
||||
|
||||
/// Estimate how many bytes of this item the model actually "sees".
///
/// Encrypted reasoning and compaction payloads are opaque base64 blobs, so
/// they are costed via `estimate_reasoning_length` on the encoded length.
/// Every other item is costed as its JSON-serialized length, with inline
/// base64 image payloads swapped for a per-image estimate.
pub(crate) fn estimate_response_item_model_visible_bytes(item: &ResponseItem) -> i64 {
    match item {
        ResponseItem::Reasoning {
            encrypted_content: Some(content),
            ..
        }
        | ResponseItem::Compaction {
            encrypted_content: content,
        } => i64::try_from(estimate_reasoning_length(content.len())).unwrap_or(i64::MAX),
        item => {
            // Serialization failure falls back to 0 (`unwrap_or_default`),
            // i.e. the item is costed as free rather than surfacing an error.
            let raw = serde_json::to_string(item)
                .map(|serialized| i64::try_from(serialized.len()).unwrap_or(i64::MAX))
                .unwrap_or_default();
            let (payload_bytes, replacement_bytes) = image_data_url_estimate_adjustment(item);
            if payload_bytes == 0 || replacement_bytes == 0 {
                raw
            } else {
                // Replace raw base64 payload bytes with a per-image estimate.
                // We intentionally preserve the data URL prefix and JSON
                // wrapper bytes already included in `raw`.
                raw.saturating_sub(payload_bytes)
                    .saturating_add(replacement_bytes)
            }
        }
    }
}
|
||||
|
||||
/// Returns the base64 payload of `url` when it is an inline image data URL
/// eligible for token-estimation discounting, i.e. a (case-insensitive)
/// `data:image/...;base64,...` URL.
///
/// Anything else yields `None`, so the caller keeps the raw serialized size.
fn parse_base64_image_data_url(url: &str) -> Option<&str> {
    // Cheap scheme test; `get` avoids panicking on inputs shorter than "data:".
    let scheme = url.get(.."data:".len())?;
    if !scheme.eq_ignore_ascii_case("data:") {
        return None;
    }
    // Split `data:<metadata>,<payload>` at the first comma.
    let (metadata, payload) = url.split_once(',')?;
    // Inspect the media type and its parameters without decoding anything,
    // keeping the estimator cheap while ensuring the fixed-cost image
    // heuristic only applies to image-typed base64 data URLs.
    let mut parameters = metadata["data:".len()..].split(';');
    let media_type = parameters.next().unwrap_or_default();
    let is_image = media_type
        .get(.."image/".len())
        .is_some_and(|prefix| prefix.eq_ignore_ascii_case("image/"));
    let is_base64 = parameters.any(|part| part.eq_ignore_ascii_case("base64"));
    (is_image && is_base64).then_some(payload)
}
|
||||
|
||||
/// Byte estimate for an image sent with `detail: "original"`, derived from a
/// 32px-patch count of the decoded image dimensions (cached by SHA-1 of the
/// data URL).
///
/// Returns `None` (also cached) when the URL is not a base64 image data URL
/// or its payload fails to decode; the caller then falls back to the fixed
/// resized estimate.
fn estimate_original_image_bytes(image_url: &str) -> Option<i64> {
    // Key the cache on a digest rather than the (potentially megabytes-long)
    // URL itself.
    let key = sha1_digest(image_url.as_bytes());
    ORIGINAL_IMAGE_ESTIMATE_CACHE.get_or_insert_with(key, || {
        let payload = match parse_base64_image_data_url(image_url) {
            Some(payload) => payload,
            None => {
                tracing::trace!("skipping original-detail estimate for non-base64 image data URL");
                return None;
            }
        };
        let bytes = match BASE64_STANDARD.decode(payload) {
            Ok(bytes) => bytes,
            Err(error) => {
                tracing::trace!("failed to decode original-detail image payload: {error}");
                return None;
            }
        };
        let dynamic = match image::load_from_memory(&bytes) {
            Ok(dynamic) => dynamic,
            Err(error) => {
                tracing::trace!("failed to decode original-detail image bytes: {error}");
                return None;
            }
        };
        let width = i64::from(dynamic.width());
        let height = i64::from(dynamic.height());
        let patch_size = i64::from(ORIGINAL_IMAGE_PATCH_SIZE);
        // Ceiling division: partially covered patches along an axis count.
        let patches_wide = width.saturating_add(patch_size.saturating_sub(1)) / patch_size;
        let patches_high = height.saturating_add(patch_size.saturating_sub(1)) / patch_size;
        let patch_count = patches_wide.saturating_mul(patches_high);
        let patch_count = usize::try_from(patch_count).unwrap_or(usize::MAX);
        // Clamp to the documented per-image patch budget.
        let patch_count = patch_count.min(ORIGINAL_IMAGE_MAX_PATCHES);
        Some(i64::try_from(approx_bytes_for_tokens(patch_count)).unwrap_or(i64::MAX))
    })
}
|
||||
|
||||
/// Scans one response item for discount-eligible inline image data URLs and
/// returns:
/// - total base64 payload bytes to subtract from raw serialized size
/// - total replacement byte estimate for those images
///
/// Either total being 0 means "no adjustment"; the caller checks for that.
fn image_data_url_estimate_adjustment(item: &ResponseItem) -> (i64, i64) {
    let mut payload_bytes = 0i64;
    let mut replacement_bytes = 0i64;

    // Shared accumulator closure so message content and tool-output content
    // are costed identically.
    let mut accumulate = |image_url: &str, detail: Option<ImageDetail>| {
        if let Some(payload_len) = parse_base64_image_data_url(image_url).map(str::len) {
            payload_bytes =
                payload_bytes.saturating_add(i64::try_from(payload_len).unwrap_or(i64::MAX));
            replacement_bytes = replacement_bytes.saturating_add(match detail {
                // `detail: "original"` gets a measured patch-based estimate,
                // falling back to the fixed resized estimate when it cannot
                // be computed.
                Some(ImageDetail::Original) => {
                    estimate_original_image_bytes(image_url).unwrap_or(RESIZED_IMAGE_BYTES_ESTIMATE)
                }
                _ => RESIZED_IMAGE_BYTES_ESTIMATE,
            });
        }
    };

    match item {
        ResponseItem::Message { content, .. } => {
            for content_item in content {
                if let ContentItem::InputImage { image_url, detail } = content_item {
                    accumulate(image_url, *detail);
                }
            }
        }
        ResponseItem::FunctionCallOutput { output, .. }
        | ResponseItem::CustomToolCallOutput { output, .. } => {
            if let FunctionCallOutputBody::ContentItems(items) = &output.body {
                for content_item in items {
                    if let FunctionCallOutputContentItem::InputImage { image_url, detail } =
                        content_item
                    {
                        accumulate(image_url, *detail);
                    }
                }
            }
        }
        // Other item kinds carry no inline images; no adjustment.
        _ => {}
    }

    (payload_bytes, replacement_bytes)
}
|
||||
|
||||
/// True for items authored by the model itself: assistant messages, reasoning,
/// compaction records, and the tool/shell/search/image calls it issues.
/// Tool *outputs* and non-assistant messages are not model-generated.
fn is_model_generated_item(item: &ResponseItem) -> bool {
    match item {
        ResponseItem::Message { role, .. } => role == "assistant",
        // Calls (and compaction records) originate from the model...
        ResponseItem::Reasoning { .. }
        | ResponseItem::FunctionCall { .. }
        | ResponseItem::ToolSearchCall { .. }
        | ResponseItem::WebSearchCall { .. }
        | ResponseItem::ImageGenerationCall { .. }
        | ResponseItem::CustomToolCall { .. }
        | ResponseItem::LocalShellCall { .. }
        | ResponseItem::Compaction { .. } => true,
        // ...while their outputs are produced outside the model. The match is
        // deliberately exhaustive so new variants force a decision here.
        ResponseItem::FunctionCallOutput { .. }
        | ResponseItem::ToolSearchOutput { .. }
        | ResponseItem::CustomToolCallOutput { .. }
        | ResponseItem::Other => false,
    }
}
|
||||
|
||||
pub(crate) fn is_codex_generated_item(item: &ResponseItem) -> bool {
|
||||
matches!(
|
||||
item,
|
||||
ResponseItem::FunctionCallOutput { .. }
|
||||
| ResponseItem::ToolSearchOutput { .. }
|
||||
| ResponseItem::CustomToolCallOutput { .. }
|
||||
) || matches!(item, ResponseItem::Message { role, .. } if role == "developer")
|
||||
}
|
||||
|
||||
pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool {
|
||||
let ResponseItem::Message { role, content, .. } = item else {
|
||||
return false;
|
||||
};
|
||||
|
||||
(role == "user" && !is_contextual_user_message_content(content))
|
||||
|| (role == "assistant" && is_inter_agent_instruction_content(content))
|
||||
}
|
||||
|
||||
/// True when `content` carries an inter-agent instruction payload, as
/// recognized by `InterAgentCommunication::is_message_content`.
fn is_inter_agent_instruction_content(content: &[ContentItem]) -> bool {
    InterAgentCommunication::is_message_content(content)
}
|
||||
|
||||
fn user_message_positions(items: &[ResponseItem]) -> Vec<usize> {
|
||||
let mut positions = Vec::new();
|
||||
for (idx, item) in items.iter().enumerate() {
|
||||
if is_user_turn_boundary(item) {
|
||||
positions.push(idx);
|
||||
}
|
||||
}
|
||||
positions
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "history_tests.rs"]
|
||||
mod tests;
|
||||
@@ -1,9 +1,5 @@
|
||||
mod history;
|
||||
mod normalize;
|
||||
pub(crate) mod updates;
|
||||
|
||||
pub(crate) use history::ContextManager;
|
||||
pub(crate) use history::TotalTokenUsageBreakdown;
|
||||
pub(crate) use history::estimate_response_item_model_visible_bytes;
|
||||
pub(crate) use history::is_codex_generated_item;
|
||||
pub(crate) use history::is_user_turn_boundary;
|
||||
pub(crate) use codex_journal::history::estimate_response_item_model_visible_bytes;
|
||||
pub(crate) use codex_journal::history::is_codex_generated_item;
|
||||
pub(crate) use codex_journal::history::is_user_turn_boundary;
|
||||
|
||||
@@ -12,17 +12,43 @@ use crate::session::turn_context::TurnContext;
|
||||
use crate::shell::Shell;
|
||||
use codex_execpolicy::Policy;
|
||||
use codex_features::Feature;
|
||||
use codex_journal::Journal;
|
||||
use codex_journal::JournalEntry;
|
||||
use codex_journal::KeyFilter;
|
||||
use codex_journal::MetadataEntryBuilder;
|
||||
use codex_journal::PromptMessage;
|
||||
use codex_journal::PromptRenderer;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
|
||||
const PROMPT_BUNDLE_KEY_PREFIX: &str = "prompt";
|
||||
pub(crate) const DEVELOPER_BUNDLE: &str = "developer";
|
||||
pub(crate) const USAGE_HINT_BUNDLE: &str = "usage_hint";
|
||||
pub(crate) const CONTEXTUAL_USER_BUNDLE: &str = "contextual_user";
|
||||
pub(crate) const GUARDIAN_BUNDLE: &str = "guardian";
|
||||
|
||||
pub(crate) fn context_prompt_renderer() -> PromptRenderer {
|
||||
PromptRenderer::new()
|
||||
.group(KeyFilter::prefix([
|
||||
PROMPT_BUNDLE_KEY_PREFIX,
|
||||
DEVELOPER_BUNDLE,
|
||||
]))
|
||||
.group(KeyFilter::prefix([
|
||||
PROMPT_BUNDLE_KEY_PREFIX,
|
||||
USAGE_HINT_BUNDLE,
|
||||
]))
|
||||
.group(KeyFilter::prefix([
|
||||
PROMPT_BUNDLE_KEY_PREFIX,
|
||||
CONTEXTUAL_USER_BUNDLE,
|
||||
]))
|
||||
}
|
||||
|
||||
fn build_environment_update_item(
|
||||
previous: Option<&TurnContextItem>,
|
||||
next: &TurnContext,
|
||||
shell: &Shell,
|
||||
) -> Option<ResponseItem> {
|
||||
) -> Option<String> {
|
||||
if !next.config.include_environment_context {
|
||||
return None;
|
||||
}
|
||||
@@ -34,9 +60,7 @@ fn build_environment_update_item(
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(ContextualUserFragment::into(
|
||||
EnvironmentContext::diff_from_turn_context_item(prev, &next_context),
|
||||
))
|
||||
Some(EnvironmentContext::diff_from_turn_context_item(prev, &next_context).render())
|
||||
}
|
||||
|
||||
fn build_permissions_update_item(
|
||||
@@ -175,64 +199,105 @@ pub(crate) fn build_model_instructions_update_item(
|
||||
Some(ModelSwitchInstructions::new(model_instructions).render())
|
||||
}
|
||||
|
||||
pub(crate) fn build_developer_update_item(text_sections: Vec<String>) -> Option<ResponseItem> {
|
||||
build_text_message("developer", text_sections)
|
||||
pub(crate) fn context_entry(
|
||||
bundle: &str,
|
||||
name: &str,
|
||||
prompt_order: i64,
|
||||
message: PromptMessage,
|
||||
) -> Option<JournalEntry> {
|
||||
context_entry_builder(bundle, name, message)
|
||||
.prompt_order(prompt_order)
|
||||
.build()
|
||||
}
|
||||
|
||||
pub(crate) fn build_contextual_user_message(text_sections: Vec<String>) -> Option<ResponseItem> {
|
||||
build_text_message("user", text_sections)
|
||||
fn context_entry_builder(bundle: &str, name: &str, message: PromptMessage) -> MetadataEntryBuilder {
|
||||
Journal::metadata_entry_builder([PROMPT_BUNDLE_KEY_PREFIX, bundle, name], message)
|
||||
}
|
||||
|
||||
fn build_text_message(role: &str, text_sections: Vec<String>) -> Option<ResponseItem> {
|
||||
if text_sections.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let content = text_sections
|
||||
.into_iter()
|
||||
.map(|text| ContentItem::InputText { text })
|
||||
.collect();
|
||||
|
||||
Some(ResponseItem::Message {
|
||||
id: None,
|
||||
role: role.to_string(),
|
||||
content,
|
||||
phase: None,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn build_settings_update_items(
|
||||
pub(crate) fn build_settings_update_entries(
|
||||
previous: Option<&TurnContextItem>,
|
||||
previous_turn_settings: Option<&PreviousTurnSettings>,
|
||||
next: &TurnContext,
|
||||
shell: &Shell,
|
||||
exec_policy: &Policy,
|
||||
personality_feature_enabled: bool,
|
||||
) -> Vec<ResponseItem> {
|
||||
) -> Vec<JournalEntry> {
|
||||
// TODO(ccunningham): build_settings_update_items still does not cover every
|
||||
// model-visible item emitted by build_initial_context. Persist the remaining
|
||||
// inputs or add explicit replay events so fork/resume can diff everything
|
||||
// deterministically.
|
||||
let contextual_user_message = build_environment_update_item(previous, next, shell);
|
||||
let developer_update_sections = [
|
||||
// Keep model-switch instructions first so model-specific guidance is read before
|
||||
// any other context diffs on this turn.
|
||||
build_model_instructions_update_item(previous_turn_settings, next),
|
||||
build_permissions_update_item(previous, next, exec_policy),
|
||||
build_collaboration_mode_update_item(previous, next),
|
||||
build_realtime_update_item(previous, previous_turn_settings, next),
|
||||
build_personality_update_item(previous, next, personality_feature_enabled),
|
||||
]
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.collect();
|
||||
let mut entries = Vec::with_capacity(6);
|
||||
|
||||
let mut items = Vec::with_capacity(2);
|
||||
if let Some(developer_message) = build_developer_update_item(developer_update_sections) {
|
||||
items.push(developer_message);
|
||||
if let Some(item) =
|
||||
build_model_instructions_update_item(previous_turn_settings, next).and_then(|text| {
|
||||
// Keep model-switch instructions first so model-specific guidance is read before
|
||||
// any other context diffs on this turn.
|
||||
context_entry(
|
||||
DEVELOPER_BUNDLE,
|
||||
"model_switch",
|
||||
10,
|
||||
PromptMessage::developer_text(text),
|
||||
)
|
||||
})
|
||||
{
|
||||
entries.push(item);
|
||||
}
|
||||
if let Some(contextual_user_message) = contextual_user_message {
|
||||
items.push(contextual_user_message);
|
||||
if let Some(item) =
|
||||
build_permissions_update_item(previous, next, exec_policy).and_then(|text| {
|
||||
context_entry(
|
||||
DEVELOPER_BUNDLE,
|
||||
"permissions",
|
||||
20,
|
||||
PromptMessage::developer_text(text),
|
||||
)
|
||||
})
|
||||
{
|
||||
entries.push(item);
|
||||
}
|
||||
items
|
||||
if let Some(item) = build_collaboration_mode_update_item(previous, next).and_then(|text| {
|
||||
context_entry(
|
||||
DEVELOPER_BUNDLE,
|
||||
"collaboration_mode",
|
||||
30,
|
||||
PromptMessage::developer_text(text),
|
||||
)
|
||||
}) {
|
||||
entries.push(item);
|
||||
}
|
||||
if let Some(item) =
|
||||
build_realtime_update_item(previous, previous_turn_settings, next).and_then(|text| {
|
||||
context_entry(
|
||||
DEVELOPER_BUNDLE,
|
||||
"realtime",
|
||||
40,
|
||||
PromptMessage::developer_text(text),
|
||||
)
|
||||
})
|
||||
{
|
||||
entries.push(item);
|
||||
}
|
||||
if let Some(item) = build_personality_update_item(previous, next, personality_feature_enabled)
|
||||
.and_then(|text| {
|
||||
context_entry(
|
||||
DEVELOPER_BUNDLE,
|
||||
"personality",
|
||||
50,
|
||||
PromptMessage::developer_text(text),
|
||||
)
|
||||
})
|
||||
{
|
||||
entries.push(item);
|
||||
}
|
||||
if let Some(item) = build_environment_update_item(previous, next, shell).and_then(|text| {
|
||||
context_entry(
|
||||
CONTEXTUAL_USER_BUNDLE,
|
||||
"environment",
|
||||
60,
|
||||
PromptMessage::user_text(text),
|
||||
)
|
||||
}) {
|
||||
entries.push(item);
|
||||
}
|
||||
|
||||
entries
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ use serde_json::Value;
|
||||
use crate::compact::content_items_to_text;
|
||||
use crate::event_mapping::is_contextual_user_message_content;
|
||||
use crate::session::session::Session;
|
||||
use crate::state::history as session_history;
|
||||
use codex_utils_output_truncation::approx_bytes_for_tokens;
|
||||
use codex_utils_output_truncation::approx_token_count;
|
||||
use codex_utils_output_truncation::approx_tokens_from_byte_count;
|
||||
@@ -93,9 +94,10 @@ pub(crate) async fn build_guardian_prompt_items(
|
||||
mode: GuardianPromptMode,
|
||||
) -> serde_json::Result<GuardianPromptItems> {
|
||||
let history = session.clone_history().await;
|
||||
let transcript_entries = collect_guardian_transcript_entries(history.raw_items());
|
||||
let history_items = session_history::raw_items(&history);
|
||||
let transcript_entries = collect_guardian_transcript_entries(&history_items);
|
||||
let transcript_cursor = GuardianTranscriptCursor {
|
||||
parent_history_version: history.history_version(),
|
||||
parent_history_version: session.history_version().await,
|
||||
transcript_entry_count: transcript_entries.len(),
|
||||
};
|
||||
let planned_action_json = format_guardian_action_pretty(&request)?;
|
||||
|
||||
@@ -18,6 +18,7 @@ use crate::config::Config;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn::build_prompt;
|
||||
use crate::session::turn::built_tools;
|
||||
use crate::state::history as session_history;
|
||||
use crate::thread_manager::ThreadManager;
|
||||
|
||||
/// Build the model-visible `input` list for a single debug turn.
|
||||
@@ -73,10 +74,9 @@ pub(crate) async fn build_prompt_input_from_session(
|
||||
.await;
|
||||
}
|
||||
|
||||
let prompt_input = sess
|
||||
.clone_history()
|
||||
.await
|
||||
.for_prompt(&turn_context.model_info.input_modalities);
|
||||
let history = sess.clone_history().await;
|
||||
let prompt_input =
|
||||
session_history::for_prompt(&history, &turn_context.model_info.input_modalities);
|
||||
let router = built_tools(
|
||||
sess,
|
||||
turn_context.as_ref(),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::compact::content_items_to_text;
|
||||
use crate::event_mapping::is_contextual_user_message_content;
|
||||
use crate::session::session::Session;
|
||||
use crate::state::history as session_history;
|
||||
use chrono::Utc;
|
||||
use codex_exec_server::LOCAL_FS;
|
||||
use codex_git_utils::resolve_root_git_project_for_trust;
|
||||
@@ -62,7 +63,8 @@ pub(crate) async fn build_realtime_startup_context(
|
||||
let config = sess.get_config().await;
|
||||
let cwd = config.cwd.clone();
|
||||
let history = sess.clone_history().await;
|
||||
let current_thread_section = build_current_thread_section(history.raw_items());
|
||||
let history_items = session_history::raw_items(&history);
|
||||
let current_thread_section = build_current_thread_section(&history_items);
|
||||
let recent_threads = load_recent_threads(sess).await;
|
||||
let recent_work_section = build_recent_work_section(&cwd, &recent_threads).await;
|
||||
let workspace_section = build_workspace_section_with_user_root(&cwd, home_dir()).await;
|
||||
|
||||
@@ -19,6 +19,7 @@ use crate::realtime_context::truncate_realtime_text_to_token_budget;
|
||||
use crate::realtime_conversation::REALTIME_USER_TEXT_PREFIX;
|
||||
use crate::realtime_conversation::prefix_realtime_v2_text;
|
||||
use crate::session::spawn_review_thread;
|
||||
use crate::state::history as session_history;
|
||||
use codex_config::CloudRequirementsLoader;
|
||||
use codex_config::LoaderOverrides;
|
||||
use codex_config::loader::load_config_layers_state;
|
||||
@@ -891,8 +892,8 @@ pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
|
||||
sess.guardian_review_session.shutdown().await;
|
||||
info!("Shutting down Codex instance");
|
||||
let history = sess.clone_history().await;
|
||||
let turn_count = history
|
||||
.raw_items()
|
||||
let history_items = session_history::raw_items(&history);
|
||||
let turn_count = history_items
|
||||
.iter()
|
||||
.filter(|item| is_user_turn_boundary(item))
|
||||
.count();
|
||||
|
||||
@@ -59,6 +59,7 @@ use codex_features::Feature;
|
||||
use codex_features::unstable_features_warning_event;
|
||||
use codex_hooks::Hooks;
|
||||
use codex_hooks::HooksConfig;
|
||||
use codex_journal::PromptMessage;
|
||||
use codex_login::AuthManager;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_login::auth_env_telemetry::collect_auth_env_telemetry;
|
||||
@@ -171,8 +172,7 @@ use crate::config::Constrained;
|
||||
use crate::config::ConstraintResult;
|
||||
use crate::config::StartedNetworkProxy;
|
||||
use crate::config::resolve_web_search_mode_for_turn;
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::context_manager::TotalTokenUsageBreakdown;
|
||||
use crate::state::TotalTokenUsageBreakdown;
|
||||
use crate::thread_rollout_truncation::initial_history_has_prior_user_turns;
|
||||
use codex_config::CONFIG_TOML_FILE;
|
||||
use codex_config::types::McpServerConfig;
|
||||
@@ -1084,7 +1084,7 @@ impl Session {
|
||||
|
||||
pub(crate) async fn get_total_token_usage_breakdown(&self) -> TotalTokenUsageBreakdown {
|
||||
let state = self.state.lock().await;
|
||||
state.history.get_total_token_usage_breakdown()
|
||||
state.get_total_token_usage_breakdown()
|
||||
}
|
||||
|
||||
pub(crate) async fn total_token_usage(&self) -> Option<TokenUsage> {
|
||||
@@ -1108,7 +1108,7 @@ impl Session {
|
||||
turn_context: &TurnContext,
|
||||
) -> Option<i64> {
|
||||
let state = self.state.lock().await;
|
||||
state.history.estimate_token_count(turn_context)
|
||||
state.estimate_token_count(turn_context)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_base_instructions(&self) -> BaseInstructions {
|
||||
@@ -1430,14 +1430,20 @@ impl Session {
|
||||
};
|
||||
let shell = self.user_shell();
|
||||
let exec_policy = self.services.exec_policy.current();
|
||||
crate::context_manager::updates::build_settings_update_items(
|
||||
let entries = crate::context_manager::updates::build_settings_update_entries(
|
||||
reference_context_item,
|
||||
previous_turn_settings.as_ref(),
|
||||
current_context,
|
||||
shell.as_ref(),
|
||||
exec_policy.as_ref(),
|
||||
self.features.enabled(Feature::Personality),
|
||||
)
|
||||
);
|
||||
let resolved = match codex_journal::Journal::from_entries(entries).resolve() {
|
||||
Ok(resolved) => resolved,
|
||||
Err(error) => unreachable!("settings update entries should resolve: {error}"),
|
||||
};
|
||||
crate::context_manager::updates::context_prompt_renderer()
|
||||
.render_metadata(resolved.metadata())
|
||||
}
|
||||
|
||||
/// Persist the event to rollout and send it to clients.
|
||||
@@ -2499,16 +2505,31 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP app context rendering reads through the session-owned manager guard"
|
||||
)]
|
||||
pub(crate) async fn build_initial_context(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
) -> Vec<ResponseItem> {
|
||||
let mut developer_sections = Vec::<String>::with_capacity(8);
|
||||
let mut contextual_user_sections = Vec::<String>::with_capacity(2);
|
||||
let resolved = match codex_journal::Journal::from_entries(
|
||||
self.build_initial_context_entries(turn_context).await,
|
||||
)
|
||||
.resolve()
|
||||
{
|
||||
Ok(resolved) => resolved,
|
||||
Err(error) => unreachable!("initial context entries should resolve: {error}"),
|
||||
};
|
||||
crate::context_manager::updates::context_prompt_renderer()
|
||||
.render_metadata(resolved.metadata())
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::await_holding_invalid_type,
|
||||
reason = "MCP app context rendering reads through the session-owned manager guard"
|
||||
)]
|
||||
async fn build_initial_context_entries(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
) -> Vec<codex_journal::JournalEntry> {
|
||||
let mut entries = Vec::<codex_journal::JournalEntry>::with_capacity(12);
|
||||
let shell = self.user_shell();
|
||||
let (
|
||||
reference_context_item,
|
||||
@@ -2531,26 +2552,38 @@ impl Session {
|
||||
previous_turn_settings.as_ref(),
|
||||
turn_context,
|
||||
)
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"model_switch",
|
||||
10,
|
||||
PromptMessage::developer_text(model_switch_message),
|
||||
)
|
||||
{
|
||||
developer_sections.push(model_switch_message);
|
||||
entries.push(entry);
|
||||
}
|
||||
if turn_context.config.include_permissions_instructions {
|
||||
developer_sections.push(
|
||||
PermissionsInstructions::from_permission_profile(
|
||||
&turn_context.permission_profile,
|
||||
turn_context.approval_policy.value(),
|
||||
turn_context.config.approvals_reviewer,
|
||||
self.services.exec_policy.current().as_ref(),
|
||||
&turn_context.cwd,
|
||||
turn_context
|
||||
.features
|
||||
.enabled(Feature::ExecPermissionApprovals),
|
||||
turn_context
|
||||
.features
|
||||
.enabled(Feature::RequestPermissionsTool),
|
||||
)
|
||||
.render(),
|
||||
);
|
||||
let permissions_instructions = PermissionsInstructions::from_permission_profile(
|
||||
&turn_context.permission_profile,
|
||||
turn_context.approval_policy.value(),
|
||||
turn_context.config.approvals_reviewer,
|
||||
self.services.exec_policy.current().as_ref(),
|
||||
&turn_context.cwd,
|
||||
turn_context
|
||||
.features
|
||||
.enabled(Feature::ExecPermissionApprovals),
|
||||
turn_context
|
||||
.features
|
||||
.enabled(Feature::RequestPermissionsTool),
|
||||
)
|
||||
.render();
|
||||
if let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"permissions",
|
||||
20,
|
||||
PromptMessage::developer_text(permissions_instructions),
|
||||
) {
|
||||
entries.push(entry);
|
||||
}
|
||||
}
|
||||
let separate_guardian_developer_message =
|
||||
crate::guardian::is_guardian_reviewer_source(&session_source);
|
||||
@@ -2559,29 +2592,52 @@ impl Session {
|
||||
if !separate_guardian_developer_message
|
||||
&& let Some(developer_instructions) = turn_context.developer_instructions.as_deref()
|
||||
&& !developer_instructions.is_empty()
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"developer_instructions",
|
||||
30,
|
||||
PromptMessage::developer_text(developer_instructions.to_string()),
|
||||
)
|
||||
{
|
||||
developer_sections.push(developer_instructions.to_string());
|
||||
entries.push(entry);
|
||||
}
|
||||
// Add developer instructions for memories.
|
||||
if turn_context.features.enabled(Feature::MemoryTool)
|
||||
&& turn_context.config.memories.use_memories
|
||||
&& let Some(memory_prompt) =
|
||||
build_memory_tool_developer_instructions(&turn_context.config.codex_home).await
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"memory_tool",
|
||||
40,
|
||||
PromptMessage::developer_text(memory_prompt),
|
||||
)
|
||||
{
|
||||
developer_sections.push(memory_prompt);
|
||||
entries.push(entry);
|
||||
}
|
||||
// Add developer instructions from collaboration_mode if they exist and are non-empty
|
||||
if let Some(collab_instructions) =
|
||||
CollaborationModeInstructions::from_collaboration_mode(&collaboration_mode)
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"collaboration_mode",
|
||||
50,
|
||||
PromptMessage::developer_text(collab_instructions.render()),
|
||||
)
|
||||
{
|
||||
developer_sections.push(collab_instructions.render());
|
||||
entries.push(entry);
|
||||
}
|
||||
if let Some(realtime_update) = crate::context_manager::updates::build_initial_realtime_item(
|
||||
reference_context_item.as_ref(),
|
||||
previous_turn_settings.as_ref(),
|
||||
turn_context,
|
||||
) && let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"realtime",
|
||||
60,
|
||||
PromptMessage::developer_text(realtime_update),
|
||||
) {
|
||||
developer_sections.push(realtime_update);
|
||||
entries.push(entry);
|
||||
}
|
||||
if self.features.enabled(Feature::Personality)
|
||||
&& let Some(personality) = turn_context.personality
|
||||
@@ -2595,9 +2651,16 @@ impl Session {
|
||||
&model_info,
|
||||
personality,
|
||||
)
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"personality",
|
||||
70,
|
||||
PromptMessage::developer_text(
|
||||
PersonalitySpecInstructions::new(personality_message).render(),
|
||||
),
|
||||
)
|
||||
{
|
||||
developer_sections
|
||||
.push(PersonalitySpecInstructions::new(personality_message).render());
|
||||
entries.push(entry);
|
||||
}
|
||||
}
|
||||
if turn_context.config.include_apps_instructions && turn_context.apps_enabled() {
|
||||
@@ -2610,8 +2673,14 @@ impl Session {
|
||||
.await;
|
||||
if let Some(apps_instructions) =
|
||||
AppsInstructions::from_connectors(&accessible_and_enabled_connectors)
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"apps",
|
||||
80,
|
||||
PromptMessage::developer_text(apps_instructions.render()),
|
||||
)
|
||||
{
|
||||
developer_sections.push(apps_instructions.render());
|
||||
entries.push(entry);
|
||||
}
|
||||
}
|
||||
if turn_context.config.include_skill_instructions {
|
||||
@@ -2634,7 +2703,14 @@ impl Session {
|
||||
})
|
||||
.await;
|
||||
}
|
||||
developer_sections.push(skills_instructions.render());
|
||||
if let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"skills",
|
||||
90,
|
||||
PromptMessage::developer_text(skills_instructions.render()),
|
||||
) {
|
||||
entries.push(entry);
|
||||
}
|
||||
}
|
||||
}
|
||||
let loaded_plugins = self
|
||||
@@ -2644,24 +2720,43 @@ impl Session {
|
||||
.await;
|
||||
if let Some(plugin_instructions) =
|
||||
AvailablePluginsInstructions::from_plugins(loaded_plugins.capability_summaries())
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"plugins",
|
||||
100,
|
||||
PromptMessage::developer_text(plugin_instructions.render()),
|
||||
)
|
||||
{
|
||||
developer_sections.push(plugin_instructions.render());
|
||||
entries.push(entry);
|
||||
}
|
||||
if turn_context.features.enabled(Feature::CodexGitCommit)
|
||||
&& let Some(commit_message_instruction) = commit_message_trailer_instruction(
|
||||
turn_context.config.commit_attribution.as_deref(),
|
||||
)
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::DEVELOPER_BUNDLE,
|
||||
"commit_message_trailer",
|
||||
110,
|
||||
PromptMessage::developer_text(commit_message_instruction),
|
||||
)
|
||||
{
|
||||
developer_sections.push(commit_message_instruction);
|
||||
entries.push(entry);
|
||||
}
|
||||
if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
|
||||
contextual_user_sections.push(
|
||||
UserInstructions {
|
||||
text: user_instructions.to_string(),
|
||||
directory: turn_context.cwd.to_string_lossy().into_owned(),
|
||||
}
|
||||
.render(),
|
||||
);
|
||||
if let Some(user_instructions) = turn_context.user_instructions.as_deref()
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::CONTEXTUAL_USER_BUNDLE,
|
||||
"user_instructions",
|
||||
130,
|
||||
PromptMessage::user_text(
|
||||
UserInstructions {
|
||||
text: user_instructions.to_string(),
|
||||
directory: turn_context.cwd.to_string_lossy().into_owned(),
|
||||
}
|
||||
.render(),
|
||||
),
|
||||
)
|
||||
{
|
||||
entries.push(entry);
|
||||
}
|
||||
if turn_context.config.include_environment_context {
|
||||
let subagents = self
|
||||
@@ -2669,48 +2764,51 @@ impl Session {
|
||||
.agent_control
|
||||
.format_environment_context_subagents(self.conversation_id)
|
||||
.await;
|
||||
contextual_user_sections.push(
|
||||
crate::context::EnvironmentContext::from_turn_context(turn_context, shell.as_ref())
|
||||
if let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::CONTEXTUAL_USER_BUNDLE,
|
||||
"environment",
|
||||
140,
|
||||
PromptMessage::user_text(
|
||||
crate::context::EnvironmentContext::from_turn_context(
|
||||
turn_context,
|
||||
shell.as_ref(),
|
||||
)
|
||||
.with_subagents(subagents)
|
||||
.render(),
|
||||
);
|
||||
),
|
||||
) {
|
||||
entries.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
let multi_agent_v2_usage_hint_text =
|
||||
multi_agents::usage_hint_text(turn_context, &session_source);
|
||||
|
||||
let mut items = Vec::with_capacity(4);
|
||||
if let Some(developer_message) =
|
||||
crate::context_manager::updates::build_developer_update_item(developer_sections)
|
||||
{
|
||||
items.push(developer_message);
|
||||
}
|
||||
if let Some(usage_hint_text) = multi_agent_v2_usage_hint_text
|
||||
&& let Some(usage_hint_message) =
|
||||
crate::context_manager::updates::build_developer_update_item(vec![
|
||||
usage_hint_text.to_string(),
|
||||
])
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::USAGE_HINT_BUNDLE,
|
||||
"multi_agent_v2",
|
||||
120,
|
||||
PromptMessage::developer_text(usage_hint_text.to_string()),
|
||||
)
|
||||
{
|
||||
items.push(usage_hint_message);
|
||||
}
|
||||
if let Some(contextual_user_message) =
|
||||
crate::context_manager::updates::build_contextual_user_message(contextual_user_sections)
|
||||
{
|
||||
items.push(contextual_user_message);
|
||||
entries.push(entry);
|
||||
}
|
||||
// Emit the guardian policy prompt as a separate developer item so the guardian
|
||||
// subagent sees a distinct, easy-to-audit instruction block.
|
||||
if separate_guardian_developer_message
|
||||
&& let Some(developer_instructions) = turn_context.developer_instructions.as_deref()
|
||||
&& !developer_instructions.is_empty()
|
||||
&& let Some(guardian_developer_message) =
|
||||
crate::context_manager::updates::build_developer_update_item(vec![
|
||||
developer_instructions.to_string(),
|
||||
])
|
||||
&& let Some(entry) = crate::context_manager::updates::context_entry(
|
||||
crate::context_manager::updates::GUARDIAN_BUNDLE,
|
||||
"developer_instructions",
|
||||
150,
|
||||
PromptMessage::developer_text(developer_instructions.to_string()),
|
||||
)
|
||||
{
|
||||
items.push(guardian_developer_message);
|
||||
entries.push(entry);
|
||||
}
|
||||
items
|
||||
entries
|
||||
}
|
||||
|
||||
pub(crate) async fn persist_rollout_items(&self, items: &[RolloutItem]) {
|
||||
@@ -2721,11 +2819,16 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn clone_history(&self) -> ContextManager {
|
||||
pub(crate) async fn clone_history(&self) -> codex_journal::Journal {
|
||||
let state = self.state.lock().await;
|
||||
state.clone_history()
|
||||
}
|
||||
|
||||
pub(crate) async fn history_version(&self) -> u64 {
|
||||
let state = self.state.lock().await;
|
||||
state.history_version()
|
||||
}
|
||||
|
||||
pub(crate) async fn reference_context_item(&self) -> Option<TurnContextItem> {
|
||||
let state = self.state.lock().await;
|
||||
state.reference_context_item()
|
||||
@@ -2792,7 +2895,10 @@ impl Session {
|
||||
let history = self.clone_history().await;
|
||||
let base_instructions = self.get_base_instructions().await;
|
||||
let Some(estimated_total_tokens) =
|
||||
history.estimate_token_count_with_base_instructions(&base_instructions)
|
||||
crate::state::history::estimate_token_count_with_base_instructions(
|
||||
&history,
|
||||
&base_instructions,
|
||||
)
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use super::*;
|
||||
use crate::context_manager::is_user_turn_boundary;
|
||||
use crate::state::history as session_history;
|
||||
use codex_journal::Journal;
|
||||
|
||||
// Return value of `Session::reconstruct_history_from_rollout`, bundling the rebuilt history with
|
||||
// the resume/fork hydration metadata derived from the same replay.
|
||||
@@ -231,10 +233,11 @@ impl Session {
|
||||
);
|
||||
}
|
||||
|
||||
let mut history = ContextManager::new();
|
||||
let mut history = Journal::new();
|
||||
let mut rebuilt_history_reference_context_item = None;
|
||||
let mut saw_legacy_compaction_without_replacement_history = false;
|
||||
if let Some(base_replacement_history) = base_replacement_history {
|
||||
history.replace(base_replacement_history.to_vec());
|
||||
session_history::replace_history(&mut history, base_replacement_history.to_vec());
|
||||
}
|
||||
// Materialize exact history semantics from the replay-derived suffix. The eventual lazy
|
||||
// design should keep this same replay shape, but drive it from a resumable reverse source
|
||||
@@ -242,7 +245,8 @@ impl Session {
|
||||
for item in rollout_suffix {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(response_item) => {
|
||||
history.record_items(
|
||||
session_history::record_items(
|
||||
&mut history,
|
||||
std::iter::once(response_item),
|
||||
turn_context.truncation_policy,
|
||||
);
|
||||
@@ -251,7 +255,7 @@ impl Session {
|
||||
if let Some(replacement_history) = &compacted.replacement_history {
|
||||
// This should actually never happen, because the reverse loop above (to build rollout_suffix)
|
||||
// should stop before any compaction that has Some replacement_history
|
||||
history.replace(replacement_history.clone());
|
||||
session_history::replace_history(&mut history, replacement_history.clone());
|
||||
} else {
|
||||
saw_legacy_compaction_without_replacement_history = true;
|
||||
// Legacy rollouts without `replacement_history` should rebuild the
|
||||
@@ -262,17 +266,22 @@ impl Session {
|
||||
// prompt shape.
|
||||
// TODO(ccunningham): if we drop support for None replacement_history compaction items,
|
||||
// we can get rid of this second loop entirely and just build `history` directly in the first loop.
|
||||
let user_messages = collect_user_messages(history.raw_items());
|
||||
let history_items = session_history::raw_items(&history);
|
||||
let user_messages = collect_user_messages(history_items.as_slice());
|
||||
let rebuilt = compact::build_compacted_history(
|
||||
Vec::new(),
|
||||
&user_messages,
|
||||
&compacted.message,
|
||||
);
|
||||
history.replace(rebuilt);
|
||||
session_history::replace_history(&mut history, rebuilt);
|
||||
}
|
||||
}
|
||||
RolloutItem::EventMsg(EventMsg::ThreadRolledBack(rollback)) => {
|
||||
history.drop_last_n_user_turns(rollback.num_turns);
|
||||
session_history::drop_last_n_user_turns(
|
||||
&mut history,
|
||||
&mut rebuilt_history_reference_context_item,
|
||||
rollback.num_turns,
|
||||
);
|
||||
}
|
||||
RolloutItem::EventMsg(_)
|
||||
| RolloutItem::TurnContext(_)
|
||||
@@ -293,7 +302,7 @@ impl Session {
|
||||
};
|
||||
|
||||
RolloutReconstruction {
|
||||
history: history.raw_items().to_vec(),
|
||||
history: session_history::raw_items(&history),
|
||||
previous_turn_settings,
|
||||
reference_context_item,
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ use crate::function_tool::FunctionCallError;
|
||||
use crate::shell::default_user_shell;
|
||||
use crate::skills::SkillRenderSideEffects;
|
||||
use crate::skills::render::SkillMetadataBudget;
|
||||
use crate::state::history as session_history;
|
||||
use crate::test_support::models_manager_with_provider;
|
||||
use crate::tools::format_exec_output_str;
|
||||
use codex_config::ConfigLayerStack;
|
||||
@@ -20,6 +21,7 @@ use codex_config::RequirementSource;
|
||||
use codex_config::Sourced;
|
||||
use codex_config::loader::project_trust_key;
|
||||
use codex_config::types::ToolSuggestDisabledTool;
|
||||
use codex_journal::Journal;
|
||||
|
||||
use codex_features::Feature;
|
||||
use codex_features::Features;
|
||||
@@ -78,6 +80,63 @@ use codex_execpolicy::Policy;
|
||||
use codex_network_proxy::NetworkProxyConfig;
|
||||
use codex_otel::MetricsClient;
|
||||
use codex_otel::MetricsConfig;
|
||||
|
||||
trait JournalTestExt {
|
||||
fn raw_items(&self) -> Vec<ResponseItem>;
|
||||
fn for_prompt(
|
||||
&self,
|
||||
input_modalities: &[codex_protocol::openai_models::InputModality],
|
||||
) -> Vec<ResponseItem>;
|
||||
fn estimate_token_count(&self, turn_context: &TurnContext) -> Option<i64>;
|
||||
fn estimate_token_count_with_base_instructions(
|
||||
&self,
|
||||
base_instructions: &BaseInstructions,
|
||||
) -> Option<i64>;
|
||||
fn record_items<I>(
|
||||
&mut self,
|
||||
items: I,
|
||||
policy: codex_utils_output_truncation::TruncationPolicy,
|
||||
) where
|
||||
I: IntoIterator,
|
||||
I::Item: std::ops::Deref<Target = ResponseItem>;
|
||||
fn replace(&mut self, items: Vec<ResponseItem>);
|
||||
}
|
||||
|
||||
impl JournalTestExt for Journal {
|
||||
fn raw_items(&self) -> Vec<ResponseItem> {
|
||||
session_history::raw_items(self)
|
||||
}
|
||||
|
||||
fn for_prompt(
|
||||
&self,
|
||||
input_modalities: &[codex_protocol::openai_models::InputModality],
|
||||
) -> Vec<ResponseItem> {
|
||||
session_history::for_prompt(self, input_modalities)
|
||||
}
|
||||
|
||||
fn estimate_token_count(&self, turn_context: &TurnContext) -> Option<i64> {
|
||||
session_history::estimate_token_count(self, turn_context)
|
||||
}
|
||||
|
||||
fn estimate_token_count_with_base_instructions(
|
||||
&self,
|
||||
base_instructions: &BaseInstructions,
|
||||
) -> Option<i64> {
|
||||
session_history::estimate_token_count_with_base_instructions(self, base_instructions)
|
||||
}
|
||||
|
||||
fn record_items<I>(&mut self, items: I, policy: codex_utils_output_truncation::TruncationPolicy)
|
||||
where
|
||||
I: IntoIterator,
|
||||
I::Item: std::ops::Deref<Target = ResponseItem>,
|
||||
{
|
||||
session_history::record_items(self, items, policy);
|
||||
}
|
||||
|
||||
fn replace(&mut self, items: Vec<ResponseItem>) {
|
||||
session_history::replace_history(self, items);
|
||||
}
|
||||
}
|
||||
use codex_otel::THREAD_SKILLS_DESCRIPTION_TRUNCATED_CHARS_METRIC;
|
||||
use codex_otel::THREAD_SKILLS_ENABLED_TOTAL_METRIC;
|
||||
use codex_otel::THREAD_SKILLS_KEPT_TOTAL_METRIC;
|
||||
@@ -7629,7 +7688,7 @@ async fn sample_rollout(
|
||||
_turn_context: &TurnContext,
|
||||
) -> (Vec<RolloutItem>, Vec<ResponseItem>) {
|
||||
let mut rollout_items = Vec::new();
|
||||
let mut live_history = ContextManager::new();
|
||||
let mut live_history = Journal::new();
|
||||
|
||||
// Use the same turn_context source as record_initial_history so model_info (and thus
|
||||
// personality_spec) matches reconstruction.
|
||||
|
||||
@@ -42,6 +42,7 @@ use crate::resolve_skill_dependencies_for_turn;
|
||||
use crate::session::PreviousTurnSettings;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use crate::state::history as session_history;
|
||||
use crate::stream_events_utils::HandleOutputCtx;
|
||||
use crate::stream_events_utils::handle_non_tool_response_item;
|
||||
use crate::stream_events_utils::handle_output_item_done;
|
||||
@@ -428,9 +429,8 @@ pub(crate) async fn run_turn(
|
||||
|
||||
// Construct the input that we will send to the model.
|
||||
let sampling_request_input: Vec<ResponseItem> = {
|
||||
sess.clone_history()
|
||||
.await
|
||||
.for_prompt(&turn_context.model_info.input_modalities)
|
||||
let history = sess.clone_history().await;
|
||||
session_history::for_prompt(&history, &turn_context.model_info.input_modalities)
|
||||
};
|
||||
|
||||
let sampling_request_input_messages = sampling_request_input
|
||||
@@ -629,7 +629,7 @@ pub(crate) async fn run_turn(
|
||||
error_or_panic(
|
||||
"Invalid image detected; sanitizing tool output to prevent poisoning",
|
||||
);
|
||||
if state.history.replace_last_turn_images("Invalid image") {
|
||||
if state.replace_last_turn_images("Invalid image") {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
@@ -1005,9 +1005,8 @@ async fn run_sampling_request(
|
||||
let prompt_input = if let Some(input) = initial_input.take() {
|
||||
input
|
||||
} else {
|
||||
sess.clone_history()
|
||||
.await
|
||||
.for_prompt(&turn_context.model_info.input_modalities)
|
||||
let history = sess.clone_history().await;
|
||||
session_history::for_prompt(&history, &turn_context.model_info.input_modalities)
|
||||
};
|
||||
let prompt = build_prompt(
|
||||
prompt_input,
|
||||
|
||||
314
codex-rs/core/src/state/history.rs
Normal file
314
codex-rs/core/src/state/history.rs
Normal file
@@ -0,0 +1,314 @@
|
||||
use crate::event_mapping::has_non_contextual_dev_message_content;
|
||||
use crate::event_mapping::is_contextual_dev_message_content;
|
||||
use crate::event_mapping::is_contextual_user_message_content;
|
||||
use crate::session::turn_context::TurnContext;
|
||||
use codex_journal::Journal;
|
||||
use codex_journal::JournalItem;
|
||||
use codex_journal::JournalKey;
|
||||
use codex_journal::JournalTranscriptItem;
|
||||
use codex_journal::history as thread_history;
|
||||
use codex_journal::history::estimate_item_token_count;
|
||||
use codex_journal::history::estimate_response_item_model_visible_bytes;
|
||||
use codex_journal::history::is_api_message;
|
||||
use codex_journal::history::is_model_generated_item;
|
||||
use codex_journal::history::is_user_turn_boundary;
|
||||
use codex_journal::history::user_turn_boundary_positions;
|
||||
use codex_protocol::models::BaseInstructions;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::InputModality;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::approx_token_count;
|
||||
use std::ops::Deref;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Default)]
|
||||
pub(crate) struct TotalTokenUsageBreakdown {
|
||||
pub(crate) last_api_response_total_tokens: i64,
|
||||
pub(crate) all_history_items_model_visible_bytes: i64,
|
||||
pub(crate) estimated_tokens_of_items_added_since_last_successful_api_response: i64,
|
||||
pub(crate) estimated_bytes_of_items_added_since_last_successful_api_response: i64,
|
||||
}
|
||||
|
||||
pub(crate) fn record_items<I>(journal: &mut Journal, items: I, policy: TruncationPolicy)
|
||||
where
|
||||
I: IntoIterator,
|
||||
I::Item: Deref<Target = ResponseItem>,
|
||||
{
|
||||
for item in items {
|
||||
let item_ref = item.deref();
|
||||
if !is_api_message(item_ref) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let processed = thread_history::truncate_history_item(item_ref, policy);
|
||||
push_history_item(journal, processed);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn raw_items(journal: &Journal) -> Vec<ResponseItem> {
|
||||
journal
|
||||
.entries()
|
||||
.iter()
|
||||
.filter_map(|entry| match &entry.item {
|
||||
JournalItem::Transcript(item) => Some(item.item.clone()),
|
||||
JournalItem::Metadata(_) | JournalItem::Checkpoint(_) => None,
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub(crate) fn for_prompt(
|
||||
journal: &Journal,
|
||||
input_modalities: &[InputModality],
|
||||
) -> Vec<ResponseItem> {
|
||||
let mut items = raw_items(journal);
|
||||
normalize_history(&mut items, input_modalities);
|
||||
items
|
||||
}
|
||||
|
||||
pub(crate) fn estimate_token_count(journal: &Journal, turn_context: &TurnContext) -> Option<i64> {
|
||||
let model_info = &turn_context.model_info;
|
||||
let personality = turn_context.personality.or(turn_context.config.personality);
|
||||
let base_instructions = BaseInstructions {
|
||||
text: model_info.get_model_instructions(personality),
|
||||
};
|
||||
estimate_token_count_with_base_instructions(journal, &base_instructions)
|
||||
}
|
||||
|
||||
pub(crate) fn estimate_token_count_with_base_instructions(
|
||||
journal: &Journal,
|
||||
base_instructions: &BaseInstructions,
|
||||
) -> Option<i64> {
|
||||
let base_tokens =
|
||||
i64::try_from(approx_token_count(&base_instructions.text)).unwrap_or(i64::MAX);
|
||||
|
||||
let items_tokens = raw_items(journal)
|
||||
.iter()
|
||||
.map(estimate_item_token_count)
|
||||
.fold(0i64, i64::saturating_add);
|
||||
|
||||
Some(base_tokens.saturating_add(items_tokens))
|
||||
}
|
||||
|
||||
pub(crate) fn remove_first_item(journal: &mut Journal) -> bool {
|
||||
let mut items = raw_items(journal);
|
||||
if items.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let removed = items.remove(0);
|
||||
thread_history::remove_corresponding_for(&mut items, &removed);
|
||||
replace_history(journal, items);
|
||||
true
|
||||
}
|
||||
|
||||
pub(crate) fn remove_last_item(journal: &mut Journal) -> bool {
|
||||
let mut items = raw_items(journal);
|
||||
if let Some(removed) = items.pop() {
|
||||
thread_history::remove_corresponding_for(&mut items, &removed);
|
||||
replace_history(journal, items);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn replace_history(journal: &mut Journal, items: Vec<ResponseItem>) {
|
||||
*journal = journal_from_items(items);
|
||||
}
|
||||
|
||||
pub(crate) fn replace_last_turn_images(journal: &mut Journal, placeholder: &str) -> bool {
|
||||
let mut items = raw_items(journal);
|
||||
let replaced = thread_history::replace_last_turn_images(&mut items, placeholder);
|
||||
if replaced {
|
||||
replace_history(journal, items);
|
||||
}
|
||||
replaced
|
||||
}
|
||||
|
||||
pub(crate) fn drop_last_n_user_turns(
|
||||
journal: &mut Journal,
|
||||
reference_context_item: &mut Option<TurnContextItem>,
|
||||
num_turns: u32,
|
||||
) {
|
||||
if num_turns == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let snapshot = raw_items(journal);
|
||||
let user_positions = user_turn_boundary_positions(&snapshot);
|
||||
let Some(&first_instruction_turn_idx) = user_positions.first() else {
|
||||
replace_history(journal, snapshot);
|
||||
return;
|
||||
};
|
||||
|
||||
let n_from_end = usize::try_from(num_turns).unwrap_or(usize::MAX);
|
||||
let mut cut_idx = if n_from_end >= user_positions.len() {
|
||||
first_instruction_turn_idx
|
||||
} else {
|
||||
user_positions[user_positions.len() - n_from_end]
|
||||
};
|
||||
|
||||
cut_idx = trim_pre_turn_context_updates(
|
||||
reference_context_item,
|
||||
&snapshot,
|
||||
first_instruction_turn_idx,
|
||||
cut_idx,
|
||||
);
|
||||
|
||||
replace_history(journal, snapshot[..cut_idx].to_vec());
|
||||
}
|
||||
|
||||
pub(crate) fn update_token_info(
|
||||
token_info: &mut Option<TokenUsageInfo>,
|
||||
usage: &TokenUsage,
|
||||
model_context_window: Option<i64>,
|
||||
) {
|
||||
*token_info =
|
||||
TokenUsageInfo::new_or_append(token_info, &Some(usage.clone()), model_context_window);
|
||||
}
|
||||
|
||||
pub(crate) fn set_token_usage_full(token_info: &mut Option<TokenUsageInfo>, context_window: i64) {
|
||||
match token_info {
|
||||
Some(info) => info.fill_to_context_window(context_window),
|
||||
None => {
|
||||
*token_info = Some(TokenUsageInfo::full_context_window(context_window));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_total_token_usage(
|
||||
journal: &Journal,
|
||||
token_info: &Option<TokenUsageInfo>,
|
||||
server_reasoning_included: bool,
|
||||
) -> i64 {
|
||||
let items = raw_items(journal);
|
||||
let last_tokens = token_info
|
||||
.as_ref()
|
||||
.map(|info| info.last_token_usage.total_tokens)
|
||||
.unwrap_or(0);
|
||||
let items_after_last_model_generated_tokens = items_after_last_model_generated_item(&items)
|
||||
.iter()
|
||||
.map(estimate_item_token_count)
|
||||
.fold(0i64, i64::saturating_add);
|
||||
if server_reasoning_included {
|
||||
last_tokens.saturating_add(items_after_last_model_generated_tokens)
|
||||
} else {
|
||||
last_tokens
|
||||
.saturating_add(get_non_last_reasoning_items_tokens(&items))
|
||||
.saturating_add(items_after_last_model_generated_tokens)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn get_total_token_usage_breakdown(
|
||||
journal: &Journal,
|
||||
token_info: &Option<TokenUsageInfo>,
|
||||
) -> TotalTokenUsageBreakdown {
|
||||
let items = raw_items(journal);
|
||||
let last_usage = token_info
|
||||
.as_ref()
|
||||
.map(|info| info.last_token_usage.clone())
|
||||
.unwrap_or_default();
|
||||
let items_after_last_model_generated = items_after_last_model_generated_item(&items);
|
||||
|
||||
TotalTokenUsageBreakdown {
|
||||
last_api_response_total_tokens: last_usage.total_tokens,
|
||||
all_history_items_model_visible_bytes: items
|
||||
.iter()
|
||||
.map(estimate_response_item_model_visible_bytes)
|
||||
.fold(0i64, i64::saturating_add),
|
||||
estimated_tokens_of_items_added_since_last_successful_api_response:
|
||||
items_after_last_model_generated
|
||||
.iter()
|
||||
.map(estimate_item_token_count)
|
||||
.fold(0i64, i64::saturating_add),
|
||||
estimated_bytes_of_items_added_since_last_successful_api_response:
|
||||
items_after_last_model_generated
|
||||
.iter()
|
||||
.map(estimate_response_item_model_visible_bytes)
|
||||
.fold(0i64, i64::saturating_add),
|
||||
}
|
||||
}
|
||||
|
||||
fn push_history_item(journal: &mut Journal, item: ResponseItem) {
|
||||
let history_item = JournalTranscriptItem::new(item);
|
||||
let key = JournalKey::new(vec!["history".to_string(), history_item.id.clone()]);
|
||||
journal.add(key, history_item);
|
||||
}
|
||||
|
||||
fn journal_from_items(items: Vec<ResponseItem>) -> Journal {
|
||||
let mut journal = Journal::new();
|
||||
for item in items {
|
||||
if is_api_message(&item) {
|
||||
push_history_item(&mut journal, item);
|
||||
}
|
||||
}
|
||||
journal
|
||||
}
|
||||
|
||||
fn get_non_last_reasoning_items_tokens(items: &[ResponseItem]) -> i64 {
|
||||
let Some(last_user_index) = items.iter().rposition(is_user_turn_boundary) else {
|
||||
return 0;
|
||||
};
|
||||
|
||||
items
|
||||
.iter()
|
||||
.take(last_user_index)
|
||||
.filter(|item| {
|
||||
matches!(
|
||||
item,
|
||||
ResponseItem::Reasoning {
|
||||
encrypted_content: Some(_),
|
||||
..
|
||||
}
|
||||
)
|
||||
})
|
||||
.map(estimate_item_token_count)
|
||||
.fold(0i64, i64::saturating_add)
|
||||
}
|
||||
|
||||
fn items_after_last_model_generated_item(items: &[ResponseItem]) -> &[ResponseItem] {
|
||||
let start = items
|
||||
.iter()
|
||||
.rposition(is_model_generated_item)
|
||||
.map_or(items.len(), |index| index.saturating_add(1));
|
||||
&items[start..]
|
||||
}
|
||||
|
||||
fn normalize_history(items: &mut Vec<ResponseItem>, input_modalities: &[InputModality]) {
|
||||
thread_history::ensure_call_outputs_present(items);
|
||||
thread_history::remove_orphan_outputs(items);
|
||||
thread_history::strip_images_when_unsupported(input_modalities, items);
|
||||
}
|
||||
|
||||
fn trim_pre_turn_context_updates(
|
||||
reference_context_item: &mut Option<TurnContextItem>,
|
||||
snapshot: &[ResponseItem],
|
||||
first_instruction_turn_idx: usize,
|
||||
mut cut_idx: usize,
|
||||
) -> usize {
|
||||
while cut_idx > first_instruction_turn_idx {
|
||||
match &snapshot[cut_idx - 1] {
|
||||
ResponseItem::Message { role, content, .. }
|
||||
if role == "developer" && is_contextual_dev_message_content(content) =>
|
||||
{
|
||||
if has_non_contextual_dev_message_content(content) {
|
||||
*reference_context_item = None;
|
||||
}
|
||||
cut_idx -= 1;
|
||||
}
|
||||
ResponseItem::Message { role, content, .. }
|
||||
if role == "user" && is_contextual_user_message_content(content) =>
|
||||
{
|
||||
cut_idx -= 1;
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
cut_idx
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "history_tests.rs"]
|
||||
mod tests;
|
||||
@@ -1,6 +1,9 @@
|
||||
use super::*;
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use codex_journal::Journal;
|
||||
use codex_journal::history::ORIGINAL_IMAGE_MAX_PATCHES;
|
||||
use codex_journal::history::RESIZED_IMAGE_BYTES_ESTIMATE;
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::models::BaseInstructions;
|
||||
@@ -20,8 +23,11 @@ use codex_protocol::openai_models::default_input_modalities;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::approx_bytes_for_tokens;
|
||||
use codex_utils_output_truncation::truncate_text;
|
||||
use image::ImageBuffer;
|
||||
use image::ImageFormat;
|
||||
@@ -34,6 +40,98 @@ use std::path::PathBuf;
|
||||
const EXEC_FORMAT_MAX_BYTES: usize = 10_000;
|
||||
const EXEC_FORMAT_MAX_TOKENS: usize = 2_500;
|
||||
|
||||
#[derive(Clone, Default)]
|
||||
struct TestHistory {
|
||||
journal: Journal,
|
||||
token_info: Option<TokenUsageInfo>,
|
||||
reference_context_item: Option<TurnContextItem>,
|
||||
}
|
||||
|
||||
impl TestHistory {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
journal: Journal::new(),
|
||||
token_info: TokenUsageInfo::new_or_append(
|
||||
&None, &None, /*model_context_window*/ None,
|
||||
),
|
||||
reference_context_item: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn record_items<I>(&mut self, items: I, policy: TruncationPolicy)
|
||||
where
|
||||
I: IntoIterator,
|
||||
I::Item: std::ops::Deref<Target = ResponseItem>,
|
||||
{
|
||||
super::record_items(&mut self.journal, items, policy);
|
||||
}
|
||||
|
||||
fn raw_items(&self) -> Vec<ResponseItem> {
|
||||
super::raw_items(&self.journal)
|
||||
}
|
||||
|
||||
fn for_prompt(&self, input_modalities: &[InputModality]) -> Vec<ResponseItem> {
|
||||
super::for_prompt(&self.journal, input_modalities)
|
||||
}
|
||||
|
||||
fn estimate_token_count_with_base_instructions(
|
||||
&self,
|
||||
base_instructions: &BaseInstructions,
|
||||
) -> Option<i64> {
|
||||
super::estimate_token_count_with_base_instructions(&self.journal, base_instructions)
|
||||
}
|
||||
|
||||
fn remove_first_item(&mut self) {
|
||||
super::remove_first_item(&mut self.journal);
|
||||
}
|
||||
|
||||
fn remove_last_item(&mut self) -> bool {
|
||||
super::remove_last_item(&mut self.journal)
|
||||
}
|
||||
|
||||
fn replace_last_turn_images(&mut self, placeholder: &str) -> bool {
|
||||
super::replace_last_turn_images(&mut self.journal, placeholder)
|
||||
}
|
||||
|
||||
fn drop_last_n_user_turns(&mut self, num_turns: u32) {
|
||||
super::drop_last_n_user_turns(
|
||||
&mut self.journal,
|
||||
&mut self.reference_context_item,
|
||||
num_turns,
|
||||
);
|
||||
}
|
||||
|
||||
fn update_token_info(&mut self, usage: &TokenUsage, model_context_window: Option<i64>) {
|
||||
super::update_token_info(&mut self.token_info, usage, model_context_window);
|
||||
}
|
||||
|
||||
fn get_total_token_usage(&self, server_reasoning_included: bool) -> i64 {
|
||||
super::get_total_token_usage(&self.journal, &self.token_info, server_reasoning_included)
|
||||
}
|
||||
|
||||
fn set_reference_context_item(&mut self, item: Option<TurnContextItem>) {
|
||||
self.reference_context_item = item;
|
||||
}
|
||||
|
||||
fn reference_context_item(&self) -> Option<TurnContextItem> {
|
||||
self.reference_context_item.clone()
|
||||
}
|
||||
|
||||
fn normalize_history(&mut self, input_modalities: &[InputModality]) {
|
||||
let mut items = self.raw_items();
|
||||
super::normalize_history(&mut items, input_modalities);
|
||||
super::replace_history(&mut self.journal, items);
|
||||
}
|
||||
|
||||
fn get_non_last_reasoning_items_tokens(&self) -> i64 {
|
||||
super::get_non_last_reasoning_items_tokens(&self.raw_items())
|
||||
}
|
||||
|
||||
fn items_after_last_model_generated_item(&self) -> Vec<ResponseItem> {
|
||||
super::items_after_last_model_generated_item(&self.raw_items()).to_vec()
|
||||
}
|
||||
}
|
||||
|
||||
fn assistant_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
@@ -63,8 +161,8 @@ fn inter_agent_assistant_msg(text: &str) -> ResponseItem {
|
||||
}
|
||||
}
|
||||
|
||||
fn create_history_with_items(items: Vec<ResponseItem>) -> ContextManager {
|
||||
let mut h = ContextManager::new();
|
||||
fn create_history_with_items(items: Vec<ResponseItem>) -> TestHistory {
|
||||
let mut h = TestHistory::new();
|
||||
// Use a generous but fixed token budget; tests only rely on truncation
|
||||
// behavior, not on a specific model's token limit.
|
||||
h.record_items(items.iter(), TruncationPolicy::Tokens(10_000));
|
||||
@@ -185,7 +283,7 @@ fn approx_token_count_for_text(text: &str) -> i64 {
|
||||
|
||||
#[test]
|
||||
fn filters_non_api_messages() {
|
||||
let mut h = ContextManager::default();
|
||||
let mut h = TestHistory::default();
|
||||
let policy = TruncationPolicy::Tokens(10_000);
|
||||
// System message is not API messages; Other is ignored.
|
||||
let system = ResponseItem::Message {
|
||||
@@ -327,7 +425,7 @@ fn drop_last_n_user_turns_treats_inter_agent_assistant_messages_as_instruction_t
|
||||
|
||||
history.drop_last_n_user_turns(/*num_turns*/ 1);
|
||||
|
||||
assert_eq!(history.raw_items(), &vec![first_turn, first_reply]);
|
||||
assert_eq!(history.raw_items(), vec![first_turn, first_reply]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1014,7 +1112,7 @@ fn normalization_retains_local_shell_outputs() {
|
||||
|
||||
#[test]
|
||||
fn record_items_truncates_function_call_output_content() {
|
||||
let mut history = ContextManager::new();
|
||||
let mut history = TestHistory::new();
|
||||
// Any reasonably small token budget works; the test only cares that
|
||||
// truncation happens and the marker is present.
|
||||
let policy = TruncationPolicy::Tokens(1_000);
|
||||
@@ -1030,8 +1128,9 @@ fn record_items_truncates_function_call_output_content() {
|
||||
|
||||
history.record_items([&item], policy);
|
||||
|
||||
assert_eq!(history.items.len(), 1);
|
||||
match &history.items[0] {
|
||||
let history_items = history.raw_items();
|
||||
assert_eq!(history_items.len(), 1);
|
||||
match &history_items[0] {
|
||||
ResponseItem::FunctionCallOutput { output, .. } => {
|
||||
let content = output.text_content().unwrap_or_default();
|
||||
assert_ne!(content, long_output);
|
||||
@@ -1050,7 +1149,7 @@ fn record_items_truncates_function_call_output_content() {
|
||||
|
||||
#[test]
|
||||
fn record_items_truncates_custom_tool_call_output_content() {
|
||||
let mut history = ContextManager::new();
|
||||
let mut history = TestHistory::new();
|
||||
let policy = TruncationPolicy::Tokens(1_000);
|
||||
let line = "custom output that is very long\n";
|
||||
let long_output = line.repeat(2_500);
|
||||
@@ -1062,8 +1161,9 @@ fn record_items_truncates_custom_tool_call_output_content() {
|
||||
|
||||
history.record_items([&item], policy);
|
||||
|
||||
assert_eq!(history.items.len(), 1);
|
||||
match &history.items[0] {
|
||||
let history_items = history.raw_items();
|
||||
assert_eq!(history_items.len(), 1);
|
||||
match &history_items[0] {
|
||||
ResponseItem::CustomToolCallOutput { output, .. } => {
|
||||
let output = output.text_content().unwrap_or_default();
|
||||
assert_ne!(output, long_output);
|
||||
@@ -1082,7 +1182,7 @@ fn record_items_truncates_custom_tool_call_output_content() {
|
||||
|
||||
#[test]
|
||||
fn record_items_respects_custom_token_limit() {
|
||||
let mut history = ContextManager::new();
|
||||
let mut history = TestHistory::new();
|
||||
let policy = TruncationPolicy::Tokens(10);
|
||||
let long_output = "tokenized content repeated many times ".repeat(200);
|
||||
let item = ResponseItem::FunctionCallOutput {
|
||||
@@ -1095,7 +1195,8 @@ fn record_items_respects_custom_token_limit() {
|
||||
|
||||
history.record_items([&item], policy);
|
||||
|
||||
let stored = match &history.items[0] {
|
||||
let history_items = history.raw_items();
|
||||
let stored = match &history_items[0] {
|
||||
ResponseItem::FunctionCallOutput { output, .. } => output,
|
||||
other => panic!("unexpected history item: {other:?}"),
|
||||
};
|
||||
@@ -1,7 +1,9 @@
|
||||
pub(crate) mod history;
|
||||
mod service;
|
||||
mod session;
|
||||
mod turn;
|
||||
|
||||
pub(crate) use history::TotalTokenUsageBreakdown;
|
||||
pub(crate) use service::SessionServices;
|
||||
pub(crate) use session::SessionState;
|
||||
pub(crate) use turn::ActiveTurn;
|
||||
|
||||
@@ -1,15 +1,17 @@
|
||||
//! Session-wide mutable state.
|
||||
|
||||
use codex_journal::Journal;
|
||||
use codex_protocol::models::AdditionalPermissionProfile;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_sandboxing::policy_transforms::merge_permission_profiles;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::session::PreviousTurnSettings;
|
||||
use crate::session::session::SessionConfiguration;
|
||||
use crate::session_startup_prewarm::SessionStartupPrewarmHandle;
|
||||
use crate::state::history as session_history;
|
||||
use crate::state::history::TotalTokenUsageBreakdown;
|
||||
use codex_protocol::protocol::RateLimitSnapshot;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo;
|
||||
@@ -19,7 +21,10 @@ use codex_utils_output_truncation::TruncationPolicy;
|
||||
/// Persistent, session-scoped state previously stored directly on `Session`.
|
||||
pub(crate) struct SessionState {
|
||||
pub(crate) session_configuration: SessionConfiguration,
|
||||
pub(crate) history: ContextManager,
|
||||
journal: Journal,
|
||||
history_version: u64,
|
||||
token_info: Option<TokenUsageInfo>,
|
||||
reference_context_item: Option<TurnContextItem>,
|
||||
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
|
||||
pub(crate) server_reasoning_included: bool,
|
||||
pub(crate) dependency_env: HashMap<String, String>,
|
||||
@@ -39,10 +44,14 @@ pub(crate) struct SessionState {
|
||||
impl SessionState {
|
||||
/// Create a new session state mirroring previous `State::default()` semantics.
|
||||
pub(crate) fn new(session_configuration: SessionConfiguration) -> Self {
|
||||
let history = ContextManager::new();
|
||||
Self {
|
||||
session_configuration,
|
||||
history,
|
||||
journal: Journal::new(),
|
||||
history_version: 0,
|
||||
token_info: TokenUsageInfo::new_or_append(
|
||||
&None, &None, /*model_context_window*/ None,
|
||||
),
|
||||
reference_context_item: None,
|
||||
latest_rate_limits: None,
|
||||
server_reasoning_included: false,
|
||||
dependency_env: HashMap::new(),
|
||||
@@ -62,7 +71,7 @@ impl SessionState {
|
||||
I: IntoIterator,
|
||||
I::Item: std::ops::Deref<Target = ResponseItem>,
|
||||
{
|
||||
self.history.record_items(items, policy);
|
||||
session_history::record_items(&mut self.journal, items, policy);
|
||||
}
|
||||
|
||||
pub(crate) fn previous_turn_settings(&self) -> Option<PreviousTurnSettings> {
|
||||
@@ -85,8 +94,8 @@ impl SessionState {
|
||||
is_first_turn
|
||||
}
|
||||
|
||||
pub(crate) fn clone_history(&self) -> ContextManager {
|
||||
self.history.clone()
|
||||
pub(crate) fn clone_history(&self) -> Journal {
|
||||
self.journal.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn replace_history(
|
||||
@@ -94,21 +103,21 @@ impl SessionState {
|
||||
items: Vec<ResponseItem>,
|
||||
reference_context_item: Option<TurnContextItem>,
|
||||
) {
|
||||
self.history.replace(items);
|
||||
self.history
|
||||
.set_reference_context_item(reference_context_item);
|
||||
session_history::replace_history(&mut self.journal, items);
|
||||
self.history_version = self.history_version.saturating_add(1);
|
||||
self.reference_context_item = reference_context_item;
|
||||
}
|
||||
|
||||
pub(crate) fn set_token_info(&mut self, info: Option<TokenUsageInfo>) {
|
||||
self.history.set_token_info(info);
|
||||
self.token_info = info;
|
||||
}
|
||||
|
||||
pub(crate) fn set_reference_context_item(&mut self, item: Option<TurnContextItem>) {
|
||||
self.history.set_reference_context_item(item);
|
||||
self.reference_context_item = item;
|
||||
}
|
||||
|
||||
pub(crate) fn reference_context_item(&self) -> Option<TurnContextItem> {
|
||||
self.history.reference_context_item()
|
||||
self.reference_context_item.clone()
|
||||
}
|
||||
|
||||
// Token/rate limit helpers
|
||||
@@ -117,11 +126,11 @@ impl SessionState {
|
||||
usage: &TokenUsage,
|
||||
model_context_window: Option<i64>,
|
||||
) {
|
||||
self.history.update_token_info(usage, model_context_window);
|
||||
session_history::update_token_info(&mut self.token_info, usage, model_context_window);
|
||||
}
|
||||
|
||||
pub(crate) fn token_info(&self) -> Option<TokenUsageInfo> {
|
||||
self.history.token_info()
|
||||
self.token_info.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn set_rate_limits(&mut self, snapshot: RateLimitSnapshot) {
|
||||
@@ -138,12 +147,38 @@ impl SessionState {
|
||||
}
|
||||
|
||||
pub(crate) fn set_token_usage_full(&mut self, context_window: i64) {
|
||||
self.history.set_token_usage_full(context_window);
|
||||
session_history::set_token_usage_full(&mut self.token_info, context_window);
|
||||
}
|
||||
|
||||
pub(crate) fn get_total_token_usage(&self, server_reasoning_included: bool) -> i64 {
|
||||
self.history
|
||||
.get_total_token_usage(server_reasoning_included)
|
||||
session_history::get_total_token_usage(
|
||||
&self.journal,
|
||||
&self.token_info,
|
||||
server_reasoning_included,
|
||||
)
|
||||
}
|
||||
|
||||
pub(crate) fn get_total_token_usage_breakdown(&self) -> TotalTokenUsageBreakdown {
|
||||
session_history::get_total_token_usage_breakdown(&self.journal, &self.token_info)
|
||||
}
|
||||
|
||||
pub(crate) fn estimate_token_count(
|
||||
&self,
|
||||
turn_context: &crate::session::turn_context::TurnContext,
|
||||
) -> Option<i64> {
|
||||
session_history::estimate_token_count(&self.journal, turn_context)
|
||||
}
|
||||
|
||||
pub(crate) fn history_version(&self) -> u64 {
|
||||
self.history_version
|
||||
}
|
||||
|
||||
pub(crate) fn replace_last_turn_images(&mut self, placeholder: &str) -> bool {
|
||||
let replaced = session_history::replace_last_turn_images(&mut self.journal, placeholder);
|
||||
if replaced {
|
||||
self.history_version = self.history_version.saturating_add(1);
|
||||
}
|
||||
replaced
|
||||
}
|
||||
|
||||
pub(crate) fn set_server_reasoning_included(&mut self, included: bool) {
|
||||
|
||||
6
codex-rs/journal/BUILD.bazel
Normal file
6
codex-rs/journal/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "journal",
|
||||
crate_name = "codex_journal",
|
||||
)
|
||||
24
codex-rs/journal/Cargo.toml
Normal file
24
codex-rs/journal/Cargo.toml
Normal file
@@ -0,0 +1,24 @@
|
||||
[package]
|
||||
name = "codex-journal"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
readme = "README.md"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
base64 = { workspace = true }
|
||||
codex-utils-cache = { workspace = true }
|
||||
codex-utils-output-truncation = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
image = { workspace = true, features = ["jpeg", "png", "webp"] }
|
||||
indexmap = { workspace = true }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
86
codex-rs/journal/README.md
Normal file
86
codex-rs/journal/README.md
Normal file
@@ -0,0 +1,86 @@
|
||||
# codex-journal
|
||||
|
||||
`codex-journal` is the typed journal model behind prompt metadata, transcript rewriting, forked views,
|
||||
and prompt-ready projection in Codex.
|
||||
|
||||
## Model
|
||||
|
||||
A `Journal` stores one append-only sequence of `JournalEntry` values. Each entry is one of:
|
||||
|
||||
- prompt metadata: keyed entries that can replace older entries with the same key
|
||||
- transcript: ordered prompt-visible items
|
||||
- checkpoints: transcript rewrite operations such as prefix replacement or truncation
|
||||
|
||||
The append-only journal is the source of truth. Derived views are produced by resolving it.
|
||||
|
||||
## Resolved Views
|
||||
|
||||
`Journal::resolve()` returns a `ResolvedJournal` with two typed views:
|
||||
|
||||
- `ResolvedMetadata`: effective prompt-metadata entries after key-based replacement
|
||||
- `ResolvedTranscript`: effective transcript after checkpoint application
|
||||
|
||||
These two views come from the same journal, but they behave differently:
|
||||
|
||||
- metadata is deduplicated by key and ordered by `prompt_order`
|
||||
- transcript preserves order and is rewritten only by checkpoints
|
||||
|
||||
## Building Metadata Entries
|
||||
|
||||
Use `Journal::metadata_entry_builder(...)` when you want explicit control over metadata:
|
||||
|
||||
```rust
|
||||
use codex_journal::Journal;
|
||||
use codex_journal::JournalContextAudience;
|
||||
use codex_journal::PromptMessage;
|
||||
|
||||
let entry = Journal::metadata_entry_builder(
|
||||
["prompt", "developer", "permissions"],
|
||||
PromptMessage::developer_text("sandbox is workspace-write"),
|
||||
)
|
||||
.prompt_order(20)
|
||||
.audience(JournalContextAudience::All)
|
||||
.build();
|
||||
```
|
||||
|
||||
`Journal::metadata_entry(...)` remains available as a shorthand for the common case.
|
||||
|
||||
## Rendering
|
||||
|
||||
Prompt rendering lives in `PromptRenderer`, not in `Journal` itself. Grouping is explicit and
|
||||
applied only to consecutive resolved metadata entries:
|
||||
|
||||
```rust
|
||||
use codex_journal::Journal;
|
||||
use codex_journal::KeyFilter;
|
||||
use codex_journal::PromptMessage;
|
||||
use codex_journal::PromptRenderer;
|
||||
|
||||
let journal = Journal::from_entries(vec![
|
||||
Journal::metadata_entry(
|
||||
["prompt", "developer", "one"],
|
||||
10,
|
||||
PromptMessage::developer_text("first"),
|
||||
).unwrap(),
|
||||
Journal::metadata_entry(
|
||||
["prompt", "developer", "two"],
|
||||
20,
|
||||
PromptMessage::developer_text("second"),
|
||||
).unwrap(),
|
||||
]);
|
||||
|
||||
let resolved = journal.resolve().unwrap();
|
||||
let prompt = PromptRenderer::new()
|
||||
.group(KeyFilter::prefix(["prompt", "developer"]))
|
||||
.render_metadata(resolved.metadata());
|
||||
```
|
||||
|
||||
## Transformations
|
||||
|
||||
`Journal` also supports:
|
||||
|
||||
- `filter` and `resolve_with_filter` for key-scoped views
|
||||
- `flatten` for dropping obsolete entries while keeping the current effective view
|
||||
- `fork` for producing child views with audience and `on_fork` filtering
|
||||
- `with_history_window` for keeping only a recent hot history suffix
|
||||
- `persist_jsonl` / `load_jsonl` for durable JSONL storage
|
||||
87
codex-rs/journal/src/context_builder.rs
Normal file
87
codex-rs/journal/src/context_builder.rs
Normal file
@@ -0,0 +1,87 @@
|
||||
use crate::JournalContextAudience;
|
||||
use crate::JournalContextForkBehavior;
|
||||
use crate::JournalEntry;
|
||||
use crate::JournalKey;
|
||||
use crate::JournalMetadataItem;
|
||||
use crate::PromptMessage;
|
||||
use codex_protocol::models::ContentItem;
|
||||
|
||||
/// Builder for one prompt-metadata journal entry.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MetadataEntryBuilder {
|
||||
key: JournalKey,
|
||||
message: PromptMessage,
|
||||
prompt_order: i64,
|
||||
audience: JournalContextAudience,
|
||||
on_fork: JournalContextForkBehavior,
|
||||
tags: Vec<String>,
|
||||
source: Option<String>,
|
||||
}
|
||||
|
||||
impl MetadataEntryBuilder {
|
||||
pub(crate) fn new(key: impl Into<JournalKey>, message: impl Into<PromptMessage>) -> Self {
|
||||
Self {
|
||||
key: key.into(),
|
||||
message: message.into(),
|
||||
prompt_order: 0,
|
||||
audience: JournalContextAudience::default(),
|
||||
on_fork: JournalContextForkBehavior::default(),
|
||||
tags: Vec::new(),
|
||||
source: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the prompt ordering used after resolution.
|
||||
pub fn prompt_order(mut self, prompt_order: i64) -> Self {
|
||||
self.prompt_order = prompt_order;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets the audience used when projecting or forking the journal.
|
||||
pub fn audience(mut self, audience: JournalContextAudience) -> Self {
|
||||
self.audience = audience;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets how this context entry behaves when creating a forked journal view.
|
||||
pub fn on_fork(mut self, on_fork: JournalContextForkBehavior) -> Self {
|
||||
self.on_fork = on_fork;
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets arbitrary tags for downstream classification.
|
||||
pub fn tags(mut self, tags: Vec<String>) -> Self {
|
||||
self.tags = tags;
|
||||
self
|
||||
}
|
||||
|
||||
/// Records the origin of this prompt-metadata entry.
|
||||
pub fn source(mut self, source: impl Into<String>) -> Self {
|
||||
self.source = Some(source.into());
|
||||
self
|
||||
}
|
||||
|
||||
/// Builds one prompt-metadata entry if the message is non-empty after trimming text content.
|
||||
pub fn build(self) -> Option<JournalEntry> {
|
||||
if self.message.content.is_empty()
|
||||
|| self.message.content.iter().all(|item| match item {
|
||||
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
|
||||
text.trim().is_empty()
|
||||
}
|
||||
ContentItem::InputImage { .. } => false,
|
||||
})
|
||||
{
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut item = JournalMetadataItem::new(self.message)
|
||||
.with_prompt_order(self.prompt_order)
|
||||
.with_audience(self.audience)
|
||||
.with_on_fork(self.on_fork)
|
||||
.with_tags(self.tags);
|
||||
if let Some(source) = self.source {
|
||||
item = item.with_source(source);
|
||||
}
|
||||
Some(JournalEntry::new(self.key, item))
|
||||
}
|
||||
}
|
||||
24
codex-rs/journal/src/error.rs
Normal file
24
codex-rs/journal/src/error.rs
Normal file
@@ -0,0 +1,24 @@
|
||||
use thiserror::Error;
|
||||
|
||||
/// Convenience result type for journal operations.
|
||||
pub type Result<T> = std::result::Result<T, JournalError>;
|
||||
|
||||
/// Errors produced while materializing or persisting a journal.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum JournalError {
|
||||
#[error("failed to read or write journal")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("failed to serialize journal item")]
|
||||
SerializeJson {
|
||||
#[from]
|
||||
source: serde_json::Error,
|
||||
},
|
||||
#[error("failed to parse journal item at line {line_number}")]
|
||||
ParseJson {
|
||||
line_number: usize,
|
||||
#[source]
|
||||
source: serde_json::Error,
|
||||
},
|
||||
#[error("history cursor referenced unknown history item id `{history_item_id}`")]
|
||||
UnknownHistoryItemId { history_item_id: String },
|
||||
}
|
||||
133
codex-rs/journal/src/history/classify.rs
Normal file
133
codex-rs/journal/src/history/classify.rs
Normal file
@@ -0,0 +1,133 @@
|
||||
use codex_protocol::items::parse_hook_prompt_fragment;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::ENVIRONMENT_CONTEXT_CLOSE_TAG;
|
||||
use codex_protocol::protocol::ENVIRONMENT_CONTEXT_OPEN_TAG;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
|
||||
const USER_INSTRUCTIONS_START_MARKER: &str = "# AGENTS.md instructions for ";
|
||||
const USER_INSTRUCTIONS_END_MARKER: &str = "</INSTRUCTIONS>";
|
||||
const SKILL_START_MARKER: &str = "<skill>";
|
||||
const SKILL_END_MARKER: &str = "</skill>";
|
||||
const USER_SHELL_COMMAND_START_MARKER: &str = "<user_shell_command>";
|
||||
const USER_SHELL_COMMAND_END_MARKER: &str = "</user_shell_command>";
|
||||
const TURN_ABORTED_START_MARKER: &str = "<turn_aborted>";
|
||||
const TURN_ABORTED_END_MARKER: &str = "</turn_aborted>";
|
||||
const SUBAGENT_NOTIFICATION_START_MARKER: &str = "<subagent_notification>";
|
||||
const SUBAGENT_NOTIFICATION_END_MARKER: &str = "</subagent_notification>";
|
||||
|
||||
/// Returns whether an item should be carried in API-visible conversation history.
|
||||
pub fn is_api_message(message: &ResponseItem) -> bool {
|
||||
match message {
|
||||
ResponseItem::Message { role, .. } => role.as_str() != "system",
|
||||
ResponseItem::FunctionCallOutput { .. }
|
||||
| ResponseItem::FunctionCall { .. }
|
||||
| ResponseItem::ToolSearchCall { .. }
|
||||
| ResponseItem::ToolSearchOutput { .. }
|
||||
| ResponseItem::CustomToolCall { .. }
|
||||
| ResponseItem::CustomToolCallOutput { .. }
|
||||
| ResponseItem::LocalShellCall { .. }
|
||||
| ResponseItem::Reasoning { .. }
|
||||
| ResponseItem::WebSearchCall { .. }
|
||||
| ResponseItem::ImageGenerationCall { .. }
|
||||
| ResponseItem::Compaction { .. } => true,
|
||||
ResponseItem::Other => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns whether an item originated from model generation rather than client-side bookkeeping.
|
||||
pub fn is_model_generated_item(item: &ResponseItem) -> bool {
|
||||
match item {
|
||||
ResponseItem::Message { role, .. } => role == "assistant",
|
||||
ResponseItem::Reasoning { .. }
|
||||
| ResponseItem::FunctionCall { .. }
|
||||
| ResponseItem::ToolSearchCall { .. }
|
||||
| ResponseItem::WebSearchCall { .. }
|
||||
| ResponseItem::ImageGenerationCall { .. }
|
||||
| ResponseItem::CustomToolCall { .. }
|
||||
| ResponseItem::LocalShellCall { .. }
|
||||
| ResponseItem::Compaction { .. } => true,
|
||||
ResponseItem::FunctionCallOutput { .. }
|
||||
| ResponseItem::ToolSearchOutput { .. }
|
||||
| ResponseItem::CustomToolCallOutput { .. }
|
||||
| ResponseItem::Other => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns whether an item was injected by Codex rather than supplied by the user or model.
|
||||
pub fn is_codex_generated_item(item: &ResponseItem) -> bool {
|
||||
matches!(
|
||||
item,
|
||||
ResponseItem::FunctionCallOutput { .. }
|
||||
| ResponseItem::ToolSearchOutput { .. }
|
||||
| ResponseItem::CustomToolCallOutput { .. }
|
||||
) || matches!(item, ResponseItem::Message { role, .. } if role == "developer")
|
||||
}
|
||||
|
||||
/// Returns whether an item should count as an instruction-turn boundary for history rewrites.
|
||||
pub fn is_user_turn_boundary(item: &ResponseItem) -> bool {
|
||||
let ResponseItem::Message { role, content, .. } = item else {
|
||||
return false;
|
||||
};
|
||||
|
||||
(role == "user" && !is_contextual_user_message_content(content))
|
||||
|| (role == "assistant" && is_inter_agent_instruction_content(content))
|
||||
}
|
||||
|
||||
/// Returns the indexes of all instruction-turn boundaries in order.
|
||||
pub fn user_turn_boundary_positions(items: &[ResponseItem]) -> Vec<usize> {
|
||||
let mut positions = Vec::new();
|
||||
for (index, item) in items.iter().enumerate() {
|
||||
if is_user_turn_boundary(item) {
|
||||
positions.push(index);
|
||||
}
|
||||
}
|
||||
positions
|
||||
}
|
||||
|
||||
fn is_inter_agent_instruction_content(content: &[ContentItem]) -> bool {
|
||||
InterAgentCommunication::is_message_content(content)
|
||||
}
|
||||
|
||||
fn is_contextual_user_message_content(content: &[ContentItem]) -> bool {
|
||||
content.iter().any(|item| match item {
|
||||
ContentItem::InputText { text } => {
|
||||
parse_hook_prompt_fragment(text).is_some()
|
||||
|| matches_fragment(
|
||||
text,
|
||||
USER_INSTRUCTIONS_START_MARKER,
|
||||
USER_INSTRUCTIONS_END_MARKER,
|
||||
)
|
||||
|| matches_fragment(
|
||||
text,
|
||||
ENVIRONMENT_CONTEXT_OPEN_TAG,
|
||||
ENVIRONMENT_CONTEXT_CLOSE_TAG,
|
||||
)
|
||||
|| matches_fragment(text, SKILL_START_MARKER, SKILL_END_MARKER)
|
||||
|| matches_fragment(
|
||||
text,
|
||||
USER_SHELL_COMMAND_START_MARKER,
|
||||
USER_SHELL_COMMAND_END_MARKER,
|
||||
)
|
||||
|| matches_fragment(text, TURN_ABORTED_START_MARKER, TURN_ABORTED_END_MARKER)
|
||||
|| matches_fragment(
|
||||
text,
|
||||
SUBAGENT_NOTIFICATION_START_MARKER,
|
||||
SUBAGENT_NOTIFICATION_END_MARKER,
|
||||
)
|
||||
}
|
||||
ContentItem::InputImage { .. } | ContentItem::OutputText { .. } => false,
|
||||
})
|
||||
}
|
||||
|
||||
fn matches_fragment(text: &str, start_marker: &str, end_marker: &str) -> bool {
|
||||
let trimmed = text.trim_start();
|
||||
let starts_with_marker = trimmed
|
||||
.get(..start_marker.len())
|
||||
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(start_marker));
|
||||
let trimmed = trimmed.trim_end();
|
||||
let ends_with_marker = trimmed
|
||||
.get(trimmed.len().saturating_sub(end_marker.len())..)
|
||||
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(end_marker));
|
||||
starts_with_marker && ends_with_marker
|
||||
}
|
||||
174
codex-rs/journal/src/history/estimate.rs
Normal file
174
codex-rs/journal/src/history/estimate.rs
Normal file
@@ -0,0 +1,174 @@
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
use codex_protocol::models::FunctionCallOutputContentItem;
|
||||
use codex_protocol::models::ImageDetail;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_utils_cache::BlockingLruCache;
|
||||
use codex_utils_cache::sha1_digest;
|
||||
use codex_utils_output_truncation::approx_bytes_for_tokens;
|
||||
use codex_utils_output_truncation::approx_tokens_from_byte_count_i64;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::sync::LazyLock;
|
||||
|
||||
fn estimate_reasoning_length(encoded_len: usize) -> usize {
|
||||
encoded_len
|
||||
.saturating_mul(3)
|
||||
.checked_div(4)
|
||||
.unwrap_or(0)
|
||||
.saturating_sub(650)
|
||||
}
|
||||
|
||||
/// Approximates the token cost of one history item using model-visible byte heuristics.
|
||||
pub fn estimate_item_token_count(item: &ResponseItem) -> i64 {
|
||||
let model_visible_bytes = estimate_response_item_model_visible_bytes(item);
|
||||
approx_tokens_from_byte_count_i64(model_visible_bytes)
|
||||
}
|
||||
|
||||
/// Approximates the model-visible byte size of one history item.
|
||||
///
|
||||
/// Inline base64 image payloads are discounted to a fixed vision-token estimate instead of their
|
||||
/// raw serialized size, while encrypted reasoning content uses a coarse decoded-length heuristic.
|
||||
pub fn estimate_response_item_model_visible_bytes(item: &ResponseItem) -> i64 {
|
||||
match item {
|
||||
ResponseItem::Reasoning {
|
||||
encrypted_content: Some(content),
|
||||
..
|
||||
}
|
||||
| ResponseItem::Compaction {
|
||||
encrypted_content: content,
|
||||
} => i64::try_from(estimate_reasoning_length(content.len())).unwrap_or(i64::MAX),
|
||||
item => {
|
||||
let raw = serde_json::to_string(item)
|
||||
.map(|serialized| i64::try_from(serialized.len()).unwrap_or(i64::MAX))
|
||||
.unwrap_or_default();
|
||||
let (payload_bytes, replacement_bytes) = image_data_url_estimate_adjustment(item);
|
||||
if payload_bytes == 0 || replacement_bytes == 0 {
|
||||
raw
|
||||
} else {
|
||||
raw.saturating_sub(payload_bytes)
|
||||
.saturating_add(replacement_bytes)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Approximate model-visible byte cost for one resized image input.
|
||||
pub const RESIZED_IMAGE_BYTES_ESTIMATE: i64 = 7373;
|
||||
const ORIGINAL_IMAGE_PATCH_SIZE: u32 = 32;
|
||||
/// Maximum patch budget used when estimating `detail: "original"` image cost.
|
||||
pub const ORIGINAL_IMAGE_MAX_PATCHES: usize = 10_000;
|
||||
const ORIGINAL_IMAGE_ESTIMATE_CACHE_SIZE: usize = 32;
|
||||
|
||||
static ORIGINAL_IMAGE_ESTIMATE_CACHE: LazyLock<BlockingLruCache<[u8; 20], Option<i64>>> =
|
||||
LazyLock::new(|| {
|
||||
BlockingLruCache::new(
|
||||
NonZeroUsize::new(ORIGINAL_IMAGE_ESTIMATE_CACHE_SIZE).unwrap_or(NonZeroUsize::MIN),
|
||||
)
|
||||
});
|
||||
|
||||
fn parse_base64_image_data_url(url: &str) -> Option<&str> {
|
||||
if !url
|
||||
.get(.."data:".len())
|
||||
.is_some_and(|prefix| prefix.eq_ignore_ascii_case("data:"))
|
||||
{
|
||||
return None;
|
||||
}
|
||||
let comma_index = url.find(',')?;
|
||||
let metadata = &url[..comma_index];
|
||||
let payload = &url[comma_index + 1..];
|
||||
let metadata_without_scheme = &metadata["data:".len()..];
|
||||
let mut metadata_parts = metadata_without_scheme.split(';');
|
||||
let mime_type = metadata_parts.next().unwrap_or_default();
|
||||
let has_base64_marker = metadata_parts.any(|part| part.eq_ignore_ascii_case("base64"));
|
||||
if !mime_type
|
||||
.get(.."image/".len())
|
||||
.is_some_and(|prefix| prefix.eq_ignore_ascii_case("image/"))
|
||||
{
|
||||
return None;
|
||||
}
|
||||
if !has_base64_marker {
|
||||
return None;
|
||||
}
|
||||
Some(payload)
|
||||
}
|
||||
|
||||
fn estimate_original_image_bytes(image_url: &str) -> Option<i64> {
|
||||
let key = sha1_digest(image_url.as_bytes());
|
||||
ORIGINAL_IMAGE_ESTIMATE_CACHE.get_or_insert_with(key, || {
|
||||
let payload = match parse_base64_image_data_url(image_url) {
|
||||
Some(payload) => payload,
|
||||
None => {
|
||||
tracing::trace!("skipping original-detail estimate for non-base64 image data URL");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let bytes = match BASE64_STANDARD.decode(payload) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(error) => {
|
||||
tracing::trace!("failed to decode original-detail image payload: {error}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let dynamic = match image::load_from_memory(&bytes) {
|
||||
Ok(dynamic) => dynamic,
|
||||
Err(error) => {
|
||||
tracing::trace!("failed to decode original-detail image bytes: {error}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
let width = i64::from(dynamic.width());
|
||||
let height = i64::from(dynamic.height());
|
||||
let patch_size = i64::from(ORIGINAL_IMAGE_PATCH_SIZE);
|
||||
let patches_wide = width.saturating_add(patch_size.saturating_sub(1)) / patch_size;
|
||||
let patches_high = height.saturating_add(patch_size.saturating_sub(1)) / patch_size;
|
||||
let patch_count = patches_wide.saturating_mul(patches_high);
|
||||
let patch_count = usize::try_from(patch_count).unwrap_or(usize::MAX);
|
||||
let patch_count = patch_count.min(ORIGINAL_IMAGE_MAX_PATCHES);
|
||||
Some(i64::try_from(approx_bytes_for_tokens(patch_count)).unwrap_or(i64::MAX))
|
||||
})
|
||||
}
|
||||
|
||||
fn image_data_url_estimate_adjustment(item: &ResponseItem) -> (i64, i64) {
|
||||
let mut payload_bytes = 0i64;
|
||||
let mut replacement_bytes = 0i64;
|
||||
|
||||
let mut accumulate = |image_url: &str, detail: Option<ImageDetail>| {
|
||||
if let Some(payload_len) = parse_base64_image_data_url(image_url).map(str::len) {
|
||||
payload_bytes =
|
||||
payload_bytes.saturating_add(i64::try_from(payload_len).unwrap_or(i64::MAX));
|
||||
replacement_bytes = replacement_bytes.saturating_add(match detail {
|
||||
Some(ImageDetail::Original) => {
|
||||
estimate_original_image_bytes(image_url).unwrap_or(RESIZED_IMAGE_BYTES_ESTIMATE)
|
||||
}
|
||||
_ => RESIZED_IMAGE_BYTES_ESTIMATE,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
match item {
|
||||
ResponseItem::Message { content, .. } => {
|
||||
for content_item in content {
|
||||
if let ContentItem::InputImage { image_url, detail } = content_item {
|
||||
accumulate(image_url, *detail);
|
||||
}
|
||||
}
|
||||
}
|
||||
ResponseItem::FunctionCallOutput { output, .. }
|
||||
| ResponseItem::CustomToolCallOutput { output, .. } => {
|
||||
if let FunctionCallOutputBody::ContentItems(items) = &output.body {
|
||||
for content_item in items {
|
||||
if let FunctionCallOutputContentItem::InputImage { image_url, detail } =
|
||||
content_item
|
||||
{
|
||||
accumulate(image_url, *detail);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
(payload_bytes, replacement_bytes)
|
||||
}
|
||||
26
codex-rs/journal/src/history/mod.rs
Normal file
26
codex-rs/journal/src/history/mod.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
//! Pure history utilities for normalizing, classifying, rewriting, and estimating prompt-ready
|
||||
//! history items.
|
||||
|
||||
mod classify;
|
||||
mod estimate;
|
||||
mod normalize;
|
||||
mod transform;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
pub use classify::is_api_message;
|
||||
pub use classify::is_codex_generated_item;
|
||||
pub use classify::is_model_generated_item;
|
||||
pub use classify::is_user_turn_boundary;
|
||||
pub use classify::user_turn_boundary_positions;
|
||||
pub use estimate::ORIGINAL_IMAGE_MAX_PATCHES;
|
||||
pub use estimate::RESIZED_IMAGE_BYTES_ESTIMATE;
|
||||
pub use estimate::estimate_item_token_count;
|
||||
pub use estimate::estimate_response_item_model_visible_bytes;
|
||||
pub use normalize::ensure_call_outputs_present;
|
||||
pub use normalize::remove_corresponding_for;
|
||||
pub use normalize::remove_orphan_outputs;
|
||||
pub use normalize::strip_images_when_unsupported;
|
||||
pub use transform::replace_last_turn_images;
|
||||
pub use transform::truncate_history_item;
|
||||
@@ -3,24 +3,21 @@ use codex_protocol::models::FunctionCallOutputContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::InputModality;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use crate::util::error_or_panic;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
|
||||
const IMAGE_CONTENT_OMITTED_PLACEHOLDER: &str =
|
||||
"image content omitted because you do not support image input";
|
||||
|
||||
pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
|
||||
// Collect synthetic outputs to insert immediately after their calls.
|
||||
// Store the insertion position (index of call) alongside the item so
|
||||
// we can insert in reverse order and avoid index shifting.
|
||||
/// Ensures every tool-call item has a matching output item, inserting synthetic aborted outputs
|
||||
/// immediately after calls that are missing one.
|
||||
pub fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
|
||||
let mut missing_outputs_to_insert: Vec<(usize, ResponseItem)> = Vec::new();
|
||||
|
||||
for (idx, item) in items.iter().enumerate() {
|
||||
for (index, item) in items.iter().enumerate() {
|
||||
match item {
|
||||
ResponseItem::FunctionCall { call_id, .. } => {
|
||||
let has_output = items.iter().any(|i| match i {
|
||||
let has_output = items.iter().any(|other| match other {
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: existing, ..
|
||||
} => existing == call_id,
|
||||
@@ -30,7 +27,7 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
|
||||
if !has_output {
|
||||
info!("Function call output is missing for call id: {call_id}");
|
||||
missing_outputs_to_insert.push((
|
||||
idx,
|
||||
index,
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: FunctionCallOutputPayload::from_text("aborted".to_string()),
|
||||
@@ -42,7 +39,7 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
|
||||
call_id: Some(call_id),
|
||||
..
|
||||
} => {
|
||||
let has_output = items.iter().any(|i| match i {
|
||||
let has_output = items.iter().any(|other| match other {
|
||||
ResponseItem::ToolSearchOutput {
|
||||
call_id: Some(existing),
|
||||
..
|
||||
@@ -53,7 +50,7 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
|
||||
if !has_output {
|
||||
info!("Tool search output is missing for call id: {call_id}");
|
||||
missing_outputs_to_insert.push((
|
||||
idx,
|
||||
index,
|
||||
ResponseItem::ToolSearchOutput {
|
||||
call_id: Some(call_id.clone()),
|
||||
status: "completed".to_string(),
|
||||
@@ -64,7 +61,7 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
|
||||
}
|
||||
}
|
||||
ResponseItem::CustomToolCall { call_id, .. } => {
|
||||
let has_output = items.iter().any(|i| match i {
|
||||
let has_output = items.iter().any(|other| match other {
|
||||
ResponseItem::CustomToolCallOutput {
|
||||
call_id: existing, ..
|
||||
} => existing == call_id,
|
||||
@@ -72,11 +69,11 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
|
||||
});
|
||||
|
||||
if !has_output {
|
||||
error_or_panic(format!(
|
||||
report_invariant_violation(format!(
|
||||
"Custom tool call output is missing for call id: {call_id}"
|
||||
));
|
||||
missing_outputs_to_insert.push((
|
||||
idx,
|
||||
index,
|
||||
ResponseItem::CustomToolCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
name: None,
|
||||
@@ -85,10 +82,9 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
|
||||
));
|
||||
}
|
||||
}
|
||||
// LocalShellCall is represented in upstream streams by a FunctionCallOutput
|
||||
ResponseItem::LocalShellCall { call_id, .. } => {
|
||||
if let Some(call_id) = call_id.as_ref() {
|
||||
let has_output = items.iter().any(|i| match i {
|
||||
let has_output = items.iter().any(|other| match other {
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: existing, ..
|
||||
} => existing == call_id,
|
||||
@@ -96,11 +92,11 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
|
||||
});
|
||||
|
||||
if !has_output {
|
||||
error_or_panic(format!(
|
||||
report_invariant_violation(format!(
|
||||
"Local shell call output is missing for call id: {call_id}"
|
||||
));
|
||||
missing_outputs_to_insert.push((
|
||||
idx,
|
||||
index,
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: FunctionCallOutputPayload::from_text("aborted".to_string()),
|
||||
@@ -113,57 +109,57 @@ pub(crate) fn ensure_call_outputs_present(items: &mut Vec<ResponseItem>) {
|
||||
}
|
||||
}
|
||||
|
||||
// Insert synthetic outputs in reverse index order to avoid re-indexing.
|
||||
for (idx, output_item) in missing_outputs_to_insert.into_iter().rev() {
|
||||
items.insert(idx + 1, output_item);
|
||||
for (index, output_item) in missing_outputs_to_insert.into_iter().rev() {
|
||||
items.insert(index + 1, output_item);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
|
||||
let function_call_ids: HashSet<String> = items
|
||||
/// Removes output items whose corresponding tool-call items no longer exist.
|
||||
pub fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
|
||||
let function_call_ids = items
|
||||
.iter()
|
||||
.filter_map(|i| match i {
|
||||
.filter_map(|item| match item {
|
||||
ResponseItem::FunctionCall { call_id, .. } => Some(call_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
.collect::<std::collections::HashSet<_>>();
|
||||
|
||||
let tool_search_call_ids: HashSet<String> = items
|
||||
let tool_search_call_ids = items
|
||||
.iter()
|
||||
.filter_map(|i| match i {
|
||||
.filter_map(|item| match item {
|
||||
ResponseItem::ToolSearchCall {
|
||||
call_id: Some(call_id),
|
||||
..
|
||||
} => Some(call_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
.collect::<std::collections::HashSet<_>>();
|
||||
|
||||
let local_shell_call_ids: HashSet<String> = items
|
||||
let local_shell_call_ids = items
|
||||
.iter()
|
||||
.filter_map(|i| match i {
|
||||
.filter_map(|item| match item {
|
||||
ResponseItem::LocalShellCall {
|
||||
call_id: Some(call_id),
|
||||
..
|
||||
} => Some(call_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
.collect::<std::collections::HashSet<_>>();
|
||||
|
||||
let custom_tool_call_ids: HashSet<String> = items
|
||||
let custom_tool_call_ids = items
|
||||
.iter()
|
||||
.filter_map(|i| match i {
|
||||
.filter_map(|item| match item {
|
||||
ResponseItem::CustomToolCall { call_id, .. } => Some(call_id.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.collect();
|
||||
.collect::<std::collections::HashSet<_>>();
|
||||
|
||||
items.retain(|item| match item {
|
||||
ResponseItem::FunctionCallOutput { call_id, .. } => {
|
||||
let has_match =
|
||||
function_call_ids.contains(call_id) || local_shell_call_ids.contains(call_id);
|
||||
if !has_match {
|
||||
error_or_panic(format!(
|
||||
report_invariant_violation(format!(
|
||||
"Orphan function call output for call id: {call_id}"
|
||||
));
|
||||
}
|
||||
@@ -172,7 +168,7 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
|
||||
ResponseItem::CustomToolCallOutput { call_id, .. } => {
|
||||
let has_match = custom_tool_call_ids.contains(call_id);
|
||||
if !has_match {
|
||||
error_or_panic(format!(
|
||||
report_invariant_violation(format!(
|
||||
"Orphan custom tool call output for call id: {call_id}"
|
||||
));
|
||||
}
|
||||
@@ -185,7 +181,9 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
|
||||
} => {
|
||||
let has_match = tool_search_call_ids.contains(call_id);
|
||||
if !has_match {
|
||||
error_or_panic(format!("Orphan tool search output for call id: {call_id}"));
|
||||
report_invariant_violation(format!(
|
||||
"Orphan tool search output for call id: {call_id}"
|
||||
));
|
||||
}
|
||||
has_match
|
||||
}
|
||||
@@ -194,12 +192,13 @@ pub(crate) fn remove_orphan_outputs(items: &mut Vec<ResponseItem>) {
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &ResponseItem) {
|
||||
/// Removes the matching paired item for `item` from the history, if one exists.
|
||||
pub fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &ResponseItem) {
|
||||
match item {
|
||||
ResponseItem::FunctionCall { call_id, .. } => {
|
||||
remove_first_matching(items, |i| {
|
||||
remove_first_matching(items, |other| {
|
||||
matches!(
|
||||
i,
|
||||
other,
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: existing, ..
|
||||
} if existing == call_id
|
||||
@@ -207,23 +206,35 @@ pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &Res
|
||||
});
|
||||
}
|
||||
ResponseItem::FunctionCallOutput { call_id, .. } => {
|
||||
if let Some(pos) = items.iter().position(|i| {
|
||||
matches!(i, ResponseItem::FunctionCall { call_id: existing, .. } if existing == call_id)
|
||||
if let Some(position) = items.iter().position(|other| {
|
||||
matches!(
|
||||
other,
|
||||
ResponseItem::FunctionCall {
|
||||
call_id: existing,
|
||||
..
|
||||
} if existing == call_id
|
||||
)
|
||||
}) {
|
||||
items.remove(pos);
|
||||
} else if let Some(pos) = items.iter().position(|i| {
|
||||
matches!(i, ResponseItem::LocalShellCall { call_id: Some(existing), .. } if existing == call_id)
|
||||
items.remove(position);
|
||||
} else if let Some(position) = items.iter().position(|other| {
|
||||
matches!(
|
||||
other,
|
||||
ResponseItem::LocalShellCall {
|
||||
call_id: Some(existing),
|
||||
..
|
||||
} if existing == call_id
|
||||
)
|
||||
}) {
|
||||
items.remove(pos);
|
||||
items.remove(position);
|
||||
}
|
||||
}
|
||||
ResponseItem::ToolSearchCall {
|
||||
call_id: Some(call_id),
|
||||
..
|
||||
} => {
|
||||
remove_first_matching(items, |i| {
|
||||
remove_first_matching(items, |other| {
|
||||
matches!(
|
||||
i,
|
||||
other,
|
||||
ResponseItem::ToolSearchOutput {
|
||||
call_id: Some(existing),
|
||||
..
|
||||
@@ -235,23 +246,20 @@ pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &Res
|
||||
call_id: Some(call_id),
|
||||
..
|
||||
} => {
|
||||
remove_first_matching(
|
||||
items,
|
||||
|i| {
|
||||
matches!(
|
||||
i,
|
||||
ResponseItem::ToolSearchCall {
|
||||
call_id: Some(existing),
|
||||
..
|
||||
} if existing == call_id
|
||||
)
|
||||
},
|
||||
);
|
||||
remove_first_matching(items, |other| {
|
||||
matches!(
|
||||
other,
|
||||
ResponseItem::ToolSearchCall {
|
||||
call_id: Some(existing),
|
||||
..
|
||||
} if existing == call_id
|
||||
)
|
||||
});
|
||||
}
|
||||
ResponseItem::CustomToolCall { call_id, .. } => {
|
||||
remove_first_matching(items, |i| {
|
||||
remove_first_matching(items, |other| {
|
||||
matches!(
|
||||
i,
|
||||
other,
|
||||
ResponseItem::CustomToolCallOutput {
|
||||
call_id: existing, ..
|
||||
} if existing == call_id
|
||||
@@ -259,18 +267,23 @@ pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &Res
|
||||
});
|
||||
}
|
||||
ResponseItem::CustomToolCallOutput { call_id, .. } => {
|
||||
remove_first_matching(
|
||||
items,
|
||||
|i| matches!(i, ResponseItem::CustomToolCall { call_id: existing, .. } if existing == call_id),
|
||||
);
|
||||
remove_first_matching(items, |other| {
|
||||
matches!(
|
||||
other,
|
||||
ResponseItem::CustomToolCall {
|
||||
call_id: existing,
|
||||
..
|
||||
} if existing == call_id
|
||||
)
|
||||
});
|
||||
}
|
||||
ResponseItem::LocalShellCall {
|
||||
call_id: Some(call_id),
|
||||
..
|
||||
} => {
|
||||
remove_first_matching(items, |i| {
|
||||
remove_first_matching(items, |other| {
|
||||
matches!(
|
||||
i,
|
||||
other,
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: existing, ..
|
||||
} if existing == call_id
|
||||
@@ -281,23 +294,15 @@ pub(crate) fn remove_corresponding_for(items: &mut Vec<ResponseItem>, item: &Res
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_first_matching<F>(items: &mut Vec<ResponseItem>, predicate: F)
|
||||
where
|
||||
F: Fn(&ResponseItem) -> bool,
|
||||
{
|
||||
if let Some(pos) = items.iter().position(predicate) {
|
||||
items.remove(pos);
|
||||
}
|
||||
}
|
||||
|
||||
/// Strip image content from messages and tool outputs when the model does not support images.
|
||||
/// When `input_modalities` contains `InputModality::Image`, no stripping is performed.
|
||||
pub(crate) fn strip_images_when_unsupported(
|
||||
/// Strips image inputs from messages and tool outputs when the model does not support images.
|
||||
///
|
||||
/// Image-generation call results are cleared in the same mode so text-only prompts do not carry
|
||||
/// image payloads forward.
|
||||
pub fn strip_images_when_unsupported(
|
||||
input_modalities: &[InputModality],
|
||||
items: &mut [ResponseItem],
|
||||
) {
|
||||
let supports_images = input_modalities.contains(&InputModality::Image);
|
||||
if supports_images {
|
||||
if input_modalities.contains(&InputModality::Image) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -343,3 +348,20 @@ pub(crate) fn strip_images_when_unsupported(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_first_matching<F>(items: &mut Vec<ResponseItem>, predicate: F)
|
||||
where
|
||||
F: Fn(&ResponseItem) -> bool,
|
||||
{
|
||||
if let Some(position) = items.iter().position(predicate) {
|
||||
items.remove(position);
|
||||
}
|
||||
}
|
||||
|
||||
fn report_invariant_violation(message: String) {
|
||||
if cfg!(debug_assertions) {
|
||||
panic!("{message}");
|
||||
} else {
|
||||
error!("{message}");
|
||||
}
|
||||
}
|
||||
286
codex-rs/journal/src/history/tests.rs
Normal file
286
codex-rs/journal/src/history/tests.rs
Normal file
@@ -0,0 +1,286 @@
|
||||
use super::ensure_call_outputs_present;
|
||||
use super::estimate_response_item_model_visible_bytes;
|
||||
use super::is_user_turn_boundary;
|
||||
use super::remove_corresponding_for;
|
||||
use super::replace_last_turn_images;
|
||||
use super::strip_images_when_unsupported;
|
||||
use super::truncate_history_item;
|
||||
use codex_protocol::AgentPath;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::DEFAULT_IMAGE_DETAIL;
|
||||
use codex_protocol::models::FunctionCallOutputContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::openai_models::InputModality;
|
||||
use codex_protocol::protocol::InterAgentCommunication;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn user_message(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn assistant_message(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ensure_call_outputs_present_inserts_missing_function_output_after_call() {
|
||||
let mut items = vec![ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
name: "shell".to_string(),
|
||||
namespace: None,
|
||||
arguments: "{}".to_string(),
|
||||
call_id: "call-1".to_string(),
|
||||
}];
|
||||
|
||||
ensure_call_outputs_present(&mut items);
|
||||
|
||||
assert_eq!(
|
||||
items,
|
||||
vec![
|
||||
ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
name: "shell".to_string(),
|
||||
namespace: None,
|
||||
arguments: "{}".to_string(),
|
||||
call_id: "call-1".to_string(),
|
||||
},
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: "call-1".to_string(),
|
||||
output: FunctionCallOutputPayload::from_text("aborted".to_string()),
|
||||
},
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn remove_corresponding_for_removes_matching_tool_output() {
|
||||
let removed = ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
name: "shell".to_string(),
|
||||
namespace: None,
|
||||
arguments: "{}".to_string(),
|
||||
call_id: "call-1".to_string(),
|
||||
};
|
||||
let mut items = vec![
|
||||
removed.clone(),
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: "call-1".to_string(),
|
||||
output: FunctionCallOutputPayload::from_text("done".to_string()),
|
||||
},
|
||||
];
|
||||
|
||||
items.remove(0);
|
||||
remove_corresponding_for(&mut items, &removed);
|
||||
|
||||
assert_eq!(items, Vec::new());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn strip_images_when_unsupported_rewrites_messages_and_tool_outputs() {
|
||||
let mut items = vec![
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![
|
||||
ContentItem::InputText {
|
||||
text: "look".to_string(),
|
||||
},
|
||||
ContentItem::InputImage {
|
||||
image_url: "https://example.com/img.png".to_string(),
|
||||
detail: Some(DEFAULT_IMAGE_DETAIL),
|
||||
},
|
||||
],
|
||||
phase: None,
|
||||
},
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: "call-1".to_string(),
|
||||
output: FunctionCallOutputPayload::from_content_items(vec![
|
||||
FunctionCallOutputContentItem::InputImage {
|
||||
image_url: "https://example.com/tool.png".to_string(),
|
||||
detail: Some(DEFAULT_IMAGE_DETAIL),
|
||||
},
|
||||
]),
|
||||
},
|
||||
ResponseItem::ImageGenerationCall {
|
||||
id: "ig-1".to_string(),
|
||||
status: "completed".to_string(),
|
||||
revised_prompt: None,
|
||||
result: "Zm9v".to_string(),
|
||||
},
|
||||
];
|
||||
|
||||
strip_images_when_unsupported(&[InputModality::Text], &mut items);
|
||||
|
||||
assert_eq!(
|
||||
items,
|
||||
vec![
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![
|
||||
ContentItem::InputText {
|
||||
text: "look".to_string(),
|
||||
},
|
||||
ContentItem::InputText {
|
||||
text: "image content omitted because you do not support image input"
|
||||
.to_string(),
|
||||
},
|
||||
],
|
||||
phase: None,
|
||||
},
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: "call-1".to_string(),
|
||||
output: FunctionCallOutputPayload::from_content_items(vec![
|
||||
FunctionCallOutputContentItem::InputText {
|
||||
text: "image content omitted because you do not support image input"
|
||||
.to_string(),
|
||||
},
|
||||
]),
|
||||
},
|
||||
ResponseItem::ImageGenerationCall {
|
||||
id: "ig-1".to_string(),
|
||||
status: "completed".to_string(),
|
||||
revised_prompt: None,
|
||||
result: String::new(),
|
||||
},
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn inter_agent_assistant_messages_are_turn_boundaries() {
|
||||
let communication = InterAgentCommunication::new(
|
||||
AgentPath::root(),
|
||||
AgentPath::root()
|
||||
.join("worker")
|
||||
.expect("sub-agent path should be valid"),
|
||||
Vec::new(),
|
||||
"continue".to_string(),
|
||||
/*trigger_turn*/ true,
|
||||
);
|
||||
let item = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: serde_json::to_string(&communication).expect("message should serialize"),
|
||||
}],
|
||||
phase: None,
|
||||
};
|
||||
|
||||
assert!(is_user_turn_boundary(&item));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn estimate_response_item_model_visible_bytes_discounts_inline_image_data_urls() {
|
||||
let payload = "a".repeat(20_000);
|
||||
let image_item = ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputImage {
|
||||
image_url: format!("data:image/png;base64,{payload}"),
|
||||
detail: Some(DEFAULT_IMAGE_DETAIL),
|
||||
}],
|
||||
phase: None,
|
||||
};
|
||||
|
||||
let estimated = estimate_response_item_model_visible_bytes(&image_item);
|
||||
let raw = i64::try_from(
|
||||
serde_json::to_string(&image_item)
|
||||
.expect("item should serialize")
|
||||
.len(),
|
||||
)
|
||||
.expect("raw length should fit");
|
||||
|
||||
assert!(estimated > 0);
|
||||
assert!(estimated < raw);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncate_history_item_truncates_tool_outputs_but_not_messages() {
|
||||
let output = "x".repeat(20_000);
|
||||
let function_output = ResponseItem::FunctionCallOutput {
|
||||
call_id: "call-1".to_string(),
|
||||
output: FunctionCallOutputPayload::from_text(output),
|
||||
};
|
||||
let user = user_message("hello");
|
||||
|
||||
let truncated = truncate_history_item(
|
||||
&function_output,
|
||||
TruncationPolicy::Tokens(/*max_tokens*/ 64),
|
||||
);
|
||||
|
||||
assert_ne!(truncated, function_output);
|
||||
assert_eq!(
|
||||
truncate_history_item(&user, TruncationPolicy::Tokens(/*max_tokens*/ 64)),
|
||||
user
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn replace_last_turn_images_only_rewrites_latest_tool_output() {
|
||||
let mut items = vec![
|
||||
assistant_message("already done"),
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: "call-1".to_string(),
|
||||
output: FunctionCallOutputPayload::from_content_items(vec![
|
||||
FunctionCallOutputContentItem::InputImage {
|
||||
image_url: "https://example.com/older.png".to_string(),
|
||||
detail: Some(DEFAULT_IMAGE_DETAIL),
|
||||
},
|
||||
]),
|
||||
},
|
||||
user_message("new turn"),
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: "call-2".to_string(),
|
||||
output: FunctionCallOutputPayload::from_content_items(vec![
|
||||
FunctionCallOutputContentItem::InputImage {
|
||||
image_url: "https://example.com/newer.png".to_string(),
|
||||
detail: Some(DEFAULT_IMAGE_DETAIL),
|
||||
},
|
||||
]),
|
||||
},
|
||||
];
|
||||
|
||||
let replaced = replace_last_turn_images(&mut items, "omitted");
|
||||
|
||||
assert!(replaced);
|
||||
assert_eq!(
|
||||
items[1],
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: "call-1".to_string(),
|
||||
output: FunctionCallOutputPayload::from_content_items(vec![
|
||||
FunctionCallOutputContentItem::InputImage {
|
||||
image_url: "https://example.com/older.png".to_string(),
|
||||
detail: Some(DEFAULT_IMAGE_DETAIL),
|
||||
},
|
||||
]),
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
items[3],
|
||||
ResponseItem::FunctionCallOutput {
|
||||
call_id: "call-2".to_string(),
|
||||
output: FunctionCallOutputPayload::from_content_items(vec![
|
||||
FunctionCallOutputContentItem::InputText {
|
||||
text: "omitted".to_string(),
|
||||
},
|
||||
]),
|
||||
}
|
||||
);
|
||||
}
|
||||
92
codex-rs/journal/src/history/transform.rs
Normal file
92
codex-rs/journal/src/history/transform.rs
Normal file
@@ -0,0 +1,92 @@
|
||||
use crate::history::is_user_turn_boundary;
|
||||
use codex_protocol::models::FunctionCallOutputBody;
|
||||
use codex_protocol::models::FunctionCallOutputContentItem;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::truncate_function_output_items_with_policy;
|
||||
use codex_utils_output_truncation::truncate_text;
|
||||
|
||||
/// Truncates one history item before it is recorded in the in-memory journal.
|
||||
///
|
||||
/// Tool outputs receive a small serialization headroom multiplier so the JSON wrapper bytes do not
|
||||
/// cause unexpected overages after truncation.
|
||||
pub fn truncate_history_item(item: &ResponseItem, policy: TruncationPolicy) -> ResponseItem {
|
||||
let policy_with_serialization_budget = policy * 1.2;
|
||||
match item {
|
||||
ResponseItem::FunctionCallOutput { call_id, output } => ResponseItem::FunctionCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
output: truncate_function_output_payload(output, policy_with_serialization_budget),
|
||||
},
|
||||
ResponseItem::CustomToolCallOutput {
|
||||
call_id,
|
||||
name,
|
||||
output,
|
||||
} => ResponseItem::CustomToolCallOutput {
|
||||
call_id: call_id.clone(),
|
||||
name: name.clone(),
|
||||
output: truncate_function_output_payload(output, policy_with_serialization_budget),
|
||||
},
|
||||
ResponseItem::Message { .. }
|
||||
| ResponseItem::Reasoning { .. }
|
||||
| ResponseItem::LocalShellCall { .. }
|
||||
| ResponseItem::FunctionCall { .. }
|
||||
| ResponseItem::ToolSearchCall { .. }
|
||||
| ResponseItem::ToolSearchOutput { .. }
|
||||
| ResponseItem::WebSearchCall { .. }
|
||||
| ResponseItem::ImageGenerationCall { .. }
|
||||
| ResponseItem::CustomToolCall { .. }
|
||||
| ResponseItem::Compaction { .. }
|
||||
| ResponseItem::Other => item.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Replaces image content in the last tool-output item of the current turn.
|
||||
pub fn replace_last_turn_images(items: &mut [ResponseItem], placeholder: &str) -> bool {
|
||||
let Some(index) = items.iter().rposition(|item| {
|
||||
matches!(item, ResponseItem::FunctionCallOutput { .. }) || is_user_turn_boundary(item)
|
||||
}) else {
|
||||
return false;
|
||||
};
|
||||
|
||||
match &mut items[index] {
|
||||
ResponseItem::FunctionCallOutput { output, .. } => {
|
||||
let Some(content_items) = output.content_items_mut() else {
|
||||
return false;
|
||||
};
|
||||
|
||||
let mut replaced = false;
|
||||
let placeholder = placeholder.to_string();
|
||||
for item in content_items.iter_mut() {
|
||||
if matches!(item, FunctionCallOutputContentItem::InputImage { .. }) {
|
||||
*item = FunctionCallOutputContentItem::InputText {
|
||||
text: placeholder.clone(),
|
||||
};
|
||||
replaced = true;
|
||||
}
|
||||
}
|
||||
replaced
|
||||
}
|
||||
ResponseItem::Message { .. } => false,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate_function_output_payload(
|
||||
output: &FunctionCallOutputPayload,
|
||||
policy: TruncationPolicy,
|
||||
) -> FunctionCallOutputPayload {
|
||||
let body = match &output.body {
|
||||
FunctionCallOutputBody::Text(content) => {
|
||||
FunctionCallOutputBody::Text(truncate_text(content, policy))
|
||||
}
|
||||
FunctionCallOutputBody::ContentItems(items) => FunctionCallOutputBody::ContentItems(
|
||||
truncate_function_output_items_with_policy(items, policy),
|
||||
),
|
||||
};
|
||||
|
||||
FunctionCallOutputPayload {
|
||||
body,
|
||||
success: output.success,
|
||||
}
|
||||
}
|
||||
545
codex-rs/journal/src/journal.rs
Normal file
545
codex-rs/journal/src/journal.rs
Normal file
@@ -0,0 +1,545 @@
|
||||
use crate::JournalEntry;
|
||||
use crate::JournalError;
|
||||
use crate::JournalItem;
|
||||
use crate::JournalKey;
|
||||
use crate::KeyFilter;
|
||||
use crate::MetadataEntryBuilder;
|
||||
use crate::PromptView;
|
||||
use crate::Result;
|
||||
use codex_protocol::journal::JournalCheckpointItem;
|
||||
use codex_protocol::journal::JournalContextAudience;
|
||||
use codex_protocol::journal::JournalContextForkBehavior;
|
||||
use codex_protocol::journal::JournalHistoryCursor;
|
||||
use codex_protocol::journal::JournalMetadataItem;
|
||||
use codex_protocol::journal::JournalReplacePrefixCheckpoint;
|
||||
use codex_protocol::journal::JournalTranscriptItem;
|
||||
use codex_protocol::journal::JournalTruncateHistoryCheckpoint;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use indexmap::IndexMap;
|
||||
use std::fs::File;
|
||||
use std::io::BufRead;
|
||||
use std::io::BufReader;
|
||||
use std::io::BufWriter;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
|
||||
/// Canonical typed journal.
|
||||
#[derive(Debug, Clone, Default, PartialEq)]
|
||||
pub struct Journal {
|
||||
entries: Vec<JournalEntry>,
|
||||
}
|
||||
|
||||
impl Journal {
|
||||
/// Creates an empty journal.
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
/// Creates a journal from an explicit list of entries.
|
||||
pub fn from_entries(entries: Vec<JournalEntry>) -> Self {
|
||||
Self { entries }
|
||||
}
|
||||
|
||||
/// Appends one keyed item to the journal.
|
||||
pub fn add<K, T>(&mut self, key: K, item: T)
|
||||
where
|
||||
K: Into<JournalKey>,
|
||||
T: Into<JournalItem>,
|
||||
{
|
||||
self.entries.push(JournalEntry::new(key, item));
|
||||
}
|
||||
|
||||
/// Appends several keyed journal entries.
|
||||
pub fn extend<I, T>(&mut self, entries: I)
|
||||
where
|
||||
I: IntoIterator<Item = T>,
|
||||
T: Into<JournalEntry>,
|
||||
{
|
||||
self.entries.extend(entries.into_iter().map(Into::into));
|
||||
}
|
||||
|
||||
/// Returns the raw append-only journal entries.
|
||||
pub fn entries(&self) -> &[JournalEntry] {
|
||||
&self.entries
|
||||
}
|
||||
|
||||
/// Starts building one prompt-metadata entry.
|
||||
pub fn metadata_entry_builder(
|
||||
key: impl Into<JournalKey>,
|
||||
message: impl Into<crate::PromptMessage>,
|
||||
) -> MetadataEntryBuilder {
|
||||
MetadataEntryBuilder::new(key, message)
|
||||
}
|
||||
|
||||
/// Builds one prompt-metadata entry if the message is non-empty after trimming text content.
|
||||
pub fn metadata_entry(
|
||||
key: impl Into<JournalKey>,
|
||||
prompt_order: i64,
|
||||
message: impl Into<crate::PromptMessage>,
|
||||
) -> Option<JournalEntry> {
|
||||
Self::metadata_entry_builder(key, message)
|
||||
.prompt_order(prompt_order)
|
||||
.build()
|
||||
}
|
||||
|
||||
pub fn context_entry_builder(
|
||||
key: impl Into<JournalKey>,
|
||||
message: impl Into<crate::PromptMessage>,
|
||||
) -> MetadataEntryBuilder {
|
||||
Self::metadata_entry_builder(key, message)
|
||||
}
|
||||
|
||||
pub fn context_entry(
|
||||
key: impl Into<JournalKey>,
|
||||
prompt_order: i64,
|
||||
message: impl Into<crate::PromptMessage>,
|
||||
) -> Option<JournalEntry> {
|
||||
Self::metadata_entry(key, prompt_order, message)
|
||||
}
|
||||
|
||||
/// Returns a journal containing only journal entries whose keys match the filter.
|
||||
pub fn filter(&self, filter: &KeyFilter) -> Self {
|
||||
let entries = self
|
||||
.entries
|
||||
.iter()
|
||||
.filter(|entry| filter.matches(&entry.key))
|
||||
.cloned()
|
||||
.collect();
|
||||
Self::from_entries(entries)
|
||||
}
|
||||
|
||||
/// Returns the number of journal entries.
|
||||
pub fn len(&self) -> usize {
|
||||
self.entries.len()
|
||||
}
|
||||
|
||||
/// Returns whether the journal is empty.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.entries.is_empty()
|
||||
}
|
||||
|
||||
/// Renders the current effective journal view into model prompt items.
|
||||
pub fn to_prompt(&self, view: &PromptView) -> Result<Vec<ResponseItem>> {
|
||||
self.to_prompt_matching_filter(view, None)
|
||||
}
|
||||
|
||||
/// Renders the current effective journal view into model prompt items after selecting only
|
||||
/// journal entries whose keys match the filter.
|
||||
pub fn to_prompt_with_filter(
|
||||
&self,
|
||||
view: &PromptView,
|
||||
filter: &KeyFilter,
|
||||
) -> Result<Vec<ResponseItem>> {
|
||||
self.to_prompt_matching_filter(view, Some(filter))
|
||||
}
|
||||
|
||||
/// Produces a flattened child state for the provided view.
|
||||
pub fn fork(&self, view: &PromptView) -> Result<Self> {
|
||||
self.fork_matching_filter(view, None)
|
||||
}
|
||||
|
||||
/// Produces a flattened child state for the provided view after selecting only
|
||||
/// journal entries whose keys match the filter.
|
||||
pub fn fork_with_filter(&self, view: &PromptView, filter: &KeyFilter) -> Result<Self> {
|
||||
self.fork_matching_filter(view, Some(filter))
|
||||
}
|
||||
|
||||
/// Drops obsolete journal entries and keeps only the current effective journal view.
|
||||
///
|
||||
/// This is the first building block for a rolling in-memory window: callers can
|
||||
/// persist the full journal elsewhere, then keep only the flattened journal hot.
|
||||
pub fn flatten(&self) -> Result<Self> {
|
||||
let resolved = self.resolve()?;
|
||||
Ok(Self::from_entries(resolved.into_entries()))
|
||||
}
|
||||
|
||||
/// Keeps only the current effective journal view plus the history suffix that starts
|
||||
/// at the resolved cursor.
|
||||
///
|
||||
/// This is a lightweight rolling-window helper: callers can persist the full
|
||||
/// journal on disk, then keep only a recent hot suffix in memory.
|
||||
pub fn with_history_window(&self, start: &JournalHistoryCursor) -> Result<Self> {
|
||||
let resolved = self.resolve()?;
|
||||
let start_index = resolve_cursor(resolved.transcript().entries(), start)?;
|
||||
let ResolvedJournal {
|
||||
metadata,
|
||||
transcript,
|
||||
} = resolved;
|
||||
Ok(Self::from_entries(
|
||||
metadata
|
||||
.into_iter()
|
||||
.chain(transcript.into_iter().skip(start_index))
|
||||
.collect(),
|
||||
))
|
||||
}
|
||||
|
||||
/// Persists the raw journal to a JSONL file, one `JournalEntry` per line.
|
||||
pub fn persist_jsonl(&self, path: &Path) -> Result<()> {
|
||||
let file = File::create(path)?;
|
||||
let mut writer = BufWriter::new(file);
|
||||
for entry in &self.entries {
|
||||
serde_json::to_writer(&mut writer, entry)?;
|
||||
writer.write_all(b"\n")?;
|
||||
}
|
||||
writer.flush()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Loads a raw journal from a JSONL file written by [`Self::persist_jsonl`].
|
||||
pub fn load_jsonl(path: &Path) -> Result<Self> {
|
||||
let file = File::open(path)?;
|
||||
let reader = BufReader::new(file);
|
||||
let mut entries = Vec::new();
|
||||
for (line_index, line) in reader.lines().enumerate() {
|
||||
let line = line?;
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let entry = serde_json::from_str::<JournalEntry>(&line).map_err(|source| {
|
||||
JournalError::ParseJson {
|
||||
line_number: line_index + 1,
|
||||
source,
|
||||
}
|
||||
})?;
|
||||
entries.push(entry);
|
||||
}
|
||||
Ok(Self::from_entries(entries))
|
||||
}
|
||||
|
||||
/// Resolves the current effective journal into prompt metadata and transcript views.
///
/// Equivalent to [`Self::resolve_with_filter`] with no key filter applied.
pub fn resolve(&self) -> Result<ResolvedJournal> {
    self.resolve_filter(None)
}
|
||||
|
||||
/// Resolves the current effective journal after selecting only entries whose keys match the
/// filter.
pub fn resolve_with_filter(&self, filter: &KeyFilter) -> Result<ResolvedJournal> {
    self.resolve_filter(Some(filter))
}
|
||||
|
||||
fn to_prompt_matching_filter(
|
||||
&self,
|
||||
view: &PromptView,
|
||||
filter: Option<&KeyFilter>,
|
||||
) -> Result<Vec<ResponseItem>> {
|
||||
let resolved = self.resolve_filter(filter)?;
|
||||
let mut prompt: Vec<ResponseItem> = resolved
|
||||
.metadata
|
||||
.into_iter()
|
||||
.filter(|entry| metadata_visible_in_view(metadata_item(entry), view))
|
||||
.map(|entry| match entry.item {
|
||||
JournalItem::Metadata(item) => ResponseItem::from(item),
|
||||
_ => unreachable!("resolved metadata entries must be metadata items"),
|
||||
})
|
||||
.collect();
|
||||
prompt.extend(
|
||||
resolved
|
||||
.transcript
|
||||
.into_iter()
|
||||
.map(|entry| match entry.item {
|
||||
JournalItem::Transcript(item) => ResponseItem::from(item),
|
||||
_ => unreachable!("resolved transcript entries must be transcript items"),
|
||||
}),
|
||||
);
|
||||
Ok(prompt)
|
||||
}
|
||||
|
||||
fn fork_matching_filter(&self, view: &PromptView, filter: Option<&KeyFilter>) -> Result<Self> {
|
||||
let resolved = self.resolve_filter(filter)?;
|
||||
Ok(Self::from_entries(
|
||||
resolved
|
||||
.metadata
|
||||
.into_iter()
|
||||
.filter(|entry| metadata_item(entry).on_fork == JournalContextForkBehavior::Keep)
|
||||
.filter(|entry| metadata_visible_in_view(metadata_item(entry), view))
|
||||
.chain(resolved.transcript)
|
||||
.collect(),
|
||||
))
|
||||
}
|
||||
|
||||
/// Splits the raw entry sequence into effective metadata and transcript views.
///
/// - Transcript entries accumulate in order; checkpoints rewrite the
///   transcript collected so far.
/// - Metadata entries are deduplicated by key (last write wins), then sorted
///   by `prompt_order` with the original append index as a tiebreaker.
fn resolve_filter(&self, filter: Option<&KeyFilter>) -> Result<ResolvedJournal> {
    let mut transcript = Vec::<JournalEntry>::new();
    let mut latest_metadata_by_key = IndexMap::<JournalKey, (usize, JournalEntry)>::new();

    for (index, entry) in self.entries.iter().enumerate() {
        // Filtering happens before resolution: a filtered-out checkpoint is
        // never applied.
        if let Some(filter) = filter
            && !filter.matches(&entry.key)
        {
            continue;
        }
        match &entry.item {
            JournalItem::Transcript(_) => transcript.push(entry.clone()),
            JournalItem::Metadata(_) => {
                // Later writes to the same key replace earlier ones; the
                // append index is kept for the sort tiebreak below.
                latest_metadata_by_key.insert(entry.key.clone(), (index, entry.clone()));
            }
            JournalItem::Checkpoint(checkpoint) => {
                apply_checkpoint(&mut transcript, &entry.key, checkpoint)?;
            }
        }
    }

    let mut metadata = latest_metadata_by_key.into_values().collect::<Vec<_>>();
    // Stable sort: equal prompt_order values fall back to append order.
    metadata.sort_by(|(left_index, left_entry), (right_index, right_entry)| {
        metadata_item(left_entry)
            .prompt_order
            .cmp(&metadata_item(right_entry).prompt_order)
            .then_with(|| left_index.cmp(right_index))
    });

    Ok(ResolvedJournal::new(
        metadata.into_iter().map(|(_, entry)| entry).collect(),
        transcript,
    ))
}
|
||||
}
|
||||
|
||||
/// Effective journal view after deduplicating prompt context and applying history checkpoints.
///
/// `contexts` and `history` are derived from the same append-only source of truth, but they have
/// different semantics:
///
/// - [`ResolvedMetadata`] contains keyed prompt-metadata entries. Later entries with the same key
///   replace earlier ones and the result is ordered by `prompt_order`.
/// - [`ResolvedTranscript`] contains ordered transcript entries after checkpoint application.
///   Transcript preserves order and can be truncated or rewritten by checkpoints.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedJournal {
    // Deduplicated, `prompt_order`-sorted metadata entries.
    metadata: ResolvedMetadata,
    // Ordered transcript entries after checkpoint application.
    transcript: ResolvedTranscript,
}
|
||||
|
||||
impl ResolvedJournal {
|
||||
fn new(metadata: Vec<JournalEntry>, transcript: Vec<JournalEntry>) -> Self {
|
||||
Self {
|
||||
metadata: ResolvedMetadata::new(metadata),
|
||||
transcript: ResolvedTranscript::new(transcript),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the effective prompt-metadata view.
|
||||
pub fn metadata(&self) -> &ResolvedMetadata {
|
||||
&self.metadata
|
||||
}
|
||||
|
||||
/// Returns the effective transcript view.
|
||||
pub fn transcript(&self) -> &ResolvedTranscript {
|
||||
&self.transcript
|
||||
}
|
||||
|
||||
/// Returns the effective prompt-metadata view.
|
||||
pub fn contexts(&self) -> &ResolvedMetadata {
|
||||
self.metadata()
|
||||
}
|
||||
|
||||
/// Returns the effective transcript view.
|
||||
pub fn history(&self) -> &ResolvedTranscript {
|
||||
self.transcript()
|
||||
}
|
||||
|
||||
/// Consumes the resolved view and returns only the prompt-metadata entries.
|
||||
pub fn into_metadata(self) -> ResolvedMetadata {
|
||||
self.metadata
|
||||
}
|
||||
|
||||
/// Consumes the resolved view and returns only the transcript entries.
|
||||
pub fn into_transcript(self) -> ResolvedTranscript {
|
||||
self.transcript
|
||||
}
|
||||
|
||||
pub fn into_contexts(self) -> ResolvedMetadata {
|
||||
self.into_metadata()
|
||||
}
|
||||
|
||||
pub fn into_history(self) -> ResolvedTranscript {
|
||||
self.into_transcript()
|
||||
}
|
||||
|
||||
/// Consumes the resolved view and concatenates metadata followed by transcript entries.
|
||||
pub fn into_entries(self) -> Vec<JournalEntry> {
|
||||
self.metadata
|
||||
.into_entries()
|
||||
.into_iter()
|
||||
.chain(self.transcript.into_entries())
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
|
||||
/// Effective prompt-metadata entries derived from a journal.
///
/// These entries are deduplicated by key and sorted for prompt rendering. They are suitable for
/// rendering or for building flattened and forked journal states.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedMetadata {
    // Invariant: every entry's item is `JournalItem::Metadata`.
    entries: Vec<JournalEntry>,
}
|
||||
|
||||
impl ResolvedMetadata {
|
||||
fn new(entries: Vec<JournalEntry>) -> Self {
|
||||
Self { entries }
|
||||
}
|
||||
|
||||
/// Returns the resolved prompt-metadata entries.
|
||||
pub fn entries(&self) -> &[JournalEntry] {
|
||||
&self.entries
|
||||
}
|
||||
|
||||
/// Returns whether there are no resolved prompt-metadata entries.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.entries.is_empty()
|
||||
}
|
||||
|
||||
/// Returns the number of resolved prompt-metadata entries.
|
||||
pub fn len(&self) -> usize {
|
||||
self.entries.len()
|
||||
}
|
||||
|
||||
/// Consumes the view and returns its entries.
|
||||
pub fn into_entries(self) -> Vec<JournalEntry> {
|
||||
self.entries
|
||||
}
|
||||
}
|
||||
|
||||
/// Yields owned resolved metadata entries in prompt order.
impl IntoIterator for ResolvedMetadata {
    type Item = JournalEntry;
    type IntoIter = std::vec::IntoIter<JournalEntry>;

    fn into_iter(self) -> Self::IntoIter {
        self.entries.into_iter()
    }
}
|
||||
|
||||
/// Effective transcript entries derived from a journal.
///
/// These entries preserve prompt-visible order after checkpoint application. Unlike prompt
/// metadata, transcript is not deduplicated by key.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedTranscript {
    // Invariant: every entry's item is `JournalItem::Transcript`.
    entries: Vec<JournalEntry>,
}
|
||||
|
||||
impl ResolvedTranscript {
|
||||
fn new(entries: Vec<JournalEntry>) -> Self {
|
||||
Self { entries }
|
||||
}
|
||||
|
||||
/// Returns the resolved transcript entries.
|
||||
pub fn entries(&self) -> &[JournalEntry] {
|
||||
&self.entries
|
||||
}
|
||||
|
||||
/// Returns whether there are no resolved transcript entries.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.entries.is_empty()
|
||||
}
|
||||
|
||||
/// Returns the number of resolved transcript entries.
|
||||
pub fn len(&self) -> usize {
|
||||
self.entries.len()
|
||||
}
|
||||
|
||||
/// Consumes the view and returns its entries.
|
||||
pub fn into_entries(self) -> Vec<JournalEntry> {
|
||||
self.entries
|
||||
}
|
||||
}
|
||||
|
||||
/// Yields owned resolved transcript entries in prompt-visible order.
impl IntoIterator for ResolvedTranscript {
    type Item = JournalEntry;
    type IntoIter = std::vec::IntoIter<JournalEntry>;

    fn into_iter(self) -> Self::IntoIter {
        self.entries.into_iter()
    }
}
|
||||
|
||||
fn apply_checkpoint(
|
||||
transcript: &mut Vec<JournalEntry>,
|
||||
checkpoint_key: &JournalKey,
|
||||
checkpoint: &JournalCheckpointItem,
|
||||
) -> Result<()> {
|
||||
match checkpoint {
|
||||
JournalCheckpointItem::ReplacePrefix(JournalReplacePrefixCheckpoint {
|
||||
through,
|
||||
replacement,
|
||||
}) => {
|
||||
let keep_from = resolve_cursor(transcript.as_slice(), through)?;
|
||||
let mut next_transcript =
|
||||
Vec::with_capacity(replacement.len() + transcript.len().saturating_sub(keep_from));
|
||||
next_transcript.extend(
|
||||
replacement
|
||||
.iter()
|
||||
.cloned()
|
||||
.enumerate()
|
||||
.map(|(index, item)| {
|
||||
JournalEntry::new(
|
||||
replacement_transcript_key(checkpoint_key, index, &item),
|
||||
item,
|
||||
)
|
||||
}),
|
||||
);
|
||||
next_transcript.extend(transcript[keep_from..].iter().cloned());
|
||||
*transcript = next_transcript;
|
||||
Ok(())
|
||||
}
|
||||
JournalCheckpointItem::TruncateHistory(JournalTruncateHistoryCheckpoint { through }) => {
|
||||
let keep_len = resolve_cursor(transcript.as_slice(), through)?;
|
||||
transcript.truncate(keep_len);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve_cursor(transcript: &[JournalEntry], cursor: &JournalHistoryCursor) -> Result<usize> {
|
||||
match cursor {
|
||||
JournalHistoryCursor::Start => Ok(0),
|
||||
JournalHistoryCursor::End => Ok(transcript.len()),
|
||||
JournalHistoryCursor::AfterItem(history_item_id) => transcript
|
||||
.iter()
|
||||
.position(|entry| transcript_item(entry).id == *history_item_id)
|
||||
.map(|index| index + 1)
|
||||
.ok_or_else(|| JournalError::UnknownHistoryItemId {
|
||||
history_item_id: history_item_id.clone(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn replacement_transcript_key(
|
||||
checkpoint_key: &JournalKey,
|
||||
index: usize,
|
||||
transcript_item: &JournalTranscriptItem,
|
||||
) -> JournalKey {
|
||||
checkpoint_key
|
||||
.child("replacement")
|
||||
.child(index.to_string())
|
||||
.child(transcript_item.id.clone())
|
||||
}
|
||||
|
||||
fn metadata_item(entry: &JournalEntry) -> &JournalMetadataItem {
|
||||
match &entry.item {
|
||||
JournalItem::Metadata(item) => item,
|
||||
_ => unreachable!("resolved metadata entries must be metadata items"),
|
||||
}
|
||||
}
|
||||
|
||||
fn transcript_item(entry: &JournalEntry) -> &JournalTranscriptItem {
|
||||
match &entry.item {
|
||||
JournalItem::Transcript(item) => item,
|
||||
_ => unreachable!("resolved transcript entries must be transcript items"),
|
||||
}
|
||||
}
|
||||
|
||||
fn metadata_visible_in_view(item: &JournalMetadataItem, view: &PromptView) -> bool {
|
||||
match &item.audience {
|
||||
JournalContextAudience::All => true,
|
||||
JournalContextAudience::RootOnly => view.is_root,
|
||||
JournalContextAudience::SubAgentsOnly => !view.is_root,
|
||||
JournalContextAudience::AgentPathPrefix(prefix) => view
|
||||
.agent_path
|
||||
.as_deref()
|
||||
.is_some_and(|agent_path| agent_path.starts_with(prefix)),
|
||||
JournalContextAudience::AgentRole(role) => view
|
||||
.agent_role
|
||||
.as_deref()
|
||||
.is_some_and(|agent_role| agent_role == role),
|
||||
}
|
||||
}
|
||||
50
codex-rs/journal/src/lib.rs
Normal file
50
codex-rs/journal/src/lib.rs
Normal file
@@ -0,0 +1,50 @@
|
||||
//! Typed journal model for prompt rendering, filtering, forking, and persistence.
//!
//! A [`Journal`] stores one append-only sequence of [`JournalEntry`] values and resolves it into
//! two derived views:
//!
//! - prompt metadata: keyed, deduplicated entries ordered by `prompt_order`
//! - transcript: ordered prompt-visible items after checkpoint application
//!
//! Callers can resolve the journal once, then choose how to project those views into prompt
//! messages with [`PromptRenderer`].

mod context_builder;
mod error;
pub mod history;
mod journal;
mod prompt_view;
mod render;

#[cfg(test)]
mod tests;

// Re-export the protocol-level journal types so downstream crates can depend
// on this crate alone.
pub use codex_protocol::journal::JournalCheckpointItem;
pub use codex_protocol::journal::JournalContextAudience;
pub use codex_protocol::journal::JournalContextForkBehavior;
pub use codex_protocol::journal::JournalEntry;
pub use codex_protocol::journal::JournalHistoryCursor;
pub use codex_protocol::journal::JournalItem;
pub use codex_protocol::journal::JournalKey;
pub use codex_protocol::journal::JournalMetadataItem;
pub use codex_protocol::journal::JournalReplacePrefixCheckpoint;
pub use codex_protocol::journal::JournalTranscriptItem;
pub use codex_protocol::journal::JournalTruncateHistoryCheckpoint;
pub use codex_protocol::journal::KeyFilter;
pub use codex_protocol::journal::PromptMessage;
pub use codex_protocol::journal::PromptMessageRole;
pub use context_builder::MetadataEntryBuilder;
pub use error::JournalError;
pub use error::Result;
pub use journal::Journal;
pub use journal::ResolvedJournal;
pub use journal::ResolvedMetadata;
pub use journal::ResolvedTranscript;
pub use prompt_view::PromptView;
pub use render::PromptRenderer;

/// Alias for [`JournalMetadataItem`] under the alternate "context" naming.
pub type JournalContextItem = JournalMetadataItem;
/// Alias for [`JournalTranscriptItem`] under the alternate "history" naming.
pub type JournalHistoryItem = JournalTranscriptItem;
/// Alias for [`MetadataEntryBuilder`] under the alternate "context" naming.
pub type ContextEntryBuilder = MetadataEntryBuilder;
/// Alias for [`ResolvedMetadata`] under the alternate "context" naming.
pub type ResolvedContexts = ResolvedMetadata;
/// Alias for [`ResolvedTranscript`] under the alternate "history" naming.
pub type ResolvedHistory = ResolvedTranscript;
|
||||
33
codex-rs/journal/src/prompt_view.rs
Normal file
33
codex-rs/journal/src/prompt_view.rs
Normal file
@@ -0,0 +1,33 @@
|
||||
/// View configuration used when rendering or forking a journal.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PromptView {
    // True for the root agent; audience rules RootOnly/SubAgentsOnly key off this.
    pub(crate) is_root: bool,
    // Path used by `JournalContextAudience::AgentPathPrefix` matching.
    pub(crate) agent_path: Option<String>,
    // Role used by `JournalContextAudience::AgentRole` matching.
    pub(crate) agent_role: Option<String>,
}
|
||||
|
||||
impl PromptView {
|
||||
/// Returns the view for the root agent.
|
||||
pub fn root() -> Self {
|
||||
Self {
|
||||
is_root: true,
|
||||
agent_path: None,
|
||||
agent_role: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the view for a spawned subagent.
|
||||
pub fn subagent(agent_path: impl Into<String>, agent_role: Option<String>) -> Self {
|
||||
Self {
|
||||
is_root: false,
|
||||
agent_path: Some(agent_path.into()),
|
||||
agent_role,
|
||||
}
|
||||
}
|
||||
|
||||
/// Sets the agent role used for audience matching.
|
||||
pub fn with_agent_role(mut self, agent_role: impl Into<String>) -> Self {
|
||||
self.agent_role = Some(agent_role.into());
|
||||
self
|
||||
}
|
||||
}
|
||||
86
codex-rs/journal/src/render.rs
Normal file
86
codex-rs/journal/src/render.rs
Normal file
@@ -0,0 +1,86 @@
|
||||
use crate::JournalItem;
|
||||
use crate::KeyFilter;
|
||||
use crate::PromptMessage;
|
||||
use crate::PromptMessageRole;
|
||||
use crate::ResolvedMetadata;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
/// Renders resolved prompt metadata into model-visible prompt messages.
///
/// Group filters are applied in declaration order. Entries are merged only when they are
/// consecutive, match the same group filter, and share the same role.
#[derive(Debug, Clone, Default)]
pub struct PromptRenderer {
    // One filter per grouping rule, matched in declaration order.
    group_filters: Vec<KeyFilter>,
}
|
||||
|
||||
impl PromptRenderer {
    /// Creates a renderer with no grouping rules.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds one grouping rule for consecutive resolved metadata entries.
    pub fn group(mut self, filter: KeyFilter) -> Self {
        self.group_filters.push(filter);
        self
    }

    /// Renders resolved prompt metadata according to the configured grouping rules.
    ///
    /// Walks entries in resolved order, accumulating the content of
    /// consecutive entries that match the same group filter and share a
    /// role; the accumulated run is flushed as one message whenever the
    /// group, the role, or the grouping status changes.
    pub fn render_metadata(&self, metadata: &ResolvedMetadata) -> Vec<ResponseItem> {
        let mut rendered = Vec::new();
        // Index of the group filter the current run matched, if any.
        let mut current_group: Option<usize> = None;
        let mut current_role: Option<PromptMessageRole> = None;
        let mut current_content = Vec::new();

        for entry in metadata.entries() {
            let JournalItem::Metadata(item) = &entry.item else {
                continue;
            };
            // First matching filter wins (declaration order).
            let group = self
                .group_filters
                .iter()
                .position(|filter| filter.matches(&entry.key));

            if group.is_none() {
                // Ungrouped entries end any pending run and render standalone.
                flush_prompt_message(&mut rendered, &mut current_role, &mut current_content);
                rendered.push(ResponseItem::from(item.clone()));
                current_group = None;
                continue;
            }

            if current_group != group || current_role != Some(item.message.role) {
                // Group or role changed: emit the pending run, start a new one.
                flush_prompt_message(&mut rendered, &mut current_role, &mut current_content);
                current_group = group;
                current_role = Some(item.message.role);
            }

            current_content.extend(item.message.content.clone());
        }

        // Emit whatever run is still pending at the end.
        flush_prompt_message(&mut rendered, &mut current_role, &mut current_content);
        rendered
    }

    /// Alias for [`Self::render_metadata`] under the alternate "contexts" naming.
    pub fn render_contexts(&self, contexts: &ResolvedMetadata) -> Vec<ResponseItem> {
        self.render_metadata(contexts)
    }
}
|
||||
|
||||
fn flush_prompt_message(
|
||||
rendered: &mut Vec<ResponseItem>,
|
||||
role: &mut Option<PromptMessageRole>,
|
||||
content: &mut Vec<ContentItem>,
|
||||
) {
|
||||
let Some(role) = role.take() else {
|
||||
return;
|
||||
};
|
||||
if content.is_empty() {
|
||||
return;
|
||||
}
|
||||
rendered.push(ResponseItem::from(PromptMessage::new(
|
||||
role,
|
||||
std::mem::take(content),
|
||||
)));
|
||||
}
|
||||
516
codex-rs/journal/src/tests.rs
Normal file
516
codex-rs/journal/src/tests.rs
Normal file
@@ -0,0 +1,516 @@
|
||||
use crate::Journal;
|
||||
use crate::JournalCheckpointItem;
|
||||
use crate::JournalContextAudience;
|
||||
use crate::JournalContextForkBehavior;
|
||||
use crate::JournalEntry;
|
||||
use crate::JournalHistoryCursor;
|
||||
use crate::JournalMetadataItem;
|
||||
use crate::JournalReplacePrefixCheckpoint;
|
||||
use crate::JournalTranscriptItem;
|
||||
use crate::JournalTruncateHistoryCheckpoint;
|
||||
use crate::KeyFilter;
|
||||
use crate::PromptMessage;
|
||||
use crate::PromptRenderer;
|
||||
use crate::PromptView;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tempfile::tempdir;
|
||||
|
||||
fn user_message(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "user".to_string(),
|
||||
content: vec![ContentItem::InputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn assistant_message(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
|
||||
// Test fixture: a developer-text metadata item with the given prompt ordering.
fn developer_context(text: &str, prompt_order: i64) -> JournalMetadataItem {
    JournalMetadataItem::new(PromptMessage::developer_text(text)).with_prompt_order(prompt_order)
}
|
||||
|
||||
#[test]
fn to_prompt_uses_latest_context_for_key() {
    // Two metadata writes to the same key: only the later one should render,
    // ordered ahead of the transcript.
    let mut state = Journal::new();
    state.add(
        ["prompt", "permissions"],
        developer_context("older permissions", 10),
    );
    state.add(["history", "hello"], user_message("hello"));
    state.add(
        ["prompt", "permissions"],
        developer_context("newer permissions", 10),
    );

    let prompt = state
        .to_prompt(&PromptView::root())
        .expect("prompt should render");

    assert_eq!(
        prompt,
        vec![
            ResponseItem::from(PromptMessage::developer_text("newer permissions")),
            user_message("hello"),
        ]
    );
}
|
||||
|
||||
#[test]
|
||||
fn to_prompt_filters_context_by_audience() {
|
||||
let mut state = Journal::new();
|
||||
state.add(
|
||||
["prompt", "root", "hint"],
|
||||
developer_context("root-only", 0).with_audience(JournalContextAudience::RootOnly),
|
||||
);
|
||||
state.add(
|
||||
["prompt", "child", "hint"],
|
||||
developer_context("child-only", 1).with_audience(JournalContextAudience::SubAgentsOnly),
|
||||
);
|
||||
|
||||
let root_prompt = state
|
||||
.to_prompt(&PromptView::root())
|
||||
.expect("root prompt should render");
|
||||
let child_prompt = state
|
||||
.to_prompt(&PromptView::subagent(
|
||||
"/root/worker",
|
||||
Option::<String>::None,
|
||||
))
|
||||
.expect("child prompt should render");
|
||||
|
||||
assert_eq!(
|
||||
root_prompt,
|
||||
vec![ResponseItem::from(PromptMessage::developer_text(
|
||||
"root-only"
|
||||
))]
|
||||
);
|
||||
assert_eq!(
|
||||
child_prompt,
|
||||
vec![ResponseItem::from(PromptMessage::developer_text(
|
||||
"child-only"
|
||||
))]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
fn to_prompt_with_filter_matches_key_prefix() {
    // A prefix key filter should drop non-matching metadata before rendering.
    let mut state = Journal::new();
    state.add(["prompt", "root", "keep"], developer_context("keep me", 0));
    state.add(["prompt", "child", "drop"], developer_context("drop me", 1));

    let prompt = state
        .to_prompt_with_filter(&PromptView::root(), &KeyFilter::prefix(["prompt", "root"]))
        .expect("prompt should render");

    assert_eq!(
        prompt,
        vec![ResponseItem::from(PromptMessage::developer_text("keep me"))]
    );
}
|
||||
|
||||
#[test]
|
||||
fn checkpoints_replace_prefix_and_then_truncate_history() {
|
||||
let first = JournalTranscriptItem::new(user_message("turn 1"));
|
||||
let second = JournalTranscriptItem::new(assistant_message("turn 1 answer"));
|
||||
let third = JournalTranscriptItem::new(user_message("turn 2"));
|
||||
let summary = JournalTranscriptItem {
|
||||
id: "summary".to_string(),
|
||||
turn_id: None,
|
||||
item: assistant_message("summary"),
|
||||
};
|
||||
|
||||
let state = Journal::from_entries(vec![
|
||||
JournalEntry::new(["history", "1"], first),
|
||||
JournalEntry::new(["history", "2"], second.clone()),
|
||||
JournalEntry::new(["history", "3"], third),
|
||||
JournalEntry::new(
|
||||
["checkpoint", "replace"],
|
||||
JournalCheckpointItem::ReplacePrefix(JournalReplacePrefixCheckpoint {
|
||||
through: JournalHistoryCursor::AfterItem(second.id),
|
||||
replacement: vec![summary.clone()],
|
||||
}),
|
||||
),
|
||||
JournalEntry::new(
|
||||
["checkpoint", "truncate"],
|
||||
JournalCheckpointItem::TruncateHistory(JournalTruncateHistoryCheckpoint {
|
||||
through: JournalHistoryCursor::AfterItem(summary.id),
|
||||
}),
|
||||
),
|
||||
]);
|
||||
|
||||
let prompt = state
|
||||
.to_prompt(&PromptView::root())
|
||||
.expect("prompt should render");
|
||||
|
||||
assert_eq!(prompt, vec![assistant_message("summary")]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn flatten_preserves_prompt_and_drops_obsolete_items() {
|
||||
let first = JournalTranscriptItem::new(user_message("turn 1"));
|
||||
let answer = JournalTranscriptItem::new(assistant_message("turn 1 answer"));
|
||||
let summary = JournalTranscriptItem {
|
||||
id: "summary".to_string(),
|
||||
turn_id: None,
|
||||
item: assistant_message("summary"),
|
||||
};
|
||||
let state = Journal::from_entries(vec![
|
||||
JournalEntry::new(["prompt", "permissions"], developer_context("old", 0)),
|
||||
JournalEntry::new(["prompt", "permissions"], developer_context("new", 0)),
|
||||
JournalEntry::new(["history", "1"], first),
|
||||
JournalEntry::new(["history", "2"], answer.clone()),
|
||||
JournalEntry::new(
|
||||
["checkpoint", "replace"],
|
||||
JournalCheckpointItem::ReplacePrefix(JournalReplacePrefixCheckpoint {
|
||||
through: JournalHistoryCursor::AfterItem(answer.id),
|
||||
replacement: vec![summary.clone()],
|
||||
}),
|
||||
),
|
||||
]);
|
||||
|
||||
let before = state
|
||||
.to_prompt(&PromptView::root())
|
||||
.expect("prompt should render");
|
||||
let flattened = state.flatten().expect("flatten should succeed");
|
||||
let after = flattened
|
||||
.to_prompt(&PromptView::root())
|
||||
.expect("flattened prompt should render");
|
||||
|
||||
assert_eq!(before, after);
|
||||
assert_eq!(
|
||||
flattened.entries(),
|
||||
vec![
|
||||
JournalEntry::new(["prompt", "permissions"], developer_context("new", 0),),
|
||||
JournalEntry::new(
|
||||
["checkpoint", "replace", "replacement", "0", "summary"],
|
||||
summary,
|
||||
),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn with_history_window_keeps_only_recent_effective_history() {
|
||||
let first = JournalTranscriptItem::new(user_message("turn 1"));
|
||||
let second = JournalTranscriptItem::new(assistant_message("turn 1 answer"));
|
||||
let third = JournalTranscriptItem::new(user_message("turn 2"));
|
||||
let state = Journal::from_entries(vec![
|
||||
JournalEntry::new(
|
||||
["prompt", "permissions", "current"],
|
||||
developer_context("p", 0),
|
||||
),
|
||||
JournalEntry::new(["history", "1"], first),
|
||||
JournalEntry::new(["history", "2"], second.clone()),
|
||||
JournalEntry::new(["history", "3"], third.clone()),
|
||||
]);
|
||||
|
||||
let windowed = state
|
||||
.with_history_window(&JournalHistoryCursor::AfterItem(second.id))
|
||||
.expect("windowing should succeed");
|
||||
|
||||
assert_eq!(
|
||||
windowed.entries(),
|
||||
vec![
|
||||
JournalEntry::new(
|
||||
["prompt", "permissions", "current"],
|
||||
developer_context("p", 0),
|
||||
),
|
||||
JournalEntry::new(["history", "3"], third),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fork_drops_non_keep_context_and_respects_audience() {
|
||||
let history = JournalTranscriptItem::new(user_message("hello"));
|
||||
let state = Journal::from_entries(vec![
|
||||
JournalEntry::new(
|
||||
["prompt", "child", "shared"],
|
||||
developer_context("shared child context", 0)
|
||||
.with_audience(JournalContextAudience::SubAgentsOnly),
|
||||
),
|
||||
JournalEntry::new(
|
||||
["prompt", "child", "regenerate"],
|
||||
developer_context("usage hint", 1)
|
||||
.with_audience(JournalContextAudience::SubAgentsOnly)
|
||||
.with_on_fork(JournalContextForkBehavior::Regenerate),
|
||||
),
|
||||
JournalEntry::new(
|
||||
["prompt", "root", "only"],
|
||||
developer_context("root only", 2).with_audience(JournalContextAudience::RootOnly),
|
||||
),
|
||||
JournalEntry::new(["history", "hello"], history.clone()),
|
||||
]);
|
||||
|
||||
let forked = state
|
||||
.fork(&PromptView::subagent(
|
||||
"/root/worker",
|
||||
Option::<String>::None,
|
||||
))
|
||||
.expect("fork should succeed");
|
||||
|
||||
assert_eq!(
|
||||
forked.entries(),
|
||||
vec![
|
||||
JournalEntry::new(
|
||||
["prompt", "child", "shared"],
|
||||
developer_context("shared child context", 0)
|
||||
.with_audience(JournalContextAudience::SubAgentsOnly),
|
||||
),
|
||||
JournalEntry::new(["history", "hello"], history),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
fn persist_and_load_jsonl_round_trip() {
    // Writing to JSONL and reading it back should reproduce the raw journal.
    let dir = tempdir().expect("tempdir");
    let path = dir.path().join("journal.jsonl");
    let history = JournalTranscriptItem::new(user_message("hello"));
    let state = Journal::from_entries(vec![
        JournalEntry::new(
            ["prompt", "permissions", "current"],
            developer_context("p", 0),
        ),
        JournalEntry::new(["history", "hello"], history),
    ]);

    state
        .persist_jsonl(path.as_path())
        .expect("journal should persist");
    let loaded = Journal::load_jsonl(path.as_path()).expect("journal should load");

    assert_eq!(loaded, state);
}
|
||||
|
||||
#[test]
fn filter_returns_matching_raw_entries() {
    // `filter` selects raw entries by key without resolving the journal.
    let state = Journal::from_entries(vec![
        JournalEntry::new(["prompt", "root", "keep"], developer_context("keep me", 0)),
        JournalEntry::new(["prompt", "child", "drop"], developer_context("drop me", 1)),
        JournalEntry::new(["history", "hello"], user_message("hello")),
    ]);

    let filtered = state.filter(&KeyFilter::prefix(["prompt", "root"]));

    assert_eq!(
        filtered.entries(),
        vec![JournalEntry::new(
            ["prompt", "root", "keep"],
            developer_context("keep me", 0),
        )]
    );
}
|
||||
|
||||
#[test]
fn context_entry_skips_blank_text_messages() {
    // Whitespace-only metadata text yields no entry at all.
    assert_eq!(
        Journal::context_entry(
            ["prompt", "developer", "blank"],
            10,
            PromptMessage::developer_text(" "),
        ),
        None
    );
}
|
||||
|
||||
#[test]
|
||||
fn context_entry_builder_carries_optional_fields() {
|
||||
let entry = Journal::context_entry_builder(
|
||||
["prompt", "developer", "one"],
|
||||
PromptMessage::developer_text("first"),
|
||||
)
|
||||
.prompt_order(10)
|
||||
.audience(JournalContextAudience::SubAgentsOnly)
|
||||
.on_fork(JournalContextForkBehavior::Regenerate)
|
||||
.tags(vec!["foo".to_string(), "bar".to_string()])
|
||||
.source("unit-test")
|
||||
.build()
|
||||
.expect("entry should be kept");
|
||||
|
||||
assert_eq!(
|
||||
entry,
|
||||
JournalEntry::new(
|
||||
["prompt", "developer", "one"],
|
||||
JournalMetadataItem::new(PromptMessage::developer_text("first"))
|
||||
.with_prompt_order(10)
|
||||
.with_audience(JournalContextAudience::SubAgentsOnly)
|
||||
.with_on_fork(JournalContextForkBehavior::Regenerate)
|
||||
.with_tags(vec!["foo".to_string(), "bar".to_string()])
|
||||
.with_source("unit-test"),
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
// `resolve` partitions a mixed journal: context entries land in `metadata()`
// and transcript entries in `transcript()`, each preserving insertion order.
fn resolve_splits_contexts_from_history() {
    let history = JournalTranscriptItem::new(user_message("hello"));
    let journal = Journal::from_entries(vec![
        Journal::context_entry(
            ["prompt", "developer", "one"],
            10,
            PromptMessage::developer_text("first"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "guardian", "one"],
            20,
            PromptMessage::developer_text("guardian one"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "guardian", "two"],
            30,
            PromptMessage::developer_text("guardian two"),
        )
        .expect("entry should be kept"),
        JournalEntry::new(["history", "hello"], history.clone()),
    ]);

    let resolved = journal.resolve().expect("journal should resolve");

    assert_eq!(
        resolved.metadata().entries(),
        vec![
            Journal::context_entry(
                ["prompt", "developer", "one"],
                10,
                PromptMessage::developer_text("first"),
            )
            .expect("entry should be kept"),
            Journal::context_entry(
                ["prompt", "guardian", "one"],
                20,
                PromptMessage::developer_text("guardian one"),
            )
            .expect("entry should be kept"),
            Journal::context_entry(
                ["prompt", "guardian", "two"],
                30,
                PromptMessage::developer_text("guardian two"),
            )
            .expect("entry should be kept"),
        ]
    );
    assert_eq!(
        resolved.transcript().entries(),
        vec![JournalEntry::new(["history", "hello"], history)]
    );
}
|
||||
|
||||
#[test]
// Entries covered by a declared renderer group merge into one message per
// group: content items are concatenated in entry order under the group's role.
fn prompt_renderer_groups_by_declared_prefix_and_role() {
    let resolved = Journal::from_entries(vec![
        Journal::context_entry(
            ["prompt", "developer", "one"],
            10,
            PromptMessage::developer_text("first"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "developer", "two"],
            20,
            PromptMessage::developer_text("second"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "contextual_user", "one"],
            30,
            PromptMessage::user_text("third"),
        )
        .expect("entry should be kept"),
    ])
    .resolve()
    .expect("entries should resolve");

    let rendered = PromptRenderer::new()
        .group(KeyFilter::prefix(["prompt", "developer"]))
        .group(KeyFilter::prefix(["prompt", "contextual_user"]))
        .render_metadata(resolved.metadata());

    assert_eq!(
        rendered,
        vec![
            // Both "developer" entries collapse into a single message.
            ResponseItem::Message {
                id: None,
                role: "developer".to_string(),
                content: vec![
                    ContentItem::InputText {
                        text: "first".to_string(),
                    },
                    ContentItem::InputText {
                        text: "second".to_string(),
                    },
                ],
                phase: None,
            },
            // The user-role group renders with role "user".
            ResponseItem::Message {
                id: None,
                role: "user".to_string(),
                content: vec![ContentItem::InputText {
                    text: "third".to_string(),
                }],
                phase: None,
            },
        ]
    );
}
|
||||
|
||||
#[test]
// Entries not matched by any declared group are rendered individually (one
// `ResponseItem` each) instead of being merged.
fn prompt_renderer_leaves_ungrouped_entries_separate() {
    let resolved = Journal::from_entries(vec![
        Journal::context_entry(
            ["prompt", "developer", "one"],
            10,
            PromptMessage::developer_text("first"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "guardian", "one"],
            20,
            PromptMessage::developer_text("guardian one"),
        )
        .expect("entry should be kept"),
        Journal::context_entry(
            ["prompt", "guardian", "two"],
            30,
            PromptMessage::developer_text("guardian two"),
        )
        .expect("entry should be kept"),
    ])
    .resolve()
    .expect("entries should resolve");

    // Only the "developer" prefix is declared as a group; "guardian" is not.
    let rendered = PromptRenderer::new()
        .group(KeyFilter::prefix(["prompt", "developer"]))
        .render_metadata(resolved.metadata());

    assert_eq!(
        rendered,
        vec![
            ResponseItem::Message {
                id: None,
                role: "developer".to_string(),
                content: vec![ContentItem::InputText {
                    text: "first".to_string(),
                }],
                phase: None,
            },
            ResponseItem::from(PromptMessage::developer_text("guardian one")),
            ResponseItem::from(PromptMessage::developer_text("guardian two")),
        ]
    );
}
|
||||
364
codex-rs/protocol/src/journal.rs
Normal file
364
codex-rs/protocol/src/journal.rs
Normal file
@@ -0,0 +1,364 @@
|
||||
use crate::models::ContentItem;
|
||||
use crate::models::ResponseItem;
|
||||
use schemars::JsonSchema;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use ts_rs::TS;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Stable key attached to a journal entry.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema, TS, Default)]
|
||||
pub struct JournalKey {
|
||||
pub parts: Vec<String>,
|
||||
}
|
||||
|
||||
impl JournalKey {
|
||||
/// Creates a key from an ordered sequence of parts.
|
||||
pub fn new<I, S>(parts: I) -> Self
|
||||
where
|
||||
I: IntoIterator<Item = S>,
|
||||
S: Into<String>,
|
||||
{
|
||||
Self {
|
||||
parts: parts.into_iter().map(Into::into).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the ordered key parts.
|
||||
pub fn parts(&self) -> &[String] {
|
||||
&self.parts
|
||||
}
|
||||
|
||||
/// Returns a new key with one child part appended.
|
||||
pub fn child(&self, part: impl Into<String>) -> Self {
|
||||
let mut parts = self.parts.clone();
|
||||
parts.push(part.into());
|
||||
Self { parts }
|
||||
}
|
||||
|
||||
/// Returns whether this key starts with the provided prefix.
|
||||
pub fn starts_with(&self, prefix: &JournalKey) -> bool {
|
||||
self.parts.starts_with(prefix.parts.as_slice())
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, const N: usize> From<[S; N]> for JournalKey
|
||||
where
|
||||
S: Into<String>,
|
||||
{
|
||||
fn from(value: [S; N]) -> Self {
|
||||
Self::new(value)
|
||||
}
|
||||
}
|
||||
|
||||
/// Predicate over [`JournalKey`]s used to select journal entries.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
#[ts(tag = "type", content = "value", rename_all = "snake_case")]
pub enum KeyFilter {
    /// Matches exactly one key.
    Exact(JournalKey),
    /// Matches the prefix key itself and any descendant key.
    Prefix(JournalKey),
}

impl KeyFilter {
    /// Matches one exact key.
    pub fn exact(key: impl Into<JournalKey>) -> Self {
        Self::Exact(key.into())
    }

    /// Matches any key with the provided prefix.
    pub fn prefix(prefix: impl Into<JournalKey>) -> Self {
        Self::Prefix(prefix.into())
    }

    /// Returns whether the filter matches the key.
    ///
    /// `Prefix` delegates to [`JournalKey::starts_with`], so a prefix filter
    /// also matches the prefix key itself.
    pub fn matches(&self, key: &JournalKey) -> bool {
        match self {
            Self::Exact(expected) => key == expected,
            Self::Prefix(prefix) => key.starts_with(prefix),
        }
    }
}
|
||||
|
||||
/// Minimal message block that can be projected into a model-visible prompt item.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
|
||||
pub struct PromptMessage {
|
||||
pub role: PromptMessageRole,
|
||||
pub content: Vec<ContentItem>,
|
||||
}
|
||||
|
||||
impl PromptMessage {
|
||||
pub fn new(role: PromptMessageRole, content: Vec<ContentItem>) -> Self {
|
||||
Self { role, content }
|
||||
}
|
||||
|
||||
pub fn developer_text(text: impl Into<String>) -> Self {
|
||||
Self::text(PromptMessageRole::Developer, text)
|
||||
}
|
||||
|
||||
pub fn user_text(text: impl Into<String>) -> Self {
|
||||
Self::text(PromptMessageRole::User, text)
|
||||
}
|
||||
|
||||
pub fn assistant_text(text: impl Into<String>) -> Self {
|
||||
Self::text(PromptMessageRole::Assistant, text)
|
||||
}
|
||||
|
||||
fn text(role: PromptMessageRole, text: impl Into<String>) -> Self {
|
||||
let text = text.into();
|
||||
let content = match role {
|
||||
PromptMessageRole::Developer | PromptMessageRole::User => {
|
||||
vec![ContentItem::InputText { text }]
|
||||
}
|
||||
PromptMessageRole::Assistant => vec![ContentItem::OutputText { text }],
|
||||
};
|
||||
Self::new(role, content)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PromptMessage> for ResponseItem {
|
||||
fn from(value: PromptMessage) -> Self {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
role: value.role.as_str().to_string(),
|
||||
content: value.content,
|
||||
phase: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Role a [`PromptMessage`] is attributed to when rendered for the model.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS, Default)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum PromptMessageRole {
    #[default]
    Developer,
    User,
    Assistant,
}

impl PromptMessageRole {
    /// Role string used in the `role` field of a `ResponseItem::Message`.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::Developer => "developer",
            Self::User => "user",
            Self::Assistant => "assistant",
        }
    }
}
|
||||
|
||||
/// Durable transcript item. Unlike prompt metadata, transcript keeps original ordering.
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
|
||||
pub struct JournalTranscriptItem {
|
||||
pub id: String,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
#[ts(optional)]
|
||||
pub turn_id: Option<String>,
|
||||
pub item: ResponseItem,
|
||||
}
|
||||
|
||||
impl JournalTranscriptItem {
|
||||
pub fn new(item: ResponseItem) -> Self {
|
||||
Self {
|
||||
id: Uuid::new_v4().to_string(),
|
||||
turn_id: None,
|
||||
item,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_turn_id(mut self, turn_id: impl Into<String>) -> Self {
|
||||
self.turn_id = Some(turn_id.into());
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<ResponseItem> for JournalTranscriptItem {
|
||||
fn from(value: ResponseItem) -> Self {
|
||||
Self::new(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<JournalTranscriptItem> for ResponseItem {
|
||||
fn from(value: JournalTranscriptItem) -> Self {
|
||||
value.item
|
||||
}
|
||||
}
|
||||
|
||||
impl From<JournalMetadataItem> for ResponseItem {
|
||||
fn from(value: JournalMetadataItem) -> Self {
|
||||
value.message.into()
|
||||
}
|
||||
}
|
||||
|
||||
/// Prompt-metadata entry payload and filtering metadata.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
pub struct JournalMetadataItem {
    /// Message projected into the prompt when this entry is rendered.
    pub message: PromptMessage,
    /// Ordering hint for prompt rendering; missing values deserialize to 0.
    #[serde(default)]
    pub prompt_order: i64,
    /// Audience selector (see [`JournalContextAudience`]); defaults to `All`.
    #[serde(default)]
    pub audience: JournalContextAudience,
    /// Fork-time treatment (see [`JournalContextForkBehavior`]); defaults to `Keep`.
    #[serde(default)]
    pub on_fork: JournalContextForkBehavior,
    /// Free-form labels; omitted from serialization when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub tags: Vec<String>,
    /// Optional producer identifier.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    #[ts(optional)]
    pub source: Option<String>,
}

impl JournalMetadataItem {
    /// Creates an item carrying `message` with all metadata at its defaults.
    pub fn new(message: PromptMessage) -> Self {
        Self {
            message,
            prompt_order: 0,
            audience: JournalContextAudience::default(),
            on_fork: JournalContextForkBehavior::default(),
            tags: Vec::new(),
            source: None,
        }
    }

    /// Builder-style setter for `prompt_order`.
    pub fn with_prompt_order(mut self, prompt_order: i64) -> Self {
        self.prompt_order = prompt_order;
        self
    }

    /// Builder-style setter for `audience`.
    pub fn with_audience(mut self, audience: JournalContextAudience) -> Self {
        self.audience = audience;
        self
    }

    /// Builder-style setter for `on_fork`.
    pub fn with_on_fork(mut self, on_fork: JournalContextForkBehavior) -> Self {
        self.on_fork = on_fork;
        self
    }

    /// Builder-style setter replacing the whole tag list.
    pub fn with_tags(mut self, tags: Vec<String>) -> Self {
        self.tags = tags;
        self
    }

    /// Builder-style setter for `source`.
    pub fn with_source(mut self, source: impl Into<String>) -> Self {
        self.source = Some(source.into());
        self
    }
}
|
||||
|
||||
/// Audience restriction attached to a context entry. Interpretation is done
/// by consumers of this type (resolver/renderer), not in this file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS, Default)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
#[ts(tag = "type", content = "value", rename_all = "snake_case")]
pub enum JournalContextAudience {
    /// No restriction (default).
    #[default]
    All,
    RootOnly,
    SubAgentsOnly,
    /// Carries an agent-path prefix to match against.
    AgentPathPrefix(String),
    /// Carries an agent role name to match against.
    AgentRole(String),
}

/// Treatment of a context entry when a thread is forked; enforced by the fork
/// logic elsewhere, not here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS, Default)]
#[serde(rename_all = "snake_case")]
#[ts(rename_all = "snake_case")]
pub enum JournalContextForkBehavior {
    /// Carry the entry over unchanged (default).
    #[default]
    Keep,
    Drop,
    Regenerate,
}
|
||||
|
||||
/// Cursor into the effective history at the point a checkpoint is applied.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
#[ts(tag = "type", content = "value", rename_all = "snake_case")]
pub enum JournalHistoryCursor {
    /// Before the first history item.
    Start,
    /// Immediately after the item with this id — presumably a
    /// [`JournalTranscriptItem::id`]; confirm against the resolver.
    AfterItem(String),
    /// After the last history item.
    End,
}

/// Replace the current history prefix through the resolved cursor.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
pub struct JournalReplacePrefixCheckpoint {
    /// Cursor bounding the prefix being replaced.
    pub through: JournalHistoryCursor,
    /// Items substituted for the replaced prefix.
    pub replacement: Vec<JournalTranscriptItem>,
}

/// Keep only the current history prefix through the resolved cursor.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
pub struct JournalTruncateHistoryCheckpoint {
    /// Cursor after which history is discarded.
    pub through: JournalHistoryCursor,
}

/// History-mutating checkpoint operations recorded in the journal.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
#[ts(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum JournalCheckpointItem {
    ReplacePrefix(JournalReplacePrefixCheckpoint),
    TruncateHistory(JournalTruncateHistoryCheckpoint),
}
|
||||
|
||||
/// Payload of a journal entry. The serde/ts renames pin the wire names to
/// "history" (transcript) and "context" (metadata).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
#[serde(tag = "type", content = "payload", rename_all = "snake_case")]
#[ts(tag = "type", content = "payload", rename_all = "snake_case")]
pub enum JournalItem {
    /// Durable transcript item; serialized under the "history" tag.
    #[serde(rename = "history")]
    #[ts(rename = "history")]
    Transcript(JournalTranscriptItem),
    /// Prompt-metadata item; serialized under the "context" tag.
    #[serde(rename = "context")]
    #[ts(rename = "context")]
    Metadata(JournalMetadataItem),
    /// History-mutating checkpoint.
    Checkpoint(JournalCheckpointItem),
}

impl From<ResponseItem> for JournalItem {
    // Wraps the response item in a new transcript item (fresh UUID id).
    fn from(value: ResponseItem) -> Self {
        Self::Transcript(value.into())
    }
}

impl From<JournalTranscriptItem> for JournalItem {
    fn from(value: JournalTranscriptItem) -> Self {
        Self::Transcript(value)
    }
}

impl From<JournalMetadataItem> for JournalItem {
    fn from(value: JournalMetadataItem) -> Self {
        Self::Metadata(value)
    }
}

impl From<JournalCheckpointItem> for JournalItem {
    fn from(value: JournalCheckpointItem) -> Self {
        Self::Checkpoint(value)
    }
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, JsonSchema, TS)]
|
||||
pub struct JournalEntry {
|
||||
pub key: JournalKey,
|
||||
pub item: JournalItem,
|
||||
}
|
||||
|
||||
impl JournalEntry {
|
||||
/// Creates a keyed journal entry.
|
||||
pub fn new(key: impl Into<JournalKey>, item: impl Into<JournalItem>) -> Self {
|
||||
Self {
|
||||
key: key.into(),
|
||||
item: item.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, T> From<(K, T)> for JournalEntry
|
||||
where
|
||||
K: Into<JournalKey>,
|
||||
T: Into<JournalItem>,
|
||||
{
|
||||
fn from(value: (K, T)) -> Self {
|
||||
Self::new(value.0, value.1)
|
||||
}
|
||||
}
|
||||
|
||||
// Aliases preserving the older "history"/"context" naming (matching the serde
// wire names on `JournalItem`) — presumably for existing call sites; confirm.
pub type JournalHistoryItem = JournalTranscriptItem;
pub type JournalContextItem = JournalMetadataItem;
|
||||
@@ -12,6 +12,7 @@ pub mod dynamic_tools;
|
||||
pub mod error;
|
||||
pub mod exec_output;
|
||||
pub mod items;
|
||||
pub mod journal;
|
||||
pub mod mcp;
|
||||
pub mod memory_citation;
|
||||
pub mod message_history;
|
||||
|
||||
@@ -22,11 +22,12 @@ codex-git-utils = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-rollout = { workspace = true }
|
||||
codex-state = { workspace = true }
|
||||
codex-journal = { workspace = true }
|
||||
prost = "0.14.3"
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio = { workspace = true, features = ["fs", "io-util", "rt", "sync"] }
|
||||
tonic = { workspace = true }
|
||||
tonic-prost = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
@@ -4,7 +4,7 @@ use codex_protocol::ThreadId;
|
||||
pub type ThreadStoreResult<T> = Result<T, ThreadStoreError>;
|
||||
|
||||
/// Error type shared by thread-store implementations.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[derive(Clone, Debug, PartialEq, Eq, thiserror::Error)]
|
||||
pub enum ThreadStoreError {
|
||||
/// The requested thread does not exist in this store.
|
||||
#[error("thread {thread_id} not found")]
|
||||
|
||||
351
codex-rs/thread-store/src/journal_writer.rs
Normal file
351
codex-rs/thread-store/src/journal_writer.rs
Normal file
@@ -0,0 +1,351 @@
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use codex_journal::JournalEntry;
|
||||
use tokio::fs::OpenOptions;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufWriter;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::ThreadStoreError;
|
||||
use crate::ThreadStoreResult;
|
||||
|
||||
// Default depth of the command queue between callers and the writer task.
const DEFAULT_QUEUE_CAPACITY: usize = 1024;

/// Async append-only JSONL journal for [`JournalEntry`] items.
///
/// Callers update the in-memory journal first, then enqueue the same keyed entries here for durable
/// persistence. Writes happen on a dedicated worker task; use [`Self::flush`] or
/// [`Self::shutdown`] when durability matters.
pub struct JournalWriter {
    // Backing JSONL file location (also reported in sync error messages).
    path: PathBuf,
    // Command channel to the background writer task.
    tx: mpsc::Sender<Command>,
    // Worker handle; `take`n during `shutdown` so it is joined exactly once.
    worker: Option<JoinHandle<ThreadStoreResult<()>>>,
}
|
||||
|
||||
impl JournalWriter {
    /// Opens or creates a journal at `path` with the default queue capacity.
    pub async fn open(path: impl Into<PathBuf>) -> ThreadStoreResult<Self> {
        Self::open_with_capacity(path, DEFAULT_QUEUE_CAPACITY).await
    }

    /// Opens or creates a journal at `path` with a custom queue capacity.
    ///
    /// Creates missing parent directories, opens the file in append mode, and
    /// spawns the background writer task. Returns `InvalidRequest` when
    /// `queue_capacity` is zero.
    pub async fn open_with_capacity(
        path: impl Into<PathBuf>,
        queue_capacity: usize,
    ) -> ThreadStoreResult<Self> {
        // Guard: the bounded command queue needs room for at least one command.
        if queue_capacity == 0 {
            return Err(ThreadStoreError::InvalidRequest {
                message: "journal queue capacity must be greater than zero".to_string(),
            });
        }

        let path = path.into();
        if let Some(parent) = path.parent() {
            tokio::fs::create_dir_all(parent).await.map_err(io_error)?;
        }

        // Append mode: reopening an existing journal continues after its tail.
        let file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&path)
            .await
            .map_err(io_error)?;
        let writer = BufWriter::new(file);
        let (tx, rx) = mpsc::channel(queue_capacity);
        let worker_path = path.clone();
        let worker = tokio::spawn(async move { run_worker(worker_path, writer, rx).await });

        Ok(Self {
            path,
            tx,
            worker: Some(worker),
        })
    }

    /// Returns the backing journal path.
    pub fn path(&self) -> &Path {
        self.path.as_path()
    }

    /// Enqueues one entry for async persistence.
    ///
    /// Returns once queued, not once durable; call [`Self::flush`] for
    /// durability. Errors only when the worker task is gone.
    pub async fn enqueue(&self, entry: JournalEntry) -> ThreadStoreResult<()> {
        self.tx
            .send(Command::Append(vec![entry]))
            .await
            .map_err(send_error)
    }

    /// Enqueues several entries for async persistence as one batch.
    pub async fn enqueue_all<I>(&self, entries: I) -> ThreadStoreResult<()>
    where
        I: IntoIterator<Item = JournalEntry>,
    {
        let entries = entries.into_iter().collect::<Vec<_>>();
        // Skip the channel round-trip entirely for an empty batch.
        if entries.is_empty() {
            return Ok(());
        }

        self.tx
            .send(Command::Append(entries))
            .await
            .map_err(send_error)
    }

    /// Waits until all previously enqueued entries are durable.
    ///
    /// Also surfaces any write error the worker latched since the last flush.
    pub async fn flush(&self) -> ThreadStoreResult<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.tx
            .send(Command::Flush { reply: reply_tx })
            .await
            .map_err(send_error)?;
        reply_rx.await.map_err(recv_error)?
    }

    /// Flushes pending entries and stops the background worker.
    pub async fn shutdown(mut self) -> ThreadStoreResult<()> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.tx
            .send(Command::Shutdown { reply: reply_tx })
            .await
            .map_err(send_error)?;
        let flush_result = reply_rx.await.map_err(recv_error)?;

        // Join the worker so a panic or late failure is not silently lost.
        let Some(worker) = self.worker.take() else {
            return flush_result;
        };
        let worker_result = worker.await.map_err(join_error)?;

        // Report the flush error first when both the flush and the worker failed.
        flush_result?;
        worker_result
    }
}
|
||||
|
||||
/// Messages handled by the journal worker task.
enum Command {
    /// Append entries to the buffered writer (no durability guarantee yet).
    Append(Vec<JournalEntry>),
    /// Flush + fsync, then report the result (or the first latched failure).
    Flush {
        reply: oneshot::Sender<ThreadStoreResult<()>>,
    },
    /// Like `Flush`, but also terminates the worker loop.
    Shutdown {
        reply: oneshot::Sender<ThreadStoreResult<()>>,
    },
}
|
||||
|
||||
/// Background loop: consumes commands, writes entries, and latches failures.
///
/// The first append or flush error is latched in `failure`; from then on
/// `Append` commands are skipped (their entries are intentionally dropped) and
/// the latched error is reported by the next `Flush`/`Shutdown`. When the
/// channel closes without an explicit shutdown (the `JournalWriter` was
/// dropped), a final best-effort flush runs before the task exits.
async fn run_worker(
    path: PathBuf,
    mut writer: BufWriter<tokio::fs::File>,
    mut rx: mpsc::Receiver<Command>,
) -> ThreadStoreResult<()> {
    // First write/flush error seen; once set, the journal is fail-stop.
    let mut failure: Option<ThreadStoreError> = None;

    while let Some(command) = rx.recv().await {
        match command {
            Command::Append(entries) => {
                // Only attempt the write while healthy.
                if failure.is_none()
                    && let Err(err) = append_entries(&mut writer, entries.as_slice()).await
                {
                    failure = Some(err);
                }
            }
            Command::Flush { reply } => {
                let result = flush_writer(&mut writer, failure.clone(), path.as_path()).await;
                // Latch flush errors too, so later flushes keep failing fast.
                if let Err(err) = &result {
                    failure = Some(err.clone());
                }
                let _ = reply.send(result);
            }
            Command::Shutdown { reply } => {
                let result = flush_writer(&mut writer, failure.clone(), path.as_path()).await;
                let worker_result = result.clone();
                // The receiver may already be gone; the result is also returned
                // from the task so `shutdown` can observe it via join.
                let _ = reply.send(result);
                return worker_result;
            }
        }
    }

    // Channel closed without Shutdown: final best-effort flush.
    flush_writer(&mut writer, failure, path.as_path()).await
}
|
||||
|
||||
async fn append_entries(
|
||||
writer: &mut BufWriter<tokio::fs::File>,
|
||||
entries: &[JournalEntry],
|
||||
) -> ThreadStoreResult<()> {
|
||||
for entry in entries {
|
||||
let mut line = serde_json::to_vec(entry).map_err(|source| ThreadStoreError::Internal {
|
||||
message: format!("failed to serialize journal entry: {source}"),
|
||||
})?;
|
||||
line.push(b'\n');
|
||||
writer.write_all(line.as_slice()).await.map_err(io_error)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn flush_writer(
|
||||
writer: &mut BufWriter<tokio::fs::File>,
|
||||
failure: Option<ThreadStoreError>,
|
||||
path: &Path,
|
||||
) -> ThreadStoreResult<()> {
|
||||
if let Some(err) = failure {
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
writer.flush().await.map_err(io_error)?;
|
||||
writer
|
||||
.get_mut()
|
||||
.sync_data()
|
||||
.await
|
||||
.map_err(|err| ThreadStoreError::Internal {
|
||||
message: format!("failed to sync journal at {}: {err}", path.display()),
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn io_error(err: std::io::Error) -> ThreadStoreError {
|
||||
ThreadStoreError::Internal {
|
||||
message: err.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn send_error(_: mpsc::error::SendError<Command>) -> ThreadStoreError {
|
||||
ThreadStoreError::Internal {
|
||||
message: "journal worker is not available".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn recv_error(_: oneshot::error::RecvError) -> ThreadStoreError {
|
||||
ThreadStoreError::Internal {
|
||||
message: "journal worker dropped its response channel".to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
fn join_error(err: tokio::task::JoinError) -> ThreadStoreError {
|
||||
ThreadStoreError::Internal {
|
||||
message: format!("journal worker task failed: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use codex_journal::Journal;
    use codex_journal::JournalMetadataItem;
    use codex_journal::JournalTranscriptItem;
    use codex_journal::PromptMessage;
    use codex_protocol::models::ContentItem;
    use codex_protocol::models::ResponseItem;
    use pretty_assertions::assert_eq;
    use tempfile::tempdir;

    use super::JournalWriter;

    // Context ("prompt") entry helper; key and message text both come from `text`.
    fn developer_entry(text: &str) -> codex_journal::JournalEntry {
        codex_journal::JournalEntry::new(
            ["prompt", text],
            JournalMetadataItem::new(PromptMessage::developer_text(text)),
        )
    }

    // History entry helper with a deterministic id ("history-{text}") so that
    // round-trip equality assertions do not depend on random UUIDs.
    fn user_entry(text: &str) -> codex_journal::JournalEntry {
        codex_journal::JournalEntry::new(
            ["history", text],
            JournalTranscriptItem {
                id: format!("history-{text}"),
                turn_id: None,
                item: ResponseItem::Message {
                    id: None,
                    role: "user".to_string(),
                    content: vec![ContentItem::InputText {
                        text: text.to_string(),
                    }],
                    phase: None,
                },
            },
        )
    }

    // Entries enqueued one at a time become loadable after `flush`, in
    // enqueue order.
    #[tokio::test]
    async fn enqueue_and_flush_persists_entries_in_order() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("journal.jsonl");
        let journal = JournalWriter::open(path.clone())
            .await
            .expect("journal should open");

        journal
            .enqueue(developer_entry("first"))
            .await
            .expect("first entry should enqueue");
        journal
            .enqueue(user_entry("hello"))
            .await
            .expect("second entry should enqueue");
        journal.flush().await.expect("journal should flush");

        let loaded = Journal::load_jsonl(path.as_path()).expect("journal should load");

        assert_eq!(
            loaded.entries(),
            vec![developer_entry("first"), user_entry("hello")]
        );
    }

    // A batched enqueue followed by `shutdown` (no explicit flush) still
    // persists everything.
    #[tokio::test]
    async fn enqueue_all_and_shutdown_persist_entries() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("journal.jsonl");
        let journal = JournalWriter::open(path.clone())
            .await
            .expect("journal should open");

        journal
            .enqueue_all(vec![developer_entry("one"), developer_entry("two")])
            .await
            .expect("entries should enqueue");
        journal.shutdown().await.expect("journal should shut down");

        let loaded = Journal::load_jsonl(path.as_path()).expect("journal should load");

        assert_eq!(
            loaded.entries(),
            vec![developer_entry("one"), developer_entry("two")]
        );
    }

    // Reopening an existing journal appends (the file is opened in append
    // mode) rather than truncating earlier entries.
    #[tokio::test]
    async fn reopen_appends_to_existing_journal() {
        let dir = tempdir().expect("tempdir");
        let path = dir.path().join("journal.jsonl");

        let first = JournalWriter::open(path.clone())
            .await
            .expect("first journal should open");
        first
            .enqueue(developer_entry("first"))
            .await
            .expect("entry should enqueue");
        first
            .shutdown()
            .await
            .expect("first journal should shut down");

        let second = JournalWriter::open(path.clone())
            .await
            .expect("second journal should open");
        second
            .enqueue(user_entry("second"))
            .await
            .expect("entry should enqueue");
        second
            .shutdown()
            .await
            .expect("second journal should shut down");

        let loaded = Journal::load_jsonl(path.as_path()).expect("journal should load");

        assert_eq!(
            loaded.entries(),
            vec![developer_entry("first"), user_entry("second")]
        );
    }
}
|
||||
@@ -7,6 +7,7 @@
|
||||
mod error;
|
||||
#[cfg(debug_assertions)]
|
||||
mod in_memory;
|
||||
mod journal_writer;
|
||||
mod live_thread;
|
||||
mod local;
|
||||
mod remote;
|
||||
@@ -19,6 +20,7 @@ pub use error::ThreadStoreResult;
|
||||
pub use in_memory::InMemoryThreadStore;
|
||||
#[cfg(debug_assertions)]
|
||||
pub use in_memory::InMemoryThreadStoreCalls;
|
||||
pub use journal_writer::JournalWriter;
|
||||
pub use live_thread::LiveThread;
|
||||
pub use live_thread::LiveThreadInitGuard;
|
||||
pub use local::LocalThreadStore;
|
||||
|
||||
Reference in New Issue
Block a user