mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Problem: The TUI still depended on `codex-core` directly in a number of places, and we had no enforcement from keeping this problem from getting worse. Solution: Route TUI core access through `codex-app-server-client::legacy_core`, add CI enforcement for that boundary, and re-export this legacy bridge inside the TUI as `crate::legacy_core` so the remaining call sites stay readable. There is no functional change in this PR — just changes to import targets. Over time, we can whittle away at the remaining symbols in this legacy namespace with the eventual goal of removing them all. In the meantime, this linter rule will prevent us from inadvertently importing new symbols from core.
2026 lines · 77 KiB · Rust
//! Shared in-process app-server client facade for CLI surfaces.
//!
//! This crate wraps [`codex_app_server::in_process`] behind a single async API
//! used by surfaces like TUI and exec. It centralizes:
//!
//! - Runtime startup and initialize-capabilities handshake.
//! - Typed caller-provided startup identity (`SessionSource` + client name).
//! - Typed and raw request/notification dispatch.
//! - Server request resolution and rejection.
//! - Event consumption with backpressure signaling ([`InProcessServerEvent::Lagged`]).
//! - Bounded graceful shutdown with abort fallback.
//!
//! The facade interposes a worker task between the caller and the underlying
//! [`InProcessClientHandle`](codex_app_server::in_process::InProcessClientHandle),
//! bridging async `mpsc` channels on both sides. Queues are bounded so overload
//! surfaces as channel-full errors rather than unbounded memory growth.
|
|
mod remote;
|
|
|
|
use std::error::Error;
|
|
use std::fmt;
|
|
use std::io::Error as IoError;
|
|
use std::io::ErrorKind;
|
|
use std::io::Result as IoResult;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
|
pub use codex_app_server::in_process::InProcessServerEvent;
|
|
use codex_app_server::in_process::InProcessStartArgs;
|
|
use codex_app_server_protocol::ClientInfo;
|
|
use codex_app_server_protocol::ClientNotification;
|
|
use codex_app_server_protocol::ClientRequest;
|
|
use codex_app_server_protocol::ConfigWarningNotification;
|
|
use codex_app_server_protocol::InitializeCapabilities;
|
|
use codex_app_server_protocol::InitializeParams;
|
|
use codex_app_server_protocol::JSONRPCErrorError;
|
|
use codex_app_server_protocol::RequestId;
|
|
use codex_app_server_protocol::Result as JsonRpcResult;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::ServerRequest;
|
|
use codex_arg0::Arg0DispatchPaths;
|
|
use codex_core::config::Config;
|
|
use codex_core::config_loader::CloudRequirementsLoader;
|
|
use codex_core::config_loader::LoaderOverrides;
|
|
pub use codex_exec_server::EnvironmentManager;
|
|
use codex_feedback::CodexFeedback;
|
|
use codex_protocol::protocol::SessionSource;
|
|
use serde::de::DeserializeOwned;
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::oneshot;
|
|
use tokio::time::timeout;
|
|
use toml::Value as TomlValue;
|
|
use tracing::warn;
|
|
|
|
pub use crate::remote::RemoteAppServerClient;
|
|
pub use crate::remote::RemoteAppServerConnectArgs;
|
|
|
|
/// Transitional access to core-only embedded app-server types.
///
/// New TUI behavior should prefer the app-server protocol methods. This
/// module exists so clients can remove a direct `codex-core` dependency
/// while legacy startup/config paths are migrated to RPCs.
///
/// NOTE(review): everything in here is a plain re-export of `codex_core`
/// items. The migration goal is to shrink this list to nothing, so avoid
/// adding new symbols; a CI boundary check enforces that TUI core access
/// goes through this namespace.
pub mod legacy_core {
    // Individual types, constants, and free functions still consumed by
    // legacy call sites.
    pub use codex_core::Cursor;
    pub use codex_core::DEFAULT_PROJECT_DOC_FILENAME;
    pub use codex_core::INTERACTIVE_SESSION_SOURCES;
    pub use codex_core::LOCAL_PROJECT_DOC_FILENAME;
    pub use codex_core::McpManager;
    pub use codex_core::PLUGIN_TEXT_MENTION_SIGIL;
    pub use codex_core::RolloutRecorder;
    pub use codex_core::TOOL_MENTION_SIGIL;
    pub use codex_core::ThreadItem;
    pub use codex_core::ThreadSortKey;
    pub use codex_core::ThreadsPage;
    pub use codex_core::append_message_history_entry;
    pub use codex_core::check_execpolicy_for_warnings;
    pub use codex_core::discover_project_doc_paths;
    pub use codex_core::find_thread_meta_by_name_str;
    pub use codex_core::find_thread_name_by_id;
    pub use codex_core::find_thread_names_by_ids;
    pub use codex_core::format_exec_policy_error_with_source;
    pub use codex_core::grant_read_root_non_elevated;
    pub use codex_core::lookup_message_history_entry;
    pub use codex_core::message_history_metadata;
    pub use codex_core::path_utils;
    pub use codex_core::read_session_meta_line;
    pub use codex_core::web_search_detail;

    // Whole-module re-exports, mirrored one-to-one under the legacy
    // namespace so call sites only need to change their import root.
    pub mod config {
        pub use codex_core::config::*;

        pub mod edit {
            pub use codex_core::config::edit::*;
        }
    }

    pub mod config_loader {
        pub use codex_core::config_loader::*;
    }

    pub mod connectors {
        pub use codex_core::connectors::*;
    }

    pub mod otel_init {
        pub use codex_core::otel_init::*;
    }

    pub mod personality_migration {
        pub use codex_core::personality_migration::*;
    }

    pub mod plugins {
        pub use codex_core::plugins::*;
    }

    pub mod review_format {
        pub use codex_core::review_format::*;
    }

    pub mod review_prompts {
        pub use codex_core::review_prompts::*;
    }

    pub mod skills {
        pub use codex_core::skills::*;
    }

    pub mod test_support {
        pub use codex_core::test_support::*;
    }

    pub mod util {
        pub use codex_core::util::*;
    }

    pub mod windows_sandbox {
        pub use codex_core::windows_sandbox::*;
    }
}
|
|
|
|
/// Upper bound for each graceful-shutdown wait before the worker task is
/// aborted (see [`InProcessAppServerClient::shutdown`]).
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);

/// Raw app-server request result for typed in-process requests.
///
/// Even on the in-process path, successful responses still travel back through
/// the same JSON-RPC result envelope used by socket/stdio transports because
/// `MessageProcessor` continues to produce that shape internally.
pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>;
|
|
|
|
/// Transport-agnostic server event surfaced by [`AppServerClient::next_event`].
#[derive(Debug, Clone)]
pub enum AppServerEvent {
    /// The consumer fell behind and `skipped` best-effort events were dropped.
    Lagged { skipped: usize },
    /// A notification pushed by the server.
    ServerNotification(ServerNotification),
    /// A server-initiated request that the caller must resolve or reject.
    ServerRequest(ServerRequest),
    /// The transport disconnected. Never produced by the in-process
    /// conversion below; presumably emitted by the remote transport —
    /// confirm in `crate::remote`.
    Disconnected { message: String },
}
|
|
|
|
impl From<InProcessServerEvent> for AppServerEvent {
|
|
fn from(value: InProcessServerEvent) -> Self {
|
|
match value {
|
|
InProcessServerEvent::Lagged { skipped } => Self::Lagged { skipped },
|
|
InProcessServerEvent::ServerNotification(notification) => {
|
|
Self::ServerNotification(notification)
|
|
}
|
|
InProcessServerEvent::ServerRequest(request) => Self::ServerRequest(request),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn event_requires_delivery(event: &InProcessServerEvent) -> bool {
|
|
// These transcript and terminal events must remain lossless. Dropping
|
|
// streamed assistant text or the authoritative completed item can leave
|
|
// the TUI with permanently corrupted markdown, while dropping completion
|
|
// notifications can leave surfaces waiting forever.
|
|
match event {
|
|
InProcessServerEvent::ServerNotification(notification) => {
|
|
server_notification_requires_delivery(notification)
|
|
}
|
|
_ => false,
|
|
}
|
|
}
|
|
|
|
/// Returns `true` for notifications that must survive backpressure.
|
|
///
|
|
/// Transcript events (`AgentMessageDelta`, `PlanDelta`, reasoning deltas) and
|
|
/// the authoritative `ItemCompleted` / `TurnCompleted` form the lossless tier
|
|
/// of the event stream. Dropping any of these corrupts the visible assistant
|
|
/// output or leaves surfaces waiting for a completion signal that already
|
|
/// fired. Everything else (`CommandExecutionOutputDelta`, progress, etc.) is
|
|
/// best-effort and may be dropped with only cosmetic impact.
|
|
///
|
|
/// Both the in-process and remote transports delegate to this function so the
|
|
/// classification stays in sync.
|
|
pub(crate) fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
|
|
matches!(
|
|
notification,
|
|
ServerNotification::TurnCompleted(_)
|
|
| ServerNotification::ItemCompleted(_)
|
|
| ServerNotification::AgentMessageDelta(_)
|
|
| ServerNotification::PlanDelta(_)
|
|
| ServerNotification::ReasoningSummaryTextDelta(_)
|
|
| ServerNotification::ReasoningTextDelta(_)
|
|
)
|
|
}
|
|
|
|
/// Outcome of attempting to forward a single event to the consumer channel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ForwardEventResult {
    /// The event was delivered (or intentionally dropped); the stream is healthy.
    Continue,
    /// The consumer channel is closed; the caller should stop producing events.
    DisableStream,
}
|
|
|
|
/// Forwards a single in-process event to the consumer, respecting the
/// lossless/best-effort split.
///
/// Lossless events (transcript deltas, item/turn completions) block until the
/// consumer drains capacity. Best-effort events use `try_send` and increment
/// `skipped_events` on failure. When a lag marker needs to be flushed before a
/// lossless event, the flush itself blocks so the marker is never lost.
///
/// If a dropped event is a `ServerRequest`, `reject_server_request` is called
/// so the server does not wait for a response that will never come.
///
/// NOTE: the statement order below is load-bearing — the lag marker must be
/// observed by the consumer before the lossless event it precedes, and
/// `skipped_events` is only reset once a marker is actually enqueued.
async fn forward_in_process_event<F>(
    event_tx: &mpsc::Sender<InProcessServerEvent>,
    skipped_events: &mut usize,
    event: InProcessServerEvent,
    mut reject_server_request: F,
) -> ForwardEventResult
where
    F: FnMut(ServerRequest),
{
    // Pending lag from earlier drops: flush a `Lagged` marker first so the
    // consumer learns about the gap in order.
    if *skipped_events > 0 {
        if event_requires_delivery(&event) {
            // Surface lag before the lossless event, but do not let the lag marker itself cause
            // us to drop the transcript/completion notification the caller is blocked on.
            if event_tx
                .send(InProcessServerEvent::Lagged {
                    skipped: *skipped_events,
                })
                .await
                .is_err()
            {
                return ForwardEventResult::DisableStream;
            }
            *skipped_events = 0;
        } else {
            // Best-effort path: try to flush the marker without blocking.
            match event_tx.try_send(InProcessServerEvent::Lagged {
                skipped: *skipped_events,
            }) {
                Ok(()) => {
                    *skipped_events = 0;
                }
                Err(mpsc::error::TrySendError::Full(_)) => {
                    // Queue still full: drop the current best-effort event too
                    // and count it, keeping the pending lag for a later flush.
                    *skipped_events = skipped_events.saturating_add(1);
                    warn!("dropping in-process app-server event because consumer queue is full");
                    if let InProcessServerEvent::ServerRequest(request) = event {
                        reject_server_request(request);
                    }
                    return ForwardEventResult::Continue;
                }
                Err(mpsc::error::TrySendError::Closed(_)) => {
                    return ForwardEventResult::DisableStream;
                }
            }
        }
    }

    if event_requires_delivery(&event) {
        // Block until the consumer catches up for transcript/completion notifications; this
        // preserves the visible assistant output even when the queue is otherwise saturated.
        if event_tx.send(event).await.is_err() {
            return ForwardEventResult::DisableStream;
        }
        return ForwardEventResult::Continue;
    }

    // Best-effort event with no pending lag: non-blocking send, dropping (and
    // counting) on overflow.
    match event_tx.try_send(event) {
        Ok(()) => ForwardEventResult::Continue,
        Err(mpsc::error::TrySendError::Full(event)) => {
            *skipped_events = skipped_events.saturating_add(1);
            warn!("dropping in-process app-server event because consumer queue is full");
            if let InProcessServerEvent::ServerRequest(request) = event {
                reject_server_request(request);
            }
            ForwardEventResult::Continue
        }
        Err(mpsc::error::TrySendError::Closed(_)) => ForwardEventResult::DisableStream,
    }
}
|
|
|
|
/// Layered error for [`InProcessAppServerClient::request_typed`].
///
/// This keeps transport failures, server-side JSON-RPC failures, and response
/// decode failures distinct so callers can decide whether to retry, surface a
/// server error, or treat the response as an internal request/response mismatch.
#[derive(Debug)]
pub enum TypedRequestError {
    /// The request never reached the server or the reply channel broke.
    Transport {
        method: String,
        source: IoError,
    },
    /// The server returned a JSON-RPC error for the request.
    Server {
        method: String,
        source: JSONRPCErrorError,
    },
    /// The server replied successfully but the body did not decode into the
    /// caller-chosen type — an internal request/response mismatch.
    Deserialize {
        method: String,
        source: serde_json::Error,
    },
}
|
|
|
|
impl fmt::Display for TypedRequestError {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
match self {
|
|
Self::Transport { method, source } => {
|
|
write!(f, "{method} transport error: {source}")
|
|
}
|
|
Self::Server { method, source } => {
|
|
write!(f, "{method} failed: {}", source.message)
|
|
}
|
|
Self::Deserialize { method, source } => {
|
|
write!(f, "{method} response decode error: {source}")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Error for TypedRequestError {
    /// Exposes the underlying cause where one exists.
    ///
    /// `Server` returns `None` — presumably because `JSONRPCErrorError` does
    /// not implement `std::error::Error` (confirm in the protocol crate); its
    /// message is already rendered by `Display`.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::Transport { source, .. } => Some(source),
            Self::Server { .. } => None,
            Self::Deserialize { source, .. } => Some(source),
        }
    }
}
|
|
|
|
/// Caller-supplied configuration for [`InProcessAppServerClient::start`].
#[derive(Clone)]
pub struct InProcessClientStartArgs {
    /// Resolved argv0 dispatch paths used by command execution internals.
    pub arg0_paths: Arg0DispatchPaths,
    /// Shared config used to initialize app-server runtime.
    pub config: Arc<Config>,
    /// CLI config overrides that are already parsed into TOML values.
    pub cli_overrides: Vec<(String, TomlValue)>,
    /// Loader override knobs used by config API paths.
    pub loader_overrides: LoaderOverrides,
    /// Preloaded cloud requirements provider.
    pub cloud_requirements: CloudRequirementsLoader,
    /// Feedback sink used by app-server/core telemetry and logs.
    pub feedback: CodexFeedback,
    /// Environment manager used by core execution and filesystem operations.
    pub environment_manager: Arc<EnvironmentManager>,
    /// Startup warnings emitted after initialize succeeds.
    pub config_warnings: Vec<ConfigWarningNotification>,
    /// Session source recorded in app-server thread metadata.
    pub session_source: SessionSource,
    /// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
    pub enable_codex_api_key_env: bool,
    /// Client name reported during initialize.
    pub client_name: String,
    /// Client version reported during initialize.
    pub client_version: String,
    /// Whether experimental APIs are requested at initialize time.
    pub experimental_api: bool,
    /// Notification methods this client opts out of receiving.
    pub opt_out_notification_methods: Vec<String>,
    /// Queue capacity for command/event channels (clamped to at least 1).
    pub channel_capacity: usize,
}
|
|
|
|
impl InProcessClientStartArgs {
|
|
/// Builds initialize params from caller-provided metadata.
|
|
pub fn initialize_params(&self) -> InitializeParams {
|
|
let capabilities = InitializeCapabilities {
|
|
experimental_api: self.experimental_api,
|
|
opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() {
|
|
None
|
|
} else {
|
|
Some(self.opt_out_notification_methods.clone())
|
|
},
|
|
};
|
|
|
|
InitializeParams {
|
|
client_info: ClientInfo {
|
|
name: self.client_name.clone(),
|
|
title: None,
|
|
version: self.client_version.clone(),
|
|
},
|
|
capabilities: Some(capabilities),
|
|
}
|
|
}
|
|
|
|
fn into_runtime_start_args(self) -> InProcessStartArgs {
|
|
let initialize = self.initialize_params();
|
|
InProcessStartArgs {
|
|
arg0_paths: self.arg0_paths,
|
|
config: self.config,
|
|
cli_overrides: self.cli_overrides,
|
|
loader_overrides: self.loader_overrides,
|
|
cloud_requirements: self.cloud_requirements,
|
|
feedback: self.feedback,
|
|
environment_manager: self.environment_manager,
|
|
config_warnings: self.config_warnings,
|
|
session_source: self.session_source,
|
|
enable_codex_api_key_env: self.enable_codex_api_key_env,
|
|
initialize,
|
|
channel_capacity: self.channel_capacity,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Internal command sent from public facade methods to the worker task.
///
/// Each variant carries a oneshot sender so the caller can `await` the
/// result without holding a mutable reference to the client.
enum ClientCommand {
    /// Forward a typed client request (boxed to keep the variant small).
    Request {
        request: Box<ClientRequest>,
        response_tx: oneshot::Sender<IoResult<RequestResult>>,
    },
    /// Forward a client notification (no server reply expected).
    Notify {
        notification: ClientNotification,
        response_tx: oneshot::Sender<IoResult<()>>,
    },
    /// Answer a pending server-initiated request with a success result.
    ResolveServerRequest {
        request_id: RequestId,
        result: JsonRpcResult,
        response_tx: oneshot::Sender<IoResult<()>>,
    },
    /// Answer a pending server-initiated request with a JSON-RPC error.
    RejectServerRequest {
        request_id: RequestId,
        error: JSONRPCErrorError,
        response_tx: oneshot::Sender<IoResult<()>>,
    },
    /// Ask the worker to shut the runtime down and exit its loop.
    Shutdown {
        response_tx: oneshot::Sender<IoResult<()>>,
    },
}
|
|
|
|
/// Async facade over the in-process app-server runtime.
///
/// This type owns a worker task that bridges between:
/// - caller-facing async `mpsc` channels used by TUI/exec
/// - [`codex_app_server::in_process::InProcessClientHandle`], which speaks to
///   the embedded `MessageProcessor`
///
/// The facade intentionally preserves the server's request/notification/event
/// model instead of exposing direct core runtime handles. That keeps in-process
/// callers aligned with app-server behavior while still avoiding a process
/// boundary.
pub struct InProcessAppServerClient {
    /// Commands from facade methods to the worker task.
    command_tx: mpsc::Sender<ClientCommand>,
    /// Server events forwarded by the worker; drained via `next_event`.
    event_rx: mpsc::Receiver<InProcessServerEvent>,
    /// Worker task handle; awaited (and aborted on timeout) during shutdown.
    worker_handle: tokio::task::JoinHandle<()>,
}
|
|
|
|
/// Cloneable request/notify handle sharing the client's worker channel,
/// usable without holding the [`InProcessAppServerClient`] itself.
#[derive(Clone)]
pub struct InProcessAppServerRequestHandle {
    // Same channel as the owning client's `command_tx`.
    command_tx: mpsc::Sender<ClientCommand>,
}
|
|
|
|
/// Transport-agnostic request handle over in-process or remote clients.
#[derive(Clone)]
pub enum AppServerRequestHandle {
    InProcess(InProcessAppServerRequestHandle),
    Remote(crate::remote::RemoteAppServerRequestHandle),
}
|
|
|
|
/// Transport-agnostic app-server client: embedded in-process runtime or a
/// remote connection.
pub enum AppServerClient {
    InProcess(InProcessAppServerClient),
    Remote(RemoteAppServerClient),
}
|
|
|
|
impl InProcessAppServerClient {
|
|
/// Starts the in-process runtime and facade worker task.
|
|
///
|
|
/// The returned client is ready for requests and event consumption. If the
|
|
/// internal event queue is saturated later, server requests are rejected
|
|
/// with overload error instead of being silently dropped.
|
|
pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
|
|
let channel_capacity = args.channel_capacity.max(1);
|
|
let mut handle =
|
|
codex_app_server::in_process::start(args.into_runtime_start_args()).await?;
|
|
let request_sender = handle.sender();
|
|
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
|
|
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
|
|
|
|
let worker_handle = tokio::spawn(async move {
|
|
let mut event_stream_enabled = true;
|
|
let mut skipped_events = 0usize;
|
|
loop {
|
|
tokio::select! {
|
|
command = command_rx.recv() => {
|
|
match command {
|
|
Some(ClientCommand::Request { request, response_tx }) => {
|
|
let request_sender = request_sender.clone();
|
|
// Request waits happen on a detached task so
|
|
// this loop can keep draining runtime events
|
|
// while the request is blocked on client input.
|
|
tokio::spawn(async move {
|
|
let result = request_sender.request(*request).await;
|
|
let _ = response_tx.send(result);
|
|
});
|
|
}
|
|
Some(ClientCommand::Notify {
|
|
notification,
|
|
response_tx,
|
|
}) => {
|
|
let result = request_sender.notify(notification);
|
|
let _ = response_tx.send(result);
|
|
}
|
|
Some(ClientCommand::ResolveServerRequest {
|
|
request_id,
|
|
result,
|
|
response_tx,
|
|
}) => {
|
|
let send_result =
|
|
request_sender.respond_to_server_request(request_id, result);
|
|
let _ = response_tx.send(send_result);
|
|
}
|
|
Some(ClientCommand::RejectServerRequest {
|
|
request_id,
|
|
error,
|
|
response_tx,
|
|
}) => {
|
|
let send_result = request_sender.fail_server_request(request_id, error);
|
|
let _ = response_tx.send(send_result);
|
|
}
|
|
Some(ClientCommand::Shutdown { response_tx }) => {
|
|
let shutdown_result = handle.shutdown().await;
|
|
let _ = response_tx.send(shutdown_result);
|
|
break;
|
|
}
|
|
None => {
|
|
let _ = handle.shutdown().await;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
event = handle.next_event(), if event_stream_enabled => {
|
|
let Some(event) = event else {
|
|
break;
|
|
};
|
|
if let InProcessServerEvent::ServerRequest(
|
|
ServerRequest::ChatgptAuthTokensRefresh { request_id, .. }
|
|
) = &event
|
|
{
|
|
let send_result = request_sender.fail_server_request(
|
|
request_id.clone(),
|
|
JSONRPCErrorError {
|
|
code: -32000,
|
|
message: "chatgpt auth token refresh is not supported for in-process app-server clients".to_string(),
|
|
data: None,
|
|
},
|
|
);
|
|
if let Err(err) = send_result {
|
|
warn!(
|
|
"failed to reject unsupported chatgpt auth token refresh request: {err}"
|
|
);
|
|
}
|
|
continue;
|
|
}
|
|
|
|
match forward_in_process_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
event,
|
|
|request| {
|
|
let _ = request_sender.fail_server_request(
|
|
request.id().clone(),
|
|
JSONRPCErrorError {
|
|
code: -32001,
|
|
message: "in-process app-server event queue is full"
|
|
.to_string(),
|
|
data: None,
|
|
},
|
|
);
|
|
},
|
|
)
|
|
.await
|
|
{
|
|
ForwardEventResult::Continue => {}
|
|
ForwardEventResult::DisableStream => {
|
|
event_stream_enabled = false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
Ok(Self {
|
|
command_tx,
|
|
event_rx,
|
|
worker_handle,
|
|
})
|
|
}
|
|
|
|
pub fn request_handle(&self) -> InProcessAppServerRequestHandle {
|
|
InProcessAppServerRequestHandle {
|
|
command_tx: self.command_tx.clone(),
|
|
}
|
|
}
|
|
|
|
/// Sends a typed client request and returns raw JSON-RPC result.
|
|
///
|
|
/// Callers that expect a concrete response type should usually prefer
|
|
/// [`request_typed`](Self::request_typed).
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::Request {
|
|
request: Box::new(request),
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server request channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Sends a typed client request and decodes the successful response body.
|
|
///
|
|
/// This still deserializes from a JSON value produced by app-server's
|
|
/// JSON-RPC result envelope. Because the caller chooses `T`, `Deserialize`
|
|
/// failures indicate an internal request/response mismatch at the call site
|
|
/// (or an in-process bug), not transport skew from an external client.
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
let method = request_method_name(&request);
|
|
let response =
|
|
self.request(request)
|
|
.await
|
|
.map_err(|source| TypedRequestError::Transport {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
let result = response.map_err(|source| TypedRequestError::Server {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
serde_json::from_value(result)
|
|
.map_err(|source| TypedRequestError::Deserialize { method, source })
|
|
}
|
|
|
|
/// Sends a typed client notification.
|
|
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::Notify {
|
|
notification,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server notify channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Resolves a pending server request.
|
|
///
|
|
/// This should only be called with request IDs obtained from the current
|
|
/// client's event stream.
|
|
pub async fn resolve_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
result: JsonRpcResult,
|
|
) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::ResolveServerRequest {
|
|
request_id,
|
|
result,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server resolve channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Rejects a pending server request with JSON-RPC error payload.
|
|
pub async fn reject_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::RejectServerRequest {
|
|
request_id,
|
|
error,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server reject channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Returns the next in-process event, or `None` when worker exits.
|
|
///
|
|
/// Callers are expected to drain this stream promptly. If they fall behind,
|
|
/// the worker emits [`InProcessServerEvent::Lagged`] markers and may reject
|
|
/// pending server requests rather than letting approval flows hang.
|
|
pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
|
|
self.event_rx.recv().await
|
|
}
|
|
|
|
/// Shuts down worker and in-process runtime with bounded wait.
|
|
///
|
|
/// If graceful shutdown exceeds timeout, the worker task is aborted to
|
|
/// avoid leaking background tasks in embedding callers.
|
|
pub async fn shutdown(self) -> IoResult<()> {
|
|
let Self {
|
|
command_tx,
|
|
event_rx,
|
|
worker_handle,
|
|
} = self;
|
|
let mut worker_handle = worker_handle;
|
|
// Drop the caller-facing receiver before asking the worker to shut
|
|
// down. That unblocks any pending must-deliver `event_tx.send(..)`
|
|
// so the worker can reach `handle.shutdown()` instead of timing out
|
|
// and getting aborted with the runtime still attached.
|
|
drop(event_rx);
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
if command_tx
|
|
.send(ClientCommand::Shutdown { response_tx })
|
|
.await
|
|
.is_ok()
|
|
&& let Ok(command_result) = timeout(SHUTDOWN_TIMEOUT, response_rx).await
|
|
{
|
|
command_result.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server shutdown channel is closed",
|
|
)
|
|
})??;
|
|
}
|
|
|
|
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await {
|
|
worker_handle.abort();
|
|
let _ = worker_handle.await;
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl InProcessAppServerRequestHandle {
    /// Sends a typed client request via the shared worker channel and returns
    /// the raw JSON-RPC result.
    pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
        let (response_tx, response_rx) = oneshot::channel();
        self.command_tx
            .send(ClientCommand::Request {
                request: Box::new(request),
                response_tx,
            })
            .await
            .map_err(|_| {
                // Worker task is gone; surface as a broken pipe.
                IoError::new(
                    ErrorKind::BrokenPipe,
                    "in-process app-server worker channel is closed",
                )
            })?;
        response_rx.await.map_err(|_| {
            IoError::new(
                ErrorKind::BrokenPipe,
                "in-process app-server request channel is closed",
            )
        })?
    }

    /// Sends a typed client request and decodes the successful response body;
    /// mirrors [`InProcessAppServerClient::request_typed`].
    pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
    where
        T: DeserializeOwned,
    {
        let method = request_method_name(&request);
        let response =
            self.request(request)
                .await
                .map_err(|source| TypedRequestError::Transport {
                    method: method.clone(),
                    source,
                })?;
        let result = response.map_err(|source| TypedRequestError::Server {
            method: method.clone(),
            source,
        })?;
        serde_json::from_value(result)
            .map_err(|source| TypedRequestError::Deserialize { method, source })
    }
}
|
|
|
|
impl AppServerRequestHandle {
    /// Sends a typed client request over the active transport.
    pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
        match self {
            Self::InProcess(handle) => handle.request(request).await,
            Self::Remote(handle) => handle.request(request).await,
        }
    }

    /// Sends a typed client request and decodes the successful response body.
    pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
    where
        T: DeserializeOwned,
    {
        match self {
            Self::InProcess(handle) => handle.request_typed(request).await,
            Self::Remote(handle) => handle.request_typed(request).await,
        }
    }
}
|
|
|
|
impl AppServerClient {
    /// Sends a typed client request over whichever transport is active.
    pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
        match self {
            Self::InProcess(client) => client.request(request).await,
            Self::Remote(client) => client.request(request).await,
        }
    }

    /// Sends a typed client request and decodes the successful response body.
    pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
    where
        T: DeserializeOwned,
    {
        match self {
            Self::InProcess(client) => client.request_typed(request).await,
            Self::Remote(client) => client.request_typed(request).await,
        }
    }

    /// Sends a typed client notification.
    pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
        match self {
            Self::InProcess(client) => client.notify(notification).await,
            Self::Remote(client) => client.notify(notification).await,
        }
    }

    /// Resolves a pending server request by ID with a success result.
    pub async fn resolve_server_request(
        &self,
        request_id: RequestId,
        result: JsonRpcResult,
    ) -> IoResult<()> {
        match self {
            Self::InProcess(client) => client.resolve_server_request(request_id, result).await,
            Self::Remote(client) => client.resolve_server_request(request_id, result).await,
        }
    }

    /// Rejects a pending server request with a JSON-RPC error payload.
    pub async fn reject_server_request(
        &self,
        request_id: RequestId,
        error: JSONRPCErrorError,
    ) -> IoResult<()> {
        match self {
            Self::InProcess(client) => client.reject_server_request(request_id, error).await,
            Self::Remote(client) => client.reject_server_request(request_id, error).await,
        }
    }

    /// Returns the next server event, or `None` when the transport ends.
    /// In-process events are converted to the transport-agnostic type.
    pub async fn next_event(&mut self) -> Option<AppServerEvent> {
        match self {
            Self::InProcess(client) => client.next_event().await.map(Into::into),
            Self::Remote(client) => client.next_event().await,
        }
    }

    /// Shuts the underlying client down with bounded wait.
    pub async fn shutdown(self) -> IoResult<()> {
        match self {
            Self::InProcess(client) => client.shutdown().await,
            Self::Remote(client) => client.shutdown().await,
        }
    }

    /// Returns a cloneable request handle matching the active transport.
    pub fn request_handle(&self) -> AppServerRequestHandle {
        match self {
            Self::InProcess(client) => AppServerRequestHandle::InProcess(client.request_handle()),
            Self::Remote(client) => AppServerRequestHandle::Remote(client.request_handle()),
        }
    }
}
|
|
|
|
/// Extracts the JSON-RPC method name for diagnostics without extending the
|
|
/// protocol crate with in-process-only helpers.
|
|
pub(crate) fn request_method_name(request: &ClientRequest) -> String {
|
|
serde_json::to_value(request)
|
|
.ok()
|
|
.and_then(|value| {
|
|
value
|
|
.get("method")
|
|
.and_then(serde_json::Value::as_str)
|
|
.map(ToOwned::to_owned)
|
|
})
|
|
.unwrap_or_else(|| "<unknown>".to_string())
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use codex_app_server_protocol::AccountUpdatedNotification;
|
|
use codex_app_server_protocol::ConfigRequirementsReadResponse;
|
|
use codex_app_server_protocol::GetAccountResponse;
|
|
use codex_app_server_protocol::JSONRPCMessage;
|
|
use codex_app_server_protocol::JSONRPCRequest;
|
|
use codex_app_server_protocol::JSONRPCResponse;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::SessionSource as ApiSessionSource;
|
|
use codex_app_server_protocol::ThreadStartParams;
|
|
use codex_app_server_protocol::ThreadStartResponse;
|
|
use codex_app_server_protocol::ToolRequestUserInputParams;
|
|
use codex_app_server_protocol::ToolRequestUserInputQuestion;
|
|
use codex_core::config::ConfigBuilder;
|
|
use futures::SinkExt;
|
|
use futures::StreamExt;
|
|
use pretty_assertions::assert_eq;
|
|
use tokio::net::TcpListener;
|
|
use tokio::time::Duration;
|
|
use tokio::time::timeout;
|
|
use tokio_tungstenite::accept_hdr_async;
|
|
use tokio_tungstenite::tungstenite::Message;
|
|
use tokio_tungstenite::tungstenite::handshake::server::Request as WebSocketRequest;
|
|
use tokio_tungstenite::tungstenite::handshake::server::Response as WebSocketResponse;
|
|
use tokio_tungstenite::tungstenite::http::header::AUTHORIZATION;
|
|
|
|
async fn build_test_config() -> Config {
|
|
match ConfigBuilder::default().build().await {
|
|
Ok(config) => config,
|
|
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
|
|
.expect("default config should load"),
|
|
}
|
|
}
|
|
|
|
    /// Starts an in-process app-server client with test-friendly defaults and
    /// the given command/event channel capacity.
    async fn start_test_client_with_capacity(
        session_source: SessionSource,
        channel_capacity: usize,
    ) -> InProcessAppServerClient {
        InProcessAppServerClient::start(InProcessClientStartArgs {
            arg0_paths: Arg0DispatchPaths::default(),
            config: Arc::new(build_test_config().await),
            cli_overrides: Vec::new(),
            loader_overrides: LoaderOverrides::default(),
            cloud_requirements: CloudRequirementsLoader::default(),
            feedback: CodexFeedback::new(),
            environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)),
            config_warnings: Vec::new(),
            session_source,
            enable_codex_api_key_env: false,
            client_name: "codex-app-server-client-test".to_string(),
            client_version: "0.0.0-test".to_string(),
            experimental_api: true,
            opt_out_notification_methods: Vec::new(),
            channel_capacity,
        })
        .await
        .expect("in-process app-server client should start")
    }
|
|
|
|
    /// Starts an in-process test client with the default channel capacity.
    async fn start_test_client(session_source: SessionSource) -> InProcessAppServerClient {
        start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
    }
|
|
|
|
    /// Spawns a single-connection websocket test server with no auth header
    /// expected and returns its `ws://` URL; `handler` drives the server side
    /// of the conversation.
    async fn start_test_remote_server<F, Fut>(handler: F) -> String
    where
        F: FnOnce(tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>) -> Fut
            + Send
            + 'static,
        Fut: std::future::Future<Output = ()> + Send + 'static,
    {
        start_test_remote_server_with_auth(/*expected_auth_token*/ None, handler).await
    }
|
|
|
|
    /// Spawns a single-connection websocket test server that asserts the
    /// incoming `Authorization` header equals `Bearer {expected_auth_token}`
    /// (or is absent when `None`) during the upgrade, then hands the upgraded
    /// stream to `handler`. Returns the server's `ws://` URL.
    async fn start_test_remote_server_with_auth<F, Fut>(
        expected_auth_token: Option<String>,
        handler: F,
    ) -> String
    where
        F: FnOnce(tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>) -> Fut
            + Send
            + 'static,
        Fut: std::future::Future<Output = ()> + Send + 'static,
    {
        // Port 0 lets the OS pick a free port; the actual address is read back.
        let listener = TcpListener::bind("127.0.0.1:0")
            .await
            .expect("listener should bind");
        let addr = listener.local_addr().expect("listener address");
        tokio::spawn(async move {
            let (stream, _) = listener.accept().await.expect("accept should succeed");
            let websocket = accept_hdr_async(
                stream,
                // Header callback runs during the websocket handshake: it
                // verifies (not negotiates) the Authorization header.
                move |request: &WebSocketRequest, response: WebSocketResponse| {
                    let provided_auth_token = request
                        .headers()
                        .get(AUTHORIZATION)
                        .and_then(|value| value.to_str().ok())
                        .map(str::to_owned);
                    let expected_auth_token = expected_auth_token
                        .as_ref()
                        .map(|token| format!("Bearer {token}"));
                    assert_eq!(provided_auth_token, expected_auth_token);
                    Ok(response)
                },
            )
            .await
            .expect("websocket upgrade should succeed");
            handler(websocket).await;
        });
        format!("ws://{addr}")
    }
|
|
|
|
    /// Drives the server side of the client handshake: answers the
    /// `initialize` request with an empty result and then consumes the
    /// client's `initialized` notification.
    async fn expect_remote_initialize(
        websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
    ) {
        let JSONRPCMessage::Request(request) = read_websocket_message(websocket).await else {
            panic!("expected initialize request");
        };
        assert_eq!(request.method, "initialize");
        write_websocket_message(
            websocket,
            JSONRPCMessage::Response(JSONRPCResponse {
                id: request.id,
                result: serde_json::json!({}),
            }),
        )
        .await;

        let JSONRPCMessage::Notification(notification) = read_websocket_message(websocket).await
        else {
            panic!("expected initialized notification");
        };
        assert_eq!(notification.method, "initialized");
    }
|
|
|
|
async fn read_websocket_message(
|
|
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
|
) -> JSONRPCMessage {
|
|
loop {
|
|
let frame = websocket
|
|
.next()
|
|
.await
|
|
.expect("frame should be available")
|
|
.expect("frame should decode");
|
|
match frame {
|
|
Message::Text(text) => {
|
|
return serde_json::from_str::<JSONRPCMessage>(&text)
|
|
.expect("text frame should be valid JSON-RPC");
|
|
}
|
|
Message::Binary(_) | Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => {
|
|
continue;
|
|
}
|
|
Message::Close(_) => panic!("unexpected close frame"),
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn write_websocket_message(
|
|
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
|
message: JSONRPCMessage,
|
|
) {
|
|
websocket
|
|
.send(Message::Text(
|
|
serde_json::to_string(&message)
|
|
.expect("message should serialize")
|
|
.into(),
|
|
))
|
|
.await
|
|
.expect("message should send");
|
|
}
|
|
|
|
fn command_execution_output_delta_notification(delta: &str) -> ServerNotification {
|
|
ServerNotification::CommandExecutionOutputDelta(
|
|
codex_app_server_protocol::CommandExecutionOutputDeltaNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn_id: "turn".to_string(),
|
|
item_id: "item".to_string(),
|
|
delta: delta.to_string(),
|
|
},
|
|
)
|
|
}
|
|
|
|
fn agent_message_delta_notification(delta: &str) -> ServerNotification {
|
|
ServerNotification::AgentMessageDelta(
|
|
codex_app_server_protocol::AgentMessageDeltaNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn_id: "turn".to_string(),
|
|
item_id: "item".to_string(),
|
|
delta: delta.to_string(),
|
|
},
|
|
)
|
|
}
|
|
|
|
fn item_completed_notification(text: &str) -> ServerNotification {
|
|
ServerNotification::ItemCompleted(codex_app_server_protocol::ItemCompletedNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn_id: "turn".to_string(),
|
|
item: codex_app_server_protocol::ThreadItem::AgentMessage {
|
|
id: "item".to_string(),
|
|
text: text.to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
})
|
|
}
|
|
|
|
fn turn_completed_notification() -> ServerNotification {
|
|
ServerNotification::TurnCompleted(codex_app_server_protocol::TurnCompletedNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn: codex_app_server_protocol::Turn {
|
|
id: "turn".to_string(),
|
|
items: Vec::new(),
|
|
status: codex_app_server_protocol::TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: Some(0),
|
|
duration_ms: Some(1),
|
|
},
|
|
})
|
|
}
|
|
|
|
fn test_remote_connect_args(websocket_url: String) -> RemoteAppServerConnectArgs {
|
|
RemoteAppServerConnectArgs {
|
|
websocket_url,
|
|
auth_token: None,
|
|
client_name: "codex-app-server-client-test".to_string(),
|
|
client_version: "0.0.0-test".to_string(),
|
|
experimental_api: true,
|
|
opt_out_notification_methods: Vec::new(),
|
|
channel_capacity: 8,
|
|
}
|
|
}
|
|
|
|
    /// A typed request against the in-process server round-trips end to end.
    #[tokio::test]
    async fn typed_request_roundtrip_works() {
        let client = start_test_client(SessionSource::Exec).await;
        let _response: ConfigRequirementsReadResponse = client
            .request_typed(ClientRequest::ConfigRequirementsRead {
                request_id: RequestId::Integer(1),
                params: None,
            })
            .await
            .expect("typed request should succeed");
        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// A JSON-RPC error from the server surfaces as a method-qualified `Err`
    /// from `request_typed`.
    #[tokio::test]
    async fn typed_request_reports_json_rpc_errors() {
        let client = start_test_client(SessionSource::Exec).await;
        let err = client
            .request_typed::<ConfigRequirementsReadResponse>(ClientRequest::ThreadRead {
                request_id: RequestId::Integer(99),
                params: codex_app_server_protocol::ThreadReadParams {
                    thread_id: "missing-thread".to_string(),
                    include_turns: false,
                },
            })
            .await
            .expect_err("missing thread should return a JSON-RPC error");
        // Error display is expected to lead with the failing method name.
        assert!(
            err.to_string().starts_with("thread/read failed:"),
            "expected method-qualified JSON-RPC failure message"
        );
        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// The `SessionSource` supplied at startup is reflected on threads the
    /// server creates.
    #[tokio::test]
    async fn caller_provided_session_source_is_applied() {
        for (session_source, expected_source) in [
            (SessionSource::Exec, ApiSessionSource::Exec),
            (SessionSource::Cli, ApiSessionSource::Cli),
        ] {
            let client = start_test_client(session_source).await;
            let parsed: ThreadStartResponse = client
                .request_typed(ClientRequest::ThreadStart {
                    request_id: RequestId::Integer(2),
                    params: ThreadStartParams {
                        ephemeral: Some(true),
                        ..ThreadStartParams::default()
                    },
                })
                .await
                .expect("thread/start should succeed");
            assert_eq!(parsed.thread.source, expected_source);
            client.shutdown().await.expect("shutdown should complete");
        }
    }
|
|
|
|
    /// A thread started through the app server can immediately be read back
    /// through the same client.
    #[tokio::test]
    async fn threads_started_via_app_server_are_visible_through_typed_requests() {
        let client = start_test_client(SessionSource::Cli).await;

        let response: ThreadStartResponse = client
            .request_typed(ClientRequest::ThreadStart {
                request_id: RequestId::Integer(3),
                params: ThreadStartParams {
                    ephemeral: Some(true),
                    ..ThreadStartParams::default()
                },
            })
            .await
            .expect("thread/start should succeed");
        let read = client
            .request_typed::<codex_app_server_protocol::ThreadReadResponse>(
                ClientRequest::ThreadRead {
                    request_id: RequestId::Integer(4),
                    params: codex_app_server_protocol::ThreadReadParams {
                        thread_id: response.thread.id.clone(),
                        include_turns: false,
                    },
                },
            )
            .await
            .expect("thread/read should return the newly started thread");
        assert_eq!(read.thread.id, response.thread.id);

        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// Even a capacity-1 channel supports a full request round-trip — the
    /// minimum capacity must not deadlock the worker.
    #[tokio::test]
    async fn tiny_channel_capacity_still_supports_request_roundtrip() {
        let client =
            start_test_client_with_capacity(SessionSource::Exec, /*channel_capacity*/ 1).await;
        let _response: ConfigRequirementsReadResponse = client
            .request_typed(ClientRequest::ConfigRequirementsRead {
                request_id: RequestId::Integer(1),
                params: None,
            })
            .await
            .expect("typed request should succeed");
        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// Under backpressure, droppable notifications (command output deltas)
    /// are skipped, while transcript-critical notifications (agent deltas,
    /// item and turn completion) are still delivered — preceded by a `Lagged`
    /// marker that records the skip count.
    #[tokio::test]
    async fn forward_in_process_event_preserves_transcript_notifications_under_backpressure() {
        // Capacity-1 channel, pre-filled so the next forward hits a full queue.
        let (event_tx, mut event_rx) = mpsc::channel(1);
        event_tx
            .send(InProcessServerEvent::ServerNotification(
                command_execution_output_delta_notification("stdout-1"),
            ))
            .await
            .expect("initial event should enqueue");

        let mut skipped_events = 0usize;
        let result = forward_in_process_event(
            &event_tx,
            &mut skipped_events,
            InProcessServerEvent::ServerNotification(command_execution_output_delta_notification(
                "stdout-2",
            )),
            |_| {},
        )
        .await;
        assert_eq!(result, ForwardEventResult::Continue);
        // The droppable delta was counted as skipped instead of blocking.
        assert_eq!(skipped_events, 1);

        let receive_task = tokio::spawn(async move {
            let mut events = Vec::new();
            for _ in 0..5 {
                events.push(
                    timeout(Duration::from_secs(2), event_rx.recv())
                        .await
                        .expect("event should arrive before timeout")
                        .expect("event stream should stay open"),
                );
            }
            events
        });

        // Must-deliver notifications go through even while the channel is
        // saturated; the pending skip count is flushed (see the Lagged
        // assertion below) and ends at zero.
        for notification in [
            agent_message_delta_notification("hello"),
            item_completed_notification("hello"),
            turn_completed_notification(),
        ] {
            let result = forward_in_process_event(
                &event_tx,
                &mut skipped_events,
                InProcessServerEvent::ServerNotification(notification),
                |_| {},
            )
            .await;
            assert_eq!(result, ForwardEventResult::Continue);
        }
        assert_eq!(skipped_events, 0);

        let events = receive_task
            .await
            .expect("receiver task should join successfully");
        assert!(matches!(
            &events[0],
            InProcessServerEvent::ServerNotification(
                ServerNotification::CommandExecutionOutputDelta(notification)
            ) if notification.delta == "stdout-1"
        ));
        assert!(matches!(
            &events[1],
            InProcessServerEvent::Lagged { skipped: 1 }
        ));
        assert!(matches!(
            &events[2],
            InProcessServerEvent::ServerNotification(ServerNotification::AgentMessageDelta(
                notification
            )) if notification.delta == "hello"
        ));
        assert!(matches!(
            &events[3],
            InProcessServerEvent::ServerNotification(ServerNotification::ItemCompleted(
                notification
            )) if matches!(
                &notification.item,
                codex_app_server_protocol::ThreadItem::AgentMessage { text, .. } if text == "hello"
            )
        ));
        assert!(matches!(
            &events[4],
            InProcessServerEvent::ServerNotification(ServerNotification::TurnCompleted(
                notification
            )) if notification.turn.status == codex_app_server_protocol::TurnStatus::Completed
        ));
    }
|
|
|
|
    /// A typed request round-trips over the websocket transport: the test
    /// server answers `account/read` and the client decodes the response.
    #[tokio::test]
    async fn remote_typed_request_roundtrip_works() {
        let websocket_url = start_test_remote_server(|mut websocket| async move {
            expect_remote_initialize(&mut websocket).await;
            let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
            else {
                panic!("expected account/read request");
            };
            assert_eq!(request.method, "account/read");
            write_websocket_message(
                &mut websocket,
                JSONRPCMessage::Response(JSONRPCResponse {
                    id: request.id,
                    result: serde_json::to_value(GetAccountResponse {
                        account: None,
                        requires_openai_auth: false,
                    })
                    .expect("response should serialize"),
                }),
            )
            .await;
            websocket.close(None).await.expect("close should succeed");
        })
        .await;
        let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
            .await
            .expect("remote client should connect");

        let response: GetAccountResponse = client
            .request_typed(ClientRequest::GetAccount {
                request_id: RequestId::Integer(1),
                params: codex_app_server_protocol::GetAccountParams {
                    refresh_token: false,
                },
            })
            .await
            .expect("typed request should succeed");
        assert_eq!(response.account, None);

        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// When an auth token is configured, the websocket upgrade carries it as
    /// an `Authorization: Bearer …` header (asserted on the server side).
    #[tokio::test]
    async fn remote_connect_includes_auth_header_when_configured() {
        let auth_token = "remote-bearer-token".to_string();
        let websocket_url = start_test_remote_server_with_auth(
            Some(auth_token.clone()),
            |mut websocket| async move {
                expect_remote_initialize(&mut websocket).await;
                websocket.close(None).await.expect("close should succeed");
            },
        )
        .await;
        let client = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
            auth_token: Some(auth_token),
            ..test_remote_connect_args(websocket_url)
        })
        .await
        .expect("remote client should connect");

        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// Sending an auth token over plaintext `ws://` to a non-loopback host is
    /// rejected with `InvalidInput` before any connection is attempted.
    #[tokio::test]
    async fn remote_connect_rejects_non_loopback_ws_when_auth_configured() {
        let result = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
            // The explicit field overrides the helper's placeholder URL via
            // struct-update syntax; port 1 is never contacted.
            websocket_url: "ws://example.com:4500".to_string(),
            auth_token: Some("remote-bearer-token".to_string()),
            ..test_remote_connect_args("ws://127.0.0.1:1".to_string())
        })
        .await;
        let err = match result {
            Ok(_) => panic!("non-loopback ws should be rejected before connect"),
            Err(err) => err,
        };
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
        assert!(
            err.to_string()
                .contains("remote auth tokens require `wss://` or loopback `ws://` URLs")
        );
    }
|
|
|
|
    /// Auth tokens may only be sent over `wss://` or loopback `ws://`.
    #[test]
    fn remote_auth_token_transport_policy_allows_wss_and_loopback_ws() {
        assert!(crate::remote::websocket_url_supports_auth_token(
            &url::Url::parse("wss://example.com:443").expect("wss URL should parse")
        ));
        assert!(crate::remote::websocket_url_supports_auth_token(
            &url::Url::parse("ws://127.0.0.1:4500").expect("loopback ws URL should parse")
        ));
        assert!(!crate::remote::websocket_url_supports_auth_token(
            &url::Url::parse("ws://example.com:4500").expect("non-loopback ws URL should parse")
        ));
    }
|
|
|
|
    /// Reusing an in-flight request id is rejected locally: the duplicate is
    /// never forwarded to the server, and the original waiter still receives
    /// its response.
    #[tokio::test]
    async fn remote_duplicate_request_id_keeps_original_waiter() {
        let (first_request_seen_tx, first_request_seen_rx) = tokio::sync::oneshot::channel();
        let websocket_url = start_test_remote_server(|mut websocket| async move {
            expect_remote_initialize(&mut websocket).await;
            let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
            else {
                panic!("expected account/read request");
            };
            assert_eq!(request.method, "account/read");
            first_request_seen_tx
                .send(request.id.clone())
                .expect("request id should send");
            // The duplicate must never reach the wire: give it 100ms to
            // (not) show up before answering the first request.
            assert!(
                timeout(
                    Duration::from_millis(100),
                    read_websocket_message(&mut websocket)
                )
                .await
                .is_err(),
                "duplicate request should not be forwarded to the server"
            );
            write_websocket_message(
                &mut websocket,
                JSONRPCMessage::Response(JSONRPCResponse {
                    id: request.id,
                    result: serde_json::to_value(GetAccountResponse {
                        account: None,
                        requires_openai_auth: false,
                    })
                    .expect("response should serialize"),
                }),
            )
            .await;
            // Keep the connection alive until the client disconnects.
            let _ = websocket.next().await;
        })
        .await;
        let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
            .await
            .expect("remote client should connect");
        let first_request_handle = client.request_handle();
        let second_request_handle = first_request_handle.clone();

        let first_request = tokio::spawn(async move {
            first_request_handle
                .request_typed::<GetAccountResponse>(ClientRequest::GetAccount {
                    request_id: RequestId::Integer(1),
                    params: codex_app_server_protocol::GetAccountParams {
                        refresh_token: false,
                    },
                })
                .await
        });

        // Wait until the server has definitely seen the first request before
        // issuing the duplicate.
        let first_request_id = first_request_seen_rx
            .await
            .expect("server should observe the first request");
        assert_eq!(first_request_id, RequestId::Integer(1));

        let second_err = second_request_handle
            .request_typed::<GetAccountResponse>(ClientRequest::GetAccount {
                request_id: RequestId::Integer(1),
                params: codex_app_server_protocol::GetAccountParams {
                    refresh_token: false,
                },
            })
            .await
            .expect_err("duplicate request id should be rejected");
        assert_eq!(
            second_err.to_string(),
            "account/read transport error: duplicate remote app-server request id `1`"
        );

        let first_response = first_request
            .await
            .expect("first request task should join")
            .expect("first request should succeed");
        assert_eq!(
            first_response,
            GetAccountResponse {
                account: None,
                requires_openai_auth: false,
            }
        );

        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// Server notifications sent over the websocket surface as client events.
    #[tokio::test]
    async fn remote_notifications_arrive_over_websocket() {
        let websocket_url = start_test_remote_server(|mut websocket| async move {
            expect_remote_initialize(&mut websocket).await;
            write_websocket_message(
                &mut websocket,
                JSONRPCMessage::Notification(
                    // Round-trip through serde to convert the typed
                    // notification into a raw JSON-RPC notification.
                    serde_json::from_value(
                        serde_json::to_value(ServerNotification::AccountUpdated(
                            AccountUpdatedNotification {
                                auth_mode: None,
                                plan_type: None,
                            },
                        ))
                        .expect("notification should serialize"),
                    )
                    .expect("notification should convert to JSON-RPC"),
                ),
            )
            .await;
        })
        .await;
        let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
            .await
            .expect("remote client should connect");

        let event = client.next_event().await.expect("event should arrive");
        assert!(matches!(
            event,
            AppServerEvent::ServerNotification(ServerNotification::AccountUpdated(_))
        ));

        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// With a capacity-1 event channel, droppable output deltas may be
    /// replaced by a `Lagged` marker, but transcript notifications (agent
    /// delta, item completed, turn completed) must all arrive, in order.
    #[tokio::test]
    async fn remote_backpressure_preserves_transcript_notifications() {
        let (done_tx, done_rx) = tokio::sync::oneshot::channel();
        let websocket_url = start_test_remote_server(|mut websocket| async move {
            expect_remote_initialize(&mut websocket).await;
            for notification in [
                command_execution_output_delta_notification("stdout-1"),
                command_execution_output_delta_notification("stdout-2"),
                agent_message_delta_notification("hello"),
                item_completed_notification("hello"),
                turn_completed_notification(),
            ] {
                write_websocket_message(
                    &mut websocket,
                    JSONRPCMessage::Notification(
                        serde_json::from_value(
                            serde_json::to_value(notification)
                                .expect("notification should serialize"),
                        )
                        .expect("notification should convert to JSON-RPC"),
                    ),
                )
                .await;
            }
            // Keep the connection open until the client has drained events.
            let _ = done_rx.await;
        })
        .await;
        let mut client = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
            websocket_url,
            auth_token: None,
            client_name: "codex-app-server-client-test".to_string(),
            client_version: "0.0.0-test".to_string(),
            experimental_api: true,
            opt_out_notification_methods: Vec::new(),
            // Minimum capacity to force backpressure handling.
            channel_capacity: 1,
        })
        .await
        .expect("remote client should connect");

        let first_event = timeout(Duration::from_secs(2), client.next_event())
            .await
            .expect("first event should arrive before timeout")
            .expect("event stream should stay open");
        assert!(matches!(
            first_event,
            AppServerEvent::ServerNotification(ServerNotification::CommandExecutionOutputDelta(
                notification
            )) if notification.delta == "stdout-1"
        ));

        let mut remaining_events = Vec::new();
        for _ in 0..4 {
            remaining_events.push(
                timeout(Duration::from_secs(2), client.next_event())
                    .await
                    .expect("event should arrive before timeout")
                    .expect("event stream should stay open"),
            );
        }

        // Four remaining slots, three required transcript events: the fourth
        // is either the second stdout delta or a Lagged{skipped: 1} marker,
        // depending on timing.
        let mut transcript_event_names = Vec::new();
        for event in &remaining_events {
            match event {
                AppServerEvent::Lagged { skipped: 1 } => {}
                AppServerEvent::ServerNotification(
                    ServerNotification::CommandExecutionOutputDelta(notification),
                ) if notification.delta == "stdout-2" => {}
                AppServerEvent::ServerNotification(ServerNotification::AgentMessageDelta(
                    notification,
                )) if notification.delta == "hello" => {
                    transcript_event_names.push("agent_message_delta");
                }
                AppServerEvent::ServerNotification(ServerNotification::ItemCompleted(
                    notification,
                )) if matches!(
                    &notification.item,
                    codex_app_server_protocol::ThreadItem::AgentMessage { text, .. } if text == "hello"
                ) =>
                {
                    transcript_event_names.push("item_completed");
                }
                AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(
                    notification,
                )) if notification.turn.status
                    == codex_app_server_protocol::TurnStatus::Completed =>
                {
                    transcript_event_names.push("turn_completed");
                }
                _ => panic!("unexpected remaining event: {event:?}"),
            }
        }
        assert_eq!(
            transcript_event_names,
            vec!["agent_message_delta", "item_completed", "turn_completed"]
        );

        done_tx
            .send(())
            .expect("server completion signal should send");
        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// A server-initiated request surfaces as a `ServerRequest` event, and
    /// the client's resolution is written back as a JSON-RPC response.
    #[tokio::test]
    async fn remote_server_request_resolution_roundtrip_works() {
        let websocket_url = start_test_remote_server(|mut websocket| async move {
            expect_remote_initialize(&mut websocket).await;
            let request_id = RequestId::String("srv-1".to_string());
            let server_request = JSONRPCRequest {
                id: request_id.clone(),
                method: "item/tool/requestUserInput".to_string(),
                params: Some(
                    serde_json::to_value(ToolRequestUserInputParams {
                        thread_id: "thread-1".to_string(),
                        turn_id: "turn-1".to_string(),
                        item_id: "call-1".to_string(),
                        questions: vec![ToolRequestUserInputQuestion {
                            id: "question-1".to_string(),
                            header: "Mode".to_string(),
                            question: "Pick one".to_string(),
                            is_other: false,
                            is_secret: false,
                            options: Some(vec![]),
                        }],
                    })
                    .expect("params should serialize"),
                ),
                trace: None,
            };
            write_websocket_message(&mut websocket, JSONRPCMessage::Request(server_request)).await;

            let JSONRPCMessage::Response(response) = read_websocket_message(&mut websocket).await
            else {
                panic!("expected server request response");
            };
            // The response must echo the server-chosen request id.
            assert_eq!(response.id, request_id);
        })
        .await;
        let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
            .await
            .expect("remote client should connect");

        let AppServerEvent::ServerRequest(request) = client
            .next_event()
            .await
            .expect("request event should arrive")
        else {
            panic!("expected server request event");
        };
        client
            .resolve_server_request(request.id().clone(), serde_json::json!({}))
            .await
            .expect("server request should resolve");

        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// A server-initiated request that arrives while `initialize` is still in
    /// flight is buffered and delivered to the caller once the handshake
    /// completes.
    #[tokio::test]
    async fn remote_server_request_received_during_initialize_is_delivered() {
        let websocket_url = start_test_remote_server(|mut websocket| async move {
            // Deliberate interleaving: read initialize, then send a server
            // request BEFORE answering initialize.
            let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
            else {
                panic!("expected initialize request");
            };
            assert_eq!(request.method, "initialize");

            let request_id = RequestId::String("srv-init".to_string());
            write_websocket_message(
                &mut websocket,
                JSONRPCMessage::Request(JSONRPCRequest {
                    id: request_id.clone(),
                    method: "item/tool/requestUserInput".to_string(),
                    params: Some(
                        serde_json::to_value(ToolRequestUserInputParams {
                            thread_id: "thread-1".to_string(),
                            turn_id: "turn-1".to_string(),
                            item_id: "call-1".to_string(),
                            questions: vec![ToolRequestUserInputQuestion {
                                id: "question-1".to_string(),
                                header: "Mode".to_string(),
                                question: "Pick one".to_string(),
                                is_other: false,
                                is_secret: false,
                                options: Some(vec![]),
                            }],
                        })
                        .expect("params should serialize"),
                    ),
                    trace: None,
                }),
            )
            .await;
            write_websocket_message(
                &mut websocket,
                JSONRPCMessage::Response(JSONRPCResponse {
                    id: request.id,
                    result: serde_json::json!({}),
                }),
            )
            .await;

            let JSONRPCMessage::Notification(notification) =
                read_websocket_message(&mut websocket).await
            else {
                panic!("expected initialized notification");
            };
            assert_eq!(notification.method, "initialized");

            let JSONRPCMessage::Response(response) = read_websocket_message(&mut websocket).await
            else {
                panic!("expected server request response");
            };
            assert_eq!(response.id, request_id);
        })
        .await;
        let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
            .await
            .expect("remote client should connect");

        let AppServerEvent::ServerRequest(request) = client
            .next_event()
            .await
            .expect("request event should arrive")
        else {
            panic!("expected server request event");
        };
        client
            .resolve_server_request(request.id().clone(), serde_json::json!({}))
            .await
            .expect("server request should resolve");

        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// Unknown server-initiated methods are automatically rejected with
    /// JSON-RPC "method not found" (-32601) — no caller involvement needed.
    #[tokio::test]
    async fn remote_unknown_server_request_is_rejected() {
        let websocket_url = start_test_remote_server(|mut websocket| async move {
            expect_remote_initialize(&mut websocket).await;
            let request_id = RequestId::String("srv-unknown".to_string());
            write_websocket_message(
                &mut websocket,
                JSONRPCMessage::Request(JSONRPCRequest {
                    id: request_id.clone(),
                    method: "thread/unknown".to_string(),
                    params: None,
                    trace: None,
                }),
            )
            .await;

            let JSONRPCMessage::Error(response) = read_websocket_message(&mut websocket).await
            else {
                panic!("expected JSON-RPC error response");
            };
            assert_eq!(response.id, request_id);
            assert_eq!(response.error.code, -32601);
            assert_eq!(
                response.error.message,
                "unsupported remote app-server request `thread/unknown`"
            );
        })
        .await;
        let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
            .await
            .expect("remote client should connect");

        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// A server-side websocket close surfaces as a `Disconnected` event.
    #[tokio::test]
    async fn remote_disconnect_surfaces_as_event() {
        let websocket_url = start_test_remote_server(|mut websocket| async move {
            expect_remote_initialize(&mut websocket).await;
            websocket.close(None).await.expect("close should succeed");
        })
        .await;
        let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
            .await
            .expect("remote client should connect");

        let event = client
            .next_event()
            .await
            .expect("disconnect event should arrive");
        assert!(matches!(event, AppServerEvent::Disconnected { .. }));
    }
|
|
|
|
#[test]
|
|
fn typed_request_error_exposes_sources() {
|
|
let transport = TypedRequestError::Transport {
|
|
method: "config/read".to_string(),
|
|
source: IoError::new(ErrorKind::BrokenPipe, "closed"),
|
|
};
|
|
assert_eq!(std::error::Error::source(&transport).is_some(), true);
|
|
|
|
let server = TypedRequestError::Server {
|
|
method: "thread/read".to_string(),
|
|
source: JSONRPCErrorError {
|
|
code: -32603,
|
|
data: None,
|
|
message: "internal".to_string(),
|
|
},
|
|
};
|
|
assert_eq!(std::error::Error::source(&server).is_some(), false);
|
|
|
|
let deserialize = TypedRequestError::Deserialize {
|
|
method: "thread/start".to_string(),
|
|
source: serde_json::from_str::<u32>("\"nope\"")
|
|
.expect_err("invalid integer should return deserialize error"),
|
|
};
|
|
assert_eq!(std::error::Error::source(&deserialize).is_some(), true);
|
|
}
|
|
|
|
    /// A `Lagged` marker queued on the event channel is delivered through
    /// `next_event` like any other event.
    #[tokio::test]
    async fn next_event_surfaces_lagged_markers() {
        let (command_tx, _command_rx) = mpsc::channel(1);
        let (event_tx, event_rx) = mpsc::channel(1);
        // Dummy worker task: the hand-built client only needs a joinable handle.
        let worker_handle = tokio::spawn(async {});
        event_tx
            .send(InProcessServerEvent::Lagged { skipped: 3 })
            .await
            .expect("lagged marker should enqueue");
        // Dropping the sender closes the event stream after the one marker.
        drop(event_tx);

        let mut client = InProcessAppServerClient {
            command_tx,
            event_rx,
            worker_handle,
        };

        let event = timeout(Duration::from_secs(2), client.next_event())
            .await
            .expect("lagged marker should arrive before timeout");
        assert!(matches!(
            event,
            Some(InProcessServerEvent::Lagged { skipped: 3 })
        ));

        client.shutdown().await.expect("shutdown should complete");
    }
|
|
|
|
    /// `event_requires_delivery` classifies turn completion, agent message
    /// deltas, and completed items as must-deliver, while `Lagged` markers and
    /// command output deltas are droppable under backpressure.
    #[test]
    fn event_requires_delivery_marks_transcript_and_terminal_events() {
        // Terminal event: turn completed.
        assert!(event_requires_delivery(
            &InProcessServerEvent::ServerNotification(
                codex_app_server_protocol::ServerNotification::TurnCompleted(
                    codex_app_server_protocol::TurnCompletedNotification {
                        thread_id: "thread".to_string(),
                        turn: codex_app_server_protocol::Turn {
                            id: "turn".to_string(),
                            items: Vec::new(),
                            status: codex_app_server_protocol::TurnStatus::Completed,
                            error: None,
                            started_at: None,
                            completed_at: Some(0),
                            duration_ms: None,
                        },
                    }
                )
            )
        ));
        // Transcript event: streamed agent message text.
        assert!(event_requires_delivery(
            &InProcessServerEvent::ServerNotification(
                codex_app_server_protocol::ServerNotification::AgentMessageDelta(
                    codex_app_server_protocol::AgentMessageDeltaNotification {
                        thread_id: "thread".to_string(),
                        turn_id: "turn".to_string(),
                        item_id: "item".to_string(),
                        delta: "hello".to_string(),
                    }
                )
            )
        ));
        // Transcript event: completed item.
        assert!(event_requires_delivery(
            &InProcessServerEvent::ServerNotification(
                codex_app_server_protocol::ServerNotification::ItemCompleted(
                    codex_app_server_protocol::ItemCompletedNotification {
                        thread_id: "thread".to_string(),
                        turn_id: "turn".to_string(),
                        item: codex_app_server_protocol::ThreadItem::AgentMessage {
                            id: "item".to_string(),
                            text: "hello".to_string(),
                            phase: None,
                            memory_citation: None,
                        },
                    }
                )
            )
        ));
        // Droppable: backpressure markers and raw command output.
        assert!(!event_requires_delivery(&InProcessServerEvent::Lagged {
            skipped: 1
        }));
        assert!(!event_requires_delivery(
            &InProcessServerEvent::ServerNotification(
                codex_app_server_protocol::ServerNotification::CommandExecutionOutputDelta(
                    codex_app_server_protocol::CommandExecutionOutputDeltaNotification {
                        thread_id: "thread".to_string(),
                        turn_id: "turn".to_string(),
                        item_id: "item".to_string(),
                        delta: "stdout".to_string(),
                    }
                )
            )
        ));
    }
|
|
|
|
#[tokio::test]
|
|
async fn runtime_start_args_forward_environment_manager() {
|
|
let config = Arc::new(build_test_config().await);
|
|
let environment_manager = Arc::new(EnvironmentManager::new(Some(
|
|
"ws://127.0.0.1:8765".to_string(),
|
|
)));
|
|
|
|
let runtime_args = InProcessClientStartArgs {
|
|
arg0_paths: Arg0DispatchPaths::default(),
|
|
config: config.clone(),
|
|
cli_overrides: Vec::new(),
|
|
loader_overrides: LoaderOverrides::default(),
|
|
cloud_requirements: CloudRequirementsLoader::default(),
|
|
feedback: CodexFeedback::new(),
|
|
environment_manager: environment_manager.clone(),
|
|
config_warnings: Vec::new(),
|
|
session_source: SessionSource::Exec,
|
|
enable_codex_api_key_env: false,
|
|
client_name: "codex-app-server-client-test".to_string(),
|
|
client_version: "0.0.0-test".to_string(),
|
|
experimental_api: true,
|
|
opt_out_notification_methods: Vec::new(),
|
|
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
|
|
}
|
|
.into_runtime_start_args();
|
|
|
|
assert_eq!(runtime_args.config, config);
|
|
assert!(Arc::ptr_eq(
|
|
&runtime_args.environment_manager,
|
|
&environment_manager
|
|
));
|
|
assert!(runtime_args.environment_manager.is_remote());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn shutdown_completes_promptly_without_retained_managers() {
|
|
let client = start_test_client(SessionSource::Cli).await;
|
|
|
|
timeout(Duration::from_secs(1), client.shutdown())
|
|
.await
|
|
.expect("shutdown should not wait for the 5s fallback timeout")
|
|
.expect("shutdown should complete");
|
|
}
|
|
}
|