mirror of
https://github.com/openai/codex.git
synced 2026-05-01 18:06:47 +00:00
Compare commits
1 Commits
dev/abhina
...
shijie/upd
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
71de55886e |
@@ -50,7 +50,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
|
||||
## Lifecycle Overview
|
||||
|
||||
- Initialize once per connection: Immediately after opening a transport connection, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request on that connection before this handshake gets rejected.
|
||||
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you’ll also get a `thread/started` notification. If you’re continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history.
|
||||
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you’ll also get a `thread/started` notification. Fresh threads do not create a rollout file until the first `turn/start`. If you’re continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread id with copied history.
|
||||
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object and triggers a `turn/started` notification.
|
||||
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. You’ll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
|
||||
- Finish the turn: When the model is done (or the turn is interrupted via making the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -7,6 +8,8 @@ use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use std::path::Path;
|
||||
@@ -18,7 +21,8 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
|
||||
#[tokio::test]
|
||||
async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path())?;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
@@ -38,6 +42,22 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
assert!(!thread.id.is_empty());
|
||||
|
||||
let turn_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let _: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
// Locate the rollout path recorded for this thread id.
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
|
||||
.await?
|
||||
@@ -80,14 +100,25 @@ async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(config_toml, config_contents())
|
||||
std::fs::write(config_toml, config_contents(server_uri))
|
||||
}
|
||||
|
||||
fn config_contents() -> &'static str {
|
||||
r#"model = "mock-model"
|
||||
fn config_contents(server_uri: &str) -> String {
|
||||
format!(
|
||||
r#"model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
)
|
||||
}
|
||||
|
||||
@@ -56,6 +56,22 @@ async fn thread_resume_returns_original_thread() -> Result<()> {
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
|
||||
let turn_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let _: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
// Resume it via v2 API.
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
|
||||
@@ -8,7 +8,10 @@ use codex_app_server_protocol::RequestId;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::config::set_project_trust_level;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_protocol::config_types::TrustLevel;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use std::path::Path;
|
||||
@@ -58,6 +61,11 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
|
||||
thread.created_at > 0,
|
||||
"created_at should be a positive UNIX timestamp"
|
||||
);
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
|
||||
assert!(
|
||||
rollout_path.is_none(),
|
||||
"fresh threads should not create rollout files until first turn"
|
||||
);
|
||||
|
||||
// A corresponding thread/started notification should arrive.
|
||||
let notif: JSONRPCNotification = timeout(
|
||||
@@ -69,6 +77,28 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
|
||||
serde_json::from_value(notif.params.expect("params must be present"))?;
|
||||
assert_eq!(started.thread, thread);
|
||||
|
||||
// First turn should create the rollout file lazily.
|
||||
let turn_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "Hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let _: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id).await?;
|
||||
assert!(
|
||||
rollout_path.is_some(),
|
||||
"first turn should create rollout file"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -9,6 +10,8 @@ use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadUnarchiveParams;
|
||||
use codex_app_server_protocol::ThreadUnarchiveResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::find_archived_thread_path_by_id_str;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use std::fs::FileTimes;
|
||||
@@ -24,7 +27,8 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
|
||||
#[tokio::test]
|
||||
async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path())?;
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
@@ -42,6 +46,22 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
|
||||
let turn_id = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let _: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
|
||||
.await?
|
||||
.expect("expected rollout path for thread id to exist");
|
||||
@@ -108,14 +128,25 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(config_toml, config_contents())
|
||||
std::fs::write(config_toml, config_contents(server_uri))
|
||||
}
|
||||
|
||||
fn config_contents() -> &'static str {
|
||||
r#"model = "mock-model"
|
||||
fn config_contents(server_uri: &str) -> String {
|
||||
format!(
|
||||
r#"model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "read-only"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "responses"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
)
|
||||
}
|
||||
|
||||
@@ -858,26 +858,9 @@ impl Session {
|
||||
|
||||
let forked_from_id = initial_history.forked_from_id();
|
||||
|
||||
let (conversation_id, rollout_params) = match &initial_history {
|
||||
InitialHistory::New | InitialHistory::Forked(_) => {
|
||||
let conversation_id = ThreadId::default();
|
||||
(
|
||||
conversation_id,
|
||||
RolloutRecorderParams::new(
|
||||
conversation_id,
|
||||
forked_from_id,
|
||||
session_source,
|
||||
BaseInstructions {
|
||||
text: session_configuration.base_instructions.clone(),
|
||||
},
|
||||
session_configuration.dynamic_tools.clone(),
|
||||
),
|
||||
)
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => (
|
||||
resumed_history.conversation_id,
|
||||
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
|
||||
),
|
||||
let conversation_id = match &initial_history {
|
||||
InitialHistory::New | InitialHistory::Forked(_) => ThreadId::default(),
|
||||
InitialHistory::Resumed(resumed_history) => resumed_history.conversation_id,
|
||||
};
|
||||
let state_builder = match &initial_history {
|
||||
InitialHistory::Resumed(resumed) => metadata::builder_from_items(
|
||||
@@ -894,17 +877,52 @@ impl Session {
|
||||
// - load history metadata
|
||||
let rollout_fut = async {
|
||||
if config.ephemeral {
|
||||
Ok::<_, anyhow::Error>((None, None))
|
||||
Ok::<_, anyhow::Error>((None, None, None))
|
||||
} else {
|
||||
let state_db_ctx = state_db::init_if_enabled(&config, None).await;
|
||||
let rollout_recorder = RolloutRecorder::new(
|
||||
&config,
|
||||
rollout_params,
|
||||
state_db_ctx.clone(),
|
||||
state_builder.clone(),
|
||||
)
|
||||
.await?;
|
||||
Ok((Some(rollout_recorder), state_db_ctx))
|
||||
match &initial_history {
|
||||
InitialHistory::New => {
|
||||
let rollout_params = RolloutRecorderParams::new(
|
||||
conversation_id,
|
||||
forked_from_id,
|
||||
session_source.clone(),
|
||||
BaseInstructions {
|
||||
text: session_configuration.base_instructions.clone(),
|
||||
},
|
||||
session_configuration.dynamic_tools.clone(),
|
||||
);
|
||||
Ok((None, state_db_ctx, Some(rollout_params)))
|
||||
}
|
||||
InitialHistory::Forked(_) => {
|
||||
let rollout_params = RolloutRecorderParams::new(
|
||||
conversation_id,
|
||||
forked_from_id,
|
||||
session_source.clone(),
|
||||
BaseInstructions {
|
||||
text: session_configuration.base_instructions.clone(),
|
||||
},
|
||||
session_configuration.dynamic_tools.clone(),
|
||||
);
|
||||
let rollout_recorder = RolloutRecorder::new(
|
||||
&config,
|
||||
rollout_params,
|
||||
state_db_ctx.clone(),
|
||||
state_builder.clone(),
|
||||
)
|
||||
.await?;
|
||||
Ok((Some(rollout_recorder), state_db_ctx, None))
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
let rollout_recorder = RolloutRecorder::new(
|
||||
&config,
|
||||
RolloutRecorderParams::resume(resumed_history.rollout_path.clone()),
|
||||
state_db_ctx.clone(),
|
||||
state_builder.clone(),
|
||||
)
|
||||
.await?;
|
||||
Ok((Some(rollout_recorder), state_db_ctx, None))
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -929,10 +947,11 @@ impl Session {
|
||||
(auth, mcp_servers, auth_statuses),
|
||||
) = tokio::join!(rollout_fut, history_meta_fut, auth_and_mcp_fut);
|
||||
|
||||
let (rollout_recorder, state_db_ctx) = rollout_recorder_and_state_db.map_err(|e| {
|
||||
error!("failed to initialize rollout recorder: {e:#}");
|
||||
e
|
||||
})?;
|
||||
let (rollout_recorder, state_db_ctx, pending_rollout_create) =
|
||||
rollout_recorder_and_state_db.map_err(|e| {
|
||||
error!("failed to initialize rollout recorder: {e:#}");
|
||||
e
|
||||
})?;
|
||||
let rollout_path = rollout_recorder
|
||||
.as_ref()
|
||||
.map(|rec| rec.rollout_path.clone());
|
||||
@@ -1042,6 +1061,7 @@ impl Session {
|
||||
),
|
||||
hooks: Hooks::new(config.as_ref()),
|
||||
rollout: Mutex::new(rollout_recorder),
|
||||
pending_rollout_create: Mutex::new(pending_rollout_create),
|
||||
user_shell: Arc::new(default_shell),
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
@@ -1158,6 +1178,50 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
async fn ensure_rollout_initialized_for_turn(
|
||||
&self,
|
||||
turn_context: &TurnContext,
|
||||
) -> std::io::Result<()> {
|
||||
{
|
||||
let rollout = self.services.rollout.lock().await;
|
||||
if rollout.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let params = {
|
||||
let mut pending = self.services.pending_rollout_create.lock().await;
|
||||
pending.take()
|
||||
};
|
||||
let Some(params) = params else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let recorder = match RolloutRecorder::new(
|
||||
turn_context.config.as_ref(),
|
||||
params.clone(),
|
||||
self.services.state_db.clone(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(recorder) => recorder,
|
||||
Err(err) => {
|
||||
let mut pending = self.services.pending_rollout_create.lock().await;
|
||||
if pending.is_none() {
|
||||
*pending = Some(params);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
let mut rollout = self.services.rollout.lock().await;
|
||||
if rollout.is_none() {
|
||||
*rollout = Some(recorder);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn next_internal_sub_id(&self) -> String {
|
||||
let id = self
|
||||
.next_internal_sub_id
|
||||
@@ -1186,14 +1250,8 @@ impl Session {
|
||||
let turn_context = self.new_default_turn().await;
|
||||
match conversation_history {
|
||||
InitialHistory::New => {
|
||||
// Build and record initial items (user instructions + environment context)
|
||||
let items = self.build_initial_context(&turn_context).await;
|
||||
self.record_conversation_items(&turn_context, &items).await;
|
||||
{
|
||||
let mut state = self.state.lock().await;
|
||||
state.initial_context_seeded = true;
|
||||
}
|
||||
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
|
||||
// Defer initial context persistence until the first turn starts.
|
||||
// This lets turn/start overrides be reflected in the seeded context.
|
||||
self.flush_rollout().await;
|
||||
}
|
||||
InitialHistory::Resumed(resumed_history) => {
|
||||
@@ -2898,6 +2956,20 @@ mod handlers {
|
||||
|
||||
// Attempt to inject input into current task.
|
||||
if let Err(SteerInputError::NoActiveTurn(items)) = sess.steer_input(items, None).await {
|
||||
if let Err(err) = sess
|
||||
.ensure_rollout_initialized_for_turn(¤t_context)
|
||||
.await
|
||||
{
|
||||
sess.send_event_raw(Event {
|
||||
id: current_context.sub_id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: format!("failed to initialize rollout recorder: {err}"),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
sess.seed_initial_context_if_needed(¤t_context).await;
|
||||
let resumed_model = sess.take_pending_resume_previous_model().await;
|
||||
let update_items = sess.build_settings_update_items(
|
||||
@@ -5063,6 +5135,25 @@ mod tests {
|
||||
assert_eq!(expected, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn record_initial_history_new_defers_initial_context_until_first_turn() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
{
|
||||
let mut state = session.state.lock().await;
|
||||
state.initial_context_seeded = false;
|
||||
}
|
||||
|
||||
session.record_initial_history(InitialHistory::New).await;
|
||||
|
||||
let history = session.state.lock().await.clone_history();
|
||||
assert_eq!(history.raw_items(), Vec::<ResponseItem>::new());
|
||||
|
||||
session.seed_initial_context_if_needed(&turn_context).await;
|
||||
let expected = session.build_initial_context(&turn_context).await;
|
||||
let history = session.state.lock().await.clone_history();
|
||||
assert_eq!(history.raw_items(), expected);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn resumed_history_seeds_initial_context_on_first_turn_only() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
@@ -5220,6 +5311,86 @@ mod tests {
|
||||
assert_eq!(expected, history.raw_items());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lazy_rollout_creation_writes_session_meta_then_initial_context_then_turn_context() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
{
|
||||
let mut state = session.state.lock().await;
|
||||
state.initial_context_seeded = false;
|
||||
}
|
||||
let base_instructions = session.get_base_instructions().await;
|
||||
let session_source = {
|
||||
let state = session.state.lock().await;
|
||||
state.session_configuration.session_source.clone()
|
||||
};
|
||||
{
|
||||
let mut pending = session.services.pending_rollout_create.lock().await;
|
||||
*pending = Some(RolloutRecorderParams::new(
|
||||
session.conversation_id,
|
||||
None,
|
||||
session_source,
|
||||
base_instructions,
|
||||
Vec::new(),
|
||||
));
|
||||
}
|
||||
|
||||
let conversation_id = session.conversation_id.to_string();
|
||||
let rollout_path = crate::rollout::find_thread_path_by_id_str(
|
||||
&turn_context.config.codex_home,
|
||||
&conversation_id,
|
||||
)
|
||||
.await
|
||||
.expect("lookup rollout path before first turn");
|
||||
assert_eq!(rollout_path, None);
|
||||
|
||||
session
|
||||
.ensure_rollout_initialized_for_turn(&turn_context)
|
||||
.await
|
||||
.expect("initialize rollout recorder lazily");
|
||||
session.seed_initial_context_if_needed(&turn_context).await;
|
||||
session
|
||||
.persist_rollout_items(&[RolloutItem::TurnContext(TurnContextItem {
|
||||
cwd: turn_context.cwd.clone(),
|
||||
approval_policy: turn_context.approval_policy,
|
||||
sandbox_policy: turn_context.sandbox_policy.clone(),
|
||||
model: turn_context.model_info.slug.clone(),
|
||||
personality: turn_context.personality,
|
||||
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
|
||||
effort: turn_context.reasoning_effort,
|
||||
summary: turn_context.reasoning_summary,
|
||||
user_instructions: turn_context.user_instructions.clone(),
|
||||
developer_instructions: turn_context.developer_instructions.clone(),
|
||||
final_output_json_schema: turn_context.final_output_json_schema.clone(),
|
||||
truncation_policy: Some(turn_context.truncation_policy.into()),
|
||||
})])
|
||||
.await;
|
||||
session.flush_rollout().await;
|
||||
|
||||
let rollout_path = crate::rollout::find_thread_path_by_id_str(
|
||||
&turn_context.config.codex_home,
|
||||
&conversation_id,
|
||||
)
|
||||
.await
|
||||
.expect("lookup rollout path after first turn")
|
||||
.expect("rollout path should exist");
|
||||
let (items, _, _) = RolloutRecorder::load_rollout_items(&rollout_path)
|
||||
.await
|
||||
.expect("load rollout items");
|
||||
|
||||
assert!(matches!(items.first(), Some(RolloutItem::SessionMeta(_))));
|
||||
let initial_context_len = session.build_initial_context(&turn_context).await.len();
|
||||
assert_eq!(
|
||||
items
|
||||
.iter()
|
||||
.skip(1)
|
||||
.take(initial_context_len)
|
||||
.filter(|item| matches!(item, RolloutItem::ResponseItem(_)))
|
||||
.count(),
|
||||
initial_context_len
|
||||
);
|
||||
assert!(matches!(items.last(), Some(RolloutItem::TurnContext(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_rollback_drops_last_turn_from_history() {
|
||||
let (sess, tc, rx) = make_session_and_context_with_rx().await;
|
||||
@@ -5776,6 +5947,7 @@ mod tests {
|
||||
),
|
||||
hooks: Hooks::new(&config),
|
||||
rollout: Mutex::new(None),
|
||||
pending_rollout_create: Mutex::new(None),
|
||||
user_shell: Arc::new(default_user_shell()),
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
@@ -5906,6 +6078,7 @@ mod tests {
|
||||
),
|
||||
hooks: Hooks::new(&config),
|
||||
rollout: Mutex::new(None),
|
||||
pending_rollout_create: Mutex::new(None),
|
||||
user_shell: Arc::new(default_user_shell()),
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
exec_policy,
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::file_watcher::FileWatcher;
|
||||
use crate::hooks::Hooks;
|
||||
use crate::mcp_connection_manager::McpConnectionManager;
|
||||
use crate::models_manager::manager::ModelsManager;
|
||||
use crate::rollout::RolloutRecorderParams;
|
||||
use crate::skills::SkillsManager;
|
||||
use crate::state_db::StateDbHandle;
|
||||
use crate::tools::sandboxing::ApprovalStore;
|
||||
@@ -26,6 +27,7 @@ pub(crate) struct SessionServices {
|
||||
pub(crate) analytics_events_client: AnalyticsEventsClient,
|
||||
pub(crate) hooks: Hooks,
|
||||
pub(crate) rollout: Mutex<Option<RolloutRecorder>>,
|
||||
pub(crate) pending_rollout_create: Mutex<Option<RolloutRecorderParams>>,
|
||||
pub(crate) user_shell: Arc<crate::shell::Shell>,
|
||||
pub(crate) show_raw_agent_reasoning: bool,
|
||||
pub(crate) exec_policy: ExecPolicyManager,
|
||||
|
||||
Reference in New Issue
Block a user