mirror of
https://github.com/openai/codex.git
synced 2026-03-02 04:33:54 +00:00
Compare commits
1 Commits
main
...
dev/eh/cod
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4b1bc6e4ec |
@@ -2973,6 +2973,13 @@
|
||||
"type": "null"
|
||||
}
|
||||
]
|
||||
},
|
||||
"threadId": {
|
||||
"description": "Optional caller-supplied thread ID (UUID). If omitted, the server generates a new thread ID.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"type": "object"
|
||||
@@ -3085,6 +3092,13 @@
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"turnId": {
|
||||
"description": "Optional caller-supplied turn ID (UUID). If omitted, the server generates a new turn ID.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
|
||||
@@ -16625,6 +16625,13 @@
|
||||
"type": "null"
|
||||
}
|
||||
]
|
||||
},
|
||||
"threadId": {
|
||||
"description": "Optional caller-supplied thread ID (UUID). If omitted, the server generates a new thread ID.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"title": "ThreadStartParams",
|
||||
@@ -17204,6 +17211,13 @@
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"turnId": {
|
||||
"description": "Optional caller-supplied turn ID (UUID). If omitted, the server generates a new turn ID.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
|
||||
@@ -150,6 +150,13 @@
|
||||
"type": "null"
|
||||
}
|
||||
]
|
||||
},
|
||||
"threadId": {
|
||||
"description": "Optional caller-supplied thread ID (UUID). If omitted, the server generates a new thread ID.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"title": "ThreadStartParams",
|
||||
|
||||
@@ -552,6 +552,13 @@
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
},
|
||||
"turnId": {
|
||||
"description": "Optional caller-supplied turn ID (UUID). If omitted, the server generates a new turn ID.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
|
||||
@@ -6,7 +6,11 @@ import type { JsonValue } from "../serde_json/JsonValue";
|
||||
import type { AskForApproval } from "./AskForApproval";
|
||||
import type { SandboxMode } from "./SandboxMode";
|
||||
|
||||
export type ThreadStartParams = {model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, ephemeral?: boolean | null, /**
|
||||
export type ThreadStartParams = {/**
|
||||
* Optional caller-supplied thread ID (UUID). If omitted, the server
|
||||
* generates a new thread ID.
|
||||
*/
|
||||
threadId?: string | null, model?: string | null, modelProvider?: string | null, cwd?: string | null, approvalPolicy?: AskForApproval | null, sandbox?: SandboxMode | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, ephemeral?: boolean | null, /**
|
||||
* If true, opt into emitting raw Responses API items on the event stream.
|
||||
* This is for internal use only (e.g. Codex Cloud).
|
||||
*/
|
||||
|
||||
@@ -10,7 +10,11 @@ import type { AskForApproval } from "./AskForApproval";
|
||||
import type { SandboxPolicy } from "./SandboxPolicy";
|
||||
import type { UserInput } from "./UserInput";
|
||||
|
||||
export type TurnStartParams = {threadId: string, input: Array<UserInput>, /**
|
||||
export type TurnStartParams = {threadId: string, /**
|
||||
* Optional caller-supplied turn ID (UUID). If omitted, the server
|
||||
* generates a new turn ID.
|
||||
*/
|
||||
turnId?: string | null, input: Array<UserInput>, /**
|
||||
* Override the working directory for this turn and subsequent turns.
|
||||
*/
|
||||
cwd?: string | null, /**
|
||||
|
||||
@@ -100,6 +100,13 @@ impl ThreadHistoryBuilder {
|
||||
.or_else(|| self.turns.last().cloned())
|
||||
}
|
||||
|
||||
pub fn active_turn_id(&self) -> Option<&str> {
|
||||
self.current_turn
|
||||
.as_ref()
|
||||
.map(|turn| turn.id.as_str())
|
||||
.or_else(|| self.turns.last().map(|turn| turn.id.as_str()))
|
||||
}
|
||||
|
||||
pub fn has_active_turn(&self) -> bool {
|
||||
self.current_turn.is_some()
|
||||
}
|
||||
|
||||
@@ -1483,6 +1483,10 @@ pub struct CommandExecResponse {
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadStartParams {
|
||||
/// Optional caller-supplied thread ID (UUID). If omitted, the server
|
||||
/// generates a new thread ID.
|
||||
#[ts(optional = nullable)]
|
||||
pub thread_id: Option<String>,
|
||||
#[ts(optional = nullable)]
|
||||
pub model: Option<String>,
|
||||
#[ts(optional = nullable)]
|
||||
@@ -2339,6 +2343,10 @@ pub enum TurnStatus {
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct TurnStartParams {
|
||||
pub thread_id: String,
|
||||
/// Optional caller-supplied turn ID (UUID). If omitted, the server
|
||||
/// generates a new turn ID.
|
||||
#[ts(optional = nullable)]
|
||||
pub turn_id: Option<String>,
|
||||
pub input: Vec<UserInput>,
|
||||
/// Override the working directory for this turn and subsequent turns.
|
||||
#[ts(optional = nullable)]
|
||||
|
||||
@@ -114,7 +114,7 @@ Example with notification opt-out:
|
||||
|
||||
## API Overview
|
||||
|
||||
- `thread/start` — create a new thread; emits `thread/started` and auto-subscribes you to turn/item events for that thread.
|
||||
- `thread/start` — create a new thread; optionally supply `threadId` (UUID) to pre-assign the thread id. Emits `thread/started` and auto-subscribes you to turn/item events for that thread.
|
||||
- `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it.
|
||||
- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; emits `thread/started` and auto-subscribes you to turn/item events for the new thread.
|
||||
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, and `cwd` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded.
|
||||
@@ -127,7 +127,7 @@ Example with notification opt-out:
|
||||
- `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications.
|
||||
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
|
||||
- `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
|
||||
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
|
||||
- `turn/start` — add user input to a thread and begin Codex generation; optionally supply `turnId` (UUID) to pre-assign the turn id. Responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. For `collaborationMode`, `settings.developer_instructions: null` means "use built-in instructions for the selected mode".
|
||||
- `turn/steer` — add user input to an already in-flight turn without starting a new turn; returns the active `turnId` that accepted the input.
|
||||
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
|
||||
- `review/start` — kick off Codex’s automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.
|
||||
@@ -158,6 +158,8 @@ Start a fresh thread when you need a new Codex conversation.
|
||||
|
||||
```json
|
||||
{ "method": "thread/start", "id": 10, "params": {
|
||||
// Optionally pre-assign the thread id (must be a UUID string).
|
||||
"threadId": "018f2b7a-5d5d-7c39-9d6b-9f3f88d4f1c2",
|
||||
// Optionally set config settings. If not specified, will use the user's
|
||||
// current config settings.
|
||||
"model": "gpt-5.1-codex",
|
||||
@@ -339,6 +341,8 @@ You can optionally specify config overrides on the new turn. If specified, these
|
||||
```json
|
||||
{ "method": "turn/start", "id": 30, "params": {
|
||||
"threadId": "thr_123",
|
||||
// Optionally pre-assign the turn id (must be a UUID string).
|
||||
"turnId": "018f2b7a-5d5d-7c39-9d6b-9f3f88d4f1c3",
|
||||
"input": [ { "type": "text", "text": "Run tests" } ],
|
||||
// Below are optional config overrides
|
||||
"cwd": "/Users/me/project",
|
||||
|
||||
@@ -213,6 +213,7 @@ use codex_core::protocol::ReviewDelivery as CoreReviewDelivery;
|
||||
use codex_core::protocol::ReviewRequest;
|
||||
use codex_core::protocol::ReviewTarget as CoreReviewTarget;
|
||||
use codex_core::protocol::SessionConfiguredEvent;
|
||||
use codex_core::protocol::Submission;
|
||||
use codex_core::read_head_for_summary;
|
||||
use codex_core::read_session_meta_line;
|
||||
use codex_core::rollout_date_parts;
|
||||
@@ -390,6 +391,70 @@ impl CodexMessageProcessor {
|
||||
|
||||
Ok((thread_id, thread))
|
||||
}
|
||||
|
||||
async fn validate_requested_turn_id(
|
||||
&mut self,
|
||||
thread_uuid: ThreadId,
|
||||
thread: &Arc<CodexThread>,
|
||||
turn_id: &str,
|
||||
) -> Result<(), JSONRPCErrorError> {
|
||||
let thread_state = self.thread_state_manager.thread_state(thread_uuid);
|
||||
{
|
||||
let state = thread_state.lock().await;
|
||||
if state.active_turn_id().is_some_and(|id| id == turn_id)
|
||||
|| state.has_known_turn_id(turn_id)
|
||||
{
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("turnId already exists: {turn_id}"),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
if state.known_turn_ids_seeded() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let historical_turn_ids = if let Some(rollout_path) = thread.rollout_path()
|
||||
&& rollout_path.exists()
|
||||
{
|
||||
read_rollout_items_from_rollout(rollout_path.as_path())
|
||||
.await
|
||||
.map(|items| build_turns_from_rollout_items(&items))
|
||||
.map(|turns| turns.into_iter().map(|turn| turn.id).collect::<Vec<_>>())
|
||||
.map_err(|err| JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to validate turnId {turn_id} against rollout history: {err}"
|
||||
),
|
||||
data: None,
|
||||
})?
|
||||
} else {
|
||||
Vec::new()
|
||||
};
|
||||
|
||||
let mut state = thread_state.lock().await;
|
||||
if state.active_turn_id().is_some_and(|id| id == turn_id) {
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("turnId already exists: {turn_id}"),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
if !state.known_turn_ids_seeded() {
|
||||
state.seed_known_turn_ids(historical_turn_ids);
|
||||
}
|
||||
if state.has_known_turn_id(turn_id) {
|
||||
return Err(JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("turnId already exists: {turn_id}"),
|
||||
data: None,
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn new(args: CodexMessageProcessorArgs) -> Self {
|
||||
let CodexMessageProcessorArgs {
|
||||
auth_manager,
|
||||
@@ -1940,6 +2005,7 @@ impl CodexMessageProcessor {
|
||||
|
||||
async fn thread_start(&mut self, request_id: ConnectionRequestId, params: ThreadStartParams) {
|
||||
let ThreadStartParams {
|
||||
thread_id,
|
||||
model,
|
||||
model_provider,
|
||||
cwd,
|
||||
@@ -1955,6 +2021,17 @@ impl CodexMessageProcessor {
|
||||
ephemeral,
|
||||
persist_extended_history,
|
||||
} = params;
|
||||
let requested_thread_id = match thread_id {
|
||||
Some(thread_id) => match ThreadId::from_string(&thread_id) {
|
||||
Ok(thread_id) => Some(thread_id),
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(request_id, format!("invalid threadId: {err}"))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
let mut typesafe_overrides = self.build_thread_config_overrides(
|
||||
model,
|
||||
model_provider,
|
||||
@@ -1988,6 +2065,61 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(thread_id) = requested_thread_id {
|
||||
let thread_id_str = thread_id.to_string();
|
||||
if self.thread_manager.get_thread(thread_id).await.is_ok() {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("threadId already exists: {thread_id_str}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
let existing_thread_path =
|
||||
match find_thread_path_by_id_str(&config.codex_home, &thread_id_str).await {
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!("failed to validate threadId {thread_id_str}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
if existing_thread_path.is_some() {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("threadId already exists: {thread_id_str}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
let existing_archived_thread_path =
|
||||
match find_archived_thread_path_by_id_str(&config.codex_home, &thread_id_str).await
|
||||
{
|
||||
Ok(path) => path,
|
||||
Err(err) => {
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!("failed to validate threadId {thread_id_str}: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
if existing_archived_thread_path.is_some() {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("threadId already exists: {thread_id_str}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let dynamic_tools = dynamic_tools.unwrap_or_default();
|
||||
let core_dynamic_tools = if dynamic_tools.is_empty() {
|
||||
Vec::new()
|
||||
@@ -2013,7 +2145,12 @@ impl CodexMessageProcessor {
|
||||
|
||||
match self
|
||||
.thread_manager
|
||||
.start_thread_with_tools(config, core_dynamic_tools, persist_extended_history)
|
||||
.start_thread_with_tools(
|
||||
config,
|
||||
core_dynamic_tools,
|
||||
persist_extended_history,
|
||||
requested_thread_id,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(new_conv) => {
|
||||
@@ -5294,8 +5431,30 @@ impl CodexMessageProcessor {
|
||||
let _ = conversation.submit(Op::Interrupt).await;
|
||||
}
|
||||
|
||||
async fn turn_start(&self, request_id: ConnectionRequestId, params: TurnStartParams) {
|
||||
let (_, thread) = match self.load_thread(¶ms.thread_id).await {
|
||||
async fn turn_start(&mut self, request_id: ConnectionRequestId, params: TurnStartParams) {
|
||||
if let Some(turn_id) = params.turn_id.as_ref()
|
||||
&& turn_id.is_empty()
|
||||
{
|
||||
self.send_invalid_request_error(request_id, "turnId must not be empty".to_string())
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
let turn_id = match params.turn_id {
|
||||
Some(turn_id) => match Uuid::parse_str(&turn_id) {
|
||||
Ok(turn_uuid) => Some(turn_uuid.hyphenated().to_string()),
|
||||
Err(_) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
"invalid turnId: must be a UUID".to_string(),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
|
||||
let (thread_uuid, thread) = match self.load_thread(¶ms.thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
@@ -5303,6 +5462,16 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(turn_id) = turn_id.as_deref() {
|
||||
if let Err(error) = self
|
||||
.validate_requested_turn_id(thread_uuid, &thread, turn_id)
|
||||
.await
|
||||
{
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let collaboration_mode = params
|
||||
.collaboration_mode
|
||||
.map(|mode| self.normalize_turn_start_collaboration_mode(mode));
|
||||
@@ -5341,15 +5510,26 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
// Start the turn by submitting the user input. Return its submission id as turn_id.
|
||||
let turn_id = thread
|
||||
.submit(Op::UserInput {
|
||||
items: mapped_items,
|
||||
final_output_json_schema: params.output_schema,
|
||||
})
|
||||
.await;
|
||||
let op = Op::UserInput {
|
||||
items: mapped_items,
|
||||
final_output_json_schema: params.output_schema,
|
||||
};
|
||||
let turn_id = match turn_id {
|
||||
Some(turn_id) => thread
|
||||
.submit_with_id(Submission {
|
||||
id: turn_id.clone(),
|
||||
op,
|
||||
})
|
||||
.await
|
||||
.map(|()| turn_id),
|
||||
None => thread.submit(op).await,
|
||||
};
|
||||
|
||||
match turn_id {
|
||||
Ok(turn_id) => {
|
||||
let thread_state = self.thread_state_manager.thread_state(thread_uuid);
|
||||
thread_state.lock().await.remember_turn_id(turn_id.clone());
|
||||
|
||||
let turn = Turn {
|
||||
id: turn_id.clone(),
|
||||
items: vec![],
|
||||
@@ -5369,14 +5549,19 @@ impl CodexMessageProcessor {
|
||||
.send_server_notification(ServerNotification::TurnStarted(notif))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to start turn: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
}
|
||||
Err(err) => match err {
|
||||
CodexErr::InvalidRequest(message) => {
|
||||
self.send_invalid_request_error(request_id, message).await;
|
||||
}
|
||||
err => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to start turn: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -49,6 +49,8 @@ pub(crate) struct ThreadState {
|
||||
pub(crate) experimental_raw_events: bool,
|
||||
listener_command_tx: Option<mpsc::UnboundedSender<ThreadListenerCommand>>,
|
||||
current_turn_history: ThreadHistoryBuilder,
|
||||
known_turn_ids: HashSet<String>,
|
||||
known_turn_ids_seeded: bool,
|
||||
listener_thread: Option<Weak<CodexThread>>,
|
||||
subscribed_connections: HashSet<ConnectionId>,
|
||||
}
|
||||
@@ -110,8 +112,35 @@ impl ThreadState {
|
||||
self.current_turn_history.active_turn_snapshot()
|
||||
}
|
||||
|
||||
pub(crate) fn active_turn_id(&self) -> Option<&str> {
|
||||
self.current_turn_history.active_turn_id()
|
||||
}
|
||||
|
||||
pub(crate) fn has_known_turn_id(&self, turn_id: &str) -> bool {
|
||||
self.known_turn_ids.contains(turn_id)
|
||||
}
|
||||
|
||||
pub(crate) fn known_turn_ids_seeded(&self) -> bool {
|
||||
self.known_turn_ids_seeded
|
||||
}
|
||||
|
||||
pub(crate) fn seed_known_turn_ids(&mut self, turn_ids: impl IntoIterator<Item = String>) {
|
||||
if self.known_turn_ids_seeded {
|
||||
return;
|
||||
}
|
||||
self.known_turn_ids.extend(turn_ids);
|
||||
self.known_turn_ids_seeded = true;
|
||||
}
|
||||
|
||||
pub(crate) fn remember_turn_id(&mut self, turn_id: impl Into<String>) {
|
||||
self.known_turn_ids.insert(turn_id.into());
|
||||
}
|
||||
|
||||
pub(crate) fn track_current_turn_event(&mut self, event: &EventMsg) {
|
||||
self.current_turn_history.handle_event(event);
|
||||
if let Some(active_turn_id) = self.current_turn_history.active_turn_id() {
|
||||
self.known_turn_ids.insert(active_turn_id.to_string());
|
||||
}
|
||||
if !self.current_turn_history.has_active_turn() {
|
||||
self.current_turn_history.reset();
|
||||
}
|
||||
|
||||
@@ -81,6 +81,43 @@ async fn thread_start_creates_thread_and_emits_started() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_start_uses_caller_supplied_thread_id() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let requested_thread_id = "018f2b7a-5d5d-7c39-9d6b-9f3f88d4f1c2".to_string();
|
||||
let req_id = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
thread_id: Some(requested_thread_id.clone()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
|
||||
let resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(req_id)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(resp)?;
|
||||
assert_eq!(thread.id, requested_thread_id);
|
||||
|
||||
let notif: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/started"),
|
||||
)
|
||||
.await??;
|
||||
let started: ThreadStartedNotification =
|
||||
serde_json::from_value(notif.params.expect("params must be present"))?;
|
||||
assert_eq!(started.thread.id, thread.id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_start_respects_project_config_from_cwd() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
@@ -19,6 +19,7 @@ use codex_app_server_protocol::FileChangeOutputDeltaNotification;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ItemStartedNotification;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::PatchApplyStatus;
|
||||
@@ -135,6 +136,199 @@ async fn turn_start_sends_originator_header() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_uses_caller_supplied_turn_id() -> Result<()> {
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
"never",
|
||||
&BTreeMap::from([(Feature::Personality, true)]),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let requested_turn_id = "018f2b7a-5d5d-7c39-9d6b-9f3f88d4f1d3".to_string();
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
turn_id: Some(requested_turn_id.clone()),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let turn_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
let TurnStartResponse { turn: started_turn } = to_response::<TurnStartResponse>(turn_resp)?;
|
||||
assert_eq!(started_turn.id, requested_turn_id);
|
||||
|
||||
let notif: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/started"),
|
||||
)
|
||||
.await??;
|
||||
let started: TurnStartedNotification =
|
||||
serde_json::from_value(notif.params.expect("turn/started params"))?;
|
||||
assert_eq!(started.thread_id, thread.id);
|
||||
assert_eq!(started.turn.id, started_turn.id);
|
||||
|
||||
let completed_notif: JSONRPCNotification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
let completed: TurnCompletedNotification =
|
||||
serde_json::from_value(completed_notif.params.expect("turn/completed params"))?;
|
||||
assert_eq!(completed.turn.id, started_turn.id);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_rejects_non_uuid_turn_id() -> Result<()> {
|
||||
let server = create_mock_responses_server_sequence_unchecked(Vec::new()).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
"never",
|
||||
&BTreeMap::from([(Feature::Personality, true)]),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id,
|
||||
turn_id: Some("not-a-uuid".to_string()),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(turn_req)),
|
||||
)
|
||||
.await??;
|
||||
assert!(err.error.message.contains("invalid turnId"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_rejects_duplicate_turn_id() -> Result<()> {
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
let server = create_mock_responses_server_sequence_unchecked(responses).await;
|
||||
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(
|
||||
codex_home.path(),
|
||||
&server.uri(),
|
||||
"never",
|
||||
&BTreeMap::from([(Feature::Personality, true)]),
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let thread_req = mcp
|
||||
.send_thread_start_request(ThreadStartParams {
|
||||
model: Some("mock-model".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let thread_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response::<ThreadStartResponse>(thread_resp)?;
|
||||
|
||||
let requested_turn_id = "018f2b7a-5d5d-7c39-9d6b-9f3f88d4f1d4".to_string();
|
||||
let first_turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
turn_id: Some(requested_turn_id.clone()),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Hello".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(first_turn_req)),
|
||||
)
|
||||
.await??;
|
||||
timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("turn/completed"),
|
||||
)
|
||||
.await??;
|
||||
|
||||
let second_turn_req = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id,
|
||||
turn_id: Some(requested_turn_id.to_uppercase()),
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "Hello again".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let err: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(second_turn_req)),
|
||||
)
|
||||
.await??;
|
||||
assert!(err.error.message.contains("turnId already exists"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> {
|
||||
let responses = vec![create_final_assistant_message_sse_response("Done")?];
|
||||
@@ -1101,6 +1295,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
let first_turn = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
turn_id: None,
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "first turn".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
@@ -1138,6 +1333,7 @@ async fn turn_start_updates_sandbox_and_cwd_between_turns_v2() -> Result<()> {
|
||||
let second_turn = mcp
|
||||
.send_turn_start_request(TurnStartParams {
|
||||
thread_id: thread.id.clone(),
|
||||
turn_id: None,
|
||||
input: vec![V2UserInput::Text {
|
||||
text: "second turn".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
|
||||
@@ -300,6 +300,7 @@ impl Codex {
|
||||
agent_control: AgentControl,
|
||||
dynamic_tools: Vec<DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
requested_thread_id: Option<ThreadId>,
|
||||
) -> CodexResult<CodexSpawnOk> {
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (tx_event, rx_event) = async_channel::unbounded();
|
||||
@@ -429,6 +430,7 @@ impl Codex {
|
||||
skills_manager,
|
||||
file_watcher,
|
||||
agent_control,
|
||||
requested_thread_id,
|
||||
)
|
||||
.instrument(session_init_span)
|
||||
.await
|
||||
@@ -469,6 +471,7 @@ impl Codex {
|
||||
/// Use sparingly: prefer `submit()` so Codex is responsible for generating
|
||||
/// unique IDs for each submission.
|
||||
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
|
||||
self.session.reserve_submission_id(&sub.id).await?;
|
||||
self.tx_sub
|
||||
.send(sub)
|
||||
.await
|
||||
@@ -524,6 +527,7 @@ pub(crate) struct Session {
|
||||
features: Features,
|
||||
pending_mcp_server_refresh_config: Mutex<Option<McpServerRefreshConfig>>,
|
||||
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
|
||||
submitted_ids: Mutex<HashSet<String>>,
|
||||
pub(crate) services: SessionServices,
|
||||
js_repl: Arc<JsReplHandle>,
|
||||
next_internal_sub_id: AtomicU64,
|
||||
@@ -1001,6 +1005,7 @@ impl Session {
|
||||
skills_manager: Arc<SkillsManager>,
|
||||
file_watcher: Arc<FileWatcher>,
|
||||
agent_control: AgentControl,
|
||||
requested_thread_id: Option<ThreadId>,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
debug!(
|
||||
"Configuring session: model={}; provider={:?}",
|
||||
@@ -1018,7 +1023,7 @@ impl Session {
|
||||
|
||||
let (conversation_id, rollout_params) = match &initial_history {
|
||||
InitialHistory::New | InitialHistory::Forked(_) => {
|
||||
let conversation_id = ThreadId::default();
|
||||
let conversation_id = requested_thread_id.unwrap_or_default();
|
||||
(
|
||||
conversation_id,
|
||||
RolloutRecorderParams::new(
|
||||
@@ -1356,6 +1361,7 @@ impl Session {
|
||||
features: config.features.clone(),
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
active_turn: Mutex::new(None),
|
||||
submitted_ids: Mutex::new(HashSet::new()),
|
||||
services,
|
||||
js_repl,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
@@ -1772,6 +1778,17 @@ impl Session {
|
||||
state.set_previous_model(previous_model);
|
||||
}
|
||||
|
||||
async fn reserve_submission_id(&self, submission_id: &str) -> CodexResult<()> {
|
||||
let mut submitted_ids = self.submitted_ids.lock().await;
|
||||
if submitted_ids.insert(submission_id.to_string()) {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
Err(CodexErr::InvalidRequest(format!(
|
||||
"duplicate submission id: {submission_id}"
|
||||
)))
|
||||
}
|
||||
|
||||
fn maybe_refresh_shell_snapshot_for_cwd(
|
||||
&self,
|
||||
previous_cwd: &Path,
|
||||
@@ -7312,6 +7329,7 @@ mod tests {
|
||||
Arc::new(SkillsManager::new(config.codex_home.clone())),
|
||||
Arc::new(FileWatcher::noop()),
|
||||
AgentControl::default(),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
@@ -7466,6 +7484,7 @@ mod tests {
|
||||
features: config.features.clone(),
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
active_turn: Mutex::new(None),
|
||||
submitted_ids: Mutex::new(HashSet::new()),
|
||||
services,
|
||||
js_repl,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
@@ -7622,6 +7641,7 @@ mod tests {
|
||||
features: config.features.clone(),
|
||||
pending_mcp_server_refresh_config: Mutex::new(None),
|
||||
active_turn: Mutex::new(None),
|
||||
submitted_ids: Mutex::new(HashSet::new()),
|
||||
services,
|
||||
js_repl,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
|
||||
@@ -59,6 +59,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
parent_session.services.agent_control.clone(),
|
||||
Vec::new(),
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
let codex = Arc::new(codex);
|
||||
|
||||
@@ -282,7 +282,7 @@ impl ThreadManager {
|
||||
}
|
||||
|
||||
pub async fn start_thread(&self, config: Config) -> CodexResult<NewThread> {
|
||||
self.start_thread_with_tools(config, Vec::new(), false)
|
||||
self.start_thread_with_tools(config, Vec::new(), false, None)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -291,6 +291,7 @@ impl ThreadManager {
|
||||
config: Config,
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
requested_thread_id: Option<ThreadId>,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.state
|
||||
.spawn_thread(
|
||||
@@ -300,6 +301,7 @@ impl ThreadManager {
|
||||
self.agent_control(),
|
||||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
requested_thread_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -330,6 +332,7 @@ impl ThreadManager {
|
||||
self.agent_control(),
|
||||
Vec::new(),
|
||||
persist_extended_history,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -371,6 +374,7 @@ impl ThreadManager {
|
||||
self.agent_control(),
|
||||
Vec::new(),
|
||||
persist_extended_history,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -440,6 +444,7 @@ impl ThreadManagerState {
|
||||
session_source,
|
||||
Vec::new(),
|
||||
persist_extended_history,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -460,6 +465,7 @@ impl ThreadManagerState {
|
||||
session_source,
|
||||
Vec::new(),
|
||||
false,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -473,6 +479,7 @@ impl ThreadManagerState {
|
||||
agent_control: AgentControl,
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
requested_thread_id: Option<ThreadId>,
|
||||
) -> CodexResult<NewThread> {
|
||||
self.spawn_thread_with_source(
|
||||
config,
|
||||
@@ -482,6 +489,7 @@ impl ThreadManagerState {
|
||||
self.session_source.clone(),
|
||||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
requested_thread_id,
|
||||
)
|
||||
.await
|
||||
}
|
||||
@@ -496,6 +504,7 @@ impl ThreadManagerState {
|
||||
session_source: SessionSource,
|
||||
dynamic_tools: Vec<codex_protocol::dynamic_tools::DynamicToolSpec>,
|
||||
persist_extended_history: bool,
|
||||
requested_thread_id: Option<ThreadId>,
|
||||
) -> CodexResult<NewThread> {
|
||||
let watch_registration = self.file_watcher.register_config(&config);
|
||||
let CodexSpawnOk {
|
||||
@@ -511,6 +520,7 @@ impl ThreadManagerState {
|
||||
agent_control,
|
||||
dynamic_tools,
|
||||
persist_extended_history,
|
||||
requested_thread_id,
|
||||
)
|
||||
.await?;
|
||||
self.finalize_thread_spawn(codex, thread_id, watch_registration)
|
||||
@@ -540,6 +550,11 @@ impl ThreadManagerState {
|
||||
watch_registration,
|
||||
));
|
||||
let mut threads = self.threads.write().await;
|
||||
if threads.contains_key(&thread_id) {
|
||||
return Err(CodexErr::InvalidRequest(format!(
|
||||
"duplicate thread id: {thread_id}"
|
||||
)));
|
||||
}
|
||||
threads.insert(thread_id, thread.clone());
|
||||
|
||||
Ok(NewThread {
|
||||
|
||||
Reference in New Issue
Block a user