Compare commits

...

1 Commits
main ... pr5657

Author SHA1 Message Date
Michael Bolin
4a62376e6b fix: attempting to resume an existing conversation in ConversationManager should reuse it 2025-10-24 16:12:43 -07:00
2 changed files with 73 additions and 10 deletions

View File

@@ -32,8 +32,34 @@ pub struct NewConversation {
/// [`ConversationManager`] is responsible for creating conversations and
/// maintaining them in memory.
#[derive(Clone)]
struct ConversationEntry {
conversation: Arc<CodexConversation>,
session_configured: SessionConfiguredEvent,
}
impl ConversationEntry {
fn new(
conversation: Arc<CodexConversation>,
session_configured: SessionConfiguredEvent,
) -> Self {
Self {
conversation,
session_configured,
}
}
fn to_new_conversation(&self, conversation_id: ConversationId) -> NewConversation {
NewConversation {
conversation_id,
conversation: self.conversation.clone(),
session_configured: self.session_configured.clone(),
}
}
}
pub struct ConversationManager {
conversations: Arc<RwLock<HashMap<ConversationId, Arc<CodexConversation>>>>,
conversations: Arc<RwLock<HashMap<ConversationId, ConversationEntry>>>,
auth_manager: Arc<AuthManager>,
session_source: SessionSource,
}
@@ -99,10 +125,11 @@ impl ConversationManager {
};
let conversation = Arc::new(CodexConversation::new(codex));
let entry = ConversationEntry::new(conversation.clone(), session_configured.clone());
self.conversations
.write()
.await
.insert(conversation_id, conversation.clone());
.insert(conversation_id, entry);
Ok(NewConversation {
conversation_id,
@@ -118,7 +145,7 @@ impl ConversationManager {
let conversations = self.conversations.read().await;
conversations
.get(&conversation_id)
.cloned()
.map(|entry| entry.conversation.clone())
.ok_or_else(|| CodexErr::ConversationNotFound(conversation_id))
}
@@ -129,11 +156,22 @@ impl ConversationManager {
auth_manager: Arc<AuthManager>,
) -> CodexResult<NewConversation> {
let initial_history = RolloutRecorder::get_rollout_history(&rollout_path).await?;
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, initial_history, self.session_source).await?;
self.finalize_spawn(codex, conversation_id).await
if let InitialHistory::Resumed(resumed) = &initial_history
&& let Some(existing) = self
.conversations
.read()
.await
.get(&resumed.conversation_id)
.cloned()
{
Ok(existing.to_new_conversation(resumed.conversation_id))
} else {
let CodexSpawnOk {
codex,
conversation_id,
} = Codex::spawn(config, auth_manager, initial_history, self.session_source).await?;
self.finalize_spawn(codex, conversation_id).await
}
}
/// Removes the conversation from the manager's internal map, though the
@@ -144,7 +182,11 @@ impl ConversationManager {
&self,
conversation_id: &ConversationId,
) -> Option<Arc<CodexConversation>> {
self.conversations.write().await.remove(conversation_id)
self.conversations
.write()
.await
.remove(conversation_id)
.map(|entry| entry.conversation)
}
/// Fork an existing conversation by taking messages up to the given position

View File

@@ -247,7 +247,11 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
session_configured,
..
} = conversation_manager
.resume_conversation_from_rollout(config, session_path.clone(), auth_manager)
.resume_conversation_from_rollout(
config.clone(),
session_path.clone(),
auth_manager.clone(),
)
.await
.expect("resume conversation");
@@ -260,6 +264,23 @@ async fn resume_includes_initial_messages_and_sends_prior_items() {
let expected_initial_json = json!([]);
assert_eq!(initial_json, expected_initial_json);
let NewConversation {
conversation: codex_again,
session_configured: session_configured_again,
..
} = conversation_manager
.resume_conversation_from_rollout(
config.clone(),
session_path.clone(),
auth_manager.clone(),
)
.await
.expect("resume existing conversation");
assert!(Arc::ptr_eq(&codex, &codex_again));
let session_configured_json = serde_json::to_value(&session_configured).unwrap();
let session_configured_again_json = serde_json::to_value(&session_configured_again).unwrap();
assert_eq!(session_configured_json, session_configured_again_json);
// 2) Submit new input; the request body must include the prior item followed by the new user input.
codex
.submit(Op::UserInput {