mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
core: make compact turns finish and interrupt promptly
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::SessionTask;
|
||||
use super::SessionTaskContext;
|
||||
use crate::codex::TurnContext;
|
||||
use crate::error::Result as CodexResult;
|
||||
use crate::state::TaskKind;
|
||||
use async_trait::async_trait;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
@@ -11,6 +13,18 @@ use tokio_util::sync::CancellationToken;
|
||||
#[derive(Clone, Copy, Default)]
|
||||
pub(crate) struct CompactTask;
|
||||
|
||||
async fn await_compaction_or_cancellation<F>(
|
||||
cancellation_token: CancellationToken,
|
||||
compact_future: F,
|
||||
) where
|
||||
F: Future<Output = CodexResult<()>>,
|
||||
{
|
||||
tokio::select! {
|
||||
_ = cancellation_token.cancelled() => {}
|
||||
_ = compact_future => {}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl SessionTask for CompactTask {
|
||||
fn kind(&self) -> TaskKind {
|
||||
@@ -26,24 +40,81 @@ impl SessionTask for CompactTask {
|
||||
session: Arc<SessionTaskContext>,
|
||||
ctx: Arc<TurnContext>,
|
||||
input: Vec<UserInput>,
|
||||
_cancellation_token: CancellationToken,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> Option<String> {
|
||||
let session = session.clone_session();
|
||||
let _ = if crate::compact::should_use_remote_compact_task(&ctx.provider) {
|
||||
if crate::compact::should_use_remote_compact_task(&ctx.provider) {
|
||||
let _ = session.services.session_telemetry.counter(
|
||||
"codex.task.compact",
|
||||
/*inc*/ 1,
|
||||
&[("type", "remote")],
|
||||
);
|
||||
crate::compact_remote::run_remote_compact_task(session.clone(), ctx).await
|
||||
await_compaction_or_cancellation(
|
||||
cancellation_token,
|
||||
crate::compact_remote::run_remote_compact_task(session.clone(), ctx),
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
let _ = session.services.session_telemetry.counter(
|
||||
"codex.task.compact",
|
||||
/*inc*/ 1,
|
||||
&[("type", "local")],
|
||||
);
|
||||
crate::compact::run_compact_task(session.clone(), ctx, input).await
|
||||
};
|
||||
await_compaction_or_cancellation(
|
||||
cancellation_token,
|
||||
crate::compact::run_compact_task(session.clone(), ctx, input),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::future::pending;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio::sync::Notify;
|
||||
|
||||
use super::await_compaction_or_cancellation;
|
||||
use crate::error::Result as CodexResult;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
#[tokio::test]
|
||||
async fn await_compaction_or_cancellation_returns_when_future_finishes() {
|
||||
let finished = Arc::new(Notify::new());
|
||||
let finished_clone = Arc::clone(&finished);
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
await_compaction_or_cancellation(CancellationToken::new(), async move {
|
||||
finished_clone.notify_waiters();
|
||||
Ok(())
|
||||
})
|
||||
.await;
|
||||
});
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(1), finished.notified())
|
||||
.await
|
||||
.expect("compaction future should be awaited");
|
||||
task.await.expect("task should complete");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn await_compaction_or_cancellation_returns_when_cancelled() {
|
||||
let cancellation_token = CancellationToken::new();
|
||||
let child_token = cancellation_token.child_token();
|
||||
|
||||
let task = tokio::spawn(async move {
|
||||
await_compaction_or_cancellation(child_token, pending::<CodexResult<()>>()).await;
|
||||
});
|
||||
|
||||
cancellation_token.cancel();
|
||||
|
||||
tokio::time::timeout(Duration::from_secs(1), task)
|
||||
.await
|
||||
.expect("cancellation should unblock compaction waiting")
|
||||
.expect("task should complete cleanly");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,12 +199,14 @@ impl Session {
|
||||
)
|
||||
.await;
|
||||
let sess = session_ctx.clone_session();
|
||||
sess.flush_rollout().await;
|
||||
if !task_cancellation_token.is_cancelled() {
|
||||
// Emit completion uniformly from spawn site so all tasks share the same lifecycle.
|
||||
// Emit completion uniformly from spawn site so all tasks share the same
|
||||
// lifecycle. Do this before flushing rollout durability so the UI is not
|
||||
// stuck in `Working` after the task's visible work already finished.
|
||||
sess.on_task_finished(Arc::clone(&ctx_for_finish), last_agent_message)
|
||||
.await;
|
||||
}
|
||||
sess.flush_rollout().await;
|
||||
done_clone.notify_waiters();
|
||||
}
|
||||
.instrument(task_span),
|
||||
|
||||
Reference in New Issue
Block a user