mirror of
https://github.com/openai/codex.git
synced 2026-02-21 00:03:47 +00:00
Compare commits
4 Commits
codex/titl
...
jif/no-tim
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
aae90b9c8e | ||
|
|
fe80c14174 | ||
|
|
245626ec69 | ||
|
|
6f74470567 |
@@ -54,10 +54,12 @@ struct WriteStdinArgs {
|
||||
session_id: i32,
|
||||
#[serde(default)]
|
||||
chars: String,
|
||||
#[serde(default = "default_write_stdin_yield_time_ms")]
|
||||
yield_time_ms: u64,
|
||||
#[serde(default)]
|
||||
yield_time_ms: Option<u64>,
|
||||
#[serde(default)]
|
||||
max_output_tokens: Option<usize>,
|
||||
#[serde(default)]
|
||||
no_timeout: bool,
|
||||
}
|
||||
|
||||
fn default_exec_yield_time_ms() -> u64 {
|
||||
@@ -199,12 +201,29 @@ impl ToolHandler for UnifiedExecHandler {
|
||||
}
|
||||
"write_stdin" => {
|
||||
let args: WriteStdinArgs = parse_arguments(&arguments)?;
|
||||
if args.no_timeout {
|
||||
if !args.chars.is_empty() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"`no_timeout=true` requires empty `chars`.".to_string(),
|
||||
));
|
||||
}
|
||||
if args.yield_time_ms.is_some() {
|
||||
return Err(FunctionCallError::RespondToModel(
|
||||
"`no_timeout=true` requires `yield_time_ms` to be omitted.".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
let yield_time_ms = args
|
||||
.yield_time_ms
|
||||
.unwrap_or_else(default_write_stdin_yield_time_ms);
|
||||
let response = manager
|
||||
.write_stdin(WriteStdinRequest {
|
||||
process_id: &args.session_id.to_string(),
|
||||
input: &args.chars,
|
||||
yield_time_ms: args.yield_time_ms,
|
||||
yield_time_ms,
|
||||
max_output_tokens: args.max_output_tokens,
|
||||
no_timeout: args.no_timeout,
|
||||
})
|
||||
.await
|
||||
.map_err(|err| {
|
||||
|
||||
@@ -316,6 +316,14 @@ fn create_write_stdin_tool() -> ToolSpec {
|
||||
),
|
||||
},
|
||||
),
|
||||
(
|
||||
"no_timeout".to_string(),
|
||||
JsonSchema::Boolean {
|
||||
description: Some(
|
||||
"If set to true, nothing will be returned before the end of the process. This can only be used with empty `chars` and empty `yield_time_ms`. This must be used to wait for very long running processes/tests or to schedule a wait.".to_string()
|
||||
)
|
||||
}
|
||||
)
|
||||
]);
|
||||
|
||||
ToolSpec::Function(ResponsesApiTool {
|
||||
|
||||
@@ -99,6 +99,7 @@ pub(crate) struct WriteStdinRequest<'a> {
|
||||
pub input: &'a str,
|
||||
pub yield_time_ms: u64,
|
||||
pub max_output_tokens: Option<usize>,
|
||||
pub no_timeout: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
@@ -247,6 +248,7 @@ mod tests {
|
||||
input,
|
||||
yield_time_ms,
|
||||
max_output_tokens: None,
|
||||
no_timeout: false,
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -351,27 +351,58 @@ impl UnifiedExecProcessManager {
|
||||
}
|
||||
|
||||
let max_tokens = resolve_max_tokens(request.max_output_tokens);
|
||||
let yield_time_ms = {
|
||||
// Empty polls use configurable background timeout bounds. Non-empty
|
||||
// writes keep a fixed max cap so interactive stdin remains responsive.
|
||||
let time_ms = request.yield_time_ms.max(MIN_YIELD_TIME_MS);
|
||||
if request.input.is_empty() {
|
||||
time_ms.clamp(MIN_EMPTY_YIELD_TIME_MS, self.max_write_stdin_yield_time_ms)
|
||||
} else {
|
||||
time_ms.min(MAX_YIELD_TIME_MS)
|
||||
}
|
||||
};
|
||||
let start = Instant::now();
|
||||
let deadline = start + Duration::from_millis(yield_time_ms);
|
||||
let collected = Self::collect_output_until_deadline(
|
||||
&output_buffer,
|
||||
&output_notify,
|
||||
&output_closed,
|
||||
&output_closed_notify,
|
||||
&cancellation_token,
|
||||
deadline,
|
||||
)
|
||||
.await;
|
||||
let collected = if request.no_timeout {
|
||||
let mut collected = Vec::with_capacity(4096);
|
||||
loop {
|
||||
let mut next = Self::collect_output_until_deadline(
|
||||
&output_buffer,
|
||||
&output_notify,
|
||||
&output_closed,
|
||||
&output_closed_notify,
|
||||
&cancellation_token,
|
||||
Instant::now() + Duration::from_millis(self.max_write_stdin_yield_time_ms),
|
||||
)
|
||||
.await;
|
||||
collected.append(&mut next);
|
||||
|
||||
if cancellation_token.is_cancelled() {
|
||||
let mut trailing = Self::collect_output_until_deadline(
|
||||
&output_buffer,
|
||||
&output_notify,
|
||||
&output_closed,
|
||||
&output_closed_notify,
|
||||
&cancellation_token,
|
||||
Instant::now() + Duration::from_millis(50),
|
||||
)
|
||||
.await;
|
||||
collected.append(&mut trailing);
|
||||
break;
|
||||
}
|
||||
}
|
||||
collected
|
||||
} else {
|
||||
let yield_time_ms = {
|
||||
// Empty polls use configurable background timeout bounds. Non-empty
|
||||
// writes keep a fixed max cap so interactive stdin remains responsive.
|
||||
let time_ms = request.yield_time_ms.max(MIN_YIELD_TIME_MS);
|
||||
if request.input.is_empty() {
|
||||
time_ms.clamp(MIN_EMPTY_YIELD_TIME_MS, self.max_write_stdin_yield_time_ms)
|
||||
} else {
|
||||
time_ms.min(MAX_YIELD_TIME_MS)
|
||||
}
|
||||
};
|
||||
let deadline = start + Duration::from_millis(yield_time_ms);
|
||||
Self::collect_output_until_deadline(
|
||||
&output_buffer,
|
||||
&output_notify,
|
||||
&output_closed,
|
||||
&output_closed_notify,
|
||||
&cancellation_token,
|
||||
deadline,
|
||||
)
|
||||
.await
|
||||
};
|
||||
let wall_time = Instant::now().saturating_duration_since(start);
|
||||
|
||||
let text = String::from_utf8_lossy(&collected).to_string();
|
||||
|
||||
@@ -1764,6 +1764,248 @@ async fn write_stdin_returns_exit_metadata_and_clears_session() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn write_stdin_no_timeout_waits_for_process_exit() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
skip_if_sandbox!(Ok(()));
|
||||
skip_if_windows!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let start_call_id = "uexec-no-timeout-start";
|
||||
let wait_call_id = "uexec-no-timeout-wait";
|
||||
|
||||
let start_args = serde_json::json!({
|
||||
"cmd": "sleep 1 && echo NO_TIMEOUT_DONE",
|
||||
"yield_time_ms": 250,
|
||||
"tty": true,
|
||||
});
|
||||
let wait_args = serde_json::json!({
|
||||
"session_id": 1000,
|
||||
"no_timeout": true,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
start_call_id,
|
||||
"exec_command",
|
||||
&serde_json::to_string(&start_args)?,
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_function_call(
|
||||
wait_call_id,
|
||||
"write_stdin",
|
||||
&serde_json::to_string(&wait_args)?,
|
||||
),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
];
|
||||
let request_log = mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "wait for long process exit".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert!(!requests.is_empty(), "expected at least one POST request");
|
||||
let bodies = requests
|
||||
.into_iter()
|
||||
.map(|request| request.body_json())
|
||||
.collect::<Vec<_>>();
|
||||
let outputs = collect_tool_outputs(&bodies)?;
|
||||
|
||||
let start_output = outputs
|
||||
.get(start_call_id)
|
||||
.expect("missing start output for exec_command");
|
||||
assert!(
|
||||
start_output.process_id.is_some(),
|
||||
"expected exec_command to leave an active process"
|
||||
);
|
||||
assert!(
|
||||
start_output.exit_code.is_none(),
|
||||
"start output should not include exit metadata"
|
||||
);
|
||||
|
||||
let wait_output = outputs
|
||||
.get(wait_call_id)
|
||||
.expect("missing no-timeout write_stdin output");
|
||||
assert!(
|
||||
wait_output.wall_time_seconds >= 0.6,
|
||||
"no_timeout should wait for process completion; wall_time={}",
|
||||
wait_output.wall_time_seconds
|
||||
);
|
||||
assert!(
|
||||
wait_output.process_id.is_none(),
|
||||
"process_id should be omitted after completion"
|
||||
);
|
||||
assert_eq!(wait_output.exit_code, Some(0));
|
||||
assert!(
|
||||
wait_output.output.contains("NO_TIMEOUT_DONE"),
|
||||
"expected no-timeout output to include process stdout, got {:?}",
|
||||
wait_output.output
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn write_stdin_no_timeout_rejects_invalid_argument_combinations() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
skip_if_sandbox!(Ok(()));
|
||||
skip_if_windows!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.features.enable(Feature::UnifiedExec);
|
||||
});
|
||||
let TestCodex {
|
||||
codex,
|
||||
cwd,
|
||||
session_configured,
|
||||
..
|
||||
} = builder.build(&server).await?;
|
||||
|
||||
let start_call_id = "uexec-no-timeout-invalid-start";
|
||||
let invalid_chars_call_id = "uexec-no-timeout-invalid-chars";
|
||||
let invalid_yield_call_id = "uexec-no-timeout-invalid-yield";
|
||||
|
||||
let start_args = serde_json::json!({
|
||||
"cmd": "sleep 1",
|
||||
"yield_time_ms": 250,
|
||||
"tty": true,
|
||||
});
|
||||
let invalid_chars_args = serde_json::json!({
|
||||
"session_id": 1000,
|
||||
"chars": "ping",
|
||||
"no_timeout": true,
|
||||
});
|
||||
let invalid_yield_args = serde_json::json!({
|
||||
"session_id": 1000,
|
||||
"yield_time_ms": 1000,
|
||||
"no_timeout": true,
|
||||
});
|
||||
|
||||
let responses = vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
start_call_id,
|
||||
"exec_command",
|
||||
&serde_json::to_string(&start_args)?,
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_function_call(
|
||||
invalid_chars_call_id,
|
||||
"write_stdin",
|
||||
&serde_json::to_string(&invalid_chars_args)?,
|
||||
),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-3"),
|
||||
ev_function_call(
|
||||
invalid_yield_call_id,
|
||||
"write_stdin",
|
||||
&serde_json::to_string(&invalid_yield_args)?,
|
||||
),
|
||||
ev_completed("resp-3"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-4"),
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-4"),
|
||||
]),
|
||||
];
|
||||
let request_log = mount_sse_sequence(&server, responses).await;
|
||||
|
||||
let session_model = session_configured.model.clone();
|
||||
|
||||
codex
|
||||
.submit(Op::UserTurn {
|
||||
items: vec![UserInput::Text {
|
||||
text: "validate no_timeout args".into(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
final_output_json_schema: None,
|
||||
cwd: cwd.path().to_path_buf(),
|
||||
approval_policy: AskForApproval::Never,
|
||||
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
||||
model: session_model,
|
||||
effort: None,
|
||||
summary: ReasoningSummary::Auto,
|
||||
collaboration_mode: None,
|
||||
personality: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert!(!requests.is_empty(), "expected at least one POST request");
|
||||
|
||||
let invalid_chars_output = requests
|
||||
.iter()
|
||||
.find_map(|request| request.function_call_output_text(invalid_chars_call_id))
|
||||
.expect("missing no-timeout invalid chars output");
|
||||
assert_eq!(
|
||||
invalid_chars_output,
|
||||
"`no_timeout=true` requires empty `chars`."
|
||||
);
|
||||
|
||||
let invalid_yield_output = requests
|
||||
.iter()
|
||||
.find_map(|request| request.function_call_output_text(invalid_yield_call_id))
|
||||
.expect("missing no-timeout invalid yield output");
|
||||
assert_eq!(
|
||||
invalid_yield_output,
|
||||
"`no_timeout=true` requires `yield_time_ms` to be omitted."
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn unified_exec_emits_end_event_when_session_dies_via_stdin() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
Reference in New Issue
Block a user