Compare commits

...

1 Commits

Author SHA1 Message Date
pakrym-oai
f564b7fed8 haha 2026-02-21 14:57:23 -08:00
9 changed files with 607 additions and 45 deletions

1
codex-rs/Cargo.lock generated
View File

@@ -2245,6 +2245,7 @@ dependencies = [
"chrono",
"clap",
"codex-ansi-escape",
"codex-app-server",
"codex-app-server-protocol",
"codex-arg0",
"codex-backend-client",

View File

@@ -235,6 +235,11 @@ client_request_definitions! {
params: v2::ThreadReadParams,
response: v2::ThreadReadResponse,
},
#[experimental("thread/submitOp")]
ThreadSubmitOp => "thread/submitOp" {
params: v2::ThreadSubmitOpParams,
response: v2::ThreadSubmitOpResponse,
},
SkillsList => "skills/list" {
params: v2::SkillsListParams,
response: v2::SkillsListResponse,

View File

@@ -1962,6 +1962,21 @@ pub struct ThreadReadResponse {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSubmitOpParams {
pub thread_id: String,
/// Serialized `codex_protocol::protocol::Op` payload used as a temporary
/// compatibility bridge while clients migrate to typed app-server methods.
pub op: JsonValue,
}
#[derive(Serialize, Deserialize, Debug, Default, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadSubmitOpResponse {}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View File

@@ -149,6 +149,8 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadStatus;
use codex_app_server_protocol::ThreadSubmitOpParams;
use codex_app_server_protocol::ThreadSubmitOpResponse;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::ThreadUnarchivedNotification;
@@ -586,6 +588,10 @@ impl CodexMessageProcessor {
self.thread_read(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::ThreadSubmitOp { request_id, params } => {
self.thread_submit_op(to_connection_request_id(request_id), params)
.await;
}
ClientRequest::SkillsList { request_id, params } => {
self.skills_list(to_connection_request_id(request_id), params)
.await;
@@ -2784,6 +2790,46 @@ impl CodexMessageProcessor {
self.outgoing.send_response(request_id, response).await;
}
async fn thread_submit_op(
&self,
request_id: ConnectionRequestId,
params: ThreadSubmitOpParams,
) {
let ThreadSubmitOpParams { thread_id, op } = params;
let (_, thread) = match self.load_thread(&thread_id).await {
Ok(v) => v,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let op = match serde_json::from_value::<Op>(op) {
Ok(op) => op,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("invalid core op payload: {err}"),
)
.await;
return;
}
};
match thread.submit(op).await {
Ok(_) => {
self.outgoing
.send_response(request_id, ThreadSubmitOpResponse {})
.await;
}
Err(err) => {
self.send_internal_error(request_id, format!("failed to submit op: {err}"))
.await;
}
}
}
pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
self.thread_manager.subscribe_thread_created()
}

View File

@@ -0,0 +1,158 @@
use std::path::PathBuf;
use std::sync::Arc;
use crate::codex_message_processor::CodexMessageProcessor;
use crate::codex_message_processor::CodexMessageProcessorArgs;
use crate::outgoing_message::ConnectionId;
use crate::outgoing_message::OutgoingEnvelope;
use crate::outgoing_message::OutgoingMessage;
use crate::outgoing_message::OutgoingMessageSender;
use crate::transport::CHANNEL_CAPACITY;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_cloud_requirements::cloud_requirements_loader;
use codex_core::AuthManager;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_feedback::CodexFeedback;
use tokio::sync::mpsc;
use toml::Value as TomlValue;
const EMBEDDED_CONNECTION_ID: ConnectionId = ConnectionId(0);
#[derive(Debug)]
enum EmbeddedSessionInput {
Request(ClientRequest),
Response(JSONRPCResponse),
Error(JSONRPCError),
}
pub struct EmbeddedSessionClientArgs {
pub auth_manager: Arc<AuthManager>,
pub thread_manager: Arc<ThreadManager>,
pub config: Config,
pub cli_overrides: Vec<(String, TomlValue)>,
pub feedback: CodexFeedback,
pub codex_linux_sandbox_exe: Option<PathBuf>,
}
pub struct EmbeddedSessionClient {
input_tx: mpsc::Sender<EmbeddedSessionInput>,
output_rx: mpsc::Receiver<JSONRPCMessage>,
}
impl EmbeddedSessionClient {
pub fn spawn(args: EmbeddedSessionClientArgs) -> Self {
let (input_tx, mut input_rx) = mpsc::channel::<EmbeddedSessionInput>(CHANNEL_CAPACITY);
let (output_tx, output_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<OutgoingEnvelope>(CHANNEL_CAPACITY);
let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx));
let EmbeddedSessionClientArgs {
auth_manager,
thread_manager,
config,
cli_overrides,
feedback,
codex_linux_sandbox_exe,
} = args;
let config = Arc::new(config);
let cloud_requirements = Arc::new(std::sync::RwLock::new(cloud_requirements_loader(
auth_manager.clone(),
config.chatgpt_base_url.clone(),
config.codex_home.clone(),
)));
let mut processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager,
thread_manager,
outgoing: outgoing.clone(),
codex_linux_sandbox_exe,
config,
cli_overrides,
cloud_requirements,
single_client_mode: true,
feedback,
});
tokio::spawn(async move {
while let Some(envelope) = outgoing_rx.recv().await {
let message = match envelope {
OutgoingEnvelope::ToConnection {
connection_id,
message,
} => {
if connection_id != EMBEDDED_CONNECTION_ID {
continue;
}
outgoing_message_to_jsonrpc(message)
}
OutgoingEnvelope::Broadcast { message } => outgoing_message_to_jsonrpc(message),
};
if output_tx.send(message).await.is_err() {
break;
}
}
});
tokio::spawn(async move {
while let Some(input) = input_rx.recv().await {
match input {
EmbeddedSessionInput::Request(request) => {
processor
.process_request(EMBEDDED_CONNECTION_ID, request)
.await;
}
EmbeddedSessionInput::Response(response) => {
outgoing
.notify_client_response(response.id, response.result)
.await;
}
EmbeddedSessionInput::Error(error) => {
outgoing.notify_client_error(error.id, error.error).await;
}
}
}
processor.connection_closed(EMBEDDED_CONNECTION_ID).await;
});
Self {
input_tx,
output_rx,
}
}
pub async fn send_request(&self, request: ClientRequest) -> std::io::Result<()> {
self.input_tx
.send(EmbeddedSessionInput::Request(request))
.await
.map_err(|_| std::io::Error::other("embedded app-server session is closed"))
}
pub async fn send_response(&self, response: JSONRPCResponse) -> std::io::Result<()> {
self.input_tx
.send(EmbeddedSessionInput::Response(response))
.await
.map_err(|_| std::io::Error::other("embedded app-server session is closed"))
}
pub async fn send_error(&self, error: JSONRPCError) -> std::io::Result<()> {
self.input_tx
.send(EmbeddedSessionInput::Error(error))
.await
.map_err(|_| std::io::Error::other("embedded app-server session is closed"))
}
pub async fn recv(&mut self) -> Option<JSONRPCMessage> {
self.output_rx.recv().await
}
}
fn outgoing_message_to_jsonrpc(message: OutgoingMessage) -> JSONRPCMessage {
let value =
serde_json::to_value(message).expect("outgoing app-server message should serialize");
serde_json::from_value(value).expect("outgoing app-server message should decode as JSON-RPC")
}

View File

@@ -56,6 +56,7 @@ mod bespoke_event_handling;
mod codex_message_processor;
mod config_api;
mod dynamic_tools;
mod embedded_session;
mod error_code;
mod filters;
mod fuzzy_file_search;
@@ -67,6 +68,8 @@ mod thread_status;
mod transport;
pub use crate::transport::AppServerTransport;
pub use embedded_session::EmbeddedSessionClient;
pub use embedded_session::EmbeddedSessionClientArgs;
const LOG_FORMAT_ENV_VAR: &str = "LOG_FORMAT";

View File

@@ -27,6 +27,7 @@ base64 = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["derive"] }
codex-ansi-escape = { workspace = true }
codex-app-server = { workspace = true }
codex-app-server-protocol = { workspace = true }
codex-arg0 = { workspace = true }
codex-backend-client = { workspace = true }

View File

@@ -2616,7 +2616,13 @@ impl ChatWidget {
let prevent_idle_sleep = config.features.enabled(Feature::PreventIdleSleep);
let mut rng = rand::rng();
let placeholder = PLACEHOLDERS[rng.random_range(0..PLACEHOLDERS.len())].to_string();
let codex_op_tx = spawn_agent(config.clone(), app_event_tx.clone(), thread_manager);
let codex_op_tx = spawn_agent(
config.clone(),
app_event_tx.clone(),
thread_manager,
auth_manager.clone(),
feedback.clone(),
);
let model_override = model.as_deref();
let model_for_header = model

View File

@@ -1,73 +1,152 @@
use std::collections::HashMap;
use std::sync::Arc;
use codex_app_server::EmbeddedSessionClient;
use codex_app_server::EmbeddedSessionClientArgs;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::CommandExecutionApprovalDecision;
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
use codex_app_server_protocol::DynamicToolCallOutputContentItem as V2DynamicToolCallOutputContentItem;
use codex_app_server_protocol::DynamicToolCallResponse as V2DynamicToolCallResponse;
use codex_app_server_protocol::FileChangeApprovalDecision;
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
use codex_app_server_protocol::JSONRPCMessage;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId as AppServerRequestId;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadSubmitOpParams;
use codex_app_server_protocol::ToolRequestUserInputAnswer;
use codex_app_server_protocol::ToolRequestUserInputResponse;
use codex_core::AuthManager;
use codex_core::CodexThread;
use codex_core::NewThread;
use codex_core::ThreadManager;
use codex_core::config::Config;
use codex_feedback::CodexFeedback;
use codex_protocol::dynamic_tools::DynamicToolCallOutputContentItem as CoreDynamicToolCallOutputContentItem;
use codex_protocol::dynamic_tools::DynamicToolResponse as CoreDynamicToolResponse;
use codex_protocol::protocol::Event;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::Op;
use codex_protocol::protocol::ReviewDecision;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::unbounded_channel;
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;
#[derive(Default)]
struct PendingServerRequests {
exec_approval_by_id: HashMap<String, AppServerRequestId>,
patch_approval_by_item_id: HashMap<String, AppServerRequestId>,
user_input_by_turn_id: HashMap<String, AppServerRequestId>,
dynamic_tool_by_call_id: HashMap<String, AppServerRequestId>,
}
/// Spawn the agent bootstrapper and op forwarding loop, returning the
/// `UnboundedSender<Op>` used by the UI to submit operations.
pub(crate) fn spawn_agent(
config: Config,
app_event_tx: AppEventSender,
server: Arc<ThreadManager>,
auth_manager: Arc<AuthManager>,
feedback: CodexFeedback,
) -> UnboundedSender<Op> {
let (codex_op_tx, mut codex_op_rx) = unbounded_channel::<Op>();
let app_event_tx_clone = app_event_tx;
tokio::spawn(async move {
let NewThread {
thread,
session_configured,
..
} = match server.start_thread(config).await {
Ok(v) => v,
Err(err) => {
let message = format!("Failed to initialize codex: {err}");
tracing::error!("{message}");
app_event_tx_clone.send(AppEvent::CodexEvent(Event {
id: "".to_string(),
msg: EventMsg::Error(err.to_error_event(None)),
}));
app_event_tx_clone.send(AppEvent::FatalExitRequest(message));
tracing::error!("failed to initialize codex: {err}");
return;
}
};
// Forward the captured `SessionConfigured` event so it can be rendered in the UI.
let ev = codex_protocol::protocol::Event {
// The `id` does not matter for rendering, so we can use a fake value.
id: "".to_string(),
msg: codex_protocol::protocol::EventMsg::SessionConfigured(session_configured),
};
app_event_tx_clone.send(AppEvent::CodexEvent(ev));
let thread_clone = thread.clone();
tokio::spawn(async move {
while let Some(op) = codex_op_rx.recv().await {
let id = thread_clone.submit(op).await;
if let Err(e) = id {
tracing::error!("failed to submit op: {e}");
}
}
let mut client = EmbeddedSessionClient::spawn(EmbeddedSessionClientArgs {
auth_manager,
thread_manager: server,
codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(),
config: config.clone(),
cli_overrides: Vec::new(),
feedback,
});
while let Ok(event) = thread.next_event().await {
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
app_event_tx_clone.send(AppEvent::CodexEvent(event));
if is_shutdown_complete {
// ShutdownComplete is terminal for a thread; drop this receiver task so
// the Arc<CodexThread> can be released and thread resources can clean up.
break;
let mut next_request_id = 1_i64;
let start_request_id = next_request_id_value(&mut next_request_id);
let start_request = ClientRequest::ThreadStart {
request_id: start_request_id.clone(),
params: thread_start_params_from_config(&config),
};
if let Err(err) = client.send_request(start_request).await {
let message = format!("Failed to initialize codex app-server session: {err}");
tracing::error!("{message}");
app_event_tx.send(AppEvent::FatalExitRequest(message));
return;
}
let mut thread_id: Option<String> = None;
let mut pending = PendingServerRequests::default();
while thread_id.is_none() {
let Some(message) = client.recv().await else {
let message =
"Embedded app-server session closed before thread/start completed".to_string();
tracing::error!("{message}");
app_event_tx.send(AppEvent::FatalExitRequest(message));
return;
};
if handle_app_server_message(
&app_event_tx,
&start_request_id,
Some(&mut thread_id),
&mut pending,
message,
) {
return;
}
}
let Some(thread_id) = thread_id else {
let message = "thread/start did not return a thread id".to_string();
tracing::error!("{message}");
app_event_tx.send(AppEvent::FatalExitRequest(message));
return;
};
loop {
tokio::select! {
maybe_op = codex_op_rx.recv() => {
let Some(op) = maybe_op else {
break;
};
let handled = match try_handle_server_request_reply_op(&mut client, &mut pending, &op).await {
Ok(handled) => handled,
Err(err) => {
tracing::error!("failed to answer app-server server request: {err}");
false
}
};
if handled {
continue;
}
if is_server_request_reply_op(&op) {
tracing::warn!("dropping reply op without pending app-server request: {op:?}");
continue;
}
if let Err(err) = submit_thread_op(&client, &thread_id, &op, &mut next_request_id).await {
tracing::error!("failed to submit op via app-server: {err}");
}
}
maybe_message = client.recv() => {
let Some(message) = maybe_message else {
break;
};
if handle_app_server_message(
&app_event_tx,
&start_request_id,
None,
&mut pending,
message,
) {
break;
}
}
}
}
});
@@ -75,6 +154,254 @@ pub(crate) fn spawn_agent(
codex_op_tx
}
fn handle_app_server_message(
app_event_tx: &AppEventSender,
start_request_id: &AppServerRequestId,
thread_id: Option<&mut Option<String>>,
pending: &mut PendingServerRequests,
message: JSONRPCMessage,
) -> bool {
match message {
JSONRPCMessage::Notification(notification) => {
if let Some(event) = decode_raw_codex_event_notification(notification) {
let is_shutdown_complete = matches!(event.msg, EventMsg::ShutdownComplete);
app_event_tx.send(AppEvent::CodexEvent(event));
if is_shutdown_complete {
return true;
}
}
}
JSONRPCMessage::Request(request) => {
if let Ok(server_request) = ServerRequest::try_from(request) {
match server_request {
ServerRequest::CommandExecutionRequestApproval { request_id, params } => {
let key = params.approval_id.unwrap_or(params.item_id);
pending.exec_approval_by_id.insert(key, request_id);
}
ServerRequest::FileChangeRequestApproval { request_id, params } => {
pending
.patch_approval_by_item_id
.insert(params.item_id, request_id);
}
ServerRequest::ToolRequestUserInput { request_id, params } => {
pending
.user_input_by_turn_id
.insert(params.turn_id, request_id);
}
ServerRequest::DynamicToolCall { request_id, params } => {
pending
.dynamic_tool_by_call_id
.insert(params.call_id, request_id);
}
_ => {}
}
}
}
JSONRPCMessage::Response(response) => {
if &response.id == start_request_id {
match serde_json::from_value::<ThreadStartResponse>(response.result) {
Ok(parsed) => {
if let Some(thread_id) = thread_id {
*thread_id = Some(parsed.thread.id);
}
}
Err(err) => {
tracing::error!("failed to decode thread/start response: {err}");
}
}
}
}
JSONRPCMessage::Error(error) => {
if &error.id == start_request_id {
let message = format!(
"thread/start failed: code={} message={}",
error.error.code, error.error.message
);
tracing::error!("{message}");
app_event_tx.send(AppEvent::FatalExitRequest(message));
return true;
}
}
}
false
}
fn decode_raw_codex_event_notification(
notification: codex_app_server_protocol::JSONRPCNotification,
) -> Option<Event> {
if !notification.method.starts_with("codex/event/") {
return None;
}
let params = notification.params?;
match serde_json::from_value::<Event>(params) {
Ok(event) => Some(event),
Err(err) => {
tracing::warn!("failed to decode raw codex event notification: {err}");
None
}
}
}
fn thread_start_params_from_config(config: &Config) -> ThreadStartParams {
ThreadStartParams {
model: config.model.clone(),
model_provider: Some(config.model_provider_id.clone()),
cwd: Some(config.cwd.display().to_string()),
approval_policy: Some((*config.permissions.approval_policy.get()).into()),
personality: config.personality,
experimental_raw_events: true,
..Default::default()
}
}
async fn submit_thread_op(
client: &EmbeddedSessionClient,
thread_id: &str,
op: &Op,
next_request_id: &mut i64,
) -> std::io::Result<()> {
let request = ClientRequest::ThreadSubmitOp {
request_id: next_request_id_value(next_request_id),
params: ThreadSubmitOpParams {
thread_id: thread_id.to_string(),
op: serde_json::to_value(op).map_err(std::io::Error::other)?,
},
};
client.send_request(request).await
}
fn next_request_id_value(next_request_id: &mut i64) -> AppServerRequestId {
let value = *next_request_id;
*next_request_id += 1;
AppServerRequestId::Integer(value)
}
fn is_server_request_reply_op(op: &Op) -> bool {
matches!(
op,
Op::ExecApproval { .. }
| Op::PatchApproval { .. }
| Op::UserInputAnswer { .. }
| Op::DynamicToolResponse { .. }
)
}
async fn try_handle_server_request_reply_op(
client: &mut EmbeddedSessionClient,
pending: &mut PendingServerRequests,
op: &Op,
) -> std::io::Result<bool> {
match op {
Op::ExecApproval { id, decision, .. } => {
if let Some(request_id) = pending.exec_approval_by_id.remove(id) {
let response = CommandExecutionRequestApprovalResponse {
decision: map_exec_approval_decision(decision.clone()),
};
send_jsonrpc_response(client, request_id, response).await?;
return Ok(true);
}
}
Op::PatchApproval { id, decision } => {
if let Some(request_id) = pending.patch_approval_by_item_id.remove(id) {
let response = FileChangeRequestApprovalResponse {
decision: map_file_change_approval_decision(decision.clone()),
};
send_jsonrpc_response(client, request_id, response).await?;
return Ok(true);
}
}
Op::UserInputAnswer { id, response } => {
if let Some(request_id) = pending.user_input_by_turn_id.remove(id) {
let response = ToolRequestUserInputResponse {
answers: response
.answers
.iter()
.map(|(question_id, answer)| {
(
question_id.clone(),
ToolRequestUserInputAnswer {
answers: answer.answers.clone(),
},
)
})
.collect(),
};
send_jsonrpc_response(client, request_id, response).await?;
return Ok(true);
}
}
Op::DynamicToolResponse { id, response } => {
if let Some(request_id) = pending.dynamic_tool_by_call_id.remove(id) {
let response = v2_dynamic_tool_response_from_core(response.clone());
send_jsonrpc_response(client, request_id, response).await?;
return Ok(true);
}
}
_ => {}
}
Ok(false)
}
async fn send_jsonrpc_response<T: serde::Serialize>(
client: &mut EmbeddedSessionClient,
request_id: AppServerRequestId,
response: T,
) -> std::io::Result<()> {
let result = serde_json::to_value(response).map_err(std::io::Error::other)?;
client
.send_response(JSONRPCResponse {
id: request_id,
result,
})
.await
}
fn map_exec_approval_decision(decision: ReviewDecision) -> CommandExecutionApprovalDecision {
match decision {
ReviewDecision::Approved => CommandExecutionApprovalDecision::Accept,
ReviewDecision::ApprovedForSession => CommandExecutionApprovalDecision::AcceptForSession,
ReviewDecision::ApprovedExecpolicyAmendment {
proposed_execpolicy_amendment,
} => CommandExecutionApprovalDecision::AcceptWithExecpolicyAmendment {
execpolicy_amendment: proposed_execpolicy_amendment.into(),
},
ReviewDecision::Denied => CommandExecutionApprovalDecision::Decline,
ReviewDecision::Abort => CommandExecutionApprovalDecision::Cancel,
}
}
fn map_file_change_approval_decision(decision: ReviewDecision) -> FileChangeApprovalDecision {
match decision {
ReviewDecision::Approved => FileChangeApprovalDecision::Accept,
ReviewDecision::ApprovedForSession => FileChangeApprovalDecision::AcceptForSession,
ReviewDecision::Denied => FileChangeApprovalDecision::Decline,
ReviewDecision::Abort => FileChangeApprovalDecision::Cancel,
ReviewDecision::ApprovedExecpolicyAmendment { .. } => FileChangeApprovalDecision::Accept,
}
}
fn v2_dynamic_tool_response_from_core(
response: CoreDynamicToolResponse,
) -> V2DynamicToolCallResponse {
V2DynamicToolCallResponse {
content_items: response
.content_items
.into_iter()
.map(|item| match item {
CoreDynamicToolCallOutputContentItem::InputText { text } => {
V2DynamicToolCallOutputContentItem::InputText { text }
}
CoreDynamicToolCallOutputContentItem::InputImage { image_url } => {
V2DynamicToolCallOutputContentItem::InputImage { image_url }
}
})
.collect(),
success: response.success,
}
}
/// Spawn agent loops for an existing thread (e.g., a forked thread).
/// Sends the provided `SessionConfiguredEvent` immediately, then forwards subsequent
/// events and accepts Ops for submission.