mirror of
https://github.com/openai/codex.git
synced 2026-03-05 06:03:20 +00:00
Compare commits
1 Commits
fix/notify
...
pakrym/hah
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f564b7fed8 |
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
@@ -2245,6 +2245,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"codex-ansi-escape",
|
||||
"codex-app-server",
|
||||
"codex-app-server-protocol",
|
||||
"codex-arg0",
|
||||
"codex-backend-client",
|
||||
|
||||
@@ -235,6 +235,11 @@ client_request_definitions! {
|
||||
params: v2::ThreadReadParams,
|
||||
response: v2::ThreadReadResponse,
|
||||
},
|
||||
#[experimental("thread/submitOp")]
|
||||
ThreadSubmitOp => "thread/submitOp" {
|
||||
params: v2::ThreadSubmitOpParams,
|
||||
response: v2::ThreadSubmitOpResponse,
|
||||
},
|
||||
SkillsList => "skills/list" {
|
||||
params: v2::SkillsListParams,
|
||||
response: v2::SkillsListResponse,
|
||||
|
||||
@@ -1962,6 +1962,21 @@ pub struct ThreadReadResponse {
|
||||
pub thread: Thread,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadSubmitOpParams {
|
||||
pub thread_id: String,
|
||||
/// Serialized `codex_protocol::protocol::Op` payload used as a temporary
|
||||
/// compatibility bridge while clients migrate to typed app-server methods.
|
||||
pub op: JsonValue,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
pub struct ThreadSubmitOpResponse {}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export_to = "v2/")]
|
||||
|
||||
@@ -149,6 +149,8 @@ use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadStartedNotification;
|
||||
use codex_app_server_protocol::ThreadStatus;
|
||||
use codex_app_server_protocol::ThreadSubmitOpParams;
|
||||
use codex_app_server_protocol::ThreadSubmitOpResponse;
|
||||
use codex_app_server_protocol::ThreadUnarchiveParams;
|
||||
use codex_app_server_protocol::ThreadUnarchiveResponse;
|
||||
use codex_app_server_protocol::ThreadUnarchivedNotification;
|
||||
@@ -586,6 +588,10 @@ impl CodexMessageProcessor {
|
||||
self.thread_read(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::ThreadSubmitOp { request_id, params } => {
|
||||
self.thread_submit_op(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
}
|
||||
ClientRequest::SkillsList { request_id, params } => {
|
||||
self.skills_list(to_connection_request_id(request_id), params)
|
||||
.await;
|
||||
@@ -2784,6 +2790,46 @@ impl CodexMessageProcessor {
|
||||
self.outgoing.send_response(request_id, response).await;
|
||||
}
|
||||
|
||||
async fn thread_submit_op(
|
||||
&self,
|
||||
request_id: ConnectionRequestId,
|
||||
params: ThreadSubmitOpParams,
|
||||
) {
|
||||
let ThreadSubmitOpParams { thread_id, op } = params;
|
||||
|
||||
let (_, thread) = match self.load_thread(&thread_id).await {
|
||||
Ok(v) => v,
|
||||
Err(error) => {
|
||||
self.outgoing.send_error(request_id, error).await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let op = match serde_json::from_value::<Op>(op) {
|
||||
Ok(op) => op,
|
||||
Err(err) => {
|
||||
self.send_invalid_request_error(
|
||||
request_id,
|
||||
format!("invalid core op payload: {err}"),
|
||||
)
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
match thread.submit(op).await {
|
||||
Ok(_) => {
|
||||
self.outgoing
|
||||
.send_response(request_id, ThreadSubmitOpResponse {})
|
||||
.await;
|
||||
}
|
||||
Err(err) => {
|
||||
self.send_internal_error(request_id, format!("failed to submit op: {err}"))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
|
||||
self.thread_manager.subscribe_thread_created()
|
||||
}
|
||||
|
||||
158
codex-rs/app-server/src/embedded_session.rs
Normal file
158
codex-rs/app-server/src/embedded_session.rs
Normal file
@@ -0,0 +1,158 @@
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::codex_message_processor::CodexMessageProcessor;
|
||||
use crate::codex_message_processor::CodexMessageProcessorArgs;
|
||||
use crate::outgoing_message::ConnectionId;
|
||||
use crate::outgoing_message::OutgoingEnvelope;
|
||||
use crate::outgoing_message::OutgoingMessage;
|
||||
use crate::outgoing_message::OutgoingMessageSender;
|
||||
use crate::transport::CHANNEL_CAPACITY;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::JSONRPCError;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_cloud_requirements::cloud_requirements_loader;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use tokio::sync::mpsc;
|
||||
use toml::Value as TomlValue;
|
||||
|
||||
const EMBEDDED_CONNECTION_ID: ConnectionId = ConnectionId(0);
|
||||
|
||||
#[derive(Debug)]
|
||||
enum EmbeddedSessionInput {
|
||||
Request(ClientRequest),
|
||||
Response(JSONRPCResponse),
|
||||
Error(JSONRPCError),
|
||||
}
|
||||
|
||||
pub struct EmbeddedSessionClientArgs {
|
||||
pub auth_manager: Arc<AuthManager>,
|
||||
pub thread_manager: Arc<ThreadManager>,
|
||||
pub config: Config,
|
||||
pub cli_overrides: Vec<(String, TomlValue)>,
|
||||
pub feedback: CodexFeedback,
|
||||
pub codex_linux_sandbox_exe: Option<PathBuf>,
|
||||
}
|
||||
|
||||
pub struct EmbeddedSessionClient {
|
||||
input_tx: mpsc::Sender<EmbeddedSessionInput>,
|
||||
output_rx: mpsc::Receiver<JSONRPCMessage>,
|
||||
}
|
||||
|
||||
impl EmbeddedSessionClient {
|
||||
pub fn spawn(args: EmbeddedSessionClientArgs) -> Self {
|
||||
let (input_tx, mut input_rx) = mpsc::channel::<EmbeddedSessionInput>(CHANNEL_CAPACITY);
|
||||
let (output_tx, output_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
|
||||
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
|
||||
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
|
||||
|
||||
let EmbeddedSessionClientArgs {
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
config,
|
||||
cli_overrides,
|
||||
feedback,
|
||||
codex_linux_sandbox_exe,
|
||||
} = args;
|
||||
let config = Arc::new(config);
|
||||
let cloud_requirements = Arc::new(std::sync::RwLock::new(cloud_requirements_loader(
|
||||
auth_manager.clone(),
|
||||
config.chatgpt_base_url.clone(),
|
||||
config.codex_home.clone(),
|
||||
)));
|
||||
|
||||
let mut processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
|
||||
auth_manager,
|
||||
thread_manager,
|
||||
outgoing: outgoing.clone(),
|
||||
codex_linux_sandbox_exe,
|
||||
config,
|
||||
cli_overrides,
|
||||
cloud_requirements,
|
||||
single_client_mode: true,
|
||||
feedback,
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(envelope) = outgoing_rx.recv().await {
|
||||
let message = match envelope {
|
||||
OutgoingEnvelope::ToConnection {
|
||||
connection_id,
|
||||
message,
|
||||
} => {
|
||||
if connection_id != EMBEDDED_CONNECTION_ID {
|
||||
continue;
|
||||
}
|
||||
outgoing_message_to_jsonrpc(message)
|
||||
}
|
||||
OutgoingEnvelope::Broadcast { message } => outgoing_message_to_jsonrpc(message),
|
||||
};
|
||||
|
||||
if output_tx.send(message).await.is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(input) = input_rx.recv().await {
|
||||
match input {
|
||||
EmbeddedSessionInput::Request(request) => {
|
||||
processor
|
||||
.process_request(EMBEDDED_CONNECTION_ID, request)
|
||||
.await;
|
||||
}
|
||||
EmbeddedSessionInput::Response(response) => {
|
||||
outgoing
|
||||
.notify_client_response(response.id, response.result)
|
||||
.await;
|
||||
}
|
||||
EmbeddedSessionInput::Error(error) => {
|
||||
outgoing.notify_client_error(error.id, error.error).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
processor.connection_closed(EMBEDDED_CONNECTION_ID).await;
|
||||
});
|
||||
|
||||
Self {
|
||||
input_tx,
|
||||
output_rx,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_request(&self, request: ClientRequest) -> std::io::Result<()> {
|
||||
self.input_tx
|
||||
.send(EmbeddedSessionInput::Request(request))
|
||||
.await
|
||||
.map_err(|_| std::io::Error::other("embedded app-server session is closed"))
|
||||
}
|
||||
|
||||
pub async fn send_response(&self, response: JSONRPCResponse) -> std::io::Result<()> {
|
||||
self.input_tx
|
||||
.send(EmbeddedSessionInput::Response(response))
|
||||
.await
|
||||
.map_err(|_| std::io::Error::other("embedded app-server session is closed"))
|
||||
}
|
||||
|
||||
pub async fn send_error(&self, error: JSONRPCError) -> std::io::Result<()> {
|
||||
self.input_tx
|
||||
.send(EmbeddedSessionInput::Error(error))
|
||||
.await
|
||||
.map_err(|_| std::io::Error::other("embedded app-server session is closed"))
|
||||
}
|
||||
|
||||
pub async fn recv(&mut self) -> Option<JSONRPCMessage> {
|
||||
self.output_rx.recv().await
|
||||
}
|
||||
}
|
||||
|
||||
fn outgoing_message_to_jsonrpc(message: OutgoingMessage) -> JSONRPCMessage {
|
||||
let value =
|
||||
serde_json::to_value(message).expect("outgoing app-server message should serialize");
|
||||
serde_json::from_value(value).expect("outgoing app-server message should decode as JSON-RPC")
|
||||
}
|
||||
@@ -56,6 +56,7 @@ mod bespoke_event_handling;
|
||||
mod codex_message_processor;
|
||||
mod config_api;
|
||||
mod dynamic_tools;
|
||||
mod embedded_session;
|
||||
mod error_code;
|
||||
mod filters;
|
||||
mod fuzzy_file_search;
|
||||
@@ -67,6 +68,8 @@ mod thread_status;
|
||||
mod transport;
|
||||
|
||||
pub use crate::transport::AppServerTransport;
|
||||
pub use embedded_session::EmbeddedSessionClient;
|
||||
pub use embedded_session::EmbeddedSessionClientArgs;
|
||||
|
||||
const LOG_FORMAT_ENV_VAR: &str = "LOG_FORMAT";
|
||||
|
||||
|
||||
@@ -27,6 +27,7 @@ base64 = { workspace = true }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
clap = { workspace = true, features = ["derive"] }
|
||||
codex-ansi-escape = { workspace = true }
|
||||
codex-app-server = { workspace = true }
|
||||
codex-app-server-protocol = { workspace = true }
|
||||
codex-arg0 = { workspace = true }
|
||||
codex-backend-client = { workspace = true }
|
||||
|
||||
@@ -2616,7 +2616,13 @@ impl ChatWidget {
|
||||
let prevent_idle_sleep = config.features.enabled(Feature::PreventIdleSleep);
|
||||
let mut rng = rand::rng();
|
||||
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
|
||||
let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), thread_manager);
|
||||
let codex_op_tx = spawn_agent(
|
||||
config.clone(),
|
||||
app_event_tx.clone(),
|
||||
thread_manager,
|
||||
auth_manager.clone(),
|
||||
feedback.clone(),
|
||||
);
|
||||
|
||||
let model_override = model.as_deref();
|
||||
let model_for_header = model
|
||||
|
||||
@@ -1,73 +1,152 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use codex_app_server::EmbeddedSessionClient;
|
||||
use codex_app_server::EmbeddedSessionClientArgs;
|
||||
use codex_app_server_protocol::ClientRequest;
|
||||
use codex_app_server_protocol::CommandExecutionApprovalDecision;
|
||||
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
|
||||
use codex_app_server_protocol::DynamicToolCallOutputContentItem as V2DynamicToolCallOutputContentItem;
|
||||
use codex_app_server_protocol::DynamicToolCallResponse as V2DynamicToolCallResponse;
|
||||
use codex_app_server_protocol::FileChangeApprovalDecision;
|
||||
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
|
||||
use codex_app_server_protocol::JSONRPCMessage;
|
||||
use codex_app_server_protocol::JSONRPCResponse;
|
||||
use codex_app_server_protocol::RequestId as AppServerRequestId;
|
||||
use codex_app_server_protocol::ServerRequest;
|
||||
use codex_app_server_protocol::ThreadStartParams;
|
||||
use codex_app_server_protocol::ThreadStartResponse;
|
||||
use codex_app_server_protocol::ThreadSubmitOpParams;
|
||||
use codex_app_server_protocol::ToolRequestUserInputAnswer;
|
||||
use codex_app_server_protocol::ToolRequestUserInputResponse;
|
||||
use codex_core::AuthManager;
|
||||
use codex_core::CodexThread;
|
||||
use codex_core::NewThread;
|
||||
use codex_core::ThreadManager;
|
||||
use codex_core::config::Config;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynamicToolCallOutputContentItem;
|
||||
use codex_protocol::dynamic_tools::DynamicToolResponse as CoreDynamicToolResponse;
|
||||
use codex_protocol::protocol::Event;
|
||||
use codex_protocol::protocol::EventMsg;
|
||||
use codex_protocol::protocol::Op;
|
||||
use codex_protocol::protocol::ReviewDecision;
|
||||
use tokio::sync::mpsc::UnboundedSender;
|
||||
use tokio::sync::mpsc::unbounded_channel;
|
||||
|
||||
use crate::app_event::AppEvent;
|
||||
use crate::app_event_sender::AppEventSender;
|
||||
|
||||
#[derive(Default)]
|
||||
struct PendingServerRequests {
|
||||
exec_approval_by_id: HashMap<String, AppServerRequestId>,
|
||||
patch_approval_by_item_id: HashMap<String, AppServerRequestId>,
|
||||
user_input_by_turn_id: HashMap<String, AppServerRequestId>,
|
||||
dynamic_tool_by_call_id: HashMap<String, AppServerRequestId>,
|
||||
}
|
||||
|
||||
/// Spawn the agent bootstrapper and op forwarding loop, returning the
|
||||
/// `UnboundedSender<Op>` used by the UI to submit operations.
|
||||
pub(crate) fn spawn_agent(
|
||||
config: Config,
|
||||
app_event_tx: AppEventSender,
|
||||
server: Arc<ThreadManager>,
|
||||
auth_manager: Arc<AuthManager>,
|
||||
feedback: CodexFeedback,
|
||||
) -> UnboundedSender<Op> {
|
||||
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
|
||||
|
||||
let app_event_tx_clone = app_event_tx;
|
||||
tokio::spawn(async move {
|
||||
let NewThread {
|
||||
thread,
|
||||
session_configured,
|
||||
..
|
||||
} = match server.start_thread(config).await {
|
||||
Ok(v) => v,
|
||||
Err(err) => {
|
||||
let message = format!("Failed to initialize codex: {err}");
|
||||
tracing::error!("{message}");
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
|
||||
id: "".to_string(),
|
||||
msg: EventMsg::Error(err.to_error_event(None)),
|
||||
}));
|
||||
app_event_tx_clone.send(AppEvent::FatalExitRequest(message));
|
||||
tracing::error!("failed to initialize codex: {err}");
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
|
||||
let ev = codex_protocol::protocol::Event {
|
||||
// The `id` does not matter for rendering, so we can use a fake value.
|
||||
id: "".to_string(),
|
||||
msg: codex_protocol::protocol::EventMsg::SessionConfigured(session_configured),
|
||||
};
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
|
||||
|
||||
let thread_clone = thread.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(op) = codex_op_rx.recv().await {
|
||||
let id = thread_clone.submit(op).await;
|
||||
if let Err(e) = id {
|
||||
tracing::error!("failed to submit op: {e}");
|
||||
}
|
||||
}
|
||||
let mut client = EmbeddedSessionClient::spawn(EmbeddedSessionClientArgs {
|
||||
auth_manager,
|
||||
thread_manager: server,
|
||||
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
|
||||
config: config.clone(),
|
||||
cli_overrides: Vec::new(),
|
||||
feedback,
|
||||
});
|
||||
|
||||
while let Ok(event) = thread.next_event().await {
|
||||
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
|
||||
app_event_tx_clone.send(AppEvent::CodexEvent(event));
|
||||
if is_shutdown_complete {
|
||||
// ShutdownComplete is terminal for a thread; drop this receiver task so
|
||||
// the Arc<CodexThread> can be released and thread resources can clean up.
|
||||
break;
|
||||
let mut next_request_id = 1_i64;
|
||||
let start_request_id = next_request_id_value(&mut next_request_id);
|
||||
let start_request = ClientRequest::ThreadStart {
|
||||
request_id: start_request_id.clone(),
|
||||
params: thread_start_params_from_config(&config),
|
||||
};
|
||||
if let Err(err) = client.send_request(start_request).await {
|
||||
let message = format!("Failed to initialize codex app-server session: {err}");
|
||||
tracing::error!("{message}");
|
||||
app_event_tx.send(AppEvent::FatalExitRequest(message));
|
||||
return;
|
||||
}
|
||||
|
||||
let mut thread_id: Option<String> = None;
|
||||
let mut pending = PendingServerRequests::default();
|
||||
|
||||
while thread_id.is_none() {
|
||||
let Some(message) = client.recv().await else {
|
||||
let message =
|
||||
"Embedded app-server session closed before thread/start completed".to_string();
|
||||
tracing::error!("{message}");
|
||||
app_event_tx.send(AppEvent::FatalExitRequest(message));
|
||||
return;
|
||||
};
|
||||
if handle_app_server_message(
|
||||
&app_event_tx,
|
||||
&start_request_id,
|
||||
Some(&mut thread_id),
|
||||
&mut pending,
|
||||
message,
|
||||
) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
let Some(thread_id) = thread_id else {
|
||||
let message = "thread/start did not return a thread id".to_string();
|
||||
tracing::error!("{message}");
|
||||
app_event_tx.send(AppEvent::FatalExitRequest(message));
|
||||
return;
|
||||
};
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
maybe_op = codex_op_rx.recv() => {
|
||||
let Some(op) = maybe_op else {
|
||||
break;
|
||||
};
|
||||
|
||||
let handled = match try_handle_server_request_reply_op(&mut client, &mut pending, &op).await {
|
||||
Ok(handled) => handled,
|
||||
Err(err) => {
|
||||
tracing::error!("failed to answer app-server server request: {err}");
|
||||
false
|
||||
}
|
||||
};
|
||||
if handled {
|
||||
continue;
|
||||
}
|
||||
if is_server_request_reply_op(&op) {
|
||||
tracing::warn!("dropping reply op without pending app-server request: {op:?}");
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Err(err) = submit_thread_op(&client, &thread_id, &op, &mut next_request_id).await {
|
||||
tracing::error!("failed to submit op via app-server: {err}");
|
||||
}
|
||||
}
|
||||
maybe_message = client.recv() => {
|
||||
let Some(message) = maybe_message else {
|
||||
break;
|
||||
};
|
||||
if handle_app_server_message(
|
||||
&app_event_tx,
|
||||
&start_request_id,
|
||||
None,
|
||||
&mut pending,
|
||||
message,
|
||||
) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -75,6 +154,254 @@ pub(crate) fn spawn_agent(
|
||||
codex_op_tx
|
||||
}
|
||||
|
||||
fn handle_app_server_message(
|
||||
app_event_tx: &AppEventSender,
|
||||
start_request_id: &AppServerRequestId,
|
||||
thread_id: Option<&mut Option<String>>,
|
||||
pending: &mut PendingServerRequests,
|
||||
message: JSONRPCMessage,
|
||||
) -> bool {
|
||||
match message {
|
||||
JSONRPCMessage::Notification(notification) => {
|
||||
if let Some(event) = decode_raw_codex_event_notification(notification) {
|
||||
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
|
||||
app_event_tx.send(AppEvent::CodexEvent(event));
|
||||
if is_shutdown_complete {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Request(request) => {
|
||||
if let Ok(server_request) = ServerRequest::try_from(request) {
|
||||
match server_request {
|
||||
ServerRequest::CommandExecutionRequestApproval { request_id, params } => {
|
||||
let key = params.approval_id.unwrap_or(params.item_id);
|
||||
pending.exec_approval_by_id.insert(key, request_id);
|
||||
}
|
||||
ServerRequest::FileChangeRequestApproval { request_id, params } => {
|
||||
pending
|
||||
.patch_approval_by_item_id
|
||||
.insert(params.item_id, request_id);
|
||||
}
|
||||
ServerRequest::ToolRequestUserInput { request_id, params } => {
|
||||
pending
|
||||
.user_input_by_turn_id
|
||||
.insert(params.turn_id, request_id);
|
||||
}
|
||||
ServerRequest::DynamicToolCall { request_id, params } => {
|
||||
pending
|
||||
.dynamic_tool_by_call_id
|
||||
.insert(params.call_id, request_id);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Response(response) => {
|
||||
if &response.id == start_request_id {
|
||||
match serde_json::from_value::<ThreadStartResponse>(response.result) {
|
||||
Ok(parsed) => {
|
||||
if let Some(thread_id) = thread_id {
|
||||
*thread_id = Some(parsed.thread.id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!("failed to decode thread/start response: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
JSONRPCMessage::Error(error) => {
|
||||
if &error.id == start_request_id {
|
||||
let message = format!(
|
||||
"thread/start failed: code={} message={}",
|
||||
error.error.code, error.error.message
|
||||
);
|
||||
tracing::error!("{message}");
|
||||
app_event_tx.send(AppEvent::FatalExitRequest(message));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
fn decode_raw_codex_event_notification(
|
||||
notification: codex_app_server_protocol::JSONRPCNotification,
|
||||
) -> Option<Event> {
|
||||
if !notification.method.starts_with("codex/event/") {
|
||||
return None;
|
||||
}
|
||||
let params = notification.params?;
|
||||
match serde_json::from_value::<Event>(params) {
|
||||
Ok(event) => Some(event),
|
||||
Err(err) => {
|
||||
tracing::warn!("failed to decode raw codex event notification: {err}");
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn thread_start_params_from_config(config: &Config) -> ThreadStartParams {
|
||||
ThreadStartParams {
|
||||
model: config.model.clone(),
|
||||
model_provider: Some(config.model_provider_id.clone()),
|
||||
cwd: Some(config.cwd.display().to_string()),
|
||||
approval_policy: Some((*config.permissions.approval_policy.get()).into()),
|
||||
personality: config.personality,
|
||||
experimental_raw_events: true,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
async fn submit_thread_op(
|
||||
client: &EmbeddedSessionClient,
|
||||
thread_id: &str,
|
||||
op: &Op,
|
||||
next_request_id: &mut i64,
|
||||
) -> std::io::Result<()> {
|
||||
let request = ClientRequest::ThreadSubmitOp {
|
||||
request_id: next_request_id_value(next_request_id),
|
||||
params: ThreadSubmitOpParams {
|
||||
thread_id: thread_id.to_string(),
|
||||
op: serde_json::to_value(op).map_err(std::io::Error::other)?,
|
||||
},
|
||||
};
|
||||
client.send_request(request).await
|
||||
}
|
||||
|
||||
fn next_request_id_value(next_request_id: &mut i64) -> AppServerRequestId {
|
||||
let value = *next_request_id;
|
||||
*next_request_id += 1;
|
||||
AppServerRequestId::Integer(value)
|
||||
}
|
||||
|
||||
fn is_server_request_reply_op(op: &Op) -> bool {
|
||||
matches!(
|
||||
op,
|
||||
Op::ExecApproval { .. }
|
||||
| Op::PatchApproval { .. }
|
||||
| Op::UserInputAnswer { .. }
|
||||
| Op::DynamicToolResponse { .. }
|
||||
)
|
||||
}
|
||||
|
||||
async fn try_handle_server_request_reply_op(
|
||||
client: &mut EmbeddedSessionClient,
|
||||
pending: &mut PendingServerRequests,
|
||||
op: &Op,
|
||||
) -> std::io::Result<bool> {
|
||||
match op {
|
||||
Op::ExecApproval { id, decision, .. } => {
|
||||
if let Some(request_id) = pending.exec_approval_by_id.remove(id) {
|
||||
let response = CommandExecutionRequestApprovalResponse {
|
||||
decision: map_exec_approval_decision(decision.clone()),
|
||||
};
|
||||
send_jsonrpc_response(client, request_id, response).await?;
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
Op::PatchApproval { id, decision } => {
|
||||
if let Some(request_id) = pending.patch_approval_by_item_id.remove(id) {
|
||||
let response = FileChangeRequestApprovalResponse {
|
||||
decision: map_file_change_approval_decision(decision.clone()),
|
||||
};
|
||||
send_jsonrpc_response(client, request_id, response).await?;
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
Op::UserInputAnswer { id, response } => {
|
||||
if let Some(request_id) = pending.user_input_by_turn_id.remove(id) {
|
||||
let response = ToolRequestUserInputResponse {
|
||||
answers: response
|
||||
.answers
|
||||
.iter()
|
||||
.map(|(question_id, answer)| {
|
||||
(
|
||||
question_id.clone(),
|
||||
ToolRequestUserInputAnswer {
|
||||
answers: answer.answers.clone(),
|
||||
},
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
};
|
||||
send_jsonrpc_response(client, request_id, response).await?;
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
Op::DynamicToolResponse { id, response } => {
|
||||
if let Some(request_id) = pending.dynamic_tool_by_call_id.remove(id) {
|
||||
let response = v2_dynamic_tool_response_from_core(response.clone());
|
||||
send_jsonrpc_response(client, request_id, response).await?;
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
async fn send_jsonrpc_response<T: serde::Serialize>(
|
||||
client: &mut EmbeddedSessionClient,
|
||||
request_id: AppServerRequestId,
|
||||
response: T,
|
||||
) -> std::io::Result<()> {
|
||||
let result = serde_json::to_value(response).map_err(std::io::Error::other)?;
|
||||
client
|
||||
.send_response(JSONRPCResponse {
|
||||
id: request_id,
|
||||
result,
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
fn map_exec_approval_decision(decision: ReviewDecision) -> CommandExecutionApprovalDecision {
|
||||
match decision {
|
||||
ReviewDecision::Approved => CommandExecutionApprovalDecision::Accept,
|
||||
ReviewDecision::ApprovedForSession => CommandExecutionApprovalDecision::AcceptForSession,
|
||||
ReviewDecision::ApprovedExecpolicyAmendment {
|
||||
proposed_execpolicy_amendment,
|
||||
} => CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment {
|
||||
execpolicy_amendment: proposed_execpolicy_amendment.into(),
|
||||
},
|
||||
ReviewDecision::Denied => CommandExecutionApprovalDecision::Decline,
|
||||
ReviewDecision::Abort => CommandExecutionApprovalDecision::Cancel,
|
||||
}
|
||||
}
|
||||
|
||||
fn map_file_change_approval_decision(decision: ReviewDecision) -> FileChangeApprovalDecision {
|
||||
match decision {
|
||||
ReviewDecision::Approved => FileChangeApprovalDecision::Accept,
|
||||
ReviewDecision::ApprovedForSession => FileChangeApprovalDecision::AcceptForSession,
|
||||
ReviewDecision::Denied => FileChangeApprovalDecision::Decline,
|
||||
ReviewDecision::Abort => FileChangeApprovalDecision::Cancel,
|
||||
ReviewDecision::ApprovedExecpolicyAmendment { .. } => FileChangeApprovalDecision::Accept,
|
||||
}
|
||||
}
|
||||
|
||||
fn v2_dynamic_tool_response_from_core(
|
||||
response: CoreDynamicToolResponse,
|
||||
) -> V2DynamicToolCallResponse {
|
||||
V2DynamicToolCallResponse {
|
||||
content_items: response
|
||||
.content_items
|
||||
.into_iter()
|
||||
.map(|item| match item {
|
||||
CoreDynamicToolCallOutputContentItem::InputText { text } => {
|
||||
V2DynamicToolCallOutputContentItem::InputText { text }
|
||||
}
|
||||
CoreDynamicToolCallOutputContentItem::InputImage { image_url } => {
|
||||
V2DynamicToolCallOutputContentItem::InputImage { image_url }
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
success: response.success,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn agent loops for an existing thread (e.g., a forked thread).
|
||||
/// Sends the provided `SessionConfiguredEvent` immediately, then forwards subsequent
|
||||
/// events and accepts Ops for submission.
|
||||
|
||||
Reference in New Issue
Block a user