Compare commits

...

4 Commits

Author SHA1 Message Date
jif-oai
ec1de7e8c0 Merge branch 'main' into jif/multi-agent-2 2025-12-23 10:49:12 +01:00
jif-oai
9c859b4e85 base 3 2025-12-18 14:50:41 +00:00
jif-oai
090e5cd95e More baase 2025-12-18 14:27:28 +00:00
jif-oai
61474b285c base 2025-12-18 14:07:39 +00:00
29 changed files with 344 additions and 57 deletions

View File

@@ -90,6 +90,7 @@ pub(crate) async fn apply_bespoke_event_handling(
let Event {
id: event_turn_id,
msg,
..
} = event;
match msg {
EventMsg::TaskComplete(_ev) => {

View File

@@ -32,6 +32,8 @@ use async_channel::Sender;
use codex_protocol::ConversationId;
use codex_protocol::approvals::ExecPolicyAmendment;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::AgentId;
use codex_protocol::protocol::DEFAULT_AGENT_ID;
use codex_protocol::protocol::FileChange;
use codex_protocol::protocol::HasLegacyEvent;
use codex_protocol::protocol::ItemCompletedEvent;
@@ -197,6 +199,7 @@ fn maybe_push_chat_wire_api_deprecation(
post_session_configured_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
agent_id: Some(DEFAULT_AGENT_ID.to_string()),
msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent {
summary: CHAT_WIRE_API_DEPRECATION_SUMMARY.to_string(),
details: None,
@@ -351,6 +354,7 @@ pub(crate) struct Session {
/// The context needed for a single turn of the conversation.
#[derive(Debug)]
pub(crate) struct TurnContext {
pub(crate) agent_id: AgentId,
pub(crate) sub_id: String,
pub(crate) client: ModelClient,
/// The session's current working directory. All relative paths provided by
@@ -486,6 +490,7 @@ impl Session {
per_turn_config: Config,
model_family: ModelFamily,
conversation_id: ConversationId,
agent_id: AgentId,
sub_id: String,
) -> TurnContext {
let otel_manager = otel_manager.clone().with_model(
@@ -512,6 +517,7 @@ impl Session {
});
TurnContext {
agent_id,
sub_id,
client,
cwd: session_configuration.cwd.clone(),
@@ -612,6 +618,7 @@ impl Session {
};
post_session_configured_events.push(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
agent_id: Some(DEFAULT_AGENT_ID.to_string()),
msg: EventMsg::DeprecationNotice(DeprecationNoticeEvent { summary, details }),
});
}
@@ -867,6 +874,7 @@ impl Session {
Ok(self
.new_turn_from_configuration(
agent_id,
sub_id,
session_configuration,
updates.final_output_json_schema,
@@ -877,6 +885,7 @@ impl Session {
async fn new_turn_from_configuration(
&self,
agent_id: &AgentId,
sub_id: String,
session_configuration: SessionConfiguration,
final_output_json_schema: Option<Option<Value>>,
@@ -915,6 +924,7 @@ impl Session {
per_turn_config,
model_family,
self.conversation_id,
agent_id.clone(),
sub_id,
);
if let Some(final_schema) = final_output_json_schema {
@@ -962,6 +972,7 @@ impl Session {
let legacy_source = msg.clone();
let event = Event {
id: turn_context.sub_id.clone(),
agent_id: Some(turn_context.agent_id.clone()),
msg,
};
self.send_event_raw(event).await;
@@ -970,6 +981,7 @@ impl Session {
for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) {
let legacy_event = Event {
id: turn_context.sub_id.clone(),
agent_id: Some(turn_context.agent_id.clone()),
msg: legacy,
};
self.send_event_raw(legacy_event).await;
@@ -1169,7 +1181,8 @@ impl Session {
items: &[ResponseItem],
) {
self.record_into_history(items, turn_context).await;
self.persist_rollout_response_items(items).await;
self.persist_rollout_response_items(&turn_context.agent_id, items)
.await;
self.send_raw_response_items(turn_context, items).await;
}
@@ -1296,7 +1309,7 @@ impl Session {
guard.clone()
};
if let Some(rec) = recorder
&& let Err(e) = rec.record_items(items).await
&& let Err(e) = rec.record_items(agent_id, items).await
{
error!("failed to record rollout items: {e:#}");
}
@@ -1326,7 +1339,7 @@ impl Session {
pub(crate) async fn recompute_token_usage(&self, turn_context: &TurnContext) {
let Some(estimated_total_tokens) = self
.clone_history()
.clone_history(&turn_context.agent_id)
.await
.estimate_token_count(turn_context)
else {
@@ -1575,14 +1588,15 @@ impl Session {
async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiver<Submission>) {
// Seed with context in case there is an OverrideTurnContext first.
let mut previous_context: Option<Arc<TurnContext>> = Some(sess.new_default_turn().await);
let mut previous_context: Option<Arc<TurnContext>> =
Some(sess.new_default_turn_for_agent(&agent_id).await);
// To break out of this loop, send Op::Shutdown.
while let Ok(sub) = rx_sub.recv().await {
debug!(?sub, "Submission");
match sub.op.clone() {
Op::Interrupt => {
handlers::interrupt(&sess).await;
handlers::interrupt(&sess, &agent_id).await;
}
Op::OverrideTurnContext {
cwd,
@@ -1594,6 +1608,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
} => {
handlers::override_turn_context(
&sess,
&agent_id,
sub.id.clone(),
SessionSettingsUpdate {
cwd,
@@ -1608,8 +1623,14 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
.await;
}
Op::UserInput { .. } | Op::UserTurn { .. } => {
handlers::user_input_or_turn(&sess, sub.id.clone(), sub.op, &mut previous_context)
.await;
handlers::user_input_or_turn(
&sess,
&agent_id,
sub.id.clone(),
sub.op,
&mut previous_context,
)
.await;
}
Op::ExecApproval { id, decision } => {
handlers::exec_approval(&sess, id, decision).await;
@@ -1618,30 +1639,38 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
handlers::patch_approval(&sess, id, decision).await;
}
Op::AddToHistory { text } => {
handlers::add_to_history(&sess, &config, text).await;
handlers::add_to_history(&sess, &agent_id, &config, text).await;
}
Op::GetHistoryEntryRequest { offset, log_id } => {
handlers::get_history_entry_request(&sess, &config, sub.id.clone(), offset, log_id)
.await;
handlers::get_history_entry_request(
&sess,
&agent_id,
&config,
sub.id.clone(),
offset,
log_id,
)
.await;
}
Op::ListMcpTools => {
handlers::list_mcp_tools(&sess, &config, sub.id.clone()).await;
handlers::list_mcp_tools(&sess, &agent_id, &config, sub.id.clone()).await;
}
Op::ListCustomPrompts => {
handlers::list_custom_prompts(&sess, sub.id.clone()).await;
handlers::list_custom_prompts(&sess, &agent_id, sub.id.clone()).await;
}
Op::ListSkills { cwds, force_reload } => {
handlers::list_skills(&sess, sub.id.clone(), cwds, force_reload).await;
handlers::list_skills(&sess, &agent_id, sub.id.clone(), cwds, force_reload).await;
}
Op::Undo => {
handlers::undo(&sess, sub.id.clone()).await;
handlers::undo(&sess, &agent_id, sub.id.clone()).await;
}
Op::Compact => {
handlers::compact(&sess, sub.id.clone()).await;
handlers::compact(&sess, &agent_id, sub.id.clone()).await;
}
Op::RunUserShellCommand { command } => {
handlers::run_user_shell_command(
&sess,
&agent_id,
sub.id.clone(),
command,
&mut previous_context,
@@ -1661,7 +1690,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
}
}
Op::Review { review_request } => {
handlers::review(&sess, &config, sub.id.clone(), review_request).await;
handlers::review(&sess, &agent_id, &config, sub.id.clone(), review_request).await;
}
_ => {} // Ignore unknown ops; enum is non_exhaustive to allow extensions.
}
@@ -1714,6 +1743,7 @@ mod handlers {
pub async fn override_turn_context(
sess: &Session,
agent_id: &AgentId,
sub_id: String,
updates: SessionSettingsUpdate,
) {
@@ -1731,6 +1761,7 @@ mod handlers {
pub async fn user_input_or_turn(
sess: &Arc<Session>,
agent_id: &AgentId,
sub_id: String,
op: Op,
previous_context: &mut Option<Arc<TurnContext>>,
@@ -1787,6 +1818,7 @@ mod handlers {
pub async fn run_user_shell_command(
sess: &Arc<Session>,
agent_id: &AgentId,
sub_id: String,
command: String,
previous_context: &mut Option<Arc<TurnContext>>,
@@ -1863,7 +1895,12 @@ mod handlers {
}
}
pub async fn add_to_history(sess: &Arc<Session>, config: &Arc<Config>, text: String) {
pub async fn add_to_history(
sess: &Arc<Session>,
agent_id: &AgentId,
config: &Arc<Config>,
text: String,
) {
let id = sess.conversation_id;
let config = Arc::clone(config);
tokio::spawn(async move {
@@ -1875,6 +1912,7 @@ mod handlers {
pub async fn get_history_entry_request(
sess: &Arc<Session>,
agent_id: &AgentId,
config: &Arc<Config>,
sub_id: String,
offset: usize,
@@ -1910,7 +1948,12 @@ mod handlers {
});
}
pub async fn list_mcp_tools(sess: &Session, config: &Arc<Config>, sub_id: String) {
pub async fn list_mcp_tools(
sess: &Session,
agent_id: &AgentId,
config: &Arc<Config>,
sub_id: String,
) {
let mcp_connection_manager = sess.services.mcp_connection_manager.read().await;
let snapshot = collect_mcp_snapshot_from_manager(
&mcp_connection_manager,
@@ -1928,7 +1971,7 @@ mod handlers {
sess.send_event_raw(event).await;
}
pub async fn list_custom_prompts(sess: &Session, sub_id: String) {
pub async fn list_custom_prompts(sess: &Session, agent_id: &str, sub_id: String) {
let custom_prompts: Vec<CustomPrompt> =
if let Some(dir) = crate::custom_prompts::default_prompts_dir() {
crate::custom_prompts::discover_prompts_in(&dir).await
@@ -1947,6 +1990,7 @@ mod handlers {
pub async fn list_skills(
sess: &Session,
agent_id: &AgentId,
sub_id: String,
cwds: Vec<PathBuf>,
force_reload: bool,
@@ -2044,6 +2088,7 @@ mod handlers {
pub async fn review(
sess: &Arc<Session>,
agent_id: &AgentId,
config: &Arc<Config>,
sub_id: String,
review_request: ReviewRequest,
@@ -2745,12 +2790,15 @@ mod tests {
use codex_protocol::models::FunctionCallOutputPayload;
use super::AgentState;
use crate::protocol::CompactedItem;
use crate::protocol::CreditsSnapshot;
use crate::protocol::DEFAULT_AGENT_ID;
use crate::protocol::InitialHistory;
use crate::protocol::RateLimitSnapshot;
use crate::protocol::RateLimitWindow;
use crate::protocol::ResumedHistory;
use crate::protocol::RolloutLine;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
@@ -2777,6 +2825,7 @@ mod tests {
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration as StdDuration;
use tokio::sync::RwLock;
#[tokio::test]
async fn reconstruct_history_matches_live_compactions() {
@@ -3238,6 +3287,7 @@ mod tests {
per_turn_config,
model_family,
conversation_id,
agent_id,
"turn_id".to_string(),
));
@@ -3477,7 +3527,7 @@ mod tests {
let initial_context = session.build_initial_context(turn_context);
for item in &initial_context {
rollout_items.push(RolloutItem::ResponseItem(item.clone()));
push_line(RolloutItem::ResponseItem(item.clone()));
}
live_history.record_items(initial_context.iter(), turn_context.truncation_policy);
@@ -3489,7 +3539,7 @@ mod tests {
}],
};
live_history.record_items(std::iter::once(&user1), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(user1.clone()));
push_line(RolloutItem::ResponseItem(user1.clone()));
let assistant1 = ResponseItem::Message {
id: None,
@@ -3499,7 +3549,7 @@ mod tests {
}],
};
live_history.record_items(std::iter::once(&assistant1), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(assistant1.clone()));
push_line(RolloutItem::ResponseItem(assistant1.clone()));
let summary1 = "summary one";
let snapshot1 = live_history.get_history();
@@ -3510,7 +3560,7 @@ mod tests {
summary1,
);
live_history.replace(rebuilt1);
rollout_items.push(RolloutItem::Compacted(CompactedItem {
push_line(RolloutItem::Compacted(CompactedItem {
message: summary1.to_string(),
replacement_history: None,
}));
@@ -3523,7 +3573,7 @@ mod tests {
}],
};
live_history.record_items(std::iter::once(&user2), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(user2.clone()));
push_line(RolloutItem::ResponseItem(user2.clone()));
let assistant2 = ResponseItem::Message {
id: None,
@@ -3533,7 +3583,7 @@ mod tests {
}],
};
live_history.record_items(std::iter::once(&assistant2), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(assistant2.clone()));
push_line(RolloutItem::ResponseItem(assistant2.clone()));
let summary2 = "summary two";
let snapshot2 = live_history.get_history();
@@ -3544,7 +3594,7 @@ mod tests {
summary2,
);
live_history.replace(rebuilt2);
rollout_items.push(RolloutItem::Compacted(CompactedItem {
push_line(RolloutItem::Compacted(CompactedItem {
message: summary2.to_string(),
replacement_history: None,
}));
@@ -3557,7 +3607,7 @@ mod tests {
}],
};
live_history.record_items(std::iter::once(&user3), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(user3.clone()));
push_line(RolloutItem::ResponseItem(user3.clone()));
let assistant3 = ResponseItem::Message {
id: None,
@@ -3567,7 +3617,7 @@ mod tests {
}],
};
live_history.record_items(std::iter::once(&assistant3), turn_context.truncation_policy);
rollout_items.push(RolloutItem::ResponseItem(assistant3.clone()));
push_line(RolloutItem::ResponseItem(assistant3.clone()));
(rollout_items, live_history.get_history())
}

View File

@@ -182,14 +182,17 @@ async fn forward_events(
// ignore all legacy delta events
Event {
id: _,
agent_id: _,
msg: EventMsg::AgentMessageDelta(_) | EventMsg::AgentReasoningDelta(_),
} => {}
Event {
id: _,
agent_id: _,
msg: EventMsg::SessionConfigured(_),
} => {}
Event {
id,
agent_id: _,
msg: EventMsg::ExecApprovalRequest(event),
} => {
// Initiate approval via parent session; do not surface to consumer.
@@ -205,6 +208,7 @@ async fn forward_events(
}
Event {
id,
agent_id: _,
msg: EventMsg::ApplyPatchApprovalRequest(event),
} => {
handle_patch_approval(
@@ -372,6 +376,7 @@ mod tests {
tx_out
.send(Event {
id: "full".to_string(),
agent_id: None,
msg: EventMsg::TurnAborted(TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -391,6 +396,7 @@ mod tests {
tx_events
.send(Event {
id: "evt".to_string(),
agent_id: None,
msg: EventMsg::RawResponseItem(RawResponseItemEvent {
item: ResponseItem::CustomToolCall {
id: None,

View File

@@ -68,7 +68,7 @@ async fn run_compact_task_inner(
) {
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
let mut history = sess.clone_history().await;
let mut history = sess.clone_history(&turn_context.agent_id).await;
history.record_items(
&[initial_input_for_turn.into()],
turn_context.truncation_policy,
@@ -92,7 +92,8 @@ async fn run_compact_task_inner(
final_output_json_schema: turn_context.final_output_json_schema.clone(),
truncation_policy: Some(turn_context.truncation_policy.into()),
});
sess.persist_rollout_items(&[rollout_item]).await;
sess.persist_rollout_items(&turn_context.agent_id, &[rollout_item])
.await;
loop {
let turn_input = history.get_history_for_prompt();
@@ -155,7 +156,10 @@ async fn run_compact_task_inner(
}
}
let history_snapshot = sess.clone_history().await.get_history();
let history_snapshot = sess
.clone_history(&turn_context.agent_id)
.await
.get_history();
let summary_suffix =
get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
let summary_text = format!("{SUMMARY_PREFIX}\n{summary_suffix}");
@@ -169,14 +173,16 @@ async fn run_compact_task_inner(
.cloned()
.collect();
new_history.extend(ghost_snapshots);
sess.replace_history(new_history).await;
sess.replace_history(&turn_context.agent_id, new_history)
.await;
sess.recompute_token_usage(&turn_context).await;
let rollout_item = RolloutItem::Compacted(CompactedItem {
message: summary_text.clone(),
replacement_history: None,
});
sess.persist_rollout_items(&[rollout_item]).await;
sess.persist_rollout_items(&turn_context.agent_id, &[rollout_item])
.await;
let event = EventMsg::ContextCompacted(ContextCompactedEvent {});
sess.send_event(&turn_context, event).await;

View File

@@ -40,7 +40,7 @@ async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
let mut history = sess.clone_history().await;
let mut history = sess.clone_history(&turn_context.agent_id).await;
let prompt = Prompt {
input: history.get_history_for_prompt(),
tools: vec![],
@@ -64,15 +64,19 @@ async fn run_remote_compact_task_inner_impl(
if !ghost_snapshots.is_empty() {
new_history.extend(ghost_snapshots);
}
sess.replace_history(new_history.clone()).await;
sess.replace_history(&turn_context.agent_id, new_history.clone())
.await;
sess.recompute_token_usage(turn_context).await;
let compacted_item = CompactedItem {
message: String::new(),
replacement_history: Some(new_history),
};
sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;
sess.persist_rollout_items(
&turn_context.agent_id,
&[RolloutItem::Compacted(compacted_item)],
)
.await;
let event = EventMsg::ContextCompacted(ContextCompactedEvent {});
sess.send_event(turn_context, event).await;

View File

@@ -20,8 +20,10 @@ use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ModelPreset;
use codex_protocol::protocol::DEFAULT_AGENT_ID;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionSource;
use std::collections::HashMap;
use std::path::PathBuf;

View File

@@ -20,6 +20,7 @@ use crate::error::CodexErr;
use crate::error::Result;
use crate::error::SandboxErr;
use crate::get_platform_sandbox;
use crate::protocol::AgentId;
use crate::protocol::Event;
use crate::protocol::EventMsg;
use crate::protocol::ExecCommandOutputDeltaEvent;
@@ -123,6 +124,7 @@ pub enum SandboxType {
#[derive(Clone)]
pub struct StdoutStream {
pub sub_id: String,
pub agent_id: AgentId,
pub call_id: String,
pub tx_event: Sender<Event>,
}
@@ -715,6 +717,7 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
});
let event = Event {
id: stream.sub_id.clone(),
agent_id: Some(stream.agent_id.clone()),
msg,
};
#[allow(clippy::let_unit_value)]

View File

@@ -159,6 +159,7 @@ impl ElicitationRequestManager {
let _ = tx_event
.send(Event {
id: "mcp_elicitation_request".to_string(),
agent_id: None,
msg: EventMsg::ElicitationRequest(ElicitationRequestEvent {
server_name,
id,
@@ -373,6 +374,7 @@ impl McpConnectionManager {
let _ = tx_event
.send(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
agent_id: None,
msg: EventMsg::McpStartupComplete(summary),
})
.await;
@@ -664,6 +666,7 @@ async fn emit_update(
tx_event
.send(Event {
id: INITIAL_SUBMIT_ID.to_owned(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(update),
})
.await

View File

@@ -5,7 +5,7 @@
//! JSON-Lines tooling. Each record has the following schema:
//!
//! ````text
//! {"conversation_id":"<uuid>","ts":<unix_seconds>,"text":"<message>"}
//! {"conversation_id":"<uuid>","agent_id":"<agent>","ts":<unix_seconds>,"text":"<message>"}
//! ````
//!
//! To minimise the chance of interleaved writes when multiple processes are
@@ -54,6 +54,8 @@ const RETRY_SLEEP: Duration = Duration::from_millis(100);
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct HistoryEntry {
pub session_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_id: Option<String>,
pub ts: u64,
pub text: String,
}
@@ -69,6 +71,7 @@ fn history_filepath(config: &Config) -> PathBuf {
/// which entails a small amount of blocking I/O internally.
pub(crate) async fn append_entry(
text: &str,
agent_id: Option<&str>,
conversation_id: &ConversationId,
config: &Config,
) -> Result<()> {
@@ -99,6 +102,7 @@ pub(crate) async fn append_entry(
// Construct the JSON line first so we can write it in a single syscall.
let entry = HistoryEntry {
session_id: conversation_id.to_string(),
agent_id: agent_id.map(str::to_string),
ts,
text: text.to_string(),
};
@@ -416,11 +420,13 @@ mod tests {
let entries = vec![
HistoryEntry {
session_id: "first-session".to_string(),
agent_id: None,
ts: 1,
text: "first".to_string(),
},
HistoryEntry {
session_id: "second-session".to_string(),
agent_id: None,
ts: 2,
text: "second".to_string(),
},
@@ -451,11 +457,13 @@ mod tests {
let initial = HistoryEntry {
session_id: "first-session".to_string(),
agent_id: None,
ts: 1,
text: "first".to_string(),
};
let appended = HistoryEntry {
session_id: "second-session".to_string(),
agent_id: None,
ts: 2,
text: "second".to_string(),
};
@@ -504,7 +512,7 @@ mod tests {
let history_path = codex_home.path().join("history.jsonl");
append_entry(&entry_one, &conversation_id, &config)
append_entry(&entry_one, None, &conversation_id, &config)
.await
.expect("write first entry");
@@ -514,7 +522,7 @@ mod tests {
config.history.max_bytes =
Some(usize::try_from(limit_bytes).expect("limit should fit into usize"));
append_entry(&entry_two, &conversation_id, &config)
append_entry(&entry_two, None, &conversation_id, &config)
.await
.expect("write second entry");
@@ -551,13 +559,13 @@ mod tests {
let history_path = codex_home.path().join("history.jsonl");
append_entry(&short_entry, &conversation_id, &config)
append_entry(&short_entry, None, &conversation_id, &config)
.await
.expect("write first entry");
let short_entry_len = std::fs::metadata(&history_path).expect("metadata").len();
append_entry(&long_entry, &conversation_id, &config)
append_entry(&long_entry, None, &conversation_id, &config)
.await
.expect("write second entry");
@@ -572,7 +580,7 @@ mod tests {
.expect("max bytes should fit into usize"),
);
append_entry(&long_entry, &conversation_id, &config)
append_entry(&long_entry, None, &conversation_id, &config)
.await
.expect("write third entry");

View File

@@ -582,6 +582,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
let conversation_id = ConversationId::from_string(&uuid.to_string())?;
let meta_line = RolloutLine {
timestamp: ts.to_string(),
agent_id: None,
item: RolloutItem::SessionMeta(SessionMetaLine {
meta: SessionMeta {
id: conversation_id,
@@ -600,6 +601,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
let user_event_line = RolloutLine {
timestamp: ts.to_string(),
agent_id: None,
item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent {
message: "hello".into(),
images: None,
@@ -611,6 +613,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
for idx in 0..total_messages {
let response_line = RolloutLine {
timestamp: format!("{ts}-{idx:02}"),
agent_id: None,
item: RolloutItem::ResponseItem(ResponseItem::Message {
id: None,
role: "assistant".into(),

View File

@@ -8,14 +8,6 @@ mod user_shell;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::select;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::trace;
use tracing::warn;
use crate::AuthManager;
use crate::codex::Session;
use crate::codex::TurnContext;
@@ -27,7 +19,15 @@ use crate::protocol::TurnAbortedEvent;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_protocol::protocol::AgentId;
use codex_protocol::user_input::UserInput;
use tokio::select;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::trace;
use tracing::warn;
pub(crate) use compact::CompactTask;
pub(crate) use ghost_snapshot::GhostSnapshotTask;

View File

@@ -59,7 +59,7 @@ impl SessionTask for UndoTask {
return None;
}
let mut history = sess.clone_history().await;
let mut history = sess.clone_history(&ctx.agent_id).await;
let mut items = history.get_history();
let mut completed = UndoCompletedEvent {
success: false,
@@ -96,7 +96,7 @@ impl SessionTask for UndoTask {
match restore_result {
Ok(Ok(())) => {
items.remove(idx);
sess.replace_history(items).await;
sess.replace_history(&ctx.agent_id, items).await;
let short_id: String = commit_id.chars().take(7).collect();
info!(commit_id = commit_id, "Undo restored ghost snapshot");
completed.success = true;

View File

@@ -108,6 +108,7 @@ impl SessionTask for UserShellCommandTask {
let stdout_stream = Some(StdoutStream {
sub_id: turn_context.sub_id.clone(),
agent_id: turn_context.agent_id.clone(),
call_id: call_id.clone(),
tx_event: session.get_tx_event(),
});

View File

@@ -65,7 +65,10 @@ impl ToolHandler for ViewImageHandler {
let event_path = abs_path.clone();
session
.inject_input(vec![UserInput::LocalImage { path: abs_path }])
.inject_input(
&turn.agent_id,
vec![UserInput::LocalImage { path: abs_path }],
)
.await
.map_err(|_| {
FunctionCallError::RespondToModel(

View File

@@ -71,6 +71,7 @@ impl ApplyPatchRuntime {
fn stdout_stream(ctx: &ToolCtx<'_>) -> Option<crate::exec::StdoutStream> {
Some(crate::exec::StdoutStream {
sub_id: ctx.turn.sub_id.clone(),
agent_id: ctx.turn.agent_id.clone(),
call_id: ctx.call_id.clone(),
tx_event: ctx.session.get_tx_event(),
})

View File

@@ -56,6 +56,7 @@ impl ShellRuntime {
fn stdout_stream(ctx: &ToolCtx<'_>) -> Option<crate::exec::StdoutStream> {
Some(crate::exec::StdoutStream {
sub_id: ctx.turn.sub_id.clone(),
agent_id: ctx.turn.agent_id.clone(),
call_id: ctx.call_id.clone(),
tx_event: ctx.session.get_tx_event(),
})

View File

@@ -8,6 +8,7 @@ use codex_core::protocol::EventMsg;
use codex_core::protocol::InitialHistory;
use codex_core::protocol::ResumedHistory;
use codex_core::protocol::RolloutItem;
use codex_core::protocol::RolloutLine;
use codex_core::protocol::TurnContextItem;
use codex_core::protocol::WarningEvent;
use codex_protocol::ConversationId;
@@ -37,7 +38,11 @@ fn resume_history(
InitialHistory::Resumed(ResumedHistory {
conversation_id: ConversationId::default(),
history: vec![RolloutItem::TurnContext(turn_ctx)],
history: vec![RolloutLine {
timestamp: String::new(),
agent_id: None,
item: RolloutItem::TurnContext(turn_ctx),
}],
rollout_path: rollout_path.to_path_buf(),
})
}

View File

@@ -160,7 +160,7 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
fn process_event(&mut self, event: Event) -> CodexStatus {
let Event { id: _, msg } = event;
let Event { id: _, msg, .. } = event;
match msg {
EventMsg::Error(ErrorEvent { message, .. }) => {
let prefix = "ERROR:".style(self.red);

View File

@@ -510,6 +510,7 @@ impl EventProcessor for EventProcessorWithJsonOutput {
fn print_config_summary(&mut self, _: &Config, _: &str, ev: &SessionConfiguredEvent) {
self.process_event(Event {
id: "".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(ev.clone()),
});
}

View File

@@ -61,6 +61,7 @@ use std::time::Duration;
fn event(id: &str, msg: EventMsg) -> Event {
Event {
id: id.to_string(),
agent_id: None,
msg,
}
}

View File

@@ -68,6 +68,7 @@ pub async fn run_codex_tool_session(
let session_configured_event = Event {
// Use a fake id value for now.
id: "".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(session_configured.clone()),
};
outgoing

View File

@@ -255,6 +255,7 @@ mod tests {
let rollout_file = NamedTempFile::new()?;
let event = Event {
id: "1".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: conversation_id,
model: "gpt-4o".to_string(),
@@ -309,6 +310,7 @@ mod tests {
};
let event = Event {
id: "1".to_string(),
agent_id: None,
msg: EventMsg::SessionConfigured(session_configured_event.clone()),
};
let meta = OutgoingNotificationMeta {

View File

@@ -1624,6 +1624,7 @@ mod tests {
app.chat_widget.handle_codex_event(Event {
id: String::new(),
agent_id: None,
msg: EventMsg::SessionConfigured(event),
});

View File

@@ -36,6 +36,7 @@ pub(crate) fn spawn_agent(
eprintln!("{message}");
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
id: "".to_string(),
agent_id: None,
msg: EventMsg::Error(err.to_error_event(None)),
}));
app_event_tx_clone.send(AppEvent::ExitRequest);
@@ -48,6 +49,7 @@ pub(crate) fn spawn_agent(
let ev = codex_core::protocol::Event {
// The `id` does not matter for rendering, so we can use a fake value.
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
@@ -85,6 +87,7 @@ pub(crate) fn spawn_agent_from_existing(
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_core::protocol::Event {
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));

View File

@@ -127,6 +127,7 @@ async fn resumed_initial_messages_render_history() {
chat.handle_codex_event(Event {
id: "initial".into(),
agent_id: None,
msg: EventMsg::SessionConfigured(configured),
});
@@ -159,6 +160,7 @@ async fn entered_review_mode_uses_request_hint() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -180,6 +182,7 @@ async fn entered_review_mode_defaults_to_current_changes_banner() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::UncommittedChanges,
user_facing_hint: None,
@@ -203,6 +206,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_review_tokens, context_window)),
rate_limits: None,
@@ -212,6 +216,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -222,6 +227,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-review".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(review_tokens, context_window)),
rate_limits: None,
@@ -231,6 +237,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-end".into(),
agent_id: None,
msg: EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
review_output: None,
}),
@@ -251,6 +258,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_compact_tokens, context_window)),
rate_limits: None,
@@ -260,6 +268,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-cleared".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: None,
@@ -290,6 +299,7 @@ async fn context_indicator_shows_used_tokens_when_window_unknown() {
chat.handle_codex_event(Event {
id: "token-usage".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(token_info),
rate_limits: None,
@@ -727,6 +737,7 @@ async fn exec_approval_emits_proposed_command_and_decision_history() {
};
chat.handle_codex_event(Event {
id: "sub-short".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -771,6 +782,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-multi".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_multi),
});
let proposed_multi = drain_insert_history(&mut rx);
@@ -821,6 +833,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-long".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_long),
});
let proposed_long = drain_insert_history(&mut rx);
@@ -863,6 +876,7 @@ fn begin_exec_with_source(
};
chat.handle_codex_event(Event {
id: call_id.to_string(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(event.clone()),
});
event
@@ -932,6 +946,7 @@ fn end_exec(
} = begin_event;
chat.handle_codex_event(Event {
id: call_id.clone(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
process_id,
@@ -1187,6 +1202,7 @@ async fn exec_end_without_begin_uses_event_command() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "call-orphan".to_string(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "call-orphan".to_string(),
process_id: None,
@@ -1497,6 +1513,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent {
message: Some("Undo requested for the last turn...".to_string()),
}),
@@ -1508,6 +1525,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: true,
message: None,
@@ -1534,6 +1552,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
assert!(
@@ -1543,6 +1562,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: false,
message: Some("Failed to restore workspace state.".to_string()),
@@ -1569,6 +1589,7 @@ async fn undo_started_hides_interrupt_hint() {
chat.handle_codex_event(Event {
id: "turn-hint".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
@@ -1692,6 +1713,7 @@ async fn view_image_tool_call_adds_history_cell() {
chat.handle_codex_event(Event {
id: "sub-image".into(),
agent_id: None,
msg: EventMsg::ViewImageToolCall(ViewImageToolCallEvent {
call_id: "call-image".into(),
path: image_path,
@@ -1717,6 +1739,7 @@ async fn interrupt_exec_marks_failed_snapshot() {
// cause the active exec cell to be finalized as failed and flushed.
chat.handle_codex_event(Event {
id: "call-int".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -1742,6 +1765,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Simulate an in-progress task so the widget is in a running state.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -1750,6 +1774,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Abort the turn (like pressing Esc) and drain inserted history.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2394,6 +2419,7 @@ async fn approval_modal_exec_snapshot() -> anyhow::Result<()> {
};
chat.handle_codex_event(Event {
id: "sub-approve".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
// Render to a fixed-size test terminal and snapshot.
@@ -2447,6 +2473,7 @@ async fn approval_modal_exec_without_reason_snapshot() -> anyhow::Result<()> {
};
chat.handle_codex_event(Event {
id: "sub-approve-noreason".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2489,6 +2516,7 @@ async fn approval_modal_patch_snapshot() -> anyhow::Result<()> {
};
chat.handle_codex_event(Event {
id: "sub-approve-patch".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -2525,6 +2553,7 @@ async fn interrupt_restores_queued_messages_into_composer() {
// Deliver a TurnAborted event with Interrupted reason (as if Esc was pressed).
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2563,6 +2592,7 @@ async fn interrupt_prepends_queued_messages_before_existing_composer_text() {
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2608,12 +2638,14 @@ async fn ui_snapshots_small_heights_task_running() {
// Activate status line
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Thinking**".into(),
}),
@@ -2639,6 +2671,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Begin a running task so the status indicator would be active.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2646,6 +2679,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Provide a deterministic header for the status line.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2668,6 +2702,7 @@ async fn status_widget_and_approval_modal_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-exec".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2691,6 +2726,7 @@ async fn status_widget_active_snapshot() {
// Activate the status indicator by simulating a task start.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2698,6 +2734,7 @@ async fn status_widget_active_snapshot() {
// Provide a deterministic header via a bold reasoning chunk.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2719,6 +2756,7 @@ async fn mcp_startup_header_booting_snapshot() {
chat.handle_codex_event(Event {
id: "mcp-1".into(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(McpStartupUpdateEvent {
server: "alpha".into(),
status: McpStartupStatus::Starting,
@@ -2740,6 +2778,7 @@ async fn background_event_updates_status_header() {
chat.handle_codex_event(Event {
id: "bg-1".into(),
agent_id: None,
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
message: "Waiting for `vim`".to_string(),
}),
@@ -2771,6 +2810,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
let cells = drain_insert_history(&mut rx);
@@ -2811,6 +2851,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(begin),
});
let cells = drain_insert_history(&mut rx);
@@ -2839,6 +2880,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(end),
});
let cells = drain_insert_history(&mut rx);
@@ -2861,6 +2903,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2880,6 +2923,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2910,6 +2954,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2933,6 +2978,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2970,6 +3016,7 @@ async fn apply_patch_approval_sends_op_with_submission_id() {
};
chat.handle_codex_event(Event {
id: "sub-123".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -3001,6 +3048,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3042,6 +3090,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3056,6 +3105,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3081,6 +3131,7 @@ async fn apply_patch_untrusted_shows_approval_modal() -> anyhow::Result<()> {
);
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -3132,6 +3183,7 @@ async fn apply_patch_request_shows_diff_summary() -> anyhow::Result<()> {
);
chat.handle_codex_event(Event {
id: "sub-apply".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-apply".into(),
turn_id: "turn-apply".into(),
@@ -3204,6 +3256,7 @@ async fn plan_update_renders_history_cell() {
};
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::PlanUpdate(update),
});
let cells = drain_insert_history(&mut rx);
@@ -3225,6 +3278,7 @@ async fn stream_error_updates_status_indicator() {
let msg = "Reconnecting... 2/5";
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: msg.to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -3248,6 +3302,7 @@ async fn warning_event_adds_warning_history_cell() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::Warning(WarningEvent {
message: "test warning message".to_string(),
}),
@@ -3267,6 +3322,7 @@ async fn stream_recovery_restores_previous_status_header() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "task".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3274,6 +3330,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "retry".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: "Reconnecting... 1/5".to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -3282,6 +3339,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "delta".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "hello".to_string(),
}),
@@ -3302,6 +3360,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Begin turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3310,6 +3369,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// First finalized assistant message
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "First message".into(),
}),
@@ -3318,6 +3378,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Second finalized assistant message in the same turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Second message".into(),
}),
@@ -3326,6 +3387,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// End turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3356,12 +3418,14 @@ async fn final_reasoning_then_message_without_deltas_are_rendered() {
// No deltas; only final reasoning followed by final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "I will first analyze the request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3383,24 +3447,28 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Stream some reasoning deltas first.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "I will ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "first analyze the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "request.".into(),
}),
@@ -3409,12 +3477,14 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Then stream answer deltas, followed by the exact same final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "Here is the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "result.".into(),
}),
@@ -3422,6 +3492,7 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3445,6 +3516,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent { message: "Im going to search the repo for where “Change Approved” is rendered to update that view.".into() }),
});
@@ -3464,6 +3536,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "c1".into(),
process_id: None,
@@ -3477,6 +3550,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "c1".into(),
process_id: None,
@@ -3496,12 +3570,14 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Investigating rendering code**".into(),
}),
@@ -3540,6 +3616,7 @@ async fn chatwidget_markdown_code_blocks_vt100_snapshot() {
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3588,6 +3665,7 @@ printf 'fenced within fenced\n'
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
});
// Drive commit ticks and drain emitted history lines into the vt100 buffer.
@@ -3611,6 +3689,7 @@ printf 'fenced within fenced\n'
// Finalize the stream without sending a final AgentMessage, to flush any tail.
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3628,6 +3707,7 @@ async fn chatwidget_tall() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),

View File

@@ -2735,6 +2735,7 @@ mod tests {
app.chat_widget.handle_codex_event(Event {
id: String::new(),
agent_id: None,
msg: EventMsg::SessionConfigured(event),
});

View File

@@ -24,6 +24,7 @@ use codex_core::protocol::AgentReasoningRawContentEvent;
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
use codex_core::protocol::BackgroundEventEvent;
use codex_core::protocol::CreditsSnapshot;
use codex_core::protocol::DEFAULT_AGENT_ID;
use codex_core::protocol::DeprecationNoticeEvent;
use codex_core::protocol::ErrorEvent;
use codex_core::protocol::Event;
@@ -1632,6 +1633,7 @@ impl ChatWidget {
self.app_event_tx.send(AppEvent::CodexEvent(Event {
id: "1".to_string(),
agent_id: None,
// msg: EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
// call_id: "1".to_string(),
// command: vec!["git".into(), "apply".into()],
@@ -1797,10 +1799,25 @@ impl ChatWidget {
}
pub(crate) fn handle_codex_event(&mut self, event: Event) {
let Event { id, msg } = event;
let Event { id, msg, agent_id } = event;
if !self.should_handle_agent_event(agent_id.as_deref(), &msg) {
return;
}
self.dispatch_event_msg(Some(id), msg, false);
}
fn should_handle_agent_event(&self, agent_id: Option<&str>, msg: &EventMsg) -> bool {
let agent_id = agent_id.unwrap_or(DEFAULT_AGENT_ID);
if agent_id == DEFAULT_AGENT_ID {
return true;
}
matches!(
msg,
EventMsg::ExecApprovalRequest(_) | EventMsg::ApplyPatchApprovalRequest(_)
)
}
/// Dispatch a protocol `EventMsg` to the appropriate handler.
///
/// `id` is `Some` for live events and `None` for replayed events from

View File

@@ -36,6 +36,7 @@ pub(crate) fn spawn_agent(
eprintln!("{message}");
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
id: "".to_string(),
agent_id: None,
msg: EventMsg::Error(err.to_error_event(None)),
}));
app_event_tx_clone.send(AppEvent::ExitRequest);
@@ -48,6 +49,7 @@ pub(crate) fn spawn_agent(
let ev = codex_core::protocol::Event {
// The `id` does not matter for rendering, so we can use a fake value.
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
@@ -85,6 +87,7 @@ pub(crate) fn spawn_agent_from_existing(
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_core::protocol::Event {
id: "".to_string(),
agent_id: None,
msg: codex_core::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));

View File

@@ -125,6 +125,7 @@ async fn resumed_initial_messages_render_history() {
chat.handle_codex_event(Event {
id: "initial".into(),
agent_id: None,
msg: EventMsg::SessionConfigured(configured),
});
@@ -157,6 +158,7 @@ async fn entered_review_mode_uses_request_hint() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -178,6 +180,7 @@ async fn entered_review_mode_defaults_to_current_changes_banner() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::UncommittedChanges,
user_facing_hint: None,
@@ -201,6 +204,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_review_tokens, context_window)),
rate_limits: None,
@@ -210,6 +214,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-start".into(),
agent_id: None,
msg: EventMsg::EnteredReviewMode(ReviewRequest {
target: ReviewTarget::BaseBranch {
branch: "feature".to_string(),
@@ -220,6 +225,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "token-review".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(review_tokens, context_window)),
rate_limits: None,
@@ -229,6 +235,7 @@ async fn review_restores_context_window_indicator() {
chat.handle_codex_event(Event {
id: "review-end".into(),
agent_id: None,
msg: EventMsg::ExitedReviewMode(ExitedReviewModeEvent {
review_output: None,
}),
@@ -249,6 +256,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-before".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(make_token_info(pre_compact_tokens, context_window)),
rate_limits: None,
@@ -258,6 +266,7 @@ async fn token_count_none_resets_context_indicator() {
chat.handle_codex_event(Event {
id: "token-cleared".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: None,
rate_limits: None,
@@ -288,6 +297,7 @@ async fn context_indicator_shows_used_tokens_when_window_unknown() {
chat.handle_codex_event(Event {
id: "token-usage".into(),
agent_id: None,
msg: EventMsg::TokenCount(TokenCountEvent {
info: Some(token_info),
rate_limits: None,
@@ -723,6 +733,7 @@ async fn exec_approval_emits_proposed_command_and_decision_history() {
};
chat.handle_codex_event(Event {
id: "sub-short".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -767,6 +778,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-multi".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_multi),
});
let proposed_multi = drain_insert_history(&mut rx);
@@ -817,6 +829,7 @@ async fn exec_approval_decision_truncates_multiline_and_long_commands() {
};
chat.handle_codex_event(Event {
id: "sub-long".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev_long),
});
let proposed_long = drain_insert_history(&mut rx);
@@ -859,6 +872,7 @@ fn begin_exec_with_source(
};
chat.handle_codex_event(Event {
id: call_id.to_string(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(event.clone()),
});
event
@@ -892,6 +906,7 @@ fn end_exec(
} = begin_event;
chat.handle_codex_event(Event {
id: call_id.clone(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
process_id,
@@ -1147,6 +1162,7 @@ async fn exec_end_without_begin_uses_event_command() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "call-orphan".to_string(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "call-orphan".to_string(),
process_id: None,
@@ -1332,6 +1348,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent {
message: Some("Undo requested for the last turn...".to_string()),
}),
@@ -1343,6 +1360,7 @@ async fn undo_success_events_render_info_messages() {
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: true,
message: None,
@@ -1369,6 +1387,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
assert!(
@@ -1378,6 +1397,7 @@ async fn undo_failure_events_render_error_message() {
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
agent_id: None,
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: false,
message: Some("Failed to restore workspace state.".to_string()),
@@ -1404,6 +1424,7 @@ async fn undo_started_hides_interrupt_hint() {
chat.handle_codex_event(Event {
id: "turn-hint".to_string(),
agent_id: None,
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
@@ -1527,6 +1548,7 @@ async fn view_image_tool_call_adds_history_cell() {
chat.handle_codex_event(Event {
id: "sub-image".into(),
agent_id: None,
msg: EventMsg::ViewImageToolCall(ViewImageToolCallEvent {
call_id: "call-image".into(),
path: image_path,
@@ -1552,6 +1574,7 @@ async fn interrupt_exec_marks_failed_snapshot() {
// cause the active exec cell to be finalized as failed and flushed.
chat.handle_codex_event(Event {
id: "call-int".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -1577,6 +1600,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Simulate an in-progress task so the widget is in a running state.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -1585,6 +1609,7 @@ async fn interrupted_turn_error_message_snapshot() {
// Abort the turn (like pressing Esc) and drain inserted history.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2056,6 +2081,7 @@ async fn approval_modal_exec_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
// Render to a fixed-size test terminal and snapshot.
@@ -2107,6 +2133,7 @@ async fn approval_modal_exec_without_reason_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-noreason".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2147,6 +2174,7 @@ async fn approval_modal_patch_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-patch".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -2181,6 +2209,7 @@ async fn interrupt_restores_queued_messages_into_composer() {
// Deliver a TurnAborted event with Interrupted reason (as if Esc was pressed).
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2219,6 +2248,7 @@ async fn interrupt_prepends_queued_messages_before_existing_composer_text() {
chat.handle_codex_event(Event {
id: "turn-1".into(),
agent_id: None,
msg: EventMsg::TurnAborted(codex_core::protocol::TurnAbortedEvent {
reason: TurnAbortReason::Interrupted,
}),
@@ -2264,12 +2294,14 @@ async fn ui_snapshots_small_heights_task_running() {
// Activate status line
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Thinking**".into(),
}),
@@ -2295,6 +2327,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Begin a running task so the status indicator would be active.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2302,6 +2335,7 @@ async fn status_widget_and_approval_modal_snapshot() {
// Provide a deterministic header for the status line.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2324,6 +2358,7 @@ async fn status_widget_and_approval_modal_snapshot() {
};
chat.handle_codex_event(Event {
id: "sub-approve-exec".into(),
agent_id: None,
msg: EventMsg::ExecApprovalRequest(ev),
});
@@ -2347,6 +2382,7 @@ async fn status_widget_active_snapshot() {
// Activate the status indicator by simulating a task start.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2354,6 +2390,7 @@ async fn status_widget_active_snapshot() {
// Provide a deterministic header via a bold reasoning chunk.
chat.handle_codex_event(Event {
id: "task-1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Analyzing**".into(),
}),
@@ -2375,6 +2412,7 @@ async fn mcp_startup_header_booting_snapshot() {
chat.handle_codex_event(Event {
id: "mcp-1".into(),
agent_id: None,
msg: EventMsg::McpStartupUpdate(McpStartupUpdateEvent {
server: "alpha".into(),
status: McpStartupStatus::Starting,
@@ -2396,6 +2434,7 @@ async fn background_event_updates_status_header() {
chat.handle_codex_event(Event {
id: "bg-1".into(),
agent_id: None,
msg: EventMsg::BackgroundEvent(BackgroundEventEvent {
message: "Waiting for `vim`".to_string(),
}),
@@ -2427,6 +2466,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
let cells = drain_insert_history(&mut rx);
@@ -2467,6 +2507,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(begin),
});
let cells = drain_insert_history(&mut rx);
@@ -2495,6 +2536,7 @@ async fn apply_patch_events_emit_history_cells() {
};
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(end),
});
let cells = drain_insert_history(&mut rx);
@@ -2517,6 +2559,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2536,6 +2579,7 @@ async fn apply_patch_manual_approval_adjusts_header() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2566,6 +2610,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2589,6 +2634,7 @@ async fn apply_patch_manual_flow_snapshot() {
);
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "c1".into(),
turn_id: "turn-c1".into(),
@@ -2626,6 +2672,7 @@ async fn apply_patch_approval_sends_op_with_submission_id() {
};
chat.handle_codex_event(Event {
id: "sub-123".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ev),
});
@@ -2657,6 +2704,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2698,6 +2746,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2712,6 +2761,7 @@ async fn apply_patch_full_flow_integration_like() {
);
chat.handle_codex_event(Event {
id: "sub-xyz".into(),
agent_id: None,
msg: EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2737,6 +2787,7 @@ async fn apply_patch_untrusted_shows_approval_modal() {
);
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-1".into(),
turn_id: "turn-call-1".into(),
@@ -2786,6 +2837,7 @@ async fn apply_patch_request_shows_diff_summary() {
);
chat.handle_codex_event(Event {
id: "sub-apply".into(),
agent_id: None,
msg: EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
call_id: "call-apply".into(),
turn_id: "turn-apply".into(),
@@ -2856,6 +2908,7 @@ async fn plan_update_renders_history_cell() {
};
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::PlanUpdate(update),
});
let cells = drain_insert_history(&mut rx);
@@ -2877,6 +2930,7 @@ async fn stream_error_updates_status_indicator() {
let msg = "Reconnecting... 2/5";
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: msg.to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -2900,6 +2954,7 @@ async fn warning_event_adds_warning_history_cell() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "sub-1".into(),
agent_id: None,
msg: EventMsg::Warning(WarningEvent {
message: "test warning message".to_string(),
}),
@@ -2919,6 +2974,7 @@ async fn stream_recovery_restores_previous_status_header() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "task".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2926,6 +2982,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "retry".into(),
agent_id: None,
msg: EventMsg::StreamError(StreamErrorEvent {
message: "Reconnecting... 1/5".to_string(),
codex_error_info: Some(CodexErrorInfo::Other),
@@ -2934,6 +2991,7 @@ async fn stream_recovery_restores_previous_status_header() {
drain_insert_history(&mut rx);
chat.handle_codex_event(Event {
id: "delta".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "hello".to_string(),
}),
@@ -2954,6 +3012,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Begin turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -2962,6 +3021,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// First finalized assistant message
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "First message".into(),
}),
@@ -2970,6 +3030,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// Second finalized assistant message in the same turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Second message".into(),
}),
@@ -2978,6 +3039,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
// End turn
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3008,12 +3070,14 @@ async fn final_reasoning_then_message_without_deltas_are_rendered() {
// No deltas; only final reasoning followed by final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "I will first analyze the request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3035,24 +3099,28 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Stream some reasoning deltas first.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "I will ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "first analyze the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "request.".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentReasoning(AgentReasoningEvent {
text: "request.".into(),
}),
@@ -3061,12 +3129,14 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
// Then stream answer deltas, followed by the exact same final message.
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "Here is the ".into(),
}),
});
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "result.".into(),
}),
@@ -3074,6 +3144,7 @@ async fn deltas_then_same_final_message_are_rendered_snapshot() {
chat.handle_codex_event(Event {
id: "s1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent {
message: "Here is the result.".into(),
}),
@@ -3097,6 +3168,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessage(AgentMessageEvent { message: "Im going to search the repo for where “Change Approved” is rendered to update that view.".into() }),
});
@@ -3116,6 +3188,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
let cwd = std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."));
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "c1".into(),
process_id: None,
@@ -3129,6 +3202,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "c1".into(),
agent_id: None,
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "c1".into(),
process_id: None,
@@ -3148,12 +3222,14 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
});
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent {
delta: "**Investigating rendering code**".into(),
}),
@@ -3192,6 +3268,7 @@ async fn chatwidget_markdown_code_blocks_vt100_snapshot() {
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),
@@ -3240,6 +3317,7 @@ printf 'fenced within fenced\n'
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }),
});
// Drive commit ticks and drain emitted history lines into the vt100 buffer.
@@ -3263,6 +3341,7 @@ printf 'fenced within fenced\n'
// Finalize the stream without sending a final AgentMessage, to flush any tail.
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskComplete(TaskCompleteEvent {
last_agent_message: None,
}),
@@ -3280,6 +3359,7 @@ async fn chatwidget_tall() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.handle_codex_event(Event {
id: "t1".into(),
agent_id: None,
msg: EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: None,
}),