Files
codex/codex-rs/app-server/tests/suite/v2/thread_name_websocket.rs
Eric Traut 51bfb5f3b1 Restore app-server websocket listener with auth guard (#22404)
## Why
PR #21843 removed the TCP websocket app-server listener, but that also
removed functionality that still needs to exist. Restoring it as-is
would reopen the old remote exposure problem, so this keeps the restored
listener while making remote and non-loopback usage require explicit
auth.

## What Changed
- Mostly reverts #21843 and reapplies the small merge-conflict
resolutions needed on top of current main.
- Restores ws://IP:PORT parsing, the app-server TCP websocket acceptor,
websocket auth CLI flags, and the associated tests.
- The only intentional behavior change from the restored code is that
non-loopback websocket listeners now fail startup unless --ws-auth
capability-token or --ws-auth signed-bearer-token is configured.
Loopback listeners remain available for local and SSH-forwarding
workflows.

## Reviewer Focus
Please focus review on the small auth-enforcement delta layered on top
of the revert:

- codex-rs/app-server-transport/src/transport/websocket.rs:
start_websocket_acceptor now rejects unauthenticated non-loopback
websocket binds before accepting connections.
- codex-rs/app-server-transport/src/transport/auth.rs: helper logic
classifies unauthenticated non-loopback listeners.
- codex-rs/app-server/tests/suite/v2/connection_handling_websocket.rs:
tests cover unauthenticated ws://0.0.0.0 startup rejection and
authenticated non-loopback capability-token startup.

Everything else is intended to be revert/merge-conflict restoration
rather than new product behavior.

## Verification

- Manually verified that TUI remoting is restored and that auth is
enforced for non-localhost urls.
2026-05-12 18:40:53 -07:00

197 lines
7.2 KiB
Rust

use super::connection_handling_websocket::DEFAULT_READ_TIMEOUT;
use super::connection_handling_websocket::WsClient;
use super::connection_handling_websocket::assert_no_message;
use super::connection_handling_websocket::connect_websocket;
use super::connection_handling_websocket::create_config_toml;
use super::connection_handling_websocket::read_notification_for_method;
use super::connection_handling_websocket::read_response_and_notification_for_method;
use super::connection_handling_websocket::read_response_for_id;
use super::connection_handling_websocket::send_initialize_request;
use super::connection_handling_websocket::send_request;
use super::connection_handling_websocket::spawn_websocket_server;
use anyhow::Context;
use anyhow::Result;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::ThreadNameUpdatedNotification;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadSetNameParams;
use codex_app_server_protocol::ThreadSetNameResponse;
use codex_core::find_thread_name_by_id;
use codex_protocol::ThreadId;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::Duration;
use tokio::time::timeout;
#[tokio::test]
async fn thread_name_updated_broadcasts_for_loaded_threads() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-00-00")?;
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
let result = async {
let mut ws1 = connect_websocket(bind_addr).await?;
let mut ws2 = connect_websocket(bind_addr).await?;
initialize_both_clients(&mut ws1, &mut ws2).await?;
send_request(
&mut ws1,
"thread/resume",
/*id*/ 10,
Some(serde_json::to_value(ThreadResumeParams {
thread_id: conversation_id.clone(),
..Default::default()
})?),
)
.await?;
let resume_resp: JSONRPCResponse = read_response_for_id(&mut ws1, /*id*/ 10).await?;
let resume: ThreadResumeResponse = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resume.thread.id, conversation_id);
let renamed = "Loaded rename";
send_request(
&mut ws1,
"thread/name/set",
/*id*/ 11,
Some(serde_json::to_value(ThreadSetNameParams {
thread_id: conversation_id.clone(),
name: renamed.to_string(),
})?),
)
.await?;
let (rename_resp, ws1_notification) = read_response_and_notification_for_method(
&mut ws1,
/*id*/ 11,
"thread/name/updated",
)
.await?;
let _: ThreadSetNameResponse = to_response::<ThreadSetNameResponse>(rename_resp)?;
assert_thread_name_updated(ws1_notification, &conversation_id, renamed)?;
let ws2_notification =
read_notification_for_method(&mut ws2, "thread/name/updated").await?;
assert_thread_name_updated(ws2_notification, &conversation_id, renamed)?;
assert_legacy_thread_name(codex_home.path(), &conversation_id, renamed).await?;
assert_no_message(&mut ws1, Duration::from_millis(250)).await?;
assert_no_message(&mut ws2, Duration::from_millis(250)).await?;
Ok(())
}
.await;
process
.kill()
.await
.context("failed to stop websocket app-server process")?;
result
}
#[tokio::test]
async fn thread_name_updated_broadcasts_for_not_loaded_threads() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let conversation_id = create_rollout(codex_home.path(), "2025-01-05T12-05-00")?;
let (mut process, bind_addr) = spawn_websocket_server(codex_home.path()).await?;
let result = async {
let mut ws1 = connect_websocket(bind_addr).await?;
let mut ws2 = connect_websocket(bind_addr).await?;
initialize_both_clients(&mut ws1, &mut ws2).await?;
let renamed = "Stored rename";
send_request(
&mut ws1,
"thread/name/set",
/*id*/ 20,
Some(serde_json::to_value(ThreadSetNameParams {
thread_id: conversation_id.clone(),
name: renamed.to_string(),
})?),
)
.await?;
let (rename_resp, ws1_notification) = read_response_and_notification_for_method(
&mut ws1,
/*id*/ 20,
"thread/name/updated",
)
.await?;
let _: ThreadSetNameResponse = to_response::<ThreadSetNameResponse>(rename_resp)?;
assert_thread_name_updated(ws1_notification, &conversation_id, renamed)?;
let ws2_notification =
read_notification_for_method(&mut ws2, "thread/name/updated").await?;
assert_thread_name_updated(ws2_notification, &conversation_id, renamed)?;
assert_legacy_thread_name(codex_home.path(), &conversation_id, renamed).await?;
assert_no_message(&mut ws1, Duration::from_millis(250)).await?;
assert_no_message(&mut ws2, Duration::from_millis(250)).await?;
Ok(())
}
.await;
process
.kill()
.await
.context("failed to stop websocket app-server process")?;
result
}
async fn initialize_both_clients(ws1: &mut WsClient, ws2: &mut WsClient) -> Result<()> {
send_initialize_request(ws1, /*id*/ 1, "ws_client_one").await?;
timeout(DEFAULT_READ_TIMEOUT, read_response_for_id(ws1, /*id*/ 1)).await??;
send_initialize_request(ws2, /*id*/ 2, "ws_client_two").await?;
timeout(DEFAULT_READ_TIMEOUT, read_response_for_id(ws2, /*id*/ 2)).await??;
Ok(())
}
fn create_rollout(codex_home: &std::path::Path, filename_ts: &str) -> Result<String> {
create_fake_rollout_with_text_elements(
codex_home,
filename_ts,
"2025-01-05T12:00:00Z",
"Saved user message",
Vec::new(),
Some("mock_provider"),
/*git_info*/ None,
)
}
fn assert_thread_name_updated(
notification: JSONRPCNotification,
thread_id: &str,
thread_name: &str,
) -> Result<()> {
let notification: ThreadNameUpdatedNotification =
serde_json::from_value(notification.params.context("thread/name/updated params")?)?;
assert_eq!(notification.thread_id, thread_id);
assert_eq!(notification.thread_name.as_deref(), Some(thread_name));
Ok(())
}
async fn assert_legacy_thread_name(
codex_home: &Path,
conversation_id: &str,
expected_name: &str,
) -> Result<()> {
let thread_id = ThreadId::from_string(conversation_id)?;
assert_eq!(
find_thread_name_by_id(codex_home, &thread_id)
.await?
.as_deref(),
Some(expected_name)
);
Ok(())
}