Compare commits

...

3 Commits

Author SHA1 Message Date
Ahmed Ibrahim
7fcbefdc79 initial_history 2025-10-29 22:34:34 -07:00
Ahmed Ibrahim
17ba27ed81 initial_history 2025-10-29 22:34:01 -07:00
Ahmed Ibrahim
4223f5a381 prompt cache key 2025-10-29 22:16:41 -07:00
12 changed files with 82 additions and 42 deletions

View File

@@ -88,6 +88,7 @@ pub struct ModelClient {
effort: Option<ReasoningEffortConfig>,
summary: ReasoningSummaryConfig,
session_source: SessionSource,
prompt_cache_key: String,
}
#[allow(clippy::too_many_arguments)]
@@ -101,6 +102,7 @@ impl ModelClient {
summary: ReasoningSummaryConfig,
conversation_id: ConversationId,
session_source: SessionSource,
prompt_cache_key: String,
) -> Self {
let client = create_client();
@@ -114,6 +116,7 @@ impl ModelClient {
effort,
summary,
session_source,
prompt_cache_key,
}
}
@@ -246,7 +249,7 @@ impl ModelClient {
store: azure_workaround,
stream: true,
include,
prompt_cache_key: Some(self.conversation_id.to_string()),
prompt_cache_key: Some(self.prompt_cache_key.clone()),
text,
};

View File

@@ -158,6 +158,7 @@ impl Codex {
auth_manager: Arc<AuthManager>,
conversation_history: InitialHistory,
session_source: SessionSource,
prompt_cache_key: Option<String>,
) -> CodexResult<CodexSpawnOk> {
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
let (tx_event, rx_event) = async_channel::unbounded();
@@ -190,6 +191,7 @@ impl Codex {
tx_event.clone(),
conversation_history,
session_source_clone,
prompt_cache_key,
)
.await
.map_err(|e| {
@@ -253,6 +255,7 @@ pub(crate) struct Session {
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
pub(crate) services: SessionServices,
next_internal_sub_id: AtomicU64,
prompt_cache_key: String,
}
/// The context needed for a single turn of the conversation.
@@ -368,6 +371,7 @@ impl Session {
session_configuration: &SessionConfiguration,
conversation_id: ConversationId,
sub_id: String,
prompt_cache_key: String,
) -> TurnContext {
let config = session_configuration.original_config_do_not_use.clone();
let model_family = find_family_for_model(&session_configuration.model)
@@ -395,6 +399,7 @@ impl Session {
session_configuration.model_reasoning_summary,
conversation_id,
session_configuration.session_source.clone(),
prompt_cache_key,
);
let tools_config = ToolsConfig::new(&ToolsConfigParams {
@@ -425,6 +430,7 @@ impl Session {
tx_event: Sender<Event>,
initial_history: InitialHistory,
session_source: SessionSource,
prompt_cache_key: Option<String>,
) -> anyhow::Result<Arc<Self>> {
debug!(
"Configuring session: model={}; provider={:?}",
@@ -589,6 +595,7 @@ impl Session {
active_turn: Mutex::new(None),
services,
next_internal_sub_id: AtomicU64::new(0),
prompt_cache_key: prompt_cache_key.unwrap_or_else(|| conversation_id.to_string()),
});
// Dispatch the SessionConfiguredEvent first and then report any errors.
@@ -618,6 +625,10 @@ impl Session {
Ok(sess)
}
/// Returns an owned copy of this session's prompt cache key.
///
/// The key is set when the session is built: the caller-supplied value if
/// one was given, otherwise the conversation id rendered as a string.
pub(crate) fn get_prompt_cache_key(&self) -> String {
self.prompt_cache_key.clone()
}
/// Returns a clone of the session's event sender.
pub(crate) fn get_tx_event(&self) -> Sender<Event> {
self.tx_event.clone()
}
@@ -703,6 +714,7 @@ impl Session {
&session_configuration,
self.conversation_id,
sub_id,
self.prompt_cache_key.clone(),
);
if let Some(final_schema) = updates.final_output_json_schema {
turn_context.final_output_json_schema = final_schema;
@@ -788,16 +800,11 @@ impl Session {
failure_message: Option<&str>,
) -> Option<SandboxCommandAssessment> {
let config = turn_context.client.config();
let provider = turn_context.client.provider().clone();
let auth_manager = Arc::clone(&self.services.auth_manager);
let otel = self.services.otel_event_manager.clone();
crate::sandboxing::assessment::assess_command(
config,
provider,
auth_manager,
&otel,
self.conversation_id,
turn_context.client.get_session_source(),
turn_context.client.clone(),
call_id,
command,
&turn_context.sandbox_policy,
@@ -1656,6 +1663,7 @@ async fn spawn_review_thread(
per_turn_config.model_reasoning_summary,
sess.conversation_id,
parent_turn_context.client.get_session_source(),
sess.prompt_cache_key.clone(),
);
let review_turn_context = TurnContext {
@@ -2529,6 +2537,7 @@ mod tests {
&session_configuration,
conversation_id,
"turn_id".to_string(),
conversation_id.to_string(),
);
let session = Session {
@@ -2538,6 +2547,7 @@ mod tests {
active_turn: Mutex::new(None),
services,
next_internal_sub_id: AtomicU64::new(0),
prompt_cache_key: conversation_id.to_string(),
};
(session, turn_context)
@@ -2603,6 +2613,7 @@ mod tests {
&session_configuration,
conversation_id,
"turn_id".to_string(),
conversation_id.to_string(),
));
let session = Arc::new(Session {
@@ -2612,6 +2623,7 @@ mod tests {
active_turn: Mutex::new(None),
services,
next_internal_sub_id: AtomicU64::new(0),
prompt_cache_key: conversation_id.to_string(),
});
(session, turn_context, rx_event)

View File

@@ -45,6 +45,7 @@ pub(crate) async fn run_codex_conversation_interactive(
auth_manager,
InitialHistory::New,
SessionSource::SubAgent(SubAgentSource::Review),
Some(parent_session.get_prompt_cache_key()),
)
.await?;
let codex = Arc::new(codex);

View File

@@ -74,6 +74,7 @@ impl ConversationManager {
auth_manager,
InitialHistory::New,
self.session_source.clone(),
None,
)
.await?;
self.finalize_spawn(codex, conversation_id).await
@@ -150,6 +151,7 @@ impl ConversationManager {
auth_manager,
initial_history,
self.session_source.clone(),
None,
)
.await?;
self.finalize_spawn(codex, conversation_id).await
@@ -175,6 +177,7 @@ impl ConversationManager {
nth_user_message: usize,
config: Config,
path: PathBuf,
conversation_id: ConversationId,
) -> CodexResult<NewConversation> {
// Compute the prefix up to the cut point.
let history = RolloutRecorder::get_rollout_history(&path).await?;
@@ -185,7 +188,14 @@ impl ConversationManager {
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, history, self.session_source.clone()).await?;
} = Codex::spawn(
config,
auth_manager,
history,
self.session_source.clone(),
Some(conversation_id.to_string()),
)
.await?;
self.finalize_spawn(codex, conversation_id).await
}

View File

@@ -4,8 +4,6 @@ use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use crate::AuthManager;
use crate::ModelProviderInfo;
use crate::client::ModelClient;
use crate::client_common::Prompt;
use crate::client_common::ResponseEvent;
@@ -13,11 +11,9 @@ use crate::config::Config;
use crate::protocol::SandboxPolicy;
use askama::Template;
use codex_otel::otel_event_manager::OtelEventManager;
use codex_protocol::ConversationId;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::SandboxCommandAssessment;
use codex_protocol::protocol::SessionSource;
use futures::StreamExt;
use serde_json::json;
use tokio::time::timeout;
@@ -50,11 +46,8 @@ struct SandboxAssessmentPromptTemplate<'a> {
#[allow(clippy::too_many_arguments)]
pub(crate) async fn assess_command(
config: Arc<Config>,
provider: ModelProviderInfo,
auth_manager: Arc<AuthManager>,
parent_otel: &OtelEventManager,
conversation_id: ConversationId,
session_source: SessionSource,
client: ModelClient,
call_id: &str,
command: &[String],
sandbox_policy: &SandboxPolicy,
@@ -132,20 +125,6 @@ pub(crate) async fn assess_command(
output_schema: Some(sandbox_assessment_schema()),
};
let child_otel =
parent_otel.with_model(config.model.as_str(), config.model_family.slug.as_str());
let client = ModelClient::new(
Arc::clone(&config),
Some(auth_manager),
child_otel,
provider,
config.model_reasoning_effort,
config.model_reasoning_summary,
conversation_id,
session_source,
);
let start = Instant::now();
let assessment_result = timeout(SANDBOX_ASSESSMENT_TIMEOUT, async move {
let mut stream = client.stream(&prompt).await?;

View File

@@ -95,6 +95,7 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
summary,
conversation_id,
codex_protocol::protocol::SessionSource::Exec,
conversation_id.to_string(),
);
let mut prompt = Prompt::default();

View File

@@ -95,6 +95,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
summary,
conversation_id,
codex_protocol::protocol::SessionSource::Exec,
conversation_id.to_string(),
);
let mut prompt = Prompt::default();

View File

@@ -77,6 +77,7 @@ async fn responses_stream_includes_task_type_header() {
summary,
conversation_id,
SessionSource::Exec,
conversation_id.to_string(),
);
let mut prompt = Prompt::default();

View File

@@ -676,6 +676,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
summary,
conversation_id,
codex_protocol::protocol::SessionSource::Exec,
conversation_id.to_string(),
);
let mut prompt = Prompt::default();

View File

@@ -21,6 +21,7 @@ use codex_core::config::OPENAI_DEFAULT_MODEL;
use codex_core::protocol::EventMsg;
use codex_core::protocol::Op;
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
use codex_protocol::ConversationId;
use codex_protocol::user_input::UserInput;
use core_test_support::load_default_config_for_test;
use core_test_support::responses::ev_assistant_message;
@@ -78,7 +79,8 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
mount_initial_flow(&server).await;
// 2. Start a new conversation and drive it through the compact/resume/fork steps.
let (_home, config, manager, base) = start_test_conversation(&server).await;
let (_home, config, manager, base, base_conversation_id) =
start_test_conversation(&server).await;
user_turn(&base, "hello world").await;
compact_conversation(&base).await;
@@ -97,7 +99,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
"compact+resume test expects resumed path {resumed_path:?} to exist",
);
let forked = fork_conversation(&manager, &config, resumed_path, 2).await;
let forked = fork_conversation(&manager, &config, resumed_path, 2, base_conversation_id).await;
user_turn(&forked, "AFTER_FORK").await;
// 3. Capture the requests to the model and validate the history slices.
@@ -535,7 +537,8 @@ async fn compact_resume_after_second_compaction_preserves_history() {
mount_second_compact_flow(&server).await;
// 2. Drive the conversation through compact -> resume -> fork -> compact -> resume.
let (_home, config, manager, base) = start_test_conversation(&server).await;
let (_home, config, manager, base, base_conversation_id) =
start_test_conversation(&server).await;
user_turn(&base, "hello world").await;
compact_conversation(&base).await;
@@ -554,7 +557,7 @@ async fn compact_resume_after_second_compaction_preserves_history() {
"second compact test expects resumed path {resumed_path:?} to exist",
);
let forked = fork_conversation(&manager, &config, resumed_path, 3).await;
let forked = fork_conversation(&manager, &config, resumed_path, 3, base_conversation_id).await;
user_turn(&forked, "AFTER_FORK").await;
compact_conversation(&forked).await;
@@ -780,7 +783,13 @@ async fn mount_second_compact_flow(server: &MockServer) {
async fn start_test_conversation(
server: &MockServer,
) -> (TempDir, Config, ConversationManager, Arc<CodexConversation>) {
) -> (
TempDir,
Config,
ConversationManager,
Arc<CodexConversation>,
ConversationId,
) {
let model_provider = ModelProviderInfo {
base_url: Some(format!("{}/v1", server.uri())),
..built_in_model_providers()["openai"].clone()
@@ -790,12 +799,16 @@ async fn start_test_conversation(
config.model_provider = model_provider;
let manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let NewConversation { conversation, .. } = manager
let NewConversation {
conversation,
conversation_id,
..
} = manager
.new_conversation(config.clone())
.await
.expect("create conversation");
(home, config, manager, conversation)
(home, config, manager, conversation, conversation_id)
}
async fn user_turn(conversation: &Arc<CodexConversation>, text: &str) {
@@ -840,9 +853,10 @@ async fn fork_conversation(
config: &Config,
path: std::path::PathBuf,
nth_user_message: usize,
conversation_id: ConversationId,
) -> Arc<CodexConversation> {
let NewConversation { conversation, .. } = manager
.fork_conversation(nth_user_message, config.clone(), path)
.fork_conversation(nth_user_message, config.clone(), path, conversation_id)
.await
.expect("fork conversation");
conversation

View File

@@ -58,6 +58,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
let NewConversation {
conversation: codex,
conversation_id,
..
} = conversation_manager
.new_conversation(config)
@@ -129,7 +130,12 @@ async fn fork_conversation_twice_drops_to_first_message() {
conversation: codex_fork1,
..
} = conversation_manager
.fork_conversation(1, config_for_fork.clone(), base_path.clone())
.fork_conversation(
1,
config_for_fork.clone(),
base_path.clone(),
conversation_id,
)
.await
.expect("fork 1");
@@ -147,7 +153,12 @@ async fn fork_conversation_twice_drops_to_first_message() {
conversation: codex_fork2,
..
} = conversation_manager
.fork_conversation(0, config_for_fork.clone(), fork1_path.clone())
.fork_conversation(
0,
config_for_fork.clone(),
fork1_path.clone(),
conversation_id,
)
.await
.expect("fork 2");

View File

@@ -305,7 +305,12 @@ impl App {
let cfg = self.chat_widget.config_ref().clone();
// Perform the fork via a thin wrapper for clarity/testability.
let result = self
.perform_fork(ev.path.clone(), nth_user_message, cfg.clone())
.perform_fork(
ev.path.clone(),
nth_user_message,
cfg.clone(),
ev.conversation_id,
)
.await;
match result {
Ok(new_conv) => {
@@ -321,9 +326,10 @@ impl App {
path: PathBuf,
nth_user_message: usize,
cfg: codex_core::config::Config,
conversation_id: ConversationId,
) -> codex_core::error::Result<codex_core::NewConversation> {
self.server
.fork_conversation(nth_user_message, cfg, path)
.fork_conversation(nth_user_message, cfg, path, conversation_id)
.await
}