codex: route metadata updates through ThreadStore (#20576)

- Route `thread/metadata/update` through
`ThreadStore::update_thread_metadata`.
- Add `LocalThreadStore` git metadata patch support for set, partial
update, and clear semantics.
- Add some unit tests for the new thread store code
- Remove a lot of dead code/tests!
This commit is contained in:
Tom
2026-05-04 20:09:41 -07:00
committed by GitHub
parent 0d418f478d
commit 707e51bd8b
8 changed files with 677 additions and 193 deletions

View File

@@ -11,6 +11,7 @@ use crate::CreateThreadParams;
use crate::LoadThreadHistoryParams;
use crate::LocalThreadStore;
use crate::ResumeThreadParams;
use crate::StoredThread;
use crate::StoredThreadHistory;
use crate::ThreadMetadataPatch;
use crate::ThreadStore;
@@ -157,6 +158,20 @@ impl LiveThread {
Ok(())
}
pub async fn update_metadata(
&self,
patch: ThreadMetadataPatch,
include_archived: bool,
) -> ThreadStoreResult<StoredThread> {
self.thread_store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id: self.thread_id,
patch,
include_archived,
})
.await
}
/// Returns the live local rollout path for legacy local-only callers.
///
/// Remote stores do not expose rollout files, so they return `Ok(None)`.

View File

@@ -1,7 +1,9 @@
use std::path::Path;
use std::path::PathBuf;
use codex_protocol::ThreadId;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::GitInfo;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::ThreadMemoryMode;
use codex_protocol::protocol::ThreadNameUpdatedEvent;
@@ -13,7 +15,9 @@ use codex_rollout::find_thread_path_by_id_str;
use codex_rollout::read_session_meta_line;
use super::LocalThreadStore;
use super::helpers::git_info_from_parts;
use super::live_writer;
use crate::GitInfoPatch;
use crate::ReadThreadParams;
use crate::StoredThread;
use crate::ThreadStoreError;
@@ -30,13 +34,10 @@ pub(super) async fn update_thread_metadata(
store: &LocalThreadStore,
params: UpdateThreadMetadataParams,
) -> ThreadStoreResult<StoredThread> {
if params.patch.git_info.is_some() {
return Err(ThreadStoreError::Internal {
message: "local thread store does not implement git metadata updates in this slice"
.to_string(),
});
}
if params.patch.name.is_some() && params.patch.memory_mode.is_some() {
let field_count = usize::from(params.patch.name.is_some())
+ usize::from(params.patch.memory_mode.is_some())
+ usize::from(params.patch.git_info.is_some());
if field_count > 1 {
return Err(ThreadStoreError::InvalidRequest {
message: "local thread store applies one metadata field per patch in this slice"
.to_string(),
@@ -44,8 +45,12 @@ pub(super) async fn update_thread_metadata(
}
let thread_id = params.thread_id;
if live_writer::rollout_path(store, thread_id).await.is_ok() {
live_writer::persist_thread(store, thread_id).await?;
}
let resolved_rollout_path =
resolve_rollout_path(store, thread_id, params.include_archived).await?;
let git_info = params.patch.git_info;
if let Some(name) = params.patch.name {
apply_thread_name(store, resolved_rollout_path.path.as_path(), thread_id, name).await?;
}
@@ -66,7 +71,59 @@ pub(super) async fn update_thread_metadata(
)
.await;
match read_thread::read_thread(
let resolved_git_info = match git_info {
Some(git_info) => {
let Some(state_db) = store.state_db().await else {
return Err(ThreadStoreError::Internal {
message: format!("sqlite state db unavailable for thread {thread_id}"),
});
};
let metadata =
state_db
.get_thread(thread_id)
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!(
"failed to read git metadata for thread {thread_id}: {err}"
),
})?;
let Some(metadata) = metadata else {
return Err(ThreadStoreError::Internal {
message: format!("thread metadata unavailable before git update: {thread_id}"),
});
};
let memory_mode = state_db
.get_thread_memory_mode(thread_id)
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to read memory mode for thread {thread_id}: {err}"),
})?;
let existing_git_info = git_info_from_parts(
metadata.git_sha,
metadata.git_branch,
metadata.git_origin_url,
);
Some((
resolve_git_info_patch(existing_git_info, git_info),
memory_mode,
))
}
None => None,
};
if let Some(((sha, branch, origin_url), memory_mode)) = resolved_git_info.as_ref() {
apply_thread_git_info_to_rollout(
resolved_rollout_path.path.as_path(),
thread_id,
sha,
branch,
origin_url,
memory_mode.as_deref(),
)
.await?;
apply_thread_git_info(store, thread_id, sha, branch, origin_url).await?;
}
let mut thread = match read_thread::read_thread(
store,
ReadThreadParams {
thread_id,
@@ -76,7 +133,7 @@ pub(super) async fn update_thread_metadata(
)
.await
{
Ok(thread) => Ok(thread),
Ok(thread) => thread,
Err(_) => {
read_thread::read_thread_by_rollout_path(
store,
@@ -84,14 +141,104 @@ pub(super) async fn update_thread_metadata(
params.include_archived,
/*include_history*/ false,
)
.await
.await?
}
};
if let Some(((sha, branch, origin_url), _memory_mode)) = resolved_git_info {
thread.git_info = git_info_from_parts(sha, branch, origin_url);
}
Ok(thread)
}
async fn apply_thread_git_info(
store: &LocalThreadStore,
thread_id: ThreadId,
sha: &Option<String>,
branch: &Option<String>,
origin_url: &Option<String>,
) -> ThreadStoreResult<()> {
let Some(state_db) = store.state_db().await else {
return Err(ThreadStoreError::Internal {
message: format!("sqlite state db unavailable for thread {thread_id}"),
});
};
let updated = state_db
.update_thread_git_info(
thread_id,
Some(sha.as_deref()),
Some(branch.as_deref()),
Some(origin_url.as_deref()),
)
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to update git metadata for thread {thread_id}: {err}"),
})?;
if updated {
Ok(())
} else {
Err(ThreadStoreError::Internal {
message: format!("thread metadata disappeared before update completed: {thread_id}"),
})
}
}
fn resolve_git_info_patch(
existing: Option<GitInfo>,
git_info: GitInfoPatch,
) -> (Option<String>, Option<String>, Option<String>) {
let (existing_sha, existing_branch, existing_origin_url) = match existing {
Some(info) => (
info.commit_hash.map(|sha| sha.0),
info.branch,
info.repository_url,
),
None => (None, None, None),
};
let sha = git_info.sha.unwrap_or(existing_sha);
let branch = git_info.branch.unwrap_or(existing_branch);
let origin_url = git_info.origin_url.unwrap_or(existing_origin_url);
(sha, branch, origin_url)
}
async fn apply_thread_git_info_to_rollout(
rollout_path: &Path,
thread_id: ThreadId,
sha: &Option<String>,
branch: &Option<String>,
origin_url: &Option<String>,
memory_mode: Option<&str>,
) -> ThreadStoreResult<()> {
let mut session_meta =
read_session_meta_line(rollout_path)
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to set thread git metadata: {err}"),
})?;
if session_meta.meta.id != thread_id {
return Err(ThreadStoreError::Internal {
message: format!(
"failed to set thread git metadata: rollout session metadata id mismatch: expected {thread_id}, found {}",
session_meta.meta.id
),
});
}
session_meta.git = Some(GitInfo {
commit_hash: sha.as_deref().map(codex_git_utils::GitSha::new),
branch: branch.clone(),
repository_url: origin_url.clone(),
});
session_meta.meta.memory_mode = memory_mode.map(str::to_string);
append_rollout_item_to_path(rollout_path, &RolloutItem::SessionMeta(session_meta))
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to set thread git metadata: {err}"),
})
}
async fn apply_thread_name(
store: &LocalThreadStore,
rollout_path: &std::path::Path,
rollout_path: &Path,
thread_id: ThreadId,
name: String,
) -> ThreadStoreResult<()> {
@@ -113,7 +260,7 @@ async fn apply_thread_name(
}
async fn apply_thread_memory_mode(
rollout_path: &std::path::Path,
rollout_path: &Path,
thread_id: ThreadId,
memory_mode: ThreadMemoryMode,
) -> ThreadStoreResult<()> {
@@ -132,6 +279,9 @@ async fn apply_thread_memory_mode(
});
}
// Memory-mode updates should not modify git metadata. The rollout replay
// code will preserve the latest prior git marker when this field is absent.
session_meta.git = None;
session_meta.meta.memory_mode = Some(memory_mode_as_str(memory_mode).to_string());
append_rollout_item_to_path(rollout_path, &RolloutItem::SessionMeta(session_meta))
.await
@@ -196,7 +346,7 @@ async fn resolve_rollout_path(
})
}
fn rollout_path_is_archived(store: &LocalThreadStore, path: &std::path::Path) -> bool {
fn rollout_path_is_archived(store: &LocalThreadStore, path: &Path) -> bool {
path.starts_with(store.config.codex_home.join(ARCHIVED_SESSIONS_SUBDIR))
}
@@ -204,10 +354,12 @@ fn rollout_path_is_archived(store: &LocalThreadStore, path: &std::path::Path) ->
mod tests {
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use tempfile::TempDir;
use uuid::Uuid;
use super::*;
use crate::GitInfoPatch;
use crate::ResumeThreadParams;
use crate::ThreadEventPersistenceMode;
use crate::ThreadMetadataPatch;
@@ -292,6 +444,75 @@ mod tests {
assert_eq!(memory_mode.as_deref(), Some("disabled"));
}
#[tokio::test]
async fn update_thread_metadata_preserves_memory_mode_when_updating_git_info() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let uuid = Uuid::from_u128(312);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let path =
write_session_file(home.path(), "2025-01-03T18-30-00", uuid).expect("session file");
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
memory_mode: Some(ThreadMemoryMode::Disabled),
..Default::default()
},
include_archived: false,
})
.await
.expect("set memory mode");
let thread = store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
git_info: Some(GitInfoPatch {
branch: Some(Some("feature".to_string())),
..Default::default()
}),
..Default::default()
},
include_archived: false,
})
.await
.expect("set git metadata");
assert_eq!(
thread.git_info.expect("git info").branch.as_deref(),
Some("feature")
);
let appended = last_rollout_item(path.as_path());
assert_eq!(appended["type"], "session_meta");
assert_eq!(appended["payload"]["memory_mode"], "disabled");
assert_eq!(appended["payload"]["git"]["branch"], "feature");
codex_rollout::state_db::reconcile_rollout(
Some(runtime.as_ref()),
path.as_path(),
config.default_model_provider_id.as_str(),
/*builder*/ None,
&[],
/*archived_only*/ None,
/*new_thread_memory_mode*/ None,
)
.await;
let memory_mode = runtime
.get_thread_memory_mode(thread_id)
.await
.expect("thread memory mode should be readable");
assert_eq!(memory_mode.as_deref(), Some("disabled"));
}
#[tokio::test]
async fn update_thread_metadata_uses_live_rollout_path_for_external_resume() {
let home = TempDir::new().expect("temp dir");
@@ -333,6 +554,278 @@ mod tests {
assert_eq!(appended["payload"]["memory_mode"], "disabled");
}
#[tokio::test]
async fn update_thread_metadata_sets_git_info() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config, Some(runtime));
let uuid = Uuid::from_u128(309);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
write_session_file(home.path(), "2025-01-03T17-00-00", uuid).expect("session file");
let thread = store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
git_info: Some(GitInfoPatch {
sha: Some(Some("abc123".to_string())),
branch: Some(Some("main".to_string())),
origin_url: Some(Some("https://github.com/openai/codex".to_string())),
}),
..Default::default()
},
include_archived: false,
})
.await
.expect("set git metadata");
let git_info = thread.git_info.expect("git info should be present");
assert_eq!(
git_info.commit_hash.as_ref().map(|sha| sha.0.as_str()),
Some("abc123")
);
assert_eq!(git_info.branch.as_deref(), Some("main"));
assert_eq!(
git_info.repository_url.as_deref(),
Some("https://github.com/openai/codex")
);
}
#[tokio::test]
async fn update_thread_metadata_partially_updates_git_info() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config, Some(runtime));
let uuid = Uuid::from_u128(310);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
write_session_file(home.path(), "2025-01-03T17-30-00", uuid).expect("session file");
store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
git_info: Some(GitInfoPatch {
sha: Some(Some("abc123".to_string())),
branch: Some(Some("main".to_string())),
origin_url: Some(Some("https://github.com/openai/codex".to_string())),
}),
..Default::default()
},
include_archived: false,
})
.await
.expect("seed git metadata");
let thread = store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
git_info: Some(GitInfoPatch {
branch: Some(Some("feature".to_string())),
..Default::default()
}),
..Default::default()
},
include_archived: false,
})
.await
.expect("partially update git metadata");
let git_info = thread.git_info.expect("git info should be present");
assert_eq!(
git_info.commit_hash.as_ref().map(|sha| sha.0.as_str()),
Some("abc123")
);
assert_eq!(git_info.branch.as_deref(), Some("feature"));
assert_eq!(
git_info.repository_url.as_deref(),
Some("https://github.com/openai/codex")
);
}
#[tokio::test]
async fn update_thread_metadata_clears_git_info_fields() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let uuid = Uuid::from_u128(311);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let path =
write_session_file(home.path(), "2025-01-03T18-00-00", uuid).expect("session file");
store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
git_info: Some(GitInfoPatch {
sha: Some(Some("abc123".to_string())),
branch: Some(Some("main".to_string())),
origin_url: Some(Some("https://github.com/openai/codex".to_string())),
}),
..Default::default()
},
include_archived: false,
})
.await
.expect("seed git metadata");
let thread = store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
git_info: Some(GitInfoPatch {
sha: Some(None),
branch: Some(None),
origin_url: Some(None),
}),
..Default::default()
},
include_archived: false,
})
.await
.expect("clear git metadata");
assert!(thread.git_info.is_none());
let appended = last_rollout_item(path.as_path());
assert_eq!(appended["type"], "session_meta");
assert_eq!(appended["payload"]["git"], json!({}));
codex_rollout::state_db::reconcile_rollout(
Some(runtime.as_ref()),
path.as_path(),
config.default_model_provider_id.as_str(),
/*builder*/ None,
&[],
/*archived_only*/ None,
/*new_thread_memory_mode*/ None,
)
.await;
let thread = store
.read_thread(ReadThreadParams {
thread_id,
include_archived: false,
include_history: false,
})
.await
.expect("read thread after reconcile");
assert!(thread.git_info.is_none());
store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
memory_mode: Some(ThreadMemoryMode::Disabled),
..Default::default()
},
include_archived: false,
})
.await
.expect("set memory mode after git clear");
let appended = last_rollout_item(path.as_path());
assert_eq!(appended["type"], "session_meta");
assert_eq!(appended["payload"].get("git"), None);
codex_rollout::state_db::reconcile_rollout(
Some(runtime.as_ref()),
path.as_path(),
config.default_model_provider_id.as_str(),
/*builder*/ None,
&[],
/*archived_only*/ None,
/*new_thread_memory_mode*/ None,
)
.await;
let thread = store
.read_thread(ReadThreadParams {
thread_id,
include_archived: false,
include_history: false,
})
.await
.expect("read thread after memory mode update with no git");
assert!(thread.git_info.is_none());
assert_eq!(
runtime
.delete_thread(thread_id)
.await
.expect("delete sqlite thread row"),
1
);
let thread = store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
git_info: Some(GitInfoPatch {
branch: Some(Some("feature".to_string())),
..Default::default()
}),
..Default::default()
},
include_archived: false,
})
.await
.expect("partially update after clear with missing sqlite row");
let git_info = thread.git_info.expect("branch should be present");
assert_eq!(git_info.commit_hash, None);
assert_eq!(git_info.branch.as_deref(), Some("feature"));
assert_eq!(git_info.repository_url, None);
store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
memory_mode: Some(ThreadMemoryMode::Disabled),
..Default::default()
},
include_archived: false,
})
.await
.expect("set memory mode after git clear and partial update");
let appended = last_rollout_item(path.as_path());
assert_eq!(appended["type"], "session_meta");
assert_eq!(appended["payload"].get("git"), None);
codex_rollout::state_db::reconcile_rollout(
Some(runtime.as_ref()),
path.as_path(),
config.default_model_provider_id.as_str(),
/*builder*/ None,
&[],
/*archived_only*/ None,
/*new_thread_memory_mode*/ None,
)
.await;
let thread = store
.read_thread(ReadThreadParams {
thread_id,
include_archived: false,
include_history: false,
})
.await
.expect("read thread after memory mode update");
let git_info = thread.git_info.expect("branch should remain present");
assert_eq!(git_info.commit_hash, None);
assert_eq!(git_info.branch.as_deref(), Some("feature"));
assert_eq!(git_info.repository_url, None);
}
#[tokio::test]
async fn update_thread_metadata_rejects_mismatched_session_meta_id() {
let home = TempDir::new().expect("temp dir");