Compare commits

...

4 Commits

Author SHA1 Message Date
Eric Traut
64ef82163a Stabilize multi-agent followup interrupt test 2026-04-03 16:41:55 -07:00
Eric Traut
bf85972d4f codex: fix core test metadata constructor (#16594) 2026-04-03 16:12:35 -07:00
Eric Traut
9f20bc3f2b codex: address PR review feedback (#16594) 2026-04-03 16:00:45 -07:00
Eric Traut
0a65490839 Expose fork source thread ids in app-server v2 2026-04-03 15:37:44 -07:00
12 changed files with 254 additions and 43 deletions

View File

@@ -5,4 +5,4 @@ import type { ConversationGitInfo } from "./ConversationGitInfo";
import type { SessionSource } from "./SessionSource";
import type { ThreadId } from "./ThreadId";
export type ConversationSummary = { conversationId: ThreadId, path: string, preview: string, timestamp: string | null, updatedAt: string | null, modelProvider: string, cwd: string, cliVersion: string, source: SessionSource, gitInfo: ConversationGitInfo | null, };
export type ConversationSummary = { conversationId: ThreadId, forkedFromId: ThreadId | null, path: string, preview: string, timestamp: string | null, updatedAt: string | null, modelProvider: string, cwd: string, cliVersion: string, source: SessionSource, gitInfo: ConversationGitInfo | null, };

View File

@@ -89,6 +89,7 @@ pub struct GetConversationSummaryResponse {
#[serde(rename_all = "camelCase")]
pub struct ConversationSummary {
pub conversation_id: ThreadId,
pub forked_from_id: Option<ThreadId>,
pub path: PathBuf,
pub preview: String,
pub timestamp: Option<String>,

View File

@@ -8487,6 +8487,7 @@ async fn summary_from_thread_list_item(
);
return Some(ConversationSummary {
conversation_id: thread_id,
forked_from_id: None,
path: it.path,
preview: it.first_user_message.unwrap_or_default(),
timestamp,
@@ -8531,6 +8532,7 @@ fn thread_id_from_rollout_path(path: &Path) -> Option<ThreadId> {
#[allow(clippy::too_many_arguments)]
fn summary_from_state_db_metadata(
conversation_id: ThreadId,
forked_from_id: Option<ThreadId>,
path: PathBuf,
first_user_message: Option<String>,
timestamp: String,
@@ -8561,6 +8563,7 @@ fn summary_from_state_db_metadata(
};
ConversationSummary {
conversation_id,
forked_from_id,
path,
preview,
timestamp: Some(timestamp),
@@ -8576,6 +8579,7 @@ fn summary_from_state_db_metadata(
fn summary_from_thread_metadata(metadata: &ThreadMetadata) -> ConversationSummary {
summary_from_state_db_metadata(
metadata.id,
metadata.forked_from_id,
metadata.rollout_path.clone(),
metadata.first_user_message.clone(),
metadata
@@ -8658,6 +8662,7 @@ pub(crate) async fn read_summary_from_rollout(
Ok(ConversationSummary {
conversation_id: session_meta.id,
forked_from_id: session_meta.forked_from_id,
timestamp,
updated_at,
path: path.to_path_buf(),
@@ -8718,6 +8723,7 @@ fn extract_conversation_summary(
Some(ConversationSummary {
conversation_id,
forked_from_id: session_meta.forked_from_id,
timestamp,
updated_at,
path,
@@ -8876,6 +8882,7 @@ fn build_thread_from_snapshot(
pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
let ConversationSummary {
conversation_id,
forked_from_id,
path,
preview,
timestamp,
@@ -8897,7 +8904,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
Thread {
id: conversation_id.to_string(),
forked_from_id: None,
forked_from_id: forked_from_id.map(|id| id.to_string()),
preview,
ephemeral: false,
model_provider,
@@ -9276,6 +9283,7 @@ mod tests {
let expected = ConversationSummary {
conversation_id,
forked_from_id: None,
timestamp: timestamp.clone(),
updated_at: timestamp,
path,
@@ -9332,6 +9340,7 @@ mod tests {
let expected = ConversationSummary {
conversation_id,
forked_from_id: None,
timestamp: Some(timestamp.clone()),
updated_at: Some("2025-09-05T16:53:11Z".to_string()),
path: path.clone(),
@@ -9429,6 +9438,11 @@ mod tests {
forked_from_id_from_rollout(path.as_path()).await,
Some(forked_from_id.to_string())
);
let summary = read_summary_from_rollout(path.as_path(), "fallback").await?;
let thread = summary_to_thread(summary);
assert_eq!(thread.forked_from_id, Some(forked_from_id.to_string()));
Ok(())
}
@@ -9506,6 +9520,7 @@ mod tests {
let summary = summary_from_state_db_metadata(
conversation_id,
/*forked_from_id*/ None,
PathBuf::from("/tmp/rollout.jsonl"),
Some("hi".to_string()),
"2025-09-05T16:53:11Z".to_string(),
@@ -9528,6 +9543,35 @@ mod tests {
Ok(())
}
/// A fork source id passed to `summary_from_state_db_metadata` must still be
/// present, in string form, on the `Thread` produced by `summary_to_thread`.
#[test]
fn summary_from_state_db_metadata_preserves_forked_from_id() -> Result<()> {
    let source_thread_id = ThreadId::from_string("ad7f0408-99b8-4f6e-a46f-bd0eec433370")?;
    let fork_thread_id = ThreadId::from_string("bfd12a78-5900-467b-9bc5-d3d35df08191")?;
    let serialized_source = serde_json::to_string(&SessionSource::Cli)?;
    let expected_forked_from = Some(source_thread_id.to_string());

    let thread = summary_to_thread(summary_from_state_db_metadata(
        fork_thread_id,
        Some(source_thread_id),
        PathBuf::from("/tmp/rollout.jsonl"),
        Some("hi".to_string()),
        "2025-09-05T16:53:11Z".to_string(),
        "2025-09-05T16:53:12Z".to_string(),
        "test-provider".to_string(),
        PathBuf::from("/"),
        "0.0.0".to_string(),
        serialized_source,
        /*agent_nickname*/ None,
        /*agent_role*/ None,
        /*git_sha*/ None,
        /*git_branch*/ None,
        /*git_origin_url*/ None,
    ));

    assert_eq!(thread.forked_from_id, expected_forked_from);
    Ok(())
}
#[tokio::test]
async fn removing_thread_state_clears_listener_and_active_turn_history() -> Result<()> {
let manager = ThreadStateManager::new();

View File

@@ -24,6 +24,7 @@ const MODEL_PROVIDER: &str = "openai";
fn expected_summary(conversation_id: ThreadId, path: PathBuf) -> ConversationSummary {
ConversationSummary {
conversation_id,
forked_from_id: None,
path,
preview: PREVIEW.to_string(),
timestamp: Some(META_RFC3339.to_string()),

View File

@@ -13,6 +13,7 @@ use tempfile::TempDir;
fn thread_metadata(cwd: &str, title: &str, first_user_message: &str) -> ThreadMetadata {
ThreadMetadata {
id: ThreadId::new(),
forked_from_id: None,
rollout_path: PathBuf::from("/tmp/rollout.jsonl"),
created_at: Utc
.timestamp_opt(1_709_251_100, 0)

View File

@@ -1,4 +1,5 @@
use super::*;
use crate::CodexThread;
use crate::ThreadManager;
use crate::codex::make_session_and_context;
use crate::config::DEFAULT_AGENT_MAX_DEPTH;
@@ -112,6 +113,73 @@ fn history_contains_inter_agent_communication(
})
}
/// Drains events from `thread` until a `TurnAborted` event arrives whose turn
/// id equals `expected_turn_id` and whose reason equals `expected_reason`.
///
/// Every other event is discarded. Panics if the thread stops yielding events
/// or if no matching event is seen within five seconds.
async fn wait_for_turn_aborted(
    thread: &Arc<CodexThread>,
    expected_turn_id: &str,
    expected_reason: TurnAbortReason,
) {
    let matching_abort_seen = async {
        loop {
            let next = thread
                .next_event()
                .await
                .expect("child thread should emit events");
            match next.msg {
                EventMsg::TurnAborted(TurnAbortedEvent {
                    turn_id: Some(ref aborted_turn_id),
                    ref reason,
                }) if aborted_turn_id == expected_turn_id && *reason == expected_reason => break,
                // Anything else (including aborts of other turns) is not what we wait for.
                _ => {}
            }
        }
    };

    timeout(Duration::from_secs(5), matching_abort_seen)
        .await
        .expect("expected child turn to be interrupted");
}
/// Polls the thread's session history every 10 ms until it contains the
/// `expected` inter-agent communication envelope.
///
/// Once the envelope is found, the same history snapshot must not also hold a
/// `user` message whose input text equals `expected.content`; that assertion
/// fails the test otherwise. Panics if the envelope does not show up within
/// five seconds.
async fn wait_for_redirected_envelope_in_history(
    thread: &Arc<CodexThread>,
    expected: &InterAgentCommunication,
) {
    let envelope_recorded = async {
        loop {
            let snapshot = thread
                .codex
                .session
                .clone_history()
                .await
                .raw_items()
                .to_vec();

            if history_contains_inter_agent_communication(&snapshot, expected) {
                // Only meaningful once the envelope exists: check the same
                // snapshot for a plain user-message copy of the content.
                let stored_as_user_message = snapshot.iter().any(|item| {
                    matches!(
                        item,
                        ResponseItem::Message { role, content, .. }
                            if role == "user"
                                && content.iter().any(|content_item| matches!(
                                    content_item,
                                    ContentItem::InputText { text }
                                        if text == &expected.content
                                ))
                    )
                });
                assert!(
                    !stored_as_user_message,
                    "redirected followup should be stored as an assistant envelope, not a plain user message"
                );
                break;
            }

            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    };

    timeout(Duration::from_secs(5), envelope_recorded)
        .await
        .expect("redirected followup envelope should appear in history");
}
#[derive(Clone, Copy)]
struct NeverEndingTask;
@@ -1199,6 +1267,7 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa
.expect("worker thread should exist");
let active_turn = thread.codex.session.new_default_turn().await;
let interrupted_turn_id = active_turn.sub_id.clone();
thread
.codex
.session
@@ -1243,44 +1312,18 @@ async fn multi_agent_v2_followup_task_interrupts_busy_child_without_losing_messa
)
}));
timeout(Duration::from_secs(5), async {
loop {
let history_items = thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
let saw_envelope = history_contains_inter_agent_communication(
&history_items,
&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"continue".to_string(),
/*trigger_turn*/ true,
),
);
let saw_user_message = history_items.iter().any(|item| {
matches!(
item,
ResponseItem::Message { role, content, .. }
if role == "user"
&& content.iter().any(|content_item| matches!(
content_item,
ContentItem::InputText { text } if text == "continue"
))
)
});
if saw_envelope && !saw_user_message {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("interrupting v2 followup_task should preserve the redirected message");
wait_for_turn_aborted(&thread, &interrupted_turn_id, TurnAbortReason::Interrupted).await;
wait_for_redirected_envelope_in_history(
&thread,
&InterAgentCommunication::new(
AgentPath::root(),
AgentPath::try_from("/root/worker").expect("agent path"),
Vec::new(),
"continue".to_string(),
/*trigger_turn*/ true,
),
)
.await;
let _ = thread
.submit(Op::Shutdown {})

View File

@@ -0,0 +1,2 @@
-- Adds a `forked_from_id` column to `threads`.
-- The column is declared as plain TEXT with no NOT NULL constraint and no default.
ALTER TABLE threads
ADD COLUMN forked_from_id TEXT;

View File

@@ -47,6 +47,7 @@ fn apply_session_meta_from_item(metadata: &mut ThreadMetadata, meta_line: &Sessi
return;
}
metadata.id = meta_line.meta.id;
metadata.forked_from_id = meta_line.meta.forked_from_id;
metadata.source = enum_to_string(&meta_line.meta.source);
metadata.agent_nickname = meta_line.meta.agent_nickname.clone();
metadata.agent_role = meta_line.meta.agent_role.clone();
@@ -363,6 +364,40 @@ mod tests {
assert_eq!(metadata.reasoning_effort, Some(ReasoningEffort::High));
}
/// Applying a `SessionMeta` rollout item that carries a fork source id must
/// set `forked_from_id` on the thread metadata.
#[test]
fn session_meta_sets_forked_from_id() {
    let mut metadata = metadata_for_test();
    let source_thread_id =
        ThreadId::from_string(&Uuid::from_u128(7).to_string()).expect("thread id");

    // Reuse the fixture's own id for the session meta so that only the fork
    // source differs from the starting metadata.
    let session_meta = SessionMeta {
        id: metadata.id,
        forked_from_id: Some(source_thread_id),
        timestamp: "2026-02-26T00:00:00.000Z".to_string(),
        cwd: PathBuf::from("/workspace"),
        originator: "codex_cli_rs".to_string(),
        cli_version: "0.0.0".to_string(),
        source: SessionSource::Cli,
        agent_path: None,
        agent_nickname: None,
        agent_role: None,
        model_provider: Some("openai".to_string()),
        base_instructions: None,
        dynamic_tools: None,
        memory_mode: None,
    };
    let meta_item = RolloutItem::SessionMeta(SessionMetaLine {
        meta: session_meta,
        git: None,
    });

    apply_rollout_item(&mut metadata, &meta_item, "test-provider");

    assert_eq!(metadata.forked_from_id, Some(source_thread_id));
}
#[test]
fn session_meta_does_not_set_model_or_reasoning_effort() {
let mut metadata = metadata_for_test();
@@ -401,6 +436,7 @@ mod tests {
let created_at = DateTime::<Utc>::from_timestamp(1_735_689_600, 0).expect("timestamp");
ThreadMetadata {
id,
forked_from_id: None,
rollout_path: PathBuf::from("/tmp/a.jsonl"),
created_at,
updated_at: created_at,
@@ -429,11 +465,14 @@ mod tests {
fn diff_fields_detects_changes() {
let mut base = metadata_for_test();
base.id = ThreadId::from_string(&Uuid::now_v7().to_string()).expect("thread id");
base.forked_from_id =
Some(ThreadId::from_string(&Uuid::now_v7().to_string()).expect("thread id"));
base.title = "hello".to_string();
let mut other = base.clone();
other.forked_from_id = None;
other.tokens_used = 2;
other.title = "world".to_string();
let diffs = base.diff_fields(&other);
assert_eq!(diffs, vec!["title", "tokens_used"]);
assert_eq!(diffs, vec!["forked_from_id", "title", "tokens_used"]);
}
}

View File

@@ -57,6 +57,8 @@ pub struct ExtractionOutcome {
pub struct ThreadMetadata {
/// The thread identifier.
pub id: ThreadId,
/// The source thread identifier this thread was forked from, if any.
pub forked_from_id: Option<ThreadId>,
/// The absolute rollout path on disk.
pub rollout_path: PathBuf,
/// The creation timestamp.
@@ -106,6 +108,8 @@ pub struct ThreadMetadata {
pub struct ThreadMetadataBuilder {
/// The thread identifier.
pub id: ThreadId,
/// The source thread identifier this thread was forked from, if any.
pub forked_from_id: Option<ThreadId>,
/// The absolute rollout path on disk.
pub rollout_path: PathBuf,
/// The creation timestamp.
@@ -150,6 +154,7 @@ impl ThreadMetadataBuilder {
) -> Self {
Self {
id,
forked_from_id: None,
rollout_path,
created_at,
updated_at: None,
@@ -181,6 +186,7 @@ impl ThreadMetadataBuilder {
.unwrap_or(created_at);
ThreadMetadata {
id: self.id,
forked_from_id: self.forked_from_id,
rollout_path: self.rollout_path.clone(),
created_at,
updated_at,
@@ -232,6 +238,9 @@ impl ThreadMetadata {
if self.id != other.id {
diffs.push("id");
}
if self.forked_from_id != other.forked_from_id {
diffs.push("forked_from_id");
}
if self.rollout_path != other.rollout_path {
diffs.push("rollout_path");
}
@@ -306,6 +315,7 @@ fn canonicalize_datetime(dt: DateTime<Utc>) -> DateTime<Utc> {
#[derive(Debug)]
pub(crate) struct ThreadRow {
id: String,
forked_from_id: Option<String>,
rollout_path: String,
created_at: i64,
updated_at: i64,
@@ -333,6 +343,7 @@ impl ThreadRow {
pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> {
Ok(Self {
id: row.try_get("id")?,
forked_from_id: row.try_get("forked_from_id")?,
rollout_path: row.try_get("rollout_path")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
@@ -364,6 +375,7 @@ impl TryFrom<ThreadRow> for ThreadMetadata {
fn try_from(row: ThreadRow) -> std::result::Result<Self, Self::Error> {
let ThreadRow {
id,
forked_from_id,
rollout_path,
created_at,
updated_at,
@@ -388,6 +400,7 @@ impl TryFrom<ThreadRow> for ThreadMetadata {
} = row;
Ok(Self {
id: ThreadId::try_from(id)?,
forked_from_id: forked_from_id.map(ThreadId::try_from).transpose()?,
rollout_path: PathBuf::from(rollout_path),
created_at: epoch_seconds_to_datetime(created_at)?,
updated_at: epoch_seconds_to_datetime(updated_at)?,
@@ -457,6 +470,7 @@ mod tests {
fn thread_row(reasoning_effort: Option<&str>) -> ThreadRow {
ThreadRow {
id: "00000000-0000-0000-0000-000000000123".to_string(),
forked_from_id: Some("00000000-0000-0000-0000-000000000456".to_string()),
rollout_path: "/tmp/rollout-123.jsonl".to_string(),
created_at: 1_700_000_000,
updated_at: 1_700_000_100,
@@ -485,6 +499,10 @@ mod tests {
ThreadMetadata {
id: ThreadId::from_string("00000000-0000-0000-0000-000000000123")
.expect("valid thread id"),
forked_from_id: Some(
ThreadId::from_string("00000000-0000-0000-0000-000000000456")
.expect("valid thread id"),
),
rollout_path: PathBuf::from("/tmp/rollout-123.jsonl"),
created_at: DateTime::<Utc>::from_timestamp(1_700_000_000, 0).expect("timestamp"),
updated_at: DateTime::<Utc>::from_timestamp(1_700_000_100, 0).expect("timestamp"),

View File

@@ -162,6 +162,7 @@ WHERE thread_id = ?
r#"
SELECT
id,
forked_from_id,
rollout_path,
created_at,
updated_at,

View File

@@ -44,6 +44,7 @@ pub(super) fn test_thread_metadata(
let now = DateTime::<Utc>::from_timestamp(1_700_000_000, 0).expect("timestamp");
ThreadMetadata {
id: thread_id,
forked_from_id: None,
rollout_path: codex_home.join(format!("rollout-{thread_id}.jsonl")),
created_at: now,
updated_at: now,

View File

@@ -7,6 +7,7 @@ impl StateRuntime {
r#"
SELECT
id,
forked_from_id,
rollout_path,
created_at,
updated_at,
@@ -344,6 +345,7 @@ ON CONFLICT(child_thread_id) DO NOTHING
r#"
SELECT
id,
forked_from_id,
rollout_path,
created_at,
updated_at,
@@ -445,6 +447,7 @@ FROM threads
r#"
INSERT INTO threads (
id,
forked_from_id,
rollout_path,
created_at,
updated_at,
@@ -468,11 +471,12 @@ INSERT INTO threads (
git_branch,
git_origin_url,
memory_mode
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO NOTHING
"#,
)
.bind(metadata.id.to_string())
.bind(metadata.forked_from_id.map(|id| id.to_string()))
.bind(metadata.rollout_path.display().to_string())
.bind(datetime_to_epoch_seconds(metadata.created_at))
.bind(datetime_to_epoch_seconds(metadata.updated_at))
@@ -572,6 +576,7 @@ WHERE id = ?
r#"
INSERT INTO threads (
id,
forked_from_id,
rollout_path,
created_at,
updated_at,
@@ -595,8 +600,9 @@ INSERT INTO threads (
git_branch,
git_origin_url,
memory_mode
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
forked_from_id = excluded.forked_from_id,
rollout_path = excluded.rollout_path,
created_at = excluded.created_at,
updated_at = excluded.updated_at,
@@ -622,6 +628,7 @@ ON CONFLICT(id) DO UPDATE SET
"#,
)
.bind(metadata.id.to_string())
.bind(metadata.forked_from_id.map(|id| id.to_string()))
.bind(metadata.rollout_path.display().to_string())
.bind(datetime_to_epoch_seconds(metadata.created_at))
.bind(datetime_to_epoch_seconds(metadata.updated_at))
@@ -1051,6 +1058,59 @@ mod tests {
assert_eq!(memory_mode.as_deref(), Some("polluted"));
}
/// Applying a `SessionMeta` rollout item that carries a fork source id must
/// leave that id on the thread read back through `get_thread`.
#[tokio::test]
async fn apply_rollout_items_persists_forked_from_id_from_session_meta() {
    let codex_home = unique_temp_dir();
    let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
        .await
        .expect("state db should initialize");

    let thread_id =
        ThreadId::from_string("00000000-0000-0000-0000-000000000555").expect("valid thread id");
    let source_thread_id =
        ThreadId::from_string("00000000-0000-0000-0000-000000000777").expect("valid thread id");

    // The fixture is used only for its rollout path, creation time and cwd.
    let fixture = test_thread_metadata(&codex_home, thread_id, codex_home.clone());
    let builder = ThreadMetadataBuilder::new(
        thread_id,
        fixture.rollout_path.clone(),
        fixture.created_at,
        SessionSource::Cli,
    );

    let session_meta = SessionMeta {
        id: thread_id,
        forked_from_id: Some(source_thread_id),
        timestamp: fixture.created_at.to_rfc3339(),
        cwd: fixture.cwd.clone(),
        originator: String::new(),
        cli_version: "0.0.0".to_string(),
        source: SessionSource::Cli,
        agent_path: None,
        agent_nickname: None,
        agent_role: None,
        model_provider: None,
        base_instructions: None,
        dynamic_tools: None,
        memory_mode: None,
    };
    let items = vec![RolloutItem::SessionMeta(SessionMetaLine {
        meta: session_meta,
        git: None,
    })];

    runtime
        .apply_rollout_items(
            &builder, &items, /*new_thread_memory_mode*/ None,
            /*updated_at_override*/ None,
        )
        .await
        .expect("apply_rollout_items should succeed");

    let persisted = runtime
        .get_thread(thread_id)
        .await
        .expect("thread should load")
        .expect("thread should exist");
    assert_eq!(persisted.forked_from_id, Some(source_thread_id));
}
#[tokio::test]
async fn apply_rollout_items_preserves_existing_git_branch_and_fills_missing_git_fields() {
let codex_home = unique_temp_dir();