mirror of
https://github.com/openai/codex.git
synced 2026-04-17 19:24:47 +00:00
Compare commits
1 Commits
codex-debu
...
starr/asyn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b7749c93b0 |
@@ -2549,6 +2549,28 @@
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadAsyncTaskStartParams": {
|
||||
"properties": {
|
||||
"input": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/UserInput"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"taskId": {
|
||||
"type": "string"
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"input",
|
||||
"taskId",
|
||||
"threadId"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadCompactStartParams": {
|
||||
"properties": {
|
||||
"threadId": {
|
||||
@@ -3743,6 +3765,30 @@
|
||||
"title": "Thread/shellCommandRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"thread/asyncTask/start"
|
||||
],
|
||||
"title": "Thread/asyncTask/startRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/ThreadAsyncTaskStartParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Thread/asyncTask/startRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
|
||||
@@ -482,6 +482,30 @@
|
||||
"title": "Thread/shellCommandRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/v2/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"thread/asyncTask/start"
|
||||
],
|
||||
"title": "Thread/asyncTask/startRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/v2/ThreadAsyncTaskStartParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Thread/asyncTask/startRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
@@ -12448,6 +12472,35 @@
|
||||
"title": "ThreadArchivedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadAsyncTaskStartParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"input": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/v2/UserInput"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"taskId": {
|
||||
"type": "string"
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"input",
|
||||
"taskId",
|
||||
"threadId"
|
||||
],
|
||||
"title": "ThreadAsyncTaskStartParams",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadAsyncTaskStartResponse": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "ThreadAsyncTaskStartResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadClosedNotification": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
|
||||
@@ -1057,6 +1057,30 @@
|
||||
"title": "Thread/shellCommandRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
"$ref": "#/definitions/RequestId"
|
||||
},
|
||||
"method": {
|
||||
"enum": [
|
||||
"thread/asyncTask/start"
|
||||
],
|
||||
"title": "Thread/asyncTask/startRequestMethod",
|
||||
"type": "string"
|
||||
},
|
||||
"params": {
|
||||
"$ref": "#/definitions/ThreadAsyncTaskStartParams"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"id",
|
||||
"method",
|
||||
"params"
|
||||
],
|
||||
"title": "Thread/asyncTask/startRequest",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"id": {
|
||||
@@ -10303,6 +10327,35 @@
|
||||
"title": "ThreadArchivedNotification",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadAsyncTaskStartParams": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
"input": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/UserInput"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"taskId": {
|
||||
"type": "string"
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"input",
|
||||
"taskId",
|
||||
"threadId"
|
||||
],
|
||||
"title": "ThreadAsyncTaskStartParams",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadAsyncTaskStartResponse": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "ThreadAsyncTaskStartResponse",
|
||||
"type": "object"
|
||||
},
|
||||
"ThreadClosedNotification": {
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"properties": {
|
||||
|
||||
@@ -0,0 +1,188 @@
|
||||
{
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"definitions": {
|
||||
"ByteRange": {
|
||||
"properties": {
|
||||
"end": {
|
||||
"format": "uint",
|
||||
"minimum": 0.0,
|
||||
"type": "integer"
|
||||
},
|
||||
"start": {
|
||||
"format": "uint",
|
||||
"minimum": 0.0,
|
||||
"type": "integer"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"end",
|
||||
"start"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"TextElement": {
|
||||
"properties": {
|
||||
"byteRange": {
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/ByteRange"
|
||||
}
|
||||
],
|
||||
"description": "Byte range in the parent `text` buffer that this element occupies."
|
||||
},
|
||||
"placeholder": {
|
||||
"description": "Optional human-readable placeholder for the element, displayed in the UI.",
|
||||
"type": [
|
||||
"string",
|
||||
"null"
|
||||
]
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"byteRange"
|
||||
],
|
||||
"type": "object"
|
||||
},
|
||||
"UserInput": {
|
||||
"oneOf": [
|
||||
{
|
||||
"properties": {
|
||||
"text": {
|
||||
"type": "string"
|
||||
},
|
||||
"text_elements": {
|
||||
"default": [],
|
||||
"description": "UI-defined spans within `text` used to render or persist special elements.",
|
||||
"items": {
|
||||
"$ref": "#/definitions/TextElement"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"text"
|
||||
],
|
||||
"title": "TextUserInputType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"text",
|
||||
"type"
|
||||
],
|
||||
"title": "TextUserInput",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"type": {
|
||||
"enum": [
|
||||
"image"
|
||||
],
|
||||
"title": "ImageUserInputType",
|
||||
"type": "string"
|
||||
},
|
||||
"url": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"type",
|
||||
"url"
|
||||
],
|
||||
"title": "ImageUserInput",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"path": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"localImage"
|
||||
],
|
||||
"title": "LocalImageUserInputType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"path",
|
||||
"type"
|
||||
],
|
||||
"title": "LocalImageUserInput",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"path": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"skill"
|
||||
],
|
||||
"title": "SkillUserInputType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"name",
|
||||
"path",
|
||||
"type"
|
||||
],
|
||||
"title": "SkillUserInput",
|
||||
"type": "object"
|
||||
},
|
||||
{
|
||||
"properties": {
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"path": {
|
||||
"type": "string"
|
||||
},
|
||||
"type": {
|
||||
"enum": [
|
||||
"mention"
|
||||
],
|
||||
"title": "MentionUserInputType",
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"name",
|
||||
"path",
|
||||
"type"
|
||||
],
|
||||
"title": "MentionUserInput",
|
||||
"type": "object"
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"properties": {
|
||||
"input": {
|
||||
"items": {
|
||||
"$ref": "#/definitions/UserInput"
|
||||
},
|
||||
"type": "array"
|
||||
},
|
||||
"taskId": {
|
||||
"type": "string"
|
||||
},
|
||||
"threadId": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"required": [
|
||||
"input",
|
||||
"taskId",
|
||||
"threadId"
|
||||
],
|
||||
"title": "ThreadAsyncTaskStartParams",
|
||||
"type": "object"
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
{
|
||||
"$schema": "http://json-schema.org/draft-07/schema#",
|
||||
"title": "ThreadAsyncTaskStartResponse",
|
||||
"type": "object"
|
||||
}
|
||||
File diff suppressed because one or more lines are too long
@@ -0,0 +1,6 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
import type { UserInput } from "./UserInput";
|
||||
|
||||
export type ThreadAsyncTaskStartParams = { threadId: string, taskId: string, input: Array<UserInput>, };
|
||||
@@ -0,0 +1,5 @@
|
||||
// GENERATED CODE! DO NOT MODIFY BY HAND!
|
||||
|
||||
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
|
||||
|
||||
export type ThreadAsyncTaskStartResponse = Record<string, never>;
|
||||
@@ -273,6 +273,8 @@ export type { ThreadActiveFlag } from "./ThreadActiveFlag";
|
||||
export type { ThreadArchiveParams } from "./ThreadArchiveParams";
|
||||
export type { ThreadArchiveResponse } from "./ThreadArchiveResponse";
|
||||
export type { ThreadArchivedNotification } from "./ThreadArchivedNotification";
|
||||
export type { ThreadAsyncTaskStartParams } from "./ThreadAsyncTaskStartParams";
|
||||
export type { ThreadAsyncTaskStartResponse } from "./ThreadAsyncTaskStartResponse";
|
||||
export type { ThreadClosedNotification } from "./ThreadClosedNotification";
|
||||
export type { ThreadCompactStartParams } from "./ThreadCompactStartParams";
|
||||
export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse";
|
||||
|
||||
@@ -296,6 +296,10 @@ client_request_definitions! {
|
||||
params: v2::ThreadShellCommandParams,
|
||||
response: v2::ThreadShellCommandResponse,
|
||||
},
|
||||
ThreadAsyncTaskStart => "thread/asyncTask/start" {
|
||||
params: v2::ThreadAsyncTaskStartParams,
|
||||
response: v2::ThreadAsyncTaskStartResponse,
|
||||
},
|
||||
#[experimental("thread/backgroundTerminals/clean")]
|
||||
ThreadBackgroundTerminalsClean => "thread/backgroundTerminals/clean" {
|
||||
params: v2::ThreadBackgroundTerminalsCleanParams,
|
||||
|
||||
@@ -3014,6 +3014,20 @@ pub struct ThreadShellCommandParams {
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadShellCommandResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadAsyncTaskStartParams {
|
||||
pub thread_id: String,
|
||||
pub task_id: String,
|
||||
pub input: Vec<UserInput>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadAsyncTaskStartResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
@@ -6684,6 +6698,49 @@ mod tests {
|
||||
assert_eq!(decoded, response);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_async_task_start_params_round_trip() {
|
||||
let params = ThreadAsyncTaskStartParams {
|
||||
thread_id: "thr_123".to_string(),
|
||||
task_id: "async-1".to_string(),
|
||||
input: vec![UserInput::Text {
|
||||
text: "answer this on the side".to_string(),
|
||||
text_elements: Vec::new(),
|
||||
}],
|
||||
};
|
||||
|
||||
let value = serde_json::to_value(¶ms).expect("serialize thread/asyncTask/start params");
|
||||
assert_eq!(
|
||||
value,
|
||||
json!({
|
||||
"threadId": "thr_123",
|
||||
"taskId": "async-1",
|
||||
"input": [{
|
||||
"type": "text",
|
||||
"text": "answer this on the side",
|
||||
"text_elements": [],
|
||||
}],
|
||||
})
|
||||
);
|
||||
|
||||
let decoded = serde_json::from_value::<ThreadAsyncTaskStartParams>(value)
|
||||
.expect("deserialize thread/asyncTask/start params");
|
||||
assert_eq!(decoded, params);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_async_task_start_response_round_trip() {
|
||||
let response = ThreadAsyncTaskStartResponse {};
|
||||
|
||||
let value =
|
||||
serde_json::to_value(&response).expect("serialize thread/asyncTask/start response");
|
||||
assert_eq!(value, json!({}));
|
||||
|
||||
let decoded = serde_json::from_value::<ThreadAsyncTaskStartResponse>(value)
|
||||
.expect("deserialize thread/asyncTask/start response");
|
||||
assert_eq!(decoded, response);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fs_changed_notification_round_trips() {
|
||||
let notification = FsChangedNotification {
|
||||
|
||||
@@ -118,6 +118,8 @@ use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadArchiveParams;
|
||||
use codex_app_server_protocol::ThreadArchiveResponse;
|
||||
use codex_app_server_protocol::ThreadArchivedNotification;
|
||||
use codex_app_server_protocol::ThreadAsyncTaskStartParams;
|
||||
use codex_app_server_protocol::ThreadAsyncTaskStartResponse;
|
||||
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanParams;
|
||||
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse;
|
||||
use codex_app_server_protocol::ThreadClosedNotification;
|
||||
@@ -776,6 +778,10 @@ impl CodexMessageProcessor {
|
||||
self.thread_shell_command(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ThreadAsyncTaskStart { request_id, params } => {
|
||||
self.thread_async_task_start(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::SkillsList { request_id, params } => {
|
||||
self.skills_list(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
@@ -3375,6 +3381,57 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_async_task_start(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
params: ThreadAsyncTaskStartParams,
|
||||
) {
|
||||
if let Err(error) = Self::validate_v2_input_limit(¶ms.input) {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
let ThreadAsyncTaskStartParams {
|
||||
thread_id,
|
||||
task_id,
|
||||
input,
|
||||
} = params;
|
||||
|
||||
if task_id.trim().is_empty() {
|
||||
self.send_invalid_request_error(request_id, "taskId must not be empty".to_string())
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
let (_, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let items = input.into_iter().map(V2UserInput::into_core).collect();
|
||||
match self
|
||||
.submit_core_op(
|
||||
&request_id,
|
||||
thread.as_ref(),
|
||||
Op::StartAsyncTask { task_id, items },
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadAsyncTaskStartResponse {})
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_internal_error(request_id, format!("failed to start async task: {err}"))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_list(&self, request_id: ConnectionRequestId, params: ThreadListParams) {
|
||||
let ThreadListParams {
|
||||
cursor,
|
||||
|
||||
@@ -4594,6 +4594,10 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
|
||||
handlers::inter_agent_communication(&sess, sub.id.clone(), communication).await;
|
||||
false
|
||||
}
|
||||
Op::StartAsyncTask { task_id, items } => {
|
||||
handlers::start_async_task(&sess, sub.id.clone(), task_id, items).await;
|
||||
false
|
||||
}
|
||||
Op::ExecApproval {
|
||||
id: approval_id,
|
||||
turn_id,
|
||||
@@ -4747,9 +4751,12 @@ fn submission_dispatch_span(sub: &Submission) -> tracing::Span {
|
||||
|
||||
/// Operation handlers
|
||||
mod handlers {
|
||||
use crate::client_common::Prompt;
|
||||
use crate::client_common::ResponseEvent;
|
||||
use crate::codex::Session;
|
||||
use crate::codex::SessionSettingsUpdate;
|
||||
use crate::codex::SteerInputError;
|
||||
use crate::codex::TurnContext;
|
||||
|
||||
use crate::SkillError;
|
||||
use crate::codex::spawn_review_thread;
|
||||
@@ -4763,6 +4770,8 @@ mod handlers {
|
||||
use crate::review_prompts::resolve_review_request;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::session_index;
|
||||
use crate::stream_events_utils::last_assistant_message_from_item;
|
||||
use crate::stream_events_utils::raw_assistant_output_text_from_item;
|
||||
use crate::tasks::CompactTask;
|
||||
use crate::tasks::UndoTask;
|
||||
use crate::tasks::UserShellCommandMode;
|
||||
@@ -4770,6 +4779,15 @@ mod handlers {
|
||||
use crate::tasks::execute_user_shell_command;
|
||||
use codex_mcp::collect_mcp_snapshot_from_manager;
|
||||
use codex_mcp::compute_auth_statuses;
|
||||
use codex_protocol::error::CodexErr;
|
||||
use codex_protocol::error::Result as CodexResult;
|
||||
use codex_protocol::items::AgentMessageContent;
|
||||
use codex_protocol::items::AgentMessageItem;
|
||||
use codex_protocol::items::TurnItem;
|
||||
use codex_protocol::models::MessagePhase;
|
||||
use codex_protocol::models::ResponseInputItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::AgentMessageContentDeltaEvent;
|
||||
use codex_protocol::protocol::CodexErrorInfo;
|
||||
use codex_protocol::protocol::ErrorEvent;
|
||||
use codex_protocol::protocol::Event;
|
||||
@@ -4788,6 +4806,9 @@ mod handlers {
|
||||
use codex_protocol::protocol::WarningEvent;
|
||||
use codex_protocol::request_permissions::RequestPermissionsResponse;
|
||||
use codex_protocol::request_user_input::RequestUserInputResponse;
|
||||
use futures::StreamExt;
|
||||
use tracing::Instrument;
|
||||
use tracing::trace_span;
|
||||
|
||||
use crate::context_manager::is_user_turn_boundary;
|
||||
use codex_protocol::config_types::CollaborationMode;
|
||||
@@ -4801,6 +4822,7 @@ mod handlers {
|
||||
use serde_json::Value;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
@@ -4933,6 +4955,327 @@ mod handlers {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_async_task(
|
||||
sess: &Arc<Session>,
|
||||
_sub_id: String,
|
||||
task_id: String,
|
||||
items: Vec<UserInput>,
|
||||
) {
|
||||
let turn_context = sess.new_default_turn_with_sub_id(task_id).await;
|
||||
let sess = Arc::clone(sess);
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) =
|
||||
run_side_completion(Arc::clone(&sess), Arc::clone(&turn_context), items).await
|
||||
{
|
||||
sess.send_event(
|
||||
&turn_context,
|
||||
EventMsg::Error(ErrorEvent {
|
||||
message: format!("/async failed: {err}"),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn run_side_completion(
|
||||
sess: Arc<Session>,
|
||||
turn_context: Arc<TurnContext>,
|
||||
items: Vec<UserInput>,
|
||||
) -> CodexResult<()> {
|
||||
if let Some(delay) = async_side_completion_leading_delay(&items) {
|
||||
tokio::time::sleep(delay).await;
|
||||
}
|
||||
|
||||
let side_request: ResponseItem = ResponseInputItem::from(items).into();
|
||||
let mut input = sess
|
||||
.clone_history()
|
||||
.await
|
||||
.for_prompt(&turn_context.model_info.input_modalities);
|
||||
input.push(side_request);
|
||||
|
||||
let mut base_instructions = sess.get_base_instructions().await;
|
||||
base_instructions
|
||||
.text
|
||||
.push_str("\n\n<codex_async_side_completion>\nYou are answering a /async request. The parent Codex turn may still be running. Answer only this /async request, using the parent thread history as read-only context. Do not claim to run tools; no tools are available in this side completion. If the request began with a sleep/wait duration, the runtime has already honored that delay; do not sleep again. Keep the answer focused and complete.\n</codex_async_side_completion>");
|
||||
|
||||
let prompt = Prompt {
|
||||
input,
|
||||
tools: Vec::new(),
|
||||
parallel_tool_calls: false,
|
||||
base_instructions,
|
||||
personality: turn_context.personality,
|
||||
output_schema: None,
|
||||
};
|
||||
let mut client_session = sess.services.model_client.new_session();
|
||||
let mut stream = client_session
|
||||
.stream(
|
||||
&prompt,
|
||||
&turn_context.model_info,
|
||||
&turn_context.session_telemetry,
|
||||
turn_context.reasoning_effort,
|
||||
turn_context.reasoning_summary,
|
||||
turn_context.config.service_tier,
|
||||
/*turn_metadata_header*/ None,
|
||||
)
|
||||
.instrument(trace_span!("async_side_completion_stream"))
|
||||
.await?;
|
||||
|
||||
let mut active_item_id: Option<String> = None;
|
||||
let mut message = String::new();
|
||||
|
||||
while let Some(event) = stream.next().await {
|
||||
match event? {
|
||||
ResponseEvent::Created
|
||||
| ResponseEvent::ServerModel(_)
|
||||
| ResponseEvent::ServerReasoningIncluded(_)
|
||||
| ResponseEvent::ModelsEtag(_)
|
||||
| ResponseEvent::RateLimits(_)
|
||||
| ResponseEvent::ReasoningSummaryDelta { .. }
|
||||
| ResponseEvent::ReasoningContentDelta { .. }
|
||||
| ResponseEvent::ReasoningSummaryPartAdded { .. } => {}
|
||||
ResponseEvent::OutputItemAdded(item) => {
|
||||
if active_item_id.is_none()
|
||||
&& let Some(text) = raw_assistant_output_text_from_item(&item)
|
||||
{
|
||||
let item_id = side_completion_item_id(&turn_context, &item);
|
||||
start_side_completion_item(sess.as_ref(), turn_context.as_ref(), &item_id)
|
||||
.await;
|
||||
message.push_str(&text);
|
||||
active_item_id = Some(item_id);
|
||||
}
|
||||
}
|
||||
ResponseEvent::OutputTextDelta(delta) => {
|
||||
if active_item_id.is_none() {
|
||||
let item_id = format!("{}-message", turn_context.sub_id);
|
||||
start_side_completion_item(sess.as_ref(), turn_context.as_ref(), &item_id)
|
||||
.await;
|
||||
active_item_id = Some(item_id);
|
||||
}
|
||||
if let Some(item_id) = active_item_id.as_ref() {
|
||||
sess.send_event(
|
||||
&turn_context,
|
||||
EventMsg::AgentMessageContentDelta(AgentMessageContentDeltaEvent {
|
||||
thread_id: sess.conversation_id.to_string(),
|
||||
turn_id: turn_context.sub_id.clone(),
|
||||
item_id: item_id.clone(),
|
||||
delta: delta.clone(),
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
message.push_str(&delta);
|
||||
}
|
||||
ResponseEvent::OutputItemDone(item) => {
|
||||
if let Some(final_message) = final_assistant_message_from_item(
|
||||
&item,
|
||||
turn_context.collaboration_mode.mode == ModeKind::Plan,
|
||||
) {
|
||||
active_item_id
|
||||
.get_or_insert_with(|| side_completion_item_id(&turn_context, &item));
|
||||
message = final_message;
|
||||
}
|
||||
}
|
||||
ResponseEvent::Completed { token_usage, .. } => {
|
||||
sess.update_token_usage_info(&turn_context, token_usage.as_ref())
|
||||
.await;
|
||||
let item_id = active_item_id
|
||||
.unwrap_or_else(|| format!("{}-message", turn_context.sub_id));
|
||||
complete_side_completion_item(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
item_id,
|
||||
message,
|
||||
)
|
||||
.await;
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(CodexErr::Stream(
|
||||
"stream closed before response.completed".into(),
|
||||
None,
|
||||
))
|
||||
}
|
||||
|
||||
fn async_side_completion_leading_delay(items: &[UserInput]) -> Option<Duration> {
|
||||
let [UserInput::Text { text, .. }] = items else {
|
||||
return None;
|
||||
};
|
||||
parse_leading_sleep_then_delay(text)
|
||||
}
|
||||
|
||||
fn parse_leading_sleep_then_delay(text: &str) -> Option<Duration> {
|
||||
let normalized = text.trim_start().to_ascii_lowercase();
|
||||
let mut rest = strip_keyword(&normalized, "sleep")
|
||||
.or_else(|| strip_keyword(&normalized, "wait"))?
|
||||
.trim_start();
|
||||
rest = strip_keyword(rest, "for").unwrap_or(rest).trim_start();
|
||||
|
||||
let (amount, after_amount) = parse_leading_non_negative_number(rest)?;
|
||||
let after_amount = after_amount.trim_start();
|
||||
let (unit_scale, after_unit) = parse_optional_duration_unit(after_amount)?;
|
||||
let mut tail = after_unit.trim_start();
|
||||
tail = strip_keyword(tail, "and").unwrap_or(tail).trim_start();
|
||||
tail = tail.strip_prefix(',').unwrap_or(tail).trim_start();
|
||||
strip_keyword(tail, "then")?;
|
||||
|
||||
Duration::try_from_secs_f64(amount * unit_scale).ok()
|
||||
}
|
||||
|
||||
fn parse_leading_non_negative_number(rest: &str) -> Option<(f64, &str)> {
|
||||
let mut end = 0usize;
|
||||
let mut saw_digit = false;
|
||||
let mut saw_dot = false;
|
||||
for (idx, ch) in rest.char_indices() {
|
||||
if ch.is_ascii_digit() {
|
||||
saw_digit = true;
|
||||
end = idx + ch.len_utf8();
|
||||
} else if ch == '.' && !saw_dot {
|
||||
saw_dot = true;
|
||||
end = idx + ch.len_utf8();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !saw_digit {
|
||||
return None;
|
||||
}
|
||||
let amount = rest[..end].parse::<f64>().ok()?;
|
||||
if !amount.is_finite() || amount < 0.0 {
|
||||
return None;
|
||||
}
|
||||
Some((amount, &rest[end..]))
|
||||
}
|
||||
|
||||
fn parse_optional_duration_unit(rest: &str) -> Option<(f64, &str)> {
|
||||
let mut end = 0usize;
|
||||
for (idx, ch) in rest.char_indices() {
|
||||
if ch.is_ascii_alphabetic() {
|
||||
end = idx + ch.len_utf8();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if end == 0 {
|
||||
return Some((1.0, rest));
|
||||
}
|
||||
|
||||
let (token, tail) = rest.split_at(end);
|
||||
let scale = match token {
|
||||
"ms" | "msec" | "msecs" | "millisecond" | "milliseconds" => 0.001,
|
||||
"s" | "sec" | "secs" | "second" | "seconds" => 1.0,
|
||||
"m" | "min" | "mins" | "minute" | "minutes" => 60.0,
|
||||
"h" | "hr" | "hrs" | "hour" | "hours" => 60.0 * 60.0,
|
||||
"then" | "and" => return Some((1.0, rest)),
|
||||
_ => return None,
|
||||
};
|
||||
Some((scale, tail))
|
||||
}
|
||||
|
||||
fn strip_keyword<'a>(input: &'a str, keyword: &str) -> Option<&'a str> {
|
||||
let rest = input.strip_prefix(keyword)?;
|
||||
if rest.is_empty() || rest.starts_with(|ch: char| !ch.is_ascii_alphanumeric()) {
|
||||
Some(rest)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod async_side_completion_delay_tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn parses_leading_sleep_then_request_delay() {
|
||||
assert_eq!(
|
||||
parse_leading_sleep_then_delay("sleep 10s then say done"),
|
||||
Some(Duration::from_secs(10))
|
||||
);
|
||||
assert_eq!(
|
||||
parse_leading_sleep_then_delay("wait for 1.5 minutes, then summarize"),
|
||||
Some(Duration::from_secs(90))
|
||||
);
|
||||
assert_eq!(
|
||||
parse_leading_sleep_then_delay("WAIT 250ms and then answer"),
|
||||
Some(Duration::from_millis(250))
|
||||
);
|
||||
assert_eq!(
|
||||
parse_leading_sleep_then_delay("sleep 2 then answer"),
|
||||
Some(Duration::from_secs(2))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ignores_non_leading_or_unfinished_sleep_requests() {
|
||||
assert_eq!(
|
||||
parse_leading_sleep_then_delay("please sleep 10s then answer"),
|
||||
None
|
||||
);
|
||||
assert_eq!(parse_leading_sleep_then_delay("sleep 10s"), None);
|
||||
assert_eq!(
|
||||
parse_leading_sleep_then_delay("sleep 10 bananas then answer"),
|
||||
None
|
||||
);
|
||||
assert_eq!(
|
||||
parse_leading_sleep_then_delay("sleeper 10s then answer"),
|
||||
None
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn side_completion_item_id(turn_context: &TurnContext, item: &ResponseItem) -> String {
|
||||
if let ResponseItem::Message { id: Some(id), .. } = item {
|
||||
return id.clone();
|
||||
}
|
||||
format!("{}-message", turn_context.sub_id)
|
||||
}
|
||||
|
||||
fn final_assistant_message_from_item(item: &ResponseItem, plan_mode: bool) -> Option<String> {
|
||||
if let ResponseItem::Message { role, phase, .. } = item
|
||||
&& role == "assistant"
|
||||
&& matches!(phase, Some(MessagePhase::Commentary))
|
||||
{
|
||||
return None;
|
||||
}
|
||||
last_assistant_message_from_item(item, plan_mode)
|
||||
}
|
||||
|
||||
async fn start_side_completion_item(sess: &Session, turn_context: &TurnContext, item_id: &str) {
|
||||
sess.emit_turn_item_started(
|
||||
turn_context,
|
||||
&TurnItem::AgentMessage(AgentMessageItem {
|
||||
id: item_id.to_string(),
|
||||
content: vec![AgentMessageContent::Text {
|
||||
text: String::new(),
|
||||
}],
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn complete_side_completion_item(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
item_id: String,
|
||||
text: String,
|
||||
) {
|
||||
sess.emit_turn_item_completed(
|
||||
turn_context,
|
||||
TurnItem::AgentMessage(AgentMessageItem {
|
||||
id: item_id,
|
||||
content: vec![AgentMessageContent::Text { text }],
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
pub async fn run_user_shell_command(sess: &Arc<Session>, sub_id: String, command: String) {
|
||||
if let Some((turn_context, cancellation_token)) =
|
||||
sess.active_turn_context_and_cancellation_token().await
|
||||
|
||||
@@ -307,6 +307,17 @@ pub enum Op {
|
||||
communication: InterAgentCommunication,
|
||||
},
|
||||
|
||||
/// Run a tool-free side completion against the active session history.
|
||||
///
|
||||
/// Unlike [`Op::UserTurn`], this op does not start or steer the parent turn and the
|
||||
/// model response is not recorded back into the parent thread.
|
||||
StartAsyncTask {
|
||||
/// Client-generated identifier used as the side completion turn id.
|
||||
task_id: String,
|
||||
/// Input to append to the parent thread context for this one completion.
|
||||
items: Vec<UserInput>,
|
||||
},
|
||||
|
||||
/// Override parts of the persistent turn context for subsequent turns.
|
||||
///
|
||||
/// All fields are optional; when omitted, the existing value is preserved.
|
||||
@@ -581,6 +592,7 @@ impl Op {
|
||||
Self::UserInput { .. } => "user_input",
|
||||
Self::UserTurn { .. } => "user_turn",
|
||||
Self::InterAgentCommunication { .. } => "inter_agent_communication",
|
||||
Self::StartAsyncTask { .. } => "start_async_task",
|
||||
Self::OverrideTurnContext { .. } => "override_turn_context",
|
||||
Self::ExecApproval { .. } => "exec_approval",
|
||||
Self::PatchApproval { .. } => "patch_approval",
|
||||
|
||||
@@ -30,6 +30,7 @@ use crate::exec_command::strip_bash_lc_and_escape;
|
||||
use crate::external_editor;
|
||||
use crate::file_search::FileSearchManager;
|
||||
use crate::history_cell;
|
||||
use crate::history_cell::AgentMessageCell;
|
||||
use crate::history_cell::HistoryCell;
|
||||
#[cfg(not(debug_assertions))]
|
||||
use crate::history_cell::UpdateAvailableHistoryCell;
|
||||
@@ -1005,6 +1006,14 @@ pub(crate) struct App {
|
||||
primary_session_configured: Option<ThreadSessionState>,
|
||||
pending_primary_events: VecDeque<ThreadBufferedEvent>,
|
||||
pending_app_server_requests: PendingAppServerRequests,
|
||||
async_turns: HashMap<String, AsyncTurnState>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct AsyncTurnState {
|
||||
origin_thread_id: ThreadId,
|
||||
prompt: String,
|
||||
last_agent_message: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -2208,6 +2217,135 @@ impl App {
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_async_turn(
|
||||
&mut self,
|
||||
app_server: &mut AppServerSession,
|
||||
task_id: String,
|
||||
prompt: String,
|
||||
op: AppCommand,
|
||||
) -> Result<()> {
|
||||
let Some(origin_thread_id) = self.active_thread_id else {
|
||||
self.chat_widget
|
||||
.add_error_message("No active thread is available.".to_string());
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let AppCommandView::StartAsyncTask {
|
||||
task_id: op_task_id,
|
||||
..
|
||||
} = op.view()
|
||||
else {
|
||||
self.chat_widget.add_error_message(
|
||||
"Internal error: /async produced an unsupported operation.".to_string(),
|
||||
);
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
if op_task_id != task_id {
|
||||
self.chat_widget
|
||||
.add_error_message("Internal error: /async task id mismatch.".to_string());
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.async_turns.insert(
|
||||
task_id.clone(),
|
||||
AsyncTurnState {
|
||||
origin_thread_id,
|
||||
prompt,
|
||||
last_agent_message: None,
|
||||
},
|
||||
);
|
||||
|
||||
if let Err(err) = self.submit_active_thread_op(app_server, op).await {
|
||||
self.async_turns.remove(&task_id);
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn observe_async_turn_notification(
|
||||
&mut self,
|
||||
_thread_id: ThreadId,
|
||||
notification: &ServerNotification,
|
||||
) -> bool {
|
||||
match notification {
|
||||
ServerNotification::ItemStarted(notification) => {
|
||||
self.async_turns.contains_key(¬ification.turn_id)
|
||||
}
|
||||
ServerNotification::ItemCompleted(notification) => {
|
||||
if let Some(mut state) = self.async_turns.remove(¬ification.turn_id) {
|
||||
if let ThreadItem::AgentMessage { text, .. } = ¬ification.item {
|
||||
state.last_agent_message = Some(text.clone());
|
||||
}
|
||||
self.render_async_turn_result(state, TurnStatus::Completed);
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
ServerNotification::AgentMessageDelta(notification) => {
|
||||
if let Some(state) = self.async_turns.get_mut(¬ification.turn_id) {
|
||||
state
|
||||
.last_agent_message
|
||||
.get_or_insert_with(String::new)
|
||||
.push_str(¬ification.delta);
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
ServerNotification::Error(notification) => {
|
||||
if let Some(state) = self.async_turns.remove(¬ification.turn_id) {
|
||||
self.chat_widget.add_error_message(format!(
|
||||
"/async failed: {}\n{}",
|
||||
state.prompt, notification.error.message
|
||||
));
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn render_async_turn_result(&mut self, state: AsyncTurnState, status: TurnStatus) {
|
||||
let thread_suffix = if self.active_thread_id == Some(state.origin_thread_id) {
|
||||
String::new()
|
||||
} else {
|
||||
format!(" for thread {}", state.origin_thread_id)
|
||||
};
|
||||
match status {
|
||||
TurnStatus::Completed => {
|
||||
let Some(message) = state
|
||||
.last_agent_message
|
||||
.filter(|message| !message.is_empty())
|
||||
else {
|
||||
self.chat_widget.add_info_message(
|
||||
format!("/async completed{thread_suffix} without a final message"),
|
||||
/*hint*/ None,
|
||||
);
|
||||
return;
|
||||
};
|
||||
let lines: Vec<Line<'static>> = message
|
||||
.lines()
|
||||
.map(|line| Line::from(line.to_string()))
|
||||
.collect();
|
||||
self.chat_widget
|
||||
.add_to_history(AgentMessageCell::new(lines, /*is_first_line*/ true));
|
||||
}
|
||||
TurnStatus::Interrupted => {
|
||||
self.chat_widget.add_info_message(
|
||||
format!("/async interrupted{thread_suffix}: {}", state.prompt),
|
||||
/*hint*/ None,
|
||||
);
|
||||
}
|
||||
TurnStatus::Failed => {
|
||||
self.chat_widget
|
||||
.add_error_message(format!("/async failed{thread_suffix}: {}", state.prompt));
|
||||
}
|
||||
TurnStatus::InProgress => {}
|
||||
}
|
||||
}
|
||||
|
||||
async fn try_submit_active_thread_op_via_app_server(
|
||||
&mut self,
|
||||
app_server: &mut AppServerSession,
|
||||
@@ -2222,6 +2360,12 @@ impl App {
|
||||
app_server.turn_interrupt(thread_id, turn_id).await?;
|
||||
Ok(true)
|
||||
}
|
||||
AppCommandView::StartAsyncTask { task_id, items } => {
|
||||
app_server
|
||||
.thread_async_task_start(thread_id, task_id.to_string(), items.to_vec())
|
||||
.await?;
|
||||
Ok(true)
|
||||
}
|
||||
AppCommandView::UserTurn {
|
||||
items,
|
||||
cwd,
|
||||
@@ -2473,6 +2617,9 @@ impl App {
|
||||
thread_id: ThreadId,
|
||||
notification: ServerNotification,
|
||||
) -> Result<()> {
|
||||
if self.observe_async_turn_notification(thread_id, ¬ification) {
|
||||
return Ok(());
|
||||
}
|
||||
let inferred_session = self
|
||||
.infer_session_for_thread_notification(thread_id, ¬ification)
|
||||
.await;
|
||||
@@ -3263,6 +3410,7 @@ impl App {
|
||||
self.primary_session_configured = None;
|
||||
self.pending_primary_events.clear();
|
||||
self.pending_app_server_requests.clear();
|
||||
self.async_turns.clear();
|
||||
self.chat_widget.set_pending_thread_approvals(Vec::new());
|
||||
self.sync_active_agent_label();
|
||||
}
|
||||
@@ -3798,6 +3946,7 @@ impl App {
|
||||
primary_session_configured: None,
|
||||
pending_primary_events: VecDeque::new(),
|
||||
pending_app_server_requests: PendingAppServerRequests::default(),
|
||||
async_turns: HashMap::new(),
|
||||
};
|
||||
if let Some(started) = initial_started_thread {
|
||||
app.enqueue_primary_thread_session(started.session, started.turns)
|
||||
@@ -4278,6 +4427,14 @@ impl App {
|
||||
AppEvent::CodexOp(op) => {
|
||||
self.submit_active_thread_op(app_server, op.into()).await?;
|
||||
}
|
||||
AppEvent::StartAsyncTurn {
|
||||
task_id,
|
||||
prompt,
|
||||
op,
|
||||
} => {
|
||||
self.start_async_turn(app_server, task_id, prompt, op.into())
|
||||
.await?;
|
||||
}
|
||||
AppEvent::SubmitThreadOp { thread_id, op } => {
|
||||
self.submit_thread_op(app_server, thread_id, op.into())
|
||||
.await?;
|
||||
@@ -9113,6 +9270,7 @@ guardian_approval = true
|
||||
primary_session_configured: None,
|
||||
pending_primary_events: VecDeque::new(),
|
||||
pending_app_server_requests: PendingAppServerRequests::default(),
|
||||
async_turns: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9167,6 +9325,7 @@ guardian_approval = true
|
||||
primary_session_configured: None,
|
||||
pending_primary_events: VecDeque::new(),
|
||||
pending_app_server_requests: PendingAppServerRequests::default(),
|
||||
async_turns: HashMap::new(),
|
||||
},
|
||||
rx,
|
||||
op_rx,
|
||||
@@ -9524,6 +9683,75 @@ guardian_approval = true
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn async_turn_observer_renders_side_completion_when_item_completes() {
|
||||
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
|
||||
let origin_thread_id = ThreadId::new();
|
||||
let task_id = "async-task-1".to_string();
|
||||
app.active_thread_id = Some(origin_thread_id);
|
||||
app.async_turns.insert(
|
||||
task_id.clone(),
|
||||
AsyncTurnState {
|
||||
origin_thread_id,
|
||||
prompt: "side question".to_string(),
|
||||
last_agent_message: None,
|
||||
},
|
||||
);
|
||||
while app_event_rx.try_recv().is_ok() {}
|
||||
|
||||
assert!(app.observe_async_turn_notification(
|
||||
origin_thread_id,
|
||||
&ServerNotification::ItemStarted(codex_app_server_protocol::ItemStartedNotification {
|
||||
thread_id: origin_thread_id.to_string(),
|
||||
turn_id: task_id.clone(),
|
||||
item: ThreadItem::AgentMessage {
|
||||
id: "agent-1".to_string(),
|
||||
text: String::new(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
},
|
||||
}),
|
||||
));
|
||||
|
||||
assert!(
|
||||
app.observe_async_turn_notification(
|
||||
origin_thread_id,
|
||||
&agent_message_delta_notification(
|
||||
origin_thread_id,
|
||||
&task_id,
|
||||
"agent-1",
|
||||
"background ",
|
||||
),
|
||||
)
|
||||
);
|
||||
|
||||
assert!(app.observe_async_turn_notification(
|
||||
origin_thread_id,
|
||||
&ServerNotification::ItemCompleted(
|
||||
codex_app_server_protocol::ItemCompletedNotification {
|
||||
thread_id: origin_thread_id.to_string(),
|
||||
turn_id: task_id,
|
||||
item: ThreadItem::AgentMessage {
|
||||
id: "agent-1".to_string(),
|
||||
text: "background answer".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
},
|
||||
},
|
||||
),
|
||||
));
|
||||
|
||||
let cell = match app_event_rx.try_recv() {
|
||||
Ok(AppEvent::InsertHistoryCell(cell)) => cell,
|
||||
other => panic!("expected async answer history cell, saw {other:?}"),
|
||||
};
|
||||
assert_eq!(
|
||||
lines_to_single_string(&cell.transcript_lines(/*width*/ 120)),
|
||||
"• background answer"
|
||||
);
|
||||
assert!(app.async_turns.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn feedback_submission_for_inactive_thread_replays_into_origin_thread() {
|
||||
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
|
||||
|
||||
@@ -38,6 +38,10 @@ pub(crate) enum AppCommandView<'a> {
|
||||
RunUserShellCommand {
|
||||
command: &'a str,
|
||||
},
|
||||
StartAsyncTask {
|
||||
task_id: &'a str,
|
||||
items: &'a [UserInput],
|
||||
},
|
||||
UserTurn {
|
||||
items: &'a [UserInput],
|
||||
cwd: &'a PathBuf,
|
||||
@@ -139,6 +143,10 @@ impl AppCommand {
|
||||
Self(Op::RunUserShellCommand { command })
|
||||
}
|
||||
|
||||
pub(crate) fn start_async_task(task_id: String, items: Vec<UserInput>) -> Self {
|
||||
Self(Op::StartAsyncTask { task_id, items })
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) fn user_turn(
|
||||
items: Vec<UserInput>,
|
||||
@@ -298,6 +306,9 @@ impl AppCommand {
|
||||
}
|
||||
Op::RealtimeConversationClose => AppCommandView::RealtimeConversationClose,
|
||||
Op::RunUserShellCommand { command } => AppCommandView::RunUserShellCommand { command },
|
||||
Op::StartAsyncTask { task_id, items } => {
|
||||
AppCommandView::StartAsyncTask { task_id, items }
|
||||
}
|
||||
Op::UserTurn {
|
||||
items,
|
||||
cwd,
|
||||
|
||||
@@ -108,6 +108,13 @@ pub(crate) enum AppEvent {
|
||||
/// Fork the current session into a new thread.
|
||||
ForkCurrentSession,
|
||||
|
||||
/// Spawn an async task from the active session and render its final response on completion.
|
||||
StartAsyncTurn {
|
||||
task_id: String,
|
||||
prompt: String,
|
||||
op: Op,
|
||||
},
|
||||
|
||||
/// Request to exit the application.
|
||||
///
|
||||
/// Use `ShutdownFirst` for user-initiated quits so core cleanup runs and the
|
||||
|
||||
@@ -24,6 +24,8 @@ use codex_app_server_protocol::ReviewStartResponse;
|
||||
use codex_app_server_protocol::SkillsListParams;
|
||||
use codex_app_server_protocol::SkillsListResponse;
|
||||
use codex_app_server_protocol::Thread;
|
||||
use codex_app_server_protocol::ThreadAsyncTaskStartParams;
|
||||
use codex_app_server_protocol::ThreadAsyncTaskStartResponse;
|
||||
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanParams;
|
||||
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse;
|
||||
use codex_app_server_protocol::ThreadCompactStartParams;
|
||||
@@ -562,6 +564,28 @@ impl AppServerSession {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn thread_async_task_start(
|
||||
&mut self,
|
||||
thread_id: ThreadId,
|
||||
task_id: String,
|
||||
items: Vec<codex_protocol::user_input::UserInput>,
|
||||
) -> Result<()> {
|
||||
let request_id = self.next_request_id();
|
||||
let _: ThreadAsyncTaskStartResponse = self
|
||||
.client
|
||||
.request_typed(ClientRequest::ThreadAsyncTaskStart {
|
||||
request_id,
|
||||
params: ThreadAsyncTaskStartParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
task_id,
|
||||
input: items.into_iter().map(Into::into).collect(),
|
||||
},
|
||||
})
|
||||
.await
|
||||
.wrap_err("thread/asyncTask/start failed in TUI")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn thread_background_terminals_clean(
|
||||
&mut self,
|
||||
thread_id: ThreadId,
|
||||
|
||||
@@ -5046,6 +5046,9 @@ impl ChatWidget {
|
||||
}
|
||||
self.app_event_tx.compact();
|
||||
}
|
||||
SlashCommand::Async => {
|
||||
self.add_error_message("Usage: /async <prompt>".to_string());
|
||||
}
|
||||
SlashCommand::Review => {
|
||||
self.open_review_popup();
|
||||
}
|
||||
@@ -5404,6 +5407,33 @@ impl ChatWidget {
|
||||
self.app_event_tx.set_thread_name(name);
|
||||
self.bottom_pane.drain_pending_submission_state();
|
||||
}
|
||||
SlashCommand::Async if !trimmed.is_empty() => {
|
||||
let Some((prepared_args, prepared_elements)) = self
|
||||
.bottom_pane
|
||||
.prepare_inline_args_submission(/*record_history*/ false)
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let task_id = format!("async-{}", uuid::Uuid::new_v4());
|
||||
let Some(op) =
|
||||
self.async_turn_op(task_id.clone(), prepared_args.clone(), prepared_elements)
|
||||
else {
|
||||
self.bottom_pane.drain_pending_submission_state();
|
||||
return;
|
||||
};
|
||||
self.add_to_history(history_cell::new_user_prompt(
|
||||
format!("/async {prepared_args}"),
|
||||
Vec::new(),
|
||||
Vec::new(),
|
||||
Vec::new(),
|
||||
));
|
||||
self.app_event_tx.send(AppEvent::StartAsyncTurn {
|
||||
task_id,
|
||||
prompt: prepared_args,
|
||||
op: op.into_core(),
|
||||
});
|
||||
self.bottom_pane.drain_pending_submission_state();
|
||||
}
|
||||
SlashCommand::Plan if !trimmed.is_empty() => {
|
||||
self.dispatch_command(cmd);
|
||||
if self.active_mode_kind() != ModeKind::Plan {
|
||||
@@ -5839,6 +5869,24 @@ impl ChatWidget {
|
||||
self.needs_final_message_separator = false;
|
||||
}
|
||||
|
||||
fn async_turn_op(
|
||||
&mut self,
|
||||
task_id: String,
|
||||
text: String,
|
||||
text_elements: Vec<TextElement>,
|
||||
) -> Option<AppCommand> {
|
||||
if text.trim().is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(AppCommand::start_async_task(
|
||||
task_id,
|
||||
vec![UserInput::Text {
|
||||
text,
|
||||
text_elements,
|
||||
}],
|
||||
))
|
||||
}
|
||||
|
||||
/// Restore the blocked submission draft without losing mention resolution state.
|
||||
///
|
||||
/// The blocked-image path intentionally keeps the draft in the composer so
|
||||
|
||||
@@ -29,6 +29,38 @@ async fn slash_compact_eagerly_queues_follow_up_before_turn_start() {
|
||||
assert_matches!(op_rx.try_recv(), Err(TryRecvError::Empty));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn slash_async_echoes_prompt_and_dispatches_side_turn() {
|
||||
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
|
||||
chat.bottom_pane
|
||||
.set_composer_text("/async side question".to_string(), Vec::new(), Vec::new());
|
||||
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
|
||||
|
||||
let mut rendered_prompt = None;
|
||||
let mut dispatched_prompt = None;
|
||||
while let Ok(event) = rx.try_recv() {
|
||||
match event {
|
||||
AppEvent::InsertHistoryCell(cell) => {
|
||||
rendered_prompt = Some(lines_to_single_string(&cell.display_lines(/*width*/ 80)));
|
||||
}
|
||||
AppEvent::StartAsyncTurn { prompt, op, .. } => {
|
||||
dispatched_prompt = Some(prompt);
|
||||
assert_matches!(op, Op::StartAsyncTask { .. });
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let rendered_prompt = rendered_prompt.expect("expected /async prompt history cell");
|
||||
assert!(
|
||||
rendered_prompt.contains("/async side question"),
|
||||
"expected literal /async command to be rendered, got {rendered_prompt:?}"
|
||||
);
|
||||
assert_eq!(dispatched_prompt.as_deref(), Some("side question"));
|
||||
assert_no_submit_op(&mut op_rx);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ctrl_d_quits_without_prompt() {
|
||||
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
|
||||
|
||||
@@ -29,6 +29,7 @@ pub enum SlashCommand {
|
||||
Fork,
|
||||
Init,
|
||||
Compact,
|
||||
Async,
|
||||
Plan,
|
||||
Collab,
|
||||
Agent,
|
||||
@@ -74,6 +75,7 @@ impl SlashCommand {
|
||||
SlashCommand::New => "start a new chat during a conversation",
|
||||
SlashCommand::Init => "create an AGENTS.md file with instructions for Codex",
|
||||
SlashCommand::Compact => "summarize conversation to prevent hitting the context limit",
|
||||
SlashCommand::Async => "ask in a background thread",
|
||||
SlashCommand::Review => "review my current changes and find issues",
|
||||
SlashCommand::Rename => "rename the current thread",
|
||||
SlashCommand::Resume => "resume a saved chat",
|
||||
@@ -130,6 +132,7 @@ impl SlashCommand {
|
||||
self,
|
||||
SlashCommand::Review
|
||||
| SlashCommand::Rename
|
||||
| SlashCommand::Async
|
||||
| SlashCommand::Plan
|
||||
| SlashCommand::Fast
|
||||
| SlashCommand::SandboxReadRoot
|
||||
@@ -160,6 +163,7 @@ impl SlashCommand {
|
||||
| SlashCommand::MemoryDrop
|
||||
| SlashCommand::MemoryUpdate => false,
|
||||
SlashCommand::Diff
|
||||
| SlashCommand::Async
|
||||
| SlashCommand::Copy
|
||||
| SlashCommand::Rename
|
||||
| SlashCommand::Mention
|
||||
@@ -216,6 +220,11 @@ mod tests {
|
||||
assert_eq!(SlashCommand::Stop.command(), "stop");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn async_command_parses() {
|
||||
assert_eq!(SlashCommand::from_str("async"), Ok(SlashCommand::Async));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clean_alias_parses_to_stop_command() {
|
||||
assert_eq!(SlashCommand::from_str("clean"), Ok(SlashCommand::Stop));
|
||||
|
||||
Reference in New Issue
Block a user