feat: codex exec auto-subscribe to new threads (#9821)

This commit is contained in:
jif-oai
2026-01-28 13:03:20 +00:00
committed by GitHub
parent 71b8d937ed
commit dabafe204a
4 changed files with 183 additions and 43 deletions

View File

@@ -244,3 +244,37 @@ pub enum Color {
#[default]
Auto,
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn resume_parses_prompt_after_global_flags() {
const PROMPT: &str = "echo resume-with-global-flags-after-subcommand";
let cli = Cli::parse_from([
"codex-exec",
"resume",
"--last",
"--json",
"--model",
"gpt-5.2-codex",
"--dangerously-bypass-approvals-and-sandbox",
"--skip-git-repo-check",
PROMPT,
]);
let Some(Command::Resume(args)) = cli.command else {
panic!("expected resume command");
};
let effective_prompt = args.prompt.clone().or_else(|| {
if args.last {
args.session_id.clone()
} else {
None
}
});
assert_eq!(effective_prompt.as_deref(), Some(PROMPT));
}
}

View File

@@ -849,16 +849,17 @@ impl EventProcessor for EventProcessorWithJsonOutput {
let protocol::Event { msg, .. } = event;
if let protocol::EventMsg::TurnComplete(protocol::TurnCompleteEvent {
last_agent_message,
}) = msg
{
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_agent_message.as_deref(), output_file);
match msg {
protocol::EventMsg::TurnComplete(protocol::TurnCompleteEvent {
last_agent_message,
}) => {
if let Some(output_file) = self.last_message_path.as_deref() {
handle_last_message(last_agent_message.as_deref(), output_file);
}
CodexStatus::InitiateShutdown
}
CodexStatus::InitiateShutdown
} else {
CodexStatus::Running
protocol::EventMsg::ShutdownComplete => CodexStatus::Shutdown,
_ => CodexStatus::Running,
}
}
}

View File

@@ -46,13 +46,17 @@ use codex_utils_absolute_path::AbsolutePathBuf;
use event_processor_with_human_output::EventProcessorWithHumanOutput;
use event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
use serde_json::Value;
use std::collections::HashSet;
use std::io::IsTerminal;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use supports_color::Stream;
use tokio::sync::Mutex;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::prelude::*;
@@ -72,6 +76,13 @@ enum InitialOperation {
},
}
#[derive(Clone)]
struct ThreadEventEnvelope {
thread_id: codex_protocol::ThreadId,
thread: Arc<codex_core::CodexThread>,
event: Event,
}
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
if let Err(err) = set_default_originator("codex_exec".to_string()) {
tracing::warn!(?err, "Failed to set codex exec originator override {err:?}");
@@ -326,11 +337,11 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
true,
config.cli_auth_credentials_store_mode,
);
let thread_manager = ThreadManager::new(
let thread_manager = Arc::new(ThreadManager::new(
config.codex_home.clone(),
auth_manager.clone(),
SessionSource::Exec,
);
));
let default_model = thread_manager
.get_models_manager()
.get_default_model(&config.model, &config, RefreshStrategy::OnlineIfUncached)
@@ -338,7 +349,7 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let NewThread {
thread_id: _,
thread_id: primary_thread_id,
thread,
session_configured,
} = if let Some(ExecCommand::Resume(args)) = command.as_ref() {
@@ -420,40 +431,47 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
info!("Codex initialized with event: {session_configured:?}");
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<ThreadEventEnvelope>();
let attached_threads = Arc::new(Mutex::new(HashSet::from([primary_thread_id])));
spawn_thread_listener(primary_thread_id, thread.clone(), tx.clone());
{
let thread = thread.clone();
tokio::spawn(async move {
if tokio::signal::ctrl_c().await.is_ok() {
tracing::debug!("Keyboard interrupt");
// Immediately notify Codex to abort any in-flight task.
thread.submit(Op::Interrupt).await.ok();
}
});
}
{
let thread_manager = Arc::clone(&thread_manager);
let attached_threads = Arc::clone(&attached_threads);
let tx = tx.clone();
let mut thread_created_rx = thread_manager.subscribe_thread_created();
tokio::spawn(async move {
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::debug!("Keyboard interrupt");
// Immediately notify Codex to abort any inflight task.
thread.submit(Op::Interrupt).await.ok();
// Exit the inner loop and return to the main input prompt. The codex
// will emit a `TurnInterrupted` (Error) event which is drained later.
break;
}
res = thread.next_event() => match res {
Ok(event) => {
debug!("Received event: {event:?}");
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
if let Err(e) = tx.send(event) {
error!("Error sending event: {e:?}");
break;
match thread_created_rx.recv().await {
Ok(thread_id) => {
if attached_threads.lock().await.contains(&thread_id) {
continue;
}
match thread_manager.get_thread(thread_id).await {
Ok(thread) => {
attached_threads.lock().await.insert(thread_id);
spawn_thread_listener(thread_id, thread, tx.clone());
}
if is_shutdown_complete {
info!("Received shutdown event, exiting event loop.");
break;
Err(err) => {
warn!("failed to attach listener for thread {thread_id}: {err}")
}
},
Err(e) => {
error!("Error receiving event: {e:?}");
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
warn!("thread_created receiver lagged; skipping resync");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
@@ -492,7 +510,12 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
// Track whether a fatal error was reported by the server so we can
// exit with a non-zero status for automation-friendly signaling.
let mut error_seen = false;
while let Some(event) = rx.recv().await {
while let Some(envelope) = rx.recv().await {
let ThreadEventEnvelope {
thread_id,
thread,
event,
} = envelope;
if let EventMsg::ElicitationRequest(ev) = &event.msg {
// Automatically cancel elicitation requests in exec mode.
thread
@@ -506,15 +529,20 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
if matches!(event.msg, EventMsg::Error(_)) {
error_seen = true;
}
let shutdown: CodexStatus = event_processor.process_event(event);
if thread_id != primary_thread_id && matches!(&event.msg, EventMsg::TurnComplete(_)) {
continue;
}
let shutdown = event_processor.process_event(event);
if thread_id != primary_thread_id && matches!(shutdown, CodexStatus::InitiateShutdown) {
continue;
}
match shutdown {
CodexStatus::Running => continue,
CodexStatus::InitiateShutdown => {
thread.submit(Op::Shutdown).await?;
}
CodexStatus::Shutdown => {
break;
}
CodexStatus::Shutdown if thread_id == primary_thread_id => break,
CodexStatus::Shutdown => continue,
}
}
event_processor.print_final_output();
@@ -525,6 +553,42 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
Ok(())
}
fn spawn_thread_listener(
thread_id: codex_protocol::ThreadId,
thread: Arc<codex_core::CodexThread>,
tx: tokio::sync::mpsc::UnboundedSender<ThreadEventEnvelope>,
) {
tokio::spawn(async move {
loop {
match thread.next_event().await {
Ok(event) => {
debug!("Received event: {event:?}");
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
if let Err(err) = tx.send(ThreadEventEnvelope {
thread_id,
thread: Arc::clone(&thread),
event,
}) {
error!("Error sending event: {err:?}");
break;
}
if is_shutdown_complete {
info!(
"Received shutdown event for thread {thread_id}, exiting event loop."
);
break;
}
}
Err(err) => {
error!("Error receiving event: {err:?}");
break;
}
}
}
});
}
async fn resolve_resume_path(
config: &Config,
args: &crate::cli::ResumeArgs,

View File

@@ -38,3 +38,44 @@ fn main() -> anyhow::Result<()> {
Ok(())
})
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn top_cli_parses_resume_prompt_after_config_flag() {
const PROMPT: &str = "echo resume-with-global-flags-after-subcommand";
let cli = TopCli::parse_from([
"codex-exec",
"resume",
"--last",
"--json",
"--model",
"gpt-5.2-codex",
"--config",
"reasoning_level=xhigh",
"--dangerously-bypass-approvals-and-sandbox",
"--skip-git-repo-check",
PROMPT,
]);
let Some(codex_exec::Command::Resume(args)) = cli.inner.command else {
panic!("expected resume command");
};
let effective_prompt = args.prompt.clone().or_else(|| {
if args.last {
args.session_id.clone()
} else {
None
}
});
assert_eq!(effective_prompt.as_deref(), Some(PROMPT));
assert_eq!(cli.config_overrides.raw_overrides.len(), 1);
assert_eq!(
cli.config_overrides.raw_overrides[0],
"reasoning_level=xhigh"
);
}
}