More improvements and fixes

This commit is contained in:
Eric Traut
2026-02-06 12:50:39 -08:00
parent 317e2080dc
commit 557e65044b
4 changed files with 26 additions and 14 deletions

View File

@@ -1065,7 +1065,7 @@ pub(crate) async fn apply_bespoke_event_handling(
if let Some(request_id) = pending {
// Rollback responses are rebuilt from rollout-on-disk, and `thread/start`
// can defer rollout file creation, so force persistence before reading.
let rollout_path = match conversation.ensure_rollout_persisted_for_api().await {
let rollout_path = match conversation.ensure_rollout_persisted().await {
Ok(RolloutPersistenceStatus::Persisted(path)) => path,
Ok(RolloutPersistenceStatus::Ephemeral) => {
let error = JSONRPCErrorError {

View File

@@ -5183,7 +5183,7 @@ impl CodexMessageProcessor {
thread_id: ThreadId,
) -> Result<PathBuf, JSONRPCErrorError> {
if let Ok(thread) = self.thread_manager.get_thread(thread_id).await {
match thread.ensure_rollout_persisted_for_api().await {
match thread.ensure_rollout_persisted().await {
Ok(RolloutPersistenceStatus::Persisted(path)) => return Ok(path),
Ok(RolloutPersistenceStatus::Ephemeral) => {
return Err(JSONRPCErrorError {

View File

@@ -487,10 +487,10 @@ impl Codex {
self.session.state_db()
}
pub(crate) async fn ensure_rollout_persisted_for_api(
&self,
) -> CodexResult<RolloutPersistenceStatus> {
self.session.ensure_rollout_persisted_for_api().await
/// Ensure this thread's rollout is persisted and return whether it is
/// persisted or ephemeral.
pub(crate) async fn ensure_rollout_persisted(&self) -> CodexResult<RolloutPersistenceStatus> {
self.session.ensure_rollout_persisted().await
}
}
@@ -1236,6 +1236,12 @@ impl Session {
}
}
/// Lazily instantiate the rollout recorder when deferred creation is enabled.
///
/// This is idempotent and concurrency-safe:
/// - If a recorder already exists, it returns immediately.
/// - If initialization fails, pending creation params are restored so a
/// later turn can retry.
async fn ensure_rollout_initialized_for_turn(
&self,
turn_context: &TurnContext,
@@ -1281,7 +1287,12 @@ impl Session {
Ok(())
}
async fn ensure_rollout_persisted_for_api(&self) -> CodexResult<RolloutPersistenceStatus> {
/// Ensure this thread has a persisted rollout on disk and return its status.
///
/// For non-ephemeral threads this guarantees a concrete rollout path by:
/// initializing the recorder if needed, seeding initial context if needed,
/// and flushing buffered rollout writes.
async fn ensure_rollout_persisted(&self) -> CodexResult<RolloutPersistenceStatus> {
let turn_context = self.new_default_turn().await;
if turn_context.config.ephemeral {
return Ok(RolloutPersistenceStatus::Ephemeral);
@@ -5493,7 +5504,7 @@ mod tests {
}
#[tokio::test]
async fn ensure_rollout_persisted_for_api_is_idempotent() {
async fn ensure_rollout_persisted_is_idempotent() {
let (session, turn_context) = make_session_and_context().await;
{
let mut state = session.state.lock().await;
@@ -5517,11 +5528,11 @@ mod tests {
}
let first = session
.ensure_rollout_persisted_for_api()
.ensure_rollout_persisted()
.await
.expect("first persistence call");
let second = session
.ensure_rollout_persisted_for_api()
.ensure_rollout_persisted()
.await
.expect("second persistence call");
@@ -5551,7 +5562,7 @@ mod tests {
}
#[tokio::test]
async fn ensure_rollout_persisted_for_api_returns_ephemeral_when_session_is_ephemeral() {
async fn ensure_rollout_persisted_returns_ephemeral_when_session_is_ephemeral() {
let (session, _) = make_session_and_context().await;
{
let mut state = session.state.lock().await;
@@ -5561,7 +5572,7 @@ mod tests {
}
let outcome = session
.ensure_rollout_persisted_for_api()
.ensure_rollout_persisted()
.await
.expect("ephemeral persistence check");
assert_eq!(outcome, RolloutPersistenceStatus::Ephemeral);

View File

@@ -82,8 +82,9 @@ impl CodexThread {
self.rollout_path.clone()
}
pub async fn ensure_rollout_persisted_for_api(&self) -> CodexResult<RolloutPersistenceStatus> {
self.codex.ensure_rollout_persisted_for_api().await
/// Ensure this thread has a persisted rollout and return its status.
pub async fn ensure_rollout_persisted(&self) -> CodexResult<RolloutPersistenceStatus> {
self.codex.ensure_rollout_persisted().await
}
pub fn state_db(&self) -> Option<StateDbHandle> {