mirror of
https://github.com/openai/codex.git
synced 2026-04-30 01:16:54 +00:00
Move codex exec onto typed app-server APIs
This commit is contained in:
@@ -9,6 +9,7 @@ mod event_processor;
|
||||
mod event_processor_with_human_output;
|
||||
pub mod event_processor_with_jsonl_output;
|
||||
pub mod exec_events;
|
||||
mod typed_exec_event;
|
||||
|
||||
pub use cli::Cli;
|
||||
pub use cli::Command;
|
||||
@@ -17,11 +18,9 @@ use codex_app_server_client::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY;
|
||||
use codex_app_server_client::InProcessAppServerClient;
|
||||
use codex_app_server_client::InProcessClientStartArgs;
|
||||
use codex_app_server_client::InProcessServerEvent;
|
||||
use codex_app_server_protocol::ChatgptAuthTokensRefreshResponse;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::ConfigWarningNotification;
|
||||
use codex_app_server_protocol::JSONRPCErrorError;
|
||||
use codex_app_server_protocol::JSONRPCNotification;
|
||||
use codex_app_server_protocol::McpServerElicitationAction;
|
||||
use codex_app_server_protocol::McpServerElicitationRequestResponse;
|
||||
use codex_app_server_protocol::RequestId;
|
||||
@@ -30,8 +29,12 @@ use codex_app_server_protocol::ReviewStartResponse;
|
||||
use codex_app_server_protocol::ReviewTarget as ApiReviewTarget;
|
||||
use codex_app_server_protocol::ServerNotification;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ThreadListParams;
|
||||
use codex_app_server_protocol::ThreadListResponse;
|
||||
use codex_app_server_protocol::ThreadResumeParams;
|
||||
use codex_app_server_protocol::ThreadResumeResponse;
|
||||
use codex_app_server_protocol::ThreadSortKey;
|
||||
use codex_app_server_protocol::ThreadSourceKind;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadUnsubscribeParams;
|
||||
@@ -41,8 +44,7 @@ use codex_app_server_protocol::TurnInterruptResponse;
|
||||
use codex_app_server_protocol::TurnStartParams;
|
||||
use codex_app_server_protocol::TurnStartResponse;
|
||||
use codex_arg0::Arg0DispatchPaths;
|
||||
use codex_cloud_requirements::cloud_requirements_loader;
|
||||
use codex_core::AuthManager;
|
||||
use codex_cloud_requirements::cloud_requirements_loader_for_storage;
|
||||
use codex_core::LMSTUDIO_OSS_PROVIDER_ID;
|
||||
use codex_core::OLLAMA_OSS_PROVIDER_ID;
|
||||
use codex_core::auth::AuthConfig;
|
||||
@@ -62,11 +64,8 @@ use codex_core::git_info::get_git_repo_root;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_otel::set_parent_from_context;
|
||||
use codex_otel::traceparent_context_from_env;
|
||||
use codex_protocol::account::PlanType as AccountPlanType;
|
||||
use codex_protocol::config_types::SandboxMode;
|
||||
use codex_protocol::protocol::AskForApproval;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::ReviewRequest;
|
||||
use codex_protocol::protocol::ReviewTarget;
|
||||
use codex_protocol::protocol::SessionConfiguredEvent;
|
||||
@@ -79,8 +78,6 @@ use event_processor_with_human_output::EventProcessorWithHumanOutput;
|
||||
use event_processor_with_jsonl_output::EventProcessorWithJsonOutput;
|
||||
use serde_json::Value;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::collections::VecDeque;
|
||||
use std::io::IsTerminal;
|
||||
use std::io::Read;
|
||||
use std::path::PathBuf;
|
||||
@@ -99,10 +96,10 @@ use uuid::Uuid;
|
||||
use crate::cli::Command as ExecCommand;
|
||||
use crate::event_processor::CodexStatus;
|
||||
use crate::event_processor::EventProcessor;
|
||||
use crate::typed_exec_event::TypedExecEvent;
|
||||
use crate::typed_exec_event::session_configured_from_thread_response;
|
||||
use codex_core::default_client::set_default_client_residency_requirement;
|
||||
use codex_core::default_client::set_default_originator;
|
||||
use codex_core::find_thread_path_by_id_str;
|
||||
use codex_core::find_thread_path_by_name_str;
|
||||
|
||||
const DEFAULT_ANALYTICS_ENABLED: bool = true;
|
||||
|
||||
@@ -287,18 +284,17 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
}
|
||||
};
|
||||
|
||||
let cloud_auth_manager = AuthManager::shared(
|
||||
codex_home.clone(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config_toml.cli_auth_credentials_store.unwrap_or_default(),
|
||||
);
|
||||
let chatgpt_base_url = config_toml
|
||||
.chatgpt_base_url
|
||||
.clone()
|
||||
.unwrap_or_else(|| "https://chatgpt.com/backend-api/".to_string());
|
||||
// TODO(gt): Make cloud requirements failures blocking once we can fail-closed.
|
||||
let cloud_requirements =
|
||||
cloud_requirements_loader(cloud_auth_manager, chatgpt_base_url, codex_home.clone());
|
||||
let cloud_requirements = cloud_requirements_loader_for_storage(
|
||||
codex_home.clone(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config_toml.cli_auth_credentials_store.unwrap_or_default(),
|
||||
chatgpt_base_url,
|
||||
);
|
||||
let run_cli_overrides = cli_kv_overrides.clone();
|
||||
let run_loader_overrides = LoaderOverrides::default();
|
||||
let run_cloud_requirements = cloud_requirements.clone();
|
||||
@@ -450,6 +446,8 @@ pub async fn run_main(cli: Cli, arg0_paths: Arg0DispatchPaths) -> anyhow::Result
|
||||
experimental_api: true,
|
||||
opt_out_notification_methods: Vec::new(),
|
||||
channel_capacity: DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
|
||||
auto_handle_chatgpt_auth_refresh: true,
|
||||
allow_legacy_notifications: false,
|
||||
};
|
||||
run_exec_session(ExecRunArgs {
|
||||
in_process_start_args,
|
||||
@@ -500,14 +498,6 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
last_message_file.clone(),
|
||||
)),
|
||||
};
|
||||
let required_mcp_servers: HashSet<String> = config
|
||||
.mcp_servers
|
||||
.get()
|
||||
.iter()
|
||||
.filter(|(_, server)| server.enabled && server.required)
|
||||
.map(|(name, _)| name.clone())
|
||||
.collect();
|
||||
|
||||
if oss {
|
||||
// We're in the oss section, so provider_id should be Some
|
||||
// Let's handle None case gracefully though just in case
|
||||
@@ -547,17 +537,16 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
anyhow::anyhow!("failed to initialize in-process app-server client: {err}")
|
||||
})?;
|
||||
|
||||
// Handle resume subcommand by resolving a rollout path and using explicit resume API.
|
||||
// Handle resume subcommand through existing `thread/list` + `thread/resume`
|
||||
// APIs so exec no longer reaches into rollout storage directly.
|
||||
let (primary_thread_id, fallback_session_configured) =
|
||||
if let Some(ExecCommand::Resume(args)) = command.as_ref() {
|
||||
let resume_path = resolve_resume_path(&config, args).await?;
|
||||
|
||||
if let Some(path) = resume_path {
|
||||
if let Some(thread_id) = resolve_resume_thread_id(&client, &config, args).await? {
|
||||
let response: ThreadResumeResponse = send_request_with_response(
|
||||
&client,
|
||||
ClientRequest::ThreadResume {
|
||||
request_id: request_ids.next(),
|
||||
params: thread_resume_params_from_config(&config, Some(path)),
|
||||
params: thread_resume_params_from_config(&config, thread_id),
|
||||
},
|
||||
"thread/resume",
|
||||
)
|
||||
@@ -598,7 +587,6 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
};
|
||||
|
||||
let primary_thread_id_for_span = primary_thread_id.to_string();
|
||||
let mut buffered_events = VecDeque::new();
|
||||
// Use the start/resume response as the authoritative bootstrap payload.
|
||||
// Waiting for a later streamed `SessionConfigured` event adds up to 10s of
|
||||
// avoidable startup latency on the in-process path.
|
||||
@@ -670,10 +658,7 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
// is using.
|
||||
event_processor.print_config_summary(&config, &prompt_summary, &session_configured);
|
||||
if !json_mode && let Some(message) = codex_core::config::missing_system_bwrap_warning() {
|
||||
let _ = event_processor.process_event(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::Warning(codex_protocol::protocol::WarningEvent { message }),
|
||||
});
|
||||
let _ = event_processor.process_event(TypedExecEvent::Warning(message));
|
||||
}
|
||||
|
||||
info!("Codex initialized with event: {session_configured:?}");
|
||||
@@ -748,34 +733,30 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
let mut interrupt_channel_open = true;
|
||||
let primary_thread_id_for_requests = primary_thread_id.to_string();
|
||||
loop {
|
||||
let server_event = if let Some(event) = buffered_events.pop_front() {
|
||||
Some(event)
|
||||
} else {
|
||||
tokio::select! {
|
||||
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
|
||||
if maybe_interrupt.is_none() {
|
||||
interrupt_channel_open = false;
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = send_request_with_response::<TurnInterruptResponse>(
|
||||
&client,
|
||||
ClientRequest::TurnInterrupt {
|
||||
request_id: request_ids.next(),
|
||||
params: TurnInterruptParams {
|
||||
thread_id: primary_thread_id_for_requests.clone(),
|
||||
turn_id: task_id.clone(),
|
||||
},
|
||||
},
|
||||
"turn/interrupt",
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("turn/interrupt failed: {err}");
|
||||
}
|
||||
let server_event = tokio::select! {
|
||||
maybe_interrupt = interrupt_rx.recv(), if interrupt_channel_open => {
|
||||
if maybe_interrupt.is_none() {
|
||||
interrupt_channel_open = false;
|
||||
continue;
|
||||
}
|
||||
maybe_event = client.next_event() => maybe_event,
|
||||
if let Err(err) = send_request_with_response::<TurnInterruptResponse>(
|
||||
&client,
|
||||
ClientRequest::TurnInterrupt {
|
||||
request_id: request_ids.next(),
|
||||
params: TurnInterruptParams {
|
||||
thread_id: primary_thread_id_for_requests.clone(),
|
||||
turn_id: task_id.clone(),
|
||||
},
|
||||
},
|
||||
"turn/interrupt",
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("turn/interrupt failed: {err}");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
maybe_event = client.next_event() => maybe_event,
|
||||
};
|
||||
|
||||
let Some(server_event) = server_event else {
|
||||
@@ -784,69 +765,37 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
|
||||
match server_event {
|
||||
InProcessServerEvent::ServerRequest(request) => {
|
||||
handle_server_request(
|
||||
&client,
|
||||
request,
|
||||
&config,
|
||||
&primary_thread_id_for_requests,
|
||||
&mut error_seen,
|
||||
)
|
||||
.await;
|
||||
handle_server_request(&client, request, &mut error_seen).await;
|
||||
}
|
||||
InProcessServerEvent::ServerNotification(notification) => {
|
||||
if let ServerNotification::Error(payload) = ¬ification
|
||||
if let ServerNotification::Error(payload) = ¬ification {
|
||||
if payload.thread_id == primary_thread_id_for_requests
|
||||
&& payload.turn_id == task_id
|
||||
&& !payload.will_retry
|
||||
{
|
||||
error_seen = true;
|
||||
}
|
||||
} else if let ServerNotification::TurnCompleted(payload) = ¬ification
|
||||
&& payload.thread_id == primary_thread_id_for_requests
|
||||
&& payload.turn_id == task_id
|
||||
&& !payload.will_retry
|
||||
&& payload.turn.id == task_id
|
||||
&& matches!(
|
||||
payload.turn.status,
|
||||
codex_app_server_protocol::TurnStatus::Failed
|
||||
| codex_app_server_protocol::TurnStatus::Interrupted
|
||||
)
|
||||
{
|
||||
error_seen = true;
|
||||
}
|
||||
}
|
||||
InProcessServerEvent::LegacyNotification(notification) => {
|
||||
let decoded = match decode_legacy_notification(notification) {
|
||||
Ok(event) => event,
|
||||
Err(err) => {
|
||||
warn!("{err}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if decoded.conversation_id.as_deref()
|
||||
!= Some(primary_thread_id_for_requests.as_str())
|
||||
&& decoded.conversation_id.is_some()
|
||||
|
||||
if should_process_notification(
|
||||
¬ification,
|
||||
&primary_thread_id_for_requests,
|
||||
&task_id,
|
||||
) && let Some(event) = TypedExecEvent::from_server_notification(notification)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let event = decoded.event;
|
||||
if matches!(event.msg, EventMsg::SessionConfigured(_)) {
|
||||
continue;
|
||||
}
|
||||
if matches!(event.msg, EventMsg::Error(_)) {
|
||||
// The legacy bridge still carries fatal turn failures for
|
||||
// exec. Preserve the non-zero exit behavior until this
|
||||
// path is fully replaced by typed server notifications.
|
||||
error_seen = true;
|
||||
}
|
||||
match &event.msg {
|
||||
EventMsg::TurnComplete(payload) => {
|
||||
if payload.turn_id != task_id {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
EventMsg::TurnAborted(payload) => {
|
||||
if payload.turn_id.as_deref() != Some(task_id.as_str()) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
EventMsg::McpStartupUpdate(update) => {
|
||||
if required_mcp_servers.contains(&update.server)
|
||||
&& let codex_protocol::protocol::McpStartupStatus::Failed { error } =
|
||||
&update.status
|
||||
{
|
||||
error_seen = true;
|
||||
eprintln!(
|
||||
"Required MCP server '{}' failed to initialize: {error}",
|
||||
update.server
|
||||
);
|
||||
match event_processor.process_event(event) {
|
||||
CodexStatus::Running => {}
|
||||
CodexStatus::InitiateShutdown => {
|
||||
if let Err(err) = request_shutdown(
|
||||
&client,
|
||||
&mut request_ids,
|
||||
@@ -859,38 +808,14 @@ async fn run_exec_session(args: ExecRunArgs) -> anyhow::Result<()> {
|
||||
break;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
match event_processor.process_event(event) {
|
||||
CodexStatus::Running => {}
|
||||
CodexStatus::InitiateShutdown => {
|
||||
if let Err(err) = request_shutdown(
|
||||
&client,
|
||||
&mut request_ids,
|
||||
&primary_thread_id_for_requests,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("thread/unsubscribe failed during shutdown: {err}");
|
||||
}
|
||||
break;
|
||||
}
|
||||
CodexStatus::Shutdown => {
|
||||
// `ShutdownComplete` does not identify which attached
|
||||
// thread emitted it, so subagent shutdowns must not end
|
||||
// the primary exec loop early.
|
||||
}
|
||||
}
|
||||
}
|
||||
InProcessServerEvent::Lagged { skipped } => {
|
||||
let message = lagged_event_warning_message(skipped);
|
||||
warn!("{message}");
|
||||
let _ = event_processor.process_event(Event {
|
||||
id: String::new(),
|
||||
msg: EventMsg::Warning(codex_protocol::protocol::WarningEvent { message }),
|
||||
});
|
||||
let _ = event_processor.process_event(TypedExecEvent::Warning(message));
|
||||
}
|
||||
InProcessServerEvent::LegacyNotification(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -936,10 +861,9 @@ fn thread_start_params_from_config(config: &Config) -> ThreadStartParams {
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_resume_params_from_config(config: &Config, path: Option<PathBuf>) -> ThreadResumeParams {
|
||||
fn thread_resume_params_from_config(config: &Config, thread_id: String) -> ThreadResumeParams {
|
||||
ThreadResumeParams {
|
||||
thread_id: "resume".to_string(),
|
||||
path,
|
||||
thread_id,
|
||||
model: config.model.clone(),
|
||||
model_provider: Some(config.model_provider_id.clone()),
|
||||
cwd: Some(config.cwd.to_string_lossy().to_string()),
|
||||
@@ -1017,56 +941,6 @@ fn session_configured_from_thread_resume_response(
|
||||
)
|
||||
}
|
||||
|
||||
#[expect(
|
||||
clippy::too_many_arguments,
|
||||
reason = "session mapping keeps explicit fields"
|
||||
)]
|
||||
/// Synthesizes startup session metadata from `thread/start` or `thread/resume`.
|
||||
///
|
||||
/// This is a compatibility bridge for the current in-process architecture.
|
||||
/// Some session fields are not available synchronously from the start/resume
|
||||
/// response, so callers must treat the result as a best-effort fallback until
|
||||
/// a later `SessionConfigured` event proves otherwise.
|
||||
/// TODO(architecture): stop synthesizing a partial `SessionConfiguredEvent`
|
||||
/// here. Either return the authoritative session-configured payload from
|
||||
/// `thread/start`/`thread/resume`, or introduce a smaller bootstrap type for
|
||||
/// exec so this path cannot accidentally depend on placeholder fields.
|
||||
fn session_configured_from_thread_response(
|
||||
thread_id: &str,
|
||||
thread_name: Option<String>,
|
||||
rollout_path: Option<PathBuf>,
|
||||
model: String,
|
||||
model_provider_id: String,
|
||||
service_tier: Option<codex_protocol::config_types::ServiceTier>,
|
||||
approval_policy: AskForApproval,
|
||||
approvals_reviewer: codex_protocol::config_types::ApprovalsReviewer,
|
||||
sandbox_policy: codex_protocol::protocol::SandboxPolicy,
|
||||
cwd: PathBuf,
|
||||
reasoning_effort: Option<codex_protocol::openai_models::ReasoningEffort>,
|
||||
) -> Result<SessionConfiguredEvent, String> {
|
||||
let session_id = codex_protocol::ThreadId::from_string(thread_id)
|
||||
.map_err(|err| format!("thread id `{thread_id}` is invalid: {err}"))?;
|
||||
|
||||
Ok(SessionConfiguredEvent {
|
||||
session_id,
|
||||
forked_from_id: None,
|
||||
thread_name,
|
||||
model,
|
||||
model_provider_id,
|
||||
service_tier,
|
||||
approval_policy,
|
||||
approvals_reviewer,
|
||||
sandbox_policy,
|
||||
cwd,
|
||||
reasoning_effort,
|
||||
history_log_id: 0,
|
||||
history_entry_count: 0,
|
||||
initial_messages: None,
|
||||
network_proxy: None,
|
||||
rollout_path,
|
||||
})
|
||||
}
|
||||
|
||||
fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget {
|
||||
match target {
|
||||
ReviewTarget::UncommittedChanges => ApiReviewTarget::UncommittedChanges,
|
||||
@@ -1076,62 +950,157 @@ fn review_target_to_api(target: ReviewTarget) -> ApiReviewTarget {
|
||||
}
|
||||
}
|
||||
|
||||
fn normalize_legacy_notification_method(method: &str) -> &str {
|
||||
method.strip_prefix("codex/event/").unwrap_or(method)
|
||||
}
|
||||
|
||||
fn lagged_event_warning_message(skipped: usize) -> String {
|
||||
format!("in-process app-server event stream lagged; dropped {skipped} events")
|
||||
}
|
||||
|
||||
struct DecodedLegacyNotification {
|
||||
conversation_id: Option<String>,
|
||||
event: Event,
|
||||
fn should_process_notification(
|
||||
notification: &ServerNotification,
|
||||
thread_id: &str,
|
||||
turn_id: &str,
|
||||
) -> bool {
|
||||
match notification {
|
||||
ServerNotification::ConfigWarning(_) | ServerNotification::DeprecationNotice(_) => true,
|
||||
ServerNotification::Error(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::HookCompleted(notification) => {
|
||||
notification.thread_id == thread_id
|
||||
&& notification
|
||||
.turn_id
|
||||
.as_deref()
|
||||
.is_none_or(|candidate| candidate == turn_id)
|
||||
}
|
||||
ServerNotification::HookStarted(notification) => {
|
||||
notification.thread_id == thread_id
|
||||
&& notification
|
||||
.turn_id
|
||||
.as_deref()
|
||||
.is_none_or(|candidate| candidate == turn_id)
|
||||
}
|
||||
ServerNotification::ItemCompleted(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::ItemStarted(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::ModelRerouted(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::ThreadTokenUsageUpdated(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::TurnCompleted(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn.id == turn_id
|
||||
}
|
||||
ServerNotification::TurnDiffUpdated(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::TurnPlanUpdated(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn_id == turn_id
|
||||
}
|
||||
ServerNotification::TurnStarted(notification) => {
|
||||
notification.thread_id == thread_id && notification.turn.id == turn_id
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn decode_legacy_notification(
|
||||
notification: JSONRPCNotification,
|
||||
) -> Result<DecodedLegacyNotification, String> {
|
||||
let value = notification
|
||||
.params
|
||||
.unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::new()));
|
||||
let method = notification.method;
|
||||
let normalized_method = normalize_legacy_notification_method(&method).to_string();
|
||||
let serde_json::Value::Object(mut object) = value else {
|
||||
return Err(format!(
|
||||
"legacy notification `{method}` params were not an object"
|
||||
));
|
||||
};
|
||||
let conversation_id = object
|
||||
.get("conversationId")
|
||||
.and_then(serde_json::Value::as_str)
|
||||
.map(str::to_owned);
|
||||
let mut event_payload = if let Some(serde_json::Value::Object(msg_payload)) = object.get("msg")
|
||||
{
|
||||
serde_json::Value::Object(msg_payload.clone())
|
||||
} else {
|
||||
object.remove("conversationId");
|
||||
serde_json::Value::Object(object)
|
||||
};
|
||||
let serde_json::Value::Object(ref mut object) = event_payload else {
|
||||
return Err(format!(
|
||||
"legacy notification `{method}` event payload was not an object"
|
||||
));
|
||||
};
|
||||
object.insert(
|
||||
"type".to_string(),
|
||||
serde_json::Value::String(normalized_method),
|
||||
);
|
||||
fn all_thread_source_kinds() -> Vec<ThreadSourceKind> {
|
||||
vec![
|
||||
ThreadSourceKind::Cli,
|
||||
ThreadSourceKind::VsCode,
|
||||
ThreadSourceKind::Exec,
|
||||
ThreadSourceKind::AppServer,
|
||||
ThreadSourceKind::Custom,
|
||||
ThreadSourceKind::SubAgent,
|
||||
ThreadSourceKind::SubAgentReview,
|
||||
ThreadSourceKind::SubAgentCompact,
|
||||
ThreadSourceKind::SubAgentThreadSpawn,
|
||||
ThreadSourceKind::SubAgentOther,
|
||||
ThreadSourceKind::Unknown,
|
||||
]
|
||||
}
|
||||
|
||||
let msg: EventMsg = serde_json::from_value(event_payload)
|
||||
.map_err(|err| format!("failed to decode event: {err}"))?;
|
||||
Ok(DecodedLegacyNotification {
|
||||
conversation_id,
|
||||
event: Event {
|
||||
id: String::new(),
|
||||
msg,
|
||||
},
|
||||
})
|
||||
async fn resolve_resume_thread_id(
|
||||
client: &InProcessAppServerClient,
|
||||
config: &Config,
|
||||
args: &crate::cli::ResumeArgs,
|
||||
) -> anyhow::Result<Option<String>> {
|
||||
let filter_cwd = if args.all {
|
||||
None
|
||||
} else {
|
||||
Some(config.cwd.to_string_lossy().to_string())
|
||||
};
|
||||
let model_providers = Some(vec![config.model_provider_id.clone()]);
|
||||
|
||||
if args.last {
|
||||
let response: ThreadListResponse = send_request_with_response(
|
||||
client,
|
||||
ClientRequest::ThreadList {
|
||||
request_id: RequestId::Integer(0),
|
||||
params: ThreadListParams {
|
||||
cursor: None,
|
||||
limit: Some(1),
|
||||
sort_key: Some(ThreadSortKey::UpdatedAt),
|
||||
model_providers,
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
cwd: filter_cwd,
|
||||
search_term: None,
|
||||
},
|
||||
},
|
||||
"thread/list",
|
||||
)
|
||||
.await
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
return Ok(response.data.into_iter().next().map(|thread| thread.id));
|
||||
}
|
||||
|
||||
let Some(session_id) = args.session_id.as_deref() else {
|
||||
return Ok(None);
|
||||
};
|
||||
if Uuid::parse_str(session_id).is_ok() {
|
||||
return Ok(Some(session_id.to_string()));
|
||||
}
|
||||
|
||||
let mut cursor = None;
|
||||
loop {
|
||||
let response: ThreadListResponse = send_request_with_response(
|
||||
client,
|
||||
ClientRequest::ThreadList {
|
||||
request_id: RequestId::Integer(0),
|
||||
params: ThreadListParams {
|
||||
cursor,
|
||||
limit: Some(100),
|
||||
sort_key: Some(ThreadSortKey::UpdatedAt),
|
||||
model_providers: Some(vec![config.model_provider_id.clone()]),
|
||||
source_kinds: Some(all_thread_source_kinds()),
|
||||
archived: Some(false),
|
||||
cwd: if args.all {
|
||||
None
|
||||
} else {
|
||||
Some(config.cwd.to_string_lossy().to_string())
|
||||
},
|
||||
search_term: Some(session_id.to_string()),
|
||||
},
|
||||
},
|
||||
"thread/list",
|
||||
)
|
||||
.await
|
||||
.map_err(anyhow::Error::msg)?;
|
||||
if let Some(thread) = response
|
||||
.data
|
||||
.into_iter()
|
||||
.find(|thread| thread.name.as_deref() == Some(session_id))
|
||||
{
|
||||
return Ok(Some(thread.id));
|
||||
}
|
||||
let Some(next_cursor) = response.next_cursor else {
|
||||
return Ok(None);
|
||||
};
|
||||
cursor = Some(next_cursor);
|
||||
}
|
||||
}
|
||||
|
||||
fn canceled_mcp_server_elicitation_response() -> Result<Value, String> {
|
||||
@@ -1205,8 +1174,6 @@ fn server_request_method_name(request: &ServerRequest) -> String {
|
||||
async fn handle_server_request(
|
||||
client: &InProcessAppServerClient,
|
||||
request: ServerRequest,
|
||||
config: &Config,
|
||||
_thread_id: &str,
|
||||
error_seen: &mut bool,
|
||||
) {
|
||||
let method = server_request_method_name(&request);
|
||||
@@ -1228,50 +1195,6 @@ async fn handle_server_request(
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
ServerRequest::ChatgptAuthTokensRefresh { request_id, params } => {
|
||||
let refresh_result = tokio::task::spawn_blocking({
|
||||
let config = config.clone();
|
||||
move || local_external_chatgpt_tokens(&config)
|
||||
})
|
||||
.await;
|
||||
|
||||
match refresh_result {
|
||||
Err(err) => {
|
||||
reject_server_request(
|
||||
client,
|
||||
request_id,
|
||||
&method,
|
||||
format!("local chatgpt auth refresh task failed in exec: {err}"),
|
||||
)
|
||||
.await
|
||||
}
|
||||
Ok(Err(reason)) => reject_server_request(client, request_id, &method, reason).await,
|
||||
Ok(Ok(response)) => {
|
||||
if let Some(previous_account_id) = params.previous_account_id.as_deref()
|
||||
&& previous_account_id != response.chatgpt_account_id
|
||||
{
|
||||
warn!(
|
||||
"local auth refresh account mismatch: expected `{previous_account_id}`, got `{}`",
|
||||
response.chatgpt_account_id
|
||||
);
|
||||
}
|
||||
match serde_json::to_value(response) {
|
||||
Ok(value) => {
|
||||
resolve_server_request(
|
||||
client,
|
||||
request_id,
|
||||
value,
|
||||
"account/chatgptAuthTokens/refresh",
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(err) => Err(format!(
|
||||
"failed to serialize chatgpt auth refresh response: {err}"
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
ServerRequest::CommandExecutionRequestApproval { request_id, params } => {
|
||||
reject_server_request(
|
||||
client,
|
||||
@@ -1356,6 +1279,16 @@ async fn handle_server_request(
|
||||
)
|
||||
.await
|
||||
}
|
||||
ServerRequest::ChatgptAuthTokensRefresh { request_id, .. } => {
|
||||
reject_server_request(
|
||||
client,
|
||||
request_id,
|
||||
&method,
|
||||
"chatgpt auth refresh should be handled by the in-process app-server client"
|
||||
.to_string(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(err) = handle_result {
|
||||
@@ -1364,91 +1297,6 @@ async fn handle_server_request(
|
||||
}
|
||||
}
|
||||
|
||||
fn local_external_chatgpt_tokens(
|
||||
config: &Config,
|
||||
) -> Result<ChatgptAuthTokensRefreshResponse, String> {
|
||||
let auth_manager = AuthManager::shared(
|
||||
config.codex_home.clone(),
|
||||
/*enable_codex_api_key_env*/ false,
|
||||
config.cli_auth_credentials_store_mode,
|
||||
);
|
||||
auth_manager.set_forced_chatgpt_workspace_id(config.forced_chatgpt_workspace_id.clone());
|
||||
auth_manager.reload();
|
||||
|
||||
let auth = auth_manager
|
||||
.auth_cached()
|
||||
.ok_or_else(|| "no cached auth available for local token refresh".to_string())?;
|
||||
if !auth.is_external_chatgpt_tokens() {
|
||||
return Err("external ChatGPT token auth is not active".to_string());
|
||||
}
|
||||
|
||||
let access_token = auth
|
||||
.get_token()
|
||||
.map_err(|err| format!("failed to read external access token: {err}"))?;
|
||||
let chatgpt_account_id = auth
|
||||
.get_account_id()
|
||||
.ok_or_else(|| "external token auth is missing chatgpt account id".to_string())?;
|
||||
let chatgpt_plan_type = auth.account_plan_type().map(|plan_type| match plan_type {
|
||||
AccountPlanType::Free => "free".to_string(),
|
||||
AccountPlanType::Go => "go".to_string(),
|
||||
AccountPlanType::Plus => "plus".to_string(),
|
||||
AccountPlanType::Pro => "pro".to_string(),
|
||||
AccountPlanType::Team => "team".to_string(),
|
||||
AccountPlanType::Business => "business".to_string(),
|
||||
AccountPlanType::Enterprise => "enterprise".to_string(),
|
||||
AccountPlanType::Edu => "edu".to_string(),
|
||||
AccountPlanType::Unknown => "unknown".to_string(),
|
||||
});
|
||||
|
||||
Ok(ChatgptAuthTokensRefreshResponse {
|
||||
access_token,
|
||||
chatgpt_account_id,
|
||||
chatgpt_plan_type,
|
||||
})
|
||||
}
|
||||
|
||||
async fn resolve_resume_path(
|
||||
config: &Config,
|
||||
args: &crate::cli::ResumeArgs,
|
||||
) -> anyhow::Result<Option<PathBuf>> {
|
||||
if args.last {
|
||||
let default_provider_filter = vec![config.model_provider_id.clone()];
|
||||
let filter_cwd = if args.all {
|
||||
None
|
||||
} else {
|
||||
Some(config.cwd.as_path())
|
||||
};
|
||||
match codex_core::RolloutRecorder::find_latest_thread_path(
|
||||
config,
|
||||
/*page_size*/ 1,
|
||||
/*cursor*/ None,
|
||||
codex_core::ThreadSortKey::UpdatedAt,
|
||||
&[],
|
||||
Some(default_provider_filter.as_slice()),
|
||||
&config.model_provider_id,
|
||||
filter_cwd,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(path) => Ok(path),
|
||||
Err(e) => {
|
||||
error!("Error listing threads: {e}");
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
} else if let Some(id_str) = args.session_id.as_deref() {
|
||||
if Uuid::parse_str(id_str).is_ok() {
|
||||
let path = find_thread_path_by_id_str(&config.codex_home, id_str).await?;
|
||||
Ok(path)
|
||||
} else {
|
||||
let path = find_thread_path_by_name_str(&config.codex_home, id_str).await?;
|
||||
Ok(path)
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
fn load_output_schema(path: Option<PathBuf>) -> Option<Value> {
|
||||
let path = path?;
|
||||
|
||||
@@ -1806,29 +1654,6 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decode_legacy_notification_preserves_conversation_id() {
|
||||
let decoded = decode_legacy_notification(JSONRPCNotification {
|
||||
method: "codex/event/error".to_string(),
|
||||
params: Some(serde_json::json!({
|
||||
"conversationId": "thread-123",
|
||||
"msg": {
|
||||
"message": "boom"
|
||||
}
|
||||
})),
|
||||
})
|
||||
.expect("legacy notification should decode");
|
||||
|
||||
assert_eq!(decoded.conversation_id.as_deref(), Some("thread-123"));
|
||||
assert!(matches!(
|
||||
decoded.event.msg,
|
||||
EventMsg::Error(codex_protocol::protocol::ErrorEvent {
|
||||
message,
|
||||
codex_error_info: None,
|
||||
}) if message == "boom"
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn canceled_mcp_server_elicitation_response_uses_cancel_action() {
|
||||
let value = canceled_mcp_server_elicitation_response()
|
||||
|
||||
Reference in New Issue
Block a user