Compare commits

...

1 Commits

Author SHA1 Message Date
Owen Lin
2aa08e92b2 wip more informative authrequired error 2026-01-20 16:23:55 -08:00
5 changed files with 382 additions and 64 deletions

View File

@@ -1353,10 +1353,13 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error creating conversation: {err}"),
data: None,
let error = match err {
CodexErr::AuthRequired => Self::auth_required_error(),
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error creating conversation: {err}"),
data: None,
},
};
self.outgoing.send_error(request_id, error).await;
}
@@ -1465,10 +1468,13 @@ impl CodexMessageProcessor {
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error creating thread: {err}"),
data: None,
let error = match err {
CodexErr::AuthRequired => Self::auth_required_error(),
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error creating thread: {err}"),
data: None,
},
};
self.outgoing.send_error(request_id, error).await;
}
@@ -1896,10 +1902,13 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error resuming thread: {err}"),
data: None,
let error = match err {
CodexErr::AuthRequired => Self::auth_required_error(),
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error resuming thread: {err}"),
data: None,
},
};
self.outgoing.send_error(request_id, error).await;
}
@@ -2022,18 +2031,26 @@ impl CodexMessageProcessor {
{
Ok(thread) => thread,
Err(err) => {
let (code, message) = match err {
CodexErr::Io(_) | CodexErr::Json(_) => (
INVALID_REQUEST_ERROR_CODE,
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
),
CodexErr::InvalidRequest(message) => (INVALID_REQUEST_ERROR_CODE, message),
_ => (INTERNAL_ERROR_CODE, format!("error forking thread: {err}")),
};
let error = JSONRPCErrorError {
code,
message,
data: None,
let error = match err {
CodexErr::AuthRequired => Self::auth_required_error(),
CodexErr::Io(_) | CodexErr::Json(_) => JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"failed to load rollout `{}`: {err}",
rollout_path.display()
),
data: None,
},
CodexErr::InvalidRequest(message) => JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message,
data: None,
},
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error forking thread: {err}"),
data: None,
},
};
self.outgoing.send_error(request_id, error).await;
return;
@@ -2763,10 +2780,13 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error resuming conversation: {err}"),
data: None,
let error = match err {
CodexErr::AuthRequired => Self::auth_required_error(),
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error resuming conversation: {err}"),
data: None,
},
};
self.outgoing.send_error(request_id, error).await;
}
@@ -2885,21 +2905,26 @@ impl CodexMessageProcessor {
{
Ok(thread) => thread,
Err(err) => {
let (code, message) = match err {
CodexErr::Io(_) | CodexErr::Json(_) => (
INVALID_REQUEST_ERROR_CODE,
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
),
CodexErr::InvalidRequest(message) => (INVALID_REQUEST_ERROR_CODE, message),
_ => (
INTERNAL_ERROR_CODE,
format!("error forking conversation: {err}"),
),
};
let error = JSONRPCErrorError {
code,
message,
data: None,
let error = match err {
CodexErr::AuthRequired => Self::auth_required_error(),
CodexErr::Io(_) | CodexErr::Json(_) => JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"failed to load rollout `{}`: {err}",
rollout_path.display()
),
data: None,
},
CodexErr::InvalidRequest(message) => JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message,
data: None,
},
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error forking conversation: {err}"),
data: None,
},
};
self.outgoing.send_error(request_id, error).await;
return;
@@ -2942,6 +2967,15 @@ impl CodexMessageProcessor {
self.outgoing.send_error(request_id, error).await;
}
fn auth_required_error() -> JSONRPCErrorError {
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "OpenAI authentication required. Please login via account/login/start (chatgpt or apiKey)."
.to_string(),
data: None,
}
}
async fn send_internal_error(&self, request_id: RequestId, message: String) {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
@@ -3136,12 +3170,24 @@ impl CodexMessageProcessor {
.collect();
// Submit user input to the conversation.
let _ = conversation
if let Err(err) = conversation
.submit(Op::UserInput {
items: mapped_items,
final_output_json_schema: None,
})
.await;
.await
{
let error = match err {
CodexErr::AuthRequired => Self::auth_required_error(),
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to send message: {err}"),
data: None,
},
};
self.outgoing.send_error(request_id, error).await;
return;
}
// Acknowledge with an empty result.
self.outgoing
@@ -3185,7 +3231,7 @@ impl CodexMessageProcessor {
})
.collect();
let _ = conversation
if let Err(err) = conversation
.submit(Op::UserTurn {
items: mapped_items,
cwd,
@@ -3196,7 +3242,19 @@ impl CodexMessageProcessor {
summary,
final_output_json_schema: output_schema,
})
.await;
.await
{
let error = match err {
CodexErr::AuthRequired => Self::auth_required_error(),
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start turn: {err}"),
data: None,
},
};
self.outgoing.send_error(request_id, error).await;
return;
}
self.outgoing
.send_response(request_id, SendUserTurnResponse {})
@@ -3323,10 +3381,13 @@ impl CodexMessageProcessor {
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start turn: {err}"),
data: None,
let error = match err {
CodexErr::AuthRequired => Self::auth_required_error(),
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start turn: {err}"),
data: None,
},
};
self.outgoing.send_error(request_id, error).await;
}
@@ -3400,10 +3461,13 @@ impl CodexMessageProcessor {
.await;
Ok(())
}
Err(err) => Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start review: {err}"),
data: None,
Err(err) => Err(match err {
CodexErr::AuthRequired => Self::auth_required_error(),
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start review: {err}"),
data: None,
},
}),
}
}
@@ -3443,10 +3507,13 @@ impl CodexMessageProcessor {
.thread_manager
.fork_thread(usize::MAX, config, rollout_path)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error creating detached review thread: {err}"),
data: None,
.map_err(|err| match err {
CodexErr::AuthRequired => Self::auth_required_error(),
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error creating detached review thread: {err}"),
data: None,
},
})?;
if let Err(err) = self
@@ -3482,10 +3549,13 @@ impl CodexMessageProcessor {
let turn_id = review_thread
.submit(Op::Review { review_request })
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start detached review turn: {err}"),
data: None,
.map_err(|err| match err {
CodexErr::AuthRequired => Self::auth_required_error(),
_ => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start detached review turn: {err}"),
data: None,
},
})?;
let turn = Self::build_review_turn(turn_id, display_text);

View File

@@ -0,0 +1,207 @@
use anyhow::Result;
use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ReviewDelivery;
use codex_app_server_protocol::ReviewStartParams;
use codex_app_server_protocol::ReviewTarget;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::auth::AuthCredentialsStoreMode;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
#[tokio::test]
async fn thread_start_requires_auth() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("unused").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), true)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let error: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
assert!(
error.error.message.contains("authentication required"),
"unexpected message: {}",
error.error.message
);
Ok(())
}
#[tokio::test]
async fn turn_start_requires_auth_after_logout() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("unused").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), true)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("access-token"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response(thread_resp)?;
let logout_req = mcp.send_logout_account_request().await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(logout_req)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("account/updated"),
)
.await??;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let error: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(turn_req)),
)
.await??;
assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
assert!(
error.error.message.contains("authentication required"),
"unexpected message: {}",
error.error.message
);
Ok(())
}
#[tokio::test]
async fn review_start_requires_auth_after_logout() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("unused").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), true)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("access-token"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response(thread_resp)?;
let logout_req = mcp.send_logout_account_request().await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(logout_req)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("account/updated"),
)
.await??;
let review_req = mcp
.send_review_start_request(ReviewStartParams {
thread_id: thread.id,
delivery: Some(ReviewDelivery::Inline),
target: ReviewTarget::Custom {
instructions: "review".to_string(),
},
})
.await?;
let error: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(review_req)),
)
.await??;
assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
assert!(
error.error.message.contains("authentication required"),
"unexpected message: {}",
error.error.message
);
Ok(())
}
fn create_config_toml(
codex_home: &Path,
server_uri: &str,
requires_openai_auth: bool,
) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for auth required tests"
base_url = "{server_uri}/v1"
wire_api = "responses"
requires_openai_auth = {requires_openai_auth}
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -179,6 +179,8 @@ pub struct Codex {
pub(crate) rx_event: Receiver<Event>,
// Last known status of the agent.
pub(crate) agent_status: watch::Receiver<AgentStatus>,
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) requires_openai_auth: bool,
}
/// Wrapper returned by [`Codex::spawn`] containing the spawned [`Codex`],
@@ -230,6 +232,10 @@ impl Codex {
session_source: SessionSource,
agent_control: AgentControl,
) -> CodexResult<CodexSpawnOk> {
if config.model_provider.requires_openai_auth && auth_manager.get_auth_mode().is_none() {
return Err(CodexErr::AuthRequired);
}
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
@@ -254,6 +260,7 @@ impl Codex {
.await
.map_err(|err| CodexErr::Fatal(format!("failed to load rules: {err}")))?;
let requires_openai_auth = config.model_provider.requires_openai_auth;
let config = Arc::new(config);
let _ = models_manager
.list_models(
@@ -315,6 +322,8 @@ impl Codex {
tx_sub,
rx_event,
agent_status: agent_status_rx,
auth_manager,
requires_openai_auth,
};
#[allow(deprecated)]
@@ -327,6 +336,16 @@ impl Codex {
/// Submit the `op` wrapped in a `Submission` with a unique ID.
pub async fn submit(&self, op: Op) -> CodexResult<String> {
if self.requires_openai_auth
&& matches!(
op,
Op::UserInput { .. } | Op::UserTurn { .. } | Op::Review { .. }
)
&& self.auth_manager.get_auth_mode().is_none()
{
return Err(CodexErr::AuthRequired);
}
let id = self
.next_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)

View File

@@ -42,6 +42,8 @@ pub(crate) async fn run_codex_thread_interactive(
cancel_token: CancellationToken,
initial_history: Option<InitialHistory>,
) -> Result<Codex, CodexErr> {
let requires_openai_auth = config.model_provider.requires_openai_auth;
let auth_manager_clone = Arc::clone(&auth_manager);
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
@@ -88,6 +90,8 @@ pub(crate) async fn run_codex_thread_interactive(
tx_sub: tx_ops,
rx_event: rx_sub,
agent_status: codex.agent_status.clone(),
auth_manager: auth_manager_clone,
requires_openai_auth,
})
}
@@ -105,6 +109,8 @@ pub(crate) async fn run_codex_thread_one_shot(
cancel_token: CancellationToken,
initial_history: Option<InitialHistory>,
) -> Result<Codex, CodexErr> {
let requires_openai_auth = config.model_provider.requires_openai_auth;
let auth_manager_clone = Arc::clone(&auth_manager);
// Use a child token so we can stop the delegate after completion without
// requiring the caller to cancel the parent token.
let child_cancel = cancel_token.child_token();
@@ -162,6 +168,8 @@ pub(crate) async fn run_codex_thread_one_shot(
rx_event: rx_bridge,
tx_sub: tx_closed,
agent_status,
auth_manager: auth_manager_clone,
requires_openai_auth,
})
}
@@ -361,6 +369,8 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::AuthManager;
use crate::auth::AuthCredentialsStoreMode;
use async_channel::bounded;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentStatus;
@@ -368,6 +378,7 @@ mod tests {
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use pretty_assertions::assert_eq;
use std::path::PathBuf;
use tokio::sync::watch;
#[tokio::test]
@@ -375,11 +386,18 @@ mod tests {
let (tx_events, rx_events) = bounded(1);
let (tx_sub, rx_sub) = bounded(SUBMISSION_CHANNEL_CAPACITY);
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
let auth_manager = AuthManager::shared(
PathBuf::from("non-existent"),
false,
AuthCredentialsStoreMode::File,
);
let codex = Arc::new(Codex {
next_id: AtomicU64::new(0),
tx_sub,
rx_event: rx_events,
agent_status,
auth_manager,
requires_openai_auth: false,
});
let (session, ctx, _rx_evt) = crate::codex::make_session_and_context_with_rx().await;

View File

@@ -103,6 +103,9 @@ pub enum CodexErr {
#[error("{0}")]
InvalidRequest(String),
#[error("OpenAI authentication required.")]
AuthRequired,
/// Invalid image.
#[error("Image poisoning")]
InvalidImageRequest(),
@@ -201,7 +204,8 @@ impl CodexErr {
| CodexErr::ThreadNotFound(_)
| CodexErr::Spawn
| CodexErr::SessionConfiguredNotFirstEvent
| CodexErr::UsageLimitReached(_) => false,
| CodexErr::UsageLimitReached(_)
| CodexErr::AuthRequired => false,
CodexErr::Stream(..)
| CodexErr::Timeout
| CodexErr::UnexpectedStatus(_)