Compare commits

..

17 Commits

Author SHA1 Message Date
kevin zhao
702bd19443 fix test 2025-11-04 19:15:32 -08:00
kevin zhao
c9f133c4b8 python 2025-11-04 19:06:22 -08:00
kevin zhao
7dd81c4dd9 update snapshots 2025-11-04 18:52:22 -08:00
kevin zhao
950ddd7ae1 fix test bug 2025-11-04 18:09:02 -08:00
kevin zhao
b2ebd1da7c update event mapping tests 2025-11-04 17:56:50 -08:00
kevin zhao
1104ba15d6 add test 2025-11-04 17:53:59 -08:00
kevin zhao
ee32bdb773 commonizing logic 2025-11-04 16:41:00 -08:00
kevin zhao
00cb2e20b4 add todo comment 2025-11-04 16:08:28 -08:00
kevin zhao
c23b8912bd colons 2025-11-04 16:07:13 -08:00
kevin zhao
1771759de9 fmt 2025-11-04 15:58:26 -08:00
kevin zhao
459c53c45a user shell command filtering 2025-11-04 15:56:16 -08:00
kevin zhao
f0eb141e02 fixing how we handle err in local shell runs 2025-11-04 15:44:24 -08:00
kevin zhao
4ab82bb579 clippy 2025-11-04 14:40:31 -08:00
kevin zhao
0d3afbd499 You Ran 2025-11-04 14:36:12 -08:00
kevin zhao
29038df468 clippy again 2025-11-04 14:27:31 -08:00
kevin zhao
6e8e5ebdf4 fix clippy 2025-11-04 14:11:11 -08:00
kevin zhao
3b345e0cb4 user shell command refactor 2025-11-04 14:03:35 -08:00
49 changed files with 1255 additions and 3287 deletions

View File

@@ -0,0 +1,47 @@
name: Setup musl 1.2.5 toolchain
description: Install musl 1.2.5 from source and configure the linker for the requested target.
inputs:
target:
description: Cargo target triple that requires musl (e.g., x86_64-unknown-linux-musl).
required: true
runs:
using: composite
steps:
- name: Install musl 1.2.5
shell: bash
env:
MUSL_VERSION: 1.2.5
MUSL_PREFIX: /opt/musl-1.2.5
DEBIAN_FRONTEND: noninteractive
run: |
set -euo pipefail
sudo apt-get -y update -o Acquire::Retries=3
sudo apt-get -y install --no-install-recommends build-essential curl pkg-config
curl -sSfL --retry 3 --retry-delay 1 "https://musl.libc.org/releases/musl-${MUSL_VERSION}.tar.gz" -o /tmp/musl.tar.gz
tar -xf /tmp/musl.tar.gz -C /tmp
pushd "/tmp/musl-${MUSL_VERSION}"
./configure --prefix="${MUSL_PREFIX}"
make -j"$(nproc)"
sudo make install
popd
echo "${MUSL_PREFIX}/bin" >> "$GITHUB_PATH"
musl_gcc="${MUSL_PREFIX}/bin/musl-gcc"
"${musl_gcc}" --version
case "${{ inputs.target }}" in
x86_64-unknown-linux-musl)
echo "CC_x86_64_unknown_linux_musl=${musl_gcc}" >> "$GITHUB_ENV"
echo "CARGO_TARGET_X86_64_UNKNOWN_LINUX_MUSL_LINKER=${musl_gcc}" >> "$GITHUB_ENV"
;;
aarch64-unknown-linux-musl)
echo "CC_aarch64_unknown_linux_musl=${musl_gcc}" >> "$GITHUB_ENV"
echo "CARGO_TARGET_AARCH64_UNKNOWN_LINUX_MUSL_LINKER=${musl_gcc}" >> "$GITHUB_ENV"
;;
*)
echo "Unsupported musl target '${{ inputs.target }}'" >&2
exit 1
;;
esac

View File

@@ -217,14 +217,10 @@ jobs:
key: apt-${{ matrix.runner }}-${{ matrix.target }}-v1
- if: ${{ matrix.target == 'x86_64-unknown-linux-musl' || matrix.target == 'aarch64-unknown-linux-musl'}}
name: Install musl build tools
env:
DEBIAN_FRONTEND: noninteractive
shell: bash
run: |
set -euo pipefail
sudo apt-get -y update -o Acquire::Retries=3
sudo apt-get -y install --no-install-recommends musl-tools pkg-config
name: Setup musl 1.2.5 toolchain
uses: ./.github/actions/setup-musl-1_2_5
with:
target: ${{ matrix.target }}
- name: Install cargo-chef
if: ${{ matrix.profile == 'release' }}

View File

@@ -92,10 +92,10 @@ jobs:
key: cargo-${{ matrix.runner }}-${{ matrix.target }}-release-${{ hashFiles('**/Cargo.lock') }}
- if: ${{ matrix.target == 'x86_64-unknown-linux-musl' || matrix.target == 'aarch64-unknown-linux-musl'}}
name: Install musl build tools
run: |
sudo apt-get update
sudo apt-get install -y musl-tools pkg-config
name: Setup musl 1.2.5 toolchain
uses: ./.github/actions/setup-musl-1_2_5
with:
target: ${{ matrix.target }}
- name: Cargo build
run: cargo build --target ${{ matrix.target }} --release --bin codex --bin codex-responses-api-proxy

2
codex-rs/Cargo.lock generated
View File

@@ -189,7 +189,6 @@ dependencies = [
"serde",
"serde_json",
"tokio",
"uuid",
"wiremock",
]
@@ -872,7 +871,6 @@ dependencies = [
"anyhow",
"clap",
"codex-protocol",
"mcp-types",
"paste",
"pretty_assertions",
"schemars 0.8.22",

View File

@@ -14,7 +14,6 @@ workspace = true
anyhow = { workspace = true }
clap = { workspace = true, features = ["derive"] }
codex-protocol = { workspace = true }
mcp-types = { workspace = true }
paste = { workspace = true }
schemars = { workspace = true }
serde = { workspace = true, features = ["derive"] }

View File

@@ -2,12 +2,8 @@ use crate::ClientNotification;
use crate::ClientRequest;
use crate::ServerNotification;
use crate::ServerRequest;
use crate::export_client_notification_schemas;
use crate::export_client_param_schemas;
use crate::export_client_response_schemas;
use crate::export_client_responses;
use crate::export_server_notification_schemas;
use crate::export_server_param_schemas;
use crate::export_server_response_schemas;
use crate::export_server_responses;
use anyhow::Context;
@@ -32,7 +28,84 @@ use ts_rs::TS;
const HEADER: &str = "// GENERATED CODE! DO NOT MODIFY BY HAND!\n\n";
type JsonSchemaEmitter = fn(&Path) -> Result<RootSchema>;
macro_rules! for_each_schema_type {
($macro:ident) => {
$macro!(crate::RequestId);
$macro!(crate::JSONRPCMessage);
$macro!(crate::JSONRPCRequest);
$macro!(crate::JSONRPCNotification);
$macro!(crate::JSONRPCResponse);
$macro!(crate::JSONRPCError);
$macro!(crate::JSONRPCErrorError);
$macro!(crate::AddConversationListenerParams);
$macro!(crate::AddConversationSubscriptionResponse);
$macro!(crate::ApplyPatchApprovalParams);
$macro!(crate::ApplyPatchApprovalResponse);
$macro!(crate::ArchiveConversationParams);
$macro!(crate::ArchiveConversationResponse);
$macro!(crate::AuthMode);
$macro!(crate::AccountUpdatedNotification);
$macro!(crate::AuthStatusChangeNotification);
$macro!(crate::CancelLoginChatGptParams);
$macro!(crate::CancelLoginChatGptResponse);
$macro!(crate::ClientInfo);
$macro!(crate::ClientNotification);
$macro!(crate::ClientRequest);
$macro!(crate::ConversationSummary);
$macro!(crate::ExecCommandApprovalParams);
$macro!(crate::ExecCommandApprovalResponse);
$macro!(crate::ExecOneOffCommandParams);
$macro!(crate::ExecOneOffCommandResponse);
$macro!(crate::FuzzyFileSearchParams);
$macro!(crate::FuzzyFileSearchResponse);
$macro!(crate::FuzzyFileSearchResult);
$macro!(crate::GetAuthStatusParams);
$macro!(crate::GetAuthStatusResponse);
$macro!(crate::GetUserAgentResponse);
$macro!(crate::GetUserSavedConfigResponse);
$macro!(crate::GitDiffToRemoteParams);
$macro!(crate::GitDiffToRemoteResponse);
$macro!(crate::GitSha);
$macro!(crate::InitializeParams);
$macro!(crate::InitializeResponse);
$macro!(crate::InputItem);
$macro!(crate::InterruptConversationParams);
$macro!(crate::InterruptConversationResponse);
$macro!(crate::ListConversationsParams);
$macro!(crate::ListConversationsResponse);
$macro!(crate::LoginApiKeyParams);
$macro!(crate::LoginApiKeyResponse);
$macro!(crate::LoginChatGptCompleteNotification);
$macro!(crate::LoginChatGptResponse);
$macro!(crate::LogoutChatGptParams);
$macro!(crate::LogoutChatGptResponse);
$macro!(crate::NewConversationParams);
$macro!(crate::NewConversationResponse);
$macro!(crate::Profile);
$macro!(crate::RemoveConversationListenerParams);
$macro!(crate::RemoveConversationSubscriptionResponse);
$macro!(crate::ResumeConversationParams);
$macro!(crate::ResumeConversationResponse);
$macro!(crate::SandboxSettings);
$macro!(crate::SendUserMessageParams);
$macro!(crate::SendUserMessageResponse);
$macro!(crate::SendUserTurnParams);
$macro!(crate::SendUserTurnResponse);
$macro!(crate::ServerNotification);
$macro!(crate::ServerRequest);
$macro!(crate::SessionConfiguredNotification);
$macro!(crate::SetDefaultModelParams);
$macro!(crate::SetDefaultModelResponse);
$macro!(crate::Tools);
$macro!(crate::UserInfoResponse);
$macro!(crate::UserSavedConfig);
$macro!(codex_protocol::protocol::EventMsg);
$macro!(codex_protocol::protocol::FileChange);
$macro!(codex_protocol::parse_command::ParsedCommand);
$macro!(codex_protocol::protocol::SandboxPolicy);
};
}
pub fn generate_types(out_dir: &Path, prettier: Option<&Path>) -> Result<()> {
generate_ts(out_dir, prettier)?;
generate_json(out_dir)?;
@@ -40,9 +113,7 @@ pub fn generate_types(out_dir: &Path, prettier: Option<&Path>) -> Result<()> {
}
pub fn generate_ts(out_dir: &Path, prettier: Option<&Path>) -> Result<()> {
let v2_out_dir = out_dir.join("v2");
ensure_dir(out_dir)?;
ensure_dir(&v2_out_dir)?;
ClientRequest::export_all_to(out_dir)?;
export_client_responses(out_dir)?;
@@ -53,15 +124,12 @@ pub fn generate_ts(out_dir: &Path, prettier: Option<&Path>) -> Result<()> {
ServerNotification::export_all_to(out_dir)?;
generate_index_ts(out_dir)?;
generate_index_ts(&v2_out_dir)?;
// Ensure our header is present on all TS files (root + subdirs like v2/).
let ts_files = ts_files_in_recursive(out_dir)?;
let ts_files = ts_files_in(out_dir)?;
for file in &ts_files {
prepend_header_if_missing(file)?;
}
// Optionally run Prettier on all generated TS files.
if let Some(prettier_bin) = prettier
&& !ts_files.is_empty()
{
@@ -80,57 +148,20 @@ pub fn generate_ts(out_dir: &Path, prettier: Option<&Path>) -> Result<()> {
pub fn generate_json(out_dir: &Path) -> Result<()> {
ensure_dir(out_dir)?;
// Emit schemas for core envelope/wire types that may not be referenced
// by params/responses but are useful as standalone:
let mut bundle: BTreeMap<String, RootSchema> = BTreeMap::new();
let envelope_emitters: &[(&str, JsonSchemaEmitter)] = &[
("RequestId", |d| {
write_json_schema_with_return::<crate::RequestId>(d, "RequestId")
}),
("JSONRPCMessage", |d| {
write_json_schema_with_return::<crate::JSONRPCMessage>(d, "JSONRPCMessage")
}),
("JSONRPCRequest", |d| {
write_json_schema_with_return::<crate::JSONRPCRequest>(d, "JSONRPCRequest")
}),
("JSONRPCNotification", |d| {
write_json_schema_with_return::<crate::JSONRPCNotification>(d, "JSONRPCNotification")
}),
("JSONRPCResponse", |d| {
write_json_schema_with_return::<crate::JSONRPCResponse>(d, "JSONRPCResponse")
}),
("JSONRPCError", |d| {
write_json_schema_with_return::<crate::JSONRPCError>(d, "JSONRPCError")
}),
("JSONRPCErrorError", |d| {
write_json_schema_with_return::<crate::JSONRPCErrorError>(d, "JSONRPCErrorError")
}),
("ClientRequest", |d| {
write_json_schema_with_return::<crate::ClientRequest>(d, "ClientRequest")
}),
("ServerRequest", |d| {
write_json_schema_with_return::<crate::ServerRequest>(d, "ServerRequest")
}),
("ClientNotification", |d| {
write_json_schema_with_return::<crate::ClientNotification>(d, "ClientNotification")
}),
("ServerNotification", |d| {
write_json_schema_with_return::<crate::ServerNotification>(d, "ServerNotification")
}),
];
for (name, emit) in envelope_emitters {
let schema = emit(out_dir)?;
bundle.insert((*name).to_string(), schema);
macro_rules! add_schema {
($ty:path) => {{
let name = type_basename(stringify!($ty));
let schema = write_json_schema_with_return::<$ty>(out_dir, &name)?;
bundle.insert(name, schema);
}};
}
// Have the macros generate per-type JSON for params, responses, and notifications.
export_client_param_schemas(out_dir)?;
for_each_schema_type!(add_schema);
export_client_response_schemas(out_dir)?;
export_server_param_schemas(out_dir)?;
export_server_response_schemas(out_dir)?;
export_client_notification_schemas(out_dir)?;
export_server_notification_schemas(out_dir)?;
let mut definitions = Map::new();
@@ -146,44 +177,10 @@ pub fn generate_json(out_dir: &Path) -> Result<()> {
"ServerRequest",
];
// Merge all generated per-type JSON files (including the envelopes above)
// into a single definitions bundle. Also recognize a v2 namespace so that
// types like RateLimitSnapshot can co-exist between legacy and v2.
let json_files = json_files_in_recursive(out_dir)?;
for path in json_files {
// Skip the bundle were about to (re)generate.
if path.file_name().and_then(OsStr::to_str)
== Some("codex_app_server_protocol.schemas.json")
{
continue;
}
for (name, schema) in bundle {
let mut schema_value = serde_json::to_value(schema)?;
annotate_schema(&mut schema_value, Some(name.as_str()));
let file_stem = path
.file_stem()
.and_then(OsStr::to_str)
.ok_or_else(|| anyhow!(format!("Invalid schema file name {}", path.display())))?;
// Determine namespace and logical type name from either the stem
// (e.g., "v2::Type") or the directory layout (e.g., ".../v2/Type.json").
let (ns_opt, logical_name) = detect_namespace(&path, file_stem);
let mut schema_value: Value = serde_json::from_str(
&fs::read_to_string(&path)
.with_context(|| format!("Failed to read {}", path.display()))?,
)
.with_context(|| format!("Failed to parse JSON from {}", path.display()))?;
// Normalize; use the original stem as the base so existing naming rules apply.
annotate_schema(&mut schema_value, Some(file_stem));
if let Some(ref ns) = ns_opt {
// Rewrite internal $ref targets to point to the namespaced location
// under the bundle's definitions.
rewrite_refs_to_namespace(&mut schema_value, ns);
}
// Pull embedded definitions out and insert into our bundle map at the
// appropriate namespace.
if let Value::Object(ref mut obj) = schema_value
&& let Some(defs) = obj.remove("definitions")
&& let Value::Object(defs_obj) = defs
@@ -191,37 +188,11 @@ pub fn generate_json(out_dir: &Path) -> Result<()> {
for (def_name, mut def_schema) in defs_obj {
if !SPECIAL_DEFINITIONS.contains(&def_name.as_str()) {
annotate_schema(&mut def_schema, Some(def_name.as_str()));
if let Some(ref ns) = ns_opt {
definitions
.entry(ns.to_string())
.or_insert_with(|| Value::Object(Map::new()));
if let Value::Object(defs_ns) = definitions
.get_mut(ns)
.ok_or_else(|| anyhow!("just inserted v2 namespace"))?
{
defs_ns.insert(def_name, def_schema);
}
} else {
definitions.insert(def_name, def_schema);
}
definitions.insert(def_name, def_schema);
}
}
}
// Insert the schema root itself into the bundle under the namespace.
if let Some(ref ns) = ns_opt {
definitions
.entry(ns.to_string())
.or_insert_with(|| Value::Object(Map::new()));
if let Value::Object(defs_ns) = definitions
.get_mut(ns)
.ok_or_else(|| anyhow!("just inserted v2 namespace"))?
{
defs_ns.insert(logical_name.to_string(), schema_value);
}
} else {
definitions.insert(logical_name.to_string(), schema_value);
}
definitions.insert(name, schema_value);
}
let mut root = Map::new();
@@ -252,19 +223,7 @@ where
let schema = schema_for!(T);
let mut schema_value = serde_json::to_value(schema)?;
annotate_schema(&mut schema_value, Some(file_stem));
// If the name looks like a namespaced path (e.g., "v2::Type"), mirror
// the TypeScript layout and write to out_dir/v2/Type.json. Otherwise
// write alongside the legacy files.
let (ns_opt, logical_name) = split_namespace(file_stem);
let out_path = if let Some(ns) = ns_opt {
let dir = out_dir.join(ns);
ensure_dir(&dir)?;
dir.join(format!("{logical_name}.json"))
} else {
out_dir.join(format!("{file_stem}.json"))
};
write_pretty_json(out_path, &schema_value)
write_pretty_json(out_dir.join(format!("{file_stem}.json")), &schema_value)
.with_context(|| format!("Failed to write JSON schema for {file_stem}"))?;
let annotated_schema = serde_json::from_value(schema_value)?;
Ok(annotated_schema)
@@ -283,76 +242,13 @@ fn write_pretty_json(path: PathBuf, value: &impl Serialize) -> Result<()> {
fs::write(&path, json).with_context(|| format!("Failed to write {}", path.display()))?;
Ok(())
}
/// Detect a namespace from a schema file path and stem.
/// Supports both legacy on-disk names like "v2::Type.json" and the new
/// directory layout ".../v2/Type.json".
fn detect_namespace(path: &Path, stem: &str) -> (Option<String>, String) {
// Prefer directory-based detection.
if let Some(parent) = path.parent()
&& parent.file_name().and_then(OsStr::to_str) == Some("v2")
{
return (Some("v2".to_string()), stem.to_string());
}
split_namespace(stem)
}
/// Split a "ns::Type" name into (Some(ns), Type). Returns (None, name) if no ns.
fn split_namespace(name: &str) -> (Option<String>, String) {
if let Some(idx) = name.find("::") {
let (ns, rest) = name.split_at(idx);
// rest starts with "::"
let typ = &rest[2..];
return (Some(ns.to_string()), typ.to_string());
}
(None, name.to_string())
}
/// Recursively rewrite $ref values that point at "#/definitions/..." so that
/// they point to a namespaced location under the bundle.
fn rewrite_refs_to_namespace(value: &mut Value, ns: &str) {
match value {
Value::Object(obj) => {
if let Some(Value::String(r)) = obj.get_mut("$ref")
&& let Some(suffix) = r.strip_prefix("#/definitions/")
{
*r = format!("#/definitions/{ns}/{suffix}");
}
for v in obj.values_mut() {
rewrite_refs_to_namespace(v, ns);
}
}
Value::Array(items) => {
for v in items.iter_mut() {
rewrite_refs_to_namespace(v, ns);
}
}
_ => {}
}
}
fn json_files_in_recursive(dir: &Path) -> Result<Vec<PathBuf>> {
let mut files = Vec::new();
let mut stack = vec![dir.to_path_buf()];
while let Some(d) = stack.pop() {
for entry in
fs::read_dir(&d).with_context(|| format!("Failed to read dir {}", d.display()))?
{
let entry = entry?;
let path = entry.path();
if path.is_dir() {
// Skip known non-protocol directories.
if path.file_name().and_then(OsStr::to_str) == Some("serde_json") {
continue;
}
stack.push(path);
} else if path.is_file() && path.extension() == Some(OsStr::new("json")) {
files.push(path);
}
}
}
files.sort();
Ok(files)
fn type_basename(type_path: &str) -> String {
type_path
.rsplit_once("::")
.map(|(_, name)| name)
.unwrap_or(type_path)
.trim()
.to_string()
}
fn variant_definition_name(base: &str, variant: &Value) -> Option<String> {
@@ -609,26 +505,6 @@ fn ts_files_in(dir: &Path) -> Result<Vec<PathBuf>> {
Ok(files)
}
fn ts_files_in_recursive(dir: &Path) -> Result<Vec<PathBuf>> {
let mut files = Vec::new();
let mut stack = vec![dir.to_path_buf()];
while let Some(d) = stack.pop() {
for entry in
fs::read_dir(&d).with_context(|| format!("Failed to read dir {}", d.display()))?
{
let entry = entry?;
let path = entry.path();
if path.is_dir() {
stack.push(path);
} else if path.is_file() && path.extension() == Some(OsStr::new("ts")) {
files.push(path);
}
}
}
files.sort();
Ok(files)
}
fn generate_index_ts(out_dir: &Path) -> Result<PathBuf> {
let mut entries: Vec<String> = Vec::new();
let mut stems: Vec<String> = ts_files_in(out_dir)?
@@ -645,14 +521,6 @@ fn generate_index_ts(out_dir: &Path) -> Result<PathBuf> {
entries.push(format!("export type {{ {name} }} from \"./{name}\";\n"));
}
// If this is the root out_dir and a ./v2 folder exists with TS files,
// expose it as a namespace to avoid symbol collisions at the root.
let v2_dir = out_dir.join("v2");
let has_v2_ts = ts_files_in(&v2_dir).map(|v| !v.is_empty()).unwrap_or(false);
if has_v2_ts {
entries.push("export * as v2 from \"./v2\";\n".to_string());
}
let mut content =
String::with_capacity(HEADER.len() + entries.iter().map(String::len).sum::<usize>());
content.push_str(HEADER);
@@ -679,7 +547,6 @@ mod tests {
#[test]
fn generated_ts_has_no_optional_nullable_fields() -> Result<()> {
// Assert that there are no types of the form "?: T | null" in the generated TS files.
let output_dir = std::env::temp_dir().join(format!("codex_ts_types_{}", Uuid::now_v7()));
fs::create_dir(&output_dir)?;

View File

@@ -9,6 +9,7 @@ use crate::protocol::v2;
use codex_protocol::ConversationId;
use codex_protocol::parse_command::ParsedCommand;
use codex_protocol::protocol::FileChange;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::ReviewDecision;
use codex_protocol::protocol::SandboxCommandAssessment;
use paste::paste;
@@ -81,71 +82,16 @@ macro_rules! client_request_definitions {
)*
Ok(())
}
pub fn export_client_param_schemas(
out_dir: &::std::path::Path,
) -> ::anyhow::Result<()> {
$(
crate::export::write_json_schema::<$params>(out_dir, stringify!($params))?;
)*
Ok(())
}
};
}
client_request_definitions! {
/// NEW APIs
// Thread lifecycle
#[serde(rename = "thread/start")]
#[ts(rename = "thread/start")]
ThreadStart {
params: v2::ThreadStartParams,
response: v2::ThreadStartResponse,
},
#[serde(rename = "thread/resume")]
#[ts(rename = "thread/resume")]
ThreadResume {
params: v2::ThreadResumeParams,
response: v2::ThreadResumeResponse,
},
#[serde(rename = "thread/archive")]
#[ts(rename = "thread/archive")]
ThreadArchive {
params: v2::ThreadArchiveParams,
response: v2::ThreadArchiveResponse,
},
#[serde(rename = "thread/list")]
#[ts(rename = "thread/list")]
ThreadList {
params: v2::ThreadListParams,
response: v2::ThreadListResponse,
},
#[serde(rename = "thread/compact")]
#[ts(rename = "thread/compact")]
ThreadCompact {
params: v2::ThreadCompactParams,
response: v2::ThreadCompactResponse,
},
// Turn lifecycle
#[serde(rename = "turn/start")]
#[ts(rename = "turn/start")]
TurnStart {
params: v2::TurnStartParams,
response: v2::TurnStartResponse,
},
#[serde(rename = "turn/interrupt")]
#[ts(rename = "turn/interrupt")]
TurnInterrupt {
params: v2::TurnInterruptParams,
response: v2::TurnInterruptResponse,
},
#[serde(rename = "model/list")]
#[ts(rename = "model/list")]
ModelList {
params: v2::ModelListParams,
response: v2::ModelListResponse,
ListModels {
params: v2::ListModelsParams,
response: v2::ListModelsResponse,
},
#[serde(rename = "account/login")]
@@ -171,9 +117,9 @@ client_request_definitions! {
#[serde(rename = "feedback/upload")]
#[ts(rename = "feedback/upload")]
FeedbackUpload {
params: v2::FeedbackUploadParams,
response: v2::FeedbackUploadResponse,
UploadFeedback {
params: v2::UploadFeedbackParams,
response: v2::UploadFeedbackResponse,
},
#[serde(rename = "account/read")]
@@ -338,92 +284,6 @@ macro_rules! server_request_definitions {
}
Ok(())
}
pub fn export_server_param_schemas(
out_dir: &::std::path::Path,
) -> ::anyhow::Result<()> {
paste! {
$(crate::export::write_json_schema::<[<$variant Params>]>(out_dir, stringify!([<$variant Params>]))?;)*
}
Ok(())
}
};
}
/// Generates `ServerNotification` enum and helpers, including a JSON Schema
/// exporter for each notification.
macro_rules! server_notification_definitions {
(
$(
$(#[$variant_meta:meta])*
$variant:ident ( $payload:ty )
),* $(,)?
) => {
/// Notification sent from the server to the client.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS, Display)]
#[serde(tag = "method", content = "params", rename_all = "camelCase")]
#[strum(serialize_all = "camelCase")]
pub enum ServerNotification {
$(
$(#[$variant_meta])*
$variant($payload),
)*
}
impl ServerNotification {
pub fn to_params(self) -> Result<serde_json::Value, serde_json::Error> {
match self {
$(Self::$variant(params) => serde_json::to_value(params),)*
}
}
}
impl TryFrom<JSONRPCNotification> for ServerNotification {
type Error = serde_json::Error;
fn try_from(value: JSONRPCNotification) -> Result<Self, Self::Error> {
serde_json::from_value(serde_json::to_value(value)?)
}
}
pub fn export_server_notification_schemas(
_out_dir: &::std::path::Path,
) -> ::anyhow::Result<()> {
$(
crate::export::write_json_schema::<$payload>(_out_dir, stringify!($payload))?;
)*
Ok(())
}
};
}
/// Notifications sent from the client to the server.
/// Generates `ClientNotification` enum and a JSON Schema exporter.
macro_rules! client_notification_definitions {
(
$(
$(#[$variant_meta:meta])*
$variant:ident $( ( $payload:ty ) )?
),* $(,)?
) => {
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS, Display)]
#[serde(tag = "method", content = "params", rename_all = "camelCase")]
#[strum(serialize_all = "camelCase")]
pub enum ClientNotification {
$(
$(#[$variant_meta])*
$variant $( ( $payload ) )? ,
)*
}
pub fn export_client_notification_schemas(
_out_dir: &::std::path::Path,
) -> ::anyhow::Result<()> {
$(
$(crate::export::write_json_schema::<$payload>(_out_dir, stringify!($payload))?;)?
)*
Ok(())
}
};
}
@@ -506,48 +366,12 @@ pub struct FuzzyFileSearchResponse {
pub files: Vec<FuzzyFileSearchResult>,
}
server_notification_definitions! {
/// Notification sent from the server to the client.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS, Display)]
#[serde(tag = "method", content = "params", rename_all = "camelCase")]
#[strum(serialize_all = "camelCase")]
pub enum ServerNotification {
/// NEW NOTIFICATIONS
#[serde(rename = "thread/started")]
#[ts(rename = "thread/started")]
#[strum(serialize = "thread/started")]
ThreadStarted(v2::ThreadStartedNotification),
#[serde(rename = "turn/started")]
#[ts(rename = "turn/started")]
#[strum(serialize = "turn/started")]
TurnStarted(v2::TurnStartedNotification),
#[serde(rename = "turn/completed")]
#[ts(rename = "turn/completed")]
#[strum(serialize = "turn/completed")]
TurnCompleted(v2::TurnCompletedNotification),
#[serde(rename = "item/started")]
#[ts(rename = "item/started")]
#[strum(serialize = "item/started")]
ItemStarted(v2::ItemStartedNotification),
#[serde(rename = "item/completed")]
#[ts(rename = "item/completed")]
#[strum(serialize = "item/completed")]
ItemCompleted(v2::ItemCompletedNotification),
#[serde(rename = "item/agentMessage/delta")]
#[ts(rename = "item/agentMessage/delta")]
#[strum(serialize = "item/agentMessage/delta")]
AgentMessageDelta(v2::AgentMessageDeltaNotification),
#[serde(rename = "item/commandExecution/outputDelta")]
#[ts(rename = "item/commandExecution/outputDelta")]
#[strum(serialize = "item/commandExecution/outputDelta")]
CommandExecutionOutputDelta(v2::CommandExecutionOutputDeltaNotification),
#[serde(rename = "item/mcpToolCall/progress")]
#[ts(rename = "item/mcpToolCall/progress")]
#[strum(serialize = "item/mcpToolCall/progress")]
McpToolCallProgress(v2::McpToolCallProgressNotification),
#[serde(rename = "account/updated")]
#[ts(rename = "account/updated")]
#[strum(serialize = "account/updated")]
@@ -556,7 +380,7 @@ server_notification_definitions! {
#[serde(rename = "account/rateLimits/updated")]
#[ts(rename = "account/rateLimits/updated")]
#[strum(serialize = "account/rateLimits/updated")]
AccountRateLimitsUpdated(v2::AccountRateLimitsUpdatedNotification),
AccountRateLimitsUpdated(RateLimitSnapshot),
/// DEPRECATED NOTIFICATIONS below
/// Authentication status changed
@@ -569,7 +393,31 @@ server_notification_definitions! {
SessionConfigured(v1::SessionConfiguredNotification),
}
client_notification_definitions! {
impl ServerNotification {
pub fn to_params(self) -> Result<serde_json::Value, serde_json::Error> {
match self {
ServerNotification::AccountUpdated(params) => serde_json::to_value(params),
ServerNotification::AccountRateLimitsUpdated(params) => serde_json::to_value(params),
ServerNotification::AuthStatusChange(params) => serde_json::to_value(params),
ServerNotification::LoginChatGptComplete(params) => serde_json::to_value(params),
ServerNotification::SessionConfigured(params) => serde_json::to_value(params),
}
}
}
impl TryFrom<JSONRPCNotification> for ServerNotification {
type Error = serde_json::Error;
fn try_from(value: JSONRPCNotification) -> Result<Self, Self::Error> {
serde_json::from_value(serde_json::to_value(value)?)
}
}
/// Notification sent from the client to the server.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, TS, Display)]
#[serde(tag = "method", content = "params", rename_all = "camelCase")]
#[strum(serialize_all = "camelCase")]
pub enum ClientNotification {
Initialized,
}
@@ -823,16 +671,16 @@ mod tests {
#[test]
fn serialize_list_models() -> Result<()> {
let request = ClientRequest::ModelList {
let request = ClientRequest::ListModels {
request_id: RequestId::Integer(6),
params: v2::ModelListParams::default(),
params: v2::ListModelsParams::default(),
};
assert_eq!(
json!({
"method": "model/list",
"id": 6,
"params": {
"limit": null,
"pageSize": null,
"cursor": null
}
}),

View File

@@ -1,123 +1,17 @@
use std::collections::HashMap;
use std::path::PathBuf;
use crate::protocol::common::AuthMode;
use codex_protocol::ConversationId;
use codex_protocol::account::PlanType;
use codex_protocol::config_types::ReasoningEffort;
use codex_protocol::config_types::ReasoningSummary;
use mcp_types::ContentBlock as McpContentBlock;
use codex_protocol::protocol::RateLimitSnapshot;
use schemars::JsonSchema;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use ts_rs::TS;
use uuid::Uuid;
// Macro to declare a camelCased API v2 enum mirroring a core enum which
// tends to use kebab-case.
macro_rules! v2_enum_from_core {
(
pub enum $Name:ident from $Src:path { $( $Variant:ident ),+ $(,)? }
) => {
#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub enum $Name { $( $Variant ),+ }
impl $Name {
pub fn to_core(self) -> $Src {
match self { $( $Name::$Variant => <$Src>::$Variant ),+ }
}
}
impl From<$Src> for $Name {
fn from(value: $Src) -> Self {
match value { $( <$Src>::$Variant => $Name::$Variant ),+ }
}
}
};
}
v2_enum_from_core!(
pub enum AskForApproval from codex_protocol::protocol::AskForApproval {
UnlessTrusted, OnFailure, OnRequest, Never
}
);
v2_enum_from_core!(
pub enum SandboxMode from codex_protocol::config_types::SandboxMode {
ReadOnly, WorkspaceWrite, DangerFullAccess
}
);
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)]
#[serde(tag = "mode", rename_all = "camelCase")]
#[ts(tag = "mode")]
#[ts(export_to = "v2/")]
pub enum SandboxPolicy {
DangerFullAccess,
ReadOnly,
WorkspaceWrite {
#[serde(default, skip_serializing_if = "Vec::is_empty")]
writable_roots: Vec<PathBuf>,
#[serde(default)]
network_access: bool,
#[serde(default)]
exclude_tmpdir_env_var: bool,
#[serde(default)]
exclude_slash_tmp: bool,
},
}
impl SandboxPolicy {
pub fn to_core(&self) -> codex_protocol::protocol::SandboxPolicy {
match self {
SandboxPolicy::DangerFullAccess => {
codex_protocol::protocol::SandboxPolicy::DangerFullAccess
}
SandboxPolicy::ReadOnly => codex_protocol::protocol::SandboxPolicy::ReadOnly,
SandboxPolicy::WorkspaceWrite {
writable_roots,
network_access,
exclude_tmpdir_env_var,
exclude_slash_tmp,
} => codex_protocol::protocol::SandboxPolicy::WorkspaceWrite {
writable_roots: writable_roots.clone(),
network_access: *network_access,
exclude_tmpdir_env_var: *exclude_tmpdir_env_var,
exclude_slash_tmp: *exclude_slash_tmp,
},
}
}
}
impl From<codex_protocol::protocol::SandboxPolicy> for SandboxPolicy {
fn from(value: codex_protocol::protocol::SandboxPolicy) -> Self {
match value {
codex_protocol::protocol::SandboxPolicy::DangerFullAccess => {
SandboxPolicy::DangerFullAccess
}
codex_protocol::protocol::SandboxPolicy::ReadOnly => SandboxPolicy::ReadOnly,
codex_protocol::protocol::SandboxPolicy::WorkspaceWrite {
writable_roots,
network_access,
exclude_tmpdir_env_var,
exclude_slash_tmp,
} => SandboxPolicy::WorkspaceWrite {
writable_roots,
network_access,
exclude_tmpdir_env_var,
exclude_slash_tmp,
},
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "camelCase")]
#[ts(tag = "type")]
#[ts(export_to = "v2/")]
pub enum Account {
#[serde(rename = "apiKey", rename_all = "camelCase")]
#[ts(rename = "apiKey", rename_all = "camelCase")]
@@ -134,7 +28,6 @@ pub enum Account {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(tag = "type")]
#[ts(tag = "type")]
#[ts(export_to = "v2/")]
pub enum LoginAccountParams {
#[serde(rename = "apiKey")]
#[ts(rename = "apiKey")]
@@ -150,7 +43,6 @@ pub enum LoginAccountParams {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct LoginAccountResponse {
/// Only set if the login method is ChatGPT.
#[schemars(with = "String")]
@@ -163,36 +55,31 @@ pub struct LoginAccountResponse {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct LogoutAccountResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct GetAccountRateLimitsResponse {
pub rate_limits: RateLimitSnapshot,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct GetAccountResponse {
pub account: Account,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ModelListParams {
pub struct ListModelsParams {
/// Optional page size; defaults to a reasonable server-side value.
pub page_size: Option<usize>,
/// Opaque pagination cursor returned by a previous call.
pub cursor: Option<String>,
/// Optional page size; defaults to a reasonable server-side value.
pub limit: Option<i32>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct Model {
pub id: String,
pub model: String,
@@ -206,7 +93,6 @@ pub struct Model {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ReasoningEffortOption {
pub reasoning_effort: ReasoningEffort,
pub description: String,
@@ -214,9 +100,8 @@ pub struct ReasoningEffortOption {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ModelListResponse {
pub data: Vec<Model>,
pub struct ListModelsResponse {
pub items: Vec<Model>,
/// Opaque cursor to pass to the next call to continue after the last item.
/// if None, there are no more items to return.
pub next_cursor: Option<String>,
@@ -224,8 +109,7 @@ pub struct ModelListResponse {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct FeedbackUploadParams {
pub struct UploadFeedbackParams {
pub classification: String,
pub reason: Option<String>,
pub conversation_id: Option<ConversationId>,
@@ -234,428 +118,10 @@ pub struct FeedbackUploadParams {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct FeedbackUploadResponse {
pub struct UploadFeedbackResponse {
pub thread_id: String,
}
// === Threads, Turns, and Items ===
// Thread APIs
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadStartParams {
pub model: Option<String>,
pub model_provider: Option<String>,
pub cwd: Option<String>,
pub approval_policy: Option<AskForApproval>,
pub sandbox: Option<SandboxMode>,
pub config: Option<HashMap<String, serde_json::Value>>,
pub base_instructions: Option<String>,
pub developer_instructions: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadStartResponse {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadResumeParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadResumeResponse {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadArchiveParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadArchiveResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadListParams {
/// Opaque pagination cursor returned by a previous call.
pub cursor: Option<String>,
/// Optional page size; defaults to a reasonable server-side value.
pub limit: Option<i32>,
/// Optional provider filter; when set, only sessions recorded under these
/// providers are returned. When present but empty, includes all providers.
pub model_providers: Option<Vec<String>>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadListResponse {
pub data: Vec<Thread>,
/// Opaque cursor to pass to the next call to continue after the last item.
/// if None, there are no more items to return.
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactParams {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadCompactResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct Thread {
pub id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct Turn {
pub id: String,
pub items: Vec<ThreadItem>,
pub status: TurnStatus,
pub error: Option<TurnError>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnError {
pub message: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub enum TurnStatus {
Completed,
Interrupted,
Failed,
InProgress,
}
// Turn APIs
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnStartParams {
pub thread_id: String,
pub input: Vec<UserInput>,
/// Override the working directory for this turn and subsequent turns.
pub cwd: Option<PathBuf>,
/// Override the approval policy for this turn and subsequent turns.
pub approval_policy: Option<AskForApproval>,
/// Override the sandbox policy for this turn and subsequent turns.
pub sandbox_policy: Option<SandboxPolicy>,
/// Override the model for this turn and subsequent turns.
pub model: Option<String>,
/// Override the reasoning effort for this turn and subsequent turns.
pub effort: Option<ReasoningEffort>,
/// Override the reasoning summary for this turn and subsequent turns.
pub summary: Option<ReasoningSummary>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnStartResponse {
pub turn: Turn,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnInterruptParams {
pub thread_id: String,
pub turn_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnInterruptResponse {}
// User input types
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "camelCase")]
#[ts(tag = "type")]
#[ts(export_to = "v2/")]
pub enum UserInput {
Text { text: String },
Image { url: String },
LocalImage { path: PathBuf },
}
// Thread items
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(tag = "type", rename_all = "camelCase")]
#[ts(tag = "type")]
#[ts(export_to = "v2/")]
pub enum ThreadItem {
UserMessage {
id: String,
content: Vec<UserInput>,
},
AgentMessage {
id: String,
text: String,
},
Reasoning {
id: String,
text: String,
},
CommandExecution {
id: String,
command: String,
aggregated_output: String,
exit_code: Option<i32>,
status: CommandExecutionStatus,
duration_ms: Option<i64>,
},
FileChange {
id: String,
changes: Vec<FileUpdateChange>,
status: PatchApplyStatus,
},
McpToolCall {
id: String,
server: String,
tool: String,
status: McpToolCallStatus,
arguments: JsonValue,
result: Option<McpToolCallResult>,
error: Option<McpToolCallError>,
},
WebSearch {
id: String,
query: String,
},
TodoList {
id: String,
items: Vec<TodoItem>,
},
ImageView {
id: String,
path: String,
},
CodeReview {
id: String,
review: String,
},
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub enum CommandExecutionStatus {
InProgress,
Completed,
Failed,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct FileUpdateChange {
pub path: String,
pub kind: PatchChangeKind,
pub diff: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub enum PatchChangeKind {
Add,
Delete,
Update,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub enum PatchApplyStatus {
Completed,
Failed,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub enum McpToolCallStatus {
InProgress,
Completed,
Failed,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct McpToolCallResult {
pub content: Vec<McpContentBlock>,
pub structured_content: JsonValue,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct McpToolCallError {
pub message: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TodoItem {
pub id: String,
pub text: String,
pub completed: bool,
}
// === Server Notifications ===
// Thread/Turn lifecycle notifications and item progress events
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadStartedNotification {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnStartedNotification {
pub turn: Turn,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct Usage {
pub input_tokens: i32,
pub cached_input_tokens: i32,
pub output_tokens: i32,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct TurnCompletedNotification {
pub turn: Turn,
// TODO: should usage be stored on the Turn object, and we return that instead?
pub usage: Usage,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ItemStartedNotification {
pub item: ThreadItem,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ItemCompletedNotification {
pub item: ThreadItem,
}
// Item-specific progress notifications
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct AgentMessageDeltaNotification {
pub item_id: String,
pub delta: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct CommandExecutionOutputDeltaNotification {
pub item_id: String,
pub delta: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct McpToolCallProgressNotification {
pub item_id: String,
pub message: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct AccountRateLimitsUpdatedNotification {
pub rate_limits: RateLimitSnapshot,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct RateLimitSnapshot {
pub primary: Option<RateLimitWindow>,
pub secondary: Option<RateLimitWindow>,
}
impl From<codex_protocol::protocol::RateLimitSnapshot> for RateLimitSnapshot {
fn from(value: codex_protocol::protocol::RateLimitSnapshot) -> Self {
Self {
primary: value.primary.map(RateLimitWindow::from),
secondary: value.secondary.map(RateLimitWindow::from),
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct RateLimitWindow {
/// Percentage (0-100) of the window that has been consumed.
pub used_percent: i64,
/// Rolling window duration, in minutes.
#[ts(type = "number | null")]
pub window_duration_mins: Option<i64>,
/// Unix timestamp (seconds since epoch) when the window resets.
#[ts(type = "number | null")]
pub resets_at: Option<i64>,
}
impl From<codex_protocol::protocol::RateLimitWindow> for RateLimitWindow {
fn from(value: codex_protocol::protocol::RateLimitWindow) -> Self {
Self {
used_percent: value.used_percent.round() as i64,
window_duration_mins: value.window_minutes,
resets_at: value.resets_at,
}
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
pub struct AccountUpdatedNotification {

File diff suppressed because it is too large Load Diff

View File

@@ -144,8 +144,8 @@ mod tests {
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::LoginChatGptCompleteNotification;
use codex_app_server_protocol::RateLimitSnapshot;
use codex_app_server_protocol::RateLimitWindow;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
use pretty_assertions::assert_eq;
use serde_json::json;
use uuid::Uuid;
@@ -179,32 +179,26 @@ mod tests {
#[test]
fn verify_account_rate_limits_notification_serialization() {
let notification = ServerNotification::AccountRateLimitsUpdated(
codex_app_server_protocol::AccountRateLimitsUpdatedNotification {
rate_limits: RateLimitSnapshot {
primary: Some(RateLimitWindow {
used_percent: 25,
window_duration_mins: Some(15),
resets_at: Some(123),
}),
secondary: None,
},
},
);
let notification = ServerNotification::AccountRateLimitsUpdated(RateLimitSnapshot {
primary: Some(RateLimitWindow {
used_percent: 25.0,
window_minutes: Some(15),
resets_at: Some(123),
}),
secondary: None,
});
let jsonrpc_notification = OutgoingMessage::AppServerNotification(notification);
assert_eq!(
json!({
"method": "account/rateLimits/updated",
"params": {
"rateLimits": {
"primary": {
"usedPercent": 25,
"windowDurationMins": 15,
"resetsAt": 123
},
"secondary": null
}
"primary": {
"used_percent": 25.0,
"window_minutes": 15,
"resets_at": 123,
},
"secondary": null,
},
}),
serde_json::to_value(jsonrpc_notification)

View File

@@ -22,4 +22,3 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
] }
wiremock = { workspace = true }
uuid = { workspace = true }

View File

@@ -2,7 +2,6 @@ mod auth_fixtures;
mod mcp_process;
mod mock_model_server;
mod responses;
mod rollout;
pub use auth_fixtures::ChatGptAuthFixture;
pub use auth_fixtures::ChatGptIdTokenClaims;
@@ -11,11 +10,9 @@ pub use auth_fixtures::write_chatgpt_auth;
use codex_app_server_protocol::JSONRPCResponse;
pub use mcp_process::McpProcess;
pub use mock_model_server::create_mock_chat_completions_server;
pub use mock_model_server::create_mock_chat_completions_server_unchecked;
pub use responses::create_apply_patch_sse_response;
pub use responses::create_final_assistant_message_sse_response;
pub use responses::create_shell_sse_response;
pub use rollout::create_fake_rollout;
use serde::de::DeserializeOwned;
pub fn to_response<T: DeserializeOwned>(response: JSONRPCResponse) -> anyhow::Result<T> {

View File

@@ -17,13 +17,12 @@ use codex_app_server_protocol::ArchiveConversationParams;
use codex_app_server_protocol::CancelLoginChatGptParams;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::FeedbackUploadParams;
use codex_app_server_protocol::GetAuthStatusParams;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::InterruptConversationParams;
use codex_app_server_protocol::ListConversationsParams;
use codex_app_server_protocol::ListModelsParams;
use codex_app_server_protocol::LoginApiKeyParams;
use codex_app_server_protocol::ModelListParams;
use codex_app_server_protocol::NewConversationParams;
use codex_app_server_protocol::RemoveConversationListenerParams;
use codex_app_server_protocol::ResumeConversationParams;
@@ -31,12 +30,7 @@ use codex_app_server_protocol::SendUserMessageParams;
use codex_app_server_protocol::SendUserTurnParams;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::SetDefaultModelParams;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::TurnInterruptParams as V2TurnInterruptParams;
use codex_app_server_protocol::TurnStartParams as V2TurnStartParams;
use codex_app_server_protocol::UploadFeedbackParams;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
@@ -252,7 +246,7 @@ impl McpProcess {
/// Send a `feedback/upload` JSON-RPC request.
pub async fn send_upload_feedback_request(
&mut self,
params: FeedbackUploadParams,
params: UploadFeedbackParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("feedback/upload", params).await
@@ -284,21 +278,12 @@ impl McpProcess {
/// Send a `model/list` JSON-RPC request.
pub async fn send_list_models_request(
&mut self,
params: ModelListParams,
params: ListModelsParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("model/list", params).await
}
/// Send a `thread/start` JSON-RPC request (v2).
pub async fn send_thread_start_request(
&mut self,
params: ThreadStartParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/start", params).await
}
/// Send a `resumeConversation` JSON-RPC request.
pub async fn send_resume_conversation_request(
&mut self,
@@ -322,51 +307,6 @@ impl McpProcess {
self.send_request("loginChatGpt", None).await
}
/// Send a `thread/archive` JSON-RPC request (v2).
pub async fn send_thread_archive_request(
&mut self,
params: ThreadArchiveParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/archive", params).await
}
/// Send a `thread/list` JSON-RPC request (v2).
pub async fn send_thread_list_request(
&mut self,
params: ThreadListParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/list", params).await
}
/// Send a `thread/resume` JSON-RPC request (v2).
pub async fn send_thread_resume_request(
&mut self,
params: ThreadResumeParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("thread/resume", params).await
}
/// Send a `turn/start` JSON-RPC request (v2).
pub async fn send_turn_start_request(
&mut self,
params: V2TurnStartParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("turn/start", params).await
}
/// Send a `turn/interrupt` JSON-RPC request (v2).
pub async fn send_turn_interrupt_request(
&mut self,
params: V2TurnInterruptParams,
) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("turn/interrupt", params).await
}
/// Send a `cancelLoginChatGpt` JSON-RPC request.
pub async fn send_cancel_login_chat_gpt_request(
&mut self,

View File

@@ -29,27 +29,6 @@ pub async fn create_mock_chat_completions_server(responses: Vec<String>) -> Mock
server
}
/// Same as `create_mock_chat_completions_server` but does not set an expectation
/// on the number of requests. Useful for flows where the exact number of
/// model calls may vary (e.g., when the pipeline conditionally compacts or
/// when the test only needs a responsive endpoint).
pub async fn create_mock_chat_completions_server_unchecked(responses: Vec<String>) -> MockServer {
let server = MockServer::start().await;
let seq_responder = SeqResponder {
num_calls: AtomicUsize::new(0),
responses,
};
Mock::given(method("POST"))
.and(path("/v1/chat/completions"))
.respond_with(seq_responder)
.mount(&server)
.await;
server
}
struct SeqResponder {
num_calls: AtomicUsize,
responses: Vec<String>,

View File

@@ -1,77 +0,0 @@
use anyhow::Result;
use serde_json::json;
use std::fs;
use std::path::Path;
use uuid::Uuid;
/// Create a minimal rollout file under `CODEX_HOME/sessions/YYYY/MM/DD/`.
///
/// - `filename_ts` is the filename timestamp component in `YYYY-MM-DDThh-mm-ss` format.
/// - `meta_rfc3339` is the envelope timestamp used in JSON lines.
/// - `preview` is the user message preview text.
/// - `model_provider` optionally sets the provider in the session meta payload.
///
/// Returns the generated conversation/session UUID as a string.
pub fn create_fake_rollout(
codex_home: &Path,
filename_ts: &str,
meta_rfc3339: &str,
preview: &str,
model_provider: Option<&str>,
) -> Result<String> {
let uuid = Uuid::new_v4();
// sessions/YYYY/MM/DD derived from filename_ts (YYYY-MM-DDThh-mm-ss)
let year = &filename_ts[0..4];
let month = &filename_ts[5..7];
let day = &filename_ts[8..10];
let dir = codex_home.join("sessions").join(year).join(month).join(day);
fs::create_dir_all(&dir)?;
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
// Build JSONL lines
let mut payload = json!({
"id": uuid,
"timestamp": meta_rfc3339,
"cwd": "/",
"originator": "codex",
"cli_version": "0.0.0",
"instructions": null,
});
if let Some(provider) = model_provider {
payload["model_provider"] = json!(provider);
}
let lines = [
json!({
"timestamp": meta_rfc3339,
"type": "session_meta",
"payload": payload
})
.to_string(),
json!({
"timestamp": meta_rfc3339,
"type":"response_item",
"payload": {
"type":"message",
"role":"user",
"content":[{"type":"input_text","text": preview}]
}
})
.to_string(),
json!({
"timestamp": meta_rfc3339,
"type":"event_msg",
"payload": {
"type":"user_message",
"message": preview,
"kind": "plain"
}
})
.to_string(),
];
fs::write(file_path, lines.join("\n") + "\n")?;
Ok(uuid.to_string())
}

View File

@@ -1,6 +1,5 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
@@ -16,8 +15,12 @@ use codex_core::protocol::EventMsg;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::fs;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
@@ -354,3 +357,70 @@ async fn test_list_and_resume_conversations() -> Result<()> {
Ok(())
}
fn create_fake_rollout(
codex_home: &Path,
filename_ts: &str,
meta_rfc3339: &str,
preview: &str,
model_provider: Option<&str>,
) -> Result<()> {
let uuid = Uuid::new_v4();
// sessions/YYYY/MM/DD/ derived from filename_ts (YYYY-MM-DDThh-mm-ss)
let year = &filename_ts[0..4];
let month = &filename_ts[5..7];
let day = &filename_ts[8..10];
let dir = codex_home.join("sessions").join(year).join(month).join(day);
fs::create_dir_all(&dir)?;
let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl"));
let mut lines = Vec::new();
// Meta line with timestamp (flattened meta in payload for new schema)
let mut payload = json!({
"id": uuid,
"timestamp": meta_rfc3339,
"cwd": "/",
"originator": "codex",
"cli_version": "0.0.0",
"instructions": null,
});
if let Some(provider) = model_provider {
payload["model_provider"] = json!(provider);
}
lines.push(
json!({
"timestamp": meta_rfc3339,
"type": "session_meta",
"payload": payload
})
.to_string(),
);
// Minimal user message entry as a persisted response item (with envelope timestamp)
lines.push(
json!({
"timestamp": meta_rfc3339,
"type":"response_item",
"payload": {
"type":"message",
"role":"user",
"content":[{"type":"input_text","text": preview}]
}
})
.to_string(),
);
// Add a matching user message event line to satisfy filters
lines.push(
json!({
"timestamp": meta_rfc3339,
"type":"event_msg",
"payload": {
"type":"user_message",
"message": preview,
"kind": "plain"
}
})
.to_string(),
);
fs::write(file_path, lines.join("\n") + "\n")?;
Ok(())
}

View File

@@ -7,6 +7,8 @@ mod fuzzy_file_search;
mod interrupt;
mod list_resume;
mod login;
mod model_list;
mod rate_limits;
mod send_message;
mod set_default_model;
mod user_agent;

View File

@@ -6,9 +6,9 @@ use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::ListModelsParams;
use codex_app_server_protocol::ListModelsResponse;
use codex_app_server_protocol::Model;
use codex_app_server_protocol::ModelListParams;
use codex_app_server_protocol::ModelListResponse;
use codex_app_server_protocol::ReasoningEffortOption;
use codex_app_server_protocol::RequestId;
use codex_protocol::config_types::ReasoningEffort;
@@ -19,7 +19,7 @@ use tokio::time::timeout;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn list_models_returns_all_models_with_large_limit() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
@@ -27,9 +27,9 @@ async fn list_models_returns_all_models_with_large_limit() -> Result<()> {
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_list_models_request(ModelListParams {
.send_list_models_request(ListModelsParams {
page_size: Some(100),
cursor: None,
limit: Some(100),
})
.await?;
@@ -39,7 +39,7 @@ async fn list_models_returns_all_models_with_large_limit() -> Result<()> {
)
.await??;
let ModelListResponse { data, next_cursor } = to_response::<ModelListResponse>(response)?;
let ListModelsResponse { items, next_cursor } = to_response::<ListModelsResponse>(response)?;
let expected_models = vec![
Model {
@@ -98,12 +98,12 @@ async fn list_models_returns_all_models_with_large_limit() -> Result<()> {
},
];
assert_eq!(data, expected_models);
assert_eq!(items, expected_models);
assert!(next_cursor.is_none());
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn list_models_pagination_works() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
@@ -111,9 +111,9 @@ async fn list_models_pagination_works() -> Result<()> {
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let first_request = mcp
.send_list_models_request(ModelListParams {
.send_list_models_request(ListModelsParams {
page_size: Some(1),
cursor: None,
limit: Some(1),
})
.await?;
@@ -123,19 +123,19 @@ async fn list_models_pagination_works() -> Result<()> {
)
.await??;
let ModelListResponse {
data: first_items,
let ListModelsResponse {
items: first_items,
next_cursor: first_cursor,
} = to_response::<ModelListResponse>(first_response)?;
} = to_response::<ListModelsResponse>(first_response)?;
assert_eq!(first_items.len(), 1);
assert_eq!(first_items[0].id, "gpt-5-codex");
let next_cursor = first_cursor.ok_or_else(|| anyhow!("cursor for second page"))?;
let second_request = mcp
.send_list_models_request(ModelListParams {
.send_list_models_request(ListModelsParams {
page_size: Some(1),
cursor: Some(next_cursor.clone()),
limit: Some(1),
})
.await?;
@@ -145,10 +145,10 @@ async fn list_models_pagination_works() -> Result<()> {
)
.await??;
let ModelListResponse {
data: second_items,
let ListModelsResponse {
items: second_items,
next_cursor: second_cursor,
} = to_response::<ModelListResponse>(second_response)?;
} = to_response::<ListModelsResponse>(second_response)?;
assert_eq!(second_items.len(), 1);
assert_eq!(second_items[0].id, "gpt-5");
@@ -156,7 +156,7 @@ async fn list_models_pagination_works() -> Result<()> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn list_models_rejects_invalid_cursor() -> Result<()> {
let codex_home = TempDir::new()?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
@@ -164,9 +164,9 @@ async fn list_models_rejects_invalid_cursor() -> Result<()> {
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_list_models_request(ModelListParams {
.send_list_models_request(ListModelsParams {
page_size: None,
cursor: Some("invalid".to_string()),
limit: None,
})
.await?;

View File

@@ -7,10 +7,10 @@ use codex_app_server_protocol::GetAccountRateLimitsResponse;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::LoginApiKeyParams;
use codex_app_server_protocol::RateLimitSnapshot;
use codex_app_server_protocol::RateLimitWindow;
use codex_app_server_protocol::RequestId;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_protocol::protocol::RateLimitSnapshot;
use codex_protocol::protocol::RateLimitWindow;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::Path;
@@ -26,7 +26,7 @@ use wiremock::matchers::path;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_account_rate_limits_requires_auth() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -51,7 +51,7 @@ async fn get_account_rate_limits_requires_auth() -> Result<()> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_account_rate_limits_requires_chatgpt_auth() -> Result<()> {
let codex_home = TempDir::new()?;
@@ -78,7 +78,7 @@ async fn get_account_rate_limits_requires_chatgpt_auth() -> Result<()> {
Ok(())
}
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_account_rate_limits_returns_snapshot() -> Result<()> {
let codex_home = TempDir::new()?;
write_chatgpt_auth(
@@ -143,13 +143,13 @@ async fn get_account_rate_limits_returns_snapshot() -> Result<()> {
let expected = GetAccountRateLimitsResponse {
rate_limits: RateLimitSnapshot {
primary: Some(RateLimitWindow {
used_percent: 42,
window_duration_mins: Some(60),
used_percent: 42.0,
window_minutes: Some(60),
resets_at: Some(primary_reset_timestamp),
}),
secondary: Some(RateLimitWindow {
used_percent: 5,
window_duration_mins: Some(1440),
used_percent: 5.0,
window_minutes: Some(1440),
resets_at: Some(secondary_reset_timestamp),
}),
},

View File

@@ -1,9 +1,2 @@
// v2 test suite modules
mod account;
mod model_list;
mod rate_limits;
mod thread_archive;
mod thread_list;
mod thread_resume;
mod thread_start;
mod turn_interrupt;
mod turn_start;

View File

@@ -1,93 +0,0 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_core::find_conversation_path_by_id_str;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_archive_moves_rollout_into_archived_directory() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Start a thread.
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(start_resp)?;
assert!(!thread.id.is_empty());
// Locate the rollout path recorded for this thread id.
let rollout_path = find_conversation_path_by_id_str(codex_home.path(), &thread.id)
.await?
.expect("expected rollout path for thread id to exist");
assert!(
rollout_path.exists(),
"expected {} to exist",
rollout_path.display()
);
// Archive the thread.
let archive_id = mcp
.send_thread_archive_request(ThreadArchiveParams {
thread_id: thread.id.clone(),
})
.await?;
let archive_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
)
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
// Verify file moved.
let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR);
// The archived file keeps the original filename (rollout-...-<id>.jsonl).
let archived_rollout_path =
archived_directory.join(rollout_path.file_name().expect("rollout file name"));
assert!(
!rollout_path.exists(),
"expected rollout path {} to be moved",
rollout_path.display()
);
assert!(
archived_rollout_path.exists(),
"expected archived rollout path {} to exist",
archived_rollout_path.display()
);
Ok(())
}
fn create_config_toml(codex_home: &Path) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(config_toml, config_contents())
}
fn config_contents() -> &'static str {
r#"model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
"#
}

View File

@@ -1,208 +0,0 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use serde_json::json;
use tempfile::TempDir;
use tokio::time::timeout;
use uuid::Uuid;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_list_basic_empty() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// List threads in an empty CODEX_HOME; should return an empty page with nextCursor: null.
let list_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: None,
limit: Some(10),
model_providers: None,
})
.await?;
let list_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let ThreadListResponse { data, next_cursor } = to_response::<ThreadListResponse>(list_resp)?;
assert!(data.is_empty());
assert!(next_cursor.is_none());
Ok(())
}
// Minimal config.toml for listing.
fn create_minimal_config(codex_home: &std::path::Path) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
r#"
model = "mock-model"
approval_policy = "never"
"#,
)
}
#[tokio::test]
async fn thread_list_pagination_next_cursor_none_on_last_page() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
// Create three rollouts so we can paginate with limit=2.
let _a = create_fake_rollout(
codex_home.path(),
"2025-01-02T12-00-00",
"2025-01-02T12:00:00Z",
"Hello",
Some("mock_provider"),
)?;
let _b = create_fake_rollout(
codex_home.path(),
"2025-01-01T13-00-00",
"2025-01-01T13:00:00Z",
"Hello",
Some("mock_provider"),
)?;
let _c = create_fake_rollout(
codex_home.path(),
"2025-01-01T12-00-00",
"2025-01-01T12:00:00Z",
"Hello",
Some("mock_provider"),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Page 1: limit 2 → expect next_cursor Some.
let page1_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: None,
limit: Some(2),
model_providers: Some(vec!["mock_provider".to_string()]),
})
.await?;
let page1_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(page1_id)),
)
.await??;
let ThreadListResponse {
data: data1,
next_cursor: cursor1,
} = to_response::<ThreadListResponse>(page1_resp)?;
assert_eq!(data1.len(), 2);
let cursor1 = cursor1.expect("expected nextCursor on first page");
// Page 2: with cursor → expect next_cursor None when no more results.
let page2_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: Some(cursor1),
limit: Some(2),
model_providers: Some(vec!["mock_provider".to_string()]),
})
.await?;
let page2_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(page2_id)),
)
.await??;
let ThreadListResponse {
data: data2,
next_cursor: cursor2,
} = to_response::<ThreadListResponse>(page2_resp)?;
assert!(data2.len() <= 2);
assert!(
cursor2.is_none(),
"expected nextCursor to be null on last page"
);
Ok(())
}
#[tokio::test]
async fn thread_list_respects_provider_filter() -> Result<()> {
let codex_home = TempDir::new()?;
create_minimal_config(codex_home.path())?;
// Create rollouts under two providers.
let _a = create_fake_rollout(
codex_home.path(),
"2025-01-02T10-00-00",
"2025-01-02T10:00:00Z",
"X",
Some("mock_provider"),
)?; // mock_provider
// one with a different provider
let uuid = Uuid::new_v4();
let dir = codex_home
.path()
.join("sessions")
.join("2025")
.join("01")
.join("02");
std::fs::create_dir_all(&dir)?;
let file_path = dir.join(format!("rollout-2025-01-02T11-00-00-{uuid}.jsonl"));
let lines = [
json!({
"timestamp": "2025-01-02T11:00:00Z",
"type": "session_meta",
"payload": {
"id": uuid,
"timestamp": "2025-01-02T11:00:00Z",
"cwd": "/",
"originator": "codex",
"cli_version": "0.0.0",
"instructions": null,
"source": "vscode",
"model_provider": "other_provider"
}
})
.to_string(),
json!({
"timestamp": "2025-01-02T11:00:00Z",
"type":"response_item",
"payload": {"type":"message","role":"user","content":[{"type":"input_text","text":"X"}]}
})
.to_string(),
json!({
"timestamp": "2025-01-02T11:00:00Z",
"type":"event_msg",
"payload": {"type":"user_message","message":"X","kind":"plain"}
})
.to_string(),
];
std::fs::write(file_path, lines.join("\n") + "\n")?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Filter to only other_provider; expect 1 item, nextCursor None.
let list_id = mcp
.send_thread_list_request(ThreadListParams {
cursor: None,
limit: Some(10),
model_providers: Some(vec!["other_provider".to_string()]),
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(list_id)),
)
.await??;
let ThreadListResponse { data, next_cursor } = to_response::<ThreadListResponse>(resp)?;
assert_eq!(data.len(), 1);
assert!(next_cursor.is_none());
Ok(())
}

View File

@@ -1,79 +0,0 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_resume_returns_existing_thread() -> Result<()> {
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Start a thread.
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("gpt-5-codex".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(start_resp)?;
// Resume it via v2 API.
let resume_id = mcp
.send_thread_resume_request(ThreadResumeParams {
thread_id: thread.id.clone(),
})
.await?;
let resume_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(resume_id)),
)
.await??;
let ThreadResumeResponse { thread: resumed } =
to_response::<ThreadResumeResponse>(resume_resp)?;
assert_eq!(resumed.id, thread.id);
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "workspace-write"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -1,82 +0,0 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
// Provide a mock server and config so model wiring is valid.
let server = create_mock_chat_completions_server(vec![]).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
// Start server and initialize.
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Start a v2 thread with an explicit model override.
let req_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("o3".to_string()),
..Default::default()
})
.await?;
// Expect a proper JSON-RPC response with a thread id.
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(resp)?;
assert!(!thread.id.is_empty(), "thread id should not be empty");
// A corresponding thread/started notification should arrive.
let notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/started"),
)
.await??;
let started: ThreadStartedNotification =
serde_json::from_value(notif.params.expect("params must be present"))?;
assert_eq!(started.thread.id, thread.id);
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "workspace-write"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -1,132 +0,0 @@
#![cfg(unix)]
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::create_shell_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn turn_interrupt_aborts_running_turn() -> Result<()> {
// Use a portable sleep command to keep the turn running.
#[cfg(target_os = "windows")]
let shell_command = vec![
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 10".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "10".to_string()];
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let working_directory = tmp.path().join("workdir");
std::fs::create_dir(&working_directory)?;
// Mock server: long-running shell command then (after abort) nothing else needed.
let server = create_mock_chat_completions_server(vec![create_shell_sse_response(
shell_command.clone(),
Some(&working_directory),
Some(10_000),
"call_sleep",
)?])
.await;
create_config_toml(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Start a v2 thread and capture its id.
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(thread_resp)?;
// Start a turn that triggers a long-running command.
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "run sleep".to_string(),
}],
cwd: Some(working_directory.clone()),
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
// Give the command a brief moment to start.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Interrupt the in-progress turn by id (v2 API).
let interrupt_id = mcp
.send_turn_interrupt_request(TurnInterruptParams {
thread_id: thread.id,
turn_id: turn.id,
})
.await?;
let interrupt_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(interrupt_id)),
)
.await??;
let _resp: TurnInterruptResponse = to_response::<TurnInterruptResponse>(interrupt_resp)?;
// No fields to assert on; successful deserialization confirms proper response shape.
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "workspace-write"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -1,529 +0,0 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_chat_completions_server;
use app_test_support::create_mock_chat_completions_server_unchecked;
use app_test_support::create_shell_sse_response;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStartedNotification;
use codex_app_server_protocol::UserInput as V2UserInput;
use codex_core::protocol_config_types::ReasoningEffort;
use codex_core::protocol_config_types::ReasoningSummary;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_protocol::parse_command::ParsedCommand;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
use std::env;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
#[tokio::test]
async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<()> {
// Provide a mock server and config so model wiring is valid.
// Three Codex turns hit the mock model (session start + two turn/start calls).
let responses = vec![
create_final_assistant_message_sse_response("Done")?,
create_final_assistant_message_sse_response("Done")?,
create_final_assistant_message_sse_response("Done")?,
];
let server = create_mock_chat_completions_server_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// Start a thread (v2) and capture its id.
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(thread_resp)?;
// Start a turn with only input and thread_id set (no overrides).
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
}],
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
assert!(!turn.id.is_empty());
// Expect a turn/started notification.
let notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
let started: TurnStartedNotification =
serde_json::from_value(notif.params.expect("params must be present"))?;
assert_eq!(
started.turn.status,
codex_app_server_protocol::TurnStatus::InProgress
);
// Send a second turn that exercises the overrides path: change the model.
let turn_req2 = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "Second".to_string(),
}],
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: Some("mock-model-override".to_string()),
effort: None,
summary: None,
})
.await?;
let turn_resp2: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req2)),
)
.await??;
let TurnStartResponse { turn: turn2 } = to_response::<TurnStartResponse>(turn_resp2)?;
assert!(!turn2.id.is_empty());
// Ensure the second turn has a different id than the first.
assert_ne!(turn.id, turn2.id);
// Expect a second turn/started notification as well.
let _notif2: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
// And we should ultimately get a task_complete without having to add a
// legacy conversation listener explicitly (auto-attached by thread/start).
let _task_complete: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await??;
Ok(())
}
#[tokio::test]
async fn turn_start_accepts_local_image_input() -> Result<()> {
// Two Codex turns hit the mock model (session start + turn/start).
let responses = vec![
create_final_assistant_message_sse_response("Done")?,
create_final_assistant_message_sse_response("Done")?,
];
// Use the unchecked variant because the request payload includes a LocalImage
// which the strict matcher does not currently cover.
let server = create_mock_chat_completions_server_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(thread_resp)?;
let image_path = codex_home.path().join("image.png");
// No need to actually write the file; we just exercise the input path.
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::LocalImage { path: image_path }],
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn } = to_response::<TurnStartResponse>(turn_resp)?;
assert!(!turn.id.is_empty());
// This test only validates that turn/start responds and returns a turn.
Ok(())
}
#[tokio::test]
async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() {
println!("Skipping v2 exec approval toggle test due to sandbox network disabled.");
return Ok(());
}
let tmp = TempDir::new()?;
let codex_home = tmp.path().to_path_buf();
// Mock server: first turn requests a shell call (elicitation), then completes.
// Second turn same, but we'll set approval_policy=never to avoid elicitation.
let responses = vec![
create_shell_sse_response(
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
],
None,
Some(5000),
"call1",
)?,
create_final_assistant_message_sse_response("done 1")?,
create_shell_sse_response(
vec![
"python3".to_string(),
"-c".to_string(),
"print(42)".to_string(),
],
None,
Some(5000),
"call2",
)?,
create_final_assistant_message_sse_response("done 2")?,
];
let server = create_mock_chat_completions_server(responses).await;
// Default approval is untrusted to force elicitation on first turn.
create_config_toml_untrusted(codex_home.as_path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.as_path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// thread/start
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(start_resp)?;
// turn/start — expect ExecCommandApproval request from server
let first_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "run python".to_string(),
}],
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: None,
summary: None,
})
.await?;
// Acknowledge RPC
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(first_turn_id)),
)
.await??;
// Receive elicitation
let server_req = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_request_message(),
)
.await??;
let ServerRequest::ExecCommandApproval { request_id, params } = server_req else {
panic!("expected ExecCommandApproval request");
};
assert_eq!(params.call_id, "call1");
assert_eq!(
params.parsed_cmd,
vec![ParsedCommand::Unknown {
cmd: "python3 -c 'print(42)'".to_string()
}]
);
// Approve and wait for task completion
mcp.send_response(
request_id,
serde_json::json!({ "decision": codex_core::protocol::ReviewDecision::Approved }),
)
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await??;
// Second turn with approval_policy=never should not elicit approval
let second_turn_id = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "run python again".to_string(),
}],
cwd: None,
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
model: Some("mock-model".to_string()),
effort: Some(ReasoningEffort::Medium),
summary: Some(ReasoningSummary::Auto),
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(second_turn_id)),
)
.await??;
// Ensure we do NOT receive an ExecCommandApproval request before task completes
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await??;
Ok(())
}
#[tokio::test]
async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
// When returning Result from a test, pass an Ok(()) to the skip macro
// so the early return type matches. The no-arg form returns unit.
skip_if_no_network!(Ok(()));
let tmp = TempDir::new()?;
let codex_home = tmp.path().join("codex_home");
std::fs::create_dir(&codex_home)?;
let workspace_root = tmp.path().join("workspace");
std::fs::create_dir(&workspace_root)?;
let first_cwd = workspace_root.join("turn1");
let second_cwd = workspace_root.join("turn2");
std::fs::create_dir(&first_cwd)?;
std::fs::create_dir(&second_cwd)?;
let responses = vec![
create_shell_sse_response(
vec![
"bash".to_string(),
"-lc".to_string(),
"echo first turn".to_string(),
],
None,
Some(5000),
"call-first",
)?,
create_final_assistant_message_sse_response("done first")?,
create_shell_sse_response(
vec![
"bash".to_string(),
"-lc".to_string(),
"echo second turn".to_string(),
],
None,
Some(5000),
"call-second",
)?,
create_final_assistant_message_sse_response("done second")?,
];
let server = create_mock_chat_completions_server(responses).await;
create_config_toml_untrusted(&codex_home, &server.uri())?;
let mut mcp = McpProcess::new(&codex_home).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
// thread/start
let start_id = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let start_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(start_id)),
)
.await??;
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(start_resp)?;
// first turn with workspace-write sandbox and first_cwd
let first_turn = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "first turn".to_string(),
}],
cwd: Some(first_cwd.clone()),
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::WorkspaceWrite {
writable_roots: vec![first_cwd.clone()],
network_access: false,
exclude_tmpdir_env_var: false,
exclude_slash_tmp: false,
}),
model: Some("mock-model".to_string()),
effort: Some(ReasoningEffort::Medium),
summary: Some(ReasoningSummary::Auto),
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(first_turn)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await??;
// second turn with workspace-write and second_cwd, ensure exec begins in second_cwd
let second_turn = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
input: vec![V2UserInput::Text {
text: "second turn".to_string(),
}],
cwd: Some(second_cwd.clone()),
approval_policy: Some(codex_app_server_protocol::AskForApproval::Never),
sandbox_policy: Some(codex_app_server_protocol::SandboxPolicy::DangerFullAccess),
model: Some("mock-model".to_string()),
effort: Some(ReasoningEffort::Medium),
summary: Some(ReasoningSummary::Auto),
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(second_turn)),
)
.await??;
let exec_begin_notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/exec_command_begin"),
)
.await??;
let params = exec_begin_notification
.params
.clone()
.expect("exec_command_begin params");
let event: Event = serde_json::from_value(params).expect("deserialize exec begin event");
let exec_begin = match event.msg {
EventMsg::ExecCommandBegin(exec_begin) => exec_begin,
other => panic!("expected ExecCommandBegin event, got {other:?}"),
};
assert_eq!(exec_begin.cwd, second_cwd);
assert_eq!(
exec_begin.command,
vec![
"bash".to_string(),
"-lc".to_string(),
"echo second turn".to_string()
]
);
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("codex/event/task_complete"),
)
.await??;
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "workspace-write"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}
fn create_config_toml_untrusted(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "untrusted"
sandbox_mode = "workspace-write"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "chat"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -15,6 +15,13 @@ use uuid::Uuid;
use crate::user_instructions::UserInstructions;
fn is_user_shell_command_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
lowered.starts_with("<user_shell_command>")
|| lowered.starts_with("<user_shell_command_output>")
}
fn is_session_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
@@ -31,7 +38,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
for content_item in message.iter() {
match content_item {
ContentItem::InputText { text } => {
if is_session_prefix(text) {
if is_session_prefix(text) || is_user_shell_command_prefix(text) {
return None;
}
content.push(UserInput::Text { text: text.clone() });
@@ -42,7 +49,7 @@ fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
});
}
ContentItem::OutputText { text } => {
if is_session_prefix(text) {
if is_session_prefix(text) || is_user_shell_command_prefix(text) {
return None;
}
warn!("Output text in user message: {}", text);
@@ -197,6 +204,21 @@ mod tests {
text: "# AGENTS.md instructions for test_directory\n\n<INSTRUCTIONS>\ntest_text\n</INSTRUCTIONS>".to_string(),
}],
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "<user_shell_command>echo 42</user_shell_command>".to_string(),
}],
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "<user_shell_command_output>done</user_shell_command_output>"
.to_string(),
}],
},
];
for item in items {

View File

@@ -6,6 +6,7 @@ use std::io;
use std::path::Path;
use std::path::PathBuf;
use std::process::ExitStatus;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
@@ -23,6 +24,7 @@ use crate::protocol::EventMsg;
use crate::protocol::ExecCommandOutputDeltaEvent;
use crate::protocol::ExecOutputStream;
use crate::protocol::SandboxPolicy;
use crate::protocol::UserCommandOutputDeltaEvent;
use crate::sandboxing::CommandSpec;
use crate::sandboxing::ExecEnv;
use crate::sandboxing::SandboxManager;
@@ -84,6 +86,43 @@ pub struct StdoutStream {
pub tx_event: Sender<Event>,
}
type DeltaEventFn = dyn Fn(&str, ExecOutputStream, Vec<u8>) -> EventMsg + Send + Sync;
#[derive(Clone)]
pub struct DeltaEventBuilder {
inner: Arc<DeltaEventFn>,
}
impl DeltaEventBuilder {
pub fn exec_command() -> Self {
Self {
inner: Arc::new(|call_id, stream, chunk| {
EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id: call_id.to_string(),
stream,
chunk,
})
}),
}
}
pub fn user_command() -> Self {
Self {
inner: Arc::new(|call_id, stream, chunk| {
EventMsg::UserCommandOutputDelta(UserCommandOutputDeltaEvent {
call_id: call_id.to_string(),
stream,
chunk,
})
}),
}
}
pub fn build(&self, call_id: &str, stream: ExecOutputStream, chunk: Vec<u8>) -> EventMsg {
(self.inner)(call_id, stream, chunk)
}
}
pub async fn process_exec_tool_call(
params: ExecParams,
sandbox_type: SandboxType,
@@ -138,6 +177,7 @@ pub(crate) async fn execute_exec_env(
env: ExecEnv,
sandbox_policy: &SandboxPolicy,
stdout_stream: Option<StdoutStream>,
delta_event_builder: Option<DeltaEventBuilder>,
) -> Result<ExecToolCallOutput> {
let ExecEnv {
command,
@@ -161,7 +201,15 @@ pub(crate) async fn execute_exec_env(
};
let start = Instant::now();
let raw_output_result = exec(params, sandbox, sandbox_policy, stdout_stream).await;
let delta_event_builder = delta_event_builder.unwrap_or_else(DeltaEventBuilder::exec_command);
let raw_output_result = exec(
params,
sandbox,
sandbox_policy,
stdout_stream,
delta_event_builder.clone(),
)
.await;
let duration = start.elapsed();
finalize_exec_result(raw_output_result, sandbox, duration)
}
@@ -434,6 +482,7 @@ async fn exec(
sandbox: SandboxType,
sandbox_policy: &SandboxPolicy,
stdout_stream: Option<StdoutStream>,
delta_event_builder: DeltaEventBuilder,
) -> Result<RawExecToolCallOutput> {
#[cfg(target_os = "windows")]
if sandbox == SandboxType::WindowsRestrictedToken {
@@ -465,7 +514,7 @@ async fn exec(
env,
)
.await?;
consume_truncated_output(child, timeout, stdout_stream).await
consume_truncated_output(child, timeout, stdout_stream, delta_event_builder).await
}
/// Consumes the output of a child process, truncating it so it is suitable for
@@ -474,6 +523,7 @@ async fn consume_truncated_output(
mut child: Child,
timeout: Duration,
stdout_stream: Option<StdoutStream>,
delta_event_builder: DeltaEventBuilder,
) -> Result<RawExecToolCallOutput> {
// Both stdout and stderr were configured with `Stdio::piped()`
// above, therefore `take()` should normally return `Some`. If it doesn't
@@ -497,12 +547,14 @@ async fn consume_truncated_output(
stdout_stream.clone(),
false,
Some(agg_tx.clone()),
delta_event_builder.clone(),
));
let stderr_handle = tokio::spawn(read_capped(
BufReader::new(stderr_reader),
stdout_stream.clone(),
true,
Some(agg_tx.clone()),
delta_event_builder.clone(),
));
let (exit_status, timed_out) = tokio::select! {
@@ -554,6 +606,7 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
stream: Option<StdoutStream>,
is_stderr: bool,
aggregate_tx: Option<Sender<Vec<u8>>>,
delta_event_builder: DeltaEventBuilder,
) -> io::Result<StreamOutput<Vec<u8>>> {
let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY);
let mut tmp = [0u8; READ_CHUNK_SIZE];
@@ -571,15 +624,15 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
&& emitted_deltas < MAX_EXEC_OUTPUT_DELTAS_PER_CALL
{
let chunk = tmp[..n].to_vec();
let msg = EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id: stream.call_id.clone(),
stream: if is_stderr {
let msg = delta_event_builder.build(
&stream.call_id,
if is_stderr {
ExecOutputStream::Stderr
} else {
ExecOutputStream::Stdout
},
chunk,
});
);
let event = Event {
id: stream.sub_id.clone(),
msg,

View File

@@ -41,6 +41,9 @@ pub(crate) fn should_persist_event_msg(ev: &EventMsg) -> bool {
| EventMsg::AgentReasoning(_)
| EventMsg::AgentReasoningRawContent(_)
| EventMsg::TokenCount(_)
| EventMsg::UserCommandBegin(_)
| EventMsg::UserCommandOutputDelta(_)
| EventMsg::UserCommandEnd(_)
| EventMsg::EnteredReviewMode(_)
| EventMsg::ExitedReviewMode(_)
| EventMsg::UndoCompleted(_)

View File

@@ -165,5 +165,5 @@ pub async fn execute_env(
policy: &SandboxPolicy,
stdout_stream: Option<StdoutStream>,
) -> crate::error::Result<ExecToolCallOutput> {
execute_exec_env(env.clone(), policy, stdout_stream).await
execute_exec_env(env.clone(), policy, stdout_stream, None).await
}

View File

@@ -1,28 +1,36 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use codex_protocol::models::ShellToolCallParams;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use tracing::error;
use uuid::Uuid;
use crate::codex::TurnContext;
use crate::exec::DeltaEventBuilder;
use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StdoutStream;
use crate::exec::StreamOutput;
use crate::exec::execute_exec_env;
use crate::exec_env::create_env;
use crate::parse_command::parse_command;
use crate::protocol::EventMsg;
use crate::protocol::SandboxPolicy;
use crate::protocol::TaskStartedEvent;
use crate::protocol::UserCommandBeginEvent;
use crate::protocol::UserCommandEndEvent;
use crate::sandboxing::ExecEnv;
use crate::state::TaskKind;
use crate::tools::context::ToolPayload;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::router::ToolCall;
use crate::tools::router::ToolRouter;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::tools::format_exec_output_for_model;
use crate::tools::format_exec_output_str;
use super::SessionTask;
use super::SessionTaskContext;
const USER_SHELL_TOOL_NAME: &str = "local_shell";
#[derive(Clone)]
pub(crate) struct UserShellCommandTask {
command: String,
@@ -78,34 +86,150 @@ impl SessionTask for UserShellCommandTask {
}
};
let params = ShellToolCallParams {
fn build_user_message(text: String) -> ResponseItem {
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText { text }],
}
}
let call_id = Uuid::new_v4().to_string();
let raw_command = self.command.clone();
let command_text = format!("<user_shell_command>\n{raw_command}\n</user_shell_command>");
let command_items = [build_user_message(command_text)];
session
.record_conversation_items(turn_context.as_ref(), &command_items)
.await;
let parsed_cmd = parse_command(&shell_invocation);
session
.send_event(
turn_context.as_ref(),
EventMsg::UserCommandBegin(UserCommandBeginEvent {
call_id: call_id.clone(),
command: shell_invocation.clone(),
cwd: turn_context.cwd.clone(),
parsed_cmd,
}),
)
.await;
let exec_env = ExecEnv {
command: shell_invocation,
workdir: None,
cwd: turn_context.cwd.clone(),
env: create_env(&turn_context.shell_environment_policy),
timeout_ms: None,
sandbox: SandboxType::None,
with_escalated_permissions: None,
justification: None,
arg0: None,
};
let tool_call = ToolCall {
tool_name: USER_SHELL_TOOL_NAME.to_string(),
call_id: Uuid::new_v4().to_string(),
payload: ToolPayload::LocalShell { params },
};
let stdout_stream = Some(StdoutStream {
sub_id: turn_context.sub_id.clone(),
call_id: call_id.clone(),
tx_event: session.get_tx_event(),
});
let router = Arc::new(ToolRouter::from_config(&turn_context.tools_config, None));
let tracker = Arc::new(Mutex::new(TurnDiffTracker::new()));
let runtime = ToolCallRuntime::new(
Arc::clone(&router),
Arc::clone(&session),
Arc::clone(&turn_context),
Arc::clone(&tracker),
let sandbox_policy = SandboxPolicy::DangerFullAccess;
let exec_future = execute_exec_env(
exec_env,
&sandbox_policy,
stdout_stream,
Some(DeltaEventBuilder::user_command()),
);
tokio::pin!(exec_future);
if let Err(err) = runtime
.handle_tool_call(tool_call, cancellation_token)
.await
{
error!("user shell command failed: {err:?}");
let exec_result = tokio::select! {
res = &mut exec_future => Some(res),
_ = cancellation_token.cancelled() => None,
};
match exec_result {
None => {
let aborted_message = "command aborted by user".to_string();
let aborted_text = format!(
"<user_shell_command_output>\n{aborted_message}\n</user_shell_command_output>"
);
let output_items = [build_user_message(aborted_text)];
session
.record_conversation_items(turn_context.as_ref(), &output_items)
.await;
session
.send_event(
turn_context.as_ref(),
EventMsg::UserCommandEnd(UserCommandEndEvent {
call_id,
stdout: String::new(),
stderr: aborted_message.clone(),
aggregated_output: aborted_message.clone(),
exit_code: -1,
duration: Duration::ZERO,
formatted_output: aborted_message,
}),
)
.await;
}
Some(Ok(output)) => {
session
.send_event(
turn_context.as_ref(),
EventMsg::UserCommandEnd(UserCommandEndEvent {
call_id: call_id.clone(),
stdout: output.stdout.text.clone(),
stderr: output.stderr.text.clone(),
aggregated_output: output.aggregated_output.text.clone(),
exit_code: output.exit_code,
duration: output.duration,
formatted_output: format_exec_output_str(&output),
}),
)
.await;
let output_payload = format_exec_output_for_model(&output);
let output_text = format!(
"<user_shell_command_output>\n{output_payload}\n</user_shell_command_output>"
);
let output_items = [build_user_message(output_text)];
session
.record_conversation_items(turn_context.as_ref(), &output_items)
.await;
}
Some(Err(err)) => {
error!("user shell command failed: {err:?}");
let message = format!("execution error: {err:?}");
let exec_output = ExecToolCallOutput {
exit_code: -1,
stdout: StreamOutput::new(String::new()),
stderr: StreamOutput::new(message.clone()),
aggregated_output: StreamOutput::new(message.clone()),
duration: Duration::ZERO,
timed_out: false,
};
session
.send_event(
turn_context.as_ref(),
EventMsg::UserCommandEnd(UserCommandEndEvent {
call_id,
stdout: exec_output.stdout.text.clone(),
stderr: exec_output.stderr.text.clone(),
aggregated_output: exec_output.aggregated_output.text.clone(),
exit_code: exec_output.exit_code,
duration: exec_output.duration,
formatted_output: format_exec_output_str(&exec_output),
}),
)
.await;
let output_payload = format_exec_output_for_model(&exec_output);
let output_text = format!(
"<user_shell_command_output>\n{output_payload}\n</user_shell_command_output>"
);
let output_items = [build_user_message(output_text)];
session
.record_conversation_items(turn_context.as_ref(), &output_items)
.await;
}
}
None
}

View File

@@ -1,11 +1,14 @@
use codex_core::ConversationManager;
use codex_core::NewConversation;
use codex_core::protocol::EventMsg;
use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::ExecOutputStream;
use codex_core::protocol::Op;
use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::UserCommandEndEvent;
use core_test_support::load_default_config_for_test;
use core_test_support::responses;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use std::path::PathBuf;
use std::process::Command;
use std::process::Stdio;
@@ -63,8 +66,8 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
.submit(Op::RunUserShellCommand { command: list_cmd })
.await
.unwrap();
let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandEnd(_))).await;
let EventMsg::ExecCommandEnd(ExecCommandEndEvent {
let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::UserCommandEnd(_))).await;
let EventMsg::UserCommandEnd(UserCommandEndEvent {
stdout, exit_code, ..
}) = msg
else {
@@ -84,8 +87,8 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
.submit(Op::RunUserShellCommand { command: cat_cmd })
.await
.unwrap();
let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandEnd(_))).await;
let EventMsg::ExecCommandEnd(ExecCommandEndEvent {
let msg = wait_for_event(&codex, |ev| matches!(ev, EventMsg::UserCommandEnd(_))).await;
let EventMsg::UserCommandEnd(UserCommandEndEvent {
mut stdout,
exit_code,
..
@@ -128,7 +131,7 @@ async fn user_shell_cmd_can_be_interrupted() {
.unwrap();
// Wait until it has started (ExecCommandBegin), then interrupt.
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await;
let _ = wait_for_event(&codex, |ev| matches!(ev, EventMsg::UserCommandBegin(_))).await;
codex.submit(Op::Interrupt).await.unwrap();
// Expect a TurnAborted(Interrupted) notification.
@@ -138,3 +141,138 @@ async fn user_shell_cmd_can_be_interrupted() {
};
assert_eq!(ev.reason, TurnAbortReason::Interrupted);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn user_shell_command_history_is_persisted_and_shared_with_model() -> anyhow::Result<()> {
let Some(python) = detect_python_executable() else {
eprintln!("skipping test: python3 not found in PATH");
return Ok(());
};
let server = responses::start_mock_server().await;
let mut builder = core_test_support::test_codex::test_codex();
let test = builder.build(&server).await?;
let script_path = test.workspace_path("print_sandbox_env.py");
tokio::fs::write(
&script_path,
"import os\nimport sys\nvalue = os.environ.get('CODEX_SANDBOX', 'not-set')\n\
sys.stdout.write(value)\n",
)
.await?;
let script_display = script_path.display().to_string();
let script_arg = if script_display.contains(' ') {
format!("\"{script_display}\"")
} else {
script_display
};
let command = format!("{python} {script_arg}");
test.codex
.submit(Op::RunUserShellCommand {
command: command.clone(),
})
.await?;
let begin_event = wait_for_event_match(&test.codex, |ev| match ev {
EventMsg::UserCommandBegin(event) => Some(event.clone()),
_ => None,
})
.await;
if begin_event.command.last() != Some(&command) {
let expected_tokens = shlex::split(&command).unwrap_or_else(|| vec![command.clone()]);
assert_eq!(
begin_event.command, expected_tokens,
"user command begin event did not include expected command tokens.\nexpected={expected_tokens:?}\nactual={:?}",
begin_event.command
);
}
let delta_event = wait_for_event_match(&test.codex, |ev| match ev {
EventMsg::UserCommandOutputDelta(event) => Some(event.clone()),
_ => None,
})
.await;
assert_eq!(delta_event.stream, ExecOutputStream::Stdout);
let chunk_text =
String::from_utf8(delta_event.chunk.clone()).expect("user command chunk is valid utf-8");
assert_eq!(chunk_text.trim(), "not-set");
let end_event = wait_for_event_match(&test.codex, |ev| match ev {
EventMsg::UserCommandEnd(event) => Some(event.clone()),
_ => None,
})
.await;
assert_eq!(end_event.exit_code, 0);
assert_eq!(end_event.stdout.trim(), "not-set");
let _ = wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
let responses = vec![responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_assistant_message("msg-1", "done"),
responses::ev_completed("resp-1"),
])];
let mock = responses::mount_sse_sequence(&server, responses).await;
test.submit_turn("follow-up after shell command").await?;
let request = mock.single_request();
let items = request.input();
fn find_user_text(items: &[serde_json::Value], marker: &str) -> Option<String> {
items.iter().find_map(|item| {
if item.get("type").and_then(serde_json::Value::as_str) != Some("message") {
return None;
}
if item.get("role").and_then(serde_json::Value::as_str) != Some("user") {
return None;
}
let content = item.get("content")?.as_array()?;
content.iter().find_map(|span| {
if span.get("type").and_then(serde_json::Value::as_str) == Some("input_text") {
let text = span.get("text").and_then(serde_json::Value::as_str)?;
if text.contains(marker) {
return Some(text.to_string());
}
}
None
})
})
}
let command_message = find_user_text(&items, "<user_shell_command>")
.expect("command message recorded in request");
assert!(
command_message.contains(&command),
"command message should include shell invocation: {command_message}"
);
let output_message = find_user_text(&items, "<user_shell_command_output>")
.expect("output message recorded in request");
let payload = output_message
.strip_prefix("<user_shell_command_output>\n")
.and_then(|text| text.strip_suffix("\n</user_shell_command_output>"))
.expect("shell command output payload present");
let parsed: serde_json::Value =
serde_json::from_str(payload).expect("parse shell command output payload");
assert_eq!(
parsed
.get("metadata")
.and_then(|meta| meta.get("exit_code"))
.and_then(serde_json::Value::as_i64),
Some(0),
"expected exit_code metadata to be present and zero",
);
let output_text = parsed
.get("output")
.and_then(serde_json::Value::as_str)
.expect("model-facing output string present");
assert!(
output_text.contains("not-set"),
"model-facing output should include stdout content: {output_text:?}"
);
Ok(())
}

View File

@@ -518,6 +518,9 @@ impl EventProcessor for EventProcessorWithHumanOutput {
| EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::UserCommandBegin(_)
| EventMsg::UserCommandOutputDelta(_)
| EventMsg::UserCommandEnd(_)
| EventMsg::GetHistoryEntryResponse(_)
| EventMsg::McpListToolsResponse(_)
| EventMsg::ListCustomPromptsResponse(_)

View File

@@ -48,6 +48,8 @@ use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::TaskCompleteEvent;
use codex_core::protocol::TaskStartedEvent;
use codex_core::protocol::UserCommandBeginEvent;
use codex_core::protocol::UserCommandEndEvent;
use codex_core::protocol::WebSearchEndEvent;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
@@ -109,6 +111,9 @@ impl EventProcessorWithJsonOutput {
EventMsg::AgentReasoning(ev) => self.handle_reasoning_event(ev),
EventMsg::ExecCommandBegin(ev) => self.handle_exec_command_begin(ev),
EventMsg::ExecCommandEnd(ev) => self.handle_exec_command_end(ev),
EventMsg::UserCommandBegin(ev) => self.handle_user_command_begin(ev),
EventMsg::UserCommandEnd(ev) => self.handle_user_command_end(ev),
EventMsg::UserCommandOutputDelta(_) => Vec::new(),
EventMsg::McpToolCallBegin(ev) => self.handle_mcp_tool_call_begin(ev),
EventMsg::McpToolCallEnd(ev) => self.handle_mcp_tool_call_end(ev),
EventMsg::PatchApplyBegin(ev) => self.handle_patch_apply_begin(ev),
@@ -195,22 +200,28 @@ impl EventProcessorWithJsonOutput {
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
fn handle_exec_command_begin(&mut self, ev: &ExecCommandBeginEvent) -> Vec<ThreadEvent> {
fn start_command_item(
&mut self,
call_id: &str,
command: &[String],
warn_subject: &'static str,
) -> Vec<ThreadEvent> {
let item_id = self.get_next_item_id();
let command_string = match shlex::try_join(ev.command.iter().map(String::as_str)) {
let command_string = match shlex::try_join(command.iter().map(String::as_str)) {
Ok(command_string) => command_string,
Err(e) => {
warn!(
call_id = ev.call_id,
"Failed to stringify command: {e:?}; skipping item.started"
call_id = call_id,
"Failed to stringify {warn_subject}: {e:?}; skipping item.started"
);
ev.command.join(" ")
command.join(" ")
}
};
self.running_commands.insert(
ev.call_id.clone(),
call_id.to_string(),
RunningCommand {
command: command_string.clone(),
item_id: item_id.clone(),
@@ -230,6 +241,45 @@ impl EventProcessorWithJsonOutput {
vec![ThreadEvent::ItemStarted(ItemStartedEvent { item })]
}
fn finish_command_item(
&mut self,
call_id: &str,
aggregated_output: &str,
exit_code: i32,
warn_event_names: (&'static str, &'static str),
) -> Vec<ThreadEvent> {
let some_running_command = self.running_commands.remove(call_id);
let Some(RunningCommand { command, item_id }) = some_running_command else {
let (end_event, begin_event) = warn_event_names;
warn!(
call_id = call_id,
"{end_event} without matching {begin_event}; skipping item.completed"
);
return Vec::new();
};
let status = if exit_code == 0 {
CommandExecutionStatus::Completed
} else {
CommandExecutionStatus::Failed
};
let item = ThreadItem {
id: item_id,
details: ThreadItemDetails::CommandExecution(CommandExecutionItem {
command,
aggregated_output: aggregated_output.to_string(),
exit_code: Some(exit_code),
status,
}),
};
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
fn handle_exec_command_begin(&mut self, ev: &ExecCommandBeginEvent) -> Vec<ThreadEvent> {
self.start_command_item(&ev.call_id, &ev.command, "command")
}
fn handle_mcp_tool_call_begin(&mut self, ev: &McpToolCallBeginEvent) -> Vec<ThreadEvent> {
let item_id = self.get_next_item_id();
let server = ev.invocation.server.clone();
@@ -321,6 +371,19 @@ impl EventProcessorWithJsonOutput {
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
}
fn handle_user_command_begin(&mut self, ev: &UserCommandBeginEvent) -> Vec<ThreadEvent> {
self.start_command_item(&ev.call_id, &ev.command, "user command")
}
fn handle_user_command_end(&mut self, ev: &UserCommandEndEvent) -> Vec<ThreadEvent> {
self.finish_command_item(
&ev.call_id,
&ev.aggregated_output,
ev.exit_code,
("UserCommandEnd", "UserCommandBegin"),
)
}
fn handle_patch_apply_begin(&mut self, ev: &PatchApplyBeginEvent) -> Vec<ThreadEvent> {
self.running_patch_applies
.insert(ev.call_id.clone(), ev.clone());
@@ -366,31 +429,12 @@ impl EventProcessorWithJsonOutput {
}
fn handle_exec_command_end(&mut self, ev: &ExecCommandEndEvent) -> Vec<ThreadEvent> {
let Some(RunningCommand { command, item_id }) = self.running_commands.remove(&ev.call_id)
else {
warn!(
call_id = ev.call_id,
"ExecCommandEnd without matching ExecCommandBegin; skipping item.completed"
);
return Vec::new();
};
let status = if ev.exit_code == 0 {
CommandExecutionStatus::Completed
} else {
CommandExecutionStatus::Failed
};
let item = ThreadItem {
id: item_id,
details: ThreadItemDetails::CommandExecution(CommandExecutionItem {
command,
aggregated_output: ev.aggregated_output.clone(),
exit_code: Some(ev.exit_code),
status,
}),
};
vec![ThreadEvent::ItemCompleted(ItemCompletedEvent { item })]
self.finish_command_item(
&ev.call_id,
&ev.aggregated_output,
ev.exit_code,
("ExecCommandEnd", "ExecCommandBegin"),
)
}
fn todo_items_from_plan(&self, args: &UpdatePlanArgs) -> Vec<TodoItem> {

View File

@@ -274,6 +274,9 @@ async fn run_codex_tool_session_inner(
| EventMsg::ExecCommandBegin(_)
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::ExecCommandEnd(_)
| EventMsg::UserCommandBegin(_)
| EventMsg::UserCommandOutputDelta(_)
| EventMsg::UserCommandEnd(_)
| EventMsg::BackgroundEvent(_)
| EventMsg::StreamError(_)
| EventMsg::PatchApplyBegin(_)

View File

@@ -493,6 +493,15 @@ pub enum EventMsg {
ExecCommandEnd(ExecCommandEndEvent),
/// Notification that the user initiated a shell command.
UserCommandBegin(UserCommandBeginEvent),
/// Incremental chunk of output from a running user command.
UserCommandOutputDelta(UserCommandOutputDeltaEvent),
/// Completion notification for a user shell command.
UserCommandEnd(UserCommandEndEvent),
/// Notification that the agent attached a local image via the view_image tool.
ViewImageToolCall(ViewImageToolCallEvent),
@@ -1267,6 +1276,51 @@ pub struct ExecCommandOutputDeltaEvent {
pub chunk: Vec<u8>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct UserCommandBeginEvent {
/// Identifier so this can be paired with the UserCommandEnd event.
pub call_id: String,
/// The command to be executed.
pub command: Vec<String>,
/// The command's working directory.
pub cwd: PathBuf,
pub parsed_cmd: Vec<ParsedCommand>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct UserCommandEndEvent {
/// Identifier for the UserCommandBegin that finished.
pub call_id: String,
/// Captured stdout.
pub stdout: String,
/// Captured stderr.
pub stderr: String,
/// Captured aggregated output.
#[serde(default)]
pub aggregated_output: String,
/// The command's exit code.
pub exit_code: i32,
/// The duration of the command execution.
#[ts(type = "string")]
pub duration: Duration,
/// Formatted output from the command, as seen by the model.
pub formatted_output: String,
}
#[serde_as]
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, JsonSchema, TS)]
pub struct UserCommandOutputDeltaEvent {
/// Identifier for the UserCommandBegin that produced this chunk.
pub call_id: String,
/// Which stream produced this chunk.
pub stream: ExecOutputStream,
/// Raw bytes from the stream (may not be valid UTF-8).
#[serde_as(as = "serde_with::base64::Base64")]
#[schemars(with = "String")]
#[ts(type = "string")]
pub chunk: Vec<u8>,
}
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, TS)]
pub struct BackgroundEventEvent {
pub message: String,

View File

@@ -40,6 +40,8 @@ use codex_core::protocol::TurnAbortReason;
use codex_core::protocol::TurnDiffEvent;
use codex_core::protocol::UndoCompletedEvent;
use codex_core::protocol::UndoStartedEvent;
use codex_core::protocol::UserCommandBeginEvent;
use codex_core::protocol::UserCommandEndEvent;
use codex_core::protocol::UserMessageEvent;
use codex_core::protocol::ViewImageToolCallEvent;
use codex_core::protocol::WarningEvent;
@@ -627,6 +629,30 @@ impl ChatWidget {
self.defer_or_handle(|q| q.push_exec_end(ev), |s| s.handle_exec_end_now(ev2));
}
fn on_user_command_begin(&mut self, ev: UserCommandBeginEvent) {
self.flush_answer_stream_with_separator();
let ev2 = ev.clone();
self.defer_or_handle(
|q| q.push_user_command_begin(ev),
|s| s.handle_user_command_begin_now(ev2),
);
}
fn on_user_command_output_delta(
&mut self,
_ev: codex_core::protocol::UserCommandOutputDeltaEvent,
) {
// TODO: Handle streaming exec output if/when implemented
}
fn on_user_command_end(&mut self, ev: UserCommandEndEvent) {
let ev2 = ev.clone();
self.defer_or_handle(
|q| q.push_user_command_end(ev),
|s| s.handle_user_command_end_now(ev2),
);
}
fn on_mcp_tool_call_begin(&mut self, ev: McpToolCallBeginEvent) {
let ev2 = ev.clone();
self.defer_or_handle(|q| q.push_mcp_begin(ev), |s| s.handle_mcp_begin_now(ev2));
@@ -785,11 +811,23 @@ impl ChatWidget {
self.request_redraw();
}
pub(crate) fn handle_exec_end_now(&mut self, ev: ExecCommandEndEvent) {
let running = self.running_commands.remove(&ev.call_id);
fn handle_command_end_internal(
&mut self,
call_id: String,
aggregated_output: String,
formatted_output: String,
exit_code: i32,
duration: std::time::Duration,
default_is_user_shell_command: bool,
) {
let running = self.running_commands.remove(&call_id);
let (command, parsed, is_user_shell_command) = match running {
Some(rc) => (rc.command, rc.parsed_cmd, rc.is_user_shell_command),
None => (vec![ev.call_id.clone()], Vec::new(), false),
None => (
vec![call_id.clone()],
Vec::new(),
default_is_user_shell_command,
),
};
let needs_new = self
@@ -800,7 +838,7 @@ impl ChatWidget {
if needs_new {
self.flush_active_cell();
self.active_cell = Some(Box::new(new_active_exec_command(
ev.call_id.clone(),
call_id.clone(),
command,
parsed,
is_user_shell_command,
@@ -813,13 +851,13 @@ impl ChatWidget {
.and_then(|c| c.as_any_mut().downcast_mut::<ExecCell>())
{
cell.complete_call(
&ev.call_id,
&call_id,
CommandOutput {
exit_code: ev.exit_code,
formatted_output: ev.formatted_output.clone(),
aggregated_output: ev.aggregated_output.clone(),
exit_code,
formatted_output,
aggregated_output,
},
ev.duration,
duration,
);
if cell.should_flush() {
self.flush_active_cell();
@@ -827,6 +865,44 @@ impl ChatWidget {
}
}
pub(crate) fn handle_exec_end_now(&mut self, ev: ExecCommandEndEvent) {
let ExecCommandEndEvent {
call_id,
aggregated_output,
formatted_output,
exit_code,
duration,
..
} = ev;
self.handle_command_end_internal(
call_id,
aggregated_output,
formatted_output,
exit_code,
duration,
false,
);
}
pub(crate) fn handle_user_command_end_now(&mut self, ev: UserCommandEndEvent) {
let UserCommandEndEvent {
call_id,
aggregated_output,
formatted_output,
exit_code,
duration,
..
} = ev;
self.handle_command_end_internal(
call_id,
aggregated_output,
formatted_output,
exit_code,
duration,
true,
);
}
pub(crate) fn handle_patch_apply_end_now(
&mut self,
event: codex_core::protocol::PatchApplyEndEvent,
@@ -875,14 +951,19 @@ impl ChatWidget {
});
}
pub(crate) fn handle_exec_begin_now(&mut self, ev: ExecCommandBeginEvent) {
// Ensure the status indicator is visible while the command runs.
fn handle_command_begin_internal(
&mut self,
call_id: String,
command: Vec<String>,
parsed_cmd: Vec<ParsedCommand>,
is_user_shell_command: bool,
) {
self.running_commands.insert(
ev.call_id.clone(),
call_id.clone(),
RunningCommand {
command: ev.command.clone(),
parsed_cmd: ev.parsed_cmd.clone(),
is_user_shell_command: ev.is_user_shell_command,
command: command.clone(),
parsed_cmd: parsed_cmd.clone(),
is_user_shell_command,
},
);
if let Some(cell) = self
@@ -890,10 +971,10 @@ impl ChatWidget {
.as_mut()
.and_then(|c| c.as_any_mut().downcast_mut::<ExecCell>())
&& let Some(new_exec) = cell.with_added_call(
ev.call_id.clone(),
ev.command.clone(),
ev.parsed_cmd.clone(),
ev.is_user_shell_command,
call_id.clone(),
command.clone(),
parsed_cmd.clone(),
is_user_shell_command,
)
{
*cell = new_exec;
@@ -901,16 +982,37 @@ impl ChatWidget {
self.flush_active_cell();
self.active_cell = Some(Box::new(new_active_exec_command(
ev.call_id.clone(),
ev.command.clone(),
ev.parsed_cmd,
ev.is_user_shell_command,
call_id,
command,
parsed_cmd,
is_user_shell_command,
)));
}
self.request_redraw();
}
pub(crate) fn handle_exec_begin_now(&mut self, ev: ExecCommandBeginEvent) {
let ExecCommandBeginEvent {
call_id,
command,
parsed_cmd,
is_user_shell_command,
..
} = ev;
self.handle_command_begin_internal(call_id, command, parsed_cmd, is_user_shell_command);
}
pub(crate) fn handle_user_command_begin_now(&mut self, ev: UserCommandBeginEvent) {
let UserCommandBeginEvent {
call_id,
command,
parsed_cmd,
..
} = ev;
self.handle_command_begin_internal(call_id, command, parsed_cmd, true);
}
pub(crate) fn handle_mcp_begin_now(&mut self, ev: McpToolCallBeginEvent) {
self.flush_answer_stream_with_separator();
self.flush_active_cell();
@@ -1453,7 +1555,8 @@ impl ChatWidget {
match msg {
EventMsg::AgentMessageDelta(_)
| EventMsg::AgentReasoningDelta(_)
| EventMsg::ExecCommandOutputDelta(_) => {}
| EventMsg::ExecCommandOutputDelta(_)
| EventMsg::UserCommandOutputDelta(_) => {}
_ => {
tracing::trace!("handle_codex_event: {:?}", msg);
}
@@ -1506,9 +1609,12 @@ impl ChatWidget {
}
EventMsg::ExecCommandBegin(ev) => self.on_exec_command_begin(ev),
EventMsg::ExecCommandOutputDelta(delta) => self.on_exec_command_output_delta(delta),
EventMsg::UserCommandBegin(ev) => self.on_user_command_begin(ev),
EventMsg::UserCommandOutputDelta(delta) => self.on_user_command_output_delta(delta),
EventMsg::PatchApplyBegin(ev) => self.on_patch_apply_begin(ev),
EventMsg::PatchApplyEnd(ev) => self.on_patch_apply_end(ev),
EventMsg::ExecCommandEnd(ev) => self.on_exec_command_end(ev),
EventMsg::UserCommandEnd(ev) => self.on_user_command_end(ev),
EventMsg::ViewImageToolCall(ev) => self.on_view_image_tool_call(ev),
EventMsg::McpToolCallBegin(ev) => self.on_mcp_tool_call_begin(ev),
EventMsg::McpToolCallEnd(ev) => self.on_mcp_tool_call_end(ev),

View File

@@ -7,6 +7,8 @@ use codex_core::protocol::ExecCommandEndEvent;
use codex_core::protocol::McpToolCallBeginEvent;
use codex_core::protocol::McpToolCallEndEvent;
use codex_core::protocol::PatchApplyEndEvent;
use codex_core::protocol::UserCommandBeginEvent;
use codex_core::protocol::UserCommandEndEvent;
use super::ChatWidget;
@@ -16,6 +18,8 @@ pub(crate) enum QueuedInterrupt {
ApplyPatchApproval(String, ApplyPatchApprovalRequestEvent),
ExecBegin(ExecCommandBeginEvent),
ExecEnd(ExecCommandEndEvent),
UserCommandBegin(UserCommandBeginEvent),
UserCommandEnd(UserCommandEndEvent),
McpBegin(McpToolCallBeginEvent),
McpEnd(McpToolCallEndEvent),
PatchEnd(PatchApplyEndEvent),
@@ -59,6 +63,14 @@ impl InterruptManager {
self.queue.push_back(QueuedInterrupt::ExecEnd(ev));
}
pub(crate) fn push_user_command_begin(&mut self, ev: UserCommandBeginEvent) {
self.queue.push_back(QueuedInterrupt::UserCommandBegin(ev));
}
pub(crate) fn push_user_command_end(&mut self, ev: UserCommandEndEvent) {
self.queue.push_back(QueuedInterrupt::UserCommandEnd(ev));
}
pub(crate) fn push_mcp_begin(&mut self, ev: McpToolCallBeginEvent) {
self.queue.push_back(QueuedInterrupt::McpBegin(ev));
}
@@ -80,6 +92,8 @@ impl InterruptManager {
}
QueuedInterrupt::ExecBegin(ev) => chat.handle_exec_begin_now(ev),
QueuedInterrupt::ExecEnd(ev) => chat.handle_exec_end_now(ev),
QueuedInterrupt::UserCommandBegin(ev) => chat.handle_user_command_begin_now(ev),
QueuedInterrupt::UserCommandEnd(ev) => chat.handle_user_command_end_now(ev),
QueuedInterrupt::McpBegin(ev) => chat.handle_mcp_begin_now(ev),
QueuedInterrupt::McpEnd(ev) => chat.handle_mcp_end_now(ev),
QueuedInterrupt::PatchEnd(ev) => chat.handle_patch_apply_end_now(ev),

View File

@@ -26,7 +26,7 @@ expression: "lines[start_idx..].join(\"\\n\")"
through crates for heavy dependencies in Cargo.toml, including cli, core,
exec, linux-sandbox, tui, login, ollama, and mcp.
• Ran for d in ansi-escape apply-patch arg0 cli common core exec execpolicy
• Ran: for d in ansi-escape apply-patch arg0 cli common core exec execpolicy
│ file-search linux-sandbox login mcp-client mcp-server mcp-types ollama
│ tui; do echo "--- $d/Cargo.toml"; sed -n '1,200p' $d/Cargo.toml; echo;
│ … +1 lines

View File

@@ -2,5 +2,5 @@
source: tui/src/chatwidget/tests.rs
expression: exec_blob
---
• Ran sleep 1
• Ran: sleep 1
└ (no output)

View File

@@ -789,7 +789,7 @@ fn exec_history_cell_shows_working_then_failed() {
let lines = &cells[0];
let blob = lines_to_single_string(lines);
assert!(
blob.contains("• Ran false"),
blob.contains("• Ran: false"),
"expected command and header text present: {blob:?}"
);
assert!(blob.to_lowercase().contains("bloop"), "expected error text");

View File

@@ -345,7 +345,13 @@ impl ExecCell {
Some(false) => "".red().bold(),
None => spinner(call.start_time),
};
let title = if self.is_active() { "Running" } else { "Ran" };
let title = if self.is_active() {
"Running"
} else if call.is_user_shell_command {
"You Ran:"
} else {
"Ran:"
};
let mut header_line =
Line::from(vec![bullet.clone(), " ".into(), title.bold(), " ".into()]);

View File

@@ -2,8 +2,8 @@
source: tui/src/history_cell.rs
expression: rendered
---
• Ran first_token_is_long_en
│ ough_to_wrap
• Ran: first_token_is_long_e
nough_to_wrap
│ second_token_is_also_lon
│ … +1 lines
└ (no output)

View File

@@ -2,6 +2,6 @@
source: tui/src/history_cell.rs
expression: rendered
---
• Ran echo one
• Ran: echo one
│ echo two
└ (no output)

View File

@@ -2,7 +2,7 @@
source: tui/src/history_cell.rs
expression: rendered
---
• Ran set -o pipefail
• Ran: set -o pipefail
│ cargo test
│ --all-features --quiet
└ (no output)

View File

@@ -2,9 +2,9 @@
source: tui/src/history_cell.rs
expression: rendered
---
• Ran echo
│ this_is_a_very_long_si
│ ngle_token_that_will_w
• Ran: echo
│ this_is_a_very_long_s
ingle_token_that_will
│ … +2 lines
└ error: first line on
stderr

View File

@@ -2,5 +2,5 @@
source: tui/src/history_cell.rs
expression: rendered
---
• Ran echo ok
• Ran: echo ok
└ (no output)

View File

@@ -2,7 +2,7 @@
source: tui/src/history_cell.rs
expression: rendered
---
• Ran a_very_long_token_
│ without_spaces_to_
│ force_wrapping
• Ran: a_very_long_token
_without_spaces_t
o_force_wrapping
└ (no output)

View File

@@ -2,7 +2,7 @@
source: tui/src/history_cell.rs
expression: rendered
---
• Ran seq 1 10 1>&2 && false
• Ran: seq 1 10 1>&2 && false
└ 1
2
… +6 lines