feat(app-server): thread/read API (#9569)

This commit is contained in:
Owen Lin
2026-01-22 12:22:01 -08:00
committed by GitHub
parent 8b3521ee77
commit 80240b3b67
7 changed files with 290 additions and 1 deletions

View File

@@ -129,6 +129,10 @@ client_request_definitions! {
params: v2::ThreadLoadedListParams,
response: v2::ThreadLoadedListResponse,
},
ThreadRead => "thread/read" {
params: v2::ThreadReadParams,
response: v2::ThreadReadResponse,
},
SkillsList => "skills/list" {
params: v2::SkillsListParams,
response: v2::SkillsListResponse,

View File

@@ -1246,6 +1246,23 @@ pub struct ThreadLoadedListResponse {
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadReadParams {
pub thread_id: String,
/// When true, include turns and their items from rollout history.
#[serde(default)]
pub include_turns: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadReadResponse {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -1417,7 +1434,8 @@ pub struct Thread {
pub source: SessionSource,
/// Optional Git metadata captured when the thread was created.
pub git_info: Option<GitInfo>,
/// Only populated on `thread/resume`, `thread/rollback`, `thread/fork` responses.
/// Only populated on `thread/resume`, `thread/rollback`, `thread/fork`, and `thread/read`
/// (when `includeTurns` is true) responses.
/// For all other responses and notifications returning a Thread,
/// the turns field will be an empty list.
pub turns: Vec<Turn>,

View File

@@ -79,6 +79,7 @@ Example (from OpenAI's official VSCode extension):
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders` filtering.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`.
- `thread/archive` — move a threads rollout file into the archived directory; returns `{}` on success.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
@@ -178,6 +179,20 @@ When `nextCursor` is `null`, youve reached the final page.
} }
```
### Example: Read a thread
Use `thread/read` to fetch a stored thread by id without resuming it. Pass `includeTurns` when you want the rollout history loaded into `thread.turns`.
```json
{ "method": "thread/read", "id": 22, "params": { "threadId": "thr_123" } }
{ "id": 22, "result": { "thread": { "id": "thr_123", "turns": [] } } }
```
```json
{ "method": "thread/read", "id": 23, "params": { "threadId": "thr_123", "includeTurns": true } }
{ "id": 23, "result": { "thread": { "id": "thr_123", "turns": [ ... ] } } }
```
### Example: Archive a thread
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory.

View File

@@ -101,6 +101,8 @@ use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadLoadedListResponse;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadRollbackParams;
@@ -403,6 +405,9 @@ impl CodexMessageProcessor {
ClientRequest::ThreadLoadedList { request_id, params } => {
self.thread_loaded_list(request_id, params).await;
}
ClientRequest::ThreadRead { request_id, params } => {
self.thread_read(request_id, params).await;
}
ClientRequest::SkillsList { request_id, params } => {
self.skills_list(request_id, params).await;
}
@@ -1702,6 +1707,83 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn thread_read(&self, request_id: RequestId, params: ThreadReadParams) {
let ThreadReadParams {
thread_id,
include_turns,
} = params;
let thread_uuid = match ThreadId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
.await;
return;
}
};
let rollout_path =
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
.await
{
Ok(Some(path)) => path,
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("no rollout found for thread id {thread_uuid}"),
)
.await;
return;
}
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
}
};
let fallback_provider = self.config.model_provider_id.as_str();
let mut thread = match read_summary_from_rollout(&rollout_path, fallback_provider).await {
Ok(summary) => summary_to_thread(summary),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
)
.await;
return;
}
};
if include_turns {
match read_event_msgs_from_rollout(&rollout_path).await {
Ok(events) => {
thread.turns = build_turns_from_event_msgs(&events);
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
)
.await;
return;
}
}
}
let response = ThreadReadResponse { thread };
self.outgoing.send_response(request_id, response).await;
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.thread_manager.subscribe_thread_created()
}

View File

@@ -48,6 +48,7 @@ use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadLoadedListParams;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadRollbackParams;
use codex_app_server_protocol::ThreadStartParams;
@@ -390,6 +391,15 @@ impl McpProcess {
self.send_request("thread/loaded/list", params).await
}
/// Send a `thread/read` JSON-RPC request.
pub async fn send_thread_read_request(
&mut self,
params: ThreadReadParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/read", params).await
}
/// Send a `model/list` JSON-RPC request.
pub async fn send_list_models_request(
&mut self,

View File

@@ -12,6 +12,7 @@ mod thread_archive;
mod thread_fork;
mod thread_list;
mod thread_loaded_list;
mod thread_read;
mod thread_resume;
mod thread_rollback;
mod thread_start;

View File

@@ -0,0 +1,159 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout_with_text_elements;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::SessionSource;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadReadParams;
use codex_app_server_protocol::ThreadReadResponse;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput;
use codex_protocol::user_input::ByteRange;
use codex_protocol::user_input::TextElement;
use pretty_assertions::assert_eq;
use std::path::Path;
use std::path::PathBuf;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_read_returns_summary_without_turns() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let preview = "Saved user message";
let text_elements = [TextElement::new(
ByteRange { start: 0, end: 5 },
Some("<note>".into()),
)];
let conversation_id = create_fake_rollout_with_text_elements(
codex_home.path(),
"2025-01-05T12-00-00",
"2025-01-05T12:00:00Z",
preview,
text_elements
.iter()
.map(|elem| serde_json::to_value(elem).expect("serialize text element"))
.collect(),
Some("mock_provider"),
None,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: conversation_id.clone(),
include_turns: false,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.id, conversation_id);
assert_eq!(thread.preview, preview);
assert_eq!(thread.model_provider, "mock_provider");
assert!(thread.path.is_absolute());
assert_eq!(thread.cwd, PathBuf::from("/"));
assert_eq!(thread.cli_version, "0.0.0");
assert_eq!(thread.source, SessionSource::Cli);
assert_eq!(thread.git_info, None);
assert_eq!(thread.turns.len(), 0);
Ok(())
}
#[tokio::test]
async fn thread_read_can_include_turns() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let preview = "Saved user message";
let text_elements = vec![TextElement::new(
ByteRange { start: 0, end: 5 },
Some("<note>".into()),
)];
let conversation_id = create_fake_rollout_with_text_elements(
codex_home.path(),
"2025-01-05T12-00-00",
"2025-01-05T12:00:00Z",
preview,
text_elements
.iter()
.map(|elem| serde_json::to_value(elem).expect("serialize text element"))
.collect(),
Some("mock_provider"),
None,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let read_id = mcp
.send_thread_read_request(ThreadReadParams {
thread_id: conversation_id.clone(),
include_turns: true,
})
.await?;
let read_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(read_id)),
)
.await??;
let ThreadReadResponse { thread } = to_response::<ThreadReadResponse>(read_resp)?;
assert_eq!(thread.turns.len(), 1);
let turn = &thread.turns[0];
assert_eq!(turn.status, TurnStatus::Completed);
assert_eq!(turn.items.len(), 1, "expected user message item");
match &turn.items[0] {
ThreadItem::UserMessage { content, .. } => {
assert_eq!(
content,
&vec![UserInput::Text {
text: preview.to_string(),
text_elements: text_elements.clone().into_iter().map(Into::into).collect(),
}]
);
}
other => panic!("expected user message item, got {other:?}"),
}
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}