Compare commits

...

1 Commits

Author SHA1 Message Date
starr-openai
7b230fcf6c chore(skills): add rust-analyzer lsp timing skill sample
Co-authored-by: Codex <noreply@openai.com>
2026-03-21 02:09:02 +00:00
4 changed files with 1071 additions and 0 deletions

View File

@@ -0,0 +1,108 @@
---
name: rust-analyzer-lsp-timing
description: Benchmark end-to-end Rust edit feedback loops using a long-lived rust-analyzer process (LSP) and record diagnostic latency.
---
# Rust Analyzer LSP Timing
Use this skill when you want:
- one long-lived rust-analyzer process for iterative edits
- cheap verification after each change (no full `cargo test`)
- measured latency from edit event to first diagnostic update
## What it does
The script runs rust-analyzer once, keeps it warm, and reports per-change timing:
- change kind (`whitespace`, `comment`, `code`, or custom label)
- elapsed ms from `didChange` to first `textDocument/publishDiagnostics`
- diagnostic count after that change
- CSV log file for trend comparison
## Setup
1. Ensure rust-analyzer is available:
- `rustup component add rust-analyzer`
2. Open the target worktree checkout (for example `/home/dev-user/code/codex-lsp`).
3. Use a small interactive file to pilot realistic edit types:
- comment-only change
- whitespace-only change
- code change
- larger refactor change
## Core script
From your codex root, run:
```bash
python ~/.codex/skills/rust-analyzer-lsp-timing/scripts/ra_lsp_timing.py \
/home/dev-user/code/codex-lsp \
codex-rs/core/src/tools/orchestrator.rs \
--iterations 0 \
--log /tmp/ra-lsp-timing.csv
```
`--iterations 0` means run until you stop with `q`.
Workflow:
1. Make a manual edit in the target file.
2. Return to terminal and press Enter.
3. (Optional) enter a label for this edit; press Enter for automatic type inference.
4. The script sends the edit over the same rust-analyzer session and logs timing.
## Useful run modes
- **Quick ad-hoc sweep**
- Run one file through 10 timed edits: set `--iterations 10`.
- **Manual categories**
- Keep labels consistent: `comment`, `whitespace`, `code` for cleaner comparison.
- **Baseline check**
- Add `--install-ra` first if rust-analyzer is missing.
## Notes
- This benchmark is best-effort LSP-level feedback latency and not a replacement for targeted `cargo check` on release-critical changes.
- This does not require editor integration; it talks to rust-analyzer directly over LSP stdin/stdout.
## Rust-analyzer daemon mode
Use this when you want a long-lived, reconnectable RA process you can reuse from quick checks.
### Files
- `~/.codex/skills/rust-analyzer-lsp-timing/scripts/ra_lsp_daemon.py`
- `~/.codex/skills/rust-analyzer-lsp-timing/scripts/ra_lsp_client.py`
### Client flow (recommended)
```bash
python ~/.codex/skills/rust-analyzer-lsp-timing/scripts/ra_lsp_client.py \
--workspace /home/dev-user/code/codex-lsp \
--file codex-rs/core/src/tools/orchestrator.rs \
--action check
```
This command auto-starts the daemon if it is not already running.
### Query state
```bash
python ~/.codex/skills/rust-analyzer-lsp-timing/scripts/ra_lsp_client.py \
--workspace /home/dev-user/code/codex-lsp \
--action state
```
### Stop daemon
```bash
python ~/.codex/skills/rust-analyzer-lsp-timing/scripts/ra_lsp_client.py \
--workspace /home/dev-user/code/codex-lsp \
--action stop
```
### What to expect
- The first check after a file edit starts a diagnostics pass.
- Repeated edits are much faster than restarting RA each time because the same process is reused.

View File

@@ -0,0 +1,191 @@
#!/usr/bin/env python3
"""Client for the rust-analyzer LSP daemon."""
from __future__ import annotations
import argparse
import json
import hashlib
import os
import socket
import subprocess
import sys
import time
from pathlib import Path
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the RA daemon client."""
    cli = argparse.ArgumentParser(description="Interact with the RA daemon")
    cli.add_argument("--workspace", required=True, help="Workspace root")
    cli.add_argument("--socket", default="", help="UNIX socket path (optional)")
    cli.add_argument(
        "--action",
        choices=["check", "state", "start", "stop"],
        default="check",
    )
    cli.add_argument(
        "--file",
        default="",
        help="Rust file relative to workspace (required for check)",
    )
    cli.add_argument("--label", default="", help="Optional label for check event")
    cli.add_argument("--timeout", type=float, default=45.0, help="Per-change timeout in seconds")
    cli.add_argument(
        "--install-ra",
        action="store_true",
        help="Install rust-analyzer if missing when starting daemon",
    )
    cli.add_argument(
        "--no-auto-start",
        action="store_true",
        help="Do not auto-start daemon if socket is missing",
    )
    cli.add_argument("--json", action="store_true", help="Output raw JSON only")
    return cli.parse_args()
def make_socket_path(workspace: Path) -> Path:
    """Derive a stable, per-workspace UNIX socket path under /tmp."""
    digest = hashlib.sha1(str(workspace.resolve()).encode("utf-8")).hexdigest()
    return Path("/tmp") / "ra-lsp-daemon-{}.sock".format(digest[:16])
def daemon_script_path() -> Path:
    """Locate ra_lsp_daemon.py, which ships next to this client script."""
    here = Path(__file__).resolve().parent
    return here / "ra_lsp_daemon.py"
def send_request(socket_path: Path, request: dict, timeout: float = 2.0) -> dict:
    """Send one newline-delimited JSON request over the daemon's UNIX socket.

    Returns the decoded JSON response. Raises RuntimeError when the daemon
    closes the connection without sending anything.
    """
    conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    with conn:
        conn.settimeout(timeout)
        conn.connect(str(socket_path))
        conn.sendall((json.dumps(request) + "\n").encode("utf-8"))
        buf = bytearray()
        while True:
            piece = conn.recv(4096)
            if not piece:
                break
            buf.extend(piece)
            # A trailing newline marks a complete response frame.
            if buf.endswith(b"\n"):
                break
        if not buf:
            raise RuntimeError("daemon returned no response")
        return json.loads(bytes(buf).decode("utf-8").strip())
def ensure_daemon(workspace: Path, socket_path: Path, timeout: float, install_ra: bool) -> None:
    """Ensure a daemon is serving `socket_path`, auto-starting one if needed.

    Raises RuntimeError when the spawned daemon exits during startup or never
    answers a ping within the readiness window (~4 seconds).
    """
    # Cheap health probe: an answering daemon means there is nothing to do.
    try:
        resp = send_request(socket_path, {"action": "ping", "workspace": str(workspace)})
        if resp.get("ok"):
            return
    except Exception:
        pass
    # Auto-start one daemon for this workspace, detached from our session.
    cmd = [
        sys.executable,
        str(daemon_script_path()),
        "--workspace",
        str(workspace),
        "--socket",
        str(socket_path),
        "--timeout",
        str(timeout),
    ]
    if install_ra:
        cmd.append("--install-ra")
    daemon_proc = subprocess.Popen(
        cmd,
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,
    )
    # Wait for readiness, bailing out immediately if the child already died
    # (e.g. bad arguments or missing rust-analyzer) instead of burning the
    # whole window and raising a generic error.
    for _ in range(40):
        time.sleep(0.1)
        exit_code = daemon_proc.poll()
        if exit_code is not None:
            raise RuntimeError(f"daemon exited during startup (code {exit_code})")
        try:
            resp = send_request(socket_path, {"action": "ping", "workspace": str(workspace)})
            if resp.get("ok"):
                return
        except Exception:
            continue
    raise RuntimeError("daemon did not become ready in time")
def main() -> int:
    """CLI entry point: dispatch the requested daemon action and print the result.

    Returns a process exit code (0 on success, 1 on a failed check).
    """
    args = parse_args()
    workspace = Path(args.workspace).expanduser().resolve()
    # Default to the deterministic per-workspace socket path unless one was
    # given explicitly on the command line.
    if args.socket:
        socket_path = Path(args.socket).expanduser()
    else:
        socket_path = make_socket_path(workspace)
    if args.action == "start":
        ensure_daemon(workspace, socket_path, args.timeout, args.install_ra)
        if args.json:
            print(json.dumps({"ok": True, "socket": str(socket_path)}))
        else:
            print(f"daemon ready on {socket_path}")
        return 0
    # All other actions need a reachable daemon first: either auto-start one
    # or just verify the existing one answers a ping.
    if not args.no_auto_start:
        ensure_daemon(workspace, socket_path, args.timeout, args.install_ra)
    else:
        try:
            send_request(socket_path, {"action": "ping", "workspace": str(workspace)})
        except Exception as exc:
            raise RuntimeError(f"daemon unavailable: {exc}")
    if args.action == "state":
        response = send_request(socket_path, {"action": "state", "workspace": str(workspace)})
        if args.json:
            print(json.dumps(response))
        else:
            state = response.get("state", {})
            print("state:")
            for key in ("workspace", "socket", "uptime_seconds", "requests", "ra_pid"):
                print(f" {key}: {state.get(key)}")
            if state.get("open_files"):
                print(" open_files:")
                for file in state["open_files"]:
                    print(f" - {file}")
        return 0
    if args.action == "check":
        if not args.file:
            raise RuntimeError("--file is required for action check")
        req = {
            "action": "check",
            "workspace": str(workspace),
            "file": args.file,
            "label": args.label or None,
            "timeout": args.timeout,
        }
        # Give the socket read slightly longer than the daemon's own LSP
        # timeout, clamped to a sane range.
        response = send_request(socket_path, req, timeout=max(0.5, min(120.0, args.timeout + 1)))
        if args.json:
            print(json.dumps(response))
            return 0
        if not response.get("ok"):
            print(f"check failed: {response.get('error')}")
            return 1
        print(
            f"{response.get('file')}: changed={response.get('changed')} "
            f"elapsed_ms={response.get('elapsed_ms'):.2f} "
            f"diagnostics={response.get('diagnostic_count')} cached={response.get('cached')}"
        )
        return 0
    if args.action == "stop":
        resp = send_request(socket_path, {"action": "stop", "workspace": str(workspace)})
        if args.json:
            print(json.dumps(resp))
        else:
            print("stop:", "ok" if resp.get("ok") else "failed")
        return 0
    # argparse `choices` makes this unreachable, but fail loudly just in case.
    raise RuntimeError("unknown action")
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,439 @@
#!/usr/bin/env python3
"""Persistent rust-analyzer daemon with a tiny JSON-over-UNIX-socket control API."""
from __future__ import annotations
import argparse
import json
import os
import re
import hashlib
import select
import socket
import signal
import subprocess
import threading
import time
from pathlib import Path
from typing import Dict, Optional
def find_rust_analyzer(install_if_missing: bool) -> str:
    """Return a runnable rust-analyzer command, optionally installing it.

    Lookup order: the stable-toolchain binary reported by rustup, then a plain
    `rust-analyzer` on PATH, then (when allowed) a rustup install. Raises
    RuntimeError when no runnable binary can be found.
    """
    def _runnable(cmd: str) -> bool:
        # A candidate counts only if `-V` exits successfully; a missing binary
        # (FileNotFoundError is an OSError) must not abort the whole search.
        try:
            probe = subprocess.run(
                [cmd, "-V"],
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                timeout=10,
            )
        except (OSError, subprocess.TimeoutExpired):
            return False
        return probe.returncode == 0

    # Prefer the stable-toolchain binary; tolerate a missing rustup instead of
    # crashing with FileNotFoundError before the PATH fallback is tried.
    try:
        rustup_path = subprocess.run(
            ["rustup", "which", "--toolchain", "stable", "rust-analyzer"],
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            timeout=20,
        )
    except (OSError, subprocess.TimeoutExpired):
        rustup_path = None
    if rustup_path is not None and rustup_path.returncode == 0 and rustup_path.stdout.strip():
        candidate = rustup_path.stdout.strip()
        if _runnable(candidate):
            return candidate
    for candidate in ["rust-analyzer"]:
        if _runnable(candidate):
            return candidate
    if not install_if_missing:
        raise RuntimeError(
            "rust-analyzer not runnable. Re-run with --install-ra or install with "
            "`rustup component add rust-analyzer`."
        )
    try:
        install = subprocess.run(["rustup", "component", "add", "rust-analyzer"], check=False)
    except OSError as exc:
        raise RuntimeError("failed to install rust-analyzer.") from exc
    if install.returncode != 0:
        raise RuntimeError("failed to install rust-analyzer.")
    if not _runnable("rust-analyzer"):
        raise RuntimeError("rust-analyzer still not runnable after install.")
    return "rust-analyzer"
def to_uri(path: Path) -> str:
    """Convert a filesystem path into a file:// URI (resolving symlinks)."""
    resolved = path.resolve()
    return resolved.as_uri()
class LspSession:
    """Minimal LSP stdio client wrapped around one rust-analyzer process."""

    def __init__(self, ra_bin: str, workspace: Path):
        self.proc = subprocess.Popen(
            [ra_bin],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            cwd=str(workspace),
            text=False,
            bufsize=0,
        )
        if self.proc.stdin is None or self.proc.stdout is None:
            raise RuntimeError("Unable to start rust-analyzer stdio pipes.")
        self._stdin = self.proc.stdin
        self._stdout = self.proc.stdout
        self._buffer = b""  # bytes read from RA but not yet consumed
        self._next_id = 1  # monotonically increasing JSON-RPC request id

    def _read_exact(self, n: int, timeout: float) -> bytes:
        """Read at least `n` bytes from RA stdout; may return more."""
        out = b""
        end = time.time() + timeout
        while len(out) < n:
            remaining = end - time.time()
            if remaining <= 0:
                raise TimeoutError("timeout while reading rust-analyzer response.")
            ready, _, _ = select.select([self._stdout], [], [], remaining)
            if not ready:
                raise TimeoutError("timeout while reading rust-analyzer response.")
            chunk = os.read(self._stdout.fileno(), 65536)
            if not chunk:
                raise EOFError("rust-analyzer closed stdout.")
            out += chunk
        return out

    def recv(self, timeout: float) -> Dict:
        """Read and decode exactly one LSP message.

        Any bytes read beyond the current message are kept in the internal
        buffer for the next call.
        """
        end = time.time() + timeout
        header_end = b"\r\n\r\n"
        while header_end not in self._buffer:
            self._buffer += self._read_exact(1, max(0.1, end - time.time()))
        header, rest = self._buffer.split(header_end, 1)
        self._buffer = rest
        match = re.search(rb"Content-Length:\s*(\d+)", header, flags=re.IGNORECASE)
        if not match:
            raise RuntimeError(f"malformed RA header: {header!r}")
        body_len = int(match.group(1))
        body = self._buffer
        if len(body) < body_len:
            # _read_exact may over-read past the body; the surplus belongs to
            # the next message and must be preserved, not discarded (dropping
            # it would corrupt the stream / break json decoding).
            body += self._read_exact(body_len - len(body), max(0.1, end - time.time()))
        self._buffer = body[body_len:]
        body = body[:body_len]
        return json.loads(body.decode("utf-8"))

    def send(self, message: Dict) -> None:
        """Write one LSP message with a Content-Length framing header."""
        payload = json.dumps(message).encode("utf-8")
        header = f"Content-Length: {len(payload)}\r\n\r\n".encode("ascii")
        self._stdin.write(header + payload)
        self._stdin.flush()

    def request(self, method: str, params: Dict, timeout: float = 120.0) -> Dict:
        """Send a JSON-RPC request and block until its matching response.

        Messages with other ids (server notifications etc.) are skipped.
        """
        req_id = self._next_id
        self._next_id += 1
        self.send(
            {
                "jsonrpc": "2.0",
                "id": req_id,
                "method": method,
                "params": params,
            }
        )
        while True:
            message = self.recv(timeout)
            if message.get("id") == req_id:
                return message

    def notify(self, method: str, params: Dict) -> None:
        """Send a JSON-RPC notification (no id, no response expected)."""
        self.send({"jsonrpc": "2.0", "method": method, "params": params})

    def wait_for_file_diagnostics(self, file_uri: str, timeout: float) -> int:
        """Block until a publishDiagnostics for `file_uri`; return its count."""
        deadline = time.time() + timeout
        while True:
            msg = self.recv(max(0.05, deadline - time.time()))
            if msg.get("method") == "textDocument/publishDiagnostics":
                params = msg.get("params", {})
                if params.get("uri") == file_uri:
                    return len(params.get("diagnostics", []))
            if time.time() >= deadline:
                raise TimeoutError("timed out waiting for diagnostics.")

    def close(self) -> None:
        """Terminate the rust-analyzer process, escalating to kill if needed."""
        if self.proc.poll() is None:
            self.proc.terminate()
            try:
                self.proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                # terminate was ignored; never leak the child process.
                self.proc.kill()
                self.proc.wait(timeout=2)
class RaDaemon:
    """Owns one warm LspSession and answers check/state/stop requests.

    handle_client spawns one thread per control connection, so all LSP
    traffic is serialized through an internal lock.
    """

    def __init__(self, workspace: Path, socket_path: Path, timeout: float, install_ra: bool):
        self.workspace = workspace
        self.socket_path = socket_path
        self.timeout = timeout
        self.started_at = time.time()
        self._state_lock = threading.Lock()  # guards _requests and _file_state
        # Serializes didOpen/didChange plus the diagnostics wait: concurrent
        # client threads sharing one LSP session would otherwise interleave
        # writes and consume each other's publishDiagnostics notifications.
        self._lsp_lock = threading.Lock()
        self._stop = threading.Event()
        self._requests = 0
        self._file_state: Dict[str, Dict[str, object]] = {}
        self._session = LspSession(find_rust_analyzer(install_ra), workspace)
        self._initialize_lsp()

    def _initialize_lsp(self) -> None:
        """Run the LSP initialize/initialized handshake for the workspace."""
        req = self._session.request(
            "initialize",
            {
                "processId": os.getpid(),
                "rootUri": to_uri(self.workspace),
                "rootPath": str(self.workspace),
                "capabilities": {},
                "workspaceFolders": [{"uri": to_uri(self.workspace), "name": self.workspace.name}],
            },
            timeout=self.timeout,
        )
        if req.get("error"):
            raise RuntimeError(f"initialize error: {req['error']!r}")
        self._session.notify("initialized", {})

    def check_file(self, file_path: Path, label: Optional[str]) -> Dict:
        """Sync `file_path` into the session and time the diagnostics pass.

        Returns a JSON-serializable result dict; `cached` is True when the
        on-disk text is unchanged since the previous check.
        """
        file_path = file_path.expanduser().resolve()
        # Path.parents already includes the immediate parent directory, so a
        # single containment test suffices.
        if self.workspace not in file_path.parents:
            return {"ok": False, "error": "file is outside workspace"}
        file_uri = to_uri(file_path)
        current_text = file_path.read_text(encoding="utf-8")
        with self._lsp_lock:
            with self._state_lock:
                self._requests += 1
                state = self._file_state.get(file_uri)
                previous_text = ""
                version = 1
                if state:
                    previous_text = state.get("text", "")
                    version = int(state.get("version", 1)) + 1
                if state is None:
                    # First sighting: open the document in the session.
                    self._session.notify(
                        "textDocument/didOpen",
                        {
                            "textDocument": {
                                "uri": file_uri,
                                "languageId": "rust",
                                "version": version,
                                "text": current_text,
                            }
                        },
                    )
                    self._file_state[file_uri] = {
                        "text": current_text,
                        "version": version,
                        "diagnostics": 0,
                    }
                    changed = True
                else:
                    changed = current_text != previous_text
                    if changed:
                        # Full-document sync: ship the entire new text.
                        self._session.notify(
                            "textDocument/didChange",
                            {
                                "textDocument": {"uri": file_uri, "version": version},
                                "contentChanges": [{"text": current_text}],
                            },
                        )
                        self._file_state[file_uri]["version"] = version
                        self._file_state[file_uri]["text"] = current_text
            if not changed:
                return {
                    "ok": True,
                    "file": file_uri,
                    "changed": False,
                    "elapsed_ms": 0.0,
                    "diagnostic_count": int(state["diagnostics"]) if state else 0,
                    "label": label,
                    "cached": True,
                }
            start = time.perf_counter()
            diag_count = self._session.wait_for_file_diagnostics(file_uri, self.timeout)
            elapsed_ms = (time.perf_counter() - start) * 1000.0
        with self._state_lock:
            self._file_state[file_uri]["diagnostics"] = diag_count
        return {
            "ok": True,
            "file": file_uri,
            "changed": True,
            "elapsed_ms": elapsed_ms,
            "diagnostic_count": int(diag_count),
            "label": label,
            "cached": False,
        }

    def request_stop(self) -> None:
        """Ask the accept loop to exit at its next poll."""
        self._stop.set()

    def state(self) -> Dict:
        """Snapshot daemon state for the `state` control action."""
        with self._state_lock:
            return {
                "workspace": str(self.workspace),
                "socket": str(self.socket_path),
                "uptime_seconds": round(time.time() - self.started_at, 3),
                "requests": self._requests,
                "open_files": sorted(self._file_state.keys()),
                "ra_pid": self._session.proc.pid,
            }

    def should_stop(self) -> bool:
        """True once a stop has been requested."""
        return self._stop.is_set()

    def close(self):
        """Shut down the underlying rust-analyzer session."""
        self._session.close()
def parse_args() -> argparse.Namespace:
    """Parse daemon command-line options (workspace, socket, timeout)."""
    cli = argparse.ArgumentParser(description="Rust-analyzer daemon")
    cli.add_argument("--workspace", required=True, help="Workspace root")
    cli.add_argument(
        "--socket",
        required=True,
        help="UNIX socket path for daemon control channel",
    )
    cli.add_argument("--timeout", type=float, default=45.0, help="LSP timeout seconds")
    cli.add_argument(
        "--install-ra",
        action="store_true",
        help="Install rust-analyzer if missing",
    )
    return cli.parse_args()
def make_socket_path(workspace: Path) -> Path:
    """Derive a stable, per-workspace UNIX socket path under /tmp."""
    digest = hashlib.sha1(str(workspace.resolve()).encode("utf-8")).hexdigest()
    return Path("/tmp") / "ra-lsp-daemon-{}.sock".format(digest[:16])
def parse_request(raw: str) -> Dict:
    """Decode one control request; reject anything that is not a JSON object."""
    decoded = json.loads(raw)
    if isinstance(decoded, dict):
        return decoded
    raise ValueError("request must be a JSON object")
def handle_client(conn: socket.socket, daemon: RaDaemon) -> None:
    """Serve one control connection: read a JSON line, dispatch, reply.

    Always attempts to answer with a single newline-terminated JSON object;
    errors are reported back to the client rather than raised.
    """
    # Initialize before the try block: the except handler echoes req_id, and a
    # failure while reading/parsing the request would otherwise hit an unbound
    # name (the resulting NameError was silently swallowed and the client got
    # no error reply at all).
    req_id = None
    with conn:
        try:
            conn.settimeout(2.0)
            raw = b""
            while not raw.endswith(b"\n"):
                chunk = conn.recv(4096)
                if not chunk:
                    break
                raw += chunk
            if not raw:
                return
            request = parse_request(raw.decode("utf-8").strip())
            action = request.get("action")
            req_id = request.get("id")
            response = {"id": req_id}
            if action == "ping":
                response.update(
                    {
                        "ok": True,
                        "state": {
                            "workspace": str(daemon.workspace),
                            "socket": str(daemon.socket_path),
                        },
                    }
                )
            elif action == "state":
                response.update({"ok": True, "state": daemon.state()})
            elif action == "check":
                if "file" not in request:
                    response.update({"ok": False, "error": "missing file field"})
                else:
                    workspace = Path(request["workspace"])
                    workspace = workspace.expanduser().resolve()
                    file_path = (workspace / request["file"]).resolve()
                    response.update(
                        daemon.check_file(
                            file_path,
                            request.get("label"),
                        )
                    )
            elif action == "stop":
                response.update({"ok": True})
                daemon.request_stop()
            else:
                response.update({"ok": False, "error": f"unknown action: {action}"})
            conn.sendall((json.dumps(response) + "\n").encode("utf-8"))
        except Exception as exc:
            # Best-effort error reply; the socket may already be gone.
            try:
                err = {
                    "id": req_id,
                    "ok": False,
                    "error": repr(exc),
                }
                conn.sendall((json.dumps(err) + "\n").encode("utf-8"))
            except Exception:
                pass
def main() -> int:
    """Start the daemon: bind the control socket and serve until stopped."""
    args = parse_args()
    workspace = Path(args.workspace).expanduser().resolve()
    socket_path = Path(args.socket).expanduser()
    if not workspace.is_dir():
        raise RuntimeError(f"workspace does not exist: {workspace}")
    # Replace any stale socket left behind by a previous daemon.
    if socket_path.exists():
        socket_path.unlink()
    daemon = RaDaemon(workspace, socket_path, args.timeout, args.install_ra)
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.settimeout(0.5)  # short accept timeout so the stop flag is polled
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(str(socket_path))
    server.listen()

    def shutdown():
        # Idempotent cleanup: close the control socket, stop rust-analyzer,
        # and remove the socket file.
        server.close()
        daemon.close()
        if socket_path.exists():
            socket_path.unlink(missing_ok=True)

    # Signal handlers only request a stop; cleanup runs once in the finally
    # block. (The previous handler closed the server without setting the stop
    # flag, which left the accept loop spinning forever on OSError from the
    # dead socket.)
    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, lambda *_: daemon.request_stop())
    print(f"ra-lsp daemon running: workspace={workspace} socket={socket_path} pid={os.getpid()}")
    try:
        while not daemon.should_stop():
            try:
                conn, _ = server.accept()
            except (OSError, TimeoutError):
                if daemon.should_stop():
                    break
                continue
            try:
                # One thread per control connection; RaDaemon serializes LSP use.
                t = threading.Thread(target=handle_client, args=(conn, daemon), daemon=True)
                t.start()
            except Exception:
                conn.close()
    finally:
        shutdown()
    return 0
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,333 @@
#!/usr/bin/env python3
"""Benchmark rust-analyzer incremental diagnostic latency in one persistent session."""
from __future__ import annotations
import argparse
import difflib
import json
import os
import re
import select
import subprocess
import time
from pathlib import Path
from typing import Dict
def parse_args() -> argparse.Namespace:
    """Parse CLI arguments for the interactive diagnostics-timing benchmark."""
    cli = argparse.ArgumentParser(description="Time RA diagnostics after edits.")
    cli.add_argument("workspace", help="Workspace root (directory containing Cargo.toml)")
    cli.add_argument("file", help="Rust file to benchmark")
    cli.add_argument("--iterations", type=int, default=0, help="0 means infinite loop")
    cli.add_argument(
        "--log",
        default="",
        help="Path to CSV log file (default: /tmp/ra-lsp-timing.csv)",
    )
    cli.add_argument("--timeout", type=float, default=45.0, help="Per-change timeout in seconds")
    cli.add_argument(
        "--install-ra",
        action="store_true",
        help="Install rust-analyzer with rustup when missing",
    )
    return cli.parse_args()
def classify_change(old: str, new: str) -> str:
    """Heuristically label an edit as `whitespace`, `comment`, or `code`.

    whitespace: the two texts are identical once all whitespace is removed.
    comment: every added/removed diff line is blank or starts like a comment.
    code: everything else.
    """
    if "".join(old.split()) == "".join(new.split()):
        return "whitespace"
    delta = difflib.unified_diff(
        old.splitlines(),
        new.splitlines(),
        fromfile="old",
        tofile="new",
        lineterm="",
    )
    edits = []
    for row in delta:
        added = row.startswith("+") and not row.startswith("+++")
        removed = row.startswith("-") and not row.startswith("---")
        if added or removed:
            edits.append(row[1:])
    if not edits:
        return "code"
    markers = ("//", "/*", "*", "*/")
    if all(not text.strip() or text.lstrip().startswith(markers) for text in edits):
        return "comment"
    return "code"
def find_rust_analyzer(install_if_missing: bool) -> str:
    """Return a runnable rust-analyzer command, optionally installing it.

    Lookup order: the stable-toolchain binary reported by rustup, then a plain
    `rust-analyzer` on PATH, then (when allowed) a rustup install. Raises
    RuntimeError when no runnable binary can be found.
    """
    def _runnable(cmd: str) -> bool:
        # A candidate counts only if `-V` exits successfully; a missing binary
        # (FileNotFoundError is an OSError) must not abort the whole search.
        try:
            probe = subprocess.run(
                [cmd, "-V"],
                check=False,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                timeout=10,
            )
        except (OSError, subprocess.TimeoutExpired):
            return False
        return probe.returncode == 0

    # Prefer the explicit stable toolchain binary even if cwd enforces an older one.
    # Tolerate a missing rustup instead of crashing before the PATH fallback.
    try:
        rustup_path = subprocess.run(
            ["rustup", "which", "--toolchain", "stable", "rust-analyzer"],
            check=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            timeout=20,
        )
    except (OSError, subprocess.TimeoutExpired):
        rustup_path = None
    if rustup_path is not None and rustup_path.returncode == 0 and rustup_path.stdout.strip():
        candidate = rustup_path.stdout.strip()
        if _runnable(candidate):
            return candidate
    for candidate in ["rust-analyzer"]:
        if _runnable(candidate):
            return candidate
    if not install_if_missing:
        raise RuntimeError(
            "rust-analyzer not runnable. Re-run with --install-ra or install manually "
            "with `rustup component add rust-analyzer`."
        )
    try:
        install = subprocess.run(
            ["rustup", "component", "add", "rust-analyzer"],
            check=False,
        )
    except OSError as exc:
        raise RuntimeError("Failed to install rust-analyzer via rustup.") from exc
    if install.returncode != 0:
        raise RuntimeError("Failed to install rust-analyzer via rustup.")
    if not _runnable("rust-analyzer"):
        raise RuntimeError("rust-analyzer still not runnable after install.")
    return "rust-analyzer"
def to_uri(path: Path) -> str:
    """Convert a filesystem path into a file:// URI (resolving symlinks)."""
    resolved = path.resolve()
    return resolved.as_uri()
class LspSession:
    """Minimal LSP stdio client wrapped around one rust-analyzer process."""

    def __init__(self, ra_bin: str, workspace: Path):
        self.proc = subprocess.Popen(
            [ra_bin],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            cwd=str(workspace),
            text=False,
            bufsize=0,
        )
        if self.proc.stdin is None or self.proc.stdout is None:
            raise RuntimeError("Unable to start rust-analyzer stdio pipes.")
        self._stdin = self.proc.stdin
        self._stdout = self.proc.stdout
        self._buffer = b""  # bytes read from RA but not yet consumed
        self._next_id = 1  # monotonically increasing JSON-RPC request id

    def _read_exact(self, n: int, timeout: float) -> bytes:
        """Read at least `n` bytes from RA stdout; may return more."""
        out = b""
        end = time.time() + timeout
        while len(out) < n:
            remaining = end - time.time()
            if remaining <= 0:
                raise TimeoutError("Timeout while reading rust-analyzer response.")
            ready, _, _ = select.select([self._stdout], [], [], remaining)
            if not ready:
                raise TimeoutError("Timeout while reading rust-analyzer response.")
            chunk = os.read(self._stdout.fileno(), 65536)
            if not chunk:
                raise EOFError("rust-analyzer closed stdout.")
            out += chunk
        return out

    def recv(self, timeout: float) -> Dict:
        """Read and decode exactly one LSP message.

        Any bytes read beyond the current message are kept in the internal
        buffer for the next call.
        """
        end = time.time() + timeout
        header_end = b"\r\n\r\n"
        while header_end not in self._buffer:
            self._buffer += self._read_exact(1, max(0.1, end - time.time()))
        header, rest = self._buffer.split(header_end, 1)
        self._buffer = rest
        match = re.search(rb"Content-Length:\s*(\d+)", header, flags=re.IGNORECASE)
        if not match:
            raise RuntimeError(f"Malformed RA header: {header!r}")
        body_len = int(match.group(1))
        body = self._buffer
        if len(body) < body_len:
            # _read_exact may over-read past the body; the surplus belongs to
            # the next message and must be preserved, not discarded (dropping
            # it would corrupt the stream / break json decoding).
            body += self._read_exact(
                body_len - len(body), max(0.1, end - time.time())
            )
        self._buffer = body[body_len:]
        body = body[:body_len]
        return json.loads(body.decode("utf-8"))

    def send(self, message: Dict) -> None:
        """Write one LSP message with a Content-Length framing header."""
        payload = json.dumps(message).encode("utf-8")
        header = f"Content-Length: {len(payload)}\r\n\r\n".encode("ascii")
        self._stdin.write(header + payload)
        self._stdin.flush()

    def request(self, method: str, params: Dict, timeout: float = 120.0) -> Dict:
        """Send a JSON-RPC request and block until its matching response.

        Messages with other ids (server notifications etc.) are skipped.
        """
        req_id = self._next_id
        self._next_id += 1
        self.send(
            {
                "jsonrpc": "2.0",
                "id": req_id,
                "method": method,
                "params": params,
            }
        )
        while True:
            message = self.recv(timeout)
            if message.get("id") == req_id:
                return message

    def notify(self, method: str, params: Dict) -> None:
        """Send a JSON-RPC notification (no id, no response expected)."""
        self.send({"jsonrpc": "2.0", "method": method, "params": params})

    def wait_for_file_diagnostics(self, file_uri: str, timeout: float) -> int:
        """Block until a publishDiagnostics for `file_uri`; return its count."""
        deadline = time.time() + timeout
        while True:
            msg = self.recv(max(0.05, deadline - time.time()))
            if msg.get("method") == "textDocument/publishDiagnostics":
                params = msg.get("params", {})
                if params.get("uri") == file_uri:
                    return len(params.get("diagnostics", []))
            remaining = deadline - time.time()
            if remaining <= 0:
                raise TimeoutError("Timed out waiting for diagnostics.")

    def close(self) -> None:
        """Terminate the rust-analyzer process, escalating to kill if needed."""
        if self.proc.poll() is None:
            self.proc.terminate()
            try:
                self.proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                # terminate was ignored; never leak the child process.
                self.proc.kill()
                self.proc.wait(timeout=2)
def main() -> int:
    """Run the interactive edit-timing loop against one rust-analyzer session.

    Opens the target file over LSP once, then for each manual edit sends a
    full-text didChange and records the latency until the first
    publishDiagnostics for that file, appending one CSV row per round.
    """
    args = parse_args()
    workspace = Path(args.workspace).expanduser().resolve()
    file_path = workspace / args.file
    if not workspace.exists() or not workspace.is_dir():
        raise RuntimeError(f"workspace does not exist: {workspace}")
    if not file_path.exists():
        raise RuntimeError(f"file does not exist: {file_path}")
    log_path = Path(args.log or "/tmp/ra-lsp-timing.csv").expanduser()
    ra_bin = find_rust_analyzer(args.install_ra)
    session = LspSession(ra_bin, workspace)
    file_uri = to_uri(file_path)
    with log_path.open("a", encoding="utf-8") as log:
        # In append mode tell() reports the current file size, so 0 means a
        # brand-new log that still needs its CSV header.
        if log.tell() == 0:
            log.write("ts,round,label,auto_label,elapsed_ms,diagnostic_count\n")
        init = session.request(
            "initialize",
            {
                "processId": os.getpid(),
                "rootUri": to_uri(workspace),
                "rootPath": str(workspace),
                "capabilities": {},
                "workspaceFolders": [{"uri": to_uri(workspace), "name": workspace.name}],
            },
        )
        if init.get("error"):
            raise RuntimeError(f"initialize error: {init['error']!r}")
        session.notify("initialized", {})
        previous_text = file_path.read_text(encoding="utf-8")
        version = 1
        session.notify(
            "textDocument/didOpen",
            {
                "textDocument": {
                    "uri": file_uri,
                    "languageId": "rust",
                    "version": version,
                    "text": previous_text,
                }
            },
        )
        # Wait out the initial analysis pass so later rounds measure only
        # incremental work.
        print("Initial diagnostics sync...")
        count = session.wait_for_file_diagnostics(file_uri, args.timeout)
        print(f"initial diagnostics: {count}")
        rounds = 0
        # --iterations 0 means loop until the user quits interactively.
        while args.iterations == 0 or rounds < args.iterations:
            prompt = input("Edit target file now, then press Enter (or 'q' to quit): ").strip()
            if prompt.lower() in {"q", "quit", "exit"}:
                break
            current_text = file_path.read_text(encoding="utf-8")
            if current_text == previous_text:
                print("No change detected; skipping.")
                continue
            label = input("Label (blank=auto): ").strip()
            auto_label = classify_change(previous_text, current_text)
            if not label:
                label = auto_label
            version += 1
            # Full-document sync: ship the entire new text as one change.
            session.notify(
                "textDocument/didChange",
                {
                    "textDocument": {"uri": file_uri, "version": version},
                    "contentChanges": [
                        {"text": current_text},
                    ],
                },
            )
            started = time.time()
            diag_count = session.wait_for_file_diagnostics(file_uri, args.timeout)
            elapsed_ms = (time.time() - started) * 1000.0
            now = time.strftime("%Y-%m-%d %H:%M:%S")
            log.write(
                f"{now},{rounds + 1},{label},{auto_label},{elapsed_ms:.3f},{diag_count}\n"
            )
            # Flush per round so the CSV stays current even if RA dies later.
            log.flush()
            previous_text = current_text
            rounds += 1
            print(f"{label} | {auto_label} | {elapsed_ms:.2f} ms | diagnostics: {diag_count}")
    session.close()
    return 0
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())