Compare commits

...

4 Commits

Author SHA1 Message Date
jif-oai
aae90b9c8e Merge branch 'main' into jif/no-timeout 2026-02-20 09:06:21 +00:00
jif-oai
fe80c14174 Merge branch 'main' into jif/no-timeout 2026-02-19 18:45:02 +00:00
jif-oai
245626ec69 fix 2026-02-19 18:37:34 +00:00
jif-oai
6f74470567 feat: no timeout mode on ue 2026-02-19 18:19:51 +00:00
5 changed files with 325 additions and 23 deletions

View File

@@ -54,10 +54,12 @@ struct WriteStdinArgs {
session_id: i32,
#[serde(default)]
chars: String,
#[serde(default = "default_write_stdin_yield_time_ms")]
yield_time_ms: u64,
#[serde(default)]
yield_time_ms: Option<u64>,
#[serde(default)]
max_output_tokens: Option<usize>,
#[serde(default)]
no_timeout: bool,
}
fn default_exec_yield_time_ms() -> u64 {
@@ -199,12 +201,29 @@ impl ToolHandler for UnifiedExecHandler {
}
"write_stdin" => {
let args: WriteStdinArgs = parse_arguments(&arguments)?;
if args.no_timeout {
if !args.chars.is_empty() {
return Err(FunctionCallError::RespondToModel(
"`no_timeout=true` requires empty `chars`.".to_string(),
));
}
if args.yield_time_ms.is_some() {
return Err(FunctionCallError::RespondToModel(
"`no_timeout=true` requires `yield_time_ms` to be omitted.".to_string(),
));
}
}
let yield_time_ms = args
.yield_time_ms
.unwrap_or_else(default_write_stdin_yield_time_ms);
let response = manager
.write_stdin(WriteStdinRequest {
process_id: &args.session_id.to_string(),
input: &args.chars,
yield_time_ms: args.yield_time_ms,
yield_time_ms,
max_output_tokens: args.max_output_tokens,
no_timeout: args.no_timeout,
})
.await
.map_err(|err| {

View File

@@ -316,6 +316,14 @@ fn create_write_stdin_tool() -> ToolSpec {
),
},
),
(
"no_timeout".to_string(),
JsonSchema::Boolean {
description: Some(
"If set to true, nothing will be returned before the end of the process. This can only be used with empty `chars` and empty `yield_time_ms`. This must be used to wait for very long running processes/tests or to schedule a wait.".to_string()
)
}
)
]);
ToolSpec::Function(ResponsesApiTool {

View File

@@ -99,6 +99,7 @@ pub(crate) struct WriteStdinRequest<'a> {
pub input: &'a str,
pub yield_time_ms: u64,
pub max_output_tokens: Option<usize>,
pub no_timeout: bool,
}
#[derive(Debug, Clone, PartialEq)]
@@ -247,6 +248,7 @@ mod tests {
input,
yield_time_ms,
max_output_tokens: None,
no_timeout: false,
})
.await
}

View File

@@ -351,27 +351,58 @@ impl UnifiedExecProcessManager {
}
let max_tokens = resolve_max_tokens(request.max_output_tokens);
let yield_time_ms = {
// Empty polls use configurable background timeout bounds. Non-empty
// writes keep a fixed max cap so interactive stdin remains responsive.
let time_ms = request.yield_time_ms.max(MIN_YIELD_TIME_MS);
if request.input.is_empty() {
time_ms.clamp(MIN_EMPTY_YIELD_TIME_MS, self.max_write_stdin_yield_time_ms)
} else {
time_ms.min(MAX_YIELD_TIME_MS)
}
};
let start = Instant::now();
let deadline = start + Duration::from_millis(yield_time_ms);
let collected = Self::collect_output_until_deadline(
&output_buffer,
&output_notify,
&output_closed,
&output_closed_notify,
&cancellation_token,
deadline,
)
.await;
let collected = if request.no_timeout {
let mut collected = Vec::with_capacity(4096);
loop {
let mut next = Self::collect_output_until_deadline(
&output_buffer,
&output_notify,
&output_closed,
&output_closed_notify,
&cancellation_token,
Instant::now() + Duration::from_millis(self.max_write_stdin_yield_time_ms),
)
.await;
collected.append(&mut next);
if cancellation_token.is_cancelled() {
let mut trailing = Self::collect_output_until_deadline(
&output_buffer,
&output_notify,
&output_closed,
&output_closed_notify,
&cancellation_token,
Instant::now() + Duration::from_millis(50),
)
.await;
collected.append(&mut trailing);
break;
}
}
collected
} else {
let yield_time_ms = {
// Empty polls use configurable background timeout bounds. Non-empty
// writes keep a fixed max cap so interactive stdin remains responsive.
let time_ms = request.yield_time_ms.max(MIN_YIELD_TIME_MS);
if request.input.is_empty() {
time_ms.clamp(MIN_EMPTY_YIELD_TIME_MS, self.max_write_stdin_yield_time_ms)
} else {
time_ms.min(MAX_YIELD_TIME_MS)
}
};
let deadline = start + Duration::from_millis(yield_time_ms);
Self::collect_output_until_deadline(
&output_buffer,
&output_notify,
&output_closed,
&output_closed_notify,
&cancellation_token,
deadline,
)
.await
};
let wall_time = Instant::now().saturating_duration_since(start);
let text = String::from_utf8_lossy(&collected).to_string();

View File

@@ -1764,6 +1764,248 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_stdin_no_timeout_waits_for_process_exit() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::UnifiedExec);
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let start_call_id = "uexec-no-timeout-start";
let wait_call_id = "uexec-no-timeout-wait";
let start_args = serde_json::json!({
"cmd": "sleep 1 && echo NO_TIMEOUT_DONE",
"yield_time_ms": 250,
"tty": true,
});
let wait_args = serde_json::json!({
"session_id": 1000,
"no_timeout": true,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
start_call_id,
"exec_command",
&serde_json::to_string(&start_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
wait_call_id,
"write_stdin",
&serde_json::to_string(&wait_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-3"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "wait for long process exit".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let bodies = requests
.into_iter()
.map(|request| request.body_json())
.collect::<Vec<_>>();
let outputs = collect_tool_outputs(&bodies)?;
let start_output = outputs
.get(start_call_id)
.expect("missing start output for exec_command");
assert!(
start_output.process_id.is_some(),
"expected exec_command to leave an active process"
);
assert!(
start_output.exit_code.is_none(),
"start output should not include exit metadata"
);
let wait_output = outputs
.get(wait_call_id)
.expect("missing no-timeout write_stdin output");
assert!(
wait_output.wall_time_seconds >= 0.6,
"no_timeout should wait for process completion; wall_time={}",
wait_output.wall_time_seconds
);
assert!(
wait_output.process_id.is_none(),
"process_id should be omitted after completion"
);
assert_eq!(wait_output.exit_code, Some(0));
assert!(
wait_output.output.contains("NO_TIMEOUT_DONE"),
"expected no-timeout output to include process stdout, got {:?}",
wait_output.output
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn write_stdin_no_timeout_rejects_invalid_argument_combinations() -> Result<()> {
skip_if_no_network!(Ok(()));
skip_if_sandbox!(Ok(()));
skip_if_windows!(Ok(()));
let server = start_mock_server().await;
let mut builder = test_codex().with_config(|config| {
config.features.enable(Feature::UnifiedExec);
});
let TestCodex {
codex,
cwd,
session_configured,
..
} = builder.build(&server).await?;
let start_call_id = "uexec-no-timeout-invalid-start";
let invalid_chars_call_id = "uexec-no-timeout-invalid-chars";
let invalid_yield_call_id = "uexec-no-timeout-invalid-yield";
let start_args = serde_json::json!({
"cmd": "sleep 1",
"yield_time_ms": 250,
"tty": true,
});
let invalid_chars_args = serde_json::json!({
"session_id": 1000,
"chars": "ping",
"no_timeout": true,
});
let invalid_yield_args = serde_json::json!({
"session_id": 1000,
"yield_time_ms": 1000,
"no_timeout": true,
});
let responses = vec![
sse(vec![
ev_response_created("resp-1"),
ev_function_call(
start_call_id,
"exec_command",
&serde_json::to_string(&start_args)?,
),
ev_completed("resp-1"),
]),
sse(vec![
ev_response_created("resp-2"),
ev_function_call(
invalid_chars_call_id,
"write_stdin",
&serde_json::to_string(&invalid_chars_args)?,
),
ev_completed("resp-2"),
]),
sse(vec![
ev_response_created("resp-3"),
ev_function_call(
invalid_yield_call_id,
"write_stdin",
&serde_json::to_string(&invalid_yield_args)?,
),
ev_completed("resp-3"),
]),
sse(vec![
ev_response_created("resp-4"),
ev_assistant_message("msg-1", "done"),
ev_completed("resp-4"),
]),
];
let request_log = mount_sse_sequence(&server, responses).await;
let session_model = session_configured.model.clone();
codex
.submit(Op::UserTurn {
items: vec![UserInput::Text {
text: "validate no_timeout args".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
cwd: cwd.path().to_path_buf(),
approval_policy: AskForApproval::Never,
sandbox_policy: SandboxPolicy::DangerFullAccess,
model: session_model,
effort: None,
summary: ReasoningSummary::Auto,
collaboration_mode: None,
personality: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let requests = request_log.requests();
assert!(!requests.is_empty(), "expected at least one POST request");
let invalid_chars_output = requests
.iter()
.find_map(|request| request.function_call_output_text(invalid_chars_call_id))
.expect("missing no-timeout invalid chars output");
assert_eq!(
invalid_chars_output,
"`no_timeout=true` requires empty `chars`."
);
let invalid_yield_output = requests
.iter()
.find_map(|request| request.function_call_output_text(invalid_yield_call_id))
.expect("missing no-timeout invalid yield output");
assert_eq!(
invalid_yield_output,
"`no_timeout=true` requires `yield_time_ms` to be omitted."
);
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> {
skip_if_no_network!(Ok(()));