Merge branch 'etraut/next-turn-state-app-server' into etraut/next-turn-state-tui

This commit is contained in:
Eric Traut
2026-05-16 18:34:22 -07:00
2 changed files with 78 additions and 127 deletions

View File

@@ -444,13 +444,9 @@ impl TurnRequestProcessor {
.as_ref()
.map(|cwd| AbsolutePathBuf::resolve_path_against_base(cwd, base_snapshot.cwd.as_path()))
.unwrap_or_else(|| base_snapshot.cwd.clone());
let runtime_workspace_roots =
request
.runtime_workspace_roots
.clone()
.map(|workspace_roots| {
resolve_runtime_workspace_roots(workspace_roots, &effective_cwd)
});
let runtime_workspace_roots = request.runtime_workspace_roots.map(|workspace_roots| {
resolve_runtime_workspace_roots(workspace_roots, &effective_cwd)
});
let effective_workspace_roots = effective_workspace_roots(
base_snapshot,
&effective_cwd,

View File

@@ -19,6 +19,7 @@ use codex_app_server_protocol::ThreadTurnContextUpdateParams;
use codex_app_server_protocol::ThreadTurnContextUpdateResponse;
use codex_app_server_protocol::ThreadTurnContextUpdatedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_features::Feature;
use codex_protocol::models::PermissionProfile as CorePermissionProfile;
@@ -56,12 +57,43 @@ async fn start_thread(mcp: &mut McpProcess) -> Result<ThreadStartResponse> {
..Default::default()
})
.await?;
read_response(mcp, request_id).await
}
async fn send_thread_turn_context_update(
mcp: &mut McpProcess,
params: ThreadTurnContextUpdateParams,
) -> Result<ThreadTurnContextUpdateResponse> {
let request_id = mcp.send_thread_turn_context_update_request(params).await?;
read_response(mcp, request_id).await
}
async fn read_response<T: serde::de::DeserializeOwned>(
mcp: &mut McpProcess,
request_id: i64,
) -> Result<T> {
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
to_response::<ThreadStartResponse>(response)
to_response(response)
}
fn text_input(text: &str) -> V2UserInput {
V2UserInput::Text {
text: text.to_string(),
text_elements: Vec::new(),
}
}
async fn wait_for_turn_completed(mcp: &mut McpProcess) -> Result<()> {
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
async fn read_turn_context_updated(
@@ -114,20 +146,16 @@ async fn thread_turn_context_update_applies_partial_patch_and_emits_full_state()
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let request_id = mcp
.send_thread_turn_context_update_request(ThreadTurnContextUpdateParams {
let response = send_thread_turn_context_update(
&mut mcp,
ThreadTurnContextUpdateParams {
thread_id: thread.id.clone(),
model: Some("gpt-5.2".to_string()),
effort: Some(Some(ReasoningEffort::High)),
..Default::default()
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
},
)
.await??;
let response = to_response::<ThreadTurnContextUpdateResponse>(response)?;
.await?;
assert_eq!(response.turn_context.model, "gpt-5.2");
assert_eq!(response.turn_context.service_tier.as_deref(), Some("flex"));
@@ -139,20 +167,16 @@ async fn thread_turn_context_update_applies_partial_patch_and_emits_full_state()
assert_eq!(notification.turn_context, response.turn_context);
mcp.clear_message_buffer();
let no_op_request = mcp
.send_thread_turn_context_update_request(ThreadTurnContextUpdateParams {
let no_op_response = send_thread_turn_context_update(
&mut mcp,
ThreadTurnContextUpdateParams {
thread_id: thread.id,
model: Some("gpt-5.2".to_string()),
effort: Some(Some(ReasoningEffort::High)),
..Default::default()
})
.await?;
let no_op_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(no_op_request)),
},
)
.await??;
let no_op_response = to_response::<ThreadTurnContextUpdateResponse>(no_op_response)?;
.await?;
assert_eq!(no_op_response.turn_context, response.turn_context);
assert!(
!mcp.pending_notification_methods()
@@ -175,20 +199,16 @@ async fn thread_turn_context_update_retargets_permissions_when_cwd_changes() ->
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let request_id = mcp
.send_thread_turn_context_update_request(ThreadTurnContextUpdateParams {
let response = send_thread_turn_context_update(
&mut mcp,
ThreadTurnContextUpdateParams {
thread_id: thread.id,
cwd: Some(next_cwd.path().to_path_buf()),
permissions: Some(PermissionProfileSelectionParams::new(":workspace")),
..Default::default()
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
},
)
.await??;
let response = to_response::<ThreadTurnContextUpdateResponse>(response)?;
.await?;
assert_eq!(response.turn_context.cwd, next_cwd_abs);
assert_permission_profile_write_root(
@@ -210,19 +230,15 @@ async fn thread_turn_context_update_clears_service_tier_with_explicit_null() ->
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let ThreadStartResponse { thread, .. } = start_thread(&mut mcp).await?;
let request_id = mcp
.send_thread_turn_context_update_request(ThreadTurnContextUpdateParams {
let response = send_thread_turn_context_update(
&mut mcp,
ThreadTurnContextUpdateParams {
thread_id: thread.id,
service_tier: Some(None),
..Default::default()
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
},
)
.await??;
let response = to_response::<ThreadTurnContextUpdateResponse>(response)?;
.await?;
assert_eq!(response.turn_context.service_tier, None);
let notification = read_turn_context_updated(&mut mcp).await?;
@@ -285,10 +301,7 @@ async fn thread_turn_context_update_waits_for_pending_cwd_before_permissions() -
let turn_request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
input: vec![text_input("Hello")],
cwd: Some(next_cwd.path().to_path_buf()),
..Default::default()
})
@@ -301,17 +314,9 @@ async fn thread_turn_context_update_waits_for_pending_cwd_before_permissions() -
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_request_id)),
)
.await??;
let update_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(update_request_id)),
)
.await??;
let update_response = to_response::<ThreadTurnContextUpdateResponse>(update_response)?;
let _: TurnStartResponse = read_response(&mut mcp, turn_request_id).await?;
let update_response =
read_response::<ThreadTurnContextUpdateResponse>(&mut mcp, update_request_id).await?;
assert_eq!(update_response.turn_context.cwd, next_cwd_abs);
assert_permission_profile_write_root(
@@ -320,11 +325,7 @@ async fn thread_turn_context_update_waits_for_pending_cwd_before_permissions() -
&thread.cwd,
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_turn_completed(&mut mcp).await?;
Ok(())
}
@@ -345,20 +346,13 @@ async fn turn_start_emits_turn_context_updated_when_overrides_change_defaults()
let request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
input: vec![text_input("Hello")],
model: Some("gpt-5.2".to_string()),
effort: Some(ReasoningEffort::Low),
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let _: TurnStartResponse = read_response(&mut mcp, request_id).await?;
let notification = read_turn_context_updated(&mut mcp).await?;
assert_eq!(notification.thread_id, thread.id);
@@ -369,11 +363,7 @@ async fn turn_start_emits_turn_context_updated_when_overrides_change_defaults()
Some("flex")
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_turn_completed(&mut mcp).await?;
Ok(())
}
@@ -395,10 +385,7 @@ async fn assert_newer_update_survives_turn_start(
let turn_request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
input: vec![text_input("Hello")],
..turn_start_overrides
})
.await?;
@@ -411,43 +398,26 @@ async fn assert_newer_update_survives_turn_start(
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_request_id)),
)
.await??;
let update_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(update_request_id)),
)
.await??;
let update_response = to_response::<ThreadTurnContextUpdateResponse>(update_response)?;
let _: TurnStartResponse = read_response(&mut mcp, turn_request_id).await?;
let update_response =
read_response::<ThreadTurnContextUpdateResponse>(&mut mcp, update_request_id).await?;
assert_eq!(update_response.turn_context.model, "gpt-5.4");
assert_eq!(
update_response.turn_context.effort,
Some(ReasoningEffort::High)
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_turn_completed(&mut mcp).await?;
mcp.clear_message_buffer();
let read_current_request_id = mcp
.send_thread_turn_context_update_request(ThreadTurnContextUpdateParams {
let read_current_response = send_thread_turn_context_update(
&mut mcp,
ThreadTurnContextUpdateParams {
thread_id: thread.id,
..Default::default()
})
.await?;
let read_current_response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_current_request_id)),
},
)
.await??;
let read_current_response =
to_response::<ThreadTurnContextUpdateResponse>(read_current_response)?;
.await?;
assert_eq!(
read_current_response.turn_context,
update_response.turn_context
@@ -482,10 +452,7 @@ async fn queued_updates_keep_each_turn_context_notification_snapshot() -> Result
let turn_request_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
input: vec![text_input("Hello")],
model: Some("gpt-5.2".to_string()),
effort: Some(ReasoningEffort::Low),
..Default::default()
@@ -500,16 +467,8 @@ async fn queued_updates_keep_each_turn_context_notification_snapshot() -> Result
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_request_id)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(update_request_id)),
)
.await??;
let _: TurnStartResponse = read_response(&mut mcp, turn_request_id).await?;
let _: ThreadTurnContextUpdateResponse = read_response(&mut mcp, update_request_id).await?;
let notifications = [
read_turn_context_updated(&mut mcp).await?,
@@ -524,11 +483,7 @@ async fn queued_updates_keep_each_turn_context_notification_snapshot() -> Result
&& notification.turn_context.effort == Some(ReasoningEffort::High)
}));
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
wait_for_turn_completed(&mut mcp).await?;
Ok(())
}