Compare commits

...

15 Commits

Author SHA1 Message Date
jif-oai
39ec8e0906 NIT fix 2025-12-23 11:31:24 +01:00
jif-oai
c26c29be9b Fix stuff 2025-12-23 11:02:54 +01:00
jif-oai
105effb569 Merge remote-tracking branch 'origin/jif/multi-agent-2' into jif/multi-agent-3
# Conflicts:
#	codex-rs/core/src/codex.rs
2025-12-23 10:52:41 +01:00
jif-oai
ec1de7e8c0 Merge branch 'main' into jif/multi-agent-2 2025-12-23 10:49:12 +01:00
jif-oai
96da76b77c NIT 3 2025-12-23 10:48:20 +01:00
jif-oai
16e3312eff NIT 2 2025-12-23 10:43:00 +01:00
jif-oai
9c84be7841 NIT 2025-12-23 10:41:40 +01:00
jif-oai
7f4b324ce2 Fix test 2025-12-23 10:35:24 +01:00
jif-oai
50cb2bea55 NIT clean 2025-12-23 10:03:20 +01:00
jif-oai
43b6d3e631 Move agent state 2025-12-18 15:26:03 +00:00
jif-oai
c2f57b605a Doc 2025-12-18 15:21:39 +00:00
jif-oai
b9331372ad chore[multi-agent]: multiple agents running 2025-12-18 14:51:41 +00:00
jif-oai
9c859b4e85 base 3 2025-12-18 14:50:41 +00:00
jif-oai
090e5cd95e More baase 2025-12-18 14:27:28 +00:00
jif-oai
61474b285c base 2025-12-18 14:07:39 +00:00
37 changed files with 1001 additions and 290 deletions

View File

@@ -90,6 +90,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let Event {
id: event_turn_id,
msg,
..
} = event;
match msg {
EventMsg::TaskComplete(_ev) => {

View File

@@ -151,6 +151,7 @@ use codex_protocol::protocol::GitInfo as CoreGitInfo;
use codex_protocol::protocol::McpAuthStatus as CoreMcpAuthStatus;
use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionMetaLine;
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::user_input::UserInput as CoreInputItem;
@@ -1592,7 +1593,18 @@ impl CodexMessageProcessor {
.await;
return;
}
InitialHistory::Forked(history.into_iter().map(RolloutItem::ResponseItem).collect())
InitialHistory::Forked(
history
.into_iter()
.map(|item| RolloutLine {
timestamp: Utc::now()
.format("%Y-%m-%dT%H:%M:%S%.3fZ")
.to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect(),
)
} else if let Some(path) = path {
match RolloutRecorder::get_rollout_history(&path).await {
Ok(initial_history) => initial_history,
@@ -2298,7 +2310,16 @@ impl CodexMessageProcessor {
} else {
match history {
Some(history) if !history.is_empty() => InitialHistory::Forked(
history.into_iter().map(RolloutItem::ResponseItem).collect(),
history
.into_iter()
.map(|item| RolloutLine {
timestamp: Utc::now()
.format("%Y-%m-%dT%H:%M:%S%.3fZ")
.to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect(),
),
Some(_) | None => {
self.send_invalid_request_error(
@@ -3594,6 +3615,7 @@ mod tests {
let line = RolloutLine {
timestamp: timestamp.clone(),
agent_id: None,
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: session_meta.clone(),
git: None,

File diff suppressed because it is too large Load Diff

View File

@@ -182,14 +182,17 @@ async fn forward_events(
// ignore all legacy delta events
Event {
id: _,
agent_id: _,
msg: EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_),
} => {}
Event {
id: _,
agent_id: _,
msg: EventMsg::SessionConfigured(_),
} => {}
Event {
id,
agent_id: _,
msg: EventMsg::ExecApprovalRequest(event),
} => {
// Initiate approval via parent session; do not surface to consumer.
@@ -205,6 +208,7 @@ async fn forward_events(
}
Event {
id,
agent_id: _,
msg: EventMsg::ApplyPatchApprovalRequest(event),
} => {
handle_patch_approval(
@@ -372,6 +376,7 @@ mod tests {
tx_out
.send(Event {
id: "full".to_string(),
agent_id: None,
msg: EventMsg::TurnAborted(TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -391,6 +396,7 @@ mod tests {
tx_events
.send(Event {
id: "evt".to_string(),
agent_id: None,
msg: EventMsg::RawResponseItem(RawResponseItemEvent {
item: ResponseItem::CustomToolCall {
id: None,

View File

@@ -68,7 +68,7 @@ async fn run_compact_task_inner(
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
let mut history = sess.clone_history(&turn_context.agent_id).await;
history.record_items(
&[initial_input_for_turn.into()],
turn_context.truncation_policy,
@@ -92,7 +92,8 @@ async fn run_compact_task_inner(
final_output_json_schema: turn_context.final_output_json_schema.clone(),
truncation_policy: Some(turn_context.truncation_policy.into()),
});
sess.persist_rollout_items(&[rollout_item]).await;
sess.persist_rollout_items(&turn_context.agent_id, &[rollout_item])
.await;
loop {
let turn_input = history.get_history_for_prompt();
@@ -155,7 +156,10 @@ async fn run_compact_task_inner(
}
}
let history_snapshot = sess.clone_history().await.get_history();
let history_snapshot = sess
.clone_history(&turn_context.agent_id)
.await
.get_history();
let summary_suffix =
get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
@@ -169,14 +173,16 @@ async fn run_compact_task_inner(
.cloned()
.collect();
new_history.extend(ghost_snapshots);
sess.replace_history(new_history).await;
sess.replace_history(&turn_context.agent_id, new_history)
.await;
sess.recompute_token_usage(&turn_context).await;
let rollout_item = RolloutItem::Compacted(CompactedItem {
message: summary_text.clone(),
replacement_history: None,
});
sess.persist_rollout_items(&[rollout_item]).await;
sess.persist_rollout_items(&turn_context.agent_id, &[rollout_item])
.await;
let event = EventMsg::ContextCompacted(ContextCompactedEvent {});
sess.send_event(&turn_context, event).await;

View File

@@ -40,7 +40,7 @@ async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
let mut history = sess.clone_history().await;
let mut history = sess.clone_history(&turn_context.agent_id).await;
let prompt = Prompt {
input: history.get_history_for_prompt(),
tools: vec![],
@@ -64,15 +64,19 @@ async fn run_remote_compact_task_inner_impl(
if !ghost_snapshots.is_empty() {
new_history.extend(ghost_snapshots);
}
sess.replace_history(new_history.clone()).await;
sess.replace_history(&turn_context.agent_id, new_history.clone())
.await;
sess.recompute_token_usage(turn_context).await;
let compacted_item = CompactedItem {
message: String::new(),
replacement_history: Some(new_history),
};
sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;
sess.persist_rollout_items(
&turn_context.agent_id,
&[RolloutItem::Compacted(compacted_item)],
)
.await;
let event = EventMsg::ContextCompacted(ContextCompactedEvent {});
sess.send_event(turn_context, event).await;

View File

@@ -20,8 +20,10 @@ use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::protocol::DEFAULT_AGENT_ID;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionSource;
use std::collections::HashMap;
use std::path::PathBuf;
@@ -146,6 +148,7 @@ impl ConversationManager {
Event {
id,
msg: EventMsg::SessionConfigured(session_configured),
..
} if id == INITIAL_SUBMIT_ID => session_configured,
_ => {
return Err(CodexErr::SessionConfiguredNotFirstEvent);
@@ -267,12 +270,16 @@ impl ConversationManager {
/// (0-based) and all items that follow it.
fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> InitialHistory {
// Work directly on rollout items, and cut the vector at the nth user message input.
let items: Vec<RolloutItem> = history.get_rollout_items();
let lines: Vec<RolloutLine> = history.get_rollout_lines();
let root_lines: Vec<RolloutLine> = lines
.into_iter()
.filter(|line| line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID) == *DEFAULT_AGENT_ID)
.collect();
// Find indices of user message inputs in rollout order.
let mut user_positions: Vec<usize> = Vec::new();
for (idx, item) in items.iter().enumerate() {
if let RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) = item
for (idx, line) in root_lines.iter().enumerate() {
if let RolloutItem::ResponseItem(item @ ResponseItem::Message { .. }) = &line.item
&& matches!(
crate::event_mapping::parse_turn_item(item),
Some(TurnItem::UserMessage(_))
@@ -289,7 +296,7 @@ fn truncate_before_nth_user_message(history: InitialHistory, n: usize) -> Initia
// Cut strictly before the nth user message (do not keep the nth itself).
let cut_idx = user_positions[n];
let rolled: Vec<RolloutItem> = items.into_iter().take(cut_idx).collect();
let rolled: Vec<RolloutLine> = root_lines.into_iter().take(cut_idx).collect();
if rolled.is_empty() {
InitialHistory::New
@@ -353,13 +360,17 @@ mod tests {
];
// Wrap as InitialHistory::Forked with response items only.
let initial: Vec<RolloutItem> = items
let initial: Vec<RolloutLine> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.map(|item| RolloutLine {
timestamp: "t".to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(initial), 1);
let got_items = truncated.get_rollout_items();
let got_items = truncated.get_rollout_items_for_agent(&DEFAULT_AGENT_ID);
let expected_items = vec![
RolloutItem::ResponseItem(items[0].clone()),
RolloutItem::ResponseItem(items[1].clone()),
@@ -370,10 +381,14 @@ mod tests {
serde_json::to_value(&expected_items).unwrap()
);
let initial2: Vec<RolloutItem> = items
let initial2: Vec<RolloutLine> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.map(|item| RolloutLine {
timestamp: "t".to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect();
let truncated2 = truncate_before_nth_user_message(InitialHistory::Forked(initial2), 2);
assert_matches!(truncated2, InitialHistory::New);
@@ -388,14 +403,18 @@ mod tests {
items.push(user_msg("second question"));
items.push(assistant_msg("answer"));
let rollout_items: Vec<RolloutItem> = items
let rollout_items: Vec<RolloutLine> = items
.iter()
.cloned()
.map(RolloutItem::ResponseItem)
.map(|item| RolloutLine {
timestamp: "t".to_string(),
agent_id: None,
item: RolloutItem::ResponseItem(item),
})
.collect();
let truncated = truncate_before_nth_user_message(InitialHistory::Forked(rollout_items), 1);
let got_items = truncated.get_rollout_items();
let got_items = truncated.get_rollout_items_for_agent(&DEFAULT_AGENT_ID);
let expected: Vec<RolloutItem> = vec![
RolloutItem::ResponseItem(items[0].clone()),

View File

@@ -20,6 +20,7 @@ use crate::error::CodexErr;
use crate::error::Result;
use crate::error::SandboxErr;
use crate::get_platform_sandbox;
use crate::protocol::AgentId;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecCommandOutputDeltaEvent;
@@ -123,6 +124,7 @@ pub enum SandboxType {
#[derive(Clone)]
/// Destination for streaming command output: carries the identifiers that
/// emitted `Event`s are stamped with, plus the channel they are sent on.
pub struct StdoutStream {
    /// Submission id echoed as the `id` of every emitted event.
    pub sub_id: String,
    /// Agent that owns the running command; attached to emitted events as `Some(agent_id)`.
    pub agent_id: AgentId,
    /// Tool-call id of the invocation producing this output.
    // NOTE(review): usage of `call_id` is outside this view — presumably it keys
    // the output deltas to a specific tool call; confirm at the emit site.
    pub call_id: String,
    /// Channel used to send output events back to the session's event queue.
    pub tx_event: Sender<Event>,
}
@@ -715,6 +717,7 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
});
let event = Event {
id: stream.sub_id.clone(),
agent_id: Some(stream.agent_id.clone()),
msg,
};
#[allow(clippy::let_unit_value)]

View File

@@ -159,6 +159,7 @@ impl ElicitationRequestManager {
let _ = tx_event
.send(Event {
id: "mcp_elicitation_request".to_string(),
agent_id: None,
msg: EventMsg::ElicitationRequest(ElicitationRequestEvent {
server_name,
id,
@@ -373,6 +374,7 @@ impl McpConnectionManager {
let _ = tx_event
.send(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
agent_id: None,
msg: EventMsg::McpStartupComplete(summary),
})
.await;
@@ -664,6 +666,7 @@ async fn emit_update(
tx_event
.send(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(update),
})
.await

View File

@@ -5,7 +5,7 @@
//! JSON-Lines tooling. Each record has the following schema:
//!
//! ````text
//! {"conversation_id":"<uuid>","ts":<unix_seconds>,"text":"<message>"}
//! {"conversation_id":"<uuid>","agent_id":"<agent>","ts":<unix_seconds>,"text":"<message>"}
//! ````
//!
//! To minimise the chance of interleaved writes when multiple processes are
@@ -54,6 +54,8 @@ const RETRY_SLEEP: Duration = Duration::from_millis(100);
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
/// One JSON-Lines record of the persisted message history file.
pub struct HistoryEntry {
    /// Conversation/session this message belongs to.
    // NOTE(review): the module-level schema comment shows the key
    // `conversation_id`, but this field serializes as `session_id` —
    // confirm which name is intended.
    pub session_id: String,
    /// Agent that produced the message; omitted from the JSON when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub agent_id: Option<String>,
    /// Unix timestamp (seconds) at which the entry was appended.
    pub ts: u64,
    /// The message text.
    pub text: String,
}
@@ -69,6 +71,7 @@ fn history_filepath(config: &Config) -> PathBuf {
/// which entails a small amount of blocking I/O internally.
pub(crate) async fn append_entry(
text: &str,
agent_id: Option<&str>,
conversation_id: &ConversationId,
config: &Config,
) -> Result<()> {
@@ -99,6 +102,7 @@ pub(crate) async fn append_entry(
// Construct the JSON line first so we can write it in a single syscall.
let entry = HistoryEntry {
session_id: conversation_id.to_string(),
agent_id: agent_id.map(str::to_string),
ts,
text: text.to_string(),
};
@@ -416,11 +420,13 @@ mod tests {
let entries = vec![
HistoryEntry {
session_id: "first-session".to_string(),
agent_id: None,
ts: 1,
text: "first".to_string(),
},
HistoryEntry {
session_id: "second-session".to_string(),
agent_id: None,
ts: 2,
text: "second".to_string(),
},
@@ -451,11 +457,13 @@ mod tests {
let initial = HistoryEntry {
session_id: "first-session".to_string(),
agent_id: None,
ts: 1,
text: "first".to_string(),
};
let appended = HistoryEntry {
session_id: "second-session".to_string(),
agent_id: None,
ts: 2,
text: "second".to_string(),
};
@@ -504,7 +512,7 @@ mod tests {
let history_path = codex_home.path().join("history.jsonl");
append_entry(&entry_one, &conversation_id, &config)
append_entry(&entry_one, None, &conversation_id, &config)
.await
.expect("write first entry");
@@ -514,7 +522,7 @@ mod tests {
config.history.max_bytes =
Some(usize::try_from(limit_bytes).expect("limit should fit into usize"));
append_entry(&entry_two, &conversation_id, &config)
append_entry(&entry_two, None, &conversation_id, &config)
.await
.expect("write second entry");
@@ -551,13 +559,13 @@ mod tests {
let history_path = codex_home.path().join("history.jsonl");
append_entry(&short_entry, &conversation_id, &config)
append_entry(&short_entry, None, &conversation_id, &config)
.await
.expect("write first entry");
let short_entry_len = std::fs::metadata(&history_path).expect("metadata").len();
append_entry(&long_entry, &conversation_id, &config)
append_entry(&long_entry, None, &conversation_id, &config)
.await
.expect("write second entry");
@@ -572,7 +580,7 @@ mod tests {
.expect("max bytes should fit into usize"),
);
append_entry(&long_entry, &conversation_id, &config)
append_entry(&long_entry, None, &conversation_id, &config)
.await
.expect("write third entry");

View File

@@ -26,6 +26,7 @@ use super::policy::is_persisted_response_item;
use crate::config::Config;
use crate::default_client::originator;
use crate::git_info::collect_git_info;
use codex_protocol::protocol::AgentId;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::ResumedHistory;
use codex_protocol::protocol::RolloutItem;
@@ -62,7 +63,10 @@ pub enum RolloutRecorderParams {
}
enum RolloutCmd {
AddItems(Vec<RolloutItem>),
AddItems {
agent_id: String,
items: Vec<RolloutItem>,
},
/// Ensure all prior writes are processed; respond when flushed.
Flush {
ack: oneshot::Sender<()>,
@@ -177,7 +181,11 @@ impl RolloutRecorder {
Ok(Self { tx, rollout_path })
}
pub(crate) async fn record_items(&self, items: &[RolloutItem]) -> std::io::Result<()> {
pub(crate) async fn record_items(
&self,
agent_id: &AgentId,
items: &[RolloutItem],
) -> std::io::Result<()> {
let mut filtered = Vec::new();
for item in items {
// Note that function calls may look a bit strange if they are
@@ -191,7 +199,10 @@ impl RolloutRecorder {
return Ok(());
}
self.tx
.send(RolloutCmd::AddItems(filtered))
.send(RolloutCmd::AddItems {
agent_id: agent_id.to_string(),
items: filtered,
})
.await
.map_err(|e| IoError::other(format!("failed to queue rollout items: {e}")))
}
@@ -214,7 +225,7 @@ impl RolloutRecorder {
return Err(IoError::other("empty session file"));
}
let mut items: Vec<RolloutItem> = Vec::new();
let mut lines: Vec<RolloutLine> = Vec::new();
let mut conversation_id: Option<ConversationId> = None;
for line in text.lines() {
if line.trim().is_empty() {
@@ -228,30 +239,17 @@ impl RolloutRecorder {
}
};
// Parse the rollout line structure
match serde_json::from_value::<RolloutLine>(v.clone()) {
Ok(rollout_line) => match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
Ok(rollout_line) => {
if let RolloutItem::SessionMeta(session_meta_line) = &rollout_line.item
&& conversation_id.is_none()
{
// Use the FIRST SessionMeta encountered in the file as the canonical
// conversation id and main session information. Keep all items intact.
if conversation_id.is_none() {
conversation_id = Some(session_meta_line.meta.id);
}
items.push(RolloutItem::SessionMeta(session_meta_line));
conversation_id = Some(session_meta_line.meta.id);
}
RolloutItem::ResponseItem(item) => {
items.push(RolloutItem::ResponseItem(item));
}
RolloutItem::Compacted(item) => {
items.push(RolloutItem::Compacted(item));
}
RolloutItem::TurnContext(item) => {
items.push(RolloutItem::TurnContext(item));
}
RolloutItem::EventMsg(_ev) => {
items.push(RolloutItem::EventMsg(_ev));
}
},
lines.push(rollout_line);
}
Err(e) => {
warn!("failed to parse rollout line: {v:?}, error: {e}");
}
@@ -260,20 +258,20 @@ impl RolloutRecorder {
info!(
"Resumed rollout with {} items, conversation ID: {:?}",
items.len(),
lines.len(),
conversation_id
);
let conversation_id = conversation_id
.ok_or_else(|| IoError::other("failed to parse conversation ID from rollout file"))?;
if items.is_empty() {
if lines.is_empty() {
return Ok(InitialHistory::New);
}
info!("Resumed rollout successfully from {path:?}");
Ok(InitialHistory::Resumed(ResumedHistory {
conversation_id,
history: items,
history: lines,
rollout_path: path.to_path_buf(),
}))
}
@@ -364,17 +362,19 @@ async fn rollout_writer(
// Write the SessionMeta as the first item in the file, wrapped in a rollout line
writer
.write_rollout_item(RolloutItem::SessionMeta(session_meta_line))
.write_rollout_item(None, RolloutItem::SessionMeta(session_meta_line))
.await?;
}
// Process rollout commands
while let Some(cmd) = rx.recv().await {
match cmd {
RolloutCmd::AddItems(items) => {
RolloutCmd::AddItems { agent_id, items } => {
for item in items {
if is_persisted_response_item(&item) {
writer.write_rollout_item(item).await?;
writer
.write_rollout_item(Some(agent_id.as_str()), item)
.await?;
}
}
}
@@ -400,7 +400,11 @@ struct JsonlWriter {
}
impl JsonlWriter {
async fn write_rollout_item(&mut self, rollout_item: RolloutItem) -> std::io::Result<()> {
async fn write_rollout_item(
&mut self,
agent_id: Option<&str>,
rollout_item: RolloutItem,
) -> std::io::Result<()> {
let timestamp_format: &[FormatItem] = format_description!(
"[year]-[month]-[day]T[hour]:[minute]:[second].[subsecond digits:3]Z"
);
@@ -410,6 +414,7 @@ impl JsonlWriter {
let line = RolloutLine {
timestamp,
agent_id: agent_id.map(str::to_string),
item: rollout_item,
};
self.write_line(&line).await

View File

@@ -582,6 +582,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
let conversation_id = ConversationId::from_string(&uuid.to_string())?;
let meta_line = RolloutLine {
timestamp: ts.to_string(),
agent_id: None,
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: conversation_id,
@@ -600,6 +601,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
let user_event_line = RolloutLine {
timestamp: ts.to_string(),
agent_id: None,
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "hello".into(),
images: None,
@@ -611,6 +613,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
for idx in 0..total_messages {
let response_line = RolloutLine {
timestamp: format!("{ts}-{idx:02}"),
agent_id: None,
item: RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "assistant".into(),

View File

@@ -0,0 +1,28 @@
use crate::codex::SessionConfiguration;
use crate::state::ActiveTurn;
use crate::state::SessionState;
use codex_protocol::protocol::AgentId;
use tokio::sync::Mutex;

/// Mutable state owned by a single agent and shared across async tasks.
///
/// Callers hold this behind an `Arc`; interior mutability therefore comes
/// from the `Mutex` fields rather than from further `Arc` wrapping.
pub(crate) struct AgentState {
    pub(crate) agent_id: AgentId,
    /// This agent's session configuration and conversation history.
    pub(crate) state: Mutex<SessionState>,
    /// The agent's currently running turn, if any — turns are tracked
    /// per-agent so each agent can run its own task.
    pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
}

impl AgentState {
    /// Build a fresh agent: state seeded from `session_configuration`,
    /// with no turn in flight.
    pub(crate) fn new(agent_id: AgentId, session_configuration: SessionConfiguration) -> Self {
        let state = Mutex::new(SessionState::new(session_configuration));
        let active_turn = Mutex::new(None);
        Self {
            agent_id,
            state,
            active_turn,
        }
    }
}

View File

@@ -1,7 +1,9 @@
mod agent;
mod service;
mod session;
mod turn;
pub(crate) use agent::AgentState;
pub(crate) use service::SessionServices;
pub(crate) use session::SessionState;
pub(crate) use turn::ActiveTurn;

View File

@@ -8,14 +8,6 @@ mod user_shell;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::select;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::trace;
use tracing::warn;
use crate::AuthManager;
use crate::codex::Session;
use crate::codex::TurnContext;
@@ -27,7 +19,15 @@ use crate::protocol::TurnAbortedEvent;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_protocol::protocol::AgentId;
use codex_protocol::user_input::UserInput;
use tokio::select;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::trace;
use tracing::warn;
pub(crate) use compact::CompactTask;
pub(crate) use ghost_snapshot::GhostSnapshotTask;
@@ -109,7 +109,8 @@ impl Session {
input: Vec<UserInput>,
task: T,
) {
self.abort_all_tasks(TurnAbortReason::Replaced).await;
self.abort_agent_tasks(&turn_context.agent_id, TurnAbortReason::Replaced)
.await;
let task: Arc<dyn SessionTask> = Arc::new(task);
let task_kind = task.kind();
@@ -152,14 +153,25 @@ impl Session {
cancellation_token,
turn_context: Arc::clone(&turn_context),
};
self.register_new_active_task(running_task).await;
self.register_new_active_task(&turn_context.agent_id, running_task)
.await;
}
pub async fn abort_agent_tasks(self: &Arc<Self>, agent_id: &AgentId, reason: TurnAbortReason) {
for task in self.take_running_tasks_for_agent(agent_id).await {
self.handle_task_abort(task, reason.clone()).await;
}
self.maybe_close_unified_exec_sessions().await;
}
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
for task in self.take_all_running_tasks().await {
self.handle_task_abort(task, reason.clone()).await;
for agent in self.agent_states().await {
let agent_id = agent.agent_id.clone();
for task in self.take_running_tasks_for_agent(&agent_id).await {
self.handle_task_abort(task, reason.clone()).await;
}
}
self.close_unified_exec_sessions().await;
self.maybe_close_unified_exec_sessions().await;
}
pub async fn on_task_finished(
@@ -167,32 +179,31 @@ impl Session {
turn_context: Arc<TurnContext>,
last_agent_message: Option<String>,
) {
let mut active = self.active_turn.lock().await;
let should_close_sessions = if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
let agent = self.get_or_create_agent(&turn_context.agent_id).await;
{
*active = None;
true
} else {
false
};
drop(active);
if should_close_sessions {
self.close_unified_exec_sessions().await;
let mut active = agent.active_turn.lock().await;
if let Some(at) = active.as_mut()
&& at.remove_task(&turn_context.sub_id)
{
*active = None;
}
}
self.maybe_close_unified_exec_sessions().await;
let event = EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message });
self.send_event(turn_context.as_ref(), event).await;
}
async fn register_new_active_task(&self, task: RunningTask) {
let mut active = self.active_turn.lock().await;
async fn register_new_active_task(&self, agent_id: &AgentId, task: RunningTask) {
let agent = self.get_or_create_agent(agent_id).await;
let mut active = agent.active_turn.lock().await;
let mut turn = ActiveTurn::default();
turn.add_task(task);
*active = Some(turn);
}
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
let mut active = self.active_turn.lock().await;
async fn take_running_tasks_for_agent(&self, agent_id: &AgentId) -> Vec<RunningTask> {
let agent = self.get_or_create_agent(agent_id).await;
let mut active = agent.active_turn.lock().await;
match active.take() {
Some(mut at) => {
at.clear_pending().await;
@@ -203,7 +214,10 @@ impl Session {
}
}
async fn close_unified_exec_sessions(&self) {
async fn maybe_close_unified_exec_sessions(&self) {
if self.has_active_tasks().await {
return;
}
self.services
.unified_exec_manager
.terminate_all_sessions()

View File

@@ -59,7 +59,7 @@ impl SessionTask for UndoTask {
return None;
}
let mut history = sess.clone_history().await;
let mut history = sess.clone_history(&ctx.agent_id).await;
let mut items = history.get_history();
let mut completed = UndoCompletedEvent {
success: false,
@@ -96,7 +96,7 @@ impl SessionTask for UndoTask {
match restore_result {
Ok(Ok(())) => {
items.remove(idx);
sess.replace_history(items).await;
sess.replace_history(&ctx.agent_id, items).await;
let short_id: String = commit_id.chars().take(7).collect();
info!(commit_id = commit_id, "Undo restored ghost snapshot");
completed.success = true;

View File

@@ -108,6 +108,7 @@ impl SessionTask for UserShellCommandTask {
let stdout_stream = Some(StdoutStream {
sub_id: turn_context.sub_id.clone(),
agent_id: turn_context.agent_id.clone(),
call_id: call_id.clone(),
tx_event: session.get_tx_event(),
});

View File

@@ -65,7 +65,10 @@ impl ToolHandler for ViewImageHandler {
let event_path = abs_path.clone();
session
.inject_input(vec![UserInput::LocalImage { path: abs_path }])
.inject_input(
&turn.agent_id,
vec![UserInput::LocalImage { path: abs_path }],
)
.await
.map_err(|_| {
FunctionCallError::RespondToModel(

View File

@@ -71,6 +71,7 @@ impl ApplyPatchRuntime {
fn stdout_stream(ctx: &ToolCtx<'_>) -> Option<crate::exec::StdoutStream> {
Some(crate::exec::StdoutStream {
sub_id: ctx.turn.sub_id.clone(),
agent_id: ctx.turn.agent_id.clone(),
call_id: ctx.call_id.clone(),
tx_event: ctx.session.get_tx_event(),
})

View File

@@ -56,6 +56,7 @@ impl ShellRuntime {
fn stdout_stream(ctx: &ToolCtx<'_>) -> Option<crate::exec::StdoutStream> {
Some(crate::exec::StdoutStream {
sub_id: ctx.turn.sub_id.clone(),
agent_id: ctx.turn.agent_id.clone(),
call_id: ctx.call_id.clone(),
tx_event: ctx.session.get_tx_event(),
})

View File

@@ -8,6 +8,7 @@ use codex_core::protocol::EventMsg;
use codex_core::protocol::InitialHistory;
use codex_core::protocol::ResumedHistory;
use codex_core::protocol::RolloutItem;
use codex_core::protocol::RolloutLine;
use codex_core::protocol::TurnContextItem;
use codex_core::protocol::WarningEvent;
use codex_protocol::ConversationId;
@@ -37,7 +38,11 @@ fn resume_history(
InitialHistory::Resumed(ResumedHistory {
conversation_id: ConversationId::default(),
history: vec![RolloutItem::TurnContext(turn_ctx)],
history: vec![RolloutLine {
timestamp: String::new(),
agent_id: None,
item: RolloutItem::TurnContext(turn_ctx),
}],
rollout_path: rollout_path.to_path_buf(),
})
}

View File

@@ -20,9 +20,10 @@ These are entities exit on the codex backend. The intent of this section is to e
- `Codex` starts with no `Session`, and it is initialized by `Op::ConfigureSession`, which should be the first message sent by the UI.
- The current `Session` can be reconfigured with additional `Op::ConfigureSession` calls.
- Any running execution is aborted when the session is reconfigured.
- A session hosts one or more agents, each with its own conversation history and turn loop.
3. `Task`
- A `Task` is `Codex` executing work in response to user input.
- `Session` has at most one `Task` running at a time.
- Each agent has at most one `Task` running at a time (tasks can run concurrently across agents).
- Receiving `Op::UserInput` starts a `Task`
- Consists of a series of `Turn`s
- The `Task` executes until:
@@ -44,7 +45,7 @@ The term "UI" is used to refer to the application driving `Codex`. This may be t
When a `Turn` completes, the `response_id` from the `Model`'s final `response.completed` message is stored in the `Session` state to resume the thread given the next `Op::UserInput`. The `response_id` is also returned in the `EventMsg::TurnComplete` to the UI, which can be used to fork the thread from an earlier point by providing it in the `Op::UserInput`.
Since only 1 `Task` can be run at a time, for parallel tasks it is recommended that a single `Codex` be run for each thread of work.
Since each agent can only run 1 `Task` at a time, parallel tasks should be routed to different agents (or separate Codex sessions).
## Interface
@@ -58,6 +59,7 @@ Since only 1 `Task` can be run at a time, for parallel tasks it is recommended t
- `Event`
- These are messages sent on the `EQ` (`Codex` -> UI)
- Each `Event` has a non-unique ID, matching the `sub_id` from the `Op::UserInput` that started the current task.
- `Event` includes an optional `agent_id`; when omitted, clients should treat it as the default agent (`root`).
- `EventMsg` refers to the enum of all possible `Event` payloads
- This enum is `non_exhaustive`; variants can be added at future dates
- It should be expected that new `EventMsg` variants will be added over time to expose more detailed information about the model's actions.
@@ -66,6 +68,7 @@ For complete documentation of the `Op` and `EventMsg` variants, refer to [protoc
- `Op`
- `Op::UserInput` Any input from the user to kick off a `Task`
- `Op::ForAgent` Route a nested `Op` to a specific agent (defaults to the `root` agent when omitted)
- `Op::Interrupt` Interrupts a running task
- `Op::ExecApproval` Approve or deny code execution
- `Op::ListSkills` Request skills for one or more cwd values (optionally `force_reload`)

View File

@@ -160,7 +160,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
fn process_event(&mut self, event: Event) -> CodexStatus {
let Event { id: _, msg } = event;
let Event { id: _, msg, .. } = event;
match msg {
EventMsg::Error(ErrorEvent { message, .. }) => {
let prefix = "ERROR:".style(self.red);

View File

@@ -510,6 +510,7 @@ impl EventProcessor for EventProcessorWithJsonOutput {
fn print_config_summary(&mut self, _: &Config, _: &str, ev: &SessionConfiguredEvent) {
self.process_event(Event {
id: "".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(ev.clone()),
});
}

View File

@@ -61,6 +61,7 @@ use std::time::Duration;
/// Test helper: wrap `msg` in an `Event` with the given submission id and no
/// agent attribution.
fn event(id: &str, msg: EventMsg) -> Event {
    let id = id.to_string();
    Event {
        id,
        agent_id: None,
        msg,
    }
}

View File

@@ -68,6 +68,7 @@ pub async fn run_codex_tool_session(
let session_configured_event = Event {
// Use a fake id value for now.
id: "".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(session_configured.clone()),
};
outgoing

View File

@@ -255,6 +255,7 @@ mod tests {
let rollout_file = NamedTempFile::new()?;
let event = Event {
id: "1".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: conversation_id,
model: "gpt-4o".to_string(),
@@ -309,6 +310,7 @@ mod tests {
};
let event = Event {
id: "1".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(session_configured_event.clone()),
};
let meta = OutgoingNotificationMeta {

View File

@@ -6,6 +6,9 @@ use ts_rs::TS;
/// One entry of the persisted message history.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS)]
pub struct HistoryEntry {
/// Identifier of the conversation this entry belongs to.
pub conversation_id: String,
/// Agent that produced this entry; `None` means no agent was recorded
/// (presumably the default/root agent — confirm against writers of this type).
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub agent_id: Option<String>,
/// Entry timestamp — assumed to be Unix seconds; TODO confirm units at call sites.
pub ts: u64,
/// Text content of the entry.
pub text: String,
}

View File

@@ -8,6 +8,7 @@ use std::fmt;
use std::path::Path;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::LazyLock;
use std::time::Duration;
use crate::ConversationId;
@@ -50,6 +51,9 @@ pub const USER_INSTRUCTIONS_CLOSE_TAG: &str = "</user_instructions>";
pub const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = "<environment_context>";
pub const ENVIRONMENT_CONTEXT_CLOSE_TAG: &str = "</environment_context>";
pub const USER_MESSAGE_BEGIN: &str = "## My request for Codex:";
/// Identifier used to address a specific agent within a session.
pub type AgentId = String;
/// Agent id used when an event, rollout line, or `Op` specifies no agent.
pub static DEFAULT_AGENT_ID: LazyLock<AgentId> = LazyLock::new(|| "root".to_string());
/// Submission Queue Entry - requests from user
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
@@ -66,6 +70,13 @@ pub struct Submission {
#[allow(clippy::large_enum_variant)]
#[non_exhaustive]
pub enum Op {
/// Route an operation to a specific agent.
ForAgent {
/// Identifier of the target agent.
agent_id: AgentId,
/// Operation to route to the agent.
op: Box<Op>,
},
/// Abort current task.
/// This server sends [`EventMsg::TurnAborted`] in response.
Interrupt,
@@ -513,10 +524,14 @@ impl SandboxPolicy {
}
/// Event Queue Entry - events from agent
#[derive(Debug, Clone, Deserialize, Serialize)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct Event {
/// Submission `id` that this event is correlated with.
pub id: String,
/// Optional agent identifier associated with this event.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub agent_id: Option<AgentId>,
/// Payload
pub msg: EventMsg,
}
@@ -1158,7 +1173,7 @@ pub struct ConversationPathResponseEvent {
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct ResumedHistory {
pub conversation_id: ConversationId,
pub history: Vec<RolloutItem>,
pub history: Vec<RolloutLine>,
pub rollout_path: PathBuf,
}
@@ -1166,37 +1181,63 @@ pub struct ResumedHistory {
pub enum InitialHistory {
New,
Resumed(ResumedHistory),
Forked(Vec<RolloutItem>),
Forked(Vec<RolloutLine>),
}
impl InitialHistory {
pub fn get_rollout_items(&self) -> Vec<RolloutItem> {
pub fn get_rollout_lines(&self) -> Vec<RolloutLine> {
match self {
InitialHistory::New => Vec::new(),
InitialHistory::Resumed(resumed) => resumed.history.clone(),
InitialHistory::Forked(items) => items.clone(),
InitialHistory::Forked(lines) => lines.clone(),
}
}
/// Collect the rollout items attributed to `agent_id`. Lines with no
/// explicit agent id are treated as belonging to the default ("root") agent.
pub fn get_rollout_items_for_agent(&self, agent_id: &AgentId) -> Vec<RolloutItem> {
    let mut items = Vec::new();
    for line in self.get_rollout_lines() {
        let owner = line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID);
        if owner == agent_id {
            items.push(line.item);
        }
    }
    items
}
/// Event messages recorded for the default ("root") agent, if any.
pub fn get_event_msgs(&self) -> Option<Vec<EventMsg>> {
self.get_event_msgs_for_agent(&DEFAULT_AGENT_ID)
}
pub fn get_event_msgs_for_agent(&self, agent_id: &AgentId) -> Option<Vec<EventMsg>> {
match self {
InitialHistory::New => None,
InitialHistory::Resumed(resumed) => Some(
resumed
.history
.iter()
.filter_map(|ri| match ri {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
.filter_map(|line| {
let line_agent = line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID);
if line_agent != agent_id {
return None;
}
match &line.item {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
}
})
.collect(),
),
InitialHistory::Forked(items) => Some(
items
InitialHistory::Forked(lines) => Some(
lines
.iter()
.filter_map(|ri| match ri {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
.filter_map(|line| {
let line_agent = line.agent_id.as_deref().unwrap_or(&DEFAULT_AGENT_ID);
if line_agent != agent_id {
return None;
}
match &line.item {
RolloutItem::EventMsg(ev) => Some(ev.clone()),
_ => None,
}
})
.collect(),
),
@@ -1343,9 +1384,12 @@ pub enum TruncationPolicy {
Tokens(usize),
}
#[derive(Serialize, Deserialize, Clone, JsonSchema)]
/// One persisted line of a rollout recording.
#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema, TS)]
pub struct RolloutLine {
/// Timestamp string recorded with the line — exact format not visible here.
pub timestamp: String,
/// Agent that produced this line; consumers treat `None` as the default
/// ("root") agent when filtering history by agent.
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub agent_id: Option<AgentId>,
/// Payload, flattened into the same JSON object as the metadata fields.
#[serde(flatten)]
pub item: RolloutItem,
}
@@ -1931,6 +1975,7 @@ mod tests {
let rollout_file = NamedTempFile::new()?;
let event = Event {
id: "1234".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: conversation_id,
model: "codex-mini-latest".to_string(),
@@ -1990,6 +2035,7 @@ mod tests {
fn serialize_mcp_startup_update_event() -> Result<()> {
let event = Event {
id: "init".to_string(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(McpStartupUpdateEvent {
server: "srv".to_string(),
status: McpStartupStatus::Failed {
@@ -2010,6 +2056,7 @@ mod tests {
fn serialize_mcp_startup_complete_event() -> Result<()> {
let event = Event {
id: "init".to_string(),
agent_id: None,
msg: EventMsg::McpStartupComplete(McpStartupCompleteEvent {
ready: vec!["a".to_string()],
failed: vec![McpStartupFailure {

View File

@@ -1624,6 +1624,7 @@ mod tests {
app.chat_widget.handle_codex_event(Event {
id: String::new(),
agent_id: None,
msg: EventMsg::SessionConfigured(event),
});

View File

@@ -26,6 +26,7 @@ use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::CreditsSnapshot;
use codex_core::protocol::DEFAULT_AGENT_ID;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
@@ -1807,6 +1808,7 @@ impl ChatWidget {
self.app_event_tx.send(AppEvent::CodexEvent(Event {
id: "1".to_string(),
agent_id: None,
// msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
// call_id: "1".to_string(),
// command: vec!["git".into(), "apply".into()],
@@ -1994,10 +1996,25 @@ impl ChatWidget {
}
pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
let Event { id, msg, agent_id } = event;
if !self.should_handle_agent_event(agent_id.as_deref(), &msg) {
return;
}
self.dispatch_event_msg(Some(id), msg, false);
}
/// Decide whether this widget should process an event attributed to `agent_id`
/// (where `None` falls back to the default agent). Root-agent events are always
/// handled; events from other agents are surfaced only when they require user
/// approval.
fn should_handle_agent_event(&self, agent_id: Option<&str>, msg: &EventMsg) -> bool {
    let owner = agent_id.unwrap_or(&DEFAULT_AGENT_ID);
    if owner == *DEFAULT_AGENT_ID {
        true
    } else {
        matches!(
            msg,
            EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_)
        )
    }
}
/// Dispatch a protocol `EventMsg` to the appropriate handler.
///
/// `id` is `Some` for live events and `None` for replayed events from

View File

@@ -36,6 +36,7 @@ pub(crate) fn spawn_agent(
eprintln!("{message}");
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
id: "".to_string(),
agent_id: None,
msg: EventMsg::Error(err.to_error_event(None)),
}));
app_event_tx_clone.send(AppEvent::ExitRequest);
@@ -48,6 +49,7 @@ pub(crate) fn spawn_agent(
let ev = codex_core::protocol::Event {
// The `id` does not matter for rendering, so we can use a fake value.
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
@@ -85,6 +87,7 @@ pub(crate) fn spawn_agent_from_existing(
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_core::protocol::Event {
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));

View File

@@ -127,6 +127,7 @@ async fn resumed_initial_messages_render_history() {
chat.handle_codex_event(Event {
id: "initial".into(),
agent_id: None,
msg: EventMsg::SessionConfigured(configured),
});
@@ -159,6 +160,7 @@ async fn entered_review_mode_uses_request_hint() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -180,6 +182,7 @@ async fn entered_review_mode_defaults_to_current_changes_banner() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::UncommittedChanges,
user_facing_hint: None,
@@ -203,6 +206,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_review_tokens, context_window)),
rate_limits: None,
@@ -212,6 +216,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -222,6 +227,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-review".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(review_tokens, context_window)),
rate_limits: None,
@@ -231,6 +237,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-end".into(),
agent_id: None,
msg: EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
review_output: None,
}),
@@ -251,6 +258,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_compact_tokens, context_window)),
rate_limits: None,
@@ -260,6 +268,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-cleared".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: None,
@@ -290,6 +299,7 @@ async fn context_indicator_shows_used_tokens_when_window_unknown() {
chat.handle_codex_event(Event {
id: "token-usage".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(token_info),
rate_limits: None,
@@ -727,6 +737,7 @@ async fn exec_approval_emits_proposed_command_and_decision_history() {
};
chat.handle_codex_event(Event {
id: "sub-short".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -771,6 +782,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-multi".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_multi),
});
let proposed_multi = drain_insert_history(&mut rx);
@@ -821,6 +833,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-long".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_long),
});
let proposed_long = drain_insert_history(&mut rx);
@@ -863,6 +876,7 @@ fn begin_exec_with_source(
};
chat.handle_codex_event(Event {
id: call_id.to_string(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(event.clone()),
});
event
@@ -932,6 +946,7 @@ fn end_exec(
} = begin_event;
chat.handle_codex_event(Event {
id: call_id.clone(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
process_id,
@@ -1187,6 +1202,7 @@ async fn exec_end_without_begin_uses_event_command() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "call-orphan".to_string(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "call-orphan".to_string(),
process_id: None,
@@ -1497,6 +1513,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent {
message: Some("Undo requested for the last turn...".to_string()),
}),
@@ -1508,6 +1525,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: true,
message: None,
@@ -1534,6 +1552,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
assert!(
@@ -1543,6 +1562,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: false,
message: Some("Failed to restore workspace state.".to_string()),
@@ -1569,6 +1589,7 @@ async fn undo_started_hides_interrupt_hint() {
chat.handle_codex_event(Event {
id: "turn-hint".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
@@ -1692,6 +1713,7 @@ async fn view_image_tool_call_adds_history_cell() {
chat.handle_codex_event(Event {
id: "sub-image".into(),
agent_id: None,
msg: EventMsg::ViewImageToolCall(ViewImageToolCallEvent {
call_id: "call-image".into(),
path: image_path,
@@ -1717,6 +1739,7 @@ async fn interrupt_exec_marks_failed_snapshot() {
// cause the active exec cell to be finalized as failed and flushed.
chat.handle_codex_event(Event {
id: "call-int".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -1742,6 +1765,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Simulate an in-progress task so the widget is in a running state.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -1750,6 +1774,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Abort the turn (like pressing Esc) and drain inserted history.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2394,6 +2419,7 @@ async fn approval_modal_exec_snapshot() -> anyhow::Result<()> {
};
chat.handle_codex_event(Event {
id: "sub-approve".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
// Render to a fixed-size test terminal and snapshot.
@@ -2447,6 +2473,7 @@ async fn approval_modal_exec_without_reason_snapshot() -> anyhow::Result<()> {
};
chat.handle_codex_event(Event {
id: "sub-approve-noreason".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2489,6 +2516,7 @@ async fn approval_modal_patch_snapshot() -> anyhow::Result<()> {
};
chat.handle_codex_event(Event {
id: "sub-approve-patch".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -2525,6 +2553,7 @@ async fn interrupt_restores_queued_messages_into_composer() {
// Deliver a TurnAborted event with Interrupted reason (as if Esc was pressed).
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2563,6 +2592,7 @@ async fn interrupt_prepends_queued_messages_before_existing_composer_text() {
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2608,12 +2638,14 @@ async fn ui_snapshots_small_heights_task_running() {
// Activate status line
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Thinking**".into(),
}),
@@ -2639,6 +2671,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Begin a running task so the status indicator would be active.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2646,6 +2679,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Provide a deterministic header for the status line.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2668,6 +2702,7 @@ async fn status_widget_and_approval_modal_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-exec".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2691,6 +2726,7 @@ async fn status_widget_active_snapshot() {
// Activate the status indicator by simulating a task start.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2698,6 +2734,7 @@ async fn status_widget_active_snapshot() {
// Provide a deterministic header via a bold reasoning chunk.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2719,6 +2756,7 @@ async fn mcp_startup_header_booting_snapshot() {
chat.handle_codex_event(Event {
id: "mcp-1".into(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(McpStartupUpdateEvent {
server: "alpha".into(),
status: McpStartupStatus::Starting,
@@ -2740,6 +2778,7 @@ async fn background_event_updates_status_header() {
chat.handle_codex_event(Event {
id: "bg-1".into(),
agent_id: None,
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
message: "Waiting for `vim`".to_string(),
}),
@@ -2771,6 +2810,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
let cells = drain_insert_history(&mut rx);
@@ -2811,6 +2851,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(begin),
});
let cells = drain_insert_history(&mut rx);
@@ -2839,6 +2880,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(end),
});
let cells = drain_insert_history(&mut rx);
@@ -2861,6 +2903,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2880,6 +2923,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2910,6 +2954,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2933,6 +2978,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2970,6 +3016,7 @@ async fn apply_patch_approval_sends_op_with_submission_id() {
};
chat.handle_codex_event(Event {
id: "sub-123".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -3001,6 +3048,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3042,6 +3090,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3056,6 +3105,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3081,6 +3131,7 @@ async fn apply_patch_untrusted_shows_approval_modal() -> anyhow::Result<()> {
);
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3132,6 +3183,7 @@ async fn apply_patch_request_shows_diff_summary() -> anyhow::Result<()> {
);
chat.handle_codex_event(Event {
id: "sub-apply".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-apply".into(),
turn_id: "turn-apply".into(),
@@ -3204,6 +3256,7 @@ async fn plan_update_renders_history_cell() {
};
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::PlanUpdate(update),
});
let cells = drain_insert_history(&mut rx);
@@ -3225,6 +3278,7 @@ async fn stream_error_updates_status_indicator() {
let msg = "Reconnecting... 2/5";
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: msg.to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -3248,6 +3302,7 @@ async fn warning_event_adds_warning_history_cell() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::Warning(WarningEvent {
message: "test warning message".to_string(),
}),
@@ -3267,6 +3322,7 @@ async fn stream_recovery_restores_previous_status_header() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "task".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3274,6 +3330,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "retry".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: "Reconnecting... 1/5".to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -3282,6 +3339,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "delta".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "hello".to_string(),
}),
@@ -3302,6 +3360,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Begin turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3310,6 +3369,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// First finalized assistant message
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "First message".into(),
}),
@@ -3318,6 +3378,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Second finalized assistant message in the same turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Second message".into(),
}),
@@ -3326,6 +3387,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// End turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3356,12 +3418,14 @@ async fn final_reasoning_then_message_without_deltas_are_rendered() {
// No deltas; only final reasoning followed by final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "I will first analyze the request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3383,24 +3447,28 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Stream some reasoning deltas first.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "I will ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "first analyze the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "request.".into(),
}),
@@ -3409,12 +3477,14 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Then stream answer deltas, followed by the exact same final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "Here is the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "result.".into(),
}),
@@ -3422,6 +3492,7 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3445,6 +3516,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent { message: "Im going to search the repo for where “Change Approved” is rendered to update that view.".into() }),
});
@@ -3464,6 +3536,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "c1".into(),
process_id: None,
@@ -3477,6 +3550,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "c1".into(),
process_id: None,
@@ -3496,12 +3570,14 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Investigating rendering code**".into(),
}),
@@ -3540,6 +3616,7 @@ async fn chatwidget_markdown_code_blocks_vt100_snapshot() {
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3588,6 +3665,7 @@ printf 'fenced within fenced\n'
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
});
// Drive commit ticks and drain emitted history lines into the vt100 buffer.
@@ -3611,6 +3689,7 @@ printf 'fenced within fenced\n'
// Finalize the stream without sending a final AgentMessage, to flush any tail.
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3628,6 +3707,7 @@ async fn chatwidget_tall() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),

View File

@@ -2735,6 +2735,7 @@ mod tests {
app.chat_widget.handle_codex_event(Event {
id: String::new(),
agent_id: None,
msg: EventMsg::SessionConfigured(event),
});

View File

@@ -24,6 +24,7 @@ use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::CreditsSnapshot;
use codex_core::protocol::DEFAULT_AGENT_ID;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
@@ -1632,6 +1633,7 @@ impl ChatWidget {
self.app_event_tx.send(AppEvent::CodexEvent(Event {
id: "1".to_string(),
agent_id: None,
// msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
// call_id: "1".to_string(),
// command: vec!["git".into(), "apply".into()],
@@ -1797,10 +1799,25 @@ impl ChatWidget {
}
pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
let Event { id, msg, agent_id } = event;
if !self.should_handle_agent_event(agent_id.as_deref(), &msg) {
return;
}
self.dispatch_event_msg(Some(id), msg, false);
}
/// Decide whether this widget should process an event attributed to `agent_id`
/// (where `None` falls back to the default agent). Root-agent events are always
/// handled; events from other agents are surfaced only when they require user
/// approval.
fn should_handle_agent_event(&self, agent_id: Option<&str>, msg: &EventMsg) -> bool {
    let owner = agent_id.unwrap_or(&DEFAULT_AGENT_ID);
    if owner == *DEFAULT_AGENT_ID {
        true
    } else {
        matches!(
            msg,
            EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_)
        )
    }
}
/// Dispatch a protocol `EventMsg` to the appropriate handler.
///
/// `id` is `Some` for live events and `None` for replayed events from

View File

@@ -36,6 +36,7 @@ pub(crate) fn spawn_agent(
eprintln!("{message}");
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
id: "".to_string(),
agent_id: None,
msg: EventMsg::Error(err.to_error_event(None)),
}));
app_event_tx_clone.send(AppEvent::ExitRequest);
@@ -48,6 +49,7 @@ pub(crate) fn spawn_agent(
let ev = codex_core::protocol::Event {
// The `id` does not matter for rendering, so we can use a fake value.
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
@@ -85,6 +87,7 @@ pub(crate) fn spawn_agent_from_existing(
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_core::protocol::Event {
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));

View File

@@ -125,6 +125,7 @@ async fn resumed_initial_messages_render_history() {
chat.handle_codex_event(Event {
id: "initial".into(),
agent_id: None,
msg: EventMsg::SessionConfigured(configured),
});
@@ -157,6 +158,7 @@ async fn entered_review_mode_uses_request_hint() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -178,6 +180,7 @@ async fn entered_review_mode_defaults_to_current_changes_banner() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::UncommittedChanges,
user_facing_hint: None,
@@ -201,6 +204,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_review_tokens, context_window)),
rate_limits: None,
@@ -210,6 +214,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -220,6 +225,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-review".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(review_tokens, context_window)),
rate_limits: None,
@@ -229,6 +235,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-end".into(),
agent_id: None,
msg: EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
review_output: None,
}),
@@ -249,6 +256,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_compact_tokens, context_window)),
rate_limits: None,
@@ -258,6 +266,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-cleared".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: None,
@@ -288,6 +297,7 @@ async fn context_indicator_shows_used_tokens_when_window_unknown() {
chat.handle_codex_event(Event {
id: "token-usage".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(token_info),
rate_limits: None,
@@ -723,6 +733,7 @@ async fn exec_approval_emits_proposed_command_and_decision_history() {
};
chat.handle_codex_event(Event {
id: "sub-short".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -767,6 +778,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-multi".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_multi),
});
let proposed_multi = drain_insert_history(&mut rx);
@@ -817,6 +829,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-long".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_long),
});
let proposed_long = drain_insert_history(&mut rx);
@@ -859,6 +872,7 @@ fn begin_exec_with_source(
};
chat.handle_codex_event(Event {
id: call_id.to_string(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(event.clone()),
});
event
@@ -892,6 +906,7 @@ fn end_exec(
} = begin_event;
chat.handle_codex_event(Event {
id: call_id.clone(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
process_id,
@@ -1147,6 +1162,7 @@ async fn exec_end_without_begin_uses_event_command() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "call-orphan".to_string(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "call-orphan".to_string(),
process_id: None,
@@ -1332,6 +1348,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent {
message: Some("Undo requested for the last turn...".to_string()),
}),
@@ -1343,6 +1360,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: true,
message: None,
@@ -1369,6 +1387,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
assert!(
@@ -1378,6 +1397,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: false,
message: Some("Failed to restore workspace state.".to_string()),
@@ -1404,6 +1424,7 @@ async fn undo_started_hides_interrupt_hint() {
chat.handle_codex_event(Event {
id: "turn-hint".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
@@ -1527,6 +1548,7 @@ async fn view_image_tool_call_adds_history_cell() {
chat.handle_codex_event(Event {
id: "sub-image".into(),
agent_id: None,
msg: EventMsg::ViewImageToolCall(ViewImageToolCallEvent {
call_id: "call-image".into(),
path: image_path,
@@ -1552,6 +1574,7 @@ async fn interrupt_exec_marks_failed_snapshot() {
// cause the active exec cell to be finalized as failed and flushed.
chat.handle_codex_event(Event {
id: "call-int".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -1577,6 +1600,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Simulate an in-progress task so the widget is in a running state.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -1585,6 +1609,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Abort the turn (like pressing Esc) and drain inserted history.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2056,6 +2081,7 @@ async fn approval_modal_exec_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
// Render to a fixed-size test terminal and snapshot.
@@ -2107,6 +2133,7 @@ async fn approval_modal_exec_without_reason_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-noreason".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2147,6 +2174,7 @@ async fn approval_modal_patch_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-patch".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -2181,6 +2209,7 @@ async fn interrupt_restores_queued_messages_into_composer() {
// Deliver a TurnAborted event with Interrupted reason (as if Esc was pressed).
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2219,6 +2248,7 @@ async fn interrupt_prepends_queued_messages_before_existing_composer_text() {
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2264,12 +2294,14 @@ async fn ui_snapshots_small_heights_task_running() {
// Activate status line
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Thinking**".into(),
}),
@@ -2295,6 +2327,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Begin a running task so the status indicator would be active.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2302,6 +2335,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Provide a deterministic header for the status line.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2324,6 +2358,7 @@ async fn status_widget_and_approval_modal_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-exec".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2347,6 +2382,7 @@ async fn status_widget_active_snapshot() {
// Activate the status indicator by simulating a task start.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2354,6 +2390,7 @@ async fn status_widget_active_snapshot() {
// Provide a deterministic header via a bold reasoning chunk.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2375,6 +2412,7 @@ async fn mcp_startup_header_booting_snapshot() {
chat.handle_codex_event(Event {
id: "mcp-1".into(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(McpStartupUpdateEvent {
server: "alpha".into(),
status: McpStartupStatus::Starting,
@@ -2396,6 +2434,7 @@ async fn background_event_updates_status_header() {
chat.handle_codex_event(Event {
id: "bg-1".into(),
agent_id: None,
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
message: "Waiting for `vim`".to_string(),
}),
@@ -2427,6 +2466,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
let cells = drain_insert_history(&mut rx);
@@ -2467,6 +2507,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(begin),
});
let cells = drain_insert_history(&mut rx);
@@ -2495,6 +2536,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(end),
});
let cells = drain_insert_history(&mut rx);
@@ -2517,6 +2559,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2536,6 +2579,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2566,6 +2610,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2589,6 +2634,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2626,6 +2672,7 @@ async fn apply_patch_approval_sends_op_with_submission_id() {
};
chat.handle_codex_event(Event {
id: "sub-123".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -2657,6 +2704,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2698,6 +2746,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2712,6 +2761,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2737,6 +2787,7 @@ async fn apply_patch_untrusted_shows_approval_modal() {
);
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2786,6 +2837,7 @@ async fn apply_patch_request_shows_diff_summary() {
);
chat.handle_codex_event(Event {
id: "sub-apply".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-apply".into(),
turn_id: "turn-apply".into(),
@@ -2856,6 +2908,7 @@ async fn plan_update_renders_history_cell() {
};
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::PlanUpdate(update),
});
let cells = drain_insert_history(&mut rx);
@@ -2877,6 +2930,7 @@ async fn stream_error_updates_status_indicator() {
let msg = "Reconnecting... 2/5";
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: msg.to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -2900,6 +2954,7 @@ async fn warning_event_adds_warning_history_cell() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::Warning(WarningEvent {
message: "test warning message".to_string(),
}),
@@ -2919,6 +2974,7 @@ async fn stream_recovery_restores_previous_status_header() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "task".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2926,6 +2982,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "retry".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: "Reconnecting... 1/5".to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -2934,6 +2991,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "delta".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "hello".to_string(),
}),
@@ -2954,6 +3012,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Begin turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2962,6 +3021,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// First finalized assistant message
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "First message".into(),
}),
@@ -2970,6 +3030,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Second finalized assistant message in the same turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Second message".into(),
}),
@@ -2978,6 +3039,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// End turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3008,12 +3070,14 @@ async fn final_reasoning_then_message_without_deltas_are_rendered() {
// No deltas; only final reasoning followed by final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "I will first analyze the request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3035,24 +3099,28 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Stream some reasoning deltas first.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "I will ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "first analyze the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "request.".into(),
}),
@@ -3061,12 +3129,14 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Then stream answer deltas, followed by the exact same final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "Here is the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "result.".into(),
}),
@@ -3074,6 +3144,7 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3097,6 +3168,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent { message: "Im going to search the repo for where “Change Approved” is rendered to update that view.".into() }),
});
@@ -3116,6 +3188,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "c1".into(),
process_id: None,
@@ -3129,6 +3202,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "c1".into(),
process_id: None,
@@ -3148,12 +3222,14 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Investigating rendering code**".into(),
}),
@@ -3192,6 +3268,7 @@ async fn chatwidget_markdown_code_blocks_vt100_snapshot() {
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3240,6 +3317,7 @@ printf 'fenced within fenced\n'
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
});
// Drive commit ticks and drain emitted history lines into the vt100 buffer.
@@ -3263,6 +3341,7 @@ printf 'fenced within fenced\n'
// Finalize the stream without sending a final AgentMessage, to flush any tail.
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3280,6 +3359,7 @@ async fn chatwidget_tall() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),