Compare commits

...

25 Commits

Author SHA1 Message Date
Ahmed Ibrahim e6016f5489 client 2025-12-02 16:16:38 -08:00
Ahmed Ibrahim 2721498ec9 client 2025-12-02 14:54:33 -08:00
Ahmed Ibrahim 47ef2cd9ca client 2025-12-02 14:45:44 -08:00
Ahmed Ibrahim db70faab42 copy 2025-11-25 16:01:19 -08:00
Ahmed Ibrahim 1e7796570a copy 2025-11-25 15:56:48 -08:00
Ahmed Ibrahim 11b13914a8 copy 2025-11-25 15:54:53 -08:00
Ahmed Ibrahim 567844fe05 codex-status 2025-11-25 15:47:39 -08:00
Ahmed Ibrahim 6eb0474455 Merge branch 'codex-status' of https://github.com/openai/codex into codex-status 2025-11-25 15:42:32 -08:00
Ahmed Ibrahim af1254fc4e codex-status 2025-11-25 15:39:03 -08:00
Ahmed Ibrahim 764aff6753 codex-status 2025-11-25 15:38:54 -08:00
Ahmed Ibrahim e2a55921ec Merge branch 'main' into codex-status 2025-11-25 15:36:46 -08:00
Ahmed Ibrahim 08b6d9ef1f codex-status 2025-11-25 15:35:43 -08:00
Ahmed Ibrahim bbf536847c codex-status 2025-11-25 15:34:32 -08:00
Ahmed Ibrahim 0d0779d08a codex-status 2025-11-25 15:32:46 -08:00
Ahmed Ibrahim 087e571198 tests 2025-11-25 15:17:14 -08:00
jif-oai 28ff364c3a feat: update process ID for event handling (#7261) 2025-11-25 14:21:05 -08:00
Ahmed Ibrahim 70b613be81 warning 2025-11-25 14:18:41 -08:00
Ahmed Ibrahim 53ff941cf3 warning 2025-11-25 14:18:25 -08:00
Ahmed Ibrahim 6b66534356 status 2025-11-25 12:49:59 -08:00
Ahmed Ibrahim 1938116a5d status 2025-11-25 12:49:04 -08:00
Ahmed Ibrahim 075d50677d use client 2025-11-25 12:44:43 -08:00
Ahmed Ibrahim 5c40534e98 use client 2025-11-25 12:42:25 -08:00
Ahmed Ibrahim f05492fc94 crate 2025-11-25 12:35:40 -08:00
Ahmed Ibrahim 44ff9fcb69 review 2025-11-25 12:35:40 -08:00
Ahmed Ibrahim b4a1a500ec status 2025-11-25 12:35:40 -08:00
24 changed files with 984 additions and 172 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -1144,6 +1144,8 @@ dependencies = [
"codex-apply-patch",
"codex-arg0",
"codex-async-utils",
"codex-client",
"codex-core",
"codex-execpolicy",
"codex-file-search",
"codex-git",

View File

@@ -1020,6 +1020,8 @@ pub enum ThreadItem {
command: String,
/// The command's working directory.
cwd: PathBuf,
/// Identifier for the underlying PTY process (when available).
process_id: Option<String>,
status: CommandExecutionStatus,
/// A best-effort parsing of the command to understand the action(s) it will perform.
/// This returns a list of CommandAction objects because a single shell command may

View File

@@ -449,11 +449,13 @@ pub(crate) async fn apply_bespoke_event_handling(
.collect::<Vec<_>>();
let command = shlex_join(&exec_command_begin_event.command);
let cwd = exec_command_begin_event.cwd;
let process_id = exec_command_begin_event.process_id;
let item = ThreadItem::CommandExecution {
id: item_id,
command,
cwd,
process_id,
status: CommandExecutionStatus::InProgress,
command_actions,
aggregated_output: None,
@@ -486,6 +488,7 @@ pub(crate) async fn apply_bespoke_event_handling(
command,
cwd,
parsed_cmd,
process_id,
aggregated_output,
exit_code,
duration,
@@ -514,6 +517,7 @@ pub(crate) async fn apply_bespoke_event_handling(
id: call_id,
command: shlex_join(&command),
cwd,
process_id,
status,
command_actions,
aggregated_output,
@@ -649,6 +653,7 @@ async fn complete_command_execution_item(
item_id: String,
command: String,
cwd: PathBuf,
process_id: Option<String>,
command_actions: Vec<V2ParsedCommand>,
status: CommandExecutionStatus,
outgoing: &OutgoingMessageSender,
@@ -657,6 +662,7 @@ async fn complete_command_execution_item(
id: item_id,
command,
cwd,
process_id,
status,
command_actions,
aggregated_output: None,
@@ -1015,6 +1021,7 @@ async fn on_command_execution_request_approval_response(
item_id.clone(),
command.clone(),
cwd.clone(),
None,
command_actions.clone(),
status,
outgoing.as_ref(),

View File

@@ -15,6 +15,7 @@ pub use mcp_process::McpProcess;
pub use mock_model_server::create_mock_chat_completions_server;
pub use mock_model_server::create_mock_chat_completions_server_unchecked;
pub use responses::create_apply_patch_sse_response;
pub use responses::create_exec_command_sse_response;
pub use responses::create_final_assistant_message_sse_response;
pub use responses::create_shell_command_sse_response;
pub use rollout::create_fake_rollout;

View File

@@ -94,3 +94,42 @@ pub fn create_apply_patch_sse_response(
);
Ok(sse)
}
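/// Builds a chat-completions SSE body containing a single `exec_command` tool
/// call that runs `echo hi` through the platform shell (`cmd.exe /d /c` on
/// Windows, `/bin/sh -c` elsewhere), letting tests drive the unified exec path
/// without a real model.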
pub fn create_exec_command_sse_response(call_id: &str) -> anyhow::Result<String> {
let (cmd, args) = if cfg!(windows) {
("cmd.exe", vec!["/d", "/c", "echo hi"])
} else {
("/bin/sh", vec!["-c", "echo hi"])
};
let command = std::iter::once(cmd.to_string())
.chain(args.into_iter().map(str::to_string))
.collect::<Vec<_>>();
let tool_call_arguments = serde_json::to_string(&json!({
"cmd": command.join(" "),
"yield_time_ms": 500
}))?;
let tool_call = json!({
"choices": [
{
"delta": {
"tool_calls": [
{
"id": call_id,
"function": {
"name": "exec_command",
"arguments": tool_call_arguments
}
}
]
},
"finish_reason": "tool_calls"
}
]
});
let sse = format!(
"data: {}\n\ndata: DONE\n\n",
serde_json::to_string(&tool_call)?
);
Ok(sse)
}

View File

@@ -1,6 +1,7 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_apply_patch_sse_response;
use app_test_support::create_exec_command_sse_response;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::create_mock_chat_completions_server_unchecked;
@@ -907,6 +908,134 @@ async fn turn_start_file_change_approval_decline_v2() -> Result<()> {
Ok(())
}
#[tokio::test]
#[cfg_attr(windows, ignore = "process id reporting differs on Windows")]
async fn command_execution_notifications_include_process_id() -> Result<()> {
skip_if_no_network!(Ok(()));
let responses = vec![
create_exec_command_sse_response("uexec-1")?,
create_final_assistant_message_sse_response("done")?,
];
let server = create_mock_chat_completions_server(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri(), "never")?;
let config_toml = codex_home.path().join("config.toml");
let mut config_contents = std::fs::read_to_string(&config_toml)?;
config_contents.push_str(
r#"
[features]
unified_exec = true
"#,
);
std::fs::write(&config_toml, config_contents)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(start_resp)?;
let turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "run a command".to_string(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_id)),
)
.await??;
let TurnStartResponse { turn: _turn } = to_response::<TurnStartResponse>(turn_resp)?;
let started_command = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let notif = mcp
.read_stream_until_notification_message("item/started")
.await?;
let started: ItemStartedNotification = serde_json::from_value(
notif
.params
.clone()
.expect("item/started should include params"),
)?;
if let ThreadItem::CommandExecution { .. } = started.item {
return Ok::<ThreadItem, anyhow::Error>(started.item);
}
}
})
.await??;
let ThreadItem::CommandExecution {
id,
process_id: started_process_id,
status,
..
} = started_command
else {
unreachable!("loop ensures we break on command execution items");
};
assert_eq!(id, "uexec-1");
assert_eq!(status, CommandExecutionStatus::InProgress);
let started_process_id = started_process_id.expect("process id should be present");
let completed_command = timeout(DEFAULT_READ_TIMEOUT, async {
loop {
let notif = mcp
.read_stream_until_notification_message("item/completed")
.await?;
let completed: ItemCompletedNotification = serde_json::from_value(
notif
.params
.clone()
.expect("item/completed should include params"),
)?;
if let ThreadItem::CommandExecution { .. } = completed.item {
return Ok::<ThreadItem, anyhow::Error>(completed.item);
}
}
})
.await??;
let ThreadItem::CommandExecution {
id: completed_id,
process_id: completed_process_id,
status: completed_status,
exit_code,
..
} = completed_command
else {
unreachable!("loop ensures we break on command execution items");
};
assert_eq!(completed_id, "uexec-1");
assert_eq!(completed_status, CommandExecutionStatus::Completed);
assert_eq!(exit_code, Some(0));
assert_eq!(
completed_process_id.as_deref(),
Some(started_process_id.as_str())
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(
codex_home: &Path,

View File

@@ -1,8 +1,8 @@
[package]
name = "codex-core"
version.workspace = true
edition.workspace = true
license.workspace = true
name = "codex-core"
version.workspace = true
[lib]
doctest = false
@@ -18,12 +18,13 @@ askama = { workspace = true }
async-channel = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
chardetng = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
codex-api = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-apply-patch = { workspace = true }
codex-async-utils = { workspace = true }
codex-api = { workspace = true }
codex-client = { workspace = true }
codex-execpolicy = { workspace = true }
codex-file-search = { workspace = true }
codex-git = { workspace = true }
@@ -37,8 +38,8 @@ codex-utils-string = { workspace = true }
codex-windows-sandbox = { package = "codex-windows-sandbox", path = "../windows-sandbox-rs" }
dirs = { workspace = true }
dunce = { workspace = true }
env-flags = { workspace = true }
encoding_rs = { workspace = true }
env-flags = { workspace = true }
eventsource-stream = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
@@ -46,8 +47,10 @@ indexmap = { workspace = true }
keyring = { workspace = true, features = ["crypto-rust"] }
libc = { workspace = true }
mcp-types = { workspace = true }
once_cell = { workspace = true }
os_info = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
regex-lite = { workspace = true }
reqwest = { workspace = true, features = ["json", "stream"] }
serde = { workspace = true, features = ["derive"] }
@@ -57,9 +60,6 @@ sha2 = { workspace = true }
shlex = { workspace = true }
similar = { workspace = true }
strum_macros = { workspace = true }
url = { workspace = true }
once_cell = { workspace = true }
regex = { workspace = true }
tempfile = { workspace = true }
test-case = "3.3.1"
test-log = { workspace = true }
@@ -83,15 +83,19 @@ toml_edit = { workspace = true }
tracing = { workspace = true, features = ["log"] }
tree-sitter = { workspace = true }
tree-sitter-bash = { workspace = true }
url = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4", "v5"] }
which = { workspace = true }
wildmatch = { workspace = true }
[features]
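# Sequential, predictable process IDs (starting at 1000) for tests; enabled via
# codex-core's own dev-dependency entry further down.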
deterministic_process_ids = []
[target.'cfg(target_os = "linux")'.dependencies]
keyring = { workspace = true, features = ["linux-native-async-persistent"] }
landlock = { workspace = true }
seccompiler = { workspace = true }
keyring = { workspace = true, features = ["linux-native-async-persistent"] }
[target.'cfg(target_os = "macos")'.dependencies]
core-foundation = "0.9"
@@ -115,6 +119,7 @@ keyring = { workspace = true, features = ["sync-secret-service"] }
assert_cmd = { workspace = true }
assert_matches = { workspace = true }
codex-arg0 = { workspace = true }
codex-core = { path = ".", features = ["deterministic_process_ids"] }
core_test_support = { workspace = true }
ctor = { workspace = true }
escargot = { workspace = true }

View File

@@ -106,6 +106,8 @@ use crate::shell;
use crate::state::ActiveTurn;
use crate::state::SessionServices;
use crate::state::SessionState;
use crate::status::ComponentHealth;
use crate::status::maybe_codex_status_warning;
use crate::tasks::GhostSnapshotTask;
use crate::tasks::ReviewTask;
use crate::tasks::SessionTask;
@@ -451,6 +453,16 @@ impl Session {
}
}
pub(crate) async fn replace_codex_backend_status(
&self,
status: ComponentHealth,
) -> Option<ComponentHealth> {
let mut guard = self.services.codex_backend_status.lock().await;
let previous = *guard;
*guard = Some(status);
previous
}
async fn new(
session_configuration: SessionConfiguration,
config: Arc<Config>,
@@ -567,6 +579,7 @@ impl Session {
auth_manager: Arc::clone(&auth_manager),
otel_event_manager,
tool_approvals: Mutex::new(ApprovalStore::default()),
codex_backend_status: Mutex::new(None),
};
let sess = Arc::new(Session {
@@ -2180,6 +2193,19 @@ async fn try_run_turn(
});
sess.persist_rollout_items(&[rollout_item]).await;
let sess_clone = Arc::clone(&sess);
let turn_context_clone = Arc::clone(&turn_context);
tokio::spawn(async move {
if let Some(message) = maybe_codex_status_warning(sess_clone.as_ref()).await {
sess_clone
.send_event(
&turn_context_clone,
EventMsg::Warning(WarningEvent { message }),
)
.await;
}
});
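// The status check runs in its own task with short client timeouts, so a slow
// or unreachable status endpoint never delays the model request started below.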
let mut stream = turn_context
.client
.clone()
@@ -2694,6 +2720,7 @@ mod tests {
auth_manager: Arc::clone(&auth_manager),
otel_event_manager: otel_event_manager.clone(),
tool_approvals: Mutex::new(ApprovalStore::default()),
codex_backend_status: Mutex::new(None),
};
let turn_context = Session::make_turn_context(
@@ -2772,6 +2799,7 @@ mod tests {
auth_manager: Arc::clone(&auth_manager),
otel_event_manager: otel_event_manager.clone(),
tool_approvals: Mutex::new(ApprovalStore::default()),
codex_backend_status: Mutex::new(None),
};
let turn_context = Arc::new(Session::make_turn_context(

View File

@@ -42,6 +42,7 @@ pub mod parse_command;
pub mod powershell;
mod response_processing;
pub mod sandboxing;
pub mod status;
mod text_encoding;
pub mod token_data;
mod truncate;

View File

@@ -3,6 +3,7 @@ use std::sync::Arc;
use crate::AuthManager;
use crate::RolloutRecorder;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::status::ComponentHealth;
use crate::tools::sandboxing::ApprovalStore;
use crate::unified_exec::UnifiedExecSessionManager;
use crate::user_notification::UserNotifier;
@@ -22,4 +23,5 @@ pub(crate) struct SessionServices {
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) otel_event_manager: OtelEventManager,
pub(crate) tool_approvals: Mutex<ApprovalStore>,
pub(crate) codex_backend_status: Mutex<Option<ComponentHealth>>,
}

257
codex-rs/core/src/status.rs Normal file
View File

@@ -0,0 +1,257 @@
use std::sync::OnceLock;
use std::time::Duration;
use crate::codex::Session;
use anyhow::Context;
use anyhow::Result;
use anyhow::anyhow;
use anyhow::bail;
use codex_client::HttpTransport;
use codex_client::Request;
use codex_client::ReqwestTransport;
use codex_client::RetryOn;
use codex_client::RetryPolicy;
use codex_client::run_with_retry;
use http::header::CONTENT_TYPE;
use reqwest::Method;
use serde::Deserialize;
use serde::Serialize;
use strum_macros::Display;
const STATUS_WIDGET_URL: &str = "https://status.openai.com/proxy/status.openai.com";
const CODEX_COMPONENT_NAME: &str = "Codex";
static TEST_STATUS_WIDGET_URL: OnceLock<String> = OnceLock::new();
#[derive(Debug, Clone, Display, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(crate) enum ComponentHealth {
#[strum(to_string = "operational")]
Operational,
#[strum(to_string = "degraded performance")]
DegradedPerformance,
#[strum(to_string = "partial outage")]
PartialOutage,
#[strum(to_string = "major outage")]
MajorOutage,
#[strum(to_string = "under maintenance")]
UnderMaintenance,
#[serde(other)]
#[strum(to_string = "unknown")]
Unknown,
}
impl ComponentHealth {
fn operational() -> Self {
Self::Operational
}
pub(crate) fn is_operational(self) -> bool {
self == Self::Operational
}
}
pub(crate) async fn maybe_codex_status_warning(session: &Session) -> Option<String> {
let Ok(status) = fetch_codex_health().await else {
return None;
};
let previous = session.replace_codex_backend_status(status).await;
if status.is_operational() || previous == Some(status) {
return None;
}
Some(format!(
"Codex is experiencing a {status}. If a response stalls, try again later. You can follow incident updates at status.openai.com."
))
}
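// Warning cadence, as a sketch: consecutive fetches of MajorOutage,
// MajorOutage, PartialOutage, Operational yield Some(warning), None (status
// unchanged), Some(warning), None (operational). A warning is only emitted
// when the fetched status is unhealthy and differs from the last recorded one.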
async fn fetch_codex_health() -> Result<ComponentHealth> {
let status_widget_url = status_widget_url();
let client = reqwest::Client::builder()
.connect_timeout(Duration::from_millis(200))
.timeout(Duration::from_millis(300))
.build()
.context("building HTTP client")?;
let transport = ReqwestTransport::new(client);
let policy = RetryPolicy {
max_attempts: 0,
base_delay: Duration::from_millis(100),
retry_on: RetryOn {
retry_429: true,
retry_5xx: true,
retry_transport: true,
},
};
let response = run_with_retry(
policy,
|| Request::new(Method::GET, status_widget_url.clone()),
|req, _attempt| {
let transport = transport.clone();
async move { transport.execute(req).await }
},
)
.await
.context("requesting status widget")?;
let content_type = response
.headers
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.unwrap_or_default()
.to_ascii_lowercase();
if !content_type.contains("json") {
let snippet = String::from_utf8_lossy(&response.body)
.chars()
.take(200)
.collect::<String>();
bail!(
"Expected JSON from {status_widget_url}: Content-Type={content_type}. Body starts with: {snippet:?}"
);
}
let payload: StatusPayload =
serde_json::from_slice(&response.body).context("parsing status widget JSON")?;
derive_component_health(&payload, CODEX_COMPONENT_NAME)
}
#[derive(Debug, Clone, Deserialize, Default)]
struct StatusPayload {
#[serde(default)]
summary: Summary,
}
#[derive(Debug, Clone, Deserialize, Default)]
struct Summary {
#[serde(default)]
components: Vec<Component>,
#[serde(default)]
affected_components: Vec<AffectedComponent>,
}
#[derive(Debug, Clone, Deserialize)]
struct Component {
id: String,
name: String,
}
#[derive(Debug, Clone, Deserialize)]
struct AffectedComponent {
component_id: String,
#[serde(default = "ComponentHealth::operational")]
status: ComponentHealth,
}
fn derive_component_health(
payload: &StatusPayload,
component_name: &str,
) -> Result<ComponentHealth> {
let component = payload
.summary
.components
.iter()
.find(|component| component.name == component_name)
.ok_or_else(|| anyhow!("Component {component_name:?} not found in status summary"))?;
let status = payload
.summary
.affected_components
.iter()
.find(|affected| affected.component_id == component.id)
.map(|affected| affected.status)
.unwrap_or(ComponentHealth::Operational);
Ok(status)
}
fn status_widget_url() -> String {
TEST_STATUS_WIDGET_URL
.get()
.cloned()
.unwrap_or_else(|| STATUS_WIDGET_URL.to_string())
}
#[doc(hidden)]
#[cfg_attr(not(test), allow(dead_code))]
pub fn set_test_status_widget_url(url: impl Into<String>) {
let _ = TEST_STATUS_WIDGET_URL.set(url.into());
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use serde_json::json;
#[test]
fn uses_affected_component_status() {
let payload = serde_json::from_value::<StatusPayload>(json!({
"summary": {
"id": "sum-1",
"name": "OpenAI",
"components": [
{"id": "cmp-1", "name": "Codex", "status_page_id": "page-1"}
],
"affected_components": [
{"component_id": "cmp-1", "status": "major_outage"}
]
}
}))
.expect("valid payload");
let status = derive_component_health(&payload, "Codex").expect("codex component exists");
assert_eq!(status, ComponentHealth::MajorOutage);
assert!(!status.is_operational());
}
#[test]
fn unknown_status_is_preserved_as_unknown() {
let payload = serde_json::from_value::<StatusPayload>(json!({
"summary": {
"id": "sum-1",
"name": "OpenAI",
"components": [
{"id": "cmp-1", "name": "Codex", "status_page_id": "page-1"}
],
"affected_components": [
{"component_id": "cmp-1", "status": "custom_status"}
]
}
}))
.expect("valid payload");
let status = derive_component_health(&payload, "Codex").expect("codex component exists");
assert_eq!(status, ComponentHealth::Unknown);
assert!(!status.is_operational());
}
#[test]
fn missing_component_returns_error() {
let payload = serde_json::from_value::<StatusPayload>(json!({
"summary": {
"id": "sum-1",
"name": "OpenAI",
"components": [],
"affected_components": []
}
}))
.expect("valid payload");
let error =
derive_component_health(&payload, "Codex").expect_err("missing component should error");
assert!(
error
.to_string()
.contains("Component \"Codex\" not found in status summary")
);
}
}

View File

@@ -81,6 +81,7 @@ impl SessionTask for UserShellCommandTask {
turn_context.as_ref(),
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: call_id.clone(),
process_id: None,
turn_id: turn_context.sub_id.clone(),
command: command.clone(),
cwd: cwd.clone(),
@@ -139,6 +140,7 @@ impl SessionTask for UserShellCommandTask {
turn_context.as_ref(),
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
process_id: None,
turn_id: turn_context.sub_id.clone(),
command: command.clone(),
cwd: cwd.clone(),
@@ -161,6 +163,7 @@ impl SessionTask for UserShellCommandTask {
turn_context.as_ref(),
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: call_id.clone(),
process_id: None,
turn_id: turn_context.sub_id.clone(),
command: command.clone(),
cwd: cwd.clone(),
@@ -205,6 +208,7 @@ impl SessionTask for UserShellCommandTask {
turn_context.as_ref(),
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
process_id: None,
turn_id: turn_context.sub_id.clone(),
command,
cwd,

View File

@@ -65,12 +65,14 @@ pub(crate) async fn emit_exec_command_begin(
parsed_cmd: &[ParsedCommand],
source: ExecCommandSource,
interaction_input: Option<String>,
process_id: Option<&str>,
) {
ctx.session
.send_event(
ctx.turn,
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: ctx.call_id.to_string(),
process_id: process_id.map(str::to_owned),
turn_id: ctx.turn.sub_id.clone(),
command: command.to_vec(),
cwd: cwd.to_path_buf(),
@@ -100,6 +102,7 @@ pub(crate) enum ToolEmitter {
source: ExecCommandSource,
interaction_input: Option<String>,
parsed_cmd: Vec<ParsedCommand>,
process_id: Option<String>,
},
}
@@ -132,6 +135,7 @@ impl ToolEmitter {
cwd: PathBuf,
source: ExecCommandSource,
interaction_input: Option<String>,
process_id: Option<String>,
) -> Self {
let parsed_cmd = parse_command(command);
Self::UnifiedExec {
@@ -140,6 +144,7 @@ impl ToolEmitter {
source,
interaction_input,
parsed_cmd,
process_id,
}
}
@@ -157,7 +162,7 @@ impl ToolEmitter {
) => {
emit_exec_stage(
ctx,
ExecCommandInput::new(command, cwd.as_path(), parsed_cmd, *source, None),
ExecCommandInput::new(command, cwd.as_path(), parsed_cmd, *source, None, None),
stage,
)
.await;
@@ -229,6 +234,7 @@ impl ToolEmitter {
source,
interaction_input,
parsed_cmd,
process_id,
},
stage,
) => {
@@ -240,6 +246,7 @@ impl ToolEmitter {
parsed_cmd,
*source,
interaction_input.as_deref(),
process_id.as_deref(),
),
stage,
)
@@ -319,6 +326,7 @@ struct ExecCommandInput<'a> {
parsed_cmd: &'a [ParsedCommand],
source: ExecCommandSource,
interaction_input: Option<&'a str>,
process_id: Option<&'a str>,
}
impl<'a> ExecCommandInput<'a> {
@@ -328,6 +336,7 @@ impl<'a> ExecCommandInput<'a> {
parsed_cmd: &'a [ParsedCommand],
source: ExecCommandSource,
interaction_input: Option<&'a str>,
process_id: Option<&'a str>,
) -> Self {
Self {
command,
@@ -335,6 +344,7 @@ impl<'a> ExecCommandInput<'a> {
parsed_cmd,
source,
interaction_input,
process_id,
}
}
}
@@ -362,6 +372,7 @@ async fn emit_exec_stage(
exec_input.parsed_cmd,
exec_input.source,
exec_input.interaction_input.map(str::to_owned),
exec_input.process_id,
)
.await;
}
@@ -402,6 +413,7 @@ async fn emit_exec_end(
ctx.turn,
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: ctx.call_id.to_string(),
process_id: exec_input.process_id.map(str::to_owned),
turn_id: ctx.turn.sub_id.clone(),
command: exec_input.command.to_vec(),
cwd: exec_input.cwd.to_path_buf(),

View File

@@ -46,6 +46,7 @@ struct ExecCommandArgs {
#[derive(Debug, Deserialize)]
struct WriteStdinArgs {
// The model is trained on `session_id`.
session_id: i32,
#[serde(default)]
chars: String,
@@ -128,6 +129,7 @@ impl ToolHandler for UnifiedExecHandler {
"failed to parse exec_command arguments: {err:?}"
))
})?;
let process_id = manager.allocate_process_id().await;
let command = get_command(&args);
let ExecCommandArgs {
@@ -168,6 +170,7 @@ impl ToolHandler for UnifiedExecHandler {
cwd.clone(),
ExecCommandSource::UnifiedExecStartup,
None,
Some(process_id.clone()),
);
emitter.emit(event_ctx, ToolEventStage::Begin).await;
@@ -175,6 +178,7 @@ impl ToolHandler for UnifiedExecHandler {
.exec_command(
ExecCommandRequest {
command,
process_id,
yield_time_ms,
max_output_tokens,
workdir,
@@ -197,7 +201,7 @@ impl ToolHandler for UnifiedExecHandler {
manager
.write_stdin(WriteStdinRequest {
call_id: &call_id,
session_id: args.session_id,
process_id: &args.session_id.to_string(),
input: &args.chars,
yield_time_ms: args.yield_time_ms,
max_output_tokens: args.max_output_tokens,
@@ -255,8 +259,9 @@ fn format_response(response: &UnifiedExecResponse) -> String {
sections.push(format!("Process exited with code {exit_code}"));
}
if let Some(session_id) = response.session_id {
sections.push(format!("Process running with session ID {session_id}"));
if let Some(process_id) = &response.process_id {
// Training still uses "session ID".
sections.push(format!("Process running with session ID {process_id}"));
}
if let Some(original_token_count) = response.original_token_count {

View File

@@ -5,8 +5,9 @@ use thiserror::Error;
pub(crate) enum UnifiedExecError {
#[error("Failed to create unified exec session: {message}")]
CreateSession { message: String },
#[error("Unknown session id {session_id}")]
UnknownSessionId { session_id: i32 },
// Called "session" in the model's training.
#[error("Unknown session id {process_id}")]
UnknownSessionId { process_id: String },
#[error("failed to write to stdin")]
WriteToStdin,
#[error("missing command line for unified exec request")]

View File

@@ -22,9 +22,9 @@
//! - `session_manager.rs`: orchestration (approvals, sandboxing, reuse) and request handling.
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicI32;
use std::time::Duration;
use rand::Rng;
@@ -67,6 +67,7 @@ impl UnifiedExecContext {
#[derive(Debug)]
pub(crate) struct ExecCommandRequest {
pub command: Vec<String>,
pub process_id: String,
pub yield_time_ms: u64,
pub max_output_tokens: Option<usize>,
pub workdir: Option<PathBuf>,
@@ -77,7 +78,7 @@ pub(crate) struct ExecCommandRequest {
#[derive(Debug)]
pub(crate) struct WriteStdinRequest<'a> {
pub call_id: &'a str,
pub session_id: i32,
pub process_id: &'a str,
pub input: &'a str,
pub yield_time_ms: u64,
pub max_output_tokens: Option<usize>,
@@ -89,7 +90,7 @@ pub(crate) struct UnifiedExecResponse {
pub chunk_id: String,
pub wall_time: Duration,
pub output: String,
pub session_id: Option<i32>,
pub process_id: Option<String>,
pub exit_code: Option<i32>,
pub original_token_count: Option<usize>,
pub session_command: Option<Vec<String>>,
@@ -97,15 +98,16 @@ pub(crate) struct UnifiedExecResponse {
#[derive(Default)]
pub(crate) struct UnifiedExecSessionManager {
next_session_id: AtomicI32,
sessions: Mutex<HashMap<i32, SessionEntry>>,
sessions: Mutex<HashMap<String, SessionEntry>>,
used_session_ids: Mutex<HashSet<String>>,
}
struct SessionEntry {
session: session::UnifiedExecSession,
session: UnifiedExecSession,
session_ref: Arc<Session>,
turn_ref: Arc<TurnContext>,
call_id: String,
process_id: String,
command: Vec<String>,
cwd: PathBuf,
started_at: tokio::time::Instant,
@@ -159,6 +161,11 @@ mod tests {
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let context =
UnifiedExecContext::new(Arc::clone(session), Arc::clone(turn), "call".to_string());
let process_id = session
.services
.unified_exec_manager
.allocate_process_id()
.await;
session
.services
@@ -166,6 +173,7 @@ mod tests {
.exec_command(
ExecCommandRequest {
command: vec!["bash".to_string(), "-lc".to_string(), cmd.to_string()],
process_id,
yield_time_ms,
max_output_tokens: None,
workdir: None,
@@ -179,7 +187,7 @@ mod tests {
async fn write_stdin(
session: &Arc<Session>,
session_id: i32,
process_id: &str,
input: &str,
yield_time_ms: u64,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
@@ -188,7 +196,7 @@ mod tests {
.unified_exec_manager
.write_stdin(WriteStdinRequest {
call_id: "write-stdin",
session_id,
process_id,
input,
yield_time_ms,
max_output_tokens: None,
@@ -221,11 +229,15 @@ mod tests {
let (session, turn) = test_session_and_turn();
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
let session_id = open_shell.session_id.expect("expected session_id");
let process_id = open_shell
.process_id
.as_ref()
.expect("expected process_id")
.as_str();
write_stdin(
&session,
session_id,
process_id,
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
2_500,
)
@@ -233,7 +245,7 @@ mod tests {
let out_2 = write_stdin(
&session,
session_id,
process_id,
"echo $CODEX_INTERACTIVE_SHELL_VAR\n",
2_500,
)
@@ -253,11 +265,15 @@ mod tests {
let (session, turn) = test_session_and_turn();
let shell_a = exec_command(&session, &turn, "bash -i", 2_500).await?;
let session_a = shell_a.session_id.expect("expected session id");
let session_a = shell_a
.process_id
.as_ref()
.expect("expected process id")
.clone();
write_stdin(
&session,
session_a,
session_a.as_str(),
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
2_500,
)
@@ -265,9 +281,10 @@ mod tests {
let out_2 =
exec_command(&session, &turn, "echo $CODEX_INTERACTIVE_SHELL_VAR", 2_500).await?;
tokio::time::sleep(Duration::from_secs(2)).await;
assert!(
out_2.session_id.is_none(),
"short command should not retain a session"
out_2.process_id.is_none(),
"short command should not report a process id if it exits quickly"
);
assert!(
!out_2.output.contains("codex"),
@@ -276,7 +293,11 @@ mod tests {
let out_3 = write_stdin(
&session,
session_a,
shell_a
.process_id
.as_ref()
.expect("expected process id")
.as_str(),
"echo $CODEX_INTERACTIVE_SHELL_VAR\n",
2_500,
)
@@ -296,11 +317,15 @@ mod tests {
let (session, turn) = test_session_and_turn();
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
let session_id = open_shell.session_id.expect("expected session id");
let process_id = open_shell
.process_id
.as_ref()
.expect("expected process id")
.as_str();
write_stdin(
&session,
session_id,
process_id,
"export CODEX_INTERACTIVE_SHELL_VAR=codex\n",
2_500,
)
@@ -308,7 +333,7 @@ mod tests {
let out_2 = write_stdin(
&session,
session_id,
process_id,
"sleep 5 && echo $CODEX_INTERACTIVE_SHELL_VAR\n",
10,
)
@@ -320,7 +345,7 @@ mod tests {
tokio::time::sleep(Duration::from_secs(7)).await;
let out_3 = write_stdin(&session, session_id, "", 100).await?;
let out_3 = write_stdin(&session, process_id, "", 100).await?;
assert!(
out_3.output.contains("codex"),
@@ -337,7 +362,7 @@ mod tests {
let result = exec_command(&session, &turn, "echo codex", 120_000).await?;
assert!(result.session_id.is_none());
assert!(result.process_id.is_some());
assert!(result.output.contains("codex"));
Ok(())
@@ -350,8 +375,8 @@ mod tests {
let result = exec_command(&session, &turn, "echo codex", 2_500).await?;
assert!(
result.session_id.is_none(),
"completed command should not retain session"
result.process_id.is_some(),
"completed command should report a process id"
);
assert!(result.output.contains("codex"));
@@ -375,31 +400,35 @@ mod tests {
let (session, turn) = test_session_and_turn();
let open_shell = exec_command(&session, &turn, "bash -i", 2_500).await?;
let session_id = open_shell.session_id.expect("expected session id");
let process_id = open_shell
.process_id
.as_ref()
.expect("expected process id")
.as_str();
write_stdin(&session, session_id, "exit\n", 2_500).await?;
write_stdin(&session, process_id, "exit\n", 2_500).await?;
tokio::time::sleep(Duration::from_millis(200)).await;
let err = write_stdin(&session, session_id, "", 100)
let err = write_stdin(&session, process_id, "", 100)
.await
.expect_err("expected unknown session error");
match err {
UnifiedExecError::UnknownSessionId { session_id: err_id } => {
assert_eq!(err_id, session_id);
UnifiedExecError::UnknownSessionId { process_id: err_id } => {
assert_eq!(err_id, process_id, "process id should match request");
}
other => panic!("expected UnknownSessionId, got {other:?}"),
}
assert!(
!session
session
.services
.unified_exec_manager
.sessions
.lock()
.await
.contains_key(&session_id)
.is_empty()
);
Ok(())

View File

@@ -1,9 +1,9 @@
use rand::Rng;
use std::cmp::Reverse;
use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::time::Duration;
@@ -75,9 +75,38 @@ struct PreparedSessionHandles {
turn_ref: Arc<TurnContext>,
command: Vec<String>,
cwd: PathBuf,
process_id: String,
}
impl UnifiedExecSessionManager {
pub(crate) async fn allocate_process_id(&self) -> String {
loop {
let mut store = self.used_session_ids.lock().await;
let process_id = if !cfg!(test) && !cfg!(feature = "deterministic_process_ids") {
// production mode → random
rand::rng().random_range(1_000..100_000).to_string()
} else {
// test or deterministic mode
let next = store
.iter()
.filter_map(|s| s.parse::<i32>().ok())
.max()
.map(|m| std::cmp::max(m, 999) + 1)
.unwrap_or(1000);
next.to_string()
};
if store.contains(&process_id) {
continue;
}
store.insert(process_id.clone());
return process_id;
}
}
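// Allocation sketch: under `cfg(test)` or the `deterministic_process_ids`
// feature, IDs are sequential strings starting at "1000" ("1000", "1001", ...),
// which is what the integration tests reference with `"session_id": 1000`.
// In production a random 4-5 digit ID is drawn and re-drawn on collision
// against `used_session_ids`.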
pub(crate) async fn exec_command(
&self,
request: ExecCommandRequest,
@@ -122,14 +151,20 @@ impl UnifiedExecSessionManager {
let has_exited = session.has_exited();
let exit_code = session.exit_code();
let chunk_id = generate_chunk_id();
let session_id = if has_exited {
let process_id = if has_exited {
None
} else {
// Only store session if not exited.
let stored_id = self
.store_session(session, context, &request.command, cwd.clone(), start)
.await;
Some(stored_id)
self.store_session(
session,
context,
&request.command,
cwd.clone(),
start,
request.process_id.clone(),
)
.await;
Some(request.process_id.clone())
};
let original_token_count = approx_token_count(&text);
@@ -138,18 +173,18 @@ impl UnifiedExecSessionManager {
chunk_id,
wall_time,
output,
session_id,
process_id: process_id.clone(),
exit_code,
original_token_count: Some(original_token_count),
session_command: Some(request.command.clone()),
};
if response.session_id.is_some() {
if !has_exited {
Self::emit_waiting_status(&context.session, &context.turn, &request.command).await;
}
// If the command completed during this call, emit an ExecCommandEnd via the emitter.
if response.session_id.is_none() {
if has_exited {
let exit = response.exit_code.unwrap_or(-1);
Self::emit_exec_end_from_context(
context,
@@ -158,6 +193,9 @@ impl UnifiedExecSessionManager {
response.output.clone(),
exit,
response.wall_time,
// We always emit the process ID in order to keep consistency between the Begin
// event and the End event.
Some(request.process_id),
)
.await;
}
@@ -169,7 +207,7 @@ impl UnifiedExecSessionManager {
&self,
request: WriteStdinRequest<'_>,
) -> Result<UnifiedExecResponse, UnifiedExecError> {
let session_id = request.session_id;
let process_id = request.process_id.to_string();
let PreparedSessionHandles {
writer_tx,
@@ -180,13 +218,15 @@ impl UnifiedExecSessionManager {
turn_ref,
command: session_command,
cwd: session_cwd,
} = self.prepare_session_handles(session_id).await?;
process_id,
} = self.prepare_session_handles(process_id.as_str()).await?;
let interaction_emitter = ToolEmitter::unified_exec(
&session_command,
session_cwd.clone(),
ExecCommandSource::UnifiedExecInteraction,
(!request.input.is_empty()).then(|| request.input.to_string()),
Some(process_id.clone()),
);
let make_event_ctx = || {
ToolEventCtx::new(
@@ -233,17 +273,21 @@ impl UnifiedExecSessionManager {
let original_token_count = approx_token_count(&text);
let chunk_id = generate_chunk_id();
let status = self.refresh_session_state(session_id).await;
let (session_id, exit_code, completion_entry, event_call_id) = match status {
SessionStatus::Alive { exit_code, call_id } => {
(Some(session_id), exit_code, None, call_id)
}
let status = self.refresh_session_state(process_id.as_str()).await;
let (process_id, exit_code, completion_entry, event_call_id) = match status {
SessionStatus::Alive {
exit_code,
call_id,
process_id,
} => (Some(process_id), exit_code, None, call_id),
SessionStatus::Exited { exit_code, entry } => {
let call_id = entry.call_id.clone();
(None, exit_code, Some(*entry), call_id)
}
SessionStatus::Unknown => {
return Err(UnifiedExecError::UnknownSessionId { session_id });
return Err(UnifiedExecError::UnknownSessionId {
process_id: request.process_id.to_string(),
});
}
};
@@ -252,7 +296,7 @@ impl UnifiedExecSessionManager {
chunk_id,
wall_time,
output,
session_id,
process_id,
exit_code,
original_token_count: Some(original_token_count),
session_command: Some(session_command.clone()),
@@ -273,7 +317,7 @@ impl UnifiedExecSessionManager {
)
.await;
if response.session_id.is_some() {
if response.process_id.is_some() {
Self::emit_waiting_status(&session_ref, &turn_ref, &session_command).await;
}
@@ -286,16 +330,17 @@ impl UnifiedExecSessionManager {
Ok(response)
}
async fn refresh_session_state(&self, session_id: i32) -> SessionStatus {
async fn refresh_session_state(&self, process_id: &str) -> SessionStatus {
let mut sessions = self.sessions.lock().await;
let Some(entry) = sessions.get(&session_id) else {
let Some(entry) = sessions.get(process_id) else {
return SessionStatus::Unknown;
};
let exit_code = entry.session.exit_code();
let process_id = entry.process_id.clone();
if entry.session.has_exited() {
let Some(entry) = sessions.remove(&session_id) else {
let Some(entry) = sessions.remove(&process_id) else {
return SessionStatus::Unknown;
};
SessionStatus::Exited {
@@ -306,18 +351,21 @@ impl UnifiedExecSessionManager {
SessionStatus::Alive {
exit_code,
call_id: entry.call_id.clone(),
process_id,
}
}
}
async fn prepare_session_handles(
&self,
session_id: i32,
process_id: &str,
) -> Result<PreparedSessionHandles, UnifiedExecError> {
let mut sessions = self.sessions.lock().await;
let entry = sessions
.get_mut(&session_id)
.ok_or(UnifiedExecError::UnknownSessionId { session_id })?;
.get_mut(process_id)
.ok_or(UnifiedExecError::UnknownSessionId {
process_id: process_id.to_string(),
})?;
entry.last_used = Instant::now();
let OutputHandles {
output_buffer,
@@ -334,6 +382,7 @@ impl UnifiedExecSessionManager {
turn_ref: Arc::clone(&entry.turn_ref),
command: entry.command.clone(),
cwd: entry.cwd.clone(),
process_id: entry.process_id.clone(),
})
}
@@ -347,6 +396,7 @@ impl UnifiedExecSessionManager {
.map_err(|_| UnifiedExecError::WriteToStdin)
}
#[allow(clippy::too_many_arguments)]
async fn store_session(
&self,
session: UnifiedExecSession,
@@ -354,15 +404,14 @@ impl UnifiedExecSessionManager {
command: &[String],
cwd: PathBuf,
started_at: Instant,
) -> i32 {
let session_id = self
.next_session_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
process_id: String,
) {
let entry = SessionEntry {
session,
session_ref: Arc::clone(&context.session),
turn_ref: Arc::clone(&context.turn),
call_id: context.call_id.clone(),
process_id: process_id.clone(),
command: command.to_vec(),
cwd,
started_at,
@@ -370,8 +419,7 @@ impl UnifiedExecSessionManager {
};
let mut sessions = self.sessions.lock().await;
Self::prune_sessions_if_needed(&mut sessions);
sessions.insert(session_id, entry);
session_id
sessions.insert(process_id, entry);
}
async fn emit_exec_end_from_entry(
@@ -399,6 +447,7 @@ impl UnifiedExecSessionManager {
entry.cwd,
ExecCommandSource::UnifiedExecStartup,
None,
Some(entry.process_id.clone()),
);
emitter
.emit(event_ctx, ToolEventStage::Success(output))
@@ -412,6 +461,7 @@ impl UnifiedExecSessionManager {
aggregated_output: String,
exit_code: i32,
duration: Duration,
process_id: Option<String>,
) {
let output = ExecToolCallOutput {
exit_code,
@@ -427,8 +477,13 @@ impl UnifiedExecSessionManager {
&context.call_id,
None,
);
let emitter =
ToolEmitter::unified_exec(command, cwd, ExecCommandSource::UnifiedExecStartup, None);
let emitter = ToolEmitter::unified_exec(
command,
cwd,
ExecCommandSource::UnifiedExecStartup,
None,
process_id,
);
emitter
.emit(event_ctx, ToolEventStage::Success(output))
.await;
@@ -574,14 +629,14 @@ impl UnifiedExecSessionManager {
collected
}
fn prune_sessions_if_needed(sessions: &mut HashMap<i32, SessionEntry>) {
fn prune_sessions_if_needed(sessions: &mut HashMap<String, SessionEntry>) {
if sessions.len() < MAX_UNIFIED_EXEC_SESSIONS {
return;
}
let meta: Vec<(i32, Instant, bool)> = sessions
let meta: Vec<(String, Instant, bool)> = sessions
.iter()
.map(|(id, entry)| (*id, entry.last_used, entry.session.has_exited()))
.map(|(id, entry)| (id.clone(), entry.last_used, entry.session.has_exited()))
.collect();
if let Some(session_id) = Self::session_id_to_prune_from_meta(&meta) {
@@ -590,32 +645,32 @@ impl UnifiedExecSessionManager {
}
// Centralized pruning policy so we can easily swap strategies later.
fn session_id_to_prune_from_meta(meta: &[(i32, Instant, bool)]) -> Option<i32> {
fn session_id_to_prune_from_meta(meta: &[(String, Instant, bool)]) -> Option<String> {
if meta.is_empty() {
return None;
}
let mut by_recency = meta.to_vec();
by_recency.sort_by_key(|(_, last_used, _)| Reverse(*last_used));
let protected: HashSet<i32> = by_recency
let protected: HashSet<String> = by_recency
.iter()
.take(8)
.map(|(session_id, _, _)| *session_id)
.map(|(process_id, _, _)| process_id.clone())
.collect();
let mut lru = meta.to_vec();
lru.sort_by_key(|(_, last_used, _)| *last_used);
if let Some((session_id, _, _)) = lru
if let Some((process_id, _, _)) = lru
.iter()
.find(|(session_id, _, exited)| !protected.contains(session_id) && *exited)
.find(|(process_id, _, exited)| !protected.contains(process_id) && *exited)
{
return Some(*session_id);
return Some(process_id.clone());
}
lru.into_iter()
.find(|(session_id, _, _)| !protected.contains(session_id))
.map(|(session_id, _, _)| session_id)
.find(|(process_id, _, _)| !protected.contains(process_id))
.map(|(process_id, _, _)| process_id)
}
pub(crate) async fn terminate_all_sessions(&self) {
@@ -628,6 +683,7 @@ enum SessionStatus {
Alive {
exit_code: Option<i32>,
call_id: String,
process_id: String,
},
Exited {
exit_code: Option<i32>,
@@ -675,64 +731,67 @@ mod tests {
#[test]
fn pruning_prefers_exited_sessions_outside_recently_used() {
let now = Instant::now();
let id = |n: i32| n.to_string();
let meta = vec![
(1, now - Duration::from_secs(40), false),
(2, now - Duration::from_secs(30), true),
(3, now - Duration::from_secs(20), false),
(4, now - Duration::from_secs(19), false),
(5, now - Duration::from_secs(18), false),
(6, now - Duration::from_secs(17), false),
(7, now - Duration::from_secs(16), false),
(8, now - Duration::from_secs(15), false),
(9, now - Duration::from_secs(14), false),
(10, now - Duration::from_secs(13), false),
(id(1), now - Duration::from_secs(40), false),
(id(2), now - Duration::from_secs(30), true),
(id(3), now - Duration::from_secs(20), false),
(id(4), now - Duration::from_secs(19), false),
(id(5), now - Duration::from_secs(18), false),
(id(6), now - Duration::from_secs(17), false),
(id(7), now - Duration::from_secs(16), false),
(id(8), now - Duration::from_secs(15), false),
(id(9), now - Duration::from_secs(14), false),
(id(10), now - Duration::from_secs(13), false),
];
let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta);
assert_eq!(candidate, Some(2));
assert_eq!(candidate, Some(id(2)));
}
#[test]
fn pruning_falls_back_to_lru_when_no_exited() {
let now = Instant::now();
let id = |n: i32| n.to_string();
let meta = vec![
(1, now - Duration::from_secs(40), false),
(2, now - Duration::from_secs(30), false),
(3, now - Duration::from_secs(20), false),
(4, now - Duration::from_secs(19), false),
(5, now - Duration::from_secs(18), false),
(6, now - Duration::from_secs(17), false),
(7, now - Duration::from_secs(16), false),
(8, now - Duration::from_secs(15), false),
(9, now - Duration::from_secs(14), false),
(10, now - Duration::from_secs(13), false),
(id(1), now - Duration::from_secs(40), false),
(id(2), now - Duration::from_secs(30), false),
(id(3), now - Duration::from_secs(20), false),
(id(4), now - Duration::from_secs(19), false),
(id(5), now - Duration::from_secs(18), false),
(id(6), now - Duration::from_secs(17), false),
(id(7), now - Duration::from_secs(16), false),
(id(8), now - Duration::from_secs(15), false),
(id(9), now - Duration::from_secs(14), false),
(id(10), now - Duration::from_secs(13), false),
];
let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta);
assert_eq!(candidate, Some(1));
assert_eq!(candidate, Some(id(1)));
}
#[test]
fn pruning_protects_recent_sessions_even_if_exited() {
let now = Instant::now();
let id = |n: i32| n.to_string();
let meta = vec![
(1, now - Duration::from_secs(40), false),
(2, now - Duration::from_secs(30), false),
(3, now - Duration::from_secs(20), true),
(4, now - Duration::from_secs(19), false),
(5, now - Duration::from_secs(18), false),
(6, now - Duration::from_secs(17), false),
(7, now - Duration::from_secs(16), false),
(8, now - Duration::from_secs(15), false),
(9, now - Duration::from_secs(14), false),
(10, now - Duration::from_secs(13), true),
(id(1), now - Duration::from_secs(40), false),
(id(2), now - Duration::from_secs(30), false),
(id(3), now - Duration::from_secs(20), true),
(id(4), now - Duration::from_secs(19), false),
(id(5), now - Duration::from_secs(18), false),
(id(6), now - Duration::from_secs(17), false),
(id(7), now - Duration::from_secs(16), false),
(id(8), now - Duration::from_secs(15), false),
(id(9), now - Duration::from_secs(14), false),
(id(10), now - Duration::from_secs(13), true),
];
let candidate = UnifiedExecSessionManager::session_id_to_prune_from_meta(&meta);
// (10) is exited but among the last 8; we should drop the LRU outside that set.
assert_eq!(candidate, Some(1));
assert_eq!(candidate, Some(id(1)));
}
}

View File

@@ -1,5 +1,6 @@
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use anyhow::Result;
use base64::Engine;
@@ -546,6 +547,19 @@ pub async fn mount_sse_once(server: &MockServer, body: String) -> ResponseMock {
response_mock
}
pub async fn mount_sse_once_with_delay(
server: &MockServer,
body: String,
delay: Duration,
) -> ResponseMock {
let (mock, response_mock) = base_mock();
mock.respond_with(sse_response(body).set_delay(delay))
.up_to_n_times(1)
.mount(server)
.await;
response_mock
}
pub async fn mount_compact_json_once_match<M>(
server: &MockServer,
matcher: M,

View File

@@ -0,0 +1,197 @@
#![allow(clippy::unwrap_used, clippy::expect_used)]
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::protocol::WarningEvent;
use codex_core::status::set_test_status_widget_url;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_response_created;
use core_test_support::responses::mount_sse_once_with_delay;
use core_test_support::responses::sse;
use core_test_support::responses::start_mock_server;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use pretty_assertions::assert_eq;
use wiremock::Mock;
use wiremock::Request;
use wiremock::Respond;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn emits_warning_when_status_is_degraded_at_turn_start() {
let status_server = start_mock_server().await;
let status_path = "/proxy/status.openai.com";
Mock::given(method("GET"))
.and(path(status_path))
.respond_with(status_payload("major_outage"))
.mount(&status_server)
.await;
set_test_status_widget_url(format!("{}{}", status_server.uri(), status_path));
let responses_server = start_mock_server().await;
let stalled_response = sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "finally"),
ev_completed("resp-1"),
]);
let _responses_mock = mount_sse_once_with_delay(
&responses_server,
stalled_response,
Duration::from_millis(10),
)
.await;
let test_codex = test_codex().build(&responses_server).await.unwrap();
let codex = test_codex.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text { text: "hi".into() }],
})
.await
.unwrap();
let warning = wait_for_event(&codex, |event| matches!(event, EventMsg::Warning(_))).await;
let EventMsg::Warning(WarningEvent { message }) = warning else {
panic!("expected warning event");
};
assert_eq!(
message,
"Codex is experiencing a major outage. If a response stalls, try again later. You can follow incident updates at status.openai.com.",
"unexpected warning message"
);
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn warns_once_per_status_change_only_when_unhealthy() {
let status_server = start_mock_server().await;
let status_path = "/proxy/status.openai.com";
let responder = SequenceResponder::new(vec!["major_outage", "partial_outage"]);
Mock::given(method("GET"))
.and(path(status_path))
.respond_with(responder)
.mount(&status_server)
.await;
set_test_status_widget_url(format!("{}{}", status_server.uri(), status_path));
let responses_server = start_mock_server().await;
let stalled_response = sse(vec![
ev_response_created("resp-1"),
ev_assistant_message("msg-1", "finally"),
ev_completed("resp-1"),
]);
let _responses_mock = mount_sse_once_with_delay(
&responses_server,
stalled_response,
Duration::from_millis(300),
)
.await;
let test_codex = test_codex().build(&responses_server).await.unwrap();
let codex = test_codex.codex;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text { text: "hi".into() }],
})
.await
.unwrap();
let first_warning = wait_for_event(&codex, |event| matches!(event, EventMsg::Warning(_))).await;
let EventMsg::Warning(WarningEvent { message }) = first_warning else {
panic!("expected warning event");
};
assert_eq!(
message,
"Codex is experiencing a major outage. If a response stalls, try again later. You can follow incident updates at status.openai.com.",
);
wait_for_event(&codex, |event| matches!(event, EventMsg::TaskComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "second".into(),
}],
})
.await
.unwrap();
let mut saw_warning = false;
let mut saw_complete = false;
while !(saw_warning && saw_complete) {
let event = codex.next_event().await.expect("event");
match event.msg {
EventMsg::Warning(WarningEvent { message }) => {
assert_eq!(
message,
"Codex is experiencing a partial outage. If a response stalls, try again later. You can follow incident updates at status.openai.com.",
);
saw_warning = true;
}
EventMsg::TaskComplete(_) => {
saw_complete = true;
}
_ => {}
}
}
}
fn status_payload(status: &str) -> ResponseTemplate {
ResponseTemplate::new(200)
.insert_header("content-type", "application/json")
.set_body_json(serde_json::json!({
"summary": {
"components": [
{"id": "cmp-1", "name": "Codex", "status_page_id": "page-1"}
],
"affected_components": [
{"component_id": "cmp-1", "status": status}
]
}
}))
}
#[derive(Clone)]
struct SequenceResponder {
statuses: Vec<&'static str>,
calls: Arc<AtomicUsize>,
}
impl SequenceResponder {
fn new(statuses: Vec<&'static str>) -> Self {
Self {
statuses,
calls: Arc::new(AtomicUsize::new(0)),
}
}
}
impl Respond for SequenceResponder {
fn respond(&self, _request: &Request) -> ResponseTemplate {
let call = self.calls.fetch_add(1, Ordering::SeqCst);
let idx = usize::try_from(call).unwrap_or(0);
let status = self
.statuses
.get(idx)
.copied()
.or_else(|| self.statuses.last().copied())
.unwrap_or("operational");
status_payload(status)
}
}

View File

@@ -31,6 +31,7 @@ mod exec;
mod exec_policy;
mod fork_conversation;
mod grep_files;
mod idle_warning;
mod items;
mod json_result;
mod list_dir;

View File

@@ -44,7 +44,7 @@ fn extract_output_text(item: &Value) -> Option<&str> {
struct ParsedUnifiedExecOutput {
chunk_id: Option<String>,
wall_time_seconds: f64,
session_id: Option<i32>,
process_id: Option<String>,
exit_code: Option<i32>,
original_token_count: Option<usize>,
output: String,
@@ -59,7 +59,7 @@ fn parse_unified_exec_output(raw: &str) -> Result<ParsedUnifiedExecOutput> {
r#"(?:Chunk ID: (?P<chunk_id>[^\n]+)\n)?"#,
r#"Wall time: (?P<wall_time>-?\d+(?:\.\d+)?) seconds\n"#,
r#"(?:Process exited with code (?P<exit_code>-?\d+)\n)?"#,
r#"(?:Process running with session ID (?P<session_id>-?\d+)\n)?"#,
r#"(?:Process running with session ID (?P<process_id>-?\d+)\n)?"#,
r#"(?:Original token count: (?P<original_token_count>\d+)\n)?"#,
r#"Output:\n?(?P<output>.*)$"#,
))
@@ -92,15 +92,9 @@ fn parse_unified_exec_output(raw: &str) -> Result<ParsedUnifiedExecOutput> {
})
.transpose()?;
let session_id = captures
.name("session_id")
.map(|value| {
value
.as_str()
.parse::<i32>()
.context("failed to parse session id from unified exec output")
})
.transpose()?;
let process_id = captures
.name("process_id")
.map(|value| value.as_str().to_string());
let original_token_count = captures
.name("original_token_count")
@@ -121,7 +115,7 @@ fn parse_unified_exec_output(raw: &str) -> Result<ParsedUnifiedExecOutput> {
Ok(ParsedUnifiedExecOutput {
chunk_id,
wall_time_seconds,
session_id,
process_id,
exit_code,
original_token_count,
output,
@@ -335,7 +329,7 @@ async fn unified_exec_emits_exec_command_end_event() -> Result<()> {
let poll_call_id = "uexec-end-event-poll";
let poll_args = json!({
"chars": "",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 250,
});
@@ -493,7 +487,7 @@ async fn unified_exec_emits_output_delta_for_write_stdin() -> Result<()> {
let stdin_call_id = "uexec-stdin-delta";
let stdin_args = json!({
"chars": "echo WSTDIN-MARK\\n",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 800,
});
@@ -592,7 +586,7 @@ async fn unified_exec_emits_begin_for_write_stdin() -> Result<()> {
let stdin_call_id = "uexec-stdin-begin";
let stdin_args = json!({
"chars": "echo hello",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 400,
});
@@ -694,7 +688,7 @@ async fn unified_exec_emits_begin_event_for_write_stdin_requests() -> Result<()>
let poll_call_id = "uexec-poll-empty";
let poll_args = json!({
"chars": "",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 150,
});
@@ -880,8 +874,8 @@ async fn exec_command_reports_chunk_and_exit_metadata() -> Result<()> {
);
assert!(
metadata.session_id.is_none(),
"exec_command for a completed process should not include session_id"
metadata.process_id.is_none(),
"exec_command for a completed process should not include process_id"
);
let exit_code = metadata.exit_code.expect("expected exit_code");
@@ -973,7 +967,7 @@ async fn unified_exec_respects_early_exit_notifications() -> Result<()> {
.expect("missing early exit unified_exec output");
assert!(
output.session_id.is_none(),
output.process_id.is_none(),
"short-lived process should not keep a session alive"
);
assert_eq!(
@@ -1023,12 +1017,12 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
});
let send_args = serde_json::json!({
"chars": "hello unified exec\n",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 500,
});
let exit_args = serde_json::json!({
"chars": "\u{0004}",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 500,
});
@@ -1099,12 +1093,13 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
let start_output = outputs
.get(start_call_id)
.expect("missing start output for exec_command");
let session_id = start_output
.session_id
.expect("expected session id from exec_command");
let process_id = start_output
.process_id
.clone()
.expect("expected process id from exec_command");
assert!(
session_id >= 0,
"session_id should be non-negative, got {session_id}"
process_id.len() > 3,
"process_id should be at least 4 digits, got {process_id}"
);
assert!(
start_output.exit_code.is_none(),
@@ -1120,11 +1115,12 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
"expected echoed output from cat, got {echoed:?}"
);
let echoed_session = send_output
.session_id
.expect("write_stdin should return session id while process is running");
.process_id
.clone()
.expect("write_stdin should return process id while process is running");
assert_eq!(
echoed_session, session_id,
"write_stdin should reuse existing session id"
echoed_session, process_id,
"write_stdin should reuse existing process id"
);
assert!(
send_output.exit_code.is_none(),
@@ -1135,8 +1131,8 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
.get(exit_call_id)
.expect("missing exit metadata output");
assert!(
exit_output.session_id.is_none(),
"session_id should be omitted once the process exits"
exit_output.process_id.is_none(),
"process_id should be omitted once the process exits"
);
let exit_code = exit_output
.exit_code
@@ -1182,14 +1178,14 @@ async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()
let echo_call_id = "uexec-end-on-exit-echo";
let echo_args = serde_json::json!({
"chars": "bye-END\n",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 300,
});
let exit_call_id = "uexec-end-on-exit";
let exit_args = serde_json::json!({
"chars": "\u{0004}",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 500,
});
@@ -1285,7 +1281,7 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
let second_call_id = "uexec-stdin";
let second_args = serde_json::json!({
"chars": "hello unified exec\n",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 500,
});
@@ -1347,17 +1343,20 @@ async fn unified_exec_reuses_session_via_stdin() -> Result<()> {
let start_output = outputs
.get(first_call_id)
.expect("missing first unified_exec output");
let session_id = start_output.session_id.unwrap_or_default();
let process_id = start_output.process_id.clone().unwrap_or_default();
assert!(
session_id >= 0,
"expected session id in first unified_exec response"
!process_id.is_empty(),
"expected process id in first unified_exec response"
);
assert!(start_output.output.is_empty());
let reuse_output = outputs
.get(second_call_id)
.expect("missing reused unified_exec output");
assert_eq!(reuse_output.session_id.unwrap_or_default(), session_id);
assert_eq!(
reuse_output.process_id.clone().unwrap_or_default(),
process_id
);
let echoed = reuse_output.output.as_str();
assert!(
echoed.contains("hello unified exec"),
@@ -1413,7 +1412,7 @@ PY
let second_call_id = "uexec-lag-poll";
let second_args = serde_json::json!({
"chars": "",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 2_000,
});
@@ -1480,9 +1479,9 @@ PY
let start_output = outputs
.get(first_call_id)
.expect("missing initial unified_exec output");
let session_id = start_output.session_id.unwrap_or_default();
let process_id = start_output.process_id.clone().unwrap_or_default();
assert!(
session_id >= 0,
!process_id.is_empty(),
"expected session id from initial unified_exec response"
);
@@ -1524,7 +1523,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
let second_call_id = "uexec-poll";
let second_args = serde_json::json!({
"chars": "",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 800,
});
@@ -1589,7 +1588,7 @@ async fn unified_exec_timeout_and_followup_poll() -> Result<()> {
let outputs = collect_tool_outputs(&bodies)?;
let first_output = outputs.get(first_call_id).expect("missing timeout output");
assert_eq!(first_output.session_id, Some(0));
assert!(first_output.process_id.is_some());
assert!(first_output.output.is_empty());
let poll_output = outputs.get(second_call_id).expect("missing poll output");
@@ -1824,7 +1823,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
let keep_write_call_id = "uexec-prune-keep-write";
let keep_write_args = serde_json::json!({
"chars": "still alive\n",
"session_id": 0,
"session_id": 1000,
"yield_time_ms": 500,
});
events.push(ev_function_call(
@@ -1836,7 +1835,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
let probe_call_id = "uexec-prune-probe";
let probe_args = serde_json::json!({
"chars": "should fail\n",
"session_id": 1,
"session_id": 1001,
"yield_time_ms": 500,
});
events.push(ev_function_call(
@@ -1885,7 +1884,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
.find_map(|req| req.function_call_output_text(keep_call_id))
.expect("missing initial keep session output");
let keep_start_output = parse_unified_exec_output(&keep_start)?;
pretty_assertions::assert_eq!(keep_start_output.session_id, Some(0));
assert!(keep_start_output.process_id.is_some());
assert!(keep_start_output.exit_code.is_none());
let prune_start = requests
@@ -1893,7 +1892,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
.find_map(|req| req.function_call_output_text(prune_call_id))
.expect("missing initial prune session output");
let prune_start_output = parse_unified_exec_output(&prune_start)?;
pretty_assertions::assert_eq!(prune_start_output.session_id, Some(1));
assert!(prune_start_output.process_id.is_some());
assert!(prune_start_output.exit_code.is_none());
let keep_write = requests
@@ -1901,7 +1900,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
.find_map(|req| req.function_call_output_text(keep_write_call_id))
.expect("missing keep write output");
let keep_write_output = parse_unified_exec_output(&keep_write)?;
pretty_assertions::assert_eq!(keep_write_output.session_id, Some(0));
assert!(keep_write_output.process_id.is_some());
assert!(
keep_write_output.output.contains("still alive"),
"expected cat session to echo input, got {:?}",
@@ -1913,7 +1912,7 @@ async fn unified_exec_prunes_exited_sessions_first() -> Result<()> {
.find_map(|req| req.function_call_output_text(probe_call_id))
.expect("missing probe output");
assert!(
pruned_probe.contains("UnknownSessionId") || pruned_probe.contains("Unknown session id"),
pruned_probe.contains("UnknownSessionId") || pruned_probe.contains("Unknown process id"),
"expected probe to fail after pruning, got {pruned_probe:?}"
);
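Taken together, the tests above exercise one lifecycle: exec_command starts a PTY session and reports a process_id string while the process is still running, follow-up write_stdin calls pass that identifier back through the session_id argument (1000 in these fixtures), and once the process exits the metadata omits process_id and carries exit_code instead. A minimal sketch of the follow-up argument shapes, mirroring the fixtures above; the values are illustrative, not a fixed contract:

    use serde_json::{json, Value};

    // Sketch of the write_stdin-style arguments used in the tests above.
    fn example_followup_args() -> (Value, Value) {
        // Echo input into the running process, addressed by its id.
        let send = json!({
            "chars": "hello unified exec\n",
            "session_id": 1000,
            "yield_time_ms": 500,
        });
        // Send EOF (Ctrl-D); after the process exits, process_id is omitted
        // from the returned metadata and exit_code is present instead.
        let exit = json!({
            "chars": "\u{0004}",
            "session_id": 1000,
            "yield_time_ms": 500,
        });
        (send, exit)
    }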

View File

@@ -637,6 +637,7 @@ fn exec_command_end_success_produces_completed_command_item() {
"c1",
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "1".to_string(),
process_id: None,
turn_id: "turn-1".to_string(),
command: command.clone(),
cwd: cwd.clone(),
@@ -666,6 +667,7 @@ fn exec_command_end_success_produces_completed_command_item() {
"c2",
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "1".to_string(),
process_id: None,
turn_id: "turn-1".to_string(),
command,
cwd,
@@ -709,6 +711,7 @@ fn exec_command_end_failure_produces_failed_command_item() {
"c1",
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "2".to_string(),
process_id: None,
turn_id: "turn-1".to_string(),
command: command.clone(),
cwd: cwd.clone(),
@@ -737,6 +740,7 @@ fn exec_command_end_failure_produces_failed_command_item() {
"c2",
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "2".to_string(),
process_id: None,
turn_id: "turn-1".to_string(),
command,
cwd,
@@ -777,6 +781,7 @@ fn exec_command_end_without_begin_is_ignored() {
"c1",
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "no-begin".to_string(),
process_id: None,
turn_id: "turn-1".to_string(),
command: Vec::new(),
cwd: PathBuf::from("."),

View File

@@ -1328,6 +1328,10 @@ impl Default for ExecCommandSource {
pub struct ExecCommandBeginEvent {
/// Identifier so this can be paired with the ExecCommandEnd event.
pub call_id: String,
/// Identifier for the underlying PTY process (when available).
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub process_id: Option<String>,
/// Turn ID that this command belongs to.
pub turn_id: String,
/// The command to be executed.
@@ -1348,6 +1352,10 @@ pub struct ExecCommandBeginEvent {
pub struct ExecCommandEndEvent {
/// Identifier for the ExecCommandBegin that finished.
pub call_id: String,
/// Identifier for the underlying PTY process (when available).
#[serde(default, skip_serializing_if = "Option::is_none")]
#[ts(optional)]
pub process_id: Option<String>,
/// Turn ID that this command belongs to.
pub turn_id: String,
/// The command that was executed.
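Because process_id carries serde(default, skip_serializing_if = "Option::is_none") plus ts(optional), the field is dropped from serialized events when it is None and deserializes back to None when absent, so payloads produced before this change keep round-tripping. A minimal sketch of that behavior with a trimmed-down stand-in struct (not the real event type, which has many more fields):

    use serde::{Deserialize, Serialize};

    // Simplified stand-in just to illustrate the serde attributes above.
    #[derive(Serialize, Deserialize, Debug, PartialEq)]
    struct BeginEventSketch {
        call_id: String,
        #[serde(default, skip_serializing_if = "Option::is_none")]
        process_id: Option<String>,
    }

    fn demo() -> serde_json::Result<()> {
        // None is skipped on the wire...
        let without = BeginEventSketch { call_id: "1".into(), process_id: None };
        assert_eq!(serde_json::to_string(&without)?, r#"{"call_id":"1"}"#);
        // ...and a missing field deserializes back to None thanks to `default`.
        let parsed: BeginEventSketch = serde_json::from_str(r#"{"call_id":"1"}"#)?;
        assert_eq!(parsed.process_id, None);
        Ok(())
    }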

View File

@@ -722,6 +722,7 @@ fn begin_exec_with_source(
let interaction_input = None;
let event = ExecCommandBeginEvent {
call_id: call_id.to_string(),
process_id: None,
turn_id: "turn-1".to_string(),
command,
cwd,
@@ -760,11 +761,13 @@ fn end_exec(
parsed_cmd,
source,
interaction_input,
process_id,
} = begin_event;
chat.handle_codex_event(Event {
id: call_id.clone(),
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
process_id,
turn_id,
command,
cwd,
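The end_exec helper above destructures the begin event and forwards its fields, including the new process_id, into the matching ExecCommandEndEvent so the begin/end pair stays consistent in the TUI tests. A compact sketch of that forwarding pattern with illustrative structs (the real events carry many more fields):

    // Illustrative structs only, standing in for the begin/end event types.
    struct BeginSketch {
        call_id: String,
        process_id: Option<String>,
        command: Vec<String>,
    }

    struct EndSketch {
        call_id: String,
        process_id: Option<String>,
        command: Vec<String>,
        exit_code: i32,
    }

    fn end_from_begin(begin: BeginSketch, exit_code: i32) -> EndSketch {
        // Destructure once so every shared field (including process_id) is
        // forwarded and a newly added field cannot be silently dropped.
        let BeginSketch { call_id, process_id, command } = begin;
        EndSketch { call_id, process_id, command, exit_code }
    }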
@@ -2880,6 +2883,7 @@ fn chatwidget_exec_and_status_layout_vt100_snapshot() {
id: "c1".into(),
msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id: "c1".into(),
process_id: None,
turn_id: "turn-1".into(),
command: command.clone(),
cwd: cwd.clone(),
@@ -2892,6 +2896,7 @@ fn chatwidget_exec_and_status_layout_vt100_snapshot() {
id: "c1".into(),
msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id: "c1".into(),
process_id: None,
turn_id: "turn-1".into(),
command,
cwd,