Compare commits

...

1 Commits

Author SHA1 Message Date
Ahmed Ibrahim
6fc3f1347f convert conversation manager to async 2025-12-04 13:10:46 -08:00
18 changed files with 83 additions and 68 deletions

View File

@@ -131,7 +131,8 @@ pub async fn run_main(
std::sync::Arc::new(config),
cli_overrides,
feedback.clone(),
);
)
.await;
async move {
while let Some(msg) = incoming_rx.recv().await {
match msg {

View File

@@ -36,7 +36,7 @@ pub(crate) struct MessageProcessor {
impl MessageProcessor {
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
/// `Sender` so handlers can enqueue messages to be written to stdout.
pub(crate) fn new(
pub(crate) async fn new(
outgoing: OutgoingMessageSender,
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
@@ -49,10 +49,8 @@ impl MessageProcessor {
false,
config.cli_auth_credentials_store_mode,
);
let conversation_manager = Arc::new(ConversationManager::new(
auth_manager.clone(),
SessionSource::VSCode,
));
let conversation_manager =
Arc::new(ConversationManager::new(auth_manager.clone(), SessionSource::VSCode).await);
let codex_message_processor = CodexMessageProcessor::new(
auth_manager,
conversation_manager,

View File

@@ -42,7 +42,7 @@ pub struct ConversationManager {
}
impl ConversationManager {
pub fn new(auth_manager: Arc<AuthManager>, session_source: SessionSource) -> Self {
pub async fn new(auth_manager: Arc<AuthManager>, session_source: SessionSource) -> Self {
Self {
conversations: Arc::new(RwLock::new(HashMap::new())),
auth_manager: auth_manager.clone(),
@@ -53,11 +53,12 @@ impl ConversationManager {
/// Construct with a dummy AuthManager containing the provided CodexAuth.
/// Used for integration tests: should not be used by ordinary business logic.
pub fn with_auth(auth: CodexAuth) -> Self {
pub async fn with_auth(auth: CodexAuth) -> Self {
Self::new(
crate::AuthManager::from_auth_for_testing(auth),
SessionSource::Exec,
)
.await
}
pub fn session_source(&self) -> SessionSource {

View File

@@ -96,7 +96,7 @@ impl TestCodexBuilder {
let (config, cwd) = self.prepare_config(server, &home).await?;
let auth = self.auth.clone();
let conversation_manager = ConversationManager::with_auth(auth.clone());
let conversation_manager = ConversationManager::with_auth(auth.clone()).await;
let new_conversation = match resume_from {
Some(path) => {

View File

@@ -254,7 +254,7 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
config.user_instructions = Some("be nice".to_string());
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
let auth_manager =
codex_core::AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
let NewConversation {
@@ -338,7 +338,7 @@ async fn includes_conversation_id_and_model_headers_in_request() {
config.model_provider = model_provider;
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
let NewConversation {
conversation: codex,
conversation_id,
@@ -394,7 +394,7 @@ async fn includes_base_instructions_override_in_request() {
config.model_provider = model_provider;
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
let codex = conversation_manager
.new_conversation(config)
.await
@@ -451,7 +451,7 @@ async fn chatgpt_auth_sends_correct_request() {
let codex_home = TempDir::new().unwrap();
let mut config = load_default_config_for_test(&codex_home);
config.model_provider = model_provider;
let conversation_manager = ConversationManager::with_auth(create_dummy_codex_auth());
let conversation_manager = ConversationManager::with_auth(create_dummy_codex_auth()).await;
let NewConversation {
conversation: codex,
conversation_id,
@@ -543,7 +543,7 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
Ok(None) => panic!("No CodexAuth found in codex_home"),
Err(e) => panic!("Failed to load CodexAuth: {e}"),
};
let conversation_manager = ConversationManager::new(auth_manager, SessionSource::Exec);
let conversation_manager = ConversationManager::new(auth_manager, SessionSource::Exec).await;
let NewConversation {
conversation: codex,
..
@@ -582,7 +582,7 @@ async fn includes_user_instructions_message_in_request() {
config.user_instructions = Some("be nice".to_string());
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
let codex = conversation_manager
.new_conversation(config)
.await
@@ -649,7 +649,7 @@ async fn skills_append_to_instructions_when_feature_enabled() {
config.cwd = codex_home.path().to_path_buf();
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
let codex = conversation_manager
.new_conversation(config)
.await
@@ -928,7 +928,7 @@ async fn includes_developer_instructions_message_in_request() {
config.developer_instructions = Some("be useful".to_string());
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
let codex = conversation_manager
.new_conversation(config)
.await
@@ -1156,7 +1156,8 @@ async fn token_count_includes_rate_limits_snapshot() {
let mut config = load_default_config_for_test(&home);
config.model_provider = provider;
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("test"));
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("test")).await;
let codex = conversation_manager
.new_conversation(config)
.await
@@ -1504,7 +1505,7 @@ async fn azure_overrides_assign_properties_used_for_responses_url() {
let mut config = load_default_config_for_test(&codex_home);
config.model_provider = provider;
let conversation_manager = ConversationManager::with_auth(create_dummy_codex_auth());
let conversation_manager = ConversationManager::with_auth(create_dummy_codex_auth()).await;
let codex = conversation_manager
.new_conversation(config)
.await
@@ -1582,7 +1583,7 @@ async fn env_var_overrides_loaded_auth() {
let mut config = load_default_config_for_test(&codex_home);
config.model_provider = provider;
let conversation_manager = ConversationManager::with_auth(create_dummy_codex_auth());
let conversation_manager = ConversationManager::with_auth(create_dummy_codex_auth()).await;
let codex = conversation_manager
.new_conversation(config)
.await
@@ -1661,7 +1662,7 @@ async fn history_dedupes_streamed_and_final_messages_across_turns() {
config.model_provider = model_provider;
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
let NewConversation {
conversation: codex,
..

View File

@@ -135,7 +135,8 @@ async fn summarize_context_three_requests_and_instructions() {
config.model_provider = model_provider;
set_test_compact_prompt(&mut config);
config.model_auto_compact_token_limit = Some(200_000);
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("dummy")).await;
let NewConversation {
conversation: codex,
session_configured,
@@ -329,12 +330,15 @@ async fn manual_compact_uses_custom_prompt() {
config.model_provider = model_provider;
config.compact_prompt = Some(custom_prompt.to_string());
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let codex = conversation_manager
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("dummy")).await;
let NewConversation {
conversation: codex,
..
} = conversation_manager
.new_conversation(config)
.await
.expect("create conversation")
.conversation;
.expect("create conversation");
codex.submit(Op::Compact).await.expect("trigger compact");
let warning_event = wait_for_event(&codex, |ev| matches!(ev, EventMsg::Warning(_))).await;
@@ -409,7 +413,8 @@ async fn manual_compact_emits_api_and_local_token_usage_events() {
config.model_provider = model_provider;
set_test_compact_prompt(&mut config);
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("dummy")).await;
let NewConversation {
conversation: codex,
..
@@ -1050,7 +1055,8 @@ async fn auto_compact_runs_after_token_limit_hit() {
config.model_provider = model_provider;
set_test_compact_prompt(&mut config);
config.model_auto_compact_token_limit = Some(200_000);
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("dummy")).await;
let codex = conversation_manager
.new_conversation(config)
.await
@@ -1295,7 +1301,8 @@ async fn auto_compact_persists_rollout_entries() {
let mut config = load_default_config_for_test(&home);
config.model_provider = model_provider;
set_test_compact_prompt(&mut config);
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("dummy")).await;
let NewConversation {
conversation: codex,
session_configured,
@@ -1398,6 +1405,7 @@ async fn manual_compact_retries_after_context_window_error() {
set_test_compact_prompt(&mut config);
config.model_auto_compact_token_limit = Some(200_000);
let codex = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"))
.await
.new_conversation(config)
.await
.unwrap()
@@ -1530,6 +1538,7 @@ async fn manual_compact_twice_preserves_latest_user_messages() {
config.model_provider = model_provider;
set_test_compact_prompt(&mut config);
let codex = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"))
.await
.new_conversation(config)
.await
.unwrap()
@@ -1731,7 +1740,8 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
config.model_provider = model_provider;
set_test_compact_prompt(&mut config);
config.model_auto_compact_token_limit = Some(200);
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("dummy")).await;
let codex = conversation_manager
.new_conversation(config)
.await
@@ -1846,6 +1856,7 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
config.model_auto_compact_token_limit = Some(limit);
let codex = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"))
.await
.new_conversation(config)
.await
.unwrap()

View File

@@ -872,7 +872,7 @@ async fn start_test_conversation(
if let Some(model) = model {
config.model = model.to_string();
}
let manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy")).await;
let NewConversation { conversation, .. } = manager
.new_conversation(config.clone())
.await

View File

@@ -55,7 +55,8 @@ async fn fork_conversation_twice_drops_to_first_message() {
config.model_provider = model_provider.clone();
let config_for_fork = config.clone();
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("dummy")).await;
let NewConversation {
conversation: codex,
..

View File

@@ -8,7 +8,7 @@ use pretty_assertions::assert_eq;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn list_models_returns_api_key_models() -> Result<()> {
let manager = ConversationManager::with_auth(CodexAuth::from_api_key("sk-test"));
let manager = ConversationManager::with_auth(CodexAuth::from_api_key("sk-test")).await;
let models = manager.list_models().await;
let expected_models = expected_models_for_api_key();
@@ -20,7 +20,7 @@ async fn list_models_returns_api_key_models() -> Result<()> {
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn list_models_returns_chatgpt_models() -> Result<()> {
let manager =
ConversationManager::with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
ConversationManager::with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()).await;
let models = manager.list_models().await;
let expected_models = expected_models_for_chatgpt();

View File

@@ -23,7 +23,7 @@ async fn override_turn_context_does_not_persist_when_config_exists() {
config.model = "gpt-4o".to_string();
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
let codex = conversation_manager
.new_conversation(config)
.await
@@ -63,7 +63,7 @@ async fn override_turn_context_does_not_create_config_file() {
let config = load_default_config_for_test(&codex_home);
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
let codex = conversation_manager
.new_conversation(config)
.await

View File

@@ -16,7 +16,11 @@ use core_test_support::load_default_config_for_test;
use core_test_support::wait_for_event;
use tempfile::TempDir;
fn resume_history(config: &codex_core::config::Config, previous_model: &str, rollout_path: &std::path::Path) -> InitialHistory {
fn resume_history(
config: &codex_core::config::Config,
previous_model: &str,
rollout_path: &std::path::Path,
) -> InitialHistory {
let turn_ctx = TurnContextItem {
cwd: config.cwd.clone(),
approval_policy: config.approval_policy,
@@ -47,7 +51,8 @@ async fn emits_warning_when_resumed_model_differs() {
let initial_history = resume_history(&config, "previous-model", &rollout_path);
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("test"));
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("test")).await;
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test"));
// Act: resume the conversation.

View File

@@ -744,7 +744,7 @@ where
config.model_provider = model_provider;
mutator(&mut config);
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
conversation_manager
.new_conversation(config)
.await
@@ -771,7 +771,7 @@ where
config.model_provider = model_provider;
mutator(&mut config);
let conversation_manager =
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key"));
ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await;
let auth_manager =
codex_core::AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
conversation_manager

View File

@@ -43,7 +43,7 @@ async fn user_shell_cmd_ls_and_cat_in_temp_dir() {
config.cwd = cwd.path().to_path_buf();
let conversation_manager =
ConversationManager::with_auth(codex_core::CodexAuth::from_api_key("dummy"));
ConversationManager::with_auth(codex_core::CodexAuth::from_api_key("dummy")).await;
let NewConversation {
conversation: codex,
..
@@ -100,7 +100,7 @@ async fn user_shell_cmd_can_be_interrupted() {
let codex_home = TempDir::new().unwrap();
let config = load_default_config_for_test(&codex_home);
let conversation_manager =
ConversationManager::with_auth(codex_core::CodexAuth::from_api_key("dummy"));
ConversationManager::with_auth(codex_core::CodexAuth::from_api_key("dummy")).await;
let NewConversation {
conversation: codex,
..

View File

@@ -278,7 +278,8 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
true,
config.cli_auth_credentials_store_mode,
);
let conversation_manager = ConversationManager::new(auth_manager.clone(), SessionSource::Exec);
let conversation_manager =
ConversationManager::new(auth_manager.clone(), SessionSource::Exec).await;
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
let NewConversation {

View File

@@ -103,7 +103,8 @@ pub async fn run_main(
outgoing_message_sender,
codex_linux_sandbox_exe,
std::sync::Arc::new(config),
);
)
.await;
async move {
while let Some(msg) = incoming_rx.recv().await {
match msg {

View File

@@ -47,7 +47,7 @@ pub(crate) struct MessageProcessor {
impl MessageProcessor {
/// Create a new `MessageProcessor`, retaining a handle to the outgoing
/// `Sender` so handlers can enqueue messages to be written to stdout.
pub(crate) fn new(
pub(crate) async fn new(
outgoing: OutgoingMessageSender,
codex_linux_sandbox_exe: Option<PathBuf>,
config: Arc<Config>,
@@ -59,7 +59,7 @@ impl MessageProcessor {
config.cli_auth_credentials_store_mode,
);
let conversation_manager =
Arc::new(ConversationManager::new(auth_manager, SessionSource::Mcp));
Arc::new(ConversationManager::new(auth_manager, SessionSource::Mcp).await);
Self {
outgoing,
initialized: false,

View File

@@ -265,10 +265,8 @@ impl App {
let app_event_tx = AppEventSender::new(app_event_tx);
let auth_mode = auth_manager.auth().map(|auth| auth.mode);
let conversation_manager = Arc::new(ConversationManager::new(
auth_manager.clone(),
SessionSource::Cli,
));
let conversation_manager =
Arc::new(ConversationManager::new(auth_manager.clone(), SessionSource::Cli).await);
let exit_info = handle_model_migration_prompt_if_needed(
tui,
&mut config,
@@ -1140,12 +1138,11 @@ mod tests {
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
fn make_test_app() -> App {
async fn make_test_app() -> App {
let (chat_widget, app_event_tx, _rx, _op_rx) = make_chatwidget_manual_with_sender();
let config = chat_widget.config_ref().clone();
let server = Arc::new(ConversationManager::with_auth(CodexAuth::from_api_key(
"Test API Key",
)));
let server =
Arc::new(ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await);
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
let file_search = FileSearchManager::new(config.cwd.clone(), app_event_tx.clone());
@@ -1173,16 +1170,15 @@ mod tests {
}
}
fn make_test_app_with_channels() -> (
async fn make_test_app_with_channels() -> (
App,
tokio::sync::mpsc::UnboundedReceiver<AppEvent>,
tokio::sync::mpsc::UnboundedReceiver<Op>,
) {
let (chat_widget, app_event_tx, rx, op_rx) = make_chatwidget_manual_with_sender();
let config = chat_widget.config_ref().clone();
let server = Arc::new(ConversationManager::with_auth(CodexAuth::from_api_key(
"Test API Key",
)));
let server =
Arc::new(ConversationManager::with_auth(CodexAuth::from_api_key("Test API Key")).await);
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::from_api_key("Test API Key"));
let file_search = FileSearchManager::new(config.cwd.clone(), app_event_tx.clone());
@@ -1268,9 +1264,9 @@ mod tests {
));
}
#[test]
fn update_reasoning_effort_updates_config() {
let mut app = make_test_app();
#[tokio::test]
async fn update_reasoning_effort_updates_config() {
let mut app = make_test_app().await;
app.config.model_reasoning_effort = Some(ReasoningEffortConfig::Medium);
app.chat_widget
.set_reasoning_effort(Some(ReasoningEffortConfig::Medium));
@@ -1287,9 +1283,9 @@ mod tests {
);
}
#[test]
fn backtrack_selection_with_duplicate_history_targets_unique_turn() {
let mut app = make_test_app();
#[tokio::test]
async fn backtrack_selection_with_duplicate_history_targets_unique_turn() {
let mut app = make_test_app().await;
let user_cell = |text: &str| -> Arc<dyn HistoryCell> {
Arc::new(UserHistoryCell {
@@ -1355,7 +1351,7 @@ mod tests {
#[tokio::test]
async fn new_session_requests_shutdown_for_previous_conversation() {
let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels();
let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels().await;
let conversation_id = ConversationId::new();
let event = SessionConfiguredEvent {

View File

@@ -343,9 +343,8 @@ async fn helpers_are_available_and_do_not_panic() {
let (tx_raw, _rx) = unbounded_channel::<AppEvent>();
let tx = AppEventSender::new(tx_raw);
let cfg = test_config();
let conversation_manager = Arc::new(ConversationManager::with_auth(CodexAuth::from_api_key(
"test",
)));
let conversation_manager =
Arc::new(ConversationManager::with_auth(CodexAuth::from_api_key("test")).await);
let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test"));
let init = ChatWidgetInit {
config: cfg,