Compare commits

...

10 Commits

Author SHA1 Message Date
jif-oai
00538f2ed9 Merge branch 'main' into jif/enable-parallel-tc 2025-10-27 13:41:20 +00:00
jif-oai
81be54b229 fix: test yield time (#5811) 2025-10-27 11:57:29 +00:00
jif-oai
5e8659dcbc chore: undo nits (#5631) 2025-10-27 11:48:01 +00:00
jif-oai
2338294b39 nit: doc on session task (#5809) 2025-10-27 11:43:33 +00:00
jif-oai
afc4eaab8b feat: TUI undo op (#5629) 2025-10-27 10:55:29 +00:00
jif-oai
e92c4f6561 feat: async ghost commit (#5618) 2025-10-27 10:09:10 +00:00
Michael Bolin
15fa2283e7 feat: update NewConversationParams to take an optional model_provider (#5793)
An AppServer client should be able to use any (`model_provider`, `model`) in the user's config. `NewConversationParams` already supported specifying the `model`, but this PR expands it to support `model_provider`, as well.

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/5793).
* #5803
* __->__ #5793
2025-10-27 09:33:30 +00:00
Michael Bolin
5907422d65 feat: annotate conversations with model_provider for filtering (#5658)
Because conversations that use the Responses API can have encrypted
reasoning messages, trying to resume a conversation with a different
provider could lead to confusing "failed to decrypt" errors. (This is
reproducible by starting a conversation using ChatGPT login and resuming
it as a conversation that uses OpenAI models via Azure.)

This changes `ListConversationsParams` to take a `model_providers:
Option<Vec<String>>` and adds `model_provider` on each
`ConversationSummary` it returns so these cases can be disambiguated.

Note this ended up making changes to
`codex-rs/core/src/rollout/tests.rs` because it had a number of cases
where it expected `Some` for the value of `next_cursor`, but the list of
rollouts was complete, so according to this docstring:


bcd64c7e72/codex-rs/app-server-protocol/src/protocol.rs (L334-L337)

If there are no more items to return, then `next_cursor` should be
`None`. This PR updates that logic.






---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/5658).
* #5803
* #5793
* __->__ #5658
2025-10-27 02:03:30 -07:00
Ahmed Ibrahim
f178805252 Add feedback upload request handling (#5682) 2025-10-27 05:53:39 +00:00
jif-oai
ce5c0b6306 Enable parallel tc 2025-10-23 17:23:51 +01:00
55 changed files with 1740 additions and 355 deletions

20
codex-rs/Cargo.lock generated
View File

@@ -843,6 +843,7 @@ dependencies = [
"codex-backend-client",
"codex-common",
"codex-core",
"codex-feedback",
"codex-file-search",
"codex-login",
"codex-protocol",
@@ -1062,10 +1063,12 @@ dependencies = [
"codex-apply-patch",
"codex-async-utils",
"codex-file-search",
"codex-git-tooling",
"codex-otel",
"codex-protocol",
"codex-rmcp-client",
"codex-utils-pty",
"codex-utils-readiness",
"codex-utils-string",
"codex-utils-tokenizer",
"core-foundation 0.9.4",
@@ -1209,8 +1212,11 @@ version = "0.0.0"
dependencies = [
"assert_matches",
"pretty_assertions",
"schemars 0.8.22",
"serde",
"tempfile",
"thiserror 2.0.16",
"ts-rs",
"walkdir",
]
@@ -1328,6 +1334,7 @@ version = "0.0.0"
dependencies = [
"anyhow",
"base64",
"codex-git-tooling",
"icu_decimal",
"icu_locale_core",
"mcp-types",
@@ -1428,7 +1435,6 @@ dependencies = [
"codex-core",
"codex-feedback",
"codex-file-search",
"codex-git-tooling",
"codex-login",
"codex-ollama",
"codex-protocol",
@@ -5458,9 +5464,9 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.226"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0dca6411025b24b60bfa7ec1fe1f8e710ac09782dca409ee8237ba74b51295fd"
checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e"
dependencies = [
"serde_core",
"serde_derive",
@@ -5468,18 +5474,18 @@ dependencies = [
[[package]]
name = "serde_core"
version = "1.0.226"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba2ba63999edb9dac981fb34b3e5c0d111a69b0924e253ed29d83f7c99e966a4"
checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.226"
version = "1.0.228"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8db53ae22f34573731bafa1db20f04027b2d25e02d8205921b569171699cdb33"
checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79"
dependencies = [
"proc-macro2",
"quote",

View File

@@ -124,6 +124,13 @@ client_request_definitions! {
response: GetAccountRateLimitsResponse,
},
#[serde(rename = "feedback/upload")]
#[ts(rename = "feedback/upload")]
UploadFeedback {
params: UploadFeedbackParams,
response: UploadFeedbackResponse,
},
#[serde(rename = "account/read")]
#[ts(rename = "account/read")]
GetAccount {
@@ -259,6 +266,10 @@ pub struct NewConversationParams {
#[serde(skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
/// Override the model provider to use for this session.
#[serde(skip_serializing_if = "Option::is_none")]
pub model_provider: Option<String>,
/// Configuration profile from config.toml to specify default options.
#[serde(skip_serializing_if = "Option::is_none")]
pub profile: Option<String>,
@@ -320,6 +331,12 @@ pub struct ListConversationsParams {
/// Opaque pagination cursor returned by a previous call.
#[serde(skip_serializing_if = "Option::is_none")]
pub cursor: Option<String>,
/// Optional model provider filter (matches against session metadata).
/// - None => filter by the server's default model provider
/// - Some([]) => no filtering, include all providers
/// - Some([...]) => only include sessions with one of the specified providers
#[serde(skip_serializing_if = "Option::is_none")]
pub model_providers: Option<Vec<String>>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@@ -331,6 +348,8 @@ pub struct ConversationSummary {
/// RFC3339 timestamp string for the session start, if available.
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<String>,
/// Model provider recorded for the session (resolved when absent in metadata).
pub model_provider: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
@@ -384,6 +403,23 @@ pub struct ListModelsResponse {
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct UploadFeedbackParams {
pub classification: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub conversation_id: Option<ConversationId>,
pub include_logs: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct UploadFeedbackResponse {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(tag = "type")]
#[ts(tag = "type")]
@@ -1000,6 +1036,7 @@ mod tests {
request_id: RequestId::Integer(42),
params: NewConversationParams {
model: Some("gpt-5-codex".to_string()),
model_provider: None,
profile: None,
cwd: None,
approval_policy: Some(AskForApproval::OnRequest),

View File

@@ -24,6 +24,7 @@ codex-file-search = { workspace = true }
codex-login = { workspace = true }
codex-protocol = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-feedback = { workspace = true }
codex-utils-json-to-toml = { workspace = true }
chrono = { workspace = true }
serde = { workspace = true, features = ["derive"] }

View File

@@ -52,6 +52,8 @@ use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::SessionConfiguredNotification;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::SetDefaultModelResponse;
use codex_app_server_protocol::UploadFeedbackParams;
use codex_app_server_protocol::UploadFeedbackResponse;
use codex_app_server_protocol::UserInfoResponse;
use codex_app_server_protocol::UserSavedConfig;
use codex_backend_client::Client as BackendClient;
@@ -85,6 +87,7 @@ use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecApprovalRequestEvent;
use codex_core::protocol::Op;
use codex_core::protocol::ReviewDecision;
use codex_feedback::CodexFeedback;
use codex_login::ServerOptions as LoginServerOptions;
use codex_login::ShutdownHandle;
use codex_login::run_login_server;
@@ -136,6 +139,7 @@ pub(crate) struct CodexMessageProcessor {
// Queue of pending interrupt requests per conversation. We reply when TurnAborted arrives.
pending_interrupts: Arc<Mutex<HashMap<ConversationId, Vec<RequestId>>>>,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
feedback: CodexFeedback,
}
impl CodexMessageProcessor {
@@ -145,6 +149,7 @@ impl CodexMessageProcessor {
outgoing: Arc<OutgoingMessageSender>,
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
feedback: CodexFeedback,
) -> Self {
Self {
auth_manager,
@@ -156,6 +161,7 @@ impl CodexMessageProcessor {
active_login: Arc::new(Mutex::new(None)),
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
feedback,
}
}
@@ -275,6 +281,9 @@ impl CodexMessageProcessor {
} => {
self.get_account_rate_limits(request_id).await;
}
ClientRequest::UploadFeedback { request_id, params } => {
self.upload_feedback(request_id, params).await;
}
}
}
@@ -818,19 +827,38 @@ impl CodexMessageProcessor {
request_id: RequestId,
params: ListConversationsParams,
) {
let page_size = params.page_size.unwrap_or(25);
let ListConversationsParams {
page_size,
cursor,
model_providers: model_provider,
} = params;
let page_size = page_size.unwrap_or(25);
// Decode the optional cursor string to a Cursor via serde (Cursor implements Deserialize from string)
let cursor_obj: Option<RolloutCursor> = match params.cursor {
let cursor_obj: Option<RolloutCursor> = match cursor {
Some(s) => serde_json::from_str::<RolloutCursor>(&format!("\"{s}\"")).ok(),
None => None,
};
let cursor_ref = cursor_obj.as_ref();
let model_provider_filter = match model_provider {
Some(providers) => {
if providers.is_empty() {
None
} else {
Some(providers)
}
}
None => Some(vec![self.config.model_provider_id.clone()]),
};
let model_provider_slice = model_provider_filter.as_deref();
let fallback_provider = self.config.model_provider_id.clone();
let page = match RolloutRecorder::list_conversations(
&self.config.codex_home,
page_size,
cursor_ref,
INTERACTIVE_SESSION_SOURCES,
model_provider_slice,
fallback_provider.as_str(),
)
.await
{
@@ -849,7 +877,7 @@ impl CodexMessageProcessor {
let items = page
.items
.into_iter()
.filter_map(|it| extract_conversation_summary(it.path, &it.head))
.filter_map(|it| extract_conversation_summary(it.path, &it.head, &fallback_provider))
.collect();
// Encode next_cursor as a plain string
@@ -1418,6 +1446,77 @@ impl CodexMessageProcessor {
let response = FuzzyFileSearchResponse { files: results };
self.outgoing.send_response(request_id, response).await;
}
async fn upload_feedback(&self, request_id: RequestId, params: UploadFeedbackParams) {
let UploadFeedbackParams {
classification,
reason,
conversation_id,
include_logs,
} = params;
let snapshot = self.feedback.snapshot(conversation_id);
let thread_id = snapshot.thread_id.clone();
let validated_rollout_path = if include_logs {
match conversation_id {
Some(conv_id) => self.resolve_rollout_path(conv_id).await,
None => None,
}
} else {
None
};
let upload_result = tokio::task::spawn_blocking(move || {
let rollout_path_ref = validated_rollout_path.as_deref();
snapshot.upload_feedback(
&classification,
reason.as_deref(),
include_logs,
rollout_path_ref,
)
})
.await;
let upload_result = match upload_result {
Ok(result) => result,
Err(join_err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to upload feedback: {join_err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
match upload_result {
Ok(()) => {
let response = UploadFeedbackResponse { thread_id };
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to upload feedback: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
}
async fn resolve_rollout_path(&self, conversation_id: ConversationId) -> Option<PathBuf> {
match self
.conversation_manager
.get_conversation(conversation_id)
.await
{
Ok(conv) => Some(conv.rollout_path()),
Err(_) => None,
}
}
}
async fn apply_bespoke_event_handling(
@@ -1511,6 +1610,7 @@ async fn derive_config_from_params(
) -> std::io::Result<Config> {
let NewConversationParams {
model,
model_provider,
profile,
cwd,
approval_policy,
@@ -1526,7 +1626,7 @@ async fn derive_config_from_params(
cwd: cwd.map(PathBuf::from),
approval_policy,
sandbox_mode,
model_provider: None,
model_provider,
codex_linux_sandbox_exe,
base_instructions,
include_apply_patch_tool,
@@ -1627,6 +1727,7 @@ async fn on_exec_approval_response(
fn extract_conversation_summary(
path: PathBuf,
head: &[serde_json::Value],
fallback_provider: &str,
) -> Option<ConversationSummary> {
let session_meta = match head.first() {
Some(first_line) => serde_json::from_value::<SessionMeta>(first_line.clone()).ok()?,
@@ -1651,12 +1752,17 @@ fn extract_conversation_summary(
} else {
Some(session_meta.timestamp.clone())
};
let conversation_id = session_meta.id;
let model_provider = session_meta
.model_provider
.unwrap_or_else(|| fallback_provider.to_string());
Some(ConversationSummary {
conversation_id: session_meta.id,
conversation_id,
timestamp,
path,
preview: preview.to_string(),
model_provider,
})
}
@@ -1680,7 +1786,8 @@ mod tests {
"cwd": "/",
"originator": "codex",
"cli_version": "0.0.0",
"instructions": null
"instructions": null,
"model_provider": "test-provider"
}),
json!({
"type": "message",
@@ -1700,7 +1807,8 @@ mod tests {
}),
];
let summary = extract_conversation_summary(path.clone(), &head).expect("summary");
let summary =
extract_conversation_summary(path.clone(), &head, "test-provider").expect("summary");
assert_eq!(summary.conversation_id, conversation_id);
assert_eq!(
@@ -1709,6 +1817,7 @@ mod tests {
);
assert_eq!(summary.path, path);
assert_eq!(summary.preview, "Count to 5");
assert_eq!(summary.model_provider, "test-provider");
Ok(())
}
}

View File

@@ -12,16 +12,19 @@ use crate::message_processor::MessageProcessor;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use codex_app_server_protocol::JSONRPCMessage;
use codex_feedback::CodexFeedback;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::io::{self};
use tokio::sync::mpsc;
use tracing::Level;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::Layer;
use tracing_subscriber::filter::Targets;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
@@ -82,6 +85,8 @@ pub async fn run_main(
std::io::Error::new(ErrorKind::InvalidData, format!("error loading config: {e}"))
})?;
let feedback = CodexFeedback::new();
let otel =
codex_core::otel_init::build_provider(&config, env!("CARGO_PKG_VERSION")).map_err(|e| {
std::io::Error::new(
@@ -96,8 +101,15 @@ pub async fn run_main(
.with_writer(std::io::stderr)
.with_filter(EnvFilter::from_default_env());
let feedback_layer = tracing_subscriber::fmt::layer()
.with_writer(feedback.make_writer())
.with_ansi(false)
.with_target(false)
.with_filter(Targets::new().with_default(Level::TRACE));
let _ = tracing_subscriber::registry()
.with(stderr_fmt)
.with(feedback_layer)
.with(otel.as_ref().map(|provider| {
OpenTelemetryTracingBridge::new(&provider.logger).with_filter(
tracing_subscriber::filter::filter_fn(codex_core::otel_init::codex_export_filter),
@@ -112,6 +124,7 @@ pub async fn run_main(
outgoing_message_sender,
codex_linux_sandbox_exe,
std::sync::Arc::new(config),
feedback.clone(),
);
async move {
while let Some(msg) = incoming_rx.recv().await {

View File

@@ -17,6 +17,7 @@ use codex_core::ConversationManager;
use codex_core::config::Config;
use codex_core::default_client::USER_AGENT_SUFFIX;
use codex_core::default_client::get_codex_user_agent;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use std::sync::Arc;
@@ -33,6 +34,7 @@ impl MessageProcessor {
outgoing: OutgoingMessageSender,
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
feedback: CodexFeedback,
) -> Self {
let outgoing = Arc::new(outgoing);
let auth_manager = AuthManager::shared(config.codex_home.clone(), false);
@@ -46,6 +48,7 @@ impl MessageProcessor {
outgoing.clone(),
codex_linux_sandbox_exe,
config,
feedback,
);
Self {

View File

@@ -30,6 +30,7 @@ use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::UploadFeedbackParams;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
@@ -242,6 +243,15 @@ impl McpProcess {
self.send_request("account/rateLimits/read", None).await
}
/// Send a `feedback/upload` JSON-RPC request.
pub async fn send_upload_feedback_request(
&mut self,
params: UploadFeedbackParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("feedback/upload", params).await
}
/// Send a `userInfo` JSON-RPC request.
pub async fn send_user_info_request(&mut self) -> anyhow::Result<i64> {
self.send_request("userInfo", None).await

View File

@@ -30,18 +30,21 @@ async fn test_list_and_resume_conversations() {
"2025-01-02T12-00-00",
"2025-01-02T12:00:00Z",
"Hello A",
Some("openai"),
);
create_fake_rollout(
codex_home.path(),
"2025-01-01T13-00-00",
"2025-01-01T13:00:00Z",
"Hello B",
Some("openai"),
);
create_fake_rollout(
codex_home.path(),
"2025-01-01T12-00-00",
"2025-01-01T12:00:00Z",
"Hello C",
None,
);
let mut mcp = McpProcess::new(codex_home.path())
@@ -57,6 +60,7 @@ async fn test_list_and_resume_conversations() {
.send_list_conversations_request(ListConversationsParams {
page_size: Some(2),
cursor: None,
model_providers: None,
})
.await
.expect("send listConversations");
@@ -74,6 +78,8 @@ async fn test_list_and_resume_conversations() {
// Newest first; preview text should match
assert_eq!(items[0].preview, "Hello A");
assert_eq!(items[1].preview, "Hello B");
assert_eq!(items[0].model_provider, "openai");
assert_eq!(items[1].model_provider, "openai");
assert!(items[0].path.is_absolute());
assert!(next_cursor.is_some());
@@ -82,6 +88,7 @@ async fn test_list_and_resume_conversations() {
.send_list_conversations_request(ListConversationsParams {
page_size: Some(2),
cursor: next_cursor,
model_providers: None,
})
.await
.expect("send listConversations page 2");
@@ -99,7 +106,88 @@ async fn test_list_and_resume_conversations() {
} = to_response::<ListConversationsResponse>(resp2).expect("deserialize response");
assert_eq!(items2.len(), 1);
assert_eq!(items2[0].preview, "Hello C");
assert!(next2.is_some());
assert_eq!(items2[0].model_provider, "openai");
assert_eq!(next2, None);
// Add a conversation with an explicit non-OpenAI provider for filter tests.
create_fake_rollout(
codex_home.path(),
"2025-01-01T11-30-00",
"2025-01-01T11:30:00Z",
"Hello TP",
Some("test-provider"),
);
// Filtering by model provider should return only matching sessions.
let filter_req_id = mcp
.send_list_conversations_request(ListConversationsParams {
page_size: Some(10),
cursor: None,
model_providers: Some(vec!["test-provider".to_string()]),
})
.await
.expect("send listConversations filtered");
let filter_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(filter_req_id)),
)
.await
.expect("listConversations filtered timeout")
.expect("listConversations filtered resp");
let ListConversationsResponse {
items: filtered_items,
next_cursor: filtered_next,
} = to_response::<ListConversationsResponse>(filter_resp).expect("deserialize filtered");
assert_eq!(filtered_items.len(), 1);
assert_eq!(filtered_next, None);
assert_eq!(filtered_items[0].preview, "Hello TP");
assert_eq!(filtered_items[0].model_provider, "test-provider");
// Empty filter should include every session regardless of provider metadata.
let unfiltered_req_id = mcp
.send_list_conversations_request(ListConversationsParams {
page_size: Some(10),
cursor: None,
model_providers: Some(Vec::new()),
})
.await
.expect("send listConversations unfiltered");
let unfiltered_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(unfiltered_req_id)),
)
.await
.expect("listConversations unfiltered timeout")
.expect("listConversations unfiltered resp");
let ListConversationsResponse {
items: unfiltered_items,
next_cursor: unfiltered_next,
} = to_response::<ListConversationsResponse>(unfiltered_resp)
.expect("deserialize unfiltered response");
assert_eq!(unfiltered_items.len(), 4);
assert!(unfiltered_next.is_none());
let empty_req_id = mcp
.send_list_conversations_request(ListConversationsParams {
page_size: Some(10),
cursor: None,
model_providers: Some(vec!["other".to_string()]),
})
.await
.expect("send listConversations filtered empty");
let empty_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(empty_req_id)),
)
.await
.expect("listConversations filtered empty timeout")
.expect("listConversations filtered empty resp");
let ListConversationsResponse {
items: empty_items,
next_cursor: empty_next,
} = to_response::<ListConversationsResponse>(empty_resp).expect("deserialize filtered empty");
assert!(empty_items.is_empty());
assert!(empty_next.is_none());
// Now resume one of the sessions and expect a SessionConfigured notification and response.
let resume_req_id = mcp
@@ -152,7 +240,13 @@ async fn test_list_and_resume_conversations() {
assert!(!conversation_id.to_string().is_empty());
}
fn create_fake_rollout(codex_home: &Path, filename_ts: &str, meta_rfc3339: &str, preview: &str) {
fn create_fake_rollout(
codex_home: &Path,
filename_ts: &str,
meta_rfc3339: &str,
preview: &str,
model_provider: Option<&str>,
) {
let uuid = Uuid::new_v4();
// sessions/YYYY/MM/DD/ derived from filename_ts (YYYY-MM-DDThh-mm-ss)
let year = &filename_ts[0..4];
@@ -164,18 +258,22 @@ fn create_fake_rollout(codex_home: &Path, filename_ts: &str, meta_rfc3339: &str,
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
let mut lines = Vec::new();
// Meta line with timestamp (flattened meta in payload for new schema)
let mut payload = json!({
"id": uuid,
"timestamp": meta_rfc3339,
"cwd": "/",
"originator": "codex",
"cli_version": "0.0.0",
"instructions": null,
});
if let Some(provider) = model_provider {
payload["model_provider"] = json!(provider);
}
lines.push(
json!({
"timestamp": meta_rfc3339,
"type": "session_meta",
"payload": {
"id": uuid,
"timestamp": meta_rfc3339,
"cwd": "/",
"originator": "codex",
"cli_version": "0.0.0",
"instructions": null
}
"payload": payload
})
.to_string(),
);

View File

@@ -24,10 +24,12 @@ codex-apply-patch = { workspace = true }
codex-file-search = { workspace = true }
codex-otel = { workspace = true, features = ["otel"] }
codex-protocol = { workspace = true }
codex-git-tooling = { workspace = true }
codex-rmcp-client = { workspace = true }
codex-async-utils = { workspace = true }
codex-utils-string = { workspace = true }
codex-utils-pty = { workspace = true }
codex-utils-readiness = { workspace = true }
codex-utils-tokenizer = { workspace = true }
dirs = { workspace = true }
dunce = { workspace = true }

View File

@@ -76,6 +76,7 @@ pub(crate) async fn stream_chat_completions(
ResponseItem::CustomToolCall { .. } => {}
ResponseItem::CustomToolCallOutput { .. } => {}
ResponseItem::WebSearchCall { .. } => {}
ResponseItem::GhostSnapshot { .. } => {}
}
}
@@ -270,6 +271,10 @@ pub(crate) async fn stream_chat_completions(
"content": output,
}));
}
ResponseItem::GhostSnapshot { .. } => {
// Ghost snapshots annotate history but are not sent to the model.
continue;
}
ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::Other => {

View File

@@ -20,7 +20,6 @@ use async_channel::Sender;
use codex_apply_patch::ApplyPatchAction;
use codex_protocol::ConversationId;
use codex_protocol::items::TurnItem;
use codex_protocol::protocol::ConversationPathResponseEvent;
use codex_protocol::protocol::ExitedReviewModeEvent;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
@@ -105,8 +104,12 @@ use crate::state::SessionServices;
use crate::state::SessionState;
use crate::state::TaskKind;
use crate::tasks::CompactTask;
use crate::tasks::GhostSnapshotTask;
use crate::tasks::RegularTask;
use crate::tasks::ReviewTask;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
use crate::tasks::UndoTask;
use crate::tools::ToolRouter;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::parallel::ToolCallRuntime;
@@ -129,6 +132,8 @@ use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::InitialHistory;
use codex_protocol::user_input::UserInput;
use codex_utils_readiness::Readiness;
use codex_utils_readiness::ReadinessFlag;
pub mod compact;
use self::compact::build_compacted_history;
@@ -179,6 +184,7 @@ impl Codex {
sandbox_policy: config.sandbox_policy.clone(),
cwd: config.cwd.clone(),
original_config_do_not_use: Arc::clone(&config),
features: config.features.clone(),
};
// Generate a unique ID for the lifetime of this Codex session.
@@ -272,6 +278,7 @@ pub(crate) struct TurnContext {
pub(crate) is_review_mode: bool,
pub(crate) final_output_json_schema: Option<Value>,
pub(crate) codex_linux_sandbox_exe: Option<PathBuf>,
pub(crate) tool_call_gate: Arc<ReadinessFlag>,
}
impl TurnContext {
@@ -313,6 +320,9 @@ pub(crate) struct SessionConfiguration {
/// operate deterministically.
cwd: PathBuf,
/// Set of feature flags for this session
features: Features,
// TODO(pakrym): Remove config from here
original_config_do_not_use: Arc<Config>,
}
@@ -407,6 +417,7 @@ impl Session {
is_review_mode: false,
final_output_json_schema: None,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
tool_call_gate: Arc::new(ReadinessFlag::new()),
}
}
@@ -598,6 +609,19 @@ impl Session {
self.tx_event.clone()
}
/// Ensure all rollout writes are durably flushed.
pub(crate) async fn flush_rollout(&self) {
let recorder = {
let guard = self.services.rollout.lock().await;
guard.clone()
};
if let Some(rec) = recorder
&& let Err(e) = rec.flush().await
{
warn!("failed to flush rollout recorder: {e}");
}
}
fn next_internal_sub_id(&self) -> String {
let id = self
.next_internal_sub_id
@@ -612,6 +636,8 @@ impl Session {
// Build and record initial items (user instructions + environment context)
let items = self.build_initial_context(&turn_context);
self.record_conversation_items(&turn_context, &items).await;
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
self.flush_rollout().await;
}
InitialHistory::Resumed(_) | InitialHistory::Forked(_) => {
let rollout_items = conversation_history.get_rollout_items();
@@ -628,6 +654,8 @@ impl Session {
if persist && !rollout_items.is_empty() {
self.persist_rollout_items(&rollout_items).await;
}
// Flush after seeding history and any persisted rollout copy.
self.flush_rollout().await;
}
}
}
@@ -931,7 +959,7 @@ impl Session {
state.record_items(items.iter());
}
async fn replace_history(&self, items: Vec<ResponseItem>) {
pub(crate) async fn replace_history(&self, items: Vec<ResponseItem>) {
let mut state = self.state.lock().await;
state.replace_history(items);
}
@@ -1080,6 +1108,43 @@ impl Session {
self.send_event(turn_context, event).await;
}
async fn maybe_start_ghost_snapshot(
self: &Arc<Self>,
turn_context: Arc<TurnContext>,
cancellation_token: CancellationToken,
) {
if turn_context.is_review_mode
|| !self
.state
.lock()
.await
.session_configuration
.features
.enabled(Feature::GhostCommit)
{
return;
}
let token = match turn_context.tool_call_gate.subscribe().await {
Ok(token) => token,
Err(err) => {
warn!("failed to subscribe to ghost snapshot readiness: {err}");
return;
}
};
info!("spawning ghost snapshot task");
let task = GhostSnapshotTask::new(token);
Arc::new(task)
.run(
Arc::new(SessionTaskContext::new(self.clone())),
turn_context.clone(),
Vec::new(),
cancellation_token,
)
.await;
}
/// Returns the input if there was no task running to inject into
pub async fn inject_input(&self, input: Vec<UserInput>) -> Result<(), Vec<UserInput>> {
let mut active = self.active_turn.lock().await;
@@ -1356,6 +1421,13 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
};
sess.send_event_raw(event).await;
}
Op::Undo => {
let turn_context = sess
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
.await;
sess.spawn_task(turn_context, Vec::new(), UndoTask::new())
.await;
}
Op::Compact => {
let turn_context = sess
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
@@ -1401,33 +1473,7 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
sess.send_event_raw(event).await;
break;
}
Op::GetPath => {
let sub_id = sub.id.clone();
// Flush rollout writes before returning the path so readers observe a consistent file.
let (path, rec_opt) = {
let guard = sess.services.rollout.lock().await;
match guard.as_ref() {
Some(rec) => (rec.get_rollout_path(), Some(rec.clone())),
None => {
error!("rollout recorder not found");
continue;
}
}
};
if let Some(rec) = rec_opt
&& let Err(e) = rec.flush().await
{
warn!("failed to flush rollout recorder before GetHistory: {e}");
}
let event = Event {
id: sub_id.clone(),
msg: EventMsg::ConversationPath(ConversationPathResponseEvent {
conversation_id: sess.conversation_id,
path,
}),
};
sess.send_event_raw(event).await;
}
Op::Review { review_request } => {
let turn_context = sess
.new_turn_with_sub_id(sub.id.clone(), SessionSettingsUpdate::default())
@@ -1518,6 +1564,7 @@ async fn spawn_review_thread(
is_review_mode: true,
final_output_json_schema: None,
codex_linux_sandbox_exe: parent_turn_context.codex_linux_sandbox_exe.clone(),
tool_call_gate: Arc::new(ReadinessFlag::new()),
};
// Seed the child task with the review prompt as the initial user message.
@@ -1581,6 +1628,8 @@ pub(crate) async fn run_task(
.await;
}
sess.maybe_start_ghost_snapshot(Arc::clone(&turn_context), cancellation_token.child_token())
.await;
let mut last_agent_message: Option<String> = None;
// Although from the perspective of codex.rs, TurnDiffTracker has the lifecycle of a Task which contains
// many turns, from the perspective of the user, it is a single turn.
@@ -1773,6 +1822,13 @@ fn parse_review_output_event(text: &str) -> ReviewOutputEvent {
}
}
fn filter_model_visible_history(input: Vec<ResponseItem>) -> Vec<ResponseItem> {
input
.into_iter()
.filter(|item| !matches!(item, ResponseItem::GhostSnapshot { .. }))
.collect()
}
async fn run_turn(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
@@ -1793,7 +1849,7 @@ async fn run_turn(
.supports_parallel_tool_calls;
let parallel_tool_calls = model_supports_parallel;
let prompt = Prompt {
input,
input: filter_model_visible_history(input),
tools: router.specs(),
parallel_tool_calls,
base_instructions_override: turn_context.base_instructions.clone(),
@@ -2231,6 +2287,8 @@ pub(crate) async fn exit_review_mode(
}],
)
.await;
// Make the recorded review note visible immediately for readers.
session.flush_rollout().await;
}
fn mcp_init_error_display(
@@ -2286,6 +2344,8 @@ fn is_mcp_client_startup_timeout_error(error: &anyhow::Error) -> bool {
|| error_message.contains("timed out handshaking with MCP server")
}
use crate::features::Feature;
use crate::features::Features;
#[cfg(test)]
pub(crate) use tests::make_session_and_context;
@@ -2602,6 +2662,7 @@ mod tests {
sandbox_policy: config.sandbox_policy.clone(),
cwd: config.cwd.clone(),
original_config_do_not_use: Arc::clone(&config),
features: Features::default(),
};
let state = SessionState::new(session_configuration.clone());
@@ -2670,6 +2731,7 @@ mod tests {
sandbox_policy: config.sandbox_policy.clone(),
cwd: config.cwd.clone(),
original_config_do_not_use: Arc::clone(&config),
features: Features::default(),
};
let state = SessionState::new(session_configuration.clone());

View File

@@ -2,6 +2,7 @@ use std::sync::Arc;
use super::Session;
use super::TurnContext;
use super::filter_model_visible_history;
use super::get_last_assistant_message_from_turn;
use crate::Prompt;
use crate::client_common::ResponseEvent;
@@ -86,8 +87,9 @@ async fn run_compact_task_inner(
loop {
let turn_input = history.get_history();
let prompt_input = filter_model_visible_history(turn_input.clone());
let prompt = Prompt {
input: turn_input.clone(),
input: prompt_input.clone(),
..Default::default()
};
let attempt_result = drain_to_completed(&sess, turn_context.as_ref(), &prompt).await;
@@ -109,7 +111,7 @@ async fn run_compact_task_inner(
return;
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input.len() > 1 {
if prompt_input.len() > 1 {
// Trim from the beginning to preserve cache (prefix-based) and keep recent messages intact.
error!(
"Context window exceeded while compacting; removing oldest history item. Error: {e}"
@@ -152,7 +154,13 @@ async fn run_compact_task_inner(
let summary_text = get_last_assistant_message_from_turn(&history_snapshot).unwrap_or_default();
let user_messages = collect_user_messages(&history_snapshot);
let initial_context = sess.build_initial_context(turn_context.as_ref());
let new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
let mut new_history = build_compacted_history(initial_context, &user_messages, &summary_text);
let ghost_snapshots: Vec<ResponseItem> = history_snapshot
.iter()
.filter(|item| matches!(item, ResponseItem::GhostSnapshot { .. }))
.cloned()
.collect();
new_history.extend(ghost_snapshots);
sess.replace_history(new_history).await;
let rollout_item = RolloutItem::Compacted(CompactedItem {

View File

@@ -3,16 +3,21 @@ use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
use std::path::PathBuf;
pub struct CodexConversation {
codex: Codex,
rollout_path: PathBuf,
}
/// Conduit for the bidirectional stream of messages that compose a conversation
/// in Codex.
impl CodexConversation {
pub(crate) fn new(codex: Codex) -> Self {
Self { codex }
pub(crate) fn new(codex: Codex, rollout_path: PathBuf) -> Self {
Self {
codex,
rollout_path,
}
}
pub async fn submit(&self, op: Op) -> CodexResult<String> {
@@ -27,4 +32,8 @@ impl CodexConversation {
pub async fn next_event(&self) -> CodexResult<Event> {
self.codex.next_event().await
}
pub fn rollout_path(&self) -> PathBuf {
self.rollout_path.clone()
}
}

View File

@@ -2,6 +2,7 @@ use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::protocol::TokenUsageInfo;
use std::ops::Deref;
use tracing::error;
/// Transcript of conversation history
@@ -40,7 +41,9 @@ impl ConversationHistory {
I::Item: std::ops::Deref<Target = ResponseItem>,
{
for item in items {
if !is_api_message(&item) {
let item_ref = item.deref();
let is_ghost_snapshot = matches!(item_ref, ResponseItem::GhostSnapshot { .. });
if !is_api_message(item_ref) && !is_ghost_snapshot {
continue;
}
@@ -165,6 +168,7 @@ impl ConversationHistory {
| ResponseItem::WebSearchCall { .. }
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::GhostSnapshot { .. }
| ResponseItem::Other
| ResponseItem::Message { .. } => {
// nothing to do for these variants
@@ -231,6 +235,7 @@ impl ConversationHistory {
| ResponseItem::LocalShellCall { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::GhostSnapshot { .. }
| ResponseItem::Other
| ResponseItem::Message { .. } => {
// nothing to do for these variants
@@ -355,6 +360,7 @@ fn is_api_message(message: &ResponseItem) -> bool {
| ResponseItem::LocalShellCall { .. }
| ResponseItem::Reasoning { .. }
| ResponseItem::WebSearchCall { .. } => true,
ResponseItem::GhostSnapshot { .. } => false,
ResponseItem::Other => false,
}
}

View File

@@ -98,7 +98,10 @@ impl ConversationManager {
}
};
let conversation = Arc::new(CodexConversation::new(codex));
let conversation = Arc::new(CodexConversation::new(
codex,
session_configured.rollout_path.clone(),
));
self.conversations
.write()
.await

View File

@@ -41,6 +41,8 @@ pub enum Feature {
WebSearchRequest,
/// Enable the model-based risk assessments for sandboxed commands.
SandboxCommandAssessment,
/// Create a ghost commit at each turn.
GhostCommit,
}
impl Feature {
@@ -248,4 +250,10 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::Experimental,
default_enabled: false,
},
FeatureSpec {
id: Feature::GhostCommit,
key: "ghost_commit",
stage: Stage::Experimental,
default_enabled: false,
},
];

View File

@@ -160,6 +160,7 @@ pub fn find_family_for_model(slug: &str) -> Option<ModelFamily> {
slug, "gpt-5",
supports_reasoning_summaries: true,
needs_special_apply_patch_instructions: true,
supports_parallel_tool_calls: true,
)
} else {
None

View File

@@ -54,6 +54,7 @@ struct HeadTailSummary {
saw_session_meta: bool,
saw_user_event: bool,
source: Option<SessionSource>,
model_provider: Option<String>,
created_at: Option<String>,
updated_at: Option<String>,
}
@@ -109,6 +110,8 @@ pub(crate) async fn get_conversations(
page_size: usize,
cursor: Option<&Cursor>,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
default_provider: &str,
) -> io::Result<ConversationsPage> {
let mut root = codex_home.to_path_buf();
root.push(SESSIONS_SUBDIR);
@@ -124,8 +127,17 @@ pub(crate) async fn get_conversations(
let anchor = cursor.cloned();
let result =
traverse_directories_for_paths(root.clone(), page_size, anchor, allowed_sources).await?;
let provider_matcher =
model_providers.and_then(|filters| ProviderMatcher::new(filters, default_provider));
let result = traverse_directories_for_paths(
root.clone(),
page_size,
anchor,
allowed_sources,
provider_matcher.as_ref(),
)
.await?;
Ok(result)
}
@@ -145,6 +157,7 @@ async fn traverse_directories_for_paths(
page_size: usize,
anchor: Option<Cursor>,
allowed_sources: &[SessionSource],
provider_matcher: Option<&ProviderMatcher<'_>>,
) -> io::Result<ConversationsPage> {
let mut items: Vec<ConversationItem> = Vec::with_capacity(page_size);
let mut scanned_files = 0usize;
@@ -153,6 +166,7 @@ async fn traverse_directories_for_paths(
Some(c) => (c.ts, c.id),
None => (OffsetDateTime::UNIX_EPOCH, Uuid::nil()),
};
let mut more_matches_available = false;
let year_dirs = collect_dirs_desc(&root, |s| s.parse::<u16>().ok()).await?;
@@ -184,6 +198,7 @@ async fn traverse_directories_for_paths(
for (ts, sid, _name_str, path) in day_files.into_iter() {
scanned_files += 1;
if scanned_files >= MAX_SCAN_FILES && items.len() >= page_size {
more_matches_available = true;
break 'outer;
}
if !anchor_passed {
@@ -194,6 +209,7 @@ async fn traverse_directories_for_paths(
}
}
if items.len() == page_size {
more_matches_available = true;
break 'outer;
}
// Read head and simultaneously detect message events within the same
@@ -208,6 +224,11 @@ async fn traverse_directories_for_paths(
{
continue;
}
if let Some(matcher) = provider_matcher
&& !matcher.matches(summary.model_provider.as_deref())
{
continue;
}
// Apply filters: must have session meta and at least one user message event
if summary.saw_session_meta && summary.saw_user_event {
let HeadTailSummary {
@@ -231,12 +252,21 @@ async fn traverse_directories_for_paths(
}
}
let next = build_next_cursor(&items);
let reached_scan_cap = scanned_files >= MAX_SCAN_FILES;
if reached_scan_cap && !items.is_empty() {
more_matches_available = true;
}
let next = if more_matches_available {
build_next_cursor(&items)
} else {
None
};
Ok(ConversationsPage {
items,
next_cursor: next,
num_scanned_files: scanned_files,
reached_scan_cap: scanned_files >= MAX_SCAN_FILES,
reached_scan_cap,
})
}
@@ -328,6 +358,32 @@ fn parse_timestamp_uuid_from_filename(name: &str) -> Option<(OffsetDateTime, Uui
Some((ts, uuid))
}
struct ProviderMatcher<'a> {
filters: &'a [String],
matches_default_provider: bool,
}
impl<'a> ProviderMatcher<'a> {
fn new(filters: &'a [String], default_provider: &'a str) -> Option<Self> {
if filters.is_empty() {
return None;
}
let matches_default_provider = filters.iter().any(|provider| provider == default_provider);
Some(Self {
filters,
matches_default_provider,
})
}
fn matches(&self, session_provider: Option<&str>) -> bool {
match session_provider {
Some(provider) => self.filters.iter().any(|candidate| candidate == provider),
None => self.matches_default_provider,
}
}
}
async fn read_head_and_tail(
path: &Path,
head_limit: usize,
@@ -354,6 +410,7 @@ async fn read_head_and_tail(
match rollout_line.item {
RolloutItem::SessionMeta(session_meta_line) => {
summary.source = Some(session_meta_line.meta.source);
summary.model_provider = session_meta_line.meta.model_provider.clone();
summary.created_at = summary
.created_at
.clone()

View File

@@ -26,7 +26,8 @@ pub(crate) fn should_persist_response_item(item: &ResponseItem) -> bool {
| ResponseItem::FunctionCallOutput { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::WebSearchCall { .. } => true,
| ResponseItem::WebSearchCall { .. }
| ResponseItem::GhostSnapshot { .. } => true,
ResponseItem::Other => false,
}
}
@@ -42,6 +43,7 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::TokenCount(_)
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_)
| EventMsg::UndoCompleted(_)
| EventMsg::TurnAborted(_) => true,
EventMsg::Error(_)
| EventMsg::TaskStarted(_)
@@ -67,12 +69,12 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::PatchApplyEnd(_)
| EventMsg::TurnDiff(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::UndoStarted(_)
| EventMsg::McpListToolsResponse(_)
| EventMsg::ListCustomPromptsResponse(_)
| EventMsg::PlanUpdate(_)
| EventMsg::ShutdownComplete
| EventMsg::ViewImageToolCall(_)
| EventMsg::ConversationPath(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_) => false,
}

View File

@@ -97,8 +97,18 @@ impl RolloutRecorder {
page_size: usize,
cursor: Option<&Cursor>,
allowed_sources: &[SessionSource],
model_providers: Option<&[String]>,
default_provider: &str,
) -> std::io::Result<ConversationsPage> {
get_conversations(codex_home, page_size, cursor, allowed_sources).await
get_conversations(
codex_home,
page_size,
cursor,
allowed_sources,
model_providers,
default_provider,
)
.await
}
/// Attempt to create a new [`RolloutRecorder`]. If the sessions directory
@@ -137,6 +147,7 @@ impl RolloutRecorder {
cli_version: env!("CARGO_PKG_VERSION").to_string(),
instructions,
source,
model_provider: Some(config.model_provider_id.clone()),
}),
)
}
@@ -267,10 +278,6 @@ impl RolloutRecorder {
}))
}
pub(crate) fn get_rollout_path(&self) -> PathBuf {
self.rollout_path.clone()
}
pub async fn shutdown(&self) -> std::io::Result<()> {
let (tx_done, rx_done) = oneshot::channel();
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {

View File

@@ -32,6 +32,14 @@ use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::UserMessageEvent;
const NO_SOURCE_FILTER: &[SessionSource] = &[];
const TEST_PROVIDER: &str = "test-provider";
fn provider_vec(providers: &[&str]) -> Vec<String> {
providers
.iter()
.map(std::string::ToString::to_string)
.collect()
}
fn write_session_file(
root: &Path,
@@ -39,6 +47,24 @@ fn write_session_file(
uuid: Uuid,
num_records: usize,
source: Option<SessionSource>,
) -> std::io::Result<(OffsetDateTime, Uuid)> {
write_session_file_with_provider(
root,
ts_str,
uuid,
num_records,
source,
Some("test-provider"),
)
}
fn write_session_file_with_provider(
root: &Path,
ts_str: &str,
uuid: Uuid,
num_records: usize,
source: Option<SessionSource>,
model_provider: Option<&str>,
) -> std::io::Result<(OffsetDateTime, Uuid)> {
let format: &[FormatItem] =
format_description!("[year]-[month]-[day]T[hour]-[minute]-[second]");
@@ -68,6 +94,9 @@ fn write_session_file(
if let Some(source) = source {
payload["source"] = serde_json::to_value(source).unwrap();
}
if let Some(provider) = model_provider {
payload["model_provider"] = serde_json::Value::String(provider.to_string());
}
let meta = serde_json::json!({
"timestamp": ts_str,
@@ -134,9 +163,17 @@ async fn test_list_conversations_latest_first() {
)
.unwrap();
let page = get_conversations(home, 10, None, INTERACTIVE_SESSION_SOURCES)
.await
.unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_conversations(
home,
10,
None,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await
.unwrap();
// Build expected objects
let p1 = home
@@ -166,6 +203,7 @@ async fn test_list_conversations_latest_first() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
})];
let head_2 = vec![serde_json::json!({
"id": u2,
@@ -175,6 +213,7 @@ async fn test_list_conversations_latest_first() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
})];
let head_1 = vec![serde_json::json!({
"id": u1,
@@ -184,11 +223,9 @@ async fn test_list_conversations_latest_first() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
})];
let expected_cursor: Cursor =
serde_json::from_str(&format!("\"2025-01-01T12-00-00|{u1}\"")).unwrap();
let expected = ConversationsPage {
items: vec![
ConversationItem {
@@ -213,7 +250,7 @@ async fn test_list_conversations_latest_first() {
updated_at: Some("2025-01-01T12-00-00".into()),
},
],
next_cursor: Some(expected_cursor),
next_cursor: None,
num_scanned_files: 3,
reached_scan_cap: false,
};
@@ -275,9 +312,17 @@ async fn test_pagination_cursor() {
)
.unwrap();
let page1 = get_conversations(home, 2, None, INTERACTIVE_SESSION_SOURCES)
.await
.unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page1 = get_conversations(
home,
2,
None,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await
.unwrap();
let p5 = home
.join("sessions")
.join("2025")
@@ -298,6 +343,7 @@ async fn test_pagination_cursor() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
})];
let head_4 = vec![serde_json::json!({
"id": u4,
@@ -307,6 +353,7 @@ async fn test_pagination_cursor() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
})];
let expected_cursor1: Cursor =
serde_json::from_str(&format!("\"2025-03-04T09-00-00|{u4}\"")).unwrap();
@@ -338,6 +385,8 @@ async fn test_pagination_cursor() {
2,
page1.next_cursor.as_ref(),
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await
.unwrap();
@@ -361,6 +410,7 @@ async fn test_pagination_cursor() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
})];
let head_2 = vec![serde_json::json!({
"id": u2,
@@ -370,6 +420,7 @@ async fn test_pagination_cursor() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
})];
let expected_cursor2: Cursor =
serde_json::from_str(&format!("\"2025-03-02T09-00-00|{u2}\"")).unwrap();
@@ -401,6 +452,8 @@ async fn test_pagination_cursor() {
2,
page2.next_cursor.as_ref(),
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await
.unwrap();
@@ -418,9 +471,8 @@ async fn test_pagination_cursor() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
})];
let expected_cursor3: Cursor =
serde_json::from_str(&format!("\"2025-03-01T09-00-00|{u1}\"")).unwrap();
let expected_page3 = ConversationsPage {
items: vec![ConversationItem {
path: p1,
@@ -429,7 +481,7 @@ async fn test_pagination_cursor() {
created_at: Some("2025-03-01T09-00-00".into()),
updated_at: Some("2025-03-01T09-00-00".into()),
}],
next_cursor: Some(expected_cursor3),
next_cursor: None,
num_scanned_files: 5, // scanned 05, 04 (anchor), 03, 02 (anchor), 01
reached_scan_cap: false,
};
@@ -445,9 +497,17 @@ async fn test_get_conversation_contents() {
let ts = "2025-04-01T10-30-00";
write_session_file(home, ts, uuid, 2, Some(SessionSource::VSCode)).unwrap();
let page = get_conversations(home, 1, None, INTERACTIVE_SESSION_SOURCES)
.await
.unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_conversations(
home,
1,
None,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await
.unwrap();
let path = &page.items[0].path;
let content = get_conversation(path).await.unwrap();
@@ -467,8 +527,8 @@ async fn test_get_conversation_contents() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
})];
let expected_cursor: Cursor = serde_json::from_str(&format!("\"{ts}|{uuid}\"")).unwrap();
let expected_page = ConversationsPage {
items: vec![ConversationItem {
path: expected_path,
@@ -477,7 +537,7 @@ async fn test_get_conversation_contents() {
created_at: Some(ts.into()),
updated_at: Some(ts.into()),
}],
next_cursor: Some(expected_cursor),
next_cursor: None,
num_scanned_files: 1,
reached_scan_cap: false,
};
@@ -495,6 +555,7 @@ async fn test_get_conversation_contents() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
}
});
let user_event = serde_json::json!({
@@ -532,6 +593,7 @@ async fn test_tail_includes_last_response_items() -> Result<()> {
originator: "test_originator".into(),
cli_version: "test_version".into(),
source: SessionSource::VSCode,
model_provider: Some("test-provider".into()),
},
git: None,
}),
@@ -563,7 +625,16 @@ async fn test_tail_includes_last_response_items() -> Result<()> {
}
drop(file);
let page = get_conversations(home, 1, None, INTERACTIVE_SESSION_SOURCES).await?;
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_conversations(
home,
1,
None,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await?;
let item = page.items.first().expect("conversation item");
let tail_len = item.tail.len();
assert_eq!(tail_len, 10usize.min(total_messages));
@@ -615,6 +686,7 @@ async fn test_tail_handles_short_sessions() -> Result<()> {
originator: "test_originator".into(),
cli_version: "test_version".into(),
source: SessionSource::VSCode,
model_provider: Some("test-provider".into()),
},
git: None,
}),
@@ -645,7 +717,16 @@ async fn test_tail_handles_short_sessions() -> Result<()> {
}
drop(file);
let page = get_conversations(home, 1, None, INTERACTIVE_SESSION_SOURCES).await?;
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_conversations(
home,
1,
None,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await?;
let tail = &page.items.first().expect("conversation item").tail;
assert_eq!(tail.len(), 3);
@@ -699,6 +780,7 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> {
originator: "test_originator".into(),
cli_version: "test_version".into(),
source: SessionSource::VSCode,
model_provider: Some("test-provider".into()),
},
git: None,
}),
@@ -743,7 +825,16 @@ async fn test_tail_skips_trailing_non_responses() -> Result<()> {
writeln!(file, "{}", serde_json::to_string(&shutdown_event)?)?;
drop(file);
let page = get_conversations(home, 1, None, INTERACTIVE_SESSION_SOURCES).await?;
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page = get_conversations(
home,
1,
None,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await?;
let tail = &page.items.first().expect("conversation item").tail;
let expected: Vec<serde_json::Value> = (0..4)
@@ -785,9 +876,17 @@ async fn test_stable_ordering_same_second_pagination() {
write_session_file(home, ts, u2, 0, Some(SessionSource::VSCode)).unwrap();
write_session_file(home, ts, u3, 0, Some(SessionSource::VSCode)).unwrap();
let page1 = get_conversations(home, 2, None, INTERACTIVE_SESSION_SOURCES)
.await
.unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let page1 = get_conversations(
home,
2,
None,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await
.unwrap();
let p3 = home
.join("sessions")
@@ -810,6 +909,7 @@ async fn test_stable_ordering_same_second_pagination() {
"originator": "test_originator",
"cli_version": "test_version",
"source": "vscode",
"model_provider": "test-provider",
})]
};
let expected_cursor1: Cursor = serde_json::from_str(&format!("\"{ts}|{u2}\"")).unwrap();
@@ -841,6 +941,8 @@ async fn test_stable_ordering_same_second_pagination() {
2,
page1.next_cursor.as_ref(),
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await
.unwrap();
@@ -850,7 +952,6 @@ async fn test_stable_ordering_same_second_pagination() {
.join("07")
.join("01")
.join(format!("rollout-2025-07-01T00-00-00-{u1}.jsonl"));
let expected_cursor2: Cursor = serde_json::from_str(&format!("\"{ts}|{u1}\"")).unwrap();
let expected_page2 = ConversationsPage {
items: vec![ConversationItem {
path: p1,
@@ -859,7 +960,7 @@ async fn test_stable_ordering_same_second_pagination() {
created_at: Some(ts.to_string()),
updated_at: Some(ts.to_string()),
}],
next_cursor: Some(expected_cursor2),
next_cursor: None,
num_scanned_files: 3, // scanned u3, u2 (anchor), u1
reached_scan_cap: false,
};
@@ -891,9 +992,17 @@ async fn test_source_filter_excludes_non_matching_sessions() {
)
.unwrap();
let interactive_only = get_conversations(home, 10, None, INTERACTIVE_SESSION_SOURCES)
.await
.unwrap();
let provider_filter = provider_vec(&[TEST_PROVIDER]);
let interactive_only = get_conversations(
home,
10,
None,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
TEST_PROVIDER,
)
.await
.unwrap();
let paths: Vec<_> = interactive_only
.items
.iter()
@@ -905,7 +1014,7 @@ async fn test_source_filter_excludes_non_matching_sessions() {
path.ends_with("rollout-2025-08-02T10-00-00-00000000-0000-0000-0000-00000000002a.jsonl")
}));
let all_sessions = get_conversations(home, 10, None, NO_SOURCE_FILTER)
let all_sessions = get_conversations(home, 10, None, NO_SOURCE_FILTER, None, TEST_PROVIDER)
.await
.unwrap();
let all_paths: Vec<_> = all_sessions
@@ -921,3 +1030,102 @@ async fn test_source_filter_excludes_non_matching_sessions() {
path.ends_with("rollout-2025-08-01T10-00-00-00000000-0000-0000-0000-00000000004d.jsonl")
}));
}
#[tokio::test]
async fn test_model_provider_filter_selects_only_matching_sessions() -> Result<()> {
let temp = TempDir::new().unwrap();
let home = temp.path();
let openai_id = Uuid::from_u128(1);
let beta_id = Uuid::from_u128(2);
let none_id = Uuid::from_u128(3);
write_session_file_with_provider(
home,
"2025-09-01T12-00-00",
openai_id,
1,
Some(SessionSource::VSCode),
Some("openai"),
)?;
write_session_file_with_provider(
home,
"2025-09-01T11-00-00",
beta_id,
1,
Some(SessionSource::VSCode),
Some("beta"),
)?;
write_session_file_with_provider(
home,
"2025-09-01T10-00-00",
none_id,
1,
Some(SessionSource::VSCode),
None,
)?;
let openai_id_str = openai_id.to_string();
let none_id_str = none_id.to_string();
let openai_filter = provider_vec(&["openai"]);
let openai_sessions = get_conversations(
home,
10,
None,
NO_SOURCE_FILTER,
Some(openai_filter.as_slice()),
"openai",
)
.await?;
assert_eq!(openai_sessions.items.len(), 2);
let openai_ids: Vec<_> = openai_sessions
.items
.iter()
.filter_map(|item| {
item.head
.first()
.and_then(|value| value.get("id"))
.and_then(serde_json::Value::as_str)
.map(str::to_string)
})
.collect();
assert!(openai_ids.contains(&openai_id_str));
assert!(openai_ids.contains(&none_id_str));
let beta_filter = provider_vec(&["beta"]);
let beta_sessions = get_conversations(
home,
10,
None,
NO_SOURCE_FILTER,
Some(beta_filter.as_slice()),
"openai",
)
.await?;
assert_eq!(beta_sessions.items.len(), 1);
let beta_id_str = beta_id.to_string();
let beta_head = beta_sessions
.items
.first()
.and_then(|item| item.head.first())
.and_then(|value| value.get("id"))
.and_then(serde_json::Value::as_str);
assert_eq!(beta_head, Some(beta_id_str.as_str()));
let unknown_filter = provider_vec(&["unknown"]);
let unknown_sessions = get_conversations(
home,
10,
None,
NO_SOURCE_FILTER,
Some(unknown_filter.as_slice()),
"openai",
)
.await?;
assert!(unknown_sessions.items.is_empty());
let all_sessions = get_conversations(home, 10, None, NO_SOURCE_FILTER, None, "openai").await?;
assert_eq!(all_sessions.items.len(), 3);
Ok(())
}

View File

@@ -0,0 +1,110 @@
use crate::codex::TurnContext;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
use async_trait::async_trait;
use codex_git_tooling::CreateGhostCommitOptions;
use codex_git_tooling::GitToolingError;
use codex_git_tooling::create_ghost_commit;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;
use codex_utils_readiness::Readiness;
use codex_utils_readiness::Token;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::warn;
pub(crate) struct GhostSnapshotTask {
token: Token,
}
#[async_trait]
impl SessionTask for GhostSnapshotTask {
fn kind(&self) -> TaskKind {
TaskKind::Regular
}
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
tokio::task::spawn(async move {
let token = self.token;
let ctx_for_task = Arc::clone(&ctx);
let cancelled = tokio::select! {
_ = cancellation_token.cancelled() => true,
_ = async {
let repo_path = ctx_for_task.cwd.clone();
// Required to run in a dedicated blocking pool.
match tokio::task::spawn_blocking(move || {
let options = CreateGhostCommitOptions::new(&repo_path);
create_ghost_commit(&options)
})
.await
{
Ok(Ok(ghost_commit)) => {
info!("ghost snapshot blocking task finished");
session
.session
.record_conversation_items(&ctx, &[ResponseItem::GhostSnapshot {
ghost_commit: ghost_commit.clone(),
}])
.await;
info!("ghost commit captured: {}", ghost_commit.id());
}
Ok(Err(err)) => {
warn!(
sub_id = ctx_for_task.sub_id.as_str(),
"failed to capture ghost snapshot: {err}"
);
let message = match err {
GitToolingError::NotAGitRepository { .. } => {
"Snapshots disabled: current directory is not a Git repository."
.to_string()
}
_ => format!("Snapshots disabled after ghost snapshot error: {err}."),
};
session
.session
.notify_background_event(&ctx_for_task, message)
.await;
}
Err(err) => {
warn!(
sub_id = ctx_for_task.sub_id.as_str(),
"ghost snapshot task panicked: {err}"
);
let message =
format!("Snapshots disabled after ghost snapshot panic: {err}.");
session
.session
.notify_background_event(&ctx_for_task, message)
.await;
}
}
} => false,
};
if cancelled {
info!("ghost snapshot task cancelled");
}
match ctx.tool_call_gate.mark_ready(token).await {
Ok(true) => info!("ghost snapshot gate marked ready"),
Ok(false) => warn!("ghost snapshot gate already ready"),
Err(err) => warn!("failed to mark ghost snapshot ready: {err}"),
}
});
None
}
}
impl GhostSnapshotTask {
pub(crate) fn new(token: Token) -> Self {
Self { token }
}
}

View File

@@ -1,6 +1,8 @@
mod compact;
mod ghost_snapshot;
mod regular;
mod review;
mod undo;
use std::sync::Arc;
use std::time::Duration;
@@ -25,8 +27,10 @@ use crate::state::TaskKind;
use codex_protocol::user_input::UserInput;
pub(crate) use compact::CompactTask;
pub(crate) use ghost_snapshot::GhostSnapshotTask;
pub(crate) use regular::RegularTask;
pub(crate) use review::ReviewTask;
pub(crate) use undo::UndoTask;
const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
@@ -46,10 +50,28 @@ impl SessionTaskContext {
}
}
/// Async task that drives a [`Session`] turn.
///
/// Implementations encapsulate a specific Codex workflow (regular chat,
/// reviews, ghost snapshots, etc.). Each task instance is owned by a
/// [`Session`] and executed on a background Tokio task. The trait is
/// intentionally small: implementers identify themselves via
/// [`SessionTask::kind`], perform their work in [`SessionTask::run`], and may
/// release resources in [`SessionTask::abort`].
#[async_trait]
pub(crate) trait SessionTask: Send + Sync + 'static {
/// Describes the type of work the task performs so the session can
/// surface it in telemetry and UI.
fn kind(&self) -> TaskKind;
/// Executes the task until completion or cancellation.
///
/// Implementations typically stream protocol events using `session` and
/// `ctx`, returning an optional final agent message when finished. The
/// provided `cancellation_token` is cancelled when the session requests an
/// abort; implementers should watch for it and terminate quickly once it
/// fires. Returning [`Some`] yields a final message that
/// [`Session::on_task_finished`] will emit to the client.
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
@@ -58,6 +80,11 @@ pub(crate) trait SessionTask: Send + Sync + 'static {
cancellation_token: CancellationToken,
) -> Option<String>;
/// Gives the task a chance to perform cleanup after an abort.
///
/// The default implementation is a no-op; override this if additional
/// teardown or notifications are required once
/// [`Session::abort_all_tasks`] cancels the task.
async fn abort(&self, session: Arc<SessionTaskContext>, ctx: Arc<TurnContext>) {
let _ = (session, ctx);
}

View File

@@ -0,0 +1,117 @@
use std::sync::Arc;
use crate::codex::TurnContext;
use crate::protocol::EventMsg;
use crate::protocol::UndoCompletedEvent;
use crate::protocol::UndoStartedEvent;
use crate::state::TaskKind;
use crate::tasks::SessionTask;
use crate::tasks::SessionTaskContext;
use async_trait::async_trait;
use codex_git_tooling::restore_ghost_commit;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::info;
use tracing::warn;
pub(crate) struct UndoTask;
impl UndoTask {
pub(crate) fn new() -> Self {
Self
}
}
#[async_trait]
impl SessionTask for UndoTask {
fn kind(&self) -> TaskKind {
TaskKind::Regular
}
async fn run(
self: Arc<Self>,
session: Arc<SessionTaskContext>,
ctx: Arc<TurnContext>,
_input: Vec<UserInput>,
cancellation_token: CancellationToken,
) -> Option<String> {
let sess = session.clone_session();
sess.send_event(
ctx.as_ref(),
EventMsg::UndoStarted(UndoStartedEvent {
message: Some("Undo in progress...".to_string()),
}),
)
.await;
if cancellation_token.is_cancelled() {
sess.send_event(
ctx.as_ref(),
EventMsg::UndoCompleted(UndoCompletedEvent {
success: false,
message: Some("Undo cancelled.".to_string()),
}),
)
.await;
return None;
}
let mut history = sess.clone_history().await;
let mut items = history.get_history();
let mut completed = UndoCompletedEvent {
success: false,
message: None,
};
let Some((idx, ghost_commit)) =
items
.iter()
.enumerate()
.rev()
.find_map(|(idx, item)| match item {
ResponseItem::GhostSnapshot { ghost_commit } => {
Some((idx, ghost_commit.clone()))
}
_ => None,
})
else {
completed.message = Some("No ghost snapshot available to undo.".to_string());
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
.await;
return None;
};
let commit_id = ghost_commit.id().to_string();
let repo_path = ctx.cwd.clone();
let restore_result =
tokio::task::spawn_blocking(move || restore_ghost_commit(&repo_path, &ghost_commit))
.await;
match restore_result {
Ok(Ok(())) => {
items.remove(idx);
sess.replace_history(items).await;
let short_id: String = commit_id.chars().take(7).collect();
info!(commit_id = commit_id, "Undo restored ghost snapshot");
completed.success = true;
completed.message = Some(format!("Undo restored snapshot {short_id}."));
}
Ok(Err(err)) => {
let message = format!("Failed to restore snapshot {commit_id}: {err}");
warn!("{message}");
completed.message = Some(message);
}
Err(err) => {
let message = format!("Failed to restore snapshot {commit_id}: {err}");
error!("{message}");
completed.message = Some(message);
}
}
sess.send_event(ctx.as_ref(), EventMsg::UndoCompleted(completed))
.await;
None
}
}

View File

@@ -15,6 +15,7 @@ use crate::tools::router::ToolCall;
use crate::tools::router::ToolRouter;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use codex_utils_readiness::Readiness;
pub(crate) struct ToolCallRuntime {
router: Arc<ToolRouter>,
@@ -53,12 +54,16 @@ impl ToolCallRuntime {
let tracker = Arc::clone(&self.tracker);
let lock = Arc::clone(&self.parallel_execution);
let aborted_response = Self::aborted_response(&call);
let readiness = self.turn_context.tool_call_gate.clone();
let handle: AbortOnDropHandle<Result<ResponseInputItem, FunctionCallError>> =
AbortOnDropHandle::new(tokio::spawn(async move {
tokio::select! {
_ = cancellation_token.cancelled() => Ok(aborted_response),
res = async {
tracing::info!("waiting for tool gate");
readiness.wait_ready().await;
tracing::info!("tool gate released");
let _guard = if supports_parallel {
Either::Left(lock.read().await)
} else {

View File

@@ -890,17 +890,17 @@ pub(crate) fn build_specs(
|| matches!(config.shell_type, ConfigShellToolType::Streamable);
if use_unified_exec {
builder.push_spec(create_exec_command_tool());
builder.push_spec(create_write_stdin_tool());
builder.push_spec_with_parallel_support(create_exec_command_tool(), true);
builder.push_spec_with_parallel_support(create_write_stdin_tool(), true);
builder.register_handler("exec_command", unified_exec_handler.clone());
builder.register_handler("write_stdin", unified_exec_handler);
}
match &config.shell_type {
ConfigShellToolType::Default => {
builder.push_spec(create_shell_tool());
builder.push_spec_with_parallel_support(create_shell_tool(), true);
}
ConfigShellToolType::Local => {
builder.push_spec(ToolSpec::LocalShell {});
builder.push_spec_with_parallel_support(ToolSpec::LocalShell {}, true);
}
ConfigShellToolType::Streamable => {
// Already handled by use_unified_exec.

View File

@@ -75,9 +75,17 @@ async fn chat_mode_stream_cli() {
server.verify().await;
// Verify a new session rollout was created and is discoverable via list_conversations
let page = RolloutRecorder::list_conversations(home.path(), 10, None, &[])
.await
.expect("list conversations");
let provider_filter = vec!["mock".to_string()];
let page = RolloutRecorder::list_conversations(
home.path(),
10,
None,
&[],
Some(provider_filter.as_slice()),
"mock",
)
.await
.expect("list conversations");
assert!(
!page.items.is_empty(),
"expected at least one session to be listed"

View File

@@ -154,7 +154,8 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
"instructions": "be nice",
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version"
"cli_version": "test_version",
"model_provider": "test-provider"
}
})
)

View File

@@ -18,7 +18,6 @@ use codex_core::built_in_model_providers;
use codex_core::codex::compact::SUMMARIZATION_PROMPT;
use codex_core::config::Config;
use codex_core::config::OPENAI_DEFAULT_MODEL;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
@@ -42,6 +41,29 @@ fn network_disabled() -> bool {
std::env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok()
}
fn filter_out_ghost_snapshot_entries(items: &[Value]) -> Vec<Value> {
items
.iter()
.filter(|item| !is_ghost_snapshot_message(item))
.cloned()
.collect()
}
fn is_ghost_snapshot_message(item: &Value) -> bool {
if item.get("type").and_then(Value::as_str) != Some("message") {
return false;
}
if item.get("role").and_then(Value::as_str) != Some("user") {
return false;
}
item.get("content")
.and_then(Value::as_array)
.and_then(|content| content.first())
.and_then(|entry| entry.get("text"))
.and_then(Value::as_str)
.is_some_and(|text| text.trim_start().starts_with("<ghost_snapshot>"))
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
/// Scenario: compact an initial conversation, resume it, fork one turn back, and
/// ensure the model-visible history matches expectations at each request.
@@ -61,7 +83,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
user_turn(&base, "hello world").await;
compact_conversation(&base).await;
user_turn(&base, "AFTER_COMPACT").await;
let base_path = fetch_conversation_path(&base, "base conversation").await;
let base_path = fetch_conversation_path(&base).await;
assert!(
base_path.exists(),
"compact+resume test expects base path {base_path:?} to exist",
@@ -69,7 +91,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
let resumed = resume_conversation(&manager, &config, base_path).await;
user_turn(&resumed, "AFTER_RESUME").await;
let resumed_path = fetch_conversation_path(&resumed, "resumed conversation").await;
let resumed_path = fetch_conversation_path(&resumed).await;
assert!(
resumed_path.exists(),
"compact+resume test expects resumed path {resumed_path:?} to exist",
@@ -518,7 +540,7 @@ async fn compact_resume_after_second_compaction_preserves_history() {
user_turn(&base, "hello world").await;
compact_conversation(&base).await;
user_turn(&base, "AFTER_COMPACT").await;
let base_path = fetch_conversation_path(&base, "base conversation").await;
let base_path = fetch_conversation_path(&base).await;
assert!(
base_path.exists(),
"second compact test expects base path {base_path:?} to exist",
@@ -526,7 +548,7 @@ async fn compact_resume_after_second_compaction_preserves_history() {
let resumed = resume_conversation(&manager, &config, base_path).await;
user_turn(&resumed, "AFTER_RESUME").await;
let resumed_path = fetch_conversation_path(&resumed, "resumed conversation").await;
let resumed_path = fetch_conversation_path(&resumed).await;
assert!(
resumed_path.exists(),
"second compact test expects resumed path {resumed_path:?} to exist",
@@ -537,7 +559,7 @@ async fn compact_resume_after_second_compaction_preserves_history() {
compact_conversation(&forked).await;
user_turn(&forked, "AFTER_COMPACT_2").await;
let forked_path = fetch_conversation_path(&forked, "forked conversation").await;
let forked_path = fetch_conversation_path(&forked).await;
assert!(
forked_path.exists(),
"second compact test expects forked path {forked_path:?} to exist",
@@ -557,13 +579,15 @@ async fn compact_resume_after_second_compaction_preserves_history() {
let resume_input_array = input_after_resume
.as_array()
.expect("input after resume should be an array");
let compact_filtered = filter_out_ghost_snapshot_entries(compact_input_array);
let resume_filtered = filter_out_ghost_snapshot_entries(resume_input_array);
assert!(
compact_input_array.len() <= resume_input_array.len(),
compact_filtered.len() <= resume_filtered.len(),
"after-resume input should have at least as many items as after-compact"
);
assert_eq!(
compact_input_array.as_slice(),
&resume_input_array[..compact_input_array.len()]
compact_filtered.as_slice(),
&resume_filtered[..compact_filtered.len()]
);
// hard coded test
let prompt = requests[0]["instructions"]
@@ -792,22 +816,8 @@ async fn compact_conversation(conversation: &Arc<CodexConversation>) {
wait_for_event(conversation, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
}
async fn fetch_conversation_path(
conversation: &Arc<CodexConversation>,
context: &str,
) -> std::path::PathBuf {
conversation
.submit(Op::GetPath)
.await
.expect("request conversation path");
match wait_for_event(conversation, |ev| {
matches!(ev, EventMsg::ConversationPath(_))
})
.await
{
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path,
_ => panic!("expected ConversationPath event for {context}"),
}
async fn fetch_conversation_path(conversation: &Arc<CodexConversation>) -> std::path::PathBuf {
conversation.rollout_path()
}
async fn resume_conversation(

View File

@@ -4,7 +4,6 @@ use codex_core::ModelProviderInfo;
use codex_core::NewConversation;
use codex_core::built_in_model_providers;
use codex_core::parse_turn_item;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::RolloutItem;
@@ -79,13 +78,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
}
// Request history from the base conversation to obtain rollout path.
codex.submit(Op::GetPath).await.unwrap();
let base_history =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await;
let base_path = match &base_history {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
_ => panic!("expected ConversationHistory event"),
};
let base_path = codex.rollout_path();
// GetHistory flushes before returning the path; no wait needed.
@@ -140,15 +133,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
.await
.expect("fork 1");
codex_fork1.submit(Op::GetPath).await.unwrap();
let fork1_history = wait_for_event(&codex_fork1, |ev| {
matches!(ev, EventMsg::ConversationPath(_))
})
.await;
let fork1_path = match &fork1_history {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
_ => panic!("expected ConversationHistory event after first fork"),
};
let fork1_path = codex_fork1.rollout_path();
// GetHistory on fork1 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
@@ -166,15 +151,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
.await
.expect("fork 2");
codex_fork2.submit(Op::GetPath).await.unwrap();
let fork2_history = wait_for_event(&codex_fork2, |ev| {
matches!(ev, EventMsg::ConversationPath(_))
})
.await;
let fork2_path = match &fork2_history {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path.clone(),
_ => panic!("expected ConversationHistory event after second fork"),
};
let fork2_path = codex_fork2.rollout_path();
// GetHistory on fork2 flushed; the file is ready.
let fork1_items = read_items(&fork1_path);
let fork1_user_inputs = find_user_input_positions(&fork1_items);

View File

@@ -7,7 +7,6 @@ use codex_core::REVIEW_PROMPT;
use codex_core::ResponseItem;
use codex_core::built_in_model_providers;
use codex_core::config::Config;
use codex_core::protocol::ConversationPathResponseEvent;
use codex_core::protocol::ENVIRONMENT_CONTEXT_OPEN_TAG;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExitedReviewModeEvent;
@@ -120,13 +119,7 @@ async fn review_op_emits_lifecycle_and_review_output() {
// Also verify that a user message with the header and a formatted finding
// was recorded back in the parent session's rollout.
codex.submit(Op::GetPath).await.unwrap();
let history_event =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await;
let path = match history_event {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path,
other => panic!("expected ConversationPath event, got {other:?}"),
};
let path = codex.rollout_path();
let text = std::fs::read_to_string(&path).expect("read rollout file");
let mut saw_header = false;
@@ -375,7 +368,8 @@ async fn review_input_isolated_from_parent_history() {
"instructions": null,
"cwd": ".",
"originator": "test_originator",
"cli_version": "test_version"
"cli_version": "test_version",
"model_provider": "test-provider"
}
});
f.write_all(format!("{meta_line}\n").as_bytes())
@@ -482,13 +476,7 @@ async fn review_input_isolated_from_parent_history() {
assert_eq!(instructions, REVIEW_PROMPT);
// Also verify that a user interruption note was recorded in the rollout.
codex.submit(Op::GetPath).await.unwrap();
let history_event =
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ConversationPath(_))).await;
let path = match history_event {
EventMsg::ConversationPath(ConversationPathResponseEvent { path, .. }) => path,
other => panic!("expected ConversationPath event, got {other:?}"),
};
let path = codex.rollout_path();
let text = std::fs::read_to_string(&path).expect("read rollout file");
let mut saw_interruption_message = false;
for line in text.lines() {

View File

@@ -28,7 +28,8 @@ fn write_minimal_rollout_with_id(codex_home: &Path, id: Uuid) -> PathBuf {
"instructions": null,
"cwd": ".",
"originator": "test",
"cli_version": "test"
"cli_version": "test",
"model_provider": "test-provider"
}
})
)

View File

@@ -240,7 +240,7 @@ async fn unified_exec_emits_output_delta_for_exec_command() -> Result<()> {
let call_id = "uexec-delta-1";
let args = json!({
"cmd": "printf 'HELLO-UEXEC'",
"yield_time_ms": 250,
"yield_time_ms": 1000,
});
let responses = vec![

View File

@@ -20,7 +20,6 @@ use codex_core::protocol::StreamErrorEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::TurnDiffEvent;
use codex_core::protocol::WebSearchBeginEvent;
use codex_core::protocol::WebSearchEndEvent;
use codex_protocol::num_format::format_with_separators;
use owo_colors::OwoColorize;
@@ -216,7 +215,6 @@ impl EventProcessor for EventProcessorWithHumanOutput {
cwd.to_string_lossy(),
);
}
EventMsg::ExecCommandOutputDelta(_) => {}
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
aggregated_output,
duration,
@@ -283,7 +281,6 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
}
}
EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id: _ }) => {}
EventMsg::WebSearchEnd(WebSearchEndEvent { call_id: _, query }) => {
ts_msg!(self, "🌐 Searched: {query}");
}
@@ -411,12 +408,6 @@ impl EventProcessor for EventProcessorWithHumanOutput {
);
eprintln!("{unified_diff}");
}
EventMsg::ExecApprovalRequest(_) => {
// Should we exit?
}
EventMsg::ApplyPatchApprovalRequest(_) => {
// Should we exit?
}
EventMsg::AgentReasoning(agent_reasoning_event) => {
if self.show_agent_reasoning {
ts_msg!(
@@ -481,15 +472,6 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
}
}
EventMsg::GetHistoryEntryResponse(_) => {
// Currently ignored in exec output.
}
EventMsg::McpListToolsResponse(_) => {
// Currently ignored in exec output.
}
EventMsg::ListCustomPromptsResponse(_) => {
// Currently ignored in exec output.
}
EventMsg::ViewImageToolCall(view) => {
ts_msg!(
self,
@@ -510,16 +492,24 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
},
EventMsg::ShutdownComplete => return CodexStatus::Shutdown,
EventMsg::ConversationPath(_) => {}
EventMsg::UserMessage(_) => {}
EventMsg::EnteredReviewMode(_) => {}
EventMsg::ExitedReviewMode(_) => {}
EventMsg::AgentMessageDelta(_) => {}
EventMsg::AgentReasoningDelta(_) => {}
EventMsg::AgentReasoningRawContentDelta(_) => {}
EventMsg::ItemStarted(_) => {}
EventMsg::ItemCompleted(_) => {}
EventMsg::RawResponseItem(_) => {}
EventMsg::WebSearchBegin(_)
| EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::McpListToolsResponse(_)
| EventMsg::ListCustomPromptsResponse(_)
| EventMsg::RawResponseItem(_)
| EventMsg::UserMessage(_)
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_)
| EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::AgentReasoningRawContentDelta(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_)
| EventMsg::UndoCompleted(_)
| EventMsg::UndoStarted(_) => {}
}
CodexStatus::Running
}

View File

@@ -389,8 +389,16 @@ async fn resolve_resume_path(
args: &crate::cli::ResumeArgs,
) -> anyhow::Result<Option<PathBuf>> {
if args.last {
match codex_core::RolloutRecorder::list_conversations(&config.codex_home, 1, None, &[])
.await
let default_provider_filter = vec![config.model_provider_id.clone()];
match codex_core::RolloutRecorder::list_conversations(
&config.codex_home,
1,
None,
&[],
Some(default_provider_filter.as_slice()),
&config.model_provider_id,
)
.await
{
Ok(page) => Ok(page.items.first().map(|it| it.path.clone())),
Err(e) => {

View File

@@ -172,7 +172,6 @@ impl CodexLogSnapshot {
&self,
classification: &str,
reason: Option<&str>,
cli_version: &str,
include_logs: bool,
rollout_path: Option<&std::path::Path>,
) -> Result<()> {
@@ -198,6 +197,7 @@ impl CodexLogSnapshot {
..Default::default()
});
let cli_version = env!("CARGO_PKG_VERSION");
let mut tags = BTreeMap::from([
(String::from("thread_id"), self.thread_id.to_string()),
(String::from("classification"), classification.to_string()),

View File

@@ -9,13 +9,20 @@ name = "codex_git_tooling"
path = "src/lib.rs"
[dependencies]
tempfile = "3"
thiserror = "2"
walkdir = "2"
tempfile = { workspace = true }
thiserror = { workspace = true }
walkdir = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }
ts-rs = { workspace = true, features = [
"uuid-impl",
"serde-json-impl",
"no-serde-warnings",
] }
[lints]
workspace = true
[dev-dependencies]
assert_matches = { workspace = true }
pretty_assertions = "1.4.1"
pretty_assertions = { workspace = true }

View File

@@ -1,4 +1,7 @@
use std::collections::HashSet;
use std::ffi::OsString;
use std::fs;
use std::io;
use std::path::Path;
use std::path::PathBuf;
@@ -14,6 +17,7 @@ use crate::operations::resolve_head;
use crate::operations::resolve_repository_root;
use crate::operations::run_git_for_status;
use crate::operations::run_git_for_stdout;
use crate::operations::run_git_for_stdout_all;
/// Default commit message used for ghost commits when none is provided.
const DEFAULT_COMMIT_MESSAGE: &str = "codex snapshot";
@@ -69,6 +73,8 @@ pub fn create_ghost_commit(
let repo_root = resolve_repository_root(options.repo_path)?;
let repo_prefix = repo_subdir(repo_root.as_path(), options.repo_path);
let parent = resolve_head(repo_root.as_path())?;
let existing_untracked =
capture_existing_untracked(repo_root.as_path(), repo_prefix.as_deref())?;
let normalized_force = options
.force_include
@@ -84,6 +90,16 @@ pub fn create_ghost_commit(
OsString::from(index_path.as_os_str()),
)];
// Pre-populate the temporary index with HEAD so unchanged tracked files
// are included in the snapshot tree.
if let Some(parent_sha) = parent.as_deref() {
run_git_for_status(
repo_root.as_path(),
vec![OsString::from("read-tree"), OsString::from(parent_sha)],
Some(base_env.as_slice()),
)?;
}
let mut add_args = vec![OsString::from("add"), OsString::from("--all")];
if let Some(prefix) = repo_prefix.as_deref() {
add_args.extend([OsString::from("--"), prefix.as_os_str().to_os_string()]);
@@ -127,12 +143,29 @@ pub fn create_ghost_commit(
Some(commit_env.as_slice()),
)?;
Ok(GhostCommit::new(commit_id, parent))
Ok(GhostCommit::new(
commit_id,
parent,
existing_untracked.files,
existing_untracked.dirs,
))
}
/// Restore the working tree to match the provided ghost commit.
pub fn restore_ghost_commit(repo_path: &Path, commit: &GhostCommit) -> Result<(), GitToolingError> {
restore_to_commit(repo_path, commit.id())
ensure_git_repository(repo_path)?;
let repo_root = resolve_repository_root(repo_path)?;
let repo_prefix = repo_subdir(repo_root.as_path(), repo_path);
let current_untracked =
capture_existing_untracked(repo_root.as_path(), repo_prefix.as_deref())?;
restore_to_commit_inner(repo_root.as_path(), repo_prefix.as_deref(), commit.id())?;
remove_new_untracked(
repo_root.as_path(),
commit.preexisting_untracked_files(),
commit.preexisting_untracked_dirs(),
current_untracked,
)
}
/// Restore the working tree to match the given commit ID.
@@ -141,7 +174,16 @@ pub fn restore_to_commit(repo_path: &Path, commit_id: &str) -> Result<(), GitToo
let repo_root = resolve_repository_root(repo_path)?;
let repo_prefix = repo_subdir(repo_root.as_path(), repo_path);
restore_to_commit_inner(repo_root.as_path(), repo_prefix.as_deref(), commit_id)
}
/// Restores the working tree and index to the given commit using `git restore`.
/// The repository root and optional repository-relative prefix limit the restore scope.
fn restore_to_commit_inner(
repo_root: &Path,
repo_prefix: Option<&Path>,
commit_id: &str,
) -> Result<(), GitToolingError> {
let mut restore_args = vec![
OsString::from("restore"),
OsString::from("--source"),
@@ -150,13 +192,143 @@ pub fn restore_to_commit(repo_path: &Path, commit_id: &str) -> Result<(), GitToo
OsString::from("--staged"),
OsString::from("--"),
];
if let Some(prefix) = repo_prefix.as_deref() {
if let Some(prefix) = repo_prefix {
restore_args.push(prefix.as_os_str().to_os_string());
} else {
restore_args.push(OsString::from("."));
}
run_git_for_status(repo_root.as_path(), restore_args, None)?;
run_git_for_status(repo_root, restore_args, None)?;
Ok(())
}
#[derive(Default)]
struct UntrackedSnapshot {
files: Vec<PathBuf>,
dirs: Vec<PathBuf>,
}
/// Captures the untracked and ignored entries under `repo_root`, optionally limited by `repo_prefix`.
/// Returns the result as an `UntrackedSnapshot`.
fn capture_existing_untracked(
repo_root: &Path,
repo_prefix: Option<&Path>,
) -> Result<UntrackedSnapshot, GitToolingError> {
// Ask git for the zero-delimited porcelain status so we can enumerate
// every untracked or ignored path (including ones filtered by prefix).
let mut args = vec![
OsString::from("status"),
OsString::from("--porcelain=2"),
OsString::from("-z"),
OsString::from("--ignored=matching"),
OsString::from("--untracked-files=all"),
];
if let Some(prefix) = repo_prefix {
args.push(OsString::from("--"));
args.push(prefix.as_os_str().to_os_string());
}
let output = run_git_for_stdout_all(repo_root, args, None)?;
if output.is_empty() {
return Ok(UntrackedSnapshot::default());
}
let mut snapshot = UntrackedSnapshot::default();
// Each entry is of the form "<code> <path>" where code is '?' (untracked)
// or '!' (ignored); everything else is irrelevant to this snapshot.
for entry in output.split('\0') {
if entry.is_empty() {
continue;
}
let mut parts = entry.splitn(2, ' ');
let code = parts.next();
let path_part = parts.next();
let (Some(code), Some(path_part)) = (code, path_part) else {
continue;
};
if code != "?" && code != "!" {
continue;
}
if path_part.is_empty() {
continue;
}
let normalized = normalize_relative_path(Path::new(path_part))?;
let absolute = repo_root.join(&normalized);
let is_dir = absolute.is_dir();
if is_dir {
snapshot.dirs.push(normalized);
} else {
snapshot.files.push(normalized);
}
}
Ok(snapshot)
}
/// Removes untracked files and directories that were not present when the snapshot was captured.
fn remove_new_untracked(
repo_root: &Path,
preserved_files: &[PathBuf],
preserved_dirs: &[PathBuf],
current: UntrackedSnapshot,
) -> Result<(), GitToolingError> {
if current.files.is_empty() && current.dirs.is_empty() {
return Ok(());
}
let preserved_file_set: HashSet<PathBuf> = preserved_files.iter().cloned().collect();
let preserved_dirs_vec: Vec<PathBuf> = preserved_dirs.to_vec();
for path in current.files {
if should_preserve(&path, &preserved_file_set, &preserved_dirs_vec) {
continue;
}
remove_path(&repo_root.join(&path))?;
}
for dir in current.dirs {
if should_preserve(&dir, &preserved_file_set, &preserved_dirs_vec) {
continue;
}
remove_path(&repo_root.join(&dir))?;
}
Ok(())
}
/// Determines whether an untracked path should be kept because it existed in the snapshot.
fn should_preserve(
path: &Path,
preserved_files: &HashSet<PathBuf>,
preserved_dirs: &[PathBuf],
) -> bool {
if preserved_files.contains(path) {
return true;
}
preserved_dirs
.iter()
.any(|dir| path.starts_with(dir.as_path()))
}
/// Deletes the file or directory at the provided path, ignoring if it is already absent.
fn remove_path(path: &Path) -> Result<(), GitToolingError> {
match fs::symlink_metadata(path) {
Ok(metadata) => {
if metadata.is_dir() {
fs::remove_dir_all(path)?;
} else {
fs::remove_file(path)?;
}
}
Err(err) => {
if err.kind() == io::ErrorKind::NotFound {
return Ok(());
}
return Err(err.into());
}
}
Ok(())
}
@@ -239,6 +411,9 @@ mod tests {
],
);
let preexisting_untracked = repo.join("notes.txt");
std::fs::write(&preexisting_untracked, "notes before\n")?;
let tracked_contents = "modified contents\n";
std::fs::write(repo.join("tracked.txt"), tracked_contents)?;
std::fs::remove_file(repo.join("delete-me.txt"))?;
@@ -267,6 +442,7 @@ mod tests {
std::fs::write(repo.join("ignored.txt"), "changed\n")?;
std::fs::remove_file(repo.join("new-file.txt"))?;
std::fs::write(repo.join("ephemeral.txt"), "temp data\n")?;
std::fs::write(&preexisting_untracked, "notes after\n")?;
restore_ghost_commit(repo, &ghost)?;
@@ -277,7 +453,9 @@ mod tests {
let new_file_after = std::fs::read_to_string(repo.join("new-file.txt"))?;
assert_eq!(new_file_after, new_file_contents);
assert_eq!(repo.join("delete-me.txt").exists(), false);
assert!(repo.join("ephemeral.txt").exists());
assert!(!repo.join("ephemeral.txt").exists());
let notes_after = std::fs::read_to_string(&preexisting_untracked)?;
assert_eq!(notes_after, "notes before\n");
Ok(())
}
@@ -488,7 +666,43 @@ mod tests {
assert!(vscode.join("settings.json").exists());
let settings_after = std::fs::read_to_string(vscode.join("settings.json"))?;
assert_eq!(settings_after, "{\n \"after\": true\n}\n");
assert!(repo.join("temp.txt").exists());
assert!(!repo.join("temp.txt").exists());
Ok(())
}
#[test]
/// Restoring removes ignored directories created after the snapshot.
fn restore_removes_new_ignored_directory() -> Result<(), GitToolingError> {
let temp = tempfile::tempdir()?;
let repo = temp.path();
init_test_repo(repo);
std::fs::write(repo.join(".gitignore"), ".vscode/\n")?;
std::fs::write(repo.join("tracked.txt"), "snapshot version\n")?;
run_git_in(repo, &["add", ".gitignore", "tracked.txt"]);
run_git_in(
repo,
&[
"-c",
"user.name=Tester",
"-c",
"user.email=test@example.com",
"commit",
"-m",
"initial",
],
);
let ghost = create_ghost_commit(&CreateGhostCommitOptions::new(repo))?;
let vscode = repo.join(".vscode");
std::fs::create_dir_all(&vscode)?;
std::fs::write(vscode.join("settings.json"), "{\n \"after\": true\n}\n")?;
restore_ghost_commit(repo, &ghost)?;
assert!(!vscode.exists());
Ok(())
}

View File

@@ -1,4 +1,5 @@
use std::fmt;
use std::path::PathBuf;
mod errors;
mod ghost_commits;
@@ -11,18 +12,36 @@ pub use ghost_commits::create_ghost_commit;
pub use ghost_commits::restore_ghost_commit;
pub use ghost_commits::restore_to_commit;
pub use platform::create_symlink;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use ts_rs::TS;
type CommitID = String;
/// Details of a ghost commit created from a repository state.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, TS)]
pub struct GhostCommit {
id: String,
parent: Option<String>,
id: CommitID,
parent: Option<CommitID>,
preexisting_untracked_files: Vec<PathBuf>,
preexisting_untracked_dirs: Vec<PathBuf>,
}
impl GhostCommit {
/// Create a new ghost commit wrapper from a raw commit ID and optional parent.
pub fn new(id: String, parent: Option<String>) -> Self {
Self { id, parent }
pub fn new(
id: CommitID,
parent: Option<CommitID>,
preexisting_untracked_files: Vec<PathBuf>,
preexisting_untracked_dirs: Vec<PathBuf>,
) -> Self {
Self {
id,
parent,
preexisting_untracked_files,
preexisting_untracked_dirs,
}
}
/// Commit ID for the snapshot.
@@ -34,6 +53,16 @@ impl GhostCommit {
pub fn parent(&self) -> Option<&str> {
self.parent.as_deref()
}
/// Untracked or ignored files that already existed when the snapshot was captured.
pub fn preexisting_untracked_files(&self) -> &[PathBuf] {
&self.preexisting_untracked_files
}
/// Untracked or ignored directories that already existed when the snapshot was captured.
pub fn preexisting_untracked_dirs(&self) -> &[PathBuf] {
&self.preexisting_untracked_dirs
}
}
impl fmt::Display for GhostCommit {

View File

@@ -161,6 +161,27 @@ where
})
}
/// Executes `git` and returns the full stdout without trimming so callers
/// can parse delimiter-sensitive output, propagating UTF-8 errors with context.
pub(crate) fn run_git_for_stdout_all<I, S>(
dir: &Path,
args: I,
env: Option<&[(OsString, OsString)]>,
) -> Result<String, GitToolingError>
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
// Keep the raw stdout untouched so callers can parse delimiter-sensitive
// output (e.g. NUL-separated paths) without trimming artefacts.
let run = run_git(dir, args, env)?;
// Propagate UTF-8 conversion failures with the command context for debugging.
String::from_utf8(run.output.stdout).map_err(|source| GitToolingError::GitOutputUtf8 {
command: run.command,
source,
})
}
fn run_git<I, S>(
dir: &Path,
args: I,

View File

@@ -281,7 +281,6 @@ async fn run_codex_tool_session_inner(
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::PlanUpdate(_)
| EventMsg::TurnAborted(_)
| EventMsg::ConversationPath(_)
| EventMsg::UserMessage(_)
| EventMsg::ShutdownComplete
| EventMsg::ViewImageToolCall(_)
@@ -289,6 +288,8 @@ async fn run_codex_tool_session_inner(
| EventMsg::EnteredReviewMode(_)
| EventMsg::ItemStarted(_)
| EventMsg::ItemCompleted(_)
| EventMsg::UndoStarted(_)
| EventMsg::UndoCompleted(_)
| EventMsg::ExitedReviewMode(_) => {
// For now, we do not do anything extra for these
// events. Note that

View File

@@ -11,6 +11,8 @@ path = "src/lib.rs"
workspace = true
[dependencies]
codex-git-tooling = { workspace = true }
base64 = { workspace = true }
icu_decimal = { workspace = true }
icu_locale_core = { workspace = true }

View File

@@ -9,6 +9,7 @@ use serde::ser::Serializer;
use ts_rs::TS;
use crate::user_input::UserInput;
use codex_git_tooling::GhostCommit;
use schemars::JsonSchema;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, JsonSchema, TS)]
@@ -116,6 +117,10 @@ pub enum ResponseItem {
status: Option<String>,
action: WebSearchAction,
},
// Generated by the harness but considered exactly as a model response.
GhostSnapshot {
ghost_commit: GhostCommit,
},
#[serde(other)]
Other,
}

View File

@@ -166,10 +166,6 @@ pub enum Op {
/// Request a single history entry identified by `log_id` + `offset`.
GetHistoryEntryRequest { offset: usize, log_id: u64 },
/// Request the full in-memory conversation transcript for the current session.
/// Reply is delivered via `EventMsg::ConversationHistory`.
GetPath,
/// Request the list of MCP tools available across all configured servers.
/// Reply is delivered via `EventMsg::McpListToolsResponse`.
ListMcpTools,
@@ -182,6 +178,9 @@ pub enum Op {
/// to generate a summary which will be returned as an AgentMessage event.
Compact,
/// Request Codex to undo a turn (turn are stacked so it is the same effect as CMD + Z).
Undo,
/// Request a code review from the agent.
Review { review_request: ReviewRequest },
@@ -490,6 +489,10 @@ pub enum EventMsg {
BackgroundEvent(BackgroundEventEvent),
UndoStarted(UndoStartedEvent),
UndoCompleted(UndoCompletedEvent),
/// Notification that a model stream experienced an error or disconnect
/// and the system is handling it (e.g., retrying with backoff).
StreamError(StreamErrorEvent),
@@ -519,8 +522,6 @@ pub enum EventMsg {
/// Notification that the agent is shutting down.
ShutdownComplete,
ConversationPath(ConversationPathResponseEvent),
/// Entered review mode.
EnteredReviewMode(ReviewRequest),
@@ -941,6 +942,7 @@ pub struct SessionMeta {
pub instructions: Option<String>,
#[serde(default)]
pub source: SessionSource,
pub model_provider: Option<String>,
}
impl Default for SessionMeta {
@@ -953,6 +955,7 @@ impl Default for SessionMeta {
cli_version: String::new(),
instructions: None,
source: SessionSource::default(),
model_provider: None,
}
}
}
@@ -1139,6 +1142,19 @@ pub struct BackgroundEventEvent {
pub message: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct UndoStartedEvent {
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct UndoCompletedEvent {
pub success: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct StreamErrorEvent {
pub message: String,

View File

@@ -35,7 +35,6 @@ codex-common = { workspace = true, features = [
] }
codex-core = { workspace = true }
codex-file-search = { workspace = true }
codex-git-tooling = { workspace = true }
codex-login = { workspace = true }
codex-ollama = { workspace = true }
codex-protocol = { workspace = true }

View File

@@ -103,9 +103,16 @@ impl App {
nth_user_message: usize,
) {
self.backtrack.pending = Some((base_id, nth_user_message, prefill));
self.app_event_tx.send(crate::app_event::AppEvent::CodexOp(
codex_core::protocol::Op::GetPath,
));
if let Some(path) = self.chat_widget.rollout_path() {
let ev = ConversationPathResponseEvent {
conversation_id: base_id,
path,
};
self.app_event_tx
.send(crate::app_event::AppEvent::ConversationHistory(ev));
} else {
tracing::error!("rollout path unavailable; cannot backtrack");
}
}
/// Open transcript overlay (enters alternate screen and shows full transcript).

View File

@@ -73,13 +73,11 @@ impl FeedbackNoteView {
let rollout_path_ref = self.rollout_path.as_deref();
let classification = feedback_classification(self.category);
let cli_version = crate::version::CODEX_CLI_VERSION;
let mut thread_id = self.snapshot.thread_id.clone();
let result = self.snapshot.upload_feedback(
classification,
reason_opt,
cli_version,
self.include_logs,
if self.include_logs {
rollout_path_ref

View File

@@ -315,6 +315,11 @@ impl BottomPane {
self.ctrl_c_quit_hint
}
#[cfg(test)]
pub(crate) fn status_indicator_visible(&self) -> bool {
self.status.is_some()
}
pub(crate) fn show_esc_backtrack_hint(&mut self) {
self.esc_backtrack_hint = true;
self.composer.set_esc_backtrack_hint(true);
@@ -343,6 +348,7 @@ impl BottomPane {
));
}
if let Some(status) = self.status.as_mut() {
status.set_interrupt_hint_visible(true);
status.set_queued_messages(self.queued_user_messages.clone());
}
self.request_redraw();
@@ -359,6 +365,23 @@ impl BottomPane {
}
}
pub(crate) fn ensure_status_indicator(&mut self) {
if self.status.is_none() {
self.status = Some(StatusIndicatorWidget::new(
self.app_event_tx.clone(),
self.frame_requester.clone(),
));
self.request_redraw();
}
}
pub(crate) fn set_interrupt_hint_visible(&mut self, visible: bool) {
if let Some(status) = self.status.as_mut() {
status.set_interrupt_hint_visible(visible);
self.request_redraw();
}
}
pub(crate) fn set_context_window_percent(&mut self, percent: Option<i64>) {
if self.context_window_percent == percent {
return;

View File

@@ -37,6 +37,8 @@ use codex_core::protocol::TokenUsage;
use codex_core::protocol::TokenUsageInfo;
use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::TurnDiffEvent;
use codex_core::protocol::UndoCompletedEvent;
use codex_core::protocol::UndoStartedEvent;
use codex_core::protocol::UserMessageEvent;
use codex_core::protocol::ViewImageToolCallEvent;
use codex_core::protocol::WebSearchBeginEvent;
@@ -113,16 +115,9 @@ use codex_core::protocol::AskForApproval;
use codex_core::protocol::SandboxPolicy;
use codex_core::protocol_config_types::ReasoningEffort as ReasoningEffortConfig;
use codex_file_search::FileMatch;
use codex_git_tooling::CreateGhostCommitOptions;
use codex_git_tooling::GhostCommit;
use codex_git_tooling::GitToolingError;
use codex_git_tooling::create_ghost_commit;
use codex_git_tooling::restore_ghost_commit;
use codex_protocol::plan_tool::UpdatePlanArgs;
use strum::IntoEnumIterator;
const MAX_TRACKED_GHOST_COMMITS: usize = 20;
// Track information about an in-flight exec command.
struct RunningCommand {
command: Vec<String>,
@@ -267,9 +262,6 @@ pub(crate) struct ChatWidget {
pending_notification: Option<Notification>,
// Simple review mode flag; used to adjust layout and banners.
is_review_mode: bool,
// List of ghost commits corresponding to each turn.
ghost_snapshots: Vec<GhostCommit>,
ghost_snapshots_disabled: bool,
// Whether to add a final message separator after the last message
needs_final_message_separator: bool,
@@ -312,9 +304,6 @@ impl ChatWidget {
}
fn set_status_header(&mut self, header: String) {
if self.current_status_header == header {
return;
}
self.current_status_header = header.clone();
self.bottom_pane.update_status_header(header);
}
@@ -437,6 +426,7 @@ impl ChatWidget {
self.bottom_pane.clear_ctrl_c_quit_hint();
self.bottom_pane.set_task_running(true);
self.retry_status_header = None;
self.bottom_pane.set_interrupt_hint_visible(true);
self.set_status_header(String::from("Working"));
self.full_reasoning_buffer.clear();
self.reasoning_buffer.clear();
@@ -672,6 +662,32 @@ impl ChatWidget {
debug!("BackgroundEvent: {message}");
}
fn on_undo_started(&mut self, event: UndoStartedEvent) {
self.bottom_pane.ensure_status_indicator();
self.bottom_pane.set_interrupt_hint_visible(false);
let message = event
.message
.unwrap_or_else(|| "Undo in progress...".to_string());
self.set_status_header(message);
}
fn on_undo_completed(&mut self, event: UndoCompletedEvent) {
let UndoCompletedEvent { success, message } = event;
self.bottom_pane.hide_status_indicator();
let message = message.unwrap_or_else(|| {
if success {
"Undo completed successfully.".to_string()
} else {
"Undo failed.".to_string()
}
});
if success {
self.add_info_message(message, None);
} else {
self.add_error_message(message);
}
}
fn on_stream_error(&mut self, message: String) {
if self.retry_status_header.is_none() {
self.retry_status_header = Some(self.current_status_header.clone());
@@ -989,8 +1005,6 @@ impl ChatWidget {
suppress_session_configured_redraw: false,
pending_notification: None,
is_review_mode: false,
ghost_snapshots: Vec::new(),
ghost_snapshots_disabled: true,
needs_final_message_separator: false,
last_rendered_width: std::cell::Cell::new(None),
feedback,
@@ -1057,8 +1071,6 @@ impl ChatWidget {
suppress_session_configured_redraw: true,
pending_notification: None,
is_review_mode: false,
ghost_snapshots: Vec::new(),
ghost_snapshots_disabled: true,
needs_final_message_separator: false,
last_rendered_width: std::cell::Cell::new(None),
feedback,
@@ -1211,7 +1223,7 @@ impl ChatWidget {
self.app_event_tx.send(AppEvent::ExitRequest);
}
SlashCommand::Undo => {
self.undo_last_snapshot();
self.app_event_tx.send(AppEvent::CodexOp(Op::Undo));
}
SlashCommand::Diff => {
self.add_diff_in_progress();
@@ -1328,8 +1340,6 @@ impl ChatWidget {
return;
}
self.capture_ghost_snapshot();
let mut items: Vec<UserInput> = Vec::new();
if !text.is_empty() {
@@ -1362,57 +1372,6 @@ impl ChatWidget {
self.needs_final_message_separator = false;
}
fn capture_ghost_snapshot(&mut self) {
if self.ghost_snapshots_disabled {
return;
}
let options = CreateGhostCommitOptions::new(&self.config.cwd);
match create_ghost_commit(&options) {
Ok(commit) => {
self.ghost_snapshots.push(commit);
if self.ghost_snapshots.len() > MAX_TRACKED_GHOST_COMMITS {
self.ghost_snapshots.remove(0);
}
}
Err(err) => {
self.ghost_snapshots_disabled = true;
let (message, hint) = match &err {
GitToolingError::NotAGitRepository { .. } => (
"Snapshots disabled: current directory is not a Git repository."
.to_string(),
None,
),
_ => (
format!("Snapshots disabled after error: {err}"),
Some(
"Restart Codex after resolving the issue to re-enable snapshots."
.to_string(),
),
),
};
self.add_info_message(message, hint);
tracing::warn!("failed to create ghost snapshot: {err}");
}
}
}
fn undo_last_snapshot(&mut self) {
let Some(commit) = self.ghost_snapshots.pop() else {
self.add_info_message("No snapshot available to undo.".to_string(), None);
return;
};
if let Err(err) = restore_ghost_commit(&self.config.cwd, &commit) {
self.add_error_message(format!("Failed to restore snapshot: {err}"));
self.ghost_snapshots.push(commit);
return;
}
let short_id: String = commit.id().chars().take(8).collect();
self.add_info_message(format!("Restored workspace to snapshot {short_id}"), None);
}
/// Replay a subset of initial events into the UI to seed the transcript when
/// resuming an existing session. This approximates the live event flow and
/// is intentionally conservative: only safe-to-replay items are rendered to
@@ -1510,16 +1469,14 @@ impl ChatWidget {
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
self.on_background_event(message)
}
EventMsg::UndoStarted(ev) => self.on_undo_started(ev),
EventMsg::UndoCompleted(ev) => self.on_undo_completed(ev),
EventMsg::StreamError(StreamErrorEvent { message }) => self.on_stream_error(message),
EventMsg::UserMessage(ev) => {
if from_replay {
self.on_user_message_event(ev);
}
}
EventMsg::ConversationPath(ev) => {
self.app_event_tx
.send(crate::app_event::AppEvent::ConversationHistory(ev));
}
EventMsg::EnteredReviewMode(review_request) => {
self.on_entered_review_mode(review_request)
}
@@ -2250,6 +2207,10 @@ impl ChatWidget {
self.conversation_id
}
pub(crate) fn rollout_path(&self) -> Option<PathBuf> {
self.current_rollout_path.clone()
}
/// Return a reference to the widget's current config (includes any
/// runtime overrides applied via TUI, e.g., model or approval policy).
pub(crate) fn config_ref(&self) -> &Config {

View File

@@ -34,6 +34,8 @@ use codex_core::protocol::ReviewRequest;
use codex_core::protocol::StreamErrorEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TaskStartedEvent;
use codex_core::protocol::UndoCompletedEvent;
use codex_core::protocol::UndoStartedEvent;
use codex_core::protocol::ViewImageToolCallEvent;
use codex_protocol::ConversationId;
use codex_protocol::plan_tool::PlanItemArg;
@@ -294,8 +296,6 @@ fn make_chatwidget_manual() -> (
suppress_session_configured_redraw: false,
pending_notification: None,
is_review_mode: false,
ghost_snapshots: Vec::new(),
ghost_snapshots_disabled: false,
needs_final_message_separator: false,
last_rendered_width: std::cell::Cell::new(None),
feedback: codex_feedback::CodexFeedback::new(),
@@ -849,6 +849,109 @@ fn slash_init_skips_when_project_doc_exists() {
);
}
#[test]
fn slash_undo_sends_op() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
chat.dispatch_command(SlashCommand::Undo);
match rx.try_recv() {
Ok(AppEvent::CodexOp(Op::Undo)) => {}
other => panic!("expected AppEvent::CodexOp(Op::Undo), got {other:?}"),
}
}
#[test]
fn undo_success_events_render_info_messages() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
msg: EventMsg::UndoStarted(UndoStartedEvent {
message: Some("Undo requested for the last turn...".to_string()),
}),
});
assert!(
chat.bottom_pane.status_indicator_visible(),
"status indicator should be visible during undo"
);
chat.handle_codex_event(Event {
id: "turn-1".to_string(),
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: true,
message: None,
}),
});
let cells = drain_insert_history(&mut rx);
assert_eq!(cells.len(), 1, "expected final status only");
assert!(
!chat.bottom_pane.status_indicator_visible(),
"status indicator should be hidden after successful undo"
);
let completed = lines_to_single_string(&cells[0]);
assert!(
completed.contains("Undo completed successfully."),
"expected default success message, got {completed:?}"
);
}
#[test]
fn undo_failure_events_render_error_message() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
assert!(
chat.bottom_pane.status_indicator_visible(),
"status indicator should be visible during undo"
);
chat.handle_codex_event(Event {
id: "turn-2".to_string(),
msg: EventMsg::UndoCompleted(UndoCompletedEvent {
success: false,
message: Some("Failed to restore workspace state.".to_string()),
}),
});
let cells = drain_insert_history(&mut rx);
assert_eq!(cells.len(), 1, "expected final status only");
assert!(
!chat.bottom_pane.status_indicator_visible(),
"status indicator should be hidden after failed undo"
);
let completed = lines_to_single_string(&cells[0]);
assert!(
completed.contains("Failed to restore workspace state."),
"expected failure message, got {completed:?}"
);
}
#[test]
fn undo_started_hides_interrupt_hint() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual();
chat.handle_codex_event(Event {
id: "turn-hint".to_string(),
msg: EventMsg::UndoStarted(UndoStartedEvent { message: None }),
});
let status = chat
.bottom_pane
.status_widget()
.expect("status indicator should be active");
assert!(
!status.interrupt_hint_visible(),
"undo should hide the interrupt hint because the operation cannot be cancelled"
);
}
/// The commit picker shows only commit subjects (no timestamps).
#[test]
fn review_commit_picker_shows_subjects_without_timestamps() {

View File

@@ -391,11 +391,14 @@ async fn run_ratatui_app(
}
}
} else if cli.resume_last {
let provider_filter = vec![config.model_provider_id.clone()];
match RolloutRecorder::list_conversations(
&config.codex_home,
1,
None,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
&config.model_provider_id,
)
.await
{
@@ -407,7 +410,13 @@ async fn run_ratatui_app(
Err(_) => resume_picker::ResumeSelection::StartFresh,
}
} else if cli.resume_picker {
match resume_picker::run_resume_picker(&mut tui, &config.codex_home).await? {
match resume_picker::run_resume_picker(
&mut tui,
&config.codex_home,
&config.model_provider_id,
)
.await?
{
resume_picker::ResumeSelection::Exit => {
restore();
session_log::log_session_end();

View File

@@ -49,6 +49,7 @@ struct PageLoadRequest {
cursor: Option<Cursor>,
request_token: usize,
search_token: Option<usize>,
default_provider: String,
}
type PageLoader = Arc<dyn Fn(PageLoadRequest) + Send + Sync>;
@@ -64,19 +65,28 @@ enum BackgroundEvent {
/// Interactive session picker that lists recorded rollout files with simple
/// search and pagination. Shows the first user input as the preview, relative
/// time (e.g., "5 seconds ago"), and the absolute path.
pub async fn run_resume_picker(tui: &mut Tui, codex_home: &Path) -> Result<ResumeSelection> {
pub async fn run_resume_picker(
tui: &mut Tui,
codex_home: &Path,
default_provider: &str,
) -> Result<ResumeSelection> {
let alt = AltScreenGuard::enter(tui);
let (bg_tx, bg_rx) = mpsc::unbounded_channel();
let default_provider = default_provider.to_string();
let loader_tx = bg_tx.clone();
let page_loader: PageLoader = Arc::new(move |request: PageLoadRequest| {
let tx = loader_tx.clone();
tokio::spawn(async move {
let provider_filter = vec![request.default_provider.clone()];
let page = RolloutRecorder::list_conversations(
&request.codex_home,
PAGE_SIZE,
request.cursor.as_ref(),
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
request.default_provider.as_str(),
)
.await;
let _ = tx.send(BackgroundEvent::PageLoaded {
@@ -91,6 +101,7 @@ pub async fn run_resume_picker(tui: &mut Tui, codex_home: &Path) -> Result<Resum
codex_home.to_path_buf(),
alt.tui.frame_requester(),
page_loader,
default_provider.clone(),
);
state.load_initial_page().await?;
state.request_frame();
@@ -165,6 +176,7 @@ struct PickerState {
next_search_token: usize,
page_loader: PageLoader,
view_rows: Option<usize>,
default_provider: String,
}
struct PaginationState {
@@ -225,7 +237,12 @@ struct Row {
}
impl PickerState {
fn new(codex_home: PathBuf, requester: FrameRequester, page_loader: PageLoader) -> Self {
fn new(
codex_home: PathBuf,
requester: FrameRequester,
page_loader: PageLoader,
default_provider: String,
) -> Self {
Self {
codex_home,
requester,
@@ -246,6 +263,7 @@ impl PickerState {
next_search_token: 0,
page_loader,
view_rows: None,
default_provider,
}
}
@@ -324,11 +342,14 @@ impl PickerState {
}
async fn load_initial_page(&mut self) -> Result<()> {
let provider_filter = vec![self.default_provider.clone()];
let page = RolloutRecorder::list_conversations(
&self.codex_home,
PAGE_SIZE,
None,
INTERACTIVE_SESSION_SOURCES,
Some(provider_filter.as_slice()),
self.default_provider.as_str(),
)
.await?;
self.reset_pagination();
@@ -552,6 +573,7 @@ impl PickerState {
cursor: Some(cursor),
request_token,
search_token,
default_provider: self.default_provider.clone(),
});
}
@@ -1061,8 +1083,12 @@ mod tests {
use ratatui::layout::Layout;
let loader: PageLoader = Arc::new(|_| {});
let mut state =
PickerState::new(PathBuf::from("/tmp"), FrameRequester::test_dummy(), loader);
let mut state = PickerState::new(
PathBuf::from("/tmp"),
FrameRequester::test_dummy(),
loader,
String::from("openai"),
);
let now = Utc::now();
let rows = vec![
@@ -1117,8 +1143,12 @@ mod tests {
#[test]
fn pageless_scrolling_deduplicates_and_keeps_order() {
let loader: PageLoader = Arc::new(|_| {});
let mut state =
PickerState::new(PathBuf::from("/tmp"), FrameRequester::test_dummy(), loader);
let mut state = PickerState::new(
PathBuf::from("/tmp"),
FrameRequester::test_dummy(),
loader,
String::from("openai"),
);
state.reset_pagination();
state.ingest_page(page(
@@ -1179,8 +1209,12 @@ mod tests {
request_sink.lock().unwrap().push(req);
});
let mut state =
PickerState::new(PathBuf::from("/tmp"), FrameRequester::test_dummy(), loader);
let mut state = PickerState::new(
PathBuf::from("/tmp"),
FrameRequester::test_dummy(),
loader,
String::from("openai"),
);
state.reset_pagination();
state.ingest_page(page(
vec![
@@ -1204,8 +1238,12 @@ mod tests {
#[test]
fn page_navigation_uses_view_rows() {
let loader: PageLoader = Arc::new(|_| {});
let mut state =
PickerState::new(PathBuf::from("/tmp"), FrameRequester::test_dummy(), loader);
let mut state = PickerState::new(
PathBuf::from("/tmp"),
FrameRequester::test_dummy(),
loader,
String::from("openai"),
);
let mut items = Vec::new();
for idx in 0..20 {
@@ -1248,8 +1286,12 @@ mod tests {
#[test]
fn up_at_bottom_does_not_scroll_when_visible() {
let loader: PageLoader = Arc::new(|_| {});
let mut state =
PickerState::new(PathBuf::from("/tmp"), FrameRequester::test_dummy(), loader);
let mut state = PickerState::new(
PathBuf::from("/tmp"),
FrameRequester::test_dummy(),
loader,
String::from("openai"),
);
let mut items = Vec::new();
for idx in 0..10 {
@@ -1288,8 +1330,12 @@ mod tests {
request_sink.lock().unwrap().push(req);
});
let mut state =
PickerState::new(PathBuf::from("/tmp"), FrameRequester::test_dummy(), loader);
let mut state = PickerState::new(
PathBuf::from("/tmp"),
FrameRequester::test_dummy(),
loader,
String::from("openai"),
);
state.reset_pagination();
state.ingest_page(page(
vec![make_item(

View File

@@ -39,7 +39,7 @@ impl SlashCommand {
SlashCommand::Init => "create an AGENTS.md file with instructions for Codex",
SlashCommand::Compact => "summarize conversation to prevent hitting the context limit",
SlashCommand::Review => "review my current changes and find issues",
SlashCommand::Undo => "restore the workspace to the last Codex snapshot",
SlashCommand::Undo => "ask Codex to undo a turn",
SlashCommand::Quit => "exit Codex",
SlashCommand::Diff => "show git diff (including untracked files)",
SlashCommand::Mention => "mention a file",
@@ -85,14 +85,5 @@ impl SlashCommand {
/// Return all built-in commands in a Vec paired with their command string.
pub fn built_in_slash_commands() -> Vec<(&'static str, SlashCommand)> {
let show_beta_features = beta_features_enabled();
SlashCommand::iter()
.filter(|cmd| *cmd != SlashCommand::Undo || show_beta_features)
.map(|c| (c.command(), c))
.collect()
}
fn beta_features_enabled() -> bool {
std::env::var_os("BETA_FEATURE").is_some()
SlashCommand::iter().map(|c| (c.command(), c)).collect()
}

View File

@@ -25,6 +25,8 @@ pub(crate) struct StatusIndicatorWidget {
header: String,
/// Queued user messages to display under the status line.
queued_messages: Vec<String>,
/// Whether to show the interrupt hint (Esc).
show_interrupt_hint: bool,
elapsed_running: Duration,
last_resume_at: Instant,
@@ -55,6 +57,7 @@ impl StatusIndicatorWidget {
Self {
header: String::from("Working"),
queued_messages: Vec::new(),
show_interrupt_hint: true,
elapsed_running: Duration::ZERO,
last_resume_at: Instant::now(),
is_paused: false,
@@ -98,9 +101,11 @@ impl StatusIndicatorWidget {
/// Update the animated header label (left of the brackets).
pub(crate) fn update_header(&mut self, header: String) {
if self.header != header {
self.header = header;
}
self.header = header;
}
pub(crate) fn set_interrupt_hint_visible(&mut self, visible: bool) {
self.show_interrupt_hint = visible;
}
#[cfg(test)]
@@ -108,6 +113,11 @@ impl StatusIndicatorWidget {
&self.header
}
#[cfg(test)]
pub(crate) fn interrupt_hint_visible(&self) -> bool {
self.show_interrupt_hint
}
/// Replace the queued messages displayed beneath the header.
pub(crate) fn set_queued_messages(&mut self, queued: Vec<String>) {
self.queued_messages = queued;
@@ -175,12 +185,16 @@ impl WidgetRef for StatusIndicatorWidget {
spans.push(spinner(Some(self.last_resume_at)));
spans.push(" ".into());
spans.extend(shimmer_spans(&self.header));
spans.extend(vec![
" ".into(),
format!("({pretty_elapsed}").dim(),
key_hint::plain(KeyCode::Esc).into(),
" to interrupt)".dim(),
]);
spans.push(" ".into());
if self.show_interrupt_hint {
spans.extend(vec![
format!("({pretty_elapsed}").dim(),
key_hint::plain(KeyCode::Esc).into(),
" to interrupt)".dim(),
]);
} else {
spans.push(format!("({pretty_elapsed})").dim());
}
// Build lines: status, then queued messages, then spacer.
let mut lines: Vec<Line<'static>> = Vec::new();

View File

@@ -1,6 +1,7 @@
//! Readiness flag with token-based authorization and async waiting (Tokio).
use std::collections::HashSet;
use std::fmt;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
@@ -71,6 +72,10 @@ impl ReadinessFlag {
.map_err(|_| errors::ReadinessError::TokenLockFailed)?;
Ok(f(&mut guard))
}
fn load_ready(&self) -> bool {
self.ready.load(Ordering::Acquire)
}
}
impl Default for ReadinessFlag {
@@ -79,14 +84,37 @@ impl Default for ReadinessFlag {
}
}
impl fmt::Debug for ReadinessFlag {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReadinessFlag")
.field("ready", &self.load_ready())
.finish()
}
}
#[async_trait::async_trait]
impl Readiness for ReadinessFlag {
fn is_ready(&self) -> bool {
self.ready.load(Ordering::Acquire)
if self.load_ready() {
return true;
}
if let Ok(tokens) = self.tokens.try_lock()
&& tokens.is_empty()
{
let was_ready = self.ready.swap(true, Ordering::AcqRel);
drop(tokens);
if !was_ready {
let _ = self.tx.send(true);
}
return true;
}
self.load_ready()
}
async fn subscribe(&self) -> Result<Token, errors::ReadinessError> {
if self.is_ready() {
if self.load_ready() {
return Err(errors::ReadinessError::FlagAlreadyReady);
}
@@ -97,7 +125,7 @@ impl Readiness for ReadinessFlag {
// check above and inserting the token.
let inserted = self
.with_tokens(|tokens| {
if self.is_ready() {
if self.load_ready() {
return false;
}
tokens.insert(token);
@@ -113,7 +141,7 @@ impl Readiness for ReadinessFlag {
}
async fn mark_ready(&self, token: Token) -> Result<bool, errors::ReadinessError> {
if self.is_ready() {
if self.load_ready() {
return Ok(false);
}
if token.0 == 0 {
@@ -202,7 +230,8 @@ mod tests {
async fn mark_ready_rejects_unknown_token() -> Result<(), ReadinessError> {
let flag = ReadinessFlag::new();
assert!(!flag.mark_ready(Token(42)).await?);
assert!(!flag.is_ready());
assert!(!flag.load_ready());
assert!(flag.is_ready());
Ok(())
}
@@ -233,6 +262,19 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn is_ready_without_subscribers_marks_flag_ready() -> Result<(), ReadinessError> {
let flag = ReadinessFlag::new();
assert!(flag.is_ready());
assert!(flag.is_ready());
assert_matches!(
flag.subscribe().await,
Err(ReadinessError::FlagAlreadyReady)
);
Ok(())
}
#[tokio::test]
async fn subscribe_returns_error_when_lock_is_held() {
let flag = ReadinessFlag::new();