mirror of
https://github.com/openai/codex.git
synced 2026-04-19 12:14:48 +00:00
Compare commits
1 Commits
jif/cascad
...
codex/prop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b302f5786b |
5
.github/scripts/run-bazel-ci.sh
vendored
5
.github/scripts/run-bazel-ci.sh
vendored
@@ -94,11 +94,6 @@ print_bazel_test_log_tails() {
|
||||
local rel_path="${target#//}"
|
||||
rel_path="${rel_path/://}"
|
||||
local test_log="${testlogs_dir}/${rel_path}/test.log"
|
||||
local reported_test_log
|
||||
reported_test_log="$(grep -F "FAIL: ${target} " "$console_log" | sed -nE 's#.* \(see ([^)]+/test\.log)\).*#\1#p' | head -n 1 || true)"
|
||||
if [[ -n "$reported_test_log" ]]; then
|
||||
test_log="$reported_test_log"
|
||||
fi
|
||||
|
||||
echo "::group::Bazel test log tail for ${target}"
|
||||
if [[ -f "$test_log" ]]; then
|
||||
|
||||
@@ -146,7 +146,7 @@ Example with notification opt-out:
|
||||
- `thread/memoryMode/set` — experimental; set a thread’s persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success.
|
||||
- `memory/reset` — experimental; clear the current `CODEX_HOME/memories` directory and reset persisted memory stage data in sqlite while preserving existing thread memory modes; returns `{}` on success.
|
||||
- `thread/status/changed` — notification emitted when a loaded thread’s status changes (`threadId` + new `status`).
|
||||
- `thread/archive` — move a thread’s rollout file into the archived directory and attempt to move any spawned descendant thread rollout files; returns `{}` on success and emits `thread/archived` for each archived thread.
|
||||
- `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success and emits `thread/archived`.
|
||||
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server keeps the thread loaded and unloads it only after it has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed`.
|
||||
- `thread/name/set` — set or update a thread’s user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
|
||||
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
|
||||
@@ -426,7 +426,7 @@ Experimental: use `memory/reset` to clear local memory artifacts and sqlite-back
|
||||
|
||||
### Example: Archive a thread
|
||||
|
||||
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory and attempt to move any spawned descendant thread rollouts.
|
||||
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory.
|
||||
|
||||
```json
|
||||
{ "method": "thread/archive", "id": 21, "params": { "threadId": "thr_b" } }
|
||||
|
||||
@@ -186,7 +186,6 @@ use codex_app_server_protocol::ThreadUnsubscribeStatus;
|
||||
use codex_app_server_protocol::Turn;
|
||||
use codex_app_server_protocol::TurnError;
|
||||
use codex_app_server_protocol::TurnInterruptParams;
|
||||
use codex_app_server_protocol::TurnInterruptResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::TurnStatus;
|
||||
@@ -2732,36 +2731,8 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let mut thread_ids = vec![thread_id];
|
||||
if let Some(state_db_ctx) = get_state_db(&self.config).await {
|
||||
let descendants = match state_db_ctx.list_thread_spawn_descendants(thread_id).await {
|
||||
Ok(descendants) => descendants,
|
||||
Err(err) => {
|
||||
self.outgoing
|
||||
.send_error(
|
||||
request_id,
|
||||
JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to list spawned descendants for thread id {thread_id}: {err}"
|
||||
),
|
||||
data: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut seen = HashSet::from([thread_id]);
|
||||
for descendant_id in descendants {
|
||||
if seen.insert(descendant_id) {
|
||||
thread_ids.push(descendant_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut archive_thread_ids = Vec::new();
|
||||
match self
|
||||
let thread_id_str = thread_id.to_string();
|
||||
if let Err(err) = self
|
||||
.thread_store
|
||||
.read_thread(StoreReadThreadParams {
|
||||
thread_id,
|
||||
@@ -2770,98 +2741,34 @@ impl CodexMessageProcessor {
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(thread) => {
|
||||
if thread.archived_at.is_none() {
|
||||
archive_thread_ids.push(thread_id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
self.outgoing
|
||||
.send_error(request_id, thread_store_archive_error("archive", err))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
for descendant_thread_id in thread_ids.into_iter().skip(1) {
|
||||
match self
|
||||
.thread_store
|
||||
.read_thread(StoreReadThreadParams {
|
||||
thread_id: descendant_thread_id,
|
||||
include_archived: true,
|
||||
include_history: false,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(thread) => {
|
||||
if thread.archived_at.is_none() {
|
||||
archive_thread_ids.push(descendant_thread_id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut archived_thread_ids = Vec::new();
|
||||
let Some((parent_thread_id, descendant_thread_ids)) = archive_thread_ids.split_first()
|
||||
else {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadArchiveResponse {})
|
||||
.send_error(request_id, thread_store_archive_error("archive", err))
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
}
|
||||
self.prepare_thread_for_archive(thread_id).await;
|
||||
|
||||
self.prepare_thread_for_archive(*parent_thread_id).await;
|
||||
match self
|
||||
.thread_store
|
||||
.archive_thread(StoreArchiveThreadParams {
|
||||
thread_id: *parent_thread_id,
|
||||
})
|
||||
.archive_thread(StoreArchiveThreadParams { thread_id })
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
archived_thread_ids.push(parent_thread_id.to_string());
|
||||
let response = ThreadArchiveResponse {};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
let notification = ThreadArchivedNotification {
|
||||
thread_id: thread_id_str,
|
||||
};
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadArchived(notification))
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
self.outgoing
|
||||
.send_error(request_id, thread_store_archive_error("archive", err))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
for descendant_thread_id in descendant_thread_ids.iter().rev().copied() {
|
||||
self.prepare_thread_for_archive(descendant_thread_id).await;
|
||||
match self
|
||||
.thread_store
|
||||
.archive_thread(StoreArchiveThreadParams {
|
||||
thread_id: descendant_thread_id,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
archived_thread_ids.push(descendant_thread_id.to_string());
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to archive spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadArchiveResponse {})
|
||||
.await;
|
||||
for thread_id in archived_thread_ids {
|
||||
let notification = ThreadArchivedNotification { thread_id };
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadArchived(notification))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_increment_elicitation(
|
||||
@@ -5685,17 +5592,19 @@ impl CodexMessageProcessor {
|
||||
params: McpServerToolCallParams,
|
||||
) {
|
||||
let outgoing = Arc::clone(&self.outgoing);
|
||||
let (_, thread) = match self.load_thread(¶ms.thread_id).await {
|
||||
let thread_id = params.thread_id.clone();
|
||||
let (_, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(thread) => thread,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let meta = with_mcp_tool_call_thread_id_meta(params.meta, &thread_id);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let result = thread
|
||||
.call_mcp_tool(¶ms.server, ¶ms.tool, params.arguments, params.meta)
|
||||
.call_mcp_tool(¶ms.server, ¶ms.tool, params.arguments, meta)
|
||||
.await;
|
||||
match result {
|
||||
Ok(result) => {
|
||||
@@ -7652,12 +7561,9 @@ impl CodexMessageProcessor {
|
||||
|
||||
async fn turn_interrupt(&self, request_id: ConnectionRequestId, params: TurnInterruptParams) {
|
||||
let TurnInterruptParams { thread_id, turn_id } = params;
|
||||
let is_startup_interrupt = turn_id.is_empty();
|
||||
if !is_startup_interrupt {
|
||||
self.outgoing
|
||||
.record_request_turn_id(&request_id, &turn_id)
|
||||
.await;
|
||||
}
|
||||
self.outgoing
|
||||
.record_request_turn_id(&request_id, &turn_id)
|
||||
.await;
|
||||
|
||||
let (thread_uuid, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(v) => v,
|
||||
@@ -7667,48 +7573,21 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
// Record turn interrupts so we can reply when TurnAborted arrives. Startup
|
||||
// interrupts do not have a turn and are acknowledged after submission.
|
||||
if !is_startup_interrupt {
|
||||
let request = request_id.clone();
|
||||
|
||||
// Record the pending interrupt so we can reply when TurnAborted arrives.
|
||||
{
|
||||
let thread_state = self.thread_state_manager.thread_state(thread_uuid).await;
|
||||
let mut thread_state = thread_state.lock().await;
|
||||
thread_state
|
||||
.pending_interrupts
|
||||
.push((request_id.clone(), ApiVersion::V2));
|
||||
.push((request, ApiVersion::V2));
|
||||
}
|
||||
|
||||
// Submit the interrupt. Turn interrupts respond upon TurnAborted; startup
|
||||
// interrupts respond here because startup cancellation has no turn event.
|
||||
let submit_result = self
|
||||
// Submit the interrupt; we'll respond upon TurnAborted.
|
||||
let _ = self
|
||||
.submit_core_op(&request_id, thread.as_ref(), Op::Interrupt)
|
||||
.await;
|
||||
match submit_result {
|
||||
Ok(_) if is_startup_interrupt => {
|
||||
self.outgoing
|
||||
.send_response(request_id, TurnInterruptResponse {})
|
||||
.await;
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(err) => {
|
||||
if !is_startup_interrupt {
|
||||
let thread_state = self.thread_state_manager.thread_state(thread_uuid).await;
|
||||
let mut thread_state = thread_state.lock().await;
|
||||
thread_state
|
||||
.pending_interrupts
|
||||
.retain(|(pending_request_id, _)| pending_request_id != &request_id);
|
||||
}
|
||||
let interrupt_target = if is_startup_interrupt {
|
||||
"startup"
|
||||
} else {
|
||||
"turn"
|
||||
};
|
||||
self.send_internal_error(
|
||||
request_id,
|
||||
format!("failed to interrupt {interrupt_target}: {err}"),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn ensure_conversation_listener(
|
||||
@@ -9306,6 +9185,32 @@ fn thread_store_archive_error(operation: &str, err: ThreadStoreError) -> JSONRPC
|
||||
}
|
||||
}
|
||||
|
||||
const MCP_TOOL_THREAD_ID_META_KEY: &str = "threadId";
|
||||
|
||||
fn with_mcp_tool_call_thread_id_meta(
|
||||
meta: Option<serde_json::Value>,
|
||||
thread_id: &str,
|
||||
) -> Option<serde_json::Value> {
|
||||
match meta {
|
||||
Some(serde_json::Value::Object(mut map)) => {
|
||||
map.insert(
|
||||
MCP_TOOL_THREAD_ID_META_KEY.to_string(),
|
||||
serde_json::Value::String(thread_id.to_string()),
|
||||
);
|
||||
Some(serde_json::Value::Object(map))
|
||||
}
|
||||
None => {
|
||||
let mut map = serde_json::Map::new();
|
||||
map.insert(
|
||||
MCP_TOOL_THREAD_ID_META_KEY.to_string(),
|
||||
serde_json::Value::String(thread_id.to_string()),
|
||||
);
|
||||
Some(serde_json::Value::Object(map))
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
|
||||
fn summary_from_stored_thread(
|
||||
thread: StoredThread,
|
||||
fallback_provider: &str,
|
||||
|
||||
@@ -83,10 +83,11 @@ url = "{mcp_server_url}/mcp"
|
||||
)
|
||||
.await??;
|
||||
let ThreadStartResponse { thread, .. } = to_response(thread_start_resp)?;
|
||||
let thread_id = thread.id.clone();
|
||||
|
||||
let tool_call_request_id = mcp
|
||||
.send_mcp_server_tool_call_request(McpServerToolCallParams {
|
||||
thread_id: thread.id,
|
||||
thread_id: thread_id.clone(),
|
||||
server: TEST_SERVER_NAME.to_string(),
|
||||
tool: TEST_TOOL_NAME.to_string(),
|
||||
arguments: Some(json!({
|
||||
@@ -114,6 +115,7 @@ url = "{mcp_server_url}/mcp"
|
||||
response.structured_content,
|
||||
Some(json!({
|
||||
"echoed": "hello from app",
|
||||
"threadId": thread_id,
|
||||
}))
|
||||
);
|
||||
assert_eq!(response.is_error, Some(false));
|
||||
@@ -203,7 +205,7 @@ impl ServerHandler for ToolAppsMcpServer {
|
||||
async fn call_tool(
|
||||
&self,
|
||||
request: CallToolRequestParams,
|
||||
_context: RequestContext<RoleServer>,
|
||||
context: RequestContext<RoleServer>,
|
||||
) -> Result<CallToolResult, rmcp::ErrorData> {
|
||||
assert_eq!(request.name.as_ref(), TEST_TOOL_NAME);
|
||||
let message = request
|
||||
@@ -212,12 +214,19 @@ impl ServerHandler for ToolAppsMcpServer {
|
||||
.and_then(|arguments| arguments.get("message"))
|
||||
.and_then(|value| value.as_str())
|
||||
.unwrap_or_default();
|
||||
let thread_id = context
|
||||
.meta
|
||||
.0
|
||||
.get("threadId")
|
||||
.and_then(|value| value.as_str())
|
||||
.unwrap_or_default();
|
||||
|
||||
let mut meta = Meta::new();
|
||||
meta.0.insert("calledBy".to_string(), json!("mcp-app"));
|
||||
|
||||
let mut result = CallToolResult::structured(json!({
|
||||
"echoed": message,
|
||||
"threadId": thread_id,
|
||||
}));
|
||||
result.content = vec![Content::text(format!("echo: {message}"))];
|
||||
result.meta = Some(meta);
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
@@ -20,11 +19,7 @@ use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use codex_core::find_archived_thread_path_by_id_str;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_state::DirectionalThreadSpawnEdgeStatus;
|
||||
use codex_state::StateRuntime;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
@@ -165,311 +160,6 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_archives_spawned_descendants() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let parent_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-00-00",
|
||||
"2025-01-01T00:00:00Z",
|
||||
"parent",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let child_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-01-00",
|
||||
"2025-01-01T00:01:00Z",
|
||||
"child",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let grandchild_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-02-00",
|
||||
"2025-01-01T00:02:00Z",
|
||||
"grandchild",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
|
||||
let parent_thread_id = ThreadId::from_string(&parent_id)?;
|
||||
let child_thread_id = ThreadId::from_string(&child_id)?;
|
||||
let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
child_thread_id,
|
||||
grandchild_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
let mut archived_ids = Vec::new();
|
||||
for _ in 0..3 {
|
||||
let notification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
|
||||
notification
|
||||
.params
|
||||
.expect("thread/archived notification params"),
|
||||
)?;
|
||||
archived_ids.push(archived_notification.thread_id);
|
||||
}
|
||||
assert_eq!(archived_ids, vec![parent_id, grandchild_id, child_id]);
|
||||
|
||||
for thread_id in [parent_thread_id, child_thread_id, grandchild_thread_id] {
|
||||
assert!(
|
||||
find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_none(),
|
||||
"expected active rollout for {thread_id} to be archived"
|
||||
);
|
||||
assert!(
|
||||
find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_some(),
|
||||
"expected archived rollout for {thread_id} to exist"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_succeeds_when_descendant_archive_fails() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let parent_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-00-00",
|
||||
"2025-01-01T00:00:00Z",
|
||||
"parent",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let child_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-01-00",
|
||||
"2025-01-01T00:01:00Z",
|
||||
"child",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let grandchild_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-02-00",
|
||||
"2025-01-01T00:02:00Z",
|
||||
"grandchild",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
|
||||
let parent_thread_id = ThreadId::from_string(&parent_id)?;
|
||||
let child_thread_id = ThreadId::from_string(&child_id)?;
|
||||
let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
child_thread_id,
|
||||
grandchild_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let child_rollout_path = find_thread_path_by_id_str(codex_home.path(), &child_id)
|
||||
.await?
|
||||
.expect("child rollout path");
|
||||
let archived_child_path = codex_home
|
||||
.path()
|
||||
.join(ARCHIVED_SESSIONS_SUBDIR)
|
||||
.join(child_rollout_path.file_name().expect("rollout file name"));
|
||||
std::fs::create_dir_all(&archived_child_path)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
let mut archived_ids = Vec::new();
|
||||
for _ in 0..2 {
|
||||
let notification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
|
||||
notification
|
||||
.params
|
||||
.expect("thread/archived notification params"),
|
||||
)?;
|
||||
archived_ids.push(archived_notification.thread_id);
|
||||
}
|
||||
assert_eq!(archived_ids, vec![parent_id, grandchild_id]);
|
||||
|
||||
assert!(
|
||||
timeout(
|
||||
std::time::Duration::from_millis(250),
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
assert!(
|
||||
child_rollout_path.exists(),
|
||||
"child should stay active after descendant archive failure"
|
||||
);
|
||||
assert!(
|
||||
archived_child_path.is_dir(),
|
||||
"test conflict should remain in archived sessions"
|
||||
);
|
||||
for thread_id in [parent_thread_id, grandchild_thread_id] {
|
||||
assert!(
|
||||
find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_none(),
|
||||
"expected active rollout for {thread_id} to be archived"
|
||||
);
|
||||
assert!(
|
||||
find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_some(),
|
||||
"expected archived rollout for {thread_id} to exist"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_succeeds_when_spawned_descendant_is_missing() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let parent_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-00-00",
|
||||
"2025-01-01T00:00:00Z",
|
||||
"parent",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let parent_thread_id = ThreadId::from_string(&parent_id)?;
|
||||
let missing_child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000901")?;
|
||||
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
missing_child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
let notification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
|
||||
notification
|
||||
.params
|
||||
.expect("thread/archived notification params"),
|
||||
)?;
|
||||
assert_eq!(archived_notification.thread_id, parent_id);
|
||||
|
||||
assert!(
|
||||
find_thread_path_by_id_str(codex_home.path(), &parent_id)
|
||||
.await?
|
||||
.is_none(),
|
||||
"parent should be archived even when a descendant is missing"
|
||||
);
|
||||
assert!(
|
||||
find_archived_thread_path_by_id_str(codex_home.path(), &parent_id)
|
||||
.await?
|
||||
.is_some(),
|
||||
"parent should be moved into archived sessions"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_clears_stale_subscriptions_before_resume() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
@@ -130,9 +130,9 @@ async fn thread_unsubscribe_keeps_thread_loaded_until_idle_timeout() -> Result<(
|
||||
async fn thread_unsubscribe_during_turn_keeps_turn_running() -> Result<()> {
|
||||
#[cfg(target_os = "windows")]
|
||||
let shell_command = vec![
|
||||
"Start-Sleep".to_string(),
|
||||
"-Seconds".to_string(),
|
||||
"1".to_string(),
|
||||
"powershell".to_string(),
|
||||
"-Command".to_string(),
|
||||
"Start-Sleep -Seconds 1".to_string(),
|
||||
];
|
||||
#[cfg(not(target_os = "windows"))]
|
||||
let shell_command = vec!["sleep".to_string(), "1".to_string()];
|
||||
|
||||
@@ -104,7 +104,6 @@ pub struct Client {
|
||||
bearer_token: Option<String>,
|
||||
user_agent: Option<HeaderValue>,
|
||||
chatgpt_account_id: Option<String>,
|
||||
chatgpt_account_is_fedramp: bool,
|
||||
path_style: PathStyle,
|
||||
}
|
||||
|
||||
@@ -130,7 +129,6 @@ impl Client {
|
||||
bearer_token: None,
|
||||
user_agent: None,
|
||||
chatgpt_account_id: None,
|
||||
chatgpt_account_is_fedramp: false,
|
||||
path_style,
|
||||
})
|
||||
}
|
||||
@@ -143,9 +141,6 @@ impl Client {
|
||||
if let Some(account_id) = auth.get_account_id() {
|
||||
client = client.with_chatgpt_account_id(account_id);
|
||||
}
|
||||
if auth.is_fedramp_account() {
|
||||
client = client.with_fedramp_routing_header();
|
||||
}
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
@@ -166,11 +161,6 @@ impl Client {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_fedramp_routing_header(mut self) -> Self {
|
||||
self.chatgpt_account_is_fedramp = true;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_path_style(mut self, style: PathStyle) -> Self {
|
||||
self.path_style = style;
|
||||
self
|
||||
@@ -195,11 +185,6 @@ impl Client {
|
||||
{
|
||||
h.insert(name, hv);
|
||||
}
|
||||
if self.chatgpt_account_is_fedramp
|
||||
&& let Ok(name) = HeaderName::from_bytes(b"X-OpenAI-Fedramp")
|
||||
{
|
||||
h.insert(name, HeaderValue::from_static("true"));
|
||||
}
|
||||
h
|
||||
}
|
||||
|
||||
|
||||
@@ -179,7 +179,6 @@ struct UsageErrorBody {
|
||||
pub struct CoreAuthProvider {
|
||||
pub token: Option<String>,
|
||||
pub account_id: Option<String>,
|
||||
pub is_fedramp_account: bool,
|
||||
}
|
||||
|
||||
impl CoreAuthProvider {
|
||||
@@ -197,7 +196,6 @@ impl CoreAuthProvider {
|
||||
Self {
|
||||
token: token.map(str::to_string),
|
||||
account_id: account_id.map(str::to_string),
|
||||
is_fedramp_account: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -214,8 +212,5 @@ impl ApiAuthProvider for CoreAuthProvider {
|
||||
{
|
||||
let _ = headers.insert("ChatGPT-Account-ID", header);
|
||||
}
|
||||
if self.is_fedramp_account {
|
||||
crate::auth::add_fedramp_routing_header(headers);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -136,7 +136,6 @@ fn core_auth_provider_reports_when_auth_header_will_attach() {
|
||||
let auth = CoreAuthProvider {
|
||||
token: Some("access-token".to_string()),
|
||||
account_id: None,
|
||||
is_fedramp_account: false,
|
||||
};
|
||||
|
||||
assert!(auth.auth_header_attached());
|
||||
@@ -163,22 +162,3 @@ fn core_auth_provider_adds_auth_headers() {
|
||||
Some("workspace-123")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn core_auth_provider_adds_fedramp_routing_header_for_fedramp_accounts() {
|
||||
let auth = CoreAuthProvider {
|
||||
token: Some("access-token".to_string()),
|
||||
account_id: Some("workspace-123".to_string()),
|
||||
is_fedramp_account: true,
|
||||
};
|
||||
let mut headers = HeaderMap::new();
|
||||
|
||||
crate::AuthProvider::add_auth_headers(&auth, &mut headers);
|
||||
|
||||
assert_eq!(
|
||||
headers
|
||||
.get("X-OpenAI-Fedramp")
|
||||
.and_then(|value| value.to_str().ok()),
|
||||
Some("true")
|
||||
);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
use http::HeaderMap;
|
||||
use http::HeaderValue;
|
||||
|
||||
/// Adds authentication headers to API requests.
|
||||
///
|
||||
@@ -9,26 +8,3 @@ use http::HeaderValue;
|
||||
pub trait AuthProvider: Send + Sync {
|
||||
fn add_auth_headers(&self, headers: &mut HeaderMap);
|
||||
}
|
||||
|
||||
pub(crate) fn add_fedramp_routing_header(headers: &mut HeaderMap) {
|
||||
headers.insert("X-OpenAI-Fedramp", HeaderValue::from_static("true"));
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn add_fedramp_routing_header_sets_header() {
|
||||
let mut headers = HeaderMap::new();
|
||||
|
||||
add_fedramp_routing_header(&mut headers);
|
||||
|
||||
assert_eq!(
|
||||
headers
|
||||
.get("X-OpenAI-Fedramp")
|
||||
.and_then(|v| v.to_str().ok()),
|
||||
Some("true")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -758,13 +758,12 @@ impl McpConnectionManager {
|
||||
let submit_id = startup_submit_id.clone();
|
||||
let auth_entry = auth_entries.get(&server_name).cloned();
|
||||
join_set.spawn(async move {
|
||||
let mut outcome = async_managed_client.client().await;
|
||||
let outcome = async_managed_client.client().await;
|
||||
if cancel_token.is_cancelled() {
|
||||
outcome = Err(StartupOutcomeError::Cancelled);
|
||||
return (server_name, Err(StartupOutcomeError::Cancelled));
|
||||
}
|
||||
let status = match &outcome {
|
||||
Ok(_) => McpStartupStatus::Ready,
|
||||
Err(StartupOutcomeError::Cancelled) => McpStartupStatus::Cancelled,
|
||||
Err(error) => {
|
||||
let error_str = mcp_init_error_display(
|
||||
server_name.as_str(),
|
||||
|
||||
@@ -31,10 +31,6 @@ pub const DEFAULT_MEMORIES_MAX_ROLLOUT_AGE_DAYS: i64 = 30;
|
||||
pub const DEFAULT_MEMORIES_MIN_ROLLOUT_IDLE_HOURS: i64 = 6;
|
||||
pub const DEFAULT_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION: usize = 256;
|
||||
pub const DEFAULT_MEMORIES_MAX_UNUSED_DAYS: i64 = 30;
|
||||
const MIN_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION: usize = 1;
|
||||
const MAX_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION: usize = 4096;
|
||||
const MIN_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 1;
|
||||
const MAX_MEMORIES_MAX_ROLLOUTS_PER_STARTUP: usize = 128;
|
||||
|
||||
const fn default_enabled() -> bool {
|
||||
true
|
||||
@@ -189,14 +185,12 @@ pub struct MemoriesToml {
|
||||
/// When `false`, skip injecting memory usage instructions into developer prompts.
|
||||
pub use_memories: Option<bool>,
|
||||
/// Maximum number of recent raw memories retained for global consolidation.
|
||||
#[schemars(range(min = 1, max = 4096))]
|
||||
pub max_raw_memories_for_consolidation: Option<usize>,
|
||||
/// Maximum number of days since a memory was last used before it becomes ineligible for phase 2 selection.
|
||||
pub max_unused_days: Option<i64>,
|
||||
/// Maximum age of the threads used for memories.
|
||||
pub max_rollout_age_days: Option<i64>,
|
||||
/// Maximum number of rollout candidates processed per pass.
|
||||
#[schemars(range(min = 1, max = 128))]
|
||||
pub max_rollouts_per_startup: Option<usize>,
|
||||
/// Minimum idle time between last thread activity and memory creation (hours). > 12h recommended.
|
||||
pub min_rollout_idle_hours: Option<i64>,
|
||||
@@ -250,10 +244,7 @@ impl From<MemoriesToml> for MemoriesConfig {
|
||||
max_raw_memories_for_consolidation: toml
|
||||
.max_raw_memories_for_consolidation
|
||||
.unwrap_or(defaults.max_raw_memories_for_consolidation)
|
||||
.clamp(
|
||||
MIN_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
|
||||
MAX_MEMORIES_MAX_RAW_MEMORIES_FOR_CONSOLIDATION,
|
||||
),
|
||||
.min(4096),
|
||||
max_unused_days: toml
|
||||
.max_unused_days
|
||||
.unwrap_or(defaults.max_unused_days)
|
||||
@@ -265,10 +256,7 @@ impl From<MemoriesToml> for MemoriesConfig {
|
||||
max_rollouts_per_startup: toml
|
||||
.max_rollouts_per_startup
|
||||
.unwrap_or(defaults.max_rollouts_per_startup)
|
||||
.clamp(
|
||||
MIN_MEMORIES_MAX_ROLLOUTS_PER_STARTUP,
|
||||
MAX_MEMORIES_MAX_ROLLOUTS_PER_STARTUP,
|
||||
),
|
||||
.min(128),
|
||||
min_rollout_idle_hours: toml
|
||||
.min_rollout_idle_hours
|
||||
.unwrap_or(defaults.min_rollout_idle_hours)
|
||||
|
||||
@@ -41,21 +41,3 @@ fn deserialize_skill_config_with_path_selector() {
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn memories_config_clamps_count_limits_to_nonzero_values() {
|
||||
let config = MemoriesConfig::from(MemoriesToml {
|
||||
max_raw_memories_for_consolidation: Some(0),
|
||||
max_rollouts_per_startup: Some(0),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
assert_eq!(
|
||||
config,
|
||||
MemoriesConfig {
|
||||
max_raw_memories_for_consolidation: 1,
|
||||
max_rollouts_per_startup: 1,
|
||||
..MemoriesConfig::default()
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -855,8 +855,7 @@
|
||||
"max_raw_memories_for_consolidation": {
|
||||
"description": "Maximum number of recent raw memories retained for global consolidation.",
|
||||
"format": "uint",
|
||||
"maximum": 4096.0,
|
||||
"minimum": 1.0,
|
||||
"minimum": 0.0,
|
||||
"type": "integer"
|
||||
},
|
||||
"max_rollout_age_days": {
|
||||
@@ -867,8 +866,7 @@
|
||||
"max_rollouts_per_startup": {
|
||||
"description": "Maximum number of rollout candidates processed per pass.",
|
||||
"format": "uint",
|
||||
"maximum": 128.0,
|
||||
"minimum": 1.0,
|
||||
"minimum": 0.0,
|
||||
"type": "integer"
|
||||
},
|
||||
"max_unused_days": {
|
||||
|
||||
@@ -737,7 +737,6 @@ mod tests {
|
||||
chatgpt_plan_type: None,
|
||||
chatgpt_user_id: user_id.map(ToOwned::to_owned),
|
||||
chatgpt_account_id: Some(account_id.to_string()),
|
||||
chatgpt_account_is_fedramp: false,
|
||||
raw_jwt: fake_id_token(account_id, user_id),
|
||||
},
|
||||
access_token: format!("access-token-{account_id}"),
|
||||
|
||||
@@ -115,7 +115,6 @@ async fn build_uploaded_local_argument_value(
|
||||
let upload_auth = CoreAuthProvider {
|
||||
token: Some(token_data.access_token),
|
||||
account_id: token_data.account_id,
|
||||
is_fedramp_account: auth.is_fedramp_account(),
|
||||
};
|
||||
let uploaded = upload_local_file(
|
||||
turn_context.config.chatgpt_base_url.trim_end_matches('/'),
|
||||
|
||||
@@ -477,6 +477,8 @@ async fn execute_mcp_tool_call(
|
||||
metadata.and_then(|metadata| metadata.openai_file_input_params.as_deref()),
|
||||
)
|
||||
.await?;
|
||||
let request_meta =
|
||||
with_mcp_tool_call_thread_id_meta(request_meta, &sess.conversation_id.to_string());
|
||||
let request_meta =
|
||||
augment_mcp_tool_request_meta_with_sandbox_state(sess, turn_context, server, request_meta)
|
||||
.await
|
||||
@@ -660,6 +662,7 @@ pub(crate) struct McpToolApprovalMetadata {
|
||||
const MCP_TOOL_CODEX_APPS_META_KEY: &str = "_codex_apps";
|
||||
const MCP_TOOL_OPENAI_OUTPUT_TEMPLATE_META_KEY: &str = "openai/outputTemplate";
|
||||
const MCP_TOOL_UI_RESOURCE_URI_META_KEY: &str = "ui/resourceUri";
|
||||
const MCP_TOOL_THREAD_ID_META_KEY: &str = "threadId";
|
||||
|
||||
fn custom_mcp_tool_approval_mode(
|
||||
turn_context: &TurnContext,
|
||||
@@ -709,6 +712,30 @@ fn build_mcp_tool_call_request_meta(
|
||||
(!request_meta.is_empty()).then_some(serde_json::Value::Object(request_meta))
|
||||
}
|
||||
|
||||
fn with_mcp_tool_call_thread_id_meta(
|
||||
meta: Option<serde_json::Value>,
|
||||
thread_id: &str,
|
||||
) -> Option<serde_json::Value> {
|
||||
match meta {
|
||||
Some(serde_json::Value::Object(mut map)) => {
|
||||
map.insert(
|
||||
MCP_TOOL_THREAD_ID_META_KEY.to_string(),
|
||||
serde_json::Value::String(thread_id.to_string()),
|
||||
);
|
||||
Some(serde_json::Value::Object(map))
|
||||
}
|
||||
None => {
|
||||
let mut map = serde_json::Map::new();
|
||||
map.insert(
|
||||
MCP_TOOL_THREAD_ID_META_KEY.to_string(),
|
||||
serde_json::Value::String(thread_id.to_string()),
|
||||
);
|
||||
Some(serde_json::Value::Object(map))
|
||||
}
|
||||
other => other,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
struct McpToolApprovalPromptOptions {
|
||||
allow_session_remember: bool,
|
||||
|
||||
@@ -650,6 +650,35 @@ async fn codex_apps_tool_call_request_meta_includes_turn_metadata_and_codex_apps
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mcp_tool_call_thread_id_meta_is_added_to_request_meta() {
|
||||
assert_eq!(
|
||||
with_mcp_tool_call_thread_id_meta(
|
||||
Some(serde_json::json!({
|
||||
"source": "test-client",
|
||||
"threadId": "stale-thread",
|
||||
})),
|
||||
"thread-live",
|
||||
),
|
||||
Some(serde_json::json!({
|
||||
"source": "test-client",
|
||||
"threadId": "thread-live",
|
||||
}))
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
with_mcp_tool_call_thread_id_meta(None, "thread-live"),
|
||||
Some(serde_json::json!({
|
||||
"threadId": "thread-live",
|
||||
}))
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
with_mcp_tool_call_thread_id_meta(Some(serde_json::json!("invalid-meta")), "thread-live"),
|
||||
Some(serde_json::json!("invalid-meta"))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn accepted_elicitation_content_converts_to_request_user_input_response() {
|
||||
let response = request_user_input_response_from_elicitation_content(Some(serde_json::json!(
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
//! Verifies that parent and spawned subagent Responses API requests carry the expected window,
|
||||
//! parent-thread, and subagent identity headers.
|
||||
//! Exercises a real `responses-api-proxy` process with request dumping enabled, then verifies that
|
||||
//! parent and spawned subagent requests carry the expected window, parent-thread, and subagent
|
||||
//! identity headers in the dumped Responses API requests.
|
||||
|
||||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
@@ -9,8 +10,6 @@ use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use codex_protocol::user_input::UserInput;
|
||||
use core_test_support::responses::ResponseMock;
|
||||
use core_test_support::responses::ResponsesRequest;
|
||||
use core_test_support::responses::ev_assistant_message;
|
||||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
@@ -22,28 +21,119 @@ use core_test_support::skip_if_no_network;
|
||||
use core_test_support::test_codex::TestCodex;
|
||||
use core_test_support::test_codex::test_codex;
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::Value;
|
||||
use serde_json::json;
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Write;
|
||||
use std::path::Path;
|
||||
use std::process::Child;
|
||||
use std::process::Command as StdCommand;
|
||||
use std::process::Stdio;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
use tempfile::TempDir;
|
||||
|
||||
const PARENT_PROMPT: &str = "spawn a subagent and report when it is started";
|
||||
const CHILD_PROMPT: &str = "child: say done";
|
||||
const SPAWN_CALL_ID: &str = "spawn-call-1";
|
||||
const REQUEST_POLL_INTERVAL: Duration = Duration::from_millis(/*millis*/ 20);
|
||||
const PROXY_START_TIMEOUT: Duration = Duration::from_secs(/*secs*/ 30);
|
||||
const PROXY_POLL_INTERVAL: Duration = Duration::from_millis(/*millis*/ 20);
|
||||
const TURN_TIMEOUT: Duration = Duration::from_secs(/*secs*/ 60);
|
||||
|
||||
struct ResponsesApiProxy {
|
||||
child: Child,
|
||||
port: u16,
|
||||
}
|
||||
|
||||
impl ResponsesApiProxy {
|
||||
fn start(upstream_url: &str, dump_dir: &Path) -> Result<Self> {
|
||||
let server_info = dump_dir.join("server-info.json");
|
||||
let (proxy_program, use_codex_multitool) =
|
||||
match codex_utils_cargo_bin::cargo_bin("codex-responses-api-proxy") {
|
||||
Ok(path) => (path, false),
|
||||
Err(_) => (codex_utils_cargo_bin::cargo_bin("codex")?, true),
|
||||
};
|
||||
let mut command = StdCommand::new(proxy_program);
|
||||
if use_codex_multitool {
|
||||
command.arg("responses-api-proxy");
|
||||
}
|
||||
let mut child = command
|
||||
.args(["--server-info"])
|
||||
.arg(&server_info)
|
||||
.args(["--upstream-url", upstream_url, "--dump-dir"])
|
||||
.arg(dump_dir)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::null())
|
||||
.spawn()?;
|
||||
|
||||
child
|
||||
.stdin
|
||||
.take()
|
||||
.ok_or_else(|| anyhow!("responses-api-proxy stdin was not piped"))?
|
||||
.write_all(b"dummy\n")?;
|
||||
|
||||
let deadline = Instant::now() + PROXY_START_TIMEOUT;
|
||||
loop {
|
||||
match std::fs::read_to_string(&server_info) {
|
||||
Ok(info) => {
|
||||
if !info.trim().is_empty() {
|
||||
match serde_json::from_str::<Value>(&info) {
|
||||
Ok(info) => {
|
||||
let port = info
|
||||
.get("port")
|
||||
.and_then(Value::as_u64)
|
||||
.ok_or_else(|| anyhow!("proxy server info missing port"))?;
|
||||
return Ok(Self {
|
||||
child,
|
||||
port: u16::try_from(port)?,
|
||||
});
|
||||
}
|
||||
Err(err) if err.is_eof() => {}
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => {}
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
if let Some(status) = child.try_wait()? {
|
||||
return Err(anyhow!(
|
||||
"responses-api-proxy exited before writing server info: {status}"
|
||||
));
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
return Err(anyhow!("timed out waiting for responses-api-proxy"));
|
||||
}
|
||||
std::thread::sleep(PROXY_POLL_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
fn base_url(&self) -> String {
|
||||
format!("http://127.0.0.1:{}/v1", self.port)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ResponsesApiProxy {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.child.kill();
|
||||
let _ = self.child.wait();
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_api_parent_and_subagent_requests_include_identity_headers() -> Result<()> {
|
||||
async fn responses_api_proxy_dumps_parent_and_subagent_identity_headers() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let dump_dir = TempDir::new()?;
|
||||
let proxy =
|
||||
ResponsesApiProxy::start(&format!("{}/v1/responses", server.uri()), dump_dir.path())?;
|
||||
|
||||
let spawn_args = serde_json::to_string(&json!({ "message": CHILD_PROMPT }))?;
|
||||
let parent_mock = mount_sse_once_match(
|
||||
mount_sse_once_match(
|
||||
&server,
|
||||
|req: &wiremock::Request| {
|
||||
request_body_contains(req, PARENT_PROMPT)
|
||||
&& request_header(req, "x-openai-subagent").is_none()
|
||||
},
|
||||
|req: &wiremock::Request| request_body_contains(req, PARENT_PROMPT),
|
||||
sse(vec![
|
||||
ev_response_created("resp-parent-1"),
|
||||
ev_function_call(SPAWN_CALL_ID, "spawn_agent", &spawn_args),
|
||||
@@ -51,12 +141,10 @@ async fn responses_api_parent_and_subagent_requests_include_identity_headers() -
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let child_mock = mount_sse_once_match(
|
||||
mount_sse_once_match(
|
||||
&server,
|
||||
|req: &wiremock::Request| {
|
||||
request_body_contains(req, CHILD_PROMPT)
|
||||
&& !request_body_contains(req, SPAWN_CALL_ID)
|
||||
&& request_header(req, "x-openai-subagent") == Some("collab_spawn")
|
||||
request_body_contains(req, CHILD_PROMPT) && !request_body_contains(req, SPAWN_CALL_ID)
|
||||
},
|
||||
sse(vec![
|
||||
ev_response_created("resp-child-1"),
|
||||
@@ -67,10 +155,7 @@ async fn responses_api_parent_and_subagent_requests_include_identity_headers() -
|
||||
.await;
|
||||
mount_sse_once_match(
|
||||
&server,
|
||||
|req: &wiremock::Request| {
|
||||
request_body_contains(req, SPAWN_CALL_ID)
|
||||
&& request_header(req, "x-openai-subagent").is_none()
|
||||
},
|
||||
|req: &wiremock::Request| request_body_contains(req, SPAWN_CALL_ID),
|
||||
sse(vec![
|
||||
ev_response_created("resp-parent-2"),
|
||||
ev_assistant_message("msg-parent-2", "parent done"),
|
||||
@@ -79,52 +164,50 @@ async fn responses_api_parent_and_subagent_requests_include_identity_headers() -
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
let proxy_base_url = proxy.base_url();
|
||||
let mut builder = test_codex().with_config(move |config| {
|
||||
config.model_provider.base_url = Some(proxy_base_url);
|
||||
config
|
||||
.features
|
||||
.disable(Feature::EnableRequestCompression)
|
||||
.expect("test config should allow feature update");
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
submit_turn_with_timeout(&test, PARENT_PROMPT).await?;
|
||||
submit_turn_with_timeout(&test, PARENT_PROMPT, dump_dir.path()).await?;
|
||||
|
||||
let parent = wait_for_matching_request(&parent_mock, "parent request", |request| {
|
||||
request.body_contains_text(PARENT_PROMPT) && request.header("x-openai-subagent").is_none()
|
||||
})
|
||||
.await?;
|
||||
let child = wait_for_matching_request(&child_mock, "child request", |request| {
|
||||
request.body_contains_text(CHILD_PROMPT)
|
||||
&& !request.body_contains_text(SPAWN_CALL_ID)
|
||||
&& request.header("x-openai-subagent").as_deref() == Some("collab_spawn")
|
||||
})
|
||||
.await?;
|
||||
let dumps = wait_for_proxy_request_dumps(dump_dir.path())?;
|
||||
let parent = dumps
|
||||
.iter()
|
||||
.find(|dump| dump_body_contains(dump, PARENT_PROMPT))
|
||||
.ok_or_else(|| anyhow!("missing parent request dump"))?;
|
||||
let child = dumps
|
||||
.iter()
|
||||
.find(|dump| {
|
||||
dump_body_contains(dump, CHILD_PROMPT) && !dump_body_contains(dump, SPAWN_CALL_ID)
|
||||
})
|
||||
.ok_or_else(|| anyhow!("missing child request dump"))?;
|
||||
|
||||
let parent_window_id = parent
|
||||
.header("x-codex-window-id")
|
||||
let parent_window_id = header(parent, "x-codex-window-id")
|
||||
.ok_or_else(|| anyhow!("parent request missing x-codex-window-id"))?;
|
||||
let child_window_id = child
|
||||
.header("x-codex-window-id")
|
||||
let child_window_id = header(child, "x-codex-window-id")
|
||||
.ok_or_else(|| anyhow!("child request missing x-codex-window-id"))?;
|
||||
let (parent_thread_id, parent_generation) = split_window_id(&parent_window_id)?;
|
||||
let (child_thread_id, child_generation) = split_window_id(&child_window_id)?;
|
||||
let (parent_thread_id, parent_generation) = split_window_id(parent_window_id)?;
|
||||
let (child_thread_id, child_generation) = split_window_id(child_window_id)?;
|
||||
|
||||
assert_eq!(parent_generation, 0);
|
||||
assert_eq!(child_generation, 0);
|
||||
assert!(child_thread_id != parent_thread_id);
|
||||
assert_eq!(parent.header("x-openai-subagent"), None);
|
||||
assert_eq!(header(parent, "x-openai-subagent"), None);
|
||||
assert_eq!(header(child, "x-openai-subagent"), Some("collab_spawn"));
|
||||
assert_eq!(
|
||||
child.header("x-openai-subagent").as_deref(),
|
||||
Some("collab_spawn")
|
||||
);
|
||||
assert_eq!(
|
||||
child.header("x-codex-parent-thread-id").as_deref(),
|
||||
header(child, "x-codex-parent-thread-id"),
|
||||
Some(parent_thread_id)
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str) -> Result<()> {
|
||||
async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str, dump_dir: &Path) -> Result<()> {
|
||||
let session_model = test.session_configured.model.clone();
|
||||
test.codex
|
||||
.submit(Op::UserTurn {
|
||||
@@ -152,14 +235,14 @@ async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str) -> Result<()>
|
||||
})
|
||||
.await?;
|
||||
|
||||
let turn_started = wait_for_event_result(test, "turn started", |event| {
|
||||
let turn_started = wait_for_event_result(test, "turn started", dump_dir, |event| {
|
||||
matches!(event, EventMsg::TurnStarted(_))
|
||||
})
|
||||
.await?;
|
||||
let EventMsg::TurnStarted(turn_started) = turn_started else {
|
||||
unreachable!("event predicate only matches turn started events");
|
||||
};
|
||||
wait_for_event_result(test, "turn complete", |event| match event {
|
||||
wait_for_event_result(test, "turn complete", dump_dir, |event| match event {
|
||||
EventMsg::TurnComplete(event) => event.turn_id == turn_started.turn_id,
|
||||
_ => false,
|
||||
})
|
||||
@@ -168,33 +251,10 @@ async fn submit_turn_with_timeout(test: &TestCodex, prompt: &str) -> Result<()>
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_matching_request<F>(
|
||||
mock: &ResponseMock,
|
||||
label: &str,
|
||||
mut predicate: F,
|
||||
) -> Result<ResponsesRequest>
|
||||
where
|
||||
F: FnMut(&ResponsesRequest) -> bool,
|
||||
{
|
||||
tokio::time::timeout(TURN_TIMEOUT, async {
|
||||
loop {
|
||||
if let Some(request) = mock
|
||||
.requests()
|
||||
.into_iter()
|
||||
.find(|request| predicate(request))
|
||||
{
|
||||
return request;
|
||||
}
|
||||
tokio::time::sleep(REQUEST_POLL_INTERVAL).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|_| anyhow!("timed out waiting for {label}"))
|
||||
}
|
||||
|
||||
async fn wait_for_event_result<F>(
|
||||
test: &TestCodex,
|
||||
stage: &str,
|
||||
dump_dir: &Path,
|
||||
mut predicate: F,
|
||||
) -> Result<EventMsg>
|
||||
where
|
||||
@@ -213,8 +273,9 @@ where
|
||||
.await
|
||||
.map_err(|_| {
|
||||
anyhow!(
|
||||
"timed out waiting for {stage}; saw events: {}",
|
||||
seen_events.join(" | ")
|
||||
"timed out waiting for {stage}; saw events: {}; {}",
|
||||
seen_events.join(" | "),
|
||||
proxy_dump_summary(dump_dir)
|
||||
)
|
||||
})?
|
||||
}
|
||||
@@ -229,8 +290,95 @@ fn request_body_contains(req: &wiremock::Request, text: &str) -> bool {
|
||||
std::str::from_utf8(&req.body).is_ok_and(|body| body.contains(text))
|
||||
}
|
||||
|
||||
fn request_header<'a>(req: &'a wiremock::Request, name: &str) -> Option<&'a str> {
|
||||
req.headers.get(name).and_then(|value| value.to_str().ok())
|
||||
fn wait_for_proxy_request_dumps(dump_dir: &Path) -> Result<Vec<Value>> {
|
||||
let deadline = Instant::now() + Duration::from_secs(/*secs*/ 2);
|
||||
loop {
|
||||
let dumps = read_proxy_request_dumps(dump_dir).unwrap_or_default();
|
||||
if dumps.len() >= 3
|
||||
&& dumps
|
||||
.iter()
|
||||
.any(|dump| dump_body_contains(dump, CHILD_PROMPT))
|
||||
{
|
||||
return Ok(dumps);
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
return Err(anyhow!(
|
||||
"timed out waiting for proxy request dumps, got {}",
|
||||
dumps.len()
|
||||
));
|
||||
}
|
||||
std::thread::sleep(PROXY_POLL_INTERVAL);
|
||||
}
|
||||
}
|
||||
|
||||
fn read_proxy_request_dumps(dump_dir: &Path) -> Result<Vec<Value>> {
|
||||
let mut dumps = Vec::new();
|
||||
for entry in std::fs::read_dir(dump_dir)? {
|
||||
let path = entry?.path();
|
||||
if path
|
||||
.file_name()
|
||||
.and_then(|name| name.to_str())
|
||||
.is_some_and(|name| name.ends_with("-request.json"))
|
||||
{
|
||||
let contents = std::fs::read_to_string(&path)?;
|
||||
if contents.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
match serde_json::from_str(&contents) {
|
||||
Ok(dump) => dumps.push(dump),
|
||||
Err(err) if err.is_eof() => continue,
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(dumps)
|
||||
}
|
||||
|
||||
fn proxy_dump_summary(dump_dir: &Path) -> String {
|
||||
match read_proxy_request_dumps(dump_dir) {
|
||||
Ok(dumps) => {
|
||||
let bodies = dumps
|
||||
.iter()
|
||||
.filter_map(|dump| dump.get("body"))
|
||||
.map(Value::to_string)
|
||||
.collect::<Vec<_>>()
|
||||
.join("; ");
|
||||
format!("proxy wrote {} request dumps: {bodies}", dumps.len())
|
||||
}
|
||||
Err(err) => format!("failed to read proxy request dumps: {err}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn read_proxy_request_dumps_ignores_in_progress_files() -> Result<()> {
|
||||
let dump_dir = TempDir::new()?;
|
||||
std::fs::write(dump_dir.path().join("empty-request.json"), "")?;
|
||||
std::fs::write(dump_dir.path().join("partial-request.json"), "{\"body\"")?;
|
||||
std::fs::write(
|
||||
dump_dir.path().join("complete-request.json"),
|
||||
serde_json::to_string(&json!({ "body": "ready" }))?,
|
||||
)?;
|
||||
|
||||
assert_eq!(
|
||||
read_proxy_request_dumps(dump_dir.path())?,
|
||||
vec![json!({ "body": "ready" })]
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn dump_body_contains(dump: &Value, text: &str) -> bool {
|
||||
dump.get("body")
|
||||
.is_some_and(|body| body.to_string().contains(text))
|
||||
}
|
||||
|
||||
fn header<'a>(dump: &'a Value, name: &str) -> Option<&'a str> {
|
||||
dump.get("headers")?.as_array()?.iter().find_map(|header| {
|
||||
(header.get("name")?.as_str()?.eq_ignore_ascii_case(name))
|
||||
.then(|| header.get("value")?.as_str())
|
||||
.flatten()
|
||||
})
|
||||
}
|
||||
|
||||
fn split_window_id(window_id: &str) -> Result<(&str, u64)> {
|
||||
|
||||
@@ -11,7 +11,6 @@ pub fn auth_provider_from_auth(
|
||||
return Ok(CoreAuthProvider {
|
||||
token: Some(api_key),
|
||||
account_id: None,
|
||||
is_fedramp_account: false,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -19,7 +18,6 @@ pub fn auth_provider_from_auth(
|
||||
return Ok(CoreAuthProvider {
|
||||
token: Some(token),
|
||||
account_id: None,
|
||||
is_fedramp_account: false,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -28,13 +26,11 @@ pub fn auth_provider_from_auth(
|
||||
Ok(CoreAuthProvider {
|
||||
token: Some(token),
|
||||
account_id: auth.get_account_id(),
|
||||
is_fedramp_account: auth.is_fedramp_account(),
|
||||
})
|
||||
} else {
|
||||
Ok(CoreAuthProvider {
|
||||
token: None,
|
||||
account_id: None,
|
||||
is_fedramp_account: false,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -130,7 +130,6 @@ async fn pro_account_with_no_api_key_uses_chatgpt_auth() {
|
||||
chatgpt_plan_type: Some(InternalPlanType::Known(InternalKnownPlan::Pro)),
|
||||
chatgpt_user_id: Some("user-12345".to_string()),
|
||||
chatgpt_account_id: None,
|
||||
chatgpt_account_is_fedramp: false,
|
||||
raw_jwt: fake_jwt,
|
||||
},
|
||||
access_token: "test-access-token".to_string(),
|
||||
|
||||
@@ -293,12 +293,6 @@ impl CodexAuth {
|
||||
self.get_current_token_data().and_then(|t| t.account_id)
|
||||
}
|
||||
|
||||
/// Returns false if `is_chatgpt_auth()` is false or the token omits the FedRAMP claim.
|
||||
pub fn is_fedramp_account(&self) -> bool {
|
||||
self.get_current_token_data()
|
||||
.is_some_and(|t| t.id_token.is_fedramp_account())
|
||||
}
|
||||
|
||||
/// Returns `None` if `is_chatgpt_auth()` is false.
|
||||
pub fn get_account_email(&self) -> Option<String> {
|
||||
self.get_current_token_data().and_then(|t| t.id_token.email)
|
||||
|
||||
@@ -36,8 +36,6 @@ pub struct IdTokenInfo {
|
||||
pub chatgpt_user_id: Option<String>,
|
||||
/// Organization/workspace identifier associated with the token, if present.
|
||||
pub chatgpt_account_id: Option<String>,
|
||||
/// Whether the selected ChatGPT workspace must route through the FedRAMP edge.
|
||||
pub chatgpt_account_is_fedramp: bool,
|
||||
pub raw_jwt: String,
|
||||
}
|
||||
|
||||
@@ -62,10 +60,6 @@ impl IdTokenInfo {
|
||||
Some(PlanType::Known(plan)) if plan.is_workspace_account()
|
||||
)
|
||||
}
|
||||
|
||||
pub fn is_fedramp_account(&self) -> bool {
|
||||
self.chatgpt_account_is_fedramp
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -94,8 +88,6 @@ struct AuthClaims {
|
||||
user_id: Option<String>,
|
||||
#[serde(default)]
|
||||
chatgpt_account_id: Option<String>,
|
||||
#[serde(default)]
|
||||
chatgpt_account_is_fedramp: bool,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
@@ -147,7 +139,6 @@ pub fn parse_chatgpt_jwt_claims(jwt: &str) -> Result<IdTokenInfo, IdTokenInfoErr
|
||||
chatgpt_plan_type: auth.chatgpt_plan_type,
|
||||
chatgpt_user_id: auth.chatgpt_user_id.or(auth.user_id),
|
||||
chatgpt_account_id: auth.chatgpt_account_id,
|
||||
chatgpt_account_is_fedramp: auth.chatgpt_account_is_fedramp,
|
||||
}),
|
||||
None => Ok(IdTokenInfo {
|
||||
email,
|
||||
@@ -155,7 +146,6 @@ pub fn parse_chatgpt_jwt_claims(jwt: &str) -> Result<IdTokenInfo, IdTokenInfoErr
|
||||
chatgpt_plan_type: None,
|
||||
chatgpt_user_id: None,
|
||||
chatgpt_account_id: None,
|
||||
chatgpt_account_is_fedramp: false,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -114,22 +114,6 @@ fn id_token_info_handles_missing_fields() {
|
||||
let info = parse_chatgpt_jwt_claims(&fake_jwt).expect("should parse");
|
||||
assert!(info.email.is_none());
|
||||
assert!(info.get_chatgpt_plan_type().is_none());
|
||||
assert_eq!(info.is_fedramp_account(), false);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn id_token_info_parses_fedramp_account_claim() {
|
||||
let fake_jwt = fake_jwt(serde_json::json!({
|
||||
"email": "user@example.com",
|
||||
"https://api.openai.com/auth": {
|
||||
"chatgpt_account_id": "account-fed",
|
||||
"chatgpt_account_is_fedramp": true,
|
||||
}
|
||||
}));
|
||||
|
||||
let info = parse_chatgpt_jwt_claims(&fake_jwt).expect("should parse");
|
||||
assert_eq!(info.chatgpt_account_id.as_deref(), Some("account-fed"));
|
||||
assert_eq!(info.is_fedramp_account(), true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -231,12 +231,6 @@ mod tests {
|
||||
Header::from_bytes(&b"Cookie"[..], &b"user-session=secret"[..]).expect("cookie header"),
|
||||
Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..])
|
||||
.expect("content-type header"),
|
||||
Header::from_bytes(&b"x-codex-window-id"[..], &b"thread-1:0"[..])
|
||||
.expect("window id header"),
|
||||
Header::from_bytes(&b"x-codex-parent-thread-id"[..], &b"parent-thread-1"[..])
|
||||
.expect("parent thread id header"),
|
||||
Header::from_bytes(&b"x-openai-subagent"[..], &b"collab_spawn"[..])
|
||||
.expect("subagent header"),
|
||||
];
|
||||
|
||||
let exchange_dump = dumper
|
||||
@@ -268,18 +262,6 @@ mod tests {
|
||||
{
|
||||
"name": "Content-Type",
|
||||
"value": "application/json"
|
||||
},
|
||||
{
|
||||
"name": "x-codex-window-id",
|
||||
"value": "thread-1:0"
|
||||
},
|
||||
{
|
||||
"name": "x-codex-parent-thread-id",
|
||||
"value": "parent-thread-1"
|
||||
},
|
||||
{
|
||||
"name": "x-openai-subagent",
|
||||
"value": "collab_spawn"
|
||||
}
|
||||
],
|
||||
"body": {
|
||||
|
||||
@@ -384,8 +384,9 @@ mod tests {
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::Duration;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::time::Instant;
|
||||
use tracing_subscriber::filter::Targets;
|
||||
use tracing_subscriber::fmt::writer::MakeWriter;
|
||||
use tracing_subscriber::layer::SubscriberExt;
|
||||
@@ -441,7 +442,6 @@ mod tests {
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
let writer = SharedWriter::default();
|
||||
let layer = start(runtime.clone());
|
||||
|
||||
let subscriber = tracing_subscriber::registry()
|
||||
.with(
|
||||
@@ -452,8 +452,7 @@ mod tests {
|
||||
.with_filter(Targets::new().with_default(tracing::Level::TRACE)),
|
||||
)
|
||||
.with(
|
||||
layer
|
||||
.clone()
|
||||
start(runtime.clone())
|
||||
.with_filter(Targets::new().with_default(tracing::Level::TRACE)),
|
||||
);
|
||||
let guard = subscriber.set_default();
|
||||
@@ -464,7 +463,6 @@ mod tests {
|
||||
});
|
||||
tracing::debug!("threadless-after");
|
||||
|
||||
layer.flush().await;
|
||||
drop(guard);
|
||||
|
||||
let feedback_logs = writer.snapshot();
|
||||
@@ -477,17 +475,24 @@ mod tests {
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n")
|
||||
};
|
||||
let sqlite_logs = String::from_utf8(
|
||||
runtime
|
||||
.query_feedback_logs("thread-1")
|
||||
.await
|
||||
.expect("query feedback logs"),
|
||||
)
|
||||
.expect("valid utf-8");
|
||||
assert_eq!(
|
||||
without_timestamps(&sqlite_logs),
|
||||
without_timestamps(&feedback_logs)
|
||||
);
|
||||
let deadline = Instant::now() + Duration::from_secs(2);
|
||||
loop {
|
||||
let sqlite_logs = String::from_utf8(
|
||||
runtime
|
||||
.query_feedback_logs("thread-1")
|
||||
.await
|
||||
.expect("query feedback logs"),
|
||||
)
|
||||
.expect("valid utf-8");
|
||||
if without_timestamps(&sqlite_logs) == without_timestamps(&feedback_logs) {
|
||||
break;
|
||||
}
|
||||
assert!(
|
||||
Instant::now() < deadline,
|
||||
"sqlite feedback logs did not match feedback formatter output before timeout\nsqlite:\n{sqlite_logs}\nfeedback:\n{feedback_logs}"
|
||||
);
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
}
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
@@ -144,17 +144,6 @@ ON CONFLICT(child_thread_id) DO UPDATE SET
|
||||
.await
|
||||
}
|
||||
|
||||
/// List all spawned descendants of `root_thread_id`.
|
||||
///
|
||||
/// Descendants are returned breadth-first by depth, then by thread id for stable ordering.
|
||||
pub async fn list_thread_spawn_descendants(
|
||||
&self,
|
||||
root_thread_id: ThreadId,
|
||||
) -> anyhow::Result<Vec<ThreadId>> {
|
||||
self.list_thread_spawn_descendants_matching(root_thread_id, /*status*/ None)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Find a direct spawned child of `parent_thread_id` by canonical agent path.
|
||||
pub async fn find_thread_spawn_child_by_path(
|
||||
&self,
|
||||
@@ -1666,11 +1655,5 @@ mod tests {
|
||||
.await
|
||||
.expect("open descendants from child should load");
|
||||
assert_eq!(open_descendants_from_child, vec![grandchild_thread_id]);
|
||||
|
||||
let all_descendants = runtime
|
||||
.list_thread_spawn_descendants(parent_thread_id)
|
||||
.await
|
||||
.expect("all descendants should load");
|
||||
assert_eq!(all_descendants, vec![child_thread_id, grandchild_thread_id]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2378,11 +2378,10 @@ impl App {
|
||||
) -> Result<bool> {
|
||||
match op.view() {
|
||||
AppCommandView::Interrupt => {
|
||||
if let Some(turn_id) = self.active_turn_id_for_thread(thread_id).await {
|
||||
app_server.turn_interrupt(thread_id, turn_id).await?;
|
||||
} else {
|
||||
app_server.startup_interrupt(thread_id).await?;
|
||||
}
|
||||
let Some(turn_id) = self.active_turn_id_for_thread(thread_id).await else {
|
||||
return Ok(true);
|
||||
};
|
||||
app_server.turn_interrupt(thread_id, turn_id).await?;
|
||||
Ok(true)
|
||||
}
|
||||
AppCommandView::UserTurn {
|
||||
@@ -11411,18 +11410,11 @@ guardian_approval = true
|
||||
#[tokio::test]
|
||||
async fn interrupt_without_active_turn_is_treated_as_handled() {
|
||||
let mut app = make_test_app().await;
|
||||
let thread_id = ThreadId::new();
|
||||
let mut app_server =
|
||||
crate::start_embedded_app_server_for_picker(app.chat_widget.config_ref())
|
||||
.await
|
||||
.expect("embedded app server");
|
||||
let started = app_server
|
||||
.start_thread(app.chat_widget.config_ref())
|
||||
.await
|
||||
.expect("thread/start should succeed");
|
||||
let thread_id = started.session.thread_id;
|
||||
app.enqueue_primary_thread_session(started.session, started.turns)
|
||||
.await
|
||||
.expect("primary thread should be registered");
|
||||
let op = AppCommand::interrupt();
|
||||
|
||||
let handled = app
|
||||
|
||||
@@ -476,10 +476,6 @@ impl AppServerSession {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn startup_interrupt(&mut self, thread_id: ThreadId) -> Result<()> {
|
||||
self.turn_interrupt(thread_id, String::new()).await
|
||||
}
|
||||
|
||||
pub(crate) async fn turn_steer(
|
||||
&mut self,
|
||||
thread_id: ThreadId,
|
||||
|
||||
Reference in New Issue
Block a user