app-server: run initialized rpcs with keyed serialization (#17373)

## Why

Initialized app-server RPCs no longer need to bottleneck behind one
request processor path. Running them concurrently improves
responsiveness, but several request families still mutate shared state
or depend on ordered side effects. Those stateful families need an
auditable serialization contract so concurrency does not reorder thread,
config, auth, command, watcher, MCP, or similar state transitions.

This PR keeps that boundary explicit: stateful work is serialized by the
smallest useful key, while intentionally read-only or externally
concurrent work remains unkeyed. In particular, `thread/list` and
`thread/turns/list` explicitly have no serialization because they
primarily read append-only rollout storage and should continue to be
served concurrently.

## What changed

- Adds `ClientRequest::serialization_scope()` in `app-server-protocol`
and requires every client request definition to declare its
serialization behavior.
- Introduces keyed request scopes for thread, thread path, command exec
process, fuzzy search session, fs watch, MCP OAuth, and global state
buckets such as config, account auth, memory, and device keys.
- Routes initialized app-server RPCs through per-key FIFO serialization
while allowing unkeyed initialized requests to run concurrently.
- Cancels in-flight initialized RPC work when the connection disconnects
or the app-server exits so spawned request tasks do not outlive their
session.
- Adds focused coverage for representative keyed and unkeyed
serialization scopes, including explicitly concurrent
`thread/turns/list` behavior.

## Validation

- Added protocol tests for representative keyed serialization scopes and
intentionally unkeyed request families.
- Added app-server request serialization tests covering per-key FIFO
behavior, concurrent unkeyed execution, disconnect shutdown, and config
read-after-write ordering.
- Local focused protocol validation after the latest rebase is currently
blocked by packageproxy failing to resolve locked `rustls-webpki
0.103.13`; CI is expected to provide the full validation signal.
This commit is contained in:
Ruslan Nigmatullin
2026-04-28 12:23:34 -07:00
committed by GitHub
parent 7f7c7c2c07
commit 0700f979ba
8 changed files with 1218 additions and 15 deletions

View File

@@ -1,4 +1,5 @@
use std::path::Path;
use std::path::PathBuf;
use crate::JSONRPCNotification;
use crate::JSONRPCRequest;
@@ -73,6 +74,76 @@ macro_rules! experimental_type_entry {
};
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClientRequestSerializationScope {
Global(&'static str),
Thread { thread_id: String },
ThreadPath { path: PathBuf },
CommandExecProcess { process_id: String },
FuzzyFileSearchSession { session_id: String },
FsWatch { watch_id: String },
McpOauth { server_name: String },
}
macro_rules! serialization_scope_expr {
($actual_params:ident, None) => {
None
};
($actual_params:ident, global($key:literal)) => {
Some(ClientRequestSerializationScope::Global($key))
};
($actual_params:ident, thread_id($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::Thread {
thread_id: $actual_params.$field.clone(),
})
};
($actual_params:ident, optional_thread_id($params:ident . $field:ident)) => {
$actual_params
.$field
.clone()
.map(|thread_id| ClientRequestSerializationScope::Thread { thread_id })
};
($actual_params:ident, thread_or_path($params:ident . $thread_field:ident, $params2:ident . $path_field:ident)) => {
if !$actual_params.$thread_field.is_empty() {
Some(ClientRequestSerializationScope::Thread {
thread_id: $actual_params.$thread_field.clone(),
})
} else if let Some(path) = $actual_params.$path_field.clone() {
Some(ClientRequestSerializationScope::ThreadPath { path })
} else {
Some(ClientRequestSerializationScope::Thread {
thread_id: $actual_params.$thread_field.clone(),
})
}
};
($actual_params:ident, optional_command_process_id($params:ident . $field:ident)) => {
$actual_params
.$field
.clone()
.map(|process_id| ClientRequestSerializationScope::CommandExecProcess { process_id })
};
($actual_params:ident, command_process_id($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::CommandExecProcess {
process_id: $actual_params.$field.clone(),
})
};
($actual_params:ident, fuzzy_session_id($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::FuzzyFileSearchSession {
session_id: $actual_params.$field.clone(),
})
};
($actual_params:ident, fs_watch_id($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::FsWatch {
watch_id: $actual_params.$field.clone(),
})
};
($actual_params:ident, mcp_oauth_server($params:ident . $field:ident)) => {
Some(ClientRequestSerializationScope::McpOauth {
server_name: $actual_params.$field.clone(),
})
};
}
/// Generates an `enum ClientRequest` where each variant is a request that the
/// client can send to the server. Each variant has associated `params` and
/// `response` types. Also generates a `export_client_responses()` function to
@@ -85,6 +156,7 @@ macro_rules! client_request_definitions {
$variant:ident $(=> $wire:literal)? {
params: $(#[$params_meta:meta])* $params:ty,
$(inspect_params: $inspect_params:tt,)?
serialization: $serialization:ident $( ( $($serialization_args:tt)* ) )?,
response: $response:ty,
}
),* $(,)?
@@ -123,6 +195,19 @@ macro_rules! client_request_definitions {
})
.unwrap_or_else(|| "<unknown>".to_string())
}
pub fn serialization_scope(&self) -> Option<ClientRequestSerializationScope> {
match self {
$(
Self::$variant { params, .. } => {
let _ = params;
serialization_scope_expr!(
params, $serialization $( ( $($serialization_args)* ) )?
)
}
)*
}
}
}
/// Typed response from the server to the client.
@@ -235,6 +320,7 @@ macro_rules! client_request_definitions {
client_request_definitions! {
Initialize {
params: v1::InitializeParams,
serialization: None,
response: v1::InitializeResponse,
},
@@ -244,24 +330,29 @@ client_request_definitions! {
ThreadStart => "thread/start" {
params: v2::ThreadStartParams,
inspect_params: true,
serialization: None,
response: v2::ThreadStartResponse,
},
ThreadResume => "thread/resume" {
params: v2::ThreadResumeParams,
inspect_params: true,
serialization: thread_or_path(params.thread_id, params.path),
response: v2::ThreadResumeResponse,
},
ThreadFork => "thread/fork" {
params: v2::ThreadForkParams,
inspect_params: true,
serialization: thread_or_path(params.thread_id, params.path),
response: v2::ThreadForkResponse,
},
ThreadArchive => "thread/archive" {
params: v2::ThreadArchiveParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadArchiveResponse,
},
ThreadUnsubscribe => "thread/unsubscribe" {
params: v2::ThreadUnsubscribeParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadUnsubscribeResponse,
},
#[experimental("thread/increment_elicitation")]
@@ -271,6 +362,7 @@ client_request_definitions! {
/// approval or other elicitation is pending outside the app-server request flow.
ThreadIncrementElicitation => "thread/increment_elicitation" {
params: v2::ThreadIncrementElicitationParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadIncrementElicitationResponse,
},
#[experimental("thread/decrement_elicitation")]
@@ -279,302 +371,372 @@ client_request_definitions! {
/// When the count reaches zero, timeout accounting resumes for the thread.
ThreadDecrementElicitation => "thread/decrement_elicitation" {
params: v2::ThreadDecrementElicitationParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadDecrementElicitationResponse,
},
ThreadSetName => "thread/name/set" {
params: v2::ThreadSetNameParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadSetNameResponse,
},
#[experimental("thread/goal/set")]
ThreadGoalSet => "thread/goal/set" {
params: v2::ThreadGoalSetParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadGoalSetResponse,
},
#[experimental("thread/goal/get")]
ThreadGoalGet => "thread/goal/get" {
params: v2::ThreadGoalGetParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadGoalGetResponse,
},
#[experimental("thread/goal/clear")]
ThreadGoalClear => "thread/goal/clear" {
params: v2::ThreadGoalClearParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadGoalClearResponse,
},
ThreadMetadataUpdate => "thread/metadata/update" {
params: v2::ThreadMetadataUpdateParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadMetadataUpdateResponse,
},
#[experimental("thread/memoryMode/set")]
ThreadMemoryModeSet => "thread/memoryMode/set" {
params: v2::ThreadMemoryModeSetParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadMemoryModeSetResponse,
},
#[experimental("memory/reset")]
MemoryReset => "memory/reset" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: global("memory"),
response: v2::MemoryResetResponse,
},
ThreadUnarchive => "thread/unarchive" {
params: v2::ThreadUnarchiveParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadUnarchiveResponse,
},
ThreadCompactStart => "thread/compact/start" {
params: v2::ThreadCompactStartParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadCompactStartResponse,
},
ThreadShellCommand => "thread/shellCommand" {
params: v2::ThreadShellCommandParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadShellCommandResponse,
},
ThreadApproveGuardianDeniedAction => "thread/approveGuardianDeniedAction" {
params: v2::ThreadApproveGuardianDeniedActionParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadApproveGuardianDeniedActionResponse,
},
#[experimental("thread/backgroundTerminals/clean")]
ThreadBackgroundTerminalsClean => "thread/backgroundTerminals/clean" {
params: v2::ThreadBackgroundTerminalsCleanParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadBackgroundTerminalsCleanResponse,
},
ThreadRollback => "thread/rollback" {
params: v2::ThreadRollbackParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRollbackResponse,
},
ThreadList => "thread/list" {
params: v2::ThreadListParams,
serialization: None,
response: v2::ThreadListResponse,
},
ThreadLoadedList => "thread/loaded/list" {
params: v2::ThreadLoadedListParams,
serialization: None,
response: v2::ThreadLoadedListResponse,
},
ThreadRead => "thread/read" {
params: v2::ThreadReadParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadReadResponse,
},
ThreadTurnsList => "thread/turns/list" {
params: v2::ThreadTurnsListParams,
// Explicitly concurrent: this primarily reads append-only rollout storage.
serialization: None,
response: v2::ThreadTurnsListResponse,
},
/// Append raw Responses API items to the thread history without starting a user turn.
ThreadInjectItems => "thread/inject_items" {
params: v2::ThreadInjectItemsParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadInjectItemsResponse,
},
SkillsList => "skills/list" {
params: v2::SkillsListParams,
serialization: global("config"),
response: v2::SkillsListResponse,
},
MarketplaceAdd => "marketplace/add" {
params: v2::MarketplaceAddParams,
serialization: global("config"),
response: v2::MarketplaceAddResponse,
},
MarketplaceRemove => "marketplace/remove" {
params: v2::MarketplaceRemoveParams,
serialization: global("config"),
response: v2::MarketplaceRemoveResponse,
},
MarketplaceUpgrade => "marketplace/upgrade" {
params: v2::MarketplaceUpgradeParams,
serialization: global("config"),
response: v2::MarketplaceUpgradeResponse,
},
PluginList => "plugin/list" {
params: v2::PluginListParams,
serialization: global("config"),
response: v2::PluginListResponse,
},
PluginRead => "plugin/read" {
params: v2::PluginReadParams,
serialization: global("config"),
response: v2::PluginReadResponse,
},
AppsList => "app/list" {
params: v2::AppsListParams,
serialization: None,
response: v2::AppsListResponse,
},
DeviceKeyCreate => "device/key/create" {
params: v2::DeviceKeyCreateParams,
serialization: global("device-key"),
response: v2::DeviceKeyCreateResponse,
},
DeviceKeyPublic => "device/key/public" {
params: v2::DeviceKeyPublicParams,
serialization: global("device-key"),
response: v2::DeviceKeyPublicResponse,
},
DeviceKeySign => "device/key/sign" {
params: v2::DeviceKeySignParams,
serialization: global("device-key"),
response: v2::DeviceKeySignResponse,
},
// File system requests are intentionally concurrent. Desktop already treats local
// file system operations as concurrent, and app-server remote fs mirrors that model.
FsReadFile => "fs/readFile" {
params: v2::FsReadFileParams,
serialization: None,
response: v2::FsReadFileResponse,
},
FsWriteFile => "fs/writeFile" {
params: v2::FsWriteFileParams,
serialization: None,
response: v2::FsWriteFileResponse,
},
FsCreateDirectory => "fs/createDirectory" {
params: v2::FsCreateDirectoryParams,
serialization: None,
response: v2::FsCreateDirectoryResponse,
},
FsGetMetadata => "fs/getMetadata" {
params: v2::FsGetMetadataParams,
serialization: None,
response: v2::FsGetMetadataResponse,
},
FsReadDirectory => "fs/readDirectory" {
params: v2::FsReadDirectoryParams,
serialization: None,
response: v2::FsReadDirectoryResponse,
},
FsRemove => "fs/remove" {
params: v2::FsRemoveParams,
serialization: None,
response: v2::FsRemoveResponse,
},
FsCopy => "fs/copy" {
params: v2::FsCopyParams,
serialization: None,
response: v2::FsCopyResponse,
},
FsWatch => "fs/watch" {
params: v2::FsWatchParams,
serialization: fs_watch_id(params.watch_id),
response: v2::FsWatchResponse,
},
FsUnwatch => "fs/unwatch" {
params: v2::FsUnwatchParams,
serialization: fs_watch_id(params.watch_id),
response: v2::FsUnwatchResponse,
},
SkillsConfigWrite => "skills/config/write" {
params: v2::SkillsConfigWriteParams,
serialization: global("config"),
response: v2::SkillsConfigWriteResponse,
},
PluginInstall => "plugin/install" {
params: v2::PluginInstallParams,
serialization: global("config"),
response: v2::PluginInstallResponse,
},
PluginUninstall => "plugin/uninstall" {
params: v2::PluginUninstallParams,
serialization: global("config"),
response: v2::PluginUninstallResponse,
},
TurnStart => "turn/start" {
params: v2::TurnStartParams,
inspect_params: true,
serialization: thread_id(params.thread_id),
response: v2::TurnStartResponse,
},
TurnSteer => "turn/steer" {
params: v2::TurnSteerParams,
inspect_params: true,
serialization: thread_id(params.thread_id),
response: v2::TurnSteerResponse,
},
TurnInterrupt => "turn/interrupt" {
params: v2::TurnInterruptParams,
serialization: thread_id(params.thread_id),
response: v2::TurnInterruptResponse,
},
#[experimental("thread/realtime/start")]
ThreadRealtimeStart => "thread/realtime/start" {
params: v2::ThreadRealtimeStartParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRealtimeStartResponse,
},
#[experimental("thread/realtime/appendAudio")]
ThreadRealtimeAppendAudio => "thread/realtime/appendAudio" {
params: v2::ThreadRealtimeAppendAudioParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRealtimeAppendAudioResponse,
},
#[experimental("thread/realtime/appendText")]
ThreadRealtimeAppendText => "thread/realtime/appendText" {
params: v2::ThreadRealtimeAppendTextParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRealtimeAppendTextResponse,
},
#[experimental("thread/realtime/stop")]
ThreadRealtimeStop => "thread/realtime/stop" {
params: v2::ThreadRealtimeStopParams,
serialization: thread_id(params.thread_id),
response: v2::ThreadRealtimeStopResponse,
},
#[experimental("thread/realtime/listVoices")]
ThreadRealtimeListVoices => "thread/realtime/listVoices" {
params: v2::ThreadRealtimeListVoicesParams,
serialization: None,
response: v2::ThreadRealtimeListVoicesResponse,
},
ReviewStart => "review/start" {
params: v2::ReviewStartParams,
serialization: thread_id(params.thread_id),
response: v2::ReviewStartResponse,
},
ModelList => "model/list" {
params: v2::ModelListParams,
serialization: None,
response: v2::ModelListResponse,
},
ExperimentalFeatureList => "experimentalFeature/list" {
params: v2::ExperimentalFeatureListParams,
serialization: global("config"),
response: v2::ExperimentalFeatureListResponse,
},
ExperimentalFeatureEnablementSet => "experimentalFeature/enablement/set" {
params: v2::ExperimentalFeatureEnablementSetParams,
serialization: global("config"),
response: v2::ExperimentalFeatureEnablementSetResponse,
},
#[experimental("collaborationMode/list")]
/// Lists collaboration mode presets.
CollaborationModeList => "collaborationMode/list" {
params: v2::CollaborationModeListParams,
serialization: None,
response: v2::CollaborationModeListResponse,
},
#[experimental("mock/experimentalMethod")]
/// Test-only method used to validate experimental gating.
MockExperimentalMethod => "mock/experimentalMethod" {
params: v2::MockExperimentalMethodParams,
serialization: None,
response: v2::MockExperimentalMethodResponse,
},
McpServerOauthLogin => "mcpServer/oauth/login" {
params: v2::McpServerOauthLoginParams,
serialization: mcp_oauth_server(params.name),
response: v2::McpServerOauthLoginResponse,
},
McpServerRefresh => "config/mcpServer/reload" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: global("mcp-registry"),
response: v2::McpServerRefreshResponse,
},
McpServerStatusList => "mcpServerStatus/list" {
params: v2::ListMcpServerStatusParams,
serialization: global("mcp-registry"),
response: v2::ListMcpServerStatusResponse,
},
McpResourceRead => "mcpServer/resource/read" {
params: v2::McpResourceReadParams,
serialization: optional_thread_id(params.thread_id),
response: v2::McpResourceReadResponse,
},
McpServerToolCall => "mcpServer/tool/call" {
params: v2::McpServerToolCallParams,
serialization: thread_id(params.thread_id),
response: v2::McpServerToolCallResponse,
},
WindowsSandboxSetupStart => "windowsSandbox/setupStart" {
params: v2::WindowsSandboxSetupStartParams,
serialization: global("windows-sandbox-setup"),
response: v2::WindowsSandboxSetupStartResponse,
},
LoginAccount => "account/login/start" {
params: v2::LoginAccountParams,
inspect_params: true,
serialization: global("account-auth"),
response: v2::LoginAccountResponse,
},
CancelLoginAccount => "account/login/cancel" {
params: v2::CancelLoginAccountParams,
serialization: global("account-auth"),
response: v2::CancelLoginAccountResponse,
},
LogoutAccount => "account/logout" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: global("account-auth"),
response: v2::LogoutAccountResponse,
},
GetAccountRateLimits => "account/rateLimits/read" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: None,
response: v2::GetAccountRateLimitsResponse,
},
SendAddCreditsNudgeEmail => "account/sendAddCreditsNudgeEmail" {
params: v2::SendAddCreditsNudgeEmailParams,
serialization: global("account-auth"),
response: v2::SendAddCreditsNudgeEmailResponse,
},
FeedbackUpload => "feedback/upload" {
params: v2::FeedbackUploadParams,
serialization: None,
response: v2::FeedbackUploadResponse,
},
@@ -582,86 +744,106 @@ client_request_definitions! {
OneOffCommandExec => "command/exec" {
params: v2::CommandExecParams,
inspect_params: true,
serialization: optional_command_process_id(params.process_id),
response: v2::CommandExecResponse,
},
/// Write stdin bytes to a running `command/exec` session or close stdin.
CommandExecWrite => "command/exec/write" {
params: v2::CommandExecWriteParams,
serialization: command_process_id(params.process_id),
response: v2::CommandExecWriteResponse,
},
/// Terminate a running `command/exec` session by client-supplied `processId`.
CommandExecTerminate => "command/exec/terminate" {
params: v2::CommandExecTerminateParams,
serialization: command_process_id(params.process_id),
response: v2::CommandExecTerminateResponse,
},
/// Resize a running PTY-backed `command/exec` session by client-supplied `processId`.
CommandExecResize => "command/exec/resize" {
params: v2::CommandExecResizeParams,
serialization: command_process_id(params.process_id),
response: v2::CommandExecResizeResponse,
},
ConfigRead => "config/read" {
params: v2::ConfigReadParams,
serialization: global("config"),
response: v2::ConfigReadResponse,
},
ExternalAgentConfigDetect => "externalAgentConfig/detect" {
params: v2::ExternalAgentConfigDetectParams,
serialization: global("config"),
response: v2::ExternalAgentConfigDetectResponse,
},
ExternalAgentConfigImport => "externalAgentConfig/import" {
params: v2::ExternalAgentConfigImportParams,
serialization: global("config"),
response: v2::ExternalAgentConfigImportResponse,
},
ConfigValueWrite => "config/value/write" {
params: v2::ConfigValueWriteParams,
serialization: global("config"),
response: v2::ConfigWriteResponse,
},
ConfigBatchWrite => "config/batchWrite" {
params: v2::ConfigBatchWriteParams,
serialization: global("config"),
response: v2::ConfigWriteResponse,
},
ConfigRequirementsRead => "configRequirements/read" {
params: #[ts(type = "undefined")] #[serde(skip_serializing_if = "Option::is_none")] Option<()>,
serialization: global("config"),
response: v2::ConfigRequirementsReadResponse,
},
GetAccount => "account/read" {
params: v2::GetAccountParams,
serialization: global("account-auth"),
response: v2::GetAccountResponse,
},
/// DEPRECATED APIs below
GetConversationSummary {
params: v1::GetConversationSummaryParams,
serialization: None,
response: v1::GetConversationSummaryResponse,
},
GitDiffToRemote {
params: v1::GitDiffToRemoteParams,
serialization: None,
response: v1::GitDiffToRemoteResponse,
},
/// DEPRECATED in favor of GetAccount
GetAuthStatus {
params: v1::GetAuthStatusParams,
serialization: global("account-auth"),
response: v1::GetAuthStatusResponse,
},
// Legacy fuzzy search cancellation is intentionally concurrent: clients reuse a
// cancellation token so a newer request can cancel an older in-flight search.
FuzzyFileSearch {
params: FuzzyFileSearchParams,
serialization: None,
response: FuzzyFileSearchResponse,
},
#[experimental("fuzzyFileSearch/sessionStart")]
FuzzyFileSearchSessionStart => "fuzzyFileSearch/sessionStart" {
params: FuzzyFileSearchSessionStartParams,
serialization: fuzzy_session_id(params.session_id),
response: FuzzyFileSearchSessionStartResponse,
},
#[experimental("fuzzyFileSearch/sessionUpdate")]
FuzzyFileSearchSessionUpdate => "fuzzyFileSearch/sessionUpdate" {
params: FuzzyFileSearchSessionUpdateParams,
serialization: fuzzy_session_id(params.session_id),
response: FuzzyFileSearchSessionUpdateResponse,
},
#[experimental("fuzzyFileSearch/sessionStop")]
FuzzyFileSearchSessionStop => "fuzzyFileSearch/sessionStop" {
params: FuzzyFileSearchSessionStopParams,
serialization: fuzzy_session_id(params.session_id),
response: FuzzyFileSearchSessionStopResponse,
},
}
@@ -1150,6 +1332,325 @@ mod tests {
test_path_buf(&path).abs()
}
fn request_id() -> RequestId {
const REQUEST_ID: i64 = 1;
RequestId::Integer(REQUEST_ID)
}
#[test]
fn client_request_serialization_scope_covers_keyed_families() {
let thread_id = "thread-1".to_string();
let thread_resume = ClientRequest::ThreadResume {
request_id: request_id(),
params: v2::ThreadResumeParams {
thread_id: thread_id.clone(),
..Default::default()
},
};
assert_eq!(
thread_resume.serialization_scope(),
Some(ClientRequestSerializationScope::Thread {
thread_id: thread_id.clone()
})
);
let thread_resume_with_path = ClientRequest::ThreadResume {
request_id: request_id(),
params: v2::ThreadResumeParams {
thread_id: thread_id.clone(),
path: Some(PathBuf::from("/tmp/resume-thread.jsonl")),
..Default::default()
},
};
assert_eq!(
thread_resume_with_path.serialization_scope(),
Some(ClientRequestSerializationScope::Thread {
thread_id: thread_id.clone()
})
);
let thread_fork = ClientRequest::ThreadFork {
request_id: request_id(),
params: v2::ThreadForkParams {
thread_id: thread_id.clone(),
path: Some(PathBuf::from("/tmp/source-thread.jsonl")),
..Default::default()
},
};
assert_eq!(
thread_fork.serialization_scope(),
Some(ClientRequestSerializationScope::Thread { thread_id })
);
let command_exec = ClientRequest::OneOffCommandExec {
request_id: request_id(),
params: v2::CommandExecParams {
command: vec!["sleep".to_string(), "10".to_string()],
process_id: Some("proc-1".to_string()),
tty: false,
stream_stdin: false,
stream_stdout_stderr: false,
output_bytes_cap: None,
disable_output_cap: false,
disable_timeout: false,
timeout_ms: None,
cwd: None,
env: None,
size: None,
sandbox_policy: None,
permission_profile: None,
},
};
assert_eq!(
command_exec.serialization_scope(),
Some(ClientRequestSerializationScope::CommandExecProcess {
process_id: "proc-1".to_string()
})
);
let fuzzy_update = ClientRequest::FuzzyFileSearchSessionUpdate {
request_id: request_id(),
params: FuzzyFileSearchSessionUpdateParams {
session_id: "search-1".to_string(),
query: "lib".to_string(),
},
};
assert_eq!(
fuzzy_update.serialization_scope(),
Some(ClientRequestSerializationScope::FuzzyFileSearchSession {
session_id: "search-1".to_string()
})
);
let fs_watch = ClientRequest::FsWatch {
request_id: request_id(),
params: v2::FsWatchParams {
watch_id: "watch-1".to_string(),
path: absolute_path("/tmp/repo"),
},
};
assert_eq!(
fs_watch.serialization_scope(),
Some(ClientRequestSerializationScope::FsWatch {
watch_id: "watch-1".to_string()
})
);
let plugin_install = ClientRequest::PluginInstall {
request_id: request_id(),
params: v2::PluginInstallParams {
marketplace_path: Some(absolute_path("/tmp/marketplace")),
remote_marketplace_name: None,
plugin_name: "plugin-a".to_string(),
},
};
assert_eq!(
plugin_install.serialization_scope(),
Some(ClientRequestSerializationScope::Global("config"))
);
let plugin_uninstall = ClientRequest::PluginUninstall {
request_id: request_id(),
params: v2::PluginUninstallParams {
plugin_id: "plugin-a".to_string(),
},
};
assert_eq!(
plugin_uninstall.serialization_scope(),
Some(ClientRequestSerializationScope::Global("config"))
);
let mcp_oauth = ClientRequest::McpServerOauthLogin {
request_id: request_id(),
params: v2::McpServerOauthLoginParams {
name: "server-a".to_string(),
scopes: None,
timeout_secs: None,
},
};
assert_eq!(
mcp_oauth.serialization_scope(),
Some(ClientRequestSerializationScope::McpOauth {
server_name: "server-a".to_string()
})
);
let mcp_resource_read = ClientRequest::McpResourceRead {
request_id: request_id(),
params: v2::McpResourceReadParams {
thread_id: Some("thread-1".to_string()),
server: "server-a".to_string(),
uri: "file:///tmp/resource".to_string(),
},
};
assert_eq!(
mcp_resource_read.serialization_scope(),
Some(ClientRequestSerializationScope::Thread {
thread_id: "thread-1".to_string()
})
);
let config_read = ClientRequest::ConfigRead {
request_id: request_id(),
params: v2::ConfigReadParams {
include_layers: false,
cwd: None,
},
};
assert_eq!(
config_read.serialization_scope(),
Some(ClientRequestSerializationScope::Global("config"))
);
let account_read = ClientRequest::GetAccount {
request_id: request_id(),
params: v2::GetAccountParams {
refresh_token: false,
},
};
assert_eq!(
account_read.serialization_scope(),
Some(ClientRequestSerializationScope::Global("account-auth"))
);
let thread_goal_set = ClientRequest::ThreadGoalSet {
request_id: request_id(),
params: v2::ThreadGoalSetParams {
thread_id: "goal-thread".to_string(),
objective: Some("ship it".to_string()),
status: None,
token_budget: None,
},
};
assert_eq!(
thread_goal_set.serialization_scope(),
Some(ClientRequestSerializationScope::Thread {
thread_id: "goal-thread".to_string()
})
);
let guardian_approval = ClientRequest::ThreadApproveGuardianDeniedAction {
request_id: request_id(),
params: v2::ThreadApproveGuardianDeniedActionParams {
thread_id: "guardian-thread".to_string(),
event: json!({ "type": "guardian" }),
},
};
assert_eq!(
guardian_approval.serialization_scope(),
Some(ClientRequestSerializationScope::Thread {
thread_id: "guardian-thread".to_string()
})
);
let marketplace_remove = ClientRequest::MarketplaceRemove {
request_id: request_id(),
params: v2::MarketplaceRemoveParams {
marketplace_name: "marketplace".to_string(),
},
};
assert_eq!(
marketplace_remove.serialization_scope(),
Some(ClientRequestSerializationScope::Global("config"))
);
let device_key_create = ClientRequest::DeviceKeyCreate {
request_id: request_id(),
params: v2::DeviceKeyCreateParams {
protection_policy: None,
account_user_id: "user".to_string(),
client_id: "client".to_string(),
},
};
assert_eq!(
device_key_create.serialization_scope(),
Some(ClientRequestSerializationScope::Global("device-key"))
);
let add_credits_nudge = ClientRequest::SendAddCreditsNudgeEmail {
request_id: request_id(),
params: v2::SendAddCreditsNudgeEmailParams {
credit_type: v2::AddCreditsNudgeCreditType::Credits,
},
};
assert_eq!(
add_credits_nudge.serialization_scope(),
Some(ClientRequestSerializationScope::Global("account-auth"))
);
}
#[test]
fn client_request_serialization_scope_covers_unkeyed_representatives() {
let initialize = ClientRequest::Initialize {
request_id: request_id(),
params: v1::InitializeParams {
client_info: v1::ClientInfo {
name: "test".to_string(),
title: None,
version: "0.1.0".to_string(),
},
capabilities: None,
},
};
assert_eq!(initialize.serialization_scope(), None);
let thread_start = ClientRequest::ThreadStart {
request_id: request_id(),
params: v2::ThreadStartParams::default(),
};
assert_eq!(thread_start.serialization_scope(), None);
let command_exec = ClientRequest::OneOffCommandExec {
request_id: request_id(),
params: v2::CommandExecParams {
command: vec!["true".to_string()],
process_id: None,
tty: false,
stream_stdin: false,
stream_stdout_stderr: false,
output_bytes_cap: None,
disable_output_cap: false,
disable_timeout: false,
timeout_ms: None,
cwd: None,
env: None,
size: None,
sandbox_policy: None,
permission_profile: None,
},
};
assert_eq!(command_exec.serialization_scope(), None);
let fs_read = ClientRequest::FsReadFile {
request_id: request_id(),
params: v2::FsReadFileParams {
path: absolute_path("/tmp/file.txt"),
},
};
assert_eq!(fs_read.serialization_scope(), None);
let thread_turns_list = ClientRequest::ThreadTurnsList {
request_id: request_id(),
params: v2::ThreadTurnsListParams {
thread_id: "thread-1".to_string(),
cursor: None,
limit: None,
sort_direction: None,
},
};
assert_eq!(thread_turns_list.serialization_scope(), None);
let mcp_resource_read = ClientRequest::McpResourceRead {
request_id: request_id(),
params: v2::McpResourceReadParams {
thread_id: None,
server: "server-a".to_string(),
uri: "file:///tmp/resource".to_string(),
},
};
assert_eq!(mcp_resource_read.serialization_scope(), None);
}
#[test]
fn serialize_get_conversation_summary() -> Result<()> {
let request = ClientRequest::GetConversationSummary {