Compare commits

...

3 Commits

Author SHA1 Message Date
Dylan Hurd
684598f8c6 fmt tests 2025-08-06 20:27:46 -07:00
Dylan Hurd
9725fcacd5 fix rebase, add tests 2025-08-06 20:10:47 -07:00
Dylan Hurd
8be96fb592 [onboarding] Add simple codex-state interface 2025-08-06 19:59:15 -07:00
8 changed files with 409 additions and 80 deletions

View File

@@ -4,6 +4,7 @@
use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::Mutex;
@@ -145,6 +146,7 @@ impl Codex {
disable_response_storage: config.disable_response_storage,
notify: config.notify.clone(),
cwd: config.cwd.clone(),
trust_cwd: config.trust_cwd,
resume_path: resume_path.clone(),
};
@@ -707,6 +709,7 @@ async fn submission_loop(
disable_response_storage,
notify,
cwd,
trust_cwd,
resume_path,
} => {
debug!(
@@ -724,6 +727,37 @@ async fn submission_loop(
}
return;
}
// clone the config and update trust_cwd to match Op::ConfigureSession
let mut updated_config = config.deref().clone();
updated_config.trust_cwd = trust_cwd;
let updated_config = Arc::new(updated_config);
// sync trust_cwd to codex_state, and update config accordingly
if let Some(trust_cwd) = trust_cwd {
if trust_cwd {
match crate::codex_state::lookup_project(&updated_config).await {
Ok(mut project) => {
project.trusted = trust_cwd;
trace!(
"set cwd {} trust to {}",
updated_config.cwd.to_string_lossy(),
trust_cwd
);
if let Err(e) =
crate::codex_state::update_project(&updated_config, &project)
.await
{
warn!("failed to persist project state: {e}");
}
}
Err(e) => {
warn!("failed to persist project state: {e}");
}
}
}
}
// Optionally resume an existing rollout.
let mut restored_items: Option<Vec<ResponseItem>> = None;
let rollout_recorder: Option<RolloutRecorder> =
@@ -748,8 +782,12 @@ async fn submission_loop(
let rollout_recorder = match rollout_recorder {
Some(rec) => Some(rec),
None => {
match RolloutRecorder::new(&config, session_id, user_instructions.clone())
.await
match RolloutRecorder::new(
&updated_config,
session_id,
user_instructions.clone(),
)
.await
{
Ok(r) => Some(r),
Err(e) => {
@@ -761,7 +799,7 @@ async fn submission_loop(
};
let client = ModelClient::new(
config.clone(),
updated_config.clone(),
auth.clone(),
provider.clone(),
model_reasoning_effort,
@@ -786,7 +824,7 @@ async fn submission_loop(
// Error messages to dispatch after SessionConfigured is sent.
let mut mcp_connection_errors = Vec::<Event>::new();
let (mcp_connection_manager, failed_clients) =
match McpConnectionManager::new(config.mcp_servers.clone()).await {
match McpConnectionManager::new(updated_config.mcp_servers.clone()).await {
Ok((mgr, failures)) => (mgr, failures),
Err(e) => {
let message = format!("Failed to create MCP connection manager: {e:#}");
@@ -815,10 +853,10 @@ async fn submission_loop(
sess = Some(Arc::new(Session {
client,
tools_config: ToolsConfig::new(
&config.model_family,
&updated_config.model_family,
approval_policy,
sandbox_policy.clone(),
config.include_plan_tool,
updated_config.include_plan_tool,
),
tx_event: tx_event.clone(),
ctrl_c: Arc::clone(&ctrl_c),
@@ -826,17 +864,17 @@ async fn submission_loop(
base_instructions,
approval_policy,
sandbox_policy,
shell_environment_policy: config.shell_environment_policy.clone(),
shell_environment_policy: updated_config.shell_environment_policy.clone(),
cwd,
writable_roots,
mcp_connection_manager,
notify,
state: Mutex::new(state),
rollout: Mutex::new(rollout_recorder),
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
codex_linux_sandbox_exe: updated_config.codex_linux_sandbox_exe.clone(),
disable_response_storage,
user_shell: default_shell,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
show_raw_agent_reasoning: updated_config.show_raw_agent_reasoning,
}));
// Patch restored state into the newly created session.
@@ -849,7 +887,7 @@ async fn submission_loop(
// Gather history metadata for SessionConfiguredEvent.
let (history_log_id, history_entry_count) =
crate::message_history::history_metadata(&config).await;
crate::message_history::history_metadata(&updated_config).await;
// ack
let events = std::iter::once(Event {

View File

@@ -0,0 +1,179 @@
//! Persistence layer for the global, mutable project state store
//!
//! The project state is stored at `~/.codex/codex-state.json` as a JSON
//! object. The object has the following schema:
//!
//! ```json
//! {
//! "projects": {
//! "</abs/path/to/project>": {
//! "trusted": <boolean>,
//! // more to come...
//! }
//! },
//! }
//! ```
//!
//! To avoid race conditions, we use advisory file locking: readers take a
//! shared lock and writers take an exclusive lock, so cooperating processes
//! do not read the file while another process is rewriting it.
use std::collections::HashMap;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::Read;
use std::io::Result;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::path::PathBuf;
use serde::Deserialize;
use serde::Serialize;
use crate::config::Config;
use crate::util::acquire_exclusive_lock_with_retry;
#[cfg(unix)]
use crate::util::acquire_shared_lock_with_retry;
use crate::util::ensure_owner_only_permissions;
/// Filename that stores the project state inside `~/.codex`.
const CODEX_STATE_FILENAME: &str = "codex-state.json";
/// Persisted state for a single project, stored as one entry of
/// `CodexState::projects`.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct Project {
/// Whether this project has been marked as trusted.
pub trusted: bool,
}
/// Root object of the persisted state file.
///
/// The default value has no projects recorded.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default)]
#[serde(rename_all = "camelCase")]
pub struct CodexState {
    /// Per-project state, keyed by the project's working directory.
    pub projects: HashMap<PathBuf, Project>,
}
/// Returns the path of the state file: `CODEX_STATE_FILENAME` inside
/// `config.codex_home`.
fn codex_state_filepath(config: &Config) -> PathBuf {
    config.codex_home.join(CODEX_STATE_FILENAME)
}
/// Opens the state file for reading and appending, creating it (and its
/// parent directory) when missing.
///
/// On Unix the file's permissions are additionally restricted to the owner
/// before the handle is returned.
///
/// # Errors
///
/// Returns any I/O error from creating the directory, opening the file, or
/// adjusting its permissions. Open failures are also logged.
async fn open_or_create_codex_state_file(config: &Config) -> Result<File> {
    let state_path = codex_state_filepath(config);

    // Make sure the directory that holds the state file exists first.
    if let Some(dir) = state_path.parent() {
        tokio::fs::create_dir_all(dir).await?;
    }

    let mut options = OpenOptions::new();
    options.read(true).create(true).append(true);

    let opened = options.open(&state_path);
    let file: File = match opened {
        Err(e) => {
            tracing::warn!(error = %e, "failed to open project state file");
            return Err(e);
        }
        Ok(handle) => handle,
    };

    // Ensure restrictive permissions (0600) on Unix.
    #[cfg(unix)]
    ensure_owner_only_permissions(&file).await?;

    Ok(file)
}
/// Looks up the persisted state for the project at `config.cwd`.
///
/// The state file is created if it does not exist yet. A missing entry, an
/// empty file, or unparseable contents all yield an untrusted project.
///
/// # Errors
///
/// Returns an error if the file cannot be opened, the shared lock cannot be
/// acquired, or the file cannot be read.
#[cfg(unix)]
pub async fn lookup_project(config: &Config) -> Result<Project> {
    use std::io::BufReader;

    let state_file = open_or_create_codex_state_file(config).await?;

    // Readers only need a shared lock.
    if let Err(e) = acquire_shared_lock_with_retry(&state_file) {
        tracing::warn!(error = %e, "failed to acquire shared lock on project state file");
        return Err(e);
    }

    let mut raw = String::new();
    if let Err(e) = BufReader::new(&state_file).read_to_string(&mut raw) {
        tracing::warn!(error = %e, "failed to read project state file");
        return Err(e);
    }

    // Empty or invalid contents fall back to the default (empty) state.
    let state = if raw.trim().is_empty() {
        CodexState::default()
    } else {
        serde_json::from_str::<CodexState>(&raw).unwrap_or_else(|e| {
            tracing::warn!(error = %e, "failed to parse project state file; using defaults");
            CodexState::default()
        })
    };

    let found = state.projects.get(&config.cwd).cloned();
    Ok(found.unwrap_or(Project { trusted: false }))
}
/// Fallback for non-Unix platforms, where project state is not supported
/// yet: every project is reported as untrusted.
#[cfg(not(unix))]
pub async fn lookup_project(_config: &Config) -> Result<Project> {
    Ok(Project { trusted: false })
}
/// Update the project state for the given project. This function will
/// (currently) read and write the entire file, which does not scale well.
/// Use this function sparingly until we implement a more efficient solution.
#[cfg(unix)]
pub async fn update_project(config: &Config, project: &Project) -> Result<()> {
let mut file = open_or_create_codex_state_file(config).await?;
// Open & lock file for writing.
if let Err(e) = acquire_exclusive_lock_with_retry(&file).await {
tracing::warn!(error = %e, "failed to acquire exclusive lock on project state file");
return Err(e);
}
// Ensure file permissions.
ensure_owner_only_permissions(&file).await?;
// Read existing state (if any).
let mut contents = String::new();
// Safety: reading from start; ensure cursor at 0.
file.seek(SeekFrom::Start(0))?;
let _ = file.read_to_string(&mut contents);
let mut codex_state: CodexState = if contents.trim().is_empty() {
CodexState::default()
} else {
serde_json::from_str(&contents).unwrap_or_default()
};
let cwd = config.cwd.clone();
codex_state.projects.insert(cwd, project.clone());
// Overwrite the file from the beginning
file.set_len(0)?;
file.seek(SeekFrom::Start(0))?;
serde_json::to_writer_pretty(&mut file, &codex_state)?;
file.flush()?;
Ok(())
}
/// Fallback for non-Unix platforms, where project state is not supported
/// yet: the update is accepted and discarded.
#[cfg(not(unix))]
pub async fn update_project(_config: &Config, _project: &Project) -> Result<()> {
    Ok(())
}

View File

@@ -103,6 +103,9 @@ pub struct Config {
/// resolved against this path.
pub cwd: PathBuf,
/// Whether the current repo is trusted.
pub trust_cwd: Option<bool>,
/// Definition for MCP servers that Codex can reach out to for tool calls.
pub mcp_servers: HashMap<String, McpServerConfig>,
@@ -520,6 +523,7 @@ impl Config {
model_provider_id,
model_provider,
cwd: resolved_cwd,
trust_cwd: None,
approval_policy: approval_policy
.or(config_profile.approval_policy)
.or(cfg.approval_policy)
@@ -909,6 +913,7 @@ disable_response_storage = true
user_instructions: None,
notify: None,
cwd: fixture.cwd(),
trust_cwd: None,
mcp_servers: HashMap::new(),
model_providers: fixture.model_provider_map.clone(),
project_doc_max_bytes: PROJECT_DOC_MAX_BYTES,
@@ -960,6 +965,7 @@ disable_response_storage = true
user_instructions: None,
notify: None,
cwd: fixture.cwd(),
trust_cwd: None,
mcp_servers: HashMap::new(),
model_providers: fixture.model_provider_map.clone(),
project_doc_max_bytes: PROJECT_DOC_MAX_BYTES,
@@ -1026,6 +1032,7 @@ disable_response_storage = true
user_instructions: None,
notify: None,
cwd: fixture.cwd(),
trust_cwd: None,
mcp_servers: HashMap::new(),
model_providers: fixture.model_provider_map.clone(),
project_doc_max_bytes: PROJECT_DOC_MAX_BYTES,

View File

@@ -11,6 +11,7 @@ mod chat_completions;
mod client;
mod client_common;
pub mod codex;
pub mod codex_state;
pub use codex::Codex;
pub use codex::CodexSpawnOk;
pub mod codex_wrapper;

View File

@@ -14,7 +14,6 @@
//! the file descriptor is opened with the `O_APPEND` flag. POSIX guarantees
//! that writes up to `PIPE_BUF` bytes are atomic in that case.
use std::fs::File;
use std::fs::OpenOptions;
use std::io::Result;
use std::io::Write;
@@ -22,25 +21,21 @@ use std::path::PathBuf;
use serde::Deserialize;
use serde::Serialize;
use std::time::Duration;
use tokio::fs;
use tokio::io::AsyncReadExt;
use uuid::Uuid;
use crate::config::Config;
use crate::config_types::HistoryPersistence;
use crate::util::acquire_exclusive_lock_with_retry;
use crate::util::ensure_owner_only_permissions;
#[cfg(unix)]
use std::os::unix::fs::OpenOptionsExt;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
/// Filename that stores the message history inside `~/.codex`.
const HISTORY_FILENAME: &str = "history.jsonl";
const MAX_RETRIES: usize = 10;
const RETRY_SLEEP: Duration = Duration::from_millis(100);
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct HistoryEntry {
pub session_id: String,
@@ -121,29 +116,6 @@ pub(crate) async fn append_entry(text: &str, session_id: &Uuid, config: &Config)
Ok(())
}
/// Tries to take an exclusive advisory lock on `file`, making at most
/// `MAX_RETRIES` attempts and sleeping `RETRY_SLEEP` after each attempt that
/// finds the lock held elsewhere. Bounding the attempts avoids waiting
/// indefinitely while still giving other writers time to finish.
///
/// # Errors
///
/// Returns the underlying error immediately if locking fails for any reason
/// other than `WouldBlock`, and a `WouldBlock` error once all attempts are
/// used up.
async fn acquire_exclusive_lock_with_retry(file: &std::fs::File) -> Result<()> {
    for _attempt in 0..MAX_RETRIES {
        let Err(err) = fs2::FileExt::try_lock_exclusive(file) else {
            return Ok(());
        };
        if err.kind() != std::io::ErrorKind::WouldBlock {
            return Err(err);
        }
        tokio::time::sleep(RETRY_SLEEP).await;
    }

    let exhausted = std::io::Error::new(
        std::io::ErrorKind::WouldBlock,
        "could not acquire exclusive lock on history file after multiple attempts",
    );
    Err(exhausted)
}
/// Asynchronously fetch the history file's *identifier* (inode on Unix) and
/// the current number of entries by counting newline characters.
pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) {
@@ -194,6 +166,8 @@ pub(crate) async fn history_metadata(config: &Config) -> (u64, usize) {
/// locking API.
#[cfg(unix)]
pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<HistoryEntry> {
use crate::util::acquire_shared_lock_with_retry;
use std::fs::File;
use std::io::BufRead;
use std::io::BufReader;
use std::os::unix::fs::MetadataExt;
@@ -255,43 +229,3 @@ pub(crate) fn lookup(log_id: u64, offset: usize, config: &Config) -> Option<Hist
let _ = (log_id, offset, config);
None
}
/// Tries to take a shared advisory lock on `file`, making at most
/// `MAX_RETRIES` attempts. The calling thread sleeps for `RETRY_SLEEP` after
/// each attempt that finds the lock held elsewhere.
///
/// # Errors
///
/// Returns the underlying error immediately if locking fails for any reason
/// other than `WouldBlock`, and a `WouldBlock` error once all attempts are
/// used up.
#[cfg(unix)]
fn acquire_shared_lock_with_retry(file: &File) -> Result<()> {
    for _attempt in 0..MAX_RETRIES {
        let Err(err) = fs2::FileExt::try_lock_shared(file) else {
            return Ok(());
        };
        if err.kind() != std::io::ErrorKind::WouldBlock {
            return Err(err);
        }
        std::thread::sleep(RETRY_SLEEP);
    }

    let exhausted = std::io::Error::new(
        std::io::ErrorKind::WouldBlock,
        "could not acquire shared lock on history file after multiple attempts",
    );
    Err(exhausted)
}
/// Restricts `file` to mode `0o600` (rw-------) on Unix.
///
/// Nothing is changed when the permission bits already match. Failures to
/// read the metadata or to apply the new mode are returned to the caller.
#[cfg(unix)]
async fn ensure_owner_only_permissions(file: &File) -> Result<()> {
    let mut perms = file.metadata()?.permissions();
    if perms.mode() & 0o777 == 0o600 {
        return Ok(());
    }

    perms.set_mode(0o600);
    // The blocking task needs its own handle because it must be `'static`.
    let handle = file.try_clone()?;
    tokio::task::spawn_blocking(move || handle.set_permissions(perms)).await??;
    Ok(())
}
/// Non-Unix counterpart of the permission check: performs no work and always
/// returns `Ok(())`.
#[cfg(not(unix))]
async fn ensure_owner_only_permissions(_file: &File) -> Result<()> {
// For now, on non-Unix, simply succeed.
Ok(())
}

View File

@@ -79,6 +79,11 @@ pub enum Op {
/// operate deterministically.
cwd: std::path::PathBuf,
/// Whether the user has indicated the current repo is trusted.
/// If true, we persist this state to CodexState, so they don't get
/// asked again.
trust_cwd: Option<bool>,
/// Path to a rollout file to resume from.
#[serde(skip_serializing_if = "Option::is_none")]
resume_path: Option<std::path::PathBuf>,

View File

@@ -1,3 +1,5 @@
use std::fs::File;
use std::io::Result;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
@@ -6,9 +8,15 @@ use rand::Rng;
use tokio::sync::Notify;
use tracing::debug;
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
const INITIAL_DELAY_MS: u64 = 200;
const BACKOFF_FACTOR: f64 = 1.3;
const MAX_RETRIES: usize = 10;
const RETRY_SLEEP: Duration = Duration::from_millis(100);
/// Make a CancellationToken that is fulfilled when SIGINT occurs.
pub fn notify_on_sigint() -> Arc<Notify> {
let notify = Arc::new(Notify::new());
@@ -63,3 +71,66 @@ pub fn is_inside_git_repo(base_dir: &Path) -> bool {
false
}
/// Attempt to acquire an exclusive advisory lock on `file`, retrying up to
/// `MAX_RETRIES` times if the lock is currently held by another process. This
/// prevents a potential indefinite wait while still giving other writers some
/// time to finish their operation.
///
/// # Errors
///
/// Returns the underlying error immediately if locking fails for any reason
/// other than `WouldBlock`, and a `WouldBlock` error once all attempts are
/// used up.
pub(crate) async fn acquire_exclusive_lock_with_retry(file: &std::fs::File) -> Result<()> {
    use tokio::time::sleep;

    for _ in 0..MAX_RETRIES {
        match fs2::FileExt::try_lock_exclusive(file) {
            Ok(()) => return Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                sleep(RETRY_SLEEP).await;
            }
            Err(e) => return Err(e),
        }
    }

    // This helper is shared by several files (message history, project
    // state), so the message must not name a specific one.
    Err(std::io::Error::new(
        std::io::ErrorKind::WouldBlock,
        "could not acquire exclusive lock on file after multiple attempts",
    ))
}
/// Attempt to acquire a shared advisory lock on `file`, retrying up to
/// `MAX_RETRIES` times if the lock is currently held by another process.
///
/// This function is synchronous: between attempts it blocks the calling
/// thread for `RETRY_SLEEP`.
///
/// # Errors
///
/// Returns the underlying error immediately if locking fails for any reason
/// other than `WouldBlock`, and a `WouldBlock` error once all attempts are
/// used up.
#[cfg(unix)]
pub(crate) fn acquire_shared_lock_with_retry(file: &File) -> Result<()> {
    for _ in 0..MAX_RETRIES {
        match fs2::FileExt::try_lock_shared(file) {
            Ok(()) => return Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                std::thread::sleep(RETRY_SLEEP);
            }
            Err(e) => return Err(e),
        }
    }

    // This helper is shared by several files (message history, project
    // state), so the message must not name a specific one.
    Err(std::io::Error::new(
        std::io::ErrorKind::WouldBlock,
        "could not acquire shared lock on file after multiple attempts",
    ))
}
/// Restricts `file` to mode `0o600` (rw-------) on Unix.
///
/// Nothing is changed when the permission bits already match. Failures to
/// read the metadata or to apply the new mode are returned to the caller.
#[cfg(unix)]
pub(crate) async fn ensure_owner_only_permissions(file: &File) -> Result<()> {
    let mut perms = file.metadata()?.permissions();
    let already_restricted = perms.mode() & 0o777 == 0o600;
    if already_restricted {
        return Ok(());
    }

    perms.set_mode(0o600);
    // The blocking task needs its own handle because it must be `'static`.
    let handle = file.try_clone()?;
    tokio::task::spawn_blocking(move || handle.set_permissions(perms)).await??;
    Ok(())
}
/// Non-Unix counterpart of the permission check: performs no work and always
/// returns `Ok(())`.
#[cfg(not(unix))]
pub(crate) async fn ensure_owner_only_permissions(_file: &File) -> Result<()> {
// For now, on non-Unix, simply succeed.
Ok(())
}

View File

@@ -0,0 +1,94 @@
#![cfg(unix)]
#![allow(clippy::expect_used)]
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use codex_core::codex_state::CodexState;
use codex_core::codex_state::Project;
use codex_core::codex_state::lookup_project;
use codex_core::codex_state::update_project;
use codex_core::config::Config;
use codex_core::config::ConfigOverrides;
use codex_core::config::ConfigToml;
use tempfile::TempDir;
/// Creates a test `Config` whose `codex_home` is the given temporary
/// directory and whose working directory is `cwd`.
fn make_config(codex_home: &TempDir, cwd: PathBuf) -> Config {
    let overrides = ConfigOverrides {
        cwd: Some(cwd),
        ..Default::default()
    };
    let home = codex_home.path().to_path_buf();

    Config::load_from_base_config_with_overrides(ConfigToml::default(), overrides, home)
        .expect("config construction for tests should succeed")
}
#[tokio::test]
async fn lookup_returns_default_for_missing_or_empty_state() {
// Given a fresh codex_home with no state file
let codex_home = TempDir::new().expect("create temp codex_home");
let project_dir = TempDir::new().expect("create temp project dir");
let cfg = make_config(&codex_home, project_dir.path().to_path_buf());
// When we lookup the project
let project = lookup_project(&cfg).await.expect("lookup should succeed");
// Then the default should be returned
assert!(!project.trusted);
// And the state file should have been created (empty file acceptable)
let state_path = cfg.codex_home.join("codex-state.json");
assert!(state_path.exists());
}
/// Updating a project writes the expected JSON, restricts the state file to
/// its owner, and is visible to a subsequent lookup.
#[tokio::test]
async fn update_then_lookup_roundtrips_and_sets_permissions() {
    use std::os::unix::fs::PermissionsExt;

    let codex_home = TempDir::new().expect("create temp codex_home");
    let project_dir = TempDir::new().expect("create temp project dir");
    let cfg = make_config(&codex_home, project_dir.path().to_path_buf());

    // Update project state to trusted = true
    let p = Project { trusted: true };
    update_project(&cfg, &p)
        .await
        .expect("update should succeed");

    // Verify file exists with correct JSON structure
    let state_path = cfg.codex_home.join("codex-state.json");
    let contents = fs::read_to_string(&state_path).expect("read state file");
    let state: CodexState = serde_json::from_str(&contents).expect("parse state JSON");
    assert_eq!(
        state,
        CodexState {
            projects: HashMap::from([(
                project_dir.path().to_path_buf(),
                Project { trusted: true }
            )])
        }
    );

    // The test name promises a permissions check: the state file must be
    // readable and writable by its owner only.
    let mode = fs::metadata(&state_path)
        .expect("stat state file")
        .permissions()
        .mode();
    assert_eq!(mode & 0o777, 0o600);

    // Lookup should now return the updated value
    let looked_up = lookup_project(&cfg).await.expect("lookup should succeed");
    assert!(looked_up.trusted);
}
#[tokio::test]
async fn lookup_handles_invalid_json_gracefully() {
let codex_home = TempDir::new().expect("create temp codex_home");
let project_dir = TempDir::new().expect("create temp project dir");
let cfg = make_config(&codex_home, project_dir.path().to_path_buf());
// Write invalid JSON into the state file
let state_path = cfg.codex_home.join("codex-state.json");
fs::create_dir_all(&cfg.codex_home).expect("create codex_home dir");
fs::write(&state_path, b"this is not json").expect("write invalid json");
// Lookup should not error and should return defaults
let project = lookup_project(&cfg).await.expect("lookup should succeed");
assert!(!project.trusted);
}