Preserve thread metadata on rollback and unarchive

This commit is contained in:
Guinness Chen
2026-03-25 11:25:54 -07:00
parent 97af7a0649
commit 65b547f643
4 changed files with 73 additions and 3 deletions

View File

@@ -1,4 +1,5 @@
use crate::codex_message_processor::ApiVersion;
use crate::codex_message_processor::merge_thread_metadata_from_state_db_context;
use crate::codex_message_processor::read_rollout_items_from_rollout;
use crate::codex_message_processor::read_summary_from_rollout;
use crate::codex_message_processor::summary_to_thread;
@@ -1810,6 +1811,12 @@ pub(crate) async fn apply_bespoke_event_handling(
{
Ok(summary) => {
let mut thread = summary_to_thread(summary);
merge_thread_metadata_from_state_db_context(
&mut thread,
conversation.state_db().as_ref(),
conversation_id,
)
.await;
match read_rollout_items_from_rollout(rollout_path.as_path()).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);

View File

@@ -2965,20 +2965,27 @@ impl CodexMessageProcessor {
message: format!("failed to update unarchived thread timestamp: {err}"),
data: None,
})?;
if let Some(ctx) = state_db_ctx {
if let Some(ref ctx) = state_db_ctx {
let _ = ctx
.mark_unarchived(thread_id, restored_path.as_path())
.await;
}
let summary =
let mut thread =
read_summary_from_rollout(restored_path.as_path(), fallback_provider.as_str())
.await
.map(summary_to_thread)
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to read unarchived thread: {err}"),
data: None,
})?;
Ok(summary_to_thread(summary))
merge_thread_metadata_from_state_db_context(
&mut thread,
state_db_ctx.as_ref(),
thread_id,
)
.await;
Ok(thread)
}
.await;
@@ -8144,6 +8151,18 @@ async fn read_thread_metadata_from_state_db_context_by_thread_id(
}
}
pub(crate) async fn merge_thread_metadata_from_state_db_context(
thread: &mut Thread,
state_db_ctx: Option<&StateDbHandle>,
thread_id: ThreadId,
) {
if let Some(persisted_metadata) =
read_thread_metadata_from_state_db_context_by_thread_id(state_db_ctx, thread_id).await
{
merge_persisted_thread_metadata(thread, &persisted_metadata);
}
}
async fn read_summary_from_state_db_by_thread_id(
config: &Config,
thread_id: ThreadId,

View File

@@ -6,6 +6,7 @@ use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadMetadataUpdateParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadRollbackParams;
@@ -96,6 +97,23 @@ async fn thread_rollback_drops_last_turns_and_persists_to_rollout() -> Result<()
)
.await??;
let metadata_update_id = mcp
.send_thread_metadata_update_request(ThreadMetadataUpdateParams {
thread_id: thread.id.clone(),
metadata: Some(
[("source".to_string(), "rollback-test".to_string())]
.into_iter()
.collect(),
),
git_info: None,
})
.await?;
let _metadata_update_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(metadata_update_id)),
)
.await??;
// Roll back the last turn.
let rollback_id = mcp
.send_thread_rollback_request(ThreadRollbackParams {
@@ -127,6 +145,10 @@ async fn thread_rollback_drops_last_turns_and_persists_to_rollout() -> Result<()
assert_eq!(rolled_back_thread.turns.len(), 1);
assert_eq!(rolled_back_thread.status, ThreadStatus::Idle);
assert_eq!(
rolled_back_thread.metadata.get("source"),
Some(&"rollback-test".to_string())
);
assert_eq!(rolled_back_thread.turns[0].items.len(), 2);
match &rolled_back_thread.turns[0].items[0] {
ThreadItem::UserMessage { content, .. } => {

View File

@@ -6,6 +6,7 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadMetadataUpdateParams;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStatus;
@@ -80,6 +81,23 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
.expect("expected rollout path for thread id to exist");
assert_paths_match_on_disk(&found_rollout_path, &rollout_path)?;
let metadata_update_id = mcp
.send_thread_metadata_update_request(ThreadMetadataUpdateParams {
thread_id: thread.id.clone(),
metadata: Some(
[("source".to_string(), "unarchive-test".to_string())]
.into_iter()
.collect(),
),
git_info: None,
})
.await?;
let _metadata_update_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(metadata_update_id)),
)
.await??;
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: thread.id.clone(),
@@ -141,6 +159,10 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
"expected updated_at to be bumped on unarchive"
);
assert_eq!(unarchived_thread.status, ThreadStatus::NotLoaded);
assert_eq!(
unarchived_thread.metadata.get("source"),
Some(&"unarchive-test".to_string())
);
// Wire contract: thread title field is `name`, serialized as null when unset.
let thread_json = unarchive_result