mirror of
https://github.com/openai/codex.git
synced 2026-03-09 08:03:24 +00:00
Compare commits
1 Commits
dev/ws-ter
...
codex/flak
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3efd0dbff8 |
21
codex-rs/Cargo.lock
generated
21
codex-rs/Cargo.lock
generated
@@ -1458,24 +1458,6 @@ dependencies = [
|
||||
"wiremock",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-app-server-client"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"codex-app-server",
|
||||
"codex-app-server-protocol",
|
||||
"codex-arg0",
|
||||
"codex-core",
|
||||
"codex-feedback",
|
||||
"codex-protocol",
|
||||
"pretty_assertions",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"toml 0.9.11+spec-1.1.0",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-app-server-protocol"
|
||||
version = "0.0.0"
|
||||
@@ -1917,13 +1899,10 @@ dependencies = [
|
||||
"anyhow",
|
||||
"assert_cmd",
|
||||
"clap",
|
||||
"codex-app-server-client",
|
||||
"codex-app-server-protocol",
|
||||
"codex-apply-patch",
|
||||
"codex-arg0",
|
||||
"codex-cloud-requirements",
|
||||
"codex-core",
|
||||
"codex-feedback",
|
||||
"codex-otel",
|
||||
"codex-protocol",
|
||||
"codex-utils-absolute-path",
|
||||
|
||||
@@ -4,7 +4,6 @@ members = [
|
||||
"ansi-escape",
|
||||
"async-utils",
|
||||
"app-server",
|
||||
"app-server-client",
|
||||
"app-server-protocol",
|
||||
"app-server-test-client",
|
||||
"debug-client",
|
||||
@@ -87,7 +86,6 @@ codex-api = { path = "codex-api" }
|
||||
codex-artifacts = { path = "artifacts" }
|
||||
codex-package-manager = { path = "package-manager" }
|
||||
codex-app-server = { path = "app-server" }
|
||||
codex-app-server-client = { path = "app-server-client" }
|
||||
codex-app-server-protocol = { path = "app-server-protocol" }
|
||||
codex-app-server-test-client = { path = "app-server-test-client" }
|
||||
codex-apply-patch = { path = "apply-patch" }
|
||||
|
||||
@@ -1,30 +0,0 @@
|
||||
# Manifest for the shared in-process app-server client crate used by
# conversational CLI surfaces (exec, TUI).
[package]
name = "codex-app-server-client"
version.workspace = true
edition.workspace = true
license.workspace = true

[lib]
name = "codex_app_server_client"
path = "src/lib.rs"

[lints]
workspace = true

[dependencies]
codex-app-server = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-arg0 = { workspace = true }
codex-core = { workspace = true }
codex-feedback = { workspace = true }
codex-protocol = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
# Only the runtime primitives the facade needs: channels ("sync"),
# timeouts ("time"), and task spawning ("rt").
tokio = { workspace = true, features = ["sync", "time", "rt"] }
toml = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
pretty_assertions = { workspace = true }
serde_json = { workspace = true }
# Tests use #[tokio::test] on a multi-threaded runtime.
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
@@ -1,67 +0,0 @@
|
||||
# codex-app-server-client
|
||||
|
||||
Shared in-process app-server client used by conversational CLI surfaces:
|
||||
|
||||
- `codex-exec`
|
||||
- `codex-tui`
|
||||
|
||||
## Purpose
|
||||
|
||||
This crate centralizes startup and lifecycle management for an in-process
|
||||
`codex-app-server` runtime, so CLI clients do not need to duplicate:
|
||||
|
||||
- app-server bootstrap and initialize handshake
|
||||
- in-memory request/event transport wiring
|
||||
- lifecycle orchestration around caller-provided startup identity
|
||||
- graceful shutdown behavior
|
||||
|
||||
## Startup identity
|
||||
|
||||
Callers pass both the app-server `SessionSource` and the initialize
|
||||
`client_info.name` explicitly when starting the facade.
|
||||
|
||||
That keeps thread metadata (for example in `thread/list` and `thread/read`)
|
||||
aligned with the originating runtime without baking TUI/exec-specific policy
|
||||
into the shared client layer.
|
||||
|
||||
## Transport model
|
||||
|
||||
The in-process path uses typed channels:
|
||||
|
||||
- client -> server: `ClientRequest` / `ClientNotification`
|
||||
- server -> client: `InProcessServerEvent`
|
||||
- `ServerRequest`
|
||||
- `ServerNotification`
|
||||
- `LegacyNotification`
|
||||
|
||||
JSON serialization is still used at external transport boundaries
|
||||
(stdio/websocket), but the in-process hot path is typed.
|
||||
|
||||
Typed requests still receive app-server responses through the JSON-RPC
|
||||
result envelope internally. That is intentional: the in-process path is
|
||||
meant to preserve app-server semantics while removing the process
|
||||
boundary, not to introduce a second response contract.
|
||||
|
||||
## Bootstrap behavior
|
||||
|
||||
The client facade starts an already-initialized in-process runtime, but
|
||||
thread bootstrap still follows normal app-server flow:
|
||||
|
||||
- caller sends `thread/start` or `thread/resume`
|
||||
- app-server returns the immediate typed response
|
||||
- richer session metadata may arrive later as a `SessionConfigured`
|
||||
legacy event
|
||||
|
||||
Surfaces such as TUI and exec may therefore need a short bootstrap
|
||||
phase where they reconcile startup response data with later events.
|
||||
|
||||
## Backpressure and shutdown
|
||||
|
||||
- Queues are bounded and use `DEFAULT_IN_PROCESS_CHANNEL_CAPACITY` by default.
|
||||
- Full queues return explicit overload behavior instead of unbounded growth.
|
||||
- `shutdown()` performs a bounded graceful shutdown and aborts the worker if
  the timeout is exceeded.
|
||||
|
||||
If the client falls behind on event consumption, the worker emits
|
||||
`InProcessServerEvent::Lagged` and may reject pending server requests so
|
||||
approval flows do not hang indefinitely behind a saturated queue.
|
||||
@@ -1,801 +0,0 @@
|
||||
//! Shared in-process app-server client facade for CLI surfaces.
|
||||
//!
|
||||
//! This crate wraps [`codex_app_server::in_process`] behind a single async API
|
||||
//! used by surfaces like TUI and exec. It centralizes:
|
||||
//!
|
||||
//! - Runtime startup and initialize-capabilities handshake.
|
||||
//! - Typed caller-provided startup identity (`SessionSource` + client name).
|
||||
//! - Typed and raw request/notification dispatch.
|
||||
//! - Server request resolution and rejection.
|
||||
//! - Event consumption with backpressure signaling ([`InProcessServerEvent::Lagged`]).
|
||||
//! - Bounded graceful shutdown with abort fallback.
|
||||
//!
|
||||
//! The facade interposes a worker task between the caller and the underlying
|
||||
//! [`InProcessClientHandle`](codex_app_server::in_process::InProcessClientHandle),
|
||||
//! bridging async `mpsc` channels on both sides. Queues are bounded so overload
|
||||
//! surfaces as channel-full errors rather than unbounded memory growth.
|
||||
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::io::Error as IoError;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Result as IoResult;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
||||
pub use codex_app_server::in_process::InProcessServerEvent;
|
||||
use codex_app_server::in_process::InProcessStartArgs;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::InitializeCapabilities;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result as JsonRpcResult;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::warn;
|
||||
|
||||
/// Maximum time allowed for graceful worker/runtime shutdown before the
/// worker task is aborted (see [`InProcessAppServerClient::shutdown`]).
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

/// Raw app-server request result for typed in-process requests.
///
/// Even on the in-process path, successful responses still travel back through
/// the same JSON-RPC result envelope used by socket/stdio transports because
/// `MessageProcessor` continues to produce that shape internally.
pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>;
|
||||
|
||||
fn event_requires_delivery(event: &InProcessServerEvent) -> bool {
|
||||
// These terminal events drive surface shutdown/completion state. Dropping
|
||||
// them under backpressure can leave exec/TUI waiting forever even though
|
||||
// the underlying turn has already ended.
|
||||
match event {
|
||||
InProcessServerEvent::ServerNotification(
|
||||
codex_app_server_protocol::ServerNotification::TurnCompleted(_),
|
||||
) => true,
|
||||
InProcessServerEvent::LegacyNotification(notification) => matches!(
|
||||
notification
|
||||
.method
|
||||
.strip_prefix("codex/event/")
|
||||
.unwrap_or(¬ification.method),
|
||||
"task_complete" | "turn_aborted" | "shutdown_complete"
|
||||
),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Layered error for [`InProcessAppServerClient::request_typed`].
///
/// This keeps transport failures, server-side JSON-RPC failures, and response
/// decode failures distinct so callers can decide whether to retry, surface a
/// server error, or treat the response as an internal request/response mismatch.
#[derive(Debug)]
pub enum TypedRequestError {
    /// The request never produced a JSON-RPC result: the worker/command
    /// channel failed before or during dispatch.
    Transport {
        method: String,
        source: IoError,
    },
    /// The app-server answered this request with a JSON-RPC error payload.
    Server {
        method: String,
        source: JSONRPCErrorError,
    },
    /// A successful JSON-RPC result arrived but could not be decoded into the
    /// caller's expected response type — a request/response mismatch.
    Deserialize {
        method: String,
        source: serde_json::Error,
    },
}
|
||||
|
||||
impl fmt::Display for TypedRequestError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
Self::Transport { method, source } => {
|
||||
write!(f, "{method} transport error: {source}")
|
||||
}
|
||||
Self::Server { method, source } => {
|
||||
write!(f, "{method} failed: {}", source.message)
|
||||
}
|
||||
Self::Deserialize { method, source } => {
|
||||
write!(f, "{method} response decode error: {source}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for TypedRequestError {
|
||||
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
||||
match self {
|
||||
Self::Transport { source, .. } => Some(source),
|
||||
Self::Server { .. } => None,
|
||||
Self::Deserialize { source, .. } => Some(source),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Caller-supplied startup configuration for [`InProcessAppServerClient::start`].
#[derive(Clone)]
pub struct InProcessClientStartArgs {
    /// Resolved argv0 dispatch paths used by command execution internals.
    pub arg0_paths: Arg0DispatchPaths,
    /// Shared config used to initialize app-server runtime.
    pub config: Arc<Config>,
    /// CLI config overrides that are already parsed into TOML values.
    pub cli_overrides: Vec<(String, TomlValue)>,
    /// Loader override knobs used by config API paths.
    pub loader_overrides: LoaderOverrides,
    /// Preloaded cloud requirements provider.
    pub cloud_requirements: CloudRequirementsLoader,
    /// Feedback sink used by app-server/core telemetry and logs.
    pub feedback: CodexFeedback,
    /// Startup warnings emitted after initialize succeeds.
    pub config_warnings: Vec<ConfigWarningNotification>,
    /// Session source recorded in app-server thread metadata.
    pub session_source: SessionSource,
    /// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
    pub enable_codex_api_key_env: bool,
    /// Client name reported during initialize.
    pub client_name: String,
    /// Client version reported during initialize.
    pub client_version: String,
    /// Whether experimental APIs are requested at initialize time.
    pub experimental_api: bool,
    /// Notification methods this client opts out of receiving.
    pub opt_out_notification_methods: Vec<String>,
    /// Queue capacity for command/event channels (clamped to at least 1).
    pub channel_capacity: usize,
}
|
||||
|
||||
impl InProcessClientStartArgs {
|
||||
/// Builds initialize params from caller-provided metadata.
|
||||
pub fn initialize_params(&self) -> InitializeParams {
|
||||
let capabilities = InitializeCapabilities {
|
||||
experimental_api: self.experimental_api,
|
||||
opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.opt_out_notification_methods.clone())
|
||||
},
|
||||
};
|
||||
|
||||
InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: self.client_name.clone(),
|
||||
title: None,
|
||||
version: self.client_version.clone(),
|
||||
},
|
||||
capabilities: Some(capabilities),
|
||||
}
|
||||
}
|
||||
|
||||
fn into_runtime_start_args(self) -> InProcessStartArgs {
|
||||
let initialize = self.initialize_params();
|
||||
InProcessStartArgs {
|
||||
arg0_paths: self.arg0_paths,
|
||||
config: self.config,
|
||||
cli_overrides: self.cli_overrides,
|
||||
loader_overrides: self.loader_overrides,
|
||||
cloud_requirements: self.cloud_requirements,
|
||||
feedback: self.feedback,
|
||||
config_warnings: self.config_warnings,
|
||||
session_source: self.session_source,
|
||||
enable_codex_api_key_env: self.enable_codex_api_key_env,
|
||||
initialize,
|
||||
channel_capacity: self.channel_capacity,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Internal command sent from public facade methods to the worker task.
///
/// Each variant carries a oneshot sender so the caller can `await` the
/// result without holding a mutable reference to the client.
enum ClientCommand {
    /// Typed client request forwarded to the runtime; answered with the raw
    /// JSON-RPC result envelope.
    Request {
        request: Box<ClientRequest>,
        response_tx: oneshot::Sender<IoResult<RequestResult>>,
    },
    /// Client notification; the ack reports only whether dispatch succeeded.
    Notify {
        notification: ClientNotification,
        response_tx: oneshot::Sender<IoResult<()>>,
    },
    /// Answers a pending server->client request with a successful result.
    ResolveServerRequest {
        request_id: RequestId,
        result: JsonRpcResult,
        response_tx: oneshot::Sender<IoResult<()>>,
    },
    /// Answers a pending server->client request with a JSON-RPC error.
    RejectServerRequest {
        request_id: RequestId,
        error: JSONRPCErrorError,
        response_tx: oneshot::Sender<IoResult<()>>,
    },
    /// Gracefully stops the worker loop and the underlying runtime.
    Shutdown {
        response_tx: oneshot::Sender<IoResult<()>>,
    },
}
|
||||
|
||||
/// Async facade over the in-process app-server runtime.
///
/// This type owns a worker task that bridges between:
/// - caller-facing async `mpsc` channels used by TUI/exec
/// - [`codex_app_server::in_process::InProcessClientHandle`], which speaks to
///   the embedded `MessageProcessor`
///
/// The facade intentionally preserves the server's request/notification/event
/// model instead of exposing direct core runtime handles. That keeps in-process
/// callers aligned with app-server behavior while still avoiding a process
/// boundary.
pub struct InProcessAppServerClient {
    // Commands from the public facade methods to the worker task.
    command_tx: mpsc::Sender<ClientCommand>,
    // Events surfaced to the caller through `next_event`.
    event_rx: mpsc::Receiver<InProcessServerEvent>,
    // Worker join handle; awaited (and aborted on timeout) in `shutdown`.
    worker_handle: tokio::task::JoinHandle<()>,
}
|
||||
|
||||
impl InProcessAppServerClient {
    /// Starts the in-process runtime and facade worker task.
    ///
    /// The returned client is ready for requests and event consumption. If the
    /// internal event queue is saturated later, server requests are rejected
    /// with overload error instead of being silently dropped.
    pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
        // Clamp so the bounded channels always have at least one slot.
        let channel_capacity = args.channel_capacity.max(1);
        let mut handle =
            codex_app_server::in_process::start(args.into_runtime_start_args()).await?;
        let request_sender = handle.sender();
        let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
        let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);

        // Worker: multiplexes facade commands against runtime events. Lag
        // accounting (`skipped_events`) and must-deliver handling below are
        // order-sensitive — lag markers are surfaced before terminal events.
        let worker_handle = tokio::spawn(async move {
            let mut event_stream_enabled = true;
            let mut skipped_events = 0usize;
            loop {
                tokio::select! {
                    command = command_rx.recv() => {
                        match command {
                            Some(ClientCommand::Request { request, response_tx }) => {
                                let request_sender = request_sender.clone();
                                // Request waits happen on a detached task so
                                // this loop can keep draining runtime events
                                // while the request is blocked on client input.
                                tokio::spawn(async move {
                                    let result = request_sender.request(*request).await;
                                    let _ = response_tx.send(result);
                                });
                            }
                            Some(ClientCommand::Notify {
                                notification,
                                response_tx,
                            }) => {
                                let result = request_sender.notify(notification);
                                let _ = response_tx.send(result);
                            }
                            Some(ClientCommand::ResolveServerRequest {
                                request_id,
                                result,
                                response_tx,
                            }) => {
                                let send_result =
                                    request_sender.respond_to_server_request(request_id, result);
                                let _ = response_tx.send(send_result);
                            }
                            Some(ClientCommand::RejectServerRequest {
                                request_id,
                                error,
                                response_tx,
                            }) => {
                                let send_result = request_sender.fail_server_request(request_id, error);
                                let _ = response_tx.send(send_result);
                            }
                            Some(ClientCommand::Shutdown { response_tx }) => {
                                let shutdown_result = handle.shutdown().await;
                                let _ = response_tx.send(shutdown_result);
                                break;
                            }
                            None => {
                                // All facade senders dropped: best-effort
                                // runtime shutdown, then exit the worker.
                                let _ = handle.shutdown().await;
                                break;
                            }
                        }
                    }
                    event = handle.next_event(), if event_stream_enabled => {
                        let Some(event) = event else {
                            break;
                        };

                        if skipped_events > 0 {
                            if event_requires_delivery(&event) {
                                // Surface lag before the terminal event, but
                                // do not let the lag marker itself cause us to
                                // drop the completion/abort notification that
                                // the caller is blocked on.
                                if event_tx
                                    .send(InProcessServerEvent::Lagged {
                                        skipped: skipped_events,
                                    })
                                    .await
                                    .is_err()
                                {
                                    event_stream_enabled = false;
                                    continue;
                                }
                                skipped_events = 0;
                            } else {
                                match event_tx.try_send(InProcessServerEvent::Lagged {
                                    skipped: skipped_events,
                                }) {
                                    Ok(()) => {
                                        skipped_events = 0;
                                    }
                                    Err(mpsc::error::TrySendError::Full(_)) => {
                                        // The dropped event itself counts as skipped.
                                        skipped_events = skipped_events.saturating_add(1);
                                        warn!(
                                            "dropping in-process app-server event because consumer queue is full"
                                        );
                                        // Reject rather than drop server requests so
                                        // approval flows fail fast instead of hanging.
                                        if let InProcessServerEvent::ServerRequest(request) = event {
                                            let _ = request_sender.fail_server_request(
                                                request.id().clone(),
                                                JSONRPCErrorError {
                                                    code: -32001,
                                                    message: "in-process app-server event queue is full".to_string(),
                                                    data: None,
                                                },
                                            );
                                        }
                                        continue;
                                    }
                                    Err(mpsc::error::TrySendError::Closed(_)) => {
                                        event_stream_enabled = false;
                                        continue;
                                    }
                                }
                            }
                        }

                        if event_requires_delivery(&event) {
                            // Block until the consumer catches up for
                            // terminal notifications; this preserves the
                            // completion signal even when the queue is
                            // otherwise saturated.
                            if event_tx.send(event).await.is_err() {
                                event_stream_enabled = false;
                            }
                            continue;
                        }

                        // Non-terminal events are best-effort under backpressure.
                        match event_tx.try_send(event) {
                            Ok(()) => {}
                            Err(mpsc::error::TrySendError::Full(event)) => {
                                skipped_events = skipped_events.saturating_add(1);
                                warn!("dropping in-process app-server event because consumer queue is full");
                                if let InProcessServerEvent::ServerRequest(request) = event {
                                    let _ = request_sender.fail_server_request(
                                        request.id().clone(),
                                        JSONRPCErrorError {
                                            code: -32001,
                                            message: "in-process app-server event queue is full".to_string(),
                                            data: None,
                                        },
                                    );
                                }
                            }
                            Err(mpsc::error::TrySendError::Closed(_)) => {
                                event_stream_enabled = false;
                            }
                        }
                    }
                }
            }
        });

        Ok(Self {
            command_tx,
            event_rx,
            worker_handle,
        })
    }

    /// Sends a typed client request and returns raw JSON-RPC result.
    ///
    /// Callers that expect a concrete response type should usually prefer
    /// [`request_typed`](Self::request_typed).
    pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
        let (response_tx, response_rx) = oneshot::channel();
        self.command_tx
            .send(ClientCommand::Request {
                request: Box::new(request),
                response_tx,
            })
            .await
            .map_err(|_| {
                IoError::new(
                    ErrorKind::BrokenPipe,
                    "in-process app-server worker channel is closed",
                )
            })?;
        response_rx.await.map_err(|_| {
            IoError::new(
                ErrorKind::BrokenPipe,
                "in-process app-server request channel is closed",
            )
        })?
    }

    /// Sends a typed client request and decodes the successful response body.
    ///
    /// This still deserializes from a JSON value produced by app-server's
    /// JSON-RPC result envelope. Because the caller chooses `T`, `Deserialize`
    /// failures indicate an internal request/response mismatch at the call site
    /// (or an in-process bug), not transport skew from an external client.
    pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
    where
        T: DeserializeOwned,
    {
        // Capture the method name up front for error diagnostics.
        let method = request_method_name(&request);
        let response =
            self.request(request)
                .await
                .map_err(|source| TypedRequestError::Transport {
                    method: method.clone(),
                    source,
                })?;
        let result = response.map_err(|source| TypedRequestError::Server {
            method: method.clone(),
            source,
        })?;
        serde_json::from_value(result)
            .map_err(|source| TypedRequestError::Deserialize { method, source })
    }

    /// Sends a typed client notification.
    pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
        let (response_tx, response_rx) = oneshot::channel();
        self.command_tx
            .send(ClientCommand::Notify {
                notification,
                response_tx,
            })
            .await
            .map_err(|_| {
                IoError::new(
                    ErrorKind::BrokenPipe,
                    "in-process app-server worker channel is closed",
                )
            })?;
        response_rx.await.map_err(|_| {
            IoError::new(
                ErrorKind::BrokenPipe,
                "in-process app-server notify channel is closed",
            )
        })?
    }

    /// Resolves a pending server request.
    ///
    /// This should only be called with request IDs obtained from the current
    /// client's event stream.
    pub async fn resolve_server_request(
        &self,
        request_id: RequestId,
        result: JsonRpcResult,
    ) -> IoResult<()> {
        let (response_tx, response_rx) = oneshot::channel();
        self.command_tx
            .send(ClientCommand::ResolveServerRequest {
                request_id,
                result,
                response_tx,
            })
            .await
            .map_err(|_| {
                IoError::new(
                    ErrorKind::BrokenPipe,
                    "in-process app-server worker channel is closed",
                )
            })?;
        response_rx.await.map_err(|_| {
            IoError::new(
                ErrorKind::BrokenPipe,
                "in-process app-server resolve channel is closed",
            )
        })?
    }

    /// Rejects a pending server request with JSON-RPC error payload.
    pub async fn reject_server_request(
        &self,
        request_id: RequestId,
        error: JSONRPCErrorError,
    ) -> IoResult<()> {
        let (response_tx, response_rx) = oneshot::channel();
        self.command_tx
            .send(ClientCommand::RejectServerRequest {
                request_id,
                error,
                response_tx,
            })
            .await
            .map_err(|_| {
                IoError::new(
                    ErrorKind::BrokenPipe,
                    "in-process app-server worker channel is closed",
                )
            })?;
        response_rx.await.map_err(|_| {
            IoError::new(
                ErrorKind::BrokenPipe,
                "in-process app-server reject channel is closed",
            )
        })?
    }

    /// Returns the next in-process event, or `None` when worker exits.
    ///
    /// Callers are expected to drain this stream promptly. If they fall behind,
    /// the worker emits [`InProcessServerEvent::Lagged`] markers and may reject
    /// pending server requests rather than letting approval flows hang.
    pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
        self.event_rx.recv().await
    }

    /// Shuts down worker and in-process runtime with bounded wait.
    ///
    /// If graceful shutdown exceeds timeout, the worker task is aborted to
    /// avoid leaking background tasks in embedding callers.
    pub async fn shutdown(self) -> IoResult<()> {
        let Self {
            command_tx,
            event_rx,
            worker_handle,
        } = self;
        let mut worker_handle = worker_handle;
        // Drop the caller-facing receiver before asking the worker to shut
        // down. That unblocks any pending must-deliver `event_tx.send(..)`
        // so the worker can reach `handle.shutdown()` instead of timing out
        // and getting aborted with the runtime still attached.
        drop(event_rx);
        let (response_tx, response_rx) = oneshot::channel();
        if command_tx
            .send(ClientCommand::Shutdown { response_tx })
            .await
            .is_ok()
            && let Ok(command_result) = timeout(SHUTDOWN_TIMEOUT, response_rx).await
        {
            command_result.map_err(|_| {
                IoError::new(
                    ErrorKind::BrokenPipe,
                    "in-process app-server shutdown channel is closed",
                )
            })??;
        }

        // Bounded wait for worker exit; abort as a last resort so embedding
        // callers do not leak the background task.
        if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await {
            worker_handle.abort();
            let _ = worker_handle.await;
        }
        Ok(())
    }
}
|
||||
|
||||
/// Extracts the JSON-RPC method name for diagnostics without extending the
|
||||
/// protocol crate with in-process-only helpers.
|
||||
fn request_method_name(request: &ClientRequest) -> String {
|
||||
serde_json::to_value(request)
|
||||
.ok()
|
||||
.and_then(|value| {
|
||||
value
|
||||
.get("method")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(ToOwned::to_owned)
|
||||
})
|
||||
.unwrap_or_else(|| "<unknown>".to_string())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_app_server_protocol::ConfigRequirementsReadResponse;
|
||||
use codex_app_server_protocol::SessionSource as ApiSessionSource;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
|
||||
async fn build_test_config() -> Config {
|
||||
match ConfigBuilder::default().build().await {
|
||||
Ok(config) => config,
|
||||
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
|
||||
.expect("default config should load"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_test_client_with_capacity(
|
||||
session_source: SessionSource,
|
||||
channel_capacity: usize,
|
||||
) -> InProcessAppServerClient {
|
||||
InProcessAppServerClient::start(InProcessClientStartArgs {
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config: Arc::new(build_test_config().await),
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
feedback: CodexFeedback::new(),
|
||||
config_warnings: Vec::new(),
|
||||
session_source,
|
||||
enable_codex_api_key_env: false,
|
||||
client_name: "codex-app-server-client-test".to_string(),
|
||||
client_version: "0.0.0-test".to_string(),
|
||||
experimental_api: true,
|
||||
opt_out_notification_methods: Vec::new(),
|
||||
channel_capacity,
|
||||
})
|
||||
.await
|
||||
.expect("in-process app-server client should start")
|
||||
}
|
||||
|
||||
async fn start_test_client(session_source: SessionSource) -> InProcessAppServerClient {
|
||||
start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn typed_request_roundtrip_works() {
|
||||
let client = start_test_client(SessionSource::Exec).await;
|
||||
let _response: ConfigRequirementsReadResponse = client
|
||||
.request_typed(ClientRequest::ConfigRequirementsRead {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: None,
|
||||
})
|
||||
.await
|
||||
.expect("typed request should succeed");
|
||||
client.shutdown().await.expect("shutdown should complete");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn typed_request_reports_json_rpc_errors() {
|
||||
let client = start_test_client(SessionSource::Exec).await;
|
||||
let err = client
|
||||
.request_typed::<ConfigRequirementsReadResponse>(ClientRequest::ThreadRead {
|
||||
request_id: RequestId::Integer(99),
|
||||
params: codex_app_server_protocol::ThreadReadParams {
|
||||
thread_id: "missing-thread".to_string(),
|
||||
include_turns: false,
|
||||
},
|
||||
})
|
||||
.await
|
||||
.expect_err("missing thread should return a JSON-RPC error");
|
||||
assert!(
|
||||
err.to_string().starts_with("thread/read failed:"),
|
||||
"expected method-qualified JSON-RPC failure message"
|
||||
);
|
||||
client.shutdown().await.expect("shutdown should complete");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn caller_provided_session_source_is_applied() {
|
||||
for (session_source, expected_source) in [
|
||||
(SessionSource::Exec, ApiSessionSource::Exec),
|
||||
(SessionSource::Cli, ApiSessionSource::Cli),
|
||||
] {
|
||||
let client = start_test_client(session_source).await;
|
||||
let parsed: ThreadStartResponse = client
|
||||
.request_typed(ClientRequest::ThreadStart {
|
||||
request_id: RequestId::Integer(2),
|
||||
params: ThreadStartParams {
|
||||
ephemeral: Some(true),
|
||||
..ThreadStartParams::default()
|
||||
},
|
||||
})
|
||||
.await
|
||||
.expect("thread/start should succeed");
|
||||
assert_eq!(parsed.thread.source, expected_source);
|
||||
client.shutdown().await.expect("shutdown should complete");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tiny_channel_capacity_still_supports_request_roundtrip() {
|
||||
let client = start_test_client_with_capacity(SessionSource::Exec, 1).await;
|
||||
let _response: ConfigRequirementsReadResponse = client
|
||||
.request_typed(ClientRequest::ConfigRequirementsRead {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: None,
|
||||
})
|
||||
.await
|
||||
.expect("typed request should succeed");
|
||||
client.shutdown().await.expect("shutdown should complete");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn typed_request_error_exposes_sources() {
|
||||
let transport = TypedRequestError::Transport {
|
||||
method: "config/read".to_string(),
|
||||
source: IoError::new(ErrorKind::BrokenPipe, "closed"),
|
||||
};
|
||||
assert_eq!(std::error::Error::source(&transport).is_some(), true);
|
||||
|
||||
let server = TypedRequestError::Server {
|
||||
method: "thread/read".to_string(),
|
||||
source: JSONRPCErrorError {
|
||||
code: -32603,
|
||||
data: None,
|
||||
message: "internal".to_string(),
|
||||
},
|
||||
};
|
||||
assert_eq!(std::error::Error::source(&server).is_some(), false);
|
||||
|
||||
let deserialize = TypedRequestError::Deserialize {
|
||||
method: "thread/start".to_string(),
|
||||
source: serde_json::from_str::<u32>("\"nope\"")
|
||||
.expect_err("invalid integer should return deserialize error"),
|
||||
};
|
||||
assert_eq!(std::error::Error::source(&deserialize).is_some(), true);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn next_event_surfaces_lagged_markers() {
|
||||
let (command_tx, _command_rx) = mpsc::channel(1);
|
||||
let (event_tx, event_rx) = mpsc::channel(1);
|
||||
let worker_handle = tokio::spawn(async {});
|
||||
event_tx
|
||||
.send(InProcessServerEvent::Lagged { skipped: 3 })
|
||||
.await
|
||||
.expect("lagged marker should enqueue");
|
||||
drop(event_tx);
|
||||
|
||||
let mut client = InProcessAppServerClient {
|
||||
command_tx,
|
||||
event_rx,
|
||||
worker_handle,
|
||||
};
|
||||
|
||||
let event = timeout(Duration::from_secs(2), client.next_event())
|
||||
.await
|
||||
.expect("lagged marker should arrive before timeout");
|
||||
assert!(matches!(
|
||||
event,
|
||||
Some(InProcessServerEvent::Lagged { skipped: 3 })
|
||||
));
|
||||
|
||||
client.shutdown().await.expect("shutdown should complete");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_requires_delivery_marks_terminal_events() {
|
||||
assert!(event_requires_delivery(
|
||||
&InProcessServerEvent::ServerNotification(
|
||||
codex_app_server_protocol::ServerNotification::TurnCompleted(
|
||||
codex_app_server_protocol::TurnCompletedNotification {
|
||||
thread_id: "thread".to_string(),
|
||||
turn: codex_app_server_protocol::Turn {
|
||||
id: "turn".to_string(),
|
||||
items: Vec::new(),
|
||||
status: codex_app_server_protocol::TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
}
|
||||
)
|
||||
)
|
||||
));
|
||||
assert!(event_requires_delivery(
|
||||
&InProcessServerEvent::LegacyNotification(
|
||||
codex_app_server_protocol::JSONRPCNotification {
|
||||
method: "codex/event/turn_aborted".to_string(),
|
||||
params: None,
|
||||
}
|
||||
)
|
||||
));
|
||||
assert!(!event_requires_delivery(&InProcessServerEvent::Lagged {
|
||||
skipped: 1
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -350,10 +350,6 @@
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"reloadUserConfig": {
|
||||
"description": "When true, hot-reload the updated user config into all loaded threads after writing.",
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
|
||||
@@ -9943,10 +9943,6 @@
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"reloadUserConfig": {
|
||||
"description": "When true, hot-reload the updated user config into all loaded threads after writing.",
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
|
||||
@@ -2962,10 +2962,6 @@
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"reloadUserConfig": {
|
||||
"description": "When true, hot-reload the updated user config into all loaded threads after writing.",
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
|
||||
@@ -45,10 +45,6 @@
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
},
|
||||
"reloadUserConfig": {
|
||||
"description": "When true, hot-reload the updated user config into all loaded threads after writing.",
|
||||
"type": "boolean"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
|
||||
@@ -7,8 +7,4 @@ export type ConfigBatchWriteParams = { edits: Array<ConfigEdit>,
|
||||
/**
|
||||
* Path to the config file to write; defaults to the user's `config.toml` when omitted.
|
||||
*/
|
||||
filePath?: string | null, expectedVersion?: string | null,
|
||||
/**
|
||||
* When true, hot-reload the updated user config into all loaded threads after writing.
|
||||
*/
|
||||
reloadUserConfig?: boolean, };
|
||||
filePath?: string | null, expectedVersion?: string | null, };
|
||||
|
||||
@@ -110,26 +110,6 @@ macro_rules! client_request_definitions {
|
||||
)*
|
||||
}
|
||||
|
||||
impl ClientRequest {
|
||||
pub fn id(&self) -> &RequestId {
|
||||
match self {
|
||||
$(Self::$variant { request_id, .. } => request_id,)*
|
||||
}
|
||||
}
|
||||
|
||||
pub fn method(&self) -> String {
|
||||
serde_json::to_value(self)
|
||||
.ok()
|
||||
.and_then(|value| {
|
||||
value
|
||||
.get("method")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::to_owned)
|
||||
})
|
||||
.unwrap_or_else(|| "<unknown>".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl crate::experimental_api::ExperimentalApi for ClientRequest {
|
||||
fn experimental_reason(&self) -> Option<&'static str> {
|
||||
match self {
|
||||
@@ -1156,8 +1136,6 @@ mod tests {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: None,
|
||||
};
|
||||
assert_eq!(request.id(), &RequestId::Integer(1));
|
||||
assert_eq!(request.method(), "account/rateLimits/read");
|
||||
assert_eq!(
|
||||
json!({
|
||||
"method": "account/rateLimits/read",
|
||||
|
||||
@@ -734,9 +734,6 @@ pub struct ConfigBatchWriteParams {
|
||||
pub file_path: Option<String>,
|
||||
#[ts(optional = nullable)]
|
||||
pub expected_version: Option<String>,
|
||||
/// When true, hot-reload the updated user config into all loaded threads after writing.
|
||||
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
|
||||
pub reload_user_config: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
|
||||
@@ -169,7 +169,7 @@ Example with notification opt-out:
|
||||
- `externalAgentConfig/detect` — detect migratable external-agent artifacts with `includeHome` and optional `cwds`; each detected item includes `cwd` (`null` for home).
|
||||
- `externalAgentConfig/import` — apply selected external-agent migration items by passing explicit `migrationItems` with `cwd` (`null` for home).
|
||||
- `config/value/write` — write a single config key/value to the user's config.toml on disk.
|
||||
- `config/batchWrite` — apply multiple config edits atomically to the user's config.toml on disk, with optional `reloadUserConfig: true` to hot-reload loaded threads.
|
||||
- `config/batchWrite` — apply multiple config edits atomically to the user's config.toml on disk.
|
||||
- `configRequirements/read` — fetch loaded requirements constraints from `requirements.toml` and/or MDM (or `null` if none are configured), including allow-lists (`allowedApprovalPolicies`, `allowedSandboxModes`, `allowedWebSearchModes`), pinned feature values (`featureRequirements`), `enforceResidency`, and `network` constraints.
|
||||
|
||||
### Example: Start or resume a thread
|
||||
|
||||
@@ -1,16 +1,6 @@
|
||||
//! Tracing helpers shared by socket and in-process app-server entry points.
|
||||
//!
|
||||
//! The in-process path intentionally reuses the same span shape as JSON-RPC
|
||||
//! transports so request telemetry stays comparable across stdio, websocket,
|
||||
//! and embedded callers. [`typed_request_span`] is the in-process counterpart
|
||||
//! of [`request_span`] and stamps `rpc.transport` as `"in-process"` while
|
||||
//! deriving client identity from the typed [`ClientRequest`] rather than
|
||||
//! from a parsed JSON envelope.
|
||||
|
||||
use crate::message_processor::ConnectionSessionState;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::transport::AppServerTransport;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCRequest;
|
||||
use codex_otel::set_parent_from_context;
|
||||
@@ -75,51 +65,6 @@ pub(crate) fn request_span(
|
||||
span
|
||||
}
|
||||
|
||||
/// Builds tracing span metadata for typed in-process requests.
|
||||
///
|
||||
/// This mirrors `request_span` semantics while stamping transport as
|
||||
/// `in-process` and deriving client info either from initialize params or
|
||||
/// from existing connection session state.
|
||||
pub(crate) fn typed_request_span(
|
||||
request: &ClientRequest,
|
||||
connection_id: ConnectionId,
|
||||
session: &ConnectionSessionState,
|
||||
) -> Span {
|
||||
let method = request.method();
|
||||
let span = info_span!(
|
||||
"app_server.request",
|
||||
otel.kind = "server",
|
||||
otel.name = method,
|
||||
rpc.system = "jsonrpc",
|
||||
rpc.method = method,
|
||||
rpc.transport = "in-process",
|
||||
rpc.request_id = ?request.id(),
|
||||
app_server.connection_id = ?connection_id,
|
||||
app_server.api_version = "v2",
|
||||
app_server.client_name = field::Empty,
|
||||
app_server.client_version = field::Empty,
|
||||
);
|
||||
|
||||
if let Some((client_name, client_version)) = initialize_client_info_from_typed_request(request)
|
||||
{
|
||||
span.record("app_server.client_name", client_name);
|
||||
span.record("app_server.client_version", client_version);
|
||||
} else {
|
||||
if let Some(client_name) = session.app_server_client_name.as_deref() {
|
||||
span.record("app_server.client_name", client_name);
|
||||
}
|
||||
if let Some(client_version) = session.client_version.as_deref() {
|
||||
span.record("app_server.client_version", client_version);
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(context) = traceparent_context_from_env() {
|
||||
set_parent_from_context(&span, context);
|
||||
}
|
||||
|
||||
span
|
||||
}
|
||||
|
||||
fn transport_name(transport: AppServerTransport) -> &'static str {
|
||||
match transport {
|
||||
AppServerTransport::Stdio => "stdio",
|
||||
@@ -154,13 +99,3 @@ fn initialize_client_info(request: &JSONRPCRequest) -> Option<InitializeParams>
|
||||
let params = request.params.clone()?;
|
||||
serde_json::from_value(params).ok()
|
||||
}
|
||||
|
||||
fn initialize_client_info_from_typed_request(request: &ClientRequest) -> Option<(&str, &str)> {
|
||||
match request {
|
||||
ClientRequest::Initialize { params, .. } => Some((
|
||||
params.client_info.name.as_str(),
|
||||
params.client_info.version.as_str(),
|
||||
)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use async_trait::async_trait;
|
||||
use codex_app_server_protocol::ConfigBatchWriteParams;
|
||||
use codex_app_server_protocol::ConfigReadParams;
|
||||
use codex_app_server_protocol::ConfigReadResponse;
|
||||
@@ -12,7 +11,6 @@ use codex_app_server_protocol::ConfigWriteResponse;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::NetworkRequirements;
|
||||
use codex_app_server_protocol::SandboxMode;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::ConfigService;
|
||||
use codex_core::config::ConfigServiceError;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
@@ -21,33 +19,11 @@ use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::config_loader::ResidencyRequirement as CoreResidencyRequirement;
|
||||
use codex_core::config_loader::SandboxModeRequirement as CoreSandboxModeRequirement;
|
||||
use codex_protocol::config_types::WebSearchMode;
|
||||
use codex_protocol::protocol::Op;
|
||||
use serde_json::json;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::warn;
|
||||
|
||||
#[async_trait]
|
||||
pub(crate) trait UserConfigReloader: Send + Sync {
|
||||
async fn reload_user_config(&self);
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl UserConfigReloader for ThreadManager {
|
||||
async fn reload_user_config(&self) {
|
||||
let thread_ids = self.list_thread_ids().await;
|
||||
for thread_id in thread_ids {
|
||||
let Ok(thread) = self.get_thread(thread_id).await else {
|
||||
continue;
|
||||
};
|
||||
if let Err(err) = thread.submit(Op::ReloadUserConfig).await {
|
||||
warn!("failed to request user config reload: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ConfigApi {
|
||||
@@ -55,7 +31,6 @@ pub(crate) struct ConfigApi {
|
||||
cli_overrides: Vec<(String, TomlValue)>,
|
||||
loader_overrides: LoaderOverrides,
|
||||
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
user_config_reloader: Arc<dyn UserConfigReloader>,
|
||||
}
|
||||
|
||||
impl ConfigApi {
|
||||
@@ -64,14 +39,12 @@ impl ConfigApi {
|
||||
cli_overrides: Vec<(String, TomlValue)>,
|
||||
loader_overrides: LoaderOverrides,
|
||||
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
user_config_reloader: Arc<dyn UserConfigReloader>,
|
||||
) -> Self {
|
||||
Self {
|
||||
codex_home,
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
user_config_reloader,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -123,16 +96,10 @@ impl ConfigApi {
|
||||
&self,
|
||||
params: ConfigBatchWriteParams,
|
||||
) -> Result<ConfigWriteResponse, JSONRPCErrorError> {
|
||||
let reload_user_config = params.reload_user_config;
|
||||
let response = self
|
||||
.config_service()
|
||||
self.config_service()
|
||||
.batch_write(params)
|
||||
.await
|
||||
.map_err(map_error)?;
|
||||
if reload_user_config {
|
||||
self.user_config_reloader.reload_user_config().await;
|
||||
}
|
||||
Ok(response)
|
||||
.map_err(map_error)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -232,22 +199,6 @@ mod tests {
|
||||
use codex_core::config_loader::NetworkRequirementsToml as CoreNetworkRequirementsToml;
|
||||
use codex_protocol::protocol::AskForApproval as CoreAskForApproval;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[derive(Default)]
|
||||
struct RecordingUserConfigReloader {
|
||||
call_count: AtomicUsize,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl UserConfigReloader for RecordingUserConfigReloader {
|
||||
async fn reload_user_config(&self) {
|
||||
self.call_count.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn map_requirements_toml_to_api_converts_core_enums() {
|
||||
@@ -352,51 +303,4 @@ mod tests {
|
||||
Some(vec![WebSearchMode::Disabled])
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn batch_write_reloads_user_config_when_requested() {
|
||||
let codex_home = TempDir::new().expect("create temp dir");
|
||||
let user_config_path = codex_home.path().join("config.toml");
|
||||
std::fs::write(&user_config_path, "").expect("write config");
|
||||
let reloader = Arc::new(RecordingUserConfigReloader::default());
|
||||
let config_api = ConfigApi::new(
|
||||
codex_home.path().to_path_buf(),
|
||||
Vec::new(),
|
||||
LoaderOverrides::default(),
|
||||
Arc::new(RwLock::new(CloudRequirementsLoader::default())),
|
||||
reloader.clone(),
|
||||
);
|
||||
|
||||
let response = config_api
|
||||
.batch_write(ConfigBatchWriteParams {
|
||||
edits: vec![codex_app_server_protocol::ConfigEdit {
|
||||
key_path: "model".to_string(),
|
||||
value: json!("gpt-5"),
|
||||
merge_strategy: codex_app_server_protocol::MergeStrategy::Replace,
|
||||
}],
|
||||
file_path: Some(user_config_path.display().to_string()),
|
||||
expected_version: None,
|
||||
reload_user_config: true,
|
||||
})
|
||||
.await
|
||||
.expect("batch write should succeed");
|
||||
|
||||
assert_eq!(
|
||||
response,
|
||||
ConfigWriteResponse {
|
||||
status: codex_app_server_protocol::WriteStatus::Ok,
|
||||
version: response.version.clone(),
|
||||
file_path: codex_utils_absolute_path::AbsolutePathBuf::try_from(
|
||||
user_config_path.clone()
|
||||
)
|
||||
.expect("absolute config path"),
|
||||
overridden_metadata: None,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
std::fs::read_to_string(user_config_path).unwrap(),
|
||||
"model = \"gpt-5\"\n"
|
||||
);
|
||||
assert_eq!(reloader.call_count.load(Ordering::Relaxed), 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,884 +0,0 @@
|
||||
//! In-process app-server runtime host for local embedders.
|
||||
//!
|
||||
//! This module runs the existing [`MessageProcessor`] and outbound routing logic
|
||||
//! on Tokio tasks, but replaces socket/stdio transports with bounded in-memory
|
||||
//! channels. The intent is to preserve app-server semantics while avoiding a
|
||||
//! process boundary for CLI surfaces that run in the same process.
|
||||
//!
|
||||
//! # Lifecycle
|
||||
//!
|
||||
//! 1. Construct runtime state with [`InProcessStartArgs`].
|
||||
//! 2. Call [`start`], which performs the `initialize` / `initialized` handshake
|
||||
//! internally and returns a ready-to-use [`InProcessClientHandle`].
|
||||
//! 3. Send requests via [`InProcessClientHandle::request`], notifications via
|
||||
//! [`InProcessClientHandle::notify`], and consume events via
|
||||
//! [`InProcessClientHandle::next_event`].
|
||||
//! 4. Terminate with [`InProcessClientHandle::shutdown`].
|
||||
//!
|
||||
//! # Transport model
|
||||
//!
|
||||
//! The runtime is transport-local but not protocol-free. Incoming requests are
|
||||
//! typed [`ClientRequest`] values, yet responses still come back through the
|
||||
//! same JSON-RPC result envelope that `MessageProcessor` uses for stdio and
|
||||
//! websocket transports. This keeps in-process behavior aligned with
|
||||
//! app-server rather than creating a second execution contract.
|
||||
//!
|
||||
//! # Backpressure
|
||||
//!
|
||||
//! Command submission uses `try_send` and can return `WouldBlock`, while event
|
||||
//! fanout may drop notifications under saturation. Server requests are never
|
||||
//! silently abandoned: if they cannot be queued they are failed back into
|
||||
//! `MessageProcessor` with overload or internal errors so approval flows do
|
||||
//! not hang indefinitely.
|
||||
//!
|
||||
//! # Relationship to `codex-app-server-client`
|
||||
//!
|
||||
//! This module provides the low-level runtime handle ([`InProcessClientHandle`]).
|
||||
//! Higher-level callers (TUI, exec) should go through `codex-app-server-client`,
|
||||
//! which wraps this module behind a worker task with async request/response
|
||||
//! helpers, surface-specific startup policy, and bounded shutdown.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::io::Error as IoError;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Result as IoResult;
|
||||
use std::sync::Arc;
|
||||
use std::sync::RwLock;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::error_code::OVERLOADED_ERROR_CODE;
|
||||
use crate::message_processor::ConnectionSessionState;
|
||||
use crate::message_processor::MessageProcessor;
|
||||
use crate::message_processor::MessageProcessorArgs;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::OutgoingEnvelope;
|
||||
use crate::outgoing_message::OutgoingMessage;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::transport::CHANNEL_CAPACITY;
|
||||
use crate::transport::OutboundConnectionState;
|
||||
use crate::transport::route_outgoing_envelope;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::InitializeParams;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::Result;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::timeout;
|
||||
use toml::Value as TomlValue;
|
||||
use tracing::warn;
|
||||
|
||||
const IN_PROCESS_CONNECTION_ID: ConnectionId = ConnectionId(0);
|
||||
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
/// Default bounded channel capacity for in-process runtime queues.
|
||||
pub const DEFAULT_IN_PROCESS_CHANNEL_CAPACITY: usize = CHANNEL_CAPACITY;
|
||||
|
||||
type PendingClientRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
|
||||
|
||||
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
|
||||
matches!(notification, ServerNotification::TurnCompleted(_))
|
||||
}
|
||||
|
||||
fn legacy_notification_requires_delivery(notification: &JSONRPCNotification) -> bool {
|
||||
matches!(
|
||||
notification
|
||||
.method
|
||||
.strip_prefix("codex/event/")
|
||||
.unwrap_or(¬ification.method),
|
||||
"task_complete" | "turn_aborted" | "shutdown_complete"
|
||||
)
|
||||
}
|
||||
|
||||
/// Input needed to start an in-process app-server runtime.
|
||||
///
|
||||
/// These fields mirror the pieces of ambient process state that stdio and
|
||||
/// websocket transports normally assemble before `MessageProcessor` starts.
|
||||
#[derive(Clone)]
|
||||
pub struct InProcessStartArgs {
|
||||
/// Resolved argv0 dispatch paths used by command execution internals.
|
||||
pub arg0_paths: Arg0DispatchPaths,
|
||||
/// Shared base config used to initialize core components.
|
||||
pub config: Arc<Config>,
|
||||
/// CLI config overrides that are already parsed into TOML values.
|
||||
pub cli_overrides: Vec<(String, TomlValue)>,
|
||||
/// Loader override knobs used by config API paths.
|
||||
pub loader_overrides: LoaderOverrides,
|
||||
/// Preloaded cloud requirements provider.
|
||||
pub cloud_requirements: CloudRequirementsLoader,
|
||||
/// Feedback sink used by app-server/core telemetry and logs.
|
||||
pub feedback: CodexFeedback,
|
||||
/// Startup warnings emitted after initialize succeeds.
|
||||
pub config_warnings: Vec<ConfigWarningNotification>,
|
||||
/// Session source stamped into thread/session metadata.
|
||||
pub session_source: SessionSource,
|
||||
/// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
|
||||
pub enable_codex_api_key_env: bool,
|
||||
/// Initialize params used for initial handshake.
|
||||
pub initialize: InitializeParams,
|
||||
/// Capacity used for all runtime queues (clamped to at least 1).
|
||||
pub channel_capacity: usize,
|
||||
}
|
||||
|
||||
/// Event emitted from the app-server to the in-process client.
|
||||
///
|
||||
/// The stream carries three event families because CLI surfaces are mid-migration
|
||||
/// from the legacy `codex_protocol::Event` model to the typed app-server
|
||||
/// notification model. Once all surfaces consume only [`ServerNotification`],
|
||||
/// [`LegacyNotification`](Self::LegacyNotification) can be removed.
|
||||
///
|
||||
/// [`Lagged`](Self::Lagged) is a transport health marker, not an application
|
||||
/// event — it signals that the consumer fell behind and some events were dropped.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum InProcessServerEvent {
|
||||
/// Server request that requires client response/rejection.
|
||||
ServerRequest(ServerRequest),
|
||||
/// App-server notification directed to the embedded client.
|
||||
ServerNotification(ServerNotification),
|
||||
/// Legacy JSON-RPC notification from core event bridge.
|
||||
LegacyNotification(JSONRPCNotification),
|
||||
/// Indicates one or more events were dropped due to backpressure.
|
||||
Lagged { skipped: usize },
|
||||
}
|
||||
|
||||
/// Internal message sent from [`InProcessClientHandle`] methods to the runtime task.
|
||||
///
|
||||
/// Requests carry a oneshot sender for the response; notifications and server-request
|
||||
/// replies are fire-and-forget from the caller's perspective (transport errors are
|
||||
/// caught by `try_send` on the outer channel).
|
||||
enum InProcessClientMessage {
|
||||
Request {
|
||||
request: Box<ClientRequest>,
|
||||
response_tx: oneshot::Sender<PendingClientRequestResponse>,
|
||||
},
|
||||
Notification {
|
||||
notification: ClientNotification,
|
||||
},
|
||||
ServerRequestResponse {
|
||||
request_id: RequestId,
|
||||
result: Result,
|
||||
},
|
||||
ServerRequestError {
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
},
|
||||
Shutdown {
|
||||
done_tx: oneshot::Sender<()>,
|
||||
},
|
||||
}
|
||||
|
||||
enum ProcessorCommand {
|
||||
Request(Box<ClientRequest>),
|
||||
Notification(ClientNotification),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct InProcessClientSender {
|
||||
client_tx: mpsc::Sender<InProcessClientMessage>,
|
||||
}
|
||||
|
||||
impl InProcessClientSender {
|
||||
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
self.try_send_client_message(InProcessClientMessage::Request {
|
||||
request: Box::new(request),
|
||||
response_tx,
|
||||
})?;
|
||||
response_rx.await.map_err(|err| {
|
||||
IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
format!("in-process request response channel closed: {err}"),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
||||
self.try_send_client_message(InProcessClientMessage::Notification { notification })
|
||||
}
|
||||
|
||||
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
|
||||
self.try_send_client_message(InProcessClientMessage::ServerRequestResponse {
|
||||
request_id,
|
||||
result,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn fail_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
) -> IoResult<()> {
|
||||
self.try_send_client_message(InProcessClientMessage::ServerRequestError {
|
||||
request_id,
|
||||
error,
|
||||
})
|
||||
}
|
||||
|
||||
fn try_send_client_message(&self, message: InProcessClientMessage) -> IoResult<()> {
|
||||
match self.client_tx.try_send(message) {
|
||||
Ok(()) => Ok(()),
|
||||
Err(mpsc::error::TrySendError::Full(_)) => Err(IoError::new(
|
||||
ErrorKind::WouldBlock,
|
||||
"in-process app-server client queue is full",
|
||||
)),
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new(
|
||||
ErrorKind::BrokenPipe,
|
||||
"in-process app-server runtime is closed",
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle used by an in-process client to call app-server and consume events.
|
||||
///
|
||||
/// This is the low-level runtime handle. Higher-level callers should usually go
|
||||
/// through `codex-app-server-client`, which adds worker-task buffering,
|
||||
/// request/response helpers, and surface-specific startup policy.
|
||||
pub struct InProcessClientHandle {
|
||||
client: InProcessClientSender,
|
||||
event_rx: mpsc::Receiver<InProcessServerEvent>,
|
||||
runtime_handle: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl InProcessClientHandle {
|
||||
/// Sends a typed client request into the in-process runtime.
|
||||
///
|
||||
/// The returned value is a transport-level `IoResult` containing either a
|
||||
/// JSON-RPC success payload or JSON-RPC error payload. Callers must keep
|
||||
/// request IDs unique among concurrent requests; reusing an in-flight ID
|
||||
/// produces an `INVALID_REQUEST` response and can make request routing
|
||||
/// ambiguous in the caller.
|
||||
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
|
||||
self.client.request(request).await
|
||||
}
|
||||
|
||||
/// Sends a typed client notification into the in-process runtime.
|
||||
///
|
||||
/// Notifications do not have an application-level response. Transport
|
||||
/// errors indicate queue saturation or closed runtime.
|
||||
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
||||
self.client.notify(notification)
|
||||
}
|
||||
|
||||
/// Resolves a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
|
||||
///
|
||||
/// This should be used only with request IDs received from the current
|
||||
/// runtime event stream; sending arbitrary IDs has no effect on app-server
|
||||
/// state and can mask a stuck approval flow in the caller.
|
||||
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
|
||||
self.client.respond_to_server_request(request_id, result)
|
||||
}
|
||||
|
||||
/// Rejects a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
|
||||
///
|
||||
/// Use this when the embedder cannot satisfy a server request; leaving
|
||||
/// requests unanswered can stall turn progress.
|
||||
pub fn fail_server_request(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
error: JSONRPCErrorError,
|
||||
) -> IoResult<()> {
|
||||
self.client.fail_server_request(request_id, error)
|
||||
}
|
||||
|
||||
/// Receives the next server event from the in-process runtime.
|
||||
///
|
||||
/// Returns `None` when the runtime task exits and no more events are
|
||||
/// available.
|
||||
pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
|
||||
self.event_rx.recv().await
|
||||
}
|
||||
|
||||
/// Requests runtime shutdown and waits for worker termination.
|
||||
///
|
||||
/// Shutdown is bounded by internal timeouts and may abort background tasks
|
||||
/// if graceful drain does not complete in time.
|
||||
pub async fn shutdown(self) -> IoResult<()> {
|
||||
let mut runtime_handle = self.runtime_handle;
|
||||
let (done_tx, done_rx) = oneshot::channel();
|
||||
|
||||
if self
|
||||
.client
|
||||
.client_tx
|
||||
.send(InProcessClientMessage::Shutdown { done_tx })
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
let _ = timeout(SHUTDOWN_TIMEOUT, done_rx).await;
|
||||
}
|
||||
|
||||
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut runtime_handle).await {
|
||||
runtime_handle.abort();
|
||||
let _ = runtime_handle.await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn sender(&self) -> InProcessClientSender {
|
||||
self.client.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Starts an in-process app-server runtime and performs initialize handshake.
|
||||
///
|
||||
/// This function sends `initialize` followed by `initialized` before returning
|
||||
/// the handle, so callers receive a ready-to-use runtime. If initialize fails,
|
||||
/// the runtime is shut down and an `InvalidData` error is returned.
|
||||
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
|
||||
let initialize = args.initialize.clone();
|
||||
let client = start_uninitialized(args);
|
||||
|
||||
let initialize_response = client
|
||||
.request(ClientRequest::Initialize {
|
||||
request_id: RequestId::Integer(0),
|
||||
params: initialize,
|
||||
})
|
||||
.await?;
|
||||
if let Err(error) = initialize_response {
|
||||
let _ = client.shutdown().await;
|
||||
return Err(IoError::new(
|
||||
ErrorKind::InvalidData,
|
||||
format!("in-process initialize failed: {}", error.message),
|
||||
));
|
||||
}
|
||||
client.notify(ClientNotification::Initialized)?;
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
let channel_capacity = args.channel_capacity.max(1);
|
||||
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
|
||||
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
|
||||
|
||||
let runtime_handle = tokio::spawn(async move {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
|
||||
let (writer_tx, mut writer_rx) = mpsc::channel::<OutgoingMessage>(channel_capacity);
|
||||
let outbound_initialized = Arc::new(AtomicBool::new(false));
|
||||
let outbound_experimental_api_enabled = Arc::new(AtomicBool::new(false));
|
||||
let outbound_opted_out_notification_methods = Arc::new(RwLock::new(HashSet::new()));
|
||||
|
||||
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
|
||||
outbound_connections.insert(
|
||||
IN_PROCESS_CONNECTION_ID,
|
||||
OutboundConnectionState::new(
|
||||
writer_tx,
|
||||
Arc::clone(&outbound_initialized),
|
||||
Arc::clone(&outbound_experimental_api_enabled),
|
||||
Arc::clone(&outbound_opted_out_notification_methods),
|
||||
None,
|
||||
),
|
||||
);
|
||||
let mut outbound_handle = tokio::spawn(async move {
|
||||
while let Some(envelope) = outgoing_rx.recv().await {
|
||||
route_outgoing_envelope(&mut outbound_connections, envelope).await;
|
||||
}
|
||||
});
|
||||
|
||||
let processor_outgoing = Arc::clone(&outgoing_message_sender);
|
||||
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
|
||||
let mut processor_handle = tokio::spawn(async move {
|
||||
let mut processor = MessageProcessor::new(MessageProcessorArgs {
|
||||
outgoing: Arc::clone(&processor_outgoing),
|
||||
arg0_paths: args.arg0_paths,
|
||||
config: args.config,
|
||||
cli_overrides: args.cli_overrides,
|
||||
loader_overrides: args.loader_overrides,
|
||||
cloud_requirements: args.cloud_requirements,
|
||||
feedback: args.feedback,
|
||||
log_db: None,
|
||||
config_warnings: args.config_warnings,
|
||||
session_source: args.session_source,
|
||||
enable_codex_api_key_env: args.enable_codex_api_key_env,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let mut session = ConnectionSessionState::default();
|
||||
let mut listen_for_threads = true;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
command = processor_rx.recv() => {
|
||||
match command {
|
||||
Some(ProcessorCommand::Request(request)) => {
|
||||
let was_initialized = session.initialized;
|
||||
processor
|
||||
.process_client_request(
|
||||
IN_PROCESS_CONNECTION_ID,
|
||||
*request,
|
||||
&mut session,
|
||||
&outbound_initialized,
|
||||
)
|
||||
.await;
|
||||
if let Ok(mut opted_out_notification_methods) =
|
||||
outbound_opted_out_notification_methods.write()
|
||||
{
|
||||
*opted_out_notification_methods =
|
||||
session.opted_out_notification_methods.clone();
|
||||
} else {
|
||||
warn!("failed to update outbound opted-out notifications");
|
||||
}
|
||||
outbound_experimental_api_enabled.store(
|
||||
session.experimental_api_enabled,
|
||||
Ordering::Release,
|
||||
);
|
||||
if !was_initialized && session.initialized {
|
||||
processor.send_initialize_notifications().await;
|
||||
}
|
||||
}
|
||||
Some(ProcessorCommand::Notification(notification)) => {
|
||||
processor.process_client_notification(notification).await;
|
||||
}
|
||||
None => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
created = thread_created_rx.recv(), if listen_for_threads => {
|
||||
match created {
|
||||
Ok(thread_id) => {
|
||||
let connection_ids = if session.initialized {
|
||||
vec![IN_PROCESS_CONNECTION_ID]
|
||||
} else {
|
||||
Vec::<ConnectionId>::new()
|
||||
};
|
||||
processor
|
||||
.try_attach_thread_listener(thread_id, connection_ids)
|
||||
.await;
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
|
||||
warn!("thread_created receiver lagged; skipping resync");
|
||||
}
|
||||
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
|
||||
listen_for_threads = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
processor.connection_closed(IN_PROCESS_CONNECTION_ID).await;
|
||||
});
|
||||
let mut pending_request_responses =
|
||||
HashMap::<RequestId, oneshot::Sender<PendingClientRequestResponse>>::new();
|
||||
let mut shutdown_ack = None;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
message = client_rx.recv() => {
|
||||
match message {
|
||||
Some(InProcessClientMessage::Request { request, response_tx }) => {
|
||||
let request = *request;
|
||||
let request_id = request.id().clone();
|
||||
match pending_request_responses.entry(request_id.clone()) {
|
||||
Entry::Vacant(entry) => {
|
||||
entry.insert(response_tx);
|
||||
}
|
||||
Entry::Occupied(_) => {
|
||||
let _ = response_tx.send(Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("duplicate request id: {request_id:?}"),
|
||||
data: None,
|
||||
}));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
match processor_tx.try_send(ProcessorCommand::Request(Box::new(request))) {
|
||||
Ok(()) => {}
|
||||
Err(mpsc::error::TrySendError::Full(_)) => {
|
||||
if let Some(response_tx) =
|
||||
pending_request_responses.remove(&request_id)
|
||||
{
|
||||
let _ = response_tx.send(Err(JSONRPCErrorError {
|
||||
code: OVERLOADED_ERROR_CODE,
|
||||
message: "in-process app-server request queue is full"
|
||||
.to_string(),
|
||||
data: None,
|
||||
}));
|
||||
}
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => {
|
||||
if let Some(response_tx) =
|
||||
pending_request_responses.remove(&request_id)
|
||||
{
|
||||
let _ = response_tx.send(Err(JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message:
|
||||
"in-process app-server request processor is closed"
|
||||
.to_string(),
|
||||
data: None,
|
||||
}));
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(InProcessClientMessage::Notification { notification }) => {
|
||||
match processor_tx.try_send(ProcessorCommand::Notification(notification)) {
|
||||
Ok(()) => {}
|
||||
Err(mpsc::error::TrySendError::Full(_)) => {
|
||||
warn!("dropping in-process client notification (queue full)");
|
||||
}
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => {
|
||||
outgoing_message_sender
|
||||
.notify_client_response(request_id, result)
|
||||
.await;
|
||||
}
|
||||
Some(InProcessClientMessage::ServerRequestError { request_id, error }) => {
|
||||
outgoing_message_sender
|
||||
.notify_client_error(request_id, error)
|
||||
.await;
|
||||
}
|
||||
Some(InProcessClientMessage::Shutdown { done_tx }) => {
|
||||
shutdown_ack = Some(done_tx);
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
outgoing_message = writer_rx.recv() => {
|
||||
let Some(outgoing_message) = outgoing_message else {
|
||||
break;
|
||||
};
|
||||
match outgoing_message {
|
||||
OutgoingMessage::Response(response) => {
|
||||
if let Some(response_tx) = pending_request_responses.remove(&response.id) {
|
||||
let _ = response_tx.send(Ok(response.result));
|
||||
} else {
|
||||
warn!(
|
||||
request_id = ?response.id,
|
||||
"dropping unmatched in-process response"
|
||||
);
|
||||
}
|
||||
}
|
||||
OutgoingMessage::Error(error) => {
|
||||
if let Some(response_tx) = pending_request_responses.remove(&error.id) {
|
||||
let _ = response_tx.send(Err(error.error));
|
||||
} else {
|
||||
warn!(
|
||||
request_id = ?error.id,
|
||||
"dropping unmatched in-process error response"
|
||||
);
|
||||
}
|
||||
}
|
||||
OutgoingMessage::Request(request) => {
|
||||
// Send directly to avoid cloning; on failure the
|
||||
// original value is returned inside the error.
|
||||
if let Err(send_error) = event_tx
|
||||
.try_send(InProcessServerEvent::ServerRequest(request))
|
||||
{
|
||||
let (code, message, inner) = match send_error {
|
||||
mpsc::error::TrySendError::Full(inner) => (
|
||||
OVERLOADED_ERROR_CODE,
|
||||
"in-process server request queue is full",
|
||||
inner,
|
||||
),
|
||||
mpsc::error::TrySendError::Closed(inner) => (
|
||||
INTERNAL_ERROR_CODE,
|
||||
"in-process server request consumer is closed",
|
||||
inner,
|
||||
),
|
||||
};
|
||||
let request_id = match inner {
|
||||
InProcessServerEvent::ServerRequest(req) => req.id().clone(),
|
||||
_ => unreachable!("we just sent a ServerRequest variant"),
|
||||
};
|
||||
outgoing_message_sender
|
||||
.notify_client_error(
|
||||
request_id,
|
||||
JSONRPCErrorError {
|
||||
code,
|
||||
message: message.to_string(),
|
||||
data: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
OutgoingMessage::AppServerNotification(notification) => {
|
||||
if server_notification_requires_delivery(¬ification) {
|
||||
if event_tx
|
||||
.send(InProcessServerEvent::ServerNotification(notification))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
} else if let Err(send_error) =
|
||||
event_tx.try_send(InProcessServerEvent::ServerNotification(notification))
|
||||
{
|
||||
match send_error {
|
||||
mpsc::error::TrySendError::Full(_) => {
|
||||
warn!("dropping in-process server notification (queue full)");
|
||||
}
|
||||
mpsc::error::TrySendError::Closed(_) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
OutgoingMessage::Notification(notification) => {
|
||||
let notification = JSONRPCNotification {
|
||||
method: notification.method,
|
||||
params: notification.params,
|
||||
};
|
||||
if legacy_notification_requires_delivery(¬ification) {
|
||||
if event_tx
|
||||
.send(InProcessServerEvent::LegacyNotification(notification))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
} else if let Err(send_error) =
|
||||
event_tx.try_send(InProcessServerEvent::LegacyNotification(notification))
|
||||
{
|
||||
match send_error {
|
||||
mpsc::error::TrySendError::Full(_) => {
|
||||
warn!("dropping in-process legacy notification (queue full)");
|
||||
}
|
||||
mpsc::error::TrySendError::Closed(_) => {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(writer_rx);
|
||||
drop(processor_tx);
|
||||
outgoing_message_sender
|
||||
.cancel_all_requests(Some(JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: "in-process app-server runtime is shutting down".to_string(),
|
||||
data: None,
|
||||
}))
|
||||
.await;
|
||||
// Drop the runtime's last sender before awaiting the router task so
|
||||
// `outgoing_rx.recv()` can observe channel closure and exit cleanly.
|
||||
drop(outgoing_message_sender);
|
||||
for (_, response_tx) in pending_request_responses {
|
||||
let _ = response_tx.send(Err(JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: "in-process app-server runtime is shutting down".to_string(),
|
||||
data: None,
|
||||
}));
|
||||
}
|
||||
|
||||
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut processor_handle).await {
|
||||
processor_handle.abort();
|
||||
let _ = processor_handle.await;
|
||||
}
|
||||
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut outbound_handle).await {
|
||||
outbound_handle.abort();
|
||||
let _ = outbound_handle.await;
|
||||
}
|
||||
|
||||
if let Some(done_tx) = shutdown_ack {
|
||||
let _ = done_tx.send(());
|
||||
}
|
||||
});
|
||||
|
||||
InProcessClientHandle {
|
||||
client: InProcessClientSender { client_tx },
|
||||
event_rx,
|
||||
runtime_handle,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ConfigRequirementsReadResponse;
|
||||
use codex_app_server_protocol::SessionSource as ApiSessionSource;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
async fn build_test_config() -> Config {
|
||||
match ConfigBuilder::default().build().await {
|
||||
Ok(config) => config,
|
||||
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
|
||||
.expect("default config should load"),
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_test_client_with_capacity(
|
||||
session_source: SessionSource,
|
||||
channel_capacity: usize,
|
||||
) -> InProcessClientHandle {
|
||||
let args = InProcessStartArgs {
|
||||
arg0_paths: Arg0DispatchPaths::default(),
|
||||
config: Arc::new(build_test_config().await),
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
feedback: CodexFeedback::new(),
|
||||
config_warnings: Vec::new(),
|
||||
session_source,
|
||||
enable_codex_api_key_env: false,
|
||||
initialize: InitializeParams {
|
||||
client_info: ClientInfo {
|
||||
name: "codex-in-process-test".to_string(),
|
||||
title: None,
|
||||
version: "0.0.0".to_string(),
|
||||
},
|
||||
capabilities: None,
|
||||
},
|
||||
channel_capacity,
|
||||
};
|
||||
start(args).await.expect("in-process runtime should start")
|
||||
}
|
||||
|
||||
async fn start_test_client(session_source: SessionSource) -> InProcessClientHandle {
|
||||
start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_process_start_initializes_and_handles_typed_v2_request() {
|
||||
let client = start_test_client(SessionSource::Cli).await;
|
||||
let response = client
|
||||
.request(ClientRequest::ConfigRequirementsRead {
|
||||
request_id: RequestId::Integer(1),
|
||||
params: None,
|
||||
})
|
||||
.await
|
||||
.expect("request transport should work")
|
||||
.expect("request should succeed");
|
||||
assert!(response.is_object());
|
||||
|
||||
let _parsed: ConfigRequirementsReadResponse =
|
||||
serde_json::from_value(response).expect("response should match v2 schema");
|
||||
client
|
||||
.shutdown()
|
||||
.await
|
||||
.expect("in-process runtime should shutdown cleanly");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_process_start_uses_requested_session_source_for_thread_start() {
|
||||
for (requested_source, expected_source) in [
|
||||
(SessionSource::Cli, ApiSessionSource::Cli),
|
||||
(SessionSource::Exec, ApiSessionSource::Exec),
|
||||
] {
|
||||
let client = start_test_client(requested_source).await;
|
||||
let response = client
|
||||
.request(ClientRequest::ThreadStart {
|
||||
request_id: RequestId::Integer(2),
|
||||
params: ThreadStartParams {
|
||||
ephemeral: Some(true),
|
||||
..ThreadStartParams::default()
|
||||
},
|
||||
})
|
||||
.await
|
||||
.expect("request transport should work")
|
||||
.expect("thread/start should succeed");
|
||||
let parsed: ThreadStartResponse =
|
||||
serde_json::from_value(response).expect("thread/start response should parse");
|
||||
assert_eq!(parsed.thread.source, expected_source);
|
||||
client
|
||||
.shutdown()
|
||||
.await
|
||||
.expect("in-process runtime should shutdown cleanly");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn in_process_start_clamps_zero_channel_capacity() {
|
||||
let client = start_test_client_with_capacity(SessionSource::Cli, 0).await;
|
||||
let response = loop {
|
||||
match client
|
||||
.request(ClientRequest::ConfigRequirementsRead {
|
||||
request_id: RequestId::Integer(4),
|
||||
params: None,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(response) => break response.expect("request should succeed"),
|
||||
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
|
||||
tokio::task::yield_now().await;
|
||||
}
|
||||
Err(err) => panic!("request transport should work: {err}"),
|
||||
}
|
||||
};
|
||||
let _parsed: ConfigRequirementsReadResponse =
|
||||
serde_json::from_value(response).expect("response should match v2 schema");
|
||||
client
|
||||
.shutdown()
|
||||
.await
|
||||
.expect("in-process runtime should shutdown cleanly");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
|
||||
assert!(server_notification_requires_delivery(
|
||||
&ServerNotification::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
})
|
||||
));
|
||||
|
||||
assert!(legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/task_complete".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
assert!(legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/turn_aborted".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
assert!(legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/shutdown_complete".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
assert!(!legacy_notification_requires_delivery(
|
||||
&JSONRPCNotification {
|
||||
method: "codex/event/item_started".to_string(),
|
||||
params: None,
|
||||
}
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -39,7 +39,6 @@ use codex_core::check_execpolicy_for_warnings;
|
||||
use codex_core::config_loader::ConfigLoadError;
|
||||
use codex_core::config_loader::TextRange as CoreTextRange;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_state::log_db;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -66,7 +65,6 @@ mod error_code;
|
||||
mod external_agent_config_api;
|
||||
mod filters;
|
||||
mod fuzzy_file_search;
|
||||
pub mod in_process;
|
||||
mod message_processor;
|
||||
mod models;
|
||||
mod outgoing_message;
|
||||
@@ -599,8 +597,6 @@ pub async fn run_main_with_transport(
|
||||
feedback: feedback.clone(),
|
||||
log_db,
|
||||
config_warnings,
|
||||
session_source: SessionSource::VSCode,
|
||||
enable_codex_api_key_env: false,
|
||||
});
|
||||
let mut thread_created_rx = processor.thread_created_receiver();
|
||||
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();
|
||||
|
||||
@@ -18,7 +18,6 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshReason;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
|
||||
use codex_app_server_protocol::ClientInfo;
|
||||
use codex_app_server_protocol::ClientNotification;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigBatchWriteParams;
|
||||
use codex_app_server_protocol::ConfigReadParams;
|
||||
@@ -158,8 +157,6 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
|
||||
pub(crate) session_source: SessionSource,
|
||||
pub(crate) enable_codex_api_key_env: bool,
|
||||
}
|
||||
|
||||
impl MessageProcessor {
|
||||
@@ -176,12 +173,10 @@ impl MessageProcessor {
|
||||
feedback,
|
||||
log_db,
|
||||
config_warnings,
|
||||
session_source,
|
||||
enable_codex_api_key_env,
|
||||
} = args;
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
enable_codex_api_key_env,
|
||||
false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
|
||||
@@ -191,7 +186,7 @@ impl MessageProcessor {
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.codex_home.clone(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
SessionSource::VSCode,
|
||||
config.model_catalog.clone(),
|
||||
CollaborationModesConfig {
|
||||
default_mode_request_user_input: config
|
||||
@@ -205,7 +200,7 @@ impl MessageProcessor {
|
||||
let cloud_requirements = Arc::new(RwLock::new(cloud_requirements));
|
||||
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
|
||||
auth_manager,
|
||||
thread_manager: Arc::clone(&thread_manager),
|
||||
thread_manager,
|
||||
outgoing: outgoing.clone(),
|
||||
arg0_paths,
|
||||
config: Arc::clone(&config),
|
||||
@@ -219,7 +214,6 @@ impl MessageProcessor {
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
thread_manager,
|
||||
);
|
||||
let external_agent_config_api = ExternalAgentConfigApi::new(config.codex_home.clone());
|
||||
|
||||
@@ -280,50 +274,187 @@ impl MessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
self.handle_client_request(
|
||||
connection_id,
|
||||
request_id,
|
||||
codex_request,
|
||||
session,
|
||||
outbound_initialized,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
.instrument(request_span)
|
||||
.await;
|
||||
}
|
||||
match codex_request {
|
||||
// Handle Initialize internally so CodexMessageProcessor does not have to concern
|
||||
// itself with the `initialized` bool.
|
||||
ClientRequest::Initialize { request_id, params } => {
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
};
|
||||
if session.initialized {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "Already initialized".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
} else {
|
||||
// TODO(maxj): Revisit capability scoping for `experimental_api_enabled`.
|
||||
// Current behavior is per-connection. Reviewer feedback notes this can
|
||||
// create odd cross-client behavior (for example dynamic tool calls on a
|
||||
// shared thread when another connected client did not opt into
|
||||
// experimental API). Proposed direction is instance-global first-write-wins
|
||||
// with initialize-time mismatch rejection.
|
||||
let (experimental_api_enabled, opt_out_notification_methods) =
|
||||
match params.capabilities {
|
||||
Some(capabilities) => (
|
||||
capabilities.experimental_api,
|
||||
capabilities
|
||||
.opt_out_notification_methods
|
||||
.unwrap_or_default(),
|
||||
),
|
||||
None => (false, Vec::new()),
|
||||
};
|
||||
session.experimental_api_enabled = experimental_api_enabled;
|
||||
session.opted_out_notification_methods =
|
||||
opt_out_notification_methods.into_iter().collect();
|
||||
let ClientInfo {
|
||||
name,
|
||||
title: _title,
|
||||
version,
|
||||
} = params.client_info;
|
||||
session.app_server_client_name = Some(name.clone());
|
||||
session.client_version = Some(version.clone());
|
||||
if let Err(error) = set_default_originator(name.clone()) {
|
||||
match error {
|
||||
SetOriginatorError::InvalidHeaderValue => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!(
|
||||
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
|
||||
),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id.clone(), error).await;
|
||||
return;
|
||||
}
|
||||
SetOriginatorError::AlreadyInitialized => {
|
||||
// No-op. This is expected to happen if the originator is already set via env var.
|
||||
// TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE,
|
||||
// this will be an unexpected state and we can return a JSON-RPC error indicating
|
||||
// internal server error.
|
||||
}
|
||||
}
|
||||
}
|
||||
set_default_client_residency_requirement(self.config.enforce_residency.value());
|
||||
let user_agent_suffix = format!("{name}; {version}");
|
||||
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
|
||||
*suffix = Some(user_agent_suffix);
|
||||
}
|
||||
|
||||
/// Handles a typed request path used by in-process embedders.
|
||||
///
|
||||
/// This bypasses JSON request deserialization but keeps identical request
|
||||
/// semantics by delegating to `handle_client_request`.
|
||||
pub(crate) async fn process_client_request(
|
||||
&mut self,
|
||||
connection_id: ConnectionId,
|
||||
request: ClientRequest,
|
||||
session: &mut ConnectionSessionState,
|
||||
outbound_initialized: &AtomicBool,
|
||||
) {
|
||||
let request_span =
|
||||
crate::app_server_tracing::typed_request_span(&request, connection_id, session);
|
||||
async {
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id: request.id().clone(),
|
||||
};
|
||||
tracing::trace!(
|
||||
?connection_id,
|
||||
request_id = ?request_id.request_id,
|
||||
"app-server typed request"
|
||||
);
|
||||
self.handle_client_request(
|
||||
connection_id,
|
||||
request_id,
|
||||
request,
|
||||
session,
|
||||
outbound_initialized,
|
||||
)
|
||||
.await;
|
||||
let user_agent = get_codex_user_agent();
|
||||
let response = InitializeResponse { user_agent };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
|
||||
session.initialized = true;
|
||||
outbound_initialized.store(true, Ordering::Release);
|
||||
self.codex_message_processor
|
||||
.connection_initialized(connection_id)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
if !session.initialized {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "Not initialized".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(reason) = codex_request.experimental_reason()
|
||||
&& !session.experimental_api_enabled
|
||||
{
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: experimental_required_message(reason),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
match codex_request {
|
||||
ClientRequest::ConfigRead { request_id, params } => {
|
||||
self.handle_config_read(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
|
||||
self.handle_external_agent_config_detect(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
|
||||
self.handle_external_agent_config_import(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigValueWrite { request_id, params } => {
|
||||
self.handle_config_value_write(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigBatchWrite { request_id, params } => {
|
||||
self.handle_config_batch_write(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigRequirementsRead {
|
||||
request_id,
|
||||
params: _,
|
||||
} => {
|
||||
self.handle_config_requirements_read(ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
other => {
|
||||
// Box the delegated future so this wrapper's async state machine does not
|
||||
// inline the full `CodexMessageProcessor::process_request` future, which
|
||||
// can otherwise push worker-thread stack usage over the edge.
|
||||
self.codex_message_processor
|
||||
.process_request(
|
||||
connection_id,
|
||||
other,
|
||||
session.app_server_client_name.clone(),
|
||||
)
|
||||
.boxed()
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(request_span)
|
||||
.await;
|
||||
@@ -335,13 +466,6 @@ impl MessageProcessor {
|
||||
tracing::info!("<- notification: {:?}", notification);
|
||||
}
|
||||
|
||||
/// Handles typed notifications from in-process clients.
|
||||
pub(crate) async fn process_client_notification(&self, notification: ClientNotification) {
|
||||
// Currently, we do not expect to receive any typed notifications from
|
||||
// in-process clients, so we just log them.
|
||||
tracing::info!("<- typed notification: {:?}", notification);
|
||||
}
|
||||
|
||||
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
|
||||
self.codex_message_processor.thread_created_receiver()
|
||||
}
|
||||
@@ -388,193 +512,6 @@ impl MessageProcessor {
|
||||
self.outgoing.notify_client_error(err.id, err.error).await;
|
||||
}
|
||||
|
||||
async fn handle_client_request(
|
||||
&mut self,
|
||||
connection_id: ConnectionId,
|
||||
request_id: ConnectionRequestId,
|
||||
codex_request: ClientRequest,
|
||||
session: &mut ConnectionSessionState,
|
||||
outbound_initialized: &AtomicBool,
|
||||
) {
|
||||
match codex_request {
|
||||
// Handle Initialize internally so CodexMessageProcessor does not have to concern
|
||||
// itself with the `initialized` bool.
|
||||
ClientRequest::Initialize { request_id, params } => {
|
||||
let request_id = ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
};
|
||||
if session.initialized {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "Already initialized".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO(maxj): Revisit capability scoping for `experimental_api_enabled`.
|
||||
// Current behavior is per-connection. Reviewer feedback notes this can
|
||||
// create odd cross-client behavior (for example dynamic tool calls on a
|
||||
// shared thread when another connected client did not opt into
|
||||
// experimental API). Proposed direction is instance-global first-write-wins
|
||||
// with initialize-time mismatch rejection.
|
||||
let (experimental_api_enabled, opt_out_notification_methods) =
|
||||
match params.capabilities {
|
||||
Some(capabilities) => (
|
||||
capabilities.experimental_api,
|
||||
capabilities
|
||||
.opt_out_notification_methods
|
||||
.unwrap_or_default(),
|
||||
),
|
||||
None => (false, Vec::new()),
|
||||
};
|
||||
session.experimental_api_enabled = experimental_api_enabled;
|
||||
session.opted_out_notification_methods =
|
||||
opt_out_notification_methods.into_iter().collect();
|
||||
let ClientInfo {
|
||||
name,
|
||||
title: _title,
|
||||
version,
|
||||
} = params.client_info;
|
||||
session.app_server_client_name = Some(name.clone());
|
||||
session.client_version = Some(version.clone());
|
||||
if let Err(error) = set_default_originator(name.clone()) {
|
||||
match error {
|
||||
SetOriginatorError::InvalidHeaderValue => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!(
|
||||
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
|
||||
),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id.clone(), error).await;
|
||||
return;
|
||||
}
|
||||
SetOriginatorError::AlreadyInitialized => {
|
||||
// No-op. This is expected to happen if the originator is already set via env var.
|
||||
// TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE,
|
||||
// this will be an unexpected state and we can return a JSON-RPC error indicating
|
||||
// internal server error.
|
||||
}
|
||||
}
|
||||
}
|
||||
set_default_client_residency_requirement(self.config.enforce_residency.value());
|
||||
let user_agent_suffix = format!("{name}; {version}");
|
||||
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
|
||||
*suffix = Some(user_agent_suffix);
|
||||
}
|
||||
|
||||
let user_agent = get_codex_user_agent();
|
||||
let response = InitializeResponse { user_agent };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
|
||||
session.initialized = true;
|
||||
outbound_initialized.store(true, Ordering::Release);
|
||||
self.codex_message_processor
|
||||
.connection_initialized(connection_id)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
_ => {
|
||||
if !session.initialized {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: "Not initialized".to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(reason) = codex_request.experimental_reason()
|
||||
&& !session.experimental_api_enabled
|
||||
{
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: experimental_required_message(reason),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
match codex_request {
|
||||
ClientRequest::ConfigRead { request_id, params } => {
|
||||
self.handle_config_read(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
|
||||
self.handle_external_agent_config_detect(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
|
||||
self.handle_external_agent_config_import(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigValueWrite { request_id, params } => {
|
||||
self.handle_config_value_write(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigBatchWrite { request_id, params } => {
|
||||
self.handle_config_batch_write(
|
||||
ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
},
|
||||
params,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ConfigRequirementsRead {
|
||||
request_id,
|
||||
params: _,
|
||||
} => {
|
||||
self.handle_config_requirements_read(ConnectionRequestId {
|
||||
connection_id,
|
||||
request_id,
|
||||
})
|
||||
.await;
|
||||
}
|
||||
other => {
|
||||
// Box the delegated future so this wrapper's async state machine does not
|
||||
// inline the full `CodexMessageProcessor::process_request` future, which
|
||||
// can otherwise push worker-thread stack usage over the edge.
|
||||
self.codex_message_processor
|
||||
.process_request(connection_id, other, session.app_server_client_name.clone())
|
||||
.boxed()
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_config_read(&self, request_id: ConnectionRequestId, params: ConfigReadParams) {
|
||||
match self.config_api.read(params).await {
|
||||
Ok(response) => self.outgoing.send_response(request_id, response).await,
|
||||
|
||||
@@ -273,25 +273,6 @@ impl OutgoingMessageSender {
|
||||
self.take_request_callback(id).await.is_some()
|
||||
}
|
||||
|
||||
pub(crate) async fn cancel_all_requests(&self, error: Option<JSONRPCErrorError>) {
|
||||
let entries = {
|
||||
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
|
||||
request_id_to_callback
|
||||
.drain()
|
||||
.map(|(_, entry)| entry)
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
|
||||
if let Some(error) = error {
|
||||
for entry in entries {
|
||||
if let Err(err) = entry.callback.send(Err(error.clone())) {
|
||||
let request_id = entry.request.id();
|
||||
warn!("could not notify callback for {request_id:?} due to: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn take_request_callback(
|
||||
&self,
|
||||
id: &RequestId,
|
||||
|
||||
@@ -605,7 +605,6 @@ async fn config_batch_write_applies_multiple_edits() -> Result<()> {
|
||||
},
|
||||
],
|
||||
expected_version: None,
|
||||
reload_user_config: false,
|
||||
})
|
||||
.await?;
|
||||
let batch_resp: JSONRPCResponse = timeout(
|
||||
|
||||
@@ -53,6 +53,9 @@ enum WsCommand {
|
||||
message: Message,
|
||||
tx_result: oneshot::Sender<Result<(), WsError>>,
|
||||
},
|
||||
Close {
|
||||
tx_result: oneshot::Sender<Result<(), WsError>>,
|
||||
},
|
||||
}
|
||||
|
||||
impl WsStream {
|
||||
@@ -77,6 +80,11 @@ impl WsStream {
|
||||
break;
|
||||
}
|
||||
}
|
||||
WsCommand::Close { tx_result } => {
|
||||
let result = inner.close(None).await;
|
||||
let _ = tx_result.send(result);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
message = inner.next() => {
|
||||
@@ -136,6 +144,11 @@ impl WsStream {
|
||||
.await
|
||||
}
|
||||
|
||||
async fn close(&self) -> Result<(), WsError> {
|
||||
self.request(|tx_result| WsCommand::Close { tx_result })
|
||||
.await
|
||||
}
|
||||
|
||||
async fn next(&mut self) -> Option<Result<Message, WsError>> {
|
||||
self.rx_message.recv().await
|
||||
}
|
||||
@@ -229,32 +242,26 @@ impl ResponsesWebsocketConnection {
|
||||
.await;
|
||||
}
|
||||
let mut guard = stream.lock().await;
|
||||
let result = {
|
||||
let Some(ws_stream) = guard.as_mut() else {
|
||||
let _ = tx_event
|
||||
.send(Err(ApiError::Stream(
|
||||
"websocket connection is closed".to_string(),
|
||||
)))
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
run_websocket_response_stream(
|
||||
ws_stream,
|
||||
tx_event.clone(),
|
||||
request_body,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
)
|
||||
.await
|
||||
let Some(ws_stream) = guard.as_mut() else {
|
||||
let _ = tx_event
|
||||
.send(Err(ApiError::Stream(
|
||||
"websocket connection is closed".to_string(),
|
||||
)))
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
if let Err(err) = result {
|
||||
// A terminal stream error should reach the caller immediately. Waiting for a
|
||||
// graceful close handshake here can stall indefinitely and mask the error.
|
||||
let failed_stream = guard.take();
|
||||
drop(guard);
|
||||
drop(failed_stream);
|
||||
if let Err(err) = run_websocket_response_stream(
|
||||
ws_stream,
|
||||
tx_event.clone(),
|
||||
request_body,
|
||||
idle_timeout,
|
||||
telemetry,
|
||||
)
|
||||
.await
|
||||
{
|
||||
let _ = ws_stream.close().await;
|
||||
*guard = None;
|
||||
let _ = tx_event.send(Err(err)).await;
|
||||
}
|
||||
});
|
||||
|
||||
@@ -201,8 +201,13 @@ impl Session {
|
||||
}
|
||||
|
||||
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
|
||||
for task in self.take_all_running_tasks().await {
|
||||
self.handle_task_abort(task, reason.clone()).await;
|
||||
if let Some(mut active_turn) = self.take_active_turn().await {
|
||||
for task in active_turn.drain_tasks() {
|
||||
self.handle_task_abort(task, reason.clone()).await;
|
||||
}
|
||||
// Let interrupted tasks observe cancellation before dropping pending approvals, or an
|
||||
// in-flight approval wait can surface as a model-visible rejection before TurnAborted.
|
||||
active_turn.clear_pending().await;
|
||||
}
|
||||
if reason == TurnAbortReason::Interrupted {
|
||||
self.close_unified_exec_processes().await;
|
||||
@@ -342,16 +347,9 @@ impl Session {
|
||||
*active = Some(turn);
|
||||
}
|
||||
|
||||
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
|
||||
async fn take_active_turn(&self) -> Option<ActiveTurn> {
|
||||
let mut active = self.active_turn.lock().await;
|
||||
match active.take() {
|
||||
Some(mut at) => {
|
||||
at.clear_pending().await;
|
||||
|
||||
at.drain_tasks()
|
||||
}
|
||||
None => Vec::new(),
|
||||
}
|
||||
active.take()
|
||||
}
|
||||
|
||||
pub(crate) async fn close_unified_exec_processes(&self) {
|
||||
|
||||
@@ -349,7 +349,7 @@ async fn shell_output_for_freeform_tool_records_duration(
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let call_id = "shell-structured";
|
||||
let responses = shell_responses(call_id, vec!["/bin/sh", "-c", "sleep 0.2"], output_type)?;
|
||||
let responses = shell_responses(call_id, vec!["/bin/sh", "-c", "sleep 1"], output_type)?;
|
||||
let mock = mount_sse_sequence(&server, responses).await;
|
||||
|
||||
test.submit_turn_with_policy(
|
||||
@@ -381,7 +381,7 @@ $"#;
|
||||
.and_then(|value| value.as_str().parse::<f32>().ok())
|
||||
.expect("expected structured shell output to contain wall time seconds");
|
||||
assert!(
|
||||
wall_time_seconds > 0.1,
|
||||
wall_time_seconds > 0.5,
|
||||
"expected wall time to be greater than zero seconds, got {wall_time_seconds}"
|
||||
);
|
||||
|
||||
@@ -740,7 +740,6 @@ async fn shell_command_output_is_freeform() -> Result<()> {
|
||||
let call_id = "shell-command";
|
||||
let args = json!({
|
||||
"command": "echo shell command",
|
||||
"login": false,
|
||||
"timeout_ms": 1_000,
|
||||
});
|
||||
let responses = vec![
|
||||
@@ -792,7 +791,6 @@ async fn shell_command_output_is_not_truncated_under_10k_bytes() -> Result<()> {
|
||||
let call_id = "shell-command";
|
||||
let args = json!({
|
||||
"command": "perl -e 'print \"1\" x 10000'",
|
||||
"login": false,
|
||||
"timeout_ms": 1000,
|
||||
});
|
||||
let responses = vec![
|
||||
@@ -843,7 +841,6 @@ async fn shell_command_output_is_not_truncated_over_10k_bytes() -> Result<()> {
|
||||
let call_id = "shell-command";
|
||||
let args = json!({
|
||||
"command": "perl -e 'print \"1\" x 10001'",
|
||||
"login": false,
|
||||
"timeout_ms": 1000,
|
||||
});
|
||||
let responses = vec![
|
||||
|
||||
@@ -19,11 +19,8 @@ workspace = true
|
||||
anyhow = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
codex-arg0 = { workspace = true }
|
||||
codex-app-server-client = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-cloud-requirements = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-feedback = { workspace = true }
|
||||
codex-otel = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user