mirror of
https://github.com/openai/codex.git
synced 2026-04-17 19:24:47 +00:00
Compare commits
6 Commits
dev/ningyi
...
jif/cascad
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d8b13ae9c | ||
|
|
41fa376cc5 | ||
|
|
bf01c21710 | ||
|
|
7fbef76cf3 | ||
|
|
88faf875d2 | ||
|
|
1435addd61 |
@@ -146,7 +146,7 @@ Example with notification opt-out:
|
||||
- `thread/memoryMode/set` — experimental; set a thread’s persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success.
|
||||
- `memory/reset` — experimental; clear the current `CODEX_HOME/memories` directory and reset persisted memory stage data in sqlite while preserving existing thread memory modes; returns `{}` on success.
|
||||
- `thread/status/changed` — notification emitted when a loaded thread’s status changes (`threadId` + new `status`).
|
||||
- `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success and emits `thread/archived`.
|
||||
- `thread/archive` — move a thread’s rollout file into the archived directory and attempt to move any spawned descendant thread rollout files; returns `{}` on success and emits `thread/archived` for each archived thread.
|
||||
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server keeps the thread loaded and unloads it only after it has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed`.
|
||||
- `thread/name/set` — set or update a thread’s user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
|
||||
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
|
||||
@@ -426,7 +426,7 @@ Experimental: use `memory/reset` to clear local memory artifacts and sqlite-back
|
||||
|
||||
### Example: Archive a thread
|
||||
|
||||
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory.
|
||||
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory and attempt to move any spawned descendant thread rollouts.
|
||||
|
||||
```json
|
||||
{ "method": "thread/archive", "id": 21, "params": { "threadId": "thr_b" } }
|
||||
|
||||
@@ -2732,8 +2732,36 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let thread_id_str = thread_id.to_string();
|
||||
if let Err(err) = self
|
||||
let mut thread_ids = vec![thread_id];
|
||||
if let Some(state_db_ctx) = get_state_db(&self.config).await {
|
||||
let descendants = match state_db_ctx.list_thread_spawn_descendants(thread_id).await {
|
||||
Ok(descendants) => descendants,
|
||||
Err(err) => {
|
||||
self.outgoing
|
||||
.send_error(
|
||||
request_id,
|
||||
JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to list spawned descendants for thread id {thread_id}: {err}"
|
||||
),
|
||||
data: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut seen = HashSet::from([thread_id]);
|
||||
for descendant_id in descendants {
|
||||
if seen.insert(descendant_id) {
|
||||
thread_ids.push(descendant_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut archive_thread_ids = Vec::new();
|
||||
match self
|
||||
.thread_store
|
||||
.read_thread(StoreReadThreadParams {
|
||||
thread_id,
|
||||
@@ -2742,34 +2770,98 @@ impl CodexMessageProcessor {
|
||||
})
|
||||
.await
|
||||
{
|
||||
self.outgoing
|
||||
.send_error(request_id, thread_store_archive_error("archive", err))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
self.prepare_thread_for_archive(thread_id).await;
|
||||
|
||||
match self
|
||||
.thread_store
|
||||
.archive_thread(StoreArchiveThreadParams { thread_id })
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
let response = ThreadArchiveResponse {};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
let notification = ThreadArchivedNotification {
|
||||
thread_id: thread_id_str,
|
||||
};
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadArchived(notification))
|
||||
.await;
|
||||
Ok(thread) => {
|
||||
if thread.archived_at.is_none() {
|
||||
archive_thread_ids.push(thread_id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
self.outgoing
|
||||
.send_error(request_id, thread_store_archive_error("archive", err))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
for descendant_thread_id in thread_ids.into_iter().skip(1) {
|
||||
match self
|
||||
.thread_store
|
||||
.read_thread(StoreReadThreadParams {
|
||||
thread_id: descendant_thread_id,
|
||||
include_archived: true,
|
||||
include_history: false,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(thread) => {
|
||||
if thread.archived_at.is_none() {
|
||||
archive_thread_ids.push(descendant_thread_id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut archived_thread_ids = Vec::new();
|
||||
let Some((parent_thread_id, descendant_thread_ids)) = archive_thread_ids.split_first()
|
||||
else {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadArchiveResponse {})
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
self.prepare_thread_for_archive(*parent_thread_id).await;
|
||||
match self
|
||||
.thread_store
|
||||
.archive_thread(StoreArchiveThreadParams {
|
||||
thread_id: *parent_thread_id,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
archived_thread_ids.push(parent_thread_id.to_string());
|
||||
}
|
||||
Err(err) => {
|
||||
self.outgoing
|
||||
.send_error(request_id, thread_store_archive_error("archive", err))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
for descendant_thread_id in descendant_thread_ids.iter().rev().copied() {
|
||||
self.prepare_thread_for_archive(descendant_thread_id).await;
|
||||
match self
|
||||
.thread_store
|
||||
.archive_thread(StoreArchiveThreadParams {
|
||||
thread_id: descendant_thread_id,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
archived_thread_ids.push(descendant_thread_id.to_string());
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to archive spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadArchiveResponse {})
|
||||
.await;
|
||||
for thread_id in archived_thread_ids {
|
||||
let notification = ThreadArchivedNotification { thread_id };
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadArchived(notification))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_increment_elicitation(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
@@ -19,7 +20,11 @@ use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use codex_core::find_archived_thread_path_by_id_str;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_state::DirectionalThreadSpawnEdgeStatus;
|
||||
use codex_state::StateRuntime;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
@@ -160,6 +165,311 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_archives_spawned_descendants() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let parent_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-00-00",
|
||||
"2025-01-01T00:00:00Z",
|
||||
"parent",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let child_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-01-00",
|
||||
"2025-01-01T00:01:00Z",
|
||||
"child",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let grandchild_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-02-00",
|
||||
"2025-01-01T00:02:00Z",
|
||||
"grandchild",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
|
||||
let parent_thread_id = ThreadId::from_string(&parent_id)?;
|
||||
let child_thread_id = ThreadId::from_string(&child_id)?;
|
||||
let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
child_thread_id,
|
||||
grandchild_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
let mut archived_ids = Vec::new();
|
||||
for _ in 0..3 {
|
||||
let notification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
|
||||
notification
|
||||
.params
|
||||
.expect("thread/archived notification params"),
|
||||
)?;
|
||||
archived_ids.push(archived_notification.thread_id);
|
||||
}
|
||||
assert_eq!(archived_ids, vec![parent_id, grandchild_id, child_id]);
|
||||
|
||||
for thread_id in [parent_thread_id, child_thread_id, grandchild_thread_id] {
|
||||
assert!(
|
||||
find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_none(),
|
||||
"expected active rollout for {thread_id} to be archived"
|
||||
);
|
||||
assert!(
|
||||
find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_some(),
|
||||
"expected archived rollout for {thread_id} to exist"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_succeeds_when_descendant_archive_fails() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let parent_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-00-00",
|
||||
"2025-01-01T00:00:00Z",
|
||||
"parent",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let child_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-01-00",
|
||||
"2025-01-01T00:01:00Z",
|
||||
"child",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let grandchild_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-02-00",
|
||||
"2025-01-01T00:02:00Z",
|
||||
"grandchild",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
|
||||
let parent_thread_id = ThreadId::from_string(&parent_id)?;
|
||||
let child_thread_id = ThreadId::from_string(&child_id)?;
|
||||
let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
child_thread_id,
|
||||
grandchild_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let child_rollout_path = find_thread_path_by_id_str(codex_home.path(), &child_id)
|
||||
.await?
|
||||
.expect("child rollout path");
|
||||
let archived_child_path = codex_home
|
||||
.path()
|
||||
.join(ARCHIVED_SESSIONS_SUBDIR)
|
||||
.join(child_rollout_path.file_name().expect("rollout file name"));
|
||||
std::fs::create_dir_all(&archived_child_path)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
let mut archived_ids = Vec::new();
|
||||
for _ in 0..2 {
|
||||
let notification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
|
||||
notification
|
||||
.params
|
||||
.expect("thread/archived notification params"),
|
||||
)?;
|
||||
archived_ids.push(archived_notification.thread_id);
|
||||
}
|
||||
assert_eq!(archived_ids, vec![parent_id, grandchild_id]);
|
||||
|
||||
assert!(
|
||||
timeout(
|
||||
std::time::Duration::from_millis(250),
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
assert!(
|
||||
child_rollout_path.exists(),
|
||||
"child should stay active after descendant archive failure"
|
||||
);
|
||||
assert!(
|
||||
archived_child_path.is_dir(),
|
||||
"test conflict should remain in archived sessions"
|
||||
);
|
||||
for thread_id in [parent_thread_id, grandchild_thread_id] {
|
||||
assert!(
|
||||
find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_none(),
|
||||
"expected active rollout for {thread_id} to be archived"
|
||||
);
|
||||
assert!(
|
||||
find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_some(),
|
||||
"expected archived rollout for {thread_id} to exist"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_succeeds_when_spawned_descendant_is_missing() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let parent_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-00-00",
|
||||
"2025-01-01T00:00:00Z",
|
||||
"parent",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let parent_thread_id = ThreadId::from_string(&parent_id)?;
|
||||
let missing_child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000901")?;
|
||||
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
missing_child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
let notification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
|
||||
notification
|
||||
.params
|
||||
.expect("thread/archived notification params"),
|
||||
)?;
|
||||
assert_eq!(archived_notification.thread_id, parent_id);
|
||||
|
||||
assert!(
|
||||
find_thread_path_by_id_str(codex_home.path(), &parent_id)
|
||||
.await?
|
||||
.is_none(),
|
||||
"parent should be archived even when a descendant is missing"
|
||||
);
|
||||
assert!(
|
||||
find_archived_thread_path_by_id_str(codex_home.path(), &parent_id)
|
||||
.await?
|
||||
.is_some(),
|
||||
"parent should be moved into archived sessions"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_clears_stale_subscriptions_before_resume() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
@@ -144,6 +144,17 @@ ON CONFLICT(child_thread_id) DO UPDATE SET
|
||||
.await
|
||||
}
|
||||
|
||||
/// List all spawned descendants of `root_thread_id`.
|
||||
///
|
||||
/// Descendants are returned breadth-first by depth, then by thread id for stable ordering.
|
||||
pub async fn list_thread_spawn_descendants(
|
||||
&self,
|
||||
root_thread_id: ThreadId,
|
||||
) -> anyhow::Result<Vec<ThreadId>> {
|
||||
self.list_thread_spawn_descendants_matching(root_thread_id, /*status*/ None)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Find a direct spawned child of `parent_thread_id` by canonical agent path.
|
||||
pub async fn find_thread_spawn_child_by_path(
|
||||
&self,
|
||||
@@ -1655,5 +1666,11 @@ mod tests {
|
||||
.await
|
||||
.expect("open descendants from child should load");
|
||||
assert_eq!(open_descendants_from_child, vec![grandchild_thread_id]);
|
||||
|
||||
let all_descendants = runtime
|
||||
.list_thread_spawn_descendants(parent_thread_id)
|
||||
.await
|
||||
.expect("all descendants should load");
|
||||
assert_eq!(all_descendants, vec![child_thread_id, grandchild_thread_id]);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user