mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
fix(app-server): surface more helpful errors for json-rpc (#11638)
Propagate client JSON-RPC errors for app-server request callbacks.
Previously a number of possible errors were collapsed to `channel
closed`. Now we should be able to see the underlying client error.
### Summary
This change stops masking client JSON-RPC error responses as generic
callback cancellation in app-server server->client request flows.
Previously, when the client responded with a JSON-RPC error, we removed
the callback entry but did not send anything to the waiting oneshot
receiver. Waiters then observed channel closure (for example, auth
refresh request canceled: channel closed), which hid the actual client
error.
Now, client JSON-RPC errors are forwarded through the callback channel
and handled explicitly by request consumers.
### User-visible behavior
- External auth refresh now surfaces real client JSON-RPC errors when
provided.
- True transport/callback-drop cases still report
canceled/channel-closed semantics.
### Example: client JSON-RPC error is now propagated (not masked as
"canceled")
When app-server asks the client to refresh ChatGPT auth tokens, it sends
a server->client JSON-RPC request like:
```json
{
"id": 42,
"method": "account/chatgptAuthTokens/refresh",
"params": {
"reason": "unauthorized",
"previousAccountId": "org-abc"
}
}
```
If the client cannot refresh and responds with a JSON-RPC error:
```
{
"id": 42,
"error": {
"code": -32000,
"message": "refresh failed",
"data": null
}
}
```
app-server now forwards that error through the callback path and
surfaces:
`auth refresh request failed: code=-32000 message=refresh failed`
Previously, this same case could be reported as:
`auth refresh request canceled: channel closed`
This commit is contained in:
@@ -4,6 +4,7 @@ use crate::codex_message_processor::read_summary_from_rollout;
|
||||
use crate::codex_message_processor::summary_to_thread;
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
use crate::outgoing_message::ClientRequestResult;
|
||||
use crate::outgoing_message::ThreadScopedOutgoingMessageSender;
|
||||
use crate::thread_state::ThreadState;
|
||||
use crate::thread_state::TurnSummary;
|
||||
@@ -1408,12 +1409,25 @@ async fn handle_error(
|
||||
|
||||
async fn on_patch_approval_response(
|
||||
call_id: String,
|
||||
receiver: oneshot::Receiver<JsonValue>,
|
||||
receiver: oneshot::Receiver<ClientRequestResult>,
|
||||
codex: Arc<CodexThread>,
|
||||
) {
|
||||
let response = receiver.await;
|
||||
let value = match response {
|
||||
Ok(value) => value,
|
||||
Ok(Ok(value)) => value,
|
||||
Ok(Err(err)) => {
|
||||
error!("request failed with client error: {err:?}");
|
||||
if let Err(submit_err) = codex
|
||||
.submit(Op::PatchApproval {
|
||||
id: call_id.clone(),
|
||||
decision: ReviewDecision::Denied,
|
||||
})
|
||||
.await
|
||||
{
|
||||
error!("failed to submit denied PatchApproval after request failure: {submit_err}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("request failed: {err:?}");
|
||||
if let Err(submit_err) = codex
|
||||
@@ -1451,12 +1465,16 @@ async fn on_patch_approval_response(
|
||||
async fn on_exec_approval_response(
|
||||
call_id: String,
|
||||
turn_id: String,
|
||||
receiver: oneshot::Receiver<JsonValue>,
|
||||
receiver: oneshot::Receiver<ClientRequestResult>,
|
||||
conversation: Arc<CodexThread>,
|
||||
) {
|
||||
let response = receiver.await;
|
||||
let value = match response {
|
||||
Ok(value) => value,
|
||||
Ok(Ok(value)) => value,
|
||||
Ok(Err(err)) => {
|
||||
error!("request failed with client error: {err:?}");
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("request failed: {err:?}");
|
||||
return;
|
||||
@@ -1488,12 +1506,28 @@ async fn on_exec_approval_response(
|
||||
|
||||
async fn on_request_user_input_response(
|
||||
event_turn_id: String,
|
||||
receiver: oneshot::Receiver<JsonValue>,
|
||||
receiver: oneshot::Receiver<ClientRequestResult>,
|
||||
conversation: Arc<CodexThread>,
|
||||
) {
|
||||
let response = receiver.await;
|
||||
let value = match response {
|
||||
Ok(value) => value,
|
||||
Ok(Ok(value)) => value,
|
||||
Ok(Err(err)) => {
|
||||
error!("request failed with client error: {err:?}");
|
||||
let empty = CoreRequestUserInputResponse {
|
||||
answers: HashMap::new(),
|
||||
};
|
||||
if let Err(err) = conversation
|
||||
.submit(Op::UserInputAnswer {
|
||||
id: event_turn_id,
|
||||
response: empty,
|
||||
})
|
||||
.await
|
||||
{
|
||||
error!("failed to submit UserInputAnswer: {err}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("request failed: {err:?}");
|
||||
let empty = CoreRequestUserInputResponse {
|
||||
@@ -1628,14 +1662,14 @@ async fn on_file_change_request_approval_response(
|
||||
conversation_id: ThreadId,
|
||||
item_id: String,
|
||||
changes: Vec<FileUpdateChange>,
|
||||
receiver: oneshot::Receiver<JsonValue>,
|
||||
receiver: oneshot::Receiver<ClientRequestResult>,
|
||||
codex: Arc<CodexThread>,
|
||||
outgoing: ThreadScopedOutgoingMessageSender,
|
||||
thread_state: Arc<Mutex<ThreadState>>,
|
||||
) {
|
||||
let response = receiver.await;
|
||||
let (decision, completion_status) = match response {
|
||||
Ok(value) => {
|
||||
Ok(Ok(value)) => {
|
||||
let response = serde_json::from_value::<FileChangeRequestApprovalResponse>(value)
|
||||
.unwrap_or_else(|err| {
|
||||
error!("failed to deserialize FileChangeRequestApprovalResponse: {err}");
|
||||
@@ -1650,6 +1684,10 @@ async fn on_file_change_request_approval_response(
|
||||
// Only short-circuit on declines/cancels/failures.
|
||||
(decision, completion_status)
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
error!("request failed with client error: {err:?}");
|
||||
(ReviewDecision::Denied, Some(PatchApplyStatus::Failed))
|
||||
}
|
||||
Err(err) => {
|
||||
error!("request failed: {err:?}");
|
||||
(ReviewDecision::Denied, Some(PatchApplyStatus::Failed))
|
||||
@@ -1688,13 +1726,13 @@ async fn on_command_execution_request_approval_response(
|
||||
command: String,
|
||||
cwd: PathBuf,
|
||||
command_actions: Vec<V2ParsedCommand>,
|
||||
receiver: oneshot::Receiver<JsonValue>,
|
||||
receiver: oneshot::Receiver<ClientRequestResult>,
|
||||
conversation: Arc<CodexThread>,
|
||||
outgoing: ThreadScopedOutgoingMessageSender,
|
||||
) {
|
||||
let response = receiver.await;
|
||||
let (decision, completion_status) = match response {
|
||||
Ok(value) => {
|
||||
Ok(Ok(value)) => {
|
||||
let response = serde_json::from_value::<CommandExecutionRequestApprovalResponse>(value)
|
||||
.unwrap_or_else(|err| {
|
||||
error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}");
|
||||
@@ -1729,6 +1767,10 @@ async fn on_command_execution_request_approval_response(
|
||||
};
|
||||
(decision, completion_status)
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
error!("request failed with client error: {err:?}");
|
||||
(ReviewDecision::Denied, Some(CommandExecutionStatus::Failed))
|
||||
}
|
||||
Err(err) => {
|
||||
error!("request failed: {err:?}");
|
||||
(ReviewDecision::Denied, Some(CommandExecutionStatus::Failed))
|
||||
|
||||
@@ -7,14 +7,35 @@ use std::sync::Arc;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::error;
|
||||
|
||||
use crate::outgoing_message::ClientRequestResult;
|
||||
|
||||
pub(crate) async fn on_call_response(
|
||||
call_id: String,
|
||||
receiver: oneshot::Receiver<serde_json::Value>,
|
||||
receiver: oneshot::Receiver<ClientRequestResult>,
|
||||
conversation: Arc<CodexThread>,
|
||||
) {
|
||||
let response = receiver.await;
|
||||
let value = match response {
|
||||
Ok(value) => value,
|
||||
Ok(Ok(value)) => value,
|
||||
Ok(Err(err)) => {
|
||||
error!("request failed with client error: {err:?}");
|
||||
let fallback = CoreDynamicToolResponse {
|
||||
content_items: vec![CoreDynamicToolCallOutputContentItem::InputText {
|
||||
text: "dynamic tool request failed".to_string(),
|
||||
}],
|
||||
success: false,
|
||||
};
|
||||
if let Err(err) = conversation
|
||||
.submit(Op::DynamicToolResponse {
|
||||
id: call_id.clone(),
|
||||
response: fallback,
|
||||
})
|
||||
.await
|
||||
{
|
||||
error!("failed to submit DynamicToolResponse: {err}");
|
||||
}
|
||||
return;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("request failed: {err:?}");
|
||||
let fallback = CoreDynamicToolResponse {
|
||||
|
||||
@@ -86,9 +86,20 @@ impl ExternalAuthRefresher for ExternalAuthRefreshBridge {
|
||||
.await;
|
||||
|
||||
let result = match timeout(EXTERNAL_AUTH_REFRESH_TIMEOUT, rx).await {
|
||||
Ok(result) => result.map_err(|err| {
|
||||
std::io::Error::other(format!("auth refresh request canceled: {err}"))
|
||||
})?,
|
||||
Ok(result) => {
|
||||
// Two failure scenarios:
|
||||
// 1) `oneshot::Receiver` failed (sender dropped) => request canceled/channel closed.
|
||||
// 2) client answered with JSON-RPC error payload => propagate code/message.
|
||||
let result = result.map_err(|err| {
|
||||
std::io::Error::other(format!("auth refresh request canceled: {err}"))
|
||||
})?;
|
||||
result.map_err(|err| {
|
||||
std::io::Error::other(format!(
|
||||
"auth refresh request failed: code={} message={}",
|
||||
err.code, err.message
|
||||
))
|
||||
})?
|
||||
}
|
||||
Err(_) => {
|
||||
let _canceled = self.outgoing.cancel_request(&request_id).await;
|
||||
return Err(std::io::Error::other(format!(
|
||||
|
||||
@@ -20,6 +20,8 @@ use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
#[cfg(test)]
|
||||
use codex_protocol::account::PlanType;
|
||||
|
||||
pub(crate) type ClientRequestResult = std::result::Result<Result, JSONRPCErrorError>;
|
||||
|
||||
/// Stable identifier for a transport connection.
|
||||
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
|
||||
pub(crate) struct ConnectionId(pub(crate) u64);
|
||||
@@ -46,7 +48,7 @@ pub(crate) enum OutgoingEnvelope {
|
||||
pub(crate) struct OutgoingMessageSender {
|
||||
next_server_request_id: AtomicI64,
|
||||
sender: mpsc::Sender<OutgoingEnvelope>,
|
||||
request_id_to_callback: Mutex<HashMap<RequestId, oneshot::Sender<Result>>>,
|
||||
request_id_to_callback: Mutex<HashMap<RequestId, oneshot::Sender<ClientRequestResult>>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -69,7 +71,7 @@ impl ThreadScopedOutgoingMessageSender {
|
||||
pub(crate) async fn send_request(
|
||||
&self,
|
||||
payload: ServerRequestPayload,
|
||||
) -> oneshot::Receiver<Result> {
|
||||
) -> oneshot::Receiver<ClientRequestResult> {
|
||||
if self.connection_ids.is_empty() {
|
||||
let (_tx, rx) = oneshot::channel();
|
||||
return rx;
|
||||
@@ -118,7 +120,7 @@ impl OutgoingMessageSender {
|
||||
&self,
|
||||
connection_ids: &[ConnectionId],
|
||||
request: ServerRequestPayload,
|
||||
) -> oneshot::Receiver<Result> {
|
||||
) -> oneshot::Receiver<ClientRequestResult> {
|
||||
let (_id, rx) = self
|
||||
.send_request_with_id_to_connections(connection_ids, request)
|
||||
.await;
|
||||
@@ -128,7 +130,7 @@ impl OutgoingMessageSender {
|
||||
pub(crate) async fn send_request_with_id(
|
||||
&self,
|
||||
request: ServerRequestPayload,
|
||||
) -> (RequestId, oneshot::Receiver<Result>) {
|
||||
) -> (RequestId, oneshot::Receiver<ClientRequestResult>) {
|
||||
self.send_request_with_id_to_connections(&[], request).await
|
||||
}
|
||||
|
||||
@@ -136,7 +138,7 @@ impl OutgoingMessageSender {
|
||||
&self,
|
||||
connection_ids: &[ConnectionId],
|
||||
request: ServerRequestPayload,
|
||||
) -> (RequestId, oneshot::Receiver<Result>) {
|
||||
) -> (RequestId, oneshot::Receiver<ClientRequestResult>) {
|
||||
let id = RequestId::Integer(self.next_server_request_id.fetch_add(1, Ordering::Relaxed));
|
||||
let outgoing_message_id = id.clone();
|
||||
let (tx_approve, rx_approve) = oneshot::channel();
|
||||
@@ -190,7 +192,7 @@ impl OutgoingMessageSender {
|
||||
|
||||
match entry {
|
||||
Some((id, sender)) => {
|
||||
if let Err(err) = sender.send(result) {
|
||||
if let Err(err) = sender.send(Ok(result)) {
|
||||
warn!("could not notify callback for {id:?} due to: {err:?}");
|
||||
}
|
||||
}
|
||||
@@ -207,8 +209,11 @@ impl OutgoingMessageSender {
|
||||
};
|
||||
|
||||
match entry {
|
||||
Some((id, _sender)) => {
|
||||
Some((id, sender)) => {
|
||||
warn!("client responded with error for {id:?}: {error:?}");
|
||||
if let Err(err) = sender.send(Err(error)) {
|
||||
warn!("could not notify callback for {id:?} due to: {err:?}");
|
||||
}
|
||||
}
|
||||
None => {
|
||||
warn!("could not find callback for {id:?}");
|
||||
@@ -390,11 +395,13 @@ mod tests {
|
||||
use codex_app_server_protocol::AccountLoginCompletedNotification;
|
||||
use codex_app_server_protocol::AccountRateLimitsUpdatedNotification;
|
||||
use codex_app_server_protocol::AccountUpdatedNotification;
|
||||
use codex_app_server_protocol::ApplyPatchApprovalParams;
|
||||
use codex_app_server_protocol::AuthMode;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::LoginChatGptCompleteNotification;
|
||||
use codex_app_server_protocol::RateLimitSnapshot;
|
||||
use codex_app_server_protocol::RateLimitWindow;
|
||||
use codex_protocol::ThreadId;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tokio::time::timeout;
|
||||
@@ -609,4 +616,38 @@ mod tests {
|
||||
other => panic!("expected targeted error envelope, got: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn notify_client_error_forwards_error_to_waiter() {
|
||||
let (tx, _rx) = mpsc::channel::<OutgoingEnvelope>(4);
|
||||
let outgoing = OutgoingMessageSender::new(tx);
|
||||
|
||||
let (request_id, wait_for_result) = outgoing
|
||||
.send_request_with_id(ServerRequestPayload::ApplyPatchApproval(
|
||||
ApplyPatchApprovalParams {
|
||||
conversation_id: ThreadId::new(),
|
||||
call_id: "call-id".to_string(),
|
||||
file_changes: HashMap::new(),
|
||||
reason: None,
|
||||
grant_root: None,
|
||||
},
|
||||
))
|
||||
.await;
|
||||
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: "refresh failed".to_string(),
|
||||
data: None,
|
||||
};
|
||||
|
||||
outgoing
|
||||
.notify_client_error(request_id, error.clone())
|
||||
.await;
|
||||
|
||||
let result = timeout(Duration::from_secs(1), wait_for_result)
|
||||
.await
|
||||
.expect("wait should not time out")
|
||||
.expect("waiter should receive a callback");
|
||||
assert_eq!(result, Err(error));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user