Compare commits

..

1 Commit

Author SHA1 Message Date
Michael Bolin
8898ed33b0 refactor: prepare unified exec for zsh-fork backend 2026-03-03 20:47:16 -08:00
12 changed files with 348 additions and 33 deletions

View File

@@ -70,7 +70,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.108.0-alpha.9"
version = "0.0.0"
# Track the edition for all workspace crates in one place. Individual
# crates can still override this value, but keeping it here means new
# crates created with `cargo new -w ...` automatically inherit the 2024

View File

@@ -5,7 +5,7 @@ Executes shell requests under the orchestrator: asks for approval when needed,
builds a CommandSpec, and runs it under the current SandboxAttempt.
*/
#[cfg(unix)]
mod unix_escalation;
pub(crate) mod unix_escalation;
use crate::command_canonicalization::canonicalize_command_for_approval;
use crate::exec::ExecToolCallOutput;

View File

@@ -6,6 +6,7 @@ use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::is_likely_sandbox_denied;
use crate::features::Feature;
use crate::sandboxing::ExecRequest;
use crate::sandboxing::SandboxPermissions;
use crate::shell::ShellType;
use crate::skills::SkillMetadata;
@@ -36,6 +37,7 @@ use codex_shell_escalation::EscalationDecision;
use codex_shell_escalation::EscalationExecution;
use codex_shell_escalation::EscalationPermissions;
use codex_shell_escalation::EscalationPolicy;
use codex_shell_escalation::EscalationSession;
use codex_shell_escalation::ExecParams;
use codex_shell_escalation::ExecResult;
use codex_shell_escalation::Permissions as EscalatedPermissions;
@@ -51,6 +53,11 @@ use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
pub(crate) struct PreparedUnifiedExecZshFork {
pub(crate) exec_request: ExecRequest,
pub(crate) escalation_session: EscalationSession,
}
pub(super) async fn try_run_zsh_fork(
req: &ShellRequest,
attempt: &SandboxAttempt<'_>,
@@ -95,7 +102,7 @@ pub(super) async fn try_run_zsh_fork(
justification,
arg0,
} = sandbox_exec_request;
let ParsedShellCommand { script, login } = extract_shell_script(&command)?;
let ParsedShellCommand { script, login, .. } = extract_shell_script(&command)?;
let effective_timeout = Duration::from_millis(
req.timeout_ms
.unwrap_or(crate::exec::DEFAULT_EXEC_COMMAND_TIMEOUT_MS),
@@ -172,6 +179,103 @@ pub(super) async fn try_run_zsh_fork(
map_exec_result(attempt.sandbox, exec_result).map(Some)
}
pub(crate) async fn prepare_unified_exec_zsh_fork(
req: &crate::tools::runtimes::unified_exec::UnifiedExecRequest,
attempt: &SandboxAttempt<'_>,
ctx: &ToolCtx,
exec_request: ExecRequest,
) -> Result<Option<PreparedUnifiedExecZshFork>, ToolError> {
let Some(shell_zsh_path) = ctx.session.services.shell_zsh_path.as_ref() else {
tracing::warn!("ZshFork backend specified, but shell_zsh_path is not configured.");
return Ok(None);
};
if !ctx.session.features().enabled(Feature::ShellZshFork) {
tracing::warn!("ZshFork backend specified, but ShellZshFork feature is not enabled.");
return Ok(None);
}
if !matches!(ctx.session.user_shell().shell_type, ShellType::Zsh) {
tracing::warn!("ZshFork backend specified, but user shell is not Zsh.");
return Ok(None);
}
let parsed = match extract_shell_script(&exec_request.command) {
Ok(parsed) => parsed,
Err(err) => {
tracing::warn!("ZshFork unified exec fallback: {err:?}");
return Ok(None);
}
};
if parsed.program != shell_zsh_path.to_string_lossy() {
tracing::warn!(
"ZshFork backend specified, but unified exec command targets `{}` instead of `{}`.",
parsed.program,
shell_zsh_path.display(),
);
return Ok(None);
}
let exec_policy = Arc::new(RwLock::new(
ctx.session.services.exec_policy.current().as_ref().clone(),
));
let command_executor = CoreShellCommandExecutor {
command: exec_request.command.clone(),
cwd: exec_request.cwd.clone(),
sandbox_policy: exec_request.sandbox_policy.clone(),
sandbox: exec_request.sandbox,
env: exec_request.env.clone(),
network: exec_request.network.clone(),
windows_sandbox_level: exec_request.windows_sandbox_level,
sandbox_permissions: exec_request.sandbox_permissions,
justification: exec_request.justification.clone(),
arg0: exec_request.arg0.clone(),
sandbox_policy_cwd: ctx.turn.cwd.clone(),
macos_seatbelt_profile_extensions: ctx
.turn
.config
.permissions
.macos_seatbelt_profile_extensions
.clone(),
codex_linux_sandbox_exe: ctx.turn.codex_linux_sandbox_exe.clone(),
use_linux_sandbox_bwrap: ctx.turn.features.enabled(Feature::UseLinuxSandboxBwrap),
};
let main_execve_wrapper_exe = ctx
.session
.services
.main_execve_wrapper_exe
.clone()
.ok_or_else(|| {
ToolError::Rejected(
"zsh fork feature enabled, but execve wrapper is not configured".to_string(),
)
})?;
let escalation_policy = CoreShellActionProvider {
policy: Arc::clone(&exec_policy),
session: Arc::clone(&ctx.session),
turn: Arc::clone(&ctx.turn),
call_id: ctx.call_id.clone(),
approval_policy: ctx.turn.approval_policy.value(),
sandbox_policy: attempt.policy.clone(),
sandbox_permissions: req.sandbox_permissions,
prompt_permissions: req.additional_permissions.clone(),
stopwatch: Stopwatch::unlimited(),
};
let escalate_server = EscalateServer::new(
shell_zsh_path.clone(),
main_execve_wrapper_exe,
escalation_policy,
);
let escalation_session = escalate_server
.start_session(Arc::new(command_executor))
.map_err(|err| ToolError::Rejected(err.to_string()))?;
let mut exec_request = exec_request;
exec_request.env.extend(escalation_session.env().clone());
Ok(Some(PreparedUnifiedExecZshFork {
exec_request,
escalation_session,
}))
}
struct CoreShellActionProvider {
policy: Arc<RwLock<Policy>>,
session: Arc<crate::codex::Session>,
@@ -809,6 +913,7 @@ impl CoreShellCommandExecutor {
#[derive(Debug, Eq, PartialEq)]
struct ParsedShellCommand {
program: String,
script: String,
login: bool,
}
@@ -817,12 +922,20 @@ fn extract_shell_script(command: &[String]) -> Result<ParsedShellCommand, ToolEr
// Commands reaching zsh-fork can be wrapped by environment/sandbox helpers, so
// we search for the first `-c`/`-lc` triple anywhere in the argv rather
// than assuming it is the first positional form.
if let Some((script, login)) = command.windows(3).find_map(|parts| match parts {
[_, flag, script] if flag == "-c" => Some((script.to_owned(), false)),
[_, flag, script] if flag == "-lc" => Some((script.to_owned(), true)),
if let Some((program, script, login)) = command.windows(3).find_map(|parts| match parts {
[program, flag, script] if flag == "-c" => {
Some((program.to_owned(), script.to_owned(), false))
}
[program, flag, script] if flag == "-lc" => {
Some((program.to_owned(), script.to_owned(), true))
}
_ => None,
}) {
return Ok(ParsedShellCommand { script, login });
return Ok(ParsedShellCommand {
program,
script,
login,
});
}
Err(ToolError::Rejected(

View File

@@ -64,6 +64,7 @@ fn extract_shell_script_preserves_login_flag() {
assert_eq!(
extract_shell_script(&["/bin/zsh".into(), "-lc".into(), "echo hi".into()]).unwrap(),
ParsedShellCommand {
program: "/bin/zsh".to_string(),
script: "echo hi".to_string(),
login: true,
}
@@ -71,6 +72,7 @@ fn extract_shell_script_preserves_login_flag() {
assert_eq!(
extract_shell_script(&["/bin/zsh".into(), "-c".into(), "echo hi".into()]).unwrap(),
ParsedShellCommand {
program: "/bin/zsh".to_string(),
script: "echo hi".to_string(),
login: false,
}
@@ -89,6 +91,7 @@ fn extract_shell_script_supports_wrapped_command_prefixes() {
])
.unwrap(),
ParsedShellCommand {
program: "/bin/zsh".to_string(),
script: "echo hello".to_string(),
login: true,
}
@@ -105,6 +108,7 @@ fn extract_shell_script_supports_wrapped_command_prefixes() {
])
.unwrap(),
ParsedShellCommand {
program: "/bin/zsh".to_string(),
script: "pwd".to_string(),
login: false,
}

View File

@@ -16,6 +16,8 @@ use crate::tools::network_approval::NetworkApprovalMode;
use crate::tools::network_approval::NetworkApprovalSpec;
use crate::tools::runtimes::build_command_spec;
use crate::tools::runtimes::maybe_wrap_shell_lc_with_snapshot;
#[cfg(unix)]
use crate::tools::runtimes::shell::unix_escalation;
use crate::tools::sandboxing::Approvable;
use crate::tools::sandboxing::ApprovalCtx;
use crate::tools::sandboxing::ExecApprovalRequirement;
@@ -28,6 +30,7 @@ use crate::tools::sandboxing::ToolError;
use crate::tools::sandboxing::ToolRuntime;
use crate::tools::sandboxing::sandbox_override_for_first_attempt;
use crate::tools::sandboxing::with_cached_approval;
use crate::tools::spec::UnifiedExecBackendConfig;
use crate::unified_exec::UnifiedExecError;
use crate::unified_exec::UnifiedExecProcess;
use crate::unified_exec::UnifiedExecProcessManager;
@@ -63,10 +66,18 @@ pub struct UnifiedExecApprovalKey {
pub struct UnifiedExecRuntime<'a> {
manager: &'a UnifiedExecProcessManager,
#[cfg(unix)]
backend: UnifiedExecBackendConfig,
}
impl<'a> UnifiedExecRuntime<'a> {
pub fn new(manager: &'a UnifiedExecProcessManager) -> Self {
#[cfg(unix)]
pub fn new(manager: &'a UnifiedExecProcessManager, backend: UnifiedExecBackendConfig) -> Self {
Self { manager, backend }
}
#[cfg(not(unix))]
pub fn new(manager: &'a UnifiedExecProcessManager, _backend: UnifiedExecBackendConfig) -> Self {
Self { manager }
}
}
@@ -184,6 +195,54 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
if let Some(network) = req.network.as_ref() {
network.apply_to_env(&mut env);
}
#[cfg(unix)]
if self.backend == UnifiedExecBackendConfig::ZshFork {
let spec = build_command_spec(
&command,
&req.cwd,
&env,
ExecExpiration::DefaultTimeout,
req.sandbox_permissions,
req.additional_permissions.clone(),
req.justification.clone(),
)
.map_err(|_| ToolError::Rejected("missing command line for PTY".to_string()))?;
let exec_env = attempt
.env_for(spec, req.network.as_ref())
.map_err(|err| ToolError::Codex(err.into()))?;
match unix_escalation::prepare_unified_exec_zsh_fork(req, attempt, ctx, exec_env)
.await?
{
Some(prepared) => {
let unix_escalation::PreparedUnifiedExecZshFork {
exec_request,
escalation_session,
} = prepared;
return self
.manager
.open_session_with_exec_env(
&exec_request,
req.tty,
Some(escalation_session),
)
.await
.map_err(|err| match err {
UnifiedExecError::SandboxDenied { output, .. } => {
ToolError::Codex(CodexErr::Sandbox(SandboxErr::Denied {
output: Box::new(output),
network_policy_decision: None,
}))
}
other => ToolError::Rejected(other.to_string()),
});
}
None => {
tracing::warn!(
"UnifiedExec ZshFork backend specified, but conditions for using it were not met, falling back to direct execution",
);
}
}
}
let spec = build_command_spec(
&command,
&req.cwd,
@@ -198,7 +257,7 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
.env_for(spec, req.network.as_ref())
.map_err(|err| ToolError::Codex(err.into()))?;
self.manager
.open_session_with_exec_env(&exec_env, req.tty)
.open_session_with_exec_env(&exec_env, req.tty, None)
.await
.map_err(|err| match err {
UnifiedExecError::SandboxDenied { output, .. } => {

View File

@@ -42,10 +42,17 @@ pub enum ShellCommandBackendConfig {
ZshFork,
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum UnifiedExecBackendConfig {
Direct,
ZshFork,
}
#[derive(Debug, Clone)]
pub(crate) struct ToolsConfig {
pub shell_type: ConfigShellToolType,
shell_command_backend: ShellCommandBackendConfig,
pub unified_exec_backend: UnifiedExecBackendConfig,
pub allow_login_shell: bool,
pub apply_patch_tool_type: Option<ApplyPatchToolType>,
pub web_search_mode: Option<WebSearchMode>,
@@ -94,6 +101,12 @@ impl ToolsConfig {
} else {
ShellCommandBackendConfig::Classic
};
let unified_exec_backend =
if features.enabled(Feature::ShellTool) && features.enabled(Feature::ShellZshFork) {
UnifiedExecBackendConfig::ZshFork
} else {
UnifiedExecBackendConfig::Direct
};
let shell_type = if !features.enabled(Feature::ShellTool) {
ConfigShellToolType::Disabled
@@ -132,6 +145,7 @@ impl ToolsConfig {
Self {
shell_type,
shell_command_backend,
unified_exec_backend,
allow_login_shell: true,
apply_patch_tool_type,
web_search_mode: *web_search_mode,
@@ -2727,6 +2741,10 @@ mod tests {
tools_config.shell_command_backend,
ShellCommandBackendConfig::ZshFork
);
assert_eq!(
tools_config.unified_exec_backend,
UnifiedExecBackendConfig::ZshFork
);
}
#[test]

View File

@@ -24,6 +24,12 @@ use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS;
use super::UnifiedExecError;
use super::head_tail_buffer::HeadTailBuffer;
#[cfg(unix)]
pub(crate) use codex_shell_escalation::EscalationSession;
#[cfg(not(unix))]
#[derive(Debug)]
pub(crate) struct EscalationSession;
pub(crate) type OutputBuffer = Arc<Mutex<HeadTailBuffer>>;
pub(crate) struct OutputHandles {
pub(crate) output_buffer: OutputBuffer,
@@ -44,6 +50,7 @@ pub(crate) struct UnifiedExecProcess {
output_drained: Arc<Notify>,
output_task: JoinHandle<()>,
sandbox_type: SandboxType,
_escalation_session: Option<EscalationSession>,
}
impl UnifiedExecProcess {
@@ -51,6 +58,7 @@ impl UnifiedExecProcess {
process_handle: ExecCommandSession,
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
sandbox_type: SandboxType,
escalation_session: Option<EscalationSession>,
) -> Self {
let output_buffer = Arc::new(Mutex::new(HeadTailBuffer::default()));
let output_notify = Arc::new(Notify::new());
@@ -92,6 +100,7 @@ impl UnifiedExecProcess {
output_drained,
output_task,
sandbox_type,
_escalation_session: escalation_session,
}
}
@@ -196,13 +205,14 @@ impl UnifiedExecProcess {
pub(super) async fn from_spawned(
spawned: SpawnedPty,
sandbox_type: SandboxType,
escalation_session: Option<EscalationSession>,
) -> Result<Self, UnifiedExecError> {
let SpawnedPty {
session: process_handle,
output_rx,
mut exit_rx,
} = spawned;
let managed = Self::new(process_handle, output_rx, sandbox_type);
let managed = Self::new(process_handle, output_rx, sandbox_type, escalation_session);
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));

View File

@@ -47,6 +47,7 @@ use crate::unified_exec::async_watcher::start_streaming_output;
use crate::unified_exec::clamp_yield_time;
use crate::unified_exec::generate_chunk_id;
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
use crate::unified_exec::process::EscalationSession;
use crate::unified_exec::process::OutputBuffer;
use crate::unified_exec::process::OutputHandles;
use crate::unified_exec::process::UnifiedExecProcess;
@@ -528,6 +529,7 @@ impl UnifiedExecProcessManager {
&self,
env: &ExecRequest,
tty: bool,
escalation_session: Option<EscalationSession>,
) -> Result<UnifiedExecProcess, UnifiedExecError> {
let (program, args) = env
.command
@@ -555,7 +557,15 @@ impl UnifiedExecProcessManager {
};
let spawned =
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
UnifiedExecProcess::from_spawned(spawned, env.sandbox).await
#[cfg(unix)]
let escalation_session = {
let mut escalation_session = escalation_session;
if let Some(session) = escalation_session.as_mut() {
session.close_client_socket();
}
escalation_session
};
UnifiedExecProcess::from_spawned(spawned, env.sandbox, escalation_session).await
}
pub(super) async fn open_session_with_sandbox(
@@ -569,7 +579,8 @@ impl UnifiedExecProcessManager {
Some(context.session.conversation_id),
));
let mut orchestrator = ToolOrchestrator::new();
let mut runtime = UnifiedExecRuntime::new(self);
let mut runtime =
UnifiedExecRuntime::new(self, context.turn.tools_config.unified_exec_backend);
let exec_approval_requirement = context
.session
.services

View File

@@ -14,6 +14,8 @@ pub use unix::EscalationPermissions;
#[cfg(unix)]
pub use unix::EscalationPolicy;
#[cfg(unix)]
pub use unix::EscalationSession;
#[cfg(unix)]
pub use unix::ExecParams;
#[cfg(unix)]
pub use unix::ExecResult;

View File

@@ -7,7 +7,9 @@ use std::time::Duration;
use anyhow::Context as _;
use codex_utils_absolute_path::AbsolutePathBuf;
use socket2::Socket;
use tokio::process::Command;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::unix::escalate_protocol::ESCALATE_SOCKET_ENV_VAR;
@@ -82,6 +84,29 @@ pub struct PreparedExec {
pub arg0: Option<String>,
}
#[derive(Debug)]
pub struct EscalationSession {
env: HashMap<String, String>,
task: JoinHandle<anyhow::Result<()>>,
client_socket: Option<Socket>,
}
impl EscalationSession {
pub fn env(&self) -> &HashMap<String, String> {
&self.env
}
pub fn close_client_socket(&mut self) {
self.client_socket.take();
}
}
impl Drop for EscalationSession {
fn drop(&mut self) {
self.task.abort();
}
}
pub struct EscalateServer {
bash_path: PathBuf,
execve_wrapper: PathBuf,
@@ -106,16 +131,42 @@ impl EscalateServer {
cancel_rx: CancellationToken,
command_executor: Arc<dyn ShellCommandExecutor>,
) -> anyhow::Result<ExecResult> {
let session = self.start_session(Arc::clone(&command_executor))?;
let command = vec![
self.bash_path.to_string_lossy().to_string(),
if params.login == Some(false) {
"-c".to_string()
} else {
"-lc".to_string()
},
params.command,
];
let workdir = AbsolutePathBuf::try_from(params.workdir)?;
let result = command_executor
.run(
command,
workdir.to_path_buf(),
session.env().clone(),
cancel_rx,
)
.await?;
Ok(result)
}
pub fn start_session(
&self,
command_executor: Arc<dyn ShellCommandExecutor>,
) -> anyhow::Result<EscalationSession> {
let (escalate_server, escalate_client) = AsyncDatagramSocket::pair()?;
let client_socket = escalate_client.into_inner();
// Only the client endpoint should cross exec into the wrapper process.
client_socket.set_cloexec(false)?;
let escalate_task = tokio::spawn(escalate_task(
let task = tokio::spawn(escalate_task(
escalate_server,
Arc::clone(&self.policy),
Arc::clone(&command_executor),
));
let mut env = std::env::vars().collect::<HashMap<String, String>>();
let mut env = HashMap::new();
env.insert(
ESCALATE_SOCKET_ENV_VAR.to_string(),
client_socket.as_raw_fd().to_string(),
@@ -128,22 +179,11 @@ impl EscalateServer {
LEGACY_BASH_EXEC_WRAPPER_ENV_VAR.to_string(),
self.execve_wrapper.to_string_lossy().to_string(),
);
let command = vec![
self.bash_path.to_string_lossy().to_string(),
if params.login == Some(false) {
"-c".to_string()
} else {
"-lc".to_string()
},
params.command,
];
let workdir = AbsolutePathBuf::try_from(params.workdir)?;
let result = command_executor
.run(command, workdir.to_path_buf(), env, cancel_rx)
.await?;
escalate_task.abort();
Ok(result)
Ok(EscalationSession {
env,
task,
client_socket: Some(client_socket),
})
}
}
@@ -390,6 +430,37 @@ mod tests {
}
}
#[tokio::test]
async fn start_session_exposes_wrapper_env_overlay() -> anyhow::Result<()> {
let execve_wrapper = PathBuf::from("/tmp/codex-execve-wrapper");
let execve_wrapper_str = execve_wrapper.to_string_lossy().to_string();
let server = EscalateServer::new(
PathBuf::from("/bin/bash"),
execve_wrapper.clone(),
DeterministicEscalationPolicy {
decision: EscalationDecision::run(),
},
);
let mut session = server.start_session(Arc::new(ForwardingShellCommandExecutor))?;
let env = session.env();
assert_eq!(env.get(EXEC_WRAPPER_ENV_VAR), Some(&execve_wrapper_str));
assert_eq!(
env.get(LEGACY_BASH_EXEC_WRAPPER_ENV_VAR),
Some(&execve_wrapper_str)
);
let socket_fd = env
.get(ESCALATE_SOCKET_ENV_VAR)
.expect("session should export shell escalation socket");
let socket_fd = socket_fd.parse::<i32>()?;
assert!(socket_fd >= 0);
assert_ne!(unsafe { libc::fcntl(socket_fd, libc::F_GETFD) }, -1);
session.close_client_socket();
assert_eq!(unsafe { libc::fcntl(socket_fd, libc::F_GETFD) }, -1);
Ok(())
}
#[tokio::test]
async fn handle_escalate_session_respects_run_in_sandbox_decision() -> anyhow::Result<()> {
let (server, client) = AsyncSocket::pair()?;

View File

@@ -66,6 +66,7 @@ pub use self::escalate_protocol::EscalateAction;
pub use self::escalate_protocol::EscalationDecision;
pub use self::escalate_protocol::EscalationExecution;
pub use self::escalate_server::EscalateServer;
pub use self::escalate_server::EscalationSession;
pub use self::escalate_server::ExecParams;
pub use self::escalate_server::ExecResult;
pub use self::escalate_server::PreparedExec;

View File

@@ -9,7 +9,7 @@ use tokio_util::sync::CancellationToken;
#[derive(Clone, Debug)]
pub struct Stopwatch {
limit: Duration,
limit: Option<Duration>,
inner: Arc<Mutex<StopwatchState>>,
notify: Arc<Notify>,
}
@@ -30,13 +30,27 @@ impl Stopwatch {
active_pauses: 0,
})),
notify: Arc::new(Notify::new()),
limit,
limit: Some(limit),
}
}
pub fn unlimited() -> Self {
Self {
inner: Arc::new(Mutex::new(StopwatchState {
elapsed: Duration::ZERO,
running_since: Some(Instant::now()),
active_pauses: 0,
})),
notify: Arc::new(Notify::new()),
limit: None,
}
}
pub fn cancellation_token(&self) -> CancellationToken {
let limit = self.limit;
let token = CancellationToken::new();
let Some(limit) = self.limit else {
return token;
};
let cancel = token.clone();
let inner = Arc::clone(&self.inner);
let notify = Arc::clone(&self.notify);
@@ -208,4 +222,16 @@ mod tests {
// Now the stopwatch should resume and hit the limit shortly after.
token.cancelled().await;
}
#[tokio::test]
async fn unlimited_stopwatch_never_cancels() {
let stopwatch = Stopwatch::unlimited();
let token = stopwatch.cancellation_token();
assert!(
timeout(Duration::from_millis(30), token.cancelled())
.await
.is_err()
);
}
}