Move to an arcswap

This commit is contained in:
jif-oai
2026-03-24 23:12:20 +00:00
parent deec6cc824
commit 9e91c882fd
3 changed files with 33 additions and 8 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -2025,6 +2025,7 @@ name = "codex-exec-server"
version = "0.0.0"
dependencies = [
"anyhow",
"arc-swap",
"async-trait",
"base64 0.22.1",
"clap",

View File

@@ -15,6 +15,7 @@ path = "src/bin/codex-exec-server.rs"
workspace = true
[dependencies]
arc-swap = "1.8.2"
async-trait = { workspace = true }
base64 = { workspace = true }
clap = { workspace = true, features = ["derive"] }

View File

@@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwap;
use codex_app_server_protocol::FsCopyParams;
use codex_app_server_protocol::FsCopyResponse;
use codex_app_server_protocol::FsCreateDirectoryParams;
@@ -96,7 +97,8 @@ impl RemoteExecServerConnectArgs {
struct Inner {
client: RpcClient,
sessions: Mutex<HashMap<String, broadcast::Sender<ExecSessionEvent>>>,
sessions: ArcSwap<HashMap<String, broadcast::Sender<ExecSessionEvent>>>,
sessions_write_lock: Mutex<()>,
reader_task: tokio::task::JoinHandle<()>,
}
@@ -318,18 +320,28 @@ impl ExecServerClient {
ExecServerError,
> {
let (events_tx, events_rx) = broadcast::channel(256);
let mut sessions = self.inner.sessions.lock().await;
let _sessions_write_guard = self.inner.sessions_write_lock.lock().await;
let sessions = self.inner.sessions.load();
if sessions.contains_key(process_id) {
return Err(ExecServerError::Protocol(format!(
"session already registered for process {process_id}"
)));
}
sessions.insert(process_id.to_string(), events_tx.clone());
let mut next_sessions = sessions.as_ref().clone();
next_sessions.insert(process_id.to_string(), events_tx.clone());
self.inner.sessions.store(Arc::new(next_sessions));
Ok((events_tx, events_rx))
}
pub(crate) async fn unregister_session(&self, process_id: &str) {
self.inner.sessions.lock().await.remove(process_id);
let _sessions_write_guard = self.inner.sessions_write_lock.lock().await;
let sessions = self.inner.sessions.load();
if !sessions.contains_key(process_id) {
return;
}
let mut next_sessions = sessions.as_ref().clone();
next_sessions.remove(process_id);
self.inner.sessions.store(Arc::new(next_sessions));
}
async fn connect(
@@ -363,7 +375,8 @@ impl ExecServerClient {
Inner {
client: rpc_client,
sessions: Mutex::new(HashMap::new()),
sessions: ArcSwap::from_pointee(HashMap::new()),
sessions_write_lock: Mutex::new(()),
reader_task,
}
});
@@ -403,7 +416,7 @@ async fn handle_server_notification(
EXEC_OUTPUT_DELTA_METHOD => {
let params: ExecOutputDeltaNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let events_tx = { inner.sessions.lock().await.get(&params.process_id).cloned() };
let events_tx = inner.sessions.load().get(&params.process_id).cloned();
if let Some(events_tx) = events_tx {
let _ = events_tx.send(ExecSessionEvent::Output {
seq: params.seq,
@@ -415,7 +428,7 @@ async fn handle_server_notification(
EXEC_EXITED_METHOD => {
let params: ExecExitedNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let events_tx = { inner.sessions.lock().await.get(&params.process_id).cloned() };
let events_tx = inner.sessions.load().get(&params.process_id).cloned();
if let Some(events_tx) = events_tx {
let _ = events_tx.send(ExecSessionEvent::Exited {
seq: params.seq,
@@ -426,7 +439,17 @@ async fn handle_server_notification(
EXEC_CLOSED_METHOD => {
let params: ExecClosedNotification =
serde_json::from_value(notification.params.unwrap_or(Value::Null))?;
let events_tx = { inner.sessions.lock().await.remove(&params.process_id) };
let events_tx = {
let _sessions_write_guard = inner.sessions_write_lock.lock().await;
let sessions = inner.sessions.load();
let events_tx = sessions.get(&params.process_id).cloned();
if events_tx.is_some() {
let mut next_sessions = sessions.as_ref().clone();
next_sessions.remove(&params.process_id);
inner.sessions.store(Arc::new(next_sessions));
}
events_tx
};
if let Some(events_tx) = events_tx {
let _ = events_tx.send(ExecSessionEvent::Closed { seq: params.seq });
}