Compare commits

...

1 Commits

Author SHA1 Message Date
Michael Bolin
f0249dc085 notify: include client in legacy hook payload 2026-02-26 17:46:45 -08:00
11 changed files with 266 additions and 25 deletions

View File

@@ -560,7 +560,12 @@ impl CodexMessageProcessor {
Ok((review_request, hint))
}
pub async fn process_request(&mut self, connection_id: ConnectionId, request: ClientRequest) {
pub async fn process_request(
&mut self,
connection_id: ConnectionId,
request: ClientRequest,
app_server_client_name: Option<String>,
) {
let to_connection_request_id = |request_id| ConnectionRequestId {
connection_id,
request_id,
@@ -647,8 +652,12 @@ impl CodexMessageProcessor {
.await;
}
ClientRequest::TurnStart { request_id, params } => {
self.turn_start(to_connection_request_id(request_id), params)
.await;
self.turn_start(
to_connection_request_id(request_id),
params,
app_server_client_name.clone(),
)
.await;
}
ClientRequest::TurnSteer { request_id, params } => {
self.turn_steer(to_connection_request_id(request_id), params)
@@ -767,12 +776,20 @@ impl CodexMessageProcessor {
.await;
}
ClientRequest::SendUserMessage { request_id, params } => {
self.send_user_message(to_connection_request_id(request_id), params)
.await;
self.send_user_message(
to_connection_request_id(request_id),
params,
app_server_client_name.clone(),
)
.await;
}
ClientRequest::SendUserTurn { request_id, params } => {
self.send_user_turn(to_connection_request_id(request_id), params)
.await;
self.send_user_turn(
to_connection_request_id(request_id),
params,
app_server_client_name.clone(),
)
.await;
}
ClientRequest::InterruptConversation { request_id, params } => {
self.interrupt_conversation(to_connection_request_id(request_id), params)
@@ -5062,6 +5079,7 @@ impl CodexMessageProcessor {
&self,
request_id: ConnectionRequestId,
params: SendUserMessageParams,
app_server_client_name: Option<String>,
) {
let SendUserMessageParams {
conversation_id,
@@ -5080,6 +5098,12 @@ impl CodexMessageProcessor {
self.outgoing.send_error(request_id, error).await;
return;
};
if let Err(error) =
Self::set_app_server_client_name(conversation.as_ref(), app_server_client_name).await
{
self.outgoing.send_error(request_id, error).await;
return;
}
let mapped_items: Vec<CoreInputItem> = items
.into_iter()
@@ -5110,7 +5134,12 @@ impl CodexMessageProcessor {
.await;
}
async fn send_user_turn(&self, request_id: ConnectionRequestId, params: SendUserTurnParams) {
async fn send_user_turn(
&self,
request_id: ConnectionRequestId,
params: SendUserTurnParams,
app_server_client_name: Option<String>,
) {
let SendUserTurnParams {
conversation_id,
items,
@@ -5136,6 +5165,12 @@ impl CodexMessageProcessor {
self.outgoing.send_error(request_id, error).await;
return;
};
if let Err(error) =
Self::set_app_server_client_name(conversation.as_ref(), app_server_client_name).await
{
self.outgoing.send_error(request_id, error).await;
return;
}
let mapped_items: Vec<CoreInputItem> = items
.into_iter()
@@ -5607,7 +5642,12 @@ impl CodexMessageProcessor {
let _ = conversation.submit(Op::Interrupt).await;
}
async fn turn_start(&self, request_id: ConnectionRequestId, params: TurnStartParams) {
async fn turn_start(
&self,
request_id: ConnectionRequestId,
params: TurnStartParams,
app_server_client_name: Option<String>,
) {
if let Err(error) = Self::validate_v2_input_limit(&params.input) {
self.outgoing.send_error(request_id, error).await;
return;
@@ -5619,6 +5659,12 @@ impl CodexMessageProcessor {
return;
}
};
if let Err(error) =
Self::set_app_server_client_name(thread.as_ref(), app_server_client_name).await
{
self.outgoing.send_error(request_id, error).await;
return;
}
let collaboration_modes_config = CollaborationModesConfig {
default_mode_request_user_input: thread.enabled(Feature::DefaultModeRequestUserInput),
@@ -5700,6 +5746,20 @@ impl CodexMessageProcessor {
}
}
async fn set_app_server_client_name(
thread: &CodexThread,
app_server_client_name: Option<String>,
) -> Result<(), JSONRPCErrorError> {
thread
.set_app_server_client_name(app_server_client_name)
.await
.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to set app server client name: {err}"),
data: None,
})
}
async fn turn_steer(&self, request_id: ConnectionRequestId, params: TurnSteerParams) {
let (_, thread) = match self.load_thread(&params.thread_id).await {
Ok(v) => v,

View File

@@ -140,6 +140,7 @@ pub(crate) struct ConnectionSessionState {
pub(crate) initialized: bool,
pub(crate) experimental_api_enabled: bool,
pub(crate) opted_out_notification_methods: HashSet<String>,
pub(crate) app_server_client_name: Option<String>,
}
pub(crate) struct MessageProcessorArgs {
@@ -329,6 +330,7 @@ impl MessageProcessor {
if let Ok(mut suffix) = USER_AGENT_SUFFIX.lock() {
*suffix = Some(user_agent_suffix);
}
session.app_server_client_name = Some(name.clone());
let user_agent = get_codex_user_agent();
let response = InitializeResponse { user_agent };
@@ -430,7 +432,7 @@ impl MessageProcessor {
// inline the full `CodexMessageProcessor::process_request` future, which
// can otherwise push worker-thread stack usage over the edge.
self.codex_message_processor
.process_request(connection_id, other)
.process_request(connection_id, other, session.app_server_client_name.clone())
.boxed()
.await;
}

View File

@@ -1,16 +1,24 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_final_assistant_message_sse_response;
use app_test_support::create_mock_responses_server_sequence_unchecked;
use app_test_support::to_response;
use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeResponse;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput as V2UserInput;
use core_test_support::fs_wait;
use pretty_assertions::assert_eq;
use serde_json::Value;
use std::path::Path;
use std::time::Duration;
use tempfile::TempDir;
use tokio::time::timeout;
@@ -178,11 +186,100 @@ async fn initialize_opt_out_notification_methods_filters_notifications() -> Resu
Ok(())
}
#[tokio::test]
async fn turn_start_notify_payload_includes_initialize_client_name() -> Result<()> {
let responses = vec![create_final_assistant_message_sse_response("Done")?];
let server = create_mock_responses_server_sequence_unchecked(responses).await;
let codex_home = TempDir::new()?;
let notify_script = codex_home.path().join("notify.py");
std::fs::write(
&notify_script,
r#"from pathlib import Path
import sys
Path(__file__).with_name("notify.json").write_text(sys.argv[-1], encoding="utf-8")
"#,
)?;
let notify_file = codex_home.path().join("notify.json");
let notify_script = notify_script
.to_str()
.expect("notify script path should be valid UTF-8");
create_config_toml_with_extra(
codex_home.path(),
&server.uri(),
"never",
&format!(
"notify = [\"python3\", {}]",
toml_basic_string(notify_script)
),
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.initialize_with_client_info(ClientInfo {
name: "xcode".to_string(),
title: Some("Xcode".to_string()),
version: "1.0.0".to_string(),
}),
)
.await??;
let thread_req = mcp
.send_thread_start_request(ThreadStartParams::default())
.await?;
let thread_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(thread_req)),
)
.await??;
let ThreadStartResponse { thread, .. } = to_response(thread_resp)?;
let turn_req = mcp
.send_turn_start_request(TurnStartParams {
thread_id: thread.id,
input: vec![V2UserInput::Text {
text: "Hello".to_string(),
text_elements: Vec::new(),
}],
..Default::default()
})
.await?;
let turn_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(turn_req)),
)
.await??;
let _: TurnStartResponse = to_response(turn_resp)?;
timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("turn/completed"),
)
.await??;
fs_wait::wait_for_path_exists(&notify_file, Duration::from_secs(5)).await?;
let payload_raw = tokio::fs::read_to_string(&notify_file).await?;
let payload: Value = serde_json::from_str(&payload_raw)?;
assert_eq!(payload["client"], "xcode");
Ok(())
}
// Helper to create a config.toml pointing at the mock model server.
fn create_config_toml(
codex_home: &Path,
server_uri: &str,
approval_policy: &str,
) -> std::io::Result<()> {
create_config_toml_with_extra(codex_home, server_uri, approval_policy, "")
}
fn create_config_toml_with_extra(
codex_home: &Path,
server_uri: &str,
approval_policy: &str,
extra: &str,
) -> std::io::Result<()> {
let config_toml = codex_home.join("config.toml");
std::fs::write(
@@ -195,6 +292,8 @@ sandbox_mode = "read-only"
model_provider = "mock_provider"
{extra}
[model_providers.mock_provider]
name = "Mock provider for test"
base_url = "{server_uri}/v1"
@@ -205,3 +304,7 @@ stream_max_retries = 0
),
)
}
fn toml_basic_string(value: &str) -> String {
format!("\"{}\"", value.replace('\\', "\\\\").replace('"', "\\\""))
}

View File

@@ -443,6 +443,7 @@ impl Codex {
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name,
app_server_client_name: None,
session_source,
dynamic_tools,
persist_extended_history,
@@ -530,6 +531,18 @@ impl Codex {
self.session.steer_input(input, expected_turn_id).await
}
pub(crate) async fn set_app_server_client_name(
&self,
app_server_client_name: Option<String>,
) -> ConstraintResult<()> {
self.session
.update_settings(SessionSettingsUpdate {
app_server_client_name,
..Default::default()
})
.await
}
pub(crate) async fn agent_status(&self) -> AgentStatus {
self.agent_status.borrow().clone()
}
@@ -599,6 +612,7 @@ pub(crate) struct TurnContext {
pub(crate) cwd: PathBuf,
pub(crate) current_date: Option<String>,
pub(crate) timezone: Option<String>,
pub(crate) app_server_client_name: Option<String>,
pub(crate) developer_instructions: Option<String>,
pub(crate) compact_prompt: Option<String>,
pub(crate) user_instructions: Option<String>,
@@ -685,6 +699,7 @@ impl TurnContext {
cwd: self.cwd.clone(),
current_date: self.current_date.clone(),
timezone: self.timezone.clone(),
app_server_client_name: self.app_server_client_name.clone(),
developer_instructions: self.developer_instructions.clone(),
compact_prompt: self.compact_prompt.clone(),
user_instructions: self.user_instructions.clone(),
@@ -812,6 +827,7 @@ pub(crate) struct SessionConfiguration {
original_config_do_not_use: Arc<Config>,
/// Optional service name tag for session metrics.
metrics_service_name: Option<String>,
app_server_client_name: Option<String>,
/// Source of the session (cli, vscode, exec, mcp, ...)
session_source: SessionSource,
dynamic_tools: Vec<DynamicToolSpec>,
@@ -859,6 +875,9 @@ impl SessionConfiguration {
if let Some(cwd) = updates.cwd.clone() {
next_configuration.cwd = cwd;
}
if let Some(app_server_client_name) = updates.app_server_client_name.clone() {
next_configuration.app_server_client_name = Some(app_server_client_name);
}
Ok(next_configuration)
}
}
@@ -873,6 +892,7 @@ pub(crate) struct SessionSettingsUpdate {
pub(crate) reasoning_summary: Option<ReasoningSummaryConfig>,
pub(crate) final_output_json_schema: Option<Option<Value>>,
pub(crate) personality: Option<Personality>,
pub(crate) app_server_client_name: Option<String>,
}
impl Session {
@@ -1049,6 +1069,7 @@ impl Session {
cwd,
current_date: Some(current_date),
timezone: Some(timezone),
app_server_client_name: session_configuration.app_server_client_name.clone(),
developer_instructions: session_configuration.developer_instructions.clone(),
compact_prompt: session_configuration.compact_prompt.clone(),
user_instructions: session_configuration.user_instructions.clone(),
@@ -3948,6 +3969,7 @@ mod handlers {
reasoning_summary: summary,
final_output_json_schema: Some(final_output_json_schema),
personality,
app_server_client_name: None,
},
)
}
@@ -4699,6 +4721,7 @@ async fn spawn_review_thread(
ghost_snapshot: parent_turn_context.ghost_snapshot.clone(),
current_date: parent_turn_context.current_date.clone(),
timezone: parent_turn_context.timezone.clone(),
app_server_client_name: parent_turn_context.app_server_client_name.clone(),
developer_instructions: None,
user_instructions: None,
compact_prompt: parent_turn_context.compact_prompt.clone(),
@@ -5084,6 +5107,7 @@ pub(crate) async fn run_turn(
.dispatch(HookPayload {
session_id: sess.conversation_id,
cwd: turn_context.cwd.clone(),
client: turn_context.app_server_client_name.clone(),
triggered_at: chrono::Utc::now(),
hook_event: HookEvent::AfterAgent {
event: HookEventAfterAgent {
@@ -7893,6 +7917,7 @@ mod tests {
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
app_server_client_name: None,
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
@@ -7985,6 +8010,7 @@ mod tests {
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
app_server_client_name: None,
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
@@ -8296,6 +8322,7 @@ mod tests {
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
app_server_client_name: None,
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
@@ -8349,6 +8376,7 @@ mod tests {
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
app_server_client_name: None,
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
@@ -8430,6 +8458,7 @@ mod tests {
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
app_server_client_name: None,
session_source: SessionSource::Exec,
dynamic_tools: Vec::new(),
persist_extended_history: false,
@@ -8589,6 +8618,7 @@ mod tests {
thread_name: None,
original_config_do_not_use: Arc::clone(&config),
metrics_service_name: None,
app_server_client_name: None,
session_source: SessionSource::Exec,
dynamic_tools,
persist_extended_history: false,

View File

@@ -1,6 +1,7 @@
use crate::agent::AgentStatus;
use crate::codex::Codex;
use crate::codex::SteerInputError;
use crate::config::ConstraintResult;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::file_watcher::WatchRegistration;
@@ -67,6 +68,15 @@ impl CodexThread {
self.codex.steer_input(input, expected_turn_id).await
}
pub async fn set_app_server_client_name(
&self,
app_server_client_name: Option<String>,
) -> ConstraintResult<()> {
self.codex
.set_app_server_client_name(app_server_client_name)
.await
}
/// Use sparingly: this is intended to be removed soon.
pub async fn submit_with_id(&self, sub: Submission) -> CodexResult<()> {
self.codex.submit_with_id(sub).await

View File

@@ -379,6 +379,7 @@ async fn dispatch_after_tool_use_hook(
.dispatch(HookPayload {
session_id: session.conversation_id,
cwd: turn.cwd.clone(),
client: turn.app_server_client_name.clone(),
triggered_at: chrono::Utc::now(),
hook_event: HookEvent::AfterToolUse {
event: HookEventAfterToolUse {

View File

@@ -104,6 +104,7 @@ mod tests {
HookPayload {
session_id: ThreadId::new(),
cwd: PathBuf::from(CWD),
client: None,
triggered_at: Utc
.with_ymd_and_hms(2025, 1, 1, 0, 0, 0)
.single()
@@ -172,6 +173,7 @@ mod tests {
HookPayload {
session_id: ThreadId::new(),
cwd: PathBuf::from(CWD),
client: None,
triggered_at: Utc
.with_ymd_and_hms(2025, 1, 1, 0, 0, 0)
.single()

View File

@@ -65,6 +65,8 @@ impl Hook {
pub struct HookPayload {
pub session_id: ThreadId,
pub cwd: PathBuf,
#[serde(skip_serializing_if = "Option::is_none")]
pub client: Option<String>,
#[serde(serialize_with = "serialize_triggered_at")]
pub triggered_at: DateTime<Utc>,
pub hook_event: HookEvent,
@@ -181,6 +183,7 @@ mod tests {
let payload = HookPayload {
session_id,
cwd: PathBuf::from("tmp"),
client: None,
triggered_at: Utc
.with_ymd_and_hms(2025, 1, 1, 0, 0, 0)
.single()
@@ -218,6 +221,7 @@ mod tests {
let payload = HookPayload {
session_id,
cwd: PathBuf::from("tmp"),
client: None,
triggered_at: Utc
.with_ymd_and_hms(2025, 1, 1, 0, 0, 0)
.single()

View File

@@ -1,4 +1,3 @@
use std::path::Path;
use std::process::Stdio;
use std::sync::Arc;
@@ -19,6 +18,8 @@ enum UserNotification {
thread_id: String,
turn_id: String,
cwd: String,
#[serde(skip_serializing_if = "Option::is_none")]
client: Option<String>,
/// Messages that the user sent to the agent to initiate the turn.
input_messages: Vec<String>,
@@ -28,13 +29,14 @@ enum UserNotification {
},
}
pub fn legacy_notify_json(hook_event: &HookEvent, cwd: &Path) -> Result<String, serde_json::Error> {
match hook_event {
pub fn legacy_notify_json(payload: &HookPayload) -> Result<String, serde_json::Error> {
match &payload.hook_event {
HookEvent::AfterAgent { event } => {
serde_json::to_string(&UserNotification::AgentTurnComplete {
thread_id: event.thread_id.to_string(),
turn_id: event.turn_id.clone(),
cwd: cwd.display().to_string(),
cwd: payload.cwd.display().to_string(),
client: payload.client.clone(),
input_messages: event.input_messages.clone(),
last_assistant_message: event.last_assistant_message.clone(),
})
@@ -56,7 +58,7 @@ pub fn notify_hook(argv: Vec<String>) -> Hook {
Some(command) => command,
None => return HookResult::Success,
};
if let Ok(notify_payload) = legacy_notify_json(&payload.hook_event, &payload.cwd) {
if let Ok(notify_payload) = legacy_notify_json(payload) {
command.arg(notify_payload);
}
@@ -91,6 +93,7 @@ mod tests {
"thread-id": "b5f6c1c2-1111-2222-3333-444455556666",
"turn-id": "12345",
"cwd": "/Users/example/project",
"client": "codex-tui",
"input-messages": ["Rename `foo` to `bar` and update the callsites."],
"last-assistant-message": "Rename complete and verified `cargo build` succeeds.",
})
@@ -102,6 +105,7 @@ mod tests {
thread_id: "b5f6c1c2-1111-2222-3333-444455556666".to_string(),
turn_id: "12345".to_string(),
cwd: "/Users/example/project".to_string(),
client: Some("codex-tui".to_string()),
input_messages: vec!["Rename `foo` to `bar` and update the callsites.".to_string()],
last_assistant_message: Some(
"Rename complete and verified `cargo build` succeeds.".to_string(),
@@ -115,19 +119,27 @@ mod tests {
#[test]
fn legacy_notify_json_matches_historical_wire_shape() -> Result<()> {
let hook_event = HookEvent::AfterAgent {
event: crate::HookEventAfterAgent {
thread_id: ThreadId::from_string("b5f6c1c2-1111-2222-3333-444455556666")
.expect("valid thread id"),
turn_id: "12345".to_string(),
input_messages: vec!["Rename `foo` to `bar` and update the callsites.".to_string()],
last_assistant_message: Some(
"Rename complete and verified `cargo build` succeeds.".to_string(),
),
let payload = HookPayload {
session_id: ThreadId::new(),
cwd: std::path::Path::new("/Users/example/project").to_path_buf(),
client: Some("codex-tui".to_string()),
triggered_at: chrono::Utc::now(),
hook_event: HookEvent::AfterAgent {
event: crate::HookEventAfterAgent {
thread_id: ThreadId::from_string("b5f6c1c2-1111-2222-3333-444455556666")
.expect("valid thread id"),
turn_id: "12345".to_string(),
input_messages: vec![
"Rename `foo` to `bar` and update the callsites.".to_string(),
],
last_assistant_message: Some(
"Rename complete and verified `cargo build` succeeds.".to_string(),
),
},
},
};
let serialized = legacy_notify_json(&hook_event, Path::new("/Users/example/project"))?;
let serialized = legacy_notify_json(&payload)?;
let actual: Value = serde_json::from_str(&serialized)?;
assert_eq!(actual, expected_notification_json());

View File

@@ -13,6 +13,17 @@ use tokio::sync::mpsc::unbounded_channel;
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;
const TUI_NOTIFY_CLIENT: &str = "codex-tui";
async fn initialize_app_server_client_name(thread: &CodexThread) {
if let Err(err) = thread
.set_app_server_client_name(Some(TUI_NOTIFY_CLIENT.to_string()))
.await
{
tracing::error!("failed to set app server client name: {err}");
}
}
/// Spawn the agent bootstrapper and op forwarding loop, returning the
/// `UnboundedSender<Op>` used by the UI to submit operations.
pub(crate) fn spawn_agent(
@@ -42,6 +53,7 @@ pub(crate) fn spawn_agent(
return;
}
};
initialize_app_server_client_name(thread.as_ref()).await;
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_protocol::protocol::Event {
@@ -87,6 +99,8 @@ pub(crate) fn spawn_agent_from_existing(
let app_event_tx_clone = app_event_tx;
tokio::spawn(async move {
initialize_app_server_client_name(thread.as_ref()).await;
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_protocol::protocol::Event {
id: "".to_string(),
@@ -123,6 +137,7 @@ pub(crate) fn spawn_op_forwarder(thread: std::sync::Arc<CodexThread>) -> Unbound
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
tokio::spawn(async move {
initialize_app_server_client_name(thread.as_ref()).await;
while let Some(op) = codex_op_rx.recv().await {
if let Err(e) = thread.submit(op).await {
tracing::error!("failed to submit op: {e}");

View File

@@ -24,6 +24,8 @@ Codex can run a notification hook when the agent finishes a turn. See the config
- https://developers.openai.com/codex/config-reference
When Codex knows which client started the turn, the legacy notify JSON payload also includes a top-level `client` field. The TUI reports `codex-tui`, and the app server reports the `clientInfo.name` value from `initialize`.
## JSON Schema
The generated JSON Schema for `config.toml` lives at `codex-rs/core/config.schema.json`.