mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
fix
This commit is contained in:
@@ -1,7 +1,6 @@
|
||||
use crate::endpoint::realtime_websocket::protocol::ConversationItem;
|
||||
use crate::endpoint::realtime_websocket::protocol::ConversationItemContent;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeConnectionState;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
|
||||
@@ -318,9 +317,7 @@ impl RealtimeWebsocketWriter {
|
||||
impl RealtimeWebsocketEvents {
|
||||
pub async fn next_event(&self) -> Result<Option<RealtimeEvent>, ApiError> {
|
||||
if self.is_closed.load(Ordering::SeqCst) {
|
||||
return Ok(Some(RealtimeEvent::State(
|
||||
RealtimeConnectionState::Disconnected,
|
||||
)));
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let msg = match self.rx_message.lock().await.recv().await {
|
||||
@@ -333,9 +330,7 @@ impl RealtimeWebsocketEvents {
|
||||
}
|
||||
None => {
|
||||
self.is_closed.store(true, Ordering::SeqCst);
|
||||
return Ok(Some(RealtimeEvent::State(
|
||||
RealtimeConnectionState::Disconnected,
|
||||
)));
|
||||
return Ok(None);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -343,9 +338,7 @@ impl RealtimeWebsocketEvents {
|
||||
Message::Text(text) => Ok(parse_realtime_event(&text)),
|
||||
Message::Close(_) => {
|
||||
self.is_closed.store(true, Ordering::SeqCst);
|
||||
Ok(Some(RealtimeEvent::State(
|
||||
RealtimeConnectionState::Disconnected,
|
||||
)))
|
||||
Ok(None)
|
||||
}
|
||||
Message::Binary(_) => Ok(Some(RealtimeEvent::Error(
|
||||
"unexpected binary realtime websocket event".to_string(),
|
||||
|
||||
@@ -146,3 +146,162 @@ async fn realtime_ws_e2e_session_create_and_event_flow() {
|
||||
connection.close().await.expect("close");
|
||||
server.await.expect("server task");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn realtime_ws_e2e_send_while_next_event_waits() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
|
||||
let addr = listener.local_addr().expect("local addr");
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
let (stream, _) = listener.accept().await.expect("accept");
|
||||
let mut ws = accept_async(stream).await.expect("accept ws");
|
||||
|
||||
let first = ws
|
||||
.next()
|
||||
.await
|
||||
.expect("first msg")
|
||||
.expect("first msg ok")
|
||||
.into_text()
|
||||
.expect("text");
|
||||
let first_json: Value = serde_json::from_str(&first).expect("json");
|
||||
assert_eq!(first_json["type"], "session.create");
|
||||
|
||||
let second = ws
|
||||
.next()
|
||||
.await
|
||||
.expect("second msg")
|
||||
.expect("second msg ok")
|
||||
.into_text()
|
||||
.expect("text");
|
||||
let second_json: Value = serde_json::from_str(&second).expect("json");
|
||||
assert_eq!(second_json["type"], "response.input_audio.delta");
|
||||
|
||||
ws.send(Message::Text(
|
||||
json!({
|
||||
"type": "session.created",
|
||||
"session": {"id": "sess_after_send"}
|
||||
})
|
||||
.to_string()
|
||||
.into(),
|
||||
))
|
||||
.await
|
||||
.expect("send session.created");
|
||||
});
|
||||
|
||||
let provider = Provider {
|
||||
name: "test".to_string(),
|
||||
base_url: "http://localhost".to_string(),
|
||||
query_params: Some(HashMap::new()),
|
||||
headers: HeaderMap::new(),
|
||||
retry: RetryConfig {
|
||||
max_attempts: 1,
|
||||
base_delay: Duration::from_millis(1),
|
||||
retry_429: false,
|
||||
retry_5xx: false,
|
||||
retry_transport: false,
|
||||
},
|
||||
stream_idle_timeout: Duration::from_secs(5),
|
||||
};
|
||||
let client = RealtimeWebsocketClient::new(provider);
|
||||
let connection = client
|
||||
.connect(
|
||||
RealtimeSessionConfig {
|
||||
api_url: format!("ws://{addr}"),
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: Some("conv_123".to_string()),
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
)
|
||||
.await
|
||||
.expect("connect");
|
||||
|
||||
let (send_result, next_result) = tokio::join!(
|
||||
async {
|
||||
tokio::time::timeout(
|
||||
Duration::from_millis(200),
|
||||
connection.send_audio_frame(RealtimeAudioFrame {
|
||||
data: "AQID".to_string(),
|
||||
sample_rate: 48000,
|
||||
num_channels: 1,
|
||||
samples_per_channel: Some(960),
|
||||
}),
|
||||
)
|
||||
.await
|
||||
},
|
||||
connection.next_event()
|
||||
);
|
||||
|
||||
send_result
|
||||
.expect("send should not block on next_event")
|
||||
.expect("send audio");
|
||||
let next_event = next_result.expect("next event").expect("event");
|
||||
assert_eq!(
|
||||
next_event,
|
||||
RealtimeEvent::SessionCreated {
|
||||
session_id: "sess_after_send".to_string()
|
||||
}
|
||||
);
|
||||
|
||||
connection.close().await.expect("close");
|
||||
server.await.expect("server task");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn realtime_ws_e2e_disconnected_emitted_once() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind");
|
||||
let addr = listener.local_addr().expect("local addr");
|
||||
|
||||
let server = tokio::spawn(async move {
|
||||
let (stream, _) = listener.accept().await.expect("accept");
|
||||
let mut ws = accept_async(stream).await.expect("accept ws");
|
||||
|
||||
let first = ws
|
||||
.next()
|
||||
.await
|
||||
.expect("first msg")
|
||||
.expect("first msg ok")
|
||||
.into_text()
|
||||
.expect("text");
|
||||
let first_json: Value = serde_json::from_str(&first).expect("json");
|
||||
assert_eq!(first_json["type"], "session.create");
|
||||
|
||||
ws.send(Message::Close(None)).await.expect("send close");
|
||||
});
|
||||
|
||||
let provider = Provider {
|
||||
name: "test".to_string(),
|
||||
base_url: "http://localhost".to_string(),
|
||||
query_params: Some(HashMap::new()),
|
||||
headers: HeaderMap::new(),
|
||||
retry: RetryConfig {
|
||||
max_attempts: 1,
|
||||
base_delay: Duration::from_millis(1),
|
||||
retry_429: false,
|
||||
retry_5xx: false,
|
||||
retry_transport: false,
|
||||
},
|
||||
stream_idle_timeout: Duration::from_secs(5),
|
||||
};
|
||||
let client = RealtimeWebsocketClient::new(provider);
|
||||
let connection = client
|
||||
.connect(
|
||||
RealtimeSessionConfig {
|
||||
api_url: format!("ws://{addr}"),
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: Some("conv_123".to_string()),
|
||||
},
|
||||
HeaderMap::new(),
|
||||
HeaderMap::new(),
|
||||
)
|
||||
.await
|
||||
.expect("connect");
|
||||
|
||||
let first = connection.next_event().await.expect("next event");
|
||||
assert_eq!(first, None);
|
||||
|
||||
let second = connection.next_event().await.expect("next event");
|
||||
assert_eq!(second, None);
|
||||
|
||||
server.await.expect("server task");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user