Compare commits

...

20 Commits

Author SHA1 Message Date
jif-oai
328ef2585f switch shortcut 2025-12-27 18:17:48 +01:00
jif-oai
9fead1f290 NIT 2025-12-23 11:12:00 +01:00
jif-oai
0a3db61ec8 Merge remote-tracking branch 'origin/jif/multi-agent-3' into jif/multi-agent-4
# Conflicts:
#	codex-rs/tui/src/chatwidget.rs
#	codex-rs/tui/src/chatwidget/tests.rs
2025-12-23 11:05:11 +01:00
jif-oai
c26c29be9b Fix stuff 2025-12-23 11:02:54 +01:00
jif-oai
105effb569 Merge remote-tracking branch 'origin/jif/multi-agent-2' into jif/multi-agent-3
# Conflicts:
#	codex-rs/core/src/codex.rs
2025-12-23 10:52:41 +01:00
jif-oai
ec1de7e8c0 Merge branch 'main' into jif/multi-agent-2 2025-12-23 10:49:12 +01:00
jif-oai
96da76b77c NIT 3 2025-12-23 10:48:20 +01:00
jif-oai
16e3312eff NIT 2 2025-12-23 10:43:00 +01:00
jif-oai
9c84be7841 NIT 2025-12-23 10:41:40 +01:00
jif-oai
0ef7b23edb Merge branch 'jif/multi-agent-3' into jif/multi-agent-4 2025-12-23 10:36:13 +01:00
jif-oai
7f4b324ce2 Fix test 2025-12-23 10:35:24 +01:00
jif-oai
c5961db8a4 NIT TUI 2025-12-23 10:27:34 +01:00
jif-oai
ee1f92012c chore[multi-agent]: debug UI 2025-12-23 10:08:46 +01:00
jif-oai
50cb2bea55 NIT clean 2025-12-23 10:03:20 +01:00
jif-oai
43b6d3e631 Move agent state 2025-12-18 15:26:03 +00:00
jif-oai
c2f57b605a Doc 2025-12-18 15:21:39 +00:00
jif-oai
b9331372ad chore[multi-agent]: multiple agents running 2025-12-18 14:51:41 +00:00
jif-oai
9c859b4e85 base 3 2025-12-18 14:50:41 +00:00
jif-oai
090e5cd95e More baase 2025-12-18 14:27:28 +00:00
jif-oai
61474b285c base 2025-12-18 14:07:39 +00:00
39 changed files with 1623 additions and 304 deletions

View File

@@ -90,6 +90,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let Event {
id: event_turn_id,
msg,
..
} = event;
match msg {
EventMsg::TaskComplete(_ev) => {

View File

@@ -151,6 +151,7 @@ use codex_protocol::protocol::GitInfo as CoreGitInfo;
use codex_protocol::protocol::McpAuthStatus as CoreMcpAuthStatus;
use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::user_input::UserInput as CoreInputItem;
@@ -1592,7 +1593,16 @@ impl CodexMessageProcessor {
.await;
return;
}
InitialHistory::Forked(history.into_iter().map(RolloutItem::ResponseItem).collect())
InitialHistory::Forked(
history
.into_iter()
.map(|item| RolloutLine {
timestamp: String::new(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect(),
)
} else if let Some(path) = path {
match RolloutRecorder::get_rollout_history(&path).await {
Ok(initial_history) => initial_history,
@@ -2298,7 +2308,14 @@ impl CodexMessageProcessor {
} else {
match history {
Some(history) if !history.is_empty() => InitialHistory::Forked(
history.into_iter().map(RolloutItem::ResponseItem).collect(),
history
.into_iter()
.map(|item| RolloutLine {
timestamp: String::new(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect(),
),
Some(_) | None => {
self.send_invalid_request_error(
@@ -3594,6 +3611,7 @@ mod tests {
let line = RolloutLine {
timestamp: timestamp.clone(),
agent_id: None,
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: session_meta.clone(),
git: None,

File diff suppressed because it is too large Load Diff

View File

@@ -182,14 +182,17 @@ async fn forward_events(
// ignore all legacy delta events
Event {
id: _,
agent_id: _,
msg: EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_),
} => {}
Event {
id: _,
agent_id: _,
msg: EventMsg::SessionConfigured(_),
} => {}
Event {
id,
agent_id: _,
msg: EventMsg::ExecApprovalRequest(event),
} => {
// Initiate approval via parent session; do not surface to consumer.
@@ -205,6 +208,7 @@ async fn forward_events(
}
Event {
id,
agent_id: _,
msg: EventMsg::ApplyPatchApprovalRequest(event),
} => {
handle_patch_approval(
@@ -372,6 +376,7 @@ mod tests {
tx_out
.send(Event {
id: "full".to_string(),
agent_id: None,
msg: EventMsg::TurnAborted(TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -391,6 +396,7 @@ mod tests {
tx_events
.send(Event {
id: "evt".to_string(),
agent_id: None,
msg: EventMsg::RawResponseItem(RawResponseItemEvent {
item: ResponseItem::CustomToolCall {
id: None,

View File

@@ -68,7 +68,7 @@ async fn run_compact_task_inner(
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
let mut history = sess.clone_history(&turn_context.agent_id).await;
history.record_items(
&[initial_input_for_turn.into()],
turn_context.truncation_policy,
@@ -92,7 +92,8 @@ async fn run_compact_task_inner(
final_output_json_schema: turn_context.final_output_json_schema.clone(),
truncation_policy: Some(turn_context.truncation_policy.into()),
});
sess.persist_rollout_items(&[rollout_item]).await;
sess.persist_rollout_items(&turn_context.agent_id, &[rollout_item])
.await;
loop {
let turn_input = history.get_history_for_prompt();
@@ -155,7 +156,10 @@ async fn run_compact_task_inner(
}
}
let history_snapshot = sess.clone_history().await.get_history();
let history_snapshot = sess
.clone_history(&turn_context.agent_id)
.await
.get_history();
let summary_suffix =
get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
@@ -169,14 +173,16 @@ async fn run_compact_task_inner(
.cloned()
.collect();
new_history.extend(ghost_snapshots);
sess.replace_history(new_history).await;
sess.replace_history(&turn_context.agent_id, new_history)
.await;
sess.recompute_token_usage(&turn_context).await;
let rollout_item = RolloutItem::Compacted(CompactedItem {
message: summary_text.clone(),
replacement_history: None,
});
sess.persist_rollout_items(&[rollout_item]).await;
sess.persist_rollout_items(&turn_context.agent_id, &[rollout_item])
.await;
let event = EventMsg::ContextCompacted(ContextCompactedEvent {});
sess.send_event(&turn_context, event).await;

View File

@@ -40,7 +40,7 @@ async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
let mut history = sess.clone_history().await;
let mut history = sess.clone_history(&turn_context.agent_id).await;
let prompt = Prompt {
input: history.get_history_for_prompt(),
tools: vec![],
@@ -64,15 +64,19 @@ async fn run_remote_compact_task_inner_impl(
if !ghost_snapshots.is_empty() {
new_history.extend(ghost_snapshots);
}
sess.replace_history(new_history.clone()).await;
sess.replace_history(&turn_context.agent_id, new_history.clone())
.await;
sess.recompute_token_usage(turn_context).await;
let compacted_item = CompactedItem {
message: String::new(),
replacement_history: Some(new_history),
};
sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;
sess.persist_rollout_items(
&turn_context.agent_id,
&[RolloutItem::Compacted(compacted_item)],
)
.await;
let event = EventMsg::ContextCompacted(ContextCompactedEvent {});
sess.send_event(turn_context, event).await;

View File

@@ -20,8 +20,10 @@ use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::protocol::DEFAULT_AGENT_ID;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionSource;
use std::collections::HashMap;
use std::path::PathBuf;
@@ -146,6 +148,7 @@ impl ConversationManager {
Event {
id,
msg: EventMsg::SessionConfigured(session_configured),
..
} if id == INITIAL_SUBMIT_ID => session_configured,
_ => {
return Err(CodexErr::SessionConfiguredNotFirstEvent);
@@ -267,12 +270,16 @@ impl ConversationManager {
/// (0-based) and all items that follow it.
fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> InitialHistory {
// Work directly on rollout items, and cut the vector at the nth user message input.
let items: Vec<RolloutItem> = history.get_rollout_items();
let lines: Vec<RolloutLine> = history.get_rollout_lines();
let root_lines: Vec<RolloutLine> = lines
.into_iter()
.filter(|line| line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID) == *DEFAULT_AGENT_ID)
.collect();
// Find indices of user message inputs in rollout order.
let mut user_positions: Vec<usize> = Vec::new();
for (idx, item) in items.iter().enumerate() {
if let RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) = item
for (idx, line) in root_lines.iter().enumerate() {
if let RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) = &line.item
&& matches!(
crate::event_mapping::parse_turn_item(item),
Some(TurnItem::UserMessage(_))
@@ -289,7 +296,7 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia
// Cut strictly before the nth user message (do not keep the nth itself).
let cut_idx = user_positions[n];
let rolled: Vec<RolloutItem> = items.into_iter().take(cut_idx).collect();
let rolled: Vec<RolloutLine> = root_lines.into_iter().take(cut_idx).collect();
if rolled.is_empty() {
InitialHistory::New
@@ -353,13 +360,17 @@ mod tests {
];
// Wrap as InitialHistory::Forked with response items only.
let initial: Vec<RolloutItem> = items
let initial: Vec<RolloutLine> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.map(|item| RolloutLine {
timestamp: "t".to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1);
let got_items = truncated.get_rollout_items();
let got_items = truncated.get_rollout_items_for_agent(&DEFAULT_AGENT_ID);
let expected_items = vec![
RolloutItem::ResponseItem(items[0].clone()),
RolloutItem::ResponseItem(items[1].clone()),
@@ -370,10 +381,14 @@ mod tests {
serde_json::to_value(&expected_items).unwrap()
);
let initial2: Vec<RolloutItem> = items
let initial2: Vec<RolloutLine> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.map(|item| RolloutLine {
timestamp: "t".to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect();
let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2);
assert_matches!(truncated2, InitialHistory::New);
@@ -388,14 +403,18 @@ mod tests {
items.push(user_msg("second question"));
items.push(assistant_msg("answer"));
let rollout_items: Vec<RolloutItem> = items
let rollout_items: Vec<RolloutLine> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.map(|item| RolloutLine {
timestamp: "t".to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1);
let got_items = truncated.get_rollout_items();
let got_items = truncated.get_rollout_items_for_agent(&DEFAULT_AGENT_ID);
let expected: Vec<RolloutItem> = vec![
RolloutItem::ResponseItem(items[0].clone()),

View File

@@ -20,6 +20,7 @@ use crate::error::CodexErr;
use crate::error::Result;
use crate::error::SandboxErr;
use crate::get_platform_sandbox;
use crate::protocol::AgentId;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecCommandOutputDeltaEvent;
@@ -123,6 +124,7 @@ pub enum SandboxType {
#[derive(Clone)]
pub struct StdoutStream {
pub sub_id: String,
pub agent_id: AgentId,
pub call_id: String,
pub tx_event: Sender<Event>,
}
@@ -715,6 +717,7 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
});
let event = Event {
id: stream.sub_id.clone(),
agent_id: Some(stream.agent_id.clone()),
msg,
};
#[allow(clippy::let_unit_value)]

View File

@@ -159,6 +159,7 @@ impl ElicitationRequestManager {
let _ = tx_event
.send(Event {
id: "mcp_elicitation_request".to_string(),
agent_id: None,
msg: EventMsg::ElicitationRequest(ElicitationRequestEvent {
server_name,
id,
@@ -373,6 +374,7 @@ impl McpConnectionManager {
let _ = tx_event
.send(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
agent_id: None,
msg: EventMsg::McpStartupComplete(summary),
})
.await;
@@ -664,6 +666,7 @@ async fn emit_update(
tx_event
.send(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(update),
})
.await

View File

@@ -5,7 +5,7 @@
//! JSON-Lines tooling. Each record has the following schema:
//!
//! ````text
//! {"conversation_id":"<uuid>","ts":<unix_seconds>,"text":"<message>"}
//! {"conversation_id":"<uuid>","agent_id":"<agent>","ts":<unix_seconds>,"text":"<message>"}
//! ````
//!
//! To minimise the chance of interleaved writes when multiple processes are
@@ -54,6 +54,8 @@ const RETRY_SLEEP: Duration = Duration::from_millis(100);
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct HistoryEntry {
pub session_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_id: Option<String>,
pub ts: u64,
pub text: String,
}
@@ -69,6 +71,7 @@ fn history_filepath(config: &Config) -> PathBuf {
/// which entails a small amount of blocking I/O internally.
pub(crate) async fn append_entry(
text: &str,
agent_id: Option<&str>,
conversation_id: &ConversationId,
config: &Config,
) -> Result<()> {
@@ -99,6 +102,7 @@ pub(crate) async fn append_entry(
// Construct the JSON line first so we can write it in a single syscall.
let entry = HistoryEntry {
session_id: conversation_id.to_string(),
agent_id: agent_id.map(str::to_string),
ts,
text: text.to_string(),
};
@@ -416,11 +420,13 @@ mod tests {
let entries = vec![
HistoryEntry {
session_id: "first-session".to_string(),
agent_id: None,
ts: 1,
text: "first".to_string(),
},
HistoryEntry {
session_id: "second-session".to_string(),
agent_id: None,
ts: 2,
text: "second".to_string(),
},
@@ -451,11 +457,13 @@ mod tests {
let initial = HistoryEntry {
session_id: "first-session".to_string(),
agent_id: None,
ts: 1,
text: "first".to_string(),
};
let appended = HistoryEntry {
session_id: "second-session".to_string(),
agent_id: None,
ts: 2,
text: "second".to_string(),
};
@@ -504,7 +512,7 @@ mod tests {
let history_path = codex_home.path().join("history.jsonl");
append_entry(&entry_one, &conversation_id, &config)
append_entry(&entry_one, None, &conversation_id, &config)
.await
.expect("write first entry");
@@ -514,7 +522,7 @@ mod tests {
config.history.max_bytes =
Some(usize::try_from(limit_bytes).expect("limit should fit into usize"));
append_entry(&entry_two, &conversation_id, &config)
append_entry(&entry_two, None, &conversation_id, &config)
.await
.expect("write second entry");
@@ -551,13 +559,13 @@ mod tests {
let history_path = codex_home.path().join("history.jsonl");
append_entry(&short_entry, &conversation_id, &config)
append_entry(&short_entry, None, &conversation_id, &config)
.await
.expect("write first entry");
let short_entry_len = std::fs::metadata(&history_path).expect("metadata").len();
append_entry(&long_entry, &conversation_id, &config)
append_entry(&long_entry, None, &conversation_id, &config)
.await
.expect("write second entry");
@@ -572,7 +580,7 @@ mod tests {
.expect("max bytes should fit into usize"),
);
append_entry(&long_entry, &conversation_id, &config)
append_entry(&long_entry, None, &conversation_id, &config)
.await
.expect("write third entry");

View File

@@ -26,6 +26,7 @@ use super::policy::is_persisted_response_item;
use crate::config::Config;
use crate::default_client::originator;
use crate::git_info::collect_git_info;
use codex_protocol::protocol::AgentId;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::ResumedHistory;
use codex_protocol::protocol::RolloutItem;
@@ -62,7 +63,10 @@ pub enum RolloutRecorderParams {
}
enum RolloutCmd {
AddItems(Vec<RolloutItem>),
AddItems {
agent_id: String,
items: Vec<RolloutItem>,
},
/// Ensure all prior writes are processed; respond when flushed.
Flush {
ack: oneshot::Sender<()>,
@@ -177,7 +181,11 @@ impl RolloutRecorder {
Ok(Self { tx, rollout_path })
}
pub(crate) async fn record_items(&self, items: &[RolloutItem]) -> std::io::Result<()> {
pub(crate) async fn record_items(
&self,
agent_id: &AgentId,
items: &[RolloutItem],
) -> std::io::Result<()> {
let mut filtered = Vec::new();
for item in items {
// Note that function calls may look a bit strange if they are
@@ -191,7 +199,10 @@ impl RolloutRecorder {
return Ok(());
}
self.tx
.send(RolloutCmd::AddItems(filtered))
.send(RolloutCmd::AddItems {
agent_id: agent_id.to_string(),
items: filtered,
})
.await
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
@@ -214,7 +225,7 @@ impl RolloutRecorder {
return Err(IoError::other("empty session file"));
}
let mut items: Vec<RolloutItem> = Vec::new();
let mut lines: Vec<RolloutLine> = Vec::new();
let mut conversation_id: Option<ConversationId> = None;
for line in text.lines() {
if line.trim().is_empty() {
@@ -228,30 +239,17 @@ impl RolloutRecorder {
}
};
// Parse the rollout line structure
match serde_json::from_value::<RolloutLine>(v.clone()) {
Ok(rollout_line) => match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
Ok(rollout_line) => {
if let RolloutItem::SessionMeta(session_meta_line) = &rollout_line.item
&& conversation_id.is_none()
{
// Use the FIRST SessionMeta encountered in the file as the canonical
// conversation id and main session information. Keep all items intact.
if conversation_id.is_none() {
conversation_id = Some(session_meta_line.meta.id);
}
items.push(RolloutItem::SessionMeta(session_meta_line));
conversation_id = Some(session_meta_line.meta.id);
}
RolloutItem::ResponseItem(item) => {
items.push(RolloutItem::ResponseItem(item));
}
RolloutItem::Compacted(item) => {
items.push(RolloutItem::Compacted(item));
}
RolloutItem::TurnContext(item) => {
items.push(RolloutItem::TurnContext(item));
}
RolloutItem::EventMsg(_ev) => {
items.push(RolloutItem::EventMsg(_ev));
}
},
lines.push(rollout_line);
}
Err(e) => {
warn!("failed to parse rollout line: {v:?}, error: {e}");
}
@@ -260,20 +258,20 @@ impl RolloutRecorder {
info!(
"Resumed rollout with {} items, conversation ID: {:?}",
items.len(),
lines.len(),
conversation_id
);
let conversation_id = conversation_id
.ok_or_else(|| IoError::other("failed to parse conversation ID from rollout file"))?;
if items.is_empty() {
if lines.is_empty() {
return Ok(InitialHistory::New);
}
info!("Resumed rollout successfully from {path:?}");
Ok(InitialHistory::Resumed(ResumedHistory {
conversation_id,
history: items,
history: lines,
rollout_path: path.to_path_buf(),
}))
}
@@ -364,17 +362,19 @@ async fn rollout_writer(
// Write the SessionMeta as the first item in the file, wrapped in a rollout line
writer
.write_rollout_item(RolloutItem::SessionMeta(session_meta_line))
.write_rollout_item(None, RolloutItem::SessionMeta(session_meta_line))
.await?;
}
// Process rollout commands
while let Some(cmd) = rx.recv().await {
match cmd {
RolloutCmd::AddItems(items) => {
RolloutCmd::AddItems { agent_id, items } => {
for item in items {
if is_persisted_response_item(&item) {
writer.write_rollout_item(item).await?;
writer
.write_rollout_item(Some(agent_id.as_str()), item)
.await?;
}
}
}
@@ -400,7 +400,11 @@ struct JsonlWriter {
}
impl JsonlWriter {
async fn write_rollout_item(&mut self, rollout_item: RolloutItem) -> std::io::Result<()> {
async fn write_rollout_item(
&mut self,
agent_id: Option<&str>,
rollout_item: RolloutItem,
) -> std::io::Result<()> {
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
@@ -410,6 +414,7 @@ impl JsonlWriter {
let line = RolloutLine {
timestamp,
agent_id: agent_id.map(str::to_string),
item: rollout_item,
};
self.write_line(&line).await

View File

@@ -582,6 +582,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
let conversation_id = ConversationId::from_string(&uuid.to_string())?;
let meta_line = RolloutLine {
timestamp: ts.to_string(),
agent_id: None,
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: conversation_id,
@@ -600,6 +601,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
let user_event_line = RolloutLine {
timestamp: ts.to_string(),
agent_id: None,
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "hello".into(),
images: None,
@@ -611,6 +613,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
for idx in 0..total_messages {
let response_line = RolloutLine {
timestamp: format!("{ts}-{idx:02}"),
agent_id: None,
item: RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "assistant".into(),

View File

@@ -0,0 +1,28 @@
use tokio::sync::Mutex;
use crate::codex::SessionConfiguration;
use crate::state::ActiveTurn;
use crate::state::SessionState;
use codex_protocol::protocol::AgentId;
/// Per-agent mutable state shared across async tasks.
///
/// The struct itself is stored in an `Arc`, so fields use `Mutex` to guard
/// concurrent mutation rather than additional `Arc` layers.
pub(crate) struct AgentState {
pub(crate) agent_id: AgentId,
/// Session configuration + conversation history for this agent.
pub(crate) state: Mutex<SessionState>,
/// Active turn state is tracked per-agent (each agent can have its own task).
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
}
impl AgentState {
pub(crate) fn new(agent_id: AgentId, session_configuration: SessionConfiguration) -> Self {
Self {
agent_id,
state: Mutex::new(SessionState::new(session_configuration)),
active_turn: Mutex::new(None),
}
}
}

View File

@@ -1,7 +1,9 @@
mod agent;
mod service;
mod session;
mod turn;
pub(crate) use agent::AgentState;
pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;

View File

@@ -8,14 +8,6 @@ mod user_shell;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::select;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::trace;
use tracing::warn;
use crate::AuthManager;
use crate::codex::Session;
use crate::codex::TurnContext;
@@ -27,7 +19,15 @@ use crate::protocol::TurnAbortedEvent;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_protocol::protocol::AgentId;
use codex_protocol::user_input::UserInput;
use tokio::select;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::trace;
use tracing::warn;
pub(crate) use compact::CompactTask;
pub(crate) use ghost_snapshot::GhostSnapshotTask;
@@ -109,7 +109,8 @@ impl Session {
input: Vec<UserInput>,
task: T,
) {
self.abort_all_tasks(TurnAbortReason::Replaced).await;
self.abort_agent_tasks(&turn_context.agent_id, TurnAbortReason::Replaced)
.await;
let task: Arc<dyn SessionTask> = Arc::new(task);
let task_kind = task.kind();
@@ -152,14 +153,25 @@ impl Session {
cancellation_token,
turn_context: Arc::clone(&turn_context),
};
self.register_new_active_task(running_task).await;
self.register_new_active_task(&turn_context.agent_id, running_task)
.await;
}
pub async fn abort_agent_tasks(self: &Arc<Self>, agent_id: &AgentId, reason: TurnAbortReason) {
for task in self.take_running_tasks_for_agent(agent_id).await {
self.handle_task_abort(task, reason.clone()).await;
}
self.maybe_close_unified_exec_sessions().await;
}
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
for task in self.take_all_running_tasks().await {
self.handle_task_abort(task, reason.clone()).await;
for agent in self.agent_states().await {
let agent_id = agent.agent_id.clone();
for task in self.take_running_tasks_for_agent(&agent_id).await {
self.handle_task_abort(task, reason.clone()).await;
}
}
self.close_unified_exec_sessions().await;
self.maybe_close_unified_exec_sessions().await;
}
pub async fn on_task_finished(
@@ -167,32 +179,31 @@ impl Session {
turn_context: Arc<TurnContext>,
last_agent_message: Option<String>,
) {
let mut active = self.active_turn.lock().await;
let should_close_sessions = if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
let agent = self.get_or_create_agent(&turn_context.agent_id).await;
{
*active = None;
true
} else {
false
};
drop(active);
if should_close_sessions {
self.close_unified_exec_sessions().await;
let mut active = agent.active_turn.lock().await;
if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
{
*active = None;
}
}
self.maybe_close_unified_exec_sessions().await;
let event = EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message });
self.send_event(turn_context.as_ref(), event).await;
}
async fn register_new_active_task(&self, task: RunningTask) {
let mut active = self.active_turn.lock().await;
async fn register_new_active_task(&self, agent_id: &AgentId, task: RunningTask) {
let agent = self.get_or_create_agent(agent_id).await;
let mut active = agent.active_turn.lock().await;
let mut turn = ActiveTurn::default();
turn.add_task(task);
*active = Some(turn);
}
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
let mut active = self.active_turn.lock().await;
async fn take_running_tasks_for_agent(&self, agent_id: &AgentId) -> Vec<RunningTask> {
let agent = self.get_or_create_agent(agent_id).await;
let mut active = agent.active_turn.lock().await;
match active.take() {
Some(mut at) => {
at.clear_pending().await;
@@ -203,7 +214,10 @@ impl Session {
}
}
async fn close_unified_exec_sessions(&self) {
async fn maybe_close_unified_exec_sessions(&self) {
if self.has_active_tasks().await {
return;
}
self.services
.unified_exec_manager
.terminate_all_sessions()

View File

@@ -59,7 +59,7 @@ impl SessionTask for UndoTask {
return None;
}
let mut history = sess.clone_history().await;
let mut history = sess.clone_history(&ctx.agent_id).await;
let mut items = history.get_history();
let mut completed = UndoCompletedEvent {
success: false,
@@ -96,7 +96,7 @@ impl SessionTask for UndoTask {
match restore_result {
Ok(Ok(())) => {
items.remove(idx);
sess.replace_history(items).await;
sess.replace_history(&ctx.agent_id, items).await;
let short_id: String = commit_id.chars().take(7).collect();
info!(commit_id = commit_id, "Undo restored ghost snapshot");
completed.success = true;

View File

@@ -108,6 +108,7 @@ impl SessionTask for UserShellCommandTask {
let stdout_stream = Some(StdoutStream {
sub_id: turn_context.sub_id.clone(),
agent_id: turn_context.agent_id.clone(),
call_id: call_id.clone(),
tx_event: session.get_tx_event(),
});

View File

@@ -65,7 +65,10 @@ impl ToolHandler for ViewImageHandler {
let event_path = abs_path.clone();
session
.inject_input(vec![UserInput::LocalImage { path: abs_path }])
.inject_input(
&turn.agent_id,
vec![UserInput::LocalImage { path: abs_path }],
)
.await
.map_err(|_| {
FunctionCallError::RespondToModel(

View File

@@ -71,6 +71,7 @@ impl ApplyPatchRuntime {
fn stdout_stream(ctx: &ToolCtx<'_>) -> Option<crate::exec::StdoutStream> {
Some(crate::exec::StdoutStream {
sub_id: ctx.turn.sub_id.clone(),
agent_id: ctx.turn.agent_id.clone(),
call_id: ctx.call_id.clone(),
tx_event: ctx.session.get_tx_event(),
})

View File

@@ -56,6 +56,7 @@ impl ShellRuntime {
fn stdout_stream(ctx: &ToolCtx<'_>) -> Option<crate::exec::StdoutStream> {
Some(crate::exec::StdoutStream {
sub_id: ctx.turn.sub_id.clone(),
agent_id: ctx.turn.agent_id.clone(),
call_id: ctx.call_id.clone(),
tx_event: ctx.session.get_tx_event(),
})

View File

@@ -8,6 +8,7 @@ use codex_core::protocol::EventMsg;
use codex_core::protocol::InitialHistory;
use codex_core::protocol::ResumedHistory;
use codex_core::protocol::RolloutItem;
use codex_core::protocol::RolloutLine;
use codex_core::protocol::TurnContextItem;
use codex_core::protocol::WarningEvent;
use codex_protocol::ConversationId;
@@ -37,7 +38,11 @@ fn resume_history(
InitialHistory::Resumed(ResumedHistory {
conversation_id: ConversationId::default(),
history: vec![RolloutItem::TurnContext(turn_ctx)],
history: vec![RolloutLine {
timestamp: String::new(),
agent_id: None,
item: RolloutItem::TurnContext(turn_ctx),
}],
rollout_path: rollout_path.to_path_buf(),
})
}

View File

@@ -20,9 +20,10 @@ These are entities exit on the codex backend. The intent of this section is to e
- `Codex` starts with no `Session`, and it is initialized by `Op::ConfigureSession`, which should be the first message sent by the UI.
- The current `Session` can be reconfigured with additional `Op::ConfigureSession` calls.
- Any running execution is aborted when the session is reconfigured.
- A session hosts one or more agents, each with its own conversation history and turn loop.
3. `Task`
- A `Task` is `Codex` executing work in response to user input.
- `Session` has at most one `Task` running at a time.
- Each agent has at most one `Task` running at a time (tasks can run concurrently across agents).
- Receiving `Op::UserInput` starts a `Task`
- Consists of a series of `Turn`s
- The `Task` executes to until:
@@ -44,7 +45,7 @@ The term "UI" is used to refer to the application driving `Codex`. This may be t
When a `Turn` completes, the `response_id` from the `Model`'s final `response.completed` message is stored in the `Session` state to resume the thread given the next `Op::UserInput`. The `response_id` is also returned in the `EventMsg::TurnComplete` to the UI, which can be used to fork the thread from an earlier point by providing it in the `Op::UserInput`.
Since only 1 `Task` can be run at a time, for parallel tasks it is recommended that a single `Codex` be run for each thread of work.
Since each agent can only run 1 `Task` at a time, parallel tasks should be routed to different agents (or separate Codex sessions).
## Interface
@@ -58,6 +59,7 @@ Since only 1 `Task` can be run at a time, for parallel tasks it is recommended t
- `Event`
- These are messages sent on the `EQ` (`Codex` -> UI)
- Each `Event` has a non-unique ID, matching the `sub_id` from the `Op::UserInput` that started the current task.
- `Event` includes an optional `agent_id`; when omitted, clients should treat it as the default agent (`root`).
- `EventMsg` refers to the enum of all possible `Event` payloads
- This enum is `non_exhaustive`; variants can be added at future dates
- It should be expected that new `EventMsg` variants will be added over time to expose more detailed information about the model's actions.
@@ -66,6 +68,7 @@ For complete documentation of the `Op` and `EventMsg` variants, refer to [protoc
- `Op`
- `Op::UserInput` Any input from the user to kick off a `Task`
- `Op::ForAgent` Route a nested `Op` to a specific agent (defaults to the `root` agent when omitted)
- `Op::Interrupt` Interrupts a running task
- `Op::ExecApproval` Approve or deny code execution
- `Op::ListSkills` Request skills for one or more cwd values (optionally `force_reload`)

View File

@@ -160,7 +160,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
fn process_event(&mut self, event: Event) -> CodexStatus {
let Event { id: _, msg } = event;
let Event { id: _, msg, .. } = event;
match msg {
EventMsg::Error(ErrorEvent { message, .. }) => {
let prefix = "ERROR:".style(self.red);

View File

@@ -510,6 +510,7 @@ impl EventProcessor for EventProcessorWithJsonOutput {
fn print_config_summary(&mut self, _: &Config, _: &str, ev: &SessionConfiguredEvent) {
self.process_event(Event {
id: "".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(ev.clone()),
});
}

View File

@@ -61,6 +61,7 @@ use std::time::Duration;
fn event(id: &str, msg: EventMsg) -> Event {
Event {
id: id.to_string(),
agent_id: None,
msg,
}
}

View File

@@ -68,6 +68,7 @@ pub async fn run_codex_tool_session(
let session_configured_event = Event {
// Use a fake id value for now.
id: "".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(session_configured.clone()),
};
outgoing

View File

@@ -255,6 +255,7 @@ mod tests {
let rollout_file = NamedTempFile::new()?;
let event = Event {
id: "1".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: conversation_id,
model: "gpt-4o".to_string(),
@@ -309,6 +310,7 @@ mod tests {
};
let event = Event {
id: "1".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(session_configured_event.clone()),
};
let meta = OutgoingNotificationMeta {

View File

@@ -6,6 +6,9 @@ use ts_rs::TS;
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
pub struct HistoryEntry {
pub conversation_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub agent_id: Option<String>,
pub ts: u64,
pub text: String,
}

View File

@@ -8,6 +8,7 @@ use std::fmt;
use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::LazyLock;
use std::time::Duration;
use crate::ConversationId;
@@ -50,6 +51,9 @@ pub const USER_INSTRUCTIONS_CLOSE_TAG: &str = "</user_instructions>";
pub const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = "<environment_context>";
pub const ENVIRONMENT_CONTEXT_CLOSE_TAG: &str = "</environment_context>";
pub const USER_MESSAGE_BEGIN: &str = "## My request for Codex:";
pub type AgentId = String;
pub static DEFAULT_AGENT_ID: LazyLock<AgentId> = LazyLock::new(|| "root".to_string());
/// Submission Queue Entry - requests from user
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
@@ -66,6 +70,13 @@ pub struct Submission {
#[allow(clippy::large_enum_variant)]
#[non_exhaustive]
pub enum Op {
/// Route an operation to a specific agent.
ForAgent {
/// Identifier of the target agent.
agent_id: AgentId,
/// Operation to route to the agent.
op: Box<Op>,
},
/// Abort current task.
/// This server sends [`EventMsg::TurnAborted`] in response.
Interrupt,
@@ -513,10 +524,14 @@ impl SandboxPolicy {
}
/// Event Queue Entry - events from agent
#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct Event {
/// Submission `id` that this event is correlated with.
pub id: String,
/// Optional agent identifier associated with this event.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub agent_id: Option<AgentId>,
/// Payload
pub msg: EventMsg,
}
@@ -1158,7 +1173,7 @@ pub struct ConversationPathResponseEvent {
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct ResumedHistory {
pub conversation_id: ConversationId,
pub history: Vec<RolloutItem>,
pub history: Vec<RolloutLine>,
pub rollout_path: PathBuf,
}
@@ -1166,37 +1181,63 @@ pub struct ResumedHistory {
pub enum InitialHistory {
New,
Resumed(ResumedHistory),
Forked(Vec<RolloutItem>),
Forked(Vec<RolloutLine>),
}
impl InitialHistory {
pub fn get_rollout_items(&self) -> Vec<RolloutItem> {
pub fn get_rollout_lines(&self) -> Vec<RolloutLine> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Resumed(resumed) => resumed.history.clone(),
InitialHistory::Forked(items) => items.clone(),
InitialHistory::Forked(lines) => lines.clone(),
}
}
pub fn get_rollout_items_for_agent(&self, agent_id: &AgentId) -> Vec<RolloutItem> {
self.get_rollout_lines()
.into_iter()
.filter_map(|line| {
let line_agent = line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID);
(line_agent == agent_id).then_some(line.item)
})
.collect()
}
pub fn get_event_msgs(&self) -> Option<Vec<EventMsg>> {
self.get_event_msgs_for_agent(&DEFAULT_AGENT_ID)
}
pub fn get_event_msgs_for_agent(&self, agent_id: &AgentId) -> Option<Vec<EventMsg>> {
match self {
InitialHistory::New => None,
InitialHistory::Resumed(resumed) => Some(
resumed
.history
.iter()
.filter_map(|ri| match ri {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
.filter_map(|line| {
let line_agent = line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID);
if line_agent != agent_id {
return None;
}
match &line.item {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
}
})
.collect(),
),
InitialHistory::Forked(items) => Some(
items
InitialHistory::Forked(lines) => Some(
lines
.iter()
.filter_map(|ri| match ri {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
.filter_map(|line| {
let line_agent = line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID);
if line_agent != agent_id {
return None;
}
match &line.item {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
}
})
.collect(),
),
@@ -1343,9 +1384,12 @@ pub enum TruncationPolicy {
Tokens(usize),
}
#[derive(Serialize, Deserialize, Clone, JsonSchema)]
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema, TS)]
pub struct RolloutLine {
pub timestamp: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub agent_id: Option<AgentId>,
#[serde(flatten)]
pub item: RolloutItem,
}
@@ -1931,6 +1975,7 @@ mod tests {
let rollout_file = NamedTempFile::new()?;
let event = Event {
id: "1234".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: conversation_id,
model: "codex-mini-latest".to_string(),
@@ -1990,6 +2035,7 @@ mod tests {
fn serialize_mcp_startup_update_event() -> Result<()> {
let event = Event {
id: "init".to_string(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(McpStartupUpdateEvent {
server: "srv".to_string(),
status: McpStartupStatus::Failed {
@@ -2010,6 +2056,7 @@ mod tests {
fn serialize_mcp_startup_complete_event() -> Result<()> {
let event = Event {
id: "init".to_string(),
agent_id: None,
msg: EventMsg::McpStartupComplete(McpStartupCompleteEvent {
ready: vec!["a".to_string()],
failed: vec![McpStartupFailure {

View File

@@ -693,6 +693,19 @@ impl App {
}
}
}
AppEvent::InsertEphemeralHistoryLines(lines) => {
if lines.is_empty() {
return Ok(true);
}
// Ephemeral lines are appended to the inline viewport only and do not
// update the persisted transcript_cells.
self.has_emitted_history_lines = true;
if self.overlay.is_some() {
self.deferred_history_lines.extend(lines);
} else {
tui.insert_history_lines(lines);
}
}
AppEvent::StartCommitAnimation => {
if self
.commit_anim_running
@@ -736,6 +749,12 @@ impl App {
return Ok(false);
}
AppEvent::CodexOp(op) => self.chat_widget.submit_op(op),
AppEvent::SwitchAgent { agent_id } => {
self.chat_widget.switch_to_agent(agent_id);
}
AppEvent::CreateAgentFromComposer { agent_id } => {
self.chat_widget.create_agent_from_composer(agent_id);
}
AppEvent::DiffResult(text) => {
// Clear the in-progress state in the bottom pane
self.chat_widget.on_diff_complete();
@@ -1624,6 +1643,7 @@ mod tests {
app.chat_widget.handle_codex_event(Event {
id: String::new(),
agent_id: None,
msg: EventMsg::SessionConfigured(event),
});

View File

@@ -6,6 +6,7 @@ use codex_core::protocol::Event;
use codex_core::protocol::RateLimitSnapshot;
use codex_file_search::FileMatch;
use codex_protocol::openai_models::ModelPreset;
use ratatui::text::Line;
use crate::bottom_pane::ApprovalRequest;
use crate::history_cell::HistoryCell;
@@ -54,6 +55,9 @@ pub(crate) enum AppEvent {
InsertHistoryCell(Box<dyn HistoryCell>),
/// Insert lines into the inline viewport only (do not persist to transcript).
InsertEphemeralHistoryLines(Vec<Line<'static>>),
StartCommitAnimation,
StopCommitAnimation,
CommitTick,
@@ -156,6 +160,18 @@ pub(crate) enum AppEvent {
/// Re-open the approval presets popup.
OpenApprovalsPopup,
/// Switch the active agent view.
SwitchAgent {
agent_id: String,
},
/// Create (if needed) a new agent and submit the current composer draft to it.
///
/// Triggered by Ctrl+N in the chat view.
CreateAgentFromComposer {
agent_id: String,
},
/// Forwarded conversation history snapshot from the current conversation.
ConversationHistory(ConversationPathResponseEvent),

View File

@@ -26,6 +26,7 @@ use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::CreditsSnapshot;
use codex_core::protocol::DEFAULT_AGENT_ID;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
@@ -144,6 +145,7 @@ use codex_file_search::FileMatch;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::plan_tool::UpdatePlanArgs;
use mcp_types::RequestId;
use strum::IntoEnumIterator;
const USER_SHELL_COMMAND_HELP_TITLE: &str = "Prefix a command with ! to run it locally";
@@ -155,6 +157,116 @@ struct RunningCommand {
source: ExecCommandSource,
}
struct AgentViewState {
active_cell: Option<Box<dyn HistoryCell>>,
stream_controller: Option<StreamController>,
running_commands: HashMap<String, RunningCommand>,
suppressed_exec_calls: HashSet<String>,
last_unified_wait: Option<UnifiedExecWaitState>,
task_complete_pending: bool,
unified_exec_sessions: Vec<UnifiedExecSessionSummary>,
interrupts: InterruptManager,
reasoning_buffer: String,
full_reasoning_buffer: String,
current_status_header: String,
retry_status_header: Option<String>,
token_info: Option<TokenUsageInfo>,
rate_limit_snapshot: Option<RateLimitSnapshotDisplay>,
plan_type: Option<PlanType>,
rate_limit_warnings: RateLimitWarningState,
rate_limit_switch_prompt: RateLimitSwitchPromptState,
is_review_mode: bool,
pre_review_token_info: Option<Option<TokenUsageInfo>>,
needs_final_message_separator: bool,
is_task_running: bool,
replay_messages: Vec<ReplayMessage>,
}
impl AgentViewState {
fn empty() -> Self {
Self {
active_cell: None,
stream_controller: None,
running_commands: HashMap::new(),
suppressed_exec_calls: HashSet::new(),
last_unified_wait: None,
task_complete_pending: false,
unified_exec_sessions: Vec::new(),
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
current_status_header: String::from("Working"),
retry_status_header: None,
token_info: None,
rate_limit_snapshot: None,
plan_type: None,
rate_limit_warnings: RateLimitWarningState::default(),
rate_limit_switch_prompt: RateLimitSwitchPromptState::default(),
is_review_mode: false,
pre_review_token_info: None,
needs_final_message_separator: false,
is_task_running: false,
replay_messages: Vec::new(),
}
}
fn capture_from(widget: &mut ChatWidget) -> Self {
Self {
active_cell: widget.active_cell.take(),
stream_controller: widget.stream_controller.take(),
running_commands: std::mem::take(&mut widget.running_commands),
suppressed_exec_calls: std::mem::take(&mut widget.suppressed_exec_calls),
last_unified_wait: widget.last_unified_wait.take(),
task_complete_pending: widget.task_complete_pending,
unified_exec_sessions: std::mem::take(&mut widget.unified_exec_sessions),
interrupts: std::mem::take(&mut widget.interrupts),
reasoning_buffer: std::mem::take(&mut widget.reasoning_buffer),
full_reasoning_buffer: std::mem::take(&mut widget.full_reasoning_buffer),
current_status_header: std::mem::take(&mut widget.current_status_header),
retry_status_header: widget.retry_status_header.take(),
token_info: widget.token_info.take(),
rate_limit_snapshot: widget.rate_limit_snapshot.take(),
plan_type: widget.plan_type.take(),
rate_limit_warnings: std::mem::take(&mut widget.rate_limit_warnings),
rate_limit_switch_prompt: std::mem::take(&mut widget.rate_limit_switch_prompt),
is_review_mode: widget.is_review_mode,
pre_review_token_info: widget.pre_review_token_info.take(),
needs_final_message_separator: widget.needs_final_message_separator,
is_task_running: widget.bottom_pane.is_task_running(),
replay_messages: std::mem::take(&mut widget.replay_messages),
}
}
fn restore_into(self, widget: &mut ChatWidget) {
widget.active_cell = self.active_cell;
widget.stream_controller = self.stream_controller;
widget.running_commands = self.running_commands;
widget.suppressed_exec_calls = self.suppressed_exec_calls;
widget.last_unified_wait = self.last_unified_wait;
widget.task_complete_pending = self.task_complete_pending;
widget.unified_exec_sessions = self.unified_exec_sessions;
widget.interrupts = self.interrupts;
widget.reasoning_buffer = self.reasoning_buffer;
widget.full_reasoning_buffer = self.full_reasoning_buffer;
widget.current_status_header = self.current_status_header;
widget.retry_status_header = self.retry_status_header;
widget.rate_limit_snapshot = self.rate_limit_snapshot;
widget.plan_type = self.plan_type;
widget.rate_limit_warnings = self.rate_limit_warnings;
widget.rate_limit_switch_prompt = self.rate_limit_switch_prompt;
widget.is_review_mode = self.is_review_mode;
widget.pre_review_token_info = self.pre_review_token_info;
widget.needs_final_message_separator = self.needs_final_message_separator;
widget.replay_messages = self.replay_messages;
widget.bottom_pane.set_task_running(self.is_task_running);
widget
.bottom_pane
.update_status_header(widget.current_status_header.clone());
widget.set_token_info(self.token_info);
widget.sync_unified_exec_footer();
}
}
struct UnifiedExecSessionSummary {
key: String,
command_display: String,
@@ -302,6 +414,15 @@ enum RateLimitSwitchPromptState {
Shown,
}
#[derive(Clone, Debug)]
enum ReplayMessage {
UserPrompt(String),
AgentChunk {
lines: Vec<Line<'static>>,
is_first_line: bool,
},
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum ExternalEditorState {
#[default]
@@ -362,6 +483,8 @@ pub(crate) struct ChatWidget {
pre_review_token_info: Option<Option<TokenUsageInfo>>,
// Whether to add a final message separator after the last message
needs_final_message_separator: bool,
// Cached user/agent messages for ephemeral replay when switching agent views.
replay_messages: Vec<ReplayMessage>,
last_rendered_width: std::cell::Cell<Option<usize>>,
// Feedback sink for /feedback
@@ -369,6 +492,13 @@ pub(crate) struct ChatWidget {
// Current session rollout path (if known)
current_rollout_path: Option<PathBuf>,
external_editor_state: ExternalEditorState,
selected_agent_id: String,
known_agents: Vec<String>,
known_agents_set: HashSet<String>,
pending_agent_events: HashMap<String, Vec<EventMsg>>,
agent_states: HashMap<String, AgentViewState>,
approval_agent_ids: HashMap<String, String>,
elicitation_agent_ids: HashMap<RequestId, String>,
}
struct UserMessage {
@@ -1425,6 +1555,7 @@ impl ChatWidget {
let placeholder = EXAMPLE_PROMPTS[rng.random_range(0..EXAMPLE_PROMPTS.len())].to_string();
let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), conversation_manager);
let default_agent_id = DEFAULT_AGENT_ID.as_str().to_string();
let mut widget = Self {
app_event_tx: app_event_tx.clone(),
frame_requester: frame_requester.clone(),
@@ -1475,10 +1606,18 @@ impl ChatWidget {
is_review_mode: false,
pre_review_token_info: None,
needs_final_message_separator: false,
replay_messages: Vec::new(),
last_rendered_width: std::cell::Cell::new(None),
feedback,
current_rollout_path: None,
external_editor_state: ExternalEditorState::Closed,
selected_agent_id: default_agent_id.clone(),
known_agents: vec![default_agent_id.clone()],
known_agents_set: HashSet::from([default_agent_id]),
pending_agent_events: HashMap::new(),
agent_states: HashMap::new(),
approval_agent_ids: HashMap::new(),
elicitation_agent_ids: HashMap::new(),
};
widget.prefetch_rate_limits();
@@ -1512,6 +1651,7 @@ impl ChatWidget {
let codex_op_tx =
spawn_agent_from_existing(conversation, session_configured, app_event_tx.clone());
let default_agent_id = DEFAULT_AGENT_ID.as_str().to_string();
let mut widget = Self {
app_event_tx: app_event_tx.clone(),
frame_requester: frame_requester.clone(),
@@ -1562,10 +1702,18 @@ impl ChatWidget {
is_review_mode: false,
pre_review_token_info: None,
needs_final_message_separator: false,
replay_messages: Vec::new(),
last_rendered_width: std::cell::Cell::new(None),
feedback,
current_rollout_path: None,
external_editor_state: ExternalEditorState::Closed,
selected_agent_id: default_agent_id.clone(),
known_agents: vec![default_agent_id.clone()],
known_agents_set: HashSet::from([default_agent_id]),
pending_agent_events: HashMap::new(),
agent_states: HashMap::new(),
approval_agent_ids: HashMap::new(),
elicitation_agent_ids: HashMap::new(),
};
widget.prefetch_rate_limits();
@@ -1584,6 +1732,29 @@ impl ChatWidget {
self.on_ctrl_c();
return;
}
KeyEvent {
code: KeyCode::Char(c),
modifiers,
kind: KeyEventKind::Press,
..
} if modifiers.contains(KeyModifiers::CONTROL) && c.eq_ignore_ascii_case(&'o') => {
self.open_agents_popup();
return;
}
KeyEvent {
code: KeyCode::Char(c),
modifiers,
kind: KeyEventKind::Press,
..
} if modifiers.contains(KeyModifiers::CONTROL)
&& c.eq_ignore_ascii_case(&'n')
&& !self.bottom_pane.composer_is_empty() =>
{
// Avoid clobbering the composer's Ctrl+N history navigation when the
// composer is empty. When there's a draft, Ctrl+N is "new agent".
self.open_new_agent_prompt();
return;
}
KeyEvent {
code: KeyCode::Char(c),
modifiers,
@@ -1807,6 +1978,7 @@ impl ChatWidget {
self.app_event_tx.send(AppEvent::CodexEvent(Event {
id: "1".to_string(),
agent_id: None,
// msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
// call_id: "1".to_string(),
// command: vec!["git".into(), "apply".into()],
@@ -1869,6 +2041,19 @@ impl ChatWidget {
}
}
fn insert_history_cell_without_flushing(&mut self, cell: Box<dyn HistoryCell>) {
if !cell.display_lines(u16::MAX).is_empty() {
self.needs_final_message_separator = true;
}
self.app_event_tx.send(AppEvent::InsertHistoryCell(cell));
}
fn insert_agent_view_header(&mut self, agent_id: &str) {
self.insert_history_cell_without_flushing(Box::new(history_cell::new_agent_view_header(
agent_id.to_string(),
)));
}
// Only flush a live wait cell here; other active cells must finalize via their end events.
fn flush_wait_cell(&mut self) {
// Wait cells are transient: convert them into "(waited)" history entries if present.
@@ -1900,6 +2085,22 @@ impl ChatWidget {
self.flush_active_cell();
self.needs_final_message_separator = true;
}
if let Some(user) = cell
.as_any()
.downcast_ref::<history_cell::UserHistoryCell>()
{
self.replay_messages
.push(ReplayMessage::UserPrompt(user.message.clone()));
} else if let Some(agent) = cell
.as_any()
.downcast_ref::<history_cell::AgentMessageCell>()
{
let (lines, is_first_line) = agent.replay_snapshot();
self.replay_messages.push(ReplayMessage::AgentChunk {
lines,
is_first_line,
});
}
self.app_event_tx.send(AppEvent::InsertHistoryCell(cell));
}
@@ -1956,19 +2157,11 @@ impl ChatWidget {
}
}
self.codex_op_tx
.send(Op::UserInput { items })
.unwrap_or_else(|e| {
tracing::error!("failed to send message: {e}");
});
self.submit_op(Op::UserInput { items });
// Persist the text to cross-session message history.
if !text.is_empty() {
self.codex_op_tx
.send(Op::AddToHistory { text: text.clone() })
.unwrap_or_else(|e| {
tracing::error!("failed to send AddHistory op: {e}");
});
self.submit_op(Op::AddToHistory { text: text.clone() });
}
// Only show the text portion in conversation history.
@@ -1994,8 +2187,30 @@ impl ChatWidget {
}
pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
self.dispatch_event_msg(Some(id), msg, false);
let Event { id, msg, agent_id } = event;
if self.should_handle_event_globally(&msg) {
self.dispatch_event_msg(Some(id), msg, false);
return;
}
let resolved_agent_id = agent_id.unwrap_or_else(|| DEFAULT_AGENT_ID.as_str().to_string());
self.register_agent(&resolved_agent_id);
if self.is_approval_event(&msg) {
self.track_approval_agent(&msg, &id, &resolved_agent_id);
self.dispatch_event_msg(Some(id), msg, false);
return;
}
if resolved_agent_id == self.selected_agent_id {
self.dispatch_event_msg(Some(id), msg, false);
return;
}
self.pending_agent_events
.entry(resolved_agent_id)
.or_default()
.push(msg);
}
/// Dispatch a protocol `EventMsg` to the appropriate handler.
@@ -2184,6 +2399,50 @@ impl ChatWidget {
self.frame_requester.schedule_frame();
}
fn register_agent(&mut self, agent_id: &str) {
if self.known_agents_set.insert(agent_id.to_string()) {
self.known_agents.push(agent_id.to_string());
}
}
fn should_handle_event_globally(&self, msg: &EventMsg) -> bool {
matches!(
msg,
EventMsg::SessionConfigured(_)
| EventMsg::ListCustomPromptsResponse(_)
| EventMsg::ListSkillsResponse(_)
| EventMsg::SkillsUpdateAvailable
| EventMsg::McpStartupUpdate(_)
| EventMsg::McpStartupComplete(_)
| EventMsg::ShutdownComplete
)
}
fn is_approval_event(&self, msg: &EventMsg) -> bool {
matches!(
msg,
EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::ElicitationRequest(_)
)
}
fn track_approval_agent(&mut self, msg: &EventMsg, id: &str, agent_id: &str) {
match msg {
EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_) => {
if !id.is_empty() {
self.approval_agent_ids
.insert(id.to_string(), agent_id.to_string());
}
}
EventMsg::ElicitationRequest(ElicitationRequestEvent { id: request_id, .. }) => {
self.elicitation_agent_ids
.insert(request_id.clone(), agent_id.to_string());
}
_ => {}
}
}
fn notify(&mut self, notification: Notification) {
if !notification.allowed_for(&self.config.tui_notifications) {
return;
@@ -2859,6 +3118,160 @@ impl ChatWidget {
self.bottom_pane.show_view(Box::new(view));
}
pub(crate) fn open_agents_popup(&mut self) {
let mut items: Vec<SelectionItem> = Vec::new();
let mut initial_selected_idx = None;
for (idx, agent_id) in self.known_agents.iter().enumerate() {
let is_current = agent_id == &self.selected_agent_id;
if is_current {
initial_selected_idx = Some(idx);
}
let agent_id_for_action = agent_id.clone();
let actions: Vec<SelectionAction> = vec![Box::new(move |tx| {
tx.send(AppEvent::SwitchAgent {
agent_id: agent_id_for_action.clone(),
});
})];
items.push(SelectionItem {
name: agent_id.clone(),
description: None,
is_current,
is_default: agent_id == DEFAULT_AGENT_ID.as_str(),
actions,
dismiss_on_select: true,
..Default::default()
});
}
self.bottom_pane.show_selection_view(SelectionViewParams {
title: Some("Switch agent".to_string()),
footer_hint: Some(standard_popup_hint_line()),
items,
initial_selected_idx,
header: Box::new(()),
..Default::default()
});
self.request_redraw();
}
pub(crate) fn open_new_agent_prompt(&mut self) {
let app_event_tx = self.app_event_tx.clone();
let view = CustomPromptView::new(
"Spawn new agent".to_string(),
"Enter a new agent id, then press Enter to send your current message.".to_string(),
Some("Create a new agent from the current message".to_string()),
Box::new(move |agent_id| {
app_event_tx.send(AppEvent::CreateAgentFromComposer { agent_id });
}),
);
self.bottom_pane.show_view(Box::new(view));
self.request_redraw();
}
pub(crate) fn create_agent_from_composer(&mut self, agent_id: String) {
let agent_id = agent_id.trim().to_string();
if agent_id.is_empty() {
self.add_to_history(history_cell::new_info_event(
"Agent id cannot be empty.".to_string(),
None,
));
return;
}
self.switch_to_agent(agent_id);
// Reuse the composer's normal submission pipeline (trim/expansion/etc.)
// by simulating an Enter press.
match self
.bottom_pane
.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE))
{
InputResult::Submitted(text) => {
let user_message = UserMessage {
text,
image_paths: self.bottom_pane.take_recent_submission_images(),
};
self.queue_user_message(user_message);
}
InputResult::Command(cmd) => {
self.dispatch_command(cmd);
}
InputResult::None => {}
}
}
pub(crate) fn switch_to_agent(&mut self, agent_id: String) {
if agent_id == self.selected_agent_id {
return;
}
self.register_agent(&agent_id);
let previous_agent = self.selected_agent_id.clone();
let previous_state = AgentViewState::capture_from(self);
self.agent_states.insert(previous_agent, previous_state);
self.bottom_pane.set_task_running(false);
self.selected_agent_id = agent_id.clone();
let next_state = self
.agent_states
.remove(&agent_id)
.unwrap_or_else(AgentViewState::empty);
next_state.restore_into(self);
self.insert_agent_view_header(&agent_id);
self.replay_user_and_agent_messages();
if let Some(pending) = self.pending_agent_events.remove(&agent_id) {
for msg in pending {
self.dispatch_event_msg(None, msg, false);
}
}
self.request_redraw();
}
fn replay_user_and_agent_messages(&mut self) {
if self.replay_messages.is_empty() {
return;
}
let width = self
.last_rendered_width
.get()
.and_then(|w| u16::try_from(w).ok())
.unwrap_or(u16::MAX);
let mut out: Vec<Line<'static>> = Vec::new();
let mut has_emitted = false;
for replay in &self.replay_messages {
let (cell, is_stream_continuation): (Box<dyn HistoryCell>, bool) = match replay {
ReplayMessage::UserPrompt(message) => (
Box::new(history_cell::new_user_prompt(message.clone())),
false,
),
ReplayMessage::AgentChunk {
lines,
is_first_line,
} => (
Box::new(history_cell::AgentMessageCell::new(
lines.clone(),
*is_first_line,
)),
!*is_first_line,
),
};
let mut display = cell.display_lines(width);
if display.is_empty() {
continue;
}
if !is_stream_continuation {
if has_emitted {
out.push(Line::from(""));
} else {
has_emitted = true;
}
}
out.append(&mut display);
}
if !out.is_empty() {
self.app_event_tx
.send(AppEvent::InsertEphemeralHistoryLines(out));
}
}
fn approval_preset_actions(
approval: AskForApproval,
sandbox: SandboxPolicy,
@@ -3316,10 +3729,69 @@ impl ChatWidget {
self.bottom_pane.clear_esc_backtrack_hint();
}
/// Forward an `Op` directly to codex.
pub(crate) fn submit_op(&self, op: Op) {
fn wrap_for_agent(&self, agent_id: String, op: Op) -> Op {
if agent_id == DEFAULT_AGENT_ID.as_str() {
op
} else {
Op::ForAgent {
agent_id,
op: Box::new(op),
}
}
}
fn route_op_for_agent(&mut self, op: Op) -> Op {
match op {
Op::ForAgent { agent_id, op } => {
self.register_agent(&agent_id);
Op::ForAgent { agent_id, op }
}
Op::ExecApproval { id, decision } => {
let agent_id = self
.approval_agent_ids
.remove(&id)
.unwrap_or_else(|| self.selected_agent_id.clone());
self.register_agent(&agent_id);
self.wrap_for_agent(agent_id, Op::ExecApproval { id, decision })
}
Op::PatchApproval { id, decision } => {
let agent_id = self
.approval_agent_ids
.remove(&id)
.unwrap_or_else(|| self.selected_agent_id.clone());
self.register_agent(&agent_id);
self.wrap_for_agent(agent_id, Op::PatchApproval { id, decision })
}
Op::ResolveElicitation {
server_name,
request_id,
decision,
} => {
let agent_id = self
.elicitation_agent_ids
.remove(&request_id)
.unwrap_or_else(|| self.selected_agent_id.clone());
self.register_agent(&agent_id);
let op = Op::ResolveElicitation {
server_name,
request_id,
decision,
};
self.wrap_for_agent(agent_id, op)
}
op => {
let agent_id = self.selected_agent_id.clone();
self.register_agent(&agent_id);
self.wrap_for_agent(agent_id, op)
}
}
}
pub(crate) fn submit_op(&mut self, op: Op) {
// Record outbound operation for session replay fidelity.
crate::session_log::log_outbound_op(&op);
if let Err(e) = self.codex_op_tx.send(op) {
let routed = self.route_op_for_agent(op);
crate::session_log::log_outbound_op(&routed);
if let Err(e) = self.codex_op_tx.send(routed) {
tracing::error!("failed to submit op: {e}");
}
}

View File

@@ -36,6 +36,7 @@ pub(crate) fn spawn_agent(
eprintln!("{message}");
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
id: "".to_string(),
agent_id: None,
msg: EventMsg::Error(err.to_error_event(None)),
}));
app_event_tx_clone.send(AppEvent::ExitRequest);
@@ -48,6 +49,7 @@ pub(crate) fn spawn_agent(
let ev = codex_core::protocol::Event {
// The `id` does not matter for rendering, so we can use a fake value.
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
@@ -85,6 +87,7 @@ pub(crate) fn spawn_agent_from_existing(
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_core::protocol::Event {
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));

View File

@@ -127,6 +127,7 @@ async fn resumed_initial_messages_render_history() {
chat.handle_codex_event(Event {
id: "initial".into(),
agent_id: None,
msg: EventMsg::SessionConfigured(configured),
});
@@ -159,6 +160,7 @@ async fn entered_review_mode_uses_request_hint() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -180,6 +182,7 @@ async fn entered_review_mode_defaults_to_current_changes_banner() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::UncommittedChanges,
user_facing_hint: None,
@@ -203,6 +206,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_review_tokens, context_window)),
rate_limits: None,
@@ -212,6 +216,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -222,6 +227,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-review".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(review_tokens, context_window)),
rate_limits: None,
@@ -231,6 +237,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-end".into(),
agent_id: None,
msg: EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
review_output: None,
}),
@@ -251,6 +258,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_compact_tokens, context_window)),
rate_limits: None,
@@ -260,6 +268,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-cleared".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: None,
@@ -290,6 +299,7 @@ async fn context_indicator_shows_used_tokens_when_window_unknown() {
chat.handle_codex_event(Event {
id: "token-usage".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(token_info),
rate_limits: None,
@@ -366,6 +376,7 @@ async fn make_chatwidget_manual(
skills: None,
});
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test"));
let default_agent_id = DEFAULT_AGENT_ID.as_str().to_string();
let widget = ChatWidget {
app_event_tx,
codex_op_tx: op_tx,
@@ -404,10 +415,18 @@ async fn make_chatwidget_manual(
is_review_mode: false,
pre_review_token_info: None,
needs_final_message_separator: false,
replay_messages: Vec::new(),
last_rendered_width: std::cell::Cell::new(None),
feedback: codex_feedback::CodexFeedback::new(),
current_rollout_path: None,
external_editor_state: ExternalEditorState::Closed,
selected_agent_id: default_agent_id.clone(),
known_agents: vec![default_agent_id.clone()],
known_agents_set: HashSet::from([default_agent_id]),
pending_agent_events: HashMap::new(),
agent_states: HashMap::new(),
approval_agent_ids: HashMap::new(),
elicitation_agent_ids: HashMap::new(),
};
(widget, rx, op_rx)
}
@@ -727,6 +746,7 @@ async fn exec_approval_emits_proposed_command_and_decision_history() {
};
chat.handle_codex_event(Event {
id: "sub-short".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -771,6 +791,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-multi".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_multi),
});
let proposed_multi = drain_insert_history(&mut rx);
@@ -821,6 +842,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-long".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_long),
});
let proposed_long = drain_insert_history(&mut rx);
@@ -863,6 +885,7 @@ fn begin_exec_with_source(
};
chat.handle_codex_event(Event {
id: call_id.to_string(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(event.clone()),
});
event
@@ -888,6 +911,7 @@ fn begin_unified_exec_startup(
};
chat.handle_codex_event(Event {
id: call_id.to_string(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(event.clone()),
});
event
@@ -896,6 +920,7 @@ fn begin_unified_exec_startup(
fn terminal_interaction(chat: &mut ChatWidget, call_id: &str, process_id: &str, stdin: &str) {
chat.handle_codex_event(Event {
id: call_id.to_string(),
agent_id: None,
msg: EventMsg::TerminalInteraction(TerminalInteractionEvent {
call_id: call_id.to_string(),
process_id: process_id.to_string(),
@@ -932,6 +957,7 @@ fn end_exec(
} = begin_event;
chat.handle_codex_event(Event {
id: call_id.clone(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
process_id,
@@ -1187,6 +1213,7 @@ async fn exec_end_without_begin_uses_event_command() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "call-orphan".to_string(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "call-orphan".to_string(),
process_id: None,
@@ -1299,6 +1326,7 @@ async fn unified_exec_waiting_multiple_empty_snapshots() {
chat.handle_codex_event(Event {
id: "turn-wait-1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -1348,6 +1376,7 @@ async fn unified_exec_non_empty_then_empty_snapshots() {
chat.handle_codex_event(Event {
id: "turn-wait-3".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -1497,6 +1526,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent {
message: Some("Undo requested for the last turn...".to_string()),
}),
@@ -1508,6 +1538,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: true,
message: None,
@@ -1534,6 +1565,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
assert!(
@@ -1543,6 +1575,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: false,
message: Some("Failed to restore workspace state.".to_string()),
@@ -1569,6 +1602,7 @@ async fn undo_started_hides_interrupt_hint() {
chat.handle_codex_event(Event {
id: "turn-hint".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
@@ -1692,6 +1726,7 @@ async fn view_image_tool_call_adds_history_cell() {
chat.handle_codex_event(Event {
id: "sub-image".into(),
agent_id: None,
msg: EventMsg::ViewImageToolCall(ViewImageToolCallEvent {
call_id: "call-image".into(),
path: image_path,
@@ -1717,6 +1752,7 @@ async fn interrupt_exec_marks_failed_snapshot() {
// cause the active exec cell to be finalized as failed and flushed.
chat.handle_codex_event(Event {
id: "call-int".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -1742,6 +1778,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Simulate an in-progress task so the widget is in a running state.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -1750,6 +1787,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Abort the turn (like pressing Esc) and drain inserted history.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2394,6 +2432,7 @@ async fn approval_modal_exec_snapshot() -> anyhow::Result<()> {
};
chat.handle_codex_event(Event {
id: "sub-approve".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
// Render to a fixed-size test terminal and snapshot.
@@ -2447,6 +2486,7 @@ async fn approval_modal_exec_without_reason_snapshot() -> anyhow::Result<()> {
};
chat.handle_codex_event(Event {
id: "sub-approve-noreason".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2489,6 +2529,7 @@ async fn approval_modal_patch_snapshot() -> anyhow::Result<()> {
};
chat.handle_codex_event(Event {
id: "sub-approve-patch".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -2525,6 +2566,7 @@ async fn interrupt_restores_queued_messages_into_composer() {
// Deliver a TurnAborted event with Interrupted reason (as if Esc was pressed).
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2563,6 +2605,7 @@ async fn interrupt_prepends_queued_messages_before_existing_composer_text() {
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2608,12 +2651,14 @@ async fn ui_snapshots_small_heights_task_running() {
// Activate status line
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Thinking**".into(),
}),
@@ -2639,6 +2684,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Begin a running task so the status indicator would be active.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2646,6 +2692,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Provide a deterministic header for the status line.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2668,6 +2715,7 @@ async fn status_widget_and_approval_modal_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-exec".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2691,6 +2739,7 @@ async fn status_widget_active_snapshot() {
// Activate the status indicator by simulating a task start.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2698,6 +2747,7 @@ async fn status_widget_active_snapshot() {
// Provide a deterministic header via a bold reasoning chunk.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2719,6 +2769,7 @@ async fn mcp_startup_header_booting_snapshot() {
chat.handle_codex_event(Event {
id: "mcp-1".into(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(McpStartupUpdateEvent {
server: "alpha".into(),
status: McpStartupStatus::Starting,
@@ -2740,6 +2791,7 @@ async fn background_event_updates_status_header() {
chat.handle_codex_event(Event {
id: "bg-1".into(),
agent_id: None,
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
message: "Waiting for `vim`".to_string(),
}),
@@ -2771,6 +2823,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
let cells = drain_insert_history(&mut rx);
@@ -2811,6 +2864,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(begin),
});
let cells = drain_insert_history(&mut rx);
@@ -2839,6 +2893,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(end),
});
let cells = drain_insert_history(&mut rx);
@@ -2861,6 +2916,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2880,6 +2936,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2910,6 +2967,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2933,6 +2991,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2970,6 +3029,7 @@ async fn apply_patch_approval_sends_op_with_submission_id() {
};
chat.handle_codex_event(Event {
id: "sub-123".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -3001,6 +3061,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3042,6 +3103,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3056,6 +3118,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3081,6 +3144,7 @@ async fn apply_patch_untrusted_shows_approval_modal() -> anyhow::Result<()> {
);
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3132,6 +3196,7 @@ async fn apply_patch_request_shows_diff_summary() -> anyhow::Result<()> {
);
chat.handle_codex_event(Event {
id: "sub-apply".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-apply".into(),
turn_id: "turn-apply".into(),
@@ -3204,6 +3269,7 @@ async fn plan_update_renders_history_cell() {
};
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::PlanUpdate(update),
});
let cells = drain_insert_history(&mut rx);
@@ -3225,6 +3291,7 @@ async fn stream_error_updates_status_indicator() {
let msg = "Reconnecting... 2/5";
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: msg.to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -3248,6 +3315,7 @@ async fn warning_event_adds_warning_history_cell() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::Warning(WarningEvent {
message: "test warning message".to_string(),
}),
@@ -3267,6 +3335,7 @@ async fn stream_recovery_restores_previous_status_header() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "task".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3274,6 +3343,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "retry".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: "Reconnecting... 1/5".to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -3282,6 +3352,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "delta".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "hello".to_string(),
}),
@@ -3302,6 +3373,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Begin turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3310,6 +3382,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// First finalized assistant message
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "First message".into(),
}),
@@ -3318,6 +3391,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Second finalized assistant message in the same turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Second message".into(),
}),
@@ -3326,6 +3400,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// End turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3349,6 +3424,106 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
assert!(first_idx < second_idx, "messages out of order: {combined}");
}
#[tokio::test]
async fn non_selected_agent_events_buffer_until_switch() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "ev1".into(),
agent_id: Some("worker_1".into()),
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "hello from worker".into(),
}),
});
let cells = drain_insert_history(&mut rx);
assert!(cells.is_empty(), "expected no history while not selected");
chat.switch_to_agent("worker_1".to_string());
let cells = drain_insert_history(&mut rx);
let combined: String = cells
.iter()
.map(|lines| lines_to_single_string(lines))
.collect();
assert!(
combined.contains("Viewing agent `worker_1`"),
"missing view header: {combined}"
);
assert!(
combined.contains("hello from worker"),
"missing buffered message: {combined}"
);
}
#[tokio::test]
async fn approval_ops_route_to_requesting_agent() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
let ev = ExecApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-1".into(),
command: vec!["echo".into(), "ok".into()],
cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
reason: None,
proposed_execpolicy_amendment: None,
parsed_cmd: vec![],
};
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: Some("worker_2".into()),
msg: EventMsg::ExecApprovalRequest(ev),
});
chat.submit_op(Op::ExecApproval {
id: "sub-1".into(),
decision: codex_core::protocol::ReviewDecision::Approved,
});
let routed = op_rx.try_recv().expect("expected routed op");
match routed {
Op::ForAgent { agent_id, op } => {
assert_eq!(agent_id, "worker_2");
assert!(
matches!(*op, Op::ExecApproval { .. }),
"expected exec approval op, got {op:?}"
);
}
other => panic!("expected ForAgent op, got {other:?}"),
}
}
#[tokio::test]
async fn ctrl_n_create_agent_from_composer_routes_submission() {
let (mut chat, _rx, mut op_rx) = make_chatwidget_manual(None).await;
chat.set_composer_text("hello".to_string());
chat.create_agent_from_composer("worker_3".to_string());
let first = op_rx.try_recv().expect("expected first routed op");
match first {
Op::ForAgent { agent_id, op } => {
assert_eq!(agent_id, "worker_3");
assert!(
matches!(*op, Op::UserInput { .. }),
"expected user input op, got {op:?}"
);
}
other => panic!("expected ForAgent op, got {other:?}"),
}
let second = op_rx.try_recv().expect("expected second routed op");
match second {
Op::ForAgent { agent_id, op } => {
assert_eq!(agent_id, "worker_3");
assert!(
matches!(*op, Op::AddToHistory { .. }),
"expected add to history op, got {op:?}"
);
}
other => panic!("expected ForAgent op, got {other:?}"),
}
}
#[tokio::test]
async fn final_reasoning_then_message_without_deltas_are_rendered() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
@@ -3356,12 +3531,14 @@ async fn final_reasoning_then_message_without_deltas_are_rendered() {
// No deltas; only final reasoning followed by final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "I will first analyze the request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3383,24 +3560,28 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Stream some reasoning deltas first.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "I will ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "first analyze the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "request.".into(),
}),
@@ -3409,12 +3590,14 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Then stream answer deltas, followed by the exact same final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "Here is the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "result.".into(),
}),
@@ -3422,6 +3605,7 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3445,6 +3629,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent { message: "Im going to search the repo for where “Change Approved” is rendered to update that view.".into() }),
});
@@ -3464,6 +3649,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "c1".into(),
process_id: None,
@@ -3477,6 +3663,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "c1".into(),
process_id: None,
@@ -3496,12 +3683,14 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Investigating rendering code**".into(),
}),
@@ -3540,6 +3729,7 @@ async fn chatwidget_markdown_code_blocks_vt100_snapshot() {
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3588,6 +3778,7 @@ printf 'fenced within fenced\n'
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
});
// Drive commit ticks and drain emitted history lines into the vt100 buffer.
@@ -3611,6 +3802,7 @@ printf 'fenced within fenced\n'
// Finalize the stream without sending a final AgentMessage, to flush any tail.
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3628,6 +3820,7 @@ async fn chatwidget_tall() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),

View File

@@ -246,6 +246,10 @@ impl AgentMessageCell {
is_first_line,
}
}
pub(crate) fn replay_snapshot(&self) -> (Vec<Line<'static>>, bool) {
(self.lines.clone(), self.is_first_line)
}
}
impl HistoryCell for AgentMessageCell {
@@ -1537,6 +1541,11 @@ pub(crate) fn new_info_event(message: String, hint: Option<String>) -> PlainHist
PlainHistoryCell { lines }
}
pub(crate) fn new_agent_view_header(agent_id: String) -> PlainHistoryCell {
let line: Line<'static> = format!("Viewing agent `{agent_id}`").cyan().into();
PlainHistoryCell { lines: vec![line] }
}
pub(crate) fn new_error_event(message: String) -> PlainHistoryCell {
// Use a hair space (U+200A) to create a subtle, near-invisible separation
// before the text. VS16 is intentionally omitted to keep spacing tighter

View File

@@ -2735,6 +2735,7 @@ mod tests {
app.chat_widget.handle_codex_event(Event {
id: String::new(),
agent_id: None,
msg: EventMsg::SessionConfigured(event),
});

View File

@@ -24,6 +24,7 @@ use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::CreditsSnapshot;
use codex_core::protocol::DEFAULT_AGENT_ID;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
@@ -1632,6 +1633,7 @@ impl ChatWidget {
self.app_event_tx.send(AppEvent::CodexEvent(Event {
id: "1".to_string(),
agent_id: None,
// msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
// call_id: "1".to_string(),
// command: vec!["git".into(), "apply".into()],
@@ -1797,10 +1799,25 @@ impl ChatWidget {
}
pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
let Event { id, msg, agent_id } = event;
if !self.should_handle_agent_event(agent_id.as_deref(), &msg) {
return;
}
self.dispatch_event_msg(Some(id), msg, false);
}
fn should_handle_agent_event(&self, agent_id: Option<&str>, msg: &EventMsg) -> bool {
let agent_id = agent_id.unwrap_or(&DEFAULT_AGENT_ID);
if agent_id == *DEFAULT_AGENT_ID {
return true;
}
matches!(
msg,
EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_)
)
}
/// Dispatch a protocol `EventMsg` to the appropriate handler.
///
/// `id` is `Some` for live events and `None` for replayed events from

View File

@@ -36,6 +36,7 @@ pub(crate) fn spawn_agent(
eprintln!("{message}");
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
id: "".to_string(),
agent_id: None,
msg: EventMsg::Error(err.to_error_event(None)),
}));
app_event_tx_clone.send(AppEvent::ExitRequest);
@@ -48,6 +49,7 @@ pub(crate) fn spawn_agent(
let ev = codex_core::protocol::Event {
// The `id` does not matter for rendering, so we can use a fake value.
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
@@ -85,6 +87,7 @@ pub(crate) fn spawn_agent_from_existing(
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_core::protocol::Event {
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));

View File

@@ -125,6 +125,7 @@ async fn resumed_initial_messages_render_history() {
chat.handle_codex_event(Event {
id: "initial".into(),
agent_id: None,
msg: EventMsg::SessionConfigured(configured),
});
@@ -157,6 +158,7 @@ async fn entered_review_mode_uses_request_hint() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -178,6 +180,7 @@ async fn entered_review_mode_defaults_to_current_changes_banner() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::UncommittedChanges,
user_facing_hint: None,
@@ -201,6 +204,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_review_tokens, context_window)),
rate_limits: None,
@@ -210,6 +214,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -220,6 +225,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-review".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(review_tokens, context_window)),
rate_limits: None,
@@ -229,6 +235,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-end".into(),
agent_id: None,
msg: EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
review_output: None,
}),
@@ -249,6 +256,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_compact_tokens, context_window)),
rate_limits: None,
@@ -258,6 +266,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-cleared".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: None,
@@ -288,6 +297,7 @@ async fn context_indicator_shows_used_tokens_when_window_unknown() {
chat.handle_codex_event(Event {
id: "token-usage".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(token_info),
rate_limits: None,
@@ -723,6 +733,7 @@ async fn exec_approval_emits_proposed_command_and_decision_history() {
};
chat.handle_codex_event(Event {
id: "sub-short".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -767,6 +778,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-multi".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_multi),
});
let proposed_multi = drain_insert_history(&mut rx);
@@ -817,6 +829,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-long".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_long),
});
let proposed_long = drain_insert_history(&mut rx);
@@ -859,6 +872,7 @@ fn begin_exec_with_source(
};
chat.handle_codex_event(Event {
id: call_id.to_string(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(event.clone()),
});
event
@@ -892,6 +906,7 @@ fn end_exec(
} = begin_event;
chat.handle_codex_event(Event {
id: call_id.clone(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
process_id,
@@ -1147,6 +1162,7 @@ async fn exec_end_without_begin_uses_event_command() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "call-orphan".to_string(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "call-orphan".to_string(),
process_id: None,
@@ -1332,6 +1348,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent {
message: Some("Undo requested for the last turn...".to_string()),
}),
@@ -1343,6 +1360,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: true,
message: None,
@@ -1369,6 +1387,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
assert!(
@@ -1378,6 +1397,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: false,
message: Some("Failed to restore workspace state.".to_string()),
@@ -1404,6 +1424,7 @@ async fn undo_started_hides_interrupt_hint() {
chat.handle_codex_event(Event {
id: "turn-hint".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
@@ -1527,6 +1548,7 @@ async fn view_image_tool_call_adds_history_cell() {
chat.handle_codex_event(Event {
id: "sub-image".into(),
agent_id: None,
msg: EventMsg::ViewImageToolCall(ViewImageToolCallEvent {
call_id: "call-image".into(),
path: image_path,
@@ -1552,6 +1574,7 @@ async fn interrupt_exec_marks_failed_snapshot() {
// cause the active exec cell to be finalized as failed and flushed.
chat.handle_codex_event(Event {
id: "call-int".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -1577,6 +1600,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Simulate an in-progress task so the widget is in a running state.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -1585,6 +1609,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Abort the turn (like pressing Esc) and drain inserted history.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2056,6 +2081,7 @@ async fn approval_modal_exec_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
// Render to a fixed-size test terminal and snapshot.
@@ -2107,6 +2133,7 @@ async fn approval_modal_exec_without_reason_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-noreason".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2147,6 +2174,7 @@ async fn approval_modal_patch_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-patch".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -2181,6 +2209,7 @@ async fn interrupt_restores_queued_messages_into_composer() {
// Deliver a TurnAborted event with Interrupted reason (as if Esc was pressed).
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2219,6 +2248,7 @@ async fn interrupt_prepends_queued_messages_before_existing_composer_text() {
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2264,12 +2294,14 @@ async fn ui_snapshots_small_heights_task_running() {
// Activate status line
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Thinking**".into(),
}),
@@ -2295,6 +2327,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Begin a running task so the status indicator would be active.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2302,6 +2335,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Provide a deterministic header for the status line.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2324,6 +2358,7 @@ async fn status_widget_and_approval_modal_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-exec".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2347,6 +2382,7 @@ async fn status_widget_active_snapshot() {
// Activate the status indicator by simulating a task start.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2354,6 +2390,7 @@ async fn status_widget_active_snapshot() {
// Provide a deterministic header via a bold reasoning chunk.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2375,6 +2412,7 @@ async fn mcp_startup_header_booting_snapshot() {
chat.handle_codex_event(Event {
id: "mcp-1".into(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(McpStartupUpdateEvent {
server: "alpha".into(),
status: McpStartupStatus::Starting,
@@ -2396,6 +2434,7 @@ async fn background_event_updates_status_header() {
chat.handle_codex_event(Event {
id: "bg-1".into(),
agent_id: None,
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
message: "Waiting for `vim`".to_string(),
}),
@@ -2427,6 +2466,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
let cells = drain_insert_history(&mut rx);
@@ -2467,6 +2507,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(begin),
});
let cells = drain_insert_history(&mut rx);
@@ -2495,6 +2536,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(end),
});
let cells = drain_insert_history(&mut rx);
@@ -2517,6 +2559,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2536,6 +2579,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2566,6 +2610,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2589,6 +2634,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2626,6 +2672,7 @@ async fn apply_patch_approval_sends_op_with_submission_id() {
};
chat.handle_codex_event(Event {
id: "sub-123".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -2657,6 +2704,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2698,6 +2746,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2712,6 +2761,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2737,6 +2787,7 @@ async fn apply_patch_untrusted_shows_approval_modal() {
);
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2786,6 +2837,7 @@ async fn apply_patch_request_shows_diff_summary() {
);
chat.handle_codex_event(Event {
id: "sub-apply".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-apply".into(),
turn_id: "turn-apply".into(),
@@ -2856,6 +2908,7 @@ async fn plan_update_renders_history_cell() {
};
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::PlanUpdate(update),
});
let cells = drain_insert_history(&mut rx);
@@ -2877,6 +2930,7 @@ async fn stream_error_updates_status_indicator() {
let msg = "Reconnecting... 2/5";
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: msg.to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -2900,6 +2954,7 @@ async fn warning_event_adds_warning_history_cell() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::Warning(WarningEvent {
message: "test warning message".to_string(),
}),
@@ -2919,6 +2974,7 @@ async fn stream_recovery_restores_previous_status_header() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "task".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2926,6 +2982,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "retry".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: "Reconnecting... 1/5".to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -2934,6 +2991,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "delta".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "hello".to_string(),
}),
@@ -2954,6 +3012,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Begin turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2962,6 +3021,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// First finalized assistant message
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "First message".into(),
}),
@@ -2970,6 +3030,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Second finalized assistant message in the same turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Second message".into(),
}),
@@ -2978,6 +3039,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// End turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3008,12 +3070,14 @@ async fn final_reasoning_then_message_without_deltas_are_rendered() {
// No deltas; only final reasoning followed by final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "I will first analyze the request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3035,24 +3099,28 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Stream some reasoning deltas first.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "I will ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "first analyze the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "request.".into(),
}),
@@ -3061,12 +3129,14 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Then stream answer deltas, followed by the exact same final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "Here is the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "result.".into(),
}),
@@ -3074,6 +3144,7 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3097,6 +3168,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent { message: "Im going to search the repo for where “Change Approved” is rendered to update that view.".into() }),
});
@@ -3116,6 +3188,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "c1".into(),
process_id: None,
@@ -3129,6 +3202,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "c1".into(),
process_id: None,
@@ -3148,12 +3222,14 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Investigating rendering code**".into(),
}),
@@ -3192,6 +3268,7 @@ async fn chatwidget_markdown_code_blocks_vt100_snapshot() {
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3240,6 +3317,7 @@ printf 'fenced within fenced\n'
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
});
// Drive commit ticks and drain emitted history lines into the vt100 buffer.
@@ -3263,6 +3341,7 @@ printf 'fenced within fenced\n'
// Finalize the stream without sending a final AgentMessage, to flush any tail.
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3280,6 +3359,7 @@ async fn chatwidget_tall() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),