diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 55b3aeef13..ff88a076e3 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -263,7 +263,6 @@ use codex_core::exec::ExecCapturePolicy; use codex_core::exec::ExecExpiration; use codex_core::exec::ExecParams; use codex_core::exec_env::create_env; -use codex_core::find_archived_thread_path_by_id_str; use codex_core::find_thread_name_by_id; use codex_core::find_thread_path_by_id_str; use codex_core::path_utils; @@ -370,11 +369,10 @@ use codex_rollout::EventPersistenceMode; use codex_rollout::is_persisted_rollout_item; use codex_rollout::state_db::StateDbHandle; use codex_rollout::state_db::reconcile_rollout; -use codex_state::StateRuntime; use codex_state::ThreadMetadata; -use codex_state::ThreadMetadataBuilder; use codex_state::log_db::LogDbLayer; use codex_thread_store::ArchiveThreadParams as StoreArchiveThreadParams; +use codex_thread_store::GitInfoPatch as StoreGitInfoPatch; use codex_thread_store::ListThreadsParams as StoreListThreadsParams; use codex_thread_store::LocalThreadStore; use codex_thread_store::ReadThreadByRolloutPathParams as StoreReadThreadByRolloutPathParams; diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index 40be4f34ef..75f79b808c 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -1440,54 +1440,49 @@ impl ThreadRequestProcessor { return Err(invalid_request("gitInfo must include at least one field")); } - let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?; - let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok(); - let mut state_db_ctx = loaded_thread.as_ref().and_then(|thread| thread.state_db()); - if state_db_ctx.is_none() { - state_db_ctx = self.state_db.clone(); - } - let Some(state_db_ctx) = state_db_ctx else { - return Err(internal_error(format!( - "sqlite state db unavailable for thread {thread_uuid}" - ))); - }; - - self.ensure_thread_metadata_row_exists(thread_uuid, &state_db_ctx, loaded_thread.as_ref()) - .await?; - let git_sha = Self::normalize_thread_metadata_git_field(sha, "gitInfo.sha")?; let git_branch = Self::normalize_thread_metadata_git_field(branch, "gitInfo.branch")?; let git_origin_url = Self::normalize_thread_metadata_git_field(origin_url, "gitInfo.originUrl")?; - let updated = state_db_ctx - .update_thread_git_info( - thread_uuid, - git_sha.as_ref().map(|value| value.as_deref()), - git_branch.as_ref().map(|value| value.as_deref()), - git_origin_url.as_ref().map(|value| value.as_deref()), - ) - .await - .map_err(|err| { - internal_error(format!( - "failed to update thread metadata for {thread_uuid}: {err}" - )) - })?; - if !updated { - return Err(internal_error(format!( - "thread metadata disappeared before update completed: {thread_uuid}" - ))); - } - - let Some(summary) = - read_summary_from_state_db_context_by_thread_id(Some(&state_db_ctx), thread_uuid).await - else { - return Err(internal_error(format!( - "failed to reload updated thread metadata for {thread_uuid}" - ))); + let patch = StoreThreadMetadataPatch { + git_info: Some(StoreGitInfoPatch { + sha: git_sha, + branch: git_branch, + origin_url: git_origin_url, + }), + ..Default::default() }; - let mut thread = summary_to_thread(summary, &self.config.cwd); + let updated_thread = { + let _thread_list_state_permit = self.acquire_thread_list_state_permit().await?; + let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok(); + if let Some(loaded_thread) = loaded_thread.as_ref() { + if loaded_thread.config_snapshot().await.ephemeral { + return Err(invalid_request(format!( + "ephemeral thread does not support metadata updates: {thread_id}" + ))); + } + loaded_thread + .update_thread_metadata(patch, /*include_archived*/ true) + .await + } else { + self.thread_store + .update_thread_metadata(StoreUpdateThreadMetadataParams { + thread_id: thread_uuid, + patch, + include_archived: true, + }) + .await + } + .map_err(|err| thread_store_write_error("update thread metadata", err))? + }; + + let (mut thread, _) = thread_from_stored_thread( + updated_thread, + self.config.model_provider_id.as_str(), + &self.config.cwd, + ); self.attach_thread_name(thread_uuid, &mut thread).await; thread.status = resolve_thread_status( self.thread_watch_manager @@ -1516,126 +1511,6 @@ impl ThreadRequestProcessor { } } - async fn ensure_thread_metadata_row_exists( - &self, - thread_uuid: ThreadId, - state_db_ctx: &Arc, - loaded_thread: Option<&Arc>, - ) -> Result<(), JSONRPCErrorError> { - match state_db_ctx.get_thread(thread_uuid).await { - Ok(Some(_)) => return Ok(()), - Ok(None) => {} - Err(err) => { - return Err(internal_error(format!( - "failed to load thread metadata for {thread_uuid}: {err}" - ))); - } - } - - if let Some(thread) = loaded_thread { - let Some(rollout_path) = thread.rollout_path() else { - return Err(invalid_request(format!( - "ephemeral thread does not support metadata updates: {thread_uuid}" - ))); - }; - - reconcile_rollout( - Some(state_db_ctx), - rollout_path.as_path(), - self.config.model_provider_id.as_str(), - /*builder*/ None, - &[], - /*archived_only*/ None, - /*new_thread_memory_mode*/ None, - ) - .await; - - match state_db_ctx.get_thread(thread_uuid).await { - Ok(Some(_)) => return Ok(()), - Ok(None) => {} - Err(err) => { - return Err(internal_error(format!( - "failed to load reconciled thread metadata for {thread_uuid}: {err}" - ))); - } - } - - let config_snapshot = thread.config_snapshot().await; - let model_provider = config_snapshot.model_provider_id.clone(); - let mut builder = ThreadMetadataBuilder::new( - thread_uuid, - rollout_path, - Utc::now(), - config_snapshot.session_source.clone(), - ); - builder.model_provider = Some(model_provider.clone()); - builder.cwd = config_snapshot.cwd.to_path_buf(); - builder.cli_version = Some(env!("CARGO_PKG_VERSION").to_string()); - builder.sandbox_policy = config_snapshot.sandbox_policy(); - builder.approval_mode = config_snapshot.approval_policy; - let metadata = builder.build(model_provider.as_str()); - if let Err(err) = state_db_ctx.insert_thread_if_absent(&metadata).await { - return Err(internal_error(format!( - "failed to create thread metadata for {thread_uuid}: {err}" - ))); - } - return Ok(()); - } - - let rollout_path = match find_thread_path_by_id_str( - &self.config.codex_home, - &thread_uuid.to_string(), - self.state_db.as_deref(), - ) - .await - { - Ok(Some(path)) => path, - Ok(None) => match find_archived_thread_path_by_id_str( - &self.config.codex_home, - &thread_uuid.to_string(), - self.state_db.as_deref(), - ) - .await - { - Ok(Some(path)) => path, - Ok(None) => { - return Err(invalid_request(format!("thread not found: {thread_uuid}"))); - } - Err(err) => { - return Err(internal_error(format!( - "failed to locate archived thread id {thread_uuid}: {err}" - ))); - } - }, - Err(err) => { - return Err(internal_error(format!( - "failed to locate thread id {thread_uuid}: {err}" - ))); - } - }; - - reconcile_rollout( - Some(state_db_ctx), - rollout_path.as_path(), - self.config.model_provider_id.as_str(), - /*builder*/ None, - &[], - /*archived_only*/ None, - /*new_thread_memory_mode*/ None, - ) - .await; - - match state_db_ctx.get_thread(thread_uuid).await { - Ok(Some(_)) => Ok(()), - Ok(None) => Err(internal_error(format!( - "failed to create thread metadata from rollout for {thread_uuid}" - ))), - Err(err) => Err(internal_error(format!( - "failed to load reconciled thread metadata for {thread_uuid}: {err}" - ))), - } - } - async fn thread_unarchive_inner( &self, params: ThreadUnarchiveParams, @@ -3675,19 +3550,6 @@ fn thread_store_archive_error(operation: &str, err: ThreadStoreError) -> JSONRPC } } -async fn read_summary_from_state_db_context_by_thread_id( - state_db_ctx: Option<&StateDbHandle>, - thread_id: ThreadId, -) -> Option { - let state_db_ctx = state_db_ctx?; - - let metadata = match state_db_ctx.get_thread(thread_id).await { - Ok(Some(metadata)) => metadata, - Ok(None) | Err(_) => return None, - }; - Some(summary_from_thread_metadata(&metadata)) -} - async fn title_from_state_db( config: &Config, state_db_ctx: Option<&StateDbHandle>, @@ -3820,6 +3682,7 @@ fn summary_from_stored_thread( } #[allow(clippy::too_many_arguments)] +#[cfg(test)] fn summary_from_state_db_metadata( conversation_id: ThreadId, path: PathBuf, @@ -3864,6 +3727,7 @@ fn summary_from_state_db_metadata( } } +#[cfg(test)] fn summary_from_thread_metadata(metadata: &ThreadMetadata) -> ConversationSummary { summary_from_state_db_metadata( metadata.id, diff --git a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs index 14859d5938..6072fd4222 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor_tests.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor_tests.rs @@ -70,6 +70,7 @@ mod thread_processor_behavior_tests { use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; + use codex_state::ThreadMetadataBuilder; use codex_thread_store::StoredThread; use codex_utils_absolute_path::test_support::PathBufExt; use codex_utils_absolute_path::test_support::test_path_buf; diff --git a/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs b/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs index d06b22edc4..430f8e5392 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs @@ -33,6 +33,7 @@ use tempfile::TempDir; use tokio::time::timeout; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); +const INVALID_REQUEST_ERROR_CODE: i64 = -32600; #[tokio::test] async fn thread_metadata_update_patches_git_branch_and_returns_updated_thread() -> Result<()> { @@ -170,6 +171,57 @@ async fn thread_metadata_update_rejects_empty_git_info_patch() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_metadata_update_rejects_ephemeral_thread() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let start_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ephemeral: Some(true), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let update_id = mcp + .send_thread_metadata_update_request(ThreadMetadataUpdateParams { + thread_id: thread.id.clone(), + git_info: Some(ThreadMetadataGitInfoUpdateParams { + sha: None, + branch: Some(Some("feature/ephemeral".to_string())), + origin_url: None, + }), + }) + .await?; + let update_err: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(update_id)), + ) + .await??; + + assert_eq!(update_err.error.code, INVALID_REQUEST_ERROR_CODE); + assert_eq!( + update_err.error.message, + format!( + "ephemeral thread does not support metadata updates: {}", + thread.id + ) + ); + + Ok(()) +} + #[tokio::test] async fn thread_metadata_update_repairs_missing_sqlite_row_for_stored_thread() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index cc83c0a7c1..3065fcf4aa 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -32,7 +32,9 @@ use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::TokenUsageInfo; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::UserInput; +use codex_thread_store::StoredThread; use codex_thread_store::StoredThreadHistory; +use codex_thread_store::ThreadMetadataPatch; use codex_thread_store::ThreadStoreError; use codex_thread_store::ThreadStoreResult; use codex_utils_absolute_path::AbsolutePathBuf; @@ -411,6 +413,21 @@ impl CodexThread { live_thread.load_history(include_archived).await } + pub async fn update_thread_metadata( + &self, + patch: ThreadMetadataPatch, + include_archived: bool, + ) -> ThreadStoreResult { + let live_thread = self + .codex + .session + .live_thread_for_persistence("update thread metadata") + .map_err(|err| ThreadStoreError::Internal { + message: err.to_string(), + })?; + live_thread.update_metadata(patch, include_archived).await + } + pub fn state_db(&self) -> Option { self.codex.state_db() } diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 906a3bb39a..5188bc3bc0 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -670,6 +670,9 @@ WHERE id = ? creation_memory_mode: Option<&str>, ) -> anyhow::Result<()> { let updated_at = self.allocate_thread_updated_at(metadata.updated_at)?; + // Backfill/reconcile callers merge existing git info before upserting, but that + // read/modify/write is not atomic. Preserve non-null SQLite git fields here so + // an explicit metadata update cannot be lost if a stale rollout upsert lands later. sqlx::query( r#" INSERT INTO threads ( @@ -722,9 +725,9 @@ ON CONFLICT(id) DO UPDATE SET first_user_message = excluded.first_user_message, archived = excluded.archived, archived_at = excluded.archived_at, - git_sha = excluded.git_sha, - git_branch = excluded.git_branch, - git_origin_url = excluded.git_origin_url + git_sha = COALESCE(threads.git_sha, excluded.git_sha), + git_branch = COALESCE(threads.git_branch, excluded.git_branch), + git_origin_url = COALESCE(threads.git_origin_url, excluded.git_origin_url) "#, ) .bind(metadata.id.to_string()) @@ -1452,6 +1455,47 @@ mod tests { ); } + #[tokio::test] + async fn upsert_thread_preserves_existing_git_fields_atomically() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("state db should initialize"); + let thread_id = + ThreadId::from_string("00000000-0000-0000-0000-000000000458").expect("valid thread id"); + let mut metadata = test_thread_metadata(&codex_home, thread_id, codex_home.clone()); + metadata.git_sha = Some("sqlite-sha".to_string()); + metadata.git_branch = Some("sqlite-branch".to_string()); + metadata.git_origin_url = Some("git@example.com:openai/codex.git".to_string()); + + runtime + .upsert_thread(&metadata) + .await + .expect("initial upsert should succeed"); + + let mut rollout_metadata = metadata.clone(); + rollout_metadata.git_sha = Some("rollout-sha".to_string()); + rollout_metadata.git_branch = Some("rollout-branch".to_string()); + rollout_metadata.git_origin_url = Some("https://example.com/repo.git".to_string()); + + runtime + .upsert_thread(&rollout_metadata) + .await + .expect("rollout upsert should succeed"); + + let persisted = runtime + .get_thread(thread_id) + .await + .expect("thread should load") + .expect("thread should exist"); + assert_eq!(persisted.git_sha.as_deref(), Some("sqlite-sha")); + assert_eq!(persisted.git_branch.as_deref(), Some("sqlite-branch")); + assert_eq!( + persisted.git_origin_url.as_deref(), + Some("git@example.com:openai/codex.git") + ); + } + #[tokio::test] async fn update_thread_git_info_preserves_newer_non_git_metadata() { let codex_home = unique_temp_dir(); diff --git a/codex-rs/thread-store/src/live_thread.rs b/codex-rs/thread-store/src/live_thread.rs index bcce1c7645..6cbb1881f0 100644 --- a/codex-rs/thread-store/src/live_thread.rs +++ b/codex-rs/thread-store/src/live_thread.rs @@ -11,6 +11,7 @@ use crate::CreateThreadParams; use crate::LoadThreadHistoryParams; use crate::LocalThreadStore; use crate::ResumeThreadParams; +use crate::StoredThread; use crate::StoredThreadHistory; use crate::ThreadMetadataPatch; use crate::ThreadStore; @@ -157,6 +158,20 @@ impl LiveThread { Ok(()) } + pub async fn update_metadata( + &self, + patch: ThreadMetadataPatch, + include_archived: bool, + ) -> ThreadStoreResult { + self.thread_store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id: self.thread_id, + patch, + include_archived, + }) + .await + } + /// Returns the live local rollout path for legacy local-only callers. /// /// Remote stores do not expose rollout files, so they return `Ok(None)`. diff --git a/codex-rs/thread-store/src/local/update_thread_metadata.rs b/codex-rs/thread-store/src/local/update_thread_metadata.rs index 677aa3fdce..98ee788662 100644 --- a/codex-rs/thread-store/src/local/update_thread_metadata.rs +++ b/codex-rs/thread-store/src/local/update_thread_metadata.rs @@ -1,7 +1,9 @@ +use std::path::Path; use std::path::PathBuf; use codex_protocol::ThreadId; use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::GitInfo; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::ThreadNameUpdatedEvent; @@ -13,7 +15,9 @@ use codex_rollout::find_thread_path_by_id_str; use codex_rollout::read_session_meta_line; use super::LocalThreadStore; +use super::helpers::git_info_from_parts; use super::live_writer; +use crate::GitInfoPatch; use crate::ReadThreadParams; use crate::StoredThread; use crate::ThreadStoreError; @@ -30,13 +34,10 @@ pub(super) async fn update_thread_metadata( store: &LocalThreadStore, params: UpdateThreadMetadataParams, ) -> ThreadStoreResult { - if params.patch.git_info.is_some() { - return Err(ThreadStoreError::Internal { - message: "local thread store does not implement git metadata updates in this slice" - .to_string(), - }); - } - if params.patch.name.is_some() && params.patch.memory_mode.is_some() { + let field_count = usize::from(params.patch.name.is_some()) + + usize::from(params.patch.memory_mode.is_some()) + + usize::from(params.patch.git_info.is_some()); + if field_count > 1 { return Err(ThreadStoreError::InvalidRequest { message: "local thread store applies one metadata field per patch in this slice" .to_string(), @@ -44,8 +45,12 @@ pub(super) async fn update_thread_metadata( } let thread_id = params.thread_id; + if live_writer::rollout_path(store, thread_id).await.is_ok() { + live_writer::persist_thread(store, thread_id).await?; + } let resolved_rollout_path = resolve_rollout_path(store, thread_id, params.include_archived).await?; + let git_info = params.patch.git_info; if let Some(name) = params.patch.name { apply_thread_name(store, resolved_rollout_path.path.as_path(), thread_id, name).await?; } @@ -66,7 +71,59 @@ pub(super) async fn update_thread_metadata( ) .await; - match read_thread::read_thread( + let resolved_git_info = match git_info { + Some(git_info) => { + let Some(state_db) = store.state_db().await else { + return Err(ThreadStoreError::Internal { + message: format!("sqlite state db unavailable for thread {thread_id}"), + }); + }; + let metadata = + state_db + .get_thread(thread_id) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!( + "failed to read git metadata for thread {thread_id}: {err}" + ), + })?; + let Some(metadata) = metadata else { + return Err(ThreadStoreError::Internal { + message: format!("thread metadata unavailable before git update: {thread_id}"), + }); + }; + let memory_mode = state_db + .get_thread_memory_mode(thread_id) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!("failed to read memory mode for thread {thread_id}: {err}"), + })?; + let existing_git_info = git_info_from_parts( + metadata.git_sha, + metadata.git_branch, + metadata.git_origin_url, + ); + Some(( + resolve_git_info_patch(existing_git_info, git_info), + memory_mode, + )) + } + None => None, + }; + if let Some(((sha, branch, origin_url), memory_mode)) = resolved_git_info.as_ref() { + apply_thread_git_info_to_rollout( + resolved_rollout_path.path.as_path(), + thread_id, + sha, + branch, + origin_url, + memory_mode.as_deref(), + ) + .await?; + apply_thread_git_info(store, thread_id, sha, branch, origin_url).await?; + } + + let mut thread = match read_thread::read_thread( store, ReadThreadParams { thread_id, @@ -76,7 +133,7 @@ pub(super) async fn update_thread_metadata( ) .await { - Ok(thread) => Ok(thread), + Ok(thread) => thread, Err(_) => { read_thread::read_thread_by_rollout_path( store, @@ -84,14 +141,104 @@ pub(super) async fn update_thread_metadata( params.include_archived, /*include_history*/ false, ) - .await + .await? } + }; + if let Some(((sha, branch, origin_url), _memory_mode)) = resolved_git_info { + thread.git_info = git_info_from_parts(sha, branch, origin_url); } + Ok(thread) +} + +async fn apply_thread_git_info( + store: &LocalThreadStore, + thread_id: ThreadId, + sha: &Option, + branch: &Option, + origin_url: &Option, +) -> ThreadStoreResult<()> { + let Some(state_db) = store.state_db().await else { + return Err(ThreadStoreError::Internal { + message: format!("sqlite state db unavailable for thread {thread_id}"), + }); + }; + let updated = state_db + .update_thread_git_info( + thread_id, + Some(sha.as_deref()), + Some(branch.as_deref()), + Some(origin_url.as_deref()), + ) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!("failed to update git metadata for thread {thread_id}: {err}"), + })?; + if updated { + Ok(()) + } else { + Err(ThreadStoreError::Internal { + message: format!("thread metadata disappeared before update completed: {thread_id}"), + }) + } +} + +fn resolve_git_info_patch( + existing: Option, + git_info: GitInfoPatch, +) -> (Option, Option, Option) { + let (existing_sha, existing_branch, existing_origin_url) = match existing { + Some(info) => ( + info.commit_hash.map(|sha| sha.0), + info.branch, + info.repository_url, + ), + None => (None, None, None), + }; + let sha = git_info.sha.unwrap_or(existing_sha); + let branch = git_info.branch.unwrap_or(existing_branch); + let origin_url = git_info.origin_url.unwrap_or(existing_origin_url); + (sha, branch, origin_url) +} + +async fn apply_thread_git_info_to_rollout( + rollout_path: &Path, + thread_id: ThreadId, + sha: &Option, + branch: &Option, + origin_url: &Option, + memory_mode: Option<&str>, +) -> ThreadStoreResult<()> { + let mut session_meta = + read_session_meta_line(rollout_path) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!("failed to set thread git metadata: {err}"), + })?; + if session_meta.meta.id != thread_id { + return Err(ThreadStoreError::Internal { + message: format!( + "failed to set thread git metadata: rollout session metadata id mismatch: expected {thread_id}, found {}", + session_meta.meta.id + ), + }); + } + + session_meta.git = Some(GitInfo { + commit_hash: sha.as_deref().map(codex_git_utils::GitSha::new), + branch: branch.clone(), + repository_url: origin_url.clone(), + }); + session_meta.meta.memory_mode = memory_mode.map(str::to_string); + append_rollout_item_to_path(rollout_path, &RolloutItem::SessionMeta(session_meta)) + .await + .map_err(|err| ThreadStoreError::Internal { + message: format!("failed to set thread git metadata: {err}"), + }) } async fn apply_thread_name( store: &LocalThreadStore, - rollout_path: &std::path::Path, + rollout_path: &Path, thread_id: ThreadId, name: String, ) -> ThreadStoreResult<()> { @@ -113,7 +260,7 @@ async fn apply_thread_name( } async fn apply_thread_memory_mode( - rollout_path: &std::path::Path, + rollout_path: &Path, thread_id: ThreadId, memory_mode: ThreadMemoryMode, ) -> ThreadStoreResult<()> { @@ -132,6 +279,9 @@ async fn apply_thread_memory_mode( }); } + // Memory-mode updates should not modify git metadata. The rollout replay + // code will preserve the latest prior git marker when this field is absent. + session_meta.git = None; session_meta.meta.memory_mode = Some(memory_mode_as_str(memory_mode).to_string()); append_rollout_item_to_path(rollout_path, &RolloutItem::SessionMeta(session_meta)) .await @@ -196,7 +346,7 @@ async fn resolve_rollout_path( }) } -fn rollout_path_is_archived(store: &LocalThreadStore, path: &std::path::Path) -> bool { +fn rollout_path_is_archived(store: &LocalThreadStore, path: &Path) -> bool { path.starts_with(store.config.codex_home.join(ARCHIVED_SESSIONS_SUBDIR)) } @@ -204,10 +354,12 @@ fn rollout_path_is_archived(store: &LocalThreadStore, path: &std::path::Path) -> mod tests { use pretty_assertions::assert_eq; use serde_json::Value; + use serde_json::json; use tempfile::TempDir; use uuid::Uuid; use super::*; + use crate::GitInfoPatch; use crate::ResumeThreadParams; use crate::ThreadEventPersistenceMode; use crate::ThreadMetadataPatch; @@ -292,6 +444,75 @@ mod tests { assert_eq!(memory_mode.as_deref(), Some("disabled")); } + #[tokio::test] + async fn update_thread_metadata_preserves_memory_mode_when_updating_git_info() { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + let uuid = Uuid::from_u128(312); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + let path = + write_session_file(home.path(), "2025-01-03T18-30-00", uuid).expect("session file"); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); + + store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + memory_mode: Some(ThreadMemoryMode::Disabled), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("set memory mode"); + + let thread = store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + git_info: Some(GitInfoPatch { + branch: Some(Some("feature".to_string())), + ..Default::default() + }), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("set git metadata"); + + assert_eq!( + thread.git_info.expect("git info").branch.as_deref(), + Some("feature") + ); + let appended = last_rollout_item(path.as_path()); + assert_eq!(appended["type"], "session_meta"); + assert_eq!(appended["payload"]["memory_mode"], "disabled"); + assert_eq!(appended["payload"]["git"]["branch"], "feature"); + + codex_rollout::state_db::reconcile_rollout( + Some(runtime.as_ref()), + path.as_path(), + config.default_model_provider_id.as_str(), + /*builder*/ None, + &[], + /*archived_only*/ None, + /*new_thread_memory_mode*/ None, + ) + .await; + let memory_mode = runtime + .get_thread_memory_mode(thread_id) + .await + .expect("thread memory mode should be readable"); + assert_eq!(memory_mode.as_deref(), Some("disabled")); + } + #[tokio::test] async fn update_thread_metadata_uses_live_rollout_path_for_external_resume() { let home = TempDir::new().expect("temp dir"); @@ -333,6 +554,278 @@ mod tests { assert_eq!(appended["payload"]["memory_mode"], "disabled"); } + #[tokio::test] + async fn update_thread_metadata_sets_git_info() { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config, Some(runtime)); + let uuid = Uuid::from_u128(309); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + write_session_file(home.path(), "2025-01-03T17-00-00", uuid).expect("session file"); + + let thread = store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + git_info: Some(GitInfoPatch { + sha: Some(Some("abc123".to_string())), + branch: Some(Some("main".to_string())), + origin_url: Some(Some("https://github.com/openai/codex".to_string())), + }), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("set git metadata"); + + let git_info = thread.git_info.expect("git info should be present"); + assert_eq!( + git_info.commit_hash.as_ref().map(|sha| sha.0.as_str()), + Some("abc123") + ); + assert_eq!(git_info.branch.as_deref(), Some("main")); + assert_eq!( + git_info.repository_url.as_deref(), + Some("https://github.com/openai/codex") + ); + } + + #[tokio::test] + async fn update_thread_metadata_partially_updates_git_info() { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config, Some(runtime)); + let uuid = Uuid::from_u128(310); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + write_session_file(home.path(), "2025-01-03T17-30-00", uuid).expect("session file"); + + store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + git_info: Some(GitInfoPatch { + sha: Some(Some("abc123".to_string())), + branch: Some(Some("main".to_string())), + origin_url: Some(Some("https://github.com/openai/codex".to_string())), + }), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("seed git metadata"); + + let thread = store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + git_info: Some(GitInfoPatch { + branch: Some(Some("feature".to_string())), + ..Default::default() + }), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("partially update git metadata"); + + let git_info = thread.git_info.expect("git info should be present"); + assert_eq!( + git_info.commit_hash.as_ref().map(|sha| sha.0.as_str()), + Some("abc123") + ); + assert_eq!(git_info.branch.as_deref(), Some("feature")); + assert_eq!( + git_info.repository_url.as_deref(), + Some("https://github.com/openai/codex") + ); + } + + #[tokio::test] + async fn update_thread_metadata_clears_git_info_fields() { + let home = TempDir::new().expect("temp dir"); + let config = test_config(home.path()); + let runtime = codex_state::StateRuntime::init( + config.sqlite_home.clone(), + config.default_model_provider_id.clone(), + ) + .await + .expect("state db should initialize"); + let store = LocalThreadStore::new(config.clone(), Some(runtime.clone())); + let uuid = Uuid::from_u128(311); + let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id"); + let path = + write_session_file(home.path(), "2025-01-03T18-00-00", uuid).expect("session file"); + + store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + git_info: Some(GitInfoPatch { + sha: Some(Some("abc123".to_string())), + branch: Some(Some("main".to_string())), + origin_url: Some(Some("https://github.com/openai/codex".to_string())), + }), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("seed git metadata"); + + let thread = store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + git_info: Some(GitInfoPatch { + sha: Some(None), + branch: Some(None), + origin_url: Some(None), + }), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("clear git metadata"); + + assert!(thread.git_info.is_none()); + let appended = last_rollout_item(path.as_path()); + assert_eq!(appended["type"], "session_meta"); + assert_eq!(appended["payload"]["git"], json!({})); + + codex_rollout::state_db::reconcile_rollout( + Some(runtime.as_ref()), + path.as_path(), + config.default_model_provider_id.as_str(), + /*builder*/ None, + &[], + /*archived_only*/ None, + /*new_thread_memory_mode*/ None, + ) + .await; + let thread = store + .read_thread(ReadThreadParams { + thread_id, + include_archived: false, + include_history: false, + }) + .await + .expect("read thread after reconcile"); + assert!(thread.git_info.is_none()); + + store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + memory_mode: Some(ThreadMemoryMode::Disabled), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("set memory mode after git clear"); + let appended = last_rollout_item(path.as_path()); + assert_eq!(appended["type"], "session_meta"); + assert_eq!(appended["payload"].get("git"), None); + codex_rollout::state_db::reconcile_rollout( + Some(runtime.as_ref()), + path.as_path(), + config.default_model_provider_id.as_str(), + /*builder*/ None, + &[], + /*archived_only*/ None, + /*new_thread_memory_mode*/ None, + ) + .await; + let thread = store + .read_thread(ReadThreadParams { + thread_id, + include_archived: false, + include_history: false, + }) + .await + .expect("read thread after memory mode update with no git"); + assert!(thread.git_info.is_none()); + + assert_eq!( + runtime + .delete_thread(thread_id) + .await + .expect("delete sqlite thread row"), + 1 + ); + let thread = store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + git_info: Some(GitInfoPatch { + branch: Some(Some("feature".to_string())), + ..Default::default() + }), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("partially update after clear with missing sqlite row"); + let git_info = thread.git_info.expect("branch should be present"); + assert_eq!(git_info.commit_hash, None); + assert_eq!(git_info.branch.as_deref(), Some("feature")); + assert_eq!(git_info.repository_url, None); + + store + .update_thread_metadata(UpdateThreadMetadataParams { + thread_id, + patch: ThreadMetadataPatch { + memory_mode: Some(ThreadMemoryMode::Disabled), + ..Default::default() + }, + include_archived: false, + }) + .await + .expect("set memory mode after git clear and partial update"); + let appended = last_rollout_item(path.as_path()); + assert_eq!(appended["type"], "session_meta"); + assert_eq!(appended["payload"].get("git"), None); + codex_rollout::state_db::reconcile_rollout( + Some(runtime.as_ref()), + path.as_path(), + config.default_model_provider_id.as_str(), + /*builder*/ None, + &[], + /*archived_only*/ None, + /*new_thread_memory_mode*/ None, + ) + .await; + let thread = store + .read_thread(ReadThreadParams { + thread_id, + include_archived: false, + include_history: false, + }) + .await + .expect("read thread after memory mode update"); + let git_info = thread.git_info.expect("branch should remain present"); + assert_eq!(git_info.commit_hash, None); + assert_eq!(git_info.branch.as_deref(), Some("feature")); + assert_eq!(git_info.repository_url, None); + } + #[tokio::test] async fn update_thread_metadata_rejects_mismatched_session_meta_id() { let home = TempDir::new().expect("temp dir");