Fix exec CI regressions on app-server migration

This commit is contained in:
Eric Traut
2026-03-19 00:35:12 -06:00
parent a19077ce26
commit c2851508b4
5 changed files with 90 additions and 46 deletions

4
codex-rs/Cargo.lock generated
View File

@@ -1996,9 +1996,7 @@ dependencies = [
"codex-utils-absolute-path",
"codex-utils-cargo-bin",
"codex-utils-cli",
"codex-utils-elapsed",
"codex-utils-oss",
"codex-utils-sandbox-summary",
"core_test_support",
"libc",
"opentelemetry",
@@ -2006,10 +2004,8 @@ dependencies = [
"owo-colors",
"predicates",
"pretty_assertions",
"rmcp",
"serde",
"serde_json",
"shlex",
"supports-color 3.0.2",
"tempfile",
"tokio",

View File

@@ -311,7 +311,7 @@ struct ChatgptAuthRefreshContext {
impl ChatgptAuthRefreshContext {
fn resolve_refresh_response(
&self,
params: &ChatgptAuthTokensRefreshParams,
_params: &ChatgptAuthTokensRefreshParams,
) -> Result<ChatgptAuthTokensRefreshResponse, String> {
let auth = load_auth_dot_json(&self.codex_home, self.auth_credentials_store_mode)
.map_err(|err| format!("failed to load local auth: {err}"))?
@@ -356,7 +356,7 @@ impl InProcessAppServerClient {
let channel_capacity = args.channel_capacity.max(1);
let auto_handle_chatgpt_auth_refresh = args.auto_handle_chatgpt_auth_refresh;
let auth_refresh_context = ChatgptAuthRefreshContext {
codex_home: args.config.codex_home.clone().into(),
codex_home: args.config.codex_home.clone(),
auth_credentials_store_mode: args.config.cli_auth_credentials_store_mode,
forced_chatgpt_workspace_id: args.config.forced_chatgpt_workspace_id.clone(),
};

View File

@@ -28,13 +28,10 @@ codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-cli = { workspace = true }
codex-utils-elapsed = { workspace = true }
codex-utils-oss = { workspace = true }
codex-utils-sandbox-summary = { workspace = true }
owo-colors = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
shlex = { workspace = true }
supports-color = { workspace = true }
tokio = { workspace = true, features = [
"io-std",
@@ -63,7 +60,6 @@ opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
predicates = { workspace = true }
pretty_assertions = { workspace = true }
rmcp = { workspace = true }
tempfile = { workspace = true }
tracing-opentelemetry = { workspace = true }
uuid = { workspace = true }

View File

@@ -29,6 +29,7 @@ use codex_app_server_protocol::ReviewStartResponse;
use codex_app_server_protocol::ReviewTarget as ApiReviewTarget;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::Thread as AppServerThread;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadResumeParams;
@@ -61,6 +62,7 @@ use codex_core::config_loader::LoaderOverrides;
use codex_core::config_loader::format_config_error_with_source;
use codex_core::format_exec_policy_error_with_source;
use codex_core::git_info::get_git_repo_root;
use codex_core::path_utils;
use codex_feedback::CodexFeedback;
use codex_otel::set_parent_from_context;
use codex_otel::traceparent_context_from_env;
@@ -68,6 +70,8 @@ use codex_protocol::config_types::SandboxMode;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
@@ -80,6 +84,7 @@ use serde_json::Value;
use std::collections::HashMap;
use std::io::IsTerminal;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use supports_color::Stream;
use tokio::sync::mpsc;
@@ -1021,39 +1026,81 @@ fn all_thread_source_kinds() -> Vec<ThreadSourceKind> {
]
}
async fn latest_thread_cwd(thread: &AppServerThread) -> PathBuf {
if let Some(path) = thread.path.as_deref()
&& let Some(cwd) = parse_latest_turn_context_cwd(path).await
{
return cwd;
}
thread.cwd.clone()
}
async fn parse_latest_turn_context_cwd(path: &Path) -> Option<PathBuf> {
let text = tokio::fs::read_to_string(path).await.ok()?;
for line in text.lines().rev() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) else {
continue;
};
if let RolloutItem::TurnContext(item) = rollout_line.item {
return Some(item.cwd);
}
}
None
}
fn cwds_match(current_cwd: &Path, session_cwd: &Path) -> bool {
match (
path_utils::normalize_for_path_comparison(current_cwd),
path_utils::normalize_for_path_comparison(session_cwd),
) {
(Ok(current), Ok(session)) => current == session,
_ => current_cwd == session_cwd,
}
}
async fn resolve_resume_thread_id(
client: &InProcessAppServerClient,
config: &Config,
args: &crate::cli::ResumeArgs,
) -> anyhow::Result<Option<String>> {
let filter_cwd = if args.all {
None
} else {
Some(config.cwd.to_string_lossy().to_string())
};
let model_providers = Some(vec![config.model_provider_id.clone()]);
if args.last {
let response: ThreadListResponse = send_request_with_response(
client,
ClientRequest::ThreadList {
request_id: RequestId::Integer(0),
params: ThreadListParams {
cursor: None,
limit: Some(1),
sort_key: Some(ThreadSortKey::UpdatedAt),
model_providers,
source_kinds: Some(all_thread_source_kinds()),
archived: Some(false),
cwd: filter_cwd,
search_term: None,
let mut cursor = None;
loop {
let response: ThreadListResponse = send_request_with_response(
client,
ClientRequest::ThreadList {
request_id: RequestId::Integer(0),
params: ThreadListParams {
cursor,
limit: Some(100),
sort_key: Some(ThreadSortKey::UpdatedAt),
model_providers: model_providers.clone(),
source_kinds: Some(all_thread_source_kinds()),
archived: Some(false),
cwd: None,
search_term: None,
},
},
},
"thread/list",
)
.await
.map_err(anyhow::Error::msg)?;
return Ok(response.data.into_iter().next().map(|thread| thread.id));
"thread/list",
)
.await
.map_err(anyhow::Error::msg)?;
for thread in response.data {
if args.all || cwds_match(config.cwd.as_path(), &latest_thread_cwd(&thread).await) {
return Ok(Some(thread.id));
}
}
let Some(next_cursor) = response.next_cursor else {
return Ok(None);
};
cursor = Some(next_cursor);
}
}
let Some(session_id) = args.session_id.as_deref() else {
@@ -1076,11 +1123,7 @@ async fn resolve_resume_thread_id(
model_providers: Some(vec![config.model_provider_id.clone()]),
source_kinds: Some(all_thread_source_kinds()),
archived: Some(false),
cwd: if args.all {
None
} else {
Some(config.cwd.to_string_lossy().to_string())
},
cwd: None,
// Thread names are attached separately from rollout titles, so name
// resolution must scan the filtered list client-side instead of relying
// on the backend `search_term` filter.
@@ -1091,12 +1134,13 @@ async fn resolve_resume_thread_id(
)
.await
.map_err(anyhow::Error::msg)?;
if let Some(thread) = response
.data
.into_iter()
.find(|thread| thread.name.as_deref() == Some(session_id))
{
return Ok(Some(thread.id));
for thread in response.data {
if thread.name.as_deref() != Some(session_id) {
continue;
}
if args.all || cwds_match(config.cwd.as_path(), &latest_thread_cwd(&thread).await) {
return Ok(Some(thread.id));
}
}
let Some(next_cursor) = response.next_cursor else {
return Ok(None);

View File

@@ -36,10 +36,18 @@ describe("Codex", () => {
{
type: "turn.started",
},
{
type: "item.started",
item: {
id: "msg_mock",
type: "agent_message",
text: "Hi!",
},
},
{
type: "item.completed",
item: {
id: "item_0",
id: "msg_mock",
type: "agent_message",
text: "Hi!",
},