Merge branch 'etraut/next-turn-state-app-server' into etraut/next-turn-state-tui

This commit is contained in:
Eric Traut
2026-05-18 20:27:53 -07:00
4 changed files with 37 additions and 40 deletions

View File

@@ -2690,17 +2690,8 @@ impl ThreadRequestProcessor {
let thread_state = self
.thread_state_manager
.try_ensure_connection_subscribed(
existing_thread_id,
request_id.connection_id,
/*experimental_raw_events*/ false,
)
.await
.ok_or_else(|| {
internal_error(format!(
"failed to subscribe connection for running thread {existing_thread_id}"
))
})?;
.thread_state(existing_thread_id)
.await;
self.ensure_listener_task_running(
existing_thread_id,
existing_thread.clone(),

View File

@@ -679,12 +679,16 @@ impl TurnRequestProcessor {
thread_settings,
};
let turn_id = Uuid::now_v7().to_string();
let thread_settings_applied = if has_thread_settings_overrides {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
Some({
let pending_thread_settings = if has_thread_settings_overrides {
let (thread_settings_applied, notification_lock) = {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let mut thread_state = thread_state.lock().await;
thread_state.track_pending_thread_settings(turn_id.clone())
})
};
Some((
thread_settings_applied,
notification_lock.lock_owned().await,
))
} else {
None
};
@@ -706,15 +710,16 @@ impl TurnRequestProcessor {
return Err(error);
}
if let Some(thread_settings_applied) = thread_settings_applied {
if let Some((thread_settings_applied, thread_settings_notification_guard)) =
pending_thread_settings
{
let processor = self.clone();
let api_thread_id = params.thread_id.clone();
let tracked_turn_id = turn_id.clone();
tokio::spawn(async move {
match tokio::time::timeout(THREAD_SETTINGS_ACK_TIMEOUT, thread_settings_applied)
.await
{
Ok(Ok(Ok(payload))) => {
let _thread_settings_notification_guard = thread_settings_notification_guard;
match thread_settings_applied.await {
Ok(Ok(payload)) => {
let after_thread_settings = thread_settings_from_applied_event(&payload);
processor
.maybe_emit_thread_settings_updated(
@@ -725,19 +730,14 @@ impl TurnRequestProcessor {
)
.await;
}
Ok(Ok(Err(err))) => {
Ok(Err(err)) => {
tracing::warn!(
"failed to apply thread settings overrides for turn {tracked_turn_id}: {err}"
);
}
Ok(Err(_)) => {
tracing::warn!(
"thread settings override acknowledgement was cancelled for turn {tracked_turn_id}"
);
}
Err(_) => {
tracing::warn!(
"timed out waiting for thread settings overrides to apply for turn {tracked_turn_id}"
"thread settings override acknowledgement was cancelled for turn {tracked_turn_id}"
);
}
}
@@ -815,11 +815,12 @@ impl TurnRequestProcessor {
invalid_request(format!("invalid thread settings override: {err}"))
})?;
let update_id = Uuid::now_v7().to_string();
let thread_settings_applied = {
let (thread_settings_applied, notification_lock) = {
let thread_state = self.thread_state_manager.thread_state(thread_id).await;
let mut thread_state = thread_state.lock().await;
thread_state.track_pending_thread_settings(update_id.clone())
};
let thread_settings_notification_guard = notification_lock.lock_owned().await;
if let Err(err) = thread
.submit_with_id(Submission {
id: update_id.clone(),
@@ -837,7 +838,7 @@ impl TurnRequestProcessor {
"failed to update thread settings: {err}"
)));
}
match thread_settings_applied.await {
let after_thread_settings = match thread_settings_applied.await {
Ok(Ok(payload)) => thread_settings_from_applied_event(&payload),
Ok(Err(err)) => return Err(invalid_request(err)),
Err(_) => {
@@ -845,17 +846,19 @@ impl TurnRequestProcessor {
"thread settings override waiter was cancelled".to_string(),
));
}
}
};
self.maybe_emit_thread_settings_updated(
thread_id,
&params.thread_id,
&before_thread_settings,
after_thread_settings.clone(),
)
.await;
drop(thread_settings_notification_guard);
after_thread_settings
} else {
before_thread_settings.clone()
};
self.maybe_emit_thread_settings_updated(
thread_id,
&params.thread_id,
&before_thread_settings,
after_thread_settings.clone(),
)
.await;
Ok(ThreadSettingsUpdateResponse {
thread_settings: after_thread_settings,

View File

@@ -81,6 +81,7 @@ pub(crate) struct ThreadState {
listener_command_tx: Option<mpsc::UnboundedSender<ThreadListenerCommand>>,
current_turn_history: ThreadHistoryBuilder,
pending_thread_settings_waiters: HashMap<String, Vec<oneshot::Sender<ThreadSettingsAck>>>,
thread_settings_notification_lock: Arc<Mutex<()>>,
listener_thread: Option<Weak<CodexThread>>,
watch_registration: WatchRegistration,
}
@@ -138,13 +139,13 @@ impl ThreadState {
pub(crate) fn track_pending_thread_settings(
&mut self,
submission_id: String,
) -> oneshot::Receiver<ThreadSettingsAck> {
) -> (oneshot::Receiver<ThreadSettingsAck>, Arc<Mutex<()>>) {
let (tx, rx) = oneshot::channel();
self.pending_thread_settings_waiters
.entry(submission_id)
.or_default()
.push(tx);
rx
(rx, Arc::clone(&self.thread_settings_notification_lock))
}
pub(crate) fn track_current_pending_thread_settings(

View File

@@ -47,7 +47,9 @@ pub(super) fn server_notification_thread_target(
ServerNotification::ThreadStatusChanged(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadSettingsUpdated(_) => None,
ServerNotification::ThreadSettingsUpdated(notification) => {
Some(notification.thread_id.as_str())
}
ServerNotification::ThreadArchived(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadUnarchived(notification) => Some(notification.thread_id.as_str()),
ServerNotification::ThreadClosed(notification) => Some(notification.thread_id.as_str()),