Serialize thread settings notifications

This commit is contained in:
Eric Traut
2026-05-18 20:04:52 -07:00
parent 7287944484
commit 2a3f524006
2 changed files with 28 additions and 17 deletions

View File

@@ -679,12 +679,16 @@ impl TurnRequestProcessor {
thread_settings,
};
let turn_id = Uuid::now_v7().to_string();
let thread_settings_applied = if has_thread_settings_overrides {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
Some({
let pending_thread_settings = if has_thread_settings_overrides {
let (thread_settings_applied, notification_lock) = {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let mut thread_state = thread_state.lock().await;
thread_state.track_pending_thread_settings(turn_id.clone())
})
};
Some((
thread_settings_applied,
notification_lock.lock_owned().await,
))
} else {
None
};
@@ -706,11 +710,14 @@ impl TurnRequestProcessor {
return Err(error);
}
if let Some(thread_settings_applied) = thread_settings_applied {
if let Some((thread_settings_applied, thread_settings_notification_guard)) =
pending_thread_settings
{
let processor = self.clone();
let api_thread_id = params.thread_id.clone();
let tracked_turn_id = turn_id.clone();
tokio::spawn(async move {
let _thread_settings_notification_guard = thread_settings_notification_guard;
match tokio::time::timeout(THREAD_SETTINGS_ACK_TIMEOUT, thread_settings_applied)
.await
{
@@ -815,11 +822,12 @@ impl TurnRequestProcessor {
invalid_request(format!("invalid thread settings override: {err}"))
})?;
let update_id = Uuid::now_v7().to_string();
let thread_settings_applied = {
let (thread_settings_applied, notification_lock) = {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let mut thread_state = thread_state.lock().await;
thread_state.track_pending_thread_settings(update_id.clone())
};
let thread_settings_notification_guard = notification_lock.lock_owned().await;
if let Err(err) = thread
.submit_with_id(Submission {
id: update_id.clone(),
@@ -837,7 +845,7 @@ impl TurnRequestProcessor {
"failed to update thread settings: {err}"
)));
}
match thread_settings_applied.await {
let after_thread_settings = match thread_settings_applied.await {
Ok(Ok(payload)) => thread_settings_from_applied_event(&payload),
Ok(Err(err)) => return Err(invalid_request(err)),
Err(_) => {
@@ -845,17 +853,19 @@ impl TurnRequestProcessor {
"thread settings override waiter was cancelled".to_string(),
));
}
}
};
self.maybe_emit_thread_settings_updated(
thread_id,
&params.thread_id,
&before_thread_settings,
after_thread_settings.clone(),
)
.await;
drop(thread_settings_notification_guard);
after_thread_settings
} else {
before_thread_settings.clone()
};
self.maybe_emit_thread_settings_updated(
thread_id,
&params.thread_id,
&before_thread_settings,
after_thread_settings.clone(),
)
.await;
Ok(ThreadSettingsUpdateResponse {
thread_settings: after_thread_settings,

View File

@@ -81,6 +81,7 @@ pub(crate) struct ThreadState {
listener_command_tx: Option<mpsc::UnboundedSender<ThreadListenerCommand>>,
current_turn_history: ThreadHistoryBuilder,
pending_thread_settings_waiters: HashMap<String, Vec<oneshot::Sender<ThreadSettingsAck>>>,
thread_settings_notification_lock: Arc<Mutex<()>>,
listener_thread: Option<Weak<CodexThread>>,
watch_registration: WatchRegistration,
}
@@ -138,13 +139,13 @@ impl ThreadState {
pub(crate) fn track_pending_thread_settings(
&mut self,
submission_id: String,
) -> oneshot::Receiver<ThreadSettingsAck> {
) -> (oneshot::Receiver<ThreadSettingsAck>, Arc<Mutex<()>>) {
let (tx, rx) = oneshot::channel();
self.pending_thread_settings_waiters
.entry(submission_id)
.or_default()
.push(tx);
rx
(rx, Arc::clone(&self.thread_settings_notification_lock))
}
pub(crate) fn track_current_pending_thread_settings(