mirror of
https://github.com/openai/codex.git
synced 2026-02-07 01:13:40 +00:00
Compare commits
2 Commits
remove/doc
...
jif/better
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e437195f30 | ||
|
|
a3c07aabd0 |
@@ -29,6 +29,7 @@ use codex_protocol::protocol::SkillErrorInfo as CoreSkillErrorInfo;
|
||||
use codex_protocol::protocol::SkillInterface as CoreSkillInterface;
|
||||
use codex_protocol::protocol::SkillMetadata as CoreSkillMetadata;
|
||||
use codex_protocol::protocol::SkillScope as CoreSkillScope;
|
||||
use codex_protocol::protocol::ThreadOrigin as CoreThreadOrigin;
|
||||
use codex_protocol::protocol::TokenUsage as CoreTokenUsage;
|
||||
use codex_protocol::protocol::TokenUsageInfo as CoreTokenUsageInfo;
|
||||
use codex_protocol::user_input::ByteRange as CoreByteRange;
|
||||
@@ -726,6 +727,62 @@ impl From<SessionSource> for CoreSessionSource {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(tag = "type", rename_all = "camelCase")]
|
||||
#[ts(tag = "type", rename_all = "camelCase", export_to = "v2/")]
|
||||
pub enum ThreadOrigin {
|
||||
UserRequested,
|
||||
Forked {
|
||||
parent_thread_id: String,
|
||||
},
|
||||
SpawnedByThread {
|
||||
parent_thread_id: String,
|
||||
kind: ThreadSpawnedKind,
|
||||
},
|
||||
#[serde(other)]
|
||||
Unknown,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(tag = "type", rename_all = "camelCase")]
|
||||
#[ts(tag = "type", rename_all = "camelCase", export_to = "v2/")]
|
||||
pub enum ThreadSpawnedKind {
|
||||
Review,
|
||||
Compact,
|
||||
Other { name: String },
|
||||
}
|
||||
|
||||
impl From<CoreThreadOrigin> for ThreadOrigin {
|
||||
fn from(value: CoreThreadOrigin) -> Self {
|
||||
match value {
|
||||
CoreThreadOrigin::UserRequested => ThreadOrigin::UserRequested,
|
||||
CoreThreadOrigin::Forked { parent_thread_id } => ThreadOrigin::Forked {
|
||||
parent_thread_id: parent_thread_id.to_string(),
|
||||
},
|
||||
CoreThreadOrigin::SpawnedByThread {
|
||||
parent_thread_id,
|
||||
kind,
|
||||
} => ThreadOrigin::SpawnedByThread {
|
||||
parent_thread_id: parent_thread_id.to_string(),
|
||||
kind: kind.into(),
|
||||
},
|
||||
CoreThreadOrigin::Unknown => ThreadOrigin::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<codex_protocol::protocol::SpawnedThreadKind> for ThreadSpawnedKind {
|
||||
fn from(value: codex_protocol::protocol::SpawnedThreadKind) -> Self {
|
||||
match value {
|
||||
codex_protocol::protocol::SpawnedThreadKind::Review => ThreadSpawnedKind::Review,
|
||||
codex_protocol::protocol::SpawnedThreadKind::Compact => ThreadSpawnedKind::Compact,
|
||||
codex_protocol::protocol::SpawnedThreadKind::Other(name) => {
|
||||
ThreadSpawnedKind::Other { name }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
@@ -1415,6 +1472,8 @@ pub struct Thread {
|
||||
pub cli_version: String,
|
||||
/// Origin of the thread (CLI, VSCode, codex exec, codex app-server, etc.).
|
||||
pub source: SessionSource,
|
||||
/// Origin of the thread (user requested, forked, spawned by another thread, etc.).
|
||||
pub thread_origin: ThreadOrigin,
|
||||
/// Optional Git metadata captured when the thread was created.
|
||||
pub git_info: Option<GitInfo>,
|
||||
/// Only populated on `thread/resume`, `thread/rollback`, `thread/fork` responses.
|
||||
|
||||
@@ -32,7 +32,7 @@ codex app-server generate-json-schema --out DIR
|
||||
|
||||
The API exposes three top level primitives representing an interaction between a user and Codex:
|
||||
|
||||
- **Thread**: A conversation between a user and the Codex agent. Each thread contains multiple turns.
|
||||
- **Thread**: A conversation between a user and the Codex agent. Each thread contains multiple turns. Thread objects also include `source` (client) and `threadOrigin` (how the thread was created, with `parentThreadId` for forked/review/spawned threads).
|
||||
- **Turn**: One turn of the conversation, typically starting with a user message and finishing with an agent message. Each turn contains multiple items.
|
||||
- **Item**: Represents user inputs and agent outputs as part of the turn, persisted and used as the context for future conversations. Example items include user message, agent reasoning, agent message, shell command, file edit, etc.
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::codex_message_processor::PendingRollbacks;
|
||||
use crate::codex_message_processor::TurnSummary;
|
||||
use crate::codex_message_processor::TurnSummaryStore;
|
||||
use crate::codex_message_processor::read_event_msgs_from_rollout;
|
||||
use crate::codex_message_processor::read_summary_from_rollout;
|
||||
use crate::codex_message_processor::read_thread_summary_from_rollout;
|
||||
use crate::codex_message_processor::summary_to_thread;
|
||||
use crate::error_code::INTERNAL_ERROR_CODE;
|
||||
use crate::error_code::INVALID_REQUEST_ERROR_CODE;
|
||||
@@ -1007,14 +1007,14 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
|
||||
if let Some(request_id) = pending {
|
||||
let rollout_path = conversation.rollout_path();
|
||||
let response = match read_summary_from_rollout(
|
||||
let response = match read_thread_summary_from_rollout(
|
||||
rollout_path.as_path(),
|
||||
fallback_model_provider.as_str(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(summary) => {
|
||||
let mut thread = summary_to_thread(summary);
|
||||
let mut thread = summary_to_thread(summary.summary, summary.thread_origin);
|
||||
match read_event_msgs_from_rollout(rollout_path.as_path()).await {
|
||||
Ok(events) => {
|
||||
thread.turns = build_turns_from_event_msgs(&events);
|
||||
|
||||
@@ -101,6 +101,7 @@ use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
use codex_app_server_protocol::ThreadLoadedListParams;
|
||||
use codex_app_server_protocol::ThreadLoadedListResponse;
|
||||
use codex_app_server_protocol::ThreadOrigin;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadRollbackParams;
|
||||
@@ -171,6 +172,8 @@ use codex_protocol::protocol::McpServerRefreshConfig;
|
||||
use codex_protocol::protocol::RateLimitSnapshot as CoreRateLimitSnapshot;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SpawnedThreadKind;
|
||||
use codex_protocol::protocol::ThreadOrigin as CoreThreadOrigin;
|
||||
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
|
||||
use codex_protocol::user_input::UserInput as CoreInputItem;
|
||||
use codex_rmcp_client::perform_oauth_login_return_url;
|
||||
@@ -250,6 +253,11 @@ pub(crate) struct CodexMessageProcessor {
|
||||
feedback: CodexFeedback,
|
||||
}
|
||||
|
||||
pub(crate) struct ThreadSummaryWithOrigin {
|
||||
pub(crate) summary: ConversationSummary,
|
||||
pub(crate) thread_origin: ThreadOrigin,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
pub(crate) enum ApiVersion {
|
||||
V1,
|
||||
@@ -1423,13 +1431,13 @@ impl CodexMessageProcessor {
|
||||
|
||||
// A bit hacky, but the summary contains a lot of useful information for the thread
|
||||
// that unfortunately does not get returned from thread_manager.start_thread().
|
||||
let thread = match read_summary_from_rollout(
|
||||
let thread_summary = match read_thread_summary_from_rollout(
|
||||
rollout_path.as_path(),
|
||||
fallback_provider,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(summary) => summary_to_thread(summary),
|
||||
Ok(summary) => summary,
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
@@ -1452,7 +1460,10 @@ impl CodexMessageProcessor {
|
||||
..
|
||||
} = session_configured;
|
||||
let response = ThreadStartResponse {
|
||||
thread: thread.clone(),
|
||||
thread: summary_to_thread(
|
||||
thread_summary.summary.clone(),
|
||||
thread_summary.thread_origin.clone(),
|
||||
),
|
||||
model,
|
||||
model_provider: model_provider_id,
|
||||
cwd,
|
||||
@@ -1480,7 +1491,9 @@ impl CodexMessageProcessor {
|
||||
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
|
||||
let notif = ThreadStartedNotification { thread };
|
||||
let notif = ThreadStartedNotification {
|
||||
thread: summary_to_thread(thread_summary.summary, thread_summary.thread_origin),
|
||||
};
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadStarted(notif))
|
||||
.await;
|
||||
@@ -1632,7 +1645,12 @@ impl CodexMessageProcessor {
|
||||
ThreadSortKey::UpdatedAt => CoreThreadSortKey::UpdatedAt,
|
||||
};
|
||||
let (summaries, next_cursor) = match self
|
||||
.list_threads_common(requested_page_size, cursor, model_providers, core_sort_key)
|
||||
.list_threads_common_with_origin(
|
||||
requested_page_size,
|
||||
cursor,
|
||||
model_providers,
|
||||
core_sort_key,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(r) => r,
|
||||
@@ -1642,7 +1660,10 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let data = summaries.into_iter().map(summary_to_thread).collect();
|
||||
let data = summaries
|
||||
.into_iter()
|
||||
.map(|summary| summary_to_thread(summary.summary, summary.thread_origin))
|
||||
.collect();
|
||||
let response = ThreadListResponse { data, next_cursor };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
@@ -1877,13 +1898,13 @@ impl CodexMessageProcessor {
|
||||
);
|
||||
}
|
||||
|
||||
let mut thread = match read_summary_from_rollout(
|
||||
let thread_summary = match read_thread_summary_from_rollout(
|
||||
rollout_path.as_path(),
|
||||
fallback_model_provider.as_str(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(summary) => summary_to_thread(summary),
|
||||
Ok(summary) => summary,
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
@@ -1896,6 +1917,8 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut thread =
|
||||
summary_to_thread(thread_summary.summary, thread_summary.thread_origin);
|
||||
thread.turns = initial_messages
|
||||
.as_deref()
|
||||
.map_or_else(Vec::new, build_turns_from_event_msgs);
|
||||
@@ -1937,10 +1960,10 @@ impl CodexMessageProcessor {
|
||||
developer_instructions,
|
||||
} = params;
|
||||
|
||||
let rollout_path = if let Some(path) = path {
|
||||
path
|
||||
let (rollout_path, parent_thread_id) = if let Some(path) = path {
|
||||
(path, None)
|
||||
} else {
|
||||
let existing_thread_id = match ThreadId::from_string(&thread_id) {
|
||||
let parent_thread_id = match ThreadId::from_string(&thread_id) {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
@@ -1952,14 +1975,14 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let existing_thread_id = parent_thread_id;
|
||||
match find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&existing_thread_id.to_string(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Some(p)) => p,
|
||||
Ok(Some(p)) => (p, Some(parent_thread_id)),
|
||||
Ok(None) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
@@ -1979,14 +2002,33 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let history_cwd = match read_session_meta_line(&rollout_path).await {
|
||||
Ok(meta_line) => Some(meta_line.meta.cwd),
|
||||
let session_meta_line = match read_session_meta_line(&rollout_path).await {
|
||||
Ok(meta_line) => Some(meta_line),
|
||||
Err(err) => {
|
||||
let rollout_path = rollout_path.display();
|
||||
warn!("failed to read session metadata from rollout {rollout_path}: {err}");
|
||||
None
|
||||
}
|
||||
};
|
||||
let history_cwd = session_meta_line
|
||||
.as_ref()
|
||||
.map(|meta_line| meta_line.meta.cwd.clone());
|
||||
let parent_thread_id = parent_thread_id.or_else(|| {
|
||||
session_meta_line
|
||||
.as_ref()
|
||||
.map(|meta_line| meta_line.meta.id)
|
||||
});
|
||||
let parent_thread_id = match parent_thread_id {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"failed to derive parent thread id for fork".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Persist windows sandbox feature.
|
||||
let mut cli_overrides = cli_overrides.unwrap_or_default();
|
||||
@@ -2039,7 +2081,12 @@ impl CodexMessageProcessor {
|
||||
..
|
||||
} = match self
|
||||
.thread_manager
|
||||
.fork_thread(usize::MAX, config, rollout_path.clone())
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
config,
|
||||
rollout_path.clone(),
|
||||
CoreThreadOrigin::Forked { parent_thread_id },
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(thread) => thread,
|
||||
@@ -2079,13 +2126,13 @@ impl CodexMessageProcessor {
|
||||
);
|
||||
}
|
||||
|
||||
let mut thread = match read_summary_from_rollout(
|
||||
let thread_summary = match read_thread_summary_from_rollout(
|
||||
rollout_path.as_path(),
|
||||
fallback_model_provider.as_str(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(summary) => summary_to_thread(summary),
|
||||
Ok(summary) => summary,
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
@@ -2098,6 +2145,7 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut thread = summary_to_thread(thread_summary.summary, thread_summary.thread_origin);
|
||||
thread.turns = initial_messages
|
||||
.as_deref()
|
||||
.map_or_else(Vec::new, build_turns_from_event_msgs);
|
||||
@@ -2315,6 +2363,120 @@ impl CodexMessageProcessor {
|
||||
Ok((items, next_cursor))
|
||||
}
|
||||
|
||||
async fn list_threads_common_with_origin(
|
||||
&self,
|
||||
requested_page_size: usize,
|
||||
cursor: Option<String>,
|
||||
model_providers: Option<Vec<String>>,
|
||||
sort_key: CoreThreadSortKey,
|
||||
) -> Result<(Vec<ThreadSummaryWithOrigin>, Option<String>), JSONRPCErrorError> {
|
||||
let mut cursor_obj: Option<RolloutCursor> = match cursor.as_ref() {
|
||||
Some(cursor_str) => {
|
||||
Some(parse_cursor(cursor_str).ok_or_else(|| JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("invalid cursor: {cursor_str}"),
|
||||
data: None,
|
||||
})?)
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
let mut last_cursor = cursor_obj.clone();
|
||||
let mut remaining = requested_page_size;
|
||||
let mut items = Vec::with_capacity(requested_page_size);
|
||||
let mut next_cursor: Option<String> = None;
|
||||
|
||||
let model_provider_filter = match model_providers {
|
||||
Some(providers) => {
|
||||
if providers.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(providers)
|
||||
}
|
||||
}
|
||||
None => Some(vec![self.config.model_provider_id.clone()]),
|
||||
};
|
||||
let fallback_provider = self.config.model_provider_id.clone();
|
||||
|
||||
while remaining > 0 {
|
||||
let page_size = remaining.min(THREAD_LIST_MAX_LIMIT);
|
||||
let page = RolloutRecorder::list_threads(
|
||||
&self.config.codex_home,
|
||||
page_size,
|
||||
cursor_obj.as_ref(),
|
||||
sort_key,
|
||||
INTERACTIVE_SESSION_SOURCES,
|
||||
model_provider_filter.as_deref(),
|
||||
fallback_provider.as_str(),
|
||||
)
|
||||
.await
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to list threads: {err}"),
|
||||
data: None,
|
||||
})?;
|
||||
|
||||
let mut filtered = page
|
||||
.items
|
||||
.into_iter()
|
||||
.filter_map(|it| {
|
||||
let updated_at = it.updated_at.clone();
|
||||
let session_meta_line = it.head.first().and_then(|first| {
|
||||
serde_json::from_value::<SessionMetaLine>(first.clone()).ok()
|
||||
})?;
|
||||
let summary = extract_conversation_summary(
|
||||
it.path,
|
||||
&it.head,
|
||||
&session_meta_line.meta,
|
||||
session_meta_line.git.as_ref(),
|
||||
fallback_provider.as_str(),
|
||||
updated_at,
|
||||
)?;
|
||||
let thread_origin = session_meta_line
|
||||
.meta
|
||||
.thread_origin
|
||||
.clone()
|
||||
.map(ThreadOrigin::from)
|
||||
.unwrap_or(ThreadOrigin::Unknown);
|
||||
Some(ThreadSummaryWithOrigin {
|
||||
summary,
|
||||
thread_origin,
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
if filtered.len() > remaining {
|
||||
filtered.truncate(remaining);
|
||||
}
|
||||
items.extend(filtered);
|
||||
remaining = requested_page_size.saturating_sub(items.len());
|
||||
|
||||
// Encode RolloutCursor into the JSON-RPC string form returned to clients.
|
||||
let next_cursor_value = page.next_cursor.clone();
|
||||
next_cursor = next_cursor_value
|
||||
.as_ref()
|
||||
.and_then(|cursor| serde_json::to_value(cursor).ok())
|
||||
.and_then(|value| value.as_str().map(str::to_owned));
|
||||
if remaining == 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
match next_cursor_value {
|
||||
Some(cursor_val) if remaining > 0 => {
|
||||
// Break if our pagination would reuse the same cursor again; this avoids
|
||||
// an infinite loop when filtering drops everything on the page.
|
||||
if last_cursor.as_ref() == Some(&cursor_val) {
|
||||
next_cursor = None;
|
||||
break;
|
||||
}
|
||||
last_cursor = Some(cursor_val.clone());
|
||||
cursor_obj = Some(cursor_val);
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
|
||||
Ok((items, next_cursor))
|
||||
}
|
||||
|
||||
async fn list_models(
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
thread_manager: Arc<ThreadManager>,
|
||||
@@ -2878,14 +3040,33 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
};
|
||||
|
||||
let history_cwd = match read_session_meta_line(&rollout_path).await {
|
||||
Ok(meta_line) => Some(meta_line.meta.cwd),
|
||||
let session_meta_line = match read_session_meta_line(&rollout_path).await {
|
||||
Ok(meta_line) => Some(meta_line),
|
||||
Err(err) => {
|
||||
let rollout_path = rollout_path.display();
|
||||
warn!("failed to read session metadata from rollout {rollout_path}: {err}");
|
||||
None
|
||||
}
|
||||
};
|
||||
let history_cwd = session_meta_line
|
||||
.as_ref()
|
||||
.map(|meta_line| meta_line.meta.cwd.clone());
|
||||
let parent_thread_id = conversation_id.or_else(|| {
|
||||
session_meta_line
|
||||
.as_ref()
|
||||
.map(|meta_line| meta_line.meta.id)
|
||||
});
|
||||
let parent_thread_id = match parent_thread_id {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"failed to derive parent thread id for fork".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let (typesafe_overrides, request_overrides) = match overrides {
|
||||
Some(overrides) => {
|
||||
@@ -2968,7 +3149,12 @@ impl CodexMessageProcessor {
|
||||
..
|
||||
} = match self
|
||||
.thread_manager
|
||||
.fork_thread(usize::MAX, config, rollout_path.clone())
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
config,
|
||||
rollout_path.clone(),
|
||||
CoreThreadOrigin::Forked { parent_thread_id },
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(thread) => thread,
|
||||
@@ -3568,7 +3754,15 @@ impl CodexMessageProcessor {
|
||||
..
|
||||
} = self
|
||||
.thread_manager
|
||||
.fork_thread(usize::MAX, config, rollout_path)
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
config,
|
||||
rollout_path,
|
||||
CoreThreadOrigin::SpawnedByThread {
|
||||
parent_thread_id,
|
||||
kind: SpawnedThreadKind::Review,
|
||||
},
|
||||
)
|
||||
.await
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
@@ -3589,9 +3783,9 @@ impl CodexMessageProcessor {
|
||||
|
||||
let rollout_path = review_thread.rollout_path();
|
||||
let fallback_provider = self.config.model_provider_id.as_str();
|
||||
match read_summary_from_rollout(rollout_path.as_path(), fallback_provider).await {
|
||||
match read_thread_summary_from_rollout(rollout_path.as_path(), fallback_provider).await {
|
||||
Ok(summary) => {
|
||||
let thread = summary_to_thread(summary);
|
||||
let thread = summary_to_thread(summary.summary, summary.thread_origin);
|
||||
let notif = ThreadStartedNotification { thread };
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadStarted(notif))
|
||||
@@ -4177,6 +4371,85 @@ pub(crate) async fn read_summary_from_rollout(
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn read_thread_summary_from_rollout(
|
||||
path: &Path,
|
||||
fallback_provider: &str,
|
||||
) -> std::io::Result<ThreadSummaryWithOrigin> {
|
||||
let head = read_head_for_summary(path).await?;
|
||||
|
||||
let Some(first) = head.first() else {
|
||||
return Err(IoError::other(format!(
|
||||
"rollout at {} is empty",
|
||||
path.display()
|
||||
)));
|
||||
};
|
||||
|
||||
let session_meta_line =
|
||||
serde_json::from_value::<SessionMetaLine>(first.clone()).map_err(|_| {
|
||||
IoError::other(format!(
|
||||
"rollout at {} does not start with session metadata",
|
||||
path.display()
|
||||
))
|
||||
})?;
|
||||
let SessionMetaLine {
|
||||
meta: session_meta,
|
||||
git,
|
||||
} = session_meta_line;
|
||||
let thread_origin = session_meta
|
||||
.thread_origin
|
||||
.clone()
|
||||
.map(ThreadOrigin::from)
|
||||
.unwrap_or(ThreadOrigin::Unknown);
|
||||
|
||||
let created_at = if session_meta.timestamp.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(session_meta.timestamp.as_str())
|
||||
};
|
||||
let updated_at = read_updated_at(path, created_at).await;
|
||||
if let Some(summary) = extract_conversation_summary(
|
||||
path.to_path_buf(),
|
||||
&head,
|
||||
&session_meta,
|
||||
git.as_ref(),
|
||||
fallback_provider,
|
||||
updated_at.clone(),
|
||||
) {
|
||||
return Ok(ThreadSummaryWithOrigin {
|
||||
summary,
|
||||
thread_origin,
|
||||
});
|
||||
}
|
||||
|
||||
let timestamp = if session_meta.timestamp.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(session_meta.timestamp.clone())
|
||||
};
|
||||
let model_provider = session_meta
|
||||
.model_provider
|
||||
.clone()
|
||||
.unwrap_or_else(|| fallback_provider.to_string());
|
||||
let git_info = git.as_ref().map(map_git_info);
|
||||
let updated_at = updated_at.or_else(|| timestamp.clone());
|
||||
|
||||
Ok(ThreadSummaryWithOrigin {
|
||||
summary: ConversationSummary {
|
||||
conversation_id: session_meta.id,
|
||||
timestamp,
|
||||
updated_at,
|
||||
path: path.to_path_buf(),
|
||||
preview: String::new(),
|
||||
model_provider,
|
||||
cwd: session_meta.cwd,
|
||||
cli_version: session_meta.cli_version,
|
||||
source: session_meta.source,
|
||||
git_info,
|
||||
},
|
||||
thread_origin,
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn read_event_msgs_from_rollout(
|
||||
path: &Path,
|
||||
) -> std::io::Result<Vec<codex_protocol::protocol::EventMsg>> {
|
||||
@@ -4271,7 +4544,10 @@ async fn read_updated_at(path: &Path, created_at: Option<&str>) -> Option<String
|
||||
updated_at.or_else(|| created_at.map(str::to_string))
|
||||
}
|
||||
|
||||
pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
|
||||
pub(crate) fn summary_to_thread(
|
||||
summary: ConversationSummary,
|
||||
thread_origin: ThreadOrigin,
|
||||
) -> Thread {
|
||||
let ConversationSummary {
|
||||
conversation_id,
|
||||
path,
|
||||
@@ -4303,6 +4579,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
|
||||
cwd,
|
||||
cli_version,
|
||||
source: source.into(),
|
||||
thread_origin,
|
||||
git_info,
|
||||
turns: Vec::new(),
|
||||
}
|
||||
|
||||
@@ -53,6 +53,7 @@ pub fn create_fake_rollout(
|
||||
let meta = SessionMeta {
|
||||
id: conversation_id,
|
||||
forked_from_id: None,
|
||||
thread_origin: None,
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
originator: "codex".to_string(),
|
||||
@@ -131,6 +132,7 @@ pub fn create_fake_rollout_with_text_elements(
|
||||
let meta = SessionMeta {
|
||||
id: conversation_id,
|
||||
forked_from_id: None,
|
||||
thread_origin: None,
|
||||
timestamp: meta_rfc3339.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
originator: "codex".to_string(),
|
||||
|
||||
@@ -355,7 +355,7 @@ mod tests {
|
||||
use crate::provider::WireApi;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::SpawnedThreadKind;
|
||||
use http::HeaderValue;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::time::Duration;
|
||||
@@ -389,7 +389,7 @@ mod tests {
|
||||
}];
|
||||
let req = ChatRequestBuilder::new("gpt-test", "inst", &prompt_input, &[])
|
||||
.conversation_id(Some("conv-1".into()))
|
||||
.session_source(Some(SessionSource::SubAgent(SubAgentSource::Review)))
|
||||
.session_source(Some(SessionSource::SubAgent(SpawnedThreadKind::Review)))
|
||||
.build(&provider())
|
||||
.expect("request");
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ pub(crate) fn subagent_header(source: &Option<SessionSource>) -> Option<String>
|
||||
return None;
|
||||
};
|
||||
match sub {
|
||||
codex_protocol::protocol::SubAgentSource::Other(label) => Some(label.clone()),
|
||||
codex_protocol::protocol::SpawnedThreadKind::Other(label) => Some(label.clone()),
|
||||
other => Some(
|
||||
serde_json::to_value(other)
|
||||
.ok()
|
||||
|
||||
@@ -192,7 +192,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::provider::RetryConfig;
|
||||
use crate::provider::WireApi;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::SpawnedThreadKind;
|
||||
use http::HeaderValue;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::time::Duration;
|
||||
@@ -233,7 +233,7 @@ mod tests {
|
||||
|
||||
let request = ResponsesRequestBuilder::new("gpt-test", "inst", &input)
|
||||
.conversation(Some("conv-1".into()))
|
||||
.session_source(Some(SessionSource::SubAgent(SubAgentSource::Review)))
|
||||
.session_source(Some(SessionSource::SubAgent(SpawnedThreadKind::Review)))
|
||||
.build(&provider)
|
||||
.expect("request");
|
||||
|
||||
|
||||
@@ -1 +0,0 @@
|
||||
danger-unless-trusted
|
||||
@@ -5,6 +5,8 @@ use crate::error::Result as CodexResult;
|
||||
use crate::thread_manager::ThreadManagerState;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SpawnedThreadKind;
|
||||
use codex_protocol::protocol::ThreadOrigin;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Weak;
|
||||
@@ -39,12 +41,22 @@ impl AgentControl {
|
||||
&self,
|
||||
config: crate::config::Config,
|
||||
prompt: String,
|
||||
parent_thread_id: ThreadId,
|
||||
) -> CodexResult<ThreadId> {
|
||||
let state = self.upgrade()?;
|
||||
let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
|
||||
|
||||
// The same `AgentControl` is sent to spawn the thread.
|
||||
let new_thread = state.spawn_new_thread(config, self.clone()).await?;
|
||||
let new_thread = state
|
||||
.spawn_new_thread(
|
||||
config,
|
||||
ThreadOrigin::SpawnedByThread {
|
||||
parent_thread_id,
|
||||
kind: SpawnedThreadKind::Other("agent".to_string()),
|
||||
},
|
||||
self.clone(),
|
||||
)
|
||||
.await?;
|
||||
reservation.commit(new_thread.thread_id);
|
||||
|
||||
// Notify a new thread has been created. This notification will be processed by clients
|
||||
@@ -268,7 +280,7 @@ mod tests {
|
||||
let control = AgentControl::default();
|
||||
let (_home, config) = test_config().await;
|
||||
let err = control
|
||||
.spawn_agent(config, "hello".to_string())
|
||||
.spawn_agent(config, "hello".to_string(), ThreadId::new())
|
||||
.await
|
||||
.expect_err("spawn_agent should fail without a manager");
|
||||
assert_eq!(
|
||||
@@ -370,7 +382,11 @@ mod tests {
|
||||
let harness = AgentControlHarness::new().await;
|
||||
let thread_id = harness
|
||||
.control
|
||||
.spawn_agent(harness.config.clone(), "spawned".to_string())
|
||||
.spawn_agent(
|
||||
harness.config.clone(),
|
||||
"spawned".to_string(),
|
||||
ThreadId::new(),
|
||||
)
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
let _thread = harness
|
||||
@@ -417,12 +433,12 @@ mod tests {
|
||||
.expect("start thread");
|
||||
|
||||
let first_agent_id = control
|
||||
.spawn_agent(config.clone(), "hello".to_string())
|
||||
.spawn_agent(config.clone(), "hello".to_string(), ThreadId::new())
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
|
||||
let err = control
|
||||
.spawn_agent(config, "hello again".to_string())
|
||||
.spawn_agent(config, "hello again".to_string(), ThreadId::new())
|
||||
.await
|
||||
.expect_err("spawn_agent should respect max threads");
|
||||
let CodexErr::AgentLimitReached {
|
||||
@@ -455,7 +471,7 @@ mod tests {
|
||||
let control = manager.agent_control();
|
||||
|
||||
let first_agent_id = control
|
||||
.spawn_agent(config.clone(), "hello".to_string())
|
||||
.spawn_agent(config.clone(), "hello".to_string(), ThreadId::new())
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
let _ = control
|
||||
@@ -464,7 +480,7 @@ mod tests {
|
||||
.expect("shutdown agent");
|
||||
|
||||
let second_agent_id = control
|
||||
.spawn_agent(config.clone(), "hello again".to_string())
|
||||
.spawn_agent(config.clone(), "hello again".to_string(), ThreadId::new())
|
||||
.await
|
||||
.expect("spawn_agent should succeed after shutdown");
|
||||
let _ = control
|
||||
@@ -490,12 +506,12 @@ mod tests {
|
||||
let cloned = control.clone();
|
||||
|
||||
let first_agent_id = cloned
|
||||
.spawn_agent(config.clone(), "hello".to_string())
|
||||
.spawn_agent(config.clone(), "hello".to_string(), ThreadId::new())
|
||||
.await
|
||||
.expect("spawn_agent should succeed");
|
||||
|
||||
let err = control
|
||||
.spawn_agent(config, "hello again".to_string())
|
||||
.spawn_agent(config, "hello again".to_string(), ThreadId::new())
|
||||
.await
|
||||
.expect_err("spawn_agent should respect shared guard");
|
||||
let CodexErr::AgentLimitReached { max_threads } = err else {
|
||||
|
||||
@@ -226,7 +226,7 @@ impl ModelClient {
|
||||
|
||||
let mut extra_headers = ApiHeaderMap::new();
|
||||
if let SessionSource::SubAgent(sub) = &self.state.session_source {
|
||||
let subagent = if let crate::protocol::SubAgentSource::Other(label) = sub {
|
||||
let subagent = if let crate::protocol::SpawnedThreadKind::Other(label) = sub {
|
||||
label.clone()
|
||||
} else {
|
||||
serde_json::to_value(sub)
|
||||
|
||||
@@ -48,6 +48,7 @@ use codex_protocol::protocol::RawResponseItemEvent;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::ThreadOrigin;
|
||||
use codex_protocol::protocol::TurnAbortReason;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::TurnStartedEvent;
|
||||
@@ -228,6 +229,7 @@ fn maybe_push_chat_wire_api_deprecation(
|
||||
|
||||
impl Codex {
|
||||
/// Spawn a new [`Codex`] and initialize the session.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn spawn(
|
||||
config: Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
@@ -235,6 +237,7 @@ impl Codex {
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
conversation_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
thread_origin: ThreadOrigin,
|
||||
agent_control: AgentControl,
|
||||
) -> CodexResult<CodexSpawnOk> {
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
@@ -324,6 +327,7 @@ impl Codex {
|
||||
agent_status_tx.clone(),
|
||||
conversation_history,
|
||||
session_source_clone,
|
||||
thread_origin.clone(),
|
||||
skills_manager,
|
||||
agent_control,
|
||||
)
|
||||
@@ -589,6 +593,7 @@ impl Session {
|
||||
agent_status: watch::Sender<AgentStatus>,
|
||||
initial_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
thread_origin: ThreadOrigin,
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
agent_control: AgentControl,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
@@ -604,7 +609,10 @@ impl Session {
|
||||
));
|
||||
}
|
||||
|
||||
let forked_from_id = initial_history.forked_from_id();
|
||||
let forked_from_id = match &thread_origin {
|
||||
ThreadOrigin::Forked { parent_thread_id } => Some(*parent_thread_id),
|
||||
_ => initial_history.forked_from_id(),
|
||||
};
|
||||
|
||||
let (conversation_id, rollout_params) = match &initial_history {
|
||||
InitialHistory::New | InitialHistory::Forked(_) => {
|
||||
@@ -614,6 +622,7 @@ impl Session {
|
||||
RolloutRecorderParams::new(
|
||||
conversation_id,
|
||||
forked_from_id,
|
||||
thread_origin,
|
||||
session_source,
|
||||
BaseInstructions {
|
||||
text: session_configuration.base_instructions.clone(),
|
||||
|
||||
@@ -12,8 +12,9 @@ use codex_protocol::protocol::ExecApprovalRequestEvent;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RequestUserInputEvent;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::SpawnedThreadKind;
|
||||
use codex_protocol::protocol::Submission;
|
||||
use codex_protocol::protocol::ThreadOrigin;
|
||||
use codex_protocol::request_user_input::RequestUserInputArgs;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
@@ -55,7 +56,11 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
models_manager,
|
||||
Arc::clone(&parent_session.services.skills_manager),
|
||||
initial_history.unwrap_or(InitialHistory::New),
|
||||
SessionSource::SubAgent(SubAgentSource::Review),
|
||||
SessionSource::SubAgent(SpawnedThreadKind::Review),
|
||||
ThreadOrigin::SpawnedByThread {
|
||||
parent_thread_id: parent_session.conversation_id,
|
||||
kind: SpawnedThreadKind::Review,
|
||||
},
|
||||
parent_session.services.agent_control.clone(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -36,6 +36,7 @@ use codex_protocol::protocol::RolloutLine;
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::ThreadOrigin;
|
||||
|
||||
/// Records all [`ResponseItem`]s for a session and flushes them to disk after
|
||||
/// every update.
|
||||
@@ -57,6 +58,7 @@ pub enum RolloutRecorderParams {
|
||||
Create {
|
||||
conversation_id: ThreadId,
|
||||
forked_from_id: Option<ThreadId>,
|
||||
thread_origin: ThreadOrigin,
|
||||
source: SessionSource,
|
||||
base_instructions: BaseInstructions,
|
||||
},
|
||||
@@ -80,12 +82,14 @@ impl RolloutRecorderParams {
|
||||
pub fn new(
|
||||
conversation_id: ThreadId,
|
||||
forked_from_id: Option<ThreadId>,
|
||||
thread_origin: ThreadOrigin,
|
||||
source: SessionSource,
|
||||
base_instructions: BaseInstructions,
|
||||
) -> Self {
|
||||
Self::Create {
|
||||
conversation_id,
|
||||
forked_from_id,
|
||||
thread_origin,
|
||||
source,
|
||||
base_instructions,
|
||||
}
|
||||
@@ -161,6 +165,7 @@ impl RolloutRecorder {
|
||||
RolloutRecorderParams::Create {
|
||||
conversation_id,
|
||||
forked_from_id,
|
||||
thread_origin,
|
||||
source,
|
||||
base_instructions,
|
||||
} => {
|
||||
@@ -185,6 +190,7 @@ impl RolloutRecorder {
|
||||
Some(SessionMeta {
|
||||
id: session_id,
|
||||
forked_from_id,
|
||||
thread_origin: Some(thread_origin),
|
||||
timestamp,
|
||||
cwd: config.cwd.clone(),
|
||||
originator: originator().value,
|
||||
|
||||
@@ -772,6 +772,7 @@ async fn test_updated_at_uses_file_mtime() -> Result<()> {
|
||||
meta: SessionMeta {
|
||||
id: conversation_id,
|
||||
forked_from_id: None,
|
||||
thread_origin: None,
|
||||
timestamp: ts.to_string(),
|
||||
cwd: ".".into(),
|
||||
originator: "test_originator".into(),
|
||||
|
||||
@@ -26,6 +26,7 @@ use codex_protocol::protocol::McpServerRefreshConfig;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::ThreadOrigin;
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
@@ -201,6 +202,7 @@ impl ThreadManager {
|
||||
config,
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
ThreadOrigin::UserRequested,
|
||||
self.agent_control(),
|
||||
)
|
||||
.await
|
||||
@@ -224,7 +226,13 @@ impl ThreadManager {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.state
|
||||
.spawn_thread(config, initial_history, auth_manager, self.agent_control())
|
||||
.spawn_thread(
|
||||
config,
|
||||
initial_history,
|
||||
auth_manager,
|
||||
ThreadOrigin::Unknown,
|
||||
self.agent_control(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -253,6 +261,7 @@ impl ThreadManager {
|
||||
nth_user_message: usize,
|
||||
config: Config,
|
||||
path: PathBuf,
|
||||
thread_origin: ThreadOrigin,
|
||||
) -> CodexResult<NewThread> {
|
||||
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
let history = truncate_before_nth_user_message(history, nth_user_message);
|
||||
@@ -261,6 +270,7 @@ impl ThreadManager {
|
||||
config,
|
||||
history,
|
||||
Arc::clone(&self.state.auth_manager),
|
||||
thread_origin,
|
||||
self.agent_control(),
|
||||
)
|
||||
.await
|
||||
@@ -312,12 +322,14 @@ impl ThreadManagerState {
|
||||
pub(crate) async fn spawn_new_thread(
|
||||
&self,
|
||||
config: Config,
|
||||
thread_origin: ThreadOrigin,
|
||||
agent_control: AgentControl,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread(
|
||||
config,
|
||||
InitialHistory::New,
|
||||
Arc::clone(&self.auth_manager),
|
||||
thread_origin,
|
||||
agent_control,
|
||||
)
|
||||
.await
|
||||
@@ -329,6 +341,7 @@ impl ThreadManagerState {
|
||||
config: Config,
|
||||
initial_history: InitialHistory,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
thread_origin: ThreadOrigin,
|
||||
agent_control: AgentControl,
|
||||
) -> CodexResult<NewThread> {
|
||||
let CodexSpawnOk {
|
||||
@@ -340,6 +353,7 @@ impl ThreadManagerState {
|
||||
Arc::clone(&self.skills_manager),
|
||||
initial_history,
|
||||
self.session_source.clone(),
|
||||
thread_origin,
|
||||
agent_control,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -125,7 +125,7 @@ mod spawn {
|
||||
let result = session
|
||||
.services
|
||||
.agent_control
|
||||
.spawn_agent(config, prompt.clone())
|
||||
.spawn_agent(config, prompt.clone(), session.conversation_id)
|
||||
.await
|
||||
.map_err(collab_spawn_error);
|
||||
let (new_thread_id, status) = match &result {
|
||||
|
||||
@@ -17,7 +17,7 @@ use codex_protocol::ThreadId;
|
||||
use codex_protocol::config_types::ReasoningSummary;
|
||||
use codex_protocol::config_types::WebSearchMode;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::SubAgentSource;
|
||||
use codex_protocol::protocol::SpawnedThreadKind;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
@@ -70,7 +70,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
|
||||
|
||||
let conversation_id = ThreadId::new();
|
||||
let auth_mode = AuthMode::ChatGPT;
|
||||
let session_source = SessionSource::SubAgent(SubAgentSource::Review);
|
||||
let session_source = SessionSource::SubAgent(SpawnedThreadKind::Review);
|
||||
let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config);
|
||||
let otel_manager = OtelManager::new(
|
||||
conversation_id,
|
||||
@@ -165,7 +165,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
|
||||
|
||||
let conversation_id = ThreadId::new();
|
||||
let auth_mode = AuthMode::ChatGPT;
|
||||
let session_source = SessionSource::SubAgent(SubAgentSource::Other("my-task".to_string()));
|
||||
let session_source = SessionSource::SubAgent(SpawnedThreadKind::Other("my-task".to_string()));
|
||||
let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config);
|
||||
|
||||
let otel_manager = OtelManager::new(
|
||||
@@ -320,7 +320,7 @@ async fn responses_respects_model_info_overrides_from_config() {
|
||||
let auth_mode =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key")).get_auth_mode();
|
||||
let session_source =
|
||||
SessionSource::SubAgent(SubAgentSource::Other("override-check".to_string()));
|
||||
SessionSource::SubAgent(SpawnedThreadKind::Other("override-check".to_string()));
|
||||
let model_info = ModelsManager::construct_model_info_offline(model.as_str(), &config);
|
||||
let otel_manager = OtelManager::new(
|
||||
conversation_id,
|
||||
|
||||
@@ -22,6 +22,8 @@ use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::WarningEvent;
|
||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::ThreadOrigin;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::responses::ResponseMock;
|
||||
@@ -1038,7 +1040,14 @@ async fn fork_thread(
|
||||
nth_user_message: usize,
|
||||
) -> Arc<CodexThread> {
|
||||
let NewThread { thread, .. } = manager
|
||||
.fork_thread(nth_user_message, config.clone(), path)
|
||||
.fork_thread(
|
||||
nth_user_message,
|
||||
config.clone(),
|
||||
path,
|
||||
ThreadOrigin::Forked {
|
||||
parent_thread_id: ThreadId::new(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("fork conversation");
|
||||
thread
|
||||
|
||||
@@ -8,7 +8,9 @@ use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::RolloutItem;
|
||||
use codex_core::protocol::RolloutLine;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::protocol::ThreadOrigin;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::skip_if_no_network;
|
||||
@@ -131,7 +133,14 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
thread: codex_fork1,
|
||||
..
|
||||
} = thread_manager
|
||||
.fork_thread(1, config_for_fork.clone(), base_path.clone())
|
||||
.fork_thread(
|
||||
1,
|
||||
config_for_fork.clone(),
|
||||
base_path.clone(),
|
||||
ThreadOrigin::Forked {
|
||||
parent_thread_id: ThreadId::new(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("fork 1");
|
||||
|
||||
@@ -150,7 +159,14 @@ async fn fork_thread_twice_drops_to_first_message() {
|
||||
thread: codex_fork2,
|
||||
..
|
||||
} = thread_manager
|
||||
.fork_thread(0, config_for_fork.clone(), fork1_path.clone())
|
||||
.fork_thread(
|
||||
0,
|
||||
config_for_fork.clone(),
|
||||
fork1_path.clone(),
|
||||
ThreadOrigin::Forked {
|
||||
parent_thread_id: ThreadId::new(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
.expect("fork 2");
|
||||
|
||||
|
||||
@@ -4,6 +4,8 @@ use codex_core::protocol::AskForApproval;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::SandboxPolicy;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::ThreadOrigin;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use core_test_support::responses::ev_completed;
|
||||
@@ -353,7 +355,14 @@ async fn resume_and_fork_append_permissions_messages() -> Result<()> {
|
||||
fork_config.approval_policy = Constrained::allow_any(AskForApproval::UnlessTrusted);
|
||||
let forked = initial
|
||||
.thread_manager
|
||||
.fork_thread(usize::MAX, fork_config, rollout_path)
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
fork_config,
|
||||
rollout_path,
|
||||
ThreadOrigin::Forked {
|
||||
parent_thread_id: ThreadId::new(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
forked
|
||||
.thread
|
||||
|
||||
@@ -1498,7 +1498,7 @@ pub enum SessionSource {
|
||||
VSCode,
|
||||
Exec,
|
||||
Mcp,
|
||||
SubAgent(SubAgentSource),
|
||||
SubAgent(SpawnedThreadKind),
|
||||
#[serde(other)]
|
||||
Unknown,
|
||||
}
|
||||
@@ -1506,12 +1506,28 @@ pub enum SessionSource {
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[ts(rename_all = "snake_case")]
|
||||
pub enum SubAgentSource {
|
||||
pub enum SpawnedThreadKind {
|
||||
Review,
|
||||
Compact,
|
||||
Other(String),
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, JsonSchema, TS)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
#[ts(tag = "type", rename_all = "snake_case")]
|
||||
pub enum ThreadOrigin {
|
||||
UserRequested,
|
||||
Forked {
|
||||
parent_thread_id: ThreadId,
|
||||
},
|
||||
SpawnedByThread {
|
||||
parent_thread_id: ThreadId,
|
||||
kind: SpawnedThreadKind,
|
||||
},
|
||||
#[serde(other)]
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl fmt::Display for SessionSource {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
@@ -1519,18 +1535,18 @@ impl fmt::Display for SessionSource {
|
||||
SessionSource::VSCode => f.write_str("vscode"),
|
||||
SessionSource::Exec => f.write_str("exec"),
|
||||
SessionSource::Mcp => f.write_str("mcp"),
|
||||
SessionSource::SubAgent(sub_source) => write!(f, "subagent_{sub_source}"),
|
||||
SessionSource::SubAgent(kind) => write!(f, "subagent_{kind}"),
|
||||
SessionSource::Unknown => f.write_str("unknown"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for SubAgentSource {
|
||||
impl fmt::Display for SpawnedThreadKind {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
SubAgentSource::Review => f.write_str("review"),
|
||||
SubAgentSource::Compact => f.write_str("compact"),
|
||||
SubAgentSource::Other(other) => f.write_str(other),
|
||||
SpawnedThreadKind::Review => f.write_str("review"),
|
||||
SpawnedThreadKind::Compact => f.write_str("compact"),
|
||||
SpawnedThreadKind::Other(other) => f.write_str(other),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1545,6 +1561,8 @@ pub struct SessionMeta {
|
||||
pub id: ThreadId,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub forked_from_id: Option<ThreadId>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
pub thread_origin: Option<ThreadOrigin>,
|
||||
pub timestamp: String,
|
||||
pub cwd: PathBuf,
|
||||
pub originator: String,
|
||||
@@ -1563,6 +1581,7 @@ impl Default for SessionMeta {
|
||||
SessionMeta {
|
||||
id: ThreadId::default(),
|
||||
forked_from_id: None,
|
||||
thread_origin: None,
|
||||
timestamp: String::new(),
|
||||
cwd: PathBuf::new(),
|
||||
originator: String::new(),
|
||||
|
||||
@@ -48,11 +48,13 @@ use codex_core::protocol::Op;
|
||||
use codex_core::protocol::SessionSource;
|
||||
use codex_core::protocol::SkillErrorInfo;
|
||||
use codex_core::protocol::TokenUsage;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_otel::OtelManager;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::openai_models::ModelPreset;
|
||||
use codex_protocol::openai_models::ModelUpgrade;
|
||||
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::protocol::ThreadOrigin;
|
||||
use color_eyre::eyre::Result;
|
||||
use color_eyre::eyre::WrapErr;
|
||||
use crossterm::event::KeyCode;
|
||||
@@ -531,8 +533,21 @@ impl App {
|
||||
ChatWidget::new_from_existing(init, resumed.thread, resumed.session_configured)
|
||||
}
|
||||
SessionSelection::Fork(path) => {
|
||||
let parent_thread_id = read_session_meta_line(&path)
|
||||
.await
|
||||
.wrap_err_with(|| {
|
||||
let path_display = path.display();
|
||||
format!("Failed to read session metadata from {path_display}")
|
||||
})?
|
||||
.meta
|
||||
.id;
|
||||
let forked = thread_manager
|
||||
.fork_thread(usize::MAX, config.clone(), path.clone())
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
config.clone(),
|
||||
path.clone(),
|
||||
ThreadOrigin::Forked { parent_thread_id },
|
||||
)
|
||||
.await
|
||||
.wrap_err_with(|| {
|
||||
let path_display = path.display();
|
||||
@@ -836,9 +851,24 @@ impl App {
|
||||
let summary =
|
||||
session_summary(self.chat_widget.token_usage(), self.chat_widget.thread_id());
|
||||
if let Some(path) = self.chat_widget.rollout_path() {
|
||||
let parent_thread_id = match read_session_meta_line(&path).await {
|
||||
Ok(meta_line) => meta_line.meta.id,
|
||||
Err(err) => {
|
||||
let path_display = path.display();
|
||||
self.chat_widget.add_error_message(format!(
|
||||
"Failed to read session metadata from {path_display}: {err}"
|
||||
));
|
||||
return Ok(AppRunControl::Continue);
|
||||
}
|
||||
};
|
||||
match self
|
||||
.server
|
||||
.fork_thread(usize::MAX, self.config.clone(), path.clone())
|
||||
.fork_thread(
|
||||
usize::MAX,
|
||||
self.config.clone(),
|
||||
path.clone(),
|
||||
ThreadOrigin::Forked { parent_thread_id },
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(forked) => {
|
||||
|
||||
Reference in New Issue
Block a user