Compare commits

..

1 Commits

Author SHA1 Message Date
Rennie Song
b302f5786b Propagate thread id in MCP tool metadata 2026-04-15 23:28:47 -07:00
29 changed files with 380 additions and 755 deletions

View File

@@ -94,11 +94,6 @@ print_bazel_test_log_tails() {
local rel_path="${target#//}"
rel_path="${rel_path/://}"
local test_log="${testlogs_dir}/${rel_path}/test.log"
local reported_test_log
reported_test_log="$(grep -F "FAIL: ${target} " "$console_log" | sed -nE 's#.* \(see ([^)]+/test\.log)\).*#\1#p' | head -n 1 || true)"
if [[ -n "$reported_test_log" ]]; then
test_log="$reported_test_log"
fi
echo "::group::Bazel test log tail for ${target}"
if [[ -f "$test_log" ]]; then

View File

@@ -146,7 +146,7 @@ Example with notification opt-out:
- `thread/memoryMode/set` — experimental; set a thread's persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success.
- `memory/reset` — experimental; clear the current `CODEX_HOME/memories` directory and reset persisted memory stage data in sqlite while preserving existing thread memory modes; returns `{}` on success.
- `thread/status/changed` — notification emitted when a loaded thread's status changes (`threadId` + new `status`).
- `thread/archive` — move a threads rollout file into the archived directory and attempt to move any spawned descendant thread rollout files; returns `{}` on success and emits `thread/archived` for each archived thread.
- `thread/archive` — move a thread's rollout file into the archived directory; returns `{}` on success and emits `thread/archived`.
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server keeps the thread loaded and unloads it only after it has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed`.
- `thread/name/set` — set or update a thread's user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
@@ -426,7 +426,7 @@ Experimental: use `memory/reset` to clear local memory artifacts and sqlite-back
### Example: Archive a thread
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory and attempt to move any spawned descendant thread rollouts.
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory.
```json
{ "method": "thread/archive", "id": 21, "params": { "threadId": "thr_b" } }

View File

@@ -186,7 +186,6 @@ use codex_app_server_protocol::ThreadUnsubscribeStatus;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnError;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::TurnStatus;
@@ -2732,36 +2731,8 @@ impl CodexMessageProcessor {
}
};
let mut thread_ids = vec![thread_id];
if let Some(state_db_ctx) = get_state_db(&self.config).await {
let descendants = match state_db_ctx.list_thread_spawn_descendants(thread_id).await {
Ok(descendants) => descendants,
Err(err) => {
self.outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to list spawned descendants for thread id {thread_id}: {err}"
),
data: None,
},
)
.await;
return;
}
};
let mut seen = HashSet::from([thread_id]);
for descendant_id in descendants {
if seen.insert(descendant_id) {
thread_ids.push(descendant_id);
}
}
}
let mut archive_thread_ids = Vec::new();
match self
let thread_id_str = thread_id.to_string();
if let Err(err) = self
.thread_store
.read_thread(StoreReadThreadParams {
thread_id,
@@ -2770,98 +2741,34 @@ impl CodexMessageProcessor {
})
.await
{
Ok(thread) => {
if thread.archived_at.is_none() {
archive_thread_ids.push(thread_id);
}
}
Err(err) => {
self.outgoing
.send_error(request_id, thread_store_archive_error("archive", err))
.await;
return;
}
}
for descendant_thread_id in thread_ids.into_iter().skip(1) {
match self
.thread_store
.read_thread(StoreReadThreadParams {
thread_id: descendant_thread_id,
include_archived: true,
include_history: false,
})
.await
{
Ok(thread) => {
if thread.archived_at.is_none() {
archive_thread_ids.push(descendant_thread_id);
}
}
Err(err) => {
warn!(
"failed to read spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
);
}
}
}
let mut archived_thread_ids = Vec::new();
let Some((parent_thread_id, descendant_thread_ids)) = archive_thread_ids.split_first()
else {
self.outgoing
.send_response(request_id, ThreadArchiveResponse {})
.send_error(request_id, thread_store_archive_error("archive", err))
.await;
return;
};
}
self.prepare_thread_for_archive(thread_id).await;
self.prepare_thread_for_archive(*parent_thread_id).await;
match self
.thread_store
.archive_thread(StoreArchiveThreadParams {
thread_id: *parent_thread_id,
})
.archive_thread(StoreArchiveThreadParams { thread_id })
.await
{
Ok(()) => {
archived_thread_ids.push(parent_thread_id.to_string());
let response = ThreadArchiveResponse {};
self.outgoing.send_response(request_id, response).await;
let notification = ThreadArchivedNotification {
thread_id: thread_id_str,
};
self.outgoing
.send_server_notification(ServerNotification::ThreadArchived(notification))
.await;
}
Err(err) => {
self.outgoing
.send_error(request_id, thread_store_archive_error("archive", err))
.await;
return;
}
}
for descendant_thread_id in descendant_thread_ids.iter().rev().copied() {
self.prepare_thread_for_archive(descendant_thread_id).await;
match self
.thread_store
.archive_thread(StoreArchiveThreadParams {
thread_id: descendant_thread_id,
})
.await
{
Ok(()) => {
archived_thread_ids.push(descendant_thread_id.to_string());
}
Err(err) => {
warn!(
"failed to archive spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
);
}
}
}
self.outgoing
.send_response(request_id, ThreadArchiveResponse {})
.await;
for thread_id in archived_thread_ids {
let notification = ThreadArchivedNotification { thread_id };
self.outgoing
.send_server_notification(ServerNotification::ThreadArchived(notification))
.await;
}
}
async fn thread_increment_elicitation(
@@ -5685,17 +5592,19 @@ impl CodexMessageProcessor {
params: McpServerToolCallParams,
) {
let outgoing = Arc::clone(&self.outgoing);
let (_, thread) = match self.load_thread(&params.thread_id).await {
let thread_id = params.thread_id.clone();
let (_, thread) = match self.load_thread(&thread_id).await {
Ok(thread) => thread,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let meta = with_mcp_tool_call_thread_id_meta(params.meta, &thread_id);
tokio::spawn(async move {
let result = thread
.call_mcp_tool(&params.server, &params.tool, params.arguments, params.meta)
.call_mcp_tool(&params.server, &params.tool, params.arguments, meta)
.await;
match result {
Ok(result) => {
@@ -7652,12 +7561,9 @@ impl CodexMessageProcessor {
async fn turn_interrupt(&self, request_id: ConnectionRequestId, params: TurnInterruptParams) {
let TurnInterruptParams { thread_id, turn_id } = params;
let is_startup_interrupt = turn_id.is_empty();
if !is_startup_interrupt {
self.outgoing
.record_request_turn_id(&request_id, &turn_id)
.await;
}
self.outgoing
.record_request_turn_id(&request_id, &turn_id)
.await;
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
@@ -7667,48 +7573,21 @@ impl CodexMessageProcessor {
}
};
// Record turn interrupts so we can reply when TurnAborted arrives. Startup
// interrupts do not have a turn and are acknowledged after submission.
if !is_startup_interrupt {
let request = request_id.clone();
// Record the pending interrupt so we can reply when TurnAborted arrives.
{
let thread_state = self.thread_state_manager.thread_state(thread_uuid).await;
let mut thread_state = thread_state.lock().await;
thread_state
.pending_interrupts
.push((request_id.clone(), ApiVersion::V2));
.push((request, ApiVersion::V2));
}
// Submit the interrupt. Turn interrupts respond upon TurnAborted; startup
// interrupts respond here because startup cancellation has no turn event.
let submit_result = self
// Submit the interrupt; we'll respond upon TurnAborted.
let _ = self
.submit_core_op(&request_id, thread.as_ref(), Op::Interrupt)
.await;
match submit_result {
Ok(_) if is_startup_interrupt => {
self.outgoing
.send_response(request_id, TurnInterruptResponse {})
.await;
}
Ok(_) => {}
Err(err) => {
if !is_startup_interrupt {
let thread_state = self.thread_state_manager.thread_state(thread_uuid).await;
let mut thread_state = thread_state.lock().await;
thread_state
.pending_interrupts
.retain(|(pending_request_id, _)| pending_request_id != &request_id);
}
let interrupt_target = if is_startup_interrupt {
"startup"
} else {
"turn"
};
self.send_internal_error(
request_id,
format!("failed to interrupt {interrupt_target}: {err}"),
)
.await;
}
}
}
async fn ensure_conversation_listener(
@@ -9306,6 +9185,32 @@ fn thread_store_archive_error(operation: &str, err: ThreadStoreError) -> JSONRPC
}
}
const MCP_TOOL_THREAD_ID_META_KEY: &str = "threadId";
/// Ensures the outgoing MCP tool-call metadata carries the originating
/// thread id under the `threadId` key.
///
/// - An object meta gets the key inserted (overwriting any stale value
///   the client may have supplied).
/// - A missing meta is replaced by a fresh single-entry object.
/// - A non-object meta is returned untouched, since there is nowhere to
///   attach the key.
fn with_mcp_tool_call_thread_id_meta(
    meta: Option<serde_json::Value>,
    thread_id: &str,
) -> Option<serde_json::Value> {
    let mut map = match meta {
        Some(serde_json::Value::Object(existing)) => existing,
        None => serde_json::Map::new(),
        // Non-object metadata (e.g. a bare string) is passed through as-is.
        other => return other,
    };
    map.insert(
        MCP_TOOL_THREAD_ID_META_KEY.to_string(),
        serde_json::Value::String(thread_id.to_string()),
    );
    Some(serde_json::Value::Object(map))
}
fn summary_from_stored_thread(
thread: StoredThread,
fallback_provider: &str,

View File

@@ -83,10 +83,11 @@ url = "{mcp_server_url}/mcp"
)
.await??;
let ThreadStartResponse { thread, .. } = to_response(thread_start_resp)?;
let thread_id = thread.id.clone();
let tool_call_request_id = mcp
.send_mcp_server_tool_call_request(McpServerToolCallParams {
thread_id: thread.id,
thread_id: thread_id.clone(),
server: TEST_SERVER_NAME.to_string(),
tool: TEST_TOOL_NAME.to_string(),
arguments: Some(json!({
@@ -114,6 +115,7 @@ url = "{mcp_server_url}/mcp"
response.structured_content,
Some(json!({
"echoed": "hello from app",
"threadId": thread_id,
}))
);
assert_eq!(response.is_error, Some(false));
@@ -203,7 +205,7 @@ impl ServerHandler for ToolAppsMcpServer {
async fn call_tool(
&self,
request: CallToolRequestParams,
_context: RequestContext<RoleServer>,
context: RequestContext<RoleServer>,
) -> Result<CallToolResult, rmcp::ErrorData> {
assert_eq!(request.name.as_ref(), TEST_TOOL_NAME);
let message = request
@@ -212,12 +214,19 @@ impl ServerHandler for ToolAppsMcpServer {
.and_then(|arguments| arguments.get("message"))
.and_then(|value| value.as_str())
.unwrap_or_default();
let thread_id = context
.meta
.0
.get("threadId")
.and_then(|value| value.as_str())
.unwrap_or_default();
let mut meta = Meta::new();
meta.0.insert("calledBy".to_string(), json!("mcp-app"));
let mut result = CallToolResult::structured(json!({
"echoed": message,
"threadId": thread_id,
}));
result.content = vec![Content::text(format!("echo: {message}"))];
result.meta = Some(meta);

View File

@@ -1,6 +1,5 @@
use anyhow::Result;
use app_test_support::McpProcess;
use app_test_support::create_fake_rollout;
use app_test_support::create_mock_responses_server_repeating_assistant;
use app_test_support::to_response;
use codex_app_server_protocol::JSONRPCError;
@@ -20,11 +19,7 @@ use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use codex_protocol::ThreadId;
use codex_state::DirectionalThreadSpawnEdgeStatus;
use codex_state::StateRuntime;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
@@ -165,311 +160,6 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn thread_archive_archives_spawned_descendants() -> Result<()> {
    // Archiving a parent thread should also archive every spawned descendant
    // recorded in the state db (parent -> child -> grandchild chain).
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    // Materialize three rollout files on disk for the thread chain.
    let parent_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-00-00",
        "2025-01-01T00:00:00Z",
        "parent",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let child_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-01-00",
        "2025-01-01T00:01:00Z",
        "child",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let grandchild_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-02-00",
        "2025-01-01T00:02:00Z",
        "grandchild",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let parent_thread_id = ThreadId::from_string(&parent_id)?;
    let child_thread_id = ThreadId::from_string(&child_id)?;
    let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
    // Record spawn edges in the state db so the server can discover the
    // descendants when the parent is archived.
    let state_db =
        StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
    state_db
        .mark_backfill_complete(/*last_watermark*/ None)
        .await?;
    state_db
        .upsert_thread_spawn_edge(
            parent_thread_id,
            child_thread_id,
            DirectionalThreadSpawnEdgeStatus::Closed,
        )
        .await?;
    state_db
        .upsert_thread_spawn_edge(
            child_thread_id,
            grandchild_thread_id,
            DirectionalThreadSpawnEdgeStatus::Open,
        )
        .await?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    // Archive only the parent; the descendants should follow automatically.
    let archive_id = mcp
        .send_thread_archive_request(ThreadArchiveParams {
            thread_id: parent_id.clone(),
        })
        .await?;
    let archive_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
    )
    .await??;
    let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
    // Expect one `thread/archived` notification per thread in the chain.
    let mut archived_ids = Vec::new();
    for _ in 0..3 {
        let notification = timeout(
            DEFAULT_READ_TIMEOUT,
            mcp.read_stream_until_notification_message("thread/archived"),
        )
        .await??;
        let archived_notification: ThreadArchivedNotification = serde_json::from_value(
            notification
                .params
                .expect("thread/archived notification params"),
        )?;
        archived_ids.push(archived_notification.thread_id);
    }
    assert_eq!(archived_ids, vec![parent_id, grandchild_id, child_id]);
    // Every rollout should have moved from the active sessions directory
    // into the archived one.
    for thread_id in [parent_thread_id, child_thread_id, grandchild_thread_id] {
        assert!(
            find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
                .await?
                .is_none(),
            "expected active rollout for {thread_id} to be archived"
        );
        assert!(
            find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
                .await?
                .is_some(),
            "expected archived rollout for {thread_id} to exist"
        );
    }
    Ok(())
}
#[tokio::test]
async fn thread_archive_succeeds_when_descendant_archive_fails() -> Result<()> {
    // A failure to archive one descendant (here: a directory already occupies
    // the child's archived destination) must not fail the overall request;
    // the parent and the remaining descendants are still archived.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    // Materialize three rollout files on disk for the thread chain.
    let parent_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-00-00",
        "2025-01-01T00:00:00Z",
        "parent",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let child_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-01-00",
        "2025-01-01T00:01:00Z",
        "child",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let grandchild_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-02-00",
        "2025-01-01T00:02:00Z",
        "grandchild",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let parent_thread_id = ThreadId::from_string(&parent_id)?;
    let child_thread_id = ThreadId::from_string(&child_id)?;
    let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
    // Record spawn edges so the server discovers both descendants.
    let state_db =
        StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
    state_db
        .mark_backfill_complete(/*last_watermark*/ None)
        .await?;
    state_db
        .upsert_thread_spawn_edge(
            parent_thread_id,
            child_thread_id,
            DirectionalThreadSpawnEdgeStatus::Closed,
        )
        .await?;
    state_db
        .upsert_thread_spawn_edge(
            child_thread_id,
            grandchild_thread_id,
            DirectionalThreadSpawnEdgeStatus::Open,
        )
        .await?;
    // Pre-create a directory at the child's archived destination so the
    // child's archive move fails.
    let child_rollout_path = find_thread_path_by_id_str(codex_home.path(), &child_id)
        .await?
        .expect("child rollout path");
    let archived_child_path = codex_home
        .path()
        .join(ARCHIVED_SESSIONS_SUBDIR)
        .join(child_rollout_path.file_name().expect("rollout file name"));
    std::fs::create_dir_all(&archived_child_path)?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let archive_id = mcp
        .send_thread_archive_request(ThreadArchiveParams {
            thread_id: parent_id.clone(),
        })
        .await?;
    let archive_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
    )
    .await??;
    let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
    // Only the parent and grandchild emit notifications; the child's
    // archive move failed.
    let mut archived_ids = Vec::new();
    for _ in 0..2 {
        let notification = timeout(
            DEFAULT_READ_TIMEOUT,
            mcp.read_stream_until_notification_message("thread/archived"),
        )
        .await??;
        let archived_notification: ThreadArchivedNotification = serde_json::from_value(
            notification
                .params
                .expect("thread/archived notification params"),
        )?;
        archived_ids.push(archived_notification.thread_id);
    }
    assert_eq!(archived_ids, vec![parent_id, grandchild_id]);
    // No third notification should arrive for the failed child archive.
    assert!(
        timeout(
            std::time::Duration::from_millis(250),
            mcp.read_stream_until_notification_message("thread/archived"),
        )
        .await
        .is_err()
    );
    assert!(
        child_rollout_path.exists(),
        "child should stay active after descendant archive failure"
    );
    assert!(
        archived_child_path.is_dir(),
        "test conflict should remain in archived sessions"
    );
    // Parent and grandchild moved despite the failed child.
    for thread_id in [parent_thread_id, grandchild_thread_id] {
        assert!(
            find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
                .await?
                .is_none(),
            "expected active rollout for {thread_id} to be archived"
        );
        assert!(
            find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
                .await?
                .is_some(),
            "expected archived rollout for {thread_id} to exist"
        );
    }
    Ok(())
}
#[tokio::test]
async fn thread_archive_succeeds_when_spawned_descendant_is_missing() -> Result<()> {
    // A spawn edge that points at a thread with no rollout on disk must not
    // block archiving the parent.
    let server = create_mock_responses_server_repeating_assistant("Done").await;
    let codex_home = TempDir::new()?;
    create_config_toml(codex_home.path(), &server.uri())?;
    let parent_id = create_fake_rollout(
        codex_home.path(),
        "2025-01-01T00-00-00",
        "2025-01-01T00:00:00Z",
        "parent",
        Some("mock_provider"),
        /*git_info*/ None,
    )?;
    let parent_thread_id = ThreadId::from_string(&parent_id)?;
    // Descendant id that has no corresponding rollout file.
    let missing_child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000901")?;
    let state_db =
        StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
    state_db
        .mark_backfill_complete(/*last_watermark*/ None)
        .await?;
    state_db
        .upsert_thread_spawn_edge(
            parent_thread_id,
            missing_child_thread_id,
            DirectionalThreadSpawnEdgeStatus::Closed,
        )
        .await?;
    let mut mcp = McpProcess::new(codex_home.path()).await?;
    timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
    let archive_id = mcp
        .send_thread_archive_request(ThreadArchiveParams {
            thread_id: parent_id.clone(),
        })
        .await?;
    let archive_resp: JSONRPCResponse = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
    )
    .await??;
    let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
    // Exactly one notification: the parent itself.
    let notification = timeout(
        DEFAULT_READ_TIMEOUT,
        mcp.read_stream_until_notification_message("thread/archived"),
    )
    .await??;
    let archived_notification: ThreadArchivedNotification = serde_json::from_value(
        notification
            .params
            .expect("thread/archived notification params"),
    )?;
    assert_eq!(archived_notification.thread_id, parent_id);
    assert!(
        find_thread_path_by_id_str(codex_home.path(), &parent_id)
            .await?
            .is_none(),
        "parent should be archived even when a descendant is missing"
    );
    assert!(
        find_archived_thread_path_by_id_str(codex_home.path(), &parent_id)
            .await?
            .is_some(),
        "parent should be moved into archived sessions"
    );
    Ok(())
}
#[tokio::test]
async fn thread_archive_clears_stale_subscriptions_before_resume() -> Result<()> {
let server = create_mock_responses_server_repeating_assistant("Done").await;

View File

@@ -130,9 +130,9 @@ async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<(
async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
#[cfg(target_os = "windows")]
let shell_command = vec![
"Start-Sleep".to_string(),
"-Seconds".to_string(),
"1".to_string(),
"powershell".to_string(),
"-Command".to_string(),
"Start-Sleep -Seconds 1".to_string(),
];
#[cfg(not(target_os = "windows"))]
let shell_command = vec!["sleep".to_string(), "1".to_string()];

View File

@@ -104,7 +104,6 @@ pub struct Client {
bearer_token: Option<String>,
user_agent: Option<HeaderValue>,
chatgpt_account_id: Option<String>,
chatgpt_account_is_fedramp: bool,
path_style: PathStyle,
}
@@ -130,7 +129,6 @@ impl Client {
bearer_token: None,
user_agent: None,
chatgpt_account_id: None,
chatgpt_account_is_fedramp: false,
path_style,
})
}
@@ -143,9 +141,6 @@ impl Client {
if let Some(account_id) = auth.get_account_id() {
client = client.with_chatgpt_account_id(account_id);
}
if auth.is_fedramp_account() {
client = client.with_fedramp_routing_header();
}
Ok(client)
}
@@ -166,11 +161,6 @@ impl Client {
self
}
pub fn with_fedramp_routing_header(mut self) -> Self {
self.chatgpt_account_is_fedramp = true;
self
}
pub fn with_path_style(mut self, style: PathStyle) -> Self {
self.path_style = style;
self
@@ -195,11 +185,6 @@ impl Client {
{
h.insert(name, hv);
}
if self.chatgpt_account_is_fedramp
&& let Ok(name) = HeaderName::from_bytes(b"X-OpenAI-Fedramp")
{
h.insert(name, HeaderValue::from_static("true"));
}
h
}

View File

@@ -179,7 +179,6 @@ struct UsageErrorBody {
pub struct CoreAuthProvider {
pub token: Option<String>,
pub account_id: Option<String>,
pub is_fedramp_account: bool,
}
impl CoreAuthProvider {
@@ -197,7 +196,6 @@ impl CoreAuthProvider {
Self {
token: token.map(str::to_string),
account_id: account_id.map(str::to_string),
is_fedramp_account: false,
}
}
}
@@ -214,8 +212,5 @@ impl ApiAuthProvider for CoreAuthProvider {
{
let _ = headers.insert("ChatGPT-Account-ID", header);
}
if self.is_fedramp_account {
crate::auth::add_fedramp_routing_header(headers);
}
}
}

View File

@@ -136,7 +136,6 @@ fn core_auth_provider_reports_when_auth_header_will_attach() {
let auth = CoreAuthProvider {
token: Some("access-token".to_string()),
account_id: None,
is_fedramp_account: false,
};
assert!(auth.auth_header_attached());
@@ -163,22 +162,3 @@ fn core_auth_provider_adds_auth_headers() {
Some("workspace-123")
);
}
#[test]
fn core_auth_provider_adds_fedramp_routing_header_for_fedramp_accounts() {
    // A FedRAMP-flagged provider must add the `X-OpenAI-Fedramp: true`
    // routing header alongside the regular auth headers.
    let auth = CoreAuthProvider {
        token: Some("access-token".to_string()),
        account_id: Some("workspace-123".to_string()),
        is_fedramp_account: true,
    };
    let mut headers = HeaderMap::new();
    crate::AuthProvider::add_auth_headers(&auth, &mut headers);
    assert_eq!(
        headers
            .get("X-OpenAI-Fedramp")
            .and_then(|value| value.to_str().ok()),
        Some("true")
    );
}

View File

@@ -1,5 +1,4 @@
use http::HeaderMap;
use http::HeaderValue;
/// Adds authentication headers to API requests.
///
@@ -9,26 +8,3 @@ use http::HeaderValue;
pub trait AuthProvider: Send + Sync {
fn add_auth_headers(&self, headers: &mut HeaderMap);
}
/// Inserts the `X-OpenAI-Fedramp: true` routing header used to steer
/// requests from FedRAMP accounts; replaces any existing value for the key.
pub(crate) fn add_fedramp_routing_header(headers: &mut HeaderMap) {
    headers.insert("X-OpenAI-Fedramp", HeaderValue::from_static("true"));
}
#[cfg(test)]
mod tests {
    use super::*;

    // The helper should set exactly `X-OpenAI-Fedramp: true`.
    #[test]
    fn add_fedramp_routing_header_sets_header() {
        let mut headers = HeaderMap::new();
        add_fedramp_routing_header(&mut headers);
        assert_eq!(
            headers
                .get("X-OpenAI-Fedramp")
                .and_then(|v| v.to_str().ok()),
            Some("true")
        );
    }
}

View File

@@ -758,13 +758,12 @@ impl McpConnectionManager {
let submit_id = startup_submit_id.clone();
let auth_entry = auth_entries.get(&server_name).cloned();
join_set.spawn(async move {
let mut outcome = async_managed_client.client().await;
let outcome = async_managed_client.client().await;
if cancel_token.is_cancelled() {
outcome = Err(StartupOutcomeError::Cancelled);
return (server_name, Err(StartupOutcomeError::Cancelled));
}
let status = match &outcome {
Ok(_) => McpStartupStatus::Ready,
Err(StartupOutcomeError::Cancelled) => McpStartupStatus::Cancelled,
Err(error) => {
let error_str = mcp_init_error_display(
server_name.as_str(),

View File

@@ -31,10 +31,6 @@ pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION: usize = 256;
pub const DEFAULT_MEMORIES_MAX_UNUSED_DAYS: i64 = 30;
const MIN_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION: usize = 1;
const MAX_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION: usize = 4096;
const MIN_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 1;
const MAX_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 128;
const fn default_enabled() -> bool {
true
@@ -189,14 +185,12 @@ pub struct MemoriesToml {
/// When `false`, skip injecting memory usage instructions into developer prompts.
pub use_memories: Option<bool>,
/// Maximum number of recent raw memories retained for global consolidation.
#[schemars(range(min = 1, max = 4096))]
pub max_raw_memories_for_consolidation: Option<usize>,
/// Maximum number of days since a memory was last used before it becomes ineligible for phase 2 selection.
pub max_unused_days: Option<i64>,
/// Maximum age of the threads used for memories.
pub max_rollout_age_days: Option<i64>,
/// Maximum number of rollout candidates processed per pass.
#[schemars(range(min = 1, max = 128))]
pub max_rollouts_per_startup: Option<usize>,
/// Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.
pub min_rollout_idle_hours: Option<i64>,
@@ -250,10 +244,7 @@ impl From<MemoriesToml> for MemoriesConfig {
max_raw_memories_for_consolidation: toml
.max_raw_memories_for_consolidation
.unwrap_or(defaults.max_raw_memories_for_consolidation)
.clamp(
MIN_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
MAX_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
),
.min(4096),
max_unused_days: toml
.max_unused_days
.unwrap_or(defaults.max_unused_days)
@@ -265,10 +256,7 @@ impl From<MemoriesToml> for MemoriesConfig {
max_rollouts_per_startup: toml
.max_rollouts_per_startup
.unwrap_or(defaults.max_rollouts_per_startup)
.clamp(
MIN_MEMORIES_MAX_ROLLOUTS_PER_STARTUP,
MAX_MEMORIES_MAX_ROLLOUTS_PER_STARTUP,
),
.min(128),
min_rollout_idle_hours: toml
.min_rollout_idle_hours
.unwrap_or(defaults.min_rollout_idle_hours)

View File

@@ -41,21 +41,3 @@ fn deserialize_skill_config_with_path_selector() {
}
);
}
#[test]
fn memories_config_clamps_count_limits_to_nonzero_values() {
    // Zero is not a usable limit: converting the TOML form must clamp both
    // count limits up to a minimum of 1.
    let config = MemoriesConfig::from(MemoriesToml {
        max_raw_memories_for_consolidation: Some(0),
        max_rollouts_per_startup: Some(0),
        ..Default::default()
    });
    assert_eq!(
        config,
        MemoriesConfig {
            max_raw_memories_for_consolidation: 1,
            max_rollouts_per_startup: 1,
            ..MemoriesConfig::default()
        }
    );
}

View File

@@ -855,8 +855,7 @@
"max_raw_memories_for_consolidation": {
"description": "Maximum number of recent raw memories retained for global consolidation.",
"format": "uint",
"maximum": 4096.0,
"minimum": 1.0,
"minimum": 0.0,
"type": "integer"
},
"max_rollout_age_days": {
@@ -867,8 +866,7 @@
"max_rollouts_per_startup": {
"description": "Maximum number of rollout candidates processed per pass.",
"format": "uint",
"maximum": 128.0,
"minimum": 1.0,
"minimum": 0.0,
"type": "integer"
},
"max_unused_days": {

View File

@@ -737,7 +737,6 @@ mod tests {
chatgpt_plan_type: None,
chatgpt_user_id: user_id.map(ToOwned::to_owned),
chatgpt_account_id: Some(account_id.to_string()),
chatgpt_account_is_fedramp: false,
raw_jwt: fake_id_token(account_id, user_id),
},
access_token: format!("access-token-{account_id}"),

View File

@@ -115,7 +115,6 @@ async fn build_uploaded_local_argument_value(
let upload_auth = CoreAuthProvider {
token: Some(token_data.access_token),
account_id: token_data.account_id,
is_fedramp_account: auth.is_fedramp_account(),
};
let uploaded = upload_local_file(
turn_context.config.chatgpt_base_url.trim_end_matches('/'),

View File

@@ -477,6 +477,8 @@ async fn execute_mcp_tool_call(
metadata.and_then(|metadata| metadata.openai_file_input_params.as_deref()),
)
.await?;
let request_meta =
with_mcp_tool_call_thread_id_meta(request_meta, &sess.conversation_id.to_string());
let request_meta =
augment_mcp_tool_request_meta_with_sandbox_state(sess, turn_context, server, request_meta)
.await
@@ -660,6 +662,7 @@ pub(crate) struct McpToolApprovalMetadata {
const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
const MCP_TOOL_OPENAI_OUTPUT_TEMPLATE_META_KEY: &str = "openai/outputTemplate";
const MCP_TOOL_UI_RESOURCE_URI_META_KEY: &str = "ui/resourceUri";
const MCP_TOOL_THREAD_ID_META_KEY: &str = "threadId";
fn custom_mcp_tool_approval_mode(
turn_context: &TurnContext,
@@ -709,6 +712,30 @@ fn build_mcp_tool_call_request_meta(
(!request_meta.is_empty()).then_some(serde_json::Value::Object(request_meta))
}
/// Attaches the originating thread id to MCP tool-call request metadata
/// under the `threadId` key.
///
/// Object metadata gets the entry inserted (any prior value is replaced);
/// absent metadata becomes a fresh single-entry object; non-object metadata
/// is returned unchanged because a key cannot be attached to it.
fn with_mcp_tool_call_thread_id_meta(
    meta: Option<serde_json::Value>,
    thread_id: &str,
) -> Option<serde_json::Value> {
    let mut entries = match meta {
        Some(serde_json::Value::Object(entries)) => entries,
        None => serde_json::Map::new(),
        // e.g. a bare string meta — nothing we can do, pass it through.
        other => return other,
    };
    entries.insert(
        MCP_TOOL_THREAD_ID_META_KEY.to_string(),
        serde_json::Value::String(thread_id.to_string()),
    );
    Some(serde_json::Value::Object(entries))
}
#[derive(Clone, Copy)]
struct McpToolApprovalPromptOptions {
allow_session_remember: bool,

View File

@@ -650,6 +650,35 @@ async fn codex_apps_tool_call_request_meta_includes_turn_metadata_and_codex_apps
);
}
#[test]
fn mcp_tool_call_thread_id_meta_is_added_to_request_meta() {
    // Existing object meta: `threadId` is inserted, replacing any stale
    // value the client supplied.
    assert_eq!(
        with_mcp_tool_call_thread_id_meta(
            Some(serde_json::json!({
                "source": "test-client",
                "threadId": "stale-thread",
            })),
            "thread-live",
        ),
        Some(serde_json::json!({
            "source": "test-client",
            "threadId": "thread-live",
        }))
    );
    // Absent meta: a fresh object containing only `threadId` is created.
    assert_eq!(
        with_mcp_tool_call_thread_id_meta(None, "thread-live"),
        Some(serde_json::json!({
            "threadId": "thread-live",
        }))
    );
    // Non-object meta: passed through unchanged.
    assert_eq!(
        with_mcp_tool_call_thread_id_meta(Some(serde_json::json!("invalid-meta")), "thread-live"),
        Some(serde_json::json!("invalid-meta"))
    );
}
#[test]
fn accepted_elicitation_content_converts_to_request_user_input_response() {
let response = request_user_input_response_from_elicitation_content(Some(serde_json::json!(

View File

@@ -1,5 +1,6 @@
//! Verifies that parent and spawned subagent Responses API requests carry the expected window,
//! parent-thread, and subagent identity headers.
//! Exercises a real `responses-api-proxy` process with request dumping enabled, then verifies that
//! parent and spawned subagent requests carry the expected window, parent-thread, and subagent
//! identity headers in the dumped Responses API requests.
use anyhow::Result;
use anyhow::anyhow;
@@ -9,8 +10,6 @@ use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::user_input::UserInput;
use core_test_support::responses::ResponseMock;
use core_test_support::responses::ResponsesRequest;
use core_test_support::responses::ev_assistant_message;
use core_test_support::responses::ev_completed;
use core_test_support::responses::ev_function_call;
@@ -22,28 +21,119 @@ use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodex;
use core_test_support::test_codex::test_codex;
use pretty_assertions::assert_eq;
use serde_json::Value;
use serde_json::json;
use std::io::ErrorKind;
use std::io::Write;
use std::path::Path;
use std::process::Child;
use std::process::Command as StdCommand;
use std::process::Stdio;
use std::time::Duration;
use std::time::Instant;
use tempfile::TempDir;
// Prompts and the function-call id used to script the parent/child turns
// against the mock Responses API server.
const PARENT_PROMPT: &str = "spawn a subagent and report when it is started";
const CHILD_PROMPT: &str = "child: say done";
const SPAWN_CALL_ID: &str = "spawn-call-1";
// Polling intervals and timeouts for waiting on mock requests, proxy startup,
// and turn completion.
const REQUEST_POLL_INTERVAL: Duration = Duration::from_millis(/*millis*/ 20);
const PROXY_START_TIMEOUT: Duration = Duration::from_secs(/*secs*/ 30);
const PROXY_POLL_INTERVAL: Duration = Duration::from_millis(/*millis*/ 20);
const TURN_TIMEOUT: Duration = Duration::from_secs(/*secs*/ 60);
/// Handle to a spawned `responses-api-proxy` child process used by this test.
struct ResponsesApiProxy {
    // The running proxy process; killed and reaped when the handle is dropped.
    child: Child,
    // Localhost TCP port the proxy reported via its server-info file.
    port: u16,
}
impl ResponsesApiProxy {
    /// Spawn a `responses-api-proxy` process that forwards to `upstream_url`
    /// and dumps each request into `dump_dir`, then wait for it to publish
    /// its listening port via the server-info file.
    ///
    /// Prefers the standalone `codex-responses-api-proxy` binary and falls
    /// back to the `codex` multitool with a `responses-api-proxy` subcommand.
    ///
    /// # Errors
    /// Fails if the process cannot be spawned, exits before writing server
    /// info, writes malformed server info, or does not report a port within
    /// `PROXY_START_TIMEOUT`.
    fn start(upstream_url: &str, dump_dir: &Path) -> Result<Self> {
        let server_info = dump_dir.join("server-info.json");
        let (proxy_program, use_codex_multitool) =
            match codex_utils_cargo_bin::cargo_bin("codex-responses-api-proxy") {
                Ok(path) => (path, false),
                Err(_) => (codex_utils_cargo_bin::cargo_bin("codex")?, true),
            };
        let mut command = StdCommand::new(proxy_program);
        if use_codex_multitool {
            command.arg("responses-api-proxy");
        }
        let mut child = command
            .args(["--server-info"])
            .arg(&server_info)
            .args(["--upstream-url", upstream_url, "--dump-dir"])
            .arg(dump_dir)
            .stdin(Stdio::piped())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn()?;
        // Write a throwaway line to stdin; presumably the proxy reads stdin
        // before starting up (e.g. for an auth token) — TODO confirm against
        // the proxy's CLI contract.
        child
            .stdin
            .take()
            .ok_or_else(|| anyhow!("responses-api-proxy stdin was not piped"))?
            .write_all(b"dummy\n")?;
        let deadline = Instant::now() + PROXY_START_TIMEOUT;
        // Poll the server-info file until it parses as JSON with a `port`.
        loop {
            match std::fs::read_to_string(&server_info) {
                Ok(info) => {
                    if !info.trim().is_empty() {
                        match serde_json::from_str::<Value>(&info) {
                            Ok(info) => {
                                let port = info
                                    .get("port")
                                    .and_then(Value::as_u64)
                                    .ok_or_else(|| anyhow!("proxy server info missing port"))?;
                                return Ok(Self {
                                    child,
                                    port: u16::try_from(port)?,
                                });
                            }
                            // EOF mid-document means the proxy is still
                            // writing the file; keep polling.
                            Err(err) if err.is_eof() => {}
                            Err(err) => return Err(err.into()),
                        }
                    }
                }
                // File not created yet; keep polling.
                Err(err) if err.kind() == ErrorKind::NotFound => {}
                Err(err) => return Err(err.into()),
            }
            // Surface an early crash instead of waiting out the full timeout.
            if let Some(status) = child.try_wait()? {
                return Err(anyhow!(
                    "responses-api-proxy exited before writing server info: {status}"
                ));
            }
            if Instant::now() >= deadline {
                return Err(anyhow!("timed out waiting for responses-api-proxy"));
            }
            std::thread::sleep(PROXY_POLL_INTERVAL);
        }
    }

    /// Base URL (including the `/v1` prefix) clients should send requests to.
    fn base_url(&self) -> String {
        format!("http://127.0.0.1:{}/v1", self.port)
    }
}
impl Drop for ResponsesApiProxy {
    /// Best-effort teardown: terminate the proxy process and reap it so the
    /// test run does not leak a child. Errors (e.g. the process already
    /// exited) are deliberately ignored.
    fn drop(&mut self) {
        self.child.kill().ok();
        self.child.wait().ok();
    }
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn responses_api_parent_and_subagent_requests_include_identity_headers() -> Result<()> {
async fn responses_api_proxy_dumps_parent_and_subagent_identity_headers() -> Result<()> {
skip_if_no_network!(Ok(()));
let server = start_mock_server().await;
let dump_dir = TempDir::new()?;
let proxy =
ResponsesApiProxy::start(&format!("{}/v1/responses", server.uri()), dump_dir.path())?;
let spawn_args = serde_json::to_string(&json!({ "message": CHILD_PROMPT }))?;
let parent_mock = mount_sse_once_match(
mount_sse_once_match(
&server,
|req: &wiremock::Request| {
request_body_contains(req, PARENT_PROMPT)
&& request_header(req, "x-openai-subagent").is_none()
},
|req: &wiremock::Request| request_body_contains(req, PARENT_PROMPT),
sse(vec![
ev_response_created("resp-parent-1"),
ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
@@ -51,12 +141,10 @@ async fn responses_api_parent_and_subagent_requests_include_identity_headers() -
]),
)
.await;
let child_mock = mount_sse_once_match(
mount_sse_once_match(
&server,
|req: &wiremock::Request| {
request_body_contains(req, CHILD_PROMPT)
&& !request_body_contains(req, SPAWN_CALL_ID)
&& request_header(req, "x-openai-subagent") == Some("collab_spawn")
request_body_contains(req, CHILD_PROMPT) && !request_body_contains(req, SPAWN_CALL_ID)
},
sse(vec![
ev_response_created("resp-child-1"),
@@ -67,10 +155,7 @@ async fn responses_api_parent_and_subagent_requests_include_identity_headers() -
.await;
mount_sse_once_match(
&server,
|req: &wiremock::Request| {
request_body_contains(req, SPAWN_CALL_ID)
&& request_header(req, "x-openai-subagent").is_none()
},
|req: &wiremock::Request| request_body_contains(req, SPAWN_CALL_ID),
sse(vec![
ev_response_created("resp-parent-2"),
ev_assistant_message("msg-parent-2", "parent done"),
@@ -79,52 +164,50 @@ async fn responses_api_parent_and_subagent_requests_include_identity_headers() -
)
.await;
let mut builder = test_codex().with_config(|config| {
let proxy_base_url = proxy.base_url();
let mut builder = test_codex().with_config(move |config| {
config.model_provider.base_url = Some(proxy_base_url);
config
.features
.disable(Feature::EnableRequestCompression)
.expect("test config should allow feature update");
});
let test = builder.build(&server).await?;
submit_turn_with_timeout(&test, PARENT_PROMPT).await?;
submit_turn_with_timeout(&test, PARENT_PROMPT, dump_dir.path()).await?;
let parent = wait_for_matching_request(&parent_mock, "parent request", |request| {
request.body_contains_text(PARENT_PROMPT) && request.header("x-openai-subagent").is_none()
})
.await?;
let child = wait_for_matching_request(&child_mock, "child request", |request| {
request.body_contains_text(CHILD_PROMPT)
&& !request.body_contains_text(SPAWN_CALL_ID)
&& request.header("x-openai-subagent").as_deref() == Some("collab_spawn")
})
.await?;
let dumps = wait_for_proxy_request_dumps(dump_dir.path())?;
let parent = dumps
.iter()
.find(|dump| dump_body_contains(dump, PARENT_PROMPT))
.ok_or_else(|| anyhow!("missing parent request dump"))?;
let child = dumps
.iter()
.find(|dump| {
dump_body_contains(dump, CHILD_PROMPT) && !dump_body_contains(dump, SPAWN_CALL_ID)
})
.ok_or_else(|| anyhow!("missing child request dump"))?;
let parent_window_id = parent
.header("x-codex-window-id")
let parent_window_id = header(parent, "x-codex-window-id")
.ok_or_else(|| anyhow!("parent request missing x-codex-window-id"))?;
let child_window_id = child
.header("x-codex-window-id")
let child_window_id = header(child, "x-codex-window-id")
.ok_or_else(|| anyhow!("child request missing x-codex-window-id"))?;
let (parent_thread_id, parent_generation) = split_window_id(&parent_window_id)?;
let (child_thread_id, child_generation) = split_window_id(&child_window_id)?;
let (parent_thread_id, parent_generation) = split_window_id(parent_window_id)?;
let (child_thread_id, child_generation) = split_window_id(child_window_id)?;
assert_eq!(parent_generation, 0);
assert_eq!(child_generation, 0);
assert!(child_thread_id != parent_thread_id);
assert_eq!(parent.header("x-openai-subagent"), None);
assert_eq!(header(parent, "x-openai-subagent"), None);
assert_eq!(header(child, "x-openai-subagent"), Some("collab_spawn"));
assert_eq!(
child.header("x-openai-subagent").as_deref(),
Some("collab_spawn")
);
assert_eq!(
child.header("x-codex-parent-thread-id").as_deref(),
header(child, "x-codex-parent-thread-id"),
Some(parent_thread_id)
);
Ok(())
}
async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str) -> Result<()> {
async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str, dump_dir: &Path) -> Result<()> {
let session_model = test.session_configured.model.clone();
test.codex
.submit(Op::UserTurn {
@@ -152,14 +235,14 @@ async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str) -> Result<()>
})
.await?;
let turn_started = wait_for_event_result(test, "turn started", |event| {
let turn_started = wait_for_event_result(test, "turn started", dump_dir, |event| {
matches!(event, EventMsg::TurnStarted(_))
})
.await?;
let EventMsg::TurnStarted(turn_started) = turn_started else {
unreachable!("event predicate only matches turn started events");
};
wait_for_event_result(test, "turn complete", |event| match event {
wait_for_event_result(test, "turn complete", dump_dir, |event| match event {
EventMsg::TurnComplete(event) => event.turn_id == turn_started.turn_id,
_ => false,
})
@@ -168,33 +251,10 @@ async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str) -> Result<()>
Ok(())
}
async fn wait_for_matching_request<F>(
mock: &ResponseMock,
label: &str,
mut predicate: F,
) -> Result<ResponsesRequest>
where
F: FnMut(&ResponsesRequest) -> bool,
{
tokio::time::timeout(TURN_TIMEOUT, async {
loop {
if let Some(request) = mock
.requests()
.into_iter()
.find(|request| predicate(request))
{
return request;
}
tokio::time::sleep(REQUEST_POLL_INTERVAL).await;
}
})
.await
.map_err(|_| anyhow!("timed out waiting for {label}"))
}
async fn wait_for_event_result<F>(
test: &TestCodex,
stage: &str,
dump_dir: &Path,
mut predicate: F,
) -> Result<EventMsg>
where
@@ -213,8 +273,9 @@ where
.await
.map_err(|_| {
anyhow!(
"timed out waiting for {stage}; saw events: {}",
seen_events.join(" | ")
"timed out waiting for {stage}; saw events: {}; {}",
seen_events.join(" | "),
proxy_dump_summary(dump_dir)
)
})?
}
@@ -229,8 +290,95 @@ fn request_body_contains(req: &wiremock::Request, text: &str) -> bool {
std::str::from_utf8(&req.body).is_ok_and(|body| body.contains(text))
}
fn request_header<'a>(req: &'a wiremock::Request, name: &str) -> Option<&'a str> {
req.headers.get(name).and_then(|value| value.to_str().ok())
/// Poll `dump_dir` until the proxy has written at least three request dumps,
/// one of which contains the child prompt, or fail after a two-second deadline.
fn wait_for_proxy_request_dumps(dump_dir: &Path) -> Result<Vec<Value>> {
    let deadline = Instant::now() + Duration::from_secs(/*secs*/ 2);
    loop {
        // Treat read/parse failures as "no dumps yet" and keep polling.
        let dumps = read_proxy_request_dumps(dump_dir).unwrap_or_default();
        let saw_child_request = dumps
            .iter()
            .any(|dump| dump_body_contains(dump, CHILD_PROMPT));
        if dumps.len() >= 3 && saw_child_request {
            return Ok(dumps);
        }
        if Instant::now() >= deadline {
            return Err(anyhow!(
                "timed out waiting for proxy request dumps, got {}",
                dumps.len()
            ));
        }
        std::thread::sleep(PROXY_POLL_INTERVAL);
    }
}
/// Read every completed `*-request.json` dump in `dump_dir` as JSON.
///
/// Files that are empty or truncated (JSON EOF) are treated as still being
/// written by the proxy and are skipped; any other I/O or parse error is
/// propagated.
fn read_proxy_request_dumps(dump_dir: &Path) -> Result<Vec<Value>> {
    let mut parsed: Vec<Value> = Vec::new();
    for entry in std::fs::read_dir(dump_dir)? {
        let path = entry?.path();
        let is_request_dump = path
            .file_name()
            .and_then(|name| name.to_str())
            .is_some_and(|name| name.ends_with("-request.json"));
        if !is_request_dump {
            continue;
        }
        let contents = std::fs::read_to_string(&path)?;
        if contents.trim().is_empty() {
            // Created but not yet written; skip it.
            continue;
        }
        match serde_json::from_str(&contents) {
            Ok(dump) => parsed.push(dump),
            // Truncated JSON: the proxy is mid-write; skip it.
            Err(err) if err.is_eof() => {}
            Err(err) => return Err(err.into()),
        }
    }
    Ok(parsed)
}
/// Render a one-line diagnostic describing the request dumps written so far,
/// used to enrich timeout error messages.
fn proxy_dump_summary(dump_dir: &Path) -> String {
    let dumps = match read_proxy_request_dumps(dump_dir) {
        Ok(dumps) => dumps,
        Err(err) => return format!("failed to read proxy request dumps: {err}"),
    };
    let bodies = dumps
        .iter()
        .filter_map(|dump| dump.get("body"))
        .map(Value::to_string)
        .collect::<Vec<_>>()
        .join("; ");
    format!("proxy wrote {} request dumps: {bodies}", dumps.len())
}
#[test]
fn read_proxy_request_dumps_ignores_in_progress_files() -> Result<()> {
    let dir = TempDir::new()?;

    // Only the fully written dump should be parsed; the empty and truncated
    // files simulate dumps the proxy is still in the middle of writing.
    std::fs::write(
        dir.path().join("complete-request.json"),
        serde_json::to_string(&json!({ "body": "ready" }))?,
    )?;
    std::fs::write(dir.path().join("empty-request.json"), "")?;
    std::fs::write(dir.path().join("partial-request.json"), "{\"body\"")?;

    let dumps = read_proxy_request_dumps(dir.path())?;
    assert_eq!(dumps, vec![json!({ "body": "ready" })]);
    Ok(())
}
/// True when the dump has a `body` field whose JSON rendering contains `text`.
fn dump_body_contains(dump: &Value, text: &str) -> bool {
    match dump.get("body") {
        Some(body) => body.to_string().contains(text),
        None => false,
    }
}
fn header<'a>(dump: &'a Value, name: &str) -> Option<&'a str> {
dump.get("headers")?.as_array()?.iter().find_map(|header| {
(header.get("name")?.as_str()?.eq_ignore_ascii_case(name))
.then(|| header.get("value")?.as_str())
.flatten()
})
}
fn split_window_id(window_id: &str) -> Result<(&str, u64)> {

View File

@@ -11,7 +11,6 @@ pub fn auth_provider_from_auth(
return Ok(CoreAuthProvider {
token: Some(api_key),
account_id: None,
is_fedramp_account: false,
});
}
@@ -19,7 +18,6 @@ pub fn auth_provider_from_auth(
return Ok(CoreAuthProvider {
token: Some(token),
account_id: None,
is_fedramp_account: false,
});
}
@@ -28,13 +26,11 @@ pub fn auth_provider_from_auth(
Ok(CoreAuthProvider {
token: Some(token),
account_id: auth.get_account_id(),
is_fedramp_account: auth.is_fedramp_account(),
})
} else {
Ok(CoreAuthProvider {
token: None,
account_id: None,
is_fedramp_account: false,
})
}
}

View File

@@ -130,7 +130,6 @@ async fn pro_account_with_no_api_key_uses_chatgpt_auth() {
chatgpt_plan_type: Some(InternalPlanType::Known(InternalKnownPlan::Pro)),
chatgpt_user_id: Some("user-12345".to_string()),
chatgpt_account_id: None,
chatgpt_account_is_fedramp: false,
raw_jwt: fake_jwt,
},
access_token: "test-access-token".to_string(),

View File

@@ -293,12 +293,6 @@ impl CodexAuth {
self.get_current_token_data().and_then(|t| t.account_id)
}
/// Returns false if `is_chatgpt_auth()` is false or the token omits the FedRAMP claim.
pub fn is_fedramp_account(&self) -> bool {
self.get_current_token_data()
.is_some_and(|t| t.id_token.is_fedramp_account())
}
/// Returns `None` if `is_chatgpt_auth()` is false.
pub fn get_account_email(&self) -> Option<String> {
self.get_current_token_data().and_then(|t| t.id_token.email)

View File

@@ -36,8 +36,6 @@ pub struct IdTokenInfo {
pub chatgpt_user_id: Option<String>,
/// Organization/workspace identifier associated with the token, if present.
pub chatgpt_account_id: Option<String>,
/// Whether the selected ChatGPT workspace must route through the FedRAMP edge.
pub chatgpt_account_is_fedramp: bool,
pub raw_jwt: String,
}
@@ -62,10 +60,6 @@ impl IdTokenInfo {
Some(PlanType::Known(plan)) if plan.is_workspace_account()
)
}
pub fn is_fedramp_account(&self) -> bool {
self.chatgpt_account_is_fedramp
}
}
#[derive(Deserialize)]
@@ -94,8 +88,6 @@ struct AuthClaims {
user_id: Option<String>,
#[serde(default)]
chatgpt_account_id: Option<String>,
#[serde(default)]
chatgpt_account_is_fedramp: bool,
}
#[derive(Deserialize)]
@@ -147,7 +139,6 @@ pub fn parse_chatgpt_jwt_claims(jwt: &str) -> Result<IdTokenInfo, IdTokenInfoErr
chatgpt_plan_type: auth.chatgpt_plan_type,
chatgpt_user_id: auth.chatgpt_user_id.or(auth.user_id),
chatgpt_account_id: auth.chatgpt_account_id,
chatgpt_account_is_fedramp: auth.chatgpt_account_is_fedramp,
}),
None => Ok(IdTokenInfo {
email,
@@ -155,7 +146,6 @@ pub fn parse_chatgpt_jwt_claims(jwt: &str) -> Result<IdTokenInfo, IdTokenInfoErr
chatgpt_plan_type: None,
chatgpt_user_id: None,
chatgpt_account_id: None,
chatgpt_account_is_fedramp: false,
}),
}
}

View File

@@ -114,22 +114,6 @@ fn id_token_info_handles_missing_fields() {
let info = parse_chatgpt_jwt_claims(&fake_jwt).expect("should parse");
assert!(info.email.is_none());
assert!(info.get_chatgpt_plan_type().is_none());
assert_eq!(info.is_fedramp_account(), false);
}
#[test]
fn id_token_info_parses_fedramp_account_claim() {
let fake_jwt = fake_jwt(serde_json::json!({
"email": "user@example.com",
"https://api.openai.com/auth": {
"chatgpt_account_id": "account-fed",
"chatgpt_account_is_fedramp": true,
}
}));
let info = parse_chatgpt_jwt_claims(&fake_jwt).expect("should parse");
assert_eq!(info.chatgpt_account_id.as_deref(), Some("account-fed"));
assert_eq!(info.is_fedramp_account(), true);
}
#[test]

View File

@@ -231,12 +231,6 @@ mod tests {
Header::from_bytes(&b"Cookie"[..], &b"user-session=secret"[..]).expect("cookie header"),
Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..])
.expect("content-type header"),
Header::from_bytes(&b"x-codex-window-id"[..], &b"thread-1:0"[..])
.expect("window id header"),
Header::from_bytes(&b"x-codex-parent-thread-id"[..], &b"parent-thread-1"[..])
.expect("parent thread id header"),
Header::from_bytes(&b"x-openai-subagent"[..], &b"collab_spawn"[..])
.expect("subagent header"),
];
let exchange_dump = dumper
@@ -268,18 +262,6 @@ mod tests {
{
"name": "Content-Type",
"value": "application/json"
},
{
"name": "x-codex-window-id",
"value": "thread-1:0"
},
{
"name": "x-codex-parent-thread-id",
"value": "parent-thread-1"
},
{
"name": "x-openai-subagent",
"value": "collab_spawn"
}
],
"body": {

View File

@@ -384,8 +384,9 @@ mod tests {
use std::io;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use pretty_assertions::assert_eq;
use tokio::time::Instant;
use tracing_subscriber::filter::Targets;
use tracing_subscriber::fmt::writer::MakeWriter;
use tracing_subscriber::layer::SubscriberExt;
@@ -441,7 +442,6 @@ mod tests {
.await
.expect("initialize runtime");
let writer = SharedWriter::default();
let layer = start(runtime.clone());
let subscriber = tracing_subscriber::registry()
.with(
@@ -452,8 +452,7 @@ mod tests {
.with_filter(Targets::new().with_default(tracing::Level::TRACE)),
)
.with(
layer
.clone()
start(runtime.clone())
.with_filter(Targets::new().with_default(tracing::Level::TRACE)),
);
let guard = subscriber.set_default();
@@ -464,7 +463,6 @@ mod tests {
});
tracing::debug!("threadless-after");
layer.flush().await;
drop(guard);
let feedback_logs = writer.snapshot();
@@ -477,17 +475,24 @@ mod tests {
.collect::<Vec<_>>()
.join("\n")
};
let sqlite_logs = String::from_utf8(
runtime
.query_feedback_logs("thread-1")
.await
.expect("query feedback logs"),
)
.expect("valid utf-8");
assert_eq!(
without_timestamps(&sqlite_logs),
without_timestamps(&feedback_logs)
);
let deadline = Instant::now() + Duration::from_secs(2);
loop {
let sqlite_logs = String::from_utf8(
runtime
.query_feedback_logs("thread-1")
.await
.expect("query feedback logs"),
)
.expect("valid utf-8");
if without_timestamps(&sqlite_logs) == without_timestamps(&feedback_logs) {
break;
}
assert!(
Instant::now() < deadline,
"sqlite feedback logs did not match feedback formatter output before timeout\nsqlite:\n{sqlite_logs}\nfeedback:\n{feedback_logs}"
);
tokio::time::sleep(Duration::from_millis(10)).await;
}
let _ = tokio::fs::remove_dir_all(codex_home).await;
}

View File

@@ -144,17 +144,6 @@ ON CONFLICT(child_thread_id) DO UPDATE SET
.await
}
/// List all spawned descendants of `root_thread_id`.
///
/// Descendants are returned breadth-first by depth, then by thread id for stable ordering.
pub async fn list_thread_spawn_descendants(
    &self,
    root_thread_id: ThreadId,
) -> anyhow::Result<Vec<ThreadId>> {
    // Delegates to the status-filtered variant with no status filter, so all
    // descendants are returned regardless of their current status.
    self.list_thread_spawn_descendants_matching(root_thread_id, /*status*/ None)
        .await
}
/// Find a direct spawned child of `parent_thread_id` by canonical agent path.
pub async fn find_thread_spawn_child_by_path(
&self,
@@ -1666,11 +1655,5 @@ mod tests {
.await
.expect("open descendants from child should load");
assert_eq!(open_descendants_from_child, vec![grandchild_thread_id]);
let all_descendants = runtime
.list_thread_spawn_descendants(parent_thread_id)
.await
.expect("all descendants should load");
assert_eq!(all_descendants, vec![child_thread_id, grandchild_thread_id]);
}
}

View File

@@ -2378,11 +2378,10 @@ impl App {
) -> Result<bool> {
match op.view() {
AppCommandView::Interrupt => {
if let Some(turn_id) = self.active_turn_id_for_thread(thread_id).await {
app_server.turn_interrupt(thread_id, turn_id).await?;
} else {
app_server.startup_interrupt(thread_id).await?;
}
let Some(turn_id) = self.active_turn_id_for_thread(thread_id).await else {
return Ok(true);
};
app_server.turn_interrupt(thread_id, turn_id).await?;
Ok(true)
}
AppCommandView::UserTurn {
@@ -11411,18 +11410,11 @@ guardian_approval = true
#[tokio::test]
async fn interrupt_without_active_turn_is_treated_as_handled() {
let mut app = make_test_app().await;
let thread_id = ThreadId::new();
let mut app_server =
crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
.await
.expect("embedded app server");
let started = app_server
.start_thread(app.chat_widget.config_ref())
.await
.expect("thread/start should succeed");
let thread_id = started.session.thread_id;
app.enqueue_primary_thread_session(started.session, started.turns)
.await
.expect("primary thread should be registered");
let op = AppCommand::interrupt();
let handled = app

View File

@@ -476,10 +476,6 @@ impl AppServerSession {
Ok(())
}
pub(crate) async fn startup_interrupt(&mut self, thread_id: ThreadId) -> Result<()> {
self.turn_interrupt(thread_id, String::new()).await
}
pub(crate) async fn turn_steer(
&mut self,
thread_id: ThreadId,