This commit is contained in:
jimmyfraiture
2025-09-24 13:03:14 +01:00
parent d3653f0c54
commit 9ad7e72808
10 changed files with 270 additions and 151 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -686,6 +686,7 @@ dependencies = [
"codex-file-search",
"codex-mcp-client",
"codex-protocol",
"codex-utils-readiness",
"core_test_support",
"dirs",
"env-flags",
@@ -946,6 +947,7 @@ dependencies = [
"codex-login",
"codex-ollama",
"codex-protocol",
"codex-utils-readiness",
"color-eyre",
"crossterm",
"diffy",

View File

@@ -22,6 +22,7 @@ codex-apply-patch = { workspace = true }
codex-file-search = { workspace = true }
codex-mcp-client = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-readiness = { workspace = true }
dirs = { workspace = true }
env-flags = { workspace = true }
eventsource-stream = { workspace = true }

View File

@@ -1,6 +1,5 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -31,8 +30,10 @@ use mcp_types::CallToolResult;
use serde::Deserialize;
use serde::Serialize;
use serde_json;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::oneshot;
use tokio::task::AbortHandle;
use tracing::debug;
@@ -51,7 +52,6 @@ use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::config::Config;
use crate::config_types::ShellEnvironmentPolicy;
use crate::conversation_history::ConversationHistory;
use crate::environment_context::EnvironmentContext;
use crate::error::CodexErr;
@@ -119,6 +119,8 @@ use crate::safety::SafetyCheck;
use crate::safety::assess_command_safety;
use crate::safety::assess_safety_for_untrusted_command;
use crate::shell;
use crate::state::SessionState;
use crate::state::TurnState;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::unified_exec::UnifiedExecSessionManager;
use crate::user_instructions::UserInstructions;
@@ -134,6 +136,10 @@ use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::models::ShellToolCallParams;
use codex_protocol::protocol::InitialHistory;
use codex_utils_readiness::Readiness;
use codex_utils_readiness::ReadinessFlag;
pub(crate) use crate::state::turn::TurnContext;
pub mod compact;
use self::compact::build_compacted_history;
@@ -145,6 +151,7 @@ pub struct Codex {
next_id: AtomicU64,
tx_sub: Sender<Submission>,
rx_event: Receiver<Event>,
turn_readiness_tx: UnboundedSender<Arc<ReadinessFlag>>,
}
/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
@@ -174,6 +181,7 @@ impl Codex {
) -> CodexResult<CodexSpawnOk> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
let (turn_readiness_tx, turn_readiness_rx) = unbounded_channel();
let user_instructions = get_user_instructions(&config).await;
@@ -208,11 +216,18 @@ impl Codex {
let conversation_id = session.conversation_id;
// This task will run until Op::Shutdown is received.
tokio::spawn(submission_loop(session, turn_context, config, rx_sub));
tokio::spawn(submission_loop(
session,
turn_context,
config,
rx_sub,
turn_readiness_rx,
));
let codex = Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event,
turn_readiness_tx,
};
Ok(CodexSpawnOk {
@@ -221,6 +236,10 @@ impl Codex {
})
}
pub(crate) fn turn_readiness_sender(&self) -> UnboundedSender<Arc<ReadinessFlag>> {
self.turn_readiness_tx.clone()
}
/// Submit the `op` wrapped in a `Submission` with a unique ID.
pub async fn submit(&self, op: Op) -> CodexResult<String> {
let id = self
@@ -252,18 +271,6 @@ impl Codex {
}
}
/// Mutable state of the agent
#[derive(Default)]
struct State {
approved_commands: HashSet<Vec<String>>,
current_task: Option<AgentTask>,
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
pending_input: Vec<ResponseInputItem>,
history: ConversationHistory,
token_info: Option<TokenUsageInfo>,
latest_rate_limits: Option<RateLimitSnapshot>,
}
/// Context for an initialized model agent
///
/// A session has at most 1 running task at a time, and can be interrupted by user input.
@@ -281,39 +288,15 @@ pub(crate) struct Session {
/// Optional rollout recorder for persisting the conversation transcript so
/// sessions can be replayed or inspected later.
rollout: Mutex<Option<RolloutRecorder>>,
state: Mutex<State>,
state: Mutex<SessionState>,
current_task: Mutex<Option<AgentTask>>,
current_turn: Mutex<Option<Arc<TurnState>>>,
codex_linux_sandbox_exe: Option<PathBuf>,
user_shell: shell::Shell,
show_raw_agent_reasoning: bool,
next_internal_sub_id: AtomicU64,
}
/// The context needed for a single turn of the conversation.
#[derive(Debug)]
pub(crate) struct TurnContext {
pub(crate) client: ModelClient,
/// The session's current working directory. All relative paths provided by
/// the model as well as sandbox policies are resolved against this path
/// instead of `std::env::current_dir()`.
pub(crate) cwd: PathBuf,
pub(crate) base_instructions: Option<String>,
pub(crate) user_instructions: Option<String>,
pub(crate) approval_policy: AskForApproval,
pub(crate) sandbox_policy: SandboxPolicy,
pub(crate) shell_environment_policy: ShellEnvironmentPolicy,
pub(crate) tools_config: ToolsConfig,
pub(crate) is_review_mode: bool,
pub(crate) final_output_json_schema: Option<Value>,
}
impl TurnContext {
fn resolve_path(&self, path: Option<String>) -> PathBuf {
path.as_ref()
.map(PathBuf::from)
.map_or_else(|| self.cwd.clone(), |p| self.cwd.join(p))
}
}
/// Configure the model session.
struct ConfigureSession {
/// Provider identifier ("openai", "openrouter", ...).
@@ -411,11 +394,6 @@ impl Session {
anyhow::anyhow!("failed to initialize rollout recorder: {e:#}")
})?;
let rollout_path = rollout_recorder.rollout_path.clone();
// Create the mutable state for the Session.
let state = State {
history: ConversationHistory::new(),
..Default::default()
};
// Handle MCP manager result and record any startup failures.
let (mcp_connection_manager, failed_clients) = match mcp_res {
@@ -473,6 +451,7 @@ impl Session {
is_review_mode: false,
final_output_json_schema: None,
};
let state = SessionState::default();
let sess = Arc::new(Session {
conversation_id,
tx_event: tx_event.clone(),
@@ -481,6 +460,8 @@ impl Session {
unified_exec_manager: UnifiedExecSessionManager::default(),
notifier: notify,
state: Mutex::new(state),
current_task: Mutex::new(None),
current_turn: Mutex::new(None),
rollout: Mutex::new(Some(rollout_recorder)),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
user_shell: default_shell,
@@ -514,20 +495,28 @@ impl Session {
Ok((sess, turn_context))
}
pub async fn set_task(&self, task: AgentTask) {
let mut state = self.state.lock().await;
if let Some(current_task) = state.current_task.take() {
current_task.abort(TurnAbortReason::Replaced);
pub async fn set_task(&self, task: AgentTask, turn_state: Option<Arc<TurnState>>) {
let mut current_task = self.current_task.lock().await;
if let Some(existing) = current_task.take() {
existing.abort(TurnAbortReason::Replaced);
}
state.current_task = Some(task);
*current_task = Some(task);
drop(current_task);
let mut current_turn = self.current_turn.lock().await;
*current_turn = turn_state;
}
pub async fn remove_task(&self, sub_id: &str) {
let mut state = self.state.lock().await;
if let Some(task) = &state.current_task
&& task.sub_id == sub_id
{
state.current_task.take();
let mut current_task = self.current_task.lock().await;
let should_clear = matches!(current_task.as_ref(), Some(task) if task.sub_id == sub_id);
if should_clear {
current_task.take();
}
drop(current_task);
if should_clear {
let mut current_turn = self.current_turn.lock().await;
current_turn.take();
}
}
@@ -991,25 +980,33 @@ impl Session {
[history, extra].concat()
}
/// Returns the input if there was no task running to inject into
pub async fn inject_input(&self, input: Vec<InputItem>) -> Result<(), Vec<InputItem>> {
pub async fn queue_turn_readiness(&self, flag: Arc<ReadinessFlag>) {
let mut state = self.state.lock().await;
if state.current_task.is_some() {
state.pending_input.push(input.into());
Ok(())
} else {
Err(input)
}
state.push_readiness(flag);
}
pub async fn get_pending_input(&self) -> Vec<ResponseInputItem> {
pub async fn next_turn_readiness(&self) -> Option<Arc<ReadinessFlag>> {
let mut state = self.state.lock().await;
if state.pending_input.is_empty() {
Vec::with_capacity(0)
state.next_readiness()
}
/// Returns the input if there was no task running to inject into
pub async fn inject_input(
&self,
input: Vec<InputItem>,
readiness: Option<Arc<ReadinessFlag>>,
) -> Result<(), (Vec<InputItem>, Option<Arc<ReadinessFlag>>)> {
let current_turn = {
let guard = self.current_turn.lock().await;
guard.clone()
};
if let Some(turn_state) = current_turn {
turn_state
.enqueue_user_input(input.clone(), readiness.clone())
.await;
Ok(())
} else {
let mut ret = Vec::new();
std::mem::swap(&mut ret, &mut state.pending_input);
ret
Err((input, readiness))
}
}
@@ -1026,10 +1023,23 @@ impl Session {
pub async fn interrupt_task(&self) {
info!("interrupt received: abort current task, if any");
let mut state = self.state.lock().await;
state.pending_approvals.clear();
state.pending_input.clear();
if let Some(task) = state.current_task.take() {
{
let mut state = self.state.lock().await;
state.pending_approvals.clear();
state.clear_readiness();
}
let task = {
let mut current_task = self.current_task.lock().await;
current_task.take()
};
{
let mut current_turn = self.current_turn.lock().await;
current_turn.take();
}
if let Some(task) = task {
task.abort(TurnAbortReason::Interrupted);
}
}
@@ -1037,10 +1047,15 @@ impl Session {
fn interrupt_task_sync(&self) {
if let Ok(mut state) = self.state.try_lock() {
state.pending_approvals.clear();
state.pending_input.clear();
if let Some(task) = state.current_task.take() {
task.abort(TurnAbortReason::Interrupted);
}
state.clear_readiness();
}
if let Ok(mut current_turn) = self.current_turn.try_lock() {
current_turn.take();
}
if let Ok(mut current_task) = self.current_task.try_lock()
&& let Some(task) = current_task.take()
{
task.abort(TurnAbortReason::Interrupted);
}
}
@@ -1086,17 +1101,12 @@ pub(crate) struct AgentTask {
}
impl AgentTask {
fn spawn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
) -> Self {
fn spawn(sess: Arc<Session>, turn_state: Arc<TurnState>) -> Self {
let sub_id = turn_state.sub_id().to_string();
let handle = {
let sess = sess.clone();
let sub_id = sub_id.clone();
let tc = Arc::clone(&turn_context);
tokio::spawn(async move { run_task(sess, tc, sub_id, input).await }).abort_handle()
let turn_state = Arc::clone(&turn_state);
tokio::spawn(async move { run_task(sess, turn_state).await }).abort_handle()
};
Self {
sess,
@@ -1106,17 +1116,12 @@ impl AgentTask {
}
}
fn review(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
) -> Self {
fn review(sess: Arc<Session>, turn_state: Arc<TurnState>) -> Self {
let sub_id = turn_state.sub_id().to_string();
let handle = {
let sess = sess.clone();
let sub_id = sub_id.clone();
let tc = Arc::clone(&turn_context);
tokio::spawn(async move { run_task(sess, tc, sub_id, input).await }).abort_handle()
let turn_state = Arc::clone(&turn_state);
tokio::spawn(async move { run_task(sess, turn_state).await }).abort_handle()
};
Self {
sess,
@@ -1171,12 +1176,16 @@ async fn submission_loop(
turn_context: TurnContext,
config: Arc<Config>,
rx_sub: Receiver<Submission>,
mut turn_readiness_rx: UnboundedReceiver<Arc<ReadinessFlag>>,
) {
// Wrap once to avoid cloning TurnContext for each task.
let mut turn_context = Arc::new(turn_context);
// To break out of this loop, send Op::Shutdown.
while let Ok(sub) = rx_sub.recv().await {
debug!(?sub, "Submission");
while let Ok(flag) = turn_readiness_rx.try_recv() {
sess.queue_turn_readiness(flag).await;
}
match sub.op {
Op::Interrupt => {
sess.interrupt_task().await;
@@ -1270,12 +1279,22 @@ async fn submission_loop(
}
}
Op::UserInput { items } => {
// attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
// no current task, spawn a new one
let task =
AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items);
sess.set_task(task).await;
let readiness = match sess.next_turn_readiness().await {
Some(flag) => Some(flag),
None => {
warn!("missing readiness flag for user input");
None
}
};
if let Err((items, readiness)) = sess.inject_input(items, readiness.clone()).await {
let turn_state = Arc::new(TurnState::new(
sub.id.clone(),
Arc::clone(&turn_context),
items,
readiness.clone(),
));
let task = AgentTask::spawn(sess.clone(), Arc::clone(&turn_state));
sess.set_task(task, Some(turn_state)).await;
}
}
Op::UserTurn {
@@ -1288,8 +1307,14 @@ async fn submission_loop(
summary,
final_output_json_schema,
} => {
// attempt to inject input into current task
if let Err(items) = sess.inject_input(items).await {
let readiness = match sess.next_turn_readiness().await {
Some(flag) => Some(flag),
None => {
warn!("missing readiness flag for user input");
None
}
};
if let Err((items, readiness)) = sess.inject_input(items, readiness.clone()).await {
// Derive a fresh TurnContext for this turn using the provided overrides.
let provider = turn_context.client.get_provider();
let auth_manager = turn_context.client.get_auth_manager();
@@ -1352,9 +1377,14 @@ async fn submission_loop(
turn_context = Arc::new(fresh_turn_context);
// no current task, spawn a new one with the perturn context
let task =
AgentTask::spawn(sess.clone(), Arc::clone(&turn_context), sub.id, items);
sess.set_task(task).await;
let turn_state = Arc::new(TurnState::new(
sub.id.clone(),
Arc::clone(&turn_context),
items,
readiness.clone(),
));
let task = AgentTask::spawn(sess.clone(), Arc::clone(&turn_state));
sess.set_task(task, Some(turn_state)).await;
}
}
Op::ExecApproval { id, decision } => match decision {
@@ -1446,10 +1476,13 @@ async fn submission_loop(
}
Op::Compact => {
// Attempt to inject input into current task
if let Err(items) = sess
.inject_input(vec![InputItem::Text {
text: compact::SUMMARIZATION_PROMPT.to_string(),
}])
if let Err((items, _)) = sess
.inject_input(
vec![InputItem::Text {
text: compact::SUMMARIZATION_PROMPT.to_string(),
}],
None,
)
.await
{
compact::spawn_compact_task(
@@ -1600,11 +1633,12 @@ async fn spawn_review_thread(
text: format!("{base_instructions}\n\n---\n\nNow, here's your task: {review_prompt}"),
}];
let tc = Arc::new(review_turn_context);
let turn_state = Arc::new(TurnState::new(sub_id.clone(), Arc::clone(&tc), input, None));
// Clone sub_id for the upcoming announcement before moving it into the task.
let sub_id_for_event = sub_id.clone();
let task = AgentTask::review(sess.clone(), tc.clone(), sub_id, input);
sess.set_task(task).await;
let task = AgentTask::review(sess.clone(), Arc::clone(&turn_state));
sess.set_task(task, Some(turn_state)).await;
// Announce entering review mode so UIs can switch modes.
sess.send_event(Event {
@@ -1631,15 +1665,12 @@ async fn spawn_review_thread(
/// Review mode: when `turn_context.is_review_mode` is true, the turn runs in an
/// isolated in-memory thread without the parent session's prior history or
/// user_instructions. Emits ExitedReviewMode upon final review message.
async fn run_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
sub_id: String,
input: Vec<InputItem>,
) {
if input.is_empty() {
async fn run_task(sess: Arc<Session>, turn_state: Arc<TurnState>) {
let turn_context = turn_state.turn_context();
let sub_id = turn_state.sub_id().to_string();
let Some(initial_input_for_turn) = turn_state.initial_input() else {
return;
}
};
let event = Event {
id: sub_id.clone(),
msg: EventMsg::TaskStarted(TaskStartedEvent {
@@ -1648,16 +1679,16 @@ async fn run_task(
};
sess.send_event(event).await;
let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input);
// For review threads, keep an isolated in-memory history so the
// model sees a fresh conversation without the parent session's history.
// For normal turns, continue recording to the session history as before.
let is_review_mode = turn_context.is_review_mode;
let mut review_thread_history: Vec<ResponseItem> = Vec::new();
let mut current_turn_readiness = turn_state.initial_readiness();
if is_review_mode {
// Seed review threads with environment context so the model knows the working directory.
review_thread_history.extend(sess.build_initial_context(turn_context.as_ref()));
review_thread_history.push(initial_input_for_turn.into());
review_thread_history.push(initial_input_for_turn.clone().into());
} else {
sess.record_input_and_rollout_usermsg(&initial_input_for_turn)
.await;
@@ -1670,15 +1701,14 @@ async fn run_task(
let mut auto_compact_recently_attempted = false;
loop {
// Note that pending_input would be something like a message the user
// submitted through the UI while the model was running. Though the UI
// may support this, the model might not.
let pending_input = sess
.get_pending_input()
.await
.into_iter()
.map(ResponseItem::from)
.collect::<Vec<ResponseItem>>();
// Note that pending_input represents follow-up messages submitted while the
// model is running. These are queued on the TurnState mailbox.
let (pending_input, maybe_new_readiness) = turn_state
.drain_mailbox(current_turn_readiness.clone())
.await;
if let Some(readiness) = maybe_new_readiness {
current_turn_readiness = Some(readiness);
}
// Construct the input that we will send to the model.
//
@@ -1692,7 +1722,7 @@ async fn run_task(
// represents an append-only log without duplicates.
let turn_input: Vec<ResponseItem> = if is_review_mode {
if !pending_input.is_empty() {
review_thread_history.extend(pending_input);
review_thread_history.extend(pending_input.clone());
}
review_thread_history.clone()
} else {
@@ -1719,6 +1749,7 @@ async fn run_task(
&mut turn_diff_tracker,
sub_id.clone(),
turn_input,
current_turn_readiness.clone(),
)
.await
{
@@ -1949,6 +1980,7 @@ async fn run_turn(
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: String,
input: Vec<ResponseItem>,
turn_readiness: Option<Arc<ReadinessFlag>>,
) -> CodexResult<TurnRunResult> {
let tools = get_openai_tools(
&turn_context.tools_config,
@@ -1964,7 +1996,16 @@ async fn run_turn(
let mut retries = 0;
loop {
match try_run_turn(sess, turn_context, turn_diff_tracker, &sub_id, &prompt).await {
match try_run_turn(
sess,
turn_context,
turn_diff_tracker,
&sub_id,
&prompt,
turn_readiness.clone(),
)
.await
{
Ok(output) => return Ok(output),
Err(CodexErr::Interrupted) => return Err(CodexErr::Interrupted),
Err(CodexErr::EnvVar(var)) => return Err(CodexErr::EnvVar(var)),
@@ -2031,6 +2072,7 @@ async fn try_run_turn(
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: &str,
prompt: &Prompt,
turn_readiness: Option<Arc<ReadinessFlag>>,
) -> CodexResult<TurnRunResult> {
// call_ids that are part of this response.
let completed_call_ids = prompt
@@ -2132,6 +2174,7 @@ async fn try_run_turn(
turn_diff_tracker,
sub_id,
item.clone(),
turn_readiness.clone(),
)
.await?;
output.push(ProcessedResponseItem { item, response });
@@ -2222,6 +2265,7 @@ async fn handle_response_item(
turn_diff_tracker: &mut TurnDiffTracker,
sub_id: &str,
item: ResponseItem,
turn_readiness: Option<Arc<ReadinessFlag>>,
) -> CodexResult<Option<ResponseInputItem>> {
debug!(?item, "Output item");
let output = match item {
@@ -2231,6 +2275,9 @@ async fn handle_response_item(
call_id,
..
} => {
if let Some(flag) = turn_readiness.as_ref() {
flag.wait_ready().await;
}
info!("FunctionCall: {name}({arguments})");
Some(
handle_function_call(
@@ -2484,7 +2531,7 @@ async fn handle_function_call(
};
let abs = turn_context.resolve_path(Some(args.path));
let output = match sess
.inject_input(vec![InputItem::LocalImage { path: abs }])
.inject_input(vec![InputItem::LocalImage { path: abs }], None)
.await
{
Ok(()) => FunctionCallOutputPayload {
@@ -3631,10 +3678,9 @@ mod tests {
unified_exec_manager: UnifiedExecSessionManager::default(),
notifier: UserNotifier::default(),
rollout: Mutex::new(None),
state: Mutex::new(State {
history: ConversationHistory::new(),
..Default::default()
}),
state: Mutex::new(SessionState::default()),
current_task: Mutex::new(None),
current_turn: Mutex::new(None),
codex_linux_sandbox_exe: None,
user_shell: shell::Shell::Unknown,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,

View File

@@ -44,7 +44,7 @@ pub(super) async fn spawn_compact_task(
input: Vec<InputItem>,
) {
let task = AgentTask::compact(sess.clone(), turn_context, sub_id, input);
sess.set_task(task).await;
sess.set_task(task, None).await;
}
pub(super) async fn run_inline_auto_compact_task(

View File

@@ -3,6 +3,9 @@ use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
use codex_utils_readiness::ReadinessFlag;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedSender;
pub struct CodexConversation {
codex: Codex,
@@ -27,4 +30,8 @@ impl CodexConversation {
pub async fn next_event(&self) -> CodexResult<Event> {
self.codex.next_event().await
}
pub fn turn_readiness_sender(&self) -> UnboundedSender<Arc<ReadinessFlag>> {
self.codex.turn_readiness_sender()
}
}

View File

@@ -13,6 +13,7 @@ mod client;
mod client_common;
pub mod codex;
mod codex_conversation;
mod state;
pub mod token_data;
pub use codex_conversation::CodexConversation;
pub mod config;

View File

@@ -39,6 +39,7 @@ codex-git-tooling = { workspace = true }
codex-login = { workspace = true }
codex-ollama = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-readiness = { workspace = true }
color-eyre = { workspace = true }
crossterm = { workspace = true, features = [
"bracketed-paste",

View File

@@ -109,6 +109,7 @@ use codex_git_tooling::GhostCommit;
use codex_git_tooling::GitToolingError;
use codex_git_tooling::create_ghost_commit;
use codex_git_tooling::restore_ghost_commit;
use codex_utils_readiness::ReadinessFlag;
const MAX_TRACKED_GHOST_COMMITS: usize = 20;
@@ -182,6 +183,7 @@ pub(crate) struct ChatWidgetInit {
pub(crate) struct ChatWidget {
app_event_tx: AppEventSender,
codex_op_tx: UnboundedSender<Op>,
turn_readiness: UnboundedSender<Arc<ReadinessFlag>>,
bottom_pane: BottomPane,
active_exec_cell: Option<ExecCell>,
config: Config,
@@ -772,12 +774,14 @@ impl ChatWidget {
} = common;
let mut rng = rand::rng();
let placeholder = EXAMPLE_PROMPTS[rng.random_range(0..EXAMPLE_PROMPTS.len())].to_string();
let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), conversation_manager);
let agent_channels =
spawn_agent(config.clone(), app_event_tx.clone(), conversation_manager);
Self {
app_event_tx: app_event_tx.clone(),
frame_requester: frame_requester.clone(),
codex_op_tx,
codex_op_tx: agent_channels.op_tx,
turn_readiness: agent_channels.turn_readiness,
bottom_pane: BottomPane::new(BottomPaneParams {
frame_requester,
app_event_tx,
@@ -832,13 +836,14 @@ impl ChatWidget {
let mut rng = rand::rng();
let placeholder = EXAMPLE_PROMPTS[rng.random_range(0..EXAMPLE_PROMPTS.len())].to_string();
let codex_op_tx =
let agent_channels =
spawn_agent_from_existing(conversation, session_configured, app_event_tx.clone());
Self {
app_event_tx: app_event_tx.clone(),
frame_requester: frame_requester.clone(),
codex_op_tx,
codex_op_tx: agent_channels.op_tx,
turn_readiness: agent_channels.turn_readiness,
bottom_pane: BottomPane::new(BottomPaneParams {
frame_requester,
app_event_tx,
@@ -1121,6 +1126,9 @@ impl ChatWidget {
return;
}
let readiness_flag = Arc::new(ReadinessFlag::new());
agent::send_turn_readiness(&self.turn_readiness, Arc::clone(&readiness_flag));
self.capture_ghost_snapshot();
let mut items: Vec<InputItem> = Vec::new();

View File

@@ -5,20 +5,58 @@ use codex_core::ConversationManager;
use codex_core::NewConversation;
use codex_core::config::Config;
use codex_core::protocol::Op;
use codex_utils_readiness::Readiness;
use codex_utils_readiness::ReadinessFlag;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::unbounded_channel;
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;
pub(crate) struct AgentChannels {
pub(crate) op_tx: UnboundedSender<Op>,
pub(crate) turn_readiness: UnboundedSender<Arc<ReadinessFlag>>,
}
fn mark_ready(flag: Arc<ReadinessFlag>) {
tokio::spawn(async move {
if let Ok(token) = flag.subscribe().await {
let _ = flag.mark_ready(token).await;
}
});
}
fn spawn_readiness_forwarder(
mut rx: UnboundedReceiver<Arc<ReadinessFlag>>,
sender: UnboundedSender<Arc<ReadinessFlag>>,
) {
tokio::spawn(async move {
while let Some(flag) = rx.recv().await {
if sender.send(Arc::clone(&flag)).is_err() {
mark_ready(flag);
}
}
});
}
pub(crate) fn send_turn_readiness(
sender: &UnboundedSender<Arc<ReadinessFlag>>,
flag: Arc<ReadinessFlag>,
) {
if sender.send(Arc::clone(&flag)).is_err() {
mark_ready(flag);
}
}
/// Spawn the agent bootstrapper and op forwarding loop, returning the
/// `UnboundedSender<Op>` used by the UI to submit operations.
/// channels used by the UI to submit operations and register turn readiness.
pub(crate) fn spawn_agent(
config: Config,
app_event_tx: AppEventSender,
server: Arc<ConversationManager>,
) -> UnboundedSender<Op> {
) -> AgentChannels {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
let (turn_readiness_tx, turn_readiness_rx) = unbounded_channel::<Arc<ReadinessFlag>>();
let app_event_tx_clone = app_event_tx;
tokio::spawn(async move {
@@ -35,6 +73,9 @@ pub(crate) fn spawn_agent(
}
};
let readiness_sender = conversation.turn_readiness_sender();
spawn_readiness_forwarder(turn_readiness_rx, readiness_sender);
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_core::protocol::Event {
// The `id` does not matter for rendering, so we can use a fake value.
@@ -58,7 +99,10 @@ pub(crate) fn spawn_agent(
}
});
codex_op_tx
AgentChannels {
op_tx: codex_op_tx,
turn_readiness: turn_readiness_tx,
}
}
/// Spawn agent loops for an existing conversation (e.g., a forked conversation).
@@ -68,8 +112,10 @@ pub(crate) fn spawn_agent_from_existing(
conversation: std::sync::Arc<CodexConversation>,
session_configured: codex_core::protocol::SessionConfiguredEvent,
app_event_tx: AppEventSender,
) -> UnboundedSender<Op> {
) -> AgentChannels {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
let (turn_readiness_tx, turn_readiness_rx) = unbounded_channel::<Arc<ReadinessFlag>>();
spawn_readiness_forwarder(turn_readiness_rx, conversation.turn_readiness_sender());
let app_event_tx_clone = app_event_tx;
tokio::spawn(async move {
@@ -95,5 +141,8 @@ pub(crate) fn spawn_agent_from_existing(
}
});
codex_op_tx
AgentChannels {
op_tx: codex_op_tx,
turn_readiness: turn_readiness_tx,
}
}

View File

@@ -34,6 +34,7 @@ use codex_core::protocol::StreamErrorEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TaskStartedEvent;
use codex_protocol::mcp_protocol::ConversationId;
use codex_utils_readiness::ReadinessFlag;
use crossterm::event::KeyCode;
use crossterm::event::KeyEvent;
use crossterm::event::KeyModifiers;
@@ -43,6 +44,7 @@ use std::fs::File;
use std::io::BufRead;
use std::io::BufReader;
use std::path::PathBuf;
use std::sync::Arc;
use tempfile::NamedTempFile;
use tokio::sync::mpsc::unbounded_channel;
@@ -310,9 +312,11 @@ fn make_chatwidget_manual() -> (
disable_paste_burst: false,
});
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test"));
let (turn_readiness, _rx) = unbounded_channel::<Arc<ReadinessFlag>>();
let widget = ChatWidget {
app_event_tx,
codex_op_tx: op_tx,
turn_readiness,
bottom_pane: bottom,
active_exec_cell: None,
config: cfg.clone(),