mirror of
https://github.com/openai/codex.git
synced 2026-04-29 17:06:51 +00:00
Use framework session bridges for Agent transport
Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
@@ -0,0 +1,176 @@
|
||||
package com.openai.codexd
|
||||
|
||||
import android.app.agent.AgentManager
|
||||
import android.content.Context
|
||||
import android.os.ParcelFileDescriptor
|
||||
import android.util.Log
|
||||
import java.io.BufferedInputStream
|
||||
import java.io.BufferedOutputStream
|
||||
import java.io.Closeable
|
||||
import java.io.DataInputStream
|
||||
import java.io.DataOutputStream
|
||||
import java.io.EOFException
|
||||
import java.io.FileInputStream
|
||||
import java.io.FileOutputStream
|
||||
import java.io.IOException
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import kotlin.concurrent.thread
|
||||
import org.json.JSONObject
|
||||
|
||||
object AgentSessionBridgeServer {
|
||||
private val runningBridges = ConcurrentHashMap<String, RunningBridge>()
|
||||
|
||||
fun ensureStarted(
|
||||
context: Context,
|
||||
agentManager: AgentManager,
|
||||
sessionId: String,
|
||||
) {
|
||||
runningBridges.computeIfAbsent(sessionId) {
|
||||
RunningBridge(
|
||||
context = context.applicationContext,
|
||||
agentManager = agentManager,
|
||||
sessionId = sessionId,
|
||||
).also(RunningBridge::start)
|
||||
}
|
||||
}
|
||||
|
||||
fun closeSession(sessionId: String) {
|
||||
runningBridges.remove(sessionId)?.close()
|
||||
}
|
||||
|
||||
private class RunningBridge(
|
||||
private val context: Context,
|
||||
private val agentManager: AgentManager,
|
||||
private val sessionId: String,
|
||||
) : Closeable {
|
||||
companion object {
|
||||
private const val TAG = "AgentSessionBridge"
|
||||
private const val METHOD_GET_RUNTIME_STATUS = "getRuntimeStatus"
|
||||
private const val METHOD_SEND_RESPONSES_REQUEST = "sendResponsesRequest"
|
||||
}
|
||||
|
||||
private val closed = AtomicBoolean(false)
|
||||
private var bridgeFd: ParcelFileDescriptor? = null
|
||||
private var input: DataInputStream? = null
|
||||
private var output: DataOutputStream? = null
|
||||
private val serveThread = thread(
|
||||
start = false,
|
||||
name = "AgentSessionBridge-$sessionId",
|
||||
) {
|
||||
serveLoop()
|
||||
}
|
||||
|
||||
fun start() {
|
||||
serveThread.start()
|
||||
}
|
||||
|
||||
override fun close() {
|
||||
if (!closed.compareAndSet(false, true)) {
|
||||
return
|
||||
}
|
||||
runCatching { input?.close() }
|
||||
runCatching { output?.close() }
|
||||
runCatching { bridgeFd?.close() }
|
||||
serveThread.interrupt()
|
||||
}
|
||||
|
||||
private fun serveLoop() {
|
||||
try {
|
||||
val fd = agentManager.openSessionBridge(sessionId)
|
||||
bridgeFd = fd
|
||||
input = DataInputStream(BufferedInputStream(FileInputStream(fd.fileDescriptor)))
|
||||
output = DataOutputStream(BufferedOutputStream(FileOutputStream(fd.fileDescriptor)))
|
||||
Log.i(TAG, "Opened framework session bridge for $sessionId")
|
||||
while (!closed.get()) {
|
||||
val request = try {
|
||||
readMessage(input ?: break)
|
||||
} catch (_: EOFException) {
|
||||
return
|
||||
}
|
||||
val response = handleRequest(request)
|
||||
writeMessage(output ?: break, response)
|
||||
}
|
||||
} catch (err: Exception) {
|
||||
if (!closed.get()) {
|
||||
Log.w(TAG, "Session bridge failed for $sessionId", err)
|
||||
}
|
||||
} finally {
|
||||
runningBridges.remove(sessionId, this)
|
||||
close()
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleRequest(request: JSONObject): JSONObject {
|
||||
val requestId = request.optString("requestId")
|
||||
return runCatching {
|
||||
when (request.optString("method")) {
|
||||
METHOD_GET_RUNTIME_STATUS -> {
|
||||
val status = AgentCodexAppServerClient.readRuntimeStatus(context)
|
||||
JSONObject()
|
||||
.put("requestId", requestId)
|
||||
.put("ok", true)
|
||||
.put(
|
||||
"runtimeStatus",
|
||||
JSONObject()
|
||||
.put("authenticated", status.authenticated)
|
||||
.put("accountEmail", status.accountEmail)
|
||||
.put("clientCount", status.clientCount)
|
||||
.put("modelProviderId", status.modelProviderId)
|
||||
.put("configuredModel", status.configuredModel)
|
||||
.put("effectiveModel", status.effectiveModel)
|
||||
.put("upstreamBaseUrl", status.upstreamBaseUrl),
|
||||
)
|
||||
}
|
||||
METHOD_SEND_RESPONSES_REQUEST -> {
|
||||
val httpResponse = AgentResponsesProxy.sendResponsesRequest(
|
||||
context,
|
||||
request.optString("requestBody"),
|
||||
)
|
||||
JSONObject()
|
||||
.put("requestId", requestId)
|
||||
.put("ok", true)
|
||||
.put(
|
||||
"httpResponse",
|
||||
JSONObject()
|
||||
.put("statusCode", httpResponse.statusCode)
|
||||
.put("body", httpResponse.body),
|
||||
)
|
||||
}
|
||||
else -> {
|
||||
JSONObject()
|
||||
.put("requestId", requestId)
|
||||
.put("ok", false)
|
||||
.put("error", "Unsupported bridge method: ${request.optString("method")}")
|
||||
}
|
||||
}
|
||||
}.getOrElse { err ->
|
||||
JSONObject()
|
||||
.put("requestId", requestId)
|
||||
.put("ok", false)
|
||||
.put("error", err.message ?: err::class.java.simpleName)
|
||||
}
|
||||
}
|
||||
|
||||
private fun readMessage(input: DataInputStream): JSONObject {
|
||||
val size = input.readInt()
|
||||
if (size <= 0) {
|
||||
throw IOException("Invalid session bridge message length: $size")
|
||||
}
|
||||
val payload = ByteArray(size)
|
||||
input.readFully(payload)
|
||||
return JSONObject(payload.toString(StandardCharsets.UTF_8))
|
||||
}
|
||||
|
||||
private fun writeMessage(
|
||||
output: DataOutputStream,
|
||||
message: JSONObject,
|
||||
) {
|
||||
val payload = message.toString().toByteArray(StandardCharsets.UTF_8)
|
||||
output.writeInt(payload.size)
|
||||
output.write(payload)
|
||||
output.flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -10,6 +10,11 @@ import java.util.concurrent.Executor
|
||||
|
||||
class AgentSessionController(context: Context) {
|
||||
companion object {
|
||||
private const val BRIDGE_REQUEST_PREFIX = "__codex_bridge__ "
|
||||
private const val BRIDGE_RESPONSE_PREFIX = "__codex_bridge_result__ "
|
||||
private const val DIAGNOSTIC_NOT_LOADED = "Diagnostics not loaded."
|
||||
private const val MAX_TIMELINE_EVENTS = 12
|
||||
private const val MAX_EVENT_MESSAGE_CHARS = 240
|
||||
private const val PREFERRED_GENIE_PACKAGE = "com.openai.codex.genie"
|
||||
private const val QUESTION_ANSWER_RETRY_COUNT = 10
|
||||
private const val QUESTION_ANSWER_RETRY_DELAY_MS = 50L
|
||||
@@ -44,8 +49,7 @@ class AgentSessionController(context: Context) {
|
||||
val manager = agentManager ?: return AgentSnapshot.unavailable
|
||||
val roleHolders = manager.getGenieRoleHolders(currentUserId())
|
||||
val selectedGeniePackage = selectGeniePackage(roleHolders)
|
||||
val sessionDetails = manager.getSessions(currentUserId()).map { session ->
|
||||
val events = manager.getSessionEvents(session.sessionId)
|
||||
var sessionDetails = manager.getSessions(currentUserId()).map { session ->
|
||||
AgentSessionDetails(
|
||||
sessionId = session.sessionId,
|
||||
parentSessionId = session.parentSessionId,
|
||||
@@ -54,13 +58,27 @@ class AgentSessionController(context: Context) {
|
||||
state = session.state,
|
||||
stateLabel = stateToString(session.state),
|
||||
targetDetached = session.isTargetDetached,
|
||||
latestQuestion = findLastEventMessage(events, AgentSessionEvent.TYPE_QUESTION),
|
||||
latestResult = findLastEventMessage(events, AgentSessionEvent.TYPE_RESULT),
|
||||
latestError = findLastEventMessage(events, AgentSessionEvent.TYPE_ERROR),
|
||||
latestTrace = findLastEventMessage(events, AgentSessionEvent.TYPE_TRACE),
|
||||
timeline = renderTimeline(events),
|
||||
latestQuestion = null,
|
||||
latestResult = null,
|
||||
latestError = null,
|
||||
latestTrace = null,
|
||||
timeline = DIAGNOSTIC_NOT_LOADED,
|
||||
)
|
||||
}
|
||||
val selectedSessionId = chooseSelectedSession(sessionDetails, focusedSessionId)?.sessionId
|
||||
val parentSessionId = selectedSessionId?.let { selectedId ->
|
||||
findParentSession(sessionDetails, sessionDetails.firstOrNull { it.sessionId == selectedId })?.sessionId
|
||||
}
|
||||
val diagnosticSessionIds = linkedSetOf<String>().apply {
|
||||
parentSessionId?.let(::add)
|
||||
selectedSessionId?.let(::add)
|
||||
}
|
||||
val diagnosticsBySessionId = diagnosticSessionIds.associateWith { sessionId ->
|
||||
loadSessionDiagnostics(manager, sessionId)
|
||||
}
|
||||
sessionDetails = sessionDetails.map { session ->
|
||||
diagnosticsBySessionId[session.sessionId]?.let(session::withDiagnostics) ?: session
|
||||
}
|
||||
val selectedSession = chooseSelectedSession(sessionDetails, focusedSessionId)
|
||||
val parentSession = findParentSession(sessionDetails, selectedSession)
|
||||
val relatedSessions = if (parentSession == null) {
|
||||
@@ -255,18 +273,76 @@ class AgentSessionController(context: Context) {
|
||||
for (index in events.indices.reversed()) {
|
||||
val event = events[index]
|
||||
if (event.type == type && event.message != null) {
|
||||
return event.message
|
||||
return summarizeEventMessage(event.message)
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
private fun loadSessionDiagnostics(manager: AgentManager, sessionId: String): SessionDiagnostics {
|
||||
val events = manager.getSessionEvents(sessionId)
|
||||
return SessionDiagnostics(
|
||||
latestQuestion = findLastEventMessage(events, AgentSessionEvent.TYPE_QUESTION),
|
||||
latestResult = findLastEventMessage(events, AgentSessionEvent.TYPE_RESULT),
|
||||
latestError = findLastEventMessage(events, AgentSessionEvent.TYPE_ERROR),
|
||||
latestTrace = findLastEventMessage(events, AgentSessionEvent.TYPE_TRACE),
|
||||
timeline = renderTimeline(events),
|
||||
)
|
||||
}
|
||||
|
||||
private fun renderTimeline(events: List<AgentSessionEvent>): String {
|
||||
if (events.isEmpty()) {
|
||||
return "No framework events yet."
|
||||
}
|
||||
return events.joinToString("\n") { event ->
|
||||
"${eventTypeToString(event.type)}: ${event.message ?: ""}"
|
||||
return events.takeLast(MAX_TIMELINE_EVENTS).joinToString("\n") { event ->
|
||||
"${eventTypeToString(event.type)}: ${summarizeEventMessage(event.message).orEmpty()}"
|
||||
}
|
||||
}
|
||||
|
||||
private fun summarizeEventMessage(message: String?): String? {
|
||||
val trimmed = message?.trim()?.takeIf(String::isNotEmpty) ?: return null
|
||||
if (trimmed.startsWith(BRIDGE_REQUEST_PREFIX)) {
|
||||
return summarizeBridgeRequest(trimmed)
|
||||
}
|
||||
if (trimmed.startsWith(BRIDGE_RESPONSE_PREFIX)) {
|
||||
return summarizeBridgeResponse(trimmed)
|
||||
}
|
||||
return if (trimmed.length <= MAX_EVENT_MESSAGE_CHARS) {
|
||||
trimmed
|
||||
} else {
|
||||
trimmed.take(MAX_EVENT_MESSAGE_CHARS) + "…"
|
||||
}
|
||||
}
|
||||
|
||||
private fun summarizeBridgeRequest(message: String): String {
|
||||
val request = runCatching {
|
||||
org.json.JSONObject(message.removePrefix(BRIDGE_REQUEST_PREFIX))
|
||||
}.getOrNull()
|
||||
val method = request?.optString("method")?.ifEmpty { "unknown" } ?: "unknown"
|
||||
val requestId = request?.optString("requestId")?.takeIf(String::isNotBlank)
|
||||
return buildString {
|
||||
append("Bridge request: ")
|
||||
append(method)
|
||||
requestId?.let { append(" (#$it)") }
|
||||
}
|
||||
}
|
||||
|
||||
private fun summarizeBridgeResponse(message: String): String {
|
||||
val response = runCatching {
|
||||
org.json.JSONObject(message.removePrefix(BRIDGE_RESPONSE_PREFIX))
|
||||
}.getOrNull()
|
||||
val requestId = response?.optString("requestId")?.takeIf(String::isNotBlank)
|
||||
val statusCode = response?.optJSONObject("httpResponse")?.optInt("statusCode")
|
||||
val ok = response?.optBoolean("ok")
|
||||
return buildString {
|
||||
append("Bridge response")
|
||||
requestId?.let { append(" (#$it)") }
|
||||
if (statusCode != null) {
|
||||
append(": HTTP $statusCode")
|
||||
} else if (ok != null) {
|
||||
append(": ")
|
||||
append(if (ok) "ok" else "error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,6 +421,24 @@ data class AgentSessionDetails(
|
||||
val latestError: String?,
|
||||
val latestTrace: String?,
|
||||
val timeline: String,
|
||||
) {
|
||||
fun withDiagnostics(diagnostics: SessionDiagnostics): AgentSessionDetails {
|
||||
return copy(
|
||||
latestQuestion = diagnostics.latestQuestion,
|
||||
latestResult = diagnostics.latestResult,
|
||||
latestError = diagnostics.latestError,
|
||||
latestTrace = diagnostics.latestTrace,
|
||||
timeline = diagnostics.timeline,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
data class SessionDiagnostics(
|
||||
val latestQuestion: String?,
|
||||
val latestResult: String?,
|
||||
val latestError: String?,
|
||||
val latestTrace: String?,
|
||||
val timeline: String,
|
||||
)
|
||||
|
||||
data class SessionStartResult(
|
||||
|
||||
@@ -22,6 +22,7 @@ class CodexAgentService : AgentService() {
|
||||
private const val MAX_AUTO_ANSWER_CONTEXT_CHARS = 800
|
||||
private val handledGenieQuestions = java.util.concurrent.ConcurrentHashMap.newKeySet<String>()
|
||||
private val pendingGenieQuestions = java.util.concurrent.ConcurrentHashMap.newKeySet<String>()
|
||||
private val pendingQuestionLoads = java.util.concurrent.ConcurrentHashMap.newKeySet<String>()
|
||||
private val handledBridgeRequests = java.util.concurrent.ConcurrentHashMap.newKeySet<String>()
|
||||
}
|
||||
|
||||
@@ -38,30 +39,71 @@ class CodexAgentService : AgentService() {
|
||||
|
||||
override fun onCreate() {
|
||||
super.onCreate()
|
||||
AgentSocketBridgeServer.ensureStarted(this)
|
||||
}
|
||||
|
||||
override fun onSessionChanged(session: AgentSessionInfo) {
|
||||
Log.i(TAG, "onSessionChanged $session")
|
||||
maybeAutoAnswerGenieQuestion(session)
|
||||
updateQuestionNotification(session)
|
||||
agentManager?.let { manager ->
|
||||
if (shouldServeSessionBridge(session)) {
|
||||
AgentSessionBridgeServer.ensureStarted(this, manager, session.sessionId)
|
||||
} else {
|
||||
AgentSessionBridgeServer.closeSession(session.sessionId)
|
||||
}
|
||||
}
|
||||
if (session.state != AgentSessionInfo.STATE_WAITING_FOR_USER) {
|
||||
AgentQuestionNotifier.cancel(this, session.sessionId)
|
||||
return
|
||||
}
|
||||
if (!pendingQuestionLoads.add(session.sessionId)) {
|
||||
return
|
||||
}
|
||||
thread(name = "CodexAgentQuestionLoad-${session.sessionId}") {
|
||||
try {
|
||||
handleWaitingSession(session)
|
||||
} finally {
|
||||
pendingQuestionLoads.remove(session.sessionId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onSessionRemoved(sessionId: String) {
|
||||
Log.i(TAG, "onSessionRemoved sessionId=$sessionId")
|
||||
AgentSessionBridgeServer.closeSession(sessionId)
|
||||
AgentQuestionNotifier.cancel(this, sessionId)
|
||||
handledGenieQuestions.removeIf { it.startsWith("$sessionId:") }
|
||||
handledBridgeRequests.removeIf { it.startsWith("$sessionId:") }
|
||||
pendingGenieQuestions.removeIf { it.startsWith("$sessionId:") }
|
||||
}
|
||||
|
||||
private fun maybeAutoAnswerGenieQuestion(session: AgentSessionInfo) {
|
||||
if (session.state != AgentSessionInfo.STATE_WAITING_FOR_USER) {
|
||||
return
|
||||
private fun shouldServeSessionBridge(session: AgentSessionInfo): Boolean {
|
||||
if (session.targetPackage.isNullOrBlank()) {
|
||||
return false
|
||||
}
|
||||
if (session.geniePackage.isNullOrBlank()) {
|
||||
return false
|
||||
}
|
||||
return when (session.state) {
|
||||
AgentSessionInfo.STATE_COMPLETED,
|
||||
AgentSessionInfo.STATE_CANCELLED,
|
||||
AgentSessionInfo.STATE_FAILED,
|
||||
-> false
|
||||
else -> true
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleWaitingSession(session: AgentSessionInfo) {
|
||||
val manager = agentManager ?: return
|
||||
val events = manager.getSessionEvents(session.sessionId)
|
||||
val question = findLatestQuestion(events) ?: return
|
||||
updateQuestionNotification(session, question)
|
||||
maybeAutoAnswerGenieQuestion(session, question, events)
|
||||
}
|
||||
|
||||
private fun maybeAutoAnswerGenieQuestion(
|
||||
session: AgentSessionInfo,
|
||||
question: String,
|
||||
events: List<AgentSessionEvent>,
|
||||
) {
|
||||
val questionKey = genieQuestionKey(session.sessionId, question)
|
||||
if (handledGenieQuestions.contains(questionKey) || !pendingGenieQuestions.add(questionKey)) {
|
||||
return
|
||||
@@ -72,12 +114,12 @@ class CodexAgentService : AgentService() {
|
||||
if (isBridgeQuestion(question)) {
|
||||
answerBridgeQuestion(session, question)
|
||||
handledGenieQuestions.add(questionKey)
|
||||
AgentQuestionNotifier.cancel(this, session.sessionId)
|
||||
Log.i(TAG, "Answered bridge question for ${session.sessionId}")
|
||||
} else {
|
||||
when (val result = requestGenieAutoAnswer(session, question, events)) {
|
||||
AutoAnswerResult.Answered -> {
|
||||
handledGenieQuestions.add(questionKey)
|
||||
AgentQuestionNotifier.cancel(this, session.sessionId)
|
||||
Log.i(TAG, "Answered bridge question for ${session.sessionId}")
|
||||
} else {
|
||||
when (val result = requestGenieAutoAnswer(session, question, events)) {
|
||||
AutoAnswerResult.Answered -> {
|
||||
handledGenieQuestions.add(questionKey)
|
||||
AgentQuestionNotifier.cancel(this, session.sessionId)
|
||||
Log.i(TAG, "Auto-answered Genie question for ${session.sessionId}")
|
||||
}
|
||||
@@ -108,14 +150,8 @@ class CodexAgentService : AgentService() {
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateQuestionNotification(session: AgentSessionInfo) {
|
||||
if (session.state != AgentSessionInfo.STATE_WAITING_FOR_USER) {
|
||||
AgentQuestionNotifier.cancel(this, session.sessionId)
|
||||
return
|
||||
}
|
||||
val manager = agentManager ?: return
|
||||
val question = findLatestQuestion(manager.getSessionEvents(session.sessionId))
|
||||
if (question.isNullOrBlank()) {
|
||||
private fun updateQuestionNotification(session: AgentSessionInfo, question: String) {
|
||||
if (question.isBlank()) {
|
||||
AgentQuestionNotifier.cancel(this, session.sessionId)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -94,7 +94,6 @@ class MainActivity : Activity() {
|
||||
|
||||
override fun onCreate(savedInstanceState: Bundle?) {
|
||||
super.onCreate(savedInstanceState)
|
||||
AgentSocketBridgeServer.ensureStarted(this)
|
||||
setContentView(R.layout.activity_main)
|
||||
updatePaths()
|
||||
handleSessionIntent(intent)
|
||||
|
||||
Reference in New Issue
Block a user