mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
Today `sub_id` is an ID of a single incoming Codex Op submition. We then associate all events triggered by this operation using the same `sub_id`. At the same time we are also creating a TurnContext per submission and we'd like to start associating some events (item added/item completed) with an entire turn instead of just the operation that started it. Using turn context when sending events give us flexibility to change notification scheme.
251 lines
7.4 KiB
Rust
251 lines
7.4 KiB
Rust
use crate::codex::Session;
|
|
use crate::codex::TurnContext;
|
|
use crate::tools::TELEMETRY_PREVIEW_MAX_BYTES;
|
|
use crate::tools::TELEMETRY_PREVIEW_MAX_LINES;
|
|
use crate::tools::TELEMETRY_PREVIEW_TRUNCATION_NOTICE;
|
|
use crate::turn_diff_tracker::TurnDiffTracker;
|
|
use codex_otel::otel_event_manager::OtelEventManager;
|
|
use codex_protocol::models::FunctionCallOutputPayload;
|
|
use codex_protocol::models::ResponseInputItem;
|
|
use codex_protocol::models::ShellToolCallParams;
|
|
use codex_protocol::protocol::FileChange;
|
|
use codex_utils_string::take_bytes_at_char_boundary;
|
|
use mcp_types::CallToolResult;
|
|
use std::borrow::Cow;
|
|
use std::collections::HashMap;
|
|
use std::path::PathBuf;
|
|
use std::sync::Arc;
|
|
use tokio::sync::Mutex;
|
|
|
|
pub type SharedTurnDiffTracker = Arc<Mutex<TurnDiffTracker>>;
|
|
|
|
#[derive(Clone)]
|
|
pub struct ToolInvocation {
|
|
pub session: Arc<Session>,
|
|
pub turn: Arc<TurnContext>,
|
|
pub tracker: SharedTurnDiffTracker,
|
|
pub call_id: String,
|
|
pub tool_name: String,
|
|
pub payload: ToolPayload,
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub enum ToolPayload {
|
|
Function {
|
|
arguments: String,
|
|
},
|
|
Custom {
|
|
input: String,
|
|
},
|
|
LocalShell {
|
|
params: ShellToolCallParams,
|
|
},
|
|
UnifiedExec {
|
|
arguments: String,
|
|
},
|
|
Mcp {
|
|
server: String,
|
|
tool: String,
|
|
raw_arguments: String,
|
|
},
|
|
}
|
|
|
|
impl ToolPayload {
|
|
pub fn log_payload(&self) -> Cow<'_, str> {
|
|
match self {
|
|
ToolPayload::Function { arguments } => Cow::Borrowed(arguments),
|
|
ToolPayload::Custom { input } => Cow::Borrowed(input),
|
|
ToolPayload::LocalShell { params } => Cow::Owned(params.command.join(" ")),
|
|
ToolPayload::UnifiedExec { arguments } => Cow::Borrowed(arguments),
|
|
ToolPayload::Mcp { raw_arguments, .. } => Cow::Borrowed(raw_arguments),
|
|
}
|
|
}
|
|
}
|
|
|
|
#[derive(Clone)]
|
|
pub enum ToolOutput {
|
|
Function {
|
|
content: String,
|
|
success: Option<bool>,
|
|
},
|
|
Mcp {
|
|
result: Result<CallToolResult, String>,
|
|
},
|
|
}
|
|
|
|
impl ToolOutput {
|
|
pub fn log_preview(&self) -> String {
|
|
match self {
|
|
ToolOutput::Function { content, .. } => telemetry_preview(content),
|
|
ToolOutput::Mcp { result } => format!("{result:?}"),
|
|
}
|
|
}
|
|
|
|
pub fn success_for_logging(&self) -> bool {
|
|
match self {
|
|
ToolOutput::Function { success, .. } => success.unwrap_or(true),
|
|
ToolOutput::Mcp { result } => result.is_ok(),
|
|
}
|
|
}
|
|
|
|
pub fn into_response(self, call_id: &str, payload: &ToolPayload) -> ResponseInputItem {
|
|
match self {
|
|
ToolOutput::Function { content, success } => {
|
|
if matches!(payload, ToolPayload::Custom { .. }) {
|
|
ResponseInputItem::CustomToolCallOutput {
|
|
call_id: call_id.to_string(),
|
|
output: content,
|
|
}
|
|
} else {
|
|
ResponseInputItem::FunctionCallOutput {
|
|
call_id: call_id.to_string(),
|
|
output: FunctionCallOutputPayload { content, success },
|
|
}
|
|
}
|
|
}
|
|
ToolOutput::Mcp { result } => ResponseInputItem::McpToolCallOutput {
|
|
call_id: call_id.to_string(),
|
|
result,
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
fn telemetry_preview(content: &str) -> String {
|
|
let truncated_slice = take_bytes_at_char_boundary(content, TELEMETRY_PREVIEW_MAX_BYTES);
|
|
let truncated_by_bytes = truncated_slice.len() < content.len();
|
|
|
|
let mut preview = String::new();
|
|
let mut lines_iter = truncated_slice.lines();
|
|
for idx in 0..TELEMETRY_PREVIEW_MAX_LINES {
|
|
match lines_iter.next() {
|
|
Some(line) => {
|
|
if idx > 0 {
|
|
preview.push('\n');
|
|
}
|
|
preview.push_str(line);
|
|
}
|
|
None => break,
|
|
}
|
|
}
|
|
let truncated_by_lines = lines_iter.next().is_some();
|
|
|
|
if !truncated_by_bytes && !truncated_by_lines {
|
|
return content.to_string();
|
|
}
|
|
|
|
if preview.len() < truncated_slice.len()
|
|
&& truncated_slice
|
|
.as_bytes()
|
|
.get(preview.len())
|
|
.is_some_and(|byte| *byte == b'\n')
|
|
{
|
|
preview.push('\n');
|
|
}
|
|
|
|
if !preview.is_empty() && !preview.ends_with('\n') {
|
|
preview.push('\n');
|
|
}
|
|
preview.push_str(TELEMETRY_PREVIEW_TRUNCATION_NOTICE);
|
|
|
|
preview
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use pretty_assertions::assert_eq;
|
|
|
|
#[test]
|
|
fn custom_tool_calls_should_roundtrip_as_custom_outputs() {
|
|
let payload = ToolPayload::Custom {
|
|
input: "patch".to_string(),
|
|
};
|
|
let response = ToolOutput::Function {
|
|
content: "patched".to_string(),
|
|
success: Some(true),
|
|
}
|
|
.into_response("call-42", &payload);
|
|
|
|
match response {
|
|
ResponseInputItem::CustomToolCallOutput { call_id, output } => {
|
|
assert_eq!(call_id, "call-42");
|
|
assert_eq!(output, "patched");
|
|
}
|
|
other => panic!("expected CustomToolCallOutput, got {other:?}"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn function_payloads_remain_function_outputs() {
|
|
let payload = ToolPayload::Function {
|
|
arguments: "{}".to_string(),
|
|
};
|
|
let response = ToolOutput::Function {
|
|
content: "ok".to_string(),
|
|
success: Some(true),
|
|
}
|
|
.into_response("fn-1", &payload);
|
|
|
|
match response {
|
|
ResponseInputItem::FunctionCallOutput { call_id, output } => {
|
|
assert_eq!(call_id, "fn-1");
|
|
assert_eq!(output.content, "ok");
|
|
assert_eq!(output.success, Some(true));
|
|
}
|
|
other => panic!("expected FunctionCallOutput, got {other:?}"),
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn telemetry_preview_returns_original_within_limits() {
|
|
let content = "short output";
|
|
assert_eq!(telemetry_preview(content), content);
|
|
}
|
|
|
|
#[test]
|
|
fn telemetry_preview_truncates_by_bytes() {
|
|
let content = "x".repeat(TELEMETRY_PREVIEW_MAX_BYTES + 8);
|
|
let preview = telemetry_preview(&content);
|
|
|
|
assert!(preview.contains(TELEMETRY_PREVIEW_TRUNCATION_NOTICE));
|
|
assert!(
|
|
preview.len()
|
|
<= TELEMETRY_PREVIEW_MAX_BYTES + TELEMETRY_PREVIEW_TRUNCATION_NOTICE.len() + 1
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn telemetry_preview_truncates_by_lines() {
|
|
let content = (0..(TELEMETRY_PREVIEW_MAX_LINES + 5))
|
|
.map(|idx| format!("line {idx}"))
|
|
.collect::<Vec<_>>()
|
|
.join("\n");
|
|
|
|
let preview = telemetry_preview(&content);
|
|
let lines: Vec<&str> = preview.lines().collect();
|
|
|
|
assert!(lines.len() <= TELEMETRY_PREVIEW_MAX_LINES + 1);
|
|
assert_eq!(lines.last(), Some(&TELEMETRY_PREVIEW_TRUNCATION_NOTICE));
|
|
}
|
|
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
#[allow(dead_code)]
|
|
pub(crate) struct ExecCommandContext {
|
|
pub(crate) turn: Arc<TurnContext>,
|
|
pub(crate) call_id: String,
|
|
pub(crate) command_for_display: Vec<String>,
|
|
pub(crate) cwd: PathBuf,
|
|
pub(crate) apply_patch: Option<ApplyPatchCommandContext>,
|
|
pub(crate) tool_name: String,
|
|
pub(crate) otel_event_manager: OtelEventManager,
|
|
}
|
|
|
|
#[derive(Clone, Debug)]
|
|
#[allow(dead_code)]
|
|
pub(crate) struct ApplyPatchCommandContext {
|
|
pub(crate) user_explicitly_approved_this_action: bool,
|
|
pub(crate) changes: HashMap<PathBuf, FileChange>,
|
|
}
|