is first turn

This commit is contained in:
Roy Han
2026-03-25 12:06:07 -07:00
parent 3766590bee
commit c36438ca59
6 changed files with 150 additions and 1 deletions

View File

@@ -44,6 +44,7 @@ pub(crate) struct CodexTurnEvent {
pub(crate) collaboration_mode: ModeKind,
pub(crate) personality: Option<Personality>,
pub(crate) num_input_images: usize,
pub(crate) is_first_turn: bool,
}
pub(crate) fn build_track_events_context(
@@ -409,6 +410,7 @@ struct CodexTurnEventParams {
collaboration_mode: Option<&'static str>,
personality: Option<String>,
num_input_images: usize,
is_first_turn: bool,
}
#[derive(Serialize)]
@@ -764,6 +766,7 @@ fn codex_turn_event_params(
collaboration_mode: Some(collaboration_mode_mode(turn_event.collaboration_mode)),
personality: personality_mode(turn_event.personality),
num_input_images: turn_event.num_input_images,
is_first_turn: turn_event.is_first_turn,
}
}

View File

@@ -219,6 +219,7 @@ fn turn_event_serializes_expected_shape() {
collaboration_mode: ModeKind::Plan,
personality: Some(Personality::Pragmatic),
num_input_images: 2,
is_first_turn: true,
},
),
});
@@ -244,7 +245,8 @@ fn turn_event_serializes_expected_shape() {
"sandbox_network_access": true,
"collaboration_mode": "plan",
"personality": "pragmatic",
"num_input_images": 2
"num_input_images": 2,
"is_first_turn": true
}
})
);

View File

@@ -172,6 +172,7 @@ use crate::config::types::McpServerConfig;
use crate::config::types::ShellEnvironmentPolicy;
use crate::context_manager::ContextManager;
use crate::context_manager::TotalTokenUsageBreakdown;
use crate::context_manager::is_user_turn_boundary;
use crate::environment_context::EnvironmentContext;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
@@ -784,6 +785,17 @@ pub(crate) fn session_loop_termination_from_handle(
.shared()
}
fn initial_history_has_prior_user_turns(conversation_history: &InitialHistory) -> bool {
conversation_history.scan_rollout_items(rollout_item_is_user_turn_boundary)
}
fn rollout_item_is_user_turn_boundary(item: &RolloutItem) -> bool {
match item {
RolloutItem::ResponseItem(item) => is_user_turn_boundary(item),
_ => false,
}
}
/// Context for an initialized model agent
///
/// A session has at most 1 running task at a time, and can be interrupted by user input.
@@ -2154,6 +2166,11 @@ impl Session {
SessionSource::SubAgent(_)
)
};
let has_prior_user_turns = initial_history_has_prior_user_turns(&conversation_history);
{
let mut state = self.state.lock().await;
state.set_next_turn_is_first(!has_prior_user_turns);
}
match conversation_history {
InitialHistory::New => {
// Defer initial context insertion until the first real turn starts so
@@ -5992,6 +6009,10 @@ pub(crate) async fn run_turn(
}
if !input.is_empty() {
let is_first_turn = {
let mut state = sess.state.lock().await;
state.take_next_turn_is_first()
};
sess.services.analytics_events_client.track_turn_event(
tracking,
CodexTurnEvent {
@@ -6011,6 +6032,7 @@ pub(crate) async fn run_turn(
matches!(item, UserInput::Image { .. } | UserInput::LocalImage { .. })
})
.count(),
is_first_turn,
},
);
}

View File

@@ -33,6 +33,7 @@ pub(crate) struct SessionState {
pub(crate) active_connector_selection: HashSet<String>,
pub(crate) pending_session_start_source: Option<codex_hooks::SessionStartSource>,
granted_permissions: Option<PermissionProfile>,
next_turn_is_first: bool,
}
impl SessionState {
@@ -51,6 +52,7 @@ impl SessionState {
active_connector_selection: HashSet::new(),
pending_session_start_source: None,
granted_permissions: None,
next_turn_is_first: true,
}
}
@@ -73,6 +75,16 @@ impl SessionState {
self.previous_turn_settings = previous_turn_settings;
}
pub(crate) fn set_next_turn_is_first(&mut self, value: bool) {
self.next_turn_is_first = value;
}
pub(crate) fn take_next_turn_is_first(&mut self) -> bool {
let is_first_turn = self.next_turn_is_first;
self.next_turn_is_first = false;
is_first_turn
}
pub(crate) fn clone_history(&self) -> ContextManager {
self.history.clone()
}

View File

@@ -233,12 +233,114 @@ async fn user_turn_tracks_turn_metadata_analytics() -> anyhow::Result<()> {
assert_eq!(event_params["collaboration_mode"], "plan");
assert!(event_params["personality"].is_null());
assert_eq!(event_params["num_input_images"], 0);
assert_eq!(event_params["is_first_turn"], true);
assert!(event_params["thread_id"].as_str().is_some());
assert!(event_params["turn_id"].as_str().is_some());
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn resumed_thread_turn_tracks_is_not_first_turn_analytics() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
mount_sse_once(
&server,
sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]),
)
.await;
mount_sse_once(
&server,
sse(vec![ev_response_created("resp-2"), ev_completed("resp-2")]),
)
.await;
let chatgpt_base_url = server.uri();
let mut builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(move |config| {
config.chatgpt_base_url = chatgpt_base_url;
});
let initial = builder.build(&server).await?;
initial
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "first turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&initial.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let rollout_path = initial
.session_configured
.rollout_path
.clone()
.expect("persisted rollout path should be present");
let home = initial.home.clone();
let resumed_chatgpt_base_url = server.uri();
let mut resume_builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(move |config| {
config.chatgpt_base_url = resumed_chatgpt_base_url;
});
let resumed = resume_builder.resume(&server, home, rollout_path).await?;
resumed
.codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "resumed turn".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&resumed.codex, |event| {
matches!(event, EventMsg::TurnComplete(_))
})
.await;
let deadline = Instant::now() + Duration::from_secs(10);
let resumed_turn_event = loop {
let requests = server.received_requests().await.unwrap_or_default();
let turn_events = requests
.iter()
.filter(|request| request.url.path() == "/codex/analytics-events/events")
.filter_map(|request| serde_json::from_slice::<Value>(&request.body).ok())
.flat_map(|payload| {
payload["events"]
.as_array()
.cloned()
.unwrap_or_default()
.into_iter()
})
.filter(|event| event["event_type"] == "codex_turn_event")
.collect::<Vec<_>>();
if let Some(event) = turn_events.last().cloned() {
if turn_events.len() >= 2 {
break event;
}
}
if Instant::now() >= deadline {
panic!("timed out waiting for resumed turn analytics event");
}
tokio::time::sleep(Duration::from_millis(50)).await;
};
assert_eq!(resumed_turn_event["event_params"]["is_first_turn"], false);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn assistant_message_item_is_emitted() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -2245,6 +2245,14 @@ pub enum InitialHistory {
}
impl InitialHistory {
pub fn scan_rollout_items(&self, mut predicate: impl FnMut(&RolloutItem) -> bool) -> bool {
match self {
InitialHistory::New => false,
InitialHistory::Resumed(resumed) => resumed.history.iter().any(&mut predicate),
InitialHistory::Forked(items) => items.iter().any(predicate),
}
}
pub fn forked_from_id(&self) -> Option<ThreadId> {
match self {
InitialHistory::New => None,