Add thread/unarchive to restore archived rollouts (#9843)

## Summary
- Adds a new `thread/unarchive` RPC to move archived thread rollouts
back into the active `sessions/` tree.

## What changed
- **Protocol**
  - Adds `thread/unarchive` request/response types and wiring.
- **Server**
  - Implements `thread_unarchive` in the app server.
  - Validates the archived rollout path and thread ID.
- Restores the rollout to `sessions/YYYY/MM/DD/...` based on the rollout
filename timestamp.
- **Core**
- Adds `find_archived_thread_path_by_id_str` helper for archived
rollouts.
- **Docs**
  - Documents the new RPC and usage example.
- **Tests**
  - Adds an end-to-end server test that:
    1) starts a thread,
    2) archives it,
    3) unarchives it,
    4) asserts the file is restored to `sessions/`.

## How to use
```json
{ "method": "thread/unarchive", "id": 24, "params": { "threadId": "<thread-id>" } }
```

## Author Codex Session

`codex resume 019bf158-54b6-7960-a696-9d85df7e1bc1` (soon I'll make this
kind of session UUID forkable by anyone with the right
`session_object_storage_url` line in their config, but for now just
pasting it here for my reference)
This commit is contained in:
Charley Cunningham
2026-01-26 11:24:36 -08:00
committed by GitHub
parent 09251387e0
commit 62266b13f8
13 changed files with 367 additions and 8 deletions

View File

@@ -117,6 +117,10 @@ client_request_definitions! {
params: v2::ThreadArchiveParams,
response: v2::ThreadArchiveResponse,
},
ThreadUnarchive => "thread/unarchive" {
params: v2::ThreadUnarchiveParams,
response: v2::ThreadUnarchiveResponse,
},
ThreadRollback => "thread/rollback" {
params: v2::ThreadRollbackParams,
response: v2::ThreadRollbackResponse,

View File

@@ -1223,6 +1223,20 @@ pub struct ThreadArchiveParams {
#[ts(export_to = "v2/")]
pub struct ThreadArchiveResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadUnarchiveParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadUnarchiveResponse {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -81,6 +81,7 @@ Example (from OpenAI's official VSCode extension):
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`.
- `thread/archive` — move a threads rollout file into the archived directory; returns `{}` on success.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
@@ -223,6 +224,15 @@ Use `thread/archive` to move the persisted rollout (stored as a JSONL file on di
An archived thread will not appear in `thread/list` unless `archived` is set to `true`.
### Example: Unarchive a thread
Use `thread/unarchive` to move an archived rollout back into the sessions directory.
```json
{ "method": "thread/unarchive", "id": 24, "params": { "threadId": "thr_b" } }
{ "id": 24, "result": { "thread": { "id": "thr_b" } } }
```
### Example: Start a turn (send user input)
Turns attach user input (text or images) to a thread and trigger Codex generation. The `input` field is a list of discriminated unions:

View File

@@ -114,6 +114,8 @@ use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnError;
use codex_app_server_protocol::TurnInterruptParams;
@@ -151,6 +153,7 @@ use codex_core::error::CodexErr;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
use codex_core::features::Feature;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use codex_core::git_info::git_diff_to_remote;
use codex_core::mcp::collect_mcp_snapshot;
@@ -164,6 +167,7 @@ use codex_core::protocol::ReviewTarget as CoreReviewTarget;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::read_head_for_summary;
use codex_core::read_session_meta_line;
use codex_core::rollout_date_parts;
use codex_core::sandboxing::SandboxPermissions;
use codex_feedback::CodexFeedback;
use codex_login::ServerOptions as LoginServerOptions;
@@ -404,6 +408,9 @@ impl CodexMessageProcessor {
ClientRequest::ThreadArchive { request_id, params } => {
self.thread_archive(request_id, params).await;
}
ClientRequest::ThreadUnarchive { request_id, params } => {
self.thread_unarchive(request_id, params).await;
}
ClientRequest::ThreadRollback { request_id, params } => {
self.thread_rollback(request_id, params).await;
}
@@ -1643,6 +1650,150 @@ impl CodexMessageProcessor {
}
}
async fn thread_unarchive(&mut self, request_id: RequestId, params: ThreadUnarchiveParams) {
let thread_id = match ThreadId::from_string(&params.thread_id) {
Ok(id) => id,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid thread id: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let archived_path = match find_archived_thread_path_by_id_str(
&self.config.codex_home,
&thread_id.to_string(),
)
.await
{
Ok(Some(path)) => path,
Ok(None) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("no archived rollout found for thread id {thread_id}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("failed to locate archived thread id {thread_id}: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let rollout_path_display = archived_path.display().to_string();
let fallback_provider = self.config.model_provider_id.clone();
let archived_folder = self
.config
.codex_home
.join(codex_core::ARCHIVED_SESSIONS_SUBDIR);
let result: Result<Thread, JSONRPCErrorError> = async {
let canonical_archived_dir = tokio::fs::canonicalize(&archived_folder).await.map_err(
|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to unarchive thread: unable to resolve archived directory: {err}"
),
data: None,
},
)?;
let canonical_rollout_path = tokio::fs::canonicalize(&archived_path).await;
let canonical_rollout_path = if let Ok(path) = canonical_rollout_path
&& path.starts_with(&canonical_archived_dir)
{
path
} else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{rollout_path_display}` must be in archived directory"
),
data: None,
});
};
let required_suffix = format!("{thread_id}.jsonl");
let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("rollout path `{rollout_path_display}` missing file name"),
data: None,
});
};
if !file_name
.to_string_lossy()
.ends_with(required_suffix.as_str())
{
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{rollout_path_display}` does not match thread id {thread_id}"
),
data: None,
});
}
let Some((year, month, day)) = rollout_date_parts(&file_name) else {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{rollout_path_display}` missing filename timestamp"
),
data: None,
});
};
let sessions_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR);
let dest_dir = sessions_folder.join(year).join(month).join(day);
let restored_path = dest_dir.join(&file_name);
tokio::fs::create_dir_all(&dest_dir)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to unarchive thread: {err}"),
data: None,
})?;
tokio::fs::rename(&canonical_rollout_path, &restored_path)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to unarchive thread: {err}"),
data: None,
})?;
let summary =
read_summary_from_rollout(restored_path.as_path(), fallback_provider.as_str())
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to read unarchived thread: {err}"),
data: None,
})?;
Ok(summary_to_thread(summary))
}
.await;
match result {
Ok(thread) => {
let response = ThreadUnarchiveResponse { thread };
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
self.outgoing.send_error(request_id, err).await;
}
}
}
async fn thread_rollback(&mut self, request_id: RequestId, params: ThreadRollbackParams) {
let ThreadRollbackParams {
thread_id,

View File

@@ -53,6 +53,7 @@ use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnStartParams;
use codex_core::default_client::CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR;
@@ -365,6 +366,15 @@ impl McpProcess {
self.send_request("thread/archive", params).await
}
/// Send a `thread/unarchive` JSON-RPC request.
pub async fn send_thread_unarchive_request(
&mut self,
params: ThreadUnarchiveParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/unarchive", params).await
}
/// Send a `thread/rollback` JSON-RPC request.
pub async fn send_thread_rollback_request(
&mut self,

View File

@@ -18,5 +18,6 @@ mod thread_read;
mod thread_resume;
mod thread_rollback;
mod thread_start;
mod thread_unarchive;
mod turn_interrupt;
mod turn_start;

View File

@@ -0,0 +1,101 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
#[tokio::test]
async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let rollout_path = find_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected rollout path for thread id to exist");
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: thread.id.clone(),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
)
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
let archived_path = find_archived_thread_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected archived rollout path for thread id to exist");
let archived_path_display = archived_path.display();
assert!(
archived_path.exists(),
"expected {archived_path_display} to exist"
);
let unarchive_id = mcp
.send_thread_unarchive_request(ThreadUnarchiveParams {
thread_id: thread.id.clone(),
})
.await?;
let unarchive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unarchive_id)),
)
.await??;
let _: ThreadUnarchiveResponse = to_response::<ThreadUnarchiveResponse>(unarchive_resp)?;
let rollout_path_display = rollout_path.display();
assert!(
rollout_path.exists(),
"expected rollout path {rollout_path_display} to be restored"
);
assert!(
!archived_path.exists(),
"expected archived rollout path {archived_path_display} to be moved"
);
Ok(())
}
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(config_toml, config_contents())
}
fn config_contents() -> &'static str {
r#"model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
"#
}

View File

@@ -98,6 +98,7 @@ pub use rollout::INTERACTIVE_SESSION_SOURCES;
pub use rollout::RolloutRecorder;
pub use rollout::SESSIONS_SUBDIR;
pub use rollout::SessionMeta;
pub use rollout::find_archived_thread_path_by_id_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub use rollout::find_conversation_path_by_id_str;
pub use rollout::find_thread_path_by_id_str;
@@ -108,6 +109,7 @@ pub use rollout::list::ThreadsPage;
pub use rollout::list::parse_cursor;
pub use rollout::list::read_head_for_summary;
pub use rollout::list::read_session_meta_line;
pub use rollout::rollout_date_parts;
mod function_tool;
mod state;
mod tasks;

View File

@@ -1,4 +1,5 @@
use std::cmp::Reverse;
use std::ffi::OsStr;
use std::io::{self};
use std::num::NonZero;
use std::ops::ControlFlow;
@@ -15,6 +16,7 @@ use time::format_description::well_known::Rfc3339;
use time::macros::format_description;
use uuid::Uuid;
use super::ARCHIVED_SESSIONS_SUBDIR;
use super::SESSIONS_SUBDIR;
use crate::protocol::EventMsg;
use codex_file_search as file_search;
@@ -1054,11 +1056,9 @@ fn truncate_to_seconds(dt: OffsetDateTime) -> Option<OffsetDateTime> {
dt.replace_nanosecond(0).ok()
}
/// Locate a recorded thread rollout file by its UUID string using the existing
/// paginated listing implementation. Returns `Ok(Some(path))` if found, `Ok(None)` if not present
/// or the id is invalid.
pub async fn find_thread_path_by_id_str(
async fn find_thread_path_by_id_str_in_subdir(
codex_home: &Path,
subdir: &str,
id_str: &str,
) -> io::Result<Option<PathBuf>> {
// Validate UUID format early.
@@ -1067,7 +1067,7 @@ pub async fn find_thread_path_by_id_str(
}
let mut root = codex_home.to_path_buf();
root.push(SESSIONS_SUBDIR);
root.push(subdir);
if !root.exists() {
return Ok(None);
}
@@ -1099,3 +1099,31 @@ pub async fn find_thread_path_by_id_str(
.next()
.map(|m| root.join(m.path)))
}
/// Locate a recorded thread rollout file by its UUID string using the existing
/// paginated listing implementation. Returns `Ok(Some(path))` if found, `Ok(None)` if not present
/// or the id is invalid.
pub async fn find_thread_path_by_id_str(
codex_home: &Path,
id_str: &str,
) -> io::Result<Option<PathBuf>> {
find_thread_path_by_id_str_in_subdir(codex_home, SESSIONS_SUBDIR, id_str).await
}
/// Locate an archived thread rollout file by its UUID string.
pub async fn find_archived_thread_path_by_id_str(
codex_home: &Path,
id_str: &str,
) -> io::Result<Option<PathBuf>> {
find_thread_path_by_id_str_in_subdir(codex_home, ARCHIVED_SESSIONS_SUBDIR, id_str).await
}
/// Extract the `YYYY/MM/DD` directory components from a rollout filename.
pub fn rollout_date_parts(file_name: &OsStr) -> Option<(String, String, String)> {
let name = file_name.to_string_lossy();
let date = name.strip_prefix("rollout-")?.get(..10)?;
let year = date.get(..4)?.to_string();
let month = date.get(5..7)?.to_string();
let day = date.get(8..10)?.to_string();
Some((year, month, day))
}

View File

@@ -15,9 +15,11 @@ pub(crate) mod truncation;
pub use codex_protocol::protocol::SessionMeta;
pub(crate) use error::map_session_init_error;
pub use list::find_archived_thread_path_by_id_str;
pub use list::find_thread_path_by_id_str;
#[deprecated(note = "use find_thread_path_by_id_str")]
pub use list::find_thread_path_by_id_str as find_conversation_path_by_id_str;
pub use list::rollout_date_parts;
pub use recorder::RolloutRecorder;
pub use recorder::RolloutRecorderParams;

View File

@@ -1,5 +1,6 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::ffi::OsStr;
use std::fs::File;
use std::fs::FileTimes;
use std::fs::{self};
@@ -21,6 +22,7 @@ use crate::rollout::list::ThreadItem;
use crate::rollout::list::ThreadSortKey;
use crate::rollout::list::ThreadsPage;
use crate::rollout::list::get_threads;
use crate::rollout::rollout_date_parts;
use anyhow::Result;
use codex_protocol::ThreadId;
use codex_protocol::models::ContentItem;
@@ -43,6 +45,16 @@ fn provider_vec(providers: &[&str]) -> Vec<String> {
.collect()
}
#[test]
fn rollout_date_parts_extracts_directory_components() {
let file_name = OsStr::new("rollout-2025-03-01T09-00-00-123.jsonl");
let parts = rollout_date_parts(file_name);
assert_eq!(
parts,
Some(("2025".to_string(), "03".to_string(), "01".to_string()))
);
}
fn write_session_file(
root: &Path,
ts_str: &str,

View File

@@ -3,14 +3,15 @@ use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use tempfile::TempDir;
use uuid::Uuid;
/// Create sessions/YYYY/MM/DD and write a minimal rollout file containing the
/// Create <subdir>/YYYY/MM/DD and write a minimal rollout file containing the
/// provided conversation id in the SessionMeta line. Returns the absolute path.
fn write_minimal_rollout_with_id(codex_home: &Path, id: Uuid) -> PathBuf {
let sessions = codex_home.join("sessions/2024/01/01");
fn write_minimal_rollout_with_id_in_subdir(codex_home: &Path, subdir: &str, id: Uuid) -> PathBuf {
let sessions = codex_home.join(subdir).join("2024/01/01");
std::fs::create_dir_all(&sessions).unwrap();
let file = sessions.join(format!("rollout-2024-01-01T00-00-00-{id}.jsonl"));
@@ -37,6 +38,12 @@ fn write_minimal_rollout_with_id(codex_home: &Path, id: Uuid) -> PathBuf {
file
}
/// Create sessions/YYYY/MM/DD and write a minimal rollout file containing the
/// provided conversation id in the SessionMeta line. Returns the absolute path.
fn write_minimal_rollout_with_id(codex_home: &Path, id: Uuid) -> PathBuf {
write_minimal_rollout_with_id_in_subdir(codex_home, "sessions", id)
}
#[tokio::test]
async fn find_locates_rollout_file_by_id() {
let home = TempDir::new().unwrap();
@@ -79,3 +86,16 @@ async fn find_ignores_granular_gitignore_rules() {
assert_eq!(found, Some(expected));
}
#[tokio::test]
async fn find_archived_locates_rollout_file_by_id() {
let home = TempDir::new().unwrap();
let id = Uuid::new_v4();
let expected = write_minimal_rollout_with_id_in_subdir(home.path(), "archived_sessions", id);
let found = find_archived_thread_path_by_id_str(home.path(), &id.to_string())
.await
.unwrap();
assert_eq!(found, Some(expected));
}

View File

@@ -79,6 +79,10 @@ Interrupt a running turn: `interruptConversation`.
List/resume/archive: `listConversations`, `resumeConversation`, `archiveConversation`.
For v2 threads, use `thread/list` with `archived: true` to list archived rollouts and
`thread/unarchive` to restore them to the active sessions directory (it returns the restored
thread summary).
## Models
Fetch the catalog of models available in the current Codex build with `model/list`. The request accepts optional pagination inputs: