[codex] Migrate thread turns list to thread store (#19280)

- migrate `thread/turns/list` to ThreadStore. Uses ThreadStore for most
data now but merges in the in-memory state from thread manager
- keep v2 `thread/list` pathless-store friendly by converting
`StoredThread` directly to API `Thread`
- add regression coverage for pathless store history/listing
This commit is contained in:
Tom
2026-04-30 14:16:42 -07:00
committed by GitHub
parent 9121132c8f
commit 127be0612c
12 changed files with 713 additions and 192 deletions

View File

@@ -28,6 +28,8 @@ use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
@@ -136,10 +138,35 @@ async fn thread_start_with_non_local_thread_store_does_not_create_local_persiste
})
.await??;
let response = client
.request(ClientRequest::ThreadList {
request_id: RequestId::Integer(3),
params: ThreadListParams {
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
model_providers: Some(Vec::new()),
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
},
})
.await?
.expect("thread/list should succeed");
let ThreadListResponse { data, .. } =
serde_json::from_value(response).expect("thread/list response should parse");
assert_eq!(data.len(), 1);
assert_eq!(data[0].id, thread.id);
assert_eq!(data[0].path, None);
client.shutdown().await?;
let calls = thread_store.calls().await;
assert_eq!(calls.create_thread, 1);
assert_eq!(calls.list_threads, 1);
assert!(
calls.append_items > 0,
"turn/start should append rollout items through the injected store"

View File

@@ -5,6 +5,12 @@ use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::rollout_path;
use app_test_support::test_absolute_path;
use app_test_support::to_response;
use codex_app_server::in_process;
use codex_app_server::in_process::InProcessStartArgs;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
@@ -31,17 +37,37 @@ use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_arg0::Arg0DispatchPaths;
use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_core::config::ConfigBuilder;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_protocol::models::BaseInstructions;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::SessionSource as ProtocolSessionSource;
use codex_protocol::protocol::UserMessageEvent;
use codex_protocol::user_input::ByteRange;
use codex_protocol::user_input::TextElement;
use codex_thread_store::AppendThreadItemsParams;
use codex_thread_store::CreateThreadParams;
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadMetadataPatch;
use codex_thread_store::ThreadStore;
use codex_thread_store::UpdateThreadMetadataParams;
use core_test_support::responses;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
#[cfg(windows)]
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(25);
@@ -246,6 +272,147 @@ async fn thread_turns_list_can_page_backward_and_forward() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_turns_list_reads_store_history_without_rollout_path() -> Result<()> {
let codex_home = TempDir::new()?;
let thread_id = codex_protocol::ThreadId::from_string("00000000-0000-4000-8000-000000000123")?;
let store_id = Uuid::new_v4().to_string();
create_config_toml_with_thread_store(codex_home.path(), &store_id)?;
let store = InMemoryThreadStore::for_id(store_id.clone());
let _in_memory_store = InMemoryThreadStoreId { store_id };
seed_pathless_store_thread(&store, thread_id).await?;
let loader_overrides = LoaderOverrides::without_managed_config_for_tests();
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(codex_home.path().to_path_buf()))
.loader_overrides(loader_overrides.clone())
.build()
.await?;
let client = in_process::start(InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(config),
cli_overrides: Vec::new(),
loader_overrides,
cloud_requirements: CloudRequirementsLoader::default(),
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source: SessionSource::Cli.into(),
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-app-server-tests".to_string(),
title: None,
version: "0.1.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: true,
..Default::default()
}),
},
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await?;
let result = client
.request(ClientRequest::ThreadTurnsList {
request_id: RequestId::Integer(1),
params: ThreadTurnsListParams {
thread_id: thread_id.to_string(),
cursor: None,
limit: Some(10),
sort_direction: Some(SortDirection::Asc),
},
})
.await?
.expect("thread/turns/list should succeed");
let ThreadTurnsListResponse { data, .. } = serde_json::from_value(result)?;
assert_eq!(turn_user_texts(&data), vec!["history from store"]);
client.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn thread_list_includes_store_thread_without_rollout_path() -> Result<()> {
let codex_home = TempDir::new()?;
let thread_id = codex_protocol::ThreadId::from_string("00000000-0000-4000-8000-000000000124")?;
let store_id = Uuid::new_v4().to_string();
create_config_toml_with_thread_store(codex_home.path(), &store_id)?;
let store = InMemoryThreadStore::for_id(store_id.clone());
let _in_memory_store = InMemoryThreadStoreId { store_id };
seed_pathless_store_thread(&store, thread_id).await?;
let loader_overrides = LoaderOverrides::without_managed_config_for_tests();
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(codex_home.path().to_path_buf()))
.loader_overrides(loader_overrides.clone())
.build()
.await?;
let client = in_process::start(InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(config),
cli_overrides: Vec::new(),
loader_overrides,
cloud_requirements: CloudRequirementsLoader::default(),
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source: SessionSource::Cli.into(),
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-app-server-tests".to_string(),
title: None,
version: "0.1.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: true,
..Default::default()
}),
},
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await?;
let result = client
.request(ClientRequest::ThreadList {
request_id: RequestId::Integer(1),
params: ThreadListParams {
cursor: None,
limit: Some(10),
sort_key: None,
sort_direction: None,
model_providers: Some(Vec::new()),
source_kinds: None,
archived: None,
cwd: None,
use_state_db_only: false,
search_term: None,
},
})
.await?
.expect("thread/list should succeed");
let ThreadListResponse { data, .. } = serde_json::from_value(result)?;
assert_eq!(data.len(), 1);
let thread = &data[0];
assert_eq!(thread.id, thread_id.to_string());
assert_eq!(thread.path, None);
assert_eq!(thread.preview, "");
assert_eq!(thread.name.as_deref(), Some("named pathless thread"));
client.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn thread_read_can_return_archived_threads_by_id() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
@@ -670,6 +837,59 @@ async fn thread_read_include_turns_rejects_unmaterialized_loaded_thread() -> Res
Ok(())
}
#[tokio::test]
async fn thread_turns_list_rejects_unmaterialized_loaded_thread() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let thread_path = thread.path.clone().expect("thread path");
assert!(
!thread_path.exists(),
"fresh thread rollout should not be materialized yet"
);
let read_id = mcp
.send_thread_turns_list_request(ThreadTurnsListParams {
thread_id: thread.id,
cursor: None,
limit: None,
sort_direction: None,
})
.await?;
let read_err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(read_id)),
)
.await??;
assert!(
read_err
.error
.message
.contains("thread/turns/list is unavailable before first user message"),
"unexpected error: {}",
read_err.error.message
);
Ok(())
}
#[tokio::test]
async fn thread_read_reports_system_error_idle_flag_after_failed_turn() -> Result<()> {
let server = responses::start_mock_server().await;
@@ -787,6 +1007,84 @@ fn turn_user_texts(turns: &[codex_app_server_protocol::Turn]) -> Vec<&str> {
.collect()
}
struct InMemoryThreadStoreId {
store_id: String,
}
impl Drop for InMemoryThreadStoreId {
fn drop(&mut self) {
InMemoryThreadStore::remove_id(&self.store_id);
}
}
async fn seed_pathless_store_thread(
store: &InMemoryThreadStore,
thread_id: codex_protocol::ThreadId,
) -> Result<()> {
store
.create_thread(CreateThreadParams {
thread_id,
forked_from_id: None,
source: ProtocolSessionSource::Cli,
base_instructions: BaseInstructions::default(),
dynamic_tools: Vec::new(),
event_persistence_mode: ThreadEventPersistenceMode::default(),
})
.await?;
store
.append_items(AppendThreadItemsParams {
thread_id,
items: store_history_items(),
})
.await?;
store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
patch: ThreadMetadataPatch {
name: Some("named pathless thread".to_string()),
..Default::default()
},
include_archived: true,
})
.await?;
Ok(())
}
fn store_history_items() -> Vec<RolloutItem> {
vec![RolloutItem::EventMsg(EventMsg::UserMessage(
UserMessageEvent {
message: "history from store".to_string(),
images: None,
local_images: Vec::new(),
text_elements: Vec::new(),
},
))]
}
fn create_config_toml_with_thread_store(codex_home: &Path, store_id: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
experimental_thread_store = {{ type = "in_memory", id = "{store_id}" }}
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "http://127.0.0.1:1/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");