mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
moving to headers
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::OnceLock;
|
||||
|
||||
@@ -5,6 +6,7 @@ use crate::api_bridge::CoreAuthProvider;
|
||||
use crate::api_bridge::auth_provider_from_auth;
|
||||
use crate::api_bridge::map_api_error;
|
||||
use crate::auth::UnauthorizedRecovery;
|
||||
use crate::turn_metadata::build_turn_metadata_header;
|
||||
use codex_api::AggregateStreamExt;
|
||||
use codex_api::ChatClient as ApiChatClient;
|
||||
use codex_api::CompactClient as ApiCompactClient;
|
||||
@@ -108,6 +110,8 @@ pub struct ModelClientSession {
|
||||
turn_state: Arc<OnceLock<String>>,
|
||||
/// Turn-scoped metadata attached to every request in the turn.
|
||||
turn_metadata_header: Option<HeaderValue>,
|
||||
/// Working directory used to lazily compute turn metadata at send time.
|
||||
turn_metadata_cwd: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -141,12 +145,20 @@ impl ModelClient {
|
||||
}
|
||||
|
||||
pub fn new_session(&self) -> ModelClientSession {
|
||||
self.new_session_with_turn_metadata(None)
|
||||
self.new_session_with_turn_metadata_and_cwd(None, None)
|
||||
}
|
||||
|
||||
pub fn new_session_with_turn_metadata(
|
||||
&self,
|
||||
turn_metadata_header: Option<String>,
|
||||
) -> ModelClientSession {
|
||||
self.new_session_with_turn_metadata_and_cwd(turn_metadata_header, None)
|
||||
}
|
||||
|
||||
pub fn new_session_with_turn_metadata_and_cwd(
|
||||
&self,
|
||||
turn_metadata_header: Option<String>,
|
||||
turn_metadata_cwd: Option<PathBuf>,
|
||||
) -> ModelClientSession {
|
||||
let turn_metadata_header =
|
||||
turn_metadata_header.and_then(|value| HeaderValue::from_str(&value).ok());
|
||||
@@ -157,6 +169,7 @@ impl ModelClient {
|
||||
transport_manager: self.state.transport_manager.clone(),
|
||||
turn_state: Arc::new(OnceLock::new()),
|
||||
turn_metadata_header,
|
||||
turn_metadata_cwd,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -267,6 +280,21 @@ impl ModelClient {
|
||||
}
|
||||
|
||||
impl ModelClientSession {
|
||||
async fn ensure_turn_metadata_header(&mut self) {
|
||||
if self.turn_metadata_header.is_some() {
|
||||
return;
|
||||
}
|
||||
let Some(cwd) = self.turn_metadata_cwd.as_deref() else {
|
||||
return;
|
||||
};
|
||||
let Some(value) = build_turn_metadata_header(cwd).await else {
|
||||
return;
|
||||
};
|
||||
if let Ok(header_value) = HeaderValue::from_str(value.as_str()) {
|
||||
self.turn_metadata_header = Some(header_value);
|
||||
}
|
||||
}
|
||||
|
||||
/// Streams a single model turn using either the Responses or Chat
|
||||
/// Completions wire API, depending on the configured provider.
|
||||
///
|
||||
@@ -274,6 +302,9 @@ impl ModelClientSession {
|
||||
/// based on the `show_raw_agent_reasoning` flag in the config.
|
||||
pub async fn stream(&mut self, prompt: &Prompt) -> Result<ResponseStream> {
|
||||
let wire_api = self.state.provider.wire_api;
|
||||
if matches!(wire_api, WireApi::Responses) {
|
||||
self.ensure_turn_metadata_header().await;
|
||||
}
|
||||
match wire_api {
|
||||
WireApi::Responses => {
|
||||
let websocket_enabled = self.responses_websocket_enabled()
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fmt::Debug;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
@@ -79,14 +77,11 @@ use mcp_types::ListResourcesResult;
|
||||
use mcp_types::ReadResourceRequestParams;
|
||||
use mcp_types::ReadResourceResult;
|
||||
use mcp_types::RequestId;
|
||||
use serde::Serialize;
|
||||
use serde_json;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::Mutex;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::time::Duration as TokioDuration;
|
||||
use tokio::time::timeout;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::Instrument;
|
||||
use tracing::debug;
|
||||
@@ -121,8 +116,6 @@ use crate::error::Result as CodexResult;
|
||||
use crate::exec::StreamOutput;
|
||||
use crate::exec_policy::ExecPolicyUpdateError;
|
||||
use crate::feedback_tags;
|
||||
use crate::git_info::collect_git_info;
|
||||
use crate::git_info::get_git_remote_urls;
|
||||
use crate::git_info::get_git_repo_root;
|
||||
use crate::instructions::UserInstructions;
|
||||
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
@@ -512,9 +505,6 @@ pub(crate) struct TurnContext {
|
||||
/// Per-turn metadata serialized as a header value for outbound model requests.
|
||||
pub(crate) turn_metadata_header: Option<String>,
|
||||
}
|
||||
|
||||
const TURN_METADATA_TIMEOUT: TokioDuration = TokioDuration::from_millis(150);
|
||||
|
||||
impl TurnContext {
|
||||
pub(crate) fn resolve_path(&self, path: Option<String>) -> PathBuf {
|
||||
path.as_ref()
|
||||
@@ -718,47 +708,6 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_turn_metadata_header(cwd: &Path) -> Option<String> {
|
||||
#[derive(Serialize)]
|
||||
struct TurnMetadataWorkspace {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
associated_remote_urls: Option<BTreeMap<String, String>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
latest_git_commit_hash: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct TurnMetadata {
|
||||
workspaces: BTreeMap<String, TurnMetadataWorkspace>,
|
||||
}
|
||||
|
||||
get_git_repo_root(cwd)?;
|
||||
// On some CI environments (notably Windows), git commands can hang
|
||||
// until their timeout elapses. Cap the total time we spend gathering
|
||||
// optional metadata so turn startup stays responsive.
|
||||
timeout(TURN_METADATA_TIMEOUT, async {
|
||||
let git_info = collect_git_info(cwd).await?;
|
||||
let latest_git_commit_hash = git_info.commit_hash;
|
||||
let associated_remote_urls = get_git_remote_urls(cwd).await;
|
||||
if latest_git_commit_hash.is_none() && associated_remote_urls.is_none() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut workspaces = BTreeMap::new();
|
||||
workspaces.insert(
|
||||
cwd.to_string_lossy().into_owned(),
|
||||
TurnMetadataWorkspace {
|
||||
associated_remote_urls,
|
||||
latest_git_commit_hash,
|
||||
},
|
||||
);
|
||||
serde_json::to_string(&TurnMetadata { workspaces }).ok()
|
||||
})
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn new(
|
||||
mut session_configuration: SessionConfiguration,
|
||||
@@ -1295,8 +1244,6 @@ impl Session {
|
||||
if let Some(final_schema) = final_output_json_schema {
|
||||
turn_context.final_output_json_schema = final_schema;
|
||||
}
|
||||
turn_context.turn_metadata_header =
|
||||
Self::build_turn_metadata_header(turn_context.cwd.as_path()).await;
|
||||
Arc::new(turn_context)
|
||||
}
|
||||
|
||||
@@ -3252,8 +3199,6 @@ async fn spawn_review_thread(
|
||||
parent_turn_context.client.transport_manager(),
|
||||
);
|
||||
|
||||
let turn_metadata_header =
|
||||
Session::build_turn_metadata_header(parent_turn_context.cwd.as_path()).await;
|
||||
let review_turn_context = TurnContext {
|
||||
sub_id: sub_id.to_string(),
|
||||
client,
|
||||
@@ -3274,7 +3219,7 @@ async fn spawn_review_thread(
|
||||
tool_call_gate: Arc::new(ReadinessFlag::new()),
|
||||
dynamic_tools: parent_turn_context.dynamic_tools.clone(),
|
||||
truncation_policy: model_info.truncation_policy.into(),
|
||||
turn_metadata_header,
|
||||
turn_metadata_header: None,
|
||||
};
|
||||
|
||||
// Seed the child task with the review prompt as the initial user message.
|
||||
@@ -3470,9 +3415,10 @@ pub(crate) async fn run_turn(
|
||||
// many turns, from the perspective of the user, it is a single turn.
|
||||
let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new()));
|
||||
|
||||
let mut client_session = turn_context
|
||||
.client
|
||||
.new_session_with_turn_metadata(turn_context.turn_metadata_header.clone());
|
||||
let mut client_session = turn_context.client.new_session_with_turn_metadata_and_cwd(
|
||||
turn_context.turn_metadata_header.clone(),
|
||||
Some(turn_context.cwd.clone()),
|
||||
);
|
||||
|
||||
loop {
|
||||
// Note that pending_input would be something like a message the user
|
||||
|
||||
@@ -335,7 +335,10 @@ async fn drain_to_completed(
|
||||
turn_context: &TurnContext,
|
||||
prompt: &Prompt,
|
||||
) -> CodexResult<()> {
|
||||
let mut client_session = turn_context.client.new_session();
|
||||
let mut client_session = turn_context.client.new_session_with_turn_metadata_and_cwd(
|
||||
turn_context.turn_metadata_header.clone(),
|
||||
Some(turn_context.cwd.clone()),
|
||||
);
|
||||
let mut stream = client_session.stream(prompt).await?;
|
||||
loop {
|
||||
let maybe_event = stream.next().await;
|
||||
|
||||
@@ -120,12 +120,37 @@ pub async fn get_git_remote_urls(cwd: &Path) -> Option<BTreeMap<String, String>>
|
||||
return None;
|
||||
}
|
||||
|
||||
get_git_remote_urls_assume_git_repo(cwd).await
|
||||
}
|
||||
|
||||
/// Collect fetch remotes without checking whether `cwd` is in a git repo.
|
||||
pub async fn get_git_remote_urls_assume_git_repo(cwd: &Path) -> Option<BTreeMap<String, String>> {
|
||||
let output = run_git_command_with_timeout(&["remote", "-v"], cwd).await?;
|
||||
if !output.status.success() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8(output.stdout).ok()?;
|
||||
parse_git_remote_urls(stdout.as_str())
|
||||
}
|
||||
|
||||
/// Return the current HEAD commit hash without checking whether `cwd` is in a git repo.
|
||||
pub async fn get_head_commit_hash(cwd: &Path) -> Option<String> {
|
||||
let output = run_git_command_with_timeout(&["rev-parse", "HEAD"], cwd).await?;
|
||||
if !output.status.success() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let stdout = String::from_utf8(output.stdout).ok()?;
|
||||
let hash = stdout.trim();
|
||||
if hash.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(hash.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_git_remote_urls(stdout: &str) -> Option<BTreeMap<String, String>> {
|
||||
let mut remotes = BTreeMap::new();
|
||||
for line in stdout.lines() {
|
||||
let Some(fetch_line) = line.strip_suffix(" (fetch)") else {
|
||||
|
||||
@@ -99,6 +99,7 @@ pub mod state_db;
|
||||
pub mod terminal;
|
||||
mod tools;
|
||||
pub mod turn_diff_tracker;
|
||||
mod turn_metadata;
|
||||
pub use rollout::ARCHIVED_SESSIONS_SUBDIR;
|
||||
pub use rollout::INTERACTIVE_SESSION_SOURCES;
|
||||
pub use rollout::RolloutRecorder;
|
||||
|
||||
58
codex-rs/core/src/turn_metadata.rs
Normal file
58
codex-rs/core/src/turn_metadata.rs
Normal file
@@ -0,0 +1,58 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::path::Path;
|
||||
|
||||
use serde::Serialize;
|
||||
use tokio::time::Duration as TokioDuration;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use crate::git_info::get_git_remote_urls_assume_git_repo;
|
||||
use crate::git_info::get_git_repo_root;
|
||||
use crate::git_info::get_head_commit_hash;
|
||||
|
||||
const TURN_METADATA_TIMEOUT: TokioDuration = TokioDuration::from_millis(150);
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct TurnMetadataWorkspace {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
associated_remote_urls: Option<BTreeMap<String, String>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
latest_git_commit_hash: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct TurnMetadata {
|
||||
workspaces: BTreeMap<String, TurnMetadataWorkspace>,
|
||||
}
|
||||
|
||||
/// Build a per-turn metadata header value for the given working directory.
|
||||
///
|
||||
/// This is intentionally evaluated lazily at request send time so turns that
|
||||
/// never reach the model do not pay the git subprocess cost.
|
||||
pub(crate) async fn build_turn_metadata_header(cwd: &Path) -> Option<String> {
|
||||
let repo_root = get_git_repo_root(cwd)?;
|
||||
|
||||
// Cap the total time we spend gathering optional metadata so turn startup
|
||||
// stays responsive even when git is slow in certain environments.
|
||||
timeout(TURN_METADATA_TIMEOUT, async {
|
||||
let (latest_git_commit_hash, associated_remote_urls) = tokio::join!(
|
||||
get_head_commit_hash(cwd),
|
||||
get_git_remote_urls_assume_git_repo(cwd)
|
||||
);
|
||||
if latest_git_commit_hash.is_none() && associated_remote_urls.is_none() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut workspaces = BTreeMap::new();
|
||||
workspaces.insert(
|
||||
repo_root.to_string_lossy().into_owned(),
|
||||
TurnMetadataWorkspace {
|
||||
associated_remote_urls,
|
||||
latest_git_commit_hash,
|
||||
},
|
||||
);
|
||||
serde_json::to_string(&TurnMetadata { workspaces }).ok()
|
||||
})
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
Reference in New Issue
Block a user