Merge remote-tracking branch 'origin/main' into jif/multi-agent-1

This commit is contained in:
jif-oai
2025-12-16 17:05:15 +00:00
12 changed files with 125 additions and 120 deletions

View File

@@ -58,7 +58,7 @@ impl<T: HttpTransport, A: AuthProvider> ResponsesClient<T, A> {
self.stream(request.body, request.headers).await
}
#[instrument(skip_all, err)]
#[instrument(level = "trace", skip_all, err)]
pub async fn stream_prompt(
&self,
model: &str,

View File

@@ -181,7 +181,7 @@ mod tests {
use opentelemetry::trace::TracerProvider;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::trace::SdkTracerProvider;
use tracing::info_span;
use tracing::trace_span;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
@@ -195,7 +195,7 @@ mod tests {
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
let _guard = subscriber.set_default();
let span = info_span!("client_request");
let span = trace_span!("client_request");
let _entered = span.enter();
let span_context = span.context().span().span_context().clone();

View File

@@ -66,8 +66,8 @@ use tracing::debug;
use tracing::error;
use tracing::field;
use tracing::info;
use tracing::info_span;
use tracing::instrument;
use tracing::trace_span;
use tracing::warn;
use crate::ModelProviderInfo;
@@ -2477,6 +2477,16 @@ pub(crate) async fn run_task(
if input.is_empty() {
return None;
}
let auto_compact_limit = turn_context
.client
.get_model_family()
.auto_compact_token_limit()
.unwrap_or(i64::MAX);
let total_usage_tokens = sess.get_total_token_usage().await;
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(&sess, &turn_context).await;
}
let event = EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
@@ -2559,25 +2569,12 @@ pub(crate) async fn run_task(
needs_follow_up,
last_agent_message: turn_last_agent_message,
} = turn_output;
let limit = turn_context
.client
.get_model_family()
.auto_compact_token_limit()
.unwrap_or(i64::MAX);
let total_usage_tokens = sess.get_total_token_usage().await;
let token_limit_reached = total_usage_tokens >= limit;
let token_limit_reached = total_usage_tokens >= auto_compact_limit;
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached {
if should_use_remote_compact_task(
sess.as_ref(),
&turn_context.client.get_provider(),
) {
run_inline_remote_auto_compact_task(sess.clone(), turn_context.clone())
.await;
} else {
run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await;
}
if token_limit_reached && needs_follow_up {
run_auto_compact(&sess, &turn_context).await;
continue;
}
@@ -2619,7 +2616,15 @@ pub(crate) async fn run_task(
last_agent_message
}
#[instrument(
/// Runs one inline auto-compaction pass for the given session and turn.
///
/// `should_use_remote_compact_task` is consulted with the session and the
/// client's provider; depending on its answer, either the remote or the local
/// inline compaction task is awaited. Both tasks take owned `Arc` handles, so
/// the borrowed handles are cloned (reference-count bump only) before the call.
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
    // Decide first; the provider temporary is released at the end of this statement.
    let use_remote =
        should_use_remote_compact_task(sess.as_ref(), &turn_context.client.get_provider());

    let session = Arc::clone(sess);
    let context = Arc::clone(turn_context);

    if !use_remote {
        run_inline_auto_compact_task(session, context).await;
        return;
    }
    run_inline_remote_auto_compact_task(session, context).await;
}
#[instrument(level = "trace",
skip_all,
fields(
turn_id = %turn_context.sub_id,
@@ -2784,7 +2789,7 @@ async fn drain_in_flight(
}
#[allow(clippy::too_many_arguments)]
#[instrument(
#[instrument(level = "trace",
skip_all,
fields(
turn_id = %turn_context.sub_id,
@@ -2813,7 +2818,7 @@ async fn try_run_turn(
.client
.clone()
.stream(prompt)
.instrument(info_span!("stream_request"))
.instrument(trace_span!("stream_request"))
.or_cancel(&cancellation_token)
.await??;
@@ -2829,9 +2834,9 @@ async fn try_run_turn(
let mut last_agent_message: Option<String> = None;
let mut active_item: Option<TurnItem> = None;
let mut should_emit_turn_diff = false;
let receiving_span = info_span!("receiving_stream");
let receiving_span = trace_span!("receiving_stream");
let outcome: CodexResult<TurnRunResult> = loop {
let handle_responses = info_span!(
let handle_responses = trace_span!(
parent: &receiving_span,
"handle_responses",
otel.name = field::Empty,
@@ -2841,7 +2846,7 @@ async fn try_run_turn(
let event = match stream
.next()
.instrument(info_span!(parent: &handle_responses, "receiving"))
.instrument(trace_span!(parent: &handle_responses, "receiving"))
.or_cancel(&cancellation_token)
.await
{

View File

@@ -400,7 +400,7 @@ impl McpConnectionManager {
/// Returns a single map that contains all tools. Each key is the
/// fully-qualified name for the tool.
#[instrument(skip_all)]
#[instrument(level = "trace", skip_all)]
pub async fn list_all_tools(&self) -> HashMap<String, ToolInfo> {
let mut tools = HashMap::new();
for managed_client in self.clients.values() {

View File

@@ -39,7 +39,7 @@ pub(crate) struct HandleOutputCtx {
pub cancellation_token: CancellationToken,
}
#[instrument(skip_all)]
#[instrument(level = "trace", skip_all)]
pub(crate) async fn handle_output_item_done(
ctx: &mut HandleOutputCtx,
item: ResponseItem,

View File

@@ -4,7 +4,7 @@ use async_trait::async_trait;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;
use tracing::info_span;
use tracing::trace_span;
use crate::codex::TurnContext;
use crate::codex::run_task;
@@ -31,7 +31,7 @@ impl SessionTask for RegularTask {
) -> Option<String> {
let sess = session.clone_session();
let run_task_span =
info_span!(parent: sess.services.otel_manager.current_span(), "run_task");
trace_span!(parent: sess.services.otel_manager.current_span(), "run_task");
run_task(sess, ctx, input, cancellation_token)
.instrument(run_task_span)
.await

View File

@@ -6,8 +6,8 @@ use tokio_util::either::Either;
use tokio_util::sync::CancellationToken;
use tokio_util::task::AbortOnDropHandle;
use tracing::Instrument;
use tracing::info_span;
use tracing::instrument;
use tracing::trace_span;
use crate::codex::Session;
use crate::codex::TurnContext;
@@ -45,7 +45,7 @@ impl ToolCallRuntime {
}
}
#[instrument(skip_all, fields(call = ?call))]
#[instrument(level = "trace", skip_all, fields(call = ?call))]
pub(crate) fn handle_tool_call(
self,
call: ToolCall,
@@ -60,7 +60,7 @@ impl ToolCallRuntime {
let lock = Arc::clone(&self.parallel_execution);
let started = Instant::now();
let dispatch_span = info_span!(
let dispatch_span = trace_span!(
"dispatch_tool_call",
otel.name = call.tool_name.as_str(),
tool_name = call.tool_name.as_str(),

View File

@@ -54,7 +54,7 @@ impl ToolRouter {
.any(|config| config.spec.name() == tool_name)
}
#[instrument(skip_all, err)]
#[instrument(level = "trace", skip_all, err)]
pub async fn build_tool_call(
session: &Session,
item: ResponseItem,
@@ -130,7 +130,7 @@ impl ToolRouter {
}
}
#[instrument(skip_all, err)]
#[instrument(level = "trace", skip_all, err)]
pub async fn dispatch_tool_call(
&self,
session: Arc<Session>,

View File

@@ -1009,7 +1009,6 @@ async fn auto_compact_runs_after_token_limit_hit() {
ev_assistant_message("m3", AUTO_SUMMARY_TEXT),
ev_completed_with_tokens("r3", 200),
]);
let sse_resume = sse(vec![ev_completed("r3-resume")]);
let sse4 = sse(vec![
ev_assistant_message("m4", FINAL_REPLY),
ev_completed_with_tokens("r4", 120),
@@ -1038,15 +1037,6 @@ async fn auto_compact_runs_after_token_limit_hit() {
};
mount_sse_once_match(&server, third_matcher, sse3).await;
let resume_marker = prefixed_auto_summary;
let resume_matcher = move |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(resume_marker)
&& !body_contains_text(body, SUMMARIZATION_PROMPT)
&& !body.contains(POST_AUTO_USER_MSG)
};
mount_sse_once_match(&server, resume_matcher, sse_resume).await;
let fourth_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT)
@@ -1106,8 +1096,8 @@ async fn auto_compact_runs_after_token_limit_hit() {
let requests = get_responses_requests(&server).await;
assert_eq!(
requests.len(),
5,
"expected user turns, a compaction request, a resumed turn, and the follow-up turn; got {}",
4,
"expected user turns, a compaction request, and the follow-up turn; got {}",
requests.len()
);
let is_auto_compact = |req: &wiremock::Request| {
@@ -1131,19 +1121,6 @@ async fn auto_compact_runs_after_token_limit_hit() {
"auto compact should add a third request"
);
let resume_summary_marker = prefixed_auto_summary;
let resume_index = requests
.iter()
.enumerate()
.find_map(|(idx, req)| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
(body.contains(resume_summary_marker)
&& !body_contains_text(body, SUMMARIZATION_PROMPT)
&& !body.contains(POST_AUTO_USER_MSG))
.then_some(idx)
})
.expect("resume request missing after compaction");
let follow_up_index = requests
.iter()
.enumerate()
@@ -1154,15 +1131,12 @@ async fn auto_compact_runs_after_token_limit_hit() {
.then_some(idx)
})
.expect("follow-up request missing");
assert_eq!(follow_up_index, 4, "follow-up request should be last");
assert_eq!(follow_up_index, 3, "follow-up request should be last");
let body_first = requests[0].body_json::<serde_json::Value>().unwrap();
let body_auto = requests[auto_compact_index]
.body_json::<serde_json::Value>()
.unwrap();
let body_resume = requests[resume_index]
.body_json::<serde_json::Value>()
.unwrap();
let body_follow_up = requests[follow_up_index]
.body_json::<serde_json::Value>()
.unwrap();
@@ -1201,23 +1175,6 @@ async fn auto_compact_runs_after_token_limit_hit() {
"auto compact should send the summarization prompt as a user message",
);
let input_resume = body_resume.get("input").and_then(|v| v.as_array()).unwrap();
assert!(
input_resume.iter().any(|item| {
item.get("type").and_then(|v| v.as_str()) == Some("message")
&& item.get("role").and_then(|v| v.as_str()) == Some("user")
&& item
.get("content")
.and_then(|v| v.as_array())
.and_then(|arr| arr.first())
.and_then(|entry| entry.get("text"))
.and_then(|v| v.as_str())
.map(|text| text.contains(prefixed_auto_summary))
.unwrap_or(false)
}),
"resume request should include compacted history"
);
let input_follow_up = body_follow_up
.get("input")
.and_then(|v| v.as_array())
@@ -1276,6 +1233,10 @@ async fn auto_compact_persists_rollout_entries() {
ev_assistant_message("m3", &auto_summary_payload),
ev_completed_with_tokens("r3", 200),
]);
let sse4 = sse(vec![
ev_assistant_message("m4", FINAL_REPLY),
ev_completed_with_tokens("r4", 120),
]);
let first_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
@@ -1299,12 +1260,19 @@ async fn auto_compact_persists_rollout_entries() {
};
mount_sse_once_match(&server, third_matcher, sse3).await;
let fourth_matcher = |req: &wiremock::Request| {
let body = std::str::from_utf8(&req.body).unwrap_or("");
body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT)
};
mount_sse_once_match(&server, fourth_matcher, sse4).await;
let model_provider = non_openai_model_provider(&server);
let home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&home);
config.model_provider = model_provider;
set_test_compact_prompt(&mut config);
config.model_auto_compact_token_limit = Some(200_000);
let conversation_manager = ConversationManager::with_models_provider(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
@@ -1335,6 +1303,16 @@ async fn auto_compact_persists_rollout_entries() {
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: POST_AUTO_USER_MSG.into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
codex.submit(Op::Shutdown).await.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
@@ -1731,6 +1709,8 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
ev_assistant_message("m6", FINAL_REPLY),
ev_completed_with_tokens("r6", 120),
]);
let follow_up_user = "FOLLOW_UP_AUTO_COMPACT";
let final_user = "FINAL_AUTO_COMPACT";
mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4, sse5, sse6]).await;
@@ -1751,31 +1731,31 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
.unwrap()
.conversation;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: MULTI_AUTO_MSG.into(),
}],
})
.await
.unwrap();
let mut auto_compact_lifecycle_events = Vec::new();
loop {
let event = codex.next_event().await.unwrap();
if event.id.starts_with("auto-compact-")
&& matches!(
event.msg,
EventMsg::TaskStarted(_) | EventMsg::TaskComplete(_)
)
{
auto_compact_lifecycle_events.push(event);
continue;
}
if let EventMsg::TaskComplete(_) = &event.msg
&& !event.id.starts_with("auto-compact-")
{
break;
for user in [MULTI_AUTO_MSG, follow_up_user, final_user] {
codex
.submit(Op::UserInput {
items: vec![UserInput::Text { text: user.into() }],
})
.await
.unwrap();
loop {
let event = codex.next_event().await.unwrap();
if event.id.starts_with("auto-compact-")
&& matches!(
event.msg,
EventMsg::TaskStarted(_) | EventMsg::TaskComplete(_)
)
{
auto_compact_lifecycle_events.push(event);
continue;
}
if let EventMsg::TaskComplete(_) = &event.msg
&& !event.id.starts_with("auto-compact-")
{
break;
}
}
}
@@ -1821,6 +1801,7 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
let context_window = 100;
let limit = context_window * 90 / 100;
let over_limit_tokens = context_window * 95 / 100 + 1;
let follow_up_user = "FOLLOW_UP_AFTER_LIMIT";
let first_turn = sse(vec![
ev_function_call(DUMMY_CALL_ID, DUMMY_FUNCTION_NAME, "{}"),
@@ -1873,6 +1854,17 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TaskComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: follow_up_user.into(),
}],
})
.await
.unwrap();
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TaskComplete(_))).await;
// Assert first request captured expected user message that triggers function call.
let first_request = first_turn_mock.single_request().input();
assert!(
@@ -1916,6 +1908,7 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
let first_user = "COUNT_PRE_LAST_REASONING";
let second_user = "TRIGGER_COMPACT_AT_LIMIT";
let third_user = "AFTER_REMOTE_COMPACT";
let pre_last_reasoning_content = "a".repeat(2_400);
let post_last_reasoning_content = "b".repeat(4_000);
@@ -1928,7 +1921,7 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
ev_reasoning_item("post-reasoning", &["post"], &[&post_last_reasoning_content]),
ev_completed_with_tokens("r2", 80),
]);
let resume_turn = sse(vec![
let third_turn = sse(vec![
ev_assistant_message("m4", FINAL_REPLY),
ev_completed_with_tokens("r4", 1),
]);
@@ -1940,8 +1933,8 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
first_turn,
// Turn 2: reasoning after last user (should be ignored for compaction).
second_turn,
// Turn 3: resume after remote compaction.
resume_turn,
// Turn 3: next user turn after remote compaction.
third_turn,
],
)
.await;
@@ -1973,7 +1966,10 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
.expect("build codex")
.codex;
for (idx, user) in [first_user, second_user].into_iter().enumerate() {
for (idx, user) in [first_user, second_user, third_user]
.into_iter()
.enumerate()
{
codex
.submit(Op::UserInput {
items: vec![UserInput::Text { text: user.into() }],
@@ -1982,10 +1978,10 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
.unwrap();
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
if idx == 0 {
if idx < 2 {
assert!(
compact_mock.requests().is_empty(),
"remote compaction should not run after the first turn"
"remote compaction should not run before the next user turn"
);
}
}
@@ -2006,20 +2002,21 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
assert_eq!(
requests.len(),
3,
"conversation should include two user turns and a post-compaction resume"
"conversation should include three user turns"
);
let second_request_body = requests[1].body_json().to_string();
assert!(
!second_request_body.contains("REMOTE_COMPACT_SUMMARY"),
"second turn should not include compacted history"
);
let resume_body = requests[2].body_json().to_string();
let third_request_body = requests[2].body_json().to_string();
assert!(
resume_body.contains("REMOTE_COMPACT_SUMMARY") || resume_body.contains(FINAL_REPLY),
"resume request should follow remote compact and use compacted history"
third_request_body.contains("REMOTE_COMPACT_SUMMARY")
|| third_request_body.contains(FINAL_REPLY),
"third turn should include compacted history"
);
assert!(
resume_body.contains("ENCRYPTED_COMPACTION_SUMMARY"),
"resume request should include compaction summary item"
third_request_body.contains("ENCRYPTED_COMPACTION_SUMMARY"),
"third turn should include compaction summary item"
);
}

View File

@@ -25,6 +25,7 @@ use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use std::sync::Mutex;
use tracing::Level;
use tracing_test::traced_test;
use tracing_subscriber::fmt::format::FmtSpan;
@@ -454,6 +455,7 @@ async fn handle_responses_span_records_response_kind_and_tool_name() {
let subscriber = tracing_subscriber::fmt()
.with_level(true)
.with_ansi(false)
.with_max_level(Level::TRACE)
.with_span_events(FmtSpan::FULL)
.with_writer(MockWriter::new(buffer))
.finish();
@@ -517,6 +519,7 @@ async fn record_responses_sets_span_fields_for_response_events() {
let subscriber = tracing_subscriber::fmt()
.with_level(true)
.with_ansi(false)
.with_max_level(Level::TRACE)
.with_span_events(FmtSpan::FULL)
.with_writer(MockWriter::new(buffer))
.finish();

View File

@@ -25,7 +25,7 @@ use std::time::Instant;
use strum_macros::Display;
use tokio::time::error::Elapsed;
use tracing::Span;
use tracing::info_span;
use tracing::trace_span;
use tracing_opentelemetry::OpenTelemetrySpanExt;
#[derive(Debug, Clone, Serialize, Display)]
@@ -67,7 +67,7 @@ impl OtelManager {
terminal_type: String,
session_source: SessionSource,
) -> OtelManager {
let session_span = info_span!("new_session", conversation_id = %conversation_id, session_source = %session_source);
let session_span = trace_span!("new_session", conversation_id = %conversation_id, session_source = %session_source);
if let Some(context) = traceparent_context_from_env() {
session_span.set_parent(context);

View File

@@ -134,7 +134,7 @@ impl OtelProvider {
self.tracer.as_ref().map(|tracer| {
tracing_opentelemetry::layer()
.with_tracer(tracer.clone())
.with_filter(LevelFilter::INFO)
.with_filter(LevelFilter::TRACE)
})
}