Compare commits

...

1 Commits

Author SHA1 Message Date
shijie-openai
71de55886e Feat: update rollout creation strategy 2026-02-06 09:13:00 -08:00
7 changed files with 335 additions and 52 deletions

View File

@@ -50,7 +50,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
## Lifecycle Overview
- Initialize once per connection: Immediately after opening a transport connection, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request on that connection before this handshake gets rejected.
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and youll also get a `thread/started` notification. If youre continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history.
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and youll also get a `thread/started` notification. Fresh threads do not create a rollout file until the first `turn/start`. If youre continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object and triggers a `turn/started` notification.
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. Youll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
- Finish the turn: When the model is done (or the turn is interrupted via making the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.

View File

@@ -1,5 +1,6 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
@@ -7,6 +8,8 @@ use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_core::find_thread_path_by_id_str;
use std::path::Path;
@@ -18,7 +21,8 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
#[tokio::test]
async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let server = create_mock_responses_server_repeating_assistant("Done").await;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -38,6 +42,22 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
assert!(!thread.id.is_empty());
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let _: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
// Locate the rollout path recorded for this thread id.
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
@@ -80,14 +100,25 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
Ok(())
}
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(config_toml, config_contents())
std::fs::write(config_toml, config_contents(server_uri))
}
fn config_contents() -> &'static str {
r#"model = "mock-model"
fn config_contents(server_uri: &str) -> String {
format!(
r#"model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
)
}

View File

@@ -56,6 +56,22 @@ async fn thread_resume_returns_original_thread() -> Result<()> {
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let _: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
// Resume it via v2 API.
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {

View File

@@ -8,7 +8,10 @@ use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput;
use codex_core::config::set_project_trust_level;
use codex_core::find_thread_path_by_id_str;
use codex_protocol::config_types::TrustLevel;
use codex_protocol::openai_models::ReasoningEffort;
use std::path::Path;
@@ -58,6 +61,11 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
thread.created_at > 0,
"created_at should be a positive UNIX timestamp"
);
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
assert!(
rollout_path.is_none(),
"fresh threads should not create rollout files until first turn"
);
// A corresponding thread/started notification should arrive.
let notif: JSONRPCNotification = timeout(
@@ -69,6 +77,28 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
serde_json::from_value(notif.params.expect("params must be present"))?;
assert_eq!(started.thread, thread);
// First turn should create the rollout file lazily.
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let _: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
assert!(
rollout_path.is_some(),
"first turn should create rollout file"
);
Ok(())
}

View File

@@ -1,5 +1,6 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
@@ -9,6 +10,8 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::UserInput;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use std::fs::FileTimes;
@@ -24,7 +27,8 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
#[tokio::test]
async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let server = create_mock_responses_server_repeating_assistant("Done").await;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
@@ -42,6 +46,22 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let _: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected rollout path for thread id to exist");
@@ -108,14 +128,25 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
Ok(())
}
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(config_toml, config_contents())
std::fs::write(config_toml, config_contents(server_uri))
}
fn config_contents() -> &'static str {
r#"model = "mock-model"
fn config_contents(server_uri: &str) -> String {
format!(
r#"model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
)
}

View File

@@ -858,26 +858,9 @@ impl Session {
let forked_from_id = initial_history.forked_from_id();
let (conversation_id, rollout_params) = match &initial_history {
InitialHistory::New | InitialHistory::Forked(_) => {
let conversation_id = ThreadId::default();
(
conversation_id,
RolloutRecorderParams::new(
conversation_id,
forked_from_id,
session_source,
BaseInstructions {
text: session_configuration.base_instructions.clone(),
},
session_configuration.dynamic_tools.clone(),
),
)
}
InitialHistory::Resumed(resumed_history) => (
resumed_history.conversation_id,
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
),
let conversation_id = match &initial_history {
InitialHistory::New | InitialHistory::Forked(_) => ThreadId::default(),
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
};
let state_builder = match &initial_history {
InitialHistory::Resumed(resumed) => metadata::builder_from_items(
@@ -894,17 +877,52 @@ impl Session {
// - load history metadata
let rollout_fut = async {
if config.ephemeral {
Ok::<_, anyhow::Error>((None, None))
Ok::<_, anyhow::Error>((None, None, None))
} else {
let state_db_ctx = state_db::init_if_enabled(&config, None).await;
let rollout_recorder = RolloutRecorder::new(
&config,
rollout_params,
state_db_ctx.clone(),
state_builder.clone(),
)
.await?;
Ok((Some(rollout_recorder), state_db_ctx))
match &initial_history {
InitialHistory::New => {
let rollout_params = RolloutRecorderParams::new(
conversation_id,
forked_from_id,
session_source.clone(),
BaseInstructions {
text: session_configuration.base_instructions.clone(),
},
session_configuration.dynamic_tools.clone(),
);
Ok((None, state_db_ctx, Some(rollout_params)))
}
InitialHistory::Forked(_) => {
let rollout_params = RolloutRecorderParams::new(
conversation_id,
forked_from_id,
session_source.clone(),
BaseInstructions {
text: session_configuration.base_instructions.clone(),
},
session_configuration.dynamic_tools.clone(),
);
let rollout_recorder = RolloutRecorder::new(
&config,
rollout_params,
state_db_ctx.clone(),
state_builder.clone(),
)
.await?;
Ok((Some(rollout_recorder), state_db_ctx, None))
}
InitialHistory::Resumed(resumed_history) => {
let rollout_recorder = RolloutRecorder::new(
&config,
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
state_db_ctx.clone(),
state_builder.clone(),
)
.await?;
Ok((Some(rollout_recorder), state_db_ctx, None))
}
}
}
};
@@ -929,10 +947,11 @@ impl Session {
(auth, mcp_servers, auth_statuses),
) = tokio::join!(rollout_fut, history_meta_fut, auth_and_mcp_fut);
let (rollout_recorder, state_db_ctx) = rollout_recorder_and_state_db.map_err(|e| {
error!("failed to initialize rollout recorder: {e:#}");
e
})?;
let (rollout_recorder, state_db_ctx, pending_rollout_create) =
rollout_recorder_and_state_db.map_err(|e| {
error!("failed to initialize rollout recorder: {e:#}");
e
})?;
let rollout_path = rollout_recorder
.as_ref()
.map(|rec| rec.rollout_path.clone());
@@ -1042,6 +1061,7 @@ impl Session {
),
hooks: Hooks::new(config.as_ref()),
rollout: Mutex::new(rollout_recorder),
pending_rollout_create: Mutex::new(pending_rollout_create),
user_shell: Arc::new(default_shell),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
@@ -1158,6 +1178,50 @@ impl Session {
}
}
async fn ensure_rollout_initialized_for_turn(
&self,
turn_context: &TurnContext,
) -> std::io::Result<()> {
{
let rollout = self.services.rollout.lock().await;
if rollout.is_some() {
return Ok(());
}
}
let params = {
let mut pending = self.services.pending_rollout_create.lock().await;
pending.take()
};
let Some(params) = params else {
return Ok(());
};
let recorder = match RolloutRecorder::new(
turn_context.config.as_ref(),
params.clone(),
self.services.state_db.clone(),
None,
)
.await
{
Ok(recorder) => recorder,
Err(err) => {
let mut pending = self.services.pending_rollout_create.lock().await;
if pending.is_none() {
*pending = Some(params);
}
return Err(err);
}
};
let mut rollout = self.services.rollout.lock().await;
if rollout.is_none() {
*rollout = Some(recorder);
}
Ok(())
}
fn next_internal_sub_id(&self) -> String {
let id = self
.next_internal_sub_id
@@ -1186,14 +1250,8 @@ impl Session {
let turn_context = self.new_default_turn().await;
match conversation_history {
InitialHistory::New => {
// Build and record initial items (user instructions + environment context)
let items = self.build_initial_context(&turn_context).await;
self.record_conversation_items(&turn_context, &items).await;
{
let mut state = self.state.lock().await;
state.initial_context_seeded = true;
}
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
// Defer initial context persistence until the first turn starts.
// This lets turn/start overrides be reflected in the seeded context.
self.flush_rollout().await;
}
InitialHistory::Resumed(resumed_history) => {
@@ -2898,6 +2956,20 @@ mod handlers {
// Attempt to inject input into current task.
if let Err(SteerInputError::NoActiveTurn(items)) = sess.steer_input(items, None).await {
if let Err(err) = sess
.ensure_rollout_initialized_for_turn(&current_context)
.await
{
sess.send_event_raw(Event {
id: current_context.sub_id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: format!("failed to initialize rollout recorder: {err}"),
codex_error_info: Some(CodexErrorInfo::Other),
}),
})
.await;
return;
}
sess.seed_initial_context_if_needed(&current_context).await;
let resumed_model = sess.take_pending_resume_previous_model().await;
let update_items = sess.build_settings_update_items(
@@ -5063,6 +5135,25 @@ mod tests {
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
async fn record_initial_history_new_defers_initial_context_until_first_turn() {
let (session, turn_context) = make_session_and_context().await;
{
let mut state = session.state.lock().await;
state.initial_context_seeded = false;
}
session.record_initial_history(InitialHistory::New).await;
let history = session.state.lock().await.clone_history();
assert_eq!(history.raw_items(), Vec::<ResponseItem>::new());
session.seed_initial_context_if_needed(&turn_context).await;
let expected = session.build_initial_context(&turn_context).await;
let history = session.state.lock().await.clone_history();
assert_eq!(history.raw_items(), expected);
}
#[tokio::test]
async fn resumed_history_seeds_initial_context_on_first_turn_only() {
let (session, turn_context) = make_session_and_context().await;
@@ -5220,6 +5311,86 @@ mod tests {
assert_eq!(expected, history.raw_items());
}
#[tokio::test]
async fn lazy_rollout_creation_writes_session_meta_then_initial_context_then_turn_context() {
let (session, turn_context) = make_session_and_context().await;
{
let mut state = session.state.lock().await;
state.initial_context_seeded = false;
}
let base_instructions = session.get_base_instructions().await;
let session_source = {
let state = session.state.lock().await;
state.session_configuration.session_source.clone()
};
{
let mut pending = session.services.pending_rollout_create.lock().await;
*pending = Some(RolloutRecorderParams::new(
session.conversation_id,
None,
session_source,
base_instructions,
Vec::new(),
));
}
let conversation_id = session.conversation_id.to_string();
let rollout_path = crate::rollout::find_thread_path_by_id_str(
&turn_context.config.codex_home,
&conversation_id,
)
.await
.expect("lookup rollout path before first turn");
assert_eq!(rollout_path, None);
session
.ensure_rollout_initialized_for_turn(&turn_context)
.await
.expect("initialize rollout recorder lazily");
session.seed_initial_context_if_needed(&turn_context).await;
session
.persist_rollout_items(&[RolloutItem::TurnContext(TurnContextItem {
cwd: turn_context.cwd.clone(),
approval_policy: turn_context.approval_policy,
sandbox_policy: turn_context.sandbox_policy.clone(),
model: turn_context.model_info.slug.clone(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: turn_context.user_instructions.clone(),
developer_instructions: turn_context.developer_instructions.clone(),
final_output_json_schema: turn_context.final_output_json_schema.clone(),
truncation_policy: Some(turn_context.truncation_policy.into()),
})])
.await;
session.flush_rollout().await;
let rollout_path = crate::rollout::find_thread_path_by_id_str(
&turn_context.config.codex_home,
&conversation_id,
)
.await
.expect("lookup rollout path after first turn")
.expect("rollout path should exist");
let (items, _, _) = RolloutRecorder::load_rollout_items(&rollout_path)
.await
.expect("load rollout items");
assert!(matches!(items.first(), Some(RolloutItem::SessionMeta(_))));
let initial_context_len = session.build_initial_context(&turn_context).await.len();
assert_eq!(
items
.iter()
.skip(1)
.take(initial_context_len)
.filter(|item| matches!(item, RolloutItem::ResponseItem(_)))
.count(),
initial_context_len
);
assert!(matches!(items.last(), Some(RolloutItem::TurnContext(_))));
}
#[tokio::test]
async fn thread_rollback_drops_last_turn_from_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
@@ -5776,6 +5947,7 @@ mod tests {
),
hooks: Hooks::new(&config),
rollout: Mutex::new(None),
pending_rollout_create: Mutex::new(None),
user_shell: Arc::new(default_user_shell()),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,
@@ -5906,6 +6078,7 @@ mod tests {
),
hooks: Hooks::new(&config),
rollout: Mutex::new(None),
pending_rollout_create: Mutex::new(None),
user_shell: Arc::new(default_user_shell()),
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
exec_policy,

View File

@@ -10,6 +10,7 @@ use crate::file_watcher::FileWatcher;
use crate::hooks::Hooks;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::models_manager::manager::ModelsManager;
use crate::rollout::RolloutRecorderParams;
use crate::skills::SkillsManager;
use crate::state_db::StateDbHandle;
use crate::tools::sandboxing::ApprovalStore;
@@ -26,6 +27,7 @@ pub(crate) struct SessionServices {
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) hooks: Hooks,
pub(crate) rollout: Mutex<Option<RolloutRecorder>>,
pub(crate) pending_rollout_create: Mutex<Option<RolloutRecorderParams>>,
pub(crate) user_shell: Arc<crate::shell::Shell>,
pub(crate) show_raw_agent_reasoning: bool,
pub(crate) exec_policy: ExecPolicyManager,