Compare commits

...

2 Commits

Author SHA1 Message Date
Gabriel Peal
176e80a5af fmt 2025-09-08 21:18:20 -07:00
Gabriel Peal
bc7ef54c72 impl 2025-09-08 21:16:11 -07:00
7 changed files with 161 additions and 20 deletions

View File

@@ -1345,21 +1345,34 @@ async fn submission_loop(
sess.set_task(task);
}
}
Op::Shutdown => {
Op::Shutdown { delete_rollout } => {
info!("Shutting down Codex instance");
sess.interrupt_task();
// Gracefully flush and shutdown rollout recorder on session end so tests
// that inspect the rollout file do not race with the background writer.
let recorder_opt = sess.rollout.lock_unchecked().take();
if let Some(rec) = recorder_opt
&& let Err(e) = rec.shutdown().await
{
warn!("failed to shutdown rollout recorder: {e}");
let shutdown_error: Option<String> = match recorder_opt {
Some(recorder) => match recorder.shutdown().await {
Ok(()) => {
if delete_rollout && let Err(e) = recorder.delete_rollout_file().await {
warn!("failed to delete rollout file: {e}");
Some(format!("Failed to delete rollout file: {e}"))
} else {
None
}
}
Err(e) => {
warn!("failed to shutdown rollout recorder: {e}");
Some("Failed to shutdown rollout recorder".to_string())
}
},
None => None,
};
if let Some(message) = shutdown_error {
let event = Event {
id: sub.id.clone(),
msg: EventMsg::Error(ErrorEvent {
message: "Failed to shutdown rollout recorder".to_string(),
}),
msg: EventMsg::Error(ErrorEvent { message }),
};
if let Err(e) = sess.tx_event.send(event).await {
warn!("failed to send error message: {e:?}");

View File

@@ -3,6 +3,7 @@
use std::fs::File;
use std::fs::{self};
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::path::Path;
use std::path::PathBuf;
@@ -71,6 +72,8 @@ pub struct SavedSession {
/// ```
#[derive(Clone)]
pub struct RolloutRecorder {
/// The file path of the rollout jsonl.file.
path: PathBuf,
tx: Sender<RolloutCmd>,
}
@@ -119,7 +122,7 @@ impl RolloutRecorder {
/// cannot be created or the rollout file cannot be opened we return the
/// error so the caller can decide whether to disable persistence.
pub async fn new(config: &Config, params: RolloutRecorderParams) -> std::io::Result<Self> {
let (file, meta) = match params {
let (file, path, meta) = match params {
RolloutRecorderParams::Create {
conversation_id,
instructions,
@@ -128,6 +131,7 @@ impl RolloutRecorder {
file,
conversation_id: session_id,
timestamp,
path,
} = create_log_file(config, conversation_id)?;
let timestamp_format: &[FormatItem] = format_description!(
@@ -140,6 +144,7 @@ impl RolloutRecorder {
(
tokio::fs::File::from_std(file),
path,
Some(SessionMeta {
timestamp,
id: session_id,
@@ -147,13 +152,13 @@ impl RolloutRecorder {
}),
)
}
RolloutRecorderParams::Resume { path } => (
tokio::fs::OpenOptions::new()
RolloutRecorderParams::Resume { path } => {
let file = tokio::fs::OpenOptions::new()
.append(true)
.open(path)
.await?,
None,
),
.open(&path)
.await?;
(file, path, None)
}
};
// Clone the cwd for the spawned task to collect git info asynchronously
@@ -169,7 +174,19 @@ impl RolloutRecorder {
// driver instead of blocking the runtime.
tokio::task::spawn(rollout_writer(file, rx, meta, cwd));
Ok(Self { tx })
Ok(Self { path, tx })
}
pub async fn delete_rollout_file_at_path(path: &Path) -> std::io::Result<()> {
match tokio::fs::remove_file(path).await {
Ok(()) => Ok(()),
Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
Err(err) => Err(err),
}
}
pub async fn delete_rollout_file(&self) -> std::io::Result<()> {
Self::delete_rollout_file_at_path(&self.path).await
}
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
@@ -294,6 +311,9 @@ struct LogFileInfo {
/// Timestamp for the start of the session.
timestamp: OffsetDateTime,
/// Path to the created rollout file.
path: PathBuf,
}
fn create_log_file(
@@ -330,6 +350,7 @@ fn create_log_file(
file,
conversation_id,
timestamp,
path,
})
}

View File

@@ -271,7 +271,11 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
match shutdown {
CodexStatus::Running => continue,
CodexStatus::InitiateShutdown => {
conversation.submit(Op::Shutdown).await?;
conversation
.submit(Op::Shutdown {
delete_rollout: false,
})
.await?;
}
CodexStatus::Shutdown => {
break;

View File

@@ -40,6 +40,8 @@ use codex_protocol::mcp_protocol::AuthStatusChangeNotification;
use codex_protocol::mcp_protocol::ClientRequest;
use codex_protocol::mcp_protocol::ConversationId;
use codex_protocol::mcp_protocol::ConversationSummary;
use codex_protocol::mcp_protocol::DeleteConversationParams;
use codex_protocol::mcp_protocol::DeleteConversationResponse;
use codex_protocol::mcp_protocol::EXEC_COMMAND_APPROVAL_METHOD;
use codex_protocol::mcp_protocol::ExecArbitraryCommandResponse;
use codex_protocol::mcp_protocol::ExecCommandApprovalParams;
@@ -142,6 +144,9 @@ impl CodexMessageProcessor {
ClientRequest::ResumeConversation { request_id, params } => {
self.handle_resume_conversation(request_id, params).await;
}
ClientRequest::DeleteConversation { request_id, params } => {
self.handle_delete_conversation(request_id, params).await;
}
ClientRequest::SendUserMessage { request_id, params } => {
self.send_user_message(request_id, params).await;
}
@@ -669,6 +674,81 @@ impl CodexMessageProcessor {
}
}
async fn handle_delete_conversation(
&self,
request_id: RequestId,
params: DeleteConversationParams,
) {
let DeleteConversationParams {
conversation_id,
rollout_path,
} = params;
let conversation = match self
.conversation_manager
.get_conversation(conversation_id)
.await
{
Ok(conversation) => conversation,
Err(_) => {
if let Some(path) = rollout_path {
if let Err(err) = RolloutRecorder::delete_rollout_file_at_path(&path).await {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to delete rollout file {}: {err}",
path.display()
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
self.outgoing
.send_response(request_id, DeleteConversationResponse {})
.await;
} else {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"conversation {conversation_id} not found and no rollout path provided"
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
}
return;
}
};
if let Err(err) = conversation
.submit(Op::Shutdown {
delete_rollout: true,
})
.await
{
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("error shutting down conversation {conversation_id}: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
self.conversation_manager
.remove_conversation(conversation_id)
.await;
{
let mut pending = self.pending_interrupts.lock().await;
pending.remove(&conversation_id);
}
self.outgoing
.send_response(request_id, DeleteConversationResponse {})
.await;
}
async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) {
let SendUserMessageParams {
conversation_id,

View File

@@ -91,6 +91,11 @@ pub enum ClientRequest {
request_id: RequestId,
params: ResumeConversationParams,
},
DeleteConversation {
#[serde(rename = "id")]
request_id: RequestId,
params: DeleteConversationParams,
},
SendUserMessage {
#[serde(rename = "id")]
request_id: RequestId,
@@ -214,6 +219,18 @@ pub struct ResumeConversationResponse {
pub initial_messages: Option<Vec<EventMsg>>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct DeleteConversationParams {
pub conversation_id: ConversationId,
#[serde(skip_serializing_if = "Option::is_none")]
pub rollout_path: Option<PathBuf>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
#[serde(rename_all = "camelCase")]
pub struct DeleteConversationResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, TS)]
#[serde(rename_all = "camelCase")]
pub struct ListConversationsParams {

View File

@@ -163,7 +163,11 @@ pub enum Op {
/// to generate a summary which will be returned as an AgentMessage event.
Compact,
/// Request to shut down codex instance.
Shutdown,
Shutdown {
/// Whether to delete the rollout file after the session shuts down.
#[serde(default)]
delete_rollout: bool,
},
}
/// Determines the conditions under which the user is consulted to approve

View File

@@ -1298,7 +1298,9 @@ impl ChatWidget {
return;
}
self.submit_op(Op::Shutdown);
self.submit_op(Op::Shutdown {
delete_rollout: false,
});
}
pub(crate) fn composer_is_empty(&self) -> bool {