mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Compare commits
12 Commits
system-dat
...
forking-2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5cccaa5a0e | ||
|
|
6564b498ad | ||
|
|
92d0bb41ed | ||
|
|
daf45dcf17 | ||
|
|
de4524e280 | ||
|
|
bc5499d3f5 | ||
|
|
f147df539e | ||
|
|
d2730f2b93 | ||
|
|
7b3d8c83c9 | ||
|
|
2f50987567 | ||
|
|
35107a121a | ||
|
|
70dc8db746 |
@@ -9,6 +9,7 @@ use std::sync::atomic::AtomicU64;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::AuthManager;
|
use crate::AuthManager;
|
||||||
|
use crate::conversation_history::EventMsgsHistory;
|
||||||
use crate::event_mapping::map_response_item_to_event_messages;
|
use crate::event_mapping::map_response_item_to_event_messages;
|
||||||
use async_channel::Receiver;
|
use async_channel::Receiver;
|
||||||
use async_channel::Sender;
|
use async_channel::Sender;
|
||||||
@@ -44,7 +45,7 @@ use crate::client_common::Prompt;
|
|||||||
use crate::client_common::ResponseEvent;
|
use crate::client_common::ResponseEvent;
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::config_types::ShellEnvironmentPolicy;
|
use crate::config_types::ShellEnvironmentPolicy;
|
||||||
use crate::conversation_history::ConversationHistory;
|
use crate::conversation_history::ResponseItemsHistory;
|
||||||
use crate::environment_context::EnvironmentContext;
|
use crate::environment_context::EnvironmentContext;
|
||||||
use crate::error::CodexErr;
|
use crate::error::CodexErr;
|
||||||
use crate::error::Result as CodexResult;
|
use crate::error::Result as CodexResult;
|
||||||
@@ -263,7 +264,8 @@ struct State {
|
|||||||
current_task: Option<AgentTask>,
|
current_task: Option<AgentTask>,
|
||||||
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
|
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
|
||||||
pending_input: Vec<ResponseInputItem>,
|
pending_input: Vec<ResponseInputItem>,
|
||||||
history: ConversationHistory,
|
response_items: ResponseItemsHistory,
|
||||||
|
event_msgs: EventMsgsHistory,
|
||||||
token_info: Option<TokenUsageInfo>,
|
token_info: Option<TokenUsageInfo>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -417,7 +419,7 @@ impl Session {
|
|||||||
let rollout_path = rollout_recorder.rollout_path.clone();
|
let rollout_path = rollout_recorder.rollout_path.clone();
|
||||||
// Create the mutable state for the Session.
|
// Create the mutable state for the Session.
|
||||||
let state = State {
|
let state = State {
|
||||||
history: ConversationHistory::new(),
|
response_items: ResponseItemsHistory::new(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -541,7 +543,7 @@ impl Session {
|
|||||||
InitialHistory::New => {
|
InitialHistory::New => {
|
||||||
// Build and record initial items (user instructions + environment context)
|
// Build and record initial items (user instructions + environment context)
|
||||||
let items = self.build_initial_context(turn_context);
|
let items = self.build_initial_context(turn_context);
|
||||||
self.record_conversation_items(&items).await;
|
self.record_response_items(&items).await;
|
||||||
}
|
}
|
||||||
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
|
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
|
||||||
let rollout_items = conversation_history.get_rollout_items();
|
let rollout_items = conversation_history.get_rollout_items();
|
||||||
@@ -550,7 +552,13 @@ impl Session {
|
|||||||
// Always add response items to conversation history
|
// Always add response items to conversation history
|
||||||
let response_items = conversation_history.get_response_items();
|
let response_items = conversation_history.get_response_items();
|
||||||
if !response_items.is_empty() {
|
if !response_items.is_empty() {
|
||||||
self.record_into_history(&response_items);
|
self.record_into_history_response_items(&response_items);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Always add event msgs to conversation history
|
||||||
|
let event_msgs = conversation_history.get_event_msgs();
|
||||||
|
if let Some(event_msgs) = event_msgs {
|
||||||
|
self.record_into_history_event_msgs(&event_msgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
// If persisting, persist all rollout items as-is (recorder filters)
|
// If persisting, persist all rollout items as-is (recorder filters)
|
||||||
@@ -563,9 +571,9 @@ impl Session {
|
|||||||
|
|
||||||
/// Persist the event to rollout and send it to clients.
|
/// Persist the event to rollout and send it to clients.
|
||||||
pub(crate) async fn send_event(&self, event: Event) {
|
pub(crate) async fn send_event(&self, event: Event) {
|
||||||
// Persist the event into rollout (recorder filters as needed)
|
// Persist the event into event_msgs in memory
|
||||||
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())];
|
self.record_conversation_event_msgs(std::slice::from_ref(&event.msg))
|
||||||
self.persist_rollout_items(&rollout_items).await;
|
.await;
|
||||||
if let Err(e) = self.tx_event.send(event).await {
|
if let Err(e) = self.tx_event.send(event).await {
|
||||||
error!("failed to send tool call event: {e}");
|
error!("failed to send tool call event: {e}");
|
||||||
}
|
}
|
||||||
@@ -655,18 +663,31 @@ impl Session {
|
|||||||
state.approved_commands.insert(cmd);
|
state.approved_commands.insert(cmd);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn record_conversation_event_msgs(&self, items: &[EventMsg]) {
|
||||||
|
self.record_into_history_event_msgs(items);
|
||||||
|
self.persist_rollout_event_msgs(items).await;
|
||||||
|
}
|
||||||
|
|
||||||
/// Records input items: always append to conversation history and
|
/// Records input items: always append to conversation history and
|
||||||
/// persist these response items to rollout.
|
/// persist these response items to rollout.
|
||||||
async fn record_conversation_items(&self, items: &[ResponseItem]) {
|
async fn record_response_items(&self, items: &[ResponseItem]) {
|
||||||
self.record_into_history(items);
|
self.record_into_history_response_items(items);
|
||||||
self.persist_rollout_response_items(items).await;
|
self.persist_rollout_response_items(items).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Append ResponseItems to the in-memory conversation history only.
|
/// Append ResponseItems to the in-memory conversation history only.
|
||||||
fn record_into_history(&self, items: &[ResponseItem]) {
|
fn record_into_history_response_items(&self, items: &[ResponseItem]) {
|
||||||
self.state
|
self.state
|
||||||
.lock_unchecked()
|
.lock_unchecked()
|
||||||
.history
|
.response_items
|
||||||
|
.record_items(items.iter());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Append EventMsgs to the in-memory conversation history only.
|
||||||
|
fn record_into_history_event_msgs(&self, items: &[EventMsg]) {
|
||||||
|
self.state
|
||||||
|
.lock_unchecked()
|
||||||
|
.event_msgs
|
||||||
.record_items(items.iter());
|
.record_items(items.iter());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -679,6 +700,12 @@ impl Session {
|
|||||||
self.persist_rollout_items(&rollout_items).await;
|
self.persist_rollout_items(&rollout_items).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn persist_rollout_event_msgs(&self, items: &[EventMsg]) {
|
||||||
|
let rollout_items: Vec<RolloutItem> =
|
||||||
|
items.iter().cloned().map(RolloutItem::EventMsg).collect();
|
||||||
|
self.persist_rollout_items(&rollout_items).await;
|
||||||
|
}
|
||||||
|
|
||||||
fn build_initial_context(&self, turn_context: &TurnContext) -> Vec<ResponseItem> {
|
fn build_initial_context(&self, turn_context: &TurnContext) -> Vec<ResponseItem> {
|
||||||
let mut items = Vec::<ResponseItem>::with_capacity(2);
|
let mut items = Vec::<ResponseItem>::with_capacity(2);
|
||||||
if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
|
if let Some(user_instructions) = turn_context.user_instructions.as_deref() {
|
||||||
@@ -710,13 +737,14 @@ impl Session {
|
|||||||
async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) {
|
async fn record_input_and_rollout_usermsg(&self, response_input: &ResponseInputItem) {
|
||||||
let response_item: ResponseItem = response_input.clone().into();
|
let response_item: ResponseItem = response_input.clone().into();
|
||||||
// Add to conversation history and persist response item to rollout
|
// Add to conversation history and persist response item to rollout
|
||||||
self.record_conversation_items(std::slice::from_ref(&response_item))
|
self.record_response_items(std::slice::from_ref(&response_item))
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// Derive user message events and persist only UserMessage to rollout
|
// Derive user message events and persist only UserMessage to rollout
|
||||||
let msgs =
|
let msgs =
|
||||||
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning);
|
map_response_item_to_event_messages(&response_item, self.show_raw_agent_reasoning);
|
||||||
let user_msgs: Vec<RolloutItem> = msgs
|
let user_msgs: Vec<RolloutItem> = msgs
|
||||||
|
.clone()
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter_map(|m| match m {
|
.filter_map(|m| match m {
|
||||||
EventMsg::UserMessage(ev) => Some(RolloutItem::EventMsg(EventMsg::UserMessage(ev))),
|
EventMsg::UserMessage(ev) => Some(RolloutItem::EventMsg(EventMsg::UserMessage(ev))),
|
||||||
@@ -726,6 +754,7 @@ impl Session {
|
|||||||
if !user_msgs.is_empty() {
|
if !user_msgs.is_empty() {
|
||||||
self.persist_rollout_items(&user_msgs).await;
|
self.persist_rollout_items(&user_msgs).await;
|
||||||
}
|
}
|
||||||
|
self.state.lock_unchecked().event_msgs.record_items(&msgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn on_exec_command_begin(
|
async fn on_exec_command_begin(
|
||||||
@@ -908,7 +937,7 @@ impl Session {
|
|||||||
/// Build the full turn input by concatenating the current conversation
|
/// Build the full turn input by concatenating the current conversation
|
||||||
/// history with additional items for this turn.
|
/// history with additional items for this turn.
|
||||||
pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
|
pub fn turn_input_with_history(&self, extra: Vec<ResponseItem>) -> Vec<ResponseItem> {
|
||||||
[self.state.lock_unchecked().history.contents(), extra].concat()
|
[self.state.lock_unchecked().response_items.contents(), extra].concat()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the input if there was no task running to inject into
|
/// Returns the input if there was no task running to inject into
|
||||||
@@ -1163,7 +1192,7 @@ async fn submission_loop(
|
|||||||
// Install the new persistent context for subsequent tasks/turns.
|
// Install the new persistent context for subsequent tasks/turns.
|
||||||
turn_context = Arc::new(new_turn_context);
|
turn_context = Arc::new(new_turn_context);
|
||||||
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
|
if cwd.is_some() || approval_policy.is_some() || sandbox_policy.is_some() {
|
||||||
sess.record_conversation_items(&[ResponseItem::from(EnvironmentContext::new(
|
sess.record_response_items(&[ResponseItem::from(EnvironmentContext::new(
|
||||||
cwd,
|
cwd,
|
||||||
approval_policy,
|
approval_policy,
|
||||||
sandbox_policy,
|
sandbox_policy,
|
||||||
@@ -1380,12 +1409,17 @@ async fn submission_loop(
|
|||||||
}
|
}
|
||||||
Op::GetHistory => {
|
Op::GetHistory => {
|
||||||
let sub_id = sub.id.clone();
|
let sub_id = sub.id.clone();
|
||||||
|
let entries = {
|
||||||
|
let state = sess.state.lock_unchecked();
|
||||||
|
let rolled_response_items: Vec<RolloutItem> = (&state.response_items).into();
|
||||||
|
let rolled_event_msgs: Vec<RolloutItem> = (&state.event_msgs).into();
|
||||||
|
[rolled_response_items, rolled_event_msgs].concat()
|
||||||
|
};
|
||||||
let event = Event {
|
let event = Event {
|
||||||
id: sub_id.clone(),
|
id: sub_id.clone(),
|
||||||
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
|
msg: EventMsg::ConversationHistory(ConversationHistoryResponseEvent {
|
||||||
conversation_id: sess.conversation_id,
|
conversation_id: sess.conversation_id,
|
||||||
entries: sess.state.lock_unchecked().history.contents(),
|
history: InitialHistory::Forked(entries),
|
||||||
}),
|
}),
|
||||||
};
|
};
|
||||||
sess.send_event(event).await;
|
sess.send_event(event).await;
|
||||||
@@ -1446,7 +1480,7 @@ async fn run_task(
|
|||||||
.into_iter()
|
.into_iter()
|
||||||
.map(ResponseItem::from)
|
.map(ResponseItem::from)
|
||||||
.collect::<Vec<ResponseItem>>();
|
.collect::<Vec<ResponseItem>>();
|
||||||
sess.record_conversation_items(&pending_input).await;
|
sess.record_response_items(&pending_input).await;
|
||||||
|
|
||||||
// Construct the input that we will send to the model. When using the
|
// Construct the input that we will send to the model. When using the
|
||||||
// Chat completions API (or ZDR clients), the model needs the full
|
// Chat completions API (or ZDR clients), the model needs the full
|
||||||
@@ -1573,7 +1607,7 @@ async fn run_task(
|
|||||||
|
|
||||||
// Only attempt to take the lock if there is something to record.
|
// Only attempt to take the lock if there is something to record.
|
||||||
if !items_to_record_in_conversation_history.is_empty() {
|
if !items_to_record_in_conversation_history.is_empty() {
|
||||||
sess.record_conversation_items(&items_to_record_in_conversation_history)
|
sess.record_response_items(&items_to_record_in_conversation_history)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1928,7 +1962,7 @@ async fn run_compact_task(
|
|||||||
|
|
||||||
{
|
{
|
||||||
let mut state = sess.state.lock_unchecked();
|
let mut state = sess.state.lock_unchecked();
|
||||||
state.history.keep_last_messages(1);
|
state.response_items.keep_last_messages(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
let event = Event {
|
let event = Event {
|
||||||
@@ -2882,7 +2916,9 @@ async fn drain_to_completed(
|
|||||||
Ok(ResponseEvent::OutputItemDone(item)) => {
|
Ok(ResponseEvent::OutputItemDone(item)) => {
|
||||||
// Record only to in-memory conversation history; avoid state snapshot.
|
// Record only to in-memory conversation history; avoid state snapshot.
|
||||||
let mut state = sess.state.lock_unchecked();
|
let mut state = sess.state.lock_unchecked();
|
||||||
state.history.record_items(std::slice::from_ref(&item));
|
state
|
||||||
|
.response_items
|
||||||
|
.record_items(std::slice::from_ref(&item));
|
||||||
}
|
}
|
||||||
Ok(ResponseEvent::Completed {
|
Ok(ResponseEvent::Completed {
|
||||||
response_id: _,
|
response_id: _,
|
||||||
|
|||||||
@@ -1,13 +1,16 @@
|
|||||||
|
use crate::rollout::policy::should_persist_event_msg;
|
||||||
use codex_protocol::models::ResponseItem;
|
use codex_protocol::models::ResponseItem;
|
||||||
|
use codex_protocol::protocol::EventMsg;
|
||||||
|
use codex_protocol::protocol::RolloutItem;
|
||||||
|
|
||||||
/// Transcript of conversation history
|
/// Transcript of conversation history
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub(crate) struct ConversationHistory {
|
pub(crate) struct ResponseItemsHistory {
|
||||||
/// The oldest items are at the beginning of the vector.
|
/// The oldest items are at the beginning of the vector.
|
||||||
items: Vec<ResponseItem>,
|
items: Vec<ResponseItem>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConversationHistory {
|
impl ResponseItemsHistory {
|
||||||
pub(crate) fn new() -> Self {
|
pub(crate) fn new() -> Self {
|
||||||
Self { items: Vec::new() }
|
Self { items: Vec::new() }
|
||||||
}
|
}
|
||||||
@@ -61,6 +64,51 @@ impl ConversationHistory {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
pub(crate) struct EventMsgsHistory {
|
||||||
|
items: Vec<EventMsg>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EventMsgsHistory {
|
||||||
|
pub(crate) fn record_items<I>(&mut self, items: I)
|
||||||
|
where
|
||||||
|
I: IntoIterator,
|
||||||
|
I::Item: std::ops::Deref<Target = EventMsg>,
|
||||||
|
{
|
||||||
|
for item in items {
|
||||||
|
if self.should_record_item(&item) {
|
||||||
|
self.items.push(item.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn should_record_item(&self, item: &EventMsg) -> bool {
|
||||||
|
should_persist_event_msg(item)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&ResponseItemsHistory> for Vec<RolloutItem> {
|
||||||
|
fn from(history: &ResponseItemsHistory) -> Self {
|
||||||
|
history
|
||||||
|
.items
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(RolloutItem::ResponseItem)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&EventMsgsHistory> for Vec<RolloutItem> {
|
||||||
|
fn from(history: &EventMsgsHistory) -> Self {
|
||||||
|
history
|
||||||
|
.items
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(RolloutItem::EventMsg)
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Anything that is not a system message or "reasoning" message is considered
|
/// Anything that is not a system message or "reasoning" message is considered
|
||||||
/// an API message.
|
/// an API message.
|
||||||
fn is_api_message(message: &ResponseItem) -> bool {
|
fn is_api_message(message: &ResponseItem) -> bool {
|
||||||
@@ -103,7 +151,7 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn filters_non_api_messages() {
|
fn filters_non_api_messages() {
|
||||||
let mut h = ConversationHistory::default();
|
let mut h = ResponseItemsHistory::default();
|
||||||
// System message is not an API message; Other is ignored.
|
// System message is not an API message; Other is ignored.
|
||||||
let system = ResponseItem::Message {
|
let system = ResponseItem::Message {
|
||||||
id: None,
|
id: None,
|
||||||
|
|||||||
@@ -150,7 +150,7 @@ impl ConversationManager {
|
|||||||
/// caller's `config`). The new conversation will have a fresh id.
|
/// caller's `config`). The new conversation will have a fresh id.
|
||||||
pub async fn fork_conversation(
|
pub async fn fork_conversation(
|
||||||
&self,
|
&self,
|
||||||
conversation_history: Vec<ResponseItem>,
|
conversation_history: InitialHistory,
|
||||||
num_messages_to_drop: usize,
|
num_messages_to_drop: usize,
|
||||||
config: Config,
|
config: Config,
|
||||||
) -> CodexResult<NewConversation> {
|
) -> CodexResult<NewConversation> {
|
||||||
@@ -171,43 +171,117 @@ impl ConversationManager {
|
|||||||
|
|
||||||
/// Return a prefix of `items` obtained by dropping the last `n` user messages
|
/// Return a prefix of `items` obtained by dropping the last `n` user messages
|
||||||
/// and all items that follow them.
|
/// and all items that follow them.
|
||||||
fn truncate_after_dropping_last_messages(items: Vec<ResponseItem>, n: usize) -> InitialHistory {
|
fn truncate_after_dropping_last_messages(history: InitialHistory, n: usize) -> InitialHistory {
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
let rolled: Vec<RolloutItem> = items.into_iter().map(RolloutItem::ResponseItem).collect();
|
return history;
|
||||||
return InitialHistory::Forked(rolled);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk backwards counting only `user` Message items, find cut index.
|
// Compute event prefix by dropping the last `n` user events (counted from the end).
|
||||||
let mut count = 0usize;
|
let event_msgs_prefix: Vec<EventMsg> =
|
||||||
let mut cut_index = 0usize;
|
build_event_prefix_excluding_last_n_user_turns(&history, n);
|
||||||
for (idx, item) in items.iter().enumerate().rev() {
|
|
||||||
|
// Keep only response items strictly before the cut (drop last `n` user messages).
|
||||||
|
let response_prefix: Vec<ResponseItem> =
|
||||||
|
build_response_prefix_excluding_last_n_user_turns(&history, n);
|
||||||
|
|
||||||
|
let rolled = build_truncated_rollout(&event_msgs_prefix, &response_prefix);
|
||||||
|
if rolled.is_empty() {
|
||||||
|
InitialHistory::New
|
||||||
|
} else {
|
||||||
|
InitialHistory::Forked(rolled)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build the event messages prefix from `history` by dropping the last `n` user
|
||||||
|
/// turns (counted from the end) and taking everything before that cut.
|
||||||
|
fn build_event_prefix_excluding_last_n_user_turns(
|
||||||
|
history: &InitialHistory,
|
||||||
|
n: usize,
|
||||||
|
) -> Vec<EventMsg> {
|
||||||
|
match history.get_event_msgs() {
|
||||||
|
Some(all_events) => {
|
||||||
|
take_prefix_before_index(&all_events, find_cut_event_index(&all_events, n))
|
||||||
|
}
|
||||||
|
None => Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build the response items prefix from `history` by dropping the last `n` user
|
||||||
|
/// turns (counted from the end) and taking everything before that cut.
|
||||||
|
fn build_response_prefix_excluding_last_n_user_turns(
|
||||||
|
history: &InitialHistory,
|
||||||
|
n: usize,
|
||||||
|
) -> Vec<ResponseItem> {
|
||||||
|
let all_items: Vec<ResponseItem> = history.get_response_items();
|
||||||
|
take_prefix_before_index(&all_items, find_cut_response_index(&all_items, n))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return a cloned prefix of `items` up to (but not including) `idx`.
|
||||||
|
/// If `idx` is `None`, returns an empty vector.
|
||||||
|
fn take_prefix_before_index<T: Clone>(items: &[T], idx: Option<usize>) -> Vec<T> {
|
||||||
|
match idx {
|
||||||
|
Some(i) => items[..i].to_vec(),
|
||||||
|
None => Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Find the index (into response items) of the Nth user message from the end.
|
||||||
|
fn find_cut_response_index(response_items: &[ResponseItem], n: usize) -> Option<usize> {
|
||||||
|
if n == 0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let mut remaining = n;
|
||||||
|
for (idx, item) in response_items.iter().enumerate().rev() {
|
||||||
if let ResponseItem::Message { role, .. } = item
|
if let ResponseItem::Message { role, .. } = item
|
||||||
&& role == "user"
|
&& role == "user"
|
||||||
{
|
{
|
||||||
count += 1;
|
remaining -= 1;
|
||||||
if count == n {
|
if remaining == 0 {
|
||||||
// Cut everything from this user message to the end.
|
return Some(idx);
|
||||||
cut_index = idx;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if cut_index == 0 {
|
None
|
||||||
// No prefix remains after dropping; start a new conversation.
|
}
|
||||||
InitialHistory::New
|
|
||||||
} else {
|
/// Find the index (into event messages) of the Nth user event from the end.
|
||||||
let rolled: Vec<RolloutItem> = items
|
fn find_cut_event_index(event_msgs: &[EventMsg], n: usize) -> Option<usize> {
|
||||||
.into_iter()
|
if n == 0 {
|
||||||
.take(cut_index)
|
return None;
|
||||||
.map(RolloutItem::ResponseItem)
|
|
||||||
.collect();
|
|
||||||
InitialHistory::Forked(rolled)
|
|
||||||
}
|
}
|
||||||
|
let mut remaining = n;
|
||||||
|
for (idx, ev) in event_msgs.iter().enumerate().rev() {
|
||||||
|
if matches!(ev, EventMsg::UserMessage(_)) {
|
||||||
|
remaining -= 1;
|
||||||
|
if remaining == 0 {
|
||||||
|
return Some(idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build a truncated rollout by concatenating the (already-sliced) event messages and response items.
|
||||||
|
fn build_truncated_rollout(
|
||||||
|
event_msgs: &[EventMsg],
|
||||||
|
response_items: &[ResponseItem],
|
||||||
|
) -> Vec<RolloutItem> {
|
||||||
|
let mut rolled: Vec<RolloutItem> = Vec::with_capacity(event_msgs.len() + response_items.len());
|
||||||
|
rolled.extend(event_msgs.iter().cloned().map(RolloutItem::EventMsg));
|
||||||
|
rolled.extend(
|
||||||
|
response_items
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(RolloutItem::ResponseItem),
|
||||||
|
);
|
||||||
|
rolled
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
use crate::event_mapping::map_response_item_to_event_messages;
|
||||||
|
use crate::protocol::EventMsg;
|
||||||
use codex_protocol::models::ContentItem;
|
use codex_protocol::models::ContentItem;
|
||||||
use codex_protocol::models::ReasoningItemReasoningSummary;
|
use codex_protocol::models::ReasoningItemReasoningSummary;
|
||||||
use codex_protocol::models::ResponseItem;
|
use codex_protocol::models::ResponseItem;
|
||||||
@@ -221,6 +295,15 @@ mod tests {
|
|||||||
}],
|
}],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
fn user_input(text: &str) -> ResponseItem {
|
||||||
|
ResponseItem::Message {
|
||||||
|
id: None,
|
||||||
|
role: "user".to_string(),
|
||||||
|
content: vec![ContentItem::InputText {
|
||||||
|
text: text.to_string(),
|
||||||
|
}],
|
||||||
|
}
|
||||||
|
}
|
||||||
fn assistant_msg(text: &str) -> ResponseItem {
|
fn assistant_msg(text: &str) -> ResponseItem {
|
||||||
ResponseItem::Message {
|
ResponseItem::Message {
|
||||||
id: None,
|
id: None,
|
||||||
@@ -256,7 +339,13 @@ mod tests {
|
|||||||
assistant_msg("a4"),
|
assistant_msg("a4"),
|
||||||
];
|
];
|
||||||
|
|
||||||
let truncated = truncate_after_dropping_last_messages(items.clone(), 1);
|
// Wrap as InitialHistory::Forked with response items only.
|
||||||
|
let initial: Vec<RolloutItem> = items
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(RolloutItem::ResponseItem)
|
||||||
|
.collect();
|
||||||
|
let truncated = truncate_after_dropping_last_messages(InitialHistory::Forked(initial), 1);
|
||||||
let got_items = truncated.get_rollout_items();
|
let got_items = truncated.get_rollout_items();
|
||||||
let expected_items = vec![
|
let expected_items = vec![
|
||||||
RolloutItem::ResponseItem(items[0].clone()),
|
RolloutItem::ResponseItem(items[0].clone()),
|
||||||
@@ -268,7 +357,62 @@ mod tests {
|
|||||||
serde_json::to_value(&expected_items).unwrap()
|
serde_json::to_value(&expected_items).unwrap()
|
||||||
);
|
);
|
||||||
|
|
||||||
let truncated2 = truncate_after_dropping_last_messages(items, 2);
|
let initial2: Vec<RolloutItem> = items
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(RolloutItem::ResponseItem)
|
||||||
|
.collect();
|
||||||
|
let truncated2 = truncate_after_dropping_last_messages(InitialHistory::Forked(initial2), 2);
|
||||||
assert!(matches!(truncated2, InitialHistory::New));
|
assert!(matches!(truncated2, InitialHistory::New));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn event_prefix_counts_from_end_with_duplicate_user_prompts() {
|
||||||
|
// Two identical user prompts with assistant replies between them.
|
||||||
|
let responses = vec![
|
||||||
|
user_input("same"),
|
||||||
|
assistant_msg("a1"),
|
||||||
|
user_input("same"),
|
||||||
|
assistant_msg("a2"),
|
||||||
|
];
|
||||||
|
|
||||||
|
// Derive event messages in order from responses (user → UserMessage, assistant → AgentMessage).
|
||||||
|
let mut events: Vec<EventMsg> = Vec::new();
|
||||||
|
for r in &responses {
|
||||||
|
events.extend(map_response_item_to_event_messages(r, false));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build initial history containing both events and responses.
|
||||||
|
let mut initial: Vec<RolloutItem> = Vec::new();
|
||||||
|
initial.extend(events.iter().cloned().map(RolloutItem::EventMsg));
|
||||||
|
initial.extend(responses.iter().cloned().map(RolloutItem::ResponseItem));
|
||||||
|
|
||||||
|
// Drop the last user turn.
|
||||||
|
let truncated = truncate_after_dropping_last_messages(InitialHistory::Forked(initial), 1);
|
||||||
|
|
||||||
|
// Expect the event prefix to include the first user + first assistant only,
|
||||||
|
// and the response prefix to include the first user + first assistant only.
|
||||||
|
let got_items = truncated.get_rollout_items();
|
||||||
|
|
||||||
|
// Compute expected events and responses after cut.
|
||||||
|
let expected_event_prefix: Vec<RolloutItem> = events[..2]
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(RolloutItem::EventMsg)
|
||||||
|
.collect();
|
||||||
|
let expected_response_prefix: Vec<RolloutItem> = responses[..2]
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(RolloutItem::ResponseItem)
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut expected: Vec<RolloutItem> = Vec::new();
|
||||||
|
expected.extend(expected_event_prefix);
|
||||||
|
expected.extend(expected_response_prefix);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
serde_json::to_value(&got_items).unwrap(),
|
||||||
|
serde_json::to_value(&expected).unwrap()
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -77,12 +77,13 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
|||||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await;
|
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationHistory(_))).await;
|
||||||
|
|
||||||
// Capture entries from the base history and compute expected prefixes after each fork.
|
// Capture entries from the base history and compute expected prefixes after each fork.
|
||||||
let entries_after_three = match &base_history {
|
let history_after_three = match &base_history {
|
||||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
|
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { history, .. }) => {
|
||||||
entries.clone()
|
history.clone()
|
||||||
}
|
}
|
||||||
_ => panic!("expected ConversationHistory event"),
|
_ => panic!("expected ConversationHistory event"),
|
||||||
};
|
};
|
||||||
|
let entries_after_three = history_after_three.get_rollout_items();
|
||||||
// History layout for this test:
|
// History layout for this test:
|
||||||
// [0] user instructions,
|
// [0] user instructions,
|
||||||
// [1] environment context,
|
// [1] environment context,
|
||||||
@@ -113,7 +114,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
|||||||
conversation: codex_fork1,
|
conversation: codex_fork1,
|
||||||
..
|
..
|
||||||
} = conversation_manager
|
} = conversation_manager
|
||||||
.fork_conversation(entries_after_three.clone(), 1, config_for_fork.clone())
|
.fork_conversation(history_after_three.clone(), 1, config_for_fork.clone())
|
||||||
.await
|
.await
|
||||||
.expect("fork 1");
|
.expect("fork 1");
|
||||||
|
|
||||||
@@ -122,13 +123,14 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
|||||||
matches!(ev, EventMsg::ConversationHistory(_))
|
matches!(ev, EventMsg::ConversationHistory(_))
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
let entries_after_first_fork = match &fork1_history {
|
let history_after_first_fork = match &fork1_history {
|
||||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { entries, .. }) => {
|
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { history, .. }) => {
|
||||||
assert!(matches!(
|
let got = history.get_rollout_items();
|
||||||
fork1_history,
|
assert_eq!(
|
||||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_first
|
serde_json::to_value(&got).unwrap(),
|
||||||
));
|
serde_json::to_value(&expected_after_first).unwrap()
|
||||||
entries.clone()
|
);
|
||||||
|
history.clone()
|
||||||
}
|
}
|
||||||
_ => panic!("expected ConversationHistory event after first fork"),
|
_ => panic!("expected ConversationHistory event after first fork"),
|
||||||
};
|
};
|
||||||
@@ -138,7 +140,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
|||||||
conversation: codex_fork2,
|
conversation: codex_fork2,
|
||||||
..
|
..
|
||||||
} = conversation_manager
|
} = conversation_manager
|
||||||
.fork_conversation(entries_after_first_fork.clone(), 1, config_for_fork.clone())
|
.fork_conversation(history_after_first_fork.clone(), 1, config_for_fork.clone())
|
||||||
.await
|
.await
|
||||||
.expect("fork 2");
|
.expect("fork 2");
|
||||||
|
|
||||||
@@ -147,8 +149,14 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
|||||||
matches!(ev, EventMsg::ConversationHistory(_))
|
matches!(ev, EventMsg::ConversationHistory(_))
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
assert!(matches!(
|
match &fork2_history {
|
||||||
fork2_history,
|
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { history, .. }) => {
|
||||||
EventMsg::ConversationHistory(ConversationHistoryResponseEvent { ref entries, .. }) if *entries == expected_after_second
|
let got = history.get_rollout_items();
|
||||||
));
|
assert_eq!(
|
||||||
|
serde_json::to_value(&got).unwrap(),
|
||||||
|
serde_json::to_value(&expected_after_second).unwrap()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
_ => panic!("expected ConversationHistory event after second fork"),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -803,7 +803,7 @@ pub struct WebSearchEndEvent {
|
|||||||
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
||||||
pub struct ConversationHistoryResponseEvent {
|
pub struct ConversationHistoryResponseEvent {
|
||||||
pub conversation_id: ConversationId,
|
pub conversation_id: ConversationId,
|
||||||
pub entries: Vec<ResponseItem>,
|
pub history: InitialHistory,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
#[derive(Debug, Clone, Deserialize, Serialize, TS)]
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ use crate::backtrack_helpers;
|
|||||||
use crate::pager_overlay::Overlay;
|
use crate::pager_overlay::Overlay;
|
||||||
use crate::tui;
|
use crate::tui;
|
||||||
use crate::tui::TuiEvent;
|
use crate::tui::TuiEvent;
|
||||||
|
use codex_core::InitialHistory;
|
||||||
use codex_core::protocol::ConversationHistoryResponseEvent;
|
use codex_core::protocol::ConversationHistoryResponseEvent;
|
||||||
use codex_protocol::mcp_protocol::ConversationId;
|
use codex_protocol::mcp_protocol::ConversationId;
|
||||||
use color_eyre::eyre::Result;
|
use color_eyre::eyre::Result;
|
||||||
@@ -288,7 +289,7 @@ impl App {
|
|||||||
let cfg = self.chat_widget.config_ref().clone();
|
let cfg = self.chat_widget.config_ref().clone();
|
||||||
// Perform the fork via a thin wrapper for clarity/testability.
|
// Perform the fork via a thin wrapper for clarity/testability.
|
||||||
let result = self
|
let result = self
|
||||||
.perform_fork(ev.entries.clone(), drop_count, cfg.clone())
|
.perform_fork(ev.history.clone(), drop_count, cfg.clone())
|
||||||
.await;
|
.await;
|
||||||
match result {
|
match result {
|
||||||
Ok(new_conv) => {
|
Ok(new_conv) => {
|
||||||
@@ -301,7 +302,7 @@ impl App {
|
|||||||
/// Thin wrapper around ConversationManager::fork_conversation.
|
/// Thin wrapper around ConversationManager::fork_conversation.
|
||||||
async fn perform_fork(
|
async fn perform_fork(
|
||||||
&self,
|
&self,
|
||||||
entries: Vec<codex_protocol::models::ResponseItem>,
|
entries: InitialHistory,
|
||||||
drop_count: usize,
|
drop_count: usize,
|
||||||
cfg: codex_core::config::Config,
|
cfg: codex_core::config::Config,
|
||||||
) -> codex_core::error::Result<codex_core::NewConversation> {
|
) -> codex_core::error::Result<codex_core::NewConversation> {
|
||||||
|
|||||||
Reference in New Issue
Block a user