mirror of
https://github.com/openai/codex.git
synced 2026-06-02 11:22:01 +00:00
Compare commits
8 Commits
iceweasel/
...
dev/mzeng/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
afe57f7db8 | ||
|
|
244b15c95d | ||
|
|
25a0f6784d | ||
|
|
7ab825e047 | ||
|
|
76de99ff25 | ||
|
|
12f0e0b0eb | ||
|
|
9657104a7b | ||
|
|
c0b5d8d24a |
@@ -23,6 +23,7 @@ In the codex-rs folder where the rust code lives:
|
||||
- When making a change that adds or changes an API, ensure that the documentation in the `docs/` folder is up to date if applicable.
|
||||
- Prefer private modules and explicitly exported public crate API.
|
||||
- If you change `ConfigToml` or nested config types, run `just write-config-schema` to update `codex-rs/core/config.schema.json`.
|
||||
- When working with MCP tool calls, prefer using `codex-rs/codex-mcp/src/mcp_connection_manager.rs` to handle mutation of tools and tool calls. Aim to minimize the footprint of changes and leverage existing abstractions rather than plumbing code through multiple levels of function calls.
|
||||
- If you change Rust dependencies (`Cargo.toml` or `Cargo.lock`), run `just bazel-lock-update` from the
|
||||
repo root to refresh `MODULE.bazel.lock`, and include that lockfile update in the same change.
|
||||
- After dependency changes, run `just bazel-lock-check` from the repo root so lockfile drift is caught
|
||||
|
||||
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -1414,6 +1414,7 @@ dependencies = [
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tokio-test",
|
||||
|
||||
@@ -199,6 +199,7 @@ use codex_core::SteerInputError;
|
||||
use codex_core::ThreadConfigSnapshot;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::ThreadSortKey as CoreThreadSortKey;
|
||||
use codex_core::append_thread_name;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config::ConfigOverrides;
|
||||
use codex_core::config::NetworkProxyAuditMetadata;
|
||||
@@ -288,11 +289,13 @@ use codex_protocol::protocol::ReviewTarget as CoreReviewTarget;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::ThreadNameUpdatedEvent;
|
||||
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
|
||||
use codex_protocol::protocol::W3cTraceContext;
|
||||
use codex_protocol::user_input::MAX_USER_INPUT_TEXT_CHARS;
|
||||
use codex_protocol::user_input::UserInput as CoreInputItem;
|
||||
use codex_rmcp_client::perform_oauth_login_return_url;
|
||||
use codex_rollout::append_rollout_item_to_path;
|
||||
use codex_rollout::state_db::StateDbHandle;
|
||||
use codex_rollout::state_db::get_state_db;
|
||||
use codex_rollout::state_db::reconcile_rollout;
|
||||
@@ -2674,11 +2677,11 @@ impl CodexMessageProcessor {
|
||||
return;
|
||||
}
|
||||
|
||||
let thread_exists =
|
||||
let rollout_path =
|
||||
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await
|
||||
{
|
||||
Ok(Some(_)) => true,
|
||||
Ok(None) => false,
|
||||
Ok(Some(path)) => Some(path),
|
||||
Ok(None) => None,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
@@ -2689,19 +2692,39 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
if !thread_exists {
|
||||
let Some(rollout_path) = rollout_path else {
|
||||
self.send_invalid_request_error(request_id, format!("thread not found: {thread_id}"))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) =
|
||||
codex_core::append_thread_name(&self.config.codex_home, thread_id, &name).await
|
||||
{
|
||||
let msg = EventMsg::ThreadNameUpdated(ThreadNameUpdatedEvent {
|
||||
thread_id,
|
||||
thread_name: Some(name.clone()),
|
||||
});
|
||||
let item = RolloutItem::EventMsg(msg);
|
||||
if let Err(err) = append_rollout_item_to_path(rollout_path.as_path(), &item).await {
|
||||
self.send_internal_error(request_id, format!("failed to set thread name: {err}"))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
if let Err(err) = append_thread_name(&self.config.codex_home, thread_id, &name).await {
|
||||
self.send_internal_error(request_id, format!("failed to index thread name: {err}"))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
|
||||
let state_db_ctx = open_state_db_for_direct_thread_lookup(&self.config).await;
|
||||
reconcile_rollout(
|
||||
state_db_ctx.as_deref(),
|
||||
rollout_path.as_path(),
|
||||
self.config.model_provider_id.as_str(),
|
||||
/*builder*/ None,
|
||||
&[],
|
||||
/*archived_only*/ None,
|
||||
/*new_thread_memory_mode*/ None,
|
||||
)
|
||||
.await;
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadSetNameResponse {})
|
||||
@@ -3445,13 +3468,7 @@ impl CodexMessageProcessor {
|
||||
threads.push((conversation_id, thread));
|
||||
}
|
||||
|
||||
let names = match find_thread_names_by_ids(&self.config.codex_home, &thread_ids).await {
|
||||
Ok(names) => names,
|
||||
Err(err) => {
|
||||
warn!("Failed to read thread names: {err}");
|
||||
HashMap::new()
|
||||
}
|
||||
};
|
||||
let names = thread_titles_by_ids(&self.config, &thread_ids).await;
|
||||
|
||||
let statuses = self
|
||||
.thread_watch_manager
|
||||
@@ -3461,7 +3478,9 @@ impl CodexMessageProcessor {
|
||||
let data = threads
|
||||
.into_iter()
|
||||
.map(|(conversation_id, mut thread)| {
|
||||
thread.name = names.get(&conversation_id).cloned();
|
||||
if let Some(title) = names.get(&conversation_id).cloned() {
|
||||
set_thread_name_from_title(&mut thread, title);
|
||||
}
|
||||
if let Some(status) = statuses.get(&thread.id) {
|
||||
thread.status = status.clone();
|
||||
}
|
||||
@@ -4265,13 +4284,8 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
|
||||
async fn attach_thread_name(&self, thread_id: ThreadId, thread: &mut Thread) {
|
||||
match find_thread_name_by_id(&self.config.codex_home, &thread_id).await {
|
||||
Ok(name) => {
|
||||
thread.name = name;
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("Failed to read thread name for {thread_id}: {err}");
|
||||
}
|
||||
if let Some(title) = title_from_state_db(&self.config, thread_id).await {
|
||||
set_thread_name_from_title(thread, title);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8029,7 +8043,7 @@ async fn handle_thread_listener_command(
|
||||
async fn handle_pending_thread_resume_request(
|
||||
conversation_id: ThreadId,
|
||||
conversation: &Arc<CodexThread>,
|
||||
codex_home: &Path,
|
||||
_codex_home: &Path,
|
||||
thread_state_manager: &ThreadStateManager,
|
||||
thread_state: &Arc<Mutex<ThreadState>>,
|
||||
thread_watch_manager: &ThreadWatchManager,
|
||||
@@ -8087,11 +8101,6 @@ async fn handle_pending_thread_resume_request(
|
||||
has_live_in_progress_turn,
|
||||
);
|
||||
|
||||
match find_thread_name_by_id(codex_home, &conversation_id).await {
|
||||
Ok(thread_name) => thread.name = thread_name,
|
||||
Err(err) => warn!("Failed to read thread name for {conversation_id}: {err}"),
|
||||
}
|
||||
|
||||
let ThreadConfigSnapshot {
|
||||
model,
|
||||
model_provider_id,
|
||||
@@ -8653,7 +8662,7 @@ async fn read_summary_from_state_db_by_thread_id(
|
||||
config: &Config,
|
||||
thread_id: ThreadId,
|
||||
) -> Option<ConversationSummary> {
|
||||
let state_db_ctx = get_state_db(config).await;
|
||||
let state_db_ctx = open_state_db_for_direct_thread_lookup(config).await;
|
||||
read_summary_from_state_db_context_by_thread_id(state_db_ctx.as_ref(), thread_id).await
|
||||
}
|
||||
|
||||
@@ -8670,6 +8679,71 @@ async fn read_summary_from_state_db_context_by_thread_id(
|
||||
Some(summary_from_thread_metadata(&metadata))
|
||||
}
|
||||
|
||||
async fn title_from_state_db(config: &Config, thread_id: ThreadId) -> Option<String> {
|
||||
if let Some(state_db_ctx) = open_state_db_for_direct_thread_lookup(config).await
|
||||
&& let Some(metadata) = state_db_ctx.get_thread(thread_id).await.ok().flatten()
|
||||
&& let Some(title) = distinct_title(&metadata)
|
||||
{
|
||||
return Some(title);
|
||||
}
|
||||
find_thread_name_by_id(&config.codex_home, &thread_id)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
async fn thread_titles_by_ids(
|
||||
config: &Config,
|
||||
thread_ids: &HashSet<ThreadId>,
|
||||
) -> HashMap<ThreadId, String> {
|
||||
let mut names = HashMap::with_capacity(thread_ids.len());
|
||||
if let Some(state_db_ctx) = open_state_db_for_direct_thread_lookup(config).await {
|
||||
for &thread_id in thread_ids {
|
||||
let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await else {
|
||||
continue;
|
||||
};
|
||||
if let Some(title) = distinct_title(&metadata) {
|
||||
names.insert(thread_id, title);
|
||||
}
|
||||
}
|
||||
}
|
||||
if names.len() < thread_ids.len()
|
||||
&& let Ok(legacy_names) = find_thread_names_by_ids(&config.codex_home, thread_ids).await
|
||||
{
|
||||
for (thread_id, title) in legacy_names {
|
||||
names.entry(thread_id).or_insert(title);
|
||||
}
|
||||
}
|
||||
names
|
||||
}
|
||||
|
||||
async fn open_state_db_for_direct_thread_lookup(config: &Config) -> Option<StateDbHandle> {
|
||||
StateRuntime::init(config.sqlite_home.clone(), config.model_provider_id.clone())
|
||||
.await
|
||||
.ok()
|
||||
}
|
||||
|
||||
fn non_empty_title(metadata: &ThreadMetadata) -> Option<String> {
|
||||
let title = metadata.title.trim();
|
||||
(!title.is_empty()).then(|| title.to_string())
|
||||
}
|
||||
|
||||
fn distinct_title(metadata: &ThreadMetadata) -> Option<String> {
|
||||
let title = non_empty_title(metadata)?;
|
||||
if metadata.first_user_message.as_deref().map(str::trim) == Some(title.as_str()) {
|
||||
None
|
||||
} else {
|
||||
Some(title)
|
||||
}
|
||||
}
|
||||
|
||||
fn set_thread_name_from_title(thread: &mut Thread, title: String) {
|
||||
if title.trim().is_empty() || thread.preview.trim() == title.trim() {
|
||||
return;
|
||||
}
|
||||
thread.name = Some(title);
|
||||
}
|
||||
|
||||
async fn summary_from_thread_list_item(
|
||||
it: codex_core::ThreadItem,
|
||||
fallback_provider: &str,
|
||||
@@ -8968,6 +9042,14 @@ async fn load_thread_summary_for_rollout(
|
||||
} else if let Some(summary) = read_summary_from_state_db_by_thread_id(config, thread_id).await {
|
||||
merge_mutable_thread_metadata(&mut thread, summary_to_thread(summary));
|
||||
}
|
||||
let title = if let Some(metadata) = persisted_metadata {
|
||||
non_empty_title(metadata)
|
||||
} else {
|
||||
title_from_state_db(config, thread_id).await
|
||||
};
|
||||
if let Some(title) = title {
|
||||
set_thread_name_from_title(&mut thread, title);
|
||||
}
|
||||
Ok(thread)
|
||||
}
|
||||
|
||||
|
||||
@@ -21,7 +21,14 @@ use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadSetNameParams;
|
||||
use codex_app_server_protocol::ThreadSetNameResponse;
|
||||
use codex_core::find_thread_name_by_id;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::RolloutLine;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
use tokio::time::Duration;
|
||||
use tokio::time::timeout;
|
||||
@@ -77,6 +84,11 @@ async fn thread_name_updated_broadcasts_for_loaded_threads() -> Result<()> {
|
||||
let ws2_notification =
|
||||
read_notification_for_method(&mut ws2, "thread/name/updated").await?;
|
||||
assert_thread_name_updated(ws2_notification, &conversation_id, renamed)?;
|
||||
assert_legacy_thread_name(codex_home.path(), &conversation_id, renamed).await?;
|
||||
assert_eq!(
|
||||
thread_name_update_rollout_count(codex_home.path(), &conversation_id).await?,
|
||||
1
|
||||
);
|
||||
|
||||
assert_no_message(&mut ws1, Duration::from_millis(250)).await?;
|
||||
assert_no_message(&mut ws2, Duration::from_millis(250)).await?;
|
||||
@@ -128,6 +140,11 @@ async fn thread_name_updated_broadcasts_for_not_loaded_threads() -> Result<()> {
|
||||
let ws2_notification =
|
||||
read_notification_for_method(&mut ws2, "thread/name/updated").await?;
|
||||
assert_thread_name_updated(ws2_notification, &conversation_id, renamed)?;
|
||||
assert_legacy_thread_name(codex_home.path(), &conversation_id, renamed).await?;
|
||||
assert_eq!(
|
||||
thread_name_update_rollout_count(codex_home.path(), &conversation_id).await?,
|
||||
1
|
||||
);
|
||||
|
||||
assert_no_message(&mut ws1, Duration::from_millis(250)).await?;
|
||||
assert_no_message(&mut ws2, Duration::from_millis(250)).await?;
|
||||
@@ -174,3 +191,38 @@ fn assert_thread_name_updated(
|
||||
assert_eq!(notification.thread_name.as_deref(), Some(thread_name));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn assert_legacy_thread_name(
|
||||
codex_home: &Path,
|
||||
conversation_id: &str,
|
||||
expected_name: &str,
|
||||
) -> Result<()> {
|
||||
let thread_id = ThreadId::from_string(conversation_id)?;
|
||||
assert_eq!(
|
||||
find_thread_name_by_id(codex_home, &thread_id)
|
||||
.await?
|
||||
.as_deref(),
|
||||
Some(expected_name)
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn thread_name_update_rollout_count(
|
||||
codex_home: &Path,
|
||||
conversation_id: &str,
|
||||
) -> Result<usize> {
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home, conversation_id)
|
||||
.await?
|
||||
.context("rollout path")?;
|
||||
let contents = tokio::fs::read_to_string(rollout_path).await?;
|
||||
Ok(contents
|
||||
.lines()
|
||||
.filter_map(|line| serde_json::from_str::<RolloutLine>(line).ok())
|
||||
.filter(|line| {
|
||||
matches!(
|
||||
line.item,
|
||||
RolloutItem::EventMsg(EventMsg::ThreadNameUpdated(_))
|
||||
)
|
||||
})
|
||||
.count())
|
||||
}
|
||||
|
||||
@@ -14,22 +14,24 @@ codex-protocol = { workspace = true }
|
||||
codex-utils-rustls-provider = { workspace = true }
|
||||
futures = { workspace = true }
|
||||
http = { workspace = true }
|
||||
reqwest = { workspace = true, features = ["json", "stream"] }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["macros", "net", "rt", "sync", "time"] }
|
||||
tokio = { workspace = true, features = ["fs", "macros", "net", "rt", "sync", "time"] }
|
||||
tokio-tungstenite = { workspace = true }
|
||||
tungstenite = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
eventsource-stream = { workspace = true }
|
||||
regex-lite = { workspace = true }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
tokio-util = { workspace = true, features = ["codec", "io"] }
|
||||
url = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
anyhow = { workspace = true }
|
||||
assert_matches = { workspace = true }
|
||||
pretty_assertions = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
tokio-test = { workspace = true }
|
||||
wiremock = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
|
||||
370
codex-rs/codex-api/src/files.rs
Normal file
370
codex-rs/codex-api/src/files.rs
Normal file
@@ -0,0 +1,370 @@
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::AuthProvider;
|
||||
use codex_client::build_reqwest_client_with_custom_ca;
|
||||
use reqwest::StatusCode;
|
||||
use reqwest::header::CONTENT_LENGTH;
|
||||
use serde::Deserialize;
|
||||
use tokio::fs::File;
|
||||
use tokio::time::Instant;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
pub const OPENAI_FILE_URI_PREFIX: &str = "sediment://";
|
||||
pub const OPENAI_FILE_UPLOAD_LIMIT_BYTES: u64 = 512 * 1024 * 1024;
|
||||
|
||||
const OPENAI_FILE_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
const OPENAI_FILE_FINALIZE_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
const OPENAI_FILE_FINALIZE_RETRY_DELAY: Duration = Duration::from_millis(250);
|
||||
const OPENAI_FILE_USE_CASE: &str = "codex";
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct UploadedOpenAiFile {
|
||||
pub file_id: String,
|
||||
pub uri: String,
|
||||
pub download_url: String,
|
||||
pub file_name: String,
|
||||
pub file_size_bytes: u64,
|
||||
pub mime_type: Option<String>,
|
||||
pub path: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum OpenAiFileError {
|
||||
#[error("path `{path}` does not exist")]
|
||||
MissingPath { path: PathBuf },
|
||||
#[error("path `{path}` is not a file")]
|
||||
NotAFile { path: PathBuf },
|
||||
#[error("path `{path}` cannot be read: {source}")]
|
||||
ReadFile {
|
||||
path: PathBuf,
|
||||
#[source]
|
||||
source: std::io::Error,
|
||||
},
|
||||
#[error(
|
||||
"file `{path}` is too large: {size_bytes} bytes exceeds the limit of {limit_bytes} bytes"
|
||||
)]
|
||||
FileTooLarge {
|
||||
path: PathBuf,
|
||||
size_bytes: u64,
|
||||
limit_bytes: u64,
|
||||
},
|
||||
#[error("failed to send OpenAI file request to {url}: {source}")]
|
||||
Request {
|
||||
url: String,
|
||||
#[source]
|
||||
source: reqwest::Error,
|
||||
},
|
||||
#[error("OpenAI file request to {url} failed with status {status}: {body}")]
|
||||
UnexpectedStatus {
|
||||
url: String,
|
||||
status: StatusCode,
|
||||
body: String,
|
||||
},
|
||||
#[error("failed to parse OpenAI file response from {url}: {source}")]
|
||||
Decode {
|
||||
url: String,
|
||||
#[source]
|
||||
source: serde_json::Error,
|
||||
},
|
||||
#[error("OpenAI file upload for `{file_id}` is not ready yet")]
|
||||
UploadNotReady { file_id: String },
|
||||
#[error("OpenAI file upload for `{file_id}` failed: {message}")]
|
||||
UploadFailed { file_id: String, message: String },
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct CreateFileResponse {
|
||||
file_id: String,
|
||||
upload_url: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
struct DownloadLinkResponse {
|
||||
status: String,
|
||||
download_url: Option<String>,
|
||||
file_name: Option<String>,
|
||||
mime_type: Option<String>,
|
||||
error_message: Option<String>,
|
||||
}
|
||||
|
||||
pub fn openai_file_uri(file_id: &str) -> String {
|
||||
format!("{OPENAI_FILE_URI_PREFIX}{file_id}")
|
||||
}
|
||||
|
||||
pub async fn upload_local_file(
|
||||
base_url: &str,
|
||||
auth: &impl AuthProvider,
|
||||
path: &Path,
|
||||
) -> Result<UploadedOpenAiFile, OpenAiFileError> {
|
||||
let metadata = tokio::fs::metadata(path)
|
||||
.await
|
||||
.map_err(|source| match source.kind() {
|
||||
std::io::ErrorKind::NotFound => OpenAiFileError::MissingPath {
|
||||
path: path.to_path_buf(),
|
||||
},
|
||||
_ => OpenAiFileError::ReadFile {
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
},
|
||||
})?;
|
||||
if !metadata.is_file() {
|
||||
return Err(OpenAiFileError::NotAFile {
|
||||
path: path.to_path_buf(),
|
||||
});
|
||||
}
|
||||
if metadata.len() > OPENAI_FILE_UPLOAD_LIMIT_BYTES {
|
||||
return Err(OpenAiFileError::FileTooLarge {
|
||||
path: path.to_path_buf(),
|
||||
size_bytes: metadata.len(),
|
||||
limit_bytes: OPENAI_FILE_UPLOAD_LIMIT_BYTES,
|
||||
});
|
||||
}
|
||||
|
||||
let file_name = path
|
||||
.file_name()
|
||||
.and_then(|value| value.to_str())
|
||||
.unwrap_or("file")
|
||||
.to_string();
|
||||
let create_url = format!("{}/files", base_url.trim_end_matches('/'));
|
||||
let create_response = authorized_request(auth, reqwest::Method::POST, &create_url)
|
||||
.json(&serde_json::json!({
|
||||
"file_name": file_name,
|
||||
"file_size": metadata.len(),
|
||||
"use_case": OPENAI_FILE_USE_CASE,
|
||||
}))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|source| OpenAiFileError::Request {
|
||||
url: create_url.clone(),
|
||||
source,
|
||||
})?;
|
||||
let create_status = create_response.status();
|
||||
let create_body = create_response.text().await.unwrap_or_default();
|
||||
if !create_status.is_success() {
|
||||
return Err(OpenAiFileError::UnexpectedStatus {
|
||||
url: create_url,
|
||||
status: create_status,
|
||||
body: create_body,
|
||||
});
|
||||
}
|
||||
let create_payload: CreateFileResponse =
|
||||
serde_json::from_str(&create_body).map_err(|source| OpenAiFileError::Decode {
|
||||
url: create_url.clone(),
|
||||
source,
|
||||
})?;
|
||||
|
||||
let upload_file = File::open(path)
|
||||
.await
|
||||
.map_err(|source| OpenAiFileError::ReadFile {
|
||||
path: path.to_path_buf(),
|
||||
source,
|
||||
})?;
|
||||
let upload_response = build_reqwest_client()
|
||||
.put(&create_payload.upload_url)
|
||||
.timeout(OPENAI_FILE_REQUEST_TIMEOUT)
|
||||
.header("x-ms-blob-type", "BlockBlob")
|
||||
.header(CONTENT_LENGTH, metadata.len())
|
||||
.body(reqwest::Body::wrap_stream(ReaderStream::new(upload_file)))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|source| OpenAiFileError::Request {
|
||||
url: create_payload.upload_url.clone(),
|
||||
source,
|
||||
})?;
|
||||
let upload_status = upload_response.status();
|
||||
let upload_body = upload_response.text().await.unwrap_or_default();
|
||||
if !upload_status.is_success() {
|
||||
return Err(OpenAiFileError::UnexpectedStatus {
|
||||
url: create_payload.upload_url.clone(),
|
||||
status: upload_status,
|
||||
body: upload_body,
|
||||
});
|
||||
}
|
||||
|
||||
let finalize_url = format!(
|
||||
"{}/files/{}/uploaded",
|
||||
base_url.trim_end_matches('/'),
|
||||
create_payload.file_id,
|
||||
);
|
||||
let finalize_started_at = Instant::now();
|
||||
loop {
|
||||
let finalize_response = authorized_request(auth, reqwest::Method::POST, &finalize_url)
|
||||
.json(&serde_json::json!({}))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|source| OpenAiFileError::Request {
|
||||
url: finalize_url.clone(),
|
||||
source,
|
||||
})?;
|
||||
let finalize_status = finalize_response.status();
|
||||
let finalize_body = finalize_response.text().await.unwrap_or_default();
|
||||
if !finalize_status.is_success() {
|
||||
return Err(OpenAiFileError::UnexpectedStatus {
|
||||
url: finalize_url.clone(),
|
||||
status: finalize_status,
|
||||
body: finalize_body,
|
||||
});
|
||||
}
|
||||
let finalize_payload: DownloadLinkResponse =
|
||||
serde_json::from_str(&finalize_body).map_err(|source| OpenAiFileError::Decode {
|
||||
url: finalize_url.clone(),
|
||||
source,
|
||||
})?;
|
||||
|
||||
match finalize_payload.status.as_str() {
|
||||
"success" => {
|
||||
return Ok(UploadedOpenAiFile {
|
||||
file_id: create_payload.file_id.clone(),
|
||||
uri: openai_file_uri(&create_payload.file_id),
|
||||
download_url: finalize_payload.download_url.ok_or_else(|| {
|
||||
OpenAiFileError::UploadFailed {
|
||||
file_id: create_payload.file_id.clone(),
|
||||
message: "missing download_url".to_string(),
|
||||
}
|
||||
})?,
|
||||
file_name: finalize_payload.file_name.unwrap_or(file_name),
|
||||
file_size_bytes: metadata.len(),
|
||||
mime_type: finalize_payload.mime_type,
|
||||
path: path.to_path_buf(),
|
||||
});
|
||||
}
|
||||
"retry" => {
|
||||
if finalize_started_at.elapsed() >= OPENAI_FILE_FINALIZE_TIMEOUT {
|
||||
return Err(OpenAiFileError::UploadNotReady {
|
||||
file_id: create_payload.file_id,
|
||||
});
|
||||
}
|
||||
tokio::time::sleep(OPENAI_FILE_FINALIZE_RETRY_DELAY).await;
|
||||
}
|
||||
_ => {
|
||||
return Err(OpenAiFileError::UploadFailed {
|
||||
file_id: create_payload.file_id,
|
||||
message: finalize_payload
|
||||
.error_message
|
||||
.unwrap_or_else(|| "upload finalization returned an error".to_string()),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn authorized_request(
|
||||
auth: &impl AuthProvider,
|
||||
method: reqwest::Method,
|
||||
url: &str,
|
||||
) -> reqwest::RequestBuilder {
|
||||
let client = build_reqwest_client();
|
||||
let mut request = client
|
||||
.request(method, url)
|
||||
.timeout(OPENAI_FILE_REQUEST_TIMEOUT);
|
||||
if let Some(token) = auth.bearer_token() {
|
||||
request = request.bearer_auth(token);
|
||||
}
|
||||
if let Some(account_id) = auth.account_id() {
|
||||
request = request.header("chatgpt-account-id", account_id);
|
||||
}
|
||||
request
|
||||
}
|
||||
|
||||
fn build_reqwest_client() -> reqwest::Client {
|
||||
build_reqwest_client_with_custom_ca(reqwest::Client::builder()).unwrap_or_else(|error| {
|
||||
tracing::warn!(error = %error, "failed to build OpenAI file upload client");
|
||||
reqwest::Client::new()
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::CoreAuthProvider;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering;
|
||||
use tempfile::TempDir;
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::Request;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::body_json;
|
||||
use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
fn chatgpt_auth() -> CoreAuthProvider {
|
||||
CoreAuthProvider::for_test(Some("token"), Some("account_id"))
|
||||
}
|
||||
|
||||
fn base_url_for(server: &MockServer) -> String {
|
||||
format!("{}/backend-api", server.uri())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn upload_local_file_returns_canonical_uri() {
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/files"))
|
||||
.and(header("chatgpt-account-id", "account_id"))
|
||||
.and(body_json(serde_json::json!({
|
||||
"file_name": "hello.txt",
|
||||
"file_size": 5,
|
||||
"use_case": "codex",
|
||||
})))
|
||||
.respond_with(
|
||||
ResponseTemplate::new(200)
|
||||
.set_body_json(serde_json::json!({"file_id": "file_123", "upload_url": format!("{}/upload/file_123", server.uri())})),
|
||||
)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("PUT"))
|
||||
.and(path("/upload/file_123"))
|
||||
.and(header("content-length", "5"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.mount(&server)
|
||||
.await;
|
||||
let finalize_attempts = Arc::new(AtomicUsize::new(0));
|
||||
let finalize_attempts_responder = Arc::clone(&finalize_attempts);
|
||||
let download_url = format!("{}/download/file_123", server.uri());
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/files/file_123/uploaded"))
|
||||
.respond_with(move |_request: &Request| {
|
||||
if finalize_attempts_responder.fetch_add(1, Ordering::SeqCst) == 0 {
|
||||
return ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"status": "retry"
|
||||
}));
|
||||
}
|
||||
|
||||
ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"status": "success",
|
||||
"download_url": download_url,
|
||||
"file_name": "hello.txt",
|
||||
"mime_type": "text/plain",
|
||||
"file_size_bytes": 5
|
||||
}))
|
||||
})
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let base_url = base_url_for(&server);
|
||||
let dir = TempDir::new().expect("temp dir");
|
||||
let path = dir.path().join("hello.txt");
|
||||
tokio::fs::write(&path, b"hello").await.expect("write file");
|
||||
|
||||
let uploaded = upload_local_file(&base_url, &chatgpt_auth(), &path)
|
||||
.await
|
||||
.expect("upload succeeds");
|
||||
|
||||
assert_eq!(uploaded.file_id, "file_123");
|
||||
assert_eq!(uploaded.uri, "sediment://file_123");
|
||||
assert_eq!(
|
||||
uploaded.download_url,
|
||||
format!("{}/download/file_123", server.uri())
|
||||
);
|
||||
assert_eq!(uploaded.file_name, "hello.txt");
|
||||
assert_eq!(uploaded.mime_type, Some("text/plain".to_string()));
|
||||
assert_eq!(finalize_attempts.load(Ordering::SeqCst), 2);
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@ pub(crate) mod auth;
|
||||
pub(crate) mod common;
|
||||
pub(crate) mod endpoint;
|
||||
pub(crate) mod error;
|
||||
pub(crate) mod files;
|
||||
pub(crate) mod provider;
|
||||
pub(crate) mod rate_limits;
|
||||
pub(crate) mod requests;
|
||||
@@ -52,6 +53,7 @@ pub use crate::endpoint::ResponsesWebsocketClient;
|
||||
pub use crate::endpoint::ResponsesWebsocketConnection;
|
||||
pub use crate::endpoint::session_update_session_json;
|
||||
pub use crate::error::ApiError;
|
||||
pub use crate::files::upload_local_file;
|
||||
pub use crate::provider::Provider;
|
||||
pub use crate::provider::RetryConfig;
|
||||
pub use crate::provider::is_azure_responses_wire_base_url;
|
||||
|
||||
@@ -38,4 +38,5 @@ pub use mcp_connection_manager::McpConnectionManager;
|
||||
pub use mcp_connection_manager::SandboxState;
|
||||
pub use mcp_connection_manager::ToolInfo;
|
||||
pub use mcp_connection_manager::codex_apps_tools_cache_key;
|
||||
pub use mcp_connection_manager::declared_openai_file_input_param_names;
|
||||
pub use mcp_connection_manager::filter_non_codex_apps_mcp_tools_only;
|
||||
|
||||
@@ -74,6 +74,8 @@ use rmcp::model::Tool;
|
||||
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use serde_json::Map;
|
||||
use serde_json::Value as JsonValue;
|
||||
use sha1::Digest;
|
||||
use sha1::Sha1;
|
||||
use tokio::sync::Mutex;
|
||||
@@ -197,6 +199,89 @@ pub struct ToolInfo {
|
||||
pub connector_description: Option<String>,
|
||||
}
|
||||
|
||||
const META_OPENAI_FILE_PARAMS: &str = "openai/fileParams";
|
||||
|
||||
pub fn declared_openai_file_input_param_names(
|
||||
meta: Option<&Map<String, JsonValue>>,
|
||||
) -> Vec<String> {
|
||||
let Some(meta) = meta else {
|
||||
return Vec::new();
|
||||
};
|
||||
|
||||
meta.get(META_OPENAI_FILE_PARAMS)
|
||||
.and_then(JsonValue::as_array)
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.filter_map(JsonValue::as_str)
|
||||
.filter(|value| !value.is_empty())
|
||||
.map(str::to_string)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Returns the model-visible view of a tool while preserving the raw metadata
|
||||
/// used by execution. Keep cache entries raw and call this at manager return
|
||||
/// boundaries.
|
||||
fn tool_with_model_visible_input_schema(tool: &Tool) -> Tool {
|
||||
let file_params = declared_openai_file_input_param_names(tool.meta.as_deref());
|
||||
if file_params.is_empty() {
|
||||
return tool.clone();
|
||||
}
|
||||
|
||||
let mut tool = tool.clone();
|
||||
let mut input_schema = JsonValue::Object(tool.input_schema.as_ref().clone());
|
||||
mask_input_schema_for_file_path_params(&mut input_schema, &file_params);
|
||||
if let JsonValue::Object(input_schema) = input_schema {
|
||||
tool.input_schema = Arc::new(input_schema);
|
||||
}
|
||||
tool
|
||||
}
|
||||
|
||||
fn mask_input_schema_for_file_path_params(input_schema: &mut JsonValue, file_params: &[String]) {
|
||||
let Some(properties) = input_schema
|
||||
.as_object_mut()
|
||||
.and_then(|schema| schema.get_mut("properties"))
|
||||
.and_then(JsonValue::as_object_mut)
|
||||
else {
|
||||
return;
|
||||
};
|
||||
|
||||
for field_name in file_params {
|
||||
let Some(property_schema) = properties.get_mut(field_name) else {
|
||||
continue;
|
||||
};
|
||||
mask_input_property_schema(property_schema);
|
||||
}
|
||||
}
|
||||
|
||||
fn mask_input_property_schema(schema: &mut JsonValue) {
|
||||
let Some(object) = schema.as_object_mut() else {
|
||||
return;
|
||||
};
|
||||
|
||||
let mut description = object
|
||||
.get("description")
|
||||
.and_then(JsonValue::as_str)
|
||||
.map(str::to_string)
|
||||
.unwrap_or_default();
|
||||
let guidance = "This parameter expects an absolute local file path. If you want to upload a file, provide the absolute path to that file here.";
|
||||
if description.is_empty() {
|
||||
description = guidance.to_string();
|
||||
} else if !description.contains(guidance) {
|
||||
description = format!("{description} {guidance}");
|
||||
}
|
||||
|
||||
let is_array = object.get("type").and_then(JsonValue::as_str) == Some("array")
|
||||
|| object.get("items").is_some();
|
||||
object.clear();
|
||||
object.insert("description".to_string(), JsonValue::String(description));
|
||||
if is_array {
|
||||
object.insert("type".to_string(), JsonValue::String("array".to_string()));
|
||||
object.insert("items".to_string(), serde_json::json!({ "type": "string" }));
|
||||
} else {
|
||||
object.insert("type".to_string(), JsonValue::String("string".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct CodexAppsToolsCacheKey {
|
||||
account_id: Option<String>,
|
||||
@@ -534,6 +619,10 @@ impl AsyncManagedClient {
|
||||
let annotate_tools = |tools: Vec<ToolInfo>| {
|
||||
let mut tools = tools;
|
||||
for tool in &mut tools {
|
||||
if tool.server_name == CODEX_APPS_MCP_SERVER_NAME {
|
||||
tool.tool = tool_with_model_visible_input_schema(&tool.tool);
|
||||
}
|
||||
|
||||
let plugin_names = match tool.connector_id.as_deref() {
|
||||
Some(connector_id) => self
|
||||
.tool_plugin_provenance
|
||||
@@ -914,10 +1003,13 @@ impl McpConnectionManager {
|
||||
list_start.elapsed(),
|
||||
&[("cache", "miss")],
|
||||
);
|
||||
Ok(qualify_tools(filter_tools(
|
||||
tools,
|
||||
&managed_client.tool_filter,
|
||||
)))
|
||||
let tools = filter_tools(tools, &managed_client.tool_filter)
|
||||
.into_iter()
|
||||
.map(|mut tool| {
|
||||
tool.tool = tool_with_model_visible_input_schema(&tool.tool);
|
||||
tool
|
||||
});
|
||||
Ok(qualify_tools(tools))
|
||||
}
|
||||
|
||||
/// Returns a single map that contains all resources. Each key is the
|
||||
@@ -1419,6 +1511,12 @@ async fn start_server_task(
|
||||
.await
|
||||
.map_err(StartupOutcomeError::from)?;
|
||||
|
||||
let server_supports_sandbox_state_capability = initialize_result
|
||||
.capabilities
|
||||
.experimental
|
||||
.as_ref()
|
||||
.and_then(|exp| exp.get(MCP_SANDBOX_STATE_CAPABILITY))
|
||||
.is_some();
|
||||
let list_start = Instant::now();
|
||||
let fetch_start = Instant::now();
|
||||
let tools = list_tools_for_client_uncached(
|
||||
@@ -1448,12 +1546,6 @@ async fn start_server_task(
|
||||
}
|
||||
let tools = filter_tools(tools, &tool_filter);
|
||||
|
||||
let server_supports_sandbox_state_capability = initialize_result
|
||||
.capabilities
|
||||
.experimental
|
||||
.as_ref()
|
||||
.and_then(|exp| exp.get(MCP_SANDBOX_STATE_CAPABILITY))
|
||||
.is_some();
|
||||
let managed = ManagedClient {
|
||||
client: Arc::clone(&client),
|
||||
tools,
|
||||
|
||||
@@ -3,6 +3,7 @@ use codex_protocol::protocol::GranularApprovalConfig;
|
||||
use codex_protocol::protocol::McpAuthStatus;
|
||||
use pretty_assertions::assert_eq;
|
||||
use rmcp::model::JsonObject;
|
||||
use rmcp::model::Meta;
|
||||
use rmcp::model::NumberOrString;
|
||||
use std::collections::HashSet;
|
||||
use std::sync::Arc;
|
||||
@@ -63,6 +64,86 @@ fn create_codex_apps_tools_cache_context(
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn declared_openai_file_fields_treat_names_literally() {
|
||||
let meta = serde_json::json!({
|
||||
"openai/fileParams": ["file", "input_file", "attachments"]
|
||||
});
|
||||
let meta = meta.as_object().expect("meta object");
|
||||
|
||||
assert_eq!(
|
||||
declared_openai_file_input_param_names(Some(meta)),
|
||||
vec![
|
||||
"file".to_string(),
|
||||
"input_file".to_string(),
|
||||
"attachments".to_string(),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tool_with_model_visible_input_schema_masks_file_params() {
|
||||
let mut tool = create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "upload").tool;
|
||||
tool.input_schema = Arc::new(
|
||||
serde_json::json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"file": {
|
||||
"type": "object",
|
||||
"description": "Original file payload."
|
||||
},
|
||||
"files": {
|
||||
"type": "array",
|
||||
"items": {"type": "object"}
|
||||
}
|
||||
}
|
||||
})
|
||||
.as_object()
|
||||
.expect("object")
|
||||
.clone(),
|
||||
);
|
||||
tool.meta = Some(Meta(
|
||||
serde_json::json!({
|
||||
"openai/fileParams": ["file", "files"]
|
||||
})
|
||||
.as_object()
|
||||
.expect("object")
|
||||
.clone(),
|
||||
));
|
||||
|
||||
let tool = tool_with_model_visible_input_schema(&tool);
|
||||
|
||||
assert_eq!(
|
||||
*tool.input_schema,
|
||||
serde_json::json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"file": {
|
||||
"type": "string",
|
||||
"description": "Original file payload. This parameter expects an absolute local file path. If you want to upload a file, provide the absolute path to that file here."
|
||||
},
|
||||
"files": {
|
||||
"type": "array",
|
||||
"items": {"type": "string"},
|
||||
"description": "This parameter expects an absolute local file path. If you want to upload a file, provide the absolute path to that file here."
|
||||
}
|
||||
}
|
||||
})
|
||||
.as_object()
|
||||
.expect("object")
|
||||
.clone()
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn tool_with_model_visible_input_schema_leaves_tools_without_file_params_unchanged() {
|
||||
let original_tool = create_test_tool("custom", "upload").tool;
|
||||
|
||||
let tool = tool_with_model_visible_input_schema(&original_tool);
|
||||
|
||||
assert_eq!(tool, original_tool);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn elicitation_granular_policy_defaults_to_prompting() {
|
||||
assert!(!elicitation_is_rejected_by_policy(
|
||||
|
||||
@@ -33,7 +33,7 @@ use crate::realtime_conversation::handle_close as handle_realtime_conversation_c
|
||||
use crate::realtime_conversation::handle_start as handle_realtime_conversation_start;
|
||||
use crate::realtime_conversation::handle_text as handle_realtime_conversation_text;
|
||||
use crate::render_skills_section;
|
||||
use crate::rollout::session_index;
|
||||
use crate::rollout::find_thread_name_by_id;
|
||||
use crate::session_prefix::format_subagent_notification_message;
|
||||
use crate::skills_load_input_from_config;
|
||||
use crate::stream_events_utils::HandleOutputCtx;
|
||||
@@ -1096,6 +1096,26 @@ fn local_time_context() -> (String, String) {
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_title_from_state_db(
|
||||
state_db: Option<&state_db::StateDbHandle>,
|
||||
codex_home: &Path,
|
||||
conversation_id: ThreadId,
|
||||
) -> Option<String> {
|
||||
if let Some(metadata) = state_db
|
||||
&& let Some(metadata) = metadata.get_thread(conversation_id).await.ok().flatten()
|
||||
{
|
||||
let title = metadata.title.trim();
|
||||
if !title.is_empty() && metadata.first_user_message.as_deref().map(str::trim) != Some(title)
|
||||
{
|
||||
return Some(title.to_string());
|
||||
}
|
||||
}
|
||||
find_thread_name_by_id(codex_home, &conversation_id)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct SessionConfiguration {
|
||||
/// Provider identifier ("openai", "openrouter", ...).
|
||||
@@ -1882,19 +1902,12 @@ impl Session {
|
||||
tx
|
||||
};
|
||||
let thread_name =
|
||||
match session_index::find_thread_name_by_id(&config.codex_home, &conversation_id)
|
||||
thread_title_from_state_db(state_db_ctx.as_ref(), &config.codex_home, conversation_id)
|
||||
.instrument(info_span!(
|
||||
"session_init.thread_name_lookup",
|
||||
otel.name = "session_init.thread_name_lookup",
|
||||
))
|
||||
.await
|
||||
{
|
||||
Ok(name) => name,
|
||||
Err(err) => {
|
||||
warn!("Failed to read session index for thread name: {err}");
|
||||
None
|
||||
}
|
||||
};
|
||||
.await;
|
||||
session_configuration.thread_name = thread_name.clone();
|
||||
let state = SessionState::new(session_configuration.clone());
|
||||
let managed_network_requirements_enabled = config.managed_network_requirements_enabled();
|
||||
@@ -4848,7 +4861,6 @@ mod handlers {
|
||||
|
||||
use crate::review_prompts::resolve_review_request;
|
||||
use crate::rollout::RolloutRecorder;
|
||||
use crate::rollout::session_index;
|
||||
use crate::tasks::CompactTask;
|
||||
use crate::tasks::UndoTask;
|
||||
use crate::tasks::UserShellCommandMode;
|
||||
@@ -5528,13 +5540,25 @@ mod handlers {
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Persists the thread name in the session index, updates in-memory state, and emits
|
||||
/// a `ThreadNameUpdated` event on success.
|
||||
///
|
||||
/// This appends the name to `CODEX_HOME/sessions_index.jsonl` via `session_index::append_thread_name` for the
|
||||
/// current `thread_id`, then updates `SessionConfiguration::thread_name`.
|
||||
///
|
||||
/// Returns an error event if the name is empty or session persistence is disabled.
|
||||
async fn persist_thread_name_update(
|
||||
sess: &Arc<Session>,
|
||||
event: ThreadNameUpdatedEvent,
|
||||
) -> anyhow::Result<EventMsg> {
|
||||
let msg = EventMsg::ThreadNameUpdated(event);
|
||||
let item = RolloutItem::EventMsg(msg.clone());
|
||||
let recorder = {
|
||||
let guard = sess.services.rollout.lock().await;
|
||||
guard.clone()
|
||||
}
|
||||
.ok_or_else(|| anyhow::anyhow!("Session persistence is disabled; cannot rename thread."))?;
|
||||
recorder.persist().await?;
|
||||
recorder.record_items(std::slice::from_ref(&item)).await?;
|
||||
recorder.flush().await?;
|
||||
Ok(msg)
|
||||
}
|
||||
|
||||
/// Persists the thread name in the rollout and state database, updates in-memory state, and
|
||||
/// emits a `ThreadNameUpdated` event on success.
|
||||
pub async fn set_thread_name(sess: &Arc<Session>, sub_id: String, name: String) {
|
||||
let Some(name) = crate::util::normalize_thread_name(&name) else {
|
||||
let event = Event {
|
||||
@@ -5548,47 +5572,33 @@ mod handlers {
|
||||
return;
|
||||
};
|
||||
|
||||
let persistence_enabled = {
|
||||
let rollout = sess.services.rollout.lock().await;
|
||||
rollout.is_some()
|
||||
};
|
||||
if !persistence_enabled {
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: "Session persistence is disabled; cannot rename thread.".to_string(),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
return;
|
||||
let updated = ThreadNameUpdatedEvent {
|
||||
thread_id: sess.conversation_id,
|
||||
thread_name: Some(name.clone()),
|
||||
};
|
||||
|
||||
if let Err(e) = sess.try_ensure_rollout_materialized().await {
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: format!("Failed to set thread name: {e}"),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
return;
|
||||
}
|
||||
let msg = match persist_thread_name_update(sess, updated).await {
|
||||
Ok(msg) => msg,
|
||||
Err(err) => {
|
||||
warn!("Failed to persist thread name update to rollout: {err}");
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: err.to_string(),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let codex_home = sess.codex_home().await;
|
||||
if let Err(e) =
|
||||
session_index::append_thread_name(&codex_home, sess.conversation_id, &name).await
|
||||
if let Some(state_db) = sess.services.state_db.as_deref()
|
||||
&& let Err(err) = state_db
|
||||
.update_thread_title(sess.conversation_id, &name)
|
||||
.await
|
||||
{
|
||||
let event = Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::Error(ErrorEvent {
|
||||
message: format!("Failed to set thread name: {e}"),
|
||||
codex_error_info: Some(CodexErrorInfo::Other),
|
||||
}),
|
||||
};
|
||||
sess.send_event_raw(event).await;
|
||||
return;
|
||||
warn!("Failed to update thread title in state db: {err}");
|
||||
}
|
||||
|
||||
{
|
||||
@@ -5596,14 +5606,14 @@ mod handlers {
|
||||
state.session_configuration.thread_name = Some(name.clone());
|
||||
}
|
||||
|
||||
sess.send_event_raw(Event {
|
||||
id: sub_id,
|
||||
msg: EventMsg::ThreadNameUpdated(ThreadNameUpdatedEvent {
|
||||
thread_id: sess.conversation_id,
|
||||
thread_name: Some(name),
|
||||
}),
|
||||
})
|
||||
.await;
|
||||
let codex_home = sess.codex_home().await;
|
||||
if let Err(err) =
|
||||
crate::rollout::append_thread_name(&codex_home, sess.conversation_id, &name).await
|
||||
{
|
||||
warn!("Failed to update legacy thread name index: {err}");
|
||||
}
|
||||
|
||||
sess.deliver_event_raw(Event { id: sub_id, msg }).await;
|
||||
}
|
||||
|
||||
pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
|
||||
|
||||
@@ -56,6 +56,7 @@ mod original_image_detail;
|
||||
pub use codex_mcp::MCP_SANDBOX_STATE_CAPABILITY;
|
||||
pub use codex_mcp::MCP_SANDBOX_STATE_METHOD;
|
||||
pub use codex_mcp::SandboxState;
|
||||
mod mcp_openai_file;
|
||||
mod mcp_tool_call;
|
||||
mod memories;
|
||||
pub(crate) mod mention_syntax;
|
||||
@@ -165,10 +166,10 @@ pub use rollout::append_thread_name;
|
||||
pub use rollout::find_archived_thread_path_by_id_str;
|
||||
#[deprecated(note = "use find_thread_path_by_id_str")]
|
||||
pub use rollout::find_conversation_path_by_id_str;
|
||||
pub use rollout::find_thread_meta_by_name_str;
|
||||
pub use rollout::find_thread_name_by_id;
|
||||
pub use rollout::find_thread_names_by_ids;
|
||||
pub use rollout::find_thread_path_by_id_str;
|
||||
pub use rollout::find_thread_path_by_name_str;
|
||||
pub use rollout::parse_cursor;
|
||||
pub use rollout::read_head_for_summary;
|
||||
pub use rollout::read_session_meta_line;
|
||||
|
||||
473
codex-rs/core/src/mcp_openai_file.rs
Normal file
473
codex-rs/core/src/mcp_openai_file.rs
Normal file
@@ -0,0 +1,473 @@
|
||||
//! Bridges Apps SDK-style `openai/fileParams` metadata into Codex's MCP flow.
|
||||
//!
|
||||
//! Strategy:
|
||||
//! - Inspect `_meta["openai/fileParams"]` to discover which tool arguments are
|
||||
//! file inputs.
|
||||
//! - At tool execution time, upload those local files to OpenAI file storage
|
||||
//! and rewrite only the declared arguments into the provided-file payload
|
||||
//! shape expected by the downstream Apps tool.
|
||||
//!
|
||||
//! Model-visible schema masking is owned by `codex-mcp` alongside MCP tool
|
||||
//! inventory, so this module only handles the execution-time argument rewrite.
|
||||
|
||||
use crate::codex::Session;
|
||||
use crate::codex::TurnContext;
|
||||
use codex_api::CoreAuthProvider;
|
||||
use codex_api::upload_local_file;
|
||||
use codex_login::CodexAuth;
|
||||
use serde_json::Value as JsonValue;
|
||||
|
||||
pub(crate) async fn rewrite_mcp_tool_arguments_for_openai_files(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
arguments_value: Option<JsonValue>,
|
||||
openai_file_input_params: Option<&[String]>,
|
||||
) -> Result<Option<JsonValue>, String> {
|
||||
let Some(openai_file_input_params) = openai_file_input_params else {
|
||||
return Ok(arguments_value);
|
||||
};
|
||||
|
||||
let Some(arguments_value) = arguments_value else {
|
||||
return Ok(None);
|
||||
};
|
||||
let Some(arguments) = arguments_value.as_object() else {
|
||||
return Ok(Some(arguments_value));
|
||||
};
|
||||
let auth = sess.services.auth_manager.auth().await;
|
||||
let mut rewritten_arguments = arguments.clone();
|
||||
|
||||
for field_name in openai_file_input_params {
|
||||
let Some(value) = arguments.get(field_name) else {
|
||||
continue;
|
||||
};
|
||||
let Some(uploaded_value) =
|
||||
rewrite_argument_value_for_openai_files(turn_context, auth.as_ref(), field_name, value)
|
||||
.await?
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
rewritten_arguments.insert(field_name.clone(), uploaded_value);
|
||||
}
|
||||
|
||||
if rewritten_arguments == *arguments {
|
||||
return Ok(Some(arguments_value));
|
||||
}
|
||||
|
||||
Ok(Some(JsonValue::Object(rewritten_arguments)))
|
||||
}
|
||||
|
||||
async fn rewrite_argument_value_for_openai_files(
|
||||
turn_context: &TurnContext,
|
||||
auth: Option<&CodexAuth>,
|
||||
field_name: &str,
|
||||
value: &JsonValue,
|
||||
) -> Result<Option<JsonValue>, String> {
|
||||
match value {
|
||||
JsonValue::String(path_or_file_ref) => {
|
||||
let rewritten = build_uploaded_local_argument_value(
|
||||
turn_context,
|
||||
auth,
|
||||
field_name,
|
||||
/*index*/ None,
|
||||
path_or_file_ref,
|
||||
)
|
||||
.await?;
|
||||
Ok(Some(rewritten))
|
||||
}
|
||||
JsonValue::Array(values) => {
|
||||
let mut rewritten_values = Vec::with_capacity(values.len());
|
||||
for (index, item) in values.iter().enumerate() {
|
||||
let Some(path_or_file_ref) = item.as_str() else {
|
||||
return Ok(None);
|
||||
};
|
||||
let rewritten = build_uploaded_local_argument_value(
|
||||
turn_context,
|
||||
auth,
|
||||
field_name,
|
||||
Some(index),
|
||||
path_or_file_ref,
|
||||
)
|
||||
.await?;
|
||||
rewritten_values.push(rewritten);
|
||||
}
|
||||
Ok(Some(JsonValue::Array(rewritten_values)))
|
||||
}
|
||||
_ => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_uploaded_local_argument_value(
|
||||
turn_context: &TurnContext,
|
||||
auth: Option<&CodexAuth>,
|
||||
field_name: &str,
|
||||
index: Option<usize>,
|
||||
file_path: &str,
|
||||
) -> Result<JsonValue, String> {
|
||||
let resolved_path = turn_context.resolve_path(Some(file_path.to_string()));
|
||||
let Some(auth) = auth else {
|
||||
return Err(
|
||||
"ChatGPT auth is required to upload local files for Codex Apps tools".to_string(),
|
||||
);
|
||||
};
|
||||
let token_data = auth
|
||||
.get_token_data()
|
||||
.map_err(|error| format!("failed to read ChatGPT auth for file upload: {error}"))?;
|
||||
let upload_auth = CoreAuthProvider {
|
||||
token: Some(token_data.access_token),
|
||||
account_id: token_data.account_id,
|
||||
};
|
||||
let uploaded = upload_local_file(
|
||||
turn_context.config.chatgpt_base_url.trim_end_matches('/'),
|
||||
&upload_auth,
|
||||
&resolved_path,
|
||||
)
|
||||
.await
|
||||
.map_err(|error| match index {
|
||||
Some(index) => {
|
||||
format!("failed to upload `{file_path}` for `{field_name}[{index}]`: {error}")
|
||||
}
|
||||
None => format!("failed to upload `{file_path}` for `{field_name}`: {error}"),
|
||||
})?;
|
||||
Ok(serde_json::json!({
|
||||
"download_url": uploaded.download_url,
|
||||
"file_id": uploaded.file_id,
|
||||
"mime_type": uploaded.mime_type,
|
||||
"file_name": uploaded.file_name,
|
||||
"uri": uploaded.uri,
|
||||
"file_size_bytes": uploaded.file_size_bytes,
|
||||
}))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::codex::make_session_and_context;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::sync::Arc;
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[tokio::test]
|
||||
async fn openai_file_argument_rewrite_requires_declared_file_params() {
|
||||
let (session, turn_context) = make_session_and_context().await;
|
||||
let arguments = Some(serde_json::json!({
|
||||
"file": "/tmp/codex-smoke-file.txt"
|
||||
}));
|
||||
|
||||
let rewritten = rewrite_mcp_tool_arguments_for_openai_files(
|
||||
&session,
|
||||
&Arc::new(turn_context),
|
||||
arguments.clone(),
|
||||
/*openai_file_input_params*/ None,
|
||||
)
|
||||
.await
|
||||
.expect("rewrite should succeed");
|
||||
|
||||
assert_eq!(rewritten, arguments);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn build_uploaded_local_argument_value_uploads_local_file_path() {
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::body_json;
|
||||
use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/files"))
|
||||
.and(header("chatgpt-account-id", "account_id"))
|
||||
.and(body_json(serde_json::json!({
|
||||
"file_name": "file_report.csv",
|
||||
"file_size": 5,
|
||||
"use_case": "codex",
|
||||
})))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"file_id": "file_123",
|
||||
"upload_url": format!("{}/upload/file_123", server.uri()),
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("PUT"))
|
||||
.and(path("/upload/file_123"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/files/file_123/uploaded"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"status": "success",
|
||||
"download_url": format!("{}/download/file_123", server.uri()),
|
||||
"file_name": "file_report.csv",
|
||||
"mime_type": "text/csv",
|
||||
"file_size_bytes": 5,
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let (_, mut turn_context) = make_session_and_context().await;
|
||||
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
|
||||
let dir = tempdir().expect("temp dir");
|
||||
let local_path = dir.path().join("file_report.csv");
|
||||
tokio::fs::write(&local_path, b"hello")
|
||||
.await
|
||||
.expect("write local file");
|
||||
turn_context.cwd = AbsolutePathBuf::try_from(dir.path()).expect("absolute path");
|
||||
|
||||
let mut config = (*turn_context.config).clone();
|
||||
config.chatgpt_base_url = format!("{}/backend-api", server.uri());
|
||||
turn_context.config = Arc::new(config);
|
||||
|
||||
let rewritten = build_uploaded_local_argument_value(
|
||||
&turn_context,
|
||||
Some(&auth),
|
||||
"file",
|
||||
/*index*/ None,
|
||||
"file_report.csv",
|
||||
)
|
||||
.await
|
||||
.expect("rewrite should upload the local file");
|
||||
|
||||
assert_eq!(
|
||||
rewritten,
|
||||
serde_json::json!({
|
||||
"download_url": format!("{}/download/file_123", server.uri()),
|
||||
"file_id": "file_123",
|
||||
"mime_type": "text/csv",
|
||||
"file_name": "file_report.csv",
|
||||
"uri": "sediment://file_123",
|
||||
"file_size_bytes": 5,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rewrite_argument_value_for_openai_files_rewrites_scalar_path() {
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::body_json;
|
||||
use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/files"))
|
||||
.and(header("chatgpt-account-id", "account_id"))
|
||||
.and(body_json(serde_json::json!({
|
||||
"file_name": "file_report.csv",
|
||||
"file_size": 5,
|
||||
"use_case": "codex",
|
||||
})))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"file_id": "file_123",
|
||||
"upload_url": format!("{}/upload/file_123", server.uri()),
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("PUT"))
|
||||
.and(path("/upload/file_123"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/files/file_123/uploaded"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"status": "success",
|
||||
"download_url": format!("{}/download/file_123", server.uri()),
|
||||
"file_name": "file_report.csv",
|
||||
"mime_type": "text/csv",
|
||||
"file_size_bytes": 5,
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let (_, mut turn_context) = make_session_and_context().await;
|
||||
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
|
||||
let dir = tempdir().expect("temp dir");
|
||||
let local_path = dir.path().join("file_report.csv");
|
||||
tokio::fs::write(&local_path, b"hello")
|
||||
.await
|
||||
.expect("write local file");
|
||||
turn_context.cwd = AbsolutePathBuf::try_from(dir.path()).expect("absolute path");
|
||||
|
||||
let mut config = (*turn_context.config).clone();
|
||||
config.chatgpt_base_url = format!("{}/backend-api", server.uri());
|
||||
turn_context.config = Arc::new(config);
|
||||
let rewritten = rewrite_argument_value_for_openai_files(
|
||||
&turn_context,
|
||||
Some(&auth),
|
||||
"file",
|
||||
&serde_json::json!("file_report.csv"),
|
||||
)
|
||||
.await
|
||||
.expect("rewrite should succeed");
|
||||
|
||||
assert_eq!(
|
||||
rewritten,
|
||||
Some(serde_json::json!({
|
||||
"download_url": format!("{}/download/file_123", server.uri()),
|
||||
"file_id": "file_123",
|
||||
"mime_type": "text/csv",
|
||||
"file_name": "file_report.csv",
|
||||
"uri": "sediment://file_123",
|
||||
"file_size_bytes": 5,
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rewrite_argument_value_for_openai_files_rewrites_array_paths() {
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::body_json;
|
||||
use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
let server = MockServer::start().await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/files"))
|
||||
.and(header("chatgpt-account-id", "account_id"))
|
||||
.and(body_json(serde_json::json!({
|
||||
"file_name": "one.csv",
|
||||
"file_size": 3,
|
||||
"use_case": "codex",
|
||||
})))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"file_id": "file_1",
|
||||
"upload_url": format!("{}/upload/file_1", server.uri()),
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/files"))
|
||||
.and(header("chatgpt-account-id", "account_id"))
|
||||
.and(body_json(serde_json::json!({
|
||||
"file_name": "two.csv",
|
||||
"file_size": 3,
|
||||
"use_case": "codex",
|
||||
})))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"file_id": "file_2",
|
||||
"upload_url": format!("{}/upload/file_2", server.uri()),
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("PUT"))
|
||||
.and(path("/upload/file_1"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("PUT"))
|
||||
.and(path("/upload/file_2"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/files/file_1/uploaded"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"status": "success",
|
||||
"download_url": format!("{}/download/file_1", server.uri()),
|
||||
"file_name": "one.csv",
|
||||
"mime_type": "text/csv",
|
||||
"file_size_bytes": 3,
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/backend-api/files/file_2/uploaded"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({
|
||||
"status": "success",
|
||||
"download_url": format!("{}/download/file_2", server.uri()),
|
||||
"file_name": "two.csv",
|
||||
"mime_type": "text/csv",
|
||||
"file_size_bytes": 3,
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let (_, mut turn_context) = make_session_and_context().await;
|
||||
let auth = CodexAuth::create_dummy_chatgpt_auth_for_testing();
|
||||
let dir = tempdir().expect("temp dir");
|
||||
tokio::fs::write(dir.path().join("one.csv"), b"one")
|
||||
.await
|
||||
.expect("write first local file");
|
||||
tokio::fs::write(dir.path().join("two.csv"), b"two")
|
||||
.await
|
||||
.expect("write second local file");
|
||||
turn_context.cwd = AbsolutePathBuf::try_from(dir.path()).expect("absolute path");
|
||||
|
||||
let mut config = (*turn_context.config).clone();
|
||||
config.chatgpt_base_url = format!("{}/backend-api", server.uri());
|
||||
turn_context.config = Arc::new(config);
|
||||
let rewritten = rewrite_argument_value_for_openai_files(
|
||||
&turn_context,
|
||||
Some(&auth),
|
||||
"files",
|
||||
&serde_json::json!(["one.csv", "two.csv"]),
|
||||
)
|
||||
.await
|
||||
.expect("rewrite should succeed");
|
||||
|
||||
assert_eq!(
|
||||
rewritten,
|
||||
Some(serde_json::json!([
|
||||
{
|
||||
"download_url": format!("{}/download/file_1", server.uri()),
|
||||
"file_id": "file_1",
|
||||
"mime_type": "text/csv",
|
||||
"file_name": "one.csv",
|
||||
"uri": "sediment://file_1",
|
||||
"file_size_bytes": 3,
|
||||
},
|
||||
{
|
||||
"download_url": format!("{}/download/file_2", server.uri()),
|
||||
"file_id": "file_2",
|
||||
"mime_type": "text/csv",
|
||||
"file_name": "two.csv",
|
||||
"uri": "sediment://file_2",
|
||||
"file_size_bytes": 3,
|
||||
}
|
||||
]))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rewrite_mcp_tool_arguments_for_openai_files_surfaces_upload_failures() {
|
||||
let (mut session, turn_context) = make_session_and_context().await;
|
||||
session.services.auth_manager = crate::test_support::auth_manager_from_auth(
|
||||
CodexAuth::create_dummy_chatgpt_auth_for_testing(),
|
||||
);
|
||||
let error = rewrite_mcp_tool_arguments_for_openai_files(
|
||||
&session,
|
||||
&turn_context,
|
||||
Some(serde_json::json!({
|
||||
"file": "/definitely/missing/file.csv",
|
||||
})),
|
||||
Some(&["file".to_string()]),
|
||||
)
|
||||
.await
|
||||
.expect_err("missing file should fail");
|
||||
|
||||
assert!(error.contains("failed to upload"));
|
||||
assert!(error.contains("file"));
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,7 @@ use crate::guardian::guardian_approval_request_to_json;
|
||||
use crate::guardian::guardian_rejection_message;
|
||||
use crate::guardian::review_approval_request;
|
||||
use crate::guardian::routes_approval_to_guardian;
|
||||
use crate::mcp_openai_file::rewrite_mcp_tool_arguments_for_openai_files;
|
||||
use crate::mcp_tool_approval_templates::RenderedMcpToolApprovalParam;
|
||||
use crate::mcp_tool_approval_templates::render_mcp_tool_approval_template;
|
||||
use codex_analytics::AppInvocation;
|
||||
@@ -34,10 +35,11 @@ use codex_analytics::build_track_events_context;
|
||||
use codex_config::types::AppToolApproval;
|
||||
use codex_features::Feature;
|
||||
use codex_mcp::CODEX_APPS_MCP_SERVER_NAME;
|
||||
use codex_mcp::mcp_permission_prompt_is_auto_approved;
|
||||
use codex_mcp::declared_openai_file_input_param_names;
|
||||
use codex_otel::sanitize_metric_tag_value;
|
||||
use codex_protocol::mcp::CallToolResult;
|
||||
use codex_protocol::openai_models::InputModality;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::McpInvocation;
|
||||
use codex_protocol::protocol::McpToolCallBeginEvent;
|
||||
@@ -178,14 +180,16 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
|
||||
let start = Instant::now();
|
||||
let result = async {
|
||||
sess.call_tool(
|
||||
execute_mcp_tool_call(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
&server,
|
||||
&tool_name,
|
||||
arguments_value.clone(),
|
||||
metadata.as_ref(),
|
||||
request_meta.clone(),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| format!("tool call error: {e:?}"))
|
||||
}
|
||||
.instrument(mcp_tool_call_span(
|
||||
sess.as_ref(),
|
||||
@@ -200,13 +204,6 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
},
|
||||
))
|
||||
.await;
|
||||
let result = sanitize_mcp_tool_result_for_model(
|
||||
turn_context
|
||||
.model_info
|
||||
.input_modalities
|
||||
.contains(&InputModality::Image),
|
||||
result,
|
||||
);
|
||||
if let Err(error) = &result {
|
||||
tracing::warn!("MCP tool call error: {error:?}");
|
||||
}
|
||||
@@ -294,11 +291,17 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
maybe_mark_thread_memory_mode_polluted(sess.as_ref(), turn_context.as_ref()).await;
|
||||
|
||||
let start = Instant::now();
|
||||
// Perform the tool call.
|
||||
let result = async {
|
||||
sess.call_tool(&server, &tool_name, arguments_value.clone(), request_meta)
|
||||
.await
|
||||
.map_err(|e| format!("tool call error: {e:?}"))
|
||||
execute_mcp_tool_call(
|
||||
sess.as_ref(),
|
||||
turn_context.as_ref(),
|
||||
&server,
|
||||
&tool_name,
|
||||
arguments_value.clone(),
|
||||
metadata.as_ref(),
|
||||
request_meta,
|
||||
)
|
||||
.await
|
||||
}
|
||||
.instrument(mcp_tool_call_span(
|
||||
sess.as_ref(),
|
||||
@@ -313,13 +316,6 @@ pub(crate) async fn handle_mcp_tool_call(
|
||||
},
|
||||
))
|
||||
.await;
|
||||
let result = sanitize_mcp_tool_result_for_model(
|
||||
turn_context
|
||||
.model_info
|
||||
.input_modalities
|
||||
.contains(&InputModality::Image),
|
||||
result,
|
||||
);
|
||||
if let Err(error) = &result {
|
||||
tracing::warn!("MCP tool call error: {error:?}");
|
||||
}
|
||||
@@ -453,6 +449,35 @@ fn record_server_fields(span: &Span, url: Option<&str>) {
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_mcp_tool_call(
|
||||
sess: &Session,
|
||||
turn_context: &TurnContext,
|
||||
server: &str,
|
||||
tool_name: &str,
|
||||
arguments_value: Option<serde_json::Value>,
|
||||
metadata: Option<&McpToolApprovalMetadata>,
|
||||
request_meta: Option<serde_json::Value>,
|
||||
) -> Result<CallToolResult, String> {
|
||||
let rewritten_arguments = rewrite_mcp_tool_arguments_for_openai_files(
|
||||
sess,
|
||||
turn_context,
|
||||
arguments_value,
|
||||
metadata.and_then(|metadata| metadata.openai_file_input_params.as_deref()),
|
||||
)
|
||||
.await?;
|
||||
let result = sess
|
||||
.call_tool(server, tool_name, rewritten_arguments, request_meta)
|
||||
.await
|
||||
.map_err(|e| format!("tool call error: {e:?}"))?;
|
||||
sanitize_mcp_tool_result_for_model(
|
||||
turn_context
|
||||
.model_info
|
||||
.input_modalities
|
||||
.contains(&InputModality::Image),
|
||||
Ok(result),
|
||||
)
|
||||
}
|
||||
|
||||
async fn maybe_mark_thread_memory_mode_polluted(sess: &Session, turn_context: &TurnContext) {
|
||||
if !turn_context
|
||||
.config
|
||||
@@ -566,6 +591,7 @@ pub(crate) struct McpToolApprovalMetadata {
|
||||
tool_title: Option<String>,
|
||||
tool_description: Option<String>,
|
||||
codex_apps_meta: Option<serde_json::Map<String, serde_json::Value>>,
|
||||
openai_file_input_params: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
|
||||
@@ -694,10 +720,7 @@ async fn maybe_request_mcp_tool_approval(
|
||||
metadata: Option<&McpToolApprovalMetadata>,
|
||||
approval_mode: AppToolApproval,
|
||||
) -> Option<McpToolApprovalDecision> {
|
||||
if mcp_permission_prompt_is_auto_approved(
|
||||
turn_context.approval_policy.value(),
|
||||
turn_context.sandbox_policy.get(),
|
||||
) {
|
||||
if turn_context.approval_policy.value() == AskForApproval::Never {
|
||||
return None;
|
||||
}
|
||||
|
||||
@@ -983,7 +1006,6 @@ pub(crate) async fn lookup_mcp_tool_metadata(
|
||||
.await
|
||||
.list_all_tools()
|
||||
.await;
|
||||
|
||||
let tool_info = tools
|
||||
.into_values()
|
||||
.find(|tool_info| tool_info.server_name == server && tool_info.tool.name == tool_name)?;
|
||||
@@ -1025,6 +1047,10 @@ pub(crate) async fn lookup_mcp_tool_metadata(
|
||||
.and_then(|meta| meta.get(MCP_TOOL_CODEX_APPS_META_KEY))
|
||||
.and_then(serde_json::Value::as_object)
|
||||
.cloned(),
|
||||
openai_file_input_params: Some(declared_openai_file_input_param_names(
|
||||
tool_info.tool.meta.as_deref(),
|
||||
))
|
||||
.filter(|params| !params.is_empty()),
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -59,6 +59,7 @@ fn approval_metadata(
|
||||
tool_title: tool_title.map(str::to_string),
|
||||
tool_description: tool_description.map(str::to_string),
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -597,6 +598,7 @@ async fn codex_apps_tool_call_request_meta_includes_turn_metadata_and_codex_apps
|
||||
.cloned()
|
||||
.expect("_codex_apps metadata should be an object"),
|
||||
),
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
assert_eq!(
|
||||
@@ -744,6 +746,7 @@ fn guardian_mcp_review_request_includes_annotations_when_present() {
|
||||
tool_title: None,
|
||||
tool_description: None,
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
let request = build_guardian_mcp_tool_review_request("call-1", &invocation, Some(&metadata));
|
||||
@@ -1254,6 +1257,7 @@ async fn approve_mode_skips_when_annotations_do_not_require_approval() {
|
||||
tool_title: Some("Read Only Tool".to_string()),
|
||||
tool_description: None,
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
let decision = maybe_request_mcp_tool_approval(
|
||||
@@ -1321,6 +1325,7 @@ async fn guardian_mode_skips_auto_when_annotations_do_not_require_approval() {
|
||||
tool_title: Some("Read Only Tool".to_string()),
|
||||
tool_description: None,
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
let decision = maybe_request_mcp_tool_approval(
|
||||
@@ -1391,6 +1396,7 @@ async fn guardian_mode_mcp_denial_returns_rationale_message() {
|
||||
tool_title: Some("Dangerous Tool".to_string()),
|
||||
tool_description: Some("Reads calendar data.".to_string()),
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
let decision = maybe_request_mcp_tool_approval(
|
||||
@@ -1441,6 +1447,7 @@ async fn prompt_mode_waits_for_approval_when_annotations_do_not_require_approval
|
||||
tool_title: Some("Read Only Tool".to_string()),
|
||||
tool_description: None,
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
let mut approval_task = {
|
||||
@@ -1517,6 +1524,7 @@ async fn approve_mode_blocks_when_arc_returns_interrupt_for_model() {
|
||||
tool_title: Some("Dangerous Tool".to_string()),
|
||||
tool_description: Some("Performs a risky action.".to_string()),
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
let decision = maybe_request_mcp_tool_approval(
|
||||
@@ -1586,6 +1594,7 @@ async fn custom_approve_mode_blocks_when_arc_returns_interrupt_for_model() {
|
||||
tool_title: Some("Dangerous Tool".to_string()),
|
||||
tool_description: Some("Performs a risky action.".to_string()),
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
let decision = maybe_request_mcp_tool_approval(
|
||||
@@ -1655,6 +1664,7 @@ async fn approve_mode_blocks_when_arc_returns_interrupt_without_annotations() {
|
||||
tool_title: Some("Dangerous Tool".to_string()),
|
||||
tool_description: Some("Performs a risky action.".to_string()),
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
let decision = maybe_request_mcp_tool_approval(
|
||||
@@ -1676,7 +1686,7 @@ async fn approve_mode_blocks_when_arc_returns_interrupt_without_annotations() {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn full_access_mode_skips_arc_monitor_for_all_approval_modes() {
|
||||
async fn never_policy_approves_mcp_tool_calls_for_all_approval_modes() {
|
||||
use wiremock::Mock;
|
||||
use wiremock::MockServer;
|
||||
use wiremock::ResponseTemplate;
|
||||
@@ -1711,7 +1721,7 @@ async fn full_access_mode_skips_arc_monitor_for_all_approval_modes() {
|
||||
.expect("test setup should allow updating approval policy");
|
||||
turn_context
|
||||
.sandbox_policy
|
||||
.set(SandboxPolicy::DangerFullAccess)
|
||||
.set(SandboxPolicy::new_workspace_write_policy())
|
||||
.expect("test setup should allow updating sandbox policy");
|
||||
let mut config = (*turn_context.config).clone();
|
||||
config.chatgpt_base_url = server.uri();
|
||||
@@ -1732,6 +1742,7 @@ async fn full_access_mode_skips_arc_monitor_for_all_approval_modes() {
|
||||
tool_title: Some("Dangerous Tool".to_string()),
|
||||
tool_description: Some("Performs a risky action.".to_string()),
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
for approval_mode in [
|
||||
@@ -1833,6 +1844,7 @@ async fn approve_mode_routes_arc_ask_user_to_guardian_when_guardian_reviewer_is_
|
||||
tool_title: Some("Dangerous Tool".to_string()),
|
||||
tool_description: Some("Performs a risky action.".to_string()),
|
||||
codex_apps_meta: None,
|
||||
openai_file_input_params: None,
|
||||
};
|
||||
|
||||
let decision = maybe_request_mcp_tool_approval(
|
||||
|
||||
@@ -25,6 +25,7 @@ pub(crate) use control::clear_memory_root_contents;
|
||||
pub(crate) use start::start_memories_startup_task;
|
||||
|
||||
mod artifacts {
|
||||
pub(super) const EXTENSIONS_SUBDIR: &str = "memories_extensions";
|
||||
pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
|
||||
pub(super) const RAW_MEMORIES_FILENAME: &str = "raw_memories.md";
|
||||
}
|
||||
@@ -106,6 +107,10 @@ fn rollout_summaries_dir(root: &Path) -> PathBuf {
|
||||
root.join(artifacts::ROLLOUT_SUMMARIES_SUBDIR)
|
||||
}
|
||||
|
||||
fn memory_extensions_root(root: &Path) -> PathBuf {
|
||||
root.with_file_name(artifacts::EXTENSIONS_SUBDIR)
|
||||
}
|
||||
|
||||
fn raw_memories_file(root: &Path) -> PathBuf {
|
||||
root.join(artifacts::RAW_MEMORIES_FILENAME)
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use crate::memories::memory_extensions_root;
|
||||
use crate::memories::memory_root;
|
||||
use crate::memories::phase_one;
|
||||
use crate::memories::storage::rollout_summary_file_stem_from_parts;
|
||||
@@ -31,6 +32,18 @@ static MEMORY_TOOL_DEVELOPER_INSTRUCTIONS_TEMPLATE: LazyLock<Template> = LazyLoc
|
||||
"memories/read_path.md",
|
||||
)
|
||||
});
|
||||
static MEMORY_EXTENSIONS_FOLDER_STRUCTURE_TEMPLATE: LazyLock<Template> = LazyLock::new(|| {
|
||||
parse_embedded_template(
|
||||
MEMORY_EXTENSIONS_FOLDER_STRUCTURE,
|
||||
"memories/extensions_folder_structure.md",
|
||||
)
|
||||
});
|
||||
static MEMORY_EXTENSIONS_PRIMARY_INPUTS_TEMPLATE: LazyLock<Template> = LazyLock::new(|| {
|
||||
parse_embedded_template(
|
||||
MEMORY_EXTENSIONS_PRIMARY_INPUTS,
|
||||
"memories/extensions_primary_inputs.md",
|
||||
)
|
||||
});
|
||||
|
||||
fn parse_embedded_template(source: &'static str, template_name: &str) -> Template {
|
||||
match Template::parse(source) {
|
||||
@@ -39,24 +52,82 @@ fn parse_embedded_template(source: &'static str, template_name: &str) -> Templat
|
||||
}
|
||||
}
|
||||
|
||||
const MEMORY_EXTENSIONS_FOLDER_STRUCTURE: &str = r#"
|
||||
Memory extensions (under {{ memory_extensions_root }}/):
|
||||
|
||||
- <extension_name>/instructions.md
|
||||
- Source-specific guidance for interpreting additional memory signals. If an
|
||||
extension folder exists, you must read its instructions.md to determine how to use this memory
|
||||
source.
|
||||
|
||||
If the user has any memory extensions, you MUST read the instructions for each extension to
|
||||
determine how to use the memory source. If it has no extension folders, continue with the standard
|
||||
memory inputs only.
|
||||
"#;
|
||||
|
||||
const MEMORY_EXTENSIONS_PRIMARY_INPUTS: &str = r#"
|
||||
Optional source-specific inputs:
|
||||
Under `{{ memory_extensions_root }}/`:
|
||||
|
||||
- `<extension_name>/instructions.md`
|
||||
- If extension folders exist, read each instructions.md first and follow it when interpreting
|
||||
that extension's memory source.
|
||||
"#;
|
||||
|
||||
/// Builds the consolidation subagent prompt for a specific memory root.
|
||||
pub(super) fn build_consolidation_prompt(
|
||||
memory_root: &Path,
|
||||
selection: &Phase2InputSelection,
|
||||
) -> String {
|
||||
let memory_extensions_root = memory_extensions_root(memory_root);
|
||||
let memory_extensions_exist = memory_extensions_root.is_dir();
|
||||
let memory_root = memory_root.display().to_string();
|
||||
let memory_extensions_root = memory_extensions_root.display().to_string();
|
||||
let memory_extensions_folder_structure = if memory_extensions_exist {
|
||||
render_memory_extensions_block(
|
||||
&MEMORY_EXTENSIONS_FOLDER_STRUCTURE_TEMPLATE,
|
||||
&memory_extensions_root,
|
||||
)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
let memory_extensions_primary_inputs = if memory_extensions_exist {
|
||||
render_memory_extensions_block(
|
||||
&MEMORY_EXTENSIONS_PRIMARY_INPUTS_TEMPLATE,
|
||||
&memory_extensions_root,
|
||||
)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
let phase2_input_selection = render_phase2_input_selection(selection);
|
||||
CONSOLIDATION_PROMPT_TEMPLATE
|
||||
.render([
|
||||
("memory_root", memory_root.as_str()),
|
||||
(
|
||||
"memory_extensions_folder_structure",
|
||||
memory_extensions_folder_structure.as_str(),
|
||||
),
|
||||
(
|
||||
"memory_extensions_primary_inputs",
|
||||
memory_extensions_primary_inputs.as_str(),
|
||||
),
|
||||
("phase2_input_selection", phase2_input_selection.as_str()),
|
||||
])
|
||||
.unwrap_or_else(|err| {
|
||||
warn!("failed to render memories consolidation prompt template: {err}");
|
||||
format!(
|
||||
"## Memory Phase 2 (Consolidation)\nConsolidate Codex memories in: {memory_root}\n\n{phase2_input_selection}"
|
||||
)
|
||||
})
|
||||
warn!("failed to render memories consolidation prompt template: {err}");
|
||||
format!(
|
||||
"## Memory Phase 2 (Consolidation)\nConsolidate Codex memories in: {memory_root}\n\n{phase2_input_selection}"
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
fn render_memory_extensions_block(template: &Template, memory_extensions_root: &str) -> String {
|
||||
template
|
||||
.render([("memory_extensions_root", memory_extensions_root)])
|
||||
.unwrap_or_else(|err| {
|
||||
warn!("failed to render memories extension prompt block: {err}");
|
||||
String::new()
|
||||
})
|
||||
}
|
||||
|
||||
fn render_phase2_input_selection(selection: &Phase2InputSelection) -> String {
|
||||
|
||||
@@ -55,14 +55,56 @@ fn build_stage_one_input_message_uses_default_limit_when_model_context_window_mi
|
||||
|
||||
#[test]
|
||||
fn build_consolidation_prompt_renders_embedded_template() {
|
||||
let prompt =
|
||||
build_consolidation_prompt(Path::new("/tmp/memories"), &Phase2InputSelection::default());
|
||||
let temp = tempdir().unwrap();
|
||||
let memories_dir = temp.path().join("memories");
|
||||
|
||||
assert!(prompt.contains("Folder structure (under /tmp/memories/):"));
|
||||
let prompt = build_consolidation_prompt(&memories_dir, &Phase2InputSelection::default());
|
||||
|
||||
assert!(prompt.contains(&format!(
|
||||
"Folder structure (under {}/):",
|
||||
memories_dir.display()
|
||||
)));
|
||||
assert!(!prompt.contains("Memory extensions (under"));
|
||||
assert!(!prompt.contains("<extension_name>/instructions.md"));
|
||||
assert!(prompt.contains("**Diff since last consolidation:**"));
|
||||
assert!(prompt.contains("- selected inputs this run: 0"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn build_consolidation_prompt_points_to_extensions_without_inlining_them() {
|
||||
let temp = tempdir().unwrap();
|
||||
let memories_dir = temp.path().join("memories");
|
||||
let extension_dir = temp.path().join("memories_extensions/tape_recorder");
|
||||
tokio_fs::create_dir_all(extension_dir.join("resources"))
|
||||
.await
|
||||
.unwrap();
|
||||
tokio_fs::write(
|
||||
extension_dir.join("instructions.md"),
|
||||
"source-specific instructions\n",
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
tokio_fs::write(
|
||||
extension_dir.join("resources/notes.md"),
|
||||
"source-specific resource\n",
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let prompt = build_consolidation_prompt(&memories_dir, &Phase2InputSelection::default());
|
||||
let memory_extensions_dir = temp.path().join("memories_extensions");
|
||||
|
||||
assert!(prompt.contains(&format!(
|
||||
"Memory extensions (under {}/)",
|
||||
memory_extensions_dir.display()
|
||||
)));
|
||||
assert!(prompt.contains(&format!("Under `{}/`:", memory_extensions_dir.display())));
|
||||
assert!(prompt.contains("<extension_name>/instructions.md"));
|
||||
assert!(prompt.contains("Optional source-specific inputs:"));
|
||||
assert!(!prompt.contains("source-specific instructions"));
|
||||
assert!(!prompt.contains("source-specific resource"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn build_memory_tool_developer_instructions_renders_embedded_template() {
|
||||
let temp = tempdir().unwrap();
|
||||
|
||||
@@ -60,9 +60,9 @@ pub(crate) fn denied_network_policy_message(blocked: &BlockedRequest) -> Option<
|
||||
let detail = match blocked.reason.as_str() {
|
||||
"denied" => "domain is explicitly denied by policy and cannot be approved from this prompt",
|
||||
"not_allowed" => "domain is not on the allowlist for the current sandbox mode",
|
||||
"not_allowed_local" => "local/private network addresses are blocked by policy",
|
||||
"not_allowed_local" => "local/private network addresses are blocked by the sandbox policy",
|
||||
"method_not_allowed" => "request method is blocked by the current network mode",
|
||||
"proxy_disabled" => "managed network proxy is disabled",
|
||||
"proxy_disabled" => "network proxy is disabled",
|
||||
_ => "request is blocked by network policy",
|
||||
};
|
||||
|
||||
|
||||
@@ -14,10 +14,10 @@ pub use codex_rollout::append_thread_name;
|
||||
pub use codex_rollout::find_archived_thread_path_by_id_str;
|
||||
#[deprecated(note = "use find_thread_path_by_id_str")]
|
||||
pub use codex_rollout::find_conversation_path_by_id_str;
|
||||
pub use codex_rollout::find_thread_meta_by_name_str;
|
||||
pub use codex_rollout::find_thread_name_by_id;
|
||||
pub use codex_rollout::find_thread_names_by_ids;
|
||||
pub use codex_rollout::find_thread_path_by_id_str;
|
||||
pub use codex_rollout::find_thread_path_by_name_str;
|
||||
pub use codex_rollout::parse_cursor;
|
||||
pub use codex_rollout::read_head_for_summary;
|
||||
pub use codex_rollout::read_session_meta_line;
|
||||
@@ -66,11 +66,6 @@ pub(crate) mod recorder {
|
||||
pub use codex_rollout::RolloutRecorder;
|
||||
}
|
||||
|
||||
pub(crate) mod session_index {
|
||||
pub use codex_rollout::append_thread_name;
|
||||
pub use codex_rollout::find_thread_name_by_id;
|
||||
}
|
||||
|
||||
pub(crate) use crate::session_rollout_init_error::map_session_init_error;
|
||||
|
||||
pub(crate) mod truncation {
|
||||
|
||||
@@ -202,13 +202,22 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
|
||||
) -> Result<UnifiedExecProcess, ToolError> {
|
||||
let base_command = &req.command;
|
||||
let session_shell = ctx.session.user_shell();
|
||||
let command = maybe_wrap_shell_lc_with_snapshot(
|
||||
base_command,
|
||||
session_shell.as_ref(),
|
||||
&req.cwd,
|
||||
&req.explicit_env_overrides,
|
||||
&req.env,
|
||||
);
|
||||
let environment_is_remote = ctx
|
||||
.turn
|
||||
.environment
|
||||
.as_ref()
|
||||
.is_some_and(|environment| environment.is_remote());
|
||||
let command = if environment_is_remote {
|
||||
base_command.to_vec()
|
||||
} else {
|
||||
maybe_wrap_shell_lc_with_snapshot(
|
||||
base_command,
|
||||
session_shell.as_ref(),
|
||||
&req.cwd,
|
||||
&req.explicit_env_overrides,
|
||||
&req.env,
|
||||
)
|
||||
};
|
||||
let command = if matches!(session_shell.shell_type, ShellType::PowerShell) {
|
||||
prefix_powershell_script_with_utf8(&command)
|
||||
} else {
|
||||
|
||||
@@ -33,7 +33,7 @@ Folder structure (under {{ memory_root }}/):
|
||||
- Recap of the rollout, including lessons learned, reusable knowledge,
|
||||
pointers/references, and pruned raw evidence snippets. Distilled version of
|
||||
everything valuable from the raw rollout.
|
||||
|
||||
{{ memory_extensions_folder_structure }}
|
||||
============================================================
|
||||
GLOBAL SAFETY, HYGIENE, AND NO-FILLER RULES (STRICT)
|
||||
============================================================
|
||||
@@ -135,7 +135,7 @@ Under `{{ memory_root }}/`:
|
||||
- read the existing summary so updates stay consistent
|
||||
- `skills/*`
|
||||
- read existing skills so updates are incremental and non-duplicative
|
||||
|
||||
{{ memory_extensions_primary_inputs }}
|
||||
Mode selection:
|
||||
|
||||
- INIT phase: existing artifacts are missing/empty (especially `memory_summary.md`
|
||||
|
||||
@@ -22,6 +22,8 @@ const SEARCHABLE_TOOL_COUNT: usize = 100;
|
||||
pub const CALENDAR_CREATE_EVENT_RESOURCE_URI: &str =
|
||||
"connector://calendar/tools/calendar_create_event";
|
||||
const CALENDAR_LIST_EVENTS_RESOURCE_URI: &str = "connector://calendar/tools/calendar_list_events";
|
||||
pub const DOCUMENT_EXTRACT_TEXT_RESOURCE_URI: &str =
|
||||
"connector://calendar/tools/calendar_extract_text";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppsTestServer {
|
||||
@@ -235,6 +237,39 @@ impl Respond for CodexAppsJsonRpcResponder {
|
||||
"connector_id": CONNECTOR_ID
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"name": "calendar_extract_text",
|
||||
"description": "Extract text from an uploaded document.",
|
||||
"annotations": {
|
||||
"readOnlyHint": false
|
||||
},
|
||||
"inputSchema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"file": {
|
||||
"type": "object",
|
||||
"description": "Document file payload.",
|
||||
"properties": {
|
||||
"file_id": { "type": "string" }
|
||||
},
|
||||
"required": ["file_id"]
|
||||
}
|
||||
},
|
||||
"required": ["file"],
|
||||
"additionalProperties": false
|
||||
},
|
||||
"_meta": {
|
||||
"connector_id": CONNECTOR_ID,
|
||||
"connector_name": self.connector_name.clone(),
|
||||
"connector_description": self.connector_description.clone(),
|
||||
"openai/fileParams": ["file"],
|
||||
"_codex_apps": {
|
||||
"resource_uri": DOCUMENT_EXTRACT_TEXT_RESOURCE_URI,
|
||||
"contains_mcp_source": true,
|
||||
"connector_id": CONNECTOR_ID
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"nextCursor": null
|
||||
@@ -245,7 +280,7 @@ impl Respond for CodexAppsJsonRpcResponder {
|
||||
.pointer_mut("/result/tools")
|
||||
.and_then(Value::as_array_mut)
|
||||
{
|
||||
for index in 2..SEARCHABLE_TOOL_COUNT {
|
||||
for index in 3..SEARCHABLE_TOOL_COUNT {
|
||||
tools.push(json!({
|
||||
"name": format!("calendar_timezone_option_{index}"),
|
||||
"description": format!("Read timezone option {index}."),
|
||||
@@ -283,6 +318,10 @@ impl Respond for CodexAppsJsonRpcResponder {
|
||||
.pointer("/params/arguments/starts_at")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or_default();
|
||||
let file_id = body
|
||||
.pointer("/params/arguments/file/file_id")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or_default();
|
||||
let codex_apps_meta = body.pointer("/params/_meta/_codex_apps").cloned();
|
||||
|
||||
ResponseTemplate::new(200).set_body_json(json!({
|
||||
@@ -291,7 +330,7 @@ impl Respond for CodexAppsJsonRpcResponder {
|
||||
"result": {
|
||||
"content": [{
|
||||
"type": "text",
|
||||
"text": format!("called {tool_name} for {title} at {starts_at}")
|
||||
"text": format!("called {tool_name} for {title} at {starts_at} with {file_id}")
|
||||
}],
|
||||
"structuredContent": {
|
||||
"_codex_apps": codex_apps_meta,
|
||||
|
||||
@@ -106,6 +106,7 @@ mod model_switching;
|
||||
mod model_visible_layout;
|
||||
mod models_cache_ttl;
|
||||
mod models_etag_responses;
|
||||
mod openai_file_mcp;
|
||||
mod otel;
|
||||
mod pending_input;
|
||||
mod permissions_messages;
|
||||
|
||||
173
codex-rs/core/tests/suite/openai_file_mcp.rs
Normal file
173
codex-rs/core/tests/suite/openai_file_mcp.rs
Normal file
@@ -0,0 +1,173 @@
|
||||
#![cfg(not(target_os = "windows"))]
|
||||
|
||||
use anyhow::Result;
|
||||
use codex_core::config::Config;
|
||||
use codex_features::Feature;
|
||||
use codex_login::CodexAuth;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use core_test_support::apps_test_server::AppsTestServer;
|
||||
use core_test_support::apps_test_server::DOCUMENT_EXTRACT_TEXT_RESOURCE_URI;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::ev_response_created;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
use core_test_support::responses::sse;
|
||||
use core_test_support::responses::start_mock_server;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use wiremock::Mock;
|
||||
use wiremock::ResponseTemplate;
|
||||
use wiremock::matchers::body_json;
|
||||
use wiremock::matchers::header;
|
||||
use wiremock::matchers::method;
|
||||
use wiremock::matchers::path;
|
||||
|
||||
const DOCUMENT_EXTRACT_TOOL: &str = "mcp__codex_apps__calendar_extract_text";
|
||||
|
||||
fn configure_apps(config: &mut Config, chatgpt_base_url: &str) {
|
||||
if let Err(err) = config.features.enable(Feature::Apps) {
|
||||
panic!("test config should allow feature update: {err}");
|
||||
}
|
||||
config.chatgpt_base_url = chatgpt_base_url.to_string();
|
||||
}
|
||||
|
||||
fn tool_by_name<'a>(body: &'a Value, name: &str) -> &'a Value {
|
||||
body.get("tools")
|
||||
.and_then(Value::as_array)
|
||||
.and_then(|tools| {
|
||||
tools.iter().find(|tool| {
|
||||
tool.get("name").and_then(Value::as_str) == Some(name)
|
||||
|| tool.get("type").and_then(Value::as_str) == Some(name)
|
||||
})
|
||||
})
|
||||
.unwrap_or_else(|| panic!("missing tool {name} in /v1/responses request: {body:?}"))
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn codex_apps_file_params_upload_local_paths_before_mcp_tool_call() -> Result<()> {
|
||||
let server = start_mock_server().await;
|
||||
let apps_server = AppsTestServer::mount(&server).await?;
|
||||
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/files"))
|
||||
.and(header("chatgpt-account-id", "account_id"))
|
||||
.and(body_json(json!({
|
||||
"file_name": "report.txt",
|
||||
"file_size": 11,
|
||||
"use_case": "codex",
|
||||
})))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"file_id": "file_123",
|
||||
"upload_url": format!("{}/upload/file_123", server.uri()),
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("PUT"))
|
||||
.and(path("/upload/file_123"))
|
||||
.and(header("content-length", "11"))
|
||||
.respond_with(ResponseTemplate::new(200))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
Mock::given(method("POST"))
|
||||
.and(path("/files/file_123/uploaded"))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
|
||||
"status": "success",
|
||||
"download_url": format!("{}/download/file_123", server.uri()),
|
||||
"file_name": "report.txt",
|
||||
"mime_type": "text/plain",
|
||||
"file_size_bytes": 11,
|
||||
})))
|
||||
.expect(1)
|
||||
.mount(&server)
|
||||
.await;
|
||||
|
||||
let call_id = "extract-call-1";
|
||||
let mock = mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(
|
||||
call_id,
|
||||
DOCUMENT_EXTRACT_TOOL,
|
||||
&json!({"file": "report.txt"}).to_string(),
|
||||
),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
sse(vec![
|
||||
ev_response_created("resp-2"),
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(move |config| configure_apps(config, apps_server.chatgpt_base_url.as_str()));
|
||||
let test = builder.build(&server).await?;
|
||||
tokio::fs::write(test.cwd.path().join("report.txt"), b"hello world").await?;
|
||||
|
||||
test.submit_turn_with_policies(
|
||||
"Extract the report text with the app tool.",
|
||||
AskForApproval::Never,
|
||||
SandboxPolicy::DangerFullAccess,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let requests = mock.requests();
|
||||
let body = requests[0].body_json();
|
||||
let extract_tool = tool_by_name(&body, DOCUMENT_EXTRACT_TOOL);
|
||||
assert_eq!(
|
||||
extract_tool.pointer("/parameters/properties/file"),
|
||||
Some(&json!({
|
||||
"type": "string",
|
||||
"description": "Document file payload. This parameter expects an absolute local file path. If you want to upload a file, provide the absolute path to that file here."
|
||||
}))
|
||||
);
|
||||
|
||||
let apps_tool_call = server
|
||||
.received_requests()
|
||||
.await
|
||||
.unwrap_or_default()
|
||||
.into_iter()
|
||||
.find_map(|request| {
|
||||
let body: Value = serde_json::from_slice(&request.body).ok()?;
|
||||
(request.url.path() == "/api/codex/apps"
|
||||
&& body.get("method").and_then(Value::as_str) == Some("tools/call")
|
||||
&& body.pointer("/params/name").and_then(Value::as_str)
|
||||
== Some("calendar_extract_text"))
|
||||
.then_some(body)
|
||||
})
|
||||
.expect("apps calendar_extract_text tools/call request should be recorded");
|
||||
|
||||
assert_eq!(
|
||||
apps_tool_call.pointer("/params/arguments/file"),
|
||||
Some(&json!({
|
||||
"download_url": format!("{}/download/file_123", server.uri()),
|
||||
"file_id": "file_123",
|
||||
"mime_type": "text/plain",
|
||||
"file_name": "report.txt",
|
||||
"uri": "sediment://file_123",
|
||||
"file_size_bytes": 11,
|
||||
}))
|
||||
);
|
||||
assert_eq!(
|
||||
apps_tool_call.pointer("/params/_meta/_codex_apps"),
|
||||
Some(&json!({
|
||||
"resource_uri": DOCUMENT_EXTRACT_TEXT_RESOURCE_URI,
|
||||
"contains_mcp_source": true,
|
||||
"connector_id": "calendar",
|
||||
}))
|
||||
);
|
||||
|
||||
server.verify().await;
|
||||
Ok(())
|
||||
}
|
||||
@@ -9,8 +9,8 @@ use codex_core::RolloutRecorder;
|
||||
use codex_core::RolloutRecorderParams;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use codex_core::find_archived_thread_path_by_id_str;
|
||||
use codex_core::find_thread_meta_by_name_str;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_core::find_thread_path_by_name_str;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::models::BaseInstructions;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
@@ -197,9 +197,10 @@ async fn find_locates_rollout_file_written_by_recorder() -> std::io::Result<()>
|
||||
),
|
||||
)?;
|
||||
|
||||
let found = find_thread_path_by_name_str(home.path(), thread_name).await?;
|
||||
let found = find_thread_meta_by_name_str(home.path(), thread_name).await?;
|
||||
|
||||
let path = found.expect("expected rollout path to be found");
|
||||
let (path, session_meta) = found.expect("expected rollout path to be found");
|
||||
assert_eq!(session_meta.meta.id, thread_id);
|
||||
assert!(path.exists());
|
||||
let contents = std::fs::read_to_string(&path)?;
|
||||
assert!(contents.contains(&thread_id.to_string()));
|
||||
|
||||
@@ -60,6 +60,7 @@ use codex_core::config::resolve_oss_provider;
|
||||
use codex_core::config_loader::ConfigLoadError;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::config_loader::format_config_error_with_source;
|
||||
use codex_core::find_thread_meta_by_name_str;
|
||||
use codex_core::format_exec_policy_error_with_source;
|
||||
use codex_core::path_utils;
|
||||
use codex_feedback::CodexFeedback;
|
||||
@@ -1256,6 +1257,27 @@ async fn resolve_resume_thread_id(
|
||||
if Uuid::parse_str(session_id).is_ok() {
|
||||
return Ok(Some(session_id.to_string()));
|
||||
}
|
||||
if let Some(state_db) = codex_core::get_state_db(config).await {
|
||||
let cwd = (!args.all).then_some(config.cwd.as_path());
|
||||
let resolved = state_db
|
||||
.find_thread_by_exact_title(
|
||||
session_id,
|
||||
&[],
|
||||
/*model_providers*/ None,
|
||||
/*archived_only*/ false,
|
||||
cwd,
|
||||
)
|
||||
.await?;
|
||||
if let Some(thread) = resolved {
|
||||
return Ok(Some(thread.id.to_string()));
|
||||
}
|
||||
if let Some((_, session_meta)) =
|
||||
find_thread_meta_by_name_str(&config.codex_home, session_id).await?
|
||||
&& (args.all || cwds_match(config.cwd.as_path(), &session_meta.meta.cwd))
|
||||
{
|
||||
return Ok(Some(session_meta.meta.id.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
let mut cursor = None;
|
||||
loop {
|
||||
@@ -1271,10 +1293,7 @@ async fn resolve_resume_thread_id(
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
cwd: None,
|
||||
// Thread names are attached separately from rollout titles, so name
|
||||
// resolution must scan the filtered list client-side instead of relying
|
||||
// on the backend `search_term` filter.
|
||||
search_term: None,
|
||||
search_term: Some(session_id.to_string()),
|
||||
},
|
||||
},
|
||||
"thread/list",
|
||||
|
||||
@@ -6,6 +6,7 @@ use crate::reasons::REASON_METHOD_NOT_ALLOWED;
|
||||
use crate::reasons::REASON_MITM_REQUIRED;
|
||||
use crate::reasons::REASON_NOT_ALLOWED;
|
||||
use crate::reasons::REASON_NOT_ALLOWED_LOCAL;
|
||||
use crate::reasons::REASON_PROXY_DISABLED;
|
||||
use rama_http::Body;
|
||||
use rama_http::Response;
|
||||
use rama_http::StatusCode;
|
||||
@@ -59,18 +60,13 @@ pub fn blocked_header_value(reason: &str) -> &'static str {
|
||||
|
||||
pub fn blocked_message(reason: &str) -> &'static str {
|
||||
match reason {
|
||||
REASON_NOT_ALLOWED => {
|
||||
"Codex blocked this request: domain not in allowlist (this is not a denylist block)."
|
||||
}
|
||||
REASON_NOT_ALLOWED_LOCAL => {
|
||||
"Codex blocked this request: local/private addresses not allowed."
|
||||
}
|
||||
REASON_DENIED => "Codex blocked this request: domain denied by policy.",
|
||||
REASON_METHOD_NOT_ALLOWED => {
|
||||
"Codex blocked this request: method not allowed in limited mode."
|
||||
}
|
||||
REASON_MITM_REQUIRED => "Codex blocked this request: MITM required for limited HTTPS.",
|
||||
_ => "Codex blocked this request by network policy.",
|
||||
REASON_NOT_ALLOWED => "Domain not in allowlist.",
|
||||
REASON_NOT_ALLOWED_LOCAL => "Sandbox policy blocks local/private network addresses.",
|
||||
REASON_DENIED => "Domain denied by the sandbox policy.",
|
||||
REASON_METHOD_NOT_ALLOWED => "Method not allowed in limited mode.",
|
||||
REASON_MITM_REQUIRED => "MITM required for limited HTTPS.",
|
||||
REASON_PROXY_DISABLED => "network proxy is disabled",
|
||||
_ => "Request blocked by network policy.",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -117,9 +113,6 @@ mod tests {
|
||||
};
|
||||
|
||||
let message = blocked_message_with_policy(REASON_NOT_ALLOWED, &details);
|
||||
assert_eq!(
|
||||
message,
|
||||
"Codex blocked this request: domain not in allowlist (this is not a denylist block)."
|
||||
);
|
||||
assert_eq!(message, "Domain not in allowlist.");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,10 +54,11 @@ pub use policy::EventPersistenceMode;
|
||||
pub use policy::should_persist_response_item_for_memories;
|
||||
pub use recorder::RolloutRecorder;
|
||||
pub use recorder::RolloutRecorderParams;
|
||||
pub use recorder::append_rollout_item_to_path;
|
||||
pub use session_index::append_thread_name;
|
||||
pub use session_index::find_thread_meta_by_name_str;
|
||||
pub use session_index::find_thread_name_by_id;
|
||||
pub use session_index::find_thread_names_by_ids;
|
||||
pub use session_index::find_thread_path_by_name_str;
|
||||
pub use state_db::StateDbHandle;
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -97,6 +97,7 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
|
||||
| EventMsg::AgentReasoning(_)
|
||||
| EventMsg::AgentReasoningRawContent(_)
|
||||
| EventMsg::TokenCount(_)
|
||||
| EventMsg::ThreadNameUpdated(_)
|
||||
| EventMsg::ContextCompacted(_)
|
||||
| EventMsg::EnteredReviewMode(_)
|
||||
| EventMsg::ExitedReviewMode(_)
|
||||
@@ -142,7 +143,6 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
|
||||
| EventMsg::AgentReasoningSectionBreak(_)
|
||||
| EventMsg::RawResponseItem(_)
|
||||
| EventMsg::SessionConfigured(_)
|
||||
| EventMsg::ThreadNameUpdated(_)
|
||||
| EventMsg::McpToolCallBegin(_)
|
||||
| EventMsg::WebSearchBegin(_)
|
||||
| EventMsg::ExecCommandBegin(_)
|
||||
@@ -188,8 +188,10 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option<EventPersistenceMode> {
|
||||
mod tests {
|
||||
use super::EventPersistenceMode;
|
||||
use super::should_persist_event_msg;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ImageGenerationEndEvent;
|
||||
use codex_protocol::protocol::ThreadNameUpdatedEvent;
|
||||
|
||||
#[test]
|
||||
fn persists_image_generation_end_events_in_limited_mode() {
|
||||
@@ -206,4 +208,17 @@ mod tests {
|
||||
EventPersistenceMode::Limited
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn persists_thread_name_updates_in_limited_mode() {
|
||||
let event = EventMsg::ThreadNameUpdated(ThreadNameUpdatedEvent {
|
||||
thread_id: ThreadId::new(),
|
||||
thread_name: Some("saved-session".to_string()),
|
||||
});
|
||||
|
||||
assert!(should_persist_event_msg(
|
||||
&event,
|
||||
EventPersistenceMode::Limited
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -225,6 +225,26 @@ impl RolloutRecorder {
|
||||
search_term: Option<&str>,
|
||||
) -> std::io::Result<ThreadsPage> {
|
||||
let codex_home = config.codex_home();
|
||||
let state_db_ctx = state_db::get_state_db(config).await;
|
||||
|
||||
if search_term.is_some()
|
||||
&& let Some(db_page) = state_db::list_threads_db(
|
||||
state_db_ctx.as_deref(),
|
||||
codex_home,
|
||||
page_size,
|
||||
cursor,
|
||||
sort_key,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
archived,
|
||||
search_term,
|
||||
)
|
||||
.await
|
||||
&& (!db_page.items.is_empty() || cursor.is_some())
|
||||
{
|
||||
return Ok(db_page.into());
|
||||
}
|
||||
|
||||
// Filesystem-first listing intentionally overfetches so we can repair stale/missing
|
||||
// SQLite rollout paths before the final DB-backed page is returned.
|
||||
let fs_page_size = page_size.saturating_mul(2).max(page_size);
|
||||
@@ -256,7 +276,6 @@ impl RolloutRecorder {
|
||||
.await?
|
||||
};
|
||||
|
||||
let state_db_ctx = state_db::get_state_db(config).await;
|
||||
if state_db_ctx.is_none() {
|
||||
// Keep legacy behavior when SQLite is unavailable: return filesystem results
|
||||
// at the requested page size.
|
||||
@@ -951,6 +970,23 @@ async fn sync_thread_state_after_write(
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Append one already-filtered rollout item to an existing rollout JSONL file.
|
||||
///
|
||||
/// This is for metadata updates to unloaded threads. Live sessions should use
|
||||
/// `RolloutRecorder::record_items` so rollout and SQLite updates remain ordered
|
||||
/// with the rest of the session stream.
|
||||
pub async fn append_rollout_item_to_path(
|
||||
rollout_path: &Path,
|
||||
item: &RolloutItem,
|
||||
) -> std::io::Result<()> {
|
||||
let file = tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(rollout_path)
|
||||
.await?;
|
||||
let mut writer = JsonlWriter { file };
|
||||
writer.write_rollout_item(item).await
|
||||
}
|
||||
|
||||
struct JsonlWriter {
|
||||
file: tokio::fs::File,
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use serde::Deserialize;
|
||||
use serde::Serialize;
|
||||
use tokio::io::AsyncBufReadExt;
|
||||
@@ -111,12 +112,12 @@ pub async fn find_thread_names_by_ids(
|
||||
Ok(names)
|
||||
}
|
||||
|
||||
/// Locate a recorded thread rollout file by thread name using newest-first ordering.
|
||||
/// Returns `Ok(Some(path))` if found, `Ok(None)` if not present.
|
||||
pub async fn find_thread_path_by_name_str(
|
||||
/// Locate a recorded thread rollout and read its session metadata by thread name.
|
||||
/// Returns the newest indexed name that still has a readable rollout header.
|
||||
pub async fn find_thread_meta_by_name_str(
|
||||
codex_home: &Path,
|
||||
name: &str,
|
||||
) -> std::io::Result<Option<PathBuf>> {
|
||||
) -> std::io::Result<Option<(PathBuf, SessionMetaLine)>> {
|
||||
if name.trim().is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -136,11 +137,11 @@ pub async fn find_thread_path_by_name_str(
|
||||
// rename cannot shadow an older persisted session with the same name.
|
||||
if let Some(path) =
|
||||
super::list::find_thread_path_by_id_str(codex_home, &thread_id.to_string()).await?
|
||||
&& super::list::read_session_meta_line(&path).await.is_ok()
|
||||
&& let Ok(session_meta) = super::list::read_session_meta_line(&path).await
|
||||
{
|
||||
drop(rx);
|
||||
scan.await.map_err(std::io::Error::other)??;
|
||||
return Ok(Some(path));
|
||||
return Ok(Some((path, session_meta)));
|
||||
}
|
||||
}
|
||||
scan.await.map_err(std::io::Error::other)??;
|
||||
|
||||
@@ -73,7 +73,7 @@ fn find_thread_id_by_name_prefers_latest_entry() -> std::io::Result<()> {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_thread_path_by_name_str_skips_newest_entry_without_rollout() -> std::io::Result<()> {
|
||||
async fn find_thread_meta_by_name_str_skips_newest_entry_without_rollout() -> std::io::Result<()> {
|
||||
// A newer unsaved name entry should not shadow an older persisted rollout with the same name.
|
||||
let temp = TempDir::new()?;
|
||||
let path = session_index_path(temp.path());
|
||||
@@ -99,14 +99,17 @@ async fn find_thread_path_by_name_str_skips_newest_entry_without_rollout() -> st
|
||||
];
|
||||
write_index(&path, &lines)?;
|
||||
|
||||
let found = find_thread_path_by_name_str(temp.path(), "same").await?;
|
||||
let found = find_thread_meta_by_name_str(temp.path(), "same").await?;
|
||||
|
||||
assert_eq!(found, Some(saved_rollout_path));
|
||||
assert_eq!(
|
||||
found.map(|(path, session_meta)| (path, session_meta.meta.id)),
|
||||
Some((saved_rollout_path, saved_id))
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_thread_path_by_name_str_skips_partial_rollout() -> std::io::Result<()> {
|
||||
async fn find_thread_meta_by_name_str_skips_partial_rollout() -> std::io::Result<()> {
|
||||
let temp = TempDir::new()?;
|
||||
let path = session_index_path(temp.path());
|
||||
let saved_id = ThreadId::new();
|
||||
@@ -133,14 +136,14 @@ async fn find_thread_path_by_name_str_skips_partial_rollout() -> std::io::Result
|
||||
];
|
||||
write_index(&path, &lines)?;
|
||||
|
||||
let found = find_thread_path_by_name_str(temp.path(), "same").await?;
|
||||
let found = find_thread_meta_by_name_str(temp.path(), "same").await?;
|
||||
|
||||
assert_eq!(found, Some(saved_rollout_path));
|
||||
assert_eq!(found.map(|(path, _)| path), Some(saved_rollout_path));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn find_thread_path_by_name_str_ignores_historical_name_after_rename() -> std::io::Result<()>
|
||||
async fn find_thread_meta_by_name_str_ignores_historical_name_after_rename() -> std::io::Result<()>
|
||||
{
|
||||
let temp = TempDir::new()?;
|
||||
let path = session_index_path(temp.path());
|
||||
@@ -171,9 +174,9 @@ async fn find_thread_path_by_name_str_ignores_historical_name_after_rename() ->
|
||||
];
|
||||
write_index(&path, &lines)?;
|
||||
|
||||
let found = find_thread_path_by_name_str(temp.path(), "same").await?;
|
||||
let found = find_thread_meta_by_name_str(temp.path(), "same").await?;
|
||||
|
||||
assert_eq!(found, Some(current_rollout_path));
|
||||
assert_eq!(found.map(|(path, _)| path), Some(current_rollout_path));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,9 @@ pub fn apply_rollout_item(
|
||||
pub fn rollout_item_affects_thread_metadata(item: &RolloutItem) -> bool {
|
||||
match item {
|
||||
RolloutItem::SessionMeta(_) | RolloutItem::TurnContext(_) => true,
|
||||
RolloutItem::EventMsg(EventMsg::TokenCount(_) | EventMsg::UserMessage(_)) => true,
|
||||
RolloutItem::EventMsg(
|
||||
EventMsg::TokenCount(_) | EventMsg::UserMessage(_) | EventMsg::ThreadNameUpdated(_),
|
||||
) => true,
|
||||
RolloutItem::EventMsg(_) | RolloutItem::ResponseItem(_) | RolloutItem::Compacted(_) => {
|
||||
false
|
||||
}
|
||||
@@ -95,13 +97,18 @@ fn apply_event_msg(metadata: &mut ThreadMetadata, event: &EventMsg) {
|
||||
}
|
||||
}
|
||||
}
|
||||
EventMsg::ThreadNameUpdated(updated) => {
|
||||
if let Some(title) = updated.thread_name.as_deref()
|
||||
&& !title.trim().is_empty()
|
||||
{
|
||||
metadata.title = title.trim().to_string();
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
fn apply_response_item(_metadata: &mut ThreadMetadata, _item: &ResponseItem) {
|
||||
// Title and first_user_message are derived from EventMsg::UserMessage only.
|
||||
}
|
||||
fn apply_response_item(_metadata: &mut ThreadMetadata, _item: &ResponseItem) {}
|
||||
|
||||
fn strip_user_message_prefix(text: &str) -> &str {
|
||||
match text.find(USER_MESSAGE_BEGIN) {
|
||||
@@ -152,6 +159,7 @@ mod tests {
|
||||
use codex_protocol::protocol::SessionMeta;
|
||||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::ThreadNameUpdatedEvent;
|
||||
use codex_protocol::protocol::TurnContextItem;
|
||||
use codex_protocol::protocol::USER_MESSAGE_BEGIN;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
@@ -198,6 +206,25 @@ mod tests {
|
||||
assert_eq!(metadata.title, "actual user request");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn thread_name_update_replaces_title_without_changing_first_user_message() {
|
||||
let mut metadata = metadata_for_test();
|
||||
metadata.title = "actual user request".to_string();
|
||||
metadata.first_user_message = Some("actual user request".to_string());
|
||||
let item = RolloutItem::EventMsg(EventMsg::ThreadNameUpdated(ThreadNameUpdatedEvent {
|
||||
thread_id: metadata.id,
|
||||
thread_name: Some("saved-session".to_string()),
|
||||
}));
|
||||
|
||||
apply_rollout_item(&mut metadata, &item, "test-provider");
|
||||
|
||||
assert_eq!(
|
||||
metadata.first_user_message.as_deref(),
|
||||
Some("actual user request")
|
||||
);
|
||||
assert_eq!(metadata.title, "saved-session");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn event_msg_image_only_user_message_sets_image_placeholder_preview() {
|
||||
let mut metadata = metadata_for_test();
|
||||
|
||||
@@ -326,6 +326,66 @@ ON CONFLICT(child_thread_id) DO NOTHING
|
||||
.map(PathBuf::from))
|
||||
}
|
||||
|
||||
/// Find the newest thread whose user-facing title exactly matches `title`.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn find_thread_by_exact_title(
|
||||
&self,
|
||||
title: &str,
|
||||
allowed_sources: &[String],
|
||||
model_providers: Option<&[String]>,
|
||||
archived_only: bool,
|
||||
cwd: Option<&Path>,
|
||||
) -> anyhow::Result<Option<crate::ThreadMetadata>> {
|
||||
let mut builder = QueryBuilder::<Sqlite>::new(
|
||||
r#"
|
||||
SELECT
|
||||
id,
|
||||
rollout_path,
|
||||
created_at,
|
||||
updated_at,
|
||||
source,
|
||||
agent_nickname,
|
||||
agent_role,
|
||||
agent_path,
|
||||
model_provider,
|
||||
model,
|
||||
reasoning_effort,
|
||||
cwd,
|
||||
cli_version,
|
||||
title,
|
||||
sandbox_policy,
|
||||
approval_mode,
|
||||
tokens_used,
|
||||
first_user_message,
|
||||
archived_at,
|
||||
git_sha,
|
||||
git_branch,
|
||||
git_origin_url
|
||||
FROM threads
|
||||
"#,
|
||||
);
|
||||
push_thread_filters(
|
||||
&mut builder,
|
||||
archived_only,
|
||||
allowed_sources,
|
||||
model_providers,
|
||||
/*anchor*/ None,
|
||||
crate::SortKey::UpdatedAt,
|
||||
/*search_term*/ None,
|
||||
);
|
||||
builder.push(" AND title = ");
|
||||
builder.push_bind(title);
|
||||
if let Some(cwd) = cwd {
|
||||
builder.push(" AND cwd = ");
|
||||
builder.push_bind(cwd.display().to_string());
|
||||
}
|
||||
push_thread_order_and_limit(&mut builder, crate::SortKey::UpdatedAt, /*limit*/ 1);
|
||||
|
||||
let row = builder.build().fetch_optional(self.pool.as_ref()).await?;
|
||||
row.map(|row| ThreadRow::try_from_row(&row).and_then(crate::ThreadMetadata::try_from))
|
||||
.transpose()
|
||||
}
|
||||
|
||||
/// List threads using the underlying database.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn list_threads(
|
||||
@@ -521,6 +581,19 @@ ON CONFLICT(id) DO NOTHING
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
pub async fn update_thread_title(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
title: &str,
|
||||
) -> anyhow::Result<bool> {
|
||||
let result = sqlx::query("UPDATE threads SET title = ? WHERE id = ?")
|
||||
.bind(title)
|
||||
.bind(thread_id.to_string())
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
Ok(result.rows_affected() > 0)
|
||||
}
|
||||
|
||||
pub async fn touch_thread_updated_at(
|
||||
&self,
|
||||
thread_id: ThreadId,
|
||||
|
||||
@@ -4219,7 +4219,13 @@ impl App {
|
||||
tui.frame_requester().schedule_frame();
|
||||
}
|
||||
AppEvent::ResumeSessionByIdOrName(id_or_name) => {
|
||||
match crate::lookup_session_target_with_app_server(app_server, &id_or_name).await? {
|
||||
match crate::lookup_session_target_with_app_server(
|
||||
app_server,
|
||||
self.config.codex_home.as_path(),
|
||||
&id_or_name,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Some(target_session) => {
|
||||
return self
|
||||
.resume_target_session(tui, app_server, target_session)
|
||||
|
||||
@@ -29,6 +29,7 @@ use unicode_width::UnicodeWidthStr;
|
||||
pub(crate) const TOOL_CALL_MAX_LINES: usize = 5;
|
||||
const USER_SHELL_TOOL_CALL_MAX_LINES: usize = 50;
|
||||
const MAX_INTERACTION_PREVIEW_CHARS: usize = 80;
|
||||
const TRANSCRIPT_HINT: &str = "ctrl + t to view transcript";
|
||||
|
||||
pub(crate) struct OutputLinesParams {
|
||||
pub(crate) line_limit: usize,
|
||||
@@ -154,7 +155,7 @@ pub(crate) fn output_lines(
|
||||
};
|
||||
if show_ellipsis {
|
||||
let omitted = total - 2 * line_limit;
|
||||
out.push(format!("… +{omitted} lines").into());
|
||||
out.push(ExecCell::output_ellipsis_line(omitted));
|
||||
}
|
||||
|
||||
let tail_start = if show_ellipsis {
|
||||
@@ -250,6 +251,14 @@ impl HistoryCell for ExecCell {
|
||||
}
|
||||
|
||||
impl ExecCell {
|
||||
fn output_ellipsis_text(omitted: usize) -> String {
|
||||
format!("… +{omitted} lines ({TRANSCRIPT_HINT})")
|
||||
}
|
||||
|
||||
fn output_ellipsis_line(omitted: usize) -> Line<'static> {
|
||||
Line::from(vec![Self::output_ellipsis_text(omitted).dim()])
|
||||
}
|
||||
|
||||
fn exploring_display_lines(&self, width: u16) -> Vec<Line<'static>> {
|
||||
let mut out: Vec<Line<'static>> = Vec::new();
|
||||
out.push(Line::from(vec![
|
||||
@@ -559,25 +568,24 @@ impl ExecCell {
|
||||
if total_rows <= max_rows {
|
||||
return lines.to_vec();
|
||||
}
|
||||
if max_rows == 1 {
|
||||
// Carry forward any previously omitted count and add any
|
||||
// additionally hidden content lines from this truncation.
|
||||
let base = omitted_hint.unwrap_or(0);
|
||||
// When an existing ellipsis is present, `lines` already includes
|
||||
// that single representation line; exclude it from the count of
|
||||
// additionally omitted content lines.
|
||||
let extra = lines
|
||||
// Reserve space for the transcript hint itself so the returned output
|
||||
// still respects the row budget on narrow terminals.
|
||||
let estimated_omitted = omitted_hint.unwrap_or(0)
|
||||
+ lines
|
||||
.len()
|
||||
.saturating_sub(usize::from(omitted_hint.is_some()));
|
||||
let omitted = base + extra;
|
||||
return vec![Self::ellipsis_line_with_prefix(
|
||||
omitted,
|
||||
let ellipsis_rows =
|
||||
Self::output_ellipsis_row_count(estimated_omitted, width, ellipsis_prefix.as_ref());
|
||||
if ellipsis_rows >= max_rows {
|
||||
return vec![Self::output_ellipsis_line_with_prefix(
|
||||
estimated_omitted,
|
||||
ellipsis_prefix.as_ref(),
|
||||
)];
|
||||
}
|
||||
|
||||
let head_budget = (max_rows - 1) / 2;
|
||||
let tail_budget = max_rows - head_budget - 1;
|
||||
let available_rows = max_rows - ellipsis_rows;
|
||||
let head_budget = available_rows / 2;
|
||||
let tail_budget = available_rows - head_budget;
|
||||
let mut head_lines: Vec<Line<'static>> = Vec::new();
|
||||
let mut head_rows = 0usize;
|
||||
let mut head_end = 0usize;
|
||||
@@ -611,7 +619,7 @@ impl ExecCell {
|
||||
.len()
|
||||
.saturating_sub(out.len() + tail_lines_reversed.len())
|
||||
.saturating_sub(usize::from(omitted_hint.is_some()));
|
||||
out.push(Self::ellipsis_line_with_prefix(
|
||||
out.push(Self::output_ellipsis_line_with_prefix(
|
||||
base + additional,
|
||||
ellipsis_prefix.as_ref(),
|
||||
));
|
||||
@@ -625,11 +633,27 @@ impl ExecCell {
|
||||
Line::from(vec![format!("… +{omitted} lines").dim()])
|
||||
}
|
||||
|
||||
/// Builds an ellipsis line (`… +N lines`) with an optional leading
|
||||
/// prefix so the ellipsis aligns with the output gutter.
|
||||
fn ellipsis_line_with_prefix(omitted: usize, prefix: Option<&Line<'static>>) -> Line<'static> {
|
||||
fn output_ellipsis_row_count(
|
||||
omitted: usize,
|
||||
width: u16,
|
||||
prefix: Option<&Line<'static>>,
|
||||
) -> usize {
|
||||
Paragraph::new(Text::from(vec![Self::output_ellipsis_line_with_prefix(
|
||||
omitted, prefix,
|
||||
)]))
|
||||
.wrap(Wrap { trim: false })
|
||||
.line_count(width)
|
||||
.max(1)
|
||||
}
|
||||
|
||||
/// Builds an output ellipsis line (`… +N lines (ctrl + t to view transcript)`)
|
||||
/// with an optional leading prefix so the ellipsis aligns with the output gutter.
|
||||
fn output_ellipsis_line_with_prefix(
|
||||
omitted: usize,
|
||||
prefix: Option<&Line<'static>>,
|
||||
) -> Line<'static> {
|
||||
let mut line = prefix.cloned().unwrap_or_default();
|
||||
line.push_span(format!("… +{omitted} lines").dim());
|
||||
line.push_span(Self::output_ellipsis_text(omitted).dim());
|
||||
line
|
||||
}
|
||||
}
|
||||
@@ -692,6 +716,13 @@ mod tests {
|
||||
use codex_protocol::protocol::ExecCommandSource;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
fn render_line_text(line: &Line<'static>) -> String {
|
||||
line.spans
|
||||
.iter()
|
||||
.map(|span| span.content.as_ref())
|
||||
.collect::<String>()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn user_shell_output_is_limited_by_screen_lines() {
|
||||
let long_url_like = format!(
|
||||
@@ -785,6 +816,16 @@ mod tests {
|
||||
contains_ellipsis,
|
||||
"expected truncated output to include an ellipsis line"
|
||||
);
|
||||
let normalized = lines
|
||||
.iter()
|
||||
.map(render_line_text)
|
||||
.join(" ")
|
||||
.split_whitespace()
|
||||
.join(" ");
|
||||
assert!(
|
||||
normalized.contains(TRANSCRIPT_HINT),
|
||||
"expected truncated output to advertise transcript shortcut, got {normalized}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -792,33 +833,82 @@ mod tests {
|
||||
let lines = vec![
|
||||
Line::from(" └ short"),
|
||||
Line::from(" this-is-a-very-long-token-that-wraps-many-rows"),
|
||||
Line::from(" … +4 lines"),
|
||||
Line::from(format!(
|
||||
" {}",
|
||||
ExecCell::output_ellipsis_text(/*omitted*/ 4)
|
||||
)),
|
||||
Line::from(" tail"),
|
||||
];
|
||||
|
||||
let truncated = ExecCell::truncate_lines_middle(
|
||||
&lines,
|
||||
/*max_rows*/ 2,
|
||||
/*width*/ 12,
|
||||
/*width*/ 80,
|
||||
Some(4),
|
||||
Some(Line::from(" ".dim())),
|
||||
);
|
||||
let rendered: Vec<String> = truncated
|
||||
.iter()
|
||||
.map(|line| {
|
||||
line.spans
|
||||
.iter()
|
||||
.map(|span| span.content.as_ref())
|
||||
.collect::<String>()
|
||||
})
|
||||
.collect();
|
||||
let rendered: Vec<String> = truncated.iter().map(render_line_text).collect();
|
||||
|
||||
assert!(
|
||||
rendered.iter().any(|line| line.contains("… +6 lines")),
|
||||
rendered
|
||||
.iter()
|
||||
.any(|line| line.contains("… +6 lines (ctrl + t to view transcript)")),
|
||||
"expected omitted hint to count hidden lines (not wrapped rows), got: {rendered:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn output_lines_ellipsis_includes_transcript_hint() {
|
||||
let output = CommandOutput {
|
||||
exit_code: 0,
|
||||
aggregated_output: (1..=7).map(|n| n.to_string()).join("\n"),
|
||||
formatted_output: String::new(),
|
||||
};
|
||||
|
||||
let rendered: Vec<String> = output_lines(
|
||||
Some(&output),
|
||||
OutputLinesParams {
|
||||
line_limit: 2,
|
||||
only_err: false,
|
||||
include_angle_pipe: false,
|
||||
include_prefix: false,
|
||||
},
|
||||
)
|
||||
.lines
|
||||
.iter()
|
||||
.map(render_line_text)
|
||||
.collect();
|
||||
|
||||
assert!(
|
||||
rendered
|
||||
.iter()
|
||||
.any(|line| line.contains("… +3 lines (ctrl + t to view transcript)")),
|
||||
"expected logical truncation to include transcript hint, got: {rendered:?}"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn command_truncation_ellipsis_does_not_include_transcript_hint() {
|
||||
let truncated = ExecCell::limit_lines_from_start(
|
||||
&[
|
||||
Line::from("first"),
|
||||
Line::from("second"),
|
||||
Line::from("third"),
|
||||
],
|
||||
/*keep*/ 2,
|
||||
);
|
||||
let rendered: Vec<String> = truncated.iter().map(render_line_text).collect();
|
||||
|
||||
assert_eq!(
|
||||
rendered,
|
||||
vec![
|
||||
"first".to_string(),
|
||||
"second".to_string(),
|
||||
"… +1 lines".to_string(),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncate_lines_middle_does_not_truncate_blank_prefixed_output_lines() {
|
||||
let mut lines = vec![Line::from(" └ start")];
|
||||
|
||||
@@ -33,6 +33,7 @@ use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::ConfigLoadError;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
use codex_core::config_loader::format_config_error_with_source;
|
||||
use codex_core::find_thread_meta_by_name_str;
|
||||
use codex_core::format_exec_policy_error_with_source;
|
||||
use codex_core::path_utils;
|
||||
use codex_core::read_session_meta_line;
|
||||
@@ -492,6 +493,7 @@ fn session_target_from_app_server_thread(
|
||||
|
||||
async fn lookup_session_target_by_name_with_app_server(
|
||||
app_server: &mut AppServerSession,
|
||||
codex_home: &Path,
|
||||
name: &str,
|
||||
) -> color_eyre::Result<Option<resume_picker::SessionTarget>> {
|
||||
let mut cursor = None;
|
||||
@@ -505,10 +507,7 @@ async fn lookup_session_target_by_name_with_app_server(
|
||||
source_kinds: Some(vec![ThreadSourceKind::Cli, ThreadSourceKind::VsCode]),
|
||||
archived: Some(false),
|
||||
cwd: None,
|
||||
// Thread names are hydrated after `thread/list` resolves rollout metadata, so
|
||||
// name-based resume must scan the filtered list client-side instead of relying on
|
||||
// the backend search index.
|
||||
search_term: None,
|
||||
search_term: Some(name.to_string()),
|
||||
})
|
||||
.await?;
|
||||
if let Some(thread) = response
|
||||
@@ -519,7 +518,15 @@ async fn lookup_session_target_by_name_with_app_server(
|
||||
return Ok(session_target_from_app_server_thread(thread));
|
||||
}
|
||||
if response.next_cursor.is_none() {
|
||||
return Ok(None);
|
||||
if app_server.is_remote() {
|
||||
return Ok(None);
|
||||
}
|
||||
return Ok(find_thread_meta_by_name_str(codex_home, name).await?.map(
|
||||
|(path, session_meta)| resume_picker::SessionTarget {
|
||||
path: Some(path),
|
||||
thread_id: session_meta.meta.id,
|
||||
},
|
||||
));
|
||||
}
|
||||
cursor = response.next_cursor;
|
||||
}
|
||||
@@ -527,6 +534,7 @@ async fn lookup_session_target_by_name_with_app_server(
|
||||
|
||||
async fn lookup_session_target_with_app_server(
|
||||
app_server: &mut AppServerSession,
|
||||
codex_home: &Path,
|
||||
id_or_name: &str,
|
||||
) -> color_eyre::Result<Option<resume_picker::SessionTarget>> {
|
||||
if Uuid::parse_str(id_or_name).is_ok() {
|
||||
@@ -557,7 +565,7 @@ async fn lookup_session_target_with_app_server(
|
||||
};
|
||||
}
|
||||
|
||||
lookup_session_target_by_name_with_app_server(app_server, id_or_name).await
|
||||
lookup_session_target_by_name_with_app_server(app_server, codex_home, id_or_name).await
|
||||
}
|
||||
|
||||
async fn lookup_latest_session_target_with_app_server(
|
||||
@@ -1163,7 +1171,13 @@ async fn run_ratatui_app(
|
||||
let Some(startup_app_server) = app_server.as_mut() else {
|
||||
unreachable!("app server should be initialized for --fork <id>");
|
||||
};
|
||||
match lookup_session_target_with_app_server(startup_app_server, id_str).await? {
|
||||
match lookup_session_target_with_app_server(
|
||||
startup_app_server,
|
||||
config.codex_home.as_path(),
|
||||
id_str,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Some(target_session) => resume_picker::SessionSelection::Fork(target_session),
|
||||
None => {
|
||||
shutdown_app_server_if_present(app_server.take()).await;
|
||||
@@ -1224,7 +1238,13 @@ async fn run_ratatui_app(
|
||||
let Some(startup_app_server) = app_server.as_mut() else {
|
||||
unreachable!("app server should be initialized for --resume <id>");
|
||||
};
|
||||
match lookup_session_target_with_app_server(startup_app_server, id_str).await? {
|
||||
match lookup_session_target_with_app_server(
|
||||
startup_app_server,
|
||||
config.codex_home.as_path(),
|
||||
id_str,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
Some(target_session) => resume_picker::SessionSelection::Resume(target_session),
|
||||
None => {
|
||||
shutdown_app_server_if_present(app_server.take()).await;
|
||||
@@ -1997,8 +2017,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lookup_session_target_by_name_ignores_backend_search_term_mismatch()
|
||||
-> color_eyre::Result<()> {
|
||||
async fn lookup_session_target_by_name_uses_backend_title_search() -> color_eyre::Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let config = build_config(&temp_dir).await?;
|
||||
let thread_id = ThreadId::new();
|
||||
@@ -2034,22 +2053,76 @@ mod tests {
|
||||
);
|
||||
builder.cwd = session_cwd;
|
||||
let mut metadata = builder.build(config.model_provider_id.as_str());
|
||||
metadata.title = "Different rollout title".to_string();
|
||||
metadata.title = "saved-session".to_string();
|
||||
metadata.first_user_message = Some("preview text".to_string());
|
||||
state_runtime
|
||||
.upsert_thread(&metadata)
|
||||
.await
|
||||
.map_err(std::io::Error::other)?;
|
||||
|
||||
codex_core::append_thread_name(&config.codex_home, thread_id, "saved-session").await?;
|
||||
let mut app_server =
|
||||
AppServerSession::new(codex_app_server_client::AppServerClient::InProcess(
|
||||
start_test_embedded_app_server(config).await?,
|
||||
));
|
||||
let target = lookup_session_target_by_name_with_app_server(
|
||||
&mut app_server,
|
||||
temp_dir.path(),
|
||||
"saved-session",
|
||||
)
|
||||
.await?;
|
||||
let target = target.expect("name lookup should find the saved thread");
|
||||
assert_eq!(target.path, Some(rollout_path));
|
||||
assert_eq!(target.thread_id, thread_id);
|
||||
|
||||
app_server.shutdown().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn lookup_session_target_by_name_falls_back_to_legacy_index() -> color_eyre::Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let config = build_config(&temp_dir).await?;
|
||||
let thread_id = ThreadId::new();
|
||||
let rollout_path = temp_dir
|
||||
.path()
|
||||
.join("sessions/2025/02/01")
|
||||
.join(format!("rollout-2025-02-01T10-00-00-{thread_id}.jsonl"));
|
||||
std::fs::create_dir_all(rollout_path.parent().expect("rollout parent"))?;
|
||||
let session_meta = SessionMeta {
|
||||
id: thread_id,
|
||||
timestamp: "2025-02-01T10:00:00Z".to_string(),
|
||||
model_provider: Some(config.model_provider_id.clone()),
|
||||
..SessionMeta::default()
|
||||
};
|
||||
let line = RolloutLine {
|
||||
timestamp: session_meta.timestamp.clone(),
|
||||
item: RolloutItem::SessionMeta(SessionMetaLine {
|
||||
meta: session_meta,
|
||||
git: None,
|
||||
}),
|
||||
};
|
||||
std::fs::write(
|
||||
&rollout_path,
|
||||
format!("{}\n", serde_json::to_string(&line)?),
|
||||
)?;
|
||||
std::fs::write(
|
||||
temp_dir.path().join("session_index.jsonl"),
|
||||
format!(
|
||||
"{{\"id\":\"{thread_id}\",\"thread_name\":\"hello\",\"updated_at\":\"2025-02-02T10:00:00Z\"}}\n"
|
||||
),
|
||||
)?;
|
||||
|
||||
let mut app_server =
|
||||
AppServerSession::new(codex_app_server_client::AppServerClient::InProcess(
|
||||
start_test_embedded_app_server(config).await?,
|
||||
));
|
||||
let target =
|
||||
lookup_session_target_by_name_with_app_server(&mut app_server, "saved-session").await?;
|
||||
let target = target.expect("name lookup should find the saved thread");
|
||||
let target = lookup_session_target_by_name_with_app_server(
|
||||
&mut app_server,
|
||||
temp_dir.path(),
|
||||
"hello",
|
||||
)
|
||||
.await?;
|
||||
let target = target.expect("legacy name lookup should find the saved thread");
|
||||
assert_eq!(target.path, Some(rollout_path));
|
||||
assert_eq!(target.thread_id, thread_id);
|
||||
|
||||
|
||||
@@ -5,6 +5,6 @@ expression: rendered
|
||||
• Ran seq 1 10 1>&2 && false
|
||||
└ 1
|
||||
2
|
||||
… +6 lines
|
||||
… +6 lines (ctrl + t to view transcript)
|
||||
9
|
||||
10
|
||||
|
||||
Reference in New Issue
Block a user