Mirror of https://github.com/openai/codex.git (synced 2026-02-19 15:23:46 +00:00)

Compare commits: core/post-... -> main (8 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 743caea3a6 | |
| | 2daa3fd44f | |
| | f298c48cc6 | |
| | 227352257c | |
| | 4fe99b086f | |
| | 18eb640a47 | |
| | 16c3c47535 | |
| | 7f3dbaeb25 | |
.github/workflows/rust-ci.yml (vendored, 11 lines changed)
@@ -581,6 +581,17 @@ jobs:
          tool: nextest
          version: 0.9.103

      - name: Enable unprivileged user namespaces (Linux)
        if: runner.os == 'Linux'
        run: |
          # Required for bubblewrap to work on Linux CI runners.
          sudo sysctl -w kernel.unprivileged_userns_clone=1
          # Ubuntu 24.04+ can additionally gate unprivileged user namespaces
          # behind AppArmor.
          if sudo sysctl -a 2>/dev/null | grep -q '^kernel.apparmor_restrict_unprivileged_userns'; then
            sudo sysctl -w kernel.apparmor_restrict_unprivileged_userns=0
          fi

      - name: tests
        id: test
        run: cargo nextest run --all-features --no-fail-fast --target ${{ matrix.target }} --cargo-profile ci-test --timings
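For context on the two sysctls above, here is a minimal Rust sketch (not part of this patch) of how a harness could probe the same gates before attempting a bubblewrap run. The `/proc/sys` paths are the procfs views of the same kernel keys.

```rust
use std::fs;

// Probe the gates the CI step configures. A missing file means the kernel
// does not have that gate at all, which we treat as "not restricted".
fn unprivileged_userns_available() -> bool {
    let clone_ok = fs::read_to_string("/proc/sys/kernel/unprivileged_userns_clone")
        .map(|v| v.trim() == "1")
        .unwrap_or(true);
    let apparmor_ok = fs::read_to_string("/proc/sys/kernel/apparmor_restrict_unprivileged_userns")
        .map(|v| v.trim() == "0")
        .unwrap_or(true);
    clone_ok && apparmor_ok
}
```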
@@ -105,6 +105,10 @@ impl McpProcess {
        cmd.stderr(Stdio::piped());
        cmd.env("CODEX_HOME", codex_home);
        cmd.env("RUST_LOG", "debug");
        // Bazel/Linux workers can run with smaller default thread stacks, which makes
        // tokio-runtime-worker stack overflows more likely in app-server integration tests.
        // Pin a larger minimum stack for the spawned test server process.
        cmd.env("RUST_MIN_STACK", "4194304");
        cmd.env_remove(CODEX_INTERNAL_ORIGINATOR_OVERRIDE_ENV_VAR);

        for (k, v) in env_overrides {
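The `RUST_MIN_STACK` pin above applies to the spawned server process, since Rust's std runtime reads that variable when a thread is created without an explicit stack size. A standalone sketch of the same pattern with `std::process::Command` (the binary name is hypothetical, not from the patch):

```rust
use std::process::{Command, Stdio};

// Setting RUST_MIN_STACK in the child's environment raises the floor for
// threads (including tokio workers) spawned inside that process.
fn spawn_app_server() -> std::io::Result<std::process::Child> {
    Command::new("app-server-under-test") // hypothetical binary name
        .env("RUST_MIN_STACK", (4 * 1024 * 1024).to_string()) // 4194304 bytes
        .stderr(Stdio::piped())
        .spawn()
}
```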
@@ -507,14 +507,19 @@ async fn thread_resume_rejects_mismatched_path_when_thread_is_running() -> Resul
#[tokio::test]
async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> Result<()> {
    let server = responses::start_mock_server().await;
    let first_body = responses::sse(vec![
    let first_response = responses::sse_response(responses::sse(vec![
        responses::ev_response_created("resp-1"),
        responses::ev_assistant_message("msg-1", "Done"),
        responses::ev_completed("resp-1"),
    ]);
    let second_body = responses::sse(vec![responses::ev_response_created("resp-2")]);
    ]));
    let second_response = responses::sse_response(responses::sse(vec![
        responses::ev_response_created("resp-2"),
        responses::ev_assistant_message("msg-2", "Done"),
        responses::ev_completed("resp-2"),
    ]))
    .set_delay(std::time::Duration::from_millis(500));
    let _response_mock =
        responses::mount_sse_sequence(&server, vec![first_body, second_body]).await;
        responses::mount_response_sequence(&server, vec![first_response, second_response]).await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;

@@ -4,6 +4,8 @@ use std::sync::LazyLock;
use std::sync::Mutex as StdMutex;

use codex_core::config::Config;
use codex_core::default_client::is_first_party_chat_originator;
use codex_core::default_client::originator;
use codex_core::features::Feature;
use codex_core::token_data::TokenData;
use serde::Deserialize;
@@ -460,23 +462,34 @@ const DISALLOWED_CONNECTOR_IDS: &[&str] = &[
    "connector_69272cb413a081919685ec3c88d1744e",
    "connector_0f9c9d4592e54d0a9a12b3f44a1e2010",
];
const FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS: &[&str] =
    &["connector_0f9c9d4592e54d0a9a12b3f44a1e2010"];
const DISALLOWED_CONNECTOR_PREFIX: &str = "connector_openai_";

fn filter_disallowed_connectors(connectors: Vec<AppInfo>) -> Vec<AppInfo> {
    filter_disallowed_connectors_for_originator(connectors, originator().value.as_str())
}

fn filter_disallowed_connectors_for_originator(
    connectors: Vec<AppInfo>,
    originator_value: &str,
) -> Vec<AppInfo> {
    let disallowed_connector_ids = if is_first_party_chat_originator(originator_value) {
        FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS
    } else {
        DISALLOWED_CONNECTOR_IDS
    };

    connectors
        .into_iter()
        .filter(is_connector_allowed)
        .filter(|connector| is_connector_allowed(connector, disallowed_connector_ids))
        .collect()
}

fn is_connector_allowed(connector: &AppInfo) -> bool {
fn is_connector_allowed(connector: &AppInfo, disallowed_connector_ids: &[&str]) -> bool {
    let connector_id = connector.id.as_str();
    if connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX)
        || DISALLOWED_CONNECTOR_IDS.contains(&connector_id)
    {
        return false;
    }
    true
    !connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX)
        && !disallowed_connector_ids.contains(&connector_id)
}

#[cfg(test)]
@@ -523,7 +536,7 @@ mod tests {
    }

    #[test]
    fn filters_openai_connectors() {
    fn filters_openai_prefixed_connectors() {
        let filtered = filter_disallowed_connectors(vec![
            app("connector_openai_foo"),
            app("connector_openai_bar"),
@@ -541,6 +554,22 @@ mod tests {
        assert_eq!(filtered, vec![app("delta")]);
    }

    #[test]
    fn first_party_chat_originator_filters_target_and_openai_prefixed_connectors() {
        let filtered = filter_disallowed_connectors_for_originator(
            vec![
                app("connector_openai_foo"),
                app("asdk_app_6938a94a61d881918ef32cb999ff937c"),
                app("connector_0f9c9d4592e54d0a9a12b3f44a1e2010"),
            ],
            "codex_atlas",
        );
        assert_eq!(
            filtered,
            vec![app("asdk_app_6938a94a61d881918ef32cb999ff937c"),]
        );
    }

    fn merged_app(id: &str, is_accessible: bool) -> AppInfo {
        AppInfo {
            id: id.to_string(),

@@ -1,11 +1,14 @@
use crate::agent::AgentStatus;
use crate::agent::guards::Guards;
use crate::agent::status::is_final;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::session_prefix::format_subagent_notification_message;
use crate::thread_manager::ThreadManagerState;
use codex_protocol::ThreadId;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::protocol::TokenUsage;
use codex_protocol::user_input::UserInput;
use std::path::PathBuf;
@@ -46,6 +49,7 @@ impl AgentControl {
    ) -> CodexResult<ThreadId> {
        let state = self.upgrade()?;
        let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
        let notification_source = session_source.clone();

        // The same `AgentControl` is sent to spawn the thread.
        let new_thread = match session_source {
@@ -64,6 +68,7 @@ impl AgentControl {
        state.notify_thread_created(new_thread.thread_id);

        self.send_input(new_thread.thread_id, items).await?;
        self.maybe_start_completion_watcher(new_thread.thread_id, notification_source);

        Ok(new_thread.thread_id)
    }
@@ -77,6 +82,7 @@ impl AgentControl {
    ) -> CodexResult<ThreadId> {
        let state = self.upgrade()?;
        let reservation = self.state.reserve_spawn_slot(config.agent_max_threads)?;
        let notification_source = session_source.clone();

        let resumed_thread = state
            .resume_thread_from_rollout_with_source(
@@ -90,6 +96,7 @@ impl AgentControl {
        // Resumed threads are re-registered in-memory and need the same listener
        // attachment path as freshly spawned threads.
        state.notify_thread_created(resumed_thread.thread_id);
        self.maybe_start_completion_watcher(resumed_thread.thread_id, Some(notification_source));

        Ok(resumed_thread.thread_id)
    }
@@ -164,13 +171,60 @@ impl AgentControl {
        thread.total_token_usage().await
    }

    /// Starts a detached watcher for sub-agents spawned from another thread.
    ///
    /// This is only enabled for `SubAgentSource::ThreadSpawn`, where a parent thread exists and
    /// can receive completion notifications.
    fn maybe_start_completion_watcher(
        &self,
        child_thread_id: ThreadId,
        session_source: Option<SessionSource>,
    ) {
        let Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
            parent_thread_id, ..
        })) = session_source
        else {
            return;
        };
        let control = self.clone();
        tokio::spawn(async move {
            let mut status_rx = match control.subscribe_status(child_thread_id).await {
                Ok(rx) => rx,
                Err(_) => return,
            };
            let mut status = status_rx.borrow().clone();
            while !is_final(&status) {
                if status_rx.changed().await.is_err() {
                    status = control.get_status(child_thread_id).await;
                    break;
                }
                status = status_rx.borrow().clone();
            }
            if !is_final(&status) {
                return;
            }

            let Ok(state) = control.upgrade() else {
                return;
            };
            let Ok(parent_thread) = state.get_thread(parent_thread_id).await else {
                return;
            };
            parent_thread
                .inject_user_message_without_turn(format_subagent_notification_message(
                    &child_thread_id.to_string(),
                    &status,
                ))
                .await;
        });
    }

    fn upgrade(&self) -> CodexResult<Arc<ThreadManagerState>> {
        self.manager
            .upgrade()
            .ok_or_else(|| CodexErr::UnsupportedOperation("thread manager dropped".to_string()))
    }
}

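The watcher loop above follows a common `tokio::sync::watch` pattern: read the current value, then await change notifications until a predicate holds or the sender is dropped. A generic, self-contained sketch of that shape (names are illustrative, not from the patch):

```rust
use tokio::sync::watch;

// Wait until `done` holds for the watched value, or the sender is dropped,
// in which case the last observed value is returned.
async fn wait_until<T: Clone>(mut rx: watch::Receiver<T>, done: impl Fn(&T) -> bool) -> T {
    let mut value = rx.borrow().clone();
    while !done(&value) {
        if rx.changed().await.is_err() {
            break; // sender dropped; fall back to the last observed value
        }
        value = rx.borrow().clone();
    }
    value
}
```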
#[cfg(test)]
mod tests {
    use super::*;
@@ -180,16 +234,24 @@ mod tests {
    use crate::agent::agent_status_from_event;
    use crate::config::Config;
    use crate::config::ConfigBuilder;
    use crate::session_prefix::SUBAGENT_NOTIFICATION_OPEN_TAG;
    use assert_matches::assert_matches;
    use codex_protocol::config_types::ModeKind;
    use codex_protocol::models::ContentItem;
    use codex_protocol::models::ResponseItem;
    use codex_protocol::protocol::ErrorEvent;
    use codex_protocol::protocol::EventMsg;
    use codex_protocol::protocol::SessionSource;
    use codex_protocol::protocol::SubAgentSource;
    use codex_protocol::protocol::TurnAbortReason;
    use codex_protocol::protocol::TurnAbortedEvent;
    use codex_protocol::protocol::TurnCompleteEvent;
    use codex_protocol::protocol::TurnStartedEvent;
    use pretty_assertions::assert_eq;
    use tempfile::TempDir;
    use tokio::time::Duration;
    use tokio::time::sleep;
    use tokio::time::timeout;
    use toml::Value as TomlValue;

    async fn test_config_with_cli_overrides(
@@ -250,6 +312,42 @@ mod tests {
        }
    }

    fn has_subagent_notification(history_items: &[ResponseItem]) -> bool {
        history_items.iter().any(|item| {
            let ResponseItem::Message { role, content, .. } = item else {
                return false;
            };
            if role != "user" {
                return false;
            }
            content.iter().any(|content_item| match content_item {
                ContentItem::InputText { text } | ContentItem::OutputText { text } => {
                    text.contains(SUBAGENT_NOTIFICATION_OPEN_TAG)
                }
                ContentItem::InputImage { .. } => false,
            })
        })
    }

    async fn wait_for_subagent_notification(parent_thread: &Arc<CodexThread>) -> bool {
        let wait = async {
            loop {
                let history_items = parent_thread
                    .codex
                    .session
                    .clone_history()
                    .await
                    .raw_items()
                    .to_vec();
                if has_subagent_notification(&history_items) {
                    return true;
                }
                sleep(Duration::from_millis(25)).await;
            }
        };
        timeout(Duration::from_secs(2), wait).await.is_ok()
    }

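`wait_for_subagent_notification` is an instance of a generic poll-until-timeout helper. The same shape, as a standalone sketch with the same 25 ms retry interval and 2 s budget:

```rust
use tokio::time::{sleep, timeout, Duration};

// Retry an async check until it passes or the time budget expires.
// Returns true if the check eventually passed.
async fn eventually<F, Fut>(mut check: F) -> bool
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    timeout(Duration::from_secs(2), async {
        loop {
            if check().await {
                return;
            }
            sleep(Duration::from_millis(25)).await;
        }
    })
    .await
    .is_ok()
}
```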
    #[tokio::test]
    async fn send_input_errors_when_manager_dropped() {
        let control = AgentControl::default();
@@ -683,4 +781,35 @@ mod tests {
            .await
            .expect("shutdown resumed thread");
    }

    #[tokio::test]
    async fn spawn_child_completion_notifies_parent_history() {
        let harness = AgentControlHarness::new().await;
        let (parent_thread_id, parent_thread) = harness.start_thread().await;

        let child_thread_id = harness
            .control
            .spawn_agent(
                harness.config.clone(),
                text_input("hello child"),
                Some(SessionSource::SubAgent(SubAgentSource::ThreadSpawn {
                    parent_thread_id,
                    depth: 1,
                })),
            )
            .await
            .expect("child spawn should succeed");

        let child_thread = harness
            .manager
            .get_thread(child_thread_id)
            .await
            .expect("child thread should exist");
        let _ = child_thread
            .submit(Op::Shutdown {})
            .await
            .expect("child shutdown should submit");

        assert_eq!(wait_for_subagent_notification(&parent_thread).await, true);
    }
}

@@ -8,6 +8,9 @@ use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
use codex_protocol::config_types::Personality;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::SandboxPolicy;
@@ -32,7 +35,7 @@ pub struct ThreadConfigSnapshot {
}

pub struct CodexThread {
    codex: Codex,
    pub(crate) codex: Codex,
    rollout_path: Option<PathBuf>,
    _watch_registration: WatchRegistration,
}
@@ -85,6 +88,33 @@ impl CodexThread {
        self.codex.session.total_token_usage().await
    }

    /// Records a user-role session-prefix message without creating a new user turn boundary.
    pub(crate) async fn inject_user_message_without_turn(&self, message: String) {
        let pending_item = ResponseInputItem::Message {
            role: "user".to_string(),
            content: vec![ContentItem::InputText { text: message }],
        };
        let pending_items = vec![pending_item];
        let Err(items_without_active_turn) = self
            .codex
            .session
            .inject_response_items(pending_items)
            .await
        else {
            return;
        };

        let turn_context = self.codex.session.new_default_turn().await;
        let items: Vec<ResponseItem> = items_without_active_turn
            .into_iter()
            .map(ResponseItem::from)
            .collect();
        self.codex
            .session
            .record_conversation_items(turn_context.as_ref(), &items)
            .await;
    }

    pub fn rollout_path(&self) -> Option<PathBuf> {
        self.rollout_path.clone()
    }

@@ -23,9 +23,9 @@ use serde::Serialize;
use serde::de::Error as SerdeError;

pub const DEFAULT_OTEL_ENVIRONMENT: &str = "dev";
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 8;
pub const DEFAULT_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 16;
pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 12;
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024;

#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema)]

@@ -571,6 +571,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
            "<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
        ),
        user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
        user_input_text_msg(
            "<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
        ),
        user_input_text_msg("turn 1 user"),
        assistant_msg("turn 1 assistant"),
        user_input_text_msg("turn 2 user"),
@@ -591,6 +594,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
            "<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
        ),
        user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
        user_input_text_msg(
            "<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
        ),
        user_input_text_msg("turn 1 user"),
        assistant_msg("turn 1 assistant"),
    ];
@@ -610,6 +616,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
            "<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
        ),
        user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
        user_input_text_msg(
            "<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
        ),
    ];

    let mut history = create_history_with_items(vec![
@@ -622,6 +631,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
            "<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
        ),
        user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
        user_input_text_msg(
            "<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
        ),
        user_input_text_msg("turn 1 user"),
        assistant_msg("turn 1 assistant"),
        user_input_text_msg("turn 2 user"),
@@ -640,6 +652,9 @@ fn drop_last_n_user_turns_ignores_session_prefix_user_messages() {
            "<skill>\n<name>demo</name>\n<path>skills/demo/SKILL.md</path>\nbody\n</skill>",
        ),
        user_input_text_msg("<user_shell_command>echo 42</user_shell_command>"),
        user_input_text_msg(
            "<subagent_notification>{\"agent_id\":\"a\",\"status\":\"completed\"}</subagent_notification>",
        ),
        user_input_text_msg("turn 1 user"),
        assistant_msg("turn 1 assistant"),
        user_input_text_msg("turn 2 user"),

@@ -114,6 +114,10 @@ pub fn is_first_party_originator(originator_value: &str) -> bool {
        || originator_value.starts_with("Codex ")
}

pub fn is_first_party_chat_originator(originator_value: &str) -> bool {
    originator_value == "codex_atlas" || originator_value == "codex_chatgpt_desktop"
}

pub fn get_codex_user_agent() -> String {
    let build_version = env!("CARGO_PKG_VERSION");
    let os_info = os_info::get();
@@ -234,6 +238,17 @@ mod tests {
        assert_eq!(is_first_party_originator("Other"), false);
    }

    #[test]
    fn is_first_party_chat_originator_matches_known_values() {
        assert_eq!(is_first_party_chat_originator("codex_atlas"), true);
        assert_eq!(
            is_first_party_chat_originator("codex_chatgpt_desktop"),
            true
        );
        assert_eq!(is_first_party_chat_originator(DEFAULT_ORIGINATOR), false);
        assert_eq!(is_first_party_chat_originator("codex_vscode"), false);
    }

    #[tokio::test]
    async fn test_create_client_sets_default_headers() {
        skip_if_no_network!();
@@ -370,7 +370,7 @@ fn legacy_usage_notice(alias: &str, feature: Feature) -> (String, Option<String>
        None
    } else {
        Some(format!(
            "Enable it with `--enable {canonical}` or `[features].{canonical}` in config.toml. See https://github.com/openai/codex/blob/main/docs/config.md#feature-flags for details."
            "Enable it with `--enable {canonical}` or `[features].{canonical}` in config.toml. See https://developers.openai.com/codex/config-basic#feature-flags for details."
        ))
    };
    (summary, details)
@@ -1,3 +1,5 @@
use codex_protocol::protocol::AgentStatus;

/// Helpers for identifying model-visible "session prefix" messages.
///
/// A session prefix is a user-role message that carries configuration or state needed by
@@ -6,10 +8,41 @@
/// boundaries.
pub(crate) const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = "<environment_context>";
pub(crate) const TURN_ABORTED_OPEN_TAG: &str = "<turn_aborted>";
pub(crate) const SUBAGENT_NOTIFICATION_OPEN_TAG: &str = "<subagent_notification>";
pub(crate) const SUBAGENT_NOTIFICATION_CLOSE_TAG: &str = "</subagent_notification>";

fn starts_with_ascii_case_insensitive(text: &str, prefix: &str) -> bool {
    text.get(..prefix.len())
        .is_some_and(|candidate| candidate.eq_ignore_ascii_case(prefix))
}

/// Returns true if `text` starts with a session prefix marker (case-insensitive).
pub(crate) fn is_session_prefix(text: &str) -> bool {
    let trimmed = text.trim_start();
    let lowered = trimmed.to_ascii_lowercase();
    lowered.starts_with(ENVIRONMENT_CONTEXT_OPEN_TAG) || lowered.starts_with(TURN_ABORTED_OPEN_TAG)
    starts_with_ascii_case_insensitive(trimmed, ENVIRONMENT_CONTEXT_OPEN_TAG)
        || starts_with_ascii_case_insensitive(trimmed, TURN_ABORTED_OPEN_TAG)
        || starts_with_ascii_case_insensitive(trimmed, SUBAGENT_NOTIFICATION_OPEN_TAG)
}

pub(crate) fn format_subagent_notification_message(agent_id: &str, status: &AgentStatus) -> String {
    let payload_json = serde_json::json!({
        "agent_id": agent_id,
        "status": status,
    })
    .to_string();
    format!("{SUBAGENT_NOTIFICATION_OPEN_TAG}\n{payload_json}\n{SUBAGENT_NOTIFICATION_CLOSE_TAG}")
}

#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    #[test]
    fn is_session_prefix_is_case_insensitive() {
        assert_eq!(
            is_session_prefix("<SUBAGENT_NOTIFICATION>{}</subagent_notification>"),
            true
        );
    }
}

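One design note on `starts_with_ascii_case_insensitive`: it avoids the per-call allocation of `to_ascii_lowercase` in the replaced code, and `str::get` returns `None` when the index is not a char boundary, so multibyte input fails the check instead of panicking. A quick standalone sketch demonstrating both properties (not from the patch):

```rust
fn starts_with_ascii_case_insensitive(text: &str, prefix: &str) -> bool {
    text.get(..prefix.len())
        .is_some_and(|candidate| candidate.eq_ignore_ascii_case(prefix))
}

fn main() {
    // Case-insensitive match without allocating a lowercased copy.
    assert!(starts_with_ascii_case_insensitive("<TURN_ABORTED>x", "<turn_aborted>"));
    // `..prefix.len()` lands mid-character in "日本語" (boundaries at 0, 3, 6, 9),
    // so `get` returns None and the check is false instead of panicking.
    assert!(!starts_with_ascii_case_insensitive("日本語", "<env"));
}
```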
@@ -95,10 +95,15 @@ impl ShellSnapshot {
            )
            .await
            .map(Arc::new);
            let success = if snapshot.is_some() { "true" } else { "false" };
            let _ = timer.map(|timer| timer.record(&[("success", success)]));
            otel_manager.counter("codex.shell_snapshot", 1, &[("success", success)]);
            let _ = shell_snapshot_tx.send(snapshot);
            let success = snapshot.is_ok();
            let success_tag = if success { "true" } else { "false" };
            let _ = timer.map(|timer| timer.record(&[("success", success_tag)]));
            let mut counter_tags = vec![("success", success_tag)];
            if let Some(failure_reason) = snapshot.as_ref().err() {
                counter_tags.push(("failure_reason", *failure_reason));
            }
            otel_manager.counter("codex.shell_snapshot", 1, &counter_tags);
            let _ = shell_snapshot_tx.send(snapshot.ok());
        }
        .instrument(snapshot_span),
    );
@@ -109,7 +114,7 @@ impl ShellSnapshot {
        session_id: ThreadId,
        session_cwd: &Path,
        shell: &Shell,
    ) -> Option<Self> {
    ) -> std::result::Result<Self, &'static str> {
        // File to store the snapshot
        let extension = match shell.shell_type {
            ShellType::PowerShell => "ps1",
@@ -129,32 +134,31 @@ impl ShellSnapshot {
        });

        // Make the new snapshot.
        let snapshot =
            match write_shell_snapshot(shell.shell_type.clone(), &path, session_cwd).await {
                Ok(path) => {
                    tracing::info!("Shell snapshot successfully created: {}", path.display());
                    Some(Self {
                        path,
                        cwd: session_cwd.to_path_buf(),
                    })
                }
                Err(err) => {
                    tracing::warn!(
                        "Failed to create shell snapshot for {}: {err:?}",
                        shell.name()
                    );
                    None
                }
            };
        let path = match write_shell_snapshot(shell.shell_type.clone(), &path, session_cwd).await {
            Ok(path) => {
                tracing::info!("Shell snapshot successfully created: {}", path.display());
                path
            }
            Err(err) => {
                tracing::warn!(
                    "Failed to create shell snapshot for {}: {err:?}",
                    shell.name()
                );
                return Err("write_failed");
            }
        };

        if let Some(snapshot) = snapshot.as_ref()
            && let Err(err) = validate_snapshot(shell, &snapshot.path, session_cwd).await
        {
        let snapshot = Self {
            path,
            cwd: session_cwd.to_path_buf(),
        };

        if let Err(err) = validate_snapshot(shell, &snapshot.path, session_cwd).await {
            tracing::error!("Shell snapshot validation failed: {err:?}");
            return None;
            return Err("validation_failed");
        }

        snapshot
        Ok(snapshot)
    }
}

@@ -427,7 +427,7 @@ mod resume_agent {
    }
}

mod wait {
pub(crate) mod wait {
    use super::*;
    use crate::agent::status::is_final;
    use futures::FutureExt;
@@ -447,10 +447,10 @@ mod wait {
        timeout_ms: Option<i64>,
    }

    #[derive(Debug, Serialize)]
    struct WaitResult {
        status: HashMap<ThreadId, AgentStatus>,
        timed_out: bool,
    #[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
    pub(crate) struct WaitResult {
        pub(crate) status: HashMap<ThreadId, AgentStatus>,
        pub(crate) timed_out: bool,
    }

    pub async fn handle(
@@ -1462,12 +1462,6 @@ mod tests {
        );
    }

    #[derive(Debug, Deserialize, PartialEq, Eq)]
    struct WaitResult {
        status: HashMap<ThreadId, AgentStatus>,
        timed_out: bool,
    }

    #[tokio::test]
    async fn wait_rejects_non_positive_timeout() {
        let (session, turn) = make_session_and_context().await;
@@ -1553,11 +1547,11 @@ mod tests {
        else {
            panic!("expected function output");
        };
        let result: WaitResult =
        let result: wait::WaitResult =
            serde_json::from_str(&content).expect("wait result should be json");
        assert_eq!(
            result,
            WaitResult {
            wait::WaitResult {
                status: HashMap::from([
                    (id_a, AgentStatus::NotFound),
                    (id_b, AgentStatus::NotFound),
@@ -1597,11 +1591,11 @@ mod tests {
        else {
            panic!("expected function output");
        };
        let result: WaitResult =
        let result: wait::WaitResult =
            serde_json::from_str(&content).expect("wait result should be json");
        assert_eq!(
            result,
            WaitResult {
            wait::WaitResult {
                status: HashMap::new(),
                timed_out: true
            }
@@ -1694,11 +1688,11 @@ mod tests {
        else {
            panic!("expected function output");
        };
        let result: WaitResult =
        let result: wait::WaitResult =
            serde_json::from_str(&content).expect("wait result should be json");
        assert_eq!(
            result,
            WaitResult {
            wait::WaitResult {
                status: HashMap::from([(agent_id, AgentStatus::Shutdown)]),
                timed_out: false
            }

@@ -646,7 +646,7 @@ fn create_wait_tool() -> ToolSpec {

    ToolSpec::Function(ResponsesApiTool {
        name: "wait".to_string(),
        description: "Wait for agents to reach a final status. Completed statuses may include the agent's final message. Returns empty status when timed out."
        description: "Wait for agents to reach a final status. Completed statuses may include the agent's final message. Returns empty status when timed out. Once the agent reaches its final status, a notification message will be received containing the same completed status."
            .to_string(),
        strict: false,
        parameters: JsonSchema::Object {
@@ -50,7 +50,7 @@ async fn emits_deprecation_notice_for_legacy_feature_flag() -> anyhow::Result<()
    assert_eq!(
        details.as_deref(),
        Some(
            "Enable it with `--enable unified_exec` or `[features].unified_exec` in config.toml. See https://github.com/openai/codex/blob/main/docs/config.md#feature-flags for details."
            "Enable it with `--enable unified_exec` or `[features].unified_exec` in config.toml. See https://developers.openai.com/codex/config-basic#feature-flags for details."
        ),
    );

@@ -114,6 +114,7 @@ mod skills;
mod sqlite_state;
mod stream_error_allows_next_turn;
mod stream_no_completed;
mod subagent_notifications;
mod text_encoding_fix;
mod tool_harness;
mod tool_parallelism;

codex-rs/core/tests/suite/subagent_notifications.rs (new file, 196 lines)
@@ -0,0 +1,196 @@
use anyhow::Result;
use codex_core::features::Feature;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_response_once_match;
use core_test_support::responses::mount_sse_once_match;
use core_test_support::responses::sse;
use core_test_support::responses::sse_response;
use core_test_support::responses::start_mock_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use serde_json::json;
use std::time::Duration;
use tokio::time::Instant;
use tokio::time::sleep;
use wiremock::MockServer;

const SPAWN_CALL_ID: &str = "spawn-call-1";
const TURN_1_PROMPT: &str = "spawn a child and continue";
const TURN_2_NO_WAIT_PROMPT: &str = "follow up without wait";
const CHILD_PROMPT: &str = "child: do work";

fn body_contains(req: &wiremock::Request, text: &str) -> bool {
    let is_zstd = req
        .headers
        .get("content-encoding")
        .and_then(|value| value.to_str().ok())
        .is_some_and(|value| {
            value
                .split(',')
                .any(|entry| entry.trim().eq_ignore_ascii_case("zstd"))
        });
    let bytes = if is_zstd {
        zstd::stream::decode_all(std::io::Cursor::new(&req.body)).ok()
    } else {
        Some(req.body.clone())
    };
    bytes
        .and_then(|body| String::from_utf8(body).ok())
        .is_some_and(|body| body.contains(text))
}

fn has_subagent_notification(req: &ResponsesRequest) -> bool {
    req.message_input_texts("user")
        .iter()
        .any(|text| text.contains("<subagent_notification>"))
}

async fn wait_for_spawned_thread_id(test: &TestCodex) -> Result<String> {
    let deadline = Instant::now() + Duration::from_secs(2);
    loop {
        let ids = test.thread_manager.list_thread_ids().await;
        if let Some(spawned_id) = ids
            .iter()
            .find(|id| **id != test.session_configured.session_id)
        {
            return Ok(spawned_id.to_string());
        }
        if Instant::now() >= deadline {
            anyhow::bail!("timed out waiting for spawned thread id");
        }
        sleep(Duration::from_millis(10)).await;
    }
}

async fn wait_for_requests(
    mock: &core_test_support::responses::ResponseMock,
) -> Result<Vec<ResponsesRequest>> {
    let deadline = Instant::now() + Duration::from_secs(2);
    loop {
        let requests = mock.requests();
        if !requests.is_empty() {
            return Ok(requests);
        }
        if Instant::now() >= deadline {
            anyhow::bail!("expected at least 1 request, got {}", requests.len());
        }
        sleep(Duration::from_millis(10)).await;
    }
}

async fn setup_turn_one_with_spawned_child(
    server: &MockServer,
    child_response_delay: Option<Duration>,
) -> Result<(TestCodex, String)> {
    let spawn_args = serde_json::to_string(&json!({
        "message": CHILD_PROMPT,
    }))?;

    mount_sse_once_match(
        server,
        |req: &wiremock::Request| body_contains(req, TURN_1_PROMPT),
        sse(vec![
            ev_response_created("resp-turn1-1"),
            ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
            ev_completed("resp-turn1-1"),
        ]),
    )
    .await;

    let child_sse = sse(vec![
        ev_response_created("resp-child-1"),
        ev_assistant_message("msg-child-1", "child done"),
        ev_completed("resp-child-1"),
    ]);
    let child_request_log = if let Some(delay) = child_response_delay {
        mount_response_once_match(
            server,
            |req: &wiremock::Request| {
                body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
            },
            sse_response(child_sse).set_delay(delay),
        )
        .await
    } else {
        mount_sse_once_match(
            server,
            |req: &wiremock::Request| {
                body_contains(req, CHILD_PROMPT) && !body_contains(req, SPAWN_CALL_ID)
            },
            child_sse,
        )
        .await
    };

    let _turn1_followup = mount_sse_once_match(
        server,
        |req: &wiremock::Request| body_contains(req, SPAWN_CALL_ID),
        sse(vec![
            ev_response_created("resp-turn1-2"),
            ev_assistant_message("msg-turn1-2", "parent done"),
            ev_completed("resp-turn1-2"),
        ]),
    )
    .await;

    let mut builder = test_codex().with_config(|config| {
        config.features.enable(Feature::Collab);
    });
    let test = builder.build(server).await?;
    test.submit_turn(TURN_1_PROMPT).await?;
    if child_response_delay.is_none() {
        let _ = wait_for_requests(&child_request_log).await?;
        let rollout_path = test
            .codex
            .rollout_path()
            .ok_or_else(|| anyhow::anyhow!("expected parent rollout path"))?;
        let deadline = Instant::now() + Duration::from_secs(6);
        loop {
            let has_notification = tokio::fs::read_to_string(&rollout_path)
                .await
                .is_ok_and(|rollout| rollout.contains("<subagent_notification>"));
            if has_notification {
                break;
            }
            if Instant::now() >= deadline {
                anyhow::bail!(
                    "timed out waiting for parent rollout to include subagent notification"
                );
            }
            sleep(Duration::from_millis(10)).await;
        }
    }
    let spawned_id = wait_for_spawned_thread_id(&test).await?;

    Ok((test, spawned_id))
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn subagent_notification_is_included_without_wait() -> Result<()> {
    skip_if_no_network!(Ok(()));

    let server = start_mock_server().await;
    let (test, _spawned_id) = setup_turn_one_with_spawned_child(&server, None).await?;

    let turn2 = mount_sse_once_match(
        &server,
        |req: &wiremock::Request| body_contains(req, TURN_2_NO_WAIT_PROMPT),
        sse(vec![
            ev_response_created("resp-turn2-1"),
            ev_assistant_message("msg-turn2-1", "no wait path"),
            ev_completed("resp-turn2-1"),
        ]),
    )
    .await;
    test.submit_turn(TURN_2_NO_WAIT_PROMPT).await?;

    let turn2_requests = wait_for_requests(&turn2).await?;
    assert!(turn2_requests.iter().any(has_subagent_notification));

    Ok(())
}
@@ -135,11 +135,12 @@ fn create_bwrap_flags(
///
/// The mount order is important:
/// 1. `--ro-bind / /` makes the entire filesystem read-only.
/// 2. `--bind <root> <root>` re-enables writes for allowed roots.
/// 3. `--ro-bind <subpath> <subpath>` re-applies read-only protections under
/// 2. `--dev /dev` mounts a minimal writable `/dev` with standard device nodes
///    (including `/dev/urandom`) even under a read-only root.
/// 3. `--bind <root> <root>` re-enables writes for allowed roots, including
///    writable subpaths under `/dev` (for example, `/dev/shm`).
/// 4. `--ro-bind <subpath> <subpath>` re-applies read-only protections under
///    those writable roots so protected subpaths win.
/// 4. `--dev-bind /dev/null /dev/null` preserves the common sink even under a
///    read-only root.
fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<Vec<String>> {
    if !sandbox_policy.has_full_disk_read_access() {
        return Err(CodexErr::UnsupportedOperation(
@@ -151,12 +152,18 @@ fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<
    let writable_roots = sandbox_policy.get_writable_roots_with_cwd(cwd);
    ensure_mount_targets_exist(&writable_roots)?;

    let mut args = Vec::new();

    // Read-only root, then selectively re-enable writes.
    args.push("--ro-bind".to_string());
    args.push("/".to_string());
    args.push("/".to_string());
    // Read-only root, then mount a minimal device tree.
    // In bubblewrap (`bubblewrap.c`, `SETUP_MOUNT_DEV`), `--dev /dev` creates
    // the standard minimal nodes: null, zero, full, random, urandom, and tty.
    // `/dev` must be mounted before writable roots so explicit `/dev/*`
    // writable binds remain visible.
    let mut args = vec![
        "--ro-bind".to_string(),
        "/".to_string(),
        "/".to_string(),
        "--dev".to_string(),
        "/dev".to_string(),
    ];

    for writable_root in &writable_roots {
        let root = writable_root.root.as_path();
@@ -180,12 +187,15 @@ fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<
        }

        if !subpath.exists() {
            if let Some(first_missing) = find_first_non_existent_component(&subpath)
                && is_within_allowed_write_paths(&first_missing, &allowed_write_paths)
            // Keep this in the per-subpath loop: each protected subpath can have
            // a different first missing component that must be blocked
            // independently (for example, `/repo/.git` vs `/repo/.codex`).
            if let Some(first_missing_component) = find_first_non_existent_component(&subpath)
                && is_within_allowed_write_paths(&first_missing_component, &allowed_write_paths)
            {
                args.push("--ro-bind".to_string());
                args.push("/dev/null".to_string());
                args.push(path_to_string(&first_missing));
                args.push(path_to_string(&first_missing_component));
            }
            continue;
        }
@@ -197,11 +207,6 @@ fn create_filesystem_args(sandbox_policy: &SandboxPolicy, cwd: &Path) -> Result<
        }
    }

    // Ensure `/dev/null` remains usable regardless of the root bind.
    args.push("--dev-bind".to_string());
    args.push("/dev/null".to_string());
    args.push("/dev/null".to_string());

    Ok(args)
}

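To make the numbered mount rules concrete, the argument vector for a hypothetical policy with one writable root `/work` and a protected `/work/.git` would be ordered like this (illustrative only; the paths are assumptions, not from the patch):

```rust
// Order matters: later binds shadow earlier ones, so protections layer from
// broadest (read-only /) to most specific (read-only protected subpath).
fn example_args() -> Vec<String> {
    ["--ro-bind", "/", "/",                   // 1. read-only root
     "--dev", "/dev",                         // 2. minimal writable /dev with standard nodes
     "--bind", "/work", "/work",              // 3. writable root (hypothetical)
     "--ro-bind", "/work/.git", "/work/.git"] // 4. protected subpath re-locked
        .map(str::to_string)
        .to_vec()
}
```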
@@ -307,7 +307,9 @@ fn close_fd_or_panic(fd: libc::c_int, context: &str) {
fn is_proc_mount_failure(stderr: &str) -> bool {
    stderr.contains("Can't mount proc")
        && stderr.contains("/newroot/proc")
        && stderr.contains("Invalid argument")
        && (stderr.contains("Invalid argument")
            || stderr.contains("Operation not permitted")
            || stderr.contains("Permission denied"))
}

/// Build the inner command that applies seccomp after bubblewrap.
@@ -381,6 +383,18 @@ mod tests {
        assert_eq!(is_proc_mount_failure(stderr), true);
    }

    #[test]
    fn detects_proc_mount_operation_not_permitted_failure() {
        let stderr = "bwrap: Can't mount proc on /newroot/proc: Operation not permitted";
        assert_eq!(is_proc_mount_failure(stderr), true);
    }

    #[test]
    fn detects_proc_mount_permission_denied_failure() {
        let stderr = "bwrap: Can't mount proc on /newroot/proc: Permission denied";
        assert_eq!(is_proc_mount_failure(stderr), true);
    }

    #[test]
    fn ignores_non_proc_mount_errors() {
        let stderr = "bwrap: Can't bind mount /dev/null: Operation not permitted";
@@ -407,9 +421,8 @@ mod tests {
            "--ro-bind".to_string(),
            "/".to_string(),
            "/".to_string(),
            "--dev-bind".to_string(),
            "/dev/null".to_string(),
            "/dev/null".to_string(),
            "--dev".to_string(),
            "/dev".to_string(),
            "--unshare-pid".to_string(),
            "--proc".to_string(),
            "/proc".to_string(),

@@ -56,7 +56,7 @@ async fn run_cmd_output(
    writable_roots: &[PathBuf],
    timeout_ms: u64,
) -> codex_core::exec::ExecToolCallOutput {
    run_cmd_result_with_writable_roots(cmd, writable_roots, timeout_ms, false)
    run_cmd_result_with_writable_roots(cmd, writable_roots, timeout_ms, false, false)
        .await
        .expect("sandboxed command should execute")
}
@@ -67,6 +67,7 @@ async fn run_cmd_result_with_writable_roots(
    writable_roots: &[PathBuf],
    timeout_ms: u64,
    use_bwrap_sandbox: bool,
    network_access: bool,
) -> Result<codex_core::exec::ExecToolCallOutput> {
    let cwd = std::env::current_dir().expect("cwd should exist");
    let sandbox_cwd = cwd.clone();
@@ -89,7 +90,7 @@ async fn run_cmd_result_with_writable_roots(
            .map(|p| AbsolutePathBuf::try_from(p.as_path()).unwrap())
            .collect(),
        read_only_access: Default::default(),
        network_access: false,
        network_access,
        // Exclude tmp-related folders from writable roots because we need a
        // folder that is writable by tests but that we intentionally disallow
        // writing to in the sandbox.
@@ -112,6 +113,13 @@ async fn run_cmd_result_with_writable_roots(

fn is_bwrap_unavailable_output(output: &codex_core::exec::ExecToolCallOutput) -> bool {
    output.stderr.text.contains(BWRAP_UNAVAILABLE_ERR)
        || (output
            .stderr
            .text
            .contains("Can't mount proc on /newroot/proc")
            && (output.stderr.text.contains("Operation not permitted")
                || output.stderr.text.contains("Permission denied")
                || output.stderr.text.contains("Invalid argument")))
}

async fn should_skip_bwrap_tests() -> bool {
@@ -120,6 +128,7 @@ async fn should_skip_bwrap_tests() -> bool {
        &[],
        NETWORK_TIMEOUT_MS,
        true,
        true,
    )
    .await
    {
@@ -168,14 +177,90 @@ async fn test_root_write() {

#[tokio::test]
async fn test_dev_null_write() {
    run_cmd(
    if should_skip_bwrap_tests().await {
        eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
        return;
    }

    let output = run_cmd_result_with_writable_roots(
        &["bash", "-lc", "echo blah > /dev/null"],
        &[],
        // We have seen timeouts when running this test in CI on GitHub,
        // so we are using a generous timeout until we can diagnose further.
        LONG_TIMEOUT_MS,
        true,
        true,
    )
    .await;
    .await
    .expect("sandboxed command should execute");

    assert_eq!(output.exit_code, 0);
}

#[tokio::test]
async fn bwrap_populates_minimal_dev_nodes() {
    if should_skip_bwrap_tests().await {
        eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
        return;
    }

    let output = run_cmd_result_with_writable_roots(
        &[
            "bash",
            "-lc",
            "for node in null zero full random urandom tty; do [ -c \"/dev/$node\" ] || { echo \"missing /dev/$node\" >&2; exit 1; }; done",
        ],
        &[],
        LONG_TIMEOUT_MS,
        true,
        true,
    )
    .await
    .expect("sandboxed command should execute");

    assert_eq!(output.exit_code, 0);
}

#[tokio::test]
async fn bwrap_preserves_writable_dev_shm_bind_mount() {
    if should_skip_bwrap_tests().await {
        eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
        return;
    }
    if !std::path::Path::new("/dev/shm").exists() {
        eprintln!("skipping bwrap test: /dev/shm is unavailable in this environment");
        return;
    }

    let target_file = match NamedTempFile::new_in("/dev/shm") {
        Ok(file) => file,
        Err(err) => {
            eprintln!("skipping bwrap test: failed to create /dev/shm temp file: {err}");
            return;
        }
    };
    let target_path = target_file.path().to_path_buf();
    std::fs::write(&target_path, "host-before").expect("seed /dev/shm file");

    let output = run_cmd_result_with_writable_roots(
        &[
            "bash",
            "-lc",
            &format!("printf sandbox-after > {}", target_path.to_string_lossy()),
        ],
        &[PathBuf::from("/dev/shm")],
        LONG_TIMEOUT_MS,
        true,
        true,
    )
    .await
    .expect("sandboxed command should execute");

    assert_eq!(output.exit_code, 0);
    assert_eq!(
        std::fs::read_to_string(&target_path).expect("read /dev/shm file"),
        "sandbox-after"
    );
}

#[tokio::test]
@@ -306,7 +391,7 @@ async fn sandbox_blocks_nc() {
#[tokio::test]
async fn sandbox_blocks_git_and_codex_writes_inside_writable_root() {
    if should_skip_bwrap_tests().await {
        eprintln!("skipping bwrap test: vendored bwrap was not built in this environment");
        eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
        return;
    }

@@ -329,6 +414,7 @@ async fn sandbox_blocks_git_and_codex_writes_inside_writable_root() {
        &[tmpdir.path().to_path_buf()],
        LONG_TIMEOUT_MS,
        true,
        true,
    )
    .await,
    ".git write should be denied under bubblewrap",
@@ -344,6 +430,7 @@ async fn sandbox_blocks_git_and_codex_writes_inside_writable_root() {
        &[tmpdir.path().to_path_buf()],
        LONG_TIMEOUT_MS,
        true,
        true,
    )
    .await,
    ".codex write should be denied under bubblewrap",
@@ -355,7 +442,7 @@ async fn sandbox_blocks_git_and_codex_writes_inside_writable_root() {
#[tokio::test]
async fn sandbox_blocks_codex_symlink_replacement_attack() {
    if should_skip_bwrap_tests().await {
        eprintln!("skipping bwrap test: vendored bwrap was not built in this environment");
        eprintln!("skipping bwrap test: bwrap sandbox prerequisites are unavailable");
        return;
    }

@@ -380,6 +467,7 @@ async fn sandbox_blocks_codex_symlink_replacement_attack() {
        &[tmpdir.path().to_path_buf()],
        LONG_TIMEOUT_MS,
        true,
        true,
    )
    .await,
    ".codex symlink replacement should be denied",

@@ -0,0 +1,4 @@
CREATE INDEX idx_logs_thread_id_ts ON logs(thread_id, ts DESC, ts_nanos DESC, id DESC);

CREATE INDEX idx_logs_process_uuid_threadless_ts ON logs(process_uuid, ts DESC, ts_nanos DESC, id DESC)
WHERE thread_id IS NULL;
codex-rs/state/migrations/0012_logs_estimated_bytes.sql (new file, 9 lines)
@@ -0,0 +1,9 @@
ALTER TABLE logs ADD COLUMN estimated_bytes INTEGER NOT NULL DEFAULT 0;

UPDATE logs
SET estimated_bytes =
    LENGTH(CAST(COALESCE(message, '') AS BLOB))
    + LENGTH(CAST(level AS BLOB))
    + LENGTH(CAST(target AS BLOB))
    + LENGTH(CAST(COALESCE(module_path, '') AS BLOB))
    + LENGTH(CAST(COALESCE(file, '') AS BLOB));
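The Rust-side estimate used at insert time (later in this diff) is meant to agree with this SQL backfill: `String::len` counts UTF-8 bytes, which matches `LENGTH(CAST(... AS BLOB))` in SQLite, and `NULL` columns count as zero. As a standalone sketch:

```rust
// Byte estimate for one log row, mirroring the SQL above. Only the five text
// columns are counted; NULLs contribute zero via COALESCE / map_or.
fn estimated_bytes(
    message: Option<&str>,
    level: &str,
    target: &str,
    module_path: Option<&str>,
    file: Option<&str>,
) -> i64 {
    (message.map_or(0, str::len)
        + level.len()
        + target.len()
        + module_path.map_or(0, str::len)
        + file.map_or(0, str::len)) as i64
}
```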
@@ -27,11 +27,13 @@ use sqlx::ConnectOptions;
use sqlx::QueryBuilder;
use sqlx::Row;
use sqlx::Sqlite;
use sqlx::SqliteConnection;
use sqlx::SqlitePool;
use sqlx::sqlite::SqliteConnectOptions;
use sqlx::sqlite::SqliteJournalMode;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::sqlite::SqliteSynchronous;
use std::collections::BTreeSet;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
@@ -42,6 +44,12 @@ use uuid::Uuid;
mod memories;
// Memory-specific CRUD and phase job lifecycle methods live in `runtime/memories.rs`.

// "Partition" is the retention bucket we cap at 10 MiB:
// - one bucket per non-null thread_id
// - one bucket per threadless (thread_id IS NULL) non-null process_uuid
// - one bucket for threadless rows with process_uuid IS NULL
const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;

#[derive(Clone)]
pub struct StateRuntime {
    codex_home: PathBuf,
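The three bucket kinds described in that comment, written out as a key type for clarity (a sketch; the patch keys partitions directly in SQL rather than with such a type):

```rust
// One 10 MiB budget per value of this key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum LogPartition {
    Thread(String),            // thread_id IS NOT NULL: keyed by thread
    ThreadlessProcess(String), // thread_id IS NULL, process_uuid IS NOT NULL
    ThreadlessNoProcess,       // thread_id IS NULL AND process_uuid IS NULL
}
```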
@@ -359,10 +367,16 @@ FROM threads
            return Ok(());
        }

        let mut tx = self.pool.begin().await?;
        let mut builder = QueryBuilder::<Sqlite>::new(
            "INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line) ",
            "INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line, estimated_bytes) ",
        );
        builder.push_values(entries, |mut row, entry| {
            let estimated_bytes = entry.message.as_ref().map_or(0, String::len) as i64
                + entry.level.len() as i64
                + entry.target.len() as i64
                + entry.module_path.as_ref().map_or(0, String::len) as i64
                + entry.file.as_ref().map_or(0, String::len) as i64;
            row.push_bind(entry.ts)
                .push_bind(entry.ts_nanos)
                .push_bind(&entry.level)
@@ -372,9 +386,228 @@ FROM threads
                .push_bind(&entry.process_uuid)
                .push_bind(&entry.module_path)
                .push_bind(&entry.file)
                .push_bind(entry.line);
                .push_bind(entry.line)
                .push_bind(estimated_bytes);
        });
        builder.build().execute(self.pool.as_ref()).await?;
        builder.build().execute(&mut *tx).await?;
        self.prune_logs_after_insert(entries, &mut tx).await?;
        tx.commit().await?;
        Ok(())
    }

/// Enforce per-partition log size caps after a successful batch insert.
|
||||
///
|
||||
/// We maintain two independent budgets:
|
||||
/// - Thread logs: rows with `thread_id IS NOT NULL`, capped per `thread_id`.
|
||||
/// - Threadless process logs: rows with `thread_id IS NULL` ("threadless"),
|
||||
/// capped per `process_uuid` (including `process_uuid IS NULL` as its own
|
||||
/// threadless partition).
|
||||
///
|
||||
/// "Threadless" means the log row is not associated with any conversation
|
||||
/// thread, so retention is keyed by process identity instead.
|
||||
///
|
||||
/// This runs inside the same transaction as the insert so callers never
|
||||
/// observe "inserted but not yet pruned" rows.
|
||||
async fn prune_logs_after_insert(
|
||||
&self,
|
||||
entries: &[LogEntry],
|
||||
tx: &mut SqliteConnection,
|
||||
) -> anyhow::Result<()> {
|
||||
let thread_ids: BTreeSet<&str> = entries
|
||||
.iter()
|
||||
.filter_map(|entry| entry.thread_id.as_deref())
|
||||
.collect();
|
||||
if !thread_ids.is_empty() {
|
||||
// Cheap precheck: only run the heavier window-function prune for
|
||||
// threads that are currently above the cap.
|
||||
let mut over_limit_threads_query =
|
||||
QueryBuilder::<Sqlite>::new("SELECT thread_id FROM logs WHERE thread_id IN (");
|
||||
{
|
||||
let mut separated = over_limit_threads_query.separated(", ");
|
||||
for thread_id in &thread_ids {
|
||||
separated.push_bind(*thread_id);
|
||||
}
|
||||
}
|
||||
over_limit_threads_query.push(") GROUP BY thread_id HAVING SUM(");
|
||||
over_limit_threads_query.push("estimated_bytes");
|
||||
over_limit_threads_query.push(") > ");
|
||||
over_limit_threads_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
|
||||
let over_limit_thread_ids: Vec<String> = over_limit_threads_query
|
||||
.build()
|
||||
.fetch_all(&mut *tx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| row.try_get("thread_id"))
|
||||
.collect::<Result<_, _>>()?;
|
||||
if !over_limit_thread_ids.is_empty() {
|
||||
// Enforce a strict per-thread cap by deleting every row whose
|
||||
// newest-first cumulative bytes exceed the partition budget.
|
||||
let mut prune_threads = QueryBuilder::<Sqlite>::new(
|
||||
r#"
|
||||
DELETE FROM logs
|
||||
WHERE id IN (
|
||||
SELECT id
|
||||
FROM (
|
||||
SELECT
|
||||
id,
|
||||
SUM(
|
||||
"#,
|
||||
);
|
||||
prune_threads.push("estimated_bytes");
|
||||
prune_threads.push(
|
||||
r#"
|
||||
) OVER (
|
||||
PARTITION BY thread_id
|
||||
ORDER BY ts DESC, ts_nanos DESC, id DESC
|
||||
) AS cumulative_bytes
|
||||
FROM logs
|
||||
WHERE thread_id IN (
|
||||
"#,
|
||||
);
|
||||
{
|
||||
let mut separated = prune_threads.separated(", ");
|
||||
for thread_id in &over_limit_thread_ids {
|
||||
separated.push_bind(thread_id);
|
||||
}
|
||||
}
|
||||
prune_threads.push(
|
||||
r#"
|
||||
)
|
||||
)
|
||||
WHERE cumulative_bytes >
|
||||
"#,
|
||||
);
|
||||
prune_threads.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
|
||||
prune_threads.push("\n)");
|
||||
prune_threads.build().execute(&mut *tx).await?;
|
||||
}
|
||||
}
|
||||
|
||||
let threadless_process_uuids: BTreeSet<&str> = entries
|
||||
.iter()
|
||||
.filter(|entry| entry.thread_id.is_none())
|
||||
.filter_map(|entry| entry.process_uuid.as_deref())
|
||||
.collect();
|
||||
let has_threadless_null_process_uuid = entries
|
||||
.iter()
|
||||
.any(|entry| entry.thread_id.is_none() && entry.process_uuid.is_none());
|
||||
if !threadless_process_uuids.is_empty() {
|
||||
// Threadless logs are budgeted separately per process UUID.
|
||||
let mut over_limit_processes_query = QueryBuilder::<Sqlite>::new(
|
||||
"SELECT process_uuid FROM logs WHERE thread_id IS NULL AND process_uuid IN (",
|
||||
);
|
||||
{
|
||||
let mut separated = over_limit_processes_query.separated(", ");
|
||||
for process_uuid in &threadless_process_uuids {
|
||||
separated.push_bind(*process_uuid);
|
||||
}
|
||||
}
|
||||
over_limit_processes_query.push(") GROUP BY process_uuid HAVING SUM(");
|
||||
over_limit_processes_query.push("estimated_bytes");
|
||||
over_limit_processes_query.push(") > ");
|
||||
over_limit_processes_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
|
||||
let over_limit_process_uuids: Vec<String> = over_limit_processes_query
|
||||
.build()
|
||||
.fetch_all(&mut *tx)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|row| row.try_get("process_uuid"))
|
||||
.collect::<Result<_, _>>()?;
|
||||
if !over_limit_process_uuids.is_empty() {
|
||||
// Same strict cap policy as thread pruning, but only for
|
||||
// threadless rows in the affected process UUIDs.
|
||||
let mut prune_threadless_process_logs = QueryBuilder::<Sqlite>::new(
|
||||
r#"
|
||||
DELETE FROM logs
|
||||
WHERE id IN (
|
||||
SELECT id
|
||||
FROM (
|
||||
SELECT
|
||||
id,
|
||||
SUM(
|
||||
"#,
|
||||
);
|
||||
prune_threadless_process_logs.push("estimated_bytes");
|
||||
prune_threadless_process_logs.push(
|
||||
r#"
|
||||
) OVER (
|
||||
PARTITION BY process_uuid
|
||||
ORDER BY ts DESC, ts_nanos DESC, id DESC
|
||||
) AS cumulative_bytes
|
||||
FROM logs
|
||||
WHERE thread_id IS NULL
|
||||
AND process_uuid IN (
|
||||
"#,
|
||||
);
|
||||
{
|
||||
let mut separated = prune_threadless_process_logs.separated(", ");
|
||||
for process_uuid in &over_limit_process_uuids {
|
||||
separated.push_bind(process_uuid);
|
||||
}
|
||||
}
|
||||
prune_threadless_process_logs.push(
|
||||
r#"
|
||||
)
|
||||
)
|
||||
WHERE cumulative_bytes >
|
||||
"#,
|
||||
);
|
||||
prune_threadless_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
|
||||
prune_threadless_process_logs.push("\n)");
|
||||
prune_threadless_process_logs
|
||||
.build()
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
    if has_threadless_null_process_uuid {
        // Rows without a process UUID still need a cap; treat NULL as its
        // own threadless partition.
        let mut null_process_usage_query = QueryBuilder::<Sqlite>::new("SELECT SUM(");
        null_process_usage_query.push("estimated_bytes");
        null_process_usage_query.push(
            ") AS total_bytes FROM logs WHERE thread_id IS NULL AND process_uuid IS NULL",
        );
        let total_null_process_bytes: Option<i64> = null_process_usage_query
            .build()
            .fetch_one(&mut *tx)
            .await?
            .try_get("total_bytes")?;

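        // SUM over zero rows yields SQL NULL rather than 0, hence the
        // Option<i64> above and the unwrap_or(0) below.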
        if total_null_process_bytes.unwrap_or(0) > LOG_PARTITION_SIZE_LIMIT_BYTES {
            let mut prune_threadless_null_process_logs = QueryBuilder::<Sqlite>::new(
                r#"
DELETE FROM logs
WHERE id IN (
    SELECT id
    FROM (
        SELECT
            id,
            SUM(
"#,
            );
            prune_threadless_null_process_logs.push("estimated_bytes");
            prune_threadless_null_process_logs.push(
                r#"
            ) OVER (
                PARTITION BY process_uuid
                ORDER BY ts DESC, ts_nanos DESC, id DESC
            ) AS cumulative_bytes
        FROM logs
        WHERE thread_id IS NULL
            AND process_uuid IS NULL
    )
    WHERE cumulative_bytes >
"#,
            );
            prune_threadless_null_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
            prune_threadless_null_process_logs.push("\n)");
            prune_threadless_null_process_logs
                .build()
                .execute(&mut *tx)
                .await?;
        }
    }
    Ok(())
}

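// In summary, three partitions are budgeted independently against the same
// LOG_PARTITION_SIZE_LIMIT_BYTES cap: rows per thread, threadless rows per
// process UUID, and threadless rows with a NULL process UUID. Within each
// partition the newest rows (by ts, ts_nanos, id) are kept until the running
// total crosses the cap; everything older is deleted.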
@@ -2553,6 +2786,300 @@ VALUES (?, ?, ?, ?, ?)
        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

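    // The exact LOG_PARTITION_SIZE_LIMIT_BYTES value is not shown in this
    // hunk; the fixtures below are consistent with a cap between 6 MiB and
    // 11 MiB (for example 10 MiB): two 6 MiB rows in one thread total
    // 12 MiB, so the window keeps only the newest row (cumulative 6 MiB)
    // and prunes the older one.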
    #[tokio::test]
    async fn insert_logs_prunes_old_rows_when_thread_exceeds_size_limit() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
            .await
            .expect("initialize runtime");

        let six_mebibytes = "a".repeat(6 * 1024 * 1024);
        runtime
            .insert_logs(&[
                LogEntry {
                    ts: 1,
                    ts_nanos: 0,
                    level: "INFO".to_string(),
                    target: "cli".to_string(),
                    message: Some(six_mebibytes.clone()),
                    thread_id: Some("thread-1".to_string()),
                    process_uuid: Some("proc-1".to_string()),
                    file: Some("main.rs".to_string()),
                    line: Some(1),
                    module_path: Some("mod".to_string()),
                },
                LogEntry {
                    ts: 2,
                    ts_nanos: 0,
                    level: "INFO".to_string(),
                    target: "cli".to_string(),
                    message: Some(six_mebibytes.clone()),
                    thread_id: Some("thread-1".to_string()),
                    process_uuid: Some("proc-1".to_string()),
                    file: Some("main.rs".to_string()),
                    line: Some(2),
                    module_path: Some("mod".to_string()),
                },
            ])
            .await
            .expect("insert test logs");

        let rows = runtime
            .query_logs(&LogQuery {
                thread_ids: vec!["thread-1".to_string()],
                ..Default::default()
            })
            .await
            .expect("query thread logs");

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].ts, 2);

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

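    // A single row larger than the cap exceeds the budget on its own, so the
    // strict policy deletes it outright; pruning never guarantees that at
    // least one row survives.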
    #[tokio::test]
    async fn insert_logs_prunes_single_thread_row_when_it_exceeds_size_limit() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
            .await
            .expect("initialize runtime");

        let eleven_mebibytes = "d".repeat(11 * 1024 * 1024);
        runtime
            .insert_logs(&[LogEntry {
                ts: 1,
                ts_nanos: 0,
                level: "INFO".to_string(),
                target: "cli".to_string(),
                message: Some(eleven_mebibytes),
                thread_id: Some("thread-oversized".to_string()),
                process_uuid: Some("proc-1".to_string()),
                file: Some("main.rs".to_string()),
                line: Some(1),
                module_path: Some("mod".to_string()),
            }])
            .await
            .expect("insert test log");

        let rows = runtime
            .query_logs(&LogQuery {
                thread_ids: vec!["thread-oversized".to_string()],
                ..Default::default()
            })
            .await
            .expect("query thread logs");

        assert!(rows.is_empty());

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

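    // Threadless rows are budgeted per process UUID, independently of rows
    // that belong to a thread in the same process: the 6 MiB thread-1 row is
    // charged to the thread budget and survives, while the two threadless
    // rows exceed their own budget and the older one is pruned.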
    #[tokio::test]
    async fn insert_logs_prunes_threadless_rows_per_process_uuid_only() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
            .await
            .expect("initialize runtime");

        let six_mebibytes = "b".repeat(6 * 1024 * 1024);
        runtime
            .insert_logs(&[
                LogEntry {
                    ts: 1,
                    ts_nanos: 0,
                    level: "INFO".to_string(),
                    target: "cli".to_string(),
                    message: Some(six_mebibytes.clone()),
                    thread_id: None,
                    process_uuid: Some("proc-1".to_string()),
                    file: Some("main.rs".to_string()),
                    line: Some(1),
                    module_path: Some("mod".to_string()),
                },
                LogEntry {
                    ts: 2,
                    ts_nanos: 0,
                    level: "INFO".to_string(),
                    target: "cli".to_string(),
                    message: Some(six_mebibytes.clone()),
                    thread_id: None,
                    process_uuid: Some("proc-1".to_string()),
                    file: Some("main.rs".to_string()),
                    line: Some(2),
                    module_path: Some("mod".to_string()),
                },
                LogEntry {
                    ts: 3,
                    ts_nanos: 0,
                    level: "INFO".to_string(),
                    target: "cli".to_string(),
                    message: Some(six_mebibytes),
                    thread_id: Some("thread-1".to_string()),
                    process_uuid: Some("proc-1".to_string()),
                    file: Some("main.rs".to_string()),
                    line: Some(3),
                    module_path: Some("mod".to_string()),
                },
            ])
            .await
            .expect("insert test logs");

        let rows = runtime
            .query_logs(&LogQuery {
                thread_ids: vec!["thread-1".to_string()],
                include_threadless: true,
                ..Default::default()
            })
            .await
            .expect("query thread and threadless logs");

        let mut timestamps: Vec<i64> = rows.into_iter().map(|row| row.ts).collect();
        timestamps.sort_unstable();
        assert_eq!(timestamps, vec![2, 3]);

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

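    // The strict single-row case again, but for the threadless per-process
    // budget: one 11 MiB threadless row exceeds the cap by itself.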
    #[tokio::test]
    async fn insert_logs_prunes_single_threadless_process_row_when_it_exceeds_size_limit() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
            .await
            .expect("initialize runtime");

        let eleven_mebibytes = "e".repeat(11 * 1024 * 1024);
        runtime
            .insert_logs(&[LogEntry {
                ts: 1,
                ts_nanos: 0,
                level: "INFO".to_string(),
                target: "cli".to_string(),
                message: Some(eleven_mebibytes),
                thread_id: None,
                process_uuid: Some("proc-oversized".to_string()),
                file: Some("main.rs".to_string()),
                line: Some(1),
                module_path: Some("mod".to_string()),
            }])
            .await
            .expect("insert test log");

        let rows = runtime
            .query_logs(&LogQuery {
                include_threadless: true,
                ..Default::default()
            })
            .await
            .expect("query threadless logs");

        assert!(rows.is_empty());

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

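    // Threadless rows with a NULL process UUID form their own partition: the
    // two NULL-UUID rows exceed the cap and the older one is pruned, while
    // the small proc-1 threadless row is charged to a separate budget and
    // survives.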
    #[tokio::test]
    async fn insert_logs_prunes_threadless_rows_with_null_process_uuid() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
            .await
            .expect("initialize runtime");

        let six_mebibytes = "c".repeat(6 * 1024 * 1024);
        runtime
            .insert_logs(&[
                LogEntry {
                    ts: 1,
                    ts_nanos: 0,
                    level: "INFO".to_string(),
                    target: "cli".to_string(),
                    message: Some(six_mebibytes.clone()),
                    thread_id: None,
                    process_uuid: None,
                    file: Some("main.rs".to_string()),
                    line: Some(1),
                    module_path: Some("mod".to_string()),
                },
                LogEntry {
                    ts: 2,
                    ts_nanos: 0,
                    level: "INFO".to_string(),
                    target: "cli".to_string(),
                    message: Some(six_mebibytes),
                    thread_id: None,
                    process_uuid: None,
                    file: Some("main.rs".to_string()),
                    line: Some(2),
                    module_path: Some("mod".to_string()),
                },
                LogEntry {
                    ts: 3,
                    ts_nanos: 0,
                    level: "INFO".to_string(),
                    target: "cli".to_string(),
                    message: Some("small".to_string()),
                    thread_id: None,
                    process_uuid: Some("proc-1".to_string()),
                    file: Some("main.rs".to_string()),
                    line: Some(3),
                    module_path: Some("mod".to_string()),
                },
            ])
            .await
            .expect("insert test logs");

        let rows = runtime
            .query_logs(&LogQuery {
                include_threadless: true,
                ..Default::default()
            })
            .await
            .expect("query threadless logs");

        let mut timestamps: Vec<i64> = rows.into_iter().map(|row| row.ts).collect();
        timestamps.sort_unstable();
        assert_eq!(timestamps, vec![2, 3]);

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

    #[tokio::test]
    async fn insert_logs_prunes_single_threadless_null_process_row_when_it_exceeds_limit() {
        let codex_home = unique_temp_dir();
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
            .await
            .expect("initialize runtime");

        let eleven_mebibytes = "f".repeat(11 * 1024 * 1024);
        runtime
            .insert_logs(&[LogEntry {
                ts: 1,
                ts_nanos: 0,
                level: "INFO".to_string(),
                target: "cli".to_string(),
                message: Some(eleven_mebibytes),
                thread_id: None,
                process_uuid: None,
                file: Some("main.rs".to_string()),
                line: Some(1),
                module_path: Some("mod".to_string()),
            }])
            .await
            .expect("insert test log");

        let rows = runtime
            .query_logs(&LogQuery {
                include_threadless: true,
                ..Default::default()
            })
            .await
            .expect("query threadless logs");

        assert!(rows.is_empty());

        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

    fn test_thread_metadata(
        codex_home: &Path,
        thread_id: ThreadId,