Make Agent runtime status event-driven

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Iliyan Malchev
2026-03-19 23:45:58 -07:00
parent b124bf5170
commit 85f88c85c9
2 changed files with 97 additions and 25 deletions

View File

@@ -7,9 +7,12 @@ import java.io.BufferedWriter
import java.io.File
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArraySet
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.thread
import org.json.JSONArray
import org.json.JSONObject
@@ -17,6 +20,7 @@ object AgentCodexAppServerClient {
private const val TAG = "AgentCodexClient"
private const val REQUEST_TIMEOUT_MS = 30_000L
private const val DEFAULT_AGENT_MODEL = "gpt-5.3-codex"
private const val AGENT_APP_SERVER_RUST_LOG = "warn"
data class RuntimeStatus(
val authenticated: Boolean,
@@ -28,11 +32,16 @@ object AgentCodexAppServerClient {
val upstreamBaseUrl: String,
)
fun interface RuntimeStatusListener {
fun onRuntimeStatusChanged(status: RuntimeStatus?)
}
private val lifecycleLock = Any()
private val requestIdSequence = AtomicInteger(1)
private val activeRequests = AtomicInteger(0)
private val pendingResponses = ConcurrentHashMap<String, LinkedBlockingQueue<JSONObject>>()
private val notifications = LinkedBlockingQueue<JSONObject>()
private val runtimeStatusListeners = CopyOnWriteArraySet<RuntimeStatusListener>()
private var process: Process? = null
private var writer: BufferedWriter? = null
@@ -40,6 +49,40 @@ object AgentCodexAppServerClient {
private var stderrThread: Thread? = null
private var localProxy: AgentLocalCodexProxy? = null
private var initialized = false
@Volatile
private var cachedRuntimeStatus: RuntimeStatus? = null
private val runtimeStatusRefreshInFlight = AtomicBoolean(false)
fun currentRuntimeStatus(): RuntimeStatus? = cachedRuntimeStatus
fun registerRuntimeStatusListener(listener: RuntimeStatusListener) {
runtimeStatusListeners += listener
listener.onRuntimeStatusChanged(cachedRuntimeStatus)
}
fun unregisterRuntimeStatusListener(listener: RuntimeStatusListener) {
runtimeStatusListeners -= listener
}
fun refreshRuntimeStatusAsync(
context: Context,
refreshToken: Boolean = false,
) {
if (!runtimeStatusRefreshInFlight.compareAndSet(false, true)) {
return
}
thread(name = "AgentRuntimeStatusRefresh") {
try {
runCatching {
readRuntimeStatus(context, refreshToken)
}.onFailure {
updateCachedRuntimeStatus(null)
}
} finally {
runtimeStatusRefreshInFlight.set(false)
}
}
}
fun requestText(
context: Context,
@@ -52,6 +95,7 @@ object AgentCodexAppServerClient {
): String = synchronized(lifecycleLock) {
ensureStarted(context.applicationContext)
activeRequests.incrementAndGet()
updateClientCount()
try {
Log.i(
TAG,
@@ -73,6 +117,7 @@ object AgentCodexAppServerClient {
}
} finally {
activeRequests.decrementAndGet()
updateClientCount()
}
}
@@ -82,6 +127,7 @@ object AgentCodexAppServerClient {
): RuntimeStatus = synchronized(lifecycleLock) {
ensureStarted(context.applicationContext)
activeRequests.incrementAndGet()
updateClientCount()
try {
val accountResponse = request(
method = "account/read",
@@ -91,9 +137,10 @@ object AgentCodexAppServerClient {
method = "config/read",
params = JSONObject().put("includeLayers", false),
)
parseRuntimeStatus(accountResponse, configResponse)
parseRuntimeStatus(accountResponse, configResponse).also(::updateCachedRuntimeStatus)
} finally {
activeRequests.decrementAndGet()
updateClientCount()
}
}
@@ -122,7 +169,7 @@ object AgentCodexAppServerClient {
),
).apply {
environment()["CODEX_HOME"] = codexHome.absolutePath
environment()["RUST_LOG"] = "info"
environment()["RUST_LOG"] = AGENT_APP_SERVER_RUST_LOG
}.start()
process = startedProcess
writer = startedProcess.outputStream.bufferedWriter()
@@ -142,6 +189,7 @@ object AgentCodexAppServerClient {
process?.destroy()
process = null
initialized = false
updateCachedRuntimeStatus(null)
}
private fun initialize() {
@@ -457,9 +505,7 @@ object AgentCodexAppServerClient {
stderrThread = Thread {
process.errorStream.bufferedReader().useLines { lines ->
lines.forEach { line ->
if (line.isNotBlank()) {
Log.i(TAG, line)
}
logAgentStderrLine(line)
}
}
}.also {
@@ -480,10 +526,41 @@ object AgentCodexAppServerClient {
val activeProcess = process ?: throw IOException("Agent app-server unavailable")
if (!activeProcess.isAlive) {
initialized = false
updateCachedRuntimeStatus(null)
throw IOException("Agent app-server exited with code ${activeProcess.exitValue()}")
}
}
private fun logAgentStderrLine(line: String) {
if (line.isBlank()) {
return
}
when {
line.contains(" ERROR ") || line.startsWith("ERROR") -> Log.e(TAG, line)
line.contains(" WARN ") || line.startsWith("WARN") -> Log.w(TAG, line)
}
}
private fun updateClientCount() {
val currentStatus = cachedRuntimeStatus ?: return
val updatedStatus = currentStatus.copy(clientCount = activeRequests.get())
updateCachedRuntimeStatus(updatedStatus)
}
private fun updateCachedRuntimeStatus(status: RuntimeStatus?) {
if (cachedRuntimeStatus == status) {
return
}
cachedRuntimeStatus = status
runtimeStatusListeners.forEach { listener ->
runCatching {
listener.onRuntimeStatusChanged(status)
}.onFailure { err ->
Log.w(TAG, "Runtime status listener failed", err)
}
}
}
private fun parseRuntimeStatus(
accountResponse: JSONObject,
configResponse: JSONObject,

View File

@@ -50,16 +50,19 @@ class MainActivity : Activity() {
@Volatile
private var agentRefreshInFlight = false
@Volatile
private var agentRuntimeRefreshInFlight = false
@Volatile
private var latestAgentRuntimeStatus: AgentCodexAppServerClient.RuntimeStatus? = null
private val refreshHandler = Handler(Looper.getMainLooper())
private val agentSessionController by lazy { AgentSessionController(this) }
private val sessionUiLeaseToken = Binder()
private val runtimeStatusListener = AgentCodexAppServerClient.RuntimeStatusListener { status ->
latestAgentRuntimeStatus = status
runOnUiThread {
findViewById<TextView>(R.id.agent_runtime_status).text = renderAgentRuntimeStatus()
}
}
private val refreshRunnable = object : Runnable {
override fun run() {
refreshAgentRuntimeStatus()
refreshAuthStatus()
refreshAgentSessions()
refreshHandler.postDelayed(this, STATUS_REFRESH_INTERVAL_MS)
@@ -105,12 +108,15 @@ class MainActivity : Activity() {
override fun onResume() {
super.onResume()
registerSessionListenerIfNeeded()
AgentCodexAppServerClient.registerRuntimeStatusListener(runtimeStatusListener)
AgentCodexAppServerClient.refreshRuntimeStatusAsync(this)
refreshHandler.removeCallbacks(refreshRunnable)
refreshHandler.post(refreshRunnable)
}
override fun onPause() {
refreshHandler.removeCallbacks(refreshRunnable)
AgentCodexAppServerClient.unregisterRuntimeStatusListener(runtimeStatusListener)
unregisterSessionListenerIfNeeded()
updateSessionUiLease(null)
super.onPause()
@@ -120,7 +126,7 @@ class MainActivity : Activity() {
findViewById<TextView>(R.id.socket_path).text = defaultSocketPath()
findViewById<TextView>(R.id.codex_home).text = defaultCodexHome()
isServiceRunning = false
latestAgentRuntimeStatus = null
latestAgentRuntimeStatus = AgentCodexAppServerClient.currentRuntimeStatus()
updateAuthUi("Codexd status: unknown", false, null, emptyList())
updateAgentUi(AgentSnapshot.unavailable)
}
@@ -369,6 +375,7 @@ class MainActivity : Activity() {
startService(intent)
isServiceRunning = false
updateAuthUi("Codexd status: stopping service...", false, 0, emptyList())
AgentCodexAppServerClient.refreshRuntimeStatusAsync(this)
return
}
@@ -377,6 +384,7 @@ class MainActivity : Activity() {
isServiceRunning = true
updateAuthUi("Codexd status: starting service...", isAuthenticated, null, emptyList())
refreshAuthStatus()
AgentCodexAppServerClient.refreshRuntimeStatusAsync(this)
}
fun authAction(@Suppress("UNUSED_PARAMETER") view: View) {
@@ -407,6 +415,7 @@ class MainActivity : Activity() {
when (deviceResponse.status) {
"already_authenticated" -> {
updateAuthUi("Codexd status: already authenticated", true, null, emptyList())
AgentCodexAppServerClient.refreshRuntimeStatusAsync(this, refreshToken = true)
showToast("Already signed in")
}
"pending", "in_progress" -> {
@@ -447,6 +456,7 @@ class MainActivity : Activity() {
result.onSuccess {
showToast("Signed out")
refreshAuthStatus()
AgentCodexAppServerClient.refreshRuntimeStatusAsync(this)
}
}
}
@@ -482,22 +492,6 @@ class MainActivity : Activity() {
}
}
private fun refreshAgentRuntimeStatus() {
if (agentRuntimeRefreshInFlight) {
return
}
agentRuntimeRefreshInFlight = true
thread {
latestAgentRuntimeStatus = runCatching {
AgentCodexAppServerClient.readRuntimeStatus(this)
}.getOrNull()
runOnUiThread {
findViewById<TextView>(R.id.agent_runtime_status).text = renderAgentRuntimeStatus()
}
agentRuntimeRefreshInFlight = false
}
}
private fun refreshAgentSessions(force: Boolean = false) {
if (!force && agentRefreshInFlight) {
return
@@ -532,6 +526,7 @@ class MainActivity : Activity() {
status.clientCount,
status.clients,
)
AgentCodexAppServerClient.refreshRuntimeStatusAsync(this, refreshToken = true)
showToast("Signed in")
return
}