mirror of
https://github.com/openai/codex.git
synced 2026-05-25 05:24:37 +00:00
## Summary Auth loading used to expose synchronous construction helpers in several places even though some auth sources now need async work. This PR makes the auth-loading surface async and updates the callers to await it. This is intentionally only plumbing. It does not change how AgentIdentity tokens are decoded, how task runtime ids are allocated, or how JWT signatures are verified. ## Stack 1. **This PR:** [refactor: make auth loading async](https://github.com/openai/codex/pull/19762) 2. [refactor: load AgentIdentity runtime eagerly](https://github.com/openai/codex/pull/19763) 3. [feat: verify AgentIdentity JWTs with JWKS](https://github.com/openai/codex/pull/19764) ## Important call sites | Area | Change | | --- | --- | | `codex-login` auth loading | `CodexAuth` and `AuthManager` construction paths now await auth loading. | | app-server startup | Auth manager construction is awaited during initialization. | | CLI/TUI/exec/MCP/chatgpt callers | Existing auth-loading calls now await the same behavior. | | cloud requirements storage loader | The loader becomes async so it can share the same auth construction path. | | auth tests | Tests that load auth now run in async contexts. | ## Testing Tests: targeted Rust auth test compilation, formatter, scoped Clippy fix, and Bazel lock check.
992 lines
39 KiB
Rust
992 lines
39 KiB
Rust
#![deny(clippy::print_stdout, clippy::print_stderr)]
|
|
|
|
use codex_arg0::Arg0DispatchPaths;
|
|
use codex_config::ConfigLayerStackOrdering;
|
|
use codex_config::LoaderOverrides;
|
|
use codex_config::NoopThreadConfigLoader;
|
|
use codex_config::RemoteThreadConfigLoader;
|
|
use codex_config::ThreadConfigLoader;
|
|
use codex_core::config::Config;
|
|
use codex_exec_server::EnvironmentManagerArgs;
|
|
use codex_features::Feature;
|
|
use codex_login::AuthManager;
|
|
use codex_utils_cli::CliConfigOverrides;
|
|
use std::collections::HashMap;
|
|
use std::collections::HashSet;
|
|
use std::io::ErrorKind;
|
|
use std::io::Result as IoResult;
|
|
use std::sync::Arc;
|
|
use std::sync::RwLock;
|
|
use std::sync::atomic::AtomicBool;
|
|
|
|
use crate::config_manager::ConfigManager;
|
|
use crate::message_processor::MessageProcessor;
|
|
use crate::message_processor::MessageProcessorArgs;
|
|
use crate::outgoing_message::ConnectionId;
|
|
use crate::outgoing_message::OutgoingEnvelope;
|
|
use crate::outgoing_message::OutgoingMessageSender;
|
|
use crate::outgoing_message::QueuedOutgoingMessage;
|
|
use crate::transport::CHANNEL_CAPACITY;
|
|
use crate::transport::ConnectionState;
|
|
use crate::transport::OutboundConnectionState;
|
|
use crate::transport::TransportEvent;
|
|
use crate::transport::auth::policy_from_settings;
|
|
use crate::transport::route_outgoing_envelope;
|
|
use crate::transport::start_control_socket_acceptor;
|
|
use crate::transport::start_remote_control;
|
|
use crate::transport::start_stdio_connection;
|
|
use crate::transport::start_websocket_acceptor;
|
|
use codex_analytics::AppServerRpcTransport;
|
|
use codex_app_server_protocol::ConfigLayerSource;
|
|
use codex_app_server_protocol::ConfigWarningNotification;
|
|
use codex_app_server_protocol::JSONRPCMessage;
|
|
use codex_app_server_protocol::TextPosition as AppTextPosition;
|
|
use codex_app_server_protocol::TextRange as AppTextRange;
|
|
use codex_config::ConfigLoadError;
|
|
use codex_config::TextRange as CoreTextRange;
|
|
use codex_core::ExecPolicyError;
|
|
use codex_core::check_execpolicy_for_warnings;
|
|
use codex_core::config::find_codex_home;
|
|
use codex_exec_server::EnvironmentManager;
|
|
use codex_exec_server::ExecServerRuntimePaths;
|
|
use codex_feedback::CodexFeedback;
|
|
use codex_protocol::protocol::SessionSource;
|
|
use codex_state::log_db;
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::oneshot;
|
|
use tokio::task::JoinHandle;
|
|
use tokio_util::sync::CancellationToken;
|
|
use tracing::Level;
|
|
use tracing::error;
|
|
use tracing::info;
|
|
use tracing::warn;
|
|
use tracing_subscriber::EnvFilter;
|
|
use tracing_subscriber::Layer;
|
|
use tracing_subscriber::filter::Targets;
|
|
use tracing_subscriber::layer::SubscriberExt;
|
|
use tracing_subscriber::registry::Registry;
|
|
use tracing_subscriber::util::SubscriberInitExt;
|
|
|
|
mod app_server_tracing;
|
|
mod bespoke_event_handling;
|
|
mod codex_message_processor;
|
|
mod command_exec;
|
|
mod config;
|
|
mod config_api;
|
|
mod config_manager;
|
|
mod config_manager_service;
|
|
mod device_key_api;
|
|
mod dynamic_tools;
|
|
mod error_code;
|
|
mod external_agent_config_api;
|
|
mod filters;
|
|
mod fs_api;
|
|
mod fs_watch;
|
|
mod fuzzy_file_search;
|
|
pub mod in_process;
|
|
mod message_processor;
|
|
mod models;
|
|
mod outgoing_message;
|
|
mod server_request_error;
|
|
mod thread_state;
|
|
mod thread_status;
|
|
mod transport;
|
|
|
|
pub use crate::error_code::INPUT_TOO_LARGE_ERROR_CODE;
|
|
pub use crate::error_code::INVALID_PARAMS_ERROR_CODE;
|
|
pub use crate::transport::AppServerTransport;
|
|
pub use crate::transport::app_server_control_socket_path;
|
|
pub use crate::transport::auth::AppServerWebsocketAuthArgs;
|
|
pub use crate::transport::auth::AppServerWebsocketAuthSettings;
|
|
pub use crate::transport::auth::WebsocketAuthCliMode;
|
|
|
|
const LOG_FORMAT_ENV_VAR: &str = "LOG_FORMAT";
|
|
|
|
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
|
enum LogFormat {
|
|
Default,
|
|
Json,
|
|
}
|
|
|
|
type StderrLogLayer = Box<dyn Layer<Registry> + Send + Sync + 'static>;
|
|
|
|
fn configured_thread_config_loader(config: &Config) -> Arc<dyn ThreadConfigLoader> {
|
|
match config.experimental_thread_config_endpoint.as_deref() {
|
|
Some(endpoint) => Arc::new(RemoteThreadConfigLoader::new(endpoint)),
|
|
None => Arc::new(NoopThreadConfigLoader),
|
|
}
|
|
}
|
|
|
|
/// Control-plane messages from the processor/transport side to the outbound router task.
|
|
///
|
|
/// `run_main_with_transport` now uses two loops/tasks:
|
|
/// - processor loop: handles incoming JSON-RPC and request dispatch
|
|
/// - outbound loop: performs potentially slow writes to per-connection writers
|
|
///
|
|
/// `OutboundControlEvent` keeps those loops coordinated without sharing mutable
|
|
/// connection state directly. In particular, the outbound loop needs to know
|
|
/// when a connection opens/closes so it can route messages correctly.
|
|
enum OutboundControlEvent {
|
|
/// Register a new writer for an opened connection.
|
|
Opened {
|
|
connection_id: ConnectionId,
|
|
writer: mpsc::Sender<QueuedOutgoingMessage>,
|
|
disconnect_sender: Option<CancellationToken>,
|
|
initialized: Arc<AtomicBool>,
|
|
experimental_api_enabled: Arc<AtomicBool>,
|
|
opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
|
|
},
|
|
/// Remove state for a closed/disconnected connection.
|
|
Closed { connection_id: ConnectionId },
|
|
/// Disconnect all connection-oriented clients during graceful restart.
|
|
DisconnectAll,
|
|
}
|
|
|
|
#[derive(Default)]
|
|
struct ShutdownState {
|
|
requested: bool,
|
|
forced: bool,
|
|
last_logged_running_turn_count: Option<usize>,
|
|
}
|
|
|
|
enum ShutdownAction {
|
|
Noop,
|
|
Finish,
|
|
}
|
|
|
|
async fn shutdown_signal() -> IoResult<()> {
|
|
#[cfg(unix)]
|
|
{
|
|
use tokio::signal::unix::SignalKind;
|
|
use tokio::signal::unix::signal;
|
|
|
|
let mut term = signal(SignalKind::terminate())?;
|
|
tokio::select! {
|
|
ctrl_c_result = tokio::signal::ctrl_c() => ctrl_c_result,
|
|
_ = term.recv() => Ok(()),
|
|
}
|
|
}
|
|
|
|
#[cfg(not(unix))]
|
|
{
|
|
tokio::signal::ctrl_c().await
|
|
}
|
|
}
|
|
|
|
impl ShutdownState {
|
|
fn requested(&self) -> bool {
|
|
self.requested
|
|
}
|
|
|
|
fn forced(&self) -> bool {
|
|
self.forced
|
|
}
|
|
|
|
fn on_signal(&mut self, connection_count: usize, running_turn_count: usize) {
|
|
if self.requested {
|
|
self.forced = true;
|
|
return;
|
|
}
|
|
|
|
self.requested = true;
|
|
self.last_logged_running_turn_count = None;
|
|
info!(
|
|
"received shutdown signal; entering graceful restart drain (connections={}, runningAssistantTurns={}, requests still accepted until no assistant turns are running)",
|
|
connection_count, running_turn_count,
|
|
);
|
|
}
|
|
|
|
fn update(&mut self, running_turn_count: usize, connection_count: usize) -> ShutdownAction {
|
|
if !self.requested {
|
|
return ShutdownAction::Noop;
|
|
}
|
|
|
|
if self.forced || running_turn_count == 0 {
|
|
if self.forced {
|
|
info!(
|
|
"received second shutdown signal; forcing restart with {running_turn_count} running assistant turn(s) and {connection_count} connection(s)"
|
|
);
|
|
} else {
|
|
info!(
|
|
"shutdown signal restart: no assistant turns running; stopping acceptor and disconnecting {connection_count} connection(s)"
|
|
);
|
|
}
|
|
return ShutdownAction::Finish;
|
|
}
|
|
|
|
if self.last_logged_running_turn_count != Some(running_turn_count) {
|
|
info!(
|
|
"shutdown signal restart: waiting for {running_turn_count} running assistant turn(s) to finish"
|
|
);
|
|
self.last_logged_running_turn_count = Some(running_turn_count);
|
|
}
|
|
|
|
ShutdownAction::Noop
|
|
}
|
|
}
|
|
|
|
fn config_warning_from_error(
|
|
summary: impl Into<String>,
|
|
err: &std::io::Error,
|
|
) -> ConfigWarningNotification {
|
|
let (path, range) = match config_error_location(err) {
|
|
Some((path, range)) => (Some(path), Some(range)),
|
|
None => (None, None),
|
|
};
|
|
ConfigWarningNotification {
|
|
summary: summary.into(),
|
|
details: Some(err.to_string()),
|
|
path,
|
|
range,
|
|
}
|
|
}
|
|
|
|
fn config_error_location(err: &std::io::Error) -> Option<(String, AppTextRange)> {
|
|
err.get_ref()
|
|
.and_then(|err| err.downcast_ref::<ConfigLoadError>())
|
|
.map(|err| {
|
|
let config_error = err.config_error();
|
|
(
|
|
config_error.path.to_string_lossy().to_string(),
|
|
app_text_range(&config_error.range),
|
|
)
|
|
})
|
|
}
|
|
|
|
fn exec_policy_warning_location(err: &ExecPolicyError) -> (Option<String>, Option<AppTextRange>) {
|
|
match err {
|
|
ExecPolicyError::ParsePolicy { path, source } => {
|
|
if let Some(location) = source.location() {
|
|
let range = AppTextRange {
|
|
start: AppTextPosition {
|
|
line: location.range.start.line,
|
|
column: location.range.start.column,
|
|
},
|
|
end: AppTextPosition {
|
|
line: location.range.end.line,
|
|
column: location.range.end.column,
|
|
},
|
|
};
|
|
return (Some(location.path), Some(range));
|
|
}
|
|
(Some(path.clone()), None)
|
|
}
|
|
_ => (None, None),
|
|
}
|
|
}
|
|
|
|
fn app_text_range(range: &CoreTextRange) -> AppTextRange {
|
|
AppTextRange {
|
|
start: AppTextPosition {
|
|
line: range.start.line,
|
|
column: range.start.column,
|
|
},
|
|
end: AppTextPosition {
|
|
line: range.end.line,
|
|
column: range.end.column,
|
|
},
|
|
}
|
|
}
|
|
|
|
fn project_config_warning(config: &Config) -> Option<ConfigWarningNotification> {
|
|
let mut disabled_folders = Vec::new();
|
|
|
|
for layer in config.config_layer_stack.get_layers(
|
|
ConfigLayerStackOrdering::LowestPrecedenceFirst,
|
|
/*include_disabled*/ true,
|
|
) {
|
|
let ConfigLayerSource::Project { dot_codex_folder } = &layer.name else {
|
|
continue;
|
|
};
|
|
let Some(disabled_reason) = &layer.disabled_reason else {
|
|
continue;
|
|
};
|
|
disabled_folders.push((
|
|
dot_codex_folder.as_path().display().to_string(),
|
|
disabled_reason.clone(),
|
|
));
|
|
}
|
|
|
|
if disabled_folders.is_empty() {
|
|
return None;
|
|
}
|
|
|
|
let mut message = concat!(
|
|
"Project-local config, hooks, and exec policies are disabled in the following folders ",
|
|
"until the project is trusted, but skills still load.\n",
|
|
)
|
|
.to_string();
|
|
for (index, (folder, reason)) in disabled_folders.iter().enumerate() {
|
|
let display_index = index + 1;
|
|
message.push_str(&format!(" {display_index}. {folder}\n"));
|
|
message.push_str(&format!(" {reason}\n"));
|
|
}
|
|
|
|
Some(ConfigWarningNotification {
|
|
summary: message,
|
|
details: None,
|
|
path: None,
|
|
range: None,
|
|
})
|
|
}
|
|
|
|
impl LogFormat {
|
|
fn from_env_value(value: Option<&str>) -> Self {
|
|
match value.map(str::trim).map(str::to_ascii_lowercase) {
|
|
Some(value) if value == "json" => Self::Json,
|
|
_ => Self::Default,
|
|
}
|
|
}
|
|
}
|
|
|
|
fn log_format_from_env() -> LogFormat {
|
|
let value = std::env::var(LOG_FORMAT_ENV_VAR).ok();
|
|
LogFormat::from_env_value(value.as_deref())
|
|
}
|
|
|
|
pub async fn run_main(
|
|
arg0_paths: Arg0DispatchPaths,
|
|
cli_config_overrides: CliConfigOverrides,
|
|
loader_overrides: LoaderOverrides,
|
|
default_analytics_enabled: bool,
|
|
) -> IoResult<()> {
|
|
run_main_with_transport(
|
|
arg0_paths,
|
|
cli_config_overrides,
|
|
loader_overrides,
|
|
default_analytics_enabled,
|
|
AppServerTransport::Stdio,
|
|
SessionSource::VSCode,
|
|
AppServerWebsocketAuthSettings::default(),
|
|
)
|
|
.await
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
pub enum PluginStartupTasks {
|
|
Start,
|
|
Skip,
|
|
}
|
|
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
pub struct AppServerRuntimeOptions {
|
|
pub plugin_startup_tasks: PluginStartupTasks,
|
|
}
|
|
|
|
impl Default for AppServerRuntimeOptions {
|
|
fn default() -> Self {
|
|
Self {
|
|
plugin_startup_tasks: PluginStartupTasks::Start,
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn run_main_with_transport(
|
|
arg0_paths: Arg0DispatchPaths,
|
|
cli_config_overrides: CliConfigOverrides,
|
|
loader_overrides: LoaderOverrides,
|
|
default_analytics_enabled: bool,
|
|
transport: AppServerTransport,
|
|
session_source: SessionSource,
|
|
auth: AppServerWebsocketAuthSettings,
|
|
) -> IoResult<()> {
|
|
run_main_with_transport_options(
|
|
arg0_paths,
|
|
cli_config_overrides,
|
|
loader_overrides,
|
|
default_analytics_enabled,
|
|
transport,
|
|
session_source,
|
|
auth,
|
|
AppServerRuntimeOptions::default(),
|
|
)
|
|
.await
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub async fn run_main_with_transport_options(
|
|
arg0_paths: Arg0DispatchPaths,
|
|
cli_config_overrides: CliConfigOverrides,
|
|
loader_overrides: LoaderOverrides,
|
|
default_analytics_enabled: bool,
|
|
transport: AppServerTransport,
|
|
session_source: SessionSource,
|
|
auth: AppServerWebsocketAuthSettings,
|
|
runtime_options: AppServerRuntimeOptions,
|
|
) -> IoResult<()> {
|
|
let environment_manager = Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::from_env(
|
|
ExecServerRuntimePaths::from_optional_paths(
|
|
arg0_paths.codex_self_exe.clone(),
|
|
arg0_paths.codex_linux_sandbox_exe.clone(),
|
|
)?,
|
|
)));
|
|
let (transport_event_tx, mut transport_event_rx) =
|
|
mpsc::channel::<TransportEvent>(CHANNEL_CAPACITY);
|
|
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
|
let (outbound_control_tx, mut outbound_control_rx) =
|
|
mpsc::channel::<OutboundControlEvent>(CHANNEL_CAPACITY);
|
|
|
|
// Parse CLI overrides once and derive the base Config eagerly so later
|
|
// components do not need to work with raw TOML values.
|
|
let cli_kv_overrides = cli_config_overrides.parse_overrides().map_err(|e| {
|
|
std::io::Error::new(
|
|
ErrorKind::InvalidInput,
|
|
format!("error parsing -c overrides: {e}"),
|
|
)
|
|
})?;
|
|
let codex_home = find_codex_home()?;
|
|
let config_manager = ConfigManager::new(
|
|
codex_home.to_path_buf(),
|
|
cli_kv_overrides.clone(),
|
|
loader_overrides,
|
|
Default::default(),
|
|
arg0_paths.clone(),
|
|
Arc::new(NoopThreadConfigLoader),
|
|
);
|
|
match config_manager
|
|
.load_latest_config(/*fallback_cwd*/ None)
|
|
.await
|
|
{
|
|
Ok(config) => {
|
|
let effective_toml = config.config_layer_stack.effective_config();
|
|
match effective_toml.try_into() {
|
|
Ok(config_toml) => {
|
|
if let Err(err) = codex_core::personality_migration::maybe_migrate_personality(
|
|
&config.codex_home,
|
|
&config_toml,
|
|
)
|
|
.await
|
|
{
|
|
warn!(error = %err, "Failed to run personality migration");
|
|
}
|
|
}
|
|
Err(err) => {
|
|
warn!(error = %err, "Failed to deserialize config for personality migration");
|
|
}
|
|
}
|
|
|
|
let discovered_thread_config_loader = configured_thread_config_loader(&config);
|
|
config_manager
|
|
.replace_thread_config_loader(Arc::clone(&discovered_thread_config_loader));
|
|
let auth_manager =
|
|
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
|
|
config_manager.replace_cloud_requirements_loader(auth_manager, config.chatgpt_base_url);
|
|
}
|
|
Err(err) => {
|
|
warn!(error = %err, "Failed to preload config for cloud requirements");
|
|
// TODO(gt): Make cloud requirements preload failures blocking once we can fail-closed.
|
|
}
|
|
};
|
|
let mut config_warnings = Vec::new();
|
|
let config = match config_manager
|
|
.load_latest_config(/*fallback_cwd*/ None)
|
|
.await
|
|
{
|
|
Ok(config) => config,
|
|
Err(err) => {
|
|
let message = config_warning_from_error("Invalid configuration; using defaults.", &err);
|
|
config_warnings.push(message);
|
|
config_manager.load_default_config().await.map_err(|e| {
|
|
std::io::Error::new(
|
|
ErrorKind::InvalidData,
|
|
format!("error loading default config after config error: {e}"),
|
|
)
|
|
})?
|
|
}
|
|
};
|
|
|
|
if let Ok(Some(err)) = check_execpolicy_for_warnings(&config.config_layer_stack).await {
|
|
let (path, range) = exec_policy_warning_location(&err);
|
|
let message = ConfigWarningNotification {
|
|
summary: "Error parsing rules; custom rules not applied.".to_string(),
|
|
details: Some(err.to_string()),
|
|
path,
|
|
range,
|
|
};
|
|
config_warnings.push(message);
|
|
}
|
|
|
|
if let Some(warning) = project_config_warning(&config) {
|
|
config_warnings.push(warning);
|
|
}
|
|
for warning in &config.startup_warnings {
|
|
config_warnings.push(ConfigWarningNotification {
|
|
summary: warning.clone(),
|
|
details: None,
|
|
path: None,
|
|
range: None,
|
|
});
|
|
}
|
|
if let Some(warning) =
|
|
codex_core::config::system_bwrap_warning(config.permissions.permission_profile.get())
|
|
{
|
|
config_warnings.push(ConfigWarningNotification {
|
|
summary: warning,
|
|
details: None,
|
|
path: None,
|
|
range: None,
|
|
});
|
|
}
|
|
|
|
let feedback = CodexFeedback::new();
|
|
|
|
let otel = codex_core::otel_init::build_provider(
|
|
&config,
|
|
env!("CARGO_PKG_VERSION"),
|
|
Some("codex-app-server"),
|
|
default_analytics_enabled,
|
|
)
|
|
.map_err(|e| {
|
|
std::io::Error::new(
|
|
ErrorKind::InvalidData,
|
|
format!("error loading otel config: {e}"),
|
|
)
|
|
})?;
|
|
|
|
// Install a simple subscriber so `tracing` output is visible. Users can
|
|
// control the log level with `RUST_LOG` and switch to JSON logs with
|
|
// `LOG_FORMAT=json`.
|
|
let stderr_fmt: StderrLogLayer = match log_format_from_env() {
|
|
LogFormat::Json => tracing_subscriber::fmt::layer()
|
|
.json()
|
|
.with_writer(std::io::stderr)
|
|
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
|
|
.with_filter(EnvFilter::from_default_env())
|
|
.boxed(),
|
|
LogFormat::Default => tracing_subscriber::fmt::layer()
|
|
.with_writer(std::io::stderr)
|
|
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
|
|
.with_filter(EnvFilter::from_default_env())
|
|
.boxed(),
|
|
};
|
|
|
|
let feedback_layer = feedback.logger_layer();
|
|
let feedback_metadata_layer = feedback.metadata_layer();
|
|
let state_db = codex_state::StateRuntime::init(
|
|
config.sqlite_home.clone(),
|
|
config.model_provider_id.clone(),
|
|
)
|
|
.await
|
|
.ok();
|
|
let log_db = state_db.clone().map(log_db::start);
|
|
let log_db_layer = log_db
|
|
.clone()
|
|
.map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE)));
|
|
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
|
|
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
|
|
let _ = tracing_subscriber::registry()
|
|
.with(stderr_fmt)
|
|
.with(feedback_layer)
|
|
.with(feedback_metadata_layer)
|
|
.with(log_db_layer)
|
|
.with(otel_logger_layer)
|
|
.with(otel_tracing_layer)
|
|
.try_init();
|
|
for warning in &config_warnings {
|
|
match &warning.details {
|
|
Some(details) => error!("{} {}", warning.summary, details),
|
|
None => error!("{}", warning.summary),
|
|
}
|
|
}
|
|
|
|
let transport_shutdown_token = CancellationToken::new();
|
|
let mut transport_accept_handles = Vec::<JoinHandle<()>>::new();
|
|
|
|
let single_client_mode = matches!(&transport, AppServerTransport::Stdio);
|
|
let shutdown_when_no_connections = single_client_mode;
|
|
let graceful_signal_restart_enabled = !single_client_mode;
|
|
let mut app_server_client_name_rx = None;
|
|
|
|
match &transport {
|
|
AppServerTransport::Stdio => {
|
|
let (stdio_client_name_tx, stdio_client_name_rx) = oneshot::channel::<String>();
|
|
app_server_client_name_rx = Some(stdio_client_name_rx);
|
|
start_stdio_connection(
|
|
transport_event_tx.clone(),
|
|
&mut transport_accept_handles,
|
|
stdio_client_name_tx,
|
|
)
|
|
.await?;
|
|
}
|
|
AppServerTransport::UnixSocket { socket_path } => {
|
|
let accept_handle = start_control_socket_acceptor(
|
|
socket_path.clone(),
|
|
transport_event_tx.clone(),
|
|
transport_shutdown_token.clone(),
|
|
)
|
|
.await?;
|
|
transport_accept_handles.push(accept_handle);
|
|
}
|
|
AppServerTransport::WebSocket { bind_address } => {
|
|
let accept_handle = start_websocket_acceptor(
|
|
*bind_address,
|
|
transport_event_tx.clone(),
|
|
transport_shutdown_token.clone(),
|
|
policy_from_settings(&auth)?,
|
|
)
|
|
.await?;
|
|
transport_accept_handles.push(accept_handle);
|
|
}
|
|
AppServerTransport::Off => {}
|
|
}
|
|
|
|
let auth_manager =
|
|
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
|
|
|
|
let remote_control_enabled = config.features.enabled(Feature::RemoteControl);
|
|
if transport_accept_handles.is_empty() && !remote_control_enabled {
|
|
return Err(std::io::Error::new(
|
|
ErrorKind::InvalidInput,
|
|
"no transport configured; use --listen or enable remote control",
|
|
));
|
|
}
|
|
|
|
let (remote_control_accept_handle, remote_control_handle) = start_remote_control(
|
|
config.chatgpt_base_url.clone(),
|
|
state_db.clone(),
|
|
auth_manager.clone(),
|
|
transport_event_tx.clone(),
|
|
transport_shutdown_token.clone(),
|
|
app_server_client_name_rx,
|
|
remote_control_enabled,
|
|
)
|
|
.await?;
|
|
transport_accept_handles.push(remote_control_accept_handle);
|
|
|
|
let outbound_handle = tokio::spawn(async move {
|
|
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
|
|
loop {
|
|
tokio::select! {
|
|
biased;
|
|
event = outbound_control_rx.recv() => {
|
|
let Some(event) = event else {
|
|
break;
|
|
};
|
|
match event {
|
|
OutboundControlEvent::Opened {
|
|
connection_id,
|
|
writer,
|
|
disconnect_sender,
|
|
initialized,
|
|
experimental_api_enabled,
|
|
opted_out_notification_methods,
|
|
} => {
|
|
outbound_connections.insert(
|
|
connection_id,
|
|
OutboundConnectionState::new(
|
|
writer,
|
|
initialized,
|
|
experimental_api_enabled,
|
|
opted_out_notification_methods,
|
|
disconnect_sender,
|
|
),
|
|
);
|
|
}
|
|
OutboundControlEvent::Closed { connection_id } => {
|
|
outbound_connections.remove(&connection_id);
|
|
}
|
|
OutboundControlEvent::DisconnectAll => {
|
|
info!(
|
|
"disconnecting {} outbound websocket connection(s) for graceful restart",
|
|
outbound_connections.len()
|
|
);
|
|
for connection_state in outbound_connections.values() {
|
|
connection_state.request_disconnect();
|
|
}
|
|
outbound_connections.clear();
|
|
}
|
|
}
|
|
}
|
|
envelope = outgoing_rx.recv() => {
|
|
let Some(envelope) = envelope else {
|
|
break;
|
|
};
|
|
route_outgoing_envelope(&mut outbound_connections, envelope).await;
|
|
}
|
|
}
|
|
}
|
|
info!("outbound router task exited (channel closed)");
|
|
});
|
|
|
|
let processor_handle = tokio::spawn({
|
|
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
|
let outbound_control_tx = outbound_control_tx;
|
|
let auth_manager =
|
|
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
|
|
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
|
|
outgoing: outgoing_message_sender,
|
|
arg0_paths,
|
|
config: Arc::new(config),
|
|
config_manager,
|
|
environment_manager,
|
|
feedback: feedback.clone(),
|
|
log_db,
|
|
config_warnings,
|
|
session_source,
|
|
auth_manager,
|
|
rpc_transport: analytics_rpc_transport(&transport),
|
|
remote_control_handle: Some(remote_control_handle),
|
|
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
|
|
}));
|
|
let mut thread_created_rx = processor.thread_created_receiver();
|
|
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();
|
|
let mut connections = HashMap::<ConnectionId, ConnectionState>::new();
|
|
let transport_shutdown_token = transport_shutdown_token.clone();
|
|
async move {
|
|
let mut listen_for_threads = true;
|
|
let mut shutdown_state = ShutdownState::default();
|
|
loop {
|
|
let running_turn_count = {
|
|
let running_turn_count = running_turn_count_rx.borrow();
|
|
*running_turn_count
|
|
};
|
|
if matches!(
|
|
shutdown_state.update(running_turn_count, connections.len()),
|
|
ShutdownAction::Finish
|
|
) {
|
|
transport_shutdown_token.cancel();
|
|
let _ = outbound_control_tx
|
|
.send(OutboundControlEvent::DisconnectAll)
|
|
.await;
|
|
break;
|
|
}
|
|
|
|
tokio::select! {
|
|
shutdown_signal_result = shutdown_signal(), if graceful_signal_restart_enabled && !shutdown_state.forced() => {
|
|
if let Err(err) = shutdown_signal_result {
|
|
warn!("failed to listen for shutdown signal during graceful restart drain: {err}");
|
|
}
|
|
let running_turn_count = *running_turn_count_rx.borrow();
|
|
shutdown_state.on_signal(connections.len(), running_turn_count);
|
|
}
|
|
changed = running_turn_count_rx.changed(), if graceful_signal_restart_enabled && shutdown_state.requested() => {
|
|
if changed.is_err() {
|
|
warn!("running-turn watcher closed during graceful restart drain");
|
|
}
|
|
}
|
|
event = transport_event_rx.recv() => {
|
|
let Some(event) = event else {
|
|
break;
|
|
};
|
|
match event {
|
|
TransportEvent::ConnectionOpened {
|
|
connection_id,
|
|
origin,
|
|
writer,
|
|
disconnect_sender,
|
|
} => {
|
|
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
|
let outbound_experimental_api_enabled =
|
|
Arc::new(AtomicBool::new(false));
|
|
let outbound_opted_out_notification_methods =
|
|
Arc::new(RwLock::new(HashSet::new()));
|
|
if outbound_control_tx
|
|
.send(OutboundControlEvent::Opened {
|
|
connection_id,
|
|
writer,
|
|
disconnect_sender,
|
|
initialized: Arc::clone(&outbound_initialized),
|
|
experimental_api_enabled: Arc::clone(
|
|
&outbound_experimental_api_enabled,
|
|
),
|
|
opted_out_notification_methods: Arc::clone(
|
|
&outbound_opted_out_notification_methods,
|
|
),
|
|
})
|
|
.await
|
|
.is_err()
|
|
{
|
|
break;
|
|
}
|
|
connections.insert(
|
|
connection_id,
|
|
ConnectionState::new(
|
|
origin,
|
|
outbound_initialized,
|
|
outbound_experimental_api_enabled,
|
|
outbound_opted_out_notification_methods,
|
|
),
|
|
);
|
|
}
|
|
TransportEvent::ConnectionClosed { connection_id } => {
|
|
if connections.remove(&connection_id).is_none() {
|
|
continue;
|
|
}
|
|
if outbound_control_tx
|
|
.send(OutboundControlEvent::Closed { connection_id })
|
|
.await
|
|
.is_err()
|
|
{
|
|
break;
|
|
}
|
|
processor.connection_closed(connection_id).await;
|
|
if shutdown_when_no_connections && connections.is_empty() {
|
|
break;
|
|
}
|
|
}
|
|
TransportEvent::IncomingMessage { connection_id, message } => {
|
|
match message {
|
|
JSONRPCMessage::Request(request) => {
|
|
let Some(connection_state) = connections.get_mut(&connection_id) else {
|
|
warn!("dropping request from unknown connection: {connection_id:?}");
|
|
continue;
|
|
};
|
|
let was_initialized =
|
|
connection_state.session.initialized();
|
|
processor
|
|
.process_request(
|
|
connection_id,
|
|
request,
|
|
&transport,
|
|
Arc::clone(&connection_state.session),
|
|
)
|
|
.await;
|
|
let opted_out_notification_methods_snapshot = connection_state
|
|
.session
|
|
.opted_out_notification_methods();
|
|
let experimental_api_enabled =
|
|
connection_state.session.experimental_api_enabled();
|
|
let is_initialized = connection_state.session.initialized();
|
|
if let Ok(mut opted_out_notification_methods) = connection_state
|
|
.outbound_opted_out_notification_methods
|
|
.write()
|
|
{
|
|
*opted_out_notification_methods =
|
|
opted_out_notification_methods_snapshot;
|
|
} else {
|
|
warn!(
|
|
"failed to update outbound opted-out notifications"
|
|
);
|
|
}
|
|
connection_state
|
|
.outbound_experimental_api_enabled
|
|
.store(
|
|
experimental_api_enabled,
|
|
std::sync::atomic::Ordering::Release,
|
|
);
|
|
if !was_initialized && is_initialized {
|
|
processor
|
|
.send_initialize_notifications_to_connection(
|
|
connection_id,
|
|
)
|
|
.await;
|
|
processor.connection_initialized(connection_id).await;
|
|
connection_state
|
|
.outbound_initialized
|
|
.store(true, std::sync::atomic::Ordering::Release);
|
|
}
|
|
}
|
|
JSONRPCMessage::Response(response) => {
|
|
if !connections.contains_key(&connection_id) {
|
|
warn!("dropping response from unknown connection: {connection_id:?}");
|
|
continue;
|
|
}
|
|
processor.process_response(response).await;
|
|
}
|
|
JSONRPCMessage::Notification(notification) => {
|
|
if !connections.contains_key(&connection_id) {
|
|
warn!("dropping notification from unknown connection: {connection_id:?}");
|
|
continue;
|
|
}
|
|
processor.process_notification(notification).await;
|
|
}
|
|
JSONRPCMessage::Error(err) => {
|
|
if !connections.contains_key(&connection_id) {
|
|
warn!("dropping error from unknown connection: {connection_id:?}");
|
|
continue;
|
|
}
|
|
processor.process_error(err).await;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
created = thread_created_rx.recv(), if listen_for_threads => {
|
|
match created {
|
|
Ok(thread_id) => {
|
|
let mut initialized_connection_ids = Vec::new();
|
|
for (connection_id, connection_state) in &connections {
|
|
if connection_state.session.initialized() {
|
|
initialized_connection_ids.push(*connection_id);
|
|
}
|
|
}
|
|
processor
|
|
.try_attach_thread_listener(
|
|
thread_id,
|
|
initialized_connection_ids,
|
|
)
|
|
.await;
|
|
}
|
|
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
|
// TODO(jif) handle lag.
|
|
// Assumes thread creation volume is low enough that lag never happens.
|
|
// If it does, we log and continue without resyncing to avoid attaching
|
|
// listeners for threads that should remain unsubscribed.
|
|
warn!("thread_created receiver lagged; skipping resync");
|
|
}
|
|
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
|
listen_for_threads = false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if !shutdown_state.forced() {
|
|
processor.drain_background_tasks().await;
|
|
processor.shutdown_threads().await;
|
|
}
|
|
info!("processor task exited (channel closed)");
|
|
}
|
|
});
|
|
|
|
drop(transport_event_tx);
|
|
|
|
let _ = processor_handle.await;
|
|
let _ = outbound_handle.await;
|
|
|
|
transport_shutdown_token.cancel();
|
|
for handle in transport_accept_handles {
|
|
let _ = handle.await;
|
|
}
|
|
|
|
if let Some(otel) = otel {
|
|
otel.shutdown();
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn analytics_rpc_transport(transport: &AppServerTransport) -> AppServerRpcTransport {
|
|
match transport {
|
|
AppServerTransport::Stdio => AppServerRpcTransport::Stdio,
|
|
AppServerTransport::UnixSocket { .. }
|
|
| AppServerTransport::WebSocket { .. }
|
|
| AppServerTransport::Off => AppServerRpcTransport::Websocket,
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::LogFormat;
|
|
use pretty_assertions::assert_eq;
|
|
|
|
#[test]
|
|
fn log_format_from_env_value_matches_json_values_case_insensitively() {
|
|
assert_eq!(LogFormat::from_env_value(Some("json")), LogFormat::Json);
|
|
assert_eq!(LogFormat::from_env_value(Some("JSON")), LogFormat::Json);
|
|
assert_eq!(LogFormat::from_env_value(Some(" Json ")), LogFormat::Json);
|
|
}
|
|
|
|
#[test]
|
|
fn log_format_from_env_value_defaults_for_non_json_values() {
|
|
assert_eq!(
|
|
LogFormat::from_env_value(/*value*/ None),
|
|
LogFormat::Default
|
|
);
|
|
assert_eq!(LogFormat::from_env_value(Some("")), LogFormat::Default);
|
|
assert_eq!(LogFormat::from_env_value(Some("text")), LogFormat::Default);
|
|
assert_eq!(LogFormat::from_env_value(Some("jsonl")), LogFormat::Default);
|
|
}
|
|
}
|