mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
This PR does various types of cleanup before I can proceed with more ambitious changes to config loading. First, I noticed duplicated code across these two methods: 774bd9e432/codex-rs/core/src/config/mod.rs (L314-L324) and 774bd9e432/codex-rs/core/src/config/mod.rs (L334-L344). This has now been consolidated in `load_config_as_toml_with_cli_overrides()`. Further, I noticed that `Config::load_with_cli_overrides()` took two similar arguments: 774bd9e432/codex-rs/core/src/config/mod.rs (L308-L311). The difference between `cli_overrides` and `overrides` was not immediately obvious to me. At first glance, it appears that one could be expressed in terms of the other, but it turns out that some fields of `ConfigOverrides` (such as `cwd` and `codex_linux_sandbox_exe`) are, by design, not configurable via a `.toml` file or a command-line `--config` flag. That said, I discovered that many callers of `Config::load_with_cli_overrides()` were passing `ConfigOverrides::default()` for `overrides`, so I created two separate methods:
- `Config::load_with_cli_overrides(cli_overrides: Vec<(String, TomlValue)>)`
- `Config::load_with_cli_overrides_and_harness_overrides(cli_overrides: Vec<(String, TomlValue)>, harness_overrides: ConfigOverrides)`
The latter has a long name, as it is _not_ what should be used in the common case, so the extra typing is designed to draw attention to this fact. I tried to update the existing callsites to use the shorter name, where possible. Further, in the cases where `ConfigOverrides` is used, usually only a limited subset of fields is actually set, so I updated the declarations to leverage `..Default::default()` where possible.
178 lines
6.4 KiB
Rust
178 lines
6.4 KiB
Rust
#![deny(clippy::print_stdout, clippy::print_stderr)]
|
||
|
||
use codex_common::CliConfigOverrides;
|
||
use codex_core::config::Config;
|
||
use std::io::ErrorKind;
|
||
use std::io::Result as IoResult;
|
||
use std::path::PathBuf;
|
||
|
||
use crate::message_processor::MessageProcessor;
|
||
use crate::outgoing_message::OutgoingMessage;
|
||
use crate::outgoing_message::OutgoingMessageSender;
|
||
use codex_app_server_protocol::JSONRPCMessage;
|
||
use codex_feedback::CodexFeedback;
|
||
use tokio::io::AsyncBufReadExt;
|
||
use tokio::io::AsyncWriteExt;
|
||
use tokio::io::BufReader;
|
||
use tokio::io::{self};
|
||
use tokio::sync::mpsc;
|
||
use toml::Value as TomlValue;
|
||
use tracing::Level;
|
||
use tracing::debug;
|
||
use tracing::error;
|
||
use tracing::info;
|
||
use tracing_subscriber::EnvFilter;
|
||
use tracing_subscriber::Layer;
|
||
use tracing_subscriber::filter::Targets;
|
||
use tracing_subscriber::layer::SubscriberExt;
|
||
use tracing_subscriber::util::SubscriberInitExt;
|
||
|
||
mod bespoke_event_handling;
|
||
mod codex_message_processor;
|
||
mod config_api;
|
||
mod error_code;
|
||
mod fuzzy_file_search;
|
||
mod message_processor;
|
||
mod models;
|
||
mod outgoing_message;
|
||
|
||
/// Size of the bounded channels used to communicate between tasks. The value
|
||
/// is a balance between throughput and memory usage – 128 messages should be
|
||
/// plenty for an interactive CLI.
///
/// `run_main` uses this capacity for both its incoming `JSONRPCMessage`
/// channel and its outgoing `OutgoingMessage` channel.
|
||
const CHANNEL_CAPACITY: usize = 128;
|
||
|
||
pub async fn run_main(
|
||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||
cli_config_overrides: CliConfigOverrides,
|
||
) -> IoResult<()> {
|
||
// Set up channels.
|
||
let (incoming_tx, mut incoming_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingMessage>(CHANNEL_CAPACITY);
|
||
|
||
// Task: read from stdin, push to `incoming_tx`.
|
||
let stdin_reader_handle = tokio::spawn({
|
||
async move {
|
||
let stdin = io::stdin();
|
||
let reader = BufReader::new(stdin);
|
||
let mut lines = reader.lines();
|
||
|
||
while let Some(line) = lines.next_line().await.unwrap_or_default() {
|
||
match serde_json::from_str::<JSONRPCMessage>(&line) {
|
||
Ok(msg) => {
|
||
if incoming_tx.send(msg).await.is_err() {
|
||
// Receiver gone – nothing left to do.
|
||
break;
|
||
}
|
||
}
|
||
Err(e) => error!("Failed to deserialize JSONRPCMessage: {e}"),
|
||
}
|
||
}
|
||
|
||
debug!("stdin reader finished (EOF)");
|
||
}
|
||
});
|
||
|
||
// Parse CLI overrides once and derive the base Config eagerly so later
|
||
// components do not need to work with raw TOML values.
|
||
let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| {
|
||
std::io::Error::new(
|
||
ErrorKind::InvalidInput,
|
||
format!("error parsing -c overrides: {e}"),
|
||
)
|
||
})?;
|
||
let config = Config::load_with_cli_overrides(cli_kv_overrides.clone())
|
||
.await
|
||
.map_err(|e| {
|
||
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
|
||
})?;
|
||
|
||
let feedback = CodexFeedback::new();
|
||
|
||
let otel =
|
||
codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION")).map_err(|e| {
|
||
std::io::Error::new(
|
||
ErrorKind::InvalidData,
|
||
format!("error loading otel config: {e}"),
|
||
)
|
||
})?;
|
||
|
||
// Install a simple subscriber so `tracing` output is visible. Users can
|
||
// control the log level with `RUST_LOG`.
|
||
let stderr_fmt = tracing_subscriber::fmt::layer()
|
||
.with_writer(std::io::stderr)
|
||
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
|
||
.with_filter(EnvFilter::from_default_env());
|
||
|
||
let feedback_layer = tracing_subscriber::fmt::layer()
|
||
.with_writer(feedback.make_writer())
|
||
.with_ansi(false)
|
||
.with_target(false)
|
||
.with_filter(Targets::new().with_default(Level::TRACE));
|
||
|
||
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
|
||
|
||
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
|
||
|
||
let _ = tracing_subscriber::registry()
|
||
.with(stderr_fmt)
|
||
.with(feedback_layer)
|
||
.with(otel_logger_layer)
|
||
.with(otel_tracing_layer)
|
||
.try_init();
|
||
|
||
// Task: process incoming messages.
|
||
let processor_handle = tokio::spawn({
|
||
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
||
let cli_overrides: Vec<(String, TomlValue)> = cli_kv_overrides.clone();
|
||
let mut processor = MessageProcessor::new(
|
||
outgoing_message_sender,
|
||
codex_linux_sandbox_exe,
|
||
std::sync::Arc::new(config),
|
||
cli_overrides,
|
||
feedback.clone(),
|
||
);
|
||
async move {
|
||
while let Some(msg) = incoming_rx.recv().await {
|
||
match msg {
|
||
JSONRPCMessage::Request(r) => processor.process_request(r).await,
|
||
JSONRPCMessage::Response(r) => processor.process_response(r).await,
|
||
JSONRPCMessage::Notification(n) => processor.process_notification(n).await,
|
||
JSONRPCMessage::Error(e) => processor.process_error(e),
|
||
}
|
||
}
|
||
|
||
info!("processor task exited (channel closed)");
|
||
}
|
||
});
|
||
|
||
// Task: write outgoing messages to stdout.
|
||
let stdout_writer_handle = tokio::spawn(async move {
|
||
let mut stdout = io::stdout();
|
||
while let Some(outgoing_message) = outgoing_rx.recv().await {
|
||
let Ok(value) = serde_json::to_value(outgoing_message) else {
|
||
error!("Failed to convert OutgoingMessage to JSON value");
|
||
continue;
|
||
};
|
||
match serde_json::to_string(&value) {
|
||
Ok(mut json) => {
|
||
json.push('\n');
|
||
if let Err(e) = stdout.write_all(json.as_bytes()).await {
|
||
error!("Failed to write to stdout: {e}");
|
||
break;
|
||
}
|
||
}
|
||
Err(e) => error!("Failed to serialize JSONRPCMessage: {e}"),
|
||
}
|
||
}
|
||
|
||
info!("stdout writer exited (channel closed)");
|
||
});
|
||
|
||
// Wait for all tasks to finish. The typical exit path is the stdin reader
|
||
// hitting EOF which, once it drops `incoming_tx`, propagates shutdown to
|
||
// the processor and then to the stdout task.
|
||
let _ = tokio::join!(stdin_reader_handle, processor_handle, stdout_writer_handle);
|
||
|
||
Ok(())
|
||
}
|