mirror of
https://github.com/openai/codex.git
synced 2026-04-23 22:24:57 +00:00
## Summary - run exec-server filesystem RPCs requiring sandboxing through a `codex-fs` arg0 helper over stdin/stdout - keep direct local filesystem execution for `DangerFullAccess` and external sandbox policies - remove the standalone exec-server binary path in favor of top-level arg0 dispatch/runtime paths - add sandbox escape regression coverage for local and remote filesystem paths ## Validation - `just fmt` - `git diff --check` - remote devbox: `cd codex-rs && bazel test --bes_backend= --bes_results_url= //codex-rs/exec-server:all` (6/6 passed) --------- Co-authored-by: Codex <noreply@openai.com>
2027 lines
77 KiB
Rust
2027 lines
77 KiB
Rust
//! Shared in-process app-server client facade for CLI surfaces.
|
|
//!
|
|
//! This crate wraps [`codex_app_server::in_process`] behind a single async API
|
|
//! used by surfaces like TUI and exec. It centralizes:
|
|
//!
|
|
//! - Runtime startup and initialize-capabilities handshake.
|
|
//! - Typed caller-provided startup identity (`SessionSource` + client name).
|
|
//! - Typed and raw request/notification dispatch.
|
|
//! - Server request resolution and rejection.
|
|
//! - Event consumption with backpressure signaling ([`InProcessServerEvent::Lagged`]).
|
|
//! - Bounded graceful shutdown with abort fallback.
|
|
//!
|
|
//! The facade interposes a worker task between the caller and the underlying
|
|
//! [`InProcessClientHandle`](codex_app_server::in_process::InProcessClientHandle),
|
|
//! bridging async `mpsc` channels on both sides. Queues are bounded so overload
|
|
//! surfaces as channel-full errors rather than unbounded memory growth.
|
|
|
|
mod remote;
|
|
|
|
use std::error::Error;
|
|
use std::fmt;
|
|
use std::io::Error as IoError;
|
|
use std::io::ErrorKind;
|
|
use std::io::Result as IoResult;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
pub use codex_app_server::in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
|
pub use codex_app_server::in_process::InProcessServerEvent;
|
|
use codex_app_server::in_process::InProcessStartArgs;
|
|
use codex_app_server_protocol::ClientInfo;
|
|
use codex_app_server_protocol::ClientNotification;
|
|
use codex_app_server_protocol::ClientRequest;
|
|
use codex_app_server_protocol::ConfigWarningNotification;
|
|
use codex_app_server_protocol::InitializeCapabilities;
|
|
use codex_app_server_protocol::InitializeParams;
|
|
use codex_app_server_protocol::JSONRPCErrorError;
|
|
use codex_app_server_protocol::RequestId;
|
|
use codex_app_server_protocol::Result as JsonRpcResult;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::ServerRequest;
|
|
use codex_arg0::Arg0DispatchPaths;
|
|
use codex_core::config::Config;
|
|
use codex_core::config_loader::CloudRequirementsLoader;
|
|
use codex_core::config_loader::LoaderOverrides;
|
|
pub use codex_exec_server::EnvironmentManager;
|
|
pub use codex_exec_server::ExecServerRuntimePaths;
|
|
use codex_feedback::CodexFeedback;
|
|
use codex_protocol::protocol::SessionSource;
|
|
use serde::de::DeserializeOwned;
|
|
use tokio::sync::mpsc;
|
|
use tokio::sync::oneshot;
|
|
use tokio::time::timeout;
|
|
use toml::Value as TomlValue;
|
|
use tracing::warn;
|
|
|
|
pub use crate::remote::RemoteAppServerClient;
|
|
pub use crate::remote::RemoteAppServerConnectArgs;
|
|
|
|
/// Transitional access to core-only embedded app-server types.
|
|
///
|
|
/// New TUI behavior should prefer the app-server protocol methods. This
|
|
/// module exists so clients can remove a direct `codex-core` dependency
|
|
/// while legacy startup/config paths are migrated to RPCs.
|
|
pub mod legacy_core {
|
|
pub use codex_core::Cursor;
|
|
pub use codex_core::DEFAULT_PROJECT_DOC_FILENAME;
|
|
pub use codex_core::INTERACTIVE_SESSION_SOURCES;
|
|
pub use codex_core::LOCAL_PROJECT_DOC_FILENAME;
|
|
pub use codex_core::McpManager;
|
|
pub use codex_core::PLUGIN_TEXT_MENTION_SIGIL;
|
|
pub use codex_core::RolloutRecorder;
|
|
pub use codex_core::TOOL_MENTION_SIGIL;
|
|
pub use codex_core::ThreadItem;
|
|
pub use codex_core::ThreadSortKey;
|
|
pub use codex_core::ThreadsPage;
|
|
pub use codex_core::append_message_history_entry;
|
|
pub use codex_core::check_execpolicy_for_warnings;
|
|
pub use codex_core::discover_project_doc_paths;
|
|
pub use codex_core::find_thread_meta_by_name_str;
|
|
pub use codex_core::find_thread_name_by_id;
|
|
pub use codex_core::find_thread_names_by_ids;
|
|
pub use codex_core::format_exec_policy_error_with_source;
|
|
pub use codex_core::grant_read_root_non_elevated;
|
|
pub use codex_core::lookup_message_history_entry;
|
|
pub use codex_core::message_history_metadata;
|
|
pub use codex_core::path_utils;
|
|
pub use codex_core::read_session_meta_line;
|
|
pub use codex_core::web_search_detail;
|
|
|
|
pub mod config {
|
|
pub use codex_core::config::*;
|
|
|
|
pub mod edit {
|
|
pub use codex_core::config::edit::*;
|
|
}
|
|
}
|
|
|
|
pub mod config_loader {
|
|
pub use codex_core::config_loader::*;
|
|
}
|
|
|
|
pub mod connectors {
|
|
pub use codex_core::connectors::*;
|
|
}
|
|
|
|
pub mod otel_init {
|
|
pub use codex_core::otel_init::*;
|
|
}
|
|
|
|
pub mod personality_migration {
|
|
pub use codex_core::personality_migration::*;
|
|
}
|
|
|
|
pub mod plugins {
|
|
pub use codex_core::plugins::*;
|
|
}
|
|
|
|
pub mod review_format {
|
|
pub use codex_core::review_format::*;
|
|
}
|
|
|
|
pub mod review_prompts {
|
|
pub use codex_core::review_prompts::*;
|
|
}
|
|
|
|
pub mod skills {
|
|
pub use codex_core::skills::*;
|
|
}
|
|
|
|
pub mod test_support {
|
|
pub use codex_core::test_support::*;
|
|
}
|
|
|
|
pub mod util {
|
|
pub use codex_core::util::*;
|
|
}
|
|
|
|
pub mod windows_sandbox {
|
|
pub use codex_core::windows_sandbox::*;
|
|
}
|
|
}
|
|
|
|
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
|
|
|
/// Raw app-server request result for typed in-process requests.
|
|
///
|
|
/// Even on the in-process path, successful responses still travel back through
|
|
/// the same JSON-RPC result envelope used by socket/stdio transports because
|
|
/// `MessageProcessor` continues to produce that shape internally.
|
|
pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>;
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub enum AppServerEvent {
|
|
Lagged { skipped: usize },
|
|
ServerNotification(ServerNotification),
|
|
ServerRequest(ServerRequest),
|
|
Disconnected { message: String },
|
|
}
|
|
|
|
impl From<InProcessServerEvent> for AppServerEvent {
|
|
fn from(value: InProcessServerEvent) -> Self {
|
|
match value {
|
|
InProcessServerEvent::Lagged { skipped } => Self::Lagged { skipped },
|
|
InProcessServerEvent::ServerNotification(notification) => {
|
|
Self::ServerNotification(notification)
|
|
}
|
|
InProcessServerEvent::ServerRequest(request) => Self::ServerRequest(request),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn event_requires_delivery(event: &InProcessServerEvent) -> bool {
|
|
// These transcript and terminal events must remain lossless. Dropping
|
|
// streamed assistant text or the authoritative completed item can leave
|
|
// the TUI with permanently corrupted markdown, while dropping completion
|
|
// notifications can leave surfaces waiting forever.
|
|
match event {
|
|
InProcessServerEvent::ServerNotification(notification) => {
|
|
server_notification_requires_delivery(notification)
|
|
}
|
|
_ => false,
|
|
}
|
|
}
|
|
|
|
/// Returns `true` for notifications that must survive backpressure.
|
|
///
|
|
/// Transcript events (`AgentMessageDelta`, `PlanDelta`, reasoning deltas) and
|
|
/// the authoritative `ItemCompleted` / `TurnCompleted` form the lossless tier
|
|
/// of the event stream. Dropping any of these corrupts the visible assistant
|
|
/// output or leaves surfaces waiting for a completion signal that already
|
|
/// fired. Everything else (`CommandExecutionOutputDelta`, progress, etc.) is
|
|
/// best-effort and may be dropped with only cosmetic impact.
|
|
///
|
|
/// Both the in-process and remote transports delegate to this function so the
|
|
/// classification stays in sync.
|
|
pub(crate) fn server_notification_requires_delivery(notification: &ServerNotification) -> bool {
|
|
matches!(
|
|
notification,
|
|
ServerNotification::TurnCompleted(_)
|
|
| ServerNotification::ItemCompleted(_)
|
|
| ServerNotification::AgentMessageDelta(_)
|
|
| ServerNotification::PlanDelta(_)
|
|
| ServerNotification::ReasoningSummaryTextDelta(_)
|
|
| ServerNotification::ReasoningTextDelta(_)
|
|
)
|
|
}
|
|
|
|
/// Outcome of attempting to forward a single event to the consumer channel.
|
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
|
enum ForwardEventResult {
|
|
/// The event was delivered (or intentionally dropped); the stream is healthy.
|
|
Continue,
|
|
/// The consumer channel is closed; the caller should stop producing events.
|
|
DisableStream,
|
|
}
|
|
|
|
/// Forwards a single in-process event to the consumer, respecting the
|
|
/// lossless/best-effort split.
|
|
///
|
|
/// Lossless events (transcript deltas, item/turn completions) block until the
|
|
/// consumer drains capacity. Best-effort events use `try_send` and increment
|
|
/// `skipped_events` on failure. When a lag marker needs to be flushed before a
|
|
/// lossless event, the flush itself blocks so the marker is never lost.
|
|
///
|
|
/// If a dropped event is a `ServerRequest`, `reject_server_request` is called
|
|
/// so the server does not wait for a response that will never come.
|
|
async fn forward_in_process_event<F>(
|
|
event_tx: &mpsc::Sender<InProcessServerEvent>,
|
|
skipped_events: &mut usize,
|
|
event: InProcessServerEvent,
|
|
mut reject_server_request: F,
|
|
) -> ForwardEventResult
|
|
where
|
|
F: FnMut(ServerRequest),
|
|
{
|
|
if *skipped_events > 0 {
|
|
if event_requires_delivery(&event) {
|
|
// Surface lag before the lossless event, but do not let the lag marker itself cause
|
|
// us to drop the transcript/completion notification the caller is blocked on.
|
|
if event_tx
|
|
.send(InProcessServerEvent::Lagged {
|
|
skipped: *skipped_events,
|
|
})
|
|
.await
|
|
.is_err()
|
|
{
|
|
return ForwardEventResult::DisableStream;
|
|
}
|
|
*skipped_events = 0;
|
|
} else {
|
|
match event_tx.try_send(InProcessServerEvent::Lagged {
|
|
skipped: *skipped_events,
|
|
}) {
|
|
Ok(()) => {
|
|
*skipped_events = 0;
|
|
}
|
|
Err(mpsc::error::TrySendError::Full(_)) => {
|
|
*skipped_events = skipped_events.saturating_add(1);
|
|
warn!("dropping in-process app-server event because consumer queue is full");
|
|
if let InProcessServerEvent::ServerRequest(request) = event {
|
|
reject_server_request(request);
|
|
}
|
|
return ForwardEventResult::Continue;
|
|
}
|
|
Err(mpsc::error::TrySendError::Closed(_)) => {
|
|
return ForwardEventResult::DisableStream;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if event_requires_delivery(&event) {
|
|
// Block until the consumer catches up for transcript/completion notifications; this
|
|
// preserves the visible assistant output even when the queue is otherwise saturated.
|
|
if event_tx.send(event).await.is_err() {
|
|
return ForwardEventResult::DisableStream;
|
|
}
|
|
return ForwardEventResult::Continue;
|
|
}
|
|
|
|
match event_tx.try_send(event) {
|
|
Ok(()) => ForwardEventResult::Continue,
|
|
Err(mpsc::error::TrySendError::Full(event)) => {
|
|
*skipped_events = skipped_events.saturating_add(1);
|
|
warn!("dropping in-process app-server event because consumer queue is full");
|
|
if let InProcessServerEvent::ServerRequest(request) = event {
|
|
reject_server_request(request);
|
|
}
|
|
ForwardEventResult::Continue
|
|
}
|
|
Err(mpsc::error::TrySendError::Closed(_)) => ForwardEventResult::DisableStream,
|
|
}
|
|
}
|
|
|
|
/// Layered error for [`InProcessAppServerClient::request_typed`].
|
|
///
|
|
/// This keeps transport failures, server-side JSON-RPC failures, and response
|
|
/// decode failures distinct so callers can decide whether to retry, surface a
|
|
/// server error, or treat the response as an internal request/response mismatch.
|
|
#[derive(Debug)]
|
|
pub enum TypedRequestError {
|
|
Transport {
|
|
method: String,
|
|
source: IoError,
|
|
},
|
|
Server {
|
|
method: String,
|
|
source: JSONRPCErrorError,
|
|
},
|
|
Deserialize {
|
|
method: String,
|
|
source: serde_json::Error,
|
|
},
|
|
}
|
|
|
|
impl fmt::Display for TypedRequestError {
|
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
match self {
|
|
Self::Transport { method, source } => {
|
|
write!(f, "{method} transport error: {source}")
|
|
}
|
|
Self::Server { method, source } => {
|
|
write!(f, "{method} failed: {}", source.message)
|
|
}
|
|
Self::Deserialize { method, source } => {
|
|
write!(f, "{method} response decode error: {source}")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Error for TypedRequestError {
|
|
fn source(&self) -> Option<&(dyn Error + 'static)> {
|
|
match self {
|
|
Self::Transport { source, .. } => Some(source),
|
|
Self::Server { .. } => None,
|
|
Self::Deserialize { source, .. } => Some(source),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct InProcessClientStartArgs {
|
|
/// Resolved argv0 dispatch paths used by command execution internals.
|
|
pub arg0_paths: Arg0DispatchPaths,
|
|
/// Shared config used to initialize app-server runtime.
|
|
pub config: Arc<Config>,
|
|
/// CLI config overrides that are already parsed into TOML values.
|
|
pub cli_overrides: Vec<(String, TomlValue)>,
|
|
/// Loader override knobs used by config API paths.
|
|
pub loader_overrides: LoaderOverrides,
|
|
/// Preloaded cloud requirements provider.
|
|
pub cloud_requirements: CloudRequirementsLoader,
|
|
/// Feedback sink used by app-server/core telemetry and logs.
|
|
pub feedback: CodexFeedback,
|
|
/// Environment manager used by core execution and filesystem operations.
|
|
pub environment_manager: Arc<EnvironmentManager>,
|
|
/// Startup warnings emitted after initialize succeeds.
|
|
pub config_warnings: Vec<ConfigWarningNotification>,
|
|
/// Session source recorded in app-server thread metadata.
|
|
pub session_source: SessionSource,
|
|
/// Whether auth loading should honor the `CODEX_API_KEY` environment variable.
|
|
pub enable_codex_api_key_env: bool,
|
|
/// Client name reported during initialize.
|
|
pub client_name: String,
|
|
/// Client version reported during initialize.
|
|
pub client_version: String,
|
|
/// Whether experimental APIs are requested at initialize time.
|
|
pub experimental_api: bool,
|
|
/// Notification methods this client opts out of receiving.
|
|
pub opt_out_notification_methods: Vec<String>,
|
|
/// Queue capacity for command/event channels (clamped to at least 1).
|
|
pub channel_capacity: usize,
|
|
}
|
|
|
|
impl InProcessClientStartArgs {
|
|
/// Builds initialize params from caller-provided metadata.
|
|
pub fn initialize_params(&self) -> InitializeParams {
|
|
let capabilities = InitializeCapabilities {
|
|
experimental_api: self.experimental_api,
|
|
opt_out_notification_methods: if self.opt_out_notification_methods.is_empty() {
|
|
None
|
|
} else {
|
|
Some(self.opt_out_notification_methods.clone())
|
|
},
|
|
};
|
|
|
|
InitializeParams {
|
|
client_info: ClientInfo {
|
|
name: self.client_name.clone(),
|
|
title: None,
|
|
version: self.client_version.clone(),
|
|
},
|
|
capabilities: Some(capabilities),
|
|
}
|
|
}
|
|
|
|
fn into_runtime_start_args(self) -> InProcessStartArgs {
|
|
let initialize = self.initialize_params();
|
|
InProcessStartArgs {
|
|
arg0_paths: self.arg0_paths,
|
|
config: self.config,
|
|
cli_overrides: self.cli_overrides,
|
|
loader_overrides: self.loader_overrides,
|
|
cloud_requirements: self.cloud_requirements,
|
|
feedback: self.feedback,
|
|
environment_manager: self.environment_manager,
|
|
config_warnings: self.config_warnings,
|
|
session_source: self.session_source,
|
|
enable_codex_api_key_env: self.enable_codex_api_key_env,
|
|
initialize,
|
|
channel_capacity: self.channel_capacity,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Internal command sent from public facade methods to the worker task.
|
|
///
|
|
/// Each variant carries a oneshot sender so the caller can `await` the
|
|
/// result without holding a mutable reference to the client.
|
|
enum ClientCommand {
|
|
Request {
|
|
request: Box<ClientRequest>,
|
|
response_tx: oneshot::Sender<IoResult<RequestResult>>,
|
|
},
|
|
Notify {
|
|
notification: ClientNotification,
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
ResolveServerRequest {
|
|
request_id: RequestId,
|
|
result: JsonRpcResult,
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
RejectServerRequest {
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
Shutdown {
|
|
response_tx: oneshot::Sender<IoResult<()>>,
|
|
},
|
|
}
|
|
|
|
/// Async facade over the in-process app-server runtime.
|
|
///
|
|
/// This type owns a worker task that bridges between:
|
|
/// - caller-facing async `mpsc` channels used by TUI/exec
|
|
/// - [`codex_app_server::in_process::InProcessClientHandle`], which speaks to
|
|
/// the embedded `MessageProcessor`
|
|
///
|
|
/// The facade intentionally preserves the server's request/notification/event
|
|
/// model instead of exposing direct core runtime handles. That keeps in-process
|
|
/// callers aligned with app-server behavior while still avoiding a process
|
|
/// boundary.
|
|
pub struct InProcessAppServerClient {
|
|
command_tx: mpsc::Sender<ClientCommand>,
|
|
event_rx: mpsc::Receiver<InProcessServerEvent>,
|
|
worker_handle: tokio::task::JoinHandle<()>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub struct InProcessAppServerRequestHandle {
|
|
command_tx: mpsc::Sender<ClientCommand>,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub enum AppServerRequestHandle {
|
|
InProcess(InProcessAppServerRequestHandle),
|
|
Remote(crate::remote::RemoteAppServerRequestHandle),
|
|
}
|
|
|
|
pub enum AppServerClient {
|
|
InProcess(InProcessAppServerClient),
|
|
Remote(RemoteAppServerClient),
|
|
}
|
|
|
|
impl InProcessAppServerClient {
|
|
/// Starts the in-process runtime and facade worker task.
|
|
///
|
|
/// The returned client is ready for requests and event consumption. If the
|
|
/// internal event queue is saturated later, server requests are rejected
|
|
/// with overload error instead of being silently dropped.
|
|
pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
|
|
let channel_capacity = args.channel_capacity.max(1);
|
|
let mut handle =
|
|
codex_app_server::in_process::start(args.into_runtime_start_args()).await?;
|
|
let request_sender = handle.sender();
|
|
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
|
|
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
|
|
|
|
let worker_handle = tokio::spawn(async move {
|
|
let mut event_stream_enabled = true;
|
|
let mut skipped_events = 0usize;
|
|
loop {
|
|
tokio::select! {
|
|
command = command_rx.recv() => {
|
|
match command {
|
|
Some(ClientCommand::Request { request, response_tx }) => {
|
|
let request_sender = request_sender.clone();
|
|
// Request waits happen on a detached task so
|
|
// this loop can keep draining runtime events
|
|
// while the request is blocked on client input.
|
|
tokio::spawn(async move {
|
|
let result = request_sender.request(*request).await;
|
|
let _ = response_tx.send(result);
|
|
});
|
|
}
|
|
Some(ClientCommand::Notify {
|
|
notification,
|
|
response_tx,
|
|
}) => {
|
|
let result = request_sender.notify(notification);
|
|
let _ = response_tx.send(result);
|
|
}
|
|
Some(ClientCommand::ResolveServerRequest {
|
|
request_id,
|
|
result,
|
|
response_tx,
|
|
}) => {
|
|
let send_result =
|
|
request_sender.respond_to_server_request(request_id, result);
|
|
let _ = response_tx.send(send_result);
|
|
}
|
|
Some(ClientCommand::RejectServerRequest {
|
|
request_id,
|
|
error,
|
|
response_tx,
|
|
}) => {
|
|
let send_result = request_sender.fail_server_request(request_id, error);
|
|
let _ = response_tx.send(send_result);
|
|
}
|
|
Some(ClientCommand::Shutdown { response_tx }) => {
|
|
let shutdown_result = handle.shutdown().await;
|
|
let _ = response_tx.send(shutdown_result);
|
|
break;
|
|
}
|
|
None => {
|
|
let _ = handle.shutdown().await;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
event = handle.next_event(), if event_stream_enabled => {
|
|
let Some(event) = event else {
|
|
break;
|
|
};
|
|
if let InProcessServerEvent::ServerRequest(
|
|
ServerRequest::ChatgptAuthTokensRefresh { request_id, .. }
|
|
) = &event
|
|
{
|
|
let send_result = request_sender.fail_server_request(
|
|
request_id.clone(),
|
|
JSONRPCErrorError {
|
|
code: -32000,
|
|
message: "chatgpt auth token refresh is not supported for in-process app-server clients".to_string(),
|
|
data: None,
|
|
},
|
|
);
|
|
if let Err(err) = send_result {
|
|
warn!(
|
|
"failed to reject unsupported chatgpt auth token refresh request: {err}"
|
|
);
|
|
}
|
|
continue;
|
|
}
|
|
|
|
match forward_in_process_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
event,
|
|
|request| {
|
|
let _ = request_sender.fail_server_request(
|
|
request.id().clone(),
|
|
JSONRPCErrorError {
|
|
code: -32001,
|
|
message: "in-process app-server event queue is full"
|
|
.to_string(),
|
|
data: None,
|
|
},
|
|
);
|
|
},
|
|
)
|
|
.await
|
|
{
|
|
ForwardEventResult::Continue => {}
|
|
ForwardEventResult::DisableStream => {
|
|
event_stream_enabled = false;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
Ok(Self {
|
|
command_tx,
|
|
event_rx,
|
|
worker_handle,
|
|
})
|
|
}
|
|
|
|
pub fn request_handle(&self) -> InProcessAppServerRequestHandle {
|
|
InProcessAppServerRequestHandle {
|
|
command_tx: self.command_tx.clone(),
|
|
}
|
|
}
|
|
|
|
/// Sends a typed client request and returns raw JSON-RPC result.
|
|
///
|
|
/// Callers that expect a concrete response type should usually prefer
|
|
/// [`request_typed`](Self::request_typed).
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::Request {
|
|
request: Box::new(request),
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server request channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Sends a typed client request and decodes the successful response body.
|
|
///
|
|
/// This still deserializes from a JSON value produced by app-server's
|
|
/// JSON-RPC result envelope. Because the caller chooses `T`, `Deserialize`
|
|
/// failures indicate an internal request/response mismatch at the call site
|
|
/// (or an in-process bug), not transport skew from an external client.
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
let method = request_method_name(&request);
|
|
let response =
|
|
self.request(request)
|
|
.await
|
|
.map_err(|source| TypedRequestError::Transport {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
let result = response.map_err(|source| TypedRequestError::Server {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
serde_json::from_value(result)
|
|
.map_err(|source| TypedRequestError::Deserialize { method, source })
|
|
}
|
|
|
|
/// Sends a typed client notification.
|
|
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::Notify {
|
|
notification,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server notify channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Resolves a pending server request.
|
|
///
|
|
/// This should only be called with request IDs obtained from the current
|
|
/// client's event stream.
|
|
pub async fn resolve_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
result: JsonRpcResult,
|
|
) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::ResolveServerRequest {
|
|
request_id,
|
|
result,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server resolve channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Rejects a pending server request with JSON-RPC error payload.
|
|
pub async fn reject_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
) -> IoResult<()> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::RejectServerRequest {
|
|
request_id,
|
|
error,
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server reject channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
/// Returns the next in-process event, or `None` when worker exits.
|
|
///
|
|
/// Callers are expected to drain this stream promptly. If they fall behind,
|
|
/// the worker emits [`InProcessServerEvent::Lagged`] markers and may reject
|
|
/// pending server requests rather than letting approval flows hang.
|
|
pub async fn next_event(&mut self) -> Option<InProcessServerEvent> {
|
|
self.event_rx.recv().await
|
|
}
|
|
|
|
/// Shuts down worker and in-process runtime with bounded wait.
|
|
///
|
|
/// If graceful shutdown exceeds timeout, the worker task is aborted to
|
|
/// avoid leaking background tasks in embedding callers.
|
|
pub async fn shutdown(self) -> IoResult<()> {
|
|
let Self {
|
|
command_tx,
|
|
event_rx,
|
|
worker_handle,
|
|
} = self;
|
|
let mut worker_handle = worker_handle;
|
|
// Drop the caller-facing receiver before asking the worker to shut
|
|
// down. That unblocks any pending must-deliver `event_tx.send(..)`
|
|
// so the worker can reach `handle.shutdown()` instead of timing out
|
|
// and getting aborted with the runtime still attached.
|
|
drop(event_rx);
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
if command_tx
|
|
.send(ClientCommand::Shutdown { response_tx })
|
|
.await
|
|
.is_ok()
|
|
&& let Ok(command_result) = timeout(SHUTDOWN_TIMEOUT, response_rx).await
|
|
{
|
|
command_result.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server shutdown channel is closed",
|
|
)
|
|
})??;
|
|
}
|
|
|
|
if let Err(_elapsed) = timeout(SHUTDOWN_TIMEOUT, &mut worker_handle).await {
|
|
worker_handle.abort();
|
|
let _ = worker_handle.await;
|
|
}
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl InProcessAppServerRequestHandle {
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
let (response_tx, response_rx) = oneshot::channel();
|
|
self.command_tx
|
|
.send(ClientCommand::Request {
|
|
request: Box::new(request),
|
|
response_tx,
|
|
})
|
|
.await
|
|
.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server worker channel is closed",
|
|
)
|
|
})?;
|
|
response_rx.await.map_err(|_| {
|
|
IoError::new(
|
|
ErrorKind::BrokenPipe,
|
|
"in-process app-server request channel is closed",
|
|
)
|
|
})?
|
|
}
|
|
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
let method = request_method_name(&request);
|
|
let response =
|
|
self.request(request)
|
|
.await
|
|
.map_err(|source| TypedRequestError::Transport {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
let result = response.map_err(|source| TypedRequestError::Server {
|
|
method: method.clone(),
|
|
source,
|
|
})?;
|
|
serde_json::from_value(result)
|
|
.map_err(|source| TypedRequestError::Deserialize { method, source })
|
|
}
|
|
}
|
|
|
|
impl AppServerRequestHandle {
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
match self {
|
|
Self::InProcess(handle) => handle.request(request).await,
|
|
Self::Remote(handle) => handle.request(request).await,
|
|
}
|
|
}
|
|
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
match self {
|
|
Self::InProcess(handle) => handle.request_typed(request).await,
|
|
Self::Remote(handle) => handle.request_typed(request).await,
|
|
}
|
|
}
|
|
}
|
|
|
|
impl AppServerClient {
|
|
pub async fn request(&self, request: ClientRequest) -> IoResult<RequestResult> {
|
|
match self {
|
|
Self::InProcess(client) => client.request(request).await,
|
|
Self::Remote(client) => client.request(request).await,
|
|
}
|
|
}
|
|
|
|
pub async fn request_typed<T>(&self, request: ClientRequest) -> Result<T, TypedRequestError>
|
|
where
|
|
T: DeserializeOwned,
|
|
{
|
|
match self {
|
|
Self::InProcess(client) => client.request_typed(request).await,
|
|
Self::Remote(client) => client.request_typed(request).await,
|
|
}
|
|
}
|
|
|
|
pub async fn notify(&self, notification: ClientNotification) -> IoResult<()> {
|
|
match self {
|
|
Self::InProcess(client) => client.notify(notification).await,
|
|
Self::Remote(client) => client.notify(notification).await,
|
|
}
|
|
}
|
|
|
|
pub async fn resolve_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
result: JsonRpcResult,
|
|
) -> IoResult<()> {
|
|
match self {
|
|
Self::InProcess(client) => client.resolve_server_request(request_id, result).await,
|
|
Self::Remote(client) => client.resolve_server_request(request_id, result).await,
|
|
}
|
|
}
|
|
|
|
pub async fn reject_server_request(
|
|
&self,
|
|
request_id: RequestId,
|
|
error: JSONRPCErrorError,
|
|
) -> IoResult<()> {
|
|
match self {
|
|
Self::InProcess(client) => client.reject_server_request(request_id, error).await,
|
|
Self::Remote(client) => client.reject_server_request(request_id, error).await,
|
|
}
|
|
}
|
|
|
|
pub async fn next_event(&mut self) -> Option<AppServerEvent> {
|
|
match self {
|
|
Self::InProcess(client) => client.next_event().await.map(Into::into),
|
|
Self::Remote(client) => client.next_event().await,
|
|
}
|
|
}
|
|
|
|
pub async fn shutdown(self) -> IoResult<()> {
|
|
match self {
|
|
Self::InProcess(client) => client.shutdown().await,
|
|
Self::Remote(client) => client.shutdown().await,
|
|
}
|
|
}
|
|
|
|
pub fn request_handle(&self) -> AppServerRequestHandle {
|
|
match self {
|
|
Self::InProcess(client) => AppServerRequestHandle::InProcess(client.request_handle()),
|
|
Self::Remote(client) => AppServerRequestHandle::Remote(client.request_handle()),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Extracts the JSON-RPC method name for diagnostics without extending the
|
|
/// protocol crate with in-process-only helpers.
|
|
pub(crate) fn request_method_name(request: &ClientRequest) -> String {
|
|
serde_json::to_value(request)
|
|
.ok()
|
|
.and_then(|value| {
|
|
value
|
|
.get("method")
|
|
.and_then(serde_json::Value::as_str)
|
|
.map(ToOwned::to_owned)
|
|
})
|
|
.unwrap_or_else(|| "<unknown>".to_string())
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use codex_app_server_protocol::AccountUpdatedNotification;
|
|
use codex_app_server_protocol::ConfigRequirementsReadResponse;
|
|
use codex_app_server_protocol::GetAccountResponse;
|
|
use codex_app_server_protocol::JSONRPCMessage;
|
|
use codex_app_server_protocol::JSONRPCRequest;
|
|
use codex_app_server_protocol::JSONRPCResponse;
|
|
use codex_app_server_protocol::ServerNotification;
|
|
use codex_app_server_protocol::SessionSource as ApiSessionSource;
|
|
use codex_app_server_protocol::ThreadStartParams;
|
|
use codex_app_server_protocol::ThreadStartResponse;
|
|
use codex_app_server_protocol::ToolRequestUserInputParams;
|
|
use codex_app_server_protocol::ToolRequestUserInputQuestion;
|
|
use codex_core::config::ConfigBuilder;
|
|
use futures::SinkExt;
|
|
use futures::StreamExt;
|
|
use pretty_assertions::assert_eq;
|
|
use tokio::net::TcpListener;
|
|
use tokio::time::Duration;
|
|
use tokio::time::timeout;
|
|
use tokio_tungstenite::accept_hdr_async;
|
|
use tokio_tungstenite::tungstenite::Message;
|
|
use tokio_tungstenite::tungstenite::handshake::server::Request as WebSocketRequest;
|
|
use tokio_tungstenite::tungstenite::handshake::server::Response as WebSocketResponse;
|
|
use tokio_tungstenite::tungstenite::http::header::AUTHORIZATION;
|
|
|
|
async fn build_test_config() -> Config {
|
|
match ConfigBuilder::default().build().await {
|
|
Ok(config) => config,
|
|
Err(_) => Config::load_default_with_cli_overrides(Vec::new())
|
|
.expect("default config should load"),
|
|
}
|
|
}
|
|
|
|
async fn start_test_client_with_capacity(
|
|
session_source: SessionSource,
|
|
channel_capacity: usize,
|
|
) -> InProcessAppServerClient {
|
|
InProcessAppServerClient::start(InProcessClientStartArgs {
|
|
arg0_paths: Arg0DispatchPaths::default(),
|
|
config: Arc::new(build_test_config().await),
|
|
cli_overrides: Vec::new(),
|
|
loader_overrides: LoaderOverrides::default(),
|
|
cloud_requirements: CloudRequirementsLoader::default(),
|
|
feedback: CodexFeedback::new(),
|
|
environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)),
|
|
config_warnings: Vec::new(),
|
|
session_source,
|
|
enable_codex_api_key_env: false,
|
|
client_name: "codex-app-server-client-test".to_string(),
|
|
client_version: "0.0.0-test".to_string(),
|
|
experimental_api: true,
|
|
opt_out_notification_methods: Vec::new(),
|
|
channel_capacity,
|
|
})
|
|
.await
|
|
.expect("in-process app-server client should start")
|
|
}
|
|
|
|
async fn start_test_client(session_source: SessionSource) -> InProcessAppServerClient {
|
|
start_test_client_with_capacity(session_source, DEFAULT_IN_PROCESS_CHANNEL_CAPACITY).await
|
|
}
|
|
|
|
async fn start_test_remote_server<F, Fut>(handler: F) -> String
|
|
where
|
|
F: FnOnce(tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>) -> Fut
|
|
+ Send
|
|
+ 'static,
|
|
Fut: std::future::Future<Output = ()> + Send + 'static,
|
|
{
|
|
start_test_remote_server_with_auth(/*expected_auth_token*/ None, handler).await
|
|
}
|
|
|
|
async fn start_test_remote_server_with_auth<F, Fut>(
|
|
expected_auth_token: Option<String>,
|
|
handler: F,
|
|
) -> String
|
|
where
|
|
F: FnOnce(tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>) -> Fut
|
|
+ Send
|
|
+ 'static,
|
|
Fut: std::future::Future<Output = ()> + Send + 'static,
|
|
{
|
|
let listener = TcpListener::bind("127.0.0.1:0")
|
|
.await
|
|
.expect("listener should bind");
|
|
let addr = listener.local_addr().expect("listener address");
|
|
tokio::spawn(async move {
|
|
let (stream, _) = listener.accept().await.expect("accept should succeed");
|
|
let websocket = accept_hdr_async(
|
|
stream,
|
|
move |request: &WebSocketRequest, response: WebSocketResponse| {
|
|
let provided_auth_token = request
|
|
.headers()
|
|
.get(AUTHORIZATION)
|
|
.and_then(|value| value.to_str().ok())
|
|
.map(str::to_owned);
|
|
let expected_auth_token = expected_auth_token
|
|
.as_ref()
|
|
.map(|token| format!("Bearer {token}"));
|
|
assert_eq!(provided_auth_token, expected_auth_token);
|
|
Ok(response)
|
|
},
|
|
)
|
|
.await
|
|
.expect("websocket upgrade should succeed");
|
|
handler(websocket).await;
|
|
});
|
|
format!("ws://{addr}")
|
|
}
|
|
|
|
async fn expect_remote_initialize(
|
|
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
|
) {
|
|
let JSONRPCMessage::Request(request) = read_websocket_message(websocket).await else {
|
|
panic!("expected initialize request");
|
|
};
|
|
assert_eq!(request.method, "initialize");
|
|
write_websocket_message(
|
|
websocket,
|
|
JSONRPCMessage::Response(JSONRPCResponse {
|
|
id: request.id,
|
|
result: serde_json::json!({}),
|
|
}),
|
|
)
|
|
.await;
|
|
|
|
let JSONRPCMessage::Notification(notification) = read_websocket_message(websocket).await
|
|
else {
|
|
panic!("expected initialized notification");
|
|
};
|
|
assert_eq!(notification.method, "initialized");
|
|
}
|
|
|
|
async fn read_websocket_message(
|
|
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
|
) -> JSONRPCMessage {
|
|
loop {
|
|
let frame = websocket
|
|
.next()
|
|
.await
|
|
.expect("frame should be available")
|
|
.expect("frame should decode");
|
|
match frame {
|
|
Message::Text(text) => {
|
|
return serde_json::from_str::<JSONRPCMessage>(&text)
|
|
.expect("text frame should be valid JSON-RPC");
|
|
}
|
|
Message::Binary(_) | Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => {
|
|
continue;
|
|
}
|
|
Message::Close(_) => panic!("unexpected close frame"),
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn write_websocket_message(
|
|
websocket: &mut tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
|
|
message: JSONRPCMessage,
|
|
) {
|
|
websocket
|
|
.send(Message::Text(
|
|
serde_json::to_string(&message)
|
|
.expect("message should serialize")
|
|
.into(),
|
|
))
|
|
.await
|
|
.expect("message should send");
|
|
}
|
|
|
|
fn command_execution_output_delta_notification(delta: &str) -> ServerNotification {
|
|
ServerNotification::CommandExecutionOutputDelta(
|
|
codex_app_server_protocol::CommandExecutionOutputDeltaNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn_id: "turn".to_string(),
|
|
item_id: "item".to_string(),
|
|
delta: delta.to_string(),
|
|
},
|
|
)
|
|
}
|
|
|
|
fn agent_message_delta_notification(delta: &str) -> ServerNotification {
|
|
ServerNotification::AgentMessageDelta(
|
|
codex_app_server_protocol::AgentMessageDeltaNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn_id: "turn".to_string(),
|
|
item_id: "item".to_string(),
|
|
delta: delta.to_string(),
|
|
},
|
|
)
|
|
}
|
|
|
|
fn item_completed_notification(text: &str) -> ServerNotification {
|
|
ServerNotification::ItemCompleted(codex_app_server_protocol::ItemCompletedNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn_id: "turn".to_string(),
|
|
item: codex_app_server_protocol::ThreadItem::AgentMessage {
|
|
id: "item".to_string(),
|
|
text: text.to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
})
|
|
}
|
|
|
|
fn turn_completed_notification() -> ServerNotification {
|
|
ServerNotification::TurnCompleted(codex_app_server_protocol::TurnCompletedNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn: codex_app_server_protocol::Turn {
|
|
id: "turn".to_string(),
|
|
items: Vec::new(),
|
|
status: codex_app_server_protocol::TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: Some(0),
|
|
duration_ms: Some(1),
|
|
},
|
|
})
|
|
}
|
|
|
|
fn test_remote_connect_args(websocket_url: String) -> RemoteAppServerConnectArgs {
|
|
RemoteAppServerConnectArgs {
|
|
websocket_url,
|
|
auth_token: None,
|
|
client_name: "codex-app-server-client-test".to_string(),
|
|
client_version: "0.0.0-test".to_string(),
|
|
experimental_api: true,
|
|
opt_out_notification_methods: Vec::new(),
|
|
channel_capacity: 8,
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn typed_request_roundtrip_works() {
|
|
let client = start_test_client(SessionSource::Exec).await;
|
|
let _response: ConfigRequirementsReadResponse = client
|
|
.request_typed(ClientRequest::ConfigRequirementsRead {
|
|
request_id: RequestId::Integer(1),
|
|
params: None,
|
|
})
|
|
.await
|
|
.expect("typed request should succeed");
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn typed_request_reports_json_rpc_errors() {
|
|
let client = start_test_client(SessionSource::Exec).await;
|
|
let err = client
|
|
.request_typed::<ConfigRequirementsReadResponse>(ClientRequest::ThreadRead {
|
|
request_id: RequestId::Integer(99),
|
|
params: codex_app_server_protocol::ThreadReadParams {
|
|
thread_id: "missing-thread".to_string(),
|
|
include_turns: false,
|
|
},
|
|
})
|
|
.await
|
|
.expect_err("missing thread should return a JSON-RPC error");
|
|
assert!(
|
|
err.to_string().starts_with("thread/read failed:"),
|
|
"expected method-qualified JSON-RPC failure message"
|
|
);
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn caller_provided_session_source_is_applied() {
|
|
for (session_source, expected_source) in [
|
|
(SessionSource::Exec, ApiSessionSource::Exec),
|
|
(SessionSource::Cli, ApiSessionSource::Cli),
|
|
] {
|
|
let client = start_test_client(session_source).await;
|
|
let parsed: ThreadStartResponse = client
|
|
.request_typed(ClientRequest::ThreadStart {
|
|
request_id: RequestId::Integer(2),
|
|
params: ThreadStartParams {
|
|
ephemeral: Some(true),
|
|
..ThreadStartParams::default()
|
|
},
|
|
})
|
|
.await
|
|
.expect("thread/start should succeed");
|
|
assert_eq!(parsed.thread.source, expected_source);
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn threads_started_via_app_server_are_visible_through_typed_requests() {
|
|
let client = start_test_client(SessionSource::Cli).await;
|
|
|
|
let response: ThreadStartResponse = client
|
|
.request_typed(ClientRequest::ThreadStart {
|
|
request_id: RequestId::Integer(3),
|
|
params: ThreadStartParams {
|
|
ephemeral: Some(true),
|
|
..ThreadStartParams::default()
|
|
},
|
|
})
|
|
.await
|
|
.expect("thread/start should succeed");
|
|
let read = client
|
|
.request_typed::<codex_app_server_protocol::ThreadReadResponse>(
|
|
ClientRequest::ThreadRead {
|
|
request_id: RequestId::Integer(4),
|
|
params: codex_app_server_protocol::ThreadReadParams {
|
|
thread_id: response.thread.id.clone(),
|
|
include_turns: false,
|
|
},
|
|
},
|
|
)
|
|
.await
|
|
.expect("thread/read should return the newly started thread");
|
|
assert_eq!(read.thread.id, response.thread.id);
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn tiny_channel_capacity_still_supports_request_roundtrip() {
|
|
let client =
|
|
start_test_client_with_capacity(SessionSource::Exec, /*channel_capacity*/ 1).await;
|
|
let _response: ConfigRequirementsReadResponse = client
|
|
.request_typed(ClientRequest::ConfigRequirementsRead {
|
|
request_id: RequestId::Integer(1),
|
|
params: None,
|
|
})
|
|
.await
|
|
.expect("typed request should succeed");
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn forward_in_process_event_preserves_transcript_notifications_under_backpressure() {
|
|
let (event_tx, mut event_rx) = mpsc::channel(1);
|
|
event_tx
|
|
.send(InProcessServerEvent::ServerNotification(
|
|
command_execution_output_delta_notification("stdout-1"),
|
|
))
|
|
.await
|
|
.expect("initial event should enqueue");
|
|
|
|
let mut skipped_events = 0usize;
|
|
let result = forward_in_process_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
InProcessServerEvent::ServerNotification(command_execution_output_delta_notification(
|
|
"stdout-2",
|
|
)),
|
|
|_| {},
|
|
)
|
|
.await;
|
|
assert_eq!(result, ForwardEventResult::Continue);
|
|
assert_eq!(skipped_events, 1);
|
|
|
|
let receive_task = tokio::spawn(async move {
|
|
let mut events = Vec::new();
|
|
for _ in 0..5 {
|
|
events.push(
|
|
timeout(Duration::from_secs(2), event_rx.recv())
|
|
.await
|
|
.expect("event should arrive before timeout")
|
|
.expect("event stream should stay open"),
|
|
);
|
|
}
|
|
events
|
|
});
|
|
|
|
for notification in [
|
|
agent_message_delta_notification("hello"),
|
|
item_completed_notification("hello"),
|
|
turn_completed_notification(),
|
|
] {
|
|
let result = forward_in_process_event(
|
|
&event_tx,
|
|
&mut skipped_events,
|
|
InProcessServerEvent::ServerNotification(notification),
|
|
|_| {},
|
|
)
|
|
.await;
|
|
assert_eq!(result, ForwardEventResult::Continue);
|
|
}
|
|
assert_eq!(skipped_events, 0);
|
|
|
|
let events = receive_task
|
|
.await
|
|
.expect("receiver task should join successfully");
|
|
assert!(matches!(
|
|
&events[0],
|
|
InProcessServerEvent::ServerNotification(
|
|
ServerNotification::CommandExecutionOutputDelta(notification)
|
|
) if notification.delta == "stdout-1"
|
|
));
|
|
assert!(matches!(
|
|
&events[1],
|
|
InProcessServerEvent::Lagged { skipped: 1 }
|
|
));
|
|
assert!(matches!(
|
|
&events[2],
|
|
InProcessServerEvent::ServerNotification(ServerNotification::AgentMessageDelta(
|
|
notification
|
|
)) if notification.delta == "hello"
|
|
));
|
|
assert!(matches!(
|
|
&events[3],
|
|
InProcessServerEvent::ServerNotification(ServerNotification::ItemCompleted(
|
|
notification
|
|
)) if matches!(
|
|
¬ification.item,
|
|
codex_app_server_protocol::ThreadItem::AgentMessage { text, .. } if text == "hello"
|
|
)
|
|
));
|
|
assert!(matches!(
|
|
&events[4],
|
|
InProcessServerEvent::ServerNotification(ServerNotification::TurnCompleted(
|
|
notification
|
|
)) if notification.turn.status == codex_app_server_protocol::TurnStatus::Completed
|
|
));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_typed_request_roundtrip_works() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected account/read request");
|
|
};
|
|
assert_eq!(request.method, "account/read");
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Response(JSONRPCResponse {
|
|
id: request.id,
|
|
result: serde_json::to_value(GetAccountResponse {
|
|
account: None,
|
|
requires_openai_auth: false,
|
|
})
|
|
.expect("response should serialize"),
|
|
}),
|
|
)
|
|
.await;
|
|
websocket.close(None).await.expect("close should succeed");
|
|
})
|
|
.await;
|
|
let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let response: GetAccountResponse = client
|
|
.request_typed(ClientRequest::GetAccount {
|
|
request_id: RequestId::Integer(1),
|
|
params: codex_app_server_protocol::GetAccountParams {
|
|
refresh_token: false,
|
|
},
|
|
})
|
|
.await
|
|
.expect("typed request should succeed");
|
|
assert_eq!(response.account, None);
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_connect_includes_auth_header_when_configured() {
|
|
let auth_token = "remote-bearer-token".to_string();
|
|
let websocket_url = start_test_remote_server_with_auth(
|
|
Some(auth_token.clone()),
|
|
|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
websocket.close(None).await.expect("close should succeed");
|
|
},
|
|
)
|
|
.await;
|
|
let client = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
|
|
auth_token: Some(auth_token),
|
|
..test_remote_connect_args(websocket_url)
|
|
})
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_connect_rejects_non_loopback_ws_when_auth_configured() {
|
|
let result = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
|
|
websocket_url: "ws://example.com:4500".to_string(),
|
|
auth_token: Some("remote-bearer-token".to_string()),
|
|
..test_remote_connect_args("ws://127.0.0.1:1".to_string())
|
|
})
|
|
.await;
|
|
let err = match result {
|
|
Ok(_) => panic!("non-loopback ws should be rejected before connect"),
|
|
Err(err) => err,
|
|
};
|
|
assert_eq!(err.kind(), ErrorKind::InvalidInput);
|
|
assert!(
|
|
err.to_string()
|
|
.contains("remote auth tokens require `wss://` or loopback `ws://` URLs")
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn remote_auth_token_transport_policy_allows_wss_and_loopback_ws() {
|
|
assert!(crate::remote::websocket_url_supports_auth_token(
|
|
&url::Url::parse("wss://example.com:443").expect("wss URL should parse")
|
|
));
|
|
assert!(crate::remote::websocket_url_supports_auth_token(
|
|
&url::Url::parse("ws://127.0.0.1:4500").expect("loopback ws URL should parse")
|
|
));
|
|
assert!(!crate::remote::websocket_url_supports_auth_token(
|
|
&url::Url::parse("ws://example.com:4500").expect("non-loopback ws URL should parse")
|
|
));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_duplicate_request_id_keeps_original_waiter() {
|
|
let (first_request_seen_tx, first_request_seen_rx) = tokio::sync::oneshot::channel();
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected account/read request");
|
|
};
|
|
assert_eq!(request.method, "account/read");
|
|
first_request_seen_tx
|
|
.send(request.id.clone())
|
|
.expect("request id should send");
|
|
assert!(
|
|
timeout(
|
|
Duration::from_millis(100),
|
|
read_websocket_message(&mut websocket)
|
|
)
|
|
.await
|
|
.is_err(),
|
|
"duplicate request should not be forwarded to the server"
|
|
);
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Response(JSONRPCResponse {
|
|
id: request.id,
|
|
result: serde_json::to_value(GetAccountResponse {
|
|
account: None,
|
|
requires_openai_auth: false,
|
|
})
|
|
.expect("response should serialize"),
|
|
}),
|
|
)
|
|
.await;
|
|
let _ = websocket.next().await;
|
|
})
|
|
.await;
|
|
let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
let first_request_handle = client.request_handle();
|
|
let second_request_handle = first_request_handle.clone();
|
|
|
|
let first_request = tokio::spawn(async move {
|
|
first_request_handle
|
|
.request_typed::<GetAccountResponse>(ClientRequest::GetAccount {
|
|
request_id: RequestId::Integer(1),
|
|
params: codex_app_server_protocol::GetAccountParams {
|
|
refresh_token: false,
|
|
},
|
|
})
|
|
.await
|
|
});
|
|
|
|
let first_request_id = first_request_seen_rx
|
|
.await
|
|
.expect("server should observe the first request");
|
|
assert_eq!(first_request_id, RequestId::Integer(1));
|
|
|
|
let second_err = second_request_handle
|
|
.request_typed::<GetAccountResponse>(ClientRequest::GetAccount {
|
|
request_id: RequestId::Integer(1),
|
|
params: codex_app_server_protocol::GetAccountParams {
|
|
refresh_token: false,
|
|
},
|
|
})
|
|
.await
|
|
.expect_err("duplicate request id should be rejected");
|
|
assert_eq!(
|
|
second_err.to_string(),
|
|
"account/read transport error: duplicate remote app-server request id `1`"
|
|
);
|
|
|
|
let first_response = first_request
|
|
.await
|
|
.expect("first request task should join")
|
|
.expect("first request should succeed");
|
|
assert_eq!(
|
|
first_response,
|
|
GetAccountResponse {
|
|
account: None,
|
|
requires_openai_auth: false,
|
|
}
|
|
);
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_notifications_arrive_over_websocket() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Notification(
|
|
serde_json::from_value(
|
|
serde_json::to_value(ServerNotification::AccountUpdated(
|
|
AccountUpdatedNotification {
|
|
auth_mode: None,
|
|
plan_type: None,
|
|
},
|
|
))
|
|
.expect("notification should serialize"),
|
|
)
|
|
.expect("notification should convert to JSON-RPC"),
|
|
),
|
|
)
|
|
.await;
|
|
})
|
|
.await;
|
|
let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let event = client.next_event().await.expect("event should arrive");
|
|
assert!(matches!(
|
|
event,
|
|
AppServerEvent::ServerNotification(ServerNotification::AccountUpdated(_))
|
|
));
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_backpressure_preserves_transcript_notifications() {
|
|
let (done_tx, done_rx) = tokio::sync::oneshot::channel();
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
for notification in [
|
|
command_execution_output_delta_notification("stdout-1"),
|
|
command_execution_output_delta_notification("stdout-2"),
|
|
agent_message_delta_notification("hello"),
|
|
item_completed_notification("hello"),
|
|
turn_completed_notification(),
|
|
] {
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Notification(
|
|
serde_json::from_value(
|
|
serde_json::to_value(notification)
|
|
.expect("notification should serialize"),
|
|
)
|
|
.expect("notification should convert to JSON-RPC"),
|
|
),
|
|
)
|
|
.await;
|
|
}
|
|
let _ = done_rx.await;
|
|
})
|
|
.await;
|
|
let mut client = RemoteAppServerClient::connect(RemoteAppServerConnectArgs {
|
|
websocket_url,
|
|
auth_token: None,
|
|
client_name: "codex-app-server-client-test".to_string(),
|
|
client_version: "0.0.0-test".to_string(),
|
|
experimental_api: true,
|
|
opt_out_notification_methods: Vec::new(),
|
|
channel_capacity: 1,
|
|
})
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let first_event = timeout(Duration::from_secs(2), client.next_event())
|
|
.await
|
|
.expect("first event should arrive before timeout")
|
|
.expect("event stream should stay open");
|
|
assert!(matches!(
|
|
first_event,
|
|
AppServerEvent::ServerNotification(ServerNotification::CommandExecutionOutputDelta(
|
|
notification
|
|
)) if notification.delta == "stdout-1"
|
|
));
|
|
|
|
let mut remaining_events = Vec::new();
|
|
for _ in 0..4 {
|
|
remaining_events.push(
|
|
timeout(Duration::from_secs(2), client.next_event())
|
|
.await
|
|
.expect("event should arrive before timeout")
|
|
.expect("event stream should stay open"),
|
|
);
|
|
}
|
|
|
|
let mut transcript_event_names = Vec::new();
|
|
for event in &remaining_events {
|
|
match event {
|
|
AppServerEvent::Lagged { skipped: 1 } => {}
|
|
AppServerEvent::ServerNotification(
|
|
ServerNotification::CommandExecutionOutputDelta(notification),
|
|
) if notification.delta == "stdout-2" => {}
|
|
AppServerEvent::ServerNotification(ServerNotification::AgentMessageDelta(
|
|
notification,
|
|
)) if notification.delta == "hello" => {
|
|
transcript_event_names.push("agent_message_delta");
|
|
}
|
|
AppServerEvent::ServerNotification(ServerNotification::ItemCompleted(
|
|
notification,
|
|
)) if matches!(
|
|
¬ification.item,
|
|
codex_app_server_protocol::ThreadItem::AgentMessage { text, .. } if text == "hello"
|
|
) =>
|
|
{
|
|
transcript_event_names.push("item_completed");
|
|
}
|
|
AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(
|
|
notification,
|
|
)) if notification.turn.status
|
|
== codex_app_server_protocol::TurnStatus::Completed =>
|
|
{
|
|
transcript_event_names.push("turn_completed");
|
|
}
|
|
_ => panic!("unexpected remaining event: {event:?}"),
|
|
}
|
|
}
|
|
assert_eq!(
|
|
transcript_event_names,
|
|
vec!["agent_message_delta", "item_completed", "turn_completed"]
|
|
);
|
|
|
|
done_tx
|
|
.send(())
|
|
.expect("server completion signal should send");
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_server_request_resolution_roundtrip_works() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
let request_id = RequestId::String("srv-1".to_string());
|
|
let server_request = JSONRPCRequest {
|
|
id: request_id.clone(),
|
|
method: "item/tool/requestUserInput".to_string(),
|
|
params: Some(
|
|
serde_json::to_value(ToolRequestUserInputParams {
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
item_id: "call-1".to_string(),
|
|
questions: vec![ToolRequestUserInputQuestion {
|
|
id: "question-1".to_string(),
|
|
header: "Mode".to_string(),
|
|
question: "Pick one".to_string(),
|
|
is_other: false,
|
|
is_secret: false,
|
|
options: Some(vec![]),
|
|
}],
|
|
})
|
|
.expect("params should serialize"),
|
|
),
|
|
trace: None,
|
|
};
|
|
write_websocket_message(&mut websocket, JSONRPCMessage::Request(server_request)).await;
|
|
|
|
let JSONRPCMessage::Response(response) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected server request response");
|
|
};
|
|
assert_eq!(response.id, request_id);
|
|
})
|
|
.await;
|
|
let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let AppServerEvent::ServerRequest(request) = client
|
|
.next_event()
|
|
.await
|
|
.expect("request event should arrive")
|
|
else {
|
|
panic!("expected server request event");
|
|
};
|
|
client
|
|
.resolve_server_request(request.id().clone(), serde_json::json!({}))
|
|
.await
|
|
.expect("server request should resolve");
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_server_request_received_during_initialize_is_delivered() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
let JSONRPCMessage::Request(request) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected initialize request");
|
|
};
|
|
assert_eq!(request.method, "initialize");
|
|
|
|
let request_id = RequestId::String("srv-init".to_string());
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Request(JSONRPCRequest {
|
|
id: request_id.clone(),
|
|
method: "item/tool/requestUserInput".to_string(),
|
|
params: Some(
|
|
serde_json::to_value(ToolRequestUserInputParams {
|
|
thread_id: "thread-1".to_string(),
|
|
turn_id: "turn-1".to_string(),
|
|
item_id: "call-1".to_string(),
|
|
questions: vec![ToolRequestUserInputQuestion {
|
|
id: "question-1".to_string(),
|
|
header: "Mode".to_string(),
|
|
question: "Pick one".to_string(),
|
|
is_other: false,
|
|
is_secret: false,
|
|
options: Some(vec![]),
|
|
}],
|
|
})
|
|
.expect("params should serialize"),
|
|
),
|
|
trace: None,
|
|
}),
|
|
)
|
|
.await;
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Response(JSONRPCResponse {
|
|
id: request.id,
|
|
result: serde_json::json!({}),
|
|
}),
|
|
)
|
|
.await;
|
|
|
|
let JSONRPCMessage::Notification(notification) =
|
|
read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected initialized notification");
|
|
};
|
|
assert_eq!(notification.method, "initialized");
|
|
|
|
let JSONRPCMessage::Response(response) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected server request response");
|
|
};
|
|
assert_eq!(response.id, request_id);
|
|
})
|
|
.await;
|
|
let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let AppServerEvent::ServerRequest(request) = client
|
|
.next_event()
|
|
.await
|
|
.expect("request event should arrive")
|
|
else {
|
|
panic!("expected server request event");
|
|
};
|
|
client
|
|
.resolve_server_request(request.id().clone(), serde_json::json!({}))
|
|
.await
|
|
.expect("server request should resolve");
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_unknown_server_request_is_rejected() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
let request_id = RequestId::String("srv-unknown".to_string());
|
|
write_websocket_message(
|
|
&mut websocket,
|
|
JSONRPCMessage::Request(JSONRPCRequest {
|
|
id: request_id.clone(),
|
|
method: "thread/unknown".to_string(),
|
|
params: None,
|
|
trace: None,
|
|
}),
|
|
)
|
|
.await;
|
|
|
|
let JSONRPCMessage::Error(response) = read_websocket_message(&mut websocket).await
|
|
else {
|
|
panic!("expected JSON-RPC error response");
|
|
};
|
|
assert_eq!(response.id, request_id);
|
|
assert_eq!(response.error.code, -32601);
|
|
assert_eq!(
|
|
response.error.message,
|
|
"unsupported remote app-server request `thread/unknown`"
|
|
);
|
|
})
|
|
.await;
|
|
let client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn remote_disconnect_surfaces_as_event() {
|
|
let websocket_url = start_test_remote_server(|mut websocket| async move {
|
|
expect_remote_initialize(&mut websocket).await;
|
|
websocket.close(None).await.expect("close should succeed");
|
|
})
|
|
.await;
|
|
let mut client = RemoteAppServerClient::connect(test_remote_connect_args(websocket_url))
|
|
.await
|
|
.expect("remote client should connect");
|
|
|
|
let event = client
|
|
.next_event()
|
|
.await
|
|
.expect("disconnect event should arrive");
|
|
assert!(matches!(event, AppServerEvent::Disconnected { .. }));
|
|
}
|
|
|
|
#[test]
|
|
fn typed_request_error_exposes_sources() {
|
|
let transport = TypedRequestError::Transport {
|
|
method: "config/read".to_string(),
|
|
source: IoError::new(ErrorKind::BrokenPipe, "closed"),
|
|
};
|
|
assert_eq!(std::error::Error::source(&transport).is_some(), true);
|
|
|
|
let server = TypedRequestError::Server {
|
|
method: "thread/read".to_string(),
|
|
source: JSONRPCErrorError {
|
|
code: -32603,
|
|
data: None,
|
|
message: "internal".to_string(),
|
|
},
|
|
};
|
|
assert_eq!(std::error::Error::source(&server).is_some(), false);
|
|
|
|
let deserialize = TypedRequestError::Deserialize {
|
|
method: "thread/start".to_string(),
|
|
source: serde_json::from_str::<u32>("\"nope\"")
|
|
.expect_err("invalid integer should return deserialize error"),
|
|
};
|
|
assert_eq!(std::error::Error::source(&deserialize).is_some(), true);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn next_event_surfaces_lagged_markers() {
|
|
let (command_tx, _command_rx) = mpsc::channel(1);
|
|
let (event_tx, event_rx) = mpsc::channel(1);
|
|
let worker_handle = tokio::spawn(async {});
|
|
event_tx
|
|
.send(InProcessServerEvent::Lagged { skipped: 3 })
|
|
.await
|
|
.expect("lagged marker should enqueue");
|
|
drop(event_tx);
|
|
|
|
let mut client = InProcessAppServerClient {
|
|
command_tx,
|
|
event_rx,
|
|
worker_handle,
|
|
};
|
|
|
|
let event = timeout(Duration::from_secs(2), client.next_event())
|
|
.await
|
|
.expect("lagged marker should arrive before timeout");
|
|
assert!(matches!(
|
|
event,
|
|
Some(InProcessServerEvent::Lagged { skipped: 3 })
|
|
));
|
|
|
|
client.shutdown().await.expect("shutdown should complete");
|
|
}
|
|
|
|
#[test]
|
|
fn event_requires_delivery_marks_transcript_and_terminal_events() {
|
|
assert!(event_requires_delivery(
|
|
&InProcessServerEvent::ServerNotification(
|
|
codex_app_server_protocol::ServerNotification::TurnCompleted(
|
|
codex_app_server_protocol::TurnCompletedNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn: codex_app_server_protocol::Turn {
|
|
id: "turn".to_string(),
|
|
items: Vec::new(),
|
|
status: codex_app_server_protocol::TurnStatus::Completed,
|
|
error: None,
|
|
started_at: None,
|
|
completed_at: Some(0),
|
|
duration_ms: None,
|
|
},
|
|
}
|
|
)
|
|
)
|
|
));
|
|
assert!(event_requires_delivery(
|
|
&InProcessServerEvent::ServerNotification(
|
|
codex_app_server_protocol::ServerNotification::AgentMessageDelta(
|
|
codex_app_server_protocol::AgentMessageDeltaNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn_id: "turn".to_string(),
|
|
item_id: "item".to_string(),
|
|
delta: "hello".to_string(),
|
|
}
|
|
)
|
|
)
|
|
));
|
|
assert!(event_requires_delivery(
|
|
&InProcessServerEvent::ServerNotification(
|
|
codex_app_server_protocol::ServerNotification::ItemCompleted(
|
|
codex_app_server_protocol::ItemCompletedNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn_id: "turn".to_string(),
|
|
item: codex_app_server_protocol::ThreadItem::AgentMessage {
|
|
id: "item".to_string(),
|
|
text: "hello".to_string(),
|
|
phase: None,
|
|
memory_citation: None,
|
|
},
|
|
}
|
|
)
|
|
)
|
|
));
|
|
assert!(!event_requires_delivery(&InProcessServerEvent::Lagged {
|
|
skipped: 1
|
|
}));
|
|
assert!(!event_requires_delivery(
|
|
&InProcessServerEvent::ServerNotification(
|
|
codex_app_server_protocol::ServerNotification::CommandExecutionOutputDelta(
|
|
codex_app_server_protocol::CommandExecutionOutputDeltaNotification {
|
|
thread_id: "thread".to_string(),
|
|
turn_id: "turn".to_string(),
|
|
item_id: "item".to_string(),
|
|
delta: "stdout".to_string(),
|
|
}
|
|
)
|
|
)
|
|
));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn runtime_start_args_forward_environment_manager() {
|
|
let config = Arc::new(build_test_config().await);
|
|
let environment_manager = Arc::new(EnvironmentManager::new(Some(
|
|
"ws://127.0.0.1:8765".to_string(),
|
|
)));
|
|
|
|
let runtime_args = InProcessClientStartArgs {
|
|
arg0_paths: Arg0DispatchPaths::default(),
|
|
config: config.clone(),
|
|
cli_overrides: Vec::new(),
|
|
loader_overrides: LoaderOverrides::default(),
|
|
cloud_requirements: CloudRequirementsLoader::default(),
|
|
feedback: CodexFeedback::new(),
|
|
environment_manager: environment_manager.clone(),
|
|
config_warnings: Vec::new(),
|
|
session_source: SessionSource::Exec,
|
|
enable_codex_api_key_env: false,
|
|
client_name: "codex-app-server-client-test".to_string(),
|
|
client_version: "0.0.0-test".to_string(),
|
|
experimental_api: true,
|
|
opt_out_notification_methods: Vec::new(),
|
|
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
|
|
}
|
|
.into_runtime_start_args();
|
|
|
|
assert_eq!(runtime_args.config, config);
|
|
assert!(Arc::ptr_eq(
|
|
&runtime_args.environment_manager,
|
|
&environment_manager
|
|
));
|
|
assert!(runtime_args.environment_manager.is_remote());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn shutdown_completes_promptly_without_retained_managers() {
|
|
let client = start_test_client(SessionSource::Cli).await;
|
|
|
|
timeout(Duration::from_secs(1), client.shutdown())
|
|
.await
|
|
.expect("shutdown should not wait for the 5s fallback timeout")
|
|
.expect("shutdown should complete");
|
|
}
|
|
}
|