mirror of
https://github.com/openai/codex.git
synced 2026-02-03 07:23:39 +00:00
Compare commits
12 Commits
prototype
...
jif/async-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fc179ecdf0 | ||
|
|
e05540ea7f | ||
|
|
40d17f68e1 | ||
|
|
9497ce0906 | ||
|
|
3ea07b67d4 | ||
|
|
8bbcbdc0ed | ||
|
|
e9f4fc57ff | ||
|
|
542297e067 | ||
|
|
6aa1066001 | ||
|
|
305a7a2347 | ||
|
|
9ad7e72808 | ||
|
|
d3653f0c54 |
2
codex-rs/Cargo.lock
generated
2
codex-rs/Cargo.lock
generated
@@ -686,6 +686,7 @@ dependencies = [
|
||||
"codex-file-search",
|
||||
"codex-mcp-client",
|
||||
"codex-protocol",
|
||||
"codex-utils-readiness",
|
||||
"core_test_support",
|
||||
"dirs",
|
||||
"env-flags",
|
||||
@@ -946,6 +947,7 @@ dependencies = [
|
||||
"codex-login",
|
||||
"codex-ollama",
|
||||
"codex-protocol",
|
||||
"codex-utils-readiness",
|
||||
"color-eyre",
|
||||
"crossterm",
|
||||
"diffy",
|
||||
|
||||
@@ -22,6 +22,7 @@ codex-apply-patch = { workspace = true }
|
||||
codex-file-search = { workspace = true }
|
||||
codex-mcp-client = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-utils-readiness = { workspace = true }
|
||||
dirs = { workspace = true }
|
||||
env-flags = { workspace = true }
|
||||
eventsource-stream = { workspace = true }
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -44,7 +44,7 @@ pub(super) async fn spawn_compact_task(
|
||||
input: Vec<InputItem>,
|
||||
) {
|
||||
let task = AgentTask::compact(sess.clone(), turn_context, sub_id, input);
|
||||
sess.set_task(task).await;
|
||||
sess.set_task(task, None).await;
|
||||
}
|
||||
|
||||
pub(super) async fn run_inline_auto_compact_task(
|
||||
|
||||
@@ -3,11 +3,16 @@ use crate::error::Result as CodexResult;
|
||||
use crate::protocol::Event;
|
||||
use crate::protocol::Op;
|
||||
use crate::protocol::Submission;
|
||||
use codex_utils_readiness::ReadinessFlag;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
pub struct CodexConversation {
|
||||
codex: Codex,
|
||||
}
|
||||
|
||||
pub type TurnReadinessTx = oneshot::Sender<Arc<ReadinessFlag>>;
|
||||
|
||||
/// Conduit for the bidirectional stream of messages that compose a conversation
|
||||
/// in Codex.
|
||||
impl CodexConversation {
|
||||
@@ -27,4 +32,12 @@ impl CodexConversation {
|
||||
pub async fn next_event(&self) -> CodexResult<Event> {
|
||||
self.codex.next_event().await
|
||||
}
|
||||
|
||||
pub async fn submit_with_readiness(
|
||||
&self,
|
||||
op: Op,
|
||||
readiness: Option<TurnReadinessTx>,
|
||||
) -> CodexResult<String> {
|
||||
self.codex.submit_with_readiness(op, readiness).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,8 +13,10 @@ mod client;
|
||||
mod client_common;
|
||||
pub mod codex;
|
||||
mod codex_conversation;
|
||||
mod state;
|
||||
pub mod token_data;
|
||||
pub use codex_conversation::CodexConversation;
|
||||
pub use codex_conversation::TurnReadinessTx;
|
||||
pub mod config;
|
||||
pub mod config_edit;
|
||||
pub mod config_profile;
|
||||
|
||||
5
codex-rs/core/src/state/mod.rs
Normal file
5
codex-rs/core/src/state/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub(crate) mod session;
|
||||
pub(crate) mod turn;
|
||||
|
||||
pub(crate) use session::SessionState;
|
||||
pub(crate) use turn::TurnState;
|
||||
18
codex-rs/core/src/state/session.rs
Normal file
18
codex-rs/core/src/state/session.rs
Normal file
@@ -0,0 +1,18 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
use crate::conversation_history::ConversationHistory;
|
||||
use crate::protocol::RateLimitSnapshot;
|
||||
use crate::protocol::ReviewDecision;
|
||||
use crate::protocol::TokenUsageInfo;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct SessionState {
|
||||
pub(crate) approved_commands: HashSet<Vec<String>>,
|
||||
pub(crate) pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
|
||||
pub(crate) history: ConversationHistory,
|
||||
pub(crate) token_info: Option<TokenUsageInfo>,
|
||||
pub(crate) latest_rate_limits: Option<RateLimitSnapshot>,
|
||||
}
|
||||
243
codex-rs/core/src/state/turn.rs
Normal file
243
codex-rs/core/src/state/turn.rs
Normal file
@@ -0,0 +1,243 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::VecDeque;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_utils_readiness::Readiness;
|
||||
use codex_utils_readiness::ReadinessFlag;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::client::ModelClient;
|
||||
use crate::config_types::ShellEnvironmentPolicy;
|
||||
use crate::openai_tools::ToolsConfig;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::FileChange;
|
||||
use crate::protocol::InputItem;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use crate::turn_diff_tracker::TurnDiffTracker;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct TurnContext {
|
||||
pub(crate) client: ModelClient,
|
||||
/// The session's current working directory. All relative paths provided by
|
||||
/// the model as well as sandbox policies are resolved against this path
|
||||
/// instead of `std::env::current_dir()`.
|
||||
pub(crate) cwd: PathBuf,
|
||||
pub(crate) base_instructions: Option<String>,
|
||||
pub(crate) user_instructions: Option<String>,
|
||||
pub(crate) approval_policy: AskForApproval,
|
||||
pub(crate) sandbox_policy: SandboxPolicy,
|
||||
pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
|
||||
pub(crate) tools_config: ToolsConfig,
|
||||
pub(crate) is_review_mode: bool,
|
||||
pub(crate) final_output_json_schema: Option<Value>,
|
||||
}
|
||||
|
||||
impl TurnContext {
|
||||
pub(crate) fn resolve_path(&self, path: Option<String>) -> PathBuf {
|
||||
path.as_ref()
|
||||
.map(PathBuf::from)
|
||||
.map_or_else(|| self.cwd.clone(), |p| self.cwd.join(p))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TurnMailbox {
|
||||
initial_input: Option<ResponseInputItem>,
|
||||
latest_readiness: Option<Arc<ReadinessFlag>>,
|
||||
pending: VecDeque<ResponseInputItem>,
|
||||
}
|
||||
|
||||
impl TurnMailbox {
|
||||
fn seed(
|
||||
initial_input: Option<ResponseInputItem>,
|
||||
readiness: Option<Arc<ReadinessFlag>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
initial_input,
|
||||
latest_readiness: readiness,
|
||||
pending: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn take_initial_input(&mut self) -> Option<ResponseInputItem> {
|
||||
self.initial_input.take()
|
||||
}
|
||||
|
||||
fn enqueue(&mut self, items: Vec<InputItem>, readiness: Option<Arc<ReadinessFlag>>) {
|
||||
if items.is_empty() && readiness.is_none() {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Some(flag) = readiness {
|
||||
self.latest_readiness = Some(flag);
|
||||
}
|
||||
|
||||
if items.is_empty() {
|
||||
return;
|
||||
}
|
||||
|
||||
self.pending.push_back(items.into());
|
||||
}
|
||||
|
||||
fn drain(&mut self) -> TurnDrain {
|
||||
let items = self
|
||||
.pending
|
||||
.drain(..)
|
||||
.map(ResponseItem::from)
|
||||
.collect::<Vec<_>>();
|
||||
let readiness = self.latest_readiness.clone();
|
||||
TurnDrain { items, readiness }
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct TurnDrain {
|
||||
items: Vec<ResponseItem>,
|
||||
readiness: Option<Arc<ReadinessFlag>>,
|
||||
}
|
||||
|
||||
impl TurnDrain {
|
||||
pub(crate) fn into_parts(self) -> (Vec<ResponseItem>, Option<Arc<ReadinessFlag>>) {
|
||||
(self.items, self.readiness)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct TurnRuntime {
|
||||
mailbox: TurnMailbox,
|
||||
review_history: Vec<ResponseItem>,
|
||||
last_agent_message: Option<String>,
|
||||
auto_compact_recently_attempted: bool,
|
||||
diff_tracker: TurnDiffTracker,
|
||||
}
|
||||
|
||||
impl TurnRuntime {
|
||||
fn new(
|
||||
initial_input: Option<ResponseInputItem>,
|
||||
readiness: Option<Arc<ReadinessFlag>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
mailbox: TurnMailbox::seed(initial_input, readiness),
|
||||
..Self::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct TurnState {
|
||||
sub_id: String,
|
||||
turn_context: Arc<TurnContext>,
|
||||
runtime: RwLock<TurnRuntime>,
|
||||
}
|
||||
|
||||
impl TurnState {
|
||||
pub(crate) fn new(
|
||||
sub_id: String,
|
||||
turn_context: Arc<TurnContext>,
|
||||
initial_input: Vec<InputItem>,
|
||||
readiness: Option<Arc<ReadinessFlag>>,
|
||||
) -> Self {
|
||||
let initial_input = if initial_input.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(initial_input.into())
|
||||
};
|
||||
let runtime = TurnRuntime::new(initial_input, readiness);
|
||||
Self {
|
||||
sub_id,
|
||||
turn_context,
|
||||
runtime: RwLock::new(runtime),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn sub_id(&self) -> &str {
|
||||
&self.sub_id
|
||||
}
|
||||
|
||||
pub(crate) fn turn_context(&self) -> Arc<TurnContext> {
|
||||
Arc::clone(&self.turn_context)
|
||||
}
|
||||
|
||||
pub(crate) async fn take_initial_input(&self) -> Option<ResponseInputItem> {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
runtime.mailbox.take_initial_input()
|
||||
}
|
||||
|
||||
pub(crate) async fn drain_mailbox(&self) -> TurnDrain {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
runtime.mailbox.drain()
|
||||
}
|
||||
|
||||
pub(crate) async fn enqueue_user_input(
|
||||
&self,
|
||||
items: Vec<InputItem>,
|
||||
readiness: Option<Arc<ReadinessFlag>>,
|
||||
) {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
runtime.mailbox.enqueue(items, readiness);
|
||||
}
|
||||
|
||||
pub(crate) async fn set_review_history(&self, history: Vec<ResponseItem>) {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
runtime.review_history = history;
|
||||
}
|
||||
|
||||
pub(crate) async fn extend_review_history(&self, items: &[ResponseItem]) {
|
||||
if items.is_empty() {
|
||||
return;
|
||||
}
|
||||
let mut runtime = self.runtime.write().await;
|
||||
runtime.review_history.extend(items.iter().cloned());
|
||||
}
|
||||
|
||||
pub(crate) async fn review_history(&self) -> Vec<ResponseItem> {
|
||||
let runtime = self.runtime.read().await;
|
||||
runtime.review_history.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn mark_auto_compact_attempted(&self) -> bool {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
let already_attempted = runtime.auto_compact_recently_attempted;
|
||||
runtime.auto_compact_recently_attempted = true;
|
||||
already_attempted
|
||||
}
|
||||
|
||||
pub(crate) async fn reset_auto_compact_attempted(&self) {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
runtime.auto_compact_recently_attempted = false;
|
||||
}
|
||||
|
||||
pub(crate) async fn set_last_agent_message(&self, message: Option<String>) {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
runtime.last_agent_message = message;
|
||||
}
|
||||
|
||||
pub(crate) async fn last_agent_message(&self) -> Option<String> {
|
||||
let runtime = self.runtime.read().await;
|
||||
runtime.last_agent_message.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn on_patch_begin(&self, changes: &HashMap<PathBuf, FileChange>) {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
runtime.diff_tracker.on_patch_begin(changes);
|
||||
}
|
||||
|
||||
pub(crate) async fn take_unified_diff(&self) -> Result<Option<String>> {
|
||||
let mut runtime = self.runtime.write().await;
|
||||
runtime.diff_tracker.get_unified_diff()
|
||||
}
|
||||
|
||||
pub(crate) async fn latest_readiness(&self) -> Option<Arc<ReadinessFlag>> {
|
||||
let runtime = self.runtime.read().await;
|
||||
runtime.mailbox.latest_readiness.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn wait_on_readiness(&self) {
|
||||
if let Some(flag) = self.latest_readiness().await {
|
||||
flag.wait_ready().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -39,6 +39,7 @@ codex-git-tooling = { workspace = true }
|
||||
codex-login = { workspace = true }
|
||||
codex-ollama = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-utils-readiness = { workspace = true }
|
||||
color-eyre = { workspace = true }
|
||||
crossterm = { workspace = true, features = [
|
||||
"bracketed-paste",
|
||||
|
||||
@@ -278,6 +278,9 @@ impl App {
|
||||
AppEvent::ConversationHistory(ev) => {
|
||||
self.on_conversation_history_for_backtrack(tui, ev).await?;
|
||||
}
|
||||
AppEvent::GhostSnapshotResult(event) => {
|
||||
self.chat_widget.handle_ghost_snapshot_event(event);
|
||||
}
|
||||
AppEvent::ExitRequest => {
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::path::PathBuf;
|
||||
use codex_core::protocol::ConversationPathResponseEvent;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_file_search::FileMatch;
|
||||
use codex_git_tooling::GhostCommit;
|
||||
|
||||
use crate::history_cell::HistoryCell;
|
||||
|
||||
@@ -68,6 +69,9 @@ pub(crate) enum AppEvent {
|
||||
/// Forwarded conversation history snapshot from the current conversation.
|
||||
ConversationHistory(ConversationPathResponseEvent),
|
||||
|
||||
/// Result of a ghost snapshot capture attempt.
|
||||
GhostSnapshotResult(GhostSnapshotEvent),
|
||||
|
||||
/// Open the branch picker option from the review popup.
|
||||
OpenReviewBranchPicker(PathBuf),
|
||||
|
||||
@@ -77,3 +81,12 @@ pub(crate) enum AppEvent {
|
||||
/// Open the custom prompt option from the review popup.
|
||||
OpenReviewCustomPrompt,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum GhostSnapshotEvent {
|
||||
Success(GhostCommit),
|
||||
Disabled {
|
||||
message: String,
|
||||
hint: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -58,6 +58,7 @@ use tokio::sync::mpsc::UnboundedSender;
|
||||
use tracing::debug;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event::GhostSnapshotEvent;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
use crate::bottom_pane::BottomPane;
|
||||
use crate::bottom_pane::BottomPaneParams;
|
||||
@@ -109,6 +110,10 @@ use codex_git_tooling::GhostCommit;
|
||||
use codex_git_tooling::GitToolingError;
|
||||
use codex_git_tooling::create_ghost_commit;
|
||||
use codex_git_tooling::restore_ghost_commit;
|
||||
use codex_utils_readiness::Readiness;
|
||||
use codex_utils_readiness::ReadinessFlag;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::warn;
|
||||
|
||||
const MAX_TRACKED_GHOST_COMMITS: usize = 20;
|
||||
|
||||
@@ -181,7 +186,7 @@ pub(crate) struct ChatWidgetInit {
|
||||
|
||||
pub(crate) struct ChatWidget {
|
||||
app_event_tx: AppEventSender,
|
||||
codex_op_tx: UnboundedSender<Op>,
|
||||
codex_op_tx: UnboundedSender<agent::OutgoingOp>,
|
||||
bottom_pane: BottomPane,
|
||||
active_exec_cell: Option<ExecCell>,
|
||||
config: Config,
|
||||
@@ -772,12 +777,13 @@ impl ChatWidget {
|
||||
} = common;
|
||||
let mut rng = rand::rng();
|
||||
let placeholder = EXAMPLE_PROMPTS[rng.random_range(0..EXAMPLE_PROMPTS.len())].to_string();
|
||||
let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), conversation_manager);
|
||||
let agent_channels =
|
||||
spawn_agent(config.clone(), app_event_tx.clone(), conversation_manager);
|
||||
|
||||
Self {
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
frame_requester: frame_requester.clone(),
|
||||
codex_op_tx,
|
||||
codex_op_tx: agent_channels.op_tx,
|
||||
bottom_pane: BottomPane::new(BottomPaneParams {
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
@@ -810,7 +816,7 @@ impl ChatWidget {
|
||||
pending_notification: None,
|
||||
is_review_mode: false,
|
||||
ghost_snapshots: Vec::new(),
|
||||
ghost_snapshots_disabled: true,
|
||||
ghost_snapshots_disabled: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -832,13 +838,13 @@ impl ChatWidget {
|
||||
let mut rng = rand::rng();
|
||||
let placeholder = EXAMPLE_PROMPTS[rng.random_range(0..EXAMPLE_PROMPTS.len())].to_string();
|
||||
|
||||
let codex_op_tx =
|
||||
let agent_channels =
|
||||
spawn_agent_from_existing(conversation, session_configured, app_event_tx.clone());
|
||||
|
||||
Self {
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
frame_requester: frame_requester.clone(),
|
||||
codex_op_tx,
|
||||
codex_op_tx: agent_channels.op_tx,
|
||||
bottom_pane: BottomPane::new(BottomPaneParams {
|
||||
frame_requester,
|
||||
app_event_tx,
|
||||
@@ -871,7 +877,7 @@ impl ChatWidget {
|
||||
pending_notification: None,
|
||||
is_review_mode: false,
|
||||
ghost_snapshots: Vec::new(),
|
||||
ghost_snapshots_disabled: true,
|
||||
ghost_snapshots_disabled: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1121,7 +1127,48 @@ impl ChatWidget {
|
||||
return;
|
||||
}
|
||||
|
||||
self.capture_ghost_snapshot();
|
||||
let (readiness_tx, readiness_rx) = oneshot::channel::<Arc<ReadinessFlag>>();
|
||||
let capture_snapshot = !self.ghost_snapshots_disabled;
|
||||
let repo_path = self.config.cwd.clone();
|
||||
let app_event_tx = self.app_event_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let Ok(flag) = readiness_rx.await else {
|
||||
return;
|
||||
};
|
||||
let readiness_token = flag.subscribe().await.ok();
|
||||
if capture_snapshot {
|
||||
let event = match create_ghost_commit(&CreateGhostCommitOptions::new(
|
||||
repo_path.as_path(),
|
||||
)) {
|
||||
Ok(commit) => {
|
||||
AppEvent::GhostSnapshotResult(GhostSnapshotEvent::Success(commit))
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("failed to create ghost snapshot: {err}");
|
||||
let (message, hint) = match &err {
|
||||
GitToolingError::NotAGitRepository { .. } => (
|
||||
"Snapshots disabled: current directory is not a Git repository.".to_string(),
|
||||
None,
|
||||
),
|
||||
_ => (
|
||||
format!("Snapshots disabled after error: {err}"),
|
||||
Some(
|
||||
"Restart Codex after resolving the issue to re-enable snapshots.".to_string(),
|
||||
),
|
||||
),
|
||||
};
|
||||
AppEvent::GhostSnapshotResult(GhostSnapshotEvent::Disabled {
|
||||
message,
|
||||
hint,
|
||||
})
|
||||
}
|
||||
};
|
||||
app_event_tx.send(event);
|
||||
}
|
||||
if let Some(token) = readiness_token {
|
||||
let _ = flag.mark_ready(token).await;
|
||||
}
|
||||
});
|
||||
|
||||
let mut items: Vec<InputItem> = Vec::new();
|
||||
|
||||
@@ -1134,7 +1181,10 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
self.codex_op_tx
|
||||
.send(Op::UserInput { items })
|
||||
.send(agent::OutgoingOp {
|
||||
op: Op::UserInput { items },
|
||||
readiness: Some(readiness_tx),
|
||||
})
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::error!("failed to send message: {e}");
|
||||
});
|
||||
@@ -1142,7 +1192,10 @@ impl ChatWidget {
|
||||
// Persist the text to cross-session message history.
|
||||
if !text.is_empty() {
|
||||
self.codex_op_tx
|
||||
.send(Op::AddToHistory { text: text.clone() })
|
||||
.send(agent::OutgoingOp {
|
||||
op: Op::AddToHistory { text: text.clone() },
|
||||
readiness: None,
|
||||
})
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::error!("failed to send AddHistory op: {e}");
|
||||
});
|
||||
@@ -1154,37 +1207,17 @@ impl ChatWidget {
|
||||
}
|
||||
}
|
||||
|
||||
fn capture_ghost_snapshot(&mut self) {
|
||||
if self.ghost_snapshots_disabled {
|
||||
return;
|
||||
}
|
||||
|
||||
let options = CreateGhostCommitOptions::new(&self.config.cwd);
|
||||
match create_ghost_commit(&options) {
|
||||
Ok(commit) => {
|
||||
pub(crate) fn handle_ghost_snapshot_event(&mut self, event: GhostSnapshotEvent) {
|
||||
match event {
|
||||
GhostSnapshotEvent::Success(commit) => {
|
||||
self.ghost_snapshots.push(commit);
|
||||
if self.ghost_snapshots.len() > MAX_TRACKED_GHOST_COMMITS {
|
||||
self.ghost_snapshots.remove(0);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
GhostSnapshotEvent::Disabled { message, hint } => {
|
||||
self.ghost_snapshots_disabled = true;
|
||||
let (message, hint) = match &err {
|
||||
GitToolingError::NotAGitRepository { .. } => (
|
||||
"Snapshots disabled: current directory is not a Git repository."
|
||||
.to_string(),
|
||||
None,
|
||||
),
|
||||
_ => (
|
||||
format!("Snapshots disabled after error: {err}"),
|
||||
Some(
|
||||
"Restart Codex after resolving the issue to re-enable snapshots."
|
||||
.to_string(),
|
||||
),
|
||||
),
|
||||
};
|
||||
self.add_info_message(message, hint);
|
||||
tracing::warn!("failed to create ghost snapshot: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1651,7 +1684,10 @@ impl ChatWidget {
|
||||
pub(crate) fn submit_op(&self, op: Op) {
|
||||
// Record outbound operation for session replay fidelity.
|
||||
crate::session_log::log_outbound_op(&op);
|
||||
if let Err(e) = self.codex_op_tx.send(op) {
|
||||
if let Err(e) = self.codex_op_tx.send(agent::OutgoingOp {
|
||||
op,
|
||||
readiness: None,
|
||||
}) {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::sync::Arc;
|
||||
use codex_core::CodexConversation;
|
||||
use codex_core::ConversationManager;
|
||||
use codex_core::NewConversation;
|
||||
use codex_core::TurnReadinessTx;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::protocol::Op;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
@@ -11,14 +12,23 @@ use tokio::sync::mpsc::unbounded_channel;
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
|
||||
pub(crate) struct AgentChannels {
|
||||
pub(crate) op_tx: UnboundedSender<OutgoingOp>,
|
||||
}
|
||||
|
||||
pub(crate) struct OutgoingOp {
|
||||
pub(crate) op: Op,
|
||||
pub(crate) readiness: Option<TurnReadinessTx>,
|
||||
}
|
||||
|
||||
/// Spawn the agent bootstrapper and op forwarding loop, returning the
|
||||
/// `UnboundedSender<Op>` used by the UI to submit operations.
|
||||
/// channels used by the UI to submit operations and register turn readiness.
|
||||
pub(crate) fn spawn_agent(
|
||||
config: Config,
|
||||
app_event_tx: AppEventSender,
|
||||
server: Arc<ConversationManager>,
|
||||
) -> UnboundedSender<Op> {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
||||
) -> AgentChannels {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<OutgoingOp>();
|
||||
|
||||
let app_event_tx_clone = app_event_tx;
|
||||
tokio::spawn(async move {
|
||||
@@ -45,8 +55,10 @@ pub(crate) fn spawn_agent(
|
||||
|
||||
let conversation_clone = conversation.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = codex_op_rx.recv().await {
|
||||
let id = conversation_clone.submit(op).await;
|
||||
while let Some(outgoing) = codex_op_rx.recv().await {
|
||||
let id = conversation_clone
|
||||
.submit_with_readiness(outgoing.op, outgoing.readiness)
|
||||
.await;
|
||||
if let Err(e) = id {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
@@ -58,7 +70,7 @@ pub(crate) fn spawn_agent(
|
||||
}
|
||||
});
|
||||
|
||||
codex_op_tx
|
||||
AgentChannels { op_tx: codex_op_tx }
|
||||
}
|
||||
|
||||
/// Spawn agent loops for an existing conversation (e.g., a forked conversation).
|
||||
@@ -68,8 +80,8 @@ pub(crate) fn spawn_agent_from_existing(
|
||||
conversation: std::sync::Arc<CodexConversation>,
|
||||
session_configured: codex_core::protocol::SessionConfiguredEvent,
|
||||
app_event_tx: AppEventSender,
|
||||
) -> UnboundedSender<Op> {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
||||
) -> AgentChannels {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<OutgoingOp>();
|
||||
|
||||
let app_event_tx_clone = app_event_tx;
|
||||
tokio::spawn(async move {
|
||||
@@ -82,8 +94,10 @@ pub(crate) fn spawn_agent_from_existing(
|
||||
|
||||
let conversation_clone = conversation.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = codex_op_rx.recv().await {
|
||||
let id = conversation_clone.submit(op).await;
|
||||
while let Some(outgoing) = codex_op_rx.recv().await {
|
||||
let id = conversation_clone
|
||||
.submit_with_readiness(outgoing.op, outgoing.readiness)
|
||||
.await;
|
||||
if let Err(e) = id {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
@@ -95,5 +109,5 @@ pub(crate) fn spawn_agent_from_existing(
|
||||
}
|
||||
});
|
||||
|
||||
codex_op_tx
|
||||
AgentChannels { op_tx: codex_op_tx }
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::*;
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
use crate::chatwidget::agent;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_core::config::Config;
|
||||
@@ -43,6 +44,7 @@ use std::fs::File;
|
||||
use std::io::BufRead;
|
||||
use std::io::BufReader;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tempfile::NamedTempFile;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
@@ -295,11 +297,11 @@ async fn helpers_are_available_and_do_not_panic() {
|
||||
fn make_chatwidget_manual() -> (
|
||||
ChatWidget,
|
||||
tokio::sync::mpsc::UnboundedReceiver<AppEvent>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<Op>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<agent::OutgoingOp>,
|
||||
) {
|
||||
let (tx_raw, rx) = unbounded_channel::<AppEvent>();
|
||||
let app_event_tx = AppEventSender::new(tx_raw);
|
||||
let (op_tx, op_rx) = unbounded_channel::<Op>();
|
||||
let (op_tx, op_rx) = unbounded_channel::<agent::OutgoingOp>();
|
||||
let cfg = test_config();
|
||||
let bottom = BottomPane::new(BottomPaneParams {
|
||||
app_event_tx: app_event_tx.clone(),
|
||||
@@ -345,7 +347,7 @@ pub(crate) fn make_chatwidget_manual_with_sender() -> (
|
||||
ChatWidget,
|
||||
AppEventSender,
|
||||
tokio::sync::mpsc::UnboundedReceiver<AppEvent>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<Op>,
|
||||
tokio::sync::mpsc::UnboundedReceiver<agent::OutgoingOp>,
|
||||
) {
|
||||
let (widget, rx, op_rx) = make_chatwidget_manual();
|
||||
let app_event_tx = widget.app_event_tx.clone();
|
||||
@@ -1672,7 +1674,8 @@ fn apply_patch_full_flow_integration_like() {
|
||||
let forwarded = op_rx
|
||||
.try_recv()
|
||||
.expect("expected op forwarded to codex channel");
|
||||
match forwarded {
|
||||
assert!(forwarded.readiness.is_none());
|
||||
match forwarded.op {
|
||||
Op::PatchApproval { id, decision } => {
|
||||
assert_eq!(id, "sub-xyz");
|
||||
assert!(matches!(
|
||||
|
||||
@@ -97,5 +97,6 @@ pub fn built_in_slash_commands() -> Vec<(&'static str, SlashCommand)> {
|
||||
}
|
||||
|
||||
fn beta_features_enabled() -> bool {
|
||||
std::env::var_os("BETA_FEATURE").is_some()
|
||||
true
|
||||
// std::env::var_os("BETA_FEATURE").is_some()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user