Compare commits

...

1 Commits

Author SHA1 Message Date
Eric Horacek
4b1bc6e4ec app-server: support caller-supplied thread and turn IDs
Add optional threadId/turnId fields to the v2 app-server API and document them as UUIDs.

Validate caller-supplied IDs in the app server, reject duplicates, and route custom turn IDs through submit_with_id while surfacing invalid-request errors correctly. Canonicalize caller-supplied turn IDs to hyphenated UUID strings so case variants cannot bypass duplicate checks.

Plumb requested thread IDs through core thread spawning and add duplicate-ID guards in core (session-level duplicate submission IDs and ThreadManager thread-ID collision checks).

Cache known turn IDs per loaded thread so rollout history is only parsed once when validating custom turn IDs, and track active turn IDs without cloning full turn state on each streamed event.

Update docs/schema fixtures and add app-server tests for caller-supplied IDs, UUID validation, and duplicate-turn rejection.
2026-02-20 13:39:49 -08:00
16 changed files with 575 additions and 23 deletions

View File

@@ -2973,6 +2973,13 @@
"type": "null"
}
]
},
"threadId": {
"description": "Optional caller-supplied thread ID (UUID). If omitted, the server generates a new thread ID.",
"type": [
"string",
"null"
]
}
},
"type": "object"
@@ -3085,6 +3092,13 @@
},
"threadId": {
"type": "string"
},
"turnId": {
"description": "Optional caller-supplied turn ID (UUID). If omitted, the server generates a new turn ID.",
"type": [
"string",
"null"
]
}
},
"required": [

View File

@@ -16625,6 +16625,13 @@
"type": "null"
}
]
},
"threadId": {
"description": "Optional caller-supplied thread ID (UUID). If omitted, the server generates a new thread ID.",
"type": [
"string",
"null"
]
}
},
"title": "ThreadStartParams",
@@ -17204,6 +17211,13 @@
},
"threadId": {
"type": "string"
},
"turnId": {
"description": "Optional caller-supplied turn ID (UUID). If omitted, the server generates a new turn ID.",
"type": [
"string",
"null"
]
}
},
"required": [

View File

@@ -150,6 +150,13 @@
"type": "null"
}
]
},
"threadId": {
"description": "Optional caller-supplied thread ID (UUID). If omitted, the server generates a new thread ID.",
"type": [
"string",
"null"
]
}
},
"title": "ThreadStartParams",

View File

@@ -552,6 +552,13 @@
},
"threadId": {
"type": "string"
},
"turnId": {
"description": "Optional caller-supplied turn ID (UUID). If omitted, the server generates a new turn ID.",
"type": [
"string",
"null"
]
}
},
"required": [

View File

@@ -6,7 +6,11 @@ import type { JsonValue } from "../serde_json/JsonValue";
import type { AskForApproval } from "./AskForApproval";
import type { SandboxMode } from "./SandboxMode";
export type ThreadStartParams = {model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, ephemeral?: boolean | null, /**
export type ThreadStartParams = {/**
* Optional caller-supplied thread ID (UUID). If omitted, the server
* generates a new thread ID.
*/
threadId?: string | null, model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, ephemeral?: boolean | null, /**
* If true, opt into emitting raw Responses API items on the event stream.
* This is for internal use only (e.g. Codex Cloud).
*/

View File

@@ -10,7 +10,11 @@ import type { AskForApproval } from "./AskForApproval";
import type { SandboxPolicy } from "./SandboxPolicy";
import type { UserInput } from "./UserInput";
export type TurnStartParams = {threadId: string, input: Array<UserInput>, /**
export type TurnStartParams = {threadId: string, /**
* Optional caller-supplied turn ID (UUID). If omitted, the server
* generates a new turn ID.
*/
turnId?: string | null, input: Array<UserInput>, /**
* Override the working directory for this turn and subsequent turns.
*/
cwd?: string | null, /**

View File

@@ -100,6 +100,13 @@ impl ThreadHistoryBuilder {
.or_else(|| self.turns.last().cloned())
}
pub fn active_turn_id(&self) -> Option<&str> {
self.current_turn
.as_ref()
.map(|turn| turn.id.as_str())
.or_else(|| self.turns.last().map(|turn| turn.id.as_str()))
}
pub fn has_active_turn(&self) -> bool {
self.current_turn.is_some()
}

View File

@@ -1483,6 +1483,10 @@ pub struct CommandExecResponse {
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadStartParams {
/// Optional caller-supplied thread ID (UUID). If omitted, the server
/// generates a new thread ID.
#[ts(optional = nullable)]
pub thread_id: Option<String>,
#[ts(optional = nullable)]
pub model: Option<String>,
#[ts(optional = nullable)]
@@ -2339,6 +2343,10 @@ pub enum TurnStatus {
#[ts(export_to = "v2/")]
pub struct TurnStartParams {
pub thread_id: String,
/// Optional caller-supplied turn ID (UUID). If omitted, the server
/// generates a new turn ID.
#[ts(optional = nullable)]
pub turn_id: Option<String>,
pub input: Vec<UserInput>,
/// Override the working directory for this turn and subsequent turns.
#[ts(optional = nullable)]

View File

@@ -114,7 +114,7 @@ Example with notification opt-out:
## API Overview
- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
- `thread/start` — create a new thread; optionally supply `threadId` (UUID) to pre-assign the thread id. Emits `thread/started` and auto-subscribes you to turn/item events for that thread.
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, and `cwd` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
@@ -127,7 +127,7 @@ Example with notification opt-out:
- `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications.
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `turn/start` — add user input to a thread and begin Codex generation; optionally supply `turnId` (UUID) to pre-assign the turn id. Responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
- `turn/steer` — add user input to an already in-flight turn without starting a new turn; returns the active `turnId` that accepted the input.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `review/start` — kick off Codexs automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.
@@ -158,6 +158,8 @@ Start a fresh thread when you need a new Codex conversation.
```json
{ "method": "thread/start", "id": 10, "params": {
// Optionally pre-assign the thread id (must be a UUID string).
"threadId": "018f2b7a-5d5d-7c39-9d6b-9f3f88d4f1c2",
// Optionally set config settings. If not specified, will use the user's
// current config settings.
"model": "gpt-5.1-codex",
@@ -339,6 +341,8 @@ You can optionally specify config overrides on the new turn. If specified, these
```json
{ "method": "turn/start", "id": 30, "params": {
"threadId": "thr_123",
// Optionally pre-assign the turn id (must be a UUID string).
"turnId": "018f2b7a-5d5d-7c39-9d6b-9f3f88d4f1c3",
"input": [ { "type": "text", "text": "Run tests" } ],
// Below are optional config overrides
"cwd": "/Users/me/project",

View File

@@ -213,6 +213,7 @@ use codex_core::protocol::ReviewDelivery as CoreReviewDelivery;
use codex_core::protocol::ReviewRequest;
use codex_core::protocol::ReviewTarget as CoreReviewTarget;
use codex_core::protocol::SessionConfiguredEvent;
use codex_core::protocol::Submission;
use codex_core::read_head_for_summary;
use codex_core::read_session_meta_line;
use codex_core::rollout_date_parts;
@@ -390,6 +391,70 @@ impl CodexMessageProcessor {
Ok((thread_id, thread))
}
async fn validate_requested_turn_id(
&mut self,
thread_uuid: ThreadId,
thread: &Arc<CodexThread>,
turn_id: &str,
) -> Result<(), JSONRPCErrorError> {
let thread_state = self.thread_state_manager.thread_state(thread_uuid);
{
let state = thread_state.lock().await;
if state.active_turn_id().is_some_and(|id| id == turn_id)
|| state.has_known_turn_id(turn_id)
{
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("turnId already exists: {turn_id}"),
data: None,
});
}
if state.known_turn_ids_seeded() {
return Ok(());
}
}
let historical_turn_ids = if let Some(rollout_path) = thread.rollout_path()
&& rollout_path.exists()
{
read_rollout_items_from_rollout(rollout_path.as_path())
.await
.map(|items| build_turns_from_rollout_items(&items))
.map(|turns| turns.into_iter().map(|turn| turn.id).collect::<Vec<_>>())
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to validate turnId {turn_id} against rollout history: {err}"
),
data: None,
})?
} else {
Vec::new()
};
let mut state = thread_state.lock().await;
if state.active_turn_id().is_some_and(|id| id == turn_id) {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("turnId already exists: {turn_id}"),
data: None,
});
}
if !state.known_turn_ids_seeded() {
state.seed_known_turn_ids(historical_turn_ids);
}
if state.has_known_turn_id(turn_id) {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("turnId already exists: {turn_id}"),
data: None,
});
}
Ok(())
}
pub fn new(args: CodexMessageProcessorArgs) -> Self {
let CodexMessageProcessorArgs {
auth_manager,
@@ -1940,6 +2005,7 @@ impl CodexMessageProcessor {
async fn thread_start(&mut self, request_id: ConnectionRequestId, params: ThreadStartParams) {
let ThreadStartParams {
thread_id,
model,
model_provider,
cwd,
@@ -1955,6 +2021,17 @@ impl CodexMessageProcessor {
ephemeral,
persist_extended_history,
} = params;
let requested_thread_id = match thread_id {
Some(thread_id) => match ThreadId::from_string(&thread_id) {
Ok(thread_id) => Some(thread_id),
Err(err) => {
self.send_invalid_request_error(request_id, format!("invalid threadId: {err}"))
.await;
return;
}
},
None => None,
};
let mut typesafe_overrides = self.build_thread_config_overrides(
model,
model_provider,
@@ -1988,6 +2065,61 @@ impl CodexMessageProcessor {
}
};
if let Some(thread_id) = requested_thread_id {
let thread_id_str = thread_id.to_string();
if self.thread_manager.get_thread(thread_id).await.is_ok() {
self.send_invalid_request_error(
request_id,
format!("threadId already exists: {thread_id_str}"),
)
.await;
return;
}
let existing_thread_path =
match find_thread_path_by_id_str(&config.codex_home, &thread_id_str).await {
Ok(path) => path,
Err(err) => {
self.send_internal_error(
request_id,
format!("failed to validate threadId {thread_id_str}: {err}"),
)
.await;
return;
}
};
if existing_thread_path.is_some() {
self.send_invalid_request_error(
request_id,
format!("threadId already exists: {thread_id_str}"),
)
.await;
return;
}
let existing_archived_thread_path =
match find_archived_thread_path_by_id_str(&config.codex_home, &thread_id_str).await
{
Ok(path) => path,
Err(err) => {
self.send_internal_error(
request_id,
format!("failed to validate threadId {thread_id_str}: {err}"),
)
.await;
return;
}
};
if existing_archived_thread_path.is_some() {
self.send_invalid_request_error(
request_id,
format!("threadId already exists: {thread_id_str}"),
)
.await;
return;
}
}
let dynamic_tools = dynamic_tools.unwrap_or_default();
let core_dynamic_tools = if dynamic_tools.is_empty() {
Vec::new()
@@ -2013,7 +2145,12 @@ impl CodexMessageProcessor {
match self
.thread_manager
.start_thread_with_tools(config, core_dynamic_tools, persist_extended_history)
.start_thread_with_tools(
config,
core_dynamic_tools,
persist_extended_history,
requested_thread_id,
)
.await
{
Ok(new_conv) => {
@@ -5294,8 +5431,30 @@ impl CodexMessageProcessor {
let _ = conversation.submit(Op::Interrupt).await;
}
async fn turn_start(&self, request_id: ConnectionRequestId, params: TurnStartParams) {
let (_, thread) = match self.load_thread(&params.thread_id).await {
async fn turn_start(&mut self, request_id: ConnectionRequestId, params: TurnStartParams) {
if let Some(turn_id) = params.turn_id.as_ref()
&& turn_id.is_empty()
{
self.send_invalid_request_error(request_id, "turnId must not be empty".to_string())
.await;
return;
}
let turn_id = match params.turn_id {
Some(turn_id) => match Uuid::parse_str(&turn_id) {
Ok(turn_uuid) => Some(turn_uuid.hyphenated().to_string()),
Err(_) => {
self.send_invalid_request_error(
request_id,
"invalid turnId: must be a UUID".to_string(),
)
.await;
return;
}
},
None => None,
};
let (thread_uuid, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
@@ -5303,6 +5462,16 @@ impl CodexMessageProcessor {
}
};
if let Some(turn_id) = turn_id.as_deref() {
if let Err(error) = self
.validate_requested_turn_id(thread_uuid, &thread, turn_id)
.await
{
self.outgoing.send_error(request_id, error).await;
return;
}
}
let collaboration_mode = params
.collaboration_mode
.map(|mode| self.normalize_turn_start_collaboration_mode(mode));
@@ -5341,15 +5510,26 @@ impl CodexMessageProcessor {
}
// Start the turn by submitting the user input. Return its submission id as turn_id.
let turn_id = thread
.submit(Op::UserInput {
items: mapped_items,
final_output_json_schema: params.output_schema,
})
.await;
let op = Op::UserInput {
items: mapped_items,
final_output_json_schema: params.output_schema,
};
let turn_id = match turn_id {
Some(turn_id) => thread
.submit_with_id(Submission {
id: turn_id.clone(),
op,
})
.await
.map(|()| turn_id),
None => thread.submit(op).await,
};
match turn_id {
Ok(turn_id) => {
let thread_state = self.thread_state_manager.thread_state(thread_uuid);
thread_state.lock().await.remember_turn_id(turn_id.clone());
let turn = Turn {
id: turn_id.clone(),
items: vec![],
@@ -5369,14 +5549,19 @@ impl CodexMessageProcessor {
.send_server_notification(ServerNotification::TurnStarted(notif))
.await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start turn: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
Err(err) => match err {
CodexErr::InvalidRequest(message) => {
self.send_invalid_request_error(request_id, message).await;
}
err => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start turn: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
},
}
}

View File

@@ -49,6 +49,8 @@ pub(crate) struct ThreadState {
pub(crate) experimental_raw_events: bool,
listener_command_tx: Option<mpsc::UnboundedSender<ThreadListenerCommand>>,
current_turn_history: ThreadHistoryBuilder,
known_turn_ids: HashSet<String>,
known_turn_ids_seeded: bool,
listener_thread: Option<Weak<CodexThread>>,
subscribed_connections: HashSet<ConnectionId>,
}
@@ -110,8 +112,35 @@ impl ThreadState {
self.current_turn_history.active_turn_snapshot()
}
pub(crate) fn active_turn_id(&self) -> Option<&str> {
self.current_turn_history.active_turn_id()
}
pub(crate) fn has_known_turn_id(&self, turn_id: &str) -> bool {
self.known_turn_ids.contains(turn_id)
}
pub(crate) fn known_turn_ids_seeded(&self) -> bool {
self.known_turn_ids_seeded
}
pub(crate) fn seed_known_turn_ids(&mut self, turn_ids: impl IntoIterator<Item = String>) {
if self.known_turn_ids_seeded {
return;
}
self.known_turn_ids.extend(turn_ids);
self.known_turn_ids_seeded = true;
}
pub(crate) fn remember_turn_id(&mut self, turn_id: impl Into<String>) {
self.known_turn_ids.insert(turn_id.into());
}
pub(crate) fn track_current_turn_event(&mut self, event: &EventMsg) {
self.current_turn_history.handle_event(event);
if let Some(active_turn_id) = self.current_turn_history.active_turn_id() {
self.known_turn_ids.insert(active_turn_id.to_string());
}
if !self.current_turn_history.has_active_turn() {
self.current_turn_history.reset();
}

View File

@@ -81,6 +81,43 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_start_uses_caller_supplied_thread_id() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let requested_thread_id = "018f2b7a-5d5d-7c39-9d6b-9f3f88d4f1c2".to_string();
let req_id = mcp
.send_thread_start_request(ThreadStartParams {
thread_id: Some(requested_thread_id.clone()),
..Default::default()
})
.await?;
let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
assert_eq!(thread.id, requested_thread_id);
let notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/started"),
)
.await??;
let started: ThreadStartedNotification =
serde_json::from_value(notif.params.expect("params must be present"))?;
assert_eq!(started.thread.id, thread.id);
Ok(())
}
#[tokio::test]
async fn thread_start_respects_project_config_from_cwd() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

View File

@@ -19,6 +19,7 @@ use codex_app_server_protocol::FileChangeOutputDeltaNotification;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::PatchApplyStatus;
@@ -135,6 +136,199 @@ async fn turn_start_sends_originator_header() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn turn_start_uses_caller_supplied_turn_id() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::from([(Feature::Personality, true)]),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let requested_turn_id = "018f2b7a-5d5d-7c39-9d6b-9f3f88d4f1d3".to_string();
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
turn_id: Some(requested_turn_id.clone()),
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let TurnStartResponse { turn: started_turn } = to_response::<TurnStartResponse>(turn_resp)?;
assert_eq!(started_turn.id, requested_turn_id);
let notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/started"),
)
.await??;
let started: TurnStartedNotification =
serde_json::from_value(notif.params.expect("turn/started params"))?;
assert_eq!(started.thread_id, thread.id);
assert_eq!(started.turn.id, started_turn.id);
let completed_notif: JSONRPCNotification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let completed: TurnCompletedNotification =
serde_json::from_value(completed_notif.params.expect("turn/completed params"))?;
assert_eq!(completed.turn.id, started_turn.id);
Ok(())
}
#[tokio::test]
async fn turn_start_rejects_non_uuid_turn_id() -> Result<()> {
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::from([(Feature::Personality, true)]),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
turn_id: Some("not-a-uuid".to_string()),
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(turn_req)),
)
.await??;
assert!(err.error.message.contains("invalid turnId"));
Ok(())
}
#[tokio::test]
async fn turn_start_rejects_duplicate_turn_id() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(
codex_home.path(),
&server.uri(),
"never",
&BTreeMap::from([(Feature::Personality, true)]),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
})
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
let requested_turn_id = "018f2b7a-5d5d-7c39-9d6b-9f3f88d4f1d4".to_string();
let first_turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
turn_id: Some(requested_turn_id.clone()),
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(first_turn_req)),
)
.await??;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
let second_turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
turn_id: Some(requested_turn_id.to_uppercase()),
input: vec![V2UserInput::Text {
text: "Hello again".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let err: JSONRPCError = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(second_turn_req)),
)
.await??;
assert!(err.error.message.contains("turnId already exists"));
Ok(())
}
#[tokio::test]
async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
@@ -1101,6 +1295,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
let first_turn = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
turn_id: None,
input: vec![V2UserInput::Text {
text: "first turn".to_string(),
text_elements: Vec::new(),
@@ -1138,6 +1333,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
let second_turn = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id.clone(),
turn_id: None,
input: vec![V2UserInput::Text {
text: "second turn".to_string(),
text_elements: Vec::new(),

View File

@@ -300,6 +300,7 @@ impl Codex {
agent_control: AgentControl,
dynamic_tools: Vec<DynamicToolSpec>,
persist_extended_history: bool,
requested_thread_id: Option<ThreadId>,
) -> CodexResult<CodexSpawnOk> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
@@ -429,6 +430,7 @@ impl Codex {
skills_manager,
file_watcher,
agent_control,
requested_thread_id,
)
.instrument(session_init_span)
.await
@@ -469,6 +471,7 @@ impl Codex {
/// Use sparingly: prefer `submit()` so Codex is responsible for generating
/// unique IDs for each submission.
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
self.session.reserve_submission_id(&sub.id).await?;
self.tx_sub
.send(sub)
.await
@@ -524,6 +527,7 @@ pub(crate) struct Session {
features: Features,
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
submitted_ids: Mutex<HashSet<String>>,
pub(crate) services: SessionServices,
js_repl: Arc<JsReplHandle>,
next_internal_sub_id: AtomicU64,
@@ -1001,6 +1005,7 @@ impl Session {
skills_manager: Arc<SkillsManager>,
file_watcher: Arc<FileWatcher>,
agent_control: AgentControl,
requested_thread_id: Option<ThreadId>,
) -> anyhow::Result<Arc<Self>> {
debug!(
"Configuring session: model={}; provider={:?}",
@@ -1018,7 +1023,7 @@ impl Session {
let (conversation_id, rollout_params) = match &initial_history {
InitialHistory::New | InitialHistory::Forked(_) => {
let conversation_id = ThreadId::default();
let conversation_id = requested_thread_id.unwrap_or_default();
(
conversation_id,
RolloutRecorderParams::new(
@@ -1356,6 +1361,7 @@ impl Session {
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
submitted_ids: Mutex::new(HashSet::new()),
services,
js_repl,
next_internal_sub_id: AtomicU64::new(0),
@@ -1772,6 +1778,17 @@ impl Session {
state.set_previous_model(previous_model);
}
async fn reserve_submission_id(&self, submission_id: &str) -> CodexResult<()> {
let mut submitted_ids = self.submitted_ids.lock().await;
if submitted_ids.insert(submission_id.to_string()) {
return Ok(());
}
Err(CodexErr::InvalidRequest(format!(
"duplicate submission id: {submission_id}"
)))
}
fn maybe_refresh_shell_snapshot_for_cwd(
&self,
previous_cwd: &Path,
@@ -7312,6 +7329,7 @@ mod tests {
Arc::new(SkillsManager::new(config.codex_home.clone())),
Arc::new(FileWatcher::noop()),
AgentControl::default(),
None,
)
.await;
@@ -7466,6 +7484,7 @@ mod tests {
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
submitted_ids: Mutex::new(HashSet::new()),
services,
js_repl,
next_internal_sub_id: AtomicU64::new(0),
@@ -7622,6 +7641,7 @@ mod tests {
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
active_turn: Mutex::new(None),
submitted_ids: Mutex::new(HashSet::new()),
services,
js_repl,
next_internal_sub_id: AtomicU64::new(0),

View File

@@ -59,6 +59,7 @@ pub(crate) async fn run_codex_thread_interactive(
parent_session.services.agent_control.clone(),
Vec::new(),
false,
None,
)
.await?;
let codex = Arc::new(codex);

View File

@@ -282,7 +282,7 @@ impl ThreadManager {
}
pub async fn start_thread(&self, config: Config) -> CodexResult<NewThread> {
self.start_thread_with_tools(config, Vec::new(), false)
self.start_thread_with_tools(config, Vec::new(), false, None)
.await
}
@@ -291,6 +291,7 @@ impl ThreadManager {
config: Config,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
requested_thread_id: Option<ThreadId>,
) -> CodexResult<NewThread> {
self.state
.spawn_thread(
@@ -300,6 +301,7 @@ impl ThreadManager {
self.agent_control(),
dynamic_tools,
persist_extended_history,
requested_thread_id,
)
.await
}
@@ -330,6 +332,7 @@ impl ThreadManager {
self.agent_control(),
Vec::new(),
persist_extended_history,
None,
)
.await
}
@@ -371,6 +374,7 @@ impl ThreadManager {
self.agent_control(),
Vec::new(),
persist_extended_history,
None,
)
.await
}
@@ -440,6 +444,7 @@ impl ThreadManagerState {
session_source,
Vec::new(),
persist_extended_history,
None,
)
.await
}
@@ -460,6 +465,7 @@ impl ThreadManagerState {
session_source,
Vec::new(),
false,
None,
)
.await
}
@@ -473,6 +479,7 @@ impl ThreadManagerState {
agent_control: AgentControl,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
requested_thread_id: Option<ThreadId>,
) -> CodexResult<NewThread> {
self.spawn_thread_with_source(
config,
@@ -482,6 +489,7 @@ impl ThreadManagerState {
self.session_source.clone(),
dynamic_tools,
persist_extended_history,
requested_thread_id,
)
.await
}
@@ -496,6 +504,7 @@ impl ThreadManagerState {
session_source: SessionSource,
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
persist_extended_history: bool,
requested_thread_id: Option<ThreadId>,
) -> CodexResult<NewThread> {
let watch_registration = self.file_watcher.register_config(&config);
let CodexSpawnOk {
@@ -511,6 +520,7 @@ impl ThreadManagerState {
agent_control,
dynamic_tools,
persist_extended_history,
requested_thread_id,
)
.await?;
self.finalize_thread_spawn(codex, thread_id, watch_registration)
@@ -540,6 +550,11 @@ impl ThreadManagerState {
watch_registration,
));
let mut threads = self.threads.write().await;
if threads.contains_key(&thread_id) {
return Err(CodexErr::InvalidRequest(format!(
"duplicate thread id: {thread_id}"
)));
}
threads.insert(thread_id, thread.clone());
Ok(NewThread {