feat: add threadId to MCP server messages (#9192)

This favors `threadId` instead of `conversationId` so we use the same
terms as https://developers.openai.com/codex/sdk/.

To test the local build:

```
cd codex-rs
cargo build --bin codex
npx -y @modelcontextprotocol/inspector ./target/debug/codex mcp-server
```

I sent:

```json
{
  "method": "tools/call",
  "params": {
    "name": "codex",
    "arguments": {
      "prompt": "favorite ls option?"
    },
    "_meta": {
      "progressToken": 0
    }
  }
}
```

and got:

```json
{
  "content": [
    {
      "type": "text",
      "text": "`ls -lah` (or `ls -alh`) — long listing, includes dotfiles, human-readable sizes."
    }
  ],
  "structuredContent": {
    "threadId": "019bbb20-bff6-7130-83aa-bf45ab33250e"
  }
}
```

and successfully used the `threadId` in the follow-up with the
`codex-reply` tool call:

```json
{
  "method": "tools/call",
  "params": {
    "name": "codex-reply",
    "arguments": {
      "prompt": "what is the long versoin",
      "threadId": "019bbb20-bff6-7130-83aa-bf45ab33250e"
    },
    "_meta": {
      "progressToken": 1
    }
  }
}
```

whose response also has the `threadId`:

```json
{
  "content": [
    {
      "type": "text",
      "text": "Long listing is `ls -l` (adds permissions, owner/group, size, timestamp)."
    }
  ],
  "structuredContent": {
    "threadId": "019bbb20-bff6-7130-83aa-bf45ab33250e"
  }
}
```

Fixes https://github.com/openai/codex/issues/3712.
This commit is contained in:
Michael Bolin
2026-01-13 22:14:41 -08:00
committed by GitHub
parent 5675af5190
commit 0c09dc3c03
7 changed files with 270 additions and 90 deletions

View File

@@ -32,6 +32,26 @@ use tokio::sync::Mutex;
pub(crate) const INVALID_PARAMS_ERROR_CODE: i64 = -32602;
/// To adhere to MCP `tools/call` response format, include the Codex
/// `threadId` in the `structured_content` field of the response.
fn create_call_tool_result_with_thread_id(
thread_id: ThreadId,
text: String,
is_error: Option<bool>,
) -> CallToolResult {
CallToolResult {
content: vec![ContentBlock::TextContent(TextContent {
r#type: "text".to_string(),
text,
annotations: None,
})],
is_error,
structured_content: Some(json!({
"threadId": thread_id,
})),
}
}
/// Run a complete Codex session and stream events back to the client.
///
/// On completion (success or error) the function sends the appropriate
@@ -73,7 +93,10 @@ pub async fn run_codex_tool_session(
outgoing
.send_event_as_notification(
&session_configured_event,
Some(OutgoingNotificationMeta::new(Some(id.clone()))),
Some(OutgoingNotificationMeta {
request_id: Some(id.clone()),
thread_id: Some(thread_id),
}),
)
.await;
@@ -100,27 +123,40 @@ pub async fn run_codex_tool_session(
if let Err(e) = thread.submit_with_id(submission).await {
tracing::error!("Failed to submit initial prompt: {e}");
let result = create_call_tool_result_with_thread_id(
thread_id,
format!("Failed to submit initial prompt: {e}"),
Some(true),
);
outgoing.send_response(id.clone(), result).await;
// unregister the id so we don't keep it in the map
running_requests_id_to_codex_uuid.lock().await.remove(&id);
return;
}
run_codex_tool_session_inner(thread, outgoing, id, running_requests_id_to_codex_uuid).await;
run_codex_tool_session_inner(
thread_id,
thread,
outgoing,
id,
running_requests_id_to_codex_uuid,
)
.await;
}
pub async fn run_codex_tool_session_reply(
conversation: Arc<CodexThread>,
thread_id: ThreadId,
thread: Arc<CodexThread>,
outgoing: Arc<OutgoingMessageSender>,
request_id: RequestId,
prompt: String,
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, ThreadId>>>,
conversation_id: ThreadId,
) {
running_requests_id_to_codex_uuid
.lock()
.await
.insert(request_id.clone(), conversation_id);
if let Err(e) = conversation
.insert(request_id.clone(), thread_id);
if let Err(e) = thread
.submit(Op::UserInput {
items: vec![UserInput::Text { text: prompt }],
final_output_json_schema: None,
@@ -128,6 +164,12 @@ pub async fn run_codex_tool_session_reply(
.await
{
tracing::error!("Failed to submit user input: {e}");
let result = create_call_tool_result_with_thread_id(
thread_id,
format!("Failed to submit user input: {e}"),
Some(true),
);
outgoing.send_response(request_id.clone(), result).await;
// unregister the id so we don't keep it in the map
running_requests_id_to_codex_uuid
.lock()
@@ -137,7 +179,8 @@ pub async fn run_codex_tool_session_reply(
}
run_codex_tool_session_inner(
conversation,
thread_id,
thread,
outgoing,
request_id,
running_requests_id_to_codex_uuid,
@@ -146,7 +189,8 @@ pub async fn run_codex_tool_session_reply(
}
async fn run_codex_tool_session_inner(
codex: Arc<CodexThread>,
thread_id: ThreadId,
thread: Arc<CodexThread>,
outgoing: Arc<OutgoingMessageSender>,
request_id: RequestId,
running_requests_id_to_codex_uuid: Arc<Mutex<HashMap<RequestId, ThreadId>>>,
@@ -159,12 +203,15 @@ async fn run_codex_tool_session_inner(
// Stream events until the task needs to pause for user interaction or
// completes.
loop {
match codex.next_event().await {
match thread.next_event().await {
Ok(event) => {
outgoing
.send_event_as_notification(
&event,
Some(OutgoingNotificationMeta::new(Some(request_id.clone()))),
Some(OutgoingNotificationMeta {
request_id: Some(request_id.clone()),
thread_id: Some(thread_id),
}),
)
.await;
@@ -182,21 +229,24 @@ async fn run_codex_tool_session_inner(
command,
cwd,
outgoing.clone(),
codex.clone(),
thread.clone(),
request_id.clone(),
request_id_str.clone(),
event.id.clone(),
call_id,
parsed_cmd,
thread_id,
)
.await;
continue;
}
EventMsg::Error(err_event) => {
// Return a response to conclude the tool call when the Codex session reports an error (e.g., interruption).
let result = json!({
"error": err_event.message,
});
// Always respond in tools/call's expected shape, and include conversationId so the client can resume.
let result = create_call_tool_result_with_thread_id(
thread_id,
err_event.message,
Some(true),
);
outgoing.send_response(request_id.clone(), result).await;
break;
}
@@ -220,10 +270,11 @@ async fn run_codex_tool_session_inner(
grant_root,
changes,
outgoing.clone(),
codex.clone(),
thread.clone(),
request_id.clone(),
request_id_str.clone(),
event.id.clone(),
thread_id,
)
.await;
continue;
@@ -233,15 +284,7 @@ async fn run_codex_tool_session_inner(
Some(msg) => msg,
None => "".to_string(),
};
let result = CallToolResult {
content: vec![ContentBlock::TextContent(TextContent {
r#type: "text".to_string(),
text,
annotations: None,
})],
is_error: None,
structured_content: None,
};
let result = create_call_tool_result_with_thread_id(thread_id, text, None);
outgoing.send_response(request_id.clone(), result).await;
// unregister the id so we don't keep it in the map
running_requests_id_to_codex_uuid
@@ -317,20 +360,32 @@ async fn run_codex_tool_session_inner(
}
}
Err(e) => {
let result = CallToolResult {
content: vec![ContentBlock::TextContent(TextContent {
r#type: "text".to_string(),
text: format!("Codex runtime error: {e}"),
annotations: None,
})],
is_error: Some(true),
// TODO(mbolin): Could present the error in a more
// structured way.
structured_content: None,
};
let result = create_call_tool_result_with_thread_id(
thread_id,
format!("Codex runtime error: {e}"),
Some(true),
);
outgoing.send_response(request_id.clone(), result).await;
break;
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn call_tool_result_includes_thread_id_in_structured_content() {
let thread_id = ThreadId::new();
let result = create_call_tool_result_with_thread_id(thread_id, "done".to_string(), None);
assert_eq!(
result.structured_content,
Some(json!({
"threadId": thread_id,
}))
);
}
}