Compare commits

...

1 Commits

Author SHA1 Message Date
pakrym-oai
176d2da040 Revert blocking state DB initialization 2026-05-06 21:59:37 -07:00
42 changed files with 675 additions and 545 deletions

View File

@@ -951,7 +951,7 @@ mod tests {
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_app_server_protocol::ToolRequestUserInputQuestion;
use codex_core::config::ConfigBuilder;
use codex_core::init_state_db_from_config;
use codex_core::init_state_db;
use futures::SinkExt;
use futures::StreamExt;
use pretty_assertions::assert_eq;
@@ -1017,7 +1017,7 @@ mod tests {
) -> TestClient {
let codex_home = TempDir::new().expect("temp dir");
let config = Arc::new(build_test_config_for_codex_home(codex_home.path()).await);
let state_db = init_state_db_from_config(config.as_ref())
let state_db = init_state_db(config.as_ref())
.await
.expect("state db should initialize for in-process test");
let client = InProcessAppServerClient::start(InProcessClientStartArgs {

View File

@@ -82,13 +82,13 @@ use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_config::ThreadConfigLoader;
use codex_core::config::Config;
use codex_core::init_state_db_from_config;
use codex_core::init_state_db;
use codex_core::resolve_installation_id;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
use codex_protocol::protocol::SessionSource;
use codex_rollout::state_db::StateDbHandle;
pub use codex_rollout::StateDbHandle;
pub use codex_state::log_db::LogDbLayer;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
@@ -129,7 +129,7 @@ pub struct InProcessStartArgs {
pub feedback: CodexFeedback,
/// SQLite tracing layer used to flush recently emitted logs before feedback upload.
pub log_db: Option<LogDbLayer>,
/// Optional state DB handle to use for the in-process runtime.
/// Process-wide SQLite state handle shared with embedded app-server consumers.
pub state_db: Option<StateDbHandle>,
/// Environment manager used by core execution and filesystem operations.
pub environment_manager: Arc<EnvironmentManager>,
@@ -370,7 +370,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
let channel_capacity = args.channel_capacity.max(1);
let state_db = match args.state_db.clone() {
Some(state_db) => Some(state_db),
None => init_state_db_from_config(args.config.as_ref()).await,
None => init_state_db(args.config.as_ref()).await,
};
let installation_id = resolve_installation_id(&args.config.codex_home).await?;
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
@@ -421,12 +421,6 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
);
let (processor_tx, mut processor_rx) = mpsc::channel::<ProcessorCommand>(channel_capacity);
let mut processor_handle = tokio::spawn(async move {
let Some(state_db) = state_db else {
warn!(
"in-process app-server state db initialization failed; shutting down processor task"
);
return;
};
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing: Arc::clone(&processor_outgoing),
analytics_events_client,
@@ -436,7 +430,7 @@ async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClie
environment_manager: args.environment_manager,
feedback: args.feedback,
log_db: args.log_db,
state_db,
state_db: state_db.clone(),
config_warnings: args.config_warnings,
session_source: args.session_source,
auth_manager,
@@ -775,7 +769,7 @@ mod tests {
) -> InProcessClientHandle {
let codex_home = TempDir::new().expect("temp dir");
let config = Arc::new(build_test_config(codex_home.path()).await);
let state_db = init_state_db_from_config(config.as_ref())
let state_db = codex_rollout::state_db::try_init(config.as_ref())
.await
.expect("state db should initialize for in-process test");
let args = InProcessStartArgs {
@@ -833,7 +827,7 @@ mod tests {
}
#[tokio::test]
async fn in_process_allows_device_key_requests_to_reach_device_key_api() {
async fn in_process_allows_device_key_requests_to_reach_device_key_processor() {
let client = start_test_client(SessionSource::Cli).await;
const MALFORMED_KEY_ID_MESSAGE: &str = concat!(
"invalid device key payload: keyId must be dk_hse_, dk_tpm_, or dk_osn_ ",

View File

@@ -51,11 +51,11 @@ use codex_config::TextRange as CoreTextRange;
use codex_core::ExecPolicyError;
use codex_core::check_execpolicy_for_warnings;
use codex_core::config::find_codex_home;
use codex_core::init_state_db_from_config;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::ExecServerRuntimePaths;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use codex_rollout::state_db as rollout_state_db;
use codex_state::log_db;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
@@ -489,9 +489,9 @@ pub async fn run_main_with_transport_options(
}
};
let state_db = init_state_db_from_config(&config)
.await
.ok_or_else(|| std::io::Error::other("failed to initialize sqlite state db"))?;
let state_db_result = rollout_state_db::try_init(&config).await;
let state_db_init_error = state_db_result.as_ref().err().map(ToString::to_string);
let state_db = state_db_result.ok();
if should_run_personality_migration {
let effective_toml = config.config_layer_stack.effective_config();
@@ -600,12 +600,10 @@ pub async fn run_main_with_transport_options(
let feedback_layer = feedback.logger_layer();
let feedback_metadata_layer = feedback.metadata_layer();
let log_db = log_db::start(state_db.clone());
let log_db_layer = Some(
log_db
.clone()
.with_filter(Targets::new().with_default(Level::TRACE)),
);
let log_db = state_db.clone().map(log_db::start);
let log_db_layer = log_db
.clone()
.map(|layer| layer.with_filter(Targets::new().with_default(Level::TRACE)));
let otel_logger_layer = otel.as_ref().and_then(|o| o.logger_layer());
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
let _ = tracing_subscriber::registry()
@@ -622,6 +620,9 @@ pub async fn run_main_with_transport_options(
None => error!("{}", warning.summary),
}
}
if let Some(err) = &state_db_init_error {
error!("failed to initialize sqlite state db: {err}");
}
let installation_id = resolve_installation_id(&config.codex_home).await?;
let transport_shutdown_token = CancellationToken::new();
let mut transport_accept_handles = Vec::<JoinHandle<()>>::new();
@@ -667,17 +668,25 @@ pub async fn run_main_with_transport_options(
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
let remote_control_enabled = config.features.enabled(Feature::RemoteControl);
let remote_control_config_enabled = config.features.enabled(Feature::RemoteControl);
let remote_control_enabled = remote_control_config_enabled && state_db.is_some();
if remote_control_config_enabled && state_db.is_none() {
error!("remote control disabled because sqlite state db is unavailable");
}
if transport_accept_handles.is_empty() && !remote_control_enabled {
return Err(std::io::Error::new(
ErrorKind::InvalidInput,
"no transport configured; use --listen or enable remote control",
if remote_control_config_enabled && state_db.is_none() {
"no transport configured; remote control disabled because sqlite state db is unavailable"
} else {
"no transport configured; use --listen or enable remote control"
},
));
}
let (remote_control_accept_handle, remote_control_handle) = start_remote_control(
config.chatgpt_base_url.clone(),
Some(state_db.clone()),
state_db.clone(),
auth_manager.clone(),
transport_event_tx.clone(),
transport_shutdown_token.clone(),
@@ -761,7 +770,7 @@ pub async fn run_main_with_transport_options(
config_manager,
environment_manager,
feedback: feedback.clone(),
log_db: Some(log_db),
log_db,
state_db: state_db.clone(),
config_warnings,
session_source,

View File

@@ -108,9 +108,8 @@ mod tests {
use codex_config::ThreadConfigLoadErrorCode;
use codex_config::ThreadConfigLoader;
use codex_config::ThreadConfigSource;
use codex_core::agent_graph_store_from_state_db;
use codex_core::config::ConfigOverrides;
use codex_core::init_state_db_from_config;
use codex_core::init_state_db;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
use codex_login::AuthManager;
@@ -175,21 +174,19 @@ mod tests {
.await?;
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("dummy"));
let state_db = init_state_db_from_config(&good_config)
let state_db = init_state_db(&good_config)
.await
.expect("refresh tests require state db");
let thread_store = thread_store_from_config(&good_config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_store = thread_store_from_config(&good_config, Some(state_db.clone()));
let thread_manager = Arc::new(ThreadManager::new(
&good_config,
auth_manager,
SessionSource::Exec,
Arc::new(EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
"11111111-1111-4111-8111-111111111111".to_string(),
Some(state_db),
));
thread_manager.start_thread(good_config).await?;
thread_manager.start_thread(bad_config).await?;

View File

@@ -61,7 +61,6 @@ use codex_app_server_protocol::experimental_required_message;
use codex_arg0::Arg0DispatchPaths;
use codex_chatgpt::workspace_settings;
use codex_core::ThreadManager;
use codex_core::agent_graph_store_from_state_db;
use codex_core::config::Config;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
@@ -255,7 +254,7 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) environment_manager: Arc<EnvironmentManager>,
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
pub(crate) state_db: StateDbHandle,
pub(crate) state_db: Option<StateDbHandle>,
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
pub(crate) session_source: SessionSource,
pub(crate) auth_manager: Arc<AuthManager>,
@@ -294,17 +293,15 @@ impl MessageProcessor {
// affect per-thread behavior, but they must not move newly started,
// resumed, or forked threads to a different persistence backend/root.
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
session_source,
environment_manager,
Some(analytics_events_client.clone()),
state_db.clone(),
Arc::clone(&thread_store),
agent_graph_store.clone(),
installation_id,
state_db.clone(),
));
thread_manager
.plugins_manager()
@@ -350,7 +347,7 @@ impl MessageProcessor {
Arc::clone(&config),
feedback,
log_db,
Some(state_db.clone()),
state_db.clone(),
);
let git_processor = GitRequestProcessor::new();
let initialize_processor = InitializeRequestProcessor::new(
@@ -400,7 +397,7 @@ impl MessageProcessor {
thread_watch_manager.clone(),
Arc::clone(&thread_list_state_permit),
thread_goal_processor.clone(),
Some(state_db.clone()),
state_db.clone(),
);
let turn_processor = TurnRequestProcessor::new(
auth_manager.clone(),

View File

@@ -32,7 +32,6 @@ use codex_config::CloudRequirementsLoader;
use codex_config::LoaderOverrides;
use codex_core::config::Config;
use codex_core::config::ConfigBuilder;
use codex_core::init_state_db_from_config;
use codex_exec_server::EnvironmentManager;
use codex_feedback::CodexFeedback;
use codex_login::AuthManager;
@@ -282,9 +281,6 @@ async fn build_test_processor(
outgoing_tx,
analytics_events_client.clone(),
));
let state_db = init_state_db_from_config(config.as_ref())
.await
.expect("tracing test processor requires state db");
let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs {
outgoing,
analytics_events_client,
@@ -294,7 +290,7 @@ async fn build_test_processor(
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
feedback: CodexFeedback::new(),
log_db: None,
state_db,
state_db: None,
config_warnings: Vec::new(),
session_source: SessionSource::VSCode,
auth_manager,

View File

@@ -33,8 +33,8 @@ use codex_device_key::RemoteControlClientConnectionAudience;
use codex_device_key::RemoteControlClientConnectionSignPayload;
use codex_device_key::RemoteControlClientEnrollmentAudience;
use codex_device_key::RemoteControlClientEnrollmentSignPayload;
use codex_rollout::state_db::StateDbHandle;
use codex_state::DeviceKeyBindingRecord;
use codex_state::StateRuntime;
#[derive(Clone)]
pub(crate) struct DeviceKeyRequestProcessor {
@@ -43,7 +43,10 @@ pub(crate) struct DeviceKeyRequestProcessor {
}
impl DeviceKeyRequestProcessor {
pub(crate) fn new(outgoing: Arc<OutgoingMessageSender>, state_db: StateDbHandle) -> Self {
pub(crate) fn new(
outgoing: Arc<OutgoingMessageSender>,
state_db: Option<Arc<StateRuntime>>,
) -> Self {
Self {
outgoing,
store: DeviceKeyStore::new(Arc::new(StateDeviceKeyBindingStore::new(state_db))),
@@ -167,18 +170,25 @@ async fn sign_device_key(
}
struct StateDeviceKeyBindingStore {
state_db: StateDbHandle,
state_db: Option<Arc<StateRuntime>>,
}
impl StateDeviceKeyBindingStore {
fn new(state_db: StateDbHandle) -> Self {
fn new(state_db: Option<Arc<StateRuntime>>) -> Self {
Self { state_db }
}
async fn state_db(&self) -> Result<Arc<StateRuntime>, DeviceKeyError> {
self.state_db
.clone()
.ok_or_else(|| DeviceKeyError::Platform("sqlite state db unavailable".to_string()))
}
}
impl fmt::Debug for StateDeviceKeyBindingStore {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("StateDeviceKeyBindingStore")
.field("has_state_db", &self.state_db.is_some())
.finish_non_exhaustive()
}
}
@@ -186,7 +196,7 @@ impl fmt::Debug for StateDeviceKeyBindingStore {
#[async_trait]
impl DeviceKeyBindingStore for StateDeviceKeyBindingStore {
async fn get_binding(&self, key_id: &str) -> Result<Option<DeviceKeyBinding>, DeviceKeyError> {
let state_db = self.state_db.clone();
let state_db = self.state_db().await?;
state_db
.get_device_key_binding(key_id)
.await
@@ -204,7 +214,7 @@ impl DeviceKeyBindingStore for StateDeviceKeyBindingStore {
key_id: &str,
binding: &DeviceKeyBinding,
) -> Result<(), DeviceKeyError> {
let state_db = self.state_db.clone();
let state_db = self.state_db().await?;
state_db
.upsert_device_key_binding(&DeviceKeyBindingRecord {
key_id: key_id.to_string(),

View File

@@ -7,7 +7,7 @@ pub(crate) struct ThreadGoalRequestProcessor {
outgoing: Arc<OutgoingMessageSender>,
config: Arc<Config>,
thread_state_manager: ThreadStateManager,
state_db: StateDbHandle,
state_db: Option<StateDbHandle>,
}
impl ThreadGoalRequestProcessor {
@@ -16,7 +16,7 @@ impl ThreadGoalRequestProcessor {
outgoing: Arc<OutgoingMessageSender>,
config: Arc<Config>,
thread_state_manager: ThreadStateManager,
state_db: StateDbHandle,
state_db: Option<StateDbHandle>,
) -> Self {
Self {
thread_manager,
@@ -72,6 +72,23 @@ impl ThreadGoalRequestProcessor {
}
}
pub(crate) async fn pending_resume_goal_state(
&self,
thread: &CodexThread,
) -> (bool, Option<StateDbHandle>) {
let emit_thread_goal_update = self.config.features.enabled(Feature::Goals);
let thread_goal_state_db = if emit_thread_goal_update {
if let Some(state_db) = thread.state_db() {
Some(state_db)
} else {
self.state_db.clone()
}
} else {
None
};
(emit_thread_goal_update, thread_goal_state_db)
}
async fn thread_goal_set_inner(
&self,
request_id: ConnectionRequestId,
@@ -93,7 +110,7 @@ impl ThreadGoalRequestProcessor {
None => find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
Some(self.state_db.as_ref()),
self.state_db.as_deref(),
)
.await
.map_err(|err| {
@@ -258,7 +275,7 @@ impl ThreadGoalRequestProcessor {
None => find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
Some(self.state_db.as_ref()),
self.state_db.as_deref(),
)
.await
.map_err(|err| {
@@ -322,7 +339,7 @@ impl ThreadGoalRequestProcessor {
find_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
Some(self.state_db.as_ref()),
self.state_db.as_deref(),
)
.await
.map_err(|err| {
@@ -331,7 +348,9 @@ impl ThreadGoalRequestProcessor {
.ok_or_else(|| invalid_request(format!("thread not found: {thread_id}")))?;
}
Ok(self.state_db.clone())
self.state_db
.clone()
.ok_or_else(|| internal_error("sqlite state db unavailable for thread goals"))
}
async fn emit_thread_goal_snapshot(&self, thread_id: ThreadId) {

View File

@@ -2671,10 +2671,10 @@ impl ThreadRequestProcessor {
)));
};
let emit_thread_goal_update = self.config.features.enabled(Feature::Goals);
let thread_goal_state_db = emit_thread_goal_update
.then(|| self.state_db.clone())
.flatten();
let (emit_thread_goal_update, thread_goal_state_db) = self
.thread_goal_processor
.pending_resume_goal_state(existing_thread.as_ref())
.await;
let command = crate::thread_state::ThreadListenerCommand::SendThreadResumeResponse(
Box::new(crate::thread_state::PendingThreadResumeRequest {

View File

@@ -31,7 +31,6 @@ pub use codex_core::StartThreadOptions;
pub use codex_core::StateDbHandle;
pub use codex_core::ThreadManager;
pub use codex_core::ThreadShutdownReport;
pub use codex_core::agent_graph_store_from_state_db;
pub use codex_core::config::Config;
pub use codex_core::config::Constrained;
pub use codex_core::config::GhostSnapshotConfig;
@@ -41,7 +40,6 @@ pub use codex_core::config::TerminalResizeReflowConfig;
pub use codex_core::config::ThreadStoreConfig;
pub use codex_core::config::find_codex_home;
pub use codex_core::init_state_db;
pub use codex_core::init_state_db_from_config;
pub use codex_core::resolve_installation_id;
pub use codex_core::skills::SkillsManager;
pub use codex_core::thread_store_from_config;

View File

@@ -31,6 +31,7 @@ use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::ThreadSource;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_protocol::user_input::UserInput;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_thread_store::ReadThreadParams;
use serde::Serialize;
use std::collections::HashMap;
@@ -310,6 +311,7 @@ impl AgentControl {
state.notify_thread_created(new_thread.thread_id);
self.persist_thread_spawn_edge_for_source(
new_thread.thread.as_ref(),
new_thread.thread_id,
notification_source.as_ref(),
)
@@ -459,14 +461,19 @@ impl AgentControl {
))
.await?;
let state = self.upgrade()?;
let agent_graph_store = state.agent_graph_store();
let Ok(resumed_thread) = state.get_thread(resumed_thread_id).await else {
return Ok(resumed_thread_id);
};
let Some(state_db_ctx) = resumed_thread.state_db() else {
return Ok(resumed_thread_id);
};
let mut resume_queue = VecDeque::from([(thread_id, root_depth)]);
while let Some((parent_thread_id, parent_depth)) = resume_queue.pop_front() {
let child_ids = match agent_graph_store
.list_thread_spawn_children(
let child_ids = match state_db_ctx
.list_thread_spawn_children_with_status(
parent_thread_id,
Some(codex_agent_graph_store::ThreadSpawnEdgeStatus::Open),
DirectionalThreadSpawnEdgeStatus::Open,
)
.await
{
@@ -530,6 +537,7 @@ impl AgentControl {
let _ = config.features.disable(Feature::Collab);
}
let state = self.upgrade()?;
let state_db_ctx = state.state_db();
let mut reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
let (session_source, agent_metadata) = match session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
@@ -539,11 +547,14 @@ impl AgentControl {
agent_role: _,
agent_nickname: _,
}) => {
let state_db_ctx = state.state_db();
let (resumed_agent_nickname, resumed_agent_role) =
match state_db_ctx.get_thread(thread_id).await {
Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role),
Ok(None) | Err(_) => (None, None),
if let Some(state_db_ctx) = state_db_ctx.as_ref() {
match state_db_ctx.get_thread(thread_id).await {
Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role),
Ok(None) | Err(_) => (None, None),
}
} else {
(None, None)
};
self.prepare_thread_spawn(
&mut reservation,
@@ -610,6 +621,7 @@ impl AgentControl {
);
}
self.persist_thread_spawn_edge_for_source(
resumed_thread.thread.as_ref(),
resumed_thread.thread_id,
Some(&notification_source),
)
@@ -722,13 +734,11 @@ impl AgentControl {
/// agent and any live descendants reached from the in-memory tree.
pub(crate) async fn close_agent(&self, agent_id: ThreadId) -> CodexResult<String> {
let state = self.upgrade()?;
if let Err(err) = state
.agent_graph_store()
.set_thread_spawn_edge_status(
agent_id,
codex_agent_graph_store::ThreadSpawnEdgeStatus::Closed,
)
.await
if let Ok(thread) = state.get_thread(agent_id).await
&& let Some(state_db_ctx) = thread.state_db()
&& let Err(err) = state_db_ctx
.set_thread_spawn_edge_status(agent_id, DirectionalThreadSpawnEdgeStatus::Closed)
.await
{
warn!("failed to persist thread-spawn edge status for {agent_id}: {err}");
}
@@ -1144,21 +1154,21 @@ impl AgentControl {
async fn persist_thread_spawn_edge_for_source(
&self,
thread: &crate::CodexThread,
child_thread_id: ThreadId,
session_source: Option<&SessionSource>,
) {
let Some(parent_thread_id) = session_source.and_then(thread_spawn_parent_thread_id) else {
return;
};
let Ok(state) = self.upgrade() else {
let Some(state_db_ctx) = thread.state_db() else {
return;
};
if let Err(err) = state
.agent_graph_store()
if let Err(err) = state_db_ctx
.upsert_thread_spawn_edge(
parent_thread_id,
child_thread_id,
codex_agent_graph_store::ThreadSpawnEdgeStatus::Open,
DirectionalThreadSpawnEdgeStatus::Open,
)
.await
{

View File

@@ -107,6 +107,32 @@ impl AgentControlHarness {
}
}
async fn new_with_state_db() -> Self {
let (home, mut config) = test_config().await;
config
.features
.enable(Feature::Sqlite)
.expect("test config should allow sqlite");
let state_db = crate::init_state_db(&config)
.await
.expect("test config should initialize state db");
let manager = ThreadManager::with_models_provider_home_and_state_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
Some(state_db),
)
.await;
let control = manager.agent_control();
Self {
_home: home,
config,
manager,
control,
}
}
async fn start_thread(&self) -> (ThreadId, Arc<CodexThread>) {
let new_thread = self
.manager
@@ -1538,25 +1564,7 @@ async fn spawn_thread_subagent_uses_role_specific_nickname_candidates() {
#[tokio::test]
async fn resume_thread_subagent_restores_stored_nickname_and_role() {
let (home, mut config) = test_config().await;
config
.features
.enable(Feature::Sqlite)
.expect("test config should allow sqlite");
let manager = ThreadManager::with_models_provider_and_home_for_tests(
CodexAuth::from_api_key("dummy"),
config.model_provider.clone(),
config.codex_home.to_path_buf(),
std::sync::Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
)
.await;
let control = manager.agent_control();
let harness = AgentControlHarness {
_home: home,
config,
manager,
control,
};
let harness = AgentControlHarness::new_with_state_db().await;
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let agent_path = AgentPath::from_string("/root/explorer".to_string())
.expect("test agent path should be valid");
@@ -1704,12 +1712,14 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() {
.expect("child shutdown should succeed");
let store = LocalThreadStore::new(
LocalThreadStoreConfig::from_config(&harness.config),
codex_state::StateRuntime::init(
harness.config.sqlite_home.clone(),
harness.config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
Some(
codex_state::StateRuntime::init(
harness.config.sqlite_home.clone(),
harness.config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
),
);
store
.archive_thread(ArchiveThreadParams {
@@ -1734,7 +1744,7 @@ async fn resume_agent_from_rollout_reads_archived_rollout_path() {
#[tokio::test]
async fn list_agent_subtree_thread_ids_includes_anonymous_and_closed_descendants() {
let harness = AgentControlHarness::new().await;
let harness = AgentControlHarness::new_with_state_db().await;
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let worker_path = AgentPath::root().join("worker").expect("worker path");
let reviewer_path = AgentPath::root().join("reviewer").expect("reviewer path");
@@ -1860,7 +1870,7 @@ async fn list_agent_subtree_thread_ids_includes_anonymous_and_closed_descendants
#[tokio::test]
async fn shutdown_agent_tree_closes_live_descendants() {
let harness = AgentControlHarness::new().await;
let harness = AgentControlHarness::new_with_state_db().await;
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let child_thread_id = harness
@@ -1945,7 +1955,7 @@ async fn shutdown_agent_tree_closes_live_descendants() {
#[tokio::test]
async fn shutdown_agent_tree_closes_descendants_when_started_at_child() {
let harness = AgentControlHarness::new().await;
let harness = AgentControlHarness::new_with_state_db().await;
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let child_thread_id = harness
@@ -2036,7 +2046,7 @@ async fn shutdown_agent_tree_closes_descendants_when_started_at_child() {
#[tokio::test]
async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() {
let harness = AgentControlHarness::new().await;
let harness = AgentControlHarness::new_with_state_db().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
@@ -2131,7 +2141,7 @@ async fn resume_agent_from_rollout_does_not_reopen_closed_descendants() {
#[tokio::test]
async fn resume_closed_child_reopens_open_descendants() {
let harness = AgentControlHarness::new().await;
let harness = AgentControlHarness::new_with_state_db().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
@@ -2228,7 +2238,7 @@ async fn resume_closed_child_reopens_open_descendants() {
#[tokio::test]
async fn resume_agent_from_rollout_reopens_open_descendants_after_manager_shutdown() {
let harness = AgentControlHarness::new().await;
let harness = AgentControlHarness::new_with_state_db().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
@@ -2319,7 +2329,7 @@ async fn resume_agent_from_rollout_reopens_open_descendants_after_manager_shutdo
#[tokio::test]
async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_source_is_stale() {
let harness = AgentControlHarness::new().await;
let harness = AgentControlHarness::new_with_state_db().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
@@ -2450,7 +2460,7 @@ async fn resume_agent_from_rollout_uses_edge_data_when_descendant_metadata_sourc
#[tokio::test]
async fn resume_agent_from_rollout_skips_descendants_when_parent_resume_fails() {
let harness = AgentControlHarness::new().await;
let harness = AgentControlHarness::new_with_state_db().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness

View File

@@ -98,7 +98,6 @@ pub(crate) async fn run_codex_thread_interactive(
parent_trace: None,
environment_selections: parent_ctx.environments.clone(),
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
state_db: parent_session.services.state_db.clone(),
thread_store: Arc::clone(&parent_session.services.thread_store),
}))
.or_cancel(&cancel_token)

View File

@@ -30,6 +30,7 @@ use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::validate_thread_goal_objective;
use codex_rollout::state_db::reconcile_rollout;
use codex_thread_store::LocalThreadStore;
use codex_utils_template::Template;
use futures::future::BoxFuture;
use std::sync::Arc;
@@ -1338,6 +1339,17 @@ impl Session {
state_db
} else if let Some(state_db) = self.goal_runtime.state_db.lock().await.clone() {
state_db
} else if let Some(local_store) = self
.services
.thread_store
.as_any()
.downcast_ref::<LocalThreadStore>()
{
local_store.state_db().await.ok_or_else(|| {
anyhow::anyhow!(
"thread goals require a local persisted thread with a state database"
)
})?
} else {
anyhow::bail!("thread goals require a local persisted thread with a state database");
};

View File

@@ -117,9 +117,7 @@ pub use thread_manager::NewThread;
pub use thread_manager::StartThreadOptions;
pub use thread_manager::ThreadManager;
pub use thread_manager::ThreadShutdownReport;
pub use thread_manager::agent_graph_store_from_state_db;
pub use thread_manager::build_models_manager;
pub use thread_manager::init_state_db_from_config;
pub use thread_manager::thread_store_from_config;
pub use web_search::web_search_action_detail;
pub use web_search::web_search_detail;

View File

@@ -25,7 +25,7 @@ pub enum PersonalityMigrationStatus {
pub async fn maybe_migrate_personality(
codex_home: &Path,
config_toml: &ConfigToml,
state_db: StateDbHandle,
state_db: Option<StateDbHandle>,
) -> io::Result<PersonalityMigrationStatus> {
let marker_path = codex_home.join(PERSONALITY_MIGRATION_FILENAME);
if tokio::fs::try_exists(&marker_path).await? {
@@ -65,13 +65,16 @@ pub async fn maybe_migrate_personality(
async fn has_recorded_sessions(
codex_home: &Path,
default_provider: &str,
state_db: StateDbHandle,
state_db: Option<StateDbHandle>,
) -> io::Result<bool> {
let config = LocalThreadStoreConfig {
codex_home: codex_home.to_path_buf(),
default_model_provider_id: default_provider.to_string(),
};
let store = LocalThreadStore::new(config, state_db);
let store = LocalThreadStore::new(
LocalThreadStoreConfig {
codex_home: codex_home.to_path_buf(),
sqlite_home: codex_home.to_path_buf(),
default_model_provider_id: default_provider.to_string(),
},
state_db,
);
if has_threads(&store, /*archived*/ false).await? {
return Ok(true);
}

View File

@@ -112,7 +112,7 @@ async fn applies_when_sessions_exist_and_no_personality() -> io::Result<()> {
let config_toml = ConfigToml::default();
let state_db = state_db_for_test(temp.path()).await?;
let status = maybe_migrate_personality(temp.path(), &config_toml, state_db).await?;
let status = maybe_migrate_personality(temp.path(), &config_toml, Some(state_db)).await?;
assert_eq!(status, PersonalityMigrationStatus::Applied);
assert!(temp.path().join(PERSONALITY_MIGRATION_FILENAME).exists());
@@ -129,7 +129,7 @@ async fn applies_when_only_archived_sessions_exist_and_no_personality() -> io::R
let config_toml = ConfigToml::default();
let state_db = state_db_for_test(temp.path()).await?;
let status = maybe_migrate_personality(temp.path(), &config_toml, state_db).await?;
let status = maybe_migrate_personality(temp.path(), &config_toml, Some(state_db)).await?;
assert_eq!(status, PersonalityMigrationStatus::Applied);
assert!(temp.path().join(PERSONALITY_MIGRATION_FILENAME).exists());
@@ -146,7 +146,7 @@ async fn skips_when_marker_exists() -> io::Result<()> {
let config_toml = ConfigToml::default();
let state_db = state_db_for_test(temp.path()).await?;
let status = maybe_migrate_personality(temp.path(), &config_toml, state_db).await?;
let status = maybe_migrate_personality(temp.path(), &config_toml, Some(state_db)).await?;
assert_eq!(status, PersonalityMigrationStatus::SkippedMarker);
assert!(!temp.path().join("config.toml").exists());
@@ -164,7 +164,7 @@ async fn skips_when_personality_explicit() -> io::Result<()> {
let config_toml = read_config_toml(temp.path()).await?;
let state_db = state_db_for_test(temp.path()).await?;
let status = maybe_migrate_personality(temp.path(), &config_toml, state_db).await?;
let status = maybe_migrate_personality(temp.path(), &config_toml, Some(state_db)).await?;
assert_eq!(
status,
@@ -182,7 +182,7 @@ async fn skips_when_no_sessions() -> io::Result<()> {
let temp = TempDir::new()?;
let config_toml = ConfigToml::default();
let state_db = state_db_for_test(temp.path()).await?;
let status = maybe_migrate_personality(temp.path(), &config_toml, state_db).await?;
let status = maybe_migrate_personality(temp.path(), &config_toml, Some(state_db)).await?;
assert_eq!(status, PersonalityMigrationStatus::SkippedNoSessions);
assert!(temp.path().join(PERSONALITY_MIGRATION_FILENAME).exists());
@@ -199,7 +199,7 @@ async fn uses_configured_sqlite_home_when_checking_for_sessions() -> io::Result<
let config_toml = ConfigToml::default();
let state_db =
state_db_for_test_with_sqlite_home(codex_home.path(), sqlite_home.path()).await?;
let status = maybe_migrate_personality(codex_home.path(), &config_toml, state_db).await?;
let status = maybe_migrate_personality(codex_home.path(), &config_toml, Some(state_db)).await?;
assert_eq!(status, PersonalityMigrationStatus::Applied);
assert!(

View File

@@ -17,9 +17,8 @@ use crate::resolve_installation_id;
use crate::session::session::Session;
use crate::session::turn::build_prompt;
use crate::session::turn::built_tools;
use crate::state_db_bridge::init_state_db;
use crate::thread_manager::ThreadManager;
use crate::thread_manager::agent_graph_store_from_state_db;
use crate::thread_manager::init_state_db_from_config;
use crate::thread_manager::thread_store_from_config;
/// Build the model-visible `input` list for a single debug turn.
@@ -38,11 +37,8 @@ pub async fn build_prompt_input(
config.codex_linux_sandbox_exe.clone(),
)?;
let state_db = init_state_db_from_config(&config)
.await
.ok_or_else(|| std::io::Error::other("prompt debug requires state db"))?;
let state_db = init_state_db(&config).await;
let thread_store = thread_store_from_config(&config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let installation_id = resolve_installation_id(&config.codex_home).await?;
let thread_manager = ThreadManager::new(
&config,
@@ -50,10 +46,9 @@ pub async fn build_prompt_input(
SessionSource::Exec,
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
installation_id,
state_db.clone(),
);
let thread = thread_manager.start_thread(config).await?;

View File

@@ -132,6 +132,7 @@ use codex_terminal_detection::user_agent;
use codex_thread_store::CreateThreadParams;
use codex_thread_store::LiveThread;
use codex_thread_store::LiveThreadInitGuard;
use codex_thread_store::LocalThreadStore;
use codex_thread_store::ResumeThreadParams;
use codex_thread_store::ThreadEventPersistenceMode;
use codex_thread_store::ThreadPersistenceMetadata;
@@ -409,7 +410,6 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) parent_trace: Option<W3cTraceContext>,
pub(crate) environment_selections: ResolvedTurnEnvironments,
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
pub(crate) state_db: Option<state_db::StateDbHandle>,
pub(crate) thread_store: Arc<dyn ThreadStore>,
}
@@ -469,7 +469,6 @@ impl Codex {
parent_trace: _,
environment_selections,
analytics_events_client,
state_db,
thread_store,
} = args;
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
@@ -558,7 +557,15 @@ impl Codex {
};
match thread_id {
Some(thread_id) => {
let state_db_ctx = state_db.clone();
let state_db_ctx = if config.ephemeral {
None
} else if let Some(local_store) =
thread_store.as_any().downcast_ref::<LocalThreadStore>()
{
local_store.state_db().await
} else {
None
};
state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn")
.await
}
@@ -646,7 +653,6 @@ impl Codex {
agent_control,
environment_manager,
analytics_events_client,
state_db,
thread_store,
parent_rollout_thread_trace,
)
@@ -1308,7 +1314,7 @@ impl Session {
self.services.user_shell.as_ref().clone(),
self.services.shell_snapshot_tx.clone(),
self.services.session_telemetry.clone(),
self.state_db(),
self.services.state_db.clone(),
);
}

View File

@@ -368,7 +368,6 @@ impl Session {
agent_control: AgentControl,
environment_manager: Arc<EnvironmentManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
state_db: Option<state_db::StateDbHandle>,
thread_store: Arc<dyn ThreadStore>,
parent_rollout_thread_trace: ThreadTraceContext,
) -> anyhow::Result<Arc<Self>> {
@@ -468,7 +467,22 @@ impl Session {
otel.name = "session_init.thread_persistence",
session_init.ephemeral = config.ephemeral,
));
let state_db_ctx = if config.ephemeral { None } else { state_db };
let state_db_fut = async {
if config.ephemeral {
None
} else if let Some(local_store) =
thread_store.as_any().downcast_ref::<LocalThreadStore>()
{
local_store.state_db().await
} else {
None
}
}
.instrument(info_span!(
"session_init.state_db",
otel.name = "session_init.state_db",
session_init.ephemeral = config.ephemeral,
));
let auth_manager_clone = Arc::clone(&auth_manager);
let config_for_mcp = Arc::clone(&config);
@@ -492,8 +506,8 @@ impl Session {
));
// Join all independent futures.
let (thread_persistence_result, (auth, mcp_servers, auth_statuses)) =
tokio::join!(thread_persistence_fut, auth_and_mcp_fut);
let (thread_persistence_result, state_db_ctx, (auth, mcp_servers, auth_statuses)) =
tokio::join!(thread_persistence_fut, state_db_fut, auth_and_mcp_fut);
let mut live_thread_init =
LiveThreadInitGuard::new(thread_persistence_result.map_err(|e| {

View File

@@ -3598,15 +3598,16 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
AgentControl::default(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
/*state_db*/ None,
Arc::new(codex_thread_store::LocalThreadStore::new(
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
Some(
codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
),
)),
codex_rollout_trace::ThreadTraceContext::disabled(),
)
@@ -3755,12 +3756,14 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
live_thread: None,
thread_store: Arc::new(codex_thread_store::LocalThreadStore::new(
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
Some(
codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
),
)),
model_client: ModelClient::new(
Some(auth_manager.clone()),
@@ -3939,15 +3942,16 @@ async fn make_session_with_config_and_rx(
AgentControl::default(),
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
/*state_db*/ None,
Arc::new(codex_thread_store::LocalThreadStore::new(
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
Some(
codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
),
)),
codex_rollout_trace::ThreadTraceContext::disabled(),
)
@@ -4047,15 +4051,16 @@ async fn make_session_with_history_source_and_agent_control_and_rx(
agent_control,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
/*state_db*/ None,
Arc::new(codex_thread_store::LocalThreadStore::new(
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
Some(
codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
),
)),
codex_rollout_trace::ThreadTraceContext::disabled(),
)
@@ -5446,7 +5451,7 @@ where
live_thread: None,
thread_store: Arc::new(codex_thread_store::LocalThreadStore::new(
codex_thread_store::LocalThreadStoreConfig::from_config(config.as_ref()),
state_db,
Some(state_db),
)),
model_client: ModelClient::new(
Some(Arc::clone(&auth_manager)),

View File

@@ -731,12 +731,14 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
let skills_watcher = Arc::new(SkillsWatcher::noop());
let thread_store = Arc::new(codex_thread_store::LocalThreadStore::new(
codex_thread_store::LocalThreadStoreConfig::from_config(&config),
codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
Some(
codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.model_provider_id.clone(),
)
.await
.expect("state db should initialize"),
),
));
let CodexSpawnOk { codex, .. } = Codex::spawn(CodexSpawnArgs {
@@ -767,7 +769,6 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
turn_environments: Vec::new(),
},
analytics_events_client: None,
state_db: None,
thread_store,
})
.await

View File

@@ -138,8 +138,11 @@ pub(crate) async fn record_completed_response_item(
.await;
}
mark_thread_memory_mode_polluted_if_external_context(sess, turn_context, item).await;
let has_memory_citation =
record_stage1_output_usage_and_detect_memory_citation(sess.state_db(), item).await;
let has_memory_citation = record_stage1_output_usage_and_detect_memory_citation(
sess.services.state_db.as_ref(),
item,
)
.await;
if has_memory_citation {
sess.record_memory_citation_for_turn(&turn_context.sub_id)
.await;
@@ -174,7 +177,7 @@ pub(crate) async fn mark_thread_memory_mode_polluted_if_external_context(
}
async fn record_stage1_output_usage_and_detect_memory_citation(
state_db_ctx: Option<state_db::StateDbHandle>,
state_db_ctx: Option<&state_db::StateDbHandle>,
item: &ResponseItem,
) -> bool {
let Some(raw_text) = raw_assistant_output_text_from_item(item) else {

View File

@@ -74,6 +74,23 @@ pub async fn thread_manager_with_models_provider_and_home(
.await
}
pub async fn thread_manager_with_models_provider_home_and_state(
auth: CodexAuth,
provider: ModelProviderInfo,
codex_home: PathBuf,
environment_manager: Arc<EnvironmentManager>,
state_db: Option<crate::StateDbHandle>,
) -> ThreadManager {
ThreadManager::with_models_provider_home_and_state_for_tests(
auth,
provider,
codex_home,
environment_manager,
state_db,
)
.await
}
pub async fn start_thread_with_user_shell_override(
thread_manager: &ThreadManager,
config: Config,

View File

@@ -19,8 +19,6 @@ use crate::skills_watcher::SkillsWatcher;
use crate::skills_watcher::SkillsWatcherEvent;
use crate::tasks::InterruptedTurnHistoryMarker;
use crate::tasks::interrupted_turn_history_marker;
use codex_agent_graph_store::AgentGraphStore;
use codex_agent_graph_store::LocalAgentGraphStore;
use codex_analytics::AnalyticsEventsClient;
use codex_app_server_protocol::ThreadHistoryBuilder;
use codex_app_server_protocol::TurnStatus;
@@ -53,8 +51,8 @@ use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnEnvironmentSelection;
use codex_protocol::protocol::W3cTraceContext;
use codex_rollout::state_db;
use codex_rollout::state_db::StateDbHandle;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_thread_store::InMemoryThreadStore;
use codex_thread_store::LocalThreadStore;
use codex_thread_store::LocalThreadStoreConfig;
@@ -251,11 +249,10 @@ pub(crate) struct ThreadManagerState {
mcp_manager: Arc<McpManager>,
skills_watcher: Arc<SkillsWatcher>,
thread_store: Arc<dyn ThreadStore>,
state_db: StateDbHandle,
agent_graph_store: Arc<dyn AgentGraphStore>,
session_source: SessionSource,
installation_id: String,
analytics_events_client: Option<AnalyticsEventsClient>,
state_db: Option<StateDbHandle>,
// Captures submitted ops for testing purpose when test mode is enabled.
ops_log: Option<SharedCapturedOps>,
}
@@ -271,11 +268,10 @@ pub fn build_models_manager(
)
}
pub async fn init_state_db_from_config(config: &Config) -> Option<StateDbHandle> {
state_db::init(config).await
}
pub fn thread_store_from_config(config: &Config, state_db: StateDbHandle) -> Arc<dyn ThreadStore> {
pub fn thread_store_from_config(
config: &Config,
state_db: Option<StateDbHandle>,
) -> Arc<dyn ThreadStore> {
match &config.experimental_thread_store {
ThreadStoreConfig::Local => Arc::new(LocalThreadStore::new(
LocalThreadStoreConfig::from_config(config),
@@ -286,27 +282,6 @@ pub fn thread_store_from_config(config: &Config, state_db: StateDbHandle) -> Arc
}
}
pub fn agent_graph_store_from_state_db(state_db: StateDbHandle) -> Arc<dyn AgentGraphStore> {
Arc::new(LocalAgentGraphStore::new(state_db))
}
async fn state_db_from_roots_for_tests(
codex_home: PathBuf,
sqlite_home: PathBuf,
default_model_provider_id: String,
) -> StateDbHandle {
let config = codex_rollout::RolloutConfig {
codex_home: codex_home.clone(),
sqlite_home,
cwd: codex_home,
model_provider_id: default_model_provider_id,
generate_memories: false,
};
state_db::try_init(&config)
.await
.unwrap_or_else(|err| panic!("test state db should initialize: {err}"))
}
impl ThreadManager {
#[allow(clippy::too_many_arguments)]
pub fn new(
@@ -315,10 +290,9 @@ impl ThreadManager {
session_source: SessionSource,
environment_manager: Arc<EnvironmentManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
state_db: StateDbHandle,
thread_store: Arc<dyn ThreadStore>,
agent_graph_store: Arc<dyn AgentGraphStore>,
installation_id: String,
state_db: Option<StateDbHandle>,
) -> Self {
let codex_home = config.codex_home.clone();
let restriction_product = session_source.restriction_product();
@@ -345,12 +319,11 @@ impl ThreadManager {
mcp_manager,
skills_watcher,
thread_store,
state_db,
agent_graph_store,
auth_manager,
session_source,
installation_id,
analytics_events_client,
state_db,
ops_log: should_use_test_thread_manager_behavior()
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
}),
@@ -371,28 +344,13 @@ impl ThreadManager {
));
std::fs::create_dir_all(&codex_home)
.unwrap_or_else(|err| panic!("temp codex home dir create failed: {err}"));
let state_db = state_db_from_roots_for_tests(
codex_home.clone(),
codex_home.clone(),
OPENAI_PROVIDER_ID.to_string(),
)
.await;
let skills_codex_home = match AbsolutePathBuf::from_absolute_path_checked(&codex_home) {
Ok(codex_home) => codex_home,
Err(err) => panic!("test codex_home should be absolute: {err}"),
};
let installation_id = resolve_installation_id(&skills_codex_home)
.await
.unwrap_or_else(|err| panic!("resolve test installation id failed: {err}"));
let mut manager = Self::with_models_provider_and_home_and_state_db_for_tests(
let mut manager = Self::with_models_provider_and_home_for_tests(
auth,
provider,
codex_home.clone(),
Arc::new(EnvironmentManager::default_for_tests()),
state_db,
skills_codex_home,
installation_id,
);
)
.await;
manager._test_codex_home_guard = Some(TempCodexHomeGuard { path: codex_home });
manager
}
@@ -405,12 +363,23 @@ impl ThreadManager {
codex_home: PathBuf,
environment_manager: Arc<EnvironmentManager>,
) -> Self {
let state_db = state_db_from_roots_for_tests(
codex_home.clone(),
codex_home.clone(),
OPENAI_PROVIDER_ID.to_string(),
Self::with_models_provider_home_and_state_for_tests(
auth,
provider,
codex_home,
environment_manager,
/*state_db*/ None,
)
.await;
.await
}
pub(crate) async fn with_models_provider_home_and_state_for_tests(
auth: CodexAuth,
provider: ModelProviderInfo,
codex_home: PathBuf,
environment_manager: Arc<EnvironmentManager>,
state_db: Option<StateDbHandle>,
) -> Self {
let skills_codex_home = match AbsolutePathBuf::from_absolute_path_checked(&codex_home) {
Ok(codex_home) => codex_home,
Err(err) => panic!("test codex_home should be absolute: {err}"),
@@ -418,26 +387,6 @@ impl ThreadManager {
let installation_id = resolve_installation_id(&skills_codex_home)
.await
.unwrap_or_else(|err| panic!("resolve test installation id failed: {err}"));
Self::with_models_provider_and_home_and_state_db_for_tests(
auth,
provider,
codex_home,
environment_manager,
state_db,
skills_codex_home,
installation_id,
)
}
fn with_models_provider_and_home_and_state_db_for_tests(
auth: CodexAuth,
provider: ModelProviderInfo,
codex_home: PathBuf,
environment_manager: Arc<EnvironmentManager>,
state_db: StateDbHandle,
skills_codex_home: AbsolutePathBuf,
installation_id: String,
) -> Self {
set_thread_manager_test_mode_for_tests(/*enabled*/ true);
let auth_manager = AuthManager::from_auth_for_testing(auth);
let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY);
@@ -458,11 +407,11 @@ impl ThreadManager {
let thread_store: Arc<dyn ThreadStore> = Arc::new(LocalThreadStore::new(
LocalThreadStoreConfig {
codex_home: codex_home.clone(),
sqlite_home: codex_home.clone(),
default_model_provider_id: OPENAI_PROVIDER_ID.to_string(),
},
state_db.clone(),
));
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
Self {
state: Arc::new(ThreadManagerState {
threads: Arc::new(RwLock::new(HashMap::new())),
@@ -475,12 +424,11 @@ impl ThreadManager {
mcp_manager,
skills_watcher,
thread_store,
state_db,
agent_graph_store,
auth_manager,
session_source: SessionSource::Exec,
installation_id,
analytics_events_client: None,
state_db,
ops_log: should_use_test_thread_manager_behavior()
.then(|| Arc::new(std::sync::Mutex::new(Vec::new()))),
}),
@@ -566,17 +514,22 @@ impl ThreadManager {
subtree_thread_ids.push(thread_id);
seen_thread_ids.insert(thread_id);
for descendant_id in self
.state
.agent_graph_store
.list_thread_spawn_descendants(thread_id, /*status_filter*/ None)
.await
.map_err(|err| {
CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}"))
})?
{
if seen_thread_ids.insert(descendant_id) {
subtree_thread_ids.push(descendant_id);
if let Some(state_db_ctx) = thread.state_db() {
for status in [
DirectionalThreadSpawnEdgeStatus::Open,
DirectionalThreadSpawnEdgeStatus::Closed,
] {
for descendant_id in state_db_ctx
.list_thread_spawn_descendants_with_status(thread_id, status)
.await
.map_err(|err| {
CodexErr::Fatal(format!("failed to load thread-spawn descendants: {err}"))
})?
{
if seen_thread_ids.insert(descendant_id) {
subtree_thread_ids.push(descendant_id);
}
}
}
}
@@ -918,14 +871,10 @@ impl ThreadManager {
}
impl ThreadManagerState {
pub(crate) fn state_db(&self) -> StateDbHandle {
pub(crate) fn state_db(&self) -> Option<StateDbHandle> {
self.state_db.clone()
}
pub(crate) fn agent_graph_store(&self) -> Arc<dyn AgentGraphStore> {
self.agent_graph_store.clone()
}
pub(crate) async fn list_thread_ids(&self) -> Vec<ThreadId> {
self.threads
.read()
@@ -1242,7 +1191,6 @@ impl ThreadManagerState {
parent_trace,
environment_selections,
analytics_events_client: self.analytics_events_client.clone(),
state_db: Some(self.state_db.clone()),
thread_store: Arc::clone(&self.thread_store),
})
.await?;

View File

@@ -1,5 +1,6 @@
use super::*;
use crate::config::test_config;
use crate::init_state_db;
use crate::installation_id::INSTALLATION_ID_FILENAME;
use crate::rollout::RolloutRecorder;
use crate::session::session::SessionSettingsUpdate;
@@ -50,19 +51,12 @@ fn assistant_msg(text: &str) -> ResponseItem {
}
}
async fn state_backed_stores(
config: &Config,
) -> (
StateDbHandle,
Arc<dyn ThreadStore>,
Arc<dyn AgentGraphStore>,
) {
let state_db = init_state_db_from_config(config)
async fn state_backed_stores(config: &Config) -> (StateDbHandle, Arc<dyn ThreadStore>) {
let state_db = init_state_db(config)
.await
.expect("thread manager test requires state db");
let thread_store = thread_store_from_config(config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
(state_db, thread_store, agent_graph_store)
let thread_store = thread_store_from_config(config, Some(state_db.clone()));
(state_db, thread_store)
}
fn contextual_user_interrupted_marker() -> ResponseItem {
@@ -408,17 +402,16 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let (state_db, thread_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
Some(state_db),
);
let selected_cwd =
AbsolutePathBuf::try_from(config.cwd.as_path().join("selected")).expect("absolute path");
@@ -524,17 +517,16 @@ async fn explicit_installation_id_skips_codex_home_file() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let installation_id = uuid::Uuid::new_v4().to_string();
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let (state_db, thread_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager,
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
installation_id.clone(),
Some(state_db),
);
let thread = manager
@@ -563,17 +555,16 @@ async fn resume_active_thread_from_rollout_returns_running_thread() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let (state_db, thread_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
Some(state_db),
);
let source = manager
@@ -620,17 +611,16 @@ async fn resume_stopped_thread_from_rollout_spawns_new_thread() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let (state_db, thread_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
Some(state_db),
);
let source = manager
@@ -682,17 +672,16 @@ async fn resume_stopped_thread_from_rollout_preserves_thread_source() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let (state_db, thread_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
Some(state_db),
);
let source = manager
@@ -768,17 +757,16 @@ async fn new_uses_active_provider_for_model_refresh() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let (state_db, thread_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager,
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
Some(state_db),
);
let _ = manager.list_models(RefreshStrategy::Online).await;
@@ -983,17 +971,16 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let (state_db, thread_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
Some(state_db),
);
let source = manager
@@ -1090,17 +1077,16 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let (state_db, thread_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
Some(state_db),
);
let source = manager
@@ -1186,17 +1172,16 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let (state_db, thread_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
Some(state_db),
);
let source = manager
@@ -1328,17 +1313,16 @@ async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> {
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let (state_db, thread_store, agent_graph_store) = state_backed_stores(&config).await;
let (state_db, thread_store) = state_backed_stores(&config).await;
let manager = ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
TEST_INSTALLATION_ID.to_string(),
Some(state_db),
);
let source = manager

View File

@@ -6,7 +6,6 @@ use crate::function_tool::FunctionCallError;
use crate::init_state_db;
use crate::session::tests::make_session_and_context;
use crate::session_prefix::format_subagent_notification_message;
use crate::thread_manager::agent_graph_store_from_state_db;
use crate::thread_manager::thread_store_from_config;
use crate::tools::context::ToolOutput;
use crate::tools::handlers::multi_agents_v2::CloseAgentHandler as CloseAgentHandlerV2;
@@ -3150,8 +3149,28 @@ async fn close_agent_submits_shutdown_and_returns_previous_status() {
assert_eq!(status_after, AgentStatus::NotFound);
}
#[tokio::test]
async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtrees_closed() {
#[test]
fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtrees_closed() {
let handle = std::thread::Builder::new()
.name("multi-agent-cascade-test".to_string())
.stack_size(8 * 1024 * 1024)
.spawn(|| {
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.expect("tokio runtime should build");
runtime.block_on(
tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtrees_closed_impl(),
);
})
.expect("multi-agent cascade test thread should spawn");
if let Err(payload) = handle.join() {
std::panic::resume_unwind(payload);
}
}
async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtrees_closed_impl() {
let (_session, turn) = make_session_and_context().await;
let mut config = turn.config.as_ref().clone();
config.agent_max_depth = 3;
@@ -3168,10 +3187,9 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db.clone(),
thread_store_from_config(&config, state_db.clone()),
agent_graph_store_from_state_db(state_db.clone()),
thread_store_from_config(&config, Some(state_db.clone())),
"11111111-1111-4111-8111-111111111111".to_string(),
Some(state_db),
);
let parent = manager

View File

@@ -15,9 +15,8 @@ use anyhow::anyhow;
use codex_config::CloudRequirementsLoader;
use codex_core::CodexThread;
use codex_core::ThreadManager;
use codex_core::agent_graph_store_from_state_db;
use codex_core::config::Config;
use codex_core::init_state_db_from_config;
use codex_core::init_state_db;
use codex_core::resolve_installation_id;
use codex_core::shell::Shell;
use codex_core::shell::get_shell_by_model_provided_path;
@@ -426,12 +425,14 @@ impl TestCodexBuilder {
environment_manager: Arc<codex_exec_server::EnvironmentManager>,
) -> anyhow::Result<TestCodex> {
let auth = self.auth.clone();
let thread_manager = if config.model_catalog.is_some() {
let state_db = init_state_db_from_config(&config)
let needs_state_db = config.model_catalog.is_some()
|| config.features.enabled(Feature::Goals)
|| config.features.enabled(Feature::Sqlite);
let thread_manager = if needs_state_db {
let state_db = init_state_db(&config)
.await
.expect("test codex requires state db");
let thread_store = thread_store_from_config(&config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_store = thread_store_from_config(&config, Some(state_db.clone()));
let installation_id = resolve_installation_id(&config.codex_home).await?;
ThreadManager::new(
&config,
@@ -439,10 +440,9 @@ impl TestCodexBuilder {
SessionSource::Exec,
Arc::clone(&environment_manager),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
installation_id,
Some(state_db),
)
} else {
codex_core::test_support::thread_manager_with_models_provider_and_home(

View File

@@ -5,8 +5,7 @@ use codex_core::NewThread;
use codex_core::Prompt;
use codex_core::ResponseEvent;
use codex_core::ThreadManager;
use codex_core::agent_graph_store_from_state_db;
use codex_core::init_state_db_from_config;
use codex_core::init_state_db;
use codex_core::resolve_installation_id;
use codex_core::thread_store_from_config;
use codex_features::Feature;
@@ -1116,11 +1115,10 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
Ok(None) => panic!("No CodexAuth found in codex_home"),
Err(e) => panic!("Failed to load CodexAuth: {e}"),
};
let state_db = init_state_db_from_config(&config)
let state_db = init_state_db(&config)
.await
.expect("client test requires state db");
let thread_store = thread_store_from_config(&config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_store = thread_store_from_config(&config, Some(state_db.clone()));
let installation_id = resolve_installation_id(&config.codex_home)
.await
.expect("resolve installation id");
@@ -1130,10 +1128,9 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
SessionSource::Exec,
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
installation_id,
Some(state_db),
);
let NewThread { thread: codex, .. } = thread_manager
.start_thread(config.clone())

View File

@@ -46,7 +46,7 @@ async fn run_migration(
config_toml: &ConfigToml,
) -> io::Result<PersonalityMigrationStatus> {
let state_db = state_db_for_test(codex_home).await?;
maybe_migrate_personality(codex_home, config_toml, state_db).await
maybe_migrate_personality(codex_home, config_toml, Some(state_db)).await
}
async fn write_session_with_user_event(codex_home: &Path) -> io::Result<()> {

View File

@@ -7,6 +7,7 @@ use std::sync::Arc;
use codex_arg0::Arg0DispatchPaths;
use codex_core::config::Config;
use codex_core::init_state_db;
use codex_core::resolve_installation_id;
use codex_exec_server::EnvironmentManager;
use codex_exec_server::EnvironmentManagerArgs;
@@ -141,19 +142,17 @@ pub async fn run_main(
// Task: process incoming messages.
let processor_handle = tokio::spawn({
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
let processor = MessageProcessor::new(
let state_db = init_state_db(&config).await;
let mut processor = MessageProcessor::new(
outgoing_message_sender,
arg0_paths,
Arc::new(config),
environment_manager,
installation_id,
state_db,
)
.await;
async move {
let Some(mut processor) = processor else {
error!("failed to initialize MCP processor");
return;
};
while let Some(msg) = incoming_rx.recv().await {
match msg {
JsonRpcMessage::Request(r) => processor.process_request(r).await,

View File

@@ -2,10 +2,9 @@ use std::collections::HashMap;
use std::sync::Arc;
use codex_arg0::Arg0DispatchPaths;
use codex_core::StateDbHandle;
use codex_core::ThreadManager;
use codex_core::agent_graph_store_from_state_db;
use codex_core::config::Config;
use codex_core::init_state_db_from_config;
use codex_core::thread_store_from_config;
use codex_exec_server::EnvironmentManager;
use codex_login::AuthManager;
@@ -56,34 +55,31 @@ impl MessageProcessor {
config: Arc<Config>,
environment_manager: Arc<EnvironmentManager>,
installation_id: String,
) -> Option<Self> {
state_db: Option<StateDbHandle>,
) -> Self {
let outgoing = Arc::new(outgoing);
let auth_manager = AuthManager::shared_from_config(
config.as_ref(),
/*enable_codex_api_key_env*/ false,
)
.await;
let state_db = init_state_db_from_config(config.as_ref()).await?;
let thread_store = thread_store_from_config(config.as_ref(), state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager,
SessionSource::Mcp,
environment_manager,
/*analytics_events_client*/ None,
state_db,
thread_store,
agent_graph_store,
thread_store_from_config(config.as_ref(), state_db.clone()),
installation_id,
state_db.clone(),
));
Some(Self {
Self {
outgoing,
initialized: false,
arg0_paths,
thread_manager,
running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())),
})
}
}
pub(crate) async fn process_request(&mut self, request: JsonRpcRequest<ClientRequest>) {

View File

@@ -52,11 +52,10 @@ use codex_core_api::TuiNotificationSettings;
use codex_core_api::UriBasedFileOpener;
use codex_core_api::UserInput;
use codex_core_api::WebSearchMode;
use codex_core_api::agent_graph_store_from_state_db;
use codex_core_api::arg0_dispatch_or_else;
use codex_core_api::built_in_model_providers;
use codex_core_api::find_codex_home;
use codex_core_api::init_state_db_from_config;
use codex_core_api::init_state_db;
use codex_core_api::item_event_to_server_notification;
use codex_core_api::resolve_installation_id;
use codex_core_api::set_default_originator;
@@ -106,6 +105,7 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
};
let config = new_config(args.model, arg0_paths)?;
let state_db = init_state_db(&config).await;
let auth_manager =
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
@@ -113,11 +113,7 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
config.codex_self_exe.clone(),
config.codex_linux_sandbox_exe.clone(),
)?;
let Some(state_db) = init_state_db_from_config(&config).await else {
bail!("thread manager sample requires state db");
};
let thread_store = thread_store_from_config(&config, state_db.clone());
let agent_graph_store = agent_graph_store_from_state_db(state_db.clone());
let environment_manager =
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await);
let installation_id = resolve_installation_id(&config.codex_home).await?;
@@ -127,10 +123,9 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
SessionSource::Exec,
environment_manager,
/*analytics_events_client*/ None,
state_db,
Arc::clone(&thread_store),
agent_graph_store,
installation_id,
state_db,
);
let NewThread {

View File

@@ -13,11 +13,11 @@ pub(super) async fn archive_thread(
params: ArchiveThreadParams,
) -> ThreadStoreResult<()> {
let thread_id = params.thread_id;
let state_db = store.state_db();
let state_db_ctx = store.state_db().await;
let rollout_path = find_thread_path_by_id_str(
store.config.codex_home.as_path(),
&thread_id.to_string(),
Some(state_db.as_ref()),
state_db_ctx.as_deref(),
)
.await
.map_err(|err| ThreadStoreError::InvalidRequest {
@@ -52,10 +52,11 @@ pub(super) async fn archive_thread(
}
})?;
let _ = store
.state_db()
.mark_archived(thread_id, archived_path.as_path(), Utc::now())
.await;
if let Some(ctx) = state_db_ctx {
let _ = ctx
.mark_archived(thread_id, archived_path.as_path(), Utc::now())
.await;
}
Ok(())
}
@@ -74,15 +75,13 @@ mod tests {
use crate::ThreadSortKey;
use crate::ThreadStore;
use crate::local::LocalThreadStore;
use crate::local::test_support::init_test_state_db;
use crate::local::test_support::test_config;
use crate::local::test_support::test_store;
use crate::local::test_support::write_session_file;
#[tokio::test]
async fn archive_thread_moves_rollout_to_archived_collection() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(201);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let active_path =
@@ -128,12 +127,21 @@ mod tests {
async fn archive_thread_updates_sqlite_metadata_when_present() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(202);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let active_path =
write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
let mut builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
active_path.clone(),

View File

@@ -22,12 +22,12 @@ pub(super) async fn create_thread(
})?;
let config = RolloutConfig {
codex_home: store.config.codex_home.clone(),
sqlite_home: store.sqlite_home(),
sqlite_home: store.config.sqlite_home.clone(),
cwd,
model_provider_id: params.metadata.model_provider.clone(),
generate_memories: matches!(params.metadata.memory_mode, ThreadMemoryMode::Enabled),
};
let state_db_ctx = Some(store.state_db());
let state_db_ctx = store.state_db().await;
let recorder = RolloutRecorder::new(
&config,
RolloutRecorderParams::new(

View File

@@ -39,16 +39,16 @@ pub(super) async fn list_threads(
SortDirection::Asc => codex_rollout::SortDirection::Asc,
SortDirection::Desc => codex_rollout::SortDirection::Desc,
};
let state_db = store.state_db().await;
let rollout_config = RolloutConfig {
codex_home: store.config.codex_home.clone(),
sqlite_home: store.sqlite_home(),
sqlite_home: store.config.sqlite_home.clone(),
cwd: store.config.codex_home.clone(),
model_provider_id: store.config.default_model_provider_id.clone(),
generate_memories: false,
};
let state_db_ctx = Some(store.state_db());
let page = list_rollout_threads(
state_db_ctx,
state_db,
&rollout_config,
store.config.default_model_provider_id.as_str(),
&params,
@@ -80,13 +80,14 @@ pub(super) async fn list_threads(
.map(|thread| thread.thread_id)
.collect::<HashSet<_>>();
let mut names = HashMap::<ThreadId, String>::with_capacity(thread_ids.len());
let state_db_ctx = store.state_db();
for &thread_id in &thread_ids {
let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await else {
continue;
};
if let Some(title) = distinct_thread_metadata_title(&metadata) {
names.insert(thread_id, title);
if let Some(state_db_ctx) = store.state_db().await {
for &thread_id in &thread_ids {
let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await else {
continue;
};
if let Some(title) = distinct_thread_metadata_title(&metadata) {
names.insert(thread_id, title);
}
}
}
if names.len() < thread_ids.len()
@@ -107,9 +108,9 @@ pub(super) async fn list_threads(
}
async fn list_rollout_threads(
state_db_ctx: Option<codex_rollout::StateDbHandle>,
state_db: Option<codex_rollout::StateDbHandle>,
config: &RolloutConfig,
default_model_provider: &str,
default_model_provider_id: &str,
params: &ListThreadsParams,
cursor: Option<&codex_rollout::Cursor>,
sort_key: codex_rollout::ThreadSortKey,
@@ -117,7 +118,7 @@ async fn list_rollout_threads(
) -> ThreadStoreResult<codex_rollout::ThreadsPage> {
let page = if params.use_state_db_only && params.archived {
RolloutRecorder::list_archived_threads_from_state_db(
state_db_ctx.clone(),
state_db,
config,
params.page_size,
cursor,
@@ -126,13 +127,13 @@ async fn list_rollout_threads(
params.allowed_sources.as_slice(),
params.model_providers.as_deref(),
params.cwd_filters.as_deref(),
default_model_provider,
default_model_provider_id,
params.search_term.as_deref(),
)
.await
} else if params.use_state_db_only {
RolloutRecorder::list_threads_from_state_db(
state_db_ctx.clone(),
state_db,
config,
params.page_size,
cursor,
@@ -141,13 +142,13 @@ async fn list_rollout_threads(
params.allowed_sources.as_slice(),
params.model_providers.as_deref(),
params.cwd_filters.as_deref(),
default_model_provider,
default_model_provider_id,
params.search_term.as_deref(),
)
.await
} else if params.archived {
RolloutRecorder::list_archived_threads(
state_db_ctx.clone(),
state_db,
config,
params.page_size,
cursor,
@@ -156,13 +157,13 @@ async fn list_rollout_threads(
params.allowed_sources.as_slice(),
params.model_providers.as_deref(),
params.cwd_filters.as_deref(),
default_model_provider,
default_model_provider_id,
params.search_term.as_deref(),
)
.await
} else {
RolloutRecorder::list_threads(
state_db_ctx,
state_db,
config,
params.page_size,
cursor,
@@ -171,7 +172,7 @@ async fn list_rollout_threads(
params.allowed_sources.as_slice(),
params.model_providers.as_deref(),
params.cwd_filters.as_deref(),
default_model_provider,
default_model_provider_id,
params.search_term.as_deref(),
)
.await
@@ -194,9 +195,7 @@ mod tests {
use super::*;
use crate::ThreadStore;
use crate::local::LocalThreadStore;
use crate::local::test_support::init_test_state_db;
use crate::local::test_support::test_config;
use crate::local::test_support::test_store;
use crate::local::test_support::write_archived_session_file;
use crate::local::test_support::write_session_file;
use crate::local::test_support::write_session_file_with;
@@ -204,7 +203,7 @@ mod tests {
#[tokio::test]
async fn list_threads_uses_default_provider_when_rollout_omits_provider() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
write_session_file_with(
home.path(),
home.path().join("sessions/2025/01/03"),
@@ -239,13 +238,22 @@ mod tests {
async fn list_threads_preserves_sqlite_title_search_results() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(103);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path = home.path().join("rollout-title-search.jsonl");
fs::write(&rollout_path, "").expect("placeholder rollout file");
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
let created_at = Utc::now();
let mut builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
@@ -259,10 +267,6 @@ mod tests {
let mut metadata = builder.build(config.default_model_provider_id.as_str());
metadata.title = "needle title".to_string();
metadata.first_user_message = Some("plain preview".to_string());
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
runtime
.upsert_thread(&metadata)
.await
@@ -299,7 +303,7 @@ mod tests {
#[tokio::test]
async fn list_threads_selects_active_or_archived_collection() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let active_uuid = Uuid::from_u128(105);
let archived_uuid = Uuid::from_u128(106);
write_session_file(home.path(), "2025-01-03T12-00-00", active_uuid)
@@ -368,7 +372,7 @@ mod tests {
async fn list_threads_returns_local_rollout_summary() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let store = LocalThreadStore::new(config.clone(), init_test_state_db(&config).await);
let store = LocalThreadStore::new(config, /*state_db*/ None);
let uuid = Uuid::from_u128(101);
let path =
write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");
@@ -407,7 +411,7 @@ mod tests {
#[tokio::test]
async fn list_threads_rejects_invalid_cursor() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let err = store
.list_threads(ListThreadsParams {

View File

@@ -66,12 +66,12 @@ pub(super) async fn resume_thread(
})?;
let config = RolloutConfig {
codex_home: store.config.codex_home.clone(),
sqlite_home: store.sqlite_home(),
sqlite_home: store.config.sqlite_home.clone(),
cwd,
model_provider_id: params.metadata.model_provider.clone(),
generate_memories: matches!(params.metadata.memory_mode, ThreadMemoryMode::Enabled),
};
let state_db_ctx = Some(store.state_db());
let state_db_ctx = store.state_db().await;
let recorder = RolloutRecorder::new(
&config,
RolloutRecorderParams::resume(

View File

@@ -41,7 +41,7 @@ use crate::UpdateThreadMetadataParams;
pub struct LocalThreadStore {
pub(super) config: LocalThreadStoreConfig,
live_recorders: Arc<Mutex<HashMap<ThreadId, RolloutRecorder>>>,
state_db: StateDbHandle,
state_db: Option<StateDbHandle>,
}
/// Process-scoped configuration for local thread storage.
@@ -51,6 +51,7 @@ pub struct LocalThreadStore {
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct LocalThreadStoreConfig {
pub codex_home: PathBuf,
pub sqlite_home: PathBuf,
/// Provider used only when older local metadata does not contain one.
pub default_model_provider_id: String,
}
@@ -59,6 +60,7 @@ impl LocalThreadStoreConfig {
pub fn from_config(config: &impl codex_rollout::RolloutConfigView) -> Self {
Self {
codex_home: config.codex_home().to_path_buf(),
sqlite_home: config.sqlite_home().to_path_buf(),
default_model_provider_id: config.model_provider_id().to_string(),
}
}
@@ -73,9 +75,8 @@ impl std::fmt::Debug for LocalThreadStore {
}
impl LocalThreadStore {
/// Create a local store from process-scoped local storage configuration and
/// the caller-provided shared state DB handle.
pub fn new(config: LocalThreadStoreConfig, state_db: StateDbHandle) -> Self {
/// Create a local store using an already initialized state DB handle.
pub fn new(config: LocalThreadStoreConfig, state_db: Option<StateDbHandle>) -> Self {
Self {
config,
live_recorders: Arc::new(Mutex::new(HashMap::new())),
@@ -84,14 +85,10 @@ impl LocalThreadStore {
}
/// Return the state DB handle used by local rollout writers.
pub fn state_db(&self) -> StateDbHandle {
pub async fn state_db(&self) -> Option<StateDbHandle> {
self.state_db.clone()
}
pub(super) fn sqlite_home(&self) -> PathBuf {
self.state_db.codex_home().to_path_buf()
}
/// Read a local rollout-backed thread by path.
pub async fn read_thread_by_rollout_path(
&self,
@@ -285,16 +282,14 @@ mod tests {
use super::*;
use crate::ThreadEventPersistenceMode;
use crate::ThreadPersistenceMetadata;
use crate::local::test_support::init_test_state_db;
use crate::local::test_support::test_config;
use crate::local::test_support::test_store;
use crate::local::test_support::write_archived_session_file;
use crate::local::test_support::write_session_file;
#[tokio::test]
async fn live_writer_lifecycle_writes_and_closes() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let thread_id = ThreadId::default();
store
@@ -343,7 +338,7 @@ mod tests {
#[tokio::test]
async fn create_thread_rejects_missing_cwd() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let thread_id = ThreadId::default();
let mut params = create_thread_params(thread_id);
params.metadata.cwd = None;
@@ -363,7 +358,7 @@ mod tests {
#[tokio::test]
async fn discard_thread_drops_unmaterialized_live_writer() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let thread_id = ThreadId::default();
store
@@ -401,9 +396,8 @@ mod tests {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let thread_id = ThreadId::default();
let state_db = init_test_state_db(&config).await;
let first_store = LocalThreadStore::new(config.clone(), state_db.clone());
let first_store = LocalThreadStore::new(config.clone(), /*state_db*/ None);
first_store
.create_thread(create_thread_params(thread_id))
.await
@@ -432,7 +426,7 @@ mod tests {
.await
.expect("shutdown initial writer");
let resumed_store = LocalThreadStore::new(config, state_db);
let resumed_store = LocalThreadStore::new(config, /*state_db*/ None);
resumed_store
.resume_thread(ResumeThreadParams {
thread_id,
@@ -463,7 +457,7 @@ mod tests {
#[tokio::test]
async fn create_thread_rejects_duplicate_live_writer() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let thread_id = ThreadId::default();
store
@@ -483,7 +477,7 @@ mod tests {
#[tokio::test]
async fn resume_thread_rejects_duplicate_live_writer() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let thread_id = ThreadId::default();
store
@@ -512,7 +506,7 @@ mod tests {
#[tokio::test]
async fn resume_thread_rejects_missing_cwd() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = uuid::Uuid::from_u128(407);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path =
@@ -541,7 +535,7 @@ mod tests {
async fn load_history_uses_live_writer_rollout_path() {
let home = TempDir::new().expect("temp dir");
let external_home = TempDir::new().expect("external temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = uuid::Uuid::from_u128(404);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path = write_session_file(external_home.path(), "2025-01-04T10-00-00", uuid)
@@ -590,7 +584,7 @@ mod tests {
async fn read_thread_uses_live_writer_rollout_path_for_external_resume() {
let home = TempDir::new().expect("temp dir");
let external_home = TempDir::new().expect("external temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = uuid::Uuid::from_u128(406);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path = write_session_file(external_home.path(), "2025-01-04T11-00-00", uuid)
@@ -629,7 +623,7 @@ mod tests {
#[tokio::test]
async fn load_history_uses_live_writer_rollout_path_for_archived_source() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = uuid::Uuid::from_u128(405);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path = write_archived_session_file(home.path(), "2025-01-04T10-30-00", uuid)
@@ -697,7 +691,7 @@ mod tests {
#[tokio::test]
async fn read_thread_by_rollout_path_includes_history() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let thread_id = ThreadId::default();
store

View File

@@ -176,12 +176,12 @@ async fn resolve_rollout_path(
return Ok(Some(path));
}
let state_db = store.state_db();
let state_db_ctx = store.state_db().await;
if include_archived {
match find_thread_path_by_id_str(
store.config.codex_home.as_path(),
&thread_id.to_string(),
Some(state_db.as_ref()),
state_db_ctx.as_deref(),
)
.await
.map_err(|err| ThreadStoreError::InvalidRequest {
@@ -191,7 +191,7 @@ async fn resolve_rollout_path(
None => find_archived_thread_path_by_id_str(
store.config.codex_home.as_path(),
&thread_id.to_string(),
Some(state_db.as_ref()),
state_db_ctx.as_deref(),
)
.await
.map_err(|err| ThreadStoreError::InvalidRequest {
@@ -202,7 +202,7 @@ async fn resolve_rollout_path(
find_thread_path_by_id_str(
store.config.codex_home.as_path(),
&thread_id.to_string(),
Some(state_db.as_ref()),
state_db_ctx.as_deref(),
)
.await
.map_err(|err| ThreadStoreError::InvalidRequest {
@@ -260,7 +260,8 @@ async fn read_sqlite_metadata(
store: &LocalThreadStore,
thread_id: codex_protocol::ThreadId,
) -> Option<ThreadMetadata> {
store.state_db().get_thread(thread_id).await.ok().flatten()
let runtime = store.state_db().await?;
runtime.get_thread(thread_id).await.ok().flatten()
}
async fn stored_thread_from_sqlite_metadata(
@@ -414,9 +415,7 @@ mod tests {
use super::*;
use crate::ThreadStore;
use crate::local::LocalThreadStore;
use crate::local::test_support::init_test_state_db;
use crate::local::test_support::test_config;
use crate::local::test_support::test_store;
use crate::local::test_support::write_archived_session_file;
use crate::local::test_support::write_session_file;
use crate::local::test_support::write_session_file_with_fork;
@@ -424,7 +423,7 @@ mod tests {
#[tokio::test]
async fn read_thread_returns_active_rollout_summary() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(205);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let active_path =
@@ -452,7 +451,7 @@ mod tests {
#[tokio::test]
async fn read_thread_returns_rollout_path_summary() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(211);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let active_path =
@@ -483,12 +482,17 @@ mod tests {
async fn read_thread_by_rollout_path_prefers_sqlite_git_info() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(223);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let active_path =
write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let mut builder = ThreadMetadataBuilder::new(
thread_id,
active_path.clone(),
@@ -526,7 +530,7 @@ mod tests {
#[tokio::test]
async fn read_thread_returns_archived_rollout_when_requested() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(207);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let archived_path = write_archived_session_file(home.path(), "2025-01-03T12-00-00", uuid)
@@ -567,7 +571,7 @@ mod tests {
#[tokio::test]
async fn read_thread_prefers_active_rollout_over_archived() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(208);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let active_path =
@@ -592,7 +596,7 @@ mod tests {
#[tokio::test]
async fn read_thread_returns_forked_from_id() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(209);
let parent_uuid = Uuid::from_u128(210);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
@@ -625,12 +629,17 @@ mod tests {
async fn read_thread_applies_sqlite_thread_name() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(212);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path =
write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let mut builder =
ThreadMetadataBuilder::new(thread_id, rollout_path, Utc::now(), SessionSource::Cli);
builder.model_provider = Some(config.default_model_provider_id.clone());
@@ -660,8 +669,13 @@ mod tests {
async fn read_thread_preserves_rollout_cwd_when_sqlite_metadata_exists() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let uuid = Uuid::from_u128(224);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let day_dir = home.path().join("sessions/2025/01/03");
@@ -730,7 +744,7 @@ mod tests {
#[tokio::test]
async fn read_thread_uses_legacy_thread_name_when_sqlite_title_is_missing() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(213);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");
@@ -754,8 +768,6 @@ mod tests {
async fn read_thread_uses_sqlite_metadata_for_rollout_without_user_preview() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(217);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let day_dir = home.path().join("sessions/2025/01/03");
@@ -777,6 +789,13 @@ mod tests {
});
writeln!(file, "{meta}").expect("write session meta");
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let mut builder = ThreadMetadataBuilder::new(
thread_id,
rollout_path.clone(),
@@ -819,13 +838,18 @@ mod tests {
let home = TempDir::new().expect("temp dir");
let external = TempDir::new().expect("external temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(220);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path =
write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");
let stale_path = external.path().join("missing-rollout.jsonl");
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let mut builder = ThreadMetadataBuilder::new(
thread_id,
stale_path.clone(),
@@ -863,8 +887,6 @@ mod tests {
let home = TempDir::new().expect("temp dir");
let external = TempDir::new().expect("external temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(221);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path =
@@ -872,6 +894,13 @@ mod tests {
let other_uuid = Uuid::from_u128(222);
let stale_path = write_session_file(external.path(), "2025-01-04T12-00-00", other_uuid)
.expect("other session file");
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let mut builder =
ThreadMetadataBuilder::new(thread_id, stale_path, Utc::now(), SessionSource::Cli);
builder.model_provider = Some("wrong-sqlite-provider".to_string());
@@ -903,7 +932,7 @@ mod tests {
#[tokio::test]
async fn read_thread_uses_session_meta_for_rollout_without_user_preview_or_sqlite_metadata() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(218);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let day_dir = home.path().join("sessions/2025/01/03");
@@ -958,13 +987,18 @@ mod tests {
let home = TempDir::new().expect("temp dir");
let external = TempDir::new().expect("external temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(214);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path = external
.path()
.join(format!("rollout-2025-01-03T12-00-00-{uuid}.jsonl"));
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let mut builder = ThreadMetadataBuilder::new(
thread_id,
rollout_path.clone(),
@@ -1011,15 +1045,20 @@ mod tests {
let home = TempDir::new().expect("temp dir");
let external = TempDir::new().expect("external temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(216);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let rollout_path = external
.path()
.join(format!("rollout-2025-01-03T12-00-00-{uuid}.jsonl"));
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let mut builder =
ThreadMetadataBuilder::new(thread_id, rollout_path, Utc::now(), SessionSource::Cli);
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
builder.archived_at = Some(Utc::now());
let mut metadata = builder.build(config.default_model_provider_id.as_str());
metadata.first_user_message = Some("Archived SQLite preview".to_string());
@@ -1062,12 +1101,17 @@ mod tests {
async fn read_thread_sqlite_fallback_loads_archived_history() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(219);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let archived_path = write_archived_session_file(home.path(), "2025-01-03T12-00-00", uuid)
.expect("archived session file");
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let mut builder = ThreadMetadataBuilder::new(
thread_id,
archived_path.clone(),
@@ -1103,7 +1147,7 @@ mod tests {
#[tokio::test]
async fn read_thread_fails_without_rollout() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(206);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");

View File

@@ -4,34 +4,18 @@ use std::path::Path;
use std::path::PathBuf;
use codex_rollout::ARCHIVED_SESSIONS_SUBDIR;
use codex_rollout::StateDbHandle;
use uuid::Uuid;
use super::LocalThreadStore;
use super::LocalThreadStoreConfig;
pub(super) fn test_config(codex_home: &Path) -> LocalThreadStoreConfig {
LocalThreadStoreConfig {
codex_home: codex_home.to_path_buf(),
sqlite_home: codex_home.to_path_buf(),
default_model_provider_id: "test-provider".to_string(),
}
}
pub(super) async fn init_test_state_db(config: &LocalThreadStoreConfig) -> StateDbHandle {
codex_state::StateRuntime::init(
config.codex_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize")
}
pub(super) async fn test_store(codex_home: &Path) -> LocalThreadStore {
let config = test_config(codex_home);
let state_db = init_test_state_db(&config).await;
LocalThreadStore::new(config, state_db)
}
pub(super) fn write_session_file(root: &Path, ts: &str, uuid: Uuid) -> std::io::Result<PathBuf> {
write_session_file_with(
root,

View File

@@ -17,11 +17,11 @@ pub(super) async fn unarchive_thread(
params: ArchiveThreadParams,
) -> ThreadStoreResult<StoredThread> {
let thread_id = params.thread_id;
let state_db = store.state_db();
let state_db_ctx = store.state_db().await;
let archived_path = find_archived_thread_path_by_id_str(
store.config.codex_home.as_path(),
&thread_id.to_string(),
Some(state_db.as_ref()),
state_db_ctx.as_deref(),
)
.await
.map_err(|err| ThreadStoreError::InvalidRequest {
@@ -73,10 +73,11 @@ pub(super) async fn unarchive_thread(
message: format!("failed to update unarchived thread timestamp: {err}"),
})?;
let _ = store
.state_db()
.mark_unarchived(thread_id, restored_path.as_path())
.await;
if let Some(ctx) = state_db_ctx {
let _ = ctx
.mark_unarchived(thread_id, restored_path.as_path())
.await;
}
let item = read_thread_item_from_rollout(restored_path.clone())
.await
@@ -111,15 +112,13 @@ mod tests {
use super::*;
use crate::ThreadStore;
use crate::local::LocalThreadStore;
use crate::local::test_support::init_test_state_db;
use crate::local::test_support::test_config;
use crate::local::test_support::test_store;
use crate::local::test_support::write_archived_session_file;
#[tokio::test]
async fn unarchive_thread_restores_rollout_and_returns_updated_thread() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(203);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let archived_path = write_archived_session_file(home.path(), "2025-01-03T13-00-00", uuid)
@@ -150,12 +149,21 @@ mod tests {
async fn unarchive_thread_updates_sqlite_metadata_when_present() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(204);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let archived_path = write_archived_session_file(home.path(), "2025-01-03T13-00-00", uuid)
.expect("archived session file");
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
let mut builder = codex_state::ThreadMetadataBuilder::new(
thread_id,
archived_path.clone(),

View File

@@ -55,8 +55,9 @@ pub(super) async fn update_thread_metadata(
.await?;
}
let state_db_ctx = store.state_db().await;
codex_rollout::state_db::reconcile_rollout(
Some(store.state_db()).as_deref(),
state_db_ctx.as_deref(),
resolved_rollout_path.path.as_path(),
store.config.default_model_provider_id.as_str(),
/*builder*/ None,
@@ -72,7 +73,11 @@ pub(super) async fn update_thread_metadata(
let resolved_git_info = match git_info {
Some(git_info) => {
let state_db = store.state_db();
let Some(state_db) = store.state_db().await else {
return Err(ThreadStoreError::Internal {
message: format!("sqlite state db unavailable for thread {thread_id}"),
});
};
let metadata =
state_db
.get_thread(thread_id)
@@ -152,7 +157,11 @@ async fn apply_thread_git_info(
branch: &Option<String>,
origin_url: &Option<String>,
) -> ThreadStoreResult<()> {
let state_db = store.state_db();
let Some(state_db) = store.state_db().await else {
return Err(ThreadStoreError::Internal {
message: format!("sqlite state db unavailable for thread {thread_id}"),
});
};
let updated = state_db
.update_thread_git_info(
thread_id,
@@ -232,17 +241,18 @@ async fn apply_thread_name(
thread_id: ThreadId,
name: String,
) -> ThreadStoreResult<()> {
let updated = store
.state_db()
.update_thread_title(thread_id, &name)
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to set thread name: {err}"),
})?;
if !updated {
return Err(ThreadStoreError::Internal {
message: format!("thread metadata unavailable before name update: {thread_id}"),
});
if let Some(state_db) = store.state_db().await {
let updated = state_db
.update_thread_title(thread_id, &name)
.await
.map_err(|err| ThreadStoreError::Internal {
message: format!("failed to set thread name: {err}"),
})?;
if !updated {
return Err(ThreadStoreError::Internal {
message: format!("thread metadata unavailable before name update: {thread_id}"),
});
}
}
append_thread_name(store.config.codex_home.as_path(), thread_id, &name)
@@ -300,11 +310,11 @@ async fn resolve_rollout_path(
return Ok(ResolvedRolloutPath { path, archived });
}
let state_db = store.state_db();
let state_db_ctx = store.state_db().await;
let active_path = find_thread_path_by_id_str(
store.config.codex_home.as_path(),
&thread_id.to_string(),
Some(state_db.as_ref()),
state_db_ctx.as_deref(),
)
.await
.map_err(|err| ThreadStoreError::InvalidRequest {
@@ -324,7 +334,7 @@ async fn resolve_rollout_path(
find_archived_thread_path_by_id_str(
store.config.codex_home.as_path(),
&thread_id.to_string(),
Some(state_db.as_ref()),
state_db_ctx.as_deref(),
)
.await
.map_err(|err| ThreadStoreError::InvalidRequest {
@@ -359,16 +369,21 @@ mod tests {
use crate::ThreadPersistenceMetadata;
use crate::ThreadStore;
use crate::local::LocalThreadStore;
use crate::local::test_support::init_test_state_db;
use crate::local::test_support::test_config;
use crate::local::test_support::test_store;
use crate::local::test_support::write_archived_session_file;
use crate::local::test_support::write_session_file;
#[tokio::test]
async fn update_thread_metadata_sets_name_on_active_rollout_and_indexes_name() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let config = test_config(home.path());
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config, Some(runtime.clone()));
let uuid = Uuid::from_u128(301);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
write_session_file(home.path(), "2025-01-03T14-00-00", uuid).expect("session file");
@@ -391,8 +406,7 @@ mod tests {
.expect("find thread name");
assert_eq!(latest_name.as_deref(), Some("A sharper name"));
let metadata = store
.state_db()
let metadata = runtime
.get_thread(thread_id)
.await
.expect("get metadata")
@@ -404,12 +418,18 @@ mod tests {
async fn update_thread_metadata_sets_memory_mode_on_active_rollout() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(302);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let path =
write_session_file(home.path(), "2025-01-03T14-30-00", uuid).expect("session file");
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let thread = store
.update_thread_metadata(UpdateThreadMetadataParams {
thread_id,
@@ -442,8 +462,13 @@ mod tests {
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let path =
write_session_file(home.path(), "2025-01-03T18-30-00", uuid).expect("session file");
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
store
.update_thread_metadata(UpdateThreadMetadataParams {
@@ -502,7 +527,7 @@ mod tests {
async fn update_thread_metadata_uses_live_rollout_path_for_external_resume() {
let home = TempDir::new().expect("temp dir");
let external_home = TempDir::new().expect("external temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(307);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let path = write_session_file(external_home.path(), "2025-01-03T14-45-00", uuid)
@@ -543,8 +568,13 @@ mod tests {
async fn update_thread_metadata_sets_git_info() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config, runtime);
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config, Some(runtime));
let uuid = Uuid::from_u128(309);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
write_session_file(home.path(), "2025-01-03T17-00-00", uuid).expect("session file");
@@ -581,8 +611,13 @@ mod tests {
async fn update_thread_metadata_partially_updates_git_info() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config, runtime);
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config, Some(runtime));
let uuid = Uuid::from_u128(310);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
write_session_file(home.path(), "2025-01-03T17-30-00", uuid).expect("session file");
@@ -634,8 +669,13 @@ mod tests {
async fn update_thread_metadata_clears_git_info_fields() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let runtime = codex_state::StateRuntime::init(
config.sqlite_home.clone(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
let uuid = Uuid::from_u128(311);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let path =
@@ -799,7 +839,7 @@ mod tests {
#[tokio::test]
async fn update_thread_metadata_rejects_mismatched_session_meta_id() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let filename_uuid = Uuid::from_u128(303);
let metadata_uuid = Uuid::from_u128(304);
let thread_id = ThreadId::from_string(&filename_uuid.to_string()).expect("valid thread id");
@@ -831,7 +871,7 @@ mod tests {
#[tokio::test]
async fn update_thread_metadata_rejects_multi_field_patch_without_partial_write() {
let home = TempDir::new().expect("temp dir");
let store = test_store(home.path()).await;
let store = LocalThreadStore::new(test_config(home.path()), /*state_db*/ None);
let uuid = Uuid::from_u128(305);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let path =
@@ -866,12 +906,21 @@ mod tests {
async fn update_thread_metadata_keeps_archived_thread_archived_in_sqlite() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(306);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let archived_path = write_archived_session_file(home.path(), "2025-01-03T16-00-00", uuid)
.expect("archived session file");
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
codex_rollout::state_db::reconcile_rollout(
Some(runtime.as_ref()),
archived_path.as_path(),
@@ -920,12 +969,21 @@ mod tests {
async fn update_thread_metadata_keeps_live_archived_thread_archived_in_sqlite() {
let home = TempDir::new().expect("temp dir");
let config = test_config(home.path());
let runtime = init_test_state_db(&config).await;
let store = LocalThreadStore::new(config.clone(), runtime.clone());
let uuid = Uuid::from_u128(308);
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
let archived_path = write_archived_session_file(home.path(), "2025-01-03T16-30-00", uuid)
.expect("archived session file");
let runtime = codex_state::StateRuntime::init(
home.path().to_path_buf(),
config.default_model_provider_id.clone(),
)
.await
.expect("state db should initialize");
let store = LocalThreadStore::new(config.clone(), Some(runtime.clone()));
runtime
.mark_backfill_complete(/*last_watermark*/ None)
.await
.expect("backfill should be complete");
codex_rollout::state_db::reconcile_rollout(
Some(runtime.as_ref()),
archived_path.as_path(),