mirror of
https://github.com/openai/codex.git
synced 2026-02-08 01:43:46 +00:00
Compare commits
46 Commits
streaming
...
stream-con
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3b90ca959b | ||
|
|
c6cfdf705c | ||
|
|
622a84f4ba | ||
|
|
34704ff055 | ||
|
|
e0303dbac0 | ||
|
|
d31e149cb1 | ||
|
|
136b3ee5bf | ||
|
|
0bf33f7359 | ||
|
|
0c9d8f13e5 | ||
|
|
4259e5787f | ||
|
|
fcdb1c4b4d | ||
|
|
c85c6dfccd | ||
|
|
c1e9083cbd | ||
|
|
f68bf94db1 | ||
|
|
e054715bea | ||
|
|
c72fe752cc | ||
|
|
985c97985b | ||
|
|
7dec04ae4f | ||
|
|
c182126bca | ||
|
|
a362ad00ce | ||
|
|
c515d2869e | ||
|
|
bfbe523f81 | ||
|
|
95423b26d7 | ||
|
|
5bab2bd2f8 | ||
|
|
1294def888 | ||
|
|
ab70497539 | ||
|
|
2a40d07a06 | ||
|
|
2e07f4b033 | ||
|
|
324926e240 | ||
|
|
9805ad1fbc | ||
|
|
792efc990c | ||
|
|
ec6a4f9e2a | ||
|
|
c01b9d2d2a | ||
|
|
d5efc45869 | ||
|
|
dbcb9e7ca6 | ||
|
|
8d413194f3 | ||
|
|
19d3e17572 | ||
|
|
a5b3c151ac | ||
|
|
0110749efa | ||
|
|
bea4a5358a | ||
|
|
4c13829e8b | ||
|
|
5ccd02b0fe | ||
|
|
21c334ae54 | ||
|
|
66ea94f723 | ||
|
|
ae6becc58d | ||
|
|
3a456c1fbb |
4
.vscode/settings.json
vendored
4
.vscode/settings.json
vendored
@@ -11,6 +11,8 @@
|
||||
"editor.defaultFormatter": "tamasfe.even-better-toml",
|
||||
"editor.formatOnSave": true,
|
||||
},
|
||||
"evenBetterToml.formatter.reorderArrays": true,
|
||||
// Array order for options in ~/.codex/config.toml such as `notify` and the
|
||||
// `args` for an MCP server is significant, so we disable reordering.
|
||||
"evenBetterToml.formatter.reorderArrays": false,
|
||||
"evenBetterToml.formatter.reorderKeys": true,
|
||||
}
|
||||
|
||||
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -831,6 +831,7 @@ dependencies = [
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-test",
|
||||
"tokio-util",
|
||||
"toml 0.9.4",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
|
||||
@@ -483,6 +483,19 @@ Setting `hide_agent_reasoning` to `true` suppresses these events in **both** the
|
||||
hide_agent_reasoning = true # defaults to false
|
||||
```
|
||||
|
||||
## show_raw_agent_reasoning
|
||||
|
||||
Surfaces the model’s raw chain-of-thought ("raw reasoning content") when available.
|
||||
|
||||
Notes:
|
||||
- Only takes effect if the selected model/provider actually emits raw reasoning content. Many models do not. When unsupported, this option has no visible effect.
|
||||
- Raw reasoning may include intermediate thoughts or sensitive context. Enable only if acceptable for your workflow.
|
||||
|
||||
Example:
|
||||
```toml
|
||||
show_raw_agent_reasoning = true # defaults to false
|
||||
```
|
||||
|
||||
## model_context_window
|
||||
|
||||
The size of the context window for the model, in tokens.
|
||||
|
||||
@@ -1,8 +1,21 @@
|
||||
Please resolve the user's task by editing and testing the code files in your current code execution session.
|
||||
You are a deployed coding agent.
|
||||
Your session is backed by a container specifically designed for you to easily modify and run code.
|
||||
The repo(s) are already cloned in your working directory, and you must fully solve the problem for your answer to be considered correct.
|
||||
You are operating as and within the Codex CLI, an open-source, terminal-based agentic coding assistant built by OpenAI. It wraps OpenAI models to enable natural language interaction with a local codebase. You are expected to be precise, safe, and helpful.
|
||||
|
||||
Your capabilities:
|
||||
- Receive user prompts, project context, and files.
|
||||
- Stream responses and emit function calls (e.g., shell commands, code edits).
|
||||
- Run commands, like apply_patch, and manage user approvals based on policy.
|
||||
- Work inside a workspace with sandboxing instructions specified by the policy described in (## Sandbox environment and approval instructions)
|
||||
|
||||
Within this context, Codex refers to the open-source agentic coding interface (not the old Codex language model built by OpenAI).
|
||||
|
||||
## General guidelines
|
||||
As a deployed coding agent, please continue working on the user's task until their query is resolved, before ending your turn and yielding back to the user. Only terminate your turn when you are sure that the task is solved. If you are not sure about file content or codebase structure pertaining to the user's request, use your tools to read files and gather the relevant information. Do NOT guess or make up an answer.
|
||||
|
||||
After a user sends their first message, you should immediately provide a brief message acknowledging their request to set the tone and expectation of future work to be done (no more than 8-10 words). This should be done before performing work like exploring the codebase, writing or reading files, or other tool calls needed to complete the task. Use a natural, collaborative tone similar to how a teammate would receive a task during a pair programming session.
|
||||
|
||||
Please resolve the user's task by editing the code files in your current code execution session. Your session allows for you to modify and run code. The repo(s) are already cloned in your working directory, and you must fully solve the problem for your answer to be considered correct.
|
||||
|
||||
### Task execution
|
||||
You MUST adhere to the following criteria when executing the task:
|
||||
|
||||
- Working on the repo(s) in the current environment is allowed, even if they are proprietary.
|
||||
@@ -12,7 +25,7 @@ You MUST adhere to the following criteria when executing the task:
|
||||
- `user_instructions` are not part of the user's request, but guidance for how to complete the task.
|
||||
- Do not cite `user_instructions` back to the user unless a specific piece is relevant.
|
||||
- Do not use \`ls -R\`, \`find\`, or \`grep\` - these are slow in large repos. Use \`rg\` and \`rg --files\`.
|
||||
- Use \`apply_patch\` to edit files: {"command":["apply_patch","*** Begin Patch\\n*** Update File: path/to/file.py\\n@@ def example():\\n- pass\\n+ return 123\\n*** End Patch"]}
|
||||
- Use the \`apply_patch\` shell command to edit files: {"command":["apply_patch","*** Begin Patch\\n*** Update File: path/to/file.py\\n@@ def example():\\n- pass\\n+ return 123\\n*** End Patch"]}
|
||||
- If completing the user's task requires writing or modifying files:
|
||||
- Your code and final answer should follow these _CODING GUIDELINES_:
|
||||
- Fix the problem at the root cause rather than applying surface-level patches, when possible.
|
||||
@@ -35,12 +48,11 @@ You MUST adhere to the following criteria when executing the task:
|
||||
- If completing the user's task DOES NOT require writing or modifying files (e.g., the user asks a question about the code base):
|
||||
- Respond in a friendly tune as a remote teammate, who is knowledgeable, capable and eager to help with coding.
|
||||
- When your task involves writing or modifying files:
|
||||
- Do NOT tell the user to "save the file" or "copy the code into a file" if you already created or modified the file using \`apply_patch\`. Instead, reference the file as already saved.
|
||||
- Do NOT tell the user to "save the file" or "copy the code into a file" if you already created or modified the file using the `apply_patch` shell command. Instead, reference the file as already saved.
|
||||
- Do NOT show the full contents of large files you have already written, unless the user explicitly asks for them.
|
||||
|
||||
§ `apply-patch` Specification
|
||||
|
||||
Your patch language is a stripped‑down, file‑oriented diff format designed to be easy to parse and safe to apply. You can think of it as a high‑level envelope:
|
||||
## Using the shell command `apply_patch` to edit files
|
||||
`apply_patch` is a shell command for editing files. Your patch language is a stripped‑down, file‑oriented diff format designed to be easy to parse and safe to apply. You can think of it as a high‑level envelope:
|
||||
|
||||
*** Begin Patch
|
||||
[ one or more file sections ]
|
||||
@@ -92,14 +104,28 @@ It is important to remember:
|
||||
|
||||
- You must include a header with your intended action (Add/Delete/Update)
|
||||
- You must prefix new lines with `+` even when creating a new file
|
||||
- You must follow this schema exactly when providing a patch
|
||||
|
||||
You can invoke apply_patch like:
|
||||
You can invoke apply_patch with the following shell command:
|
||||
|
||||
```
|
||||
shell {"command":["apply_patch","*** Begin Patch\n*** Add File: hello.txt\n+Hello, world!\n*** End Patch\n"]}
|
||||
```
|
||||
|
||||
Plan updates
|
||||
## Sandbox environment and approval instructions
|
||||
|
||||
You are running in a sandboxed workspace backed by version control. The sandbox might be configured by the user to restrict certain behaviors, like accessing the internet or writing to files outside the current directory.
|
||||
|
||||
Commands that are blocked by sandbox settings will be automatically sent to the user for approval. The result of the request will be returned (i.e. the command result, or the request denial).
|
||||
The user also has an opportunity to approve the same command for the rest of the session.
|
||||
|
||||
Guidance on running within the sandbox:
|
||||
- When running commands that will likely require approval, attempt to use simple, precise commands, to reduce frequency of approval requests.
|
||||
- When approval is denied or a command fails due to a permission error, do not retry the exact command in a different way. Move on and continue trying to address the user's request.
|
||||
|
||||
|
||||
## Tools available
|
||||
### Plan updates
|
||||
|
||||
A tool named `update_plan` is available. Use it to keep an up‑to‑date, step‑by‑step plan for the task so you can follow your progress. When making your plans, keep in mind that you are a deployed coding agent - `update_plan` calls should not involve doing anything that you aren't capable of doing. For example, `update_plan` calls should NEVER contain tasks to merge your own pull requests. Only stop to ask the user if you genuinely need their feedback on a change.
|
||||
|
||||
@@ -107,3 +133,4 @@ A tool named `update_plan` is available. Use it to keep an up‑to‑date, step
|
||||
- Whenever you finish a step, call `update_plan` again, marking the finished step as `completed` and the next step as `in_progress`.
|
||||
- If your plan needs to change, call `update_plan` with the revised steps and include an `explanation` describing the change.
|
||||
- When all steps are complete, make a final `update_plan` call with all steps marked `completed`.
|
||||
|
||||
|
||||
@@ -21,7 +21,9 @@ use crate::client_common::ResponseEvent;
|
||||
use crate::client_common::ResponseStream;
|
||||
use crate::error::CodexErr;
|
||||
use crate::error::Result;
|
||||
use crate::model_family::ModelFamily;
|
||||
use crate::models::ContentItem;
|
||||
use crate::models::ReasoningItemContent;
|
||||
use crate::models::ResponseItem;
|
||||
use crate::openai_tools::create_tools_json_for_chat_completions_api;
|
||||
use crate::util::backoff;
|
||||
@@ -29,7 +31,7 @@ use crate::util::backoff;
|
||||
/// Implementation for the classic Chat Completions API.
|
||||
pub(crate) async fn stream_chat_completions(
|
||||
prompt: &Prompt,
|
||||
model: &str,
|
||||
model_family: &ModelFamily,
|
||||
include_plan_tool: bool,
|
||||
client: &reqwest::Client,
|
||||
provider: &ModelProviderInfo,
|
||||
@@ -37,7 +39,7 @@ pub(crate) async fn stream_chat_completions(
|
||||
// Build messages array
|
||||
let mut messages = Vec::<serde_json::Value>::new();
|
||||
|
||||
let full_instructions = prompt.get_full_instructions(model);
|
||||
let full_instructions = prompt.get_full_instructions(model_family);
|
||||
messages.push(json!({"role": "system", "content": full_instructions}));
|
||||
|
||||
if let Some(instr) = &prompt.get_formatted_user_instructions() {
|
||||
@@ -110,9 +112,10 @@ pub(crate) async fn stream_chat_completions(
|
||||
}
|
||||
}
|
||||
|
||||
let tools_json = create_tools_json_for_chat_completions_api(prompt, model, include_plan_tool)?;
|
||||
let tools_json =
|
||||
create_tools_json_for_chat_completions_api(prompt, model_family, include_plan_tool)?;
|
||||
let payload = json!({
|
||||
"model": model,
|
||||
"model": model_family.slug,
|
||||
"messages": messages,
|
||||
"stream": true,
|
||||
"tools": tools_json,
|
||||
@@ -207,6 +210,8 @@ async fn process_chat_sse<S>(
|
||||
}
|
||||
|
||||
let mut fn_call_state = FunctionCallState::default();
|
||||
let mut assistant_text = String::new();
|
||||
let mut reasoning_text = String::new();
|
||||
|
||||
loop {
|
||||
let sse = match timeout(idle_timeout, stream.next()).await {
|
||||
@@ -235,6 +240,31 @@ async fn process_chat_sse<S>(
|
||||
|
||||
// OpenAI Chat streaming sends a literal string "[DONE]" when finished.
|
||||
if sse.data.trim() == "[DONE]" {
|
||||
// Emit any finalized items before closing so downstream consumers receive
|
||||
// terminal events for both assistant content and raw reasoning.
|
||||
if !assistant_text.is_empty() {
|
||||
let item = ResponseItem::Message {
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: std::mem::take(&mut assistant_text),
|
||||
}],
|
||||
id: None,
|
||||
};
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
|
||||
if !reasoning_text.is_empty() {
|
||||
let item = ResponseItem::Reasoning {
|
||||
id: String::new(),
|
||||
summary: Vec::new(),
|
||||
content: Some(vec![ReasoningItemContent::ReasoningText {
|
||||
text: std::mem::take(&mut reasoning_text),
|
||||
}]),
|
||||
encrypted_content: None,
|
||||
};
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::Completed {
|
||||
response_id: String::new(),
|
||||
@@ -254,26 +284,47 @@ async fn process_chat_sse<S>(
|
||||
let choice_opt = chunk.get("choices").and_then(|c| c.get(0));
|
||||
|
||||
if let Some(choice) = choice_opt {
|
||||
// Handle assistant content tokens.
|
||||
// Handle assistant content tokens as streaming deltas.
|
||||
if let Some(content) = choice
|
||||
.get("delta")
|
||||
.and_then(|d| d.get("content"))
|
||||
.and_then(|c| c.as_str())
|
||||
{
|
||||
// Emit a delta so downstream consumers can stream text live.
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::OutputTextDelta(content.to_string())))
|
||||
.await;
|
||||
if !content.is_empty() {
|
||||
assistant_text.push_str(content);
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::OutputTextDelta(content.to_string())))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
let item = ResponseItem::Message {
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: content.to_string(),
|
||||
}],
|
||||
id: None,
|
||||
};
|
||||
// Forward any reasoning/thinking deltas if present.
|
||||
// Some providers stream `reasoning` as a plain string while others
|
||||
// nest the text under an object (e.g. `{ "reasoning": { "text": "…" } }`).
|
||||
if let Some(reasoning_val) = choice.get("delta").and_then(|d| d.get("reasoning")) {
|
||||
let mut maybe_text = reasoning_val.as_str().map(|s| s.to_string());
|
||||
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
if maybe_text.is_none() && reasoning_val.is_object() {
|
||||
if let Some(s) = reasoning_val
|
||||
.get("text")
|
||||
.and_then(|t| t.as_str())
|
||||
.filter(|s| !s.is_empty())
|
||||
{
|
||||
maybe_text = Some(s.to_string());
|
||||
} else if let Some(s) = reasoning_val
|
||||
.get("content")
|
||||
.and_then(|t| t.as_str())
|
||||
.filter(|s| !s.is_empty())
|
||||
{
|
||||
maybe_text = Some(s.to_string());
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(reasoning) = maybe_text {
|
||||
let _ = tx_event
|
||||
.send(Ok(ResponseEvent::ReasoningContentDelta(reasoning)))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
// Handle streaming function / tool calls.
|
||||
@@ -310,7 +361,21 @@ async fn process_chat_sse<S>(
|
||||
if let Some(finish_reason) = choice.get("finish_reason").and_then(|v| v.as_str()) {
|
||||
match finish_reason {
|
||||
"tool_calls" if fn_call_state.active => {
|
||||
// Build the FunctionCall response item.
|
||||
// First, flush the terminal raw reasoning so UIs can finalize
|
||||
// the reasoning stream before any exec/tool events begin.
|
||||
if !reasoning_text.is_empty() {
|
||||
let item = ResponseItem::Reasoning {
|
||||
id: String::new(),
|
||||
summary: Vec::new(),
|
||||
content: Some(vec![ReasoningItemContent::ReasoningText {
|
||||
text: std::mem::take(&mut reasoning_text),
|
||||
}]),
|
||||
encrypted_content: None,
|
||||
};
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
|
||||
// Then emit the FunctionCall response item.
|
||||
let item = ResponseItem::FunctionCall {
|
||||
id: None,
|
||||
name: fn_call_state.name.clone().unwrap_or_else(|| "".to_string()),
|
||||
@@ -318,11 +383,33 @@ async fn process_chat_sse<S>(
|
||||
call_id: fn_call_state.call_id.clone().unwrap_or_else(String::new),
|
||||
};
|
||||
|
||||
// Emit it downstream.
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
"stop" => {
|
||||
// Regular turn without tool-call.
|
||||
// Regular turn without tool-call. Emit the final assistant message
|
||||
// as a single OutputItemDone so non-delta consumers see the result.
|
||||
if !assistant_text.is_empty() {
|
||||
let item = ResponseItem::Message {
|
||||
role: "assistant".to_string(),
|
||||
content: vec![ContentItem::OutputText {
|
||||
text: std::mem::take(&mut assistant_text),
|
||||
}],
|
||||
id: None,
|
||||
};
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
// Also emit a terminal Reasoning item so UIs can finalize raw reasoning.
|
||||
if !reasoning_text.is_empty() {
|
||||
let item = ResponseItem::Reasoning {
|
||||
id: String::new(),
|
||||
summary: Vec::new(),
|
||||
content: Some(vec![ReasoningItemContent::ReasoningText {
|
||||
text: std::mem::take(&mut reasoning_text),
|
||||
}]),
|
||||
encrypted_content: None,
|
||||
};
|
||||
let _ = tx_event.send(Ok(ResponseEvent::OutputItemDone(item))).await;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
@@ -360,10 +447,17 @@ async fn process_chat_sse<S>(
|
||||
/// The adapter is intentionally *lossless*: callers who do **not** opt in via
|
||||
/// [`AggregateStreamExt::aggregate()`] keep receiving the original unmodified
|
||||
/// events.
|
||||
#[derive(Copy, Clone, Eq, PartialEq)]
|
||||
enum AggregateMode {
|
||||
AggregatedOnly,
|
||||
Streaming,
|
||||
}
|
||||
pub(crate) struct AggregatedChatStream<S> {
|
||||
inner: S,
|
||||
cumulative: String,
|
||||
pending_completed: Option<ResponseEvent>,
|
||||
cumulative_reasoning: String,
|
||||
pending: std::collections::VecDeque<ResponseEvent>,
|
||||
mode: AggregateMode,
|
||||
}
|
||||
|
||||
impl<S> Stream for AggregatedChatStream<S>
|
||||
@@ -375,8 +469,8 @@ where
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.get_mut();
|
||||
|
||||
// First, flush any buffered Completed event from the previous call.
|
||||
if let Some(ev) = this.pending_completed.take() {
|
||||
// First, flush any buffered events from the previous call.
|
||||
if let Some(ev) = this.pending.pop_front() {
|
||||
return Poll::Ready(Some(Ok(ev)));
|
||||
}
|
||||
|
||||
@@ -393,16 +487,21 @@ where
|
||||
let is_assistant_delta = matches!(&item, crate::models::ResponseItem::Message { role, .. } if role == "assistant");
|
||||
|
||||
if is_assistant_delta {
|
||||
if let crate::models::ResponseItem::Message { content, .. } = &item {
|
||||
if let Some(text) = content.iter().find_map(|c| match c {
|
||||
crate::models::ContentItem::OutputText { text } => Some(text),
|
||||
_ => None,
|
||||
}) {
|
||||
this.cumulative.push_str(text);
|
||||
// Only use the final assistant message if we have not
|
||||
// seen any deltas; otherwise, deltas already built the
|
||||
// cumulative text and this would duplicate it.
|
||||
if this.cumulative.is_empty() {
|
||||
if let crate::models::ResponseItem::Message { content, .. } = &item {
|
||||
if let Some(text) = content.iter().find_map(|c| match c {
|
||||
crate::models::ContentItem::OutputText { text } => Some(text),
|
||||
_ => None,
|
||||
}) {
|
||||
this.cumulative.push_str(text);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Swallow partial assistant chunk; keep polling.
|
||||
// Swallow assistant message here; emit on Completed.
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -413,24 +512,50 @@ where
|
||||
response_id,
|
||||
token_usage,
|
||||
}))) => {
|
||||
// Build any aggregated items in the correct order: Reasoning first, then Message.
|
||||
let mut emitted_any = false;
|
||||
|
||||
if !this.cumulative_reasoning.is_empty()
|
||||
&& matches!(this.mode, AggregateMode::AggregatedOnly)
|
||||
{
|
||||
let aggregated_reasoning = crate::models::ResponseItem::Reasoning {
|
||||
id: String::new(),
|
||||
summary: Vec::new(),
|
||||
content: Some(vec![
|
||||
crate::models::ReasoningItemContent::ReasoningText {
|
||||
text: std::mem::take(&mut this.cumulative_reasoning),
|
||||
},
|
||||
]),
|
||||
encrypted_content: None,
|
||||
};
|
||||
this.pending
|
||||
.push_back(ResponseEvent::OutputItemDone(aggregated_reasoning));
|
||||
emitted_any = true;
|
||||
}
|
||||
|
||||
if !this.cumulative.is_empty() {
|
||||
let aggregated_item = crate::models::ResponseItem::Message {
|
||||
let aggregated_message = crate::models::ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![crate::models::ContentItem::OutputText {
|
||||
text: std::mem::take(&mut this.cumulative),
|
||||
}],
|
||||
};
|
||||
this.pending
|
||||
.push_back(ResponseEvent::OutputItemDone(aggregated_message));
|
||||
emitted_any = true;
|
||||
}
|
||||
|
||||
// Buffer Completed so it is returned *after* the aggregated message.
|
||||
this.pending_completed = Some(ResponseEvent::Completed {
|
||||
response_id,
|
||||
token_usage,
|
||||
// Always emit Completed last when anything was aggregated.
|
||||
if emitted_any {
|
||||
this.pending.push_back(ResponseEvent::Completed {
|
||||
response_id: response_id.clone(),
|
||||
token_usage: token_usage.clone(),
|
||||
});
|
||||
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::OutputItemDone(
|
||||
aggregated_item,
|
||||
))));
|
||||
// Return the first pending event now.
|
||||
if let Some(ev) = this.pending.pop_front() {
|
||||
return Poll::Ready(Some(Ok(ev)));
|
||||
}
|
||||
}
|
||||
|
||||
// Nothing aggregated – forward Completed directly.
|
||||
@@ -445,13 +570,27 @@ where
|
||||
continue;
|
||||
}
|
||||
Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta)))) => {
|
||||
// Forward deltas unchanged so callers can stream text
|
||||
// live while still receiving a single aggregated
|
||||
// OutputItemDone at the end of the turn.
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
|
||||
// Always accumulate deltas so we can emit a final OutputItemDone at Completed.
|
||||
this.cumulative.push_str(&delta);
|
||||
if matches!(this.mode, AggregateMode::Streaming) {
|
||||
// In streaming mode, also forward the delta immediately.
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::OutputTextDelta(delta))));
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta)))) => {
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(delta))));
|
||||
Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta(delta)))) => {
|
||||
// Always accumulate reasoning deltas so we can emit a final Reasoning item at Completed.
|
||||
this.cumulative_reasoning.push_str(&delta);
|
||||
if matches!(this.mode, AggregateMode::Streaming) {
|
||||
// In streaming mode, also forward the delta immediately.
|
||||
return Poll::Ready(Some(Ok(ResponseEvent::ReasoningContentDelta(delta))));
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Poll::Ready(Some(Ok(ResponseEvent::ReasoningSummaryDelta(_)))) => {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -480,12 +619,24 @@ pub(crate) trait AggregateStreamExt: Stream<Item = Result<ResponseEvent>> + Size
|
||||
/// }
|
||||
/// ```
|
||||
fn aggregate(self) -> AggregatedChatStream<Self> {
|
||||
AggregatedChatStream {
|
||||
inner: self,
|
||||
cumulative: String::new(),
|
||||
pending_completed: None,
|
||||
}
|
||||
AggregatedChatStream::new(self, AggregateMode::AggregatedOnly)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AggregateStreamExt for T where T: Stream<Item = Result<ResponseEvent>> + Sized {}
|
||||
|
||||
impl<S> AggregatedChatStream<S> {
|
||||
fn new(inner: S, mode: AggregateMode) -> Self {
|
||||
AggregatedChatStream {
|
||||
inner,
|
||||
cumulative: String::new(),
|
||||
cumulative_reasoning: String::new(),
|
||||
pending: std::collections::VecDeque::new(),
|
||||
mode,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn streaming_mode(inner: S) -> Self {
|
||||
Self::new(inner, AggregateMode::Streaming)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -82,7 +82,7 @@ impl ModelClient {
|
||||
// Create the raw streaming connection first.
|
||||
let response_stream = stream_chat_completions(
|
||||
prompt,
|
||||
&self.config.model,
|
||||
&self.config.model_family,
|
||||
self.config.include_plan_tool,
|
||||
&self.client,
|
||||
&self.provider,
|
||||
@@ -92,7 +92,11 @@ impl ModelClient {
|
||||
// Wrap it with the aggregation adapter so callers see *only*
|
||||
// the final assistant message per turn (matching the
|
||||
// behaviour of the Responses API).
|
||||
let mut aggregated = response_stream.aggregate();
|
||||
let mut aggregated = if self.config.show_raw_agent_reasoning {
|
||||
crate::chat_completions::AggregatedChatStream::streaming_mode(response_stream)
|
||||
} else {
|
||||
response_stream.aggregate()
|
||||
};
|
||||
|
||||
// Bridge the aggregated stream back into a standard
|
||||
// `ResponseStream` by forwarding events through a channel.
|
||||
@@ -127,13 +131,17 @@ impl ModelClient {
|
||||
|
||||
let store = prompt.store && auth_mode != Some(AuthMode::ChatGPT);
|
||||
|
||||
let full_instructions = prompt.get_full_instructions(&self.config.model);
|
||||
let full_instructions = prompt.get_full_instructions(&self.config.model_family);
|
||||
let tools_json = create_tools_json_for_responses_api(
|
||||
prompt,
|
||||
&self.config.model,
|
||||
&self.config.model_family,
|
||||
self.config.include_plan_tool,
|
||||
)?;
|
||||
let reasoning = create_reasoning_param_for_request(&self.config, self.effort, self.summary);
|
||||
let reasoning = create_reasoning_param_for_request(
|
||||
&self.config.model_family,
|
||||
self.effort,
|
||||
self.summary,
|
||||
);
|
||||
|
||||
// Request encrypted COT if we are not storing responses,
|
||||
// otherwise reasoning items will be referenced by ID
|
||||
@@ -433,6 +441,14 @@ async fn process_sse<S>(
|
||||
}
|
||||
}
|
||||
}
|
||||
"response.reasoning_text.delta" => {
|
||||
if let Some(delta) = event.delta {
|
||||
let event = ResponseEvent::ReasoningContentDelta(delta);
|
||||
if tx_event.send(Ok(event)).await.is_err() {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
"response.created" => {
|
||||
if event.response.is_some() {
|
||||
let _ = tx_event.send(Ok(ResponseEvent::Created {})).await;
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::config_types::ReasoningEffort as ReasoningEffortConfig;
|
||||
use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
|
||||
use crate::error::Result;
|
||||
use crate::model_family::ModelFamily;
|
||||
use crate::models::ResponseItem;
|
||||
use crate::protocol::TokenUsage;
|
||||
use codex_apply_patch::APPLY_PATCH_TOOL_INSTRUCTIONS;
|
||||
@@ -42,13 +43,13 @@ pub struct Prompt {
|
||||
}
|
||||
|
||||
impl Prompt {
|
||||
pub(crate) fn get_full_instructions(&self, model: &str) -> Cow<'_, str> {
|
||||
pub(crate) fn get_full_instructions(&self, model: &ModelFamily) -> Cow<'_, str> {
|
||||
let base = self
|
||||
.base_instructions_override
|
||||
.as_deref()
|
||||
.unwrap_or(BASE_INSTRUCTIONS);
|
||||
let mut sections: Vec<&str> = vec![base];
|
||||
if model.starts_with("gpt-4.1") {
|
||||
if model.needs_special_apply_patch_instructions {
|
||||
sections.push(APPLY_PATCH_TOOL_INSTRUCTIONS);
|
||||
}
|
||||
Cow::Owned(sections.join("\n"))
|
||||
@@ -71,6 +72,7 @@ pub enum ResponseEvent {
|
||||
},
|
||||
OutputTextDelta(String),
|
||||
ReasoningSummaryDelta(String),
|
||||
ReasoningContentDelta(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
@@ -144,14 +146,12 @@ pub(crate) struct ResponsesApiRequest<'a> {
|
||||
pub(crate) include: Vec<String>,
|
||||
}
|
||||
|
||||
use crate::config::Config;
|
||||
|
||||
pub(crate) fn create_reasoning_param_for_request(
|
||||
config: &Config,
|
||||
model_family: &ModelFamily,
|
||||
effort: ReasoningEffortConfig,
|
||||
summary: ReasoningSummaryConfig,
|
||||
) -> Option<Reasoning> {
|
||||
if model_supports_reasoning_summaries(config) {
|
||||
if model_family.supports_reasoning_summaries {
|
||||
let effort: Option<OpenAiReasoningEffort> = effort.into();
|
||||
let effort = effort?;
|
||||
Some(Reasoning {
|
||||
@@ -163,27 +163,6 @@ pub(crate) fn create_reasoning_param_for_request(
|
||||
}
|
||||
}
|
||||
|
||||
pub fn model_supports_reasoning_summaries(config: &Config) -> bool {
|
||||
// Currently, we hardcode this rule to decide whether to enable reasoning.
|
||||
// We expect reasoning to apply only to OpenAI models, but we do not want
|
||||
// users to have to mess with their config to disable reasoning for models
|
||||
// that do not support it, such as `gpt-4.1`.
|
||||
//
|
||||
// Though if a user is using Codex with non-OpenAI models that, say, happen
|
||||
// to start with "o", then they can set `model_reasoning_effort = "none"` in
|
||||
// config.toml to disable reasoning.
|
||||
//
|
||||
// Converseley, if a user has a non-OpenAI provider that supports reasoning,
|
||||
// they can set the top-level `model_supports_reasoning_summaries = true`
|
||||
// config option to enable reasoning.
|
||||
if config.model_supports_reasoning_summaries {
|
||||
return true;
|
||||
}
|
||||
|
||||
let model = &config.model;
|
||||
model.starts_with("o") || model.starts_with("codex")
|
||||
}
|
||||
|
||||
pub(crate) struct ResponseStream {
|
||||
pub(crate) rx_event: mpsc::Receiver<Result<ResponseEvent>>,
|
||||
}
|
||||
@@ -198,6 +177,9 @@ impl Stream for ResponseStream {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#![allow(clippy::expect_used)]
|
||||
use crate::model_family::find_family_for_model;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
@@ -207,7 +189,8 @@ mod tests {
|
||||
..Default::default()
|
||||
};
|
||||
let expected = format!("{BASE_INSTRUCTIONS}\n{APPLY_PATCH_TOOL_INSTRUCTIONS}");
|
||||
let full = prompt.get_full_instructions("gpt-4.1");
|
||||
let model_family = find_family_for_model("gpt-4.1").expect("known model slug");
|
||||
let full = prompt.get_full_instructions(&model_family);
|
||||
assert_eq!(full, expected);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,7 @@ use crate::mcp_tool_call::handle_mcp_tool_call;
|
||||
use crate::models::ContentItem;
|
||||
use crate::models::FunctionCallOutputPayload;
|
||||
use crate::models::LocalShellAction;
|
||||
use crate::models::ReasoningItemContent;
|
||||
use crate::models::ReasoningItemReasoningSummary;
|
||||
use crate::models::ResponseInputItem;
|
||||
use crate::models::ResponseItem;
|
||||
@@ -66,6 +67,8 @@ use crate::protocol::AgentMessageDeltaEvent;
|
||||
use crate::protocol::AgentMessageEvent;
|
||||
use crate::protocol::AgentReasoningDeltaEvent;
|
||||
use crate::protocol::AgentReasoningEvent;
|
||||
use crate::protocol::AgentReasoningRawContentDeltaEvent;
|
||||
use crate::protocol::AgentReasoningRawContentEvent;
|
||||
use crate::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use crate::protocol::AskForApproval;
|
||||
use crate::protocol::BackgroundEventEvent;
|
||||
@@ -227,6 +230,7 @@ pub(crate) struct Session {
|
||||
state: Mutex<State>,
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
user_shell: shell::Shell,
|
||||
show_raw_agent_reasoning: bool,
|
||||
}
|
||||
|
||||
impl Session {
|
||||
@@ -822,6 +826,7 @@ async fn submission_loop(
|
||||
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
|
||||
disable_response_storage,
|
||||
user_shell: default_shell,
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
}));
|
||||
|
||||
// Patch restored state into the newly created session.
|
||||
@@ -1132,6 +1137,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
|
||||
ResponseItem::Reasoning {
|
||||
id,
|
||||
summary,
|
||||
content,
|
||||
encrypted_content,
|
||||
},
|
||||
None,
|
||||
@@ -1139,6 +1145,7 @@ async fn run_task(sess: Arc<Session>, sub_id: String, input: Vec<InputItem>) {
|
||||
items_to_record_in_conversation_history.push(ResponseItem::Reasoning {
|
||||
id: id.clone(),
|
||||
summary: summary.clone(),
|
||||
content: content.clone(),
|
||||
encrypted_content: encrypted_content.clone(),
|
||||
});
|
||||
}
|
||||
@@ -1392,6 +1399,17 @@ async fn try_run_turn(
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
}
|
||||
ResponseEvent::ReasoningContentDelta(delta) => {
|
||||
if sess.show_raw_agent_reasoning {
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoningRawContentDelta(
|
||||
AgentReasoningRawContentDeltaEvent { delta },
|
||||
),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1498,7 +1516,12 @@ async fn handle_response_item(
|
||||
}
|
||||
None
|
||||
}
|
||||
ResponseItem::Reasoning { summary, .. } => {
|
||||
ResponseItem::Reasoning {
|
||||
id: _,
|
||||
summary,
|
||||
content,
|
||||
encrypted_content: _,
|
||||
} => {
|
||||
for item in summary {
|
||||
let text = match item {
|
||||
ReasoningItemReasoningSummary::SummaryText { text } => text,
|
||||
@@ -1509,6 +1532,21 @@ async fn handle_response_item(
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
}
|
||||
if sess.show_raw_agent_reasoning && content.is_some() {
|
||||
let content = content.unwrap();
|
||||
for item in content {
|
||||
let text = match item {
|
||||
ReasoningItemContent::ReasoningText { text } => text,
|
||||
};
|
||||
let event = Event {
|
||||
id: sub_id.to_string(),
|
||||
msg: EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent {
|
||||
text,
|
||||
}),
|
||||
};
|
||||
sess.tx_event.send(event).await.ok();
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
ResponseItem::FunctionCall {
|
||||
|
||||
@@ -10,6 +10,8 @@ use crate::config_types::ShellEnvironmentPolicyToml;
|
||||
use crate::config_types::Tui;
|
||||
use crate::config_types::UriBasedFileOpener;
|
||||
use crate::flags::OPENAI_DEFAULT_MODEL;
|
||||
use crate::model_family::ModelFamily;
|
||||
use crate::model_family::find_family_for_model;
|
||||
use crate::model_provider_info::ModelProviderInfo;
|
||||
use crate::model_provider_info::built_in_model_providers;
|
||||
use crate::openai_model_info::get_model_info;
|
||||
@@ -33,6 +35,8 @@ pub struct Config {
|
||||
/// Optional override of model selection.
|
||||
pub model: String,
|
||||
|
||||
pub model_family: ModelFamily,
|
||||
|
||||
/// Size of the context window for the model, in tokens.
|
||||
pub model_context_window: Option<u64>,
|
||||
|
||||
@@ -57,6 +61,10 @@ pub struct Config {
|
||||
/// users are only interested in the final agent responses.
|
||||
pub hide_agent_reasoning: bool,
|
||||
|
||||
/// When set to `true`, `AgentReasoningRawContentEvent` events will be shown in the UI/output.
|
||||
/// Defaults to `false`.
|
||||
pub show_raw_agent_reasoning: bool,
|
||||
|
||||
/// Disable server-side response storage (sends the full conversation
|
||||
/// context with every request). Currently necessary for OpenAI customers
|
||||
/// who have opted into Zero Data Retention (ZDR).
|
||||
@@ -134,10 +142,6 @@ pub struct Config {
|
||||
/// request using the Responses API.
|
||||
pub model_reasoning_summary: ReasoningSummary,
|
||||
|
||||
/// When set to `true`, overrides the default heuristic and forces
|
||||
/// `model_supports_reasoning_summaries()` to return `true`.
|
||||
pub model_supports_reasoning_summaries: bool,
|
||||
|
||||
/// Base URL for requests to ChatGPT (as opposed to the OpenAI API).
|
||||
pub chatgpt_base_url: String,
|
||||
|
||||
@@ -325,6 +329,10 @@ pub struct ConfigToml {
|
||||
/// UI/output. Defaults to `false`.
|
||||
pub hide_agent_reasoning: Option<bool>,
|
||||
|
||||
/// When set to `true`, `AgentReasoningRawContentEvent` events will be shown in the UI/output.
|
||||
/// Defaults to `false`.
|
||||
pub show_raw_agent_reasoning: Option<bool>,
|
||||
|
||||
pub model_reasoning_effort: Option<ReasoningEffort>,
|
||||
pub model_reasoning_summary: Option<ReasoningSummary>,
|
||||
|
||||
@@ -465,7 +473,19 @@ impl Config {
|
||||
.or(config_profile.model)
|
||||
.or(cfg.model)
|
||||
.unwrap_or_else(default_model);
|
||||
let openai_model_info = get_model_info(&model);
|
||||
let model_family = find_family_for_model(&model).unwrap_or_else(|| {
|
||||
let supports_reasoning_summaries =
|
||||
cfg.model_supports_reasoning_summaries.unwrap_or(false);
|
||||
ModelFamily {
|
||||
slug: model.clone(),
|
||||
family: model.clone(),
|
||||
needs_special_apply_patch_instructions: false,
|
||||
supports_reasoning_summaries,
|
||||
uses_local_shell_tool: false,
|
||||
}
|
||||
});
|
||||
|
||||
let openai_model_info = get_model_info(&model_family);
|
||||
let model_context_window = cfg
|
||||
.model_context_window
|
||||
.or_else(|| openai_model_info.as_ref().map(|info| info.context_window));
|
||||
@@ -490,6 +510,7 @@ impl Config {
|
||||
|
||||
let config = Self {
|
||||
model,
|
||||
model_family,
|
||||
model_context_window,
|
||||
model_max_output_tokens,
|
||||
model_provider_id,
|
||||
@@ -518,6 +539,7 @@ impl Config {
|
||||
codex_linux_sandbox_exe,
|
||||
|
||||
hide_agent_reasoning: cfg.hide_agent_reasoning.unwrap_or(false),
|
||||
show_raw_agent_reasoning: cfg.show_raw_agent_reasoning.unwrap_or(false),
|
||||
model_reasoning_effort: config_profile
|
||||
.model_reasoning_effort
|
||||
.or(cfg.model_reasoning_effort)
|
||||
@@ -527,10 +549,6 @@ impl Config {
|
||||
.or(cfg.model_reasoning_summary)
|
||||
.unwrap_or_default(),
|
||||
|
||||
model_supports_reasoning_summaries: cfg
|
||||
.model_supports_reasoning_summaries
|
||||
.unwrap_or(false),
|
||||
|
||||
chatgpt_base_url: config_profile
|
||||
.chatgpt_base_url
|
||||
.or(cfg.chatgpt_base_url)
|
||||
@@ -871,6 +889,7 @@ disable_response_storage = true
|
||||
assert_eq!(
|
||||
Config {
|
||||
model: "o3".to_string(),
|
||||
model_family: find_family_for_model("o3").expect("known model slug"),
|
||||
model_context_window: Some(200_000),
|
||||
model_max_output_tokens: Some(100_000),
|
||||
model_provider_id: "openai".to_string(),
|
||||
@@ -891,9 +910,9 @@ disable_response_storage = true
|
||||
tui: Tui::default(),
|
||||
codex_linux_sandbox_exe: None,
|
||||
hide_agent_reasoning: false,
|
||||
show_raw_agent_reasoning: false,
|
||||
model_reasoning_effort: ReasoningEffort::High,
|
||||
model_reasoning_summary: ReasoningSummary::Detailed,
|
||||
model_supports_reasoning_summaries: false,
|
||||
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
|
||||
experimental_resume: None,
|
||||
base_instructions: None,
|
||||
@@ -921,6 +940,7 @@ disable_response_storage = true
|
||||
)?;
|
||||
let expected_gpt3_profile_config = Config {
|
||||
model: "gpt-3.5-turbo".to_string(),
|
||||
model_family: find_family_for_model("gpt-3.5-turbo").expect("known model slug"),
|
||||
model_context_window: Some(16_385),
|
||||
model_max_output_tokens: Some(4_096),
|
||||
model_provider_id: "openai-chat-completions".to_string(),
|
||||
@@ -941,9 +961,9 @@ disable_response_storage = true
|
||||
tui: Tui::default(),
|
||||
codex_linux_sandbox_exe: None,
|
||||
hide_agent_reasoning: false,
|
||||
show_raw_agent_reasoning: false,
|
||||
model_reasoning_effort: ReasoningEffort::default(),
|
||||
model_reasoning_summary: ReasoningSummary::default(),
|
||||
model_supports_reasoning_summaries: false,
|
||||
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
|
||||
experimental_resume: None,
|
||||
base_instructions: None,
|
||||
@@ -986,6 +1006,7 @@ disable_response_storage = true
|
||||
)?;
|
||||
let expected_zdr_profile_config = Config {
|
||||
model: "o3".to_string(),
|
||||
model_family: find_family_for_model("o3").expect("known model slug"),
|
||||
model_context_window: Some(200_000),
|
||||
model_max_output_tokens: Some(100_000),
|
||||
model_provider_id: "openai".to_string(),
|
||||
@@ -1006,9 +1027,9 @@ disable_response_storage = true
|
||||
tui: Tui::default(),
|
||||
codex_linux_sandbox_exe: None,
|
||||
hide_agent_reasoning: false,
|
||||
show_raw_agent_reasoning: false,
|
||||
model_reasoning_effort: ReasoningEffort::default(),
|
||||
model_reasoning_summary: ReasoningSummary::default(),
|
||||
model_supports_reasoning_summaries: false,
|
||||
chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(),
|
||||
experimental_resume: None,
|
||||
base_instructions: None,
|
||||
|
||||
@@ -31,6 +31,7 @@ mod model_provider_info;
|
||||
pub use model_provider_info::ModelProviderInfo;
|
||||
pub use model_provider_info::WireApi;
|
||||
pub use model_provider_info::built_in_model_providers;
|
||||
pub mod model_family;
|
||||
mod models;
|
||||
mod openai_model_info;
|
||||
mod openai_tools;
|
||||
@@ -47,5 +48,4 @@ mod user_notification;
|
||||
pub mod util;
|
||||
|
||||
pub use apply_patch::CODEX_APPLY_PATCH_ARG1;
|
||||
pub use client_common::model_supports_reasoning_summaries;
|
||||
pub use safety::get_platform_sandbox;
|
||||
|
||||
93
codex-rs/core/src/model_family.rs
Normal file
93
codex-rs/core/src/model_family.rs
Normal file
@@ -0,0 +1,93 @@
|
||||
/// A model family is a group of models that share certain characteristics.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct ModelFamily {
|
||||
/// The full model slug used to derive this model family, e.g.
|
||||
/// "gpt-4.1-2025-04-14".
|
||||
pub slug: String,
|
||||
|
||||
/// The model family name, e.g. "gpt-4.1". Note this should able to be used
|
||||
/// with [`crate::openai_model_info::get_model_info`].
|
||||
pub family: String,
|
||||
|
||||
/// True if the model needs additional instructions on how to use the
|
||||
/// "virtual" `apply_patch` CLI.
|
||||
pub needs_special_apply_patch_instructions: bool,
|
||||
|
||||
// Whether the `reasoning` field can be set when making a request to this
|
||||
// model family. Note it has `effort` and `summary` subfields (though
|
||||
// `summary` is optional).
|
||||
pub supports_reasoning_summaries: bool,
|
||||
|
||||
// This should be set to true when the model expects a tool named
|
||||
// "local_shell" to be provided. Its contract must be understood natively by
|
||||
// the model such that its description can be omitted.
|
||||
// See https://platform.openai.com/docs/guides/tools-local-shell
|
||||
pub uses_local_shell_tool: bool,
|
||||
}
|
||||
|
||||
macro_rules! model_family {
|
||||
(
|
||||
$slug:expr, $family:expr $(, $key:ident : $value:expr )* $(,)?
|
||||
) => {{
|
||||
// defaults
|
||||
let mut mf = ModelFamily {
|
||||
slug: $slug.to_string(),
|
||||
family: $family.to_string(),
|
||||
needs_special_apply_patch_instructions: false,
|
||||
supports_reasoning_summaries: false,
|
||||
uses_local_shell_tool: false,
|
||||
};
|
||||
// apply overrides
|
||||
$(
|
||||
mf.$key = $value;
|
||||
)*
|
||||
Some(mf)
|
||||
}};
|
||||
}
|
||||
|
||||
macro_rules! simple_model_family {
|
||||
(
|
||||
$slug:expr, $family:expr
|
||||
) => {{
|
||||
Some(ModelFamily {
|
||||
slug: $slug.to_string(),
|
||||
family: $family.to_string(),
|
||||
needs_special_apply_patch_instructions: false,
|
||||
supports_reasoning_summaries: false,
|
||||
uses_local_shell_tool: false,
|
||||
})
|
||||
}};
|
||||
}
|
||||
|
||||
/// Returns a `ModelFamily` for the given model slug, or `None` if the slug
|
||||
/// does not match any known model family.
|
||||
pub fn find_family_for_model(slug: &str) -> Option<ModelFamily> {
|
||||
if slug.starts_with("o3") {
|
||||
model_family!(
|
||||
slug, "o3",
|
||||
supports_reasoning_summaries: true,
|
||||
)
|
||||
} else if slug.starts_with("o4-mini") {
|
||||
model_family!(
|
||||
slug, "o4-mini",
|
||||
supports_reasoning_summaries: true,
|
||||
)
|
||||
} else if slug.starts_with("codex-mini-latest") {
|
||||
model_family!(
|
||||
slug, "codex-mini-latest",
|
||||
supports_reasoning_summaries: true,
|
||||
uses_local_shell_tool: true,
|
||||
)
|
||||
} else if slug.starts_with("gpt-4.1") {
|
||||
model_family!(
|
||||
slug, "gpt-4.1",
|
||||
needs_special_apply_patch_instructions: true,
|
||||
)
|
||||
} else if slug.starts_with("gpt-4o") {
|
||||
simple_model_family!(slug, "gpt-4o")
|
||||
} else if slug.starts_with("gpt-3.5") {
|
||||
simple_model_family!(slug, "gpt-3.5")
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
@@ -45,6 +45,8 @@ pub enum ResponseItem {
|
||||
Reasoning {
|
||||
id: String,
|
||||
summary: Vec<ReasoningItemReasoningSummary>,
|
||||
#[serde(default, skip_serializing_if = "Option::is_none")]
|
||||
content: Option<Vec<ReasoningItemContent>>,
|
||||
encrypted_content: Option<String>,
|
||||
},
|
||||
LocalShellCall {
|
||||
@@ -136,6 +138,12 @@ pub enum ReasoningItemReasoningSummary {
|
||||
SummaryText { text: String },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type", rename_all = "snake_case")]
|
||||
pub enum ReasoningItemContent {
|
||||
ReasoningText { text: String },
|
||||
}
|
||||
|
||||
impl From<Vec<InputItem>> for ResponseInputItem {
|
||||
fn from(items: Vec<InputItem>) -> Self {
|
||||
Self::Message {
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use crate::model_family::ModelFamily;
|
||||
|
||||
/// Metadata about a model, particularly OpenAI models.
|
||||
/// We may want to consider including details like the pricing for
|
||||
/// input tokens, output tokens, etc., though users will need to be able to
|
||||
@@ -14,8 +16,8 @@ pub(crate) struct ModelInfo {
|
||||
|
||||
/// Note details such as what a model like gpt-4o is aliased to may be out of
|
||||
/// date.
|
||||
pub(crate) fn get_model_info(name: &str) -> Option<ModelInfo> {
|
||||
match name {
|
||||
pub(crate) fn get_model_info(model_family: &ModelFamily) -> Option<ModelInfo> {
|
||||
match model_family.slug.as_str() {
|
||||
// https://platform.openai.com/docs/models/o3
|
||||
"o3" => Some(ModelInfo {
|
||||
context_window: 200_000,
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::LazyLock;
|
||||
|
||||
use crate::client_common::Prompt;
|
||||
use crate::model_family::ModelFamily;
|
||||
use crate::plan_tool::PLAN_TOOL;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
@@ -42,8 +42,7 @@ pub(crate) enum JsonSchema {
|
||||
},
|
||||
}
|
||||
|
||||
/// Tool usage specification
|
||||
static DEFAULT_TOOLS: LazyLock<Vec<OpenAiTool>> = LazyLock::new(|| {
|
||||
fn create_shell_tool() -> OpenAiTool {
|
||||
let mut properties = BTreeMap::new();
|
||||
properties.insert(
|
||||
"command".to_string(),
|
||||
@@ -54,38 +53,35 @@ static DEFAULT_TOOLS: LazyLock<Vec<OpenAiTool>> = LazyLock::new(|| {
|
||||
properties.insert("workdir".to_string(), JsonSchema::String);
|
||||
properties.insert("timeout".to_string(), JsonSchema::Number);
|
||||
|
||||
vec![OpenAiTool::Function(ResponsesApiTool {
|
||||
OpenAiTool::Function(ResponsesApiTool {
|
||||
name: "shell",
|
||||
description: "Runs a shell command, and returns its output.",
|
||||
description: "Runs a shell command and returns its output",
|
||||
strict: false,
|
||||
parameters: JsonSchema::Object {
|
||||
properties,
|
||||
required: &["command"],
|
||||
additional_properties: false,
|
||||
},
|
||||
})]
|
||||
});
|
||||
|
||||
static DEFAULT_CODEX_MODEL_TOOLS: LazyLock<Vec<OpenAiTool>> =
|
||||
LazyLock::new(|| vec![OpenAiTool::LocalShell {}]);
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns JSON values that are compatible with Function Calling in the
|
||||
/// Responses API:
|
||||
/// https://platform.openai.com/docs/guides/function-calling?api-mode=responses
|
||||
pub(crate) fn create_tools_json_for_responses_api(
|
||||
prompt: &Prompt,
|
||||
model: &str,
|
||||
model_family: &ModelFamily,
|
||||
include_plan_tool: bool,
|
||||
) -> crate::error::Result<Vec<serde_json::Value>> {
|
||||
// Assemble tool list: built-in tools + any extra tools from the prompt.
|
||||
let default_tools = if model.starts_with("codex") {
|
||||
&DEFAULT_CODEX_MODEL_TOOLS
|
||||
} else {
|
||||
&DEFAULT_TOOLS
|
||||
};
|
||||
let mut tools_json = Vec::with_capacity(default_tools.len() + prompt.extra_tools.len());
|
||||
for t in default_tools.iter() {
|
||||
tools_json.push(serde_json::to_value(t)?);
|
||||
let mut openai_tools = vec![create_shell_tool()];
|
||||
if model_family.uses_local_shell_tool {
|
||||
openai_tools.push(OpenAiTool::LocalShell {});
|
||||
}
|
||||
|
||||
let mut tools_json = Vec::with_capacity(openai_tools.len() + prompt.extra_tools.len() + 1);
|
||||
for tool in openai_tools.iter() {
|
||||
tools_json.push(serde_json::to_value(tool)?);
|
||||
}
|
||||
tools_json.extend(
|
||||
prompt
|
||||
@@ -107,13 +103,13 @@ pub(crate) fn create_tools_json_for_responses_api(
|
||||
/// https://platform.openai.com/docs/guides/function-calling?api-mode=chat
|
||||
pub(crate) fn create_tools_json_for_chat_completions_api(
|
||||
prompt: &Prompt,
|
||||
model: &str,
|
||||
model_family: &ModelFamily,
|
||||
include_plan_tool: bool,
|
||||
) -> crate::error::Result<Vec<serde_json::Value>> {
|
||||
// We start with the JSON for the Responses API and than rewrite it to match
|
||||
// the chat completions tool call format.
|
||||
let responses_api_tools_json =
|
||||
create_tools_json_for_responses_api(prompt, model, include_plan_tool)?;
|
||||
create_tools_json_for_responses_api(prompt, model_family, include_plan_tool)?;
|
||||
let tools_json = responses_api_tools_json
|
||||
.into_iter()
|
||||
.filter_map(|mut tool| {
|
||||
|
||||
@@ -359,6 +359,12 @@ pub enum EventMsg {
|
||||
/// Agent reasoning delta event from agent.
|
||||
AgentReasoningDelta(AgentReasoningDeltaEvent),
|
||||
|
||||
/// Raw chain-of-thought from agent.
|
||||
AgentReasoningRawContent(AgentReasoningRawContentEvent),
|
||||
|
||||
/// Agent reasoning content delta event from agent.
|
||||
AgentReasoningRawContentDelta(AgentReasoningRawContentDeltaEvent),
|
||||
|
||||
/// Ack the client's configure message.
|
||||
SessionConfigured(SessionConfiguredEvent),
|
||||
|
||||
@@ -464,6 +470,16 @@ pub struct AgentReasoningEvent {
|
||||
pub text: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct AgentReasoningRawContentEvent {
|
||||
pub text: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct AgentReasoningRawContentDeltaEvent {
|
||||
pub delta: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct AgentReasoningDeltaEvent {
|
||||
pub delta: String,
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::path::Path;
|
||||
use codex_common::summarize_sandbox_policy;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::model_supports_reasoning_summaries;
|
||||
use codex_core::protocol::Event;
|
||||
|
||||
pub(crate) enum CodexStatus {
|
||||
@@ -29,7 +28,7 @@ pub(crate) fn create_config_summary_entries(config: &Config) -> Vec<(&'static st
|
||||
("sandbox", summarize_sandbox_policy(&config.sandbox_policy)),
|
||||
];
|
||||
if config.model_provider.wire_api == WireApi::Responses
|
||||
&& model_supports_reasoning_summaries(config)
|
||||
&& config.model_family.supports_reasoning_summaries
|
||||
{
|
||||
entries.push((
|
||||
"reasoning effort",
|
||||
|
||||
@@ -5,6 +5,8 @@ use codex_core::plan_tool::UpdatePlanArgs;
|
||||
use codex_core::protocol::AgentMessageDeltaEvent;
|
||||
use codex_core::protocol::AgentMessageEvent;
|
||||
use codex_core::protocol::AgentReasoningDeltaEvent;
|
||||
use codex_core::protocol::AgentReasoningRawContentDeltaEvent;
|
||||
use codex_core::protocol::AgentReasoningRawContentEvent;
|
||||
use codex_core::protocol::BackgroundEventEvent;
|
||||
use codex_core::protocol::ErrorEvent;
|
||||
use codex_core::protocol::Event;
|
||||
@@ -55,8 +57,10 @@ pub(crate) struct EventProcessorWithHumanOutput {
|
||||
|
||||
/// Whether to include `AgentReasoning` events in the output.
|
||||
show_agent_reasoning: bool,
|
||||
show_raw_agent_reasoning: bool,
|
||||
answer_started: bool,
|
||||
reasoning_started: bool,
|
||||
raw_reasoning_started: bool,
|
||||
last_message_path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
@@ -81,8 +85,10 @@ impl EventProcessorWithHumanOutput {
|
||||
green: Style::new().green(),
|
||||
cyan: Style::new().cyan(),
|
||||
show_agent_reasoning: !config.hide_agent_reasoning,
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
answer_started: false,
|
||||
reasoning_started: false,
|
||||
raw_reasoning_started: false,
|
||||
last_message_path,
|
||||
}
|
||||
} else {
|
||||
@@ -97,8 +103,10 @@ impl EventProcessorWithHumanOutput {
|
||||
green: Style::new(),
|
||||
cyan: Style::new(),
|
||||
show_agent_reasoning: !config.hide_agent_reasoning,
|
||||
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
|
||||
answer_started: false,
|
||||
reasoning_started: false,
|
||||
raw_reasoning_started: false,
|
||||
last_message_path,
|
||||
}
|
||||
}
|
||||
@@ -203,6 +211,32 @@ impl EventProcessor for EventProcessorWithHumanOutput {
|
||||
#[allow(clippy::expect_used)]
|
||||
std::io::stdout().flush().expect("could not flush stdout");
|
||||
}
|
||||
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text }) => {
|
||||
if !self.show_raw_agent_reasoning {
|
||||
return CodexStatus::Running;
|
||||
}
|
||||
if !self.raw_reasoning_started {
|
||||
print!("{text}");
|
||||
#[allow(clippy::expect_used)]
|
||||
std::io::stdout().flush().expect("could not flush stdout");
|
||||
} else {
|
||||
println!();
|
||||
self.raw_reasoning_started = false;
|
||||
}
|
||||
}
|
||||
EventMsg::AgentReasoningRawContentDelta(AgentReasoningRawContentDeltaEvent {
|
||||
delta,
|
||||
}) => {
|
||||
if !self.show_raw_agent_reasoning {
|
||||
return CodexStatus::Running;
|
||||
}
|
||||
if !self.raw_reasoning_started {
|
||||
self.raw_reasoning_started = true;
|
||||
}
|
||||
print!("{delta}");
|
||||
#[allow(clippy::expect_used)]
|
||||
std::io::stdout().flush().expect("could not flush stdout");
|
||||
}
|
||||
EventMsg::AgentMessage(AgentMessageEvent { message }) => {
|
||||
// if answer_started is false, this means we haven't received any
|
||||
// delta. Thus, we need to print the message as a new answer.
|
||||
|
||||
@@ -31,6 +31,7 @@ tokio = { version = "1", features = [
|
||||
"rt-multi-thread",
|
||||
"signal",
|
||||
] }
|
||||
tokio-util = { version = "0.7" }
|
||||
toml = "0.9"
|
||||
tracing = { version = "0.1.41", features = ["log"] }
|
||||
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
|
||||
|
||||
@@ -15,7 +15,6 @@ use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::ExecApprovalRequestEvent;
|
||||
use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::Submission;
|
||||
use codex_core::protocol::TaskCompleteEvent;
|
||||
use mcp_types::CallToolResult;
|
||||
use mcp_types::ContentBlock;
|
||||
@@ -79,27 +78,18 @@ pub async fn run_codex_tool_session(
|
||||
)
|
||||
.await;
|
||||
|
||||
// Use the original MCP request ID as the `sub_id` for the Codex submission so that
|
||||
// any events emitted for this tool-call can be correlated with the
|
||||
// originating `tools/call` request.
|
||||
let sub_id = match &id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(n) => n.to_string(),
|
||||
};
|
||||
running_requests_id_to_codex_uuid
|
||||
.lock()
|
||||
.await
|
||||
.insert(id.clone(), session_id);
|
||||
let submission = Submission {
|
||||
id: sub_id.clone(),
|
||||
op: Op::UserInput {
|
||||
if let Err(e) = codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![InputItem::Text {
|
||||
text: initial_prompt.clone(),
|
||||
}],
|
||||
},
|
||||
};
|
||||
|
||||
if let Err(e) = codex.submit_with_id(submission).await {
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::error!("Failed to submit initial prompt: {e}");
|
||||
// unregister the id so we don't keep it in the map
|
||||
running_requests_id_to_codex_uuid.lock().await.remove(&id);
|
||||
@@ -151,10 +141,7 @@ async fn run_codex_tool_session_inner(
|
||||
request_id: RequestId,
|
||||
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
|
||||
) {
|
||||
let request_id_str = match &request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(n) => n.to_string(),
|
||||
};
|
||||
let request_id_str = crate::request_id::request_id_to_string(&request_id);
|
||||
|
||||
// Stream events until the task needs to pause for user interaction or
|
||||
// completes.
|
||||
@@ -252,7 +239,9 @@ async fn run_codex_tool_session_inner(
|
||||
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
|
||||
// TODO: think how we want to support this in the MCP
|
||||
}
|
||||
EventMsg::TaskStarted
|
||||
EventMsg::AgentReasoningRawContent(_)
|
||||
| EventMsg::AgentReasoningRawContentDelta(_)
|
||||
| EventMsg::TaskStarted
|
||||
| EventMsg::TokenCount(_)
|
||||
| EventMsg::AgentReasoning(_)
|
||||
| EventMsg::McpToolCallBegin(_)
|
||||
|
||||
@@ -1,122 +1,369 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::exec_approval::handle_exec_approval_request;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::outgoing_message::OutgoingNotificationMeta;
|
||||
use crate::patch_approval::handle_patch_approval_request;
|
||||
use codex_core::Codex;
|
||||
use codex_core::error::Result as CodexResult;
|
||||
use codex_core::protocol::AgentMessageEvent;
|
||||
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_core::protocol::Event;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::ExecApprovalRequestEvent;
|
||||
use codex_core::protocol::FileChange;
|
||||
use codex_core::protocol::InputItem;
|
||||
use codex_core::protocol::Op;
|
||||
use mcp_types::RequestId;
|
||||
use tokio::sync::Mutex;
|
||||
// no streaming watch channel; streaming is toggled via set_streaming on the struct
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
|
||||
pub async fn run_conversation_loop(
|
||||
use crate::exec_approval::handle_exec_approval_request;
|
||||
use crate::mcp_protocol::CodexEventNotificationParams;
|
||||
use crate::mcp_protocol::ConversationId;
|
||||
use crate::mcp_protocol::InitialStateNotificationParams;
|
||||
use crate::mcp_protocol::InitialStatePayload;
|
||||
use crate::mcp_protocol::NotificationMeta;
|
||||
use crate::mcp_protocol::ServerNotification;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::patch_approval::handle_patch_approval_request;
|
||||
use crate::request_id::request_id_to_string;
|
||||
|
||||
/// Conversation struct that owns the Codex session and all per-conversation state.
|
||||
pub(crate) struct Conversation {
|
||||
codex: Arc<Codex>,
|
||||
session_id: Uuid,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
request_id: RequestId,
|
||||
) {
|
||||
let request_id_str = match &request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(n) => n.to_string(),
|
||||
};
|
||||
state: Mutex<ConversationState>,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
// Stream events until the task needs to pause for user interaction or
|
||||
// completes.
|
||||
loop {
|
||||
match codex.next_event().await {
|
||||
Ok(event) => {
|
||||
outgoing
|
||||
.send_event_as_notification(
|
||||
&event,
|
||||
Some(OutgoingNotificationMeta::new(Some(request_id.clone()))),
|
||||
)
|
||||
struct ConversationState {
|
||||
streaming_enabled: bool,
|
||||
buffered_events: Vec<CodexEventNotificationParams>,
|
||||
pending_elicitations: Vec<PendingElicitation>,
|
||||
}
|
||||
|
||||
impl Conversation {
|
||||
pub(crate) fn new(
|
||||
codex: Arc<Codex>,
|
||||
outgoing: Arc<OutgoingMessageSender>,
|
||||
request_id: RequestId,
|
||||
session_id: Uuid,
|
||||
) -> Arc<Self> {
|
||||
let conv = Arc::new(Self {
|
||||
codex,
|
||||
session_id,
|
||||
outgoing,
|
||||
request_id,
|
||||
state: Mutex::new(ConversationState {
|
||||
streaming_enabled: false,
|
||||
buffered_events: Vec::new(),
|
||||
pending_elicitations: Vec::new(),
|
||||
}),
|
||||
cancel: CancellationToken::new(),
|
||||
});
|
||||
// Detach a background loop tied to this Conversation
|
||||
spawn_conversation_loop(conv.clone());
|
||||
conv
|
||||
}
|
||||
|
||||
pub(crate) async fn set_streaming(&self, enabled: bool) {
|
||||
if enabled {
|
||||
let (events_snapshot, pending_snapshot) = {
|
||||
let mut st = self.state.lock().await;
|
||||
st.streaming_enabled = true;
|
||||
(
|
||||
st.buffered_events.clone(),
|
||||
std::mem::take(&mut st.pending_elicitations),
|
||||
)
|
||||
};
|
||||
self.emit_initial_state_with(events_snapshot).await;
|
||||
self.drain_pending_elicitations_from(pending_snapshot).await;
|
||||
} else {
|
||||
let mut st = self.state.lock().await;
|
||||
st.streaming_enabled = false;
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn codex(&self) -> Arc<Codex> {
|
||||
self.codex.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn try_submit_user_input(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
items: Vec<InputItem>,
|
||||
) -> CodexResult<()> {
|
||||
let _ = request_id; // request_id is not used to enforce uniqueness; Codex generates ids.
|
||||
self.codex.submit(Op::UserInput { items }).await.map(|_| ())
|
||||
}
|
||||
|
||||
async fn handle_event(&self, event: Event) {
|
||||
{
|
||||
let mut st = self.state.lock().await;
|
||||
st.buffered_events.push(CodexEventNotificationParams {
|
||||
meta: None,
|
||||
msg: event.msg.clone(),
|
||||
});
|
||||
}
|
||||
self.stream_event_if_enabled(&event.msg).await;
|
||||
|
||||
match event.msg {
|
||||
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
||||
command,
|
||||
cwd,
|
||||
call_id,
|
||||
reason: _,
|
||||
}) => {
|
||||
self.process_exec_request(command, cwd, call_id, event.id.clone())
|
||||
.await;
|
||||
}
|
||||
EventMsg::Error(err) => {
|
||||
error!("Codex runtime error: {}", err.message);
|
||||
}
|
||||
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
call_id,
|
||||
reason,
|
||||
grant_root,
|
||||
changes,
|
||||
}) => {
|
||||
self.start_patch_approval(PatchRequest {
|
||||
call_id,
|
||||
reason,
|
||||
grant_root,
|
||||
changes,
|
||||
event_id: event.id.clone(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
EventMsg::TaskComplete(_) => {}
|
||||
EventMsg::TaskStarted => {}
|
||||
EventMsg::SessionConfigured(ev) => {
|
||||
error!("unexpected SessionConfigured event: {:?}", ev);
|
||||
}
|
||||
EventMsg::AgentMessageDelta(_) => {}
|
||||
EventMsg::AgentReasoningDelta(_) => {}
|
||||
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {}
|
||||
EventMsg::TokenCount(_)
|
||||
| EventMsg::AgentReasoning(_)
|
||||
| EventMsg::AgentReasoningRawContent(_)
|
||||
| EventMsg::AgentReasoningRawContentDelta(_)
|
||||
| EventMsg::McpToolCallBegin(_)
|
||||
| EventMsg::McpToolCallEnd(_)
|
||||
| EventMsg::ExecCommandBegin(_)
|
||||
| EventMsg::ExecCommandEnd(_)
|
||||
| EventMsg::BackgroundEvent(_)
|
||||
| EventMsg::ExecCommandOutputDelta(_)
|
||||
| EventMsg::PatchApplyBegin(_)
|
||||
| EventMsg::PatchApplyEnd(_)
|
||||
| EventMsg::GetHistoryEntryResponse(_)
|
||||
| EventMsg::PlanUpdate(_)
|
||||
| EventMsg::TurnDiff(_)
|
||||
| EventMsg::ShutdownComplete => {}
|
||||
}
|
||||
}
|
||||
|
||||
match event.msg {
|
||||
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
|
||||
async fn emit_initial_state_with(&self, events: Vec<CodexEventNotificationParams>) {
|
||||
let params = InitialStateNotificationParams {
|
||||
meta: Some(NotificationMeta {
|
||||
conversation_id: Some(ConversationId(self.session_id)),
|
||||
request_id: None,
|
||||
}),
|
||||
initial_state: InitialStatePayload { events },
|
||||
};
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::InitialState(params))
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn drain_pending_elicitations_from(&self, items: Vec<PendingElicitation>) {
|
||||
for item in items {
|
||||
match item {
|
||||
PendingElicitation::ExecRequest(ExecRequest {
|
||||
command,
|
||||
cwd,
|
||||
event_id,
|
||||
call_id,
|
||||
}) => {
|
||||
handle_exec_approval_request(
|
||||
command,
|
||||
cwd,
|
||||
self.outgoing.clone(),
|
||||
self.codex.clone(),
|
||||
self.request_id.clone(),
|
||||
request_id_to_string(&self.request_id),
|
||||
event_id,
|
||||
call_id,
|
||||
reason: _,
|
||||
}) => {
|
||||
handle_exec_approval_request(
|
||||
command,
|
||||
cwd,
|
||||
outgoing.clone(),
|
||||
codex.clone(),
|
||||
request_id.clone(),
|
||||
request_id_str.clone(),
|
||||
event.id.clone(),
|
||||
call_id,
|
||||
)
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
EventMsg::Error(_) => {
|
||||
error!("Codex runtime error");
|
||||
}
|
||||
EventMsg::ApplyPatchApprovalRequest(ApplyPatchApprovalRequestEvent {
|
||||
)
|
||||
.await;
|
||||
}
|
||||
PendingElicitation::PatchRequest(PatchRequest {
|
||||
call_id,
|
||||
reason,
|
||||
grant_root,
|
||||
changes,
|
||||
event_id,
|
||||
}) => {
|
||||
handle_patch_approval_request(
|
||||
call_id,
|
||||
reason,
|
||||
grant_root,
|
||||
changes,
|
||||
}) => {
|
||||
handle_patch_approval_request(
|
||||
call_id,
|
||||
reason,
|
||||
grant_root,
|
||||
changes,
|
||||
outgoing.clone(),
|
||||
codex.clone(),
|
||||
request_id.clone(),
|
||||
request_id_str.clone(),
|
||||
event.id.clone(),
|
||||
)
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
EventMsg::TaskComplete(_) => {}
|
||||
EventMsg::SessionConfigured(_) => {
|
||||
tracing::error!("unexpected SessionConfigured event");
|
||||
}
|
||||
EventMsg::AgentMessageDelta(_) => {
|
||||
// TODO: think how we want to support this in the MCP
|
||||
}
|
||||
EventMsg::AgentReasoningDelta(_) => {
|
||||
// TODO: think how we want to support this in the MCP
|
||||
}
|
||||
EventMsg::AgentMessage(AgentMessageEvent { .. }) => {
|
||||
// TODO: think how we want to support this in the MCP
|
||||
}
|
||||
EventMsg::TaskStarted
|
||||
| EventMsg::TokenCount(_)
|
||||
| EventMsg::AgentReasoning(_)
|
||||
| EventMsg::McpToolCallBegin(_)
|
||||
| EventMsg::McpToolCallEnd(_)
|
||||
| EventMsg::ExecCommandBegin(_)
|
||||
| EventMsg::ExecCommandEnd(_)
|
||||
| EventMsg::TurnDiff(_)
|
||||
| EventMsg::BackgroundEvent(_)
|
||||
| EventMsg::ExecCommandOutputDelta(_)
|
||||
| EventMsg::PatchApplyBegin(_)
|
||||
| EventMsg::PatchApplyEnd(_)
|
||||
| EventMsg::GetHistoryEntryResponse(_)
|
||||
| EventMsg::PlanUpdate(_)
|
||||
| EventMsg::ShutdownComplete => {
|
||||
// For now, we do not do anything extra for these
|
||||
// events. Note that
|
||||
// send(codex_event_to_notification(&event)) above has
|
||||
// already dispatched these events as notifications,
|
||||
// though we may want to do give different treatment to
|
||||
// individual events in the future.
|
||||
}
|
||||
self.outgoing.clone(),
|
||||
self.codex.clone(),
|
||||
self.request_id.clone(),
|
||||
request_id_to_string(&self.request_id),
|
||||
event_id,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Codex runtime error: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_exec_request(
|
||||
&self,
|
||||
command: Vec<String>,
|
||||
cwd: PathBuf,
|
||||
call_id: String,
|
||||
event_id: String,
|
||||
) {
|
||||
let should_stream = {
|
||||
let st = self.state.lock().await;
|
||||
st.streaming_enabled
|
||||
};
|
||||
if should_stream {
|
||||
handle_exec_approval_request(
|
||||
command,
|
||||
cwd,
|
||||
self.outgoing.clone(),
|
||||
self.codex.clone(),
|
||||
self.request_id.clone(),
|
||||
request_id_to_string(&self.request_id),
|
||||
event_id,
|
||||
call_id,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
let mut st = self.state.lock().await;
|
||||
st.pending_elicitations
|
||||
.push(PendingElicitation::ExecRequest(ExecRequest {
|
||||
command,
|
||||
cwd,
|
||||
event_id,
|
||||
call_id,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_patch_approval(&self, req: PatchRequest) {
|
||||
let PatchRequest {
|
||||
call_id,
|
||||
reason,
|
||||
grant_root,
|
||||
changes,
|
||||
event_id,
|
||||
} = req;
|
||||
let should_stream = {
|
||||
let st = self.state.lock().await;
|
||||
st.streaming_enabled
|
||||
};
|
||||
if should_stream {
|
||||
handle_patch_approval_request(
|
||||
call_id,
|
||||
reason,
|
||||
grant_root,
|
||||
changes,
|
||||
self.outgoing.clone(),
|
||||
self.codex.clone(),
|
||||
self.request_id.clone(),
|
||||
request_id_to_string(&self.request_id),
|
||||
event_id,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
let mut st = self.state.lock().await;
|
||||
st.pending_elicitations
|
||||
.push(PendingElicitation::PatchRequest(PatchRequest {
|
||||
call_id,
|
||||
reason,
|
||||
grant_root,
|
||||
changes,
|
||||
event_id,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
async fn stream_event_if_enabled(&self, msg: &EventMsg) {
|
||||
if !{ self.state.lock().await.streaming_enabled } {
|
||||
return;
|
||||
}
|
||||
let method = msg.to_string();
|
||||
let params = CodexEventNotificationParams {
|
||||
meta: None,
|
||||
msg: msg.clone(),
|
||||
};
|
||||
match serde_json::to_value(¶ms) {
|
||||
Ok(params_val) => {
|
||||
self.outgoing
|
||||
.send_custom_notification(&method, params_val)
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Failed to serialize event params: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum PendingElicitation {
|
||||
ExecRequest(ExecRequest),
|
||||
PatchRequest(PatchRequest),
|
||||
}
|
||||
|
||||
struct PatchRequest {
|
||||
call_id: String,
|
||||
reason: Option<String>,
|
||||
grant_root: Option<PathBuf>,
|
||||
changes: HashMap<PathBuf, FileChange>,
|
||||
event_id: String,
|
||||
}
|
||||
|
||||
struct ExecRequest {
|
||||
command: Vec<String>,
|
||||
cwd: PathBuf,
|
||||
event_id: String,
|
||||
call_id: String,
|
||||
}
|
||||
|
||||
impl Drop for Conversation {
|
||||
fn drop(&mut self) {
|
||||
self.cancel.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_conversation_loop(this: Arc<Conversation>) {
|
||||
tokio::spawn(async move {
|
||||
let codex = this.codex.clone();
|
||||
let cancel = this.cancel.clone();
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = cancel.cancelled() => {
|
||||
break;
|
||||
}
|
||||
res = codex.next_event() => {
|
||||
match res {
|
||||
Ok(event) => this.handle_event(event).await,
|
||||
Err(e) => {
|
||||
error!("Codex next_event error (session {}): {e}", this.session_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ pub mod mcp_protocol;
|
||||
pub(crate) mod message_processor;
|
||||
mod outgoing_message;
|
||||
mod patch_approval;
|
||||
mod request_id;
|
||||
pub(crate) mod tool_handlers;
|
||||
|
||||
use crate::message_processor::MessageProcessor;
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -13,10 +12,11 @@ use crate::mcp_protocol::ToolCallResponseResult;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::tool_handlers::create_conversation::handle_create_conversation;
|
||||
use crate::tool_handlers::send_message::handle_send_message;
|
||||
use crate::tool_handlers::stream_conversation;
|
||||
use crate::tool_handlers::stream_conversation::handle_stream_conversation;
|
||||
|
||||
use codex_core::Codex;
|
||||
use codex_core::config::Config as CodexConfig;
|
||||
use codex_core::protocol::Submission;
|
||||
use mcp_types::CallToolRequest;
|
||||
use mcp_types::CallToolRequestParams;
|
||||
use mcp_types::CallToolResult;
|
||||
@@ -43,8 +43,10 @@ pub(crate) struct MessageProcessor {
|
||||
initialized: bool,
|
||||
codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
|
||||
conversation_map: Arc<Mutex<HashMap<Uuid, Arc<crate::conversation_loop::Conversation>>>>,
|
||||
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, Uuid>>>,
|
||||
running_session_ids: Arc<Mutex<HashSet<Uuid>>>,
|
||||
/// Track request IDs to the original ToolCallRequestParams for cancellation handling
|
||||
tool_request_map: Arc<Mutex<HashMap<RequestId, ToolCallRequestParams>>>,
|
||||
}
|
||||
|
||||
impl MessageProcessor {
|
||||
@@ -59,23 +61,22 @@ impl MessageProcessor {
|
||||
initialized: false,
|
||||
codex_linux_sandbox_exe,
|
||||
session_map: Arc::new(Mutex::new(HashMap::new())),
|
||||
conversation_map: Arc::new(Mutex::new(HashMap::new())),
|
||||
running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())),
|
||||
running_session_ids: Arc::new(Mutex::new(HashSet::new())),
|
||||
tool_request_map: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn session_map(&self) -> Arc<Mutex<HashMap<Uuid, Arc<Codex>>>> {
|
||||
self.session_map.clone()
|
||||
pub(crate) fn conversation_map(
|
||||
&self,
|
||||
) -> Arc<Mutex<HashMap<Uuid, Arc<crate::conversation_loop::Conversation>>>> {
|
||||
self.conversation_map.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn outgoing(&self) -> Arc<OutgoingMessageSender> {
|
||||
self.outgoing.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn running_session_ids(&self) -> Arc<Mutex<HashSet<Uuid>>> {
|
||||
self.running_session_ids.clone()
|
||||
}
|
||||
|
||||
pub(crate) async fn process_request(&mut self, request: JSONRPCRequest) {
|
||||
// Hold on to the ID so we can respond.
|
||||
let request_id = request.id.clone();
|
||||
@@ -353,6 +354,11 @@ impl MessageProcessor {
|
||||
}
|
||||
}
|
||||
async fn handle_new_tool_calls(&self, request_id: RequestId, params: ToolCallRequestParams) {
|
||||
// Track the request to allow graceful cancellation routing later.
|
||||
{
|
||||
let mut tool_request_map = self.tool_request_map.lock().await;
|
||||
tool_request_map.insert(request_id.clone(), params.clone());
|
||||
}
|
||||
match params {
|
||||
ToolCallRequestParams::ConversationCreate(args) => {
|
||||
handle_create_conversation(self, request_id, args).await;
|
||||
@@ -360,6 +366,9 @@ impl MessageProcessor {
|
||||
ToolCallRequestParams::ConversationSendMessage(args) => {
|
||||
handle_send_message(self, request_id, args).await;
|
||||
}
|
||||
ToolCallRequestParams::ConversationStream(args) => {
|
||||
handle_stream_conversation(self, request_id, args).await;
|
||||
}
|
||||
_ => {
|
||||
let result = CallToolResult {
|
||||
content: vec![ContentBlock::TextContent(TextContent {
|
||||
@@ -584,23 +593,72 @@ impl MessageProcessor {
|
||||
// ---------------------------------------------------------------------
|
||||
// Notification handlers
|
||||
// ---------------------------------------------------------------------
|
||||
|
||||
async fn handle_cancelled_notification(
|
||||
&self,
|
||||
params: <mcp_types::CancelledNotification as mcp_types::ModelContextProtocolNotification>::Params,
|
||||
) {
|
||||
let request_id = params.request_id;
|
||||
// Create a stable string form early for logging and submission id.
|
||||
let request_id_string = match &request_id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(i) => i.to_string(),
|
||||
};
|
||||
|
||||
// Obtain the session_id while holding the first lock, then release.
|
||||
if let Some(orig) = {
|
||||
let mut tool_request_map = self.tool_request_map.lock().await;
|
||||
tool_request_map.remove(&request_id)
|
||||
} {
|
||||
self.handle_mcp_protocol_cancelled_notification(request_id, orig)
|
||||
.await;
|
||||
} else {
|
||||
self.handle_legacy_cancelled_notification(request_id).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_mcp_protocol_cancelled_notification(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
orig: ToolCallRequestParams,
|
||||
) {
|
||||
match orig {
|
||||
ToolCallRequestParams::ConversationStream(args) => {
|
||||
stream_conversation::handle_cancel(self, &args).await;
|
||||
}
|
||||
ToolCallRequestParams::ConversationSendMessage(args) => {
|
||||
// Cancel in-flight user input for this conversation by interrupting the session.
|
||||
|
||||
let session_id = args.conversation_id.0;
|
||||
let codex_arc = {
|
||||
let sessions_guard = self.conversation_map.lock().await;
|
||||
match sessions_guard.get(&session_id) {
|
||||
Some(conv) => conv.codex().clone(),
|
||||
None => {
|
||||
tracing::warn!(
|
||||
"Cancel send_message: session not found for session_id: {session_id}"
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = codex_arc.submit(codex_core::protocol::Op::Interrupt).await {
|
||||
tracing::error!("Failed to submit interrupt for send_message cancel: {e}");
|
||||
}
|
||||
}
|
||||
ToolCallRequestParams::ConversationCreate(_)
|
||||
| ToolCallRequestParams::ConversationsList(_) => {
|
||||
// Likely fast/non-streaming; nothing to cancel currently.
|
||||
tracing::debug!(
|
||||
"Cancel conversationsList received for request_id: {:?} (no-op)",
|
||||
request_id
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_legacy_cancelled_notification(&self, request_id: RequestId) {
|
||||
use crate::request_id::request_id_to_string;
|
||||
let request_id_string = request_id_to_string(&request_id);
|
||||
|
||||
let session_id = {
|
||||
let map_guard = self.running_requests_id_to_codex_uuid.lock().await;
|
||||
match map_guard.get(&request_id) {
|
||||
Some(id) => *id, // Uuid is Copy
|
||||
Some(id) => *id,
|
||||
None => {
|
||||
tracing::warn!("Session not found for request_id: {}", request_id_string);
|
||||
return;
|
||||
@@ -609,7 +667,6 @@ impl MessageProcessor {
|
||||
};
|
||||
tracing::info!("session_id: {session_id}");
|
||||
|
||||
// Obtain the Codex Arc while holding the session_map lock, then release.
|
||||
let codex_arc = {
|
||||
let sessions_guard = self.session_map.lock().await;
|
||||
match sessions_guard.get(&session_id) {
|
||||
@@ -621,18 +678,11 @@ impl MessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
// Submit interrupt to Codex.
|
||||
let err = codex_arc
|
||||
.submit_with_id(Submission {
|
||||
id: request_id_string,
|
||||
op: codex_core::protocol::Op::Interrupt,
|
||||
})
|
||||
.await;
|
||||
if let Err(e) = err {
|
||||
if let Err(e) = codex_arc.submit(codex_core::protocol::Op::Interrupt).await {
|
||||
tracing::error!("Failed to submit interrupt to Codex: {e}");
|
||||
return;
|
||||
}
|
||||
// unregister the id so we don't keep it in the map
|
||||
|
||||
self.running_requests_id_to_codex_uuid
|
||||
.lock()
|
||||
.await
|
||||
|
||||
@@ -109,7 +109,7 @@ impl OutgoingMessageSender {
|
||||
|
||||
// should be backwards compatible.
|
||||
// it will replace send_event_as_notification eventually.
|
||||
async fn send_event_as_notification_new_schema(
|
||||
pub(crate) async fn send_event_as_notification_new_schema(
|
||||
&self,
|
||||
event: &Event,
|
||||
params: Option<serde_json::Value>,
|
||||
@@ -124,6 +124,37 @@ impl OutgoingMessageSender {
|
||||
let outgoing_message = OutgoingMessage::Error(OutgoingError { id, error });
|
||||
let _ = self.sender.send(outgoing_message).await;
|
||||
}
|
||||
|
||||
/// Send a custom notification with an explicit method name and params object.
|
||||
pub(crate) async fn send_custom_notification(&self, method: &str, params: serde_json::Value) {
|
||||
let outgoing_message = OutgoingMessage::Notification(OutgoingNotification {
|
||||
method: method.to_string(),
|
||||
params: Some(params),
|
||||
});
|
||||
let _ = self.sender.send(outgoing_message).await;
|
||||
}
|
||||
|
||||
/// Send a typed server notification by serializing it into a method/params pair.
|
||||
pub(crate) async fn send_server_notification(
|
||||
&self,
|
||||
notification: crate::mcp_protocol::ServerNotification,
|
||||
) {
|
||||
match serde_json::to_value(notification) {
|
||||
Ok(serde_json::Value::Object(mut map)) => {
|
||||
let method = map
|
||||
.remove("method")
|
||||
.and_then(|v| v.as_str().map(|s| s.to_string()));
|
||||
let params = map.remove("params").unwrap_or(serde_json::Value::Null);
|
||||
if let Some(method) = method {
|
||||
self.send_custom_notification(&method, params).await;
|
||||
} else {
|
||||
warn!("ServerNotification missing method after serialization");
|
||||
}
|
||||
}
|
||||
Ok(_) => warn!("ServerNotification did not serialize to an object"),
|
||||
Err(err) => warn!("Failed to serialize ServerNotification: {err:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Outgoing message from the server to the client.
|
||||
|
||||
9
codex-rs/mcp-server/src/request_id.rs
Normal file
9
codex-rs/mcp-server/src/request_id.rs
Normal file
@@ -0,0 +1,9 @@
|
||||
use mcp_types::RequestId;
|
||||
|
||||
/// Utility to convert an MCP `RequestId` into a `String`.
|
||||
pub(crate) fn request_id_to_string(id: &RequestId) -> String {
|
||||
match id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(i) => i.to_string(),
|
||||
}
|
||||
}
|
||||
@@ -1,18 +1,14 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_core::Codex;
|
||||
use codex_core::codex_wrapper::init_codex;
|
||||
use codex_core::config::Config as CodexConfig;
|
||||
use codex_core::config::ConfigOverrides;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::SessionConfiguredEvent;
|
||||
use mcp_types::RequestId;
|
||||
use tokio::sync::Mutex;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::conversation_loop::run_conversation_loop;
|
||||
use crate::conversation_loop::Conversation;
|
||||
use crate::json_to_toml::json_to_toml;
|
||||
use crate::mcp_protocol::ConversationCreateArgs;
|
||||
use crate::mcp_protocol::ConversationCreateResult;
|
||||
@@ -121,24 +117,17 @@ pub(crate) async fn handle_create_conversation(
|
||||
let session_id = codex_conversation.session_id;
|
||||
let codex_arc = Arc::new(codex_conversation.codex);
|
||||
|
||||
// Store session for future calls
|
||||
insert_session(
|
||||
session_id,
|
||||
codex_arc.clone(),
|
||||
message_processor.session_map(),
|
||||
)
|
||||
.await;
|
||||
// Run the conversation loop in the background so this request can return immediately.
|
||||
// Construct conversation and start its loop, store it, then reply with id and model
|
||||
let outgoing = message_processor.outgoing();
|
||||
let spawn_id = id.clone();
|
||||
tokio::spawn(async move {
|
||||
run_conversation_loop(codex_arc.clone(), outgoing, spawn_id).await;
|
||||
});
|
||||
|
||||
// Reply with the new conversation id and effective model
|
||||
let conversation = Conversation::new(codex_arc.clone(), outgoing, id.clone(), session_id);
|
||||
let conv_map = message_processor.conversation_map();
|
||||
{
|
||||
let mut guard = conv_map.lock().await;
|
||||
guard.insert(session_id, conversation);
|
||||
}
|
||||
message_processor
|
||||
.send_response_with_optional_error(
|
||||
id,
|
||||
id.clone(),
|
||||
Some(ToolCallResponseResult::ConversationCreate(
|
||||
ConversationCreateResult::Ok {
|
||||
conversation_id: ConversationId(session_id),
|
||||
@@ -149,12 +138,3 @@ pub(crate) async fn handle_create_conversation(
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn insert_session(
|
||||
session_id: Uuid,
|
||||
codex: Arc<Codex>,
|
||||
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
|
||||
) {
|
||||
let mut guard = session_map.lock().await;
|
||||
guard.insert(session_id, codex);
|
||||
}
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
pub(crate) mod create_conversation;
|
||||
pub(crate) mod send_message;
|
||||
pub(crate) mod stream_conversation;
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_core::Codex;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::Submission;
|
||||
use mcp_types::RequestId;
|
||||
use tokio::sync::Mutex;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::conversation_loop::Conversation;
|
||||
use crate::mcp_protocol::ConversationSendMessageArgs;
|
||||
use crate::mcp_protocol::ConversationSendMessageResult;
|
||||
use crate::mcp_protocol::ToolCallResponseResult;
|
||||
@@ -41,7 +39,8 @@ pub(crate) async fn handle_send_message(
|
||||
}
|
||||
|
||||
let session_id = conversation_id.0;
|
||||
let Some(codex) = get_session(session_id, message_processor.session_map()).await else {
|
||||
let Some(conversation) = get_session(session_id, message_processor.conversation_map()).await
|
||||
else {
|
||||
message_processor
|
||||
.send_response_with_optional_error(
|
||||
id,
|
||||
@@ -56,46 +55,15 @@ pub(crate) async fn handle_send_message(
|
||||
return;
|
||||
};
|
||||
|
||||
let running = {
|
||||
let running_sessions = message_processor.running_session_ids();
|
||||
let mut running_sessions = running_sessions.lock().await;
|
||||
!running_sessions.insert(session_id)
|
||||
};
|
||||
let res = conversation.try_submit_user_input(id.clone(), items).await;
|
||||
|
||||
if running {
|
||||
if let Err(e) = res {
|
||||
message_processor
|
||||
.send_response_with_optional_error(
|
||||
id,
|
||||
Some(ToolCallResponseResult::ConversationSendMessage(
|
||||
ConversationSendMessageResult::Error {
|
||||
message: "Session is already running".to_string(),
|
||||
},
|
||||
)),
|
||||
Some(true),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
let request_id_string = match &id {
|
||||
RequestId::String(s) => s.clone(),
|
||||
RequestId::Integer(i) => i.to_string(),
|
||||
};
|
||||
|
||||
let submit_res = codex
|
||||
.submit_with_id(Submission {
|
||||
id: request_id_string,
|
||||
op: Op::UserInput { items },
|
||||
})
|
||||
.await;
|
||||
|
||||
if let Err(e) = submit_res {
|
||||
message_processor
|
||||
.send_response_with_optional_error(
|
||||
id,
|
||||
Some(ToolCallResponseResult::ConversationSendMessage(
|
||||
ConversationSendMessageResult::Error {
|
||||
message: format!("Failed to submit user input: {e}"),
|
||||
message: e.to_string(),
|
||||
},
|
||||
)),
|
||||
Some(true),
|
||||
@@ -117,8 +85,8 @@ pub(crate) async fn handle_send_message(
|
||||
|
||||
pub(crate) async fn get_session(
|
||||
session_id: Uuid,
|
||||
session_map: Arc<Mutex<HashMap<Uuid, Arc<Codex>>>>,
|
||||
) -> Option<Arc<Codex>> {
|
||||
let guard = session_map.lock().await;
|
||||
conversation_map: Arc<Mutex<HashMap<Uuid, Arc<Conversation>>>>,
|
||||
) -> Option<Arc<Conversation>> {
|
||||
let guard = conversation_map.lock().await;
|
||||
guard.get(&session_id).cloned()
|
||||
}
|
||||
|
||||
57
codex-rs/mcp-server/src/tool_handlers/stream_conversation.rs
Normal file
57
codex-rs/mcp-server/src/tool_handlers/stream_conversation.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
use mcp_types::RequestId;
|
||||
|
||||
use crate::mcp_protocol::ConversationStreamArgs;
|
||||
use crate::mcp_protocol::ConversationStreamResult;
|
||||
use crate::mcp_protocol::ToolCallResponseResult;
|
||||
use crate::message_processor::MessageProcessor;
|
||||
use crate::tool_handlers::send_message::get_session;
|
||||
|
||||
/// Handles the ConversationStream tool call: verifies the session and
|
||||
/// enables streaming for the session, replying with an OK result.
|
||||
pub(crate) async fn handle_stream_conversation(
|
||||
message_processor: &MessageProcessor,
|
||||
id: RequestId,
|
||||
arguments: ConversationStreamArgs,
|
||||
) {
|
||||
let ConversationStreamArgs { conversation_id } = arguments;
|
||||
|
||||
let session_id = conversation_id.0;
|
||||
|
||||
// Ensure the session exists
|
||||
let conv = get_session(session_id, message_processor.conversation_map()).await;
|
||||
|
||||
if conv.is_none() {
|
||||
// Return an error with no result payload per MCP error pattern
|
||||
message_processor
|
||||
.send_response_with_optional_error(id, None, Some(true))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
message_processor
|
||||
.send_response_with_optional_error(
|
||||
id,
|
||||
Some(ToolCallResponseResult::ConversationStream(
|
||||
ConversationStreamResult {},
|
||||
)),
|
||||
Some(false),
|
||||
)
|
||||
.await;
|
||||
|
||||
if let Some(conv) = conv {
|
||||
tokio::spawn(async move {
|
||||
conv.set_streaming(true).await;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles cancellation for ConversationStream by disabling streaming for the session.
|
||||
pub(crate) async fn handle_cancel(
|
||||
message_processor: &MessageProcessor,
|
||||
args: &ConversationStreamArgs,
|
||||
) {
|
||||
let session_id = args.conversation_id.0;
|
||||
if let Some(conv) = get_session(session_id, message_processor.conversation_map()).await {
|
||||
conv.set_streaming(false).await;
|
||||
}
|
||||
}
|
||||
26
codex-rs/mcp-server/tests/common/config.rs
Normal file
26
codex-rs/mcp-server/tests/common/config.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
use std::path::Path;
|
||||
|
||||
/// Write a minimal Codex config.toml pointing at the provided mock server URI.
|
||||
/// Used by tests that don't exercise approval/sandbox variations.
|
||||
pub fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "danger-full-access"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
@@ -1,7 +1,9 @@
|
||||
mod config;
|
||||
mod mcp_process;
|
||||
mod mock_model_server;
|
||||
mod responses;
|
||||
|
||||
pub use config::create_config_toml;
|
||||
pub use mcp_process::McpProcess;
|
||||
pub use mock_model_server::create_mock_chat_completions_server;
|
||||
pub use responses::create_apply_patch_sse_response;
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::path::Path;
|
||||
use std::process::Stdio;
|
||||
use std::sync::atomic::AtomicI64;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::time::Duration;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::io::BufReader;
|
||||
@@ -17,6 +18,7 @@ use codex_mcp_server::CodexToolCallReplyParam;
|
||||
use codex_mcp_server::mcp_protocol::ConversationCreateArgs;
|
||||
use codex_mcp_server::mcp_protocol::ConversationId;
|
||||
use codex_mcp_server::mcp_protocol::ConversationSendMessageArgs;
|
||||
use codex_mcp_server::mcp_protocol::ConversationStreamArgs;
|
||||
use codex_mcp_server::mcp_protocol::ToolCallRequestParams;
|
||||
|
||||
use mcp_types::CallToolRequestParams;
|
||||
@@ -201,6 +203,20 @@ impl McpProcess {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn send_conversation_stream_tool_call(
|
||||
&mut self,
|
||||
session_id: &str,
|
||||
) -> anyhow::Result<i64> {
|
||||
let params = ToolCallRequestParams::ConversationStream(ConversationStreamArgs {
|
||||
conversation_id: ConversationId(Uuid::parse_str(session_id)?),
|
||||
});
|
||||
self.send_request(
|
||||
mcp_types::CallToolRequest::METHOD,
|
||||
Some(serde_json::to_value(params)?),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn send_conversation_create_tool_call(
|
||||
&mut self,
|
||||
prompt: &str,
|
||||
@@ -236,6 +252,83 @@ impl McpProcess {
|
||||
.await
|
||||
}
|
||||
|
||||
/// Create a conversation and return its conversation_id as a string.
|
||||
pub async fn create_conversation_and_get_id(
|
||||
&mut self,
|
||||
prompt: &str,
|
||||
model: &str,
|
||||
cwd: &str,
|
||||
) -> anyhow::Result<String> {
|
||||
let req_id = self
|
||||
.send_conversation_create_tool_call(prompt, model, cwd)
|
||||
.await?;
|
||||
let resp = self
|
||||
.read_stream_until_response_message(RequestId::Integer(req_id))
|
||||
.await?;
|
||||
let conv_id = resp.result["structuredContent"]["conversation_id"]
|
||||
.as_str()
|
||||
.ok_or_else(|| anyhow::format_err!("missing conversation_id"))?
|
||||
.to_string();
|
||||
Ok(conv_id)
|
||||
}
|
||||
|
||||
/// Connect stream for a conversation and wait for the initial_state notification.
|
||||
/// Returns (requestId, params) where params are the initial_state notification params.
|
||||
pub async fn connect_stream_and_expect_initial_state(
|
||||
&mut self,
|
||||
session_id: &str,
|
||||
) -> anyhow::Result<(i64, serde_json::Value)> {
|
||||
let req_id = self.send_conversation_stream_tool_call(session_id).await?;
|
||||
// Wait for stream() tool-call response first
|
||||
let _ = self
|
||||
.read_stream_until_response_message(RequestId::Integer(req_id))
|
||||
.await?;
|
||||
// Then the initial_state notification
|
||||
let note = self
|
||||
.read_stream_until_notification_method("notifications/initial_state")
|
||||
.await?;
|
||||
let params = note
|
||||
.params
|
||||
.ok_or_else(|| anyhow::format_err!("initial_state must have params"))?;
|
||||
Ok((req_id, params))
|
||||
}
|
||||
|
||||
/// Wait for an agent_message with a bounded timeout. Returns Some(params) if received, None on timeout.
|
||||
pub async fn maybe_wait_for_agent_message(
|
||||
&mut self,
|
||||
dur: Duration,
|
||||
) -> anyhow::Result<Option<serde_json::Value>> {
|
||||
match tokio::time::timeout(dur, self.wait_for_agent_message()).await {
|
||||
Ok(Ok(v)) => Ok(Some(v)),
|
||||
Ok(Err(e)) => Err(e),
|
||||
Err(_elapsed) => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// Send a user message to a conversation and wait for the OK tool-call response.
|
||||
pub async fn send_user_message_and_wait_ok(
|
||||
&mut self,
|
||||
message: &str,
|
||||
session_id: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
let req_id = self
|
||||
.send_user_message_tool_call(message, session_id)
|
||||
.await?;
|
||||
let _ = self
|
||||
.read_stream_until_response_message(RequestId::Integer(req_id))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Wait until an agent_message notification arrives; returns its params.
|
||||
pub async fn wait_for_agent_message(&mut self) -> anyhow::Result<serde_json::Value> {
|
||||
let note = self
|
||||
.read_stream_until_notification_method("agent_message")
|
||||
.await?;
|
||||
note.params
|
||||
.ok_or_else(|| anyhow::format_err!("agent_message missing params"))
|
||||
}
|
||||
|
||||
async fn send_request(
|
||||
&mut self,
|
||||
method: &str,
|
||||
@@ -329,53 +422,51 @@ impl McpProcess {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn read_stream_until_notification_method(
|
||||
&mut self,
|
||||
method: &str,
|
||||
) -> anyhow::Result<JSONRPCNotification> {
|
||||
loop {
|
||||
let message = self.read_jsonrpc_message().await?;
|
||||
match message {
|
||||
JSONRPCMessage::Notification(n) => {
|
||||
if n.method == method {
|
||||
return Ok(n);
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Request(_) => {
|
||||
// ignore
|
||||
}
|
||||
JSONRPCMessage::Error(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Error: {message:?}");
|
||||
}
|
||||
JSONRPCMessage::Response(_) => {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn read_stream_until_configured_response_message(
|
||||
&mut self,
|
||||
) -> anyhow::Result<String> {
|
||||
let mut sid_old: Option<String> = None;
|
||||
let mut sid_new: Option<String> = None;
|
||||
loop {
|
||||
let message = self.read_jsonrpc_message().await?;
|
||||
eprint!("message: {message:?}");
|
||||
|
||||
match message {
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
if let Some(params) = notification.params {
|
||||
// Back-compat schema: method == "codex/event" and msg.type == "session_configured"
|
||||
if notification.method == "codex/event" {
|
||||
if let Some(msg) = params.get("msg") {
|
||||
if msg.get("type").and_then(|v| v.as_str())
|
||||
== Some("session_configured")
|
||||
{
|
||||
if let Some(session_id) =
|
||||
msg.get("session_id").and_then(|v| v.as_str())
|
||||
{
|
||||
sid_old = Some(session_id.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// New schema: method is the Display of EventMsg::SessionConfigured => "SessionConfigured"
|
||||
if notification.method == "session_configured" {
|
||||
if notification.method == "session_configured" {
|
||||
if let Some(params) = notification.params {
|
||||
if let Some(msg) = params.get("msg") {
|
||||
if let Some(session_id) =
|
||||
msg.get("session_id").and_then(|v| v.as_str())
|
||||
{
|
||||
sid_new = Some(session_id.to_string());
|
||||
return Ok(session_id.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if sid_old.is_some() && sid_new.is_some() {
|
||||
// Both seen, they must match
|
||||
assert_eq!(
|
||||
sid_old.as_ref().unwrap(),
|
||||
sid_new.as_ref().unwrap(),
|
||||
"session_id mismatch between old and new schema"
|
||||
);
|
||||
return Ok(sid_old.unwrap());
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Request(_) => {
|
||||
anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}");
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
#![allow(clippy::expect_used, clippy::unwrap_used)]
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use mcp_test_support::McpProcess;
|
||||
use mcp_test_support::create_config_toml;
|
||||
use mcp_test_support::create_final_assistant_message_sse_response;
|
||||
use mcp_test_support::create_mock_chat_completions_server;
|
||||
use mcp_types::JSONRPCResponse;
|
||||
@@ -103,26 +102,4 @@ async fn test_conversation_create_and_send_message_ok() {
|
||||
drop(server);
|
||||
}
|
||||
|
||||
// Helper to create a config.toml pointing at the mock model server.
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "danger-full-access"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
// create_config_toml is provided by tests/common
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
#![cfg(unix)]
|
||||
// Support code lives in the `mcp_test_support` crate under tests/common.
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use codex_mcp_server::CodexToolCallParam;
|
||||
use mcp_types::JSONRPCResponse;
|
||||
use mcp_types::ModelContextProtocolNotification;
|
||||
use mcp_types::RequestId;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
use mcp_test_support::McpProcess;
|
||||
use mcp_test_support::create_config_toml;
|
||||
use mcp_test_support::create_mock_chat_completions_server;
|
||||
use mcp_test_support::create_shell_sse_response;
|
||||
|
||||
@@ -66,7 +66,7 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
|
||||
|
||||
// Create Codex configuration
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), server.uri())?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
let mut mcp_process = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp_process.initialize()).await??;
|
||||
|
||||
@@ -95,7 +95,7 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
|
||||
// Send interrupt notification
|
||||
mcp_process
|
||||
.send_notification(
|
||||
"notifications/cancelled",
|
||||
mcp_types::CancelledNotification::METHOD,
|
||||
Some(json!({ "requestId": codex_request_id })),
|
||||
)
|
||||
.await?;
|
||||
@@ -126,7 +126,7 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
|
||||
// Send interrupt notification
|
||||
mcp_process
|
||||
.send_notification(
|
||||
"notifications/cancelled",
|
||||
mcp_types::CancelledNotification::METHOD,
|
||||
Some(json!({ "requestId": codex_reply_request_id })),
|
||||
)
|
||||
.await?;
|
||||
@@ -148,30 +148,3 @@ async fn shell_command_interruption() -> anyhow::Result<()> {
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn create_config_toml(codex_home: &Path, server_uri: String) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "danger-full-access"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
#![allow(clippy::expect_used)]
|
||||
|
||||
use std::path::Path;
|
||||
use std::thread::sleep;
|
||||
use std::time::Duration;
|
||||
|
||||
use codex_mcp_server::CodexToolCallParam;
|
||||
use mcp_test_support::McpProcess;
|
||||
use mcp_test_support::create_config_toml;
|
||||
use mcp_test_support::create_final_assistant_message_sse_response;
|
||||
use mcp_test_support::create_mock_chat_completions_server;
|
||||
use mcp_types::JSONRPC_VERSION;
|
||||
@@ -20,11 +19,9 @@ const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_send_message_success() {
|
||||
// Spin up a mock completions server that immediately ends the Codex turn.
|
||||
// Two Codex turns hit the mock model (session start + send-user-message). Provide two SSE responses.
|
||||
// Spin up a mock completions server that ends the Codex turn for the send-user-message call.
|
||||
let responses = vec![
|
||||
create_final_assistant_message_sse_response("Done").expect("build mock assistant message"),
|
||||
create_final_assistant_message_sse_response("Done").expect("build mock assistant message"),
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
|
||||
@@ -41,29 +38,11 @@ async fn test_send_message_success() {
|
||||
.expect("init timed out")
|
||||
.expect("init failed");
|
||||
|
||||
// Kick off a Codex session so we have a valid session_id.
|
||||
let codex_request_id = mcp_process
|
||||
.send_codex_tool_call(CodexToolCallParam {
|
||||
prompt: "Start a session".to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("send codex tool call");
|
||||
|
||||
// Wait for the session_configured event to get the session_id.
|
||||
// Create a conversation using the tool and get its conversation_id
|
||||
let session_id = mcp_process
|
||||
.read_stream_until_configured_response_message()
|
||||
.create_conversation_and_get_id("", "mock-model", "/repo")
|
||||
.await
|
||||
.expect("read session_configured");
|
||||
|
||||
// The original codex call will finish quickly given our mock; consume its response.
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp_process.read_stream_until_response_message(RequestId::Integer(codex_request_id)),
|
||||
)
|
||||
.await
|
||||
.expect("codex response timeout")
|
||||
.expect("codex response error");
|
||||
.expect("create conversation");
|
||||
|
||||
// Now exercise the send-user-message tool.
|
||||
let send_msg_request_id = mcp_process
|
||||
@@ -135,29 +114,4 @@ async fn test_send_message_session_not_found() {
|
||||
assert_eq!(result["isError"], json!(true));
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
|
||||
let config_toml = codex_home.join("config.toml");
|
||||
std::fs::write(
|
||||
config_toml,
|
||||
format!(
|
||||
r#"
|
||||
model = "mock-model"
|
||||
approval_policy = "never"
|
||||
sandbox_mode = "danger-full-access"
|
||||
|
||||
model_provider = "mock_provider"
|
||||
|
||||
[model_providers.mock_provider]
|
||||
name = "Mock provider for test"
|
||||
base_url = "{server_uri}/v1"
|
||||
wire_api = "chat"
|
||||
request_max_retries = 0
|
||||
stream_max_retries = 0
|
||||
"#
|
||||
),
|
||||
)
|
||||
}
|
||||
// Helpers are provided by tests/common
|
||||
|
||||
251
codex-rs/mcp-server/tests/stream_conversation.rs
Normal file
251
codex-rs/mcp-server/tests/stream_conversation.rs
Normal file
@@ -0,0 +1,251 @@
|
||||
#![allow(clippy::expect_used, clippy::unwrap_used)]
|
||||
|
||||
use mcp_test_support::McpProcess;
|
||||
use mcp_test_support::create_config_toml;
|
||||
use mcp_test_support::create_final_assistant_message_sse_response;
|
||||
use mcp_test_support::create_mock_chat_completions_server;
|
||||
use mcp_types::JSONRPCNotification;
|
||||
use mcp_types::ModelContextProtocolNotification;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::timeout;
|
||||
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(3);
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_connect_then_send_receives_initial_state_and_notifications() {
|
||||
let responses = vec![
|
||||
create_final_assistant_message_sse_response("Done").expect("build mock assistant message"),
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
|
||||
let codex_home = TempDir::new().expect("create temp dir");
|
||||
create_config_toml(codex_home.path(), &server.uri()).expect("write config.toml");
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path())
|
||||
.await
|
||||
.expect("spawn mcp process");
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
|
||||
.await
|
||||
.expect("init timeout")
|
||||
.expect("init failed");
|
||||
|
||||
// Create conversation
|
||||
let conv_id = mcp
|
||||
.create_conversation_and_get_id("", "o3", "/repo")
|
||||
.await
|
||||
.expect("create conversation");
|
||||
|
||||
// Connect the stream
|
||||
let (_stream_req, params) = mcp
|
||||
.connect_stream_and_expect_initial_state(&conv_id)
|
||||
.await
|
||||
.expect("initial_state params");
|
||||
let expected_params = json!({
|
||||
"_meta": {
|
||||
"conversationId": conv_id.as_str(),
|
||||
},
|
||||
"initial_state": {
|
||||
"events": []
|
||||
}
|
||||
});
|
||||
assert_eq!(params, expected_params);
|
||||
|
||||
// Send a message and expect a subsequent notification (non-initial_state)
|
||||
mcp.send_user_message_and_wait_ok("Hello there", &conv_id)
|
||||
.await
|
||||
.expect("send message ok");
|
||||
|
||||
// Read until we see an event notification (new schema example: agent_message)
|
||||
let params = mcp.wait_for_agent_message().await.expect("agent message");
|
||||
let expected_params = json!({
|
||||
"msg": {
|
||||
"type": "agent_message",
|
||||
"message": "Done"
|
||||
}
|
||||
});
|
||||
assert_eq!(params, expected_params);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_send_then_connect_receives_initial_state_with_message() {
|
||||
let responses = vec![
|
||||
create_final_assistant_message_sse_response("Done").expect("build mock assistant message"),
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
|
||||
let codex_home = TempDir::new().expect("create temp dir");
|
||||
create_config_toml(codex_home.path(), &server.uri()).expect("write config.toml");
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path())
|
||||
.await
|
||||
.expect("spawn mcp process");
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
|
||||
.await
|
||||
.expect("init timeout")
|
||||
.expect("init failed");
|
||||
|
||||
// Create conversation
|
||||
let conv_id = mcp
|
||||
.create_conversation_and_get_id("", "o3", "/repo")
|
||||
.await
|
||||
.expect("create conversation");
|
||||
|
||||
// Send a message BEFORE connecting stream
|
||||
mcp.send_user_message_and_wait_ok("Hello world", &conv_id)
|
||||
.await
|
||||
.expect("send message ok");
|
||||
|
||||
// Now connect stream and expect InitialState with the prior message included
|
||||
let (_stream_req, params) = mcp
|
||||
.connect_stream_and_expect_initial_state(&conv_id)
|
||||
.await
|
||||
.expect("initial_state params");
|
||||
let events = params["initial_state"]["events"]
|
||||
.as_array()
|
||||
.expect("events array");
|
||||
if !events.iter().any(|ev| {
|
||||
ev.get("msg")
|
||||
.and_then(|m| m.get("type"))
|
||||
.and_then(|t| t.as_str())
|
||||
== Some("agent_message")
|
||||
&& ev
|
||||
.get("msg")
|
||||
.and_then(|m| m.get("message"))
|
||||
.and_then(|t| t.as_str())
|
||||
== Some("Done")
|
||||
}) {
|
||||
// Fallback to live notification if not present in initial state
|
||||
let note: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_method("agent_message"),
|
||||
)
|
||||
.await
|
||||
.expect("event note timeout")
|
||||
.expect("event note err");
|
||||
let params = note.params.expect("params");
|
||||
let expected_params = json!({
|
||||
"msg": {
|
||||
"type": "agent_message",
|
||||
"message": "Done"
|
||||
}
|
||||
});
|
||||
assert_eq!(params, expected_params);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn test_cancel_stream_then_reconnect_catches_up_initial_state() {
|
||||
// One response is sufficient for the assertions in this test
|
||||
let responses = vec![
|
||||
create_final_assistant_message_sse_response("Done 1")
|
||||
.expect("build mock assistant message"),
|
||||
create_final_assistant_message_sse_response("Done 2")
|
||||
.expect("build mock assistant message"),
|
||||
];
|
||||
let server = create_mock_chat_completions_server(responses).await;
|
||||
|
||||
let codex_home = TempDir::new().expect("create temp dir");
|
||||
create_config_toml(codex_home.path(), &server.uri()).expect("write config.toml");
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path())
|
||||
.await
|
||||
.expect("spawn mcp process");
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize())
|
||||
.await
|
||||
.expect("init timeout")
|
||||
.expect("init failed");
|
||||
|
||||
// Create and connect stream A
|
||||
let conv_id = mcp
|
||||
.create_conversation_and_get_id("", "o3", "/repo")
|
||||
.await
|
||||
.expect("create");
|
||||
let (stream_a_id, _params) = mcp
|
||||
.connect_stream_and_expect_initial_state(&conv_id)
|
||||
.await
|
||||
.expect("stream A initial_state");
|
||||
|
||||
// Send M1 and ensure we get live agent_message
|
||||
mcp.send_user_message_and_wait_ok("Hello M1", &conv_id)
|
||||
.await
|
||||
.expect("send M1");
|
||||
let _params = mcp.wait_for_agent_message().await.expect("agent M1");
|
||||
|
||||
// Ensure the first task has fully completed before cancelling the stream
|
||||
// so that the session is no longer marked as running.
|
||||
let _ = mcp
|
||||
.read_stream_until_notification_method("task_complete")
|
||||
.await
|
||||
.expect("task complete");
|
||||
|
||||
// Cancel stream A
|
||||
mcp.send_notification(
|
||||
mcp_types::CancelledNotification::METHOD,
|
||||
Some(json!({ "requestId": stream_a_id })),
|
||||
)
|
||||
.await
|
||||
.expect("send cancelled");
|
||||
|
||||
// Send M2 while stream is cancelled; we should NOT get agent_message live
|
||||
mcp.send_user_message_and_wait_ok("Hello M2", &conv_id)
|
||||
.await
|
||||
.expect("send M2");
|
||||
let maybe = mcp
|
||||
.maybe_wait_for_agent_message(std::time::Duration::from_millis(300))
|
||||
.await
|
||||
.expect("maybe wait");
|
||||
assert!(
|
||||
maybe.is_none(),
|
||||
"should not get live agent_message after cancel"
|
||||
);
|
||||
|
||||
// Connect stream B and expect initial_state that includes the response
|
||||
let (_stream_req, params) = mcp
|
||||
.connect_stream_and_expect_initial_state(&conv_id)
|
||||
.await
|
||||
.expect("stream B initial_state");
|
||||
let events = params["initial_state"]["events"]
|
||||
.as_array()
|
||||
.expect("events array");
|
||||
let expected = vec![
|
||||
json!({
|
||||
"msg": {
|
||||
"type": "task_started",
|
||||
},
|
||||
}),
|
||||
json!({
|
||||
"msg": {
|
||||
"message": "Done 1",
|
||||
"type": "agent_message",
|
||||
},
|
||||
}),
|
||||
json!({
|
||||
"msg": {
|
||||
"last_agent_message": "Done 1",
|
||||
"type": "task_complete",
|
||||
},
|
||||
}),
|
||||
json!({
|
||||
"msg": {
|
||||
"type": "task_started",
|
||||
},
|
||||
}),
|
||||
json!({
|
||||
"msg": {
|
||||
"message": "Done 2",
|
||||
"type": "agent_message",
|
||||
},
|
||||
}),
|
||||
json!({
|
||||
"msg": {
|
||||
"last_agent_message": "Done 2",
|
||||
"type": "task_complete",
|
||||
},
|
||||
}),
|
||||
];
|
||||
assert_eq!(*events, expected);
|
||||
}
|
||||
|
||||
//
|
||||
@@ -138,6 +138,11 @@ impl BottomPane<'_> {
|
||||
view.handle_key_event(self, key_event);
|
||||
if !view.is_complete() {
|
||||
self.active_view = Some(view);
|
||||
} else if self.is_task_running {
|
||||
let mut v = StatusIndicatorView::new(self.app_event_tx.clone());
|
||||
v.update_text("waiting for model".to_string());
|
||||
self.active_view = Some(Box::new(v));
|
||||
self.status_view_active = true;
|
||||
}
|
||||
self.request_redraw();
|
||||
InputResult::None
|
||||
@@ -163,6 +168,12 @@ impl BottomPane<'_> {
|
||||
CancellationEvent::Handled => {
|
||||
if !view.is_complete() {
|
||||
self.active_view = Some(view);
|
||||
} else if self.is_task_running {
|
||||
// Modal aborted but task still running – restore status indicator.
|
||||
let mut v = StatusIndicatorView::new(self.app_event_tx.clone());
|
||||
v.update_text("waiting for model".to_string());
|
||||
self.active_view = Some(Box::new(v));
|
||||
self.status_view_active = true;
|
||||
}
|
||||
self.show_ctrl_c_quit_hint();
|
||||
}
|
||||
@@ -202,15 +213,20 @@ impl BottomPane<'_> {
|
||||
handled_by_view = true;
|
||||
}
|
||||
|
||||
// Fallback: if the current active view did not consume status updates,
|
||||
// present an overlay above the composer.
|
||||
if !handled_by_view {
|
||||
// Fallback: if the current active view did not consume status updates
|
||||
// and no modal view is active, present an overlay above the composer.
|
||||
// If a modal is active, do NOT render the overlay to avoid drawing
|
||||
// over the dialog.
|
||||
if !handled_by_view && self.active_view.is_none() {
|
||||
if self.live_status.is_none() {
|
||||
self.live_status = Some(StatusIndicatorWidget::new(self.app_event_tx.clone()));
|
||||
}
|
||||
if let Some(status) = &mut self.live_status {
|
||||
status.update_text(text);
|
||||
}
|
||||
} else if !handled_by_view {
|
||||
// Ensure any previous overlay is cleared when a modal becomes active.
|
||||
self.live_status = None;
|
||||
}
|
||||
self.request_redraw();
|
||||
}
|
||||
@@ -296,6 +312,8 @@ impl BottomPane<'_> {
|
||||
// Otherwise create a new approval modal overlay.
|
||||
let modal = ApprovalModalView::new(request, self.app_event_tx.clone());
|
||||
self.active_view = Some(Box::new(modal));
|
||||
// Hide any overlay status while a modal is visible.
|
||||
self.live_status = None;
|
||||
self.status_view_active = false;
|
||||
self.request_redraw()
|
||||
}
|
||||
@@ -368,16 +386,18 @@ impl WidgetRef for &BottomPane<'_> {
|
||||
y_offset = y_offset.saturating_add(1);
|
||||
}
|
||||
if let Some(status) = &self.live_status {
|
||||
let live_h = status.desired_height(area.width).min(area.height);
|
||||
let live_h = status
|
||||
.desired_height(area.width)
|
||||
.min(area.height.saturating_sub(y_offset));
|
||||
if live_h > 0 {
|
||||
let live_rect = Rect {
|
||||
x: area.x,
|
||||
y: area.y,
|
||||
y: area.y + y_offset,
|
||||
width: area.width,
|
||||
height: live_h,
|
||||
};
|
||||
status.render_ref(live_rect, buf);
|
||||
y_offset = live_h;
|
||||
y_offset = y_offset.saturating_add(live_h);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -540,6 +560,122 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn overlay_not_shown_above_approval_modal() {
|
||||
let (tx_raw, _rx) = channel::<AppEvent>();
|
||||
let tx = AppEventSender::new(tx_raw);
|
||||
let mut pane = BottomPane::new(BottomPaneParams {
|
||||
app_event_tx: tx,
|
||||
has_input_focus: true,
|
||||
enhanced_keys_supported: false,
|
||||
});
|
||||
|
||||
// Create an approval modal (active view).
|
||||
pane.push_approval_request(exec_request());
|
||||
// Attempt to update status; this should NOT create an overlay while modal is visible.
|
||||
pane.update_status_text("running command".to_string());
|
||||
|
||||
// Render and verify the top row does not include the Working header overlay.
|
||||
let area = Rect::new(0, 0, 60, 6);
|
||||
let mut buf = Buffer::empty(area);
|
||||
(&pane).render_ref(area, &mut buf);
|
||||
|
||||
let mut r0 = String::new();
|
||||
for x in 0..area.width {
|
||||
r0.push(buf[(x, 0)].symbol().chars().next().unwrap_or(' '));
|
||||
}
|
||||
assert!(
|
||||
!r0.contains("Working"),
|
||||
"overlay Working header should not render above modal"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn composer_not_shown_after_denied_if_task_running() {
|
||||
let (tx_raw, rx) = channel::<AppEvent>();
|
||||
let tx = AppEventSender::new(tx_raw);
|
||||
let mut pane = BottomPane::new(BottomPaneParams {
|
||||
app_event_tx: tx.clone(),
|
||||
has_input_focus: true,
|
||||
enhanced_keys_supported: false,
|
||||
});
|
||||
|
||||
// Start a running task so the status indicator replaces the composer.
|
||||
pane.set_task_running(true);
|
||||
pane.update_status_text("waiting for model".to_string());
|
||||
|
||||
// Push an approval modal (e.g., command approval) which should hide the status view.
|
||||
pane.push_approval_request(exec_request());
|
||||
|
||||
// Simulate pressing 'n' (deny) on the modal.
|
||||
use crossterm::event::KeyCode;
|
||||
use crossterm::event::KeyEvent;
|
||||
use crossterm::event::KeyModifiers;
|
||||
pane.handle_key_event(KeyEvent::new(KeyCode::Char('n'), KeyModifiers::NONE));
|
||||
|
||||
// After denial, since the task is still running, the status indicator
|
||||
// should be restored as the active view; the composer should NOT be visible.
|
||||
assert!(
|
||||
pane.status_view_active,
|
||||
"status view should be active after denial"
|
||||
);
|
||||
assert!(pane.active_view.is_some(), "active view should be present");
|
||||
|
||||
// Render and ensure the top row includes the Working header instead of the composer.
|
||||
// Give the animation thread a moment to tick.
|
||||
std::thread::sleep(std::time::Duration::from_millis(120));
|
||||
let area = Rect::new(0, 0, 40, 3);
|
||||
let mut buf = Buffer::empty(area);
|
||||
(&pane).render_ref(area, &mut buf);
|
||||
let mut row0 = String::new();
|
||||
for x in 0..area.width {
|
||||
row0.push(buf[(x, 0)].symbol().chars().next().unwrap_or(' '));
|
||||
}
|
||||
assert!(
|
||||
row0.contains("Working"),
|
||||
"expected Working header after denial: {row0:?}"
|
||||
);
|
||||
|
||||
// Drain the channel to avoid unused warnings.
|
||||
drop(rx);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn status_indicator_visible_during_command_execution() {
|
||||
let (tx_raw, _rx) = channel::<AppEvent>();
|
||||
let tx = AppEventSender::new(tx_raw);
|
||||
let mut pane = BottomPane::new(BottomPaneParams {
|
||||
app_event_tx: tx,
|
||||
has_input_focus: true,
|
||||
enhanced_keys_supported: false,
|
||||
});
|
||||
|
||||
// Begin a task: show initial status.
|
||||
pane.set_task_running(true);
|
||||
pane.update_status_text("waiting for model".to_string());
|
||||
|
||||
// As a long-running command begins (post-approval), ensure the status
|
||||
// indicator is visible while we wait for the command to run.
|
||||
pane.update_status_text("running command".to_string());
|
||||
|
||||
// Allow some frames so the animation thread ticks.
|
||||
std::thread::sleep(std::time::Duration::from_millis(120));
|
||||
|
||||
// Render and confirm the line contains the "Working" header.
|
||||
let area = Rect::new(0, 0, 40, 3);
|
||||
let mut buf = Buffer::empty(area);
|
||||
(&pane).render_ref(area, &mut buf);
|
||||
|
||||
let mut row0 = String::new();
|
||||
for x in 0..area.width {
|
||||
row0.push(buf[(x, 0)].symbol().chars().next().unwrap_or(' '));
|
||||
}
|
||||
assert!(
|
||||
row0.contains("Working"),
|
||||
"expected Working header: {row0:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn bottom_padding_present_for_status_view() {
|
||||
let (tx_raw, _rx) = channel::<AppEvent>();
|
||||
|
||||
@@ -9,6 +9,8 @@ use codex_core::protocol::AgentMessageDeltaEvent;
|
||||
use codex_core::protocol::AgentMessageEvent;
|
||||
use codex_core::protocol::AgentReasoningDeltaEvent;
|
||||
use codex_core::protocol::AgentReasoningEvent;
|
||||
use codex_core::protocol::AgentReasoningRawContentDeltaEvent;
|
||||
use codex_core::protocol::AgentReasoningRawContentEvent;
|
||||
use codex_core::protocol::ApplyPatchApprovalRequestEvent;
|
||||
use codex_core::protocol::ErrorEvent;
|
||||
use codex_core::protocol::Event;
|
||||
@@ -61,6 +63,7 @@ pub(crate) struct ChatWidget<'a> {
|
||||
initial_user_message: Option<UserMessage>,
|
||||
token_usage: TokenUsage,
|
||||
reasoning_buffer: String,
|
||||
content_buffer: String,
|
||||
// Buffer for streaming assistant answer text; we do not surface partial
|
||||
// We wait for the final AgentMessage event and then emit the full text
|
||||
// at once into scrollback so the history contains a single message.
|
||||
@@ -101,6 +104,24 @@ fn create_initial_user_message(text: String, image_paths: Vec<PathBuf>) -> Optio
|
||||
}
|
||||
|
||||
impl ChatWidget<'_> {
|
||||
fn emit_stream_header(&mut self, kind: StreamKind) {
|
||||
use ratatui::text::Line as RLine;
|
||||
if self.stream_header_emitted {
|
||||
return;
|
||||
}
|
||||
let header = match kind {
|
||||
StreamKind::Reasoning => RLine::from("thinking".magenta().italic()),
|
||||
StreamKind::Answer => RLine::from("codex".magenta().bold()),
|
||||
};
|
||||
self.app_event_tx
|
||||
.send(AppEvent::InsertHistory(vec![header]));
|
||||
self.stream_header_emitted = true;
|
||||
}
|
||||
fn finalize_active_stream(&mut self) {
|
||||
if let Some(kind) = self.current_stream {
|
||||
self.finalize_stream(kind);
|
||||
}
|
||||
}
|
||||
pub(crate) fn new(
|
||||
config: Config,
|
||||
app_event_tx: AppEventSender,
|
||||
@@ -161,6 +182,7 @@ impl ChatWidget<'_> {
|
||||
),
|
||||
token_usage: TokenUsage::default(),
|
||||
reasoning_buffer: String::new(),
|
||||
content_buffer: String::new(),
|
||||
answer_buffer: String::new(),
|
||||
running_commands: HashMap::new(),
|
||||
live_builder: RowBuilder::new(80),
|
||||
@@ -276,6 +298,20 @@ impl ChatWidget<'_> {
|
||||
self.finalize_stream(StreamKind::Reasoning);
|
||||
self.request_redraw();
|
||||
}
|
||||
EventMsg::AgentReasoningRawContentDelta(AgentReasoningRawContentDeltaEvent {
|
||||
delta,
|
||||
}) => {
|
||||
// Treat raw reasoning content the same as summarized reasoning for UI flow.
|
||||
self.begin_stream(StreamKind::Reasoning);
|
||||
self.reasoning_buffer.push_str(&delta);
|
||||
self.stream_push_and_maybe_commit(&delta);
|
||||
self.request_redraw();
|
||||
}
|
||||
EventMsg::AgentReasoningRawContent(AgentReasoningRawContentEvent { text: _ }) => {
|
||||
// Finalize the raw reasoning stream just like the summarized reasoning event.
|
||||
self.finalize_stream(StreamKind::Reasoning);
|
||||
self.request_redraw();
|
||||
}
|
||||
EventMsg::TaskStarted => {
|
||||
self.bottom_pane.clear_ctrl_c_quit_hint();
|
||||
self.bottom_pane.set_task_running(true);
|
||||
@@ -299,6 +335,14 @@ impl ChatWidget<'_> {
|
||||
EventMsg::Error(ErrorEvent { message }) => {
|
||||
self.add_to_history(HistoryCell::new_error_event(message.clone()));
|
||||
self.bottom_pane.set_task_running(false);
|
||||
self.bottom_pane.clear_live_ring();
|
||||
self.live_builder = RowBuilder::new(self.live_builder.width());
|
||||
self.current_stream = None;
|
||||
self.stream_header_emitted = false;
|
||||
self.answer_buffer.clear();
|
||||
self.reasoning_buffer.clear();
|
||||
self.content_buffer.clear();
|
||||
self.request_redraw();
|
||||
}
|
||||
EventMsg::PlanUpdate(update) => {
|
||||
// Commit plan updates directly to history (no status-line preview).
|
||||
@@ -310,6 +354,7 @@ impl ChatWidget<'_> {
|
||||
cwd,
|
||||
reason,
|
||||
}) => {
|
||||
self.finalize_active_stream();
|
||||
// Log a background summary immediately so the history is chronological.
|
||||
let cmdline = strip_bash_lc_and_escape(&command);
|
||||
let text = format!(
|
||||
@@ -336,6 +381,7 @@ impl ChatWidget<'_> {
|
||||
reason,
|
||||
grant_root,
|
||||
}) => {
|
||||
self.finalize_active_stream();
|
||||
// ------------------------------------------------------------------
|
||||
// Before we even prompt the user for approval we surface the patch
|
||||
// summary in the main conversation so that the dialog appears in a
|
||||
@@ -365,6 +411,10 @@ impl ChatWidget<'_> {
|
||||
command,
|
||||
cwd,
|
||||
}) => {
|
||||
self.finalize_active_stream();
|
||||
// Ensure the status indicator is visible while the command runs.
|
||||
self.bottom_pane
|
||||
.update_status_text("running command".to_string());
|
||||
self.running_commands.insert(
|
||||
call_id,
|
||||
RunningCommand {
|
||||
@@ -408,6 +458,7 @@ impl ChatWidget<'_> {
|
||||
call_id: _,
|
||||
invocation,
|
||||
}) => {
|
||||
self.finalize_active_stream();
|
||||
self.add_to_history(HistoryCell::new_active_mcp_tool_call(invocation));
|
||||
}
|
||||
EventMsg::McpToolCallEnd(McpToolCallEndEvent {
|
||||
@@ -451,7 +502,9 @@ impl ChatWidget<'_> {
|
||||
|
||||
/// Update the live log preview while a task is running.
|
||||
pub(crate) fn update_latest_log(&mut self, line: String) {
|
||||
self.bottom_pane.update_status_text(line);
|
||||
if self.bottom_pane.is_task_running() {
|
||||
self.bottom_pane.update_status_text(line);
|
||||
}
|
||||
}
|
||||
|
||||
fn request_redraw(&mut self) {
|
||||
@@ -478,8 +531,15 @@ impl ChatWidget<'_> {
|
||||
if self.bottom_pane.is_task_running() {
|
||||
self.bottom_pane.clear_ctrl_c_quit_hint();
|
||||
self.submit_op(Op::Interrupt);
|
||||
self.bottom_pane.set_task_running(false);
|
||||
self.bottom_pane.clear_live_ring();
|
||||
self.live_builder = RowBuilder::new(self.live_builder.width());
|
||||
self.current_stream = None;
|
||||
self.stream_header_emitted = false;
|
||||
self.answer_buffer.clear();
|
||||
self.reasoning_buffer.clear();
|
||||
self.content_buffer.clear();
|
||||
self.request_redraw();
|
||||
CancellationEvent::Ignored
|
||||
} else if self.bottom_pane.ctrl_c_quit_hint_visible() {
|
||||
self.submit_op(Op::Shutdown);
|
||||
@@ -518,6 +578,12 @@ impl ChatWidget<'_> {
|
||||
|
||||
impl ChatWidget<'_> {
|
||||
fn begin_stream(&mut self, kind: StreamKind) {
|
||||
if let Some(current) = self.current_stream {
|
||||
if current != kind {
|
||||
self.finalize_stream(current);
|
||||
}
|
||||
}
|
||||
|
||||
if self.current_stream != Some(kind) {
|
||||
self.current_stream = Some(kind);
|
||||
self.stream_header_emitted = false;
|
||||
@@ -526,6 +592,7 @@ impl ChatWidget<'_> {
|
||||
// Ensure the waiting status is visible (composer replaced).
|
||||
self.bottom_pane
|
||||
.update_status_text("waiting for model".to_string());
|
||||
self.emit_stream_header(kind);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@ use codex_common::elapsed::format_duration;
|
||||
use codex_common::summarize_sandbox_policy;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::model_supports_reasoning_summaries;
|
||||
use codex_core::plan_tool::PlanItemArg;
|
||||
use codex_core::plan_tool::StepStatus;
|
||||
use codex_core::plan_tool::UpdatePlanArgs;
|
||||
@@ -177,7 +176,7 @@ impl HistoryCell {
|
||||
("sandbox", summarize_sandbox_policy(&config.sandbox_policy)),
|
||||
];
|
||||
if config.model_provider.wire_api == WireApi::Responses
|
||||
&& model_supports_reasoning_summaries(config)
|
||||
&& config.model_family.supports_reasoning_summaries
|
||||
{
|
||||
entries.push((
|
||||
"reasoning effort",
|
||||
|
||||
Reference in New Issue
Block a user