Add ephemeral flag support to thread fork (#14248)

### Summary
This PR adds first-class ephemeral support to thread/fork, bringing it
in line with thread/start. The goal is to support one-off completions on
full forked threads without persisting them as normal user-visible
threads.

### Testing
This commit is contained in:
joeytrasatti-openai
2026-03-10 16:34:27 -07:00
committed by Michael Bolin
parent 07c22d20f6
commit 8ac27b2a16
9 changed files with 290 additions and 82 deletions

View File

@@ -3666,9 +3666,9 @@ impl CodexMessageProcessor {
thread.id = thread_id.to_string();
thread.path = Some(rollout_path.to_path_buf());
let history_items = thread_history.get_rollout_items();
if let Err(message) = populate_resume_turns(
if let Err(message) = populate_thread_turns(
&mut thread,
ResumeTurnSource::HistoryItems(&history_items),
ThreadTurnSource::HistoryItems(&history_items),
None,
)
.await
@@ -3704,6 +3704,7 @@ impl CodexMessageProcessor {
config: cli_overrides,
base_instructions,
developer_instructions,
ephemeral,
persist_extended_history,
} = params;
@@ -3713,12 +3714,11 @@ impl CodexMessageProcessor {
let existing_thread_id = match ThreadId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid thread id: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.send_invalid_request_error(
request_id,
format!("invalid thread id: {err}"),
)
.await;
return;
}
};
@@ -3775,7 +3775,7 @@ impl CodexMessageProcessor {
} else {
Some(cli_overrides)
};
let typesafe_overrides = self.build_thread_config_overrides(
let mut typesafe_overrides = self.build_thread_config_overrides(
model,
model_provider,
service_tier,
@@ -3786,6 +3786,7 @@ impl CodexMessageProcessor {
developer_instructions,
None,
);
typesafe_overrides.ephemeral = ephemeral.then_some(true);
// Derive a Config using the same logic as new conversation, honoring overrides if provided.
let cloud_requirements = self.current_cloud_requirements();
let config = match derive_config_for_cwd(
@@ -3799,12 +3800,11 @@ impl CodexMessageProcessor {
{
Ok(config) => config,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("error deriving config: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.send_invalid_request_error(
request_id,
format!("error deriving config: {err}"),
)
.await;
return;
}
};
@@ -3813,6 +3813,7 @@ impl CodexMessageProcessor {
let NewThread {
thread_id,
thread: forked_thread,
session_configured,
..
} = match self
@@ -3827,33 +3828,29 @@ impl CodexMessageProcessor {
{
Ok(thread) => thread,
Err(err) => {
let (code, message) = match err {
CodexErr::Io(_) | CodexErr::Json(_) => (
INVALID_REQUEST_ERROR_CODE,
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
),
CodexErr::InvalidRequest(message) => (INVALID_REQUEST_ERROR_CODE, message),
_ => (INTERNAL_ERROR_CODE, format!("error forking thread: {err}")),
};
let error = JSONRPCErrorError {
code,
message,
data: None,
};
self.outgoing.send_error(request_id, error).await;
match err {
CodexErr::Io(_) | CodexErr::Json(_) => {
self.send_invalid_request_error(
request_id,
format!("failed to load rollout `{}`: {err}", rollout_path.display()),
)
.await;
}
CodexErr::InvalidRequest(message) => {
self.send_invalid_request_error(request_id, message).await;
}
_ => {
self.send_internal_error(
request_id,
format!("error forking thread: {err}"),
)
.await;
}
}
return;
}
};
let SessionConfiguredEvent { rollout_path, .. } = session_configured;
let Some(rollout_path) = rollout_path else {
self.send_internal_error(
request_id,
format!("rollout path missing for thread {thread_id}"),
)
.await;
return;
};
// Auto-attach a conversation listener when forking a thread.
Self::log_listener_attach_result(
self.ensure_conversation_listener(
@@ -3868,41 +3865,71 @@ impl CodexMessageProcessor {
"thread",
);
let mut thread = match read_summary_from_rollout(
rollout_path.as_path(),
fallback_model_provider.as_str(),
)
.await
{
Ok(summary) => summary_to_thread(summary),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
)
.await;
// Persistent forks materialize their own rollout immediately. Ephemeral forks stay
// pathless, so they rebuild their visible history from the copied source rollout instead.
let mut thread = if let Some(fork_rollout_path) = session_configured.rollout_path.as_ref() {
match read_summary_from_rollout(
fork_rollout_path.as_path(),
fallback_model_provider.as_str(),
)
.await
{
Ok(summary) => summary_to_thread(summary),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
fork_rollout_path.display()
),
)
.await;
return;
}
}
} else {
let config_snapshot = forked_thread.config_snapshot().await;
// forked thread names do not inherit the source thread name
let mut thread = build_thread_from_snapshot(thread_id, &config_snapshot, None);
let history_items = match read_rollout_items_from_rollout(rollout_path.as_path()).await
{
Ok(items) => items,
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load source rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
)
.await;
return;
}
};
thread.preview = preview_from_rollout_items(&history_items);
if let Err(message) = populate_thread_turns(
&mut thread,
ThreadTurnSource::HistoryItems(&history_items),
None,
)
.await
{
self.send_internal_error(request_id, message).await;
return;
}
thread
};
// forked thread names do not inherit the source thread name
match read_rollout_items_from_rollout(rollout_path.as_path()).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
)
.await;
return;
}
if let Some(fork_rollout_path) = session_configured.rollout_path.as_ref()
&& let Err(message) = populate_thread_turns(
&mut thread,
ThreadTurnSource::RolloutPath(fork_rollout_path.as_path()),
None,
)
.await
{
self.send_internal_error(request_id, message).await;
return;
}
self.thread_watch_manager
@@ -6990,9 +7017,9 @@ async fn handle_pending_thread_resume_request(
let request_id = pending.request_id;
let connection_id = request_id.connection_id;
let mut thread = pending.thread_summary;
if let Err(message) = populate_resume_turns(
if let Err(message) = populate_thread_turns(
&mut thread,
ResumeTurnSource::RolloutPath(pending.rollout_path.as_path()),
ThreadTurnSource::RolloutPath(pending.rollout_path.as_path()),
active_turn.as_ref(),
)
.await
@@ -7054,18 +7081,18 @@ async fn handle_pending_thread_resume_request(
.await;
}
enum ResumeTurnSource<'a> {
enum ThreadTurnSource<'a> {
RolloutPath(&'a Path),
HistoryItems(&'a [RolloutItem]),
}
async fn populate_resume_turns(
async fn populate_thread_turns(
thread: &mut Thread,
turn_source: ResumeTurnSource<'_>,
turn_source: ThreadTurnSource<'_>,
active_turn: Option<&Turn>,
) -> std::result::Result<(), String> {
let mut turns = match turn_source {
ResumeTurnSource::RolloutPath(rollout_path) => {
ThreadTurnSource::RolloutPath(rollout_path) => {
read_rollout_items_from_rollout(rollout_path)
.await
.map(|items| build_turns_from_rollout_items(&items))
@@ -7077,7 +7104,7 @@ async fn populate_resume_turns(
)
})?
}
ResumeTurnSource::HistoryItems(items) => build_turns_from_rollout_items(items),
ThreadTurnSource::HistoryItems(items) => build_turns_from_rollout_items(items),
};
if let Some(active_turn) = active_turn {
merge_turn_history_with_active_turn(&mut turns, active_turn.clone());