Files
codex/codex-rs/core/src/thread_manager.rs
Owen Lin efc8d45750 feat(app-server): experimental flag to persist extended history (#11227)
This PR adds an experimental `persist_extended_history` bool flag to
app-server thread APIs so rollout logs can retain a richer set of
EventMsgs for non-lossy Thread > Turn > ThreadItems reconstruction (i.e.
on `thread/resume`).

### Motivation
Today, our rollout recorder only persists a small subset (e.g. user
message, reasoning, assistant message) of `EventMsg` types, dropping a
good number (like command exec, file change, etc.) that are important
for reconstructing full item history for `thread/resume`, `thread/read`,
and `thread/fork`.

Some clients want to be able to resume a thread without lossiness. This
lossiness is primarily a UI thing, since what the model sees are
`ResponseItem` and not `EventMsg`.

### Approach
This change introduces an opt-in `persist_full_history` flag to preserve
those events when you start/resume/fork a thread (defaults to `false`).

This is done by adding an `EventPersistenceMode` to the rollout
recorder:
- `Limited` (existing behavior, default)
- `Extended` (new opt-in behavior)

In `Extended` mode, persist additional `EventMsg` variants needed for
non-lossy app-server `ThreadItem` reconstruction. We now store the
following ThreadItems that we didn't before:
- web search
- command execution
- patch/file changes
- MCP tool calls
- image view calls
- collab tool outcomes
- context compaction
- review mode enter/exit

For **command executions** in particular, we truncate the output using
the existing `truncate_text` from core to store an upper bound of 10,000
bytes, which is also the default value for truncating tool outputs shown
to the model. This keeps the size of the rollout file and command
execution items returned over the wire reasonable.

And we also persist `EventMsg::Error` which we can now map back to the
Turn's status and populates the Turn's error metadata.

#### Updates to EventMsgs
To truly make `thread/resume` non-lossy, we also needed to persist the
`status` on `EventMsg::CommandExecutionEndEvent` and
`EventMsg::PatchApplyEndEvent`. Previously it was not obvious whether a
command failed or was declined (similar for apply_patch). These
EventMsgs were never persisted before so I made it a required field.
2026-02-12 19:34:22 +00:00

677 lines
23 KiB
Rust

use crate::AuthManager;
use crate::CodexAuth;
use crate::ModelProviderInfo;
use crate::agent::AgentControl;
use crate::codex::Codex;
use crate::codex::CodexSpawnOk;
use crate::codex::INITIAL_SUBMIT_ID;
use crate::codex_thread::CodexThread;
use crate::config::Config;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::file_watcher::FileWatcher;
use crate::file_watcher::FileWatcherEvent;
use crate::models_manager::manager::ModelsManager;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::truncation;
use crate::skills::SkillsManager;
use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::McpServerRefreshConfig;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use tokio::runtime::Handle;
use tokio::runtime::RuntimeFlavor;
use tokio::sync::RwLock;
use tokio::sync::broadcast;
use tracing::warn;
const THREAD_CREATED_CHANNEL_CAPACITY: usize = 1024;
/// Test-only override for enabling thread-manager behaviors used by integration
/// tests.
///
/// In production builds this value should remain at its default (`false`) and
/// must not be toggled.
static FORCE_TEST_THREAD_MANAGER_BEHAVIOR: AtomicBool = AtomicBool::new(false);
type CapturedOps = Vec<(ThreadId, Op)>;
type SharedCapturedOps = Arc<std::sync::Mutex<CapturedOps>>;
pub(crate) fn set_thread_manager_test_mode_for_tests(enabled: bool) {
FORCE_TEST_THREAD_MANAGER_BEHAVIOR.store(enabled, Ordering::Relaxed);
}
fn should_use_test_thread_manager_behavior() -> bool {
FORCE_TEST_THREAD_MANAGER_BEHAVIOR.load(Ordering::Relaxed)
}
struct TempCodexHomeGuard {
path: PathBuf,
}
impl Drop for TempCodexHomeGuard {
fn drop(&mut self) {
let _ = std::fs::remove_dir_all(&self.path);
}
}
fn build_file_watcher(codex_home: PathBuf, skills_manager: Arc<SkillsManager>) -> Arc<FileWatcher> {
if should_use_test_thread_manager_behavior()
&& let Ok(handle) = Handle::try_current()
&& handle.runtime_flavor() == RuntimeFlavor::CurrentThread
{
// The real watcher spins background tasks that can starve the
// current-thread test runtime and cause event waits to time out.
warn!("using noop file watcher under current-thread test runtime");
return Arc::new(FileWatcher::noop());
}
let file_watcher = match FileWatcher::new(codex_home) {
Ok(file_watcher) => Arc::new(file_watcher),
Err(err) => {
warn!("failed to initialize file watcher: {err}");
Arc::new(FileWatcher::noop())
}
};
let mut rx = file_watcher.subscribe();
let skills_manager = Arc::clone(&skills_manager);
if let Ok(handle) = Handle::try_current() {
handle.spawn(async move {
loop {
match rx.recv().await {
Ok(FileWatcherEvent::SkillsChanged { .. }) => {
skills_manager.clear_cache();
}
Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(_)) => continue,
}
}
});
} else {
warn!("file watcher listener skipped: no Tokio runtime available");
}
file_watcher
}
/// Represents a newly created Codex thread (formerly called a conversation), including the first event
/// (which is [`EventMsg::SessionConfigured`]).
pub struct NewThread {
pub thread_id: ThreadId,
pub thread: Arc<CodexThread>,
pub session_configured: SessionConfiguredEvent,
}
/// [`ThreadManager`] is responsible for creating threads and maintaining
/// them in memory.
pub struct ThreadManager {
state: Arc<ThreadManagerState>,
_test_codex_home_guard: Option<TempCodexHomeGuard>,
}
/// Shared, `Arc`-owned state for [`ThreadManager`]. This `Arc` is required to have a single
/// `Arc` reference that can be downgraded to by `AgentControl` while preventing every single
/// function to require an `Arc<&Self>`.
pub(crate) struct ThreadManagerState {
threads: Arc<RwLock<HashMap<ThreadId, Arc<CodexThread>>>>,
thread_created_tx: broadcast::Sender<ThreadId>,
auth_manager: Arc<AuthManager>,
models_manager: Arc<ModelsManager>,
skills_manager: Arc<SkillsManager>,
file_watcher: Arc<FileWatcher>,
session_source: SessionSource,
// Captures submitted ops for testing purpose when test mode is enabled.
ops_log: Option<SharedCapturedOps>,
}
impl ThreadManager {
pub fn new(
codex_home: PathBuf,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
) -> Self {
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
let skills_manager = Arc::new(SkillsManager::new(codex_home.clone()));
let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager));
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
thread_created_tx,
models_manager: Arc::new(ModelsManager::new(codex_home, auth_manager.clone())),
skills_manager,
file_watcher,
auth_manager,
session_source,
ops_log: should_use_test_thread_manager_behavior()
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
}),
_test_codex_home_guard: None,
}
}
/// Construct with a dummy AuthManager containing the provided CodexAuth.
/// Used for integration tests: should not be used by ordinary business logic.
pub(crate) fn with_models_provider_for_tests(
auth: CodexAuth,
provider: ModelProviderInfo,
) -> Self {
set_thread_manager_test_mode_for_tests(true);
let codex_home = std::env::temp_dir().join(format!(
"codex-thread-manager-test-{}",
uuid::Uuid::new_v4()
));
std::fs::create_dir_all(&codex_home)
.unwrap_or_else(|err| panic!("temp codex home dir create failed: {err}"));
let mut manager =
Self::with_models_provider_and_home_for_tests(auth, provider, codex_home.clone());
manager._test_codex_home_guard = Some(TempCodexHomeGuard { path: codex_home });
manager
}
/// Construct with a dummy AuthManager containing the provided CodexAuth and codex home.
/// Used for integration tests: should not be used by ordinary business logic.
pub(crate) fn with_models_provider_and_home_for_tests(
auth: CodexAuth,
provider: ModelProviderInfo,
codex_home: PathBuf,
) -> Self {
set_thread_manager_test_mode_for_tests(true);
let auth_manager = AuthManager::from_auth_for_testing(auth);
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
let skills_manager = Arc::new(SkillsManager::new(codex_home.clone()));
let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager));
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
thread_created_tx,
models_manager: Arc::new(ModelsManager::with_provider_for_tests(
codex_home,
auth_manager.clone(),
provider,
)),
skills_manager,
file_watcher,
auth_manager,
session_source: SessionSource::Exec,
ops_log: should_use_test_thread_manager_behavior()
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
}),
_test_codex_home_guard: None,
}
}
pub fn session_source(&self) -> SessionSource {
self.state.session_source.clone()
}
pub fn skills_manager(&self) -> Arc<SkillsManager> {
self.state.skills_manager.clone()
}
pub fn subscribe_file_watcher(&self) -> broadcast::Receiver<FileWatcherEvent> {
self.state.file_watcher.subscribe()
}
pub fn get_models_manager(&self) -> Arc<ModelsManager> {
self.state.models_manager.clone()
}
pub async fn list_models(
&self,
config: &Config,
refresh_strategy: crate::models_manager::manager::RefreshStrategy,
) -> Vec<ModelPreset> {
self.state
.models_manager
.list_models(config, refresh_strategy)
.await
}
pub fn list_collaboration_modes(&self) -> Vec<CollaborationModeMask> {
self.state.models_manager.list_collaboration_modes()
}
pub async fn list_thread_ids(&self) -> Vec<ThreadId> {
self.state.threads.read().await.keys().copied().collect()
}
pub async fn refresh_mcp_servers(&self, refresh_config: McpServerRefreshConfig) {
let threads = self
.state
.threads
.read()
.await
.values()
.cloned()
.collect::<Vec<_>>();
for thread in threads {
if let Err(err) = thread
.submit(Op::RefreshMcpServers {
config: refresh_config.clone(),
})
.await
{
warn!("failed to request MCP server refresh: {err}");
}
}
}
pub fn subscribe_thread_created(&self) -> broadcast::Receiver<ThreadId> {
self.state.thread_created_tx.subscribe()
}
pub async fn get_thread(&self, thread_id: ThreadId) -> CodexResult<Arc<CodexThread>> {
self.state.get_thread(thread_id).await
}
pub async fn start_thread(&self, config: Config) -> CodexResult<NewThread> {
self.start_thread_with_tools(config, Vec::new(), false)
.await
}
pub async fn start_thread_with_tools(
&self,
config: Config,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
) -> CodexResult<NewThread> {
self.state
.spawn_thread(
config,
InitialHistory::New,
Arc::clone(&self.state.auth_manager),
self.agent_control(),
dynamic_tools,
persist_extended_history,
)
.await
}
pub async fn resume_thread_from_rollout(
&self,
config: Config,
rollout_path: PathBuf,
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewThread> {
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
self.resume_thread_with_history(config, initial_history, auth_manager, false)
.await
}
pub async fn resume_thread_with_history(
&self,
config: Config,
initial_history: InitialHistory,
auth_manager: Arc<AuthManager>,
persist_extended_history: bool,
) -> CodexResult<NewThread> {
self.state
.spawn_thread(
config,
initial_history,
auth_manager,
self.agent_control(),
Vec::new(),
persist_extended_history,
)
.await
}
/// Removes the thread from the manager's internal map, though the thread is stored
/// as `Arc<CodexThread>`, it is possible that other references to it exist elsewhere.
/// Returns the thread if the thread was found and removed.
pub async fn remove_thread(&self, thread_id: &ThreadId) -> Option<Arc<CodexThread>> {
self.state.threads.write().await.remove(thread_id)
}
/// Closes all threads open in this ThreadManager
pub async fn remove_and_close_all_threads(&self) -> CodexResult<()> {
for thread in self.state.threads.read().await.values() {
thread.submit(Op::Shutdown).await?;
}
self.state.threads.write().await.clear();
Ok(())
}
/// Fork an existing thread by taking messages up to the given position (not including
/// the message at the given position) and starting a new thread with identical
/// configuration (unless overridden by the caller's `config`). The new thread will have
/// a fresh id. Pass `usize::MAX` to keep the full rollout history.
pub async fn fork_thread(
&self,
nth_user_message: usize,
config: Config,
path: PathBuf,
persist_extended_history: bool,
) -> CodexResult<NewThread> {
let history = RolloutRecorder::get_rollout_history(&path).await?;
let history = truncate_before_nth_user_message(history, nth_user_message);
self.state
.spawn_thread(
config,
history,
Arc::clone(&self.state.auth_manager),
self.agent_control(),
Vec::new(),
persist_extended_history,
)
.await
}
pub(crate) fn agent_control(&self) -> AgentControl {
AgentControl::new(Arc::downgrade(&self.state))
}
#[cfg(test)]
pub(crate) fn captured_ops(&self) -> Vec<(ThreadId, Op)> {
self.state
.ops_log
.as_ref()
.and_then(|ops_log| ops_log.lock().ok().map(|log| log.clone()))
.unwrap_or_default()
}
}
impl ThreadManagerState {
/// Fetch a thread by ID or return ThreadNotFound.
pub(crate) async fn get_thread(&self, thread_id: ThreadId) -> CodexResult<Arc<CodexThread>> {
let threads = self.threads.read().await;
threads
.get(&thread_id)
.cloned()
.ok_or_else(|| CodexErr::ThreadNotFound(thread_id))
}
/// Send an operation to a thread by ID.
pub(crate) async fn send_op(&self, thread_id: ThreadId, op: Op) -> CodexResult<String> {
let thread = self.get_thread(thread_id).await?;
if let Some(ops_log) = &self.ops_log
&& let Ok(mut log) = ops_log.lock()
{
log.push((thread_id, op.clone()));
}
thread.submit(op).await
}
/// Remove a thread from the manager by ID, returning it when present.
pub(crate) async fn remove_thread(&self, thread_id: &ThreadId) -> Option<Arc<CodexThread>> {
self.threads.write().await.remove(thread_id)
}
/// Spawn a new thread with no history using a provided config.
pub(crate) async fn spawn_new_thread(
&self,
config: Config,
agent_control: AgentControl,
) -> CodexResult<NewThread> {
self.spawn_new_thread_with_source(config, agent_control, self.session_source.clone(), false)
.await
}
pub(crate) async fn spawn_new_thread_with_source(
&self,
config: Config,
agent_control: AgentControl,
session_source: SessionSource,
persist_extended_history: bool,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
InitialHistory::New,
Arc::clone(&self.auth_manager),
agent_control,
session_source,
Vec::new(),
persist_extended_history,
)
.await
}
pub(crate) async fn resume_thread_from_rollout_with_source(
&self,
config: Config,
rollout_path: PathBuf,
agent_control: AgentControl,
session_source: SessionSource,
) -> CodexResult<NewThread> {
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
self.spawn_thread_with_source(
config,
initial_history,
Arc::clone(&self.auth_manager),
agent_control,
session_source,
Vec::new(),
false,
)
.await
}
/// Spawn a new thread with optional history and register it with the manager.
pub(crate) async fn spawn_thread(
&self,
config: Config,
initial_history: InitialHistory,
auth_manager: Arc<AuthManager>,
agent_control: AgentControl,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
initial_history,
auth_manager,
agent_control,
self.session_source.clone(),
dynamic_tools,
persist_extended_history,
)
.await
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn spawn_thread_with_source(
&self,
config: Config,
initial_history: InitialHistory,
auth_manager: Arc<AuthManager>,
agent_control: AgentControl,
session_source: SessionSource,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
) -> CodexResult<NewThread> {
self.file_watcher.register_config(&config);
let CodexSpawnOk {
codex, thread_id, ..
} = Codex::spawn(
config,
auth_manager,
Arc::clone(&self.models_manager),
Arc::clone(&self.skills_manager),
Arc::clone(&self.file_watcher),
initial_history,
session_source,
agent_control,
dynamic_tools,
persist_extended_history,
)
.await?;
self.finalize_thread_spawn(codex, thread_id).await
}
async fn finalize_thread_spawn(
&self,
codex: Codex,
thread_id: ThreadId,
) -> CodexResult<NewThread> {
let event = codex.next_event().await?;
let session_configured = match event {
Event {
id,
msg: EventMsg::SessionConfigured(session_configured),
} if id == INITIAL_SUBMIT_ID => session_configured,
_ => {
return Err(CodexErr::SessionConfiguredNotFirstEvent);
}
};
let thread = Arc::new(CodexThread::new(
codex,
session_configured.rollout_path.clone(),
));
let mut threads = self.threads.write().await;
threads.insert(thread_id, thread.clone());
Ok(NewThread {
thread_id,
thread,
session_configured,
})
}
pub(crate) fn notify_thread_created(&self, thread_id: ThreadId) {
let _ = self.thread_created_tx.send(thread_id);
}
}
/// Return a prefix of `items` obtained by cutting strictly before the nth user message
/// (0-based) and all items that follow it.
fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> InitialHistory {
let items: Vec<RolloutItem> = history.get_rollout_items();
let rolled = truncation::truncate_rollout_before_nth_user_message_from_start(&items, n);
if rolled.is_empty() {
InitialHistory::New
} else {
InitialHistory::Forked(rolled)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::codex::make_session_and_context;
use assert_matches::assert_matches;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ReasoningItemReasoningSummary;
use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq;
fn user_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
fn assistant_msg(text: &str) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: text.to_string(),
}],
end_turn: None,
phase: None,
}
}
#[test]
fn drops_from_last_user_only() {
let items = [
user_msg("u1"),
assistant_msg("a1"),
assistant_msg("a2"),
user_msg("u2"),
assistant_msg("a3"),
ResponseItem::Reasoning {
id: "r1".to_string(),
summary: vec![ReasoningItemReasoningSummary::SummaryText {
text: "s".to_string(),
}],
content: None,
encrypted_content: None,
},
ResponseItem::FunctionCall {
id: None,
call_id: "c1".to_string(),
name: "tool".to_string(),
arguments: "{}".to_string(),
},
assistant_msg("a4"),
];
let initial: Vec<RolloutItem> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1);
let got_items = truncated.get_rollout_items();
let expected_items = vec![
RolloutItem::ResponseItem(items[0].clone()),
RolloutItem::ResponseItem(items[1].clone()),
RolloutItem::ResponseItem(items[2].clone()),
];
assert_eq!(
serde_json::to_value(&got_items).unwrap(),
serde_json::to_value(&expected_items).unwrap()
);
let initial2: Vec<RolloutItem> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2);
assert_matches!(truncated2, InitialHistory::New);
}
#[tokio::test]
async fn ignores_session_prefix_messages_when_truncating() {
let (session, turn_context) = make_session_and_context().await;
let mut items = session.build_initial_context(&turn_context).await;
items.push(user_msg("feature request"));
items.push(assistant_msg("ack"));
items.push(user_msg("second question"));
items.push(assistant_msg("answer"));
let rollout_items: Vec<RolloutItem> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1);
let got_items = truncated.get_rollout_items();
let expected: Vec<RolloutItem> = vec![
RolloutItem::ResponseItem(items[0].clone()),
RolloutItem::ResponseItem(items[1].clone()),
RolloutItem::ResponseItem(items[2].clone()),
RolloutItem::ResponseItem(items[3].clone()),
];
assert_eq!(
serde_json::to_value(&got_items).unwrap(),
serde_json::to_value(&expected).unwrap()
);
}
}