agentydragon(tasks): mark in-progress and plan buffer guard in chat_completions

This commit is contained in:
Rai (Michael Pokorny)
2025-06-24 20:53:17 -07:00
parent 6266f49fc8
commit 801eeb5841
2 changed files with 44 additions and 29 deletions

View File

@@ -1,7 +1,7 @@
+++
id = "25"
title = "Guard Against Missing Tool Output in Rust Server Sequencing"
status = "Not started"
status = "In progress"
dependencies = "03,06,08,13,15,32,18,19,22,23"
last_updated = "2025-06-25T01:40:09.600000"
+++
@@ -20,12 +20,17 @@ Ensure the Rust server implementation sequences tool output and chat messages co
## Implementation
**How it was implemented**
- Introduce a pending-invocation registry (`HashMap<InvocationId, PendingState>`) in the Rust message pipeline.
- Modify `handle_user_message` and `handle_model_event` in the broker to check for unresolved pending invocations and enqueue incoming events accordingly.
- On receiving the corresponding tool output or tool abort event, dequeue and dispatch any buffered messages in order.
- Implement a timeout or explicit cancel path to avoid stuck invocations in case of unresponsive tools.
- Extend the Rust test suite (e.g. in `broker/tests/`) with scenarios covering normal, aborted, and concurrent messages.
We will implement the following high-level plan:
- Locate where the ChatCompletion request messages array is built in Rust:
the `stream_chat_completions` function in `codex-rs/core/src/chat_completions.rs`.
- In that loop, track pending tool invocations by their call IDs when encountering `ResponseItem::FunctionCall` entries.
- Buffer any subsequent `ResponseItem::Message { role: "user" }` or new turn inputs until the matching `ResponseItem::FunctionCallOutput` (tool result) appears.
- Once the tool output is seen, flush the buffered user messages in their original order before continuing to build the next API call.
- Add tests under `codex-rs/core/tests/` (e.g. `guard_tool_output_sequencing.rs`) that exercise interleaved input sequences:
- A user message arriving mid-rollout, before the tool output, ensuring it is delayed until after the tool result.
- Normal flow where no buffering is needed.
- Cancellation or error-result paths that also trigger a flush of buffered messages.
## Notes

View File

@@ -35,57 +35,60 @@ pub(crate) async fn stream_chat_completions(
client: &reqwest::Client,
provider: &ModelProviderInfo,
) -> Result<ResponseStream> {
// Build messages array
// Build messages array, buffering user turns that arrive mid-tool invocation
let mut messages = Vec::<serde_json::Value>::new();
let mut pending_call: Option<String> = None;
let mut buf_user: Vec<serde_json::Value> = Vec::new();
let full_instructions = prompt.get_full_instructions(model);
messages.push(json!({"role": "system", "content": full_instructions}));
for item in &prompt.input {
match item {
ResponseItem::Message { role, content } if role == "user" && pending_call.is_some() => {
// Buffer user message until pending tool result arrives
let mut text = String::new();
for c in content {
if let ContentItem::InputText { text: t } = c {
text.push_str(t);
}
}
buf_user.push(json!({"role": "user", "content": text}));
}
ResponseItem::Message { role, content } => {
let mut text = String::new();
for c in content {
match c {
ContentItem::InputText { text: t }
| ContentItem::OutputText { text: t } => {
text.push_str(t);
}
| ContentItem::OutputText { text: t } => text.push_str(t),
_ => {}
}
}
messages.push(json!({"role": role, "content": text}));
}
ResponseItem::FunctionCall {
name,
arguments,
call_id,
} => {
ResponseItem::FunctionCall { name, arguments, call_id } => {
// Mark tool invocation in-flight
pending_call = Some(call_id.clone());
messages.push(json!({
"role": "assistant",
"content": null,
"tool_calls": [{
"id": call_id,
"type": "function",
"function": {
"name": name,
"arguments": arguments,
}
"function": { "name": name, "arguments": arguments }
}]
}));
}
ResponseItem::LocalShellCall {
id,
call_id: _,
status,
action,
} => {
// Confirm with API team.
ResponseItem::LocalShellCall { id, status, action, .. } => {
// Mark shell-call invocation in-flight by id
if let Some(call_id) = id {
pending_call = Some(call_id.clone());
}
messages.push(json!({
"role": "assistant",
"content": null,
"tool_calls": [{
"id": id.clone().unwrap_or_else(|| "".to_string()),
"id": id.clone().unwrap_or_default(),
"type": "local_shell_call",
"status": status,
"action": action,
@@ -98,9 +101,16 @@ pub(crate) async fn stream_chat_completions(
"tool_call_id": call_id,
"content": output.content,
}));
if pending_call.as_ref() == Some(call_id) {
// Flush any buffered user messages now that tool result arrived
pending_call = None;
for msg in buf_user.drain(..) {
messages.push(msg);
}
}
}
ResponseItem::Reasoning { .. } | ResponseItem::Other => {
// Omit these items from the conversation history.
// Skip these items in the conversation history.
continue;
}
}