mirror of
https://github.com/openai/codex.git
synced 2026-05-05 03:47:01 +00:00
Compare commits
11 Commits
codex-wind
...
etraut/shu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
73af41cf2c | ||
|
|
3ad47058d4 | ||
|
|
c37a0c8314 | ||
|
|
fc9ae98657 | ||
|
|
93eb7ec719 | ||
|
|
7b22f4a97d | ||
|
|
ba96d37df4 | ||
|
|
7615a2a93a | ||
|
|
fea9f89215 | ||
|
|
b7669fc7e9 | ||
|
|
4891934519 |
@@ -402,6 +402,7 @@ enum AppListLoadResult {
|
||||
|
||||
enum ThreadShutdownResult {
|
||||
Complete,
|
||||
ShutdownFailed(String),
|
||||
SubmitFailed,
|
||||
TimedOut,
|
||||
}
|
||||
@@ -2171,6 +2172,9 @@ impl CodexMessageProcessor {
|
||||
for thread_id in report.submit_failed {
|
||||
warn!("failed to submit Shutdown to thread {thread_id}");
|
||||
}
|
||||
for (thread_id, error) in report.shutdown_failed {
|
||||
warn!("thread {thread_id} shutdown completed with error: {error}");
|
||||
}
|
||||
for thread_id in report.timed_out {
|
||||
warn!("timed out waiting for thread {thread_id} to shut down");
|
||||
}
|
||||
@@ -5433,6 +5437,9 @@ impl CodexMessageProcessor {
|
||||
async fn wait_for_thread_shutdown(thread: &Arc<CodexThread>) -> ThreadShutdownResult {
|
||||
match tokio::time::timeout(Duration::from_secs(10), thread.shutdown_and_wait()).await {
|
||||
Ok(Ok(())) => ThreadShutdownResult::Complete,
|
||||
Ok(Err(CodexErr::ShutdownFailed(message))) => {
|
||||
ThreadShutdownResult::ShutdownFailed(message)
|
||||
}
|
||||
Ok(Err(_)) => ThreadShutdownResult::SubmitFailed,
|
||||
Err(_) => ThreadShutdownResult::TimedOut,
|
||||
}
|
||||
@@ -5515,40 +5522,41 @@ impl CodexMessageProcessor {
|
||||
let thread_manager = self.thread_manager.clone();
|
||||
let thread_watch_manager = self.thread_watch_manager.clone();
|
||||
tokio::spawn(async move {
|
||||
match Self::wait_for_thread_shutdown(&thread).await {
|
||||
ThreadShutdownResult::Complete => {
|
||||
if thread_manager.remove_thread(&thread_id).await.is_none() {
|
||||
info!(
|
||||
"thread {thread_id} was already removed before unsubscribe finalized"
|
||||
);
|
||||
thread_watch_manager
|
||||
.remove_thread(&thread_id.to_string())
|
||||
.await;
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
return;
|
||||
}
|
||||
thread_watch_manager
|
||||
.remove_thread(&thread_id.to_string())
|
||||
.await;
|
||||
let notification = ThreadClosedNotification {
|
||||
thread_id: thread_id.to_string(),
|
||||
};
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::ThreadClosed(
|
||||
notification,
|
||||
))
|
||||
.await;
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
}
|
||||
let shutdown_error = match Self::wait_for_thread_shutdown(&thread).await {
|
||||
ThreadShutdownResult::Complete => None,
|
||||
ThreadShutdownResult::ShutdownFailed(error) => Some(error),
|
||||
ThreadShutdownResult::SubmitFailed => {
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
warn!("failed to submit Shutdown to thread {thread_id}");
|
||||
return;
|
||||
}
|
||||
ThreadShutdownResult::TimedOut => {
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
warn!("thread {thread_id} shutdown timed out; leaving thread loaded");
|
||||
return;
|
||||
}
|
||||
};
|
||||
if let Some(error) = shutdown_error {
|
||||
error!("thread {thread_id} shutdown completed with error: {error}");
|
||||
}
|
||||
if thread_manager.remove_thread(&thread_id).await.is_none() {
|
||||
info!("thread {thread_id} was already removed before unsubscribe finalized");
|
||||
thread_watch_manager
|
||||
.remove_thread(&thread_id.to_string())
|
||||
.await;
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
return;
|
||||
}
|
||||
thread_watch_manager
|
||||
.remove_thread(&thread_id.to_string())
|
||||
.await;
|
||||
let notification = ThreadClosedNotification {
|
||||
thread_id: thread_id.to_string(),
|
||||
};
|
||||
outgoing
|
||||
.send_server_notification(ServerNotification::ThreadClosed(notification))
|
||||
.await;
|
||||
pending_thread_unloads.lock().await.remove(&thread_id);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -5635,6 +5643,9 @@ impl CodexMessageProcessor {
|
||||
info!("thread {thread_id} was active; shutting down");
|
||||
match Self::wait_for_thread_shutdown(&conversation).await {
|
||||
ThreadShutdownResult::Complete => {}
|
||||
ThreadShutdownResult::ShutdownFailed(error) => {
|
||||
error!("thread {thread_id} shutdown completed with error: {error}");
|
||||
}
|
||||
ThreadShutdownResult::SubmitFailed => {
|
||||
error!(
|
||||
"failed to submit Shutdown to thread {thread_id}; proceeding with archive"
|
||||
|
||||
@@ -747,7 +747,10 @@ impl Codex {
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
session_loop_termination.await;
|
||||
Ok(())
|
||||
match self.session.shutdown_failure.borrow().clone() {
|
||||
Some(message) => Err(CodexErr::ShutdownFailed(message)),
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn next_event(&self) -> CodexResult<Event> {
|
||||
@@ -821,6 +824,7 @@ pub(crate) struct Session {
|
||||
pub(crate) conversation_id: ThreadId,
|
||||
tx_event: Sender<Event>,
|
||||
agent_status: watch::Sender<AgentStatus>,
|
||||
shutdown_failure: watch::Sender<Option<String>>,
|
||||
out_of_band_elicitation_paused: watch::Sender<bool>,
|
||||
state: Mutex<SessionState>,
|
||||
/// Serializes rebuild/apply cycles for the running proxy; each cycle
|
||||
@@ -2039,12 +2043,14 @@ impl Session {
|
||||
));
|
||||
let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) =
|
||||
watch::channel(false);
|
||||
let (shutdown_failure, _shutdown_failure_rx) = watch::channel(None);
|
||||
|
||||
let (mailbox, mailbox_rx) = Mailbox::new();
|
||||
let sess = Arc::new(Session {
|
||||
conversation_id,
|
||||
tx_event: tx_event.clone(),
|
||||
agent_status,
|
||||
shutdown_failure,
|
||||
out_of_band_elicitation_paused,
|
||||
state: Mutex::new(state),
|
||||
managed_network_proxy_refresh_lock: Mutex::new(()),
|
||||
@@ -5629,6 +5635,7 @@ mod handlers {
|
||||
|
||||
// Gracefully flush and shutdown rollout recorder on session end so tests
|
||||
// that inspect the rollout file do not race with the background writer.
|
||||
let mut shutdown_failures = Vec::new();
|
||||
let recorder_opt = {
|
||||
let mut guard = sess.services.rollout.lock().await;
|
||||
guard.take()
|
||||
@@ -5637,15 +5644,37 @@ mod handlers {
|
||||
&& let Err(e) = rec.shutdown().await
|
||||
{
|
||||
warn!("failed to shutdown rollout recorder: {e}");
|
||||
let message = "Failed to shutdown rollout recorder".to_string();
|
||||
shutdown_failures.push(message.clone());
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: "Failed to shutdown rollout recorder".to_string(),
|
||||
message,
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
if let Some(state_db) = sess.services.state_db.as_deref()
|
||||
&& let Err(e) = state_db.checkpoint_wal().await
|
||||
{
|
||||
warn!("failed to checkpoint state db WAL during shutdown: {e}");
|
||||
let message = "Failed to checkpoint state database WAL".to_string();
|
||||
shutdown_failures.push(message.clone());
|
||||
let event = Event {
|
||||
id: sub_id.clone(),
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message,
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
}
|
||||
|
||||
if !shutdown_failures.is_empty() {
|
||||
sess.shutdown_failure
|
||||
.send_replace(Some(shutdown_failures.join("; ")));
|
||||
}
|
||||
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
|
||||
@@ -2949,6 +2949,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
|
||||
conversation_id,
|
||||
tx_event,
|
||||
agent_status: agent_status_tx,
|
||||
shutdown_failure: watch::channel(None).0,
|
||||
out_of_band_elicitation_paused: watch::channel(false).0,
|
||||
state: Mutex::new(state),
|
||||
managed_network_proxy_refresh_lock: Mutex::new(()),
|
||||
@@ -3494,6 +3495,42 @@ async fn shutdown_and_wait_waits_when_shutdown_is_already_in_progress() {
|
||||
.expect("shutdown waiter");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_and_wait_returns_shutdown_error_status() {
|
||||
let (session, _turn_context) = make_session_and_context().await;
|
||||
let (tx_sub, rx_sub) = async_channel::bounded(4);
|
||||
let (_tx_event, rx_event) = async_channel::unbounded();
|
||||
let (_agent_status_tx, agent_status) = watch::channel(AgentStatus::PendingInit);
|
||||
let error_message = "shutdown durability failed".to_string();
|
||||
let status_message = error_message.clone();
|
||||
let session = Arc::new(session);
|
||||
let session_for_loop = Arc::clone(&session);
|
||||
let session_loop_handle = tokio::spawn(async move {
|
||||
let shutdown: Submission = rx_sub.recv().await.expect("shutdown submission");
|
||||
assert_eq!(shutdown.op, Op::Shutdown);
|
||||
session_for_loop
|
||||
.shutdown_failure
|
||||
.send_replace(Some(status_message));
|
||||
});
|
||||
let codex = Arc::new(Codex {
|
||||
tx_sub,
|
||||
rx_event,
|
||||
agent_status,
|
||||
session,
|
||||
session_loop_termination: session_loop_termination_from_handle(session_loop_handle),
|
||||
});
|
||||
|
||||
let err = codex
|
||||
.shutdown_and_wait()
|
||||
.await
|
||||
.expect_err("shutdown error status should propagate");
|
||||
|
||||
match err {
|
||||
CodexErr::ShutdownFailed(message) => assert_eq!(message, error_message),
|
||||
other => panic!("unexpected error: {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn shutdown_and_wait_shuts_down_cached_guardian_subagent() {
|
||||
let (parent_session, parent_turn_context) = make_session_and_context().await;
|
||||
@@ -3793,6 +3830,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
|
||||
conversation_id,
|
||||
tx_event,
|
||||
agent_status: agent_status_tx,
|
||||
shutdown_failure: watch::channel(None).0,
|
||||
out_of_band_elicitation_paused: watch::channel(false).0,
|
||||
state: Mutex::new(state),
|
||||
managed_network_proxy_refresh_lock: Mutex::new(()),
|
||||
|
||||
@@ -179,12 +179,14 @@ impl From<usize> for ForkSnapshot {
|
||||
#[derive(Debug, Default, PartialEq, Eq)]
|
||||
pub struct ThreadShutdownReport {
|
||||
pub completed: Vec<ThreadId>,
|
||||
pub shutdown_failed: Vec<(ThreadId, String)>,
|
||||
pub submit_failed: Vec<ThreadId>,
|
||||
pub timed_out: Vec<ThreadId>,
|
||||
}
|
||||
|
||||
enum ShutdownOutcome {
|
||||
Complete,
|
||||
ShutdownFailed(String),
|
||||
SubmitFailed,
|
||||
TimedOut,
|
||||
}
|
||||
@@ -605,6 +607,9 @@ impl ThreadManager {
|
||||
let outcome = match tokio::time::timeout(timeout, thread.shutdown_and_wait()).await
|
||||
{
|
||||
Ok(Ok(())) => ShutdownOutcome::Complete,
|
||||
Ok(Err(CodexErr::ShutdownFailed(message))) => {
|
||||
ShutdownOutcome::ShutdownFailed(message)
|
||||
}
|
||||
Ok(Err(_)) => ShutdownOutcome::SubmitFailed,
|
||||
Err(_) => ShutdownOutcome::TimedOut,
|
||||
};
|
||||
@@ -616,6 +621,9 @@ impl ThreadManager {
|
||||
while let Some((thread_id, outcome)) = shutdowns.next().await {
|
||||
match outcome {
|
||||
ShutdownOutcome::Complete => report.completed.push(thread_id),
|
||||
ShutdownOutcome::ShutdownFailed(message) => {
|
||||
report.shutdown_failed.push((thread_id, message));
|
||||
}
|
||||
ShutdownOutcome::SubmitFailed => report.submit_failed.push(thread_id),
|
||||
ShutdownOutcome::TimedOut => report.timed_out.push(thread_id),
|
||||
}
|
||||
@@ -625,10 +633,16 @@ impl ThreadManager {
|
||||
for thread_id in &report.completed {
|
||||
tracked_threads.remove(thread_id);
|
||||
}
|
||||
for (thread_id, _) in &report.shutdown_failed {
|
||||
tracked_threads.remove(thread_id);
|
||||
}
|
||||
|
||||
report
|
||||
.completed
|
||||
.sort_by_key(std::string::ToString::to_string);
|
||||
report
|
||||
.shutdown_failed
|
||||
.sort_by_key(|(thread_id, _)| thread_id.to_string());
|
||||
report
|
||||
.submit_failed
|
||||
.sort_by_key(std::string::ToString::to_string);
|
||||
|
||||
@@ -2473,14 +2473,12 @@ async fn wait_agent_returns_final_status_without_timeout() {
|
||||
.await
|
||||
.expect("subscribe should succeed");
|
||||
|
||||
let _ = thread
|
||||
thread
|
||||
.thread
|
||||
.submit(Op::Shutdown {})
|
||||
.shutdown_and_wait()
|
||||
.await
|
||||
.expect("shutdown should submit");
|
||||
let _ = timeout(Duration::from_secs(1), status_rx.changed())
|
||||
.await
|
||||
.expect("shutdown status should arrive");
|
||||
.expect("shutdown should complete");
|
||||
assert_eq!(status_rx.borrow_and_update().clone(), AgentStatus::Shutdown);
|
||||
|
||||
let invocation = invocation(
|
||||
Arc::new(session),
|
||||
|
||||
@@ -137,6 +137,8 @@ pub enum CodexErr {
|
||||
UnsupportedOperation(String),
|
||||
#[error("{0}")]
|
||||
RefreshTokenFailed(RefreshTokenFailedError),
|
||||
#[error("shutdown failed: {0}")]
|
||||
ShutdownFailed(String),
|
||||
#[error("Fatal error: {0}")]
|
||||
Fatal(String),
|
||||
// -----------------------------------------------------------------
|
||||
@@ -171,6 +173,7 @@ impl CodexErr {
|
||||
| CodexErr::Interrupted
|
||||
| CodexErr::EnvVar(_)
|
||||
| CodexErr::Fatal(_)
|
||||
| CodexErr::ShutdownFailed(_)
|
||||
| CodexErr::UsageNotIncluded
|
||||
| CodexErr::QuotaExceeded
|
||||
| CodexErr::InvalidImageRequest()
|
||||
|
||||
@@ -98,10 +98,10 @@ enum RolloutCmd {
|
||||
},
|
||||
/// Ensure all prior writes are processed; respond when flushed.
|
||||
Flush {
|
||||
ack: oneshot::Sender<()>,
|
||||
ack: oneshot::Sender<std::io::Result<()>>,
|
||||
},
|
||||
Shutdown {
|
||||
ack: oneshot::Sender<()>,
|
||||
ack: oneshot::Sender<std::io::Result<()>>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -525,7 +525,7 @@ impl RolloutRecorder {
|
||||
.await
|
||||
.map_err(|e| IoError::other(format!("failed to queue rollout flush: {e}")))?;
|
||||
rx.await
|
||||
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))
|
||||
.map_err(|e| IoError::other(format!("failed waiting for rollout flush: {e}")))?
|
||||
}
|
||||
|
||||
pub async fn load_rollout_items(
|
||||
@@ -613,9 +613,9 @@ impl RolloutRecorder {
|
||||
pub async fn shutdown(&self) -> std::io::Result<()> {
|
||||
let (tx_done, rx_done) = oneshot::channel();
|
||||
match self.tx.send(RolloutCmd::Shutdown { ack: tx_done }).await {
|
||||
Ok(_) => rx_done
|
||||
.await
|
||||
.map_err(|e| IoError::other(format!("failed waiting for rollout shutdown: {e}")))?,
|
||||
Ok(_) => rx_done.await.map_err(|e| {
|
||||
IoError::other(format!("failed waiting for rollout shutdown: {e}"))
|
||||
})??,
|
||||
Err(e) => {
|
||||
warn!("failed to send rollout shutdown command: {e}");
|
||||
return Err(IoError::other(format!(
|
||||
@@ -746,18 +746,10 @@ async fn rollout_writer(
|
||||
while let Some(cmd) = rx.recv().await {
|
||||
match cmd {
|
||||
RolloutCmd::AddItems(items) => {
|
||||
if items.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
if writer.is_none() {
|
||||
buffered_items.extend(items);
|
||||
continue;
|
||||
}
|
||||
|
||||
write_and_reconcile_items(
|
||||
writer.as_mut(),
|
||||
items.as_slice(),
|
||||
write_or_buffer_rollout_items(
|
||||
&mut writer,
|
||||
&mut buffered_items,
|
||||
items,
|
||||
&rollout_path,
|
||||
state_db_ctx.as_deref(),
|
||||
state_builder.as_ref(),
|
||||
@@ -766,70 +758,104 @@ async fn rollout_writer(
|
||||
.await?;
|
||||
}
|
||||
RolloutCmd::Persist { ack } => {
|
||||
if writer.is_none() {
|
||||
let result = async {
|
||||
let Some(log_file_info) = deferred_log_file_info.take() else {
|
||||
return Err(IoError::other(
|
||||
"deferred rollout recorder missing log file metadata",
|
||||
));
|
||||
};
|
||||
let file = open_log_file(log_file_info.path.as_path())?;
|
||||
writer = Some(JsonlWriter {
|
||||
file: tokio::fs::File::from_std(file),
|
||||
});
|
||||
let result = async {
|
||||
materialize_writer_if_needed(
|
||||
&mut writer,
|
||||
&mut deferred_log_file_info,
|
||||
&mut meta,
|
||||
&mut buffered_items,
|
||||
&cwd,
|
||||
&rollout_path,
|
||||
state_db_ctx.as_deref(),
|
||||
&mut state_builder,
|
||||
default_provider.as_str(),
|
||||
generate_memories,
|
||||
)
|
||||
.await?;
|
||||
flush_writer(writer.as_mut()).await
|
||||
}
|
||||
.await;
|
||||
|
||||
if let Some(session_meta) = meta.take() {
|
||||
write_session_meta(
|
||||
writer.as_mut(),
|
||||
session_meta,
|
||||
&cwd,
|
||||
&rollout_path,
|
||||
state_db_ctx.as_deref(),
|
||||
&mut state_builder,
|
||||
default_provider.as_str(),
|
||||
generate_memories,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !buffered_items.is_empty() {
|
||||
write_and_reconcile_items(
|
||||
writer.as_mut(),
|
||||
buffered_items.as_slice(),
|
||||
&rollout_path,
|
||||
state_db_ctx.as_deref(),
|
||||
state_builder.as_ref(),
|
||||
default_provider.as_str(),
|
||||
)
|
||||
.await?;
|
||||
buffered_items.clear();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
match result {
|
||||
Ok(()) => {
|
||||
let _ = ack.send(Ok(()));
|
||||
}
|
||||
.await;
|
||||
|
||||
if let Err(err) = result {
|
||||
let kind = err.kind();
|
||||
let message = err.to_string();
|
||||
let _ = ack.send(Err(IoError::new(kind, message.clone())));
|
||||
return Err(IoError::new(kind, message));
|
||||
Err(err) => {
|
||||
let return_err = clone_io_error(&err);
|
||||
let _ = ack.send(Err(err));
|
||||
return Err(return_err);
|
||||
}
|
||||
}
|
||||
let _ = ack.send(Ok(()));
|
||||
}
|
||||
RolloutCmd::Flush { ack } => {
|
||||
// Deferred fresh threads may not have an initialized file yet.
|
||||
if let Some(writer) = writer.as_mut()
|
||||
&& let Err(e) = writer.file.flush().await
|
||||
{
|
||||
let _ = ack.send(());
|
||||
return Err(e);
|
||||
match flush_writer(writer.as_mut()).await {
|
||||
Ok(()) => {
|
||||
let _ = ack.send(Ok(()));
|
||||
}
|
||||
Err(err) => {
|
||||
let return_err = clone_io_error(&err);
|
||||
let _ = ack.send(Err(err));
|
||||
return Err(return_err);
|
||||
}
|
||||
}
|
||||
let _ = ack.send(());
|
||||
}
|
||||
RolloutCmd::Shutdown { ack } => {
|
||||
let _ = ack.send(());
|
||||
rx.close();
|
||||
let mut shutdown_acks = vec![ack];
|
||||
let result = async {
|
||||
while let Some(cmd) = rx.recv().await {
|
||||
match cmd {
|
||||
RolloutCmd::AddItems(items) => {
|
||||
write_or_buffer_rollout_items(
|
||||
&mut writer,
|
||||
&mut buffered_items,
|
||||
items,
|
||||
&rollout_path,
|
||||
state_db_ctx.as_deref(),
|
||||
state_builder.as_ref(),
|
||||
default_provider.as_str(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
RolloutCmd::Persist { ack }
|
||||
| RolloutCmd::Flush { ack }
|
||||
| RolloutCmd::Shutdown { ack } => {
|
||||
shutdown_acks.push(ack);
|
||||
}
|
||||
}
|
||||
}
|
||||
materialize_writer_if_needed(
|
||||
&mut writer,
|
||||
&mut deferred_log_file_info,
|
||||
&mut meta,
|
||||
&mut buffered_items,
|
||||
&cwd,
|
||||
&rollout_path,
|
||||
state_db_ctx.as_deref(),
|
||||
&mut state_builder,
|
||||
default_provider.as_str(),
|
||||
generate_memories,
|
||||
)
|
||||
.await?;
|
||||
sync_writer(writer.as_mut()).await
|
||||
}
|
||||
.await;
|
||||
match result {
|
||||
Ok(()) => {
|
||||
for ack in shutdown_acks {
|
||||
let _ = ack.send(Ok(()));
|
||||
}
|
||||
break;
|
||||
}
|
||||
Err(err) => {
|
||||
let return_err = clone_io_error(&err);
|
||||
for ack in shutdown_acks {
|
||||
let _ = ack.send(Err(clone_io_error(&err)));
|
||||
}
|
||||
return Err(return_err);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -837,6 +863,111 @@ async fn rollout_writer(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_or_buffer_rollout_items(
|
||||
writer: &mut Option<JsonlWriter>,
|
||||
buffered_items: &mut Vec<RolloutItem>,
|
||||
items: Vec<RolloutItem>,
|
||||
rollout_path: &Path,
|
||||
state_db_ctx: Option<&StateRuntime>,
|
||||
state_builder: Option<&ThreadMetadataBuilder>,
|
||||
default_provider: &str,
|
||||
) -> std::io::Result<()> {
|
||||
if items.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if writer.is_none() {
|
||||
buffered_items.extend(items);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
write_and_reconcile_items(
|
||||
writer.as_mut(),
|
||||
items.as_slice(),
|
||||
rollout_path,
|
||||
state_db_ctx,
|
||||
state_builder,
|
||||
default_provider,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
fn clone_io_error(err: &IoError) -> IoError {
|
||||
IoError::new(err.kind(), err.to_string())
|
||||
}
|
||||
|
||||
async fn sync_writer(writer: Option<&mut JsonlWriter>) -> std::io::Result<()> {
|
||||
if let Some(writer) = writer {
|
||||
writer.file.flush().await?;
|
||||
writer.file.sync_all().await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn flush_writer(writer: Option<&mut JsonlWriter>) -> std::io::Result<()> {
|
||||
if let Some(writer) = writer {
|
||||
writer.file.flush().await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn materialize_writer_if_needed(
|
||||
writer: &mut Option<JsonlWriter>,
|
||||
deferred_log_file_info: &mut Option<LogFileInfo>,
|
||||
meta: &mut Option<SessionMeta>,
|
||||
buffered_items: &mut Vec<RolloutItem>,
|
||||
cwd: &Path,
|
||||
rollout_path: &Path,
|
||||
state_db_ctx: Option<&StateRuntime>,
|
||||
state_builder: &mut Option<ThreadMetadataBuilder>,
|
||||
default_provider: &str,
|
||||
generate_memories: bool,
|
||||
) -> std::io::Result<()> {
|
||||
if writer.is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let Some(log_file_info) = deferred_log_file_info.take() else {
|
||||
return Err(IoError::other(
|
||||
"deferred rollout recorder missing log file metadata",
|
||||
));
|
||||
};
|
||||
let file = open_log_file(log_file_info.path.as_path())?;
|
||||
*writer = Some(JsonlWriter {
|
||||
file: tokio::fs::File::from_std(file),
|
||||
});
|
||||
|
||||
if let Some(session_meta) = meta.take() {
|
||||
write_session_meta(
|
||||
writer.as_mut(),
|
||||
session_meta,
|
||||
cwd,
|
||||
rollout_path,
|
||||
state_db_ctx,
|
||||
state_builder,
|
||||
default_provider,
|
||||
generate_memories,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !buffered_items.is_empty() {
|
||||
write_and_reconcile_items(
|
||||
writer.as_mut(),
|
||||
buffered_items.as_slice(),
|
||||
rollout_path,
|
||||
state_db_ctx,
|
||||
state_builder.as_ref(),
|
||||
default_provider,
|
||||
)
|
||||
.await?;
|
||||
buffered_items.clear();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn write_session_meta(
|
||||
mut writer: Option<&mut JsonlWriter>,
|
||||
|
||||
@@ -60,6 +60,7 @@ pub async fn append_session_index_entry(
|
||||
line.push('\n');
|
||||
file.write_all(line.as_bytes()).await?;
|
||||
file.flush().await?;
|
||||
file.sync_all().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -69,6 +69,10 @@ pub use remote_control::RemoteControlEnrollmentRecord;
|
||||
// metadata, rather than the exact sum of all persisted SQLite column bytes.
|
||||
const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;
|
||||
const LOG_PARTITION_ROW_LIMIT: i64 = 1_000;
|
||||
const CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS: usize = 10;
|
||||
const CHECKPOINT_WAL_BUSY_TIMEOUT_MS: u64 = 100;
|
||||
const SQLITE_BUSY_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const SQLITE_BUSY_TIMEOUT_MS: u64 = 5_000;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct StateRuntime {
|
||||
@@ -139,6 +143,53 @@ impl StateRuntime {
|
||||
pub fn codex_home(&self) -> &Path {
|
||||
self.codex_home.as_path()
|
||||
}
|
||||
|
||||
/// Checkpoint both runtime SQLite databases so WAL contents are persisted to the main files.
|
||||
pub async fn checkpoint_wal(&self) -> anyhow::Result<()> {
|
||||
checkpoint_wal_pool(self.pool.as_ref(), "state").await?;
|
||||
checkpoint_wal_pool(self.logs_pool.as_ref(), "logs").await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
async fn checkpoint_wal_pool(pool: &SqlitePool, name: &str) -> anyhow::Result<()> {
|
||||
let mut conn = pool.acquire().await?;
|
||||
sqlx::query(&format!(
|
||||
"PRAGMA busy_timeout = {CHECKPOINT_WAL_BUSY_TIMEOUT_MS}"
|
||||
))
|
||||
.execute(&mut *conn)
|
||||
.await?;
|
||||
let checkpoint_result = async {
|
||||
let mut last_busy_result = None;
|
||||
for attempt in 1..=CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS {
|
||||
let row = sqlx::query("PRAGMA wal_checkpoint(TRUNCATE)")
|
||||
.fetch_one(&mut *conn)
|
||||
.await?;
|
||||
let busy: i64 = row.try_get(0)?;
|
||||
let log_pages: i64 = row.try_get(1)?;
|
||||
let checkpointed_pages: i64 = row.try_get(2)?;
|
||||
if busy == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
last_busy_result = Some((checkpointed_pages, log_pages));
|
||||
if attempt < CHECKPOINT_WAL_BUSY_MAX_ATTEMPTS {
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
}
|
||||
}
|
||||
|
||||
let (checkpointed_pages, log_pages) = last_busy_result.unwrap_or((0, 0));
|
||||
anyhow::bail!(
|
||||
"{name} WAL checkpoint was busy: {checkpointed_pages}/{log_pages} pages checkpointed"
|
||||
)
|
||||
};
|
||||
let checkpoint_result: anyhow::Result<()> = checkpoint_result.await;
|
||||
let restore_result = sqlx::query(&format!("PRAGMA busy_timeout = {SQLITE_BUSY_TIMEOUT_MS}"))
|
||||
.execute(&mut *conn)
|
||||
.await;
|
||||
restore_result?;
|
||||
|
||||
checkpoint_result
|
||||
}
|
||||
|
||||
fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
|
||||
@@ -147,7 +198,7 @@ fn base_sqlite_options(path: &Path) -> SqliteConnectOptions {
|
||||
.create_if_missing(true)
|
||||
.journal_mode(SqliteJournalMode::Wal)
|
||||
.synchronous(SqliteSynchronous::Normal)
|
||||
.busy_timeout(Duration::from_secs(5))
|
||||
.busy_timeout(SQLITE_BUSY_TIMEOUT)
|
||||
.log_statements(LevelFilter::Off)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user