mirror of
https://github.com/openai/codex.git
synced 2026-05-21 19:45:26 +00:00
Compare commits
2 Commits
xli-codex/
...
etraut/cap
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a73999abc7 | ||
|
|
88b816c031 |
15
codex-rs/Cargo.lock
generated
15
codex-rs/Cargo.lock
generated
@@ -1821,6 +1821,7 @@ dependencies = [
|
||||
"codex-plugin",
|
||||
"codex-protocol",
|
||||
"codex-utils-absolute-path",
|
||||
"codex-utils-log",
|
||||
"os_info",
|
||||
"pretty_assertions",
|
||||
"serde",
|
||||
@@ -1852,6 +1853,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"codex-client",
|
||||
"codex-protocol",
|
||||
"codex-utils-log",
|
||||
"codex-utils-rustls-provider",
|
||||
"eventsource-stream",
|
||||
"futures",
|
||||
@@ -1926,6 +1928,7 @@ dependencies = [
|
||||
"codex-utils-cargo-bin",
|
||||
"codex-utils-cli",
|
||||
"codex-utils-json-to-toml",
|
||||
"codex-utils-log",
|
||||
"codex-utils-pty",
|
||||
"core_test_support",
|
||||
"flate2",
|
||||
@@ -2532,6 +2535,7 @@ dependencies = [
|
||||
"codex-utils-cargo-bin",
|
||||
"codex-utils-home-dir",
|
||||
"codex-utils-image",
|
||||
"codex-utils-log",
|
||||
"codex-utils-output-truncation",
|
||||
"codex-utils-path",
|
||||
"codex-utils-plugins",
|
||||
@@ -3049,6 +3053,7 @@ dependencies = [
|
||||
"codex-otel",
|
||||
"codex-protocol",
|
||||
"codex-terminal-detection",
|
||||
"codex-utils-log",
|
||||
"codex-utils-template",
|
||||
"core_test_support",
|
||||
"jsonwebtoken",
|
||||
@@ -3493,6 +3498,7 @@ dependencies = [
|
||||
"codex-protocol",
|
||||
"codex-utils-cargo-bin",
|
||||
"codex-utils-home-dir",
|
||||
"codex-utils-log",
|
||||
"codex-utils-pty",
|
||||
"futures",
|
||||
"keyring",
|
||||
@@ -3800,6 +3806,7 @@ dependencies = [
|
||||
"codex-utils-elapsed",
|
||||
"codex-utils-fuzzy-match",
|
||||
"codex-utils-home-dir",
|
||||
"codex-utils-log",
|
||||
"codex-utils-oss",
|
||||
"codex-utils-path",
|
||||
"codex-utils-plugins",
|
||||
@@ -3964,6 +3971,14 @@ dependencies = [
|
||||
"toml 0.9.11+spec-1.1.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-utils-log"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"pretty_assertions",
|
||||
"sha2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-utils-oss"
|
||||
version = "0.0.0"
|
||||
|
||||
@@ -84,6 +84,7 @@ members = [
|
||||
"utils/cache",
|
||||
"utils/image",
|
||||
"utils/json-to-toml",
|
||||
"utils/log",
|
||||
"utils/home-dir",
|
||||
"utils/pty",
|
||||
"utils/readiness",
|
||||
@@ -221,6 +222,7 @@ codex-utils-fuzzy-match = { path = "utils/fuzzy-match" }
|
||||
codex-utils-home-dir = { path = "utils/home-dir" }
|
||||
codex-utils-image = { path = "utils/image" }
|
||||
codex-utils-json-to-toml = { path = "utils/json-to-toml" }
|
||||
codex-utils-log = { path = "utils/log" }
|
||||
codex-utils-oss = { path = "utils/oss" }
|
||||
codex-utils-output-truncation = { path = "utils/output-truncation" }
|
||||
codex-utils-path = { path = "utils/path-utils" }
|
||||
|
||||
@@ -19,6 +19,7 @@ codex-login = { workspace = true }
|
||||
codex-model-provider = { workspace = true }
|
||||
codex-plugin = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-utils-log = { workspace = true }
|
||||
os_info = { workspace = true }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
|
||||
@@ -34,6 +34,7 @@ use codex_login::CodexAuth;
|
||||
use codex_login::default_client::create_client;
|
||||
use codex_plugin::PluginTelemetryMetadata;
|
||||
use codex_protocol::request_permissions::RequestPermissionsResponse;
|
||||
use codex_utils_log::bounded_str;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
@@ -454,7 +455,7 @@ async fn send_track_events_request(auth: &CodexAuth, url: &str, events: Vec<Trac
|
||||
Ok(response) => {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
tracing::warn!("events failed with status {status}: {body}");
|
||||
tracing::warn!("events failed with status {status}: {}", bounded_str(&body));
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::warn!("failed to send events request: {err}");
|
||||
|
||||
@@ -71,6 +71,7 @@ codex-thread-store = { workspace = true }
|
||||
codex-tools = { workspace = true }
|
||||
codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-json-to-toml = { workspace = true }
|
||||
codex-utils-log = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
futures = { workspace = true }
|
||||
|
||||
@@ -79,6 +79,7 @@ use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_rollout::StateDbHandle;
|
||||
use codex_state::log_db::LogDbLayer;
|
||||
use codex_utils_log::bounded_debug;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio::sync::broadcast;
|
||||
@@ -615,14 +616,14 @@ impl MessageProcessor {
|
||||
pub(crate) async fn process_notification(&self, notification: JSONRPCNotification) {
|
||||
// Currently, we do not expect to receive any notifications from the
|
||||
// client, so we just log them.
|
||||
tracing::info!("<- notification: {:?}", notification);
|
||||
tracing::info!("<- notification: {}", bounded_debug(¬ification));
|
||||
}
|
||||
|
||||
/// Handles typed notifications from in-process clients.
|
||||
pub(crate) async fn process_client_notification(&self, notification: ClientNotification) {
|
||||
// Currently, we do not expect to receive any typed notifications from
|
||||
// in-process clients, so we just log them.
|
||||
tracing::info!("<- typed notification: {:?}", notification);
|
||||
tracing::info!("<- typed notification: {}", bounded_debug(¬ification));
|
||||
}
|
||||
|
||||
async fn run_request_with_context<F>(
|
||||
@@ -722,14 +723,14 @@ impl MessageProcessor {
|
||||
|
||||
/// Handle a standalone JSON-RPC response originating from the peer.
|
||||
pub(crate) async fn process_response(&self, response: JSONRPCResponse) {
|
||||
tracing::info!("<- response: {:?}", response);
|
||||
tracing::info!("<- response: {}", bounded_debug(&response));
|
||||
let JSONRPCResponse { id, result, .. } = response;
|
||||
self.outgoing.notify_client_response(id, result).await
|
||||
}
|
||||
|
||||
/// Handle an error object received from the peer.
|
||||
pub(crate) async fn process_error(&self, err: JSONRPCError) {
|
||||
tracing::error!("<- error: {:?}", err);
|
||||
tracing::error!("<- error: {}", bounded_debug(&err));
|
||||
self.outgoing.notify_client_error(err.id, err.error).await;
|
||||
}
|
||||
|
||||
|
||||
@@ -18,11 +18,15 @@ use codex_otel::span_w3c_trace_context;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_protocol::request_permissions::RequestPermissionsResponse;
|
||||
use codex_utils_log::bounded_debug;
|
||||
use codex_utils_log::bounded_display;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::Instrument;
|
||||
use tracing::Level;
|
||||
use tracing::Span;
|
||||
use tracing::enabled;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::error_code::internal_error;
|
||||
@@ -342,7 +346,10 @@ impl OutgoingMessageSender {
|
||||
};
|
||||
|
||||
if let Err(err) = send_result {
|
||||
warn!("failed to send request {outgoing_message_id:?} to client: {err:?}");
|
||||
warn!(
|
||||
"failed to send request {outgoing_message_id:?} to client: {}",
|
||||
bounded_debug(&err)
|
||||
);
|
||||
let mut request_id_to_callback = self.request_id_to_callback.lock().await;
|
||||
request_id_to_callback.remove(&outgoing_message_id);
|
||||
}
|
||||
@@ -365,7 +372,10 @@ impl OutgoingMessageSender {
|
||||
})
|
||||
.await
|
||||
{
|
||||
warn!("failed to resend request to client: {err:?}");
|
||||
warn!(
|
||||
"failed to resend request to client: {}",
|
||||
bounded_debug(&err)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -383,7 +393,10 @@ impl OutgoingMessageSender {
|
||||
.track_server_response(completed_at_ms, response);
|
||||
}
|
||||
if let Err(err) = entry.callback.send(Ok(result)) {
|
||||
warn!("could not notify callback for {id:?} due to: {err:?}");
|
||||
warn!(
|
||||
"could not notify callback for {id:?} due to: {}",
|
||||
bounded_debug(&err)
|
||||
);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
@@ -397,11 +410,17 @@ impl OutgoingMessageSender {
|
||||
|
||||
match entry {
|
||||
Some((id, entry)) => {
|
||||
warn!("client responded with error for {id:?}: {error:?}");
|
||||
warn!(
|
||||
"client responded with error for {id:?}: {}",
|
||||
bounded_debug(&error)
|
||||
);
|
||||
self.analytics_events_client
|
||||
.track_server_request_aborted(now_unix_timestamp_ms(), id.clone());
|
||||
if let Err(err) = entry.callback.send(Err(error)) {
|
||||
warn!("could not notify callback for {id:?} due to: {err:?}");
|
||||
warn!(
|
||||
"could not notify callback for {id:?} due to: {}",
|
||||
bounded_debug(&err)
|
||||
);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
@@ -437,7 +456,10 @@ impl OutgoingMessageSender {
|
||||
&& let Err(err) = entry.callback.send(Err(error.clone()))
|
||||
{
|
||||
let request_id = entry.request.id();
|
||||
warn!("could not notify callback for {request_id:?} due to: {err:?}");
|
||||
warn!(
|
||||
"could not notify callback for {request_id:?} due to: {}",
|
||||
bounded_debug(&err)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -495,7 +517,10 @@ impl OutgoingMessageSender {
|
||||
&& let Err(err) = entry.callback.send(Err(error.clone()))
|
||||
{
|
||||
let request_id = entry.request.id();
|
||||
warn!("could not notify callback for {request_id:?} due to: {err:?}",);
|
||||
warn!(
|
||||
"could not notify callback for {request_id:?} due to: {}",
|
||||
bounded_debug(&err)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -560,10 +585,13 @@ impl OutgoingMessageSender {
|
||||
connection_ids: &[ConnectionId],
|
||||
notification: ServerNotification,
|
||||
) {
|
||||
tracing::trace!(
|
||||
targeted_connections = connection_ids.len(),
|
||||
"app-server event: {notification}"
|
||||
);
|
||||
if enabled!(Level::TRACE) {
|
||||
let notification_log = bounded_display(¬ification);
|
||||
tracing::trace!(
|
||||
targeted_connections = connection_ids.len(),
|
||||
"app-server event: {notification_log}"
|
||||
);
|
||||
}
|
||||
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
|
||||
if connection_ids.is_empty() {
|
||||
if let Err(err) = self
|
||||
@@ -573,7 +601,10 @@ impl OutgoingMessageSender {
|
||||
})
|
||||
.await
|
||||
{
|
||||
warn!("failed to send server notification to client: {err:?}");
|
||||
warn!(
|
||||
"failed to send server notification to client: {}",
|
||||
bounded_debug(&err)
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -587,7 +618,10 @@ impl OutgoingMessageSender {
|
||||
})
|
||||
.await
|
||||
{
|
||||
warn!("failed to send server notification to client: {err:?}");
|
||||
warn!(
|
||||
"failed to send server notification to client: {}",
|
||||
bounded_debug(&err)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -597,7 +631,9 @@ impl OutgoingMessageSender {
|
||||
connection_id: ConnectionId,
|
||||
notification: ServerNotification,
|
||||
) {
|
||||
tracing::trace!("app-server event: {notification}");
|
||||
if enabled!(Level::TRACE) {
|
||||
tracing::trace!("app-server event: {}", bounded_display(¬ification));
|
||||
}
|
||||
let outgoing_message = OutgoingMessage::AppServerNotification(notification.clone());
|
||||
let (write_complete_tx, write_complete_rx) = oneshot::channel();
|
||||
if let Err(err) = self
|
||||
@@ -609,7 +645,10 @@ impl OutgoingMessageSender {
|
||||
})
|
||||
.await
|
||||
{
|
||||
warn!("failed to send server notification to client: {err:?}");
|
||||
warn!(
|
||||
"failed to send server notification to client: {}",
|
||||
bounded_debug(&err)
|
||||
);
|
||||
}
|
||||
let _ = write_complete_rx.await;
|
||||
}
|
||||
@@ -678,7 +717,10 @@ impl OutgoingMessageSender {
|
||||
};
|
||||
|
||||
if let Err(err) = send_result {
|
||||
warn!("failed to send {message_kind} to client: {err:?}");
|
||||
warn!(
|
||||
"failed to send {message_kind} to client: {}",
|
||||
bounded_debug(&err)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ bytes = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
codex-client = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-utils-log = { workspace = true }
|
||||
codex-utils-rustls-provider = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
http = { workspace = true }
|
||||
|
||||
@@ -6,6 +6,7 @@ use codex_client::HttpTransport;
|
||||
use codex_client::RequestTelemetry;
|
||||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::openai_models::ModelsResponse;
|
||||
use codex_utils_log::bounded_bytes_lossy;
|
||||
use http::HeaderMap;
|
||||
use http::Method;
|
||||
use http::header::ETAG;
|
||||
@@ -65,7 +66,7 @@ impl<T: HttpTransport> ModelsClient<T> {
|
||||
.map_err(|e| {
|
||||
ApiError::Stream(format!(
|
||||
"failed to decode models response: {e}; body: {}",
|
||||
String::from_utf8_lossy(&resp.body)
|
||||
bounded_bytes_lossy(&resp.body)
|
||||
))
|
||||
})?;
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ use bytes::Bytes;
|
||||
use codex_client::HttpTransport;
|
||||
use codex_client::RequestBody;
|
||||
use codex_client::RequestTelemetry;
|
||||
use codex_utils_log::bounded_str;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use http::Method;
|
||||
@@ -119,7 +120,7 @@ impl<T: HttpTransport> RealtimeCallClient<T> {
|
||||
session_config: RealtimeSessionConfig,
|
||||
extra_headers: HeaderMap,
|
||||
) -> Result<RealtimeCallResponse, ApiError> {
|
||||
trace!(target: "codex_api::realtime_websocket::wire", "realtime call request SDP: {sdp}");
|
||||
trace!(target: "codex_api::realtime_websocket::wire", "realtime call request SDP: {}", bounded_str(&sdp));
|
||||
// WebRTC can begin inference as soon as the peer connection comes up, so the initial
|
||||
// session payload is sent with call creation. The sideband WebSocket still sends its normal
|
||||
// session.update after it joins.
|
||||
@@ -202,7 +203,7 @@ fn decode_call_id_from_location(headers: &HeaderMap) -> Result<String, ApiError>
|
||||
.ok_or_else(|| ApiError::Stream("realtime call response missing Location".to_string()))?
|
||||
.to_str()
|
||||
.map_err(|err| ApiError::Stream(format!("invalid realtime call Location: {err}")))?;
|
||||
trace!("realtime call Location: {location}");
|
||||
trace!("realtime call Location: {}", bounded_str(location));
|
||||
|
||||
location
|
||||
.split('?')
|
||||
|
||||
@@ -18,6 +18,8 @@ use crate::provider::Provider;
|
||||
use codex_client::backoff;
|
||||
use codex_client::maybe_build_rustls_client_config_with_custom_ca;
|
||||
use codex_protocol::protocol::RealtimeTranscriptDelta;
|
||||
use codex_utils_log::bounded_debug;
|
||||
use codex_utils_log::bounded_str;
|
||||
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
@@ -345,7 +347,7 @@ impl RealtimeWebsocketWriter {
|
||||
async fn send_json(&self, message: &RealtimeOutboundMessage) -> Result<(), ApiError> {
|
||||
let payload = serde_json::to_string(message)
|
||||
.map_err(|err| ApiError::Stream(format!("failed to encode realtime request: {err}")))?;
|
||||
debug!(?message, "realtime websocket request");
|
||||
debug!(message = %bounded_debug(message), "realtime websocket request");
|
||||
self.send_payload(payload).await
|
||||
}
|
||||
|
||||
@@ -356,7 +358,7 @@ impl RealtimeWebsocketWriter {
|
||||
));
|
||||
}
|
||||
|
||||
trace!(target: REALTIME_WIRE_LOG_TARGET, "realtime websocket request: {payload}");
|
||||
trace!(target: REALTIME_WIRE_LOG_TARGET, "realtime websocket request: {}", bounded_str(&payload));
|
||||
self.stream
|
||||
.send(Message::Text(payload.into()))
|
||||
.await
|
||||
@@ -390,10 +392,10 @@ impl RealtimeWebsocketEvents {
|
||||
|
||||
match msg {
|
||||
Message::Text(text) => {
|
||||
trace!(target: REALTIME_WIRE_LOG_TARGET, "realtime websocket event: {text}");
|
||||
trace!(target: REALTIME_WIRE_LOG_TARGET, "realtime websocket event: {}", bounded_str(&text));
|
||||
if let Some(mut event) = parse_realtime_event(&text, self.event_parser) {
|
||||
self.update_active_transcript(&mut event).await;
|
||||
debug!(?event, "realtime websocket parsed event");
|
||||
debug!(event = %bounded_debug(&event), "realtime websocket parsed event");
|
||||
return Ok(Some(event));
|
||||
}
|
||||
debug!("realtime websocket ignored unsupported text frame");
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeTranscriptDelta;
|
||||
use codex_protocol::protocol::RealtimeTranscriptDone;
|
||||
use codex_utils_log::bounded_str;
|
||||
use serde_json::Value;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -8,7 +9,10 @@ pub(super) fn parse_realtime_payload(payload: &str, parser_name: &str) -> Option
|
||||
let parsed: Value = match serde_json::from_str(payload) {
|
||||
Ok(message) => message,
|
||||
Err(err) => {
|
||||
debug!("failed to parse {parser_name} event: {err}, data: {payload}");
|
||||
debug!(
|
||||
"failed to parse {parser_name} event: {err}, data: {}",
|
||||
bounded_str(payload)
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
@@ -16,7 +20,10 @@ pub(super) fn parse_realtime_payload(payload: &str, parser_name: &str) -> Option
|
||||
let message_type = match parsed.get("type").and_then(Value::as_str) {
|
||||
Some(message_type) => message_type.to_string(),
|
||||
None => {
|
||||
debug!("received {parser_name} event without type field: {payload}");
|
||||
debug!(
|
||||
"received {parser_name} event without type field: {}",
|
||||
bounded_str(payload)
|
||||
);
|
||||
return None;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::endpoint::realtime_websocket::protocol_common::parse_transcript_done_
|
||||
use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
use codex_utils_log::bounded_str;
|
||||
use serde_json::Value;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -83,7 +84,10 @@ pub(super) fn parse_realtime_event_v1(payload: &str) -> Option<RealtimeEvent> {
|
||||
}
|
||||
"error" => parse_error_event(&parsed),
|
||||
_ => {
|
||||
debug!("received unsupported realtime v1 event type: {message_type}, data: {payload}");
|
||||
debug!(
|
||||
"received unsupported realtime v1 event type: {message_type}, data: {}",
|
||||
bounded_str(payload)
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use codex_protocol::protocol::RealtimeNoopRequested;
|
||||
use codex_protocol::protocol::RealtimeResponseCancelled;
|
||||
use codex_protocol::protocol::RealtimeResponseCreated;
|
||||
use codex_protocol::protocol::RealtimeResponseDone;
|
||||
use codex_utils_log::bounded_str;
|
||||
use serde_json::Map as JsonMap;
|
||||
use serde_json::Value;
|
||||
use tracing::debug;
|
||||
@@ -72,7 +73,10 @@ pub(super) fn parse_realtime_event_v2(payload: &str) -> Option<RealtimeEvent> {
|
||||
})),
|
||||
"error" => parse_error_event(&parsed),
|
||||
_ => {
|
||||
debug!("received unsupported realtime v2 event type: {message_type}, data: {payload}");
|
||||
debug!(
|
||||
"received unsupported realtime v2 event type: {message_type}, data: {}",
|
||||
bounded_str(payload)
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ use crate::sse::process_responses_event;
|
||||
use crate::telemetry::WebsocketTelemetry;
|
||||
use codex_client::TransportError;
|
||||
use codex_client::maybe_build_rustls_client_config_with_custom_ca;
|
||||
use codex_utils_log::bounded_str;
|
||||
use codex_utils_rustls_provider::ensure_rustls_crypto_provider;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
@@ -702,7 +703,7 @@ async fn run_websocket_response_stream(
|
||||
|
||||
match message {
|
||||
Message::Text(text) => {
|
||||
trace!("websocket event: {text}");
|
||||
trace!("websocket event: {}", bounded_str(&text));
|
||||
if let Some(wrapped_error) = parse_wrapped_websocket_error_event(&text)
|
||||
&& let Some(error) =
|
||||
map_wrapped_websocket_error_event(wrapped_error, text.to_string())
|
||||
@@ -713,7 +714,10 @@ async fn run_websocket_response_stream(
|
||||
let event = match serde_json::from_str::<ResponsesStreamEvent>(&text) {
|
||||
Ok(event) => event,
|
||||
Err(err) => {
|
||||
debug!("failed to parse websocket event: {err}, data: {text}");
|
||||
debug!(
|
||||
"failed to parse websocket event: {err}, data: {}",
|
||||
bounded_str(&text)
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
@@ -787,7 +791,7 @@ async fn send_websocket_request(
|
||||
)));
|
||||
}
|
||||
};
|
||||
trace!("websocket request: {request_text}");
|
||||
trace!("websocket request: {}", bounded_str(&request_text));
|
||||
|
||||
let request_start = Instant::now();
|
||||
let result = tokio::time::timeout(
|
||||
|
||||
@@ -8,6 +8,7 @@ use codex_client::StreamResponse;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::ModelVerification;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_utils_log::bounded_str;
|
||||
use eventsource_stream::Eventsource;
|
||||
use futures::StreamExt;
|
||||
use serde::Deserialize;
|
||||
@@ -415,7 +416,7 @@ pub async fn process_sse(
|
||||
let sse = match response {
|
||||
Ok(Some(Ok(sse))) => sse,
|
||||
Ok(Some(Err(e))) => {
|
||||
debug!("SSE Error: {e:#}");
|
||||
debug!("SSE Error: {}", bounded_str(&format!("{e:#}")));
|
||||
let _ = tx_event.send(Err(ApiError::Stream(e.to_string()))).await;
|
||||
return;
|
||||
}
|
||||
@@ -434,12 +435,15 @@ pub async fn process_sse(
|
||||
}
|
||||
};
|
||||
|
||||
trace!("SSE event: {}", &sse.data);
|
||||
trace!("SSE event: {}", bounded_str(&sse.data));
|
||||
|
||||
let event: ResponsesStreamEvent = match serde_json::from_str(&sse.data) {
|
||||
Ok(event) => event,
|
||||
Err(e) => {
|
||||
debug!("Failed to parse SSE event: {e}, data: {}", &sse.data);
|
||||
debug!(
|
||||
"Failed to parse SSE event: {e}, data: {}",
|
||||
bounded_str(&sse.data)
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -65,6 +65,7 @@ codex-utils-absolute-path = { workspace = true }
|
||||
codex-utils-cache = { workspace = true }
|
||||
codex-utils-image = { workspace = true }
|
||||
codex-utils-home-dir = { workspace = true }
|
||||
codex-utils-log = { workspace = true }
|
||||
codex-utils-output-truncation = { workspace = true }
|
||||
codex-utils-path = { workspace = true }
|
||||
codex-utils-plugins = { workspace = true }
|
||||
|
||||
@@ -80,6 +80,7 @@ use codex_rmcp_client::ElicitationAction;
|
||||
use codex_rmcp_client::ElicitationResponse;
|
||||
use codex_rollout::state_db;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_log::bounded_debug;
|
||||
use codex_utils_output_truncation::TruncationPolicy;
|
||||
use codex_utils_output_truncation::truncate_text;
|
||||
use codex_utils_pty::DEFAULT_OUTPUT_BYTES_CAP;
|
||||
@@ -373,7 +374,7 @@ async fn handle_approved_mcp_tool_call(
|
||||
))
|
||||
.await;
|
||||
if let Err(error) = &result {
|
||||
tracing::warn!("MCP tool call error: {error:?}");
|
||||
tracing::warn!("MCP tool call error: {}", bounded_debug(error));
|
||||
}
|
||||
let duration = start.elapsed();
|
||||
notify_mcp_tool_call_completed(
|
||||
|
||||
@@ -45,6 +45,7 @@ use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
use codex_protocol::protocol::RealtimeOutputModality;
|
||||
use codex_protocol::protocol::RealtimeVoice;
|
||||
use codex_protocol::protocol::RealtimeVoicesList;
|
||||
use codex_utils_log::bounded_debug;
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
use http::header::AUTHORIZATION;
|
||||
@@ -834,7 +835,7 @@ async fn handle_start_inner(
|
||||
RealtimeEvent::AudioOut(_) => {}
|
||||
_ => {
|
||||
info!(
|
||||
event = ?event,
|
||||
event = %bounded_debug(&event),
|
||||
"received realtime conversation event"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::realtime_conversation::handle_text as handle_realtime_conversation_te
|
||||
use async_channel::Receiver;
|
||||
use codex_otel::set_parent_from_w3c_trace_context;
|
||||
use codex_protocol::protocol::Submission;
|
||||
use codex_utils_log::bounded_debug;
|
||||
use tracing::Instrument;
|
||||
use tracing::debug_span;
|
||||
use tracing::info_span;
|
||||
@@ -738,7 +739,7 @@ pub(super) async fn submission_loop(
|
||||
// To break out of this loop, send Op::Shutdown.
|
||||
let mut shutdown_received = false;
|
||||
while let Ok(sub) = rx_sub.recv().await {
|
||||
debug!(?sub, "Submission");
|
||||
debug!(submission = %bounded_debug(&sub), "Submission");
|
||||
let dispatch_span = submission_dispatch_span(&sub);
|
||||
let should_exit = async {
|
||||
match sub.op.clone() {
|
||||
|
||||
@@ -3,6 +3,8 @@ use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::Ordering;
|
||||
|
||||
use codex_utils_log::bounded_str;
|
||||
|
||||
use crate::SkillInjections;
|
||||
use crate::SkillLoadOutcome;
|
||||
use crate::build_skill_injections;
|
||||
@@ -662,7 +664,7 @@ pub(crate) async fn run_turn(
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Turn error: {e:#}");
|
||||
info!("Turn error: {}", bounded_str(&format!("{e:#}")));
|
||||
let event = EventMsg::Error(e.to_error_event(/*message_prefix*/ None));
|
||||
sess.send_event(&turn_context, event).await;
|
||||
// let the user continue the conversation
|
||||
|
||||
@@ -29,6 +29,7 @@ use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_rollout::state_db;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use codex_utils_log::bounded_debug;
|
||||
use codex_utils_stream_parser::strip_proposed_plan_blocks;
|
||||
use futures::Future;
|
||||
use tracing::debug;
|
||||
@@ -454,7 +455,7 @@ pub(crate) async fn handle_non_tool_response_item(
|
||||
item: &ResponseItem,
|
||||
plan_mode: bool,
|
||||
) -> Option<TurnItem> {
|
||||
debug!(?item, "Output item");
|
||||
debug!(item = %bounded_debug(item), "Output item");
|
||||
|
||||
match item {
|
||||
ResponseItem::Message { .. }
|
||||
|
||||
@@ -20,6 +20,7 @@ codex-model-provider-info = { workspace = true }
|
||||
codex-otel = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-terminal-detection = { workspace = true }
|
||||
codex-utils-log = { workspace = true }
|
||||
codex-utils-template = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
os_info = { workspace = true }
|
||||
|
||||
@@ -42,6 +42,7 @@ use codex_protocol::account::PlanType as AccountPlanType;
|
||||
use codex_protocol::auth::PlanType as InternalPlanType;
|
||||
use codex_protocol::auth::RefreshTokenFailedError;
|
||||
use codex_protocol::auth::RefreshTokenFailedReason;
|
||||
use codex_utils_log::bounded_str;
|
||||
use serde_json::Value;
|
||||
use thiserror::Error;
|
||||
|
||||
@@ -842,7 +843,7 @@ async fn request_chatgpt_token_refresh(
|
||||
Ok(refresh_response)
|
||||
} else {
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
tracing::error!("Failed to refresh token: {status}: {body}");
|
||||
tracing::error!("Failed to refresh token: {status}: {}", bounded_str(&body));
|
||||
if status == StatusCode::UNAUTHORIZED {
|
||||
let failed = classify_refresh_token_failure(&body);
|
||||
Err(RefreshTokenError::Permanent(failed))
|
||||
@@ -869,7 +870,7 @@ fn classify_refresh_token_failure(body: &str) -> RefreshTokenFailedError {
|
||||
if reason == RefreshTokenFailedReason::Other {
|
||||
tracing::warn!(
|
||||
backend_code = normalized_code.as_deref(),
|
||||
backend_body = body,
|
||||
backend_body = %bounded_str(body),
|
||||
"Encountered unknown 401 response while refreshing token"
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use codex_utils_log::bounded_str;
|
||||
use tracing::debug;
|
||||
|
||||
pub(crate) fn try_parse_error_message(text: &str) -> String {
|
||||
debug!("Parsing server error response: {}", text);
|
||||
debug!("Parsing server error response: {}", bounded_str(text));
|
||||
let json = serde_json::from_str::<serde_json::Value>(text).unwrap_or_default();
|
||||
if let Some(error) = json.get("error")
|
||||
&& let Some(message) = error.get("message")
|
||||
|
||||
@@ -20,6 +20,7 @@ codex-config = { workspace = true }
|
||||
codex-exec-server = { workspace = true }
|
||||
codex-keyring-store = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
codex-utils-log = { workspace = true }
|
||||
codex-utils-pty = { workspace = true }
|
||||
codex-utils-home-dir = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
|
||||
@@ -20,7 +20,6 @@
|
||||
|
||||
use std::future::Future;
|
||||
use std::io;
|
||||
use std::mem::take;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
@@ -41,9 +40,10 @@ use serde_json::to_vec;
|
||||
use tokio::runtime::Handle;
|
||||
use tokio::sync::broadcast;
|
||||
use tracing::debug;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::stderr_log::StderrLogBuffer;
|
||||
|
||||
static PROCESS_COUNTER: AtomicUsize = AtomicUsize::new(1);
|
||||
|
||||
// Remote public implementation.
|
||||
@@ -76,7 +76,7 @@ pub(super) struct ExecutorProcessTransport {
|
||||
stdout: Vec<u8>,
|
||||
|
||||
/// Buffered stderr bytes for diagnostic logging.
|
||||
stderr: Vec<u8>,
|
||||
stderr: StderrLogBuffer,
|
||||
|
||||
/// Whether the executor has reported process closure or a terminal
|
||||
/// subscription failure. Once closed, any remaining partial stdout line is
|
||||
@@ -101,12 +101,13 @@ impl ExecutorProcessTransport {
|
||||
// process event log will replay anything that landed before this
|
||||
// subscriber was attached.
|
||||
let events = process.subscribe_events();
|
||||
let stderr = StderrLogBuffer::new(program_name.clone());
|
||||
Self {
|
||||
process,
|
||||
events,
|
||||
program_name,
|
||||
stdout: Vec::new(),
|
||||
stderr: Vec::new(),
|
||||
stderr,
|
||||
closed: false,
|
||||
terminated: false,
|
||||
last_seq: 0,
|
||||
@@ -312,33 +313,11 @@ impl ExecutorProcessTransport {
|
||||
}
|
||||
|
||||
fn push_stderr(&mut self, bytes: &[u8]) {
|
||||
// Keep stderr line-oriented in logs so a chatty MCP server does not
|
||||
// produce one log record per byte chunk.
|
||||
self.stderr.extend_from_slice(bytes);
|
||||
while let Some(index) = self.stderr.iter().position(|byte| *byte == b'\n') {
|
||||
let mut line = self.stderr.drain(..=index).collect::<Vec<_>>();
|
||||
line.pop();
|
||||
if line.last() == Some(&b'\r') {
|
||||
line.pop();
|
||||
}
|
||||
info!(
|
||||
"MCP server stderr ({}): {}",
|
||||
self.program_name,
|
||||
String::from_utf8_lossy(&line)
|
||||
);
|
||||
}
|
||||
self.stderr.push(bytes);
|
||||
}
|
||||
|
||||
fn flush_stderr(&mut self) {
|
||||
if self.stderr.is_empty() {
|
||||
return;
|
||||
}
|
||||
let line = take(&mut self.stderr);
|
||||
info!(
|
||||
"MCP server stderr ({}): {}",
|
||||
self.program_name,
|
||||
String::from_utf8_lossy(&line)
|
||||
);
|
||||
self.stderr.flush();
|
||||
}
|
||||
|
||||
fn trim_trailing_carriage_return(mut line: Vec<u8>) -> Vec<u8> {
|
||||
|
||||
@@ -8,6 +8,7 @@ mod oauth;
|
||||
mod perform_oauth_login;
|
||||
mod program_resolver;
|
||||
mod rmcp_client;
|
||||
mod stderr_log;
|
||||
mod stdio_server_launcher;
|
||||
mod utils;
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_utils_log::bounded_debug;
|
||||
use codex_utils_log::bounded_display;
|
||||
use rmcp::ClientHandler;
|
||||
use rmcp::RoleClient;
|
||||
use rmcp::model::CancelledNotificationParam;
|
||||
@@ -12,7 +14,9 @@ use rmcp::model::ProgressNotificationParam;
|
||||
use rmcp::model::ResourceUpdatedNotificationParam;
|
||||
use rmcp::service::NotificationContext;
|
||||
use rmcp::service::RequestContext;
|
||||
use tracing::Level;
|
||||
use tracing::debug;
|
||||
use tracing::enabled;
|
||||
use tracing::error;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
@@ -51,9 +55,11 @@ impl ClientHandler for LoggingClientHandler {
|
||||
params: CancelledNotificationParam,
|
||||
_context: NotificationContext<RoleClient>,
|
||||
) {
|
||||
let request_id = bounded_display(¶ms.request_id);
|
||||
let reason = bounded_debug(¶ms.reason);
|
||||
info!(
|
||||
"MCP server cancelled request (request_id: {}, reason: {:?})",
|
||||
params.request_id, params.reason
|
||||
"MCP server cancelled request (request_id: {}, reason: {})",
|
||||
request_id, reason
|
||||
);
|
||||
}
|
||||
|
||||
@@ -62,9 +68,11 @@ impl ClientHandler for LoggingClientHandler {
|
||||
params: ProgressNotificationParam,
|
||||
_context: NotificationContext<RoleClient>,
|
||||
) {
|
||||
let progress_token = bounded_debug(¶ms.progress_token);
|
||||
let message = bounded_debug(¶ms.message);
|
||||
info!(
|
||||
"MCP server progress notification (token: {:?}, progress: {}, total: {:?}, message: {:?})",
|
||||
params.progress_token, params.progress, params.total, params.message
|
||||
"MCP server progress notification (token: {}, progress: {}, total: {:?}, message: {})",
|
||||
progress_token, params.progress, params.total, message
|
||||
);
|
||||
}
|
||||
|
||||
@@ -73,7 +81,8 @@ impl ClientHandler for LoggingClientHandler {
|
||||
params: ResourceUpdatedNotificationParam,
|
||||
_context: NotificationContext<RoleClient>,
|
||||
) {
|
||||
info!("MCP server resource updated (uri: {})", params.uri);
|
||||
let uri = bounded_display(¶ms.uri);
|
||||
info!("MCP server resource updated (uri: {})", uri);
|
||||
}
|
||||
|
||||
async fn on_resource_list_changed(&self, _context: NotificationContext<RoleClient>) {
|
||||
@@ -108,28 +117,44 @@ impl ClientHandler for LoggingClientHandler {
|
||||
| LoggingLevel::Alert
|
||||
| LoggingLevel::Critical
|
||||
| LoggingLevel::Error => {
|
||||
error!(
|
||||
"MCP server log message (level: {:?}, logger: {:?}, data: {})",
|
||||
level, logger, data
|
||||
);
|
||||
if enabled!(Level::ERROR) {
|
||||
let logger = bounded_debug(&logger);
|
||||
let data = bounded_display(&data);
|
||||
error!(
|
||||
"MCP server log message (level: {:?}, logger: {}, data: {})",
|
||||
level, logger, data
|
||||
);
|
||||
}
|
||||
}
|
||||
LoggingLevel::Warning => {
|
||||
warn!(
|
||||
"MCP server log message (level: {:?}, logger: {:?}, data: {})",
|
||||
level, logger, data
|
||||
);
|
||||
if enabled!(Level::WARN) {
|
||||
let logger = bounded_debug(&logger);
|
||||
let data = bounded_display(&data);
|
||||
warn!(
|
||||
"MCP server log message (level: {:?}, logger: {}, data: {})",
|
||||
level, logger, data
|
||||
);
|
||||
}
|
||||
}
|
||||
LoggingLevel::Notice | LoggingLevel::Info => {
|
||||
info!(
|
||||
"MCP server log message (level: {:?}, logger: {:?}, data: {})",
|
||||
level, logger, data
|
||||
);
|
||||
if enabled!(Level::INFO) {
|
||||
let logger = bounded_debug(&logger);
|
||||
let data = bounded_display(&data);
|
||||
info!(
|
||||
"MCP server log message (level: {:?}, logger: {}, data: {})",
|
||||
level, logger, data
|
||||
);
|
||||
}
|
||||
}
|
||||
LoggingLevel::Debug => {
|
||||
debug!(
|
||||
"MCP server log message (level: {:?}, logger: {:?}, data: {})",
|
||||
level, logger, data
|
||||
);
|
||||
if enabled!(Level::DEBUG) {
|
||||
let logger = bounded_debug(&logger);
|
||||
let data = bounded_display(&data);
|
||||
debug!(
|
||||
"MCP server log message (level: {:?}, logger: {}, data: {})",
|
||||
level, logger, data
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
110
codex-rs/rmcp-client/src/stderr_log.rs
Normal file
110
codex-rs/rmcp-client/src/stderr_log.rs
Normal file
@@ -0,0 +1,110 @@
|
||||
use codex_utils_log::DEFAULT_BOUNDED_LOG_VALUE_BYTES;
|
||||
use codex_utils_log::bounded_bytes_lossy;
|
||||
use tracing::info;
|
||||
|
||||
pub(crate) struct StderrLogBuffer {
|
||||
program_name: String,
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl StderrLogBuffer {
|
||||
pub(crate) fn new(program_name: String) -> Self {
|
||||
Self {
|
||||
program_name,
|
||||
buffer: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn push(&mut self, mut bytes: &[u8]) {
|
||||
while !bytes.is_empty() {
|
||||
if let Some(newline_index) = bytes.iter().position(|byte| *byte == b'\n') {
|
||||
self.push_without_newline(&bytes[..newline_index]);
|
||||
self.log_complete_line();
|
||||
bytes = &bytes[newline_index + 1..];
|
||||
} else {
|
||||
self.push_without_newline(bytes);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn flush(&mut self) {
|
||||
if self.buffer.is_empty() {
|
||||
return;
|
||||
}
|
||||
self.log_line("MCP server stderr");
|
||||
self.buffer.clear();
|
||||
}
|
||||
|
||||
fn push_without_newline(&mut self, mut bytes: &[u8]) {
|
||||
while !bytes.is_empty() {
|
||||
let remaining_capacity =
|
||||
DEFAULT_BOUNDED_LOG_VALUE_BYTES.saturating_sub(self.buffer.len());
|
||||
if remaining_capacity == 0 {
|
||||
self.log_line("MCP server stderr line exceeded limit");
|
||||
self.buffer.clear();
|
||||
continue;
|
||||
}
|
||||
|
||||
let chunk_len = remaining_capacity.min(bytes.len());
|
||||
self.buffer.extend_from_slice(&bytes[..chunk_len]);
|
||||
bytes = &bytes[chunk_len..];
|
||||
|
||||
if self.buffer.len() >= DEFAULT_BOUNDED_LOG_VALUE_BYTES {
|
||||
self.log_line("MCP server stderr line exceeded limit");
|
||||
self.buffer.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn log_complete_line(&mut self) {
|
||||
if self.buffer.last() == Some(&b'\r') {
|
||||
self.buffer.pop();
|
||||
}
|
||||
if self.buffer.is_empty() {
|
||||
return;
|
||||
}
|
||||
self.log_line("MCP server stderr");
|
||||
self.buffer.clear();
|
||||
}
|
||||
|
||||
fn log_line(&self, label: &str) {
|
||||
info!(
|
||||
"{} ({}): {}",
|
||||
label,
|
||||
self.program_name,
|
||||
bounded_bytes_lossy(&self.buffer)
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn buffered_len(&self) -> usize {
|
||||
self.buffer.len()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn push_without_newline_does_not_grow_past_log_limit() {
|
||||
let mut buffer = StderrLogBuffer::new("server".to_string());
|
||||
let bytes = vec![b'a'; DEFAULT_BOUNDED_LOG_VALUE_BYTES * 2 + 17];
|
||||
|
||||
buffer.push(&bytes);
|
||||
|
||||
assert_eq!(17, buffer.buffered_len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn push_newline_clears_buffer() {
|
||||
let mut buffer = StderrLogBuffer::new("server".to_string());
|
||||
|
||||
buffer.push(b"hello\n");
|
||||
|
||||
assert_eq!(0, buffer.buffered_len());
|
||||
}
|
||||
}
|
||||
@@ -46,14 +46,13 @@ use rmcp::service::RxJsonRpcMessage;
|
||||
use rmcp::service::TxJsonRpcMessage;
|
||||
use rmcp::transport::Transport;
|
||||
use rmcp::transport::child_process::TokioChildProcess;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::process::Command;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::executor_process_transport::ExecutorProcessTransport;
|
||||
use crate::program_resolver;
|
||||
use crate::stderr_log::StderrLogBuffer;
|
||||
use crate::utils::create_env_for_mcp_server;
|
||||
use crate::utils::create_env_overlay_for_remote_mcp_server;
|
||||
use crate::utils::remote_mcp_env_var_names;
|
||||
@@ -272,13 +271,16 @@ impl LocalStdioServerLauncher {
|
||||
|
||||
if let Some(stderr) = stderr {
|
||||
tokio::spawn(async move {
|
||||
let mut reader = BufReader::new(stderr).lines();
|
||||
let mut stderr = stderr;
|
||||
let mut stderr_log = StderrLogBuffer::new(program_name.clone());
|
||||
let mut buffer = [0_u8; 8192];
|
||||
loop {
|
||||
match reader.next_line().await {
|
||||
Ok(Some(line)) => {
|
||||
info!("MCP server stderr ({program_name}): {line}");
|
||||
match stderr.read(&mut buffer).await {
|
||||
Ok(0) => {
|
||||
stderr_log.flush();
|
||||
break;
|
||||
}
|
||||
Ok(None) => break,
|
||||
Ok(bytes_read) => stderr_log.push(&buffer[..bytes_read]),
|
||||
Err(error) => {
|
||||
warn!("Failed to read MCP server stderr ({program_name}): {error}");
|
||||
break;
|
||||
|
||||
@@ -62,6 +62,7 @@ codex-utils-cli = { workspace = true }
|
||||
codex-utils-elapsed = { workspace = true }
|
||||
codex-utils-fuzzy-match = { workspace = true }
|
||||
codex-utils-home-dir = { workspace = true }
|
||||
codex-utils-log = { workspace = true }
|
||||
codex-utils-oss = { workspace = true }
|
||||
codex-utils-path = { workspace = true }
|
||||
codex-utils-plugins = { workspace = true }
|
||||
|
||||
@@ -121,7 +121,10 @@ impl App {
|
||||
};
|
||||
|
||||
if let Err(err) = result {
|
||||
tracing::warn!("failed to enqueue app-server notification: {err}");
|
||||
tracing::warn!(
|
||||
"failed to enqueue app-server notification: {}",
|
||||
codex_utils_log::bounded_display(&err)
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -180,7 +183,10 @@ impl App {
|
||||
self.enqueue_thread_request(thread_id, request).await
|
||||
};
|
||||
if let Err(err) = result {
|
||||
tracing::warn!("failed to enqueue app-server request: {err}");
|
||||
tracing::warn!(
|
||||
"failed to enqueue app-server request: {}",
|
||||
codex_utils_log::bounded_display(&err)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,7 +39,10 @@ impl AppEventSender {
|
||||
session_log::log_inbound_app_event(&event);
|
||||
}
|
||||
if let Err(e) = self.app_event_tx.send(event) {
|
||||
tracing::error!("failed to send event: {e}");
|
||||
tracing::error!(
|
||||
"failed to send event: {}",
|
||||
codex_utils_log::bounded_display(&e)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -216,6 +216,7 @@ use codex_protocol::ThreadId;
|
||||
use codex_protocol::user_input::ByteRange;
|
||||
use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
|
||||
use codex_protocol::user_input::TextElement;
|
||||
use codex_utils_log::bounded_str;
|
||||
|
||||
mod attachment_state;
|
||||
mod draft_state;
|
||||
@@ -900,7 +901,7 @@ impl ChatComposer {
|
||||
// so we can directly try to read the image dimensions.
|
||||
match image::image_dimensions(&path_buf) {
|
||||
Ok((width, height)) => {
|
||||
tracing::info!("OK: {pasted}");
|
||||
tracing::info!("OK: {}", bounded_str(&pasted));
|
||||
tracing::debug!("image dimensions={}x{}", width, height);
|
||||
let format = pasted_image_format(&path_buf);
|
||||
tracing::debug!("attached image format={}", format.label());
|
||||
|
||||
@@ -137,7 +137,10 @@ impl ChatWidget {
|
||||
}
|
||||
|
||||
pub(super) fn on_turn_diff(&mut self, unified_diff: String) {
|
||||
debug!("TurnDiffEvent: {unified_diff}");
|
||||
debug!(
|
||||
"TurnDiffEvent: {}",
|
||||
codex_utils_log::bounded_str(&unified_diff)
|
||||
);
|
||||
self.refresh_status_line();
|
||||
}
|
||||
|
||||
|
||||
@@ -1216,7 +1216,7 @@ async fn run_ratatui_app(
|
||||
// (including backtraces) after we restore the terminal.
|
||||
let prev_hook = std::panic::take_hook();
|
||||
std::panic::set_hook(Box::new(move |info| {
|
||||
tracing::error!("panic: {info}");
|
||||
tracing::error!("panic: {}", codex_utils_log::bounded_display(info));
|
||||
prev_hook(info);
|
||||
}));
|
||||
let mut terminal = tui::init()?;
|
||||
|
||||
@@ -8,6 +8,9 @@
|
||||
//! On finalization, `finalize_and_drain_source()` flushes whatever remains (the last line, which
|
||||
//! may lack a trailing newline).
|
||||
|
||||
use codex_utils_log::bounded_debug;
|
||||
#[cfg(test)]
|
||||
use codex_utils_log::bounded_str;
|
||||
#[cfg(test)]
|
||||
use ratatui::text::Line;
|
||||
use std::path::Path;
|
||||
@@ -75,7 +78,7 @@ impl MarkdownStreamCollector {
|
||||
|
||||
/// Append a raw streaming delta to the internal source buffer.
|
||||
pub fn push_delta(&mut self, delta: &str) {
|
||||
tracing::trace!("push_delta: {delta:?}");
|
||||
tracing::trace!("push_delta: {}", bounded_debug(&delta));
|
||||
self.buffer.push_str(delta);
|
||||
}
|
||||
|
||||
@@ -174,7 +177,10 @@ impl MarkdownStreamCollector {
|
||||
self.buffer.len(),
|
||||
source.len()
|
||||
);
|
||||
tracing::trace!("markdown finalize (raw source):\n---\n{source}\n---");
|
||||
tracing::trace!(
|
||||
"markdown finalize (raw source):\n---\n{}\n---",
|
||||
bounded_str(&source)
|
||||
);
|
||||
|
||||
let mut rendered: Vec<Line<'static>> = Vec::new();
|
||||
markdown::append_markdown(&source, self.width, Some(self.cwd.as_path()), &mut rendered);
|
||||
|
||||
6
codex-rs/utils/log/BUILD.bazel
Normal file
6
codex-rs/utils/log/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "log",
|
||||
crate_name = "codex_utils_log",
|
||||
)
|
||||
17
codex-rs/utils/log/Cargo.toml
Normal file
17
codex-rs/utils/log/Cargo.toml
Normal file
@@ -0,0 +1,17 @@
|
||||
[package]
|
||||
name = "codex-utils-log"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
sha2 = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
|
||||
[lib]
|
||||
doctest = false
|
||||
289
codex-rs/utils/log/src/lib.rs
Normal file
289
codex-rs/utils/log/src/lib.rs
Normal file
@@ -0,0 +1,289 @@
|
||||
use std::fmt;
|
||||
|
||||
use sha2::Digest;
|
||||
use sha2::Sha256;
|
||||
|
||||
pub const DEFAULT_BOUNDED_LOG_VALUE_BYTES: usize = 16 * 1024;
|
||||
|
||||
pub fn bounded_display<T>(value: &T) -> String
|
||||
where
|
||||
T: fmt::Display + ?Sized,
|
||||
{
|
||||
bounded_format_with_limit(format_args!("{value}"), DEFAULT_BOUNDED_LOG_VALUE_BYTES)
|
||||
}
|
||||
|
||||
pub fn bounded_debug<T>(value: &T) -> String
|
||||
where
|
||||
T: fmt::Debug + ?Sized,
|
||||
{
|
||||
bounded_format_with_limit(format_args!("{value:?}"), DEFAULT_BOUNDED_LOG_VALUE_BYTES)
|
||||
}
|
||||
|
||||
pub fn bounded_str(value: &str) -> String {
|
||||
bounded_str_with_limit(value, DEFAULT_BOUNDED_LOG_VALUE_BYTES)
|
||||
}
|
||||
|
||||
pub fn bounded_str_with_limit(value: &str, max_bytes: usize) -> String {
|
||||
bounded_utf8_bytes(value.as_bytes(), max_bytes)
|
||||
}
|
||||
|
||||
pub fn bounded_bytes_lossy(value: &[u8]) -> String {
|
||||
bounded_bytes_lossy_with_limit(value, DEFAULT_BOUNDED_LOG_VALUE_BYTES)
|
||||
}
|
||||
|
||||
pub fn bounded_bytes_lossy_with_limit(value: &[u8], max_bytes: usize) -> String {
|
||||
if value.len() <= max_bytes {
|
||||
return String::from_utf8_lossy(value).into_owned();
|
||||
}
|
||||
|
||||
let (prefix, suffix) = split_bytes(value, max_bytes);
|
||||
let shown_bytes = prefix.len().saturating_add(suffix.len());
|
||||
assemble_bounded_log_value(
|
||||
&String::from_utf8_lossy(prefix),
|
||||
&String::from_utf8_lossy(suffix),
|
||||
value.len(),
|
||||
shown_bytes,
|
||||
digest_hex(value),
|
||||
)
|
||||
}
|
||||
|
||||
fn bounded_utf8_bytes(value: &[u8], max_bytes: usize) -> String {
|
||||
if value.len() <= max_bytes {
|
||||
return String::from_utf8_lossy(value).into_owned();
|
||||
}
|
||||
|
||||
let text = String::from_utf8_lossy(value);
|
||||
let prefix_len = utf8_prefix_boundary(&text, max_bytes / 2);
|
||||
let suffix_start = utf8_suffix_boundary(&text, max_bytes - max_bytes / 2);
|
||||
let prefix = &text[..prefix_len];
|
||||
let suffix = &text[suffix_start..];
|
||||
let shown_bytes = prefix.len().saturating_add(suffix.len());
|
||||
assemble_bounded_log_value(prefix, suffix, value.len(), shown_bytes, digest_hex(value))
|
||||
}
|
||||
|
||||
fn bounded_format_with_limit(args: fmt::Arguments<'_>, max_bytes: usize) -> String {
|
||||
let mut writer = BoundedFormatWriter::new(max_bytes);
|
||||
if fmt::write(&mut writer, args).is_err() {
|
||||
return String::from("<failed to format bounded log value>");
|
||||
}
|
||||
writer.finish()
|
||||
}
|
||||
|
||||
struct BoundedFormatWriter {
|
||||
max_bytes: usize,
|
||||
head: String,
|
||||
tail: String,
|
||||
full: Option<String>,
|
||||
original_bytes: usize,
|
||||
digest: Sha256,
|
||||
}
|
||||
|
||||
impl BoundedFormatWriter {
|
||||
fn new(max_bytes: usize) -> Self {
|
||||
Self {
|
||||
max_bytes,
|
||||
head: String::new(),
|
||||
tail: String::new(),
|
||||
full: Some(String::new()),
|
||||
original_bytes: 0,
|
||||
digest: Sha256::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn finish(self) -> String {
|
||||
if let Some(full) = self.full {
|
||||
return full;
|
||||
}
|
||||
|
||||
let shown_bytes = self.head.len().saturating_add(self.tail.len());
|
||||
let digest = self.digest.finalize();
|
||||
assemble_bounded_log_value(
|
||||
&self.head,
|
||||
&self.tail,
|
||||
self.original_bytes,
|
||||
shown_bytes,
|
||||
format!("{digest:x}"),
|
||||
)
|
||||
}
|
||||
|
||||
fn head_capacity(&self) -> usize {
|
||||
self.max_bytes / 2
|
||||
}
|
||||
|
||||
fn tail_capacity(&self) -> usize {
|
||||
self.max_bytes - self.head_capacity()
|
||||
}
|
||||
|
||||
fn push_head(&mut self, value: &str) {
|
||||
let remaining = self.head_capacity().saturating_sub(self.head.len());
|
||||
if remaining == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
let boundary = utf8_prefix_boundary(value, remaining);
|
||||
self.head.push_str(&value[..boundary]);
|
||||
}
|
||||
|
||||
fn push_tail(&mut self, value: &str) {
|
||||
let capacity = self.tail_capacity();
|
||||
if capacity == 0 {
|
||||
return;
|
||||
}
|
||||
|
||||
if value.len() >= capacity {
|
||||
let start = utf8_suffix_boundary(value, capacity);
|
||||
self.tail.clear();
|
||||
self.tail.push_str(&value[start..]);
|
||||
return;
|
||||
}
|
||||
|
||||
self.tail.push_str(value);
|
||||
if self.tail.len() > capacity {
|
||||
let start = utf8_suffix_boundary(&self.tail, capacity);
|
||||
self.tail.replace_range(..start, "");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Write for BoundedFormatWriter {
|
||||
fn write_str(&mut self, value: &str) -> fmt::Result {
|
||||
let original_bytes = self.original_bytes.saturating_add(value.len());
|
||||
self.digest.update(value.as_bytes());
|
||||
self.push_head(value);
|
||||
self.push_tail(value);
|
||||
|
||||
if let Some(full) = &mut self.full {
|
||||
if original_bytes <= self.max_bytes {
|
||||
full.push_str(value);
|
||||
} else {
|
||||
self.full = None;
|
||||
}
|
||||
}
|
||||
|
||||
self.original_bytes = original_bytes;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn split_bytes(value: &[u8], max_bytes: usize) -> (&[u8], &[u8]) {
|
||||
let prefix_len = max_bytes / 2;
|
||||
let suffix_len = max_bytes - prefix_len;
|
||||
let suffix_start = value.len().saturating_sub(suffix_len);
|
||||
(&value[..prefix_len], &value[suffix_start..])
|
||||
}
|
||||
|
||||
fn utf8_prefix_boundary(value: &str, max_bytes: usize) -> usize {
|
||||
if value.len() <= max_bytes {
|
||||
return value.len();
|
||||
}
|
||||
let mut boundary = max_bytes;
|
||||
while boundary > 0 && !value.is_char_boundary(boundary) {
|
||||
boundary -= 1;
|
||||
}
|
||||
boundary
|
||||
}
|
||||
|
||||
fn utf8_suffix_boundary(value: &str, max_bytes: usize) -> usize {
|
||||
if value.len() <= max_bytes {
|
||||
return 0;
|
||||
}
|
||||
let mut boundary = value.len().saturating_sub(max_bytes);
|
||||
while boundary < value.len() && !value.is_char_boundary(boundary) {
|
||||
boundary += 1;
|
||||
}
|
||||
boundary
|
||||
}
|
||||
|
||||
fn assemble_bounded_log_value(
|
||||
prefix: &str,
|
||||
suffix: &str,
|
||||
original_bytes: usize,
|
||||
shown_bytes: usize,
|
||||
sha256: String,
|
||||
) -> String {
|
||||
let omitted_bytes = original_bytes.saturating_sub(shown_bytes);
|
||||
format!(
|
||||
"{prefix}...[truncated: original_bytes={original_bytes} shown_bytes={shown_bytes} omitted_bytes={omitted_bytes} sha256={sha256}]...{suffix}"
|
||||
)
|
||||
}
|
||||
|
||||
fn digest_hex(value: &[u8]) -> String {
|
||||
let digest = Sha256::digest(value);
|
||||
format!("{digest:x}")
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn bounded_str_returns_short_values_unchanged() {
|
||||
assert_eq!("hello", bounded_str_with_limit("hello", /*max_bytes*/ 16));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bounded_str_preserves_prefix_suffix_and_reports_metadata() {
|
||||
let value = "abcdefghijklmnop";
|
||||
|
||||
let bounded = bounded_str_with_limit(value, /*max_bytes*/ 8);
|
||||
|
||||
assert!(bounded.starts_with("abcd...[truncated: original_bytes=16 shown_bytes=8"));
|
||||
assert!(bounded.contains(" omitted_bytes=8 "));
|
||||
assert!(bounded.contains(" sha256="));
|
||||
assert!(bounded.ends_with("]...mnop"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bounded_str_respects_utf8_boundaries() {
|
||||
let value = "αβγδεζηθ";
|
||||
|
||||
let bounded = bounded_str_with_limit(value, /*max_bytes*/ 9);
|
||||
|
||||
assert!(bounded.starts_with("αβ...[truncated:"));
|
||||
assert!(bounded.ends_with("]...ηθ"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bounded_bytes_lossy_hashes_original_bytes() {
|
||||
let value = b"abcdef\xffghijklmnop";
|
||||
|
||||
let bounded = bounded_bytes_lossy_with_limit(value, /*max_bytes*/ 8);
|
||||
|
||||
assert!(bounded.starts_with("abcd...[truncated: original_bytes=17 shown_bytes=8"));
|
||||
assert!(bounded.ends_with("]...mnop"));
|
||||
assert!(bounded.contains(&digest_hex(value)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bounded_debug_formats_then_bounds() {
|
||||
let value = Some("a".repeat(DEFAULT_BOUNDED_LOG_VALUE_BYTES));
|
||||
|
||||
let bounded = bounded_debug(&value);
|
||||
|
||||
assert!(bounded.starts_with("Some(\""));
|
||||
assert!(bounded.contains("[truncated:"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bounded_format_streams_prefix_suffix_and_hash() {
|
||||
struct ChunkedDisplay;
|
||||
|
||||
impl fmt::Display for ChunkedDisplay {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
for chunk in ["abcd", "efgh", "ijkl", "mnop"] {
|
||||
f.write_str(chunk)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
let bounded =
|
||||
bounded_format_with_limit(format_args!("{ChunkedDisplay}"), /*max_bytes*/ 8);
|
||||
|
||||
assert!(bounded.starts_with("abcd...[truncated: original_bytes=16 shown_bytes=8"));
|
||||
assert!(bounded.ends_with("]...mnop"));
|
||||
assert!(bounded.contains(&digest_hex(b"abcdefghijklmnop")));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user