mirror of
https://github.com/openai/codex.git
synced 2026-04-24 06:35:50 +00:00
codex: simplify in-process startup wiring (#15106)
This commit is contained in:
@@ -227,8 +227,6 @@ impl InProcessClientStartArgs {
|
||||
cli_overrides: self.cli_overrides,
|
||||
loader_overrides: self.loader_overrides,
|
||||
cloud_requirements: self.cloud_requirements,
|
||||
auth_manager: None,
|
||||
thread_manager: None,
|
||||
feedback: self.feedback,
|
||||
config_warnings: self.config_warnings,
|
||||
session_source: self.session_source,
|
||||
@@ -303,6 +301,9 @@ pub enum AppServerClient {
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ChatgptAuthRefreshContext {
|
||||
// Reconstructs the current local ChatGPT login from auth.json so the
|
||||
// in-process client can satisfy app-server refresh requests without
|
||||
// surfacing AuthManager above the client boundary.
|
||||
codex_home: std::path::PathBuf,
|
||||
auth_credentials_store_mode: AuthCredentialsStoreMode,
|
||||
forced_chatgpt_workspace_id: Option<String>,
|
||||
@@ -1663,8 +1664,6 @@ mod tests {
|
||||
}
|
||||
.into_runtime_start_args();
|
||||
|
||||
assert!(runtime_args.auth_manager.is_none());
|
||||
assert!(runtime_args.thread_manager.is_none());
|
||||
assert_eq!(runtime_args.allow_legacy_notifications, false);
|
||||
assert_eq!(runtime_args.config, config);
|
||||
}
|
||||
|
||||
@@ -74,8 +74,6 @@ use codex_app_server_protocol::Result;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
@@ -124,10 +122,6 @@ pub struct InProcessStartArgs {
|
||||
pub loader_overrides: LoaderOverrides,
|
||||
/// Preloaded cloud requirements provider.
|
||||
pub cloud_requirements: CloudRequirementsLoader,
|
||||
/// Optional prebuilt auth manager reused by an embedding caller.
|
||||
pub auth_manager: Option<Arc<AuthManager>>,
|
||||
/// Optional prebuilt thread manager reused by an embedding caller.
|
||||
pub thread_manager: Option<Arc<ThreadManager>>,
|
||||
/// Feedback sink used by app-server/core telemetry and logs.
|
||||
pub feedback: CodexFeedback,
|
||||
/// Startup warnings emitted after initialize succeeds.
|
||||
@@ -413,8 +407,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
cli_overrides: args.cli_overrides,
|
||||
loader_overrides: args.loader_overrides,
|
||||
cloud_requirements: args.cloud_requirements,
|
||||
auth_manager: args.auth_manager,
|
||||
thread_manager: args.thread_manager,
|
||||
feedback: args.feedback,
|
||||
log_db: None,
|
||||
config_warnings: args.config_warnings,
|
||||
@@ -762,8 +754,6 @@ mod tests {
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
auth_manager: None,
|
||||
thread_manager: None,
|
||||
feedback: CodexFeedback::new(),
|
||||
config_warnings: Vec::new(),
|
||||
session_source,
|
||||
|
||||
@@ -618,8 +618,6 @@ pub async fn run_main_with_transport(
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
auth_manager: None,
|
||||
thread_manager: None,
|
||||
feedback: feedback.clone(),
|
||||
log_db,
|
||||
config_warnings,
|
||||
|
||||
@@ -172,8 +172,6 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) cli_overrides: Vec<(String, TomlValue)>,
|
||||
pub(crate) loader_overrides: LoaderOverrides,
|
||||
pub(crate) cloud_requirements: CloudRequirementsLoader,
|
||||
pub(crate) auth_manager: Option<Arc<AuthManager>>,
|
||||
pub(crate) thread_manager: Option<Arc<ThreadManager>>,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
|
||||
@@ -192,36 +190,27 @@ impl MessageProcessor {
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
feedback,
|
||||
log_db,
|
||||
config_warnings,
|
||||
session_source,
|
||||
enable_codex_api_key_env,
|
||||
} = args;
|
||||
let (auth_manager, thread_manager) = match (auth_manager, thread_manager) {
|
||||
(Some(auth_manager), Some(thread_manager)) => (auth_manager, thread_manager),
|
||||
(None, None) => {
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
enable_codex_api_key_env,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
CollaborationModesConfig {
|
||||
default_mode_request_user_input: config
|
||||
.features
|
||||
.enabled(Feature::DefaultModeRequestUserInput),
|
||||
},
|
||||
));
|
||||
(auth_manager, thread_manager)
|
||||
}
|
||||
_ => panic!("MessageProcessorArgs must provide both auth_manager and thread_manager"),
|
||||
};
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
enable_codex_api_key_env,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager.clone(),
|
||||
session_source,
|
||||
CollaborationModesConfig {
|
||||
default_mode_request_user_input: config
|
||||
.features
|
||||
.enabled(Feature::DefaultModeRequestUserInput),
|
||||
},
|
||||
));
|
||||
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
|
||||
auth_manager.set_external_auth_refresher(Arc::new(ExternalAuthRefreshBridge {
|
||||
outgoing: outgoing.clone(),
|
||||
|
||||
@@ -239,8 +239,6 @@ fn build_test_processor(
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
auth_manager: None,
|
||||
thread_manager: None,
|
||||
feedback: CodexFeedback::new(),
|
||||
log_db: None,
|
||||
config_warnings: Vec::new(),
|
||||
|
||||
@@ -502,5 +502,5 @@ impl EventProcessor for EventProcessorWithJsonOutput {
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
#[path = "event_processor_with_jsonl_output_tests.rs"]
|
||||
#[path = "tests/event_processor_with_jsonl_output_tests.rs"]
|
||||
mod tests;
|
||||
|
||||
@@ -0,0 +1,654 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use codex_app_server_protocol::CommandAction;
|
||||
use codex_app_server_protocol::CommandExecutionSource;
|
||||
use codex_app_server_protocol::CommandExecutionStatus as ApiCommandExecutionStatus;
|
||||
use codex_app_server_protocol::ErrorNotification;
|
||||
use codex_app_server_protocol::ItemCompletedNotification;
|
||||
use codex_app_server_protocol::ItemStartedNotification;
|
||||
use codex_app_server_protocol::ThreadItem;
|
||||
use codex_app_server_protocol::ThreadTokenUsage;
|
||||
use codex_app_server_protocol::TokenUsageBreakdown;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnCompletedNotification;
|
||||
use codex_app_server_protocol::TurnError;
|
||||
use codex_app_server_protocol::TurnPlanStep;
|
||||
use codex_app_server_protocol::TurnPlanStepStatus;
|
||||
use codex_app_server_protocol::TurnPlanUpdatedNotification;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
use codex_app_server_protocol::WebSearchAction as ApiWebSearchAction;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::WebSearchAction;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::CollectedThreadEvents;
|
||||
use super::EventProcessorWithJsonOutput;
|
||||
use crate::event_processor::CodexStatus;
|
||||
use crate::exec_events::CommandExecutionItem;
|
||||
use crate::exec_events::CommandExecutionStatus;
|
||||
use crate::exec_events::ItemCompletedEvent;
|
||||
use crate::exec_events::ItemStartedEvent;
|
||||
use crate::exec_events::ItemUpdatedEvent;
|
||||
use crate::exec_events::ThreadErrorEvent;
|
||||
use crate::exec_events::ThreadEvent;
|
||||
use crate::exec_events::ThreadItem as ExecThreadItem;
|
||||
use crate::exec_events::ThreadItemDetails;
|
||||
use crate::exec_events::ThreadStartedEvent;
|
||||
use crate::exec_events::TodoItem;
|
||||
use crate::exec_events::TodoListItem;
|
||||
use crate::exec_events::TurnCompletedEvent;
|
||||
use crate::exec_events::TurnStartedEvent;
|
||||
use crate::exec_events::Usage;
|
||||
use crate::exec_events::WebSearchItem;
|
||||
use crate::typed_exec_event::TypedExecEvent;
|
||||
|
||||
#[test]
|
||||
fn map_todo_items_preserves_text_and_completion_state() {
|
||||
let items = EventProcessorWithJsonOutput::map_todo_items(&[
|
||||
TurnPlanStep {
|
||||
step: "inspect bootstrap".to_string(),
|
||||
status: TurnPlanStepStatus::InProgress,
|
||||
},
|
||||
TurnPlanStep {
|
||||
step: "drop legacy notifications".to_string(),
|
||||
status: TurnPlanStepStatus::Completed,
|
||||
},
|
||||
]);
|
||||
|
||||
assert_eq!(
|
||||
items,
|
||||
vec![
|
||||
TodoItem {
|
||||
text: "inspect bootstrap".to_string(),
|
||||
completed: false,
|
||||
},
|
||||
TodoItem {
|
||||
text: "drop legacy notifications".to_string(),
|
||||
completed: true,
|
||||
},
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn session_configured_produces_thread_started_event() {
|
||||
let session_configured = SessionConfiguredEvent {
|
||||
session_id: ThreadId::from_string("67e55044-10b1-426f-9247-bb680e5fe0c8")
|
||||
.expect("thread id should parse"),
|
||||
forked_from_id: None,
|
||||
thread_name: None,
|
||||
model: "codex-mini-latest".to_string(),
|
||||
model_provider_id: "test-provider".to_string(),
|
||||
service_tier: None,
|
||||
approval_policy: AskForApproval::Never,
|
||||
approvals_reviewer: codex_protocol::config_types::ApprovalsReviewer::User,
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
reasoning_effort: None,
|
||||
history_log_id: 0,
|
||||
history_entry_count: 0,
|
||||
initial_messages: None,
|
||||
network_proxy: None,
|
||||
rollout_path: None,
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
EventProcessorWithJsonOutput::thread_started_event(&session_configured),
|
||||
ThreadEvent::ThreadStarted(ThreadStartedEvent {
|
||||
thread_id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(),
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_started_emits_turn_started_event() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
let collected = processor.collect_thread_events(TypedExecEvent::TurnStarted);
|
||||
|
||||
assert_eq!(
|
||||
collected,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::TurnStarted(TurnStartedEvent {})],
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn command_execution_started_and_completed_translate_to_thread_events() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
let command_item = ThreadItem::CommandExecution {
|
||||
id: "cmd-1".to_string(),
|
||||
command: "ls".to_string(),
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
process_id: Some("123".to_string()),
|
||||
source: CommandExecutionSource::UserShell,
|
||||
status: ApiCommandExecutionStatus::InProgress,
|
||||
command_actions: Vec::<CommandAction>::new(),
|
||||
aggregated_output: None,
|
||||
exit_code: None,
|
||||
duration_ms: None,
|
||||
};
|
||||
|
||||
let started =
|
||||
processor.collect_thread_events(TypedExecEvent::ItemStarted(ItemStartedNotification {
|
||||
item: command_item,
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
}));
|
||||
assert_eq!(
|
||||
started,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
||||
item: ExecThreadItem {
|
||||
id: "cmd-1".to_string(),
|
||||
details: ThreadItemDetails::CommandExecution(CommandExecutionItem {
|
||||
command: "ls".to_string(),
|
||||
aggregated_output: String::new(),
|
||||
exit_code: None,
|
||||
status: CommandExecutionStatus::InProgress,
|
||||
}),
|
||||
},
|
||||
})],
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
|
||||
let completed =
|
||||
processor.collect_thread_events(TypedExecEvent::ItemCompleted(ItemCompletedNotification {
|
||||
item: ThreadItem::CommandExecution {
|
||||
id: "cmd-1".to_string(),
|
||||
command: "ls".to_string(),
|
||||
cwd: PathBuf::from("/tmp/project"),
|
||||
process_id: Some("123".to_string()),
|
||||
source: CommandExecutionSource::UserShell,
|
||||
status: ApiCommandExecutionStatus::Completed,
|
||||
command_actions: Vec::<CommandAction>::new(),
|
||||
aggregated_output: Some("a.txt\n".to_string()),
|
||||
exit_code: Some(0),
|
||||
duration_ms: Some(3),
|
||||
},
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
}));
|
||||
assert_eq!(
|
||||
completed,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
||||
item: ExecThreadItem {
|
||||
id: "cmd-1".to_string(),
|
||||
details: ThreadItemDetails::CommandExecution(CommandExecutionItem {
|
||||
command: "ls".to_string(),
|
||||
aggregated_output: "a.txt\n".to_string(),
|
||||
exit_code: Some(0),
|
||||
status: CommandExecutionStatus::Completed,
|
||||
}),
|
||||
},
|
||||
})],
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn reasoning_items_emit_summary_not_raw_content() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
let collected =
|
||||
processor.collect_thread_events(TypedExecEvent::ItemCompleted(ItemCompletedNotification {
|
||||
item: ThreadItem::Reasoning {
|
||||
id: "reasoning-1".to_string(),
|
||||
summary: vec!["safe summary".to_string()],
|
||||
content: vec!["raw reasoning".to_string()],
|
||||
},
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
}));
|
||||
|
||||
assert_eq!(
|
||||
collected,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
||||
item: ExecThreadItem {
|
||||
id: "reasoning-1".to_string(),
|
||||
details: ThreadItemDetails::Reasoning(crate::exec_events::ReasoningItem {
|
||||
text: "safe summary".to_string(),
|
||||
}),
|
||||
},
|
||||
})],
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn web_search_completion_preserves_query_and_action() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
let collected =
|
||||
processor.collect_thread_events(TypedExecEvent::ItemCompleted(ItemCompletedNotification {
|
||||
item: ThreadItem::WebSearch {
|
||||
id: "search-1".to_string(),
|
||||
query: "rust async await".to_string(),
|
||||
action: Some(ApiWebSearchAction::Search {
|
||||
query: Some("rust async await".to_string()),
|
||||
queries: None,
|
||||
}),
|
||||
},
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
}));
|
||||
|
||||
assert_eq!(
|
||||
collected,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
||||
item: ExecThreadItem {
|
||||
id: "search-1".to_string(),
|
||||
details: ThreadItemDetails::WebSearch(WebSearchItem {
|
||||
id: "search-1".to_string(),
|
||||
query: "rust async await".to_string(),
|
||||
action: WebSearchAction::Search {
|
||||
query: Some("rust async await".to_string()),
|
||||
queries: None,
|
||||
},
|
||||
}),
|
||||
},
|
||||
})],
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn plan_update_emits_started_then_updated_then_completed() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
let started = processor.collect_thread_events(TypedExecEvent::TurnPlanUpdated(
|
||||
TurnPlanUpdatedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
explanation: None,
|
||||
plan: vec![
|
||||
TurnPlanStep {
|
||||
step: "step one".to_string(),
|
||||
status: TurnPlanStepStatus::Pending,
|
||||
},
|
||||
TurnPlanStep {
|
||||
step: "step two".to_string(),
|
||||
status: TurnPlanStepStatus::InProgress,
|
||||
},
|
||||
],
|
||||
},
|
||||
));
|
||||
assert_eq!(
|
||||
started,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::ItemStarted(ItemStartedEvent {
|
||||
item: ExecThreadItem {
|
||||
id: "item_0".to_string(),
|
||||
details: ThreadItemDetails::TodoList(TodoListItem {
|
||||
items: vec![
|
||||
TodoItem {
|
||||
text: "step one".to_string(),
|
||||
completed: false,
|
||||
},
|
||||
TodoItem {
|
||||
text: "step two".to_string(),
|
||||
completed: false,
|
||||
},
|
||||
],
|
||||
}),
|
||||
},
|
||||
})],
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
|
||||
let updated = processor.collect_thread_events(TypedExecEvent::TurnPlanUpdated(
|
||||
TurnPlanUpdatedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
explanation: None,
|
||||
plan: vec![
|
||||
TurnPlanStep {
|
||||
step: "step one".to_string(),
|
||||
status: TurnPlanStepStatus::Completed,
|
||||
},
|
||||
TurnPlanStep {
|
||||
step: "step two".to_string(),
|
||||
status: TurnPlanStepStatus::InProgress,
|
||||
},
|
||||
],
|
||||
},
|
||||
));
|
||||
assert_eq!(
|
||||
updated,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::ItemUpdated(ItemUpdatedEvent {
|
||||
item: ExecThreadItem {
|
||||
id: "item_0".to_string(),
|
||||
details: ThreadItemDetails::TodoList(TodoListItem {
|
||||
items: vec![
|
||||
TodoItem {
|
||||
text: "step one".to_string(),
|
||||
completed: true,
|
||||
},
|
||||
TodoItem {
|
||||
text: "step two".to_string(),
|
||||
completed: false,
|
||||
},
|
||||
],
|
||||
}),
|
||||
},
|
||||
})],
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
|
||||
let completed =
|
||||
processor.collect_thread_events(TypedExecEvent::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
}));
|
||||
assert_eq!(
|
||||
completed,
|
||||
CollectedThreadEvents {
|
||||
events: vec![
|
||||
ThreadEvent::ItemCompleted(ItemCompletedEvent {
|
||||
item: ExecThreadItem {
|
||||
id: "item_0".to_string(),
|
||||
details: ThreadItemDetails::TodoList(TodoListItem {
|
||||
items: vec![
|
||||
TodoItem {
|
||||
text: "step one".to_string(),
|
||||
completed: true,
|
||||
},
|
||||
TodoItem {
|
||||
text: "step two".to_string(),
|
||||
completed: false,
|
||||
},
|
||||
],
|
||||
}),
|
||||
},
|
||||
}),
|
||||
ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
||||
usage: Usage::default(),
|
||||
}),
|
||||
],
|
||||
status: CodexStatus::InitiateShutdown,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn token_usage_update_is_emitted_on_turn_completion() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
let usage_update = processor.collect_thread_events(TypedExecEvent::ThreadTokenUsageUpdated(
|
||||
codex_app_server_protocol::ThreadTokenUsageUpdatedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
token_usage: ThreadTokenUsage {
|
||||
total: TokenUsageBreakdown {
|
||||
total_tokens: 42,
|
||||
input_tokens: 10,
|
||||
cached_input_tokens: 3,
|
||||
output_tokens: 29,
|
||||
reasoning_output_tokens: 7,
|
||||
},
|
||||
last: TokenUsageBreakdown {
|
||||
total_tokens: 42,
|
||||
input_tokens: 10,
|
||||
cached_input_tokens: 3,
|
||||
output_tokens: 29,
|
||||
reasoning_output_tokens: 7,
|
||||
},
|
||||
model_context_window: Some(128_000),
|
||||
},
|
||||
},
|
||||
));
|
||||
assert_eq!(
|
||||
usage_update,
|
||||
CollectedThreadEvents {
|
||||
events: Vec::new(),
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
|
||||
let completed =
|
||||
processor.collect_thread_events(TypedExecEvent::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
}));
|
||||
assert_eq!(
|
||||
completed,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
||||
usage: Usage {
|
||||
input_tokens: 10,
|
||||
cached_input_tokens: 3,
|
||||
output_tokens: 29,
|
||||
},
|
||||
})],
|
||||
status: CodexStatus::InitiateShutdown,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_completion_recovers_final_message_from_turn_items() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
let completed =
|
||||
processor.collect_thread_events(TypedExecEvent::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![ThreadItem::AgentMessage {
|
||||
id: "msg-1".to_string(),
|
||||
text: "final answer".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
}],
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
}));
|
||||
|
||||
assert_eq!(
|
||||
completed,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
||||
usage: Usage::default(),
|
||||
})],
|
||||
status: CodexStatus::InitiateShutdown,
|
||||
}
|
||||
);
|
||||
assert_eq!(processor.final_message.as_deref(), Some("final answer"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_completion_overwrites_stale_final_message_from_turn_items() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
processor.final_message = Some("stale answer".to_string());
|
||||
|
||||
let completed =
|
||||
processor.collect_thread_events(TypedExecEvent::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![ThreadItem::AgentMessage {
|
||||
id: "msg-1".to_string(),
|
||||
text: "final answer".to_string(),
|
||||
phase: None,
|
||||
memory_citation: None,
|
||||
}],
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
}));
|
||||
|
||||
assert_eq!(
|
||||
completed,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
||||
usage: Usage::default(),
|
||||
})],
|
||||
status: CodexStatus::InitiateShutdown,
|
||||
}
|
||||
);
|
||||
assert_eq!(processor.final_message.as_deref(), Some("final answer"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_completion_preserves_streamed_final_message_when_turn_items_are_empty() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
processor.final_message = Some("streamed answer".to_string());
|
||||
|
||||
let completed =
|
||||
processor.collect_thread_events(TypedExecEvent::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
}));
|
||||
|
||||
assert_eq!(
|
||||
completed,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
||||
usage: Usage::default(),
|
||||
})],
|
||||
status: CodexStatus::InitiateShutdown,
|
||||
}
|
||||
);
|
||||
assert_eq!(processor.final_message.as_deref(), Some("streamed answer"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_completion_falls_back_to_final_plan_text() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
let completed =
|
||||
processor.collect_thread_events(TypedExecEvent::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: vec![ThreadItem::Plan {
|
||||
id: "plan-1".to_string(),
|
||||
text: "ship the typed adapter".to_string(),
|
||||
}],
|
||||
status: TurnStatus::Completed,
|
||||
error: None,
|
||||
},
|
||||
}));
|
||||
|
||||
assert_eq!(
|
||||
completed,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::TurnCompleted(TurnCompletedEvent {
|
||||
usage: Usage::default(),
|
||||
})],
|
||||
status: CodexStatus::InitiateShutdown,
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
processor.final_message.as_deref(),
|
||||
Some("ship the typed adapter")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn turn_failure_prefers_structured_error_message() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
let error = processor.collect_thread_events(TypedExecEvent::Error(ErrorNotification {
|
||||
error: TurnError {
|
||||
message: "backend failed".to_string(),
|
||||
codex_error_info: None,
|
||||
additional_details: Some("request id abc".to_string()),
|
||||
},
|
||||
will_retry: false,
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
}));
|
||||
assert_eq!(
|
||||
error,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::Error(ThreadErrorEvent {
|
||||
message: "backend failed (request id abc)".to_string(),
|
||||
})],
|
||||
status: CodexStatus::Running,
|
||||
}
|
||||
);
|
||||
|
||||
let failed =
|
||||
processor.collect_thread_events(TypedExecEvent::TurnCompleted(TurnCompletedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn: Turn {
|
||||
id: "turn-1".to_string(),
|
||||
items: Vec::new(),
|
||||
status: TurnStatus::Failed,
|
||||
error: None,
|
||||
},
|
||||
}));
|
||||
assert_eq!(
|
||||
failed,
|
||||
CollectedThreadEvents {
|
||||
events: vec![ThreadEvent::TurnFailed(
|
||||
crate::exec_events::TurnFailedEvent {
|
||||
error: ThreadErrorEvent {
|
||||
message: "backend failed (request id abc)".to_string(),
|
||||
},
|
||||
}
|
||||
)],
|
||||
status: CodexStatus::InitiateShutdown,
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn model_reroute_surfaces_as_error_item() {
|
||||
let mut processor = EventProcessorWithJsonOutput::new(None);
|
||||
|
||||
let collected = processor.collect_thread_events(TypedExecEvent::ModelRerouted(
|
||||
codex_app_server_protocol::ModelReroutedNotification {
|
||||
thread_id: "thread-1".to_string(),
|
||||
turn_id: "turn-1".to_string(),
|
||||
from_model: "gpt-5".to_string(),
|
||||
to_model: "gpt-5-mini".to_string(),
|
||||
reason: codex_app_server_protocol::ModelRerouteReason::HighRiskCyberActivity,
|
||||
},
|
||||
));
|
||||
|
||||
assert_eq!(collected.status, CodexStatus::Running);
|
||||
assert_eq!(collected.events.len(), 1);
|
||||
let ThreadEvent::ItemCompleted(ItemCompletedEvent { item }) = &collected.events[0] else {
|
||||
panic!("expected ItemCompleted");
|
||||
};
|
||||
assert_eq!(item.id, "item_0");
|
||||
assert_eq!(
|
||||
item.details,
|
||||
ThreadItemDetails::Error(crate::exec_events::ErrorItem {
|
||||
message: "model rerouted: gpt-5 -> gpt-5-mini (HighRiskCyberActivity)".to_string(),
|
||||
})
|
||||
);
|
||||
}
|
||||
Reference in New Issue
Block a user