[app-server] populate thread>turns>items on thread/resume (#6848)

This PR allows clients to render historical messages when resuming a
thread via `thread/resume` by reading from the list of `EventMsg`
payloads loaded from the rollout, and then transforming them into Turns
and ThreadItems to be returned on the `Thread` object.

This is implemented by leveraging `SessionConfiguredNotification` which
returns this list of `EventMsg` objects when resuming a conversation,
and then applying a stateful `ThreadHistoryBuilder` that parses from
this EventMsg log and transforms it into Turns and ThreadItems.

Note that we only persist a subset of `EventMsg`s in a rollout as
defined in `policy.rs`, so we lose fidelity whenever we resume a thread
compared to when we streamed the thread's turns originally. However,
this behavior is at parity with the legacy API.
This commit is contained in:
Owen Lin
2025-11-19 07:58:09 -08:00
committed by GitHub
parent cfc57e14c7
commit 1924500250
6 changed files with 496 additions and 5 deletions

View File

@@ -7,5 +7,6 @@ pub use export::generate_ts;
pub use export::generate_types;
pub use jsonrpc_lite::*;
pub use protocol::common::*;
pub use protocol::thread_history::*;
pub use protocol::v1::*;
pub use protocol::v2::*;

View File

@@ -2,5 +2,6 @@
// Exposes protocol pieces used by `lib.rs` via `pub use protocol::common::*;`.
pub mod common;
pub mod thread_history;
pub mod v1;
pub mod v2;

View File

@@ -0,0 +1,409 @@
use crate::protocol::v2::ThreadItem;
use crate::protocol::v2::Turn;
use crate::protocol::v2::TurnStatus;
use crate::protocol::v2::UserInput;
use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::UserMessageEvent;
/// Convert persisted [`EventMsg`] entries into a sequence of [`Turn`] values.
///
/// The purpose of this is to convert the EventMsgs persisted in a rollout file
/// into a sequence of Turns and ThreadItems, which allows the client to render
/// the historical messages when resuming a thread.
pub fn build_turns_from_event_msgs(events: &[EventMsg]) -> Vec<Turn> {
let mut builder = ThreadHistoryBuilder::new();
for event in events {
builder.handle_event(event);
}
builder.finish()
}
struct ThreadHistoryBuilder {
turns: Vec<Turn>,
current_turn: Option<PendingTurn>,
next_turn_index: i64,
next_item_index: i64,
}
impl ThreadHistoryBuilder {
fn new() -> Self {
Self {
turns: Vec::new(),
current_turn: None,
next_turn_index: 1,
next_item_index: 1,
}
}
fn finish(mut self) -> Vec<Turn> {
self.finish_current_turn();
self.turns
}
/// This function should handle all EventMsg variants that can be persisted in a rollout file.
/// See `should_persist_event_msg` in `codex-rs/core/rollout/policy.rs`.
fn handle_event(&mut self, event: &EventMsg) {
match event {
EventMsg::UserMessage(payload) => self.handle_user_message(payload),
EventMsg::AgentMessage(payload) => self.handle_agent_message(payload.message.clone()),
EventMsg::AgentReasoning(payload) => self.handle_agent_reasoning(payload),
EventMsg::AgentReasoningRawContent(payload) => {
self.handle_agent_reasoning_raw_content(payload)
}
EventMsg::TokenCount(_) => {}
EventMsg::EnteredReviewMode(_) => {}
EventMsg::ExitedReviewMode(_) => {}
EventMsg::UndoCompleted(_) => {}
EventMsg::TurnAborted(payload) => self.handle_turn_aborted(payload),
_ => {}
}
}
fn handle_user_message(&mut self, payload: &UserMessageEvent) {
self.finish_current_turn();
let mut turn = self.new_turn();
let id = self.next_item_id();
let content = self.build_user_inputs(payload);
turn.items.push(ThreadItem::UserMessage { id, content });
self.current_turn = Some(turn);
}
fn handle_agent_message(&mut self, text: String) {
if text.is_empty() {
return;
}
let id = self.next_item_id();
self.ensure_turn()
.items
.push(ThreadItem::AgentMessage { id, text });
}
fn handle_agent_reasoning(&mut self, payload: &AgentReasoningEvent) {
if payload.text.is_empty() {
return;
}
// If the last item is a reasoning item, add the new text to the summary.
if let Some(ThreadItem::Reasoning { summary, .. }) = self.ensure_turn().items.last_mut() {
summary.push(payload.text.clone());
return;
}
// Otherwise, create a new reasoning item.
let id = self.next_item_id();
self.ensure_turn().items.push(ThreadItem::Reasoning {
id,
summary: vec![payload.text.clone()],
content: Vec::new(),
});
}
fn handle_agent_reasoning_raw_content(&mut self, payload: &AgentReasoningRawContentEvent) {
if payload.text.is_empty() {
return;
}
// If the last item is a reasoning item, add the new text to the content.
if let Some(ThreadItem::Reasoning { content, .. }) = self.ensure_turn().items.last_mut() {
content.push(payload.text.clone());
return;
}
// Otherwise, create a new reasoning item.
let id = self.next_item_id();
self.ensure_turn().items.push(ThreadItem::Reasoning {
id,
summary: Vec::new(),
content: vec![payload.text.clone()],
});
}
fn handle_turn_aborted(&mut self, _payload: &TurnAbortedEvent) {
let Some(turn) = self.current_turn.as_mut() else {
return;
};
turn.status = TurnStatus::Interrupted;
}
fn finish_current_turn(&mut self) {
if let Some(turn) = self.current_turn.take() {
if turn.items.is_empty() {
return;
}
self.turns.push(turn.into());
}
}
fn new_turn(&mut self) -> PendingTurn {
PendingTurn {
id: self.next_turn_id(),
items: Vec::new(),
status: TurnStatus::Completed,
}
}
fn ensure_turn(&mut self) -> &mut PendingTurn {
if self.current_turn.is_none() {
let turn = self.new_turn();
return self.current_turn.insert(turn);
}
if let Some(turn) = self.current_turn.as_mut() {
return turn;
}
unreachable!("current turn must exist after initialization");
}
fn next_turn_id(&mut self) -> String {
let id = format!("turn-{}", self.next_turn_index);
self.next_turn_index += 1;
id
}
fn next_item_id(&mut self) -> String {
let id = format!("item-{}", self.next_item_index);
self.next_item_index += 1;
id
}
fn build_user_inputs(&self, payload: &UserMessageEvent) -> Vec<UserInput> {
let mut content = Vec::new();
if !payload.message.trim().is_empty() {
content.push(UserInput::Text {
text: payload.message.clone(),
});
}
if let Some(images) = &payload.images {
for image in images {
content.push(UserInput::Image { url: image.clone() });
}
}
content
}
}
struct PendingTurn {
id: String,
items: Vec<ThreadItem>,
status: TurnStatus,
}
impl From<PendingTurn> for Turn {
fn from(value: PendingTurn) -> Self {
Self {
id: value.id,
items: value.items,
status: value.status,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use codex_protocol::protocol::AgentMessageEvent;
use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::UserMessageEvent;
use pretty_assertions::assert_eq;
#[test]
fn builds_multiple_turns_with_reasoning_items() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
message: "First turn".into(),
images: Some(vec!["https://example.com/one.png".into()]),
}),
EventMsg::AgentMessage(AgentMessageEvent {
message: "Hi there".into(),
}),
EventMsg::AgentReasoning(AgentReasoningEvent {
text: "thinking".into(),
}),
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
text: "full reasoning".into(),
}),
EventMsg::UserMessage(UserMessageEvent {
message: "Second turn".into(),
images: None,
}),
EventMsg::AgentMessage(AgentMessageEvent {
message: "Reply two".into(),
}),
];
let turns = build_turns_from_event_msgs(&events);
assert_eq!(turns.len(), 2);
let first = &turns[0];
assert_eq!(first.id, "turn-1");
assert_eq!(first.status, TurnStatus::Completed);
assert_eq!(first.items.len(), 3);
assert_eq!(
first.items[0],
ThreadItem::UserMessage {
id: "item-1".into(),
content: vec![
UserInput::Text {
text: "First turn".into(),
},
UserInput::Image {
url: "https://example.com/one.png".into(),
}
],
}
);
assert_eq!(
first.items[1],
ThreadItem::AgentMessage {
id: "item-2".into(),
text: "Hi there".into(),
}
);
assert_eq!(
first.items[2],
ThreadItem::Reasoning {
id: "item-3".into(),
summary: vec!["thinking".into()],
content: vec!["full reasoning".into()],
}
);
let second = &turns[1];
assert_eq!(second.id, "turn-2");
assert_eq!(second.items.len(), 2);
assert_eq!(
second.items[0],
ThreadItem::UserMessage {
id: "item-4".into(),
content: vec![UserInput::Text {
text: "Second turn".into()
}],
}
);
assert_eq!(
second.items[1],
ThreadItem::AgentMessage {
id: "item-5".into(),
text: "Reply two".into(),
}
);
}
#[test]
fn splits_reasoning_when_interleaved() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
message: "Turn start".into(),
images: None,
}),
EventMsg::AgentReasoning(AgentReasoningEvent {
text: "first summary".into(),
}),
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
text: "first content".into(),
}),
EventMsg::AgentMessage(AgentMessageEvent {
message: "interlude".into(),
}),
EventMsg::AgentReasoning(AgentReasoningEvent {
text: "second summary".into(),
}),
];
let turns = build_turns_from_event_msgs(&events);
assert_eq!(turns.len(), 1);
let turn = &turns[0];
assert_eq!(turn.items.len(), 4);
assert_eq!(
turn.items[1],
ThreadItem::Reasoning {
id: "item-2".into(),
summary: vec!["first summary".into()],
content: vec!["first content".into()],
}
);
assert_eq!(
turn.items[3],
ThreadItem::Reasoning {
id: "item-4".into(),
summary: vec!["second summary".into()],
content: Vec::new(),
}
);
}
#[test]
fn marks_turn_as_interrupted_when_aborted() {
let events = vec![
EventMsg::UserMessage(UserMessageEvent {
message: "Please do the thing".into(),
images: None,
}),
EventMsg::AgentMessage(AgentMessageEvent {
message: "Working...".into(),
}),
EventMsg::TurnAborted(TurnAbortedEvent {
reason: TurnAbortReason::Replaced,
}),
EventMsg::UserMessage(UserMessageEvent {
message: "Let's try again".into(),
images: None,
}),
EventMsg::AgentMessage(AgentMessageEvent {
message: "Second attempt complete.".into(),
}),
];
let turns = build_turns_from_event_msgs(&events);
assert_eq!(turns.len(), 2);
let first_turn = &turns[0];
assert_eq!(first_turn.status, TurnStatus::Interrupted);
assert_eq!(first_turn.items.len(), 2);
assert_eq!(
first_turn.items[0],
ThreadItem::UserMessage {
id: "item-1".into(),
content: vec![UserInput::Text {
text: "Please do the thing".into()
}],
}
);
assert_eq!(
first_turn.items[1],
ThreadItem::AgentMessage {
id: "item-2".into(),
text: "Working...".into(),
}
);
let second_turn = &turns[1];
assert_eq!(second_turn.status, TurnStatus::Completed);
assert_eq!(second_turn.items.len(), 2);
assert_eq!(
second_turn.items[0],
ThreadItem::UserMessage {
id: "item-3".into(),
content: vec![UserInput::Text {
text: "Let's try again".into()
}],
}
);
assert_eq!(
second_turn.items[1],
ThreadItem::AgentMessage {
id: "item-4".into(),
text: "Second attempt complete.".into(),
}
);
}
}

View File

@@ -517,6 +517,10 @@ pub struct Thread {
pub created_at: i64,
/// [UNSTABLE] Path to the thread on disk.
pub path: PathBuf,
/// Only populated on a `thread/resume` response.
/// For all other responses and notifications returning a Thread,
/// the turns field will be an empty list.
pub turns: Vec<Turn>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@@ -531,8 +535,9 @@ pub struct AccountUpdatedNotification {
#[ts(export_to = "v2/")]
pub struct Turn {
pub id: String,
/// This is currently only populated for resumed threads.
/// TODO: properly populate items for all turns.
/// Only populated on a `thread/resume` response.
/// For all other responses and notifications returning a Turn,
/// the items field will be an empty list.
pub items: Vec<ThreadItem>,
#[serde(flatten)]
pub status: TurnStatus,

View File

@@ -91,6 +91,7 @@ use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_app_server_protocol::UserSavedConfig;
use codex_app_server_protocol::build_turns_from_event_msgs;
use codex_backend_client::Client as BackendClient;
use codex_core::AuthManager;
use codex_core::CodexConversation;
@@ -1652,6 +1653,11 @@ impl CodexMessageProcessor {
session_configured,
..
}) => {
let SessionConfiguredEvent {
rollout_path,
initial_messages,
..
} = session_configured;
// Auto-attach a conversation listener when resuming a thread.
if let Err(err) = self
.attach_conversation_listener(conversation_id, false, ApiVersion::V2)
@@ -1664,8 +1670,8 @@ impl CodexMessageProcessor {
);
}
let thread = match read_summary_from_rollout(
session_configured.rollout_path.as_path(),
let mut thread = match read_summary_from_rollout(
rollout_path.as_path(),
fallback_model_provider.as_str(),
)
.await
@@ -1676,13 +1682,17 @@ impl CodexMessageProcessor {
request_id,
format!(
"failed to load rollout `{}` for conversation {conversation_id}: {err}",
session_configured.rollout_path.display()
rollout_path.display()
),
)
.await;
return;
}
};
thread.turns = initial_messages
.as_deref()
.map_or_else(Vec::new, build_turns_from_event_msgs);
let response = ThreadResumeResponse {
thread,
model: session_configured.model,
@@ -1692,6 +1702,7 @@ impl CodexMessageProcessor {
sandbox: session_configured.sandbox_policy.into(),
reasoning_effort: session_configured.reasoning_effort,
};
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
@@ -2983,6 +2994,7 @@ fn summary_to_thread(summary: ConversationSummary) -> Thread {
model_provider,
created_at: created_at.map(|dt| dt.timestamp()).unwrap_or(0),
path,
turns: Vec::new(),
}
}

View File

@@ -1,13 +1,17 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use tempfile::TempDir;
@@ -58,6 +62,65 @@ async fn thread_resume_returns_original_thread() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_resume_returns_rollout_history() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let preview = "Saved user message";
let conversation_id = create_fake_rollout(
codex_home.path(),
"2025-01-05T12-00-00",
"2025-01-05T12:00:00Z",
preview,
Some("mock_provider"),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: conversation_id.clone(),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(thread.id, conversation_id);
assert_eq!(thread.preview, preview);
assert_eq!(thread.model_provider, "mock_provider");
assert!(thread.path.is_absolute());
assert_eq!(
thread.turns.len(),
1,
"expected rollouts to include one turn"
);
let turn = &thread.turns[0];
assert_eq!(turn.status, TurnStatus::Completed);
assert_eq!(turn.items.len(), 1, "expected user message item");
match &turn.items[0] {
ThreadItem::UserMessage { content, .. } => {
assert_eq!(
content,
&vec![UserInput::Text {
text: preview.to_string()
}]
);
}
other => panic!("expected user message item, got {other:?}"),
}
Ok(())
}
#[tokio::test]
async fn thread_resume_prefers_path_over_thread_id() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;