Host Agent planning in codex app-server

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Iliyan Malchev
2026-03-19 13:21:53 -07:00
parent c438fc2fb2
commit f1a739e7eb
11 changed files with 447 additions and 190 deletions

View File

@@ -0,0 +1,360 @@
package com.openai.codexd
import android.content.Context
import android.util.Log
import java.io.BufferedWriter
import java.io.File
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import org.json.JSONArray
import org.json.JSONObject
object AgentCodexAppServerClient {
private const val TAG = "AgentCodexClient"
private const val REQUEST_TIMEOUT_MS = 30_000L
data class RuntimeStatus(
val authenticated: Boolean,
val accountEmail: String?,
val clientCount: Int,
val modelProviderId: String,
val configuredModel: String?,
val effectiveModel: String?,
val upstreamBaseUrl: String,
)
private val lifecycleLock = Any()
private val requestIdSequence = AtomicInteger(1)
private val activeRequests = AtomicInteger(0)
private val pendingResponses = ConcurrentHashMap<String, LinkedBlockingQueue<JSONObject>>()
private val notifications = LinkedBlockingQueue<JSONObject>()
private var process: Process? = null
private var writer: BufferedWriter? = null
private var stdoutThread: Thread? = null
private var stderrThread: Thread? = null
private var initialized = false
fun requestText(
context: Context,
instructions: String,
prompt: String,
): String = synchronized(lifecycleLock) {
ensureStarted(context.applicationContext)
activeRequests.incrementAndGet()
try {
notifications.clear()
val threadId = startThread(context.applicationContext, instructions)
startTurn(threadId, prompt)
waitForTurnCompletion()
} finally {
activeRequests.decrementAndGet()
}
}
fun readRuntimeStatus(context: Context): RuntimeStatus = synchronized(lifecycleLock) {
ensureStarted(context.applicationContext)
activeRequests.incrementAndGet()
try {
val accountResponse = request(
method = "account/read",
params = JSONObject().put("refreshToken", false),
)
val configResponse = request(
method = "config/read",
params = JSONObject().put("includeLayers", false),
)
parseRuntimeStatus(accountResponse, configResponse)
} finally {
activeRequests.decrementAndGet()
}
}
private fun ensureStarted(context: Context) {
if (process?.isAlive == true && writer != null && initialized) {
return
}
closeProcess()
notifications.clear()
pendingResponses.clear()
val codexHome = File(context.filesDir, "codex-home").apply(File::mkdirs)
val startedProcess = ProcessBuilder(
listOf(
CodexCliBinaryLocator.resolve(context).absolutePath,
"-c",
"enable_request_compression=false",
"app-server",
"--listen",
"stdio://",
),
).apply {
environment()["CODEX_HOME"] = codexHome.absolutePath
environment()["RUST_LOG"] = "info"
}.start()
process = startedProcess
writer = startedProcess.outputStream.bufferedWriter()
startStdoutPump(startedProcess)
startStderrPump(startedProcess)
initialize()
initialized = true
}
private fun closeProcess() {
stdoutThread?.interrupt()
stderrThread?.interrupt()
runCatching { writer?.close() }
writer = null
process?.destroy()
process = null
initialized = false
}
private fun initialize() {
request(
method = "initialize",
params = JSONObject()
.put(
"clientInfo",
JSONObject()
.put("name", "android_agent")
.put("title", "Android Agent")
.put("version", "0.1.0"),
)
.put("capabilities", JSONObject().put("experimentalApi", true)),
)
notify("initialized", JSONObject())
}
private fun startThread(
context: Context,
instructions: String,
): String {
val result = request(
method = "thread/start",
params = JSONObject()
.put("approvalPolicy", "never")
.put("sandbox", "read-only")
.put("ephemeral", true)
.put("cwd", context.filesDir.absolutePath)
.put("serviceName", "android_agent")
.put("baseInstructions", instructions),
)
return result.getJSONObject("thread").getString("id")
}
private fun startTurn(
threadId: String,
prompt: String,
) {
request(
method = "turn/start",
params = JSONObject()
.put("threadId", threadId)
.put(
"input",
JSONArray().put(
JSONObject()
.put("type", "text")
.put("text", prompt),
),
),
)
}
private fun waitForTurnCompletion(): String {
val streamedAgentMessages = mutableMapOf<String, StringBuilder>()
var finalAgentMessage: String? = null
val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(REQUEST_TIMEOUT_MS)
while (true) {
val remainingNanos = deadline - System.nanoTime()
if (remainingNanos <= 0L) {
throw IOException("Timed out waiting for Agent turn completion")
}
val notification = notifications.poll(remainingNanos, TimeUnit.NANOSECONDS)
if (notification == null) {
checkProcessAlive()
continue
}
if (notification.has("id") && notification.has("method")) {
rejectUnsupportedServerRequest(notification)
continue
}
val params = notification.optJSONObject("params") ?: JSONObject()
when (notification.optString("method")) {
"item/agentMessage/delta" -> {
val itemId = params.optString("itemId")
if (itemId.isNotBlank()) {
streamedAgentMessages.getOrPut(itemId, ::StringBuilder)
.append(params.optString("delta"))
}
}
"item/completed" -> {
val item = params.optJSONObject("item") ?: continue
if (item.optString("type") == "agentMessage") {
val itemId = item.optString("id")
val text = item.optString("text").ifBlank {
streamedAgentMessages[itemId]?.toString().orEmpty()
}
if (text.isNotBlank()) {
finalAgentMessage = text
}
}
}
"turn/completed" -> {
val turn = params.optJSONObject("turn") ?: JSONObject()
return when (turn.optString("status")) {
"completed" -> finalAgentMessage?.takeIf(String::isNotBlank)
?: throw IOException("Agent turn completed without an assistant message")
"interrupted" -> throw IOException("Agent turn interrupted")
else -> throw IOException(
turn.opt("error")?.toString()
?: "Agent turn failed with status ${turn.optString("status", "unknown")}",
)
}
}
}
}
}
private fun rejectUnsupportedServerRequest(message: JSONObject) {
val requestId = message.opt("id") ?: return
val method = message.optString("method", "unknown")
sendMessage(
JSONObject()
.put("id", requestId)
.put(
"error",
JSONObject()
.put("code", -32601)
.put("message", "Unsupported Agent app-server request: $method"),
),
)
}
private fun request(
method: String,
params: JSONObject,
): JSONObject {
val requestId = requestIdSequence.getAndIncrement().toString()
val responseQueue = LinkedBlockingQueue<JSONObject>(1)
pendingResponses[requestId] = responseQueue
try {
sendMessage(
JSONObject()
.put("id", requestId)
.put("method", method)
.put("params", params),
)
val response = responseQueue.poll(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS)
?: throw IOException("Timed out waiting for $method response")
val error = response.optJSONObject("error")
if (error != null) {
throw IOException("$method failed: ${error.optString("message", error.toString())}")
}
return response.optJSONObject("result") ?: JSONObject()
} finally {
pendingResponses.remove(requestId)
}
}
private fun notify(
method: String,
params: JSONObject,
) {
sendMessage(
JSONObject()
.put("method", method)
.put("params", params),
)
}
private fun sendMessage(message: JSONObject) {
val activeWriter = writer ?: throw IOException("Agent app-server writer unavailable")
activeWriter.write(message.toString())
activeWriter.newLine()
activeWriter.flush()
}
private fun startStdoutPump(process: Process) {
stdoutThread = Thread {
process.inputStream.bufferedReader().useLines { lines ->
lines.forEach { line ->
if (line.isBlank()) {
return@forEach
}
val message = runCatching { JSONObject(line) }
.getOrElse { err ->
Log.w(TAG, "Failed to parse Agent app-server stdout line", err)
return@forEach
}
routeInbound(message)
}
}
}.also {
it.name = "AgentCodexStdout"
it.start()
}
}
private fun startStderrPump(process: Process) {
stderrThread = Thread {
process.errorStream.bufferedReader().useLines { lines ->
lines.forEach { line ->
if (line.isNotBlank()) {
Log.i(TAG, line)
}
}
}
}.also {
it.name = "AgentCodexStderr"
it.start()
}
}
private fun routeInbound(message: JSONObject) {
if (message.has("id") && !message.has("method")) {
pendingResponses[message.get("id").toString()]?.offer(message)
return
}
notifications.offer(message)
}
private fun checkProcessAlive() {
val activeProcess = process ?: throw IOException("Agent app-server unavailable")
if (!activeProcess.isAlive) {
initialized = false
throw IOException("Agent app-server exited with code ${activeProcess.exitValue()}")
}
}
private fun parseRuntimeStatus(
accountResponse: JSONObject,
configResponse: JSONObject,
): RuntimeStatus {
val account = accountResponse.optJSONObject("account")
val config = configResponse.optJSONObject("config") ?: JSONObject()
val configuredModel = config.optString("model").ifBlank { null }
val configuredProvider = config.optString("model_provider").ifBlank { null }
val accountType = account?.optString("type").orEmpty()
return RuntimeStatus(
authenticated = account != null,
accountEmail = account?.optString("email")?.ifBlank { null },
clientCount = activeRequests.get(),
modelProviderId = configuredProvider ?: inferModelProviderId(accountType),
configuredModel = configuredModel,
effectiveModel = configuredModel,
upstreamBaseUrl = config.optString("chatgpt_base_url").ifBlank { "provider-default" },
)
}
private fun inferModelProviderId(accountType: String): String {
return when (accountType) {
"chatgpt" -> "chatgpt"
"apiKey" -> "openai"
else -> "unknown"
}
}
}

View File

@@ -54,19 +54,13 @@ object AgentTaskPlanner {
usedOverride = true,
)
}
val runtimeStatus = CodexdLocalClient.waitForRuntimeStatus(context)
if (!runtimeStatus.authenticated) {
throw IOException("codexd is not authenticated")
}
val model = runtimeStatus.effectiveModel ?: throw IOException("codexd effective model unavailable")
val launchableApps = AgentInstalledAppCatalog.listLaunchableApps(context)
.take(MAX_LAUNCHABLE_APPS)
if (launchableApps.isEmpty()) {
throw IOException("No launchable apps available for planning")
}
val planText = CodexResponsesClient.requestText(
val planText = AgentCodexAppServerClient.requestText(
context = context,
model = model,
instructions = PLANNER_INSTRUCTIONS,
prompt = buildPlannerPrompt(userObjective, launchableApps),
)

View File

@@ -19,7 +19,7 @@ class CodexAgentBridgeService : Service() {
private val binder = object : ICodexAgentBridgeService.Stub() {
override fun getRuntimeStatus(): BridgeRuntimeStatus {
val status = runCatching {
CodexdLocalClient.waitForRuntimeStatus(this@CodexAgentBridgeService)
AgentCodexAppServerClient.readRuntimeStatus(this@CodexAgentBridgeService)
}.getOrElse { err ->
throw err.asBinderError("getRuntimeStatus")
}

View File

@@ -128,14 +128,12 @@ class CodexAgentService : AgentService() {
question: String,
events: List<AgentSessionEvent>,
): String {
val runtimeStatus = CodexdLocalClient.waitForRuntimeStatus(this)
val runtimeStatus = AgentCodexAppServerClient.readRuntimeStatus(this)
if (!runtimeStatus.authenticated) {
throw IOException("codexd is not authenticated")
throw IOException("Agent runtime is not authenticated")
}
val model = runtimeStatus.effectiveModel ?: throw IOException("codexd effective model unavailable")
return CodexResponsesClient.requestText(
return AgentCodexAppServerClient.requestText(
context = this,
model = model,
instructions = AUTO_ANSWER_INSTRUCTIONS,
prompt = buildAutoAnswerPrompt(session, question, events),
)

View File

@@ -0,0 +1,15 @@
package com.openai.codexd
import android.content.Context
import java.io.File
import java.io.IOException
object CodexCliBinaryLocator {
fun resolve(context: Context): File {
val binary = File(context.applicationInfo.nativeLibraryDir, "libcodex.so")
if (!binary.exists()) {
throw IOException("codex binary missing at ${binary.absolutePath}")
}
return binary
}
}

View File

@@ -1,136 +0,0 @@
package com.openai.codexd
import java.io.IOException
import org.json.JSONArray
import org.json.JSONObject
object CodexResponsesClient {
fun requestText(
context: android.content.Context,
model: String,
instructions: String,
prompt: String,
): String {
val requestBody = buildRequest(
model = model,
instructions = instructions,
prompt = prompt,
)
val response = CodexdLocalClient.waitForResponse(
context = context,
method = "POST",
path = "/v1/responses",
body = requestBody.toString(),
)
if (response.statusCode != 200) {
throw IOException("HTTP ${response.statusCode}: ${response.body}")
}
return parseResponsesOutputText(response.body)
}
fun buildRequest(
model: String,
instructions: String,
prompt: String,
): JSONObject {
return JSONObject()
.put("model", model)
.put("store", false)
.put("stream", true)
.put("instructions", instructions)
.put(
"input",
JSONArray().put(
JSONObject()
.put("role", "user")
.put(
"content",
JSONArray().put(
JSONObject()
.put("type", "input_text")
.put("text", prompt),
),
),
),
)
}
fun parseResponsesOutputText(body: String): String {
val trimmedBody = body.trim()
if (trimmedBody.startsWith("event:") || trimmedBody.startsWith("data:")) {
return parseResponsesStreamOutputText(trimmedBody)
}
return parseResponsesJsonOutputText(JSONObject(trimmedBody))
}
private fun parseResponsesJsonOutputText(data: JSONObject): String {
val directOutput = data.optString("output_text")
if (directOutput.isNotBlank()) {
return directOutput
}
val output = data.optJSONArray("output")
?: throw IOException("Responses payload missing output")
val combined = buildString {
for (outputIndex in 0 until output.length()) {
val item = output.optJSONObject(outputIndex) ?: continue
val content = item.optJSONArray("content") ?: continue
for (contentIndex in 0 until content.length()) {
val part = content.optJSONObject(contentIndex) ?: continue
if (part.optString("type") == "output_text") {
append(part.optString("text"))
}
}
}
}
if (combined.isBlank()) {
throw IOException("Responses payload missing output_text content")
}
return combined
}
private fun parseResponsesStreamOutputText(body: String): String {
val deltaText = StringBuilder()
val completedItems = mutableListOf<String>()
body.split("\n\n").forEach { rawEvent ->
val lines = rawEvent.lineSequence().map(String::trimEnd).toList()
if (lines.isEmpty()) {
return@forEach
}
val dataPayload = lines
.filter { it.startsWith("data:") }
.joinToString("\n") { it.removePrefix("data:").trimStart() }
.trim()
if (dataPayload.isEmpty() || dataPayload == "[DONE]") {
return@forEach
}
val event = JSONObject(dataPayload)
when (event.optString("type")) {
"response.output_text.delta" -> deltaText.append(event.optString("delta"))
"response.output_item.done" -> {
val item = event.optJSONObject("item") ?: return@forEach
val content = item.optJSONArray("content") ?: return@forEach
val text = buildString {
for (index in 0 until content.length()) {
val part = content.optJSONObject(index) ?: continue
if (part.optString("type") == "output_text") {
append(part.optString("text"))
}
}
}
if (text.isNotBlank()) {
completedItems += text
}
}
"response.failed" -> throw IOException(event.toString())
}
}
if (deltaText.isNotBlank()) {
return deltaText.toString()
}
val completedText = completedItems.joinToString("")
if (completedText.isNotBlank()) {
return completedText
}
throw IOException("Responses stream missing output_text content")
}
}

View File

@@ -177,7 +177,6 @@ class MainActivity : Activity() {
showToast("Enter a prompt")
return
}
ensureCodexdRunningForAgent()
thread {
val result = runCatching {
val plan = AgentTaskPlanner.plan(
@@ -617,9 +616,9 @@ class MainActivity : Activity() {
val runtimeStatus = latestRuntimeStatus
if (runtimeStatus == null) {
return if (isServiceRunning) {
"Agent runtime: probing codexd..."
"codexd bridge: probing..."
} else {
"Agent runtime: codexd unavailable"
"codexd bridge: unavailable"
}
}
val authSummary = if (runtimeStatus.authenticated) {
@@ -632,7 +631,7 @@ class MainActivity : Activity() {
?.let { ", configured=$it" }
?: ""
val effectiveModel = runtimeStatus.effectiveModel ?: "unknown"
return "Agent runtime: $authSummary, provider=${runtimeStatus.modelProviderId}, effective=$effectiveModel$configuredModelSuffix, clients=${runtimeStatus.clientCount}, base=${runtimeStatus.upstreamBaseUrl}"
return "codexd bridge: $authSummary, provider=${runtimeStatus.modelProviderId}, effective=$effectiveModel$configuredModelSuffix, clients=${runtimeStatus.clientCount}, base=${runtimeStatus.upstreamBaseUrl}"
}
private fun updateAuthUi(
@@ -717,16 +716,6 @@ class MainActivity : Activity() {
}
}
private fun ensureCodexdRunningForAgent() {
val intent = Intent(this, CodexdForegroundService::class.java).apply {
action = CodexdForegroundService.ACTION_START
putExtra(CodexdForegroundService.EXTRA_SOCKET_PATH, defaultSocketPath())
putExtra(CodexdForegroundService.EXTRA_CODEX_HOME, defaultCodexHome())
}
startForegroundService(intent)
isServiceRunning = true
}
private data class AuthStatus(
val authenticated: Boolean,
val accountEmail: String?,