diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index eda154a4e1..6bd8b0ac45 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1065,7 +1065,7 @@ pub(crate) async fn apply_bespoke_event_handling( if let Some(request_id) = pending { // Rollback responses are rebuilt from rollout-on-disk, and `thread/start` // can defer rollout file creation, so force persistence before reading. - let rollout_path = match conversation.ensure_rollout_persisted_for_api().await { + let rollout_path = match conversation.ensure_rollout_persisted().await { Ok(RolloutPersistenceStatus::Persisted(path)) => path, Ok(RolloutPersistenceStatus::Ephemeral) => { let error = JSONRPCErrorError { diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index abe408300b..e798e3d9ec 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -5183,7 +5183,7 @@ impl CodexMessageProcessor { thread_id: ThreadId, ) -> Result { if let Ok(thread) = self.thread_manager.get_thread(thread_id).await { - match thread.ensure_rollout_persisted_for_api().await { + match thread.ensure_rollout_persisted().await { Ok(RolloutPersistenceStatus::Persisted(path)) => return Ok(path), Ok(RolloutPersistenceStatus::Ephemeral) => { return Err(JSONRPCErrorError { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 29036fc086..7ab1d89f66 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -487,10 +487,10 @@ impl Codex { self.session.state_db() } - pub(crate) async fn ensure_rollout_persisted_for_api( - &self, - ) -> CodexResult { - self.session.ensure_rollout_persisted_for_api().await + /// Ensure this thread's rollout is persisted and return whether it is + /// persisted or ephemeral. + pub(crate) async fn ensure_rollout_persisted(&self) -> CodexResult { + self.session.ensure_rollout_persisted().await } } @@ -1236,6 +1236,12 @@ impl Session { } } + /// Lazily instantiate the rollout recorder when deferred creation is enabled. + /// + /// This is idempotent and concurrency-safe: + /// - If a recorder already exists, it returns immediately. + /// - If initialization fails, pending creation params are restored so a + /// later turn can retry. async fn ensure_rollout_initialized_for_turn( &self, turn_context: &TurnContext, @@ -1281,7 +1287,12 @@ impl Session { Ok(()) } - async fn ensure_rollout_persisted_for_api(&self) -> CodexResult { + /// Ensure this thread has a persisted rollout on disk and return its status. + /// + /// For non-ephemeral threads this guarantees a concrete rollout path by: + /// initializing the recorder if needed, seeding initial context if needed, + /// and flushing buffered rollout writes. + async fn ensure_rollout_persisted(&self) -> CodexResult { let turn_context = self.new_default_turn().await; if turn_context.config.ephemeral { return Ok(RolloutPersistenceStatus::Ephemeral); @@ -5493,7 +5504,7 @@ mod tests { } #[tokio::test] - async fn ensure_rollout_persisted_for_api_is_idempotent() { + async fn ensure_rollout_persisted_is_idempotent() { let (session, turn_context) = make_session_and_context().await; { let mut state = session.state.lock().await; @@ -5517,11 +5528,11 @@ mod tests { } let first = session - .ensure_rollout_persisted_for_api() + .ensure_rollout_persisted() .await .expect("first persistence call"); let second = session - .ensure_rollout_persisted_for_api() + .ensure_rollout_persisted() .await .expect("second persistence call"); @@ -5551,7 +5562,7 @@ mod tests { } #[tokio::test] - async fn ensure_rollout_persisted_for_api_returns_ephemeral_when_session_is_ephemeral() { + async fn ensure_rollout_persisted_returns_ephemeral_when_session_is_ephemeral() { let (session, _) = make_session_and_context().await; { let mut state = session.state.lock().await; @@ -5561,7 +5572,7 @@ mod tests { } let outcome = session - .ensure_rollout_persisted_for_api() + .ensure_rollout_persisted() .await .expect("ephemeral persistence check"); assert_eq!(outcome, RolloutPersistenceStatus::Ephemeral); diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 72b769a514..d5d57db4fc 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -82,8 +82,9 @@ impl CodexThread { self.rollout_path.clone() } - pub async fn ensure_rollout_persisted_for_api(&self) -> CodexResult { - self.codex.ensure_rollout_persisted_for_api().await + /// Ensure this thread has a persisted rollout and return its status. + pub async fn ensure_rollout_persisted(&self) -> CodexResult { + self.codex.ensure_rollout_persisted().await } pub fn state_db(&self) -> Option {