moving concurrent execution to mcp instead of subprocess/exec

This commit is contained in:
pap
2025-07-26 17:26:49 +01:00
parent e8dc36a3d0
commit 875b5cb1e4
8 changed files with 327 additions and 51 deletions

2
codex-rs/Cargo.lock generated
View File

@@ -636,8 +636,10 @@ dependencies = [
"codex-exec",
"codex-linux-sandbox",
"codex-login",
"codex-mcp-client",
"codex-mcp-server",
"codex-tui",
"mcp-types",
"serde",
"serde_json",
"tempfile",

View File

@@ -25,6 +25,10 @@ codex-exec = { path = "../exec" }
codex-login = { path = "../login" }
codex-linux-sandbox = { path = "../linux-sandbox" }
codex-mcp-server = { path = "../mcp-server" }
# Added for `codex pap` subcommand to act as an MCP client to the `codex mcp` server.
codex-mcp-client = { path = "../mcp-client" }
# Direct dependency for types used in pap subcommand
mcp-types = { path = "../mcp-types" }
codex-tui = { path = "../tui" }
serde_json = "1"
serde = { version = "1", features = ["derive"] }

View File

@@ -48,32 +48,24 @@ pub fn maybe_spawn_concurrent(
let autonomous = tui_cli.full_auto
|| tui_cli.dangerously_bypass_approvals_and_sandbox
|| approval_on_failure;
if !autonomous {
eprintln!(
"Error: --concurrent requires autonomous mode. Use one of: --full-auto, --ask-for-approval on-failure, or --dangerously-bypass-approvals-and-sandbox."
);
std::process::exit(2);
}
if tui_cli.prompt.is_none() {
eprintln!(
"Error: --concurrent requires a prompt argument so the agent does not wait for interactive input."
);
std::process::exit(2);
}
// Build exec args from interactive CLI for autonomous run without TUI (background).
let mut exec_args: Vec<String> = Vec::new();
if !tui_cli.images.is_empty() {
exec_args.push("--image".into());
exec_args.push(tui_cli.images.iter().map(|p| p.display().to_string()).collect::<Vec<_>>().join(","));
}
if let Some(model) = &tui_cli.model { exec_args.push("--model".into()); exec_args.push(model.clone()); }
if let Some(profile) = &tui_cli.config_profile { exec_args.push("--profile".into()); exec_args.push(profile.clone()); }
if let Some(sandbox) = &tui_cli.sandbox_mode { exec_args.push("--sandbox".into()); exec_args.push(format!("{sandbox:?}").to_lowercase().replace('_', "-")); }
if tui_cli.full_auto { exec_args.push("--full-auto".into()); }
if tui_cli.dangerously_bypass_approvals_and_sandbox { exec_args.push("--dangerously-bypass-approvals-and-sandbox".into()); }
if tui_cli.skip_git_repo_check { exec_args.push("--skip-git-repo-check".into()); }
for raw in root_raw_overrides { exec_args.push("-c".into()); exec_args.push(raw.clone()); }
// todo: pap dynamically get those
let mut worker_args: Vec<String> = Vec::new();
// Map model/profile directly.
if let Some(model) = &tui_cli.model { worker_args.push("--model".into()); worker_args.push(model.clone()); }
if let Some(profile) = &tui_cli.config_profile { worker_args.push("--profile".into()); worker_args.push(profile.clone()); }
// Derive approval-policy & sandbox (respect explicit flags first, then full-auto / dangerous shortcuts).
let mut approval_policy: Option<String> = tui_cli.approval_policy.map(|a| format!("{a:?}").to_lowercase().replace('_', "-"));
let mut sandbox: Option<String> = tui_cli.sandbox_mode.map(|s| format!("{s:?}").to_lowercase().replace('_', "-"));
if approval_policy.is_none() && tui_cli.full_auto { approval_policy = Some("on-failure".into()); }
if sandbox.is_none() && tui_cli.full_auto { sandbox = Some("workspace-write".into()); }
if tui_cli.dangerously_bypass_approvals_and_sandbox { approval_policy = Some("never".into()); sandbox = Some("danger-full-access".into()); }
if let Some(ap) = approval_policy { worker_args.push("--approval-policy".into()); worker_args.push(ap); }
if let Some(sb) = sandbox { worker_args.push("--sandbox".into()); worker_args.push(sb); }
// Config overrides (-c) from root and interactive CLI.
for raw in root_raw_overrides { worker_args.push("--worker-config".into()); worker_args.push(raw.clone()); }
for raw in &tui_cli.config_overrides.raw_overrides { worker_args.push("--worker-config".into()); worker_args.push(raw.clone()); }
// Derive a single slug (shared by worktree branch & log filename) from the prompt.
let raw_prompt = tui_cli.prompt.as_deref().unwrap_or("");
@@ -122,8 +114,9 @@ pub fn maybe_spawn_concurrent(
original_commit = git_capture(["rev-parse", "HEAD"]).ok();
match create_concurrent_worktree(&branch_name_effective) {
Ok(Some(info)) => {
exec_args.push("--cd".into());
exec_args.push(info.worktree_path.display().to_string());
// Record worktree path to pass as --cwd to worker
worker_args.push("--cwd".into());
worker_args.push(info.worktree_path.display().to_string());
created_worktree = Some((info.worktree_path, info.branch_name.clone()));
// Keep the original git output plus a concise created line (for log file only).
pre_spawn_logs.push_str(&info.logs);
@@ -142,12 +135,14 @@ pub fn maybe_spawn_concurrent(
}
}
} else if let Some(explicit) = &tui_cli.cwd {
exec_args.push("--cd".into());
exec_args.push(explicit.display().to_string());
worker_args.push("--cwd".into());
worker_args.push(explicit.display().to_string());
}
// Prompt (safe to unwrap due to earlier validation).
if let Some(prompt) = tui_cli.prompt.clone() { exec_args.push(prompt); }
// Prompt (safe to unwrap due to earlier validation in autonomous case). For non-autonomous
// (interactive later) runs we intentionally do NOT pass the prompt to the subprocess so it
// will wait for a Submission over stdin.
if let Some(prompt) = tui_cli.prompt.clone() { worker_args.push("--prompt".into()); worker_args.push(prompt); } else { eprintln!("Error: --concurrent requires a prompt."); return Ok(false); }
// Create (or truncate) the log file and write any pre-spawn logs we captured.
let file = match File::create(&log_path) {
@@ -170,8 +165,8 @@ pub fn maybe_spawn_concurrent(
let mut cmd = Command::new(
std::env::current_exe().unwrap_or_else(|_| PathBuf::from("codex"))
);
cmd.arg("exec");
for a in &exec_args { cmd.arg(a); }
cmd.arg("worker");
for a in &worker_args { cmd.arg(a); }
// Provide metadata for auto merge if we created a worktree.
if let Some((wt_path, branch)) = &created_worktree {
if effective_automerge { cmd.env("CODEX_CONCURRENT_AUTOMERGE", "1"); }
@@ -186,7 +181,7 @@ pub fn maybe_spawn_concurrent(
cmd.stdout(Stdio::from(file));
if let Some(f2) = file_err { cmd.stderr(Stdio::from(f2)); }
match cmd.spawn() {
Ok(child) => {
Ok(mut child) => {
// Human-friendly multi-line output with bold headers.
let branch_val = created_worktree.as_ref().map(|(_, b)| b.as_str()).unwrap_or("(none)");
let worktree_val = created_worktree
@@ -198,10 +193,9 @@ pub fn maybe_spawn_concurrent(
println!("\x1b[1mPID:\x1b[0m {}", child.id());
println!("\x1b[1mBranch:\x1b[0m {}", branch_val);
println!("\x1b[1mWorktree:\x1b[0m {}", worktree_val);
println!("\x1b[1mState:\x1b[0m started");
// Use bold bright magenta (95) for actionable follow-up commands.
println!("\nMonitor all tasks: \x1b[1;95mcodex tasks ls\x1b[0m");
println!("Watch this task: \x1b[1;95mcodex logs {} -f\x1b[0m", task_id);
let initial_state = "started";
println!("\x1b[1mState:\x1b[0m {}", initial_state);
println!("\nStreaming logs (press Ctrl+C to abort view; task will continue)...\n");
// Record task metadata to CODEX_HOME/tasks.jsonl (JSON Lines file).
let record_time = std::time::SystemTime::now()
@@ -224,18 +218,19 @@ pub fn maybe_spawn_concurrent(
"automerge": effective_automerge,
"explicit_branch_name": user_branch_name_opt,
"token_count": serde_json::Value::Null,
"state": "started",
"state": initial_state,
});
if let Ok(mut f) = std::fs::OpenOptions::new().create(true).append(true).open(&tasks_path) {
use std::io::Write;
if let Err(e) = writeln!(f, "{}", record.to_string()) {
eprintln!("Warning: failed writing task record to {}: {e}", tasks_path.display());
}
} else {
eprintln!("Warning: could not open tasks log file at {}", tasks_path.display());
let _ = writeln!(f, "{}", record.to_string());
}
}
return Ok(true); // background spawned
// Attach: tail the log file until process exits.
if let Err(e) = stream_log_until_exit(&log_path, &mut child) {
eprintln!("Error streaming logs: {e}");
}
return Ok(true); // run handled inline
}
Err(e) => {
eprintln!("Failed to start background exec: {e}. Falling back to interactive mode.");
@@ -354,4 +349,42 @@ fn parse_env_bool(name: &str) -> Option<bool> {
"0" | "false" | "no" | "off" => Some(false),
_ => None,
}
}
}
// Attach helper: follow the log file while the child runs.
// todo: remove this once we have a tui
fn stream_log_until_exit(log_path: &std::path::Path, child: &mut std::process::Child) -> anyhow::Result<()> {
use std::io::{Read, Seek, SeekFrom};
use std::time::Duration;
let mut f = std::fs::OpenOptions::new().read(true).open(log_path)?;
let mut pos: u64 = 0;
// Print any existing content first.
let mut existing = String::new();
f.read_to_string(&mut existing)?;
print!("{}", existing);
pos = existing.len() as u64;
loop {
// Check if process has exited.
if let Some(status) = child.try_wait()? {
// Drain any remaining bytes.
let mut tail = String::new();
f.seek(SeekFrom::Start(pos))?;
f.read_to_string(&mut tail)?;
if !tail.is_empty() { print!("{}", tail); }
println!("\n\x1b[1mTask exited with status: {}\x1b[0m", status);
break;
}
// Read new bytes if any.
let meta = f.metadata()?;
let len = meta.len();
if len > pos {
f.seek(SeekFrom::Start(pos))?;
let mut buf = String::new();
f.read_to_string(&mut buf)?;
if !buf.is_empty() { print!("{}", buf); }
pos = len;
}
std::thread::sleep(Duration::from_millis(500));
}
Ok(())
}

View File

@@ -15,6 +15,11 @@ use codex_tui::Cli as TuiCli;
use std::path::PathBuf;
use crate::proto::ProtoCli;
use codex_mcp_client::McpClient;
use mcp_types::{ClientCapabilities, Implementation};
use serde_json::json;
use std::time::Duration;
use tracing::{debug, info};
/// Codex CLI
///
@@ -90,6 +95,10 @@ enum Subcommand {
/// Inspect full metadata for a task.
Inspect(codex_cli::inspect::InspectCli),
/// Hidden: internal worker used for --concurrent MCP-based runs.
#[clap(hide = true)]
Worker(ConcurrentWorkerCli),
}
#[derive(Debug, Parser)]
@@ -120,6 +129,27 @@ struct LoginCommand {
config_overrides: CliConfigOverrides,
}
#[derive(Debug, Parser)]
struct ConcurrentWorkerCli {
#[clap(long)]
prompt: String,
#[clap(long)]
model: Option<String>,
#[clap(long)]
profile: Option<String>,
#[clap(long, value_name = "POLICY")] // untrusted | on-failure | never
approval_policy: Option<String>,
#[clap(long, value_name = "MODE")] // read-only | workspace-write | danger-full-access
sandbox: Option<String>,
#[clap(long)]
cwd: Option<String>,
#[clap(flatten)]
config_overrides: CliConfigOverrides,
/// Optional base instructions override
#[clap(long = "base-instructions")]
base_instructions: Option<String>,
}
fn main() -> anyhow::Result<()> {
codex_linux_sandbox::run_with_sandbox(|codex_linux_sandbox_exe| async move {
cli_main(codex_linux_sandbox_exe).await?;
@@ -160,6 +190,136 @@ async fn cli_main(codex_linux_sandbox_exe: Option<PathBuf>) -> anyhow::Result<()
Some(Subcommand::Mcp) => {
codex_mcp_server::run_main(codex_linux_sandbox_exe).await?;
}
Some(Subcommand::Worker(worker_cli)) => {
// Internal worker invoked by maybe_spawn_concurrent. Runs a single Codex MCP tool-call.
debug!(?worker_cli.prompt, "starting concurrent worker");
// Build MCP client by spawning current binary with `mcp` subcommand.
let exe = std::env::current_exe()?;
let exe_str = exe.to_string_lossy().to_string();
// Pass through OPENAI_API_KEY (and related) so MCP server can access the model provider.
let mut extra_env: std::collections::HashMap<String, String> = std::collections::HashMap::new();
// TODO: pap check if this is needed + check if we can use the same env vars as the main process (overall)
if let Ok(v) = std::env::var("OPENAI_API_KEY") { extra_env.insert("OPENAI_API_KEY".into(), v); }
if let Ok(v) = std::env::var("OPENAI_BASE_URL") { extra_env.insert("OPENAI_BASE_URL".into(), v); }
let client = McpClient::new_stdio_client(exe_str, vec!["mcp".to_string()], Some(extra_env)).await?;
// Initialize MCP session.
let init_params = mcp_types::InitializeRequestParams {
capabilities: ClientCapabilities { experimental: None, roots: None, sampling: None, elicitation: Some(json!({})) },
client_info: Implementation { name: "codex-concurrent-worker".into(), version: env!("CARGO_PKG_VERSION").into(), title: Some("Codex Concurrent Worker".into()) },
protocol_version: mcp_types::MCP_SCHEMA_VERSION.to_string(),
};
let _init_res = client.initialize(init_params, None, Some(Duration::from_secs(15))).await?;
debug!("initialized MCP session for worker");
// Build arguments for codex tool call using kebab-case keys expected by MCP server.
let mut arg_obj = serde_json::Map::new();
// todo: how to pass all variables dynamically?
arg_obj.insert("prompt".to_string(), worker_cli.prompt.clone().into());
if let Some(m) = worker_cli.model.clone() { arg_obj.insert("model".into(), m.into()); }
if let Some(p) = worker_cli.profile.clone() { arg_obj.insert("profile".into(), p.into()); }
if let Some(ap) = worker_cli.approval_policy.clone() { arg_obj.insert("approval-policy".into(), ap.into()); }
if let Some(sb) = worker_cli.sandbox.clone() { arg_obj.insert("sandbox".into(), sb.into()); }
if let Some(cwd) = worker_cli.cwd.clone() { arg_obj.insert("cwd".into(), cwd.into()); }
if let Some(bi) = worker_cli.base_instructions.clone() { arg_obj.insert("base-instructions".into(), bi.into()); }
let config_json = serde_json::to_value(&worker_cli.config_overrides)?;
arg_obj.insert("config".into(), config_json);
let args_json = serde_json::Value::Object(arg_obj);
debug!(?args_json, "calling codex tool via MCP");
let mut session_id: Option<String> = None;
// Grab notifications receiver to watch for SessionConfigured (to extract sessionId) while first tool call runs.
let mut notif_rx = client.take_notification_receiver().await;
// Spawn a task to extract sessionId and print filtered events.
if let Some(mut rx) = notif_rx.take() {
tokio::spawn(async move {
use serde_json::Value;
while let Some(n) = rx.recv().await {
if let Some(p) = &n.params {
if let Some(root) = p.as_object() {
if let Some(val) = root.get("sessionId").or_else(|| root.get("session_id")) {
// todo: reuse session id as task id
if let Some(s) = val.as_str() { if !s.is_empty() { println!("SESSION ID: {}", s); } }
}
if let Some(Value::Object(msg)) = root.get("msg") {
if let Some(Value::String(typ)) = msg.get("type") {
if typ.ends_with("_delta") { continue; }
// todo: use the tui once it manages multi processes
match typ.as_str() {
"agent_reasoning" => {
if let Some(Value::String(text)) = msg.get("text") { println!("\x1b[36mreasoning:\x1b[0m {}", text); }
}
"exec_approval_request" => {
let cmd = msg.get("command").and_then(|v| v.as_array()).map(|arr| arr.iter().filter_map(|x| x.as_str()).collect::<Vec<_>>().join(" ")).unwrap_or_default();
let cwd = msg.get("cwd").and_then(|v| v.as_str()).unwrap_or("");
println!("\x1b[33mexec approval requested:\x1b[0m {cmd} (cwd: {cwd})");
}
"apply_patch_approval_request" => {
let reason = msg.get("reason").and_then(|v| v.as_str()).unwrap_or("");
println!("\x1b[35mpatch approval requested:\x1b[0m {reason}");
}
"task_complete" => {
println!("\x1b[32mtask complete\x1b[0m");
}
_ => { /* suppress other event types */ }
}
}
}
}
}
}
});
}
let first_result = client.call_tool("codex".to_string(), Some(args_json), None).await;
// todo: to test we have not implemented the tool call yet
match &first_result {
Ok(r) => debug!(blocks = r.content.len(), "codex initial tool call completed"),
Err(e) => debug!(error = %e, "codex tool call failed"),
}
let first_result = first_result?;
// Print any text content to stdout.
let mut printed_any = false;
for block in &first_result.content {
if let mcp_types::ContentBlock::TextContent(t) = block { println!("{}", t.text); printed_any = true; }
}
if !printed_any { info!("no text content blocks returned from initial codex tool call"); }
// Attempt to parse session id from printed notifications (fallback approach): scan stdout not feasible here; so rely on user-visible marker.
// Interactive loop for follow-up prompts.
use std::io::{stdin, stdout, Write};
loop {
print!("codex> "); let _ = stdout().flush();
let mut line = String::new();
if stdin().read_line(&mut line).is_err() { break; }
let trimmed = line.trim();
if trimmed.is_empty() || trimmed == "/exit" || trimmed == ":q" { break; }
// If session id still unknown, ask user to paste it.
if session_id.is_none() {
if trimmed.starts_with("session ") { session_id = Some(trimmed[8..].trim().to_string()); println!("Stored session id."); continue; }
}
if session_id.is_none() { println!("(Need session id; when you see 'SESSION ID: <uuid>' above, copy it or type 'session <uuid>')"); continue; }
let args = serde_json::json!({ "sessionId": session_id.clone().unwrap(), "prompt": trimmed });
let reply = client.call_tool("codex-reply".to_string(), Some(args), None).await;
match reply {
Ok(r) => {
for block in r.content {
if let mcp_types::ContentBlock::TextContent(t) = block { println!("{}", t.text); }
}
}
Err(e) => println!("Error: {e}"),
}
}
// Append completion record to tasks.jsonl now that interactive loop ends.
if let Ok(task_id) = std::env::var("CODEX_TASK_ID") {
if let Some(base) = codex_base_dir_for_worker() {
let tasks_path = base.join("tasks.jsonl");
let ts = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).map(|d| d.as_secs()).unwrap_or(0);
let obj = serde_json::json!({
"task_id": task_id,
"completion_time": ts,
"end_time": ts,
"state": "done",
});
let _ = append_json_line(&tasks_path, &obj);
}
}
}
Some(Subcommand::Login(mut login_cli)) => {
prepend_config_flags(&mut login_cli.config_overrides, cli.config_overrides);
run_login_with_chatgpt(login_cli.config_overrides).await;
@@ -223,3 +383,18 @@ fn print_completion(cmd: CompletionCommand) {
let name = "codex";
generate(cmd.shell, &mut app, name, &mut std::io::stdout());
}
// Helper functions for worker
fn codex_base_dir_for_worker() -> Option<std::path::PathBuf> {
if let Ok(val) = std::env::var("CODEX_HOME") { if !val.is_empty() { return std::fs::canonicalize(val).ok(); } }
let home = std::env::var_os("HOME")?;
let base = std::path::PathBuf::from(home).join(".codex");
let _ = std::fs::create_dir_all(&base);
Some(base)
}
fn append_json_line(path: &std::path::PathBuf, val: &serde_json::Value) -> std::io::Result<()> {
use std::io::Write;
let mut f = std::fs::OpenOptions::new().create(true).append(true).open(path)?;
writeln!(f, "{}", val.to_string())
}

View File

@@ -10,7 +10,7 @@ workspace = true
clap = { version = "4", features = ["derive", "wrap_help"], optional = true }
codex-core = { path = "../core" }
toml = { version = "0.9", optional = true }
serde = { version = "1", optional = true }
serde = { version = "1", features = ["derive"], optional = true }
[features]
# Separate feature so that `clap` is not a mandatory dependency.

View File

@@ -15,7 +15,7 @@ use toml::Value;
/// CLI option that captures arbitrary configuration overrides specified as
/// `-c key=value`. It intentionally keeps both halves **unparsed** so that the
/// calling code can decide how to interpret the right-hand side.
#[derive(Parser, Debug, Default, Clone)]
#[derive(Parser, Debug, Default, Clone, serde::Serialize)]
pub struct CliConfigOverrides {
/// Override a configuration value that would otherwise be loaded from
/// `~/.codex/config.toml`. Use a dotted path (`foo.bar.baz`) to override

View File

@@ -199,7 +199,21 @@ impl EventProcessor for EventProcessorWithHumanOutput {
ts_println!(self, "{}", message.style(self.dimmed));
}
EventMsg::TaskStarted => {
// Ignore.
if let Ok(task_id) = std::env::var("CODEX_TASK_ID") {
if let Some(base) = codex_base_dir_for_logging() {
let tasks_path = base.join("tasks.jsonl");
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let obj = serde_json::json!({
"task_id": task_id,
"update_time": ts,
"state": "started",
});
let _ = append_json_line(&tasks_path, &obj);
}
}
}
EventMsg::TaskComplete(TaskCompleteEvent { last_agent_message }) => {
@@ -537,10 +551,41 @@ impl EventProcessor for EventProcessorWithHumanOutput {
}
}
EventMsg::ExecApprovalRequest(_) => {
// Should we exit?
// When a background task requests execution approval, persist a state transition
// so `codex tasks ls` can reflect that it is waiting on user input.
if let Ok(task_id) = std::env::var("CODEX_TASK_ID") {
if let Some(base) = codex_base_dir_for_logging() {
let tasks_path = base.join("tasks.jsonl");
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let obj = serde_json::json!({
"task_id": task_id,
"update_time": ts,
"state": "waiting_exec_approval",
});
let _ = append_json_line(&tasks_path, &obj);
}
}
}
EventMsg::ApplyPatchApprovalRequest(_) => {
// Should we exit?
// todo: test/verify and verify if useful to keep now
if let Ok(task_id) = std::env::var("CODEX_TASK_ID") {
if let Some(base) = codex_base_dir_for_logging() {
let tasks_path = base.join("tasks.jsonl");
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let obj = serde_json::json!({
"task_id": task_id,
"update_time": ts,
"state": "waiting_patch_approval",
});
let _ = append_json_line(&tasks_path, &obj);
}
}
}
EventMsg::AgentReasoning(agent_reasoning_event) => {
if self.show_agent_reasoning {

View File

@@ -75,6 +75,9 @@ pub struct McpClient {
/// Monotonically increasing counter used to generate request IDs.
id_counter: AtomicI64,
/// Channel receiver for notifications (single consumer). Created per client.
notifications_rx: Mutex<Option<mpsc::Receiver<JSONRPCNotification>>>,
}
impl McpClient {
@@ -110,6 +113,7 @@ impl McpClient {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
let pending: Arc<Mutex<HashMap<i64, PendingSender>>> = Arc::new(Mutex::new(HashMap::new()));
let (notif_tx, notif_rx) = mpsc::channel::<JSONRPCNotification>(CHANNEL_CAPACITY);
// Spawn writer task. It listens on the `outgoing_rx` channel and
// writes messages to the child's STDIN.
@@ -156,8 +160,15 @@ impl McpClient {
Self::dispatch_error(err, &pending).await;
}
Ok(JSONRPCMessage::Notification(JSONRPCNotification { .. })) => {
// For now we only log server-initiated notifications.
// Log and also print notifications so callers (e.g., concurrent worker) can stream progress.
info!("<- notification: {}", line);
// (Filtered printing handled by higher-level caller; suppress raw spam here.)
// Attempt to forward the notification to channel subscribers.
if let Ok(parsed) = serde_json::from_str::<JSONRPCMessage>(&line) {
if let JSONRPCMessage::Notification(n) = parsed {
let _ = notif_tx.try_send(n);
}
}
}
Ok(other) => {
// Batch responses and requests are currently not
@@ -183,6 +194,7 @@ impl McpClient {
outgoing_tx,
pending,
id_counter: AtomicI64::new(1),
notifications_rx: Mutex::new(Some(notif_rx)),
})
}
@@ -349,6 +361,11 @@ impl McpClient {
self.send_request::<CallToolRequest>(params, timeout).await
}
/// Take the notifications receiver (only once). Returns None if already taken.
pub async fn take_notification_receiver(&self) -> Option<mpsc::Receiver<JSONRPCNotification>> {
self.notifications_rx.lock().await.take()
}
/// Internal helper: route a JSON-RPC *response* object to the pending map.
async fn dispatch_response(
resp: JSONRPCResponse,