diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 53a1c4cc6b..8c0c64baeb 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2868,6 +2868,7 @@ dependencies = [ "codex-plugin", "codex-protocol", "codex-utils-absolute-path", + "codex-utils-output-truncation", "futures", "pretty_assertions", "regex", @@ -2876,6 +2877,8 @@ dependencies = [ "serde_json", "tempfile", "tokio", + "tracing", + "uuid", ] [[package]] diff --git a/codex-rs/core/tests/suite/hooks.rs b/codex-rs/core/tests/suite/hooks.rs index 695e908ed8..0f8c6e552b 100644 --- a/codex-rs/core/tests/suite/hooks.rs +++ b/codex-rs/core/tests/suite/hooks.rs @@ -585,6 +585,38 @@ with Path(r"{log_path}").open("a", encoding="utf-8") as handle: Ok(()) } +fn write_session_start_hook_with_context(home: &Path, additional_context: &str) -> Result<()> { + let script_path = home.join("session_start_hook.py"); + let additional_context_json = serde_json::to_string(additional_context) + .context("serialize session start additional context for test")?; + let script = format!( + r#"import json + +print(json.dumps({{ + "hookSpecificOutput": {{ + "hookEventName": "SessionStart", + "additionalContext": {additional_context_json} + }} +}})) +"#, + ); + let hooks = serde_json::json!({ + "hooks": { + "SessionStart": [{ + "hooks": [{ + "type": "command", + "command": format!("python3 {}", script_path.display()), + "statusMessage": "running session start hook", + }] + }] + } + }); + + fs::write(&script_path, script).context("write session start hook script")?; + fs::write(home.join("hooks.json"), hooks.to_string()).context("write hooks.json")?; + Ok(()) +} + fn rollout_hook_prompt_texts(text: &str) -> Result> { let mut texts = Vec::new(); for line in text.lines() { @@ -618,6 +650,11 @@ fn request_hook_prompt_texts( .collect() } +fn spilled_hook_output_path(text: &str) -> Option<&str> { + text.lines() + .find_map(|line| line.strip_prefix("Full hook output saved to: ")) +} + fn read_stop_hook_inputs(home: &Path) -> 
Result> { fs::read_to_string(home.join("stop_hook_log.jsonl")) .context("read stop hook log")? @@ -905,6 +942,111 @@ async fn session_start_hook_sees_materialized_transcript_path() -> Result<()> { Ok(()) } +#[tokio::test] +async fn session_start_hook_spills_large_additional_context() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let response = mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "hello from the reef"), + ev_completed("resp-1"), + ]), + ) + .await; + let additional_context = "remember the reef ".repeat(800); + + let mut builder = test_codex() + .with_pre_build_hook({ + let additional_context = additional_context.clone(); + move |home| { + if let Err(error) = write_session_start_hook_with_context(home, &additional_context) + { + panic!("failed to write session start hook test fixture: {error}"); + } + } + }) + .with_config(|config| { + config + .features + .enable(Feature::CodexHooks) + .expect("test config should allow feature update"); + }); + let test = builder.build(&server).await?; + + test.submit_turn("hello").await?; + + let request = response.single_request(); + let developer_messages = request.message_input_texts("developer"); + let developer_message = developer_messages + .iter() + .find(|message| spilled_hook_output_path(message).is_some()) + .context("spilled developer hook message")?; + assert!(developer_message.contains("tokens truncated")); + let path = spilled_hook_output_path(developer_message).context("spill path")?; + assert_eq!(fs::read_to_string(path)?, additional_context); + + Ok(()) +} + +#[tokio::test] +async fn stop_hook_spills_large_continuation_prompt() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + ev_assistant_message("msg-1", "draft one"), + ev_completed("resp-1"), + ]), + 
sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-2", "draft two"), + ev_completed("resp-2"), + ]), + ], + ) + .await; + let continuation_prompt = std::iter::repeat_n("retry with the reef note", 800) + .collect::>() + .join(" "); + + let mut builder = test_codex() + .with_pre_build_hook({ + let continuation_prompt = continuation_prompt.clone(); + move |home| { + if let Err(error) = write_stop_hook(home, &[&continuation_prompt]) { + panic!("failed to write stop hook test fixture: {error}"); + } + } + }) + .with_config(|config| { + config + .features + .enable(Feature::CodexHooks) + .expect("test config should allow feature update"); + }); + let test = builder.build(&server).await?; + + test.submit_turn("hello from the sea").await?; + + let requests = responses.requests(); + assert_eq!(requests.len(), 2); + let hook_prompt_texts = request_hook_prompt_texts(&requests[1]); + assert_eq!(hook_prompt_texts.len(), 1); + let hook_prompt_text = &hook_prompt_texts[0]; + assert!(hook_prompt_text.contains("tokens truncated")); + let path = spilled_hook_output_path(hook_prompt_text).context("spill path")?; + assert_eq!(fs::read_to_string(path)?, continuation_prompt); + + Ok(()) +} + #[tokio::test] async fn resumed_thread_keeps_stop_continuation_prompt_in_history() -> Result<()> { skip_if_no_network!(Ok(())); @@ -2947,6 +3089,77 @@ async fn post_tool_use_exit_two_replaces_one_shot_exec_command_output_with_feedb Ok(()) } +#[tokio::test] +async fn post_tool_use_spills_large_feedback_message() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let call_id = "posttooluse-large-feedback"; + let command = "printf post-hook-output".to_string(); + let args = serde_json::json!({ "cmd": command, "tty": false }); + let responses = mount_sse_sequence( + &server, + vec![ + sse(vec![ + ev_response_created("resp-1"), + core_test_support::responses::ev_function_call( + call_id, + "exec_command", + &serde_json::to_string(&args)?, + ), 
+ ev_completed("resp-1"), + ]), + sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-1", "post hook blocked the exec result"), + ev_completed("resp-2"), + ]), + ], + ) + .await; + let feedback = "blocked by post hook ".repeat(800); + + let mut builder = test_codex() + .with_pre_build_hook({ + let feedback = feedback.clone(); + move |home| { + if let Err(error) = + write_post_tool_use_hook(home, Some("^Bash$"), "exit_2", &feedback) + { + panic!("failed to write post tool use hook test fixture: {error}"); + } + } + }) + .with_config(|config| { + config.use_experimental_unified_exec_tool = true; + config + .features + .enable(Feature::CodexHooks) + .expect("test config should allow feature update"); + config + .features + .enable(Feature::UnifiedExec) + .expect("test config should allow feature update"); + }); + let test = builder.build(&server).await?; + + test.submit_turn("run the exec command with long post-hook feedback") + .await?; + + let requests = responses.requests(); + assert_eq!(requests.len(), 2); + let output_item = requests[1].function_call_output(call_id); + let output = output_item + .get("output") + .and_then(Value::as_str) + .expect("exec command output string"); + assert!(output.contains("tokens truncated")); + let path = spilled_hook_output_path(output).context("spill path")?; + assert_eq!(fs::read_to_string(path)?, feedback.trim()); + + Ok(()) +} + #[tokio::test] async fn post_tool_use_blocks_when_exec_session_completes_via_write_stdin() -> Result<()> { skip_if_no_network!(Ok(())); diff --git a/codex-rs/hooks/Cargo.toml b/codex-rs/hooks/Cargo.toml index 028a055424..1bd7f455f2 100644 --- a/codex-rs/hooks/Cargo.toml +++ b/codex-rs/hooks/Cargo.toml @@ -19,12 +19,15 @@ codex-config = { workspace = true } codex-plugin = { workspace = true } codex-protocol = { workspace = true } codex-utils-absolute-path = { workspace = true } +codex-utils-output-truncation = { workspace = true } futures = { workspace = true, features = ["alloc"] } 
regex = { workspace = true } schemars = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -tokio = { workspace = true, features = ["io-util", "process", "time"] } +tokio = { workspace = true, features = ["fs", "io-util", "process", "time"] } +tracing = { workspace = true } +uuid = { workspace = true, features = ["v4"] } [dev-dependencies] pretty_assertions = { workspace = true } diff --git a/codex-rs/hooks/src/engine/mod.rs b/codex-rs/hooks/src/engine/mod.rs index c06c9fabd7..37967862a8 100644 --- a/codex-rs/hooks/src/engine/mod.rs +++ b/codex-rs/hooks/src/engine/mod.rs @@ -6,14 +6,6 @@ pub(crate) mod schema_loader; use std::collections::HashMap; -use codex_config::ConfigLayerStack; -use codex_plugin::PluginHookSource; -use codex_protocol::protocol::HookEventName; -use codex_protocol::protocol::HookHandlerType; -use codex_protocol::protocol::HookRunSummary; -use codex_protocol::protocol::HookSource; -use codex_utils_absolute_path::AbsolutePathBuf; - use crate::events::permission_request::PermissionRequestOutcome; use crate::events::permission_request::PermissionRequestRequest; use crate::events::post_tool_use::PostToolUseOutcome; @@ -26,6 +18,15 @@ use crate::events::stop::StopOutcome; use crate::events::stop::StopRequest; use crate::events::user_prompt_submit::UserPromptSubmitOutcome; use crate::events::user_prompt_submit::UserPromptSubmitRequest; +use crate::output_spill::HookOutputSpiller; +use codex_config::ConfigLayerStack; +use codex_plugin::PluginHookSource; +use codex_protocol::ThreadId; +use codex_protocol::protocol::HookEventName; +use codex_protocol::protocol::HookHandlerType; +use codex_protocol::protocol::HookRunSummary; +use codex_protocol::protocol::HookSource; +use codex_utils_absolute_path::AbsolutePathBuf; #[derive(Debug, Clone)] pub(crate) struct CommandShell { @@ -90,6 +91,7 @@ pub(crate) struct ClaudeHooksEngine { handlers: Vec, warnings: Vec, shell: CommandShell, + output_spiller: 
HookOutputSpiller, } impl ClaudeHooksEngine { @@ -105,6 +107,7 @@ impl ClaudeHooksEngine { handlers: Vec::new(), warnings: Vec::new(), shell, + output_spiller: HookOutputSpiller::new(), }; } @@ -118,6 +121,7 @@ impl ClaudeHooksEngine { handlers: discovered.handlers, warnings: discovered.warnings, shell, + output_spiller: HookOutputSpiller::new(), } } @@ -155,7 +159,13 @@ impl ClaudeHooksEngine { request: SessionStartRequest, turn_id: Option, ) -> SessionStartOutcome { - crate::events::session_start::run(&self.handlers, &self.shell, request, turn_id).await + let session_id = request.session_id; + let mut outcome = + crate::events::session_start::run(&self.handlers, &self.shell, request, turn_id).await; + outcome.additional_contexts = self + .maybe_spill_texts(session_id, outcome.additional_contexts) + .await; + outcome } pub(crate) async fn run_pre_tool_use(&self, request: PreToolUseRequest) -> PreToolUseOutcome { @@ -173,7 +183,16 @@ impl ClaudeHooksEngine { &self, request: PostToolUseRequest, ) -> PostToolUseOutcome { - crate::events::post_tool_use::run(&self.handlers, &self.shell, request).await + let session_id = request.session_id; + let mut outcome = + crate::events::post_tool_use::run(&self.handlers, &self.shell, request).await; + outcome.additional_contexts = self + .maybe_spill_texts(session_id, outcome.additional_contexts) + .await; + outcome.feedback_message = self + .maybe_spill_text(session_id, outcome.feedback_message) + .await; + outcome } pub(crate) fn preview_user_prompt_submit( @@ -187,7 +206,13 @@ impl ClaudeHooksEngine { &self, request: UserPromptSubmitRequest, ) -> UserPromptSubmitOutcome { - crate::events::user_prompt_submit::run(&self.handlers, &self.shell, request).await + let session_id = request.session_id; + let mut outcome = + crate::events::user_prompt_submit::run(&self.handlers, &self.shell, request).await; + outcome.additional_contexts = self + .maybe_spill_texts(session_id, outcome.additional_contexts) + .await; + outcome } pub(crate) 
fn preview_stop(&self, request: &StopRequest) -> Vec { @@ -195,7 +220,35 @@ impl ClaudeHooksEngine { } pub(crate) async fn run_stop(&self, request: StopRequest) -> StopOutcome { - crate::events::stop::run(&self.handlers, &self.shell, request).await + let session_id = request.session_id; + let mut outcome = crate::events::stop::run(&self.handlers, &self.shell, request).await; + outcome.continuation_fragments = self + .maybe_spill_prompt_fragments(session_id, outcome.continuation_fragments) + .await; + outcome + } + + async fn maybe_spill_texts(&self, session_id: ThreadId, texts: Vec) -> Vec { + self.output_spiller + .maybe_spill_texts(session_id, texts) + .await + } + + async fn maybe_spill_text(&self, session_id: ThreadId, text: Option) -> Option { + match text { + Some(text) => Some(self.output_spiller.maybe_spill_text(session_id, text).await), + None => None, + } + } + + async fn maybe_spill_prompt_fragments( + &self, + session_id: ThreadId, + fragments: Vec, + ) -> Vec { + self.output_spiller + .maybe_spill_prompt_fragments(session_id, fragments) + .await } } diff --git a/codex-rs/hooks/src/lib.rs b/codex-rs/hooks/src/lib.rs index 4e16969a58..ad627b497a 100644 --- a/codex-rs/hooks/src/lib.rs +++ b/codex-rs/hooks/src/lib.rs @@ -2,6 +2,7 @@ mod config_rules; mod engine; pub(crate) mod events; mod legacy_notify; +mod output_spill; mod registry; mod schema; mod types; diff --git a/codex-rs/hooks/src/output_spill.rs b/codex-rs/hooks/src/output_spill.rs new file mode 100644 index 0000000000..b1828c0825 --- /dev/null +++ b/codex-rs/hooks/src/output_spill.rs @@ -0,0 +1,111 @@ +use codex_protocol::ThreadId; +use codex_protocol::items::HookPromptFragment; +use codex_utils_absolute_path::AbsolutePathBuf; +use codex_utils_output_truncation::TruncationPolicy; +use codex_utils_output_truncation::approx_token_count; +use codex_utils_output_truncation::formatted_truncate_text; +use tokio::fs; +use tracing::warn; +use uuid::Uuid; + +const HOOK_OUTPUTS_DIR: &str = "hook_outputs"; 
+const HOOK_OUTPUT_TOKEN_LIMIT: usize = 2_500; + +#[derive(Clone)] +pub(crate) struct HookOutputSpiller { + output_dir: AbsolutePathBuf, +} + +impl HookOutputSpiller { + pub(crate) fn new() -> Self { + Self { + output_dir: AbsolutePathBuf::resolve_path_against_base(std::env::temp_dir(), "/") + .join(HOOK_OUTPUTS_DIR), + } + } + + /// Keeps hook text within the model-visible hook-output budget. + /// + /// Oversized text is written in full under the OS temp directory at + /// `<temp_dir>/hook_outputs/<thread_id>/<uuid>.txt` and replaced with the same head/tail + /// preview style used for other truncated output, plus a path back to the preserved + /// full text. If the write fails, a warning is logged and no path line is appended. + pub(crate) async fn maybe_spill_text(&self, thread_id: ThreadId, text: String) -> String { + if approx_token_count(&text) <= HOOK_OUTPUT_TOKEN_LIMIT { + return text; + } + + let path = hook_output_path(&self.output_dir, thread_id); + if let Some(parent) = path.parent() + && let Err(err) = fs::create_dir_all(parent.as_ref()).await + { + warn!( + "failed to create hook output directory {}: {err}", + parent.display() + ); + return formatted_truncate_text( + &text, + TruncationPolicy::Tokens(HOOK_OUTPUT_TOKEN_LIMIT), + ); + } + + if let Err(err) = fs::write(path.as_ref(), &text).await { + warn!("failed to write hook output {}: {err}", path.display()); + return formatted_truncate_text( + &text, + TruncationPolicy::Tokens(HOOK_OUTPUT_TOKEN_LIMIT), + ); + } + + spilled_hook_output_preview(&text, &path) + } + + pub(crate) async fn maybe_spill_texts( + &self, + thread_id: ThreadId, + texts: Vec, + ) -> Vec { + let mut spilled = Vec::with_capacity(texts.len()); + for text in texts { + spilled.push(self.maybe_spill_text(thread_id, text).await); + } + spilled + } + + pub(crate) async fn maybe_spill_prompt_fragments( + &self, + thread_id: ThreadId, + fragments: Vec, + ) -> Vec { + let mut spilled = Vec::with_capacity(fragments.len()); + for fragment in fragments { + spilled.push(HookPromptFragment { + text: self.maybe_spill_text(thread_id, 
fragment.text).await, + hook_run_id: fragment.hook_run_id, + }); + } + spilled + } +} + +fn hook_output_path(output_dir: &AbsolutePathBuf, thread_id: ThreadId) -> AbsolutePathBuf { + output_dir + .join(thread_id.to_string()) + .join(format!("{}.txt", Uuid::new_v4())) +} + +/// Builds the model-visible replacement for a spilled hook output. +/// +/// The path footer is budgeted before truncation so adding the recovery path +/// does not let the preview grow past the hook-output limit. +fn spilled_hook_output_preview(text: &str, path: &AbsolutePathBuf) -> String { + let footer = format!("\n\nFull hook output saved to: {}", path.display()); + let preview_policy = TruncationPolicy::Tokens( + HOOK_OUTPUT_TOKEN_LIMIT.saturating_sub(approx_token_count(&footer)), + ); + format!("{}{footer}", formatted_truncate_text(text, preview_policy)) +} + +#[cfg(test)] +#[path = "output_spill_tests.rs"] +mod tests; diff --git a/codex-rs/hooks/src/output_spill_tests.rs b/codex-rs/hooks/src/output_spill_tests.rs new file mode 100644 index 0000000000..6c5f9b5848 --- /dev/null +++ b/codex-rs/hooks/src/output_spill_tests.rs @@ -0,0 +1,42 @@ +use super::*; +use anyhow::Context; +use anyhow::Result; +use tempfile::tempdir; + +#[tokio::test] +async fn small_hook_output_remains_inline() -> Result<()> { + let dir = tempdir()?; + let output_dir = AbsolutePathBuf::from_absolute_path(dir.path())?.join(HOOK_OUTPUTS_DIR); + let thread_id = ThreadId::new(); + let spiller = HookOutputSpiller { + output_dir: output_dir.clone(), + }; + + let output = spiller + .maybe_spill_text(thread_id, "short".to_string()) + .await; + + assert_eq!(output, "short"); + assert!(!output_dir.exists()); + Ok(()) +} + +#[tokio::test] +async fn large_hook_output_spills_to_file() -> Result<()> { + let dir = tempdir()?; + let text = "hook output ".repeat(1_000); + let output_dir = AbsolutePathBuf::from_absolute_path(dir.path())?.join(HOOK_OUTPUTS_DIR); + let spiller = HookOutputSpiller { output_dir }; + + let output = spiller + 
.maybe_spill_text(ThreadId::new(), text.clone()) + .await; + + assert!(output.contains("tokens truncated")); + let path = output + .lines() + .find_map(|line| line.strip_prefix("Full hook output saved to: ")) + .context("spill path")?; + assert_eq!(fs::read_to_string(path).await?, text); + Ok(()) +}