mirror of
https://github.com/openai/codex.git
synced 2026-05-11 23:02:39 +00:00
Compare commits
1 Commits
dh--app-se
...
jif/rework
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
540a678365 |
@@ -82,6 +82,7 @@ use codex_config::CloudRequirementsLoader;
|
||||
use codex_config::LoaderOverrides;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::resolve_installation_id;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_login::AuthManager;
|
||||
@@ -344,7 +345,7 @@ impl InProcessClientHandle {
|
||||
/// the runtime is shut down and an `InvalidData` error is returned.
|
||||
pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
|
||||
let initialize = args.initialize.clone();
|
||||
let client = start_uninitialized(args);
|
||||
let client = start_uninitialized(args).await?;
|
||||
|
||||
let initialize_response = client
|
||||
.request(ClientRequest::Initialize {
|
||||
@@ -364,10 +365,11 @@ pub async fn start(args: InProcessStartArgs) -> IoResult<InProcessClientHandle>
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
async fn start_uninitialized(args: InProcessStartArgs) -> IoResult<InProcessClientHandle> {
|
||||
let channel_capacity = args.channel_capacity.max(1);
|
||||
let (client_tx, mut client_rx) = mpsc::channel::<InProcessClientMessage>(channel_capacity);
|
||||
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
|
||||
let installation_id = resolve_installation_id(&args.config.codex_home).await?;
|
||||
|
||||
let runtime_handle = tokio::spawn(async move {
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(channel_capacity);
|
||||
@@ -427,6 +429,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
config_warnings: args.config_warnings,
|
||||
session_source: args.session_source,
|
||||
auth_manager,
|
||||
installation_id,
|
||||
rpc_transport: AppServerRpcTransport::InProcess,
|
||||
remote_control_handle: None,
|
||||
plugin_startup_tasks: crate::PluginStartupTasks::Start,
|
||||
@@ -719,13 +722,13 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
}
|
||||
});
|
||||
|
||||
InProcessClientHandle {
|
||||
Ok(InProcessClientHandle {
|
||||
client: InProcessClientSender { client_tx },
|
||||
event_rx,
|
||||
runtime_handle,
|
||||
#[cfg(test)]
|
||||
_test_codex_home: None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -7,6 +7,7 @@ use codex_config::NoopThreadConfigLoader;
|
||||
use codex_config::RemoteThreadConfigLoader;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::resolve_installation_id;
|
||||
use codex_exec_server::EnvironmentManagerArgs;
|
||||
use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
@@ -754,6 +755,7 @@ pub async fn run_main_with_transport_options(
|
||||
AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await;
|
||||
let analytics_events_client =
|
||||
analytics_events_client_from_config(Arc::clone(&auth_manager), &config);
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(
|
||||
outgoing_tx,
|
||||
analytics_events_client.clone(),
|
||||
@@ -773,6 +775,7 @@ pub async fn run_main_with_transport_options(
|
||||
config_warnings,
|
||||
session_source,
|
||||
auth_manager,
|
||||
installation_id,
|
||||
rpc_transport: analytics_rpc_transport(&transport),
|
||||
remote_control_handle: Some(remote_control_handle.clone()),
|
||||
plugin_startup_tasks: runtime_options.plugin_startup_tasks,
|
||||
|
||||
@@ -258,6 +258,7 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
|
||||
pub(crate) session_source: SessionSource,
|
||||
pub(crate) auth_manager: Arc<AuthManager>,
|
||||
pub(crate) installation_id: String,
|
||||
pub(crate) rpc_transport: AppServerRpcTransport,
|
||||
pub(crate) remote_control_handle: Option<RemoteControlHandle>,
|
||||
pub(crate) plugin_startup_tasks: crate::PluginStartupTasks,
|
||||
@@ -280,6 +281,7 @@ impl MessageProcessor {
|
||||
config_warnings,
|
||||
session_source,
|
||||
auth_manager,
|
||||
installation_id,
|
||||
rpc_transport,
|
||||
remote_control_handle,
|
||||
plugin_startup_tasks,
|
||||
@@ -299,6 +301,7 @@ impl MessageProcessor {
|
||||
Some(analytics_events_client.clone()),
|
||||
Arc::clone(&thread_store),
|
||||
state_db.clone(),
|
||||
installation_id,
|
||||
));
|
||||
thread_manager
|
||||
.plugins_manager()
|
||||
|
||||
@@ -294,6 +294,7 @@ async fn build_test_processor(
|
||||
config_warnings: Vec::new(),
|
||||
session_source: SessionSource::VSCode,
|
||||
auth_manager,
|
||||
installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
rpc_transport: AppServerRpcTransport::Stdio,
|
||||
remote_control_handle: None,
|
||||
plugin_startup_tasks: crate::PluginStartupTasks::Start,
|
||||
|
||||
@@ -39,6 +39,7 @@ pub use codex_core::config::TerminalResizeReflowConfig;
|
||||
pub use codex_core::config::ThreadStoreConfig;
|
||||
pub use codex_core::config::find_codex_home;
|
||||
pub use codex_core::init_state_db;
|
||||
pub use codex_core::resolve_installation_id;
|
||||
pub use codex_core::skills::SkillsManager;
|
||||
pub use codex_core::thread_store_from_config;
|
||||
pub use codex_exec_server::EnvironmentManager;
|
||||
|
||||
@@ -75,6 +75,7 @@ pub(crate) async fn run_codex_thread_interactive(
|
||||
let (tx_ops, rx_ops) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let CodexSpawnOk { codex, .. } = Box::pin(Codex::spawn(CodexSpawnArgs {
|
||||
config,
|
||||
installation_id: parent_session.installation_id.clone(),
|
||||
auth_manager,
|
||||
models_manager,
|
||||
environment_manager: Arc::clone(&parent_session.services.environment_manager),
|
||||
|
||||
@@ -13,6 +13,7 @@ use codex_protocol::user_input::UserInput;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::config::Config;
|
||||
use crate::resolve_installation_id;
|
||||
use crate::session::session::Session;
|
||||
use crate::session::turn::build_prompt;
|
||||
use crate::session::turn::built_tools;
|
||||
@@ -38,6 +39,7 @@ pub async fn build_prompt_input(
|
||||
)?;
|
||||
|
||||
let thread_store = thread_store_from_config(&config, state_db.clone());
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let thread_manager = ThreadManager::new(
|
||||
&config,
|
||||
Arc::clone(&auth_manager),
|
||||
@@ -46,6 +48,7 @@ pub async fn build_prompt_input(
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store,
|
||||
state_db.clone(),
|
||||
installation_id,
|
||||
);
|
||||
let thread = thread_manager.start_thread(config).await?;
|
||||
|
||||
|
||||
@@ -32,7 +32,6 @@ use crate::context::PersonalitySpecInstructions;
|
||||
use crate::default_skill_metadata_budget;
|
||||
use crate::environment_selection::ResolvedTurnEnvironments;
|
||||
use crate::exec_policy::ExecPolicyManager;
|
||||
use crate::installation_id::resolve_installation_id;
|
||||
use crate::parse_turn_item;
|
||||
use crate::path_utils::normalize_for_native_workdir;
|
||||
use crate::realtime_conversation::RealtimeConversationManager;
|
||||
@@ -388,6 +387,7 @@ pub struct CodexSpawnOk {
|
||||
|
||||
pub(crate) struct CodexSpawnArgs {
|
||||
pub(crate) config: Config,
|
||||
pub(crate) installation_id: String,
|
||||
pub(crate) auth_manager: Arc<AuthManager>,
|
||||
pub(crate) models_manager: SharedModelsManager,
|
||||
pub(crate) environment_manager: Arc<EnvironmentManager>,
|
||||
@@ -449,6 +449,7 @@ impl Codex {
|
||||
async fn spawn_internal(args: CodexSpawnArgs) -> CodexResult<CodexSpawnOk> {
|
||||
let CodexSpawnArgs {
|
||||
mut config,
|
||||
installation_id,
|
||||
auth_manager,
|
||||
models_manager,
|
||||
environment_manager,
|
||||
@@ -637,6 +638,7 @@ impl Codex {
|
||||
let session = Session::new(
|
||||
session_configuration,
|
||||
config.clone(),
|
||||
installation_id,
|
||||
auth_manager.clone(),
|
||||
models_manager.clone(),
|
||||
exec_policy,
|
||||
|
||||
@@ -11,6 +11,7 @@ use tokio::sync::Semaphore;
|
||||
/// A session has at most 1 running task at a time, and can be interrupted by user input.
|
||||
pub(crate) struct Session {
|
||||
pub(crate) conversation_id: ThreadId,
|
||||
pub(crate) installation_id: String,
|
||||
pub(super) tx_event: Sender<Event>,
|
||||
pub(super) agent_status: watch::Sender<AgentStatus>,
|
||||
pub(super) out_of_band_elicitation_paused: watch::Sender<bool>,
|
||||
@@ -330,6 +331,7 @@ impl Session {
|
||||
pub(crate) async fn new(
|
||||
mut session_configuration: SessionConfiguration,
|
||||
config: Arc<Config>,
|
||||
installation_id: String,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
models_manager: SharedModelsManager,
|
||||
exec_policy: Arc<ExecPolicyManager>,
|
||||
@@ -815,7 +817,6 @@ impl Session {
|
||||
});
|
||||
}
|
||||
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let analytics_events_client = analytics_events_client.unwrap_or_else(|| {
|
||||
AnalyticsEventsClient::new(
|
||||
Arc::clone(&auth_manager),
|
||||
@@ -868,7 +869,7 @@ impl Session {
|
||||
model_client: ModelClient::new(
|
||||
Some(Arc::clone(&auth_manager)),
|
||||
conversation_id,
|
||||
installation_id,
|
||||
installation_id.clone(),
|
||||
session_configuration.provider.clone(),
|
||||
session_configuration.session_source.clone(),
|
||||
config.model_verbosity,
|
||||
@@ -888,6 +889,7 @@ impl Session {
|
||||
let (mailbox, mailbox_rx) = Mailbox::new();
|
||||
let sess = Arc::new(Session {
|
||||
conversation_id,
|
||||
installation_id,
|
||||
tx_event: tx_event.clone(),
|
||||
agent_status,
|
||||
out_of_band_elicitation_paused,
|
||||
|
||||
@@ -3476,6 +3476,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
|
||||
let result = Session::new(
|
||||
session_configuration,
|
||||
Arc::clone(&config),
|
||||
"11111111-1111-4111-8111-111111111111".to_string(),
|
||||
auth_manager,
|
||||
models_manager,
|
||||
Arc::new(ExecPolicyManager::default()),
|
||||
@@ -3695,6 +3696,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
let (mailbox, mailbox_rx) = crate::agent::Mailbox::new();
|
||||
let session = Session {
|
||||
conversation_id,
|
||||
installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
tx_event,
|
||||
agent_status: agent_status_tx,
|
||||
out_of_band_elicitation_paused: watch::channel(false).0,
|
||||
@@ -3796,6 +3798,7 @@ async fn make_session_with_config_and_rx(
|
||||
let session = Session::new(
|
||||
session_configuration,
|
||||
Arc::clone(&config),
|
||||
"11111111-1111-4111-8111-111111111111".to_string(),
|
||||
auth_manager,
|
||||
models_manager,
|
||||
Arc::new(ExecPolicyManager::default()),
|
||||
@@ -5223,6 +5226,7 @@ where
|
||||
let (mailbox, mailbox_rx) = crate::agent::Mailbox::new();
|
||||
let session = Arc::new(Session {
|
||||
conversation_id,
|
||||
installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
tx_event,
|
||||
agent_status: agent_status_tx,
|
||||
out_of_band_elicitation_paused: watch::channel(false).0,
|
||||
|
||||
@@ -736,6 +736,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
|
||||
|
||||
let CodexSpawnOk { codex, .. } = Codex::spawn(CodexSpawnArgs {
|
||||
config,
|
||||
installation_id: "11111111-1111-4111-8111-111111111111".to_string(),
|
||||
auth_manager,
|
||||
models_manager,
|
||||
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
|
||||
|
||||
@@ -248,6 +248,7 @@ pub(crate) struct ThreadManagerState {
|
||||
skills_watcher: Arc<SkillsWatcher>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
session_source: SessionSource,
|
||||
installation_id: String,
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
state_db: Option<StateDbHandle>,
|
||||
// Captures submitted ops for testing purpose when test mode is enabled.
|
||||
@@ -280,6 +281,7 @@ pub fn thread_store_from_config(
|
||||
}
|
||||
|
||||
impl ThreadManager {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
config: &Config,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
@@ -288,6 +290,7 @@ impl ThreadManager {
|
||||
analytics_events_client: Option<AnalyticsEventsClient>,
|
||||
thread_store: Arc<dyn ThreadStore>,
|
||||
state_db: Option<StateDbHandle>,
|
||||
installation_id: String,
|
||||
) -> Self {
|
||||
let codex_home = config.codex_home.clone();
|
||||
let restriction_product = session_source.restriction_product();
|
||||
@@ -316,6 +319,7 @@ impl ThreadManager {
|
||||
thread_store,
|
||||
auth_manager,
|
||||
session_source,
|
||||
installation_id,
|
||||
analytics_events_client,
|
||||
state_db,
|
||||
ops_log: should_use_test_thread_manager_behavior()
|
||||
@@ -415,6 +419,7 @@ impl ThreadManager {
|
||||
thread_store,
|
||||
auth_manager,
|
||||
session_source: SessionSource::Exec,
|
||||
installation_id: uuid::Uuid::new_v4().to_string(),
|
||||
analytics_events_client: None,
|
||||
state_db,
|
||||
ops_log: should_use_test_thread_manager_behavior()
|
||||
@@ -1152,6 +1157,7 @@ impl ThreadManagerState {
|
||||
codex, thread_id, ..
|
||||
} = Codex::spawn(CodexSpawnArgs {
|
||||
config,
|
||||
installation_id: self.installation_id.clone(),
|
||||
auth_manager,
|
||||
models_manager: Arc::clone(&self.models_manager),
|
||||
environment_manager: Arc::clone(&self.environment_manager),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use super::*;
|
||||
use crate::config::test_config;
|
||||
use crate::init_state_db;
|
||||
use crate::installation_id::INSTALLATION_ID_FILENAME;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::session::session::SessionSettingsUpdate;
|
||||
use crate::session::tests::make_session_and_context;
|
||||
@@ -26,6 +27,8 @@ use std::time::Duration;
|
||||
use tempfile::tempdir;
|
||||
use wiremock::MockServer;
|
||||
|
||||
const TEST_INSTALLATION_ID: &str = "11111111-1111-4111-8111-111111111111";
|
||||
|
||||
fn user_msg(text: &str) -> ResponseItem {
|
||||
ResponseItem::Message {
|
||||
id: None,
|
||||
@@ -392,6 +395,7 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, /*state_db*/ None),
|
||||
/*state_db*/ None,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
let selected_cwd =
|
||||
AbsolutePathBuf::try_from(config.cwd.as_path().join("selected")).expect("absolute path");
|
||||
@@ -484,6 +488,44 @@ async fn resume_and_fork_do_not_restore_thread_environments_from_rollout() {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn explicit_installation_id_skips_codex_home_file() {
|
||||
let temp_dir = tempdir().expect("tempdir");
|
||||
let mut config = test_config().await;
|
||||
config.codex_home = temp_dir.path().join("codex-home").abs();
|
||||
config.cwd = config.codex_home.abs();
|
||||
std::fs::create_dir_all(&config.codex_home).expect("create codex home");
|
||||
|
||||
let auth_manager =
|
||||
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
|
||||
let installation_id = uuid::Uuid::new_v4().to_string();
|
||||
let manager = ThreadManager::new(
|
||||
&config,
|
||||
auth_manager,
|
||||
SessionSource::Exec,
|
||||
Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()),
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, /*state_db*/ None),
|
||||
/*state_db*/ None,
|
||||
installation_id.clone(),
|
||||
);
|
||||
|
||||
let thread = manager
|
||||
.start_thread(config.clone())
|
||||
.await
|
||||
.expect("start thread with explicit installation id");
|
||||
|
||||
assert!(!config.codex_home.join(INSTALLATION_ID_FILENAME).exists());
|
||||
assert_eq!(thread.thread.codex.session.installation_id, installation_id);
|
||||
|
||||
thread
|
||||
.thread
|
||||
.shutdown_and_wait()
|
||||
.await
|
||||
.expect("shutdown thread");
|
||||
let _ = manager.remove_thread(&thread.thread_id).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn resume_active_thread_from_rollout_returns_running_thread() {
|
||||
let temp_dir = tempdir().expect("tempdir");
|
||||
@@ -502,6 +544,7 @@ async fn resume_active_thread_from_rollout_returns_running_thread() {
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, /*state_db*/ None),
|
||||
/*state_db*/ None,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
|
||||
let source = manager
|
||||
@@ -556,6 +599,7 @@ async fn resume_stopped_thread_from_rollout_spawns_new_thread() {
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, /*state_db*/ None),
|
||||
/*state_db*/ None,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
|
||||
let source = manager
|
||||
@@ -620,6 +664,7 @@ async fn new_uses_active_provider_for_model_refresh() {
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, /*state_db*/ None),
|
||||
/*state_db*/ None,
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
|
||||
let _ = manager.list_models(RefreshStrategy::Online).await;
|
||||
@@ -833,6 +878,7 @@ async fn interrupted_fork_snapshot_does_not_synthesize_turn_id_for_legacy_histor
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, state_db.clone()),
|
||||
state_db.clone(),
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
|
||||
let source = manager
|
||||
@@ -937,6 +983,7 @@ async fn interrupted_fork_snapshot_preserves_explicit_turn_id() {
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, state_db.clone()),
|
||||
state_db.clone(),
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
|
||||
let source = manager
|
||||
@@ -1030,6 +1077,7 @@ async fn interrupted_fork_snapshot_uses_persisted_mid_turn_history_without_live_
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, state_db.clone()),
|
||||
state_db.clone(),
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
|
||||
let source = manager
|
||||
@@ -1168,6 +1216,7 @@ async fn resumed_thread_keeps_paused_goal_paused() -> anyhow::Result<()> {
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, state_db.clone()),
|
||||
state_db.clone(),
|
||||
TEST_INSTALLATION_ID.to_string(),
|
||||
);
|
||||
|
||||
let source = manager
|
||||
|
||||
@@ -3166,6 +3166,7 @@ async fn tool_handlers_cascade_close_and_resume_and_keep_explicitly_closed_subtr
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, state_db.clone()),
|
||||
state_db.clone(),
|
||||
"11111111-1111-4111-8111-111111111111".to_string(),
|
||||
);
|
||||
|
||||
let parent = manager
|
||||
|
||||
@@ -16,6 +16,7 @@ use codex_config::CloudRequirementsLoader;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::resolve_installation_id;
|
||||
use codex_core::shell::Shell;
|
||||
use codex_core::shell::get_shell_by_model_provided_path;
|
||||
use codex_core::thread_store_from_config;
|
||||
@@ -424,25 +425,18 @@ impl TestCodexBuilder {
|
||||
) -> anyhow::Result<TestCodex> {
|
||||
let auth = self.auth.clone();
|
||||
let state_db = codex_core::init_state_db(&config).await;
|
||||
let thread_manager = if config.model_catalog.is_some() {
|
||||
ThreadManager::new(
|
||||
&config,
|
||||
codex_core::test_support::auth_manager_from_auth(auth.clone()),
|
||||
SessionSource::Exec,
|
||||
Arc::clone(&environment_manager),
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, state_db.clone()),
|
||||
state_db.clone(),
|
||||
)
|
||||
} else {
|
||||
codex_core::test_support::thread_manager_with_models_provider_home_and_state(
|
||||
auth.clone(),
|
||||
config.model_provider.clone(),
|
||||
config.codex_home.to_path_buf(),
|
||||
Arc::clone(&environment_manager),
|
||||
state_db.clone(),
|
||||
)
|
||||
};
|
||||
codex_core::test_support::set_thread_manager_test_mode(true);
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let thread_manager = ThreadManager::new(
|
||||
&config,
|
||||
codex_core::test_support::auth_manager_from_auth(auth.clone()),
|
||||
SessionSource::Exec,
|
||||
Arc::clone(&environment_manager),
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, state_db.clone()),
|
||||
state_db.clone(),
|
||||
installation_id,
|
||||
);
|
||||
let thread_manager = Arc::new(thread_manager);
|
||||
let user_shell_override = self.user_shell_override.clone();
|
||||
|
||||
|
||||
@@ -5,6 +5,7 @@ use codex_core::NewThread;
|
||||
use codex_core::Prompt;
|
||||
use codex_core::ResponseEvent;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::resolve_installation_id;
|
||||
use codex_core::thread_store_from_config;
|
||||
use codex_features::Feature;
|
||||
use codex_login::AuthManager;
|
||||
@@ -1106,6 +1107,9 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
|
||||
Ok(None) => panic!("No CodexAuth found in codex_home"),
|
||||
Err(e) => panic!("Failed to load CodexAuth: {e}"),
|
||||
};
|
||||
let installation_id = resolve_installation_id(&config.codex_home)
|
||||
.await
|
||||
.expect("resolve installation id");
|
||||
let thread_manager = ThreadManager::new(
|
||||
&config,
|
||||
auth_manager,
|
||||
@@ -1114,6 +1118,7 @@ async fn prefers_apikey_when_config_prefers_apikey_even_with_chatgpt_tokens() {
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, /*state_db*/ None),
|
||||
/*state_db*/ None,
|
||||
installation_id,
|
||||
);
|
||||
let NewThread { thread: codex, .. } = thread_manager
|
||||
.start_thread(config.clone())
|
||||
|
||||
@@ -248,6 +248,7 @@ async fn list_skills_skips_cwd_roots_when_environment_disabled() -> Result<()> {
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(&config, /*state_db*/ None),
|
||||
/*state_db*/ None,
|
||||
"11111111-1111-4111-8111-111111111111".to_string(),
|
||||
);
|
||||
let new_thread = thread_manager.start_thread(config.clone()).await?;
|
||||
let cwd = config.cwd.to_path_buf();
|
||||
|
||||
@@ -114,6 +114,16 @@ pub async fn run_main(
|
||||
let (incoming_tx, mut incoming_rx) = mpsc::channel::<IncomingMessage>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::<OutgoingMessage>();
|
||||
|
||||
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
||||
let mut processor = MessageProcessor::new(
|
||||
outgoing_message_sender,
|
||||
arg0_paths,
|
||||
Arc::new(config),
|
||||
environment_manager,
|
||||
state_db,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Task: read from stdin, push to `incoming_tx`.
|
||||
let stdin_reader_handle = tokio::spawn({
|
||||
async move {
|
||||
@@ -138,28 +148,17 @@ pub async fn run_main(
|
||||
});
|
||||
|
||||
// Task: process incoming messages.
|
||||
let processor_handle = tokio::spawn({
|
||||
let outgoing_message_sender = OutgoingMessageSender::new(outgoing_tx);
|
||||
let mut processor = MessageProcessor::new(
|
||||
outgoing_message_sender,
|
||||
arg0_paths,
|
||||
Arc::new(config),
|
||||
environment_manager,
|
||||
state_db,
|
||||
)
|
||||
.await;
|
||||
async move {
|
||||
while let Some(msg) = incoming_rx.recv().await {
|
||||
match msg {
|
||||
JsonRpcMessage::Request(r) => processor.process_request(r).await,
|
||||
JsonRpcMessage::Response(r) => processor.process_response(r).await,
|
||||
JsonRpcMessage::Notification(n) => processor.process_notification(n).await,
|
||||
JsonRpcMessage::Error(e) => processor.process_error(e),
|
||||
}
|
||||
let processor_handle = tokio::spawn(async move {
|
||||
while let Some(msg) = incoming_rx.recv().await {
|
||||
match msg {
|
||||
JsonRpcMessage::Request(r) => processor.process_request(r).await,
|
||||
JsonRpcMessage::Response(r) => processor.process_response(r).await,
|
||||
JsonRpcMessage::Notification(n) => processor.process_notification(n).await,
|
||||
JsonRpcMessage::Error(e) => processor.process_error(e),
|
||||
}
|
||||
|
||||
info!("processor task exited (channel closed)");
|
||||
}
|
||||
|
||||
info!("processor task exited (channel closed)");
|
||||
});
|
||||
|
||||
// Task: write outgoing messages to stdout.
|
||||
|
||||
@@ -5,6 +5,7 @@ use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_core::StateDbHandle;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::resolve_installation_id;
|
||||
use codex_core::thread_store_from_config;
|
||||
use codex_exec_server::EnvironmentManager;
|
||||
use codex_login::AuthManager;
|
||||
@@ -55,13 +56,14 @@ impl MessageProcessor {
|
||||
config: Arc<Config>,
|
||||
environment_manager: Arc<EnvironmentManager>,
|
||||
state_db: Option<StateDbHandle>,
|
||||
) -> Self {
|
||||
) -> std::io::Result<Self> {
|
||||
let outgoing = Arc::new(outgoing);
|
||||
let auth_manager = AuthManager::shared_from_config(
|
||||
config.as_ref(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
)
|
||||
.await;
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let thread_manager = Arc::new(ThreadManager::new(
|
||||
config.as_ref(),
|
||||
auth_manager,
|
||||
@@ -70,14 +72,15 @@ impl MessageProcessor {
|
||||
/*analytics_events_client*/ None,
|
||||
thread_store_from_config(config.as_ref(), state_db.clone()),
|
||||
state_db.clone(),
|
||||
installation_id,
|
||||
));
|
||||
Self {
|
||||
Ok(Self {
|
||||
outgoing,
|
||||
initialized: false,
|
||||
arg0_paths,
|
||||
thread_manager,
|
||||
running_requests_id_to_codex_uuid: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) async fn process_request(&mut self, request: JsonRpcRequest<ClientRequest>) {
|
||||
|
||||
@@ -56,6 +56,7 @@ use codex_core_api::built_in_model_providers;
|
||||
use codex_core_api::find_codex_home;
|
||||
use codex_core_api::init_state_db;
|
||||
use codex_core_api::item_event_to_server_notification;
|
||||
use codex_core_api::resolve_installation_id;
|
||||
use codex_core_api::set_default_originator;
|
||||
use codex_core_api::thread_store_from_config;
|
||||
|
||||
@@ -114,6 +115,7 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
let thread_store = thread_store_from_config(&config, state_db.clone());
|
||||
let environment_manager =
|
||||
Arc::new(EnvironmentManager::new(EnvironmentManagerArgs::new(local_runtime_paths)).await);
|
||||
let installation_id = resolve_installation_id(&config.codex_home).await?;
|
||||
let thread_manager = ThreadManager::new(
|
||||
&config,
|
||||
auth_manager,
|
||||
@@ -122,6 +124,7 @@ async fn run_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
/*analytics_events_client*/ None,
|
||||
Arc::clone(&thread_store),
|
||||
state_db,
|
||||
installation_id,
|
||||
);
|
||||
|
||||
let NewThread {
|
||||
|
||||
Reference in New Issue
Block a user