Compare commits

...

2 Commits

Author SHA1 Message Date
Friel
2be95f5082 Inherit forked agent prompt cache keys 2026-04-13 17:41:15 +00:00
Eric Traut
d25a9822a7 Do not fail thread start when trust persistence fails (#17595)
Addresses #17593

Problem: A regression introduced in
https://github.com/openai/codex/pull/16492 made thread/start fail when
Codex could not persist trusted project state, which crashes startup for
users with read-only config.toml.

Solution: Treat trusted project persistence as best effort and keep the
current thread's config trusted in memory when writing config.toml
fails.
2026-04-13 10:03:21 -07:00
15 changed files with 169 additions and 26 deletions

View File

@@ -213,6 +213,7 @@ use codex_core::config_loader::CloudRequirementsLoadErrorCode;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_core::config_loader::load_config_layers_state;
use codex_core::config_loader::project_trust_key;
use codex_core::exec::ExecCapturePolicy;
use codex_core::exec::ExecExpiration;
use codex_core::exec::ExecParams;
@@ -2291,25 +2292,42 @@ impl CodexMessageProcessor {
{
let trust_target = resolve_root_git_project_for_trust(config.cwd.as_path())
.unwrap_or_else(|| config.cwd.to_path_buf());
if let Err(err) = codex_core::config::set_project_trust_level(
&listener_task_context.codex_home,
trust_target.as_path(),
TrustLevel::Trusted,
) {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to persist trusted project state: {err}"),
data: None,
};
listener_task_context
.outgoing
.send_error(request_id, error)
.await;
return;
}
let cli_overrides_with_trust;
let cli_overrides_for_reload = if let Err(err) =
codex_core::config::set_project_trust_level(
&listener_task_context.codex_home,
trust_target.as_path(),
TrustLevel::Trusted,
) {
warn!(
"failed to persist trusted project state for {}; continuing with in-memory trust for this thread: {err}",
trust_target.display()
);
let mut project = toml::map::Map::new();
project.insert(
"trust_level".to_string(),
TomlValue::String("trusted".to_string()),
);
let mut projects = toml::map::Map::new();
projects.insert(
project_trust_key(trust_target.as_path()),
TomlValue::Table(project),
);
cli_overrides_with_trust = cli_overrides
.iter()
.cloned()
.chain(std::iter::once((
"projects".to_string(),
TomlValue::Table(projects),
)))
.collect::<Vec<_>>();
cli_overrides_with_trust.as_slice()
} else {
&cli_overrides
};
config = match derive_config_from_params(
&cli_overrides,
cli_overrides_for_reload,
config_overrides,
typesafe_overrides,
&cloud_requirements,

View File

@@ -8,6 +8,7 @@ use crate::codex::emit_subagent_session_started;
use crate::codex_thread::ThreadConfigSnapshot;
use crate::find_archived_thread_path_by_id_str;
use crate::find_thread_path_by_id_str;
use crate::inherited_thread_state::InheritedThreadState;
use crate::rollout::RolloutRecorder;
use crate::session_prefix::format_subagent_context_line;
use crate::session_prefix::format_subagent_notification_message;
@@ -218,6 +219,11 @@ impl AgentControl {
// The same `AgentControl` is sent to spawn the thread.
let new_thread = match (session_source, options.fork_mode.as_ref()) {
(Some(session_source), Some(_)) => {
let inherited_thread_state = InheritedThreadState::builder()
.prompt_cache_key(
parent_prompt_cache_key_for_source(&state, Some(&session_source)).await,
)
.build();
self.spawn_forked_thread(
&state,
config,
@@ -225,6 +231,7 @@ impl AgentControl {
&options,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_thread_state,
)
.await?
}
@@ -238,6 +245,7 @@ impl AgentControl {
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
Default::default(),
)
.await?
}
@@ -324,6 +332,7 @@ impl AgentControl {
})
}
#[allow(clippy::too_many_arguments)]
async fn spawn_forked_thread(
&self,
state: &Arc<ThreadManagerState>,
@@ -332,6 +341,7 @@ impl AgentControl {
options: &SpawnAgentOptions,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
inherited_thread_state: InheritedThreadState,
) -> CodexResult<crate::thread_manager::NewThread> {
if options.fork_parent_spawn_call_id.is_none() {
return Err(CodexErr::Fatal(
@@ -397,6 +407,7 @@ impl AgentControl {
/*persist_extended_history*/ false,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_thread_state,
)
.await
}
@@ -546,6 +557,7 @@ impl AgentControl {
session_source,
inherited_shell_snapshot,
inherited_exec_policy,
Default::default(),
)
.await?;
let mut agent_metadata = agent_metadata;
@@ -1160,6 +1172,24 @@ impl AgentControl {
}
}
/// Looks up the prompt cache key of the parent thread named by `session_source`.
///
/// Returns `None` when `session_source` is absent, when it is anything other
/// than a `SubAgentSource::ThreadSpawn` sub-agent source, or when the parent
/// thread cannot be fetched from `state`.
async fn parent_prompt_cache_key_for_source(
    state: &Arc<ThreadManagerState>,
    session_source: Option<&SessionSource>,
) -> Option<ThreadId> {
    // Only thread-spawned sub-agents carry a parent thread id to inherit from.
    let parent_id = match session_source {
        Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
            parent_thread_id, ..
        })) => *parent_thread_id,
        _ => return None,
    };

    // A failed lookup is not an error here: the caller simply gets no key.
    match state.get_thread(parent_id).await {
        Ok(parent) => Some(parent.codex.session.prompt_cache_key()),
        Err(_) => None,
    }
}
fn thread_spawn_parent_thread_id(session_source: &SessionSource) -> Option<ThreadId> {
match session_source {
SessionSource::SubAgent(SubAgentSource::ThreadSpawn {

View File

@@ -663,6 +663,10 @@ async fn spawn_agent_can_fork_parent_thread_history_with_sanitized_items() {
.await
.expect("child thread should be registered");
assert_ne!(child_thread_id, parent_thread_id);
assert_eq!(
child_thread.codex.session.prompt_cache_key(),
parent_thread.codex.session.prompt_cache_key(),
);
let history = child_thread.codex.session.clone_history().await;
let expected_history = [
ResponseItem::Message {
@@ -1518,7 +1522,7 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() {
manager,
control,
};
let (parent_thread_id, _parent_thread) = harness.start_thread().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let agent_path = AgentPath::from_string("/root/explorer".to_string())
.expect("test agent path should be valid");
@@ -1608,13 +1612,22 @@ async fn resume_thread_subagent_restores_stored_nickname_and_role() {
.expect("resume should succeed");
assert_eq!(resumed_thread_id, child_thread_id);
let resumed_snapshot = harness
let resumed_thread = harness
.manager
.get_thread(resumed_thread_id)
.await
.expect("resumed child thread should exist")
.config_snapshot()
.await;
.expect("resumed child thread should exist");
assert_eq!(
resumed_thread.codex.session.prompt_cache_key(),
resumed_thread_id,
"resume should keep the resumed thread's own cache key"
);
assert_ne!(
resumed_thread.codex.session.prompt_cache_key(),
parent_thread.codex.session.prompt_cache_key(),
"resume must not opportunistically inherit cache state from a live parent"
);
let resumed_snapshot = resumed_thread.config_snapshot().await;
let SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id: resumed_parent_thread_id,
depth: resumed_depth,

View File

@@ -147,6 +147,7 @@ struct ModelClientState {
conversation_id: ThreadId,
window_generation: AtomicU64,
installation_id: String,
prompt_cache_key_override: Option<ThreadId>,
provider: ModelProviderInfo,
auth_env_telemetry: AuthEnvTelemetry,
session_source: SessionSource,
@@ -300,6 +301,7 @@ impl ModelClient {
auth_manager: Option<Arc<AuthManager>>,
conversation_id: ThreadId,
installation_id: String,
prompt_cache_key_override: Option<ThreadId>,
provider: ModelProviderInfo,
session_source: SessionSource,
model_verbosity: Option<VerbosityConfig>,
@@ -318,6 +320,7 @@ impl ModelClient {
conversation_id,
window_generation: AtomicU64::new(0),
installation_id,
prompt_cache_key_override,
provider,
auth_env_telemetry,
session_source,
@@ -365,6 +368,12 @@ impl ModelClient {
format!("{conversation_id}:{window_generation}")
}
/// Returns the key sent as `prompt_cache_key` on requests: the override when
/// one was supplied at construction, otherwise this client's conversation id.
pub(crate) fn prompt_cache_key(&self) -> ThreadId {
    match self.state.prompt_cache_key_override {
        Some(override_key) => override_key,
        None => self.state.conversation_id,
    }
}
fn take_cached_websocket_session(&self) -> WebsocketSession {
let mut cached_websocket_session = self
.state
@@ -861,7 +870,7 @@ impl ModelClientSession {
None
};
let text = create_text_param_for_request(verbosity, &prompt.output_schema);
let prompt_cache_key = Some(self.client.state.conversation_id.to_string());
let prompt_cache_key = Some(self.client.prompt_cache_key().to_string());
let request = ResponsesApiRequest {
model: model_info.slug.clone(),
instructions: instructions.clone(),

View File

@@ -20,11 +20,13 @@ use pretty_assertions::assert_eq;
use serde_json::json;
fn test_model_client(session_source: SessionSource) -> ModelClient {
let conversation_id = ThreadId::new();
let provider = create_oss_provider_with_base_url("https://example.com/v1", WireApi::Responses);
ModelClient::new(
/*auth_manager*/ None,
ThreadId::new(),
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
/*prompt_cache_key_override*/ None,
provider,
session_source,
/*model_verbosity*/ None,

View File

@@ -24,6 +24,7 @@ use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::config::ManagedFeatures;
use crate::connectors;
use crate::exec_policy::ExecPolicyManager;
use crate::inherited_thread_state::InheritedThreadState;
use crate::installation_id::resolve_installation_id;
use crate::mcp_tool_exposure::build_mcp_tool_exposure;
use crate::parse_turn_item;
@@ -436,6 +437,7 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) metrics_service_name: Option<String>,
pub(crate) inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
pub(crate) inherited_exec_policy: Option<Arc<ExecPolicyManager>>,
pub(crate) inherited_thread_state: InheritedThreadState,
pub(crate) user_shell_override: Option<shell::Shell>,
pub(crate) parent_trace: Option<W3cTraceContext>,
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
@@ -490,6 +492,7 @@ impl Codex {
inherited_shell_snapshot,
user_shell_override,
inherited_exec_policy,
inherited_thread_state,
parent_trace: _,
analytics_events_client,
} = args;
@@ -682,6 +685,7 @@ impl Codex {
skills_watcher,
agent_control,
environment,
inherited_thread_state,
analytics_events_client,
)
.await
@@ -1627,6 +1631,7 @@ impl Session {
skills_watcher: Arc<SkillsWatcher>,
agent_control: AgentControl,
environment: Option<Arc<Environment>>,
inherited_thread_state: InheritedThreadState,
analytics_events_client: Option<AnalyticsEventsClient>,
) -> anyhow::Result<Arc<Self>> {
debug!(
@@ -1669,6 +1674,7 @@ impl Session {
),
),
};
let prompt_cache_key_override = inherited_thread_state.prompt_cache_key();
let window_generation = match &initial_history {
InitialHistory::Resumed(resumed_history) => u64::try_from(
resumed_history
@@ -2057,6 +2063,7 @@ impl Session {
Some(Arc::clone(&auth_manager)),
conversation_id,
installation_id,
prompt_cache_key_override,
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
config.model_verbosity,
@@ -2244,6 +2251,10 @@ impl Session {
self.services.state_db.clone()
}
/// Returns the prompt cache key reported by this session's model client.
pub(crate) fn prompt_cache_key(&self) -> ThreadId {
    let model_client = &self.services.model_client;
    model_client.prompt_cache_key()
}
/// Flush rollout writes and return the final durability-barrier result.
pub(crate) async fn flush_rollout(&self) -> std::io::Result<()> {
let recorder = {

View File

@@ -96,6 +96,7 @@ pub(crate) async fn run_codex_thread_interactive(
inherited_shell_snapshot: None,
user_shell_override: None,
inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)),
inherited_thread_state: Default::default(),
parent_trace: None,
})
.await?;

View File

@@ -257,11 +257,13 @@ async fn interrupting_regular_turn_waiting_on_startup_prewarm_emits_turn_aborted
}
fn test_model_client_session() -> crate::client::ModelClientSession {
let conversation_id = ThreadId::try_from("00000000-0000-4000-8000-000000000001")
.expect("test thread id should be valid");
crate::client::ModelClient::new(
/*auth_manager*/ None,
ThreadId::try_from("00000000-0000-4000-8000-000000000001")
.expect("test thread id should be valid"),
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
/*prompt_cache_key_override*/ None,
ModelProviderInfo::create_openai_provider(/* base_url */ /*base_url*/ None),
codex_protocol::protocol::SessionSource::Exec,
/*model_verbosity*/ None,
@@ -2742,6 +2744,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
.await
.expect("create environment"),
)),
Default::default(),
/*analytics_events_client*/ None,
)
.await;
@@ -2886,6 +2889,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
Some(auth_manager.clone()),
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
/*prompt_cache_key_override*/ None,
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
config.model_verbosity,
@@ -3731,6 +3735,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
Some(Arc::clone(&auth_manager)),
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
/*prompt_cache_key_override*/ None,
session_configuration.provider.clone(),
session_configuration.session_source.clone(),
config.model_verbosity,

View File

@@ -450,6 +450,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
metrics_service_name: None,
inherited_shell_snapshot: None,
inherited_exec_policy: Some(Arc::new(parent_exec_policy)),
inherited_thread_state: Default::default(),
user_shell_override: None,
parent_trace: None,
})

View File

@@ -0,0 +1,34 @@
use codex_protocol::ThreadId;
/// State handed to a newly spawned thread so it can reuse values from another
/// thread. The `Default` value carries nothing to inherit.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct InheritedThreadState {
/// Prompt cache key to reuse, if any; `None` means no key is inherited.
prompt_cache_key: Option<ThreadId>,
}
impl InheritedThreadState {
pub(crate) fn builder() -> InheritedThreadStateBuilder {
InheritedThreadStateBuilder::default()
}
pub(crate) fn prompt_cache_key(&self) -> Option<ThreadId> {
self.prompt_cache_key
}
}
/// Builder for [`InheritedThreadState`]; obtained via
/// `InheritedThreadState::builder()` and finished with `build()`.
#[derive(Default)]
pub(crate) struct InheritedThreadStateBuilder {
/// Pending prompt cache key; stays `None` unless set through the builder.
prompt_cache_key: Option<ThreadId>,
}
impl InheritedThreadStateBuilder {
    /// Sets (or clears, with `None`) the prompt cache key to inherit and
    /// returns the builder for further chaining.
    pub(crate) fn prompt_cache_key(self, prompt_cache_key: Option<ThreadId>) -> Self {
        let mut builder = self;
        builder.prompt_cache_key = prompt_cache_key;
        builder
    }

    /// Consumes the builder and produces the finished `InheritedThreadState`.
    pub(crate) fn build(self) -> InheritedThreadState {
        let Self { prompt_cache_key } = self;
        InheritedThreadState { prompt_cache_key }
    }
}

View File

@@ -39,6 +39,7 @@ mod flags;
mod git_info_tests;
mod guardian;
mod hook_runtime;
mod inherited_thread_state;
mod installation_id;
pub(crate) mod instructions;
pub(crate) mod landlock;

View File

@@ -7,6 +7,7 @@ use crate::codex::INITIAL_SUBMIT_ID;
use crate::codex_thread::CodexThread;
use crate::config::Config;
use crate::file_watcher::FileWatcher;
use crate::inherited_thread_state::InheritedThreadState;
use crate::mcp::McpManager;
use crate::plugins::PluginsManager;
use crate::rollout::RolloutRecorder;
@@ -764,6 +765,7 @@ impl ThreadManagerState {
/*metrics_service_name*/ None,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
Default::default(),
))
.await
}
@@ -778,6 +780,7 @@ impl ThreadManagerState {
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
inherited_thread_state: InheritedThreadState,
) -> CodexResult<NewThread> {
Box::pin(self.spawn_thread_with_source(
config,
@@ -790,12 +793,14 @@ impl ThreadManagerState {
metrics_service_name,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_thread_state,
/*parent_trace*/ None,
/*user_shell_override*/ None,
))
.await
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn resume_thread_from_rollout_with_source(
&self,
config: Config,
@@ -804,6 +809,7 @@ impl ThreadManagerState {
session_source: SessionSource,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
inherited_thread_state: InheritedThreadState,
) -> CodexResult<NewThread> {
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
Box::pin(self.spawn_thread_with_source(
@@ -817,6 +823,7 @@ impl ThreadManagerState {
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_thread_state,
/*parent_trace*/ None,
/*user_shell_override*/ None,
))
@@ -833,6 +840,7 @@ impl ThreadManagerState {
persist_extended_history: bool,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
inherited_thread_state: InheritedThreadState,
) -> CodexResult<NewThread> {
Box::pin(self.spawn_thread_with_source(
config,
@@ -845,6 +853,7 @@ impl ThreadManagerState {
/*metrics_service_name*/ None,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_thread_state,
/*parent_trace*/ None,
/*user_shell_override*/ None,
))
@@ -876,6 +885,7 @@ impl ThreadManagerState {
metrics_service_name,
/*inherited_shell_snapshot*/ None,
/*inherited_exec_policy*/ None,
Default::default(),
parent_trace,
user_shell_override,
))
@@ -895,6 +905,7 @@ impl ThreadManagerState {
metrics_service_name: Option<String>,
inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
inherited_exec_policy: Option<Arc<crate::exec_policy::ExecPolicyManager>>,
inherited_thread_state: InheritedThreadState,
parent_trace: Option<W3cTraceContext>,
user_shell_override: Option<crate::shell::Shell>,
) -> CodexResult<NewThread> {
@@ -922,6 +933,7 @@ impl ThreadManagerState {
metrics_service_name,
inherited_shell_snapshot,
inherited_exec_policy,
inherited_thread_state,
user_shell_override,
parent_trace,
analytics_events_client: self.analytics_events_client.clone(),

View File

@@ -101,6 +101,7 @@ async fn responses_stream_includes_subagent_header_on_review() {
/*auth_manager*/ None,
conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
/*prompt_cache_key_override*/ None,
provider.clone(),
session_source,
config.model_verbosity,
@@ -226,6 +227,7 @@ async fn responses_stream_includes_subagent_header_on_other() {
/*auth_manager*/ None,
conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
/*prompt_cache_key_override*/ None,
provider.clone(),
session_source,
config.model_verbosity,
@@ -340,6 +342,7 @@ async fn responses_respects_model_info_overrides_from_config() {
/*auth_manager*/ None,
conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
/*prompt_cache_key_override*/ None,
provider.clone(),
session_source,
config.model_verbosity,

View File

@@ -881,6 +881,7 @@ async fn send_provider_auth_request(server: &MockServer, auth: ModelProviderAuth
))),
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
/*prompt_cache_key_override*/ None,
provider,
SessionSource::Exec,
config.model_verbosity,
@@ -2179,6 +2180,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
/*auth_manager*/ None,
conversation_id,
/*installation_id*/ "11111111-1111-4111-8111-111111111111".to_string(),
/*prompt_cache_key_override*/ None,
provider.clone(),
SessionSource::Exec,
config.model_verbosity,

View File

@@ -1814,6 +1814,7 @@ async fn websocket_harness_with_provider_options(
/*auth_manager*/ None,
conversation_id,
/*installation_id*/ TEST_INSTALLATION_ID.to_string(),
/*prompt_cache_key_override*/ None,
provider.clone(),
SessionSource::Exec,
config.model_verbosity,