mirror of
https://github.com/openai/codex.git
synced 2026-04-24 14:45:27 +00:00
253 lines
8.1 KiB
Rust
253 lines
8.1 KiB
Rust
use anyhow::Context;
|
|
use codex_protocol::models::ContentItem;
|
|
use codex_protocol::models::ResponseItem;
|
|
use codex_protocol::protocol::AskForApproval;
|
|
use codex_protocol::protocol::EventMsg;
|
|
use codex_protocol::protocol::Op;
|
|
use codex_protocol::protocol::RolloutItem;
|
|
use codex_protocol::protocol::RolloutLine;
|
|
use codex_protocol::protocol::SandboxPolicy;
|
|
use codex_protocol::user_input::UserInput;
|
|
use core_test_support::responses;
|
|
use core_test_support::responses::ev_assistant_message;
|
|
use core_test_support::responses::ev_completed;
|
|
use core_test_support::responses::ev_response_created;
|
|
use core_test_support::responses::sse;
|
|
use core_test_support::responses::start_mock_server;
|
|
use core_test_support::skip_if_no_network;
|
|
use core_test_support::test_codex::TestCodex;
|
|
use core_test_support::test_codex::test_codex;
|
|
use core_test_support::wait_for_event;
|
|
use image::ImageBuffer;
|
|
use image::Rgba;
|
|
use pretty_assertions::assert_eq;
|
|
use std::path::Path;
|
|
use std::time::Duration;
|
|
|
|
fn find_user_message_with_image(text: &str) -> Option<ResponseItem> {
|
|
for line in text.lines() {
|
|
let trimmed = line.trim();
|
|
if trimmed.is_empty() {
|
|
continue;
|
|
}
|
|
let rollout: RolloutLine = match serde_json::from_str(trimmed) {
|
|
Ok(rollout) => rollout,
|
|
Err(_) => continue,
|
|
};
|
|
if let RolloutItem::ResponseItem(ResponseItem::Message { role, content, .. }) =
|
|
&rollout.item
|
|
&& role == "user"
|
|
&& content
|
|
.iter()
|
|
.any(|span| matches!(span, ContentItem::InputImage { .. }))
|
|
&& let RolloutItem::ResponseItem(item) = rollout.item.clone()
|
|
{
|
|
return Some(item);
|
|
}
|
|
}
|
|
None
|
|
}
|
|
|
|
fn extract_image_url(item: &ResponseItem) -> Option<String> {
|
|
match item {
|
|
ResponseItem::Message { content, .. } => content.iter().find_map(|span| match span {
|
|
ContentItem::InputImage { image_url } => Some(image_url.clone()),
|
|
_ => None,
|
|
}),
|
|
_ => None,
|
|
}
|
|
}
|
|
|
|
async fn read_rollout_text(path: &Path) -> anyhow::Result<String> {
|
|
for _ in 0..50 {
|
|
if path.exists()
|
|
&& let Ok(text) = std::fs::read_to_string(path)
|
|
&& !text.trim().is_empty()
|
|
{
|
|
return Ok(text);
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(20)).await;
|
|
}
|
|
std::fs::read_to_string(path)
|
|
.with_context(|| format!("read rollout file at {}", path.display()))
|
|
}
|
|
|
|
fn write_test_png(path: &Path, color: [u8; 4]) -> anyhow::Result<()> {
|
|
if let Some(parent) = path.parent() {
|
|
std::fs::create_dir_all(parent)?;
|
|
}
|
|
let image = ImageBuffer::from_pixel(2, 2, Rgba(color));
|
|
image.save(path)?;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn copy_paste_local_image_persists_rollout_request_shape() -> anyhow::Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_mock_server().await;
|
|
|
|
let TestCodex {
|
|
codex,
|
|
cwd,
|
|
session_configured,
|
|
home: _home,
|
|
..
|
|
} = test_codex().build(&server).await?;
|
|
|
|
let rel_path = "images/paste.png";
|
|
let abs_path = cwd.path().join(rel_path);
|
|
write_test_png(&abs_path, [12, 34, 56, 255])?;
|
|
|
|
let response = sse(vec![
|
|
ev_response_created("resp-1"),
|
|
ev_assistant_message("msg-1", "done"),
|
|
ev_completed("resp-1"),
|
|
]);
|
|
responses::mount_sse_once(&server, response).await;
|
|
|
|
let session_model = session_configured.model.clone();
|
|
|
|
codex
|
|
.submit(Op::UserTurn {
|
|
items: vec![
|
|
UserInput::LocalImage {
|
|
path: abs_path.clone(),
|
|
},
|
|
UserInput::Text {
|
|
text: "pasted image".to_string(),
|
|
text_elements: Vec::new(),
|
|
},
|
|
],
|
|
final_output_json_schema: None,
|
|
cwd: cwd.path().to_path_buf(),
|
|
approval_policy: AskForApproval::Never,
|
|
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
|
model: session_model,
|
|
effort: None,
|
|
summary: None,
|
|
service_tier: None,
|
|
collaboration_mode: None,
|
|
personality: None,
|
|
})
|
|
.await?;
|
|
|
|
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
|
codex.submit(Op::Shutdown).await?;
|
|
wait_for_event(&codex, |event| matches!(event, EventMsg::ShutdownComplete)).await;
|
|
|
|
let rollout_path = codex.rollout_path().expect("rollout path");
|
|
let rollout_text = read_rollout_text(&rollout_path).await?;
|
|
let actual = find_user_message_with_image(&rollout_text)
|
|
.expect("expected user message with input image in rollout");
|
|
|
|
let image_url = extract_image_url(&actual).expect("expected image url in rollout");
|
|
let expected = ResponseItem::Message {
|
|
id: None,
|
|
role: "user".to_string(),
|
|
content: vec![
|
|
ContentItem::InputText {
|
|
text: codex_protocol::models::local_image_open_tag_text(1),
|
|
},
|
|
ContentItem::InputImage { image_url },
|
|
ContentItem::InputText {
|
|
text: codex_protocol::models::image_close_tag_text(),
|
|
},
|
|
ContentItem::InputText {
|
|
text: "pasted image".to_string(),
|
|
},
|
|
],
|
|
metadata: None,
|
|
end_turn: None,
|
|
phase: None,
|
|
};
|
|
|
|
assert_eq!(actual, expected);
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
async fn drag_drop_image_persists_rollout_request_shape() -> anyhow::Result<()> {
|
|
skip_if_no_network!(Ok(()));
|
|
|
|
let server = start_mock_server().await;
|
|
|
|
let TestCodex {
|
|
codex,
|
|
cwd,
|
|
session_configured,
|
|
home: _home,
|
|
..
|
|
} = test_codex().build(&server).await?;
|
|
|
|
let image_url = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAQAAAC1HAwCAAAAC0lEQVR4nGNgYAAAAAMAASsJTYQAAAAASUVORK5CYII=".to_string();
|
|
|
|
let response = sse(vec![
|
|
ev_response_created("resp-1"),
|
|
ev_assistant_message("msg-1", "done"),
|
|
ev_completed("resp-1"),
|
|
]);
|
|
responses::mount_sse_once(&server, response).await;
|
|
|
|
let session_model = session_configured.model.clone();
|
|
|
|
codex
|
|
.submit(Op::UserTurn {
|
|
items: vec![
|
|
UserInput::Image {
|
|
image_url: image_url.clone(),
|
|
},
|
|
UserInput::Text {
|
|
text: "dropped image".to_string(),
|
|
text_elements: Vec::new(),
|
|
},
|
|
],
|
|
final_output_json_schema: None,
|
|
cwd: cwd.path().to_path_buf(),
|
|
approval_policy: AskForApproval::Never,
|
|
sandbox_policy: SandboxPolicy::DangerFullAccess,
|
|
model: session_model,
|
|
effort: None,
|
|
summary: None,
|
|
service_tier: None,
|
|
collaboration_mode: None,
|
|
personality: None,
|
|
})
|
|
.await?;
|
|
|
|
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
|
|
codex.submit(Op::Shutdown).await?;
|
|
wait_for_event(&codex, |event| matches!(event, EventMsg::ShutdownComplete)).await;
|
|
|
|
let rollout_path = codex.rollout_path().expect("rollout path");
|
|
let rollout_text = read_rollout_text(&rollout_path).await?;
|
|
let actual = find_user_message_with_image(&rollout_text)
|
|
.expect("expected user message with input image in rollout");
|
|
|
|
let image_url = extract_image_url(&actual).expect("expected image url in rollout");
|
|
let expected = ResponseItem::Message {
|
|
id: None,
|
|
role: "user".to_string(),
|
|
content: vec![
|
|
ContentItem::InputText {
|
|
text: codex_protocol::models::image_open_tag_text(),
|
|
},
|
|
ContentItem::InputImage { image_url },
|
|
ContentItem::InputText {
|
|
text: codex_protocol::models::image_close_tag_text(),
|
|
},
|
|
ContentItem::InputText {
|
|
text: "dropped image".to_string(),
|
|
},
|
|
],
|
|
metadata: None,
|
|
end_turn: None,
|
|
phase: None,
|
|
};
|
|
|
|
assert_eq!(actual, expected);
|
|
|
|
Ok(())
|
|
}
|