mirror of
https://github.com/openai/codex.git
synced 2026-05-16 17:23:57 +00:00
## Why

Initialized app-server RPCs no longer need to bottleneck behind one request processor path. Running them concurrently improves responsiveness, but several request families still mutate shared state or depend on ordered side effects. Those stateful families need an auditable serialization contract so concurrency does not reorder thread, config, auth, command, watcher, MCP, or similar state transitions. This PR keeps that boundary explicit: stateful work is serialized by the smallest useful key, while intentionally read-only or externally concurrent work remains unkeyed. In particular, `thread/list` and `thread/turns/list` explicitly have no serialization because they primarily read append-only rollout storage and should continue to be served concurrently.

## What changed

- Adds `ClientRequest::serialization_scope()` in `app-server-protocol` and requires every client request definition to declare its serialization behavior.
- Introduces keyed request scopes for thread, thread path, command exec process, fuzzy search session, fs watch, MCP OAuth, and global state buckets such as config, account auth, memory, and device keys.
- Routes initialized app-server RPCs through per-key FIFO serialization while allowing unkeyed initialized requests to run concurrently.
- Cancels in-flight initialized RPC work when the connection disconnects or the app-server exits so spawned request tasks do not outlive their session.
- Adds focused coverage for representative keyed and unkeyed serialization scopes, including explicitly concurrent `thread/turns/list` behavior.

## Validation

- Added protocol tests for representative keyed serialization scopes and intentionally unkeyed request families.
- Added app-server request serialization tests covering per-key FIFO behavior, concurrent unkeyed execution, disconnect shutdown, and config read-after-write ordering.
- Local focused protocol validation after the latest rebase is currently blocked by the package proxy failing to resolve the locked `rustls-webpki 0.103.13`; CI is expected to provide the full validation signal.
210 lines
6.1 KiB
Rust
210 lines
6.1 KiB
Rust
use std::future::Future;
|
|
|
|
use tokio::sync::Mutex;
|
|
use tokio_util::task::TaskTracker;
|
|
|
|
/// Per-connection gate for initialized RPC handler execution.
|
|
///
|
|
/// Closing the gate prevents queued handlers from starting while allowing
|
|
/// handlers that already acquired a token to finish.
|
|
#[derive(Debug)]
|
|
pub(crate) struct ConnectionRpcGate {
|
|
accepting: Mutex<bool>,
|
|
tasks: TaskTracker,
|
|
}
|
|
|
|
impl ConnectionRpcGate {
|
|
pub(crate) fn new() -> Self {
|
|
let accepting = true;
|
|
Self {
|
|
accepting: Mutex::new(accepting),
|
|
tasks: TaskTracker::new(),
|
|
}
|
|
}
|
|
|
|
pub(crate) async fn run<F>(&self, future: F)
|
|
where
|
|
F: Future<Output = ()>,
|
|
{
|
|
let token = {
|
|
let accepting = self.accepting.lock().await;
|
|
if !*accepting {
|
|
return;
|
|
}
|
|
self.tasks.token()
|
|
};
|
|
|
|
future.await;
|
|
drop(token);
|
|
}
|
|
|
|
pub(crate) async fn shutdown(&self) {
|
|
{
|
|
let mut accepting = self.accepting.lock().await;
|
|
*accepting = false;
|
|
self.tasks.close();
|
|
}
|
|
self.tasks.wait().await;
|
|
}
|
|
|
|
#[cfg(test)]
|
|
async fn is_accepting(&self) -> bool {
|
|
*self.accepting.lock().await
|
|
}
|
|
|
|
#[cfg(test)]
|
|
fn inflight_count(&self) -> usize {
|
|
self.tasks.len()
|
|
}
|
|
}
|
|
|
|
impl Default for ConnectionRpcGate {
|
|
fn default() -> Self {
|
|
Self::new()
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use std::sync::Arc;
    use std::sync::atomic::AtomicBool;
    use std::sync::atomic::Ordering;
    use tokio::sync::oneshot;
    use tokio::time::Duration;
    use tokio::time::timeout;

    #[tokio::test]
    async fn run_executes_while_open() {
        let gate = ConnectionRpcGate::new();
        let executed = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&executed);

        gate.run(async move {
            flag.store(true, Ordering::Release);
        })
        .await;

        assert!(executed.load(Ordering::Acquire));
    }

    #[tokio::test]
    async fn run_drops_future_without_polling_after_shutdown() {
        let gate = ConnectionRpcGate::new();
        gate.shutdown().await;

        let body_ran = Arc::new(AtomicBool::new(false));
        let flag = Arc::clone(&body_ran);
        gate.run(async move {
            flag.store(true, Ordering::Release);
        })
        .await;

        // The closed gate must refuse the handler without ever polling it.
        assert!(!body_ran.load(Ordering::Acquire));
        assert!(!gate.is_accepting().await);
    }

    #[tokio::test]
    async fn shutdown_waits_for_started_run_to_finish() {
        let gate = Arc::new(ConnectionRpcGate::new());
        let (started_tx, started_rx) = oneshot::channel();
        let (finish_tx, finish_rx) = oneshot::channel();

        let runner_gate = Arc::clone(&gate);
        let runner = tokio::spawn(async move {
            runner_gate
                .run(async move {
                    started_tx.send(()).expect("receiver should be open");
                    let _ = finish_rx.await;
                })
                .await;
        });

        started_rx.await.expect("run should start");
        assert_eq!(gate.inflight_count(), 1);

        let closer_gate = Arc::clone(&gate);
        let closer = tokio::spawn(async move {
            closer_gate.shutdown().await;
        });

        // Shutdown must block while the handler is still in flight.
        timeout(Duration::from_millis(50), closer)
            .await
            .expect_err("shutdown should wait for the running future");

        finish_tx
            .send(())
            .expect("running future should be waiting");
        runner.await.expect("run task should complete");
        gate.shutdown().await;
        assert_eq!(gate.inflight_count(), 0);
    }

    #[tokio::test]
    async fn shutdown_drops_late_runs_while_waiting_for_inflight_work() {
        let gate = Arc::new(ConnectionRpcGate::new());
        let (started_tx, started_rx) = oneshot::channel();
        let (finish_tx, finish_rx) = oneshot::channel();

        let runner_gate = Arc::clone(&gate);
        let runner = tokio::spawn(async move {
            runner_gate
                .run(async move {
                    started_tx.send(()).expect("receiver should be open");
                    let _ = finish_rx.await;
                })
                .await;
        });

        started_rx.await.expect("run should start");
        let closer_gate = Arc::clone(&gate);
        let closer = tokio::spawn(async move {
            closer_gate.shutdown().await;
        });

        timeout(Duration::from_millis(50), closer)
            .await
            .expect_err("shutdown should wait for the running future");

        // A run submitted after shutdown began must be rejected unpolled,
        // even though shutdown is still waiting on the in-flight handler.
        let late_ran = Arc::new(AtomicBool::new(false));
        let late_flag = Arc::clone(&late_ran);
        gate.run(async move {
            late_flag.store(true, Ordering::Release);
        })
        .await;

        assert!(!late_ran.load(Ordering::Acquire));

        finish_tx
            .send(())
            .expect("running future should still be waiting");
        runner.await.expect("run task should complete");
        gate.shutdown().await;
        assert_eq!(gate.inflight_count(), 0);
    }

    #[tokio::test]
    async fn run_is_counted_before_handler_body_continues() {
        let gate = Arc::new(ConnectionRpcGate::new());
        let (entered_tx, entered_rx) = oneshot::channel();
        let (continue_tx, continue_rx) = oneshot::channel();

        let runner_gate = Arc::clone(&gate);
        let runner = tokio::spawn(async move {
            runner_gate
                .run(async move {
                    entered_tx.send(()).expect("receiver should be open");
                    let _ = continue_rx.await;
                })
                .await;
        });

        // By the time the body signals entry, the token must already count.
        entered_rx.await.expect("handler body should be entered");
        assert_eq!(gate.inflight_count(), 1);

        continue_tx
            .send(())
            .expect("handler body should still be waiting");
        runner.await.expect("run task should complete");
        assert_eq!(gate.inflight_count(), 0);
    }
}
|