Compare commits

..

1 Commits

Author SHA1 Message Date
mikhail-oai
0786633811 Use binary split-entry keyring storage for auth to avoid Windows credential size failures 2026-03-01 18:30:57 -05:00
38 changed files with 808 additions and 1423 deletions

View File

@@ -46,16 +46,6 @@
"type": "null"
}
]
},
"planType": {
"anyOf": [
{
"$ref": "#/definitions/PlanType"
},
{
"type": "null"
}
]
}
},
"type": "object"

View File

@@ -7319,16 +7319,6 @@
"type": "null"
}
]
},
"planType": {
"anyOf": [
{
"$ref": "#/definitions/v2/PlanType"
},
{
"type": "null"
}
]
}
},
"title": "AccountUpdatedNotification",

View File

@@ -26,20 +26,6 @@
"type": "string"
}
]
},
"PlanType": {
"enum": [
"free",
"go",
"plus",
"pro",
"team",
"business",
"enterprise",
"edu",
"unknown"
],
"type": "string"
}
},
"properties": {
@@ -52,16 +38,6 @@
"type": "null"
}
]
},
"planType": {
"anyOf": [
{
"$ref": "#/definitions/PlanType"
},
{
"type": "null"
}
]
}
},
"title": "AccountUpdatedNotification",

View File

@@ -2,6 +2,5 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { AuthMode } from "../AuthMode";
import type { PlanType } from "../PlanType";
export type AccountUpdatedNotification = { authMode: AuthMode | null, planType: PlanType | null, };
export type AccountUpdatedNotification = { authMode: AuthMode | null, };

View File

@@ -2557,7 +2557,6 @@ pub struct Thread {
#[ts(export_to = "v2/")]
pub struct AccountUpdatedNotification {
pub auth_mode: Option<AuthMode>,
pub plan_type: Option<PlanType>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]

View File

@@ -952,7 +952,7 @@ The JSON-RPC auth/account surface exposes request/response methods plus server-i
### Authentication modes
Codex supports these authentication modes. The current mode is surfaced in `account/updated` (`authMode`), which also includes the current ChatGPT `planType` when available, and can be inferred from `account/read`.
Codex supports these authentication modes. The current mode is surfaced in `account/updated` (`authMode`) and can be inferred from `account/read`.
- **API key (`apiKey`)**: Caller supplies an OpenAI API key via `account/login/start` with `type: "apiKey"`. The API key is saved and used for API requests.
- **ChatGPT managed (`chatgpt`)** (recommended): Codex owns the ChatGPT OAuth flow and refresh tokens. Start via `account/login/start` with `type: "chatgpt"`; Codex persists tokens to disk and refreshes them automatically.
@@ -964,7 +964,7 @@ Codex supports these authentication modes. The current mode is surfaced in `acco
- `account/login/completed` (notify) — emitted when a login attempt finishes (success or error).
- `account/login/cancel` — cancel a pending ChatGPT login by `loginId`.
- `account/logout` — sign out; triggers `account/updated`.
- `account/updated` (notify) — emitted whenever auth mode changes (`authMode`: `apikey`, `chatgpt`, or `null`) and includes the current ChatGPT `planType` when available.
- `account/updated` (notify) — emitted whenever auth mode changes (`authMode`: `apikey`, `chatgpt`, or `null`).
- `account/rateLimits/read` — fetch ChatGPT rate limits; updates arrive via `account/rateLimits/updated` (notify).
- `account/rateLimits/updated` (notify) — emitted whenever a user's ChatGPT rate limits change.
- `mcpServer/oauthLogin/completed` (notify) — emitted after a `mcpServer/oauth/login` flow finishes for a server; payload includes `{ name, success, error? }`.
@@ -1008,7 +1008,7 @@ Field notes:
3. Notifications:
```json
{ "method": "account/login/completed", "params": { "loginId": null, "success": true, "error": null } }
{ "method": "account/updated", "params": { "authMode": "apikey", "planType": null } }
{ "method": "account/updated", "params": { "authMode": "apikey" } }
```
### 3) Log in with ChatGPT (browser flow)
@@ -1022,7 +1022,7 @@ Field notes:
3. Wait for notifications:
```json
{ "method": "account/login/completed", "params": { "loginId": "<uuid>", "success": true, "error": null } }
{ "method": "account/updated", "params": { "authMode": "chatgpt", "planType": "plus" } }
{ "method": "account/updated", "params": { "authMode": "chatgpt" } }
```
### 4) Cancel a ChatGPT login
@@ -1037,7 +1037,7 @@ Field notes:
```json
{ "method": "account/logout", "id": 5 }
{ "id": 5, "result": {} }
{ "method": "account/updated", "params": { "authMode": null, "planType": null } }
{ "method": "account/updated", "params": { "authMode": null } }
```
### 6) Rate limits (ChatGPT)

View File

@@ -423,14 +423,6 @@ pub(crate) struct CodexMessageProcessorArgs {
}
impl CodexMessageProcessor {
fn current_account_updated_notification(&self) -> AccountUpdatedNotification {
let auth = self.auth_manager.auth_cached();
AccountUpdatedNotification {
auth_mode: auth.as_ref().map(CodexAuth::api_auth_mode),
plan_type: auth.as_ref().and_then(CodexAuth::account_plan_type),
}
}
async fn load_thread(
&self,
thread_id: &str,
@@ -1065,10 +1057,15 @@ impl CodexMessageProcessor {
))
.await;
let payload_v2 = AccountUpdatedNotification {
auth_mode: self
.auth_manager
.auth_cached()
.as_ref()
.map(CodexAuth::api_auth_mode),
};
self.outgoing
.send_server_notification(ServerNotification::AccountUpdated(
self.current_account_updated_notification(),
))
.send_server_notification(ServerNotification::AccountUpdated(payload_v2))
.await;
}
Err(error) => {
@@ -1284,10 +1281,12 @@ impl CodexMessageProcessor {
.await;
// Notify clients with the actual current auth mode.
let auth = auth_manager.auth_cached();
let current_auth_method = auth_manager
.auth_cached()
.as_ref()
.map(CodexAuth::api_auth_mode);
let payload_v2 = AccountUpdatedNotification {
auth_mode: auth.as_ref().map(CodexAuth::api_auth_mode),
plan_type: auth.as_ref().and_then(CodexAuth::account_plan_type),
auth_mode: current_auth_method,
};
outgoing_clone
.send_server_notification(ServerNotification::AccountUpdated(
@@ -1468,10 +1467,11 @@ impl CodexMessageProcessor {
))
.await;
let payload_v2 = AccountUpdatedNotification {
auth_mode: self.auth_manager.get_api_auth_mode(),
};
self.outgoing
.send_server_notification(ServerNotification::AccountUpdated(
self.current_account_updated_notification(),
))
.send_server_notification(ServerNotification::AccountUpdated(payload_v2))
.await;
}
@@ -1529,7 +1529,6 @@ impl CodexMessageProcessor {
let payload_v2 = AccountUpdatedNotification {
auth_mode: current_auth_method,
plan_type: None,
};
self.outgoing
.send_server_notification(ServerNotification::AccountUpdated(payload_v2))

View File

@@ -612,7 +612,6 @@ mod tests {
fn verify_account_updated_notification_serialization() {
let notification = ServerNotification::AccountUpdated(AccountUpdatedNotification {
auth_mode: Some(AuthMode::ApiKey),
plan_type: None,
});
let jsonrpc_notification = OutgoingMessage::AppServerNotification(notification);
@@ -620,8 +619,7 @@ mod tests {
json!({
"method": "account/updated",
"params": {
"authMode": "apikey",
"planType": null
"authMode": "apikey"
},
}),
serde_json::to_value(jsonrpc_notification)

View File

@@ -628,7 +628,6 @@ fn append_rollout_turn_context(path: &Path, timestamp: &str, model: &str) -> std
model: model.to_string(),
personality: None,
collaboration_mode: None,
realtime_active: Some(false),
effort: None,
summary: ReasoningSummary::Auto,
user_instructions: None,

View File

@@ -131,7 +131,6 @@ async fn logout_account_removes_auth_and_notifies() -> Result<()> {
payload.auth_mode.is_none(),
"auth_method should be None after logout"
);
assert_eq!(payload.plan_type, None);
assert!(
!codex_home.path().join("auth.json").exists(),
@@ -202,7 +201,6 @@ async fn set_auth_token_updates_account_and_notifies() -> Result<()> {
bail!("unexpected notification: {parsed:?}");
};
assert_eq!(payload.auth_mode, Some(AuthMode::ChatgptAuthTokens));
assert_eq!(payload.plan_type, Some(AccountPlanType::Pro));
let get_id = mcp
.send_get_account_request(GetAccountParams {
@@ -845,7 +843,6 @@ async fn login_account_api_key_succeeds_and_notifies() -> Result<()> {
bail!("unexpected notification: {parsed:?}");
};
pretty_assertions::assert_eq!(payload.auth_mode, Some(AuthMode::ApiKey));
pretty_assertions::assert_eq!(payload.plan_type, None);
assert!(codex_home.path().join("auth.json").exists());
Ok(())

View File

@@ -133,6 +133,21 @@ impl AuthStorageBackend for FileAuthStorage {
}
const KEYRING_SERVICE: &str = "Codex Auth";
const KEYRING_LAYOUT_VERSION: &str = "v2";
const KEYRING_ACTIVE_REVISION_ENTRY: &str = "active";
const KEYRING_MANIFEST_ENTRY: &str = "manifest";
const KEYRING_OPENAI_API_KEY_ENTRY: &str = "OPENAI_API_KEY";
const KEYRING_ID_TOKEN_ENTRY: &str = "tokens.id_token";
const KEYRING_ACCESS_TOKEN_ENTRY: &str = "tokens.access_token";
const KEYRING_REFRESH_TOKEN_ENTRY: &str = "tokens.refresh_token";
const KEYRING_ACCOUNT_ID_ENTRY: &str = "tokens.account_id";
const KEYRING_RECORD_ENTRIES: [&str; 5] = [
KEYRING_MANIFEST_ENTRY,
KEYRING_OPENAI_API_KEY_ENTRY,
KEYRING_ID_TOKEN_ENTRY,
KEYRING_ACCESS_TOKEN_ENTRY,
KEYRING_REFRESH_TOKEN_ENTRY,
];
// turns codex_home path into a stable, short key string
fn compute_store_key(codex_home: &Path) -> std::io::Result<String> {
@@ -148,6 +163,47 @@ fn compute_store_key(codex_home: &Path) -> std::io::Result<String> {
Ok(format!("cli|{truncated}"))
}
fn keyring_layout_key(base_key: &str, suffix: &str) -> String {
format!("{base_key}|{KEYRING_LAYOUT_VERSION}|{suffix}")
}
fn keyring_revision_key(base_key: &str, revision: &str, suffix: &str) -> String {
format!("{base_key}|{KEYRING_LAYOUT_VERSION}|{revision}|{suffix}")
}
fn next_keyring_revision() -> String {
Utc::now()
.timestamp_nanos_opt()
.unwrap_or_else(|| Utc::now().timestamp_micros() * 1_000)
.to_string()
}
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
struct KeyringAuthManifest {
auth_mode: Option<AuthMode>,
has_openai_api_key: bool,
has_tokens: bool,
has_account_id: bool,
last_refresh: Option<DateTime<Utc>>,
}
impl From<&AuthDotJson> for KeyringAuthManifest {
fn from(auth: &AuthDotJson) -> Self {
let has_account_id = auth
.tokens
.as_ref()
.and_then(|tokens| tokens.account_id.as_ref())
.is_some();
Self {
auth_mode: auth.auth_mode,
has_openai_api_key: auth.openai_api_key.is_some(),
has_tokens: auth.tokens.is_some(),
has_account_id,
last_refresh: auth.last_refresh,
}
}
}
#[derive(Clone, Debug)]
struct KeyringAuthStorage {
codex_home: PathBuf,
@@ -162,7 +218,7 @@ impl KeyringAuthStorage {
}
}
fn load_from_keyring(&self, key: &str) -> std::io::Result<Option<AuthDotJson>> {
fn load_legacy_from_keyring(&self, key: &str) -> std::io::Result<Option<AuthDotJson>> {
match self.keyring_store.load(KEYRING_SERVICE, key) {
Ok(Some(serialized)) => serde_json::from_str(&serialized).map(Some).map_err(|err| {
std::io::Error::other(format!(
@@ -177,19 +233,231 @@ impl KeyringAuthStorage {
}
}
fn save_to_keyring(&self, key: &str, value: &str) -> std::io::Result<()> {
match self.keyring_store.save(KEYRING_SERVICE, key, value) {
fn load_secret_from_keyring(&self, key: &str, field: &str) -> std::io::Result<Option<Vec<u8>>> {
match self.keyring_store.load_secret(KEYRING_SERVICE, key) {
Ok(secret) => Ok(secret),
Err(error) => Err(std::io::Error::other(format!(
"failed to load {field} from keyring: {}",
error.message()
))),
}
}
fn load_utf8_secret_from_keyring(
&self,
key: &str,
field: &str,
) -> std::io::Result<Option<String>> {
let Some(secret) = self.load_secret_from_keyring(key, field)? else {
return Ok(None);
};
String::from_utf8(secret).map(Some).map_err(|err| {
std::io::Error::other(format!(
"failed to decode {field} from keyring as UTF-8: {err}"
))
})
}
fn save_secret_to_keyring(&self, key: &str, value: &[u8], field: &str) -> std::io::Result<()> {
match self.keyring_store.save_secret(KEYRING_SERVICE, key, value) {
Ok(()) => Ok(()),
Err(error) => {
let message = format!(
"failed to write OAuth tokens to keyring: {}",
error.message()
);
let message = format!("failed to write {field} to keyring: {}", error.message());
warn!("{message}");
Err(std::io::Error::other(message))
}
}
}
fn load_active_revision(&self, base_key: &str) -> std::io::Result<Option<String>> {
let active_key = keyring_layout_key(base_key, KEYRING_ACTIVE_REVISION_ENTRY);
self.load_utf8_secret_from_keyring(&active_key, "active auth revision")
}
fn load_required_utf8_secret(&self, key: &str, field: &str) -> std::io::Result<String> {
self.load_utf8_secret_from_keyring(key, field)?
.ok_or_else(|| std::io::Error::other(format!("missing {field} in keyring")))
}
fn load_manifest(
&self,
base_key: &str,
revision: &str,
) -> std::io::Result<KeyringAuthManifest> {
let manifest_key = keyring_revision_key(base_key, revision, KEYRING_MANIFEST_ENTRY);
let manifest = self
.load_secret_from_keyring(&manifest_key, "auth manifest")?
.ok_or_else(|| std::io::Error::other("missing auth manifest in keyring"))?;
serde_json::from_slice(&manifest).map_err(|err| {
std::io::Error::other(format!(
"failed to deserialize auth manifest from keyring: {err}"
))
})
}
fn load_v2_from_keyring(&self, base_key: &str, revision: &str) -> std::io::Result<AuthDotJson> {
let manifest = self.load_manifest(base_key, revision)?;
let openai_api_key = if manifest.has_openai_api_key {
let key = keyring_revision_key(base_key, revision, KEYRING_OPENAI_API_KEY_ENTRY);
Some(self.load_required_utf8_secret(&key, "OPENAI_API_KEY")?)
} else {
None
};
let tokens = if manifest.has_tokens {
let id_token_key = keyring_revision_key(base_key, revision, KEYRING_ID_TOKEN_ENTRY);
let id_token = self.load_required_utf8_secret(&id_token_key, "ID token")?;
let access_token_key =
keyring_revision_key(base_key, revision, KEYRING_ACCESS_TOKEN_ENTRY);
let access_token = self.load_required_utf8_secret(&access_token_key, "access token")?;
let refresh_token_key =
keyring_revision_key(base_key, revision, KEYRING_REFRESH_TOKEN_ENTRY);
let refresh_token =
self.load_required_utf8_secret(&refresh_token_key, "refresh token")?;
let account_id = if manifest.has_account_id {
let account_id_key =
keyring_revision_key(base_key, revision, KEYRING_ACCOUNT_ID_ENTRY);
Some(self.load_required_utf8_secret(&account_id_key, "account ID")?)
} else {
None
};
Some(TokenData {
id_token: crate::token_data::parse_chatgpt_jwt_claims(&id_token)
.map_err(std::io::Error::other)?,
access_token,
refresh_token,
account_id,
})
} else {
None
};
Ok(AuthDotJson {
auth_mode: manifest.auth_mode,
openai_api_key,
tokens,
last_refresh: manifest.last_refresh,
})
}
fn load_from_keyring(&self, base_key: &str) -> std::io::Result<Option<AuthDotJson>> {
if let Some(revision) = self.load_active_revision(base_key)? {
return self.load_v2_from_keyring(base_key, &revision).map(Some);
}
self.load_legacy_from_keyring(base_key)
}
fn write_optional_secret(
&self,
base_key: &str,
revision: &str,
entry: &str,
value: Option<&str>,
field: &str,
) -> std::io::Result<()> {
if let Some(value) = value {
let key = keyring_revision_key(base_key, revision, entry);
self.save_secret_to_keyring(&key, value.as_bytes(), field)?;
}
Ok(())
}
fn delete_keyring_entry(&self, key: &str) -> std::io::Result<bool> {
self.keyring_store
.delete(KEYRING_SERVICE, key)
.map_err(|err| {
std::io::Error::other(format!("failed to delete auth from keyring: {err}"))
})
}
fn delete_v2_revision(&self, base_key: &str, revision: &str) -> std::io::Result<bool> {
let mut removed = false;
for entry in KEYRING_RECORD_ENTRIES {
let key = keyring_revision_key(base_key, revision, entry);
removed |= self.delete_keyring_entry(&key)?;
}
let account_id_key = keyring_revision_key(base_key, revision, KEYRING_ACCOUNT_ID_ENTRY);
removed |= self.delete_keyring_entry(&account_id_key)?;
Ok(removed)
}
fn delete_from_keyring_only(&self) -> std::io::Result<bool> {
let base_key = compute_store_key(&self.codex_home)?;
let mut removed = false;
if let Some(revision) = self.load_active_revision(&base_key)? {
removed |= self.delete_v2_revision(&base_key, &revision)?;
let active_key = keyring_layout_key(&base_key, KEYRING_ACTIVE_REVISION_ENTRY);
removed |= self.delete_keyring_entry(&active_key)?;
}
removed |= self.delete_keyring_entry(&base_key)?;
Ok(removed)
}
fn save_v2_to_keyring(&self, base_key: &str, auth: &AuthDotJson) -> std::io::Result<()> {
let previous_revision = match self.load_active_revision(base_key) {
Ok(revision) => revision,
Err(err) => {
warn!("failed to read previous auth revision from keyring: {err}");
None
}
};
let revision = next_keyring_revision();
let manifest = KeyringAuthManifest::from(auth);
self.write_optional_secret(
base_key,
&revision,
KEYRING_OPENAI_API_KEY_ENTRY,
auth.openai_api_key.as_deref(),
"OPENAI_API_KEY",
)?;
if let Some(tokens) = auth.tokens.as_ref() {
self.write_optional_secret(
base_key,
&revision,
KEYRING_ID_TOKEN_ENTRY,
Some(&tokens.id_token.raw_jwt),
"ID token",
)?;
self.write_optional_secret(
base_key,
&revision,
KEYRING_ACCESS_TOKEN_ENTRY,
Some(&tokens.access_token),
"access token",
)?;
self.write_optional_secret(
base_key,
&revision,
KEYRING_REFRESH_TOKEN_ENTRY,
Some(&tokens.refresh_token),
"refresh token",
)?;
self.write_optional_secret(
base_key,
&revision,
KEYRING_ACCOUNT_ID_ENTRY,
tokens.account_id.as_deref(),
"account ID",
)?;
}
let manifest_key = keyring_revision_key(base_key, &revision, KEYRING_MANIFEST_ENTRY);
let manifest_bytes = serde_json::to_vec(&manifest).map_err(std::io::Error::other)?;
self.save_secret_to_keyring(&manifest_key, &manifest_bytes, "auth manifest")?;
let active_key = keyring_layout_key(base_key, KEYRING_ACTIVE_REVISION_ENTRY);
self.save_secret_to_keyring(&active_key, revision.as_bytes(), "active auth revision")?;
if let Some(previous_revision) = previous_revision
&& previous_revision != revision
&& let Err(err) = self.delete_v2_revision(base_key, &previous_revision)
{
warn!("failed to remove stale auth revision from keyring: {err}");
}
if let Err(err) = self.delete_keyring_entry(base_key) {
warn!("failed to remove legacy auth entry from keyring: {err}");
}
Ok(())
}
}
impl AuthStorageBackend for KeyringAuthStorage {
@@ -199,10 +467,8 @@ impl AuthStorageBackend for KeyringAuthStorage {
}
fn save(&self, auth: &AuthDotJson) -> std::io::Result<()> {
let key = compute_store_key(&self.codex_home)?;
// Simpler error mapping per style: prefer method reference over closure
let serialized = serde_json::to_string(auth).map_err(std::io::Error::other)?;
self.save_to_keyring(&key, &serialized)?;
let base_key = compute_store_key(&self.codex_home)?;
self.save_v2_to_keyring(&base_key, auth)?;
if let Err(err) = delete_file_if_exists(&self.codex_home) {
warn!("failed to remove CLI auth fallback file: {err}");
}
@@ -210,13 +476,7 @@ impl AuthStorageBackend for KeyringAuthStorage {
}
fn delete(&self) -> std::io::Result<bool> {
let key = compute_store_key(&self.codex_home)?;
let keyring_removed = self
.keyring_store
.delete(KEYRING_SERVICE, &key)
.map_err(|err| {
std::io::Error::other(format!("failed to delete auth from keyring: {err}"))
})?;
let keyring_removed = self.delete_from_keyring_only()?;
let file_removed = delete_file_if_exists(&self.codex_home)?;
Ok(keyring_removed || file_removed)
}
@@ -341,9 +601,58 @@ mod tests {
use serde_json::json;
use tempfile::tempdir;
use codex_keyring_store::CredentialStoreError;
use codex_keyring_store::tests::MockKeyringStore;
use keyring::Error as KeyringError;
#[derive(Clone, Debug)]
struct SaveSecretErrorKeyringStore {
inner: MockKeyringStore,
}
impl KeyringStore for SaveSecretErrorKeyringStore {
fn load(
&self,
service: &str,
account: &str,
) -> Result<Option<String>, CredentialStoreError> {
self.inner.load(service, account)
}
fn load_secret(
&self,
service: &str,
account: &str,
) -> Result<Option<Vec<u8>>, CredentialStoreError> {
self.inner.load_secret(service, account)
}
fn save(
&self,
service: &str,
account: &str,
value: &str,
) -> Result<(), CredentialStoreError> {
self.inner.save(service, account, value)
}
fn save_secret(
&self,
_service: &str,
_account: &str,
_value: &[u8],
) -> Result<(), CredentialStoreError> {
Err(CredentialStoreError::new(KeyringError::Invalid(
"error".into(),
"save".into(),
)))
}
fn delete(&self, service: &str, account: &str) -> Result<bool, CredentialStoreError> {
self.inner.delete(service, account)
}
}
#[tokio::test]
async fn file_storage_load_returns_auth_dot_json() -> anyhow::Result<()> {
let codex_home = tempdir()?;
@@ -432,19 +741,19 @@ mod tests {
Ok(())
}
fn seed_keyring_and_fallback_auth_file_for_delete<F>(
mock_keyring: &MockKeyringStore,
fn seed_keyring_and_fallback_auth_file_for_delete(
storage: &KeyringAuthStorage,
codex_home: &Path,
compute_key: F,
) -> anyhow::Result<(String, PathBuf)>
where
F: FnOnce() -> std::io::Result<String>,
{
let key = compute_key()?;
mock_keyring.save(KEYRING_SERVICE, &key, "{}")?;
auth: &AuthDotJson,
) -> anyhow::Result<(String, String, PathBuf)> {
storage.save(auth)?;
let base_key = compute_store_key(codex_home)?;
let revision = storage
.load_active_revision(&base_key)?
.context("active auth revision should exist")?;
let auth_file = get_auth_file(codex_home);
std::fs::write(&auth_file, "stale")?;
Ok((key, auth_file))
Ok((base_key, revision, auth_file))
}
fn seed_keyring_with_auth<F>(
@@ -463,15 +772,58 @@ mod tests {
fn assert_keyring_saved_auth_and_removed_fallback(
mock_keyring: &MockKeyringStore,
key: &str,
base_key: &str,
codex_home: &Path,
expected: &AuthDotJson,
) {
let saved_value = mock_keyring
.saved_value(key)
.expect("keyring entry should exist");
let expected_serialized = serde_json::to_string(expected).expect("serialize expected auth");
assert_eq!(saved_value, expected_serialized);
let active_key = keyring_layout_key(base_key, KEYRING_ACTIVE_REVISION_ENTRY);
let revision = mock_keyring
.saved_secret_utf8(&active_key)
.expect("active auth revision should exist");
assert!(
mock_keyring.saved_value(base_key).is_none(),
"legacy keyring entry should not be used for split auth storage"
);
let manifest_key = keyring_revision_key(base_key, &revision, KEYRING_MANIFEST_ENTRY);
let manifest_bytes = mock_keyring
.saved_secret(&manifest_key)
.expect("auth manifest should exist");
let manifest: KeyringAuthManifest =
serde_json::from_slice(&manifest_bytes).expect("manifest should deserialize");
assert_eq!(manifest, KeyringAuthManifest::from(expected));
let openai_api_key_key =
keyring_revision_key(base_key, &revision, KEYRING_OPENAI_API_KEY_ENTRY);
assert_eq!(
mock_keyring.saved_secret_utf8(&openai_api_key_key),
expected.openai_api_key
);
if let Some(tokens) = expected.tokens.as_ref() {
let id_token_key = keyring_revision_key(base_key, &revision, KEYRING_ID_TOKEN_ENTRY);
assert_eq!(
mock_keyring.saved_secret_utf8(&id_token_key),
Some(tokens.id_token.raw_jwt.clone())
);
let access_token_key =
keyring_revision_key(base_key, &revision, KEYRING_ACCESS_TOKEN_ENTRY);
assert_eq!(
mock_keyring.saved_secret_utf8(&access_token_key),
Some(tokens.access_token.clone())
);
let refresh_token_key =
keyring_revision_key(base_key, &revision, KEYRING_REFRESH_TOKEN_ENTRY);
assert_eq!(
mock_keyring.saved_secret_utf8(&refresh_token_key),
Some(tokens.refresh_token.clone())
);
let account_id_key =
keyring_revision_key(base_key, &revision, KEYRING_ACCOUNT_ID_ENTRY);
assert_eq!(
mock_keyring.saved_secret_utf8(&account_id_key),
tokens.account_id.clone()
);
}
let auth_file = get_auth_file(codex_home);
assert!(
!auth_file.exists(),
@@ -520,7 +872,7 @@ mod tests {
}
#[test]
fn keyring_auth_storage_load_returns_deserialized_auth() -> anyhow::Result<()> {
fn keyring_auth_storage_load_supports_legacy_single_entry() -> anyhow::Result<()> {
let codex_home = tempdir()?;
let mock_keyring = MockKeyringStore::default();
let storage = KeyringAuthStorage::new(
@@ -544,6 +896,21 @@ mod tests {
Ok(())
}
#[test]
fn keyring_auth_storage_load_returns_deserialized_v2_auth() -> anyhow::Result<()> {
let codex_home = tempdir()?;
let mock_keyring = MockKeyringStore::default();
let storage =
KeyringAuthStorage::new(codex_home.path().to_path_buf(), Arc::new(mock_keyring));
let expected = auth_with_prefix("split");
storage.save(&expected)?;
let loaded = storage.load()?;
assert_eq!(Some(expected), loaded);
Ok(())
}
#[test]
fn keyring_auth_storage_compute_store_key_for_home_directory() -> anyhow::Result<()> {
let codex_home = PathBuf::from("~/.codex");
@@ -596,18 +963,29 @@ mod tests {
codex_home.path().to_path_buf(),
Arc::new(mock_keyring.clone()),
);
let (key, auth_file) = seed_keyring_and_fallback_auth_file_for_delete(
&mock_keyring,
codex_home.path(),
|| compute_store_key(codex_home.path()),
)?;
let auth = auth_with_prefix("delete");
let (base_key, revision, auth_file) =
seed_keyring_and_fallback_auth_file_for_delete(&storage, codex_home.path(), &auth)?;
let removed = storage.delete()?;
assert!(removed, "delete should report removal");
let active_key = keyring_layout_key(&base_key, KEYRING_ACTIVE_REVISION_ENTRY);
assert!(
!mock_keyring.contains(&key),
"keyring entry should be removed"
!mock_keyring.contains(&active_key),
"active revision should be removed"
);
for entry in KEYRING_RECORD_ENTRIES {
let key = keyring_revision_key(&base_key, &revision, entry);
assert!(
!mock_keyring.contains(&key),
"keyring entry should be removed"
);
}
let account_id_key = keyring_revision_key(&base_key, &revision, KEYRING_ACCOUNT_ID_ENTRY);
assert!(
!mock_keyring.contains(&account_id_key),
"account id entry should be removed"
);
assert!(
!auth_file.exists(),
@@ -662,7 +1040,11 @@ mod tests {
Arc::new(mock_keyring.clone()),
);
let key = compute_store_key(codex_home.path())?;
mock_keyring.set_error(&key, KeyringError::Invalid("error".into(), "load".into()));
let active_key = keyring_layout_key(&key, KEYRING_ACTIVE_REVISION_ENTRY);
mock_keyring.set_error(
&active_key,
KeyringError::Invalid("error".into(), "load".into()),
);
let expected = auth_with_prefix("fallback");
storage.file_storage.save(&expected)?;
@@ -701,12 +1083,13 @@ mod tests {
fn auto_auth_storage_save_falls_back_when_keyring_errors() -> anyhow::Result<()> {
let codex_home = tempdir()?;
let mock_keyring = MockKeyringStore::default();
let storage = AutoAuthStorage::new(
codex_home.path().to_path_buf(),
Arc::new(mock_keyring.clone()),
);
let failing_keyring = SaveSecretErrorKeyringStore {
inner: mock_keyring.clone(),
};
let storage =
AutoAuthStorage::new(codex_home.path().to_path_buf(), Arc::new(failing_keyring));
let key = compute_store_key(codex_home.path())?;
mock_keyring.set_error(&key, KeyringError::Invalid("error".into(), "save".into()));
let active_key = keyring_layout_key(&key, KEYRING_ACTIVE_REVISION_ENTRY);
let auth = auth_with_prefix("fallback");
storage.save(&auth)?;
@@ -722,8 +1105,8 @@ mod tests {
.context("fallback auth should exist")?;
assert_eq!(saved, auth);
assert!(
mock_keyring.saved_value(&key).is_none(),
"keyring should not contain value when save fails"
mock_keyring.saved_secret_utf8(&active_key).is_none(),
"keyring should not point to a saved auth revision when save fails"
);
Ok(())
}
@@ -736,18 +1119,37 @@ mod tests {
codex_home.path().to_path_buf(),
Arc::new(mock_keyring.clone()),
);
let (key, auth_file) = seed_keyring_and_fallback_auth_file_for_delete(
&mock_keyring,
let auth = auth_with_prefix("auto-delete");
let (base_key, revision, auth_file) = seed_keyring_and_fallback_auth_file_for_delete(
storage.keyring_storage.as_ref(),
codex_home.path(),
|| compute_store_key(codex_home.path()),
&auth,
)?;
let removed = storage.delete()?;
assert!(removed, "delete should report removal");
assert!(
!mock_keyring.contains(&key),
"keyring entry should be removed"
!mock_keyring.contains(&keyring_layout_key(
&base_key,
KEYRING_ACTIVE_REVISION_ENTRY
)),
"active revision should be removed"
);
for entry in KEYRING_RECORD_ENTRIES {
let key = keyring_revision_key(&base_key, &revision, entry);
assert!(
!mock_keyring.contains(&key),
"keyring entry should be removed"
);
}
assert!(
!mock_keyring.contains(&keyring_revision_key(
&base_key,
&revision,
KEYRING_ACCOUNT_ID_ENTRY
)),
"account id entry should be removed"
);
assert!(
!auth_file.exists(),

View File

@@ -166,19 +166,6 @@ pub enum SteerInputError {
ExpectedTurnMismatch { expected: String, actual: String },
EmptyInput,
}
/// Notes from the previous real user turn.
///
/// Conceptually this is the same role that `previous_model` used to fill, but
/// it can carry other prior-turn settings that matter when constructing
/// sensible state-change diffs or full-context reinjection, such as model
/// switches or detecting a prior `realtime_active -> false` transition.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PreviousTurnSettings {
pub(crate) model: String,
pub(crate) realtime_active: Option<bool>,
}
use crate::exec_policy::ExecPolicyUpdateError;
use crate::feedback_tags;
use crate::file_watcher::FileWatcher;
@@ -628,7 +615,6 @@ impl TurnSkillsContext {
#[derive(Debug)]
pub(crate) struct TurnContext {
pub(crate) sub_id: String,
pub(crate) realtime_active: bool,
pub(crate) config: Arc<Config>,
pub(crate) auth_manager: Option<Arc<AuthManager>>,
pub(crate) model_info: ModelInfo,
@@ -716,7 +702,6 @@ impl TurnContext {
Self {
sub_id: self.sub_id.clone(),
realtime_active: self.realtime_active,
config: Arc::new(config),
auth_manager: self.auth_manager.clone(),
model_info: model_info.clone(),
@@ -780,7 +765,6 @@ impl TurnContext {
model: self.model_info.slug.clone(),
personality: self.personality,
collaboration_mode: Some(self.collaboration_mode.clone()),
realtime_active: Some(self.realtime_active),
effort: self.reasoning_effort,
summary: self.reasoning_summary,
user_instructions: self.user_instructions.clone(),
@@ -1092,7 +1076,6 @@ impl Session {
let (current_date, timezone) = local_time_context();
TurnContext {
sub_id,
realtime_active: false,
config: per_turn_config.clone(),
auth_manager: auth_manager_for_context,
model_info: model_info.clone(),
@@ -1774,13 +1757,13 @@ impl Session {
// TODO(ccunningham): Defer initial context insertion until the first real turn
// starts so it reflects the actual first-turn settings (permissions, etc.) and
// we do not emit model-visible "diff" updates before the first user message.
let items = self.build_initial_context(&turn_context).await;
let items = self.build_initial_context(&turn_context, None).await;
self.record_conversation_items(&turn_context, &items).await;
{
let mut state = self.state.lock().await;
state.set_reference_context_item(Some(turn_context.to_turn_context_item()));
}
self.set_previous_turn_settings(None).await;
self.set_previous_model(None).await;
// Ensure initial items are visible to immediate readers (e.g., tests, forks).
if !is_subagent {
self.flush_rollout().await;
@@ -1790,25 +1773,19 @@ impl Session {
let rollout_items = resumed_history.history;
let restored_tool_selection =
Self::extract_mcp_tool_selection_from_rollout(&rollout_items);
let reconstructed_rollout = self
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
let previous_turn_settings = reconstructed_rollout.previous_turn_settings.clone();
self.set_previous_turn_settings(previous_turn_settings.clone())
.await;
let previous_model = reconstructed_rollout.previous_model.clone();
let curr = turn_context.model_info.slug.as_str();
{
let mut state = self.state.lock().await;
state.set_reference_context_item(reconstructed_rollout.reference_context_item);
}
self.set_previous_model(previous_model.clone()).await;
// If resuming, warn when the last recorded model differs from the current one.
let curr: &str = turn_context.model_info.slug.as_str();
if let Some(prev) = previous_turn_settings
.as_ref()
.map(|settings| settings.model.as_str())
.filter(|model| *model != curr)
{
if let Some(prev) = previous_model.as_deref().filter(|p| *p != curr) {
warn!("resuming session with different model: previous={prev}, current={curr}");
self.send_event(
&turn_context,
@@ -1848,20 +1825,11 @@ impl Session {
InitialHistory::Forked(rollout_items) => {
let restored_tool_selection =
Self::extract_mcp_tool_selection_from_rollout(&rollout_items);
let reconstructed_rollout = self
.reconstruct_history_from_rollout(&turn_context, &rollout_items)
.await;
self.set_previous_turn_settings(
reconstructed_rollout.previous_turn_settings.clone(),
)
.await;
{
let mut state = self.state.lock().await;
state.set_reference_context_item(
reconstructed_rollout.reference_context_item.clone(),
);
}
let previous_model = reconstructed_rollout.previous_model.clone();
self.set_previous_model(previous_model).await;
// Always add response items to conversation history
let reconstructed_history = reconstructed_rollout.history;
@@ -1886,7 +1854,7 @@ impl Session {
}
// Append the current session's initial context after the reconstructed history.
let initial_context = self.build_initial_context(&turn_context).await;
let initial_context = self.build_initial_context(&turn_context, None).await;
self.record_conversation_items(&turn_context, &initial_context)
.await;
{
@@ -1960,17 +1928,14 @@ impl Session {
active_selected_tools
}
async fn previous_turn_settings(&self) -> Option<PreviousTurnSettings> {
async fn previous_model(&self) -> Option<String> {
let state = self.state.lock().await;
state.previous_turn_settings()
state.previous_model()
}
pub(crate) async fn set_previous_turn_settings(
&self,
previous_turn_settings: Option<PreviousTurnSettings>,
) {
pub(crate) async fn set_previous_model(&self, previous_model: Option<String>) {
let mut state = self.state.lock().await;
state.set_previous_turn_settings(previous_turn_settings);
state.set_previous_model(previous_model);
}
fn maybe_refresh_shell_snapshot_for_cwd(
@@ -2133,7 +2098,6 @@ impl Session {
Arc::clone(&self.js_repl),
skills_outcome,
);
turn_context.realtime_active = self.conversation.running_state().await.is_some();
if let Some(final_schema) = final_output_json_schema {
turn_context.final_output_json_schema = final_schema;
@@ -2295,24 +2259,21 @@ impl Session {
.await
}
async fn build_settings_update_items(
fn build_settings_update_items(
&self,
reference_context_item: Option<&TurnContextItem>,
previous_user_turn_model: Option<&str>,
current_context: &TurnContext,
) -> Vec<ResponseItem> {
// TODO: Make context updates a pure diff of persisted previous/current TurnContextItem
// state so replay/backtracking is deterministic. Runtime inputs that affect model-visible
// context (shell, exec policy, feature gates, previous-turn bridge) should be persisted
// context (shell, exec policy, feature gates, previous-model bridge) should be persisted
// state or explicit non-state replay events.
let previous_turn_settings = {
let state = self.state.lock().await;
state.previous_turn_settings()
};
let shell = self.user_shell();
let exec_policy = self.services.exec_policy.current();
crate::context_manager::updates::build_settings_update_items(
reference_context_item,
previous_turn_settings.as_ref(),
previous_user_turn_model,
current_context,
shell.as_ref(),
exec_policy.as_ref(),
@@ -2951,22 +2912,14 @@ impl Session {
pub(crate) async fn build_initial_context(
&self,
turn_context: &TurnContext,
previous_user_turn_model: Option<&str>,
) -> Vec<ResponseItem> {
let mut developer_sections = Vec::<String>::with_capacity(8);
let mut contextual_user_sections = Vec::<String>::with_capacity(2);
let shell = self.user_shell();
let (reference_context_item, previous_turn_settings, collaboration_mode, base_instructions) = {
let state = self.state.lock().await;
(
state.reference_context_item(),
state.previous_turn_settings(),
state.session_configuration.collaboration_mode.clone(),
state.session_configuration.base_instructions.clone(),
)
};
if let Some(model_switch_message) =
crate::context_manager::updates::build_model_instructions_update_item(
previous_turn_settings.as_ref(),
previous_user_turn_model,
turn_context,
)
{
@@ -2994,18 +2947,18 @@ impl Session {
developer_sections.push(memory_prompt);
}
// Add developer instructions from collaboration_mode if they exist and are non-empty
let (collaboration_mode, base_instructions) = {
let state = self.state.lock().await;
(
state.session_configuration.collaboration_mode.clone(),
state.session_configuration.base_instructions.clone(),
)
};
if let Some(collab_instructions) =
DeveloperInstructions::from_collaboration_mode(&collaboration_mode)
{
developer_sections.push(collab_instructions.into_text());
}
if let Some(realtime_update) = crate::context_manager::updates::build_initial_realtime_item(
reference_context_item.as_ref(),
previous_turn_settings.as_ref(),
turn_context,
) {
developer_sections.push(realtime_update.into_text());
}
if self.features.enabled(Feature::Personality)
&& let Some(personality) = turn_context.personality
{
@@ -3107,6 +3060,7 @@ impl Session {
pub(crate) async fn record_context_updates_and_set_reference_context_item(
&self,
turn_context: &TurnContext,
previous_user_turn_model: Option<&str>,
) {
let reference_context_item = {
let state = self.state.lock().await;
@@ -3114,11 +3068,15 @@ impl Session {
};
let should_inject_full_context = reference_context_item.is_none();
let context_items = if should_inject_full_context {
self.build_initial_context(turn_context).await
self.build_initial_context(turn_context, previous_user_turn_model)
.await
} else {
// Steady-state path: append only context diffs to minimize token overhead.
self.build_settings_update_items(reference_context_item.as_ref(), turn_context)
.await
self.build_settings_update_items(
reference_context_item.as_ref(),
previous_user_turn_model,
turn_context,
)
};
let turn_context_item = turn_context.to_turn_context_item();
if !context_items.is_empty() {
@@ -4384,8 +4342,8 @@ mod handlers {
// TODO(ccunningham): Fix rollback/backtracking baseline handling.
// We clear `reference_context_item` here, but should restore the
// post-rollback baseline from the surviving history/rollout instead.
// Truncating history should also invalidate/recompute `previous_turn_settings`
// so the next regular turn replays any dropped model/realtime
// Truncating history should also invalidate/recompute `previous_model`
// so the next regular turn replays any dropped model-switch
// instructions.
history.drop_last_n_user_turns(num_turns);
@@ -4629,7 +4587,6 @@ async fn spawn_review_thread(
let review_turn_context = TurnContext {
sub_id: review_turn_id,
realtime_active: parent_turn_context.realtime_active,
config: per_turn_config,
auth_manager: auth_manager_for_context,
model_info: model_info.clone(),
@@ -4789,8 +4746,12 @@ pub(crate) async fn run_turn(
let skills_outcome = Some(turn_context.turn_skills.outcome.as_ref());
sess.record_context_updates_and_set_reference_context_item(turn_context.as_ref())
.await;
let previous_model = sess.previous_model().await;
sess.record_context_updates_and_set_reference_context_item(
turn_context.as_ref(),
previous_model.as_deref(),
)
.await;
let available_connectors = if turn_context.config.features.enabled(Feature::Apps) {
let mcp_tools = match sess
@@ -4896,14 +4857,11 @@ pub(crate) async fn run_turn(
let response_item: ResponseItem = initial_input_for_turn.clone().into();
sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item)
.await;
// Track the previous-turn baseline from the regular user-turn path only so
// Track the previous-model baseline from the regular user-turn path only so
// standalone tasks (compact/shell/review/undo) cannot suppress future
// model/realtime injections.
sess.set_previous_turn_settings(Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(turn_context.realtime_active),
}))
.await;
// `<model_switch>` injections.
sess.set_previous_model(Some(turn_context.model_info.slug.clone()))
.await;
if !skill_items.is_empty() {
sess.record_conversation_items(&turn_context, &skill_items)
@@ -5011,6 +4969,7 @@ pub(crate) async fn run_turn(
&sess,
&turn_context,
InitialContextInjection::BeforeLastUserMessage,
previous_model.as_deref(),
)
.await
.is_err()
@@ -5135,7 +5094,13 @@ async fn run_pre_sampling_compact(
.unwrap_or(i64::MAX);
// Compact if the total usage tokens are greater than the auto compact limit
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(sess, turn_context, InitialContextInjection::DoNotInject).await?;
run_auto_compact(
sess,
turn_context,
InitialContextInjection::DoNotInject,
None,
)
.await?;
}
Ok(())
}
@@ -5151,12 +5116,12 @@ async fn maybe_run_previous_model_inline_compact(
turn_context: &Arc<TurnContext>,
total_usage_tokens: i64,
) -> CodexResult<bool> {
let Some(previous_turn_settings) = sess.previous_turn_settings().await else {
let Some(previous_model) = sess.previous_model().await else {
return Ok(false);
};
let previous_model_turn_context = Arc::new(
turn_context
.with_model(previous_turn_settings.model, &sess.services.models_manager)
.with_model(previous_model, &sess.services.models_manager)
.await,
);
@@ -5178,6 +5143,7 @@ async fn maybe_run_previous_model_inline_compact(
sess,
&previous_model_turn_context,
InitialContextInjection::DoNotInject,
None,
)
.await?;
return Ok(true);
@@ -5189,12 +5155,14 @@ async fn run_auto_compact(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
previous_user_turn_model: Option<&str>,
) -> CodexResult<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(
Arc::clone(sess),
Arc::clone(turn_context),
initial_context_injection,
previous_user_turn_model,
)
.await?;
} else {
@@ -5202,6 +5170,7 @@ async fn run_auto_compact(
Arc::clone(sess),
Arc::clone(turn_context),
initial_context_injection,
previous_user_turn_model,
)
.await?;
}
@@ -6572,23 +6541,6 @@ mod tests {
}
}
fn developer_input_texts(items: &[ResponseItem]) -> Vec<&str> {
items
.iter()
.filter_map(|item| match item {
ResponseItem::Message { role, content, .. } if role == "developer" => {
Some(content.as_slice())
}
_ => None,
})
.flat_map(|content| content.iter())
.filter_map(|item| match item {
ContentItem::InputText { text } => Some(text.as_str()),
_ => None,
})
.collect()
}
fn make_connector(id: &str, name: &str) -> AppInfo {
AppInfo {
id: id.to_string(),
@@ -7240,14 +7192,14 @@ mod tests {
assert_eq!(expected, history_before_seed.raw_items());
session
.record_context_updates_and_set_reference_context_item(&turn_context)
.record_context_updates_and_set_reference_context_item(&turn_context, None)
.await;
expected.extend(session.build_initial_context(&turn_context).await);
expected.extend(session.build_initial_context(&turn_context, None).await);
let history_after_seed = session.clone_history().await;
assert_eq!(expected, history_after_seed.raw_items());
session
.record_context_updates_and_set_reference_context_item(&turn_context)
.record_context_updates_and_set_reference_context_item(&turn_context, None)
.await;
let history_after_second_seed = session.clone_history().await;
assert_eq!(
@@ -7407,7 +7359,7 @@ mod tests {
let reconstruction_turn = session.new_default_turn().await;
expected.extend(
session
.build_initial_context(reconstruction_turn.as_ref())
.build_initial_context(reconstruction_turn.as_ref(), None)
.await,
);
let history = session.state.lock().await.clone_history();
@@ -7415,7 +7367,7 @@ mod tests {
}
#[tokio::test]
async fn record_initial_history_forked_hydrates_previous_turn_settings() {
async fn record_initial_history_forked_hydrates_previous_model() {
let (session, turn_context) = make_session_and_context().await;
let previous_model = "forked-rollout-model";
let previous_context_item = TurnContextItem {
@@ -7429,7 +7381,6 @@ mod tests {
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
@@ -7471,11 +7422,8 @@ mod tests {
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: previous_model.to_string(),
realtime_active: Some(turn_context.realtime_active),
})
session.previous_model().await,
Some(previous_model.to_string())
);
}
@@ -7483,7 +7431,7 @@ mod tests {
async fn thread_rollback_drops_last_turn_from_history() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
let initial_context = sess.build_initial_context(tc.as_ref(), None).await;
sess.record_into_history(&initial_context, tc.as_ref())
.await;
@@ -7530,11 +7478,8 @@ mod tests {
},
];
sess.record_into_history(&turn_2, tc.as_ref()).await;
sess.set_previous_turn_settings(Some(PreviousTurnSettings {
model: "previous-regular-model".to_string(),
realtime_active: Some(tc.realtime_active),
}))
.await;
sess.set_previous_model(Some("previous-regular-model".to_string()))
.await;
handlers::thread_rollback(&sess, "sub-1".to_string(), 1).await;
@@ -7548,11 +7493,8 @@ mod tests {
let history = sess.clone_history().await;
assert_eq!(expected, history.raw_items());
assert_eq!(
sess.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: "previous-regular-model".to_string(),
realtime_active: Some(tc.realtime_active),
})
sess.previous_model().await,
Some("previous-regular-model".to_string())
);
}
@@ -7560,7 +7502,7 @@ mod tests {
async fn thread_rollback_clears_history_when_num_turns_exceeds_existing_turns() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
let initial_context = sess.build_initial_context(tc.as_ref(), None).await;
sess.record_into_history(&initial_context, tc.as_ref())
.await;
@@ -7588,7 +7530,7 @@ mod tests {
async fn thread_rollback_fails_when_turn_in_progress() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
let initial_context = sess.build_initial_context(tc.as_ref(), None).await;
sess.record_into_history(&initial_context, tc.as_ref())
.await;
@@ -7609,7 +7551,7 @@ mod tests {
async fn thread_rollback_fails_when_num_turns_is_zero() {
let (sess, tc, rx) = make_session_and_context_with_rx().await;
let initial_context = sess.build_initial_context(tc.as_ref()).await;
let initial_context = sess.build_initial_context(tc.as_ref(), None).await;
sess.record_into_history(&initial_context, tc.as_ref())
.await;
@@ -8572,9 +8514,9 @@ mod tests {
}
#[tokio::test]
async fn spawn_task_does_not_update_previous_turn_settings_for_non_run_turn_tasks() {
async fn spawn_task_does_not_update_previous_model_for_non_run_turn_tasks() {
let (sess, tc, _rx) = make_session_and_context_with_rx().await;
sess.set_previous_turn_settings(None).await;
sess.set_previous_model(None).await;
let input = vec![UserInput::Text {
text: "hello".to_string(),
text_elements: Vec::new(),
@@ -8591,7 +8533,7 @@ mod tests {
.await;
sess.abort_all_tasks(TurnAbortReason::Interrupted).await;
assert_eq!(sess.previous_turn_settings().await, None);
assert_eq!(sess.previous_model().await, None);
}
#[tokio::test]
@@ -8630,9 +8572,11 @@ mod tests {
current_context.config = Arc::new(config);
let reference_context_item = previous_context.to_turn_context_item();
let update_items = session
.build_settings_update_items(Some(&reference_context_item), &current_context)
.await;
let update_items = session.build_settings_update_items(
Some(&reference_context_item),
None,
&current_context,
);
let environment_update = update_items
.iter()
@@ -8665,9 +8609,11 @@ mod tests {
current_context.timezone = Some("Europe/Berlin".to_string());
let reference_context_item = previous_context.to_turn_context_item();
let update_items = session
.build_settings_update_items(Some(&reference_context_item), &current_context)
.await;
let update_items = session.build_settings_update_items(
Some(&reference_context_item),
None,
&current_context,
);
let environment_update = update_items
.iter()
@@ -8685,176 +8631,15 @@ mod tests {
assert!(environment_update.contains("<timezone>Europe/Berlin</timezone>"));
}
#[tokio::test]
async fn build_settings_update_items_emits_realtime_start_when_session_becomes_live() {
let (session, previous_context) = make_session_and_context().await;
let previous_context = Arc::new(previous_context);
let mut current_context = previous_context
.with_model(
previous_context.model_info.slug.clone(),
&session.services.models_manager,
)
.await;
current_context.realtime_active = true;
let update_items = session
.build_settings_update_items(
Some(&previous_context.to_turn_context_item()),
&current_context,
)
.await;
let developer_texts = developer_input_texts(&update_items);
assert!(
developer_texts
.iter()
.any(|text| text.contains("<realtime_conversation>")),
"expected a realtime start update, got {developer_texts:?}"
);
}
#[tokio::test]
async fn build_settings_update_items_emits_realtime_end_when_session_stops_being_live() {
let (session, mut previous_context) = make_session_and_context().await;
previous_context.realtime_active = true;
let mut current_context = previous_context
.with_model(
previous_context.model_info.slug.clone(),
&session.services.models_manager,
)
.await;
current_context.realtime_active = false;
let update_items = session
.build_settings_update_items(
Some(&previous_context.to_turn_context_item()),
&current_context,
)
.await;
let developer_texts = developer_input_texts(&update_items);
assert!(
developer_texts
.iter()
.any(|text| text.contains("Reason: inactive")),
"expected a realtime end update, got {developer_texts:?}"
);
}
#[tokio::test]
async fn build_settings_update_items_uses_previous_turn_settings_for_realtime_end() {
let (session, previous_context) = make_session_and_context().await;
let mut previous_context_item = previous_context.to_turn_context_item();
previous_context_item.realtime_active = None;
let previous_turn_settings = PreviousTurnSettings {
model: previous_context.model_info.slug.clone(),
realtime_active: Some(true),
};
let mut current_context = previous_context
.with_model(
previous_context.model_info.slug.clone(),
&session.services.models_manager,
)
.await;
current_context.realtime_active = false;
session
.set_previous_turn_settings(Some(previous_turn_settings))
.await;
let update_items = session
.build_settings_update_items(Some(&previous_context_item), &current_context)
.await;
let developer_texts = developer_input_texts(&update_items);
assert!(
developer_texts
.iter()
.any(|text| text.contains("Reason: inactive")),
"expected a realtime end update from previous turn settings, got {developer_texts:?}"
);
}
#[tokio::test]
async fn build_initial_context_uses_previous_realtime_state() {
let (session, mut turn_context) = make_session_and_context().await;
turn_context.realtime_active = true;
let initial_context = session.build_initial_context(&turn_context).await;
let developer_texts = developer_input_texts(&initial_context);
assert!(
developer_texts
.iter()
.any(|text| text.contains("<realtime_conversation>")),
"expected initial context to describe active realtime state, got {developer_texts:?}"
);
let previous_context_item = turn_context.to_turn_context_item();
{
let mut state = session.state.lock().await;
state.set_reference_context_item(Some(previous_context_item));
}
let resumed_context = session.build_initial_context(&turn_context).await;
let resumed_developer_texts = developer_input_texts(&resumed_context);
assert!(
!resumed_developer_texts
.iter()
.any(|text| text.contains("<realtime_conversation>")),
"did not expect a duplicate realtime update, got {resumed_developer_texts:?}"
);
}
#[tokio::test]
async fn build_initial_context_uses_previous_turn_settings_for_realtime_end() {
let (session, turn_context) = make_session_and_context().await;
let previous_turn_settings = PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(true),
};
session
.set_previous_turn_settings(Some(previous_turn_settings))
.await;
let initial_context = session.build_initial_context(&turn_context).await;
let developer_texts = developer_input_texts(&initial_context);
assert!(
developer_texts
.iter()
.any(|text| text.contains("Reason: inactive")),
"expected initial context to describe an ended realtime session, got {developer_texts:?}"
);
}
#[tokio::test]
async fn build_initial_context_restates_realtime_start_when_reference_context_is_missing() {
let (session, mut turn_context) = make_session_and_context().await;
turn_context.realtime_active = true;
let previous_turn_settings = PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(true),
};
session
.set_previous_turn_settings(Some(previous_turn_settings))
.await;
let initial_context = session.build_initial_context(&turn_context).await;
let developer_texts = developer_input_texts(&initial_context);
assert!(
developer_texts
.iter()
.any(|text| text.contains("<realtime_conversation>")),
"expected initial context to restate active realtime when the reference context is missing, got {developer_texts:?}"
);
}
#[tokio::test]
async fn record_context_updates_and_set_reference_context_item_injects_full_context_when_baseline_missing()
{
let (session, turn_context) = make_session_and_context().await;
session
.record_context_updates_and_set_reference_context_item(&turn_context)
.record_context_updates_and_set_reference_context_item(&turn_context, None)
.await;
let history = session.clone_history().await;
let initial_context = session.build_initial_context(&turn_context).await;
let initial_context = session.build_initial_context(&turn_context, None).await;
assert_eq!(history.raw_items().to_vec(), initial_context);
let current_context = session.reference_context_item().await;
@@ -8882,7 +8667,7 @@ mod tests {
.record_into_history(std::slice::from_ref(&compacted_summary), &turn_context)
.await;
session
.record_context_updates_and_set_reference_context_item(&turn_context)
.record_context_updates_and_set_reference_context_item(&turn_context, None)
.await;
{
let mut state = session.state.lock().await;
@@ -8893,12 +8678,12 @@ mod tests {
.await;
session
.record_context_updates_and_set_reference_context_item(&turn_context)
.record_context_updates_and_set_reference_context_item(&turn_context, None)
.await;
let history = session.clone_history().await;
let mut expected_history = vec![compacted_summary];
expected_history.extend(session.build_initial_context(&turn_context).await);
expected_history.extend(session.build_initial_context(&turn_context, None).await);
assert_eq!(history.raw_items().to_vec(), expected_history);
}
@@ -8941,13 +8726,12 @@ mod tests {
*rollout = Some(recorder);
}
let update_items = session
.build_settings_update_items(Some(&previous_context_item), &turn_context)
.await;
let update_items =
session.build_settings_update_items(Some(&previous_context_item), None, &turn_context);
assert_eq!(update_items, Vec::new());
session
.record_context_updates_and_set_reference_context_item(&turn_context)
.record_context_updates_and_set_reference_context_item(&turn_context, None)
.await;
assert_eq!(
@@ -8984,15 +8768,10 @@ mod tests {
#[tokio::test]
async fn build_initial_context_prepends_model_switch_message() {
let (session, turn_context) = make_session_and_context().await;
let previous_turn_settings = PreviousTurnSettings {
model: "previous-regular-model".to_string(),
realtime_active: None,
};
session
.set_previous_turn_settings(Some(previous_turn_settings))
let initial_context = session
.build_initial_context(&turn_context, Some("previous-regular-model"))
.await;
let initial_context = session.build_initial_context(&turn_context).await;
let ResponseItem::Message { role, content, .. } = &initial_context[0] else {
panic!("expected developer message");
@@ -9054,13 +8833,10 @@ mod tests {
}
session
.set_previous_turn_settings(Some(PreviousTurnSettings {
model: previous_context.model_info.slug.clone(),
realtime_active: Some(previous_context.realtime_active),
}))
.await;
session
.record_context_updates_and_set_reference_context_item(&turn_context)
.record_context_updates_and_set_reference_context_item(
&turn_context,
Some(previous_context.model_info.slug.as_str()),
)
.await;
session.ensure_rollout_materialized().await;
session.flush_rollout().await;
@@ -9479,7 +9255,7 @@ mod tests {
// personality_spec) matches reconstruction.
let reconstruction_turn = session.new_default_turn().await;
let mut initial_context = session
.build_initial_context(reconstruction_turn.as_ref())
.build_initial_context(reconstruction_turn.as_ref(), None)
.await;
// Ensure personality_spec is present when Personality is enabled, so expected matches
// what reconstruction produces (build_initial_context may omit it when baked into model).

View File

@@ -5,7 +5,7 @@ use super::*;
#[derive(Debug)]
pub(super) struct RolloutReconstruction {
pub(super) history: Vec<ResponseItem>,
pub(super) previous_turn_settings: Option<PreviousTurnSettings>,
pub(super) previous_model: Option<String>,
pub(super) reference_context_item: Option<TurnContextItem>,
}
@@ -29,7 +29,7 @@ enum TurnReferenceContextItem {
struct ActiveReplaySegment<'a> {
turn_id: Option<String>,
counts_as_user_turn: bool,
previous_turn_settings: Option<PreviousTurnSettings>,
previous_model: Option<String>,
reference_context_item: TurnReferenceContextItem,
base_replacement_history: Option<&'a [ResponseItem]>,
}
@@ -42,7 +42,7 @@ fn turn_ids_are_compatible(active_turn_id: Option<&str>, item_turn_id: Option<&s
fn finalize_active_segment<'a>(
active_segment: ActiveReplaySegment<'a>,
base_replacement_history: &mut Option<&'a [ResponseItem]>,
previous_turn_settings: &mut Option<PreviousTurnSettings>,
previous_model: &mut Option<String>,
reference_context_item: &mut TurnReferenceContextItem,
pending_rollback_turns: &mut usize,
) {
@@ -64,9 +64,9 @@ fn finalize_active_segment<'a>(
*base_replacement_history = Some(segment_base_replacement_history);
}
// `previous_turn_settings` come from the newest surviving user turn that established them.
if previous_turn_settings.is_none() && active_segment.counts_as_user_turn {
*previous_turn_settings = active_segment.previous_turn_settings;
// `previous_model` comes from the newest surviving user turn that established one.
if previous_model.is_none() && active_segment.counts_as_user_turn {
*previous_model = active_segment.previous_model;
}
// `reference_context_item` comes from the newest surviving user turn baseline, or
@@ -94,7 +94,7 @@ impl Session {
// are both known; then replay only the buffered surviving tail forward to preserve exact
// history semantics.
let mut base_replacement_history: Option<&[ResponseItem]> = None;
let mut previous_turn_settings = None;
let mut previous_model = None;
let mut reference_context_item = TurnReferenceContextItem::NeverSet;
// Rollback is "drop the newest N user turns". While scanning in reverse, that becomes
// "skip the next N user-turn segments we finalize".
@@ -170,10 +170,7 @@ impl Session {
active_segment.turn_id.as_deref(),
ctx.turn_id.as_deref(),
) {
active_segment.previous_turn_settings = Some(PreviousTurnSettings {
model: ctx.model.clone(),
realtime_active: ctx.realtime_active,
});
active_segment.previous_model = Some(ctx.model.clone());
if matches!(
active_segment.reference_context_item,
TurnReferenceContextItem::NeverSet
@@ -195,7 +192,7 @@ impl Session {
finalize_active_segment(
active_segment,
&mut base_replacement_history,
&mut previous_turn_settings,
&mut previous_model,
&mut reference_context_item,
&mut pending_rollback_turns,
);
@@ -207,7 +204,7 @@ impl Session {
}
if base_replacement_history.is_some()
&& previous_turn_settings.is_some()
&& previous_model.is_some()
&& !matches!(reference_context_item, TurnReferenceContextItem::NeverSet)
{
// At this point we have both eager resume metadata values and the replacement-
@@ -221,7 +218,7 @@ impl Session {
finalize_active_segment(
active_segment,
&mut base_replacement_history,
&mut previous_turn_settings,
&mut previous_model,
&mut reference_context_item,
&mut pending_rollback_turns,
);
@@ -290,7 +287,7 @@ impl Session {
RolloutReconstruction {
history: history.raw_items().to_vec(),
previous_turn_settings,
previous_model,
reference_context_item,
}
}

View File

@@ -34,8 +34,7 @@ fn assistant_message(text: &str) -> ResponseItem {
}
#[tokio::test]
async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previous_turn_settings()
{
async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previous_model() {
let (session, turn_context) = make_session_and_context().await;
let previous_model = "previous-rollout-model";
let previous_context_item = TurnContextItem {
@@ -49,7 +48,6 @@ async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previ
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
@@ -67,12 +65,12 @@ async fn record_initial_history_resumed_bare_turn_context_does_not_hydrate_previ
}))
.await;
assert_eq!(session.previous_turn_settings().await, None);
assert_eq!(session.previous_model().await, None);
assert!(session.reference_context_item().await.is_none());
}
#[tokio::test]
async fn record_initial_history_resumed_hydrates_previous_turn_settings_from_lifecycle_turn_with_missing_turn_context_id()
async fn record_initial_history_resumed_hydrates_previous_model_from_lifecycle_turn_with_missing_turn_context_id()
{
let (session, turn_context) = make_session_and_context().await;
let previous_model = "previous-rollout-model";
@@ -87,7 +85,6 @@ async fn record_initial_history_resumed_hydrates_previous_turn_settings_from_lif
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
@@ -135,11 +132,8 @@ async fn record_initial_history_resumed_hydrates_previous_turn_settings_from_lif
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: previous_model.to_string(),
realtime_active: Some(turn_context.realtime_active),
})
session.previous_model().await,
Some(previous_model.to_string())
);
}
@@ -226,11 +220,8 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_com
vec![turn_one_user, turn_one_assistant]
);
assert_eq!(
reconstructed.previous_turn_settings,
Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(turn_context.realtime_active),
})
reconstructed.previous_model,
Some(turn_context.model_info.slug.clone())
);
assert_eq!(
serde_json::to_value(reconstructed.reference_context_item)
@@ -308,11 +299,8 @@ async fn reconstruct_history_rollback_keeps_history_and_metadata_in_sync_for_inc
vec![turn_one_user, turn_one_assistant]
);
assert_eq!(
reconstructed.previous_turn_settings,
Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(turn_context.realtime_active),
})
reconstructed.previous_model,
Some(turn_context.model_info.slug.clone())
);
assert_eq!(
serde_json::to_value(reconstructed.reference_context_item)
@@ -414,11 +402,8 @@ async fn reconstruct_history_rollback_skips_non_user_turns_for_history_and_metad
vec![turn_one_user, turn_one_assistant]
);
assert_eq!(
reconstructed.previous_turn_settings,
Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(turn_context.realtime_active),
})
reconstructed.previous_model,
Some(turn_context.model_info.slug.clone())
);
assert_eq!(
serde_json::to_value(reconstructed.reference_context_item)
@@ -471,7 +456,7 @@ async fn reconstruct_history_rollback_clears_history_and_metadata_when_exceeding
.await;
assert_eq!(reconstructed.history, Vec::new());
assert_eq!(reconstructed.previous_turn_settings, None);
assert_eq!(reconstructed.previous_model, None);
assert!(reconstructed.reference_context_item.is_none());
}
@@ -534,7 +519,7 @@ async fn record_initial_history_resumed_rollback_skips_only_user_turns() {
}))
.await;
assert_eq!(session.previous_turn_settings().await, None);
assert_eq!(session.previous_model().await, None);
assert!(session.reference_context_item().await.is_none());
}
@@ -604,11 +589,8 @@ async fn record_initial_history_resumed_rollback_drops_incomplete_user_turn_comp
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(turn_context.realtime_active),
})
session.previous_model().await,
Some(turn_context.model_info.slug.clone())
);
assert_eq!(
serde_json::to_value(session.reference_context_item().await)
@@ -655,7 +637,7 @@ async fn record_initial_history_resumed_does_not_seed_reference_context_item_aft
}))
.await;
assert_eq!(session.previous_turn_settings().await, None);
assert_eq!(session.previous_model().await, None);
assert!(session.reference_context_item().await.is_none());
}
@@ -748,7 +730,6 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
@@ -799,11 +780,8 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: previous_model.to_string(),
realtime_active: Some(turn_context.realtime_active),
})
session.previous_model().await,
Some(previous_model.to_string())
);
assert_eq!(
serde_json::to_value(session.reference_context_item().await)
@@ -819,7 +797,6 @@ async fn record_initial_history_resumed_turn_context_after_compaction_reestablis
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
@@ -847,7 +824,6 @@ async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_tu
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
@@ -920,11 +896,8 @@ async fn record_initial_history_resumed_aborted_turn_without_id_clears_active_tu
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: previous_model.to_string(),
realtime_active: Some(turn_context.realtime_active),
})
session.previous_model().await,
Some(previous_model.to_string())
);
assert!(session.reference_context_item().await.is_none());
}
@@ -952,7 +925,6 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo
model: current_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
@@ -1023,11 +995,8 @@ async fn record_initial_history_resumed_unmatched_abort_preserves_active_turn_fo
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: current_model.to_string(),
realtime_active: Some(turn_context.realtime_active),
})
session.previous_model().await,
Some(current_model.to_string())
);
assert_eq!(
serde_json::to_value(session.reference_context_item().await)
@@ -1053,7 +1022,6 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_compaction_clea
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
@@ -1120,11 +1088,8 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_compaction_clea
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: previous_model.to_string(),
realtime_active: Some(turn_context.realtime_active),
})
session.previous_model().await,
Some(previous_model.to_string())
);
assert!(session.reference_context_item().await.is_none());
}
@@ -1166,11 +1131,8 @@ async fn record_initial_history_resumed_trailing_incomplete_turn_preserves_turn_
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: turn_context.model_info.slug.clone(),
realtime_active: Some(turn_context.realtime_active),
})
session.previous_model().await,
Some(turn_context.model_info.slug.clone())
);
assert_eq!(
serde_json::to_value(session.reference_context_item().await)
@@ -1196,7 +1158,6 @@ async fn record_initial_history_resumed_replaced_incomplete_compacted_turn_clear
model: previous_model.to_string(),
personality: turn_context.personality,
collaboration_mode: Some(turn_context.collaboration_mode.clone()),
realtime_active: Some(turn_context.realtime_active),
effort: turn_context.reasoning_effort,
summary: turn_context.reasoning_summary,
user_instructions: None,
@@ -1273,11 +1234,8 @@ async fn record_initial_history_resumed_replaced_incomplete_compacted_turn_clear
.await;
assert_eq!(
session.previous_turn_settings().await,
Some(PreviousTurnSettings {
model: previous_model.to_string(),
realtime_active: Some(turn_context.realtime_active),
})
session.previous_model().await,
Some(previous_model.to_string())
);
assert!(session.reference_context_item().await.is_none());
}

View File

@@ -4,8 +4,6 @@ use crate::ModelProviderInfo;
use crate::Prompt;
use crate::client::ModelClientSession;
use crate::client_common::ResponseEvent;
#[cfg(test)]
use crate::codex::PreviousTurnSettings;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::codex::get_last_assistant_message_from_turn;
@@ -55,6 +53,7 @@ pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
previous_user_turn_model: Option<&str>,
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
@@ -63,7 +62,14 @@ pub(crate) async fn run_inline_auto_compact_task(
text_elements: Vec::new(),
}];
run_compact_task_inner(sess, turn_context, input, initial_context_injection).await?;
run_compact_task_inner(
sess,
turn_context,
input,
initial_context_injection,
previous_user_turn_model,
)
.await?;
Ok(())
}
@@ -83,6 +89,7 @@ pub(crate) async fn run_compact_task(
turn_context,
input,
InitialContextInjection::DoNotInject,
None,
)
.await
}
@@ -92,6 +99,7 @@ async fn run_compact_task_inner(
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
initial_context_injection: InitialContextInjection,
previous_user_turn_model: Option<&str>,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
@@ -199,7 +207,9 @@ async fn run_compact_task_inner(
initial_context_injection,
InitialContextInjection::BeforeLastUserMessage
) {
let initial_context = sess.build_initial_context(turn_context.as_ref()).await;
let initial_context = sess
.build_initial_context(turn_context.as_ref(), previous_user_turn_model)
.await;
new_history =
insert_initial_context_before_last_real_user_or_summary(new_history, initial_context);
}
@@ -443,18 +453,18 @@ mod tests {
async fn process_compacted_history_with_test_session(
compacted_history: Vec<ResponseItem>,
previous_turn_settings: Option<&PreviousTurnSettings>,
previous_user_turn_model: Option<&str>,
) -> (Vec<ResponseItem>, Vec<ResponseItem>) {
let (session, turn_context) = crate::codex::make_session_and_context().await;
session
.set_previous_turn_settings(previous_turn_settings.cloned())
let initial_context = session
.build_initial_context(&turn_context, previous_user_turn_model)
.await;
let initial_context = session.build_initial_context(&turn_context).await;
let refreshed = crate::compact_remote::process_compacted_history(
&session,
&turn_context,
compacted_history,
InitialContextInjection::BeforeLastUserMessage,
previous_user_turn_model,
)
.await;
(refreshed, initial_context)
@@ -849,14 +859,10 @@ keep me updated
end_turn: None,
phase: None,
}];
let previous_turn_settings = PreviousTurnSettings {
model: "previous-regular-model".to_string(),
realtime_active: None,
};
let (refreshed, initial_context) = process_compacted_history_with_test_session(
compacted_history,
Some(&previous_turn_settings),
Some("previous-regular-model"),
)
.await;

View File

@@ -26,8 +26,15 @@ pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
previous_user_turn_model: Option<&str>,
) -> CodexResult<()> {
run_remote_compact_task_inner(&sess, &turn_context, initial_context_injection).await?;
run_remote_compact_task_inner(
&sess,
&turn_context,
initial_context_injection,
previous_user_turn_model,
)
.await?;
Ok(())
}
@@ -42,16 +49,28 @@ pub(crate) async fn run_remote_compact_task(
});
sess.send_event(&turn_context, start_event).await;
run_remote_compact_task_inner(&sess, &turn_context, InitialContextInjection::DoNotInject).await
run_remote_compact_task_inner(
&sess,
&turn_context,
InitialContextInjection::DoNotInject,
None,
)
.await
}
async fn run_remote_compact_task_inner(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
previous_user_turn_model: Option<&str>,
) -> CodexResult<()> {
if let Err(err) =
run_remote_compact_task_inner_impl(sess, turn_context, initial_context_injection).await
if let Err(err) = run_remote_compact_task_inner_impl(
sess,
turn_context,
initial_context_injection,
previous_user_turn_model,
)
.await
{
let event = EventMsg::Error(
err.to_error_event(Some("Error running remote compact task".to_string())),
@@ -66,6 +85,7 @@ async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
initial_context_injection: InitialContextInjection,
previous_user_turn_model: Option<&str>,
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(turn_context, &compaction_item)
@@ -127,6 +147,7 @@ async fn run_remote_compact_task_inner_impl(
turn_context.as_ref(),
new_history,
initial_context_injection,
previous_user_turn_model,
)
.await;
@@ -155,6 +176,7 @@ pub(crate) async fn process_compacted_history(
turn_context: &TurnContext,
mut compacted_history: Vec<ResponseItem>,
initial_context_injection: InitialContextInjection,
previous_user_turn_model: Option<&str>,
) -> Vec<ResponseItem> {
// Mid-turn compaction is the only path that must inject initial context above the last user
// message in the replacement history. Pre-turn compaction instead injects context after the
@@ -163,7 +185,8 @@ pub(crate) async fn process_compacted_history(
initial_context_injection,
InitialContextInjection::BeforeLastUserMessage
) {
sess.build_initial_context(turn_context).await
sess.build_initial_context(turn_context, previous_user_turn_model)
.await
} else {
Vec::new()
};

View File

@@ -1,4 +1,3 @@
use crate::codex::PreviousTurnSettings;
use crate::codex::TurnContext;
use crate::environment_context::EnvironmentContext;
use crate::features::Feature;
@@ -65,33 +64,6 @@ fn build_collaboration_mode_update_item(
}
}
/// Decides whether a realtime start/end developer message must be injected for
/// the next turn, by comparing the recorded realtime flag on the previous
/// `TurnContextItem` (and, failing that, the previous turn's settings) against
/// `next.realtime_active`.
pub(crate) fn build_realtime_update_item(
    previous: Option<&TurnContextItem>,
    previous_turn_settings: Option<&PreviousTurnSettings>,
    next: &TurnContext,
) -> Option<DeveloperInstructions> {
    // Realtime flag recorded on the previous turn-context item, if any.
    let recorded = previous.and_then(|item| item.realtime_active);
    if next.realtime_active {
        // Entering realtime: emit start instructions unless the baseline
        // already shows realtime as active.
        return match recorded {
            Some(true) => None,
            Some(false) | None => Some(DeveloperInstructions::realtime_start_message()),
        };
    }
    // next.realtime_active is false from here on.
    match recorded {
        Some(true) => Some(DeveloperInstructions::realtime_end_message("inactive")),
        Some(false) => None,
        None => {
            // No baseline item (e.g. after compaction cleared it): fall back to
            // the previous turn's settings to decide whether realtime was
            // active and must now be closed out.
            if previous_turn_settings.and_then(|settings| settings.realtime_active) == Some(true) {
                Some(DeveloperInstructions::realtime_end_message("inactive"))
            } else {
                None
            }
        }
    }
}
/// Initial-context variant of the realtime update item.
///
/// The same start/end rules apply when (re)constructing history as on a
/// regular turn, so this simply delegates to `build_realtime_update_item`
/// with identical arguments.
pub(crate) fn build_initial_realtime_item(
    previous: Option<&TurnContextItem>,
    previous_turn_settings: Option<&PreviousTurnSettings>,
    next: &TurnContext,
) -> Option<DeveloperInstructions> {
    build_realtime_update_item(previous, previous_turn_settings, next)
}
fn build_personality_update_item(
previous: Option<&TurnContextItem>,
next: &TurnContext,
@@ -128,11 +100,11 @@ pub(crate) fn personality_message_for(
}
pub(crate) fn build_model_instructions_update_item(
previous_turn_settings: Option<&PreviousTurnSettings>,
previous_user_turn_model: Option<&str>,
next: &TurnContext,
) -> Option<DeveloperInstructions> {
let previous_turn_settings = previous_turn_settings?;
if previous_turn_settings.model == next.model_info.slug {
let previous_model = previous_user_turn_model?;
if previous_model == next.model_info.slug {
return None;
}
@@ -175,7 +147,7 @@ fn build_text_message(role: &str, text_sections: Vec<String>) -> Option<Response
pub(crate) fn build_settings_update_items(
previous: Option<&TurnContextItem>,
previous_turn_settings: Option<&PreviousTurnSettings>,
previous_user_turn_model: Option<&str>,
next: &TurnContext,
shell: &Shell,
exec_policy: &Policy,
@@ -185,10 +157,9 @@ pub(crate) fn build_settings_update_items(
let developer_update_sections = [
// Keep model-switch instructions first so model-specific guidance is read before
// any other context diffs on this turn.
build_model_instructions_update_item(previous_turn_settings, next),
build_model_instructions_update_item(previous_user_turn_model, next),
build_permissions_update_item(previous, next, exec_policy),
build_collaboration_mode_update_item(previous, next),
build_realtime_update_item(previous, previous_turn_settings, next),
build_personality_update_item(previous, next, personality_feature_enabled),
]
.into_iter()

View File

@@ -27,8 +27,6 @@ use codex_protocol::protocol::RealtimeConversationStartedEvent;
use http::HeaderMap;
use serde_json::Value;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::debug;
@@ -49,7 +47,6 @@ struct ConversationState {
audio_tx: Sender<RealtimeAudioFrame>,
text_tx: Sender<String>,
task: JoinHandle<()>,
realtime_active: Arc<AtomicBool>,
}
#[allow(dead_code)]
@@ -62,9 +59,7 @@ impl RealtimeConversationManager {
pub(crate) async fn running_state(&self) -> Option<()> {
let state = self.state.lock().await;
state
.as_ref()
.and_then(|state| state.realtime_active.load(Ordering::Relaxed).then_some(()))
state.as_ref().map(|_| ())
}
pub(crate) async fn start(
@@ -73,13 +68,12 @@ impl RealtimeConversationManager {
extra_headers: Option<HeaderMap>,
prompt: String,
session_id: Option<String>,
) -> CodexResult<(Receiver<RealtimeEvent>, Arc<AtomicBool>)> {
) -> CodexResult<Receiver<RealtimeEvent>> {
let previous_state = {
let mut guard = self.state.lock().await;
guard.take()
};
if let Some(state) = previous_state {
state.realtime_active.store(false, Ordering::Relaxed);
state.task.abort();
let _ = state.task.await;
}
@@ -103,7 +97,6 @@ impl RealtimeConversationManager {
let (events_tx, events_rx) =
async_channel::bounded::<RealtimeEvent>(OUTPUT_EVENTS_QUEUE_CAPACITY);
let realtime_active = Arc::new(AtomicBool::new(true));
let task = spawn_realtime_input_task(writer, events, text_rx, audio_rx, events_tx);
let mut guard = self.state.lock().await;
@@ -111,9 +104,8 @@ impl RealtimeConversationManager {
audio_tx,
text_tx,
task,
realtime_active: Arc::clone(&realtime_active),
});
Ok((events_rx, realtime_active))
Ok(events_rx)
}
pub(crate) async fn audio_in(&self, frame: RealtimeAudioFrame) -> CodexResult<()> {
@@ -166,7 +158,6 @@ impl RealtimeConversationManager {
};
if let Some(state) = state {
state.realtime_active.store(false, Ordering::Relaxed);
state.task.abort();
let _ = state.task.await;
}
@@ -195,7 +186,7 @@ pub(crate) async fn handle_start(
.session_id
.or_else(|| Some(sess.conversation_id.to_string()));
info!("starting realtime conversation");
let (events_rx, realtime_active) = match sess
let events_rx = match sess
.conversation
.start(api_provider, None, prompt, requested_session_id.clone())
.await
@@ -245,7 +236,7 @@ pub(crate) async fn handle_start(
)))
.await;
}
if realtime_active.swap(false, Ordering::Relaxed) {
if let Some(()) = sess_clone.conversation.running_state().await {
info!("realtime conversation transport closed");
sess_clone
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(

View File

@@ -1402,7 +1402,6 @@ mod tests {
model: "test-model".to_string(),
personality: None,
collaboration_mode: None,
realtime_active: None,
effort: None,
summary: ReasoningSummaryConfig::Auto,
user_instructions: None,

View File

@@ -193,7 +193,7 @@ mod tests {
#[tokio::test]
async fn ignores_session_prefix_messages_when_truncating_rollout_from_start() {
let (session, turn_context) = make_session_and_context().await;
let mut items = session.build_initial_context(&turn_context).await;
let mut items = session.build_initial_context(&turn_context, None).await;
items.push(user_msg("feature request"));
items.push(assistant_msg("ack"));
items.push(user_msg("second question"));

View File

@@ -5,7 +5,6 @@ use std::collections::HashMap;
use std::collections::HashSet;
use tokio::task::JoinHandle;
use crate::codex::PreviousTurnSettings;
use crate::codex::SessionConfiguration;
use crate::context_manager::ContextManager;
use crate::error::Result as CodexResult;
@@ -24,10 +23,10 @@ pub(crate) struct SessionState {
pub(crate) server_reasoning_included: bool,
pub(crate) dependency_env: HashMap<String, String>,
pub(crate) mcp_dependency_prompted: HashSet<String>,
/// Settings used by the latest regular user turn, used for turn-to-turn
/// model/realtime handling on subsequent regular turns (including full-context
/// reinjection after resume or `/compact`).
previous_turn_settings: Option<PreviousTurnSettings>,
/// Model used by the latest regular user turn, used for model-switch handling
/// on subsequent regular turns (including full-context reinjection after
/// resume or `/compact`).
previous_model: Option<String>,
/// Startup regular task pre-created during session initialization.
pub(crate) startup_regular_task: Option<JoinHandle<CodexResult<RegularTask>>>,
pub(crate) active_mcp_tool_selection: Option<Vec<String>>,
@@ -45,7 +44,7 @@ impl SessionState {
server_reasoning_included: false,
dependency_env: HashMap::new(),
mcp_dependency_prompted: HashSet::new(),
previous_turn_settings: None,
previous_model: None,
startup_regular_task: None,
active_mcp_tool_selection: None,
active_connector_selection: HashSet::new(),
@@ -61,14 +60,11 @@ impl SessionState {
self.history.record_items(items, policy);
}
pub(crate) fn previous_turn_settings(&self) -> Option<PreviousTurnSettings> {
self.previous_turn_settings.clone()
pub(crate) fn previous_model(&self) -> Option<String> {
self.previous_model.clone()
}
pub(crate) fn set_previous_turn_settings(
&mut self,
previous_turn_settings: Option<PreviousTurnSettings>,
) {
self.previous_turn_settings = previous_turn_settings;
pub(crate) fn set_previous_model(&mut self, previous_model: Option<String>) {
self.previous_model = previous_model;
}
pub(crate) fn clone_history(&self) -> ContextManager {

View File

@@ -745,7 +745,7 @@ mod tests {
#[tokio::test]
async fn ignores_session_prefix_messages_when_truncating() {
let (session, turn_context) = make_session_and_context().await;
let mut items = session.build_initial_context(&turn_context).await;
let mut items = session.build_initial_context(&turn_context, None).await;
items.push(user_msg("feature request"));
items.push(assistant_msg("ack"));
items.push(user_msg("second question"));

View File

@@ -214,14 +214,11 @@ pub fn format_labeled_items_snapshot(
fn format_snapshot_text(text: &str, options: &ContextSnapshotOptions) -> String {
match options.render_mode {
ContextSnapshotRenderMode::RedactedText => {
normalize_snapshot_line_endings(&canonicalize_snapshot_text(text)).replace('\n', "\\n")
}
ContextSnapshotRenderMode::FullText => {
normalize_snapshot_line_endings(text).replace('\n', "\\n")
canonicalize_snapshot_text(text).replace('\n', "\\n")
}
ContextSnapshotRenderMode::FullText => text.replace('\n', "\\n"),
ContextSnapshotRenderMode::KindWithTextPrefix { max_chars } => {
let normalized = normalize_snapshot_line_endings(&canonicalize_snapshot_text(text))
.replace('\n', "\\n");
let normalized = canonicalize_snapshot_text(text).replace('\n', "\\n");
if normalized.chars().count() <= max_chars {
normalized
} else {
@@ -233,10 +230,6 @@ fn format_snapshot_text(text: &str, options: &ContextSnapshotOptions) -> String
}
}
/// Normalizes line endings to `\n`: Windows `\r\n` pairs collapse to a single
/// `\n`, and any lone `\r` also becomes `\n`.
fn normalize_snapshot_line_endings(text: &str) -> String {
    let mut normalized = String::with_capacity(text.len());
    let mut chars = text.chars().peekable();
    while let Some(ch) = chars.next() {
        if ch == '\r' {
            // Swallow the '\n' of a CRLF pair so the result has one newline.
            if chars.peek() == Some(&'\n') {
                chars.next();
            }
            normalized.push('\n');
        } else {
            normalized.push(ch);
        }
    }
    normalized
}
fn canonicalize_snapshot_text(text: &str) -> String {
if text.starts_with("<permissions instructions>") {
return "<PERMISSIONS_INSTRUCTIONS>".to_string();
@@ -315,25 +308,6 @@ mod tests {
);
}
#[test]
fn full_text_mode_normalizes_crlf_line_endings() {
    // A single user message whose text carries Windows-style CRLF endings.
    let items = vec![json!({
        "type": "message",
        "role": "user",
        "content": [{
            "type": "input_text",
            "text": "line one\r\n\r\nline two"
        }]
    })];
    let options =
        ContextSnapshotOptions::default().render_mode(ContextSnapshotRenderMode::FullText);
    let rendered = format_response_items_snapshot(&items, &options);
    // Full-text rendering collapses CRLF to bare \n before the formatter
    // escapes newlines as literal "\n" sequences.
    assert_eq!(rendered, r"00:message/user:line one\n\nline two");
}
#[test]
fn redacted_text_mode_keeps_canonical_placeholders() {
let items = vec![json!({
@@ -375,29 +349,6 @@ mod tests {
);
}
#[test]
fn kind_with_text_prefix_mode_normalizes_crlf_line_endings() {
    // A developer message with CRLF endings inside a realtime preamble.
    let items = vec![json!({
        "type": "message",
        "role": "developer",
        "content": [{
            "type": "input_text",
            "text": "<realtime_conversation>\r\nRealtime conversation started.\r\n\r\nYou are..."
        }]
    })];
    let options = ContextSnapshotOptions::default()
        .render_mode(ContextSnapshotRenderMode::KindWithTextPrefix { max_chars: 64 });
    let rendered = format_response_items_snapshot(&items, &options);
    // CRLFs normalize to \n, then the text prefix is truncated at 64 chars
    // and suffixed with "...".
    assert_eq!(
        rendered,
        r"00:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a..."
    );
}
#[test]
fn image_only_message_is_rendered_as_non_text_span() {
let items = vec![json!({

View File

@@ -9,14 +9,10 @@ use codex_core::compact::SUMMARY_PREFIX;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::ConversationStartParams;
use codex_protocol::protocol::ErrorEvent;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ItemCompletedEvent;
use codex_protocol::protocol::ItemStartedEvent;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::user_input::UserInput;
@@ -26,15 +22,12 @@ use core_test_support::context_snapshot::ContextSnapshotRenderMode;
use core_test_support::responses;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::responses::start_websocket_server;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodexBuilder;
use core_test_support::test_codex::TestCodexHarness;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
use serde_json::json;
use wiremock::ResponseTemplate;
fn approx_token_count(text: &str) -> i64 {
@@ -75,104 +68,6 @@ fn format_labeled_requests_snapshot(
)
}
/// Builds a compacted-history payload containing nothing but the prefixed
/// summary item, for stubbing remote-compaction responses in tests.
fn compacted_summary_only_output(summary: &str) -> Vec<ResponseItem> {
    let compaction = ResponseItem::Compaction {
        encrypted_content: summary_with_prefix(summary),
    };
    vec![compaction]
}
/// Test-codex builder preconfigured with dummy ChatGPT auth and a realtime
/// websocket base URL pointing at the given local test server.
fn remote_realtime_test_codex_builder(
    realtime_server: &responses::WebSocketTestServer,
) -> TestCodexBuilder {
    // Route realtime websocket traffic to the in-process test server.
    let ws_base_url = realtime_server.uri().to_string();
    let builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
    builder.with_config(move |config| {
        config.experimental_realtime_ws_base_url = Some(ws_base_url);
    })
}
/// Starts a scripted websocket server that announces one realtime session and
/// then stays open.
async fn start_remote_realtime_server() -> responses::WebSocketTestServer {
    // First scripted batch announces the realtime session.
    let mut script = vec![vec![json!({
        "type": "session.created",
        "session": { "id": "sess_remote_compact" }
    })]];
    // Pad with empty batches so the websocket stays open after startup: routed
    // transcript items during the test must not exhaust the scripted responses
    // and mark realtime inactive before the assertions run.
    script.extend((0..8).map(|_| Vec::new()));
    start_websocket_server(vec![script]).await
}
/// Starts a realtime conversation on `codex` and waits until it is fully up.
///
/// Submits `RealtimeConversationStart`, then blocks on two events in order:
/// the `RealtimeConversationStarted` acknowledgement (panicking if an `Error`
/// event arrives instead) and the realtime `SessionCreated` payload, so tests
/// only proceed once the websocket session is established.
async fn start_realtime_conversation(codex: &codex_core::CodexThread) -> Result<()> {
    codex
        .submit(Op::RealtimeConversationStart(ConversationStartParams {
            prompt: "backend prompt".to_string(),
            session_id: None,
        }))
        .await?;
    // Wait for the start acknowledgement; an Error event means startup failed.
    wait_for_event_match(codex, |msg| match msg {
        EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())),
        EventMsg::Error(err) => Some(Err(err.clone())),
        _ => None,
    })
    .await
    // The started payload itself is discarded; only success matters here.
    .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}"));
    // Wait for the backend-side session to be created before returning.
    wait_for_event_match(codex, |msg| match msg {
        EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
            payload: RealtimeEvent::SessionCreated { session_id },
        }) => Some(session_id.clone()),
        _ => None,
    })
    .await;
    Ok(())
}
/// Closes the active realtime conversation and waits for the closed event.
async fn close_realtime_conversation(codex: &codex_core::CodexThread) -> Result<()> {
    // Ask the session to tear the realtime conversation down...
    codex.submit(Op::RealtimeConversationClose).await?;
    // ...and block until the closure is observed on the event stream.
    let _closed = wait_for_event_match(codex, |msg| {
        if let EventMsg::RealtimeConversationClosed(closed) = msg {
            Some(closed.clone())
        } else {
            None
        }
    })
    .await;
    Ok(())
}
/// Asserts that `request` restates realtime instructions in start form
/// (contains the realtime block but no end reason).
fn assert_request_contains_realtime_start(request: &responses::ResponsesRequest) {
    // Serialize the request body once and probe it for both markers.
    let body = request.body_json().to_string();
    let has_realtime_block = body.contains("<realtime_conversation>");
    assert!(
        has_realtime_block,
        "expected request to restate realtime instructions"
    );
    let has_end_reason = body.contains("Reason: inactive");
    assert!(
        !has_end_reason,
        "expected request to use realtime start instructions"
    );
}
/// Asserts that `request` restates realtime instructions in end form
/// (contains the realtime block AND the inactive end reason).
fn assert_request_contains_realtime_end(request: &responses::ResponsesRequest) {
    // Serialize the request body once and probe it for both markers.
    let body = request.body_json().to_string();
    let has_realtime_block = body.contains("<realtime_conversation>");
    assert!(
        has_realtime_block,
        "expected request to restate realtime instructions"
    );
    let has_end_reason = body.contains("Reason: inactive");
    assert!(
        has_end_reason,
        "expected request to use realtime end instructions"
    );
}
/// Remote pre-turn auto-compaction while realtime remains active: the first
/// turn's 500-token usage exceeds the 200-token auto-compact limit, so the
/// second user turn triggers a remote compaction first; the follow-up request
/// must then restate realtime-start instructions because compaction cleared
/// the reference baseline.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_pre_turn_compaction_restates_realtime_start() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = wiremock::MockServer::start().await;
    let realtime_server = start_remote_realtime_server().await;
    // Low token limit forces auto-compaction after the first (500-token) turn.
    let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
        config.model_auto_compact_token_limit = Some(200);
    });
    let test = builder.build(&server).await?;
    // Two scripted model turns: the first reports 500 tokens (over the limit),
    // the second 80 tokens.
    let responses_mock = responses::mount_sse_sequence(
        &server,
        vec![
            responses::sse(vec![
                responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
                responses::ev_completed_with_tokens("r1", 500),
            ]),
            responses::sse(vec![
                responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"),
                responses::ev_completed_with_tokens("r2", 80),
            ]),
        ],
    )
    .await;
    // The remote compaction endpoint returns a summary-only history.
    let compact_mock = responses::mount_compact_json_once(
        &server,
        serde_json::json!({
            "output": compacted_summary_only_output(
                "REMOTE_PRETURN_REALTIME_STILL_ACTIVE_SUMMARY"
            )
        }),
    )
    .await;
    // Realtime stays active across both user turns in this scenario.
    start_realtime_conversation(test.codex.as_ref()).await?;
    test.codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "USER_ONE".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    // The second turn should trigger pre-turn compaction before the model call.
    test.codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "USER_TWO".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    // Exactly one compaction and two regular model requests are expected.
    assert_eq!(compact_mock.requests().len(), 1);
    let requests = responses_mock.requests();
    assert_eq!(requests.len(), 2, "expected two model requests");
    let compact_request = compact_mock.single_request();
    let post_compact_request = &requests[1];
    // Post-compaction request must restate realtime-start (not end) wording.
    assert_request_contains_realtime_start(post_compact_request);
    insta::assert_snapshot!(
        "remote_pre_turn_compaction_restates_realtime_start_shapes",
        format_labeled_requests_snapshot(
            "Remote pre-turn auto-compaction while realtime remains active: compaction clears the reference baseline, so the follow-up request restates realtime-start instructions.",
            &[
                ("Remote Compaction Request", &compact_request),
                (
                    "Remote Post-Compaction History Layout",
                    post_compact_request
                ),
            ]
        )
    );
    close_realtime_conversation(test.codex.as_ref()).await?;
    realtime_server.shutdown().await;
    Ok(())
}
/// Remote pre-turn auto-compaction after realtime was closed between turns:
/// the realtime conversation is closed before the second user turn, so the
/// post-compaction request must emit realtime-end instructions (derived from
/// the previous turn's settings) even though compaction cleared the reference
/// baseline.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_pre_turn_compaction_restates_realtime_end() -> Result<()> {
    skip_if_no_network!(Ok(()));
    let server = wiremock::MockServer::start().await;
    let realtime_server = start_remote_realtime_server().await;
    // Low token limit forces auto-compaction after the first (500-token) turn.
    let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
        config.model_auto_compact_token_limit = Some(200);
    });
    let test = builder.build(&server).await?;
    // Two scripted model turns: the first reports 500 tokens (over the limit),
    // the second 80 tokens.
    let responses_mock = responses::mount_sse_sequence(
        &server,
        vec![
            responses::sse(vec![
                responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
                responses::ev_completed_with_tokens("r1", 500),
            ]),
            responses::sse(vec![
                responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"),
                responses::ev_completed_with_tokens("r2", 80),
            ]),
        ],
    )
    .await;
    // The remote compaction endpoint returns a summary-only history.
    let compact_mock = responses::mount_compact_json_once(
        &server,
        serde_json::json!({
            "output": compacted_summary_only_output(
                "REMOTE_PRETURN_REALTIME_CLOSED_SUMMARY"
            )
        }),
    )
    .await;
    start_realtime_conversation(test.codex.as_ref()).await?;
    test.codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "USER_ONE".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    // Close realtime BETWEEN the two turns — the key difference from the
    // realtime-start variant of this test.
    close_realtime_conversation(test.codex.as_ref()).await?;
    test.codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "USER_TWO".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    // Exactly one compaction and two regular model requests are expected.
    assert_eq!(compact_mock.requests().len(), 1);
    let requests = responses_mock.requests();
    assert_eq!(requests.len(), 2, "expected two model requests");
    let compact_request = compact_mock.single_request();
    let post_compact_request = &requests[1];
    // Post-compaction request must carry realtime-end (inactive) wording.
    assert_request_contains_realtime_end(post_compact_request);
    insta::assert_snapshot!(
        "remote_pre_turn_compaction_restates_realtime_end_shapes",
        format_labeled_requests_snapshot(
            "Remote pre-turn auto-compaction after realtime was closed between turns: the follow-up request emits realtime-end instructions from previous-turn settings even though compaction cleared the reference baseline.",
            &[
                ("Remote Compaction Request", &compact_request),
                (
                    "Remote Post-Compaction History Layout",
                    post_compact_request
                ),
            ]
        )
    );
    realtime_server.shutdown().await;
    Ok(())
}
/// Manual `/compact` while a realtime conversation is still active: the next
/// regular turn must restate the realtime-start instructions, because
/// compaction cleared the history baseline that previously carried them.
/// Verified via an insta snapshot of the compaction request and the
/// post-compaction request shapes.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_manual_compact_restates_realtime_start() -> Result<()> {
    // Requires outbound networking for the in-process mock servers.
    skip_if_no_network!(Ok(()));
    let server = wiremock::MockServer::start().await;
    let realtime_server = start_remote_realtime_server().await;
    let mut builder = remote_realtime_test_codex_builder(&realtime_server);
    let test = builder.build(&server).await?;
    // Two regular model turns: one before /compact, one after.
    let responses_mock = responses::mount_sse_sequence(
        &server,
        vec![
            responses::sse(vec![
                responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
                responses::ev_completed_with_tokens("r1", 60),
            ]),
            responses::sse(vec![
                responses::ev_assistant_message("m2", "REMOTE_SECOND_REPLY"),
                responses::ev_completed_with_tokens("r2", 80),
            ]),
        ],
    )
    .await;
    // Remote compaction endpoint answers exactly once with a summary-only
    // output that replaces the prior history.
    let compact_mock = responses::mount_compact_json_once(
        &server,
        serde_json::json!({
            "output": compacted_summary_only_output(
                "REMOTE_MANUAL_REALTIME_STILL_ACTIVE_SUMMARY"
            )
        }),
    )
    .await;
    // Open realtime and run the first turn while it is active.
    start_realtime_conversation(test.codex.as_ref()).await?;
    test.codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "USER_ONE".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    // Manual compaction with realtime still open — this is the scenario
    // under test.
    test.codex.submit(Op::Compact).await?;
    wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    // Second regular turn after compaction.
    test.codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "USER_TWO".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    // Exactly one compaction call, two regular model requests.
    assert_eq!(compact_mock.requests().len(), 1);
    let requests = responses_mock.requests();
    assert_eq!(requests.len(), 2, "expected two model requests");
    let compact_request = compact_mock.single_request();
    let post_compact_request = &requests[1];
    // The post-compaction turn must re-emit the realtime-start instructions.
    assert_request_contains_realtime_start(post_compact_request);
    insta::assert_snapshot!(
        "remote_manual_compact_restates_realtime_start_shapes",
        format_labeled_requests_snapshot(
            "Remote manual /compact while realtime remains active: the next regular turn restates realtime-start instructions after compaction clears the baseline.",
            &[
                ("Remote Compaction Request", &compact_request),
                (
                    "Remote Post-Compaction History Layout",
                    post_compact_request
                ),
            ]
        )
    );
    close_realtime_conversation(test.codex.as_ref()).await?;
    realtime_server.shutdown().await;
    Ok(())
}
/// Mid-turn auto-compaction after realtime was closed between turns: the
/// initial request of the second turn emits the realtime-end instructions,
/// but the post-compaction continuation request must NOT restate them,
/// because the current turn already established the inactive baseline.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_mid_turn_compaction_does_not_restate_realtime_end()
-> Result<()> {
    // Requires outbound networking for the in-process mock servers.
    skip_if_no_network!(Ok(()));
    let server = wiremock::MockServer::start().await;
    let realtime_server = start_remote_realtime_server().await;
    // Low auto-compact limit (200 tokens) so the 500-token function-call
    // response below triggers compaction mid-turn.
    let mut builder = remote_realtime_test_codex_builder(&realtime_server).with_config(|config| {
        config.model_auto_compact_token_limit = Some(200);
    });
    let test = builder.build(&server).await?;
    // Three model responses: setup turn, a function call that blows past the
    // token limit, and the continuation that finishes the second turn.
    let responses_mock = responses::mount_sse_sequence(
        &server,
        vec![
            responses::sse(vec![
                responses::ev_assistant_message("setup", "REMOTE_SETUP_REPLY"),
                responses::ev_completed_with_tokens("setup-response", 60),
            ]),
            responses::sse(vec![
                responses::ev_function_call("call-remote-mid-turn", DUMMY_FUNCTION_NAME, "{}"),
                responses::ev_completed_with_tokens("r1", 500),
            ]),
            responses::sse(vec![
                responses::ev_assistant_message("m2", "REMOTE_MID_TURN_FINAL_REPLY"),
                responses::ev_completed_with_tokens("r2", 80),
            ]),
        ],
    )
    .await;
    // Remote compaction endpoint answers exactly once with a summary-only
    // replacement history.
    let compact_mock = responses::mount_compact_json_once(
        &server,
        serde_json::json!({
            "output": compacted_summary_only_output(
                "REMOTE_MID_TURN_REALTIME_CLOSED_SUMMARY"
            )
        }),
    )
    .await;
    // First turn runs with realtime active; realtime is then closed before
    // the second turn begins.
    start_realtime_conversation(test.codex.as_ref()).await?;
    test.codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "SETUP_USER".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    close_realtime_conversation(test.codex.as_ref()).await?;
    // Second turn: its function call pushes token usage over the limit, so
    // compaction happens while the turn is still in flight.
    test.codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "USER_TWO".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&test.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    // Exactly one compaction call, three regular model requests.
    assert_eq!(compact_mock.requests().len(), 1);
    let requests = responses_mock.requests();
    assert_eq!(requests.len(), 3, "expected three model requests");
    let second_turn_request = &requests[1];
    let compact_request = compact_mock.single_request();
    let post_compact_request = &requests[2];
    // Realtime-end instructions appear at the start of the second turn...
    assert_request_contains_realtime_end(second_turn_request);
    // ...but must not be repeated in the mid-turn continuation request.
    assert!(
        !post_compact_request
            .body_json()
            .to_string()
            .contains("<realtime_conversation>"),
        "did not expect post-compaction history to restate realtime instructions once the current turn had already established an inactive baseline"
    );
    insta::assert_snapshot!(
        "remote_mid_turn_compaction_does_not_restate_realtime_end_shapes",
        format_labeled_requests_snapshot(
            "Remote mid-turn continuation compaction after realtime was closed before the turn: the initial second-turn request emits realtime-end instructions, but the continuation request does not restate them after compaction because the current turn already established the inactive baseline.",
            &[
                ("Second Turn Initial Request", second_turn_request),
                ("Remote Compaction Request", &compact_request),
                (
                    "Remote Post-Compaction History Layout",
                    post_compact_request
                ),
            ]
        )
    );
    realtime_server.shutdown().await;
    Ok(())
}
/// Manual `/compact` followed by shutdown and resume from the rollout file:
/// the first resumed turn rebuilds history from the compaction item and must
/// restate the realtime-end instructions reconstructed from previous-turn
/// settings (realtime was closed before compaction).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn snapshot_request_shape_remote_compact_resume_restates_realtime_end() -> Result<()> {
    // Requires outbound networking for the in-process mock servers.
    skip_if_no_network!(Ok(()));
    let server = wiremock::MockServer::start().await;
    let realtime_server = start_remote_realtime_server().await;
    let mut builder = remote_realtime_test_codex_builder(&realtime_server);
    let initial = builder.build(&server).await?;
    // Capture home dir and rollout path so the session can be resumed later.
    let home = initial.home.clone();
    let rollout_path = initial
        .session_configured
        .rollout_path
        .clone()
        .expect("rollout path");
    // Two regular model turns: one before compaction, one after resume.
    let responses_mock = responses::mount_sse_sequence(
        &server,
        vec![
            responses::sse(vec![
                responses::ev_assistant_message("m1", "REMOTE_FIRST_REPLY"),
                responses::ev_completed_with_tokens("r1", 60),
            ]),
            responses::sse(vec![
                responses::ev_assistant_message("m2", "REMOTE_AFTER_RESUME_REPLY"),
                responses::ev_completed_with_tokens("r2", 80),
            ]),
        ],
    )
    .await;
    // Remote compaction endpoint answers exactly once with a summary-only
    // replacement history.
    let compact_mock = responses::mount_compact_json_once(
        &server,
        serde_json::json!({
            "output": compacted_summary_only_output(
                "REMOTE_RESUME_REALTIME_CLOSED_SUMMARY"
            )
        }),
    )
    .await;
    // First session: realtime turn, close realtime, then manual /compact.
    start_realtime_conversation(initial.codex.as_ref()).await?;
    initial
        .codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "USER_ONE".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    close_realtime_conversation(initial.codex.as_ref()).await?;
    initial.codex.submit(Op::Compact).await?;
    wait_for_event(&initial.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    // Cleanly shut down so the rollout file is complete before resuming.
    initial.codex.submit(Op::Shutdown).await?;
    wait_for_event(&initial.codex, |ev| {
        matches!(ev, EventMsg::ShutdownComplete)
    })
    .await;
    // Second session: resume from the recorded rollout and run one turn.
    let mut resume_builder =
        test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
    let resumed = resume_builder.resume(&server, home, rollout_path).await?;
    resumed
        .codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "USER_TWO".to_string(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&resumed.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;
    // Exactly one compaction call, two regular model requests in total.
    assert_eq!(compact_mock.requests().len(), 1);
    let requests = responses_mock.requests();
    assert_eq!(requests.len(), 2, "expected two model requests");
    let compact_request = compact_mock.single_request();
    let after_resume_request = &requests[1];
    // The first post-resume request must carry realtime-end instructions.
    assert_request_contains_realtime_end(after_resume_request);
    insta::assert_snapshot!(
        "remote_compact_resume_restates_realtime_end_shapes",
        format_labeled_requests_snapshot(
            "After remote manual /compact and resume, the first resumed turn rebuilds history from the compaction item and restates realtime-end instructions from reconstructed previous-turn settings.",
            &[
                ("Remote Compaction Request", &compact_request),
                ("Remote Post-Resume History Layout", after_resume_request),
            ]
        )
    );
    realtime_server.shutdown().await;
    Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
// TODO(ccunningham): Update once remote pre-turn compaction includes incoming user input.
async fn snapshot_request_shape_remote_pre_turn_compaction_including_incoming_user_message()

View File

@@ -367,7 +367,6 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> {
server.shutdown().await;
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Result<()> {
skip_if_no_network!(Ok(()));

View File

@@ -36,7 +36,6 @@ fn resume_history(
model: previous_model.to_string(),
personality: None,
collaboration_mode: None,
realtime_active: None,
effort: config.model_reasoning_effort,
summary: config
.model_reasoning_summary

View File

@@ -1,24 +0,0 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format_labeled_requests_snapshot(\"After remote manual /compact and resume, the first resumed turn rebuilds history from the compaction item and restates realtime-end instructions from reconstructed previous-turn settings.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Resume History Layout\", after_resume_request),])"
---
Scenario: After remote manual /compact and resume, the first resumed turn rebuilds history from the compaction item and restates realtime-end instructions from reconstructed previous-turn settings.
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:USER_ONE
04:message/assistant:REMOTE_FIRST_REPLY
## Remote Post-Resume History Layout
00:compaction:encrypted=true
01:message/developer[2]:
[01] <PERMISSIONS_INSTRUCTIONS>
[02] <realtime_conversation>\nRealtime conversation ended.\n\nYou are...
02:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_TWO

View File

@@ -1,24 +0,0 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format_labeled_requests_snapshot(\"Remote manual /compact while realtime remains active: the next regular turn restates realtime-start instructions after compaction clears the baseline.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", post_compact_request),])"
---
Scenario: Remote manual /compact while realtime remains active: the next regular turn restates realtime-start instructions after compaction clears the baseline.
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:USER_ONE
04:message/assistant:REMOTE_FIRST_REPLY
## Remote Post-Compaction History Layout
00:compaction:encrypted=true
01:message/developer[2]:
[01] <PERMISSIONS_INSTRUCTIONS>
[02] <realtime_conversation>\nRealtime conversation started.\n\nYou a...
02:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_TWO

View File

@@ -1,36 +0,0 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format_labeled_requests_snapshot(\"Remote mid-turn continuation compaction after realtime was closed before the turn: the initial second-turn request emits realtime-end instructions, but the continuation request does not restate them after compaction because the current turn already established the inactive baseline.\",\n&[(\"Second Turn Initial Request\", second_turn_request),\n(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", post_compact_request),])"
---
Scenario: Remote mid-turn continuation compaction after realtime was closed before the turn: the initial second-turn request emits realtime-end instructions, but the continuation request does not restate them after compaction because the current turn already established the inactive baseline.
## Second Turn Initial Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:SETUP_USER
04:message/assistant:REMOTE_SETUP_REPLY
05:message/developer:<realtime_conversation>\nRealtime conversation ended.\n\nYou are...
06:message/user:USER_TWO
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:SETUP_USER
04:message/assistant:REMOTE_SETUP_REPLY
05:message/developer:<realtime_conversation>\nRealtime conversation ended.\n\nYou are...
06:message/user:USER_TWO
07:function_call/test_tool
08:function_call_output:unsupported call: test_tool
## Remote Post-Compaction History Layout
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:compaction:encrypted=true

View File

@@ -1,24 +0,0 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format_labeled_requests_snapshot(\"Remote pre-turn auto-compaction after realtime was closed between turns: the follow-up request emits realtime-end instructions from previous-turn settings even though compaction cleared the reference baseline.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", post_compact_request),])"
---
Scenario: Remote pre-turn auto-compaction after realtime was closed between turns: the follow-up request emits realtime-end instructions from previous-turn settings even though compaction cleared the reference baseline.
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:USER_ONE
04:message/assistant:REMOTE_FIRST_REPLY
## Remote Post-Compaction History Layout
00:compaction:encrypted=true
01:message/developer[2]:
[01] <PERMISSIONS_INSTRUCTIONS>
[02] <realtime_conversation>\nRealtime conversation ended.\n\nYou are...
02:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_TWO

View File

@@ -1,24 +0,0 @@
---
source: core/tests/suite/compact_remote.rs
expression: "format_labeled_requests_snapshot(\"Remote pre-turn auto-compaction while realtime remains active: compaction clears the reference baseline, so the follow-up request restates realtime-start instructions.\",\n&[(\"Remote Compaction Request\", &compact_request),\n(\"Remote Post-Compaction History Layout\", post_compact_request),])"
---
Scenario: Remote pre-turn auto-compaction while realtime remains active: compaction clears the reference baseline, so the follow-up request restates realtime-start instructions.
## Remote Compaction Request
00:message/developer:<PERMISSIONS_INSTRUCTIONS>
01:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
02:message/developer:<realtime_conversation>\nRealtime conversation started.\n\nYou a...
03:message/user:USER_ONE
04:message/assistant:REMOTE_FIRST_REPLY
## Remote Post-Compaction History Layout
00:compaction:encrypted=true
01:message/developer[2]:
[01] <PERMISSIONS_INSTRUCTIONS>
[02] <realtime_conversation>\nRealtime conversation started.\n\nYou a...
02:message/user[2]:
[01] <AGENTS_MD>
[02] <ENVIRONMENT_CONTEXT:cwd=<CWD>>
03:message/user:USER_TWO

View File

@@ -41,7 +41,18 @@ impl Error for CredentialStoreError {}
/// Shared credential store abstraction for keyring-backed implementations.
pub trait KeyringStore: Debug + Send + Sync {
fn load(&self, service: &str, account: &str) -> Result<Option<String>, CredentialStoreError>;
fn load_secret(
&self,
service: &str,
account: &str,
) -> Result<Option<Vec<u8>>, CredentialStoreError>;
fn save(&self, service: &str, account: &str, value: &str) -> Result<(), CredentialStoreError>;
fn save_secret(
&self,
service: &str,
account: &str,
value: &[u8],
) -> Result<(), CredentialStoreError>;
fn delete(&self, service: &str, account: &str) -> Result<bool, CredentialStoreError>;
}
@@ -68,6 +79,31 @@ impl KeyringStore for DefaultKeyringStore {
}
}
fn load_secret(
&self,
service: &str,
account: &str,
) -> Result<Option<Vec<u8>>, CredentialStoreError> {
trace!("keyring.load_secret start, service={service}, account={account}");
let entry = Entry::new(service, account).map_err(CredentialStoreError::new)?;
match entry.get_secret() {
Ok(secret) => {
trace!("keyring.load_secret success, service={service}, account={account}");
Ok(Some(secret))
}
Err(keyring::Error::NoEntry) => {
trace!("keyring.load_secret no entry, service={service}, account={account}");
Ok(None)
}
Err(error) => {
trace!(
"keyring.load_secret error, service={service}, account={account}, error={error}"
);
Err(CredentialStoreError::new(error))
}
}
}
fn save(&self, service: &str, account: &str, value: &str) -> Result<(), CredentialStoreError> {
trace!(
"keyring.save start, service={service}, account={account}, value_len={}",
@@ -86,6 +122,31 @@ impl KeyringStore for DefaultKeyringStore {
}
}
fn save_secret(
&self,
service: &str,
account: &str,
value: &[u8],
) -> Result<(), CredentialStoreError> {
trace!(
"keyring.save_secret start, service={service}, account={account}, value_len={}",
value.len()
);
let entry = Entry::new(service, account).map_err(CredentialStoreError::new)?;
match entry.set_secret(value) {
Ok(()) => {
trace!("keyring.save_secret success, service={service}, account={account}");
Ok(())
}
Err(error) => {
trace!(
"keyring.save_secret error, service={service}, account={account}, error={error}"
);
Err(CredentialStoreError::new(error))
}
}
}
fn delete(&self, service: &str, account: &str) -> Result<bool, CredentialStoreError> {
trace!("keyring.delete start, service={service}, account={account}");
let entry = Entry::new(service, account).map_err(CredentialStoreError::new)?;
@@ -145,6 +206,22 @@ pub mod tests {
credential.get_password().ok()
}
pub fn saved_secret(&self, account: &str) -> Option<Vec<u8>> {
let credential = {
let guard = self
.credentials
.lock()
.unwrap_or_else(PoisonError::into_inner);
guard.get(account).cloned()
}?;
credential.get_secret().ok()
}
pub fn saved_secret_utf8(&self, account: &str) -> Option<String> {
let secret = self.saved_secret(account)?;
String::from_utf8(secret).ok()
}
pub fn set_error(&self, account: &str, error: KeyringError) {
let credential = self.credential(account);
credential.set_error(error);
@@ -184,6 +261,30 @@ pub mod tests {
}
}
fn load_secret(
&self,
_service: &str,
account: &str,
) -> Result<Option<Vec<u8>>, CredentialStoreError> {
let credential = {
let guard = self
.credentials
.lock()
.unwrap_or_else(PoisonError::into_inner);
guard.get(account).cloned()
};
let Some(credential) = credential else {
return Ok(None);
};
match credential.get_secret() {
Ok(secret) => Ok(Some(secret)),
Err(KeyringError::NoEntry) => Ok(None),
Err(error) => Err(CredentialStoreError::new(error)),
}
}
fn save(
&self,
_service: &str,
@@ -196,6 +297,18 @@ pub mod tests {
.map_err(CredentialStoreError::new)
}
fn save_secret(
&self,
_service: &str,
account: &str,
value: &[u8],
) -> Result<(), CredentialStoreError> {
let credential = self.credential(account);
credential
.set_secret(value)
.map_err(CredentialStoreError::new)
}
fn delete(&self, _service: &str, account: &str) -> Result<bool, CredentialStoreError> {
let credential = {
let guard = self

View File

@@ -14,8 +14,6 @@ use crate::protocol::AskForApproval;
use crate::protocol::COLLABORATION_MODE_CLOSE_TAG;
use crate::protocol::COLLABORATION_MODE_OPEN_TAG;
use crate::protocol::NetworkAccess;
use crate::protocol::REALTIME_CONVERSATION_CLOSE_TAG;
use crate::protocol::REALTIME_CONVERSATION_OPEN_TAG;
use crate::protocol::SandboxPolicy;
use crate::protocol::WritableRoot;
use crate::user_input::UserInput;
@@ -342,9 +340,6 @@ const SANDBOX_MODE_WORKSPACE_WRITE: &str =
include_str!("prompts/permissions/sandbox_mode/workspace_write.md");
const SANDBOX_MODE_READ_ONLY: &str = include_str!("prompts/permissions/sandbox_mode/read_only.md");
const REALTIME_START_INSTRUCTIONS: &str = include_str!("prompts/realtime/realtime_start.md");
const REALTIME_END_INSTRUCTIONS: &str = include_str!("prompts/realtime/realtime_end.md");
impl DeveloperInstructions {
pub fn new<T: Into<String>>(text: T) -> Self {
Self { text: text.into() }
@@ -414,20 +409,6 @@ impl DeveloperInstructions {
))
}
pub fn realtime_start_message() -> Self {
DeveloperInstructions::new(format!(
"{REALTIME_CONVERSATION_OPEN_TAG}\n{}\n{REALTIME_CONVERSATION_CLOSE_TAG}",
REALTIME_START_INSTRUCTIONS.trim()
))
}
pub fn realtime_end_message(reason: &str) -> Self {
DeveloperInstructions::new(format!(
"{REALTIME_CONVERSATION_OPEN_TAG}\n{}\n\nReason: {reason}\n{REALTIME_CONVERSATION_CLOSE_TAG}",
REALTIME_END_INSTRUCTIONS.trim()
))
}
pub fn personality_spec_message(spec: String) -> Self {
let message = format!(
"<personality_spec> The user has requested a new communication style. Future messages should adhere to the following personality: \n{spec} </personality_spec>"

View File

@@ -1,5 +0,0 @@
Realtime conversation ended.
You are still operating behind an intermediary rather than speaking to the user directly. Use the conversation transcript and current context to decide whether backend help is actually needed, and avoid verbose responses that only add latency.
Subsequent user input may return to typed text rather than transcript-style text. Do not assume recognition errors or missing punctuation once realtime has ended. Resume normal chat behavior.

View File

@@ -1,10 +0,0 @@
Realtime conversation started.
You are operating as a backend executor behind an intermediary. The user does not talk to you directly. Any response you produce will be consumed by the intermediary and may be summarized before the user sees it.
When invoked, you receive the latest conversation transcript and any relevant mode or metadata. The intermediary may invoke you even when backend help is not actually needed. Use the transcript to decide whether you should do work. If backend help is unnecessary, avoid verbose responses that add user-visible latency.
When user text is routed from realtime, treat it as a transcript. It may be unpunctuated or contain recognition errors.
- Ask brief clarification questions when needed.
- Keep responses concise and action-oriented.

View File

@@ -70,8 +70,6 @@ pub const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = "<environment_context>";
pub const ENVIRONMENT_CONTEXT_CLOSE_TAG: &str = "</environment_context>";
pub const COLLABORATION_MODE_OPEN_TAG: &str = "<collaboration_mode>";
pub const COLLABORATION_MODE_CLOSE_TAG: &str = "</collaboration_mode>";
pub const REALTIME_CONVERSATION_OPEN_TAG: &str = "<realtime_conversation>";
pub const REALTIME_CONVERSATION_CLOSE_TAG: &str = "</realtime_conversation>";
pub const USER_MESSAGE_BEGIN: &str = "## My request for Codex:";
/// Submission Queue Entry - requests from user
@@ -2148,8 +2146,6 @@ pub struct TurnContextItem {
pub personality: Option<Personality>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub collaboration_mode: Option<CollaborationMode>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub realtime_active: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub effort: Option<ReasoningEffortConfig>,
pub summary: ReasoningSummaryConfig,
@@ -3380,7 +3376,6 @@ mod tests {
model: "gpt-5".to_string(),
personality: None,
collaboration_mode: None,
realtime_active: None,
effort: None,
summary: ReasoningSummaryConfig::Auto,
user_instructions: None,

View File

@@ -260,7 +260,6 @@ mod tests {
model: "gpt-5".to_string(),
personality: None,
collaboration_mode: None,
realtime_active: None,
effort: None,
summary: ReasoningSummary::Auto,
user_instructions: None,
@@ -297,7 +296,6 @@ mod tests {
model: "gpt-5".to_string(),
personality: None,
collaboration_mode: None,
realtime_active: None,
effort: None,
summary: ReasoningSummary::Auto,
user_instructions: None,

View File

@@ -1280,7 +1280,6 @@ mod tests {
model,
personality: None,
collaboration_mode: None,
realtime_active: Some(false),
effort: config.model_reasoning_effort,
summary: config
.model_reasoning_summary