Compare commits

...

1 Commits

Author SHA1 Message Date
miaolin
df769ef24b Persist search service browser state 2026-05-28 23:42:50 -07:00
19 changed files with 820 additions and 15 deletions

View File

@@ -2190,6 +2190,7 @@ mod tests {
sandbox_policy: SandboxPolicy::new_read_only_policy(),
token_usage: None,
first_user_message: Some("before rollback".to_string()),
search_service_browser_state: None,
history: Some(StoredThreadHistory {
thread_id,
items: history_items,

View File

@@ -415,6 +415,7 @@ mod thread_processor_behavior_tests {
sandbox_policy: SandboxPolicy::new_read_only_policy(),
token_usage: None,
first_user_message: Some("first user message".to_string()),
search_service_browser_state: None,
history: None,
};

View File

@@ -79,6 +79,7 @@ pub(crate) mod mentions {
}
mod sandbox_tags;
pub mod sandboxing;
mod search_service_browser_state;
mod session_prefix;
mod session_startup_prewarm;
mod shell_detect;

View File

@@ -18,6 +18,8 @@ use crate::hook_runtime::run_permission_request_hooks;
use crate::mcp_openai_file::rewrite_mcp_tool_arguments_for_openai_files;
use crate::mcp_tool_approval_templates::RenderedMcpToolApprovalParam;
use crate::mcp_tool_approval_templates::render_mcp_tool_approval_template;
use crate::search_service_browser_state;
use crate::search_service_browser_state::SEARCH_SERVICE_CONNECTOR_ID;
use crate::session::session::Session;
use crate::session::turn_context::TurnContext;
use crate::tools::hook_names::HookToolName;
@@ -74,6 +76,7 @@ use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationAction;
use codex_rmcp_client::ElicitationResponse;
use codex_rollout::state_db;
use codex_thread_store::ThreadMetadataPatch;
use codex_utils_absolute_path::AbsolutePathBuf;
use codex_utils_output_truncation::TruncationPolicy;
use codex_utils_output_truncation::truncate_text;
@@ -88,6 +91,7 @@ use tracing::Instrument;
use tracing::Span;
use tracing::error;
use tracing::field::Empty;
use tracing::warn;
use url::Url;
const MCP_CALL_COUNT_METRIC: &str = "codex.mcp.call";
@@ -555,6 +559,20 @@ async fn execute_mcp_tool_call(
) -> Result<CallToolResult, String> {
let request_meta =
with_mcp_tool_call_thread_id_meta(request_meta, &sess.conversation_id.to_string());
let is_search_service_call = is_search_service_mcp_tool_call(invocation, metadata);
let search_service_browser_state = if is_search_service_call {
read_search_service_browser_state(sess).await
} else {
None
};
let request_meta = if is_search_service_call {
search_service_browser_state::augment_request_meta(
request_meta,
search_service_browser_state.as_ref(),
)
} else {
request_meta
};
let request_meta = augment_mcp_tool_request_meta_with_sandbox_state(
sess,
turn_context,
@@ -584,7 +602,7 @@ async fn execute_mcp_tool_call(
.contains(&InputModality::Image),
Ok(result),
)?;
Ok(maybe_request_codex_apps_auth_elicitation(
let result = maybe_request_codex_apps_auth_elicitation(
sess,
turn_context,
call_id,
@@ -592,7 +610,78 @@ async fn execute_mcp_tool_call(
metadata,
result,
)
.await)
.await;
if is_search_service_call {
if let Some(new_state) =
search_service_browser_state::browser_state_from_tool_result(&result)
&& let Err(err) = persist_search_service_browser_state(
sess,
search_service_browser_state.as_ref(),
&new_state,
)
.await
{
warn!("failed to persist search service browser state: {err:#}");
}
}
Ok(result)
}
/// Reads the Search Service browser state stored on the session's live thread.
///
/// Returns `None` (after logging a warning) when the live thread is not
/// available for persistence or when reading the thread fails; otherwise
/// returns whatever state the stored thread carries, which may itself be `None`.
async fn read_search_service_browser_state(sess: &Session) -> Option<JsonValue> {
    let live_thread = sess
        .live_thread_for_persistence("read search service browser state")
        .inspect_err(|err| {
            warn!("search service browser state unavailable: {err:#}");
        })
        .ok()?;
    let thread = live_thread
        .read_thread(
            /*include_archived*/ false, /*include_history*/ false,
        )
        .await
        .inspect_err(|err| {
            warn!("failed to read search service browser state: {err}");
        })
        .ok()?;
    thread.search_service_browser_state
}
/// Merges `new_state` into the stored browser state and writes the result back
/// to the thread metadata.
///
/// The merge base is the state currently readable from the live thread; when
/// none can be read, `previous_state` (the state captured before the tool call)
/// is used instead. Nothing is written when the merged state is empty.
///
/// # Errors
/// Returns an error when the live thread is unavailable for persistence or the
/// metadata update fails.
async fn persist_search_service_browser_state(
    sess: &Session,
    previous_state: Option<&JsonValue>,
    new_state: &JsonValue,
) -> anyhow::Result<()> {
    let durable_state = read_search_service_browser_state(sess).await;
    let merge_base = durable_state.as_ref().or(previous_state);
    let merged_state =
        match search_service_browser_state::merge_browser_states(merge_base, Some(new_state)) {
            Some(merged_state) => merged_state,
            None => return Ok(()),
        };
    let live_thread = sess.live_thread_for_persistence("persist search service browser state")?;
    let patch = ThreadMetadataPatch {
        search_service_browser_state: Some(merged_state),
        ..Default::default()
    };
    live_thread
        .update_metadata(patch, /*include_archived*/ false)
        .await?;
    Ok(())
}
/// Returns `true` when the invocation is the `web_run` tool and the approval
/// metadata names the Search Service connector.
fn is_search_service_mcp_tool_call(
    invocation: &McpInvocation,
    metadata: Option<&McpToolApprovalMetadata>,
) -> bool {
    if invocation.tool != "web_run" {
        return false;
    }
    let connector_id = metadata.and_then(|metadata| metadata.connector_id.as_deref());
    matches!(connector_id, Some(connector_id) if connector_id == SEARCH_SERVICE_CONNECTOR_ID)
}
async fn maybe_request_codex_apps_auth_elicitation(

View File

@@ -62,6 +62,7 @@ fn stored_thread(cwd: &str, title: &str, first_user_message: &str) -> StoredThre
sandbox_policy: SandboxPolicy::new_read_only_policy(),
token_usage: None,
first_user_message: Some(first_user_message.to_string()),
search_service_browser_state: None,
history: None,
}
}

View File

@@ -0,0 +1,585 @@
use codex_protocol::mcp::CallToolResult;
use serde_json::Map;
use serde_json::Value;
/// Connector id that marks an MCP tool call as a Search Service call.
pub(crate) const SEARCH_SERVICE_CONNECTOR_ID: &str = "connector_openai_search_service";
/// Metadata key under which the browser state is written into request meta and
/// looked up in tool-result meta.
const SEARCH_SERVICE_BROWSER_STATE_META_KEY: &str = "search_service_browser_state";
/// Attaches the normalized browser `state` to the request `meta`.
///
/// `meta` is returned untouched when `state` is absent or normalizes to an
/// empty browser state.
pub(crate) fn augment_request_meta(meta: Option<Value>, state: Option<&Value>) -> Option<Value> {
    let Some(normalized) = state.and_then(normalize_browser_state) else {
        return meta;
    };
    Some(insert_browser_state(meta, normalized))
}
/// Stores `state` under the browser-state meta key.
///
/// An object `meta` is extended in place and a missing `meta` becomes a fresh
/// object. Any other JSON value is returned unchanged, without the state.
fn insert_browser_state(meta: Option<Value>, state: Value) -> Value {
    let mut fields = match meta {
        None => Map::new(),
        Some(Value::Object(existing)) => existing,
        // Non-object meta cannot carry a key, so it is passed through as-is.
        Some(other) => return other,
    };
    fields.insert(SEARCH_SERVICE_BROWSER_STATE_META_KEY.to_string(), state);
    Value::Object(fields)
}
pub(crate) fn browser_state_from_tool_result(result: &CallToolResult) -> Option<Value> {
let meta = result.meta.as_ref()?.as_object()?;
if let Some(state) = meta
.get(SEARCH_SERVICE_BROWSER_STATE_META_KEY)
.and_then(normalize_browser_state)
{
return Some(state);
}
let raw_messages = meta.get("raw_messages")?.as_array()?;
browser_state_from_raw_messages(raw_messages)
}
fn browser_state_from_raw_messages(raw_messages: &[Value]) -> Option<Value> {
let mut state = empty_browser_state();
for raw_message in raw_messages {
let Some(raw_message) = raw_message.as_object() else {
continue;
};
merge_raw_message_into_browser_state(&mut state, raw_message);
}
non_empty_browser_state(state)
}
/// Rebuilds `value` as a browser state with the full default key set.
///
/// Only keys with the expected JSON type are copied over the defaults; every
/// other key keeps its default. Aliases derived from `refid_randomizer_state`
/// are added to `randomized_to_canonical_ref_ids`. Returns `None` when `value`
/// is not an object or the result is empty.
fn normalize_browser_state(value: &Value) -> Option<Value> {
    let source = value.as_object()?;
    let mut normalized = empty_browser_state();

    for key in ["sonic_thread_id", "sonic_user_id"] {
        copy_latest_string(source, &mut normalized, key);
    }
    copy_latest_i64(source, &mut normalized, "sonic_turn_index");
    if let Some(location) = source.get("precise_location_metadata").cloned() {
        normalized.insert("precise_location_metadata".to_string(), location);
    }

    let nested = source
        .get("external_browser_state")
        .and_then(Value::as_object);
    if let Some(nested) = nested {
        for child in ["content", "link_urls"] {
            if let Some(Value::Object(entries)) = nested.get(child) {
                external_browser_state_object_mut(&mut normalized, child).extend(entries.clone());
            }
        }
    }

    // These maps replace the defaults wholesale instead of being merged.
    for key in [
        "external_urls",
        "product_lookup_keys_by_ref_id",
        "refid_randomizer_state",
        "randomized_to_canonical_ref_ids",
    ] {
        if let Some(Value::Object(object)) = source.get(key) {
            normalized.insert(key.to_string(), Value::Object(object.clone()));
        }
    }

    for key in [
        "num_image_queries",
        "num_product_queries",
        "elapsed_tokens",
        "elapsed_queries",
    ] {
        copy_latest_i64(source, &mut normalized, key);
    }

    if let Some(Value::Array(queries)) = source.get("product_queries") {
        normalized.insert("product_queries".to_string(), Value::Array(queries.clone()));
    }

    for key in ["commerce_blocked_canvas", "image_group_shown"] {
        if let Some(Value::Bool(flag)) = source.get(key) {
            normalized.insert(key.to_string(), Value::Bool(*flag));
        }
    }

    // Derive the reverse (randomized -> canonical) aliases from the randomizer
    // state that ended up in the normalized map.
    let randomizer = normalized
        .get("refid_randomizer_state")
        .and_then(Value::as_object);
    let aliases = randomized_ref_aliases(randomizer);
    object_mut(&mut normalized, "randomized_to_canonical_ref_ids").extend(aliases);

    non_empty_browser_state(normalized)
}
/// Merges `previous_state` and then `new_state` into one browser state.
///
/// Each input is normalized first, and inputs that are absent or normalize to
/// empty are skipped. Returns `None` when the merged result is empty.
pub(crate) fn merge_browser_states(
    previous_state: Option<&Value>,
    new_state: Option<&Value>,
) -> Option<Value> {
    let mut merged = empty_browser_state();
    // Order matters: the newer state is applied last so its values win.
    for candidate in [previous_state, new_state] {
        let Some(normalized) = candidate.and_then(normalize_browser_state) else {
            continue;
        };
        merge_browser_state(&mut merged, normalized.as_object()?);
    }
    non_empty_browser_state(merged)
}
/// Folds one raw message into `state`.
///
/// When the message carries `metadata.__internal.sonic_browser_tool`, the
/// identifiers, page content, link URLs, product lookup keys, randomizer state,
/// counters and flags found there are copied into `state`, and `external_urls`
/// is refreshed from the accumulated content. In every case the message's own
/// `content.ref_id` / `content.url` pair is then recorded in `external_urls`.
fn merge_raw_message_into_browser_state(
    state: &mut Map<String, Value>,
    raw_message: &Map<String, Value>,
) {
    let tool_payload = raw_message
        .get("metadata")
        .and_then(Value::as_object)
        .and_then(|metadata| metadata.get("__internal"))
        .and_then(Value::as_object)
        .and_then(|internal| {
            let tool_state = internal.get("sonic_browser_tool")?.as_object()?;
            Some((internal, tool_state))
        });

    if let Some((internal, tool_state)) = tool_payload {
        for key in ["sonic_thread_id", "sonic_user_id"] {
            copy_latest_string(internal, state, key);
        }
        copy_latest_i64(internal, state, "sonic_turn_index");
        if let Some(location) = internal.get("precise_location_metadata").cloned() {
            state.insert("precise_location_metadata".to_string(), location);
        }

        // All three content flavours land in the same `content` map; later
        // keys overwrite earlier ones for the same ref id.
        for key in ["summary_content", "content", "content_to_persist"] {
            if let Some(Value::Object(pages)) = tool_state.get(key) {
                external_browser_state_object_mut(state, "content").extend(pages.clone());
            }
        }
        if let Some(Value::Object(links)) = tool_state.get("link_urls") {
            external_browser_state_object_mut(state, "link_urls").extend(links.clone());
        }
        if let Some(Value::Object(lookup_keys)) = tool_state.get("product_lookup_keys_by_ref_id") {
            object_mut(state, "product_lookup_keys_by_ref_id").extend(lookup_keys.clone());
        }
        if let Some(Value::Object(randomizer)) = tool_state.get("refid_randomizer_state") {
            let aliases = randomized_ref_aliases(Some(randomizer));
            state.insert(
                "refid_randomizer_state".to_string(),
                Value::Object(randomizer.clone()),
            );
            object_mut(state, "randomized_to_canonical_ref_ids").extend(aliases);
        }

        for key in [
            "num_image_queries",
            "num_product_queries",
            "elapsed_tokens",
            "elapsed_queries",
        ] {
            copy_latest_i64(tool_state, state, key);
        }
        if let Some(Value::Array(queries)) = tool_state.get("product_queries") {
            state.insert("product_queries".to_string(), Value::Array(queries.clone()));
        }
        for key in ["commerce_blocked_canvas", "image_group_shown"] {
            if let Some(Value::Bool(flag)) = tool_state.get(key) {
                state.insert(key.to_string(), Value::Bool(*flag));
            }
        }

        refresh_external_urls_from_content(state);
    }

    merge_webpage_content_into_external_urls(state, raw_message);
}
/// Applies `source` on top of `target`.
///
/// Scalars, `precise_location_metadata`, `product_queries` and the boolean
/// flags are overwritten when present with the expected type. The content,
/// link, URL, lookup-key and alias maps are merged key by key.
/// `refid_randomizer_state` is replaced only when the source map is non-empty.
fn merge_browser_state(target: &mut Map<String, Value>, source: &Map<String, Value>) {
    for key in ["sonic_thread_id", "sonic_user_id"] {
        copy_latest_string(source, target, key);
    }
    copy_latest_i64(source, target, "sonic_turn_index");
    if let Some(location) = source.get("precise_location_metadata").cloned() {
        target.insert("precise_location_metadata".to_string(), location);
    }

    let nested = source
        .get("external_browser_state")
        .and_then(Value::as_object);
    if let Some(nested) = nested {
        for child in ["content", "link_urls"] {
            if let Some(Value::Object(entries)) = nested.get(child) {
                external_browser_state_object_mut(target, child).extend(entries.clone());
            }
        }
    }

    for key in [
        "external_urls",
        "product_lookup_keys_by_ref_id",
        "randomized_to_canonical_ref_ids",
    ] {
        if let Some(Value::Object(entries)) = source.get(key) {
            object_mut(target, key).extend(entries.clone());
        }
    }

    // An empty randomizer state must not wipe out one already in the target.
    match source.get("refid_randomizer_state") {
        Some(Value::Object(randomizer)) if !randomizer.is_empty() => {
            target.insert(
                "refid_randomizer_state".to_string(),
                Value::Object(randomizer.clone()),
            );
        }
        _ => {}
    }

    for key in [
        "num_image_queries",
        "num_product_queries",
        "elapsed_tokens",
        "elapsed_queries",
    ] {
        copy_latest_i64(source, target, key);
    }
    if let Some(Value::Array(queries)) = source.get("product_queries") {
        target.insert("product_queries".to_string(), Value::Array(queries.clone()));
    }
    for key in ["commerce_blocked_canvas", "image_group_shown"] {
        if let Some(Value::Bool(flag)) = source.get(key) {
            target.insert(key.to_string(), Value::Bool(*flag));
        }
    }
}
/// Builds the default browser state: null identifiers, zeroed counters, false
/// flags, and empty maps and arrays for every collection key.
fn empty_browser_state() -> Map<String, Value> {
    let empty_object = || Value::Object(Map::new());
    let zero = || Value::Number(0.into());

    let external_browser_state: Map<String, Value> = [
        ("content", empty_object()),
        ("link_urls", empty_object()),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value))
    .collect();

    [
        ("sonic_thread_id", Value::Null),
        ("sonic_user_id", Value::Null),
        ("sonic_turn_index", zero()),
        ("precise_location_metadata", Value::Null),
        (
            "external_browser_state",
            Value::Object(external_browser_state),
        ),
        ("external_urls", empty_object()),
        ("product_lookup_keys_by_ref_id", empty_object()),
        ("refid_randomizer_state", empty_object()),
        ("randomized_to_canonical_ref_ids", empty_object()),
        ("num_image_queries", zero()),
        ("num_product_queries", zero()),
        ("product_queries", Value::Array(Vec::new())),
        ("commerce_blocked_canvas", Value::Bool(false)),
        ("image_group_shown", Value::Bool(false)),
        ("elapsed_tokens", zero()),
        ("elapsed_queries", zero()),
    ]
    .into_iter()
    .map(|(key, value)| (key.to_string(), value))
    .collect()
}
/// Wraps `state` in `Some(Value::Object(..))` when it carries anything useful.
///
/// A state counts as non-empty when any of the nested `content` / `link_urls`
/// maps or the top-level URL, lookup-key, randomizer or alias maps has entries,
/// or when `sonic_thread_id` is a string. Counters and flags are ignored.
fn non_empty_browser_state(state: Map<String, Value>) -> Option<Value> {
    let has_entries = |value: Option<&Value>| {
        value
            .and_then(Value::as_object)
            .is_some_and(|object| !object.is_empty())
    };

    let nested = state
        .get("external_browser_state")
        .and_then(Value::as_object);
    let has_nested_entries = ["content", "link_urls"]
        .into_iter()
        .any(|key| has_entries(nested.and_then(|nested| nested.get(key))));
    let has_top_level_entries = [
        "external_urls",
        "product_lookup_keys_by_ref_id",
        "refid_randomizer_state",
        "randomized_to_canonical_ref_ids",
    ]
    .into_iter()
    .any(|key| has_entries(state.get(key)));
    let has_thread_id = state
        .get("sonic_thread_id")
        .and_then(Value::as_str)
        .is_some();

    if has_nested_entries || has_top_level_entries || has_thread_id {
        Some(Value::Object(state))
    } else {
        None
    }
}
/// Inverts the `refid_to_randomized_id` map of a randomizer state.
///
/// The result maps each randomized ref id (string values only) back to its
/// canonical ref id. It is empty when `state` or the forward map is missing.
fn randomized_ref_aliases(state: Option<&Map<String, Value>>) -> Map<String, Value> {
    let mut aliases = Map::new();
    let forward = state
        .and_then(|state| state.get("refid_to_randomized_id"))
        .and_then(Value::as_object);
    if let Some(forward) = forward {
        for (canonical_ref_id, randomized_ref_id) in forward {
            if let Some(randomized_ref_id) = randomized_ref_id.as_str() {
                aliases.insert(
                    randomized_ref_id.to_string(),
                    Value::String(canonical_ref_id.clone()),
                );
            }
        }
    }
    aliases
}
/// Records the URL of every stored page in `external_urls`, keyed by ref id.
///
/// Pages are read from `external_browser_state.content`; entries that are not
/// objects or that lack a non-empty string `url` are skipped. `external_urls`
/// is left untouched when no page yields a URL.
fn refresh_external_urls_from_content(state: &mut Map<String, Value>) {
    // Collect only the (ref_id, url) pairs while `state` is borrowed immutably.
    // This avoids deep-cloning the whole content map (including page bodies)
    // just to read one field per page.
    let urls: Vec<(String, Value)> = state
        .get("external_browser_state")
        .and_then(Value::as_object)
        .and_then(|external| external.get("content"))
        .and_then(Value::as_object)
        .into_iter()
        .flatten()
        .filter_map(|(ref_id, page)| {
            let url = page
                .as_object()?
                .get("url")?
                .as_str()
                .filter(|url| !url.is_empty())?;
            Some((ref_id.clone(), Value::String(url.to_string())))
        })
        .collect();
    if urls.is_empty() {
        return;
    }
    object_mut(state, "external_urls").extend(urls);
}
/// Records the raw message's own `content.ref_id` -> `content.url` pair in
/// `external_urls`.
///
/// Does nothing unless `content` is an object whose `ref_id` and `url` are both
/// strings and `url` is non-empty.
fn merge_webpage_content_into_external_urls(
    state: &mut Map<String, Value>,
    raw_message: &Map<String, Value>,
) {
    let Some(content) = raw_message.get("content").and_then(Value::as_object) else {
        return;
    };
    let ref_id = content.get("ref_id").and_then(Value::as_str);
    let url = content.get("url").and_then(Value::as_str);
    match (ref_id, url) {
        (Some(ref_id), Some(url)) if !url.is_empty() => {
            object_mut(state, "external_urls")
                .insert(ref_id.to_string(), Value::String(url.to_string()));
        }
        _ => {}
    }
}
/// Copies `source[key]` into `target` when it is a JSON string; otherwise
/// leaves `target` unchanged.
fn copy_latest_string(source: &Map<String, Value>, target: &mut Map<String, Value>, key: &str) {
    let Some(Value::String(text)) = source.get(key) else {
        return;
    };
    target.insert(key.to_string(), Value::String(text.clone()));
}
/// Copies `source[key]` into `target` when it is a number representable as
/// `i64`; otherwise leaves `target` unchanged.
fn copy_latest_i64(source: &Map<String, Value>, target: &mut Map<String, Value>, key: &str) {
    let Some(number) = source.get(key).and_then(Value::as_i64) else {
        return;
    };
    target.insert(key.to_string(), Value::from(number));
}
/// Returns a mutable reference to `state["external_browser_state"][key]` as a map.
///
/// Both levels are created when missing and reset to an empty object when they
/// hold any non-object value.
fn external_browser_state_object_mut<'a>(
    state: &'a mut Map<String, Value>,
    key: &str,
) -> &'a mut Map<String, Value> {
    let parent_slot = state
        .entry("external_browser_state".to_string())
        .or_insert_with(|| Value::Object(Map::new()));
    if !matches!(*parent_slot, Value::Object(_)) {
        *parent_slot = Value::Object(Map::new());
    }
    let parent = match parent_slot {
        Value::Object(parent) => parent,
        _ => unreachable!("external_browser_state was normalized to an object"),
    };

    let child_slot = parent
        .entry(key.to_string())
        .or_insert_with(|| Value::Object(Map::new()));
    if !matches!(*child_slot, Value::Object(_)) {
        *child_slot = Value::Object(Map::new());
    }
    match child_slot {
        Value::Object(child) => child,
        _ => unreachable!("external_browser_state child was normalized to an object"),
    }
}
/// Returns a mutable reference to `state[key]` as a map.
///
/// The entry is created when missing and reset to an empty object when it
/// holds any non-object value.
fn object_mut<'a>(state: &'a mut Map<String, Value>, key: &str) -> &'a mut Map<String, Value> {
    let slot = state
        .entry(key.to_string())
        .or_insert_with(|| Value::Object(Map::new()));
    if !matches!(*slot, Value::Object(_)) {
        *slot = Value::Object(Map::new());
    }
    match slot {
        Value::Object(object) => object,
        _ => unreachable!("browser state entry was normalized to an object"),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;
    use serde_json::json;

    /// A tool result carrying only `raw_messages` yields a full browser state,
    /// which is then attached to the request meta alongside existing keys.
    #[test]
    fn stores_browser_state_from_raw_messages_and_attaches_to_request_meta() {
        let tool_result = CallToolResult {
            content: Vec::new(),
            structured_content: None,
            is_error: None,
            meta: Some(json!({
                "raw_messages": [{
                    "metadata": {
                        "__internal": {
                            "sonic_browser_tool": {
                                "content": {
                                    "turn0search0": { "url": "https://example.com/" }
                                },
                                "link_urls": {
                                    "turn0search0": {
                                        "1": "https://example.com/read-more"
                                    }
                                },
                                "refid_randomizer_state": {
                                    "refid_to_randomized_id": {
                                        "turn0search0": "turn123search0"
                                    }
                                }
                            },
                            "sonic_thread_id": "thread-123",
                            "sonic_turn_index": 1,
                            "sonic_user_id": "user-123"
                        }
                    }
                }]
            })),
        };

        let extracted = browser_state_from_tool_result(&tool_result);
        let persisted = merge_browser_states(None, extracted.as_ref());
        let actual = augment_request_meta(
            Some(json!({ "threadId": "thread-live" })),
            persisted.as_ref(),
        );

        let expected = Some(json!({
            "threadId": "thread-live",
            "search_service_browser_state": {
                "sonic_thread_id": "thread-123",
                "sonic_user_id": "user-123",
                "sonic_turn_index": 1,
                "precise_location_metadata": null,
                "external_browser_state": {
                    "content": {
                        "turn0search0": { "url": "https://example.com/" }
                    },
                    "link_urls": {
                        "turn0search0": {
                            "1": "https://example.com/read-more"
                        }
                    }
                },
                "external_urls": { "turn0search0": "https://example.com/" },
                "product_lookup_keys_by_ref_id": {},
                "refid_randomizer_state": {
                    "refid_to_randomized_id": {
                        "turn0search0": "turn123search0"
                    }
                },
                "randomized_to_canonical_ref_ids": {
                    "turn123search0": "turn0search0"
                },
                "num_image_queries": 0,
                "num_product_queries": 0,
                "product_queries": [],
                "commerce_blocked_canvas": false,
                "image_group_shown": false,
                "elapsed_tokens": 0,
                "elapsed_queries": 0
            }
        }));
        assert_eq!(actual, expected);
    }

    /// Without any browser state the request meta passes through unchanged.
    #[test]
    fn preserves_request_meta_when_browser_state_is_missing() {
        let original_meta = Some(json!({ "threadId": "thread-live" }));
        let augmented = augment_request_meta(original_meta.clone(), None);
        assert_eq!(augmented, original_meta);
    }
}

View File

@@ -261,7 +261,7 @@ pub(crate) async fn backfill_sessions_with_lease(
metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string());
if let Ok(Some(existing_metadata)) = runtime.get_thread(metadata.id).await {
metadata.prefer_existing_git_info(&existing_metadata);
metadata.prefer_existing_sqlite_only_fields(&existing_metadata);
}
if rollout.archived && metadata.archived_at.is_none() {
let fallback_archived_at = metadata.updated_at;

View File

@@ -512,7 +512,7 @@ pub async fn reconcile_rollout(
let memory_mode = outcome.memory_mode.unwrap_or_else(|| "enabled".to_string());
metadata.cwd = normalize_cwd_for_state_db(&metadata.cwd);
if let Ok(Some(existing_metadata)) = ctx.get_thread(metadata.id).await {
metadata.prefer_existing_git_info(&existing_metadata);
metadata.prefer_existing_sqlite_only_fields(&existing_metadata);
}
match archived_only {
Some(true) if metadata.archived_at.is_none() => {

View File

@@ -0,0 +1 @@
ALTER TABLE threads ADD COLUMN search_service_browser_state TEXT;

View File

@@ -493,6 +493,7 @@ mod tests {
git_sha: None,
git_branch: None,
git_origin_url: None,
search_service_browser_state: None,
}
}

View File

@@ -107,6 +107,8 @@ pub struct ThreadMetadata {
pub git_branch: Option<String>,
/// The git origin URL, if known.
pub git_origin_url: Option<String>,
/// Search Service browser state needed by follow-up browse/open calls.
pub search_service_browser_state: Option<serde_json::Value>,
}
/// Builder data required to construct [`ThreadMetadata`] without parsing filenames.
@@ -148,6 +150,8 @@ pub struct ThreadMetadataBuilder {
pub git_branch: Option<String>,
/// The git origin URL, if known.
pub git_origin_url: Option<String>,
/// Search Service browser state needed by follow-up browse/open calls.
pub search_service_browser_state: Option<serde_json::Value>,
}
impl ThreadMetadataBuilder {
@@ -177,6 +181,7 @@ impl ThreadMetadataBuilder {
git_sha: None,
git_branch: None,
git_origin_url: None,
search_service_browser_state: None,
}
}
@@ -221,13 +226,14 @@ impl ThreadMetadataBuilder {
git_sha: self.git_sha.clone(),
git_branch: self.git_branch.clone(),
git_origin_url: self.git_origin_url.clone(),
search_service_browser_state: self.search_service_browser_state.clone(),
}
}
}
impl ThreadMetadata {
/// Preserve existing non-null Git fields when rollout-derived metadata is reconciled.
pub fn prefer_existing_git_info(&mut self, existing: &Self) {
/// Preserve existing SQLite-only fields when rollout-derived metadata is reconciled.
pub fn prefer_existing_sqlite_only_fields(&mut self, existing: &Self) {
if existing.git_sha.is_some() {
self.git_sha = existing.git_sha.clone();
}
@@ -237,6 +243,9 @@ impl ThreadMetadata {
if existing.git_origin_url.is_some() {
self.git_origin_url = existing.git_origin_url.clone();
}
if existing.search_service_browser_state.is_some() {
self.search_service_browser_state = existing.search_service_browser_state.clone();
}
}
/// Return the list of field names that differ between `self` and `other`.
@@ -311,6 +320,9 @@ impl ThreadMetadata {
if self.git_origin_url != other.git_origin_url {
diffs.push("git_origin_url");
}
if self.search_service_browser_state != other.search_service_browser_state {
diffs.push("search_service_browser_state");
}
diffs
}
}
@@ -345,6 +357,7 @@ pub(crate) struct ThreadRow {
git_sha: Option<String>,
git_branch: Option<String>,
git_origin_url: Option<String>,
search_service_browser_state: Option<String>,
}
impl ThreadRow {
@@ -374,6 +387,7 @@ impl ThreadRow {
git_sha: row.try_get("git_sha")?,
git_branch: row.try_get("git_branch")?,
git_origin_url: row.try_get("git_origin_url")?,
search_service_browser_state: row.try_get("search_service_browser_state")?,
})
}
}
@@ -407,11 +421,15 @@ impl TryFrom<ThreadRow> for ThreadMetadata {
git_sha,
git_branch,
git_origin_url,
search_service_browser_state,
} = row;
let thread_source = thread_source
.map(|thread_source| thread_source.parse())
.transpose()
.map_err(anyhow::Error::msg)?;
let search_service_browser_state = search_service_browser_state
.map(|value| serde_json::from_str(&value))
.transpose()?;
Ok(Self {
id: ThreadId::try_from(id)?,
rollout_path: PathBuf::from(rollout_path),
@@ -438,6 +456,7 @@ impl TryFrom<ThreadRow> for ThreadMetadata {
git_sha,
git_branch,
git_origin_url,
search_service_browser_state,
})
}
}
@@ -524,6 +543,7 @@ mod tests {
git_sha: None,
git_branch: None,
git_origin_url: None,
search_service_browser_state: None,
}
}
@@ -554,6 +574,7 @@ mod tests {
git_sha: None,
git_branch: None,
git_origin_url: None,
search_service_browser_state: None,
}
}

View File

@@ -190,7 +190,8 @@ SELECT
threads.archived_at,
threads.git_sha,
threads.git_branch,
threads.git_origin_url
threads.git_origin_url,
threads.search_service_browser_state
FROM threads
"#,
);
@@ -560,7 +561,8 @@ SELECT
threads.archived_at,
threads.git_sha,
threads.git_branch,
threads.git_origin_url
threads.git_origin_url,
threads.search_service_browser_state
FROM threads
WHERE threads.id = ? AND threads.memory_mode = 'enabled'
"#,

View File

@@ -67,5 +67,6 @@ pub(super) fn test_thread_metadata(
git_sha: None,
git_branch: None,
git_origin_url: None,
search_service_browser_state: None,
}
}

View File

@@ -31,7 +31,8 @@ SELECT
threads.archived_at,
threads.git_sha,
threads.git_branch,
threads.git_origin_url
threads.git_origin_url,
threads.search_service_browser_state
FROM threads
WHERE threads.id = ?
"#,
@@ -476,6 +477,7 @@ ON CONFLICT(child_thread_id) DO NOTHING
) -> anyhow::Result<bool> {
let updated_at = self.allocate_thread_updated_at(metadata.updated_at)?;
let preview = metadata_preview(metadata);
let search_service_browser_state = serialize_search_service_browser_state(metadata)?;
let result = sqlx::query(
r#"
INSERT INTO threads (
@@ -506,8 +508,9 @@ INSERT INTO threads (
git_sha,
git_branch,
git_origin_url,
search_service_browser_state,
memory_mode
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO NOTHING
"#,
)
@@ -547,6 +550,7 @@ ON CONFLICT(id) DO NOTHING
.bind(metadata.git_sha.as_deref())
.bind(metadata.git_branch.as_deref())
.bind(metadata.git_origin_url.as_deref())
.bind(search_service_browser_state.as_deref())
.bind("enabled")
.execute(self.pool.as_ref())
.await?;
@@ -679,6 +683,7 @@ WHERE id = ?
) -> anyhow::Result<()> {
let updated_at = self.allocate_thread_updated_at(metadata.updated_at)?;
let preview = metadata_preview(metadata);
let search_service_browser_state = serialize_search_service_browser_state(metadata)?;
// Backfill/reconcile callers merge existing git info before upserting, but that
// read/modify/write is not atomic. Preserve non-null SQLite git fields here so
// an explicit metadata update cannot be lost if a stale rollout upsert lands later.
@@ -712,8 +717,9 @@ INSERT INTO threads (
git_sha,
git_branch,
git_origin_url,
search_service_browser_state,
memory_mode
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
rollout_path = excluded.rollout_path,
created_at = excluded.created_at,
@@ -740,7 +746,8 @@ ON CONFLICT(id) DO UPDATE SET
archived_at = excluded.archived_at,
git_sha = COALESCE(threads.git_sha, excluded.git_sha),
git_branch = COALESCE(threads.git_branch, excluded.git_branch),
git_origin_url = COALESCE(threads.git_origin_url, excluded.git_origin_url)
git_origin_url = COALESCE(threads.git_origin_url, excluded.git_origin_url),
search_service_browser_state = COALESCE(excluded.search_service_browser_state, threads.search_service_browser_state)
"#,
)
.bind(metadata.id.to_string())
@@ -779,6 +786,7 @@ ON CONFLICT(id) DO UPDATE SET
.bind(metadata.git_sha.as_deref())
.bind(metadata.git_branch.as_deref())
.bind(metadata.git_origin_url.as_deref())
.bind(search_service_browser_state.as_deref())
.bind(creation_memory_mode.unwrap_or("enabled"))
.execute(self.pool.as_ref())
.await?;
@@ -807,7 +815,7 @@ ON CONFLICT(id) DO UPDATE SET
apply_rollout_item(&mut metadata, item, &self.default_provider);
}
if let Some(existing_metadata) = existing_metadata.as_ref() {
metadata.prefer_existing_git_info(existing_metadata);
metadata.prefer_existing_sqlite_only_fields(existing_metadata);
}
let updated_at = match updated_at_override {
Some(updated_at) => Some(updated_at),
@@ -942,7 +950,8 @@ SELECT
threads.archived_at,
threads.git_sha,
threads.git_branch,
threads.git_origin_url
threads.git_origin_url,
threads.search_service_browser_state
"#,
);
}
@@ -1091,6 +1100,17 @@ fn metadata_preview(metadata: &crate::ThreadMetadata) -> &str {
.unwrap_or_default()
}
/// Serializes the metadata's Search Service browser state to a JSON string.
///
/// Returns `Ok(None)` when the metadata carries no browser state.
///
/// # Errors
/// Returns an error when JSON serialization fails.
fn serialize_search_service_browser_state(
    metadata: &crate::ThreadMetadata,
) -> anyhow::Result<Option<String>> {
    match metadata.search_service_browser_state.as_ref() {
        Some(state) => Ok(Some(serde_json::to_string(state)?)),
        None => Ok(None),
    }
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -368,6 +368,8 @@ fn stored_thread_from_state(
.unwrap_or_else(SandboxPolicy::new_read_only_policy),
token_usage: metadata.and_then(|metadata| metadata.token_usage.clone()),
first_user_message: metadata.and_then(|metadata| metadata.first_user_message.clone()),
search_service_browser_state: metadata
.and_then(|metadata| metadata.search_service_browser_state.clone()),
history,
})
}

View File

@@ -143,6 +143,7 @@ pub(super) fn stored_thread_from_rollout_item(
sandbox_policy: SandboxPolicy::new_read_only_policy(),
token_usage: None,
first_user_message: item.first_user_message,
search_service_browser_state: None,
history: None,
})
}

View File

@@ -57,6 +57,7 @@ pub(super) async fn read_thread(
rollout_thread.name = thread.name;
}
rollout_thread.git_info = thread.git_info;
rollout_thread.search_service_browser_state = thread.search_service_browser_state;
thread = rollout_thread;
}
attach_history_if_requested(&mut thread, params.include_history).await?;
@@ -123,6 +124,7 @@ pub(super) async fn read_thread_by_rollout_path(
metadata.git_branch.or(fallback_branch),
metadata.git_origin_url.or(fallback_origin_url),
);
thread.search_service_browser_state = metadata.search_service_browser_state;
}
attach_history_if_requested(&mut thread, include_history).await?;
Ok(thread)
@@ -321,6 +323,7 @@ async fn stored_thread_from_sqlite_metadata(
),
token_usage: None,
first_user_message: metadata.first_user_message,
search_service_browser_state: metadata.search_service_browser_state,
history: None,
}
}
@@ -380,6 +383,7 @@ fn stored_thread_from_meta_line(
sandbox_policy: SandboxPolicy::new_read_only_policy(),
token_usage: None,
first_user_message: None,
search_service_browser_state: None,
history: None,
}
}

View File

@@ -284,6 +284,9 @@ async fn apply_metadata_update(
if let Some(first_user_message) = patch.first_user_message {
metadata.first_user_message = Some(first_user_message);
}
if let Some(search_service_browser_state) = patch.search_service_browser_state {
metadata.search_service_browser_state = Some(search_service_browser_state);
}
if let Some(git_info) = patch.git_info {
let existing_git_info = git_info_from_parts(
metadata.git_sha.clone(),
@@ -323,6 +326,11 @@ async fn apply_metadata_update(
(true, Err(err)) => {
warn!("state db update_thread_metadata failed for {thread_id}: {err}");
}
(false, Ok(())) if require_sqlite_write => {
return Err(ThreadStoreError::Internal {
message: format!("sqlite state db unavailable for thread {thread_id}"),
});
}
(false, Ok(())) => {}
(false, Err(err)) if require_sqlite_write || !sqlite_write_error_is_best_effort(&err) => {
return Err(err);
@@ -359,7 +367,8 @@ fn sqlite_write_failure_should_block(patch: &ThreadMetadataPatch) -> bool {
// failure isolation so a corrupted optional state DB does not make JSONL transcript durability
// look broken. Explicit git-only updates still require SQLite because partial git patches need
// the existing SQLite value to preserve unspecified fields. Patches carrying Search Service
// browser state also block on SQLite write failures.
patch.git_info.is_some() && !has_observed_metadata_facts(patch)
patch.search_service_browser_state.is_some()
|| (patch.git_info.is_some() && !has_observed_metadata_facts(patch))
}
fn sqlite_write_error_is_best_effort(err: &ThreadStoreError) -> bool {
@@ -385,6 +394,7 @@ fn has_observed_metadata_facts(patch: &ThreadMetadataPatch) -> bool {
|| patch.sandbox_policy.is_some()
|| patch.token_usage.is_some()
|| patch.first_user_message.is_some()
|| patch.search_service_browser_state.is_some()
}
fn enum_to_string<T: serde::Serialize>(value: &T) -> String {
@@ -761,6 +771,54 @@ mod tests {
assert_eq!(memory_mode.as_deref(), Some("disabled"));
}
/// A patch that carries `search_service_browser_state` must be reflected both on the thread
/// returned by `update_thread_metadata` and on the metadata read back from the state runtime.
#[tokio::test]
async fn update_thread_metadata_persists_search_service_browser_state() {
    // Arrange: a temp home with one session file and an initialized state runtime.
    let codex_home = TempDir::new().expect("temp dir");
    let config = test_config(codex_home.path());
    let session_uuid = Uuid::from_u128(313);
    let thread_id = ThreadId::from_string(&session_uuid.to_string()).expect("valid thread id");
    write_session_file(codex_home.path(), "2025-01-03T18-45-00", session_uuid)
        .expect("session file");

    let sqlite_home = config.sqlite_home.clone();
    let provider_id = config.default_model_provider_id.clone();
    let state_runtime = codex_state::StateRuntime::init(sqlite_home, provider_id)
        .await
        .expect("state db should initialize");
    let store = LocalThreadStore::new(config, Some(state_runtime.clone()));

    let browser_state = json!({
        "sonic_thread_id": "sonic-thread-123",
        "external_urls": {
            "turn0search0": "https://example.com/"
        }
    });
    let expected = Some(&browser_state);

    // Act: apply a patch that only sets the browser state.
    let patch = ThreadMetadataPatch {
        search_service_browser_state: Some(browser_state.clone()),
        ..Default::default()
    };
    let params = UpdateThreadMetadataParams {
        thread_id,
        patch,
        include_archived: false,
    };
    let updated_thread = store
        .update_thread_metadata(params)
        .await
        .expect("persist search service browser state");

    // Assert: the returned thread carries the new state...
    assert_eq!(updated_thread.search_service_browser_state.as_ref(), expected);

    // ...and the same value is readable from the state runtime.
    let stored_metadata = state_runtime
        .get_thread(thread_id)
        .await
        .expect("read persisted metadata")
        .expect("thread metadata");
    assert_eq!(stored_metadata.search_service_browser_state.as_ref(), expected);
}
#[tokio::test]
async fn update_thread_metadata_uses_live_rollout_path_for_external_resume() {
let home = TempDir::new().expect("temp dir");
@@ -1199,6 +1257,14 @@ mod tests {
}));
}
/// A patch whose only content is `search_service_browser_state` must make
/// `sqlite_write_failure_should_block` return `true`.
#[test]
fn sqlite_failures_block_for_search_service_browser_state_updates() {
    let browser_state_only_patch = ThreadMetadataPatch {
        search_service_browser_state: Some(json!({"sonic_thread_id": "sonic-thread-123"})),
        ..Default::default()
    };

    let should_block = sqlite_write_failure_should_block(&browser_state_only_patch);

    assert!(should_block);
}
#[tokio::test]
async fn metadata_patch_applies_title_over_existing_name() {
let home = TempDir::new().expect("temp dir");

View File

@@ -402,6 +402,8 @@ pub struct StoredThread {
pub token_usage: Option<TokenUsage>,
/// First user message observed for this thread, if any.
pub first_user_message: Option<String>,
/// Search Service browser state needed by follow-up browse/open calls.
pub search_service_browser_state: Option<serde_json::Value>,
/// Persisted history, populated only when requested.
pub history: Option<StoredThreadHistory>,
}
@@ -525,6 +527,8 @@ pub struct ThreadMetadataPatch {
pub token_usage: Option<TokenUsage>,
/// First user message observed for this thread.
pub first_user_message: Option<String>,
/// Search Service browser state needed by follow-up browse/open calls.
pub search_service_browser_state: Option<serde_json::Value>,
/// Git metadata patch.
pub git_info: Option<GitInfoPatch>,
/// Thread memory behavior.
@@ -598,6 +602,9 @@ impl ThreadMetadataPatch {
if next.first_user_message.is_some() {
self.first_user_message = next.first_user_message;
}
if next.search_service_browser_state.is_some() {
self.search_service_browser_state = next.search_service_browser_state;
}
if let Some(git_info) = next.git_info {
self.git_info
.get_or_insert_with(GitInfoPatch::default)
@@ -629,6 +636,7 @@ impl ThreadMetadataPatch {
&& self.sandbox_policy.is_none()
&& self.token_usage.is_none()
&& self.first_user_message.is_none()
&& self.search_service_browser_state.is_none()
&& self.git_info.is_none()
&& self.memory_mode.is_none()
}