refactor to share archive logic

This commit is contained in:
Owen Lin
2025-11-02 10:06:12 -08:00
parent 27251b597d
commit e249fcdc1d

View File

@@ -980,9 +980,7 @@ impl CodexMessageProcessor {
Err(err) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"failed to locate conversation id {conversation_id}: {err}"
),
message: format!("failed to locate conversation id {conversation_id}: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
@@ -990,83 +988,95 @@ impl CodexMessageProcessor {
}
};
// The remaining logic mirrors archive_conversation(), but returns
// ThreadArchiveResponse for v2.
let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR);
let canonical_rollout_path = tokio::fs::canonicalize(&rollout_path).await;
let canonical_rollout_path = if let Ok(path) = canonical_rollout_path
&& path.starts_with(&rollout_folder)
match self
.archive_conversation_common(conversation_id, &rollout_path)
.await
{
path
} else {
let error = JSONRPCErrorError {
Ok(()) => {
let response = ThreadArchiveResponse {};
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
self.outgoing.send_error(request_id, err).await;
}
}
}
async fn archive_conversation_common(
&self,
conversation_id: ConversationId,
rollout_path: &Path,
) -> Result<(), JSONRPCErrorError> {
// Verify rollout_path is under sessions dir.
let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR);
let canonical_rollout_path =
tokio::fs::canonicalize(rollout_path)
.await
.map_err(|_| JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{}` must be in sessions directory",
rollout_path.display()
),
data: None,
})?;
if !canonical_rollout_path.starts_with(&rollout_folder) {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{}` must be in sessions directory",
rollout_path.display()
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
};
});
}
// Verify file name matches conversation id.
let required_suffix = format!("{conversation_id}.jsonl");
let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else {
let error = JSONRPCErrorError {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{}` missing file name",
rollout_path.display()
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
});
};
if !file_name
.to_string_lossy()
.ends_with(required_suffix.as_str())
{
let error = JSONRPCErrorError {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{}` does not match conversation id {conversation_id}",
rollout_path.display()
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
});
}
let removed_conversation = self
// If the conversation is active, request shutdown and wait briefly.
if let Some(conversation) = self
.conversation_manager
.remove_conversation(&conversation_id)
.await;
if let Some(conversation) = removed_conversation {
.await
{
info!("conversation {conversation_id} was active; shutting down");
let conversation_clone = conversation.clone();
let notify = Arc::new(tokio::sync::Notify::new());
let notify_clone = notify.clone();
let is_shutdown = tokio::spawn(async move {
loop {
select! {
_ = notify_clone.notified() => {
break;
}
_ = notify_clone.notified() => { break; }
event = conversation_clone.next_event() => {
if let Ok(event) = event && matches!(event.msg, EventMsg::ShutdownComplete) {
break;
}
if let Ok(event) = event && matches!(event.msg, EventMsg::ShutdownComplete) { break; }
}
}
}
});
match conversation.submit(Op::Shutdown).await {
Ok(_) => {
select! {
@@ -1084,6 +1094,7 @@ impl CodexMessageProcessor {
}
}
// Move the rollout file to archived.
let result: std::io::Result<()> = async {
let archive_folder = self
.config
@@ -1095,20 +1106,11 @@ impl CodexMessageProcessor {
}
.await;
match result {
Ok(()) => {
let response = ThreadArchiveResponse {};
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to archive conversation: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
}
result.map_err(|err| JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to archive conversation: {err}"),
data: None,
})
}
async fn get_conversation_summary(
@@ -1471,132 +1473,16 @@ impl CodexMessageProcessor {
conversation_id,
rollout_path,
} = params;
// Verify that the rollout path is in the sessions directory or else
// a malicious client could specify an arbitrary path.
let rollout_folder = self.config.codex_home.join(codex_core::SESSIONS_SUBDIR);
let canonical_rollout_path = tokio::fs::canonicalize(&rollout_path).await;
let canonical_rollout_path = if let Ok(path) = canonical_rollout_path
&& path.starts_with(&rollout_folder)
match self
.archive_conversation_common(conversation_id, &rollout_path)
.await
{
path
} else {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{}` must be in sessions directory",
rollout_path.display()
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
};
let required_suffix = format!("{conversation_id}.jsonl");
let Some(file_name) = canonical_rollout_path.file_name().map(OsStr::to_owned) else {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{}` missing file name",
rollout_path.display()
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
};
if !file_name
.to_string_lossy()
.ends_with(required_suffix.as_str())
{
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"rollout path `{}` does not match conversation id {conversation_id}",
rollout_path.display()
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
let removed_conversation = self
.conversation_manager
.remove_conversation(&conversation_id)
.await;
if let Some(conversation) = removed_conversation {
info!("conversation {conversation_id} was active; shutting down");
let conversation_clone = conversation.clone();
let notify = Arc::new(tokio::sync::Notify::new());
let notify_clone = notify.clone();
// Establish the listener for ShutdownComplete before submitting
// Shutdown so it is not missed.
let is_shutdown = tokio::spawn(async move {
loop {
select! {
_ = notify_clone.notified() => {
break;
}
event = conversation_clone.next_event() => {
if let Ok(event) = event && matches!(event.msg, EventMsg::ShutdownComplete) {
break;
}
}
}
}
});
// Request shutdown.
match conversation.submit(Op::Shutdown).await {
Ok(_) => {
// Successfully submitted Shutdown; wait before proceeding.
select! {
_ = is_shutdown => {
// Normal shutdown: proceed with archive.
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
warn!("conversation {conversation_id} shutdown timed out; proceeding with archive");
notify.notify_one();
}
}
}
Err(err) => {
error!("failed to submit Shutdown to conversation {conversation_id}: {err}");
notify.notify_one();
// Perhaps we lost a shutdown race, so let's continue to
// clean up the .jsonl file.
}
}
}
// Move the .jsonl file to the archived sessions subdir.
let result: std::io::Result<()> = async {
let archive_folder = self
.config
.codex_home
.join(codex_core::ARCHIVED_SESSIONS_SUBDIR);
tokio::fs::create_dir_all(&archive_folder).await?;
tokio::fs::rename(&canonical_rollout_path, &archive_folder.join(&file_name)).await?;
Ok(())
}
.await;
match result {
Ok(()) => {
let response = ArchiveConversationResponse {};
self.outgoing.send_response(request_id, response).await;
}
Err(err) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to archive conversation: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
self.outgoing.send_error(request_id, err).await;
}
}
}