Compare commits

...

1 Commit

Author SHA1 Message Date
Channing Conger
587b86092e Manage turn worker shutdown
We only want one turn worker processing at a time; this codifies the
shutdown of one before resolving the turn.

Drop still does best effort shutdown.
2026-03-22 14:06:16 -07:00
2 changed files with 104 additions and 82 deletions

View File

@@ -147,7 +147,7 @@ impl CodeModeService {
let inner = Arc::clone(&self.inner);
let turn_message_rx = Arc::clone(&self.inner.turn_message_rx);
tokio::spawn(async move {
let worker_handle = tokio::spawn(async move {
loop {
let next_message = tokio::select! {
_ = &mut shutdown_rx => break,
@@ -205,6 +205,7 @@ impl CodeModeService {
CodeModeTurnWorker {
shutdown_tx: Some(shutdown_tx),
worker_handle: Some(worker_handle),
}
}
}
@@ -217,13 +218,29 @@ impl Default for CodeModeService {
/// Handle to a spawned code-mode turn worker task.
///
/// Per the commit intent, only one turn worker should be processing at a
/// time, so this handle is shut down (and joined) before the turn resolves;
/// `Drop` still performs a best-effort shutdown signal.
pub struct CodeModeTurnWorker {
    // One-shot channel used to ask the worker loop to exit. `None` once the
    // signal has been sent (taken by `signal_shutdown`).
    shutdown_tx: Option<oneshot::Sender<()>>,
    // Join handle for the spawned worker task. `None` once `shutdown` has
    // awaited it.
    worker_handle: Option<tokio::task::JoinHandle<()>>,
}
impl CodeModeTurnWorker {
fn signal_shutdown(&mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
}
pub async fn shutdown(mut self) {
self.signal_shutdown();
if let Some(worker_handle) = self.worker_handle.take()
&& let Err(err) = worker_handle.await
{
warn!("code mode turn worker task failed to join cleanly: {err}");
}
}
}
impl Drop for CodeModeTurnWorker {
    /// Best-effort shutdown when the handle is dropped without an explicit
    /// `shutdown().await`: signal the worker to stop. `drop` cannot await,
    /// so the task is not joined here — it is only asked to exit.
    fn drop(&mut self) {
        // Defect fixed: the previous body duplicated `signal_shutdown`'s
        // logic inline (`if let Some(shutdown_tx) = self.shutdown_tx.take()
        // { let _ = shutdown_tx.send(()); }`) immediately before calling the
        // helper, which made the helper call a dead no-op. The helper alone
        // is sufficient and already idempotent via `Option::take`.
        self.signal_shutdown();
    }
}

View File

@@ -6182,7 +6182,7 @@ async fn run_sampling_request(
Arc::clone(&turn_context),
Arc::clone(&turn_diff_tracker),
);
let _code_mode_worker = sess
let code_mode_worker = sess
.services
.code_mode_service
.start_turn_worker(
@@ -6192,93 +6192,98 @@ async fn run_sampling_request(
Arc::clone(&turn_diff_tracker),
)
.await;
let mut retries = 0;
loop {
let err = match try_run_sampling_request(
tool_runtime.clone(),
Arc::clone(&sess),
Arc::clone(&turn_context),
client_session,
turn_metadata_header,
Arc::clone(&turn_diff_tracker),
server_model_warning_emitted_for_turn,
&prompt,
cancellation_token.child_token(),
)
.await
{
Ok(output) => {
return Ok(output);
}
Err(CodexErr::ContextWindowExceeded) => {
sess.set_total_tokens_full(&turn_context).await;
return Err(CodexErr::ContextWindowExceeded);
}
Err(CodexErr::UsageLimitReached(e)) => {
let rate_limits = e.rate_limits.clone();
if let Some(rate_limits) = rate_limits {
sess.update_rate_limits(&turn_context, *rate_limits).await;
}
return Err(CodexErr::UsageLimitReached(e));
}
Err(err) => err,
};
if !err.is_retryable() {
return Err(err);
}
// Use the configured provider-specific stream retry budget.
let max_retries = turn_context.provider.stream_max_retries();
if retries >= max_retries
&& client_session.try_switch_fallback_transport(
&turn_context.session_telemetry,
&turn_context.model_info,
let result = async {
let mut retries = 0;
loop {
let err = match try_run_sampling_request(
tool_runtime.clone(),
Arc::clone(&sess),
Arc::clone(&turn_context),
client_session,
turn_metadata_header,
Arc::clone(&turn_diff_tracker),
server_model_warning_emitted_for_turn,
&prompt,
cancellation_token.child_token(),
)
{
sess.send_event(
&turn_context,
EventMsg::Warning(WarningEvent {
message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"),
}),
)
.await;
retries = 0;
continue;
}
if retries < max_retries {
retries += 1;
let delay = match &err {
CodexErr::Stream(_, requested_delay) => {
requested_delay.unwrap_or_else(|| backoff(retries))
.await
{
Ok(output) => break Ok(output),
Err(CodexErr::ContextWindowExceeded) => {
sess.set_total_tokens_full(&turn_context).await;
break Err(CodexErr::ContextWindowExceeded);
}
_ => backoff(retries),
Err(CodexErr::UsageLimitReached(e)) => {
let rate_limits = e.rate_limits.clone();
if let Some(rate_limits) = rate_limits {
sess.update_rate_limits(&turn_context, *rate_limits).await;
}
break Err(CodexErr::UsageLimitReached(e));
}
Err(err) => err,
};
warn!(
"stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...",
);
// In release builds, hide the first websocket retry notification to reduce noisy
// transient reconnect messages. In debug builds, keep full visibility for diagnosis.
let report_error = retries > 1
|| cfg!(debug_assertions)
|| !sess.services.model_client.responses_websocket_enabled();
if report_error {
// Surface retry information to any UI/frontend so the
// user understands what is happening instead of staring
// at a seemingly frozen screen.
sess.notify_stream_error(
if !err.is_retryable() {
break Err(err);
}
// Use the configured provider-specific stream retry budget.
let max_retries = turn_context.provider.stream_max_retries();
if retries >= max_retries
&& client_session.try_switch_fallback_transport(
&turn_context.session_telemetry,
&turn_context.model_info,
)
{
sess.send_event(
&turn_context,
format!("Reconnecting... {retries}/{max_retries}"),
err,
EventMsg::Warning(WarningEvent {
message: format!("Falling back from WebSockets to HTTPS transport. {err:#}"),
}),
)
.await;
retries = 0;
continue;
}
if retries < max_retries {
retries += 1;
let delay = match &err {
CodexErr::Stream(_, requested_delay) => {
requested_delay.unwrap_or_else(|| backoff(retries))
}
_ => backoff(retries),
};
warn!(
"stream disconnected - retrying sampling request ({retries}/{max_retries} in {delay:?})...",
);
// In release builds, hide the first websocket retry notification to reduce noisy
// transient reconnect messages. In debug builds, keep full visibility for diagnosis.
let report_error = retries > 1
|| cfg!(debug_assertions)
|| !sess.services.model_client.responses_websocket_enabled();
if report_error {
// Surface retry information to any UI/front-end so the
// user understands what is happening instead of staring
// at a seemingly frozen screen.
sess.notify_stream_error(
&turn_context,
format!("Reconnecting... {retries}/{max_retries}"),
err,
)
.await;
}
tokio::time::sleep(delay).await;
} else {
break Err(err);
}
tokio::time::sleep(delay).await;
} else {
return Err(err);
}
}
.await;
if let Some(code_mode_worker) = code_mode_worker {
code_mode_worker.shutdown().await;
}
result
}
pub(crate) async fn built_tools(