mirror of
https://github.com/openai/codex.git
synced 2026-02-01 22:47:52 +00:00
Compare commits
2 Commits
add-contex
...
gpeal/dele
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
176e80a5af | ||
|
|
bc7ef54c72 |
@@ -1345,21 +1345,34 @@ async fn submission_loop(
|
||||
sess.set_task(task);
|
||||
}
|
||||
}
|
||||
Op::Shutdown => {
|
||||
Op::Shutdown { delete_rollout } => {
|
||||
info!("Shutting down Codex instance");
|
||||
|
||||
sess.interrupt_task();
|
||||
// Gracefully flush and shutdown rollout recorder on session end so tests
|
||||
// that inspect the rollout file do not race with the background writer.
|
||||
let recorder_opt = sess.rollout.lock_unchecked().take();
|
||||
if let Some(rec) = recorder_opt
|
||||
&& let Err(e) = rec.shutdown().await
|
||||
{
|
||||
warn!("failed to shutdown rollout recorder: {e}");
|
||||
let shutdown_error: Option<String> = match recorder_opt {
|
||||
Some(recorder) => match recorder.shutdown().await {
|
||||
Ok(()) => {
|
||||
if delete_rollout && let Err(e) = recorder.delete_rollout_file().await {
|
||||
warn!("failed to delete rollout file: {e}");
|
||||
Some(format!("Failed to delete rollout file: {e}"))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("failed to shutdown rollout recorder: {e}");
|
||||
Some("Failed to shutdown rollout recorder".to_string())
|
||||
}
|
||||
},
|
||||
None => None,
|
||||
};
|
||||
|
||||
if let Some(message) = shutdown_error {
|
||||
let event = Event {
|
||||
id: sub.id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: "Failed to shutdown rollout recorder".to_string(),
|
||||
}),
|
||||
msg: EventMsg::Error(ErrorEvent { message }),
|
||||
};
|
||||
if let Err(e) = sess.tx_event.send(event).await {
|
||||
warn!("failed to send error message: {e:?}");
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
use std::fs::File;
|
||||
use std::fs::{self};
|
||||
use std::io::Error as IoError;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
@@ -71,6 +72,8 @@ pub struct SavedSession {
|
||||
/// ```
|
||||
#[derive(Clone)]
|
||||
pub struct RolloutRecorder {
|
||||
/// The file path of the rollout jsonl.file.
|
||||
path: PathBuf,
|
||||
tx: Sender<RolloutCmd>,
|
||||
}
|
||||
|
||||
@@ -119,7 +122,7 @@ impl RolloutRecorder {
|
||||
/// cannot be created or the rollout file cannot be opened we return the
|
||||
/// error so the caller can decide whether to disable persistence.
|
||||
pub async fn new(config: &Config, params: RolloutRecorderParams) -> std::io::Result<Self> {
|
||||
let (file, meta) = match params {
|
||||
let (file, path, meta) = match params {
|
||||
RolloutRecorderParams::Create {
|
||||
conversation_id,
|
||||
instructions,
|
||||
@@ -128,6 +131,7 @@ impl RolloutRecorder {
|
||||
file,
|
||||
conversation_id: session_id,
|
||||
timestamp,
|
||||
path,
|
||||
} = create_log_file(config, conversation_id)?;
|
||||
|
||||
let timestamp_format: &[FormatItem] = format_description!(
|
||||
@@ -140,6 +144,7 @@ impl RolloutRecorder {
|
||||
|
||||
(
|
||||
tokio::fs::File::from_std(file),
|
||||
path,
|
||||
Some(SessionMeta {
|
||||
timestamp,
|
||||
id: session_id,
|
||||
@@ -147,13 +152,13 @@ impl RolloutRecorder {
|
||||
}),
|
||||
)
|
||||
}
|
||||
RolloutRecorderParams::Resume { path } => (
|
||||
tokio::fs::OpenOptions::new()
|
||||
RolloutRecorderParams::Resume { path } => {
|
||||
let file = tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(path)
|
||||
.await?,
|
||||
None,
|
||||
),
|
||||
.open(&path)
|
||||
.await?;
|
||||
(file, path, None)
|
||||
}
|
||||
};
|
||||
|
||||
// Clone the cwd for the spawned task to collect git info asynchronously
|
||||
@@ -169,7 +174,19 @@ impl RolloutRecorder {
|
||||
// driver instead of blocking the runtime.
|
||||
tokio::task::spawn(rollout_writer(file, rx, meta, cwd));
|
||||
|
||||
Ok(Self { tx })
|
||||
Ok(Self { path, tx })
|
||||
}
|
||||
|
||||
pub async fn delete_rollout_file_at_path(path: &Path) -> std::io::Result<()> {
|
||||
match tokio::fs::remove_file(path).await {
|
||||
Ok(()) => Ok(()),
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => Ok(()),
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete_rollout_file(&self) -> std::io::Result<()> {
|
||||
Self::delete_rollout_file_at_path(&self.path).await
|
||||
}
|
||||
|
||||
pub(crate) async fn record_items(&self, items: &[ResponseItem]) -> std::io::Result<()> {
|
||||
@@ -294,6 +311,9 @@ struct LogFileInfo {
|
||||
|
||||
/// Timestamp for the start of the session.
|
||||
timestamp: OffsetDateTime,
|
||||
|
||||
/// Path to the created rollout file.
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
fn create_log_file(
|
||||
@@ -330,6 +350,7 @@ fn create_log_file(
|
||||
file,
|
||||
conversation_id,
|
||||
timestamp,
|
||||
path,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -271,7 +271,11 @@ pub async fn run_main(cli: Cli, codex_linux_sandbox_exe: Option<PathBuf>) -> any
|
||||
match shutdown {
|
||||
CodexStatus::Running => continue,
|
||||
CodexStatus::InitiateShutdown => {
|
||||
conversation.submit(Op::Shutdown).await?;
|
||||
conversation
|
||||
.submit(Op::Shutdown {
|
||||
delete_rollout: false,
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
CodexStatus::Shutdown => {
|
||||
break;
|
||||
|
||||
@@ -40,6 +40,8 @@ use codex_protocol::mcp_protocol::AuthStatusChangeNotification;
|
||||
use codex_protocol::mcp_protocol::ClientRequest;
|
||||
use codex_protocol::mcp_protocol::ConversationId;
|
||||
use codex_protocol::mcp_protocol::ConversationSummary;
|
||||
use codex_protocol::mcp_protocol::DeleteConversationParams;
|
||||
use codex_protocol::mcp_protocol::DeleteConversationResponse;
|
||||
use codex_protocol::mcp_protocol::EXEC_COMMAND_APPROVAL_METHOD;
|
||||
use codex_protocol::mcp_protocol::ExecArbitraryCommandResponse;
|
||||
use codex_protocol::mcp_protocol::ExecCommandApprovalParams;
|
||||
@@ -142,6 +144,9 @@ impl CodexMessageProcessor {
|
||||
ClientRequest::ResumeConversation { request_id, params } => {
|
||||
self.handle_resume_conversation(request_id, params).await;
|
||||
}
|
||||
ClientRequest::DeleteConversation { request_id, params } => {
|
||||
self.handle_delete_conversation(request_id, params).await;
|
||||
}
|
||||
ClientRequest::SendUserMessage { request_id, params } => {
|
||||
self.send_user_message(request_id, params).await;
|
||||
}
|
||||
@@ -669,6 +674,81 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_delete_conversation(
|
||||
&self,
|
||||
request_id: RequestId,
|
||||
params: DeleteConversationParams,
|
||||
) {
|
||||
let DeleteConversationParams {
|
||||
conversation_id,
|
||||
rollout_path,
|
||||
} = params;
|
||||
|
||||
let conversation = match self
|
||||
.conversation_manager
|
||||
.get_conversation(conversation_id)
|
||||
.await
|
||||
{
|
||||
Ok(conversation) => conversation,
|
||||
Err(_) => {
|
||||
if let Some(path) = rollout_path {
|
||||
if let Err(err) = RolloutRecorder::delete_rollout_file_at_path(&path).await {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to delete rollout file {}: {err}",
|
||||
path.display()
|
||||
),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, DeleteConversationResponse {})
|
||||
.await;
|
||||
} else {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!(
|
||||
"conversation {conversation_id} not found and no rollout path provided"
|
||||
),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
}
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Err(err) = conversation
|
||||
.submit(Op::Shutdown {
|
||||
delete_rollout: true,
|
||||
})
|
||||
.await
|
||||
{
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("error shutting down conversation {conversation_id}: {err}"),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
|
||||
self.conversation_manager
|
||||
.remove_conversation(conversation_id)
|
||||
.await;
|
||||
{
|
||||
let mut pending = self.pending_interrupts.lock().await;
|
||||
pending.remove(&conversation_id);
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, DeleteConversationResponse {})
|
||||
.await;
|
||||
}
|
||||
|
||||
async fn send_user_message(&self, request_id: RequestId, params: SendUserMessageParams) {
|
||||
let SendUserMessageParams {
|
||||
conversation_id,
|
||||
|
||||
@@ -91,6 +91,11 @@ pub enum ClientRequest {
|
||||
request_id: RequestId,
|
||||
params: ResumeConversationParams,
|
||||
},
|
||||
DeleteConversation {
|
||||
#[serde(rename = "id")]
|
||||
request_id: RequestId,
|
||||
params: DeleteConversationParams,
|
||||
},
|
||||
SendUserMessage {
|
||||
#[serde(rename = "id")]
|
||||
request_id: RequestId,
|
||||
@@ -214,6 +219,18 @@ pub struct ResumeConversationResponse {
|
||||
pub initial_messages: Option<Vec<EventMsg>>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DeleteConversationParams {
|
||||
pub conversation_id: ConversationId,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub rollout_path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DeleteConversationResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ListConversationsParams {
|
||||
|
||||
@@ -163,7 +163,11 @@ pub enum Op {
|
||||
/// to generate a summary which will be returned as an AgentMessage event.
|
||||
Compact,
|
||||
/// Request to shut down codex instance.
|
||||
Shutdown,
|
||||
Shutdown {
|
||||
/// Whether to delete the rollout file after the session shuts down.
|
||||
#[serde(default)]
|
||||
delete_rollout: bool,
|
||||
},
|
||||
}
|
||||
|
||||
/// Determines the conditions under which the user is consulted to approve
|
||||
|
||||
@@ -1298,7 +1298,9 @@ impl ChatWidget {
|
||||
return;
|
||||
}
|
||||
|
||||
self.submit_op(Op::Shutdown);
|
||||
self.submit_op(Op::Shutdown {
|
||||
delete_rollout: false,
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn composer_is_empty(&self) -> bool {
|
||||
|
||||
Reference in New Issue
Block a user