Add thread metadata update endpoint to app server (#13280)

## Summary
- add the v2 `thread/metadata/update` API, including
protocol/schema/TypeScript exports and app-server docs
- patch stored thread `gitInfo` in sqlite without resuming the thread,
with validation plus support for explicit `null` clears
- repair missing sqlite thread rows from rollout data before patching,
and make those repairs safe by inserting only when absent and updating
only git columns so newer metadata is not clobbered
- keep sqlite authoritative for mutable thread git metadata by
preserving existing sqlite git fields during reconcile/backfill and only
using rollout `SessionMeta` git fields to fill gaps
- add regression coverage for the endpoint, repair paths, concurrent
sqlite writes, clearing git fields, and rollout/backfill reconciliation
- fix the login server shutdown race so cancelling before the waiter
starts still terminates `block_until_done()` correctly

## Testing
- `cargo test -p codex-state
apply_rollout_items_preserves_existing_git_branch_and_fills_missing_git_fields`
- `cargo test -p codex-state
update_thread_git_info_preserves_newer_non_git_metadata`
- `cargo test -p codex-core
backfill_sessions_preserves_existing_git_branch_and_fills_missing_git_fields`
- `cargo test -p codex-app-server thread_metadata_update`
- `cargo test`
- currently fails in existing `codex-core` grep-files tests with
`unsupported call: grep_files`:
    - `suite::grep_files::grep_files_tool_collects_matches`
    - `suite::grep_files::grep_files_tool_reports_empty_results`
This commit is contained in:
joeytrasatti-openai
2026-03-03 15:56:11 -08:00
committed by GitHub
parent 299b8ac445
commit 935754baa3
24 changed files with 3251 additions and 6 deletions

View File

@@ -110,6 +110,9 @@ use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadMetadataGitInfoUpdateParams;
use codex_app_server_protocol::ThreadMetadataUpdateParams;
use codex_app_server_protocol::ThreadMetadataUpdateResponse;
use codex_app_server_protocol::ThreadNameUpdatedNotification;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
@@ -201,6 +204,7 @@ use codex_core::skills::remote::export_remote_skill;
use codex_core::skills::remote::list_remote_skills;
use codex_core::state_db::StateDbHandle;
use codex_core::state_db::get_state_db;
use codex_core::state_db::reconcile_rollout;
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
use codex_core::windows_sandbox::WindowsSandboxSetupMode as CoreWindowsSandboxSetupMode;
use codex_core::windows_sandbox::WindowsSandboxSetupRequest;
@@ -239,6 +243,8 @@ use codex_protocol::protocol::USER_MESSAGE_BEGIN;
use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
use codex_protocol::user_input::UserInput as CoreInputItem;
use codex_rmcp_client::perform_oauth_login_return_url;
use codex_state::StateRuntime;
use codex_state::ThreadMetadataBuilder;
use codex_state::log_db::LogDbLayer;
use codex_utils_json_to_toml::json_to_toml;
use std::collections::HashMap;
@@ -597,6 +603,10 @@ impl CodexMessageProcessor {
self.thread_set_name(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadMetadataUpdate { request_id, params } => {
self.thread_metadata_update(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadUnarchive { request_id, params } => {
self.thread_unarchive(to_connection_request_id(request_id), params)
.await;
@@ -1924,6 +1934,304 @@ impl CodexMessageProcessor {
.await;
}
async fn thread_metadata_update(
&self,
request_id: ConnectionRequestId,
params: ThreadMetadataUpdateParams,
) {
let ThreadMetadataUpdateParams {
thread_id,
git_info,
} = params;
let thread_uuid = match ThreadId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
.await;
return;
}
};
let Some(ThreadMetadataGitInfoUpdateParams {
sha,
branch,
origin_url,
}) = git_info
else {
self.send_invalid_request_error(
request_id,
"gitInfo must include at least one field".to_string(),
)
.await;
return;
};
if sha.is_none() && branch.is_none() && origin_url.is_none() {
self.send_invalid_request_error(
request_id,
"gitInfo must include at least one field".to_string(),
)
.await;
return;
}
let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok();
let mut state_db_ctx = loaded_thread.as_ref().and_then(|thread| thread.state_db());
if state_db_ctx.is_none() {
state_db_ctx = get_state_db(&self.config, None).await;
}
let Some(state_db_ctx) = state_db_ctx else {
self.send_internal_error(
request_id,
format!("sqlite state db unavailable for thread {thread_uuid}"),
)
.await;
return;
};
if let Err(error) = self
.ensure_thread_metadata_row_exists(thread_uuid, &state_db_ctx, loaded_thread.as_ref())
.await
{
self.outgoing.send_error(request_id, error).await;
return;
}
let git_sha = match sha {
Some(Some(sha)) => {
let sha = sha.trim().to_string();
if sha.is_empty() {
self.send_invalid_request_error(
request_id,
"gitInfo.sha must not be empty".to_string(),
)
.await;
return;
}
Some(Some(sha))
}
Some(None) => Some(None),
None => None,
};
let git_branch = match branch {
Some(Some(branch)) => {
let branch = branch.trim().to_string();
if branch.is_empty() {
self.send_invalid_request_error(
request_id,
"gitInfo.branch must not be empty".to_string(),
)
.await;
return;
}
Some(Some(branch))
}
Some(None) => Some(None),
None => None,
};
let git_origin_url = match origin_url {
Some(Some(origin_url)) => {
let origin_url = origin_url.trim().to_string();
if origin_url.is_empty() {
self.send_invalid_request_error(
request_id,
"gitInfo.originUrl must not be empty".to_string(),
)
.await;
return;
}
Some(Some(origin_url))
}
Some(None) => Some(None),
None => None,
};
let updated = match state_db_ctx
.update_thread_git_info(
thread_uuid,
git_sha.as_ref().map(|value| value.as_deref()),
git_branch.as_ref().map(|value| value.as_deref()),
git_origin_url.as_ref().map(|value| value.as_deref()),
)
.await
{
Ok(updated) => updated,
Err(err) => {
self.send_internal_error(
request_id,
format!("failed to update thread metadata for {thread_uuid}: {err}"),
)
.await;
return;
}
};
if !updated {
self.send_internal_error(
request_id,
format!("thread metadata disappeared before update completed: {thread_uuid}"),
)
.await;
return;
}
let Some(summary) =
read_summary_from_state_db_context_by_thread_id(Some(&state_db_ctx), thread_uuid).await
else {
self.send_internal_error(
request_id,
format!("failed to reload updated thread metadata for {thread_uuid}"),
)
.await;
return;
};
let mut thread = summary_to_thread(summary);
self.attach_thread_name(thread_uuid, &mut thread).await;
thread.status = resolve_thread_status(
self.thread_watch_manager
.loaded_status_for_thread(&thread.id)
.await,
false,
);
self.outgoing
.send_response(request_id, ThreadMetadataUpdateResponse { thread })
.await;
}
async fn ensure_thread_metadata_row_exists(
&self,
thread_uuid: ThreadId,
state_db_ctx: &Arc<StateRuntime>,
loaded_thread: Option<&Arc<CodexThread>>,
) -> Result<(), JSONRPCErrorError> {
fn invalid_request(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message,
data: None,
}
}
fn internal_error(message: String) -> JSONRPCErrorError {
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message,
data: None,
}
}
match state_db_ctx.get_thread(thread_uuid).await {
Ok(Some(_)) => return Ok(()),
Ok(None) => {}
Err(err) => {
return Err(internal_error(format!(
"failed to load thread metadata for {thread_uuid}: {err}"
)));
}
}
if let Some(thread) = loaded_thread {
let Some(rollout_path) = thread.rollout_path() else {
return Err(invalid_request(format!(
"ephemeral thread does not support metadata updates: {thread_uuid}"
)));
};
reconcile_rollout(
Some(state_db_ctx),
rollout_path.as_path(),
self.config.model_provider_id.as_str(),
None,
&[],
None,
None,
)
.await;
match state_db_ctx.get_thread(thread_uuid).await {
Ok(Some(_)) => return Ok(()),
Ok(None) => {}
Err(err) => {
return Err(internal_error(format!(
"failed to load reconciled thread metadata for {thread_uuid}: {err}"
)));
}
}
let config_snapshot = thread.config_snapshot().await;
let model_provider = config_snapshot.model_provider_id.clone();
let mut builder = ThreadMetadataBuilder::new(
thread_uuid,
rollout_path,
Utc::now(),
config_snapshot.session_source.clone(),
);
builder.model_provider = Some(model_provider.clone());
builder.cwd = config_snapshot.cwd.clone();
builder.cli_version = Some(env!("CARGO_PKG_VERSION").to_string());
builder.sandbox_policy = config_snapshot.sandbox_policy.clone();
builder.approval_mode = config_snapshot.approval_policy;
let metadata = builder.build(model_provider.as_str());
if let Err(err) = state_db_ctx.insert_thread_if_absent(&metadata).await {
return Err(internal_error(format!(
"failed to create thread metadata for {thread_uuid}: {err}"
)));
}
return Ok(());
}
let rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
.await
{
Ok(Some(path)) => path,
Ok(None) => match find_archived_thread_path_by_id_str(
&self.config.codex_home,
&thread_uuid.to_string(),
)
.await
{
Ok(Some(path)) => path,
Ok(None) => {
return Err(invalid_request(format!("thread not found: {thread_uuid}")));
}
Err(err) => {
return Err(internal_error(format!(
"failed to locate archived thread id {thread_uuid}: {err}"
)));
}
},
Err(err) => {
return Err(internal_error(format!(
"failed to locate thread id {thread_uuid}: {err}"
)));
}
};
reconcile_rollout(
Some(state_db_ctx),
rollout_path.as_path(),
self.config.model_provider_id.as_str(),
None,
&[],
None,
None,
)
.await;
match state_db_ctx.get_thread(thread_uuid).await {
Ok(Some(_)) => Ok(()),
Ok(None) => Err(internal_error(format!(
"failed to create thread metadata from rollout for {thread_uuid}"
))),
Err(err) => Err(internal_error(format!(
"failed to load reconciled thread metadata for {thread_uuid}: {err}"
))),
}
}
async fn thread_unarchive(
&mut self,
request_id: ConnectionRequestId,