mirror of
https://github.com/openai/codex.git
synced 2026-02-02 06:57:03 +00:00
Compare commits
15 Commits
exec-run-a
...
jif/fork
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
63a3f3941a | ||
|
|
7ceabac707 | ||
|
|
2a24ae36c2 | ||
|
|
5071cc8fff | ||
|
|
9c765f1217 | ||
|
|
7116d2a6a4 | ||
|
|
94f1a61df5 | ||
|
|
7ec1311aff | ||
|
|
b9f260057c | ||
|
|
72af9e3092 | ||
|
|
8f0d83eb11 | ||
|
|
26d667e152 | ||
|
|
d1cf2b967c | ||
|
|
78afe914e1 | ||
|
|
1a8c1a4d9a |
@@ -148,7 +148,6 @@ use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use tokio::select;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::error;
|
||||
@@ -2239,51 +2238,25 @@ impl CodexMessageProcessor {
|
||||
.await
|
||||
{
|
||||
info!("conversation {conversation_id} was active; shutting down");
|
||||
let conversation_clone = conversation.clone();
|
||||
let notify = Arc::new(tokio::sync::Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
// Do not wait on conversation.next_event(); the listener task already consumes
|
||||
// the stream. Request shutdown and ensure the rollout file is flushed before moving it.
|
||||
if let Err(err) = conversation.submit(Op::Shutdown).await {
|
||||
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
|
||||
}
|
||||
|
||||
// Establish the listener for ShutdownComplete before submitting
|
||||
// Shutdown so it is not missed.
|
||||
let is_shutdown = tokio::spawn(async move {
|
||||
// Create the notified future outside the loop to avoid losing notifications.
|
||||
let notified = notify_clone.notified();
|
||||
tokio::pin!(notified);
|
||||
loop {
|
||||
select! {
|
||||
_ = &mut notified => { break; }
|
||||
event = conversation_clone.next_event() => {
|
||||
match event {
|
||||
Ok(event) => {
|
||||
if matches!(event.msg, EventMsg::ShutdownComplete) { break; }
|
||||
}
|
||||
// Break on errors to avoid tight loops when the agent loop has exited.
|
||||
Err(_) => { break; }
|
||||
}
|
||||
}
|
||||
}
|
||||
let flush_result =
|
||||
tokio::time::timeout(Duration::from_secs(5), conversation.flush_rollout()).await;
|
||||
match flush_result {
|
||||
Ok(Ok(())) => {}
|
||||
Ok(Err(err)) => {
|
||||
warn!(
|
||||
"conversation {conversation_id} rollout flush failed before archive: {err}"
|
||||
);
|
||||
}
|
||||
});
|
||||
// Request shutdown.
|
||||
match conversation.submit(Op::Shutdown).await {
|
||||
Ok(_) => {
|
||||
// Successfully submitted Shutdown; wait before proceeding.
|
||||
select! {
|
||||
_ = is_shutdown => {
|
||||
// Normal shutdown: proceed with archive.
|
||||
}
|
||||
_ = tokio::time::sleep(Duration::from_secs(10)) => {
|
||||
warn!("conversation {conversation_id} shutdown timed out; proceeding with archive");
|
||||
// Wake any waiter; use notify_waiters to avoid missing the signal.
|
||||
notify.notify_waiters();
|
||||
// Perhaps we lost a shutdown race, so let's continue to
|
||||
// clean up the .jsonl file.
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
|
||||
notify.notify_waiters();
|
||||
Err(_) => {
|
||||
warn!(
|
||||
"conversation {conversation_id} rollout flush timed out; proceeding with archive"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2295,7 +2268,8 @@ impl CodexMessageProcessor {
|
||||
.codex_home
|
||||
.join(codex_core::ARCHIVED_SESSIONS_SUBDIR);
|
||||
tokio::fs::create_dir_all(&archive_folder).await?;
|
||||
tokio::fs::rename(&canonical_rollout_path, &archive_folder.join(&file_name)).await?;
|
||||
let destination = archive_folder.join(&file_name);
|
||||
tokio::fs::rename(&canonical_rollout_path, &destination).await?;
|
||||
Ok(())
|
||||
}
|
||||
.await;
|
||||
|
||||
@@ -46,6 +46,7 @@ pub fn create_fake_rollout(
|
||||
instructions: None,
|
||||
source: SessionSource::Cli,
|
||||
model_provider: model_provider.map(str::to_string),
|
||||
name: None,
|
||||
})?;
|
||||
|
||||
let lines = [
|
||||
|
||||
@@ -100,6 +100,9 @@ enum Subcommand {
|
||||
/// Resume a previous interactive session (picker by default; use --last to continue the most recent).
|
||||
Resume(ResumeCommand),
|
||||
|
||||
/// Fork an existing session into a new conversation.
|
||||
Fork(ForkCommand),
|
||||
|
||||
/// [EXPERIMENTAL] Browse tasks from Codex Cloud and apply changes locally.
|
||||
#[clap(name = "cloud", alias = "cloud-tasks")]
|
||||
Cloud(CloudTasksCli),
|
||||
@@ -142,6 +145,16 @@ struct ResumeCommand {
|
||||
config_overrides: TuiCli,
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct ForkCommand {
|
||||
/// Resume from a saved session name or rollout id, but start a new conversation.
|
||||
#[arg(value_name = "ID|NAME")]
|
||||
target: String,
|
||||
|
||||
#[clap(flatten)]
|
||||
config_overrides: TuiCli,
|
||||
}
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
struct SandboxArgs {
|
||||
#[command(subcommand)]
|
||||
@@ -466,6 +479,19 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
|
||||
let exit_info = codex_tui::run_main(interactive, codex_linux_sandbox_exe).await?;
|
||||
handle_app_exit(exit_info)?;
|
||||
}
|
||||
Some(Subcommand::Fork(ForkCommand {
|
||||
target,
|
||||
config_overrides,
|
||||
})) => {
|
||||
interactive = finalize_fork_interactive(
|
||||
interactive,
|
||||
root_config_overrides.clone(),
|
||||
target,
|
||||
config_overrides,
|
||||
);
|
||||
let exit_info = codex_tui::run_main(interactive, codex_linux_sandbox_exe).await?;
|
||||
handle_app_exit(exit_info)?;
|
||||
}
|
||||
Some(Subcommand::Login(mut login_cli)) => {
|
||||
prepend_config_flags(
|
||||
&mut login_cli.config_overrides,
|
||||
@@ -627,6 +653,7 @@ fn finalize_resume_interactive(
|
||||
interactive.resume_last = last;
|
||||
interactive.resume_session_id = resume_session_id;
|
||||
interactive.resume_show_all = show_all;
|
||||
interactive.fork_source = None;
|
||||
|
||||
// Merge resume-scoped flags and overrides with highest precedence.
|
||||
merge_resume_cli_flags(&mut interactive, resume_cli);
|
||||
@@ -637,6 +664,21 @@ fn finalize_resume_interactive(
|
||||
interactive
|
||||
}
|
||||
|
||||
fn finalize_fork_interactive(
|
||||
mut interactive: TuiCli,
|
||||
root_config_overrides: CliConfigOverrides,
|
||||
target: String,
|
||||
fork_cli: TuiCli,
|
||||
) -> TuiCli {
|
||||
interactive.resume_picker = false;
|
||||
interactive.resume_last = false;
|
||||
interactive.resume_session_id = None;
|
||||
interactive.fork_source = Some(target);
|
||||
merge_resume_cli_flags(&mut interactive, fork_cli);
|
||||
prepend_config_flags(&mut interactive.config_overrides, root_config_overrides);
|
||||
interactive
|
||||
}
|
||||
|
||||
/// Merge flags provided to `codex resume` so they take precedence over any
|
||||
/// root-level flags. Only overrides fields explicitly set on the resume-scoped
|
||||
/// CLI. Also appends `-c key=value` overrides with highest precedence.
|
||||
@@ -727,6 +769,26 @@ mod tests {
|
||||
)
|
||||
}
|
||||
|
||||
fn fork_from_args(args: &[&str]) -> TuiCli {
|
||||
let cli = MultitoolCli::try_parse_from(args).expect("parse");
|
||||
let MultitoolCli {
|
||||
interactive,
|
||||
config_overrides: root_overrides,
|
||||
subcommand,
|
||||
feature_toggles: _,
|
||||
} = cli;
|
||||
|
||||
let Subcommand::Fork(ForkCommand {
|
||||
target,
|
||||
config_overrides,
|
||||
}) = subcommand.expect("fork present")
|
||||
else {
|
||||
unreachable!()
|
||||
};
|
||||
|
||||
finalize_fork_interactive(interactive, root_overrides, target, config_overrides)
|
||||
}
|
||||
|
||||
fn sample_exit_info(conversation: Option<&str>) -> AppExitInfo {
|
||||
let token_usage = TokenUsage {
|
||||
output_tokens: 2,
|
||||
@@ -819,6 +881,15 @@ mod tests {
|
||||
assert!(interactive.resume_show_all);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fork_sets_target_and_disables_resume_controls() {
|
||||
let interactive = fork_from_args(["codex", "fork", "saved"].as_ref());
|
||||
assert_eq!(interactive.fork_source.as_deref(), Some("saved"));
|
||||
assert!(!interactive.resume_picker);
|
||||
assert!(!interactive.resume_last);
|
||||
assert!(interactive.resume_session_id.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn resume_merges_option_flags_and_full_auto() {
|
||||
let interactive = finalize_from_args(
|
||||
|
||||
@@ -100,6 +100,8 @@ use crate::protocol::TurnDiffEvent;
|
||||
use crate::protocol::WarningEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::RolloutRecorderParams;
|
||||
use crate::saved_sessions::build_saved_session_entry;
|
||||
use crate::saved_sessions::upsert_saved_session;
|
||||
use crate::shell;
|
||||
use crate::state::ActiveTurn;
|
||||
use crate::state::SessionServices;
|
||||
@@ -134,6 +136,8 @@ use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_readiness::Readiness;
|
||||
use codex_utils_readiness::ReadinessFlag;
|
||||
use codex_utils_tokenizer::warm_model_cache;
|
||||
use reqwest::StatusCode;
|
||||
use std::path::Path;
|
||||
|
||||
/// The high-level interface to the Codex system.
|
||||
/// It operates as a queue pair where you send submissions and receive events.
|
||||
@@ -149,6 +153,7 @@ pub struct Codex {
|
||||
pub struct CodexSpawnOk {
|
||||
pub codex: Codex,
|
||||
pub conversation_id: ConversationId,
|
||||
pub(crate) session: Arc<Session>,
|
||||
}
|
||||
|
||||
pub(crate) const INITIAL_SUBMIT_ID: &str = "";
|
||||
@@ -209,7 +214,8 @@ impl Codex {
|
||||
let conversation_id = session.conversation_id;
|
||||
|
||||
// This task will run until Op::Shutdown is received.
|
||||
tokio::spawn(submission_loop(session, config, rx_sub));
|
||||
let submission_session = Arc::clone(&session);
|
||||
tokio::spawn(submission_loop(submission_session, config, rx_sub));
|
||||
let codex = Codex {
|
||||
next_id: AtomicU64::new(0),
|
||||
tx_sub,
|
||||
@@ -219,6 +225,7 @@ impl Codex {
|
||||
Ok(CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
session,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -632,18 +639,68 @@ impl Session {
|
||||
}
|
||||
|
||||
/// Ensure all rollout writes are durably flushed.
|
||||
pub(crate) async fn flush_rollout(&self) {
|
||||
pub(crate) async fn flush_rollout(&self) -> std::io::Result<()> {
|
||||
let recorder = {
|
||||
let guard = self.services.rollout.lock().await;
|
||||
guard.clone()
|
||||
};
|
||||
if let Some(rec) = recorder
|
||||
&& let Err(e) = rec.flush().await
|
||||
{
|
||||
warn!("failed to flush rollout recorder: {e}");
|
||||
if let Some(rec) = recorder {
|
||||
rec.flush().await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn set_session_name(&self, name: Option<String>) -> std::io::Result<()> {
|
||||
let recorder = {
|
||||
let guard = self.services.rollout.lock().await;
|
||||
guard.clone()
|
||||
};
|
||||
if let Some(rec) = recorder {
|
||||
rec.set_session_name(name).await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn rollout_path(&self) -> CodexResult<PathBuf> {
|
||||
let guard = self.services.rollout.lock().await;
|
||||
let Some(rec) = guard.as_ref() else {
|
||||
return Err(CodexErr::Fatal(
|
||||
"Rollout recorder is not initialized; cannot save session.".to_string(),
|
||||
));
|
||||
};
|
||||
Ok(rec.rollout_path.clone())
|
||||
}
|
||||
|
||||
pub(crate) async fn model(&self) -> String {
|
||||
let state = self.state.lock().await;
|
||||
state.session_configuration.model.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn save_session(
|
||||
&self,
|
||||
codex_home: &Path,
|
||||
name: &str,
|
||||
) -> CodexResult<crate::SavedSessionEntry> {
|
||||
let trimmed = name.trim();
|
||||
if trimmed.is_empty() {
|
||||
return Err(CodexErr::Fatal("Usage: /save <name>".to_string()));
|
||||
}
|
||||
let rollout_path = self.rollout_path().await?;
|
||||
self.flush_rollout()
|
||||
.await
|
||||
.map_err(|e| CodexErr::Fatal(format!("failed to flush rollout recorder: {e}")))?;
|
||||
self.set_session_name(Some(trimmed.to_string()))
|
||||
.await
|
||||
.map_err(|e| CodexErr::Fatal(format!("failed to write session name: {e}")))?;
|
||||
let entry =
|
||||
build_saved_session_entry(trimmed.to_string(), rollout_path, self.model().await)
|
||||
.await?;
|
||||
upsert_saved_session(codex_home, entry.clone()).await?;
|
||||
Ok(entry)
|
||||
}
|
||||
|
||||
fn next_internal_sub_id(&self) -> String {
|
||||
let id = self
|
||||
.next_internal_sub_id
|
||||
@@ -659,7 +716,9 @@ impl Session {
|
||||
let items = self.build_initial_context(&turn_context);
|
||||
self.record_conversation_items(&turn_context, &items).await;
|
||||
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
|
||||
self.flush_rollout().await;
|
||||
if let Err(e) = self.flush_rollout().await {
|
||||
warn!("failed to flush rollout recorder: {e}");
|
||||
}
|
||||
}
|
||||
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
|
||||
let rollout_items = conversation_history.get_rollout_items();
|
||||
@@ -703,10 +762,18 @@ impl Session {
|
||||
|
||||
// If persisting, persist all rollout items as-is (recorder filters)
|
||||
if persist && !rollout_items.is_empty() {
|
||||
self.persist_rollout_items(&rollout_items).await;
|
||||
// Drop legacy SessionMeta lines from the source rollout so the forked
|
||||
// session only contains its own SessionMeta written by the recorder.
|
||||
let filtered =
|
||||
InitialHistory::Forked(rollout_items.clone()).without_session_meta();
|
||||
if !filtered.is_empty() {
|
||||
self.persist_rollout_items(&filtered).await;
|
||||
}
|
||||
}
|
||||
// Flush after seeding history and any persisted rollout copy.
|
||||
self.flush_rollout().await;
|
||||
if let Err(e) = self.flush_rollout().await {
|
||||
warn!("failed to flush rollout recorder: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1425,6 +1492,9 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
Op::Review { review_request } => {
|
||||
handlers::review(&sess, &config, sub.id.clone(), review_request).await;
|
||||
}
|
||||
Op::SaveSession { name } => {
|
||||
handlers::save_session(&sess, &config, sub.id.clone(), name).await;
|
||||
}
|
||||
_ => {} // Ignore unknown ops; enum is non_exhaustive to allow extensions.
|
||||
}
|
||||
}
|
||||
@@ -1452,6 +1522,7 @@ mod handlers {
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::ReviewDecision;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::SaveSessionResponseEvent;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
|
||||
use codex_protocol::user_input::UserInput;
|
||||
@@ -1673,6 +1744,38 @@ mod handlers {
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn save_session(
|
||||
sess: &Arc<Session>,
|
||||
config: &Arc<crate::config::Config>,
|
||||
sub_id: String,
|
||||
name: String,
|
||||
) {
|
||||
match sess.save_session(&config.codex_home, &name).await {
|
||||
Ok(entry) => {
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::SaveSessionResponse(SaveSessionResponseEvent {
|
||||
name: entry.name,
|
||||
rollout_path: entry.rollout_path,
|
||||
conversation_id: entry.conversation_id,
|
||||
}),
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
Err(err) => {
|
||||
let message = format!("Failed to save session '{name}': {err}");
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message,
|
||||
http_status_code: None,
|
||||
}),
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
|
||||
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
|
||||
info!("Shutting down Codex instance");
|
||||
|
||||
@@ -1,22 +1,26 @@
|
||||
use crate::codex::Codex;
|
||||
use crate::codex::Session;
|
||||
use crate::error::Result as CodexResult;
|
||||
use crate::protocol::Event;
|
||||
use crate::protocol::Op;
|
||||
use crate::protocol::Submission;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct CodexConversation {
|
||||
codex: Codex,
|
||||
rollout_path: PathBuf,
|
||||
session: Arc<Session>,
|
||||
}
|
||||
|
||||
/// Conduit for the bidirectional stream of messages that compose a conversation
|
||||
/// in Codex.
|
||||
impl CodexConversation {
|
||||
pub(crate) fn new(codex: Codex, rollout_path: PathBuf) -> Self {
|
||||
pub(crate) fn new(codex: Codex, rollout_path: PathBuf, session: Arc<Session>) -> Self {
|
||||
Self {
|
||||
codex,
|
||||
rollout_path,
|
||||
session,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,4 +40,24 @@ impl CodexConversation {
|
||||
pub fn rollout_path(&self) -> PathBuf {
|
||||
self.rollout_path.clone()
|
||||
}
|
||||
|
||||
pub async fn flush_rollout(&self) -> CodexResult<()> {
|
||||
Ok(self.session.flush_rollout().await?)
|
||||
}
|
||||
|
||||
pub async fn set_session_name(&self, name: Option<String>) -> CodexResult<()> {
|
||||
Ok(self.session.set_session_name(name).await?)
|
||||
}
|
||||
|
||||
pub async fn model(&self) -> String {
|
||||
self.session.model().await
|
||||
}
|
||||
|
||||
pub async fn save_session(
|
||||
&self,
|
||||
codex_home: &std::path::Path,
|
||||
name: &str,
|
||||
) -> CodexResult<crate::SavedSessionEntry> {
|
||||
self.session.save_session(codex_home, name).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use crate::CodexAuth;
|
||||
use crate::codex::Codex;
|
||||
use crate::codex::CodexSpawnOk;
|
||||
use crate::codex::INITIAL_SUBMIT_ID;
|
||||
use crate::codex::Session;
|
||||
use crate::codex_conversation::CodexConversation;
|
||||
use crate::config::Config;
|
||||
use crate::error::CodexErr;
|
||||
@@ -11,6 +12,7 @@ use crate::protocol::Event;
|
||||
use crate::protocol::EventMsg;
|
||||
use crate::protocol::SessionConfiguredEvent;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::saved_sessions::resolve_rollout_path;
|
||||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
@@ -18,6 +20,7 @@ use codex_protocol::protocol::InitialHistory;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use std::collections::HashMap;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::RwLock;
|
||||
@@ -56,6 +59,7 @@ impl ConversationManager {
|
||||
)
|
||||
}
|
||||
|
||||
/// Start a brand new conversation with default initial history.
|
||||
pub async fn new_conversation(&self, config: Config) -> CodexResult<NewConversation> {
|
||||
self.spawn_conversation(config, self.auth_manager.clone())
|
||||
.await
|
||||
@@ -69,6 +73,7 @@ impl ConversationManager {
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
session,
|
||||
} = Codex::spawn(
|
||||
config,
|
||||
auth_manager,
|
||||
@@ -76,13 +81,14 @@ impl ConversationManager {
|
||||
self.session_source.clone(),
|
||||
)
|
||||
.await?;
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
self.finalize_spawn(codex, conversation_id, session).await
|
||||
}
|
||||
|
||||
async fn finalize_spawn(
|
||||
&self,
|
||||
codex: Codex,
|
||||
conversation_id: ConversationId,
|
||||
session: Arc<Session>,
|
||||
) -> CodexResult<NewConversation> {
|
||||
// The first event must be `SessionInitialized`. Validate and forward it
|
||||
// to the caller so that they can display it in the conversation
|
||||
@@ -101,6 +107,7 @@ impl ConversationManager {
|
||||
let conversation = Arc::new(CodexConversation::new(
|
||||
codex,
|
||||
session_configured.rollout_path.clone(),
|
||||
session,
|
||||
));
|
||||
self.conversations
|
||||
.write()
|
||||
@@ -125,6 +132,7 @@ impl ConversationManager {
|
||||
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
|
||||
}
|
||||
|
||||
/// Resume a conversation from an on-disk rollout file.
|
||||
pub async fn resume_conversation_from_rollout(
|
||||
&self,
|
||||
config: Config,
|
||||
@@ -136,6 +144,23 @@ impl ConversationManager {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Resume a conversation by saved-session name or rollout id string.
|
||||
pub async fn resume_conversation_from_identifier(
|
||||
&self,
|
||||
config: Config,
|
||||
identifier: &str,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewConversation> {
|
||||
let Some(path) = resolve_rollout_path(&config.codex_home, identifier).await? else {
|
||||
return Err(CodexErr::Fatal(format!(
|
||||
"No saved session or rollout found for '{identifier}'"
|
||||
)));
|
||||
};
|
||||
self.resume_conversation_from_rollout(config, path, auth_manager)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Resume a conversation from provided rollout history items.
|
||||
pub async fn resume_conversation_with_history(
|
||||
&self,
|
||||
config: Config,
|
||||
@@ -145,6 +170,7 @@ impl ConversationManager {
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
session,
|
||||
} = Codex::spawn(
|
||||
config,
|
||||
auth_manager,
|
||||
@@ -152,7 +178,54 @@ impl ConversationManager {
|
||||
self.session_source.clone(),
|
||||
)
|
||||
.await?;
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
self.finalize_spawn(codex, conversation_id, session).await
|
||||
}
|
||||
|
||||
/// Fork a new conversation from the given rollout path.
|
||||
pub async fn fork_from_rollout(
|
||||
&self,
|
||||
config: Config,
|
||||
path: PathBuf,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewConversation> {
|
||||
let initial_history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
let forked = match initial_history {
|
||||
InitialHistory::Resumed(resumed) => InitialHistory::Forked(resumed.history),
|
||||
InitialHistory::Forked(items) => InitialHistory::Forked(items),
|
||||
InitialHistory::New => InitialHistory::New,
|
||||
};
|
||||
self.resume_conversation_with_history(config, forked, auth_manager)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Fork a new conversation from a saved-session name or rollout id string.
|
||||
pub async fn fork_from_identifier(
|
||||
&self,
|
||||
config: Config,
|
||||
identifier: &str,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewConversation> {
|
||||
let Some(path) = resolve_rollout_path(&config.codex_home, identifier).await? else {
|
||||
return Err(CodexErr::Fatal(format!(
|
||||
"No saved session or rollout found for '{identifier}'"
|
||||
)));
|
||||
};
|
||||
self.fork_from_rollout(config, path, auth_manager).await
|
||||
}
|
||||
|
||||
/// Persist a human-friendly session name and record it in saved_sessions.json.
|
||||
pub async fn save_session(
|
||||
&self,
|
||||
conversation_id: ConversationId,
|
||||
codex_home: &Path,
|
||||
name: &str,
|
||||
) -> CodexResult<crate::SavedSessionEntry> {
|
||||
let trimmed = name.trim();
|
||||
if trimmed.is_empty() {
|
||||
return Err(CodexErr::Fatal("Usage: /save <name>".to_string()));
|
||||
}
|
||||
let conversation = self.get_conversation(conversation_id).await?;
|
||||
conversation.save_session(codex_home, trimmed).await
|
||||
}
|
||||
|
||||
/// Removes the conversation from the manager's internal map, though the
|
||||
@@ -185,9 +258,10 @@ impl ConversationManager {
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
session,
|
||||
} = Codex::spawn(config, auth_manager, history, self.session_source.clone()).await?;
|
||||
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
self.finalize_spawn(codex, conversation_id, session).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -66,6 +66,7 @@ mod openai_model_info;
|
||||
pub mod project_doc;
|
||||
mod rollout;
|
||||
pub(crate) mod safety;
|
||||
pub mod saved_sessions;
|
||||
pub mod seatbelt;
|
||||
pub mod shell;
|
||||
pub mod spawn;
|
||||
@@ -83,6 +84,12 @@ pub use rollout::list::ConversationsPage;
|
||||
pub use rollout::list::Cursor;
|
||||
pub use rollout::list::parse_cursor;
|
||||
pub use rollout::list::read_head_for_summary;
|
||||
pub use saved_sessions::SavedSessionEntry;
|
||||
pub use saved_sessions::build_saved_session_entry;
|
||||
pub use saved_sessions::list_saved_sessions;
|
||||
pub use saved_sessions::resolve_rollout_path;
|
||||
pub use saved_sessions::resolve_saved_session;
|
||||
pub use saved_sessions::upsert_saved_session;
|
||||
mod function_tool;
|
||||
mod state;
|
||||
mod tasks;
|
||||
|
||||
@@ -66,6 +66,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
|
||||
| EventMsg::ExecApprovalRequest(_)
|
||||
| EventMsg::ApplyPatchApprovalRequest(_)
|
||||
| EventMsg::BackgroundEvent(_)
|
||||
| EventMsg::SaveSessionResponse(_)
|
||||
| EventMsg::StreamError(_)
|
||||
| EventMsg::PatchApplyBegin(_)
|
||||
| EventMsg::PatchApplyEnd(_)
|
||||
|
||||
@@ -11,6 +11,8 @@ use serde_json::Value;
|
||||
use time::OffsetDateTime;
|
||||
use time::format_description::FormatItem;
|
||||
use time::macros::format_description;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncSeekExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::mpsc::{self};
|
||||
@@ -70,6 +72,10 @@ enum RolloutCmd {
|
||||
Shutdown {
|
||||
ack: oneshot::Sender<()>,
|
||||
},
|
||||
SetName {
|
||||
name: Option<String>,
|
||||
ack: oneshot::Sender<std::io::Result<()>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl RolloutRecorderParams {
|
||||
@@ -148,11 +154,14 @@ impl RolloutRecorder {
|
||||
instructions,
|
||||
source,
|
||||
model_provider: Some(config.model_provider_id.clone()),
|
||||
name: None,
|
||||
}),
|
||||
)
|
||||
}
|
||||
RolloutRecorderParams::Resume { path } => (
|
||||
tokio::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.append(true)
|
||||
.open(&path)
|
||||
.await?,
|
||||
@@ -196,6 +205,21 @@ impl RolloutRecorder {
|
||||
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
|
||||
}
|
||||
|
||||
/// Update the session name stored in the rollout's SessionMeta line.
|
||||
pub async fn set_session_name(&self, name: Option<String>) -> std::io::Result<()> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.tx
|
||||
.send(RolloutCmd::SetName { name, ack: tx })
|
||||
.await
|
||||
.map_err(|e| IoError::other(format!("failed to queue session name update: {e}")))?;
|
||||
match rx.await {
|
||||
Ok(result) => result,
|
||||
Err(e) => Err(IoError::other(format!(
|
||||
"failed waiting for session name update: {e}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Flush all queued writes and wait until they are committed by the writer task.
|
||||
pub async fn flush(&self) -> std::io::Result<()> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
@@ -334,6 +358,7 @@ fn create_log_file(
|
||||
|
||||
let path = dir.join(filename);
|
||||
let file = std::fs::OpenOptions::new()
|
||||
.read(true)
|
||||
.append(true)
|
||||
.create(true)
|
||||
.open(&path)?;
|
||||
@@ -389,6 +414,10 @@ async fn rollout_writer(
|
||||
RolloutCmd::Shutdown { ack } => {
|
||||
let _ = ack.send(());
|
||||
}
|
||||
RolloutCmd::SetName { name, ack } => {
|
||||
let result = rewrite_session_meta_name(&mut writer.file, name).await;
|
||||
let _ = ack.send(result);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -422,3 +451,232 @@ impl JsonlWriter {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn rewrite_session_meta_name(
|
||||
file: &mut tokio::fs::File,
|
||||
name: Option<String>,
|
||||
) -> std::io::Result<()> {
|
||||
use std::io::SeekFrom;
|
||||
|
||||
file.flush().await?;
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
let mut contents = Vec::new();
|
||||
file.read_to_end(&mut contents).await?;
|
||||
if contents.is_empty() {
|
||||
return Err(IoError::other("empty rollout file"));
|
||||
}
|
||||
let newline_idx = contents
|
||||
.iter()
|
||||
.position(|&b| b == b'\n')
|
||||
.ok_or_else(|| IoError::other("rollout missing newline after SessionMeta"))?;
|
||||
let first_line = &contents[..newline_idx];
|
||||
let mut rollout_line: RolloutLine = serde_json::from_slice(first_line)
|
||||
.map_err(|e| IoError::other(format!("failed to parse SessionMeta: {e}")))?;
|
||||
let RolloutItem::SessionMeta(ref mut session_meta_line) = rollout_line.item else {
|
||||
return Err(IoError::other("first rollout item is not SessionMeta"));
|
||||
};
|
||||
session_meta_line.meta.name = name;
|
||||
let mut updated = serde_json::to_vec(&rollout_line)?;
|
||||
updated.push(b'\n');
|
||||
updated.extend_from_slice(&contents[newline_idx + 1..]);
|
||||
file.set_len(0).await?;
|
||||
file.seek(SeekFrom::Start(0)).await?;
|
||||
file.write_all(&updated).await?;
|
||||
file.flush().await?;
|
||||
file.seek(SeekFrom::End(0)).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::rewrite_session_meta_name;
|
||||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use tempfile::NamedTempFile;
|
||||
use tokio::fs::OpenOptions;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncSeekExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
fn sample_meta(name: Option<&str>) -> RolloutItem {
|
||||
RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: SessionMeta {
|
||||
id: ConversationId::from_string("00000000-0000-4000-8000-000000000001")
|
||||
.expect("conversation id"),
|
||||
timestamp: "2025-01-01T00:00:00.000Z".to_string(),
|
||||
cwd: "/tmp".into(),
|
||||
originator: "tester".to_string(),
|
||||
cli_version: "1.0.0".to_string(),
|
||||
instructions: None,
|
||||
source: codex_protocol::protocol::SessionSource::Cli,
|
||||
model_provider: Some("provider".to_string()),
|
||||
name: name.map(str::to_string),
|
||||
},
|
||||
git: None,
|
||||
})
|
||||
}
|
||||
|
||||
fn sample_line() -> RolloutLine {
|
||||
RolloutLine {
|
||||
timestamp: "2025-01-01T00:00:00.000Z".to_string(),
|
||||
item: sample_meta(None),
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_rollout(lines: &[RolloutLine]) -> (NamedTempFile, tokio::fs::File) {
|
||||
let temp = NamedTempFile::new().expect("temp file");
|
||||
let mut file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.truncate(true)
|
||||
.create(true)
|
||||
.open(temp.path())
|
||||
.await
|
||||
.expect("open temp file");
|
||||
for line in lines {
|
||||
let mut json = serde_json::to_vec(line).expect("serialize line");
|
||||
json.push(b'\n');
|
||||
file.write_all(&json).await.expect("write line");
|
||||
}
|
||||
file.seek(std::io::SeekFrom::Start(0))
|
||||
.await
|
||||
.expect("rewind");
|
||||
(temp, file)
|
||||
}
|
||||
|
||||
async fn read_first_line(path: &std::path::Path) -> RolloutLine {
|
||||
let mut contents = String::new();
|
||||
let mut file = OpenOptions::new()
|
||||
.read(true)
|
||||
.open(path)
|
||||
.await
|
||||
.expect("open for read");
|
||||
file.read_to_string(&mut contents).await.expect("read file");
|
||||
let first = contents.lines().next().expect("first line");
|
||||
serde_json::from_str(first).expect("parse first line")
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn updates_meta_name_and_preserves_rest() {
|
||||
let events = vec![
|
||||
sample_line(),
|
||||
RolloutLine {
|
||||
timestamp: "2025-01-01T00:00:01.000Z".to_string(),
|
||||
item: RolloutItem::ResponseItem(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: "hello".to_string(),
|
||||
}],
|
||||
}),
|
||||
},
|
||||
];
|
||||
let (temp, mut file) = write_rollout(&events).await;
|
||||
|
||||
rewrite_session_meta_name(&mut file, Some("renamed".to_string()))
|
||||
.await
|
||||
.expect("rewrite ok");
|
||||
|
||||
let first = read_first_line(temp.path()).await;
|
||||
let RolloutItem::SessionMeta(meta_line) = first.item else {
|
||||
panic!("expected SessionMeta line");
|
||||
};
|
||||
assert_eq!(meta_line.meta.name.as_deref(), Some("renamed"));
|
||||
|
||||
let contents = tokio::fs::read_to_string(temp.path())
|
||||
.await
|
||||
.expect("read file");
|
||||
let lines: Vec<_> = contents.lines().collect();
|
||||
assert_eq!(lines.len(), 2);
|
||||
let parsed: RolloutLine = serde_json::from_str(lines[1]).expect("parse second line");
|
||||
let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) = parsed.item
|
||||
else {
|
||||
panic!("expected response item");
|
||||
};
|
||||
assert_eq!(role, "assistant");
|
||||
assert_eq!(
|
||||
content,
|
||||
vec![ContentItem::OutputText {
|
||||
text: "hello".to_string()
|
||||
}]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn clearing_name_sets_none() {
|
||||
let mut first = sample_line();
|
||||
first.item = sample_meta(Some("existing"));
|
||||
let (temp, mut file) = write_rollout(&[first]).await;
|
||||
|
||||
rewrite_session_meta_name(&mut file, None)
|
||||
.await
|
||||
.expect("rewrite ok");
|
||||
|
||||
let first = read_first_line(temp.path()).await;
|
||||
let RolloutItem::SessionMeta(meta_line) = first.item else {
|
||||
panic!("expected SessionMeta line");
|
||||
};
|
||||
assert_eq!(meta_line.meta.name, None);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn errors_on_empty_file() {
|
||||
let temp = NamedTempFile::new().expect("temp file");
|
||||
let mut file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(temp.path())
|
||||
.await
|
||||
.expect("open temp file");
|
||||
let err = rewrite_session_meta_name(&mut file, Some("x".to_string()))
|
||||
.await
|
||||
.expect_err("expected error");
|
||||
assert!(format!("{err}").contains("empty rollout file"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn errors_when_first_line_not_session_meta() {
|
||||
let wrong = RolloutLine {
|
||||
timestamp: "t".to_string(),
|
||||
item: RolloutItem::ResponseItem(ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: "hello".to_string(),
|
||||
}],
|
||||
}),
|
||||
};
|
||||
let (_temp, mut file) = write_rollout(&[wrong]).await;
|
||||
let err = rewrite_session_meta_name(&mut file, Some("x".to_string()))
|
||||
.await
|
||||
.expect_err("expected error");
|
||||
assert!(format!("{err}").contains("first rollout item is not SessionMeta"));
|
||||
// ensure file pointer is rewound to end after failure paths
|
||||
let pos = file
|
||||
.seek(std::io::SeekFrom::Current(0))
|
||||
.await
|
||||
.expect("seek");
|
||||
assert!(pos > 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn errors_when_missing_newline() {
|
||||
let temp = NamedTempFile::new().expect("temp file");
|
||||
let mut file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(true)
|
||||
.open(temp.path())
|
||||
.await
|
||||
.expect("open temp file");
|
||||
file.write_all(b"no newline").await.expect("write");
|
||||
let err = rewrite_session_meta_name(&mut file, Some("x".to_string()))
|
||||
.await
|
||||
.expect_err("expected error");
|
||||
assert!(format!("{err}").contains("rollout missing newline after SessionMeta"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -594,6 +594,7 @@ async fn test_tail_includes_last_response_items() -> Result<()> {
|
||||
cli_version: "test_version".into(),
|
||||
source: SessionSource::VSCode,
|
||||
model_provider: Some("test-provider".into()),
|
||||
name: None,
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
@@ -687,6 +688,7 @@ async fn test_tail_handles_short_sessions() -> Result<()> {
|
||||
cli_version: "test_version".into(),
|
||||
source: SessionSource::VSCode,
|
||||
model_provider: Some("test-provider".into()),
|
||||
name: None,
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
@@ -781,6 +783,7 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> {
|
||||
cli_version: "test_version".into(),
|
||||
source: SessionSource::VSCode,
|
||||
model_provider: Some("test-provider".into()),
|
||||
name: None,
|
||||
},
|
||||
git: None,
|
||||
}),
|
||||
|
||||
144
codex-rs/core/src/saved_sessions.rs
Normal file
144
codex-rs/core/src/saved_sessions.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
use crate::error::Result;
|
||||
use crate::find_conversation_path_by_id_str;
|
||||
use crate::rollout::list::read_head_for_summary;
|
||||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
use std::io::Error as IoError;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use time::OffsetDateTime;
|
||||
use time::format_description::well_known::Rfc3339;
|
||||
use tracing::warn;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SavedSessionEntry {
|
||||
pub name: String,
|
||||
pub conversation_id: ConversationId,
|
||||
pub rollout_path: PathBuf,
|
||||
pub cwd: PathBuf,
|
||||
pub model: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub model_provider: Option<String>,
|
||||
pub saved_at: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub created_at: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct SavedSessionsFile {
|
||||
#[serde(default)]
|
||||
entries: BTreeMap<String, SavedSessionEntry>,
|
||||
}
|
||||
|
||||
fn saved_sessions_path(codex_home: &Path) -> PathBuf {
|
||||
codex_home.join("saved_sessions.json")
|
||||
}
|
||||
|
||||
async fn load_saved_sessions_file(path: &Path) -> Result<SavedSessionsFile> {
|
||||
match tokio::fs::read_to_string(path).await {
|
||||
Ok(text) => {
|
||||
let parsed = serde_json::from_str(&text)
|
||||
.map_err(|e| IoError::other(format!("failed to parse saved sessions: {e}")))?;
|
||||
Ok(parsed)
|
||||
}
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => Ok(SavedSessionsFile::default()),
|
||||
Err(err) => Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn write_saved_sessions_file(path: &Path, file: &SavedSessionsFile) -> Result<()> {
|
||||
if let Some(parent) = path.parent() {
|
||||
tokio::fs::create_dir_all(parent).await?;
|
||||
}
|
||||
let json = serde_json::to_string_pretty(file)
|
||||
.map_err(|e| IoError::other(format!("failed to serialize saved sessions: {e}")))?;
|
||||
let tmp_path = path.with_extension("json.tmp");
|
||||
tokio::fs::write(&tmp_path, json).await?;
|
||||
tokio::fs::rename(tmp_path, path).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create a new entry from the rollout's SessionMeta line.
|
||||
pub async fn build_saved_session_entry(
|
||||
name: String,
|
||||
rollout_path: PathBuf,
|
||||
model: String,
|
||||
) -> Result<SavedSessionEntry> {
|
||||
let head = read_head_for_summary(&rollout_path).await?;
|
||||
let first = head.first().ok_or_else(|| {
|
||||
IoError::other(format!(
|
||||
"rollout at {} has no SessionMeta",
|
||||
rollout_path.display()
|
||||
))
|
||||
})?;
|
||||
let SessionMetaLine { mut meta, .. } = serde_json::from_value::<SessionMetaLine>(first.clone())
|
||||
.map_err(|e| IoError::other(format!("failed to parse SessionMeta: {e}")))?;
|
||||
meta.name = Some(name.clone());
|
||||
let saved_at = OffsetDateTime::now_utc()
|
||||
.format(&Rfc3339)
|
||||
.map_err(|e| IoError::other(format!("failed to format timestamp: {e}")))?;
|
||||
let created_at = if meta.timestamp.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(meta.timestamp.clone())
|
||||
};
|
||||
Ok(SavedSessionEntry {
|
||||
name,
|
||||
conversation_id: meta.id,
|
||||
rollout_path,
|
||||
cwd: meta.cwd,
|
||||
model,
|
||||
model_provider: meta.model_provider,
|
||||
saved_at,
|
||||
created_at,
|
||||
})
|
||||
}
|
||||
|
||||
/// Insert or replace a saved session entry in `saved_sessions.json`.
|
||||
pub async fn upsert_saved_session(codex_home: &Path, entry: SavedSessionEntry) -> Result<()> {
|
||||
let path = saved_sessions_path(codex_home);
|
||||
let mut file = load_saved_sessions_file(&path).await?;
|
||||
file.entries.insert(entry.name.clone(), entry);
|
||||
write_saved_sessions_file(&path, &file).await
|
||||
}
|
||||
|
||||
/// Lookup a saved session by name, if present.
|
||||
pub async fn resolve_saved_session(
|
||||
codex_home: &Path,
|
||||
name: &str,
|
||||
) -> Result<Option<SavedSessionEntry>> {
|
||||
let path = saved_sessions_path(codex_home);
|
||||
let file = load_saved_sessions_file(&path).await?;
|
||||
Ok(file.entries.get(name).cloned())
|
||||
}
|
||||
|
||||
/// Return all saved sessions ordered by newest `saved_at` first.
|
||||
pub async fn list_saved_sessions(codex_home: &Path) -> Result<Vec<SavedSessionEntry>> {
|
||||
let path = saved_sessions_path(codex_home);
|
||||
let file = load_saved_sessions_file(&path).await?;
|
||||
let mut entries: Vec<SavedSessionEntry> = file.entries.values().cloned().collect();
|
||||
entries.sort_by(|a, b| b.saved_at.cmp(&a.saved_at));
|
||||
Ok(entries)
|
||||
}
|
||||
|
||||
/// Resolve a rollout path from either a saved-session name or rollout id string.
|
||||
/// Returns `Ok(None)` when nothing matches.
|
||||
pub async fn resolve_rollout_path(codex_home: &Path, identifier: &str) -> Result<Option<PathBuf>> {
|
||||
if let Some(entry) = resolve_saved_session(codex_home, identifier).await? {
|
||||
if entry.rollout_path.exists() {
|
||||
return Ok(Some(entry.rollout_path));
|
||||
}
|
||||
warn!(
|
||||
"saved session '{}' points to missing rollout at {}",
|
||||
identifier,
|
||||
entry.rollout_path.display()
|
||||
);
|
||||
}
|
||||
Ok(find_conversation_path_by_id_str(codex_home, identifier).await?)
|
||||
}
|
||||
@@ -128,7 +128,9 @@ impl Session {
|
||||
task_cancellation_token.child_token(),
|
||||
)
|
||||
.await;
|
||||
session_ctx.clone_session().flush_rollout().await;
|
||||
if let Err(e) = session_ctx.clone_session().flush_rollout().await {
|
||||
tracing::warn!("failed to flush rollout recorder: {e}");
|
||||
}
|
||||
if !task_cancellation_token.is_cancelled() {
|
||||
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
|
||||
let sess = session_ctx.clone_session();
|
||||
|
||||
@@ -45,6 +45,7 @@ mod resume;
|
||||
mod review;
|
||||
mod rmcp_client;
|
||||
mod rollout_list_find;
|
||||
mod saved_sessions;
|
||||
mod seatbelt;
|
||||
mod shell_serialization;
|
||||
mod stream_error_allows_next_turn;
|
||||
|
||||
457
codex-rs/core/tests/suite/saved_sessions.rs
Normal file
457
codex-rs/core/tests/suite/saved_sessions.rs
Normal file
@@ -0,0 +1,457 @@
|
||||
#![allow(clippy::expect_used)]
|
||||
use anyhow::Result;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::CodexConversation;
|
||||
use codex_core::ConversationManager;
|
||||
use codex_core::SavedSessionEntry;
|
||||
use codex_core::build_saved_session_entry;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::RolloutItem;
|
||||
use codex_core::protocol::RolloutLine;
|
||||
use codex_core::protocol::SaveSessionResponseEvent;
|
||||
use codex_core::protocol::SessionSource;
|
||||
use codex_core::resolve_saved_session;
|
||||
use codex_core::upsert_saved_session;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use core_test_support::wait_for_event;
|
||||
use core_test_support::wait_for_event_match;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
fn completion_body(idx: usize, message: &str) -> String {
|
||||
let resp_id = format!("resp-{idx}");
|
||||
let msg_id = format!("msg-{idx}");
|
||||
sse(vec![
|
||||
ev_response_created(&resp_id),
|
||||
ev_assistant_message(&msg_id, message),
|
||||
ev_completed(&resp_id),
|
||||
])
|
||||
}
|
||||
|
||||
fn rollout_lines(path: &Path) -> Vec<RolloutLine> {
|
||||
let text = std::fs::read_to_string(path).expect("read rollout");
|
||||
text.lines()
|
||||
.filter_map(|line| {
|
||||
if line.trim().is_empty() {
|
||||
return None;
|
||||
}
|
||||
let value: serde_json::Value = serde_json::from_str(line).expect("rollout line json");
|
||||
Some(serde_json::from_value::<RolloutLine>(value).expect("rollout line"))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn rollout_items_without_meta(path: &Path) -> Vec<RolloutItem> {
|
||||
rollout_lines(path)
|
||||
.into_iter()
|
||||
.filter_map(|line| match line.item {
|
||||
RolloutItem::SessionMeta(_) => None,
|
||||
other => Some(other),
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn session_meta_count(path: &Path) -> usize {
|
||||
rollout_lines(path)
|
||||
.iter()
|
||||
.filter(|line| matches!(line.item, RolloutItem::SessionMeta(_)))
|
||||
.count()
|
||||
}
|
||||
|
||||
async fn submit_text(codex: &Arc<CodexConversation>, text: &str) -> Result<()> {
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: text.to_string(),
|
||||
}],
|
||||
})
|
||||
.await?;
|
||||
let _ = wait_for_event(codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn save_session(
|
||||
name: &str,
|
||||
codex: &Arc<CodexConversation>,
|
||||
config: &Config,
|
||||
) -> Result<SavedSessionEntry> {
|
||||
codex.flush_rollout().await?;
|
||||
codex.set_session_name(Some(name.to_string())).await?;
|
||||
let entry =
|
||||
build_saved_session_entry(name.to_string(), codex.rollout_path(), codex.model().await)
|
||||
.await?;
|
||||
upsert_saved_session(&config.codex_home, entry.clone()).await?;
|
||||
Ok(entry)
|
||||
}
|
||||
|
||||
async fn save_session_via_op(
|
||||
codex: &Arc<CodexConversation>,
|
||||
name: &str,
|
||||
) -> Result<SaveSessionResponseEvent> {
|
||||
codex
|
||||
.submit(Op::SaveSession {
|
||||
name: name.to_string(),
|
||||
})
|
||||
.await?;
|
||||
let response: SaveSessionResponseEvent = wait_for_event_match(codex, |ev| match ev {
|
||||
EventMsg::SaveSessionResponse(resp) => Some(resp.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn save_and_resume_by_name() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(&server, vec![completion_body(1, "initial")]).await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let initial = builder.build(&server).await?;
|
||||
submit_text(&initial.codex, "first turn").await?;
|
||||
|
||||
let name = "alpha";
|
||||
let entry = save_session(name, &initial.codex, &initial.config).await?;
|
||||
let resolved = resolve_saved_session(&initial.config.codex_home, name)
|
||||
.await?
|
||||
.expect("saved session");
|
||||
assert_eq!(entry, resolved);
|
||||
assert_eq!(session_meta_count(&entry.rollout_path), 1);
|
||||
|
||||
let saved_items = rollout_items_without_meta(&entry.rollout_path);
|
||||
|
||||
let resumed = builder
|
||||
.resume(&server, initial.home.clone(), entry.rollout_path.clone())
|
||||
.await?;
|
||||
assert_eq!(resumed.session_configured.session_id, entry.conversation_id);
|
||||
let resumed_items = rollout_items_without_meta(&resumed.session_configured.rollout_path);
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(saved_items)?,
|
||||
serde_json::to_value(resumed_items)?
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn save_session_op_persists_and_emits_response() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(&server, vec![completion_body(1, "initial")]).await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let initial = builder.build(&server).await?;
|
||||
submit_text(&initial.codex, "first turn").await?;
|
||||
|
||||
let name = "via-op";
|
||||
let response = save_session_via_op(&initial.codex, name).await?;
|
||||
|
||||
assert_eq!(response.name, name);
|
||||
assert_eq!(
|
||||
response.conversation_id,
|
||||
initial.session_configured.session_id
|
||||
);
|
||||
assert!(response.rollout_path.exists());
|
||||
|
||||
let resolved = resolve_saved_session(&initial.config.codex_home, name)
|
||||
.await?
|
||||
.expect("saved session");
|
||||
assert_eq!(resolved.rollout_path, response.rollout_path);
|
||||
assert_eq!(resolved.conversation_id, response.conversation_id);
|
||||
assert_eq!(session_meta_count(&resolved.rollout_path), 1);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn fork_from_identifier_after_save_op() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
completion_body(1, "seed"),
|
||||
completion_body(2, "fork-extra-1"),
|
||||
completion_body(3, "fork-extra-2"),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let initial = builder.build(&server).await?;
|
||||
submit_text(&initial.codex, "seeded").await?;
|
||||
|
||||
let name = "forkable-op";
|
||||
let response = save_session_via_op(&initial.codex, name).await?;
|
||||
|
||||
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
|
||||
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
|
||||
let forked = conversation_manager
|
||||
.fork_from_identifier(initial.config.clone(), name, auth_manager)
|
||||
.await?;
|
||||
|
||||
assert_ne!(
|
||||
forked.session_configured.session_id,
|
||||
response.conversation_id
|
||||
);
|
||||
|
||||
// Record the baseline rollout for the saved session.
|
||||
let base_items = rollout_items_without_meta(&response.rollout_path);
|
||||
|
||||
// Send additional turns to the forked conversation and flush.
|
||||
submit_text(&forked.conversation, "fork one").await?;
|
||||
submit_text(&forked.conversation, "fork two").await?;
|
||||
forked.conversation.flush_rollout().await?;
|
||||
|
||||
// Re-read both rollouts: source should remain unchanged.
|
||||
let base_after = rollout_items_without_meta(&response.rollout_path);
|
||||
assert_eq!(
|
||||
serde_json::to_value(&base_items)?,
|
||||
serde_json::to_value(&base_after)?
|
||||
);
|
||||
|
||||
// Forked rollout should extend the baseline.
|
||||
let fork_items = rollout_items_without_meta(&forked.conversation.rollout_path());
|
||||
assert!(
|
||||
fork_items.len() > base_items.len(),
|
||||
"expected forked rollout to contain additional items"
|
||||
);
|
||||
let fork_prefix: Vec<_> = fork_items.iter().take(base_items.len()).cloned().collect();
|
||||
assert_eq!(
|
||||
serde_json::to_value(&base_items)?,
|
||||
serde_json::to_value(&fork_prefix)?,
|
||||
"forked rollout should extend the baseline history"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn save_and_fork_by_name() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(&server, vec![completion_body(1, "base")]).await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let initial = builder.build(&server).await?;
|
||||
submit_text(&initial.codex, "original").await?;
|
||||
|
||||
let entry = save_session("forkable", &initial.codex, &initial.config).await?;
|
||||
|
||||
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
|
||||
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
|
||||
let forked = conversation_manager
|
||||
.fork_from_rollout(
|
||||
initial.config.clone(),
|
||||
entry.rollout_path.clone(),
|
||||
auth_manager,
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_ne!(forked.session_configured.session_id, entry.conversation_id);
|
||||
assert_ne!(forked.conversation.rollout_path(), entry.rollout_path);
|
||||
assert_eq!(session_meta_count(&forked.conversation.rollout_path()), 1);
|
||||
|
||||
let base_items = rollout_items_without_meta(&entry.rollout_path);
|
||||
let fork_items = rollout_items_without_meta(&forked.conversation.rollout_path());
|
||||
assert_eq!(
|
||||
serde_json::to_value(base_items)?,
|
||||
serde_json::to_value(fork_items)?
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn forked_messages_do_not_touch_original() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
completion_body(1, "base"),
|
||||
completion_body(2, "fork-1"),
|
||||
completion_body(3, "fork-2"),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let initial = builder.build(&server).await?;
|
||||
submit_text(&initial.codex, "first").await?;
|
||||
|
||||
let entry = save_session("branch", &initial.codex, &initial.config).await?;
|
||||
let baseline_items = rollout_items_without_meta(&entry.rollout_path);
|
||||
|
||||
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
|
||||
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
|
||||
let forked = conversation_manager
|
||||
.fork_from_rollout(
|
||||
initial.config.clone(),
|
||||
entry.rollout_path.clone(),
|
||||
auth_manager.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
submit_text(&forked.conversation, "fork message one").await?;
|
||||
submit_text(&forked.conversation, "fork message two").await?;
|
||||
|
||||
let resumed = builder
|
||||
.resume(&server, initial.home.clone(), entry.rollout_path.clone())
|
||||
.await?;
|
||||
let resumed_items = rollout_items_without_meta(&resumed.session_configured.rollout_path);
|
||||
|
||||
assert_eq!(
|
||||
serde_json::to_value(baseline_items.clone())?,
|
||||
serde_json::to_value(resumed_items)?
|
||||
);
|
||||
assert_eq!(
|
||||
serde_json::to_value(baseline_items)?,
|
||||
serde_json::to_value(rollout_items_without_meta(&entry.rollout_path))?
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn resumed_messages_are_present_in_new_fork() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
completion_body(1, "original"),
|
||||
completion_body(2, "fork-extra"),
|
||||
completion_body(3, "resumed-extra"),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let initial = builder.build(&server).await?;
|
||||
submit_text(&initial.codex, "start").await?;
|
||||
|
||||
let entry = save_session("seed", &initial.codex, &initial.config).await?;
|
||||
|
||||
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
|
||||
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
|
||||
let forked = conversation_manager
|
||||
.fork_from_rollout(
|
||||
initial.config.clone(),
|
||||
entry.rollout_path.clone(),
|
||||
auth_manager.clone(),
|
||||
)
|
||||
.await?;
|
||||
submit_text(&forked.conversation, "fork only").await?;
|
||||
|
||||
let resumed = builder
|
||||
.resume(&server, initial.home.clone(), entry.rollout_path.clone())
|
||||
.await?;
|
||||
submit_text(&resumed.codex, "resumed addition").await?;
|
||||
resumed.codex.flush_rollout().await?;
|
||||
let updated_base_items = rollout_items_without_meta(&entry.rollout_path);
|
||||
|
||||
let fork_again = conversation_manager
|
||||
.fork_from_rollout(
|
||||
initial.config.clone(),
|
||||
entry.rollout_path.clone(),
|
||||
auth_manager,
|
||||
)
|
||||
.await?;
|
||||
let fork_again_items = rollout_items_without_meta(&fork_again.conversation.rollout_path());
|
||||
assert_eq!(
|
||||
serde_json::to_value(updated_base_items)?,
|
||||
serde_json::to_value(fork_again_items)?
|
||||
);
|
||||
assert_eq!(
|
||||
session_meta_count(&fork_again.conversation.rollout_path()),
|
||||
1
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn duplicate_name_overwrites_entry() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(
|
||||
&server,
|
||||
vec![completion_body(1, "one"), completion_body(2, "two")],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let first = builder.build(&server).await?;
|
||||
submit_text(&first.codex, "first session").await?;
|
||||
let name = "shared";
|
||||
let entry_one = save_session(name, &first.codex, &first.config).await?;
|
||||
|
||||
let second = builder.build(&server).await?;
|
||||
submit_text(&second.codex, "second session").await?;
|
||||
let entry_two = save_session(name, &second.codex, &second.config).await?;
|
||||
|
||||
let resolved = resolve_saved_session(&second.config.codex_home, name)
|
||||
.await?
|
||||
.expect("latest entry present");
|
||||
assert_eq!(resolved, entry_two);
|
||||
assert_ne!(resolved.conversation_id, entry_one.conversation_id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn same_session_multiple_names() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_sequence(&server, vec![completion_body(1, "hello")]).await;
|
||||
|
||||
let mut builder = test_codex();
|
||||
let session = builder.build(&server).await?;
|
||||
submit_text(&session.codex, "save twice").await?;
|
||||
|
||||
let entry_first = save_session("first", &session.codex, &session.config).await?;
|
||||
let entry_second = save_session("second", &session.codex, &session.config).await?;
|
||||
|
||||
let resolved_first = resolve_saved_session(&session.config.codex_home, "first")
|
||||
.await?
|
||||
.expect("first entry");
|
||||
let resolved_second = resolve_saved_session(&session.config.codex_home, "second")
|
||||
.await?
|
||||
.expect("second entry");
|
||||
|
||||
assert_eq!(entry_first.conversation_id, entry_second.conversation_id);
|
||||
assert_eq!(
|
||||
resolved_first.conversation_id,
|
||||
resolved_second.conversation_id
|
||||
);
|
||||
assert_eq!(resolved_first.rollout_path, resolved_second.rollout_path);
|
||||
|
||||
let names: serde_json::Value = json!([entry_first.name, entry_second.name]);
|
||||
assert_eq!(names, json!(["first", "second"]));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -567,6 +567,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
| EventMsg::ReasoningContentDelta(_)
|
||||
| EventMsg::ReasoningRawContentDelta(_)
|
||||
| EventMsg::UndoCompleted(_)
|
||||
| EventMsg::SaveSessionResponse(_)
|
||||
| EventMsg::UndoStarted(_) => {}
|
||||
}
|
||||
CodexStatus::Running
|
||||
|
||||
@@ -302,6 +302,7 @@ async fn run_codex_tool_session_inner(
|
||||
| EventMsg::UndoStarted(_)
|
||||
| EventMsg::UndoCompleted(_)
|
||||
| EventMsg::ExitedReviewMode(_)
|
||||
| EventMsg::SaveSessionResponse(_)
|
||||
| EventMsg::DeprecationNotice(_) => {
|
||||
// For now, we do not do anything extra for these
|
||||
// events. Note that
|
||||
|
||||
@@ -102,6 +102,9 @@ pub enum Op {
|
||||
final_output_json_schema: Option<Value>,
|
||||
},
|
||||
|
||||
/// Persist the current session under a user-provided name.
|
||||
SaveSession { name: String },
|
||||
|
||||
/// Override parts of the persistent turn context for subsequent turns.
|
||||
///
|
||||
/// All fields are optional; when omitted, the existing value is preserved.
|
||||
@@ -513,6 +516,9 @@ pub enum EventMsg {
|
||||
|
||||
BackgroundEvent(BackgroundEventEvent),
|
||||
|
||||
/// Result of a save-session request.
|
||||
SaveSessionResponse(SaveSessionResponseEvent),
|
||||
|
||||
UndoStarted(UndoStartedEvent),
|
||||
|
||||
UndoCompleted(UndoCompletedEvent),
|
||||
@@ -1036,6 +1042,13 @@ impl InitialHistory {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn without_session_meta(&self) -> Vec<RolloutItem> {
|
||||
self.get_rollout_items()
|
||||
.into_iter()
|
||||
.filter(|item| !matches!(item, RolloutItem::SessionMeta(_)))
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub fn get_event_msgs(&self) -> Option<Vec<EventMsg>> {
|
||||
match self {
|
||||
InitialHistory::New => None,
|
||||
@@ -1096,6 +1109,8 @@ pub struct SessionMeta {
|
||||
#[serde(default)]
|
||||
pub source: SessionSource,
|
||||
pub model_provider: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub name: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for SessionMeta {
|
||||
@@ -1109,6 +1124,7 @@ impl Default for SessionMeta {
|
||||
instructions: None,
|
||||
source: SessionSource::default(),
|
||||
model_provider: None,
|
||||
name: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1370,6 +1386,13 @@ pub struct StreamInfoEvent {
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct SaveSessionResponseEvent {
|
||||
pub name: String,
|
||||
pub rollout_path: PathBuf,
|
||||
pub conversation_id: ConversationId,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
|
||||
pub struct PatchApplyBeginEvent {
|
||||
/// Identifier so this can be paired with the PatchApplyEnd event.
|
||||
|
||||
@@ -7,6 +7,8 @@ use crate::diff_render::DiffSummary;
|
||||
use crate::exec_command::strip_bash_lc_and_escape;
|
||||
use crate::file_search::FileSearchManager;
|
||||
use crate::history_cell::HistoryCell;
|
||||
#[cfg(not(debug_assertions))]
|
||||
use crate::history_cell::UpdateAvailableHistoryCell;
|
||||
use crate::model_migration::ModelMigrationOutcome;
|
||||
use crate::model_migration::migration_copy_for_config;
|
||||
use crate::model_migration::run_model_migration_prompt;
|
||||
@@ -53,9 +55,6 @@ use std::time::Duration;
|
||||
use tokio::select;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
use crate::history_cell::UpdateAvailableHistoryCell;
|
||||
|
||||
const GPT_5_1_MIGRATION_AUTH_MODES: [AuthMode; 2] = [AuthMode::ChatGPT, AuthMode::ApiKey];
|
||||
const GPT_5_1_CODEX_MIGRATION_AUTH_MODES: [AuthMode; 1] = [AuthMode::ChatGPT];
|
||||
|
||||
@@ -296,6 +295,27 @@ impl App {
|
||||
resumed.session_configured,
|
||||
)
|
||||
}
|
||||
ResumeSelection::Fork(path) => {
|
||||
let resumed = conversation_manager
|
||||
.fork_from_rollout(config.clone(), path.clone(), auth_manager.clone())
|
||||
.await
|
||||
.wrap_err_with(|| format!("Failed to fork session from {}", path.display()))?;
|
||||
let init = crate::chatwidget::ChatWidgetInit {
|
||||
config: config.clone(),
|
||||
frame_requester: tui.frame_requester(),
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
initial_prompt: initial_prompt.clone(),
|
||||
initial_images: initial_images.clone(),
|
||||
enhanced_keys_supported,
|
||||
auth_manager: auth_manager.clone(),
|
||||
feedback: feedback.clone(),
|
||||
};
|
||||
ChatWidget::new_from_existing(
|
||||
init,
|
||||
resumed.conversation,
|
||||
resumed.session_configured,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
chat_widget.maybe_prompt_windows_sandbox_enable();
|
||||
|
||||
@@ -69,7 +69,10 @@ const LARGE_PASTE_CHAR_THRESHOLD: usize = 1000;
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum InputResult {
|
||||
Submitted(String),
|
||||
Command(SlashCommand),
|
||||
Command {
|
||||
command: SlashCommand,
|
||||
args: Option<String>,
|
||||
},
|
||||
None,
|
||||
}
|
||||
|
||||
@@ -333,6 +336,19 @@ impl ChatComposer {
|
||||
PasteBurst::recommended_flush_delay()
|
||||
}
|
||||
|
||||
fn command_args_from_line(line: &str, command: SlashCommand) -> Option<String> {
|
||||
if let Some((name, rest)) = parse_slash_name(line)
|
||||
&& name == command.command()
|
||||
{
|
||||
let trimmed = rest.trim();
|
||||
if trimmed.is_empty() {
|
||||
return None;
|
||||
}
|
||||
return Some(trimmed.to_string());
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
/// Integrate results from an asynchronous file search.
|
||||
pub(crate) fn on_file_search_result(&mut self, query: String, matches: Vec<FileMatch>) {
|
||||
// Only apply if user is still editing a token starting with `query`.
|
||||
@@ -505,8 +521,9 @@ impl ChatComposer {
|
||||
if let Some(sel) = popup.selected_item() {
|
||||
match sel {
|
||||
CommandItem::Builtin(cmd) => {
|
||||
let args = Self::command_args_from_line(first_line, cmd);
|
||||
self.textarea.set_text("");
|
||||
return (InputResult::Command(cmd), true);
|
||||
return (InputResult::Command { command: cmd, args }, true);
|
||||
}
|
||||
CommandItem::UserPrompt(idx) => {
|
||||
if let Some(prompt) = popup.prompt(idx) {
|
||||
@@ -920,22 +937,21 @@ impl ChatComposer {
|
||||
modifiers: KeyModifiers::NONE,
|
||||
..
|
||||
} => {
|
||||
// If the first line is a bare built-in slash command (no args),
|
||||
// dispatch it even when the slash popup isn't visible. This preserves
|
||||
// the workflow: type a prefix ("/di"), press Tab to complete to
|
||||
// "/diff ", then press Enter to run it. Tab moves the cursor beyond
|
||||
// the '/name' token and our caret-based heuristic hides the popup,
|
||||
// but Enter should still dispatch the command rather than submit
|
||||
// literal text.
|
||||
// If the first line is a built-in slash command, dispatch it even when
|
||||
// the slash popup isn't visible. This preserves the workflow:
|
||||
// type a prefix ("/di"), press Tab to complete to "/diff ", then press
|
||||
// Enter to run it. Tab moves the cursor beyond the '/name' token and
|
||||
// our caret-based heuristic hides the popup, but Enter should still
|
||||
// dispatch the command rather than submit literal text.
|
||||
let first_line = self.textarea.text().lines().next().unwrap_or("");
|
||||
if let Some((name, rest)) = parse_slash_name(first_line)
|
||||
&& rest.is_empty()
|
||||
if let Some((name, _rest)) = parse_slash_name(first_line)
|
||||
&& let Some((_n, cmd)) = built_in_slash_commands()
|
||||
.into_iter()
|
||||
.find(|(n, _)| *n == name)
|
||||
{
|
||||
let args = Self::command_args_from_line(first_line, cmd);
|
||||
self.textarea.set_text("");
|
||||
return (InputResult::Command(cmd), true);
|
||||
return (InputResult::Command { command: cmd, args }, true);
|
||||
}
|
||||
// If we're in a paste-like burst capture, treat Enter as part of the burst
|
||||
// and accumulate it rather than submitting or inserting immediately.
|
||||
@@ -2399,8 +2415,9 @@ mod tests {
|
||||
// When a slash command is dispatched, the composer should return a
|
||||
// Command result (not submit literal text) and clear its textarea.
|
||||
match result {
|
||||
InputResult::Command(cmd) => {
|
||||
InputResult::Command { command: cmd, args } => {
|
||||
assert_eq!(cmd.command(), "init");
|
||||
assert!(args.is_none());
|
||||
}
|
||||
InputResult::Submitted(text) => {
|
||||
panic!("expected command dispatch, but composer submitted literal text: {text}")
|
||||
@@ -2474,7 +2491,10 @@ mod tests {
|
||||
let (result, _needs_redraw) =
|
||||
composer.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
|
||||
match result {
|
||||
InputResult::Command(cmd) => assert_eq!(cmd.command(), "diff"),
|
||||
InputResult::Command { command: cmd, args } => {
|
||||
assert_eq!(cmd.command(), "diff");
|
||||
assert!(args.is_none());
|
||||
}
|
||||
InputResult::Submitted(text) => {
|
||||
panic!("expected command dispatch after Tab completion, got literal submit: {text}")
|
||||
}
|
||||
@@ -2483,6 +2503,42 @@ mod tests {
|
||||
assert!(composer.textarea.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn slash_command_with_args_dispatches_and_preserves_args() {
|
||||
use crossterm::event::KeyCode;
|
||||
use crossterm::event::KeyEvent;
|
||||
use crossterm::event::KeyModifiers;
|
||||
|
||||
let (tx, _rx) = unbounded_channel::<AppEvent>();
|
||||
let sender = AppEventSender::new(tx);
|
||||
let mut composer = ChatComposer::new(
|
||||
true,
|
||||
sender,
|
||||
false,
|
||||
"Ask Codex to do anything".to_string(),
|
||||
false,
|
||||
);
|
||||
|
||||
composer.textarea.set_text("/save feature-one");
|
||||
|
||||
let (result, _needs_redraw) =
|
||||
composer.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
|
||||
|
||||
match result {
|
||||
InputResult::Command { command: cmd, args } => {
|
||||
assert_eq!(cmd, SlashCommand::Save);
|
||||
assert_eq!(args.as_deref(), Some("feature-one"));
|
||||
}
|
||||
InputResult::Submitted(text) => {
|
||||
panic!(
|
||||
"expected slash command dispatch, but composer submitted literal text: {text}"
|
||||
)
|
||||
}
|
||||
InputResult::None => panic!("expected Command result for '/save feature-one'"),
|
||||
}
|
||||
assert!(composer.textarea.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn slash_mention_dispatches_command_and_inserts_at() {
|
||||
use crossterm::event::KeyCode;
|
||||
@@ -2505,8 +2561,9 @@ mod tests {
|
||||
composer.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
|
||||
|
||||
match result {
|
||||
InputResult::Command(cmd) => {
|
||||
InputResult::Command { command: cmd, args } => {
|
||||
assert_eq!(cmd.command(), "mention");
|
||||
assert!(args.is_none());
|
||||
}
|
||||
InputResult::Submitted(text) => {
|
||||
panic!("expected command dispatch, but composer submitted literal text: {text}")
|
||||
|
||||
@@ -1326,8 +1326,8 @@ impl ChatWidget {
|
||||
};
|
||||
self.queue_user_message(user_message);
|
||||
}
|
||||
InputResult::Command(cmd) => {
|
||||
self.dispatch_command(cmd);
|
||||
InputResult::Command { command: cmd, args } => {
|
||||
self.dispatch_command(cmd, args);
|
||||
}
|
||||
InputResult::None => {}
|
||||
}
|
||||
@@ -1350,7 +1350,7 @@ impl ChatWidget {
|
||||
self.request_redraw();
|
||||
}
|
||||
|
||||
fn dispatch_command(&mut self, cmd: SlashCommand) {
|
||||
fn dispatch_command(&mut self, cmd: SlashCommand, args: Option<String>) {
|
||||
if !cmd.available_during_task() && self.bottom_pane.is_task_running() {
|
||||
let message = format!(
|
||||
"'/{}' is disabled while a task is in progress.",
|
||||
@@ -1371,6 +1371,9 @@ impl ChatWidget {
|
||||
SlashCommand::New => {
|
||||
self.app_event_tx.send(AppEvent::NewSession);
|
||||
}
|
||||
SlashCommand::Save => {
|
||||
self.handle_save_command(args);
|
||||
}
|
||||
SlashCommand::Init => {
|
||||
let init_target = self.config.cwd.join(DEFAULT_PROJECT_DOC_FILENAME);
|
||||
if init_target.exists() {
|
||||
@@ -1488,6 +1491,31 @@ impl ChatWidget {
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_save_command(&mut self, args: Option<String>) {
|
||||
let Some(name_raw) = args else {
|
||||
self.add_to_history(history_cell::new_error_event(
|
||||
"Usage: /save <name>".to_string(),
|
||||
));
|
||||
return;
|
||||
};
|
||||
let name = name_raw.trim();
|
||||
if name.is_empty() {
|
||||
self.add_to_history(history_cell::new_error_event(
|
||||
"Usage: /save <name>".to_string(),
|
||||
));
|
||||
return;
|
||||
}
|
||||
if self.conversation_id.is_none() {
|
||||
self.add_to_history(history_cell::new_error_event(
|
||||
"Session is not ready yet; try /save again in a moment.".to_string(),
|
||||
));
|
||||
return;
|
||||
}
|
||||
self.app_event_tx
|
||||
.send(AppEvent::CodexOp(Op::SaveSession { name: name.into() }));
|
||||
self.add_info_message(format!("Saving session '{name}'..."), None);
|
||||
}
|
||||
|
||||
pub(crate) fn handle_paste(&mut self, text: String) {
|
||||
self.bottom_pane.handle_paste(text);
|
||||
}
|
||||
@@ -1658,6 +1686,12 @@ impl ChatWidget {
|
||||
EventMsg::Error(ErrorEvent { message }) => self.on_error(message),
|
||||
EventMsg::McpStartupUpdate(ev) => self.on_mcp_startup_update(ev),
|
||||
EventMsg::McpStartupComplete(ev) => self.on_mcp_startup_complete(ev),
|
||||
EventMsg::SaveSessionResponse(ev) => {
|
||||
self.add_info_message(
|
||||
format!("Saved session '{}' ({}).", ev.name, ev.conversation_id),
|
||||
Some(format!("Rollout: {}", ev.rollout_path.display())),
|
||||
);
|
||||
}
|
||||
EventMsg::TurnAborted(ev) => match ev.reason {
|
||||
TurnAbortReason::Interrupted => {
|
||||
self.on_interrupted_turn(ev.reason);
|
||||
|
||||
@@ -1049,7 +1049,7 @@ fn slash_init_skips_when_project_doc_exists() {
|
||||
std::fs::write(&existing_path, "existing instructions").unwrap();
|
||||
chat.config.cwd = tempdir.path().to_path_buf();
|
||||
|
||||
chat.dispatch_command(SlashCommand::Init);
|
||||
chat.dispatch_command(SlashCommand::Init, None);
|
||||
|
||||
match op_rx.try_recv() {
|
||||
Err(TryRecvError::Empty) => {}
|
||||
@@ -1077,7 +1077,7 @@ fn slash_init_skips_when_project_doc_exists() {
|
||||
fn slash_quit_requests_exit() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
|
||||
|
||||
chat.dispatch_command(SlashCommand::Quit);
|
||||
chat.dispatch_command(SlashCommand::Quit, None);
|
||||
|
||||
assert_matches!(rx.try_recv(), Ok(AppEvent::ExitRequest));
|
||||
}
|
||||
@@ -1086,7 +1086,7 @@ fn slash_quit_requests_exit() {
|
||||
fn slash_exit_requests_exit() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
|
||||
|
||||
chat.dispatch_command(SlashCommand::Exit);
|
||||
chat.dispatch_command(SlashCommand::Exit, None);
|
||||
|
||||
assert_matches!(rx.try_recv(), Ok(AppEvent::ExitRequest));
|
||||
}
|
||||
@@ -1095,7 +1095,7 @@ fn slash_exit_requests_exit() {
|
||||
fn slash_undo_sends_op() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
|
||||
|
||||
chat.dispatch_command(SlashCommand::Undo);
|
||||
chat.dispatch_command(SlashCommand::Undo, None);
|
||||
|
||||
match rx.try_recv() {
|
||||
Ok(AppEvent::CodexOp(Op::Undo)) => {}
|
||||
@@ -1109,7 +1109,7 @@ fn slash_rollout_displays_current_path() {
|
||||
let rollout_path = PathBuf::from("/tmp/codex-test-rollout.jsonl");
|
||||
chat.current_rollout_path = Some(rollout_path.clone());
|
||||
|
||||
chat.dispatch_command(SlashCommand::Rollout);
|
||||
chat.dispatch_command(SlashCommand::Rollout, None);
|
||||
|
||||
let cells = drain_insert_history(&mut rx);
|
||||
assert_eq!(cells.len(), 1, "expected info message for rollout path");
|
||||
@@ -1124,7 +1124,7 @@ fn slash_rollout_displays_current_path() {
|
||||
fn slash_rollout_handles_missing_path() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
|
||||
|
||||
chat.dispatch_command(SlashCommand::Rollout);
|
||||
chat.dispatch_command(SlashCommand::Rollout, None);
|
||||
|
||||
let cells = drain_insert_history(&mut rx);
|
||||
assert_eq!(
|
||||
@@ -1719,7 +1719,7 @@ fn feedback_selection_popup_snapshot() {
|
||||
let (mut chat, _rx, _op_rx) = make_chatwidget_manual();
|
||||
|
||||
// Open the feedback category selection popup via slash command.
|
||||
chat.dispatch_command(SlashCommand::Feedback);
|
||||
chat.dispatch_command(SlashCommand::Feedback, None);
|
||||
|
||||
let popup = render_bottom_popup(&chat, 80);
|
||||
assert_snapshot!("feedback_selection_popup", popup);
|
||||
@@ -1797,7 +1797,7 @@ fn disabled_slash_command_while_task_running_snapshot() {
|
||||
chat.bottom_pane.set_task_running(true);
|
||||
|
||||
// Dispatch a command that is unavailable while a task runs (e.g., /model)
|
||||
chat.dispatch_command(SlashCommand::Model);
|
||||
chat.dispatch_command(SlashCommand::Model, None);
|
||||
|
||||
// Drain history and snapshot the rendered error line(s)
|
||||
let cells = drain_insert_history(&mut rx);
|
||||
|
||||
@@ -32,6 +32,9 @@ pub struct Cli {
|
||||
#[clap(skip)]
|
||||
pub resume_show_all: bool,
|
||||
|
||||
#[clap(skip)]
|
||||
pub fork_source: Option<String>,
|
||||
|
||||
/// Model the agent should use.
|
||||
#[arg(long, short = 'm')]
|
||||
pub model: Option<String>,
|
||||
|
||||
@@ -19,9 +19,9 @@ use codex_core::config::ConfigOverrides;
|
||||
use codex_core::config::find_codex_home;
|
||||
use codex_core::config::load_config_as_toml_with_cli_overrides;
|
||||
use codex_core::config::resolve_oss_provider;
|
||||
use codex_core::find_conversation_path_by_id_str;
|
||||
use codex_core::get_platform_sandbox;
|
||||
use codex_core::protocol::AskForApproval;
|
||||
use codex_core::resolve_rollout_path;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge;
|
||||
use std::fs::OpenOptions;
|
||||
@@ -429,8 +429,29 @@ async fn run_ratatui_app(
|
||||
};
|
||||
|
||||
// Determine resume behavior: explicit id, then resume last, then picker.
|
||||
let resume_selection = if let Some(id_str) = cli.resume_session_id.as_deref() {
|
||||
match find_conversation_path_by_id_str(&config.codex_home, id_str).await? {
|
||||
let resume_selection = if let Some(target) = cli.fork_source.as_deref() {
|
||||
match resolve_rollout_path(&config.codex_home, target).await? {
|
||||
Some(path) => resume_picker::ResumeSelection::Fork(path),
|
||||
None => {
|
||||
error!("Error finding conversation path: {target}");
|
||||
restore();
|
||||
session_log::log_session_end();
|
||||
let _ = tui.terminal.clear();
|
||||
if let Err(err) = writeln!(
|
||||
std::io::stdout(),
|
||||
"No saved session or rollout found for '{target}'. Run `codex resume` without a name to choose from existing sessions."
|
||||
) {
|
||||
error!("Failed to write resume error message: {err}");
|
||||
}
|
||||
return Ok(AppExitInfo {
|
||||
token_usage: codex_core::protocol::TokenUsage::default(),
|
||||
conversation_id: None,
|
||||
update_action: None,
|
||||
});
|
||||
}
|
||||
}
|
||||
} else if let Some(id_str) = cli.resume_session_id.as_deref() {
|
||||
match resolve_rollout_path(&config.codex_home, id_str).await? {
|
||||
Some(path) => resume_picker::ResumeSelection::Resume(path),
|
||||
None => {
|
||||
error!("Error finding conversation path: {id_str}");
|
||||
@@ -439,7 +460,7 @@ async fn run_ratatui_app(
|
||||
let _ = tui.terminal.clear();
|
||||
if let Err(err) = writeln!(
|
||||
std::io::stdout(),
|
||||
"No saved session found with ID {id_str}. Run `codex resume` without an ID to choose from existing sessions."
|
||||
"No saved session or rollout found for '{id_str}'. Run `codex resume` without a name to choose from existing sessions."
|
||||
) {
|
||||
error!("Failed to write resume error message: {err}");
|
||||
}
|
||||
|
||||
@@ -42,6 +42,7 @@ const LOAD_NEAR_THRESHOLD: usize = 5;
|
||||
pub enum ResumeSelection {
|
||||
StartFresh,
|
||||
Resume(PathBuf),
|
||||
Fork(PathBuf),
|
||||
Exit,
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ pub enum SlashCommand {
|
||||
Approvals,
|
||||
Review,
|
||||
New,
|
||||
Save,
|
||||
Init,
|
||||
Compact,
|
||||
Undo,
|
||||
@@ -37,6 +38,7 @@ impl SlashCommand {
|
||||
match self {
|
||||
SlashCommand::Feedback => "send logs to maintainers",
|
||||
SlashCommand::New => "start a new chat during a conversation",
|
||||
SlashCommand::Save => "save this session so it can be resumed later",
|
||||
SlashCommand::Init => "create an AGENTS.md file with instructions for Codex",
|
||||
SlashCommand::Compact => "summarize conversation to prevent hitting the context limit",
|
||||
SlashCommand::Review => "review my current changes and find issues",
|
||||
@@ -64,6 +66,7 @@ impl SlashCommand {
|
||||
pub fn available_during_task(self) -> bool {
|
||||
match self {
|
||||
SlashCommand::New
|
||||
| SlashCommand::Save
|
||||
| SlashCommand::Init
|
||||
| SlashCommand::Compact
|
||||
| SlashCommand::Undo
|
||||
|
||||
Reference in New Issue
Block a user