Compare commits

..

1 Commits

Author SHA1 Message Date
Ahmed Ibrahim
3efd0dbff8 codex: delay pending cleanup until task aborts 2026-03-08 13:51:32 -07:00
25 changed files with 474 additions and 3181 deletions

21
codex-rs/Cargo.lock generated
View File

@@ -1458,24 +1458,6 @@ dependencies = [
"wiremock",
]
[[package]]
name = "codex-app-server-client"
version = "0.0.0"
dependencies = [
"codex-app-server",
"codex-app-server-protocol",
"codex-arg0",
"codex-core",
"codex-feedback",
"codex-protocol",
"pretty_assertions",
"serde",
"serde_json",
"tokio",
"toml 0.9.11+spec-1.1.0",
"tracing",
]
[[package]]
name = "codex-app-server-protocol"
version = "0.0.0"
@@ -1917,13 +1899,10 @@ dependencies = [
"anyhow",
"assert_cmd",
"clap",
"codex-app-server-client",
"codex-app-server-protocol",
"codex-apply-patch",
"codex-arg0",
"codex-cloud-requirements",
"codex-core",
"codex-feedback",
"codex-otel",
"codex-protocol",
"codex-utils-absolute-path",

View File

@@ -4,7 +4,6 @@ members = [
"ansi-escape",
"async-utils",
"app-server",
"app-server-client",
"app-server-protocol",
"app-server-test-client",
"debug-client",
@@ -87,7 +86,6 @@ codex-api = { path = "codex-api" }
codex-artifacts = { path = "artifacts" }
codex-package-manager = { path = "package-manager" }
codex-app-server = { path = "app-server" }
codex-app-server-client = { path = "app-server-client" }
codex-app-server-protocol = { path = "app-server-protocol" }
codex-app-server-test-client = { path = "app-server-test-client" }
codex-apply-patch = { path = "apply-patch" }

View File

@@ -1,30 +0,0 @@
[package]
name = "codex-app-server-client"
version.workspace = true
edition.workspace = true
license.workspace = true
[lib]
name = "codex_app_server_client"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
codex-app-server = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-arg0 = { workspace = true }
codex-core = { workspace = true }
codex-feedback = { workspace = true }
codex-protocol = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["sync", "time", "rt"] }
toml = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

View File

@@ -1,67 +0,0 @@
# codex-app-server-client
Shared in-process app-server client used by conversational CLI surfaces:
- `codex-exec`
- `codex-tui`
## Purpose
This crate centralizes startup and lifecycle management for an in-process
`codex-app-server` runtime, so CLI clients do not need to duplicate:
- app-server bootstrap and initialize handshake
- in-memory request/event transport wiring
- lifecycle orchestration around caller-provided startup identity
- graceful shutdown behavior
## Startup identity
Callers pass both the app-server `SessionSource` and the initialize
`client_info.name` explicitly when starting the facade.
That keeps thread metadata (for example in `thread/list` and `thread/read`)
aligned with the originating runtime without baking TUI/exec-specific policy
into the shared client layer.
## Transport model
The in-process path uses typed channels:
- client -> server: `ClientRequest` / `ClientNotification`
- server -> client: `InProcessServerEvent`
- `ServerRequest`
- `ServerNotification`
- `LegacyNotification`
JSON serialization is still used at external transport boundaries
(stdio/websocket), but the in-process hot path is typed.
Typed requests still receive app-server responses through the JSON-RPC
result envelope internally. That is intentional: the in-process path is
meant to preserve app-server semantics while removing the process
boundary, not to introduce a second response contract.
## Bootstrap behavior
The client facade starts an already-initialized in-process runtime, but
thread bootstrap still follows normal app-server flow:
- caller sends `thread/start` or `thread/resume`
- app-server returns the immediate typed response
- richer session metadata may arrive later as a `SessionConfigured`
legacy event
Surfaces such as TUI and exec may therefore need a short bootstrap
phase where they reconcile startup response data with later events.
## Backpressure and shutdown
- Queues are bounded and use `DEFAULT_IN_PROCESS_CHANNEL_CAPACITY` by default.
- Full queues return explicit overload behavior instead of unbounded growth.
- `shutdown()` performs a bounded graceful shutdown and then aborts if timeout
is exceeded.
If the client falls behind on event consumption, the worker emits
`InProcessServerEvent::Lagged` and may reject pending server requests so
approval flows do not hang indefinitely behind a saturated queue.

View File

@@ -1,801 +0,0 @@
//! Shared in-process app-server client facade for CLI surfaces.
//!
//! This crate wraps [`codex_app_server::in_process`] behind a single async API
//! used by surfaces like TUI and exec. It centralizes:
//!
//! - Runtime startup and initialize-capabilities handshake.
//! - Typed caller-provided startup identity (`SessionSource` + client name).
//! - Typed and raw request/notification dispatch.
//! - Server request resolution and rejection.
//! - Event consumption with backpressure signaling ([`InProcessServerEvent::Lagged`]).
//! - Bounded graceful shutdown with abort fallback.
//!
//! The facade interposes a worker task between the caller and the underlying
//! [`InProcessClientHandle`](codex_app_server::in_process::InProcessClientHandle),
//! bridging async `mpsc` channels on both sides. Queues are bounded so overload
//! surfaces as channel-full errors rather than unbounded memory growth.
use std::error::Error;
use std::fmt;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::sync::Arc;
use std::time::Duration;
pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
pub use codex_app_server::in_process::InProcessServerEvent;
use codex_app_server::in_process::InProcessStartArgs;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonRpcResult;
use codex_arg0::Arg0DispatchPaths;
use codex_core::config::Config;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use serde::de::DeserializeOwned;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use toml::Value as TomlValue;
use tracing::warn;
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Raw app-server request result for typed in-process requests.
///
/// Even on the in-process path, successful responses still travel back through
/// the same JSON-RPC result envelope used by socket/stdio transports because
/// `MessageProcessor` continues to produce that shape internally.
pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>;
fn event_requires_delivery(event: &InProcessServerEvent) -> bool {
// These terminal events drive surface shutdown/completion state. Dropping
// them under backpressure can leave exec/TUI waiting forever even though
// the underlying turn has already ended.
match event {
InProcessServerEvent::ServerNotification(
codex_app_server_protocol::ServerNotification::TurnCompleted(_),
) => true,
InProcessServerEvent::LegacyNotification(notification) => matches!(
notification
.method
.strip_prefix("codex/event/")
.unwrap_or(&notification.method),
"task_complete" | "turn_aborted" | "shutdown_complete"
),
_ => false,
}
}
/// Layered error for [`InProcessAppServerClient::request_typed`].
///
/// This keeps transport failures, server-side JSON-RPC failures, and response
/// decode failures distinct so callers can decide whether to retry, surface a
/// server error, or treat the response as an internal request/response mismatch.
#[derive(Debug)]
pub enum TypedRequestError {
Transport {
method: String,
source: IoError,
},
Server {
method: String,
source: JSONRPCErrorError,
},
Deserialize {
method: String,
source: serde_json::Error,
},
}
impl fmt::Display for TypedRequestError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Transport { method, source } => {
write!(f, "{method} transport error: {source}")
}
Self::Server { method, source } => {
write!(f, "{method} failed: {}", source.message)
}
Self::Deserialize { method, source } => {
write!(f, "{method} response decode error: {source}")
}
}
}
}
impl Error for TypedRequestError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Transport { source, .. } => Some(source),
Self::Server { .. } => None,
Self::Deserialize { source, .. } => Some(source),
}
}
}
#[derive(Clone)]
pub struct InProcessClientStartArgs {
/// Resolved argv0 dispatch paths used by command execution internals.
pub arg0_paths: Arg0DispatchPaths,
/// Shared config used to initialize app-server runtime.
pub config: Arc<Config>,
/// CLI config overrides that are already parsed into TOML values.
pub cli_overrides: Vec<(String, TomlValue)>,
/// Loader override knobs used by config API paths.
pub loader_overrides: LoaderOverrides,
/// Preloaded cloud requirements provider.
pub cloud_requirements: CloudRequirementsLoader,
/// Feedback sink used by app-server/core telemetry and logs.
pub feedback: CodexFeedback,
/// Startup warnings emitted after initialize succeeds.
pub config_warnings: Vec<ConfigWarningNotification>,
/// Session source recorded in app-server thread metadata.
pub session_source: SessionSource,
/// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
pub enable_codex_api_key_env: bool,
/// Client name reported during initialize.
pub client_name: String,
/// Client version reported during initialize.
pub client_version: String,
/// Whether experimental APIs are requested at initialize time.
pub experimental_api: bool,
/// Notification methods this client opts out of receiving.
pub opt_out_notification_methods: Vec<String>,
/// Queue capacity for command/event channels (clamped to at least 1).
pub channel_capacity: usize,
}
impl InProcessClientStartArgs {
/// Builds initialize params from caller-provided metadata.
pub fn initialize_params(&self) -> InitializeParams {
let capabilities = InitializeCapabilities {
experimental_api: self.experimental_api,
opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() {
None
} else {
Some(self.opt_out_notification_methods.clone())
},
};
InitializeParams {
client_info: ClientInfo {
name: self.client_name.clone(),
title: None,
version: self.client_version.clone(),
},
capabilities: Some(capabilities),
}
}
fn into_runtime_start_args(self) -> InProcessStartArgs {
let initialize = self.initialize_params();
InProcessStartArgs {
arg0_paths: self.arg0_paths,
config: self.config,
cli_overrides: self.cli_overrides,
loader_overrides: self.loader_overrides,
cloud_requirements: self.cloud_requirements,
feedback: self.feedback,
config_warnings: self.config_warnings,
session_source: self.session_source,
enable_codex_api_key_env: self.enable_codex_api_key_env,
initialize,
channel_capacity: self.channel_capacity,
}
}
}
/// Internal command sent from public facade methods to the worker task.
///
/// Each variant carries a oneshot sender so the caller can `await` the
/// result without holding a mutable reference to the client.
enum ClientCommand {
Request {
request: Box<ClientRequest>,
response_tx: oneshot::Sender<IoResult<RequestResult>>,
},
Notify {
notification: ClientNotification,
response_tx: oneshot::Sender<IoResult<()>>,
},
ResolveServerRequest {
request_id: RequestId,
result: JsonRpcResult,
response_tx: oneshot::Sender<IoResult<()>>,
},
RejectServerRequest {
request_id: RequestId,
error: JSONRPCErrorError,
response_tx: oneshot::Sender<IoResult<()>>,
},
Shutdown {
response_tx: oneshot::Sender<IoResult<()>>,
},
}
/// Async facade over the in-process app-server runtime.
///
/// This type owns a worker task that bridges between:
/// - caller-facing async `mpsc` channels used by TUI/exec
/// - [`codex_app_server::in_process::InProcessClientHandle`], which speaks to
/// the embedded `MessageProcessor`
///
/// The facade intentionally preserves the server's request/notification/event
/// model instead of exposing direct core runtime handles. That keeps in-process
/// callers aligned with app-server behavior while still avoiding a process
/// boundary.
pub struct InProcessAppServerClient {
command_tx: mpsc::Sender<ClientCommand>,
event_rx: mpsc::Receiver<InProcessServerEvent>,
worker_handle: tokio::task::JoinHandle<()>,
}
impl InProcessAppServerClient {
/// Starts the in-process runtime and facade worker task.
///
/// The returned client is ready for requests and event consumption. If the
/// internal event queue is saturated later, server requests are rejected
/// with overload error instead of being silently dropped.
pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
let channel_capacity = args.channel_capacity.max(1);
let mut handle =
codex_app_server::in_process::start(args.into_runtime_start_args()).await?;
let request_sender = handle.sender();
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
let worker_handle = tokio::spawn(async move {
let mut event_stream_enabled = true;
let mut skipped_events = 0usize;
loop {
tokio::select! {
command = command_rx.recv() => {
match command {
Some(ClientCommand::Request { request, response_tx }) => {
let request_sender = request_sender.clone();
// Request waits happen on a detached task so
// this loop can keep draining runtime events
// while the request is blocked on client input.
tokio::spawn(async move {
let result = request_sender.request(*request).await;
let _ = response_tx.send(result);
});
}
Some(ClientCommand::Notify {
notification,
response_tx,
}) => {
let result = request_sender.notify(notification);
let _ = response_tx.send(result);
}
Some(ClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
}) => {
let send_result =
request_sender.respond_to_server_request(request_id, result);
let _ = response_tx.send(send_result);
}
Some(ClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
}) => {
let send_result = request_sender.fail_server_request(request_id, error);
let _ = response_tx.send(send_result);
}
Some(ClientCommand::Shutdown { response_tx }) => {
let shutdown_result = handle.shutdown().await;
let _ = response_tx.send(shutdown_result);
break;
}
None => {
let _ = handle.shutdown().await;
break;
}
}
}
event = handle.next_event(), if event_stream_enabled => {
let Some(event) = event else {
break;
};
if skipped_events > 0 {
if event_requires_delivery(&event) {
// Surface lag before the terminal event, but
// do not let the lag marker itself cause us to
// drop the completion/abort notification that
// the caller is blocked on.
if event_tx
.send(InProcessServerEvent::Lagged {
skipped: skipped_events,
})
.await
.is_err()
{
event_stream_enabled = false;
continue;
}
skipped_events = 0;
} else {
match event_tx.try_send(InProcessServerEvent::Lagged {
skipped: skipped_events,
}) {
Ok(()) => {
skipped_events = 0;
}
Err(mpsc::error::TrySendError::Full(_)) => {
skipped_events = skipped_events.saturating_add(1);
warn!(
"dropping in-process app-server event because consumer queue is full"
);
if let InProcessServerEvent::ServerRequest(request) = event {
let _ = request_sender.fail_server_request(
request.id().clone(),
JSONRPCErrorError {
code: -32001,
message: "in-process app-server event queue is full".to_string(),
data: None,
},
);
}
continue;
}
Err(mpsc::error::TrySendError::Closed(_)) => {
event_stream_enabled = false;
continue;
}
}
}
}
if event_requires_delivery(&event) {
// Block until the consumer catches up for
// terminal notifications; this preserves the
// completion signal even when the queue is
// otherwise saturated.
if event_tx.send(event).await.is_err() {
event_stream_enabled = false;
}
continue;
}
match event_tx.try_send(event) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(event)) => {
skipped_events = skipped_events.saturating_add(1);
warn!("dropping in-process app-server event because consumer queue is full");
if let InProcessServerEvent::ServerRequest(request) = event {
let _ = request_sender.fail_server_request(
request.id().clone(),
JSONRPCErrorError {
code: -32001,
message: "in-process app-server event queue is full".to_string(),
data: None,
},
);
}
}
Err(mpsc::error::TrySendError::Closed(_)) => {
event_stream_enabled = false;
}
}
}
}
}
});
Ok(Self {
command_tx,
event_rx,
worker_handle,
})
}
/// Sends a typed client request and returns raw JSON-RPC result.
///
/// Callers that expect a concrete response type should usually prefer
/// [`request_typed`](Self::request_typed).
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(ClientCommand::Request {
request: Box::new(request),
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server request channel is closed",
)
})?
}
/// Sends a typed client request and decodes the successful response body.
///
/// This still deserializes from a JSON value produced by app-server's
/// JSON-RPC result envelope. Because the caller chooses `T`, `Deserialize`
/// failures indicate an internal request/response mismatch at the call site
/// (or an in-process bug), not transport skew from an external client.
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
where
T: DeserializeOwned,
{
let method = request_method_name(&request);
let response =
self.request(request)
.await
.map_err(|source| TypedRequestError::Transport {
method: method.clone(),
source,
})?;
let result = response.map_err(|source| TypedRequestError::Server {
method: method.clone(),
source,
})?;
serde_json::from_value(result)
.map_err(|source| TypedRequestError::Deserialize { method, source })
}
/// Sends a typed client notification.
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(ClientCommand::Notify {
notification,
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server notify channel is closed",
)
})?
}
/// Resolves a pending server request.
///
/// This should only be called with request IDs obtained from the current
/// client's event stream.
pub async fn resolve_server_request(
&self,
request_id: RequestId,
result: JsonRpcResult,
) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(ClientCommand::ResolveServerRequest {
request_id,
result,
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server resolve channel is closed",
)
})?
}
/// Rejects a pending server request with JSON-RPC error payload.
pub async fn reject_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(ClientCommand::RejectServerRequest {
request_id,
error,
response_tx,
})
.await
.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server worker channel is closed",
)
})?;
response_rx.await.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server reject channel is closed",
)
})?
}
/// Returns the next in-process event, or `None` when worker exits.
///
/// Callers are expected to drain this stream promptly. If they fall behind,
/// the worker emits [`InProcessServerEvent::Lagged`] markers and may reject
/// pending server requests rather than letting approval flows hang.
pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
self.event_rx.recv().await
}
/// Shuts down worker and in-process runtime with bounded wait.
///
/// If graceful shutdown exceeds timeout, the worker task is aborted to
/// avoid leaking background tasks in embedding callers.
pub async fn shutdown(self) -> IoResult<()> {
let Self {
command_tx,
event_rx,
worker_handle,
} = self;
let mut worker_handle = worker_handle;
// Drop the caller-facing receiver before asking the worker to shut
// down. That unblocks any pending must-deliver `event_tx.send(..)`
// so the worker can reach `handle.shutdown()` instead of timing out
// and getting aborted with the runtime still attached.
drop(event_rx);
let (response_tx, response_rx) = oneshot::channel();
if command_tx
.send(ClientCommand::Shutdown { response_tx })
.await
.is_ok()
&& let Ok(command_result) = timeout(SHUTDOWN_TIMEOUT, response_rx).await
{
command_result.map_err(|_| {
IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server shutdown channel is closed",
)
})??;
}
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await {
worker_handle.abort();
let _ = worker_handle.await;
}
Ok(())
}
}
/// Extracts the JSON-RPC method name for diagnostics without extending the
/// protocol crate with in-process-only helpers.
fn request_method_name(request: &ClientRequest) -> String {
serde_json::to_value(request)
.ok()
.and_then(|value| {
value
.get("method")
.and_then(serde_json::Value::as_str)
.map(ToOwned::to_owned)
})
.unwrap_or_else(|| "<unknown>".to_string())
}
#[cfg(test)]
mod tests {
use super::*;
use codex_app_server_protocol::ConfigRequirementsReadResponse;
use codex_app_server_protocol::SessionSource as ApiSessionSource;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_core::config::ConfigBuilder;
use pretty_assertions::assert_eq;
use tokio::time::Duration;
use tokio::time::timeout;
async fn build_test_config() -> Config {
match ConfigBuilder::default().build().await {
Ok(config) => config,
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
.expect("default config should load"),
}
}
async fn start_test_client_with_capacity(
session_source: SessionSource,
channel_capacity: usize,
) -> InProcessAppServerClient {
InProcessAppServerClient::start(InProcessClientStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(build_test_config().await),
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
feedback: CodexFeedback::new(),
config_warnings: Vec::new(),
session_source,
enable_codex_api_key_env: false,
client_name: "codex-app-server-client-test".to_string(),
client_version: "0.0.0-test".to_string(),
experimental_api: true,
opt_out_notification_methods: Vec::new(),
channel_capacity,
})
.await
.expect("in-process app-server client should start")
}
async fn start_test_client(session_source: SessionSource) -> InProcessAppServerClient {
start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
}
#[tokio::test]
async fn typed_request_roundtrip_works() {
let client = start_test_client(SessionSource::Exec).await;
let _response: ConfigRequirementsReadResponse = client
.request_typed(ClientRequest::ConfigRequirementsRead {
request_id: RequestId::Integer(1),
params: None,
})
.await
.expect("typed request should succeed");
client.shutdown().await.expect("shutdown should complete");
}
#[tokio::test]
async fn typed_request_reports_json_rpc_errors() {
let client = start_test_client(SessionSource::Exec).await;
let err = client
.request_typed::<ConfigRequirementsReadResponse>(ClientRequest::ThreadRead {
request_id: RequestId::Integer(99),
params: codex_app_server_protocol::ThreadReadParams {
thread_id: "missing-thread".to_string(),
include_turns: false,
},
})
.await
.expect_err("missing thread should return a JSON-RPC error");
assert!(
err.to_string().starts_with("thread/read failed:"),
"expected method-qualified JSON-RPC failure message"
);
client.shutdown().await.expect("shutdown should complete");
}
#[tokio::test]
async fn caller_provided_session_source_is_applied() {
for (session_source, expected_source) in [
(SessionSource::Exec, ApiSessionSource::Exec),
(SessionSource::Cli, ApiSessionSource::Cli),
] {
let client = start_test_client(session_source).await;
let parsed: ThreadStartResponse = client
.request_typed(ClientRequest::ThreadStart {
request_id: RequestId::Integer(2),
params: ThreadStartParams {
ephemeral: Some(true),
..ThreadStartParams::default()
},
})
.await
.expect("thread/start should succeed");
assert_eq!(parsed.thread.source, expected_source);
client.shutdown().await.expect("shutdown should complete");
}
}
#[tokio::test]
async fn tiny_channel_capacity_still_supports_request_roundtrip() {
let client = start_test_client_with_capacity(SessionSource::Exec, 1).await;
let _response: ConfigRequirementsReadResponse = client
.request_typed(ClientRequest::ConfigRequirementsRead {
request_id: RequestId::Integer(1),
params: None,
})
.await
.expect("typed request should succeed");
client.shutdown().await.expect("shutdown should complete");
}
#[test]
fn typed_request_error_exposes_sources() {
let transport = TypedRequestError::Transport {
method: "config/read".to_string(),
source: IoError::new(ErrorKind::BrokenPipe, "closed"),
};
assert_eq!(std::error::Error::source(&transport).is_some(), true);
let server = TypedRequestError::Server {
method: "thread/read".to_string(),
source: JSONRPCErrorError {
code: -32603,
data: None,
message: "internal".to_string(),
},
};
assert_eq!(std::error::Error::source(&server).is_some(), false);
let deserialize = TypedRequestError::Deserialize {
method: "thread/start".to_string(),
source: serde_json::from_str::<u32>("\"nope\"")
.expect_err("invalid integer should return deserialize error"),
};
assert_eq!(std::error::Error::source(&deserialize).is_some(), true);
}
#[tokio::test]
async fn next_event_surfaces_lagged_markers() {
let (command_tx, _command_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::channel(1);
let worker_handle = tokio::spawn(async {});
event_tx
.send(InProcessServerEvent::Lagged { skipped: 3 })
.await
.expect("lagged marker should enqueue");
drop(event_tx);
let mut client = InProcessAppServerClient {
command_tx,
event_rx,
worker_handle,
};
let event = timeout(Duration::from_secs(2), client.next_event())
.await
.expect("lagged marker should arrive before timeout");
assert!(matches!(
event,
Some(InProcessServerEvent::Lagged { skipped: 3 })
));
client.shutdown().await.expect("shutdown should complete");
}
#[test]
fn event_requires_delivery_marks_terminal_events() {
assert!(event_requires_delivery(
&InProcessServerEvent::ServerNotification(
codex_app_server_protocol::ServerNotification::TurnCompleted(
codex_app_server_protocol::TurnCompletedNotification {
thread_id: "thread".to_string(),
turn: codex_app_server_protocol::Turn {
id: "turn".to_string(),
items: Vec::new(),
status: codex_app_server_protocol::TurnStatus::Completed,
error: None,
},
}
)
)
));
assert!(event_requires_delivery(
&InProcessServerEvent::LegacyNotification(
codex_app_server_protocol::JSONRPCNotification {
method: "codex/event/turn_aborted".to_string(),
params: None,
}
)
));
assert!(!event_requires_delivery(&InProcessServerEvent::Lagged {
skipped: 1
}));
}
}

View File

@@ -350,10 +350,6 @@
"string",
"null"
]
},
"reloadUserConfig": {
"description": "When true, hot-reload the updated user config into all loaded threads after writing.",
"type": "boolean"
}
},
"required": [

View File

@@ -9943,10 +9943,6 @@
"string",
"null"
]
},
"reloadUserConfig": {
"description": "When true, hot-reload the updated user config into all loaded threads after writing.",
"type": "boolean"
}
},
"required": [

View File

@@ -2962,10 +2962,6 @@
"string",
"null"
]
},
"reloadUserConfig": {
"description": "When true, hot-reload the updated user config into all loaded threads after writing.",
"type": "boolean"
}
},
"required": [

View File

@@ -45,10 +45,6 @@
"string",
"null"
]
},
"reloadUserConfig": {
"description": "When true, hot-reload the updated user config into all loaded threads after writing.",
"type": "boolean"
}
},
"required": [

View File

@@ -7,8 +7,4 @@ export type ConfigBatchWriteParams = { edits: Array<ConfigEdit>,
/**
* Path to the config file to write; defaults to the user's `config.toml` when omitted.
*/
filePath?: string | null, expectedVersion?: string | null,
/**
* When true, hot-reload the updated user config into all loaded threads after writing.
*/
reloadUserConfig?: boolean, };
filePath?: string | null, expectedVersion?: string | null, };

View File

@@ -110,26 +110,6 @@ macro_rules! client_request_definitions {
)*
}
impl ClientRequest {
pub fn id(&self) -> &RequestId {
match self {
$(Self::$variant { request_id, .. } => request_id,)*
}
}
pub fn method(&self) -> String {
serde_json::to_value(self)
.ok()
.and_then(|value| {
value
.get("method")
.and_then(serde_json::Value::as_str)
.map(str::to_owned)
})
.unwrap_or_else(|| "<unknown>".to_string())
}
}
impl crate::experimental_api::ExperimentalApi for ClientRequest {
fn experimental_reason(&self) -> Option<&'static str> {
match self {
@@ -1156,8 +1136,6 @@ mod tests {
request_id: RequestId::Integer(1),
params: None,
};
assert_eq!(request.id(), &RequestId::Integer(1));
assert_eq!(request.method(), "account/rateLimits/read");
assert_eq!(
json!({
"method": "account/rateLimits/read",

View File

@@ -734,9 +734,6 @@ pub struct ConfigBatchWriteParams {
pub file_path: Option<String>,
#[ts(optional = nullable)]
pub expected_version: Option<String>,
/// When true, hot-reload the updated user config into all loaded threads after writing.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub reload_user_config: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]

View File

@@ -169,7 +169,7 @@ Example with notification opt-out:
- `externalAgentConfig/detect` — detect migratable external-agent artifacts with `includeHome` and optional `cwds`; each detected item includes `cwd` (`null` for home).
- `externalAgentConfig/import` — apply selected external-agent migration items by passing explicit `migrationItems` with `cwd` (`null` for home).
- `config/value/write` — write a single config key/value to the user's config.toml on disk.
- `config/batchWrite` — apply multiple config edits atomically to the user's config.toml on disk, with optional `reloadUserConfig: true` to hot-reload loaded threads.
- `config/batchWrite` — apply multiple config edits atomically to the user's config.toml on disk.
- `configRequirements/read` — fetch loaded requirements constraints from `requirements.toml` and/or MDM (or `null` if none are configured), including allow-lists (`allowedApprovalPolicies`, `allowedSandboxModes`, `allowedWebSearchModes`), pinned feature values (`featureRequirements`), `enforceResidency`, and `network` constraints.
### Example: Start or resume a thread

View File

@@ -1,16 +1,6 @@
//! Tracing helpers shared by socket and in-process app-server entry points.
//!
//! The in-process path intentionally reuses the same span shape as JSON-RPC
//! transports so request telemetry stays comparable across stdio, websocket,
//! and embedded callers. [`typed_request_span`] is the in-process counterpart
//! of [`request_span`] and stamps `rpc.transport` as `"in-process"` while
//! deriving client identity from the typed [`ClientRequest`] rather than
//! from a parsed JSON envelope.
use crate::message_processor::ConnectionSessionState;
use crate::outgoing_message::ConnectionId;
use crate::transport::AppServerTransport;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCRequest;
use codex_otel::set_parent_from_context;
@@ -75,51 +65,6 @@ pub(crate) fn request_span(
span
}
/// Builds tracing span metadata for typed in-process requests.
///
/// This mirrors `request_span` semantics while stamping transport as
/// `in-process` and deriving client info either from initialize params or
/// from existing connection session state.
pub(crate) fn typed_request_span(
request: &ClientRequest,
connection_id: ConnectionId,
session: &ConnectionSessionState,
) -> Span {
let method = request.method();
let span = info_span!(
"app_server.request",
otel.kind = "server",
otel.name = method,
rpc.system = "jsonrpc",
rpc.method = method,
rpc.transport = "in-process",
rpc.request_id = ?request.id(),
app_server.connection_id = ?connection_id,
app_server.api_version = "v2",
app_server.client_name = field::Empty,
app_server.client_version = field::Empty,
);
if let Some((client_name, client_version)) = initialize_client_info_from_typed_request(request)
{
span.record("app_server.client_name", client_name);
span.record("app_server.client_version", client_version);
} else {
if let Some(client_name) = session.app_server_client_name.as_deref() {
span.record("app_server.client_name", client_name);
}
if let Some(client_version) = session.client_version.as_deref() {
span.record("app_server.client_version", client_version);
}
}
if let Some(context) = traceparent_context_from_env() {
set_parent_from_context(&span, context);
}
span
}
fn transport_name(transport: AppServerTransport) -> &'static str {
match transport {
AppServerTransport::Stdio => "stdio",
@@ -154,13 +99,3 @@ fn initialize_client_info(request: &JSONRPCRequest) -> Option<InitializeParams>
let params = request.params.clone()?;
serde_json::from_value(params).ok()
}
fn initialize_client_info_from_typed_request(request: &ClientRequest) -> Option<(&str, &str)> {
match request {
ClientRequest::Initialize { params, .. } => Some((
params.client_info.name.as_str(),
params.client_info.version.as_str(),
)),
_ => None,
}
}

View File

@@ -1,6 +1,5 @@
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use async_trait::async_trait;
use codex_app_server_protocol::ConfigBatchWriteParams;
use codex_app_server_protocol::ConfigReadParams;
use codex_app_server_protocol::ConfigReadResponse;
@@ -12,7 +11,6 @@ use codex_app_server_protocol::ConfigWriteResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::NetworkRequirements;
use codex_app_server_protocol::SandboxMode;
use codex_core::ThreadManager;
use codex_core::config::ConfigService;
use codex_core::config::ConfigServiceError;
use codex_core::config_loader::CloudRequirementsLoader;
@@ -21,33 +19,11 @@ use codex_core::config_loader::LoaderOverrides;
use codex_core::config_loader::ResidencyRequirement as CoreResidencyRequirement;
use codex_core::config_loader::SandboxModeRequirement as CoreSandboxModeRequirement;
use codex_protocol::config_types::WebSearchMode;
use codex_protocol::protocol::Op;
use serde_json::json;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use toml::Value as TomlValue;
use tracing::warn;
#[async_trait]
pub(crate) trait UserConfigReloader: Send + Sync {
async fn reload_user_config(&self);
}
#[async_trait]
impl UserConfigReloader for ThreadManager {
async fn reload_user_config(&self) {
let thread_ids = self.list_thread_ids().await;
for thread_id in thread_ids {
let Ok(thread) = self.get_thread(thread_id).await else {
continue;
};
if let Err(err) = thread.submit(Op::ReloadUserConfig).await {
warn!("failed to request user config reload: {err}");
}
}
}
}
#[derive(Clone)]
pub(crate) struct ConfigApi {
@@ -55,7 +31,6 @@ pub(crate) struct ConfigApi {
cli_overrides: Vec<(String, TomlValue)>,
loader_overrides: LoaderOverrides,
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
user_config_reloader: Arc<dyn UserConfigReloader>,
}
impl ConfigApi {
@@ -64,14 +39,12 @@ impl ConfigApi {
cli_overrides: Vec<(String, TomlValue)>,
loader_overrides: LoaderOverrides,
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
user_config_reloader: Arc<dyn UserConfigReloader>,
) -> Self {
Self {
codex_home,
cli_overrides,
loader_overrides,
cloud_requirements,
user_config_reloader,
}
}
@@ -123,16 +96,10 @@ impl ConfigApi {
&self,
params: ConfigBatchWriteParams,
) -> Result<ConfigWriteResponse, JSONRPCErrorError> {
let reload_user_config = params.reload_user_config;
let response = self
.config_service()
self.config_service()
.batch_write(params)
.await
.map_err(map_error)?;
if reload_user_config {
self.user_config_reloader.reload_user_config().await;
}
Ok(response)
.map_err(map_error)
}
}
@@ -232,22 +199,6 @@ mod tests {
use codex_core::config_loader::NetworkRequirementsToml as CoreNetworkRequirementsToml;
use codex_protocol::protocol::AskForApproval as CoreAskForApproval;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use tempfile::TempDir;
#[derive(Default)]
struct RecordingUserConfigReloader {
call_count: AtomicUsize,
}
#[async_trait]
impl UserConfigReloader for RecordingUserConfigReloader {
async fn reload_user_config(&self) {
self.call_count.fetch_add(1, Ordering::Relaxed);
}
}
#[test]
fn map_requirements_toml_to_api_converts_core_enums() {
@@ -352,51 +303,4 @@ mod tests {
Some(vec![WebSearchMode::Disabled])
);
}
#[tokio::test]
async fn batch_write_reloads_user_config_when_requested() {
let codex_home = TempDir::new().expect("create temp dir");
let user_config_path = codex_home.path().join("config.toml");
std::fs::write(&user_config_path, "").expect("write config");
let reloader = Arc::new(RecordingUserConfigReloader::default());
let config_api = ConfigApi::new(
codex_home.path().to_path_buf(),
Vec::new(),
LoaderOverrides::default(),
Arc::new(RwLock::new(CloudRequirementsLoader::default())),
reloader.clone(),
);
let response = config_api
.batch_write(ConfigBatchWriteParams {
edits: vec![codex_app_server_protocol::ConfigEdit {
key_path: "model".to_string(),
value: json!("gpt-5"),
merge_strategy: codex_app_server_protocol::MergeStrategy::Replace,
}],
file_path: Some(user_config_path.display().to_string()),
expected_version: None,
reload_user_config: true,
})
.await
.expect("batch write should succeed");
assert_eq!(
response,
ConfigWriteResponse {
status: codex_app_server_protocol::WriteStatus::Ok,
version: response.version.clone(),
file_path: codex_utils_absolute_path::AbsolutePathBuf::try_from(
user_config_path.clone()
)
.expect("absolute config path"),
overridden_metadata: None,
}
);
assert_eq!(
std::fs::read_to_string(user_config_path).unwrap(),
"model = \"gpt-5\"\n"
);
assert_eq!(reloader.call_count.load(Ordering::Relaxed), 1);
}
}

View File

@@ -1,884 +0,0 @@
//! In-process app-server runtime host for local embedders.
//!
//! This module runs the existing [`MessageProcessor`] and outbound routing logic
//! on Tokio tasks, but replaces socket/stdio transports with bounded in-memory
//! channels. The intent is to preserve app-server semantics while avoiding a
//! process boundary for CLI surfaces that run in the same process.
//!
//! # Lifecycle
//!
//! 1. Construct runtime state with [`InProcessStartArgs`].
//! 2. Call [`start`], which performs the `initialize` / `initialized` handshake
//! internally and returns a ready-to-use [`InProcessClientHandle`].
//! 3. Send requests via [`InProcessClientHandle::request`], notifications via
//! [`InProcessClientHandle::notify`], and consume events via
//! [`InProcessClientHandle::next_event`].
//! 4. Terminate with [`InProcessClientHandle::shutdown`].
//!
//! # Transport model
//!
//! The runtime is transport-local but not protocol-free. Incoming requests are
//! typed [`ClientRequest`] values, yet responses still come back through the
//! same JSON-RPC result envelope that `MessageProcessor` uses for stdio and
//! websocket transports. This keeps in-process behavior aligned with
//! app-server rather than creating a second execution contract.
//!
//! # Backpressure
//!
//! Command submission uses `try_send` and can return `WouldBlock`, while event
//! fanout may drop notifications under saturation. Server requests are never
//! silently abandoned: if they cannot be queued they are failed back into
//! `MessageProcessor` with overload or internal errors so approval flows do
//! not hang indefinitely.
//!
//! # Relationship to `codex-app-server-client`
//!
//! This module provides the low-level runtime handle ([`InProcessClientHandle`]).
//! Higher-level callers (TUI, exec) should go through `codex-app-server-client`,
//! which wraps this module behind a worker task with async request/response
//! helpers, surface-specific startup policy, and bounded shutdown.
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::hash_map::Entry;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use crate::error_code::INTERNAL_ERROR_CODE;
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
use crate::error_code::OVERLOADED_ERROR_CODE;
use crate::message_processor::ConnectionSessionState;
use crate::message_processor::MessageProcessor;
use crate::message_processor::MessageProcessorArgs;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use crate::transport::CHANNEL_CAPACITY;
use crate::transport::OutboundConnectionState;
use crate::transport::route_outgoing_envelope;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_arg0::Arg0DispatchPaths;
use codex_core::config::Config;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::time::timeout;
use toml::Value as TomlValue;
use tracing::warn;
const IN_PROCESS_CONNECTION_ID: ConnectionId = ConnectionId(0);
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Default bounded channel capacity for in-process runtime queues.
pub const DEFAULT_IN_PROCESS_CHANNEL_CAPACITY: usize = CHANNEL_CAPACITY;
type PendingClientRequestResponse = std::result::Result<Result, JSONRPCErrorError>;
fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
matches!(notification, ServerNotification::TurnCompleted(_))
}
fn legacy_notification_requires_delivery(notification: &JSONRPCNotification) -> bool {
matches!(
notification
.method
.strip_prefix("codex/event/")
.unwrap_or(&notification.method),
"task_complete" | "turn_aborted" | "shutdown_complete"
)
}
/// Input needed to start an in-process app-server runtime.
///
/// These fields mirror the pieces of ambient process state that stdio and
/// websocket transports normally assemble before `MessageProcessor` starts.
#[derive(Clone)]
pub struct InProcessStartArgs {
/// Resolved argv0 dispatch paths used by command execution internals.
pub arg0_paths: Arg0DispatchPaths,
/// Shared base config used to initialize core components.
pub config: Arc<Config>,
/// CLI config overrides that are already parsed into TOML values.
pub cli_overrides: Vec<(String, TomlValue)>,
/// Loader override knobs used by config API paths.
pub loader_overrides: LoaderOverrides,
/// Preloaded cloud requirements provider.
pub cloud_requirements: CloudRequirementsLoader,
/// Feedback sink used by app-server/core telemetry and logs.
pub feedback: CodexFeedback,
/// Startup warnings emitted after initialize succeeds.
pub config_warnings: Vec<ConfigWarningNotification>,
/// Session source stamped into thread/session metadata.
pub session_source: SessionSource,
/// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
pub enable_codex_api_key_env: bool,
/// Initialize params used for initial handshake.
pub initialize: InitializeParams,
/// Capacity used for all runtime queues (clamped to at least 1).
pub channel_capacity: usize,
}
/// Event emitted from the app-server to the in-process client.
///
/// The stream carries three event families because CLI surfaces are mid-migration
/// from the legacy `codex_protocol::Event` model to the typed app-server
/// notification model. Once all surfaces consume only [`ServerNotification`],
/// [`LegacyNotification`](Self::LegacyNotification) can be removed.
///
/// [`Lagged`](Self::Lagged) is a transport health marker, not an application
/// event — it signals that the consumer fell behind and some events were dropped.
#[derive(Debug, Clone)]
pub enum InProcessServerEvent {
/// Server request that requires client response/rejection.
ServerRequest(ServerRequest),
/// App-server notification directed to the embedded client.
ServerNotification(ServerNotification),
/// Legacy JSON-RPC notification from core event bridge.
LegacyNotification(JSONRPCNotification),
/// Indicates one or more events were dropped due to backpressure.
Lagged { skipped: usize },
}
/// Internal message sent from [`InProcessClientHandle`] methods to the runtime task.
///
/// Requests carry a oneshot sender for the response; notifications and server-request
/// replies are fire-and-forget from the caller's perspective (transport errors are
/// caught by `try_send` on the outer channel).
enum InProcessClientMessage {
Request {
request: Box<ClientRequest>,
response_tx: oneshot::Sender<PendingClientRequestResponse>,
},
Notification {
notification: ClientNotification,
},
ServerRequestResponse {
request_id: RequestId,
result: Result,
},
ServerRequestError {
request_id: RequestId,
error: JSONRPCErrorError,
},
Shutdown {
done_tx: oneshot::Sender<()>,
},
}
enum ProcessorCommand {
Request(Box<ClientRequest>),
Notification(ClientNotification),
}
#[derive(Clone)]
pub struct InProcessClientSender {
client_tx: mpsc::Sender<InProcessClientMessage>,
}
impl InProcessClientSender {
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
let (response_tx, response_rx) = oneshot::channel();
self.try_send_client_message(InProcessClientMessage::Request {
request: Box::new(request),
response_tx,
})?;
response_rx.await.map_err(|err| {
IoError::new(
ErrorKind::BrokenPipe,
format!("in-process request response channel closed: {err}"),
)
})
}
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
self.try_send_client_message(InProcessClientMessage::Notification { notification })
}
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
self.try_send_client_message(InProcessClientMessage::ServerRequestResponse {
request_id,
result,
})
}
pub fn fail_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
self.try_send_client_message(InProcessClientMessage::ServerRequestError {
request_id,
error,
})
}
fn try_send_client_message(&self, message: InProcessClientMessage) -> IoResult<()> {
match self.client_tx.try_send(message) {
Ok(()) => Ok(()),
Err(mpsc::error::TrySendError::Full(_)) => Err(IoError::new(
ErrorKind::WouldBlock,
"in-process app-server client queue is full",
)),
Err(mpsc::error::TrySendError::Closed(_)) => Err(IoError::new(
ErrorKind::BrokenPipe,
"in-process app-server runtime is closed",
)),
}
}
}
/// Handle used by an in-process client to call app-server and consume events.
///
/// This is the low-level runtime handle. Higher-level callers should usually go
/// through `codex-app-server-client`, which adds worker-task buffering,
/// request/response helpers, and surface-specific startup policy.
pub struct InProcessClientHandle {
client: InProcessClientSender,
event_rx: mpsc::Receiver<InProcessServerEvent>,
runtime_handle: tokio::task::JoinHandle<()>,
}
impl InProcessClientHandle {
/// Sends a typed client request into the in-process runtime.
///
/// The returned value is a transport-level `IoResult` containing either a
/// JSON-RPC success payload or JSON-RPC error payload. Callers must keep
/// request IDs unique among concurrent requests; reusing an in-flight ID
/// produces an `INVALID_REQUEST` response and can make request routing
/// ambiguous in the caller.
pub async fn request(&self, request: ClientRequest) -> IoResult<PendingClientRequestResponse> {
self.client.request(request).await
}
/// Sends a typed client notification into the in-process runtime.
///
/// Notifications do not have an application-level response. Transport
/// errors indicate queue saturation or closed runtime.
pub fn notify(&self, notification: ClientNotification) -> IoResult<()> {
self.client.notify(notification)
}
/// Resolves a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
///
/// This should be used only with request IDs received from the current
/// runtime event stream; sending arbitrary IDs has no effect on app-server
/// state and can mask a stuck approval flow in the caller.
pub fn respond_to_server_request(&self, request_id: RequestId, result: Result) -> IoResult<()> {
self.client.respond_to_server_request(request_id, result)
}
/// Rejects a pending [`ServerRequest`](InProcessServerEvent::ServerRequest).
///
/// Use this when the embedder cannot satisfy a server request; leaving
/// requests unanswered can stall turn progress.
pub fn fail_server_request(
&self,
request_id: RequestId,
error: JSONRPCErrorError,
) -> IoResult<()> {
self.client.fail_server_request(request_id, error)
}
/// Receives the next server event from the in-process runtime.
///
/// Returns `None` when the runtime task exits and no more events are
/// available.
pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
self.event_rx.recv().await
}
/// Requests runtime shutdown and waits for worker termination.
///
/// Shutdown is bounded by internal timeouts and may abort background tasks
/// if graceful drain does not complete in time.
pub async fn shutdown(self) -> IoResult<()> {
let mut runtime_handle = self.runtime_handle;
let (done_tx, done_rx) = oneshot::channel();
if self
.client
.client_tx
.send(InProcessClientMessage::Shutdown { done_tx })
.await
.is_ok()
{
let _ = timeout(SHUTDOWN_TIMEOUT, done_rx).await;
}
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut runtime_handle).await {
runtime_handle.abort();
let _ = runtime_handle.await;
}
Ok(())
}
pub fn sender(&self) -> InProcessClientSender {
self.client.clone()
}
}
/// Starts an in-process app-server runtime and performs initialize handshake.
///
/// This function sends `initialize` followed by `initialized` before returning
/// the handle, so callers receive a ready-to-use runtime. If initialize fails,
/// the runtime is shut down and an `InvalidData` error is returned.
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
let initialize = args.initialize.clone();
let client = start_uninitialized(args);
let initialize_response = client
.request(ClientRequest::Initialize {
request_id: RequestId::Integer(0),
params: initialize,
})
.await?;
if let Err(error) = initialize_response {
let _ = client.shutdown().await;
return Err(IoError::new(
ErrorKind::InvalidData,
format!("in-process initialize failed: {}", error.message),
));
}
client.notify(ClientNotification::Initialized)?;
Ok(client)
}
fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
let channel_capacity = args.channel_capacity.max(1);
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
let runtime_handle = tokio::spawn(async move {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let (writer_tx, mut writer_rx) = mpsc::channel::<OutgoingMessage>(channel_capacity);
let outbound_initialized = Arc::new(AtomicBool::new(false));
let outbound_experimental_api_enabled = Arc::new(AtomicBool::new(false));
let outbound_opted_out_notification_methods = Arc::new(RwLock::new(HashSet::new()));
let mut outbound_connections = HashMap::<ConnectionId, OutboundConnectionState>::new();
outbound_connections.insert(
IN_PROCESS_CONNECTION_ID,
OutboundConnectionState::new(
writer_tx,
Arc::clone(&outbound_initialized),
Arc::clone(&outbound_experimental_api_enabled),
Arc::clone(&outbound_opted_out_notification_methods),
None,
),
);
let mut outbound_handle = tokio::spawn(async move {
while let Some(envelope) = outgoing_rx.recv().await {
route_outgoing_envelope(&mut outbound_connections, envelope).await;
}
});
let processor_outgoing = Arc::clone(&outgoing_message_sender);
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
let mut processor_handle = tokio::spawn(async move {
let mut processor = MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
arg0_paths: args.arg0_paths,
config: args.config,
cli_overrides: args.cli_overrides,
loader_overrides: args.loader_overrides,
cloud_requirements: args.cloud_requirements,
feedback: args.feedback,
log_db: None,
config_warnings: args.config_warnings,
session_source: args.session_source,
enable_codex_api_key_env: args.enable_codex_api_key_env,
});
let mut thread_created_rx = processor.thread_created_receiver();
let mut session = ConnectionSessionState::default();
let mut listen_for_threads = true;
loop {
tokio::select! {
command = processor_rx.recv() => {
match command {
Some(ProcessorCommand::Request(request)) => {
let was_initialized = session.initialized;
processor
.process_client_request(
IN_PROCESS_CONNECTION_ID,
*request,
&mut session,
&outbound_initialized,
)
.await;
if let Ok(mut opted_out_notification_methods) =
outbound_opted_out_notification_methods.write()
{
*opted_out_notification_methods =
session.opted_out_notification_methods.clone();
} else {
warn!("failed to update outbound opted-out notifications");
}
outbound_experimental_api_enabled.store(
session.experimental_api_enabled,
Ordering::Release,
);
if !was_initialized && session.initialized {
processor.send_initialize_notifications().await;
}
}
Some(ProcessorCommand::Notification(notification)) => {
processor.process_client_notification(notification).await;
}
None => {
break;
}
}
}
created = thread_created_rx.recv(), if listen_for_threads => {
match created {
Ok(thread_id) => {
let connection_ids = if session.initialized {
vec![IN_PROCESS_CONNECTION_ID]
} else {
Vec::<ConnectionId>::new()
};
processor
.try_attach_thread_listener(thread_id, connection_ids)
.await;
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
warn!("thread_created receiver lagged; skipping resync");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
listen_for_threads = false;
}
}
}
}
}
processor.connection_closed(IN_PROCESS_CONNECTION_ID).await;
});
let mut pending_request_responses =
HashMap::<RequestId, oneshot::Sender<PendingClientRequestResponse>>::new();
let mut shutdown_ack = None;
loop {
tokio::select! {
message = client_rx.recv() => {
match message {
Some(InProcessClientMessage::Request { request, response_tx }) => {
let request = *request;
let request_id = request.id().clone();
match pending_request_responses.entry(request_id.clone()) {
Entry::Vacant(entry) => {
entry.insert(response_tx);
}
Entry::Occupied(_) => {
let _ = response_tx.send(Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("duplicate request id: {request_id:?}"),
data: None,
}));
continue;
}
}
match processor_tx.try_send(ProcessorCommand::Request(Box::new(request))) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
if let Some(response_tx) =
pending_request_responses.remove(&request_id)
{
let _ = response_tx.send(Err(JSONRPCErrorError {
code: OVERLOADED_ERROR_CODE,
message: "in-process app-server request queue is full"
.to_string(),
data: None,
}));
}
}
Err(mpsc::error::TrySendError::Closed(_)) => {
if let Some(response_tx) =
pending_request_responses.remove(&request_id)
{
let _ = response_tx.send(Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message:
"in-process app-server request processor is closed"
.to_string(),
data: None,
}));
}
break;
}
}
}
Some(InProcessClientMessage::Notification { notification }) => {
match processor_tx.try_send(ProcessorCommand::Notification(notification)) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
warn!("dropping in-process client notification (queue full)");
}
Err(mpsc::error::TrySendError::Closed(_)) => {
break;
}
}
}
Some(InProcessClientMessage::ServerRequestResponse { request_id, result }) => {
outgoing_message_sender
.notify_client_response(request_id, result)
.await;
}
Some(InProcessClientMessage::ServerRequestError { request_id, error }) => {
outgoing_message_sender
.notify_client_error(request_id, error)
.await;
}
Some(InProcessClientMessage::Shutdown { done_tx }) => {
shutdown_ack = Some(done_tx);
break;
}
None => {
break;
}
}
}
outgoing_message = writer_rx.recv() => {
let Some(outgoing_message) = outgoing_message else {
break;
};
match outgoing_message {
OutgoingMessage::Response(response) => {
if let Some(response_tx) = pending_request_responses.remove(&response.id) {
let _ = response_tx.send(Ok(response.result));
} else {
warn!(
request_id = ?response.id,
"dropping unmatched in-process response"
);
}
}
OutgoingMessage::Error(error) => {
if let Some(response_tx) = pending_request_responses.remove(&error.id) {
let _ = response_tx.send(Err(error.error));
} else {
warn!(
request_id = ?error.id,
"dropping unmatched in-process error response"
);
}
}
OutgoingMessage::Request(request) => {
// Send directly to avoid cloning; on failure the
// original value is returned inside the error.
if let Err(send_error) = event_tx
.try_send(InProcessServerEvent::ServerRequest(request))
{
let (code, message, inner) = match send_error {
mpsc::error::TrySendError::Full(inner) => (
OVERLOADED_ERROR_CODE,
"in-process server request queue is full",
inner,
),
mpsc::error::TrySendError::Closed(inner) => (
INTERNAL_ERROR_CODE,
"in-process server request consumer is closed",
inner,
),
};
let request_id = match inner {
InProcessServerEvent::ServerRequest(req) => req.id().clone(),
_ => unreachable!("we just sent a ServerRequest variant"),
};
outgoing_message_sender
.notify_client_error(
request_id,
JSONRPCErrorError {
code,
message: message.to_string(),
data: None,
},
)
.await;
}
}
OutgoingMessage::AppServerNotification(notification) => {
if server_notification_requires_delivery(&notification) {
if event_tx
.send(InProcessServerEvent::ServerNotification(notification))
.await
.is_err()
{
break;
}
} else if let Err(send_error) =
event_tx.try_send(InProcessServerEvent::ServerNotification(notification))
{
match send_error {
mpsc::error::TrySendError::Full(_) => {
warn!("dropping in-process server notification (queue full)");
}
mpsc::error::TrySendError::Closed(_) => {
break;
}
}
}
}
OutgoingMessage::Notification(notification) => {
let notification = JSONRPCNotification {
method: notification.method,
params: notification.params,
};
if legacy_notification_requires_delivery(&notification) {
if event_tx
.send(InProcessServerEvent::LegacyNotification(notification))
.await
.is_err()
{
break;
}
} else if let Err(send_error) =
event_tx.try_send(InProcessServerEvent::LegacyNotification(notification))
{
match send_error {
mpsc::error::TrySendError::Full(_) => {
warn!("dropping in-process legacy notification (queue full)");
}
mpsc::error::TrySendError::Closed(_) => {
break;
}
}
}
}
}
}
}
}
drop(writer_rx);
drop(processor_tx);
outgoing_message_sender
.cancel_all_requests(Some(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "in-process app-server runtime is shutting down".to_string(),
data: None,
}))
.await;
// Drop the runtime's last sender before awaiting the router task so
// `outgoing_rx.recv()` can observe channel closure and exit cleanly.
drop(outgoing_message_sender);
for (_, response_tx) in pending_request_responses {
let _ = response_tx.send(Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "in-process app-server runtime is shutting down".to_string(),
data: None,
}));
}
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut processor_handle).await {
processor_handle.abort();
let _ = processor_handle.await;
}
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut outbound_handle).await {
outbound_handle.abort();
let _ = outbound_handle.await;
}
if let Some(done_tx) = shutdown_ack {
let _ = done_tx.send(());
}
});
InProcessClientHandle {
client: InProcessClientSender { client_tx },
event_rx,
runtime_handle,
}
}
#[cfg(test)]
mod tests {
use super::*;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ConfigRequirementsReadResponse;
use codex_app_server_protocol::SessionSource as ApiSessionSource;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnCompletedNotification;
use codex_app_server_protocol::TurnStatus;
use codex_core::config::ConfigBuilder;
use pretty_assertions::assert_eq;
async fn build_test_config() -> Config {
match ConfigBuilder::default().build().await {
Ok(config) => config,
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
.expect("default config should load"),
}
}
async fn start_test_client_with_capacity(
session_source: SessionSource,
channel_capacity: usize,
) -> InProcessClientHandle {
let args = InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(build_test_config().await),
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
feedback: CodexFeedback::new(),
config_warnings: Vec::new(),
session_source,
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-in-process-test".to_string(),
title: None,
version: "0.0.0".to_string(),
},
capabilities: None,
},
channel_capacity,
};
start(args).await.expect("in-process runtime should start")
}
async fn start_test_client(session_source: SessionSource) -> InProcessClientHandle {
start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
}
#[tokio::test]
async fn in_process_start_initializes_and_handles_typed_v2_request() {
let client = start_test_client(SessionSource::Cli).await;
let response = client
.request(ClientRequest::ConfigRequirementsRead {
request_id: RequestId::Integer(1),
params: None,
})
.await
.expect("request transport should work")
.expect("request should succeed");
assert!(response.is_object());
let _parsed: ConfigRequirementsReadResponse =
serde_json::from_value(response).expect("response should match v2 schema");
client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}
#[tokio::test]
async fn in_process_start_uses_requested_session_source_for_thread_start() {
for (requested_source, expected_source) in [
(SessionSource::Cli, ApiSessionSource::Cli),
(SessionSource::Exec, ApiSessionSource::Exec),
] {
let client = start_test_client(requested_source).await;
let response = client
.request(ClientRequest::ThreadStart {
request_id: RequestId::Integer(2),
params: ThreadStartParams {
ephemeral: Some(true),
..ThreadStartParams::default()
},
})
.await
.expect("request transport should work")
.expect("thread/start should succeed");
let parsed: ThreadStartResponse =
serde_json::from_value(response).expect("thread/start response should parse");
assert_eq!(parsed.thread.source, expected_source);
client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}
}
#[tokio::test]
async fn in_process_start_clamps_zero_channel_capacity() {
let client = start_test_client_with_capacity(SessionSource::Cli, 0).await;
let response = loop {
match client
.request(ClientRequest::ConfigRequirementsRead {
request_id: RequestId::Integer(4),
params: None,
})
.await
{
Ok(response) => break response.expect("request should succeed"),
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
tokio::task::yield_now().await;
}
Err(err) => panic!("request transport should work: {err}"),
}
};
let _parsed: ConfigRequirementsReadResponse =
serde_json::from_value(response).expect("response should match v2 schema");
client
.shutdown()
.await
.expect("in-process runtime should shutdown cleanly");
}
#[test]
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
assert!(server_notification_requires_delivery(
&ServerNotification::TurnCompleted(TurnCompletedNotification {
thread_id: "thread-1".to_string(),
turn: Turn {
id: "turn-1".to_string(),
items: Vec::new(),
status: TurnStatus::Completed,
error: None,
},
})
));
assert!(legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/task_complete".to_string(),
params: None,
}
));
assert!(legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/turn_aborted".to_string(),
params: None,
}
));
assert!(legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/shutdown_complete".to_string(),
params: None,
}
));
assert!(!legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/item_started".to_string(),
params: None,
}
));
}
}

View File

@@ -39,7 +39,6 @@ use codex_core::check_execpolicy_for_warnings;
use codex_core::config_loader::ConfigLoadError;
use codex_core::config_loader::TextRange as CoreTextRange;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use codex_state::log_db;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
@@ -66,7 +65,6 @@ mod error_code;
mod external_agent_config_api;
mod filters;
mod fuzzy_file_search;
pub mod in_process;
mod message_processor;
mod models;
mod outgoing_message;
@@ -599,8 +597,6 @@ pub async fn run_main_with_transport(
feedback: feedback.clone(),
log_db,
config_warnings,
session_source: SessionSource::VSCode,
enable_codex_api_key_env: false,
});
let mut thread_created_rx = processor.thread_created_receiver();
let mut running_turn_count_rx = processor.subscribe_running_assistant_turn_count();

View File

@@ -18,7 +18,6 @@ use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
use codex_app_server_protocol::ChatgptAuthTokensRefreshReason;
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigBatchWriteParams;
use codex_app_server_protocol::ConfigReadParams;
@@ -158,8 +157,6 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
pub(crate) session_source: SessionSource,
pub(crate) enable_codex_api_key_env: bool,
}
impl MessageProcessor {
@@ -176,12 +173,10 @@ impl MessageProcessor {
feedback,
log_db,
config_warnings,
session_source,
enable_codex_api_key_env,
} = args;
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
enable_codex_api_key_env,
false,
config.cli_auth_credentials_store_mode,
);
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
@@ -191,7 +186,7 @@ impl MessageProcessor {
let thread_manager = Arc::new(ThreadManager::new(
config.codex_home.clone(),
auth_manager.clone(),
session_source,
SessionSource::VSCode,
config.model_catalog.clone(),
CollaborationModesConfig {
default_mode_request_user_input: config
@@ -205,7 +200,7 @@ impl MessageProcessor {
let cloud_requirements = Arc::new(RwLock::new(cloud_requirements));
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager,
thread_manager: Arc::clone(&thread_manager),
thread_manager,
outgoing: outgoing.clone(),
arg0_paths,
config: Arc::clone(&config),
@@ -219,7 +214,6 @@ impl MessageProcessor {
cli_overrides,
loader_overrides,
cloud_requirements,
thread_manager,
);
let external_agent_config_api = ExternalAgentConfigApi::new(config.codex_home.clone());
@@ -280,50 +274,187 @@ impl MessageProcessor {
}
};
self.handle_client_request(
connection_id,
request_id,
codex_request,
session,
outbound_initialized,
)
.await;
}
.instrument(request_span)
.await;
}
match codex_request {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
// itself with the `initialized` bool.
ClientRequest::Initialize { request_id, params } => {
let request_id = ConnectionRequestId {
connection_id,
request_id,
};
if session.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
} else {
// TODO(maxj): Revisit capability scoping for `experimental_api_enabled`.
// Current behavior is per-connection. Reviewer feedback notes this can
// create odd cross-client behavior (for example dynamic tool calls on a
// shared thread when another connected client did not opt into
// experimental API). Proposed direction is instance-global first-write-wins
// with initialize-time mismatch rejection.
let (experimental_api_enabled, opt_out_notification_methods) =
match params.capabilities {
Some(capabilities) => (
capabilities.experimental_api,
capabilities
.opt_out_notification_methods
.unwrap_or_default(),
),
None => (false, Vec::new()),
};
session.experimental_api_enabled = experimental_api_enabled;
session.opted_out_notification_methods =
opt_out_notification_methods.into_iter().collect();
let ClientInfo {
name,
title: _title,
version,
} = params.client_info;
session.app_server_client_name = Some(name.clone());
session.client_version = Some(version.clone());
if let Err(error) = set_default_originator(name.clone()) {
match error {
SetOriginatorError::InvalidHeaderValue => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
),
data: None,
};
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
SetOriginatorError::AlreadyInitialized => {
// No-op. This is expected to happen if the originator is already set via env var.
// TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE,
// this will be an unexpected state and we can return a JSON-RPC error indicating
// internal server error.
}
}
}
set_default_client_residency_requirement(self.config.enforce_residency.value());
let user_agent_suffix = format!("{name}; {version}");
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
*suffix = Some(user_agent_suffix);
}
/// Handles a typed request path used by in-process embedders.
///
/// This bypasses JSON request deserialization but keeps identical request
/// semantics by delegating to `handle_client_request`.
pub(crate) async fn process_client_request(
&mut self,
connection_id: ConnectionId,
request: ClientRequest,
session: &mut ConnectionSessionState,
outbound_initialized: &AtomicBool,
) {
let request_span =
crate::app_server_tracing::typed_request_span(&request, connection_id, session);
async {
let request_id = ConnectionRequestId {
connection_id,
request_id: request.id().clone(),
};
tracing::trace!(
?connection_id,
request_id = ?request_id.request_id,
"app-server typed request"
);
self.handle_client_request(
connection_id,
request_id,
request,
session,
outbound_initialized,
)
.await;
let user_agent = get_codex_user_agent();
let response = InitializeResponse { user_agent };
self.outgoing.send_response(request_id, response).await;
session.initialized = true;
outbound_initialized.store(true, Ordering::Release);
self.codex_message_processor
.connection_initialized(connection_id)
.await;
return;
}
}
_ => {
if !session.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Not initialized".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
}
}
if let Some(reason) = codex_request.experimental_reason()
&& !session.experimental_api_enabled
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: experimental_required_message(reason),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {
self.handle_config_read(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
self.handle_external_agent_config_detect(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
self.handle_external_agent_config_import(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigValueWrite { request_id, params } => {
self.handle_config_value_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigBatchWrite { request_id, params } => {
self.handle_config_batch_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigRequirementsRead {
request_id,
params: _,
} => {
self.handle_config_requirements_read(ConnectionRequestId {
connection_id,
request_id,
})
.await;
}
other => {
// Box the delegated future so this wrapper's async state machine does not
// inline the full `CodexMessageProcessor::process_request` future, which
// can otherwise push worker-thread stack usage over the edge.
self.codex_message_processor
.process_request(
connection_id,
other,
session.app_server_client_name.clone(),
)
.boxed()
.await;
}
}
}
.instrument(request_span)
.await;
@@ -335,13 +466,6 @@ impl MessageProcessor {
tracing::info!("<- notification: {:?}", notification);
}
/// Handles typed notifications from in-process clients.
pub(crate) async fn process_client_notification(&self, notification: ClientNotification) {
// Currently, we do not expect to receive any typed notifications from
// in-process clients, so we just log them.
tracing::info!("<- typed notification: {:?}", notification);
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.codex_message_processor.thread_created_receiver()
}
@@ -388,193 +512,6 @@ impl MessageProcessor {
self.outgoing.notify_client_error(err.id, err.error).await;
}
async fn handle_client_request(
&mut self,
connection_id: ConnectionId,
request_id: ConnectionRequestId,
codex_request: ClientRequest,
session: &mut ConnectionSessionState,
outbound_initialized: &AtomicBool,
) {
match codex_request {
// Handle Initialize internally so CodexMessageProcessor does not have to concern
// itself with the `initialized` bool.
ClientRequest::Initialize { request_id, params } => {
let request_id = ConnectionRequestId {
connection_id,
request_id,
};
if session.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Already initialized".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
// TODO(maxj): Revisit capability scoping for `experimental_api_enabled`.
// Current behavior is per-connection. Reviewer feedback notes this can
// create odd cross-client behavior (for example dynamic tool calls on a
// shared thread when another connected client did not opt into
// experimental API). Proposed direction is instance-global first-write-wins
// with initialize-time mismatch rejection.
let (experimental_api_enabled, opt_out_notification_methods) =
match params.capabilities {
Some(capabilities) => (
capabilities.experimental_api,
capabilities
.opt_out_notification_methods
.unwrap_or_default(),
),
None => (false, Vec::new()),
};
session.experimental_api_enabled = experimental_api_enabled;
session.opted_out_notification_methods =
opt_out_notification_methods.into_iter().collect();
let ClientInfo {
name,
title: _title,
version,
} = params.client_info;
session.app_server_client_name = Some(name.clone());
session.client_version = Some(version.clone());
if let Err(error) = set_default_originator(name.clone()) {
match error {
SetOriginatorError::InvalidHeaderValue => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"Invalid clientInfo.name: '{name}'. Must be a valid HTTP header value."
),
data: None,
};
self.outgoing.send_error(request_id.clone(), error).await;
return;
}
SetOriginatorError::AlreadyInitialized => {
// No-op. This is expected to happen if the originator is already set via env var.
// TODO(owen): Once we remove support for CODEX_INTERNAL_ORIGINATOR_OVERRIDE,
// this will be an unexpected state and we can return a JSON-RPC error indicating
// internal server error.
}
}
}
set_default_client_residency_requirement(self.config.enforce_residency.value());
let user_agent_suffix = format!("{name}; {version}");
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
*suffix = Some(user_agent_suffix);
}
let user_agent = get_codex_user_agent();
let response = InitializeResponse { user_agent };
self.outgoing.send_response(request_id, response).await;
session.initialized = true;
outbound_initialized.store(true, Ordering::Release);
self.codex_message_processor
.connection_initialized(connection_id)
.await;
return;
}
_ => {
if !session.initialized {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "Not initialized".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
}
}
if let Some(reason) = codex_request.experimental_reason()
&& !session.experimental_api_enabled
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: experimental_required_message(reason),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {
self.handle_config_read(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
self.handle_external_agent_config_detect(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
self.handle_external_agent_config_import(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigValueWrite { request_id, params } => {
self.handle_config_value_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigBatchWrite { request_id, params } => {
self.handle_config_batch_write(
ConnectionRequestId {
connection_id,
request_id,
},
params,
)
.await;
}
ClientRequest::ConfigRequirementsRead {
request_id,
params: _,
} => {
self.handle_config_requirements_read(ConnectionRequestId {
connection_id,
request_id,
})
.await;
}
other => {
// Box the delegated future so this wrapper's async state machine does not
// inline the full `CodexMessageProcessor::process_request` future, which
// can otherwise push worker-thread stack usage over the edge.
self.codex_message_processor
.process_request(connection_id, other, session.app_server_client_name.clone())
.boxed()
.await;
}
}
}
async fn handle_config_read(&self, request_id: ConnectionRequestId, params: ConfigReadParams) {
match self.config_api.read(params).await {
Ok(response) => self.outgoing.send_response(request_id, response).await,

View File

@@ -273,25 +273,6 @@ impl OutgoingMessageSender {
self.take_request_callback(id).await.is_some()
}
pub(crate) async fn cancel_all_requests(&self, error: Option<JSONRPCErrorError>) {
let entries = {
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
request_id_to_callback
.drain()
.map(|(_, entry)| entry)
.collect::<Vec<_>>()
};
if let Some(error) = error {
for entry in entries {
if let Err(err) = entry.callback.send(Err(error.clone())) {
let request_id = entry.request.id();
warn!("could not notify callback for {request_id:?} due to: {err:?}");
}
}
}
}
async fn take_request_callback(
&self,
id: &RequestId,

View File

@@ -605,7 +605,6 @@ async fn config_batch_write_applies_multiple_edits() -> Result<()> {
},
],
expected_version: None,
reload_user_config: false,
})
.await?;
let batch_resp: JSONRPCResponse = timeout(

View File

@@ -53,6 +53,9 @@ enum WsCommand {
message: Message,
tx_result: oneshot::Sender<Result<(), WsError>>,
},
Close {
tx_result: oneshot::Sender<Result<(), WsError>>,
},
}
impl WsStream {
@@ -77,6 +80,11 @@ impl WsStream {
break;
}
}
WsCommand::Close { tx_result } => {
let result = inner.close(None).await;
let _ = tx_result.send(result);
break;
}
}
}
message = inner.next() => {
@@ -136,6 +144,11 @@ impl WsStream {
.await
}
async fn close(&self) -> Result<(), WsError> {
self.request(|tx_result| WsCommand::Close { tx_result })
.await
}
async fn next(&mut self) -> Option<Result<Message, WsError>> {
self.rx_message.recv().await
}
@@ -229,32 +242,26 @@ impl ResponsesWebsocketConnection {
.await;
}
let mut guard = stream.lock().await;
let result = {
let Some(ws_stream) = guard.as_mut() else {
let _ = tx_event
.send(Err(ApiError::Stream(
"websocket connection is closed".to_string(),
)))
.await;
return;
};
run_websocket_response_stream(
ws_stream,
tx_event.clone(),
request_body,
idle_timeout,
telemetry,
)
.await
let Some(ws_stream) = guard.as_mut() else {
let _ = tx_event
.send(Err(ApiError::Stream(
"websocket connection is closed".to_string(),
)))
.await;
return;
};
if let Err(err) = result {
// A terminal stream error should reach the caller immediately. Waiting for a
// graceful close handshake here can stall indefinitely and mask the error.
let failed_stream = guard.take();
drop(guard);
drop(failed_stream);
if let Err(err) = run_websocket_response_stream(
ws_stream,
tx_event.clone(),
request_body,
idle_timeout,
telemetry,
)
.await
{
let _ = ws_stream.close().await;
*guard = None;
let _ = tx_event.send(Err(err)).await;
}
});

View File

@@ -201,8 +201,13 @@ impl Session {
}
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
for task in self.take_all_running_tasks().await {
self.handle_task_abort(task, reason.clone()).await;
if let Some(mut active_turn) = self.take_active_turn().await {
for task in active_turn.drain_tasks() {
self.handle_task_abort(task, reason.clone()).await;
}
// Let interrupted tasks observe cancellation before dropping pending approvals, or an
// in-flight approval wait can surface as a model-visible rejection before TurnAborted.
active_turn.clear_pending().await;
}
if reason == TurnAbortReason::Interrupted {
self.close_unified_exec_processes().await;
@@ -342,16 +347,9 @@ impl Session {
*active = Some(turn);
}
async fn take_all_running_tasks(&self) -> Vec<RunningTask> {
async fn take_active_turn(&self) -> Option<ActiveTurn> {
let mut active = self.active_turn.lock().await;
match active.take() {
Some(mut at) => {
at.clear_pending().await;
at.drain_tasks()
}
None => Vec::new(),
}
active.take()
}
pub(crate) async fn close_unified_exec_processes(&self) {

View File

@@ -349,7 +349,7 @@ async fn shell_output_for_freeform_tool_records_duration(
let test = builder.build(&server).await?;
let call_id = "shell-structured";
let responses = shell_responses(call_id, vec!["/bin/sh", "-c", "sleep 0.2"], output_type)?;
let responses = shell_responses(call_id, vec!["/bin/sh", "-c", "sleep 1"], output_type)?;
let mock = mount_sse_sequence(&server, responses).await;
test.submit_turn_with_policy(
@@ -381,7 +381,7 @@ $"#;
.and_then(|value| value.as_str().parse::<f32>().ok())
.expect("expected structured shell output to contain wall time seconds");
assert!(
wall_time_seconds > 0.1,
wall_time_seconds > 0.5,
"expected wall time to be greater than zero seconds, got {wall_time_seconds}"
);
@@ -740,7 +740,6 @@ async fn shell_command_output_is_freeform() -> Result<()> {
let call_id = "shell-command";
let args = json!({
"command": "echo shell command",
"login": false,
"timeout_ms": 1_000,
});
let responses = vec![
@@ -792,7 +791,6 @@ async fn shell_command_output_is_not_truncated_under_10k_bytes() -> Result<()> {
let call_id = "shell-command";
let args = json!({
"command": "perl -e 'print \"1\" x 10000'",
"login": false,
"timeout_ms": 1000,
});
let responses = vec![
@@ -843,7 +841,6 @@ async fn shell_command_output_is_not_truncated_over_10k_bytes() -> Result<()> {
let call_id = "shell-command";
let args = json!({
"command": "perl -e 'print \"1\" x 10001'",
"login": false,
"timeout_ms": 1000,
});
let responses = vec![

View File

@@ -19,11 +19,8 @@ workspace = true
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
codex-arg0 = { workspace = true }
codex-app-server-client = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-cloud-requirements = { workspace = true }
codex-core = { workspace = true }
codex-feedback = { workspace = true }
codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-absolute-path = { workspace = true }

File diff suppressed because it is too large Load Diff