Compare commits

...

2 Commits

Author SHA1 Message Date
Charles Cunningham
e69ad4f446 dd skip_serializing_if 2026-01-24 21:44:46 -08:00
Charles Cunningham
87fbfcc61d Expose collaboration_mode_kind in turn.started / turn notifications 2026-01-24 21:39:51 -08:00
14 changed files with 77 additions and 5 deletions

View File

@@ -3,6 +3,7 @@ use crate::protocol::v2::Turn;
use crate::protocol::v2::TurnError;
use crate::protocol::v2::TurnStatus;
use crate::protocol::v2::UserInput;
use codex_protocol::config_types::ModeKind;
use codex_protocol::protocol::AgentReasoningEvent;
use codex_protocol::protocol::AgentReasoningRawContentEvent;
use codex_protocol::protocol::EventMsg;
@@ -164,6 +165,7 @@ impl ThreadHistoryBuilder {
items: Vec::new(),
error: None,
status: TurnStatus::Completed,
collaboration_mode_kind: None,
}
}
@@ -222,6 +224,7 @@ struct PendingTurn {
items: Vec<ThreadItem>,
error: Option<TurnError>,
status: TurnStatus,
collaboration_mode_kind: Option<ModeKind>,
}
impl From<PendingTurn> for Turn {
@@ -231,6 +234,7 @@ impl From<PendingTurn> for Turn {
items: value.items,
error: value.error,
status: value.status,
collaboration_mode_kind: value.collaboration_mode_kind,
}
}
}
@@ -493,6 +497,7 @@ mod tests {
id: "turn-1".into(),
status: TurnStatus::Completed,
error: None,
collaboration_mode_kind: None,
items: vec![
ThreadItem::UserMessage {
id: "item-1".into(),
@@ -511,6 +516,7 @@ mod tests {
id: "turn-2".into(),
status: TurnStatus::Completed,
error: None,
collaboration_mode_kind: None,
items: vec![
ThreadItem::UserMessage {
id: "item-3".into(),

View File

@@ -6,6 +6,7 @@ use codex_protocol::account::PlanType;
use codex_protocol::approvals::ExecPolicyAmendment as CoreExecPolicyAmendment;
use codex_protocol::config_types::CollaborationMode;
use codex_protocol::config_types::ForcedLoginMethod;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
use codex_protocol::config_types::ReasoningSummary;
use codex_protocol::config_types::SandboxMode as CoreSandboxMode;
@@ -1563,6 +1564,10 @@ pub struct Turn {
pub status: TurnStatus,
/// Only populated when the Turn's status is failed.
pub error: Option<TurnError>,
/// Collaboration mode kind captured at the start of the turn, when known.
#[serde(default)]
#[ts(optional)]
pub collaboration_mode_kind: Option<ModeKind>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS, Error)]

View File

@@ -385,7 +385,7 @@ Event notifications are the server-initiated event stream for thread lifecycles,
The app-server streams JSON-RPC notifications while a turn is running. Each turn starts with `turn/started` (initial `turn`) and ends with `turn/completed` (final `turn` status). Token usage events stream separately via `thread/tokenUsage/updated`. Clients subscribe to the events they care about, rendering each item incrementally as updates arrive. The per-item lifecycle is always: `item/started` → zero or more item-specific deltas → `item/completed`.
- `turn/started``{ turn }` with the turn id, empty `items`, and `status: "inProgress"`.
- `turn/started``{ turn }` with the turn id, empty `items`, `status: "inProgress"`, and (when known) `collaborationModeKind`.
- `turn/completed``{ turn }` where `turn.status` is `completed`, `interrupted`, or `failed`; failures carry `{ error: { message, codexErrorInfo?, additionalDetails? } }`.
- `turn/diff/updated``{ threadId, turnId, diff }` represents the up-to-date snapshot of the turn-level unified diff, emitted after every FileChange item. `diff` is the latest aggregated unified diff across every file change in the turn. UIs can render this to show the full "what changed" view without stitching individual `fileChange` items.
- `turn/plan/updated``{ turnId, explanation?, plan }` whenever the agent shares or changes its plan; each `plan` entry is `{ step, status }` with `status` in `pending`, `inProgress`, or `completed`.

View File

@@ -85,6 +85,7 @@ use codex_core::protocol::TurnDiffEvent;
use codex_core::review_format::format_review_findings_block;
use codex_core::review_prompts;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ModeKind;
use codex_protocol::plan_tool::UpdatePlanArgs;
use codex_protocol::protocol::ReviewOutputEvent;
use codex_protocol::request_user_input::RequestUserInputAnswer as CoreRequestUserInputAnswer;
@@ -115,6 +116,11 @@ pub(crate) async fn apply_bespoke_event_handling(
msg,
} = event;
match msg {
EventMsg::TurnStarted(turn_started_event) => {
let mut summaries = turn_summary_store.lock().await;
let summary = summaries.entry(conversation_id).or_default();
summary.collaboration_mode_kind = Some(turn_started_event.collaboration_mode_kind);
}
EventMsg::TurnComplete(_ev) => {
handle_turn_complete(
conversation_id,
@@ -1132,6 +1138,7 @@ async fn emit_turn_completed_with_status(
event_turn_id: String,
status: TurnStatus,
error: Option<TurnError>,
collaboration_mode_kind: Option<ModeKind>,
outgoing: &OutgoingMessageSender,
) {
let notification = TurnCompletedNotification {
@@ -1141,6 +1148,7 @@ async fn emit_turn_completed_with_status(
items: vec![],
error,
status,
collaboration_mode_kind,
},
};
outgoing
@@ -1248,13 +1256,22 @@ async fn handle_turn_complete(
turn_summary_store: &TurnSummaryStore,
) {
let turn_summary = find_and_remove_turn_summary(conversation_id, turn_summary_store).await;
let collaboration_mode_kind = turn_summary.collaboration_mode_kind;
let (status, error) = match turn_summary.last_error {
Some(error) => (TurnStatus::Failed, Some(error)),
None => (TurnStatus::Completed, None),
};
emit_turn_completed_with_status(conversation_id, event_turn_id, status, error, outgoing).await;
emit_turn_completed_with_status(
conversation_id,
event_turn_id,
status,
error,
collaboration_mode_kind,
outgoing,
)
.await;
}
async fn handle_turn_interrupted(
@@ -1263,13 +1280,14 @@ async fn handle_turn_interrupted(
outgoing: &OutgoingMessageSender,
turn_summary_store: &TurnSummaryStore,
) {
find_and_remove_turn_summary(conversation_id, turn_summary_store).await;
let turn_summary = find_and_remove_turn_summary(conversation_id, turn_summary_store).await;
emit_turn_completed_with_status(
conversation_id,
event_turn_id,
TurnStatus::Interrupted,
None,
turn_summary.collaboration_mode_kind,
outgoing,
)
.await;

View File

@@ -170,6 +170,7 @@ use codex_login::ShutdownHandle;
use codex_login::run_login_server;
use codex_protocol::ThreadId;
use codex_protocol::config_types::ForcedLoginMethod;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
@@ -213,6 +214,7 @@ pub(crate) type PendingRollbacks = Arc<Mutex<HashMap<ThreadId, RequestId>>>;
pub(crate) struct TurnSummary {
pub(crate) file_change_started: HashSet<String>,
pub(crate) last_error: Option<TurnError>,
pub(crate) collaboration_mode_kind: Option<ModeKind>,
}
pub(crate) type TurnSummaryStore = Arc<Mutex<HashMap<ThreadId, TurnSummary>>>;
@@ -3641,13 +3643,14 @@ impl CodexMessageProcessor {
}
async fn turn_start(&self, request_id: RequestId, params: TurnStartParams) {
let (_, thread) = match self.load_thread(&params.thread_id).await {
let (thread_id, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let thread_id_str = params.thread_id.clone();
// Map v2 input items to core input items.
let mapped_items: Vec<CoreInputItem> = params
@@ -3655,6 +3658,15 @@ impl CodexMessageProcessor {
.into_iter()
.map(V2UserInput::into_core)
.collect();
let collaboration_mode_kind = params.collaboration_mode.as_ref().map(|mode| mode.mode);
let config_snapshot = thread.config_snapshot().await;
let collaboration_mode_kind =
collaboration_mode_kind.or(Some(config_snapshot.collaboration_mode_kind));
{
let mut summaries = self.turn_summary_store.lock().await;
let summary = summaries.entry(thread_id).or_default();
summary.collaboration_mode_kind = collaboration_mode_kind;
}
let has_any_overrides = params.cwd.is_some()
|| params.approval_policy.is_some()
@@ -3696,6 +3708,7 @@ impl CodexMessageProcessor {
items: vec![],
error: None,
status: TurnStatus::InProgress,
collaboration_mode_kind,
};
let response = TurnStartResponse { turn: turn.clone() };
@@ -3703,7 +3716,7 @@ impl CodexMessageProcessor {
// Emit v2 turn/started notification.
let notif = TurnStartedNotification {
thread_id: params.thread_id,
thread_id: thread_id_str,
turn,
};
self.outgoing
@@ -3740,6 +3753,7 @@ impl CodexMessageProcessor {
items,
error: None,
status: TurnStatus::InProgress,
collaboration_mode_kind: None,
}
}

View File

@@ -146,6 +146,7 @@ mod tests {
use crate::config::Config;
use crate::config::ConfigBuilder;
use assert_matches::assert_matches;
use codex_protocol::config_types::ModeKind;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::TurnAbortReason;
@@ -231,6 +232,7 @@ mod tests {
async fn on_event_updates_status_from_task_started() {
let status = agent_status_from_event(&EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}));
assert_eq!(status, Some(AgentStatus::Running));
}

View File

@@ -433,6 +433,7 @@ pub(crate) struct TurnContext {
pub(crate) developer_instructions: Option<String>,
pub(crate) compact_prompt: Option<String>,
pub(crate) user_instructions: Option<String>,
pub(crate) collaboration_mode_kind: ModeKind,
pub(crate) personality: Option<Personality>,
pub(crate) approval_policy: AskForApproval,
pub(crate) sandbox_policy: SandboxPolicy,
@@ -513,6 +514,7 @@ impl SessionConfiguration {
reasoning_effort: self.collaboration_mode.reasoning_effort(),
personality: self.personality,
session_source: self.session_source.clone(),
collaboration_mode_kind: self.collaboration_mode.mode,
}
}
@@ -606,6 +608,7 @@ impl Session {
developer_instructions: session_configuration.developer_instructions.clone(),
compact_prompt: session_configuration.compact_prompt.clone(),
user_instructions: session_configuration.user_instructions.clone(),
collaboration_mode_kind: session_configuration.collaboration_mode.mode,
personality: session_configuration.personality,
approval_policy: session_configuration.approval_policy.value(),
sandbox_policy: session_configuration.sandbox_policy.get().clone(),
@@ -2818,6 +2821,7 @@ async fn spawn_review_thread(
developer_instructions: None,
user_instructions: None,
compact_prompt: parent_turn_context.compact_prompt.clone(),
collaboration_mode_kind: parent_turn_context.collaboration_mode_kind,
personality: parent_turn_context.personality,
approval_policy: parent_turn_context.approval_policy,
sandbox_policy: parent_turn_context.sandbox_policy.clone(),
@@ -2917,6 +2921,7 @@ pub(crate) async fn run_turn(
}
let event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode_kind,
});
sess.send_event(&turn_context, event).await;

View File

@@ -4,6 +4,7 @@ use crate::error::Result as CodexResult;
use crate::protocol::Event;
use crate::protocol::Op;
use crate::protocol::Submission;
use codex_protocol::config_types::ModeKind;
use codex_protocol::config_types::Personality;
use codex_protocol::openai_models::ReasoningEffort;
use codex_protocol::protocol::AskForApproval;
@@ -22,6 +23,7 @@ pub struct ThreadConfigSnapshot {
pub reasoning_effort: Option<ReasoningEffort>,
pub personality: Option<Personality>,
pub session_source: SessionSource,
pub collaboration_mode_kind: ModeKind,
}
pub struct CodexThread {

View File

@@ -61,6 +61,7 @@ pub(crate) async fn run_compact_task(
) {
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode_kind,
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(sess.clone(), turn_context, input).await;

View File

@@ -21,6 +21,7 @@ pub(crate) async fn run_inline_remote_auto_compact_task(
pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Arc<TurnContext>) {
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode_kind,
});
sess.send_event(&turn_context, start_event).await;

View File

@@ -67,6 +67,7 @@ impl SessionTask for UserShellCommandTask {
let event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode_kind,
});
let session = session.clone_session();
session.send_event(turn_context.as_ref(), event).await;

View File

@@ -44,6 +44,7 @@ use codex_exec::exec_events::TurnFailedEvent;
use codex_exec::exec_events::TurnStartedEvent;
use codex_exec::exec_events::Usage;
use codex_exec::exec_events::WebSearchItem;
use codex_protocol::config_types::ModeKind;
use codex_protocol::plan_tool::PlanItemArg;
use codex_protocol::plan_tool::StepStatus;
use codex_protocol::plan_tool::UpdatePlanArgs;
@@ -104,6 +105,7 @@ fn task_started_produces_turn_started_event() {
"t1",
EventMsg::TurnStarted(codex_core::protocol::TurnStartedEvent {
model_context_window: Some(32_000),
collaboration_mode_kind: ModeKind::Custom,
}),
));

View File

@@ -14,6 +14,7 @@ use std::time::Duration;
use crate::ThreadId;
use crate::approvals::ElicitationRequestEvent;
use crate::config_types::CollaborationMode;
use crate::config_types::ModeKind;
use crate::config_types::Personality;
use crate::config_types::ReasoningSummary as ReasoningSummaryConfig;
use crate::custom_prompts::CustomPrompt;
@@ -1075,6 +1076,7 @@ pub struct TurnCompleteEvent {
pub struct TurnStartedEvent {
// TODO(aibrahim): make this not optional
pub model_context_window: Option<i64>,
pub collaboration_mode_kind: ModeKind,
}
#[derive(Debug, Clone, Deserialize, Serialize, Default, PartialEq, Eq, JsonSchema, TS)]

View File

@@ -1983,6 +1983,7 @@ async fn unified_exec_wait_after_final_agent_message_snapshot() {
id: "turn-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
@@ -2017,6 +2018,7 @@ async fn unified_exec_wait_before_streamed_agent_message_snapshot() {
id: "turn-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
@@ -2654,6 +2656,7 @@ async fn interrupted_turn_error_message_snapshot() {
id: "task-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
@@ -3631,6 +3634,7 @@ async fn interrupt_clears_unified_exec_wait_streak_snapshot() {
id: "turn-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
@@ -3704,6 +3708,7 @@ async fn ui_snapshots_small_heights_task_running() {
id: "task-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
chat.handle_codex_event(Event {
@@ -3735,6 +3740,7 @@ async fn status_widget_and_approval_modal_snapshot() {
id: "task-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
// Provide a deterministic header for the status line.
@@ -3787,6 +3793,7 @@ async fn status_widget_active_snapshot() {
id: "task-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
// Provide a deterministic header via a bold reasoning chunk.
@@ -3836,6 +3843,7 @@ async fn mcp_startup_complete_does_not_clear_running_task() {
id: "task-1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
@@ -4392,6 +4400,7 @@ async fn stream_recovery_restores_previous_status_header() {
id: "task".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
drain_insert_history(&mut rx);
@@ -4429,6 +4438,7 @@ async fn multiple_agent_messages_in_single_turn_emit_multiple_headers() {
id: "s1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
@@ -4623,6 +4633,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() {
id: "t1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
chat.handle_codex_event(Event {
@@ -4670,6 +4681,7 @@ async fn chatwidget_markdown_code_blocks_vt100_snapshot() {
id: "t1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
// Build a vt100 visual from the history insertions only (no UI overlay)
@@ -4759,6 +4771,7 @@ async fn chatwidget_tall() {
id: "t1".into(),
msg: EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: None,
collaboration_mode_kind: ModeKind::Custom,
}),
});
for i in 0..30 {