turn metadata: per-turn non-blocking (#11677)

This commit is contained in:
pash-openai
2026-02-13 12:48:29 -08:00
committed by GitHub
parent a4bb59884b
commit 6c0a924203
11 changed files with 457 additions and 158 deletions

View File

@@ -633,7 +633,6 @@ impl ModelClientSession {
&mut self,
otel_manager: &OtelManager,
model_info: &ModelInfo,
turn_metadata_header: Option<&str>,
) -> std::result::Result<(), ApiError> {
if !self.client.responses_websocket_enabled(model_info) || self.client.websockets_disabled()
{
@@ -656,7 +655,7 @@ impl ModelClientSession {
client_setup.api_provider,
client_setup.api_auth,
Some(Arc::clone(&self.turn_state)),
turn_metadata_header,
None,
)
.await?;
self.connection = Some(connection);

View File

@@ -38,8 +38,7 @@ use crate::stream_events_utils::handle_output_item_done;
use crate::stream_events_utils::last_assistant_message_from_item;
use crate::terminal;
use crate::truncate::TruncationPolicy;
use crate::turn_metadata::build_turn_metadata_header;
use crate::turn_metadata::resolve_turn_metadata_header_with_timeout;
use crate::turn_metadata::TurnMetadataState;
use crate::util::error_or_panic;
use async_channel::Receiver;
use async_channel::Sender;
@@ -92,7 +91,6 @@ use rmcp::model::RequestId;
use serde_json;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::OnceCell;
use tokio::sync::RwLock;
use tokio::sync::oneshot;
use tokio::sync::watch;
@@ -558,7 +556,7 @@ pub(crate) struct TurnContext {
pub(crate) truncation_policy: TruncationPolicy,
pub(crate) js_repl: Arc<JsReplHandle>,
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
turn_metadata_header: OnceCell<Option<String>>,
turn_metadata_state: Arc<TurnMetadataState>,
}
impl TurnContext {
pub(crate) fn model_context_window(&self) -> Option<i64> {
@@ -638,7 +636,7 @@ impl TurnContext {
truncation_policy,
js_repl: Arc::clone(&self.js_repl),
dynamic_tools: self.dynamic_tools.clone(),
turn_metadata_header: self.turn_metadata_header.clone(),
turn_metadata_state: self.turn_metadata_state.clone(),
}
}
@@ -689,39 +687,16 @@ impl TurnContext {
})
}
async fn build_turn_metadata_header(&self) -> Option<String> {
let sandbox = sandbox_tag(&self.sandbox_policy, self.windows_sandbox_level);
self.turn_metadata_header
.get_or_init(|| async {
build_turn_metadata_header(self.cwd.as_path(), Some(sandbox)).await
})
.await
.clone()
pub fn current_turn_metadata_header(&self) -> Option<String> {
self.turn_metadata_state.current_header_value()
}
/// Resolves the per-turn metadata header under a shared timeout policy.
///
/// This uses the same timeout helper as websocket startup prewarm so both turn execution and
/// background prewarm observe identical "timeout means best-effort fallback" behavior.
pub async fn resolve_turn_metadata_header(&self) -> Option<String> {
resolve_turn_metadata_header_with_timeout(
self.build_turn_metadata_header(),
self.turn_metadata_header.get().cloned().flatten(),
)
.await
pub fn spawn_turn_metadata_enrichment_task(self: &Arc<Self>) {
self.turn_metadata_state.spawn_git_enrichment_task();
}
/// Starts best-effort background computation of turn metadata.
///
/// This warms the cached value used by [`TurnContext::resolve_turn_metadata_header`] so turns
/// and websocket prewarm are less likely to pay metadata construction latency on demand.
pub fn spawn_turn_metadata_header_task(self: &Arc<Self>) {
let context = Arc::clone(self);
tokio::spawn(async move {
trace!("Spawning turn metadata calculation task");
context.build_turn_metadata_header().await;
trace!("Turn metadata calculation task completed");
});
pub fn cancel_turn_metadata_enrichment_task(&self) {
self.turn_metadata_state.cancel_git_enrichment_task();
}
}
@@ -948,6 +923,17 @@ impl Session {
});
let cwd = session_configuration.cwd.clone();
let turn_metadata_state = Arc::new(TurnMetadataState::new(
sub_id.clone(),
cwd.clone(),
Some(
sandbox_tag(
session_configuration.sandbox_policy.get(),
session_configuration.windows_sandbox_level,
)
.to_string(),
),
));
TurnContext {
sub_id,
config: per_turn_config.clone(),
@@ -978,7 +964,7 @@ impl Session {
truncation_policy: model_info.truncation_policy.into(),
js_repl,
dynamic_tools: session_configuration.dynamic_tools.clone(),
turn_metadata_header: OnceCell::new(),
turn_metadata_state,
}
}
@@ -1273,17 +1259,10 @@ impl Session {
let prewarm_model_info = models_manager
.get_model_info(session_configuration.collaboration_mode.model(), &config)
.await;
let prewarm_cwd = session_configuration.cwd.clone();
let turn_metadata_header = resolve_turn_metadata_header_with_timeout(
async move { build_turn_metadata_header(prewarm_cwd.as_path(), None).await },
None,
)
.boxed();
let startup_regular_task = RegularTask::with_startup_prewarm(
services.model_client.clone(),
services.otel_manager.clone(),
prewarm_model_info,
turn_metadata_header,
);
state.set_startup_regular_task(startup_regular_task);
@@ -1779,7 +1758,7 @@ impl Session {
turn_context.final_output_json_schema = final_schema;
}
let turn_context = Arc::new(turn_context);
turn_context.spawn_turn_metadata_header_task();
turn_context.spawn_turn_metadata_enrichment_task();
turn_context
}
@@ -4026,9 +4005,21 @@ async fn spawn_review_thread(
let session_source = parent_turn_context.session_source.clone();
let per_turn_config = Arc::new(per_turn_config);
let review_turn_id = sub_id.to_string();
let turn_metadata_state = Arc::new(TurnMetadataState::new(
review_turn_id.clone(),
parent_turn_context.cwd.clone(),
Some(
sandbox_tag(
&parent_turn_context.sandbox_policy,
parent_turn_context.windows_sandbox_level,
)
.to_string(),
),
));
let review_turn_context = TurnContext {
sub_id: sub_id.to_string(),
sub_id: review_turn_id,
config: per_turn_config,
auth_manager: auth_manager_for_context,
model_info: model_info.clone(),
@@ -4057,7 +4048,7 @@ async fn spawn_review_thread(
js_repl: Arc::clone(&sess.js_repl),
dynamic_tools: parent_turn_context.dynamic_tools.clone(),
truncation_policy: model_info.truncation_policy.into(),
turn_metadata_header: parent_turn_context.turn_metadata_header.clone(),
turn_metadata_state,
};
// Seed the child task with the review prompt as the initial user message.
@@ -4067,6 +4058,7 @@ async fn spawn_review_thread(
text_elements: Vec::new(),
}];
let tc = Arc::new(review_turn_context);
tc.spawn_turn_metadata_enrichment_task();
sess.spawn_task(tc.clone(), input, ReviewTask::new()).await;
// Announce entering review mode so UIs can switch modes.
@@ -4298,7 +4290,6 @@ pub(crate) async fn run_turn(
// many turns, from the perspective of the user, it is a single turn.
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
// `ModelClientSession` is turn-scoped and caches WebSocket + sticky routing state, so we reuse
// one instance across retries within this turn.
let mut client_session =
@@ -4350,6 +4341,7 @@ pub(crate) async fn run_turn(
})
.map(|user_message| user_message.message())
.collect::<Vec<String>>();
let turn_metadata_header = turn_context.current_turn_metadata_header();
match run_sampling_request(
Arc::clone(&sess),
Arc::clone(&turn_context),

View File

@@ -84,7 +84,6 @@ async fn run_compact_task_inner(
let max_retries = turn_context.provider.stream_max_retries();
let mut retries = 0;
let turn_metadata_header = turn_context.resolve_turn_metadata_header().await;
let mut client_session = sess.services.model_client.new_session();
// Reuse one client session so turn-scoped state (sticky routing, websocket append tracking)
// survives retries within this compact turn.
@@ -110,6 +109,7 @@ async fn run_compact_task_inner(
personality: turn_context.personality,
..Default::default()
};
let turn_metadata_header = turn_context.current_turn_metadata_header();
let attempt_result = drain_to_completed(
&sess,
turn_context.as_ref(),

View File

@@ -150,6 +150,15 @@ pub async fn get_head_commit_hash(cwd: &Path) -> Option<String> {
}
}
pub async fn get_has_changes(cwd: &Path) -> Option<bool> {
let output = run_git_command_with_timeout(&["status", "--porcelain"], cwd).await?;
if !output.status.success() {
return None;
}
Some(!output.stdout.is_empty())
}
fn parse_git_remote_urls(stdout: &str) -> Option<BTreeMap<String, String>> {
let mut remotes = BTreeMap::new();
for line in stdout.lines() {
@@ -254,7 +263,11 @@ pub async fn git_diff_to_remote(cwd: &Path) -> Option<GitDiffToRemote> {
/// Run a git command with a timeout to prevent blocking on large repositories
async fn run_git_command_with_timeout(args: &[&str], cwd: &Path) -> Option<std::process::Output> {
let mut command = Command::new("git");
command.args(args).current_dir(cwd).kill_on_drop(true);
command
.env("GIT_OPTIONAL_LOCKS", "0")
.args(args)
.current_dir(cwd)
.kill_on_drop(true);
let result = timeout(GIT_COMMAND_TIMEOUT, command.output()).await;
match result {
@@ -962,6 +975,37 @@ mod tests {
assert_eq!(git_info.branch, Some("feature-branch".to_string()));
}
#[tokio::test]
async fn test_get_has_changes_non_git_directory_returns_none() {
let temp_dir = TempDir::new().expect("Failed to create temp dir");
assert_eq!(get_has_changes(temp_dir.path()).await, None);
}
#[tokio::test]
async fn test_get_has_changes_clean_repo_returns_false() {
let temp_dir = TempDir::new().expect("Failed to create temp dir");
let repo_path = create_test_git_repo(&temp_dir).await;
assert_eq!(get_has_changes(&repo_path).await, Some(false));
}
#[tokio::test]
async fn test_get_has_changes_with_tracked_change_returns_true() {
let temp_dir = TempDir::new().expect("Failed to create temp dir");
let repo_path = create_test_git_repo(&temp_dir).await;
fs::write(repo_path.join("test.txt"), "updated tracked file").expect("write tracked file");
assert_eq!(get_has_changes(&repo_path).await, Some(true));
}
#[tokio::test]
async fn test_get_has_changes_with_untracked_change_returns_true() {
let temp_dir = TempDir::new().expect("Failed to create temp dir");
let repo_path = create_test_git_repo(&temp_dir).await;
fs::write(repo_path.join("new_file.txt"), "untracked").expect("write untracked file");
assert_eq!(get_has_changes(&repo_path).await, Some(true));
}
#[tokio::test]
async fn test_get_git_working_tree_state_clean_repo() {
let temp_dir = TempDir::new().expect("Failed to create temp dir");

View File

@@ -198,7 +198,7 @@ async fn build_request_context(session: &Arc<Session>, config: &Config) -> Reque
let turn_context = session.new_default_turn().await;
RequestContext::from_turn_context(
turn_context.as_ref(),
turn_context.resolve_turn_metadata_header().await,
turn_context.current_turn_metadata_header(),
model,
)
}

View File

@@ -197,6 +197,8 @@ impl Session {
turn_context: Arc<TurnContext>,
last_agent_message: Option<String>,
) {
turn_context.cancel_turn_metadata_enrichment_task();
let mut active = self.active_turn.lock().await;
let mut pending_input = Vec::<ResponseInputItem>::new();
let mut should_clear_active_turn = false;
@@ -260,6 +262,7 @@ impl Session {
trace!(task_kind = ?task.kind, sub_id, "aborting running task");
task.cancellation_token.cancel();
task.turn_context.cancel_turn_metadata_enrichment_task();
let session_task = task.task;
select! {

View File

@@ -10,7 +10,6 @@ use async_trait::async_trait;
use codex_otel::OtelManager;
use codex_protocol::openai_models::ModelInfo;
use codex_protocol::user_input::UserInput;
use futures::future::BoxFuture;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
@@ -39,13 +38,11 @@ impl RegularTask {
model_client: ModelClient,
otel_manager: OtelManager,
model_info: ModelInfo,
turn_metadata_header: BoxFuture<'static, Option<String>>,
) -> Self {
let prewarmed_session_task = tokio::spawn(async move {
let mut client_session = model_client.new_session();
let turn_metadata_header = turn_metadata_header.await;
match client_session
.prewarm_websocket(&otel_manager, &model_info, turn_metadata_header.as_deref())
.prewarm_websocket(&otel_manager, &model_info)
.await
{
Ok(()) => Some(client_session),

View File

@@ -1,88 +1,284 @@
//! Helpers for computing and resolving optional per-turn metadata headers.
//!
//! This module owns both metadata construction and the shared timeout policy used by
//! turn execution and startup websocket prewarm. Keeping timeout behavior centralized
//! ensures both call sites treat timeout as the same best-effort fallback condition.
use std::collections::BTreeMap;
use std::future::Future;
use std::path::Path;
use std::time::Duration;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::RwLock;
use serde::Serialize;
use tracing::warn;
use tokio::task::JoinHandle;
use crate::git_info::get_git_remote_urls_assume_git_repo;
use crate::git_info::get_git_repo_root;
use crate::git_info::get_has_changes;
use crate::git_info::get_head_commit_hash;
pub(crate) const TURN_METADATA_HEADER_TIMEOUT: Duration = Duration::from_millis(250);
#[derive(Clone, Debug, Default)]
struct WorkspaceGitMetadata {
associated_remote_urls: Option<BTreeMap<String, String>>,
latest_git_commit_hash: Option<String>,
has_changes: Option<bool>,
}
/// Resolves turn metadata with a shared timeout policy.
///
/// On timeout, this logs a warning and returns the provided fallback header.
///
/// Keeping this helper centralized avoids drift between turn-time metadata resolution and startup
/// websocket prewarm, both of which need identical timeout semantics.
pub(crate) async fn resolve_turn_metadata_header_with_timeout<F>(
build_header: F,
fallback_on_timeout: Option<String>,
) -> Option<String>
where
F: Future<Output = Option<String>>,
{
match tokio::time::timeout(TURN_METADATA_HEADER_TIMEOUT, build_header).await {
Ok(header) => header,
Err(_) => {
warn!(
"timed out after {}ms while building turn metadata header",
TURN_METADATA_HEADER_TIMEOUT.as_millis()
);
fallback_on_timeout
}
impl WorkspaceGitMetadata {
fn is_empty(&self) -> bool {
self.associated_remote_urls.is_none()
&& self.latest_git_commit_hash.is_none()
&& self.has_changes.is_none()
}
}
#[derive(Serialize)]
#[derive(Clone, Debug, Serialize, Default)]
struct TurnMetadataWorkspace {
#[serde(default, skip_serializing_if = "Option::is_none")]
associated_remote_urls: Option<BTreeMap<String, String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
latest_git_commit_hash: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
has_changes: Option<bool>,
}
#[derive(Serialize)]
struct TurnMetadata {
impl From<WorkspaceGitMetadata> for TurnMetadataWorkspace {
fn from(value: WorkspaceGitMetadata) -> Self {
Self {
associated_remote_urls: value.associated_remote_urls,
latest_git_commit_hash: value.latest_git_commit_hash,
has_changes: value.has_changes,
}
}
}
#[derive(Clone, Debug, Serialize, Default)]
pub(crate) struct TurnMetadataBag {
#[serde(default, skip_serializing_if = "Option::is_none")]
turn_id: Option<String>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
workspaces: BTreeMap<String, TurnMetadataWorkspace>,
#[serde(default, skip_serializing_if = "Option::is_none")]
sandbox: Option<String>,
}
pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Option<String> {
let repo_root = get_git_repo_root(cwd);
impl TurnMetadataBag {
fn to_header_value(&self) -> Option<String> {
serde_json::to_string(self).ok()
}
}
let (latest_git_commit_hash, associated_remote_urls) = tokio::join!(
fn build_turn_metadata_bag(
turn_id: Option<String>,
sandbox: Option<String>,
repo_root: Option<String>,
workspace_git_metadata: Option<WorkspaceGitMetadata>,
) -> TurnMetadataBag {
let mut workspaces = BTreeMap::new();
if let (Some(repo_root), Some(workspace_git_metadata)) = (repo_root, workspace_git_metadata)
&& !workspace_git_metadata.is_empty()
{
workspaces.insert(repo_root, workspace_git_metadata.into());
}
TurnMetadataBag {
turn_id,
workspaces,
sandbox,
}
}
pub async fn build_turn_metadata_header(cwd: &Path, sandbox: Option<&str>) -> Option<String> {
let repo_root = get_git_repo_root(cwd).map(|root| root.to_string_lossy().into_owned());
let (latest_git_commit_hash, associated_remote_urls, has_changes) = tokio::join!(
get_head_commit_hash(cwd),
get_git_remote_urls_assume_git_repo(cwd)
get_git_remote_urls_assume_git_repo(cwd),
get_has_changes(cwd),
);
if latest_git_commit_hash.is_none() && associated_remote_urls.is_none() && sandbox.is_none() {
if latest_git_commit_hash.is_none()
&& associated_remote_urls.is_none()
&& has_changes.is_none()
&& sandbox.is_none()
{
return None;
}
let mut workspaces = BTreeMap::new();
if let Some(repo_root) = repo_root {
workspaces.insert(
repo_root.to_string_lossy().into_owned(),
TurnMetadataWorkspace {
associated_remote_urls,
latest_git_commit_hash,
},
build_turn_metadata_bag(
None,
sandbox.map(ToString::to_string),
repo_root,
Some(WorkspaceGitMetadata {
associated_remote_urls,
latest_git_commit_hash,
has_changes,
}),
)
.to_header_value()
}
#[derive(Clone, Debug)]
pub(crate) struct TurnMetadataState {
cwd: PathBuf,
repo_root: Option<String>,
base_metadata: TurnMetadataBag,
base_header: String,
enriched_header: Arc<RwLock<Option<String>>>,
enrichment_task: Arc<Mutex<Option<JoinHandle<()>>>>,
}
impl TurnMetadataState {
pub(crate) fn new(turn_id: String, cwd: PathBuf, sandbox: Option<String>) -> Self {
let repo_root = get_git_repo_root(&cwd).map(|root| root.to_string_lossy().into_owned());
let base_metadata = build_turn_metadata_bag(Some(turn_id), sandbox, None, None);
let base_header = base_metadata
.to_header_value()
.unwrap_or_else(|| "{}".to_string());
Self {
cwd,
repo_root,
base_metadata,
base_header,
enriched_header: Arc::new(RwLock::new(None)),
enrichment_task: Arc::new(Mutex::new(None)),
}
}
pub(crate) fn current_header_value(&self) -> Option<String> {
if let Some(header) = self
.enriched_header
.read()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.as_ref()
.cloned()
{
return Some(header);
}
Some(self.base_header.clone())
}
pub(crate) fn spawn_git_enrichment_task(&self) {
if self.repo_root.is_none() {
return;
}
let mut task_guard = self
.enrichment_task
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if task_guard.is_some() {
return;
}
let state = self.clone();
*task_guard = Some(tokio::spawn(async move {
let workspace_git_metadata = state.fetch_workspace_git_metadata().await;
let Some(repo_root) = state.repo_root.clone() else {
return;
};
let enriched_metadata = build_turn_metadata_bag(
state.base_metadata.turn_id.clone(),
state.base_metadata.sandbox.clone(),
Some(repo_root),
Some(workspace_git_metadata),
);
if enriched_metadata.workspaces.is_empty() {
return;
}
if let Some(header_value) = enriched_metadata.to_header_value() {
*state
.enriched_header
.write()
.unwrap_or_else(std::sync::PoisonError::into_inner) = Some(header_value);
}
}));
}
pub(crate) fn cancel_git_enrichment_task(&self) {
let mut task_guard = self
.enrichment_task
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
if let Some(task) = task_guard.take() {
task.abort();
}
}
async fn fetch_workspace_git_metadata(&self) -> WorkspaceGitMetadata {
let (latest_git_commit_hash, associated_remote_urls, has_changes) = tokio::join!(
get_head_commit_hash(&self.cwd),
get_git_remote_urls_assume_git_repo(&self.cwd),
get_has_changes(&self.cwd),
);
WorkspaceGitMetadata {
associated_remote_urls,
latest_git_commit_hash,
has_changes,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::Value;
use tempfile::TempDir;
use tokio::process::Command;
#[tokio::test]
async fn build_turn_metadata_header_includes_has_changes_for_clean_repo() {
let temp_dir = TempDir::new().expect("temp dir");
let repo_path = temp_dir.path().join("repo");
std::fs::create_dir_all(&repo_path).expect("create repo");
Command::new("git")
.args(["init"])
.current_dir(&repo_path)
.output()
.await
.expect("git init");
Command::new("git")
.args(["config", "user.name", "Test User"])
.current_dir(&repo_path)
.output()
.await
.expect("git config user.name");
Command::new("git")
.args(["config", "user.email", "test@example.com"])
.current_dir(&repo_path)
.output()
.await
.expect("git config user.email");
std::fs::write(repo_path.join("README.md"), "hello").expect("write file");
Command::new("git")
.args(["add", "."])
.current_dir(&repo_path)
.output()
.await
.expect("git add");
Command::new("git")
.args(["commit", "-m", "initial"])
.current_dir(&repo_path)
.output()
.await
.expect("git commit");
let header = build_turn_metadata_header(&repo_path, Some("none"))
.await
.expect("header");
let parsed: Value = serde_json::from_str(&header).expect("valid json");
let workspace = parsed
.get("workspaces")
.and_then(Value::as_object)
.and_then(|workspaces| workspaces.values().next())
.cloned()
.expect("workspace");
assert_eq!(
workspace.get("has_changes").and_then(Value::as_bool),
Some(false)
);
}
serde_json::to_string(&TurnMetadata {
workspaces,
sandbox: sandbox.map(ToString::to_string),
})
.ok()
}