Compare commits

...

5 Commits

Author              SHA1        Message                                      Date
Charles Cunningham  305190df68  Shutdown exec/run threads after completion   2026-01-26 09:07:39 -08:00
Charles Cunningham  61e5efc502  Error on blocking request                    2026-01-26 09:07:39 -08:00
Charles Cunningham  22f47b3285  Fix                                          2026-01-26 09:07:39 -08:00
Charles Cunningham  123cb25c36  Fixes                                        2026-01-26 09:07:39 -08:00
Charles Cunningham  7820de53fa  Add exec/run one-off turn API to app-server  2026-01-26 09:07:39 -08:00
7 changed files with 464 additions and 0 deletions

View File

@@ -214,6 +214,12 @@ client_request_definitions! {
response: v2::CommandExecResponse,
},
/// Run a single turn to completion without managing a thread lifecycle.
ExecRun => "exec/run" {
params: v2::ExecRunParams,
response: v2::ExecRunResponse,
},
ConfigRead => "config/read" {
params: v2::ConfigReadParams,
response: v2::ConfigReadResponse,

View File

@@ -1083,6 +1083,35 @@ pub struct CommandExecResponse {
pub stderr: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ExecRunParams {
pub input: Vec<UserInput>,
pub model: Option<String>,
pub model_provider: Option<String>,
pub cwd: Option<String>,
pub effort: Option<ReasoningEffort>,
pub summary: Option<ReasoningSummary>,
pub collaboration_mode: Option<CollaborationMode>,
pub personality: Option<Personality>,
pub config: Option<HashMap<String, JsonValue>>,
pub base_instructions: Option<String>,
pub developer_instructions: Option<String>,
pub output_schema: Option<JsonValue>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ExecRunResponse {
pub thread_id: String,
pub turn_id: String,
pub status: TurnStatus,
pub last_agent_message: Option<String>,
pub error: Option<TurnError>,
}
// === Threads, Turns, and Items ===
// Thread APIs
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
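For orientation, a hedged sketch of how `ExecRunParams` serializes over JSON-RPC given the `camelCase` rename above. Every field other than `input` is optional; the model name, cwd, and schema below are placeholders for illustration, not values taken from this change:
```json
{ "method": "exec/run", "id": 7, "params": {
  "input": [{ "type": "text", "text": "Summarize this diff", "textElements": [] }],
  "model": "gpt-5",
  "modelProvider": null,
  "cwd": "/tmp/project",
  "effort": null,
  "summary": null,
  "collaborationMode": null,
  "personality": null,
  "config": null,
  "baseInstructions": null,
  "developerInstructions": null,
  "outputSchema": { "type": "object", "properties": { "title": { "type": "string" } }, "required": ["title"] }
} }
```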

View File

@@ -43,6 +43,7 @@ Use the thread APIs to create, list, or archive conversations. Drive a conversat
- Initialize once: Immediately after launching the codex app-server process, send an `initialize` request with your client metadata, then emit an `initialized` notification. Any other request before this handshake gets rejected.
- Start (or resume) a thread: Call `thread/start` to open a fresh conversation. The response returns the thread object and you'll also get a `thread/started` notification. If you're continuing an existing conversation, call `thread/resume` with its ID instead. If you want to branch from an existing conversation, call `thread/fork` to create a new thread ID with copied history.
- Begin a turn: To send user input, call `turn/start` with the target `threadId` and the user's input. Optional fields let you override model, cwd, sandbox policy, etc. This immediately returns the new turn object and triggers a `turn/started` notification.
- Or run once: Call `exec/run` to execute a single turn to completion and receive the final status/message in the response without subscribing to the event stream.
- Stream events: After `turn/start`, keep reading JSON-RPC notifications on stdout. You'll see `item/started`, `item/completed`, deltas like `item/agentMessage/delta`, tool progress, etc. These represent streaming model output plus any side effects (commands, tool calls, reasoning notes).
- Finish the turn: When the model is done (or the turn is interrupted via the `turn/interrupt` call), the server sends `turn/completed` with the final turn state and token usage.
@@ -85,6 +86,7 @@ Example (from OpenAI's official VSCode extension):
- `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications.
- `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`.
- `review/start` — kick off Codex's automated reviewer for a thread; responds like `turn/start` and emits `item/started`/`item/completed` notifications with `enteredReviewMode` and `exitedReviewMode` items, plus a final assistant `agentMessage` containing the review.
- `exec/run` — run a single turn to completion and return the final status/message inline without streaming notifications.
- `command/exec` — run a single command under the server sandbox without starting a thread/turn (handy for utilities and validation).
- `model/list` — list available models (with reasoning effort options).
- `collaborationMode/list` — list available collaboration mode presets (experimental, no pagination).
@@ -368,6 +370,34 @@ containing an `exitedReviewMode` item with the final review text:
The `review` string is plain text that already bundles the overall explanation plus a bullet list for each structured finding (matching `ThreadItem::ExitedReviewMode` in the generated schema). Use this notification to render the reviewer output in your client.
### Example: Exec run to completion
Run a single turn to completion without subscribing to streamed turn/item events:
```json
{ "method": "exec/run", "id": 31, "params": {
"input": [{
"type": "text",
"text": "Generate a concise thread title",
"textElements": []
}]
} }
{ "id": 31, "result": {
"threadId": "thread-123",
"turnId": "turn-456",
"status": "completed",
"lastAgentMessage": "Fix title generation for app-server",
"error": null
} }
```
Notes:
- `exec/run` is best for one-off utilities (for example, generating a title) where you just need the final result.
- `exec/run` always runs ephemerally, so it does not create a rollout and will not appear in `thread/list`.
- `exec/run` forces `approvalPolicy: "never"` and a read-only sandbox.
- Use `turn/start` when you want streaming events, intermediate items, or a long-lived thread.
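Because approvals and writes are disabled, a turn that raises an approval or user-input request fails fast instead of hanging. A hedged sketch of the failure-shaped response in that case, assuming the error object follows the same camelCase convention (IDs are placeholders):
```json
{ "id": 32, "result": {
  "threadId": "thread-789",
  "turnId": "turn-012",
  "status": "failed",
  "lastAgentMessage": null,
  "error": {
    "message": "exec/run encountered a blocking approval or input request",
    "codexErrorInfo": null,
    "additionalDetails": null
  }
} }
```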
### Example: One-off command execution
Run a standalone command (argv vector) in the server's sandbox without creating a thread or turn:

View File

@@ -33,6 +33,8 @@ use codex_app_server_protocol::ConversationGitInfo;
use codex_app_server_protocol::ConversationSummary;
use codex_app_server_protocol::DynamicToolSpec as ApiDynamicToolSpec;
use codex_app_server_protocol::ExecOneOffCommandResponse;
use codex_app_server_protocol::ExecRunParams;
use codex_app_server_protocol::ExecRunResponse;
use codex_app_server_protocol::FeedbackUploadParams;
use codex_app_server_protocol::FeedbackUploadResponse;
use codex_app_server_protocol::ForkConversationParams;
@@ -209,6 +211,7 @@ type PendingInterruptQueue = Vec<(RequestId, ApiVersion)>;
pub(crate) type PendingInterrupts = Arc<Mutex<HashMap<ThreadId, PendingInterruptQueue>>>;
pub(crate) type PendingRollbacks = Arc<Mutex<HashMap<ThreadId, RequestId>>>;
pub(crate) type ThreadIdsToSkipListenerAttachment = Arc<Mutex<HashSet<ThreadId>>>;
/// Per-conversation accumulation of the latest states (e.g. error message) while a turn runs.
#[derive(Default, Clone)]
@@ -256,6 +259,9 @@ pub(crate) struct CodexMessageProcessor {
// Queue of pending rollback requests per conversation. We reply when ThreadRollback arrives.
pending_rollbacks: PendingRollbacks,
turn_summary_store: TurnSummaryStore,
// `exec/run` consumes events directly; background listeners would drain the
// stream and prevent it from detecting completion.
thread_ids_to_skip_listener_attachment: ThreadIdsToSkipListenerAttachment,
pending_fuzzy_searches: Arc<Mutex<HashMap<String, Arc<AtomicBool>>>>,
feedback: CodexFeedback,
}
@@ -312,6 +318,7 @@ impl CodexMessageProcessor {
pending_interrupts: Arc::new(Mutex::new(HashMap::new())),
pending_rollbacks: Arc::new(Mutex::new(HashMap::new())),
turn_summary_store: Arc::new(Mutex::new(HashMap::new())),
thread_ids_to_skip_listener_attachment: Arc::new(Mutex::new(HashSet::new())),
pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())),
feedback,
}
@@ -566,6 +573,9 @@ impl CodexMessageProcessor {
ClientRequest::ExecOneOffCommand { request_id, params } => {
self.exec_one_off_command(request_id, params.into()).await;
}
ClientRequest::ExecRun { request_id, params } => {
self.exec_run(request_id, params).await;
}
ClientRequest::ConfigRead { .. }
| ClientRequest::ConfigValueWrite { .. }
| ClientRequest::ConfigBatchWrite { .. } => {
@@ -1311,6 +1321,245 @@ impl CodexMessageProcessor {
});
}
fn turn_status_from_agent_status(agent_status: &AgentStatus) -> TurnStatus {
match agent_status {
AgentStatus::Completed(_) => TurnStatus::Completed,
AgentStatus::Errored(_) | AgentStatus::Shutdown | AgentStatus::NotFound => {
TurnStatus::Failed
}
AgentStatus::PendingInit | AgentStatus::Running => TurnStatus::InProgress,
}
}
async fn run_exec_turn_to_completion(
thread: Arc<CodexThread>,
thread_id: ThreadId,
turn_id: String,
) -> Result<ExecRunResponse, JSONRPCErrorError> {
let mut last_agent_message = None;
let mut error = None;
let agent_status = loop {
let event = thread.next_event().await.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to read exec/run events: {err}"),
data: None,
})?;
if event.id != turn_id {
continue;
}
match event.msg {
EventMsg::TurnStarted(_) => continue,
EventMsg::TurnComplete(ev) => {
last_agent_message = ev.last_agent_message.clone();
break AgentStatus::Completed(ev.last_agent_message);
}
EventMsg::TurnAborted(ev) => {
let message = format!("{:?}", ev.reason);
error = Some(TurnError {
message: message.clone(),
codex_error_info: None,
additional_details: None,
});
break AgentStatus::Errored(message);
}
EventMsg::Error(ev) => {
let message = ev.message;
error = Some(TurnError {
message: message.clone(),
codex_error_info: ev.codex_error_info.map(Into::into),
additional_details: None,
});
break AgentStatus::Errored(message);
}
EventMsg::ExecApprovalRequest(_)
| EventMsg::ApplyPatchApprovalRequest(_)
| EventMsg::RequestUserInput(_)
| EventMsg::ElicitationRequest(_) => {
// `exec/run` has no interactive channel; fail fast instead of hanging.
let message =
"exec/run encountered a blocking approval or input request".to_string();
error = Some(TurnError {
message: message.clone(),
codex_error_info: None,
additional_details: None,
});
break AgentStatus::Errored(message);
}
_ => {}
}
};
Ok(ExecRunResponse {
thread_id: thread_id.to_string(),
turn_id,
status: Self::turn_status_from_agent_status(&agent_status),
last_agent_message,
error,
})
}
async fn exec_run(&self, request_id: RequestId, params: ExecRunParams) {
if params.input.is_empty() {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: "input must not be empty".to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
if let Err(err) = self
.config
.approval_policy
.can_set(&codex_protocol::protocol::AskForApproval::Never)
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid approval policy: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
if let Err(err) = self
.config
.sandbox_policy
.can_set(&codex_protocol::protocol::SandboxPolicy::ReadOnly)
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid sandbox policy: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
let mut typesafe_overrides = self.build_thread_config_overrides(
params.model,
params.model_provider,
params.cwd,
Some(codex_app_server_protocol::AskForApproval::Never),
Some(codex_app_server_protocol::SandboxMode::ReadOnly),
params.base_instructions,
params.developer_instructions,
params.personality,
);
typesafe_overrides.ephemeral = Some(true);
let config =
match derive_config_from_params(&self.cli_overrides, params.config, typesafe_overrides)
.await
{
Ok(config) => config,
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("error deriving config: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let new_thread = match self.thread_manager.start_thread(config).await {
Ok(new_thread) => new_thread,
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error creating thread: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
};
let NewThread {
thread_id, thread, ..
} = new_thread;
let thread_ids_to_skip_listener_attachment =
self.thread_ids_to_skip_listener_attachment.clone();
let thread_id_for_turn = thread_id;
let thread_id_for_remove = thread_id;
thread_ids_to_skip_listener_attachment
.lock()
.await
.insert(thread_id);
let response_result: Result<ExecRunResponse, JSONRPCErrorError> = async {
let has_turn_overrides = params.effort.is_some()
|| params.summary.is_some()
|| params.collaboration_mode.is_some();
if has_turn_overrides {
let _ = thread
.submit(Op::OverrideTurnContext {
cwd: None,
approval_policy: None,
sandbox_policy: None,
model: None,
effort: params.effort.map(Some),
summary: params.summary,
collaboration_mode: params.collaboration_mode.clone(),
personality: None,
})
.await;
}
let mapped_items = params
.input
.into_iter()
.map(V2UserInput::into_core)
.collect();
let turn_id = thread
.submit(Op::UserInput {
items: mapped_items,
final_output_json_schema: params.output_schema,
})
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to start exec/run turn: {err}"),
data: None,
})?;
Self::run_exec_turn_to_completion(thread.clone(), thread_id_for_turn, turn_id).await
}
.await;
if let Some(thread_for_shutdown) = self.thread_manager.remove_thread(&thread_id).await {
// `exec/run` threads are one-shot; shut them down to avoid leaking resources.
let _ = thread_for_shutdown.submit(Op::Shutdown).await;
}
let thread_ids_to_skip_listener_attachment_for_cleanup =
thread_ids_to_skip_listener_attachment.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
thread_ids_to_skip_listener_attachment_for_cleanup
.lock()
.await
.remove(&thread_id_for_remove);
});
match response_result {
Ok(response) => {
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
self.outgoing.send_error(request_id, err).await;
}
}
}
async fn process_new_conversation(
&mut self,
request_id: RequestId,
@@ -1879,6 +2128,15 @@ impl CodexMessageProcessor {
/// Best-effort: attach a listener for thread_id if missing.
pub(crate) async fn try_attach_thread_listener(&mut self, thread_id: ThreadId) {
if self
.thread_ids_to_skip_listener_attachment
.lock()
.await
.contains(&thread_id)
{
return;
}
if self
.listener_thread_ids_by_subscription
.values()

View File

@@ -22,6 +22,7 @@ use codex_app_server_protocol::CollaborationModeListParams;
use codex_app_server_protocol::ConfigBatchWriteParams;
use codex_app_server_protocol::ConfigReadParams;
use codex_app_server_protocol::ConfigValueWriteParams;
use codex_app_server_protocol::ExecRunParams;
use codex_app_server_protocol::FeedbackUploadParams;
use codex_app_server_protocol::ForkConversationParams;
use codex_app_server_protocol::GetAccountParams;
@@ -466,6 +467,12 @@ impl McpProcess {
self.send_request("turn/start", params).await
}
/// Send an `exec/run` JSON-RPC request (v2).
pub async fn send_exec_run_request(&mut self, params: ExecRunParams) -> anyhow::Result<i64> {
let params = Some(serde_json::to_value(params)?);
self.send_request("exec/run", params).await
}
/// Send a `turn/interrupt` JSON-RPC request (v2).
pub async fn send_turn_interrupt_request(
&mut self,

View File

@@ -0,0 +1,133 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::to_response;
use codex_app_server_protocol::ExecRunParams;
use codex_app_server_protocol::ExecRunResponse;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::TurnStatus;
use codex_app_server_protocol::UserInput as V2UserInput;
use core_test_support::skip_if_no_network;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
#[tokio::test]
async fn exec_run_completes_turn_and_returns_final_message() -> Result<()> {
skip_if_no_network!(Ok(()));
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
mcp.initialize().await?;
let request_id = mcp
.send_exec_run_request(ExecRunParams {
input: vec![V2UserInput::Text {
text: "Generate a title".to_string(),
text_elements: Vec::new(),
}],
model: None,
model_provider: None,
cwd: None,
effort: None,
summary: None,
collaboration_mode: None,
personality: None,
config: None,
base_instructions: None,
developer_instructions: None,
output_schema: None,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: ExecRunResponse = to_response(response)?;
assert_eq!(response.status, TurnStatus::Completed);
assert_eq!(response.last_agent_message, Some("Done".to_string()));
assert_eq!(response.error, None);
assert!(!response.thread_id.is_empty(), "thread_id should be set");
assert!(!response.turn_id.is_empty(), "turn_id should be set");
Ok(())
}
#[tokio::test]
async fn exec_run_rejects_empty_input() -> Result<()> {
skip_if_no_network!(Ok(()));
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path(), &server.uri())?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
mcp.initialize().await?;
let request_id = mcp
.send_exec_run_request(ExecRunParams {
input: Vec::new(),
model: None,
model_provider: None,
cwd: None,
effort: None,
summary: None,
collaboration_mode: None,
personality: None,
config: None,
base_instructions: None,
developer_instructions: None,
output_schema: None,
})
.await?;
let error = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;
assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
assert_eq!(error.error.message, "input must not be empty".to_string());
Ok(())
}
fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
config_toml,
format!(
r#"
model = "mock-model"
approval_policy = "never"
sandbox_mode = "read-only"
model_provider = "mock_provider"
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
wire_api = "responses"
request_max_retries = 0
stream_max_retries = 0
"#
),
)
}

View File

@@ -4,6 +4,7 @@ mod app_list;
mod collaboration_mode_list;
mod config_rpc;
mod dynamic_tools;
mod exec_run;
mod initialize;
mod model_list;
mod output_schema;