Compare commits

...

1 Commit

Author SHA1 Message Date
Gabriel Peal
54dba45979 --wip-- [skip ci] 2025-06-18 13:11:57 -07:00
5 changed files with 131 additions and 27 deletions

View File

@@ -413,7 +413,12 @@ where
// Nothing aggregated — forward Completed directly.
return Poll::Ready(Some(Ok(ResponseEvent::Completed { response_id })));
} // No other `Ok` variants exist at the moment, continue polling.
}
Poll::Ready(Some(Ok(ResponseEvent::Created { .. }))) => {
// These events are exclusive to the Responses API and
// will never appear in a Chat Completions stream.
continue;
}
}
}
}

View File

@@ -141,6 +141,7 @@ impl ModelClient {
.client
.post(&url)
.bearer_auth(api_key)
// TODO: is experimental still needed?
.header("OpenAI-Beta", "responses=experimental")
.header(reqwest::header::ACCEPT, "text/event-stream")
.json(&payload)
@@ -167,7 +168,7 @@ impl ModelClient {
// negligible.
if !(status == StatusCode::TOO_MANY_REQUESTS || status.is_server_error()) {
// Surface the error body to callers. Use `unwrap_or_default` per Clippy.
let body = (res.text().await).unwrap_or_default();
let body = res.text().await.unwrap_or_default();
return Err(CodexErr::UnexpectedStatus(status, body));
}
@@ -197,6 +198,46 @@ impl ModelClient {
}
}
}
/// Cancels an in-progress Responses API request using the
/// `/responses/{id}/cancel` endpoint. This is a **best-effort** helper —
/// failures are surfaced to the caller so they can be logged, but should
/// not crash the agent.
///
/// For providers not using the Responses wire API this is a no-op that
/// returns `Ok(())`, so callers never need to special-case the provider.
///
/// # Errors
///
/// Returns an error when the provider's API key environment variable is
/// unset (`CodexErr::EnvVar`), when the HTTP request itself fails
/// (`CodexErr::Reqwest`), or when the server answers with a non-success
/// status (`CodexErr::UnexpectedStatus`, carrying the response body).
pub async fn cancel_response(&self, response_id: &str) -> Result<()> {
    if self.provider.wire_api != WireApi::Responses {
        // Only the experimental Responses API supports server-side
        // cancellations. For Chat completions (and other providers) we
        // simply return Ok so callers do not need to special-case.
        return Ok(());
    }
    // Strip any trailing '/' so the join below never produces "//".
    let base_url = self.provider.base_url.trim_end_matches('/');
    let url = format!("{}/responses/{}/cancel", base_url, response_id);
    // Missing key is a hard error: without credentials the cancel call
    // cannot possibly succeed, so surface it immediately.
    let api_key = self.provider.api_key()?.ok_or_else(|| {
        CodexErr::EnvVar(EnvVarError {
            var: self.provider.env_key.clone().unwrap_or_default(),
            instructions: None,
        })
    })?;
    let res = self
        .client
        .post(&url)
        .bearer_auth(api_key)
        // Same beta header the streaming request sends; presumably required
        // while the Responses API is gated — TODO confirm once it is GA.
        .header("OpenAI-Beta", "responses=experimental")
        .send()
        .await
        .map_err(CodexErr::Reqwest)?;
    if res.status().is_success() {
        Ok(())
    } else {
        // Include the response body so the caller can log why the
        // cancellation was rejected. An unreadable body degrades to "".
        let status = res.status();
        let body = res.text().await.unwrap_or_default();
        Err(CodexErr::UnexpectedStatus(status, body))
    }
}
}
#[derive(Debug, Deserialize, Serialize)]
@@ -208,7 +249,7 @@ struct SseEvent {
}
#[derive(Debug, Deserialize)]
struct ResponseCompleted {
struct ResponseId {
id: String,
}
@@ -274,7 +315,7 @@ where
// duplicated `output` array embedded in the `response.completed`
// payload. That produced two concrete issues:
// 1. No realtime streaming — the user only saw output after the
// entire turn had finished, which broke the typing UX and
// entire turn had finished, which broke the "typing" UX and
// made long-running turns look stalled.
// 2. Duplicate `function_call_output` items — both the
// individual *and* the completed array were forwarded, which
@@ -299,7 +340,7 @@ where
// Final response completed includes array of output items & id
"response.completed" => {
if let Some(resp_val) = event.response {
match serde_json::from_value::<ResponseCompleted>(resp_val) {
match serde_json::from_value::<ResponseId>(resp_val) {
Ok(r) => {
response_id = Some(r.id);
}
@@ -310,8 +351,25 @@ where
};
};
}
"response.created" => {
if let Some(resp_val) = event.response {
match serde_json::from_value::<ResponseId>(resp_val) {
Ok(r) => {
let id = r.id;
response_id.get_or_insert(id.clone());
// Forward to downstream so the agent can cancel if needed.
let _ = tx_event
.send(Ok(ResponseEvent::Created { response_id: id }))
.await;
}
Err(e) => {
debug!("failed to parse ResponseCreated: {e}");
continue;
}
}
}
}
"response.content_part.done"
| "response.created"
| "response.function_call_arguments.delta"
| "response.in_progress"
| "response.output_item.added"

View File

@@ -50,8 +50,17 @@ impl Prompt {
#[derive(Debug)]
pub enum ResponseEvent {
/// Emitted when the OpenAI Responses API acknowledges creation of a new in-progress
/// response and provides the response `id`. This variant is currently only emitted
/// when using the experimental Responses API.
/// Chat Completions streams will never produce it.
Created {
response_id: String,
},
OutputItemDone(ResponseItem),
Completed { response_id: String },
Completed {
response_id: String,
},
}
#[derive(Debug, Serialize)]

View File

@@ -188,7 +188,7 @@ pub(crate) struct Session {
/// Optional rollout recorder for persisting the conversation transcript so
/// sessions can be replayed or inspected later.
rollout: Mutex<Option<crate::rollout::RolloutRecorder>>,
rollout: Mutex<Option<RolloutRecorder>>,
state: Mutex<State>,
codex_linux_sandbox_exe: Option<PathBuf>,
}
@@ -207,6 +207,9 @@ struct State {
approved_commands: HashSet<Vec<String>>,
current_task: Option<AgentTask>,
previous_response_id: Option<String>,
/// ID of the in-flight Responses API request (if any). Used to cancel
/// the turn when the user interrupts.
in_progress_response_id: Option<String>,
pending_approvals: HashMap<String, oneshot::Sender<ReviewDecision>>,
pending_input: Vec<ResponseInputItem>,
zdr_transcript: Option<ConversationHistory>,
@@ -403,6 +406,7 @@ impl Session {
arguments: Option<serde_json::Value>,
timeout: Option<Duration>,
) -> anyhow::Result<CallToolResult> {
self.notify_background_event("123", "call_tool");
self.mcp_connection_manager
.call_tool(server, tool, arguments, timeout)
.await
@@ -410,11 +414,30 @@ impl Session {
pub fn abort(&self) {
info!("Aborting existing session");
let mut state = self.state.lock().unwrap();
state.pending_approvals.clear();
state.pending_input.clear();
if let Some(task) = state.current_task.take() {
task.abort();
self.notify_background_event("123", "Aborting existing session");
// Extract any in-progress response ID so we can cancel it outside the
// mutex (to avoid holding the lock across an await).
let in_progress_id_opt = {
let mut state = self.state.lock().unwrap();
state.pending_approvals.clear();
state.pending_input.clear();
let id = state.in_progress_response_id.clone();
if let Some(task) = state.current_task.take() {
task.abort();
}
id
};
if let Some(resp_id) = in_progress_id_opt {
let client = self.client.clone();
tokio::spawn(async move {
if let Err(e) = client.cancel_response(&resp_id).await {
warn!("failed to cancel response {resp_id}: {e:#}");
}
});
}
}
@@ -431,7 +454,7 @@ impl Session {
}
let Ok(json) = serde_json::to_string(&notification) else {
tracing::error!("failed to serialise notification payload");
error!("failed to serialise notification payload");
return;
};
@@ -443,7 +466,7 @@ impl Session {
// Fire-and-forget — we do not wait for completion.
if let Err(e) = command.spawn() {
tracing::warn!("failed to spawn notifier '{}': {e}", notify_command[0]);
warn!("failed to spawn notifier '{}': {e}", notify_command[0]);
}
}
}
@@ -459,6 +482,7 @@ impl State {
Self {
approved_commands: self.approved_commands.clone(),
previous_response_id: self.previous_response_id.clone(),
in_progress_response_id: self.in_progress_response_id.clone(),
zdr_transcript: if retain_zdr_transcript {
self.zdr_transcript.clone()
} else {
@@ -647,7 +671,7 @@ async fn submission_loop(
match RolloutRecorder::new(&config, session_id, instructions.clone()).await {
Ok(r) => Some(r),
Err(e) => {
tracing::warn!("failed to initialise rollout recorder: {e}");
warn!("failed to initialise rollout recorder: {e}");
None
}
};
@@ -742,7 +766,7 @@ async fn submission_loop(
tokio::spawn(async move {
if let Err(e) = crate::message_history::append_entry(&text, &id, &config).await
{
tracing::warn!("failed to append to message history: {e}");
warn!("failed to append to message history: {e}");
}
});
}
@@ -772,7 +796,7 @@ async fn submission_loop(
};
if let Err(e) = tx_event.send(event).await {
tracing::warn!("failed to send GetHistoryEntryResponse event: {e}");
warn!("failed to send GetHistoryEntryResponse event: {e}");
}
});
}
@@ -1074,6 +1098,12 @@ async fn try_run_turn(
let mut output = Vec::new();
for event in input {
match event {
ResponseEvent::Created { response_id } => {
// Track the in-flight response so we can cancel it if the
// user interrupts.
let mut state = sess.state.lock().unwrap();
state.in_progress_response_id = Some(response_id);
}
ResponseEvent::OutputItemDone(item) => {
let response = handle_response_item(sess, sub_id, item.clone()).await?;
output.push(ProcessedResponseItem { item, response });
@@ -1081,6 +1111,8 @@ async fn try_run_turn(
ResponseEvent::Completed { response_id } => {
let mut state = sess.state.lock().unwrap();
state.previous_response_id = Some(response_id);
// Clear in-progress ID the turn finished successfully.
state.in_progress_response_id = None;
break;
}
}
@@ -1125,7 +1157,7 @@ async fn handle_response_item(
arguments,
call_id,
} => {
tracing::info!("FunctionCall: {arguments}");
info!("FunctionCall: {arguments}");
Some(handle_function_call(sess, sub_id.to_string(), name, arguments, call_id).await)
}
ResponseItem::LocalShellCall {
@@ -1135,7 +1167,7 @@ async fn handle_response_item(
action,
} => {
let LocalShellAction::Exec(action) = action;
tracing::info!("LocalShellCall: {action:?}");
info!("LocalShellCall: {action:?}");
let params = ShellToolCallParams {
command: action.command,
workdir: action.working_directory,
@@ -1207,7 +1239,7 @@ async fn handle_function_call(
// Unknown function: reply with structured failure so the model can adapt.
ResponseInputItem::FunctionCallOutput {
call_id,
output: crate::models::FunctionCallOutputPayload {
output: FunctionCallOutputPayload {
content: format!("unsupported call: {}", name),
success: None,
},
@@ -1239,7 +1271,7 @@ fn parse_container_exec_arguments(
// allow model to re-sample
let output = ResponseInputItem::FunctionCallOutput {
call_id: call_id.to_string(),
output: crate::models::FunctionCallOutputPayload {
output: FunctionCallOutputPayload {
content: format!("failed to parse function arguments: {e}"),
success: None,
},
@@ -1307,7 +1339,7 @@ async fn handle_container_exec_with_params(
ReviewDecision::Denied | ReviewDecision::Abort => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: crate::models::FunctionCallOutputPayload {
output: FunctionCallOutputPayload {
content: "exec command rejected by user".to_string(),
success: None,
},
@@ -1323,7 +1355,7 @@ async fn handle_container_exec_with_params(
SafetyCheck::Reject { reason } => {
return ResponseInputItem::FunctionCallOutput {
call_id,
output: crate::models::FunctionCallOutputPayload {
output: FunctionCallOutputPayload {
content: format!("exec command rejected: {reason}"),
success: None,
},
@@ -1850,7 +1882,7 @@ fn apply_changes_from_apply_patch(action: &ApplyPatchAction) -> anyhow::Result<A
})
}
fn get_writable_roots(cwd: &Path) -> Vec<std::path::PathBuf> {
fn get_writable_roots(cwd: &Path) -> Vec<PathBuf> {
let mut writable_roots = Vec::new();
if cfg!(target_os = "macos") {
// On macOS, $TMPDIR is private to the user.
@@ -1878,7 +1910,7 @@ fn get_writable_roots(cwd: &Path) -> Vec<std::path::PathBuf> {
}
/// Exec output is a pre-serialized JSON payload
fn format_exec_output(output: &str, exit_code: i32, duration: std::time::Duration) -> String {
fn format_exec_output(output: &str, exit_code: i32, duration: Duration) -> String {
#[derive(Serialize)]
struct ExecMetadata {
exit_code: i32,

View File

@@ -108,7 +108,7 @@ impl WidgetRef for StatusIndicatorWidget {
// Animated 3-dot pattern inside brackets. The *active* dot is bold
// white, the others are dim.
const DOT_COUNT: usize = 3;
let idx = self.frame_idx.load(std::sync::atomic::Ordering::Relaxed);
let idx = self.frame_idx.load(Ordering::Relaxed);
let phase = idx % (DOT_COUNT * 2 - 2);
let active = if phase < DOT_COUNT {
phase