mirror of
https://github.com/openai/codex.git
synced 2026-04-30 17:36:40 +00:00
refactor(exec-server): split transports from client launch
Separate the transport-neutral JSON-RPC connection and server processor from local process spawning, add websocket support, and document the new API shape. Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
262
codex-rs/exec-server/src/connection.rs
Normal file
262
codex-rs/exec-server/src/connection.rs
Normal file
@@ -0,0 +1,262 @@
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use futures::SinkExt;
|
||||
use futures::StreamExt;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
use tokio::io::BufWriter;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_tungstenite::WebSocketStream;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
||||
pub(crate) const CHANNEL_CAPACITY: usize = 128;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum JsonRpcConnectionEvent {
|
||||
Message(JSONRPCMessage),
|
||||
Disconnected { reason: Option<String> },
|
||||
}
|
||||
|
||||
pub(crate) struct JsonRpcConnection {
|
||||
outgoing_tx: mpsc::Sender<JSONRPCMessage>,
|
||||
incoming_rx: mpsc::Receiver<JsonRpcConnectionEvent>,
|
||||
}
|
||||
|
||||
impl JsonRpcConnection {
|
||||
pub(crate) fn from_stdio<R, W>(reader: R, writer: W, connection_label: String) -> Self
|
||||
where
|
||||
R: AsyncRead + Unpin + Send + 'static,
|
||||
W: AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
|
||||
let reader_label = connection_label.clone();
|
||||
let incoming_tx_for_reader = incoming_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(reader).lines();
|
||||
loop {
|
||||
match lines.next_line().await {
|
||||
Ok(Some(line)) => {
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
match serde_json::from_str::<JSONRPCMessage>(&line) {
|
||||
Ok(message) => {
|
||||
if incoming_tx_for_reader
|
||||
.send(JsonRpcConnectionEvent::Message(message))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
send_disconnected(
|
||||
&incoming_tx_for_reader,
|
||||
Some(format!(
|
||||
"failed to parse JSON-RPC message from {reader_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
send_disconnected(&incoming_tx_for_reader, None).await;
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
send_disconnected(
|
||||
&incoming_tx_for_reader,
|
||||
Some(format!(
|
||||
"failed to read JSON-RPC message from {reader_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut writer = BufWriter::new(writer);
|
||||
while let Some(message) = outgoing_rx.recv().await {
|
||||
if let Err(err) = write_jsonrpc_line_message(&mut writer, &message).await {
|
||||
send_disconnected(
|
||||
&incoming_tx,
|
||||
Some(format!(
|
||||
"failed to write JSON-RPC message to {connection_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
outgoing_tx,
|
||||
incoming_rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn from_websocket<S>(stream: WebSocketStream<S>, connection_label: String) -> Self
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let (incoming_tx, incoming_rx) = mpsc::channel(CHANNEL_CAPACITY);
|
||||
let (mut websocket_writer, mut websocket_reader) = stream.split();
|
||||
|
||||
let reader_label = connection_label.clone();
|
||||
let incoming_tx_for_reader = incoming_tx.clone();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match websocket_reader.next().await {
|
||||
Some(Ok(Message::Text(text))) => {
|
||||
match serde_json::from_str::<JSONRPCMessage>(text.as_ref()) {
|
||||
Ok(message) => {
|
||||
if incoming_tx_for_reader
|
||||
.send(JsonRpcConnectionEvent::Message(message))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
send_disconnected(
|
||||
&incoming_tx_for_reader,
|
||||
Some(format!(
|
||||
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Binary(bytes))) => {
|
||||
match serde_json::from_slice::<JSONRPCMessage>(bytes.as_ref()) {
|
||||
Ok(message) => {
|
||||
if incoming_tx_for_reader
|
||||
.send(JsonRpcConnectionEvent::Message(message))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
send_disconnected(
|
||||
&incoming_tx_for_reader,
|
||||
Some(format!(
|
||||
"failed to parse websocket JSON-RPC message from {reader_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(Ok(Message::Close(_))) => {
|
||||
send_disconnected(&incoming_tx_for_reader, None).await;
|
||||
break;
|
||||
}
|
||||
Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => {}
|
||||
Some(Ok(_)) => {}
|
||||
Some(Err(err)) => {
|
||||
send_disconnected(
|
||||
&incoming_tx_for_reader,
|
||||
Some(format!(
|
||||
"failed to read websocket JSON-RPC message from {reader_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
None => {
|
||||
send_disconnected(&incoming_tx_for_reader, None).await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(message) = outgoing_rx.recv().await {
|
||||
match serialize_jsonrpc_message(&message) {
|
||||
Ok(encoded) => {
|
||||
if let Err(err) = websocket_writer.send(Message::Text(encoded.into())).await
|
||||
{
|
||||
send_disconnected(
|
||||
&incoming_tx,
|
||||
Some(format!(
|
||||
"failed to write websocket JSON-RPC message to {connection_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
send_disconnected(
|
||||
&incoming_tx,
|
||||
Some(format!(
|
||||
"failed to serialize JSON-RPC message for {connection_label}: {err}"
|
||||
)),
|
||||
)
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Self {
|
||||
outgoing_tx,
|
||||
incoming_rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn into_parts(
|
||||
self,
|
||||
) -> (
|
||||
mpsc::Sender<JSONRPCMessage>,
|
||||
mpsc::Receiver<JsonRpcConnectionEvent>,
|
||||
) {
|
||||
(self.outgoing_tx, self.incoming_rx)
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_disconnected(
|
||||
incoming_tx: &mpsc::Sender<JsonRpcConnectionEvent>,
|
||||
reason: Option<String>,
|
||||
) {
|
||||
let _ = incoming_tx
|
||||
.send(JsonRpcConnectionEvent::Disconnected { reason })
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn write_jsonrpc_line_message<W>(
|
||||
writer: &mut BufWriter<W>,
|
||||
message: &JSONRPCMessage,
|
||||
) -> std::io::Result<()>
|
||||
where
|
||||
W: AsyncWrite + Unpin,
|
||||
{
|
||||
let encoded =
|
||||
serialize_jsonrpc_message(message).map_err(|err| std::io::Error::other(err.to_string()))?;
|
||||
writer.write_all(encoded.as_bytes()).await?;
|
||||
writer.write_all(b"\n").await?;
|
||||
writer.flush().await
|
||||
}
|
||||
|
||||
fn serialize_jsonrpc_message(message: &JSONRPCMessage) -> Result<String, serde_json::Error> {
|
||||
serde_json::to_string(message)
|
||||
}
|
||||
Reference in New Issue
Block a user