mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Compare commits
3 Commits
pr7780
...
prompt-cac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7fcbefdc79 | ||
|
|
17ba27ed81 | ||
|
|
4223f5a381 |
@@ -88,6 +88,7 @@ pub struct ModelClient {
|
||||
effort: Option<ReasoningEffortConfig>,
|
||||
summary: ReasoningSummaryConfig,
|
||||
session_source: SessionSource,
|
||||
prompt_cache_key: String,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
@@ -101,6 +102,7 @@ impl ModelClient {
|
||||
summary: ReasoningSummaryConfig,
|
||||
conversation_id: ConversationId,
|
||||
session_source: SessionSource,
|
||||
prompt_cache_key: String,
|
||||
) -> Self {
|
||||
let client = create_client();
|
||||
|
||||
@@ -114,6 +116,7 @@ impl ModelClient {
|
||||
effort,
|
||||
summary,
|
||||
session_source,
|
||||
prompt_cache_key,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -246,7 +249,7 @@ impl ModelClient {
|
||||
store: azure_workaround,
|
||||
stream: true,
|
||||
include,
|
||||
prompt_cache_key: Some(self.conversation_id.to_string()),
|
||||
prompt_cache_key: Some(self.prompt_cache_key.clone()),
|
||||
text,
|
||||
};
|
||||
|
||||
|
||||
@@ -158,6 +158,7 @@ impl Codex {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
conversation_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
prompt_cache_key: Option<String>,
|
||||
) -> CodexResult<CodexSpawnOk> {
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(SUBMISSION_CHANNEL_CAPACITY);
|
||||
let (tx_event, rx_event) = async_channel::unbounded();
|
||||
@@ -190,6 +191,7 @@ impl Codex {
|
||||
tx_event.clone(),
|
||||
conversation_history,
|
||||
session_source_clone,
|
||||
prompt_cache_key,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
@@ -253,6 +255,7 @@ pub(crate) struct Session {
|
||||
pub(crate) active_turn: Mutex<Option<ActiveTurn>>,
|
||||
pub(crate) services: SessionServices,
|
||||
next_internal_sub_id: AtomicU64,
|
||||
prompt_cache_key: String,
|
||||
}
|
||||
|
||||
/// The context needed for a single turn of the conversation.
|
||||
@@ -368,6 +371,7 @@ impl Session {
|
||||
session_configuration: &SessionConfiguration,
|
||||
conversation_id: ConversationId,
|
||||
sub_id: String,
|
||||
prompt_cache_key: String,
|
||||
) -> TurnContext {
|
||||
let config = session_configuration.original_config_do_not_use.clone();
|
||||
let model_family = find_family_for_model(&session_configuration.model)
|
||||
@@ -395,6 +399,7 @@ impl Session {
|
||||
session_configuration.model_reasoning_summary,
|
||||
conversation_id,
|
||||
session_configuration.session_source.clone(),
|
||||
prompt_cache_key,
|
||||
);
|
||||
|
||||
let tools_config = ToolsConfig::new(&ToolsConfigParams {
|
||||
@@ -425,6 +430,7 @@ impl Session {
|
||||
tx_event: Sender<Event>,
|
||||
initial_history: InitialHistory,
|
||||
session_source: SessionSource,
|
||||
prompt_cache_key: Option<String>,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
debug!(
|
||||
"Configuring session: model={}; provider={:?}",
|
||||
@@ -589,6 +595,7 @@ impl Session {
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
prompt_cache_key: prompt_cache_key.unwrap_or_else(|| conversation_id.to_string()),
|
||||
});
|
||||
|
||||
// Dispatch the SessionConfiguredEvent first and then report any errors.
|
||||
@@ -618,6 +625,10 @@ impl Session {
|
||||
Ok(sess)
|
||||
}
|
||||
|
||||
pub(crate) fn get_prompt_cache_key(&self) -> String {
|
||||
self.prompt_cache_key.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn get_tx_event(&self) -> Sender<Event> {
|
||||
self.tx_event.clone()
|
||||
}
|
||||
@@ -703,6 +714,7 @@ impl Session {
|
||||
&session_configuration,
|
||||
self.conversation_id,
|
||||
sub_id,
|
||||
self.prompt_cache_key.clone(),
|
||||
);
|
||||
if let Some(final_schema) = updates.final_output_json_schema {
|
||||
turn_context.final_output_json_schema = final_schema;
|
||||
@@ -788,16 +800,11 @@ impl Session {
|
||||
failure_message: Option<&str>,
|
||||
) -> Option<SandboxCommandAssessment> {
|
||||
let config = turn_context.client.config();
|
||||
let provider = turn_context.client.provider().clone();
|
||||
let auth_manager = Arc::clone(&self.services.auth_manager);
|
||||
let otel = self.services.otel_event_manager.clone();
|
||||
crate::sandboxing::assessment::assess_command(
|
||||
config,
|
||||
provider,
|
||||
auth_manager,
|
||||
&otel,
|
||||
self.conversation_id,
|
||||
turn_context.client.get_session_source(),
|
||||
turn_context.client.clone(),
|
||||
call_id,
|
||||
command,
|
||||
&turn_context.sandbox_policy,
|
||||
@@ -1656,6 +1663,7 @@ async fn spawn_review_thread(
|
||||
per_turn_config.model_reasoning_summary,
|
||||
sess.conversation_id,
|
||||
parent_turn_context.client.get_session_source(),
|
||||
sess.prompt_cache_key.clone(),
|
||||
);
|
||||
|
||||
let review_turn_context = TurnContext {
|
||||
@@ -2529,6 +2537,7 @@ mod tests {
|
||||
&session_configuration,
|
||||
conversation_id,
|
||||
"turn_id".to_string(),
|
||||
conversation_id.to_string(),
|
||||
);
|
||||
|
||||
let session = Session {
|
||||
@@ -2538,6 +2547,7 @@ mod tests {
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
prompt_cache_key: conversation_id.to_string(),
|
||||
};
|
||||
|
||||
(session, turn_context)
|
||||
@@ -2603,6 +2613,7 @@ mod tests {
|
||||
&session_configuration,
|
||||
conversation_id,
|
||||
"turn_id".to_string(),
|
||||
conversation_id.to_string(),
|
||||
));
|
||||
|
||||
let session = Arc::new(Session {
|
||||
@@ -2612,6 +2623,7 @@ mod tests {
|
||||
active_turn: Mutex::new(None),
|
||||
services,
|
||||
next_internal_sub_id: AtomicU64::new(0),
|
||||
prompt_cache_key: conversation_id.to_string(),
|
||||
});
|
||||
|
||||
(session, turn_context, rx_event)
|
||||
|
||||
@@ -45,6 +45,7 @@ pub(crate) async fn run_codex_conversation_interactive(
|
||||
auth_manager,
|
||||
InitialHistory::New,
|
||||
SessionSource::SubAgent(SubAgentSource::Review),
|
||||
Some(parent_session.get_prompt_cache_key()),
|
||||
)
|
||||
.await?;
|
||||
let codex = Arc::new(codex);
|
||||
|
||||
@@ -74,6 +74,7 @@ impl ConversationManager {
|
||||
auth_manager,
|
||||
InitialHistory::New,
|
||||
self.session_source.clone(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
@@ -150,6 +151,7 @@ impl ConversationManager {
|
||||
auth_manager,
|
||||
initial_history,
|
||||
self.session_source.clone(),
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
@@ -175,6 +177,7 @@ impl ConversationManager {
|
||||
nth_user_message: usize,
|
||||
config: Config,
|
||||
path: PathBuf,
|
||||
conversation_id: ConversationId,
|
||||
) -> CodexResult<NewConversation> {
|
||||
// Compute the prefix up to the cut point.
|
||||
let history = RolloutRecorder::get_rollout_history(&path).await?;
|
||||
@@ -185,7 +188,14 @@ impl ConversationManager {
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
} = Codex::spawn(config, auth_manager, history, self.session_source.clone()).await?;
|
||||
} = Codex::spawn(
|
||||
config,
|
||||
auth_manager,
|
||||
history,
|
||||
self.session_source.clone(),
|
||||
Some(conversation_id.to_string()),
|
||||
)
|
||||
.await?;
|
||||
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
}
|
||||
|
||||
@@ -4,8 +4,6 @@ use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::ModelProviderInfo;
|
||||
use crate::client::ModelClient;
|
||||
use crate::client_common::Prompt;
|
||||
use crate::client_common::ResponseEvent;
|
||||
@@ -13,11 +11,9 @@ use crate::config::Config;
|
||||
use crate::protocol::SandboxPolicy;
|
||||
use askama::Template;
|
||||
use codex_otel::otel_event_manager::OtelEventManager;
|
||||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::models::ContentItem;
|
||||
use codex_protocol::models::ResponseItem;
|
||||
use codex_protocol::protocol::SandboxCommandAssessment;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use futures::StreamExt;
|
||||
use serde_json::json;
|
||||
use tokio::time::timeout;
|
||||
@@ -50,11 +46,8 @@ struct SandboxAssessmentPromptTemplate<'a> {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub(crate) async fn assess_command(
|
||||
config: Arc<Config>,
|
||||
provider: ModelProviderInfo,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
parent_otel: &OtelEventManager,
|
||||
conversation_id: ConversationId,
|
||||
session_source: SessionSource,
|
||||
client: ModelClient,
|
||||
call_id: &str,
|
||||
command: &[String],
|
||||
sandbox_policy: &SandboxPolicy,
|
||||
@@ -132,20 +125,6 @@ pub(crate) async fn assess_command(
|
||||
output_schema: Some(sandbox_assessment_schema()),
|
||||
};
|
||||
|
||||
let child_otel =
|
||||
parent_otel.with_model(config.model.as_str(), config.model_family.slug.as_str());
|
||||
|
||||
let client = ModelClient::new(
|
||||
Arc::clone(&config),
|
||||
Some(auth_manager),
|
||||
child_otel,
|
||||
provider,
|
||||
config.model_reasoning_effort,
|
||||
config.model_reasoning_summary,
|
||||
conversation_id,
|
||||
session_source,
|
||||
);
|
||||
|
||||
let start = Instant::now();
|
||||
let assessment_result = timeout(SANDBOX_ASSESSMENT_TIMEOUT, async move {
|
||||
let mut stream = client.stream(&prompt).await?;
|
||||
|
||||
@@ -95,6 +95,7 @@ async fn run_request(input: Vec<ResponseItem>) -> Value {
|
||||
summary,
|
||||
conversation_id,
|
||||
codex_protocol::protocol::SessionSource::Exec,
|
||||
conversation_id.to_string(),
|
||||
);
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
|
||||
@@ -95,6 +95,7 @@ async fn run_stream_with_bytes(sse_body: &[u8]) -> Vec<ResponseEvent> {
|
||||
summary,
|
||||
conversation_id,
|
||||
codex_protocol::protocol::SessionSource::Exec,
|
||||
conversation_id.to_string(),
|
||||
);
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
|
||||
@@ -77,6 +77,7 @@ async fn responses_stream_includes_task_type_header() {
|
||||
summary,
|
||||
conversation_id,
|
||||
SessionSource::Exec,
|
||||
conversation_id.to_string(),
|
||||
);
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
|
||||
@@ -676,6 +676,7 @@ async fn azure_responses_request_includes_store_and_reasoning_ids() {
|
||||
summary,
|
||||
conversation_id,
|
||||
codex_protocol::protocol::SessionSource::Exec,
|
||||
conversation_id.to_string(),
|
||||
);
|
||||
|
||||
let mut prompt = Prompt::default();
|
||||
|
||||
@@ -21,6 +21,7 @@ use codex_core::config::OPENAI_DEFAULT_MODEL;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR;
|
||||
use codex_protocol::ConversationId;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::load_default_config_for_test;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
@@ -78,7 +79,8 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
|
||||
mount_initial_flow(&server).await;
|
||||
|
||||
// 2. Start a new conversation and drive it through the compact/resume/fork steps.
|
||||
let (_home, config, manager, base) = start_test_conversation(&server).await;
|
||||
let (_home, config, manager, base, base_conversation_id) =
|
||||
start_test_conversation(&server).await;
|
||||
|
||||
user_turn(&base, "hello world").await;
|
||||
compact_conversation(&base).await;
|
||||
@@ -97,7 +99,7 @@ async fn compact_resume_and_fork_preserve_model_history_view() {
|
||||
"compact+resume test expects resumed path {resumed_path:?} to exist",
|
||||
);
|
||||
|
||||
let forked = fork_conversation(&manager, &config, resumed_path, 2).await;
|
||||
let forked = fork_conversation(&manager, &config, resumed_path, 2, base_conversation_id).await;
|
||||
user_turn(&forked, "AFTER_FORK").await;
|
||||
|
||||
// 3. Capture the requests to the model and validate the history slices.
|
||||
@@ -535,7 +537,8 @@ async fn compact_resume_after_second_compaction_preserves_history() {
|
||||
mount_second_compact_flow(&server).await;
|
||||
|
||||
// 2. Drive the conversation through compact -> resume -> fork -> compact -> resume.
|
||||
let (_home, config, manager, base) = start_test_conversation(&server).await;
|
||||
let (_home, config, manager, base, base_conversation_id) =
|
||||
start_test_conversation(&server).await;
|
||||
|
||||
user_turn(&base, "hello world").await;
|
||||
compact_conversation(&base).await;
|
||||
@@ -554,7 +557,7 @@ async fn compact_resume_after_second_compaction_preserves_history() {
|
||||
"second compact test expects resumed path {resumed_path:?} to exist",
|
||||
);
|
||||
|
||||
let forked = fork_conversation(&manager, &config, resumed_path, 3).await;
|
||||
let forked = fork_conversation(&manager, &config, resumed_path, 3, base_conversation_id).await;
|
||||
user_turn(&forked, "AFTER_FORK").await;
|
||||
|
||||
compact_conversation(&forked).await;
|
||||
@@ -780,7 +783,13 @@ async fn mount_second_compact_flow(server: &MockServer) {
|
||||
|
||||
async fn start_test_conversation(
|
||||
server: &MockServer,
|
||||
) -> (TempDir, Config, ConversationManager, Arc<CodexConversation>) {
|
||||
) -> (
|
||||
TempDir,
|
||||
Config,
|
||||
ConversationManager,
|
||||
Arc<CodexConversation>,
|
||||
ConversationId,
|
||||
) {
|
||||
let model_provider = ModelProviderInfo {
|
||||
base_url: Some(format!("{}/v1", server.uri())),
|
||||
..built_in_model_providers()["openai"].clone()
|
||||
@@ -790,12 +799,16 @@ async fn start_test_conversation(
|
||||
config.model_provider = model_provider;
|
||||
|
||||
let manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
|
||||
let NewConversation { conversation, .. } = manager
|
||||
let NewConversation {
|
||||
conversation,
|
||||
conversation_id,
|
||||
..
|
||||
} = manager
|
||||
.new_conversation(config.clone())
|
||||
.await
|
||||
.expect("create conversation");
|
||||
|
||||
(home, config, manager, conversation)
|
||||
(home, config, manager, conversation, conversation_id)
|
||||
}
|
||||
|
||||
async fn user_turn(conversation: &Arc<CodexConversation>, text: &str) {
|
||||
@@ -840,9 +853,10 @@ async fn fork_conversation(
|
||||
config: &Config,
|
||||
path: std::path::PathBuf,
|
||||
nth_user_message: usize,
|
||||
conversation_id: ConversationId,
|
||||
) -> Arc<CodexConversation> {
|
||||
let NewConversation { conversation, .. } = manager
|
||||
.fork_conversation(nth_user_message, config.clone(), path)
|
||||
.fork_conversation(nth_user_message, config.clone(), path, conversation_id)
|
||||
.await
|
||||
.expect("fork conversation");
|
||||
conversation
|
||||
|
||||
@@ -58,6 +58,7 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
||||
let conversation_manager = ConversationManager::with_auth(CodexAuth::from_api_key("dummy"));
|
||||
let NewConversation {
|
||||
conversation: codex,
|
||||
conversation_id,
|
||||
..
|
||||
} = conversation_manager
|
||||
.new_conversation(config)
|
||||
@@ -129,7 +130,12 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
||||
conversation: codex_fork1,
|
||||
..
|
||||
} = conversation_manager
|
||||
.fork_conversation(1, config_for_fork.clone(), base_path.clone())
|
||||
.fork_conversation(
|
||||
1,
|
||||
config_for_fork.clone(),
|
||||
base_path.clone(),
|
||||
conversation_id,
|
||||
)
|
||||
.await
|
||||
.expect("fork 1");
|
||||
|
||||
@@ -147,7 +153,12 @@ async fn fork_conversation_twice_drops_to_first_message() {
|
||||
conversation: codex_fork2,
|
||||
..
|
||||
} = conversation_manager
|
||||
.fork_conversation(0, config_for_fork.clone(), fork1_path.clone())
|
||||
.fork_conversation(
|
||||
0,
|
||||
config_for_fork.clone(),
|
||||
fork1_path.clone(),
|
||||
conversation_id,
|
||||
)
|
||||
.await
|
||||
.expect("fork 2");
|
||||
|
||||
|
||||
@@ -305,7 +305,12 @@ impl App {
|
||||
let cfg = self.chat_widget.config_ref().clone();
|
||||
// Perform the fork via a thin wrapper for clarity/testability.
|
||||
let result = self
|
||||
.perform_fork(ev.path.clone(), nth_user_message, cfg.clone())
|
||||
.perform_fork(
|
||||
ev.path.clone(),
|
||||
nth_user_message,
|
||||
cfg.clone(),
|
||||
ev.conversation_id,
|
||||
)
|
||||
.await;
|
||||
match result {
|
||||
Ok(new_conv) => {
|
||||
@@ -321,9 +326,10 @@ impl App {
|
||||
path: PathBuf,
|
||||
nth_user_message: usize,
|
||||
cfg: codex_core::config::Config,
|
||||
conversation_id: ConversationId,
|
||||
) -> codex_core::error::Result<codex_core::NewConversation> {
|
||||
self.server
|
||||
.fork_conversation(nth_user_message, cfg, path)
|
||||
.fork_conversation(nth_user_message, cfg, path, conversation_id)
|
||||
.await
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user