Compare commits

...

8 Commits

Author SHA1 Message Date
jif-oai
743caea3a6 feat: add shell snapshot failure reason (#12233) 2026-02-19 13:49:12 +00:00
jif-oai
2daa3fd44f feat: sub-agent injection (#12152)
This PR adds parent-thread sub-agent completion notifications and changes
the model prompt to prevent it from being confused
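A minimal sketch of what the injected parent-thread message could look like; the tag name follows the `<subagent_notification>` marker used elsewhere in this comparison, while the helper name and exact JSON layout are illustrative assumptions, not the merged implementation:

```rust
/// Illustrative only: builds a parent-visible notification message wrapped
/// in the `<subagent_notification>` tag; the JSON field names are assumptions.
fn subagent_notification(agent_id: &str, status: &str) -> String {
    format!(
        "<subagent_notification>\n{{\"agent_id\":\"{agent_id}\",\"status\":\"{status}\"}}\n</subagent_notification>"
    )
}
```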
2026-02-19 11:32:10 +00:00
jif-oai
f298c48cc6 Adjust memories rollout defaults (#12231)
- Summary
- raise `DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP` to 16 so more
rollouts are allowed per startup
- lower `DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS` to 6 to make rollouts
eligible sooner
- Testing
  - Not run (not requested)
2026-02-19 10:52:43 +00:00
Eric Traut
227352257c Update docs links for feature flag notice (#12164)
Summary
- replace the stale `docs/config.md#feature-flags` reference in the
legacy feature notice with the canonical published URL
- align the deprecation notice test to expect the new link

This addresses #12123
2026-02-19 00:00:44 -08:00
viyatb-oai
4fe99b086f fix(linux-sandbox): mount /dev in bwrap sandbox (#12081)
## Summary
- Updates the Linux bubblewrap sandbox args to mount a minimal `/dev`
using `--dev /dev` instead of only binding `/dev/null`; with only
`/dev/null` available, tools needing entropy (git, crypto libs, etc.) can
fail.

- Changed mount order so `--dev /dev` is added before writable-root
`--bind` mounts, preserving writable `/dev/*` submounts like `/dev/shm`

## Why
Fixes sandboxed command failures when reading `/dev/urandom` (and
similar standard device-node access).
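The ordering constraint can be sketched as follows — `--dev` and `--bind` are real bwrap flags, but the helper function and the writable-roots wiring are hypothetical, not the sandbox's actual arg builder:

```rust
/// Hypothetical sketch: `--dev /dev` must precede writable-root `--bind`
/// mounts so writable `/dev/*` submounts (e.g. `/dev/shm`) are preserved.
fn bwrap_mount_args(writable_roots: &[&str]) -> Vec<String> {
    let mut args = vec!["--dev".to_string(), "/dev".to_string()];
    for root in writable_roots {
        // Bind each writable root read-write at the same path inside the sandbox.
        args.extend(["--bind".to_string(), root.to_string(), root.to_string()]);
    }
    args
}
```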


Fixes https://github.com/openai/codex/issues/12056
2026-02-18 23:27:32 -08:00
Matthew Zeng
18eb640a47 [apps] Update apps allowlist. (#12211)
- [x] Update apps allowlist.
2026-02-18 23:21:32 -08:00
Charley Cunningham
16c3c47535 Stabilize app-server detached review and running-resume tests (#12203)
## Summary
- stabilize
`thread_resume_rejoins_running_thread_even_with_override_mismatch` by
using a valid delayed second SSE response instead of an intentionally
truncated stream
- set `RUST_MIN_STACK=4194304` for spawned app-server test processes in
`McpProcess` to avoid stack-sensitive CI overflows in detached review
tests

## Why
- the thread-resume assertion could race with a mocked stream-disconnect
error and intermittently observe `systemError`
- detached review startup is stack-sensitive in some CI environments;
pinning a larger stack in the test harness removes that flake without
changing product behavior
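The stack pin from the second bullet comes down to one environment variable on the spawned process; a minimal sketch, where the binary name is a placeholder rather than the real harness wiring:

```rust
use std::process::Command;

/// Pins a 4 MiB (4194304-byte) minimum thread stack for a spawned
/// test-server process; "app-server" is a placeholder binary name.
fn spawn_command() -> Command {
    let mut cmd = Command::new("app-server");
    cmd.env("RUST_MIN_STACK", "4194304");
    cmd
}
```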

## Validation
- `just fmt`
- `cargo test -p codex-app-server --test all
suite::v2::thread_resume::thread_resume_rejoins_running_thread_even_with_override_mismatch`
- `cargo test -p codex-app-server --test all
suite::v2::review::review_start_with_detached_delivery_returns_new_thread_id`
2026-02-18 19:05:35 -08:00
Charley Cunningham
7f3dbaeb25 state: enforce 10 MiB log caps for thread and threadless process logs (#12038)
## Summary
- enforce a 10 MiB cap per `thread_id` in state log storage
- enforce a 10 MiB cap per `process_uuid` for threadless (`thread_id IS
NULL`) logs
- scope pruning to only keys affected by the current insert batch
- add a cheap per-key `SUM(...)` precheck so windowed prune queries only
run for keys that are currently over the cap
- add SQLite indexes used by the pruning queries
- add focused runtime tests covering both pruning behaviors

## Why
This keeps log growth bounded by the intended partition semantics while
preserving a small, readable implementation localized to the existing
insert path.
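The windowed prune described above can be sketched as a pure function over `(rowid, size)` pairs ordered newest-first — a simplification of the SQL-side behavior, with only the 10 MiB cap taken from the summary:

```rust
const LOG_CAP_BYTES: u64 = 10 * 1024 * 1024; // 10 MiB per partition key

/// Sketch only: keep the newest rows whose cumulative payload size fits
/// under the cap and return the rowids that should be pruned.
fn rows_to_prune(rows_newest_first: &[(i64, u64)]) -> Vec<i64> {
    let mut kept_bytes = 0u64;
    let mut prune = Vec::new();
    for &(rowid, len) in rows_newest_first {
        if kept_bytes + len <= LOG_CAP_BYTES {
            kept_bytes += len;
        } else {
            prune.push(rowid);
        }
    }
    prune
}
```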

## Local Latency Snapshot (No Truncation-Pressure Run)
Collected from session `019c734f-1d16-7002-9e00-c966c9fbbcae` using
local-only (uncommitted) instrumentation, while not specifically
benchmarking the truncation-heavy regime.

### Percentiles By Query (ms)
| query | count | p50 | p90 | p95 | p99 | max |
|---|---:|---:|---:|---:|---:|---:|
| `insert_logs.insert_batch` | 110 | 0.332 | 0.999 | 1.811 | 2.978 | 3.493 |
| `insert_logs.precheck.process` | 106 | 0.074 | 0.152 | 0.206 | 0.258 | 0.426 |
| `insert_logs.precheck.thread` | 73 | 0.118 | 0.206 | 0.253 | 1.025 | 1.025 |
| `insert_logs.prune.process` | 58 | 0.291 | 0.576 | 0.607 | 1.088 | 1.088 |
| `insert_logs.prune.thread` | 44 | 0.318 | 0.467 | 0.728 | 0.797 | 0.797 |
| `insert_logs.prune_total` | 110 | 0.488 | 0.976 | 1.237 | 1.593 | 1.684 |
| `insert_logs.total` | 110 | 1.315 | 2.889 | 3.623 | 5.739 | 5.961 |
| `insert_logs.tx_begin` | 110 | 0.133 | 0.235 | 0.282 | 0.412 | 0.546 |
| `insert_logs.tx_commit` | 110 | 0.259 | 0.689 | 0.772 | 1.065 | 1.080 |

### `insert_logs.total` Histogram (ms)
| bucket | count |
|---|---:|
| `<= 0.100` | 0 |
| `<= 0.250` | 0 |
| `<= 0.500` | 7 |
| `<= 1.000` | 33 |
| `<= 2.000` | 40 |
| `<= 5.000` | 28 |
| `<= 10.000` | 2 |
| `<= 20.000` | 0 |
| `<= 50.000` | 0 |
| `<= 100.000` | 0 |
| `> 100.000` | 0 |

## Local Latency Snapshot (Truncation-Heavy / Cap-Hit Regime)
Collected from a run where cap-hit behavior was frequent (`135/180`
insert calls), using local-only (uncommitted) instrumentation and a
temporary local cap of `10_000` bytes for stress testing (not the merged
`10 MiB` cap).

### Percentiles By Query (ms)
| query | count | p50 | p90 | p95 | p99 | max |
|---|---:|---:|---:|---:|---:|---:|
| `insert_logs.insert_batch` | 180 | 0.524 | 1.645 | 2.163 | 3.424 | 3.777 |
| `insert_logs.precheck.process` | 171 | 0.086 | 0.235 | 0.373 | 0.758 | 1.147 |
| `insert_logs.precheck.thread` | 100 | 0.105 | 0.251 | 0.291 | 1.176 | 1.622 |
| `insert_logs.prune.process` | 109 | 0.386 | 0.839 | 1.146 | 1.548 | 2.588 |
| `insert_logs.prune.thread` | 56 | 0.253 | 0.550 | 1.148 | 2.484 | 2.484 |
| `insert_logs.prune_total` | 180 | 0.511 | 1.221 | 1.695 | 4.548 | 5.512 |
| `insert_logs.total` | 180 | 1.631 | 3.902 | 5.103 | 8.901 | 9.095 |
| `insert_logs.total_cap_hit` | 135 | 1.876 | 4.501 | 5.547 | 8.902 | 9.096 |
| `insert_logs.total_no_cap_hit` | 45 | 0.520 | 1.700 | 2.079 | 3.294 | 3.294 |
| `insert_logs.tx_begin` | 180 | 0.109 | 0.253 | 0.287 | 1.088 | 1.406 |
| `insert_logs.tx_commit` | 180 | 0.267 | 0.813 | 1.170 | 2.497 | 2.574 |

### `insert_logs.total` Histogram (ms)
| bucket | count |
|---|---:|
| `<= 0.100` | 0 |
| `<= 0.250` | 0 |
| `<= 0.500` | 16 |
| `<= 1.000` | 39 |
| `<= 2.000` | 60 |
| `<= 5.000` | 54 |
| `<= 10.000` | 11 |
| `<= 20.000` | 0 |
| `<= 50.000` | 0 |
| `<= 100.000` | 0 |
| `> 100.000` | 0 |

### `insert_logs.total` Histogram When Cap Was Hit (ms)
| bucket | count |
|---|---:|
| `<= 0.100` | 0 |
| `<= 0.250` | 0 |
| `<= 0.500` | 0 |
| `<= 1.000` | 22 |
| `<= 2.000` | 51 |
| `<= 5.000` | 51 |
| `<= 10.000` | 11 |
| `<= 20.000` | 0 |
| `<= 50.000` | 0 |
| `<= 100.000` | 0 |
| `> 100.000` | 0 |

### Performance Takeaways
- Even in a cap-hit-heavy run (`75%` cap-hit calls), `insert_logs.total`
stays sub-10ms at p99 (`8.901ms`) and max (`9.095ms`).
- Calls that did **not** hit the cap are materially cheaper
(`insert_logs.total_no_cap_hit` p95 `2.079ms`) than cap-hit calls
(`insert_logs.total_cap_hit` p95 `5.547ms`).
- Compared to the earlier non-truncation-pressure run, overall
`insert_logs.total` rose from p95 `3.623ms` to p95 `5.103ms`
(+`1.48ms`), indicating bounded overhead when pruning is active.
- This truncation-heavy run used an intentionally low local cap for
stress testing; with the real 10 MiB cap, cap-hit frequency should be
much lower in normal sessions.

## Testing
- `just fmt` (in `codex-rs`)
- `cargo test -p codex-state` (in `codex-rs`)
2026-02-18 17:08:08 -08:00
23 changed files with 1209 additions and 97 deletions

View File

@@ -581,6 +581,17 @@ jobs:
tool: nextest
version: 0.9.103
- name: Enable unprivileged user namespaces (Linux)
if: runner.os == 'Linux'
run: |
# Required for bubblewrap to work on Linux CI runners.
sudo sysctl -w kernel.unprivileged_userns_clone=1
# Ubuntu 24.04+ can additionally gate unprivileged user namespaces
# behind AppArmor.
if sudo sysctl -a 2>/dev/null | grep -q '^kernel.apparmor_restrict_unprivileged_userns'; then
sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0
fi
- name: tests
id: test
run: cargo nextest run --all-features --no-fail-fast --target ${{ matrix.target }} --cargo-profile ci-test --timings

View File

@@ -105,6 +105,10 @@ impl McpProcess {
cmd.stderr(Stdio::piped());
cmd.env("CODEX_HOME", codex_home);
cmd.env("RUST_LOG", "debug");
// Bazel/Linux workers can run with smaller default thread stacks, which makes
// tokio-runtime-worker stack overflows more likely in app-server integration tests.
// Pin a larger minimum stack for the spawned test server process.
cmd.env("RUST_MIN_STACK", "4194304");
cmd.env_remove(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR);
for (k, v) in env_overrides {

View File

@@ -507,14 +507,19 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul
#[tokio::test]
async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> Result<()> {
let server = responses::start_mock_server().await;
let first_body = responses::sse(vec![
let first_response = responses::sse_response(responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "Done"),
responses::ev_completed("resp-1"),
]);
let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]);
]));
let second_response = responses::sse_response(responses::sse(vec![
responses::ev_response_created("resp-2"),
responses::ev_assistant_message("msg-2", "Done"),
responses::ev_completed("resp-2"),
]))
.set_delay(std::time::Duration::from_millis(500));
let _response_mock =
responses::mount_sse_sequence(&server, vec![first_body, second_body]).await;
responses::mount_response_sequence(&server, vec![first_response, second_response]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;

View File

@@ -4,6 +4,8 @@ use std::sync::LazyLock;
use std::sync::Mutex as StdMutex;
use codex_core::config::Config;
use codex_core::default_client::is_first_party_chat_originator;
use codex_core::default_client::originator;
use codex_core::features::Feature;
use codex_core::token_data::TokenData;
use serde::Deserialize;
@@ -460,23 +462,34 @@ const DISALLOWED_CONNECTOR_IDS: &[&str] = &[
"connector_69272cb413a081919685ec3c88d1744e",
"connector_0f9c9d4592e54d0a9a12b3f44a1e2010",
];
const FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS: &[&str] =
&["connector_0f9c9d4592e54d0a9a12b3f44a1e2010"];
const DISALLOWED_CONNECTOR_PREFIX: &str = "connector_openai_";
fn filter_disallowed_connectors(connectors: Vec<AppInfo>) -> Vec<AppInfo> {
filter_disallowed_connectors_for_originator(connectors, originator().value.as_str())
}
fn filter_disallowed_connectors_for_originator(
connectors: Vec<AppInfo>,
originator_value: &str,
) -> Vec<AppInfo> {
let disallowed_connector_ids = if is_first_party_chat_originator(originator_value) {
FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS
} else {
DISALLOWED_CONNECTOR_IDS
};
connectors
.into_iter()
.filter(is_connector_allowed)
.filter(|connector| is_connector_allowed(connector, disallowed_connector_ids))
.collect()
}
fn is_connector_allowed(connector: &AppInfo) -> bool {
fn is_connector_allowed(connector: &AppInfo, disallowed_connector_ids: &[&str]) -> bool {
let connector_id = connector.id.as_str();
if connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX)
|| DISALLOWED_CONNECTOR_IDS.contains(&connector_id)
{
return false;
}
true
!connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX)
&& !disallowed_connector_ids.contains(&connector_id)
}
#[cfg(test)]
@@ -523,7 +536,7 @@ mod tests {
}
#[test]
fn filters_openai_connectors() {
fn filters_openai_prefixed_connectors() {
let filtered = filter_disallowed_connectors(vec![
app("connector_openai_foo"),
app("connector_openai_bar"),
@@ -541,6 +554,22 @@ mod tests {
assert_eq!(filtered, vec![app("delta")]);
}
#[test]
fn first_party_chat_originator_filters_target_and_openai_prefixed_connectors() {
let filtered = filter_disallowed_connectors_for_originator(
vec![
app("connector_openai_foo"),
app("asdk_app_6938a94a61d881918ef32cb999ff937c"),
app("connector_0f9c9d4592e54d0a9a12b3f44a1e2010"),
],
"codex_atlas",
);
assert_eq!(
filtered,
vec![app("asdk_app_6938a94a61d881918ef32cb999ff937c"),]
);
}
fn merged_app(id: &str, is_accessible: bool) -> AppInfo {
AppInfo {
id: id.to_string(),

View File

@@ -1,11 +1,14 @@
use crate::agent::AgentStatus;
use crate::agent::guards::Guards;
use crate::agent::status::is_final;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::session_prefix::format_subagent_notification_message;
use crate::thread_manager::ThreadManagerState;
use codex_protocol::ThreadId;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use std::path::PathBuf;
@@ -46,6 +49,7 @@ impl AgentControl {
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
let notification_source = session_source.clone();
// The same `AgentControl` is sent to spawn the thread.
let new_thread = match session_source {
@@ -64,6 +68,7 @@ impl AgentControl {
state.notify_thread_created(new_thread.thread_id);
self.send_input(new_thread.thread_id, items).await?;
self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);
Ok(new_thread.thread_id)
}
@@ -77,6 +82,7 @@ impl AgentControl {
) -> CodexResult<ThreadId> {
let state = self.upgrade()?;
let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
let notification_source = session_source.clone();
let resumed_thread = state
.resume_thread_from_rollout_with_source(
@@ -90,6 +96,7 @@ impl AgentControl {
// Resumed threads are re-registered in-memory and need the same listener
// attachment path as freshly spawned threads.
state.notify_thread_created(resumed_thread.thread_id);
self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));
Ok(resumed_thread.thread_id)
}
@@ -164,13 +171,60 @@ impl AgentControl {
thread.total_token_usage().await
}
/// Starts a detached watcher for sub-agents spawned from another thread.
///
/// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
/// can receive completion notifications.
fn maybe_start_completion_watcher(
&self,
child_thread_id: ThreadId,
session_source: Option<SessionSource>,
) {
let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id, ..
})) = session_source
else {
return;
};
let control = self.clone();
tokio::spawn(async move {
let mut status_rx = match control.subscribe_status(child_thread_id).await {
Ok(rx) => rx,
Err(_) => return,
};
let mut status = status_rx.borrow().clone();
while !is_final(&status) {
if status_rx.changed().await.is_err() {
status = control.get_status(child_thread_id).await;
break;
}
status = status_rx.borrow().clone();
}
if !is_final(&status) {
return;
}
let Ok(state) = control.upgrade() else {
return;
};
let Ok(parent_thread) = state.get_thread(parent_thread_id).await else {
return;
};
parent_thread
.inject_user_message_without_turn(format_subagent_notification_message(
&child_thread_id.to_string(),
&status,
))
.await;
});
}
fn upgrade(&self) -> CodexResult<Arc<ThreadManagerState>> {
self.manager
.upgrade()
.ok_or_else(|| CodexErr::UnsupportedOperation("thread manager dropped".to_string()))
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -180,16 +234,24 @@ mod tests {
use crate::agent::agent_status_from_event;
use crate::config::Config;
use crate::config::ConfigBuilder;
use crate::session_prefix::SUBAGENT_NOTIFICATION_OPEN_TAG;
use assert_matches::assert_matches;
use codex_protocol::config_types::ModeKind;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
use codex_protocol::protocol::TurnStartedEvent;
use pretty_assertions::assert_eq;
use tempfile::TempDir;
use tokio::time::Duration;
use tokio::time::sleep;
use tokio::time::timeout;
use toml::Value as TomlValue;
async fn test_config_with_cli_overrides(
@@ -250,6 +312,42 @@ mod tests {
}
}
fn has_subagent_notification(history_items: &[ResponseItem]) -> bool {
history_items.iter().any(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return false;
};
if role != "user" {
return false;
}
content.iter().any(|content_item| match content_item {
ContentItem::InputText { text } | ContentItem::OutputText { text } => {
text.contains(SUBAGENT_NOTIFICATION_OPEN_TAG)
}
ContentItem::InputImage { .. } => false,
})
})
}
async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> bool {
let wait = async {
loop {
let history_items = parent_thread
.codex
.session
.clone_history()
.await
.raw_items()
.to_vec();
if has_subagent_notification(&history_items) {
return true;
}
sleep(Duration::from_millis(25)).await;
}
};
timeout(Duration::from_secs(2), wait).await.is_ok()
}
#[tokio::test]
async fn send_input_errors_when_manager_dropped() {
let control = AgentControl::default();
@@ -683,4 +781,35 @@ mod tests {
.await
.expect("shutdown resumed thread");
}
#[tokio::test]
async fn spawn_child_completion_notifies_parent_history() {
let harness = AgentControlHarness::new().await;
let (parent_thread_id, parent_thread) = harness.start_thread().await;
let child_thread_id = harness
.control
.spawn_agent(
harness.config.clone(),
text_input("hello child"),
Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
parent_thread_id,
depth: 1,
})),
)
.await
.expect("child spawn should succeed");
let child_thread = harness
.manager
.get_thread(child_thread_id)
.await
.expect("child thread should exist");
let _ = child_thread
.submit(Op::Shutdown {})
.await
.expect("child shutdown should submit");
assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
}
}

View File

@@ -8,6 +8,9 @@ use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
use codex_protocol::config_types::Personality;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
@@ -32,7 +35,7 @@ pub struct ThreadConfigSnapshot {
}
pub struct CodexThread {
codex: Codex,
pub(crate) codex: Codex,
rollout_path: Option<PathBuf>,
_watch_registration: WatchRegistration,
}
@@ -85,6 +88,33 @@ impl CodexThread {
self.codex.session.total_token_usage().await
}
/// Records a user-role session-prefix message without creating a new user turn boundary.
pub(crate) async fn inject_user_message_without_turn(&self, message: String) {
let pending_item = ResponseInputItem::Message {
role: "user".to_string(),
content: vec![ContentItem::InputText { text: message }],
};
let pending_items = vec![pending_item];
let Err(items_without_active_turn) = self
.codex
.session
.inject_response_items(pending_items)
.await
else {
return;
};
let turn_context = self.codex.session.new_default_turn().await;
let items: Vec<ResponseItem> = items_without_active_turn
.into_iter()
.map(ResponseItem::from)
.collect();
self.codex
.session
.record_conversation_items(turn_context.as_ref(), &items)
.await;
}
pub fn rollout_path(&self) -> Option<PathBuf> {
self.rollout_path.clone()
}

View File

@@ -23,9 +23,9 @@ use serde::Serialize;
use serde::de::Error as SerdeError;
pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev";
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 8;
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 16;
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 12;
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024;
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)]

View File

@@ -571,6 +571,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
"<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
),
user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
user_input_text_msg(
"<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
),
user_input_text_msg("turn 1 user"),
assistant_msg("turn 1 assistant"),
user_input_text_msg("turn 2 user"),
@@ -591,6 +594,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
"<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
),
user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
user_input_text_msg(
"<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
),
user_input_text_msg("turn 1 user"),
assistant_msg("turn 1 assistant"),
];
@@ -610,6 +616,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
"<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
),
user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
user_input_text_msg(
"<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
),
];
let mut history = create_history_with_items(vec![
@@ -622,6 +631,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
"<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
),
user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
user_input_text_msg(
"<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
),
user_input_text_msg("turn 1 user"),
assistant_msg("turn 1 assistant"),
user_input_text_msg("turn 2 user"),
@@ -640,6 +652,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
"<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
),
user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
user_input_text_msg(
"<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
),
user_input_text_msg("turn 1 user"),
assistant_msg("turn 1 assistant"),
user_input_text_msg("turn 2 user"),

View File

@@ -114,6 +114,10 @@ pub fn is_first_party_originator(originator_value: &str) -> bool {
|| originator_value.starts_with("Codex ")
}
pub fn is_first_party_chat_originator(originator_value: &str) -> bool {
originator_value == "codex_atlas" || originator_value == "codex_chatgpt_desktop"
}
pub fn get_codex_user_agent() -> String {
let build_version = env!("CARGO_PKG_VERSION");
let os_info = os_info::get();
@@ -234,6 +238,17 @@ mod tests {
assert_eq!(is_first_party_originator("Other"), false);
}
#[test]
fn is_first_party_chat_originator_matches_known_values() {
assert_eq!(is_first_party_chat_originator("codex_atlas"), true);
assert_eq!(
is_first_party_chat_originator("codex_chatgpt_desktop"),
true
);
assert_eq!(is_first_party_chat_originator(DEFAULT_ORIGINATOR), false);
assert_eq!(is_first_party_chat_originator("codex_vscode"), false);
}
#[tokio::test]
async fn test_create_client_sets_default_headers() {
skip_if_no_network!();

View File

@@ -370,7 +370,7 @@ fn legacy_usage_notice(alias: &str, feature: Feature) -> (String, Option<String>
None
} else {
Some(format!(
"Enable it with `--enable {canonical}` or `[features].{canonical}` in config.toml. See https://github.com/openai/codex/blob/main/docs/config.md#feature-flags for details."
"Enable it with `--enable {canonical}` or `[features].{canonical}` in config.toml. See https://developers.openai.com/codex/config-basic#feature-flags for details."
))
};
(summary, details)

View File

@@ -1,3 +1,5 @@
use codex_protocol::protocol::AgentStatus;
/// Helpers for identifying model-visible "session prefix" messages.
///
/// A session prefix is a user-role message that carries configuration or state needed by
@@ -6,10 +8,41 @@
/// boundaries.
pub(crate) const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = "<environment_context>";
pub(crate) const TURN_ABORTED_OPEN_TAG: &str = "<turn_aborted>";
pub(crate) const SUBAGENT_NOTIFICATION_OPEN_TAG: &str = "<subagent_notification>";
pub(crate) const SUBAGENT_NOTIFICATION_CLOSE_TAG: &str = "</subagent_notification>";
fn starts_with_ascii_case_insensitive(text: &str, prefix: &str) -> bool {
text.get(..prefix.len())
.is_some_and(|candidate| candidate.eq_ignore_ascii_case(prefix))
}
/// Returns true if `text` starts with a session prefix marker (case-insensitive).
pub(crate) fn is_session_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
lowered.starts_with(ENVIRONMENT_CONTEXT_OPEN_TAG) || lowered.starts_with(TURN_ABORTED_OPEN_TAG)
starts_with_ascii_case_insensitive(trimmed, ENVIRONMENT_CONTEXT_OPEN_TAG)
|| starts_with_ascii_case_insensitive(trimmed, TURN_ABORTED_OPEN_TAG)
|| starts_with_ascii_case_insensitive(trimmed, SUBAGENT_NOTIFICATION_OPEN_TAG)
}
pub(crate) fn format_subagent_notification_message(agent_id: &str, status: &AgentStatus) -> String {
let payload_json = serde_json::json!({
"agent_id": agent_id,
"status": status,
})
.to_string();
format!("{SUBAGENT_NOTIFICATION_OPEN_TAG}\n{payload_json}\n{SUBAGENT_NOTIFICATION_CLOSE_TAG}")
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn is_session_prefix_is_case_insensitive() {
assert_eq!(
is_session_prefix("<SUBAGENT_NOTIFICATION>{}</subagent_notification>"),
true
);
}
}

View File

@@ -95,10 +95,15 @@ impl ShellSnapshot {
)
.await
.map(Arc::new);
let success = if snapshot.is_some() { "true" } else { "false" };
let _ = timer.map(|timer| timer.record(&[("success", success)]));
otel_manager.counter("codex.shell_snapshot", 1, &[("success", success)]);
let _ = shell_snapshot_tx.send(snapshot);
let success = snapshot.is_ok();
let success_tag = if success { "true" } else { "false" };
let _ = timer.map(|timer| timer.record(&[("success", success_tag)]));
let mut counter_tags = vec![("success", success_tag)];
if let Some(failure_reason) = snapshot.as_ref().err() {
counter_tags.push(("failure_reason", *failure_reason));
}
otel_manager.counter("codex.shell_snapshot", 1, &counter_tags);
let _ = shell_snapshot_tx.send(snapshot.ok());
}
.instrument(snapshot_span),
);
@@ -109,7 +114,7 @@ impl ShellSnapshot {
session_id: ThreadId,
session_cwd: &Path,
shell: &Shell,
) -> Option<Self> {
) -> std::result::Result<Self, &'static str> {
// File to store the snapshot
let extension = match shell.shell_type {
ShellType::PowerShell => "ps1",
@@ -129,32 +134,31 @@ impl ShellSnapshot {
});
// Make the new snapshot.
let snapshot =
match write_shell_snapshot(shell.shell_type.clone(), &path, session_cwd).await {
Ok(path) => {
tracing::info!("Shell snapshot successfully created: {}", path.display());
Some(Self {
path,
cwd: session_cwd.to_path_buf(),
})
}
Err(err) => {
tracing::warn!(
"Failed to create shell snapshot for {}: {err:?}",
shell.name()
);
None
}
};
let path = match write_shell_snapshot(shell.shell_type.clone(), &path, session_cwd).await {
Ok(path) => {
tracing::info!("Shell snapshot successfully created: {}", path.display());
path
}
Err(err) => {
tracing::warn!(
"Failed to create shell snapshot for {}: {err:?}",
shell.name()
);
return Err("write_failed");
}
};
if let Some(snapshot) = snapshot.as_ref()
&& let Err(err) = validate_snapshot(shell, &snapshot.path, session_cwd).await
{
let snapshot = Self {
path,
cwd: session_cwd.to_path_buf(),
};
if let Err(err) = validate_snapshot(shell, &snapshot.path, session_cwd).await {
tracing::error!("Shell snapshot validation failed: {err:?}");
return None;
return Err("validation_failed");
}
snapshot
Ok(snapshot)
}
}

View File

@@ -427,7 +427,7 @@ mod resume_agent {
}
}
mod wait {
pub(crate) mod wait {
use super::*;
use crate::agent::status::is_final;
use futures::FutureExt;
@@ -447,10 +447,10 @@ mod wait {
timeout_ms: Option<i64>,
}
#[derive(Debug, Serialize)]
struct WaitResult {
status: HashMap<ThreadId, AgentStatus>,
timed_out: bool,
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
pub(crate) struct WaitResult {
pub(crate) status: HashMap<ThreadId, AgentStatus>,
pub(crate) timed_out: bool,
}
pub async fn handle(
@@ -1462,12 +1462,6 @@ mod tests {
);
}
#[derive(Debug, Deserialize, PartialEq, Eq)]
struct WaitResult {
status: HashMap<ThreadId, AgentStatus>,
timed_out: bool,
}
#[tokio::test]
async fn wait_rejects_non_positive_timeout() {
let (session, turn) = make_session_and_context().await;
@@ -1553,11 +1547,11 @@ mod tests {
else {
panic!("expected function output");
};
let result: WaitResult =
let result: wait::WaitResult =
serde_json::from_str(&content).expect("wait result should be json");
assert_eq!(
result,
WaitResult {
wait::WaitResult {
status: HashMap::from([
(id_a, AgentStatus::NotFound),
(id_b, AgentStatus::NotFound),
@@ -1597,11 +1591,11 @@ mod tests {
else {
panic!("expected function output");
};
let result: WaitResult =
let result: wait::WaitResult =
serde_json::from_str(&content).expect("wait result should be json");
assert_eq!(
result,
WaitResult {
wait::WaitResult {
status: HashMap::new(),
timed_out: true
}
@@ -1694,11 +1688,11 @@ mod tests {
else {
panic!("expected function output");
};
let result: WaitResult =
let result: wait::WaitResult =
serde_json::from_str(&content).expect("wait result should be json");
assert_eq!(
result,
WaitResult {
wait::WaitResult {
status: HashMap::from([(agent_id, AgentStatus::Shutdown)]),
timed_out: false
}

View File

@@ -646,7 +646,7 @@ fn create_wait_tool() -> ToolSpec {
ToolSpec::Function(ResponsesApiTool {
name: "wait".to_string(),
description: "Wait for agents to reach a final status. Completed statuses may include the agent's final message. Returns empty status when timed out."
description: "Wait for agents to reach a final status. Completed statuses may include the agent's final message. Returns empty status when timed out. Once the agent reaches its final status, a notification message containing the same completed status will be received."
.to_string(),
strict: false,
parameters: JsonSchema::Object {

View File

@@ -50,7 +50,7 @@ async fn emits_deprecation_notice_for_legacy_feature_flag() -> anyhow::Result<()
assert_eq!(
details.as_deref(),
Some(
"Enable it with `--enable unified_exec` or `[features].unified_exec` in config.toml. See https://github.com/openai/codex/blob/main/docs/config.md#feature-flags for details."
"Enable it with `--enable unified_exec` or `[features].unified_exec` in config.toml. See https://developers.openai.com/codex/config-basic#feature-flags for details."
),
);

View File

@@ -114,6 +114,7 @@ mod skills;
mod sqlite_state;
mod stream_error_allows_next_turn;
mod stream_no_completed;
mod subagent_notifications;
mod text_encoding_fix;
mod tool_harness;
mod tool_parallelism;

View File

@@ -0,0 +1,196 @@
use anyhow::Result;
use codex_core::features::Feature;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_response_once_match;
use core_test_support::responses::mount_sse_once_match;
use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use serde_json::json;
use std::time::Duration;
use tokio::time::Instant;
use tokio::time::sleep;
use wiremock::MockServer;
const SPAWN_CALL_ID: &str = "spawn-call-1";
const TURN_1_PROMPT: &str = "spawn a child and continue";
const TURN_2_NO_WAIT_PROMPT: &str = "follow up without wait";
const CHILD_PROMPT: &str = "child: do work";
fn body_contains(req: &wiremock::Request, text: &str) -> bool {
let is_zstd = req
.headers
.get("content-encoding")
.and_then(|value| value.to_str().ok())
.is_some_and(|value| {
value
.split(',')
.any(|entry| entry.trim().eq_ignore_ascii_case("zstd"))
});
let bytes = if is_zstd {
zstd::stream::decode_all(std::io::Cursor::new(&req.body)).ok()
} else {
Some(req.body.clone())
};
bytes
.and_then(|body| String::from_utf8(body).ok())
.is_some_and(|body| body.contains(text))
}
fn has_subagent_notification(req: &ResponsesRequest) -> bool {
req.message_input_texts("user")
.iter()
.any(|text| text.contains("<subagent_notification>"))
}
async fn wait_for_spawned_thread_id(test: &TestCodex) -> Result<String> {
let deadline = Instant::now() + Duration::from_secs(2);
loop {
let ids = test.thread_manager.list_thread_ids().await;
if let Some(spawned_id) = ids
.iter()
.find(|id| **id != test.session_configured.session_id)
{
return Ok(spawned_id.to_string());
}
if Instant::now() >= deadline {
anyhow::bail!("timed out waiting for spawned thread id");
}
sleep(Duration::from_millis(10)).await;
}
}
async fn wait_for_requests(
mock: &core_test_support::responses::ResponseMock,
) -> Result<Vec<ResponsesRequest>> {
let deadline = Instant::now() + Duration::from_secs(2);
loop {
let requests = mock.requests();
if !requests.is_empty() {
return Ok(requests);
}
if Instant::now() >= deadline {
anyhow::bail!("expected at least 1 request, got {}", requests.len());
}
sleep(Duration::from_millis(10)).await;
}
}
async fn setup_turn_one_with_spawned_child(
server: &MockServer,
child_response_delay: Option<Duration>,
) -> Result<(TestCodex, String)> {
let spawn_args = serde_json::to_string(&json!({
"message": CHILD_PROMPT,
}))?;
mount_sse_once_match(
server,
|req: &wiremock::Request| body_contains(req, TURN_1_PROMPT),
sse(vec![
ev_response_created("resp-turn1-1"),
ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
ev_completed("resp-turn1-1"),
]),
)
.await;
let child_sse = sse(vec![
ev_response_created("resp-child-1"),
ev_assistant_message("msg-child-1", "child done"),
ev_completed("resp-child-1"),
]);
let child_request_log = if let Some(delay) = child_response_delay {
mount_response_once_match(
server,
|req: &wiremock::Request| {
body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
},
sse_response(child_sse).set_delay(delay),
)
.await
} else {
mount_sse_once_match(
server,
|req: &wiremock::Request| {
body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
},
child_sse,
)
.await
};
let _turn1_followup = mount_sse_once_match(
server,
|req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
sse(vec![
ev_response_created("resp-turn1-2"),
ev_assistant_message("msg-turn1-2", "parent done"),
ev_completed("resp-turn1-2"),
]),
)
.await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::Collab);
});
let test = builder.build(server).await?;
test.submit_turn(TURN_1_PROMPT).await?;
if child_response_delay.is_none() {
let _ = wait_for_requests(&child_request_log).await?;
let rollout_path = test
.codex
.rollout_path()
.ok_or_else(|| anyhow::anyhow!("expected parent rollout path"))?;
let deadline = Instant::now() + Duration::from_secs(6);
loop {
let has_notification = tokio::fs::read_to_string(&rollout_path)
.await
.is_ok_and(|rollout| rollout.contains("<subagent_notification>"));
if has_notification {
break;
}
if Instant::now() >= deadline {
anyhow::bail!(
"timed out waiting for parent rollout to include subagent notification"
);
}
sleep(Duration::from_millis(10)).await;
}
}
let spawned_id = wait_for_spawned_thread_id(&test).await?;
Ok((test, spawned_id))
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subagent_notification_is_included_without_wait() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let (test, _spawned_id) = setup_turn_one_with_spawned_child(&server, None).await?;
let turn2 = mount_sse_once_match(
&server,
|req: &wiremock::Request| body_contains(req, TURN_2_NO_WAIT_PROMPT),
sse(vec![
ev_response_created("resp-turn2-1"),
ev_assistant_message("msg-turn2-1", "no wait path"),
ev_completed("resp-turn2-1"),
]),
)
.await;
test.submit_turn(TURN_2_NO_WAIT_PROMPT).await?;
let turn2_requests = wait_for_requests(&turn2).await?;
assert!(turn2_requests.iter().any(has_subagent_notification));
Ok(())
}

View File
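The `body_contains` helper in the test above first inspects the `content-encoding` header before deciding whether to zstd-decode the request body. A minimal stdlib-only sketch of just that header check (the helper name is taken from the diff; the standalone function below is illustrative, not part of the crate):

```rust
// Sketch of the header check inside `body_contains`: a content-encoding
// header may list several codings ("gzip, zstd"), so each comma-separated
// entry is trimmed and compared case-insensitively.
fn is_zstd_encoded(content_encoding: Option<&str>) -> bool {
    content_encoding.is_some_and(|value| {
        value
            .split(',')
            .any(|entry| entry.trim().eq_ignore_ascii_case("zstd"))
    })
}

fn main() {
    assert!(is_zstd_encoded(Some("gzip, ZSTD")));
    assert!(!is_zstd_encoded(Some("gzip")));
    assert!(!is_zstd_encoded(None));
    println!("ok");
}
```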

@@ -135,11 +135,12 @@ fn create_bwrap_flags(
///
/// The mount order is important:
/// 1. `--ro-bind / /` makes the entire filesystem read-only.
/// 2. `--bind <root> <root>` re-enables writes for allowed roots.
/// 3. `--ro-bind <subpath> <subpath>` re-applies read-only protections under
/// 2. `--dev /dev` mounts a minimal writable `/dev` with standard device nodes
/// (including `/dev/urandom`) even under a read-only root.
/// 3. `--bind <root> <root>` re-enables writes for allowed roots, including
/// writable subpaths under `/dev` (for example, `/dev/shm`).
/// 4. `--ro-bind <subpath> <subpath>` re-applies read-only protections under
/// those writable roots so protected subpaths win.
/// 4. `--dev-bind /dev/null /dev/null` preserves the common sink even under a
/// read-only root.
fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<Vec<String>> {
if !sandbox_policy.has_full_disk_read_access() {
return Err(CodexErr::UnsupportedOperation(
@@ -151,12 +152,18 @@ fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<
let writable_roots = sandbox_policy.get_writable_roots_with_cwd(cwd);
ensure_mount_targets_exist(&writable_roots)?;
let mut args = Vec::new();
// Read-only root, then selectively re-enable writes.
args.push("--ro-bind".to_string());
args.push("/".to_string());
args.push("/".to_string());
// Read-only root, then mount a minimal device tree.
// In bubblewrap (`bubblewrap.c`, `SETUP_MOUNT_DEV`), `--dev /dev` creates
// the standard minimal nodes: null, zero, full, random, urandom, and tty.
// `/dev` must be mounted before writable roots so explicit `/dev/*`
// writable binds remain visible.
let mut args = vec![
"--ro-bind".to_string(),
"/".to_string(),
"/".to_string(),
"--dev".to_string(),
"/dev".to_string(),
];
for writable_root in &writable_roots {
let root = writable_root.root.as_path();
@@ -180,12 +187,15 @@ fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<
}
if !subpath.exists() {
if let Some(first_missing) = find_first_non_existent_component(&subpath)
&& is_within_allowed_write_paths(&first_missing, &allowed_write_paths)
// Keep this in the per-subpath loop: each protected subpath can have
// a different first missing component that must be blocked
// independently (for example, `/repo/.git` vs `/repo/.codex`).
if let Some(first_missing_component) = find_first_non_existent_component(&subpath)
&& is_within_allowed_write_paths(&first_missing_component, &allowed_write_paths)
{
args.push("--ro-bind".to_string());
args.push("/dev/null".to_string());
args.push(path_to_string(&first_missing));
args.push(path_to_string(&first_missing_component));
}
continue;
}
@@ -197,11 +207,6 @@ fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<
}
}
// Ensure `/dev/null` remains usable regardless of the root bind.
args.push("--dev-bind".to_string());
args.push("/dev/null".to_string());
args.push("/dev/null".to_string());
Ok(args)
}

View File
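The doc comment in the hunk above leans on bubblewrap's last-bind-wins behavior: later mounts shadow earlier ones, so the ordering (read-only root, then `--dev /dev`, then writable roots, then protected read-only subpaths) is what enforces the policy. A small standalone model of that ordering (hypothetical helper, not the crate's `create_filesystem_args`), assuming plain string paths:

```rust
// Model of the bwrap mount ordering described above. Later binds override
// earlier ones, so argument order encodes the sandbox policy.
fn build_mount_args(writable_roots: &[&str], protected_subpaths: &[&str]) -> Vec<String> {
    // 1. Read-only root, 2. minimal /dev mounted before writable roots so
    //    explicit /dev/* writable binds (e.g. /dev/shm) remain visible.
    let mut args = vec![
        "--ro-bind".to_string(),
        "/".to_string(),
        "/".to_string(),
        "--dev".to_string(),
        "/dev".to_string(),
    ];
    // 3. Writable roots re-enable writes under the read-only root.
    for root in writable_roots {
        args.extend(["--bind", root, root].map(|s: &str| s.to_string()));
    }
    // 4. Protected subpaths go back to read-only and win over step 3.
    for sub in protected_subpaths {
        args.extend(["--ro-bind", sub, sub].map(|s: &str| s.to_string()));
    }
    args
}

fn main() {
    let args = build_mount_args(&["/repo"], &["/repo/.git"]);
    let pos = |flag: &str| args.iter().position(|a| a == flag).unwrap();
    // Ordering invariant: ro root < /dev < writable bind < protected ro-bind.
    assert!(pos("--dev") < pos("--bind"));
    assert!(pos("--bind") < args.iter().rposition(|a| a == "--ro-bind").unwrap());
    println!("{}", args.join(" "));
}
```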

@@ -307,7 +307,9 @@ fn close_fd_or_panic(fd: libc::c_int, context: &str) {
fn is_proc_mount_failure(stderr: &str) -> bool {
stderr.contains("Can't mount proc")
&& stderr.contains("/newroot/proc")
&& stderr.contains("Invalid argument")
&& (stderr.contains("Invalid argument")
|| stderr.contains("Operation not permitted")
|| stderr.contains("Permission denied"))
}
/// Build the inner command that applies seccomp after bubblewrap.
@@ -381,6 +383,18 @@ mod tests {
assert_eq!(is_proc_mount_failure(stderr), true);
}
#[test]
fn detects_proc_mount_operation_not_permitted_failure() {
let stderr = "bwrap: Can't mount proc on /newroot/proc: Operation not permitted";
assert_eq!(is_proc_mount_failure(stderr), true);
}
#[test]
fn detects_proc_mount_permission_denied_failure() {
let stderr = "bwrap: Can't mount proc on /newroot/proc: Permission denied";
assert_eq!(is_proc_mount_failure(stderr), true);
}
#[test]
fn ignores_non_proc_mount_errors() {
let stderr = "bwrap: Can't bind mount /dev/null: Operation not permitted";
@@ -407,9 +421,8 @@ mod tests {
"--ro-bind".to_string(),
"/".to_string(),
"/".to_string(),
"--dev-bind".to_string(),
"/dev/null".to_string(),
"/dev/null".to_string(),
"--dev".to_string(),
"/dev".to_string(),
"--unshare-pid".to_string(),
"--proc".to_string(),
"/proc".to_string(),

View File

@@ -56,7 +56,7 @@ async fn run_cmd_output(
writable_roots: &[PathBuf],
timeout_ms: u64,
) -> codex_core::exec::ExecToolCallOutput {
run_cmd_result_with_writable_roots(cmd, writable_roots, timeout_ms, false)
run_cmd_result_with_writable_roots(cmd, writable_roots, timeout_ms, false, false)
.await
.expect("sandboxed command should execute")
}
@@ -67,6 +67,7 @@ async fn run_cmd_result_with_writable_roots(
writable_roots: &[PathBuf],
timeout_ms: u64,
use_bwrap_sandbox: bool,
network_access: bool,
) -> Result<codex_core::exec::ExecToolCallOutput> {
let cwd = std::env::current_dir().expect("cwd should exist");
let sandbox_cwd = cwd.clone();
@@ -89,7 +90,7 @@ async fn run_cmd_result_with_writable_roots(
.map(|p| AbsolutePathBuf::try_from(p.as_path()).unwrap())
.collect(),
read_only_access: Default::default(),
network_access: false,
network_access,
// Exclude tmp-related folders from writable roots because we need a
// folder that is writable by tests but that we intentionally disallow
// writing to in the sandbox.
@@ -112,6 +113,13 @@ async fn run_cmd_result_with_writable_roots(
fn is_bwrap_unavailable_output(output: &codex_core::exec::ExecToolCallOutput) -> bool {
output.stderr.text.contains(BWRAP_UNAVAILABLE_ERR)
|| (output
.stderr
.text
.contains("Can't mount proc on /newroot/proc")
&& (output.stderr.text.contains("Operation not permitted")
|| output.stderr.text.contains("Permission denied")
|| output.stderr.text.contains("Invalid argument")))
}
async fn should_skip_bwrap_tests() -> bool {
@@ -120,6 +128,7 @@ async fn should_skip_bwrap_tests() -> bool {
&[],
NETWORK_TIMEOUT_MS,
true,
true,
)
.await
{
@@ -168,14 +177,90 @@ async fn test_root_write() {
#[tokio::test]
async fn test_dev_null_write() {
run_cmd(
if should_skip_bwrap_tests().await {
eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
return;
}
let output = run_cmd_result_with_writable_roots(
&["bash", "-lc", "echo blah > /dev/null"],
&[],
// We have seen timeouts when running this test in CI on GitHub,
// so we are using a generous timeout until we can diagnose further.
LONG_TIMEOUT_MS,
true,
true,
)
.await;
.await
.expect("sandboxed command should execute");
assert_eq!(output.exit_code, 0);
}
#[tokio::test]
async fn bwrap_populates_minimal_dev_nodes() {
if should_skip_bwrap_tests().await {
eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
return;
}
let output = run_cmd_result_with_writable_roots(
&[
"bash",
"-lc",
"for node in null zero full random urandom tty; do [ -c \"/dev/$node\" ] || { echo \"missing /dev/$node\" >&2; exit 1; }; done",
],
&[],
LONG_TIMEOUT_MS,
true,
true,
)
.await
.expect("sandboxed command should execute");
assert_eq!(output.exit_code, 0);
}
#[tokio::test]
async fn bwrap_preserves_writable_dev_shm_bind_mount() {
if should_skip_bwrap_tests().await {
eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
return;
}
if !std::path::Path::new("/dev/shm").exists() {
eprintln!("skipping bwrap test: /dev/shm is unavailable in this environment");
return;
}
let target_file = match NamedTempFile::new_in("/dev/shm") {
Ok(file) => file,
Err(err) => {
eprintln!("skipping bwrap test: failed to create /dev/shm temp file: {err}");
return;
}
};
let target_path = target_file.path().to_path_buf();
std::fs::write(&target_path, "host-before").expect("seed /dev/shm file");
let output = run_cmd_result_with_writable_roots(
&[
"bash",
"-lc",
&format!("printf sandbox-after > {}", target_path.to_string_lossy()),
],
&[PathBuf::from("/dev/shm")],
LONG_TIMEOUT_MS,
true,
true,
)
.await
.expect("sandboxed command should execute");
assert_eq!(output.exit_code, 0);
assert_eq!(
std::fs::read_to_string(&target_path).expect("read /dev/shm file"),
"sandbox-after"
);
}
#[tokio::test]
@@ -306,7 +391,7 @@ async fn sandbox_blocks_nc() {
#[tokio::test]
async fn sandbox_blocks_git_and_codex_writes_inside_writable_root() {
if should_skip_bwrap_tests().await {
eprintln!("skipping bwrap test: vendored bwrap was not built in this environment");
eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
return;
}
@@ -329,6 +414,7 @@ async fn sandbox_blocks_git_and_codex_writes_inside_writable_root() {
&[tmpdir.path().to_path_buf()],
LONG_TIMEOUT_MS,
true,
true,
)
.await,
".git write should be denied under bubblewrap",
@@ -344,6 +430,7 @@ async fn sandbox_blocks_git_and_codex_writes_inside_writable_root() {
&[tmpdir.path().to_path_buf()],
LONG_TIMEOUT_MS,
true,
true,
)
.await,
".codex write should be denied under bubblewrap",
@@ -355,7 +442,7 @@ async fn sandbox_blocks_git_and_codex_writes_inside_writable_root() {
#[tokio::test]
async fn sandbox_blocks_codex_symlink_replacement_attack() {
if should_skip_bwrap_tests().await {
eprintln!("skipping bwrap test: vendored bwrap was not built in this environment");
eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
return;
}
@@ -380,6 +467,7 @@ async fn sandbox_blocks_codex_symlink_replacement_attack() {
&[tmpdir.path().to_path_buf()],
LONG_TIMEOUT_MS,
true,
true,
)
.await,
".codex symlink replacement should be denied",

View File

@@ -0,0 +1,4 @@
CREATE INDEX idx_logs_thread_id_ts ON logs(thread_id, ts DESC, ts_nanos DESC, id DESC);
CREATE INDEX idx_logs_process_uuid_threadless_ts ON logs(process_uuid, ts DESC, ts_nanos DESC, id DESC)
WHERE thread_id IS NULL;

View File

@@ -0,0 +1,9 @@
ALTER TABLE logs ADD COLUMN estimated_bytes INTEGER NOT NULL DEFAULT 0;
UPDATE logs
SET estimated_bytes =
LENGTH(CAST(COALESCE(message, '') AS BLOB))
+ LENGTH(CAST(level AS BLOB))
+ LENGTH(CAST(target AS BLOB))
+ LENGTH(CAST(COALESCE(module_path, '') AS BLOB))
+ LENGTH(CAST(COALESCE(file, '') AS BLOB));

View File
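The backfill above sizes each existing row with `LENGTH(CAST(... AS BLOB))`, which counts bytes rather than characters; the insert path in the runtime computes the same quantity with `String::len`, which is also a byte count for UTF-8 strings. A sketch of that Rust-side computation (the `Row` struct here is illustrative, standing in for `LogEntry`):

```rust
// Mirror of the estimated_bytes computation: COALESCE(col, '') in the SQL
// backfill corresponds to map_or(0, String::len) on optional fields, and
// both sides count bytes, not characters.
struct Row {
    message: Option<String>,
    level: String,
    target: String,
    module_path: Option<String>,
    file: Option<String>,
}

fn estimated_bytes(r: &Row) -> i64 {
    r.message.as_ref().map_or(0, String::len) as i64
        + r.level.len() as i64
        + r.target.len() as i64
        + r.module_path.as_ref().map_or(0, String::len) as i64
        + r.file.as_ref().map_or(0, String::len) as i64
}

fn main() {
    let row = Row {
        message: Some("héllo".to_string()), // 5 chars but 6 bytes in UTF-8
        level: "INFO".to_string(),
        target: "cli".to_string(),
        module_path: None,
        file: None,
    };
    // 6 (message bytes) + 4 (level) + 3 (target), NULLs contribute 0.
    assert_eq!(estimated_bytes(&row), 13);
    println!("{}", estimated_bytes(&row));
}
```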

@@ -27,11 +27,13 @@ use sqlx::ConnectOptions;
use sqlx::QueryBuilder;
use sqlx::Row;
use sqlx::Sqlite;
use sqlx::SqliteConnection;
use sqlx::SqlitePool;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqliteJournalMode;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::sqlite::SqliteSynchronous;
use std::collections::BTreeSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -42,6 +44,12 @@ use uuid::Uuid;
mod memories;
// Memory-specific CRUD and phase job lifecycle methods live in `runtime/memories.rs`.
// "Partition" is the retention bucket we cap at 10 MiB:
// - one bucket per non-null thread_id
// - one bucket per threadless (thread_id IS NULL) non-null process_uuid
// - one bucket for threadless rows with process_uuid IS NULL
const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;
#[derive(Clone)]
pub struct StateRuntime {
codex_home: PathBuf,
@@ -359,10 +367,16 @@ FROM threads
return Ok(());
}
let mut tx = self.pool.begin().await?;
let mut builder = QueryBuilder::<Sqlite>::new(
"INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line) ",
"INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line, estimated_bytes) ",
);
builder.push_values(entries, |mut row, entry| {
let estimated_bytes = entry.message.as_ref().map_or(0, String::len) as i64
+ entry.level.len() as i64
+ entry.target.len() as i64
+ entry.module_path.as_ref().map_or(0, String::len) as i64
+ entry.file.as_ref().map_or(0, String::len) as i64;
row.push_bind(entry.ts)
.push_bind(entry.ts_nanos)
.push_bind(&entry.level)
@@ -372,9 +386,228 @@ FROM threads
.push_bind(&entry.process_uuid)
.push_bind(&entry.module_path)
.push_bind(&entry.file)
.push_bind(entry.line);
.push_bind(entry.line)
.push_bind(estimated_bytes);
});
builder.build().execute(self.pool.as_ref()).await?;
builder.build().execute(&mut *tx).await?;
self.prune_logs_after_insert(entries, &mut tx).await?;
tx.commit().await?;
Ok(())
}
/// Enforce per-partition log size caps after a successful batch insert.
///
/// We maintain two independent budgets:
/// - Thread logs: rows with `thread_id IS NOT NULL`, capped per `thread_id`.
/// - Threadless process logs: rows with `thread_id IS NULL` ("threadless"),
/// capped per `process_uuid` (including `process_uuid IS NULL` as its own
/// threadless partition).
///
/// "Threadless" means the log row is not associated with any conversation
/// thread, so retention is keyed by process identity instead.
///
/// This runs inside the same transaction as the insert so callers never
/// observe "inserted but not yet pruned" rows.
async fn prune_logs_after_insert(
&self,
entries: &[LogEntry],
tx: &mut SqliteConnection,
) -> anyhow::Result<()> {
let thread_ids: BTreeSet<&str> = entries
.iter()
.filter_map(|entry| entry.thread_id.as_deref())
.collect();
if !thread_ids.is_empty() {
// Cheap precheck: only run the heavier window-function prune for
// threads that are currently above the cap.
let mut over_limit_threads_query =
QueryBuilder::<Sqlite>::new("SELECT thread_id FROM logs WHERE thread_id IN (");
{
let mut separated = over_limit_threads_query.separated(", ");
for thread_id in &thread_ids {
separated.push_bind(*thread_id);
}
}
over_limit_threads_query.push(") GROUP BY thread_id HAVING SUM(");
over_limit_threads_query.push("estimated_bytes");
over_limit_threads_query.push(") > ");
over_limit_threads_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
let over_limit_thread_ids: Vec<String> = over_limit_threads_query
.build()
.fetch_all(&mut *tx)
.await?
.into_iter()
.map(|row| row.try_get("thread_id"))
.collect::<Result<_, _>>()?;
if !over_limit_thread_ids.is_empty() {
// Enforce a strict per-thread cap by deleting every row whose
// newest-first cumulative bytes exceed the partition budget.
let mut prune_threads = QueryBuilder::<Sqlite>::new(
r#"
DELETE FROM logs
WHERE id IN (
SELECT id
FROM (
SELECT
id,
SUM(
"#,
);
prune_threads.push("estimated_bytes");
prune_threads.push(
r#"
) OVER (
PARTITION BY thread_id
ORDER BY ts DESC, ts_nanos DESC, id DESC
) AS cumulative_bytes
FROM logs
WHERE thread_id IN (
"#,
);
{
let mut separated = prune_threads.separated(", ");
for thread_id in &over_limit_thread_ids {
separated.push_bind(thread_id);
}
}
prune_threads.push(
r#"
)
)
WHERE cumulative_bytes >
"#,
);
prune_threads.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
prune_threads.push("\n)");
prune_threads.build().execute(&mut *tx).await?;
}
}
let threadless_process_uuids: BTreeSet<&str> = entries
.iter()
.filter(|entry| entry.thread_id.is_none())
.filter_map(|entry| entry.process_uuid.as_deref())
.collect();
let has_threadless_null_process_uuid = entries
.iter()
.any(|entry| entry.thread_id.is_none() && entry.process_uuid.is_none());
if !threadless_process_uuids.is_empty() {
// Threadless logs are budgeted separately per process UUID.
let mut over_limit_processes_query = QueryBuilder::<Sqlite>::new(
"SELECT process_uuid FROM logs WHERE thread_id IS NULL AND process_uuid IN (",
);
{
let mut separated = over_limit_processes_query.separated(", ");
for process_uuid in &threadless_process_uuids {
separated.push_bind(*process_uuid);
}
}
over_limit_processes_query.push(") GROUP BY process_uuid HAVING SUM(");
over_limit_processes_query.push("estimated_bytes");
over_limit_processes_query.push(") > ");
over_limit_processes_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
let over_limit_process_uuids: Vec<String> = over_limit_processes_query
.build()
.fetch_all(&mut *tx)
.await?
.into_iter()
.map(|row| row.try_get("process_uuid"))
.collect::<Result<_, _>>()?;
if !over_limit_process_uuids.is_empty() {
// Same strict cap policy as thread pruning, but only for
// threadless rows in the affected process UUIDs.
let mut prune_threadless_process_logs = QueryBuilder::<Sqlite>::new(
r#"
DELETE FROM logs
WHERE id IN (
SELECT id
FROM (
SELECT
id,
SUM(
"#,
);
prune_threadless_process_logs.push("estimated_bytes");
prune_threadless_process_logs.push(
r#"
) OVER (
PARTITION BY process_uuid
ORDER BY ts DESC, ts_nanos DESC, id DESC
) AS cumulative_bytes
FROM logs
WHERE thread_id IS NULL
AND process_uuid IN (
"#,
);
{
let mut separated = prune_threadless_process_logs.separated(", ");
for process_uuid in &over_limit_process_uuids {
separated.push_bind(process_uuid);
}
}
prune_threadless_process_logs.push(
r#"
)
)
WHERE cumulative_bytes >
"#,
);
prune_threadless_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
prune_threadless_process_logs.push("\n)");
prune_threadless_process_logs
.build()
.execute(&mut *tx)
.await?;
}
}
if has_threadless_null_process_uuid {
// Rows without a process UUID still need a cap; treat NULL as its
// own threadless partition.
let mut null_process_usage_query = QueryBuilder::<Sqlite>::new("SELECT SUM(");
null_process_usage_query.push("estimated_bytes");
null_process_usage_query.push(
") AS total_bytes FROM logs WHERE thread_id IS NULL AND process_uuid IS NULL",
);
let total_null_process_bytes: Option<i64> = null_process_usage_query
.build()
.fetch_one(&mut *tx)
.await?
.try_get("total_bytes")?;
if total_null_process_bytes.unwrap_or(0) > LOG_PARTITION_SIZE_LIMIT_BYTES {
let mut prune_threadless_null_process_logs = QueryBuilder::<Sqlite>::new(
r#"
DELETE FROM logs
WHERE id IN (
SELECT id
FROM (
SELECT
id,
SUM(
"#,
);
prune_threadless_null_process_logs.push("estimated_bytes");
prune_threadless_null_process_logs.push(
r#"
) OVER (
PARTITION BY process_uuid
ORDER BY ts DESC, ts_nanos DESC, id DESC
) AS cumulative_bytes
FROM logs
WHERE thread_id IS NULL
AND process_uuid IS NULL
)
WHERE cumulative_bytes >
"#,
);
prune_threadless_null_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
prune_threadless_null_process_logs.push("\n)");
prune_threadless_null_process_logs
.build()
.execute(&mut *tx)
.await?;
}
}
Ok(())
}
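The retention policy implemented by the window-function DELETEs above can be stated more simply in plain Rust: order a partition's rows newest-first, keep rows while the running byte total stays within the cap, and drop everything after it overflows. A minimal in-memory sketch (hypothetical helper; `(ts, bytes)` stands in for the SQL ordering over `ts, ts_nanos, id`):

```rust
// Newest-first cumulative-bytes retention, as in the prune queries above:
// a row whose running total pushes past the cap is deleted, along with
// every older row after it.
fn surviving_rows(mut rows: Vec<(i64, i64)>, cap: i64) -> Vec<(i64, i64)> {
    // Mirror ORDER BY ts DESC: newest rows are considered first.
    rows.sort_by(|a, b| b.0.cmp(&a.0));
    let mut cumulative = 0;
    rows.into_iter()
        .take_while(|(_, bytes)| {
            cumulative += bytes;
            cumulative <= cap
        })
        .collect()
}

fn main() {
    let mib = 1024 * 1024;
    // Two 6 MiB rows against a 10 MiB cap: only the newest survives,
    // matching the "prunes_old_rows_when_thread_exceeds_size_limit" test.
    let kept = surviving_rows(vec![(1, 6 * mib), (2, 6 * mib)], 10 * mib);
    assert_eq!(kept, vec![(2, 6 * mib)]);
    // A single oversized row is itself pruned.
    assert!(surviving_rows(vec![(1, 11 * mib)], 10 * mib).is_empty());
    println!("{}", kept.len());
}
```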
@@ -2553,6 +2786,300 @@ VALUES (?, ?, ?, ?, ?)
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn insert_logs_prunes_old_rows_when_thread_exceeds_size_limit() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let six_mebibytes = "a".repeat(6 * 1024 * 1024);
runtime
.insert_logs(&[
LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some(six_mebibytes.clone()),
thread_id: Some("thread-1".to_string()),
process_uuid: Some("proc-1".to_string()),
file: Some("main.rs".to_string()),
line: Some(1),
module_path: Some("mod".to_string()),
},
LogEntry {
ts: 2,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some(six_mebibytes.clone()),
thread_id: Some("thread-1".to_string()),
process_uuid: Some("proc-1".to_string()),
file: Some("main.rs".to_string()),
line: Some(2),
module_path: Some("mod".to_string()),
},
])
.await
.expect("insert test logs");
let rows = runtime
.query_logs(&LogQuery {
thread_ids: vec!["thread-1".to_string()],
..Default::default()
})
.await
.expect("query thread logs");
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].ts, 2);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn insert_logs_prunes_single_thread_row_when_it_exceeds_size_limit() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let eleven_mebibytes = "d".repeat(11 * 1024 * 1024);
runtime
.insert_logs(&[LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some(eleven_mebibytes),
thread_id: Some("thread-oversized".to_string()),
process_uuid: Some("proc-1".to_string()),
file: Some("main.rs".to_string()),
line: Some(1),
module_path: Some("mod".to_string()),
}])
.await
.expect("insert test log");
let rows = runtime
.query_logs(&LogQuery {
thread_ids: vec!["thread-oversized".to_string()],
..Default::default()
})
.await
.expect("query thread logs");
assert!(rows.is_empty());
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn insert_logs_prunes_threadless_rows_per_process_uuid_only() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let six_mebibytes = "b".repeat(6 * 1024 * 1024);
runtime
.insert_logs(&[
LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some(six_mebibytes.clone()),
thread_id: None,
process_uuid: Some("proc-1".to_string()),
file: Some("main.rs".to_string()),
line: Some(1),
module_path: Some("mod".to_string()),
},
LogEntry {
ts: 2,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some(six_mebibytes.clone()),
thread_id: None,
process_uuid: Some("proc-1".to_string()),
file: Some("main.rs".to_string()),
line: Some(2),
module_path: Some("mod".to_string()),
},
LogEntry {
ts: 3,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some(six_mebibytes),
thread_id: Some("thread-1".to_string()),
process_uuid: Some("proc-1".to_string()),
file: Some("main.rs".to_string()),
line: Some(3),
module_path: Some("mod".to_string()),
},
])
.await
.expect("insert test logs");
let rows = runtime
.query_logs(&LogQuery {
thread_ids: vec!["thread-1".to_string()],
include_threadless: true,
..Default::default()
})
.await
.expect("query thread and threadless logs");
let mut timestamps: Vec<i64> = rows.into_iter().map(|row| row.ts).collect();
timestamps.sort_unstable();
assert_eq!(timestamps, vec![2, 3]);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn insert_logs_prunes_single_threadless_process_row_when_it_exceeds_size_limit() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let eleven_mebibytes = "e".repeat(11 * 1024 * 1024);
runtime
.insert_logs(&[LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some(eleven_mebibytes),
thread_id: None,
process_uuid: Some("proc-oversized".to_string()),
file: Some("main.rs".to_string()),
line: Some(1),
module_path: Some("mod".to_string()),
}])
.await
.expect("insert test log");
let rows = runtime
.query_logs(&LogQuery {
include_threadless: true,
..Default::default()
})
.await
.expect("query threadless logs");
assert!(rows.is_empty());
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn insert_logs_prunes_threadless_rows_with_null_process_uuid() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let six_mebibytes = "c".repeat(6 * 1024 * 1024);
runtime
.insert_logs(&[
LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some(six_mebibytes.clone()),
thread_id: None,
process_uuid: None,
file: Some("main.rs".to_string()),
line: Some(1),
module_path: Some("mod".to_string()),
},
LogEntry {
ts: 2,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some(six_mebibytes),
thread_id: None,
process_uuid: None,
file: Some("main.rs".to_string()),
line: Some(2),
module_path: Some("mod".to_string()),
},
LogEntry {
ts: 3,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some("small".to_string()),
thread_id: None,
process_uuid: Some("proc-1".to_string()),
file: Some("main.rs".to_string()),
line: Some(3),
module_path: Some("mod".to_string()),
},
])
.await
.expect("insert test logs");
let rows = runtime
.query_logs(&LogQuery {
include_threadless: true,
..Default::default()
})
.await
.expect("query threadless logs");
let mut timestamps: Vec<i64> = rows.into_iter().map(|row| row.ts).collect();
timestamps.sort_unstable();
assert_eq!(timestamps, vec![2, 3]);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn insert_logs_prunes_single_threadless_null_process_row_when_it_exceeds_limit() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let eleven_mebibytes = "f".repeat(11 * 1024 * 1024);
runtime
.insert_logs(&[LogEntry {
ts: 1,
ts_nanos: 0,
level: "INFO".to_string(),
target: "cli".to_string(),
message: Some(eleven_mebibytes),
thread_id: None,
process_uuid: None,
file: Some("main.rs".to_string()),
line: Some(1),
module_path: Some("mod".to_string()),
}])
.await
.expect("insert test log");
let rows = runtime
.query_logs(&LogQuery {
include_threadless: true,
..Default::default()
})
.await
.expect("query threadless logs");
assert!(rows.is_empty());
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
fn test_thread_metadata(
codex_home: &Path,
thread_id: ThreadId,