Refactor realtime handoff state

- group handoff output sender and active handoff id into a dedicated state object
- centralize handoff output enqueue and active handoff lookup behavior

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim
2026-03-02 11:36:28 -07:00
parent b6a2077336
commit 75c4cd839d

View File

@@ -53,14 +53,47 @@ struct HandoffOutput {
output_text: String,
}
#[derive(Clone, Debug)]
struct RealtimeHandoffState {
output_tx: Sender<HandoffOutput>,
active_handoff_id: Arc<Mutex<Option<String>>>,
}
impl RealtimeHandoffState {
fn new(output_tx: Sender<HandoffOutput>) -> Self {
Self {
output_tx,
active_handoff_id: Arc::new(Mutex::new(None)),
}
}
async fn send_output(&self, output_text: String) -> CodexResult<()> {
let Some(handoff_id) = self.active_handoff_id().await else {
return Ok(());
};
self.output_tx
.send(HandoffOutput {
handoff_id,
output_text,
})
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
Ok(())
}
async fn active_handoff_id(&self) -> Option<String> {
self.active_handoff_id.lock().await.clone()
}
}
#[allow(dead_code)]
struct ConversationState {
audio_tx: Sender<RealtimeAudioFrame>,
user_text_tx: Sender<String>,
handoff_output_tx: Sender<HandoffOutput>,
handoff: RealtimeHandoffState,
task: JoinHandle<()>,
realtime_active: Arc<AtomicBool>,
active_handoff_id: Arc<Mutex<Option<String>>>,
}
#[allow(dead_code)]
@@ -123,7 +156,7 @@ impl RealtimeConversationManager {
async_channel::bounded::<RealtimeEvent>(OUTPUT_EVENTS_QUEUE_CAPACITY);
let realtime_active = Arc::new(AtomicBool::new(true));
let active_handoff_id = Arc::new(Mutex::new(None));
let handoff = RealtimeHandoffState::new(handoff_output_tx);
let task = spawn_realtime_input_task(
writer,
events,
@@ -131,17 +164,16 @@ impl RealtimeConversationManager {
handoff_output_rx,
audio_rx,
events_tx,
Arc::clone(&active_handoff_id),
handoff.clone(),
);
let mut guard = self.state.lock().await;
*guard = Some(ConversationState {
audio_tx,
user_text_tx,
handoff_output_tx,
handoff,
task,
realtime_active: Arc::clone(&realtime_active),
active_handoff_id,
});
Ok((events_rx, realtime_active))
}
@@ -190,41 +222,25 @@ impl RealtimeConversationManager {
}
pub(crate) async fn handoff_out(&self, output_text: String) -> CodexResult<()> {
let (sender, active_handoff_id) = {
let handoff = {
let guard = self.state.lock().await;
let Some(state) = guard.as_ref() else {
return Err(CodexErr::InvalidRequest(
"conversation is not running".to_string(),
));
};
(
state.handoff_output_tx.clone(),
Arc::clone(&state.active_handoff_id),
)
state.handoff.clone()
};
let Some(handoff_id) = active_handoff_id.lock().await.clone() else {
return Ok(());
};
sender
.send(HandoffOutput {
handoff_id,
output_text,
})
.await
.map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?;
Ok(())
handoff.send_output(output_text).await
}
pub(crate) async fn active_handoff_id(&self) -> Option<String> {
let active_handoff_id = {
let handoff = {
let guard = self.state.lock().await;
guard
.as_ref()
.map(|state| Arc::clone(&state.active_handoff_id))
guard.as_ref().map(|state| state.handoff.clone())
}?;
active_handoff_id.lock().await.clone()
handoff.active_handoff_id().await
}
pub(crate) async fn shutdown(&self) -> CodexResult<()> {
@@ -431,7 +447,7 @@ fn spawn_realtime_input_task(
handoff_output_rx: Receiver<HandoffOutput>,
audio_rx: Receiver<RealtimeAudioFrame>,
events_tx: Sender<RealtimeEvent>,
active_handoff_id: Arc<Mutex<Option<String>>>,
handoff_state: RealtimeHandoffState,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
@@ -470,7 +486,8 @@ fn spawn_realtime_input_task(
match event {
Ok(Some(event)) => {
if let RealtimeEvent::HandoffRequested(handoff) = &event {
*active_handoff_id.lock().await = Some(handoff.handoff_id.clone());
*handoff_state.active_handoff_id.lock().await =
Some(handoff.handoff_id.clone());
}
let should_stop = matches!(&event, RealtimeEvent::Error(_));
if events_tx.send(event).await.is_err() {