feat: ephemeral threads (#9765)

Add ephemeral threads capabilities. Only exposed through the
`app-server` v2

The idea is to disable the rollout recorder for those threads.
This commit is contained in:
jif-oai
2026-01-24 15:57:40 +01:00
committed by GitHub
parent 515ac2cd19
commit 83775f4df1
30 changed files with 343 additions and 166 deletions

View File

@@ -1006,7 +1006,15 @@ pub(crate) async fn apply_bespoke_event_handling(
};
if let Some(request_id) = pending {
let rollout_path = conversation.rollout_path();
let Some(rollout_path) = conversation.rollout_path() else {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "thread has no persisted rollout".to_string(),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
};
let response = match read_summary_from_rollout(
rollout_path.as_path(),
fallback_model_provider.as_str(),

View File

@@ -134,6 +134,7 @@ use codex_core::InitialHistory;
use codex_core::NewThread;
use codex_core::RolloutRecorder;
use codex_core::SessionMeta;
use codex_core::ThreadConfigSnapshot;
use codex_core::ThreadManager;
use codex_core::ThreadSortKey as CoreThreadSortKey;
use codex_core::auth::CLIENT_ID;
@@ -172,6 +173,7 @@ use codex_protocol::config_types::ForcedLoginMethod;
use codex_protocol::config_types::Personality;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentStatus;
use codex_protocol::protocol::GitInfo as CoreGitInfo;
use codex_protocol::protocol::McpAuthStatus as CoreMcpAuthStatus;
use codex_protocol::protocol::McpServerRefreshConfig;
@@ -192,7 +194,6 @@ use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::broadcast;
use tokio::sync::oneshot;
@@ -1378,11 +1379,23 @@ impl CodexMessageProcessor {
session_configured,
..
} = new_thread;
let rollout_path = match session_configured.rollout_path {
Some(path) => path,
None => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "rollout path missing for v1 conversation".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let response = NewConversationResponse {
conversation_id: thread_id,
model: session_configured.model,
reasoning_effort: session_configured.reasoning_effort,
rollout_path: session_configured.rollout_path,
rollout_path,
};
self.outgoing.send_response(request_id, response).await;
}
@@ -1398,7 +1411,7 @@ impl CodexMessageProcessor {
}
async fn thread_start(&mut self, request_id: RequestId, params: ThreadStartParams) {
let typesafe_overrides = self.build_thread_config_overrides(
let mut typesafe_overrides = self.build_thread_config_overrides(
params.model,
params.model_provider,
params.cwd,
@@ -1408,6 +1421,7 @@ impl CodexMessageProcessor {
params.developer_instructions,
params.personality,
);
typesafe_overrides.ephemeral = Some(params.ephemeral.unwrap_or_default());
let config =
match derive_config_from_params(&self.cli_overrides, params.config, typesafe_overrides)
@@ -1429,50 +1443,45 @@ impl CodexMessageProcessor {
Ok(new_conv) => {
let NewThread {
thread_id,
thread,
session_configured,
..
} = new_conv;
let rollout_path = session_configured.rollout_path.clone();
let config_snapshot = thread.config_snapshot().await;
let fallback_provider = self.config.model_provider_id.as_str();
// A bit hacky, but the summary contains a lot of useful information for the thread
// that unfortunately does not get returned from thread_manager.start_thread().
let thread = match read_summary_from_rollout(
rollout_path.as_path(),
fallback_provider,
)
.await
{
Ok(summary) => summary_to_thread(summary),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
)
.await;
return;
let thread = match session_configured.rollout_path.as_ref() {
Some(rollout_path) => {
match read_summary_from_rollout(rollout_path.as_path(), fallback_provider)
.await
{
Ok(summary) => summary_to_thread(summary),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
),
)
.await;
return;
}
}
}
None => build_ephemeral_thread(thread_id, &config_snapshot),
};
let SessionConfiguredEvent {
model,
model_provider_id,
cwd,
approval_policy,
sandbox_policy,
..
} = session_configured;
let response = ThreadStartResponse {
thread: thread.clone(),
model,
model_provider: model_provider_id,
cwd,
approval_policy: approval_policy.into(),
sandbox: sandbox_policy.into(),
reasoning_effort: session_configured.reasoning_effort,
model: config_snapshot.model,
model_provider: config_snapshot.model_provider_id,
cwd: config_snapshot.cwd,
approval_policy: config_snapshot.approval_policy.into(),
sandbox: config_snapshot.sandbox_policy.into(),
reasoning_effort: config_snapshot.reasoning_effort,
};
// Auto-attach a thread listener when starting a thread.
@@ -1725,7 +1734,7 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn thread_read(&self, request_id: RequestId, params: ThreadReadParams) {
async fn thread_read(&mut self, request_id: RequestId, params: ThreadReadParams) {
let ThreadReadParams {
thread_id,
include_turns,
@@ -1744,15 +1753,8 @@ impl CodexMessageProcessor {
match find_thread_path_by_id_str(&self.config.codex_home, &thread_uuid.to_string())
.await
{
Ok(Some(path)) => path,
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("no rollout found for thread id {thread_uuid}"),
)
.await;
return;
}
Ok(Some(path)) => Some(path),
Ok(None) => None,
Err(err) => {
self.send_invalid_request_error(
request_id,
@@ -1763,24 +1765,45 @@ impl CodexMessageProcessor {
}
};
let fallback_provider = self.config.model_provider_id.as_str();
let mut thread = match read_summary_from_rollout(&rollout_path, fallback_provider).await {
Ok(summary) => summary_to_thread(summary),
Err(err) => {
self.send_internal_error(
let mut thread = if let Some(rollout_path) = rollout_path.as_ref() {
let fallback_provider = self.config.model_provider_id.as_str();
match read_summary_from_rollout(rollout_path, fallback_provider).await {
Ok(summary) => summary_to_thread(summary),
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
)
.await;
return;
}
}
} else {
let Ok(thread) = self.thread_manager.get_thread(thread_uuid).await else {
self.send_invalid_request_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
format!("thread not loaded: {thread_uuid}"),
)
.await;
return;
};
let config_snapshot = thread.config_snapshot().await;
if include_turns {
self.send_invalid_request_error(
request_id,
"ephemeral threads do not support includeTurns".to_string(),
)
.await;
return;
}
build_ephemeral_thread(thread_uuid, &config_snapshot)
};
if include_turns {
match read_event_msgs_from_rollout(&rollout_path).await {
if include_turns && let Some(rollout_path) = rollout_path.as_ref() {
match read_event_msgs_from_rollout(rollout_path).await {
Ok(events) => {
thread.turns = build_turns_from_event_msgs(&events);
}
@@ -1967,6 +1990,14 @@ impl CodexMessageProcessor {
initial_messages,
..
} = session_configured;
let Some(rollout_path) = rollout_path else {
self.send_internal_error(
request_id,
format!("rollout path missing for thread {thread_id}"),
)
.await;
return;
};
// Auto-attach a thread listener when resuming a thread.
if let Err(err) = self
.attach_conversation_listener(thread_id, false, ApiVersion::V2)
@@ -2170,6 +2201,14 @@ impl CodexMessageProcessor {
initial_messages,
..
} = session_configured;
let Some(rollout_path) = rollout_path else {
self.send_internal_error(
request_id,
format!("rollout path missing for thread {thread_id}"),
)
.await;
return;
};
// Auto-attach a conversation listener when forking a thread.
if let Err(err) = self
.attach_conversation_listener(thread_id, false, ApiVersion::V2)
@@ -2919,6 +2958,18 @@ impl CodexMessageProcessor {
session_configured,
..
}) => {
let rollout_path = match session_configured.rollout_path.clone() {
Some(path) => path,
None => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "rollout path missing for resumed conversation".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
self.outgoing
.send_server_notification(ServerNotification::SessionConfigured(
SessionConfiguredNotification {
@@ -2928,7 +2979,7 @@ impl CodexMessageProcessor {
history_log_id: session_configured.history_log_id,
history_entry_count: session_configured.history_entry_count,
initial_messages: session_configured.initial_messages.clone(),
rollout_path: session_configured.rollout_path.clone(),
rollout_path: rollout_path.clone(),
},
))
.await;
@@ -2941,7 +2992,7 @@ impl CodexMessageProcessor {
conversation_id: thread_id,
model: session_configured.model.clone(),
initial_messages,
rollout_path: session_configured.rollout_path.clone(),
rollout_path,
};
self.outgoing.send_response(request_id, response).await;
}
@@ -3117,6 +3168,19 @@ impl CodexMessageProcessor {
}
};
let rollout_path = match session_configured.rollout_path.clone() {
Some(path) => path,
None => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "rollout path missing for forked conversation".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
self.outgoing
.send_server_notification(ServerNotification::SessionConfigured(
SessionConfiguredNotification {
@@ -3126,7 +3190,7 @@ impl CodexMessageProcessor {
history_log_id: session_configured.history_log_id,
history_entry_count: session_configured.history_entry_count,
initial_messages: session_configured.initial_messages.clone(),
rollout_path: session_configured.rollout_path.clone(),
rollout_path: rollout_path.clone(),
},
))
.await;
@@ -3139,7 +3203,7 @@ impl CodexMessageProcessor {
conversation_id: thread_id,
model: session_configured.model.clone(),
initial_messages,
rollout_path: session_configured.rollout_path.clone(),
rollout_path,
};
self.outgoing.send_response(request_id, response).await;
}
@@ -3250,51 +3314,27 @@ impl CodexMessageProcessor {
// If the thread is active, request shutdown and wait briefly.
if let Some(conversation) = self.thread_manager.remove_thread(&thread_id).await {
info!("thread {thread_id} was active; shutting down");
let conversation_clone = conversation.clone();
let notify = Arc::new(tokio::sync::Notify::new());
let notify_clone = notify.clone();
// Establish the listener for ShutdownComplete before submitting
// Shutdown so it is not missed.
let is_shutdown = tokio::spawn(async move {
// Create the notified future outside the loop to avoid losing notifications.
let notified = notify_clone.notified();
tokio::pin!(notified);
loop {
select! {
_ = &mut notified => { break; }
event = conversation_clone.next_event() => {
match event {
Ok(event) => {
if matches!(event.msg, EventMsg::ShutdownComplete) { break; }
}
// Break on errors to avoid tight loops when the agent loop has exited.
Err(_) => { break; }
}
}
}
}
});
// Request shutdown.
match conversation.submit(Op::Shutdown).await {
Ok(_) => {
// Successfully submitted Shutdown; wait before proceeding.
select! {
_ = is_shutdown => {
// Normal shutdown: proceed with archive.
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("thread {thread_id} shutdown timed out; proceeding with archive");
// Wake any waiter; use notify_waiters to avoid missing the signal.
notify.notify_waiters();
// Perhaps we lost a shutdown race, so let's continue to
// clean up the .jsonl file.
// Poll agent status rather than consuming events so attached listeners do not block shutdown.
let wait_for_shutdown = async {
loop {
if matches!(conversation.agent_status().await, AgentStatus::Shutdown) {
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
};
if tokio::time::timeout(Duration::from_secs(10), wait_for_shutdown)
.await
.is_err()
{
warn!("thread {thread_id} shutdown timed out; proceeding with archive");
}
}
Err(err) => {
error!("failed to submit Shutdown to thread {thread_id}: {err}");
notify.notify_waiters();
}
}
}
@@ -3809,23 +3849,29 @@ impl CodexMessageProcessor {
);
}
let rollout_path = review_thread.rollout_path();
let fallback_provider = self.config.model_provider_id.as_str();
match read_summary_from_rollout(rollout_path.as_path(), fallback_provider).await {
Ok(summary) => {
let thread = summary_to_thread(summary);
let notif = ThreadStartedNotification { thread };
self.outgoing
.send_server_notification(ServerNotification::ThreadStarted(notif))
.await;
}
Err(err) => {
tracing::warn!(
"failed to load summary for review thread {}: {}",
session_configured.session_id,
err
);
if let Some(rollout_path) = review_thread.rollout_path() {
match read_summary_from_rollout(rollout_path.as_path(), fallback_provider).await {
Ok(summary) => {
let thread = summary_to_thread(summary);
let notif = ThreadStartedNotification { thread };
self.outgoing
.send_server_notification(ServerNotification::ThreadStarted(notif))
.await;
}
Err(err) => {
tracing::warn!(
"failed to load summary for review thread {}: {}",
session_configured.session_id,
err
);
}
}
} else {
tracing::warn!(
"review thread {} has no rollout path",
session_configured.session_id
);
}
let turn_id = review_thread
@@ -4228,7 +4274,7 @@ impl CodexMessageProcessor {
async fn resolve_rollout_path(&self, conversation_id: ThreadId) -> Option<PathBuf> {
match self.thread_manager.get_thread(conversation_id).await {
Ok(conv) => Some(conv.rollout_path()),
Ok(conv) => conv.rollout_path(),
Err(_) => None,
}
}
@@ -4493,6 +4539,23 @@ async fn read_updated_at(path: &Path, created_at: Option<&str>) -> Option<String
updated_at.or_else(|| created_at.map(str::to_string))
}
fn build_ephemeral_thread(thread_id: ThreadId, config_snapshot: &ThreadConfigSnapshot) -> Thread {
let now = time::OffsetDateTime::now_utc().unix_timestamp();
Thread {
id: thread_id.to_string(),
preview: String::new(),
model_provider: config_snapshot.model_provider_id.clone(),
created_at: now,
updated_at: now,
path: None,
cwd: config_snapshot.cwd.clone(),
cli_version: env!("CARGO_PKG_VERSION").to_string(),
source: config_snapshot.session_source.clone().into(),
git_info: None,
turns: Vec::new(),
}
}
pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
let ConversationSummary {
conversation_id,
@@ -4521,7 +4584,7 @@ pub(crate) fn summary_to_thread(summary: ConversationSummary) -> Thread {
model_provider,
created_at: created_at.map(|dt| dt.timestamp()).unwrap_or(0),
updated_at: updated_at.map(|dt| dt.timestamp()).unwrap_or(0),
path,
path: Some(path),
cwd,
cli_version,
source: source.into(),