Compare commits

...

1 Commits

Author SHA1 Message Date
starr-openai
b7749c93b0 feat(codex): implement /btw /async slash command path 2026-04-13 16:14:00 -07:00
21 changed files with 1192 additions and 1 deletions

View File

@@ -2549,6 +2549,28 @@
],
"type": "object"
},
"ThreadAsyncTaskStartParams": {
"properties": {
"input": {
"items": {
"$ref": "#/definitions/UserInput"
},
"type": "array"
},
"taskId": {
"type": "string"
},
"threadId": {
"type": "string"
}
},
"required": [
"input",
"taskId",
"threadId"
],
"type": "object"
},
"ThreadCompactStartParams": {
"properties": {
"threadId": {
@@ -3743,6 +3765,30 @@
"title": "Thread/shellCommandRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/asyncTask/start"
],
"title": "Thread/asyncTask/startRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadAsyncTaskStartParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/asyncTask/startRequest",
"type": "object"
},
{
"properties": {
"id": {

View File

@@ -482,6 +482,30 @@
"title": "Thread/shellCommandRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/v2/RequestId"
},
"method": {
"enum": [
"thread/asyncTask/start"
],
"title": "Thread/asyncTask/startRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadAsyncTaskStartParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/asyncTask/startRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -12448,6 +12472,35 @@
"title": "ThreadArchivedNotification",
"type": "object"
},
"ThreadAsyncTaskStartParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"input": {
"items": {
"$ref": "#/definitions/v2/UserInput"
},
"type": "array"
},
"taskId": {
"type": "string"
},
"threadId": {
"type": "string"
}
},
"required": [
"input",
"taskId",
"threadId"
],
"title": "ThreadAsyncTaskStartParams",
"type": "object"
},
"ThreadAsyncTaskStartResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadAsyncTaskStartResponse",
"type": "object"
},
"ThreadClosedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {

View File

@@ -1057,6 +1057,30 @@
"title": "Thread/shellCommandRequest",
"type": "object"
},
{
"properties": {
"id": {
"$ref": "#/definitions/RequestId"
},
"method": {
"enum": [
"thread/asyncTask/start"
],
"title": "Thread/asyncTask/startRequestMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadAsyncTaskStartParams"
}
},
"required": [
"id",
"method",
"params"
],
"title": "Thread/asyncTask/startRequest",
"type": "object"
},
{
"properties": {
"id": {
@@ -10303,6 +10327,35 @@
"title": "ThreadArchivedNotification",
"type": "object"
},
"ThreadAsyncTaskStartParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"input": {
"items": {
"$ref": "#/definitions/UserInput"
},
"type": "array"
},
"taskId": {
"type": "string"
},
"threadId": {
"type": "string"
}
},
"required": [
"input",
"taskId",
"threadId"
],
"title": "ThreadAsyncTaskStartParams",
"type": "object"
},
"ThreadAsyncTaskStartResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadAsyncTaskStartResponse",
"type": "object"
},
"ThreadClosedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {

View File

@@ -0,0 +1,188 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"ByteRange": {
"properties": {
"end": {
"format": "uint",
"minimum": 0.0,
"type": "integer"
},
"start": {
"format": "uint",
"minimum": 0.0,
"type": "integer"
}
},
"required": [
"end",
"start"
],
"type": "object"
},
"TextElement": {
"properties": {
"byteRange": {
"allOf": [
{
"$ref": "#/definitions/ByteRange"
}
],
"description": "Byte range in the parent `text` buffer that this element occupies."
},
"placeholder": {
"description": "Optional human-readable placeholder for the element, displayed in the UI.",
"type": [
"string",
"null"
]
}
},
"required": [
"byteRange"
],
"type": "object"
},
"UserInput": {
"oneOf": [
{
"properties": {
"text": {
"type": "string"
},
"text_elements": {
"default": [],
"description": "UI-defined spans within `text` used to render or persist special elements.",
"items": {
"$ref": "#/definitions/TextElement"
},
"type": "array"
},
"type": {
"enum": [
"text"
],
"title": "TextUserInputType",
"type": "string"
}
},
"required": [
"text",
"type"
],
"title": "TextUserInput",
"type": "object"
},
{
"properties": {
"type": {
"enum": [
"image"
],
"title": "ImageUserInputType",
"type": "string"
},
"url": {
"type": "string"
}
},
"required": [
"type",
"url"
],
"title": "ImageUserInput",
"type": "object"
},
{
"properties": {
"path": {
"type": "string"
},
"type": {
"enum": [
"localImage"
],
"title": "LocalImageUserInputType",
"type": "string"
}
},
"required": [
"path",
"type"
],
"title": "LocalImageUserInput",
"type": "object"
},
{
"properties": {
"name": {
"type": "string"
},
"path": {
"type": "string"
},
"type": {
"enum": [
"skill"
],
"title": "SkillUserInputType",
"type": "string"
}
},
"required": [
"name",
"path",
"type"
],
"title": "SkillUserInput",
"type": "object"
},
{
"properties": {
"name": {
"type": "string"
},
"path": {
"type": "string"
},
"type": {
"enum": [
"mention"
],
"title": "MentionUserInputType",
"type": "string"
}
},
"required": [
"name",
"path",
"type"
],
"title": "MentionUserInput",
"type": "object"
}
]
}
},
"properties": {
"input": {
"items": {
"$ref": "#/definitions/UserInput"
},
"type": "array"
},
"taskId": {
"type": "string"
},
"threadId": {
"type": "string"
}
},
"required": [
"input",
"taskId",
"threadId"
],
"title": "ThreadAsyncTaskStartParams",
"type": "object"
}

View File

@@ -0,0 +1,5 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ThreadAsyncTaskStartResponse",
"type": "object"
}

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,6 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { UserInput } from "./UserInput";
export type ThreadAsyncTaskStartParams = { threadId: string, taskId: string, input: Array<UserInput>, };

View File

@@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadAsyncTaskStartResponse = Record<string, never>;

View File

@@ -273,6 +273,8 @@ export type { ThreadActiveFlag } from "./ThreadActiveFlag";
export type { ThreadArchiveParams } from "./ThreadArchiveParams";
export type { ThreadArchiveResponse } from "./ThreadArchiveResponse";
export type { ThreadArchivedNotification } from "./ThreadArchivedNotification";
export type { ThreadAsyncTaskStartParams } from "./ThreadAsyncTaskStartParams";
export type { ThreadAsyncTaskStartResponse } from "./ThreadAsyncTaskStartResponse";
export type { ThreadClosedNotification } from "./ThreadClosedNotification";
export type { ThreadCompactStartParams } from "./ThreadCompactStartParams";
export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse";

View File

@@ -296,6 +296,10 @@ client_request_definitions! {
params: v2::ThreadShellCommandParams,
response: v2::ThreadShellCommandResponse,
},
ThreadAsyncTaskStart => "thread/asyncTask/start" {
params: v2::ThreadAsyncTaskStartParams,
response: v2::ThreadAsyncTaskStartResponse,
},
#[experimental("thread/backgroundTerminals/clean")]
ThreadBackgroundTerminalsClean => "thread/backgroundTerminals/clean" {
params: v2::ThreadBackgroundTerminalsCleanParams,

View File

@@ -3014,6 +3014,20 @@ pub struct ThreadShellCommandParams {
#[ts(export_to = "v2/")]
pub struct ThreadShellCommandResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadAsyncTaskStartParams {
pub thread_id: String,
pub task_id: String,
pub input: Vec<UserInput>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadAsyncTaskStartResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
@@ -6684,6 +6698,49 @@ mod tests {
assert_eq!(decoded, response);
}
#[test]
fn thread_async_task_start_params_round_trip() {
let params = ThreadAsyncTaskStartParams {
thread_id: "thr_123".to_string(),
task_id: "async-1".to_string(),
input: vec![UserInput::Text {
text: "answer this on the side".to_string(),
text_elements: Vec::new(),
}],
};
let value = serde_json::to_value(&params).expect("serialize thread/asyncTask/start params");
assert_eq!(
value,
json!({
"threadId": "thr_123",
"taskId": "async-1",
"input": [{
"type": "text",
"text": "answer this on the side",
"text_elements": [],
}],
})
);
let decoded = serde_json::from_value::<ThreadAsyncTaskStartParams>(value)
.expect("deserialize thread/asyncTask/start params");
assert_eq!(decoded, params);
}
#[test]
fn thread_async_task_start_response_round_trip() {
let response = ThreadAsyncTaskStartResponse {};
let value =
serde_json::to_value(&response).expect("serialize thread/asyncTask/start response");
assert_eq!(value, json!({}));
let decoded = serde_json::from_value::<ThreadAsyncTaskStartResponse>(value)
.expect("deserialize thread/asyncTask/start response");
assert_eq!(decoded, response);
}
#[test]
fn fs_changed_notification_round_trips() {
let notification = FsChangedNotification {

View File

@@ -118,6 +118,8 @@ use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadArchivedNotification;
use codex_app_server_protocol::ThreadAsyncTaskStartParams;
use codex_app_server_protocol::ThreadAsyncTaskStartResponse;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanParams;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse;
use codex_app_server_protocol::ThreadClosedNotification;
@@ -776,6 +778,10 @@ impl CodexMessageProcessor {
self.thread_shell_command(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadAsyncTaskStart { request_id, params } => {
self.thread_async_task_start(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::SkillsList { request_id, params } => {
self.skills_list(to_connection_request_id(request_id), params)
.await;
@@ -3375,6 +3381,57 @@ impl CodexMessageProcessor {
}
}
async fn thread_async_task_start(
&self,
request_id: ConnectionRequestId,
params: ThreadAsyncTaskStartParams,
) {
if let Err(error) = Self::validate_v2_input_limit(&params.input) {
self.outgoing.send_error(request_id, error).await;
return;
}
let ThreadAsyncTaskStartParams {
thread_id,
task_id,
input,
} = params;
if task_id.trim().is_empty() {
self.send_invalid_request_error(request_id, "taskId must not be empty".to_string())
.await;
return;
}
let (_, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let items = input.into_iter().map(V2UserInput::into_core).collect();
match self
.submit_core_op(
&request_id,
thread.as_ref(),
Op::StartAsyncTask { task_id, items },
)
.await
{
Ok(_) => {
self.outgoing
.send_response(request_id, ThreadAsyncTaskStartResponse {})
.await;
}
Err(err) => {
self.send_internal_error(request_id, format!("failed to start async task: {err}"))
.await;
}
}
}
async fn thread_list(&self, request_id: ConnectionRequestId, params: ThreadListParams) {
let ThreadListParams {
cursor,

View File

@@ -4594,6 +4594,10 @@ async fn submission_loop(sess: Arc<Session>, config: Arc<Config>, rx_sub: Receiv
handlers::inter_agent_communication(&sess, sub.id.clone(), communication).await;
false
}
Op::StartAsyncTask { task_id, items } => {
handlers::start_async_task(&sess, sub.id.clone(), task_id, items).await;
false
}
Op::ExecApproval {
id: approval_id,
turn_id,
@@ -4747,9 +4751,12 @@ fn submission_dispatch_span(sub: &Submission) -> tracing::Span {
/// Operation handlers
mod handlers {
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
use crate::codex::Session;
use crate::codex::SessionSettingsUpdate;
use crate::codex::SteerInputError;
use crate::codex::TurnContext;
use crate::SkillError;
use crate::codex::spawn_review_thread;
@@ -4763,6 +4770,8 @@ mod handlers {
use crate::review_prompts::resolve_review_request;
use crate::rollout::RolloutRecorder;
use crate::rollout::session_index;
use crate::stream_events_utils::last_assistant_message_from_item;
use crate::stream_events_utils::raw_assistant_output_text_from_item;
use crate::tasks::CompactTask;
use crate::tasks::UndoTask;
use crate::tasks::UserShellCommandMode;
@@ -4770,6 +4779,15 @@ mod handlers {
use crate::tasks::execute_user_shell_command;
use codex_mcp::collect_mcp_snapshot_from_manager;
use codex_mcp::compute_auth_statuses;
use codex_protocol::error::CodexErr;
use codex_protocol::error::Result as CodexResult;
use codex_protocol::items::AgentMessageContent;
use codex_protocol::items::AgentMessageItem;
use codex_protocol::items::TurnItem;
use codex_protocol::models::MessagePhase;
use codex_protocol::models::ResponseInputItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::AgentMessageContentDeltaEvent;
use codex_protocol::protocol::CodexErrorInfo;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::Event;
@@ -4788,6 +4806,9 @@ mod handlers {
use codex_protocol::protocol::WarningEvent;
use codex_protocol::request_permissions::RequestPermissionsResponse;
use codex_protocol::request_user_input::RequestUserInputResponse;
use futures::StreamExt;
use tracing::Instrument;
use tracing::trace_span;
use crate::context_manager::is_user_turn_boundary;
use codex_protocol::config_types::CollaborationMode;
@@ -4801,6 +4822,7 @@ mod handlers {
use serde_json::Value;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tracing::info;
use tracing::warn;
@@ -4933,6 +4955,327 @@ mod handlers {
}
}
pub async fn start_async_task(
sess: &Arc<Session>,
_sub_id: String,
task_id: String,
items: Vec<UserInput>,
) {
let turn_context = sess.new_default_turn_with_sub_id(task_id).await;
let sess = Arc::clone(sess);
tokio::spawn(async move {
if let Err(err) =
run_side_completion(Arc::clone(&sess), Arc::clone(&turn_context), items).await
{
sess.send_event(
&turn_context,
EventMsg::Error(ErrorEvent {
message: format!("/async failed: {err}"),
codex_error_info: Some(CodexErrorInfo::Other),
}),
)
.await;
}
});
}
async fn run_side_completion(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
items: Vec<UserInput>,
) -> CodexResult<()> {
if let Some(delay) = async_side_completion_leading_delay(&items) {
tokio::time::sleep(delay).await;
}
let side_request: ResponseItem = ResponseInputItem::from(items).into();
let mut input = sess
.clone_history()
.await
.for_prompt(&turn_context.model_info.input_modalities);
input.push(side_request);
let mut base_instructions = sess.get_base_instructions().await;
base_instructions
.text
.push_str("\n\n<codex_async_side_completion>\nYou are answering a /async request. The parent Codex turn may still be running. Answer only this /async request, using the parent thread history as read-only context. Do not claim to run tools; no tools are available in this side completion. If the request began with a sleep/wait duration, the runtime has already honored that delay; do not sleep again. Keep the answer focused and complete.\n</codex_async_side_completion>");
let prompt = Prompt {
input,
tools: Vec::new(),
parallel_tool_calls: false,
base_instructions,
personality: turn_context.personality,
output_schema: None,
};
let mut client_session = sess.services.model_client.new_session();
let mut stream = client_session
.stream(
&prompt,
&turn_context.model_info,
&turn_context.session_telemetry,
turn_context.reasoning_effort,
turn_context.reasoning_summary,
turn_context.config.service_tier,
/*turn_metadata_header*/ None,
)
.instrument(trace_span!("async_side_completion_stream"))
.await?;
let mut active_item_id: Option<String> = None;
let mut message = String::new();
while let Some(event) = stream.next().await {
match event? {
ResponseEvent::Created
| ResponseEvent::ServerModel(_)
| ResponseEvent::ServerReasoningIncluded(_)
| ResponseEvent::ModelsEtag(_)
| ResponseEvent::RateLimits(_)
| ResponseEvent::ReasoningSummaryDelta { .. }
| ResponseEvent::ReasoningContentDelta { .. }
| ResponseEvent::ReasoningSummaryPartAdded { .. } => {}
ResponseEvent::OutputItemAdded(item) => {
if active_item_id.is_none()
&& let Some(text) = raw_assistant_output_text_from_item(&item)
{
let item_id = side_completion_item_id(&turn_context, &item);
start_side_completion_item(sess.as_ref(), turn_context.as_ref(), &item_id)
.await;
message.push_str(&text);
active_item_id = Some(item_id);
}
}
ResponseEvent::OutputTextDelta(delta) => {
if active_item_id.is_none() {
let item_id = format!("{}-message", turn_context.sub_id);
start_side_completion_item(sess.as_ref(), turn_context.as_ref(), &item_id)
.await;
active_item_id = Some(item_id);
}
if let Some(item_id) = active_item_id.as_ref() {
sess.send_event(
&turn_context,
EventMsg::AgentMessageContentDelta(AgentMessageContentDeltaEvent {
thread_id: sess.conversation_id.to_string(),
turn_id: turn_context.sub_id.clone(),
item_id: item_id.clone(),
delta: delta.clone(),
}),
)
.await;
}
message.push_str(&delta);
}
ResponseEvent::OutputItemDone(item) => {
if let Some(final_message) = final_assistant_message_from_item(
&item,
turn_context.collaboration_mode.mode == ModeKind::Plan,
) {
active_item_id
.get_or_insert_with(|| side_completion_item_id(&turn_context, &item));
message = final_message;
}
}
ResponseEvent::Completed { token_usage, .. } => {
sess.update_token_usage_info(&turn_context, token_usage.as_ref())
.await;
let item_id = active_item_id
.unwrap_or_else(|| format!("{}-message", turn_context.sub_id));
complete_side_completion_item(
sess.as_ref(),
turn_context.as_ref(),
item_id,
message,
)
.await;
return Ok(());
}
}
}
Err(CodexErr::Stream(
"stream closed before response.completed".into(),
None,
))
}
fn async_side_completion_leading_delay(items: &[UserInput]) -> Option<Duration> {
let [UserInput::Text { text, .. }] = items else {
return None;
};
parse_leading_sleep_then_delay(text)
}
fn parse_leading_sleep_then_delay(text: &str) -> Option<Duration> {
let normalized = text.trim_start().to_ascii_lowercase();
let mut rest = strip_keyword(&normalized, "sleep")
.or_else(|| strip_keyword(&normalized, "wait"))?
.trim_start();
rest = strip_keyword(rest, "for").unwrap_or(rest).trim_start();
let (amount, after_amount) = parse_leading_non_negative_number(rest)?;
let after_amount = after_amount.trim_start();
let (unit_scale, after_unit) = parse_optional_duration_unit(after_amount)?;
let mut tail = after_unit.trim_start();
tail = strip_keyword(tail, "and").unwrap_or(tail).trim_start();
tail = tail.strip_prefix(',').unwrap_or(tail).trim_start();
strip_keyword(tail, "then")?;
Duration::try_from_secs_f64(amount * unit_scale).ok()
}
fn parse_leading_non_negative_number(rest: &str) -> Option<(f64, &str)> {
let mut end = 0usize;
let mut saw_digit = false;
let mut saw_dot = false;
for (idx, ch) in rest.char_indices() {
if ch.is_ascii_digit() {
saw_digit = true;
end = idx + ch.len_utf8();
} else if ch == '.' && !saw_dot {
saw_dot = true;
end = idx + ch.len_utf8();
} else {
break;
}
}
if !saw_digit {
return None;
}
let amount = rest[..end].parse::<f64>().ok()?;
if !amount.is_finite() || amount < 0.0 {
return None;
}
Some((amount, &rest[end..]))
}
fn parse_optional_duration_unit(rest: &str) -> Option<(f64, &str)> {
let mut end = 0usize;
for (idx, ch) in rest.char_indices() {
if ch.is_ascii_alphabetic() {
end = idx + ch.len_utf8();
} else {
break;
}
}
if end == 0 {
return Some((1.0, rest));
}
let (token, tail) = rest.split_at(end);
let scale = match token {
"ms" | "msec" | "msecs" | "millisecond" | "milliseconds" => 0.001,
"s" | "sec" | "secs" | "second" | "seconds" => 1.0,
"m" | "min" | "mins" | "minute" | "minutes" => 60.0,
"h" | "hr" | "hrs" | "hour" | "hours" => 60.0 * 60.0,
"then" | "and" => return Some((1.0, rest)),
_ => return None,
};
Some((scale, tail))
}
fn strip_keyword<'a>(input: &'a str, keyword: &str) -> Option<&'a str> {
let rest = input.strip_prefix(keyword)?;
if rest.is_empty() || rest.starts_with(|ch: char| !ch.is_ascii_alphanumeric()) {
Some(rest)
} else {
None
}
}
#[cfg(test)]
mod async_side_completion_delay_tests {
use super::*;
#[test]
fn parses_leading_sleep_then_request_delay() {
assert_eq!(
parse_leading_sleep_then_delay("sleep 10s then say done"),
Some(Duration::from_secs(10))
);
assert_eq!(
parse_leading_sleep_then_delay("wait for 1.5 minutes, then summarize"),
Some(Duration::from_secs(90))
);
assert_eq!(
parse_leading_sleep_then_delay("WAIT 250ms and then answer"),
Some(Duration::from_millis(250))
);
assert_eq!(
parse_leading_sleep_then_delay("sleep 2 then answer"),
Some(Duration::from_secs(2))
);
}
#[test]
fn ignores_non_leading_or_unfinished_sleep_requests() {
assert_eq!(
parse_leading_sleep_then_delay("please sleep 10s then answer"),
None
);
assert_eq!(parse_leading_sleep_then_delay("sleep 10s"), None);
assert_eq!(
parse_leading_sleep_then_delay("sleep 10 bananas then answer"),
None
);
assert_eq!(
parse_leading_sleep_then_delay("sleeper 10s then answer"),
None
);
}
}
fn side_completion_item_id(turn_context: &TurnContext, item: &ResponseItem) -> String {
if let ResponseItem::Message { id: Some(id), .. } = item {
return id.clone();
}
format!("{}-message", turn_context.sub_id)
}
fn final_assistant_message_from_item(item: &ResponseItem, plan_mode: bool) -> Option<String> {
if let ResponseItem::Message { role, phase, .. } = item
&& role == "assistant"
&& matches!(phase, Some(MessagePhase::Commentary))
{
return None;
}
last_assistant_message_from_item(item, plan_mode)
}
async fn start_side_completion_item(sess: &Session, turn_context: &TurnContext, item_id: &str) {
sess.emit_turn_item_started(
turn_context,
&TurnItem::AgentMessage(AgentMessageItem {
id: item_id.to_string(),
content: vec![AgentMessageContent::Text {
text: String::new(),
}],
phase: None,
memory_citation: None,
}),
)
.await;
}
async fn complete_side_completion_item(
sess: &Session,
turn_context: &TurnContext,
item_id: String,
text: String,
) {
sess.emit_turn_item_completed(
turn_context,
TurnItem::AgentMessage(AgentMessageItem {
id: item_id,
content: vec![AgentMessageContent::Text { text }],
phase: None,
memory_citation: None,
}),
)
.await;
}
pub async fn run_user_shell_command(sess: &Arc<Session>, sub_id: String, command: String) {
if let Some((turn_context, cancellation_token)) =
sess.active_turn_context_and_cancellation_token().await

View File

@@ -307,6 +307,17 @@ pub enum Op {
communication: InterAgentCommunication,
},
/// Run a tool-free side completion against the active session history.
///
/// Unlike [`Op::UserTurn`], this op does not start or steer the parent turn and the
/// model response is not recorded back into the parent thread.
StartAsyncTask {
/// Client-generated identifier used as the side completion turn id.
task_id: String,
/// Input to append to the parent thread context for this one completion.
items: Vec<UserInput>,
},
/// Override parts of the persistent turn context for subsequent turns.
///
/// All fields are optional; when omitted, the existing value is preserved.
@@ -581,6 +592,7 @@ impl Op {
Self::UserInput { .. } => "user_input",
Self::UserTurn { .. } => "user_turn",
Self::InterAgentCommunication { .. } => "inter_agent_communication",
Self::StartAsyncTask { .. } => "start_async_task",
Self::OverrideTurnContext { .. } => "override_turn_context",
Self::ExecApproval { .. } => "exec_approval",
Self::PatchApproval { .. } => "patch_approval",

View File

@@ -30,6 +30,7 @@ use crate::exec_command::strip_bash_lc_and_escape;
use crate::external_editor;
use crate::file_search::FileSearchManager;
use crate::history_cell;
use crate::history_cell::AgentMessageCell;
use crate::history_cell::HistoryCell;
#[cfg(not(debug_assertions))]
use crate::history_cell::UpdateAvailableHistoryCell;
@@ -1005,6 +1006,14 @@ pub(crate) struct App {
primary_session_configured: Option<ThreadSessionState>,
pending_primary_events: VecDeque<ThreadBufferedEvent>,
pending_app_server_requests: PendingAppServerRequests,
async_turns: HashMap<String, AsyncTurnState>,
}
#[derive(Debug)]
struct AsyncTurnState {
origin_thread_id: ThreadId,
prompt: String,
last_agent_message: Option<String>,
}
#[derive(Default)]
@@ -2208,6 +2217,135 @@ impl App {
}
}
async fn start_async_turn(
&mut self,
app_server: &mut AppServerSession,
task_id: String,
prompt: String,
op: AppCommand,
) -> Result<()> {
let Some(origin_thread_id) = self.active_thread_id else {
self.chat_widget
.add_error_message("No active thread is available.".to_string());
return Ok(());
};
let AppCommandView::StartAsyncTask {
task_id: op_task_id,
..
} = op.view()
else {
self.chat_widget.add_error_message(
"Internal error: /async produced an unsupported operation.".to_string(),
);
return Ok(());
};
if op_task_id != task_id {
self.chat_widget
.add_error_message("Internal error: /async task id mismatch.".to_string());
return Ok(());
}
self.async_turns.insert(
task_id.clone(),
AsyncTurnState {
origin_thread_id,
prompt,
last_agent_message: None,
},
);
if let Err(err) = self.submit_active_thread_op(app_server, op).await {
self.async_turns.remove(&task_id);
return Err(err);
}
Ok(())
}
fn observe_async_turn_notification(
&mut self,
_thread_id: ThreadId,
notification: &ServerNotification,
) -> bool {
match notification {
ServerNotification::ItemStarted(notification) => {
self.async_turns.contains_key(&notification.turn_id)
}
ServerNotification::ItemCompleted(notification) => {
if let Some(mut state) = self.async_turns.remove(&notification.turn_id) {
if let ThreadItem::AgentMessage { text, .. } = &notification.item {
state.last_agent_message = Some(text.clone());
}
self.render_async_turn_result(state, TurnStatus::Completed);
return true;
}
false
}
ServerNotification::AgentMessageDelta(notification) => {
if let Some(state) = self.async_turns.get_mut(&notification.turn_id) {
state
.last_agent_message
.get_or_insert_with(String::new)
.push_str(&notification.delta);
return true;
}
false
}
ServerNotification::Error(notification) => {
if let Some(state) = self.async_turns.remove(&notification.turn_id) {
self.chat_widget.add_error_message(format!(
"/async failed: {}\n{}",
state.prompt, notification.error.message
));
return true;
}
false
}
_ => false,
}
}
fn render_async_turn_result(&mut self, state: AsyncTurnState, status: TurnStatus) {
let thread_suffix = if self.active_thread_id == Some(state.origin_thread_id) {
String::new()
} else {
format!(" for thread {}", state.origin_thread_id)
};
match status {
TurnStatus::Completed => {
let Some(message) = state
.last_agent_message
.filter(|message| !message.is_empty())
else {
self.chat_widget.add_info_message(
format!("/async completed{thread_suffix} without a final message"),
/*hint*/ None,
);
return;
};
let lines: Vec<Line<'static>> = message
.lines()
.map(|line| Line::from(line.to_string()))
.collect();
self.chat_widget
.add_to_history(AgentMessageCell::new(lines, /*is_first_line*/ true));
}
TurnStatus::Interrupted => {
self.chat_widget.add_info_message(
format!("/async interrupted{thread_suffix}: {}", state.prompt),
/*hint*/ None,
);
}
TurnStatus::Failed => {
self.chat_widget
.add_error_message(format!("/async failed{thread_suffix}: {}", state.prompt));
}
TurnStatus::InProgress => {}
}
}
async fn try_submit_active_thread_op_via_app_server(
&mut self,
app_server: &mut AppServerSession,
@@ -2222,6 +2360,12 @@ impl App {
app_server.turn_interrupt(thread_id, turn_id).await?;
Ok(true)
}
AppCommandView::StartAsyncTask { task_id, items } => {
app_server
.thread_async_task_start(thread_id, task_id.to_string(), items.to_vec())
.await?;
Ok(true)
}
AppCommandView::UserTurn {
items,
cwd,
@@ -2473,6 +2617,9 @@ impl App {
thread_id: ThreadId,
notification: ServerNotification,
) -> Result<()> {
if self.observe_async_turn_notification(thread_id, &notification) {
return Ok(());
}
let inferred_session = self
.infer_session_for_thread_notification(thread_id, &notification)
.await;
@@ -3263,6 +3410,7 @@ impl App {
self.primary_session_configured = None;
self.pending_primary_events.clear();
self.pending_app_server_requests.clear();
self.async_turns.clear();
self.chat_widget.set_pending_thread_approvals(Vec::new());
self.sync_active_agent_label();
}
@@ -3798,6 +3946,7 @@ impl App {
primary_session_configured: None,
pending_primary_events: VecDeque::new(),
pending_app_server_requests: PendingAppServerRequests::default(),
async_turns: HashMap::new(),
};
if let Some(started) = initial_started_thread {
app.enqueue_primary_thread_session(started.session, started.turns)
@@ -4278,6 +4427,14 @@ impl App {
AppEvent::CodexOp(op) => {
self.submit_active_thread_op(app_server, op.into()).await?;
}
AppEvent::StartAsyncTurn {
task_id,
prompt,
op,
} => {
self.start_async_turn(app_server, task_id, prompt, op.into())
.await?;
}
AppEvent::SubmitThreadOp { thread_id, op } => {
self.submit_thread_op(app_server, thread_id, op.into())
.await?;
@@ -9113,6 +9270,7 @@ guardian_approval = true
primary_session_configured: None,
pending_primary_events: VecDeque::new(),
pending_app_server_requests: PendingAppServerRequests::default(),
async_turns: HashMap::new(),
}
}
@@ -9167,6 +9325,7 @@ guardian_approval = true
primary_session_configured: None,
pending_primary_events: VecDeque::new(),
pending_app_server_requests: PendingAppServerRequests::default(),
async_turns: HashMap::new(),
},
rx,
op_rx,
@@ -9524,6 +9683,75 @@ guardian_approval = true
);
}
#[tokio::test]
async fn async_turn_observer_renders_side_completion_when_item_completes() {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
let origin_thread_id = ThreadId::new();
let task_id = "async-task-1".to_string();
app.active_thread_id = Some(origin_thread_id);
app.async_turns.insert(
task_id.clone(),
AsyncTurnState {
origin_thread_id,
prompt: "side question".to_string(),
last_agent_message: None,
},
);
while app_event_rx.try_recv().is_ok() {}
assert!(app.observe_async_turn_notification(
origin_thread_id,
&ServerNotification::ItemStarted(codex_app_server_protocol::ItemStartedNotification {
thread_id: origin_thread_id.to_string(),
turn_id: task_id.clone(),
item: ThreadItem::AgentMessage {
id: "agent-1".to_string(),
text: String::new(),
phase: None,
memory_citation: None,
},
}),
));
assert!(
app.observe_async_turn_notification(
origin_thread_id,
&agent_message_delta_notification(
origin_thread_id,
&task_id,
"agent-1",
"background ",
),
)
);
assert!(app.observe_async_turn_notification(
origin_thread_id,
&ServerNotification::ItemCompleted(
codex_app_server_protocol::ItemCompletedNotification {
thread_id: origin_thread_id.to_string(),
turn_id: task_id,
item: ThreadItem::AgentMessage {
id: "agent-1".to_string(),
text: "background answer".to_string(),
phase: None,
memory_citation: None,
},
},
),
));
let cell = match app_event_rx.try_recv() {
Ok(AppEvent::InsertHistoryCell(cell)) => cell,
other => panic!("expected async answer history cell, saw {other:?}"),
};
assert_eq!(
lines_to_single_string(&cell.transcript_lines(/*width*/ 120)),
"• background answer"
);
assert!(app.async_turns.is_empty());
}
#[tokio::test]
async fn feedback_submission_for_inactive_thread_replays_into_origin_thread() {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;

View File

@@ -38,6 +38,10 @@ pub(crate) enum AppCommandView<'a> {
RunUserShellCommand {
command: &'a str,
},
StartAsyncTask {
task_id: &'a str,
items: &'a [UserInput],
},
UserTurn {
items: &'a [UserInput],
cwd: &'a PathBuf,
@@ -139,6 +143,10 @@ impl AppCommand {
Self(Op::RunUserShellCommand { command })
}
pub(crate) fn start_async_task(task_id: String, items: Vec<UserInput>) -> Self {
Self(Op::StartAsyncTask { task_id, items })
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn user_turn(
items: Vec<UserInput>,
@@ -298,6 +306,9 @@ impl AppCommand {
}
Op::RealtimeConversationClose => AppCommandView::RealtimeConversationClose,
Op::RunUserShellCommand { command } => AppCommandView::RunUserShellCommand { command },
Op::StartAsyncTask { task_id, items } => {
AppCommandView::StartAsyncTask { task_id, items }
}
Op::UserTurn {
items,
cwd,

View File

@@ -108,6 +108,13 @@ pub(crate) enum AppEvent {
/// Fork the current session into a new thread.
ForkCurrentSession,
/// Spawn an async task from the active session and render its final response on completion.
StartAsyncTurn {
task_id: String,
prompt: String,
op: Op,
},
/// Request to exit the application.
///
/// Use `ShutdownFirst` for user-initiated quits so core cleanup runs and the

View File

@@ -24,6 +24,8 @@ use codex_app_server_protocol::ReviewStartResponse;
use codex_app_server_protocol::SkillsListParams;
use codex_app_server_protocol::SkillsListResponse;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadAsyncTaskStartParams;
use codex_app_server_protocol::ThreadAsyncTaskStartResponse;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanParams;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse;
use codex_app_server_protocol::ThreadCompactStartParams;
@@ -562,6 +564,28 @@ impl AppServerSession {
Ok(())
}
pub(crate) async fn thread_async_task_start(
&mut self,
thread_id: ThreadId,
task_id: String,
items: Vec<codex_protocol::user_input::UserInput>,
) -> Result<()> {
let request_id = self.next_request_id();
let _: ThreadAsyncTaskStartResponse = self
.client
.request_typed(ClientRequest::ThreadAsyncTaskStart {
request_id,
params: ThreadAsyncTaskStartParams {
thread_id: thread_id.to_string(),
task_id,
input: items.into_iter().map(Into::into).collect(),
},
})
.await
.wrap_err("thread/asyncTask/start failed in TUI")?;
Ok(())
}
pub(crate) async fn thread_background_terminals_clean(
&mut self,
thread_id: ThreadId,

View File

@@ -5046,6 +5046,9 @@ impl ChatWidget {
}
self.app_event_tx.compact();
}
SlashCommand::Async => {
self.add_error_message("Usage: /async <prompt>".to_string());
}
SlashCommand::Review => {
self.open_review_popup();
}
@@ -5404,6 +5407,33 @@ impl ChatWidget {
self.app_event_tx.set_thread_name(name);
self.bottom_pane.drain_pending_submission_state();
}
SlashCommand::Async if !trimmed.is_empty() => {
let Some((prepared_args, prepared_elements)) = self
.bottom_pane
.prepare_inline_args_submission(/*record_history*/ false)
else {
return;
};
let task_id = format!("async-{}", uuid::Uuid::new_v4());
let Some(op) =
self.async_turn_op(task_id.clone(), prepared_args.clone(), prepared_elements)
else {
self.bottom_pane.drain_pending_submission_state();
return;
};
self.add_to_history(history_cell::new_user_prompt(
format!("/async {prepared_args}"),
Vec::new(),
Vec::new(),
Vec::new(),
));
self.app_event_tx.send(AppEvent::StartAsyncTurn {
task_id,
prompt: prepared_args,
op: op.into_core(),
});
self.bottom_pane.drain_pending_submission_state();
}
SlashCommand::Plan if !trimmed.is_empty() => {
self.dispatch_command(cmd);
if self.active_mode_kind() != ModeKind::Plan {
@@ -5839,6 +5869,24 @@ impl ChatWidget {
self.needs_final_message_separator = false;
}
fn async_turn_op(
&mut self,
task_id: String,
text: String,
text_elements: Vec<TextElement>,
) -> Option<AppCommand> {
if text.trim().is_empty() {
return None;
}
Some(AppCommand::start_async_task(
task_id,
vec![UserInput::Text {
text,
text_elements,
}],
))
}
/// Restore the blocked submission draft without losing mention resolution state.
///
/// The blocked-image path intentionally keeps the draft in the composer so

View File

@@ -29,6 +29,38 @@ async fn slash_compact_eagerly_queues_follow_up_before_turn_start() {
assert_matches!(op_rx.try_recv(), Err(TryRecvError::Empty));
}
#[tokio::test]
async fn slash_async_echoes_prompt_and_dispatches_side_turn() {
let (mut chat, mut rx, mut op_rx) = make_chatwidget_manual(/*model_override*/ None).await;
chat.bottom_pane
.set_composer_text("/async side question".to_string(), Vec::new(), Vec::new());
chat.handle_key_event(KeyEvent::new(KeyCode::Enter, KeyModifiers::NONE));
let mut rendered_prompt = None;
let mut dispatched_prompt = None;
while let Ok(event) = rx.try_recv() {
match event {
AppEvent::InsertHistoryCell(cell) => {
rendered_prompt = Some(lines_to_single_string(&cell.display_lines(/*width*/ 80)));
}
AppEvent::StartAsyncTurn { prompt, op, .. } => {
dispatched_prompt = Some(prompt);
assert_matches!(op, Op::StartAsyncTask { .. });
}
_ => {}
}
}
let rendered_prompt = rendered_prompt.expect("expected /async prompt history cell");
assert!(
rendered_prompt.contains("/async side question"),
"expected literal /async command to be rendered, got {rendered_prompt:?}"
);
assert_eq!(dispatched_prompt.as_deref(), Some("side question"));
assert_no_submit_op(&mut op_rx);
}
#[tokio::test]
async fn ctrl_d_quits_without_prompt() {
let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await;

View File

@@ -29,6 +29,7 @@ pub enum SlashCommand {
Fork,
Init,
Compact,
Async,
Plan,
Collab,
Agent,
@@ -74,6 +75,7 @@ impl SlashCommand {
SlashCommand::New => "start a new chat during a conversation",
SlashCommand::Init => "create an AGENTS.md file with instructions for Codex",
SlashCommand::Compact => "summarize conversation to prevent hitting the context limit",
SlashCommand::Async => "ask in a background thread",
SlashCommand::Review => "review my current changes and find issues",
SlashCommand::Rename => "rename the current thread",
SlashCommand::Resume => "resume a saved chat",
@@ -130,6 +132,7 @@ impl SlashCommand {
self,
SlashCommand::Review
| SlashCommand::Rename
| SlashCommand::Async
| SlashCommand::Plan
| SlashCommand::Fast
| SlashCommand::SandboxReadRoot
@@ -160,6 +163,7 @@ impl SlashCommand {
| SlashCommand::MemoryDrop
| SlashCommand::MemoryUpdate => false,
SlashCommand::Diff
| SlashCommand::Async
| SlashCommand::Copy
| SlashCommand::Rename
| SlashCommand::Mention
@@ -216,6 +220,11 @@ mod tests {
assert_eq!(SlashCommand::Stop.command(), "stop");
}
#[test]
fn async_command_parses() {
assert_eq!(SlashCommand::from_str("async"), Ok(SlashCommand::Async));
}
#[test]
fn clean_alias_parses_to_stop_command() {
assert_eq!(SlashCommand::from_str("clean"), Ok(SlashCommand::Stop));