Add subagent lineage metadata for responsesapi (#24161)

## Why

We recently added `forked_from_thread_id` which lets us trace where a
thread's _context_ comes from, but we also want to understand subagent
lineage (e.g. which parent thread spawned this subagent? what kind of
subagent is it?) which is orthogonal.

This PR adds `parent_thread_id` and `subagent_kind` to the
`x-codex-turn-metadata` header sent to ResponsesAPI.

## What changed

- Adds `parent_thread_id` and `subagent_kind` to core-owned
`x-codex-turn-metadata`.
- Restores persisted `SessionSource` and `ThreadSource` from resumed
session metadata so cold-resumed subagent threads keep their lineage on
later Responses API requests.
- Centralizes parent-thread extraction on `SessionSource` /
`SubAgentSource` and reuses it in the Responses client, analytics, agent
control, and state parsing paths.
- Extends reserved-key, git-enrichment, thread-spawn, and app-server v2
metadata coverage for the new lineage fields.

## Verification

- Not run locally per request.
- Added focused coverage in `core/src/turn_metadata_tests.rs` and
`app-server/tests/suite/v2/client_metadata.rs`.
This commit is contained in:
Owen Lin
2026-05-29 11:28:12 -07:00
committed by GitHub
parent 62039e8d35
commit fc9cf62efb
11 changed files with 450 additions and 86 deletions

View File

@@ -1069,12 +1069,9 @@ pub(crate) fn subagent_source_name(subagent_source: &SubAgentSource) -> String {
}
pub(crate) fn subagent_parent_thread_id(subagent_source: &SubAgentSource) -> Option<String> {
match subagent_source {
SubAgentSource::ThreadSpawn {
parent_thread_id, ..
} => Some(parent_thread_id.to_string()),
_ => None,
}
subagent_source
.parent_thread_id()
.map(|parent_thread_id| parent_thread_id.to_string())
}
fn analytics_hook_status(status: HookRunStatus) -> HookRunStatus {

View File

@@ -1,6 +1,7 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_fake_rollout_with_source;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
@@ -10,6 +11,8 @@ use codex_app_server_protocol::ReviewStartResponse;
use codex_app_server_protocol::ReviewTarget;
use codex_app_server_protocol::ThreadForkParams;
use codex_app_server_protocol::ThreadForkResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadSource;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
@@ -18,6 +21,9 @@ use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnSteerParams;
use codex_app_server_protocol::TurnSteerResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_protocol::ThreadId as CoreThreadId;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use core_test_support::responses;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
@@ -290,6 +296,104 @@ async fn review_start_sends_fork_lineage_in_turn_metadata_for_thread_fork_v2() -
Ok(())
}
#[tokio::test]
async fn turn_start_sends_subagent_lineage_after_cold_thread_resume_v2() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = responses::start_mock_server().await;
let response_mock = responses::mount_sse_once(
&server,
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-1"),
]),
)
.await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
/*supports_websockets*/ false,
)?;
let parent_thread_id = CoreThreadId::new();
let parent_thread_id_str = parent_thread_id.to_string();
let subagent_thread_id = create_fake_rollout_with_source(
codex_home.path(),
"2025-01-05T12-00-00",
"2025-01-05T12:00:00Z",
"Saved subagent message",
Some("mock_provider"),
/*git_info*/ None,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let resume_req = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: subagent_thread_id.clone(),
..Default::default()
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_req)),
)
.await??;
let ThreadResumeResponse { thread, .. } = to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(thread.id, subagent_thread_id);
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "Continue".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let request = response_mock.single_request();
let metadata = request
.header("x-codex-turn-metadata")
.as_deref()
.map(parse_json_header)
.unwrap_or_else(|| panic!("missing x-codex-turn-metadata header"));
assert_eq!(
metadata["parent_thread_id"].as_str(),
Some(parent_thread_id_str.as_str())
);
assert_eq!(metadata["subagent_kind"].as_str(), Some("thread_spawn"));
assert_eq!(metadata["thread_id"].as_str(), Some(thread.id.as_str()));
assert_eq!(metadata["turn_id"].as_str(), Some(turn.id.as_str()));
assert!(metadata.get("forked_from_thread_id").is_none());
Ok(())
}
#[tokio::test]
async fn turn_steer_updates_client_metadata_on_follow_up_responses_request_v2() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -1264,12 +1264,7 @@ impl AgentControl {
}
fn thread_spawn_parent_thread_id(session_source: &SessionSource) -> Option<ThreadId> {
match session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
}) => Some(*parent_thread_id),
_ => None,
}
session_source.parent_thread_id()
}
fn agent_matches_prefix(agent_path: Option<&AgentPath>, prefix: &AgentPath) -> bool {

View File

@@ -1734,19 +1734,9 @@ fn subagent_header_value(session_source: &SessionSource) -> Option<String> {
}
fn parent_thread_id_header_value(session_source: &SessionSource) -> Option<String> {
match session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
}) => Some(parent_thread_id.to_string()),
SessionSource::Cli
| SessionSource::VSCode
| SessionSource::Exec
| SessionSource::Mcp
| SessionSource::Custom(_)
| SessionSource::Internal(_)
| SessionSource::SubAgent(_)
| SessionSource::Unknown => None,
}
session_source
.parent_thread_id()
.map(|parent_thread_id| parent_thread_id.to_string())
}
const RESPONSE_STREAM_CHANNEL_CAPACITY: usize = 1600;

View File

@@ -90,6 +90,7 @@ pub(super) async fn spawn_review_thread(
sess.session_id().to_string(),
sess.thread_id().to_string(),
forked_from_thread_id,
&session_source,
parent_turn_context.thread_source,
review_turn_id.clone(),
#[allow(deprecated)]

View File

@@ -506,6 +506,7 @@ impl Session {
session_id.to_string(),
thread_id.to_string(),
session_configuration.forked_from_thread_id,
&session_configuration.session_source,
session_configuration.thread_source,
sub_id.clone(),
cwd.clone(),

View File

@@ -589,12 +589,12 @@ impl ThreadManager {
options: StartThreadOptions,
forked_from_thread_id: Option<ThreadId>,
) -> CodexResult<NewThread> {
let session_source = options
.session_source
.unwrap_or_else(|| self.state.session_source.clone());
let thread_source = options
.thread_source
.or_else(|| options.initial_history.get_resumed_thread_source());
let (resumed_session_source, resumed_thread_source) = options
.initial_history
.get_resumed_session_sources()
.unwrap_or_else(|| (self.state.session_source.clone(), None));
let session_source = options.session_source.unwrap_or(resumed_session_source);
let thread_source = options.thread_source.or(resumed_thread_source);
Box::pin(self.state.spawn_thread_with_source(
options.config,
options.initial_history,
@@ -676,17 +676,22 @@ impl ThreadManager {
self.state.environment_manager.as_ref(),
&config.cwd,
);
let thread_source = initial_history.get_resumed_thread_source();
Box::pin(self.state.spawn_thread(
let (session_source, thread_source) = initial_history
.get_resumed_session_sources()
.unwrap_or_else(|| (self.state.session_source.clone(), None));
Box::pin(self.state.spawn_thread_with_source(
config,
initial_history,
auth_manager,
self.agent_control(),
session_source,
/*forked_from_thread_id*/ None,
thread_source,
Vec::new(),
persist_extended_history,
/*metrics_service_name*/ None,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
parent_trace,
environments,
/*user_shell_override*/ None,
@@ -732,17 +737,22 @@ impl ThreadManager {
self.state.environment_manager.as_ref(),
&config.cwd,
);
let thread_source = initial_history.get_resumed_thread_source();
Box::pin(self.state.spawn_thread(
let (session_source, thread_source) = initial_history
.get_resumed_session_sources()
.unwrap_or_else(|| (self.state.session_source.clone(), None));
Box::pin(self.state.spawn_thread_with_source(
config,
initial_history,
auth_manager,
self.agent_control(),
session_source,
/*forked_from_thread_id*/ None,
thread_source,
Vec::new(),
/*persist_extended_history*/ false,
/*metrics_service_name*/ None,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
/*parent_trace*/ None,
environments,
/*user_shell_override*/ Some(user_shell_override),

View File

@@ -25,6 +25,7 @@ use codex_protocol::ThreadId;
use codex_protocol::config_types::WindowsSandboxLevel;
use codex_protocol::models::PermissionProfile;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::ThreadSource;
use codex_utils_absolute_path::AbsolutePathBuf;
@@ -117,9 +118,10 @@ impl From<WorkspaceGitMetadata> for TurnMetadataWorkspace {
/// Base payload for the outbound model request `x-codex-turn-metadata` header.
///
/// Turn-owned state populates identity fields, including optional fork lineage. A concrete
/// request kind is added at outbound model dispatch so turns, startup prewarm, and compaction
/// remain distinguishable. Detached memory requests are constructed as `memory` directly.
/// Turn-owned state populates identity fields, including optional fork and subagent lineage. A
/// concrete request kind is added at outbound model dispatch so turns, startup prewarm, and
/// compaction remain distinguishable. Detached memory requests are constructed as `memory`
/// directly.
#[derive(Clone, Debug, Serialize)]
pub(crate) struct TurnMetadataBag {
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -131,6 +133,10 @@ pub(crate) struct TurnMetadataBag {
#[serde(default, skip_serializing_if = "Option::is_none")]
forked_from_thread_id: Option<ThreadId>,
#[serde(default, skip_serializing_if = "Option::is_none")]
parent_thread_id: Option<ThreadId>,
#[serde(default, skip_serializing_if = "Option::is_none")]
subagent_kind: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
thread_source: Option<ThreadSource>,
#[serde(default, skip_serializing_if = "Option::is_none")]
turn_id: Option<String>,
@@ -141,27 +147,6 @@ pub(crate) struct TurnMetadataBag {
}
impl TurnMetadataBag {
fn new(
request_kind: Option<TurnMetadataRequestKind>,
session_id: Option<String>,
thread_id: Option<String>,
forked_from_thread_id: Option<ThreadId>,
thread_source: Option<ThreadSource>,
turn_id: Option<String>,
sandbox: Option<String>,
) -> Self {
Self {
request_kind,
session_id,
thread_id,
forked_from_thread_id,
thread_source,
turn_id,
workspaces: BTreeMap::new(),
sandbox,
}
}
fn with_workspace_git_metadata(
mut self,
repo_root: Option<String>,
@@ -206,6 +191,8 @@ fn merge_turn_metadata(
| "turn_id"
| TURN_STARTED_AT_UNIX_MS_KEY
| "forked_from_thread_id"
| "parent_thread_id"
| "subagent_kind"
| REQUEST_KIND_KEY
| COMPACTION_KEY
| WINDOW_ID_KEY
@@ -232,15 +219,18 @@ pub async fn build_turn_metadata_header(
get_has_changes(cwd),
);
let latest_git_commit_hash = head_commit_hash.map(|sha| sha.0);
TurnMetadataBag::new(
Some(TurnMetadataRequestKind::Memory),
/*session_id*/ None,
/*thread_id*/ None,
/*forked_from_thread_id*/ None,
/*thread_source*/ None,
/*turn_id*/ None,
sandbox.map(ToString::to_string),
)
TurnMetadataBag {
request_kind: Some(TurnMetadataRequestKind::Memory),
session_id: None,
thread_id: None,
forked_from_thread_id: None,
parent_thread_id: None,
subagent_kind: None,
thread_source: None,
turn_id: None,
workspaces: BTreeMap::new(),
sandbox: sandbox.map(ToString::to_string),
}
.with_workspace_git_metadata(
repo_root,
Some(WorkspaceGitMetadata {
@@ -271,6 +261,7 @@ impl TurnMetadataState {
session_id: String,
thread_id: String,
forked_from_thread_id: Option<ThreadId>,
session_source: &SessionSource,
thread_source: Option<ThreadSource>,
turn_id: String,
cwd: AbsolutePathBuf,
@@ -287,15 +278,31 @@ impl TurnMetadataState {
)
.to_string(),
);
let base_metadata = TurnMetadataBag::new(
/*request_kind*/ None,
Some(session_id),
Some(thread_id),
let (parent_thread_id, subagent_kind) = match session_source {
SessionSource::SubAgent(subagent_source) => (
subagent_source.parent_thread_id().or(forked_from_thread_id),
Some(subagent_source.kind().to_string()),
),
SessionSource::Cli
| SessionSource::VSCode
| SessionSource::Exec
| SessionSource::Mcp
| SessionSource::Custom(_)
| SessionSource::Internal(_)
| SessionSource::Unknown => (None, None),
};
let base_metadata = TurnMetadataBag {
request_kind: None,
session_id: Some(session_id),
thread_id: Some(thread_id),
forked_from_thread_id,
parent_thread_id,
subagent_kind,
thread_source,
Some(turn_id),
turn_id: Some(turn_id),
workspaces: BTreeMap::new(),
sandbox,
);
};
let base_header = base_metadata.to_header_value();
Self {

View File

@@ -3,12 +3,16 @@ use super::*;
use crate::sandbox_tags::permission_profile_sandbox_tag;
use codex_protocol::models::PermissionProfile;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::ThreadSource;
use codex_utils_absolute_path::AbsolutePathBuf;
use core_test_support::PathBufExt;
use core_test_support::PathExt;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::collections::HashMap;
use std::time::Duration;
use tempfile::TempDir;
use tokio::process::Command;
@@ -19,10 +23,9 @@ fn test_mcp_turn_metadata_context() -> McpTurnMetadataContext<'static> {
}
}
#[tokio::test]
async fn build_turn_metadata_header_marks_detached_memory_without_turn_identity() {
async fn create_clean_git_repo(repo_name: &str) -> (TempDir, AbsolutePathBuf) {
let temp_dir = TempDir::new().expect("temp dir");
let repo_path = temp_dir.path().join("repo-東京").abs();
let repo_path = temp_dir.path().join(repo_name).abs();
std::fs::create_dir_all(&repo_path).expect("create repo");
Command::new("git")
@@ -43,7 +46,6 @@ async fn build_turn_metadata_header_marks_detached_memory_without_turn_identity(
.output()
.await
.expect("git config user.email");
std::fs::write(repo_path.join("README.md"), "hello").expect("write file");
Command::new("git")
.args(["add", "."])
@@ -58,6 +60,13 @@ async fn build_turn_metadata_header_marks_detached_memory_without_turn_identity(
.await
.expect("git commit");
(temp_dir, repo_path)
}
#[tokio::test]
async fn build_turn_metadata_header_marks_detached_memory_without_turn_identity() {
let (_temp_dir, repo_path) = create_clean_git_repo("repo-東京").await;
let header = build_turn_metadata_header(&repo_path, Some("none"))
.await
.expect("header");
@@ -113,6 +122,7 @@ fn turn_metadata_state_uses_platform_sandbox_tag() {
"session-a".to_string(),
"thread-a".to_string(),
/*forked_from_thread_id*/ None,
&SessionSource::Exec,
Some(ThreadSource::User),
"turn-a".to_string(),
cwd,
@@ -138,6 +148,9 @@ fn turn_metadata_state_uses_platform_sandbox_tag() {
assert_eq!(session_id, Some("session-a"));
assert_eq!(thread_id, Some("thread-a"));
assert_eq!(thread_source, Some("user"));
assert!(json.get("forked_from_thread_id").is_none());
assert!(json.get("parent_thread_id").is_none());
assert!(json.get("subagent_kind").is_none());
assert!(json.get("session_source").is_none());
}
@@ -150,6 +163,7 @@ fn turn_metadata_state_uses_explicit_subagent_thread_source() {
"session-a".to_string(),
"thread-a".to_string(),
/*forked_from_thread_id*/ None,
&SessionSource::Exec,
Some(ThreadSource::Subagent),
"turn-a".to_string(),
cwd,
@@ -177,6 +191,7 @@ fn turn_metadata_state_includes_root_fork_lineage() {
"session-a".to_string(),
"thread-a".to_string(),
Some(source_thread_id),
&SessionSource::Exec,
Some(ThreadSource::User),
"turn-a".to_string(),
cwd,
@@ -192,6 +207,122 @@ fn turn_metadata_state_includes_root_fork_lineage() {
json["forked_from_thread_id"].as_str(),
Some("11111111-1111-4111-8111-111111111111")
);
assert!(json.get("parent_thread_id").is_none());
assert!(json.get("subagent_kind").is_none());
}
#[test]
fn turn_metadata_state_includes_thread_spawn_subagent_parent_without_fork() {
let temp_dir = TempDir::new().expect("temp dir");
let cwd = temp_dir.path().abs();
let permission_profile = PermissionProfile::read_only();
let parent_thread_id =
ThreadId::from_string("22222222-2222-4222-8222-222222222222").expect("thread id");
let state = TurnMetadataState::new(
"session-a".to_string(),
"thread-a".to_string(),
/*forked_from_thread_id*/ None,
&SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
Some(ThreadSource::Subagent),
"turn-a".to_string(),
cwd,
&permission_profile,
WindowsSandboxLevel::Disabled,
/*enforce_managed_network*/ false,
);
let header = state.current_header_value().expect("header");
let json: Value = serde_json::from_str(&header).expect("json");
assert!(json.get("forked_from_thread_id").is_none());
assert_eq!(
json["parent_thread_id"].as_str(),
Some("22222222-2222-4222-8222-222222222222")
);
assert_eq!(json["subagent_kind"].as_str(), Some("thread_spawn"));
}
#[test]
fn turn_metadata_state_includes_forked_thread_spawn_subagent_lineage() {
let temp_dir = TempDir::new().expect("temp dir");
let cwd = temp_dir.path().abs();
let permission_profile = PermissionProfile::read_only();
let parent_thread_id =
ThreadId::from_string("33333333-3333-4333-8333-333333333333").expect("thread id");
let state = TurnMetadataState::new(
"session-a".to_string(),
"thread-a".to_string(),
Some(parent_thread_id),
&SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
Some(ThreadSource::Subagent),
"turn-a".to_string(),
cwd,
&permission_profile,
WindowsSandboxLevel::Disabled,
/*enforce_managed_network*/ false,
);
let header = state.current_header_value().expect("header");
let json: Value = serde_json::from_str(&header).expect("json");
assert_eq!(
json["forked_from_thread_id"].as_str(),
Some("33333333-3333-4333-8333-333333333333")
);
assert_eq!(
json["parent_thread_id"].as_str(),
Some("33333333-3333-4333-8333-333333333333")
);
assert_eq!(json["subagent_kind"].as_str(), Some("thread_spawn"));
}
#[test]
fn turn_metadata_state_includes_known_parent_for_other_subagent() {
let temp_dir = TempDir::new().expect("temp dir");
let cwd = temp_dir.path().abs();
let permission_profile = PermissionProfile::read_only();
let parent_thread_id =
ThreadId::from_string("44444444-4444-4444-8444-444444444444").expect("thread id");
let state = TurnMetadataState::new(
"session-a".to_string(),
"thread-a".to_string(),
Some(parent_thread_id),
&SessionSource::SubAgent(SubAgentSource::Other("guardian".to_string())),
Some(ThreadSource::Subagent),
"turn-a".to_string(),
cwd,
&permission_profile,
WindowsSandboxLevel::Disabled,
/*enforce_managed_network*/ false,
);
let header = state.current_header_value().expect("header");
let json: Value = serde_json::from_str(&header).expect("json");
assert_eq!(
json["forked_from_thread_id"].as_str(),
Some("44444444-4444-4444-8444-444444444444")
);
assert_eq!(
json["parent_thread_id"].as_str(),
Some("44444444-4444-4444-8444-444444444444")
);
assert_eq!(json["subagent_kind"].as_str(), Some("guardian"));
}
#[test]
@@ -204,6 +335,7 @@ fn turn_metadata_state_includes_turn_started_at_unix_ms_after_start() {
"session-a".to_string(),
"thread-a".to_string(),
/*forked_from_thread_id*/ None,
&SessionSource::Exec,
Some(ThreadSource::User),
"turn-a".to_string(),
cwd,
@@ -232,6 +364,7 @@ fn turn_metadata_state_includes_model_and_reasoning_effort_only_in_request_meta(
"session-a".to_string(),
"thread-a".to_string(),
/*forked_from_thread_id*/ None,
&SessionSource::Exec,
/*thread_source*/ None,
"turn-a".to_string(),
cwd,
@@ -279,6 +412,7 @@ fn turn_metadata_state_marks_user_input_requested_during_turn_only_for_mcp_reque
"session-a".to_string(),
"thread-a".to_string(),
/*forked_from_thread_id*/ None,
&SessionSource::Exec,
/*thread_source*/ None,
"turn-a".to_string(),
cwd,
@@ -330,6 +464,7 @@ fn turn_metadata_state_ignores_client_reserved_metadata_before_start() {
"session-a".to_string(),
"thread-a".to_string(),
/*forked_from_thread_id*/ None,
&SessionSource::Exec,
Some(ThreadSource::User),
"turn-a".to_string(),
cwd,
@@ -346,6 +481,11 @@ fn turn_metadata_state_ignores_client_reserved_metadata_before_start() {
"forked_from_thread_id".to_string(),
"client-supplied".to_string(),
),
(
"parent_thread_id".to_string(),
"client-supplied".to_string(),
),
("subagent_kind".to_string(), "client-supplied".to_string()),
]));
let header = state.current_header_value().expect("header");
@@ -353,6 +493,8 @@ fn turn_metadata_state_ignores_client_reserved_metadata_before_start() {
assert!(json.get("turn_started_at_unix_ms").is_none());
assert!(json.get("forked_from_thread_id").is_none());
assert!(json.get("parent_thread_id").is_none());
assert!(json.get("subagent_kind").is_none());
}
#[test]
@@ -362,11 +504,20 @@ fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields(
let permission_profile = PermissionProfile::read_only();
let source_thread_id =
ThreadId::from_string("44444444-4444-4444-8444-444444444444").expect("thread id");
let parent_thread_id =
ThreadId::from_string("55555555-5555-4555-8555-555555555555").expect("thread id");
let state = TurnMetadataState::new(
"session-a".to_string(),
"thread-a".to_string(),
Some(source_thread_id),
&SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
Some(ThreadSource::User),
"turn-a".to_string(),
cwd,
@@ -388,6 +539,11 @@ fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields(
"forked_from_thread_id".to_string(),
"client-supplied".to_string(),
),
(
"parent_thread_id".to_string(),
"client-supplied".to_string(),
),
("subagent_kind".to_string(), "client-supplied".to_string()),
("turn_id".to_string(), "client-supplied".to_string()),
(WINDOW_ID_KEY.to_string(), "client-supplied".to_string()),
("thread_source".to_string(), "client-supplied".to_string()),
@@ -414,6 +570,11 @@ fn turn_metadata_state_merges_client_metadata_without_replacing_reserved_fields(
json["forked_from_thread_id"].as_str(),
Some("44444444-4444-4444-8444-444444444444")
);
assert_eq!(
json["parent_thread_id"].as_str(),
Some("55555555-5555-4555-8555-555555555555")
);
assert_eq!(json["subagent_kind"].as_str(), Some("thread_spawn"));
assert_eq!(json["thread_source"].as_str(), Some("user"));
assert_eq!(json["turn_id"].as_str(), Some("turn-a"));
assert!(json.get("request_kind").is_none());
@@ -451,6 +612,7 @@ fn turn_metadata_state_overlays_compaction_only_on_compaction_requests() {
"session-a".to_string(),
"thread-a".to_string(),
/*forked_from_thread_id*/ None,
&SessionSource::Exec,
Some(ThreadSource::User),
"turn-a".to_string(),
cwd,
@@ -497,3 +659,59 @@ fn turn_metadata_state_overlays_compaction_only_on_compaction_requests() {
assert_eq!(regular_json[WINDOW_ID_KEY].as_str(), Some("thread-a:3"));
assert!(regular_json.get("compaction").is_none());
}
#[tokio::test]
async fn turn_metadata_state_preserves_lineage_after_git_enrichment() {
let (_temp_dir, repo_path) = create_clean_git_repo("repo").await;
let permission_profile = PermissionProfile::read_only();
let parent_thread_id =
ThreadId::from_string("66666666-6666-4666-8666-666666666666").expect("thread id");
let state = TurnMetadataState::new(
"session-a".to_string(),
"thread-a".to_string(),
Some(parent_thread_id),
&SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
agent_path: None,
agent_nickname: None,
agent_role: None,
}),
Some(ThreadSource::Subagent),
"turn-a".to_string(),
repo_path,
&permission_profile,
WindowsSandboxLevel::Disabled,
/*enforce_managed_network*/ false,
);
state.spawn_git_enrichment_task();
let json = tokio::time::timeout(Duration::from_secs(2), async {
loop {
let header = state.current_header_value().expect("header");
let json: Value = serde_json::from_str(&header).expect("json");
if json
.get("workspaces")
.and_then(Value::as_object)
.is_some_and(|workspaces| !workspaces.is_empty())
{
return json;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("git enrichment should complete");
assert_eq!(
json["forked_from_thread_id"].as_str(),
Some("66666666-6666-4666-8666-666666666666")
);
assert_eq!(
json["parent_thread_id"].as_str(),
Some("66666666-6666-4666-8666-666666666666")
);
assert_eq!(json["subagent_kind"].as_str(), Some("thread_spawn"));
}

View File

@@ -2440,12 +2440,22 @@ impl InitialHistory {
}
}
pub fn get_resumed_session_sources(&self) -> Option<(SessionSource, Option<ThreadSource>)> {
let meta = self.get_resumed_session_meta()?;
Some((meta.source.clone(), meta.thread_source))
}
pub fn get_resumed_thread_source(&self) -> Option<ThreadSource> {
self.get_resumed_session_meta()
.and_then(|meta| meta.thread_source)
}
fn get_resumed_session_meta(&self) -> Option<&SessionMeta> {
match self {
InitialHistory::New | InitialHistory::Cleared | InitialHistory::Forked(_) => None,
InitialHistory::Resumed(resumed) => {
resumed.history.iter().find_map(|item| match item {
RolloutItem::SessionMeta(meta_line) => meta_line.meta.thread_source,
RolloutItem::SessionMeta(meta_line) => Some(&meta_line.meta),
_ => None,
})
}
@@ -2630,6 +2640,19 @@ impl SessionSource {
.restriction_product()
.is_some_and(|product| product.matches_product_restriction(products))
}
pub fn parent_thread_id(&self) -> Option<ThreadId> {
match self {
SessionSource::SubAgent(subagent_source) => subagent_source.parent_thread_id(),
SessionSource::Cli
| SessionSource::VSCode
| SessionSource::Exec
| SessionSource::Mcp
| SessionSource::Custom(_)
| SessionSource::Internal(_)
| SessionSource::Unknown => None,
}
}
}
impl fmt::Display for SubAgentSource {
@@ -2650,6 +2673,30 @@ impl fmt::Display for SubAgentSource {
}
}
impl SubAgentSource {
pub fn kind(&self) -> &str {
match self {
SubAgentSource::Review => "review",
SubAgentSource::Compact => "compact",
SubAgentSource::ThreadSpawn { .. } => "thread_spawn",
SubAgentSource::MemoryConsolidation => "memory_consolidation",
SubAgentSource::Other(other) => other,
}
}
pub fn parent_thread_id(&self) -> Option<ThreadId> {
match self {
SubAgentSource::ThreadSpawn {
parent_thread_id, ..
} => Some(*parent_thread_id),
SubAgentSource::Review
| SubAgentSource::Compact
| SubAgentSource::MemoryConsolidation
| SubAgentSource::Other(_) => None,
}
}
}
impl fmt::Display for InternalSessionSource {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {

View File

@@ -960,13 +960,7 @@ pub(super) fn extract_memory_mode(items: &[RolloutItem]) -> Option<String> {
fn thread_spawn_parent_thread_id_from_source_str(source: &str) -> Option<ThreadId> {
let parsed_source = serde_json::from_str(source)
.or_else(|_| serde_json::from_value::<SessionSource>(Value::String(source.to_string())));
match parsed_source.ok() {
Some(SessionSource::SubAgent(codex_protocol::protocol::SubAgentSource::ThreadSpawn {
parent_thread_id,
..
})) => Some(parent_thread_id),
_ => None,
}
parsed_source.ok()?.parent_thread_id()
}
#[derive(Clone, Copy)]