[codex] Refactor app-server dispatch result flow (#20897)

## Why

App-server request handling had response sending spread across many
individual handlers, which made it harder to see which requests return
payloads, which methods send their own delayed response, and which
branches emit notifications after a response.

## What changed

- Centralized normal `ClientResponsePayload` sending in the dispatch
path.
- Kept explicit-response methods explicit where they need custom
ordering or delayed delivery.
- Removed forward-only handler wrappers and immediate `async { ...
}.await` bodies where they were not needed.
- Moved branch-specific post-response notifications into the branches
that own the response ordering.
- Replaced unreachable delegated request-family error arms with explicit
`unreachable!` cases.

## Verification

- `cargo check -p codex-app-server`
- `cargo test -p codex-app-server thread_goal`
- `just fix -p codex-app-server`
This commit is contained in:
pakrym-oai
2026-05-03 18:57:46 -07:00
committed by GitHub
parent 67849d950d
commit 9ddfda9db7
4 changed files with 1751 additions and 2516 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -8,15 +8,6 @@ use codex_core_plugins::remote::validate_remote_plugin_id;
impl CodexMessageProcessor {
pub(super) async fn plugin_list(
&self,
request_id: ConnectionRequestId,
params: PluginListParams,
) {
let result = self.plugin_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_list_response(
&self,
params: PluginListParams,
) -> Result<PluginListResponse, JSONRPCErrorError> {
@@ -174,15 +165,6 @@ impl CodexMessageProcessor {
}
pub(super) async fn plugin_read(
&self,
request_id: ConnectionRequestId,
params: PluginReadParams,
) {
let result = self.plugin_read_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_read_response(
&self,
params: PluginReadParams,
) -> Result<PluginReadResponse, JSONRPCErrorError> {
@@ -303,15 +285,6 @@ impl CodexMessageProcessor {
}
pub(super) async fn plugin_skill_read(
&self,
request_id: ConnectionRequestId,
params: PluginSkillReadParams,
) {
let result = self.plugin_skill_read_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_skill_read_response(
&self,
params: PluginSkillReadParams,
) -> Result<PluginSkillReadResponse, JSONRPCErrorError> {
@@ -358,15 +331,6 @@ impl CodexMessageProcessor {
}
pub(super) async fn plugin_share_save(
&self,
request_id: ConnectionRequestId,
params: PluginShareSaveParams,
) {
let result = self.plugin_share_save_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_share_save_response(
&self,
params: PluginShareSaveParams,
) -> Result<PluginShareSaveResponse, JSONRPCErrorError> {
@@ -403,15 +367,7 @@ impl CodexMessageProcessor {
pub(super) async fn plugin_share_list(
&self,
request_id: ConnectionRequestId,
_params: PluginShareListParams,
) {
let result = self.plugin_share_list_response().await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_share_list_response(
&self,
) -> Result<PluginShareListResponse, JSONRPCErrorError> {
let (config, auth) = self.load_plugin_share_config_and_auth().await?;
let remote_plugin_service_config = RemotePluginServiceConfig {
@@ -443,15 +399,6 @@ impl CodexMessageProcessor {
}
pub(super) async fn plugin_share_delete(
&self,
request_id: ConnectionRequestId,
params: PluginShareDeleteParams,
) {
let result = self.plugin_share_delete_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_share_delete_response(
&self,
params: PluginShareDeleteParams,
) -> Result<PluginShareDeleteResponse, JSONRPCErrorError> {
@@ -490,15 +437,6 @@ impl CodexMessageProcessor {
}
pub(super) async fn plugin_install(
&self,
request_id: ConnectionRequestId,
params: PluginInstallParams,
) {
let result = self.plugin_install_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_install_response(
&self,
params: PluginInstallParams,
) -> Result<PluginInstallResponse, JSONRPCErrorError> {
@@ -760,15 +698,6 @@ impl CodexMessageProcessor {
}
pub(super) async fn plugin_uninstall(
&self,
request_id: ConnectionRequestId,
params: PluginUninstallParams,
) {
let result = self.plugin_uninstall_response(params).await;
self.outgoing.send_result(request_id, result).await;
}
async fn plugin_uninstall_response(
&self,
params: PluginUninstallParams,
) -> Result<PluginUninstallResponse, JSONRPCErrorError> {
@@ -975,28 +904,16 @@ fn remote_plugin_catalog_error_to_jsonrpc(
err: RemotePluginCatalogError,
context: &str,
) -> JSONRPCErrorError {
match err {
let code = match &err {
RemotePluginCatalogError::AuthRequired | RemotePluginCatalogError::UnsupportedAuthMode => {
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("{context}: {err}"),
data: None,
}
INVALID_REQUEST_ERROR_CODE
}
RemotePluginCatalogError::UnexpectedStatus { status, .. } if status.as_u16() == 404 => {
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("{context}: {err}"),
data: None,
}
INVALID_REQUEST_ERROR_CODE
}
RemotePluginCatalogError::InvalidPluginPath { .. }
| RemotePluginCatalogError::ArchiveTooLarge { .. }
| RemotePluginCatalogError::UnknownMarketplace { .. } => JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("{context}: {err}"),
data: None,
},
| RemotePluginCatalogError::UnknownMarketplace { .. } => INVALID_REQUEST_ERROR_CODE,
RemotePluginCatalogError::AuthToken(_)
| RemotePluginCatalogError::Request { .. }
| RemotePluginCatalogError::UnexpectedStatus { .. }
@@ -1010,11 +927,12 @@ fn remote_plugin_catalog_error_to_jsonrpc(
| RemotePluginCatalogError::ArchiveJoin(_)
| RemotePluginCatalogError::MissingUploadEtag
| RemotePluginCatalogError::UnexpectedResponse(_)
| RemotePluginCatalogError::CacheRemove(_) => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("{context}: {err}"),
data: None,
},
| RemotePluginCatalogError::CacheRemove(_) => INTERNAL_ERROR_CODE,
};
JSONRPCErrorError {
code,
message: format!("{context}: {err}"),
data: None,
}
}

View File

@@ -6,63 +6,26 @@ impl CodexMessageProcessor {
&self,
request_id: ConnectionRequestId,
params: ThreadGoalSetParams,
) {
) -> Result<(), JSONRPCErrorError> {
if !self.config.features.enabled(Feature::Goals) {
self.send_invalid_request_error(request_id, "goals feature is disabled".to_string())
.await;
return;
return Err(invalid_request("goals feature is disabled"));
}
let thread_id = match parse_thread_id_for_request(params.thread_id.as_str()) {
Ok(thread_id) => thread_id,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let state_db = match self.state_db_for_materialized_thread(thread_id).await {
Ok(state_db) => state_db,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?;
let state_db = self.state_db_for_materialized_thread(thread_id).await?;
let running_thread = self.thread_manager.get_thread(thread_id).await.ok();
let rollout_path = match running_thread.as_ref() {
Some(thread) => match thread.rollout_path() {
Some(path) => path,
None => {
self.send_invalid_request_error(
request_id,
format!("ephemeral thread does not support goals: {thread_id}"),
)
.await;
return;
}
},
None => {
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string())
.await
{
Ok(Some(path)) => path,
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("thread not found: {thread_id}"),
)
.await;
return;
}
Err(err) => {
self.send_internal_error(
request_id,
format!("failed to locate thread id {thread_id}: {err}"),
)
.await;
return;
}
}
}
Some(thread) => thread.rollout_path().ok_or_else(|| {
invalid_request(format!(
"ephemeral thread does not support goals: {thread_id}"
))
})?,
None => find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string())
.await
.map_err(|err| {
internal_error(format!("failed to locate thread id {thread_id}: {err}"))
})?
.ok_or_else(|| invalid_request(format!("thread not found: {thread_id}")))?,
};
reconcile_rollout(
Some(&state_db),
@@ -84,61 +47,51 @@ impl CodexMessageProcessor {
let objective = params.objective.as_deref().map(str::trim);
if let Some(objective) = objective {
if let Err(message) = validate_thread_goal_objective(objective) {
self.send_invalid_request_error(request_id, message).await;
return;
}
if let Err(message) = validate_goal_budget(params.token_budget.flatten()) {
self.send_invalid_request_error(request_id, message).await;
return;
}
} else if let Some(token_budget) = params.token_budget
&& let Err(message) = validate_goal_budget(token_budget)
{
self.send_invalid_request_error(request_id, message).await;
return;
validate_thread_goal_objective(objective).map_err(invalid_request)?;
}
if objective.is_some() || params.token_budget.is_some() {
validate_goal_budget(params.token_budget.flatten()).map_err(invalid_request)?;
}
if let Some(thread) = running_thread.as_ref() {
thread.prepare_external_goal_mutation().await;
}
let goal = if let Some(objective) = objective {
match state_db.get_thread_goal(thread_id).await {
Ok(goal) => {
if let Some(goal) = goal.as_ref().filter(|goal| {
goal.objective == objective
&& goal.status != codex_state::ThreadGoalStatus::Complete
}) {
state_db
.update_thread_goal(
thread_id,
codex_state::ThreadGoalUpdate {
status,
token_budget: params.token_budget,
expected_goal_id: Some(goal.goal_id.clone()),
},
let goal = (if let Some(objective) = objective {
let existing_goal = state_db
.get_thread_goal(thread_id)
.await
.map_err(|err| invalid_request(err.to_string()))?;
if let Some(goal) = existing_goal.as_ref().filter(|goal| {
goal.objective == objective
&& goal.status != codex_state::ThreadGoalStatus::Complete
}) {
state_db
.update_thread_goal(
thread_id,
codex_state::ThreadGoalUpdate {
status,
token_budget: params.token_budget,
expected_goal_id: Some(goal.goal_id.clone()),
},
)
.await
.and_then(|goal| {
goal.ok_or_else(|| {
anyhow::anyhow!(
"cannot update goal for thread {thread_id}: no goal exists"
)
.await
.and_then(|goal| {
goal.ok_or_else(|| {
anyhow::anyhow!(
"cannot update goal for thread {thread_id}: no goal exists"
)
})
})
} else {
state_db
.replace_thread_goal(
thread_id,
objective,
status.unwrap_or(codex_state::ThreadGoalStatus::Active),
params.token_budget.flatten(),
)
.await
}
}
Err(err) => Err(err),
})
})
} else {
state_db
.replace_thread_goal(
thread_id,
objective,
status.unwrap_or(codex_state::ThreadGoalStatus::Active),
params.token_budget.flatten(),
)
.await
}
} else {
state_db
@@ -156,16 +109,8 @@ impl CodexMessageProcessor {
anyhow::anyhow!("cannot update goal for thread {thread_id}: no goal exists")
})
})
};
let goal = match goal {
Ok(goal) => goal,
Err(err) => {
self.send_invalid_request_error(request_id, err.to_string())
.await;
return;
}
};
})
.map_err(|err| invalid_request(err.to_string()))?;
let goal_status = goal.status;
let goal = api_thread_goal_from_state(goal);
self.outgoing
@@ -179,107 +124,51 @@ impl CodexMessageProcessor {
if let Some(thread) = running_thread.as_ref() {
thread.apply_external_goal_set(goal_status).await;
}
Ok(())
}
pub(super) async fn thread_goal_get(
&self,
request_id: ConnectionRequestId,
params: ThreadGoalGetParams,
) {
) -> Result<ThreadGoalGetResponse, JSONRPCErrorError> {
if !self.config.features.enabled(Feature::Goals) {
self.send_invalid_request_error(request_id, "goals feature is disabled".to_string())
.await;
return;
return Err(invalid_request("goals feature is disabled"));
}
let thread_id = match parse_thread_id_for_request(params.thread_id.as_str()) {
Ok(thread_id) => thread_id,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let state_db = match self.state_db_for_materialized_thread(thread_id).await {
Ok(state_db) => state_db,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let goal = match state_db.get_thread_goal(thread_id).await {
Ok(goal) => goal.map(api_thread_goal_from_state),
Err(err) => {
self.send_internal_error(request_id, format!("failed to read thread goal: {err}"))
.await;
return;
}
};
self.outgoing
.send_response(request_id, ThreadGoalGetResponse { goal })
.await;
let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?;
let state_db = self.state_db_for_materialized_thread(thread_id).await?;
let goal = state_db
.get_thread_goal(thread_id)
.await
.map_err(|err| internal_error(format!("failed to read thread goal: {err}")))?
.map(api_thread_goal_from_state);
Ok(ThreadGoalGetResponse { goal })
}
pub(super) async fn thread_goal_clear(
&self,
request_id: ConnectionRequestId,
params: ThreadGoalClearParams,
) {
) -> Result<(), JSONRPCErrorError> {
if !self.config.features.enabled(Feature::Goals) {
self.send_invalid_request_error(request_id, "goals feature is disabled".to_string())
.await;
return;
return Err(invalid_request("goals feature is disabled"));
}
let thread_id = match parse_thread_id_for_request(params.thread_id.as_str()) {
Ok(thread_id) => thread_id,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let state_db = match self.state_db_for_materialized_thread(thread_id).await {
Ok(state_db) => state_db,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let thread_id = parse_thread_id_for_request(params.thread_id.as_str())?;
let state_db = self.state_db_for_materialized_thread(thread_id).await?;
let running_thread = self.thread_manager.get_thread(thread_id).await.ok();
let rollout_path = match running_thread.as_ref() {
Some(thread) => match thread.rollout_path() {
Some(path) => path,
None => {
self.send_invalid_request_error(
request_id,
format!("ephemeral thread does not support goals: {thread_id}"),
)
.await;
return;
}
},
None => {
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string())
.await
{
Ok(Some(path)) => path,
Ok(None) => {
self.send_invalid_request_error(
request_id,
format!("thread not found: {thread_id}"),
)
.await;
return;
}
Err(err) => {
self.send_internal_error(
request_id,
format!("failed to locate thread id {thread_id}: {err}"),
)
.await;
return;
}
}
}
Some(thread) => thread.rollout_path().ok_or_else(|| {
invalid_request(format!(
"ephemeral thread does not support goals: {thread_id}"
))
})?,
None => find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string())
.await
.map_err(|err| {
internal_error(format!("failed to locate thread id {thread_id}: {err}"))
})?
.ok_or_else(|| invalid_request(format!("thread not found: {thread_id}")))?,
};
reconcile_rollout(
Some(&state_db),
@@ -301,14 +190,10 @@ impl CodexMessageProcessor {
let thread_state = thread_state.lock().await;
thread_state.listener_command_tx()
};
let cleared = match state_db.delete_thread_goal(thread_id).await {
Ok(cleared) => cleared,
Err(err) => {
self.send_internal_error(request_id, format!("failed to clear thread goal: {err}"))
.await;
return;
}
};
let cleared = state_db
.delete_thread_goal(thread_id)
.await
.map_err(|err| internal_error(format!("failed to clear thread goal: {err}")))?;
if cleared && let Some(thread) = running_thread.as_ref() {
thread.apply_external_goal_clear().await;
@@ -321,6 +206,7 @@ impl CodexMessageProcessor {
self.emit_thread_goal_cleared_ordered(thread_id, listener_command_tx)
.await;
}
Ok(())
}
async fn state_db_for_materialized_thread(
@@ -337,18 +223,12 @@ impl CodexMessageProcessor {
return Ok(state_db);
}
} else {
match find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string()).await
{
Ok(Some(_)) => {}
Ok(None) => {
return Err(invalid_request(format!("thread not found: {thread_id}")));
}
Err(err) => {
return Err(internal_error(format!(
"failed to locate thread id {thread_id}: {err}"
)));
}
}
find_thread_path_by_id_str(&self.config.codex_home, &thread_id.to_string())
.await
.map_err(|err| {
internal_error(format!("failed to locate thread id {thread_id}: {err}"))
})?
.ok_or_else(|| invalid_request(format!("thread not found: {thread_id}")))?;
}
open_state_db_for_direct_thread_lookup(&self.config)

View File

@@ -38,14 +38,8 @@ use codex_app_server_protocol::ClientInfo;
use codex_app_server_protocol::ClientNotification;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::ConfigBatchWriteParams;
use codex_app_server_protocol::ConfigValueWriteParams;
use codex_app_server_protocol::ConfigWarningNotification;
use codex_app_server_protocol::DeviceKeyCreateParams;
use codex_app_server_protocol::DeviceKeyPublicParams;
use codex_app_server_protocol::DeviceKeySignParams;
use codex_app_server_protocol::ExperimentalApi;
use codex_app_server_protocol::ExperimentalFeatureEnablementSetParams;
use codex_app_server_protocol::ExternalAgentConfigImportCompletedNotification;
use codex_app_server_protocol::ExternalAgentConfigImportParams;
use codex_app_server_protocol::ExternalAgentConfigImportResponse;
@@ -394,25 +388,29 @@ impl MessageProcessor {
Arc::clone(&self.outgoing),
request_context.clone(),
async {
let result = async {
let request_json = serde_json::to_value(&request)
.map_err(|err| invalid_request(format!("Invalid request: {err}")))?;
let codex_request = serde_json::from_value::<ClientRequest>(request_json)
.map_err(|err| invalid_request(format!("Invalid request: {err}")))?;
// Websocket callers finalize outbound readiness in lib.rs after mirroring
// session state into outbound state and sending initialize notifications to
// this specific connection. Passing `None` avoids marking the connection
// ready too early from inside the shared request handler.
self.handle_client_request(
request_id.clone(),
codex_request,
Arc::clone(&session),
/*outbound_initialized*/ None,
request_context.clone(),
)
.await
}
.await;
let codex_request = serde_json::to_value(&request)
.map_err(|err| invalid_request(format!("Invalid request: {err}")))
.and_then(|request_json| {
serde_json::from_value::<ClientRequest>(request_json)
.map_err(|err| invalid_request(format!("Invalid request: {err}")))
});
let result = match codex_request {
Ok(codex_request) => {
// Websocket callers finalize outbound readiness in lib.rs after mirroring
// session state into outbound state and sending initialize notifications to
// this specific connection. Passing `None` avoids marking the connection
// ready too early from inside the shared request handler.
self.handle_client_request(
request_id.clone(),
codex_request,
Arc::clone(&session),
/*outbound_initialized*/ None,
request_context.clone(),
)
.await
}
Err(error) => Err(error),
};
if let Err(error) = result {
self.outgoing.send_error(request_id.clone(), error).await;
}
@@ -792,161 +790,141 @@ impl MessageProcessor {
device_key_requests_allowed: bool,
) -> Result<(), JSONRPCErrorError> {
let connection_id = connection_request_id.connection_id;
let request_id_for_connection = |request_id| ConnectionRequestId {
let request_id = ConnectionRequestId {
connection_id,
request_id,
request_id: codex_request.id().clone(),
};
match codex_request {
ClientRequest::ConfigRead { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.config_api.read(params).await,
let result: Result<Option<ClientResponsePayload>, JSONRPCErrorError> = match codex_request {
ClientRequest::ConfigRead { params, .. } => self
.config_api
.read(params)
.await
.map(|response| Some(response.into())),
ClientRequest::ExternalAgentConfigDetect { params, .. } => self
.external_agent_config_api
.detect(params)
.await
.map(|response| Some(response.into())),
ClientRequest::ExternalAgentConfigImport { params, .. } => self
.handle_external_agent_config_import(request_id.clone(), params)
.await
.map(|()| None),
ClientRequest::ConfigValueWrite { params, .. } => self
.handle_config_mutation_result(self.config_api.write_value(params).await)
.await
.map(|response| Some(ClientResponsePayload::ConfigValueWrite(response))),
ClientRequest::ConfigBatchWrite { params, .. } => self
.handle_config_mutation_result(self.config_api.batch_write(params).await)
.await
.map(|response| Some(ClientResponsePayload::ConfigBatchWrite(response))),
ClientRequest::ExperimentalFeatureEnablementSet { params, .. } => {
let should_refresh_apps_list = params.enablement.get("apps").copied() == Some(true);
match self
.handle_config_mutation_result(
self.config_api
.set_experimental_feature_enablement(params)
.await,
)
.await;
.await
{
Ok(response) => {
self.outgoing
.send_response_as(
request_id.clone(),
ClientResponsePayload::ExperimentalFeatureEnablementSet(response),
)
.await;
if should_refresh_apps_list {
self.refresh_apps_list_after_experimental_feature_enablement_set()
.await;
}
Ok(None)
}
Err(error) => Err(error),
}
}
ClientRequest::ExternalAgentConfigDetect { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.external_agent_config_api.detect(params).await,
)
.await;
}
ClientRequest::ExternalAgentConfigImport { request_id, params } => {
self.handle_external_agent_config_import(
request_id_for_connection(request_id),
params,
)
.await?;
}
ClientRequest::ConfigValueWrite { request_id, params } => {
self.handle_config_value_write(request_id_for_connection(request_id), params)
.await;
}
ClientRequest::ConfigBatchWrite { request_id, params } => {
self.handle_config_batch_write(request_id_for_connection(request_id), params)
.await;
}
ClientRequest::ExperimentalFeatureEnablementSet { request_id, params } => {
self.handle_experimental_feature_enablement_set(
request_id_for_connection(request_id),
params,
)
.await;
}
ClientRequest::ConfigRequirementsRead {
request_id,
params: _,
} => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.config_api.config_requirements_read().await,
)
.await;
}
ClientRequest::DeviceKeyCreate { request_id, params } => {
self.handle_device_key_create(
request_id_for_connection(request_id),
params,
ClientRequest::ConfigRequirementsRead { params: _, .. } => self
.config_api
.config_requirements_read()
.await
.map(|response| Some(response.into())),
ClientRequest::DeviceKeyCreate { params, .. } => {
self.spawn_device_key_request(
request_id.clone(),
"device/key/create",
device_key_requests_allowed,
move |device_key_api| async move { device_key_api.create(params).await },
);
Ok(None)
}
ClientRequest::DeviceKeyPublic { request_id, params } => {
self.handle_device_key_public(
request_id_for_connection(request_id),
params,
ClientRequest::DeviceKeyPublic { params, .. } => {
self.spawn_device_key_request(
request_id.clone(),
"device/key/public",
device_key_requests_allowed,
move |device_key_api| async move { device_key_api.public(params).await },
);
Ok(None)
}
ClientRequest::DeviceKeySign { request_id, params } => {
self.handle_device_key_sign(
request_id_for_connection(request_id),
params,
ClientRequest::DeviceKeySign { params, .. } => {
self.spawn_device_key_request(
request_id.clone(),
"device/key/sign",
device_key_requests_allowed,
move |device_key_api| async move { device_key_api.sign(params).await },
);
Ok(None)
}
ClientRequest::FsReadFile { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.read_file(params).await,
)
.await;
}
ClientRequest::FsWriteFile { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.write_file(params).await,
)
.await;
}
ClientRequest::FsCreateDirectory { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.create_directory(params).await,
)
.await;
}
ClientRequest::FsGetMetadata { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.get_metadata(params).await,
)
.await;
}
ClientRequest::FsReadDirectory { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.read_directory(params).await,
)
.await;
}
ClientRequest::FsRemove { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.remove(params).await,
)
.await;
}
ClientRequest::FsCopy { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_api.copy(params).await,
)
.await;
}
ClientRequest::FsWatch { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_watch_manager.watch(connection_id, params).await,
)
.await;
}
ClientRequest::FsUnwatch { request_id, params } => {
self.outgoing
.send_result(
request_id_for_connection(request_id),
self.fs_watch_manager.unwatch(connection_id, params).await,
)
.await;
}
ClientRequest::ModelProviderCapabilitiesRead {
request_id,
params: _,
} => {
self.handle_model_provider_capabilities_read(request_id_for_connection(request_id))
.await;
}
ClientRequest::FsReadFile { params, .. } => self
.fs_api
.read_file(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsWriteFile { params, .. } => self
.fs_api
.write_file(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsCreateDirectory { params, .. } => self
.fs_api
.create_directory(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsGetMetadata { params, .. } => self
.fs_api
.get_metadata(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsReadDirectory { params, .. } => self
.fs_api
.read_directory(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsRemove { params, .. } => self
.fs_api
.remove(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsCopy { params, .. } => self
.fs_api
.copy(params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsWatch { params, .. } => self
.fs_watch_manager
.watch(connection_id, params)
.await
.map(|response| Some(response.into())),
ClientRequest::FsUnwatch { params, .. } => self
.fs_watch_manager
.unwatch(connection_id, params)
.await
.map(|response| Some(response.into())),
ClientRequest::ModelProviderCapabilitiesRead { params: _, .. } => self
.handle_model_provider_capabilities_read()
.await
.map(|response| Some(response.into())),
other => {
// Box the delegated future so this wrapper's async state machine does not
// inline the full `CodexMessageProcessor::process_request` future, which
@@ -961,78 +939,38 @@ impl MessageProcessor {
)
.boxed()
.await;
Ok(None)
}
};
match result {
Ok(Some(response)) => {
self.outgoing
.send_response_as(request_id.clone(), response)
.await;
}
Ok(None) => {}
Err(error) => {
self.outgoing.send_error(request_id.clone(), error).await;
}
}
Ok(())
}
async fn handle_model_provider_capabilities_read(&self, request_id: ConnectionRequestId) {
let result = async {
let config = self
.config_api
.load_latest_config(/*fallback_cwd*/ None)
.await?;
let provider = create_model_provider(config.model_provider, /*auth_manager*/ None);
let capabilities = provider.capabilities();
Ok::<_, JSONRPCErrorError>(ModelProviderCapabilitiesReadResponse {
namespace_tools: capabilities.namespace_tools,
image_generation: capabilities.image_generation,
web_search: capabilities.web_search,
})
}
.await;
self.outgoing.send_result(request_id, result).await;
}
async fn handle_config_value_write(
async fn handle_model_provider_capabilities_read(
&self,
request_id: ConnectionRequestId,
params: ConfigValueWriteParams,
) {
let result = self.config_api.write_value(params).await;
self.handle_config_mutation_result(
request_id,
result,
ClientResponsePayload::ConfigValueWrite,
)
.await
}
async fn handle_config_batch_write(
&self,
request_id: ConnectionRequestId,
params: ConfigBatchWriteParams,
) {
let result = self.config_api.batch_write(params).await;
self.handle_config_mutation_result(
request_id,
result,
ClientResponsePayload::ConfigBatchWrite,
)
.await;
}
async fn handle_experimental_feature_enablement_set(
&self,
request_id: ConnectionRequestId,
params: ExperimentalFeatureEnablementSetParams,
) {
let should_refresh_apps_list = params.enablement.get("apps").copied() == Some(true);
let result = self
) -> Result<ModelProviderCapabilitiesReadResponse, JSONRPCErrorError> {
let config = self
.config_api
.set_experimental_feature_enablement(params)
.await;
let is_ok = result.is_ok();
self.handle_config_mutation_result(
request_id,
result,
ClientResponsePayload::ExperimentalFeatureEnablementSet,
)
.await;
if should_refresh_apps_list && is_ok {
self.refresh_apps_list_after_experimental_feature_enablement_set()
.await;
}
.load_latest_config(/*fallback_cwd*/ None)
.await?;
let provider = create_model_provider(config.model_provider, /*auth_manager*/ None);
let capabilities = provider.capabilities();
Ok(ModelProviderCapabilitiesReadResponse {
namespace_tools: capabilities.namespace_tools,
image_generation: capabilities.image_generation,
web_search: capabilities.web_search,
})
}
async fn refresh_apps_list_after_experimental_feature_enablement_set(&self) {
@@ -1106,19 +1044,11 @@ impl MessageProcessor {
async fn handle_config_mutation_result<T>(
&self,
request_id: ConnectionRequestId,
result: std::result::Result<T, JSONRPCErrorError>,
wrap_success: impl FnOnce(T) -> ClientResponsePayload,
) {
match result {
Ok(response) => {
self.handle_config_mutation().await;
self.outgoing
.send_response_as(request_id, wrap_success(response))
.await;
}
Err(error) => self.outgoing.send_error(request_id, error).await,
}
) -> Result<T, JSONRPCErrorError> {
let response = result?;
self.handle_config_mutation().await;
Ok(response)
}
async fn handle_config_mutation(&self) {
@@ -1144,48 +1074,6 @@ impl MessageProcessor {
}
}
fn handle_device_key_create(
&self,
request_id: ConnectionRequestId,
params: DeviceKeyCreateParams,
device_key_requests_allowed: bool,
) {
self.spawn_device_key_request(
request_id,
"device/key/create",
device_key_requests_allowed,
move |device_key_api| async move { device_key_api.create(params).await },
);
}
fn handle_device_key_public(
&self,
request_id: ConnectionRequestId,
params: DeviceKeyPublicParams,
device_key_requests_allowed: bool,
) {
self.spawn_device_key_request(
request_id,
"device/key/public",
device_key_requests_allowed,
move |device_key_api| async move { device_key_api.public(params).await },
);
}
fn handle_device_key_sign(
&self,
request_id: ConnectionRequestId,
params: DeviceKeySignParams,
device_key_requests_allowed: bool,
) {
self.spawn_device_key_request(
request_id,
"device/key/sign",
device_key_requests_allowed,
move |device_key_api| async move { device_key_api.sign(params).await },
);
}
fn spawn_device_key_request<R, F, Fut>(
&self,
request_id: ConnectionRequestId,
@@ -1200,15 +1088,13 @@ impl MessageProcessor {
let device_key_api = self.device_key_api.clone();
let outgoing = Arc::clone(&self.outgoing);
tokio::spawn(async move {
let result = async {
if !device_key_requests_allowed {
return Err(invalid_request(format!(
"{method} is not available over remote transports"
)));
}
let result = if !device_key_requests_allowed {
Err(invalid_request(format!(
"{method} is not available over remote transports"
)))
} else {
run_request(device_key_api).await
}
.await;
};
outgoing.send_result(request_id, result).await;
});
}