mirror of
https://github.com/openai/codex.git
synced 2026-05-10 14:22:30 +00:00
Compare commits
6 Commits
etraut/for
...
dev/rasmus
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b994100b3 | ||
|
|
513dc28717 | ||
|
|
97d4b42583 | ||
|
|
1029742cf7 | ||
|
|
a718b6fd47 | ||
|
|
660153b6de |
12
.codex/skills/code-review-breaking-changes/SKILL.md
Normal file
12
.codex/skills/code-review-breaking-changes/SKILL.md
Normal file
@@ -0,0 +1,12 @@
|
||||
---
|
||||
name: code-breaking-changes
|
||||
description: Breaking changes
|
||||
---
|
||||
|
||||
Search for breaking changes in external integration surfaces:
|
||||
- app-server APIs
|
||||
- CLI parameters
|
||||
- configuration loading
|
||||
- resuming sessions from existing rollouts
|
||||
|
||||
Do not stop after finding one issue; analyze all possible ways breaking changes can happen.
|
||||
11
.codex/skills/code-review-change-size/SKILL.md
Normal file
11
.codex/skills/code-review-change-size/SKILL.md
Normal file
@@ -0,0 +1,11 @@
|
||||
---
|
||||
name: code-review-change-size
|
||||
description: Change size guidance (800 lines)
|
||||
---
|
||||
|
||||
Unless the change is mechanical the total number of changed lines should not exceed 800 lines.
|
||||
For complex logic changes the size should be under 500 lines.
|
||||
|
||||
If the change is larger, explain whether it can be split into reviewable stages and identify the smallest coherent stage to land first.
|
||||
Base the staging suggestion on the actual diff, dependencies, and affected call sites.
|
||||
|
||||
12
.codex/skills/code-review-context/SKILL.md
Normal file
12
.codex/skills/code-review-context/SKILL.md
Normal file
@@ -0,0 +1,12 @@
|
||||
---
|
||||
name: code-review-context
|
||||
description: Model visible context
|
||||
---
|
||||
|
||||
Codex maintains a context (history of messages) that is sent to the model in inference requests.
|
||||
|
||||
1. No history rewrite - the context must be built up incrementally.
|
||||
2. Avoid frequent changes to context that cause cache misses.
|
||||
3. No unbounded items - everything injected in the model context must have a bounded size and a hard cap.
|
||||
4. No items larger than 10K tokens.
|
||||
5. Highlight new individual items that can cross >1k tokens as P0. These need an additional manual review.
|
||||
14
.codex/skills/code-review-testing/SKILL.md
Normal file
14
.codex/skills/code-review-testing/SKILL.md
Normal file
@@ -0,0 +1,14 @@
|
||||
---
|
||||
name: code-review-testing
|
||||
description: Test authoring guidance
|
||||
---
|
||||
|
||||
For agent changes prefer integration tests over unit tests. Integration tests are under `core/suite` and use `test_codex` to set up a test instance of codex.
|
||||
|
||||
Features that change the agent logic MUST add an integration test:
|
||||
- Provide a list of major logic changes and user-facing behaviors that need to be tested.
|
||||
|
||||
If unit tests are needed, put them in a dedicated test file (*_tests.rs).
|
||||
Avoid test-only functions in the main implementation.
|
||||
|
||||
Check whether there are existing helpers to make tests more streamlined and readable.
|
||||
14
.codex/skills/code-review/SKILL.md
Normal file
14
.codex/skills/code-review/SKILL.md
Normal file
@@ -0,0 +1,14 @@
|
||||
---
|
||||
name: code-review
|
||||
description: Run a final code review on a pull request
|
||||
---
|
||||
|
||||
Use subagents to review code using all code-review-* skills in this repository other than this orchestrator. One subagent per skill. Pass full skill path to subagents. Use xhigh reasoning.
|
||||
|
||||
Make sure to return every single issue. You can return an unlimited number of findings.
|
||||
Use raw Markdown to report findings.
|
||||
Number findings for ease of reference.
|
||||
Each finding must include a specific file path and line number.
|
||||
|
||||
If the GitHub user running the review is the owner of the pull request add a `code-reviewed` label.
|
||||
Do not leave GitHub comments unless explicitly asked.
|
||||
19
codex-rs/Cargo.lock
generated
19
codex-rs/Cargo.lock
generated
@@ -1464,6 +1464,7 @@ dependencies = [
|
||||
"codex-git-utils",
|
||||
"codex-login",
|
||||
"codex-mcp",
|
||||
"codex-model-provider-info",
|
||||
"codex-models-manager",
|
||||
"codex-otel",
|
||||
"codex-protocol",
|
||||
@@ -1518,6 +1519,7 @@ dependencies = [
|
||||
"codex-app-server",
|
||||
"codex-app-server-protocol",
|
||||
"codex-arg0",
|
||||
"codex-config",
|
||||
"codex-core",
|
||||
"codex-exec-server",
|
||||
"codex-feedback",
|
||||
@@ -1866,6 +1868,7 @@ name = "codex-config"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"codex-app-server-protocol",
|
||||
"codex-execpolicy",
|
||||
"codex-features",
|
||||
@@ -2891,10 +2894,11 @@ name = "codex-stdio-to-uds"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"codex-uds",
|
||||
"codex-utils-cargo-bin",
|
||||
"pretty_assertions",
|
||||
"tempfile",
|
||||
"uds_windows",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3058,6 +3062,18 @@ dependencies = [
|
||||
"winsplit",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-uds"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"async-io",
|
||||
"pretty_assertions",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"uds_windows",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "codex-utils-absolute-path"
|
||||
version = "0.0.0"
|
||||
@@ -10970,6 +10986,7 @@ checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"futures-sink",
|
||||
"futures-util",
|
||||
"pin-project-lite",
|
||||
|
||||
@@ -90,6 +90,7 @@ members = [
|
||||
"terminal-detection",
|
||||
"test-binary-support",
|
||||
"thread-store",
|
||||
"uds",
|
||||
"codex-experimental-api-macros",
|
||||
"plugin",
|
||||
"model-provider",
|
||||
@@ -175,6 +176,7 @@ codex-test-binary-support = { path = "test-binary-support" }
|
||||
codex-thread-store = { path = "thread-store" }
|
||||
codex-tools = { path = "tools" }
|
||||
codex-tui = { path = "tui" }
|
||||
codex-uds = { path = "uds" }
|
||||
codex-utils-absolute-path = { path = "utils/absolute-path" }
|
||||
codex-utils-approval-presets = { path = "utils/approval-presets" }
|
||||
codex-utils-cache = { path = "utils/cache" }
|
||||
@@ -212,6 +214,7 @@ arc-swap = "1.9.0"
|
||||
assert_cmd = "2"
|
||||
assert_matches = "1.5.0"
|
||||
async-channel = "2.3.1"
|
||||
async-io = "2.6.0"
|
||||
async-stream = "0.3.6"
|
||||
async-trait = "0.1.89"
|
||||
axum = { version = "0.8", default-features = false }
|
||||
|
||||
@@ -15,6 +15,7 @@ workspace = true
|
||||
codex-app-server = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-arg0 = { workspace = true }
|
||||
codex-config = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-exec-server = { workspace = true }
|
||||
codex-feedback = { workspace = true }
|
||||
|
||||
@@ -41,6 +41,7 @@ use codex_app_server_protocol::Result as JsonRpcResult;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_config::NoopThreadConfigLoader;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
@@ -385,6 +386,7 @@ impl InProcessClientStartArgs {
|
||||
cli_overrides: self.cli_overrides,
|
||||
loader_overrides: self.loader_overrides,
|
||||
cloud_requirements: self.cloud_requirements,
|
||||
thread_config_loader: Arc::new(NoopThreadConfigLoader),
|
||||
feedback: self.feedback,
|
||||
log_db: self.log_db,
|
||||
environment_manager: self.environment_manager,
|
||||
|
||||
@@ -97,6 +97,7 @@ axum = { workspace = true, default-features = false, features = [
|
||||
"tokio",
|
||||
] }
|
||||
core_test_support = { workspace = true }
|
||||
codex-model-provider-info = { workspace = true }
|
||||
codex-utils-cargo-bin = { workspace = true }
|
||||
opentelemetry = { workspace = true }
|
||||
opentelemetry_sdk = { workspace = true }
|
||||
|
||||
@@ -147,7 +147,7 @@ Example with notification opt-out:
|
||||
- `thread/memoryMode/set` — experimental; set a thread’s persisted memory eligibility to `"enabled"` or `"disabled"` for either a loaded thread or a stored rollout; returns `{}` on success.
|
||||
- `memory/reset` — experimental; clear the current `CODEX_HOME/memories` directory and reset persisted memory stage data in sqlite while preserving existing thread memory modes; returns `{}` on success.
|
||||
- `thread/status/changed` — notification emitted when a loaded thread’s status changes (`threadId` + new `status`).
|
||||
- `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success and emits `thread/archived`.
|
||||
- `thread/archive` — move a thread’s rollout file into the archived directory and attempt to move any spawned descendant thread rollout files; returns `{}` on success and emits `thread/archived` for each archived thread.
|
||||
- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server keeps the thread loaded and unloads it only after it has had no subscribers and no thread activity for 30 minutes, then emits `thread/closed`.
|
||||
- `thread/name/set` — set or update a thread’s user-facing name for either a loaded thread or a persisted rollout; returns `{}` on success and emits `thread/name/updated` to initialized, opted-in clients. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
|
||||
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
|
||||
@@ -448,7 +448,7 @@ Experimental: use `memory/reset` to clear local memory artifacts and sqlite-back
|
||||
|
||||
### Example: Archive a thread
|
||||
|
||||
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory.
|
||||
Use `thread/archive` to move the persisted rollout (stored as a JSONL file on disk) into the archived sessions directory and attempt to move any spawned descendant thread rollouts.
|
||||
|
||||
```json
|
||||
{ "method": "thread/archive", "id": 21, "params": { "threadId": "thr_b" } }
|
||||
|
||||
@@ -510,7 +510,8 @@ pub(crate) async fn apply_bespoke_event_handling(
|
||||
))
|
||||
.await;
|
||||
}
|
||||
RealtimeEvent::ConversationItemDone { .. } => {}
|
||||
RealtimeEvent::ConversationItemDone { .. }
|
||||
| RealtimeEvent::NoopRequested(_) => {}
|
||||
RealtimeEvent::HandoffRequested(handoff) => {
|
||||
let notification = ThreadRealtimeItemAddedNotification {
|
||||
thread_id: conversation_id.to_string(),
|
||||
|
||||
@@ -213,6 +213,7 @@ use codex_backend_client::AddCreditsNudgeCreditType as BackendAddCreditsNudgeCre
|
||||
use codex_backend_client::Client as BackendClient;
|
||||
use codex_chatgpt::connectors;
|
||||
use codex_cloud_requirements::cloud_requirements_loader;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_config::types::McpServerTransportConfig;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::ForkSnapshot;
|
||||
@@ -479,6 +480,7 @@ pub(crate) struct CodexMessageProcessor {
|
||||
cli_overrides: Arc<RwLock<Vec<(String, TomlValue)>>>,
|
||||
runtime_feature_enablement: Arc<RwLock<BTreeMap<String, bool>>>,
|
||||
cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
thread_config_loader: Arc<dyn ThreadConfigLoader>,
|
||||
active_login: Arc<Mutex<Option<ActiveLogin>>>,
|
||||
pending_thread_unloads: Arc<Mutex<HashSet<ThreadId>>>,
|
||||
thread_state_manager: ThreadStateManager,
|
||||
@@ -639,6 +641,7 @@ pub(crate) struct CodexMessageProcessorArgs {
|
||||
pub(crate) cli_overrides: Arc<RwLock<Vec<(String, TomlValue)>>>,
|
||||
pub(crate) runtime_feature_enablement: Arc<RwLock<BTreeMap<String, bool>>>,
|
||||
pub(crate) cloud_requirements: Arc<RwLock<CloudRequirementsLoader>>,
|
||||
pub(crate) thread_config_loader: Arc<dyn ThreadConfigLoader>,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
}
|
||||
@@ -726,6 +729,7 @@ impl CodexMessageProcessor {
|
||||
cli_overrides,
|
||||
runtime_feature_enablement,
|
||||
cloud_requirements,
|
||||
thread_config_loader,
|
||||
feedback,
|
||||
log_db,
|
||||
} = args;
|
||||
@@ -740,6 +744,7 @@ impl CodexMessageProcessor {
|
||||
cli_overrides,
|
||||
runtime_feature_enablement,
|
||||
cloud_requirements,
|
||||
thread_config_loader,
|
||||
active_login: Arc::new(Mutex::new(None)),
|
||||
pending_thread_unloads: Arc::new(Mutex::new(HashSet::new())),
|
||||
thread_state_manager: ThreadStateManager::new(),
|
||||
@@ -2433,12 +2438,14 @@ impl CodexMessageProcessor {
|
||||
};
|
||||
let request_trace = request_context.request_trace();
|
||||
let runtime_feature_enablement = self.current_runtime_feature_enablement();
|
||||
let thread_config_loader = Arc::clone(&self.thread_config_loader);
|
||||
let thread_start_task = async move {
|
||||
Self::thread_start_task(
|
||||
listener_task_context,
|
||||
cli_overrides,
|
||||
runtime_feature_enablement,
|
||||
cloud_requirements,
|
||||
thread_config_loader,
|
||||
request_id,
|
||||
app_server_client_name,
|
||||
app_server_client_version,
|
||||
@@ -2515,6 +2522,7 @@ impl CodexMessageProcessor {
|
||||
cli_overrides: Vec<(String, TomlValue)>,
|
||||
runtime_feature_enablement: BTreeMap<String, bool>,
|
||||
cloud_requirements: CloudRequirementsLoader,
|
||||
thread_config_loader: Arc<dyn ThreadConfigLoader>,
|
||||
request_id: ConnectionRequestId,
|
||||
app_server_client_name: Option<String>,
|
||||
app_server_client_version: Option<String>,
|
||||
@@ -2532,6 +2540,7 @@ impl CodexMessageProcessor {
|
||||
&cli_overrides,
|
||||
config_overrides.clone(),
|
||||
typesafe_overrides.clone(),
|
||||
Arc::clone(&thread_config_loader),
|
||||
&cloud_requirements,
|
||||
&listener_task_context.codex_home,
|
||||
&runtime_feature_enablement,
|
||||
@@ -2613,6 +2622,7 @@ impl CodexMessageProcessor {
|
||||
cli_overrides_for_reload,
|
||||
config_overrides,
|
||||
typesafe_overrides,
|
||||
thread_config_loader,
|
||||
&cloud_requirements,
|
||||
&listener_task_context.codex_home,
|
||||
&runtime_feature_enablement,
|
||||
@@ -2861,8 +2871,36 @@ impl CodexMessageProcessor {
|
||||
}
|
||||
};
|
||||
|
||||
let thread_id_str = thread_id.to_string();
|
||||
if let Err(err) = self
|
||||
let mut thread_ids = vec![thread_id];
|
||||
if let Some(state_db_ctx) = get_state_db(&self.config).await {
|
||||
let descendants = match state_db_ctx.list_thread_spawn_descendants(thread_id).await {
|
||||
Ok(descendants) => descendants,
|
||||
Err(err) => {
|
||||
self.outgoing
|
||||
.send_error(
|
||||
request_id,
|
||||
JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to list spawned descendants for thread id {thread_id}: {err}"
|
||||
),
|
||||
data: None,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
let mut seen = HashSet::from([thread_id]);
|
||||
for descendant_id in descendants {
|
||||
if seen.insert(descendant_id) {
|
||||
thread_ids.push(descendant_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut archive_thread_ids = Vec::new();
|
||||
match self
|
||||
.thread_store
|
||||
.read_thread(StoreReadThreadParams {
|
||||
thread_id,
|
||||
@@ -2871,34 +2909,98 @@ impl CodexMessageProcessor {
|
||||
})
|
||||
.await
|
||||
{
|
||||
self.outgoing
|
||||
.send_error(request_id, thread_store_archive_error("archive", err))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
self.prepare_thread_for_archive(thread_id).await;
|
||||
|
||||
match self
|
||||
.thread_store
|
||||
.archive_thread(StoreArchiveThreadParams { thread_id })
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
let response = ThreadArchiveResponse {};
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
let notification = ThreadArchivedNotification {
|
||||
thread_id: thread_id_str,
|
||||
};
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadArchived(notification))
|
||||
.await;
|
||||
Ok(thread) => {
|
||||
if thread.archived_at.is_none() {
|
||||
archive_thread_ids.push(thread_id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
self.outgoing
|
||||
.send_error(request_id, thread_store_archive_error("archive", err))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
for descendant_thread_id in thread_ids.into_iter().skip(1) {
|
||||
match self
|
||||
.thread_store
|
||||
.read_thread(StoreReadThreadParams {
|
||||
thread_id: descendant_thread_id,
|
||||
include_archived: true,
|
||||
include_history: false,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(thread) => {
|
||||
if thread.archived_at.is_none() {
|
||||
archive_thread_ids.push(descendant_thread_id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut archived_thread_ids = Vec::new();
|
||||
let Some((parent_thread_id, descendant_thread_ids)) = archive_thread_ids.split_first()
|
||||
else {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadArchiveResponse {})
|
||||
.await;
|
||||
return;
|
||||
};
|
||||
|
||||
self.prepare_thread_for_archive(*parent_thread_id).await;
|
||||
match self
|
||||
.thread_store
|
||||
.archive_thread(StoreArchiveThreadParams {
|
||||
thread_id: *parent_thread_id,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
archived_thread_ids.push(parent_thread_id.to_string());
|
||||
}
|
||||
Err(err) => {
|
||||
self.outgoing
|
||||
.send_error(request_id, thread_store_archive_error("archive", err))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
for descendant_thread_id in descendant_thread_ids.iter().rev().copied() {
|
||||
self.prepare_thread_for_archive(descendant_thread_id).await;
|
||||
match self
|
||||
.thread_store
|
||||
.archive_thread(StoreArchiveThreadParams {
|
||||
thread_id: descendant_thread_id,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(()) => {
|
||||
archived_thread_ids.push(descendant_thread_id.to_string());
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to archive spawned descendant thread {descendant_thread_id} while archiving {thread_id}: {err}"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadArchiveResponse {})
|
||||
.await;
|
||||
for thread_id in archived_thread_ids {
|
||||
let notification = ThreadArchivedNotification { thread_id };
|
||||
self.outgoing
|
||||
.send_server_notification(ServerNotification::ThreadArchived(notification))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn thread_increment_elicitation(
|
||||
@@ -5120,62 +5222,62 @@ impl CodexMessageProcessor {
|
||||
request_id: ConnectionRequestId,
|
||||
params: GetConversationSummaryParams,
|
||||
) {
|
||||
if let GetConversationSummaryParams::ThreadId { conversation_id } = ¶ms
|
||||
&& let Some(summary) =
|
||||
read_summary_from_state_db_by_thread_id(&self.config, *conversation_id).await
|
||||
{
|
||||
let response = GetConversationSummaryResponse { summary };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
return;
|
||||
}
|
||||
|
||||
let path = match params {
|
||||
GetConversationSummaryParams::RolloutPath { rollout_path } => {
|
||||
if rollout_path.is_relative() {
|
||||
self.config.codex_home.join(&rollout_path).to_path_buf()
|
||||
} else {
|
||||
rollout_path
|
||||
}
|
||||
}
|
||||
GetConversationSummaryParams::ThreadId { conversation_id } => {
|
||||
match codex_core::find_thread_path_by_id_str(
|
||||
&self.config.codex_home,
|
||||
&conversation_id.to_string(),
|
||||
)
|
||||
let fallback_provider = self.config.model_provider_id.as_str();
|
||||
let read_result = match params {
|
||||
GetConversationSummaryParams::ThreadId { conversation_id } => self
|
||||
.thread_store
|
||||
.read_thread(StoreReadThreadParams {
|
||||
thread_id: conversation_id,
|
||||
include_archived: true,
|
||||
include_history: false,
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Some(p)) => p,
|
||||
_ => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!(
|
||||
"no rollout found for conversation id {conversation_id}"
|
||||
),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
.map_err(|err| conversation_summary_thread_id_read_error(conversation_id, err)),
|
||||
GetConversationSummaryParams::RolloutPath { rollout_path } => {
|
||||
let Some(local_thread_store) = self
|
||||
.thread_store
|
||||
.as_any()
|
||||
.downcast_ref::<LocalThreadStore>()
|
||||
else {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message:
|
||||
"rollout path queries are only supported with the local thread store"
|
||||
.to_string(),
|
||||
data: None,
|
||||
};
|
||||
return self.outgoing.send_error(request_id, error).await;
|
||||
};
|
||||
|
||||
local_thread_store
|
||||
.read_thread_by_rollout_path(
|
||||
rollout_path.clone(),
|
||||
/*include_archived*/ true,
|
||||
/*include_history*/ false,
|
||||
)
|
||||
.await
|
||||
.map_err(|err| conversation_summary_rollout_path_read_error(&rollout_path, err))
|
||||
}
|
||||
};
|
||||
|
||||
let fallback_provider = self.config.model_provider_id.as_str();
|
||||
match read_summary_from_rollout(&path, fallback_provider).await {
|
||||
Ok(summary) => {
|
||||
match read_result {
|
||||
Ok(stored_thread) => {
|
||||
let Some(summary) = summary_from_stored_thread(stored_thread, fallback_provider)
|
||||
else {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message:
|
||||
"failed to load conversation summary: thread is missing rollout path"
|
||||
.to_string(),
|
||||
data: None,
|
||||
};
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
};
|
||||
let response = GetConversationSummaryResponse { summary };
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
Err(err) => {
|
||||
let error = JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to load conversation summary from {}: {}",
|
||||
path.display(),
|
||||
err
|
||||
),
|
||||
data: None,
|
||||
};
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
}
|
||||
}
|
||||
@@ -6436,6 +6538,7 @@ impl CodexMessageProcessor {
|
||||
&cli_overrides,
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
self.thread_config_loader.as_ref(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
@@ -9360,6 +9463,7 @@ async fn derive_config_from_params(
|
||||
cli_overrides: &[(String, TomlValue)],
|
||||
request_overrides: Option<HashMap<String, serde_json::Value>>,
|
||||
typesafe_overrides: ConfigOverrides,
|
||||
thread_config_loader: Arc<dyn ThreadConfigLoader>,
|
||||
cloud_requirements: &CloudRequirementsLoader,
|
||||
codex_home: &Path,
|
||||
runtime_feature_enablement: &BTreeMap<String, bool>,
|
||||
@@ -9380,6 +9484,7 @@ async fn derive_config_from_params(
|
||||
.cli_overrides(merged_cli_overrides)
|
||||
.harness_overrides(typesafe_overrides)
|
||||
.cloud_requirements(cloud_requirements.clone())
|
||||
.thread_config_loader(thread_config_loader)
|
||||
.build()
|
||||
.await?;
|
||||
apply_runtime_feature_enablement(&mut config, runtime_feature_enablement);
|
||||
@@ -9541,6 +9646,61 @@ fn thread_store_list_error(err: ThreadStoreError) -> JSONRPCErrorError {
|
||||
}
|
||||
}
|
||||
|
||||
fn conversation_summary_thread_id_read_error(
|
||||
conversation_id: ThreadId,
|
||||
err: ThreadStoreError,
|
||||
) -> JSONRPCErrorError {
|
||||
let no_rollout_message = format!("no rollout found for thread id {conversation_id}");
|
||||
match err {
|
||||
ThreadStoreError::InvalidRequest { message } if message == no_rollout_message => {
|
||||
conversation_summary_not_found_error(conversation_id)
|
||||
}
|
||||
ThreadStoreError::ThreadNotFound { thread_id } if thread_id == conversation_id => {
|
||||
conversation_summary_not_found_error(conversation_id)
|
||||
}
|
||||
ThreadStoreError::InvalidRequest { message } => JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message,
|
||||
data: None,
|
||||
},
|
||||
err => JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!("failed to load conversation summary for {conversation_id}: {err}"),
|
||||
data: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn conversation_summary_not_found_error(conversation_id: ThreadId) -> JSONRPCErrorError {
|
||||
JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message: format!("no rollout found for conversation id {conversation_id}"),
|
||||
data: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn conversation_summary_rollout_path_read_error(
|
||||
path: &Path,
|
||||
err: ThreadStoreError,
|
||||
) -> JSONRPCErrorError {
|
||||
match err {
|
||||
ThreadStoreError::InvalidRequest { message } => JSONRPCErrorError {
|
||||
code: INVALID_REQUEST_ERROR_CODE,
|
||||
message,
|
||||
data: None,
|
||||
},
|
||||
err => JSONRPCErrorError {
|
||||
code: INTERNAL_ERROR_CODE,
|
||||
message: format!(
|
||||
"failed to load conversation summary from {}: {}",
|
||||
path.display(),
|
||||
err
|
||||
),
|
||||
data: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_store_write_error(operation: &str, err: ThreadStoreError) -> JSONRPCErrorError {
|
||||
match err {
|
||||
ThreadStoreError::ThreadNotFound { thread_id } => JSONRPCErrorError {
|
||||
@@ -10255,6 +10415,11 @@ mod tests {
|
||||
use chrono::Utc;
|
||||
use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_app_server_protocol::ToolRequestUserInputParams;
|
||||
use codex_config::SessionThreadConfig;
|
||||
use codex_config::StaticThreadConfigLoader;
|
||||
use codex_config::ThreadConfigSource;
|
||||
use codex_model_provider_info::ModelProviderInfo;
|
||||
use codex_model_provider_info::WireApi;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::openai_models::ReasoningEffort;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
@@ -10267,6 +10432,7 @@ mod tests {
|
||||
use pretty_assertions::assert_eq;
|
||||
use serde_json::json;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tempfile::TempDir;
|
||||
|
||||
#[test]
|
||||
@@ -10419,6 +10585,64 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn derive_config_from_params_uses_session_thread_config_model_provider() -> Result<()> {
|
||||
let temp_dir = TempDir::new()?;
|
||||
let session_provider = ModelProviderInfo {
|
||||
name: "session".to_string(),
|
||||
base_url: Some("http://127.0.0.1:8061/api/codex".to_string()),
|
||||
env_key: None,
|
||||
env_key_instructions: None,
|
||||
experimental_bearer_token: None,
|
||||
auth: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
websocket_connect_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: true,
|
||||
};
|
||||
let config = derive_config_from_params(
|
||||
&[],
|
||||
Some(HashMap::from([
|
||||
("model_provider".to_string(), json!("request")),
|
||||
("features.plugins".to_string(), json!(true)),
|
||||
(
|
||||
"model_providers.session".to_string(),
|
||||
json!({
|
||||
"name": "request",
|
||||
"base_url": "http://127.0.0.1:9999/api/codex",
|
||||
"wire_api": "responses",
|
||||
}),
|
||||
),
|
||||
])),
|
||||
ConfigOverrides::default(),
|
||||
Arc::new(StaticThreadConfigLoader::new(vec![
|
||||
ThreadConfigSource::Session(SessionThreadConfig {
|
||||
model_provider: Some("session".to_string()),
|
||||
model_providers: HashMap::from([(
|
||||
"session".to_string(),
|
||||
session_provider.clone(),
|
||||
)]),
|
||||
features: BTreeMap::from([("plugins".to_string(), false)]),
|
||||
}),
|
||||
])),
|
||||
&CloudRequirementsLoader::default(),
|
||||
temp_dir.path(),
|
||||
&BTreeMap::new(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(config.model_provider_id, "session");
|
||||
assert_eq!(config.model_provider, session_provider);
|
||||
assert!(!config.features.enabled(Feature::Plugins));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn collect_resume_override_mismatches_includes_service_tier() {
|
||||
let request = ThreadResumeParams {
|
||||
|
||||
@@ -75,6 +75,7 @@ use codex_app_server_protocol::Result;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
use codex_core::config_loader::LoaderOverrides;
|
||||
@@ -116,6 +117,8 @@ pub struct InProcessStartArgs {
|
||||
pub loader_overrides: LoaderOverrides,
|
||||
/// Preloaded cloud requirements provider.
|
||||
pub cloud_requirements: CloudRequirementsLoader,
|
||||
/// Loader used to fetch typed thread config sources before a thread starts.
|
||||
pub thread_config_loader: Arc<dyn ThreadConfigLoader>,
|
||||
/// Feedback sink used by app-server/core telemetry and logs.
|
||||
pub feedback: CodexFeedback,
|
||||
/// SQLite tracing layer used to flush recently emitted logs before feedback upload.
|
||||
@@ -397,6 +400,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
|
||||
cli_overrides: args.cli_overrides,
|
||||
loader_overrides: args.loader_overrides,
|
||||
cloud_requirements: args.cloud_requirements,
|
||||
thread_config_loader: args.thread_config_loader,
|
||||
feedback: args.feedback,
|
||||
log_db: args.log_db,
|
||||
config_warnings: args.config_warnings,
|
||||
@@ -731,6 +735,7 @@ mod tests {
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
|
||||
feedback: CodexFeedback::new(),
|
||||
log_db: None,
|
||||
environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)),
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_cloud_requirements::cloud_requirements_loader;
|
||||
use codex_config::NoopThreadConfigLoader;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config::ConfigBuilder;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
@@ -416,6 +417,7 @@ pub async fn run_main_with_transport(
|
||||
}
|
||||
};
|
||||
let loader_overrides_for_config_api = loader_overrides.clone();
|
||||
let thread_config_loader = Arc::new(NoopThreadConfigLoader);
|
||||
let mut config_warnings = Vec::new();
|
||||
let config = match ConfigBuilder::default()
|
||||
.cli_overrides(cli_kv_overrides.clone())
|
||||
@@ -659,6 +661,7 @@ pub async fn run_main_with_transport(
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
thread_config_loader,
|
||||
feedback: feedback.clone(),
|
||||
log_db,
|
||||
config_warnings,
|
||||
|
||||
@@ -63,6 +63,7 @@ use codex_app_server_protocol::ServerRequestPayload;
|
||||
use codex_app_server_protocol::experimental_required_message;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_chatgpt::connectors;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::config_loader::CloudRequirementsLoader;
|
||||
@@ -235,6 +236,7 @@ pub(crate) struct MessageProcessorArgs {
|
||||
pub(crate) cli_overrides: Vec<(String, TomlValue)>,
|
||||
pub(crate) loader_overrides: LoaderOverrides,
|
||||
pub(crate) cloud_requirements: CloudRequirementsLoader,
|
||||
pub(crate) thread_config_loader: Arc<dyn ThreadConfigLoader>,
|
||||
pub(crate) feedback: CodexFeedback,
|
||||
pub(crate) log_db: Option<LogDbLayer>,
|
||||
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
|
||||
@@ -256,6 +258,7 @@ impl MessageProcessor {
|
||||
cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
thread_config_loader,
|
||||
feedback,
|
||||
log_db,
|
||||
config_warnings,
|
||||
@@ -301,6 +304,7 @@ impl MessageProcessor {
|
||||
cli_overrides: cli_overrides.clone(),
|
||||
runtime_feature_enablement: runtime_feature_enablement.clone(),
|
||||
cloud_requirements: cloud_requirements.clone(),
|
||||
thread_config_loader,
|
||||
feedback,
|
||||
log_db,
|
||||
});
|
||||
|
||||
@@ -245,6 +245,7 @@ fn build_test_processor(
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides: LoaderOverrides::default(),
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
|
||||
feedback: CodexFeedback::new(),
|
||||
log_db: None,
|
||||
config_warnings: Vec::new(),
|
||||
|
||||
@@ -6,6 +6,7 @@ use app_test_support::to_response;
|
||||
use codex_app_server_protocol::ConversationSummary;
|
||||
use codex_app_server_protocol::GetConversationSummaryParams;
|
||||
use codex_app_server_protocol::GetConversationSummaryResponse;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
use codex_protocol::ThreadId;
|
||||
@@ -18,16 +19,18 @@ use tokio::time::timeout;
|
||||
const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
|
||||
const FILENAME_TS: &str = "2025-01-02T12-00-00";
|
||||
const META_RFC3339: &str = "2025-01-02T12:00:00Z";
|
||||
const CREATED_AT_RFC3339: &str = "2025-01-02T12:00:00.000Z";
|
||||
const UPDATED_AT_RFC3339: &str = "2025-01-02T12:00:00.000Z";
|
||||
const PREVIEW: &str = "Summarize this conversation";
|
||||
const MODEL_PROVIDER: &str = "openai";
|
||||
const INVALID_REQUEST_ERROR_CODE: i64 = -32600;
|
||||
|
||||
fn expected_summary(conversation_id: ThreadId, path: PathBuf) -> ConversationSummary {
|
||||
ConversationSummary {
|
||||
conversation_id,
|
||||
path,
|
||||
preview: PREVIEW.to_string(),
|
||||
timestamp: Some(META_RFC3339.to_string()),
|
||||
timestamp: Some(CREATED_AT_RFC3339.to_string()),
|
||||
updated_at: Some(UPDATED_AT_RFC3339.to_string()),
|
||||
model_provider: MODEL_PROVIDER.to_string(),
|
||||
cwd: PathBuf::from("/"),
|
||||
@@ -77,6 +80,37 @@ async fn get_conversation_summary_by_thread_id_reads_rollout() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_conversation_summary_by_rollout_path_rejects_remote_thread_store() -> Result<()> {
|
||||
let codex_home = TempDir::new()?;
|
||||
std::fs::write(
|
||||
codex_home.path().join("config.toml"),
|
||||
r#"experimental_thread_store_endpoint = "http://127.0.0.1:1"
|
||||
"#,
|
||||
)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let request_id = mcp
|
||||
.send_get_conversation_summary_request(GetConversationSummaryParams::RolloutPath {
|
||||
rollout_path: PathBuf::from("sessions/2025/01/02/rollout.jsonl"),
|
||||
})
|
||||
.await?;
|
||||
let error: JSONRPCError = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
|
||||
)
|
||||
.await??;
|
||||
|
||||
assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE);
|
||||
assert_eq!(
|
||||
error.error.message,
|
||||
"rollout path queries are only supported with the local thread store"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn get_conversation_summary_by_relative_rollout_path_resolves_from_codex_home() -> Result<()>
|
||||
{
|
||||
|
||||
@@ -175,6 +175,7 @@ async fn mcp_resource_read_returns_error_for_unknown_thread() -> Result<()> {
|
||||
cli_overrides: Vec::new(),
|
||||
loader_overrides,
|
||||
cloud_requirements: CloudRequirementsLoader::default(),
|
||||
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
|
||||
feedback: CodexFeedback::new(),
|
||||
log_db: None,
|
||||
environment_manager: Arc::new(EnvironmentManager::new(/*exec_server_url*/ None)),
|
||||
|
||||
@@ -1155,7 +1155,7 @@ async fn realtime_webrtc_start_emits_sdp_notification() -> Result<()> {
|
||||
Some("multipart/form-data; boundary=codex-realtime-call-boundary")
|
||||
);
|
||||
let body = String::from_utf8(request.body).context("multipart body should be utf-8")?;
|
||||
let session = r#"{"tool_choice":"auto","type":"realtime","model":"gpt-realtime-1.5","instructions":"backend prompt\n\nstartup context","output_modalities":["audio"],"audio":{"input":{"format":{"type":"audio/pcm","rate":24000},"noise_reduction":{"type":"near_field"},"transcription":{"model":"gpt-4o-mini-transcribe"},"turn_detection":{"type":"server_vad","interrupt_response":true,"create_response":true,"silence_duration_ms":500}},"output":{"format":{"type":"audio/pcm","rate":24000},"voice":"marin"}},"tools":[{"type":"function","name":"background_agent","description":"Send a user request to the background agent. Use this as the default action. Do not rephrase the user's ask or rewrite it in your own words; pass along the user's own words. If the background agent is idle, this starts a new task and returns the final result to the user. If the background agent is already working on a task, this sends the request as guidance to steer that previous task. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.","parameters":{"type":"object","properties":{"prompt":{"type":"string","description":"The user request to delegate to the background agent."}},"required":["prompt"],"additionalProperties":false}}]}"#;
|
||||
let session = r#"{"tool_choice":"auto","type":"realtime","model":"gpt-realtime-1.5","instructions":"backend prompt\n\nstartup context","output_modalities":["audio"],"audio":{"input":{"format":{"type":"audio/pcm","rate":24000},"noise_reduction":{"type":"near_field"},"transcription":{"model":"gpt-4o-mini-transcribe"},"turn_detection":{"type":"server_vad","interrupt_response":true,"create_response":true,"silence_duration_ms":500}},"output":{"format":{"type":"audio/pcm","rate":24000},"voice":"marin"}},"tools":[{"type":"function","name":"background_agent","description":"Send a user request to the background agent. Use this as the default action. Do not rephrase the user's ask or rewrite it in your own words; pass along the user's own words. If the background agent is idle, this starts a new task and returns the final result to the user. If the background agent is already working on a task, this sends the request as guidance to steer that previous task. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.","parameters":{"type":"object","properties":{"prompt":{"type":"string","description":"The user request to delegate to the background agent."}},"required":["prompt"],"additionalProperties":false}},{"type":"function","name":"remain_silent","description":"Call this when the best response is to say nothing. Use it instead of speaking after hidden system/control messages, after background agent updates in silent modes, or whenever acknowledging aloud would be distracting. This tool has no user-visible effect.","parameters":{"type":"object","properties":{},"additionalProperties":false}}]}"#;
|
||||
let session = normalized_json_string(session)?;
|
||||
assert_eq!(
|
||||
body,
|
||||
@@ -2260,6 +2260,10 @@ fn assert_v2_session_update(request: &Value) -> Result<()> {
|
||||
request["session"]["tools"][0]["name"].as_str(),
|
||||
Some("background_agent")
|
||||
);
|
||||
assert_eq!(
|
||||
request["session"]["tools"][1]["name"].as_str(),
|
||||
Some("remain_silent")
|
||||
);
|
||||
assert_eq!(
|
||||
request["session"]["audio"]["input"]["transcription"]["model"].as_str(),
|
||||
Some("gpt-4o-mini-transcribe")
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use anyhow::Result;
|
||||
use app_test_support::McpProcess;
|
||||
use app_test_support::create_fake_rollout;
|
||||
use app_test_support::create_mock_responses_server_repeating_assistant;
|
||||
use app_test_support::to_response;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
@@ -19,7 +20,11 @@ use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_app_server_protocol::UserInput;
|
||||
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
|
||||
use codex_core::find_archived_thread_path_by_id_str;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_protocol::ThreadId;
|
||||
use codex_state::DirectionalThreadSpawnEdgeStatus;
|
||||
use codex_state::StateRuntime;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::Path;
|
||||
use tempfile::TempDir;
|
||||
@@ -160,6 +165,311 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_archives_spawned_descendants() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let parent_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-00-00",
|
||||
"2025-01-01T00:00:00Z",
|
||||
"parent",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let child_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-01-00",
|
||||
"2025-01-01T00:01:00Z",
|
||||
"child",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let grandchild_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-02-00",
|
||||
"2025-01-01T00:02:00Z",
|
||||
"grandchild",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
|
||||
let parent_thread_id = ThreadId::from_string(&parent_id)?;
|
||||
let child_thread_id = ThreadId::from_string(&child_id)?;
|
||||
let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
child_thread_id,
|
||||
grandchild_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
let mut archived_ids = Vec::new();
|
||||
for _ in 0..3 {
|
||||
let notification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
|
||||
notification
|
||||
.params
|
||||
.expect("thread/archived notification params"),
|
||||
)?;
|
||||
archived_ids.push(archived_notification.thread_id);
|
||||
}
|
||||
assert_eq!(archived_ids, vec![parent_id, grandchild_id, child_id]);
|
||||
|
||||
for thread_id in [parent_thread_id, child_thread_id, grandchild_thread_id] {
|
||||
assert!(
|
||||
find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_none(),
|
||||
"expected active rollout for {thread_id} to be archived"
|
||||
);
|
||||
assert!(
|
||||
find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_some(),
|
||||
"expected archived rollout for {thread_id} to exist"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_succeeds_when_descendant_archive_fails() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let parent_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-00-00",
|
||||
"2025-01-01T00:00:00Z",
|
||||
"parent",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let child_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-01-00",
|
||||
"2025-01-01T00:01:00Z",
|
||||
"child",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let grandchild_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-02-00",
|
||||
"2025-01-01T00:02:00Z",
|
||||
"grandchild",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
|
||||
let parent_thread_id = ThreadId::from_string(&parent_id)?;
|
||||
let child_thread_id = ThreadId::from_string(&child_id)?;
|
||||
let grandchild_thread_id = ThreadId::from_string(&grandchild_id)?;
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
child_thread_id,
|
||||
grandchild_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Open,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let child_rollout_path = find_thread_path_by_id_str(codex_home.path(), &child_id)
|
||||
.await?
|
||||
.expect("child rollout path");
|
||||
let archived_child_path = codex_home
|
||||
.path()
|
||||
.join(ARCHIVED_SESSIONS_SUBDIR)
|
||||
.join(child_rollout_path.file_name().expect("rollout file name"));
|
||||
std::fs::create_dir_all(&archived_child_path)?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
let mut archived_ids = Vec::new();
|
||||
for _ in 0..2 {
|
||||
let notification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
|
||||
notification
|
||||
.params
|
||||
.expect("thread/archived notification params"),
|
||||
)?;
|
||||
archived_ids.push(archived_notification.thread_id);
|
||||
}
|
||||
assert_eq!(archived_ids, vec![parent_id, grandchild_id]);
|
||||
|
||||
assert!(
|
||||
timeout(
|
||||
std::time::Duration::from_millis(250),
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await
|
||||
.is_err()
|
||||
);
|
||||
|
||||
assert!(
|
||||
child_rollout_path.exists(),
|
||||
"child should stay active after descendant archive failure"
|
||||
);
|
||||
assert!(
|
||||
archived_child_path.is_dir(),
|
||||
"test conflict should remain in archived sessions"
|
||||
);
|
||||
for thread_id in [parent_thread_id, grandchild_thread_id] {
|
||||
assert!(
|
||||
find_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_none(),
|
||||
"expected active rollout for {thread_id} to be archived"
|
||||
);
|
||||
assert!(
|
||||
find_archived_thread_path_by_id_str(codex_home.path(), &thread_id.to_string())
|
||||
.await?
|
||||
.is_some(),
|
||||
"expected archived rollout for {thread_id} to exist"
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_succeeds_when_spawned_descendant_is_missing() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
let codex_home = TempDir::new()?;
|
||||
create_config_toml(codex_home.path(), &server.uri())?;
|
||||
|
||||
let parent_id = create_fake_rollout(
|
||||
codex_home.path(),
|
||||
"2025-01-01T00-00-00",
|
||||
"2025-01-01T00:00:00Z",
|
||||
"parent",
|
||||
Some("mock_provider"),
|
||||
/*git_info*/ None,
|
||||
)?;
|
||||
let parent_thread_id = ThreadId::from_string(&parent_id)?;
|
||||
let missing_child_thread_id = ThreadId::from_string("00000000-0000-0000-0000-000000000901")?;
|
||||
|
||||
let state_db =
|
||||
StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?;
|
||||
state_db
|
||||
.mark_backfill_complete(/*last_watermark*/ None)
|
||||
.await?;
|
||||
state_db
|
||||
.upsert_thread_spawn_edge(
|
||||
parent_thread_id,
|
||||
missing_child_thread_id,
|
||||
DirectionalThreadSpawnEdgeStatus::Closed,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut mcp = McpProcess::new(codex_home.path()).await?;
|
||||
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;
|
||||
|
||||
let archive_id = mcp
|
||||
.send_thread_archive_request(ThreadArchiveParams {
|
||||
thread_id: parent_id.clone(),
|
||||
})
|
||||
.await?;
|
||||
let archive_resp: JSONRPCResponse = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_response_message(RequestId::Integer(archive_id)),
|
||||
)
|
||||
.await??;
|
||||
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
|
||||
|
||||
let notification = timeout(
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
mcp.read_stream_until_notification_message("thread/archived"),
|
||||
)
|
||||
.await??;
|
||||
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
|
||||
notification
|
||||
.params
|
||||
.expect("thread/archived notification params"),
|
||||
)?;
|
||||
assert_eq!(archived_notification.thread_id, parent_id);
|
||||
|
||||
assert!(
|
||||
find_thread_path_by_id_str(codex_home.path(), &parent_id)
|
||||
.await?
|
||||
.is_none(),
|
||||
"parent should be archived even when a descendant is missing"
|
||||
);
|
||||
assert!(
|
||||
find_archived_thread_path_by_id_str(codex_home.path(), &parent_id)
|
||||
.await?
|
||||
.is_some(),
|
||||
"parent should be moved into archived sessions"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn thread_archive_clears_stale_subscriptions_before_resume() -> Result<()> {
|
||||
let server = create_mock_responses_server_repeating_assistant("Done").await;
|
||||
|
||||
@@ -1090,8 +1090,7 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
|
||||
"stdio-to-uds",
|
||||
)?;
|
||||
let socket_path = cmd.socket_path;
|
||||
tokio::task::spawn_blocking(move || codex_stdio_to_uds::run(socket_path.as_path()))
|
||||
.await??;
|
||||
codex_stdio_to_uds::run(socket_path.as_path()).await?;
|
||||
}
|
||||
Some(Subcommand::ExecServer(cmd)) => {
|
||||
reject_remote_mode_for_subcommand(
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::endpoint::realtime_websocket::methods_common::conversation_handoff_append_message;
|
||||
use crate::endpoint::realtime_websocket::methods_common::conversation_function_call_output_message;
|
||||
use crate::endpoint::realtime_websocket::methods_common::conversation_item_create_message;
|
||||
use crate::endpoint::realtime_websocket::methods_common::normalized_session_mode;
|
||||
use crate::endpoint::realtime_websocket::methods_common::session_update_session;
|
||||
@@ -230,13 +230,13 @@ impl RealtimeWebsocketConnection {
|
||||
self.writer.send_conversation_item_create(text).await
|
||||
}
|
||||
|
||||
pub async fn send_conversation_handoff_append(
|
||||
pub async fn send_conversation_function_call_output(
|
||||
&self,
|
||||
handoff_id: String,
|
||||
call_id: String,
|
||||
output_text: String,
|
||||
) -> Result<(), ApiError> {
|
||||
self.writer
|
||||
.send_conversation_handoff_append(handoff_id, output_text)
|
||||
.send_conversation_function_call_output(call_id, output_text)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -290,14 +290,14 @@ impl RealtimeWebsocketWriter {
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn send_conversation_handoff_append(
|
||||
pub async fn send_conversation_function_call_output(
|
||||
&self,
|
||||
handoff_id: String,
|
||||
call_id: String,
|
||||
output_text: String,
|
||||
) -> Result<(), ApiError> {
|
||||
self.send_json(&conversation_handoff_append_message(
|
||||
self.send_json(&conversation_function_call_output_message(
|
||||
self.event_parser,
|
||||
handoff_id,
|
||||
call_id,
|
||||
output_text,
|
||||
))
|
||||
.await
|
||||
@@ -471,6 +471,7 @@ impl RealtimeWebsocketEvents {
|
||||
| RealtimeEvent::ResponseCancelled(_)
|
||||
| RealtimeEvent::ResponseDone(_)
|
||||
| RealtimeEvent::ConversationItemDone { .. }
|
||||
| RealtimeEvent::NoopRequested(_)
|
||||
| RealtimeEvent::ConversationItemAdded(_)
|
||||
| RealtimeEvent::Error(_) => {}
|
||||
}
|
||||
@@ -825,6 +826,7 @@ mod tests {
|
||||
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry;
|
||||
use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
use codex_protocol::protocol::RealtimeInputAudioSpeechStarted;
|
||||
use codex_protocol::protocol::RealtimeNoopRequested;
|
||||
use codex_protocol::protocol::RealtimeResponseCancelled;
|
||||
use codex_protocol::protocol::RealtimeResponseCreated;
|
||||
use codex_protocol::protocol::RealtimeResponseDone;
|
||||
@@ -1090,6 +1092,29 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_realtime_v2_noop_tool_call_event() {
|
||||
let payload = json!({
|
||||
"type": "conversation.item.done",
|
||||
"item": {
|
||||
"id": "item_silent",
|
||||
"type": "function_call",
|
||||
"name": "remain_silent",
|
||||
"call_id": "call_silent",
|
||||
"arguments": "{}"
|
||||
}
|
||||
})
|
||||
.to_string();
|
||||
|
||||
assert_eq!(
|
||||
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
|
||||
Some(RealtimeEvent::NoopRequested(RealtimeNoopRequested {
|
||||
call_id: "call_silent".to_string(),
|
||||
item_id: "item_silent".to_string(),
|
||||
}))
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_realtime_v2_input_audio_transcription_delta_event() {
|
||||
let payload = json!({
|
||||
@@ -1689,7 +1714,7 @@ mod tests {
|
||||
.await
|
||||
.expect("send item");
|
||||
connection
|
||||
.send_conversation_handoff_append(
|
||||
.send_conversation_function_call_output(
|
||||
"handoff_1".to_string(),
|
||||
"hello from background agent".to_string(),
|
||||
)
|
||||
@@ -1850,6 +1875,18 @@ mod tests {
|
||||
first_json["session"]["tools"][0]["parameters"]["required"],
|
||||
json!(["prompt"])
|
||||
);
|
||||
assert_eq!(
|
||||
first_json["session"]["tools"][1]["type"],
|
||||
Value::String("function".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
first_json["session"]["tools"][1]["name"],
|
||||
Value::String("remain_silent".to_string())
|
||||
);
|
||||
assert_eq!(
|
||||
first_json["session"]["tools"][1]["parameters"]["properties"],
|
||||
json!({})
|
||||
);
|
||||
assert_eq!(
|
||||
first_json["session"]["tool_choice"],
|
||||
Value::String("auto".to_string())
|
||||
@@ -1961,7 +1998,10 @@ mod tests {
|
||||
.await
|
||||
.expect("send text item");
|
||||
connection
|
||||
.send_conversation_handoff_append("call_1".to_string(), "delegated result".to_string())
|
||||
.send_conversation_function_call_output(
|
||||
"call_1".to_string(),
|
||||
"delegated result".to_string(),
|
||||
)
|
||||
.await
|
||||
.expect("send handoff output");
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@ use crate::endpoint::realtime_websocket::methods_v1::conversation_handoff_append
|
||||
use crate::endpoint::realtime_websocket::methods_v1::conversation_item_create_message as v1_conversation_item_create_message;
|
||||
use crate::endpoint::realtime_websocket::methods_v1::session_update_session as v1_session_update_session;
|
||||
use crate::endpoint::realtime_websocket::methods_v1::websocket_intent as v1_websocket_intent;
|
||||
use crate::endpoint::realtime_websocket::methods_v2::conversation_handoff_append_message as v2_conversation_handoff_append_message;
|
||||
use crate::endpoint::realtime_websocket::methods_v2::conversation_function_call_output_message as v2_conversation_function_call_output_message;
|
||||
use crate::endpoint::realtime_websocket::methods_v2::conversation_item_create_message as v2_conversation_item_create_message;
|
||||
use crate::endpoint::realtime_websocket::methods_v2::session_update_session as v2_session_update_session;
|
||||
use crate::endpoint::realtime_websocket::methods_v2::websocket_intent as v2_websocket_intent;
|
||||
@@ -40,18 +40,18 @@ pub(super) fn conversation_item_create_message(
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn conversation_handoff_append_message(
|
||||
pub(super) fn conversation_function_call_output_message(
|
||||
event_parser: RealtimeEventParser,
|
||||
handoff_id: String,
|
||||
call_id: String,
|
||||
output_text: String,
|
||||
) -> RealtimeOutboundMessage {
|
||||
match event_parser {
|
||||
RealtimeEventParser::V1 => v1_conversation_handoff_append_message(
|
||||
handoff_id,
|
||||
call_id,
|
||||
format!("{AGENT_FINAL_MESSAGE_PREFIX}{output_text}"),
|
||||
),
|
||||
RealtimeEventParser::RealtimeV2 => {
|
||||
v2_conversation_handoff_append_message(handoff_id, output_text)
|
||||
v2_conversation_function_call_output_message(call_id, output_text)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,8 @@ const REALTIME_V2_OUTPUT_MODALITY_TEXT: &str = "text";
|
||||
const REALTIME_V2_TOOL_CHOICE: &str = "auto";
|
||||
const REALTIME_V2_BACKGROUND_AGENT_TOOL_NAME: &str = "background_agent";
|
||||
const REALTIME_V2_BACKGROUND_AGENT_TOOL_DESCRIPTION: &str = "Send a user request to the background agent. Use this as the default action. Do not rephrase the user's ask or rewrite it in your own words; pass along the user's own words. If the background agent is idle, this starts a new task and returns the final result to the user. If the background agent is already working on a task, this sends the request as guidance to steer that previous task. If the user asks to do something next, later, after this, or once current work finishes, call this tool so the work is actually queued instead of merely promising to do it later.";
|
||||
const REALTIME_V2_SILENCE_TOOL_NAME: &str = "remain_silent";
|
||||
const REALTIME_V2_SILENCE_TOOL_DESCRIPTION: &str = "Call this when the best response is to say nothing. Use it instead of speaking after hidden system/control messages, after background agent updates in silent modes, or whenever acknowledging aloud would be distracting. This tool has no user-visible effect.";
|
||||
const REALTIME_V2_INPUT_TRANSCRIPTION_MODEL: &str = "gpt-4o-mini-transcribe";
|
||||
|
||||
pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutboundMessage {
|
||||
@@ -47,14 +49,14 @@ pub(super) fn conversation_item_create_message(text: String) -> RealtimeOutbound
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn conversation_handoff_append_message(
|
||||
handoff_id: String,
|
||||
pub(super) fn conversation_function_call_output_message(
|
||||
call_id: String,
|
||||
output_text: String,
|
||||
) -> RealtimeOutboundMessage {
|
||||
RealtimeOutboundMessage::ConversationItemCreate {
|
||||
item: ConversationItemPayload::FunctionCallOutput(ConversationFunctionCallOutputItem {
|
||||
r#type: ConversationItemType::FunctionCallOutput,
|
||||
call_id: handoff_id,
|
||||
call_id,
|
||||
output: output_text,
|
||||
}),
|
||||
}
|
||||
@@ -100,22 +102,34 @@ pub(super) fn session_update_session(
|
||||
voice,
|
||||
}),
|
||||
},
|
||||
tools: Some(vec![SessionFunctionTool {
|
||||
r#type: SessionToolType::Function,
|
||||
name: REALTIME_V2_BACKGROUND_AGENT_TOOL_NAME.to_string(),
|
||||
description: REALTIME_V2_BACKGROUND_AGENT_TOOL_DESCRIPTION.to_string(),
|
||||
parameters: json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"prompt": {
|
||||
"type": "string",
|
||||
"description": "The user request to delegate to the background agent."
|
||||
}
|
||||
},
|
||||
"required": ["prompt"],
|
||||
"additionalProperties": false
|
||||
}),
|
||||
}]),
|
||||
tools: Some(vec![
|
||||
SessionFunctionTool {
|
||||
r#type: SessionToolType::Function,
|
||||
name: REALTIME_V2_BACKGROUND_AGENT_TOOL_NAME.to_string(),
|
||||
description: REALTIME_V2_BACKGROUND_AGENT_TOOL_DESCRIPTION.to_string(),
|
||||
parameters: json!({
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"prompt": {
|
||||
"type": "string",
|
||||
"description": "The user request to delegate to the background agent."
|
||||
}
|
||||
},
|
||||
"required": ["prompt"],
|
||||
"additionalProperties": false
|
||||
}),
|
||||
},
|
||||
SessionFunctionTool {
|
||||
r#type: SessionToolType::Function,
|
||||
name: REALTIME_V2_SILENCE_TOOL_NAME.to_string(),
|
||||
description: REALTIME_V2_SILENCE_TOOL_DESCRIPTION.to_string(),
|
||||
parameters: json!({
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
"additionalProperties": false
|
||||
}),
|
||||
},
|
||||
]),
|
||||
tool_choice: Some(REALTIME_V2_TOOL_CHOICE.to_string()),
|
||||
},
|
||||
RealtimeSessionMode::Transcription => SessionUpdateSession {
|
||||
|
||||
@@ -7,6 +7,7 @@ use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeHandoffRequested;
|
||||
use codex_protocol::protocol::RealtimeInputAudioSpeechStarted;
|
||||
use codex_protocol::protocol::RealtimeNoopRequested;
|
||||
use codex_protocol::protocol::RealtimeResponseCancelled;
|
||||
use codex_protocol::protocol::RealtimeResponseCreated;
|
||||
use codex_protocol::protocol::RealtimeResponseDone;
|
||||
@@ -15,6 +16,7 @@ use serde_json::Value;
|
||||
use tracing::debug;
|
||||
|
||||
const BACKGROUND_AGENT_TOOL_NAME: &str = "background_agent";
|
||||
const SILENCE_TOOL_NAME: &str = "remain_silent";
|
||||
const DEFAULT_AUDIO_SAMPLE_RATE: u32 = 24_000;
|
||||
const DEFAULT_AUDIO_CHANNELS: u16 = 1;
|
||||
const TOOL_ARGUMENT_KEYS: [&str; 5] = ["input_transcript", "input", "text", "prompt", "query"];
|
||||
@@ -127,6 +129,9 @@ fn parse_conversation_item_done_event(parsed: &Value) -> Option<RealtimeEvent> {
|
||||
if let Some(handoff) = parse_handoff_requested_event(item) {
|
||||
return Some(handoff);
|
||||
}
|
||||
if let Some(noop) = parse_noop_requested_event(item) {
|
||||
return Some(noop);
|
||||
}
|
||||
|
||||
item.get("id")
|
||||
.and_then(Value::as_str)
|
||||
@@ -160,6 +165,29 @@ fn parse_handoff_requested_event(item: &JsonMap<String, Value>) -> Option<Realti
|
||||
}))
|
||||
}
|
||||
|
||||
fn parse_noop_requested_event(item: &JsonMap<String, Value>) -> Option<RealtimeEvent> {
|
||||
let item_type = item.get("type").and_then(Value::as_str);
|
||||
let item_name = item.get("name").and_then(Value::as_str);
|
||||
if item_type != Some("function_call") || item_name != Some(SILENCE_TOOL_NAME) {
|
||||
return None;
|
||||
}
|
||||
|
||||
let call_id = item
|
||||
.get("call_id")
|
||||
.and_then(Value::as_str)
|
||||
.or_else(|| item.get("id").and_then(Value::as_str))?;
|
||||
let item_id = item
|
||||
.get("id")
|
||||
.and_then(Value::as_str)
|
||||
.unwrap_or(call_id)
|
||||
.to_string();
|
||||
|
||||
Some(RealtimeEvent::NoopRequested(RealtimeNoopRequested {
|
||||
call_id: call_id.to_string(),
|
||||
item_id,
|
||||
}))
|
||||
}
|
||||
|
||||
fn extract_input_transcript(arguments: &str) -> String {
|
||||
if arguments.is_empty() {
|
||||
return String::new();
|
||||
|
||||
@@ -9,6 +9,7 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-execpolicy = { workspace = true }
|
||||
codex-features = { workspace = true }
|
||||
|
||||
@@ -18,6 +18,7 @@ pub mod schema;
|
||||
pub mod shell_environment;
|
||||
mod skills_config;
|
||||
mod state;
|
||||
mod thread_config;
|
||||
pub mod types;
|
||||
|
||||
pub const CONFIG_TOML_FILE: &str = "config.toml";
|
||||
@@ -93,5 +94,14 @@ pub use state::ConfigLayerEntry;
|
||||
pub use state::ConfigLayerStack;
|
||||
pub use state::ConfigLayerStackOrdering;
|
||||
pub use state::LoaderOverrides;
|
||||
pub use thread_config::NoopThreadConfigLoader;
|
||||
pub use thread_config::SessionThreadConfig;
|
||||
pub use thread_config::StaticThreadConfigLoader;
|
||||
pub use thread_config::ThreadConfigContext;
|
||||
pub use thread_config::ThreadConfigLoadError;
|
||||
pub use thread_config::ThreadConfigLoadErrorCode;
|
||||
pub use thread_config::ThreadConfigLoader;
|
||||
pub use thread_config::ThreadConfigSource;
|
||||
pub use thread_config::UserThreadConfig;
|
||||
|
||||
pub use codex_app_server_protocol::ConfigLayerSource;
|
||||
|
||||
313
codex-rs/config/src/thread_config.rs
Normal file
313
codex-rs/config/src/thread_config.rs
Normal file
@@ -0,0 +1,313 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_model_provider_info::ModelProviderInfo;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use thiserror::Error;
|
||||
use toml::Value as TomlValue;
|
||||
|
||||
use crate::ConfigLayerEntry;
|
||||
|
||||
/// Context available to implementations when loading thread-scoped config.
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq)]
|
||||
pub struct ThreadConfigContext {
|
||||
pub thread_id: Option<String>,
|
||||
pub cwd: Option<AbsolutePathBuf>,
|
||||
}
|
||||
|
||||
/// Config values owned by the service that starts or manages the session.
|
||||
#[derive(Clone, Debug, Default, PartialEq)]
|
||||
pub struct SessionThreadConfig {
|
||||
pub model_provider: Option<String>,
|
||||
pub model_providers: HashMap<String, ModelProviderInfo>,
|
||||
pub features: BTreeMap<String, bool>,
|
||||
}
|
||||
|
||||
/// Config values owned by the authenticated user.
|
||||
#[derive(Clone, Debug, Default, Eq, PartialEq)]
|
||||
pub struct UserThreadConfig {}
|
||||
|
||||
/// A typed config payload paired with the authority that produced it.
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub enum ThreadConfigSource {
|
||||
Session(SessionThreadConfig),
|
||||
User(UserThreadConfig),
|
||||
}
|
||||
|
||||
/// Stable category for failures returned while loading thread config.
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub enum ThreadConfigLoadErrorCode {
|
||||
Auth,
|
||||
Timeout,
|
||||
Parse,
|
||||
RequestFailed,
|
||||
Internal,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, Error, PartialEq)]
|
||||
#[error("{message}")]
|
||||
pub struct ThreadConfigLoadError {
|
||||
code: ThreadConfigLoadErrorCode,
|
||||
message: String,
|
||||
status_code: Option<u16>,
|
||||
}
|
||||
|
||||
impl ThreadConfigLoadError {
|
||||
pub fn new(
|
||||
code: ThreadConfigLoadErrorCode,
|
||||
status_code: Option<u16>,
|
||||
message: impl Into<String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
code,
|
||||
message: message.into(),
|
||||
status_code,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn code(&self) -> ThreadConfigLoadErrorCode {
|
||||
self.code
|
||||
}
|
||||
|
||||
pub fn status_code(&self) -> Option<u16> {
|
||||
self.status_code
|
||||
}
|
||||
}
|
||||
|
||||
/// Loads typed config sources for a new thread.
|
||||
///
|
||||
/// Implementations should fetch only the source-specific config they own and
|
||||
/// return typed payloads without applying precedence or merge rules. Callers
|
||||
/// are responsible for resolving the returned sources into the effective
|
||||
/// runtime config.
|
||||
#[async_trait]
|
||||
pub trait ThreadConfigLoader: Send + Sync {
|
||||
/// Load source-specific typed config.
|
||||
///
|
||||
/// Implementations should keep this method focused on fetching and parsing
|
||||
/// their owned sources. Most callers should use [`Self::load_config_layers`]
|
||||
/// so precedence and merging continue through the ordinary config layer
|
||||
/// stack.
|
||||
async fn load(
|
||||
&self,
|
||||
context: ThreadConfigContext,
|
||||
) -> Result<Vec<ThreadConfigSource>, ThreadConfigLoadError>;
|
||||
|
||||
async fn load_config_layers(
|
||||
&self,
|
||||
context: ThreadConfigContext,
|
||||
) -> Result<Vec<ConfigLayerEntry>, ThreadConfigLoadError> {
|
||||
let sources = self.load(context).await?;
|
||||
sources
|
||||
.into_iter()
|
||||
.map(thread_config_source_to_layer)
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map(|layers| layers.into_iter().flatten().collect())
|
||||
}
|
||||
}
|
||||
|
||||
/// Loader backed by a static set of typed thread config sources.
|
||||
#[derive(Clone, Debug, Default, PartialEq)]
|
||||
pub struct StaticThreadConfigLoader {
|
||||
sources: Vec<ThreadConfigSource>,
|
||||
}
|
||||
|
||||
impl StaticThreadConfigLoader {
|
||||
pub fn new(sources: Vec<ThreadConfigSource>) -> Self {
|
||||
Self { sources }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ThreadConfigLoader for StaticThreadConfigLoader {
|
||||
async fn load(
|
||||
&self,
|
||||
_context: ThreadConfigContext,
|
||||
) -> Result<Vec<ThreadConfigSource>, ThreadConfigLoadError> {
|
||||
Ok(self.sources.clone())
|
||||
}
|
||||
}
|
||||
|
||||
/// Loader used when no external thread config source is configured.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct NoopThreadConfigLoader;
|
||||
|
||||
#[async_trait]
|
||||
impl ThreadConfigLoader for NoopThreadConfigLoader {
|
||||
async fn load(
|
||||
&self,
|
||||
_context: ThreadConfigContext,
|
||||
) -> Result<Vec<ThreadConfigSource>, ThreadConfigLoadError> {
|
||||
Ok(Vec::new())
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_config_source_to_layer(
|
||||
source: ThreadConfigSource,
|
||||
) -> Result<Option<ConfigLayerEntry>, ThreadConfigLoadError> {
|
||||
match source {
|
||||
ThreadConfigSource::Session(config) => {
|
||||
let config = session_thread_config_to_toml(config)?;
|
||||
if is_empty_table(&config) {
|
||||
Ok(None)
|
||||
} else {
|
||||
Ok(Some(ConfigLayerEntry::new(
|
||||
ConfigLayerSource::SessionFlags,
|
||||
config,
|
||||
)))
|
||||
}
|
||||
}
|
||||
// UserThreadConfig has no TOML-backed fields yet. When it grows one,
|
||||
// fold it into the existing user layer instead of adding another
|
||||
// ConfigLayerSource variant.
|
||||
ThreadConfigSource::User(_config) => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_empty_table(config: &TomlValue) -> bool {
|
||||
config.as_table().is_some_and(toml::map::Map::is_empty)
|
||||
}
|
||||
|
||||
fn session_thread_config_to_toml(
|
||||
config: SessionThreadConfig,
|
||||
) -> Result<TomlValue, ThreadConfigLoadError> {
|
||||
let mut table = toml::map::Map::new();
|
||||
|
||||
if let Some(model_provider) = config.model_provider {
|
||||
table.insert(
|
||||
"model_provider".to_string(),
|
||||
TomlValue::String(model_provider),
|
||||
);
|
||||
}
|
||||
|
||||
if !config.model_providers.is_empty() {
|
||||
let model_providers = TomlValue::try_from(config.model_providers).map_err(|err| {
|
||||
ThreadConfigLoadError::new(
|
||||
ThreadConfigLoadErrorCode::Parse,
|
||||
/*status_code*/ None,
|
||||
format!("failed to convert session model providers to config TOML: {err}"),
|
||||
)
|
||||
})?;
|
||||
table.insert("model_providers".to_string(), model_providers);
|
||||
}
|
||||
|
||||
if !config.features.is_empty() {
|
||||
let features = config
|
||||
.features
|
||||
.into_iter()
|
||||
.map(|(feature, enabled)| (feature, TomlValue::Boolean(enabled)))
|
||||
.collect();
|
||||
table.insert("features".to_string(), TomlValue::Table(features));
|
||||
}
|
||||
|
||||
Ok(TomlValue::Table(table))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use codex_model_provider_info::ModelProviderInfo;
|
||||
use codex_model_provider_info::WireApi;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn loader_returns_session_and_user_sources() {
|
||||
let loader = StaticThreadConfigLoader::new(vec![
|
||||
ThreadConfigSource::Session(SessionThreadConfig {
|
||||
model_provider: Some("local".to_string()),
|
||||
model_providers: HashMap::from([("local".to_string(), test_provider("local"))]),
|
||||
features: BTreeMap::from([("plugins".to_string(), false)]),
|
||||
}),
|
||||
ThreadConfigSource::User(UserThreadConfig::default()),
|
||||
]);
|
||||
|
||||
let sources = loader
|
||||
.load(ThreadConfigContext {
|
||||
thread_id: Some("thread-1".to_string()),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("thread config loads");
|
||||
|
||||
assert_eq!(
|
||||
sources,
|
||||
vec![
|
||||
ThreadConfigSource::Session(SessionThreadConfig {
|
||||
model_provider: Some("local".to_string()),
|
||||
model_providers: HashMap::from([("local".to_string(), test_provider("local"))]),
|
||||
features: BTreeMap::from([("plugins".to_string(), false)]),
|
||||
}),
|
||||
ThreadConfigSource::User(UserThreadConfig::default()),
|
||||
]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn loader_translates_sources_to_config_layers() {
|
||||
let loader = StaticThreadConfigLoader::new(vec![
|
||||
ThreadConfigSource::User(UserThreadConfig::default()),
|
||||
ThreadConfigSource::Session(SessionThreadConfig {
|
||||
model_provider: Some("local".to_string()),
|
||||
model_providers: HashMap::from([("local".to_string(), test_provider("local"))]),
|
||||
features: BTreeMap::from([("plugins".to_string(), false)]),
|
||||
}),
|
||||
]);
|
||||
let layers = loader
|
||||
.load_config_layers(ThreadConfigContext {
|
||||
cwd: Some(
|
||||
AbsolutePathBuf::from_absolute_path_checked(
|
||||
std::env::temp_dir().join("project"),
|
||||
)
|
||||
.expect("absolute cwd"),
|
||||
),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("thread config layers load");
|
||||
|
||||
assert_eq!(
|
||||
layers,
|
||||
vec![ConfigLayerEntry::new(
|
||||
ConfigLayerSource::SessionFlags,
|
||||
toml::toml! {
|
||||
model_provider = "local"
|
||||
|
||||
[model_providers.local]
|
||||
name = "local"
|
||||
base_url = "http://127.0.0.1:8061/api/codex"
|
||||
wire_api = "responses"
|
||||
requires_openai_auth = false
|
||||
supports_websockets = true
|
||||
|
||||
[features]
|
||||
plugins = false
|
||||
}
|
||||
.into()
|
||||
)]
|
||||
);
|
||||
}
|
||||
|
||||
fn test_provider(name: &str) -> ModelProviderInfo {
|
||||
ModelProviderInfo {
|
||||
name: name.to_string(),
|
||||
base_url: Some("http://127.0.0.1:8061/api/codex".to_string()),
|
||||
env_key: None,
|
||||
env_key_instructions: None,
|
||||
experimental_bearer_token: None,
|
||||
auth: None,
|
||||
wire_api: WireApi::Responses,
|
||||
query_params: None,
|
||||
http_headers: None,
|
||||
env_http_headers: None,
|
||||
request_max_retries: None,
|
||||
stream_max_retries: None,
|
||||
stream_idle_timeout_ms: None,
|
||||
websocket_connect_timeout_ms: None,
|
||||
requires_openai_auth: false,
|
||||
supports_websockets: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2110,6 +2110,7 @@ async fn managed_config_overrides_oauth_store_mode() -> anyhow::Result<()> {
|
||||
&Vec::new(),
|
||||
overrides,
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
let cfg =
|
||||
@@ -2244,6 +2245,7 @@ async fn managed_config_wins_over_cli_overrides() -> anyhow::Result<()> {
|
||||
&[("model".to_string(), TomlValue::String("cli".to_string()))],
|
||||
overrides,
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ use crate::unified_exec::MIN_EMPTY_YIELD_TIME_MS;
|
||||
use crate::windows_sandbox::WindowsSandboxLevelExt;
|
||||
use crate::windows_sandbox::resolve_windows_sandbox_mode;
|
||||
use crate::windows_sandbox::resolve_windows_sandbox_private_desktop;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_config::config_toml::ConfigToml;
|
||||
use codex_config::config_toml::ProjectConfig;
|
||||
use codex_config::config_toml::RealtimeAudioConfig;
|
||||
@@ -90,6 +91,7 @@ use std::collections::HashMap;
|
||||
use std::io::ErrorKind;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::config::permissions::compile_permission_profile;
|
||||
use crate::config::permissions::get_readable_roots_required_for_codex_runtime;
|
||||
@@ -646,13 +648,14 @@ impl AuthManagerConfig for Config {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[derive(Clone, Default)]
|
||||
pub struct ConfigBuilder {
|
||||
codex_home: Option<PathBuf>,
|
||||
cli_overrides: Option<Vec<(String, TomlValue)>>,
|
||||
harness_overrides: Option<ConfigOverrides>,
|
||||
loader_overrides: Option<LoaderOverrides>,
|
||||
cloud_requirements: CloudRequirementsLoader,
|
||||
thread_config_loader: Option<Arc<dyn ThreadConfigLoader>>,
|
||||
fallback_cwd: Option<PathBuf>,
|
||||
}
|
||||
|
||||
@@ -682,6 +685,14 @@ impl ConfigBuilder {
|
||||
self
|
||||
}
|
||||
|
||||
pub fn thread_config_loader(
|
||||
mut self,
|
||||
thread_config_loader: Arc<dyn ThreadConfigLoader>,
|
||||
) -> Self {
|
||||
self.thread_config_loader = Some(thread_config_loader);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn fallback_cwd(mut self, fallback_cwd: Option<PathBuf>) -> Self {
|
||||
self.fallback_cwd = fallback_cwd;
|
||||
self
|
||||
@@ -694,6 +705,7 @@ impl ConfigBuilder {
|
||||
harness_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
thread_config_loader,
|
||||
fallback_cwd,
|
||||
} = self;
|
||||
let codex_home = match codex_home {
|
||||
@@ -716,6 +728,9 @@ impl ConfigBuilder {
|
||||
&cli_overrides,
|
||||
loader_overrides,
|
||||
cloud_requirements,
|
||||
thread_config_loader
|
||||
.as_deref()
|
||||
.unwrap_or(&codex_config::NoopThreadConfigLoader),
|
||||
)
|
||||
.await?;
|
||||
let merged_toml = config_layer_stack.effective_config();
|
||||
@@ -894,6 +909,7 @@ pub async fn load_config_as_toml_with_cli_and_loader_overrides(
|
||||
&cli_overrides,
|
||||
loader_overrides,
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1065,6 +1081,7 @@ pub async fn load_global_mcp_servers(
|
||||
&cli_overrides,
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
let merged_toml = config_layer_stack.effective_config();
|
||||
|
||||
@@ -431,6 +431,7 @@ impl ConfigService {
|
||||
&self.cli_overrides,
|
||||
self.loader_overrides.clone(),
|
||||
self.cloud_requirements.clone(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ This module is the canonical place to **load and describe Codex configuration la
|
||||
|
||||
Exported from `codex_core::config_loader`:
|
||||
|
||||
- `load_config_layers_state(fs, codex_home, cwd_opt, cli_overrides, overrides, cloud_requirements) -> ConfigLayerStack`
|
||||
- `load_config_layers_state(fs, codex_home, cwd_opt, cli_overrides, overrides, cloud_requirements, thread_config_loader) -> ConfigLayerStack`
|
||||
- `ConfigLayerStack`
|
||||
- `effective_config() -> toml::Value`
|
||||
- `origins() -> HashMap<String, ConfigLayerMetadata>`
|
||||
@@ -29,6 +29,9 @@ Precedence is **top overrides bottom**:
|
||||
3. **Session flags** (CLI overrides, applied as dotted-path TOML writes)
|
||||
4. **User** config (`config.toml`)
|
||||
|
||||
Thread config entries supplied by `thread_config_loader` are inserted according
|
||||
to their translated `ConfigLayerSource` precedence.
|
||||
|
||||
Layers with a `disabled_reason` are still surfaced for UI, but are ignored when
|
||||
computing the effective config and origins metadata. This is what
|
||||
`ConfigLayerStack::effective_config()` implements.
|
||||
@@ -41,6 +44,7 @@ Most callers want the effective config plus metadata:
|
||||
use codex_core::config_loader::{
|
||||
CloudRequirementsLoader, LoaderOverrides, load_config_layers_state,
|
||||
};
|
||||
use codex_config::NoopThreadConfigLoader;
|
||||
use codex_exec_server::LOCAL_FS;
|
||||
use codex_utils_absolute_path::AbsolutePathBuf;
|
||||
use toml::Value as TomlValue;
|
||||
@@ -54,6 +58,7 @@ let layers = load_config_layers_state(
|
||||
&cli_overrides,
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&NoopThreadConfigLoader,
|
||||
).await?;
|
||||
|
||||
let effective = layers.effective_config();
|
||||
|
||||
@@ -9,6 +9,8 @@ use crate::config_loader::layer_io::LoadedConfigLayers;
|
||||
use codex_app_server_protocol::ConfigLayerSource;
|
||||
use codex_config::CONFIG_TOML_FILE;
|
||||
use codex_config::ConfigRequirementsWithSources;
|
||||
use codex_config::ThreadConfigContext;
|
||||
use codex_config::ThreadConfigLoader;
|
||||
use codex_config::config_toml::ConfigToml;
|
||||
use codex_config::config_toml::ProjectConfig;
|
||||
use codex_exec_server::ExecutorFileSystem;
|
||||
@@ -127,6 +129,7 @@ pub async fn load_config_layers_state(
|
||||
cli_overrides: &[(String, TomlValue)],
|
||||
overrides: LoaderOverrides,
|
||||
cloud_requirements: CloudRequirementsLoader,
|
||||
thread_config_loader: &dyn ThreadConfigLoader,
|
||||
) -> io::Result<ConfigLayerStack> {
|
||||
let ignore_user_config = overrides.ignore_user_config;
|
||||
let ignore_user_and_project_exec_policy_rules =
|
||||
@@ -161,6 +164,15 @@ pub async fn load_config_layers_state(
|
||||
)
|
||||
.await?;
|
||||
|
||||
let thread_config_context = ThreadConfigContext {
|
||||
thread_id: None,
|
||||
cwd: cwd.clone(),
|
||||
};
|
||||
let thread_config_layers = thread_config_loader
|
||||
.load_config_layers(thread_config_context)
|
||||
.await
|
||||
.map_err(io::Error::other)?;
|
||||
|
||||
let mut layers = Vec::<ConfigLayerEntry>::new();
|
||||
|
||||
let cli_overrides_layer = if cli_overrides.is_empty() {
|
||||
@@ -283,6 +295,10 @@ pub async fn load_config_layers_state(
|
||||
));
|
||||
}
|
||||
|
||||
for thread_config_layer in thread_config_layers {
|
||||
insert_layer_by_precedence(&mut layers, thread_config_layer);
|
||||
}
|
||||
|
||||
// Make a best-effort to support the legacy `managed_config.toml` as a
|
||||
// config layer on top of everything else. For fields in
|
||||
// `managed_config.toml` that do not have an equivalent in
|
||||
@@ -332,6 +348,16 @@ pub async fn load_config_layers_state(
|
||||
.with_user_and_project_exec_policy_rules_ignored(ignore_user_and_project_exec_policy_rules))
|
||||
}
|
||||
|
||||
fn insert_layer_by_precedence(layers: &mut Vec<ConfigLayerEntry>, layer: ConfigLayerEntry) {
|
||||
match layers
|
||||
.iter()
|
||||
.position(|existing| existing.name.precedence() > layer.name.precedence())
|
||||
{
|
||||
Some(index) => layers.insert(index, layer),
|
||||
None => layers.push(layer),
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempts to load a config.toml file from `config_toml`.
|
||||
/// - If the file exists and is valid TOML, passes the parsed `toml::Value` to
|
||||
/// `create_entry` and returns the resulting layer entry.
|
||||
|
||||
@@ -15,6 +15,9 @@ use crate::config_loader::RequirementSource;
|
||||
use crate::config_loader::load_requirements_toml;
|
||||
use crate::config_loader::version_for_toml;
|
||||
use codex_config::CONFIG_TOML_FILE;
|
||||
use codex_config::SessionThreadConfig;
|
||||
use codex_config::StaticThreadConfigLoader;
|
||||
use codex_config::ThreadConfigSource;
|
||||
use codex_config::config_toml::ConfigToml;
|
||||
use codex_config::config_toml::ProjectConfig;
|
||||
use codex_exec_server::LOCAL_FS;
|
||||
@@ -100,6 +103,7 @@ async fn returns_config_error_for_invalid_user_config_toml() {
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await
|
||||
.expect_err("expected error");
|
||||
@@ -131,6 +135,7 @@ async fn ignore_user_config_keeps_empty_user_layer() -> std::io::Result<()> {
|
||||
..Default::default()
|
||||
},
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -161,6 +166,7 @@ async fn ignore_rules_marks_config_stack_for_exec_policy_rule_skip() -> std::io:
|
||||
..Default::default()
|
||||
},
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -185,6 +191,7 @@ async fn returns_config_error_for_invalid_managed_config_toml() {
|
||||
&[] as &[(String, TomlValue)],
|
||||
overrides,
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await
|
||||
.expect_err("expected error");
|
||||
@@ -270,6 +277,7 @@ extra = true
|
||||
&[] as &[(String, TomlValue)],
|
||||
overrides,
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await
|
||||
.expect("load config");
|
||||
@@ -303,6 +311,7 @@ async fn returns_empty_when_all_layers_missing() {
|
||||
&[] as &[(String, TomlValue)],
|
||||
overrides,
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await
|
||||
.expect("load layers");
|
||||
@@ -354,6 +363,56 @@ async fn returns_empty_when_all_layers_missing() {
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn includes_thread_config_layers_in_stack() -> anyhow::Result<()> {
|
||||
let tmp = tempdir()?;
|
||||
let cwd_dir = tmp.path().join("project");
|
||||
tokio::fs::create_dir_all(&cwd_dir).await?;
|
||||
let cwd = AbsolutePathBuf::from_absolute_path(&cwd_dir)?;
|
||||
let layers = load_config_layers_state(
|
||||
LOCAL_FS.as_ref(),
|
||||
tmp.path(),
|
||||
Some(cwd),
|
||||
&[("features.plugins".to_string(), TomlValue::Boolean(true))],
|
||||
LoaderOverrides::without_managed_config_for_tests(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&StaticThreadConfigLoader::new(vec![ThreadConfigSource::Session(SessionThreadConfig {
|
||||
features: BTreeMap::from([("plugins".to_string(), false)]),
|
||||
..Default::default()
|
||||
})]),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let layer_sources = layers
|
||||
.layers_high_to_low()
|
||||
.into_iter()
|
||||
.map(|layer| layer.name.clone())
|
||||
.collect::<Vec<_>>();
|
||||
assert_eq!(
|
||||
layer_sources,
|
||||
vec![
|
||||
super::ConfigLayerSource::SessionFlags,
|
||||
super::ConfigLayerSource::SessionFlags,
|
||||
super::ConfigLayerSource::User {
|
||||
file: AbsolutePathBuf::resolve_path_against_base(CONFIG_TOML_FILE, tmp.path()),
|
||||
},
|
||||
super::ConfigLayerSource::System {
|
||||
file: super::system_config_toml_file()?,
|
||||
},
|
||||
]
|
||||
);
|
||||
assert_eq!(
|
||||
layers
|
||||
.effective_config()
|
||||
.get("features")
|
||||
.and_then(TomlValue::as_table)
|
||||
.and_then(|features| features.get("plugins")),
|
||||
Some(&TomlValue::Boolean(false))
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(target_os = "macos")]
|
||||
#[tokio::test]
|
||||
async fn managed_preferences_take_highest_precedence() {
|
||||
@@ -396,6 +455,7 @@ flag = false
|
||||
&[] as &[(String, TomlValue)],
|
||||
overrides,
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await
|
||||
.expect("load config");
|
||||
@@ -498,6 +558,7 @@ allowed_sandbox_modes = ["read-only"]
|
||||
&[] as &[(String, TomlValue)],
|
||||
loader_overrides,
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -560,6 +621,7 @@ allowed_approval_policies = ["never"]
|
||||
&[] as &[(String, TomlValue)],
|
||||
loader_overrides,
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -713,6 +775,7 @@ allowed_approval_policies = ["on-request"]
|
||||
guardian_policy_config: None,
|
||||
}))
|
||||
}),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -931,6 +994,7 @@ async fn load_config_layers_includes_cloud_requirements() -> anyhow::Result<()>
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
cloud_requirements,
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -974,6 +1038,7 @@ async fn load_config_layers_fails_when_cloud_requirements_loader_fails() -> anyh
|
||||
"cloud requirements failed",
|
||||
))
|
||||
}),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await
|
||||
.expect_err("cloud requirements failure should fail closed");
|
||||
@@ -1021,6 +1086,7 @@ async fn project_layers_prefer_closest_cwd() -> std::io::Result<()> {
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1166,6 +1232,7 @@ async fn project_layer_is_added_when_dot_codex_exists_without_config_toml() -> s
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1206,6 +1273,7 @@ async fn codex_home_is_not_loaded_as_project_layer_from_home_dir() -> std::io::R
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1263,6 +1331,7 @@ async fn codex_home_within_project_tree_is_not_double_loaded() -> std::io::Resul
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1334,6 +1403,7 @@ async fn project_layers_disabled_when_untrusted_or_unknown() -> std::io::Result<
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
let project_layers_untrusted: Vec<_> = layers_untrusted
|
||||
@@ -1373,6 +1443,7 @@ async fn project_layers_disabled_when_untrusted_or_unknown() -> std::io::Result<
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
let project_layers_unknown: Vec<_> = layers_unknown
|
||||
@@ -1439,6 +1510,7 @@ async fn project_trust_does_not_match_configured_alias_for_canonical_cwd() -> st
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1592,6 +1664,7 @@ async fn invalid_project_config_ignored_when_untrusted_or_unknown() -> std::io::
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
let project_layers: Vec<_> = layers
|
||||
@@ -1660,6 +1733,7 @@ async fn project_layer_without_config_toml_is_disabled_when_untrusted_or_unknown
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
let project_layers: Vec<_> = layers
|
||||
@@ -1720,6 +1794,7 @@ async fn cli_overrides_with_relative_paths_do_not_break_trust_check() -> std::io
|
||||
&cli_overrides,
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1763,6 +1838,7 @@ async fn project_root_markers_supports_alternate_markers() -> std::io::Result<()
|
||||
&[] as &[(String, TomlValue)],
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -53,6 +53,7 @@ async fn build_config_state_with_mtimes() -> Result<(ConfigState, Vec<LayerMtime
|
||||
&cli_overrides,
|
||||
overrides,
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await
|
||||
.context("failed to load Codex config")?;
|
||||
|
||||
@@ -1105,7 +1105,7 @@ async fn handle_handoff_output(
|
||||
output_text,
|
||||
} => {
|
||||
writer
|
||||
.send_conversation_handoff_append(handoff_id, output_text)
|
||||
.send_conversation_function_call_output(handoff_id, output_text)
|
||||
.await
|
||||
}
|
||||
},
|
||||
@@ -1129,7 +1129,7 @@ async fn handle_handoff_output(
|
||||
output_text: _,
|
||||
} => {
|
||||
if let Err(err) = writer
|
||||
.send_conversation_handoff_append(
|
||||
.send_conversation_function_call_output(
|
||||
handoff_id,
|
||||
REALTIME_V2_HANDOFF_COMPLETE_ACKNOWLEDGEMENT.to_string(),
|
||||
)
|
||||
@@ -1263,7 +1263,7 @@ async fn handle_realtime_server_event(
|
||||
match active_handoff {
|
||||
Some(_) => {
|
||||
if let Err(err) = writer
|
||||
.send_conversation_handoff_append(
|
||||
.send_conversation_function_call_output(
|
||||
handoff.handoff_id.clone(),
|
||||
REALTIME_V2_STEER_ACKNOWLEDGEMENT.to_string(),
|
||||
)
|
||||
@@ -1292,6 +1292,27 @@ async fn handle_realtime_server_event(
|
||||
}
|
||||
false
|
||||
}
|
||||
RealtimeEvent::NoopRequested(noop) => {
|
||||
*output_audio_state = None;
|
||||
|
||||
match session_kind {
|
||||
RealtimeSessionKind::V1 => {}
|
||||
RealtimeSessionKind::V2 => {
|
||||
if let Err(err) = writer
|
||||
.send_conversation_function_call_output(noop.call_id.clone(), String::new())
|
||||
.await
|
||||
{
|
||||
let mapped_error = map_api_error(err);
|
||||
warn!("failed to send realtime noop function output: {mapped_error}");
|
||||
let _ = events_tx
|
||||
.send(RealtimeEvent::Error(mapped_error.to_string()))
|
||||
.await;
|
||||
return Err(mapped_error.into());
|
||||
}
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
RealtimeEvent::Error(_) => true,
|
||||
RealtimeEvent::SessionUpdated { session_id, .. } => {
|
||||
info!(realtime_session_id = %session_id, "realtime session updated");
|
||||
|
||||
@@ -545,6 +545,7 @@ pub async fn list_skills(sess: &Session, sub_id: String, cwds: Vec<PathBuf>, for
|
||||
empty_cli_overrides,
|
||||
LoaderOverrides::default(),
|
||||
CloudRequirementsLoader::default(),
|
||||
&codex_config::NoopThreadConfigLoader,
|
||||
)
|
||||
.await
|
||||
{
|
||||
|
||||
@@ -22,6 +22,7 @@ use codex_protocol::protocol::RealtimeAudioFrame;
|
||||
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeConversationVersion;
|
||||
use codex_protocol::protocol::RealtimeEvent;
|
||||
use codex_protocol::protocol::RealtimeNoopRequested;
|
||||
use codex_protocol::protocol::RealtimeOutputModality;
|
||||
use codex_protocol::protocol::RealtimeVoice;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
@@ -2121,6 +2122,92 @@ async fn conversation_user_text_turn_is_capped_when_mirrored_to_realtime() -> Re
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn realtime_v2_noop_tool_call_returns_empty_function_output_without_response() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
let realtime_server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
json!({
|
||||
"type": "session.updated",
|
||||
"session": { "id": "sess_silent", "instructions": "backend prompt" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.done",
|
||||
"item": {
|
||||
"id": "item_silent",
|
||||
"type": "function_call",
|
||||
"name": "remain_silent",
|
||||
"call_id": "call_silent",
|
||||
"arguments": "{}"
|
||||
}
|
||||
}),
|
||||
],
|
||||
vec![],
|
||||
]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
config.realtime.version = RealtimeWsVersion::V2;
|
||||
}
|
||||
});
|
||||
let test = builder.build(&api_server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
output_modality: RealtimeOutputModality::Audio,
|
||||
prompt: Some(Some("backend prompt".to_string())),
|
||||
session_id: None,
|
||||
transport: None,
|
||||
voice: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::NoopRequested(RealtimeNoopRequested { call_id, .. }),
|
||||
}) if call_id == "call_silent" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let function_output = realtime_server
|
||||
.wait_for_request(/*connection_index*/ 0, /*request_index*/ 1)
|
||||
.await;
|
||||
assert_eq!(
|
||||
function_output.body_json(),
|
||||
json!({
|
||||
"type": "conversation.item.create",
|
||||
"item": {
|
||||
"type": "function_call_output",
|
||||
"call_id": "call_silent",
|
||||
"output": ""
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
let realtime_response_create = timeout(Duration::from_millis(200), async {
|
||||
wait_for_matching_websocket_request(
|
||||
&realtime_server,
|
||||
"unexpected realtime response request for noop tool call",
|
||||
|request| request.body_json()["type"].as_str() == Some("response.create"),
|
||||
)
|
||||
.await
|
||||
})
|
||||
.await;
|
||||
assert!(
|
||||
realtime_response_create.is_err(),
|
||||
"noop tool calls should not request a realtime response"
|
||||
);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
@@ -329,6 +329,12 @@ pub struct RealtimeHandoffRequested {
|
||||
pub active_transcript: Vec<RealtimeTranscriptEntry>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub struct RealtimeNoopRequested {
|
||||
pub call_id: String,
|
||||
pub item_id: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)]
|
||||
pub struct RealtimeInputAudioSpeechStarted {
|
||||
pub item_id: Option<String>,
|
||||
@@ -369,6 +375,7 @@ pub enum RealtimeEvent {
|
||||
item_id: String,
|
||||
},
|
||||
HandoffRequested(RealtimeHandoffRequested),
|
||||
NoopRequested(RealtimeNoopRequested),
|
||||
Error(String),
|
||||
}
|
||||
|
||||
|
||||
@@ -145,6 +145,17 @@ ON CONFLICT(child_thread_id) DO UPDATE SET
|
||||
.await
|
||||
}
|
||||
|
||||
/// List all spawned descendants of `root_thread_id`.
|
||||
///
|
||||
/// Descendants are returned breadth-first by depth, then by thread id for stable ordering.
|
||||
pub async fn list_thread_spawn_descendants(
|
||||
&self,
|
||||
root_thread_id: ThreadId,
|
||||
) -> anyhow::Result<Vec<ThreadId>> {
|
||||
self.list_thread_spawn_descendants_matching(root_thread_id, /*status*/ None)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Find a direct spawned child of `parent_thread_id` by canonical agent path.
|
||||
pub async fn find_thread_spawn_child_by_path(
|
||||
&self,
|
||||
@@ -1761,5 +1772,11 @@ mod tests {
|
||||
.await
|
||||
.expect("open descendants from child should load");
|
||||
assert_eq!(open_descendants_from_child, vec![grandchild_thread_id]);
|
||||
|
||||
let all_descendants = runtime
|
||||
.list_thread_spawn_descendants(parent_thread_id)
|
||||
.await
|
||||
.expect("all descendants should load");
|
||||
assert_eq!(all_descendants, vec![child_thread_id, grandchild_thread_id]);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,9 +17,13 @@ workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
|
||||
[target.'cfg(target_os = "windows")'.dependencies]
|
||||
uds_windows = { workspace = true }
|
||||
codex-uds = { workspace = true }
|
||||
tokio = { workspace = true, features = [
|
||||
"io-std",
|
||||
"io-util",
|
||||
"macros",
|
||||
"rt-multi-thread",
|
||||
] }
|
||||
|
||||
[dev-dependencies]
|
||||
codex-utils-cargo-bin = { workspace = true }
|
||||
|
||||
@@ -17,4 +17,4 @@ Unfortunately, the Rust standard library does not provide support for UNIX domai
|
||||
|
||||
https://github.com/rust-lang/rust/issues/56533
|
||||
|
||||
As a workaround, this crate leverages https://crates.io/crates/uds_windows as a dependency on Windows.
|
||||
As a workaround, this crate uses `codex-uds`, which provides a cross-platform async UDS API backed by https://crates.io/crates/uds_windows on Windows.
|
||||
|
||||
@@ -1,56 +1,46 @@
|
||||
#![deny(clippy::print_stdout)]
|
||||
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::net::Shutdown;
|
||||
use std::path::Path;
|
||||
use std::thread;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::anyhow;
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::net::UnixStream;
|
||||
|
||||
#[cfg(windows)]
|
||||
use uds_windows::UnixStream;
|
||||
use codex_uds::UnixStream;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
/// Connects to the Unix Domain Socket at `socket_path` and relays data between
|
||||
/// standard input/output and the socket.
|
||||
pub fn run(socket_path: &Path) -> anyhow::Result<()> {
|
||||
let mut stream = UnixStream::connect(socket_path)
|
||||
pub async fn run(socket_path: &Path) -> anyhow::Result<()> {
|
||||
let stream = UnixStream::connect(socket_path)
|
||||
.await
|
||||
.with_context(|| format!("failed to connect to socket at {}", socket_path.display()))?;
|
||||
let (mut socket_reader, mut socket_writer) = tokio::io::split(stream);
|
||||
|
||||
let mut reader = stream
|
||||
.try_clone()
|
||||
.context("failed to clone socket for reading")?;
|
||||
|
||||
let stdout_thread = thread::spawn(move || -> io::Result<()> {
|
||||
let stdout = io::stdout();
|
||||
let mut handle = stdout.lock();
|
||||
io::copy(&mut reader, &mut handle)?;
|
||||
handle.flush()?;
|
||||
let copy_socket_to_stdout = async {
|
||||
let mut stdout = tokio::io::stdout();
|
||||
tokio::io::copy(&mut socket_reader, &mut stdout).await?;
|
||||
stdout.flush().await?;
|
||||
Ok(())
|
||||
});
|
||||
};
|
||||
let copy_stdin_to_socket = async {
|
||||
let mut stdin = tokio::io::stdin();
|
||||
tokio::io::copy(&mut stdin, &mut socket_writer)
|
||||
.await
|
||||
.context("failed to copy data from stdin to socket")?;
|
||||
|
||||
let stdin = io::stdin();
|
||||
{
|
||||
let mut handle = stdin.lock();
|
||||
io::copy(&mut handle, &mut stream).context("failed to copy data from stdin to socket")?;
|
||||
}
|
||||
// The peer can close immediately after sending its response; in that
|
||||
// race, half-closing our write side can report NotConnected on some
|
||||
// platforms.
|
||||
if let Err(err) = socket_writer.shutdown().await
|
||||
&& err.kind() != io::ErrorKind::NotConnected
|
||||
{
|
||||
return Err(err).context("failed to shutdown socket writer");
|
||||
}
|
||||
|
||||
// The peer can close immediately after sending its response; in that race,
|
||||
// half-closing our write side can report NotConnected on some platforms.
|
||||
if let Err(err) = stream.shutdown(Shutdown::Write)
|
||||
&& err.kind() != io::ErrorKind::NotConnected
|
||||
{
|
||||
return Err(err).context("failed to shutdown socket writer");
|
||||
}
|
||||
anyhow::Ok(())
|
||||
};
|
||||
|
||||
let stdout_result = stdout_thread
|
||||
.join()
|
||||
.map_err(|_| anyhow!("thread panicked while copying socket data to stdout"))?;
|
||||
stdout_result.context("failed to copy data from socket to stdout")?;
|
||||
tokio::try_join!(copy_stdin_to_socket, copy_socket_to_stdout)
|
||||
.context("failed to relay data between stdio and socket")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -2,7 +2,8 @@ use std::env;
|
||||
use std::path::PathBuf;
|
||||
use std::process;
|
||||
|
||||
fn main() -> anyhow::Result<()> {
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let mut args = env::args_os().skip(1);
|
||||
let Some(socket_path) = args.next() else {
|
||||
eprintln!("Usage: codex-stdio-to-uds <socket-path>");
|
||||
@@ -15,5 +16,5 @@ fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
let socket_path = PathBuf::from(socket_path);
|
||||
codex_stdio_to_uds::run(&socket_path)
|
||||
codex_stdio_to_uds::run(&socket_path).await
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::io::ErrorKind;
|
||||
use std::io::Read;
|
||||
use std::io::Write;
|
||||
use std::process::Command;
|
||||
use std::process::ExitStatus;
|
||||
use std::process::Stdio;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
@@ -9,17 +9,13 @@ use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::anyhow;
|
||||
use codex_uds::UnixListener;
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::net::UnixListener;
|
||||
|
||||
#[cfg(windows)]
|
||||
use uds_windows::UnixListener;
|
||||
|
||||
#[test]
|
||||
fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
|
||||
#[tokio::test]
|
||||
async fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
|
||||
// This test intentionally avoids `read_to_end()` on the server side because
|
||||
// waiting for EOF can race with socket half-close behavior on slower runners.
|
||||
// Reading the exact request length keeps the test deterministic.
|
||||
@@ -32,7 +28,7 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
|
||||
let request = b"request";
|
||||
let request_path = dir.path().join("request.txt");
|
||||
std::fs::write(&request_path, request).context("failed to write child stdin fixture")?;
|
||||
let listener = match UnixListener::bind(&socket_path) {
|
||||
let listener = match UnixListener::bind(&socket_path).await {
|
||||
Ok(listener) => listener,
|
||||
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
|
||||
eprintln!("skipping test: failed to bind unix socket: {err}");
|
||||
@@ -43,105 +39,119 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
|
||||
}
|
||||
};
|
||||
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let (event_tx, event_rx) = mpsc::channel();
|
||||
let server_thread = thread::spawn(move || -> anyhow::Result<()> {
|
||||
let server_task = tokio::spawn(async move {
|
||||
let mut listener = listener;
|
||||
let _ = event_tx.send("waiting for accept".to_string());
|
||||
let (mut connection, _) = listener
|
||||
let mut connection = listener
|
||||
.accept()
|
||||
.await
|
||||
.context("failed to accept test connection")?;
|
||||
let _ = event_tx.send("accepted connection".to_string());
|
||||
let mut received = vec![0; request.len()];
|
||||
connection
|
||||
.read_exact(&mut received)
|
||||
.await
|
||||
.context("failed to read data from client")?;
|
||||
let _ = event_tx.send(format!("read {} bytes", received.len()));
|
||||
tx.send(received)
|
||||
.map_err(|_| anyhow!("failed to send received bytes to test thread"))?;
|
||||
connection
|
||||
.write_all(b"response")
|
||||
.await
|
||||
.context("failed to write response to client")?;
|
||||
let _ = event_tx.send("wrote response".to_string());
|
||||
Ok(())
|
||||
anyhow::Ok(received)
|
||||
});
|
||||
|
||||
let stdin = std::fs::File::open(&request_path).context("failed to open child stdin fixture")?;
|
||||
let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
|
||||
.arg(&socket_path)
|
||||
.stdin(Stdio::from(stdin))
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.context("failed to spawn codex-stdio-to-uds")?;
|
||||
struct ChildOutput {
|
||||
status: ExitStatus,
|
||||
stdout: Vec<u8>,
|
||||
stderr: Vec<u8>,
|
||||
server_events: Vec<String>,
|
||||
}
|
||||
|
||||
let mut child_stdout = child.stdout.take().context("missing child stdout")?;
|
||||
let mut child_stderr = child.stderr.take().context("missing child stderr")?;
|
||||
let (stdout_tx, stdout_rx) = mpsc::channel();
|
||||
let (stderr_tx, stderr_rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut stdout = Vec::new();
|
||||
let result = child_stdout.read_to_end(&mut stdout).map(|_| stdout);
|
||||
let _ = stdout_tx.send(result);
|
||||
});
|
||||
thread::spawn(move || {
|
||||
let mut stderr = Vec::new();
|
||||
let result = child_stderr.read_to_end(&mut stderr).map(|_| stderr);
|
||||
let _ = stderr_tx.send(result);
|
||||
let child_task = tokio::task::spawn_blocking(move || -> anyhow::Result<ChildOutput> {
|
||||
let stdin =
|
||||
std::fs::File::open(&request_path).context("failed to open child stdin fixture")?;
|
||||
let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
|
||||
.arg(&socket_path)
|
||||
.stdin(Stdio::from(stdin))
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
.context("failed to spawn codex-stdio-to-uds")?;
|
||||
|
||||
let mut child_stdout = child.stdout.take().context("missing child stdout")?;
|
||||
let mut child_stderr = child.stderr.take().context("missing child stderr")?;
|
||||
let (stdout_tx, stdout_rx) = mpsc::channel();
|
||||
let (stderr_tx, stderr_rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let mut stdout = Vec::new();
|
||||
let result = child_stdout.read_to_end(&mut stdout).map(|_| stdout);
|
||||
let _ = stdout_tx.send(result);
|
||||
});
|
||||
thread::spawn(move || {
|
||||
let mut stderr = Vec::new();
|
||||
let result = child_stderr.read_to_end(&mut stderr).map(|_| stderr);
|
||||
let _ = stderr_tx.send(result);
|
||||
});
|
||||
|
||||
let mut server_events = Vec::new();
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
let status = loop {
|
||||
while let Ok(event) = event_rx.try_recv() {
|
||||
server_events.push(event);
|
||||
}
|
||||
|
||||
if let Some(status) = child.try_wait().context("failed to poll child status")? {
|
||||
break status;
|
||||
}
|
||||
|
||||
if Instant::now() >= deadline {
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
let stderr = stderr_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stderr after kill")?
|
||||
.context("failed to read child stderr")?;
|
||||
anyhow::bail!(
|
||||
"codex-stdio-to-uds did not exit in time; server events: {:?}; stderr: {}",
|
||||
server_events,
|
||||
String::from_utf8_lossy(&stderr).trim_end()
|
||||
);
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_millis(25));
|
||||
};
|
||||
|
||||
let stdout = stdout_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stdout")?
|
||||
.context("failed to read child stdout")?;
|
||||
let stderr = stderr_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stderr")?
|
||||
.context("failed to read child stderr")?;
|
||||
|
||||
Ok(ChildOutput {
|
||||
status,
|
||||
stdout,
|
||||
stderr,
|
||||
server_events,
|
||||
})
|
||||
});
|
||||
|
||||
let mut server_events = Vec::new();
|
||||
let deadline = Instant::now() + Duration::from_secs(5);
|
||||
let status = loop {
|
||||
while let Ok(event) = event_rx.try_recv() {
|
||||
server_events.push(event);
|
||||
}
|
||||
|
||||
if let Some(status) = child.try_wait().context("failed to poll child status")? {
|
||||
break status;
|
||||
}
|
||||
|
||||
if Instant::now() >= deadline {
|
||||
let _ = child.kill();
|
||||
let _ = child.wait();
|
||||
let stderr = stderr_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stderr after kill")?
|
||||
.context("failed to read child stderr")?;
|
||||
anyhow::bail!(
|
||||
"codex-stdio-to-uds did not exit in time; server events: {:?}; stderr: {}",
|
||||
server_events,
|
||||
String::from_utf8_lossy(&stderr).trim_end()
|
||||
);
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_millis(25));
|
||||
};
|
||||
|
||||
let stdout = stdout_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stdout")?
|
||||
.context("failed to read child stdout")?;
|
||||
let stderr = stderr_rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("timed out waiting for child stderr")?
|
||||
.context("failed to read child stderr")?;
|
||||
let child_output = child_task.await.context("child task panicked")??;
|
||||
assert!(
|
||||
status.success(),
|
||||
child_output.status.success(),
|
||||
"codex-stdio-to-uds exited with {status}; server events: {:?}; stderr: {}",
|
||||
server_events,
|
||||
String::from_utf8_lossy(&stderr).trim_end()
|
||||
child_output.server_events,
|
||||
String::from_utf8_lossy(&child_output.stderr).trim_end(),
|
||||
status = child_output.status
|
||||
);
|
||||
assert_eq!(stdout, b"response");
|
||||
assert_eq!(child_output.stdout, b"response");
|
||||
|
||||
let received = rx
|
||||
.recv_timeout(Duration::from_secs(1))
|
||||
.context("server did not receive data in time")?;
|
||||
let received = server_task.await.context("server task panicked")??;
|
||||
assert_eq!(received, request);
|
||||
|
||||
let server_result = server_thread
|
||||
.join()
|
||||
.map_err(|_| anyhow!("server thread panicked"))?;
|
||||
server_result.context("server failed")?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -38,10 +38,30 @@ impl LocalThreadStore {
|
||||
pub fn new(config: RolloutConfig) -> Self {
|
||||
Self { config }
|
||||
}
|
||||
|
||||
/// Read a local rollout-backed thread by path.
|
||||
pub async fn read_thread_by_rollout_path(
|
||||
&self,
|
||||
rollout_path: std::path::PathBuf,
|
||||
include_archived: bool,
|
||||
include_history: bool,
|
||||
) -> ThreadStoreResult<StoredThread> {
|
||||
read_thread::read_thread_by_rollout_path(
|
||||
self,
|
||||
rollout_path,
|
||||
include_archived,
|
||||
include_history,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ThreadStore for LocalThreadStore {
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
|
||||
async fn create_thread(
|
||||
&self,
|
||||
_params: CreateThreadParams,
|
||||
|
||||
@@ -31,15 +31,7 @@ pub(super) async fn read_thread(
|
||||
&& (params.include_archived || metadata.archived_at.is_none())
|
||||
{
|
||||
let mut thread = stored_thread_from_sqlite_metadata(store, metadata).await;
|
||||
if params.include_history {
|
||||
let Some(path) = thread.rollout_path.clone() else {
|
||||
return Err(ThreadStoreError::Internal {
|
||||
message: format!("failed to locate rollout for thread {thread_id}"),
|
||||
});
|
||||
};
|
||||
let items = load_history_items(&path).await?;
|
||||
thread.history = Some(StoredThreadHistory { thread_id, items });
|
||||
}
|
||||
attach_history_if_requested(&mut thread, params.include_history).await?;
|
||||
return Ok(thread);
|
||||
}
|
||||
|
||||
@@ -49,19 +41,60 @@ pub(super) async fn read_thread(
|
||||
message: format!("no rollout found for thread id {thread_id}"),
|
||||
})?;
|
||||
|
||||
let mut thread = read_thread_from_rollout_path(store, thread_id, path).await?;
|
||||
if params.include_history {
|
||||
let Some(path) = thread.rollout_path.clone() else {
|
||||
return Err(ThreadStoreError::Internal {
|
||||
message: format!("failed to load thread history for thread {thread_id}"),
|
||||
});
|
||||
};
|
||||
let items = load_history_items(&path).await?;
|
||||
thread.history = Some(StoredThreadHistory { thread_id, items });
|
||||
}
|
||||
let mut thread = read_thread_from_rollout_path(store, path).await?;
|
||||
attach_history_if_requested(&mut thread, params.include_history).await?;
|
||||
Ok(thread)
|
||||
}
|
||||
|
||||
pub(super) async fn read_thread_by_rollout_path(
|
||||
store: &LocalThreadStore,
|
||||
rollout_path: std::path::PathBuf,
|
||||
include_archived: bool,
|
||||
include_history: bool,
|
||||
) -> ThreadStoreResult<StoredThread> {
|
||||
let path = resolve_requested_rollout_path(store, rollout_path)?;
|
||||
let mut thread = read_thread_from_rollout_path(store, path).await?;
|
||||
if !include_archived && thread.archived_at.is_some() {
|
||||
return Err(ThreadStoreError::InvalidRequest {
|
||||
message: format!("thread {} is archived", thread.thread_id),
|
||||
});
|
||||
}
|
||||
attach_history_if_requested(&mut thread, include_history).await?;
|
||||
Ok(thread)
|
||||
}
|
||||
|
||||
fn resolve_requested_rollout_path(
|
||||
store: &LocalThreadStore,
|
||||
rollout_path: std::path::PathBuf,
|
||||
) -> ThreadStoreResult<std::path::PathBuf> {
|
||||
let path = if rollout_path.is_relative() {
|
||||
store.config.codex_home.join(rollout_path)
|
||||
} else {
|
||||
rollout_path
|
||||
};
|
||||
std::fs::canonicalize(&path).map_err(|err| ThreadStoreError::InvalidRequest {
|
||||
message: format!("failed to resolve rollout path `{}`: {err}", path.display()),
|
||||
})
|
||||
}
|
||||
|
||||
async fn attach_history_if_requested(
|
||||
thread: &mut StoredThread,
|
||||
include_history: bool,
|
||||
) -> ThreadStoreResult<()> {
|
||||
if !include_history {
|
||||
return Ok(());
|
||||
}
|
||||
let thread_id = thread.thread_id;
|
||||
let Some(path) = thread.rollout_path.clone() else {
|
||||
return Err(ThreadStoreError::Internal {
|
||||
message: format!("failed to load thread history for thread {thread_id}"),
|
||||
});
|
||||
};
|
||||
let items = load_history_items(&path).await?;
|
||||
thread.history = Some(StoredThreadHistory { thread_id, items });
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn resolve_rollout_path(
|
||||
store: &LocalThreadStore,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
@@ -94,7 +127,6 @@ async fn resolve_rollout_path(
|
||||
|
||||
async fn read_thread_from_rollout_path(
|
||||
store: &LocalThreadStore,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
path: std::path::PathBuf,
|
||||
) -> ThreadStoreResult<StoredThread> {
|
||||
let Some(item) = read_thread_item_from_rollout(path.clone()).await else {
|
||||
@@ -116,7 +148,7 @@ async fn read_thread_from_rollout_path(
|
||||
.ok()
|
||||
.and_then(|meta_line| meta_line.meta.forked_from_id);
|
||||
if let Ok(Some(title)) =
|
||||
find_thread_name_by_id(store.config.codex_home.as_path(), &thread_id).await
|
||||
find_thread_name_by_id(store.config.codex_home.as_path(), &thread.thread_id).await
|
||||
{
|
||||
set_thread_name_from_title(&mut thread, title);
|
||||
}
|
||||
@@ -348,6 +380,36 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_thread_returns_rollout_path_summary() {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
let store = LocalThreadStore::new(test_config(home.path()));
|
||||
let uuid = Uuid::from_u128(211);
|
||||
let thread_id = ThreadId::from_string(&uuid.to_string()).expect("valid thread id");
|
||||
let active_path =
|
||||
write_session_file(home.path(), "2025-01-03T12-00-00", uuid).expect("session file");
|
||||
let relative_path = active_path
|
||||
.strip_prefix(home.path())
|
||||
.expect("path should be under codex home")
|
||||
.to_path_buf();
|
||||
|
||||
let thread = store
|
||||
.read_thread_by_rollout_path(
|
||||
relative_path,
|
||||
/*include_archived*/ false,
|
||||
/*include_history*/ false,
|
||||
)
|
||||
.await
|
||||
.expect("read thread by rollout path");
|
||||
|
||||
assert_eq!(thread.thread_id, thread_id);
|
||||
assert_eq!(
|
||||
thread.rollout_path,
|
||||
Some(std::fs::canonicalize(active_path).expect("canonical path"))
|
||||
);
|
||||
assert_eq!(thread.preview, "Hello from user");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn read_thread_returns_archived_rollout_when_requested() {
|
||||
let home = TempDir::new().expect("temp dir");
|
||||
|
||||
@@ -48,6 +48,10 @@ impl RemoteThreadStore {
|
||||
|
||||
#[async_trait]
|
||||
impl ThreadStore for RemoteThreadStore {
|
||||
fn as_any(&self) -> &dyn std::any::Any {
|
||||
self
|
||||
}
|
||||
|
||||
async fn create_thread(
|
||||
&self,
|
||||
_params: CreateThreadParams,
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::any::Any;
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::AppendThreadItemsParams;
|
||||
@@ -16,7 +18,11 @@ use crate::UpdateThreadMetadataParams;
|
||||
|
||||
/// Storage-neutral thread persistence boundary.
|
||||
#[async_trait]
|
||||
pub trait ThreadStore: Send + Sync {
|
||||
pub trait ThreadStore: Any + Send + Sync {
|
||||
/// Return this store as [`Any`] so callers at API boundaries can reject requests that only
|
||||
/// make sense for a concrete store implementation.
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
|
||||
/// Creates a new thread and returns a live recorder for future appends.
|
||||
async fn create_thread(
|
||||
&self,
|
||||
|
||||
@@ -338,6 +338,7 @@ impl ChatWidget {
|
||||
RealtimeEvent::ConversationItemAdded(_item) => {}
|
||||
RealtimeEvent::ConversationItemDone { .. } => {}
|
||||
RealtimeEvent::HandoffRequested(_) => {}
|
||||
RealtimeEvent::NoopRequested(_) => {}
|
||||
RealtimeEvent::Error(message) => {
|
||||
self.fail_realtime_conversation(format!("Realtime voice error: {message}"));
|
||||
}
|
||||
|
||||
6
codex-rs/uds/BUILD.bazel
Normal file
6
codex-rs/uds/BUILD.bazel
Normal file
@@ -0,0 +1,6 @@
|
||||
load("//:defs.bzl", "codex_rust_crate")
|
||||
|
||||
codex_rust_crate(
|
||||
name = "uds",
|
||||
crate_name = "codex_uds",
|
||||
)
|
||||
29
codex-rs/uds/Cargo.toml
Normal file
29
codex-rs/uds/Cargo.toml
Normal file
@@ -0,0 +1,29 @@
|
||||
[package]
|
||||
name = "codex-uds"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[lib]
|
||||
name = "codex_uds"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
tokio = { workspace = true, features = ["fs", "net", "rt"] }
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
async-io = { workspace = true }
|
||||
tokio-util = { workspace = true, features = ["compat"] }
|
||||
uds_windows = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
tokio = { workspace = true, features = [
|
||||
"io-util",
|
||||
"macros",
|
||||
"rt-multi-thread",
|
||||
] }
|
||||
331
codex-rs/uds/src/lib.rs
Normal file
331
codex-rs/uds/src/lib.rs
Normal file
@@ -0,0 +1,331 @@
|
||||
//! Cross-platform async Unix domain socket helpers.
|
||||
|
||||
use std::io::Result as IoResult;
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::io::ReadBuf;
|
||||
|
||||
/// Creates `socket_dir` if needed and restricts it to the current user where
|
||||
/// the platform exposes Unix permissions.
|
||||
pub async fn prepare_private_socket_directory(socket_dir: impl AsRef<Path>) -> IoResult<()> {
|
||||
platform::prepare_private_socket_directory(socket_dir.as_ref()).await
|
||||
}
|
||||
|
||||
/// Returns whether `socket_path` points at a stale Unix socket rendezvous path.
|
||||
///
|
||||
/// On Unix this checks the file type. On Windows, `uds_windows` represents the
|
||||
/// rendezvous as a regular path, so existence is the only useful stale-path
|
||||
/// signal available.
|
||||
pub async fn is_stale_socket_path(socket_path: impl AsRef<Path>) -> IoResult<bool> {
|
||||
platform::is_stale_socket_path(socket_path.as_ref()).await
|
||||
}
|
||||
|
||||
/// Async Unix domain socket listener.
|
||||
pub struct UnixListener {
|
||||
inner: platform::Listener,
|
||||
}
|
||||
|
||||
impl UnixListener {
|
||||
/// Binds a new listener at `socket_path`.
|
||||
pub async fn bind(socket_path: impl AsRef<Path>) -> IoResult<Self> {
|
||||
platform::bind_listener(socket_path.as_ref())
|
||||
.await
|
||||
.map(|inner| Self { inner })
|
||||
}
|
||||
|
||||
/// Accepts the next incoming stream.
|
||||
pub async fn accept(&mut self) -> IoResult<UnixStream> {
|
||||
self.inner.accept().await.map(|inner| UnixStream { inner })
|
||||
}
|
||||
}
|
||||
|
||||
/// Async Unix domain socket stream.
|
||||
pub struct UnixStream {
|
||||
inner: platform::Stream,
|
||||
}
|
||||
|
||||
impl UnixStream {
|
||||
/// Connects to `socket_path`.
|
||||
pub async fn connect(socket_path: impl AsRef<Path>) -> IoResult<Self> {
|
||||
platform::connect_stream(socket_path.as_ref())
|
||||
.await
|
||||
.map(|inner| Self { inner })
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for UnixStream {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<IoResult<()>> {
|
||||
Pin::new(&mut self.get_mut().inner).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for UnixStream {
|
||||
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<IoResult<usize>> {
|
||||
Pin::new(&mut self.get_mut().inner).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
|
||||
Pin::new(&mut self.get_mut().inner).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
|
||||
Pin::new(&mut self.get_mut().inner).poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
mod platform {
    //! Unix backend: thin wrappers over `tokio::net::{UnixListener, UnixStream}`.

    use std::io;
    use std::io::ErrorKind;
    use std::io::Result as IoResult;
    use std::os::unix::fs::FileTypeExt;
    use std::os::unix::fs::PermissionsExt;
    use std::path::Path;

    use tokio::fs;
    use tokio::net::UnixListener;
    use tokio::net::UnixStream;

    /// Owner-only access keeps the control socket directory private while
    /// preserving owner traversal and socket path creation.
    const SOCKET_DIR_MODE: u32 = 0o700;
    // Mask isolating the rwx permission bits of `st_mode` for comparison.
    // NOTE(review): masking with 0o777 ignores setuid/setgid/sticky bits, so a
    // directory with e.g. mode 0o2700 passes the check unchanged — confirm
    // that is intended.
    const SOCKET_DIR_PERMISSION_BITS: u32 = 0o777;

    pub(super) type Stream = UnixStream;

    pub(super) struct Listener(UnixListener);

    /// Creates `socket_dir` with mode 0700, or — when it already exists —
    /// verifies it is a directory and tightens its permission bits to
    /// exactly 0700.
    pub(super) async fn prepare_private_socket_directory(socket_dir: &Path) -> IoResult<()> {
        let mut dir_builder = fs::DirBuilder::new();
        dir_builder.mode(SOCKET_DIR_MODE);
        match dir_builder.create(socket_dir).await {
            // Freshly created with the requested mode; nothing more to check.
            Ok(()) => return Ok(()),
            // Pre-existing path: fall through to validate it below.
            Err(err) if err.kind() == ErrorKind::AlreadyExists => {}
            Err(err) => return Err(err),
        }

        // `symlink_metadata` (not `metadata`) so a symlink at the path is
        // reported as a non-directory instead of being silently followed.
        let metadata = fs::symlink_metadata(socket_dir).await?;
        if !metadata.is_dir() {
            return Err(io::Error::new(
                ErrorKind::AlreadyExists,
                format!(
                    "socket directory path exists and is not a directory: {}",
                    socket_dir.display()
                ),
            ));
        }

        let permissions = metadata.permissions();
        // The SSH-over-UDS control socket is reachable by path, so the
        // rendezvous directory must be owner-traversable while denying
        // group/other access; exact 0700 fixes insecure modes and unusable
        // owner-only modes like 0600.
        if permissions.mode() & SOCKET_DIR_PERMISSION_BITS != SOCKET_DIR_MODE {
            fs::set_permissions(socket_dir, std::fs::Permissions::from_mode(SOCKET_DIR_MODE))
                .await?;
        }

        Ok(())
    }

    /// Binds a listening socket at `socket_path`.
    pub(super) async fn bind_listener(socket_path: &Path) -> IoResult<Listener> {
        UnixListener::bind(socket_path).map(Listener)
    }

    impl Listener {
        /// Accepts one incoming connection, discarding the peer address.
        pub(super) async fn accept(&mut self) -> IoResult<Stream> {
            self.0.accept().await.map(|(stream, _addr)| stream)
        }
    }

    /// Connects a client stream to `socket_path`.
    pub(super) async fn connect_stream(socket_path: &Path) -> IoResult<Stream> {
        UnixStream::connect(socket_path).await
    }

    /// Returns `true` when `socket_path` exists and is a socket file —
    /// presumably a leftover rendezvous point the caller may clean up.
    /// Metadata errors (including `NotFound`) propagate to the caller.
    pub(super) async fn is_stale_socket_path(socket_path: &Path) -> IoResult<bool> {
        Ok(fs::symlink_metadata(socket_path)
            .await?
            .file_type()
            .is_socket())
    }
}
|
||||
|
||||
#[cfg(windows)]
mod platform {
    //! Windows backend: blocking `uds_windows` sockets made non-blocking via
    //! `async_io::Async` and adapted to tokio's I/O traits via
    //! `tokio_util::compat`.

    use std::io;
    use std::io::Result as IoResult;
    use std::net::Shutdown;
    use std::ops::Deref;
    use std::os::windows::io::AsRawSocket;
    use std::os::windows::io::AsSocket;
    use std::os::windows::io::BorrowedSocket;
    use std::path::Path;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::task::ready;

    use async_io::Async;
    use tokio::io::AsyncRead;
    use tokio::io::AsyncWrite;
    use tokio::io::ReadBuf;
    use tokio::task;
    use tokio_util::compat::Compat;
    use tokio_util::compat::FuturesAsyncReadCompatExt;

    /// Async stream: a `uds_windows::UnixStream` driven by `async_io::Async`,
    /// wrapped in `Compat` so it exposes tokio's `AsyncRead`/`AsyncWrite`.
    pub(super) struct Stream(Compat<Async<WindowsUnixStream>>);

    /// Creates the socket directory (and any missing parents).
    // NOTE(review): unlike the unix backend, no permission tightening happens
    // here — confirm the default Windows ACLs are acceptable for this
    // directory.
    pub(super) async fn prepare_private_socket_directory(socket_dir: &Path) -> IoResult<()> {
        tokio::fs::create_dir_all(socket_dir).await
    }

    pub(super) struct Listener(Async<WindowsUnixListener>);

    /// Binds a listener at `socket_path`; the blocking bind runs on tokio's
    /// blocking thread pool.
    pub(super) async fn bind_listener(socket_path: &Path) -> IoResult<Listener> {
        let socket_path = socket_path.to_path_buf();
        let listener =
            spawn_blocking_io(move || uds_windows::UnixListener::bind(socket_path)).await?;
        Async::new(WindowsUnixListener::from(listener)).map(Listener)
    }

    impl Listener {
        /// Waits for readiness, accepts one connection, and wraps the
        /// resulting blocking stream for async use.
        pub(super) async fn accept(&mut self) -> IoResult<Stream> {
            let (stream, _addr) = self.0.read_with(|listener| listener.accept()).await?;
            Async::new(WindowsUnixStream::from(stream))
                .map(FuturesAsyncReadCompatExt::compat)
                .map(Stream)
        }
    }

    /// Connects to `socket_path`; the blocking connect runs on tokio's
    /// blocking thread pool, and the stream is then wrapped for async use.
    pub(super) async fn connect_stream(socket_path: &Path) -> IoResult<Stream> {
        let socket_path = socket_path.to_path_buf();
        let stream =
            spawn_blocking_io(move || uds_windows::UnixStream::connect(socket_path)).await?;
        Async::new(WindowsUnixStream::from(stream))
            .map(FuturesAsyncReadCompatExt::compat)
            .map(Stream)
    }

    /// Reports whether anything exists at `socket_path`.
    // NOTE(review): this only checks existence, whereas the unix backend also
    // requires the path to be a socket file — a regular file here is reported
    // as stale. Confirm the asymmetry is intended.
    pub(super) async fn is_stale_socket_path(socket_path: &Path) -> IoResult<bool> {
        tokio::fs::try_exists(socket_path).await
    }

    /// Runs a blocking socket operation on tokio's blocking pool, converting
    /// a join failure into an `io::Error` and flattening the inner result.
    async fn spawn_blocking_io<T>(
        operation: impl FnOnce() -> IoResult<T> + Send + 'static,
    ) -> IoResult<T>
    where
        T: Send + 'static,
    {
        task::spawn_blocking(operation)
            .await
            .map_err(|err| io::Error::other(format!("blocking socket task failed: {err}")))?
    }

    /// Newtype over the foreign `uds_windows::UnixListener` so this crate can
    /// implement `AsSocket` and `async_io::IoSafe` for it.
    pub(super) struct WindowsUnixListener(uds_windows::UnixListener);

    impl From<uds_windows::UnixListener> for WindowsUnixListener {
        fn from(listener: uds_windows::UnixListener) -> Self {
            Self(listener)
        }
    }

    impl Deref for WindowsUnixListener {
        type Target = uds_windows::UnixListener;

        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    impl AsSocket for WindowsUnixListener {
        fn as_socket(&self) -> BorrowedSocket<'_> {
            // SAFETY: the raw socket is taken from the listener owned by
            // `self.0`, which outlives the returned borrow.
            unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
        }
    }

    /// Newtype over the foreign `uds_windows::UnixStream` so this crate can
    /// implement `AsSocket`, `io::Read`/`io::Write`, and `async_io::IoSafe`.
    pub(super) struct WindowsUnixStream(uds_windows::UnixStream);

    impl From<uds_windows::UnixStream> for WindowsUnixStream {
        fn from(stream: uds_windows::UnixStream) -> Self {
            Self(stream)
        }
    }

    impl Deref for WindowsUnixStream {
        type Target = uds_windows::UnixStream;

        fn deref(&self) -> &Self::Target {
            &self.0
        }
    }

    impl AsSocket for WindowsUnixStream {
        fn as_socket(&self) -> BorrowedSocket<'_> {
            // SAFETY: the raw socket is taken from the stream owned by
            // `self.0`, which outlives the returned borrow.
            unsafe { BorrowedSocket::borrow_raw(self.as_raw_socket()) }
        }
    }

    impl io::Read for WindowsUnixStream {
        // Forwards to the inner blocking stream's `Read` impl.
        fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
            io::Read::read(&mut self.0, buf)
        }
    }

    impl io::Write for WindowsUnixStream {
        // Forwards to the inner blocking stream's `Write` impl.
        fn write(&mut self, buf: &[u8]) -> IoResult<usize> {
            io::Write::write(&mut self.0, buf)
        }

        fn flush(&mut self) -> IoResult<()> {
            io::Write::flush(&mut self.0)
        }
    }

    impl AsyncRead for Stream {
        // Delegates to the `Compat<Async<_>>` wrapper.
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<IoResult<()>> {
            Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
        }
    }

    impl AsyncWrite for Stream {
        // Delegates to the `Compat<Async<_>>` wrapper.
        fn poll_write(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<IoResult<usize>> {
            Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
        }

        fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
            Pin::new(&mut self.get_mut().0).poll_flush(cx)
        }

        // Flush first, then shut down the write half of the raw socket.
        fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<IoResult<()>> {
            let stream = &mut self.get_mut().0;
            ready!(Pin::new(&mut *stream).poll_flush(cx))?;
            // `Compat<Async<_>>` maps shutdown to `poll_close()`, which only
            // flushes for `async_io::Async`; call the socket shutdown directly.
            stream.get_ref().get_ref().shutdown(Shutdown::Write)?;
            Poll::Ready(Ok(()))
        }
    }

    // SAFETY: both newtypes own their socket for their whole lifetime and the
    // `Read`/`Write` impls above never drop or replace the underlying handle,
    // which is what `async_io::IoSafe` requires.
    unsafe impl async_io::IoSafe for WindowsUnixListener {}
    unsafe impl async_io::IoSafe for WindowsUnixStream {}
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod lib_tests;
|
||||
121
codex-rs/uds/src/lib_tests.rs
Normal file
121
codex-rs/uds/src/lib_tests.rs
Normal file
@@ -0,0 +1,121 @@
|
||||
use std::io::ErrorKind;
|
||||
|
||||
use pretty_assertions::assert_eq;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
async fn prepare_private_socket_directory_creates_directory() {
    // A missing socket directory should be created by the preparation step.
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let socket_dir = temp_dir.path().join("app-server-control");

    prepare_private_socket_directory(&socket_dir)
        .await
        .expect("socket dir should be created");

    assert!(socket_dir.is_dir());
}
|
||||
|
||||
#[cfg(unix)]
#[tokio::test]
async fn prepare_private_socket_directory_sets_existing_permissions_to_owner_only() {
    use std::os::unix::fs::PermissionsExt;

    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    // 0o755 is too permissive and 0o600 lacks owner traversal; both should be
    // normalized to exactly 0o700.
    for mode in [0o755, 0o600] {
        let socket_dir = temp_dir.path().join(format!("app-server-control-{mode:o}"));
        std::fs::create_dir(&socket_dir).expect("socket dir should be created");
        std::fs::set_permissions(&socket_dir, std::fs::Permissions::from_mode(mode))
            .expect("socket dir permissions should be changed");

        prepare_private_socket_directory(&socket_dir)
            .await
            .expect("socket dir permissions should be set exactly");

        // Shadowed `mode`: re-read the on-disk permission bits after the fix.
        let mode = std::fs::metadata(&socket_dir)
            .expect("socket dir metadata")
            .permissions()
            .mode();
        assert_eq!(mode & 0o777, 0o700);
    }
}
|
||||
|
||||
#[cfg(unix)]
#[tokio::test]
async fn regular_file_path_is_not_stale_socket_path() {
    // On unix the staleness probe requires a socket file; a regular file at
    // the path must not be reported as a stale socket.
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let regular_file = temp_dir.path().join("not-a-socket");
    std::fs::write(&regular_file, b"not a socket").expect("regular file should be created");

    assert!(
        !is_stale_socket_path(&regular_file)
            .await
            .expect("stale socket check should succeed")
    );
}
|
||||
|
||||
#[tokio::test]
async fn bound_listener_path_is_stale_socket_path() {
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let socket_path = temp_dir.path().join("socket");
    // Some sandboxes forbid binding unix sockets; skip rather than fail there.
    let _listener = match UnixListener::bind(&socket_path).await {
        Ok(listener) => listener,
        Err(err) if err.kind() == ErrorKind::PermissionDenied => {
            eprintln!("skipping test: failed to bind unix socket: {err}");
            return;
        }
        Err(err) => panic!("failed to bind test socket: {err}"),
    };

    // While the listener is alive the path is a socket file, which the
    // staleness probe reports as `true`.
    assert!(
        is_stale_socket_path(&socket_path)
            .await
            .expect("stale socket check should succeed")
    );
}
|
||||
|
||||
#[tokio::test]
async fn stream_round_trips_data_between_listener_and_client() {
    // End-to-end request/response exchange over the crate's listener and
    // stream wrappers.
    let temp_dir = tempfile::TempDir::new().expect("temp dir");
    let socket_path = temp_dir.path().join("socket");
    // Some sandboxes forbid binding unix sockets; skip rather than fail there.
    let mut listener = match UnixListener::bind(&socket_path).await {
        Ok(listener) => listener,
        Err(err) if err.kind() == ErrorKind::PermissionDenied => {
            eprintln!("skipping test: failed to bind unix socket: {err}");
            return;
        }
        Err(err) => panic!("failed to bind test socket: {err}"),
    };

    // Server task: accept one connection, read the 7-byte request, reply.
    let server_task = tokio::spawn(async move {
        let mut server_stream = listener.accept().await.expect("connection should accept");
        let mut request = [0; 7];
        server_stream
            .read_exact(&mut request)
            .await
            .expect("server should read request");
        assert_eq!(&request, b"request");
        server_stream
            .write_all(b"response")
            .await
            .expect("server should write response");
    });

    // Client side: connect, send the request, read the 8-byte response.
    let mut client_stream = UnixStream::connect(&socket_path)
        .await
        .expect("client should connect");
    client_stream
        .write_all(b"request")
        .await
        .expect("client should write request");
    let mut response = [0; 8];
    client_stream
        .read_exact(&mut response)
        .await
        .expect("client should read response");
    assert_eq!(&response, b"response");

    // Join the server so its asserts (and panics) propagate to this test.
    server_task.await.expect("server task should join");
}
|
||||
Reference in New Issue
Block a user