Compare commits

...

3 Commits

Author SHA1 Message Date
jif-oai
286931883d Merge branch 'main' into jif/fix-agent-steer 2026-03-02 18:51:31 +00:00
jif-oai
5e177c9221 larger fix 2026-03-02 18:51:21 +00:00
jif-oai
377914403c fix: pending messages in /agent 2026-03-02 14:40:47 +00:00
5 changed files with 960 additions and 7 deletions

View File

@@ -12,6 +12,7 @@ use crate::bottom_pane::SelectionViewParams;
use crate::bottom_pane::popup_consts::standard_popup_hint_line;
use crate::chatwidget::ChatWidget;
use crate::chatwidget::ExternalEditorState;
use crate::chatwidget::ThreadInputState;
use crate::cwd_prompt::CwdPromptAction;
use crate::diff_render::DiffSummary;
use crate::exec_command::strip_bash_lc_and_escape;
@@ -254,6 +255,7 @@ struct SessionSummary {
// Frozen, replayable copy of a background thread's state, produced by
// `ThreadEventStore::snapshot()` and consumed by `replay_thread_snapshot`.
struct ThreadEventSnapshot {
// SessionConfigured event for this thread, if one was captured; replayed first.
session_configured: Option<Event>,
// Buffered events, replayed in order after the session-configured event.
events: Vec<Event>,
// Composer/queue input state saved when the thread was backgrounded, if any.
input_state: Option<ThreadInputState>,
}
#[derive(Debug)]
@@ -262,6 +264,7 @@ struct ThreadEventStore {
buffer: VecDeque<Event>,
user_message_ids: HashSet<String>,
pending_interactive_replay: PendingInteractiveReplayState,
input_state: Option<ThreadInputState>,
capacity: usize,
active: bool,
}
@@ -273,6 +276,7 @@ impl ThreadEventStore {
buffer: VecDeque::new(),
user_message_ids: HashSet::new(),
pending_interactive_replay: PendingInteractiveReplayState::default(),
input_state: None,
capacity,
active: false,
}
@@ -342,6 +346,7 @@ impl ThreadEventStore {
})
.cloned()
.collect(),
input_state: self.input_state.clone(),
}
}
@@ -917,13 +922,15 @@ impl App {
let Some(active_id) = self.active_thread_id else {
return;
};
let Some(receiver) = self.active_thread_rx.take() else {
return;
};
let input_state = self.chat_widget.capture_thread_input_state();
if let Some(channel) = self.thread_event_channels.get_mut(&active_id) {
let receiver = self.active_thread_rx.take();
let mut store = channel.store.lock().await;
store.active = false;
channel.receiver = Some(receiver);
store.input_state = input_state;
if let Some(receiver) = receiver {
channel.receiver = Some(receiver);
}
}
}
@@ -1335,7 +1342,7 @@ impl App {
self.chat_widget = ChatWidget::new_with_op_sender(init, codex_op_tx);
self.reset_for_thread_switch(tui)?;
self.replay_thread_snapshot(snapshot);
self.replay_thread_snapshot(snapshot, !is_replay_only);
if is_replay_only {
self.chat_widget.add_info_message(
format!("Agent thread {thread_id} is closed. Replaying saved transcript."),
@@ -1466,13 +1473,24 @@ impl App {
(active_thread_id != primary_thread_id).then_some((active_thread_id, primary_thread_id))
}
fn replay_thread_snapshot(&mut self, snapshot: ThreadEventSnapshot) {
fn replay_thread_snapshot(
&mut self,
snapshot: ThreadEventSnapshot,
resume_restored_queue: bool,
) {
if let Some(event) = snapshot.session_configured {
self.handle_codex_event_replay(event);
}
self.chat_widget.set_queue_autosend_suppressed(true);
self.chat_widget
.restore_thread_input_state(snapshot.input_state);
for event in snapshot.events {
self.handle_codex_event_replay(event);
}
self.chat_widget.set_queue_autosend_suppressed(false);
if resume_restored_queue {
self.chat_widget.maybe_send_next_queued_input();
}
self.refresh_status_line();
}
@@ -3617,7 +3635,12 @@ mod tests {
use codex_core::config::types::ModelAvailabilityNuxConfig;
use codex_otel::OtelManager;
use codex_protocol::ThreadId;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::CollaborationModeMask;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Settings;
use codex_protocol::openai_models::ModelAvailabilityNux;
use codex_protocol::protocol::AgentMessageDeltaEvent;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
@@ -3625,6 +3648,10 @@ mod tests {
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadRolledBackEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use codex_protocol::protocol::UserMessageEvent;
use codex_protocol::user_input::TextElement;
use codex_protocol::user_input::UserInput;
@@ -3926,6 +3953,755 @@ mod tests {
Ok(())
}
#[tokio::test]
// Full round trip: draft text and a queued message are captured into the thread
// store on backgrounding, then restored — and the queued message auto-submitted —
// when the snapshot is replayed with `resume_restored_queue = true`.
async fn replay_thread_snapshot_restores_draft_and_queued_input() {
let mut app = make_test_app().await;
let thread_id = ThreadId::new();
app.thread_event_channels.insert(
thread_id,
ThreadEventChannel::new_with_session_configured(
THREAD_EVENT_CHANNEL_CAPACITY,
Event {
id: "session-configured".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
cwd: PathBuf::from("/tmp/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
},
),
);
app.activate_thread_channel(thread_id).await;
// Put both a composer draft and a queued message into the widget.
app.chat_widget
.apply_external_edit("draft prompt".to_string());
app.chat_widget.submit_user_message_with_mode(
"queued follow-up".to_string(),
CollaborationModeMask {
name: "Default".to_string(),
mode: None,
model: None,
reasoning_effort: None,
developer_instructions: None,
},
);
let expected_input_state = app
.chat_widget
.capture_thread_input_state()
.expect("expected thread input state");
// Backgrounding the thread must persist the captured input state in the store.
app.store_active_thread_receiver().await;
let snapshot = {
let channel = app
.thread_event_channels
.get(&thread_id)
.expect("thread channel should exist");
let store = channel.store.lock().await;
assert_eq!(store.input_state, Some(expected_input_state));
store.snapshot()
};
// Replace the widget with a fresh one, then replay the snapshot into it.
let (chat_widget, _app_event_tx, _rx, mut new_op_rx) =
make_chatwidget_manual_with_sender().await;
app.chat_widget = chat_widget;
app.replay_thread_snapshot(snapshot, true);
// Draft is back in the composer; queued message was drained and auto-submitted.
assert_eq!(app.chat_widget.composer_text_with_pending(), "draft prompt");
assert!(app.chat_widget.queued_user_message_texts().is_empty());
match next_user_turn_op(&mut new_op_rx) {
Op::UserTurn { items, .. } => assert_eq!(
items,
vec![UserInput::Text {
text: "queued follow-up".to_string(),
text_elements: Vec::new(),
}]
),
other => panic!("expected queued follow-up submission, got {other:?}"),
}
}
#[tokio::test]
// A snapshot whose replayed events include the TurnComplete for the running turn
// proves the turn finished, so the restored queued follow-up is auto-submitted.
async fn replayed_turn_complete_submits_restored_queued_follow_up() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let thread_id = ThreadId::new();
let session_configured = Event {
id: "session-configured".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
cwd: PathBuf::from("/tmp/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
};
app.chat_widget
.handle_codex_event(session_configured.clone());
// Drive the widget into a running turn with streaming output.
app.chat_widget.handle_codex_event(Event {
id: "turn-started".to_string(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
});
app.chat_widget.handle_codex_event(Event {
id: "agent-delta".to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "streaming".to_string(),
}),
});
// Enter during a running turn queues the message instead of submitting it.
app.chat_widget
.apply_external_edit("queued follow-up".to_string());
app.chat_widget
.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
let input_state = app
.chat_widget
.capture_thread_input_state()
.expect("expected queued follow-up state");
let (chat_widget, _app_event_tx, _rx, mut new_op_rx) =
make_chatwidget_manual_with_sender().await;
app.chat_widget = chat_widget;
app.chat_widget.handle_codex_event(session_configured);
// Drain any ops emitted during setup so only replay-driven ops remain.
while new_op_rx.try_recv().is_ok() {}
app.replay_thread_snapshot(
ThreadEventSnapshot {
session_configured: None,
events: vec![Event {
id: "turn-complete".to_string(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
}),
}],
input_state: Some(input_state),
},
true,
);
match next_user_turn_op(&mut new_op_rx) {
Op::UserTurn { items, .. } => assert_eq!(
items,
vec![UserInput::Text {
text: "queued follow-up".to_string(),
text_elements: Vec::new(),
}]
),
other => panic!("expected queued follow-up submission, got {other:?}"),
}
}
#[tokio::test]
// Replaying with `resume_restored_queue = false` (closed/replay-only threads):
// the restored queued message must stay visible and must NOT be auto-submitted,
// even though the replayed events show the turn completed.
async fn replay_only_thread_keeps_restored_queue_visible() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let thread_id = ThreadId::new();
let session_configured = Event {
id: "session-configured".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
cwd: PathBuf::from("/tmp/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
};
app.chat_widget
.handle_codex_event(session_configured.clone());
// Running turn + streaming output, then Enter queues the follow-up.
app.chat_widget.handle_codex_event(Event {
id: "turn-started".to_string(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
});
app.chat_widget.handle_codex_event(Event {
id: "agent-delta".to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "streaming".to_string(),
}),
});
app.chat_widget
.apply_external_edit("queued follow-up".to_string());
app.chat_widget
.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
let input_state = app
.chat_widget
.capture_thread_input_state()
.expect("expected queued follow-up state");
let (chat_widget, _app_event_tx, _rx, mut new_op_rx) =
make_chatwidget_manual_with_sender().await;
app.chat_widget = chat_widget;
app.chat_widget.handle_codex_event(session_configured);
while new_op_rx.try_recv().is_ok() {}
// `false`: replay-only — the queue must not be resumed after replay.
app.replay_thread_snapshot(
ThreadEventSnapshot {
session_configured: None,
events: vec![Event {
id: "turn-complete".to_string(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
}),
}],
input_state: Some(input_state),
},
false,
);
assert_eq!(
app.chat_widget.queued_user_message_texts(),
vec!["queued follow-up".to_string()]
);
assert!(
new_op_rx.try_recv().is_err(),
"replay-only threads should not auto-submit restored queue"
);
}
#[tokio::test]
// When the snapshot replays NO events, the only evidence of a running turn is
// the restored `agent_turn_running` flag — so the queue must stay queued rather
// than being auto-submitted into a turn that may still be in flight.
async fn replay_thread_snapshot_keeps_queue_when_running_state_only_comes_from_snapshot() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let thread_id = ThreadId::new();
let session_configured = Event {
id: "session-configured".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
cwd: PathBuf::from("/tmp/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
};
app.chat_widget
.handle_codex_event(session_configured.clone());
// Start a turn, stream output, then queue a follow-up via Enter.
app.chat_widget.handle_codex_event(Event {
id: "turn-started".to_string(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
});
app.chat_widget.handle_codex_event(Event {
id: "agent-delta".to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "streaming".to_string(),
}),
});
app.chat_widget
.apply_external_edit("queued follow-up".to_string());
app.chat_widget
.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
let input_state = app
.chat_widget
.capture_thread_input_state()
.expect("expected queued follow-up state");
let (chat_widget, _app_event_tx, _rx, mut new_op_rx) =
make_chatwidget_manual_with_sender().await;
app.chat_widget = chat_widget;
app.chat_widget.handle_codex_event(session_configured);
while new_op_rx.try_recv().is_ok() {}
// Empty event list: the replay cannot prove the turn finished.
app.replay_thread_snapshot(
ThreadEventSnapshot {
session_configured: None,
events: vec![],
input_state: Some(input_state),
},
true,
);
assert_eq!(
app.chat_widget.queued_user_message_texts(),
vec!["queued follow-up".to_string()]
);
assert!(
new_op_rx.try_recv().is_err(),
"restored queue should stay queued when replay did not prove the turn finished"
);
}
#[tokio::test]
// A replayed TurnComplete for an OLDER turn must not unlock the queue while the
// latest replayed event is a TurnStarted; submission only happens once the
// latest turn's TurnComplete arrives as a live event afterwards.
async fn replay_thread_snapshot_does_not_submit_queue_before_replay_catches_up() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let thread_id = ThreadId::new();
let session_configured = Event {
id: "session-configured".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
cwd: PathBuf::from("/tmp/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
};
app.chat_widget
.handle_codex_event(session_configured.clone());
// Running turn with streaming output; Enter queues the follow-up.
app.chat_widget.handle_codex_event(Event {
id: "turn-started".to_string(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
});
app.chat_widget.handle_codex_event(Event {
id: "agent-delta".to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "streaming".to_string(),
}),
});
app.chat_widget
.apply_external_edit("queued follow-up".to_string());
app.chat_widget
.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
let input_state = app
.chat_widget
.capture_thread_input_state()
.expect("expected queued follow-up state");
let (chat_widget, _app_event_tx, _rx, mut new_op_rx) =
make_chatwidget_manual_with_sender().await;
app.chat_widget = chat_widget;
app.chat_widget.handle_codex_event(session_configured);
while new_op_rx.try_recv().is_ok() {}
// Replay ends on turn-1 STARTED — the complete event is for older turn-0 only.
app.replay_thread_snapshot(
ThreadEventSnapshot {
session_configured: None,
events: vec![
Event {
id: "older-turn-complete".to_string(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-0".to_string(),
last_agent_message: None,
}),
},
Event {
id: "latest-turn-started".to_string(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
},
],
input_state: Some(input_state),
},
true,
);
assert!(
new_op_rx.try_recv().is_err(),
"queued follow-up should stay queued until the latest turn completes"
);
assert_eq!(
app.chat_widget.queued_user_message_texts(),
vec!["queued follow-up".to_string()]
);
// A live TurnComplete for turn-1 finally releases the queued follow-up.
app.chat_widget.handle_codex_event(Event {
id: "latest-turn-complete".to_string(),
msg: EventMsg::TurnComplete(TurnCompleteEvent {
turn_id: "turn-1".to_string(),
last_agent_message: None,
}),
});
match next_user_turn_op(&mut new_op_rx) {
Op::UserTurn { items, .. } => assert_eq!(
items,
vec![UserInput::Text {
text: "queued follow-up".to_string(),
text_elements: Vec::new(),
}]
),
other => panic!("expected queued follow-up submission, got {other:?}"),
}
}
#[tokio::test]
// A large paste (stored as a pending-paste placeholder in the composer) must
// survive the background/replay round trip and expand to the full text when the
// restored composer content is submitted.
async fn replay_thread_snapshot_restores_pending_pastes_for_submit() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let thread_id = ThreadId::new();
app.thread_event_channels.insert(
thread_id,
ThreadEventChannel::new_with_session_configured(
THREAD_EVENT_CHANNEL_CAPACITY,
Event {
id: "session-configured".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
cwd: PathBuf::from("/tmp/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
},
),
);
app.activate_thread_channel(thread_id).await;
// 1005 chars — large enough to trigger the pending-paste path (threshold
// presumably ~1000; confirmed only by the test's choice of size).
let large = "x".repeat(1005);
app.chat_widget.handle_paste(large.clone());
let expected_input_state = app
.chat_widget
.capture_thread_input_state()
.expect("expected thread input state");
app.store_active_thread_receiver().await;
let snapshot = {
let channel = app
.thread_event_channels
.get(&thread_id)
.expect("thread channel should exist");
let store = channel.store.lock().await;
assert_eq!(store.input_state, Some(expected_input_state));
store.snapshot()
};
let (chat_widget, _app_event_tx, _rx, mut new_op_rx) =
make_chatwidget_manual_with_sender().await;
app.chat_widget = chat_widget;
app.replay_thread_snapshot(snapshot, true);
// The full pasted text is visible through the pending-paste expansion.
assert_eq!(app.chat_widget.composer_text_with_pending(), large);
app.chat_widget
.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
match next_user_turn_op(&mut new_op_rx) {
Op::UserTurn { items, .. } => assert_eq!(
items,
vec![UserInput::Text {
text: large,
text_elements: Vec::new(),
}]
),
other => panic!("expected restored paste submission, got {other:?}"),
}
}
#[tokio::test]
// The snapshot's collaboration mode/mask must win over whatever mode the fresh
// widget was switched to, so the restored draft submits with the ORIGINAL
// thread's model, effort, and mode — not the replacement widget's settings.
async fn replay_thread_snapshot_restores_collaboration_mode_for_draft_submit() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let thread_id = ThreadId::new();
let session_configured = Event {
id: "session-configured".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
cwd: PathBuf::from("/tmp/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
};
app.chat_widget
.handle_codex_event(session_configured.clone());
// Original thread: Plan mode, gpt-restored, High effort, plus a draft.
app.chat_widget
.set_reasoning_effort(Some(ReasoningEffortConfig::High));
app.chat_widget
.set_collaboration_mask(CollaborationModeMask {
name: "Plan".to_string(),
mode: Some(ModeKind::Plan),
model: Some("gpt-restored".to_string()),
reasoning_effort: Some(Some(ReasoningEffortConfig::High)),
developer_instructions: None,
});
app.chat_widget
.apply_external_edit("draft prompt".to_string());
let input_state = app
.chat_widget
.capture_thread_input_state()
.expect("expected draft input state");
let (chat_widget, _app_event_tx, _rx, mut new_op_rx) =
make_chatwidget_manual_with_sender().await;
app.chat_widget = chat_widget;
app.chat_widget.handle_codex_event(session_configured);
// Replacement widget is deliberately set to DIFFERENT settings.
app.chat_widget
.set_reasoning_effort(Some(ReasoningEffortConfig::Low));
app.chat_widget
.set_collaboration_mask(CollaborationModeMask {
name: "Default".to_string(),
mode: Some(ModeKind::Default),
model: Some("gpt-replacement".to_string()),
reasoning_effort: Some(Some(ReasoningEffortConfig::Low)),
developer_instructions: None,
});
while new_op_rx.try_recv().is_ok() {}
app.replay_thread_snapshot(
ThreadEventSnapshot {
session_configured: None,
events: vec![],
input_state: Some(input_state),
},
true,
);
app.chat_widget
.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
// Submission must carry the restored (not the replacement) settings.
match next_user_turn_op(&mut new_op_rx) {
Op::UserTurn {
items,
model,
effort,
collaboration_mode,
..
} => {
assert_eq!(
items,
vec![UserInput::Text {
text: "draft prompt".to_string(),
text_elements: Vec::new(),
}]
);
assert_eq!(model, "gpt-restored".to_string());
assert_eq!(effort, Some(ReasoningEffortConfig::High));
assert_eq!(
collaboration_mode,
Some(CollaborationMode {
mode: ModeKind::Plan,
settings: Settings {
model: "gpt-restored".to_string(),
reasoning_effort: Some(ReasoningEffortConfig::High),
developer_instructions: None,
},
})
);
}
other => panic!("expected restored draft submission, got {other:?}"),
}
}
#[tokio::test]
// Collaboration mode/model/effort are restored from the snapshot even when the
// snapshot carries no composer draft and no queued messages.
async fn replay_thread_snapshot_restores_collaboration_mode_without_input() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let thread_id = ThreadId::new();
let session_configured = Event {
id: "session-configured".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
cwd: PathBuf::from("/tmp/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
};
app.chat_widget
.handle_codex_event(session_configured.clone());
// Original thread settings: Plan / gpt-restored / High — no input captured.
app.chat_widget
.set_reasoning_effort(Some(ReasoningEffortConfig::High));
app.chat_widget
.set_collaboration_mask(CollaborationModeMask {
name: "Plan".to_string(),
mode: Some(ModeKind::Plan),
model: Some("gpt-restored".to_string()),
reasoning_effort: Some(Some(ReasoningEffortConfig::High)),
developer_instructions: None,
});
let input_state = app
.chat_widget
.capture_thread_input_state()
.expect("expected collaboration-only input state");
let (chat_widget, _app_event_tx, _rx, _new_op_rx) =
make_chatwidget_manual_with_sender().await;
app.chat_widget = chat_widget;
app.chat_widget.handle_codex_event(session_configured);
// Replacement widget uses different settings that replay must override.
app.chat_widget
.set_reasoning_effort(Some(ReasoningEffortConfig::Low));
app.chat_widget
.set_collaboration_mask(CollaborationModeMask {
name: "Default".to_string(),
mode: Some(ModeKind::Default),
model: Some("gpt-replacement".to_string()),
reasoning_effort: Some(Some(ReasoningEffortConfig::Low)),
developer_instructions: None,
});
app.replay_thread_snapshot(
ThreadEventSnapshot {
session_configured: None,
events: vec![],
input_state: Some(input_state),
},
true,
);
assert_eq!(
app.chat_widget.active_collaboration_mode_kind(),
ModeKind::Plan
);
assert_eq!(app.chat_widget.current_model(), "gpt-restored");
assert_eq!(
app.chat_widget.current_reasoning_effort(),
Some(ReasoningEffortConfig::High)
);
}
#[tokio::test]
// When the replayed events end with a TurnAborted, the restored queued message
// is moved back into the composer for editing rather than auto-submitted.
async fn replayed_interrupted_turn_restores_queued_input_to_composer() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
let thread_id = ThreadId::new();
let session_configured = Event {
id: "session-configured".to_string(),
msg: EventMsg::SessionConfigured(SessionConfiguredEvent {
session_id: thread_id,
forked_from_id: None,
thread_name: None,
model: "gpt-test".to_string(),
model_provider_id: "test-provider".to_string(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::new_read_only_policy(),
cwd: PathBuf::from("/tmp/project"),
reasoning_effort: None,
history_log_id: 0,
history_entry_count: 0,
initial_messages: None,
network_proxy: None,
rollout_path: Some(PathBuf::new()),
}),
};
app.chat_widget
.handle_codex_event(session_configured.clone());
// Running turn + streaming output; Enter queues the follow-up.
app.chat_widget.handle_codex_event(Event {
id: "turn-started".to_string(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
turn_id: "turn-1".to_string(),
model_context_window: None,
collaboration_mode_kind: Default::default(),
}),
});
app.chat_widget.handle_codex_event(Event {
id: "agent-delta".to_string(),
msg: EventMsg::AgentMessageDelta(AgentMessageDeltaEvent {
delta: "streaming".to_string(),
}),
});
app.chat_widget
.apply_external_edit("queued follow-up".to_string());
app.chat_widget
.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
let input_state = app
.chat_widget
.capture_thread_input_state()
.expect("expected queued follow-up state");
let (chat_widget, _app_event_tx, _rx, mut new_op_rx) =
make_chatwidget_manual_with_sender().await;
app.chat_widget = chat_widget;
app.chat_widget.handle_codex_event(session_configured);
while new_op_rx.try_recv().is_ok() {}
// Replay an aborted turn-1 alongside the restored queued input.
app.replay_thread_snapshot(
ThreadEventSnapshot {
session_configured: None,
events: vec![Event {
id: "turn-aborted".to_string(),
msg: EventMsg::TurnAborted(TurnAbortedEvent {
turn_id: Some("turn-1".to_string()),
reason: TurnAbortReason::ReviewEnded,
}),
}],
input_state: Some(input_state),
},
true,
);
assert_eq!(
app.chat_widget.composer_text_with_pending(),
"queued follow-up"
);
assert!(app.chat_widget.queued_user_message_texts().is_empty());
assert!(
new_op_rx.try_recv().is_err(),
"replayed interrupted turns should restore queued input for editing, not submit it"
);
}
#[tokio::test]
async fn open_agent_picker_keeps_missing_threads_for_replay() -> Result<()> {
let mut app = make_test_app().await;
@@ -4511,6 +5287,17 @@ mod tests {
)
}
/// Drains `op_rx` until the first `Op::UserTurn` appears and returns it.
/// Panics — listing every op drained along the way — if the channel empties
/// without yielding a `UserTurn`.
fn next_user_turn_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Op>) -> Op {
    let mut drained = Vec::new();
    loop {
        match op_rx.try_recv() {
            Ok(op) if matches!(op, Op::UserTurn { .. }) => return op,
            Ok(op) => drained.push(format!("{op:?}")),
            Err(_) => panic!("expected UserTurn op, saw: {drained:?}"),
        }
    }
}
fn test_otel_manager(config: &Config, model: &str) -> OtelManager {
let model_info = codex_core::test_support::construct_model_info_offline(model, config);
OtelManager::new(

View File

@@ -574,6 +574,10 @@ impl BottomPane {
self.composer.current_text_with_pending()
}
/// Returns a copy of the composer's pending-paste entries so callers can
/// persist them (presumably (placeholder, pasted text) pairs — defined by the
/// composer; confirm there).
pub(crate) fn composer_pending_pastes(&self) -> Vec<(String, String)> {
self.composer.pending_pastes()
}
/// Replaces the composer contents with `text` and schedules a redraw.
pub(crate) fn apply_external_edit(&mut self, text: String) {
self.composer.apply_external_edit(text);
self.request_redraw();
}
@@ -599,6 +603,11 @@ impl BottomPane {
urls
}
/// Restores previously captured pending-paste entries into the composer
/// (counterpart of `composer_pending_pastes`) and schedules a redraw.
pub(crate) fn set_composer_pending_pastes(&mut self, pending_pastes: Vec<(String, String)>) {
self.composer.set_pending_pastes(pending_pastes);
self.request_redraw();
}
/// Update the status indicator header (defaults to "Working") and details below it.
///
/// Passing `None` clears any existing details. No-ops if the status indicator is not active.

View File

@@ -604,6 +604,7 @@ pub(crate) struct ChatWidget {
retry_status_header: Option<String>,
// Set when commentary output completes; once stream queues go idle we restore the status row.
pending_status_indicator_restore: bool,
suppress_queue_autosend: bool,
thread_id: Option<ThreadId>,
thread_name: Option<String>,
forked_from: Option<ThreadId>,
@@ -711,6 +712,7 @@ pub(crate) struct ActiveCellTranscriptKey {
pub(crate) animation_tick: Option<u64>,
}
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct UserMessage {
text: String,
local_images: Vec<LocalImageAttachment>,
@@ -724,6 +726,36 @@ pub(crate) struct UserMessage {
mention_bindings: Vec<MentionBinding>,
}
// Saved composer contents for one thread: everything needed to put the input
// box back exactly as the user left it when switching threads.
#[derive(Debug, Clone, PartialEq, Default)]
struct ThreadComposerState {
// Raw composer text.
text: String,
// Locally attached images.
local_images: Vec<LocalImageAttachment>,
// Remote image URLs attached to the draft.
remote_image_urls: Vec<String>,
// Structured text elements accompanying the raw text.
text_elements: Vec<TextElement>,
// Mention bindings referenced by the draft.
mention_bindings: Vec<MentionBinding>,
// Pending paste entries (see BottomPane::composer_pending_pastes).
pending_pastes: Vec<(String, String)>,
}
impl ThreadComposerState {
    /// Reports whether any composer field holds user input worth persisting.
    fn has_content(&self) -> bool {
        let all_empty = self.text.is_empty()
            && self.local_images.is_empty()
            && self.remote_image_urls.is_empty()
            && self.text_elements.is_empty()
            && self.mention_bindings.is_empty()
            && self.pending_pastes.is_empty();
        !all_empty
    }
}
// Per-thread user-input state captured when a thread is backgrounded and
// restored on switch-back (see capture/restore_thread_input_state).
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct ThreadInputState {
// Composer contents; None when the composer was empty at capture time.
composer: Option<ThreadComposerState>,
// Messages queued behind the running turn, in submission order.
queued_user_messages: VecDeque<UserMessage>,
// Collaboration mode active when the state was captured.
current_collaboration_mode: CollaborationMode,
// User-selected mode mask overlaying the current mode, if any.
active_collaboration_mask: Option<CollaborationModeMask>,
// Whether an agent turn was running when the state was captured.
agent_turn_running: bool,
}
impl From<String> for UserMessage {
fn from(text: String) -> Self {
Self {
@@ -1962,6 +1994,80 @@ impl ChatWidget {
);
}
/// Captures the widget's per-thread input state (composer draft, queued
/// messages, collaboration mode/mask, turn-running flag) so it can be restored
/// when the user switches back to this thread.
///
/// Always returns `Some`; the composer sub-state is only included when it
/// actually holds content.
pub(crate) fn capture_thread_input_state(&self) -> Option<ThreadInputState> {
let composer = ThreadComposerState {
text: self.bottom_pane.composer_text(),
text_elements: self.bottom_pane.composer_text_elements(),
local_images: self.bottom_pane.composer_local_images(),
remote_image_urls: self.bottom_pane.remote_image_urls(),
mention_bindings: self.bottom_pane.composer_mention_bindings(),
pending_pastes: self.bottom_pane.composer_pending_pastes(),
};
Some(ThreadInputState {
// Empty composers collapse to None so restore treats "no saved composer"
// and "composer was empty" identically.
composer: composer.has_content().then_some(composer),
queued_user_messages: self.queued_user_messages.clone(),
current_collaboration_mode: self.current_collaboration_mode.clone(),
active_collaboration_mask: self.active_collaboration_mask.clone(),
agent_turn_running: self.agent_turn_running,
})
}
/// Restores (or, with `None`, fully clears) per-thread input state captured by
/// `capture_thread_input_state`.
///
/// With `Some(state)`: collaboration mode/mask and the turn-running flag are
/// applied first, then the composer is either repopulated from the saved
/// composer state or reset to empty, and the queued messages are reinstalled.
/// With `None`: everything is reset — turn-running false, composer empty, no
/// pending pastes, queue cleared.
/// In both cases the sleep inhibitor, task-running indicator, queued-message
/// display, and redraw are re-synced at the end.
pub(crate) fn restore_thread_input_state(&mut self, input_state: Option<ThreadInputState>) {
if let Some(input_state) = input_state {
self.current_collaboration_mode = input_state.current_collaboration_mode;
self.active_collaboration_mask = input_state.active_collaboration_mask;
self.agent_turn_running = input_state.agent_turn_running;
self.update_collaboration_mode_indicator();
self.refresh_model_display();
if let Some(composer) = input_state.composer {
// Composer restore keeps only the image paths; the composer rebuilds
// attachments from them.
let local_image_paths = composer
.local_images
.into_iter()
.map(|img| img.path)
.collect();
self.set_remote_image_urls(composer.remote_image_urls);
self.bottom_pane.set_composer_text_with_mention_bindings(
composer.text,
composer.text_elements,
local_image_paths,
composer.mention_bindings,
);
self.bottom_pane
.set_composer_pending_pastes(composer.pending_pastes);
} else {
// No saved composer content: reset the composer to a clean slate.
self.set_remote_image_urls(Vec::new());
self.bottom_pane.set_composer_text_with_mention_bindings(
String::new(),
Vec::new(),
Vec::new(),
Vec::new(),
);
self.bottom_pane.set_composer_pending_pastes(Vec::new());
}
self.queued_user_messages = input_state.queued_user_messages;
} else {
// No state at all: clear every input-related field.
self.agent_turn_running = false;
self.set_remote_image_urls(Vec::new());
self.bottom_pane.set_composer_text_with_mention_bindings(
String::new(),
Vec::new(),
Vec::new(),
Vec::new(),
);
self.bottom_pane.set_composer_pending_pastes(Vec::new());
self.queued_user_messages.clear();
}
// Re-sync derived state with the (possibly changed) turn-running flag.
self.turn_sleep_inhibitor
.set_turn_running(self.agent_turn_running);
self.update_task_running_state();
self.refresh_queued_user_messages();
self.request_redraw();
}
/// Toggles suppression of queued-input auto-submission; while `true`,
/// `maybe_send_next_queued_input` is a no-op (used to bracket snapshot replay).
pub(crate) fn set_queue_autosend_suppressed(&mut self, suppressed: bool) {
self.suppress_queue_autosend = suppressed;
}
fn on_plan_update(&mut self, update: UpdatePlanArgs) {
self.saw_plan_update_this_turn = true;
self.add_to_history(history_cell::new_plan_update(update));
@@ -2909,6 +3015,7 @@ impl ChatWidget {
current_status_header: String::from("Working"),
retry_status_header: None,
pending_status_indicator_restore: false,
suppress_queue_autosend: false,
thread_id: None,
thread_name: None,
forked_from: None,
@@ -3088,6 +3195,7 @@ impl ChatWidget {
current_status_header: String::from("Working"),
retry_status_header: None,
pending_status_indicator_restore: false,
suppress_queue_autosend: false,
thread_id: None,
thread_name: None,
forked_from: None,
@@ -3256,6 +3364,7 @@ impl ChatWidget {
current_status_header: String::from("Working"),
retry_status_header: None,
pending_status_indicator_restore: false,
suppress_queue_autosend: false,
thread_id: None,
thread_name: None,
forked_from: None,
@@ -4705,7 +4814,10 @@ impl ChatWidget {
}
// If idle and there are queued inputs, submit exactly one to start the next turn.
fn maybe_send_next_queued_input(&mut self) {
pub(crate) fn maybe_send_next_queued_input(&mut self) {
if self.suppress_queue_autosend {
return;
}
if self.bottom_pane.is_task_running() {
return;
}
@@ -7598,6 +7710,14 @@ impl ChatWidget {
self.bottom_pane.remote_image_urls()
}
#[cfg(test)]
/// Test helper: the text of every queued user message, in queue order.
pub(crate) fn queued_user_message_texts(&self) -> Vec<String> {
    let mut texts = Vec::with_capacity(self.queued_user_messages.len());
    for message in &self.queued_user_messages {
        texts.push(message.text.clone());
    }
    texts
}
#[cfg(test)]
pub(crate) fn pending_thread_approvals(&self) -> &[String] {
self.bottom_pane.pending_thread_approvals()

View File

@@ -1701,6 +1701,7 @@ async fn make_chatwidget_manual(
current_status_header: String::from("Working"),
retry_status_header: None,
pending_status_indicator_restore: false,
suppress_queue_autosend: false,
thread_id: None,
thread_name: None,
forked_from: None,
@@ -3139,6 +3140,30 @@ async fn empty_enter_during_task_does_not_queue() {
assert!(chat.queued_user_messages.is_empty());
}
#[tokio::test]
async fn restore_thread_input_state_syncs_sleep_inhibitor_state() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.set_feature_enabled(Feature::PreventIdleSleep, true);
chat.restore_thread_input_state(Some(ThreadInputState {
composer: None,
queued_user_messages: VecDeque::new(),
current_collaboration_mode: chat.current_collaboration_mode.clone(),
active_collaboration_mask: chat.active_collaboration_mask.clone(),
agent_turn_running: true,
}));
assert!(chat.agent_turn_running);
assert!(chat.turn_sleep_inhibitor.is_turn_running());
assert!(chat.bottom_pane.is_task_running());
chat.restore_thread_input_state(None);
assert!(!chat.agent_turn_running);
assert!(!chat.turn_sleep_inhibitor.is_turn_running());
assert!(!chat.bottom_pane.is_task_running());
}
#[tokio::test]
async fn alt_up_edits_most_recent_queued_message() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;

View File

@@ -29,6 +29,7 @@ use windows_inhibitor as imp;
// Cross-platform wrapper that prevents system sleep while an agent turn runs,
// delegating the actual inhibition to the platform implementation.
#[derive(Debug)]
pub struct SleepInhibitor {
// Whether sleep prevention is enabled at all; when false, set_turn_running
// releases any held inhibition.
enabled: bool,
// Last turn-running value requested via set_turn_running; tracked even when
// disabled, so is_turn_running reflects caller intent, not platform state.
turn_running: bool,
// Platform-specific inhibitor backend.
platform: imp::SleepInhibitor,
}
@@ -36,12 +37,14 @@ impl SleepInhibitor {
/// Creates an inhibitor with no turn running; `enabled` gates whether
/// sleep prevention will ever be engaged.
pub fn new(enabled: bool) -> Self {
Self {
enabled,
turn_running: false,
platform: imp::SleepInhibitor::new(),
}
}
/// Update the active turn state; turns sleep prevention on/off as needed.
pub fn set_turn_running(&mut self, turn_running: bool) {
self.turn_running = turn_running;
if !self.enabled {
self.release();
return;
@@ -61,6 +64,11 @@ impl SleepInhibitor {
/// Drops the platform-level sleep inhibition, if held.
fn release(&mut self) {
self.platform.release();
}
/// Return the latest turn-running state requested by the caller.
/// Note this reflects caller intent only — when the inhibitor is disabled it
/// can be `true` while no platform inhibition is actually held.
pub fn is_turn_running(&self) -> bool {
self.turn_running
}
}
#[cfg(test)]
@@ -71,14 +79,18 @@ mod tests {
// Enabled inhibitor: toggling the turn state must not panic and must be
// observable through is_turn_running().
fn sleep_inhibitor_toggles_without_panicking() {
let mut inhibitor = SleepInhibitor::new(true);
inhibitor.set_turn_running(true);
assert!(inhibitor.is_turn_running());
inhibitor.set_turn_running(false);
assert!(!inhibitor.is_turn_running());
}
#[test]
// Disabled inhibitor: the turn-running flag is still tracked (and reported by
// is_turn_running) even though no platform inhibition is engaged.
fn sleep_inhibitor_disabled_does_not_panic() {
let mut inhibitor = SleepInhibitor::new(false);
inhibitor.set_turn_running(true);
assert!(inhibitor.is_turn_running());
inhibitor.set_turn_running(false);
assert!(!inhibitor.is_turn_running());
}
#[test]