mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
fix: attempting to resume an existing conversation in ConversationManager should reuse it
This commit is contained in:
@@ -32,8 +32,34 @@ pub struct NewConversation {
|
||||
|
||||
/// [`ConversationManager`] is responsible for creating conversations and
|
||||
/// maintaining them in memory.
|
||||
#[derive(Clone)]
|
||||
struct ConversationEntry {
|
||||
conversation: Arc<CodexConversation>,
|
||||
session_configured: SessionConfiguredEvent,
|
||||
}
|
||||
|
||||
impl ConversationEntry {
|
||||
fn new(
|
||||
conversation: Arc<CodexConversation>,
|
||||
session_configured: SessionConfiguredEvent,
|
||||
) -> Self {
|
||||
Self {
|
||||
conversation,
|
||||
session_configured,
|
||||
}
|
||||
}
|
||||
|
||||
fn to_new_conversation(&self, conversation_id: ConversationId) -> NewConversation {
|
||||
NewConversation {
|
||||
conversation_id,
|
||||
conversation: self.conversation.clone(),
|
||||
session_configured: self.session_configured.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct ConversationManager {
|
||||
conversations: Arc<RwLock<HashMap<ConversationId, Arc<CodexConversation>>>>,
|
||||
conversations: Arc<RwLock<HashMap<ConversationId, ConversationEntry>>>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
session_source: SessionSource,
|
||||
}
|
||||
@@ -99,10 +125,11 @@ impl ConversationManager {
|
||||
};
|
||||
|
||||
let conversation = Arc::new(CodexConversation::new(codex));
|
||||
let entry = ConversationEntry::new(conversation.clone(), session_configured.clone());
|
||||
self.conversations
|
||||
.write()
|
||||
.await
|
||||
.insert(conversation_id, conversation.clone());
|
||||
.insert(conversation_id, entry);
|
||||
|
||||
Ok(NewConversation {
|
||||
conversation_id,
|
||||
@@ -118,7 +145,7 @@ impl ConversationManager {
|
||||
let conversations = self.conversations.read().await;
|
||||
conversations
|
||||
.get(&conversation_id)
|
||||
.cloned()
|
||||
.map(|entry| entry.conversation.clone())
|
||||
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
|
||||
}
|
||||
|
||||
@@ -129,11 +156,22 @@ impl ConversationManager {
|
||||
auth_manager: Arc<AuthManager>,
|
||||
) -> CodexResult<NewConversation> {
|
||||
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
} = Codex::spawn(config, auth_manager, initial_history, self.session_source).await?;
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
if let InitialHistory::Resumed(resumed) = &initial_history
|
||||
&& let Some(existing) = self
|
||||
.conversations
|
||||
.read()
|
||||
.await
|
||||
.get(&resumed.conversation_id)
|
||||
.cloned()
|
||||
{
|
||||
Ok(existing.to_new_conversation(resumed.conversation_id))
|
||||
} else {
|
||||
let CodexSpawnOk {
|
||||
codex,
|
||||
conversation_id,
|
||||
} = Codex::spawn(config, auth_manager, initial_history, self.session_source).await?;
|
||||
self.finalize_spawn(codex, conversation_id).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes the conversation from the manager's internal map, though the
|
||||
@@ -144,7 +182,11 @@ impl ConversationManager {
|
||||
&self,
|
||||
conversation_id: &ConversationId,
|
||||
) -> Option<Arc<CodexConversation>> {
|
||||
self.conversations.write().await.remove(conversation_id)
|
||||
self.conversations
|
||||
.write()
|
||||
.await
|
||||
.remove(conversation_id)
|
||||
.map(|entry| entry.conversation)
|
||||
}
|
||||
|
||||
/// Fork an existing conversation by taking messages up to the given position
|
||||
|
||||
@@ -247,7 +247,11 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
|
||||
session_configured,
|
||||
..
|
||||
} = conversation_manager
|
||||
.resume_conversation_from_rollout(config, session_path.clone(), auth_manager)
|
||||
.resume_conversation_from_rollout(
|
||||
config.clone(),
|
||||
session_path.clone(),
|
||||
auth_manager.clone(),
|
||||
)
|
||||
.await
|
||||
.expect("resume conversation");
|
||||
|
||||
@@ -260,6 +264,23 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
|
||||
let expected_initial_json = json!([]);
|
||||
assert_eq!(initial_json, expected_initial_json);
|
||||
|
||||
let NewConversation {
|
||||
conversation: codex_again,
|
||||
session_configured: session_configured_again,
|
||||
..
|
||||
} = conversation_manager
|
||||
.resume_conversation_from_rollout(
|
||||
config.clone(),
|
||||
session_path.clone(),
|
||||
auth_manager.clone(),
|
||||
)
|
||||
.await
|
||||
.expect("resume existing conversation");
|
||||
assert!(Arc::ptr_eq(&codex, &codex_again));
|
||||
let session_configured_json = serde_json::to_value(&session_configured).unwrap();
|
||||
let session_configured_again_json = serde_json::to_value(&session_configured_again).unwrap();
|
||||
assert_eq!(session_configured_json, session_configured_again_json);
|
||||
|
||||
// 2) Submit new input; the request body must include the prior item followed by the new user input.
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
|
||||
Reference in New Issue
Block a user