#![deny(clippy::print_stdout, clippy::print_stderr)] use codex_common::CliConfigOverrides; use codex_core::config::Config; use codex_core::config::ConfigBuilder; use codex_core::config_loader::ConfigLayerStackOrdering; use codex_core::config_loader::LoaderOverrides; use std::io::ErrorKind; use std::io::Result as IoResult; use std::path::PathBuf; use crate::message_processor::MessageProcessor; use crate::outgoing_message::OutgoingMessage; use crate::outgoing_message::OutgoingMessageSender; use codex_app_server_protocol::ConfigLayerSource; use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::TextPosition as AppTextPosition; use codex_app_server_protocol::TextRange as AppTextRange; use codex_core::ExecPolicyError; use codex_core::check_execpolicy_for_warnings; use codex_core::config_loader::ConfigLoadError; use codex_core::config_loader::TextRange as CoreTextRange; use codex_feedback::CodexFeedback; use tokio::io::AsyncBufReadExt; use tokio::io::AsyncWriteExt; use tokio::io::BufReader; use tokio::io::{self}; use tokio::sync::mpsc; use toml::Value as TomlValue; use tracing::debug; use tracing::error; use tracing::info; use tracing::warn; use tracing_subscriber::EnvFilter; use tracing_subscriber::Layer; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; mod bespoke_event_handling; mod codex_message_processor; mod config_api; mod dynamic_tools; mod error_code; mod filters; mod fuzzy_file_search; mod message_processor; mod models; mod outgoing_message; /// Size of the bounded channels used to communicate between tasks. The value /// is a balance between throughput and memory usage – 128 messages should be /// plenty for an interactive CLI. const CHANNEL_CAPACITY: usize = 128; fn config_warning_from_error( summary: impl Into, err: &std::io::Error, ) -> ConfigWarningNotification { let (path, range) = match config_error_location(err) { Some((path, range)) => (Some(path), Some(range)), None => (None, None), }; ConfigWarningNotification { summary: summary.into(), details: Some(err.to_string()), path, range, } } fn config_error_location(err: &std::io::Error) -> Option<(String, AppTextRange)> { err.get_ref() .and_then(|err| err.downcast_ref::()) .map(|err| { let config_error = err.config_error(); ( config_error.path.to_string_lossy().to_string(), app_text_range(&config_error.range), ) }) } fn exec_policy_warning_location(err: &ExecPolicyError) -> (Option, Option) { match err { ExecPolicyError::ParsePolicy { path, source } => { if let Some(location) = source.location() { let range = AppTextRange { start: AppTextPosition { line: location.range.start.line, column: location.range.start.column, }, end: AppTextPosition { line: location.range.end.line, column: location.range.end.column, }, }; return (Some(location.path), Some(range)); } (Some(path.clone()), None) } _ => (None, None), } } fn app_text_range(range: &CoreTextRange) -> AppTextRange { AppTextRange { start: AppTextPosition { line: range.start.line, column: range.start.column, }, end: AppTextPosition { line: range.end.line, column: range.end.column, }, } } fn project_config_warning(config: &Config) -> Option { let mut disabled_folders = Vec::new(); for layer in config .config_layer_stack .get_layers(ConfigLayerStackOrdering::LowestPrecedenceFirst, true) { if !matches!(layer.name, ConfigLayerSource::Project { .. }) || layer.disabled_reason.is_none() { continue; } if let ConfigLayerSource::Project { dot_codex_folder } = &layer.name { disabled_folders.push(( dot_codex_folder.as_path().display().to_string(), layer .disabled_reason .as_ref() .map(ToString::to_string) .unwrap_or_else(|| "config.toml is disabled.".to_string()), )); } } if disabled_folders.is_empty() { return None; } let mut message = concat!( "Project config.toml files are disabled in the following folders. ", "Settings in those files are ignored, but skills and exec policies still load.\n", ) .to_string(); for (index, (folder, reason)) in disabled_folders.iter().enumerate() { let display_index = index + 1; message.push_str(&format!(" {display_index}. {folder}\n")); message.push_str(&format!(" {reason}\n")); } Some(ConfigWarningNotification { summary: message, details: None, path: None, range: None, }) } pub async fn run_main( codex_linux_sandbox_exe: Option, cli_config_overrides: CliConfigOverrides, loader_overrides: LoaderOverrides, default_analytics_enabled: bool, ) -> IoResult<()> { // Set up channels. let (incoming_tx, mut incoming_rx) = mpsc::channel::(CHANNEL_CAPACITY); let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); // Task: read from stdin, push to `incoming_tx`. let stdin_reader_handle = tokio::spawn({ async move { let stdin = io::stdin(); let reader = BufReader::new(stdin); let mut lines = reader.lines(); while let Some(line) = lines.next_line().await.unwrap_or_default() { match serde_json::from_str::(&line) { Ok(msg) => { if incoming_tx.send(msg).await.is_err() { // Receiver gone – nothing left to do. break; } } Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"), } } debug!("stdin reader finished (EOF)"); } }); // Parse CLI overrides once and derive the base Config eagerly so later // components do not need to work with raw TOML values. let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| { std::io::Error::new( ErrorKind::InvalidInput, format!("error parsing -c overrides: {e}"), ) })?; let loader_overrides_for_config_api = loader_overrides.clone(); let mut config_warnings = Vec::new(); let config = match ConfigBuilder::default() .cli_overrides(cli_kv_overrides.clone()) .loader_overrides(loader_overrides) .build() .await { Ok(config) => config, Err(err) => { let message = config_warning_from_error("Invalid configuration; using defaults.", &err); config_warnings.push(message); Config::load_default_with_cli_overrides(cli_kv_overrides.clone()).map_err(|e| { std::io::Error::new( ErrorKind::InvalidData, format!("error loading default config after config error: {e}"), ) })? } }; if let Ok(Some(err)) = check_execpolicy_for_warnings(&config.features, &config.config_layer_stack).await { let (path, range) = exec_policy_warning_location(&err); let message = ConfigWarningNotification { summary: "Error parsing rules; custom rules not applied.".to_string(), details: Some(err.to_string()), path, range, }; config_warnings.push(message); } if let Some(warning) = project_config_warning(&config) { config_warnings.push(warning); } let feedback = CodexFeedback::new(); let otel = codex_core::otel_init::build_provider( &config, env!("CARGO_PKG_VERSION"), Some("codex_app_server"), default_analytics_enabled, ) .map_err(|e| { std::io::Error::new( ErrorKind::InvalidData, format!("error loading otel config: {e}"), ) })?; // Install a simple subscriber so `tracing` output is visible. Users can // control the log level with `RUST_LOG`. let stderr_fmt = tracing_subscriber::fmt::layer() .with_writer(std::io::stderr) .with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL) .with_filter(EnvFilter::from_default_env()); let feedback_layer = feedback.logger_layer(); let feedback_metadata_layer = feedback.metadata_layer(); let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer()); let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); let _ = tracing_subscriber::registry() .with(stderr_fmt) .with(feedback_layer) .with(feedback_metadata_layer) .with(otel_logger_layer) .with(otel_tracing_layer) .try_init(); for warning in &config_warnings { match &warning.details { Some(details) => error!("{} {}", warning.summary, details), None => error!("{}", warning.summary), } } // Task: process incoming messages. let processor_handle = tokio::spawn({ let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx); let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone(); let loader_overrides = loader_overrides_for_config_api; let mut processor = MessageProcessor::new( outgoing_message_sender, codex_linux_sandbox_exe, std::sync::Arc::new(config), cli_overrides, loader_overrides, feedback.clone(), config_warnings, ); let mut thread_created_rx = processor.thread_created_receiver(); async move { let mut listen_for_threads = true; loop { tokio::select! { msg = incoming_rx.recv() => { let Some(msg) = msg else { break; }; match msg { JSONRPCMessage::Request(r) => processor.process_request(r).await, JSONRPCMessage::Response(r) => processor.process_response(r).await, JSONRPCMessage::Notification(n) => processor.process_notification(n).await, JSONRPCMessage::Error(e) => processor.process_error(e).await, } } created = thread_created_rx.recv(), if listen_for_threads => { match created { Ok(thread_id) => { processor.try_attach_thread_listener(thread_id).await; } Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { // TODO(jif) handle lag. // Assumes thread creation volume is low enough that lag never happens. // If it does, we log and continue without resyncing to avoid attaching // listeners for threads that should remain unsubscribed. warn!("thread_created receiver lagged; skipping resync"); } Err(tokio::sync::broadcast::error::RecvError::Closed) => { listen_for_threads = false; } } } } } info!("processor task exited (channel closed)"); } }); // Task: write outgoing messages to stdout. let stdout_writer_handle = tokio::spawn(async move { let mut stdout = io::stdout(); while let Some(outgoing_message) = outgoing_rx.recv().await { let Ok(value) = serde_json::to_value(outgoing_message) else { error!("Failed to convert OutgoingMessage to JSON value"); continue; }; match serde_json::to_string(&value) { Ok(mut json) => { json.push('\n'); if let Err(e) = stdout.write_all(json.as_bytes()).await { error!("Failed to write to stdout: {e}"); break; } } Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"), } } info!("stdout writer exited (channel closed)"); }); // Wait for all tasks to finish. The typical exit path is the stdin reader // hitting EOF which, once it drops `incoming_tx`, propagates shutdown to // the processor and then to the stdout task. let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle); Ok(()) }