Roll up parent Agent sessions cleanly

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Iliyan Malchev
2026-03-20 14:15:32 -07:00
parent d0ae5e2da2
commit 0a251f0dac
4 changed files with 331 additions and 17 deletions

View File

@@ -0,0 +1,121 @@
package com.openai.codexd
import android.app.agent.AgentSessionInfo
object AgentSessionStateValues {
const val CREATED = AgentSessionInfo.STATE_CREATED
const val RUNNING = AgentSessionInfo.STATE_RUNNING
const val WAITING_FOR_USER = AgentSessionInfo.STATE_WAITING_FOR_USER
const val COMPLETED = AgentSessionInfo.STATE_COMPLETED
const val CANCELLED = AgentSessionInfo.STATE_CANCELLED
const val FAILED = AgentSessionInfo.STATE_FAILED
const val QUEUED = AgentSessionInfo.STATE_QUEUED
}
data class ParentSessionChildSummary(
val sessionId: String,
val targetPackage: String?,
val state: Int,
val latestResult: String?,
val latestError: String?,
)
data class ParentSessionRollup(
val state: Int,
val resultMessage: String?,
val errorMessage: String?,
)
object AgentParentSessionAggregator {
fun rollup(childSessions: List<ParentSessionChildSummary>): ParentSessionRollup {
val state = computeParentState(childSessions.map(ParentSessionChildSummary::state))
return when (state) {
AgentSessionInfo.STATE_COMPLETED -> ParentSessionRollup(
state = state,
resultMessage = buildParentResult(childSessions),
errorMessage = null,
)
AgentSessionInfo.STATE_FAILED -> ParentSessionRollup(
state = state,
resultMessage = null,
errorMessage = buildParentError(childSessions),
)
else -> ParentSessionRollup(
state = state,
resultMessage = null,
errorMessage = null,
)
}
}
private fun computeParentState(childStates: List<Int>): Int {
var anyWaiting = false
var anyRunning = false
var anyQueued = false
var anyFailed = false
var anyCancelled = false
var anyCompleted = false
childStates.forEach { state ->
when (state) {
AgentSessionInfo.STATE_WAITING_FOR_USER -> anyWaiting = true
AgentSessionInfo.STATE_RUNNING -> anyRunning = true
AgentSessionInfo.STATE_QUEUED -> anyQueued = true
AgentSessionInfo.STATE_FAILED -> anyFailed = true
AgentSessionInfo.STATE_CANCELLED -> anyCancelled = true
AgentSessionInfo.STATE_COMPLETED -> anyCompleted = true
}
}
return when {
anyWaiting -> AgentSessionInfo.STATE_WAITING_FOR_USER
anyRunning || anyQueued -> AgentSessionInfo.STATE_RUNNING
anyFailed -> AgentSessionInfo.STATE_FAILED
anyCompleted -> AgentSessionInfo.STATE_COMPLETED
anyCancelled -> AgentSessionInfo.STATE_CANCELLED
else -> AgentSessionInfo.STATE_CREATED
}
}
private fun buildParentResult(childSessions: List<ParentSessionChildSummary>): String {
return buildString {
append("Completed delegated session")
childSessions.forEach { childSession ->
append("; ")
append(childSession.targetPackage ?: childSession.sessionId)
append(": ")
append(
childSession.latestResult
?: childSession.latestError
?: stateToString(childSession.state),
)
}
}
}
private fun buildParentError(childSessions: List<ParentSessionChildSummary>): String {
return buildString {
append("Delegated session failed")
childSessions.forEach { childSession ->
if (childSession.state != AgentSessionInfo.STATE_FAILED) {
return@forEach
}
append("; ")
append(childSession.targetPackage ?: childSession.sessionId)
append(": ")
append(childSession.latestError ?: stateToString(childSession.state))
}
}
}
private fun stateToString(state: Int): String {
return when (state) {
AgentSessionInfo.STATE_CREATED -> "CREATED"
AgentSessionInfo.STATE_RUNNING -> "RUNNING"
AgentSessionInfo.STATE_WAITING_FOR_USER -> "WAITING_FOR_USER"
AgentSessionInfo.STATE_QUEUED -> "QUEUED"
AgentSessionInfo.STATE_COMPLETED -> "COMPLETED"
AgentSessionInfo.STATE_CANCELLED -> "CANCELLED"
AgentSessionInfo.STATE_FAILED -> "FAILED"
else -> state.toString()
}
}
}

View File

@@ -4,6 +4,7 @@ import android.app.agent.AgentManager
import android.app.agent.AgentService
import android.app.agent.AgentSessionEvent
import android.app.agent.AgentSessionInfo
import android.os.Process
import android.util.Log
import java.io.IOException
import kotlin.concurrent.thread
@@ -24,6 +25,7 @@ class CodexAgentService : AgentService() {
private val pendingGenieQuestions = java.util.concurrent.ConcurrentHashMap.newKeySet<String>()
private val pendingQuestionLoads = java.util.concurrent.ConcurrentHashMap.newKeySet<String>()
private val handledBridgeRequests = java.util.concurrent.ConcurrentHashMap.newKeySet<String>()
private val pendingParentRollups = java.util.concurrent.ConcurrentHashMap.newKeySet<String>()
}
private sealed class AutoAnswerResult {
@@ -43,6 +45,7 @@ class CodexAgentService : AgentService() {
override fun onSessionChanged(session: AgentSessionInfo) {
Log.i(TAG, "onSessionChanged $session")
maybeRollUpParentSession(session)
agentManager?.let { manager ->
if (shouldServeSessionBridge(session)) {
AgentSessionBridgeServer.ensureStarted(this, manager, session.sessionId)
@@ -75,6 +78,79 @@ class CodexAgentService : AgentService() {
pendingGenieQuestions.removeIf { it.startsWith("$sessionId:") }
}
private fun maybeRollUpParentSession(session: AgentSessionInfo) {
val parentSessionId = when {
!session.parentSessionId.isNullOrBlank() -> session.parentSessionId
isDirectParentSession(session) -> session.sessionId
else -> null
} ?: return
if (!pendingParentRollups.add(parentSessionId)) {
return
}
thread(name = "CodexAgentParentRollup-$parentSessionId") {
try {
runCatching {
rollUpParentSession(parentSessionId)
}.onFailure { err ->
Log.w(TAG, "Parent session roll-up failed for $parentSessionId", err)
}
} finally {
pendingParentRollups.remove(parentSessionId)
}
}
}
private fun rollUpParentSession(parentSessionId: String) {
val manager = agentManager ?: return
val sessions = manager.getSessions(currentUserId())
val parentSession = sessions.firstOrNull { it.sessionId == parentSessionId } ?: return
if (!isDirectParentSession(parentSession)) {
return
}
val childSessions = sessions.filter { it.parentSessionId == parentSessionId }
if (childSessions.isEmpty()) {
return
}
val rollup = AgentParentSessionAggregator.rollup(
childSessions.map { childSession ->
val events = manager.getSessionEvents(childSession.sessionId)
ParentSessionChildSummary(
sessionId = childSession.sessionId,
targetPackage = childSession.targetPackage,
state = childSession.state,
latestResult = findLastEventMessage(events, AgentSessionEvent.TYPE_RESULT),
latestError = findLastEventMessage(events, AgentSessionEvent.TYPE_ERROR),
)
},
)
if (parentSession.state != rollup.state) {
runCatching {
manager.updateSessionState(parentSessionId, rollup.state)
}.onFailure { err ->
Log.w(TAG, "Failed to update parent session state for $parentSessionId", err)
}
}
val parentEvents = if (rollup.resultMessage != null || rollup.errorMessage != null) {
manager.getSessionEvents(parentSessionId)
} else {
emptyList()
}
if (rollup.resultMessage != null && findLastEventMessage(parentEvents, AgentSessionEvent.TYPE_RESULT) == null) {
runCatching {
manager.publishResult(parentSessionId, rollup.resultMessage)
}.onFailure { err ->
Log.w(TAG, "Failed to publish parent result for $parentSessionId", err)
}
}
if (rollup.errorMessage != null && findLastEventMessage(parentEvents, AgentSessionEvent.TYPE_ERROR) == null) {
runCatching {
manager.publishError(parentSessionId, rollup.errorMessage)
}.onFailure { err ->
Log.w(TAG, "Failed to publish parent error for $parentSessionId", err)
}
}
}
private fun shouldServeSessionBridge(session: AgentSessionInfo): Boolean {
if (session.targetPackage.isNullOrBlank()) {
return false
@@ -260,6 +336,12 @@ class CodexAgentService : AgentService() {
}?.message
}
private fun findLastEventMessage(events: List<AgentSessionEvent>, type: Int): String? {
return events.lastOrNull { event ->
event.type == type && !event.message.isNullOrBlank()
}?.message
}
private fun isBridgeQuestion(question: String): Boolean {
return question.startsWith(BRIDGE_REQUEST_PREFIX)
}
@@ -359,4 +441,14 @@ class CodexAgentService : AgentService() {
}
return "$sessionId:$question"
}
private fun isDirectParentSession(session: AgentSessionInfo): Boolean {
return session.anchor == AgentSessionInfo.ANCHOR_AGENT &&
session.parentSessionId == null &&
session.targetPackage == null
}
private fun currentUserId(): Int {
return Process.myUid() / 100000
}
}

View File

@@ -0,0 +1,84 @@
package com.openai.codexd
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import org.junit.Test
class AgentParentSessionAggregatorTest {
@Test
fun rollupReturnsCompletedSummaryWhenChildrenComplete() {
val rollup = AgentParentSessionAggregator.rollup(
listOf(
ParentSessionChildSummary(
sessionId = "child-1",
targetPackage = "com.android.deskclock",
state = AgentSessionStateValues.COMPLETED,
latestResult = "Alarm set for 2:07 PM.",
latestError = null,
),
),
)
assertEquals(AgentSessionStateValues.COMPLETED, rollup.state)
assertEquals(
"Completed delegated session; com.android.deskclock: Alarm set for 2:07 PM.",
rollup.resultMessage,
)
assertNull(rollup.errorMessage)
}
@Test
fun rollupReturnsWaitingWhenAnyChildWaitsForUser() {
val rollup = AgentParentSessionAggregator.rollup(
listOf(
ParentSessionChildSummary(
sessionId = "child-1",
targetPackage = "com.android.deskclock",
state = AgentSessionStateValues.WAITING_FOR_USER,
latestResult = null,
latestError = null,
),
ParentSessionChildSummary(
sessionId = "child-2",
targetPackage = "com.android.settings",
state = AgentSessionStateValues.COMPLETED,
latestResult = "Completed task.",
latestError = null,
),
),
)
assertEquals(AgentSessionStateValues.WAITING_FOR_USER, rollup.state)
assertNull(rollup.resultMessage)
assertNull(rollup.errorMessage)
}
@Test
fun rollupReturnsFailedSummaryWhenAnyChildFails() {
val rollup = AgentParentSessionAggregator.rollup(
listOf(
ParentSessionChildSummary(
sessionId = "child-1",
targetPackage = "com.android.deskclock",
state = AgentSessionStateValues.FAILED,
latestResult = null,
latestError = "Permission denied.",
),
ParentSessionChildSummary(
sessionId = "child-2",
targetPackage = "com.android.settings",
state = AgentSessionStateValues.COMPLETED,
latestResult = "Completed task.",
latestError = null,
),
),
)
assertEquals(AgentSessionStateValues.FAILED, rollup.state)
assertNull(rollup.resultMessage)
assertEquals(
"Delegated session failed; com.android.deskclock: Permission denied.",
rollup.errorMessage,
)
}
}

View File

@@ -9,6 +9,7 @@ import java.io.BufferedWriter
import java.io.Closeable
import java.io.File
import java.io.IOException
import java.io.InterruptedIOException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
@@ -107,17 +108,25 @@ class CodexAppServerHost(
private fun startStdoutPump() {
stdoutThread = Thread {
process.inputStream.bufferedReader().useLines { lines ->
lines.forEach { line ->
if (line.isBlank()) {
return@forEach
}
val message = runCatching { JSONObject(line) }
.getOrElse { err ->
Log.w(TAG, "Failed to parse codex app-server stdout line", err)
try {
process.inputStream.bufferedReader().useLines { lines ->
lines.forEach { line ->
if (line.isBlank()) {
return@forEach
}
routeInbound(message)
val message = runCatching { JSONObject(line) }
.getOrElse { err ->
Log.w(TAG, "Failed to parse codex app-server stdout line", err)
return@forEach
}
routeInbound(message)
}
}
} catch (_: InterruptedIOException) {
// Expected when the hosted app-server exits and the stream closes underneath the reader.
} catch (err: IOException) {
if (!control.cancelled && process.isAlive) {
Log.w(TAG, "Stdout pump failed for ${request.sessionId}", err)
}
}
}.also {
@@ -128,16 +137,24 @@ class CodexAppServerHost(
private fun startStderrPump() {
stderrThread = Thread {
process.errorStream.bufferedReader().useLines { lines ->
lines.forEach { line ->
if (line.isBlank()) {
return@forEach
}
when {
line.contains(" ERROR ") -> Log.e(TAG, line)
line.contains(" WARN ") || line.startsWith("WARNING:") -> Log.w(TAG, line)
try {
process.errorStream.bufferedReader().useLines { lines ->
lines.forEach { line ->
if (line.isBlank()) {
return@forEach
}
when {
line.contains(" ERROR ") -> Log.e(TAG, line)
line.contains(" WARN ") || line.startsWith("WARNING:") -> Log.w(TAG, line)
}
}
}
} catch (_: InterruptedIOException) {
// Expected when the hosted app-server exits and the stream closes underneath the reader.
} catch (err: IOException) {
if (!control.cancelled && process.isAlive) {
Log.w(TAG, "Stderr pump failed for ${request.sessionId}", err)
}
}
}.also {
it.name = "CodexAppServerStderr-${request.sessionId}"