mirror of
https://github.com/openai/codex.git
synced 2026-05-02 18:37:01 +00:00
Add excludeTurns parameter to thread/resume and thread/fork (#19014)
For callers who expect to be paginating the results for the UI, they can now call thread/resume or thread/fork with excludeturns:true so it will not fetch any pages of turns, and instead only set up the subscription. That call can be immediately followed by pagination requests to thread/turns/list to fetch pages of turns according to the UI's current interactions.
This commit is contained in:
@@ -241,6 +241,54 @@ async fn thread_fork_emits_restored_token_usage_before_next_turn() -> Result<()>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_can_exclude_turns_and_skip_restored_token_usage() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_token_usage(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-00",
|
||||
"2025-01-05T12:00:00Z",
|
||||
"Saved user message",
|
||||
Some("mock_provider"),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let fork_id = mcp
|
||||
.send_thread_fork_request(ThreadForkParams {
|
||||
thread_id: conversation_id.clone(),
|
||||
exclude_turns: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let fork_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(fork_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadForkResponse { thread, .. } = to_response::<ThreadForkResponse>(fork_resp)?;
|
||||
|
||||
assert_eq!(thread.forked_from_id, Some(conversation_id));
|
||||
assert_eq!(thread.preview, "Saved user message");
|
||||
assert!(thread.turns.is_empty());
|
||||
|
||||
let note = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
note.is_err(),
|
||||
"excludeTurns=true should not replay token usage"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
@@ -285,6 +285,45 @@ async fn thread_resume_returns_rollout_history() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_can_skip_turns_for_metadata_only_resume() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_text_elements(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-00",
|
||||
"2025-01-05T12:00:00Z",
|
||||
"Saved user message",
|
||||
Vec::new(),
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: conversation_id.clone(),
|
||||
exclude_turns: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
|
||||
assert_eq!(thread.id, conversation_id);
|
||||
assert!(thread.turns.is_empty());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_emits_restored_token_usage_before_next_turn() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
@@ -338,6 +377,80 @@ async fn thread_resume_emits_restored_token_usage_before_next_turn() -> Result<(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_skips_restored_token_usage_when_turns_are_excluded() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let conversation_id = create_fake_rollout_with_token_usage(
|
||||
codex_home.path(),
|
||||
"2025-01-05T12-00-00",
|
||||
"2025-01-05T12:00:00Z",
|
||||
"Saved user message",
|
||||
Some("mock_provider"),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let first_resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: conversation_id.clone(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let first_resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(first_resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse { thread, .. } =
|
||||
to_response::<ThreadResumeResponse>(first_resume_resp)?;
|
||||
let expected_turn_id = thread.turns[0].id.clone();
|
||||
|
||||
let first_note = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
||||
)
|
||||
.await??;
|
||||
let parsed: ServerNotification = first_note.try_into()?;
|
||||
let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else {
|
||||
panic!("expected thread/tokenUsage/updated notification");
|
||||
};
|
||||
assert_eq!(notification.turn_id, expected_turn_id);
|
||||
|
||||
let second_resume_id = mcp
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: conversation_id,
|
||||
exclude_turns: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let second_resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(second_resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse {
|
||||
thread: resumed_again,
|
||||
..
|
||||
} = to_response::<ThreadResumeResponse>(second_resume_resp)?;
|
||||
assert!(resumed_again.turns.is_empty());
|
||||
|
||||
let second_note = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/tokenUsage/updated"),
|
||||
)
|
||||
.await;
|
||||
assert!(
|
||||
second_note.is_err(),
|
||||
"excludeTurns=true should not replay token usage"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_token_usage_replay_ignores_stale_interrupted_tail_turn() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
@@ -1339,6 +1452,84 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_can_skip_turns_when_thread_is_running() -> Result<()> {
|
||||
let server = responses::start_mock_server().await;
|
||||
let _response_mock = responses::mount_sse_once(
|
||||
&server,
|
||||
responses::sse(vec![
|
||||
responses::ev_response_created("resp-1"),
|
||||
responses::ev_assistant_message("msg-1", "Done"),
|
||||
responses::ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut primary = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??;
|
||||
|
||||
let start_id = primary
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("gpt-5.4".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let start_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_response_message(RequestId::Integer(start_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
|
||||
|
||||
let turn_id = primary
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "seed history".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_response_message(RequestId::Integer(turn_id)),
|
||||
)
|
||||
.await??;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
primary.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let mut secondary = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??;
|
||||
|
||||
let resume_id = secondary
|
||||
.send_thread_resume_request(ThreadResumeParams {
|
||||
thread_id: thread.id.clone(),
|
||||
exclude_turns: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let resume_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
secondary.read_stream_until_response_message(RequestId::Integer(resume_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadResumeResponse {
|
||||
thread: resumed, ..
|
||||
} = to_response::<ThreadResumeResponse>(resume_resp)?;
|
||||
|
||||
assert_eq!(resumed.id, thread.id);
|
||||
assert_eq!(resumed.status, ThreadStatus::Idle);
|
||||
assert!(resumed.turns.is_empty());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_resume_replays_pending_command_execution_request_approval() -> Result<()> {
|
||||
let responses = vec![
|
||||
|
||||
Reference in New Issue
Block a user