Decoupling of TUI

This commit is contained in:
jif-oai
2025-11-19 14:53:17 +00:00
parent b9f260057c
commit 7ec1311aff
12 changed files with 202 additions and 165 deletions

View File

@@ -90,6 +90,7 @@ use crate::protocol::ReasoningRawContentDeltaEvent;
use crate::protocol::ReviewDecision;
use crate::protocol::SandboxCommandAssessment;
use crate::protocol::SandboxPolicy;
use crate::protocol::SaveSessionResponseEvent;
use crate::protocol::SessionConfiguredEvent;
use crate::protocol::StreamErrorEvent;
use crate::protocol::Submission;
@@ -100,6 +101,8 @@ use crate::protocol::TurnDiffEvent;
use crate::protocol::WarningEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::RolloutRecorderParams;
use crate::saved_sessions::build_saved_session_entry;
use crate::saved_sessions::upsert_saved_session;
use crate::shell;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
@@ -133,6 +136,7 @@ use codex_protocol::user_input::UserInput;
use codex_utils_readiness::Readiness;
use codex_utils_readiness::ReadinessFlag;
use codex_utils_tokenizer::warm_model_cache;
use std::path::Path;
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
@@ -649,6 +653,44 @@ impl Session {
}
}
pub(crate) async fn rollout_path(&self) -> CodexResult<PathBuf> {
let guard = self.services.rollout.lock().await;
let Some(rec) = guard.as_ref() else {
return Err(CodexErr::Fatal(
"Rollout recorder is not initialized; cannot save session.".to_string(),
));
};
Ok(rec.rollout_path.clone())
}
pub(crate) async fn model(&self) -> String {
let state = self.state.lock().await;
state.session_configuration.model.clone()
}
pub(crate) async fn save_session(
&self,
codex_home: &Path,
name: &str,
) -> CodexResult<crate::SavedSessionEntry> {
let trimmed = name.trim();
if trimmed.is_empty() {
return Err(CodexErr::Fatal("Usage: /save <name>".to_string()));
}
let rollout_path = self.rollout_path().await?;
self.flush_rollout()
.await
.map_err(|e| CodexErr::Fatal(format!("failed to flush rollout recorder: {e}")))?;
self.set_session_name(Some(trimmed.to_string()))
.await
.map_err(|e| CodexErr::Fatal(format!("failed to write session name: {e}")))?;
let entry =
build_saved_session_entry(trimmed.to_string(), rollout_path, self.model().await)
.await?;
upsert_saved_session(codex_home, entry.clone()).await?;
Ok(entry)
}
fn next_internal_sub_id(&self) -> String {
let id = self
.next_internal_sub_id
@@ -712,11 +754,8 @@ impl Session {
if persist && !rollout_items.is_empty() {
// Drop legacy SessionMeta lines from the source rollout so the forked
// session only contains its own SessionMeta written by the recorder.
let filtered: Vec<_> = rollout_items
.iter()
.filter(|&item| !matches!(item, RolloutItem::SessionMeta(_)))
.cloned()
.collect();
let filtered =
InitialHistory::Forked(rollout_items.clone()).without_session_meta();
if !filtered.is_empty() {
self.persist_rollout_items(&filtered).await;
}
@@ -1442,6 +1481,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
Op::Review { review_request } => {
handlers::review(&sess, &config, sub.id.clone(), review_request).await;
}
Op::SaveSession { name } => {
handlers::save_session(&sess, &config, sub.id.clone(), name).await;
}
_ => {} // Ignore unknown ops; enum is non_exhaustive to allow extensions.
}
}
@@ -1454,6 +1496,7 @@ mod handlers {
use crate::codex::SessionSettingsUpdate;
use crate::codex::TurnContext;
use crate::SavedSessionEntry;
use crate::codex::spawn_review_thread;
use crate::config::Config;
use crate::mcp::auth::compute_auth_statuses;
@@ -1469,6 +1512,7 @@ mod handlers {
use codex_protocol::protocol::Op;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::SaveSessionResponseEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::user_input::UserInput;
@@ -1690,6 +1734,35 @@ mod handlers {
.await;
}
pub async fn save_session(
sess: &Arc<Session>,
config: &Arc<crate::config::Config>,
sub_id: String,
name: String,
) {
match sess.save_session(&config.codex_home, &name).await {
Ok(entry) => {
let event = Event {
id: sub_id,
msg: EventMsg::SaveSessionResponse(SaveSessionResponseEvent {
name: entry.name,
rollout_path: entry.rollout_path,
conversation_id: entry.conversation_id,
}),
};
sess.send_event_raw(event).await;
}
Err(err) => {
let message = format!("Failed to save session '{name}': {err}");
let event = Event {
id: sub_id,
msg: EventMsg::Error(ErrorEvent { message }),
};
sess.send_event_raw(event).await;
}
}
}
pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
info!("Shutting down Codex instance");

View File

@@ -48,4 +48,16 @@ impl CodexConversation {
pub async fn set_session_name(&self, name: Option<String>) -> CodexResult<()> {
Ok(self.session.set_session_name(name).await?)
}
pub async fn model(&self) -> String {
self.session.model().await
}
pub async fn save_session(
&self,
codex_home: &std::path::Path,
name: &str,
) -> CodexResult<crate::SavedSessionEntry> {
self.session.save_session(codex_home, name).await
}
}

View File

@@ -12,6 +12,7 @@ use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutRecorder;
use crate::saved_sessions::resolve_rollout_path;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
@@ -19,6 +20,7 @@ use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
@@ -57,6 +59,7 @@ impl ConversationManager {
)
}
/// Start a brand new conversation with default initial history.
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
self.spawn_conversation(config, self.auth_manager.clone())
.await
@@ -129,6 +132,7 @@ impl ConversationManager {
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
}
/// Resume a conversation from an on-disk rollout file.
pub async fn resume_conversation_from_rollout(
&self,
config: Config,
@@ -140,6 +144,23 @@ impl ConversationManager {
.await
}
/// Resume a conversation by saved-session name or rollout id string.
pub async fn resume_conversation_from_identifier(
&self,
config: Config,
identifier: &str,
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let Some(path) = resolve_rollout_path(&config.codex_home, identifier).await? else {
return Err(CodexErr::Fatal(format!(
"No saved session or rollout found for '{identifier}'"
)));
};
self.resume_conversation_from_rollout(config, path, auth_manager)
.await
}
/// Resume a conversation from provided rollout history items.
pub async fn resume_conversation_with_history(
&self,
config: Config,
@@ -160,6 +181,7 @@ impl ConversationManager {
self.finalize_spawn(codex, conversation_id, session).await
}
/// Fork a new conversation from the given rollout path.
pub async fn fork_from_rollout(
&self,
config: Config,
@@ -176,6 +198,36 @@ impl ConversationManager {
.await
}
/// Fork a new conversation from a saved-session name or rollout id string.
pub async fn fork_from_identifier(
&self,
config: Config,
identifier: &str,
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let Some(path) = resolve_rollout_path(&config.codex_home, identifier).await? else {
return Err(CodexErr::Fatal(format!(
"No saved session or rollout found for '{identifier}'"
)));
};
self.fork_from_rollout(config, path, auth_manager).await
}
/// Persist a human-friendly session name and record it in saved_sessions.json.
pub async fn save_session(
&self,
conversation_id: ConversationId,
codex_home: &Path,
name: &str,
) -> CodexResult<crate::SavedSessionEntry> {
let trimmed = name.trim();
if trimmed.is_empty() {
return Err(CodexErr::Fatal("Usage: /save <name>".to_string()));
}
let conversation = self.get_conversation(conversation_id).await?;
conversation.save_session(codex_home, trimmed).await
}
/// Removes the conversation from the manager's internal map, though the
/// conversation is stored as `Arc<CodexConversation>`, it is possible that
/// other references to it exist elsewhere. Returns the conversation if the

View File

@@ -86,6 +86,7 @@ pub use rollout::list::read_head_for_summary;
pub use saved_sessions::SavedSessionEntry;
pub use saved_sessions::build_saved_session_entry;
pub use saved_sessions::list_saved_sessions;
pub use saved_sessions::resolve_rollout_path;
pub use saved_sessions::resolve_saved_session;
pub use saved_sessions::upsert_saved_session;
mod function_tool;

View File

@@ -66,6 +66,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::SaveSessionResponse(_)
| EventMsg::StreamError(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)

View File

@@ -1,4 +1,5 @@
use crate::error::Result;
use crate::find_conversation_path_by_id_str;
use crate::rollout::list::read_head_for_summary;
use codex_protocol::ConversationId;
use codex_protocol::protocol::SessionMetaLine;
@@ -11,6 +12,7 @@ use std::path::Path;
use std::path::PathBuf;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tracing::warn;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
@@ -124,3 +126,21 @@ pub async fn list_saved_sessions(codex_home: &Path) -> Result<Vec<SavedSessionEn
entries.sort_by(|a, b| b.saved_at.cmp(&a.saved_at));
Ok(entries)
}
/// Resolve a rollout path from either a saved-session name or rollout id string.
/// Returns `Ok(None)` when nothing matches.
pub async fn resolve_rollout_path(codex_home: &Path, identifier: &str) -> Result<Option<PathBuf>> {
if let Some(entry) = resolve_saved_session(codex_home, identifier).await? {
if entry.rollout_path.exists() {
return Ok(Some(entry.rollout_path));
}
warn!(
"saved session '{}' points to missing rollout at {}",
identifier,
entry.rollout_path.display()
);
}
find_conversation_path_by_id_str(codex_home, identifier)
.await
.map_err(Into::into) // todo jif
}

View File

@@ -85,12 +85,11 @@ async fn save_session(
name: &str,
codex: &Arc<CodexConversation>,
config: &Config,
model: &str,
) -> Result<SavedSessionEntry> {
codex.flush_rollout().await?;
codex.set_session_name(Some(name.to_string())).await?;
let entry =
build_saved_session_entry(name.to_string(), codex.rollout_path(), model.to_string())
build_saved_session_entry(name.to_string(), codex.rollout_path(), codex.model().await)
.await?;
upsert_saved_session(&config.codex_home, entry.clone()).await?;
Ok(entry)
@@ -108,13 +107,7 @@ async fn save_and_resume_by_name() -> Result<()> {
submit_text(&initial.codex, "first turn").await?;
let name = "alpha";
let entry = save_session(
name,
&initial.codex,
&initial.config,
&initial.session_configured.model,
)
.await?;
let entry = save_session(name, &initial.codex, &initial.config).await?;
let resolved = resolve_saved_session(&initial.config.codex_home, name)
.await?
.expect("saved session");
@@ -148,13 +141,7 @@ async fn save_and_fork_by_name() -> Result<()> {
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "original").await?;
let entry = save_session(
"forkable",
&initial.codex,
&initial.config,
&initial.session_configured.model,
)
.await?;
let entry = save_session("forkable", &initial.codex, &initial.config).await?;
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
@@ -199,13 +186,7 @@ async fn forked_messages_do_not_touch_original() -> Result<()> {
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "first").await?;
let entry = save_session(
"branch",
&initial.codex,
&initial.config,
&initial.session_configured.model,
)
.await?;
let entry = save_session("branch", &initial.codex, &initial.config).await?;
let baseline_items = rollout_items_without_meta(&entry.rollout_path);
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
@@ -257,13 +238,7 @@ async fn resumed_messages_are_present_in_new_fork() -> Result<()> {
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "start").await?;
let entry = save_session(
"seed",
&initial.codex,
&initial.config,
&initial.session_configured.model,
)
.await?;
let entry = save_session("seed", &initial.codex, &initial.config).await?;
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
@@ -318,23 +293,11 @@ async fn duplicate_name_overwrites_entry() -> Result<()> {
let first = builder.build(&server).await?;
submit_text(&first.codex, "first session").await?;
let name = "shared";
let entry_one = save_session(
name,
&first.codex,
&first.config,
&first.session_configured.model,
)
.await?;
let entry_one = save_session(name, &first.codex, &first.config).await?;
let second = builder.build(&server).await?;
submit_text(&second.codex, "second session").await?;
let entry_two = save_session(
name,
&second.codex,
&second.config,
&second.session_configured.model,
)
.await?;
let entry_two = save_session(name, &second.codex, &second.config).await?;
let resolved = resolve_saved_session(&second.config.codex_home, name)
.await?
@@ -356,20 +319,8 @@ async fn same_session_multiple_names() -> Result<()> {
let session = builder.build(&server).await?;
submit_text(&session.codex, "save twice").await?;
let entry_first = save_session(
"first",
&session.codex,
&session.config,
&session.session_configured.model,
)
.await?;
let entry_second = save_session(
"second",
&session.codex,
&session.config,
&session.session_configured.model,
)
.await?;
let entry_first = save_session("first", &session.codex, &session.config).await?;
let entry_second = save_session("second", &session.codex, &session.config).await?;
let resolved_first = resolve_saved_session(&session.config.codex_home, "first")
.await?

View File

@@ -102,6 +102,9 @@ pub enum Op {
final_output_json_schema: Option<Value>,
},
/// Persist the current session under a user-provided name.
SaveSession { name: String },
/// Override parts of the persistent turn context for subsequent turns.
///
/// All fields are optional; when omitted, the existing value is preserved.
@@ -513,6 +516,9 @@ pub enum EventMsg {
BackgroundEvent(BackgroundEventEvent),
/// Result of a save-session request.
SaveSessionResponse(SaveSessionResponseEvent),
UndoStarted(UndoStartedEvent),
UndoCompleted(UndoCompletedEvent),
@@ -1028,6 +1034,13 @@ impl InitialHistory {
}
}
pub fn without_session_meta(&self) -> Vec<RolloutItem> {
self.get_rollout_items()
.into_iter()
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
.collect()
}
pub fn get_event_msgs(&self) -> Option<Vec<EventMsg>> {
match self {
InitialHistory::New => None,
@@ -1365,6 +1378,13 @@ pub struct StreamInfoEvent {
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct SaveSessionResponseEvent {
pub name: String,
pub rollout_path: PathBuf,
pub conversation_id: ConversationId,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct PatchApplyBeginEvent {
/// Identifier so this can be paired with the PatchApplyEnd event.

View File

@@ -27,8 +27,6 @@ use codex_common::model_presets::ModelUpgrade;
use codex_common::model_presets::all_model_presets;
use codex_core::AuthManager;
use codex_core::ConversationManager;
use codex_core::SavedSessionEntry;
use codex_core::build_saved_session_entry;
use codex_core::config::Config;
use codex_core::config::edit::ConfigEditsBuilder;
use codex_core::error::CodexErr;
@@ -41,7 +39,6 @@ use codex_core::protocol::Op;
use codex_core::protocol::SessionSource;
use codex_core::protocol::TokenUsage;
use codex_core::protocol_config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_core::upsert_saved_session;
use codex_protocol::ConversationId;
use color_eyre::eyre::Result;
use color_eyre::eyre::WrapErr;
@@ -440,70 +437,6 @@ impl App {
Ok(true)
}
async fn save_session(&mut self, requested_name: String) {
let name = requested_name.trim().to_string();
if name.is_empty() {
self.chat_widget
.add_error_message("Usage: /save <name>".to_string());
return;
}
match self.try_save_session(&name).await {
Ok(entry) => {
let name = &entry.name;
self.chat_widget.add_info_message(
format!(
"Saved session '{name}' (conversation {}).",
entry.conversation_id
),
Some(format!(
"Resume with `codex resume {name}` or fork with `codex fork {name}`.",
)),
);
}
Err(error) => self
.chat_widget
.add_error_message(format!("Failed to save session '{name}': {error}")),
}
}
async fn try_save_session(&mut self, name: &str) -> Result<SavedSessionEntry, CodexErr> {
// Normalize and validate the user-provided name early so downstream async work
// only runs for actionable requests.
if name.is_empty() {
return Err(CodexErr::Fatal("Usage: /save <name>".to_string()));
}
// Capture identifiers from the active chat widget; these are cheap and fast.
let conversation_id = self.chat_widget.conversation_id().ok_or_else(|| {
CodexErr::Fatal("Session is not ready yet; try /save again in a moment.".to_string())
})?;
let rollout_path = self.chat_widget.rollout_path().ok_or_else(|| {
CodexErr::Fatal(
"Rollout path is not available yet; try /save again shortly.".to_string(),
)
})?;
// Resolve the conversation handle; all subsequent operations use this shared reference.
let conversation = self.server.get_conversation(conversation_id).await?;
// Ensure the rollout is fully flushed before snapshotting metadata.
conversation.flush_rollout().await?;
// Persist the human-friendly name into the SessionMeta line.
conversation
.set_session_name(Some(name.to_string()))
.await?;
// Build and persist the saved-session entry on disk.
let model = self.chat_widget.config_ref().model.clone();
let entry =
build_saved_session_entry(name.to_string(), rollout_path.clone(), model).await?;
upsert_saved_session(&self.config.codex_home, entry.clone()).await?;
Ok(entry)
}
async fn handle_event(&mut self, tui: &mut tui::Tui, event: AppEvent) -> Result<bool> {
match event {
AppEvent::NewSession => {
@@ -586,9 +519,6 @@ impl App {
AppEvent::ConversationHistory(ev) => {
self.on_conversation_history_for_backtrack(tui, ev).await?;
}
AppEvent::SaveSession { name } => {
self.save_session(name).await;
}
AppEvent::ExitRequest => {
return Ok(false);
}

View File

@@ -144,11 +144,6 @@ pub(crate) enum AppEvent {
/// Forwarded conversation history snapshot from the current conversation.
ConversationHistory(ConversationPathResponseEvent),
/// Save the current session under a user-provided name.
SaveSession {
name: String,
},
/// Open the branch picker option from the review popup.
OpenReviewBranchPicker(PathBuf),

View File

@@ -1483,9 +1483,8 @@ impl ChatWidget {
));
return;
}
self.app_event_tx.send(AppEvent::SaveSession {
name: name.to_string(),
});
self.app_event_tx
.send(AppEvent::CodexOp(Op::SaveSession { name: name.into() }));
self.add_info_message(format!("Saving session '{name}'..."), None);
}
@@ -1659,6 +1658,12 @@ impl ChatWidget {
EventMsg::Error(ErrorEvent { message }) => self.on_error(message),
EventMsg::McpStartupUpdate(ev) => self.on_mcp_startup_update(ev),
EventMsg::McpStartupComplete(ev) => self.on_mcp_startup_complete(ev),
EventMsg::SaveSessionResponse(ev) => {
self.add_info_message(
format!("Saved session '{}' ({}).", ev.name, ev.conversation_id),
Some(format!("Rollout: {}", ev.rollout_path.display())),
);
}
EventMsg::TurnAborted(ev) => match ev.reason {
TurnAbortReason::Interrupted => {
self.on_interrupted_turn(ev.reason);

View File

@@ -19,17 +19,14 @@ use codex_core::config::ConfigOverrides;
use codex_core::config::find_codex_home;
use codex_core::config::load_config_as_toml_with_cli_overrides;
use codex_core::config::resolve_oss_provider;
use codex_core::find_conversation_path_by_id_str;
use codex_core::get_platform_sandbox;
use codex_core::protocol::AskForApproval;
use codex_core::resolve_saved_session;
use codex_core::resolve_rollout_path;
use codex_protocol::config_types::SandboxMode;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use std::fs::OpenOptions;
use std::path::Path;
use std::path::PathBuf;
use tracing::error;
use tracing::warn;
use tracing_appender::non_blocking;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::filter::Targets;
@@ -338,26 +335,6 @@ pub async fn run_main(
.map_err(|err| std::io::Error::other(err.to_string()))
}
async fn resolve_rollout_path(
codex_home: &Path,
identifier: &str,
) -> std::io::Result<Option<PathBuf>> {
if let Some(entry) = resolve_saved_session(codex_home, identifier)
.await
.map_err(|err| std::io::Error::other(err.to_string()))?
{
if entry.rollout_path.exists() {
return Ok(Some(entry.rollout_path));
}
warn!(
"saved session '{}' points to missing rollout at {}",
identifier,
entry.rollout_path.display()
);
}
find_conversation_path_by_id_str(codex_home, identifier).await
}
async fn run_ratatui_app(
cli: Cli,
initial_config: Config,