Use per-launch idempotency for cloud executor registration

This commit is contained in:
Michael Zeng
2026-04-25 09:41:36 -07:00
parent 07182cdc04
commit 350b2679b4

View File

@@ -13,6 +13,7 @@ use tokio::time::sleep;
use tokio_tungstenite::connect_async;
use tracing::info;
use tracing::warn;
use uuid::Uuid;
use crate::ExecServerError;
use crate::ExecServerRuntimePaths;
@@ -131,9 +132,10 @@ impl CloudExecutorConfig {
fn registration_request(
&self,
auth: &CodexAuth,
registration_id: Uuid,
) -> Result<CloudEnvironmentRegisterExecutorRequest, ExecServerError> {
Ok(CloudEnvironmentRegisterExecutorRequest {
idempotency_id: self.default_idempotency_id(auth)?,
idempotency_id: self.default_idempotency_id(auth, registration_id)?,
environment_id: self.cloud_environment_id.clone(),
name: Some(self.cloud_name.clone()),
labels: BTreeMap::new(),
@@ -141,7 +143,11 @@ impl CloudExecutorConfig {
})
}
fn default_idempotency_id(&self, auth: &CodexAuth) -> Result<String, ExecServerError> {
fn default_idempotency_id(
&self,
auth: &CodexAuth,
registration_id: Uuid,
) -> Result<String, ExecServerError> {
let mut hasher = sha2::Sha256::new();
hasher.update(chatgpt_account_id(auth)?.as_bytes());
hasher.update(b"\0");
@@ -150,6 +156,8 @@ impl CloudExecutorConfig {
hasher.update(self.cloud_name.as_bytes());
hasher.update(b"\0");
hasher.update(PROTOCOL_VERSION);
hasher.update(b"\0");
hasher.update(registration_id.as_bytes());
let digest = hasher.finalize();
Ok(format!("codex-exec-server-{digest:x}"))
}
@@ -164,11 +172,12 @@ pub async fn run_cloud_executor(
) -> Result<(), ExecServerError> {
let client = CloudEnvironmentClient::new(config.cloud_base_url.clone(), auth_manager.clone())?;
let processor = ConnectionProcessor::new(runtime_paths);
let registration_id = Uuid::new_v4();
let mut backoff = Duration::from_secs(1);
loop {
let auth = cloud_environment_chatgpt_auth(&auth_manager).await?;
let request = config.registration_request(&auth)?;
let request = config.registration_request(&auth, registration_id)?;
let response = client.register_executor(&request).await?;
eprintln!(
"codex exec-server cloud executor {} registered in environment {}",
@@ -356,8 +365,9 @@ mod tests {
async fn register_executor_posts_with_chatgpt_auth_headers() {
let server = MockServer::start().await;
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
let registration_id = Uuid::from_u128(1);
let request = CloudExecutorConfig::new(server.uri())
.registration_request(&auth)
.registration_request(&auth, registration_id)
.expect("registration request");
let expected_request = serde_json::to_value(&request).expect("serialize request");
Mock::given(method("POST"))
@@ -388,4 +398,25 @@ mod tests {
}
);
}
#[test]
fn registration_idempotency_key_is_stable_within_process_and_unique_across_launches() {
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
let config = CloudExecutorConfig::new("http://127.0.0.1:18084".to_string());
let first_registration_id = Uuid::from_u128(1);
let second_registration_id = Uuid::from_u128(2);
let first = config
.registration_request(&auth, first_registration_id)
.expect("first registration");
let repeated = config
.registration_request(&auth, first_registration_id)
.expect("repeated registration");
let second = config
.registration_request(&auth, second_registration_id)
.expect("second registration");
assert_eq!(first.idempotency_id, repeated.idempotency_id);
assert_ne!(first.idempotency_id, second.idempotency_id);
}
}