Compare commits

...

15 Commits

Author SHA1 Message Date
jif-oai
63a3f3941a Merge branch 'main' into jif/fork 2025-11-20 12:36:40 +00:00
jif-oai
7ceabac707 nit fix 2025-11-19 20:40:35 +00:00
jif-oai
2a24ae36c2 Merge remote-tracking branch 'origin/main' into jif/fork
# Conflicts:
#	codex-rs/core/src/codex.rs
2025-11-19 20:21:19 +00:00
jif-oai
5071cc8fff Add a few tests 2025-11-19 15:30:04 +00:00
jif-oai
9c765f1217 Compilation nits 2025-11-19 15:21:58 +00:00
jif-oai
7116d2a6a4 More tests 2025-11-19 15:16:12 +00:00
jif-oai
94f1a61df5 Clippy 2025-11-19 15:06:51 +00:00
jif-oai
7ec1311aff Decoupling of TUI 2025-11-19 14:53:17 +00:00
jif-oai
b9f260057c Optional command 2025-11-19 14:21:04 +00:00
jif-oai
72af9e3092 Clippy 2025-11-19 12:39:28 +00:00
jif-oai
8f0d83eb11 Clean errors 2025-11-19 12:31:27 +00:00
jif-oai
26d667e152 Add tests 2025-11-19 12:13:36 +00:00
jif-oai
d1cf2b967c One fix 2025-11-19 11:50:42 +00:00
jif-oai
78afe914e1 Merge remote-tracking branch 'origin/main' into jif/fork
# Conflicts:
#	codex-rs/cli/src/main.rs
#	codex-rs/tui/src/cli.rs
2025-11-19 11:41:17 +00:00
jif-oai
1a8c1a4d9a V1 2025-11-18 16:50:45 +00:00
25 changed files with 1376 additions and 92 deletions

View File

@@ -148,7 +148,6 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::oneshot;
use tracing::error;
@@ -2239,51 +2238,25 @@ impl CodexMessageProcessor {
.await
{
info!("conversation {conversation_id} was active; shutting down");
let conversation_clone = conversation.clone();
let notify = Arc::new(tokio::sync::Notify::new());
let notify_clone = notify.clone();
// Do not wait on conversation.next_event(); the listener task already consumes
// the stream. Request shutdown and ensure the rollout file is flushed before moving it.
if let Err(err) = conversation.submit(Op::Shutdown).await {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
}
// Establish the listener for ShutdownComplete before submitting
// Shutdown so it is not missed.
let is_shutdown = tokio::spawn(async move {
// Create the notified future outside the loop to avoid losing notifications.
let notified = notify_clone.notified();
tokio::pin!(notified);
loop {
select! {
_ = &mut notified => { break; }
event = conversation_clone.next_event() => {
match event {
Ok(event) => {
if matches!(event.msg, EventMsg::ShutdownComplete) { break; }
}
// Break on errors to avoid tight loops when the agent loop has exited.
Err(_) => { break; }
}
}
}
let flush_result =
tokio::time::timeout(Duration::from_secs(5), conversation.flush_rollout()).await;
match flush_result {
Ok(Ok(())) => {}
Ok(Err(err)) => {
warn!(
"conversation {conversation_id} rollout flush failed before archive: {err}"
);
}
});
// Request shutdown.
match conversation.submit(Op::Shutdown).await {
Ok(_) => {
// Successfully submitted Shutdown; wait before proceeding.
select! {
_ = is_shutdown => {
// Normal shutdown: proceed with archive.
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("conversation {conversation_id} shutdown timed out; proceeding with archive");
// Wake any waiter; use notify_waiters to avoid missing the signal.
notify.notify_waiters();
// Perhaps we lost a shutdown race, so let's continue to
// clean up the .jsonl file.
}
}
}
Err(err) => {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
notify.notify_waiters();
Err(_) => {
warn!(
"conversation {conversation_id} rollout flush timed out; proceeding with archive"
);
}
}
}
@@ -2295,7 +2268,8 @@ impl CodexMessageProcessor {
.codex_home
.join(codex_core::ARCHIVED_SESSIONS_SUBDIR);
tokio::fs::create_dir_all(&archive_folder).await?;
tokio::fs::rename(&canonical_rollout_path, &archive_folder.join(&file_name)).await?;
let destination = archive_folder.join(&file_name);
tokio::fs::rename(&canonical_rollout_path, &destination).await?;
Ok(())
}
.await;

View File

@@ -46,6 +46,7 @@ pub fn create_fake_rollout(
instructions: None,
source: SessionSource::Cli,
model_provider: model_provider.map(str::to_string),
name: None,
})?;
let lines = [

View File

@@ -100,6 +100,9 @@ enum Subcommand {
/// Resume a previous interactive session (picker by default; use --last to continue the most recent).
Resume(ResumeCommand),
/// Fork an existing session into a new conversation.
Fork(ForkCommand),
/// [EXPERIMENTAL] Browse tasks from Codex Cloud and apply changes locally.
#[clap(name = "cloud", alias = "cloud-tasks")]
Cloud(CloudTasksCli),
@@ -142,6 +145,16 @@ struct ResumeCommand {
config_overrides: TuiCli,
}
#[derive(Debug, Parser)]
struct ForkCommand {
/// Resume from a saved session name or rollout id, but start a new conversation.
#[arg(value_name = "ID|NAME")]
target: String,
#[clap(flatten)]
config_overrides: TuiCli,
}
#[derive(Debug, Parser)]
struct SandboxArgs {
#[command(subcommand)]
@@ -466,6 +479,19 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
let exit_info = codex_tui::run_main(interactive, codex_linux_sandbox_exe).await?;
handle_app_exit(exit_info)?;
}
Some(Subcommand::Fork(ForkCommand {
target,
config_overrides,
})) => {
interactive = finalize_fork_interactive(
interactive,
root_config_overrides.clone(),
target,
config_overrides,
);
let exit_info = codex_tui::run_main(interactive, codex_linux_sandbox_exe).await?;
handle_app_exit(exit_info)?;
}
Some(Subcommand::Login(mut login_cli)) => {
prepend_config_flags(
&mut login_cli.config_overrides,
@@ -627,6 +653,7 @@ fn finalize_resume_interactive(
interactive.resume_last = last;
interactive.resume_session_id = resume_session_id;
interactive.resume_show_all = show_all;
interactive.fork_source = None;
// Merge resume-scoped flags and overrides with highest precedence.
merge_resume_cli_flags(&mut interactive, resume_cli);
@@ -637,6 +664,21 @@ fn finalize_resume_interactive(
interactive
}
fn finalize_fork_interactive(
mut interactive: TuiCli,
root_config_overrides: CliConfigOverrides,
target: String,
fork_cli: TuiCli,
) -> TuiCli {
interactive.resume_picker = false;
interactive.resume_last = false;
interactive.resume_session_id = None;
interactive.fork_source = Some(target);
merge_resume_cli_flags(&mut interactive, fork_cli);
prepend_config_flags(&mut interactive.config_overrides, root_config_overrides);
interactive
}
/// Merge flags provided to `codex resume` so they take precedence over any
/// root-level flags. Only overrides fields explicitly set on the resume-scoped
/// CLI. Also appends `-c key=value` overrides with highest precedence.
@@ -727,6 +769,26 @@ mod tests {
)
}
fn fork_from_args(args: &[&str]) -> TuiCli {
let cli = MultitoolCli::try_parse_from(args).expect("parse");
let MultitoolCli {
interactive,
config_overrides: root_overrides,
subcommand,
feature_toggles: _,
} = cli;
let Subcommand::Fork(ForkCommand {
target,
config_overrides,
}) = subcommand.expect("fork present")
else {
unreachable!()
};
finalize_fork_interactive(interactive, root_overrides, target, config_overrides)
}
fn sample_exit_info(conversation: Option<&str>) -> AppExitInfo {
let token_usage = TokenUsage {
output_tokens: 2,
@@ -819,6 +881,15 @@ mod tests {
assert!(interactive.resume_show_all);
}
#[test]
fn fork_sets_target_and_disables_resume_controls() {
let interactive = fork_from_args(["codex", "fork", "saved"].as_ref());
assert_eq!(interactive.fork_source.as_deref(), Some("saved"));
assert!(!interactive.resume_picker);
assert!(!interactive.resume_last);
assert!(interactive.resume_session_id.is_none());
}
#[test]
fn resume_merges_option_flags_and_full_auto() {
let interactive = finalize_from_args(

View File

@@ -100,6 +100,8 @@ use crate::protocol::TurnDiffEvent;
use crate::protocol::WarningEvent;
use crate::rollout::RolloutRecorder;
use crate::rollout::RolloutRecorderParams;
use crate::saved_sessions::build_saved_session_entry;
use crate::saved_sessions::upsert_saved_session;
use crate::shell;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
@@ -134,6 +136,8 @@ use codex_protocol::user_input::UserInput;
use codex_utils_readiness::Readiness;
use codex_utils_readiness::ReadinessFlag;
use codex_utils_tokenizer::warm_model_cache;
use reqwest::StatusCode;
use std::path::Path;
/// The high-level interface to the Codex system.
/// It operates as a queue pair where you send submissions and receive events.
@@ -149,6 +153,7 @@ pub struct Codex {
pub struct CodexSpawnOk {
pub codex: Codex,
pub conversation_id: ConversationId,
pub(crate) session: Arc<Session>,
}
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
@@ -209,7 +214,8 @@ impl Codex {
let conversation_id = session.conversation_id;
// This task will run until Op::Shutdown is received.
tokio::spawn(submission_loop(session, config, rx_sub));
let submission_session = Arc::clone(&session);
tokio::spawn(submission_loop(submission_session, config, rx_sub));
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,
@@ -219,6 +225,7 @@ impl Codex {
Ok(CodexSpawnOk {
codex,
conversation_id,
session,
})
}
@@ -632,18 +639,68 @@ impl Session {
}
/// Ensure all rollout writes are durably flushed.
pub(crate) async fn flush_rollout(&self) {
pub(crate) async fn flush_rollout(&self) -> std::io::Result<()> {
let recorder = {
let guard = self.services.rollout.lock().await;
guard.clone()
};
if let Some(rec) = recorder
&& let Err(e) = rec.flush().await
{
warn!("failed to flush rollout recorder: {e}");
if let Some(rec) = recorder {
rec.flush().await
} else {
Ok(())
}
}
pub(crate) async fn set_session_name(&self, name: Option<String>) -> std::io::Result<()> {
let recorder = {
let guard = self.services.rollout.lock().await;
guard.clone()
};
if let Some(rec) = recorder {
rec.set_session_name(name).await
} else {
Ok(())
}
}
pub(crate) async fn rollout_path(&self) -> CodexResult<PathBuf> {
let guard = self.services.rollout.lock().await;
let Some(rec) = guard.as_ref() else {
return Err(CodexErr::Fatal(
"Rollout recorder is not initialized; cannot save session.".to_string(),
));
};
Ok(rec.rollout_path.clone())
}
pub(crate) async fn model(&self) -> String {
let state = self.state.lock().await;
state.session_configuration.model.clone()
}
pub(crate) async fn save_session(
&self,
codex_home: &Path,
name: &str,
) -> CodexResult<crate::SavedSessionEntry> {
let trimmed = name.trim();
if trimmed.is_empty() {
return Err(CodexErr::Fatal("Usage: /save <name>".to_string()));
}
let rollout_path = self.rollout_path().await?;
self.flush_rollout()
.await
.map_err(|e| CodexErr::Fatal(format!("failed to flush rollout recorder: {e}")))?;
self.set_session_name(Some(trimmed.to_string()))
.await
.map_err(|e| CodexErr::Fatal(format!("failed to write session name: {e}")))?;
let entry =
build_saved_session_entry(trimmed.to_string(), rollout_path, self.model().await)
.await?;
upsert_saved_session(codex_home, entry.clone()).await?;
Ok(entry)
}
fn next_internal_sub_id(&self) -> String {
let id = self
.next_internal_sub_id
@@ -659,7 +716,9 @@ impl Session {
let items = self.build_initial_context(&turn_context);
self.record_conversation_items(&turn_context, &items).await;
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
self.flush_rollout().await;
if let Err(e) = self.flush_rollout().await {
warn!("failed to flush rollout recorder: {e}");
}
}
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
let rollout_items = conversation_history.get_rollout_items();
@@ -703,10 +762,18 @@ impl Session {
// If persisting, persist all rollout items as-is (recorder filters)
if persist && !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
// Drop legacy SessionMeta lines from the source rollout so the forked
// session only contains its own SessionMeta written by the recorder.
let filtered =
InitialHistory::Forked(rollout_items.clone()).without_session_meta();
if !filtered.is_empty() {
self.persist_rollout_items(&filtered).await;
}
}
// Flush after seeding history and any persisted rollout copy.
self.flush_rollout().await;
if let Err(e) = self.flush_rollout().await {
warn!("failed to flush rollout recorder: {e}");
}
}
}
}
@@ -1425,6 +1492,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
Op::Review { review_request } => {
handlers::review(&sess, &config, sub.id.clone(), review_request).await;
}
Op::SaveSession { name } => {
handlers::save_session(&sess, &config, sub.id.clone(), name).await;
}
_ => {} // Ignore unknown ops; enum is non_exhaustive to allow extensions.
}
}
@@ -1452,6 +1522,7 @@ mod handlers {
use codex_protocol::protocol::Op;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::SaveSessionResponseEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::user_input::UserInput;
@@ -1673,6 +1744,38 @@ mod handlers {
.await;
}
pub async fn save_session(
sess: &Arc<Session>,
config: &Arc<crate::config::Config>,
sub_id: String,
name: String,
) {
match sess.save_session(&config.codex_home, &name).await {
Ok(entry) => {
let event = Event {
id: sub_id,
msg: EventMsg::SaveSessionResponse(SaveSessionResponseEvent {
name: entry.name,
rollout_path: entry.rollout_path,
conversation_id: entry.conversation_id,
}),
};
sess.send_event_raw(event).await;
}
Err(err) => {
let message = format!("Failed to save session '{name}': {err}");
let event = Event {
id: sub_id,
msg: EventMsg::Error(ErrorEvent {
message,
http_status_code: None,
}),
};
sess.send_event_raw(event).await;
}
}
}
pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
info!("Shutting down Codex instance");

View File

@@ -1,22 +1,26 @@
use crate::codex::Codex;
use crate::codex::Session;
use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
use std::path::PathBuf;
use std::sync::Arc;
pub struct CodexConversation {
codex: Codex,
rollout_path: PathBuf,
session: Arc<Session>,
}
/// Conduit for the bidirectional stream of messages that compose a conversation
/// in Codex.
impl CodexConversation {
pub(crate) fn new(codex: Codex, rollout_path: PathBuf) -> Self {
pub(crate) fn new(codex: Codex, rollout_path: PathBuf, session: Arc<Session>) -> Self {
Self {
codex,
rollout_path,
session,
}
}
@@ -36,4 +40,24 @@ impl CodexConversation {
pub fn rollout_path(&self) -> PathBuf {
self.rollout_path.clone()
}
pub async fn flush_rollout(&self) -> CodexResult<()> {
Ok(self.session.flush_rollout().await?)
}
pub async fn set_session_name(&self, name: Option<String>) -> CodexResult<()> {
Ok(self.session.set_session_name(name).await?)
}
pub async fn model(&self) -> String {
self.session.model().await
}
pub async fn save_session(
&self,
codex_home: &std::path::Path,
name: &str,
) -> CodexResult<crate::SavedSessionEntry> {
self.session.save_session(codex_home, name).await
}
}

View File

@@ -3,6 +3,7 @@ use crate::CodexAuth;
use crate::codex::Codex;
use crate::codex::CodexSpawnOk;
use crate::codex::INITIAL_SUBMIT_ID;
use crate::codex::Session;
use crate::codex_conversation::CodexConversation;
use crate::config::Config;
use crate::error::CodexErr;
@@ -11,6 +12,7 @@ use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::SessionConfiguredEvent;
use crate::rollout::RolloutRecorder;
use crate::saved_sessions::resolve_rollout_path;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
@@ -18,6 +20,7 @@ use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
@@ -56,6 +59,7 @@ impl ConversationManager {
)
}
/// Start a brand new conversation with default initial history.
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
self.spawn_conversation(config, self.auth_manager.clone())
.await
@@ -69,6 +73,7 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
session,
} = Codex::spawn(
config,
auth_manager,
@@ -76,13 +81,14 @@ impl ConversationManager {
self.session_source.clone(),
)
.await?;
self.finalize_spawn(codex, conversation_id).await
self.finalize_spawn(codex, conversation_id, session).await
}
async fn finalize_spawn(
&self,
codex: Codex,
conversation_id: ConversationId,
session: Arc<Session>,
) -> CodexResult<NewConversation> {
// The first event must be `SessionInitialized`. Validate and forward it
// to the caller so that they can display it in the conversation
@@ -101,6 +107,7 @@ impl ConversationManager {
let conversation = Arc::new(CodexConversation::new(
codex,
session_configured.rollout_path.clone(),
session,
));
self.conversations
.write()
@@ -125,6 +132,7 @@ impl ConversationManager {
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
}
/// Resume a conversation from an on-disk rollout file.
pub async fn resume_conversation_from_rollout(
&self,
config: Config,
@@ -136,6 +144,23 @@ impl ConversationManager {
.await
}
/// Resume a conversation by saved-session name or rollout id string.
pub async fn resume_conversation_from_identifier(
&self,
config: Config,
identifier: &str,
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let Some(path) = resolve_rollout_path(&config.codex_home, identifier).await? else {
return Err(CodexErr::Fatal(format!(
"No saved session or rollout found for '{identifier}'"
)));
};
self.resume_conversation_from_rollout(config, path, auth_manager)
.await
}
/// Resume a conversation from provided rollout history items.
pub async fn resume_conversation_with_history(
&self,
config: Config,
@@ -145,6 +170,7 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
session,
} = Codex::spawn(
config,
auth_manager,
@@ -152,7 +178,54 @@ impl ConversationManager {
self.session_source.clone(),
)
.await?;
self.finalize_spawn(codex, conversation_id).await
self.finalize_spawn(codex, conversation_id, session).await
}
/// Fork a new conversation from the given rollout path.
pub async fn fork_from_rollout(
&self,
config: Config,
path: PathBuf,
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let initial_history = RolloutRecorder::get_rollout_history(&path).await?;
let forked = match initial_history {
InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history),
InitialHistory::Forked(items) => InitialHistory::Forked(items),
InitialHistory::New => InitialHistory::New,
};
self.resume_conversation_with_history(config, forked, auth_manager)
.await
}
/// Fork a new conversation from a saved-session name or rollout id string.
pub async fn fork_from_identifier(
&self,
config: Config,
identifier: &str,
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let Some(path) = resolve_rollout_path(&config.codex_home, identifier).await? else {
return Err(CodexErr::Fatal(format!(
"No saved session or rollout found for '{identifier}'"
)));
};
self.fork_from_rollout(config, path, auth_manager).await
}
/// Persist a human-friendly session name and record it in saved_sessions.json.
pub async fn save_session(
&self,
conversation_id: ConversationId,
codex_home: &Path,
name: &str,
) -> CodexResult<crate::SavedSessionEntry> {
let trimmed = name.trim();
if trimmed.is_empty() {
return Err(CodexErr::Fatal("Usage: /save <name>".to_string()));
}
let conversation = self.get_conversation(conversation_id).await?;
conversation.save_session(codex_home, trimmed).await
}
/// Removes the conversation from the manager's internal map, though the
@@ -185,9 +258,10 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
session,
} = Codex::spawn(config, auth_manager, history, self.session_source.clone()).await?;
self.finalize_spawn(codex, conversation_id).await
self.finalize_spawn(codex, conversation_id, session).await
}
}

View File

@@ -66,6 +66,7 @@ mod openai_model_info;
pub mod project_doc;
mod rollout;
pub(crate) mod safety;
pub mod saved_sessions;
pub mod seatbelt;
pub mod shell;
pub mod spawn;
@@ -83,6 +84,12 @@ pub use rollout::list::ConversationsPage;
pub use rollout::list::Cursor;
pub use rollout::list::parse_cursor;
pub use rollout::list::read_head_for_summary;
pub use saved_sessions::SavedSessionEntry;
pub use saved_sessions::build_saved_session_entry;
pub use saved_sessions::list_saved_sessions;
pub use saved_sessions::resolve_rollout_path;
pub use saved_sessions::resolve_saved_session;
pub use saved_sessions::upsert_saved_session;
mod function_tool;
mod state;
mod tasks;

View File

@@ -66,6 +66,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::SaveSessionResponse(_)
| EventMsg::StreamError(_)
| EventMsg::PatchApplyBegin(_)
| EventMsg::PatchApplyEnd(_)

View File

@@ -11,6 +11,8 @@ use serde_json::Value;
use time::OffsetDateTime;
use time::format_description::FormatItem;
use time::macros::format_description;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::{self};
@@ -70,6 +72,10 @@ enum RolloutCmd {
Shutdown {
ack: oneshot::Sender<()>,
},
SetName {
name: Option<String>,
ack: oneshot::Sender<std::io::Result<()>>,
},
}
impl RolloutRecorderParams {
@@ -148,11 +154,14 @@ impl RolloutRecorder {
instructions,
source,
model_provider: Some(config.model_provider_id.clone()),
name: None,
}),
)
}
RolloutRecorderParams::Resume { path } => (
tokio::fs::OpenOptions::new()
.read(true)
.write(true)
.append(true)
.open(&path)
.await?,
@@ -196,6 +205,21 @@ impl RolloutRecorder {
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
/// Update the session name stored in the rollout's SessionMeta line.
pub async fn set_session_name(&self, name: Option<String>) -> std::io::Result<()> {
let (tx, rx) = oneshot::channel();
self.tx
.send(RolloutCmd::SetName { name, ack: tx })
.await
.map_err(|e| IoError::other(format!("failed to queue session name update: {e}")))?;
match rx.await {
Ok(result) => result,
Err(e) => Err(IoError::other(format!(
"failed waiting for session name update: {e}"
))),
}
}
/// Flush all queued writes and wait until they are committed by the writer task.
pub async fn flush(&self) -> std::io::Result<()> {
let (tx, rx) = oneshot::channel();
@@ -334,6 +358,7 @@ fn create_log_file(
let path = dir.join(filename);
let file = std::fs::OpenOptions::new()
.read(true)
.append(true)
.create(true)
.open(&path)?;
@@ -389,6 +414,10 @@ async fn rollout_writer(
RolloutCmd::Shutdown { ack } => {
let _ = ack.send(());
}
RolloutCmd::SetName { name, ack } => {
let result = rewrite_session_meta_name(&mut writer.file, name).await;
let _ = ack.send(result);
}
}
}
@@ -422,3 +451,232 @@ impl JsonlWriter {
Ok(())
}
}
async fn rewrite_session_meta_name(
file: &mut tokio::fs::File,
name: Option<String>,
) -> std::io::Result<()> {
use std::io::SeekFrom;
file.flush().await?;
file.seek(SeekFrom::Start(0)).await?;
let mut contents = Vec::new();
file.read_to_end(&mut contents).await?;
if contents.is_empty() {
return Err(IoError::other("empty rollout file"));
}
let newline_idx = contents
.iter()
.position(|&b| b == b'\n')
.ok_or_else(|| IoError::other("rollout missing newline after SessionMeta"))?;
let first_line = &contents[..newline_idx];
let mut rollout_line: RolloutLine = serde_json::from_slice(first_line)
.map_err(|e| IoError::other(format!("failed to parse SessionMeta: {e}")))?;
let RolloutItem::SessionMeta(ref mut session_meta_line) = rollout_line.item else {
return Err(IoError::other("first rollout item is not SessionMeta"));
};
session_meta_line.meta.name = name;
let mut updated = serde_json::to_vec(&rollout_line)?;
updated.push(b'\n');
updated.extend_from_slice(&contents[newline_idx + 1..]);
file.set_len(0).await?;
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&updated).await?;
file.flush().await?;
file.seek(SeekFrom::End(0)).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::rewrite_session_meta_name;
use codex_protocol::ConversationId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMeta;
use codex_protocol::protocol::SessionMetaLine;
use tempfile::NamedTempFile;
use tokio::fs::OpenOptions;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncSeekExt;
use tokio::io::AsyncWriteExt;
fn sample_meta(name: Option<&str>) -> RolloutItem {
RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: ConversationId::from_string("00000000-0000-4000-8000-000000000001")
.expect("conversation id"),
timestamp: "2025-01-01T00:00:00.000Z".to_string(),
cwd: "/tmp".into(),
originator: "tester".to_string(),
cli_version: "1.0.0".to_string(),
instructions: None,
source: codex_protocol::protocol::SessionSource::Cli,
model_provider: Some("provider".to_string()),
name: name.map(str::to_string),
},
git: None,
})
}
fn sample_line() -> RolloutLine {
RolloutLine {
timestamp: "2025-01-01T00:00:00.000Z".to_string(),
item: sample_meta(None),
}
}
async fn write_rollout(lines: &[RolloutLine]) -> (NamedTempFile, tokio::fs::File) {
let temp = NamedTempFile::new().expect("temp file");
let mut file = OpenOptions::new()
.read(true)
.write(true)
.truncate(true)
.create(true)
.open(temp.path())
.await
.expect("open temp file");
for line in lines {
let mut json = serde_json::to_vec(line).expect("serialize line");
json.push(b'\n');
file.write_all(&json).await.expect("write line");
}
file.seek(std::io::SeekFrom::Start(0))
.await
.expect("rewind");
(temp, file)
}
async fn read_first_line(path: &std::path::Path) -> RolloutLine {
let mut contents = String::new();
let mut file = OpenOptions::new()
.read(true)
.open(path)
.await
.expect("open for read");
file.read_to_string(&mut contents).await.expect("read file");
let first = contents.lines().next().expect("first line");
serde_json::from_str(first).expect("parse first line")
}
#[tokio::test]
async fn updates_meta_name_and_preserves_rest() {
let events = vec![
sample_line(),
RolloutLine {
timestamp: "2025-01-01T00:00:01.000Z".to_string(),
item: RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "hello".to_string(),
}],
}),
},
];
let (temp, mut file) = write_rollout(&events).await;
rewrite_session_meta_name(&mut file, Some("renamed".to_string()))
.await
.expect("rewrite ok");
let first = read_first_line(temp.path()).await;
let RolloutItem::SessionMeta(meta_line) = first.item else {
panic!("expected SessionMeta line");
};
assert_eq!(meta_line.meta.name.as_deref(), Some("renamed"));
let contents = tokio::fs::read_to_string(temp.path())
.await
.expect("read file");
let lines: Vec<_> = contents.lines().collect();
assert_eq!(lines.len(), 2);
let parsed: RolloutLine = serde_json::from_str(lines[1]).expect("parse second line");
let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = parsed.item
else {
panic!("expected response item");
};
assert_eq!(role, "assistant");
assert_eq!(
content,
vec![ContentItem::OutputText {
text: "hello".to_string()
}]
);
}
#[tokio::test]
async fn clearing_name_sets_none() {
let mut first = sample_line();
first.item = sample_meta(Some("existing"));
let (temp, mut file) = write_rollout(&[first]).await;
rewrite_session_meta_name(&mut file, None)
.await
.expect("rewrite ok");
let first = read_first_line(temp.path()).await;
let RolloutItem::SessionMeta(meta_line) = first.item else {
panic!("expected SessionMeta line");
};
assert_eq!(meta_line.meta.name, None);
}
#[tokio::test]
async fn errors_on_empty_file() {
let temp = NamedTempFile::new().expect("temp file");
let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(temp.path())
.await
.expect("open temp file");
let err = rewrite_session_meta_name(&mut file, Some("x".to_string()))
.await
.expect_err("expected error");
assert!(format!("{err}").contains("empty rollout file"));
}
#[tokio::test]
async fn errors_when_first_line_not_session_meta() {
let wrong = RolloutLine {
timestamp: "t".to_string(),
item: RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "assistant".to_string(),
content: vec![ContentItem::OutputText {
text: "hello".to_string(),
}],
}),
};
let (_temp, mut file) = write_rollout(&[wrong]).await;
let err = rewrite_session_meta_name(&mut file, Some("x".to_string()))
.await
.expect_err("expected error");
assert!(format!("{err}").contains("first rollout item is not SessionMeta"));
// ensure file pointer is rewound to end after failure paths
let pos = file
.seek(std::io::SeekFrom::Current(0))
.await
.expect("seek");
assert!(pos > 0);
}
#[tokio::test]
async fn errors_when_missing_newline() {
let temp = NamedTempFile::new().expect("temp file");
let mut file = OpenOptions::new()
.read(true)
.write(true)
.open(temp.path())
.await
.expect("open temp file");
file.write_all(b"no newline").await.expect("write");
let err = rewrite_session_meta_name(&mut file, Some("x".to_string()))
.await
.expect_err("expected error");
assert!(format!("{err}").contains("rollout missing newline after SessionMeta"));
}
}

View File

@@ -594,6 +594,7 @@ async fn test_tail_includes_last_response_items() -> Result<()> {
cli_version: "test_version".into(),
source: SessionSource::VSCode,
model_provider: Some("test-provider".into()),
name: None,
},
git: None,
}),
@@ -687,6 +688,7 @@ async fn test_tail_handles_short_sessions() -> Result<()> {
cli_version: "test_version".into(),
source: SessionSource::VSCode,
model_provider: Some("test-provider".into()),
name: None,
},
git: None,
}),
@@ -781,6 +783,7 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> {
cli_version: "test_version".into(),
source: SessionSource::VSCode,
model_provider: Some("test-provider".into()),
name: None,
},
git: None,
}),

View File

@@ -0,0 +1,144 @@
use crate::error::Result;
use crate::find_conversation_path_by_id_str;
use crate::rollout::list::read_head_for_summary;
use codex_protocol::ConversationId;
use codex_protocol::protocol::SessionMetaLine;
use serde::Deserialize;
use serde::Serialize;
use std::collections::BTreeMap;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::path::Path;
use std::path::PathBuf;
use time::OffsetDateTime;
use time::format_description::well_known::Rfc3339;
use tracing::warn;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct SavedSessionEntry {
pub name: String,
pub conversation_id: ConversationId,
pub rollout_path: PathBuf,
pub cwd: PathBuf,
pub model: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub model_provider: Option<String>,
pub saved_at: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub created_at: Option<String>,
}
#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct SavedSessionsFile {
#[serde(default)]
entries: BTreeMap<String, SavedSessionEntry>,
}
fn saved_sessions_path(codex_home: &Path) -> PathBuf {
codex_home.join("saved_sessions.json")
}
async fn load_saved_sessions_file(path: &Path) -> Result<SavedSessionsFile> {
match tokio::fs::read_to_string(path).await {
Ok(text) => {
let parsed = serde_json::from_str(&text)
.map_err(|e| IoError::other(format!("failed to parse saved sessions: {e}")))?;
Ok(parsed)
}
Err(err) if err.kind() == ErrorKind::NotFound => Ok(SavedSessionsFile::default()),
Err(err) => Err(err.into()),
}
}
async fn write_saved_sessions_file(path: &Path, file: &SavedSessionsFile) -> Result<()> {
if let Some(parent) = path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let json = serde_json::to_string_pretty(file)
.map_err(|e| IoError::other(format!("failed to serialize saved sessions: {e}")))?;
let tmp_path = path.with_extension("json.tmp");
tokio::fs::write(&tmp_path, json).await?;
tokio::fs::rename(tmp_path, path).await?;
Ok(())
}
/// Create a new entry from the rollout's SessionMeta line.
pub async fn build_saved_session_entry(
name: String,
rollout_path: PathBuf,
model: String,
) -> Result<SavedSessionEntry> {
let head = read_head_for_summary(&rollout_path).await?;
let first = head.first().ok_or_else(|| {
IoError::other(format!(
"rollout at {} has no SessionMeta",
rollout_path.display()
))
})?;
let SessionMetaLine { mut meta, .. } = serde_json::from_value::<SessionMetaLine>(first.clone())
.map_err(|e| IoError::other(format!("failed to parse SessionMeta: {e}")))?;
meta.name = Some(name.clone());
let saved_at = OffsetDateTime::now_utc()
.format(&Rfc3339)
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
let created_at = if meta.timestamp.is_empty() {
None
} else {
Some(meta.timestamp.clone())
};
Ok(SavedSessionEntry {
name,
conversation_id: meta.id,
rollout_path,
cwd: meta.cwd,
model,
model_provider: meta.model_provider,
saved_at,
created_at,
})
}
/// Insert or replace a saved session entry in `saved_sessions.json`.
pub async fn upsert_saved_session(codex_home: &Path, entry: SavedSessionEntry) -> Result<()> {
let path = saved_sessions_path(codex_home);
let mut file = load_saved_sessions_file(&path).await?;
file.entries.insert(entry.name.clone(), entry);
write_saved_sessions_file(&path, &file).await
}
/// Lookup a saved session by name, if present.
pub async fn resolve_saved_session(
codex_home: &Path,
name: &str,
) -> Result<Option<SavedSessionEntry>> {
let path = saved_sessions_path(codex_home);
let file = load_saved_sessions_file(&path).await?;
Ok(file.entries.get(name).cloned())
}
/// Return all saved sessions ordered by newest `saved_at` first.
pub async fn list_saved_sessions(codex_home: &Path) -> Result<Vec<SavedSessionEntry>> {
let path = saved_sessions_path(codex_home);
let file = load_saved_sessions_file(&path).await?;
let mut entries: Vec<SavedSessionEntry> = file.entries.values().cloned().collect();
entries.sort_by(|a, b| b.saved_at.cmp(&a.saved_at));
Ok(entries)
}
/// Resolve a rollout path from either a saved-session name or rollout id string.
/// Returns `Ok(None)` when nothing matches.
pub async fn resolve_rollout_path(codex_home: &Path, identifier: &str) -> Result<Option<PathBuf>> {
if let Some(entry) = resolve_saved_session(codex_home, identifier).await? {
if entry.rollout_path.exists() {
return Ok(Some(entry.rollout_path));
}
warn!(
"saved session '{}' points to missing rollout at {}",
identifier,
entry.rollout_path.display()
);
}
Ok(find_conversation_path_by_id_str(codex_home, identifier).await?)
}

View File

@@ -128,7 +128,9 @@ impl Session {
task_cancellation_token.child_token(),
)
.await;
session_ctx.clone_session().flush_rollout().await;
if let Err(e) = session_ctx.clone_session().flush_rollout().await {
tracing::warn!("failed to flush rollout recorder: {e}");
}
if !task_cancellation_token.is_cancelled() {
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
let sess = session_ctx.clone_session();

View File

@@ -45,6 +45,7 @@ mod resume;
mod review;
mod rmcp_client;
mod rollout_list_find;
mod saved_sessions;
mod seatbelt;
mod shell_serialization;
mod stream_error_allows_next_turn;

View File

@@ -0,0 +1,457 @@
#![allow(clippy::expect_used)]
use anyhow::Result;
use codex_core::AuthManager;
use codex_core::CodexAuth;
use codex_core::CodexConversation;
use codex_core::ConversationManager;
use codex_core::SavedSessionEntry;
use codex_core::build_saved_session_entry;
use codex_core::config::Config;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::RolloutItem;
use codex_core::protocol::RolloutLine;
use codex_core::protocol::SaveSessionResponseEvent;
use codex_core::protocol::SessionSource;
use codex_core::resolve_saved_session;
use codex_core::upsert_saved_session;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_sequence;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::Path;
use std::sync::Arc;
fn completion_body(idx: usize, message: &str) -> String {
let resp_id = format!("resp-{idx}");
let msg_id = format!("msg-{idx}");
sse(vec![
ev_response_created(&resp_id),
ev_assistant_message(&msg_id, message),
ev_completed(&resp_id),
])
}
fn rollout_lines(path: &Path) -> Vec<RolloutLine> {
let text = std::fs::read_to_string(path).expect("read rollout");
text.lines()
.filter_map(|line| {
if line.trim().is_empty() {
return None;
}
let value: serde_json::Value = serde_json::from_str(line).expect("rollout line json");
Some(serde_json::from_value::<RolloutLine>(value).expect("rollout line"))
})
.collect()
}
fn rollout_items_without_meta(path: &Path) -> Vec<RolloutItem> {
rollout_lines(path)
.into_iter()
.filter_map(|line| match line.item {
RolloutItem::SessionMeta(_) => None,
other => Some(other),
})
.collect()
}
fn session_meta_count(path: &Path) -> usize {
rollout_lines(path)
.iter()
.filter(|line| matches!(line.item, RolloutItem::SessionMeta(_)))
.count()
}
async fn submit_text(codex: &Arc<CodexConversation>, text: &str) -> Result<()> {
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: text.to_string(),
}],
})
.await?;
let _ = wait_for_event(codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
Ok(())
}
async fn save_session(
name: &str,
codex: &Arc<CodexConversation>,
config: &Config,
) -> Result<SavedSessionEntry> {
codex.flush_rollout().await?;
codex.set_session_name(Some(name.to_string())).await?;
let entry =
build_saved_session_entry(name.to_string(), codex.rollout_path(), codex.model().await)
.await?;
upsert_saved_session(&config.codex_home, entry.clone()).await?;
Ok(entry)
}
async fn save_session_via_op(
codex: &Arc<CodexConversation>,
name: &str,
) -> Result<SaveSessionResponseEvent> {
codex
.submit(Op::SaveSession {
name: name.to_string(),
})
.await?;
let response: SaveSessionResponseEvent = wait_for_event_match(codex, |ev| match ev {
EventMsg::SaveSessionResponse(resp) => Some(resp.clone()),
_ => None,
})
.await;
Ok(response)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn save_and_resume_by_name() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(&server, vec![completion_body(1, "initial")]).await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "first turn").await?;
let name = "alpha";
let entry = save_session(name, &initial.codex, &initial.config).await?;
let resolved = resolve_saved_session(&initial.config.codex_home, name)
.await?
.expect("saved session");
assert_eq!(entry, resolved);
assert_eq!(session_meta_count(&entry.rollout_path), 1);
let saved_items = rollout_items_without_meta(&entry.rollout_path);
let resumed = builder
.resume(&server, initial.home.clone(), entry.rollout_path.clone())
.await?;
assert_eq!(resumed.session_configured.session_id, entry.conversation_id);
let resumed_items = rollout_items_without_meta(&resumed.session_configured.rollout_path);
assert_eq!(
serde_json::to_value(saved_items)?,
serde_json::to_value(resumed_items)?
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn save_session_op_persists_and_emits_response() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(&server, vec![completion_body(1, "initial")]).await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "first turn").await?;
let name = "via-op";
let response = save_session_via_op(&initial.codex, name).await?;
assert_eq!(response.name, name);
assert_eq!(
response.conversation_id,
initial.session_configured.session_id
);
assert!(response.rollout_path.exists());
let resolved = resolve_saved_session(&initial.config.codex_home, name)
.await?
.expect("saved session");
assert_eq!(resolved.rollout_path, response.rollout_path);
assert_eq!(resolved.conversation_id, response.conversation_id);
assert_eq!(session_meta_count(&resolved.rollout_path), 1);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn fork_from_identifier_after_save_op() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(
&server,
vec![
completion_body(1, "seed"),
completion_body(2, "fork-extra-1"),
completion_body(3, "fork-extra-2"),
],
)
.await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "seeded").await?;
let name = "forkable-op";
let response = save_session_via_op(&initial.codex, name).await?;
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let forked = conversation_manager
.fork_from_identifier(initial.config.clone(), name, auth_manager)
.await?;
assert_ne!(
forked.session_configured.session_id,
response.conversation_id
);
// Record the baseline rollout for the saved session.
let base_items = rollout_items_without_meta(&response.rollout_path);
// Send additional turns to the forked conversation and flush.
submit_text(&forked.conversation, "fork one").await?;
submit_text(&forked.conversation, "fork two").await?;
forked.conversation.flush_rollout().await?;
// Re-read both rollouts: source should remain unchanged.
let base_after = rollout_items_without_meta(&response.rollout_path);
assert_eq!(
serde_json::to_value(&base_items)?,
serde_json::to_value(&base_after)?
);
// Forked rollout should extend the baseline.
let fork_items = rollout_items_without_meta(&forked.conversation.rollout_path());
assert!(
fork_items.len() > base_items.len(),
"expected forked rollout to contain additional items"
);
let fork_prefix: Vec<_> = fork_items.iter().take(base_items.len()).cloned().collect();
assert_eq!(
serde_json::to_value(&base_items)?,
serde_json::to_value(&fork_prefix)?,
"forked rollout should extend the baseline history"
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn save_and_fork_by_name() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(&server, vec![completion_body(1, "base")]).await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "original").await?;
let entry = save_session("forkable", &initial.codex, &initial.config).await?;
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let forked = conversation_manager
.fork_from_rollout(
initial.config.clone(),
entry.rollout_path.clone(),
auth_manager,
)
.await?;
assert_ne!(forked.session_configured.session_id, entry.conversation_id);
assert_ne!(forked.conversation.rollout_path(), entry.rollout_path);
assert_eq!(session_meta_count(&forked.conversation.rollout_path()), 1);
let base_items = rollout_items_without_meta(&entry.rollout_path);
let fork_items = rollout_items_without_meta(&forked.conversation.rollout_path());
assert_eq!(
serde_json::to_value(base_items)?,
serde_json::to_value(fork_items)?
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn forked_messages_do_not_touch_original() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(
&server,
vec![
completion_body(1, "base"),
completion_body(2, "fork-1"),
completion_body(3, "fork-2"),
],
)
.await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "first").await?;
let entry = save_session("branch", &initial.codex, &initial.config).await?;
let baseline_items = rollout_items_without_meta(&entry.rollout_path);
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let forked = conversation_manager
.fork_from_rollout(
initial.config.clone(),
entry.rollout_path.clone(),
auth_manager.clone(),
)
.await?;
submit_text(&forked.conversation, "fork message one").await?;
submit_text(&forked.conversation, "fork message two").await?;
let resumed = builder
.resume(&server, initial.home.clone(), entry.rollout_path.clone())
.await?;
let resumed_items = rollout_items_without_meta(&resumed.session_configured.rollout_path);
assert_eq!(
serde_json::to_value(baseline_items.clone())?,
serde_json::to_value(resumed_items)?
);
assert_eq!(
serde_json::to_value(baseline_items)?,
serde_json::to_value(rollout_items_without_meta(&entry.rollout_path))?
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resumed_messages_are_present_in_new_fork() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(
&server,
vec![
completion_body(1, "original"),
completion_body(2, "fork-extra"),
completion_body(3, "resumed-extra"),
],
)
.await;
let mut builder = test_codex();
let initial = builder.build(&server).await?;
submit_text(&initial.codex, "start").await?;
let entry = save_session("seed", &initial.codex, &initial.config).await?;
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let forked = conversation_manager
.fork_from_rollout(
initial.config.clone(),
entry.rollout_path.clone(),
auth_manager.clone(),
)
.await?;
submit_text(&forked.conversation, "fork only").await?;
let resumed = builder
.resume(&server, initial.home.clone(), entry.rollout_path.clone())
.await?;
submit_text(&resumed.codex, "resumed addition").await?;
resumed.codex.flush_rollout().await?;
let updated_base_items = rollout_items_without_meta(&entry.rollout_path);
let fork_again = conversation_manager
.fork_from_rollout(
initial.config.clone(),
entry.rollout_path.clone(),
auth_manager,
)
.await?;
let fork_again_items = rollout_items_without_meta(&fork_again.conversation.rollout_path());
assert_eq!(
serde_json::to_value(updated_base_items)?,
serde_json::to_value(fork_again_items)?
);
assert_eq!(
session_meta_count(&fork_again.conversation.rollout_path()),
1
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn duplicate_name_overwrites_entry() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(
&server,
vec![completion_body(1, "one"), completion_body(2, "two")],
)
.await;
let mut builder = test_codex();
let first = builder.build(&server).await?;
submit_text(&first.codex, "first session").await?;
let name = "shared";
let entry_one = save_session(name, &first.codex, &first.config).await?;
let second = builder.build(&server).await?;
submit_text(&second.codex, "second session").await?;
let entry_two = save_session(name, &second.codex, &second.config).await?;
let resolved = resolve_saved_session(&second.config.codex_home, name)
.await?
.expect("latest entry present");
assert_eq!(resolved, entry_two);
assert_ne!(resolved.conversation_id, entry_one.conversation_id);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn same_session_multiple_names() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_sequence(&server, vec![completion_body(1, "hello")]).await;
let mut builder = test_codex();
let session = builder.build(&server).await?;
submit_text(&session.codex, "save twice").await?;
let entry_first = save_session("first", &session.codex, &session.config).await?;
let entry_second = save_session("second", &session.codex, &session.config).await?;
let resolved_first = resolve_saved_session(&session.config.codex_home, "first")
.await?
.expect("first entry");
let resolved_second = resolve_saved_session(&session.config.codex_home, "second")
.await?
.expect("second entry");
assert_eq!(entry_first.conversation_id, entry_second.conversation_id);
assert_eq!(
resolved_first.conversation_id,
resolved_second.conversation_id
);
assert_eq!(resolved_first.rollout_path, resolved_second.rollout_path);
let names: serde_json::Value = json!([entry_first.name, entry_second.name]);
assert_eq!(names, json!(["first", "second"]));
Ok(())
}

View File

@@ -567,6 +567,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
| EventMsg::ReasoningContentDelta(_)
| EventMsg::ReasoningRawContentDelta(_)
| EventMsg::UndoCompleted(_)
| EventMsg::SaveSessionResponse(_)
| EventMsg::UndoStarted(_) => {}
}
CodexStatus::Running

View File

@@ -302,6 +302,7 @@ async fn run_codex_tool_session_inner(
| EventMsg::UndoStarted(_)
| EventMsg::UndoCompleted(_)
| EventMsg::ExitedReviewMode(_)
| EventMsg::SaveSessionResponse(_)
| EventMsg::DeprecationNotice(_) => {
// For now, we do not do anything extra for these
// events. Note that

View File

@@ -102,6 +102,9 @@ pub enum Op {
final_output_json_schema: Option<Value>,
},
/// Persist the current session under a user-provided name.
SaveSession { name: String },
/// Override parts of the persistent turn context for subsequent turns.
///
/// All fields are optional; when omitted, the existing value is preserved.
@@ -513,6 +516,9 @@ pub enum EventMsg {
BackgroundEvent(BackgroundEventEvent),
/// Result of a save-session request.
SaveSessionResponse(SaveSessionResponseEvent),
UndoStarted(UndoStartedEvent),
UndoCompleted(UndoCompletedEvent),
@@ -1036,6 +1042,13 @@ impl InitialHistory {
}
}
pub fn without_session_meta(&self) -> Vec<RolloutItem> {
self.get_rollout_items()
.into_iter()
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
.collect()
}
pub fn get_event_msgs(&self) -> Option<Vec<EventMsg>> {
match self {
InitialHistory::New => None,
@@ -1096,6 +1109,8 @@ pub struct SessionMeta {
#[serde(default)]
pub source: SessionSource,
pub model_provider: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
}
impl Default for SessionMeta {
@@ -1109,6 +1124,7 @@ impl Default for SessionMeta {
instructions: None,
source: SessionSource::default(),
model_provider: None,
name: None,
}
}
}
@@ -1370,6 +1386,13 @@ pub struct StreamInfoEvent {
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct SaveSessionResponseEvent {
pub name: String,
pub rollout_path: PathBuf,
pub conversation_id: ConversationId,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct PatchApplyBeginEvent {
/// Identifier so this can be paired with the PatchApplyEnd event.

View File

@@ -7,6 +7,8 @@ use crate::diff_render::DiffSummary;
use crate::exec_command::strip_bash_lc_and_escape;
use crate::file_search::FileSearchManager;
use crate::history_cell::HistoryCell;
#[cfg(not(debug_assertions))]
use crate::history_cell::UpdateAvailableHistoryCell;
use crate::model_migration::ModelMigrationOutcome;
use crate::model_migration::migration_copy_for_config;
use crate::model_migration::run_model_migration_prompt;
@@ -53,9 +55,6 @@ use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::unbounded_channel;
#[cfg(not(debug_assertions))]
use crate::history_cell::UpdateAvailableHistoryCell;
const GPT_5_1_MIGRATION_AUTH_MODES: [AuthMode; 2] = [AuthMode::ChatGPT, AuthMode::ApiKey];
const GPT_5_1_CODEX_MIGRATION_AUTH_MODES: [AuthMode; 1] = [AuthMode::ChatGPT];
@@ -296,6 +295,27 @@ impl App {
resumed.session_configured,
)
}
ResumeSelection::Fork(path) => {
let resumed = conversation_manager
.fork_from_rollout(config.clone(), path.clone(), auth_manager.clone())
.await
.wrap_err_with(|| format!("Failed to fork session from {}", path.display()))?;
let init = crate::chatwidget::ChatWidgetInit {
config: config.clone(),
frame_requester: tui.frame_requester(),
app_event_tx: app_event_tx.clone(),
initial_prompt: initial_prompt.clone(),
initial_images: initial_images.clone(),
enhanced_keys_supported,
auth_manager: auth_manager.clone(),
feedback: feedback.clone(),
};
ChatWidget::new_from_existing(
init,
resumed.conversation,
resumed.session_configured,
)
}
};
chat_widget.maybe_prompt_windows_sandbox_enable();

View File

@@ -69,7 +69,10 @@ const LARGE_PASTE_CHAR_THRESHOLD: usize = 1000;
#[derive(Debug, PartialEq)]
pub enum InputResult {
Submitted(String),
Command(SlashCommand),
Command {
command: SlashCommand,
args: Option<String>,
},
None,
}
@@ -333,6 +336,19 @@ impl ChatComposer {
PasteBurst::recommended_flush_delay()
}
fn command_args_from_line(line: &str, command: SlashCommand) -> Option<String> {
if let Some((name, rest)) = parse_slash_name(line)
&& name == command.command()
{
let trimmed = rest.trim();
if trimmed.is_empty() {
return None;
}
return Some(trimmed.to_string());
}
None
}
/// Integrate results from an asynchronous file search.
pub(crate) fn on_file_search_result(&mut self, query: String, matches: Vec<FileMatch>) {
// Only apply if user is still editing a token starting with `query`.
@@ -505,8 +521,9 @@ impl ChatComposer {
if let Some(sel) = popup.selected_item() {
match sel {
CommandItem::Builtin(cmd) => {
let args = Self::command_args_from_line(first_line, cmd);
self.textarea.set_text("");
return (InputResult::Command(cmd), true);
return (InputResult::Command { command: cmd, args }, true);
}
CommandItem::UserPrompt(idx) => {
if let Some(prompt) = popup.prompt(idx) {
@@ -920,22 +937,21 @@ impl ChatComposer {
modifiers: KeyModifiers::NONE,
..
} => {
// If the first line is a bare built-in slash command (no args),
// dispatch it even when the slash popup isn't visible. This preserves
// the workflow: type a prefix ("/di"), press Tab to complete to
// "/diff ", then press Enter to run it. Tab moves the cursor beyond
// the '/name' token and our caret-based heuristic hides the popup,
// but Enter should still dispatch the command rather than submit
// literal text.
// If the first line is a built-in slash command, dispatch it even when
// the slash popup isn't visible. This preserves the workflow:
// type a prefix ("/di"), press Tab to complete to "/diff ", then press
// Enter to run it. Tab moves the cursor beyond the '/name' token and
// our caret-based heuristic hides the popup, but Enter should still
// dispatch the command rather than submit literal text.
let first_line = self.textarea.text().lines().next().unwrap_or("");
if let Some((name, rest)) = parse_slash_name(first_line)
&& rest.is_empty()
if let Some((name, _rest)) = parse_slash_name(first_line)
&& let Some((_n, cmd)) = built_in_slash_commands()
.into_iter()
.find(|(n, _)| *n == name)
{
let args = Self::command_args_from_line(first_line, cmd);
self.textarea.set_text("");
return (InputResult::Command(cmd), true);
return (InputResult::Command { command: cmd, args }, true);
}
// If we're in a paste-like burst capture, treat Enter as part of the burst
// and accumulate it rather than submitting or inserting immediately.
@@ -2399,8 +2415,9 @@ mod tests {
// When a slash command is dispatched, the composer should return a
// Command result (not submit literal text) and clear its textarea.
match result {
InputResult::Command(cmd) => {
InputResult::Command { command: cmd, args } => {
assert_eq!(cmd.command(), "init");
assert!(args.is_none());
}
InputResult::Submitted(text) => {
panic!("expected command dispatch, but composer submitted literal text: {text}")
@@ -2474,7 +2491,10 @@ mod tests {
let (result, _needs_redraw) =
composer.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
match result {
InputResult::Command(cmd) => assert_eq!(cmd.command(), "diff"),
InputResult::Command { command: cmd, args } => {
assert_eq!(cmd.command(), "diff");
assert!(args.is_none());
}
InputResult::Submitted(text) => {
panic!("expected command dispatch after Tab completion, got literal submit: {text}")
}
@@ -2483,6 +2503,42 @@ mod tests {
assert!(composer.textarea.is_empty());
}
#[test]
fn slash_command_with_args_dispatches_and_preserves_args() {
use crossterm::event::KeyCode;
use crossterm::event::KeyEvent;
use crossterm::event::KeyModifiers;
let (tx, _rx) = unbounded_channel::<AppEvent>();
let sender = AppEventSender::new(tx);
let mut composer = ChatComposer::new(
true,
sender,
false,
"Ask Codex to do anything".to_string(),
false,
);
composer.textarea.set_text("/save feature-one");
let (result, _needs_redraw) =
composer.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
match result {
InputResult::Command { command: cmd, args } => {
assert_eq!(cmd, SlashCommand::Save);
assert_eq!(args.as_deref(), Some("feature-one"));
}
InputResult::Submitted(text) => {
panic!(
"expected slash command dispatch, but composer submitted literal text: {text}"
)
}
InputResult::None => panic!("expected Command result for '/save feature-one'"),
}
assert!(composer.textarea.is_empty());
}
#[test]
fn slash_mention_dispatches_command_and_inserts_at() {
use crossterm::event::KeyCode;
@@ -2505,8 +2561,9 @@ mod tests {
composer.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
match result {
InputResult::Command(cmd) => {
InputResult::Command { command: cmd, args } => {
assert_eq!(cmd.command(), "mention");
assert!(args.is_none());
}
InputResult::Submitted(text) => {
panic!("expected command dispatch, but composer submitted literal text: {text}")

View File

@@ -1326,8 +1326,8 @@ impl ChatWidget {
};
self.queue_user_message(user_message);
}
InputResult::Command(cmd) => {
self.dispatch_command(cmd);
InputResult::Command { command: cmd, args } => {
self.dispatch_command(cmd, args);
}
InputResult::None => {}
}
@@ -1350,7 +1350,7 @@ impl ChatWidget {
self.request_redraw();
}
fn dispatch_command(&mut self, cmd: SlashCommand) {
fn dispatch_command(&mut self, cmd: SlashCommand, args: Option<String>) {
if !cmd.available_during_task() && self.bottom_pane.is_task_running() {
let message = format!(
"'/{}' is disabled while a task is in progress.",
@@ -1371,6 +1371,9 @@ impl ChatWidget {
SlashCommand::New => {
self.app_event_tx.send(AppEvent::NewSession);
}
SlashCommand::Save => {
self.handle_save_command(args);
}
SlashCommand::Init => {
let init_target = self.config.cwd.join(DEFAULT_PROJECT_DOC_FILENAME);
if init_target.exists() {
@@ -1488,6 +1491,31 @@ impl ChatWidget {
}
}
fn handle_save_command(&mut self, args: Option<String>) {
let Some(name_raw) = args else {
self.add_to_history(history_cell::new_error_event(
"Usage: /save <name>".to_string(),
));
return;
};
let name = name_raw.trim();
if name.is_empty() {
self.add_to_history(history_cell::new_error_event(
"Usage: /save <name>".to_string(),
));
return;
}
if self.conversation_id.is_none() {
self.add_to_history(history_cell::new_error_event(
"Session is not ready yet; try /save again in a moment.".to_string(),
));
return;
}
self.app_event_tx
.send(AppEvent::CodexOp(Op::SaveSession { name: name.into() }));
self.add_info_message(format!("Saving session '{name}'..."), None);
}
pub(crate) fn handle_paste(&mut self, text: String) {
self.bottom_pane.handle_paste(text);
}
@@ -1658,6 +1686,12 @@ impl ChatWidget {
EventMsg::Error(ErrorEvent { message }) => self.on_error(message),
EventMsg::McpStartupUpdate(ev) => self.on_mcp_startup_update(ev),
EventMsg::McpStartupComplete(ev) => self.on_mcp_startup_complete(ev),
EventMsg::SaveSessionResponse(ev) => {
self.add_info_message(
format!("Saved session '{}' ({}).", ev.name, ev.conversation_id),
Some(format!("Rollout: {}", ev.rollout_path.display())),
);
}
EventMsg::TurnAborted(ev) => match ev.reason {
TurnAbortReason::Interrupted => {
self.on_interrupted_turn(ev.reason);

View File

@@ -1049,7 +1049,7 @@ fn slash_init_skips_when_project_doc_exists() {
std::fs::write(&existing_path, "existing instructions").unwrap();
chat.config.cwd = tempdir.path().to_path_buf();
chat.dispatch_command(SlashCommand::Init);
chat.dispatch_command(SlashCommand::Init, None);
match op_rx.try_recv() {
Err(TryRecvError::Empty) => {}
@@ -1077,7 +1077,7 @@ fn slash_init_skips_when_project_doc_exists() {
fn slash_quit_requests_exit() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
chat.dispatch_command(SlashCommand::Quit);
chat.dispatch_command(SlashCommand::Quit, None);
assert_matches!(rx.try_recv(), Ok(AppEvent::ExitRequest));
}
@@ -1086,7 +1086,7 @@ fn slash_quit_requests_exit() {
fn slash_exit_requests_exit() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
chat.dispatch_command(SlashCommand::Exit);
chat.dispatch_command(SlashCommand::Exit, None);
assert_matches!(rx.try_recv(), Ok(AppEvent::ExitRequest));
}
@@ -1095,7 +1095,7 @@ fn slash_exit_requests_exit() {
fn slash_undo_sends_op() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
chat.dispatch_command(SlashCommand::Undo);
chat.dispatch_command(SlashCommand::Undo, None);
match rx.try_recv() {
Ok(AppEvent::CodexOp(Op::Undo)) => {}
@@ -1109,7 +1109,7 @@ fn slash_rollout_displays_current_path() {
let rollout_path = PathBuf::from("/tmp/codex-test-rollout.jsonl");
chat.current_rollout_path = Some(rollout_path.clone());
chat.dispatch_command(SlashCommand::Rollout);
chat.dispatch_command(SlashCommand::Rollout, None);
let cells = drain_insert_history(&mut rx);
assert_eq!(cells.len(), 1, "expected info message for rollout path");
@@ -1124,7 +1124,7 @@ fn slash_rollout_displays_current_path() {
fn slash_rollout_handles_missing_path() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
chat.dispatch_command(SlashCommand::Rollout);
chat.dispatch_command(SlashCommand::Rollout, None);
let cells = drain_insert_history(&mut rx);
assert_eq!(
@@ -1719,7 +1719,7 @@ fn feedback_selection_popup_snapshot() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual();
// Open the feedback category selection popup via slash command.
chat.dispatch_command(SlashCommand::Feedback);
chat.dispatch_command(SlashCommand::Feedback, None);
let popup = render_bottom_popup(&chat, 80);
assert_snapshot!("feedback_selection_popup", popup);
@@ -1797,7 +1797,7 @@ fn disabled_slash_command_while_task_running_snapshot() {
chat.bottom_pane.set_task_running(true);
// Dispatch a command that is unavailable while a task runs (e.g., /model)
chat.dispatch_command(SlashCommand::Model);
chat.dispatch_command(SlashCommand::Model, None);
// Drain history and snapshot the rendered error line(s)
let cells = drain_insert_history(&mut rx);

View File

@@ -32,6 +32,9 @@ pub struct Cli {
#[clap(skip)]
pub resume_show_all: bool,
#[clap(skip)]
pub fork_source: Option<String>,
/// Model the agent should use.
#[arg(long, short = 'm')]
pub model: Option<String>,

View File

@@ -19,9 +19,9 @@ use codex_core::config::ConfigOverrides;
use codex_core::config::find_codex_home;
use codex_core::config::load_config_as_toml_with_cli_overrides;
use codex_core::config::resolve_oss_provider;
use codex_core::find_conversation_path_by_id_str;
use codex_core::get_platform_sandbox;
use codex_core::protocol::AskForApproval;
use codex_core::resolve_rollout_path;
use codex_protocol::config_types::SandboxMode;
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
use std::fs::OpenOptions;
@@ -429,8 +429,29 @@ async fn run_ratatui_app(
};
// Determine resume behavior: explicit id, then resume last, then picker.
let resume_selection = if let Some(id_str) = cli.resume_session_id.as_deref() {
match find_conversation_path_by_id_str(&config.codex_home, id_str).await? {
let resume_selection = if let Some(target) = cli.fork_source.as_deref() {
match resolve_rollout_path(&config.codex_home, target).await? {
Some(path) => resume_picker::ResumeSelection::Fork(path),
None => {
error!("Error finding conversation path: {target}");
restore();
session_log::log_session_end();
let _ = tui.terminal.clear();
if let Err(err) = writeln!(
std::io::stdout(),
"No saved session or rollout found for '{target}'. Run `codex resume` without a name to choose from existing sessions."
) {
error!("Failed to write resume error message: {err}");
}
return Ok(AppExitInfo {
token_usage: codex_core::protocol::TokenUsage::default(),
conversation_id: None,
update_action: None,
});
}
}
} else if let Some(id_str) = cli.resume_session_id.as_deref() {
match resolve_rollout_path(&config.codex_home, id_str).await? {
Some(path) => resume_picker::ResumeSelection::Resume(path),
None => {
error!("Error finding conversation path: {id_str}");
@@ -439,7 +460,7 @@ async fn run_ratatui_app(
let _ = tui.terminal.clear();
if let Err(err) = writeln!(
std::io::stdout(),
"No saved session found with ID {id_str}. Run `codex resume` without an ID to choose from existing sessions."
"No saved session or rollout found for '{id_str}'. Run `codex resume` without a name to choose from existing sessions."
) {
error!("Failed to write resume error message: {err}");
}

View File

@@ -42,6 +42,7 @@ const LOAD_NEAR_THRESHOLD: usize = 5;
pub enum ResumeSelection {
StartFresh,
Resume(PathBuf),
Fork(PathBuf),
Exit,
}

View File

@@ -16,6 +16,7 @@ pub enum SlashCommand {
Approvals,
Review,
New,
Save,
Init,
Compact,
Undo,
@@ -37,6 +38,7 @@ impl SlashCommand {
match self {
SlashCommand::Feedback => "send logs to maintainers",
SlashCommand::New => "start a new chat during a conversation",
SlashCommand::Save => "save this session so it can be resumed later",
SlashCommand::Init => "create an AGENTS.md file with instructions for Codex",
SlashCommand::Compact => "summarize conversation to prevent hitting the context limit",
SlashCommand::Review => "review my current changes and find issues",
@@ -64,6 +66,7 @@ impl SlashCommand {
pub fn available_during_task(self) -> bool {
match self {
SlashCommand::New
| SlashCommand::Save
| SlashCommand::Init
| SlashCommand::Compact
| SlashCommand::Undo