Compare commits

...

40 Commits

Author SHA1 Message Date
Eric Traut
36b50e02b8 codex: address PR review feedback (#15106) 2026-03-21 15:14:45 -06:00
Eric Traut
ca4837cd0d codex: address PR review feedback (#15106) 2026-03-21 15:14:45 -06:00
Eric Traut
bf91bae434 codex: address PR review feedback (#15106) 2026-03-21 15:14:45 -06:00
Eric Traut
d054de45bd codex: fix CI failure on PR #15106 2026-03-21 15:14:45 -06:00
Eric Traut
be5dba6189 codex: restore exec json streaming compatibility (#15106) 2026-03-21 15:14:45 -06:00
Eric Traut
efa0f04e6f Simplify exec event handling 2026-03-21 15:14:45 -06:00
Eric Traut
36ba70c2bc codex: reject in-process external auth refresh (#15106) 2026-03-21 15:14:45 -06:00
Eric Traut
9855a2269f codex: fix PR CI regressions (#15106) 2026-03-21 15:14:07 -06:00
Eric Traut
851c37f066 codex: address PR review feedback (#15106) 2026-03-21 15:14:07 -06:00
Eric Traut
38027b41ff codex: address PR review feedback (#15106) 2026-03-21 15:14:07 -06:00
Eric Traut
c84b994c3d codex: drop legacy app-server events from client API (#15106) 2026-03-21 15:12:37 -06:00
Eric Traut
50b9c114be codex: remove in-process legacy notifications (#15106) 2026-03-21 15:12:14 -06:00
Eric Traut
d15933b8d9 codex: remove legacy app-server notification plumbing (#15106) 2026-03-21 15:12:14 -06:00
Eric Traut
5d438ff4f4 codex: restore typed exec json output coverage (#15106) 2026-03-21 15:11:29 -06:00
Eric Traut
e5172ff1c1 codex: restore exec json output test path (#15106) 2026-03-21 15:11:29 -06:00
Eric Traut
e4d070ff55 codex: move exec jsonl tests to crate tests dir (#15106) 2026-03-21 15:11:29 -06:00
Eric Traut
2e69132680 codex: simplify in-process startup wiring (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
86a33fbd79 codex: fix CI failure on PR #15106 2026-03-21 15:11:28 -06:00
Eric Traut
d4c72ea829 codex: address PR review feedback (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
50b61563c5 codex: address PR review feedback (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
be700f8284 codex: fix CI failure on PR #15106 2026-03-21 15:11:28 -06:00
Eric Traut
6058aba292 codex: fix CI failure on PR #15106 2026-03-21 15:11:28 -06:00
Eric Traut
99248e8300 codex: address PR review feedback (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
6e32ed312d codex: remove dead tui app server imports (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
4daf0300e1 codex: fix clippy regression in human output (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
e616e4b28b codex: address PR review feedback (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
f83b099c10 Fix exec human output fallbacks 2026-03-21 15:11:28 -06:00
Eric Traut
eeeb01a4bd Fix Bazel exec test fixtures 2026-03-21 15:11:28 -06:00
Eric Traut
c2851508b4 Fix exec CI regressions on app-server migration 2026-03-21 15:11:28 -06:00
Eric Traut
a19077ce26 codex: move exec json output tests back out of impl (#15106) 2026-03-21 15:11:28 -06:00
Eric Traut
58e2e9a2bb codex: fix follow-up PR blockers (#15106) 2026-03-21 15:11:27 -06:00
Eric Traut
17482fc8e6 codex: address PR review feedback (#15106) 2026-03-21 15:10:50 -06:00
Eric Traut
8dc496ec47 Restore typed exec JSON output coverage 2026-03-21 15:10:50 -06:00
Eric Traut
cf6cc9043d Move codex exec onto typed app-server APIs 2026-03-21 15:10:50 -06:00
Eric Traut
cf0223887f Remove legacy auth and notification handling from tui_app_server (#15414)
## Summary
- remove `tui_app_server` handling for legacy app-server notifications
- drop the local ChatGPT auth refresh request path from `tui_app_server`
- remove the now-unused refresh response helper from local auth loading

Split out of #15106 so the `tui_app_server` cleanup can land separately
from the larger `codex-exec` app-server migration.
2026-03-21 15:06:10 -06:00
Channing Conger
c23566b3af Add JIT entitlement for macosx (#15409)
Without this entitlement, hardened mac os release binaries are unable to
allocate the executable memory for the JIT compiled JS.

Tested with local signing.  Without entitlement I reproduce the error:
```
#
# Fatal process out of memory: Failed to reserve virtual memory for CodeRange
#
==== C stack trace ===============================

    0   codex                               0x00000001075d1acc codex + 85760716
    1   codex                               0x00000001075d6a64 codex + 85781092
    2   codex                               0x00000001075c7100 codex + 85717248
    3   codex                               0x0000000107637394 codex + 86176660
    4   codex                               0x0000000107823cfc codex + 88194300
    5   codex                               0x000000010777c438 codex + 87508024
    6   codex                               0x000000010777d130 codex + 87511344
    7   codex                               0x0000000107c87a54 codex + 92797524
    8   codex                               0x0000000107641188 codex + 86217096
    9   codex                               0x00000001076412d8 codex + 86217432
    10  codex                               0x0000000107553908 codex + 85244168
    11  codex                               0x000000010465f124 codex + 36008228
    12  codex                               0x000000010466a0d0 codex + 36053200
    13  codex                               0x000000010466ce78 codex + 36064888
    14  codex                               0x000000010734edb0 codex + 83127728
    15  libsystem_pthread.dylib             0x00000001810d3c08 _pthread_start + 136
    16  libsystem_pthread.dylib             0x00000001810ceba8 thread_start + 8
zsh: trace trap  target/release/codex exec --enable code_mode_only --enable code_mode --
```

With the entitlement the exec succeeds.
2026-03-21 13:43:14 -07:00
Eric Traut
b0236501e2 Remove legacy app-server notification handling from tui_app_server (#15390)
As part of moving the TUI onto the app server, we added some temporary
handling of some legacy events. We've confirmed that these do not need
to be supported, so this PR removes this support from the
tui_app_server, allowing for additional simplifications in follow-on
PRs. These events are needed only for very old rollouts. None of the
other app server clients (IDE extension or app) support these either.

## Summary
- stop translating legacy `codex/event/*` notifications inside
`tui_app_server`
- remove the TUI-side legacy warning and rollback buffering/replay paths
that were only fed by those notifications
- keep the lower-level app-server and app-server-client legacy event
plumbing intact so PR #15106 can rebase on top and handle the remaining
exec/lower-layer migration separately
2026-03-21 12:29:33 -06:00
Dylan Hurd
0d9bb8ea58 chore(context) Include guardian approval context (#15366)
## Summary
Include the guardian context in the developer message for approvals

## Testing
- [x] Updated unit tests
2026-03-21 16:31:22 +00:00
Matthew Zeng
06e06ab173 [plugins] Fix plugin explicit mention context management. (#15372)
- [x] Fix plugin explicit mention context management.
2026-03-21 00:29:29 -07:00
Channing Conger
e4eedd6170 Code mode on v8 (#15276)
Moves Code Mode to a new crate with no dependencies on codex. This
create encodes the code mode semantics that we want for lifetime,
mounting, tool calling.

The model-facing surface is mostly unchanged. `exec` still runs raw
JavaScript, `wait` still resumes or terminates a `cell_id`, nested tools
are still available through `tools.*`, and helpers like `text`, `image`,
`store`, `load`, `notify`, `yield_control`, and `exit` still exist.

The major change is underneath that surface:

- Old code mode was an external Node runtime.
- New code mode is an in-process V8 runtime embedded directly in Rust.
- Old code mode managed cells inside a long-lived Node runner process.
- New code mode manages cells in Rust, with one V8 runtime thread per
active `exec`.
- Old code mode used JSON protocol messages over child stdin/stdout plus
Node worker-thread messages.
- New code mode uses Rust channels and direct V8 callbacks/events.

This PR also fixes the two migration regressions that fell out of that
substrate change:

- `wait { terminate: true }` now waits for the V8 runtime to actually
stop before reporting termination.
- synchronous top-level `exit()` now succeeds again instead of surfacing
as a script error.

---

- `core/src/tools/code_mode/*` is now mostly an adapter layer for the
public `exec` / `wait` tools.
- `code-mode/src/service.rs` owns cell sessions and async control flow
in Rust.
- `code-mode/src/runtime/*.rs` owns the embedded V8 isolate and
JavaScript execution.
- each `exec` spawns a dedicated runtime thread plus a Rust
session-control task.
- helper globals are installed directly into the V8 context instead of
being injected through a source prelude.
- helper modules like `tools.js` and `@openai/code_mode` are synthesized
through V8 module resolution callbacks in Rust.

---

Also added a benchmark for showing the speed of init and use of a code
mode env:
```
$ cargo bench -p codex-code-mode --bench exec_overhead -- --samples 30 --warm-iterations 25 --tool-counts 0,32,128
Finished [`bench` profile [optimized]](https://doc.rust-lang.org/cargo/reference/profiles.html#default-profiles) target(s) in 0.18s
     Running benches/exec_overhead.rs (target/release/deps/exec_overhead-008c440d800545ae)
exec_overhead: samples=30, warm_iterations=25, tool_counts=[0, 32, 128]
scenario       tools samples    warmups      iters      mean/exec       p95/exec       rssΔ p50       rssΔ max
cold_exec          0      30          0          1         1.13ms         1.20ms        8.05MiB        8.06MiB
warm_exec          0      30          1         25       473.43us       512.49us      912.00KiB        1.33MiB
cold_exec         32      30          0          1         1.03ms         1.15ms        8.08MiB        8.11MiB
warm_exec         32      30          1         25       509.73us       545.76us      960.00KiB        1.30MiB
cold_exec        128      30          0          1         1.14ms         1.19ms        8.30MiB        8.34MiB
warm_exec        128      30          1         25       575.08us       591.03us      736.00KiB      864.00KiB
memory uses a fresh-process max RSS delta for each scenario
```

---------

Co-authored-by: Codex <noreply@openai.com>
2026-03-20 23:36:58 -07:00
57 changed files with 5561 additions and 7047 deletions

View File

@@ -132,9 +132,11 @@ runs:
keychain_args+=(--keychain "${APPLE_CODESIGN_KEYCHAIN}")
fi
entitlements_path="$GITHUB_ACTION_PATH/codex.entitlements.plist"
for binary in codex codex-responses-api-proxy; do
path="codex-rs/target/${TARGET}/release/${binary}"
codesign --force --options runtime --timestamp --sign "$APPLE_CODESIGN_IDENTITY" "${keychain_args[@]}" "$path"
codesign --force --options runtime --timestamp --entitlements "$entitlements_path" --sign "$APPLE_CODESIGN_IDENTITY" "${keychain_args[@]}" "$path"
done
- name: Notarize macOS binaries

View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>com.apple.security.cs.allow-jit</key>
<true/>
</dict>
</plist>

20
codex-rs/Cargo.lock generated
View File

@@ -1488,7 +1488,6 @@ dependencies = [
"codex-app-server-protocol",
"codex-arg0",
"codex-core",
"codex-features",
"codex-feedback",
"codex-protocol",
"futures",
@@ -1800,6 +1799,20 @@ dependencies = [
"thiserror 2.0.18",
]
[[package]]
name = "codex-code-mode"
version = "0.0.0"
dependencies = [
"async-trait",
"pretty_assertions",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tracing",
"v8",
]
[[package]]
name = "codex-config"
version = "0.0.0"
@@ -1857,6 +1870,7 @@ dependencies = [
"codex-arg0",
"codex-artifacts",
"codex-async-utils",
"codex-code-mode",
"codex-config",
"codex-connectors",
"codex-exec-server",
@@ -1981,9 +1995,7 @@ dependencies = [
"codex-utils-absolute-path",
"codex-utils-cargo-bin",
"codex-utils-cli",
"codex-utils-elapsed",
"codex-utils-oss",
"codex-utils-sandbox-summary",
"core_test_support",
"libc",
"opentelemetry",
@@ -1991,10 +2003,8 @@ dependencies = [
"owo-colors",
"predicates",
"pretty_assertions",
"rmcp",
"serde",
"serde_json",
"shlex",
"supports-color 3.0.2",
"tempfile",
"tokio",

View File

@@ -13,6 +13,7 @@ members = [
"feedback",
"features",
"codex-backend-openapi-models",
"code-mode",
"cloud-requirements",
"cloud-tasks",
"cloud-tasks-client",
@@ -91,6 +92,7 @@ app_test_support = { path = "app-server/tests/common" }
codex-ansi-escape = { path = "ansi-escape" }
codex-api = { path = "codex-api" }
codex-artifacts = { path = "artifacts" }
codex-code-mode = { path = "code-mode" }
codex-package-manager = { path = "package-manager" }
codex-app-server = { path = "app-server" }
codex-app-server-client = { path = "app-server-client" }
@@ -374,7 +376,7 @@ ignored = [
"openssl-sys",
"codex-utils-readiness",
"codex-secrets",
"codex-v8-poc"
"codex-v8-poc",
]
[profile.release]

View File

@@ -16,7 +16,6 @@ codex-app-server = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-arg0 = { workspace = true }
codex-core = { workspace = true }
codex-features = { workspace = true }
codex-feedback = { workspace = true }
codex-protocol = { workspace = true }
futures = { workspace = true }

View File

@@ -35,19 +35,14 @@ use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::InitializeCapabilities;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result as JsonRpcResult;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_arg0::Arg0DispatchPaths;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
use codex_core::models_manager::collaboration_mode_presets::CollaborationModesConfig;
use codex_features::Feature;
use codex_feedback::CodexFeedback;
use codex_protocol::protocol::SessionSource;
use serde::de::DeserializeOwned;
@@ -73,7 +68,6 @@ pub type RequestResult = std::result::Result<JsonRpcResult, JSONRPCErrorError>;
pub enum AppServerEvent {
Lagged { skipped: usize },
ServerNotification(ServerNotification),
LegacyNotification(JSONRPCNotification),
ServerRequest(ServerRequest),
Disconnected { message: String },
}
@@ -85,9 +79,6 @@ impl From<InProcessServerEvent> for AppServerEvent {
InProcessServerEvent::ServerNotification(notification) => {
Self::ServerNotification(notification)
}
InProcessServerEvent::LegacyNotification(notification) => {
Self::LegacyNotification(notification)
}
InProcessServerEvent::ServerRequest(request) => Self::ServerRequest(request),
}
}
@@ -97,19 +88,12 @@ fn event_requires_delivery(event: &InProcessServerEvent) -> bool {
// These terminal events drive surface shutdown/completion state. Dropping
// them under backpressure can leave exec/TUI waiting forever even though
// the underlying turn has already ended.
match event {
matches!(
event,
InProcessServerEvent::ServerNotification(
codex_app_server_protocol::ServerNotification::TurnCompleted(_),
) => true,
InProcessServerEvent::LegacyNotification(notification) => matches!(
notification
.method
.strip_prefix("codex/event/")
.unwrap_or(&notification.method),
"task_complete" | "turn_aborted" | "shutdown_complete"
),
_ => false,
}
)
)
}
/// Layered error for [`InProcessAppServerClient::request_typed`].
@@ -159,16 +143,6 @@ impl Error for TypedRequestError {
}
}
#[derive(Clone)]
struct SharedCoreManagers {
// Temporary bootstrap escape hatch for embedders that still need direct
// core handles during the in-process app-server migration. Once TUI/exec
// stop depending on direct manager access, remove this wrapper and keep
// manager ownership entirely inside the app-server runtime.
auth_manager: Arc<AuthManager>,
thread_manager: Arc<ThreadManager>,
}
#[derive(Clone)]
pub struct InProcessClientStartArgs {
/// Resolved argv0 dispatch paths used by command execution internals.
@@ -202,30 +176,6 @@ pub struct InProcessClientStartArgs {
}
impl InProcessClientStartArgs {
fn shared_core_managers(&self) -> SharedCoreManagers {
let auth_manager = AuthManager::shared(
self.config.codex_home.clone(),
self.enable_codex_api_key_env,
self.config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
self.config.as_ref(),
auth_manager.clone(),
self.session_source.clone(),
CollaborationModesConfig {
default_mode_request_user_input: self
.config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
SharedCoreManagers {
auth_manager,
thread_manager,
}
}
/// Builds initialize params from caller-provided metadata.
pub fn initialize_params(&self) -> InitializeParams {
let capabilities = InitializeCapabilities {
@@ -247,7 +197,7 @@ impl InProcessClientStartArgs {
}
}
fn into_runtime_start_args(self, shared_core: &SharedCoreManagers) -> InProcessStartArgs {
fn into_runtime_start_args(self) -> InProcessStartArgs {
let initialize = self.initialize_params();
InProcessStartArgs {
arg0_paths: self.arg0_paths,
@@ -255,8 +205,6 @@ impl InProcessClientStartArgs {
cli_overrides: self.cli_overrides,
loader_overrides: self.loader_overrides,
cloud_requirements: self.cloud_requirements,
auth_manager: Some(shared_core.auth_manager.clone()),
thread_manager: Some(shared_core.thread_manager.clone()),
feedback: self.feedback,
config_warnings: self.config_warnings,
session_source: self.session_source,
@@ -310,8 +258,6 @@ pub struct InProcessAppServerClient {
command_tx: mpsc::Sender<ClientCommand>,
event_rx: mpsc::Receiver<InProcessServerEvent>,
worker_handle: tokio::task::JoinHandle<()>,
auth_manager: Arc<AuthManager>,
thread_manager: Arc<ThreadManager>,
}
#[derive(Clone)]
@@ -338,9 +284,8 @@ impl InProcessAppServerClient {
/// with overload error instead of being silently dropped.
pub async fn start(args: InProcessClientStartArgs) -> IoResult<Self> {
let channel_capacity = args.channel_capacity.max(1);
let shared_core = args.shared_core_managers();
let mut handle =
codex_app_server::in_process::start(args.into_runtime_start_args(&shared_core)).await?;
codex_app_server::in_process::start(args.into_runtime_start_args()).await?;
let request_sender = handle.sender();
let (command_tx, mut command_rx) = mpsc::channel::<ClientCommand>(channel_capacity);
let (event_tx, event_rx) = mpsc::channel::<InProcessServerEvent>(channel_capacity);
@@ -401,6 +346,25 @@ impl InProcessAppServerClient {
let Some(event) = event else {
break;
};
if let InProcessServerEvent::ServerRequest(
ServerRequest::ChatgptAuthTokensRefresh { request_id, .. }
) = &event
{
let send_result = request_sender.fail_server_request(
request_id.clone(),
JSONRPCErrorError {
code: -32000,
message: "chatgpt auth token refresh is not supported for in-process app-server clients".to_string(),
data: None,
},
);
if let Err(err) = send_result {
warn!(
"failed to reject unsupported chatgpt auth token refresh request: {err}"
);
}
continue;
}
if skipped_events > 0 {
if event_requires_delivery(&event) {
@@ -491,21 +455,9 @@ impl InProcessAppServerClient {
command_tx,
event_rx,
worker_handle,
auth_manager: shared_core.auth_manager,
thread_manager: shared_core.thread_manager,
})
}
/// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage.
pub fn auth_manager(&self) -> Arc<AuthManager> {
self.auth_manager.clone()
}
/// Temporary bootstrap escape hatch for embedders migrating toward RPC-only usage.
pub fn thread_manager(&self) -> Arc<ThreadManager> {
self.thread_manager.clone()
}
pub fn request_handle(&self) -> InProcessAppServerRequestHandle {
InProcessAppServerRequestHandle {
command_tx: self.command_tx.clone(),
@@ -664,8 +616,6 @@ impl InProcessAppServerClient {
command_tx,
event_rx,
worker_handle,
auth_manager: _,
thread_manager: _,
} = self;
let mut worker_handle = worker_handle;
// Drop the caller-facing receiver before asking the worker to shut
@@ -857,8 +807,6 @@ mod tests {
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ToolRequestUserInputParams;
use codex_app_server_protocol::ToolRequestUserInputQuestion;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::config::ConfigBuilder;
use futures::SinkExt;
use futures::StreamExt;
@@ -1052,7 +1000,7 @@ mod tests {
}
#[tokio::test]
async fn shared_thread_manager_tracks_threads_started_via_app_server() {
async fn threads_started_via_app_server_are_visible_through_typed_requests() {
let client = start_test_client(SessionSource::Cli).await;
let response: ThreadStartResponse = client
@@ -1065,17 +1013,19 @@ mod tests {
})
.await
.expect("thread/start should succeed");
let created_thread_id = codex_protocol::ThreadId::from_string(&response.thread.id)
.expect("thread id should parse");
timeout(
Duration::from_secs(2),
client.thread_manager().get_thread(created_thread_id),
)
.await
.expect("timed out waiting for retained thread manager to observe started thread")
.expect("started thread should be visible through the shared thread manager");
let thread_ids = client.thread_manager().list_thread_ids().await;
assert!(thread_ids.contains(&created_thread_id));
let read = client
.request_typed::<codex_app_server_protocol::ThreadReadResponse>(
ClientRequest::ThreadRead {
request_id: RequestId::Integer(4),
params: codex_app_server_protocol::ThreadReadParams {
thread_id: response.thread.id.clone(),
include_turns: false,
},
},
)
.await
.expect("thread/read should return the newly started thread");
assert_eq!(read.thread.id, response.thread.id);
client.shutdown().await.expect("shutdown should complete");
}
@@ -1472,22 +1422,6 @@ mod tests {
let (command_tx, _command_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::channel(1);
let worker_handle = tokio::spawn(async {});
let config = build_test_config().await;
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
false,
config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
&config,
auth_manager.clone(),
SessionSource::Exec,
CollaborationModesConfig {
default_mode_request_user_input: config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
event_tx
.send(InProcessServerEvent::Lagged { skipped: 3 })
.await
@@ -1498,8 +1432,6 @@ mod tests {
command_tx,
event_rx,
worker_handle,
auth_manager,
thread_manager,
};
let event = timeout(Duration::from_secs(2), client.next_event())
@@ -1530,37 +1462,38 @@ mod tests {
)
)
));
assert!(event_requires_delivery(
&InProcessServerEvent::LegacyNotification(
codex_app_server_protocol::JSONRPCNotification {
method: "codex/event/turn_aborted".to_string(),
params: None,
}
)
));
assert!(!event_requires_delivery(&InProcessServerEvent::Lagged {
skipped: 1
}));
}
#[tokio::test]
async fn accessors_expose_retained_shared_managers() {
let client = start_test_client(SessionSource::Cli).await;
async fn runtime_start_args_leave_manager_bootstrap_to_app_server() {
let config = Arc::new(build_test_config().await);
assert!(
Arc::ptr_eq(&client.auth_manager(), &client.auth_manager()),
"auth_manager accessor should clone the retained shared manager"
);
assert!(
Arc::ptr_eq(&client.thread_manager(), &client.thread_manager()),
"thread_manager accessor should clone the retained shared manager"
);
let runtime_args = InProcessClientStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: config.clone(),
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
feedback: CodexFeedback::new(),
config_warnings: Vec::new(),
session_source: SessionSource::Exec,
enable_codex_api_key_env: false,
client_name: "codex-app-server-client-test".to_string(),
client_version: "0.0.0-test".to_string(),
experimental_api: true,
opt_out_notification_methods: Vec::new(),
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
}
.into_runtime_start_args();
client.shutdown().await.expect("shutdown should complete");
assert_eq!(runtime_args.config, config);
}
#[tokio::test]
async fn shutdown_completes_promptly_with_retained_shared_managers() {
async fn shutdown_completes_promptly_without_retained_managers() {
let client = start_test_client(SessionSource::Cli).await;
timeout(Duration::from_secs(1), client.shutdown())

View File

@@ -272,18 +272,19 @@ impl RemoteAppServerClient {
}
}
Ok(JSONRPCMessage::Notification(notification)) => {
let event = app_server_event_from_notification(notification);
if let Err(err) = deliver_event(
&event_tx,
&mut skipped_events,
event,
&mut stream,
)
.await
{
warn!(%err, "failed to deliver remote app-server event");
break;
}
if let Some(event) =
app_server_event_from_notification(notification)
&& let Err(err) = deliver_event(
&event_tx,
&mut skipped_events,
event,
&mut stream,
)
.await
{
warn!(%err, "failed to deliver remote app-server event");
break;
}
}
Ok(JSONRPCMessage::Request(request)) => {
let request_id = request.id.clone();
@@ -673,7 +674,9 @@ async fn initialize_remote_connection(
)));
}
JSONRPCMessage::Notification(notification) => {
pending_events.push(app_server_event_from_notification(notification));
if let Some(event) = app_server_event_from_notification(notification) {
pending_events.push(event);
}
}
JSONRPCMessage::Request(request) => {
let request_id = request.id.clone();
@@ -756,10 +759,10 @@ async fn initialize_remote_connection(
Ok(pending_events)
}
fn app_server_event_from_notification(notification: JSONRPCNotification) -> AppServerEvent {
match ServerNotification::try_from(notification.clone()) {
Ok(notification) => AppServerEvent::ServerNotification(notification),
Err(_) => AppServerEvent::LegacyNotification(notification),
fn app_server_event_from_notification(notification: JSONRPCNotification) -> Option<AppServerEvent> {
match ServerNotification::try_from(notification) {
Ok(notification) => Some(AppServerEvent::ServerNotification(notification)),
Err(_) => None,
}
}
@@ -852,13 +855,6 @@ async fn reject_if_server_request_dropped(
fn event_requires_delivery(event: &AppServerEvent) -> bool {
match event {
AppServerEvent::ServerNotification(ServerNotification::TurnCompleted(_)) => true,
AppServerEvent::LegacyNotification(notification) => matches!(
notification
.method
.strip_prefix("codex/event/")
.unwrap_or(&notification.method),
"task_complete" | "turn_aborted" | "shutdown_complete"
),
AppServerEvent::Disconnected { .. } => true,
AppServerEvent::Lagged { .. }
| AppServerEvent::ServerNotification(_)

View File

@@ -68,14 +68,11 @@ use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::Result;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_arg0::Arg0DispatchPaths;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::LoaderOverrides;
@@ -98,16 +95,6 @@ fn server_notification_requires_delivery(notification: &ServerNotification) -> b
matches!(notification, ServerNotification::TurnCompleted(_))
}
fn legacy_notification_requires_delivery(notification: &JSONRPCNotification) -> bool {
matches!(
notification
.method
.strip_prefix("codex/event/")
.unwrap_or(&notification.method),
"task_complete" | "turn_aborted" | "shutdown_complete"
)
}
/// Input needed to start an in-process app-server runtime.
///
/// These fields mirror the pieces of ambient process state that stdio and
@@ -124,10 +111,6 @@ pub struct InProcessStartArgs {
pub loader_overrides: LoaderOverrides,
/// Preloaded cloud requirements provider.
pub cloud_requirements: CloudRequirementsLoader,
/// Optional prebuilt auth manager reused by an embedding caller.
pub auth_manager: Option<Arc<AuthManager>>,
/// Optional prebuilt thread manager reused by an embedding caller.
pub thread_manager: Option<Arc<ThreadManager>>,
/// Feedback sink used by app-server/core telemetry and logs.
pub feedback: CodexFeedback,
/// Startup warnings emitted after initialize succeeds.
@@ -144,11 +127,6 @@ pub struct InProcessStartArgs {
/// Event emitted from the app-server to the in-process client.
///
/// The stream carries three event families because CLI surfaces are mid-migration
/// from the legacy `codex_protocol::Event` model to the typed app-server
/// notification model. Once all surfaces consume only [`ServerNotification`],
/// [`LegacyNotification`](Self::LegacyNotification) can be removed.
///
/// [`Lagged`](Self::Lagged) is a transport health marker, not an application
/// event — it signals that the consumer fell behind and some events were dropped.
#[derive(Debug, Clone)]
@@ -157,8 +135,6 @@ pub enum InProcessServerEvent {
ServerRequest(ServerRequest),
/// App-server notification directed to the embedded client.
ServerNotification(ServerNotification),
/// Legacy JSON-RPC notification from core event bridge.
LegacyNotification(JSONRPCNotification),
/// Indicates one or more events were dropped due to backpressure.
Lagged { skipped: usize },
}
@@ -390,7 +366,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
Arc::clone(&outbound_initialized),
Arc::clone(&outbound_experimental_api_enabled),
Arc::clone(&outbound_opted_out_notification_methods),
/*allow_legacy_notifications*/ true,
/*disconnect_sender*/ None,
),
);
@@ -410,8 +385,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
cli_overrides: args.cli_overrides,
loader_overrides: args.loader_overrides,
cloud_requirements: args.cloud_requirements,
auth_manager: args.auth_manager,
thread_manager: args.thread_manager,
feedback: args.feedback,
log_db: None,
config_warnings: args.config_warnings,
@@ -655,32 +628,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle {
}
}
}
OutgoingMessage::Notification(notification) => {
let notification = JSONRPCNotification {
method: notification.method,
params: notification.params,
};
if legacy_notification_requires_delivery(&notification) {
if event_tx
.send(InProcessServerEvent::LegacyNotification(notification))
.await
.is_err()
{
break;
}
} else if let Err(send_error) =
event_tx.try_send(InProcessServerEvent::LegacyNotification(notification))
{
match send_error {
mpsc::error::TrySendError::Full(_) => {
warn!("dropping in-process legacy notification (queue full)");
}
mpsc::error::TrySendError::Closed(_) => {
break;
}
}
}
}
OutgoingMessage::Notification(_notification) => {}
}
}
}
@@ -759,8 +707,6 @@ mod tests {
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
auth_manager: None,
thread_manager: None,
feedback: CodexFeedback::new(),
config_warnings: Vec::new(),
session_source,
@@ -858,7 +804,7 @@ mod tests {
}
#[test]
fn guaranteed_delivery_helpers_cover_terminal_notifications() {
fn guaranteed_delivery_helpers_cover_terminal_server_notifications() {
assert!(server_notification_requires_delivery(
&ServerNotification::TurnCompleted(TurnCompletedNotification {
thread_id: "thread-1".to_string(),
@@ -870,30 +816,5 @@ mod tests {
},
})
));
assert!(legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/task_complete".to_string(),
params: None,
}
));
assert!(legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/turn_aborted".to_string(),
params: None,
}
));
assert!(legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/shutdown_complete".to_string(),
params: None,
}
));
assert!(!legacy_notification_requires_delivery(
&JSONRPCNotification {
method: "codex/event/item_started".to_string(),
params: None,
}
));
}
}

View File

@@ -104,8 +104,6 @@ enum OutboundControlEvent {
Opened {
connection_id: ConnectionId,
writer: mpsc::Sender<crate::outgoing_message::OutgoingMessage>,
// Allow codex/event/* notifications to be emitted.
allow_legacy_notifications: bool,
disconnect_sender: Option<CancellationToken>,
initialized: Arc<AtomicBool>,
experimental_api_enabled: Arc<AtomicBool>,
@@ -562,7 +560,6 @@ pub async fn run_main_with_transport(
OutboundControlEvent::Opened {
connection_id,
writer,
allow_legacy_notifications,
disconnect_sender,
initialized,
experimental_api_enabled,
@@ -575,7 +572,6 @@ pub async fn run_main_with_transport(
initialized,
experimental_api_enabled,
opted_out_notification_methods,
allow_legacy_notifications,
disconnect_sender,
),
);
@@ -618,8 +614,6 @@ pub async fn run_main_with_transport(
cli_overrides,
loader_overrides,
cloud_requirements: cloud_requirements.clone(),
auth_manager: None,
thread_manager: None,
feedback: feedback.clone(),
log_db,
config_warnings,
@@ -675,7 +669,6 @@ pub async fn run_main_with_transport(
TransportEvent::ConnectionOpened {
connection_id,
writer,
allow_legacy_notifications,
disconnect_sender,
} => {
let outbound_initialized = Arc::new(AtomicBool::new(false));
@@ -687,7 +680,6 @@ pub async fn run_main_with_transport(
.send(OutboundControlEvent::Opened {
connection_id,
writer,
allow_legacy_notifications,
disconnect_sender,
initialized: Arc::clone(&outbound_initialized),
experimental_api_enabled: Arc::clone(

View File

@@ -172,8 +172,6 @@ pub(crate) struct MessageProcessorArgs {
pub(crate) cli_overrides: Vec<(String, TomlValue)>,
pub(crate) loader_overrides: LoaderOverrides,
pub(crate) cloud_requirements: CloudRequirementsLoader,
pub(crate) auth_manager: Option<Arc<AuthManager>>,
pub(crate) thread_manager: Option<Arc<ThreadManager>>,
pub(crate) feedback: CodexFeedback,
pub(crate) log_db: Option<LogDbLayer>,
pub(crate) config_warnings: Vec<ConfigWarningNotification>,
@@ -192,36 +190,27 @@ impl MessageProcessor {
cli_overrides,
loader_overrides,
cloud_requirements,
auth_manager,
thread_manager,
feedback,
log_db,
config_warnings,
session_source,
enable_codex_api_key_env,
} = args;
let (auth_manager, thread_manager) = match (auth_manager, thread_manager) {
(Some(auth_manager), Some(thread_manager)) => (auth_manager, thread_manager),
(None, None) => {
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
enable_codex_api_key_env,
config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
session_source,
CollaborationModesConfig {
default_mode_request_user_input: config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
(auth_manager, thread_manager)
}
_ => panic!("MessageProcessorArgs must provide both auth_manager and thread_manager"),
};
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
enable_codex_api_key_env,
config.cli_auth_credentials_store_mode,
);
let thread_manager = Arc::new(ThreadManager::new(
config.as_ref(),
auth_manager.clone(),
session_source,
CollaborationModesConfig {
default_mode_request_user_input: config
.features
.enabled(Feature::DefaultModeRequestUserInput),
},
));
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
auth_manager.set_external_auth_refresher(Arc::new(ExternalAuthRefreshBridge {
outgoing: outgoing.clone(),

View File

@@ -239,8 +239,6 @@ fn build_test_processor(
cli_overrides: Vec::new(),
loader_overrides: LoaderOverrides::default(),
cloud_requirements: CloudRequirementsLoader::default(),
auth_manager: None,
thread_manager: None,
feedback: CodexFeedback::new(),
log_db: None,
config_warnings: Vec::new(),

View File

@@ -188,7 +188,6 @@ pub(crate) enum TransportEvent {
ConnectionOpened {
connection_id: ConnectionId,
writer: mpsc::Sender<OutgoingMessage>,
allow_legacy_notifications: bool,
disconnect_sender: Option<CancellationToken>,
},
ConnectionClosed {
@@ -226,7 +225,6 @@ pub(crate) struct OutboundConnectionState {
pub(crate) initialized: Arc<AtomicBool>,
pub(crate) experimental_api_enabled: Arc<AtomicBool>,
pub(crate) opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
pub(crate) allow_legacy_notifications: bool,
pub(crate) writer: mpsc::Sender<OutgoingMessage>,
disconnect_sender: Option<CancellationToken>,
}
@@ -237,14 +235,12 @@ impl OutboundConnectionState {
initialized: Arc<AtomicBool>,
experimental_api_enabled: Arc<AtomicBool>,
opted_out_notification_methods: Arc<RwLock<HashSet<String>>>,
allow_legacy_notifications: bool,
disconnect_sender: Option<CancellationToken>,
) -> Self {
Self {
initialized,
experimental_api_enabled,
opted_out_notification_methods,
allow_legacy_notifications,
writer,
disconnect_sender,
}
@@ -272,7 +268,6 @@ pub(crate) async fn start_stdio_connection(
.send(TransportEvent::ConnectionOpened {
connection_id,
writer: writer_tx,
allow_legacy_notifications: false,
disconnect_sender: None,
})
.await
@@ -376,7 +371,6 @@ async fn run_websocket_connection(
.send(TransportEvent::ConnectionOpened {
connection_id,
writer: writer_tx,
allow_legacy_notifications: false,
disconnect_sender: Some(disconnect_token.clone()),
})
.await
@@ -584,13 +578,10 @@ fn should_skip_notification_for_connection(
connection_state: &OutboundConnectionState,
message: &OutgoingMessage,
) -> bool {
if !connection_state.allow_legacy_notifications
&& matches!(message, OutgoingMessage::Notification(_))
{
if matches!(message, OutgoingMessage::Notification(_)) {
// Raw legacy `codex/event/*` notifications are still emitted upstream
// for in-process compatibility, but they are no longer part of the
// external app-server contract. Keep dropping them here until the
// producer path can be deleted entirely.
// for compatibility, but they are no longer part of the app-server
// client contract. Drop them until the producer path can be deleted.
return true;
}
@@ -970,7 +961,6 @@ mod tests {
initialized,
Arc::new(AtomicBool::new(true)),
opted_out_notification_methods,
false,
None,
),
);
@@ -1008,7 +998,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
false,
None,
),
);
@@ -1034,7 +1023,7 @@ mod tests {
}
#[tokio::test]
async fn to_connection_legacy_notifications_are_preserved_for_in_process_clients() {
async fn to_connection_legacy_notifications_are_dropped_for_in_process_clients() {
let connection_id = ConnectionId(11);
let (writer_tx, mut writer_rx) = mpsc::channel(1);
@@ -1046,7 +1035,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
true,
None,
),
);
@@ -1065,17 +1053,10 @@ mod tests {
)
.await;
let message = writer_rx
.recv()
.await
.expect("legacy notification should reach in-process clients");
assert!(matches!(
message,
OutgoingMessage::Notification(crate::outgoing_message::OutgoingNotification {
method,
params: None,
}) if method == "codex/event/task_started"
));
assert!(
writer_rx.try_recv().is_err(),
"legacy notifications should not reach in-process clients"
);
}
#[tokio::test]
@@ -1091,7 +1072,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(false)),
Arc::new(RwLock::new(HashSet::new())),
false,
None,
),
);
@@ -1158,7 +1138,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
false,
None,
),
);
@@ -1246,7 +1225,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
false,
Some(fast_disconnect_token.clone()),
),
);
@@ -1257,7 +1235,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
false,
Some(slow_disconnect_token.clone()),
),
);
@@ -1329,7 +1306,6 @@ mod tests {
Arc::new(AtomicBool::new(true)),
Arc::new(AtomicBool::new(true)),
Arc::new(RwLock::new(HashSet::new())),
false,
None,
),
);

View File

@@ -0,0 +1,6 @@
load("//:defs.bzl", "codex_rust_crate")
codex_rust_crate(
name = "code-mode",
crate_name = "codex_code_mode",
)

View File

@@ -0,0 +1,25 @@
[package]
edition.workspace = true
license.workspace = true
name = "codex-code-mode"
version.workspace = true
[lib]
doctest = false
name = "codex_code_mode"
path = "src/lib.rs"
[lints]
workspace = true
[dependencies]
async-trait = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "sync", "time"] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
v8 = { workspace = true }
[dev-dependencies]
pretty_assertions = { workspace = true }

View File

@@ -0,0 +1,555 @@
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use crate::PUBLIC_TOOL_NAME;
const MAX_JS_SAFE_INTEGER: u64 = (1_u64 << 53) - 1;
const CODE_MODE_ONLY_PREFACE: &str =
"Use `exec/wait` tool to run all other tools, do not attempt to use any other tools directly";
const EXEC_DESCRIPTION_TEMPLATE: &str = r#"## exec
- Runs raw JavaScript in an isolated context (no Node, no file system, or network access, no console).
- Send raw JavaScript source text, not JSON, quoted strings, or markdown code fences.
- You may optionally start the tool input with a first-line pragma like `// @exec: {"yield_time_ms": 10000, "max_output_tokens": 1000}`.
- `yield_time_ms` asks `exec` to yield early after that many milliseconds if the script is still running.
- `max_output_tokens` sets the token budget for direct `exec` results. By default the result is truncated to 10000 tokens.
- All nested tools are available on the global `tools` object, for example `await tools.exec_command(...)`. Tool names are exposed as normalized JavaScript identifiers, for example `await tools.mcp__ologs__get_profile(...)`.
- Tool methods take either string or object as parameter.
- They return either a structured value or a string based on the description above.
- Global helpers:
- `exit()`: Immediately ends the current script successfully (like an early return from the top level).
- `text(value: string | number | boolean | undefined | null)`: Appends a text item. Non-string values are stringified with `JSON.stringify(...)` when possible.
- `image(imageUrlOrItem: string | { image_url: string; detail?: "auto" | "low" | "high" | "original" | null })`: Appends an image item. `image_url` can be an HTTPS URL or a base64-encoded `data:` URL.
- `store(key: string, value: any)`: stores a serializable value under a string key for later `exec` calls in the same session.
- `load(key: string)`: returns the stored value for a string key, or `undefined` if it is missing.
- `notify(value: string | number | boolean | undefined | null)`: immediately injects an extra `custom_tool_call_output` for the current `exec` call. Values are stringified like `text(...)`.
- `ALL_TOOLS`: metadata for the enabled nested tools as `{ name, description }` entries.
- `yield_control()`: yields the accumulated output to the model immediately while the script keeps running."#;
const WAIT_DESCRIPTION_TEMPLATE: &str = r#"- Use `wait` only after `exec` returns `Script running with cell ID ...`.
- `cell_id` identifies the running `exec` cell to resume.
- `yield_time_ms` controls how long to wait for more output before yielding again. If omitted, `wait` uses its default wait timeout.
- `max_tokens` limits how much new output this wait call returns.
- `terminate: true` stops the running cell instead of waiting for more output.
- `wait` returns only the new output since the last yield, or the final completion or termination result for that cell.
- If the cell is still running, `wait` may yield again with the same `cell_id`.
- If the cell has already finished, `wait` returns the completed result and closes the cell."#;
pub const CODE_MODE_PRAGMA_PREFIX: &str = "// @exec:";
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CodeModeToolKind {
Function,
Freeform,
}
#[derive(Clone, Debug, PartialEq)]
pub struct ToolDefinition {
pub name: String,
pub description: String,
pub kind: CodeModeToolKind,
pub input_schema: Option<JsonValue>,
pub output_schema: Option<JsonValue>,
}
#[derive(Debug, Default, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
struct CodeModeExecPragma {
#[serde(default)]
yield_time_ms: Option<u64>,
#[serde(default)]
max_output_tokens: Option<usize>,
}
#[derive(Debug, PartialEq, Eq)]
pub struct ParsedExecSource {
pub code: String,
pub yield_time_ms: Option<u64>,
pub max_output_tokens: Option<usize>,
}
pub fn parse_exec_source(input: &str) -> Result<ParsedExecSource, String> {
if input.trim().is_empty() {
return Err(
"exec expects raw JavaScript source text (non-empty). Provide JS only, optionally with first-line `// @exec: {\"yield_time_ms\": 10000, \"max_output_tokens\": 1000}`.".to_string(),
);
}
let mut args = ParsedExecSource {
code: input.to_string(),
yield_time_ms: None,
max_output_tokens: None,
};
let mut lines = input.splitn(2, '\n');
let first_line = lines.next().unwrap_or_default();
let rest = lines.next().unwrap_or_default();
let trimmed = first_line.trim_start();
let Some(pragma) = trimmed.strip_prefix(CODE_MODE_PRAGMA_PREFIX) else {
return Ok(args);
};
if rest.trim().is_empty() {
return Err(
"exec pragma must be followed by JavaScript source on subsequent lines".to_string(),
);
}
let directive = pragma.trim();
if directive.is_empty() {
return Err(
"exec pragma must be a JSON object with supported fields `yield_time_ms` and `max_output_tokens`"
.to_string(),
);
}
let value: serde_json::Value = serde_json::from_str(directive).map_err(|err| {
format!(
"exec pragma must be valid JSON with supported fields `yield_time_ms` and `max_output_tokens`: {err}"
)
})?;
let object = value.as_object().ok_or_else(|| {
"exec pragma must be a JSON object with supported fields `yield_time_ms` and `max_output_tokens`"
.to_string()
})?;
for key in object.keys() {
match key.as_str() {
"yield_time_ms" | "max_output_tokens" => {}
_ => {
return Err(format!(
"exec pragma only supports `yield_time_ms` and `max_output_tokens`; got `{key}`"
));
}
}
}
let pragma: CodeModeExecPragma = serde_json::from_value(value).map_err(|err| {
format!(
"exec pragma fields `yield_time_ms` and `max_output_tokens` must be non-negative safe integers: {err}"
)
})?;
if pragma
.yield_time_ms
.is_some_and(|yield_time_ms| yield_time_ms > MAX_JS_SAFE_INTEGER)
{
return Err(
"exec pragma field `yield_time_ms` must be a non-negative safe integer".to_string(),
);
}
if pragma.max_output_tokens.is_some_and(|max_output_tokens| {
u64::try_from(max_output_tokens)
.map(|max_output_tokens| max_output_tokens > MAX_JS_SAFE_INTEGER)
.unwrap_or(true)
}) {
return Err(
"exec pragma field `max_output_tokens` must be a non-negative safe integer".to_string(),
);
}
args.code = rest.to_string();
args.yield_time_ms = pragma.yield_time_ms;
args.max_output_tokens = pragma.max_output_tokens;
Ok(args)
}
pub fn is_code_mode_nested_tool(tool_name: &str) -> bool {
tool_name != crate::PUBLIC_TOOL_NAME && tool_name != crate::WAIT_TOOL_NAME
}
pub fn build_exec_tool_description(
enabled_tools: &[(String, String)],
code_mode_only: bool,
) -> String {
if !code_mode_only {
return EXEC_DESCRIPTION_TEMPLATE.to_string();
}
let mut sections = vec![
CODE_MODE_ONLY_PREFACE.to_string(),
EXEC_DESCRIPTION_TEMPLATE.to_string(),
];
if !enabled_tools.is_empty() {
let nested_tool_reference = enabled_tools
.iter()
.map(|(name, nested_description)| {
let global_name = normalize_code_mode_identifier(name);
format!(
"### `{global_name}` (`{name}`)\n{}",
nested_description.trim()
)
})
.collect::<Vec<_>>()
.join("\n\n");
sections.push(nested_tool_reference);
}
sections.join("\n\n")
}
pub fn build_wait_tool_description() -> &'static str {
WAIT_DESCRIPTION_TEMPLATE
}
pub fn normalize_code_mode_identifier(tool_key: &str) -> String {
let mut identifier = String::new();
for (index, ch) in tool_key.chars().enumerate() {
let is_valid = if index == 0 {
ch == '_' || ch == '$' || ch.is_ascii_alphabetic()
} else {
ch == '_' || ch == '$' || ch.is_ascii_alphanumeric()
};
if is_valid {
identifier.push(ch);
} else {
identifier.push('_');
}
}
if identifier.is_empty() {
"_".to_string()
} else {
identifier
}
}
pub fn augment_tool_definition(mut definition: ToolDefinition) -> ToolDefinition {
if definition.name != PUBLIC_TOOL_NAME {
definition.description = append_code_mode_sample_for_definition(&definition);
}
definition
}
pub fn enabled_tool_metadata(definition: &ToolDefinition) -> EnabledToolMetadata {
EnabledToolMetadata {
tool_name: definition.name.clone(),
global_name: normalize_code_mode_identifier(&definition.name),
description: definition.description.clone(),
kind: definition.kind,
}
}
#[derive(Clone, Debug, Eq, PartialEq, Serialize)]
pub struct EnabledToolMetadata {
pub tool_name: String,
pub global_name: String,
pub description: String,
pub kind: CodeModeToolKind,
}
pub fn append_code_mode_sample(
description: &str,
tool_name: &str,
input_name: &str,
input_type: String,
output_type: String,
) -> String {
let declaration = format!(
"declare const tools: {{ {} }};",
render_code_mode_tool_declaration(tool_name, input_name, input_type, output_type)
);
format!("{description}\n\nexec tool declaration:\n```ts\n{declaration}\n```")
}
fn append_code_mode_sample_for_definition(definition: &ToolDefinition) -> String {
let input_name = match definition.kind {
CodeModeToolKind::Function => "args",
CodeModeToolKind::Freeform => "input",
};
let input_type = match definition.kind {
CodeModeToolKind::Function => definition
.input_schema
.as_ref()
.map(render_json_schema_to_typescript)
.unwrap_or_else(|| "unknown".to_string()),
CodeModeToolKind::Freeform => "string".to_string(),
};
let output_type = definition
.output_schema
.as_ref()
.map(render_json_schema_to_typescript)
.unwrap_or_else(|| "unknown".to_string());
append_code_mode_sample(
&definition.description,
&definition.name,
input_name,
input_type,
output_type,
)
}
fn render_code_mode_tool_declaration(
tool_name: &str,
input_name: &str,
input_type: String,
output_type: String,
) -> String {
let tool_name = normalize_code_mode_identifier(tool_name);
format!("{tool_name}({input_name}: {input_type}): Promise<{output_type}>;")
}
pub fn render_json_schema_to_typescript(schema: &JsonValue) -> String {
render_json_schema_to_typescript_inner(schema)
}
fn render_json_schema_to_typescript_inner(schema: &JsonValue) -> String {
match schema {
JsonValue::Bool(true) => "unknown".to_string(),
JsonValue::Bool(false) => "never".to_string(),
JsonValue::Object(map) => {
if let Some(value) = map.get("const") {
return render_json_schema_literal(value);
}
if let Some(values) = map.get("enum").and_then(JsonValue::as_array) {
let rendered = values
.iter()
.map(render_json_schema_literal)
.collect::<Vec<_>>();
if !rendered.is_empty() {
return rendered.join(" | ");
}
}
for key in ["anyOf", "oneOf"] {
if let Some(variants) = map.get(key).and_then(JsonValue::as_array) {
let rendered = variants
.iter()
.map(render_json_schema_to_typescript_inner)
.collect::<Vec<_>>();
if !rendered.is_empty() {
return rendered.join(" | ");
}
}
}
if let Some(variants) = map.get("allOf").and_then(JsonValue::as_array) {
let rendered = variants
.iter()
.map(render_json_schema_to_typescript_inner)
.collect::<Vec<_>>();
if !rendered.is_empty() {
return rendered.join(" & ");
}
}
if let Some(schema_type) = map.get("type") {
if let Some(types) = schema_type.as_array() {
let rendered = types
.iter()
.filter_map(JsonValue::as_str)
.map(|schema_type| render_json_schema_type_keyword(map, schema_type))
.collect::<Vec<_>>();
if !rendered.is_empty() {
return rendered.join(" | ");
}
}
if let Some(schema_type) = schema_type.as_str() {
return render_json_schema_type_keyword(map, schema_type);
}
}
if map.contains_key("properties")
|| map.contains_key("additionalProperties")
|| map.contains_key("required")
{
return render_json_schema_object(map);
}
if map.contains_key("items") || map.contains_key("prefixItems") {
return render_json_schema_array(map);
}
"unknown".to_string()
}
_ => "unknown".to_string(),
}
}
fn render_json_schema_type_keyword(
map: &serde_json::Map<String, JsonValue>,
schema_type: &str,
) -> String {
match schema_type {
"string" => "string".to_string(),
"number" | "integer" => "number".to_string(),
"boolean" => "boolean".to_string(),
"null" => "null".to_string(),
"array" => render_json_schema_array(map),
"object" => render_json_schema_object(map),
_ => "unknown".to_string(),
}
}
fn render_json_schema_array(map: &serde_json::Map<String, JsonValue>) -> String {
if let Some(items) = map.get("items") {
let item_type = render_json_schema_to_typescript_inner(items);
return format!("Array<{item_type}>");
}
if let Some(items) = map.get("prefixItems").and_then(JsonValue::as_array) {
let item_types = items
.iter()
.map(render_json_schema_to_typescript_inner)
.collect::<Vec<_>>();
if !item_types.is_empty() {
return format!("[{}]", item_types.join(", "));
}
}
"unknown[]".to_string()
}
fn render_json_schema_object(map: &serde_json::Map<String, JsonValue>) -> String {
let required = map
.get("required")
.and_then(JsonValue::as_array)
.map(|items| {
items
.iter()
.filter_map(JsonValue::as_str)
.collect::<Vec<_>>()
})
.unwrap_or_default();
let properties = map
.get("properties")
.and_then(JsonValue::as_object)
.cloned()
.unwrap_or_default();
let mut sorted_properties = properties.iter().collect::<Vec<_>>();
sorted_properties.sort_unstable_by(|(name_a, _), (name_b, _)| name_a.cmp(name_b));
let mut lines = sorted_properties
.into_iter()
.map(|(name, value)| {
let optional = if required.iter().any(|required_name| required_name == name) {
""
} else {
"?"
};
let property_name = render_json_schema_property_name(name);
let property_type = render_json_schema_to_typescript_inner(value);
format!("{property_name}{optional}: {property_type};")
})
.collect::<Vec<_>>();
if let Some(additional_properties) = map.get("additionalProperties") {
let property_type = match additional_properties {
JsonValue::Bool(true) => Some("unknown".to_string()),
JsonValue::Bool(false) => None,
value => Some(render_json_schema_to_typescript_inner(value)),
};
if let Some(property_type) = property_type {
lines.push(format!("[key: string]: {property_type};"));
}
} else if properties.is_empty() {
lines.push("[key: string]: unknown;".to_string());
}
if lines.is_empty() {
return "{}".to_string();
}
format!("{{ {} }}", lines.join(" "))
}
fn render_json_schema_property_name(name: &str) -> String {
if normalize_code_mode_identifier(name) == name {
name.to_string()
} else {
serde_json::to_string(name).unwrap_or_else(|_| format!("\"{}\"", name.replace('"', "\\\"")))
}
}
fn render_json_schema_literal(value: &JsonValue) -> String {
serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string())
}
#[cfg(test)]
mod tests {
use super::CodeModeToolKind;
use super::ParsedExecSource;
use super::ToolDefinition;
use super::augment_tool_definition;
use super::build_exec_tool_description;
use super::normalize_code_mode_identifier;
use super::parse_exec_source;
use pretty_assertions::assert_eq;
use serde_json::json;
#[test]
fn parse_exec_source_without_pragma() {
assert_eq!(
parse_exec_source("text('hi')").unwrap(),
ParsedExecSource {
code: "text('hi')".to_string(),
yield_time_ms: None,
max_output_tokens: None,
}
);
}
#[test]
fn parse_exec_source_with_pragma() {
assert_eq!(
parse_exec_source("// @exec: {\"yield_time_ms\": 10}\ntext('hi')").unwrap(),
ParsedExecSource {
code: "text('hi')".to_string(),
yield_time_ms: Some(10),
max_output_tokens: None,
}
);
}
#[test]
fn normalize_identifier_rewrites_invalid_characters() {
assert_eq!(
"mcp__ologs__get_profile",
normalize_code_mode_identifier("mcp__ologs__get_profile")
);
assert_eq!(
"hidden_dynamic_tool",
normalize_code_mode_identifier("hidden-dynamic-tool")
);
}
#[test]
fn augment_tool_definition_appends_typed_declaration() {
let definition = ToolDefinition {
name: "hidden_dynamic_tool".to_string(),
description: "Test tool".to_string(),
kind: CodeModeToolKind::Function,
input_schema: Some(json!({
"type": "object",
"properties": { "city": { "type": "string" } },
"required": ["city"],
"additionalProperties": false
})),
output_schema: Some(json!({
"type": "object",
"properties": { "ok": { "type": "boolean" } },
"required": ["ok"]
})),
};
let description = augment_tool_definition(definition).description;
assert!(description.contains("declare const tools"));
assert!(
description.contains(
"hidden_dynamic_tool(args: { city: string; }): Promise<{ ok: boolean; }>;"
)
);
}
#[test]
fn code_mode_only_description_includes_nested_tools() {
let description =
build_exec_tool_description(&[("foo".to_string(), "bar".to_string())], true);
assert!(description.contains("### `foo` (`foo`)"));
}
}

View File

@@ -0,0 +1,30 @@
mod description;
mod response;
mod runtime;
mod service;
pub use description::CODE_MODE_PRAGMA_PREFIX;
pub use description::CodeModeToolKind;
pub use description::ToolDefinition;
pub use description::append_code_mode_sample;
pub use description::augment_tool_definition;
pub use description::build_exec_tool_description;
pub use description::build_wait_tool_description;
pub use description::is_code_mode_nested_tool;
pub use description::normalize_code_mode_identifier;
pub use description::parse_exec_source;
pub use description::render_json_schema_to_typescript;
pub use response::FunctionCallOutputContentItem;
pub use response::ImageDetail;
pub use runtime::DEFAULT_EXEC_YIELD_TIME_MS;
pub use runtime::DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL;
pub use runtime::DEFAULT_WAIT_YIELD_TIME_MS;
pub use runtime::ExecuteRequest;
pub use runtime::RuntimeResponse;
pub use runtime::WaitRequest;
pub use service::CodeModeService;
pub use service::CodeModeTurnHost;
pub use service::CodeModeTurnWorker;
pub const PUBLIC_TOOL_NAME: &str = "exec";
pub const WAIT_TOOL_NAME: &str = "wait";

View File

@@ -0,0 +1,24 @@
use serde::Deserialize;
use serde::Serialize;
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum ImageDetail {
Auto,
Low,
High,
Original,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FunctionCallOutputContentItem {
InputText {
text: String,
},
InputImage {
image_url: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
detail: Option<ImageDetail>,
},
}

View File

@@ -0,0 +1,209 @@
use crate::response::FunctionCallOutputContentItem;
use super::EXIT_SENTINEL;
use super::RuntimeEvent;
use super::RuntimeState;
use super::value::json_to_v8;
use super::value::normalize_output_image;
use super::value::serialize_output_text;
use super::value::throw_type_error;
use super::value::v8_value_to_json;
pub(super) fn tool_callback(
scope: &mut v8::PinScope<'_, '_>,
args: v8::FunctionCallbackArguments,
mut retval: v8::ReturnValue<v8::Value>,
) {
let tool_name = args.data().to_rust_string_lossy(scope);
let input = if args.length() == 0 {
Ok(None)
} else {
v8_value_to_json(scope, args.get(0))
};
let input = match input {
Ok(input) => input,
Err(error_text) => {
throw_type_error(scope, &error_text);
return;
}
};
let Some(resolver) = v8::PromiseResolver::new(scope) else {
throw_type_error(scope, "failed to create tool promise");
return;
};
let promise = resolver.get_promise(scope);
let resolver = v8::Global::new(scope, resolver);
let Some(state) = scope.get_slot_mut::<RuntimeState>() else {
throw_type_error(scope, "runtime state unavailable");
return;
};
let id = format!("tool-{}", state.next_tool_call_id);
state.next_tool_call_id = state.next_tool_call_id.saturating_add(1);
let event_tx = state.event_tx.clone();
state.pending_tool_calls.insert(id.clone(), resolver);
let _ = event_tx.send(RuntimeEvent::ToolCall {
id,
name: tool_name,
input,
});
retval.set(promise.into());
}
pub(super) fn text_callback(
scope: &mut v8::PinScope<'_, '_>,
args: v8::FunctionCallbackArguments,
mut retval: v8::ReturnValue<v8::Value>,
) {
let value = if args.length() == 0 {
v8::undefined(scope).into()
} else {
args.get(0)
};
let text = match serialize_output_text(scope, value) {
Ok(text) => text,
Err(error_text) => {
throw_type_error(scope, &error_text);
return;
}
};
if let Some(state) = scope.get_slot::<RuntimeState>() {
let _ = state.event_tx.send(RuntimeEvent::ContentItem(
FunctionCallOutputContentItem::InputText { text },
));
}
retval.set(v8::undefined(scope).into());
}
pub(super) fn image_callback(
scope: &mut v8::PinScope<'_, '_>,
args: v8::FunctionCallbackArguments,
mut retval: v8::ReturnValue<v8::Value>,
) {
let value = if args.length() == 0 {
v8::undefined(scope).into()
} else {
args.get(0)
};
let image_item = match normalize_output_image(scope, value) {
Ok(image_item) => image_item,
Err(()) => return,
};
if let Some(state) = scope.get_slot::<RuntimeState>() {
let _ = state.event_tx.send(RuntimeEvent::ContentItem(image_item));
}
retval.set(v8::undefined(scope).into());
}
pub(super) fn store_callback(
scope: &mut v8::PinScope<'_, '_>,
args: v8::FunctionCallbackArguments,
_retval: v8::ReturnValue<v8::Value>,
) {
let key = match args.get(0).to_string(scope) {
Some(key) => key.to_rust_string_lossy(scope),
None => {
throw_type_error(scope, "store key must be a string");
return;
}
};
let value = args.get(1);
let serialized = match v8_value_to_json(scope, value) {
Ok(Some(value)) => value,
Ok(None) => {
throw_type_error(
scope,
&format!("Unable to store {key:?}. Only plain serializable objects can be stored."),
);
return;
}
Err(error_text) => {
throw_type_error(scope, &error_text);
return;
}
};
if let Some(state) = scope.get_slot_mut::<RuntimeState>() {
state.stored_values.insert(key, serialized);
}
}
pub(super) fn load_callback(
scope: &mut v8::PinScope<'_, '_>,
args: v8::FunctionCallbackArguments,
mut retval: v8::ReturnValue<v8::Value>,
) {
let key = match args.get(0).to_string(scope) {
Some(key) => key.to_rust_string_lossy(scope),
None => {
throw_type_error(scope, "load key must be a string");
return;
}
};
let value = scope
.get_slot::<RuntimeState>()
.and_then(|state| state.stored_values.get(&key))
.cloned();
let Some(value) = value else {
retval.set(v8::undefined(scope).into());
return;
};
let Some(value) = json_to_v8(scope, &value) else {
throw_type_error(scope, "failed to load stored value");
return;
};
retval.set(value);
}
pub(super) fn notify_callback(
scope: &mut v8::PinScope<'_, '_>,
args: v8::FunctionCallbackArguments,
mut retval: v8::ReturnValue<v8::Value>,
) {
let value = if args.length() == 0 {
v8::undefined(scope).into()
} else {
args.get(0)
};
let text = match serialize_output_text(scope, value) {
Ok(text) => text,
Err(error_text) => {
throw_type_error(scope, &error_text);
return;
}
};
if text.trim().is_empty() {
throw_type_error(scope, "notify expects non-empty text");
return;
}
if let Some(state) = scope.get_slot::<RuntimeState>() {
let _ = state.event_tx.send(RuntimeEvent::Notify {
call_id: state.tool_call_id.clone(),
text,
});
}
retval.set(v8::undefined(scope).into());
}
pub(super) fn yield_control_callback(
scope: &mut v8::PinScope<'_, '_>,
_args: v8::FunctionCallbackArguments,
_retval: v8::ReturnValue<v8::Value>,
) {
if let Some(state) = scope.get_slot::<RuntimeState>() {
let _ = state.event_tx.send(RuntimeEvent::YieldRequested);
}
}
pub(super) fn exit_callback(
scope: &mut v8::PinScope<'_, '_>,
_args: v8::FunctionCallbackArguments,
_retval: v8::ReturnValue<v8::Value>,
) {
if let Some(state) = scope.get_slot_mut::<RuntimeState>() {
state.exit_requested = true;
}
if let Some(error) = v8::String::new(scope, EXIT_SENTINEL) {
scope.throw_exception(error.into());
}
}

View File

@@ -0,0 +1,138 @@
use super::RuntimeState;
use super::callbacks::exit_callback;
use super::callbacks::image_callback;
use super::callbacks::load_callback;
use super::callbacks::notify_callback;
use super::callbacks::store_callback;
use super::callbacks::text_callback;
use super::callbacks::tool_callback;
use super::callbacks::yield_control_callback;
pub(super) fn install_globals(scope: &mut v8::PinScope<'_, '_>) -> Result<(), String> {
let global = scope.get_current_context().global(scope);
let console = v8::String::new(scope, "console")
.ok_or_else(|| "failed to allocate global `console`".to_string())?;
if global.delete(scope, console.into()) != Some(true) {
return Err("failed to remove global `console`".to_string());
}
let tools = build_tools_object(scope)?;
let all_tools = build_all_tools_value(scope)?;
let text = helper_function(scope, "text", text_callback)?;
let image = helper_function(scope, "image", image_callback)?;
let store = helper_function(scope, "store", store_callback)?;
let load = helper_function(scope, "load", load_callback)?;
let notify = helper_function(scope, "notify", notify_callback)?;
let yield_control = helper_function(scope, "yield_control", yield_control_callback)?;
let exit = helper_function(scope, "exit", exit_callback)?;
set_global(scope, global, "tools", tools.into())?;
set_global(scope, global, "ALL_TOOLS", all_tools)?;
set_global(scope, global, "text", text.into())?;
set_global(scope, global, "image", image.into())?;
set_global(scope, global, "store", store.into())?;
set_global(scope, global, "load", load.into())?;
set_global(scope, global, "notify", notify.into())?;
set_global(scope, global, "yield_control", yield_control.into())?;
set_global(scope, global, "exit", exit.into())?;
Ok(())
}
fn build_tools_object<'s>(
scope: &mut v8::PinScope<'s, '_>,
) -> Result<v8::Local<'s, v8::Object>, String> {
let tools = v8::Object::new(scope);
let enabled_tools = scope
.get_slot::<RuntimeState>()
.map(|state| state.enabled_tools.clone())
.unwrap_or_default();
for tool in enabled_tools {
let name = v8::String::new(scope, &tool.global_name)
.ok_or_else(|| "failed to allocate tool name".to_string())?;
let function = tool_function(scope, &tool.tool_name)?;
tools.set(scope, name.into(), function.into());
}
Ok(tools)
}
fn build_all_tools_value<'s>(
scope: &mut v8::PinScope<'s, '_>,
) -> Result<v8::Local<'s, v8::Value>, String> {
let enabled_tools = scope
.get_slot::<RuntimeState>()
.map(|state| state.enabled_tools.clone())
.unwrap_or_default();
let array = v8::Array::new(scope, enabled_tools.len() as i32);
let name_key = v8::String::new(scope, "name")
.ok_or_else(|| "failed to allocate ALL_TOOLS name key".to_string())?;
let description_key = v8::String::new(scope, "description")
.ok_or_else(|| "failed to allocate ALL_TOOLS description key".to_string())?;
for (index, tool) in enabled_tools.iter().enumerate() {
let item = v8::Object::new(scope);
let name = v8::String::new(scope, &tool.global_name)
.ok_or_else(|| "failed to allocate ALL_TOOLS name".to_string())?;
let description = v8::String::new(scope, &tool.description)
.ok_or_else(|| "failed to allocate ALL_TOOLS description".to_string())?;
if item.set(scope, name_key.into(), name.into()) != Some(true) {
return Err("failed to set ALL_TOOLS name".to_string());
}
if item.set(scope, description_key.into(), description.into()) != Some(true) {
return Err("failed to set ALL_TOOLS description".to_string());
}
if array.set_index(scope, index as u32, item.into()) != Some(true) {
return Err("failed to append ALL_TOOLS metadata".to_string());
}
}
Ok(array.into())
}
fn helper_function<'s, F>(
scope: &mut v8::PinScope<'s, '_>,
name: &str,
callback: F,
) -> Result<v8::Local<'s, v8::Function>, String>
where
F: v8::MapFnTo<v8::FunctionCallback>,
{
let name =
v8::String::new(scope, name).ok_or_else(|| "failed to allocate helper name".to_string())?;
let template = v8::FunctionTemplate::builder(callback)
.data(name.into())
.build(scope);
template
.get_function(scope)
.ok_or_else(|| "failed to create helper function".to_string())
}
fn tool_function<'s>(
scope: &mut v8::PinScope<'s, '_>,
tool_name: &str,
) -> Result<v8::Local<'s, v8::Function>, String> {
let data = v8::String::new(scope, tool_name)
.ok_or_else(|| "failed to allocate tool callback data".to_string())?;
let template = v8::FunctionTemplate::builder(tool_callback)
.data(data.into())
.build(scope);
template
.get_function(scope)
.ok_or_else(|| "failed to create tool function".to_string())
}
fn set_global<'s>(
scope: &mut v8::PinScope<'s, '_>,
global: v8::Local<'s, v8::Object>,
name: &str,
value: v8::Local<'s, v8::Value>,
) -> Result<(), String> {
let key = v8::String::new(scope, name)
.ok_or_else(|| format!("failed to allocate global `{name}`"))?;
if global.set(scope, key.into(), value) == Some(true) {
Ok(())
} else {
Err(format!("failed to set global `{name}`"))
}
}

View File

@@ -0,0 +1,349 @@
mod callbacks;
mod globals;
mod module_loader;
mod value;
use std::collections::HashMap;
use std::sync::OnceLock;
use std::sync::mpsc as std_mpsc;
use std::thread;
use serde_json::Value as JsonValue;
use tokio::sync::mpsc;
use crate::description::EnabledToolMetadata;
use crate::description::ToolDefinition;
use crate::description::enabled_tool_metadata;
use crate::response::FunctionCallOutputContentItem;
pub const DEFAULT_EXEC_YIELD_TIME_MS: u64 = 10_000;
pub const DEFAULT_WAIT_YIELD_TIME_MS: u64 = 10_000;
pub const DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL: usize = 10_000;
const EXIT_SENTINEL: &str = "__codex_code_mode_exit__";
#[derive(Clone, Debug)]
pub struct ExecuteRequest {
pub tool_call_id: String,
pub enabled_tools: Vec<ToolDefinition>,
pub source: String,
pub stored_values: HashMap<String, JsonValue>,
pub yield_time_ms: Option<u64>,
pub max_output_tokens: Option<usize>,
}
#[derive(Clone, Debug)]
pub struct WaitRequest {
pub cell_id: String,
pub yield_time_ms: u64,
pub terminate: bool,
}
#[derive(Debug, PartialEq)]
pub enum RuntimeResponse {
Yielded {
cell_id: String,
content_items: Vec<FunctionCallOutputContentItem>,
},
Terminated {
cell_id: String,
content_items: Vec<FunctionCallOutputContentItem>,
},
Result {
cell_id: String,
content_items: Vec<FunctionCallOutputContentItem>,
stored_values: HashMap<String, JsonValue>,
error_text: Option<String>,
},
}
#[derive(Debug)]
pub(crate) enum TurnMessage {
ToolCall {
cell_id: String,
id: String,
name: String,
input: Option<JsonValue>,
},
Notify {
cell_id: String,
call_id: String,
text: String,
},
}
#[derive(Debug)]
pub(crate) enum RuntimeCommand {
ToolResponse { id: String, result: JsonValue },
ToolError { id: String, error_text: String },
Terminate,
}
#[derive(Debug)]
pub(crate) enum RuntimeEvent {
Started,
ContentItem(FunctionCallOutputContentItem),
YieldRequested,
ToolCall {
id: String,
name: String,
input: Option<JsonValue>,
},
Notify {
call_id: String,
text: String,
},
Result {
stored_values: HashMap<String, JsonValue>,
error_text: Option<String>,
},
}
pub(crate) fn spawn_runtime(
request: ExecuteRequest,
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
) -> Result<(std_mpsc::Sender<RuntimeCommand>, v8::IsolateHandle), String> {
let (command_tx, command_rx) = std_mpsc::channel();
let (isolate_handle_tx, isolate_handle_rx) = std_mpsc::sync_channel(1);
let enabled_tools = request
.enabled_tools
.iter()
.map(enabled_tool_metadata)
.collect::<Vec<_>>();
let config = RuntimeConfig {
tool_call_id: request.tool_call_id,
enabled_tools,
source: request.source,
stored_values: request.stored_values,
};
thread::spawn(move || {
run_runtime(config, event_tx, command_rx, isolate_handle_tx);
});
let isolate_handle = isolate_handle_rx
.recv()
.map_err(|_| "failed to initialize code mode runtime".to_string())?;
Ok((command_tx, isolate_handle))
}
#[derive(Clone)]
struct RuntimeConfig {
tool_call_id: String,
enabled_tools: Vec<EnabledToolMetadata>,
source: String,
stored_values: HashMap<String, JsonValue>,
}
pub(super) struct RuntimeState {
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
pending_tool_calls: HashMap<String, v8::Global<v8::PromiseResolver>>,
stored_values: HashMap<String, JsonValue>,
enabled_tools: Vec<EnabledToolMetadata>,
next_tool_call_id: u64,
tool_call_id: String,
exit_requested: bool,
}
pub(super) enum CompletionState {
Pending,
Completed {
stored_values: HashMap<String, JsonValue>,
error_text: Option<String>,
},
}
fn initialize_v8() {
static PLATFORM: OnceLock<v8::SharedRef<v8::Platform>> = OnceLock::new();
let _ = PLATFORM.get_or_init(|| {
let platform = v8::new_default_platform(0, false).make_shared();
v8::V8::initialize_platform(platform.clone());
v8::V8::initialize();
platform
});
}
fn run_runtime(
config: RuntimeConfig,
event_tx: mpsc::UnboundedSender<RuntimeEvent>,
command_rx: std_mpsc::Receiver<RuntimeCommand>,
isolate_handle_tx: std_mpsc::SyncSender<v8::IsolateHandle>,
) {
initialize_v8();
let isolate = &mut v8::Isolate::new(v8::CreateParams::default());
let isolate_handle = isolate.thread_safe_handle();
if isolate_handle_tx.send(isolate_handle).is_err() {
return;
}
isolate.set_host_import_module_dynamically_callback(module_loader::dynamic_import_callback);
v8::scope!(let scope, isolate);
let context = v8::Context::new(scope, Default::default());
let scope = &mut v8::ContextScope::new(scope, context);
scope.set_slot(RuntimeState {
event_tx: event_tx.clone(),
pending_tool_calls: HashMap::new(),
stored_values: config.stored_values,
enabled_tools: config.enabled_tools,
next_tool_call_id: 1,
tool_call_id: config.tool_call_id,
exit_requested: false,
});
if let Err(error_text) = globals::install_globals(scope) {
send_result(&event_tx, HashMap::new(), Some(error_text));
return;
}
let _ = event_tx.send(RuntimeEvent::Started);
let pending_promise = match module_loader::evaluate_main_module(scope, &config.source) {
Ok(pending_promise) => pending_promise,
Err(error_text) => {
capture_scope_send_error(scope, &event_tx, Some(error_text));
return;
}
};
match module_loader::completion_state(scope, pending_promise.as_ref()) {
CompletionState::Completed {
stored_values,
error_text,
} => {
send_result(&event_tx, stored_values, error_text);
return;
}
CompletionState::Pending => {}
}
let mut pending_promise = pending_promise;
loop {
let Ok(command) = command_rx.recv() else {
break;
};
match command {
RuntimeCommand::Terminate => break,
RuntimeCommand::ToolResponse { id, result } => {
if let Err(error_text) =
module_loader::resolve_tool_response(scope, &id, Ok(result))
{
capture_scope_send_error(scope, &event_tx, Some(error_text));
return;
}
}
RuntimeCommand::ToolError { id, error_text } => {
if let Err(runtime_error) =
module_loader::resolve_tool_response(scope, &id, Err(error_text))
{
capture_scope_send_error(scope, &event_tx, Some(runtime_error));
return;
}
}
}
scope.perform_microtask_checkpoint();
match module_loader::completion_state(scope, pending_promise.as_ref()) {
CompletionState::Completed {
stored_values,
error_text,
} => {
send_result(&event_tx, stored_values, error_text);
return;
}
CompletionState::Pending => {}
}
if let Some(promise) = pending_promise.as_ref() {
let promise = v8::Local::new(scope, promise);
if promise.state() != v8::PromiseState::Pending {
pending_promise = None;
}
}
}
}
fn capture_scope_send_error(
scope: &mut v8::PinScope<'_, '_>,
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
error_text: Option<String>,
) {
let stored_values = scope
.get_slot::<RuntimeState>()
.map(|state| state.stored_values.clone())
.unwrap_or_default();
send_result(event_tx, stored_values, error_text);
}
fn send_result(
event_tx: &mpsc::UnboundedSender<RuntimeEvent>,
stored_values: HashMap<String, JsonValue>,
error_text: Option<String>,
) {
let _ = event_tx.send(RuntimeEvent::Result {
stored_values,
error_text,
});
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::time::Duration;
use pretty_assertions::assert_eq;
use tokio::sync::mpsc;
use super::ExecuteRequest;
use super::RuntimeEvent;
use super::spawn_runtime;
fn execute_request(source: &str) -> ExecuteRequest {
ExecuteRequest {
tool_call_id: "call_1".to_string(),
enabled_tools: Vec::new(),
source: source.to_string(),
stored_values: HashMap::new(),
yield_time_ms: Some(1),
max_output_tokens: None,
}
}
#[tokio::test]
async fn terminate_execution_stops_cpu_bound_module() {
let (event_tx, mut event_rx) = mpsc::unbounded_channel();
let (_runtime_tx, runtime_terminate_handle) =
spawn_runtime(execute_request("while (true) {}"), event_tx).unwrap();
let started_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
.await
.unwrap()
.unwrap();
assert!(matches!(started_event, RuntimeEvent::Started));
assert!(runtime_terminate_handle.terminate_execution());
let result_event = tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
.await
.unwrap()
.unwrap();
let RuntimeEvent::Result {
stored_values,
error_text,
} = result_event
else {
panic!("expected runtime result after termination");
};
assert_eq!(stored_values, HashMap::new());
assert!(error_text.is_some());
assert!(
tokio::time::timeout(Duration::from_secs(1), event_rx.recv())
.await
.unwrap()
.is_none()
);
}
}

View File

@@ -0,0 +1,235 @@
use serde_json::Value as JsonValue;
use super::CompletionState;
use super::EXIT_SENTINEL;
use super::RuntimeState;
use super::value::json_to_v8;
use super::value::value_to_error_text;
pub(super) fn evaluate_main_module(
scope: &mut v8::PinScope<'_, '_>,
source_text: &str,
) -> Result<Option<v8::Global<v8::Promise>>, String> {
let tc = std::pin::pin!(v8::TryCatch::new(scope));
let mut tc = tc.init();
let source = v8::String::new(&tc, source_text)
.ok_or_else(|| "failed to allocate exec source".to_string())?;
let origin = script_origin(&mut tc, "exec_main.mjs")?;
let mut source = v8::script_compiler::Source::new(source, Some(&origin));
let module = v8::script_compiler::compile_module(&tc, &mut source).ok_or_else(|| {
tc.exception()
.map(|exception| value_to_error_text(&mut tc, exception))
.unwrap_or_else(|| "unknown code mode exception".to_string())
})?;
module
.instantiate_module(&tc, resolve_module_callback)
.ok_or_else(|| {
tc.exception()
.map(|exception| value_to_error_text(&mut tc, exception))
.unwrap_or_else(|| "unknown code mode exception".to_string())
})?;
let result = match module.evaluate(&tc) {
Some(result) => result,
None => {
if let Some(exception) = tc.exception() {
if is_exit_exception(&mut tc, exception) {
return Ok(None);
}
return Err(value_to_error_text(&mut tc, exception));
}
return Err("unknown code mode exception".to_string());
}
};
tc.perform_microtask_checkpoint();
if result.is_promise() {
let promise = v8::Local::<v8::Promise>::try_from(result)
.map_err(|_| "failed to read exec promise".to_string())?;
return Ok(Some(v8::Global::new(&tc, promise)));
}
Ok(None)
}
fn is_exit_exception(
scope: &mut v8::PinScope<'_, '_>,
exception: v8::Local<'_, v8::Value>,
) -> bool {
scope
.get_slot::<RuntimeState>()
.map(|state| state.exit_requested)
.unwrap_or(false)
&& exception.is_string()
&& exception.to_rust_string_lossy(scope) == EXIT_SENTINEL
}
pub(super) fn resolve_tool_response(
scope: &mut v8::PinScope<'_, '_>,
id: &str,
response: Result<JsonValue, String>,
) -> Result<(), String> {
let resolver = {
let state = scope
.get_slot_mut::<RuntimeState>()
.ok_or_else(|| "runtime state unavailable".to_string())?;
state.pending_tool_calls.remove(id)
}
.ok_or_else(|| format!("unknown tool call `{id}`"))?;
let tc = std::pin::pin!(v8::TryCatch::new(scope));
let mut tc = tc.init();
let resolver = v8::Local::new(&tc, &resolver);
match response {
Ok(result) => {
let value = json_to_v8(&mut tc, &result)
.ok_or_else(|| "failed to serialize tool response".to_string())?;
resolver.resolve(&tc, value);
}
Err(error_text) => {
let value = v8::String::new(&tc, &error_text)
.ok_or_else(|| "failed to allocate tool error".to_string())?;
resolver.reject(&tc, value.into());
}
}
if tc.has_caught() {
return Err(tc
.exception()
.map(|exception| value_to_error_text(&mut tc, exception))
.unwrap_or_else(|| "unknown code mode exception".to_string()));
}
Ok(())
}
pub(super) fn completion_state(
scope: &mut v8::PinScope<'_, '_>,
pending_promise: Option<&v8::Global<v8::Promise>>,
) -> CompletionState {
let stored_values = scope
.get_slot::<RuntimeState>()
.map(|state| state.stored_values.clone())
.unwrap_or_default();
let Some(pending_promise) = pending_promise else {
return CompletionState::Completed {
stored_values,
error_text: None,
};
};
let promise = v8::Local::new(scope, pending_promise);
match promise.state() {
v8::PromiseState::Pending => CompletionState::Pending,
v8::PromiseState::Fulfilled => CompletionState::Completed {
stored_values,
error_text: None,
},
v8::PromiseState::Rejected => {
let result = promise.result(scope);
let error_text = if is_exit_exception(scope, result) {
None
} else {
Some(value_to_error_text(scope, result))
};
CompletionState::Completed {
stored_values,
error_text,
}
}
}
}
fn script_origin<'s>(
scope: &mut v8::PinScope<'s, '_>,
resource_name_: &str,
) -> Result<v8::ScriptOrigin<'s>, String> {
let resource_name = v8::String::new(scope, resource_name_)
.ok_or_else(|| "failed to allocate script origin".to_string())?;
let source_map_url = v8::String::new(scope, resource_name_)
.ok_or_else(|| "failed to allocate source map url".to_string())?;
Ok(v8::ScriptOrigin::new(
scope,
resource_name.into(),
0,
0,
true,
0,
Some(source_map_url.into()),
true,
false,
true,
None,
))
}
fn resolve_module_callback<'s>(
context: v8::Local<'s, v8::Context>,
specifier: v8::Local<'s, v8::String>,
_import_attributes: v8::Local<'s, v8::FixedArray>,
_referrer: v8::Local<'s, v8::Module>,
) -> Option<v8::Local<'s, v8::Module>> {
v8::callback_scope!(unsafe scope, context);
let specifier = specifier.to_rust_string_lossy(scope);
resolve_module(scope, &specifier)
}
pub(super) fn dynamic_import_callback<'s>(
scope: &mut v8::PinScope<'s, '_>,
_host_defined_options: v8::Local<'s, v8::Data>,
_resource_name: v8::Local<'s, v8::Value>,
specifier: v8::Local<'s, v8::String>,
_import_attributes: v8::Local<'s, v8::FixedArray>,
) -> Option<v8::Local<'s, v8::Promise>> {
let specifier = specifier.to_rust_string_lossy(scope);
let resolver = v8::PromiseResolver::new(scope)?;
match resolve_module(scope, &specifier) {
Some(module) => {
if module.get_status() == v8::ModuleStatus::Uninstantiated
&& module
.instantiate_module(scope, resolve_module_callback)
.is_none()
{
let error = v8::String::new(scope, "failed to instantiate module")
.map(Into::into)
.unwrap_or_else(|| v8::undefined(scope).into());
resolver.reject(scope, error);
return Some(resolver.get_promise(scope));
}
if matches!(
module.get_status(),
v8::ModuleStatus::Instantiated | v8::ModuleStatus::Evaluated
) && module.evaluate(scope).is_none()
{
let error = v8::String::new(scope, "failed to evaluate module")
.map(Into::into)
.unwrap_or_else(|| v8::undefined(scope).into());
resolver.reject(scope, error);
return Some(resolver.get_promise(scope));
}
let namespace = module.get_module_namespace();
resolver.resolve(scope, namespace);
Some(resolver.get_promise(scope))
}
None => {
let error = v8::String::new(scope, "unsupported import in exec")
.map(Into::into)
.unwrap_or_else(|| v8::undefined(scope).into());
resolver.reject(scope, error);
Some(resolver.get_promise(scope))
}
}
}
fn resolve_module<'s>(
scope: &mut v8::PinScope<'s, '_>,
specifier: &str,
) -> Option<v8::Local<'s, v8::Module>> {
if let Some(message) =
v8::String::new(scope, &format!("Unsupported import in exec: {specifier}"))
{
scope.throw_exception(message.into());
} else {
scope.throw_exception(v8::undefined(scope).into());
}
None
}

View File

@@ -0,0 +1,163 @@
use serde_json::Value as JsonValue;
use crate::response::FunctionCallOutputContentItem;
use crate::response::ImageDetail;
pub(super) fn serialize_output_text(
scope: &mut v8::PinScope<'_, '_>,
value: v8::Local<'_, v8::Value>,
) -> Result<String, String> {
if value.is_undefined()
|| value.is_null()
|| value.is_boolean()
|| value.is_number()
|| value.is_big_int()
|| value.is_string()
{
return Ok(value.to_rust_string_lossy(scope));
}
let tc = std::pin::pin!(v8::TryCatch::new(scope));
let mut tc = tc.init();
if let Some(stringified) = v8::json::stringify(&tc, value) {
return Ok(stringified.to_rust_string_lossy(&tc));
}
if tc.has_caught() {
return Err(tc
.exception()
.map(|exception| value_to_error_text(&mut tc, exception))
.unwrap_or_else(|| "unknown code mode exception".to_string()));
}
Ok(value.to_rust_string_lossy(&tc))
}
pub(super) fn normalize_output_image(
scope: &mut v8::PinScope<'_, '_>,
value: v8::Local<'_, v8::Value>,
) -> Result<FunctionCallOutputContentItem, ()> {
let result = (|| -> Result<FunctionCallOutputContentItem, String> {
let (image_url, detail) = if value.is_string() {
(value.to_rust_string_lossy(scope), None)
} else if value.is_object() && !value.is_array() {
let object = v8::Local::<v8::Object>::try_from(value).map_err(|_| {
"image expects a non-empty image URL string or an object with image_url and optional detail".to_string()
})?;
let image_url_key = v8::String::new(scope, "image_url")
.ok_or_else(|| "failed to allocate image helper keys".to_string())?;
let detail_key = v8::String::new(scope, "detail")
.ok_or_else(|| "failed to allocate image helper keys".to_string())?;
let image_url = object
.get(scope, image_url_key.into())
.filter(|value| value.is_string())
.map(|value| value.to_rust_string_lossy(scope))
.ok_or_else(|| {
"image expects a non-empty image URL string or an object with image_url and optional detail"
.to_string()
})?;
let detail = match object.get(scope, detail_key.into()) {
Some(value) if value.is_string() => Some(value.to_rust_string_lossy(scope)),
Some(value) if value.is_null() || value.is_undefined() => None,
Some(_) => return Err("image detail must be a string when provided".to_string()),
None => None,
};
(image_url, detail)
} else {
return Err(
"image expects a non-empty image URL string or an object with image_url and optional detail"
.to_string(),
);
};
if image_url.is_empty() {
return Err(
"image expects a non-empty image URL string or an object with image_url and optional detail"
.to_string(),
);
}
let lower = image_url.to_ascii_lowercase();
if !(lower.starts_with("http://")
|| lower.starts_with("https://")
|| lower.starts_with("data:"))
{
return Err("image expects an http(s) or data URL".to_string());
}
let detail = match detail {
Some(detail) => {
let normalized = detail.to_ascii_lowercase();
Some(match normalized.as_str() {
"auto" => ImageDetail::Auto,
"low" => ImageDetail::Low,
"high" => ImageDetail::High,
"original" => ImageDetail::Original,
_ => {
return Err(
"image detail must be one of: auto, low, high, original".to_string()
);
}
})
}
None => None,
};
Ok(FunctionCallOutputContentItem::InputImage { image_url, detail })
})();
match result {
Ok(item) => Ok(item),
Err(error_text) => {
throw_type_error(scope, &error_text);
Err(())
}
}
}
pub(super) fn v8_value_to_json(
scope: &mut v8::PinScope<'_, '_>,
value: v8::Local<'_, v8::Value>,
) -> Result<Option<JsonValue>, String> {
let tc = std::pin::pin!(v8::TryCatch::new(scope));
let mut tc = tc.init();
let Some(stringified) = v8::json::stringify(&tc, value) else {
if tc.has_caught() {
return Err(tc
.exception()
.map(|exception| value_to_error_text(&mut tc, exception))
.unwrap_or_else(|| "unknown code mode exception".to_string()));
}
return Ok(None);
};
serde_json::from_str(&stringified.to_rust_string_lossy(&tc))
.map(Some)
.map_err(|err| format!("failed to serialize JavaScript value: {err}"))
}
pub(super) fn json_to_v8<'s>(
scope: &mut v8::PinScope<'s, '_>,
value: &JsonValue,
) -> Option<v8::Local<'s, v8::Value>> {
let json = serde_json::to_string(value).ok()?;
let json = v8::String::new(scope, &json)?;
v8::json::parse(scope, json)
}
pub(super) fn value_to_error_text(
scope: &mut v8::PinScope<'_, '_>,
value: v8::Local<'_, v8::Value>,
) -> String {
if value.is_object()
&& let Ok(object) = v8::Local::<v8::Object>::try_from(value)
&& let Some(key) = v8::String::new(scope, "stack")
&& let Some(stack) = object.get(scope, key.into())
&& stack.is_string()
{
return stack.to_rust_string_lossy(scope);
}
value.to_rust_string_lossy(scope)
}
pub(super) fn throw_type_error(scope: &mut v8::PinScope<'_, '_>, message: &str) {
if let Some(message) = v8::String::new(scope, message) {
scope.throw_exception(message.into());
}
}

View File

@@ -0,0 +1,673 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use async_trait::async_trait;
use serde_json::Value as JsonValue;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use crate::FunctionCallOutputContentItem;
use crate::runtime::DEFAULT_EXEC_YIELD_TIME_MS;
use crate::runtime::ExecuteRequest;
use crate::runtime::RuntimeCommand;
use crate::runtime::RuntimeEvent;
use crate::runtime::RuntimeResponse;
use crate::runtime::TurnMessage;
use crate::runtime::WaitRequest;
use crate::runtime::spawn_runtime;
#[async_trait]
pub trait CodeModeTurnHost: Send + Sync {
async fn invoke_tool(
&self,
tool_name: String,
input: Option<JsonValue>,
cancellation_token: CancellationToken,
) -> Result<JsonValue, String>;
async fn notify(&self, call_id: String, cell_id: String, text: String) -> Result<(), String>;
}
#[derive(Clone)]
struct SessionHandle {
control_tx: mpsc::UnboundedSender<SessionControlCommand>,
runtime_tx: std::sync::mpsc::Sender<RuntimeCommand>,
}
struct Inner {
stored_values: Mutex<HashMap<String, JsonValue>>,
sessions: Mutex<HashMap<String, SessionHandle>>,
turn_message_tx: mpsc::UnboundedSender<TurnMessage>,
turn_message_rx: Arc<Mutex<mpsc::UnboundedReceiver<TurnMessage>>>,
next_cell_id: AtomicU64,
}
pub struct CodeModeService {
inner: Arc<Inner>,
}
impl CodeModeService {
pub fn new() -> Self {
let (turn_message_tx, turn_message_rx) = mpsc::unbounded_channel();
Self {
inner: Arc::new(Inner {
stored_values: Mutex::new(HashMap::new()),
sessions: Mutex::new(HashMap::new()),
turn_message_tx,
turn_message_rx: Arc::new(Mutex::new(turn_message_rx)),
next_cell_id: AtomicU64::new(1),
}),
}
}
pub async fn stored_values(&self) -> HashMap<String, JsonValue> {
self.inner.stored_values.lock().await.clone()
}
pub async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
*self.inner.stored_values.lock().await = values;
}
pub async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
let cell_id = self
.inner
.next_cell_id
.fetch_add(1, Ordering::Relaxed)
.to_string();
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request.clone(), event_tx)?;
let (control_tx, control_rx) = mpsc::unbounded_channel();
let (response_tx, response_rx) = oneshot::channel();
self.inner.sessions.lock().await.insert(
cell_id.clone(),
SessionHandle {
control_tx: control_tx.clone(),
runtime_tx: runtime_tx.clone(),
},
);
tokio::spawn(run_session_control(
Arc::clone(&self.inner),
SessionControlContext {
cell_id: cell_id.clone(),
runtime_tx,
runtime_terminate_handle,
},
event_rx,
control_rx,
response_tx,
request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS),
));
response_rx
.await
.map_err(|_| "exec runtime ended unexpectedly".to_string())
}
pub async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String> {
let cell_id = request.cell_id.clone();
let handle = self
.inner
.sessions
.lock()
.await
.get(&request.cell_id)
.cloned();
let Some(handle) = handle else {
return Ok(missing_cell_response(cell_id));
};
let (response_tx, response_rx) = oneshot::channel();
let control_message = if request.terminate {
SessionControlCommand::Terminate { response_tx }
} else {
SessionControlCommand::Poll {
yield_time_ms: request.yield_time_ms,
response_tx,
}
};
if handle.control_tx.send(control_message).is_err() {
return Ok(missing_cell_response(cell_id));
}
match response_rx.await {
Ok(response) => Ok(response),
Err(_) => Ok(missing_cell_response(request.cell_id)),
}
}
pub fn start_turn_worker(&self, host: Arc<dyn CodeModeTurnHost>) -> CodeModeTurnWorker {
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let inner = Arc::clone(&self.inner);
let turn_message_rx = Arc::clone(&self.inner.turn_message_rx);
tokio::spawn(async move {
loop {
let next_message = tokio::select! {
_ = &mut shutdown_rx => break,
message = async {
let mut turn_message_rx = turn_message_rx.lock().await;
turn_message_rx.recv().await
} => message,
};
let Some(next_message) = next_message else {
break;
};
match next_message {
TurnMessage::Notify {
cell_id,
call_id,
text,
} => {
if let Err(err) = host.notify(call_id, cell_id.clone(), text).await {
warn!(
"failed to deliver code mode notification for cell {cell_id}: {err}"
);
}
}
TurnMessage::ToolCall {
cell_id,
id,
name,
input,
} => {
let host = Arc::clone(&host);
let inner = Arc::clone(&inner);
tokio::spawn(async move {
let response = host
.invoke_tool(name, input, CancellationToken::new())
.await;
let runtime_tx = inner
.sessions
.lock()
.await
.get(&cell_id)
.map(|handle| handle.runtime_tx.clone());
let Some(runtime_tx) = runtime_tx else {
return;
};
let command = match response {
Ok(result) => RuntimeCommand::ToolResponse { id, result },
Err(error_text) => RuntimeCommand::ToolError { id, error_text },
};
let _ = runtime_tx.send(command);
});
}
}
}
});
CodeModeTurnWorker {
shutdown_tx: Some(shutdown_tx),
}
}
}
impl Default for CodeModeService {
fn default() -> Self {
Self::new()
}
}
pub struct CodeModeTurnWorker {
shutdown_tx: Option<oneshot::Sender<()>>,
}
impl Drop for CodeModeTurnWorker {
fn drop(&mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
}
}
enum SessionControlCommand {
Poll {
yield_time_ms: u64,
response_tx: oneshot::Sender<RuntimeResponse>,
},
Terminate {
response_tx: oneshot::Sender<RuntimeResponse>,
},
}
struct PendingResult {
content_items: Vec<FunctionCallOutputContentItem>,
stored_values: HashMap<String, JsonValue>,
error_text: Option<String>,
}
struct SessionControlContext {
cell_id: String,
runtime_tx: std::sync::mpsc::Sender<RuntimeCommand>,
runtime_terminate_handle: v8::IsolateHandle,
}
fn missing_cell_response(cell_id: String) -> RuntimeResponse {
RuntimeResponse::Result {
error_text: Some(format!("exec cell {cell_id} not found")),
cell_id,
content_items: Vec::new(),
stored_values: HashMap::new(),
}
}
fn pending_result_response(cell_id: &str, result: PendingResult) -> RuntimeResponse {
RuntimeResponse::Result {
cell_id: cell_id.to_string(),
content_items: result.content_items,
stored_values: result.stored_values,
error_text: result.error_text,
}
}
fn send_or_buffer_result(
cell_id: &str,
result: PendingResult,
response_tx: &mut Option<oneshot::Sender<RuntimeResponse>>,
pending_result: &mut Option<PendingResult>,
) -> bool {
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(pending_result_response(cell_id, result));
return true;
}
*pending_result = Some(result);
false
}
async fn run_session_control(
inner: Arc<Inner>,
context: SessionControlContext,
mut event_rx: mpsc::UnboundedReceiver<RuntimeEvent>,
mut control_rx: mpsc::UnboundedReceiver<SessionControlCommand>,
initial_response_tx: oneshot::Sender<RuntimeResponse>,
initial_yield_time_ms: u64,
) {
let SessionControlContext {
cell_id,
runtime_tx,
runtime_terminate_handle,
} = context;
let mut content_items = Vec::new();
let mut pending_result: Option<PendingResult> = None;
let mut response_tx = Some(initial_response_tx);
let mut termination_requested = false;
let mut runtime_closed = false;
let mut yield_timer: Option<std::pin::Pin<Box<tokio::time::Sleep>>> = None;
loop {
tokio::select! {
maybe_event = async {
if runtime_closed {
std::future::pending::<Option<RuntimeEvent>>().await
} else {
event_rx.recv().await
}
} => {
let Some(event) = maybe_event else {
runtime_closed = true;
if termination_requested {
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Terminated {
cell_id: cell_id.clone(),
content_items: std::mem::take(&mut content_items),
});
}
break;
}
if pending_result.is_none() {
let result = PendingResult {
content_items: std::mem::take(&mut content_items),
stored_values: HashMap::new(),
error_text: Some("exec runtime ended unexpectedly".to_string()),
};
if send_or_buffer_result(
&cell_id,
result,
&mut response_tx,
&mut pending_result,
) {
break;
}
}
continue;
};
match event {
RuntimeEvent::Started => {
yield_timer = Some(Box::pin(tokio::time::sleep(Duration::from_millis(initial_yield_time_ms))));
}
RuntimeEvent::ContentItem(item) => {
content_items.push(item);
}
RuntimeEvent::YieldRequested => {
yield_timer = None;
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Yielded {
cell_id: cell_id.clone(),
content_items: std::mem::take(&mut content_items),
});
}
}
RuntimeEvent::Notify { call_id, text } => {
let _ = inner.turn_message_tx.send(TurnMessage::Notify {
cell_id: cell_id.clone(),
call_id,
text,
});
}
RuntimeEvent::ToolCall { id, name, input } => {
let _ = inner.turn_message_tx.send(TurnMessage::ToolCall {
cell_id: cell_id.clone(),
id,
name,
input,
});
}
RuntimeEvent::Result {
stored_values,
error_text,
} => {
yield_timer = None;
if termination_requested {
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Terminated {
cell_id: cell_id.clone(),
content_items: std::mem::take(&mut content_items),
});
}
break;
}
let result = PendingResult {
content_items: std::mem::take(&mut content_items),
stored_values,
error_text,
};
if send_or_buffer_result(
&cell_id,
result,
&mut response_tx,
&mut pending_result,
) {
break;
}
}
}
}
maybe_command = control_rx.recv() => {
let Some(command) = maybe_command else {
break;
};
match command {
SessionControlCommand::Poll {
yield_time_ms,
response_tx: next_response_tx,
} => {
if let Some(result) = pending_result.take() {
let _ = next_response_tx.send(pending_result_response(&cell_id, result));
break;
}
response_tx = Some(next_response_tx);
yield_timer = Some(Box::pin(tokio::time::sleep(Duration::from_millis(yield_time_ms))));
}
SessionControlCommand::Terminate { response_tx: next_response_tx } => {
if let Some(result) = pending_result.take() {
let _ = next_response_tx.send(pending_result_response(&cell_id, result));
break;
}
response_tx = Some(next_response_tx);
termination_requested = true;
yield_timer = None;
let _ = runtime_tx.send(RuntimeCommand::Terminate);
let _ = runtime_terminate_handle.terminate_execution();
if runtime_closed {
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Terminated {
cell_id: cell_id.clone(),
content_items: std::mem::take(&mut content_items),
});
}
break;
} else {
continue;
}
}
}
}
_ = async {
if let Some(yield_timer) = yield_timer.as_mut() {
yield_timer.await;
} else {
std::future::pending::<()>().await;
}
} => {
yield_timer = None;
if let Some(response_tx) = response_tx.take() {
let _ = response_tx.send(RuntimeResponse::Yielded {
cell_id: cell_id.clone(),
content_items: std::mem::take(&mut content_items),
});
}
}
}
}
let _ = runtime_tx.send(RuntimeCommand::Terminate);
inner.sessions.lock().await.remove(&cell_id);
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;
use pretty_assertions::assert_eq;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use super::CodeModeService;
use super::Inner;
use super::RuntimeCommand;
use super::RuntimeResponse;
use super::SessionControlCommand;
use super::SessionControlContext;
use super::run_session_control;
use crate::FunctionCallOutputContentItem;
use crate::runtime::ExecuteRequest;
use crate::runtime::RuntimeEvent;
use crate::runtime::spawn_runtime;
fn execute_request(source: &str) -> ExecuteRequest {
ExecuteRequest {
tool_call_id: "call_1".to_string(),
enabled_tools: Vec::new(),
source: source.to_string(),
stored_values: HashMap::new(),
yield_time_ms: Some(1),
max_output_tokens: None,
}
}
fn test_inner() -> Arc<Inner> {
let (turn_message_tx, turn_message_rx) = mpsc::unbounded_channel();
Arc::new(Inner {
stored_values: Mutex::new(HashMap::new()),
sessions: Mutex::new(HashMap::new()),
turn_message_tx,
turn_message_rx: Arc::new(Mutex::new(turn_message_rx)),
next_cell_id: AtomicU64::new(1),
})
}
#[tokio::test]
async fn synchronous_exit_returns_successfully() {
let service = CodeModeService::new();
let response = service
.execute(ExecuteRequest {
source: r#"text("before"); exit(); text("after");"#.to_string(),
yield_time_ms: None,
..execute_request("")
})
.await
.unwrap();
assert_eq!(
response,
RuntimeResponse::Result {
cell_id: "1".to_string(),
content_items: vec![FunctionCallOutputContentItem::InputText {
text: "before".to_string(),
}],
stored_values: HashMap::new(),
error_text: None,
}
);
}
#[tokio::test]
async fn v8_console_is_not_exposed_on_global_this() {
let service = CodeModeService::new();
let response = service
.execute(ExecuteRequest {
source: r#"text(String(Object.hasOwn(globalThis, "console")));"#.to_string(),
yield_time_ms: None,
..execute_request("")
})
.await
.unwrap();
assert_eq!(
response,
RuntimeResponse::Result {
cell_id: "1".to_string(),
content_items: vec![FunctionCallOutputContentItem::InputText {
text: "false".to_string(),
}],
stored_values: HashMap::new(),
error_text: None,
}
);
}
#[tokio::test]
async fn output_helpers_return_undefined() {
let service = CodeModeService::new();
let response = service
.execute(ExecuteRequest {
source: r#"
const returnsUndefined = [
text("first"),
image("https://example.com/image.jpg"),
notify("ping"),
].map((value) => value === undefined);
text(JSON.stringify(returnsUndefined));
"#
.to_string(),
yield_time_ms: None,
..execute_request("")
})
.await
.unwrap();
assert_eq!(
response,
RuntimeResponse::Result {
cell_id: "1".to_string(),
content_items: vec![
FunctionCallOutputContentItem::InputText {
text: "first".to_string(),
},
FunctionCallOutputContentItem::InputImage {
image_url: "https://example.com/image.jpg".to_string(),
detail: None,
},
FunctionCallOutputContentItem::InputText {
text: "[true,true,true]".to_string(),
},
],
stored_values: HashMap::new(),
error_text: None,
}
);
}
#[tokio::test]
async fn terminate_waits_for_runtime_shutdown_before_responding() {
let inner = test_inner();
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (control_tx, control_rx) = mpsc::unbounded_channel();
let (initial_response_tx, initial_response_rx) = oneshot::channel();
let (runtime_event_tx, _runtime_event_rx) = mpsc::unbounded_channel();
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(
ExecuteRequest {
source: "await new Promise(() => {})".to_string(),
yield_time_ms: None,
..execute_request("")
},
runtime_event_tx,
)
.unwrap();
tokio::spawn(run_session_control(
inner,
SessionControlContext {
cell_id: "cell-1".to_string(),
runtime_tx: runtime_tx.clone(),
runtime_terminate_handle,
},
event_rx,
control_rx,
initial_response_tx,
60_000,
));
event_tx.send(RuntimeEvent::Started).unwrap();
event_tx.send(RuntimeEvent::YieldRequested).unwrap();
assert_eq!(
initial_response_rx.await.unwrap(),
RuntimeResponse::Yielded {
cell_id: "cell-1".to_string(),
content_items: Vec::new(),
}
);
let (terminate_response_tx, terminate_response_rx) = oneshot::channel();
control_tx
.send(SessionControlCommand::Terminate {
response_tx: terminate_response_tx,
})
.unwrap();
let terminate_response = async { terminate_response_rx.await.unwrap() };
tokio::pin!(terminate_response);
assert!(
tokio::time::timeout(Duration::from_millis(100), terminate_response.as_mut())
.await
.is_err()
);
drop(event_tx);
assert_eq!(
terminate_response.await,
RuntimeResponse::Terminated {
cell_id: "cell-1".to_string(),
content_items: Vec::new(),
}
);
let _ = runtime_tx.send(RuntimeCommand::Terminate);
}
}

View File

@@ -31,6 +31,7 @@ codex-api = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-apply-patch = { workspace = true }
codex-async-utils = { workspace = true }
codex-code-mode = { workspace = true }
codex-connectors = { workspace = true }
codex-config = { workspace = true }
codex-exec-server = { workspace = true }

View File

@@ -3439,6 +3439,7 @@ impl Session {
DeveloperInstructions::from_policy(
turn_context.sandbox_policy.get(),
turn_context.approval_policy.value(),
turn_context.config.approvals_reviewer,
self.services.exec_policy.current().as_ref(),
&turn_context.cwd,
turn_context
@@ -5510,20 +5511,6 @@ pub(crate) async fn run_turn(
&available_connectors,
&skill_name_counts_lower,
));
// Explicit plugin mentions can make a plugin's enabled apps callable for
// this turn without persisting those connectors as sticky user selections.
let mut turn_enabled_connectors = explicitly_enabled_connectors.clone();
turn_enabled_connectors.extend(
mentioned_plugins
.iter()
.flat_map(|plugin| plugin.app_connector_ids.iter())
.map(|connector_id| connector_id.0.clone())
.filter(|connector_id| {
available_connectors
.iter()
.any(|connector| connector.is_enabled && connector.id == *connector_id)
}),
);
let connector_names_by_id = available_connectors
.iter()
.map(|connector| (connector.id.as_str(), connector.name.as_str()))
@@ -5675,7 +5662,7 @@ pub(crate) async fn run_turn(
&mut client_session,
turn_metadata_header.as_deref(),
sampling_request_input,
&turn_enabled_connectors,
&explicitly_enabled_connectors,
skills_outcome,
&mut server_model_warning_emitted_for_turn,
cancellation_token.child_token(),

View File

@@ -641,6 +641,21 @@ fn filter_connectors_for_input_skips_disabled_connectors() {
assert_eq!(selected, Vec::new());
}
#[test]
fn filter_connectors_for_input_skips_plugin_mentions() {
let connectors = vec![make_connector("figma", "Figma")];
let input = vec![user_message("use [@figma](plugin://figma@openai-curated)")];
let explicitly_enabled_connectors = HashSet::new();
let selected = filter_connectors_for_input(
&connectors,
&input,
&explicitly_enabled_connectors,
&HashMap::new(),
);
assert_eq!(selected, Vec::new());
}
#[test]
fn collect_explicit_app_ids_from_skill_items_includes_linked_mentions() {
let connectors = vec![make_connector("calendar", "Calendar")];

View File

@@ -43,6 +43,7 @@ fn build_permissions_update_item(
Some(DeveloperInstructions::from_policy(
next.sandbox_policy.get(),
next.approval_policy.value(),
next.config.approvals_reviewer,
exec_policy,
&next.cwd,
next.features.enabled(Feature::ExecPermissionApprovals),

View File

@@ -1,51 +0,0 @@
const __codexContentItems = Array.isArray(globalThis.__codexContentItems)
? globalThis.__codexContentItems
: [];
const __codexRuntime = globalThis.__codexRuntime;
delete globalThis.__codexRuntime;
Object.defineProperty(globalThis, '__codexContentItems', {
value: __codexContentItems,
configurable: true,
enumerable: false,
writable: false,
});
(() => {
if (!__codexRuntime || typeof __codexRuntime !== 'object') {
throw new Error('code mode runtime is unavailable');
}
function defineGlobal(name, value) {
Object.defineProperty(globalThis, name, {
value,
configurable: true,
enumerable: true,
writable: false,
});
}
defineGlobal('ALL_TOOLS', __codexRuntime.ALL_TOOLS);
defineGlobal('exit', __codexRuntime.exit);
defineGlobal('image', __codexRuntime.image);
defineGlobal('load', __codexRuntime.load);
defineGlobal('notify', __codexRuntime.notify);
defineGlobal('store', __codexRuntime.store);
defineGlobal('text', __codexRuntime.text);
defineGlobal('tools', __codexRuntime.tools);
defineGlobal('yield_control', __codexRuntime.yield_control);
defineGlobal(
'console',
Object.freeze({
log() {},
info() {},
warn() {},
error() {},
debug() {},
})
);
})();
__CODE_MODE_USER_CODE_PLACEHOLDER__

View File

@@ -1,19 +0,0 @@
## exec
- Runs raw JavaScript in an isolated context (no Node, no file system, or network access, no console).
- Send raw JavaScript source text, not JSON, quoted strings, or markdown code fences.
- You may optionally start the tool input with a first-line pragma like `// @exec: {"yield_time_ms": 10000, "max_output_tokens": 1000}`.
- `yield_time_ms` asks `exec` to yield early after that many milliseconds if the script is still running.
- `max_output_tokens` sets the token budget for direct `exec` results. By default the result is truncated to 10000 tokens.
- All nested tools are available on the global `tools` object, for example `await tools.exec_command(...)`. Tool names are exposed as normalized JavaScript identifiers, for example `await tools.mcp__ologs__get_profile(...)`.
- Tool methods take either string or object as parameter.
- They return either a structured value or a string based on the description above.
- Global helpers:
- `exit()`: Immediately ends the current script successfully (like an early return from the top level).
- `text(value: string | number | boolean | undefined | null)`: Appends a text item and returns it. Non-string values are stringified with `JSON.stringify(...)` when possible.
- `image(imageUrlOrItem: string | { image_url: string; detail?: "auto" | "low" | "high" | "original" | null })`: Appends an image item and returns it. `image_url` can be an HTTPS URL or a base64-encoded `data:` URL.
- `store(key: string, value: any)`: stores a serializable value under a string key for later `exec` calls in the same session.
- `load(key: string)`: returns the stored value for a string key, or `undefined` if it is missing.
- `notify(value: string | number | boolean | undefined | null)`: immediately injects an extra `custom_tool_call_output` for the current `exec` call. Values are stringified like `text(...)`.
- `ALL_TOOLS`: metadata for the enabled nested tools as `{ name, description }` entries.
- `yield_control()`: yields the accumulated output to the model immediately while the script keeps running.

View File

@@ -1,8 +1,5 @@
use async_trait::async_trait;
use serde::Deserialize;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::ToolInvocation;
@@ -10,180 +7,52 @@ use crate::tools::context::ToolPayload;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use super::CODE_MODE_PRAGMA_PREFIX;
use super::CodeModeSessionProgress;
use super::ExecContext;
use super::PUBLIC_TOOL_NAME;
use super::build_enabled_tools;
use super::handle_node_message;
use super::protocol::HostToNodeMessage;
use super::protocol::build_source;
use super::handle_runtime_response;
pub struct CodeModeExecuteHandler;
const MAX_JS_SAFE_INTEGER: u64 = (1_u64 << 53) - 1;
#[derive(Debug, Default, Deserialize, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
struct CodeModeExecPragma {
#[serde(default)]
yield_time_ms: Option<u64>,
#[serde(default)]
max_output_tokens: Option<usize>,
}
#[derive(Debug, PartialEq, Eq)]
struct CodeModeExecArgs {
code: String,
yield_time_ms: Option<u64>,
max_output_tokens: Option<usize>,
}
impl CodeModeExecuteHandler {
async fn execute(
&self,
session: std::sync::Arc<Session>,
turn: std::sync::Arc<TurnContext>,
session: std::sync::Arc<crate::codex::Session>,
turn: std::sync::Arc<crate::codex::TurnContext>,
call_id: String,
code: String,
) -> Result<FunctionToolOutput, FunctionCallError> {
let args = parse_freeform_args(&code)?;
let args =
codex_code_mode::parse_exec_source(&code).map_err(FunctionCallError::RespondToModel)?;
let exec = ExecContext { session, turn };
let enabled_tools = build_enabled_tools(&exec).await;
let service = &exec.session.services.code_mode_service;
let stored_values = service.stored_values().await;
let source =
build_source(&args.code, &enabled_tools).map_err(FunctionCallError::RespondToModel)?;
let cell_id = service.allocate_cell_id().await;
let request_id = service.allocate_request_id().await;
let process_slot = service
.ensure_started()
.await
.map_err(|err| FunctionCallError::RespondToModel(err.to_string()))?;
let stored_values = exec
.session
.services
.code_mode_service
.stored_values()
.await;
let started_at = std::time::Instant::now();
let message = HostToNodeMessage::Start {
request_id: request_id.clone(),
cell_id: cell_id.clone(),
tool_call_id: call_id,
default_yield_time_ms: super::DEFAULT_EXEC_YIELD_TIME_MS,
enabled_tools,
stored_values,
source,
yield_time_ms: args.yield_time_ms,
max_output_tokens: args.max_output_tokens,
};
let result = {
let mut process_slot = process_slot;
let Some(process) = process_slot.as_mut() else {
return Err(FunctionCallError::RespondToModel(format!(
"{PUBLIC_TOOL_NAME} runner failed to start"
)));
};
let message = process
.send(&request_id, &message)
.await
.map_err(|err| err.to_string());
let message = match message {
Ok(message) => message,
Err(error) => return Err(FunctionCallError::RespondToModel(error)),
};
handle_node_message(
&exec, cell_id, message, /*poll_max_output_tokens*/ None, started_at,
)
let response = exec
.session
.services
.code_mode_service
.execute(codex_code_mode::ExecuteRequest {
tool_call_id: call_id,
enabled_tools,
source: args.code,
stored_values,
yield_time_ms: args.yield_time_ms,
max_output_tokens: args.max_output_tokens,
})
.await
};
match result {
Ok(CodeModeSessionProgress::Finished(output))
| Ok(CodeModeSessionProgress::Yielded { output }) => Ok(output),
Err(error) => Err(FunctionCallError::RespondToModel(error)),
}
.map_err(FunctionCallError::RespondToModel)?;
handle_runtime_response(&exec, response, args.max_output_tokens, started_at)
.await
.map_err(FunctionCallError::RespondToModel)
}
}
fn parse_freeform_args(input: &str) -> Result<CodeModeExecArgs, FunctionCallError> {
if input.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"exec expects raw JavaScript source text (non-empty). Provide JS only, optionally with first-line `// @exec: {\"yield_time_ms\": 10000, \"max_output_tokens\": 1000}`.".to_string(),
));
}
let mut args = CodeModeExecArgs {
code: input.to_string(),
yield_time_ms: None,
max_output_tokens: None,
};
let mut lines = input.splitn(2, '\n');
let first_line = lines.next().unwrap_or_default();
let rest = lines.next().unwrap_or_default();
let trimmed = first_line.trim_start();
let Some(pragma) = trimmed.strip_prefix(CODE_MODE_PRAGMA_PREFIX) else {
return Ok(args);
};
if rest.trim().is_empty() {
return Err(FunctionCallError::RespondToModel(
"exec pragma must be followed by JavaScript source on subsequent lines".to_string(),
));
}
let directive = pragma.trim();
if directive.is_empty() {
return Err(FunctionCallError::RespondToModel(
"exec pragma must be a JSON object with supported fields `yield_time_ms` and `max_output_tokens`"
.to_string(),
));
}
let value: serde_json::Value = serde_json::from_str(directive).map_err(|err| {
FunctionCallError::RespondToModel(format!(
"exec pragma must be valid JSON with supported fields `yield_time_ms` and `max_output_tokens`: {err}"
))
})?;
let object = value.as_object().ok_or_else(|| {
FunctionCallError::RespondToModel(
"exec pragma must be a JSON object with supported fields `yield_time_ms` and `max_output_tokens`"
.to_string(),
)
})?;
for key in object.keys() {
match key.as_str() {
"yield_time_ms" | "max_output_tokens" => {}
_ => {
return Err(FunctionCallError::RespondToModel(format!(
"exec pragma only supports `yield_time_ms` and `max_output_tokens`; got `{key}`"
)));
}
}
}
let pragma: CodeModeExecPragma = serde_json::from_value(value).map_err(|err| {
FunctionCallError::RespondToModel(format!(
"exec pragma fields `yield_time_ms` and `max_output_tokens` must be non-negative safe integers: {err}"
))
})?;
if pragma
.yield_time_ms
.is_some_and(|yield_time_ms| yield_time_ms > MAX_JS_SAFE_INTEGER)
{
return Err(FunctionCallError::RespondToModel(
"exec pragma field `yield_time_ms` must be a non-negative safe integer".to_string(),
));
}
if pragma.max_output_tokens.is_some_and(|max_output_tokens| {
u64::try_from(max_output_tokens)
.map(|max_output_tokens| max_output_tokens > MAX_JS_SAFE_INTEGER)
.unwrap_or(true)
}) {
return Err(FunctionCallError::RespondToModel(
"exec pragma field `max_output_tokens` must be a non-negative safe integer".to_string(),
));
}
args.code = rest.to_string();
args.yield_time_ms = pragma.yield_time_ms;
args.max_output_tokens = pragma.max_output_tokens;
Ok(args)
}
#[async_trait]
impl ToolHandler for CodeModeExecuteHandler {
type Output = FunctionToolOutput;
@@ -216,7 +85,3 @@ impl ToolHandler for CodeModeExecuteHandler {
}
}
}
#[cfg(test)]
#[path = "execute_handler_tests.rs"]
mod execute_handler_tests;

View File

@@ -1,15 +1,18 @@
mod execute_handler;
mod process;
mod protocol;
mod service;
mod response_adapter;
mod wait_handler;
mod worker;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use codex_code_mode::CodeModeTurnHost;
use codex_code_mode::RuntimeResponse;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use serde_json::Value as JsonValue;
use tokio_util::sync::CancellationToken;
use crate::client_common::tools::ToolSpec;
use crate::codex::Session;
@@ -17,9 +20,8 @@ use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;
use crate::tools::ToolRouter;
use crate::tools::code_mode_description::augment_tool_spec_for_code_mode;
use crate::tools::code_mode_description::code_mode_tool_reference;
use crate::tools::code_mode_description::normalize_code_mode_identifier;
use crate::tools::context::FunctionToolOutput;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::context::ToolPayload;
use crate::tools::parallel::ToolCallRuntime;
use crate::tools::router::ToolCall;
@@ -29,180 +31,202 @@ use crate::truncate::TruncationPolicy;
use crate::truncate::formatted_truncate_text_content_items_with_policy;
use crate::truncate::truncate_function_output_items_with_policy;
use crate::unified_exec::resolve_max_tokens;
use codex_features::Feature;
const CODE_MODE_RUNNER_SOURCE: &str = include_str!("runner.cjs");
const CODE_MODE_BRIDGE_SOURCE: &str = include_str!("bridge.js");
const CODE_MODE_DESCRIPTION_TEMPLATE: &str = include_str!("description.md");
const CODE_MODE_WAIT_DESCRIPTION_TEMPLATE: &str = include_str!("wait_description.md");
const CODE_MODE_PRAGMA_PREFIX: &str = "// @exec:";
const CODE_MODE_ONLY_PREFACE: &str =
"Use `exec/wait` tool to run all other tools, do not attempt to use any other tools directly";
pub(crate) use execute_handler::CodeModeExecuteHandler;
use response_adapter::into_function_call_output_content_items;
pub(crate) use wait_handler::CodeModeWaitHandler;
pub(crate) const PUBLIC_TOOL_NAME: &str = "exec";
pub(crate) const WAIT_TOOL_NAME: &str = "wait";
pub(crate) fn is_code_mode_nested_tool(tool_name: &str) -> bool {
tool_name != PUBLIC_TOOL_NAME && tool_name != WAIT_TOOL_NAME
}
pub(crate) const DEFAULT_EXEC_YIELD_TIME_MS: u64 = 10_000;
pub(crate) const DEFAULT_WAIT_YIELD_TIME_MS: u64 = 10_000;
pub(crate) const PUBLIC_TOOL_NAME: &str = codex_code_mode::PUBLIC_TOOL_NAME;
pub(crate) const WAIT_TOOL_NAME: &str = codex_code_mode::WAIT_TOOL_NAME;
pub(crate) const DEFAULT_WAIT_YIELD_TIME_MS: u64 = codex_code_mode::DEFAULT_WAIT_YIELD_TIME_MS;
#[derive(Clone)]
pub(super) struct ExecContext {
pub(crate) struct ExecContext {
pub(super) session: Arc<Session>,
pub(super) turn: Arc<TurnContext>,
}
pub(crate) use execute_handler::CodeModeExecuteHandler;
pub(crate) use service::CodeModeService;
pub(crate) use wait_handler::CodeModeWaitHandler;
enum CodeModeSessionProgress {
Finished(FunctionToolOutput),
Yielded { output: FunctionToolOutput },
pub(crate) struct CodeModeService {
inner: codex_code_mode::CodeModeService,
}
enum CodeModeExecutionStatus {
Completed,
Failed,
Running(String),
Terminated,
}
pub(crate) fn tool_description(enabled_tools: &[(String, String)], code_mode_only: bool) -> String {
let description_template = CODE_MODE_DESCRIPTION_TEMPLATE.trim_end();
if !code_mode_only {
return description_template.to_string();
impl CodeModeService {
pub(crate) fn new(_js_repl_node_path: Option<PathBuf>) -> Self {
Self {
inner: codex_code_mode::CodeModeService::new(),
}
}
let mut sections = vec![
CODE_MODE_ONLY_PREFACE.to_string(),
description_template.to_string(),
];
pub(crate) async fn stored_values(&self) -> std::collections::HashMap<String, JsonValue> {
self.inner.stored_values().await
}
if !enabled_tools.is_empty() {
let nested_tool_reference = enabled_tools
.iter()
.map(|(name, nested_description)| {
let global_name = normalize_code_mode_identifier(name);
format!(
"### `{global_name}` (`{name}`)\n{}",
nested_description.trim()
)
pub(crate) async fn replace_stored_values(
&self,
values: std::collections::HashMap<String, JsonValue>,
) {
self.inner.replace_stored_values(values).await;
}
pub(crate) async fn execute(
&self,
request: codex_code_mode::ExecuteRequest,
) -> Result<RuntimeResponse, String> {
self.inner.execute(request).await
}
pub(crate) async fn wait(
&self,
request: codex_code_mode::WaitRequest,
) -> Result<RuntimeResponse, String> {
self.inner.wait(request).await
}
pub(crate) async fn start_turn_worker(
&self,
session: &Arc<Session>,
turn: &Arc<TurnContext>,
router: Arc<ToolRouter>,
tracker: SharedTurnDiffTracker,
) -> Option<codex_code_mode::CodeModeTurnWorker> {
if !turn.features.enabled(Feature::CodeMode) {
return None;
}
let exec = ExecContext {
session: Arc::clone(session),
turn: Arc::clone(turn),
};
let tool_runtime =
ToolCallRuntime::new(router, Arc::clone(session), Arc::clone(turn), tracker);
let host = Arc::new(CoreTurnHost { exec, tool_runtime });
Some(self.inner.start_turn_worker(host))
}
}
struct CoreTurnHost {
exec: ExecContext,
tool_runtime: ToolCallRuntime,
}
#[async_trait::async_trait]
impl CodeModeTurnHost for CoreTurnHost {
async fn invoke_tool(
&self,
tool_name: String,
input: Option<JsonValue>,
cancellation_token: CancellationToken,
) -> Result<JsonValue, String> {
call_nested_tool(
self.exec.clone(),
self.tool_runtime.clone(),
tool_name,
input,
cancellation_token,
)
.await
.map_err(|error| error.to_string())
}
async fn notify(&self, call_id: String, cell_id: String, text: String) -> Result<(), String> {
if text.trim().is_empty() {
return Ok(());
}
self.exec
.session
.inject_response_items(vec![ResponseInputItem::CustomToolCallOutput {
call_id,
name: Some(PUBLIC_TOOL_NAME.to_string()),
output: FunctionCallOutputPayload::from_text(text),
}])
.await
.map_err(|_| {
format!("failed to inject exec notify message for cell {cell_id}: no active turn")
})
.collect::<Vec<_>>()
.join("\n\n");
sections.push(nested_tool_reference);
}
sections.join("\n\n")
}
pub(crate) fn wait_tool_description() -> &'static str {
CODE_MODE_WAIT_DESCRIPTION_TEMPLATE
}
async fn handle_node_message(
pub(super) async fn handle_runtime_response(
exec: &ExecContext,
cell_id: String,
message: protocol::NodeToHostMessage,
poll_max_output_tokens: Option<Option<usize>>,
response: RuntimeResponse,
max_output_tokens: Option<usize>,
started_at: std::time::Instant,
) -> Result<CodeModeSessionProgress, String> {
match message {
protocol::NodeToHostMessage::ToolCall { .. } => Err(protocol::unexpected_tool_call_error()),
protocol::NodeToHostMessage::Notify { .. } => Err(format!(
"unexpected {PUBLIC_TOOL_NAME} notify message in response path"
)),
protocol::NodeToHostMessage::Yielded { content_items, .. } => {
let mut delta_items = output_content_items_from_json_values(content_items)?;
delta_items = truncate_code_mode_result(delta_items, poll_max_output_tokens.flatten());
prepend_script_status(
&mut delta_items,
CodeModeExecutionStatus::Running(cell_id),
started_at.elapsed(),
);
Ok(CodeModeSessionProgress::Yielded {
output: FunctionToolOutput::from_content(delta_items, Some(true)),
})
) -> Result<FunctionToolOutput, String> {
let script_status = format_script_status(&response);
match response {
RuntimeResponse::Yielded { content_items, .. } => {
let mut content_items = into_function_call_output_content_items(content_items);
content_items = truncate_code_mode_result(content_items, max_output_tokens);
prepend_script_status(&mut content_items, &script_status, started_at.elapsed());
Ok(FunctionToolOutput::from_content(content_items, Some(true)))
}
protocol::NodeToHostMessage::Terminated { content_items, .. } => {
let mut delta_items = output_content_items_from_json_values(content_items)?;
delta_items = truncate_code_mode_result(delta_items, poll_max_output_tokens.flatten());
prepend_script_status(
&mut delta_items,
CodeModeExecutionStatus::Terminated,
started_at.elapsed(),
);
Ok(CodeModeSessionProgress::Finished(
FunctionToolOutput::from_content(delta_items, Some(true)),
))
RuntimeResponse::Terminated { content_items, .. } => {
let mut content_items = into_function_call_output_content_items(content_items);
content_items = truncate_code_mode_result(content_items, max_output_tokens);
prepend_script_status(&mut content_items, &script_status, started_at.elapsed());
Ok(FunctionToolOutput::from_content(content_items, Some(true)))
}
protocol::NodeToHostMessage::Result {
RuntimeResponse::Result {
content_items,
stored_values,
error_text,
max_output_tokens_per_exec_call,
..
} => {
let mut content_items = into_function_call_output_content_items(content_items);
exec.session
.services
.code_mode_service
.replace_stored_values(stored_values)
.await;
let mut delta_items = output_content_items_from_json_values(content_items)?;
let success = error_text.is_none();
if let Some(error_text) = error_text {
delta_items.push(FunctionCallOutputContentItem::InputText {
content_items.push(FunctionCallOutputContentItem::InputText {
text: format!("Script error:\n{error_text}"),
});
}
let mut delta_items = truncate_code_mode_result(
delta_items,
poll_max_output_tokens.unwrap_or(max_output_tokens_per_exec_call),
);
prepend_script_status(
&mut delta_items,
if success {
CodeModeExecutionStatus::Completed
} else {
CodeModeExecutionStatus::Failed
},
started_at.elapsed(),
);
Ok(CodeModeSessionProgress::Finished(
FunctionToolOutput::from_content(delta_items, Some(success)),
content_items = truncate_code_mode_result(content_items, max_output_tokens);
prepend_script_status(&mut content_items, &script_status, started_at.elapsed());
Ok(FunctionToolOutput::from_content(
content_items,
Some(success),
))
}
}
}
fn format_script_status(response: &RuntimeResponse) -> String {
match response {
RuntimeResponse::Yielded { cell_id, .. } => {
format!("Script running with cell ID {cell_id}")
}
RuntimeResponse::Terminated { .. } => "Script terminated".to_string(),
RuntimeResponse::Result { error_text, .. } => {
if error_text.is_none() {
"Script completed".to_string()
} else {
"Script failed".to_string()
}
}
}
}
fn prepend_script_status(
content_items: &mut Vec<FunctionCallOutputContentItem>,
status: CodeModeExecutionStatus,
status: &str,
wall_time: Duration,
) {
let wall_time_seconds = ((wall_time.as_secs_f32()) * 10.0).round() / 10.0;
let header = format!(
"{}\nWall time {wall_time_seconds:.1} seconds\nOutput:\n",
match status {
CodeModeExecutionStatus::Completed => "Script completed".to_string(),
CodeModeExecutionStatus::Failed => "Script failed".to_string(),
CodeModeExecutionStatus::Running(cell_id) => {
format!("Script running with cell ID {cell_id}")
}
CodeModeExecutionStatus::Terminated => "Script terminated".to_string(),
}
);
let header = format!("{status}\nWall time {wall_time_seconds:.1} seconds\nOutput:\n");
content_items.insert(0, FunctionCallOutputContentItem::InputText { text: header });
}
fn truncate_code_mode_result(
items: Vec<FunctionCallOutputContentItem>,
max_output_tokens_per_exec_call: Option<usize>,
max_output_tokens: Option<usize>,
) -> Vec<FunctionCallOutputContentItem> {
let max_output_tokens = resolve_max_tokens(max_output_tokens_per_exec_call);
let max_output_tokens = resolve_max_tokens(max_output_tokens);
let policy = TruncationPolicy::Tokens(max_output_tokens);
if items
.iter()
@@ -216,21 +240,9 @@ fn truncate_code_mode_result(
truncate_function_output_items_with_policy(&items, policy)
}
fn output_content_items_from_json_values(
content_items: Vec<JsonValue>,
) -> Result<Vec<FunctionCallOutputContentItem>, String> {
content_items
.into_iter()
.enumerate()
.map(|(index, item)| {
serde_json::from_value(item).map_err(|err| {
format!("invalid {PUBLIC_TOOL_NAME} content item at index {index}: {err}")
})
})
.collect()
}
async fn build_enabled_tools(exec: &ExecContext) -> Vec<protocol::EnabledTool> {
pub(super) async fn build_enabled_tools(
exec: &ExecContext,
) -> Vec<codex_code_mode::ToolDefinition> {
let router = build_nested_router(exec).await;
let mut out = router
.specs()
@@ -238,39 +250,37 @@ async fn build_enabled_tools(exec: &ExecContext) -> Vec<protocol::EnabledTool> {
.map(|spec| augment_tool_spec_for_code_mode(spec, /*code_mode_enabled*/ true))
.filter_map(enabled_tool_from_spec)
.collect::<Vec<_>>();
out.sort_by(|left, right| left.tool_name.cmp(&right.tool_name));
out.dedup_by(|left, right| left.tool_name == right.tool_name);
out.sort_by(|left, right| left.name.cmp(&right.name));
out.dedup_by(|left, right| left.name == right.name);
out
}
fn enabled_tool_from_spec(spec: ToolSpec) -> Option<protocol::EnabledTool> {
fn enabled_tool_from_spec(spec: ToolSpec) -> Option<codex_code_mode::ToolDefinition> {
let tool_name = spec.name().to_string();
if !is_code_mode_nested_tool(&tool_name) {
if !codex_code_mode::is_code_mode_nested_tool(&tool_name) {
return None;
}
let reference = code_mode_tool_reference(&tool_name);
let global_name = normalize_code_mode_identifier(&tool_name);
let (description, kind) = match spec {
ToolSpec::Function(tool) => (tool.description, protocol::CodeModeToolKind::Function),
ToolSpec::Freeform(tool) => (tool.description, protocol::CodeModeToolKind::Freeform),
match spec {
ToolSpec::Function(tool) => Some(codex_code_mode::ToolDefinition {
name: tool_name,
description: tool.description,
kind: codex_code_mode::CodeModeToolKind::Function,
input_schema: serde_json::to_value(&tool.parameters).ok(),
output_schema: tool.output_schema,
}),
ToolSpec::Freeform(tool) => Some(codex_code_mode::ToolDefinition {
name: tool_name,
description: tool.description,
kind: codex_code_mode::CodeModeToolKind::Freeform,
input_schema: None,
output_schema: None,
}),
ToolSpec::LocalShell {}
| ToolSpec::ImageGeneration { .. }
| ToolSpec::ToolSearch { .. }
| ToolSpec::WebSearch { .. } => {
return None;
}
};
Some(protocol::EnabledTool {
tool_name,
global_name,
module_path: reference.module_path,
namespace: reference.namespace,
name: normalize_code_mode_identifier(&reference.tool_key),
description,
kind,
})
| ToolSpec::WebSearch { .. } => None,
}
}
async fn build_nested_router(exec: &ExecContext) -> ToolRouter {
@@ -303,7 +313,7 @@ async fn call_nested_tool(
tool_runtime: ToolCallRuntime,
tool_name: String,
input: Option<JsonValue>,
cancellation_token: tokio_util::sync::CancellationToken,
cancellation_token: CancellationToken,
) -> Result<JsonValue, FunctionCallError> {
if tool_name == PUBLIC_TOOL_NAME {
return Err(FunctionCallError::RespondToModel(format!(
@@ -340,18 +350,18 @@ async fn call_nested_tool(
Ok(result.code_mode_result())
}
fn tool_kind_for_spec(spec: &ToolSpec) -> protocol::CodeModeToolKind {
fn tool_kind_for_spec(spec: &ToolSpec) -> codex_code_mode::CodeModeToolKind {
if matches!(spec, ToolSpec::Freeform(_)) {
protocol::CodeModeToolKind::Freeform
codex_code_mode::CodeModeToolKind::Freeform
} else {
protocol::CodeModeToolKind::Function
codex_code_mode::CodeModeToolKind::Function
}
}
fn tool_kind_for_name(
spec: Option<ToolSpec>,
tool_name: &str,
) -> Result<protocol::CodeModeToolKind, String> {
) -> Result<codex_code_mode::CodeModeToolKind, String> {
spec.as_ref()
.map(tool_kind_for_spec)
.ok_or_else(|| format!("tool `{tool_name}` is not enabled in {PUBLIC_TOOL_NAME}"))
@@ -364,8 +374,12 @@ fn build_nested_tool_payload(
) -> Result<ToolPayload, String> {
let actual_kind = tool_kind_for_name(spec, tool_name)?;
match actual_kind {
protocol::CodeModeToolKind::Function => build_function_tool_payload(tool_name, input),
protocol::CodeModeToolKind::Freeform => build_freeform_tool_payload(tool_name, input),
codex_code_mode::CodeModeToolKind::Function => {
build_function_tool_payload(tool_name, input)
}
codex_code_mode::CodeModeToolKind::Freeform => {
build_freeform_tool_payload(tool_name, input)
}
}
}

View File

@@ -1,173 +0,0 @@
use std::collections::HashMap;
use std::sync::Arc;
use tokio::io::AsyncBufReadExt;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::warn;
use super::CODE_MODE_RUNNER_SOURCE;
use super::PUBLIC_TOOL_NAME;
use super::protocol::HostToNodeMessage;
use super::protocol::NodeToHostMessage;
use super::protocol::message_request_id;
pub(super) struct CodeModeProcess {
pub(super) child: tokio::process::Child,
pub(super) stdin: Arc<Mutex<tokio::process::ChildStdin>>,
pub(super) stdout_task: JoinHandle<()>,
pub(super) response_waiters: Arc<Mutex<HashMap<String, oneshot::Sender<NodeToHostMessage>>>>,
pub(super) message_rx: Arc<Mutex<mpsc::UnboundedReceiver<NodeToHostMessage>>>,
}
impl CodeModeProcess {
pub(super) async fn send(
&mut self,
request_id: &str,
message: &HostToNodeMessage,
) -> Result<NodeToHostMessage, std::io::Error> {
if self.stdout_task.is_finished() {
return Err(std::io::Error::other(format!(
"{PUBLIC_TOOL_NAME} runner is not available"
)));
}
let (tx, rx) = oneshot::channel();
self.response_waiters
.lock()
.await
.insert(request_id.to_string(), tx);
if let Err(err) = write_message(&self.stdin, message).await {
self.response_waiters.lock().await.remove(request_id);
return Err(err);
}
match rx.await {
Ok(message) => Ok(message),
Err(_) => Err(std::io::Error::other(format!(
"{PUBLIC_TOOL_NAME} runner is not available"
))),
}
}
pub(super) fn has_exited(&mut self) -> Result<bool, std::io::Error> {
self.child
.try_wait()
.map(|status| status.is_some())
.map_err(std::io::Error::other)
}
}
pub(super) async fn spawn_code_mode_process(
node_path: &std::path::Path,
) -> Result<CodeModeProcess, std::io::Error> {
let mut cmd = tokio::process::Command::new(node_path);
cmd.arg("--experimental-vm-modules");
cmd.arg("--eval");
cmd.arg(CODE_MODE_RUNNER_SOURCE);
cmd.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true);
let mut child = cmd.spawn().map_err(std::io::Error::other)?;
let stdout = child.stdout.take().ok_or_else(|| {
std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stdout"))
})?;
let stderr = child.stderr.take().ok_or_else(|| {
std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stderr"))
})?;
let stdin = child
.stdin
.take()
.ok_or_else(|| std::io::Error::other(format!("{PUBLIC_TOOL_NAME} runner missing stdin")))?;
let stdin = Arc::new(Mutex::new(stdin));
let response_waiters = Arc::new(Mutex::new(HashMap::<
String,
oneshot::Sender<NodeToHostMessage>,
>::new()));
let (message_tx, message_rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
let mut reader = BufReader::new(stderr);
let mut buf = Vec::new();
match reader.read_to_end(&mut buf).await {
Ok(_) => {
let stderr = String::from_utf8_lossy(&buf).trim().to_string();
if !stderr.is_empty() {
warn!("{PUBLIC_TOOL_NAME} runner stderr: {stderr}");
}
}
Err(err) => {
warn!("failed to read {PUBLIC_TOOL_NAME} stderr: {err}");
}
}
});
let stdout_task = tokio::spawn({
let response_waiters = Arc::clone(&response_waiters);
async move {
let mut stdout_lines = BufReader::new(stdout).lines();
loop {
let line = match stdout_lines.next_line().await {
Ok(line) => line,
Err(err) => {
warn!("failed to read {PUBLIC_TOOL_NAME} stdout: {err}");
break;
}
};
let Some(line) = line else {
break;
};
if line.trim().is_empty() {
continue;
}
let message: NodeToHostMessage = match serde_json::from_str(&line) {
Ok(message) => message,
Err(err) => {
warn!("failed to parse {PUBLIC_TOOL_NAME} stdout message: {err}");
break;
}
};
match message {
message @ (NodeToHostMessage::ToolCall { .. }
| NodeToHostMessage::Notify { .. }) => {
let _ = message_tx.send(message);
}
message => {
if let Some(request_id) = message_request_id(&message)
&& let Some(waiter) = response_waiters.lock().await.remove(request_id)
{
let _ = waiter.send(message);
}
}
}
}
response_waiters.lock().await.clear();
}
});
Ok(CodeModeProcess {
child,
stdin,
stdout_task,
response_waiters,
message_rx: Arc::new(Mutex::new(message_rx)),
})
}
pub(super) async fn write_message(
stdin: &Arc<Mutex<tokio::process::ChildStdin>>,
message: &HostToNodeMessage,
) -> Result<(), std::io::Error> {
let line = serde_json::to_string(message).map_err(std::io::Error::other)?;
let mut stdin = stdin.lock().await;
stdin.write_all(line.as_bytes()).await?;
stdin.write_all(b"\n").await?;
stdin.flush().await?;
Ok(())
}

View File

@@ -1,169 +0,0 @@
use std::collections::HashMap;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JsonValue;
use super::CODE_MODE_BRIDGE_SOURCE;
use super::PUBLIC_TOOL_NAME;
#[derive(Clone, Copy, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")]
pub(super) enum CodeModeToolKind {
Function,
Freeform,
}
#[derive(Clone, Debug, Serialize)]
pub(super) struct EnabledTool {
pub(super) tool_name: String,
pub(super) global_name: String,
#[serde(rename = "module")]
pub(super) module_path: String,
pub(super) namespace: Vec<String>,
pub(super) name: String,
pub(super) description: String,
pub(super) kind: CodeModeToolKind,
}
#[derive(Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub(super) struct CodeModeToolCall {
pub(super) request_id: String,
pub(super) id: String,
pub(super) name: String,
#[serde(default)]
pub(super) input: Option<JsonValue>,
}
#[derive(Clone, Debug, Deserialize)]
pub(super) struct CodeModeNotify {
pub(super) cell_id: String,
pub(super) call_id: String,
pub(super) text: String,
}
#[derive(Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub(super) enum HostToNodeMessage {
Start {
request_id: String,
cell_id: String,
tool_call_id: String,
default_yield_time_ms: u64,
enabled_tools: Vec<EnabledTool>,
stored_values: HashMap<String, JsonValue>,
source: String,
yield_time_ms: Option<u64>,
max_output_tokens: Option<usize>,
},
Poll {
request_id: String,
cell_id: String,
yield_time_ms: u64,
},
Terminate {
request_id: String,
cell_id: String,
},
Response {
request_id: String,
id: String,
code_mode_result: JsonValue,
#[serde(default)]
error_text: Option<String>,
},
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub(super) enum NodeToHostMessage {
ToolCall {
#[serde(flatten)]
tool_call: CodeModeToolCall,
},
Yielded {
request_id: String,
content_items: Vec<JsonValue>,
},
Terminated {
request_id: String,
content_items: Vec<JsonValue>,
},
Notify {
#[serde(flatten)]
notify: CodeModeNotify,
},
Result {
request_id: String,
content_items: Vec<JsonValue>,
stored_values: HashMap<String, JsonValue>,
#[serde(default)]
error_text: Option<String>,
#[serde(default)]
max_output_tokens_per_exec_call: Option<usize>,
},
}
pub(super) fn build_source(
user_code: &str,
enabled_tools: &[EnabledTool],
) -> Result<String, String> {
let enabled_tools_json = serde_json::to_string(enabled_tools)
.map_err(|err| format!("failed to serialize enabled tools: {err}"))?;
Ok(CODE_MODE_BRIDGE_SOURCE
.replace(
"__CODE_MODE_ENABLED_TOOLS_PLACEHOLDER__",
&enabled_tools_json,
)
.replace("__CODE_MODE_USER_CODE_PLACEHOLDER__", user_code))
}
pub(super) fn message_request_id(message: &NodeToHostMessage) -> Option<&str> {
match message {
NodeToHostMessage::ToolCall { .. } => None,
NodeToHostMessage::Yielded { request_id, .. }
| NodeToHostMessage::Terminated { request_id, .. }
| NodeToHostMessage::Result { request_id, .. } => Some(request_id),
NodeToHostMessage::Notify { .. } => None,
}
}
pub(super) fn unexpected_tool_call_error() -> String {
format!("{PUBLIC_TOOL_NAME} received an unexpected tool call response")
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use super::CodeModeNotify;
use super::NodeToHostMessage;
use super::message_request_id;
#[test]
fn message_request_id_absent_for_notify() {
let message = NodeToHostMessage::Notify {
notify: CodeModeNotify {
cell_id: "1".to_string(),
call_id: "call-1".to_string(),
text: "hello".to_string(),
},
};
assert_eq!(None, message_request_id(&message));
}
#[test]
fn message_request_id_present_for_result() {
let message = NodeToHostMessage::Result {
request_id: "req-1".to_string(),
content_items: Vec::new(),
stored_values: HashMap::new(),
error_text: None,
max_output_tokens_per_exec_call: None,
};
assert_eq!(Some("req-1"), message_request_id(&message));
}
}

View File

@@ -0,0 +1,44 @@
use codex_code_mode::ImageDetail as CodeModeImageDetail;
use codex_protocol::models::FunctionCallOutputContentItem;
use codex_protocol::models::ImageDetail;
trait IntoProtocol<T> {
fn into_protocol(self) -> T;
}
pub(super) fn into_function_call_output_content_items(
items: Vec<codex_code_mode::FunctionCallOutputContentItem>,
) -> Vec<FunctionCallOutputContentItem> {
items.into_iter().map(IntoProtocol::into_protocol).collect()
}
impl IntoProtocol<ImageDetail> for CodeModeImageDetail {
fn into_protocol(self) -> ImageDetail {
let value = self;
match value {
CodeModeImageDetail::Auto => ImageDetail::Auto,
CodeModeImageDetail::Low => ImageDetail::Low,
CodeModeImageDetail::High => ImageDetail::High,
CodeModeImageDetail::Original => ImageDetail::Original,
}
}
}
impl IntoProtocol<FunctionCallOutputContentItem>
for codex_code_mode::FunctionCallOutputContentItem
{
fn into_protocol(self) -> FunctionCallOutputContentItem {
let value = self;
match value {
codex_code_mode::FunctionCallOutputContentItem::InputText { text } => {
FunctionCallOutputContentItem::InputText { text }
}
codex_code_mode::FunctionCallOutputContentItem::InputImage { image_url, detail } => {
FunctionCallOutputContentItem::InputImage {
image_url,
detail: detail.map(IntoProtocol::into_protocol),
}
}
}
}
}

View File

@@ -1,938 +0,0 @@
'use strict';
const readline = require('node:readline');
const { Worker } = require('node:worker_threads');
const DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL = 10000;
function normalizeMaxOutputTokensPerExecCall(value) {
if (!Number.isSafeInteger(value) || value < 0) {
throw new TypeError('max_output_tokens_per_exec_call must be a non-negative safe integer');
}
return value;
}
function normalizeYieldTime(value) {
if (!Number.isSafeInteger(value) || value < 0) {
throw new TypeError('yield_time must be a non-negative safe integer');
}
return value;
}
function formatErrorText(error) {
return String(error && error.stack ? error.stack : error);
}
function cloneJsonValue(value) {
return JSON.parse(JSON.stringify(value));
}
function clearTimer(timer) {
if (timer !== null) {
clearTimeout(timer);
}
return null;
}
function takeContentItems(session) {
const clonedContentItems = cloneJsonValue(session.content_items);
session.content_items.splice(0, session.content_items.length);
return Array.isArray(clonedContentItems) ? clonedContentItems : [];
}
function codeModeWorkerMain() {
'use strict';
const { parentPort, workerData } = require('node:worker_threads');
const vm = require('node:vm');
const { SourceTextModule, SyntheticModule } = vm;
function formatErrorText(error) {
return String(error && error.stack ? error.stack : error);
}
function cloneJsonValue(value) {
return JSON.parse(JSON.stringify(value));
}
class CodeModeExitSignal extends Error {
constructor() {
super('code mode exit');
this.name = 'CodeModeExitSignal';
}
}
function isCodeModeExitSignal(error) {
return error instanceof CodeModeExitSignal;
}
function createToolCaller() {
let nextId = 0;
const pending = new Map();
parentPort.on('message', (message) => {
if (message.type === 'tool_response') {
const entry = pending.get(message.id);
if (!entry) {
return;
}
pending.delete(message.id);
entry.resolve(message.result ?? '');
return;
}
if (message.type === 'tool_response_error') {
const entry = pending.get(message.id);
if (!entry) {
return;
}
pending.delete(message.id);
entry.reject(new Error(message.error_text ?? 'tool call failed'));
return;
}
});
return (name, input) => {
const id = 'msg-' + ++nextId;
return new Promise((resolve, reject) => {
pending.set(id, { resolve, reject });
parentPort.postMessage({
type: 'tool_call',
id,
name: String(name),
input,
});
});
};
}
function createContentItems() {
const contentItems = [];
const push = contentItems.push.bind(contentItems);
contentItems.push = (...items) => {
for (const item of items) {
parentPort.postMessage({
type: 'content_item',
item: cloneJsonValue(item),
});
}
return push(...items);
};
parentPort.on('message', (message) => {
if (message.type === 'clear_content') {
contentItems.splice(0, contentItems.length);
}
});
return contentItems;
}
function createGlobalToolsNamespace(callTool, enabledTools) {
const tools = Object.create(null);
for (const { tool_name, global_name } of enabledTools) {
Object.defineProperty(tools, global_name, {
value: async (args) => callTool(tool_name, args),
configurable: false,
enumerable: true,
writable: false,
});
}
return Object.freeze(tools);
}
function createModuleToolsNamespace(callTool, enabledTools) {
const tools = Object.create(null);
for (const { tool_name, global_name } of enabledTools) {
Object.defineProperty(tools, global_name, {
value: async (args) => callTool(tool_name, args),
configurable: false,
enumerable: true,
writable: false,
});
}
return Object.freeze(tools);
}
function createAllToolsMetadata(enabledTools) {
return Object.freeze(
enabledTools.map(({ global_name, description }) =>
Object.freeze({
name: global_name,
description,
})
)
);
}
function createToolsModule(context, callTool, enabledTools) {
const tools = createModuleToolsNamespace(callTool, enabledTools);
const allTools = createAllToolsMetadata(enabledTools);
const exportNames = ['ALL_TOOLS'];
for (const { global_name } of enabledTools) {
if (global_name !== 'ALL_TOOLS') {
exportNames.push(global_name);
}
}
const uniqueExportNames = [...new Set(exportNames)];
return new SyntheticModule(
uniqueExportNames,
function initToolsModule() {
this.setExport('ALL_TOOLS', allTools);
for (const exportName of uniqueExportNames) {
if (exportName !== 'ALL_TOOLS') {
this.setExport(exportName, tools[exportName]);
}
}
},
{ context }
);
}
function ensureContentItems(context) {
if (!Array.isArray(context.__codexContentItems)) {
context.__codexContentItems = [];
}
return context.__codexContentItems;
}
function serializeOutputText(value) {
if (typeof value === 'string') {
return value;
}
if (
typeof value === 'undefined' ||
value === null ||
typeof value === 'boolean' ||
typeof value === 'number' ||
typeof value === 'bigint'
) {
return String(value);
}
const serialized = JSON.stringify(value);
if (typeof serialized === 'string') {
return serialized;
}
return String(value);
}
function normalizeOutputImage(value) {
let imageUrl;
let detail;
if (typeof value === 'string') {
imageUrl = value;
} else if (
value &&
typeof value === 'object' &&
!Array.isArray(value)
) {
if (typeof value.image_url === 'string') {
imageUrl = value.image_url;
}
if (typeof value.detail === 'string') {
detail = value.detail;
} else if (
Object.prototype.hasOwnProperty.call(value, 'detail') &&
value.detail !== null &&
typeof value.detail !== 'undefined'
) {
throw new TypeError('image detail must be a string when provided');
}
}
if (typeof imageUrl !== 'string' || !imageUrl) {
throw new TypeError(
'image expects a non-empty image URL string or an object with image_url and optional detail'
);
}
if (!/^(?:https?:\/\/|data:)/i.test(imageUrl)) {
throw new TypeError('image expects an http(s) or data URL');
}
if (typeof detail !== 'undefined' && !/^(?:auto|low|high|original)$/i.test(detail)) {
throw new TypeError('image detail must be one of: auto, low, high, original');
}
const normalized = { image_url: imageUrl };
if (typeof detail === 'string') {
normalized.detail = detail.toLowerCase();
}
return normalized;
}
function createCodeModeHelpers(context, state, toolCallId) {
const load = (key) => {
if (typeof key !== 'string') {
throw new TypeError('load key must be a string');
}
if (!Object.prototype.hasOwnProperty.call(state.storedValues, key)) {
return undefined;
}
return cloneJsonValue(state.storedValues[key]);
};
const store = (key, value) => {
if (typeof key !== 'string') {
throw new TypeError('store key must be a string');
}
state.storedValues[key] = cloneJsonValue(value);
};
const text = (value) => {
const item = {
type: 'input_text',
text: serializeOutputText(value),
};
ensureContentItems(context).push(item);
return item;
};
const image = (value) => {
const item = Object.assign({ type: 'input_image' }, normalizeOutputImage(value));
ensureContentItems(context).push(item);
return item;
};
const yieldControl = () => {
parentPort.postMessage({ type: 'yield' });
};
const notify = (value) => {
const text = serializeOutputText(value);
if (text.trim().length === 0) {
throw new TypeError('notify expects non-empty text');
}
if (typeof toolCallId !== 'string' || toolCallId.length === 0) {
throw new TypeError('notify requires a valid tool call id');
}
parentPort.postMessage({
type: 'notify',
call_id: toolCallId,
text,
});
return text;
};
const exit = () => {
throw new CodeModeExitSignal();
};
return Object.freeze({
exit,
image,
load,
notify,
output_image: image,
output_text: text,
store,
text,
yield_control: yieldControl,
});
}
function createCodeModeModule(context, helpers) {
return new SyntheticModule(
[
'exit',
'image',
'load',
'notify',
'output_text',
'output_image',
'store',
'text',
'yield_control',
],
function initCodeModeModule() {
this.setExport('exit', helpers.exit);
this.setExport('image', helpers.image);
this.setExport('load', helpers.load);
this.setExport('notify', helpers.notify);
this.setExport('output_text', helpers.output_text);
this.setExport('output_image', helpers.output_image);
this.setExport('store', helpers.store);
this.setExport('text', helpers.text);
this.setExport('yield_control', helpers.yield_control);
},
{ context }
);
}
function createBridgeRuntime(callTool, enabledTools, helpers) {
return Object.freeze({
ALL_TOOLS: createAllToolsMetadata(enabledTools),
exit: helpers.exit,
image: helpers.image,
load: helpers.load,
notify: helpers.notify,
store: helpers.store,
text: helpers.text,
tools: createGlobalToolsNamespace(callTool, enabledTools),
yield_control: helpers.yield_control,
});
}
function namespacesMatch(left, right) {
if (left.length !== right.length) {
return false;
}
return left.every((segment, index) => segment === right[index]);
}
function createNamespacedToolsNamespace(callTool, enabledTools, namespace) {
const tools = Object.create(null);
for (const tool of enabledTools) {
const toolNamespace = Array.isArray(tool.namespace) ? tool.namespace : [];
if (!namespacesMatch(toolNamespace, namespace)) {
continue;
}
Object.defineProperty(tools, tool.name, {
value: async (args) => callTool(tool.tool_name, args),
configurable: false,
enumerable: true,
writable: false,
});
}
return Object.freeze(tools);
}
function createNamespacedToolsModule(context, callTool, enabledTools, namespace) {
const tools = createNamespacedToolsNamespace(callTool, enabledTools, namespace);
const exportNames = [];
for (const exportName of Object.keys(tools)) {
if (exportName !== 'ALL_TOOLS') {
exportNames.push(exportName);
}
}
const uniqueExportNames = [...new Set(exportNames)];
return new SyntheticModule(
uniqueExportNames,
function initNamespacedToolsModule() {
for (const exportName of uniqueExportNames) {
this.setExport(exportName, tools[exportName]);
}
},
{ context }
);
}
function createModuleResolver(context, callTool, enabledTools, helpers) {
let toolsModule;
let codeModeModule;
const namespacedModules = new Map();
return function resolveModule(specifier) {
if (specifier === 'tools.js') {
toolsModule ??= createToolsModule(context, callTool, enabledTools);
return toolsModule;
}
if (specifier === '@openai/code_mode' || specifier === 'openai/code_mode') {
codeModeModule ??= createCodeModeModule(context, helpers);
return codeModeModule;
}
const namespacedMatch = /^tools\/(.+)\.js$/.exec(specifier);
if (!namespacedMatch) {
throw new Error('Unsupported import in exec: ' + specifier);
}
const namespace = namespacedMatch[1]
.split('/')
.filter((segment) => segment.length > 0);
if (namespace.length === 0) {
throw new Error('Unsupported import in exec: ' + specifier);
}
const cacheKey = namespace.join('/');
if (!namespacedModules.has(cacheKey)) {
namespacedModules.set(
cacheKey,
createNamespacedToolsModule(context, callTool, enabledTools, namespace)
);
}
return namespacedModules.get(cacheKey);
};
}
async function resolveDynamicModule(specifier, resolveModule) {
const module = resolveModule(specifier);
if (module.status === 'unlinked') {
await module.link(resolveModule);
}
if (module.status === 'linked' || module.status === 'evaluating') {
await module.evaluate();
}
if (module.status === 'errored') {
throw module.error;
}
return module;
}
async function runModule(context, start, callTool, helpers) {
const resolveModule = createModuleResolver(
context,
callTool,
start.enabled_tools ?? [],
helpers
);
const mainModule = new SourceTextModule(start.source, {
context,
identifier: 'exec_main.mjs',
importModuleDynamically: async (specifier) =>
resolveDynamicModule(specifier, resolveModule),
});
await mainModule.link(resolveModule);
await mainModule.evaluate();
}
async function main() {
const start = workerData ?? {};
const toolCallId = start.tool_call_id;
const state = {
storedValues: cloneJsonValue(start.stored_values ?? {}),
};
const callTool = createToolCaller();
const enabledTools = start.enabled_tools ?? [];
const contentItems = createContentItems();
const context = vm.createContext({
__codexContentItems: contentItems,
});
const helpers = createCodeModeHelpers(context, state, toolCallId);
Object.defineProperty(context, '__codexRuntime', {
value: createBridgeRuntime(callTool, enabledTools, helpers),
configurable: true,
enumerable: false,
writable: false,
});
parentPort.postMessage({ type: 'started' });
try {
await runModule(context, start, callTool, helpers);
parentPort.postMessage({
type: 'result',
stored_values: state.storedValues,
});
} catch (error) {
if (isCodeModeExitSignal(error)) {
parentPort.postMessage({
type: 'result',
stored_values: state.storedValues,
});
return;
}
parentPort.postMessage({
type: 'result',
stored_values: state.storedValues,
error_text: formatErrorText(error),
});
}
}
void main().catch((error) => {
parentPort.postMessage({
type: 'result',
stored_values: {},
error_text: formatErrorText(error),
});
});
}
function createProtocol() {
const rl = readline.createInterface({
input: process.stdin,
crlfDelay: Infinity,
});
let nextId = 0;
const pending = new Map();
const sessions = new Map();
let closedResolve;
const closed = new Promise((resolve) => {
closedResolve = resolve;
});
rl.on('line', (line) => {
if (!line.trim()) {
return;
}
let message;
try {
message = JSON.parse(line);
} catch (error) {
process.stderr.write(formatErrorText(error) + '\n');
return;
}
if (message.type === 'start') {
startSession(protocol, sessions, message);
return;
}
if (message.type === 'poll') {
const session = sessions.get(message.cell_id);
if (session) {
session.request_id = String(message.request_id);
if (session.pending_result) {
void completeSession(protocol, sessions, session, session.pending_result);
} else {
schedulePollYield(protocol, session, normalizeYieldTime(message.yield_time_ms ?? 0));
}
} else {
void protocol.send({
type: 'result',
request_id: message.request_id,
content_items: [],
stored_values: {},
error_text: `exec cell ${message.cell_id} not found`,
max_output_tokens_per_exec_call: DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL,
});
}
return;
}
if (message.type === 'terminate') {
const session = sessions.get(message.cell_id);
if (session) {
session.request_id = String(message.request_id);
void terminateSession(protocol, sessions, session);
} else {
void protocol.send({
type: 'result',
request_id: message.request_id,
content_items: [],
stored_values: {},
error_text: `exec cell ${message.cell_id} not found`,
max_output_tokens_per_exec_call: DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL,
});
}
return;
}
if (message.type === 'response') {
const entry = pending.get(message.request_id + ':' + message.id);
if (!entry) {
return;
}
pending.delete(message.request_id + ':' + message.id);
if (typeof message.error_text === 'string') {
entry.reject(new Error(message.error_text));
return;
}
entry.resolve(message.code_mode_result ?? '');
return;
}
process.stderr.write('Unknown protocol message type: ' + message.type + '\n');
});
rl.on('close', () => {
const error = new Error('stdin closed');
for (const entry of pending.values()) {
entry.reject(error);
}
pending.clear();
for (const session of sessions.values()) {
session.initial_yield_timer = clearTimer(session.initial_yield_timer);
session.poll_yield_timer = clearTimer(session.poll_yield_timer);
void session.worker.terminate().catch(() => {});
}
sessions.clear();
closedResolve();
});
function send(message) {
return new Promise((resolve, reject) => {
process.stdout.write(JSON.stringify(message) + '\n', (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}
function request(type, payload) {
const requestId = 'req-' + ++nextId;
const id = 'msg-' + ++nextId;
const pendingKey = requestId + ':' + id;
return new Promise((resolve, reject) => {
pending.set(pendingKey, { resolve, reject });
void send({ type, request_id: requestId, id, ...payload }).catch((error) => {
pending.delete(pendingKey);
reject(error);
});
});
}
const protocol = { closed, request, send };
return protocol;
}
function sessionWorkerSource() {
return '(' + codeModeWorkerMain.toString() + ')();';
}
function startSession(protocol, sessions, start) {
if (typeof start.tool_call_id !== 'string' || start.tool_call_id.length === 0) {
throw new TypeError('start requires a valid tool_call_id');
}
const maxOutputTokensPerExecCall =
start.max_output_tokens == null
? DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL
: normalizeMaxOutputTokensPerExecCall(start.max_output_tokens);
const session = {
completed: false,
content_items: [],
default_yield_time_ms: normalizeYieldTime(start.default_yield_time_ms),
id: start.cell_id,
initial_yield_time_ms:
start.yield_time_ms == null
? normalizeYieldTime(start.default_yield_time_ms)
: normalizeYieldTime(start.yield_time_ms),
initial_yield_timer: null,
initial_yield_triggered: false,
max_output_tokens_per_exec_call: maxOutputTokensPerExecCall,
pending_result: null,
poll_yield_timer: null,
request_id: String(start.request_id),
worker: new Worker(sessionWorkerSource(), {
eval: true,
workerData: start,
}),
};
sessions.set(session.id, session);
session.worker.on('message', (message) => {
void handleWorkerMessage(protocol, sessions, session, message).catch((error) => {
void completeSession(protocol, sessions, session, {
type: 'result',
stored_values: {},
error_text: formatErrorText(error),
});
});
});
session.worker.on('error', (error) => {
void completeSession(protocol, sessions, session, {
type: 'result',
stored_values: {},
error_text: formatErrorText(error),
});
});
session.worker.on('exit', (code) => {
if (code !== 0 && !session.completed) {
void completeSession(protocol, sessions, session, {
type: 'result',
stored_values: {},
error_text: 'exec worker exited with code ' + code,
});
}
});
}
async function handleWorkerMessage(protocol, sessions, session, message) {
if (session.completed) {
return;
}
if (message.type === 'content_item') {
session.content_items.push(cloneJsonValue(message.item));
return;
}
if (message.type === 'started') {
scheduleInitialYield(protocol, session, session.initial_yield_time_ms);
return;
}
if (message.type === 'yield') {
void sendYielded(protocol, session);
return;
}
if (message.type === 'notify') {
if (typeof message.text !== 'string' || message.text.trim().length === 0) {
throw new TypeError('notify requires non-empty text');
}
if (typeof message.call_id !== 'string' || message.call_id.length === 0) {
throw new TypeError('notify requires a valid call id');
}
await protocol.send({
type: 'notify',
cell_id: session.id,
call_id: message.call_id,
text: message.text,
});
return;
}
if (message.type === 'tool_call') {
void forwardToolCall(protocol, session, message);
return;
}
if (message.type === 'result') {
const result = {
type: 'result',
stored_values: cloneJsonValue(message.stored_values ?? {}),
error_text:
typeof message.error_text === 'string' ? message.error_text : undefined,
};
if (session.request_id === null) {
session.pending_result = result;
session.initial_yield_timer = clearTimer(session.initial_yield_timer);
session.poll_yield_timer = clearTimer(session.poll_yield_timer);
return;
}
await completeSession(protocol, sessions, session, result);
return;
}
process.stderr.write('Unknown worker message type: ' + message.type + '\n');
}
async function forwardToolCall(protocol, session, message) {
try {
const result = await protocol.request('tool_call', {
name: String(message.name),
input: message.input,
});
if (session.completed) {
return;
}
try {
session.worker.postMessage({
type: 'tool_response',
id: message.id,
result,
});
} catch {}
} catch (error) {
if (session.completed) {
return;
}
try {
session.worker.postMessage({
type: 'tool_response_error',
id: message.id,
error_text: formatErrorText(error),
});
} catch {}
}
}
async function sendYielded(protocol, session) {
if (session.completed || session.request_id === null) {
return;
}
session.initial_yield_timer = clearTimer(session.initial_yield_timer);
session.initial_yield_triggered = true;
session.poll_yield_timer = clearTimer(session.poll_yield_timer);
const contentItems = takeContentItems(session);
const requestId = session.request_id;
try {
session.worker.postMessage({ type: 'clear_content' });
} catch {}
await protocol.send({
type: 'yielded',
request_id: requestId,
content_items: contentItems,
});
session.request_id = null;
}
function scheduleInitialYield(protocol, session, yieldTime) {
if (session.completed || session.initial_yield_triggered) {
return yieldTime;
}
session.initial_yield_timer = clearTimer(session.initial_yield_timer);
session.initial_yield_timer = setTimeout(() => {
session.initial_yield_timer = null;
session.initial_yield_triggered = true;
void sendYielded(protocol, session);
}, yieldTime);
return yieldTime;
}
function schedulePollYield(protocol, session, yieldTime) {
if (session.completed) {
return;
}
session.poll_yield_timer = clearTimer(session.poll_yield_timer);
session.poll_yield_timer = setTimeout(() => {
session.poll_yield_timer = null;
void sendYielded(protocol, session);
}, yieldTime);
}
async function completeSession(protocol, sessions, session, message) {
if (session.completed) {
return;
}
if (session.request_id === null) {
session.pending_result = message;
session.initial_yield_timer = clearTimer(session.initial_yield_timer);
session.poll_yield_timer = clearTimer(session.poll_yield_timer);
return;
}
const requestId = session.request_id;
session.completed = true;
session.initial_yield_timer = clearTimer(session.initial_yield_timer);
session.poll_yield_timer = clearTimer(session.poll_yield_timer);
sessions.delete(session.id);
const contentItems = takeContentItems(session);
session.pending_result = null;
try {
session.worker.postMessage({ type: 'clear_content' });
} catch {}
await protocol.send({
...message,
request_id: requestId,
content_items: contentItems,
max_output_tokens_per_exec_call: session.max_output_tokens_per_exec_call,
});
}
async function terminateSession(protocol, sessions, session) {
if (session.completed) {
return;
}
session.completed = true;
session.initial_yield_timer = clearTimer(session.initial_yield_timer);
session.poll_yield_timer = clearTimer(session.poll_yield_timer);
sessions.delete(session.id);
const contentItems = takeContentItems(session);
try {
await session.worker.terminate();
} catch {}
await protocol.send({
type: 'terminated',
request_id: session.request_id,
content_items: contentItems,
});
}
async function main() {
const protocol = createProtocol();
await protocol.closed;
}
void main().catch(async (error) => {
try {
process.stderr.write(formatErrorText(error) + '\n');
} finally {
process.exitCode = 1;
}
});

View File

@@ -1,108 +0,0 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use serde_json::Value as JsonValue;
use tokio::sync::Mutex;
use tracing::warn;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::tools::ToolRouter;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::js_repl::resolve_compatible_node;
use crate::tools::parallel::ToolCallRuntime;
use codex_features::Feature;
use super::ExecContext;
use super::PUBLIC_TOOL_NAME;
use super::process::CodeModeProcess;
use super::process::spawn_code_mode_process;
use super::worker::CodeModeWorker;
pub(crate) struct CodeModeService {
js_repl_node_path: Option<PathBuf>,
stored_values: Mutex<HashMap<String, JsonValue>>,
process: Arc<Mutex<Option<CodeModeProcess>>>,
next_cell_id: Mutex<u64>,
}
impl CodeModeService {
pub(crate) fn new(js_repl_node_path: Option<PathBuf>) -> Self {
Self {
js_repl_node_path,
stored_values: Mutex::new(HashMap::new()),
process: Arc::new(Mutex::new(None)),
next_cell_id: Mutex::new(1),
}
}
pub(crate) async fn stored_values(&self) -> HashMap<String, JsonValue> {
self.stored_values.lock().await.clone()
}
pub(crate) async fn replace_stored_values(&self, values: HashMap<String, JsonValue>) {
*self.stored_values.lock().await = values;
}
pub(super) async fn ensure_started(
&self,
) -> Result<tokio::sync::OwnedMutexGuard<Option<CodeModeProcess>>, std::io::Error> {
let mut process_slot = self.process.lock().await;
let needs_spawn = match process_slot.as_mut() {
Some(process) => !matches!(process.has_exited(), Ok(false)),
None => true,
};
if needs_spawn {
let node_path = resolve_compatible_node(self.js_repl_node_path.as_deref())
.await
.map_err(std::io::Error::other)?;
*process_slot = Some(spawn_code_mode_process(&node_path).await?);
}
drop(process_slot);
Ok(self.process.clone().lock_owned().await)
}
pub(crate) async fn start_turn_worker(
&self,
session: &Arc<Session>,
turn: &Arc<TurnContext>,
router: Arc<ToolRouter>,
tracker: SharedTurnDiffTracker,
) -> Option<CodeModeWorker> {
if !turn.features.enabled(Feature::CodeMode) {
return None;
}
let exec = ExecContext {
session: Arc::clone(session),
turn: Arc::clone(turn),
};
let tool_runtime =
ToolCallRuntime::new(router, Arc::clone(session), Arc::clone(turn), tracker);
let mut process_slot = match self.ensure_started().await {
Ok(process_slot) => process_slot,
Err(err) => {
warn!("failed to start {PUBLIC_TOOL_NAME} worker for turn: {err}");
return None;
}
};
let Some(process) = process_slot.as_mut() else {
warn!(
"failed to start {PUBLIC_TOOL_NAME} worker for turn: {PUBLIC_TOOL_NAME} runner failed to start"
);
return None;
};
Some(process.worker(exec, tool_runtime))
}
pub(crate) async fn allocate_cell_id(&self) -> String {
let mut next_cell_id = self.next_cell_id.lock().await;
let cell_id = *next_cell_id;
*next_cell_id = next_cell_id.saturating_add(1);
cell_id.to_string()
}
pub(crate) async fn allocate_request_id(&self) -> String {
uuid::Uuid::new_v4().to_string()
}
}

View File

@@ -1,8 +0,0 @@
- Use `wait` only after `exec` returns `Script running with cell ID ...`.
- `cell_id` identifies the running `exec` cell to resume.
- `yield_time_ms` controls how long to wait for more output before yielding again. If omitted, `wait` uses its default wait timeout.
- `max_tokens` limits how much new output this wait call returns.
- `terminate: true` stops the running cell instead of waiting for more output.
- `wait` returns only the new output since the last yield, or the final completion or termination result for that cell.
- If the cell is still running, `wait` may yield again with the same `cell_id`.
- If the cell has already finished, `wait` returns the completed result and closes the cell.

View File

@@ -8,13 +8,10 @@ use crate::tools::context::ToolPayload;
use crate::tools::registry::ToolHandler;
use crate::tools::registry::ToolKind;
use super::CodeModeSessionProgress;
use super::DEFAULT_WAIT_YIELD_TIME_MS;
use super::ExecContext;
use super::PUBLIC_TOOL_NAME;
use super::WAIT_TOOL_NAME;
use super::handle_node_message;
use super::protocol::HostToNodeMessage;
use super::handle_runtime_response;
pub struct CodeModeWaitHandler;
@@ -63,66 +60,21 @@ impl ToolHandler for CodeModeWaitHandler {
ToolPayload::Function { arguments } if tool_name == WAIT_TOOL_NAME => {
let args: ExecWaitArgs = parse_arguments(&arguments)?;
let exec = ExecContext { session, turn };
let request_id = exec
.session
.services
.code_mode_service
.allocate_request_id()
.await;
let started_at = std::time::Instant::now();
let message = if args.terminate {
HostToNodeMessage::Terminate {
request_id: request_id.clone(),
cell_id: args.cell_id.clone(),
}
} else {
HostToNodeMessage::Poll {
request_id: request_id.clone(),
cell_id: args.cell_id.clone(),
yield_time_ms: args.yield_time_ms,
}
};
let process_slot = exec
let response = exec
.session
.services
.code_mode_service
.ensure_started()
.wait(codex_code_mode::WaitRequest {
cell_id: args.cell_id,
yield_time_ms: args.yield_time_ms,
terminate: args.terminate,
})
.await
.map_err(|err| FunctionCallError::RespondToModel(err.to_string()))?;
let result = {
let mut process_slot = process_slot;
let Some(process) = process_slot.as_mut() else {
return Err(FunctionCallError::RespondToModel(format!(
"{PUBLIC_TOOL_NAME} runner failed to start"
)));
};
if !matches!(process.has_exited(), Ok(false)) {
return Err(FunctionCallError::RespondToModel(format!(
"{PUBLIC_TOOL_NAME} runner failed to start"
)));
}
let message = process
.send(&request_id, &message)
.await
.map_err(|err| err.to_string());
let message = match message {
Ok(message) => message,
Err(error) => return Err(FunctionCallError::RespondToModel(error)),
};
handle_node_message(
&exec,
args.cell_id,
message,
Some(args.max_tokens),
started_at,
)
.map_err(FunctionCallError::RespondToModel)?;
handle_runtime_response(&exec, response, args.max_tokens, started_at)
.await
};
match result {
Ok(CodeModeSessionProgress::Finished(output))
| Ok(CodeModeSessionProgress::Yielded { output }) => Ok(output),
Err(error) => Err(FunctionCallError::RespondToModel(error)),
}
.map_err(FunctionCallError::RespondToModel)
}
_ => Err(FunctionCallError::RespondToModel(format!(
"{WAIT_TOOL_NAME} expects JSON arguments"

View File

@@ -1,116 +0,0 @@
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing::warn;
use codex_protocol::models::FunctionCallOutputPayload;
use codex_protocol::models::ResponseInputItem;
use super::ExecContext;
use super::PUBLIC_TOOL_NAME;
use super::call_nested_tool;
use super::process::CodeModeProcess;
use super::process::write_message;
use super::protocol::HostToNodeMessage;
use super::protocol::NodeToHostMessage;
use crate::tools::parallel::ToolCallRuntime;
pub(crate) struct CodeModeWorker {
shutdown_tx: Option<oneshot::Sender<()>>,
}
impl Drop for CodeModeWorker {
fn drop(&mut self) {
if let Some(shutdown_tx) = self.shutdown_tx.take() {
let _ = shutdown_tx.send(());
}
}
}
impl CodeModeProcess {
pub(super) fn worker(
&self,
exec: ExecContext,
tool_runtime: ToolCallRuntime,
) -> CodeModeWorker {
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
let stdin = self.stdin.clone();
let message_rx = self.message_rx.clone();
tokio::spawn(async move {
loop {
let next_message = tokio::select! {
_ = &mut shutdown_rx => break,
message = async {
let mut message_rx = message_rx.lock().await;
message_rx.recv().await
} => message,
};
let Some(next_message) = next_message else {
break;
};
match next_message {
NodeToHostMessage::ToolCall { tool_call } => {
let exec = exec.clone();
let tool_runtime = tool_runtime.clone();
let stdin = stdin.clone();
tokio::spawn(async move {
let result = call_nested_tool(
exec,
tool_runtime,
tool_call.name,
tool_call.input,
CancellationToken::new(),
)
.await;
let (code_mode_result, error_text) = match result {
Ok(code_mode_result) => (code_mode_result, None),
Err(error) => (serde_json::Value::Null, Some(error.to_string())),
};
let response = HostToNodeMessage::Response {
request_id: tool_call.request_id,
id: tool_call.id,
code_mode_result,
error_text,
};
if let Err(err) = write_message(&stdin, &response).await {
warn!("failed to write {PUBLIC_TOOL_NAME} tool response: {err}");
}
});
}
NodeToHostMessage::Notify { notify } => {
if notify.text.trim().is_empty() {
continue;
}
if exec
.session
.inject_response_items(vec![ResponseInputItem::CustomToolCallOutput {
call_id: notify.call_id.clone(),
name: Some(PUBLIC_TOOL_NAME.to_string()),
output: FunctionCallOutputPayload::from_text(notify.text),
}])
.await
.is_err()
{
warn!(
"failed to inject {PUBLIC_TOOL_NAME} notify message for cell {}: no active turn",
notify.cell_id
);
}
}
unexpected_message @ (NodeToHostMessage::Yielded { .. }
| NodeToHostMessage::Terminated { .. }
| NodeToHostMessage::Result { .. }) => {
error!(
"received unexpected {PUBLIC_TOOL_NAME} message in worker loop: {unexpected_message:?}"
);
break;
}
}
}
});
CodeModeWorker {
shutdown_tx: Some(shutdown_tx),
}
}
}

View File

@@ -1,30 +1,11 @@
use crate::client_common::tools::ToolSpec;
use crate::mcp::split_qualified_tool_name;
use crate::tools::code_mode::PUBLIC_TOOL_NAME;
use serde_json::Value as JsonValue;
pub(crate) struct CodeModeToolReference {
pub(crate) module_path: String,
pub(crate) namespace: Vec<String>,
pub(crate) tool_key: String,
}
pub(crate) fn code_mode_tool_reference(tool_name: &str) -> CodeModeToolReference {
if let Some((server_name, tool_key)) = split_qualified_tool_name(tool_name) {
let namespace = vec!["mcp".to_string(), server_name];
return CodeModeToolReference {
module_path: format!("tools/{}.js", namespace.join("/")),
namespace,
tool_key,
};
}
CodeModeToolReference {
module_path: "tools.js".to_string(),
namespace: Vec::new(),
tool_key: tool_name.to_string(),
}
}
#[allow(unused_imports)]
#[cfg(test)]
pub(crate) use codex_code_mode::append_code_mode_sample;
#[allow(unused_imports)]
#[cfg(test)]
pub(crate) use codex_code_mode::render_json_schema_to_typescript;
pub(crate) fn augment_tool_spec_for_code_mode(spec: ToolSpec, code_mode_enabled: bool) -> ToolSpec {
if !code_mode_enabled {
@@ -33,27 +14,27 @@ pub(crate) fn augment_tool_spec_for_code_mode(spec: ToolSpec, code_mode_enabled:
match spec {
ToolSpec::Function(mut tool) => {
if tool.name != PUBLIC_TOOL_NAME {
tool.description = append_code_mode_sample(
&tool.description,
&tool.name,
"args",
serde_json::to_value(&tool.parameters)
.ok()
.as_ref()
.map(render_json_schema_to_typescript)
.unwrap_or_else(|| "unknown".to_string()),
tool.output_schema
.as_ref()
.map(render_json_schema_to_typescript)
.unwrap_or_else(|| "unknown".to_string()),
);
}
let input_type = serde_json::to_value(&tool.parameters)
.ok()
.map(|schema| codex_code_mode::render_json_schema_to_typescript(&schema))
.unwrap_or_else(|| "unknown".to_string());
let output_type = tool
.output_schema
.as_ref()
.map(codex_code_mode::render_json_schema_to_typescript)
.unwrap_or_else(|| "unknown".to_string());
tool.description = codex_code_mode::append_code_mode_sample(
&tool.description,
&tool.name,
"args",
input_type,
output_type,
);
ToolSpec::Function(tool)
}
ToolSpec::Freeform(mut tool) => {
if tool.name != PUBLIC_TOOL_NAME {
tool.description = append_code_mode_sample(
if tool.name != codex_code_mode::PUBLIC_TOOL_NAME {
tool.description = codex_code_mode::append_code_mode_sample(
&tool.description,
&tool.name,
"input",
@@ -66,234 +47,3 @@ pub(crate) fn augment_tool_spec_for_code_mode(spec: ToolSpec, code_mode_enabled:
other => other,
}
}
fn append_code_mode_sample(
description: &str,
tool_name: &str,
input_name: &str,
input_type: String,
output_type: String,
) -> String {
let declaration = format!(
"declare const tools: {{ {} }};",
render_code_mode_tool_declaration(tool_name, input_name, input_type, output_type)
);
format!("{description}\n\nexec tool declaration:\n```ts\n{declaration}\n```")
}
fn render_code_mode_tool_declaration(
tool_name: &str,
input_name: &str,
input_type: String,
output_type: String,
) -> String {
let tool_name = normalize_code_mode_identifier(tool_name);
format!("{tool_name}({input_name}: {input_type}): Promise<{output_type}>;")
}
pub(crate) fn normalize_code_mode_identifier(tool_key: &str) -> String {
let mut identifier = String::new();
for (index, ch) in tool_key.chars().enumerate() {
let is_valid = if index == 0 {
ch == '_' || ch == '$' || ch.is_ascii_alphabetic()
} else {
ch == '_' || ch == '$' || ch.is_ascii_alphanumeric()
};
if is_valid {
identifier.push(ch);
} else {
identifier.push('_');
}
}
if identifier.is_empty() {
"_".to_string()
} else {
identifier
}
}
fn render_json_schema_to_typescript(schema: &JsonValue) -> String {
render_json_schema_to_typescript_inner(schema)
}
fn render_json_schema_to_typescript_inner(schema: &JsonValue) -> String {
match schema {
JsonValue::Bool(true) => "unknown".to_string(),
JsonValue::Bool(false) => "never".to_string(),
JsonValue::Object(map) => {
if let Some(value) = map.get("const") {
return render_json_schema_literal(value);
}
if let Some(values) = map.get("enum").and_then(serde_json::Value::as_array) {
let rendered = values
.iter()
.map(render_json_schema_literal)
.collect::<Vec<_>>();
if !rendered.is_empty() {
return rendered.join(" | ");
}
}
for key in ["anyOf", "oneOf"] {
if let Some(variants) = map.get(key).and_then(serde_json::Value::as_array) {
let rendered = variants
.iter()
.map(render_json_schema_to_typescript_inner)
.collect::<Vec<_>>();
if !rendered.is_empty() {
return rendered.join(" | ");
}
}
}
if let Some(variants) = map.get("allOf").and_then(serde_json::Value::as_array) {
let rendered = variants
.iter()
.map(render_json_schema_to_typescript_inner)
.collect::<Vec<_>>();
if !rendered.is_empty() {
return rendered.join(" & ");
}
}
if let Some(schema_type) = map.get("type") {
if let Some(types) = schema_type.as_array() {
let rendered = types
.iter()
.filter_map(serde_json::Value::as_str)
.map(|schema_type| render_json_schema_type_keyword(map, schema_type))
.collect::<Vec<_>>();
if !rendered.is_empty() {
return rendered.join(" | ");
}
}
if let Some(schema_type) = schema_type.as_str() {
return render_json_schema_type_keyword(map, schema_type);
}
}
if map.contains_key("properties")
|| map.contains_key("additionalProperties")
|| map.contains_key("required")
{
return render_json_schema_object(map);
}
if map.contains_key("items") || map.contains_key("prefixItems") {
return render_json_schema_array(map);
}
"unknown".to_string()
}
_ => "unknown".to_string(),
}
}
fn render_json_schema_type_keyword(
map: &serde_json::Map<String, JsonValue>,
schema_type: &str,
) -> String {
match schema_type {
"string" => "string".to_string(),
"number" | "integer" => "number".to_string(),
"boolean" => "boolean".to_string(),
"null" => "null".to_string(),
"array" => render_json_schema_array(map),
"object" => render_json_schema_object(map),
_ => "unknown".to_string(),
}
}
fn render_json_schema_array(map: &serde_json::Map<String, JsonValue>) -> String {
if let Some(items) = map.get("items") {
let item_type = render_json_schema_to_typescript_inner(items);
return format!("Array<{item_type}>");
}
if let Some(items) = map.get("prefixItems").and_then(serde_json::Value::as_array) {
let item_types = items
.iter()
.map(render_json_schema_to_typescript_inner)
.collect::<Vec<_>>();
if !item_types.is_empty() {
return format!("[{}]", item_types.join(", "));
}
}
"unknown[]".to_string()
}
fn render_json_schema_object(map: &serde_json::Map<String, JsonValue>) -> String {
let required = map
.get("required")
.and_then(serde_json::Value::as_array)
.map(|items| {
items
.iter()
.filter_map(serde_json::Value::as_str)
.collect::<Vec<_>>()
})
.unwrap_or_default();
let properties = map
.get("properties")
.and_then(serde_json::Value::as_object)
.cloned()
.unwrap_or_default();
let mut sorted_properties = properties.iter().collect::<Vec<_>>();
sorted_properties.sort_unstable_by(|(name_a, _), (name_b, _)| name_a.cmp(name_b));
let mut lines = sorted_properties
.into_iter()
.map(|(name, value)| {
let optional = if required.iter().any(|required_name| required_name == name) {
""
} else {
"?"
};
let property_name = render_json_schema_property_name(name);
let property_type = render_json_schema_to_typescript_inner(value);
format!("{property_name}{optional}: {property_type};")
})
.collect::<Vec<_>>();
if let Some(additional_properties) = map.get("additionalProperties") {
let additional_type = match additional_properties {
JsonValue::Bool(true) => Some("unknown".to_string()),
JsonValue::Bool(false) => None,
value => Some(render_json_schema_to_typescript_inner(value)),
};
if let Some(additional_type) = additional_type {
lines.push(format!("[key: string]: {additional_type};"));
}
} else if properties.is_empty() {
lines.push("[key: string]: unknown;".to_string());
}
if lines.is_empty() {
return "{}".to_string();
}
format!("{{ {} }}", lines.join(" "))
}
fn render_json_schema_property_name(name: &str) -> String {
if normalize_code_mode_identifier(name) == name {
name.to_string()
} else {
serde_json::to_string(name).unwrap_or_else(|_| format!("\"{}\"", name.replace('"', "\\\"")))
}
}
fn render_json_schema_literal(value: &JsonValue) -> String {
serde_json::to_string(value).unwrap_or_else(|_| "unknown".to_string())
}
#[cfg(test)]
#[path = "code_mode_description_tests.rs"]
mod tests;

View File

@@ -4,7 +4,6 @@ use crate::codex::TurnContext;
use crate::function_tool::FunctionCallError;
use crate::mcp_connection_manager::ToolInfo;
use crate::sandboxing::SandboxPermissions;
use crate::tools::code_mode::is_code_mode_nested_tool;
use crate::tools::context::SharedTurnDiffTracker;
use crate::tools::context::ToolInvocation;
use crate::tools::context::ToolPayload;
@@ -67,7 +66,7 @@ impl ToolRouter {
specs
.iter()
.filter_map(|configured_tool| {
if !is_code_mode_nested_tool(configured_tool.spec.name()) {
if !codex_code_mode::is_code_mode_nested_tool(configured_tool.spec.name()) {
Some(configured_tool.spec.clone())
} else {
None

View File

@@ -11,9 +11,6 @@ use crate::shell::Shell;
use crate::shell::ShellType;
use crate::tools::code_mode::PUBLIC_TOOL_NAME;
use crate::tools::code_mode::WAIT_TOOL_NAME;
use crate::tools::code_mode::is_code_mode_nested_tool;
use crate::tools::code_mode::tool_description as code_mode_tool_description;
use crate::tools::code_mode::wait_tool_description as code_mode_wait_tool_description;
use crate::tools::code_mode_description::augment_tool_spec_for_code_mode;
use crate::tools::discoverable::DiscoverablePluginInfo;
use crate::tools::discoverable::DiscoverableTool;
@@ -833,7 +830,7 @@ fn create_wait_tool() -> ToolSpec {
name: WAIT_TOOL_NAME.to_string(),
description: format!(
"Waits on a yielded `{PUBLIC_TOOL_NAME}` cell and returns new output or completion.\n{}",
code_mode_wait_tool_description().trim()
codex_code_mode::build_wait_tool_description().trim()
),
strict: false,
parameters: JsonSchema::Object {
@@ -2176,7 +2173,10 @@ SOURCE: /[\s\S]+/
ToolSpec::Freeform(FreeformTool {
name: PUBLIC_TOOL_NAME.to_string(),
description: code_mode_tool_description(enabled_tools, code_mode_only_enabled),
description: codex_code_mode::build_exec_tool_description(
enabled_tools,
code_mode_only_enabled,
),
format: FreeformToolFormat {
r#type: "grammar".to_string(),
syntax: "lark".to_string(),
@@ -2647,7 +2647,7 @@ pub(crate) fn build_specs_with_discoverable_tools(
ToolSpec::Freeform(tool) => (tool.name, tool.description),
_ => return None,
};
is_code_mode_nested_tool(&name).then_some((name, description))
codex_code_mode::is_code_mode_nested_tool(&name).then_some((name, description))
})
.collect::<Vec<_>>();
enabled_tools.sort_by(|left, right| left.0.cmp(&right.0));

View File

@@ -1672,8 +1672,6 @@ async fn code_mode_exit_stops_script_immediately() -> Result<()> {
&server,
"use exec to stop script early with exit helper",
r#"
import { exit, text } from "@openai/code_mode";
text("before");
exit();
text("after");
@@ -2129,6 +2127,7 @@ text(JSON.stringify(Object.getOwnPropertyNames(globalThis).sort()));
"SuppressedError",
"Symbol",
"SyntaxError",
"Temporal",
"TypeError",
"URIError",
"Uint16Array",
@@ -2141,7 +2140,6 @@ text(JSON.stringify(Object.getOwnPropertyNames(globalThis).sort()));
"WebAssembly",
"__codexContentItems",
"add_content",
"console",
"decodeURI",
"decodeURIComponent",
"encodeURI",
@@ -2282,10 +2280,8 @@ async fn code_mode_can_call_hidden_dynamic_tools() -> Result<()> {
test.session_configured = new_thread.session_configured;
let code = r#"
import { ALL_TOOLS, hidden_dynamic_tool } from "tools.js";
const tool = ALL_TOOLS.find(({ name }) => name === "hidden_dynamic_tool");
const out = await hidden_dynamic_tool({ city: "Paris" });
const out = await tools.hidden_dynamic_tool({ city: "Paris" });
text(
JSON.stringify({
name: tool?.name ?? null,

View File

@@ -493,6 +493,7 @@ async fn permissions_message_includes_writable_roots() -> Result<()> {
let expected = DeveloperInstructions::from_policy(
&sandbox_policy,
AskForApproval::OnRequest,
test.config.approvals_reviewer,
&Policy::empty(),
test.config.cwd.as_path(),
false,

View File

@@ -159,7 +159,9 @@ async fn unified_exec_intercepts_apply_patch_exec_command() -> Result<()> {
let call_id = "uexec-apply-patch";
let args = json!({
"cmd": command,
"yield_time_ms": 250,
// The intercepted apply_patch path spawns a helper process, which can
// take longer than a tiny unified-exec yield deadline on CI.
"yield_time_ms": 5_000,
});
let responses = vec![

View File

@@ -3,6 +3,7 @@ name = "codex-exec"
version.workspace = true
edition.workspace = true
license.workspace = true
autotests = false
[[bin]]
name = "codex-exec"
@@ -12,6 +13,10 @@ path = "src/main.rs"
name = "codex_exec"
path = "src/lib.rs"
[[test]]
name = "all"
path = "tests/all.rs"
[lints]
workspace = true
@@ -28,13 +33,10 @@ codex-otel = { workspace = true }
codex-protocol = { workspace = true }
codex-utils-absolute-path = { workspace = true }
codex-utils-cli = { workspace = true }
codex-utils-elapsed = { workspace = true }
codex-utils-oss = { workspace = true }
codex-utils-sandbox-summary = { workspace = true }
owo-colors = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
shlex = { workspace = true }
supports-color = { workspace = true }
tokio = { workspace = true, features = [
"io-std",
@@ -63,7 +65,6 @@ opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
predicates = { workspace = true }
pretty_assertions = { workspace = true }
rmcp = { workspace = true }
tempfile = { workspace = true }
tracing-opentelemetry = { workspace = true }
uuid = { workspace = true }

View File

@@ -1,13 +1,13 @@
use std::path::Path;
use codex_app_server_protocol::ServerNotification;
use codex_core::config::Config;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::SessionConfiguredEvent;
pub(crate) enum CodexStatus {
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CodexStatus {
Running,
InitiateShutdown,
Shutdown,
}
pub(crate) trait EventProcessor {
@@ -19,8 +19,11 @@ pub(crate) trait EventProcessor {
session_configured: &SessionConfiguredEvent,
);
/// Handle a single event emitted by the agent.
fn process_event(&mut self, event: Event) -> CodexStatus;
/// Handle a single typed app-server notification emitted by the agent.
fn process_server_notification(&mut self, notification: ServerNotification) -> CodexStatus;
/// Handle a local exec warning that is not represented as an app-server notification.
fn process_warning(&mut self, message: String) -> CodexStatus;
fn print_final_output(&mut self) {}
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -17,11 +17,9 @@ use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
use codex_app_server_client::InProcessAppServerClient;
use codex_app_server_client::InProcessClientStartArgs;
use codex_app_server_client::InProcessServerEvent;
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::McpServerElicitationAction;
use codex_app_server_protocol::McpServerElicitationRequestResponse;
use codex_app_server_protocol::RequestId;
@@ -30,8 +28,13 @@ use codex_app_server_protocol::ReviewStartResponse;
use codex_app_server_protocol::ReviewTarget as ApiReviewTarget;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::Thread as AppServerThread;
use codex_app_server_protocol::ThreadListParams;
use codex_app_server_protocol::ThreadListResponse;
use codex_app_server_protocol::ThreadResumeParams;
use codex_app_server_protocol::ThreadResumeResponse;
use codex_app_server_protocol::ThreadSortKey;
use codex_app_server_protocol::ThreadSourceKind;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadUnsubscribeParams;
@@ -41,8 +44,7 @@ use codex_app_server_protocol::TurnInterruptResponse;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_arg0::Arg0DispatchPaths;
use codex_cloud_requirements::cloud_requirements_loader;
use codex_core::AuthManager;
use codex_cloud_requirements::cloud_requirements_loader_for_storage;
use codex_core::LMSTUDIO_OSS_PROVIDER_ID;
use codex_core::OLLAMA_OSS_PROVIDER_ID;
use codex_core::auth::AuthConfig;
@@ -59,16 +61,17 @@ use codex_core::config_loader::LoaderOverrides;
use codex_core::config_loader::format_config_error_with_source;
use codex_core::format_exec_policy_error_with_source;
use codex_core::git_info::get_git_repo_root;
use codex_core::path_utils;
use codex_feedback::CodexFeedback;
use codex_otel::set_parent_from_context;
use codex_otel::traceparent_context_from_env;
use codex_protocol::account::PlanType as AccountPlanType;
use codex_protocol::config_types::SandboxMode;
use codex_protocol::protocol::AskForApproval;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::ReviewRequest;
use codex_protocol::protocol::ReviewTarget;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::protocol::RolloutLine;
use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SessionConfiguredEvent;
use codex_protocol::protocol::SessionSource;
use codex_protocol::user_input::UserInput;
@@ -79,10 +82,9 @@ use event_processor_with_human_output::EventProcessorWithHumanOutput;
use event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
use serde_json::Value;
use std::collections::HashMap;
use std::collections::HashSet;
use std::collections::VecDeque;
use std::io::IsTerminal;
use std::io::Read;
use std::path::Path;
use std::path::PathBuf;
use supports_color::Stream;
use tokio::sync::mpsc;
@@ -101,8 +103,6 @@ use crate::event_processor::CodexStatus;
use crate::event_processor::EventProcessor;
use codex_core::default_client::set_default_client_residency_requirement;
use codex_core::default_client::set_default_originator;
use codex_core::find_thread_path_by_id_str;
use codex_core::find_thread_path_by_name_str;
const DEFAULT_ANALYTICS_ENABLED: bool = true;
@@ -287,18 +287,17 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
}
};
let cloud_auth_manager = AuthManager::shared(
codex_home.clone(),
/*enable_codex_api_key_env*/ false,
config_toml.cli_auth_credentials_store.unwrap_or_default(),
);
let chatgpt_base_url = config_toml
.chatgpt_base_url
.clone()
.unwrap_or_else(|| "https://chatgpt.com/backend-api/".to_string());
// TODO(gt): Make cloud requirements failures blocking once we can fail-closed.
let cloud_requirements =
cloud_requirements_loader(cloud_auth_manager, chatgpt_base_url, codex_home.clone());
let cloud_requirements = cloud_requirements_loader_for_storage(
codex_home.clone(),
/*enable_codex_api_key_env*/ false,
config_toml.cli_auth_credentials_store.unwrap_or_default(),
chatgpt_base_url,
);
let run_cli_overrides = cli_kv_overrides.clone();
let run_loader_overrides = LoaderOverrides::default();
let run_cloud_requirements = cloud_requirements.clone();
@@ -500,14 +499,6 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
last_message_file.clone(),
)),
};
let required_mcp_servers: HashSet<String> = config
.mcp_servers
.get()
.iter()
.filter(|(_, server)| server.enabled && server.required)
.map(|(name, _)| name.clone())
.collect();
if oss {
// We're in the oss section, so provider_id should be Some
// Let's handle None case gracefully though just in case
@@ -547,17 +538,16 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
anyhow::anyhow!("failed to initialize in-process app-server client: {err}")
})?;
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
// Handle resume subcommand through existing `thread/list` + `thread/resume`
// APIs so exec no longer reaches into rollout storage directly.
let (primary_thread_id, fallback_session_configured) =
if let Some(ExecCommand::Resume(args)) = command.as_ref() {
let resume_path = resolve_resume_path(&config, args).await?;
if let Some(path) = resume_path {
if let Some(thread_id) = resolve_resume_thread_id(&client, &config, args).await? {
let response: ThreadResumeResponse = send_request_with_response(
&client,
ClientRequest::ThreadResume {
request_id: request_ids.next(),
params: thread_resume_params_from_config(&config, Some(path)),
params: thread_resume_params_from_config(&config, thread_id),
},
"thread/resume",
)
@@ -598,7 +588,6 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
};
let primary_thread_id_for_span = primary_thread_id.to_string();
let mut buffered_events = VecDeque::new();
// Use the start/resume response as the authoritative bootstrap payload.
// Waiting for a later streamed `SessionConfigured` event adds up to 10s of
// avoidable startup latency on the in-process path.
@@ -670,10 +659,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
// is using.
event_processor.print_config_summary(&config, &prompt_summary, &session_configured);
if !json_mode && let Some(message) = codex_core::config::missing_system_bwrap_warning() {
let _ = event_processor.process_event(Event {
id: String::new(),
msg: EventMsg::Warning(codex_protocol::protocol::WarningEvent { message }),
});
event_processor.process_warning(message);
}
info!("Codex initialized with event: {session_configured:?}");
@@ -748,34 +734,30 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
let mut interrupt_channel_open = true;
let primary_thread_id_for_requests = primary_thread_id.to_string();
loop {
let server_event = if let Some(event) = buffered_events.pop_front() {
Some(event)
} else {
tokio::select! {
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
if maybe_interrupt.is_none() {
interrupt_channel_open = false;
continue;
}
if let Err(err) = send_request_with_response::<TurnInterruptResponse>(
&client,
ClientRequest::TurnInterrupt {
request_id: request_ids.next(),
params: TurnInterruptParams {
thread_id: primary_thread_id_for_requests.clone(),
turn_id: task_id.clone(),
},
},
"turn/interrupt",
)
.await
{
warn!("turn/interrupt failed: {err}");
}
let server_event = tokio::select! {
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
if maybe_interrupt.is_none() {
interrupt_channel_open = false;
continue;
}
maybe_event = client.next_event() => maybe_event,
if let Err(err) = send_request_with_response::<TurnInterruptResponse>(
&client,
ClientRequest::TurnInterrupt {
request_id: request_ids.next(),
params: TurnInterruptParams {
thread_id: primary_thread_id_for_requests.clone(),
turn_id: task_id.clone(),
},
},
"turn/interrupt",
)
.await
{
warn!("turn/interrupt failed: {err}");
}
continue;
}
maybe_event = client.next_event() => maybe_event,
};
let Some(server_event) = server_event else {
@@ -784,69 +766,36 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
match server_event {
InProcessServerEvent::ServerRequest(request) => {
handle_server_request(
&client,
request,
&config,
&primary_thread_id_for_requests,
&mut error_seen,
)
.await;
handle_server_request(&client, request, &mut error_seen).await;
}
InProcessServerEvent::ServerNotification(notification) => {
if let ServerNotification::Error(payload) = &notification
if let ServerNotification::Error(payload) = &notification {
if payload.thread_id == primary_thread_id_for_requests
&& payload.turn_id == task_id
&& !payload.will_retry
{
error_seen = true;
}
} else if let ServerNotification::TurnCompleted(payload) = &notification
&& payload.thread_id == primary_thread_id_for_requests
&& payload.turn_id == task_id
&& !payload.will_retry
&& payload.turn.id == task_id
&& matches!(
payload.turn.status,
codex_app_server_protocol::TurnStatus::Failed
| codex_app_server_protocol::TurnStatus::Interrupted
)
{
error_seen = true;
}
}
InProcessServerEvent::LegacyNotification(notification) => {
let decoded = match decode_legacy_notification(notification) {
Ok(event) => event,
Err(err) => {
warn!("{err}");
continue;
}
};
if decoded.conversation_id.as_deref()
!= Some(primary_thread_id_for_requests.as_str())
&& decoded.conversation_id.is_some()
{
continue;
}
let event = decoded.event;
if matches!(event.msg, EventMsg::SessionConfigured(_)) {
continue;
}
if matches!(event.msg, EventMsg::Error(_)) {
// The legacy bridge still carries fatal turn failures for
// exec. Preserve the non-zero exit behavior until this
// path is fully replaced by typed server notifications.
error_seen = true;
}
match &event.msg {
EventMsg::TurnComplete(payload) => {
if payload.turn_id != task_id {
continue;
}
}
EventMsg::TurnAborted(payload) => {
if payload.turn_id.as_deref() != Some(task_id.as_str()) {
continue;
}
}
EventMsg::McpStartupUpdate(update) => {
if required_mcp_servers.contains(&update.server)
&& let codex_protocol::protocol::McpStartupStatus::Failed { error } =
&update.status
{
error_seen = true;
eprintln!(
"Required MCP server '{}' failed to initialize: {error}",
update.server
);
if should_process_notification(
&notification,
&primary_thread_id_for_requests,
&task_id,
) {
match event_processor.process_server_notification(notification) {
CodexStatus::Running => {}
CodexStatus::InitiateShutdown => {
if let Err(err) = request_shutdown(
&client,
&mut request_ids,
@@ -859,37 +808,12 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
break;
}
}
_ => {}
}
match event_processor.process_event(event) {
CodexStatus::Running => {}
CodexStatus::InitiateShutdown => {
if let Err(err) = request_shutdown(
&client,
&mut request_ids,
&primary_thread_id_for_requests,
)
.await
{
warn!("thread/unsubscribe failed during shutdown: {err}");
}
break;
}
CodexStatus::Shutdown => {
// `ShutdownComplete` does not identify which attached
// thread emitted it, so subagent shutdowns must not end
// the primary exec loop early.
}
}
}
InProcessServerEvent::Lagged { skipped } => {
let message = lagged_event_warning_message(skipped);
warn!("{message}");
let _ = event_processor.process_event(Event {
id: String::new(),
msg: EventMsg::Warning(codex_protocol::protocol::WarningEvent { message }),
});
event_processor.process_warning(message);
}
}
}
@@ -936,10 +860,9 @@ fn thread_start_params_from_config(config: &Config) -> ThreadStartParams {
}
}
fn thread_resume_params_from_config(config: &Config, path: Option<PathBuf>) -> ThreadResumeParams {
fn thread_resume_params_from_config(config: &Config, thread_id: String) -> ThreadResumeParams {
ThreadResumeParams {
thread_id: "resume".to_string(),
path,
thread_id,
model: config.model.clone(),
model_provider: Some(config.model_provider_id.clone()),
cwd: Some(config.cwd.to_string_lossy().to_string()),
@@ -1017,20 +940,19 @@ fn session_configured_from_thread_resume_response(
)
}
fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget {
match target {
ReviewTarget::UncommittedChanges => ApiReviewTarget::UncommittedChanges,
ReviewTarget::BaseBranch { branch } => ApiReviewTarget::BaseBranch { branch },
ReviewTarget::Commit { sha, title } => ApiReviewTarget::Commit { sha, title },
ReviewTarget::Custom { instructions } => ApiReviewTarget::Custom { instructions },
}
}
#[expect(
clippy::too_many_arguments,
reason = "session mapping keeps explicit fields"
)]
/// Synthesizes startup session metadata from `thread/start` or `thread/resume`.
///
/// This is a compatibility bridge for the current in-process architecture.
/// Some session fields are not available synchronously from the start/resume
/// response, so callers must treat the result as a best-effort fallback until
/// a later `SessionConfigured` event proves otherwise.
/// TODO(architecture): stop synthesizing a partial `SessionConfiguredEvent`
/// here. Either return the authoritative session-configured payload from
/// `thread/start`/`thread/resume`, or introduce a smaller bootstrap type for
/// exec so this path cannot accidentally depend on placeholder fields.
fn session_configured_from_thread_response(
thread_id: &str,
thread_name: Option<String>,
@@ -1040,7 +962,7 @@ fn session_configured_from_thread_response(
service_tier: Option<codex_protocol::config_types::ServiceTier>,
approval_policy: AskForApproval,
approvals_reviewer: codex_protocol::config_types::ApprovalsReviewer,
sandbox_policy: codex_protocol::protocol::SandboxPolicy,
sandbox_policy: SandboxPolicy,
cwd: PathBuf,
reasoning_effort: Option<codex_protocol::openai_models::ReasoningEffort>,
) -> Result<SessionConfiguredEvent, String> {
@@ -1067,71 +989,198 @@ fn session_configured_from_thread_response(
})
}
fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget {
match target {
ReviewTarget::UncommittedChanges => ApiReviewTarget::UncommittedChanges,
ReviewTarget::BaseBranch { branch } => ApiReviewTarget::BaseBranch { branch },
ReviewTarget::Commit { sha, title } => ApiReviewTarget::Commit { sha, title },
ReviewTarget::Custom { instructions } => ApiReviewTarget::Custom { instructions },
}
}
fn normalize_legacy_notification_method(method: &str) -> &str {
method.strip_prefix("codex/event/").unwrap_or(method)
}
fn lagged_event_warning_message(skipped: usize) -> String {
format!("in-process app-server event stream lagged; dropped {skipped} events")
}
struct DecodedLegacyNotification {
conversation_id: Option<String>,
event: Event,
fn should_process_notification(
notification: &ServerNotification,
thread_id: &str,
turn_id: &str,
) -> bool {
match notification {
ServerNotification::ConfigWarning(_) | ServerNotification::DeprecationNotice(_) => true,
ServerNotification::Error(notification) => {
notification.thread_id == thread_id && notification.turn_id == turn_id
}
ServerNotification::HookCompleted(notification) => {
notification.thread_id == thread_id
&& notification
.turn_id
.as_deref()
.is_none_or(|candidate| candidate == turn_id)
}
ServerNotification::HookStarted(notification) => {
notification.thread_id == thread_id
&& notification
.turn_id
.as_deref()
.is_none_or(|candidate| candidate == turn_id)
}
ServerNotification::ItemCompleted(notification) => {
notification.thread_id == thread_id && notification.turn_id == turn_id
}
ServerNotification::ItemStarted(notification) => {
notification.thread_id == thread_id && notification.turn_id == turn_id
}
ServerNotification::ModelRerouted(notification) => {
notification.thread_id == thread_id && notification.turn_id == turn_id
}
ServerNotification::ThreadTokenUsageUpdated(notification) => {
notification.thread_id == thread_id && notification.turn_id == turn_id
}
ServerNotification::TurnCompleted(notification) => {
notification.thread_id == thread_id && notification.turn.id == turn_id
}
ServerNotification::TurnDiffUpdated(notification) => {
notification.thread_id == thread_id && notification.turn_id == turn_id
}
ServerNotification::TurnPlanUpdated(notification) => {
notification.thread_id == thread_id && notification.turn_id == turn_id
}
ServerNotification::TurnStarted(notification) => {
notification.thread_id == thread_id && notification.turn.id == turn_id
}
_ => false,
}
}
fn decode_legacy_notification(
notification: JSONRPCNotification,
) -> Result<DecodedLegacyNotification, String> {
let value = notification
.params
.unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
let method = notification.method;
let normalized_method = normalize_legacy_notification_method(&method).to_string();
let serde_json::Value::Object(mut object) = value else {
return Err(format!(
"legacy notification `{method}` params were not an object"
));
};
let conversation_id = object
.get("conversationId")
.and_then(serde_json::Value::as_str)
.map(str::to_owned);
let mut event_payload = if let Some(serde_json::Value::Object(msg_payload)) = object.get("msg")
{
serde_json::Value::Object(msg_payload.clone())
} else {
object.remove("conversationId");
serde_json::Value::Object(object)
};
let serde_json::Value::Object(ref mut object) = event_payload else {
return Err(format!(
"legacy notification `{method}` event payload was not an object"
));
};
object.insert(
"type".to_string(),
serde_json::Value::String(normalized_method),
);
fn all_thread_source_kinds() -> Vec<ThreadSourceKind> {
vec![
ThreadSourceKind::Cli,
ThreadSourceKind::VsCode,
ThreadSourceKind::Exec,
ThreadSourceKind::AppServer,
ThreadSourceKind::SubAgent,
ThreadSourceKind::SubAgentReview,
ThreadSourceKind::SubAgentCompact,
ThreadSourceKind::SubAgentThreadSpawn,
ThreadSourceKind::SubAgentOther,
ThreadSourceKind::Unknown,
]
}
let msg: EventMsg = serde_json::from_value(event_payload)
.map_err(|err| format!("failed to decode event: {err}"))?;
Ok(DecodedLegacyNotification {
conversation_id,
event: Event {
id: String::new(),
msg,
},
})
async fn latest_thread_cwd(thread: &AppServerThread) -> PathBuf {
if let Some(path) = thread.path.as_deref()
&& let Some(cwd) = parse_latest_turn_context_cwd(path).await
{
return cwd;
}
thread.cwd.clone()
}
async fn parse_latest_turn_context_cwd(path: &Path) -> Option<PathBuf> {
let text = tokio::fs::read_to_string(path).await.ok()?;
for line in text.lines().rev() {
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let Ok(rollout_line) = serde_json::from_str::<RolloutLine>(trimmed) else {
continue;
};
if let RolloutItem::TurnContext(item) = rollout_line.item {
return Some(item.cwd);
}
}
None
}
fn cwds_match(current_cwd: &Path, session_cwd: &Path) -> bool {
match (
path_utils::normalize_for_path_comparison(current_cwd),
path_utils::normalize_for_path_comparison(session_cwd),
) {
(Ok(current), Ok(session)) => current == session,
_ => current_cwd == session_cwd,
}
}
async fn resolve_resume_thread_id(
client: &InProcessAppServerClient,
config: &Config,
args: &crate::cli::ResumeArgs,
) -> anyhow::Result<Option<String>> {
let model_providers = Some(vec![config.model_provider_id.clone()]);
if args.last {
let mut cursor = None;
loop {
let response: ThreadListResponse = send_request_with_response(
client,
ClientRequest::ThreadList {
request_id: RequestId::Integer(0),
params: ThreadListParams {
cursor,
limit: Some(100),
sort_key: Some(ThreadSortKey::UpdatedAt),
model_providers: model_providers.clone(),
source_kinds: Some(all_thread_source_kinds()),
archived: Some(false),
cwd: None,
search_term: None,
},
},
"thread/list",
)
.await
.map_err(anyhow::Error::msg)?;
for thread in response.data {
if args.all || cwds_match(config.cwd.as_path(), &latest_thread_cwd(&thread).await) {
return Ok(Some(thread.id));
}
}
let Some(next_cursor) = response.next_cursor else {
return Ok(None);
};
cursor = Some(next_cursor);
}
}
let Some(session_id) = args.session_id.as_deref() else {
return Ok(None);
};
if Uuid::parse_str(session_id).is_ok() {
return Ok(Some(session_id.to_string()));
}
let mut cursor = None;
loop {
let response: ThreadListResponse = send_request_with_response(
client,
ClientRequest::ThreadList {
request_id: RequestId::Integer(0),
params: ThreadListParams {
cursor,
limit: Some(100),
sort_key: Some(ThreadSortKey::UpdatedAt),
model_providers: Some(vec![config.model_provider_id.clone()]),
source_kinds: Some(all_thread_source_kinds()),
archived: Some(false),
cwd: None,
// Thread names are attached separately from rollout titles, so name
// resolution must scan the filtered list client-side instead of relying
// on the backend `search_term` filter.
search_term: None,
},
},
"thread/list",
)
.await
.map_err(anyhow::Error::msg)?;
for thread in response.data {
if thread.name.as_deref() != Some(session_id) {
continue;
}
if args.all || cwds_match(config.cwd.as_path(), &latest_thread_cwd(&thread).await) {
return Ok(Some(thread.id));
}
}
let Some(next_cursor) = response.next_cursor else {
return Ok(None);
};
cursor = Some(next_cursor);
}
}
fn canceled_mcp_server_elicitation_response() -> Result<Value, String> {
@@ -1205,8 +1254,6 @@ fn server_request_method_name(request: &ServerRequest) -> String {
async fn handle_server_request(
client: &InProcessAppServerClient,
request: ServerRequest,
config: &Config,
_thread_id: &str,
error_seen: &mut bool,
) {
let method = server_request_method_name(&request);
@@ -1228,50 +1275,6 @@ async fn handle_server_request(
Err(err) => Err(err),
}
}
ServerRequest::ChatgptAuthTokensRefresh { request_id, params } => {
let refresh_result = tokio::task::spawn_blocking({
let config = config.clone();
move || local_external_chatgpt_tokens(&config)
})
.await;
match refresh_result {
Err(err) => {
reject_server_request(
client,
request_id,
&method,
format!("local chatgpt auth refresh task failed in exec: {err}"),
)
.await
}
Ok(Err(reason)) => reject_server_request(client, request_id, &method, reason).await,
Ok(Ok(response)) => {
if let Some(previous_account_id) = params.previous_account_id.as_deref()
&& previous_account_id != response.chatgpt_account_id
{
warn!(
"local auth refresh account mismatch: expected `{previous_account_id}`, got `{}`",
response.chatgpt_account_id
);
}
match serde_json::to_value(response) {
Ok(value) => {
resolve_server_request(
client,
request_id,
value,
"account/chatgptAuthTokens/refresh",
)
.await
}
Err(err) => Err(format!(
"failed to serialize chatgpt auth refresh response: {err}"
)),
}
}
}
}
ServerRequest::CommandExecutionRequestApproval { request_id, params } => {
reject_server_request(
client,
@@ -1320,6 +1323,15 @@ async fn handle_server_request(
)
.await
}
ServerRequest::ChatgptAuthTokensRefresh { request_id, .. } => {
reject_server_request(
client,
request_id,
&method,
"chatgpt auth token refresh is not supported in exec mode".to_string(),
)
.await
}
ServerRequest::ApplyPatchApproval { request_id, params } => {
reject_server_request(
client,
@@ -1364,91 +1376,6 @@ async fn handle_server_request(
}
}
fn local_external_chatgpt_tokens(
config: &Config,
) -> Result<ChatgptAuthTokensRefreshResponse, String> {
let auth_manager = AuthManager::shared(
config.codex_home.clone(),
/*enable_codex_api_key_env*/ false,
config.cli_auth_credentials_store_mode,
);
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
auth_manager.reload();
let auth = auth_manager
.auth_cached()
.ok_or_else(|| "no cached auth available for local token refresh".to_string())?;
if !auth.is_external_chatgpt_tokens() {
return Err("external ChatGPT token auth is not active".to_string());
}
let access_token = auth
.get_token()
.map_err(|err| format!("failed to read external access token: {err}"))?;
let chatgpt_account_id = auth
.get_account_id()
.ok_or_else(|| "external token auth is missing chatgpt account id".to_string())?;
let chatgpt_plan_type = auth.account_plan_type().map(|plan_type| match plan_type {
AccountPlanType::Free => "free".to_string(),
AccountPlanType::Go => "go".to_string(),
AccountPlanType::Plus => "plus".to_string(),
AccountPlanType::Pro => "pro".to_string(),
AccountPlanType::Team => "team".to_string(),
AccountPlanType::Business => "business".to_string(),
AccountPlanType::Enterprise => "enterprise".to_string(),
AccountPlanType::Edu => "edu".to_string(),
AccountPlanType::Unknown => "unknown".to_string(),
});
Ok(ChatgptAuthTokensRefreshResponse {
access_token,
chatgpt_account_id,
chatgpt_plan_type,
})
}
async fn resolve_resume_path(
config: &Config,
args: &crate::cli::ResumeArgs,
) -> anyhow::Result<Option<PathBuf>> {
if args.last {
let default_provider_filter = vec![config.model_provider_id.clone()];
let filter_cwd = if args.all {
None
} else {
Some(config.cwd.as_path())
};
match codex_core::RolloutRecorder::find_latest_thread_path(
config,
/*page_size*/ 1,
/*cursor*/ None,
codex_core::ThreadSortKey::UpdatedAt,
&[],
Some(default_provider_filter.as_slice()),
&config.model_provider_id,
filter_cwd,
)
.await
{
Ok(path) => Ok(path),
Err(e) => {
error!("Error listing threads: {e}");
Ok(None)
}
}
} else if let Some(id_str) = args.session_id.as_deref() {
if Uuid::parse_str(id_str).is_ok() {
let path = find_thread_path_by_id_str(&config.codex_home, id_str).await?;
Ok(path)
} else {
let path = find_thread_path_by_name_str(&config.codex_home, id_str).await?;
Ok(path)
}
} else {
Ok(None)
}
}
fn load_output_schema(path: Option<PathBuf>) -> Option<Value> {
let path = path?;
@@ -1806,29 +1733,6 @@ mod tests {
);
}
#[test]
fn decode_legacy_notification_preserves_conversation_id() {
let decoded = decode_legacy_notification(JSONRPCNotification {
method: "codex/event/error".to_string(),
params: Some(serde_json::json!({
"conversationId": "thread-123",
"msg": {
"message": "boom"
}
})),
})
.expect("legacy notification should decode");
assert_eq!(decoded.conversation_id.as_deref(), Some("thread-123"));
assert!(matches!(
decoded.event.msg,
EventMsg::Error(codex_protocol::protocol::ErrorEvent {
message,
codex_error_info: None,
}) if message == "boom"
));
}
#[test]
fn canceled_mcp_server_elicitation_response_uses_cancel_action() {
let value = canceled_mcp_server_elicitation_response()

File diff suppressed because it is too large Load Diff

View File

@@ -9,6 +9,7 @@ use serde::Serialize;
use serde::ser::Serializer;
use ts_rs::TS;
use crate::config_types::ApprovalsReviewer;
use crate::config_types::CollaborationMode;
use crate::config_types::SandboxMode;
use crate::protocol::AskForApproval;
@@ -481,6 +482,7 @@ const APPROVAL_POLICY_ON_REQUEST_RULE: &str =
include_str!("prompts/permissions/approval_policy/on_request.md");
const APPROVAL_POLICY_ON_REQUEST_RULE_REQUEST_PERMISSION: &str =
include_str!("prompts/permissions/approval_policy/on_request_rule_request_permission.md");
const GUARDIAN_SUBAGENT_APPROVAL_SUFFIX: &str = "`approvals_reviewer` is `guardian_subagent`: Sandbox escalations with require_escalated will be reviewed for compliance with the policy. If a rejection happens, you should proceed only with a materially safer alternative, or inform the user of the risk and send a final message to ask for approval.";
const SANDBOX_MODE_DANGER_FULL_ACCESS: &str =
include_str!("prompts/permissions/sandbox_mode/danger_full_access.md");
@@ -491,6 +493,14 @@ const SANDBOX_MODE_READ_ONLY: &str = include_str!("prompts/permissions/sandbox_m
const REALTIME_START_INSTRUCTIONS: &str = include_str!("prompts/realtime/realtime_start.md");
const REALTIME_END_INSTRUCTIONS: &str = include_str!("prompts/realtime/realtime_end.md");
struct PermissionsPromptConfig<'a> {
approval_policy: AskForApproval,
approvals_reviewer: ApprovalsReviewer,
exec_policy: &'a Policy,
exec_permission_approvals_enabled: bool,
request_permissions_tool_enabled: bool,
}
impl DeveloperInstructions {
pub fn new<T: Into<String>>(text: T) -> Self {
Self { text: text.into() }
@@ -498,6 +508,7 @@ impl DeveloperInstructions {
pub fn from(
approval_policy: AskForApproval,
approvals_reviewer: ApprovalsReviewer,
exec_policy: &Policy,
exec_permission_approvals_enabled: bool,
request_permissions_tool_enabled: bool,
@@ -541,6 +552,14 @@ impl DeveloperInstructions {
),
};
let text = if approvals_reviewer == ApprovalsReviewer::GuardianSubagent
&& approval_policy != AskForApproval::Never
{
format!("{text}\n\n{GUARDIAN_SUBAGENT_APPROVAL_SUFFIX}")
} else {
text
};
DeveloperInstructions::new(text)
}
@@ -590,6 +609,7 @@ impl DeveloperInstructions {
pub fn from_policy(
sandbox_policy: &SandboxPolicy,
approval_policy: AskForApproval,
approvals_reviewer: ApprovalsReviewer,
exec_policy: &Policy,
cwd: &Path,
exec_permission_approvals_enabled: bool,
@@ -614,11 +634,14 @@ impl DeveloperInstructions {
DeveloperInstructions::from_permissions_with_network(
sandbox_mode,
network_access,
approval_policy,
exec_policy,
PermissionsPromptConfig {
approval_policy,
approvals_reviewer,
exec_policy,
exec_permission_approvals_enabled,
request_permissions_tool_enabled,
},
writable_roots,
exec_permission_approvals_enabled,
request_permissions_tool_enabled,
)
}
@@ -639,11 +662,8 @@ impl DeveloperInstructions {
fn from_permissions_with_network(
sandbox_mode: SandboxMode,
network_access: NetworkAccess,
approval_policy: AskForApproval,
exec_policy: &Policy,
config: PermissionsPromptConfig<'_>,
writable_roots: Option<Vec<WritableRoot>>,
exec_permission_approvals_enabled: bool,
request_permissions_tool_enabled: bool,
) -> Self {
let start_tag = DeveloperInstructions::new("<permissions instructions>");
let end_tag = DeveloperInstructions::new("</permissions instructions>");
@@ -653,10 +673,11 @@ impl DeveloperInstructions {
network_access,
))
.concat(DeveloperInstructions::from(
approval_policy,
exec_policy,
exec_permission_approvals_enabled,
request_permissions_tool_enabled,
config.approval_policy,
config.approvals_reviewer,
config.exec_policy,
config.exec_permission_approvals_enabled,
config.request_permissions_tool_enabled,
))
.concat(DeveloperInstructions::from_writable_roots(writable_roots))
.concat(end_tag)
@@ -1923,11 +1944,14 @@ mod tests {
let instructions = DeveloperInstructions::from_permissions_with_network(
SandboxMode::WorkspaceWrite,
NetworkAccess::Enabled,
AskForApproval::OnRequest,
&Policy::empty(),
PermissionsPromptConfig {
approval_policy: AskForApproval::OnRequest,
approvals_reviewer: ApprovalsReviewer::User,
exec_policy: &Policy::empty(),
exec_permission_approvals_enabled: false,
request_permissions_tool_enabled: false,
},
None,
false,
false,
);
let text = instructions.into_text();
@@ -1954,6 +1978,7 @@ mod tests {
let instructions = DeveloperInstructions::from_policy(
&policy,
AskForApproval::UnlessTrusted,
ApprovalsReviewer::User,
&Policy::empty(),
&PathBuf::from("/tmp"),
false,
@@ -1976,11 +2001,14 @@ mod tests {
let instructions = DeveloperInstructions::from_permissions_with_network(
SandboxMode::WorkspaceWrite,
NetworkAccess::Enabled,
AskForApproval::OnRequest,
&exec_policy,
PermissionsPromptConfig {
approval_policy: AskForApproval::OnRequest,
approvals_reviewer: ApprovalsReviewer::User,
exec_policy: &exec_policy,
exec_permission_approvals_enabled: false,
request_permissions_tool_enabled: false,
},
None,
false,
false,
);
let text = instructions.into_text();
@@ -1994,11 +2022,14 @@ mod tests {
let instructions = DeveloperInstructions::from_permissions_with_network(
SandboxMode::WorkspaceWrite,
NetworkAccess::Enabled,
AskForApproval::UnlessTrusted,
&Policy::empty(),
PermissionsPromptConfig {
approval_policy: AskForApproval::UnlessTrusted,
approvals_reviewer: ApprovalsReviewer::User,
exec_policy: &Policy::empty(),
exec_permission_approvals_enabled: false,
request_permissions_tool_enabled: true,
},
None,
false,
true,
);
let text = instructions.into_text();
@@ -2011,11 +2042,14 @@ mod tests {
let instructions = DeveloperInstructions::from_permissions_with_network(
SandboxMode::WorkspaceWrite,
NetworkAccess::Enabled,
AskForApproval::OnFailure,
&Policy::empty(),
PermissionsPromptConfig {
approval_policy: AskForApproval::OnFailure,
approvals_reviewer: ApprovalsReviewer::User,
exec_policy: &Policy::empty(),
exec_permission_approvals_enabled: false,
request_permissions_tool_enabled: true,
},
None,
false,
true,
);
let text = instructions.into_text();
@@ -2028,11 +2062,14 @@ mod tests {
let instructions = DeveloperInstructions::from_permissions_with_network(
SandboxMode::WorkspaceWrite,
NetworkAccess::Enabled,
AskForApproval::OnRequest,
&Policy::empty(),
PermissionsPromptConfig {
approval_policy: AskForApproval::OnRequest,
approvals_reviewer: ApprovalsReviewer::User,
exec_policy: &Policy::empty(),
exec_permission_approvals_enabled: true,
request_permissions_tool_enabled: false,
},
None,
true,
false,
);
let text = instructions.into_text();
@@ -2045,11 +2082,14 @@ mod tests {
let instructions = DeveloperInstructions::from_permissions_with_network(
SandboxMode::WorkspaceWrite,
NetworkAccess::Enabled,
AskForApproval::OnRequest,
&Policy::empty(),
PermissionsPromptConfig {
approval_policy: AskForApproval::OnRequest,
approvals_reviewer: ApprovalsReviewer::User,
exec_policy: &Policy::empty(),
exec_permission_approvals_enabled: false,
request_permissions_tool_enabled: true,
},
None,
false,
true,
);
let text = instructions.into_text();
@@ -2064,11 +2104,14 @@ mod tests {
let instructions = DeveloperInstructions::from_permissions_with_network(
SandboxMode::WorkspaceWrite,
NetworkAccess::Enabled,
AskForApproval::OnRequest,
&Policy::empty(),
PermissionsPromptConfig {
approval_policy: AskForApproval::OnRequest,
approvals_reviewer: ApprovalsReviewer::User,
exec_policy: &Policy::empty(),
exec_permission_approvals_enabled: true,
request_permissions_tool_enabled: true,
},
None,
true,
true,
);
let text = instructions.into_text();
@@ -2076,6 +2119,35 @@ mod tests {
assert!(text.contains("# request_permissions Tool"));
}
#[test]
fn guardian_subagent_approvals_append_guardian_specific_guidance() {
let text = DeveloperInstructions::from(
AskForApproval::OnRequest,
ApprovalsReviewer::GuardianSubagent,
&Policy::empty(),
false,
false,
)
.into_text();
assert!(text.contains("`approvals_reviewer` is `guardian_subagent`"));
assert!(text.contains("materially safer alternative"));
}
#[test]
fn guardian_subagent_approvals_omit_guardian_specific_guidance_when_approval_is_never() {
let text = DeveloperInstructions::from(
AskForApproval::Never,
ApprovalsReviewer::GuardianSubagent,
&Policy::empty(),
false,
false,
)
.into_text();
assert!(!text.contains("`approvals_reviewer` is `guardian_subagent`"));
}
fn granular_categories_section(title: &str, categories: &[&str]) -> String {
format!("{title}\n{}", categories.join("\n"))
}
@@ -2118,6 +2190,7 @@ mod tests {
request_permissions: true,
mcp_elicitations: false,
}),
ApprovalsReviewer::User,
&Policy::empty(),
true,
false,
@@ -2151,6 +2224,7 @@ mod tests {
request_permissions: true,
mcp_elicitations: true,
}),
ApprovalsReviewer::User,
&Policy::empty(),
true,
false,
@@ -2183,6 +2257,7 @@ mod tests {
request_permissions: true,
mcp_elicitations: true,
}),
ApprovalsReviewer::User,
&Policy::empty(),
false,
false,
@@ -2215,6 +2290,7 @@ mod tests {
request_permissions: true,
mcp_elicitations: true,
}),
ApprovalsReviewer::User,
&Policy::empty(),
true,
true,
@@ -2230,6 +2306,7 @@ mod tests {
request_permissions: false,
mcp_elicitations: true,
}),
ApprovalsReviewer::User,
&Policy::empty(),
true,
true,
@@ -2249,6 +2326,7 @@ mod tests {
request_permissions: true,
mcp_elicitations: false,
}),
ApprovalsReviewer::User,
&Policy::empty(),
true,
false,

View File

@@ -464,8 +464,6 @@ enum ThreadBufferedEvent {
Notification(ServerNotification),
Request(ServerRequest),
HistoryEntryResponse(GetHistoryEntryResponseEvent),
LegacyWarning(String),
LegacyRollback { num_turns: u32 },
}
#[derive(Debug)]
@@ -474,7 +472,6 @@ struct ThreadEventStore {
turns: Vec<Turn>,
buffer: VecDeque<ThreadBufferedEvent>,
pending_interactive_replay: PendingInteractiveReplayState,
pending_local_legacy_rollbacks: VecDeque<u32>,
active_turn_id: Option<String>,
input_state: Option<ThreadInputState>,
capacity: usize,
@@ -483,10 +480,7 @@ struct ThreadEventStore {
impl ThreadEventStore {
fn event_survives_session_refresh(event: &ThreadBufferedEvent) -> bool {
matches!(
event,
ThreadBufferedEvent::Request(_) | ThreadBufferedEvent::LegacyWarning(_)
)
matches!(event, ThreadBufferedEvent::Request(_))
}
fn new(capacity: usize) -> Self {
@@ -495,7 +489,6 @@ impl ThreadEventStore {
turns: Vec::new(),
buffer: VecDeque::new(),
pending_interactive_replay: PendingInteractiveReplayState::default(),
pending_local_legacy_rollbacks: VecDeque::new(),
active_turn_id: None,
input_state: None,
capacity,
@@ -521,7 +514,6 @@ impl ThreadEventStore {
}
fn set_turns(&mut self, turns: Vec<Turn>) {
self.pending_local_legacy_rollbacks.clear();
self.active_turn_id = turns
.iter()
.rev()
@@ -578,37 +570,6 @@ impl ThreadEventStore {
self.active_turn_id = None;
}
fn note_local_thread_rollback(&mut self, num_turns: u32) {
self.pending_local_legacy_rollbacks.push_back(num_turns);
while self.pending_local_legacy_rollbacks.len() > self.capacity {
self.pending_local_legacy_rollbacks.pop_front();
}
}
fn consume_pending_local_legacy_rollback(&mut self, num_turns: u32) -> bool {
match self.pending_local_legacy_rollbacks.front() {
Some(pending_num_turns) if *pending_num_turns == num_turns => {
self.pending_local_legacy_rollbacks.pop_front();
true
}
_ => false,
}
}
fn apply_legacy_thread_rollback(&mut self, num_turns: u32) {
let num_turns = usize::try_from(num_turns).unwrap_or(usize::MAX);
if num_turns >= self.turns.len() {
self.turns.clear();
} else {
self.turns
.truncate(self.turns.len().saturating_sub(num_turns));
}
self.buffer.clear();
self.pending_interactive_replay = PendingInteractiveReplayState::default();
self.pending_local_legacy_rollbacks.clear();
self.active_turn_id = None;
}
fn snapshot(&self) -> ThreadEventSnapshot {
ThreadEventSnapshot {
session: self.session.clone(),
@@ -623,9 +584,7 @@ impl ThreadEventStore {
.pending_interactive_replay
.should_replay_snapshot_request(request),
ThreadBufferedEvent::Notification(_)
| ThreadBufferedEvent::HistoryEntryResponse(_)
| ThreadBufferedEvent::LegacyWarning(_)
| ThreadBufferedEvent::LegacyRollback { .. } => true,
| ThreadBufferedEvent::HistoryEntryResponse(_) => true,
})
.cloned()
.collect(),
@@ -2283,50 +2242,6 @@ impl App {
Ok(())
}
async fn enqueue_thread_legacy_warning(
&mut self,
thread_id: ThreadId,
message: String,
) -> Result<()> {
let (sender, store) = {
let channel = self.ensure_thread_channel(thread_id);
(channel.sender.clone(), Arc::clone(&channel.store))
};
let should_send = {
let mut guard = store.lock().await;
guard
.buffer
.push_back(ThreadBufferedEvent::LegacyWarning(message.clone()));
if guard.buffer.len() > guard.capacity
&& let Some(removed) = guard.buffer.pop_front()
&& let ThreadBufferedEvent::Request(request) = &removed
{
guard
.pending_interactive_replay
.note_evicted_server_request(request);
}
guard.active
};
if should_send {
match sender.try_send(ThreadBufferedEvent::LegacyWarning(message)) {
Ok(()) => {}
Err(TrySendError::Full(event)) => {
tokio::spawn(async move {
if let Err(err) = sender.send(event).await {
tracing::warn!("thread {thread_id} event channel closed: {err}");
}
});
}
Err(TrySendError::Closed(_)) => {
tracing::warn!("thread {thread_id} event channel closed");
}
}
}
Ok(())
}
async fn enqueue_thread_history_entry_response(
&mut self,
thread_id: ThreadId,
@@ -2371,64 +2286,6 @@ impl App {
Ok(())
}
async fn enqueue_thread_legacy_rollback(
&mut self,
thread_id: ThreadId,
num_turns: u32,
) -> Result<()> {
let (sender, store) = {
let channel = self.ensure_thread_channel(thread_id);
(channel.sender.clone(), Arc::clone(&channel.store))
};
let should_send = {
let mut guard = store.lock().await;
if guard.consume_pending_local_legacy_rollback(num_turns) {
false
} else {
guard.apply_legacy_thread_rollback(num_turns);
guard.active
}
};
if should_send {
match sender.try_send(ThreadBufferedEvent::LegacyRollback { num_turns }) {
Ok(()) => {}
Err(TrySendError::Full(event)) => {
tokio::spawn(async move {
if let Err(err) = sender.send(event).await {
tracing::warn!("thread {thread_id} event channel closed: {err}");
}
});
}
Err(TrySendError::Closed(_)) => {
tracing::warn!("thread {thread_id} event channel closed");
}
}
}
Ok(())
}
async fn enqueue_primary_thread_legacy_warning(&mut self, message: String) -> Result<()> {
if let Some(thread_id) = self.primary_thread_id {
return self.enqueue_thread_legacy_warning(thread_id, message).await;
}
self.pending_primary_events
.push_back(ThreadBufferedEvent::LegacyWarning(message));
Ok(())
}
async fn enqueue_primary_thread_legacy_rollback(&mut self, num_turns: u32) -> Result<()> {
if let Some(thread_id) = self.primary_thread_id {
return self
.enqueue_thread_legacy_rollback(thread_id, num_turns)
.await;
}
self.pending_primary_events
.push_back(ThreadBufferedEvent::LegacyRollback { num_turns });
Ok(())
}
async fn enqueue_primary_thread_session(
&mut self,
session: ThreadSessionState,
@@ -2466,14 +2323,6 @@ impl App {
self.enqueue_thread_history_entry_response(thread_id, event)
.await?;
}
ThreadBufferedEvent::LegacyWarning(message) => {
self.enqueue_thread_legacy_warning(thread_id, message)
.await?;
}
ThreadBufferedEvent::LegacyRollback { num_turns } => {
self.enqueue_thread_legacy_rollback(thread_id, num_turns)
.await?;
}
}
}
self.chat_widget
@@ -4769,7 +4618,6 @@ impl App {
if let Some(channel) = self.thread_event_channels.get(&thread_id) {
let mut store = channel.store.lock().await;
store.apply_thread_rollback(response);
store.note_local_thread_rollback(num_turns);
}
if self.active_thread_id == Some(thread_id)
&& let Some(mut rx) = self.active_thread_rx.take()
@@ -4814,13 +4662,6 @@ impl App {
ThreadBufferedEvent::HistoryEntryResponse(event) => {
self.chat_widget.handle_history_entry_response(event);
}
ThreadBufferedEvent::LegacyWarning(message) => {
self.chat_widget.add_warning_message(message);
}
ThreadBufferedEvent::LegacyRollback { num_turns } => {
self.handle_backtrack_rollback_succeeded(num_turns);
self.chat_widget.handle_thread_rolled_back();
}
}
if needs_refresh {
self.refresh_status_line();
@@ -4838,13 +4679,6 @@ impl App {
ThreadBufferedEvent::HistoryEntryResponse(event) => {
self.chat_widget.handle_history_entry_response(event)
}
ThreadBufferedEvent::LegacyWarning(message) => {
self.chat_widget.add_warning_message(message);
}
ThreadBufferedEvent::LegacyRollback { num_turns } => {
self.handle_backtrack_rollback_succeeded(num_turns);
self.chat_widget.handle_thread_rolled_back();
}
}
}
@@ -5335,6 +5169,7 @@ mod tests {
use codex_app_server_protocol::AdditionalPermissionProfile;
use codex_app_server_protocol::AgentMessageDeltaNotification;
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::NetworkApprovalContext as AppServerNetworkApprovalContext;
use codex_app_server_protocol::NetworkApprovalProtocol as AppServerNetworkApprovalProtocol;
use codex_app_server_protocol::NetworkPolicyAmendment as AppServerNetworkPolicyAmendment;
@@ -5935,33 +5770,6 @@ mod tests {
}
}
#[tokio::test]
async fn replay_thread_snapshot_replays_legacy_warning_history() {
let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await;
app.replay_thread_snapshot(
ThreadEventSnapshot {
session: None,
turns: Vec::new(),
events: vec![ThreadBufferedEvent::LegacyWarning(
"legacy warning message".to_string(),
)],
input_state: None,
},
false,
);
let mut saw_warning = false;
while let Ok(event) = app_event_rx.try_recv() {
if let AppEvent::InsertHistoryCell(cell) = event {
let transcript = lines_to_single_string(&cell.transcript_lines(80));
saw_warning |= transcript.contains("legacy warning message");
}
}
assert!(saw_warning, "expected replayed legacy warning history cell");
}
#[tokio::test]
async fn replay_only_thread_keeps_restored_queue_visible() {
let (mut app, _app_event_rx, _op_rx) = make_test_app_with_channels().await;
@@ -7308,87 +7116,6 @@ guardian_approval = true
Ok(())
}
#[tokio::test]
async fn legacy_warning_eviction_clears_pending_interactive_replay_state() -> Result<()> {
let mut app = make_test_app().await;
let thread_id = ThreadId::new();
let channel = ThreadEventChannel::new(1);
{
let mut store = channel.store.lock().await;
store.push_request(exec_approval_request(
thread_id,
"turn-approval",
"call-approval",
None,
));
assert_eq!(store.has_pending_thread_approvals(), true);
}
app.thread_event_channels.insert(thread_id, channel);
app.enqueue_thread_legacy_warning(thread_id, "legacy warning".to_string())
.await?;
let store = app
.thread_event_channels
.get(&thread_id)
.expect("thread store should exist")
.store
.lock()
.await;
assert_eq!(store.has_pending_thread_approvals(), false);
let snapshot = store.snapshot();
assert_eq!(snapshot.events.len(), 1);
assert!(matches!(
snapshot.events.first(),
Some(ThreadBufferedEvent::LegacyWarning(message)) if message == "legacy warning"
));
Ok(())
}
#[tokio::test]
async fn legacy_thread_rollback_trims_inactive_thread_snapshot_state() -> Result<()> {
let mut app = make_test_app().await;
let thread_id = ThreadId::new();
let session = test_thread_session(thread_id, PathBuf::from("/tmp/project"));
let turns = vec![
test_turn("turn-1", TurnStatus::Completed, Vec::new()),
test_turn("turn-2", TurnStatus::Completed, Vec::new()),
];
let channel = ThreadEventChannel::new_with_session(4, session, turns);
{
let mut store = channel.store.lock().await;
store.push_request(exec_approval_request(
thread_id,
"turn-approval",
"call-approval",
None,
));
assert_eq!(store.has_pending_thread_approvals(), true);
}
app.thread_event_channels.insert(thread_id, channel);
app.enqueue_thread_legacy_rollback(thread_id, 1).await?;
let store = app
.thread_event_channels
.get(&thread_id)
.expect("thread store should exist")
.store
.lock()
.await;
assert_eq!(
store.turns,
vec![test_turn("turn-1", TurnStatus::Completed, Vec::new())]
);
assert_eq!(store.has_pending_thread_approvals(), false);
let snapshot = store.snapshot();
assert_eq!(snapshot.turns, store.turns);
assert!(snapshot.events.is_empty());
Ok(())
}
#[tokio::test]
async fn inactive_thread_started_notification_initializes_replay_session() -> Result<()> {
let mut app = make_test_app().await;
@@ -8108,16 +7835,6 @@ guardian_approval = true
assert_eq!(store.has_pending_thread_approvals(), false);
}
#[test]
fn thread_event_store_consumes_matching_local_legacy_rollback_once() {
let mut store = ThreadEventStore::new(8);
store.note_local_thread_rollback(2);
assert!(store.consume_pending_local_legacy_rollback(2));
assert!(!store.consume_pending_local_legacy_rollback(2));
assert!(!store.consume_pending_local_legacy_rollback(1));
}
fn next_user_turn_op(op_rx: &mut tokio::sync::mpsc::UnboundedReceiver<Op>) -> Op {
let mut seen = Vec::new();
while let Ok(op) = op_rx.try_recv() {
@@ -9076,8 +8793,13 @@ guardian_approval = true
let (tx, rx) = mpsc::channel(8);
app.active_thread_id = Some(thread_id);
app.active_thread_rx = Some(rx);
tx.send(ThreadBufferedEvent::LegacyWarning(
"stale warning".to_string(),
tx.send(ThreadBufferedEvent::Notification(
ServerNotification::ConfigWarning(ConfigWarningNotification {
summary: "stale warning".to_string(),
details: None,
path: None,
range: None,
}),
))
.await
.expect("event should queue");
@@ -9115,62 +8837,6 @@ guardian_approval = true
assert!(matches!(rx.try_recv(), Err(TryRecvError::Empty)));
}
#[tokio::test]
async fn local_rollback_response_suppresses_matching_legacy_rollback() {
let mut app = make_test_app().await;
let thread_id = ThreadId::new();
let session = test_thread_session(thread_id, PathBuf::from("/tmp/project"));
let initial_turns = vec![
test_turn("turn-1", TurnStatus::Completed, Vec::new()),
test_turn("turn-2", TurnStatus::Completed, Vec::new()),
];
app.thread_event_channels.insert(
thread_id,
ThreadEventChannel::new_with_session(8, session, initial_turns),
);
app.handle_thread_rollback_response(
thread_id,
1,
&ThreadRollbackResponse {
thread: Thread {
id: thread_id.to_string(),
preview: String::new(),
ephemeral: false,
model_provider: "openai".to_string(),
created_at: 0,
updated_at: 0,
status: codex_app_server_protocol::ThreadStatus::Idle,
path: None,
cwd: PathBuf::from("/tmp/project"),
cli_version: "0.0.0".to_string(),
source: SessionSource::Cli.into(),
agent_nickname: None,
agent_role: None,
git_info: None,
name: None,
turns: vec![test_turn("turn-1", TurnStatus::Completed, Vec::new())],
},
},
)
.await;
app.enqueue_thread_legacy_rollback(thread_id, 1)
.await
.expect("legacy rollback should not fail");
let store = app
.thread_event_channels
.get(&thread_id)
.expect("thread channel")
.store
.lock()
.await;
let snapshot = store.snapshot();
assert_eq!(snapshot.turns.len(), 1);
assert!(snapshot.events.is_empty());
}
#[tokio::test]
async fn new_session_requests_shutdown_for_previous_conversation() {
let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels().await;

View File

@@ -16,13 +16,9 @@ use crate::app_event::AppEvent;
use crate::app_server_session::AppServerSession;
use crate::app_server_session::app_server_rate_limit_snapshot_to_core;
use crate::app_server_session::status_account_display_from_auth_mode;
use crate::local_chatgpt_auth::load_local_chatgpt_auth;
use codex_app_server_client::AppServerEvent;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::ChatgptAuthTokensRefreshParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
#[cfg(test)]
@@ -106,16 +102,9 @@ use codex_protocol::protocol::TurnAbortedEvent;
use codex_protocol::protocol::TurnCompleteEvent;
#[cfg(test)]
use codex_protocol::protocol::TurnStartedEvent;
use serde_json::Value;
#[cfg(test)]
use std::time::Duration;
#[derive(Debug, PartialEq, Eq)]
enum LegacyThreadNotification {
Warning(String),
Rollback { num_turns: u32 },
}
impl App {
pub(super) async fn handle_app_server_event(
&mut self,
@@ -133,48 +122,7 @@ impl App {
self.handle_server_notification_event(app_server_client, notification)
.await;
}
AppServerEvent::LegacyNotification(notification) => {
if let Some((thread_id, legacy_notification)) =
legacy_thread_notification(notification)
{
let result = match legacy_notification {
LegacyThreadNotification::Warning(message) => {
if self.primary_thread_id == Some(thread_id)
|| self.primary_thread_id.is_none()
{
self.enqueue_primary_thread_legacy_warning(message).await
} else {
self.enqueue_thread_legacy_warning(thread_id, message).await
}
}
LegacyThreadNotification::Rollback { num_turns } => {
if self.primary_thread_id == Some(thread_id)
|| self.primary_thread_id.is_none()
{
self.enqueue_primary_thread_legacy_rollback(num_turns).await
} else {
self.enqueue_thread_legacy_rollback(thread_id, num_turns)
.await
}
}
};
if let Err(err) = result {
tracing::warn!("failed to enqueue app-server legacy notification: {err}");
}
} else {
tracing::debug!("ignoring legacy app-server notification in tui_app_server");
}
}
AppServerEvent::ServerRequest(request) => {
if let ServerRequest::ChatgptAuthTokensRefresh { request_id, params } = request {
self.handle_chatgpt_auth_tokens_refresh_request(
app_server_client,
request_id,
params,
)
.await;
return;
}
self.handle_server_request_event(app_server_client, request)
.await;
}
@@ -293,71 +241,6 @@ impl App {
tracing::warn!("failed to enqueue app-server request: {err}");
}
}
async fn handle_chatgpt_auth_tokens_refresh_request(
&mut self,
app_server_client: &AppServerSession,
request_id: RequestId,
params: ChatgptAuthTokensRefreshParams,
) {
let config = self.config.clone();
let result = tokio::task::spawn_blocking(move || {
resolve_chatgpt_auth_tokens_refresh_response(
&config.codex_home,
config.cli_auth_credentials_store_mode,
config.forced_chatgpt_workspace_id.as_deref(),
&params,
)
})
.await;
match result {
Ok(Ok(response)) => {
let response = serde_json::to_value(response).map_err(|err| {
format!("failed to serialize chatgpt auth refresh response: {err}")
});
match response {
Ok(response) => {
if let Err(err) = app_server_client
.resolve_server_request(request_id, response)
.await
{
tracing::warn!("failed to resolve chatgpt auth refresh request: {err}");
}
}
Err(err) => {
self.chat_widget.add_error_message(err.clone());
if let Err(reject_err) = self
.reject_app_server_request(app_server_client, request_id, err)
.await
{
tracing::warn!("{reject_err}");
}
}
}
}
Ok(Err(err)) => {
self.chat_widget.add_error_message(err.clone());
if let Err(reject_err) = self
.reject_app_server_request(app_server_client, request_id, err)
.await
{
tracing::warn!("{reject_err}");
}
}
Err(err) => {
let message = format!("chatgpt auth refresh task failed: {err}");
self.chat_widget.add_error_message(message.clone());
if let Err(reject_err) = self
.reject_app_server_request(app_server_client, request_id, message)
.await
{
tracing::warn!("{reject_err}");
}
}
}
}
async fn reject_app_server_request(
&self,
app_server_client: &AppServerSession,
@@ -519,28 +402,6 @@ fn server_notification_thread_target(
}
}
fn resolve_chatgpt_auth_tokens_refresh_response(
codex_home: &std::path::Path,
auth_credentials_store_mode: codex_core::auth::AuthCredentialsStoreMode,
forced_chatgpt_workspace_id: Option<&str>,
params: &ChatgptAuthTokensRefreshParams,
) -> Result<codex_app_server_protocol::ChatgptAuthTokensRefreshResponse, String> {
let auth = load_local_chatgpt_auth(
codex_home,
auth_credentials_store_mode,
forced_chatgpt_workspace_id,
)?;
if let Some(previous_account_id) = params.previous_account_id.as_deref()
&& previous_account_id != auth.chatgpt_account_id
{
return Err(format!(
"local ChatGPT auth refresh account mismatch: expected `{previous_account_id}`, got `{}`",
auth.chatgpt_account_id
));
}
Ok(auth.to_refresh_response())
}
#[cfg(test)]
/// Convert a `Thread` snapshot into a flat sequence of protocol `Event`s
/// suitable for replaying into the TUI event store.
@@ -567,48 +428,6 @@ pub(super) fn thread_snapshot_events(
.collect()
}
fn legacy_thread_notification(
notification: JSONRPCNotification,
) -> Option<(ThreadId, LegacyThreadNotification)> {
let method = notification
.method
.strip_prefix("codex/event/")
.unwrap_or(&notification.method);
let Value::Object(mut params) = notification.params? else {
return None;
};
let thread_id = params
.remove("conversationId")
.and_then(|value| serde_json::from_value::<String>(value).ok())
.and_then(|value| ThreadId::from_string(&value).ok())?;
let msg = params.get("msg").and_then(Value::as_object)?;
match method {
"warning" => {
let message = msg
.get("type")
.and_then(Value::as_str)
.zip(msg.get("message"))
.and_then(|(kind, message)| (kind == "warning").then_some(message))
.and_then(Value::as_str)
.map(ToOwned::to_owned)?;
Some((thread_id, LegacyThreadNotification::Warning(message)))
}
"thread_rolled_back" => {
let num_turns = msg
.get("type")
.and_then(Value::as_str)
.zip(msg.get("num_turns"))
.and_then(|(kind, num_turns)| (kind == "thread_rolled_back").then_some(num_turns))
.and_then(Value::as_u64)
.and_then(|num_turns| u32::try_from(num_turns).ok())?;
Some((thread_id, LegacyThreadNotification::Rollback { num_turns }))
}
_ => None,
}
}
#[cfg(test)]
fn server_notification_thread_events(
notification: ServerNotification,
@@ -1153,113 +972,6 @@ fn split_command_string(command: &str) -> Vec<String> {
}
}
#[cfg(test)]
mod refresh_tests {
use super::*;
use base64::Engine;
use chrono::Utc;
use codex_app_server_protocol::AuthMode;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::auth::AuthDotJson;
use codex_core::auth::save_auth;
use codex_core::token_data::TokenData;
use pretty_assertions::assert_eq;
use serde::Serialize;
use serde_json::json;
use tempfile::TempDir;
fn fake_jwt(account_id: &str, plan_type: &str) -> String {
#[derive(Serialize)]
struct Header {
alg: &'static str,
typ: &'static str,
}
let header = Header {
alg: "none",
typ: "JWT",
};
let payload = json!({
"email": "user@example.com",
"https://api.openai.com/auth": {
"chatgpt_account_id": account_id,
"chatgpt_plan_type": plan_type,
},
});
let encode = |bytes: &[u8]| base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(bytes);
let header_b64 = encode(&serde_json::to_vec(&header).expect("serialize header"));
let payload_b64 = encode(&serde_json::to_vec(&payload).expect("serialize payload"));
let signature_b64 = encode(b"sig");
format!("{header_b64}.{payload_b64}.{signature_b64}")
}
fn write_chatgpt_auth(codex_home: &std::path::Path) {
let id_token = fake_jwt("workspace-1", "business");
let access_token = fake_jwt("workspace-1", "business");
save_auth(
codex_home,
&AuthDotJson {
auth_mode: Some(AuthMode::Chatgpt),
openai_api_key: None,
tokens: Some(TokenData {
id_token: codex_core::token_data::parse_chatgpt_jwt_claims(&id_token)
.expect("id token should parse"),
access_token,
refresh_token: "refresh-token".to_string(),
account_id: Some("workspace-1".to_string()),
}),
last_refresh: Some(Utc::now()),
},
AuthCredentialsStoreMode::File,
)
.expect("chatgpt auth should save");
}
#[test]
fn refresh_request_uses_local_chatgpt_auth() {
let codex_home = TempDir::new().expect("tempdir");
write_chatgpt_auth(codex_home.path());
let response = resolve_chatgpt_auth_tokens_refresh_response(
codex_home.path(),
AuthCredentialsStoreMode::File,
Some("workspace-1"),
&ChatgptAuthTokensRefreshParams {
reason: codex_app_server_protocol::ChatgptAuthTokensRefreshReason::Unauthorized,
previous_account_id: Some("workspace-1".to_string()),
},
)
.expect("refresh response should resolve");
assert_eq!(response.chatgpt_account_id, "workspace-1");
assert_eq!(response.chatgpt_plan_type.as_deref(), Some("business"));
assert!(!response.access_token.is_empty());
}
#[test]
fn refresh_request_rejects_account_mismatch() {
let codex_home = TempDir::new().expect("tempdir");
write_chatgpt_auth(codex_home.path());
let err = resolve_chatgpt_auth_tokens_refresh_response(
codex_home.path(),
AuthCredentialsStoreMode::File,
Some("workspace-1"),
&ChatgptAuthTokensRefreshParams {
reason: codex_app_server_protocol::ChatgptAuthTokensRefreshReason::Unauthorized,
previous_account_id: Some("workspace-2".to_string()),
},
)
.expect_err("mismatched account should fail");
assert_eq!(
err,
"local ChatGPT auth refresh account mismatch: expected `workspace-2`, got `workspace-1`"
);
}
}
#[cfg(test)]
fn app_server_web_search_action_to_core(
action: codex_app_server_protocol::WebSearchAction,
@@ -1289,9 +1001,7 @@ fn app_server_codex_error_info_to_core(
#[cfg(test)]
mod tests {
use super::LegacyThreadNotification;
use super::command_execution_started_event;
use super::legacy_thread_notification;
use super::server_notification_thread_events;
use super::thread_snapshot_events;
use super::turn_snapshot_events;
@@ -1303,7 +1013,6 @@ mod tests {
use codex_app_server_protocol::CommandExecutionStatus;
use codex_app_server_protocol::ItemCompletedNotification;
use codex_app_server_protocol::ItemStartedNotification;
use codex_app_server_protocol::JSONRPCNotification;
use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::Thread;
@@ -1324,57 +1033,8 @@ mod tests {
use codex_protocol::protocol::TurnAbortReason;
use codex_protocol::protocol::TurnAbortedEvent;
use pretty_assertions::assert_eq;
use serde_json::json;
use std::path::PathBuf;
#[test]
fn legacy_warning_notification_extracts_thread_id_and_message() {
let thread_id = ThreadId::new();
let warning = legacy_thread_notification(JSONRPCNotification {
method: "codex/event/warning".to_string(),
params: Some(json!({
"conversationId": thread_id.to_string(),
"id": "event-1",
"msg": {
"type": "warning",
"message": "legacy warning message",
},
})),
});
assert_eq!(
warning,
Some((
thread_id,
LegacyThreadNotification::Warning("legacy warning message".to_string())
))
);
}
#[test]
fn legacy_thread_rollback_notification_extracts_thread_id_and_turn_count() {
let thread_id = ThreadId::new();
let rollback = legacy_thread_notification(JSONRPCNotification {
method: "codex/event/thread_rolled_back".to_string(),
params: Some(json!({
"conversationId": thread_id.to_string(),
"id": "event-1",
"msg": {
"type": "thread_rolled_back",
"num_turns": 2,
},
})),
});
assert_eq!(
rollback,
Some((
thread_id,
LegacyThreadNotification::Rollback { num_turns: 2 }
))
);
}
#[test]
fn bridges_completed_agent_messages_from_server_notifications() {
let thread_id = "019cee8c-b993-7e33-88c0-014d4e62612d".to_string();

View File

@@ -9548,10 +9548,6 @@ impl ChatWidget {
self.request_redraw();
}
pub(crate) fn add_warning_message(&mut self, message: String) {
self.on_warning(message);
}
fn add_app_server_stub_message(&mut self, feature: &str) {
warn!(feature, "stubbed unsupported app-server TUI feature");
self.add_error_message(format!("{feature}: {APP_SERVER_TUI_STUB_MESSAGE}"));

View File

@@ -1,7 +1,6 @@
use std::path::Path;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::auth::load_auth_dot_json;
@@ -12,16 +11,6 @@ pub(crate) struct LocalChatgptAuth {
pub(crate) chatgpt_plan_type: Option<String>,
}
impl LocalChatgptAuth {
pub(crate) fn to_refresh_response(&self) -> ChatgptAuthTokensRefreshResponse {
ChatgptAuthTokensRefreshResponse {
access_token: self.access_token.clone(),
chatgpt_account_id: self.chatgpt_account_id.clone(),
chatgpt_plan_type: self.chatgpt_plan_type.clone(),
}
}
}
pub(crate) fn load_local_chatgpt_auth(
codex_home: &Path,
auth_credentials_store_mode: AuthCredentialsStoreMode,

View File

@@ -504,7 +504,6 @@ pub(crate) async fn run_onboarding_app(
return Err(color_eyre::eyre::eyre!(message));
}
AppServerEvent::Lagged { .. }
| AppServerEvent::LegacyNotification(_)
| AppServerEvent::ServerRequest(_) => {}
}
}