mirror of
https://github.com/openai/codex.git
synced 2026-04-26 15:45:02 +00:00
376 lines
13 KiB
Rust
376 lines
13 KiB
Rust
mod cli;
|
||
mod event_processor;
|
||
mod event_processor_with_human_output;
|
||
mod event_processor_with_json_output;
|
||
|
||
use std::io::IsTerminal;
|
||
use std::io::Read;
|
||
use std::path::PathBuf;
|
||
|
||
pub use cli::Cli;
|
||
use codex_core::AuthManager;
|
||
use codex_core::BUILT_IN_OSS_MODEL_PROVIDER_ID;
|
||
use codex_core::ConversationManager;
|
||
use codex_core::NewConversation;
|
||
use codex_core::config::Config;
|
||
use codex_core::config::ConfigOverrides;
|
||
use codex_core::git_info::get_git_repo_root;
|
||
use codex_core::protocol::AskForApproval;
|
||
use codex_core::protocol::Event;
|
||
use codex_core::protocol::EventMsg;
|
||
use codex_core::protocol::InputItem;
|
||
use codex_core::protocol::Op;
|
||
use codex_core::protocol::TaskCompleteEvent;
|
||
use codex_ollama::DEFAULT_OSS_MODEL;
|
||
use codex_protocol::config_types::SandboxMode;
|
||
use codex_telemetry as telemetry;
|
||
use event_processor_with_human_output::EventProcessorWithHumanOutput;
|
||
use event_processor_with_json_output::EventProcessorWithJsonOutput;
|
||
use tracing::debug;
|
||
use tracing::error;
|
||
use tracing::info;
|
||
use tracing_subscriber::EnvFilter;
|
||
use tracing_subscriber::prelude::*;
|
||
|
||
use crate::cli::Command as ExecCommand;
|
||
use crate::event_processor::CodexStatus;
|
||
use crate::event_processor::EventProcessor;
|
||
use codex_core::find_conversation_path_by_id_str;
|
||
|
||
pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()> {
|
||
let Cli {
|
||
command,
|
||
images,
|
||
model: model_cli_arg,
|
||
oss,
|
||
config_profile,
|
||
full_auto,
|
||
dangerously_bypass_approvals_and_sandbox,
|
||
cwd,
|
||
skip_git_repo_check,
|
||
color,
|
||
last_message_file,
|
||
json: json_mode,
|
||
sandbox_mode: sandbox_mode_cli_arg,
|
||
prompt,
|
||
config_overrides,
|
||
} = cli;
|
||
|
||
// Determine the prompt source (parent or subcommand) and read from stdin if needed.
|
||
let prompt_arg = match &command {
|
||
// Allow prompt before the subcommand by falling back to the parent-level prompt
|
||
// when the Resume subcommand did not provide its own prompt.
|
||
Some(ExecCommand::Resume(args)) => args.prompt.clone().or(prompt),
|
||
None => prompt,
|
||
};
|
||
|
||
let prompt = match prompt_arg {
|
||
Some(p) if p != "-" => p,
|
||
// Either `-` was passed or no positional arg.
|
||
maybe_dash => {
|
||
// When no arg (None) **and** stdin is a TTY, bail out early – unless the
|
||
// user explicitly forced reading via `-`.
|
||
let force_stdin = matches!(maybe_dash.as_deref(), Some("-"));
|
||
|
||
if std::io::stdin().is_terminal() && !force_stdin {
|
||
eprintln!(
|
||
"No prompt provided. Either specify one as an argument or pipe the prompt into stdin."
|
||
);
|
||
std::process::exit(1);
|
||
}
|
||
|
||
// Ensure the user knows we are waiting on stdin, as they may
|
||
// have gotten into this state by mistake. If so, and they are not
|
||
// writing to stdin, Codex will hang indefinitely, so this should
|
||
// help them debug in that case.
|
||
if !force_stdin {
|
||
eprintln!("Reading prompt from stdin...");
|
||
}
|
||
let mut buffer = String::new();
|
||
if let Err(e) = std::io::stdin().read_to_string(&mut buffer) {
|
||
eprintln!("Failed to read prompt from stdin: {e}");
|
||
std::process::exit(1);
|
||
} else if buffer.trim().is_empty() {
|
||
eprintln!("No prompt provided via stdin.");
|
||
std::process::exit(1);
|
||
}
|
||
buffer
|
||
}
|
||
};
|
||
|
||
let (stdout_with_ansi, stderr_with_ansi) = match color {
|
||
cli::Color::Always => (true, true),
|
||
cli::Color::Never => (false, false),
|
||
cli::Color::Auto => (
|
||
std::io::stdout().is_terminal(),
|
||
std::io::stderr().is_terminal(),
|
||
),
|
||
};
|
||
|
||
// Build fmt layer (existing logging) to compose with OTEL layer.
|
||
let default_level = "error";
|
||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||
.with_ansi(stderr_with_ansi)
|
||
.with_writer(std::io::stderr);
|
||
|
||
let sandbox_mode = if full_auto {
|
||
Some(SandboxMode::WorkspaceWrite)
|
||
} else if dangerously_bypass_approvals_and_sandbox {
|
||
Some(SandboxMode::DangerFullAccess)
|
||
} else {
|
||
sandbox_mode_cli_arg.map(Into::<SandboxMode>::into)
|
||
};
|
||
|
||
// When using `--oss`, let the bootstrapper pick the model (defaulting to
|
||
// gpt-oss:20b) and ensure it is present locally. Also, force the built‑in
|
||
// `oss` model provider.
|
||
let model = if let Some(model) = model_cli_arg {
|
||
Some(model)
|
||
} else if oss {
|
||
Some(DEFAULT_OSS_MODEL.to_owned())
|
||
} else {
|
||
None // No model specified, will use the default.
|
||
};
|
||
|
||
let model_provider = if oss {
|
||
Some(BUILT_IN_OSS_MODEL_PROVIDER_ID.to_string())
|
||
} else {
|
||
None // No specific model provider override.
|
||
};
|
||
|
||
// Load configuration and determine approval policy
|
||
let overrides = ConfigOverrides {
|
||
model,
|
||
review_model: None,
|
||
config_profile,
|
||
// This CLI is intended to be headless and has no affordances for asking
|
||
// the user for approval.
|
||
approval_policy: Some(AskForApproval::Never),
|
||
sandbox_mode,
|
||
cwd: cwd.map(|p| p.canonicalize().unwrap_or(p)),
|
||
model_provider,
|
||
codex_linux_sandbox_exe,
|
||
base_instructions: None,
|
||
include_plan_tool: None,
|
||
include_apply_patch_tool: None,
|
||
include_view_image_tool: None,
|
||
show_raw_agent_reasoning: oss.then_some(true),
|
||
tools_web_search_request: None,
|
||
};
|
||
// Parse `-c` overrides.
|
||
let cli_kv_overrides = match config_overrides.parse_overrides() {
|
||
Ok(v) => v,
|
||
Err(e) => {
|
||
eprintln!("Error parsing -c overrides: {e}");
|
||
std::process::exit(1);
|
||
}
|
||
};
|
||
|
||
let config = Config::load_with_cli_overrides(cli_kv_overrides, overrides)?;
|
||
|
||
// Build OTEL layer and compose into subscriber.
|
||
let telemetry = codex_core::telemetry_init::build_otel_layer_from_config(
|
||
&config,
|
||
"codex",
|
||
env!("CARGO_PKG_VERSION"),
|
||
);
|
||
let _telemetry_guard = if let Some((guard, tracer)) = telemetry {
|
||
let otel_layer = tracing_opentelemetry::OpenTelemetryLayer::new(tracer);
|
||
// Build env_filter separately and attach via with_filter.
|
||
let env_filter = EnvFilter::try_from_default_env()
|
||
.or_else(|_| EnvFilter::try_new(default_level))
|
||
.unwrap_or_else(|_| EnvFilter::new(default_level));
|
||
let _ = tracing_subscriber::registry()
|
||
.with(fmt_layer.with_filter(env_filter))
|
||
.with(otel_layer)
|
||
.try_init();
|
||
Some(guard)
|
||
} else {
|
||
let env_filter = EnvFilter::try_from_default_env()
|
||
.or_else(|_| EnvFilter::try_new(default_level))
|
||
.unwrap_or_else(|_| EnvFilter::new(default_level));
|
||
let _ = tracing_subscriber::registry()
|
||
.with(fmt_layer.with_filter(env_filter))
|
||
.try_init();
|
||
None
|
||
};
|
||
|
||
let mut event_processor: Box<dyn EventProcessor> = if json_mode {
|
||
Box::new(EventProcessorWithJsonOutput::new(last_message_file.clone()))
|
||
} else {
|
||
Box::new(EventProcessorWithHumanOutput::create_with_ansi(
|
||
stdout_with_ansi,
|
||
&config,
|
||
last_message_file.clone(),
|
||
))
|
||
};
|
||
|
||
if oss {
|
||
codex_ollama::ensure_oss_ready(&config)
|
||
.await
|
||
.map_err(|e| anyhow::anyhow!("OSS setup failed: {e}"))?;
|
||
}
|
||
|
||
// Print the effective configuration and prompt so users can see what Codex
|
||
// is using.
|
||
event_processor.print_config_summary(&config, &prompt);
|
||
|
||
if !skip_git_repo_check && get_git_repo_root(&config.cwd.to_path_buf()).is_none() {
|
||
eprintln!("Not inside a trusted directory and --skip-git-repo-check was not specified.");
|
||
std::process::exit(1);
|
||
}
|
||
|
||
let conversation_manager =
|
||
ConversationManager::new(AuthManager::shared(config.codex_home.clone()));
|
||
|
||
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
|
||
let NewConversation {
|
||
conversation_id: _,
|
||
conversation,
|
||
session_configured,
|
||
} = if let Some(ExecCommand::Resume(args)) = command {
|
||
let resume_path = resolve_resume_path(&config, &args).await?;
|
||
|
||
if let Some(path) = resume_path {
|
||
conversation_manager
|
||
.resume_conversation_from_rollout(
|
||
config.clone(),
|
||
path,
|
||
AuthManager::shared(config.codex_home.clone()),
|
||
)
|
||
.await?
|
||
} else {
|
||
conversation_manager.new_conversation(config).await?
|
||
}
|
||
} else {
|
||
conversation_manager.new_conversation(config).await?
|
||
};
|
||
info!("Codex initialized with event: {session_configured:?}");
|
||
|
||
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<Event>();
|
||
{
|
||
let conversation = conversation.clone();
|
||
tokio::spawn(async move {
|
||
loop {
|
||
tokio::select! {
|
||
_ = tokio::signal::ctrl_c() => {
|
||
tracing::debug!("Keyboard interrupt");
|
||
// Immediately notify Codex to abort any in‑flight task.
|
||
conversation.submit(Op::Interrupt).await.ok();
|
||
|
||
// Exit the inner loop and return to the main input prompt. The codex
|
||
// will emit a `TurnInterrupted` (Error) event which is drained later.
|
||
break;
|
||
}
|
||
res = conversation.next_event() => match res {
|
||
Ok(event) => {
|
||
debug!("Received event: {event:?}");
|
||
|
||
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
|
||
if let Err(e) = tx.send(event) {
|
||
error!("Error sending event: {e:?}");
|
||
break;
|
||
}
|
||
if is_shutdown_complete {
|
||
info!("Received shutdown event, exiting event loop.");
|
||
break;
|
||
}
|
||
},
|
||
Err(e) => {
|
||
error!("Error receiving event: {e:?}");
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
});
|
||
}
|
||
|
||
// Send images first, if any.
|
||
if !images.is_empty() {
|
||
let items: Vec<InputItem> = images
|
||
.into_iter()
|
||
.map(|path| InputItem::LocalImage { path })
|
||
.collect();
|
||
let initial_images_event_id = conversation.submit(Op::UserInput { items }).await?;
|
||
info!("Sent images with event ID: {initial_images_event_id}");
|
||
while let Ok(event) = conversation.next_event().await {
|
||
if event.id == initial_images_event_id
|
||
&& matches!(
|
||
event.msg,
|
||
EventMsg::TaskComplete(TaskCompleteEvent {
|
||
last_agent_message: _,
|
||
})
|
||
)
|
||
{
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
// Send the prompt.
|
||
let items: Vec<InputItem> = vec![InputItem::Text { text: prompt }];
|
||
let initial_prompt_task_id = conversation.submit(Op::UserInput { items }).await?;
|
||
info!("Sent prompt with event ID: {initial_prompt_task_id}");
|
||
|
||
// If stdin is an interactive TTY, watch for EOF (Ctrl+D) and request a graceful shutdown.
|
||
if std::io::stdin().is_terminal() {
|
||
let codex_for_eof = codex.clone();
|
||
tokio::spawn(async move {
|
||
use tokio::io::AsyncReadExt;
|
||
use tokio::io::stdin;
|
||
let mut stdin = stdin();
|
||
let mut buf = [0u8; 1];
|
||
loop {
|
||
match stdin.read(&mut buf).await {
|
||
Ok(0) => {
|
||
let _ = codex_for_eof.submit(Op::Shutdown).await;
|
||
break;
|
||
}
|
||
Ok(_) => {
|
||
// discard any input; exec does not read interactive input
|
||
continue;
|
||
}
|
||
Err(_) => break,
|
||
}
|
||
}
|
||
});
|
||
}
|
||
|
||
// Run the loop until the task is complete.
|
||
while let Some(event) = rx.recv().await {
|
||
let shutdown: CodexStatus = event_processor.process_event(event);
|
||
match shutdown {
|
||
CodexStatus::Running => continue,
|
||
CodexStatus::InitiateShutdown => {
|
||
conversation.submit(Op::Shutdown).await?;
|
||
}
|
||
CodexStatus::Shutdown => {
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn resolve_resume_path(
|
||
config: &Config,
|
||
args: &crate::cli::ResumeArgs,
|
||
) -> anyhow::Result<Option<PathBuf>> {
|
||
if args.last {
|
||
match codex_core::RolloutRecorder::list_conversations(&config.codex_home, 1, None).await {
|
||
Ok(page) => Ok(page.items.first().map(|it| it.path.clone())),
|
||
Err(e) => {
|
||
error!("Error listing conversations: {e}");
|
||
Ok(None)
|
||
}
|
||
}
|
||
} else if let Some(id_str) = args.session_id.as_deref() {
|
||
let path = find_conversation_path_by_id_str(&config.codex_home, id_str).await?;
|
||
Ok(path)
|
||
} else {
|
||
Ok(None)
|
||
}
|
||
}
|