Compare commits

...

1 Commits

Author SHA1 Message Date
Owen Lin
71c839b42d feat(zsh-sidecar): add standalone sidecar crate and protocol tests 2026-02-17 12:08:36 -08:00
5 changed files with 1987 additions and 0 deletions

16
codex-rs/Cargo.lock generated
View File

@@ -2536,6 +2536,22 @@ dependencies = [
"winres",
]
[[package]]
name = "codex-zsh-sidecar"
version = "0.0.0"
dependencies = [
"anyhow",
"base64 0.22.1",
"clap",
"serde",
"serde_json",
"shlex",
"tokio",
"tracing",
"tracing-subscriber",
"uuid",
]
[[package]]
name = "color-eyre"
version = "0.6.5"

View File

@@ -17,6 +17,7 @@ members = [
"cli",
"config",
"shell-command",
"zsh-sidecar",
"core",
"hooks",
"secrets",

View File

@@ -0,0 +1,24 @@
[package]
name = "codex-zsh-sidecar"
version.workspace = true
edition.workspace = true
license.workspace = true
[[bin]]
name = "codex-zsh-sidecar"
path = "src/main.rs"
[lints]
workspace = true
[dependencies]
anyhow = { workspace = true }
base64 = { workspace = true }
clap = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
shlex = { workspace = true }
tokio = { workspace = true, features = ["io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
uuid = { workspace = true, features = ["v4"] }

View File

@@ -0,0 +1,994 @@
use anyhow::Context;
use anyhow::Result;
use base64::Engine as _;
use clap::Parser;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt as _;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
#[cfg(unix)]
use tokio::net::UnixListener;
#[cfg(unix)]
use tokio::net::UnixStream;
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio::task::JoinSet;
use uuid::Uuid;
const JSONRPC_VERSION: &str = "2.0";
const METHOD_ZSH_INITIALIZE: &str = "zsh/initialize";
const METHOD_ZSH_EXEC_START: &str = "zsh/execStart";
const METHOD_ZSH_EXEC_STDIN: &str = "zsh/execStdin";
const METHOD_ZSH_EXEC_RESIZE: &str = "zsh/execResize";
const METHOD_ZSH_EXEC_INTERRUPT: &str = "zsh/execInterrupt";
const METHOD_ZSH_SHUTDOWN: &str = "zsh/shutdown";
const METHOD_ZSH_REQUEST_APPROVAL: &str = "zsh/requestApproval";
const METHOD_ZSH_EVENT_EXEC_STARTED: &str = "zsh/event/execStarted";
const METHOD_ZSH_EVENT_EXEC_STDOUT: &str = "zsh/event/execStdout";
const METHOD_ZSH_EVENT_EXEC_STDERR: &str = "zsh/event/execStderr";
const METHOD_ZSH_EVENT_EXEC_EXITED: &str = "zsh/event/execExited";
const EXEC_WRAPPER_ENV_VAR: &str = "EXEC_WRAPPER";
const SIDECAREXEC_WRAPPER_MODE_ENV_VAR: &str = "CODEX_ZSH_SIDECAR_WRAPPER_MODE";
const SIDECAREXEC_WRAPPER_SOCKET_ENV_VAR: &str = "CODEX_ZSH_SIDECAR_WRAPPER_SOCKET";
const SIDECAREXEC_WRAPPER_ORIGIN_ENV_VAR: &str = "CODEX_ZSH_SIDECAR_WRAPPER_ORIGIN";
#[derive(Parser, Debug)]
struct Args {
#[arg(long)]
zsh_path: String,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(untagged)]
enum JsonRpcId {
Number(i64),
String(String),
}
#[derive(Debug, Clone, Serialize)]
struct JsonRpcRequest<T> {
jsonrpc: &'static str,
id: JsonRpcId,
method: &'static str,
params: T,
}
#[derive(Debug, Clone, Serialize)]
struct JsonRpcSuccess<T> {
jsonrpc: &'static str,
id: JsonRpcId,
result: T,
}
#[derive(Debug, Clone, Serialize)]
struct JsonRpcErrorResponse {
jsonrpc: &'static str,
id: JsonRpcId,
error: JsonRpcError,
}
#[derive(Debug, Clone, Serialize)]
struct JsonRpcError {
code: i64,
message: String,
}
#[derive(Debug, Clone, Serialize)]
struct JsonRpcNotification<T> {
jsonrpc: &'static str,
method: &'static str,
params: T,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
/// Host -> sidecar parameters for `zsh/execStart`.
///
/// This defines the top-level command invocation the sidecar should run and
/// the execution environment to apply before zsh begins intercepting nested
/// `execve` calls via wrapper mode.
struct ExecStartParams {
exec_id: String,
command: Vec<String>,
cwd: String,
#[serde(default)]
env: Option<HashMap<String, String>>,
#[serde(default)]
approval_reason: Option<String>,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct ExecInterruptParams {
exec_id: String,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
enum ApprovalDecision {
Approved,
ApprovedForSession,
ApprovedExecpolicyAmendment,
Denied,
Abort,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct RequestApprovalParams {
approval_id: String,
exec_id: String,
command: Vec<String>,
cwd: String,
reason: Option<String>,
proposed_execpolicy_amendment: Option<ExecPolicyAmendmentProposal>,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RequestApprovalResult {
decision: ApprovalDecision,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ExecPolicyAmendmentProposal {
command_prefix: Vec<String>,
rationale: Option<String>,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct InitializeResult {
protocol_version: u32,
capabilities: Capabilities,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct Capabilities {
interactive_pty: bool,
}
#[derive(Debug, Serialize)]
struct EmptyResult {}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ExecStartedEvent {
exec_id: String,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ExecChunkEvent {
exec_id: String,
chunk_base64: String,
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct ExecExitedEvent {
exec_id: String,
exit_code: i32,
signal: Option<String>,
timed_out: Option<bool>,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
/// Wrapper-mode request sent over the Unix socket for each intercepted execve.
///
/// Unlike `ExecStartParams`, this payload is produced by the wrapper subprocess
/// (invoked by patched zsh via `EXEC_WRAPPER`) and consumed by the sidecar to
/// decide whether host approval is required for this specific subcommand.
struct WrapperExecRequest {
file: String,
argv: Vec<String>,
cwd: String,
origin: Option<WrapperExecOrigin>,
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
/// Execution origin metadata provided by the patched zsh fork.
///
/// This is used to suppress approvals for shell startup/bootstrap commands
/// while still enforcing approvals for model/user command execution.
enum WrapperExecOrigin {
/// Command was spawned while evaluating login-shell startup files
/// (for example `.zprofile` / `.zlogin`) before user command execution.
LoginStartup,
/// Command was spawned while evaluating non-login rc startup files
/// (for example `.zshrc`) before user command execution.
RcStartup,
/// Command comes from the top-level user-provided command string.
/// This is the primary approval surface for shell-tool requests.
UserCommand,
/// Command is a nested child command derived from user command evaluation
/// (for example a command reached through shell operators or subshells).
Subcommand,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
/// Sidecar decision returned to wrapper mode for an intercepted execve.
///
/// `Run` continues execution of the target program.
/// `Deny` blocks execution and returns a failure to the shell process.
struct WrapperExecResponse {
action: WrapperExecAction,
reason: Option<String>,
}
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
enum WrapperExecAction {
Run,
Deny,
}
/// Runs the zsh sidecar entrypoint.
///
/// Behavior:
/// - In normal mode, speaks JSON-RPC over stdio with the host (core).
/// - In wrapper mode (`CODEX_ZSH_SIDECAR_WRAPPER_MODE`), handles a single
/// exec-wrapper callback request and exits.
#[tokio::main]
async fn main() -> Result<()> {
if std::env::var_os(SIDECAREXEC_WRAPPER_MODE_ENV_VAR).is_some() {
return run_exec_wrapper_mode();
}
tracing_subscriber::fmt()
.with_env_filter("warn")
.with_writer(std::io::stderr)
.init();
let _args = Args::parse();
let mut stdout = tokio::io::stdout();
let stdin = tokio::io::stdin();
let mut lines = BufReader::new(stdin).lines();
loop {
let Some(line) = lines.next_line().await.context("read stdin")? else {
break;
};
if line.trim().is_empty() {
continue;
}
let value: JsonValue = match serde_json::from_str(&line) {
Ok(v) => v,
Err(err) => {
tracing::warn!("invalid JSON-RPC input: {err}");
continue;
}
};
let Some(id_value) = value.get("id") else {
continue;
};
let id: JsonRpcId = match serde_json::from_value(id_value.clone()) {
Ok(v) => v,
Err(err) => {
tracing::warn!("invalid request id: {err}");
continue;
}
};
let method = value
.get("method")
.and_then(JsonValue::as_str)
.unwrap_or_default();
match method {
METHOD_ZSH_INITIALIZE => {
write_json_line(
&mut stdout,
&JsonRpcSuccess {
jsonrpc: JSONRPC_VERSION,
id,
result: InitializeResult {
protocol_version: 1,
capabilities: Capabilities {
interactive_pty: false,
},
},
},
)
.await?;
}
METHOD_ZSH_EXEC_START => {
let params: ExecStartParams = match parse_params(&value) {
Ok(p) => p,
Err(message) => {
write_json_line(
&mut stdout,
&JsonRpcErrorResponse {
jsonrpc: JSONRPC_VERSION,
id,
error: JsonRpcError {
code: -32602,
message,
},
},
)
.await?;
continue;
}
};
if params.command.is_empty() {
write_json_line(
&mut stdout,
&JsonRpcErrorResponse {
jsonrpc: JSONRPC_VERSION,
id,
error: JsonRpcError {
code: -32602,
message: "execStart.command is empty".to_string(),
},
},
)
.await?;
continue;
}
let mut cmd = Command::new(&params.command[0]);
if params.command.len() > 1 {
cmd.args(&params.command[1..]);
}
cmd.current_dir(&params.cwd);
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
cmd.kill_on_drop(true);
cmd.env_clear();
if let Some(env) = params.env.as_ref() {
cmd.envs(env);
}
#[cfg(unix)]
let wrapper_socket_path = {
params
.env
.as_ref()
.and_then(|env| env.get(SIDECAREXEC_WRAPPER_SOCKET_ENV_VAR))
.map(std::path::PathBuf::from)
.unwrap_or_else(|| {
let socket_id = Uuid::new_v4().as_simple().to_string();
let temp_dir = std::env::temp_dir();
let canonical_temp_dir = temp_dir.canonicalize().unwrap_or(temp_dir);
canonical_temp_dir.join(format!("czs-{}.sock", &socket_id[..12]))
})
};
#[cfg(unix)]
let listener = {
let _ = std::fs::remove_file(&wrapper_socket_path);
UnixListener::bind(&wrapper_socket_path).with_context(|| {
format!("bind wrapper socket at {}", wrapper_socket_path.display())
})?
};
#[cfg(unix)]
{
cmd.env(
SIDECAREXEC_WRAPPER_SOCKET_ENV_VAR,
wrapper_socket_path.to_string_lossy().to_string(),
);
let wrapper_path =
std::env::current_exe().context("resolve current sidecar binary path")?;
cmd.env(
EXEC_WRAPPER_ENV_VAR,
wrapper_path.to_string_lossy().to_string(),
);
}
cmd.env(SIDECAREXEC_WRAPPER_MODE_ENV_VAR, "1");
let mut child = match cmd.spawn() {
Ok(c) => c,
Err(err) => {
#[cfg(unix)]
{
let _ = std::fs::remove_file(&wrapper_socket_path);
}
write_json_line(
&mut stdout,
&JsonRpcErrorResponse {
jsonrpc: JSONRPC_VERSION,
id,
error: JsonRpcError {
code: -32000,
message: format!("failed to spawn command: {err}"),
},
},
)
.await?;
continue;
}
};
let exec_id = params.exec_id.clone();
write_json_line(
&mut stdout,
&JsonRpcSuccess {
jsonrpc: JSONRPC_VERSION,
id,
result: EmptyResult {},
},
)
.await?;
write_json_line(
&mut stdout,
&JsonRpcNotification {
jsonrpc: JSONRPC_VERSION,
method: METHOD_ZSH_EVENT_EXEC_STARTED,
params: ExecStartedEvent {
exec_id: exec_id.clone(),
},
},
)
.await?;
let mut tasks = JoinSet::new();
let (stream_tx, mut stream_rx) =
mpsc::unbounded_channel::<(&'static str, Vec<u8>)>();
if let Some(mut out) = child.stdout.take() {
let tx = stream_tx.clone();
tasks.spawn(async move {
let mut buf = [0_u8; 8192];
loop {
let read = match out.read(&mut buf).await {
Ok(0) => break,
Ok(n) => n,
Err(err) => {
tracing::warn!("stdout read error: {err}");
break;
}
};
let _ = tx.send((METHOD_ZSH_EVENT_EXEC_STDOUT, buf[..read].to_vec()));
}
});
}
if let Some(mut err) = child.stderr.take() {
let tx = stream_tx.clone();
tasks.spawn(async move {
let mut buf = [0_u8; 8192];
loop {
let read = match err.read(&mut buf).await {
Ok(0) => break,
Ok(n) => n,
Err(err) => {
tracing::warn!("stderr read error: {err}");
break;
}
};
let _ = tx.send((METHOD_ZSH_EVENT_EXEC_STDERR, buf[..read].to_vec()));
}
});
}
drop(stream_tx);
let wait = child.wait();
tokio::pin!(wait);
let mut child_exit = None;
#[cfg(unix)]
while child_exit.is_none() || !stream_rx.is_closed() {
tokio::select! {
result = &mut wait, if child_exit.is_none() => {
child_exit = Some(result.context("wait for command exit")?);
}
stream = stream_rx.recv(), if !stream_rx.is_closed() => {
if let Some((method, chunk)) = stream {
write_json_line(
&mut stdout,
&JsonRpcNotification {
jsonrpc: JSONRPC_VERSION,
method,
params: ExecChunkEvent {
exec_id: exec_id.clone(),
chunk_base64: base64::engine::general_purpose::STANDARD.encode(chunk),
},
}
).await?;
}
}
accept_result = listener.accept() => {
let (stream, _) = match accept_result {
Ok(pair) => pair,
Err(err) => {
tracing::warn!("failed to accept wrapper request: {err}");
continue;
}
};
handle_wrapper_request(
&mut stdout,
&mut lines,
stream,
exec_id.clone(),
params.approval_reason.clone(),
).await?;
}
}
}
#[cfg(not(unix))]
while child_exit.is_none() || !stream_rx.is_closed() {
tokio::select! {
result = &mut wait, if child_exit.is_none() => {
child_exit = Some(result.context("wait for command exit")?);
}
stream = stream_rx.recv(), if !stream_rx.is_closed() => {
if let Some((method, chunk)) = stream {
write_json_line(
&mut stdout,
&JsonRpcNotification {
jsonrpc: JSONRPC_VERSION,
method,
params: ExecChunkEvent {
exec_id: exec_id.clone(),
chunk_base64: base64::engine::general_purpose::STANDARD.encode(chunk),
},
}
).await?;
}
}
}
}
while tasks.join_next().await.is_some() {}
#[cfg(unix)]
{
let _ = std::fs::remove_file(&wrapper_socket_path);
}
let status = child_exit.context("missing child exit status")?;
let exit_code = status.code().unwrap_or(-1);
#[cfg(unix)]
let signal = {
use std::os::unix::process::ExitStatusExt;
status.signal().map(|sig: i32| sig.to_string())
};
#[cfg(not(unix))]
let signal = None;
write_json_line(
&mut stdout,
&JsonRpcNotification {
jsonrpc: JSONRPC_VERSION,
method: METHOD_ZSH_EVENT_EXEC_EXITED,
params: ExecExitedEvent {
exec_id: exec_id.clone(),
exit_code,
signal,
timed_out: Some(false),
},
},
)
.await?;
}
METHOD_ZSH_EXEC_INTERRUPT => {
let params: ExecInterruptParams = match parse_params(&value) {
Ok(p) => p,
Err(message) => {
write_json_line(
&mut stdout,
&JsonRpcErrorResponse {
jsonrpc: JSONRPC_VERSION,
id,
error: JsonRpcError {
code: -32602,
message,
},
},
)
.await?;
continue;
}
};
write_json_line(
&mut stdout,
&JsonRpcErrorResponse {
jsonrpc: JSONRPC_VERSION,
id,
error: JsonRpcError {
code: -32002,
message: format!("unknown exec id: {}", params.exec_id),
},
},
)
.await?;
}
METHOD_ZSH_EXEC_STDIN | METHOD_ZSH_EXEC_RESIZE => {
write_json_line(
&mut stdout,
&JsonRpcErrorResponse {
jsonrpc: JSONRPC_VERSION,
id,
error: JsonRpcError {
code: -32004,
message: "method not supported in sidecar phase 1".to_string(),
},
},
)
.await?;
}
METHOD_ZSH_SHUTDOWN => {
write_json_line(
&mut stdout,
&JsonRpcSuccess {
jsonrpc: JSONRPC_VERSION,
id,
result: EmptyResult {},
},
)
.await?;
break;
}
_ => {
write_json_line(
&mut stdout,
&JsonRpcErrorResponse {
jsonrpc: JSONRPC_VERSION,
id,
error: JsonRpcError {
code: -32601,
message: format!("unknown method: {method}"),
},
},
)
.await?;
}
}
}
Ok(())
}
#[cfg(unix)]
/// Runs exec-wrapper mode for a single intercepted execve.
///
/// The patched zsh process invokes the sidecar binary as `EXEC_WRAPPER`.
/// This mode forwards the intercepted exec metadata over the wrapper socket,
/// waits for host policy (`run` or `deny`), and then either executes the
/// target program or exits with failure.
fn run_exec_wrapper_mode() -> Result<()> {
use std::io::Read;
use std::io::Write;
use std::os::unix::net::UnixStream as StdUnixStream;
let args: Vec<String> = std::env::args().collect();
if args.len() < 2 {
anyhow::bail!("exec wrapper mode requires target executable path");
}
let file = args[1].clone();
let argv = if args.len() > 2 {
args[2..].to_vec()
} else {
vec![file.clone()]
};
let cwd = std::env::current_dir()
.context("resolve wrapper cwd")?
.to_string_lossy()
.to_string();
let socket_path = std::env::var(SIDECAREXEC_WRAPPER_SOCKET_ENV_VAR)
.context("missing wrapper socket path env var")?;
let mut stream = StdUnixStream::connect(&socket_path)
.with_context(|| format!("connect to wrapper socket at {socket_path}"))?;
let request = WrapperExecRequest {
file: file.clone(),
argv: argv.clone(),
cwd,
origin: wrapper_exec_origin_from_env(),
};
let encoded = serde_json::to_string(&request).context("serialize wrapper request")?;
stream
.write_all(encoded.as_bytes())
.context("write wrapper request")?;
stream
.write_all(b"\n")
.context("write wrapper request newline")?;
stream
.shutdown(std::net::Shutdown::Write)
.context("shutdown wrapper write")?;
let mut response_buf = String::new();
stream
.read_to_string(&mut response_buf)
.context("read wrapper response")?;
let response: WrapperExecResponse =
serde_json::from_str(response_buf.trim()).context("parse wrapper response")?;
if response.action == WrapperExecAction::Deny {
if let Some(reason) = response.reason {
eprintln!("Execution denied: {reason}");
} else {
eprintln!("Execution denied");
}
std::process::exit(1);
}
let mut command = std::process::Command::new(&file);
if argv.len() > 1 {
command.args(&argv[1..]);
}
let status = command.status().context("spawn wrapped executable")?;
std::process::exit(status.code().unwrap_or(1));
}
#[cfg(not(unix))]
fn run_exec_wrapper_mode() -> Result<()> {
anyhow::bail!("exec wrapper mode is only supported on unix");
}
async fn wait_for_approval_result(
lines: &mut tokio::io::Lines<BufReader<tokio::io::Stdin>>,
expected_id: JsonRpcId,
) -> Result<ApprovalDecision> {
loop {
let Some(line) = lines.next_line().await.context("read stdin")? else {
anyhow::bail!("stdin closed while waiting for approval response");
};
if line.trim().is_empty() {
continue;
}
let value: JsonValue =
serde_json::from_str(&line).context("parse approval response JSON-RPC message")?;
let Some(id_value) = value.get("id") else {
continue;
};
let id: JsonRpcId = serde_json::from_value(id_value.clone())
.context("parse approval response JSON-RPC id")?;
if id != expected_id {
tracing::warn!("ignoring unexpected JSON-RPC message while waiting for approval");
continue;
}
if let Some(error) = value.get("error") {
let message = error
.get("message")
.and_then(JsonValue::as_str)
.unwrap_or("unknown host approval callback error");
anyhow::bail!("host rejected approval callback: {message}");
}
let result: RequestApprovalResult = serde_json::from_value(
value
.get("result")
.cloned()
.ok_or_else(|| anyhow::anyhow!("missing approval callback result"))?,
)
.context("parse approval callback result")?;
return Ok(result.decision);
}
}
#[cfg(unix)]
async fn handle_wrapper_request(
stdout: &mut tokio::io::Stdout,
lines: &mut tokio::io::Lines<BufReader<tokio::io::Stdin>>,
mut stream: UnixStream,
exec_id: String,
approval_reason: Option<String>,
) -> Result<()> {
let mut request_buf = Vec::new();
stream
.read_to_end(&mut request_buf)
.await
.context("read wrapper request from socket")?;
let request_line = String::from_utf8(request_buf).context("decode wrapper request as utf-8")?;
let request: WrapperExecRequest =
serde_json::from_str(request_line.trim()).context("parse wrapper request payload")?;
if should_skip_host_approval_for_request(&request) {
return write_json_line(
&mut stream,
&WrapperExecResponse {
action: WrapperExecAction::Run,
reason: None,
},
)
.await;
}
let command_for_approval = if request.argv.is_empty() {
vec![request.file.clone()]
} else {
request.argv.clone()
};
let approval_callback_id = JsonRpcId::String(format!("approval-cb-{}", Uuid::new_v4()));
let approval_id = Uuid::new_v4().to_string();
let approval_request = JsonRpcRequest {
jsonrpc: JSONRPC_VERSION,
id: approval_callback_id.clone(),
method: METHOD_ZSH_REQUEST_APPROVAL,
params: RequestApprovalParams {
approval_id,
exec_id: exec_id.clone(),
command: command_for_approval,
cwd: request.cwd,
reason: approval_reason,
proposed_execpolicy_amendment: None,
},
};
write_json_line(stdout, &approval_request).await?;
let decision = wait_for_approval_result(lines, approval_callback_id).await?;
let response = match decision {
ApprovalDecision::Approved
| ApprovalDecision::ApprovedForSession
| ApprovalDecision::ApprovedExecpolicyAmendment => WrapperExecResponse {
action: WrapperExecAction::Run,
reason: None,
},
ApprovalDecision::Denied => WrapperExecResponse {
action: WrapperExecAction::Deny,
reason: Some("command denied by host approval policy".to_string()),
},
ApprovalDecision::Abort => WrapperExecResponse {
action: WrapperExecAction::Deny,
reason: Some("command aborted by host approval policy".to_string()),
},
};
write_json_line(&mut stream, &response).await
}
fn wrapper_exec_origin_from_env() -> Option<WrapperExecOrigin> {
let raw = std::env::var(SIDECAREXEC_WRAPPER_ORIGIN_ENV_VAR).ok()?;
parse_wrapper_exec_origin(raw.as_str())
}
fn parse_wrapper_exec_origin(raw: &str) -> Option<WrapperExecOrigin> {
// These numeric values are emitted by the patched zsh fork via:
// `CODEX_ZSH_SIDECAR_WRAPPER_ORIGIN=<int>`.
//
// Mapping comes from zsh's `enum exec_wrapper_origin`:
// 0 = user command
// 1 = login startup (.zprofile/.zlogin/etc.)
// 2 = rc startup (.zshrc/.zshenv/etc.)
// 3 = subcommand
match raw {
"0" => Some(WrapperExecOrigin::UserCommand),
"1" => Some(WrapperExecOrigin::LoginStartup),
"2" => Some(WrapperExecOrigin::RcStartup),
"3" => Some(WrapperExecOrigin::Subcommand),
_ => None,
}
}
/// Decide whether a wrapper-intercepted execve should bypass host approval.
///
/// Rationale:
/// - We intentionally suppress startup-file commands (`login_startup`, `rc_startup`)
/// so clients only see approvals for model-generated command execution, not shell bootstrap.
/// - We also suppress internal zsh re-exec handoffs (`zsh -c` / `zsh -fc`) because
/// they are implementation details and not actionable user-facing approvals.
/// - All other commands remain approval-eligible and are surfaced to host policy.
fn should_skip_host_approval_for_request(request: &WrapperExecRequest) -> bool {
if matches!(
request.origin,
Some(WrapperExecOrigin::LoginStartup | WrapperExecOrigin::RcStartup)
) {
return true;
}
let request_file = PathBuf::from(&request.file);
let request_file_name = request_file
.file_name()
.and_then(|name| name.to_str())
.unwrap_or_default();
let argv_program_name = request
.argv
.first()
.map(PathBuf::from)
.and_then(|path| path.file_name().map(std::borrow::ToOwned::to_owned))
.and_then(|name| name.to_str().map(str::to_owned))
.unwrap_or_default();
let looks_like_zsh_exec = request_file_name == "zsh" || argv_program_name == "zsh";
let has_command_string_flag = request
.argv
.iter()
.filter(|arg| arg.starts_with('-'))
.any(|arg| arg.chars().skip(1).any(|ch| ch == 'c'));
looks_like_zsh_exec && has_command_string_flag
}
fn parse_params<T: for<'de> Deserialize<'de>>(value: &JsonValue) -> std::result::Result<T, String> {
let params = value
.get("params")
.cloned()
.ok_or_else(|| "missing params".to_string())?;
serde_json::from_value(params).map_err(|err| format!("invalid params: {err}"))
}
async fn write_json_line<W: tokio::io::AsyncWrite + Unpin, T: Serialize>(
writer: &mut W,
message: &T,
) -> Result<()> {
let encoded = serde_json::to_string(message).context("serialize JSON-RPC message")?;
writer
.write_all(encoded.as_bytes())
.await
.context("write message")?;
writer.write_all(b"\n").await.context("write newline")?;
writer.flush().await.context("flush message")?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn wrapper_request(
origin: Option<WrapperExecOrigin>,
file: &str,
argv: &[&str],
) -> WrapperExecRequest {
WrapperExecRequest {
file: file.to_string(),
argv: argv.iter().map(ToString::to_string).collect(),
cwd: "/tmp".to_string(),
origin,
}
}
#[test]
fn skips_host_approval_for_login_startup_origin() {
let request = wrapper_request(
Some(WrapperExecOrigin::LoginStartup),
"/usr/bin/echo",
&["/usr/bin/echo", "hi"],
);
assert!(should_skip_host_approval_for_request(&request));
}
#[test]
fn skips_host_approval_for_rc_startup_origin() {
let request = wrapper_request(
Some(WrapperExecOrigin::RcStartup),
"/usr/bin/echo",
&["/usr/bin/echo", "hi"],
);
assert!(should_skip_host_approval_for_request(&request));
}
#[test]
fn does_not_skip_host_approval_for_user_command_origin() {
let request = wrapper_request(
Some(WrapperExecOrigin::UserCommand),
"/usr/bin/true",
&["/usr/bin/true"],
);
assert!(!should_skip_host_approval_for_request(&request));
}
#[test]
fn skips_host_approval_for_internal_zsh_reexec_without_origin() {
let request = wrapper_request(None, "/Users/test/bin/zsh", &["zsh", "-fc", "true"]);
assert!(should_skip_host_approval_for_request(&request));
}
#[test]
fn parses_numeric_wrapper_origin_values() {
assert_eq!(
parse_wrapper_exec_origin("1"),
Some(WrapperExecOrigin::LoginStartup)
);
assert_eq!(
parse_wrapper_exec_origin("2"),
Some(WrapperExecOrigin::RcStartup)
);
assert_eq!(
parse_wrapper_exec_origin("0"),
Some(WrapperExecOrigin::UserCommand)
);
}
}

View File

@@ -0,0 +1,952 @@
#![cfg(unix)]
use anyhow::Context;
use anyhow::Result;
use serde_json::Value as JsonValue;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::net::UnixListener;
use tokio::process::Child;
use tokio::process::ChildStdin;
use tokio::process::ChildStdout;
use tokio::process::Command;
use tokio::time::Duration;
use tokio::time::timeout;
const JSONRPC_VERSION: &str = "2.0";
const EXEC_START_REQUEST_ID: i64 = 2;
const WRAPPER_MODE_ENV_VAR: &str = "CODEX_ZSH_SIDECAR_WRAPPER_MODE";
const WRAPPER_SOCKET_ENV_VAR: &str = "CODEX_ZSH_SIDECAR_WRAPPER_SOCKET";
const WRAPPER_ORIGIN_ENV_VAR: &str = "CODEX_ZSH_SIDECAR_WRAPPER_ORIGIN";
#[tokio::test]
async fn exec_start_emits_multiple_subcommand_approvals_for_compound_command() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.start_exec_with_command("/usr/bin/true && /usr/bin/true", None)
.await?;
let mut exec_start_acked = false;
let mut intercepted_subcommand_callbacks = 0usize;
let mut intercepted_true_callbacks = 0usize;
let mut saw_exec_exited = false;
let mut exit_code = None;
while !saw_exec_exited {
let value = harness.read_next_message().await?;
if let Some((id, _reason, command)) = parse_approval_request(&value) {
if command.first().is_some_and(|c| c == "/usr/bin/true") {
intercepted_subcommand_callbacks += 1;
intercepted_true_callbacks += 1;
}
harness.respond_approval(id, "approved").await?;
continue;
}
if value.get("id").and_then(JsonValue::as_i64) == Some(EXEC_START_REQUEST_ID)
&& value.get("result").is_some()
{
exec_start_acked = true;
continue;
}
if value.get("method").and_then(JsonValue::as_str) == Some("zsh/event/execExited") {
saw_exec_exited = true;
exit_code = value
.pointer("/params/exitCode")
.and_then(JsonValue::as_i64)
.map(|code| code as i32);
}
}
harness.shutdown().await?;
assert!(exec_start_acked, "expected execStart success response");
assert_eq!(exit_code, Some(0), "expected successful command exit");
assert!(
intercepted_subcommand_callbacks >= 2,
"expected at least two intercepted subcommand approvals, got {intercepted_subcommand_callbacks}"
);
assert!(
intercepted_true_callbacks >= 2,
"expected at least two intercepted /usr/bin/true approvals, got {intercepted_true_callbacks}"
);
Ok(())
}
#[tokio::test]
async fn exec_start_uses_custom_approval_reason_when_provided() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.start_exec_with_command(
"/usr/bin/true && /usr/bin/true",
Some("test custom zsh approval reason"),
)
.await?;
let mut saw_custom_reason = false;
let mut saw_exec_exited = false;
while !saw_exec_exited {
let value = harness.read_next_message().await?;
if let Some((id, reason, _command)) = parse_approval_request(&value) {
if reason.as_deref() == Some("test custom zsh approval reason") {
saw_custom_reason = true;
}
harness.respond_approval(id, "approved").await?;
continue;
}
if value.get("method").and_then(JsonValue::as_str) == Some("zsh/event/execExited") {
saw_exec_exited = true;
}
}
harness.shutdown().await?;
assert!(
saw_custom_reason,
"expected at least one approval callback with custom reason"
);
Ok(())
}
#[tokio::test]
async fn exec_start_does_not_emit_exec_start_approval_callback() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.start_exec_with_command("/usr/bin/true && /usr/bin/true", None)
.await?;
let mut saw_exec_start_success = false;
let mut saw_exec_exited = false;
let mut saw_exec_start_reason = false;
while !saw_exec_exited {
let value = harness.read_next_message().await?;
if let Some((id, reason, _command)) = parse_approval_request(&value) {
if reason.as_deref() == Some("zsh sidecar execStart command approval") {
saw_exec_start_reason = true;
}
harness.respond_approval(id, "approved").await?;
continue;
}
if value.get("id").and_then(JsonValue::as_i64) == Some(EXEC_START_REQUEST_ID)
&& value.get("result").is_some()
{
saw_exec_start_success = true;
continue;
}
if value.get("method").and_then(JsonValue::as_str) == Some("zsh/event/execExited") {
saw_exec_exited = true;
}
}
harness.shutdown().await?;
assert!(
saw_exec_start_success,
"expected execStart success response"
);
assert!(
!saw_exec_start_reason,
"did not expect execStart approval callback reason"
);
Ok(())
}
#[tokio::test]
async fn exec_start_does_not_emit_internal_zsh_reexec_approval_callback() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.start_exec_with_command("/usr/bin/true && /usr/bin/true", None)
.await?;
let mut saw_exec_start_success = false;
let mut saw_exec_exited = false;
let mut saw_internal_zsh_reexec = false;
while !saw_exec_exited {
let value = harness.read_next_message().await?;
if let Some((id, _reason, command)) = parse_approval_request(&value) {
let command_program_name = command
.first()
.map(std::path::PathBuf::from)
.and_then(|path| path.file_name().map(std::borrow::ToOwned::to_owned))
.and_then(|name| name.to_str().map(str::to_owned))
.unwrap_or_default();
let has_command_string_flag = command
.iter()
.filter(|arg| arg.starts_with('-'))
.any(|arg| arg.chars().skip(1).any(|ch| ch == 'c'));
if command_program_name == "zsh" && has_command_string_flag {
saw_internal_zsh_reexec = true;
}
harness.respond_approval(id, "approved").await?;
continue;
}
if value.get("id").and_then(JsonValue::as_i64) == Some(EXEC_START_REQUEST_ID)
&& value.get("result").is_some()
{
saw_exec_start_success = true;
continue;
}
if value.get("method").and_then(JsonValue::as_str) == Some("zsh/event/execExited") {
saw_exec_exited = true;
}
}
harness.shutdown().await?;
assert!(
saw_exec_start_success,
"expected execStart success response"
);
assert!(
!saw_internal_zsh_reexec,
"did not expect host approval callback for internal zsh re-exec"
);
Ok(())
}
#[tokio::test]
async fn denying_second_subcommand_fails_exec() -> Result<()> {
let Some(outcome) = run_compound_exec_with_second_subcommand_decision("denied").await? else {
return Ok(());
};
assert!(
outcome.saw_exec_start_success,
"expected execStart success response"
);
assert!(
outcome.subcommand_callbacks >= 2,
"expected at least two subcommand callbacks before exit, got {}",
outcome.subcommand_callbacks
);
assert_ne!(
outcome.exit_code,
Some(0),
"denying the second subcommand should cause non-zero exit"
);
Ok(())
}
#[tokio::test]
async fn aborting_second_subcommand_fails_exec() -> Result<()> {
let Some(outcome) = run_compound_exec_with_second_subcommand_decision("abort").await? else {
return Ok(());
};
assert!(
outcome.saw_exec_start_success,
"expected execStart success response"
);
assert!(
outcome.subcommand_callbacks >= 2,
"expected at least two subcommand callbacks before exit, got {}",
outcome.subcommand_callbacks
);
assert_ne!(
outcome.exit_code,
Some(0),
"aborting the second subcommand should cause non-zero exit"
);
Ok(())
}
#[tokio::test]
async fn approved_for_session_subcommand_decision_allows_exec() -> Result<()> {
let Some(outcome) =
run_compound_exec_with_second_subcommand_decision("approved_for_session").await?
else {
return Ok(());
};
assert!(
outcome.saw_exec_start_success,
"expected execStart success response"
);
assert!(
outcome.subcommand_callbacks >= 2,
"expected at least two subcommand callbacks before exit, got {}",
outcome.subcommand_callbacks
);
assert_eq!(
outcome.exit_code,
Some(0),
"approved_for_session should allow successful command exit"
);
Ok(())
}
#[tokio::test]
async fn approved_execpolicy_amendment_subcommand_decision_allows_exec() -> Result<()> {
let Some(outcome) =
run_compound_exec_with_second_subcommand_decision("approved_execpolicy_amendment").await?
else {
return Ok(());
};
assert!(
outcome.saw_exec_start_success,
"expected execStart success response"
);
assert!(
outcome.subcommand_callbacks >= 2,
"expected at least two subcommand callbacks before exit, got {}",
outcome.subcommand_callbacks
);
assert_eq!(
outcome.exit_code,
Some(0),
"approved_execpolicy_amendment should allow successful command exit"
);
Ok(())
}
#[tokio::test]
async fn approval_callback_ignores_unexpected_response_id() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.start_exec_with_command("/usr/bin/true && /usr/bin/true", None)
.await?;
let mut sent_wrong_id_once = false;
let mut saw_exec_start_success = false;
let mut saw_exec_exited = false;
let mut exit_code = None;
while !saw_exec_exited {
let value = harness.read_next_message().await?;
if let Some((id, _reason, _command)) = parse_approval_request(&value) {
if !sent_wrong_id_once {
harness
.respond_approval(
JsonValue::String("definitely-wrong-id".to_string()),
"approved",
)
.await?;
sent_wrong_id_once = true;
}
harness.respond_approval(id, "approved").await?;
continue;
}
if value.get("id").and_then(JsonValue::as_i64) == Some(EXEC_START_REQUEST_ID)
&& value.get("result").is_some()
{
saw_exec_start_success = true;
continue;
}
if value.get("method").and_then(JsonValue::as_str) == Some("zsh/event/execExited") {
saw_exec_exited = true;
exit_code = value
.pointer("/params/exitCode")
.and_then(JsonValue::as_i64)
.map(|code| code as i32);
}
}
harness.shutdown().await?;
assert!(sent_wrong_id_once, "expected wrong-id response to be sent");
assert!(
saw_exec_start_success,
"expected execStart success despite wrong callback id response"
);
assert_eq!(exit_code, Some(0), "expected successful command exit");
Ok(())
}
#[tokio::test]
async fn malformed_approval_response_terminates_sidecar() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.start_exec_with_command("/usr/bin/true && /usr/bin/true", None)
.await?;
loop {
let value = harness.read_next_message().await?;
if let Some((id, _reason, _command)) = parse_approval_request(&value) {
harness
.write_json_line(&serde_json::json!({
"jsonrpc": JSONRPC_VERSION,
"id": id,
"result": {}
}))
.await?;
break;
}
}
let status = timeout(Duration::from_secs(3), harness.child.wait())
.await
.context("timed out waiting for sidecar crash on malformed callback response")??;
assert!(
!status.success(),
"sidecar should fail fast on malformed callback response"
);
Ok(())
}
#[tokio::test]
async fn returns_jsonrpc_error_for_unknown_method() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.write_json_line(&serde_json::json!({
"jsonrpc": JSONRPC_VERSION,
"id": 55,
"method": "zsh/notRealMethod",
"params": {}
}))
.await?;
let response = harness.wait_for_response(55).await?;
assert_eq!(
response.pointer("/error/code"),
Some(&JsonValue::from(-32601))
);
harness.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn returns_jsonrpc_invalid_params_for_exec_start_with_empty_command() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.write_json_line(&serde_json::json!({
"jsonrpc": JSONRPC_VERSION,
"id": EXEC_START_REQUEST_ID,
"method": "zsh/execStart",
"params": {
"execId": "exec-invalid",
"command": [],
"cwd": std::env::current_dir()?.to_string_lossy().to_string(),
"env": {}
}
}))
.await?;
let response = harness.wait_for_response(EXEC_START_REQUEST_ID).await?;
assert_eq!(
response.pointer("/error/code"),
Some(&JsonValue::from(-32602))
);
harness.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn exec_events_are_ordered_exec_started_before_output_and_single_exit() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.start_exec_with_command("/usr/bin/printf 'hi\\n'", None)
.await?;
let mut saw_exec_started = false;
let mut saw_output_before_started = false;
let mut exec_exited_count = 0usize;
let mut saw_exec_start_success = false;
while exec_exited_count == 0 {
let value = harness.read_next_message().await?;
if let Some((id, _reason, _command)) = parse_approval_request(&value) {
harness.respond_approval(id, "approved").await?;
continue;
}
match value.get("method").and_then(JsonValue::as_str) {
Some("zsh/event/execStarted") => {
saw_exec_started = true;
}
Some("zsh/event/execStdout") | Some("zsh/event/execStderr") => {
if !saw_exec_started {
saw_output_before_started = true;
}
}
Some("zsh/event/execExited") => {
exec_exited_count += 1;
}
_ => {}
}
if value.get("id").and_then(JsonValue::as_i64) == Some(EXEC_START_REQUEST_ID)
&& value.get("result").is_some()
{
saw_exec_start_success = true;
}
}
for _ in 0..4 {
let Some(value) = harness
.read_next_message_with_timeout(Duration::from_millis(100))
.await?
else {
break;
};
if value.get("method").and_then(JsonValue::as_str) == Some("zsh/event/execExited") {
exec_exited_count += 1;
}
}
harness.shutdown().await?;
assert!(
saw_exec_start_success,
"expected execStart success response"
);
assert!(saw_exec_started, "expected execStarted event");
assert!(!saw_output_before_started, "saw output before execStarted");
assert_eq!(exec_exited_count, 1, "expected one execExited event");
Ok(())
}
#[tokio::test]
async fn exec_interrupt_returns_unknown_exec_id_error_in_phase1() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.write_json_line(&serde_json::json!({
"jsonrpc": JSONRPC_VERSION,
"id": 77,
"method": "zsh/execInterrupt",
"params": {
"execId": "exec-does-not-exist"
}
}))
.await?;
let response = harness.wait_for_response(77).await?;
assert_eq!(
response.pointer("/error/code"),
Some(&JsonValue::from(-32002))
);
harness.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn exec_stdin_and_resize_return_not_supported_error_in_phase1() -> Result<()> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(());
};
harness.initialize().await?;
harness
.write_json_line(&serde_json::json!({
"jsonrpc": JSONRPC_VERSION,
"id": 78,
"method": "zsh/execStdin",
"params": {
"execId": "exec-does-not-exist",
"chunkBase64": "aGk="
}
}))
.await?;
let stdin_response = harness.wait_for_response(78).await?;
assert_eq!(
stdin_response.pointer("/error/code"),
Some(&JsonValue::from(-32004))
);
harness
.write_json_line(&serde_json::json!({
"jsonrpc": JSONRPC_VERSION,
"id": 79,
"method": "zsh/execResize",
"params": {
"execId": "exec-does-not-exist",
"cols": 80,
"rows": 24
}
}))
.await?;
let resize_response = harness.wait_for_response(79).await?;
assert_eq!(
resize_response.pointer("/error/code"),
Some(&JsonValue::from(-32004))
);
harness.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn wrapper_mode_with_invalid_socket_fails_fast() -> Result<()> {
let sidecar = env!("CARGO_BIN_EXE_codex-zsh-sidecar");
let mut child = Command::new(sidecar)
.arg("/usr/bin/true")
.env(WRAPPER_MODE_ENV_VAR, "1")
.env(
WRAPPER_SOCKET_ENV_VAR,
"/tmp/definitely-not-a-real-codex-zsh-wrapper.sock",
)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn wrapper mode sidecar process")?;
let status = timeout(Duration::from_secs(3), child.wait())
.await
.context("timed out waiting for wrapper mode process failure")??;
let stderr = child
.stderr
.take()
.context("missing stderr for wrapper mode process")?;
let mut lines = BufReader::new(stderr).lines();
let mut stderr_text = String::new();
while let Some(line) = lines.next_line().await? {
stderr_text.push_str(&line);
stderr_text.push('\n');
}
assert!(
!status.success(),
"wrapper mode should fail when socket path is invalid"
);
assert!(
stderr_text.contains("wrapper socket"),
"expected wrapper socket failure message, got: {stderr_text}"
);
Ok(())
}
#[tokio::test]
async fn wrapper_mode_sends_origin_metadata_from_env() -> Result<()> {
let sidecar = env!("CARGO_BIN_EXE_codex-zsh-sidecar");
let socket_path = std::env::temp_dir().join(format!(
"czs-wrapper-test-{}.sock",
uuid::Uuid::new_v4().as_simple()
));
let _ = std::fs::remove_file(&socket_path);
let listener = UnixListener::bind(&socket_path)
.with_context(|| format!("bind test listener at {}", socket_path.display()))?;
let mut child = Command::new(sidecar)
.arg("/usr/bin/true")
.env(WRAPPER_MODE_ENV_VAR, "1")
.env(
WRAPPER_SOCKET_ENV_VAR,
socket_path.to_string_lossy().to_string(),
)
.env(WRAPPER_ORIGIN_ENV_VAR, "1")
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::piped())
.spawn()
.context("spawn wrapper mode sidecar process")?;
let (mut stream, _) = timeout(Duration::from_secs(3), listener.accept())
.await
.context("timed out waiting for wrapper connection")??;
let mut request_buf = Vec::new();
stream
.read_to_end(&mut request_buf)
.await
.context("read wrapper request payload")?;
let request: JsonValue =
serde_json::from_slice(&request_buf).context("parse wrapper request payload")?;
assert_eq!(
request.pointer("/origin"),
Some(&JsonValue::from("login_startup"))
);
stream
.write_all(br#"{"action":"run","reason":null}"#)
.await
.context("write wrapper allow response")?;
stream.shutdown().await.context("shutdown wrapper stream")?;
let status = timeout(Duration::from_secs(3), child.wait())
.await
.context("timed out waiting for wrapper mode process exit")??;
let _ = std::fs::remove_file(&socket_path);
assert!(
status.success(),
"wrapper mode should succeed when action=run"
);
Ok(())
}
struct SidecarHarness {
child: Child,
stdin: ChildStdin,
lines: tokio::io::Lines<BufReader<ChildStdout>>,
zsh_path: std::path::PathBuf,
}
impl SidecarHarness {
async fn start() -> Result<Option<Self>> {
let Some(zsh_path) = std::env::var_os("CODEX_TEST_ZSH_PATH") else {
eprintln!("skipping direct sidecar protocol test: CODEX_TEST_ZSH_PATH is not set");
return Ok(None);
};
let zsh_path = std::path::PathBuf::from(zsh_path);
if !zsh_path.is_file() {
anyhow::bail!(
"CODEX_TEST_ZSH_PATH is set but is not a file: {}",
zsh_path.display()
);
}
let sidecar = env!("CARGO_BIN_EXE_codex-zsh-sidecar");
let mut child = Command::new(sidecar)
.arg("--zsh-path")
.arg(&zsh_path)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::inherit())
.spawn()
.context("spawn codex-zsh-sidecar")?;
let stdin = child.stdin.take().context("missing sidecar stdin")?;
let stdout = child.stdout.take().context("missing sidecar stdout")?;
Ok(Some(Self {
child,
stdin,
lines: BufReader::new(stdout).lines(),
zsh_path,
}))
}
async fn initialize(&mut self) -> Result<()> {
self.write_json_line(&serde_json::json!({
"jsonrpc": JSONRPC_VERSION,
"id": 1,
"method": "zsh/initialize",
"params": {
"sessionId": "test-session"
}
}))
.await?;
self.wait_for_response(1).await?;
Ok(())
}
async fn start_exec_with_command(
&mut self,
shell_command: &str,
approval_reason: Option<&str>,
) -> Result<()> {
self.write_json_line(&serde_json::json!({
"jsonrpc": JSONRPC_VERSION,
"id": EXEC_START_REQUEST_ID,
"method": "zsh/execStart",
"params": {
"execId": "exec-test-1",
"command": [self.zsh_path.to_string_lossy(), "-fc", shell_command],
"cwd": std::env::current_dir()?.to_string_lossy().to_string(),
"env": {},
"approvalReason": approval_reason
}
}))
.await
}
async fn respond_approval(&mut self, id: JsonValue, decision: &str) -> Result<()> {
self.write_json_line(&serde_json::json!({
"jsonrpc": JSONRPC_VERSION,
"id": id,
"result": {
"decision": decision
}
}))
.await
}
async fn shutdown(&mut self) -> Result<()> {
self.write_json_line(&serde_json::json!({
"jsonrpc": JSONRPC_VERSION,
"id": 3,
"method": "zsh/shutdown",
"params": {
"graceMs": 100
}
}))
.await?;
self.wait_for_response(3).await?;
let status = timeout(Duration::from_secs(3), self.child.wait())
.await
.context("timed out waiting for sidecar process exit")??;
assert!(status.success(), "sidecar should exit cleanly");
Ok(())
}
async fn read_next_message(&mut self) -> Result<JsonValue> {
let line = timeout(Duration::from_secs(10), self.lines.next_line())
.await
.context("timed out reading sidecar output")??
.context("sidecar stdout closed unexpectedly")?;
serde_json::from_str(&line).with_context(|| format!("parse sidecar JSON line: {line}"))
}
async fn read_next_message_with_timeout(
&mut self,
duration: Duration,
) -> Result<Option<JsonValue>> {
let line = match timeout(duration, self.lines.next_line()).await {
Ok(line) => line?,
Err(_) => return Ok(None),
};
let Some(line) = line else {
return Ok(None);
};
let value = serde_json::from_str(&line)
.with_context(|| format!("parse sidecar JSON line: {line}"))?;
Ok(Some(value))
}
async fn wait_for_response(&mut self, id: i64) -> Result<JsonValue> {
loop {
let value = self.read_next_message().await?;
if value.get("id").and_then(JsonValue::as_i64) == Some(id) {
return Ok(value);
}
}
}
async fn write_json_line(&mut self, value: &JsonValue) -> Result<()> {
let encoded = serde_json::to_string(value).context("serialize JSON line")?;
self.stdin
.write_all(encoded.as_bytes())
.await
.context("write JSON line")?;
self.stdin
.write_all(b"\n")
.await
.context("write line break")?;
self.stdin.flush().await.context("flush stdin")
}
}
fn parse_approval_request(value: &JsonValue) -> Option<(JsonValue, Option<String>, Vec<String>)> {
if value.get("method").and_then(JsonValue::as_str) != Some("zsh/requestApproval") {
return None;
}
let id = value.get("id")?.clone();
let reason = value
.pointer("/params/reason")
.and_then(JsonValue::as_str)
.map(ToString::to_string);
let command = value
.pointer("/params/command")
.and_then(JsonValue::as_array)
.map(|items| {
items
.iter()
.filter_map(JsonValue::as_str)
.map(ToString::to_string)
.collect()
})
.unwrap_or_default();
Some((id, reason, command))
}
struct CompoundExecOutcome {
saw_exec_start_success: bool,
subcommand_callbacks: usize,
exit_code: Option<i32>,
}
async fn run_compound_exec_with_second_subcommand_decision(
second_subcommand_decision: &str,
) -> Result<Option<CompoundExecOutcome>> {
let Some(mut harness) = SidecarHarness::start().await? else {
return Ok(None);
};
harness.initialize().await?;
harness
.start_exec_with_command("/usr/bin/true && /usr/bin/true", None)
.await?;
let mut subcommand_callbacks = 0usize;
let mut saw_exec_start_success = false;
let mut saw_exec_exited = false;
let mut exit_code = None;
while !saw_exec_exited {
let value = harness.read_next_message().await?;
if let Some((id, _reason, command)) = parse_approval_request(&value) {
if command.first().is_some_and(|c| c == "/usr/bin/true") {
subcommand_callbacks += 1;
if subcommand_callbacks == 2 {
harness
.respond_approval(id, second_subcommand_decision)
.await?;
} else {
harness.respond_approval(id, "approved").await?;
}
} else {
harness.respond_approval(id, "approved").await?;
}
continue;
}
if value.get("id").and_then(JsonValue::as_i64) == Some(EXEC_START_REQUEST_ID)
&& value.get("result").is_some()
{
saw_exec_start_success = true;
continue;
}
if value.get("method").and_then(JsonValue::as_str) == Some("zsh/event/execExited") {
saw_exec_exited = true;
exit_code = value
.pointer("/params/exitCode")
.and_then(JsonValue::as_i64)
.map(|code| code as i32);
}
}
harness.shutdown().await?;
Ok(Some(CompoundExecOutcome {
saw_exec_start_success,
subcommand_callbacks,
exit_code,
}))
}