core: revert fork_conversation to (path, conversation_id); protocol: ConversationPathResponseEvent carries path again; core: emit path in ConversationHistory; recorder: expose path(); tui: backtrack uses path+id; fmt

This commit is contained in:
Ahmed Ibrahim
2025-09-08 11:16:32 -07:00
parent aa94d9c4b3
commit 52a5ea5f4e
8 changed files with 78 additions and 31 deletions

Submodule codex-rs/.codex/pro/1757202005/agent-a added at 6e743e8496

Submodule codex-rs/.codex/pro/1757202005/agent-b added at 6e743e8496

View File

@@ -1409,7 +1409,12 @@ async fn submission_loop(
id: sub_id.clone(),
msg: EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id: sess.conversation_id,
entries: sess.state.lock_unchecked().history.contents(),
path: sess
.rollout
.lock_unchecked()
.as_ref()
.map(|r| r.path().to_path_buf())
.unwrap_or_default(),
}),
};
sess.send_event(event).await;

View File

@@ -156,16 +156,19 @@ impl ConversationManager {
/// caller's `config`). The new conversation will have a fresh id.
pub async fn fork_conversation(
&self,
conversation_history: Vec<ResponseItem>,
conversation_path: PathBuf,
conversation_id: ConversationId,
num_messages_to_drop: usize,
config: Config,
) -> CodexResult<NewConversation> {
// Compute the prefix up to the cut point.
let items: Vec<RolloutItem> = conversation_history
.into_iter()
.map(RolloutItem::from)
.collect();
let history = truncate_after_dropping_last_messages(items, num_messages_to_drop);
let initial_history = RolloutRecorder::get_rollout_history(&conversation_path).await?;
let conversation_history = match initial_history {
InitialHistory::Resumed(items) => items,
InitialHistory::New => return Err(CodexErr::ConversationNotFound(conversation_id)),
};
let history =
truncate_after_dropping_last_messages(conversation_history, num_messages_to_drop);
// Spawn a new conversation with the computed initial history.
let auth_manager = self.auth_manager.clone();

View File

@@ -138,6 +138,9 @@ impl RolloutRecorderParams {
}
impl RolloutRecorder {
/// Filesystem path of the rollout (JSONL) file this recorder writes to.
pub fn path(&self) -> &Path {
    self.path.as_path()
}
#[allow(dead_code)]
/// List conversations (rollout files) under the provided Codex home directory.
pub async fn list_conversations(
@@ -314,10 +317,6 @@ impl RolloutRecorder {
}))
}
// NOTE(review): this is the pre-move copy of `path()` shown in the diff; the
// accessor now lives near the top of the impl block — confirm only one remains.
/// Filesystem path of the rollout file this recorder writes to.
pub fn path(&self) -> &Path {
&self.path
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {

View File

@@ -77,13 +77,43 @@ async fn fork_conversation_twice_drops_to_first_message() {
let base_history =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await;
// Capture entries from the base history and compute expected prefixes after each fork.
let entries_after_three: Vec<ResponseItem> = match &base_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent { entries, .. }) => {
entries.clone()
}
// Capture path/id from the base history and compute expected prefixes after each fork.
let (base_conv_id, base_path) = match &base_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
_ => panic!("expected ConversationHistory event"),
};
// Read entries from rollout file.
/// Best-effort read of a rollout file: returns every line that parses as a
/// `ResponseItem`, silently skipping blank lines, unparsable lines, and a
/// missing/unreadable file (the file may still be mid-write during the test).
async fn read_response_entries(path: &std::path::Path) -> Vec<ResponseItem> {
    let contents = tokio::fs::read_to_string(path).await.unwrap_or_default();
    contents
        .lines()
        .filter(|line| !line.trim().is_empty())
        .filter_map(|line| serde_json::from_str::<ResponseItem>(line).ok())
        .collect()
}
/// Polls the rollout file until it contains at least `min_len` parsed entries,
/// retrying up to 50 times with a 10 ms pause between attempts. After the last
/// retry, returns whatever the final read yields (the caller asserts on it).
async fn read_response_entries_with_retry(
    path: &std::path::Path,
    min_len: usize,
) -> Vec<ResponseItem> {
    let mut attempts_left = 50u32;
    loop {
        let entries = read_response_entries(path).await;
        if entries.len() >= min_len || attempts_left == 0 {
            return entries;
        }
        attempts_left -= 1;
        tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
    }
}
let entries_after_three: Vec<ResponseItem> = read_response_entries(&base_path).await;
// History layout for this test:
// [0] user instructions,
// [1] environment context,
@@ -114,7 +144,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
conversation: codex_fork1,
..
} = conversation_manager
.fork_conversation(entries_after_three.clone(), 1, config_for_fork.clone())
.fork_conversation(base_path.clone(), base_conv_id, 1, config_for_fork.clone())
.await
.expect("fork 1");
@@ -123,20 +153,23 @@ async fn fork_conversation_twice_drops_to_first_message() {
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
let entries_after_first_fork: Vec<ResponseItem> = match &fork1_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent { entries, .. }) => {
assert_eq!(entries, &expected_after_first);
entries.clone()
}
let (fork1_id, fork1_path) = match &fork1_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
_ => panic!("expected ConversationHistory event after first fork"),
};
let entries_after_first_fork: Vec<ResponseItem> =
read_response_entries_with_retry(&fork1_path, expected_after_first.len()).await;
assert_eq!(entries_after_first_fork, expected_after_first);
// Fork again with n=1 → drops the (new) last user message, leaving only the first.
let NewConversation {
conversation: codex_fork2,
..
} = conversation_manager
.fork_conversation(entries_after_first_fork.clone(), 1, config_for_fork.clone())
.fork_conversation(fork1_path.clone(), fork1_id, 1, config_for_fork.clone())
.await
.expect("fork 2");
@@ -145,10 +178,14 @@ async fn fork_conversation_twice_drops_to_first_message() {
matches!(ev, EventMsg::ConversationHistory(_))
})
.await;
match &fork2_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent { entries, .. }) => {
assert_eq!(entries, &expected_after_second);
}
let (_fork2_id, fork2_path) = match &fork2_history {
EventMsg::ConversationHistory(ConversationPathResponseEvent {
conversation_id,
path,
}) => (*conversation_id, path.clone()),
_ => panic!("expected ConversationHistory event after second fork"),
};
let entries_after_second_fork: Vec<ResponseItem> =
read_response_entries_with_retry(&fork2_path, expected_after_second.len()).await;
assert_eq!(entries_after_second_fork, expected_after_second);
}

View File

@@ -801,7 +801,7 @@ pub struct WebSearchEndEvent {
/// Payload of the `ConversationHistory` event: identifies a conversation and
/// where its rollout file lives on disk.
// NOTE(review): the diff replaces `entries` with `path`; as rendered here both
// fields appear, but only one should survive in the final struct — confirm.
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
pub struct ConversationPathResponseEvent {
/// Unique id of the conversation this event describes.
pub conversation_id: ConversationId,
pub entries: Vec<ResponseItem>,
/// Filesystem path of the conversation's rollout (JSONL) file.
pub path: PathBuf,
}
#[derive(Debug, Clone, Deserialize, Serialize, TS)]

View File

@@ -288,7 +288,7 @@ impl App {
let cfg = self.chat_widget.config_ref().clone();
// Perform the fork via a thin wrapper for clarity/testability.
let result = self
.perform_fork(ev.entries.clone(), drop_count, cfg.clone())
.perform_fork(ev.path.clone(), ev.conversation_id, drop_count, cfg.clone())
.await;
// We skip the initial-history UI replay in SessionConfigured because we already have a more accurate version of the history.
match result {
@@ -302,12 +302,13 @@ impl App {
/// Thin wrapper around ConversationManager::fork_conversation.
async fn perform_fork(
    &self,
    conversation_path: std::path::PathBuf,
    conversation_id: codex_protocol::mcp_protocol::ConversationId,
    drop_count: usize,
    cfg: codex_core::config::Config,
) -> codex_core::error::Result<codex_core::NewConversation> {
    // Delegate straight to the conversation manager; no extra logic here so
    // the wrapper stays trivially testable.
    let manager = &self.server;
    manager
        .fork_conversation(conversation_path, conversation_id, drop_count, cfg)
        .await
}