Files
codex/codex-rs/app-server/src/transport/stdio.rs
Ruslan Nigmatullin 69c3d12274 app-server: implement device key v2 methods (#18430)
## Why

The device-key protocol needs an app-server implementation that keeps
local key operations behind the same request-processing boundary as
other v2 APIs.

app-server owns request dispatch, transport policy, documentation, and
JSON-RPC error shaping. `codex-device-key` owns key binding, validation,
platform provider selection, and signing mechanics. Keeping the adapter
thin makes the boundary easier to review and avoids moving local
key-management details into thread orchestration code.

## What changed

- Added `DeviceKeyApi` as the app-server adapter around
`DeviceKeyStore`.
- Converted protocol protection policies, payload variants, algorithms,
and protection classes to and from the device-key crate types.
- Encoded SPKI public keys and DER signatures as base64 protocol fields.
- Routed `device/key/create`, `device/key/public`, and `device/key/sign`
through `MessageProcessor`.
- Rejected remote transports before provider access while allowing local
`stdio` and in-process callers to reach the device-key API.
- Added stdio, in-process, and websocket tests for device-key validation
and transport policy.
- Documented the device-key methods in the app-server v2 method list.

## Test coverage

- `device_key_create_rejects_empty_account_user_id`
- `in_process_allows_device_key_requests_to_reach_device_key_api`
- `device_key_methods_are_rejected_over_websocket`

## Stack

This is PR 3 of 4 in the device-key app-server stack. It is stacked on
#18429.

## Validation

- `cargo test -p codex-app-server device_key`
- `just fix -p codex-app-server`
2026-04-21 14:07:08 -07:00

114 lines
3.9 KiB
Rust

use super::CHANNEL_CAPACITY;
use super::ConnectionOrigin;
use super::TransportEvent;
use super::forward_incoming_message;
use super::next_connection_id;
use super::serialize_outgoing_message;
use crate::outgoing_message::QueuedOutgoingMessage;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCRequest;
use std::io::ErrorKind;
use std::io::Result as IoResult;
use tokio::io;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::error;
use tracing::info;
pub(crate) async fn start_stdio_connection(
transport_event_tx: mpsc::Sender<TransportEvent>,
stdio_handles: &mut Vec<JoinHandle<()>>,
initialize_client_name_tx: oneshot::Sender<String>,
) -> IoResult<()> {
let connection_id = next_connection_id();
let (writer_tx, mut writer_rx) = mpsc::channel::<QueuedOutgoingMessage>(CHANNEL_CAPACITY);
let writer_tx_for_reader = writer_tx.clone();
transport_event_tx
.send(TransportEvent::ConnectionOpened {
connection_id,
origin: ConnectionOrigin::Stdio,
writer: writer_tx,
disconnect_sender: None,
})
.await
.map_err(|_| std::io::Error::new(ErrorKind::BrokenPipe, "processor unavailable"))?;
let transport_event_tx_for_reader = transport_event_tx.clone();
stdio_handles.push(tokio::spawn(async move {
let stdin = io::stdin();
let reader = BufReader::new(stdin);
let mut lines = reader.lines();
let mut initialize_client_name_tx = Some(initialize_client_name_tx);
loop {
match lines.next_line().await {
Ok(Some(line)) => {
if let Some(client_name) = stdio_initialize_client_name(&line)
&& let Some(initialize_client_name_tx) = initialize_client_name_tx.take()
{
let _ = initialize_client_name_tx.send(client_name);
}
if !forward_incoming_message(
&transport_event_tx_for_reader,
&writer_tx_for_reader,
connection_id,
&line,
)
.await
{
break;
}
}
Ok(None) => break,
Err(err) => {
error!("Failed reading stdin: {err}");
break;
}
}
}
let _ = transport_event_tx_for_reader
.send(TransportEvent::ConnectionClosed { connection_id })
.await;
debug!("stdin reader finished (EOF)");
}));
stdio_handles.push(tokio::spawn(async move {
let mut stdout = io::stdout();
while let Some(queued_message) = writer_rx.recv().await {
let Some(mut json) = serialize_outgoing_message(queued_message.message) else {
continue;
};
json.push('\n');
if let Err(err) = stdout.write_all(json.as_bytes()).await {
error!("Failed to write to stdout: {err}");
break;
}
if let Some(write_complete_tx) = queued_message.write_complete_tx {
let _ = write_complete_tx.send(());
}
}
info!("stdout writer exited (channel closed)");
}));
Ok(())
}
fn stdio_initialize_client_name(line: &str) -> Option<String> {
let message = serde_json::from_str::<JSONRPCMessage>(line).ok()?;
let JSONRPCMessage::Request(JSONRPCRequest { method, params, .. }) = message else {
return None;
};
if method != "initialize" {
return None;
}
let params = serde_json::from_value::<InitializeParams>(params?).ok()?;
Some(params.client_info.name)
}