mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
Merge branch 'rhan/oob-turn-metadata' into rhan/oob-queue
This commit is contained in:
@@ -46,6 +46,7 @@ pub(crate) struct CodexTurnEvent {
|
||||
pub(crate) collaboration_mode: ModeKind,
|
||||
pub(crate) personality: Option<Personality>,
|
||||
pub(crate) num_input_images: usize,
|
||||
pub(crate) is_first_turn: bool,
|
||||
}
|
||||
|
||||
pub(crate) fn build_track_events_context(
|
||||
@@ -412,6 +413,7 @@ struct CodexTurnEventParams {
|
||||
collaboration_mode: Option<&'static str>,
|
||||
personality: Option<String>,
|
||||
num_input_images: usize,
|
||||
is_first_turn: bool,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
@@ -768,6 +770,7 @@ fn codex_turn_event_params(
|
||||
collaboration_mode: Some(collaboration_mode_mode(turn_event.collaboration_mode)),
|
||||
personality: personality_mode(turn_event.personality),
|
||||
num_input_images: turn_event.num_input_images,
|
||||
is_first_turn: turn_event.is_first_turn,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -221,6 +221,7 @@ fn turn_event_serializes_expected_shape() {
|
||||
collaboration_mode: ModeKind::Plan,
|
||||
personality: Some(Personality::Pragmatic),
|
||||
num_input_images: 2,
|
||||
is_first_turn: true,
|
||||
},
|
||||
),
|
||||
});
|
||||
@@ -247,7 +248,8 @@ fn turn_event_serializes_expected_shape() {
|
||||
"sandbox_network_access": true,
|
||||
"collaboration_mode": "plan",
|
||||
"personality": "pragmatic",
|
||||
"num_input_images": 2
|
||||
"num_input_images": 2,
|
||||
"is_first_turn": true
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
@@ -173,6 +173,7 @@ use crate::config::types::McpServerConfig;
|
||||
use crate::config::types::ShellEnvironmentPolicy;
|
||||
use crate::context_manager::ContextManager;
|
||||
use crate::context_manager::TotalTokenUsageBreakdown;
|
||||
use crate::context_manager::is_user_turn_boundary;
|
||||
use crate::environment_context::EnvironmentContext;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result as CodexResult;
|
||||
@@ -785,6 +786,17 @@ pub(crate) fn session_loop_termination_from_handle(
|
||||
.shared()
|
||||
}
|
||||
|
||||
fn initial_history_has_prior_user_turns(conversation_history: &InitialHistory) -> bool {
|
||||
conversation_history.scan_rollout_items(rollout_item_is_user_turn_boundary)
|
||||
}
|
||||
|
||||
fn rollout_item_is_user_turn_boundary(item: &RolloutItem) -> bool {
|
||||
match item {
|
||||
RolloutItem::ResponseItem(item) => is_user_turn_boundary(item),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Context for an initialized model agent
|
||||
///
|
||||
/// A session has at most 1 running task at a time, and can be interrupted by user input.
|
||||
@@ -2159,6 +2171,11 @@ impl Session {
|
||||
SessionSource::SubAgent(_)
|
||||
)
|
||||
};
|
||||
let has_prior_user_turns = initial_history_has_prior_user_turns(&conversation_history);
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.set_next_turn_is_first(!has_prior_user_turns);
|
||||
}
|
||||
match conversation_history {
|
||||
InitialHistory::New => {
|
||||
// Defer initial context insertion until the first real turn starts so
|
||||
@@ -6020,6 +6037,10 @@ pub(crate) async fn run_turn(
|
||||
let submission_type = turn_context
|
||||
.submission_type
|
||||
.unwrap_or(SubmissionType::Prompt);
|
||||
let is_first_turn = {
|
||||
let mut state = sess.state.lock().await;
|
||||
state.take_next_turn_is_first()
|
||||
};
|
||||
sess.services.analytics_events_client.track_turn_event(
|
||||
tracking,
|
||||
CodexTurnEvent {
|
||||
@@ -6040,6 +6061,7 @@ pub(crate) async fn run_turn(
|
||||
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
|
||||
})
|
||||
.count(),
|
||||
is_first_turn,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -33,6 +33,7 @@ pub(crate) struct SessionState {
|
||||
pub(crate) active_connector_selection: HashSet<String>,
|
||||
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
|
||||
granted_permissions: Option<PermissionProfile>,
|
||||
next_turn_is_first: bool,
|
||||
}
|
||||
|
||||
impl SessionState {
|
||||
@@ -51,6 +52,7 @@ impl SessionState {
|
||||
active_connector_selection: HashSet::new(),
|
||||
pending_session_start_source: None,
|
||||
granted_permissions: None,
|
||||
next_turn_is_first: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,6 +75,16 @@ impl SessionState {
|
||||
self.previous_turn_settings = previous_turn_settings;
|
||||
}
|
||||
|
||||
pub(crate) fn set_next_turn_is_first(&mut self, value: bool) {
|
||||
self.next_turn_is_first = value;
|
||||
}
|
||||
|
||||
pub(crate) fn take_next_turn_is_first(&mut self) -> bool {
|
||||
let is_first_turn = self.next_turn_is_first;
|
||||
self.next_turn_is_first = false;
|
||||
is_first_turn
|
||||
}
|
||||
|
||||
pub(crate) fn clone_history(&self) -> ContextManager {
|
||||
self.history.clone()
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ use anyhow::Ok;
|
||||
use codex_core::CodexAuth;
|
||||
use codex_protocol::config_types::CollaborationMode;
|
||||
use codex_protocol::config_types::ModeKind;
|
||||
use codex_protocol::config_types::Personality;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::config_types::ServiceTier;
|
||||
use codex_protocol::config_types::Settings;
|
||||
@@ -145,6 +146,7 @@ async fn user_turn_tracks_turn_metadata_analytics() -> anyhow::Result<()> {
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config.chatgpt_base_url = chatgpt_base_url;
|
||||
config.personality = Some(Personality::None);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
@@ -233,12 +235,114 @@ async fn user_turn_tracks_turn_metadata_analytics() -> anyhow::Result<()> {
|
||||
assert_eq!(event_params["collaboration_mode"], "plan");
|
||||
assert!(event_params["personality"].is_null());
|
||||
assert_eq!(event_params["num_input_images"], 0);
|
||||
assert_eq!(event_params["is_first_turn"], true);
|
||||
assert!(event_params["thread_id"].as_str().is_some());
|
||||
assert!(event_params["turn_id"].as_str().is_some());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn resumed_thread_turn_tracks_is_not_first_turn_analytics() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
mount_sse_once(
|
||||
&server,
|
||||
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
|
||||
)
|
||||
.await;
|
||||
mount_sse_once(
|
||||
&server,
|
||||
sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let chatgpt_base_url = server.uri();
|
||||
let mut builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config.chatgpt_base_url = chatgpt_base_url;
|
||||
});
|
||||
let initial = builder.build(&server).await?;
|
||||
|
||||
initial
|
||||
.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "first turn".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&initial.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let rollout_path = initial
|
||||
.session_configured
|
||||
.rollout_path
|
||||
.clone()
|
||||
.expect("persisted rollout path should be present");
|
||||
let home = initial.home.clone();
|
||||
let resumed_chatgpt_base_url = server.uri();
|
||||
|
||||
let mut resume_builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| {
|
||||
config.chatgpt_base_url = resumed_chatgpt_base_url;
|
||||
});
|
||||
let resumed = resume_builder.resume(&server, home, rollout_path).await?;
|
||||
|
||||
resumed
|
||||
.codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: "resumed turn".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
})
|
||||
.await?;
|
||||
wait_for_event(&resumed.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
let resumed_turn_event = loop {
|
||||
let requests = server.received_requests().await.unwrap_or_default();
|
||||
let turn_events = requests
|
||||
.iter()
|
||||
.filter(|request| request.url.path() == "/codex/analytics-events/events")
|
||||
.filter_map(|request| serde_json::from_slice::<Value>(&request.body).ok())
|
||||
.flat_map(|payload| {
|
||||
payload["events"]
|
||||
.as_array()
|
||||
.cloned()
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
})
|
||||
.filter(|event| event["event_type"] == "codex_turn_event")
|
||||
.collect::<Vec<_>>();
|
||||
if let Some(event) = turn_events.last().cloned() {
|
||||
if turn_events.len() >= 2 {
|
||||
break event;
|
||||
}
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
panic!("timed out waiting for resumed turn analytics event");
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
};
|
||||
|
||||
assert_eq!(resumed_turn_event["event_params"]["is_first_turn"], false);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn assistant_message_item_is_emitted() -> anyhow::Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -2271,6 +2271,14 @@ pub enum InitialHistory {
|
||||
}
|
||||
|
||||
impl InitialHistory {
|
||||
pub fn scan_rollout_items(&self, mut predicate: impl FnMut(&RolloutItem) -> bool) -> bool {
|
||||
match self {
|
||||
InitialHistory::New => false,
|
||||
InitialHistory::Resumed(resumed) => resumed.history.iter().any(&mut predicate),
|
||||
InitialHistory::Forked(items) => items.iter().any(predicate),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn forked_from_id(&self) -> Option<ThreadId> {
|
||||
match self {
|
||||
InitialHistory::New => None,
|
||||
|
||||
Reference in New Issue
Block a user